task for filling in fa post holes

This commit is contained in:
Dylan Knutson
2023-04-02 20:51:29 +09:00
parent 450a5844eb
commit 4c774faafd
3 changed files with 149 additions and 73 deletions

View File

@@ -3,17 +3,26 @@ class Domain::Fa::PostEnqueuer
include HasColorLogger
include HasMeasureDuration
def initialize(start_at:, low_water_mark:, high_water_mark:)
def initialize(reverse_scan_holes:, start_at:, low_water_mark:, high_water_mark:)
@low_water_mark = low_water_mark
@high_water_mark = high_water_mark
raise if @high_water_mark <= @low_water_mark
@post_iterator = Enumerator.new do |e|
Domain::Fa::Post.
where("id >= ?", start_at).
where("file_id is null").
where(state: "ok").
pluck_each(:id, :fa_id, :file_url_str) do |p|
e << p
if reverse_scan_holes
while start_at > 0
if !Domain::Fa::Post.exists?(fa_id: start_at)
e << [nil, start_at, nil]
end
start_at -= 1
end
else
Domain::Fa::Post.
where("id >= ?", start_at).
where("file_id is null").
where(state: "ok").
pluck_each(:id, :fa_id, :file_url_str) do |p|
e << p
end
end
end
end
@@ -26,7 +35,9 @@ class Domain::Fa::PostEnqueuer
rows = measure(proc { |p| "gather #{p.length.to_s.bold} posts to enqueue" }) do
to_enqueue.times.map do
@post_iterator.next
end
rescue StopIteration
nil
end.reject(&:nil?)
end
measure("enqueue jobs") do
@@ -34,15 +45,22 @@ class Domain::Fa::PostEnqueuer
rows.each do |post_id, fa_id, file_url_str|
if file_url_str.nil?
Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
logger.info "post scan #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
action = "page scan"
else
Domain::Fa::Job::ScanFileJob.perform_later({ fa_id: fa_id })
logger.info "file scan #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
action = "file scan"
end
if post_id.nil?
model_info = "(not seen)"
else
model_info = "post #{post_id.to_s.bold}"
end
logger.info "[#{action}] [#{model_info}] (fa_id: #{fa_id.to_s.bold})"
end
end
end
throw StopIteration if rows.empty?
raise StopIteration if rows.empty?
else
logger.info(
"#{already_enqueued.to_s.bold} already enqueued (max #{@high_water_mark.to_s.bold}) - " +

View File

@@ -7,6 +7,25 @@ namespace :fa do
poll_duration = 10
enqueuer = Domain::Fa::PostEnqueuer.new(
reverse_scan_holes: false,
start_at: start_at,
low_water_mark: low_water_mark,
high_water_mark: high_water_mark,
)
loop do
sleep poll_duration if enqueuer.run_once == :sleep
end
end
task :enqueue_missing_posts => [:set_logger_stdout, :environment] do |t, args|
start_at = ENV["start_at"]&.to_i || raise("need start_at (highest fa_id already present)")
low_water_mark = 50
high_water_mark = 300
poll_duration = 10
enqueuer = Domain::Fa::PostEnqueuer.new(
reverse_scan_holes: true,
start_at: start_at,
low_water_mark: low_water_mark,
high_water_mark: high_water_mark,

View File

@@ -2,18 +2,6 @@ require "rails_helper"
describe Domain::Fa::PostEnqueuer do
let(:creator) { SpecUtil.create_domain_fa_user }
let!(:posts) do
no_file_url = 4.times.map do
SpecUtil.create_domain_fa_post(creator: creator)
end
with_file_url = 3.times.map do
SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
p.file_url_str = "https://www.example.com/img.jpg"
p.save!
end
end
no_file_url + with_file_url
end
let(:job_klasses) {
[
Domain::Fa::Job::ScanPostJob,
@@ -34,63 +22,114 @@ describe Domain::Fa::PostEnqueuer do
end
end
end
let(:enqueuer) do
Domain::Fa::PostEnqueuer.new(
start_at: 0,
high_water_mark: 5,
low_water_mark: 3,
)
context "forward scanning posts missing file" do
let!(:posts) do
no_file_url = 4.times.map do
SpecUtil.create_domain_fa_post(creator: creator)
end
with_file_url = 3.times.map do
SpecUtil.build_domain_fa_post(creator: creator).tap do |p|
p.file_url_str = "https://www.example.com/img.jpg"
p.save!
end
end
no_file_url + with_file_url
end
let(:enqueuer) do
Domain::Fa::PostEnqueuer.new(
reverse_scan_holes: false,
start_at: 0,
high_water_mark: 5,
low_water_mark: 3,
)
end
it "enqueues posts" do
post_fa_ids = posts.map(&:fa_id)
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
expect(enqueued_fa_jobs.call).to eq(
[Domain::Fa::Job::ScanPostJob] * 4 +
[Domain::Fa::Job::ScanFileJob] * 1
)
SpecUtil.shift_jobs
# jobs should have concurrency keys
expect(
SpecUtil.enqueued_jobs(job_klasses).first[:good_job].concurrency_key
).to_not be_nil
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq(post_fa_ids[1...5])
expect(enqueued_fa_jobs.call).to eq(
[Domain::Fa::Job::ScanPostJob] * 3 +
[Domain::Fa::Job::ScanFileJob] * 1
)
SpecUtil.shift_jobs
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq(post_fa_ids[2...7])
expect(enqueued_fa_jobs.call).to eq(
[Domain::Fa::Job::ScanPostJob] * 2 +
[Domain::Fa::Job::ScanFileJob] * 3
)
SpecUtil.shift_jobs(job_klasses, 3)
expect { enqueuer.run_once }.to raise_exception(StopIteration)
expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
end
it "does not enqueue posts which are already in the queue" do
post_fa_ids = posts.map(&:fa_id)
Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: post_fa_ids[1] })
expect(enqueued_fa_ids.call).to eq([post_fa_ids[1]])
# post [1] should be filtered out
enqueuer.run_once
expect(enqueued_fa_ids.call[0]).to eq(post_fa_ids[1])
expect(enqueued_fa_ids.call[1..].shuffle).to contain_exactly(
post_fa_ids[0],
post_fa_ids[2],
post_fa_ids[3],
)
end
end
it "enqueues posts" do
post_fa_ids = posts.map(&:fa_id)
context "reverse enqueues missing models" do
let!(:post1) do
SpecUtil.create_domain_fa_post(creator: creator, fa_id: 3)
end
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
expect(enqueued_fa_jobs.call).to eq(
[Domain::Fa::Job::ScanPostJob] * 4 +
[Domain::Fa::Job::ScanFileJob] * 1
)
SpecUtil.shift_jobs(job_klasses)
let(:enqueuer) do
Domain::Fa::PostEnqueuer.new(
reverse_scan_holes: true,
start_at: 7,
high_water_mark: 5,
low_water_mark: 2,
)
end
# jobs should have concurrency keys
expect(
SpecUtil.enqueued_jobs(job_klasses).first[:good_job].concurrency_key
).to_not be_nil
it "works" do
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq([7, 6, 5, 4, 2])
SpecUtil.shift_jobs
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq(post_fa_ids[1...5])
expect(enqueued_fa_jobs.call).to eq(
[Domain::Fa::Job::ScanPostJob] * 3 +
[Domain::Fa::Job::ScanFileJob] * 1
)
SpecUtil.shift_jobs(job_klasses)
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq([6, 5, 4, 2])
SpecUtil.shift_jobs
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq(post_fa_ids[2...7])
expect(enqueued_fa_jobs.call).to eq(
[Domain::Fa::Job::ScanPostJob] * 2 +
[Domain::Fa::Job::ScanFileJob] * 3
)
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq([5, 4, 2])
SpecUtil.shift_jobs
SpecUtil.shift_jobs(job_klasses, 3)
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq([4, 2, 1])
SpecUtil.shift_jobs
expect { enqueuer.run_once }.to raise_exception(StopIteration)
expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
end
it "does not enqueue posts which are already in the queue" do
post_fa_ids = posts.map(&:fa_id)
Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: post_fa_ids[1] })
expect(enqueued_fa_ids.call).to eq([post_fa_ids[1]])
# post [1] should be filtered out
enqueuer.run_once
expect(enqueued_fa_ids.call[0]).to eq(post_fa_ids[1])
expect(enqueued_fa_ids.call[1..].shuffle).to contain_exactly(
post_fa_ids[0],
post_fa_ids[2],
post_fa_ids[3],
)
expect { enqueuer.run_once }.to raise_exception(StopIteration)
end
end
end