Pass more arguments to good_job jobs; make specs tolerant of enqueued user-link jobs

This commit is contained in:
Dylan Knutson
2025-02-25 17:05:33 +00:00
parent d32d583196
commit 5ad0828aa8
13 changed files with 224 additions and 151 deletions

View File

@@ -178,6 +178,7 @@ module Domain::PostsHelper
class LinkForSource < T::ImmutableStruct
include T::Struct::ActsAsComparable
const :model, ReduxApplicationRecord
const :title, String
const :model_path, String
@@ -185,6 +186,8 @@ module Domain::PostsHelper
end
# Immutable, comparable value object pairing a source record with its
# human-readable title (used by PostsHelper when rendering post sources).
class SourceResult < T::ImmutableStruct
  include T::Struct::ActsAsComparable
  const :model, ReduxApplicationRecord
  const :title, String
end

View File

@@ -111,6 +111,14 @@ module GoodJobHelper
)
end
if job_id = args_hash[:caused_by_job_id]
job = GoodJob::Job.find_by(id: job_id)
if job
args << JobArg.new(key: :caused_by_job, value: job, inferred: false)
args.delete_if { |arg| arg.key == :caused_by_job_id }
end
end
args.sort_by(&:key)
end

View File

@@ -26,4 +26,22 @@ class ApplicationJob < ActiveJob::Base
@ignore_signature_args.concat(args)
@ignore_signature_args
end
# Collect ignore_signature_args declared on this class and on every
# superclass up to (and excluding) ApplicationJob's own parent, plus the
# ActiveJob-internal serialization keys; returned deduped and sorted.
sig { returns(T::Array[Symbol]) }
def self.gather_ignore_signature_args
  collected = T.let(%i[_aj_symbol_keys _aj_ruby2_keywords], T::Array[Symbol])
  current = T.let(self, T.nilable(T.class_of(ApplicationJob)))
  while current
    collected += current.ignore_signature_args
    parent = current.superclass
    # walk up only while the parent is still a descendant of ApplicationJob
    current = parent && parent < ApplicationJob ? parent : nil
  end
  collected.uniq.sort
end
end

View File

@@ -401,104 +401,91 @@ class Domain::Fa::Job::Base < Scraper::JobBase
end
FoundLink = Scraper::LinkFinder::FoundLink
# Comparable value struct describing a job to enqueue for a found link.
# NOTE(review): the `Domain` prop type looks truncated in this view — a
# job class type (e.g. T.class_of of some job base) seems intended;
# confirm against the original file.
class JobDef < T::ImmutableStruct
  include T::Struct::ActsAsComparable
  const :job, Domain
end
sig do
params(log_entry: HttpLogEntry, suppress_jobs: T::Array[T.untyped]).void
end
def enqueue_jobs_from_found_links(log_entry, suppress_jobs: [])
return if skip_enqueue_found_links?
start_time = Time.now
unless PERMITTED_CONTENT_TYPES.any? { |ct|
ct.match(log_entry.content_type)
}
raise("unsupported content type: #{log_entry.content_type}")
end
document = log_entry.response&.contents || return
link_finder = Scraper::LinkFinder.new(T.must(log_entry.uri_host), document)
link_finder.logger.level = :error
links = link_finder.find_links
job_defs = []
url_names =
links.filter_map do |link|
link.is_a?(FoundLink::FaUser) ? link.url_name : nil
logger.tagged("link-finder") do
start_time = Time.now
unless PERMITTED_CONTENT_TYPES.any? { |ct|
ct.match(log_entry.content_type)
}
raise("unsupported content type: #{log_entry.content_type}")
end
url_name_to_fa_user =
T.let(
Domain::User::FaUser.where(url_name: url_names).index_by(&:url_name),
T::Hash[String, Domain::User::FaUser],
)
document = log_entry.response&.contents || return
link_finder =
Scraper::LinkFinder.new(T.must(log_entry.uri_host), document)
link_finder.logger.level = :error
links = link_finder.find_links
fa_ids =
links.filter_map do |link|
link.is_a?(FoundLink::FaPost) ? link.fa_id : nil
end
fa_id_to_fa_post =
T.cast(
Domain::Post::FaPost.where(fa_id: fa_ids).index_by(&:fa_id),
T::Hash[Integer, Domain::Post::FaPost],
)
links
.filter_map do |link|
if link.is_a?(FoundLink::FaUser) || link.is_a?(FoundLink::FaPost)
link
else
nil
url_names =
links.filter_map do |link|
link.is_a?(FoundLink::FaUser) ? link.url_name : nil
end
end
.each do |link|
case link
when FoundLink::FaUser
url_name = link.url_name
user =
url_name_to_fa_user[url_name] ||
Domain::User::FaUser.create!(url_name:) do |user|
user.name ||= url_name
end
enqueue_user_scan(user)
when FoundLink::FaPost
fa_id = link.fa_id
post =
fa_id_to_fa_post[fa_id] ||
Domain::Post::FaPost.build(fa_id:) do |post|
post.first_seen_entry_id = log_entry.id
end
if post.new_record?
post.save!
defer_job(Domain::Fa::Job::ScanPostJob, { post: })
url_name_to_fa_user =
T.let(
Domain::User::FaUser.where(url_name: url_names).index_by(&:url_name),
T::Hash[String, Domain::User::FaUser],
)
fa_ids =
links.filter_map do |link|
link.is_a?(FoundLink::FaPost) ? link.fa_id : nil
end
fa_id_to_fa_post =
T.cast(
Domain::Post::FaPost.where(fa_id: fa_ids).index_by(&:fa_id),
T::Hash[Integer, Domain::Post::FaPost],
)
links
.filter_map do |link|
if link.is_a?(FoundLink::FaUser) || link.is_a?(FoundLink::FaPost)
link
else
nil
end
end
.each do |link|
case link
when FoundLink::FaUser
url_name = link.url_name
user =
url_name_to_fa_user[url_name] ||
Domain::User::FaUser.create!(url_name:) do |user|
user.name ||= url_name
end
enqueue_user_scan(user)
when FoundLink::FaPost
fa_id = link.fa_id
post =
fa_id_to_fa_post[fa_id] ||
Domain::Post::FaPost.build(fa_id:) do |post|
post.first_seen_entry_id = log_entry.id
end
if post.new_record?
post.save!
defer_job(Domain::Fa::Job::ScanPostJob, { post: })
end
end
end
end
job_defs.uniq!
job_defs.reject! do |job_def|
suppress_jobs.any? do |suppress|
suppress == job_def.slice(*suppress.keys)
end
duration_ms = (1000 * (Time.now - start_time)).to_i.to_s
logger.info(format_tags(make_tag("duration", "#{duration_ms} ms")))
end
job_defs.each do |job_def|
job_class = job_def[:job]
params = job_def[:params]
desc = job_def[:desc]
logger.debug(
[
"link finder -",
job_class.name.split("::").last.to_s.ljust(22).bold.light_black,
desc,
].join(" "),
)
defer_job(job_class, params.merge({ caused_by_entry: log_entry }))
end
duration_ms = (1000 * (Time.now - start_time)).to_i.to_s
logger.info(
"link finder - enqueue #{job_defs.size.to_s.light_white.bold} jobs (#{duration_ms.bold} ms)",
)
rescue StandardError => e
logger.error(
"link finder - error enqueuing jobs: #{e.class.name} - #{e.message}",
format_tags(
make_tag("error.class", e.class.name),
make_tag("error.message", e.message),
),
)
end
end

View File

@@ -4,7 +4,7 @@ class Scraper::JobBase < ApplicationJob
thread_mattr_accessor :last_good_job_execution
abstract!
ignore_signature_args :caused_by_entry
ignore_signature_args :caused_by_entry, :caused_by_job_id
class JobError < RuntimeError
end
@@ -123,25 +123,12 @@ class Scraper::JobBase < ApplicationJob
total_limit: 1,
key:
proc do
T.bind(self, ApplicationJob)
T.bind(self, Scraper::JobBase)
if arguments.size != 1
raise("wrong number of arguments: #{arguments.inspect}")
end
# collect all ignore_signature_args from all superclasses
ignore_signature_args =
T.let(%i[_aj_symbol_keys _aj_ruby2_keywords], T::Array[Symbol])
klass = T.let(self.class, T.class_of(ApplicationJob))
loop do
ignore_signature_args += klass.ignore_signature_args
if (superklass = klass.superclass) && superklass < ApplicationJob
klass = superklass
else
break
end
end
ignore_signature_args = self.class.gather_ignore_signature_args
sig_arguments =
arguments[0]
.reject { |key, value| ignore_signature_args.include?(key.to_sym) }
@@ -237,7 +224,13 @@ class Scraper::JobBase < ApplicationJob
def enqueue_deferred_jobs!
GoodJob::Bulk.enqueue do
@deferred_jobs.each do |deferred_job|
args = deferred_job.params.merge({ caused_by_entry: causing_log_entry })
args =
deferred_job.params.merge(
{
caused_by_entry: causing_log_entry,
caused_by_job_id: self.job_id,
},
)
set_args = deferred_job.set_args
job = deferred_job.job_class.set(set_args).perform_later(args)
Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(

View File

@@ -36,6 +36,8 @@
<%= render "good_job/arguments/domain_user", user: job_arg.value %>
<% when Domain::UserAvatar %>
<%= render "good_job/arguments/domain_user_avatar", user_avatar: job_arg.value %>
<% when GoodJob::Job %>
<%= render "good_job/arguments/good_job_job", job: job_arg.value %>
<% else %>
<div class="text-truncate">
<% if job_arg.inferred %>

View File

@@ -0,0 +1,44 @@
<%# Display GoodJob job information with associated details %>
<%# Left side: job class (linked to the job page), queue, and priority
    badges. Right side: error, attempt count, scheduled, and
    running/completed state badges. Expects a local `job` (GoodJob::Job). %>
<div class="d-flex align-items-center gap-2">
  <div class="d-flex align-items-center gap-2">
    <span class="badge bg-primary">
      <i class="fa-solid fa-gears me-1"></i>
      <%= link_to "/jobs/jobs/#{job.id}", class: "text-white" do %>
        <%= job.job_class %>
      <% end %>
    </span>
    <% if job.queue_name.present? %>
      <span class="badge bg-info">
        <i class="fa-solid fa-list me-1"></i><%= job.queue_name %>
      </span>
    <% end %>
    <% if job.priority.present? %>
      <span class="badge bg-secondary">
        <i class="fa-solid fa-arrow-up me-1"></i>Priority: <%= job.priority %>
      </span>
    <% end %>
  </div>
  <div class="d-flex align-items-center ms-auto gap-2">
    <% if job.error.present? %>
      <%# full error text is only shown on hover via the title attribute %>
      <span class="badge bg-danger" title="<%= job.error %>">
        <i class="fa-solid fa-exclamation-triangle me-1"></i>Error
      </span>
    <% end %>
    <% if job.executions_count && job.executions_count > 0 %>
      <span class="badge bg-secondary">
        <i class="fa-solid fa-rotate me-1"></i>Attempts: <%= job.executions_count %>
      </span>
    <% end %>
    <% if job.scheduled_at.present? %>
      <span class="badge bg-light text-dark" title="Scheduled for <%= job.scheduled_at.strftime("%Y-%m-%d %H:%M:%S") %>">
        <i class="fa-regular fa-clock me-1"></i>Scheduled
      </span>
    <% end %>
    <%# once performed_at is set, finished_at distinguishes Completed from Running %>
    <% if job.performed_at.present? %>
      <span class="badge <%= job.finished_at.present? ? "bg-success" : "bg-warning text-dark" %>">
        <i class="fa-solid <%= job.finished_at.present? ? "fa-check" : "fa-spinner fa-spin" %> me-1"></i>
        <%= job.finished_at.present? ? "Completed" : "Running" %>
      </span>
    <% end %>
  </div>
</div>

View File

@@ -1,6 +1,20 @@
# typed: strict
unless Rails.env.test?
# Whether to start the PrometheusExporter client for this process.
#
# Start when running as a real server process, or when running as a
# worker; never start in the test environment or in a console, where no
# exporter endpoint is available (see the disabled-stub message below).
#
# NOTE: the previous expression also OR-ed in a bare
# `Rails.const_defined?("Server")`, which subsumed the guarded clause and
# defeated the test/console exclusion; that redundant disjunct is removed.
START_PROMETHEUS_EXPORTER =
  T.let(
    # do not start in test or console mode
    (
      !Rails.env.test? && !Rails.const_defined?("Console") &&
        Rails.const_defined?("Server")
    ) ||
      # always start in worker mode
      (Rails.env == "worker"),
    T::Boolean,
  )
if START_PROMETHEUS_EXPORTER
require "prometheus_exporter"
require "prometheus_exporter/client"
require "prometheus_exporter/metric"
@@ -24,6 +38,7 @@ else
# PrometheusExporter::Client is a singleton and we can't easily mock it out
# in a test environment, and simply constructing a new instance attempts to
# connect to a Prometheus server, which we can't rely on in a test environment.
$stderr.puts "PrometheusExporter is disabled in test, console, and rake task environments"
module PrometheusExporter
class Client
class RemoteMetric

View File

@@ -29,7 +29,14 @@ describe Domain::Fa::Job::BrowsePageJob do
it "enqueues post scan job" do
expect(
SpecUtil.enqueued_job_args(Domain::Fa::Job::ScanPostJob),
).to match([{ post: find_post.call, caused_by_entry: log_entries[0] }])
).to match(
[
hash_including(
post: find_post.call,
caused_by_entry: log_entries[0],
),
],
)
end
end
@@ -43,15 +50,13 @@ describe Domain::Fa::Job::BrowsePageJob do
shared_examples "enqueue file scan" do |expect_to_enqueue|
if expect_to_enqueue
it "enqueues file scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanFileJob)).to match(
expect(
SpecUtil.enqueued_job_args(Domain::Fa::Job::ScanFileJob),
).to match(
[
including(
args: [
{
post_file: find_post.call.file,
caused_by_entry: log_entries[0],
},
],
hash_including(
post_file: find_post.call.file,
caused_by_entry: log_entries[0],
),
],
)
@@ -66,12 +71,13 @@ describe Domain::Fa::Job::BrowsePageJob do
shared_examples "enqueue user page scan" do |expect_to_enqueue|
if expect_to_enqueue
it "enqueues user page job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob)).to match(
expect(
SpecUtil.enqueued_job_args(Domain::Fa::Job::UserPageJob),
).to match(
[
including(
args: [
{ user: find_creator.call, caused_by_entry: log_entries[0] },
],
hash_including(
user: find_creator.call,
caused_by_entry: log_entries[0],
),
],
)
@@ -89,13 +95,12 @@ describe Domain::Fa::Job::BrowsePageJob do
if expect_to_enqueue
it "enqueues user gallery job" do
expect(
SpecUtil.enqueued_jobs(Domain::Fa::Job::UserGalleryJob),
SpecUtil.enqueued_job_args(Domain::Fa::Job::UserGalleryJob),
).to match(
[
including(
args: [
{ user: find_creator.call, caused_by_entry: log_entries[0] },
],
hash_including(
user: find_creator.call,
caused_by_entry: log_entries[0],
),
],
)
@@ -289,10 +294,10 @@ describe Domain::Fa::Job::BrowsePageJob do
expect(post3).to_not be_nil
expect(post3.creator).to eq(find_creator.call)
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob)).to match(
expect(SpecUtil.enqueued_job_args(Domain::Fa::Job::ScanPostJob)).to match(
[
including(args: [{ post: post1, caused_by_entry: log_entries[0] }]),
including(args: [{ post: post3, caused_by_entry: log_entries[0] }]),
hash_including({ post: post1, caused_by_entry: log_entries[0] }),
hash_including({ post: post3, caused_by_entry: log_entries[0] }),
],
)
end

View File

@@ -200,7 +200,7 @@ describe Domain::Fa::Job::FavsJob do
post.reload
end.to change(post, :state).from("removed").to("ok")
expect(SpecUtil.enqueued_job_args(Domain::Fa::Job::ScanPostJob)).to match(
array_including({ post:, caused_by_entry: log_entries[0] }),
array_including(hash_including(post:, caused_by_entry: log_entries[0])),
)
end
@@ -211,26 +211,26 @@ describe Domain::Fa::Job::FavsJob do
).by(5)
expect(SpecUtil.enqueued_job_args(Domain::Fa::Job::ScanPostJob)).to match(
array_including(
{
hash_including(
post: Domain::Post::FaPost.find_by(fa_id: 52_106_426),
caused_by_entry: log_entries[0],
},
{
),
hash_including(
post: Domain::Post::FaPost.find_by(fa_id: 36_755_337),
caused_by_entry: log_entries[0],
},
{
),
hash_including(
post: Domain::Post::FaPost.find_by(fa_id: 40_769_488),
caused_by_entry: log_entries[0],
},
{
),
hash_including(
post: Domain::Post::FaPost.find_by(fa_id: 20_808_448),
caused_by_entry: log_entries[0],
},
{
),
hash_including(
post: Domain::Post::FaPost.find_by(fa_id: 20_585_829),
caused_by_entry: log_entries[0],
},
),
),
)
end
@@ -317,16 +317,6 @@ describe Domain::Fa::Job::FavsJob do
),
caused_by_entry_idx: 0,
},
# {
# uri: "https://www.furaffinity.net/favorites/zzreg/475297391/next",
# status_code: 200,
# content_type: "text/html",
# contents:
# SpecUtil.read_fixture_file(
# "domain/fa/job/favs_zzreg_page_2_475297391.html",
# ),
# caused_by_entry_idx: 0,
# },
]
end

View File

@@ -37,7 +37,7 @@ describe Domain::Fa::Job::UserPageJob do
expect(avatar.state).to eq("pending")
expect(
SpecUtil.enqueued_job_args(Domain::Fa::Job::UserAvatarJob),
).to match([{ avatar: avatar, caused_by_entry: @log_entries[0] }])
).to match([hash_including(avatar:, caused_by_entry: @log_entries[0])])
end
end

View File

@@ -64,10 +64,10 @@ describe Domain::Inkbunny::Job::LatestPostsJob do
SpecUtil.enqueued_job_args(Domain::Inkbunny::Job::UpdatePostsJob),
).to match(
[
{
hash_including(
ib_post_ids: [3_104_202, 3_104_200, 3_104_197],
caused_by_entry: log_entries[0],
},
),
],
)
@@ -75,9 +75,12 @@ describe Domain::Inkbunny::Job::LatestPostsJob do
SpecUtil.enqueued_job_args(Domain::Inkbunny::Job::UserGalleryJob),
).to match(
[
{ user: user_thendyart, caused_by_entry: log_entries[0] },
{ user: user_seff, caused_by_entry: log_entries[0] },
{ user: user_soulcentinel, caused_by_entry: log_entries[0] },
hash_including(user: user_thendyart, caused_by_entry: log_entries[0]),
hash_including(user: user_seff, caused_by_entry: log_entries[0]),
hash_including(
user: user_soulcentinel,
caused_by_entry: log_entries[0],
),
],
)
end

View File

@@ -90,7 +90,12 @@ describe Domain::Inkbunny::Job::UpdatePostsJob do
expect(
SpecUtil.enqueued_job_args(Domain::Inkbunny::Job::UpdatePostsJob),
).to match(
[{ ib_post_ids: pool_ib_post_ids, caused_by_entry: log_entries[0] }],
[
hash_including(
ib_post_ids: pool_ib_post_ids,
caused_by_entry: log_entries[0],
),
],
)
end
@@ -139,7 +144,7 @@ describe Domain::Inkbunny::Job::UpdatePostsJob do
expect(file.state).to eq("ok")
expect(
SpecUtil.enqueued_job_args(Domain::Inkbunny::Job::StaticFileJob),
).to include({ file:, caused_by_entry: log_entries[0] })
).to include(hash_including(file:, caused_by_entry: log_entries[0]))
end
it "throws an error when the md5_initial changes" do