first pass at stats.rake

This commit is contained in:
Dylan Knutson
2025-07-10 19:24:41 +00:00
parent 542e38b35a
commit 6c086ac9cc
37 changed files with 17958 additions and 20 deletions

526
lib/tasks/stats.rake Normal file
View File

@@ -0,0 +1,526 @@
# typed: strict
# frozen_string_literal: true
T.bind(self, T.all(Rake::DSL, Object))
require "unicode_plot"
require "rumale"
require "rumale/linear_model/linear_regression"
require "rumale/preprocessing/polynomial_features"
require "rumale/pipeline/pipeline"
namespace :stats do
  desc "Generate graphs of FaFavIdAndDate models with linear and quadratic regression lines. Usage: rake stats:fa_fav_graph[max_points]"
  task :fa_fav_graph, [:max_points] => :environment do |_task, args|
    puts "🔍 Analyzing FaFavIdAndDate data..."

    # Parse the optional max_points argument. A missing/blank argument means
    # "no limit"; anything else must be a positive base-10 integer. Validating
    # here avoids String#to_i silently turning junk into 0, which would
    # otherwise surface later as an obscure failure on an empty sample.
    raw_max_points = args[:max_points].to_s.strip
    max_points = nil
    unless raw_max_points.empty?
      max_points = Integer(raw_max_points, 10, exception: false)
      if max_points.nil? || max_points <= 0
        puts "❌ max_points must be a positive integer (got #{raw_max_points.inspect})"
        exit 1
      end
    end

    # Query and (optionally) sample the data
    records_array = StatsHelpers.sample_records(max_points)

    # The normalizer keeps the raw values and knows how to map them to/from
    # the scaled values used for fitting
    normalizer = DataNormalizer.new(records_array)
    puts "📈 X-axis range (fav_fa_id): #{normalizer.x_range}"
    puts "📈 Y-axis range (date): #{normalizer.y_range}"

    # Fit both models on normalized data
    results = RegressionAnalyzer.new(normalizer).analyze

    # Report the fits (equations are expressed in the original units)
    puts "\n📊 Linear Regression Results:"
    puts " #{results.linear.equation}"
    puts " R² = #{StatsHelpers.format_r_squared(results.linear.r_squared)}"
    puts "\n📊 Quadratic Regression Results:"
    puts " #{results.quadratic.equation}"
    puts " R² = #{StatsHelpers.format_r_squared(results.quadratic.r_squared)}"

    # Render the terminal plots
    puts "\n🎨 Generating visualizations with UnicodePlot..."
    plotter = StatsPlotter.new
    plotter.plot_scatter(
      "Original Data",
      normalizer.x_values,
      normalizer.y_values,
    )
    plotter.plot_regression("Linear Regression", results.linear)
    plotter.plot_regression("Quadratic Regression", results.quadratic)
    plotter.plot_combined(normalizer.x_values, normalizer.y_values, results)

    puts "\n✅ Graph generation completed!"
  end
end
# Helper methods extracted to avoid private method issues in Rake context
module StatsHelpers
  extend T::Sig

  # Seed for the sampling RNG so repeated runs select the same points.
  SAMPLE_SEED = 42

  # Loads the complete FaFavIdAndDate records and, when `max_points` is given
  # and smaller than the number of records, returns a reproducible random
  # sample of that size instead of the full set.
  #
  # Prints progress to stdout. Exits the process with status 1 when there are
  # no complete records.
  #
  # @param max_points [Integer, nil] upper bound on returned records; nil for no limit
  # @return [Array<Domain::FaFavIdAndDate>]
  # @raise [ArgumentError] if max_points is given but not positive
  sig do
    params(max_points: T.nilable(Integer)).returns(
      T::Array[Domain::FaFavIdAndDate],
    )
  end
  def self.sample_records(max_points)
    if max_points && max_points <= 0
      raise ArgumentError, "max_points must be positive, got #{max_points}"
    end

    records = Domain::FaFavIdAndDate.complete
    if records.empty?
      puts "❌ No complete FaFavIdAndDate records found"
      exit 1
    end

    total_records = records.count
    puts "📊 Found #{total_records} complete records"

    # Only the columns needed for the analysis are selected
    records_array = records.select(:id, :fav_fa_id, :date).to_a

    if max_points && total_records > max_points
      puts "🎲 Randomly sampling #{max_points} points from #{total_records} total records"
      # Use a dedicated, fixed-seed generator rather than Kernel#srand so the
      # sample is reproducible without reseeding the process-wide RNG.
      records_array =
        T.cast(
          records_array.sample(max_points, random: Random.new(SAMPLE_SEED)),
          T::Array[Domain::FaFavIdAndDate],
        )
      puts "📊 Using #{records_array.length} sampled records for analysis"
    else
      message =
        if max_points
          "within max_points limit of #{max_points}"
        else
          "no sampling limit specified"
        end
      puts "📊 Using all #{records_array.length} records (#{message})"
    end

    records_array
  end

  # Rounds an R² value to three decimal places for display.
  #
  # @param value [Float]
  # @return [Float]
  sig { params(value: Float).returns(Float) }
  def self.format_r_squared(value)
    value.round(3).to_f
  end
end
# Handles data normalization and denormalization to prevent numerical instability.
#
# Holds the raw (fav_fa_id, date-as-epoch-seconds) pairs sorted by x, exposes
# them scaled by their min/max for model fitting, and converts fitted
# coefficients and predictions back into the original units.
class DataNormalizer
  extend T::Sig

  # Closed interval [min, max] with min-max scaling helpers.
  class Range < T::ImmutableStruct
    extend T::Sig

    const :min, Float
    const :max, Float

    # Width of the interval (0.0 when all values are identical).
    sig { returns(Float) }
    def scale
      max - min
    end

    sig { returns(T::Range[Float]) }
    def range
      min..max
    end

    # Maps `value` so that min -> 0.0 and max -> 1.0.
    sig { params(value: Float).returns(Float) }
    def normalize(value)
      # Degenerate interval: avoid 0/0 (NaN) and map everything to 0.0, which
      # `denormalize` sends back to `min`.
      return 0.0 if scale.zero?

      (value - min) / scale
    end

    # Inverse of `normalize`.
    sig { params(value: Float).returns(Float) }
    def denormalize(value)
      value * scale + min
    end

    # Renders the interval as "<min> to <max>", optionally formatting each
    # endpoint with the given block.
    sig do
      params(
        mapper: T.nilable(T.proc.params(arg: Float).returns(String)),
      ).returns(String)
    end
    def as_string(&mapper)
      mapper ||= ->(x) { x.to_s }
      "#{mapper.call(min)} to #{mapper.call(max)}"
    end
  end

  # Raw x values (fav_fa_id as Float), sorted ascending.
  sig { returns(T::Array[Float]) }
  attr_reader :x_values

  # Raw y values (date as epoch seconds), in the same order as `x_values`.
  sig { returns(T::Array[Float]) }
  attr_reader :y_values

  # @param records [Array<Domain::FaFavIdAndDate>] records with a non-nil date
  # @raise [ArgumentError] if `records` is empty
  sig { params(records: T::Array[Domain::FaFavIdAndDate]).void }
  def initialize(records)
    if records.empty?
      raise ArgumentError, "cannot normalize an empty set of records"
    end

    data_points =
      records.map do |record|
        {
          x: record.fav_fa_id.to_f,
          y: T.cast(record.date&.to_time&.to_i&.to_f, Float),
        }
      end
    data_points.sort_by! { |point| point[:x] }

    @x_values = T.let(data_points.map { |p| p[:x] }, T::Array[Float])
    @y_values = T.let(data_points.map { |p| p[:y] }, T::Array[Float])

    # Calculate min/max for normalization
    x_minmax = T.cast(@x_values.minmax, [Float, Float])
    y_minmax = T.cast(@y_values.minmax, [Float, Float])
    @x = T.let(Range.new(min: x_minmax[0], max: x_minmax[1]), Range)
    @y = T.let(Range.new(min: y_minmax[0], max: y_minmax[1]), Range)
  end

  # Human-readable x range, e.g. "1.0 to 500.0".
  sig { returns(String) }
  def x_range
    @x.as_string
  end

  # Human-readable y range with endpoints rendered as timestamps.
  sig { returns(String) }
  def y_range
    @y.as_string { |x| Time.at(x).to_s }
  end

  # Convert raw data to normalized [0,1] scale for Rumale
  # (one single-feature row per sample).
  sig { returns(T::Array[T::Array[Float]]) }
  def normalized_x_matrix
    @x_values.map { |x| [@x.normalize(x)] }
  end

  sig { returns(T::Array[Float]) }
  def normalized_y_vector
    @y_values.map { |y| @y.normalize(y) }
  end

  # Evenly spaced x positions (original scale) spanning the data, used to
  # draw regression curves: 50 steps from min to max.
  sig { returns(T::Array[Float]) }
  def regression_x_range
    # A zero-width interval would make the step 0, which Range#step rejects
    return [@x.min] if @x.scale.zero?

    step_size = @x.scale / 50.0
    @x.range.step(step_size).to_a
  end

  # Evaluates a line fitted in normalized space at the given original-scale
  # x positions and returns the predictions in original y units.
  sig do
    params(
      regression_x: T::Array[Float],
      norm_slope: Float,
      norm_intercept: Float,
    ).returns(T::Array[Float])
  end
  def denormalize_linear(regression_x, norm_slope, norm_intercept)
    regression_x.map do |x|
      x_norm = @x.normalize(x)
      y_norm = norm_slope * x_norm + norm_intercept
      @y.denormalize(y_norm)
    end
  end

  # Evaluates a parabola fitted in normalized space (a*x² + b*x + c) at the
  # given original-scale x positions; predictions are in original y units.
  sig do
    params(
      regression_x: T::Array[Float],
      norm_a: Float,
      norm_b: Float,
      norm_c: Float,
    ).returns(T::Array[Float])
  end
  def denormalize_quadratic(regression_x, norm_a, norm_b, norm_c)
    regression_x.map do |x|
      x_norm = @x.normalize(x)
      y_norm = norm_a * x_norm * x_norm + norm_b * x_norm + norm_c
      @y.denormalize(y_norm)
    end
  end

  # Equation string for a normalized-space line, with coefficients converted
  # to the original scale.
  sig { params(norm_slope: Float, norm_intercept: Float).returns(String) }
  def linear_equation(norm_slope, norm_intercept)
    # Prediction at x = x_min, in original y units
    y_at_x_min = norm_intercept * @y.scale + @y.min
    # With a single distinct x the model is just that constant
    return "y = #{polynomial_equation([y_at_x_min])}" if @x.scale.zero?

    slope_orig = norm_slope * @y.scale / @x.scale
    intercept_orig = y_at_x_min - slope_orig * @x.min
    # polynomial_equation expects ascending powers: [constant, x]
    "y = #{polynomial_equation([intercept_orig, slope_orig])}"
  end

  # Equation string for a normalized-space parabola, with coefficients
  # converted to the original scale.
  sig { params(norm_a: Float, norm_b: Float, norm_c: Float).returns(String) }
  def quadratic_equation(norm_a, norm_b, norm_c)
    # Prediction at x = x_min, in original y units
    y_at_x_min = norm_c * @y.scale + @y.min
    # With a single distinct x the model is just that constant
    return "y = #{polynomial_equation([y_at_x_min])}" if @x.scale.zero?

    # Expand a*((x - x_min)/sx)² + b*((x - x_min)/sx) + c and rescale by sy
    a_orig = norm_a * @y.scale / (@x.scale * @x.scale)
    b_orig = norm_b * @y.scale / @x.scale - 2 * a_orig * @x.min
    c_orig = y_at_x_min - b_orig * @x.min - a_orig * @x.min * @x.min
    # polynomial_equation expects ascending powers: [constant, x, x²]
    "y = #{polynomial_equation([c_orig, b_orig, a_orig])}"
  end

  # Convert array of coefficients into polynomial equation string.
  #
  # `coefficients[i]` is the coefficient of x^i (ascending powers). Zero
  # coefficients are omitted and the highest power is printed first, e.g.
  # [1.0, 2.0, 3.0] -> "3.0x² + 2.0x + 1.0". Returns "0.0" when every
  # coefficient is zero.
  sig { params(coefficients: T::Array[Float]).returns(String) }
  def polynomial_equation(coefficients)
    terms =
      coefficients.each_with_index.map do |coeff, power|
        next if coeff.zero?

        term = format_number(coeff)
        case power
        when 0
          term
        when 1
          "#{term}x"
        else
          "#{term}x#{power.to_s.tr("0123456789", "⁰¹²³⁴⁵⁶⁷⁸⁹")}"
        end
      end.compact
    return "0.0" if terms.empty?

    # "a + -b" reads better as "a - b"
    terms.reverse.join(" + ").gsub("+ -", "- ")
  end

  # Format a number with significant figures and scientific notation when needed.
  #
  # Magnitudes >= 1e6 or < 1e-2 are rendered as "<mantissa>e<exponent>";
  # everything else is rounded to at most `sig_figs` significant digits
  # after the decimal point (never rounding away integer digits).
  sig { params(num: Float, sig_figs: Integer).returns(String) }
  def format_number(num, sig_figs = 3)
    return "0.0" if num.zero?

    # Decimal order of magnitude
    order = Math.log10(num.abs).floor
    if order >= 6 || order <= -3
      mantissa = (num / (10.0**order)).round(sig_figs - 1)
      # Rounding can carry the mantissa up to 10.0 (e.g. 9.999e6);
      # renormalize so it stays in [1, 10)
      if mantissa.abs >= 10.0
        mantissa /= 10.0
        order += 1
      end
      "#{mantissa}e#{order}"
    else
      decimal_places = [sig_figs - (order + 1), 0].max
      num.round(decimal_places).to_s
    end
  end
end
# Immutable value object describing one fitted regression model.
class RegressionResult < T::ImmutableStruct
  extend T::Sig

  # Human-readable equation of the fitted model
  const :equation, String
  # Goodness-of-fit score (R²) of the model
  const :r_squared, Float
  # X coordinates of the points along the fitted curve
  const :x_values, T::Array[Float]
  # Y coordinates of the fitted curve, paired with `x_values`
  const :y_values, T::Array[Float]
end
# Immutable value object bundling the results of every model that was fitted.
class AnalysisResults < T::ImmutableStruct
  extend T::Sig

  # Result of the linear (degree 1) fit
  const :linear, RegressionResult
  # Result of the quadratic (degree 2) fit
  const :quadratic, RegressionResult
end
# Handles regression analysis using Rumale with normalized data.
#
# Fits a linear and a quadratic polynomial to the normalizer's scaled data and
# reports each fit (equation, R², curve points) in the original units.
class RegressionAnalyzer
  extend T::Sig

  sig { params(normalizer: DataNormalizer).void }
  def initialize(normalizer)
    @normalizer = normalizer
  end

  # Runs both fits and returns their results.
  sig { returns(AnalysisResults) }
  def analyze
    # Use normalized data for Rumale calculations to prevent numerical instability
    x_matrix = @normalizer.normalized_x_matrix
    y_vector = @normalizer.normalized_y_vector
    regression_x = @normalizer.regression_x_range

    AnalysisResults.new(
      linear: analyze_linear(x_matrix, y_vector, regression_x),
      quadratic: analyze_quadratic(x_matrix, y_vector, regression_x),
    )
  end

  private

  # Fits y = slope*x + intercept in normalized space.
  sig do
    params(
      x_matrix: T::Array[T::Array[Float]],
      y_vector: T::Array[Float],
      regression_x: T::Array[Float],
    ).returns(RegressionResult)
  end
  def analyze_linear(x_matrix, y_vector, regression_x)
    regressor = Rumale::LinearModel::LinearRegression.new
    coefficients, r_squared =
      fit_polynomial(1, regressor, x_matrix, y_vector)
    norm_intercept = coefficients.fetch(0)
    norm_slope = coefficients.fetch(1)

    # Generate regression line data in original scale
    linear_y =
      @normalizer.denormalize_linear(regression_x, norm_slope, norm_intercept)

    RegressionResult.new(
      equation: @normalizer.linear_equation(norm_slope, norm_intercept),
      r_squared: r_squared,
      x_values: regression_x,
      y_values: linear_y,
    )
  end

  # Fits y = a*x² + b*x + c in normalized space.
  sig do
    params(
      x_matrix: T::Array[T::Array[Float]],
      y_vector: T::Array[Float],
      regression_x: T::Array[Float],
    ).returns(RegressionResult)
  end
  def analyze_quadratic(x_matrix, y_vector, regression_x)
    regressor = Rumale::LinearModel::LinearRegression.new(fit_bias: true)
    coefficients, r_squared =
      fit_polynomial(2, regressor, x_matrix, y_vector)
    norm_c = coefficients.fetch(0) # constant term
    norm_b = coefficients.fetch(1) # x coefficient
    norm_a = coefficients.fetch(2) # x² coefficient

    # Generate regression line data in original scale
    quadratic_y =
      @normalizer.denormalize_quadratic(regression_x, norm_a, norm_b, norm_c)

    RegressionResult.new(
      equation: @normalizer.quadratic_equation(norm_a, norm_b, norm_c),
      r_squared: r_squared,
      x_values: regression_x,
      y_values: quadratic_y,
    )
  end

  # Fits `regressor` on polynomial features of the given degree and returns
  # the coefficients in ascending powers ([constant, x, x², ...]) together
  # with the pipeline's score on the training data.
  sig do
    params(
      degree: Integer,
      regressor: T.untyped,
      x_matrix: T::Array[T::Array[Float]],
      y_vector: T::Array[Float],
    ).returns([T::Array[Float], Float])
  end
  def fit_polynomial(degree, regressor, x_matrix, y_vector)
    pipeline =
      Rumale::Pipeline::Pipeline.new(
        steps: {
          transformer:
            Rumale::Preprocessing::PolynomialFeatures.new(degree: degree),
          estimator: regressor,
        },
      )
    pipeline.fit(x_matrix, y_vector)

    estimator = pipeline.steps[:estimator]
    coefficients = estimator.weight_vec.to_a.map(&:to_f)
    # The polynomial features already contain a constant column, so when the
    # estimator also fits its own bias the model's intercept is the sum of
    # that column's weight and the bias term. Fold the bias in so the
    # reported constant matches what the pipeline actually predicts.
    # NOTE(review): relies on the estimator exposing `bias_term` (0.0 when no
    # bias is fitted) — confirm against the installed Rumale version.
    coefficients[0] = coefficients.fetch(0) + estimator.bias_term.to_f

    [coefficients, pipeline.score(x_matrix, y_vector)]
  end
end
# Draws the raw data and the fitted regression curves as terminal plots
# using UnicodePlot. Every plot is printed under a header line, and plotting
# failures are reported as warnings instead of aborting.
class StatsPlotter
  extend T::Sig

  # Scatter plot of the given points.
  sig do
    params(
      title: String,
      x_values: T::Array[Float],
      y_values: T::Array[Float],
    ).void
  end
  def plot_scatter(title, x_values, y_values)
    plot_with_error_handling(title) do
      options = axis_options(y_values, width: 80, height: 20)
      UnicodePlot.scatterplot(x_values, y_values, title: title, **options)
    end
  end

  # Line plot of a single fitted curve; the printed header also carries the
  # model name and its R².
  sig { params(title: String, result: RegressionResult).void }
  def plot_regression(title, result)
    model_name = title.split.first
    header =
      "#{title} - #{model_name} fit (R² = #{result.r_squared.round(3)})"

    plot_with_error_handling(header) do
      options = axis_options(result.y_values, width: 80, height: 20)
      UnicodePlot.lineplot(
        result.x_values,
        result.y_values,
        title: title,
        **options,
      )
    end
  end

  # Scatter plot of the raw data with both regression curves overlaid.
  sig do
    params(
      x_values: T::Array[Float],
      y_values: T::Array[Float],
      results: AnalysisResults,
    ).void
  end
  def plot_combined(x_values, y_values, results)
    plot_with_error_handling("📈 Combined Visualization:") do
      # Raw points form the base layer
      canvas =
        UnicodePlot.scatterplot(
          x_values,
          y_values,
          title: "FaFavIdAndDate Analysis: Original Data vs Regression Models",
          name: "Original Data",
          **axis_options(y_values, width: 100, height: 25),
        )

      # Overlay each fitted curve, linear first
      fits = { "Linear" => results.linear, "Quadratic" => results.quadratic }
      fits.each do |label, fit|
        UnicodePlot.lineplot!(
          canvas,
          fit.x_values,
          fit.y_values,
          name: "#{label} (R²=#{fit.r_squared.round(3)})",
        )
      end

      canvas
    end
  end

  private

  # Plot size and axis labels shared by every chart.
  sig do
    params(y_values: T::Array[Float], width: Integer, height: Integer).returns(
      T::Hash[Symbol, T.untyped],
    )
  end
  def axis_options(y_values, width:, height:)
    {
      width: width,
      height: height,
      xlabel: "fav_fa_id",
      ylabel: date_axis_label(y_values),
    }
  end

  # Y-axis label showing the first and last date covered by `y_values`
  # (epoch seconds).
  sig { params(y_values: T::Array[Float]).returns(String) }
  def date_axis_label(y_values)
    earliest, latest =
      y_values.minmax.map do |seconds|
        Time.at(seconds).strftime("%Y-%m-%d")
      end
    "Date (#{earliest} to #{latest})"
  end

  # Prints `title`, then renders and prints the plot built by the block.
  # Errors raised while building or rendering are printed as warnings.
  sig { params(title: String, block: T.proc.returns(T.untyped)).void }
  def plot_with_error_handling(title, &block)
    puts "\n#{title}"
    begin
      rendered = yield.render
      puts rendered
    rescue LoadError
      puts "⚠️ UnicodePlot gem not available. Install with: gem install unicode_plot"
    rescue => error
      puts "⚠️ Error generating plot: #{error.message}"
    end
  end
end