post enqueuer file jobs
This commit is contained in:
@@ -9,9 +9,10 @@ class Domain::Fa::PostEnqueuer
|
||||
raise if @high_water_mark <= @low_water_mark
|
||||
@post_iterator = Enumerator.new do |e|
|
||||
Domain::Fa::Post.
|
||||
where(file_url_str: nil, state: "ok").
|
||||
where("id >= ?", start_at).
|
||||
pluck_each(:id, :fa_id) do |p|
|
||||
where("file_id is null").
|
||||
where(state: "ok").
|
||||
pluck_each(:id, :fa_id, :file_url_str) do |p|
|
||||
e << p
|
||||
end
|
||||
end
|
||||
@@ -30,9 +31,14 @@ class Domain::Fa::PostEnqueuer
|
||||
|
||||
measure("enqueue jobs") do
|
||||
bulk_enqueue_jobs do
|
||||
rows.each do |post_id, fa_id|
|
||||
Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
|
||||
logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
|
||||
rows.each do |post_id, fa_id, file_url_str|
|
||||
if file_url_str.nil?
|
||||
Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
|
||||
logger.info "post scan #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
|
||||
else
|
||||
Domain::Fa::Job::ScanFileJob.perform_later({ fa_id: fa_id })
|
||||
logger.info "file scan #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -49,12 +55,8 @@ class Domain::Fa::PostEnqueuer
|
||||
private
|
||||
|
||||
def enqueued_count
|
||||
if Rails.env.test?
|
||||
return SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).count
|
||||
end
|
||||
|
||||
GoodJob::Job.where(
|
||||
queue_name: "fa_post",
|
||||
queue_name: ["static_file", "fa_post"],
|
||||
finished_at: nil,
|
||||
performed_at: nil,
|
||||
).count
|
||||
|
||||
@@ -3,19 +3,37 @@ require "rails_helper"
|
||||
describe Domain::Fa::PostEnqueuer do
|
||||
let(:creator) { SpecUtil.create_domain_fa_user }
|
||||
let!(:posts) do
|
||||
7.times.map do
|
||||
no_file_url = 4.times.map do
|
||||
SpecUtil.create_domain_fa_post(creator: creator)
|
||||
end
|
||||
with_file_url = 3.times.map do
|
||||
SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
|
||||
p.file_url_str = "https://www.example.com/img.jpg"
|
||||
p.save!
|
||||
end
|
||||
end
|
||||
no_file_url + with_file_url
|
||||
end
|
||||
let(:job_klasses) {
|
||||
[
|
||||
Domain::Fa::Job::ScanPostJob,
|
||||
Domain::Fa::Job::ScanFileJob,
|
||||
]
|
||||
}
|
||||
let(:enqueued_fa_ids) do
|
||||
proc do
|
||||
SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).map do |job|
|
||||
SpecUtil.enqueued_jobs(job_klasses).map do |job|
|
||||
job[:args][0][:fa_id]
|
||||
end
|
||||
end
|
||||
end
|
||||
let(:enqueued_fa_jobs) do
|
||||
proc do
|
||||
SpecUtil.enqueued_jobs(job_klasses).map do |job|
|
||||
job[:job]
|
||||
end
|
||||
end
|
||||
end
|
||||
let(:enqueuer) do
|
||||
Domain::Fa::PostEnqueuer.new(
|
||||
start_at: 0,
|
||||
@@ -29,20 +47,33 @@ describe Domain::Fa::PostEnqueuer do
|
||||
|
||||
enqueuer.run_once
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
|
||||
expect(enqueued_fa_jobs.call).to eq(
|
||||
[Domain::Fa::Job::ScanPostJob] * 4 +
|
||||
[Domain::Fa::Job::ScanFileJob] * 1
|
||||
)
|
||||
SpecUtil.shift_jobs(job_klasses)
|
||||
|
||||
# jobs should have concurrency keys
|
||||
expect(
|
||||
SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).first[:good_job].concurrency_key
|
||||
SpecUtil.enqueued_jobs(job_klasses).first[:good_job].concurrency_key
|
||||
).to_not be_nil
|
||||
|
||||
enqueuer.run_once
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[1...5])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
|
||||
expect(enqueued_fa_jobs.call).to eq(
|
||||
[Domain::Fa::Job::ScanPostJob] * 3 +
|
||||
[Domain::Fa::Job::ScanFileJob] * 1
|
||||
)
|
||||
SpecUtil.shift_jobs(job_klasses)
|
||||
|
||||
enqueuer.run_once
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[2...7])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob, 3)
|
||||
expect(enqueued_fa_jobs.call).to eq(
|
||||
[Domain::Fa::Job::ScanPostJob] * 2 +
|
||||
[Domain::Fa::Job::ScanFileJob] * 3
|
||||
)
|
||||
|
||||
SpecUtil.shift_jobs(job_klasses, 3)
|
||||
|
||||
expect { enqueuer.run_once }.to raise_exception(StopIteration)
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
|
||||
|
||||
@@ -139,21 +139,27 @@ class SpecUtil
|
||||
good_job: job,
|
||||
}
|
||||
end.filter do |job|
|
||||
if job_klass
|
||||
job[:job] == job_klass
|
||||
else
|
||||
true
|
||||
end
|
||||
job_is_klass(job_klass, job)
|
||||
end
|
||||
end
|
||||
|
||||
def self.shift_jobs(job_klass, by = 1)
|
||||
def self.shift_jobs(job_klass = nil, by = 1)
|
||||
by.times do
|
||||
job = enqueued_jobs.find { |job| job[:job] == job_klass }
|
||||
job = enqueued_jobs.find { |job| job_is_klass(job_klass, job) }
|
||||
job[:good_job].destroy if job
|
||||
end
|
||||
end
|
||||
|
||||
def self.job_is_klass(job_klass, job)
|
||||
if job_klass.nil?
|
||||
true
|
||||
elsif job_klass.is_a? Array
|
||||
job_klass.include? job[:job]
|
||||
else
|
||||
job_klass == job[:job]
|
||||
end
|
||||
end
|
||||
|
||||
def self.build_domain_fa_user(name: nil)
|
||||
Domain::Fa::User.new(
|
||||
name: name || random_string,
|
||||
@@ -161,8 +167,8 @@ class SpecUtil
|
||||
end
|
||||
|
||||
def self.create_domain_fa_user(...)
|
||||
build_domain_fa_user(...).tap do |user|
|
||||
user.save!
|
||||
build_domain_fa_user(...).tap do |model|
|
||||
model.save!
|
||||
end
|
||||
end
|
||||
|
||||
@@ -174,4 +180,10 @@ class SpecUtil
|
||||
fa_id: fa_id || @last_fa_id,
|
||||
)
|
||||
end
|
||||
|
||||
def self.create_domain_fa_post(...)
|
||||
build_domain_fa_post(...).tap do |model|
|
||||
model.save!
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user