improve post enqueuer speed

Dylan Knutson
2023-04-01 11:22:00 +09:00
parent 92d79a9f9d
commit 7108f33a01
19 changed files with 172 additions and 3197 deletions

View File

@@ -105,3 +105,4 @@ gem "good_job"
gem "neighbor"
gem "disco"
gem "faiss"
# gem "pg_party"

View File

@@ -1,4 +1,4 @@
-class Domain::Fa::Job::FaJobBase < Scraper::JobBase
+class Domain::Fa::Job::Base < Scraper::JobBase
discard_on ActiveJob::DeserializationError
def self.http_factory_method

View File

@@ -1,4 +1,4 @@
-class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::FaJobBase
+class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::Base
queue_as :fa_browse_page
ignore_signature_args :caused_by_entry

View File

@@ -1,4 +1,4 @@
-class Domain::Fa::Job::ScanFileJob < Domain::Fa::Job::FaJobBase
+class Domain::Fa::Job::ScanFileJob < Domain::Fa::Job::Base
queue_as :static_file
ignore_signature_args :caused_by_entry

View File

@@ -1,4 +1,4 @@
-class Domain::Fa::Job::ScanPostJob < Domain::Fa::Job::FaJobBase
+class Domain::Fa::Job::ScanPostJob < Domain::Fa::Job::Base
queue_as :fa_post
ignore_signature_args :caused_by_entry

View File

@@ -1,7 +1,9 @@
# Gather and record all the follows for a user
# This will be used to create an index of follower -> followed
# of a specific user, for recommender training
-class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
+class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
+USERS_PER_FULL_PAGE = Rails.env.test? ? 9 : 190
queue_as :fa_user_follows
ignore_signature_args :caused_by_entry
@@ -104,12 +106,12 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
end
# the last page will have < 200 users, we know we're at the end
-ret = :break if user_list.length < 190
+ret = :break if user_list.length < USERS_PER_FULL_PAGE
@last_in_user_list = user_list.last.url_name
@total_follows_seen += user_list.length
-users_to_create = []
+users_to_create_hashes = []
duration, followed_user_ids = measure do
existing_url_name_to_id = Domain::Fa::User.where(
url_name: user_list.map(&:url_name),
@@ -117,7 +119,7 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
[url_name, id]
end.to_h
-users_to_create = user_list.reject do |user|
+users_to_create_hashes = user_list.reject do |user|
existing_url_name_to_id[user.url_name]
end.map do |user|
{
@@ -128,20 +130,15 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
end
created_user_ids = Domain::Fa::User.upsert_all(
-users_to_create,
+users_to_create_hashes,
unique_by: :url_name,
update_only: :url_name,
returning: %i[id url_name],
).map do |row|
row["id"]
-end unless users_to_create.empty?
+end unless users_to_create_hashes.empty?
-users_to_create.each do |user_hash|
-Domain::Fa::Job::UserPageJob.perform_later(
-url_name: user_hash[:url_name],
-caused_by_entry: best_caused_by_entry,
-)
-end
+enqueue_new_user_pagescan_jobs(users_to_create_hashes)
(created_user_ids || []) + existing_url_name_to_id.values
end
@@ -150,7 +147,7 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
logger.info(
"page #{@page_number.to_s.bold} - " +
"#{user_list.length.to_s.bold} users on page, " +
"created #{users_to_create.size.to_s.bold} " +
"created #{users_to_create_hashes.size.to_s.bold} " +
"(took #{duration_ms.to_s.bold} ms)"
)
@@ -161,6 +158,30 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
ret
end
+def enqueue_new_user_pagescan_jobs(user_hashes)
+user_page_jobs = user_hashes.map do |user_hash|
+Domain::Fa::Job::UserPageJob.new(
+url_name: user_hash[:url_name],
+caused_by_entry: best_caused_by_entry,
+)
+end
+key_mapper = Domain::Fa::Job::UserPageJob.good_job_concurrency_config[:key]
+key_to_job = user_page_jobs.map do |job|
+key = job.instance_eval(&key_mapper)
+job.good_job_concurrency_key = key
+[key, job]
+end.to_h
+existing_keys = GoodJob::Job.where(concurrency_key: key_to_job.keys).pluck(:concurrency_key)
+existing_keys.each do |key|
+key_to_job.delete(key)
+end
+suppress_good_job_concurrency(Domain::Fa::Job::UserPageJob) do
+GoodJob::Bulk.enqueue(key_to_job.values)
+end
+end
def best_caused_by_entry
@first_job_entry || @caused_by_entry
end
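Note: the new enqueue_new_user_pagescan_jobs replaces one perform_later call per new user (each of which runs GoodJob's enqueue-time concurrency check) with a single existence query plus one bulk insert. A minimal standalone sketch of the same pattern, assuming an ActiveJob class configured with good_job_control_concurrency_with(key: ...); the helper name and arguments below are illustrative, not part of the app:

    # Sketch: bulk-enqueue jobs, skipping any whose GoodJob concurrency key
    # is already queued. Assumes GoodJob's Bulk API and its concurrency
    # extension (job.good_job_concurrency_key is that extension's accessor).
    def bulk_enqueue_unique(job_klass, args_list)
      key_proc = job_klass.good_job_concurrency_config[:key]
      # Build the jobs without enqueuing them, computing each key up front.
      key_to_job = args_list.to_h do |args|
        job = job_klass.new(args)
        job.good_job_concurrency_key = job.instance_eval(&key_proc)
        [job.good_job_concurrency_key, job]
      end
      # One query replaces N per-job existence checks.
      GoodJob::Job.where(concurrency_key: key_to_job.keys)
                  .pluck(:concurrency_key)
                  .each { |key| key_to_job.delete(key) }
      GoodJob::Bulk.enqueue(key_to_job.values)
    end

Keying the hash by concurrency key also dedupes within the batch itself. The check against the queue is racy by design (a job enqueued between the pluck and the bulk insert slips through), which is presumably acceptable here since a duplicate user-page scan is idempotent.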

View File

@@ -1,4 +1,4 @@
-class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::FaJobBase
+class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::Base
queue_as :fa_user_gallery
ignore_signature_args :caused_by_entry

View File

@@ -1,4 +1,4 @@
-class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::FaJobBase
+class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::Base
queue_as :fa_user_page
ignore_signature_args :caused_by_entry

View File

@@ -49,6 +49,14 @@ class Scraper::JobBase < ApplicationJob
end,
)
+def suppress_good_job_concurrency(for_klass)
+old_limit = for_klass.good_job_concurrency_config[:total_limit]
+for_klass.good_job_concurrency_config[:total_limit] = nil
+yield
+ensure
+for_klass.good_job_concurrency_config[:total_limit] = old_limit
+end
def write_point(name, tags: {}, fields: {})
Metrics::Reporter.singleton.write_point(
name,
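Note: GoodJob's concurrency extension counts existing jobs at enqueue time when total_limit is set, which would defeat the point of bulk insertion; suppress_good_job_concurrency clears the limit for the duration of the block, and the ensure restores it even if enqueueing raises. A hypothetical call site:

    # Hypothetical usage of the helper above; `jobs` is an array of built,
    # not-yet-enqueued ActiveJob instances with concurrency keys assigned.
    suppress_good_job_concurrency(Domain::Fa::Job::UserPageJob) do
      GoodJob::Bulk.enqueue(jobs)
    end

Since good_job_concurrency_config is class-level state, this assumes no other thread enqueues the same job class while the limit is cleared.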

View File

@@ -1,4 +1,4 @@
-class Domain::Fa::PostEnqueuer
+class Domain::Fa::PostEnqueuer < Domain::Fa::BulkJob
include HasColorLogger
def initialize(start_at:, low_water_mark:, high_water_mark:)
@@ -20,15 +20,29 @@ class Domain::Fa::PostEnqueuer
if already_enqueued <= @low_water_mark
to_enqueue = @high_water_mark - already_enqueued
logger.info("enqueuing #{to_enqueue.to_s.bold} more posts - #{already_enqueued.to_s.bold} already enqueued")
-rows = to_enqueue.times.map do
-@post_iterator.next
-end
-ReduxApplicationRecord.transaction do
-rows.each do |post_id, fa_id|
-Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
-logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
+rows = measure(proc { |p| "gather #{p.length.to_s.bold} posts to enqueue" }) do
+to_enqueue.times.map do
+@post_iterator.next
+end
+end
+key_mapper = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:key]
+jobs = rows.map do |post_id, fa_id|
+logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
+job = Domain::Fa::Job::ScanPostJob.new({ fa_id: fa_id })
+job.good_job_concurrency_key = job.instance_eval(&key_mapper)
+job
+end
+measure("enqueue jobs") do
+old_limit = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit]
+Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = nil
+jobs.each_slice(10) do |slice|
+GoodJob::Bulk.enqueue(slice)
+end
+ensure
+Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = old_limit
+end
throw StopIteration if rows.empty?
else
logger.info(
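Note: the rewritten run_once keeps the enqueuer's water-mark shape but swaps one perform_later per post for sliced bulk inserts with the concurrency limit suppressed. Condensed, the control flow is roughly as below; pending_count and next_rows are hypothetical stand-ins for the GoodJob count query and @post_iterator (which raises StopIteration when the posts are exhausted), and the sketch calls the JobBase helper where the diff inlines the same save/restore, presumably because PostEnqueuer sits outside the Scraper::JobBase hierarchy:

    # Water-mark refill, condensed sketch (not the real method, which also
    # logs per-post and measures timings).
    def run_once
      already_enqueued = pending_count
      return unless already_enqueued <= @low_water_mark
      rows = next_rows(@high_water_mark - already_enqueued)
      key_proc = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:key]
      jobs = rows.map do |_post_id, fa_id|
        job = Domain::Fa::Job::ScanPostJob.new({ fa_id: fa_id })
        job.good_job_concurrency_key = job.instance_eval(&key_proc)
        job
      end
      suppress_good_job_concurrency(Domain::Fa::Job::ScanPostJob) do
        # Slices of 10 keep each bulk INSERT statement small.
        jobs.each_slice(10) { |slice| GoodJob::Bulk.enqueue(slice) }
      end
    end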

View File

@@ -0,0 +1,13 @@
+module DebugHelpers
+def debug_sql
+logger = ActiveRecord::Base.logger
+ActiveRecord::Base.logger = Logger.new($stdout)
+yield
+ensure
+ActiveRecord::Base.logger = logger
+end
+def quiet_color_logger(&block)
+ColorLogger.quiet(&block)
+end
+end
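debug_sql is a block-scoped logger swap: queries are echoed to stdout only for the wrapped code, and the ensure restores the previous logger even if the block raises. For example, inside a spec (hypothetical example body):

    it "looks the user up with a single query" do
      debug_sql do
        Domain::Fa::User.find_by(url_name: "zzreg")  # SQL echoed to stdout
      end
    end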

View File

@@ -28,12 +28,6 @@ describe Domain::Fa::Job::BrowsePageJob do
end
end
shared_context "test queue adapter" do
before do
ActiveJob::Base.queue_adapter = :test
end
end
shared_examples "enqueue post scan" do |expect_to_enqueue|
it "enqueues post scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob)).to match([
@@ -117,7 +111,6 @@ describe Domain::Fa::Job::BrowsePageJob do
end
context "with no posts found on page" do
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [
@@ -139,7 +132,6 @@ describe Domain::Fa::Job::BrowsePageJob do
context "with one unseen post" do
include_context "user and post getters"
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [
@@ -193,7 +185,6 @@ describe Domain::Fa::Job::BrowsePageJob do
context "with one seen post" do
include_context "user and post getters"
include_context "create user and post"
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [
@@ -277,7 +268,6 @@ describe Domain::Fa::Job::BrowsePageJob do
end
context "with a page that responds with an error" do
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [

View File

@@ -1,6 +1,8 @@
require "rails_helper"
describe Domain::Fa::Job::UserFollowsJob do
+FOLLOWS_ON_ZZREG_PAGE = 10
let(:http_client_mock) { instance_double("::Scraper::HttpClient") }
let(:set_zzreg_http_mock) {
proc {
@@ -26,7 +28,6 @@ describe Domain::Fa::Job::UserFollowsJob do
}
before do
-ActiveJob::Base.queue_adapter = :test
Scraper::ClientFactory.http_client_mock = http_client_mock
@zzreg_mock_log_entries = set_zzreg_http_mock.call
end
@@ -39,7 +40,7 @@ describe Domain::Fa::Job::UserFollowsJob do
it "creates the right follows" do
e = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
user.reload
-expect(user.follows.length).to eq(770)
+expect(user.follows.length).to eq(FOLLOWS_ON_ZZREG_PAGE)
expect(user.scanned_follows_at).to_not be_nil
end
end
@@ -49,7 +50,7 @@ describe Domain::Fa::Job::UserFollowsJob do
expect do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
expect(ret).to_not be_a(Exception)
-end.to change { Domain::Fa::User.count }.by(771)
+end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE + 1)
end
it "enqueues a user page job" do
@@ -68,13 +69,13 @@ describe Domain::Fa::Job::UserFollowsJob do
it "can be performed by url_name" do
expect do
Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
-end.to change { Domain::Fa::User.count }.by(770)
+end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE)
end
it "can be performed by direct post object" do
expect do
Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
-end.to change { Domain::Fa::User.count }.by(770)
+end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE)
end
it "does not enqueue a user page job" do
@@ -114,7 +115,7 @@ describe Domain::Fa::Job::UserFollowsJob do
expect do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
-end.to change { Domain::Fa::User.count }.by(769)
+end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE - 1)
followed.reload
expect(followed.num_submissions).to eq(10)
@@ -125,18 +126,18 @@ describe Domain::Fa::Job::UserFollowsJob do
expect(followed.state_detail).to eq({})
# newly created users should have the right 'first_seen_entry' id
-stripes_user = Domain::Fa::User.find_by url_name: "stripes"
-expect(stripes_user).to_not be_nil
-expect(stripes_user.state_detail["first_seen_entry"]).to eq(@zzreg_mock_log_entries[0].id)
+accelo_user = Domain::Fa::User.find_by url_name: "accelo"
+expect(accelo_user).to_not be_nil
+expect(accelo_user.state_detail["first_seen_entry"]).to eq(@zzreg_mock_log_entries[0].id)
end
it "newly inserted users have a name associated with them" do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
user = Domain::Fa::User.find_by(url_name: "aimi")
user = Domain::Fa::User.find_by(url_name: "accelo")
expect(user).to_not be_nil
expect(user.name).to eq("Aimi")
expect(user.name).to eq("Accelo")
end
include_examples "zzreg follow creation"
@@ -165,7 +166,7 @@ describe Domain::Fa::Job::UserFollowsJob do
expect(ret).to_not be_a(Exception)
user.reload
-expect(user.follows.length).to eq(770)
+expect(user.follows.length).to eq(FOLLOWS_ON_ZZREG_PAGE)
expect(user.follows.where(followed: smaz_user).first).to be_nil
expect(user.follows.where(followed: agi_type01_user).first).to eq(follow_2)
@@ -180,8 +181,21 @@ describe Domain::Fa::Job::UserFollowsJob do
# newly created users are enqueued by url name
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).find do |job|
job[:args][0][:url_name] == "meesh"
job[:args][0][:url_name] == "accelo"
end).to_not be_nil
end
it "does not enqueue a job if the user is not new" do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).length).to eq(FOLLOWS_ON_ZZREG_PAGE - 1)
end
it "does not enqueue jobs already in the queue" do
Domain::Fa::Job::UserPageJob.perform_later({ url_name: "accelo" })
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).length).to eq(FOLLOWS_ON_ZZREG_PAGE - 1)
end
end
end

View File

@@ -1,6 +1,7 @@
require "rails_helper"
describe Domain::Fa::PostEnqueuer do
it "works" do
+ActiveJob::Base.queue_adapter = :test
creator = SpecUtil.build_domain_fa_user
creator.save!
post_fa_ids = 7.times.map do
@@ -25,6 +26,11 @@ describe Domain::Fa::PostEnqueuer do
expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
+# jobs should have concurrency keys
+expect(
+SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).first[:good_job].concurrency_key
+).to_not be_nil
enqueuer.run_once
expect(enqueued_fa_ids.call).to eq(post_fa_ids[1...5])
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
@@ -33,9 +39,7 @@ describe Domain::Fa::PostEnqueuer do
expect(enqueued_fa_ids.call).to eq(post_fa_ids[2...7])
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob, 3)
-expect do
-enqueuer.run_once
-end.to raise_exception(StopIteration)
+expect { enqueuer.run_once }.to raise_exception(StopIteration)
expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
end
end

View File

@@ -1,6 +1,7 @@
require "rails_helper"
describe Domain::Fa::UserEnqueuer do
it "works" do
+ActiveJob::Base.queue_adapter = :test
users = 7.times.map do
SpecUtil.create_domain_fa_user
end

View File

@@ -14,16 +14,26 @@
#
# See https://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration
require "./spec/helpers/twitter_helpers"
require "./spec/helpers/debug_helpers"
RSpec.configure do |config|
config.include TwitterHelpers
+config.include DebugHelpers
# can tag classes with `quiet: false` to make ColorLogger loud
config.around(:each) do |example|
if example.example.metadata[:quiet].is_a?(FalseClass)
example.call
else
ColorLogger.quiet(&example)
quiet_color_logger(&example)
end
end
+config.around(:each) do |example|
+if example.example.metadata[:debug_sql]
+debug_sql(&example)
+else
+example.call
+end
+end
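With both around hooks in place, the helpers become per-example toggles driven by RSpec metadata: quiet: false lets ColorLogger output through, and debug_sql: true echoes every query for just that example. Hypothetical usage:

    # Opt one example into noisy output while debugging.
    it "creates the right follows", quiet: false, debug_sql: true do
      Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
    end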

View File

@@ -130,29 +130,48 @@ class SpecUtil
end
def self.enqueued_jobs(job_klass = nil)
-unless ::ActiveJob::QueueAdapters::TestAdapter === ::ActiveJob::Base.queue_adapter
-raise StandardError, "set `ActiveJob::Base.queue_adapter = :test`"
-end
+# unless ::ActiveJob::QueueAdapters::TestAdapter === ::ActiveJob::Base.queue_adapter
+# raise StandardError, "set `ActiveJob::Base.queue_adapter = :test`"
+# end
-jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
-jobs.filter do |job|
+GoodJob::Job.all.map do |job|
+{
+job: job.job_class.constantize,
+queue: job.queue_name,
+priority: job.priority,
+args: ::ActiveJob::Arguments.deserialize(job.serialized_params["arguments"]),
+good_job: job,
+}
+end.filter do |job|
if job_klass
job[:job] == job_klass
else
true
end
-end.map do |job|
-job.slice(:job, :queue, :priority).merge({
-args: ::ActiveJob::Arguments.deserialize(job[:args]),
-})
-end
+# jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
+# jobs.filter do |job|
+# if job_klass
+# job[:job] == job_klass
+# else
+# true
+# end
+# end.map do |job|
+# job.slice(:job, :queue, :priority).merge({
+# args: ::ActiveJob::Arguments.deserialize(job[:args]),
+# })
+# end
end
def self.shift_jobs(job_klass, by = 1)
-jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
+# jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
+# by.times do
+# index = jobs.index { |job| job[:job] == job_klass }
+# jobs.delete_at(index)
+# end
by.times do
-index = jobs.index { |job| job[:job] == job_klass }
-jobs.delete_at(index)
+job = enqueued_jobs.find { |job| job[:job] == job_klass }
+job[:good_job].destroy if job
end
end
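SpecUtil.enqueued_jobs now reads rows from the good_jobs table rather than the test adapter's in-memory array, which is what lets the PostEnqueuer spec above assert on concurrency_key, and shift_jobs correspondingly "performs" a job by deleting its row. A hypothetical assertion against the returned hash shape:

    job = SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).first
    expect(job[:queue]).to eq("fa_post")                   # from queue_as :fa_post
    expect(job[:good_job].concurrency_key).to_not be_nil   # row from good_jobs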

File diff suppressed because it is too large

View File

@@ -253,7 +253,7 @@ class Domain::Fa::Parser::ReduxPageTest < ActiveSupport::TestCase
def test_watchlist_zzreg
parser = get_parser("watchlist_zzreg.html", require_logged_in: false)
user_list = parser.user_list
-assert_equal 770, user_list.length
+assert_equal 10, user_list.length
assert_equal "-creeps", user_list[0].url_name
assert_equal "-creeps", user_list[0].name