add bulk enqueue job helper
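
Extract the ad-hoc GoodJob bulk-enqueue logic into a HasBulkEnqueueJobs
concern: jobs enqueued inside a bulk_enqueue_jobs block are captured with
GoodJob::Bulk.capture, deduplicated against concurrency keys already present
in the good_jobs table, and inserted with a single GoodJob::Bulk.enqueue call.
The duration-logging measure helper moves from Domain::Fa::BulkJob (now
deleted) into a HasMeasureDuration concern, and
Scraper::JobBase.good_job_concurrency_config becomes thread-local so
total_limit can be lifted per-thread while bulk-enqueueing.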

Dylan Knutson
2023-04-01 14:29:15 +09:00
parent a413b31a2c
commit 8d85f7ebe1
12 changed files with 179 additions and 114 deletions

View File: Domain::Fa::Job::UserFollowsJob

@@ -2,6 +2,9 @@
 # This will be used to create an index of follower -> followed
 # of a specific user, for recommender training
 class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
+  include HasMeasureDuration
+  include HasBulkEnqueueJobs
+
   USERS_PER_FULL_PAGE = Rails.env.test? ? 9 : 190
   queue_as :fa_user_follows
@@ -36,18 +39,18 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
     to_add = nil
     to_remove = nil
-    duration, _ignore = measure do
+    measure(proc { |jobs|
+      "add #{to_add.size.to_s.bold} follows, " +
+        "remove #{to_remove.size.to_s.bold} follows"
+    }) do
       existing_followed_ids = Set.new(@user.follows.pluck(:followed_id))
       to_remove = existing_followed_ids - @scanned_followed_ids
       to_add = @scanned_followed_ids - existing_followed_ids
     end
-    duration_ms = (duration * 1000).to_i
-    logger.info("add #{to_add.size.to_s.bold} follows, " +
-      "remove #{to_remove.size.to_s.bold} follows " +
-      "(took #{duration_ms.to_s.bold} ms)")
-    duration, _ignore = measure do
+    measure(proc {
+      "updated follows list to #{@user.follows.count.to_s.bold} users"
+    }) do
       ReduxApplicationRecord.transaction do
         @user.follows.where(followed_id: to_remove).delete_all
         @user.follows.insert_all!(to_add.map do |id|
@@ -65,12 +68,6 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
           caused_by_entry: best_caused_by_entry,
         })
       end
-    duration_ms = (1000 * duration).to_i
-    logger.info(
-      "bulk set follows list to #{@user.follows.count.to_s.bold} users " +
-      "(took #{duration_ms.to_s.bold} ms)"
-    )
   end

   private
@@ -112,7 +109,11 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
     @total_follows_seen += user_list.length
     users_to_create_hashes = []
-    duration, followed_user_ids = measure do
+    followed_user_ids = measure(proc {
+      "page #{@page_number.to_s.bold} - " +
+        "#{user_list.length.to_s.bold} users on page, " +
+        "created #{users_to_create_hashes.size.to_s.bold}"
+    }) do
       existing_url_name_to_id = Domain::Fa::User.where(
         url_name: user_list.map(&:url_name),
       ).pluck(:id, :url_name).map do |id, url_name|
@@ -139,18 +140,9 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
       end unless users_to_create_hashes.empty?
       enqueue_new_user_pagescan_jobs(users_to_create_hashes)
       (created_user_ids || []) + existing_url_name_to_id.values
     end
-    duration_ms = (duration * 1000).to_i
-    logger.info(
-      "page #{@page_number.to_s.bold} - " +
-      "#{user_list.length.to_s.bold} users on page, " +
-      "created #{users_to_create_hashes.size.to_s.bold} " +
-      "(took #{duration_ms.to_s.bold} ms)"
-    )
     followed_user_ids.each do |user_id|
       @scanned_followed_ids.add(user_id)
     end
@@ -159,26 +151,13 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
   end

   def enqueue_new_user_pagescan_jobs(user_hashes)
-    user_page_jobs = user_hashes.map do |user_hash|
-      Domain::Fa::Job::UserPageJob.new(
-        url_name: user_hash[:url_name],
-        caused_by_entry: best_caused_by_entry,
-      )
-    end
-    key_mapper = Domain::Fa::Job::UserPageJob.good_job_concurrency_config[:key]
-    key_to_job = user_page_jobs.map do |job|
-      key = job.instance_eval(&key_mapper)
-      job.good_job_concurrency_key = key
-      [key, job]
-    end.to_h
-    existing_keys = GoodJob::Job.where(concurrency_key: key_to_job.keys).pluck(:concurrency_key)
-    existing_keys.each do |key|
-      key_to_job.delete(key)
-    end
-    suppress_good_job_concurrency(Domain::Fa::Job::UserPageJob) do
-      GoodJob::Bulk.enqueue(key_to_job.values)
+    bulk_enqueue_jobs do
+      user_hashes.each do |user_hash|
+        Domain::Fa::Job::UserPageJob.perform_later({
+          url_name: user_hash[:url_name],
+          caused_by_entry: best_caused_by_entry,
+        })
+      end
     end
   end

View File: Scraper::JobBase

@@ -18,13 +18,6 @@ class Scraper::JobBase < ApplicationJob
     @gallery_dl_client ||= Scraper::ClientFactory.get_gallery_dl_client
   end

-  def measure
-    now = Time.now
-    ret = yield
-    dur = Time.now - now
-    [dur, ret]
-  end
-
   good_job_control_concurrency_with(
     total_limit: 1,
     key: proc do
@@ -49,12 +42,12 @@ class Scraper::JobBase < ApplicationJob
     end,
   )

-  def suppress_good_job_concurrency(for_klass)
-    old_limit = for_klass.good_job_concurrency_config[:total_limit]
-    for_klass.good_job_concurrency_config[:total_limit] = nil
-    yield
-  ensure
-    for_klass.good_job_concurrency_config[:total_limit] = old_limit
-  end
+  # make the concurrency config threadlocal so it can be modified
+  # per-thread temporarily for HasBulkEnqueueJobs
+  gjcc = self.good_job_concurrency_config.dup
+  @@gjcc_tl ||= Concurrent::ThreadLocalVar.new { gjcc.dup }
+
+  def self.good_job_concurrency_config
+    @@gjcc_tl.value
+  end

   def write_point(name, tags: {}, fields: {})
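
The swap above replaces the mutate-and-restore suppress_good_job_concurrency
helper with a per-thread copy of the concurrency config, so one thread lifting
total_limit cannot race another. A minimal sketch of the concurrent-ruby
behavior this relies on (the config hash below is illustrative, not the real
Scraper::JobBase config):

require "concurrent"

# each thread lazily builds its own copy from the default block
config_tl = Concurrent::ThreadLocalVar.new { { total_limit: 1 } }

config_tl.value[:total_limit] = nil                  # mutates only this thread's copy
Thread.new { p config_tl.value[:total_limit] }.join  # prints 1 (fresh per-thread default)
p config_tl.value[:total_limit]                      # prints nil, still overridden here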

View File: Domain::Fa::BulkJob (deleted)

@@ -1,17 +0,0 @@
-class Domain::Fa::BulkJob
-  include HasColorLogger
-
-  def measure(title)
-    now = Time.now
-    ret = yield
-    duration = Time.now - now
-    if duration >= 5
-      duration_str = "#{duration.round(2).to_s.bold} sec"
-    else
-      duration_str = "#{(1000 * duration).to_i.to_s.bold} ms"
-    end
-    title = title.call(ret) if title.respond_to?(:call)
-    logger.info "#{title} - #{duration_str}"
-    ret
-  end
-end

View File: Domain::Fa::FactorCalculator

@@ -1,4 +1,6 @@
-class Domain::Fa::FactorCalculator < Domain::Fa::BulkJob
+class Domain::Fa::FactorCalculator
+  include HasMeasureDuration
+
   def initialize(epochs = 20)
     factors = Domain::Fa::UserFactor::FACTORS_WIDTHS
     @recommender = Disco::Recommender.new(
@@ -19,10 +21,6 @@ class Domain::Fa::FactorCalculator
     measure("fit #{dataset.length.to_s.bold} follows") do
       @recommender.fit(dataset)
     end
-
-    # measure("optimize recs") do
-    #   @recommender.optimize_item_recs
-    # end
   end

   def write_factors

View File: Domain::Fa::PostEnqueuer

@@ -1,5 +1,6 @@
-class Domain::Fa::PostEnqueuer < Domain::Fa::BulkJob
-  include HasColorLogger
+class Domain::Fa::PostEnqueuer
+  include HasMeasureDuration
+  include HasBulkEnqueueJobs

   def initialize(start_at:, low_water_mark:, high_water_mark:)
     @low_water_mark = low_water_mark
@@ -26,22 +27,13 @@ class Domain::Fa::PostEnqueuer
         end
       end
-      key_mapper = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:key]
-      jobs = rows.map do |post_id, fa_id|
-        logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
-        job = Domain::Fa::Job::ScanPostJob.new({ fa_id: fa_id })
-        job.good_job_concurrency_key = job.instance_eval(&key_mapper)
-        job
-      end
       measure("enqueue jobs") do
-        old_limit = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit]
-        Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = nil
-        jobs.each_slice(10) do |slice|
-          GoodJob::Bulk.enqueue(slice)
+        bulk_enqueue_jobs do
+          rows.each do |post_id, fa_id|
+            Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
+            logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
+          end
         end
-      ensure
-        Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = old_limit
       end
       throw StopIteration if rows.empty?
     else

View File: Domain::Fa::UserEnqueuer

@@ -23,11 +23,9 @@ class Domain::Fa::UserEnqueuer
       rows = to_enqueue.times.map do
         @user_iterator.next
       end
-      ReduxApplicationRecord.transaction do
-        rows.each do |user|
-          Domain::Fa::Job::UserFollowsJob.perform_later({ user: user })
-          logger.info "enqueue #{user.url_name.bold} (#{user.id.to_s.bold})"
-        end
+      rows.each do |user|
+        Domain::Fa::Job::UserFollowsJob.perform_later({ user: user })
+        logger.info "enqueue #{user.url_name.bold} (#{user.id.to_s.bold})"
       end
       throw StopIteration if rows.empty?
     else

View File: HasBulkEnqueueJobs (new)

@@ -0,0 +1,27 @@
+module HasBulkEnqueueJobs
+  extend ActiveSupport::Concern
+
+  included do
+    def bulk_enqueue_jobs(&block)
+      # must set total_limit to nil before generating jobs,
+      # else the instances will have their own total_limit set
+      old_limit = Scraper::JobBase.good_job_concurrency_config[:total_limit]
+      Scraper::JobBase.good_job_concurrency_config[:total_limit] = nil
+      key_to_job = GoodJob::Bulk.capture(&block).map do |job|
+        [job.good_job_concurrency_key, job]
+      end.to_h
+
+      ReduxApplicationRecord.transaction do
+        existing_keys = GoodJob::Job.
+          where(concurrency_key: key_to_job.keys).
+          pluck(:concurrency_key)
+        existing_keys.each do |key|
+          key_to_job.delete(key)
+        end
+        GoodJob::Bulk.enqueue(key_to_job.values)
+      end
+    ensure
+      Scraper::JobBase.good_job_concurrency_config[:total_limit] = old_limit
+    end
+  end
+end
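
Callers wrap ordinary perform_later calls in a bulk_enqueue_jobs block, as the
UserFollowsJob and PostEnqueuer changes above do. A hypothetical sketch of the
pattern (ExampleEnqueuer and enqueue_scans are illustrative names):

class ExampleEnqueuer
  include HasBulkEnqueueJobs

  def enqueue_scans(fa_ids)
    bulk_enqueue_jobs do
      # perform_later calls inside the block are captured instead of enqueued
      # one at a time; jobs whose concurrency key already exists in good_jobs
      # are dropped, and the remainder insert in one bulk call
      fa_ids.each do |fa_id|
        Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
      end
    end
  end
end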

View File: HasMeasureDuration (new)

@@ -0,0 +1,21 @@
+module HasMeasureDuration
+  extend ActiveSupport::Concern
+
+  included do
+    include HasColorLogger
+
+    def measure(title)
+      now = Time.now
+      ret = yield
+      duration = Time.now - now
+      if duration >= 5
+        duration_str = "#{duration.round(2).to_s.bold} sec"
+      else
+        duration_str = "#{(1000 * duration).to_i.to_s.bold} ms"
+      end
+      title = title.call(ret) if title.respond_to?(:call)
+      logger.info "#{title} - #{duration_str}"
+      ret
+    end
+  end
+end
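
measure takes either a string title or a proc; a proc is called with the
block's return value after it finishes, so the title can describe the result.
A small sketch, assuming the concern's HasColorLogger dependency is loaded
(Reporter is a hypothetical class):

class Reporter
  include HasMeasureDuration
end

r = Reporter.new
r.measure("static title") { sleep 0.01 }
# the proc receives the block's return value
r.measure(proc { |rows| "loaded #{rows.size} rows" }) { [1, 2, 3] }
# => logs "loaded 3 rows - 0 ms"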

View File: Domain::Fa::PostEnqueuer spec

@@ -1,26 +1,31 @@
 require "rails_helper"

 describe Domain::Fa::PostEnqueuer do
-  it "works" do
-    creator = SpecUtil.build_domain_fa_user
-    creator.save!
-    post_fa_ids = 7.times.map do
-      post = SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
+  let(:creator) { SpecUtil.create_domain_fa_user }
+  let!(:posts) do
+    7.times.map do
+      SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
         p.save!
       end
-    end.map(&:fa_id)
-    enqueuer = Domain::Fa::PostEnqueuer.new(
-      start_at: 0,
-      high_water_mark: 5,
-      low_water_mark: 3,
-    )
-    enqueued_fa_ids = proc do
+    end
+  end
+
+  let(:enqueued_fa_ids) do
+    proc do
       SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).map do |job|
         job[:args][0][:fa_id]
       end
     end
+  end
+
+  let(:enqueuer) do
+    Domain::Fa::PostEnqueuer.new(
+      start_at: 0,
+      high_water_mark: 5,
+      low_water_mark: 3,
+    )
+  end
+
+  it "works" do
+    post_fa_ids = posts.map(&:fa_id)
     enqueuer.run_once
     expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
@@ -42,4 +47,19 @@ describe Domain::Fa::PostEnqueuer do
     expect { enqueuer.run_once }.to raise_exception(StopIteration)
     expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
   end
+
+  it "does not enqueue posts which are already in the queue" do
+    post_fa_ids = posts.map(&:fa_id)
+    Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: post_fa_ids[1] })
+    expect(enqueued_fa_ids.call).to eq([post_fa_ids[1]])
+
+    # post [1] should be filtered out
+    enqueuer.run_once
+    expect(enqueued_fa_ids.call).to eq([
+      post_fa_ids[1],
+      post_fa_ids[0],
+      post_fa_ids[2],
+      post_fa_ids[3],
+    ])
+  end
 end

View File: Domain::Fa::UserEnqueuer spec

@@ -19,6 +19,7 @@ describe Domain::Fa::UserEnqueuer do
     end
     enqueuer.run_once
     expect(get_enqueued_users.call.length).to eq(5)
+    expect(get_enqueued_users.call).to eq(users[0...5])
     SpecUtil.shift_jobs(Domain::Fa::Job::UserFollowsJob)

View File: HasBulkEnqueueJobs spec (new)

@@ -0,0 +1,53 @@
+require "rails_helper"
+
+describe HasBulkEnqueueJobs do
+  let(:test_class) do
+    Class.new do
+      include HasBulkEnqueueJobs
+
+      def config_object_id
+        Scraper::JobBase.good_job_concurrency_config.object_id
+      end
+    end
+  end
+
+  it "has correct thread-local access to config" do
+    get_config_obj_id = proc {
+      Scraper::JobBase.good_job_concurrency_config.object_id
+    }
+    id1 = get_config_obj_id.call
+
+    # id stays consistent
+    expect(get_config_obj_id.call).to eq(id1)
+
+    # id changes in a different thread
+    id2 = nil
+    Thread.new do
+      id2 = get_config_obj_id.call
+    end.join
+    expect(id2).to_not be_nil
+    expect(id2).to_not eq(id1)
+  end
+
+  it "enqueues jobs correctly" do
+    inst = test_class.new
+    expect(SpecUtil.enqueued_jobs).to eq([])
+
+    inst.bulk_enqueue_jobs do
+      Domain::Fa::Job::BrowsePageJob.perform_later
+    end
+    expect(SpecUtil.enqueued_jobs.length).to eq(1)
+    expect(
+      SpecUtil.enqueued_jobs[0][:good_job].concurrency_key
+    ).to start_with("Domain::Fa::Job::BrowsePageJob")
+
+    inst.bulk_enqueue_jobs do
+      Domain::Fa::Job::BrowsePageJob.perform_later
+    end
+    expect(SpecUtil.enqueued_jobs.length).to eq(1)
+  end
+
+  it "works with no enqueued jobs" do
+    inst = test_class.new
+    inst.bulk_enqueue_jobs { }
+  end
+end

View File: SpecUtil

@@ -134,7 +134,7 @@ class SpecUtil
       # raise StandardError, "set `ActiveJob::Base.queue_adapter = :test`"
       # end
-    GoodJob::Job.all.map do |job|
+    GoodJob::Job.order(created_at: :asc).all.map do |job|
       {
         job: job.job_class.constantize,
         queue: job.queue_name,