enqueue waiting posts job

This commit is contained in:
Dylan Knutson
2023-03-30 10:09:17 +09:00
parent ff017290ec
commit 43848c3dd4
4 changed files with 124 additions and 0 deletions

View File

@@ -0,0 +1,48 @@
class FaPostEnqueuer
include HasColorLogger
def initialize(start_at:, low_water_mark:, high_water_mark:)
@low_water_mark = low_water_mark
@high_water_mark = high_water_mark
raise if @high_water_mark <= @low_water_mark
@post_iterator = Domain::Fa::Post.
where(file_url_str: nil, state: "ok").
select(:id, :fa_id, :file_url_str, :state, :state_detail, :log_entry_detail).
find_each(start: start_at)
end
def run_once
already_enqueued = enqueued_count
if already_enqueued <= @low_water_mark
to_enqueue = @high_water_mark - already_enqueued
logger.info("enqueuing #{to_enqueue.to_s.bold} more posts - #{already_enqueued.to_s.bold} already enqueued")
to_enqueue.times do
GoodJob::Bulk.enqueue do
post = @post_iterator.next
Domain::Fa::Job::ScanPostJob.perform_later(post: post)
logger.info "enqueue #{post.id.to_s.bold} (fa_id: #{post.fa_id.to_s.bold})"
end
end
else
logger.info(
"#{already_enqueued.to_s.bold} already enqueued (max #{@high_water_mark.to_s.bold}) - " +
"waiting to fall below #{@low_water_mark.to_s.bold}"
)
:sleep
end
end
private
def enqueued_count
if Rails.env.test?
return SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).count
end
GoodJob::Job.where(
queue_name: "fa_post",
finished_at: nil,
performed_at: nil,
).count
end
end

View File

@@ -1,4 +1,22 @@
namespace :fa do
desc "enqueue waiting posts"
task :enqueue_waiting_posts => [:set_logger_stdout, :environment] do |t, args|
start_at = (ENV["start_at"] || 0).to_i
low_water_mark = 50
high_water_mark = 300
poll_duration = 10
enqueuer = FaPostEnqueuer.new(
start_at: start_at,
low_water_mark: low_water_mark,
high_water_mark: high_water_mark,
)
loop do
sleep poll_duration if enqueuer.run_once == :sleep
end
end
desc "Import existing FA posts"
task :import_existing, [:start_at] => [:environment] do |t, args|
batch_size = args[:batch_size]&.to_i || ENV["batch_size"]&.to_i

View File

@@ -0,0 +1,35 @@
describe FaPostEnqueuer do
it "works" do
ActiveJob::Base.queue_adapter = :test
creator = SpecUtil.build_domain_fa_user
creator.save!
posts = 7.times.map do
post = SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
p.save!
end
end
enqueuer = FaPostEnqueuer.new(
start_at: 0,
high_water_mark: 5,
low_water_mark: 3,
)
enqueuer.run_once
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(5)
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
enqueuer.run_once
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(4)
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
enqueuer.run_once
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(5)
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob, 3)
expect do
enqueuer.run_once
end.to raise_exception(StopIteration)
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(2)
end
end

View File

@@ -147,4 +147,27 @@ class SpecUtil
})
end
end
def self.shift_jobs(job_klass, by = 1)
jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
by.times do
index = jobs.index { |job| job[:job] == job_klass }
jobs.delete_at(index)
end
end
def self.build_domain_fa_user
Domain::Fa::User.new(
name: random_string(10),
)
end
def self.build_domain_fa_post(creator: nil, fa_id: nil)
@last_fa_id ||= 0
@last_fa_id += 1
Domain::Fa::Post.new(
creator: creator || build_domain_fa_user,
fa_id: fa_id || @last_fa_id,
)
end
end