2 Commits

Author SHA1 Message Date
Dylan Knutson
3174032ac3 rake job fixes, optimize fa favs backfill 2025-09-10 16:30:51 +00:00
Dylan Knutson
4f7217abf0 multithreaded post file fingerprint creation 2025-09-10 15:22:03 +00:00
10 changed files with 238 additions and 176 deletions

View File

@@ -25,7 +25,8 @@
"ms-azuretools.vscode-docker",
"1YiB.rust-bundle",
"rust-lang.rust-analyzer",
"saoudrizwan.claude-dev"
"saoudrizwan.claude-dev",
"ritwickdey.LiveServer"
]
}
},

View File

@@ -389,3 +389,6 @@ task compute_null_user_counter_caches: :environment do
end
end
end
# When invoked through the `rake` executable, rename the process after the
# first command-line argument so it is identifiable in `ps`/`top` output.
#
# Guarded on ARGV.first being present: a bare `rake` invocation has no
# arguments, and Process.setproctitle(nil) raises TypeError. The message is
# only printed when the title is actually changed.
if File.basename($0) == "rake" && (proc_title = ARGV.first)
  puts "set proc title to #{proc_title}"
  Process.setproctitle(proc_title)
end

View File

@@ -239,26 +239,36 @@ class Domain::Fa::Job::FavsJob < Domain::Fa::Job::Base
user.update_fav_model(post_id:, fav_id:, explicit_time:)
end
(page_parser.submissions_parsed[1..] || [])
.filter_map do |sub_data|
post_id = (id = sub_data.id) && fa_id_to_post_id[id]
next if post_id.nil?
fav_id = sub_data.fav_id
next if fav_id.nil?
user_post_favs_with_fav_id =
(page_parser.submissions_parsed[1..] || [])
.filter_map do |sub_data|
post_id = (id = sub_data.id) && fa_id_to_post_id[id]
next if post_id.nil?
fav_id = sub_data.fav_id
next if fav_id.nil?
FavUpsertData
.new(post_id:, fav_id:)
.tap do
num_updated_with_fav_fa_id += 1
num_updated_total += 1
end
end
.group_by(&:post_id)
.values
.filter_map { |data_arr| data_arr.max_by(&:fav_id) }
.each do |data|
user.update_fav_model(post_id: data.post_id, fav_id: data.fav_id)
end
FavUpsertData
.new(post_id:, fav_id:)
.tap do
num_updated_with_fav_fa_id += 1
num_updated_total += 1
end
end
.group_by(&:post_id)
.values
.filter_map { |data_arr| data_arr.max_by(&:fav_id) }
.map do |data|
{
user_id: T.must(user.id),
post_id: data.post_id,
fa_fav_id: data.fav_id,
}
end
Domain::UserPostFav::FaUserPostFav.upsert_all(
user_post_favs_with_fav_id,
unique_by: %i[user_id post_id],
)
FavsAndDatesStats.new(
num_updated_with_fav_fa_id:,

View File

@@ -32,8 +32,8 @@ class EnqueueJobBase < Tasks::InterruptableTask
10.seconds
end
sig { void }
def run
sig { override.void }
def run_impl
@inferred_queue_size = queue_size
logger.info(
"initial queue size is #{@inferred_queue_size}, starting enqueuing",

View File

@@ -24,21 +24,30 @@ module Tasks
mode: Mode,
user_param: T.nilable(String),
start_at: T.nilable(String),
log_sink: T.any(IO, StringIO),
).void
end
def run(mode:, user_param: nil, start_at: nil)
case mode
def initialize(mode:, user_param: nil, start_at: nil, log_sink: $stderr)
super(log_sink:)
@mode = T.let(mode, Mode)
@user_param = T.let(user_param, T.nilable(String))
@start_at = T.let(start_at, T.nilable(String))
end
sig { override.void }
def run_impl
case @mode
when Mode::PostFileDescending
run_post_file_descending(start_at)
run_post_file_descending(@start_at)
when Mode::PostsDescending
run_posts_descending
when Mode::User
raise "need 'user_param' when mode is Mode::User" unless user_param
run_single_user(user_param)
raise "need 'user_param' when mode is Mode::User" unless @user_param
run_single_user(@user_param)
when Mode::UsersDescending
run_users_descending
else
T.absurd(mode)
T.absurd(@mode)
end
end
@@ -48,26 +57,46 @@ module Tasks
def run_post_file_descending(start_at)
last_post_file_id = get_progress(start_at)&.to_i
query = Domain::PostFile.where(state: "ok").includes(:blob, :thumbnails)
query = Domain::PostFile.where(state: "ok")
query = query.where(id: ..last_post_file_id) if last_post_file_id
log("counting post files to process...")
# total = 49_783_962 # cache this value
total = query.count
pb = create_progress_bar(total)
batch_size = 16
num_threads = 6
mutex = Mutex.new
query.find_each(
query.find_in_batches(
order: :desc,
batch_size: 32,
batch_size: batch_size * num_threads,
start: last_post_file_id,
) do |post_file|
) do |post_files|
break if interrupted?
last_post_file = T.must(post_files.last)
post_files
.each_slice([post_files.size / num_threads, 1].max)
.map
.with_index do |batch, index|
Thread.new do
batch.each do |post_file|
break if interrupted?
migrate_post_file(post_file)
ensure
mutex.synchronize do
pb.progress = [pb.progress + 1, pb.total].min
end
end
end
end
.map(&:join)
break if interrupted?
migrate_post_file(post_file)
pb.progress = [pb.progress + 1, pb.total].min
if pb.progress % 100 == 0
post = post_file.post
if pb.progress % 128 == 0
post = last_post_file.post
creator_str =
if post&.class&.has_creators?
T.unsafe(post).creator&.to_param || "(none)"
@@ -75,15 +104,15 @@ module Tasks
"(none)"
end
post_desc =
"#{creator_str&.rjust(20)} / #{post_file.post&.to_param}".ljust(40)
"#{creator_str&.rjust(20)} / #{last_post_file.post&.to_param}".ljust(
40,
)
log(
"post_file = #{post_file.id} :: #{post_desc} - #{post_file.post&.title_for_view}",
"post_file = #{last_post_file.id} :: #{post_desc} - #{last_post_file.post&.title_for_view}",
)
last_post_file_id = T.must(post_file.id)
last_post_file_id = T.must(last_post_file.id)
save_progress(last_post_file_id.to_s)
end
break if interrupted?
end
save_progress(last_post_file_id.to_s) if last_post_file_id

View File

@@ -88,8 +88,8 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
end
end
sig { void }
def run
sig { override.void }
def run_impl
total_stats = Stats.zero
query_string =
@@ -118,6 +118,8 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
pb = create_progress_bar(nil)
end
start_profiling!
query
.includes(:response)
.in_batches(start: @start_at, of: @batch_size) do |batch|

View File

@@ -7,6 +7,11 @@ class Tasks::Fa::MigrateFaUserPostFavs < Tasks::InterruptableTask
"fa-migrate-fa-user-post-favs"
end
# Override of the task's `run_impl` hook. This task has no whole-run
# implementation: calling it always raises RuntimeError("not implemented").
# NOTE(review): presumably callers drive this task through #run_for_user
# instead - confirm.
sig { override.void }
def run_impl
  raise RuntimeError, "not implemented"
end
sig { params(user: Domain::User::FaUser, batch_size: Integer).void }
def run_for_user(user:, batch_size: 100)
user_faved_post_ids = user.faved_posts.pluck(:id)

View File

@@ -65,5 +65,44 @@ module Tasks
format: PB_FORMAT,
)
end
# Entry point for the task: delegates the actual work to #run_impl and
# always calls #end_profiling! afterwards, whether #run_impl returns
# normally or raises.
sig { void }
def run
  run_impl
ensure
  end_profiling!
end
# Abstract hook holding the task's actual work; invoked by #run.
# Declared abstract in the sig, so the body is intentionally empty.
sig { abstract.void }
def run_impl; end
# Starts a RubyProf profiling session when the PROFILE environment variable
# is set (any value, since every String is truthy); otherwise does nothing.
# Announces the start on @log_sink before starting the profiler.
sig { void }
def start_profiling!
  if ENV["PROFILE"]
    @log_sink.puts("starting profiling")
    RubyProf.start
  end
end
# Stops the RubyProf session and writes its reports to
# "profiler/<progress_key>/". Does nothing unless the PROFILE environment
# variable is set and the profiler is currently running.
#
# Three reports are written, each limited to entries with min_percent: 1:
#   profile.txt      - RubyProf::GraphPrinter
#   profile.html     - RubyProf::CallStackPrinter
#   profile.rubyprof - RubyProf::SpeedscopePrinter
sig { void }
def end_profiling!
  return unless ENV["PROFILE"] && RubyProf.running?

  out_dir = "profiler/#{progress_key}"
  FileUtils.mkdir_p(out_dir) unless File.exist?(out_dir)
  profile = RubyProf.stop

  # Each file is opened before its printer constant is looked up, so the
  # reports are produced one at a time in this order.
  [
    %w[profile.txt GraphPrinter],
    %w[profile.html CallStackPrinter],
    %w[profile.rubyprof SpeedscopePrinter],
  ].each do |filename, printer_name|
    File.open("#{out_dir}/#{filename}", "w") do |io|
      RubyProf.const_get(printer_name).new(profile).print(
        io,
        { min_percent: 1 },
      )
    end
  end

  @log_sink.puts "profiling results saved to #{out_dir}"
end
end
end

View File

@@ -126,14 +126,19 @@ class Domain::User::FaUser < Domain::User
post_id: Integer,
fav_id: T.nilable(Integer),
explicit_time: T.nilable(Time),
).returns(Domain::UserPostFav)
).void
end
def update_fav_model(post_id:, fav_id: nil, explicit_time: nil)
model = self.user_post_favs.find_or_initialize_by(post_id:)
model.fa_fav_id = fav_id if fav_id.present?
model.explicit_time = explicit_time.in_time_zone if explicit_time.present?
model.save!
model
attrs = { user_id: self.id, post_id: }
attrs[:fa_fav_id] = fav_id if fav_id.present?
attrs[:explicit_time] = explicit_time.in_time_zone if explicit_time.present?
Domain::UserPostFav::FaUserPostFav.upsert(
attrs,
unique_by: %i[user_id post_id],
)
self.user_post_favs.reset
end
# TODO - write a test for this

View File

@@ -2,8 +2,19 @@
require "rails_helper"
RSpec.describe Tasks::CreatePostFileFingerprintsTask do
let(:mode) { raise("mode is required") }
let(:user_param) { nil }
let(:start_at) { nil }
let(:log_sink) { StringIO.new }
let(:task) { described_class.new(log_sink: log_sink) }
let(:task) do
described_class.new(
mode: mode,
user_param: user_param,
start_at: start_at,
log_sink: log_sink,
)
end
before do
# Mock external dependencies
@@ -28,17 +39,19 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
describe "#run" do
context "with user mode missing user_param" do
let(:mode) { Tasks::CreatePostFileFingerprintsTask::Mode::User }
it "raises an error when user mode is used without user_param" do
expect {
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
user_param: nil,
)
}.to raise_error("need 'user_param' when mode is Mode::User")
expect { task.run }.to raise_error(
"need 'user_param' when mode is Mode::User",
)
end
end
context "with PostFileDescending mode" do
let(:mode) do
Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending
end
let!(:post_files) do
# Create test post files with descending IDs
5
@@ -57,9 +70,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
create(:domain_post_file, post: post, state: "ok")
end
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
)
task.run
# Should have saved progress at least once
saved_progress = GlobalState.get("task-create-post-file-fingerprints")
@@ -67,20 +78,16 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
expect(saved_progress.to_i).to be > 0
end
it "starts from specified start_at ID" do
# Create post files with specific IDs
target_post_file = post_files[2] # Third post file
start_at_id = target_post_file.id.to_s
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
start_at: start_at_id,
)
expect(log_sink.string).to include("resuming from:")
context "with start_at specified" do
let(:start_at) { post_files[2].id.to_s }
it "starts from specified start_at ID" do
task.run
expect(log_sink.string).to include("resuming from:")
end
end
context "with start_at='last' and existing progress" do
let(:start_at) { "last" }
before do
# Set up saved progress
target_post_file = post_files[2]
@@ -91,22 +98,15 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
end
it "resumes from saved progress" do
task.run(
mode:
Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
start_at: "last",
)
task.run
expect(log_sink.string).to include("resuming from:")
end
end
context "with start_at='last' and no existing progress" do
let(:start_at) { "last" }
it "starts from beginning when no saved progress found" do
task.run(
mode:
Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
start_at: "last",
)
task.run
expect(log_sink.string).to include("no saved progress")
end
@@ -114,6 +114,9 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
end
context "with PostsDescending mode" do
let(:mode) do
Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending
end
let!(:posts) do
3.times.map { |i| create(:domain_post_fa_post, fa_id: 400 + i) }
end
@@ -132,9 +135,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
:create_for_post_file!,
).exactly(3).times
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending,
)
task.run
end
it "uses correct progress bar total" do
@@ -147,9 +148,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
)
allow(ProgressBar).to receive(:create).and_return(progress_bar)
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending,
)
task.run
expect(ProgressBar).to have_received(:create).with(
total: 66_431_808,
@@ -161,6 +160,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
end
context "with User mode" do
let(:mode) { Tasks::CreatePostFileFingerprintsTask::Mode::User }
let!(:user) { create(:domain_user_fa_user, url_name: "testuser") }
let!(:posts) do
3.times.map do |i|
@@ -173,56 +173,54 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
posts.each { |post| create(:domain_post_file, post: post) }
end
it "processes posts for specified user" do
# Set up mocks before running the task
expect(Domain::PostFile::Thumbnail).to receive(
:create_for_post_file!,
).exactly(3).times
expect(Domain::PostFile::BitFingerprint).to receive(
:create_for_post_file!,
).exactly(3).times
context "with an existing user " do
let(:user_param) { user.to_param }
it "processes posts for specified user" do
# Set up mocks before running the task
expect(Domain::PostFile::Thumbnail).to receive(
:create_for_post_file!,
).exactly(3).times
expect(Domain::PostFile::BitFingerprint).to receive(
:create_for_post_file!,
).exactly(3).times
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
user_param: user.to_param,
)
task.run
expect(log_sink.string).to include(
"migrating posts for #{user.to_param}",
)
end
it "raises error when user is not found" do
expect {
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
user_param: "nonexistent_user",
expect(log_sink.string).to include(
"migrating posts for #{user.to_param}",
)
}.to raise_error(
ActionController::BadRequest,
"invalid id: \"nonexistent_user\"",
)
end
it "handles posts without creators" do
# Create a post without a creator
create(:domain_post_fa_post, creator: nil)
expect(Domain::PostFile::Thumbnail).to receive(
:create_for_post_file!,
).exactly(3).times
expect(Domain::PostFile::BitFingerprint).to receive(
:create_for_post_file!,
).exactly(3).times
task.run
end
end
it "handles posts without creators" do
# Create a post without a creator
create(:domain_post_fa_post, creator: nil)
expect(Domain::PostFile::Thumbnail).to receive(
:create_for_post_file!,
).exactly(3).times
expect(Domain::PostFile::BitFingerprint).to receive(
:create_for_post_file!,
).exactly(3).times
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
user_param: user.to_param,
)
context "with a nonexistent user" do
let(:user_param) { "nonexistent_user" }
it "raises error when user is not found" do
expect { task.run }.to raise_error(
ActionController::BadRequest,
"invalid id: \"nonexistent_user\"",
)
end
end
end
context "with UsersDescending mode" do
let(:mode) do
Tasks::CreatePostFileFingerprintsTask::Mode::UsersDescending
end
let!(:users) do
3.times.map { |i| create(:domain_user_fa_user, url_name: "user#{i}") }
end
@@ -244,9 +242,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
:create_for_post_file!,
).exactly(3).times.and_return([])
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::UsersDescending,
)
task.run
users.each do |user|
expect(log_sink.string).to include(
@@ -264,9 +260,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
:create_for_post_file!,
).exactly(3).times
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::UsersDescending,
)
task.run
# Check that the migrated file was created and contains user params
if File.exist?("migrated_files.txt")
@@ -285,6 +279,10 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
end
describe "#save_progress" do
let(:mode) do
Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending
end
it "saves progress to GlobalState" do
task.send(:save_progress, 12_345.to_s)
@@ -295,6 +293,9 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
describe "error handling" do
let!(:post_file) { create(:domain_post_file, state: "ok") }
let(:mode) do
Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending
end
it "handles errors in migrate_post_file gracefully" do
# Mock the job to raise an error
@@ -302,9 +303,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
:create_for_post_file!,
).and_raise(StandardError.new("Test error"))
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
)
task.run
expect(log_sink.string).to include("error: Test error")
end
@@ -318,15 +317,17 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
:create_for_post_file!,
).and_raise(StandardError.new("Test error"))
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending,
)
task.run
expect(log_sink.string).to include("error: Test error")
end
end
describe "utility methods" do
let(:mode) do
Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending
end
describe "#create_progress_bar" do
it "creates a progress bar with the correct format" do
progress_bar = instance_double(ProgressBar::Base)
@@ -354,6 +355,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
end
describe "interrupt functionality" do
let(:mode) { Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending }
let!(:posts) do
3.times.map { |i| create(:domain_post_fa_post, fa_id: 400 + i) }
end
@@ -383,9 +385,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
:create_for_post_file!,
).at_most(1).times
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending,
)
task.run
end
it "stops processing when interrupted in post_file_descending mode" do
@@ -409,39 +409,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
:create_for_post_file!,
).at_most(1).times
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
)
end
it "saves progress before interruption in post_file_descending mode" do
# Create enough post files to trigger progress saving
105.times do |i|
post = create(:domain_post_fa_post, fa_id: 600 + i)
create(:domain_post_file, post: post, state: "ok")
end
# Mock the interrupt to happen after progress saving
call_count = 0
allow(task).to receive(:interrupted?) do
call_count += 1
call_count > 100 # Let it save progress first
end
expect(Domain::PostFile::Thumbnail).to receive(
:create_for_post_file!,
).at_least(1).times
expect(Domain::PostFile::BitFingerprint).to receive(
:create_for_post_file!,
).at_least(1).times
task.run(
mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
)
# Should have saved progress before interruption
saved_progress = GlobalState.get("task-create-post-file-fingerprints")
expect(saved_progress).to be_present
task.run
end
end
end