add bulk enqueue job helper
@@ -2,6 +2,9 @@
 # This will be used to create an index of follower -> followed
 # of a specific user, for recommender training
 class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
+  include HasMeasureDuration
+  include HasBulkEnqueueJobs
+
   USERS_PER_FULL_PAGE = Rails.env.test? ? 9 : 190

   queue_as :fa_user_follows
@@ -36,18 +39,18 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base

     to_add = nil
     to_remove = nil
-    duration, _ignore = measure do
+    measure(proc { |jobs|
+      "add #{to_add.size.to_s.bold} follows, " +
+      "remove #{to_remove.size.to_s.bold} follows"
+    }) do
       existing_followed_ids = Set.new(@user.follows.pluck(:followed_id))
       to_remove = existing_followed_ids - @scanned_followed_ids
       to_add = @scanned_followed_ids - existing_followed_ids
     end

-    duration_ms = (duration * 1000).to_i
-    logger.info("add #{to_add.size.to_s.bold} follows, " +
-      "remove #{to_remove.size.to_s.bold} follows " +
-      "(took #{duration_ms.to_s.bold} ms)")
-
-    duration, _ignore = measure do
+    measure(proc {
+      "updated follows list to #{@user.follows.count.to_s.bold} users"
+    }) do
       ReduxApplicationRecord.transaction do
         @user.follows.where(followed_id: to_remove).delete_all
         @user.follows.insert_all!(to_add.map do |id|
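
The sync above is plain Set arithmetic; a minimal standalone sketch with illustrative IDs:

    require "set"

    existing = Set.new([1, 2, 3])   # followed_ids already stored for the user
    scanned  = Set.new([2, 3, 4])   # followed_ids seen during this scan

    to_remove = existing - scanned  # => #<Set: {1}>, stale rows to delete_all
    to_add    = scanned - existing  # => #<Set: {4}>, new rows to insert_all!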
@@ -65,12 +68,6 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
           caused_by_entry: best_caused_by_entry,
         })
       end
-
-      duration_ms = (1000 * duration).to_i
-      logger.info(
-        "bulk set follows list to #{@user.follows.count.to_s.bold} users " +
-        "(took #{duration_ms.to_s.bold} ms)"
-      )
     end

   private
@@ -112,7 +109,11 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
     @total_follows_seen += user_list.length

     users_to_create_hashes = []
-    duration, followed_user_ids = measure do
+    followed_user_ids = measure(proc {
+      "page #{@page_number.to_s.bold} - " +
+      "#{user_list.length.to_s.bold} users on page, " +
+      "created #{users_to_create_hashes.size.to_s.bold}"
+    }) do
       existing_url_name_to_id = Domain::Fa::User.where(
         url_name: user_list.map(&:url_name),
       ).pluck(:id, :url_name).map do |id, url_name|
@@ -139,18 +140,9 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
       end unless users_to_create_hashes.empty?

       enqueue_new_user_pagescan_jobs(users_to_create_hashes)

       (created_user_ids || []) + existing_url_name_to_id.values
     end
-
-    duration_ms = (duration * 1000).to_i
-    logger.info(
-      "page #{@page_number.to_s.bold} - " +
-      "#{user_list.length.to_s.bold} users on page, " +
-      "created #{users_to_create_hashes.size.to_s.bold} " +
-      "(took #{duration_ms.to_s.bold} ms)"
-    )
-
     followed_user_ids.each do |user_id|
       @scanned_followed_ids.add(user_id)
     end
@@ -159,26 +151,13 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
   end

   def enqueue_new_user_pagescan_jobs(user_hashes)
-    user_page_jobs = user_hashes.map do |user_hash|
-      Domain::Fa::Job::UserPageJob.new(
-        url_name: user_hash[:url_name],
-        caused_by_entry: best_caused_by_entry,
-      )
-    end
-
-    key_mapper = Domain::Fa::Job::UserPageJob.good_job_concurrency_config[:key]
-    key_to_job = user_page_jobs.map do |job|
-      key = job.instance_eval(&key_mapper)
-      job.good_job_concurrency_key = key
-      [key, job]
-    end.to_h
-    existing_keys = GoodJob::Job.where(concurrency_key: key_to_job.keys).pluck(:concurrency_key)
-    existing_keys.each do |key|
-      key_to_job.delete(key)
-    end
-
-    suppress_good_job_concurrency(Domain::Fa::Job::UserPageJob) do
-      GoodJob::Bulk.enqueue(key_to_job.values)
+    bulk_enqueue_jobs do
+      user_hashes.each do |user_hash|
+        Domain::Fa::Job::UserPageJob.perform_later({
+          url_name: user_hash[:url_name],
+          caused_by_entry: best_caused_by_entry,
+        })
+      end
     end
   end

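
bulk_enqueue_jobs (added below in app/lib/has_bulk_enqueue_jobs.rb) is built on GoodJob's bulk API, used exactly as the helper uses it: capture buffers the perform_later calls made inside the block instead of enqueuing them one at a time, and enqueue inserts the batch. A minimal sketch, with illustrative job arguments:

    # Collect the jobs without enqueuing them, then insert in one batch.
    jobs = GoodJob::Bulk.capture do
      3.times do |i|
        Domain::Fa::Job::UserPageJob.perform_later({ url_name: "user_#{i}" })
      end
    end
    GoodJob::Bulk.enqueue(jobs)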
@@ -18,13 +18,6 @@ class Scraper::JobBase < ApplicationJob
     @gallery_dl_client ||= Scraper::ClientFactory.get_gallery_dl_client
   end

-  def measure
-    now = Time.now
-    ret = yield
-    dur = Time.now - now
-    [dur, ret]
-  end
-
   good_job_control_concurrency_with(
     total_limit: 1,
     key: proc do
@@ -49,12 +42,12 @@ class Scraper::JobBase < ApplicationJob
     end,
   )

-  def suppress_good_job_concurrency(for_klass)
-    old_limit = for_klass.good_job_concurrency_config[:total_limit]
-    for_klass.good_job_concurrency_config[:total_limit] = nil
-    yield
-  ensure
-    for_klass.good_job_concurrency_config[:total_limit] = old_limit
-  end
+  # make the concurrency config thread-local so it can be modified
+  # per-thread temporarily by HasBulkEnqueueJobs
+  gjcc = self.good_job_concurrency_config.dup
+  @@gjcc_tl ||= Concurrent::ThreadLocalVar.new { gjcc.dup }
+  def self.good_job_concurrency_config
+    @@gjcc_tl.value
+  end

   def write_point(name, tags: {}, fields: {})

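
Concurrent::ThreadLocalVar (from concurrent-ruby) lazily initializes an independent copy per thread from the factory block, so the temporary total_limit mutation done by bulk_enqueue_jobs below cannot leak into other worker threads. A sketch of the semantics:

    require "concurrent"

    var = Concurrent::ThreadLocalVar.new { { total_limit: 1 } }

    var.value[:total_limit] = nil  # mutates only this thread's copy
    other = nil
    Thread.new { other = var.value[:total_limit] }.join

    var.value[:total_limit]  # => nil (this thread's mutated copy)
    other                    # => 1 (fresh copy built in the other thread)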
@@ -1,17 +0,0 @@
-class Domain::Fa::BulkJob
-  include HasColorLogger
-
-  def measure(title)
-    now = Time.now
-    ret = yield
-    duration = Time.now - now
-    if duration >= 5
-      duration_str = "#{duration.round(2).to_s.bold} sec"
-    else
-      duration_str = "#{(1000 * duration).to_i.to_s.bold} ms"
-    end
-    title = title.call(ret) if title.respond_to?(:call)
-    logger.info "#{title} - #{duration_str}"
-    ret
-  end
-end
@@ -1,4 +1,6 @@
-class Domain::Fa::FactorCalculator < Domain::Fa::BulkJob
+class Domain::Fa::FactorCalculator
+  include HasMeasureDuration
+
   def initialize(epochs = 20)
     factors = Domain::Fa::UserFactor::FACTORS_WIDTHS
     @recommender = Disco::Recommender.new(
@@ -19,10 +21,6 @@ class Domain::Fa::FactorCalculator < Domain::Fa::BulkJob
     measure("fit #{dataset.length.to_s.bold} follows") do
       @recommender.fit(dataset)
     end
-
-    # measure("optimize recs") do
-    #   @recommender.optimize_item_recs
-    # end
   end

   def write_factors
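
Disco accepts implicit-feedback rows (no rating column), which matches the follower -> followed pairs this index holds. A hedged sketch of the fit call; the IDs and option values are illustrative, and the real widths come from Domain::Fa::UserFactor::FACTORS_WIDTHS:

    require "disco"

    # One row per (follower, followed) pair.
    dataset = [
      { user_id: 1, item_id: 10 },
      { user_id: 1, item_id: 11 },
      { user_id: 2, item_id: 10 },
    ]

    recommender = Disco::Recommender.new(factors: 8, epochs: 20)
    recommender.fit(dataset)
    recommender.user_factors(1)  # => latent factor vector for user 1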
@@ -1,5 +1,6 @@
-class Domain::Fa::PostEnqueuer < Domain::Fa::BulkJob
-  include HasColorLogger
+class Domain::Fa::PostEnqueuer
+  include HasMeasureDuration
+  include HasBulkEnqueueJobs

   def initialize(start_at:, low_water_mark:, high_water_mark:)
     @low_water_mark = low_water_mark
@@ -26,22 +27,13 @@ class Domain::Fa::PostEnqueuer < Domain::Fa::BulkJob
         end
       end

-      key_mapper = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:key]
-      jobs = rows.map do |post_id, fa_id|
-        logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
-        job = Domain::Fa::Job::ScanPostJob.new({ fa_id: fa_id })
-        job.good_job_concurrency_key = job.instance_eval(&key_mapper)
-        job
-      end
-
       measure("enqueue jobs") do
-        old_limit = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit]
-        Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = nil
-        jobs.each_slice(10) do |slice|
-          GoodJob::Bulk.enqueue(slice)
+        bulk_enqueue_jobs do
+          rows.each do |post_id, fa_id|
+            Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
+            logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
+          end
         end
-      ensure
-        Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = old_limit
       end
       throw StopIteration if rows.empty?
     else
@@ -23,11 +23,9 @@ class Domain::Fa::UserEnqueuer
       rows = to_enqueue.times.map do
        @user_iterator.next
       end
-      ReduxApplicationRecord.transaction do
-        rows.each do |user|
-          Domain::Fa::Job::UserFollowsJob.perform_later({ user: user })
-          logger.info "enqueue #{user.url_name.bold} (#{user.id.to_s.bold})"
-        end
+      rows.each do |user|
+        Domain::Fa::Job::UserFollowsJob.perform_later({ user: user })
+        logger.info "enqueue #{user.url_name.bold} (#{user.id.to_s.bold})"
       end
       throw StopIteration if rows.empty?
     else

app/lib/has_bulk_enqueue_jobs.rb (new file, +27 lines)
@@ -0,0 +1,27 @@
+module HasBulkEnqueueJobs
+  extend ActiveSupport::Concern
+  included do
+    def bulk_enqueue_jobs(&block)
+      # must set total_limit to nil before generating jobs,
+      # else the instances will have their own total_limit set
+      old_limit = Scraper::JobBase.good_job_concurrency_config[:total_limit]
+      Scraper::JobBase.good_job_concurrency_config[:total_limit] = nil
+
+      key_to_job = GoodJob::Bulk.capture(&block).map do |job|
+        [job.good_job_concurrency_key, job]
+      end.to_h
+
+      ReduxApplicationRecord.transaction do
+        existing_keys = GoodJob::Job.
+          where(concurrency_key: key_to_job.keys).
+          pluck(:concurrency_key)
+        existing_keys.each do |key|
+          key_to_job.delete(key)
+        end
+        GoodJob::Bulk.enqueue(key_to_job.values)
+      end
+    ensure
+      Scraper::JobBase.good_job_concurrency_config[:total_limit] = old_limit
+    end
+  end
+end
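
In use, a class includes the concern and wraps ordinary perform_later calls; jobs whose concurrency_key already exists in good_jobs are dropped before the bulk insert. A usage sketch mirroring the PostEnqueuer change above (the method name is hypothetical):

    class Domain::Fa::PostEnqueuer
      include HasBulkEnqueueJobs

      # hypothetical helper, shown only to illustrate the call shape
      def enqueue_scans(fa_ids)
        bulk_enqueue_jobs do
          fa_ids.each do |fa_id|
            Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
          end
        end
      end
    end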
app/lib/has_measure_duration.rb (new file, +21 lines)
@@ -0,0 +1,21 @@
+module HasMeasureDuration
+  extend ActiveSupport::Concern
+
+  included do
+    include HasColorLogger
+
+    def measure(title)
+      now = Time.now
+      ret = yield
+      duration = Time.now - now
+      if duration >= 5
+        duration_str = "#{duration.round(2).to_s.bold} sec"
+      else
+        duration_str = "#{(1000 * duration).to_i.to_s.bold} ms"
+      end
+      title = title.call(ret) if title.respond_to?(:call)
+      logger.info "#{title} - #{duration_str}"
+      ret
+    end
+  end
+end
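
This concern replaces the measure helpers deleted above from Scraper::JobBase and Domain::Fa::BulkJob. The title may be a plain string or a callable; a callable receives the block's return value, so the label can report counts computed inside the block. A usage sketch (the query is illustrative):

    class ExampleTask
      include HasMeasureDuration

      def run
        # static label
        users = measure("load users") { Domain::Fa::User.limit(10).to_a }

        # callable label, built from the block's return value after it runs
        measure(proc { |u| "loaded #{u.size} users" }) do
          Domain::Fa::User.limit(10).to_a
        end
      end
    end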
@@ -1,26 +1,31 @@
 require "rails_helper"

 describe Domain::Fa::PostEnqueuer do
-  it "works" do
-    creator = SpecUtil.build_domain_fa_user
-    creator.save!
-    post_fa_ids = 7.times.map do
-      post = SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
+  let(:creator) { SpecUtil.create_domain_fa_user }
+  let!(:posts) do
+    7.times.map do
+      SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
         p.save!
       end
-    end.map(&:fa_id)
-
-    enqueuer = Domain::Fa::PostEnqueuer.new(
-      start_at: 0,
-      high_water_mark: 5,
-      low_water_mark: 3,
-    )
-
-    enqueued_fa_ids = proc do
+    end
+  end
+  let(:enqueued_fa_ids) do
+    proc do
       SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).map do |job|
         job[:args][0][:fa_id]
       end
     end
+  end
+  let(:enqueuer) do
+    Domain::Fa::PostEnqueuer.new(
+      start_at: 0,
+      high_water_mark: 5,
+      low_water_mark: 3,
+    )
+  end
+
+  it "works" do
+    post_fa_ids = posts.map(&:fa_id)

     enqueuer.run_once
     expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
@@ -42,4 +47,19 @@ describe Domain::Fa::PostEnqueuer do
     expect { enqueuer.run_once }.to raise_exception(StopIteration)
     expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
   end
+
+  it "does not enqueue posts which are already in the queue" do
+    post_fa_ids = posts.map(&:fa_id)
+    Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: post_fa_ids[1] })
+    expect(enqueued_fa_ids.call).to eq([post_fa_ids[1]])
+
+    # post [1] should be filtered out
+    enqueuer.run_once
+    expect(enqueued_fa_ids.call).to eq([
+      post_fa_ids[1],
+      post_fa_ids[0],
+      post_fa_ids[2],
+      post_fa_ids[3],
+    ])
+  end
 end
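
The specs above pin down the water-mark behavior: run_once tops the queue up to high_water_mark only once it has drained to low_water_mark or below. A hypothetical model of that rule (the real logic lives in Domain::Fa::PostEnqueuer):

    def to_enqueue_count(pending, low_water_mark:, high_water_mark:)
      return 0 if pending > low_water_mark  # queue is still full enough
      high_water_mark - pending             # top back up to the high mark
    end

    to_enqueue_count(0, low_water_mark: 3, high_water_mark: 5)  # => 5
    to_enqueue_count(4, low_water_mark: 3, high_water_mark: 5)  # => 0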
@@ -19,6 +19,7 @@ describe Domain::Fa::UserEnqueuer do
     end

     enqueuer.run_once
+    expect(get_enqueued_users.call.length).to eq(5)
     expect(get_enqueued_users.call).to eq(users[0...5])
     SpecUtil.shift_jobs(Domain::Fa::Job::UserFollowsJob)

spec/lib/has_bulk_enqueue_jobs_spec.rb (new file, +53 lines)
@@ -0,0 +1,53 @@
+require "rails_helper"
+
+describe HasBulkEnqueueJobs do
+  let(:test_class) do
+    Class.new do
+      include HasBulkEnqueueJobs
+
+      def config_object_id
+        Scraper::JobBase.good_job_concurrency_config.object_id
+      end
+    end
+  end
+
+  it "has correct thread-local access to config" do
+    get_config_obj_id = proc {
+      Scraper::JobBase.good_job_concurrency_config.object_id
+    }
+
+    id1 = get_config_obj_id.call
+    # id stays consistent
+    expect(get_config_obj_id.call).to eq(id1)
+
+    # id changes in a different thread
+    id2 = nil
+    Thread.new do
+      id2 = get_config_obj_id.call
+    end.join
+    expect(id2).to_not be_nil
+    expect(id2).to_not eq(id1)
+  end
+
+  it "enqueues jobs correctly" do
+    inst = test_class.new
+    expect(SpecUtil.enqueued_jobs).to eq([])
+    inst.bulk_enqueue_jobs do
+      Domain::Fa::Job::BrowsePageJob.perform_later
+    end
+    expect(SpecUtil.enqueued_jobs.length).to eq(1)
+    expect(
+      SpecUtil.enqueued_jobs[0][:good_job].concurrency_key
+    ).to start_with("Domain::Fa::Job::BrowsePageJob")
+
+    inst.bulk_enqueue_jobs do
+      Domain::Fa::Job::BrowsePageJob.perform_later
+    end
+    expect(SpecUtil.enqueued_jobs.length).to eq(1)
+  end
+
+  it "works with no enqueued jobs" do
+    inst = test_class.new
+    inst.bulk_enqueue_jobs { }
+  end
+end
@@ -134,7 +134,7 @@ class SpecUtil
     #   raise StandardError, "set `ActiveJob::Base.queue_adapter = :test`"
     # end

-    GoodJob::Job.all.map do |job|
+    GoodJob::Job.order(created_at: :asc).all.map do |job|
       {
         job: job.job_class.constantize,
         queue: job.queue_name,