multithreaded post file fingerprint creation
This commit is contained in:
@@ -48,26 +48,46 @@ module Tasks
|
|||||||
def run_post_file_descending(start_at)
|
def run_post_file_descending(start_at)
|
||||||
last_post_file_id = get_progress(start_at)&.to_i
|
last_post_file_id = get_progress(start_at)&.to_i
|
||||||
|
|
||||||
query = Domain::PostFile.where(state: "ok").includes(:blob, :thumbnails)
|
query = Domain::PostFile.where(state: "ok")
|
||||||
query = query.where(id: ..last_post_file_id) if last_post_file_id
|
query = query.where(id: ..last_post_file_id) if last_post_file_id
|
||||||
|
|
||||||
log("counting post files to process...")
|
log("counting post files to process...")
|
||||||
# total = 49_783_962 # cache this value
|
# total = 49_783_962 # cache this value
|
||||||
total = query.count
|
total = query.count
|
||||||
pb = create_progress_bar(total)
|
pb = create_progress_bar(total)
|
||||||
|
batch_size = 16
|
||||||
|
num_threads = 6
|
||||||
|
mutex = Mutex.new
|
||||||
|
|
||||||
query.find_each(
|
query.find_in_batches(
|
||||||
order: :desc,
|
order: :desc,
|
||||||
batch_size: 32,
|
batch_size: batch_size * num_threads,
|
||||||
start: last_post_file_id,
|
start: last_post_file_id,
|
||||||
) do |post_file|
|
) do |post_files|
|
||||||
|
break if interrupted?
|
||||||
|
last_post_file = T.must(post_files.last)
|
||||||
|
|
||||||
|
post_files
|
||||||
|
.each_slice(post_files.size / num_threads)
|
||||||
|
.map
|
||||||
|
.with_index do |batch, index|
|
||||||
|
Thread.new do
|
||||||
|
batch.each do |post_file|
|
||||||
|
break if interrupted?
|
||||||
|
migrate_post_file(post_file)
|
||||||
|
ensure
|
||||||
|
mutex.synchronize do
|
||||||
|
pb.progress = [pb.progress + 1, pb.total].min
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
.map(&:join)
|
||||||
|
|
||||||
break if interrupted?
|
break if interrupted?
|
||||||
|
|
||||||
migrate_post_file(post_file)
|
if pb.progress % 128 == 0
|
||||||
pb.progress = [pb.progress + 1, pb.total].min
|
post = last_post_file.post
|
||||||
|
|
||||||
if pb.progress % 100 == 0
|
|
||||||
post = post_file.post
|
|
||||||
creator_str =
|
creator_str =
|
||||||
if post&.class&.has_creators?
|
if post&.class&.has_creators?
|
||||||
T.unsafe(post).creator&.to_param || "(none)"
|
T.unsafe(post).creator&.to_param || "(none)"
|
||||||
@@ -75,15 +95,15 @@ module Tasks
|
|||||||
"(none)"
|
"(none)"
|
||||||
end
|
end
|
||||||
post_desc =
|
post_desc =
|
||||||
"#{creator_str&.rjust(20)} / #{post_file.post&.to_param}".ljust(40)
|
"#{creator_str&.rjust(20)} / #{last_post_file.post&.to_param}".ljust(
|
||||||
|
40,
|
||||||
|
)
|
||||||
log(
|
log(
|
||||||
"post_file = #{post_file.id} :: #{post_desc} - #{post_file.post&.title_for_view}",
|
"post_file = #{last_post_file.id} :: #{post_desc} - #{last_post_file.post&.title_for_view}",
|
||||||
)
|
)
|
||||||
last_post_file_id = T.must(post_file.id)
|
last_post_file_id = T.must(last_post_file.id)
|
||||||
save_progress(last_post_file_id.to_s)
|
save_progress(last_post_file_id.to_s)
|
||||||
end
|
end
|
||||||
|
|
||||||
break if interrupted?
|
|
||||||
end
|
end
|
||||||
|
|
||||||
save_progress(last_post_file_id.to_s) if last_post_file_id
|
save_progress(last_post_file_id.to_s) if last_post_file_id
|
||||||
|
|||||||
Reference in New Issue
Block a user