pluck instead of full object select
This commit is contained in:
2
Gemfile
2
Gemfile
@@ -99,5 +99,5 @@ gem "influxdb-client"
|
||||
gem "discard"
|
||||
gem "concurrent-ruby-ext", require: "concurrent"
|
||||
gem "concurrent-ruby-edge", require: "concurrent-edge"
|
||||
|
||||
gem "pluck_each"
|
||||
gem "good_job"
|
||||
|
||||
12
Gemfile.lock
12
Gemfile.lock
@@ -117,7 +117,6 @@ GEM
|
||||
erubi (1.12.0)
|
||||
et-orbi (1.2.7)
|
||||
tzinfo
|
||||
ffi (1.15.5)
|
||||
fugit (1.8.1)
|
||||
et-orbi (~> 1, >= 1.2.7)
|
||||
raabro (~> 1.4)
|
||||
@@ -157,9 +156,6 @@ GEM
|
||||
activerecord
|
||||
kaminari-core (= 1.2.2)
|
||||
kaminari-core (1.2.2)
|
||||
listen (3.8.0)
|
||||
rb-fsevent (~> 0.10, >= 0.10.3)
|
||||
rb-inotify (~> 0.9, >= 0.9.10)
|
||||
loofah (2.19.1)
|
||||
crass (~> 1.0.2)
|
||||
nokogiri (>= 1.5.9)
|
||||
@@ -189,6 +185,9 @@ GEM
|
||||
mini_portile2 (~> 2.8.0)
|
||||
racc (~> 1.4)
|
||||
pg (1.4.5)
|
||||
pluck_each (0.2.0)
|
||||
activerecord (> 3.2.0)
|
||||
activesupport (> 3.0.0)
|
||||
pry (0.14.2)
|
||||
coderay (~> 1.1)
|
||||
method_source (~> 1.0)
|
||||
@@ -232,9 +231,6 @@ GEM
|
||||
thor (~> 1.0)
|
||||
zeitwerk (~> 2.5)
|
||||
rake (13.0.6)
|
||||
rb-fsevent (0.11.2)
|
||||
rb-inotify (0.10.1)
|
||||
ffi (~> 1.0)
|
||||
regexp_parser (2.6.2)
|
||||
reline (0.3.2)
|
||||
io-console (~> 0.5)
|
||||
@@ -328,9 +324,9 @@ DEPENDENCIES
|
||||
influxdb-client
|
||||
jbuilder
|
||||
kaminari
|
||||
listen
|
||||
nokogiri
|
||||
pg
|
||||
pluck_each
|
||||
pry
|
||||
pry-stack_explorer
|
||||
puma (~> 5.0)
|
||||
|
||||
@@ -5,10 +5,14 @@ class FaPostEnqueuer
|
||||
@low_water_mark = low_water_mark
|
||||
@high_water_mark = high_water_mark
|
||||
raise if @high_water_mark <= @low_water_mark
|
||||
@post_iterator = Domain::Fa::Post.
|
||||
where(file_url_str: nil, state: "ok").
|
||||
select(:id, :fa_id, :file_url_str, :state, :state_detail, :log_entry_detail).
|
||||
find_each(start: start_at)
|
||||
@post_iterator = Enumerator.new do |e|
|
||||
Domain::Fa::Post.
|
||||
where(file_url_str: nil, state: "ok").
|
||||
where("id >= ?", start_at).
|
||||
pluck_each(:id, :fa_id) do |p|
|
||||
e << p
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def run_once
|
||||
@@ -16,13 +20,16 @@ class FaPostEnqueuer
|
||||
if already_enqueued <= @low_water_mark
|
||||
to_enqueue = @high_water_mark - already_enqueued
|
||||
logger.info("enqueuing #{to_enqueue.to_s.bold} more posts - #{already_enqueued.to_s.bold} already enqueued")
|
||||
to_enqueue.times do
|
||||
GoodJob::Bulk.enqueue do
|
||||
post = @post_iterator.next
|
||||
Domain::Fa::Job::ScanPostJob.perform_later(post: post)
|
||||
logger.info "enqueue #{post.id.to_s.bold} (fa_id: #{post.fa_id.to_s.bold})"
|
||||
rows = to_enqueue.times.map do
|
||||
@post_iterator.next
|
||||
end
|
||||
ReduxApplicationRecord.transaction do
|
||||
rows.each do |post_id, fa_id|
|
||||
Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
|
||||
logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
|
||||
end
|
||||
end
|
||||
throw StopIteration if rows.empty?
|
||||
else
|
||||
logger.info(
|
||||
"#{already_enqueued.to_s.bold} already enqueued (max #{@high_water_mark.to_s.bold}) - " +
|
||||
|
||||
@@ -3,11 +3,11 @@ describe FaPostEnqueuer do
|
||||
ActiveJob::Base.queue_adapter = :test
|
||||
creator = SpecUtil.build_domain_fa_user
|
||||
creator.save!
|
||||
posts = 7.times.map do
|
||||
post_fa_ids = 7.times.map do
|
||||
post = SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
|
||||
p.save!
|
||||
end
|
||||
end
|
||||
end.map(&:fa_id)
|
||||
|
||||
enqueuer = FaPostEnqueuer.new(
|
||||
start_at: 0,
|
||||
@@ -15,21 +15,27 @@ describe FaPostEnqueuer do
|
||||
low_water_mark: 3,
|
||||
)
|
||||
|
||||
enqueued_fa_ids = proc do
|
||||
SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).map do |job|
|
||||
job[:args][0][:fa_id]
|
||||
end
|
||||
end
|
||||
|
||||
enqueuer.run_once
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(5)
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
|
||||
|
||||
enqueuer.run_once
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(4)
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[1...5])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
|
||||
|
||||
enqueuer.run_once
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(5)
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[2...7])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob, 3)
|
||||
|
||||
expect do
|
||||
enqueuer.run_once
|
||||
end.to raise_exception(StopIteration)
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to eq(2)
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user