better job dedup key computation

Dylan Knutson
2025-06-26 17:59:32 +00:00
parent e3b2463cbe
commit 3a06181db8
9 changed files with 255 additions and 50 deletions

View File

@@ -293,6 +293,8 @@ task run_fa_user_avatar_jobs: :environment do
 end
 
 task create_post_file_fingerprints: %i[environment set_logger_stdout] do
+  PB_FORMAT = "%B %c/%C (%r/sec) %J%% %a %E"
+
   def migrate_posts_for_user(user)
     puts "migrating posts for #{user.to_param}"
     posts = user.posts.includes(files: %i[blob thumbnails bit_fingerprints])
@@ -302,7 +304,7 @@ task create_post_file_fingerprints: %i[environment set_logger_stdout] do
       total: posts.count,
       progress_mark: " ",
       remainder_mark: " ",
-      format: "%B %c/%C (%r/sec) %J%% %a %E",
+      format: PB_FORMAT,
     )
 
   posts.find_in_batches(batch_size: 64) do |batch|
@@ -341,7 +343,7 @@ task create_post_file_fingerprints: %i[environment set_logger_stdout] do
       total:,
       progress_mark: " ",
       remainder_mark: " ",
-      format: "%B %c/%C (%r/sec) %J%% %a %E",
+      format: PB_FORMAT,
     )
   i = 0
   Domain::PostFile
@@ -354,7 +356,11 @@ task create_post_file_fingerprints: %i[environment set_logger_stdout] do
     ) do |post_file|
       i += 1
       if i % 100 == 0
-        puts "migrating #{post_file.id} / #{post_file.post.title_for_view}"
+        post_desc =
+          "#{post_file.post.creator&.to_param&.rjust(20)} / #{post_file.post&.to_param}".ljust(
+            40,
+          )
+        puts "post_file = #{post_file.id} :: #{post_desc} - #{post_file.post.title_for_view}"
       end
       migrate_post_file(post_file)
       pb.progress = [pb.progress + 1, pb.total].min
@@ -367,7 +373,7 @@ task create_post_file_fingerprints: %i[environment set_logger_stdout] do
       total:,
       progress_mark: " ",
       remainder_mark: " ",
-      format: "%B %c/%C (%r/sec) %J%% %a %E",
+      format: PB_FORMAT,
     )
   Domain::Post.find_each(order: :desc) do |post|
     migrate_post(post) unless post.is_a?(Domain::Post::InkbunnyPost)

View File

@@ -121,7 +121,11 @@ class Domain::Fa::Job::ScanPostJob < Domain::Fa::Job::Base
       if file.url_str_changed?
         file.enqueue_job_after_save(
           Domain::Fa::Job::ScanFileJob,
-          { file:, caused_by_entry: causing_log_entry },
+          {
+            file:,
+            caused_by_entry: causing_log_entry,
+            caused_by_job_id: self.job_id,
+          },
         )
       end
       file.save!

View File

@@ -133,7 +133,8 @@ class Scraper::JobBase < ApplicationJob
     sig_arguments =
       arguments[0]
         .reject { |key, value| ignore_signature_args.include?(key.to_sym) }
-        .sort_by { |key, value| key.to_sym }
+        .map { |key, value| [key.to_sym, value.to_param] }
+        .sort_by { |key, value| key }
 
     sig = []
     sig << self.class.name
@@ -141,6 +142,7 @@ class Scraper::JobBase < ApplicationJob
     sig << self.priority || "*"
     sig << Digest::SHA256.hexdigest(sig_arguments.inspect)[0...16]
     sig = sig.join("|")
+    sig
   end,
 )
@@ -162,6 +164,14 @@ class Scraper::JobBase < ApplicationJob
   PERMITTED_CONTENT_TYPES =
     T.let([%r{text/html}, %r{application/json}], T::Array[Regexp])
 
+  thread_mattr_accessor :current_scraper_job
+
+  around_perform do |job, block|
+    Scraper::JobBase.current_scraper_job = job
+    block.call
+  ensure
+    Scraper::JobBase.current_scraper_job = nil
+  end
+
   # Delay a little bit on Net::ReadTimeout or Errno::ECONNREFUSED
   around_perform do |job, block|
     block.call
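
These two hunks are the core of the commit. The first normalizes every job argument through to_param before sorting and hashing, so the dedup (concurrency) key no longer depends on how a record argument happened to be loaded; the second tracks the currently performing job in a thread-local accessor so downstream enqueues can be attributed to it. A simplified, standalone sketch of the key computation (dedup_key and its parameters are illustrative, not the real API):

    require "digest"
    require "active_support/core_ext/object/to_param"

    # Records contribute their stable to_param (e.g. an id string) rather
    # than their full inspect output, so reloading the same row cannot
    # change the key; plain values fall back to Object#to_param (to_s).
    def dedup_key(job_class:, queue_name:, priority:, arguments:, ignored: [])
      sig_arguments =
        arguments
          .reject { |key, _value| ignored.include?(key.to_sym) }
          .map { |key, value| [key.to_sym, value.to_param] }
          .sort_by { |key, _value| key }

      [
        job_class,
        queue_name || "*",
        priority || "*",
        Digest::SHA256.hexdigest(sig_arguments.inspect)[0...16],
      ].join("|")
    end

Hashing the inspect output of live ActiveRecord objects was fragile: two in-memory copies of the same row can render differently (dirty attributes, preloaded associations), yielding distinct keys and therefore duplicate jobs.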

View File

@@ -51,12 +51,18 @@ class Domain::Fa::EnqueueUnscannedOkPosts < EnqueueJobBase
       end
       .to_h
 
     ReduxApplicationRecord.transaction do
-      posts.each do |post|
-        process_post(post, hle_map)
-        processed += 1
-        pb.progress = [processed, pb.total].min
-      end
+      posts.each do |post|
+        post_desc =
+          "#{(post.creator&.to_param || "(none)").rjust(20)} / #{post.to_param}".ljust(
+            40,
+          )
+        process_post(post, hle_map)
+        puts "migrate post :: #{post_desc}" if processed % 10 == 0
+      rescue StandardError
+        puts "error processing post :: #{post_desc}"
+      ensure
+        processed += 1
+        pb.progress = [processed, pb.total].min
+      end
     end
   end
@@ -78,16 +84,12 @@ class Domain::Fa::EnqueueUnscannedOkPosts < EnqueueJobBase
         raise("uri mismatch")
       end
 
-      puts "existing log entry for #{post_id_str(post)}"
       post.scanned_at = hle.requested_at
       post.last_submission_log_entry = hle
       post.posted_at = post.posted_at if post.attributes["posted_at"].nil?
       post.save!
     else
-      enqueue do
-        puts "enqueued #{post_id_str(post)}"
-        Domain::Fa::Job::ScanPostJob.perform_later({ post: })
-      end
+      enqueue { Domain::Fa::Job::ScanPostJob.perform_later({ post: }) }
     end
   end
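
The rewritten loop above relies on Ruby 2.6+ allowing rescue/ensure clauses directly inside a do...end block: each post gets its own error handling, and the progress counter advances in the ensure clause whether processing succeeded or failed. A minimal illustration of the construct:

    [1, 2, 3].each do |n|
      raise "boom" if n == 2
      puts "processed #{n}"
    rescue StandardError => e
      puts "failed #{n}: #{e.message}"
    ensure
      puts "counted #{n}" # runs on every iteration, success or failure
    end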

View File

@@ -7,7 +7,11 @@ class ReduxApplicationRecord < ActiveRecord::Base
   # hack to make sorbet recognize the `@after_save_deferred_jobs` instance variable
   sig { params(attributes: T.untyped).void }
   def initialize(attributes = nil)
-    @after_save_deferred_jobs = T.let(nil, T.nilable(T::Array[DeferredJob]))
+    @after_save_deferred_jobs =
+      T.let(
+        nil,
+        T.nilable(T::Array[[DeferredJob, T.nilable(Scraper::JobBase)]]),
+      )
     super(attributes)
   end
@@ -41,10 +45,7 @@ class ReduxApplicationRecord < ActiveRecord::Base
   sig { returns(T::Array[String]) }
   def attributes_for_inspect
-    super.reject do |attr_name|
-      attr_name.start_with?("json_attributes-") #||
-      # attr_name.ends_with?("_backup")
-    end
+    super.reject { |attr_name| attr_name.start_with?("json_attributes-") }
   end
 
   sig do
@@ -56,19 +57,34 @@
   end
   def enqueue_job_after_save(job_class, params, set_args = {})
     @after_save_deferred_jobs ||= []
-    @after_save_deferred_jobs << DeferredJob.new(job_class:, params:, set_args:)
+    @after_save_deferred_jobs << [
+      DeferredJob.new(job_class:, params:, set_args:),
+      Scraper::JobBase.current_scraper_job,
+    ]
   end
 
   after_save do
     T.bind(self, ReduxApplicationRecord)
-    @after_save_deferred_jobs ||= T.let([], T.nilable(T::Array[DeferredJob]))
+    @after_save_deferred_jobs ||=
+      T.let([], T.nilable(T::Array[[DeferredJob, T.nilable(Scraper::JobBase)]]))
 
     GoodJob::Bulk.enqueue do
-      @after_save_deferred_jobs.each do |deferred_job|
+      @after_save_deferred_jobs.each do |deferred_job, current_job|
+        job_params = deferred_job.params
+        if current_job
+          job_params.merge!(
+            caused_by_entry: current_job.causing_log_entry,
+            caused_by_job_id: current_job.job_id,
+          )
+        end
         job =
           deferred_job
             .job_class
             .set(deferred_job.set_args)
-            .perform_later(deferred_job.params)
+            .perform_later(job_params)
 
         if job
           logger.info(
             "[class: #{self.class.name}][id: #{id}][enqueued job: #{deferred_job.job_class.name}][job_id: #{job.job_id}]",

View File

@@ -166,22 +166,28 @@ namespace :fa do
   desc "Get 404 files from FurArchiver"
   task get_404_files_from_fur_archiver: :set_logger_stdout do
-    query =
-      Domain::PostFile
-        .joins(:post)
-        .for_post_type(Domain::Post::FaPost)
-        .where(state: "terminal_error", last_status_code: 404)
-        .where(
-          "((\"post\".\"json_attributes\"->>'tried_from_fur_archiver')::bool) IS NULL OR ((\"post\".\"json_attributes\"->>'tried_from_fur_archiver')::bool) != TRUE",
-        )
+    url_name = ENV["url_name"]
 
-    # query =
-    #   Domain::User
-    #     .find_by_param("fa@wolfsparta")
-    #     .posts
-    #     .flat_map do |post|
-    #       post.files.where(state: "terminal_error", last_status_code: 404)
-    #     end
+    if url_name
+      query =
+        Domain::User
+          .find_by_param("fa@#{url_name}")
+          .posts
+          .flat_map do |post|
+            post.files.where(state: "terminal_error", last_status_code: 404)
+          end
+      method = :each
+    else
+      query =
+        Domain::PostFile
+          .joins(:post)
+          .for_post_type(Domain::Post::FaPost)
+          .where(state: "terminal_error", last_status_code: 404)
+          .where(
+            "((\"post\".\"json_attributes\"->>'tried_from_fur_archiver')::bool) IS NULL OR ((\"post\".\"json_attributes\"->>'tried_from_fur_archiver')::bool) != TRUE",
+          )
+      method = :find_each
+    end
+
     puts "counting..."
     total = query.count
@@ -189,7 +195,7 @@
     pb = ProgressBar.create(total: total, format: "%t: %c/%C %B %p%% %a %e")
     counter = 0
-    query.find_each do |post_file|
+    query.send(method) do |post_file|
       next if post_file.url_str.include?("/stories/")
       Job::FaPostFurArchiverPostFileJob.perform_now({ post_file: })
       post = post_file.post
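
When url_name is set, query is a plain Array (the result of flat_map) and does not respond to find_each, so the task picks the iteration method to match and dispatches it with send. The pattern in isolation (process is a hypothetical callback):

    # Arrays only respond to :each; relations get batched :find_each.
    method = query.is_a?(Array) ? :each : :find_each
    query.send(method) { |post_file| process(post_file) }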

View File

@@ -147,9 +147,7 @@ describe Domain::Fa::Job::ScanPostJob do
       post = Domain::Post::FaPost.find_by(fa_id: 59_714_213)
       expect(SpecUtil.enqueued_job_args(Domain::Fa::Job::ScanFileJob)).to match(
-        array_including(
-          { file: post.file, caused_by_entry: @log_entries.first },
-        ),
+        [{ file: post.file, caused_by_entry: @log_entries.first }],
       )
     end
   end

View File

@@ -2,6 +2,46 @@
 require "rails_helper"
 
 describe HasBulkEnqueueJobs do
+  class TestJob < Scraper::JobBase
+    ignore_signature_args :test_arg_ignored
+    include HasColorLogger
+
+    def self.http_factory_method
+      :get_test_http_client
+    end
+
+    def perform(args)
+      raise("no arguments: #{args.inspect}") if args.empty?
+      if (post = args[:post])
+        post = args[:post]
+        description = args[:description]
+        post.description = description
+        post.save!
+      elsif args[:defer_enqueue_job]
+        defer_job(TestJob2, { job_arg: args[:defer_enqueue_job] })
+      elsif args[:defer_after_save]
+        new_post = Domain::Post::FaPost.new(fa_id: 12_345)
+        new_post.enqueue_job_after_save(
+          TestJob2,
+          { job_arg: args[:defer_after_save] },
+        )
+        new_post.save!
+      end
+    end
+  end
+
+  class TestJob2 < Scraper::JobBase
+    include HasColorLogger
+
+    def self.http_factory_method
+      :get_test_http_client
+    end
+
+    def perform(args)
+    end
+  end
+
   let(:test_class) do
     Class.new do
       include HasBulkEnqueueJobs
@@ -40,6 +80,95 @@
     expect(SpecUtil.enqueued_jobs.length).to eq(1)
   end
 
+  it "does not enqueue duplicate jobs" do
+    post_1_first_load = create(:domain_post_fa_post)
+    caused_by_entry = create(:http_log_entry)
+    job_id =
+      TestJob.perform_later(
+        { post: post_1_first_load, caused_by_entry: },
+      ).job_id
+    expect(SpecUtil.enqueued_job_args(include_job_id: true)).to eq(
+      [{ post: post_1_first_load, caused_by_entry:, job_id: }],
+    )
+
+    # should not enqueue another job with same post and caused_by_entry
+    post_1_second_load = Domain::Post::FaPost.find(post_1_first_load.id)
+    TestJob.perform_later({ post: post_1_second_load, caused_by_entry: })
+    expect(SpecUtil.enqueued_job_args(include_job_id: true)).to eq(
+      [{ post: post_1_first_load, caused_by_entry:, job_id: }],
+    )
+
+    # lack of caused_by_entry should not change the concurrency key
+    TestJob.perform_later({ post: post_1_first_load })
+    expect(SpecUtil.enqueued_job_args(include_job_id: true)).to eq(
+      [{ post: post_1_first_load, caused_by_entry:, job_id: }],
+    )
+
+    # different post should cause a new job to be enqueued
+    post_2 = create(:domain_post_fa_post)
+    job_id_2 = TestJob.perform_later({ post: post_2, caused_by_entry: }).job_id
+    expect(SpecUtil.enqueued_job_args(include_job_id: true)).to eq(
+      [
+        { post: post_1_first_load, caused_by_entry:, job_id: },
+        { post: post_2, caused_by_entry:, job_id: job_id_2 },
+      ],
+    )
+  end
+
+  it "correctly performs jobs" do
+    post = create(:domain_post_fa_post)
+    job = TestJob.perform_later({ post:, description: "the post changed" })
+    expect(SpecUtil.enqueued_job_args(include_job_id: true)).to eq(
+      [{ post:, description: "the post changed", job_id: job.job_id }],
+    )
+    expect { SpecUtil.perform_jobs }.to change { post.reload.description }.from(
+      "Test description",
+    ).to("the post changed")
+    expect(SpecUtil.enqueued_job_args()).to eq([])
+  end
+
+  it "defer_job propagates caused_by_entry and caused_by_job_id" do
+    caused_by_entry = create(:http_log_entry)
+    job =
+      TestJob.perform_later(defer_enqueue_job: "deferred job", caused_by_entry:)
+    SpecUtil.perform_jobs(limit: 1)
+    expect(SpecUtil.enqueued_job_args(include_caused_by_job: true)).to eq(
+      [
+        {
+          job_arg: "deferred job",
+          caused_by_entry:,
+          caused_by_job_id: job.job_id,
+        },
+      ],
+    )
+  end
+
+  it "enqueue_job_after_save propagates caused_by_entry and caused_by_job_id" do
+    caused_by_entry = create(:http_log_entry)
+    job =
+      TestJob.perform_later(
+        defer_after_save: "deferred after save",
+        caused_by_entry:,
+      )
+    SpecUtil.perform_jobs(limit: 1)
+    expect(SpecUtil.enqueued_job_args(include_caused_by_job: true)).to eq(
+      [
+        {
+          job_arg: "deferred after save",
+          caused_by_entry:,
+          caused_by_job_id: job.job_id,
+        },
+      ],
+    )
+  end
+
   it "works with no enqueued jobs" do
     inst = test_class.new
     inst.bulk_enqueue_jobs {}
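
The dedup spec exercises the to_param normalization directly: post_1_second_load is a different Ruby object for the same row yet produces no second job, and omitting caused_by_entry leaves the key unchanged, which implies caused_by_entry is excluded from the signature (presumably in Scraper::JobBase itself, since TestJob only ignores :test_arg_ignored). Roughly, the distinction the key now relies on (Post stands in for any persisted model):

    a = Post.find(1)
    b = Post.find(1)
    a.equal?(b)              # => false (separate objects, absent the query cache)
    a.to_param == b.to_param # => true; both hash to the same dedup key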

View File

@@ -64,6 +64,7 @@ class SpecUtil
   end
   def self.enqueued_jobs(job_class = nil)
     GoodJob::Job
+      .where(performed_at: nil)
       .order(created_at: :asc)
       .all
       .map do |job|
@@ -82,12 +83,23 @@
   end
 
   sig do
-    params(job_class: T.nilable(T.class_of(Scraper::JobBase))).returns(
-      T::Array[T.untyped],
-    )
+    params(
+      job_class: T.nilable(T.class_of(Scraper::JobBase)),
+      include_job_id: T::Boolean,
+      include_caused_by_job: T::Boolean,
+    ).returns(T::Array[T.untyped])
   end
-  def self.enqueued_job_args(job_class = nil)
-    enqueued_jobs(job_class).map { |job| job[:args][0] }
+  def self.enqueued_job_args(
+    job_class = nil,
+    include_job_id: false,
+    include_caused_by_job: false
+  )
+    enqueued_jobs(job_class).map do |job|
+      args = job[:args][0]
+      args.merge!(job_id: job[:good_job].id) if include_job_id
+      args.delete(:caused_by_job_id) unless include_caused_by_job
+      args
+    end
   end
 
   sig do
@@ -122,4 +134,26 @@
       job_class == job[:job]
     end
   end
+
+  sig { params(limit: T.nilable(Integer)).void }
+  def self.perform_jobs(limit: nil)
+    count = 0
+    loop do
+      break if limit && count >= limit
+      job =
+        GoodJob::CurrentThread.within do
+          GoodJob::JobPerformer
+            .new("*")
+            .send(:job_query)
+            .perform_with_advisory_lock(lock_id: "1234") do |execution|
+              GoodJob::CurrentThread.job = execution
+            end
+        end
+      break unless job
+      raise job.handled_error if job.handled_error
+      count += 1
+    end
+  end
 end
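
perform_jobs drains pending GoodJob rows inline, one at a time, through the performer's own advisory-lock query, and re-raises any handled_error so failures surface in the spec rather than being swallowed by the queue. Typical usage in the specs above:

    TestJob.perform_later({ post:, description: "updated" })
    SpecUtil.perform_jobs(limit: 1) # run only the first pending job
    SpecUtil.perform_jobs           # drain whatever remains
    expect(SpecUtil.enqueued_job_args).to eq([])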
end