improve post enqueuer speed
This commit is contained in:
1
Gemfile
1
Gemfile
@@ -105,3 +105,4 @@ gem "good_job"
|
||||
gem "neighbor"
|
||||
gem "disco"
|
||||
gem "faiss"
|
||||
# gem "pg_party"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
class Domain::Fa::Job::FaJobBase < Scraper::JobBase
|
||||
class Domain::Fa::Job::Base < Scraper::JobBase
|
||||
discard_on ActiveJob::DeserializationError
|
||||
|
||||
def self.http_factory_method
|
||||
@@ -1,4 +1,4 @@
|
||||
class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::FaJobBase
|
||||
class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::Base
|
||||
queue_as :fa_browse_page
|
||||
ignore_signature_args :caused_by_entry
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
class Domain::Fa::Job::ScanFileJob < Domain::Fa::Job::FaJobBase
|
||||
class Domain::Fa::Job::ScanFileJob < Domain::Fa::Job::Base
|
||||
queue_as :static_file
|
||||
ignore_signature_args :caused_by_entry
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
class Domain::Fa::Job::ScanPostJob < Domain::Fa::Job::FaJobBase
|
||||
class Domain::Fa::Job::ScanPostJob < Domain::Fa::Job::Base
|
||||
queue_as :fa_post
|
||||
ignore_signature_args :caused_by_entry
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
# Gather and record all the follows for a user
|
||||
# This will be used to create an index of follower -> followed
|
||||
# of a specific user, for recommender training
|
||||
class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
|
||||
class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::Base
|
||||
USERS_PER_FULL_PAGE = Rails.env.test? ? 9 : 190
|
||||
|
||||
queue_as :fa_user_follows
|
||||
ignore_signature_args :caused_by_entry
|
||||
|
||||
@@ -104,12 +106,12 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
|
||||
end
|
||||
|
||||
# the last page will have < 200 users, we know we're at the end
|
||||
ret = :break if user_list.length < 190
|
||||
ret = :break if user_list.length < USERS_PER_FULL_PAGE
|
||||
|
||||
@last_in_user_list = user_list.last.url_name
|
||||
@total_follows_seen += user_list.length
|
||||
|
||||
users_to_create = []
|
||||
users_to_create_hashes = []
|
||||
duration, followed_user_ids = measure do
|
||||
existing_url_name_to_id = Domain::Fa::User.where(
|
||||
url_name: user_list.map(&:url_name),
|
||||
@@ -117,7 +119,7 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
|
||||
[url_name, id]
|
||||
end.to_h
|
||||
|
||||
users_to_create = user_list.reject do |user|
|
||||
users_to_create_hashes = user_list.reject do |user|
|
||||
existing_url_name_to_id[user.url_name]
|
||||
end.map do |user|
|
||||
{
|
||||
@@ -128,20 +130,15 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
|
||||
end
|
||||
|
||||
created_user_ids = Domain::Fa::User.upsert_all(
|
||||
users_to_create,
|
||||
users_to_create_hashes,
|
||||
unique_by: :url_name,
|
||||
update_only: :url_name,
|
||||
returning: %i[id url_name],
|
||||
).map do |row|
|
||||
row["id"]
|
||||
end unless users_to_create.empty?
|
||||
end unless users_to_create_hashes.empty?
|
||||
|
||||
users_to_create.each do |user_hash|
|
||||
Domain::Fa::Job::UserPageJob.perform_later(
|
||||
url_name: user_hash[:url_name],
|
||||
caused_by_entry: best_caused_by_entry,
|
||||
)
|
||||
end
|
||||
enqueue_new_user_pagescan_jobs(users_to_create_hashes)
|
||||
|
||||
(created_user_ids || []) + existing_url_name_to_id.values
|
||||
end
|
||||
@@ -150,7 +147,7 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
|
||||
logger.info(
|
||||
"page #{@page_number.to_s.bold} - " +
|
||||
"#{user_list.length.to_s.bold} users on page, " +
|
||||
"created #{users_to_create.size.to_s.bold} " +
|
||||
"created #{users_to_create_hashes.size.to_s.bold} " +
|
||||
"(took #{duration_ms.to_s.bold} ms)"
|
||||
)
|
||||
|
||||
@@ -161,6 +158,30 @@ class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
|
||||
ret
|
||||
end
|
||||
|
||||
def enqueue_new_user_pagescan_jobs(user_hashes)
|
||||
user_page_jobs = user_hashes.map do |user_hash|
|
||||
Domain::Fa::Job::UserPageJob.new(
|
||||
url_name: user_hash[:url_name],
|
||||
caused_by_entry: best_caused_by_entry,
|
||||
)
|
||||
end
|
||||
|
||||
key_mapper = Domain::Fa::Job::UserPageJob.good_job_concurrency_config[:key]
|
||||
key_to_job = user_page_jobs.map do |job|
|
||||
key = job.instance_eval(&key_mapper)
|
||||
job.good_job_concurrency_key = key
|
||||
[key, job]
|
||||
end.to_h
|
||||
existing_keys = GoodJob::Job.where(concurrency_key: key_to_job.keys).pluck(:concurrency_key)
|
||||
existing_keys.each do |key|
|
||||
key_to_job.delete(key)
|
||||
end
|
||||
|
||||
suppress_good_job_concurrency(Domain::Fa::Job::UserPageJob) do
|
||||
GoodJob::Bulk.enqueue(key_to_job.values)
|
||||
end
|
||||
end
|
||||
|
||||
def best_caused_by_entry
|
||||
@first_job_entry || @caused_by_entry
|
||||
end
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::FaJobBase
|
||||
class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::Base
|
||||
queue_as :fa_user_gallery
|
||||
ignore_signature_args :caused_by_entry
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::FaJobBase
|
||||
class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::Base
|
||||
queue_as :fa_user_page
|
||||
ignore_signature_args :caused_by_entry
|
||||
|
||||
|
||||
@@ -49,6 +49,14 @@ class Scraper::JobBase < ApplicationJob
|
||||
end,
|
||||
)
|
||||
|
||||
def suppress_good_job_concurrency(for_klass)
|
||||
old_limit = for_klass.good_job_concurrency_config[:total_limit]
|
||||
for_klass.good_job_concurrency_config[:total_limit] = nil
|
||||
yield
|
||||
ensure
|
||||
for_klass.good_job_concurrency_config[:total_limit] = old_limit
|
||||
end
|
||||
|
||||
def write_point(name, tags: {}, fields: {})
|
||||
Metrics::Reporter.singleton.write_point(
|
||||
name,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
class Domain::Fa::PostEnqueuer
|
||||
class Domain::Fa::PostEnqueuer < Domain::Fa::BulkJob
|
||||
include HasColorLogger
|
||||
|
||||
def initialize(start_at:, low_water_mark:, high_water_mark:)
|
||||
@@ -20,15 +20,29 @@ class Domain::Fa::PostEnqueuer
|
||||
if already_enqueued <= @low_water_mark
|
||||
to_enqueue = @high_water_mark - already_enqueued
|
||||
logger.info("enqueuing #{to_enqueue.to_s.bold} more posts - #{already_enqueued.to_s.bold} already enqueued")
|
||||
rows = to_enqueue.times.map do
|
||||
@post_iterator.next
|
||||
end
|
||||
ReduxApplicationRecord.transaction do
|
||||
rows.each do |post_id, fa_id|
|
||||
Domain::Fa::Job::ScanPostJob.perform_later({ fa_id: fa_id })
|
||||
logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
|
||||
rows = measure(proc { |p| "gather #{p.length.to_s.bold} posts to enqueue" }) do
|
||||
to_enqueue.times.map do
|
||||
@post_iterator.next
|
||||
end
|
||||
end
|
||||
|
||||
key_mapper = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:key]
|
||||
jobs = rows.map do |post_id, fa_id|
|
||||
logger.info "enqueue #{post_id.to_s.bold} (fa_id: #{fa_id.to_s.bold})"
|
||||
job = Domain::Fa::Job::ScanPostJob.new({ fa_id: fa_id })
|
||||
job.good_job_concurrency_key = job.instance_eval(&key_mapper)
|
||||
job
|
||||
end
|
||||
|
||||
measure("enqueue jobs") do
|
||||
old_limit = Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit]
|
||||
Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = nil
|
||||
jobs.each_slice(10) do |slice|
|
||||
GoodJob::Bulk.enqueue(slice)
|
||||
end
|
||||
ensure
|
||||
Domain::Fa::Job::ScanPostJob.good_job_concurrency_config[:total_limit] = old_limit
|
||||
end
|
||||
throw StopIteration if rows.empty?
|
||||
else
|
||||
logger.info(
|
||||
|
||||
13
spec/helpers/debug_helpers.rb
Normal file
13
spec/helpers/debug_helpers.rb
Normal file
@@ -0,0 +1,13 @@
|
||||
module DebugHelpers
|
||||
def debug_sql
|
||||
logger = ActiveRecord::Base.logger
|
||||
ActiveRecord::Base.logger = Logger.new($stdout)
|
||||
yield
|
||||
ensure
|
||||
ActiveRecord::Base.logger = logger
|
||||
end
|
||||
|
||||
def quiet_color_logger(&block)
|
||||
ColorLogger.quiet(&block)
|
||||
end
|
||||
end
|
||||
@@ -28,12 +28,6 @@ describe Domain::Fa::Job::BrowsePageJob do
|
||||
end
|
||||
end
|
||||
|
||||
shared_context "test queue adapter" do
|
||||
before do
|
||||
ActiveJob::Base.queue_adapter = :test
|
||||
end
|
||||
end
|
||||
|
||||
shared_examples "enqueue post scan" do |expect_to_enqueue|
|
||||
it "enqueues post scan job" do
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob)).to match([
|
||||
@@ -117,7 +111,6 @@ describe Domain::Fa::Job::BrowsePageJob do
|
||||
end
|
||||
|
||||
context "with no posts found on page" do
|
||||
include_context "test queue adapter"
|
||||
let! :log_entries do
|
||||
SpecUtil.init_http_client_mock(
|
||||
http_client_mock, [
|
||||
@@ -139,7 +132,6 @@ describe Domain::Fa::Job::BrowsePageJob do
|
||||
|
||||
context "with one unseen post" do
|
||||
include_context "user and post getters"
|
||||
include_context "test queue adapter"
|
||||
let! :log_entries do
|
||||
SpecUtil.init_http_client_mock(
|
||||
http_client_mock, [
|
||||
@@ -193,7 +185,6 @@ describe Domain::Fa::Job::BrowsePageJob do
|
||||
context "with one seen post" do
|
||||
include_context "user and post getters"
|
||||
include_context "create user and post"
|
||||
include_context "test queue adapter"
|
||||
let! :log_entries do
|
||||
SpecUtil.init_http_client_mock(
|
||||
http_client_mock, [
|
||||
@@ -277,7 +268,6 @@ describe Domain::Fa::Job::BrowsePageJob do
|
||||
end
|
||||
|
||||
context "with a page that responds with an error" do
|
||||
include_context "test queue adapter"
|
||||
let! :log_entries do
|
||||
SpecUtil.init_http_client_mock(
|
||||
http_client_mock, [
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
require "rails_helper"
|
||||
|
||||
describe Domain::Fa::Job::UserFollowsJob do
|
||||
FOLLOWS_ON_ZZREG_PAGE = 10
|
||||
|
||||
let(:http_client_mock) { instance_double("::Scraper::HttpClient") }
|
||||
let(:set_zzreg_http_mock) {
|
||||
proc {
|
||||
@@ -26,7 +28,6 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
}
|
||||
|
||||
before do
|
||||
ActiveJob::Base.queue_adapter = :test
|
||||
Scraper::ClientFactory.http_client_mock = http_client_mock
|
||||
@zzreg_mock_log_entries = set_zzreg_http_mock.call
|
||||
end
|
||||
@@ -39,7 +40,7 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
it "creates the right follows" do
|
||||
e = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
|
||||
user.reload
|
||||
expect(user.follows.length).to eq(770)
|
||||
expect(user.follows.length).to eq(FOLLOWS_ON_ZZREG_PAGE)
|
||||
expect(user.scanned_follows_at).to_not be_nil
|
||||
end
|
||||
end
|
||||
@@ -49,7 +50,7 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
expect do
|
||||
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
|
||||
expect(ret).to_not be_a(Exception)
|
||||
end.to change { Domain::Fa::User.count }.by(771)
|
||||
end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE + 1)
|
||||
end
|
||||
|
||||
it "enqueues a user page job" do
|
||||
@@ -68,13 +69,13 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
it "can be performed by url_name" do
|
||||
expect do
|
||||
Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
|
||||
end.to change { Domain::Fa::User.count }.by(770)
|
||||
end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE)
|
||||
end
|
||||
|
||||
it "can be performed by direct post object" do
|
||||
expect do
|
||||
Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
|
||||
end.to change { Domain::Fa::User.count }.by(770)
|
||||
end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE)
|
||||
end
|
||||
|
||||
it "does not enqueue a user page job" do
|
||||
@@ -114,7 +115,7 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
expect do
|
||||
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
|
||||
expect(ret).to_not be_a(Exception)
|
||||
end.to change { Domain::Fa::User.count }.by(769)
|
||||
end.to change { Domain::Fa::User.count }.by(FOLLOWS_ON_ZZREG_PAGE - 1)
|
||||
|
||||
followed.reload
|
||||
expect(followed.num_submissions).to eq(10)
|
||||
@@ -125,18 +126,18 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
expect(followed.state_detail).to eq({})
|
||||
|
||||
# newly created users should have the right 'first_seen_entry' id
|
||||
stripes_user = Domain::Fa::User.find_by url_name: "stripes"
|
||||
expect(stripes_user).to_not be_nil
|
||||
expect(stripes_user.state_detail["first_seen_entry"]).to eq(@zzreg_mock_log_entries[0].id)
|
||||
accelo_user = Domain::Fa::User.find_by url_name: "accelo"
|
||||
expect(accelo_user).to_not be_nil
|
||||
expect(accelo_user.state_detail["first_seen_entry"]).to eq(@zzreg_mock_log_entries[0].id)
|
||||
end
|
||||
|
||||
it "newly inserted users have a name associated with them" do
|
||||
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
|
||||
expect(ret).to_not be_a(Exception)
|
||||
|
||||
user = Domain::Fa::User.find_by(url_name: "aimi")
|
||||
user = Domain::Fa::User.find_by(url_name: "accelo")
|
||||
expect(user).to_not be_nil
|
||||
expect(user.name).to eq("Aimi")
|
||||
expect(user.name).to eq("Accelo")
|
||||
end
|
||||
|
||||
include_examples "zzreg follow creation"
|
||||
@@ -165,7 +166,7 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
expect(ret).to_not be_a(Exception)
|
||||
|
||||
user.reload
|
||||
expect(user.follows.length).to eq(770)
|
||||
expect(user.follows.length).to eq(FOLLOWS_ON_ZZREG_PAGE)
|
||||
expect(user.follows.where(followed: smaz_user).first).to be_nil
|
||||
expect(user.follows.where(followed: agi_type01_user).first).to eq(follow_2)
|
||||
|
||||
@@ -180,8 +181,21 @@ describe Domain::Fa::Job::UserFollowsJob do
|
||||
|
||||
# newly created users are enqueued by url name
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).find do |job|
|
||||
job[:args][0][:url_name] == "meesh"
|
||||
job[:args][0][:url_name] == "accelo"
|
||||
end).to_not be_nil
|
||||
end
|
||||
|
||||
it "does not enqueue a job if the user is not new" do
|
||||
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
|
||||
expect(ret).to_not be_a(Exception)
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).length).to eq(FOLLOWS_ON_ZZREG_PAGE - 1)
|
||||
end
|
||||
|
||||
it "does not enqueue jobs already in the queue" do
|
||||
Domain::Fa::Job::UserPageJob.perform_later({ url_name: "accelo" })
|
||||
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
|
||||
expect(ret).to_not be_a(Exception)
|
||||
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).length).to eq(FOLLOWS_ON_ZZREG_PAGE - 1)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
require "rails_helper"
|
||||
|
||||
describe Domain::Fa::PostEnqueuer do
|
||||
it "works" do
|
||||
ActiveJob::Base.queue_adapter = :test
|
||||
creator = SpecUtil.build_domain_fa_user
|
||||
creator.save!
|
||||
post_fa_ids = 7.times.map do
|
||||
@@ -25,6 +26,11 @@ describe Domain::Fa::PostEnqueuer do
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[0...5])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
|
||||
|
||||
# jobs should have concurrency keys
|
||||
expect(
|
||||
SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).first[:good_job].concurrency_key
|
||||
).to_not be_nil
|
||||
|
||||
enqueuer.run_once
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[1...5])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob)
|
||||
@@ -33,9 +39,7 @@ describe Domain::Fa::PostEnqueuer do
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[2...7])
|
||||
SpecUtil.shift_jobs(Domain::Fa::Job::ScanPostJob, 3)
|
||||
|
||||
expect do
|
||||
enqueuer.run_once
|
||||
end.to raise_exception(StopIteration)
|
||||
expect { enqueuer.run_once }.to raise_exception(StopIteration)
|
||||
expect(enqueued_fa_ids.call).to eq(post_fa_ids[5...7])
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
require "rails_helper"
|
||||
|
||||
describe Domain::Fa::UserEnqueuer do
|
||||
it "works" do
|
||||
ActiveJob::Base.queue_adapter = :test
|
||||
users = 7.times.map do
|
||||
SpecUtil.create_domain_fa_user
|
||||
end
|
||||
|
||||
@@ -14,16 +14,26 @@
|
||||
#
|
||||
# See https://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration
|
||||
require "./spec/helpers/twitter_helpers"
|
||||
require "./spec/helpers/debug_helpers"
|
||||
|
||||
RSpec.configure do |config|
|
||||
config.include TwitterHelpers
|
||||
config.include DebugHelpers
|
||||
|
||||
# can tag classes with `quiet: false` to make ColorLogger loud
|
||||
config.around(:each) do |example|
|
||||
if example.example.metadata[:quiet].is_a?(FalseClass)
|
||||
example.call
|
||||
else
|
||||
ColorLogger.quiet(&example)
|
||||
quiet_color_logger(&example)
|
||||
end
|
||||
end
|
||||
|
||||
config.around(:each) do |example|
|
||||
if example.example.metadata[:debug_sql]
|
||||
debug_sql(&example)
|
||||
else
|
||||
example.call
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -130,29 +130,48 @@ class SpecUtil
|
||||
end
|
||||
|
||||
def self.enqueued_jobs(job_klass = nil)
|
||||
unless ::ActiveJob::QueueAdapters::TestAdapter === ::ActiveJob::Base.queue_adapter
|
||||
raise StandardError, "set `ActiveJob::Base.queue_adapter = :test`"
|
||||
end
|
||||
# unless ::ActiveJob::QueueAdapters::TestAdapter === ::ActiveJob::Base.queue_adapter
|
||||
# raise StandardError, "set `ActiveJob::Base.queue_adapter = :test`"
|
||||
# end
|
||||
|
||||
jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
|
||||
jobs.filter do |job|
|
||||
GoodJob::Job.all.map do |job|
|
||||
{
|
||||
job: job.job_class.constantize,
|
||||
queue: job.queue_name,
|
||||
priority: job.priority,
|
||||
args: ::ActiveJob::Arguments.deserialize(job.serialized_params["arguments"]),
|
||||
good_job: job,
|
||||
}
|
||||
end.filter do |job|
|
||||
if job_klass
|
||||
job[:job] == job_klass
|
||||
else
|
||||
true
|
||||
end
|
||||
end.map do |job|
|
||||
job.slice(:job, :queue, :priority).merge({
|
||||
args: ::ActiveJob::Arguments.deserialize(job[:args]),
|
||||
})
|
||||
end
|
||||
# jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
|
||||
# jobs.filter do |job|
|
||||
# if job_klass
|
||||
# job[:job] == job_klass
|
||||
# else
|
||||
# true
|
||||
# end
|
||||
# end.map do |job|
|
||||
# job.slice(:job, :queue, :priority).merge({
|
||||
# args: ::ActiveJob::Arguments.deserialize(job[:args]),
|
||||
# })
|
||||
# end
|
||||
end
|
||||
|
||||
def self.shift_jobs(job_klass, by = 1)
|
||||
jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
|
||||
# jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
|
||||
# by.times do
|
||||
# index = jobs.index { |job| job[:job] == job_klass }
|
||||
# jobs.delete_at(index)
|
||||
# end
|
||||
by.times do
|
||||
index = jobs.index { |job| job[:job] == job_klass }
|
||||
jobs.delete_at(index)
|
||||
job = enqueued_jobs.find { |job| job[:job] == job_klass }
|
||||
job[:good_job].destroy if job
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -253,7 +253,7 @@ class Domain::Fa::Parser::ReduxPageTest < ActiveSupport::TestCase
|
||||
def test_watchlist_zzreg
|
||||
parser = get_parser("watchlist_zzreg.html", require_logged_in: false)
|
||||
user_list = parser.user_list
|
||||
assert_equal 770, user_list.length
|
||||
assert_equal 10, user_list.length
|
||||
assert_equal "-creeps", user_list[0].url_name
|
||||
assert_equal "-creeps", user_list[0].name
|
||||
|
||||
|
||||
Reference in New Issue
Block a user