normalizer class hierarchy

Dylan Knutson
2025-07-10 20:24:56 +00:00
parent 91a16e12a1
commit 2a8d618a84


@@ -20,21 +20,13 @@ namespace :stats do
records_array = StatsHelpers.sample_records(max_points)
# Create base normalizer for display ranges
base_normalizer = DataNormalizer.new(records_array)
base_normalizer = LinearNormalizer.new(records_array)
puts "📈 X-axis range (fav_fa_id): #{base_normalizer.x_range}"
puts "📈 Y-axis range (date): #{base_normalizer.y_range}"
# Run regressions using specialized normalizers
results = RegressionAnalyzer.new(records_array).analyze
# Define regression types for reuse across display and plotting
regressions = [
["Linear", results.linear],
["Quadratic", results.quadratic],
["Logarithmic", results.logarithmic],
["Square Root", results.square_root],
]
regressions = RegressionAnalyzer.new(records_array).analyze
# Display results (automatically denormalized)
regressions.each do |name, result|
@@ -119,54 +111,56 @@ module StatsHelpers
end
end
class AxisRange < T::ImmutableStruct
extend T::Sig
const :min, Float
const :max, Float
sig { returns(Float) }
def scale
max - min
end
sig { returns(T::Range[Float]) }
def range
min..max
end
sig { params(value: Float).returns(Float) }
def normalize(value)
(value - min) / scale
end
sig { params(value: Float).returns(Float) }
def denormalize(value)
value * scale + min
end
sig do
params(
mapper: T.nilable(T.proc.params(arg: Float).returns(String)),
).returns(String)
end
def as_string(&mapper)
mapper ||= ->(x) { x.to_s }
"#{mapper.call(min)} to #{mapper.call(max)}"
end
end
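# Illustrative usage of AxisRange (editor's sketch, not part of this commit):
# an axis spanning [10.0, 60.0] maps values into [0, 1] and back symmetrically.
#
#   axis = AxisRange.new(min: 10.0, max: 60.0)
#   axis.scale            # => 50.0
#   axis.normalize(35.0)  # => 0.5
#   axis.denormalize(0.5) # => 35.0
#   axis.as_string        # => "10.0 to 60.0"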
# Base class for data normalization and denormalization
class DataNormalizer
extend T::Sig
extend T::Helpers
abstract!
class Range < T::ImmutableStruct
extend T::Sig
const :min, Float
const :max, Float
sig { returns(Float) }
def scale
max - min
end
sig { returns(T::Range[Float]) }
def range
min..max
end
sig { params(value: Float).returns(Float) }
def normalize(value)
(value - min) / scale
end
sig { params(value: Float).returns(Float) }
def denormalize(value)
value * scale + min
end
sig do
params(
mapper: T.nilable(T.proc.params(arg: Float).returns(String)),
).returns(String)
end
def as_string(&mapper)
mapper ||= ->(x) { x.to_s }
"#{mapper.call(min)} to #{mapper.call(max)}"
end
end
sig { returns(T::Array[Float]) }
sig(:final) { returns(T::Array[Float]) }
attr_reader :x_values
sig { returns(T::Array[Float]) }
sig(:final) { returns(T::Array[Float]) }
attr_reader :y_values
sig { params(records: T::Array[Domain::FaFavIdAndDate]).void }
sig(:final) { params(records: T::Array[Domain::FaFavIdAndDate]).void }
def initialize(records)
data_points =
records.map do |record|
@@ -183,54 +177,54 @@ class DataNormalizer
# Calculate min/max for normalization
x_minmax = T.cast(@x_values.minmax, [Float, Float])
y_minmax = T.cast(@y_values.minmax, [Float, Float])
@x = T.let(Range.new(min: x_minmax[0], max: x_minmax[1]), Range)
@y = T.let(Range.new(min: y_minmax[0], max: y_minmax[1]), Range)
@x = T.let(AxisRange.new(min: x_minmax[0], max: x_minmax[1]), AxisRange)
@y = T.let(AxisRange.new(min: y_minmax[0], max: y_minmax[1]), AxisRange)
end
sig { returns(String) }
sig(:final) { returns(String) }
def x_range
@x.as_string
end
sig { returns(String) }
sig(:final) { returns(String) }
def y_range
@y.as_string { |x| Time.at(x).to_s }
end
# Accessors for equation classes
sig { returns(Float) }
sig(:final) { returns(Float) }
def x_scale
@x.scale
end
sig { returns(Float) }
sig(:final) { returns(Float) }
def y_scale
@y.scale
end
sig { returns(Float) }
sig(:final) { returns(Float) }
def x_min
@x.min
end
sig { returns(Float) }
sig(:final) { returns(Float) }
def y_min
@y.min
end
# Convert raw data to normalized [0,1] scale for Rumale
sig { returns(T::Array[T::Array[Float]]) }
sig(:final) { returns(T::Array[T::Array[Float]]) }
def normalized_x_matrix
@x_values.map { |x| [@x.normalize(x)] }
end
sig { returns(T::Array[Float]) }
sig(:final) { returns(T::Array[Float]) }
def normalized_y_vector
@y_values.map { |y| @y.normalize(y) }
end
# Generate regression line points in original scale
sig { returns(T::Array[Float]) }
sig(:final) { returns(T::Array[Float]) }
def regression_x_range
step_size = @x.scale / 50.0
@x.range.step(step_size).to_a
@@ -242,25 +236,26 @@ class DataNormalizer
normalized_x_matrix
end
protected
sig { returns(AxisRange) }
attr_reader :x, :y
# Abstract method for denormalizing regression results
sig do
abstract
.params(regression_x: T::Array[Float], weight_vec: T::Array[Float])
.returns(T::Array[Float])
end
def denormalize_regression(regression_x, weight_vec)
end
end
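# A concrete subclass only has to override the abstract hook above. A minimal
# editor's sketch (hypothetical class, not part of this commit):
#
#   class PassthroughNormalizer < DataNormalizer
#     extend T::Sig
#     sig do
#       override
#         .params(regression_x: T::Array[Float], weight_vec: T::Array[Float])
#         .returns(T::Array[Float])
#     end
#     def denormalize_regression(regression_x, weight_vec)
#       regression_x # a real subclass maps predictions back to original scale
#     end
#   end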
# Linear regression specific normalizer
class LinearNormalizer < DataNormalizer
extend T::Sig
# Denormalize linear regression results back to original scale
sig do
params(
regression_x: T::Array[Float],
norm_slope: Float,
norm_intercept: Float,
).returns(T::Array[Float])
override
.params(regression_x: T::Array[Float], weight_vec: T::Array[Float])
.returns(T::Array[Float])
end
def denormalize_regression(regression_x, norm_slope, norm_intercept)
def denormalize_regression(regression_x, weight_vec)
norm_slope = T.cast(weight_vec[1], Float)
norm_intercept = T.cast(weight_vec[0], Float)
regression_x.map do |x|
x_norm = @x.normalize(x)
y_norm = norm_slope * x_norm + norm_intercept
@@ -280,20 +275,17 @@ class LinearNormalizer < DataNormalizer
end
end
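# Working through the algebra above (editor's note): with
# x_norm = (x - x_min) / x_scale and y_norm = m * x_norm + b fitted in
# normalized space, the original-scale prediction is
# y = y_norm * y_scale + y_min, which the elided tail of the block
# presumably computes via @y.denormalize(y_norm).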
# Quadratic regression specific normalizer
class QuadraticNormalizer < DataNormalizer
extend T::Sig
# Denormalize quadratic regression results back to original scale
sig do
params(
regression_x: T::Array[Float],
norm_a: Float,
norm_b: Float,
norm_c: Float,
).returns(T::Array[Float])
override
.params(regression_x: T::Array[Float], weight_vec: T::Array[Float])
.returns(T::Array[Float])
end
def denormalize_regression(regression_x, norm_a, norm_b, norm_c)
def denormalize_regression(regression_x, weight_vec)
norm_a = T.cast(weight_vec[2], Float)
norm_b = T.cast(weight_vec[1], Float)
norm_c = T.cast(weight_vec[0], Float)
regression_x.map do |x|
x_norm = @x.normalize(x)
y_norm = norm_a * x_norm * x_norm + norm_b * x_norm + norm_c
@@ -319,6 +311,8 @@ end
# where f(x) is a transformation function and denormalization only requires y-scaling
class TransformedNormalizer < DataNormalizer
extend T::Sig
extend T::Helpers
abstract!
# Denormalize coefficients for simple transformations (only y-scaling needed)
sig do
@@ -333,13 +327,13 @@ class TransformedNormalizer < DataNormalizer
# Common denormalization logic using the transformation function
sig do
params(
regression_x: T::Array[Float],
norm_slope: Float,
norm_intercept: Float,
).returns(T::Array[Float])
override
.params(regression_x: T::Array[Float], weight_vec: T::Array[Float])
.returns(T::Array[Float])
end
def denormalize_regression(regression_x, norm_slope, norm_intercept)
def denormalize_regression(regression_x, weight_vec)
norm_slope = T.cast(weight_vec[1], Float)
norm_intercept = T.cast(weight_vec[0], Float)
regression_x.map do |x|
# y = a * f(x) + b, where coefficients are in normalized space
y_norm = norm_slope * transform_x(x) + norm_intercept
@@ -350,9 +344,8 @@ class TransformedNormalizer < DataNormalizer
protected
# Abstract method for applying the transformation function
sig { params(x: Float).returns(Float) }
sig { abstract.params(x: Float).returns(Float) }
def transform_x(x)
raise NotImplementedError, "Subclasses must implement transform_x"
end
end
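# Extending the hierarchy (editor's sketch, hypothetical and not part of
# this commit): a new y = a * f(x) + b fit only needs a transform_x
# override, e.g. a reciprocal model y = a / x + b for positive x:
#
#   class ReciprocalNormalizer < TransformedNormalizer
#     extend T::Sig
#     protected
#     sig { override.params(x: Float).returns(Float) }
#     def transform_x(x)
#       1.0 / x
#     end
#   end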
@@ -369,7 +362,7 @@ class LogarithmicNormalizer < TransformedNormalizer
protected
# Apply logarithmic transformation
sig { params(x: Float).returns(Float) }
sig { override.params(x: Float).returns(Float) }
def transform_x(x)
Math.log(x)
end
@@ -388,7 +381,7 @@ class SquareRootNormalizer < TransformedNormalizer
protected
# Apply square root transformation
sig { params(x: Float).returns(Float) }
sig { override.params(x: Float).returns(Float) }
def transform_x(x)
Math.sqrt(x)
end
@@ -483,8 +476,11 @@ class PolynomialEquation < Equation
end
end
class LogarithmicEquation < Equation
# Base class for transformed equations that follow y = a * f(x) + b pattern
class TransformedEquation < Equation
extend T::Sig
extend T::Helpers
abstract!
sig do
params(
@@ -506,34 +502,34 @@ class LogarithmicEquation < Equation
slope_orig = @norm_slope * @normalizer.y_scale
intercept_orig = @norm_intercept * @normalizer.y_scale + @normalizer.y_min
"y = #{format_number(slope_orig)} * ln(x) + #{format_number(intercept_orig)}"
"y = #{format_number(slope_orig)} * #{transform_symbol("x")} + #{format_number(intercept_orig)}"
end
# Abstract method for the transformation symbol
sig { abstract.params(x: String).returns(String) }
def transform_symbol(x)
end
end
class SquareRootEquation < Equation
class LogarithmicEquation < TransformedEquation
extend T::Sig
sig do
params(
normalizer: DataNormalizer,
norm_slope: Float,
norm_intercept: Float,
).void
end
def initialize(normalizer, norm_slope, norm_intercept)
super(normalizer)
@norm_slope = norm_slope
@norm_intercept = norm_intercept
end
protected
sig { returns(String) }
def format_equation
slope_orig = @norm_slope * @normalizer.y_scale
intercept_orig = @norm_intercept * @normalizer.y_scale + @normalizer.y_min
sig { override.params(x: String).returns(String) }
def transform_symbol(x)
"ln(#{x})"
end
end
"y = #{format_number(slope_orig)} * √x + #{format_number(intercept_orig)}"
class SquareRootEquation < TransformedEquation
extend T::Sig
protected
sig { override.params(x: String).returns(String) }
def transform_symbol(x)
"#{x}"
end
end
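# With these overrides, the shared format_equation template renders e.g.
# "y = 1.23 * ln(x) + 4.56" for LogarithmicEquation and
# "y = 1.23 * √x + 4.56" for SquareRootEquation (illustrative numbers).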
@@ -571,74 +567,43 @@ class RegressionAnalyzer
@records = records
end
sig { returns(AnalysisResults) }
sig { returns(T::Array[[String, RegressionResult]]) }
def analyze
AnalysisResults.new(
linear: analyze_linear,
quadratic: analyze_quadratic,
logarithmic: analyze_logarithmic,
square_root: analyze_square_root,
)
[
[
"Linear",
analyze_regression(LinearNormalizer, PolynomialEquation, degree: 1),
],
[
"Quadratic",
analyze_regression(QuadraticNormalizer, PolynomialEquation, degree: 2),
],
[
"Logarithmic",
analyze_regression(LogarithmicNormalizer, LogarithmicEquation),
],
[
"Square Root",
analyze_regression(SquareRootNormalizer, SquareRootEquation),
],
]
end
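# Call-site sketch (editor's note): analyze now returns name/result pairs,
# so the stats task above can consume them directly, assuming
# RegressionResult exposes an r_squared reader:
#
#   RegressionAnalyzer.new(records).analyze.each do |name, result|
#     puts "#{name}: R² = #{result.r_squared}"
#   end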
private
sig { returns(RegressionResult) }
def analyze_linear
normalizer = LinearNormalizer.new(@records)
x_matrix = normalizer.normalized_x_matrix
y_vector = normalizer.normalized_y_vector
regression_x = normalizer.regression_x_range
poly_features = Rumale::Preprocessing::PolynomialFeatures.new(degree: 1)
regressor = Rumale::LinearModel::LinearRegression.new
pipeline =
Rumale::Pipeline::Pipeline.new(
steps: {
transformer: poly_features,
estimator: regressor,
},
)
pipeline.fit(x_matrix, y_vector)
# Extract normalized coefficients
weight_vec = pipeline.steps[:estimator].weight_vec
norm_intercept = weight_vec[0]
norm_slope = weight_vec[1]
r_squared = pipeline.score(x_matrix, y_vector)
# Generate regression line data in original scale
linear_y =
normalizer.denormalize_regression(
regression_x,
norm_slope,
norm_intercept,
)
# Denormalize coefficients for equation display
coefficients =
normalizer.denormalize_coefficients(norm_intercept, norm_slope)
RegressionResult.new(
equation: PolynomialEquation.new(normalizer, coefficients),
r_squared: r_squared,
x_values: regression_x,
y_values: linear_y,
)
# Generic regression analysis method to eliminate duplication
sig do
params(
normalizer_class: T.class_of(DataNormalizer),
equation_class: T.class_of(Equation),
degree: Integer,
).returns(RegressionResult)
end
sig { returns(RegressionResult) }
def analyze_quadratic
normalizer = QuadraticNormalizer.new(@records)
x_matrix = normalizer.normalized_x_matrix
y_vector = normalizer.normalized_y_vector
def analyze_regression(normalizer_class, equation_class, degree: 1)
normalizer = normalizer_class.new(@records)
regression_x = normalizer.regression_x_range
# Use pipeline approach as recommended in documentation
poly_features = Rumale::Preprocessing::PolynomialFeatures.new(degree: 2)
poly_features = Rumale::Preprocessing::PolynomialFeatures.new(degree:)
regressor = Rumale::LinearModel::LinearRegression.new(fit_bias: true)
pipeline =
Rumale::Pipeline::Pipeline.new(
steps: {
@@ -648,116 +613,77 @@ class RegressionAnalyzer
)
# Fit the pipeline
x_matrix = normalizer.transformed_x_matrix
y_vector = normalizer.normalized_y_vector
pipeline.fit(x_matrix, y_vector)
r_squared = pipeline.score(x_matrix, y_vector)
weight_vec = pipeline.steps[:estimator].weight_vec
norm_c = weight_vec[0] # constant term
norm_b = weight_vec[1] # x coefficient
norm_a = weight_vec[2] # x² coefficient
weight_vec = pipeline.steps[:estimator].weight_vec.to_a
# Generate regression line data in original scale
quadratic_y =
normalizer.denormalize_regression(regression_x, norm_a, norm_b, norm_c)
regression_y =
generate_regression_line(normalizer, regression_x, weight_vec)
# Denormalize coefficients for equation display
coefficients = normalizer.denormalize_coefficients(norm_c, norm_b, norm_a)
# Create equation object
equation = create_equation(equation_class, normalizer, weight_vec)
RegressionResult.new(
equation: PolynomialEquation.new(normalizer, coefficients),
equation: equation,
r_squared: r_squared,
x_values: regression_x,
y_values: quadratic_y,
y_values: regression_y,
)
end
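# Note on weight ordering (editor's note): Rumale's PolynomialFeatures
# emits columns [1, x, x², ...], so weight_vec[0] is the constant term and
# weight_vec[1] the x coefficient, the convention every
# denormalize_regression override above indexes by.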
sig { returns(RegressionResult) }
def analyze_logarithmic
normalizer = LogarithmicNormalizer.new(@records)
y_vector = normalizer.normalized_y_vector
regression_x = normalizer.regression_x_range
# Transform x values using natural log for logarithmic regression
# y = a * ln(x) + b
log_x_matrix = normalizer.transformed_x_matrix
poly_features = Rumale::Preprocessing::PolynomialFeatures.new(degree: 1)
regressor = Rumale::LinearModel::LinearRegression.new
pipeline =
Rumale::Pipeline::Pipeline.new(
steps: {
transformer: poly_features,
estimator: regressor,
},
)
# Fit the regression on log-transformed x values
pipeline.fit(log_x_matrix, y_vector)
r_squared = pipeline.score(log_x_matrix, y_vector)
# Extract coefficients (same pattern as linear regression)
weight_vec = pipeline.steps[:estimator].weight_vec
norm_intercept = weight_vec[0]
norm_slope = weight_vec[1]
# Generate regression line data in original scale
logarithmic_y =
normalizer.denormalize_regression(
regression_x,
norm_slope,
norm_intercept,
)
RegressionResult.new(
equation: LogarithmicEquation.new(normalizer, norm_slope, norm_intercept),
r_squared: r_squared,
x_values: regression_x,
y_values: logarithmic_y,
)
# Generate regression line using appropriate denormalization method
sig do
params(
normalizer: DataNormalizer,
regression_x: T::Array[Float],
weight_vec: T::Array[Float],
).returns(T::Array[Float])
end
def generate_regression_line(normalizer, regression_x, weight_vec)
normalizer.denormalize_regression(regression_x, weight_vec)
end
sig { returns(RegressionResult) }
def analyze_square_root
normalizer = SquareRootNormalizer.new(@records)
y_vector = normalizer.normalized_y_vector
regression_x = normalizer.regression_x_range
# Transform x values using square root for square root regression
# y = a * √x + b
sqrt_x_matrix = normalizer.transformed_x_matrix
poly_features = Rumale::Preprocessing::PolynomialFeatures.new(degree: 1)
regressor = Rumale::LinearModel::LinearRegression.new
pipeline =
Rumale::Pipeline::Pipeline.new(
steps: {
transformer: poly_features,
estimator: regressor,
},
# Create appropriate equation object based on type
sig do
params(
equation_class: T.class_of(Equation),
normalizer: DataNormalizer,
weight_vec: T::Array[Float],
).returns(Equation)
end
def create_equation(equation_class, normalizer, weight_vec)
if equation_class == PolynomialEquation
case normalizer
when LinearNormalizer
coefficients =
normalizer.denormalize_coefficients(
T.cast(weight_vec[0], Float),
T.cast(weight_vec[1], Float),
)
when QuadraticNormalizer
coefficients =
normalizer.denormalize_coefficients(
T.cast(weight_vec[0], Float),
T.cast(weight_vec[1], Float),
T.cast(weight_vec[2], Float),
)
else
raise "Unsupported normalizer for PolynomialEquation: #{normalizer.class}"
end
PolynomialEquation.new(normalizer, coefficients)
elsif equation_class == LogarithmicEquation ||
equation_class == SquareRootEquation
equation_class.new(
normalizer,
T.cast(weight_vec[1], Float),
T.cast(weight_vec[0], Float),
)
# Fit the regression on square root transformed x values
pipeline.fit(sqrt_x_matrix, y_vector)
r_squared = pipeline.score(sqrt_x_matrix, y_vector)
# Extract coefficients (same pattern as other regressions)
weight_vec = pipeline.steps[:estimator].weight_vec
norm_intercept = weight_vec[0]
norm_slope = weight_vec[1]
# Generate regression line data in original scale
square_root_y =
normalizer.denormalize_regression(
regression_x,
norm_slope,
norm_intercept,
)
RegressionResult.new(
equation: SquareRootEquation.new(normalizer, norm_slope, norm_intercept),
r_squared: r_squared,
x_values: regression_x,
y_values: square_root_y,
)
else
raise "Unsupported equation class: #{equation_class}"
end
end
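# Dispatch summary (editor's note), derived from the branches above:
#   PolynomialEquation + LinearNormalizer    -> denormalize_coefficients(w[0], w[1])
#   PolynomialEquation + QuadraticNormalizer -> denormalize_coefficients(w[0], w[1], w[2])
#   LogarithmicEquation / SquareRootEquation -> equation_class.new(normalizer, w[1], w[0])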
end