deferred jobs in models
This commit is contained in:
@@ -5,7 +5,6 @@ class Domain::E621::Job::PostsIndexJob < Domain::E621::Job::Base
|
||||
sig { override.params(args: T::Hash[Symbol, T.untyped]).void }
|
||||
def perform(args)
|
||||
response = http_client.get("https://e621.net/posts.json")
|
||||
log_entry = response.log_entry
|
||||
|
||||
if response.status_code != 200
|
||||
fatal_error(
|
||||
@@ -13,13 +12,14 @@ class Domain::E621::Job::PostsIndexJob < Domain::E621::Job::Base
|
||||
)
|
||||
end
|
||||
|
||||
json = JSON.parse(response.body)
|
||||
if json["posts"].nil?
|
||||
fatal_error("no posts in response, hle #{log_entry.id}")
|
||||
end
|
||||
posts_json =
|
||||
T.cast(
|
||||
JSON.parse(response.body)["posts"],
|
||||
T::Array[T::Hash[String, T.untyped]],
|
||||
)
|
||||
|
||||
e621_posts =
|
||||
json["posts"].map do |post_json|
|
||||
posts_json.map do |post_json|
|
||||
Domain::E621::TagUtil.initialize_or_update_post(
|
||||
post_json: post_json,
|
||||
caused_by_entry: causing_log_entry,
|
||||
@@ -37,13 +37,6 @@ class Domain::E621::Job::PostsIndexJob < Domain::E621::Job::Base
|
||||
e621_post.save!
|
||||
end
|
||||
|
||||
(created_posts + updated_posts).uniq.each do |post|
|
||||
logger.info(
|
||||
"[e621_id: #{post.e621_id.to_s.bold}] enqueueing static file job",
|
||||
)
|
||||
defer_job(Domain::E621::Job::StaticFileJob, { post: post })
|
||||
end
|
||||
|
||||
logger.info(
|
||||
"#{updated_posts.count} updated, #{created_posts.count} created, #{seen_posts.count} seen",
|
||||
)
|
||||
|
||||
@@ -38,12 +38,8 @@ class Domain::E621::Job::ScanPostJob < Domain::E621::Job::Base
|
||||
post =
|
||||
Domain::E621::TagUtil.initialize_or_update_post(
|
||||
post_json: post_json,
|
||||
caused_by_entry: log_entry,
|
||||
caused_by_entry: causing_log_entry,
|
||||
)
|
||||
post.save!
|
||||
|
||||
unless post.file.present?
|
||||
defer_job(Domain::E621::Job::StaticFileJob, { post: post })
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -110,7 +110,6 @@ class Domain::E621::Job::ScanUserFavsJob < Domain::E621::Job::Base
|
||||
"#{prefix} [created post: e621 id #{post.e621_id} / id #{post.id}]",
|
||||
)
|
||||
total_new_posts += 1
|
||||
defer_job(Domain::E621::Job::StaticFileJob, post: post)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -9,8 +9,6 @@ class Scraper::JobBase < ApplicationJob
|
||||
class JobError < RuntimeError
|
||||
end
|
||||
|
||||
DeferredJob = Struct.new(:job_class, :params, :set_args)
|
||||
|
||||
class WrappedHttpClient
|
||||
extend T::Sig
|
||||
|
||||
@@ -360,7 +358,7 @@ class Scraper::JobBase < ApplicationJob
|
||||
).void
|
||||
end
|
||||
def defer_job(job_class, params, set_args = {})
|
||||
@deferred_jobs << DeferredJob.new(job_class, params, set_args)
|
||||
@deferred_jobs << DeferredJob.new(job_class:, params:, set_args:)
|
||||
end
|
||||
|
||||
sig { void }
|
||||
|
||||
7
app/lib/deferred_job.rb
Normal file
7
app/lib/deferred_job.rb
Normal file
@@ -0,0 +1,7 @@
|
||||
# typed: strict
|
||||
|
||||
class DeferredJob < T::Struct
|
||||
const :job_class, T.class_of(Scraper::JobBase)
|
||||
const :params, T::Hash[Symbol, T.untyped]
|
||||
const :set_args, T::Hash[Symbol, T.untyped]
|
||||
end
|
||||
@@ -13,7 +13,7 @@ class Domain::E621::TagUtil
|
||||
end
|
||||
def self.initialize_or_update_post(post_json:, caused_by_entry: nil)
|
||||
# create all posts that don't already exist
|
||||
e621_id = post_json["id"]
|
||||
e621_id = T.cast(post_json["id"], Integer)
|
||||
e621_post = Domain::E621::Post.find_or_initialize_by(e621_id: e621_id)
|
||||
|
||||
e621_updated_at = post_json["updated_at"]
|
||||
@@ -27,7 +27,7 @@ class Domain::E621::TagUtil
|
||||
"caused_by_entry_id"
|
||||
] = caused_by_entry.id if caused_by_entry
|
||||
|
||||
e621_md5 = post_json["file"]["md5"]
|
||||
e621_md5 = T.cast(post_json["file"]["md5"], String)
|
||||
if e621_post.md5 && e621_post.md5 != e621_md5
|
||||
logger.warn(
|
||||
"md5 changed for post: #{e621_post.md5.to_s.bold} => #{e621_md5.to_s.bold}",
|
||||
@@ -64,6 +64,13 @@ class Domain::E621::TagUtil
|
||||
e621_post.sources_array = post_json["sources"]
|
||||
e621_post.tags_array = post_json["tags"]
|
||||
|
||||
if e621_post.md5_changed? && e621_post.md5.present?
|
||||
e621_post.enqueue_job_after_save(
|
||||
Domain::E621::Job::StaticFileJob,
|
||||
{ post: e621_post, caused_by_entry: },
|
||||
)
|
||||
end
|
||||
|
||||
e621_post
|
||||
end
|
||||
end
|
||||
|
||||
@@ -28,12 +28,7 @@ class Domain::E621::Task::CollectPostFavsTask
|
||||
post_jsons.each do |post_json|
|
||||
post =
|
||||
Domain::E621::TagUtil.initialize_or_update_post(post_json: post_json)
|
||||
if post.nil?
|
||||
logger.warn "post not found: #{post_json["id"]}"
|
||||
next
|
||||
end
|
||||
post.save! if post.changed?
|
||||
raise if post.new_record?
|
||||
post.save!
|
||||
next if post.scanned_post_favs_at
|
||||
Domain::E621::Job::ScanPostFavsJob.perform_now(post: post)
|
||||
end
|
||||
|
||||
@@ -25,7 +25,8 @@ module Scraper::Metrics::JobBaseMetrics
|
||||
|
||||
sig do
|
||||
params(
|
||||
source_class: T.class_of(Scraper::JobBase),
|
||||
source_class:
|
||||
T.any(T.class_of(Scraper::JobBase), T.class_of(ReduxApplicationRecord)),
|
||||
enqueued_class: T.class_of(Scraper::JobBase),
|
||||
).void
|
||||
end
|
||||
|
||||
@@ -5,6 +5,8 @@ class Domain::E621::User < ReduxApplicationRecord
|
||||
include AttrJson::Record::QueryScopes
|
||||
|
||||
json_attributes_scope :scanned_favs_at
|
||||
json_attributes_scope :scanned_favs_status
|
||||
json_attributes_scope :num_other_favs_cached
|
||||
|
||||
validates_inclusion_of :scanned_favs_status,
|
||||
in: %w[ok error],
|
||||
|
||||
@@ -3,6 +3,13 @@ class ReduxApplicationRecord < ActiveRecord::Base
|
||||
extend T::Sig
|
||||
include HasPrometheusClient
|
||||
|
||||
# hack to make sorbet recognize the `@after_save_deferred_jobs` instance variable
|
||||
sig { params(attributes: T.untyped).void }
|
||||
def initialize(attributes = nil)
|
||||
@after_save_deferred_jobs = T.let(nil, T.nilable(T::Array[DeferredJob]))
|
||||
super(attributes)
|
||||
end
|
||||
|
||||
self.abstract_class = true
|
||||
logger.level = Logger::ERROR
|
||||
|
||||
@@ -14,15 +21,49 @@ class ReduxApplicationRecord < ActiveRecord::Base
|
||||
sig { params(attr_name: Symbol).void }
|
||||
def self.json_attributes_scope(attr_name)
|
||||
scope :"where_#{attr_name}",
|
||||
->(value) do
|
||||
if value.nil? || value == :null
|
||||
where("json_attributes->>'#{attr_name}' IS NULL")
|
||||
elsif value == :not_null
|
||||
where("json_attributes->>'#{attr_name}' IS NOT NULL")
|
||||
else
|
||||
where("json_attributes->>'#{attr_name}' = ?", value)
|
||||
end
|
||||
->(expr, *binds) do
|
||||
where("json_attributes->>'#{attr_name}' #{expr}", binds)
|
||||
end
|
||||
|
||||
scope :"order_#{attr_name}",
|
||||
->(dir) do
|
||||
unless [:asc, :desc, nil].include?(dir)
|
||||
raise("invalid direction: #{dir}")
|
||||
end
|
||||
order(Arel.sql "json_attributes->>'#{attr_name}' #{dir}")
|
||||
end
|
||||
end
|
||||
|
||||
sig do
|
||||
params(
|
||||
job_class: T.class_of(Scraper::JobBase),
|
||||
params: T::Hash[Symbol, T.untyped],
|
||||
set_args: T::Hash[Symbol, T.untyped],
|
||||
).void
|
||||
end
|
||||
def enqueue_job_after_save(job_class, params, set_args = {})
|
||||
@after_save_deferred_jobs ||= []
|
||||
@after_save_deferred_jobs << DeferredJob.new(job_class:, params:, set_args:)
|
||||
end
|
||||
|
||||
after_save do
|
||||
T.bind(self, ReduxApplicationRecord)
|
||||
@after_save_deferred_jobs ||= T.let([], T.nilable(T::Array[DeferredJob]))
|
||||
GoodJob::Bulk.enqueue do
|
||||
@after_save_deferred_jobs.each do |deferred_job|
|
||||
deferred_job
|
||||
.job_class
|
||||
.set(deferred_job.set_args)
|
||||
.perform_later(deferred_job.params)
|
||||
logger.info(
|
||||
"[class: #{self.class.name}][id: #{id}][enqueued job: #{deferred_job.job_class.name}]",
|
||||
)
|
||||
Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
|
||||
source_class: self.class,
|
||||
enqueued_class: deferred_job.job_class,
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
@@ -4,6 +4,18 @@ namespace :e621 do
|
||||
Domain::E621::Job::PostsIndexJob.set(priority: -10).perform_later({})
|
||||
end
|
||||
|
||||
desc "download files for posts missing files"
|
||||
task download_missing_files: %i[environment set_logger_stdout] do
|
||||
relation =
|
||||
Domain::E621::Post
|
||||
.where(file: nil, state: :ok)
|
||||
.where.not(file_url_str: nil)
|
||||
puts "will download #{relation.count} posts"
|
||||
relation.find_each do |p|
|
||||
Domain::E621::Job::StaticFileJob.perform_now(post: p)
|
||||
end
|
||||
end
|
||||
|
||||
desc "scan e621 user favs"
|
||||
task scan_user_favs: :environment do
|
||||
while user =
|
||||
@@ -16,20 +28,14 @@ namespace :e621 do
|
||||
end
|
||||
|
||||
desc "scan e621 user favs, descending by num_other_favs_cached"
|
||||
task scan_user_favs_descending: :environment do
|
||||
# total number of favs
|
||||
# ReduxApplicationRecord.connection.execute(
|
||||
# "SELECT SUM((json_attributes->>'num_other_favs_cached')::text::int) as total_favs
|
||||
# FROM domain_e621_users WHERE json_attributes->>'num_other_favs_cached' IS NOT NULL",
|
||||
# ).first["total_favs"]
|
||||
#
|
||||
#
|
||||
task scan_user_favs_descending: %i[environment set_logger_stdout] do
|
||||
user_query =
|
||||
lambda do
|
||||
Domain::E621::User
|
||||
.where_scanned_favs_at(:null)
|
||||
.where("json_attributes->>'num_other_favs_cached' is not null")
|
||||
.order(Arel.sql "json_attributes->>'num_other_favs_cached' DESC")
|
||||
.where_scanned_favs_at("is not null")
|
||||
.where_scanned_favs_status("<> ?", "error")
|
||||
.where_num_other_favs_cached("is not null")
|
||||
.order_num_other_favs_cached(:desc)
|
||||
.first
|
||||
end
|
||||
|
||||
@@ -40,7 +46,7 @@ namespace :e621 do
|
||||
|
||||
desc "Gather cached user fav counts based on post fav lists"
|
||||
task collect_post_favs: :environment do
|
||||
max_page = ENV["MAX_PAGE"] || 1
|
||||
max_page = (ENV["MAX_PAGE"] || 1).to_i
|
||||
default_query = "status:any order:favcount"
|
||||
query = nil
|
||||
while query.blank?
|
||||
|
||||
30
sorbet/rbi/dsl/domain/e621/user.rbi
generated
30
sorbet/rbi/dsl/domain/e621/user.rbi
generated
@@ -527,6 +527,15 @@ class Domain::E621::User
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def order(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def order_num_other_favs_cached(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def order_scanned_favs_at(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def order_scanned_favs_status(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def preload(*args, &blk); end
|
||||
|
||||
@@ -570,9 +579,15 @@ class Domain::E621::User
|
||||
sig { params(args: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def where(*args); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def where_num_other_favs_cached(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def where_scanned_favs_at(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def where_scanned_favs_status(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
|
||||
def with(*args, &blk); end
|
||||
|
||||
@@ -1339,6 +1354,15 @@ class Domain::E621::User
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def order(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def order_num_other_favs_cached(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def order_scanned_favs_at(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def order_scanned_favs_status(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def preload(*args, &blk); end
|
||||
|
||||
@@ -1382,9 +1406,15 @@ class Domain::E621::User
|
||||
sig { params(args: T.untyped).returns(PrivateRelation) }
|
||||
def where(*args); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def where_num_other_favs_cached(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def where_scanned_favs_at(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def where_scanned_favs_status(*args, &blk); end
|
||||
|
||||
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
|
||||
def with(*args, &blk); end
|
||||
|
||||
|
||||
2
sorbet/rbi/dsl/domain/inkbunny/job/file_job.rbi
generated
2
sorbet/rbi/dsl/domain/inkbunny/job/file_job.rbi
generated
@@ -21,7 +21,7 @@ class Domain::Inkbunny::Job::FileJob
|
||||
end
|
||||
def perform_later(args, &block); end
|
||||
|
||||
sig { params(args: T.untyped).returns(T.untyped) }
|
||||
sig { params(args: T.untyped).void }
|
||||
def perform_now(args); end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -21,7 +21,7 @@ class Domain::Inkbunny::Job::UserAvatarJob
|
||||
end
|
||||
def perform_later(args, &block); end
|
||||
|
||||
sig { params(args: T.untyped).returns(T.untyped) }
|
||||
sig { params(args: T.untyped).void }
|
||||
def perform_now(args); end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -5,22 +5,23 @@ describe Domain::E621::Job::PostsIndexJob do
|
||||
let(:http_client_mock) { instance_double("::Scraper::HttpClient") }
|
||||
before { Scraper::ClientFactory.http_client_mock = http_client_mock }
|
||||
|
||||
it "works" do
|
||||
it "works", quiet: false do
|
||||
file = create(:http_log_entry)
|
||||
|
||||
HttpClientMockHelpers.init_http_client_mock(
|
||||
http_client_mock,
|
||||
[
|
||||
{
|
||||
uri: "https://e621.net/posts.json",
|
||||
status_code: 200,
|
||||
content_type: "application/json; charset=utf-8",
|
||||
contents:
|
||||
SpecUtil.read_fixture_file("domain/e621/job/posts_index_1.json"),
|
||||
caused_by_entry: file,
|
||||
},
|
||||
],
|
||||
)
|
||||
log_entries =
|
||||
HttpClientMockHelpers.init_http_client_mock(
|
||||
http_client_mock,
|
||||
[
|
||||
{
|
||||
uri: "https://e621.net/posts.json",
|
||||
status_code: 200,
|
||||
content_type: "application/json; charset=utf-8",
|
||||
contents:
|
||||
SpecUtil.read_fixture_file("domain/e621/job/posts_index_1.json"),
|
||||
caused_by_entry: file,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
described_class.perform_now({ caused_by_entry: file })
|
||||
|
||||
@@ -36,5 +37,18 @@ describe Domain::E621::Job::PostsIndexJob do
|
||||
"species" => array_including("mammal", "procyonid", "raccoon"),
|
||||
),
|
||||
)
|
||||
|
||||
# expect jobs to be enqueued
|
||||
expect(
|
||||
SpecUtil.enqueued_job_args(Domain::E621::Job::StaticFileJob).count,
|
||||
).to eq(5)
|
||||
expect(
|
||||
SpecUtil.enqueued_job_args(Domain::E621::Job::StaticFileJob)[0],
|
||||
).to eq(
|
||||
{
|
||||
post: Domain::E621::Post.find_by(e621_id: 4_247_444),
|
||||
caused_by_entry: log_entries[0],
|
||||
},
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user