Files
redux-scraper/app/lib/deferred_job_sink.rb

89 lines
2.5 KiB
Ruby

# typed: strict
# frozen_string_literal: true
class DeferredJobSink
extend T::Sig
include HasColorLogger
sig { params(source_class: T.untyped).void }
def initialize(source_class)
@suppressed_jobs = T.let(Set.new, T::Set[SuppressedJob])
@deferred_jobs = T.let(Set.new, T::Set[DeferredJob])
@source_class = source_class
end
sig do
params(
job_class: T.class_of(Scraper::JobBase),
params: T::Hash[Symbol, T.untyped],
set_args: T::Hash[Symbol, T.untyped],
).returns(T::Boolean)
end
def defer_job(job_class, params, set_args = {})
!!@deferred_jobs.add?(DeferredJob.new(job_class:, params:, set_args:))
end
sig do
params(
job_class: T.class_of(Scraper::JobBase),
params: T::Hash[Symbol, T.untyped],
).void
end
def suppress_deferred_job(job_class, params)
ignore_args = job_class.gather_ignore_signature_args
params_cleared =
params.reject { |key, value| ignore_args.include?(key.to_sym) }
!!@suppressed_jobs.add?(
SuppressedJob.new(job_class:, params: params_cleared),
)
end
sig do
params(
caused_by_entry: T.nilable(HttpLogEntry),
caused_by_job_id: T.nilable(String),
).void
end
def enqueue_deferred_jobs!(caused_by_entry = nil, caused_by_job_id = nil)
jobs_to_enqueue =
@deferred_jobs.filter_map do |deferred_job|
if @suppressed_jobs.any? { |suppressed_job|
if suppressed_job.matches?(deferred_job)
logger.info(
"suppressing deferred job #{deferred_job.job_class.name} with params #{deferred_job.describe_params}",
)
true
end
}
nil
else
deferred_job
end
end
GoodJob::Bulk.enqueue do
jobs_to_enqueue.each do |deferred_job|
args =
deferred_job.params.merge({ caused_by_entry:, caused_by_job_id: })
set_args = deferred_job.set_args
job = deferred_job.job_class.set(set_args).perform_later(args)
Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
source_class: @source_class,
enqueued_class: deferred_job.job_class,
)
if job
logger.info(
format_tags(
make_tag("job_class", deferred_job.job_class.name),
(make_tag("job_id", job.job_id)),
"enqueue deferred job",
),
)
end
end
rescue StandardError => e
logger.error("error enqueueing jobs: #{e.class.name} - #{e.message}")
end
end
end