deferred jobs in models

This commit is contained in:
Dylan Knutson
2025-01-29 17:29:00 +00:00
parent 8f81468fc0
commit 4d5784b630
15 changed files with 156 additions and 67 deletions

View File

@@ -5,7 +5,6 @@ class Domain::E621::Job::PostsIndexJob < Domain::E621::Job::Base
sig { override.params(args: T::Hash[Symbol, T.untyped]).void }
def perform(args)
response = http_client.get("https://e621.net/posts.json")
log_entry = response.log_entry
if response.status_code != 200
fatal_error(
@@ -13,13 +12,14 @@ class Domain::E621::Job::PostsIndexJob < Domain::E621::Job::Base
)
end
json = JSON.parse(response.body)
if json["posts"].nil?
fatal_error("no posts in response, hle #{log_entry.id}")
end
posts_json =
T.cast(
JSON.parse(response.body)["posts"],
T::Array[T::Hash[String, T.untyped]],
)
e621_posts =
json["posts"].map do |post_json|
posts_json.map do |post_json|
Domain::E621::TagUtil.initialize_or_update_post(
post_json: post_json,
caused_by_entry: causing_log_entry,
@@ -37,13 +37,6 @@ class Domain::E621::Job::PostsIndexJob < Domain::E621::Job::Base
e621_post.save!
end
(created_posts + updated_posts).uniq.each do |post|
logger.info(
"[e621_id: #{post.e621_id.to_s.bold}] enqueueing static file job",
)
defer_job(Domain::E621::Job::StaticFileJob, { post: post })
end
logger.info(
"#{updated_posts.count} updated, #{created_posts.count} created, #{seen_posts.count} seen",
)

View File

@@ -38,12 +38,8 @@ class Domain::E621::Job::ScanPostJob < Domain::E621::Job::Base
post =
Domain::E621::TagUtil.initialize_or_update_post(
post_json: post_json,
caused_by_entry: log_entry,
caused_by_entry: causing_log_entry,
)
post.save!
unless post.file.present?
defer_job(Domain::E621::Job::StaticFileJob, { post: post })
end
end
end

View File

@@ -110,7 +110,6 @@ class Domain::E621::Job::ScanUserFavsJob < Domain::E621::Job::Base
"#{prefix} [created post: e621 id #{post.e621_id} / id #{post.id}]",
)
total_new_posts += 1
defer_job(Domain::E621::Job::StaticFileJob, post: post)
end
end
end

View File

@@ -9,8 +9,6 @@ class Scraper::JobBase < ApplicationJob
class JobError < RuntimeError
end
DeferredJob = Struct.new(:job_class, :params, :set_args)
class WrappedHttpClient
extend T::Sig
@@ -360,7 +358,7 @@ class Scraper::JobBase < ApplicationJob
).void
end
def defer_job(job_class, params, set_args = {})
@deferred_jobs << DeferredJob.new(job_class, params, set_args)
@deferred_jobs << DeferredJob.new(job_class:, params:, set_args:)
end
sig { void }

7
app/lib/deferred_job.rb Normal file
View File

@@ -0,0 +1,7 @@
# typed: strict
# Immutable value object describing a background job to be enqueued later
# (e.g. after a record is saved, or after a scraper job finishes). Collected
# by Scraper::JobBase#defer_job and ReduxApplicationRecord#enqueue_job_after_save.
class DeferredJob < T::Struct
# The ActiveJob class to enqueue; must descend from Scraper::JobBase.
const :job_class, T.class_of(Scraper::JobBase)
# Arguments hash passed to the job's #perform.
const :params, T::Hash[Symbol, T.untyped]
# Options forwarded to ActiveJob's .set (e.g. priority:, wait:).
const :set_args, T::Hash[Symbol, T.untyped]
end

View File

@@ -13,7 +13,7 @@ class Domain::E621::TagUtil
end
def self.initialize_or_update_post(post_json:, caused_by_entry: nil)
# create all posts that don't already exist
e621_id = post_json["id"]
e621_id = T.cast(post_json["id"], Integer)
e621_post = Domain::E621::Post.find_or_initialize_by(e621_id: e621_id)
e621_updated_at = post_json["updated_at"]
@@ -27,7 +27,7 @@ class Domain::E621::TagUtil
"caused_by_entry_id"
] = caused_by_entry.id if caused_by_entry
e621_md5 = post_json["file"]["md5"]
e621_md5 = T.cast(post_json["file"]["md5"], String)
if e621_post.md5 && e621_post.md5 != e621_md5
logger.warn(
"md5 changed for post: #{e621_post.md5.to_s.bold} => #{e621_md5.to_s.bold}",
@@ -64,6 +64,13 @@ class Domain::E621::TagUtil
e621_post.sources_array = post_json["sources"]
e621_post.tags_array = post_json["tags"]
if e621_post.md5_changed? && e621_post.md5.present?
e621_post.enqueue_job_after_save(
Domain::E621::Job::StaticFileJob,
{ post: e621_post, caused_by_entry: },
)
end
e621_post
end
end

View File

@@ -28,12 +28,7 @@ class Domain::E621::Task::CollectPostFavsTask
post_jsons.each do |post_json|
post =
Domain::E621::TagUtil.initialize_or_update_post(post_json: post_json)
if post.nil?
logger.warn "post not found: #{post_json["id"]}"
next
end
post.save! if post.changed?
raise if post.new_record?
post.save!
next if post.scanned_post_favs_at
Domain::E621::Job::ScanPostFavsJob.perform_now(post: post)
end

View File

@@ -25,7 +25,8 @@ module Scraper::Metrics::JobBaseMetrics
sig do
params(
source_class: T.class_of(Scraper::JobBase),
source_class:
T.any(T.class_of(Scraper::JobBase), T.class_of(ReduxApplicationRecord)),
enqueued_class: T.class_of(Scraper::JobBase),
).void
end

View File

@@ -5,6 +5,8 @@ class Domain::E621::User < ReduxApplicationRecord
include AttrJson::Record::QueryScopes
json_attributes_scope :scanned_favs_at
json_attributes_scope :scanned_favs_status
json_attributes_scope :num_other_favs_cached
validates_inclusion_of :scanned_favs_status,
in: %w[ok error],

View File

@@ -3,6 +3,13 @@ class ReduxApplicationRecord < ActiveRecord::Base
extend T::Sig
include HasPrometheusClient
# hack to make sorbet recognize the `@after_save_deferred_jobs` instance variable
sig { params(attributes: T.untyped).void }
def initialize(attributes = nil)
# Declare the ivar with an explicit T.let so Sorbet (typed: strict) knows its
# type; it is populated lazily by #enqueue_job_after_save and drained in the
# after_save callback.
@after_save_deferred_jobs = T.let(nil, T.nilable(T::Array[DeferredJob]))
super(attributes)
end
self.abstract_class = true
logger.level = Logger::ERROR
@@ -14,15 +21,49 @@ class ReduxApplicationRecord < ActiveRecord::Base
sig { params(attr_name: Symbol).void }
def self.json_attributes_scope(attr_name)
scope :"where_#{attr_name}",
->(value) do
if value.nil? || value == :null
where("json_attributes->>'#{attr_name}' IS NULL")
elsif value == :not_null
where("json_attributes->>'#{attr_name}' IS NOT NULL")
else
where("json_attributes->>'#{attr_name}' = ?", value)
end
->(expr, *binds) do
where("json_attributes->>'#{attr_name}' #{expr}", binds)
end
scope :"order_#{attr_name}",
->(dir) do
unless [:asc, :desc, nil].include?(dir)
raise("invalid direction: #{dir}")
end
order(Arel.sql "json_attributes->>'#{attr_name}' #{dir}")
end
end
sig do
params(
job_class: T.class_of(Scraper::JobBase),
params: T::Hash[Symbol, T.untyped],
set_args: T::Hash[Symbol, T.untyped],
).void
end
# Register a job to be enqueued once this record has been saved (dispatched
# from the after_save callback). `params` is passed to the job's #perform;
# `set_args` is forwarded to ActiveJob's .set (e.g. { priority: -10 }).
def enqueue_job_after_save(job_class, params, set_args = {})
@after_save_deferred_jobs ||= []
@after_save_deferred_jobs << DeferredJob.new(job_class:, params:, set_args:)
end
# After every save, flush any jobs registered via #enqueue_job_after_save,
# batching the inserts through GoodJob::Bulk.enqueue. The pending list is
# snapshotted and cleared FIRST: the original code never drained
# @after_save_deferred_jobs, so a second save! on the same instance would
# re-enqueue every job that was already dispatched (double-enqueue bug).
after_save do
  T.bind(self, ReduxApplicationRecord)
  pending = @after_save_deferred_jobs || []
  # Drain before enqueueing so a subsequent save does not re-dispatch.
  @after_save_deferred_jobs = []
  GoodJob::Bulk.enqueue do
    pending.each do |deferred_job|
      deferred_job
        .job_class
        .set(deferred_job.set_args)
        .perform_later(deferred_job.params)
      logger.info(
        "[class: #{self.class.name}][id: #{id}][enqueued job: #{deferred_job.job_class.name}]",
      )
      Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
        source_class: self.class,
        enqueued_class: deferred_job.job_class,
      )
    end
  end
end
private

View File

@@ -4,6 +4,18 @@ namespace :e621 do
Domain::E621::Job::PostsIndexJob.set(priority: -10).perform_later({})
end
desc "download files for posts missing files"
task download_missing_files: %i[environment set_logger_stdout] do
# Posts in the :ok state that have a source URL but no stored file yet.
relation =
Domain::E621::Post
.where(file: nil, state: :ok)
.where.not(file_url_str: nil)
puts "will download #{relation.count} posts"
# find_each batches the scan; perform_now runs the download inline in this
# rake process rather than enqueueing background jobs.
relation.find_each do |p|
Domain::E621::Job::StaticFileJob.perform_now(post: p)
end
end
desc "scan e621 user favs"
task scan_user_favs: :environment do
while user =
@@ -16,20 +28,14 @@ namespace :e621 do
end
desc "scan e621 user favs, descending by num_other_favs_cached"
task scan_user_favs_descending: :environment do
# total number of favs
# ReduxApplicationRecord.connection.execute(
# "SELECT SUM((json_attributes->>'num_other_favs_cached')::text::int) as total_favs
# FROM domain_e621_users WHERE json_attributes->>'num_other_favs_cached' IS NOT NULL",
# ).first["total_favs"]
#
#
task scan_user_favs_descending: %i[environment set_logger_stdout] do
user_query =
lambda do
Domain::E621::User
.where_scanned_favs_at(:null)
.where("json_attributes->>'num_other_favs_cached' is not null")
.order(Arel.sql "json_attributes->>'num_other_favs_cached' DESC")
.where_scanned_favs_at("is not null")
.where_scanned_favs_status("<> ?", "error")
.where_num_other_favs_cached("is not null")
.order_num_other_favs_cached(:desc)
.first
end
@@ -40,7 +46,7 @@ namespace :e621 do
desc "Gather cached user fav counts based on post fav lists"
task collect_post_favs: :environment do
max_page = ENV["MAX_PAGE"] || 1
max_page = (ENV["MAX_PAGE"] || 1).to_i
default_query = "status:any order:favcount"
query = nil
while query.blank?

View File

@@ -527,6 +527,15 @@ class Domain::E621::User
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def order(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def order_num_other_favs_cached(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def order_scanned_favs_at(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def order_scanned_favs_status(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def preload(*args, &blk); end
@@ -570,9 +579,15 @@ class Domain::E621::User
sig { params(args: T.untyped).returns(PrivateAssociationRelation) }
def where(*args); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def where_num_other_favs_cached(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def where_scanned_favs_at(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def where_scanned_favs_status(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def with(*args, &blk); end
@@ -1339,6 +1354,15 @@ class Domain::E621::User
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def order(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def order_num_other_favs_cached(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def order_scanned_favs_at(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def order_scanned_favs_status(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def preload(*args, &blk); end
@@ -1382,9 +1406,15 @@ class Domain::E621::User
sig { params(args: T.untyped).returns(PrivateRelation) }
def where(*args); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def where_num_other_favs_cached(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def where_scanned_favs_at(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def where_scanned_favs_status(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def with(*args, &blk); end

View File

@@ -21,7 +21,7 @@ class Domain::Inkbunny::Job::FileJob
end
def perform_later(args, &block); end
sig { params(args: T.untyped).returns(T.untyped) }
sig { params(args: T.untyped).void }
def perform_now(args); end
end
end

View File

@@ -21,7 +21,7 @@ class Domain::Inkbunny::Job::UserAvatarJob
end
def perform_later(args, &block); end
sig { params(args: T.untyped).returns(T.untyped) }
sig { params(args: T.untyped).void }
def perform_now(args); end
end
end

View File

@@ -5,22 +5,23 @@ describe Domain::E621::Job::PostsIndexJob do
let(:http_client_mock) { instance_double("::Scraper::HttpClient") }
before { Scraper::ClientFactory.http_client_mock = http_client_mock }
it "works" do
it "works", quiet: false do
file = create(:http_log_entry)
HttpClientMockHelpers.init_http_client_mock(
http_client_mock,
[
{
uri: "https://e621.net/posts.json",
status_code: 200,
content_type: "application/json; charset=utf-8",
contents:
SpecUtil.read_fixture_file("domain/e621/job/posts_index_1.json"),
caused_by_entry: file,
},
],
)
log_entries =
HttpClientMockHelpers.init_http_client_mock(
http_client_mock,
[
{
uri: "https://e621.net/posts.json",
status_code: 200,
content_type: "application/json; charset=utf-8",
contents:
SpecUtil.read_fixture_file("domain/e621/job/posts_index_1.json"),
caused_by_entry: file,
},
],
)
described_class.perform_now({ caused_by_entry: file })
@@ -36,5 +37,18 @@ describe Domain::E621::Job::PostsIndexJob do
"species" => array_including("mammal", "procyonid", "raccoon"),
),
)
# expect jobs to be enqueued
expect(
SpecUtil.enqueued_job_args(Domain::E621::Job::StaticFileJob).count,
).to eq(5)
expect(
SpecUtil.enqueued_job_args(Domain::E621::Job::StaticFileJob)[0],
).to eq(
{
post: Domain::E621::Post.find_by(e621_id: 4_247_444),
caused_by_entry: log_entries[0],
},
)
end
end