add fa factor calculator

This commit is contained in:
Dylan Knutson
2023-03-31 18:31:49 +09:00
parent a2c3262d1e
commit d19acdeacd
12 changed files with 144 additions and 13 deletions

View File

@@ -0,0 +1,13 @@
# Shared base for bulk jobs (e.g. Domain::Fa::FactorCalculator inherits from
# it). Provides a helper that times a unit of work and logs the duration.
class Domain::Fa::BulkJob
  include HasColorLogger

  # Runs the given block, logs how long it took, and returns the block's value.
  #
  # title - either a String used as the log prefix, or an object responding to
  #         `call`; a callable is invoked with the block's return value once
  #         the work is done, so the message can mention the result.
  #
  # Returns whatever the block returned. If the block raises, nothing is
  # logged and the exception propagates to the caller.
  def measure(title)
    # Use the monotonic clock rather than Time.now: wall-clock adjustments
    # (NTP sync, manual changes) must not skew or negate the measured duration.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    ret = yield
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    duration_ms = (1000 * duration).to_i
    title = title.call(ret) if title.respond_to?(:call)
    logger.info "#{title} - #{duration_ms.to_s.bold} ms"
    ret
  end
end

View File

@@ -0,0 +1,46 @@
# Fits a Disco recommender on the follow graph and stores the resulting
# per-item factor vectors in Domain::Fa::UserFactor.
#
# Each follow row is fed to the recommender as (user_id: follower,
# item_id: followed), so the "item" factors written by #write_factors belong
# to the users being followed.
class Domain::Fa::FactorCalculator < Domain::Fa::BulkJob
  # Number of factor rows sent to the database per upsert statement.
  WRITE_BATCH_SIZE = 10_000

  # Builds a recommender whose factor count equals the dimension count
  # declared for the `follows` neighbor attribute on Domain::Fa::UserFactor.
  def initialize
    @recommender = Disco::Recommender.new(
      factors: Domain::Fa::UserFactor.neighbor_attributes[:follows][:dimensions],
    )
  end

  # Loads every follow row, fits the recommender on them, then optimizes item
  # recommendations. Each phase is timed and logged through #measure.
  def fit
    logger.info "loading follow rows..."
    dataset = measure(proc { |r| "loaded #{r.length.to_s.bold} follows" }) do
      Domain::Fa::Follow.pluck(:follower_id, :followed_id).map do |id1, id2|
        { user_id: id1, item_id: id2 }
      end
    end

    measure("fit #{dataset.length.to_s.bold} follows") do
      @recommender.fit(dataset)
    end

    measure("optimize recs") do
      @recommender.optimize_item_recs
    end
  end

  # Upserts one UserFactor row per recommender item id, in batches of
  # WRITE_BATCH_SIZE. Rows are matched on user_id and only the `follows`
  # column is updated on conflict.
  def write_factors
    total = 0
    item_ids = @recommender.item_ids
    logger.info "writing #{item_ids.length.to_s.bold} factor models"

    # Slice the ids first and build row hashes one batch at a time, so the
    # factor rows for every item are never all held in memory at once.
    item_ids.each_slice(WRITE_BATCH_SIZE) do |chunk_ids|
      chunk = chunk_ids.map do |item_id|
        {
          user_id: item_id,
          follows: @recommender.item_factors(item_id),
        }
      end

      total += chunk.size
      measure("wrote chunk of #{chunk.size.to_s.bold} - (#{total.to_s.bold} total)") do
        Domain::Fa::UserFactor.upsert_all(
          chunk,
          unique_by: :user_id,
          update_only: %i[follows],
          returning: %i[id],
        )
      end
    end
  end
end

View File

@@ -1,4 +1,4 @@
class FaPostEnqueuer
class Domain::Fa::PostEnqueuer
include HasColorLogger
def initialize(start_at:, low_water_mark:, high_water_mark:)

View File

@@ -1,4 +1,4 @@
class FaUserEnqueuer
class Domain::Fa::UserEnqueuer
include HasColorLogger
def initialize(start_at:, low_water_mark:, high_water_mark:)

View File

@@ -75,6 +75,28 @@ class Domain::Fa::User < ReduxApplicationRecord
user
end
# users similar to this one by the set of users that follow this
# Builds a Domain::Fa::User relation from the neighbor query returned by
# similar_users_disco_by_followed, with each row also selecting that query's
# distance expression.
#
# exclude_already_followed - optional; forwarded unchanged to
#                            similar_users_disco_by_followed.
def similar_users_by_followed(exclude_already_followed = nil)
  neighbors = similar_users_disco_by_followed(exclude_already_followed)

  # the last select value of the neighbor query is the 'neighbor_distance'
  # field, already computed by disco; carry it over onto the user rows
  distance_select = neighbors.select_values.last

  Domain::Fa::User
    .select("domain_fa_users.*", distance_select)
    .joins(:disco)
    .merge(neighbors.reselect(:user_id))
end
# Nearest-neighbor query over this user's `disco` record, comparing the
# `follows` vectors by euclidean distance.
#
# exclude_already_followed - optional object responding to `follows`
#                            (presumably a Domain::Fa::User — confirm with
#                            callers); when given, rows whose user_id is among
#                            its followed ids are filtered out.
def similar_users_disco_by_followed(exclude_already_followed = nil)
  neighbors = disco.nearest_neighbors(:follows, distance: "euclidean")
  return neighbors unless exclude_already_followed

  already_followed_ids = exclude_already_followed.follows.select(:followed_id)
  neighbors.where.not(user_id: already_followed_ids)
end
def self.find_or_build_from_submission_parser(submission_parser)
unless submission_parser.is_a?(Domain::Fa::Parser::ListedSubmissionParserHelper) ||
submission_parser.is_a?(Domain::Fa::Parser::SubmissionParserHelper)

View File

@@ -2,5 +2,5 @@ class Domain::Fa::UserFactor < ReduxApplicationRecord
self.table_name = "domain_fa_user_factors"
belongs_to :user, class_name: "::Domain::Fa::User"
has_neighbors :factors, dimensions: 32
has_neighbors :follows, dimensions: 32
end

View File

@@ -1,11 +1,12 @@
# Creates the domain_fa_user_factors table, adds an ivfflat index on its
# `follows` column, and adds a foreign key from user_id to domain_fa_users.id.
# NOTE(review): this listing comes from a diff view, so two `t.references :user`
# lines and both a `factors` and a `follows` vector column appear side by side
# (presumably the replaced and the replacement definitions) — confirm which
# ones the real migration file contains.
class CreateDomainFaUserFactorsTable < ActiveRecord::Migration[7.0]
def change
create_table :domain_fa_user_factors do |t|
t.references :user
t.vector :factors, limit: 20
t.references :user, index: { unique: true }, null: false
t.vector :follows, limit: 32
t.timestamps
end
# presumably vector_l2_ops indexes for L2 (euclidean) distance, matching the
# "euclidean" nearest-neighbor queries — confirm against the pgvector docs
add_index :domain_fa_user_factors, :follows, using: :ivfflat, opclass: :vector_l2_ops
add_foreign_key :domain_fa_user_factors, :domain_fa_users, column: :user_id, primary_key: :id, validate: true
end
end

7
db/schema.rb generated
View File

@@ -138,11 +138,12 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_31_023807) do
end
create_table "domain_fa_user_factors", force: :cascade do |t|
t.bigint "user_id"
t.vector "factors", limit: 20
t.bigint "user_id", null: false
t.vector "follows", limit: 32
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["user_id"], name: "index_domain_fa_user_factors_on_user_id"
t.index ["follows"], name: "index_domain_fa_user_factors_on_follows", using: :ivfflat
t.index ["user_id"], name: "index_domain_fa_user_factors_on_user_id", unique: true
end
create_table "domain_fa_users", force: :cascade do |t|

View File

@@ -6,7 +6,7 @@ namespace :fa do
high_water_mark = 300
poll_duration = 10
enqueuer = FaPostEnqueuer.new(
enqueuer = Domain::Fa::PostEnqueuer.new(
start_at: start_at,
low_water_mark: low_water_mark,
high_water_mark: high_water_mark,
@@ -24,7 +24,7 @@ namespace :fa do
high_water_mark = 100
poll_duration = 10
enqueuer = FaUserEnqueuer.new(
enqueuer = Domain::Fa::UserEnqueuer.new(
start_at: start_at,
low_water_mark: low_water_mark,
high_water_mark: high_water_mark,
@@ -35,6 +35,13 @@ namespace :fa do
end
end
desc "calculate user follow factors"
task calculate_follow_factors: %i[set_logger_stdout environment] do
  # fit the recommender on the follow rows, then persist the factors it produced
  calculator = Domain::Fa::FactorCalculator.new
  calculator.fit
  calculator.write_factors
end
desc "Import existing FA posts"
task :import_existing, [:start_at] => [:environment] do |t, args|
batch_size = args[:batch_size]&.to_i || ENV["batch_size"]&.to_i

View File

@@ -0,0 +1,41 @@
require "rails_helper"
describe Domain::Fa::FactorCalculator do
  # Makes `follower` follow every user in `users`.
  def follow_all(follower, users)
    users.each do |followed|
      Domain::Fa::Follow.create!(follower: follower, followed: followed)
    end
  end

  before do
    # two groups of ten users that will be followed
    @cluster1 = Array.new(10) do |idx|
      SpecUtil.create_domain_fa_user(name: "cluster-1-#{idx}")
    end
    @cluster2 = Array.new(10) do |idx|
      SpecUtil.create_domain_fa_user(name: "cluster-2-#{idx}")
    end

    # three followers: one per cluster, plus one straddling both
    @follower1, @follower2, @follower3 = (1..3).map do |number|
      SpecUtil.create_domain_fa_user(name: "follower-#{number}")
    end

    follow_all(@follower1, @cluster1)
    follow_all(@follower2, @cluster2)
    follow_all(@follower3, @cluster1.first(5) + @cluster2.first(5))
  end

  it "works" do
    calculator = Domain::Fa::FactorCalculator.new
    calculator.fit
    calculator.write_factors

    seed_user = @cluster1.first

    # similar users should be queryable once the factors are written
    nearest = seed_user.similar_users_by_followed.limit(5)
    expect(@cluster1).to include(nearest[0])
    expect(nearest).not_to include(seed_user)

    nearest.each do |similar_user|
      expect(similar_user.neighbor_distance).to be >= 0.0
    end
  end
end

View File

@@ -1,4 +1,4 @@
describe FaPostEnqueuer do
describe Domain::Fa::PostEnqueuer do
it "works" do
ActiveJob::Base.queue_adapter = :test
creator = SpecUtil.build_domain_fa_user
@@ -9,7 +9,7 @@ describe FaPostEnqueuer do
end
end.map(&:fa_id)
enqueuer = FaPostEnqueuer.new(
enqueuer = Domain::Fa::PostEnqueuer.new(
start_at: 0,
high_water_mark: 5,
low_water_mark: 3,

View File

@@ -40,7 +40,7 @@ describe Domain::Fa::UserFactor do
# calculate the recommender
recommender = Disco::Recommender.new(
factors: Domain::Fa::UserFactor.neighbor_attributes[:factors][:dimensions],
factors: Domain::Fa::UserFactor.neighbor_attributes[:follows][:dimensions],
)
query = Enumerator.new do |e|
Domain::Fa::Follow.all.find_each do |follow|