Files
redux-scraper/app/jobs/scraper/job_base.rb
2025-02-25 05:47:44 +00:00

279 lines
7.8 KiB
Ruby

# typed: strict
# Abstract base class for all scraper jobs. Provides:
#   * a wrapped HTTP client that records each request's HttpLogEntry on the job,
#   * a GoodJob concurrency key derived from the job's argument signature,
#   * around_perform hooks for retry back-off, log-line capture, and metrics,
#   * deferred bulk enqueueing of follow-up jobs.
class Scraper::JobBase < ApplicationJob
  # Stores the last job execution (GoodJob::Execution) for the current thread
  # so the log-line collector hook below can attach captured output to it.
  thread_mattr_accessor :last_good_job_execution

  abstract!

  # :caused_by_entry is provenance metadata, not part of the job's identity,
  # so it is excluded from the concurrency-key signature computed below.
  ignore_signature_args :caused_by_entry

  # Raised by #fatal_error for unrecoverable job failures.
  class JobError < RuntimeError
  end

  # Wraps Scraper::HttpClient so that every request made on behalf of a job
  # records its HttpLogEntry as the job's first/last log entry.
  class WrappedHttpClient
    extend T::Sig

    sig { params(job: Scraper::JobBase, http_client: Scraper::HttpClient).void }
    def initialize(job, http_client)
      @job = job
      @http_client = http_client
    end

    # Performs a GET, threading through the job's causation entry and
    # HTTP-cache preference.
    sig { params(url: String).returns(Scraper::HttpClient::Response) }
    def get(url)
      around_request(
        proc do
          @http_client.get(
            url,
            caused_by_entry: @job.causing_log_entry,
            use_http_cache: @job.use_http_cache?,
          )
        end,
      )
    end

    # Performs a POST, threading through the job's causation entry and
    # HTTP-cache preference.
    sig { params(url: String).returns(Scraper::HttpClient::Response) }
    def post(url)
      around_request(
        proc do
          @http_client.post(
            url,
            caused_by_entry: @job.causing_log_entry,
            use_http_cache: @job.use_http_cache?,
          )
        end,
      )
    end

    private

    # Executes the request proc, then records its log entry on the owning job
    # (first entry only once; last entry every time).
    sig do
      params(proc: T.proc.returns(Scraper::HttpClient::Response)).returns(
        Scraper::HttpClient::Response,
      )
    end
    def around_request(proc)
      response = proc.call
      @job.first_log_entry ||= response.log_entry
      @job.last_log_entry = response.log_entry
      response
    end
  end

  sig { params(args: T.untyped).void }
  def initialize(*args)
    super(*T.unsafe(args))
    @deferred_jobs = T.let(Set.new, T::Set[DeferredJob])
    @http_client = T.let(nil, T.nilable(Scraper::HttpClient))
    @gallery_dl_client = T.let(nil, T.nilable(Scraper::GalleryDlClient))
    @first_log_entry = T.let(nil, T.nilable(HttpLogEntry))
    @last_log_entry = T.let(nil, T.nilable(HttpLogEntry))
  end

  # Subclasses return the Scraper::ClientFactory method used to build their
  # HTTP client.
  sig { abstract.returns(Symbol) }
  def self.http_factory_method
  end

  # Lazily builds the underlying client via the subclass's factory method.
  # NOTE(review): only the underlying client is memoized; a fresh
  # WrappedHttpClient is allocated on every call (cheap, stateless wrapper).
  sig { returns(WrappedHttpClient) }
  def http_client
    @http_client ||= Scraper::ClientFactory.send(self.class.http_factory_method)
    WrappedHttpClient.new(self, @http_client)
  end

  # Memoized gallery-dl client.
  sig { returns(Scraper::GalleryDlClient) }
  def gallery_dl_client
    @gallery_dl_client ||= Scraper::ClientFactory.get_gallery_dl_client
  end

  sig { abstract.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
  def perform(args)
  end

  # True when the job was enqueued with force_scan: true.
  sig { returns(T::Boolean) }
  def force_scan?
    !!arguments[0][:force_scan]
  end

  # True when the job was enqueued with use_http_cache: true.
  sig { returns(T::Boolean) }
  def use_http_cache?
    !!arguments[0][:use_http_cache]
  end

  # The log entry that caused this job to be enqueued.
  sig { returns(T.nilable(HttpLogEntry)) }
  def caused_by_entry
    arguments[0][:caused_by_entry]
  end

  # The primary log entry for this job. Typically, this is the first request
  # that was performed by this job.
  sig { returns(T.nilable(HttpLogEntry)) }
  attr_accessor :first_log_entry

  # The last log entry for this job.
  sig { returns(T.nilable(HttpLogEntry)) }
  attr_accessor :last_log_entry

  # The log entry considered to be the cause of jobs that this job enqueues.
  sig { returns(T.nilable(HttpLogEntry)) }
  def causing_log_entry
    first_log_entry || caused_by_entry
  end

  # Limit to one concurrent job per (class, queue, priority, argument-hash)
  # signature. :caused_by_entry and ActiveJob bookkeeping keys are excluded so
  # provenance does not defeat deduplication.
  good_job_control_concurrency_with(
    total_limit: 1,
    key:
      proc do
        T.bind(self, ApplicationJob)
        if arguments.size != 1
          raise("wrong number of arguments: #{arguments.inspect}")
        end
        # Collect ignore_signature_args from this class and every superclass
        # up to (but excluding) ApplicationJob.
        ignore_signature_args =
          T.let(%i[_aj_symbol_keys _aj_ruby2_keywords], T::Array[Symbol])
        klass = T.let(self.class, T.class_of(ApplicationJob))
        loop do
          ignore_signature_args += klass.ignore_signature_args
          if (superklass = klass.superclass) && superklass < ApplicationJob
            klass = superklass
          else
            break
          end
        end
        # Sort for a stable digest regardless of argument-hash ordering.
        sig_arguments =
          arguments[0]
            .reject { |key, _value| ignore_signature_args.include?(key.to_sym) }
            .sort_by { |key, _value| key.to_sym }
        sig = []
        sig << self.class.name
        # Parens are required: `<<` binds tighter than `||`, so without them a
        # nil queue_name/priority would be pushed and "*" would never be used.
        sig << (self.queue_name || "*")
        sig << (self.priority || "*")
        sig << Digest::SHA256.hexdigest(sig_arguments.inspect)[0...16]
        sig.join("|")
      end,
  )

  # make the concurrency config threadlocal so it can be modified
  # per-thread temporarily for HasGoodJobSuppressConcurrency
  gjcc = self.good_job_concurrency_config.dup
  @@gjcc_tl ||=
    T.let(
      Concurrent::ThreadLocalVar.new { gjcc.dup },
      T.nilable(Concurrent::ThreadLocalVar),
    )

  sig { returns(T.untyped) }
  def self.good_job_concurrency_config
    @@gjcc_tl&.value
  end

  PERMITTED_CONTENT_TYPES =
    T.let([%r{text/html}, %r{application/json}], T::Array[Regexp])

  # Delay a little bit on Net::ReadTimeout or Errno::ECONNREFUSED before
  # letting the error propagate to the retry machinery.
  around_perform do |job, block|
    block.call
  rescue Net::ReadTimeout, Errno::ECONNREFUSED => e
    logger.error "#{e.class.name} - sleep for a bit"
    sleep rand(2.0..5.0)
    raise e
  end

  # Collect log lines emitted during the job and persist them against the
  # current GoodJob execution; also flushes deferred jobs in the ensure.
  around_perform do |job, block|
    log_lines = T.let([], T::Array[String])
    ColorLogger.log_line_accumulator = proc { |line| log_lines << line }
    block.call
  ensure
    begin
      job.enqueue_deferred_jobs!
      ColorLogger.log_line_accumulator = nil
      good_job = GoodJob::CurrentThread.job
      last_execution = Scraper::JobBase.last_good_job_execution
      # Only attach lines when the recorded execution matches the current job.
      if good_job && last_execution && good_job == last_execution.job &&
           log_lines.any?
        Scraper::JobBase.last_good_job_execution = nil
        last_execution.create_log_lines_collection!(log_lines: log_lines)
      end
    rescue StandardError => e
      logger.error("error collecting log lines: #{e.class.name} - #{e.message}")
    end
  end

  # Log job metrics: start/finish counters and wall-clock duration.
  around_perform do |job, block|
    Scraper::Metrics::JobBaseMetrics.observe_job_start(job_class: self.class)
    success = T.let(false, T::Boolean)
    start_time = T.let(Time.now, Time)
    ret = block.call
    success = true
    ret
  ensure
    duration_ms = (1000 * (Time.now - T.must(start_time))).to_i
    Scraper::Metrics::JobBaseMetrics.observe_job_finish(
      job_class: self.class,
      time_ms: duration_ms,
      success: T.must(success),
    )
  end

  # Queue a follow-up job to be bulk-enqueued after this job finishes.
  # Returns true if it was newly added (Set semantics deduplicate).
  sig do
    params(
      job_class: T.class_of(Scraper::JobBase),
      params: T::Hash[Symbol, T.untyped],
      set_args: T::Hash[Symbol, T.untyped],
    ).returns(T::Boolean)
  end
  def defer_job(job_class, params, set_args = {})
    !!@deferred_jobs.add?(DeferredJob.new(job_class:, params:, set_args:))
  end

  # Bulk-enqueue all deferred jobs, tagging each with this job's causing log
  # entry. Errors are logged, not raised (called from an ensure block).
  sig { void }
  def enqueue_deferred_jobs!
    GoodJob::Bulk.enqueue do
      @deferred_jobs.each do |deferred_job|
        args = deferred_job.params.merge({ caused_by_entry: causing_log_entry })
        set_args = deferred_job.set_args
        job = deferred_job.job_class.set(set_args).perform_later(args)
        Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
          source_class: self.class,
          enqueued_class: deferred_job.job_class,
        )
        if job
          logger.info(
            format_tags(
              make_tag("job_class", deferred_job.job_class.name),
              (make_tag("job_id", job.job_id)),
              "enqueue deferred job",
            ),
          )
        end
      end
    rescue StandardError => e
      logger.error("error enqueueing jobs: #{e.class.name} - #{e.message}")
    end
  end

  # Logs the (possibly colorized) message and raises JobError with the
  # color codes stripped.
  sig { params(msg: T.untyped).returns(T.noreturn) }
  def fatal_error(msg)
    logger.error(msg)
    raise JobError, msg.uncolorize
  end

  # Exposes ActionView's time_ago_in_words as a class-level helper.
  class DateHelper
    extend ActionView::Helpers::DateHelper
  end
  DATE_HELPER = DateHelper

  # Human-readable relative time, bold; "never" for nil.
  sig { params(maybe_time: T.untyped).returns(String) }
  def time_ago_in_words(maybe_time)
    return "never".bold if maybe_time.nil?
    "#{DATE_HELPER.time_ago_in_words(maybe_time)} ago".bold
  end
end