# redux-scraper/app/jobs/scraper/job_base.rb
# typed: strict
class Scraper::JobBase < ApplicationJob
  # used to store the last job execution (GoodJob::Execution)
  thread_mattr_accessor :last_good_job_execution

  abstract!

  ignore_signature_args :caused_by_entry, :caused_by_job_id

  class JobError < RuntimeError
  end
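
  # Thin wrapper around Scraper::HttpClient: every request made through it is
  # tagged with the job's causing_log_entry and use_http_cache? setting, and
  # its HttpLogEntry is recorded on the job as first_log_entry / last_log_entry.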
  class WrappedHttpClient
    extend T::Sig

    sig { params(job: Scraper::JobBase, http_client: Scraper::HttpClient).void }
    def initialize(job, http_client)
      @job = job
      @http_client = http_client
    end

    sig do
      params(url: T.any(String, Addressable::URI)).returns(
        Scraper::HttpClient::Response,
      )
    end
    def get(url)
      around_request(
        proc do
          @http_client.get(
            url.to_s,
            caused_by_entry: @job.causing_log_entry,
            use_http_cache: @job.use_http_cache?,
          )
        end,
      )
    end

    sig do
      params(url: T.any(String, Addressable::URI)).returns(
        Scraper::HttpClient::Response,
      )
    end
    def post(url)
      around_request(
        proc do
          @http_client.post(
            url.to_s,
            caused_by_entry: @job.causing_log_entry,
            use_http_cache: @job.use_http_cache?,
          )
        end,
      )
    end

    private

    sig do
      params(proc: T.proc.returns(Scraper::HttpClient::Response)).returns(
        Scraper::HttpClient::Response,
      )
    end
    def around_request(proc)
      response = proc.call
      @job.first_log_entry ||= response.log_entry
      @job.last_log_entry = response.log_entry
      response
    end
  end

  sig { params(args: T.untyped).void }
  def initialize(*args)
    super(*T.unsafe(args))
    @deferred_jobs = T.let(Set.new, T::Set[DeferredJob])
    @suppressed_jobs = T.let(Set.new, T::Set[SuppressedJob])
    @http_client = T.let(nil, T.nilable(Scraper::HttpClient))
    @tor_http_client = T.let(nil, T.nilable(Scraper::HttpClient))
    @gallery_dl_client = T.let(nil, T.nilable(Scraper::GalleryDlClient))
    @first_log_entry = T.let(nil, T.nilable(HttpLogEntry))
    @last_log_entry = T.let(nil, T.nilable(HttpLogEntry))
  end
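
  # Names the Scraper::ClientFactory method used to build this job's HTTP
  # client (see #http_client below). A subclass would override it roughly like
  # this (sketch only; :get_http_client is an assumed factory method name, not
  # taken from this file):
  #
  #   sig { override.returns(Symbol) }
  #   def self.http_factory_method
  #     :get_http_client
  #   end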
  sig { abstract.returns(Symbol) }
  def self.http_factory_method
  end
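
  # Memoizes the raw client from the factory and wraps it for this job.
  # Illustrative use from a subclass's #perform (the URL is a placeholder):
  #
  #   response = http_client.get("https://example.com/some/page")
  #   response.log_entry # also recorded as first_log_entry / last_log_entry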
  sig { returns(WrappedHttpClient) }
  def http_client
    @http_client ||= Scraper::ClientFactory.send(self.class.http_factory_method)
    WrappedHttpClient.new(self, @http_client)
  end

  sig { returns(WrappedHttpClient) }
  def tor_http_client
    @tor_http_client ||= Scraper::ClientFactory.get_tor_http_client
    WrappedHttpClient.new(self, @tor_http_client)
  end

  sig { returns(Scraper::GalleryDlClient) }
  def gallery_dl_client
    @gallery_dl_client ||= Scraper::ClientFactory.get_gallery_dl_client
  end
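
  # Implemented by subclasses; `args` is the single Hash argument the job was
  # enqueued with (see the argument helpers below). A minimal subclass sketch
  # (Scraper::ExampleJob, the :user_id key, and the URL are hypothetical):
  #
  #   class Scraper::ExampleJob < Scraper::JobBase
  #     sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
  #     def perform(args)
  #       http_client.get("https://example.com/users/#{args[:user_id]}")
  #     end
  #   end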
  sig { abstract.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
  def perform(args)
  end
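
  # The helpers below read flags out of the enqueue-time arguments hash,
  # e.g. (job class name and flag values are placeholders):
  #
  #   Scraper::ExampleJob.perform_later({ force_scan: true, use_http_cache: false })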
  sig { returns(T::Boolean) }
  def force_scan?
    !!arguments[0][:force_scan]
  end

  sig { returns(T::Boolean) }
  def use_http_cache?
    !!arguments[0][:use_http_cache]
  end

  # The log entry that caused this job to be enqueued.
  sig { returns(T.nilable(HttpLogEntry)) }
  def caused_by_entry
    arguments[0][:caused_by_entry]
  end
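
  # Typed accessors for objects passed in the arguments hash; T.cast raises at
  # runtime if the argument is missing or of the wrong type.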
  sig { returns(Domain::UserAvatar) }
  def avatar_from_args!
    T.cast(arguments[0][:avatar], Domain::UserAvatar)
  end

  sig { returns(Domain::PostFile) }
  def post_file_from_args!
    T.cast(arguments[0][:file], Domain::PostFile)
  end

  # The primary log entry for this job. Typically, this is the first request
  # that was performed by this job.
  sig { returns(T.nilable(HttpLogEntry)) }
  attr_accessor :first_log_entry

  # The last log entry for this job.
  sig { returns(T.nilable(HttpLogEntry)) }
  attr_accessor :last_log_entry

  # The log entry considered to be the cause of jobs that this job enqueues.
  sig { returns(T.nilable(HttpLogEntry)) }
  def causing_log_entry
    first_log_entry || caused_by_entry
  end

  sig { returns(HttpLogEntry) }
  def causing_log_entry!
    T.must(causing_log_entry)
  end
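
  # Concurrency control: at most one job per (class, queue, priority, argument
  # digest) may exist at a time. The generated key looks roughly like
  # "Scraper::ExampleJob|default|0|<16 hex chars>" (class name, queue, and
  # priority here are placeholders).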
  good_job_control_concurrency_with(
    total_limit: 1,
    key:
      proc do
        T.bind(self, Scraper::JobBase)
        if arguments.size != 1
          raise("wrong number of arguments: #{arguments.inspect}")
        end
        ignore_signature_args = self.class.gather_ignore_signature_args
        sig_arguments =
          arguments[0]
            .reject { |key, _value| ignore_signature_args.include?(key.to_sym) }
            .map { |key, value| [key.to_sym, value.to_param] }
            .sort_by { |key, _value| key }
        sig = []
        sig << self.class.name
        # parenthesized so the "*" fallback applies to the nil value rather
        # than to the result of the append
        sig << (self.queue_name || "*")
        sig << (self.priority || "*")
        sig << Digest::SHA256.hexdigest(sig_arguments.inspect)[0...16]
        sig.join("|")
      end,
  )

  # make the concurrency config threadlocal so it can be modified
  # per-thread temporarily for HasGoodJobSuppressConcurrency
  gjcc = self.good_job_concurrency_config.dup
  @@gjcc_tl ||=
    T.let(
      Concurrent::ThreadLocalVar.new { gjcc.dup },
      T.nilable(Concurrent::ThreadLocalVar),
    )

  sig { returns(T.untyped) }
  def self.good_job_concurrency_config
    @@gjcc_tl&.value
  end

  PERMITTED_CONTENT_TYPES =
    T.let([%r{text/html}, %r{application/json}], T::Array[Regexp])

  thread_mattr_accessor :current_scraper_job

  around_perform do |job, block|
    Scraper::JobBase.current_scraper_job = job
    block.call
  ensure
    Scraper::JobBase.current_scraper_job = nil
  end

  # Delay a little bit on Net::ReadTimeout or Errno::ECONNREFUSED
  around_perform do |job, block|
    block.call
  rescue Net::ReadTimeout, Errno::ECONNREFUSED => e
    logger.error "#{e.class.name} - sleep for a bit"
    sleep rand(2.0..5.0)
    raise e
  end

  # Collect log lines from the job
  around_perform do |job, block|
    log_lines = T.let([], T::Array[String])
    ColorLogger.log_line_accumulator = proc { |line| log_lines << line }
    block.call
  ensure
    begin
      job.enqueue_deferred_jobs!
      ColorLogger.log_line_accumulator = nil
      log_lines = T.must(log_lines)
      good_job = GoodJob::CurrentThread.job
      last_execution = Scraper::JobBase.last_good_job_execution
      if good_job && last_execution && good_job == last_execution.job &&
           log_lines.any?
        Scraper::JobBase.last_good_job_execution = nil
        last_execution.create_log_lines_collection!(log_lines: log_lines)
      end
    rescue StandardError => e
      logger.error("error collecting log lines: #{e.class.name} - #{e.message}")
    end
  end

  # Log job metrics
  around_perform do |job, block|
    Scraper::Metrics::JobBaseMetrics.observe_job_start(job_class: self.class)
    success = T.let(false, T::Boolean)
    start_time = T.let(Time.now, Time)
    ret = block.call
    success = true
    ret
  ensure
    duration_ms = (1000 * (Time.now - T.must(start_time))).to_i
    Scraper::Metrics::JobBaseMetrics.observe_job_finish(
      job_class: self.class,
      time_ms: duration_ms,
      success: T.must(success),
    )
  end
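
  # Records a follow-up job to be enqueued in bulk once this job finishes (see
  # #enqueue_deferred_jobs!). Returns true if the deferral was newly added,
  # false if an identical one was already recorded. Illustrative call (job
  # class and params are placeholders):
  #
  #   defer_job(Scraper::ExampleJob, { user_id: 123 })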
  sig do
    params(
      job_class: T.class_of(Scraper::JobBase),
      params: T::Hash[Symbol, T.untyped],
      set_args: T::Hash[Symbol, T.untyped],
    ).returns(T::Boolean)
  end
  def defer_job(job_class, params, set_args = {})
    !!@deferred_jobs.add?(DeferredJob.new(job_class:, params:, set_args:))
  end
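
  # Marks a (job_class, params) combination so that a matching deferred job is
  # dropped instead of enqueued; arguments listed via ignore_signature_args are
  # stripped from params before matching.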
  sig do
    params(
      job_class: T.class_of(Scraper::JobBase),
      params: T::Hash[Symbol, T.untyped],
    ).void
  end
  def suppress_deferred_job(job_class, params)
    ignore_args = job_class.gather_ignore_signature_args
    params_cleared =
      params.reject { |key, value| ignore_args.include?(key.to_sym) }
    !!@suppressed_jobs.add?(
      SuppressedJob.new(job_class:, params: params_cleared),
    )
  end
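
  # Enqueues all deferred jobs that were not suppressed, tagging each with this
  # job's causing_log_entry and job_id. Called automatically from the
  # log-collection around_perform hook above.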
  sig { void }
  def enqueue_deferred_jobs!
    jobs_to_enqueue =
      @deferred_jobs.filter_map do |deferred_job|
        suppressed =
          @suppressed_jobs.any? do |suppressed_job|
            next false unless suppressed_job.matches?(deferred_job)
            logger.info(
              "suppressing deferred job #{deferred_job.job_class.name} with params #{deferred_job.describe_params}",
            )
            true
          end
        suppressed ? nil : deferred_job
      end

    GoodJob::Bulk.enqueue do
      jobs_to_enqueue.each do |deferred_job|
        args =
          deferred_job.params.merge(
            {
              caused_by_entry: causing_log_entry,
              caused_by_job_id: self.job_id,
            },
          )
        set_args = deferred_job.set_args
        job = deferred_job.job_class.set(set_args).perform_later(args)
        Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
          source_class: self.class,
          enqueued_class: deferred_job.job_class,
        )
        if job
          logger.info(
            format_tags(
              make_tag("job_class", deferred_job.job_class.name),
              make_tag("job_id", job.job_id),
              "enqueue deferred job",
            ),
          )
        end
      end
    rescue StandardError => e
      logger.error("error enqueueing jobs: #{e.class.name} - #{e.message}")
    end
  end

  sig { params(msg: T.untyped).returns(T.noreturn) }
  def fatal_error(msg)
    logger.error(msg)
    raise JobError, msg.uncolorize
  end

  class DateHelper
    extend ActionView::Helpers::DateHelper
  end

  DATE_HELPER = DateHelper
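
  # Human-readable, bolded "time ago" string; for example, a timestamp from
  # roughly three days ago renders as "3 days ago", and nil renders as "never".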
  sig { params(maybe_time: T.untyped).returns(String) }
  def time_ago_in_words(maybe_time)
    return "never".bold if maybe_time.nil?
    "#{DATE_HELPER.time_ago_in_words(maybe_time)} ago".bold
  end
end