From 4f7217abf045445f0c52013fd70fdc6b3b0d0cc5 Mon Sep 17 00:00:00 2001 From: Dylan Knutson Date: Wed, 10 Sep 2025 15:22:03 +0000 Subject: [PATCH] multithreaded post file fingerprint creation --- .../create_post_file_fingerprints_task.rb | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/app/lib/tasks/create_post_file_fingerprints_task.rb b/app/lib/tasks/create_post_file_fingerprints_task.rb index 09ac272a..6be8affe 100644 --- a/app/lib/tasks/create_post_file_fingerprints_task.rb +++ b/app/lib/tasks/create_post_file_fingerprints_task.rb @@ -48,26 +48,46 @@ module Tasks def run_post_file_descending(start_at) last_post_file_id = get_progress(start_at)&.to_i - query = Domain::PostFile.where(state: "ok").includes(:blob, :thumbnails) + query = Domain::PostFile.where(state: "ok") query = query.where(id: ..last_post_file_id) if last_post_file_id log("counting post files to process...") # total = 49_783_962 # cache this value total = query.count pb = create_progress_bar(total) + batch_size = 16 + num_threads = 6 + mutex = Mutex.new - query.find_each( + query.find_in_batches( order: :desc, - batch_size: 32, + batch_size: batch_size * num_threads, start: last_post_file_id, - ) do |post_file| + ) do |post_files| + break if interrupted? + last_post_file = T.must(post_files.last) + + post_files + .each_slice(post_files.size / num_threads) + .map + .with_index do |batch, index| + Thread.new do + batch.each do |post_file| + break if interrupted? + migrate_post_file(post_file) + ensure + mutex.synchronize do + pb.progress = [pb.progress + 1, pb.total].min + end + end + end + end + .map(&:join) + break if interrupted? - migrate_post_file(post_file) - pb.progress = [pb.progress + 1, pb.total].min - - if pb.progress % 100 == 0 - post = post_file.post + if pb.progress % 128 == 0 + post = last_post_file.post creator_str = if post&.class&.has_creators? T.unsafe(post).creator&.to_param || "(none)" @@ -75,15 +95,15 @@ module Tasks "(none)" end post_desc = - "#{creator_str&.rjust(20)} / #{post_file.post&.to_param}".ljust(40) + "#{creator_str&.rjust(20)} / #{last_post_file.post&.to_param}".ljust( + 40, + ) log( - "post_file = #{post_file.id} :: #{post_desc} - #{post_file.post&.title_for_view}", + "post_file = #{last_post_file.id} :: #{post_desc} - #{last_post_file.post&.title_for_view}", ) - last_post_file_id = T.must(post_file.id) + last_post_file_id = T.must(last_post_file.id) save_progress(last_post_file_id.to_s) end - - break if interrupted? end save_progress(last_post_file_id.to_s) if last_post_file_id