GoodJob argument rendering, fix e621 post scanning, manual GoodJob execution by job class

This commit is contained in:
Dylan Knutson
2025-02-13 19:21:28 +00:00
parent a09ddfa32b
commit e8f5b2ee92
16 changed files with 316 additions and 58 deletions

116
Rakefile
View File

@@ -231,3 +231,119 @@ task fix_buggy_fa_posts: :environment do
# binding.pry
end
end
task fix_e621_post_files: :environment do
query = Domain::Post::E621Post.where(state: "ok").where.missing(:files)
puts "query: #{query.to_sql}"
query.find_each(batch_size: 1) do |post|
puts "post: #{post.id} / #{post.e621_id}"
Domain::E621::Job::ScanPostJob.perform_now(post: post)
break
end
end
task perform_good_jobs: :environment do
job_class = ENV["job_class"]
job_id = ENV["job_id"]
limit = ENV["limit"]&.to_i
if !job_id.present? && !job_class.present?
raise "need 'job_id' or 'job_class'"
end
relation =
if job_id
GoodJob::Job.where(id: job_id)
else
GoodJob::Job.where(
job_class: job_class,
finished_at: nil,
error: nil,
performed_at: nil,
)
end
relation.find_each(batch_size: 1) do |job|
job = T.cast(job, GoodJob::Job)
# Get the actual job instance and deserialize arguments
serialized_args = job.serialized_params["arguments"]
if serialized_args.nil?
puts "No arguments found for job #{job.id}"
next
end
deserialized_args = ActiveJob::Arguments.deserialize(serialized_args)
job_instance = job.job_class.constantize.new
job_instance.deserialize(job.serialized_params)
puts "Running job #{job.id} (#{job.job_class})"
puts "Arguments: #{deserialized_args.inspect}"
# Create execution record
execution =
GoodJob::Execution.create!(
active_job_id: job.active_job_id,
job_class: job.job_class,
queue_name: job.queue_name,
serialized_params: job.serialized_params,
scheduled_at: job.scheduled_at,
created_at: Time.current,
updated_at: Time.current,
process_id: SecureRandom.uuid,
)
start_time = Time.current
# Temporarily disable concurrency limits
job_class = job.job_class.constantize
old_config = job_class.good_job_concurrency_config
job_class.good_job_concurrency_config = { total_limit: nil }
begin
# Perform the job with deserialized arguments
GoodJob::CurrentThread.job = job
job.update!(performed_at: Time.current)
job_instance.arguments = deserialized_args
job_instance.perform_now
# Update execution and job records
execution.update!(
finished_at: Time.current,
error: nil,
error_event: nil,
duration: Time.current - start_time,
)
job.update!(finished_at: Time.current)
puts "Job completed successfully"
rescue => e
puts "Job failed: #{e.message}"
# Update execution and job records with error
execution.update!(
finished_at: Time.current,
error: e.message,
error_event: "execution_failed",
error_backtrace: e.backtrace,
duration: Time.current - start_time,
)
job.update!(
error: "#{e.class}: #{e.message}",
error_event: "execution_failed",
)
raise e
ensure
# Restore original concurrency config
job_class.good_job_concurrency_config = old_config
GoodJob::CurrentThread.job = nil
end
if limit
limit -= 1
if limit.zero?
puts "limit reached"
break
end
end
end
end