quiet color logger, temp fix for api controller, more browse page job tests

This commit is contained in:
Dylan Knutson
2023-03-27 17:10:34 +09:00
parent 29cdb1669c
commit 24e52357be
12 changed files with 207 additions and 151 deletions

View File

@@ -26,13 +26,21 @@ task :set_logger_stdout => :environment do
ActiveRecord::Base.logger = nil
end
task :pool_combined do
ENV["RAILS_ENV"] = "production"
task :good_job do
ENV["RAILS_ENV"] = "worker"
ENV["GOOD_JOB_MAX_THREADS"] = "1"
ENV["GOOD_JOB_QUEUES"] = [
"manual:4",
"static_file:4",
"+static_file,fa_post:2",
"+fa_user_page,fa_user_gallery:2",
"-static_file,fa_post,manual,fa_user_page,fa_user_gallery,twitter_timeline_tweets:2",
].join(";")
proxies = ["direct", "proxy-1", "dedipath-1", "serverhost-1"]
proxy = ENV["proxy"]
raise("'proxy' must be set") unless proxy
raise("'proxy' must be one of #{proxies}") unless proxies.include?(proxy)
cmd = "bundle exec delayed_job_worker_pool pool_combined.rb"
cmd = "bundle exec good_job"
puts "$> #{cmd}"
exec(cmd)
end
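A note on the GOOD_JOB_QUEUES string above, per GoodJob's documented pool syntax: ";" separates worker pools, ":N" sets a pool's thread count, a "+" prefix works the listed queues in strict priority order, and a "-" prefix excludes them. A comment-only sketch of how the five pools divide the work (my reading of the syntax, not part of this commit):
# "manual:4"                        -> 4 threads dedicated to the manual queue
# "static_file:4"                   -> 4 threads dedicated to static_file
# "+static_file,fa_post:2"          -> 2 threads, static_file strictly before fa_post
# "+fa_user_page,fa_user_gallery:2" -> 2 threads, user pages strictly before galleries
# "-static_file,fa_post,manual,fa_user_page,fa_user_gallery,twitter_timeline_tweets:2"
#                                   -> 2 catch-all threads for every queue not listed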

View File

@@ -6,10 +6,10 @@ class Domain::Fa::ApiController < ApplicationController
fa_ids = (params[:fa_ids] || []).map(&:to_i)
url_names = (params[:url_names] || [])
jobs_async = Delayed::Backend::ActiveRecord::Job.
select(:id, :queue, :handler).
where(queue: "manual").
load_async
# jobs_async = Delayed::Backend::ActiveRecord::Job.
# select(:id, :queue, :handler).
# where(queue: "manual").
# load_async
users_async = Domain::Fa::User.
where(url_name: url_names).
@@ -91,20 +91,20 @@ class Domain::Fa::ApiController < ApplicationController
hash[key] = 0
end
jobs_async.each do |job|
queue_depths[job.payload_object.job_data["job_class"]] += 1
end
# jobs_async.each do |job|
# queue_depths[job.payload_object.job_data["job_class"]] += 1
# end
queue_depths = queue_depths.map do |key, value|
[key.
delete_prefix("Domain::Fa::Job::").
split("::").
last.
underscore.
delete_suffix("_job").
gsub("_", " "),
value]
end.to_h
# queue_depths = queue_depths.map do |key, value|
# [key.
# delete_prefix("Domain::Fa::Job::").
# split("::").
# last.
# underscore.
# delete_suffix("_job").
# gsub("_", " "),
# value]
# end.to_h
render json: {
posts: posts_response,
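The Delayed::Job queue-depth query is commented out rather than ported, hence the "temp fix" in the commit message. A possible GoodJob equivalent — an assumption, not part of this commit; it relies on GoodJob::Job and the "job_class" key that ActiveJob serialization places in serialized_params — might look like:
# Hypothetical replacement for the commented-out Delayed::Job query:
jobs_async = GoodJob::Job.
  where(queue_name: "manual", finished_at: nil). # unfinished jobs on the manual queue
  select(:id, :queue_name, :serialized_params).
  load_async
jobs_async.each do |job|
  # serialized_params carries the ActiveJob payload, including the job's class name
  queue_depths[job.serialized_params["job_class"]] += 1
end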

View File

@@ -8,7 +8,6 @@ class ApplicationJob < ActiveJob::Base
attempts: Float::INFINITY,
) do |job, exception|
job.logger.error("error: #{exception.message}\n#{exception.backtrace.join("\n")}")
binding.pry
end
# Automatically retry jobs that encountered a deadlock

View File

@@ -35,6 +35,7 @@ class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::FaJobBase
if response.status_code != 200
fatal_error("non 200 response for /browse: #{response.status_code.to_s.underline}")
end
page = Domain::Fa::Parser::Page.new(response.body)
listing_page_stats = update_and_enqueue_posts_from_listings_page(
:browse_page, page, log_entry,

View File

@@ -118,7 +118,7 @@ class Domain::Fa::Job::FaJobBase < Scraper::JobBase
"enqueue user page job for #{user.url_name.bold}, " +
"last scanned #{time_ago_in_words(user.scanned_page_at)}"
)
enqueue_job(Domain::Fa::Job::UserPageJob, args)
defer_job(Domain::Fa::Job::UserPageJob, args)
end
end
@@ -128,7 +128,7 @@ class Domain::Fa::Job::FaJobBase < Scraper::JobBase
"enqueue user gallery job for #{user.url_name.bold}, " +
"last scanned #{time_ago_in_words(user.scanned_gallery_at)}"
)
enqueue_job(Domain::Fa::Job::UserGalleryJob, args)
defer_job(Domain::Fa::Job::UserGalleryJob, args)
end
end
end
@@ -145,13 +145,13 @@ class Domain::Fa::Job::FaJobBase < Scraper::JobBase
fa_id_str = (post.fa_id || "(nil)").to_s.bold
if !post.scanned?
logger.info "enqueue post scan for fa_id #{fa_id_str}"
enqueue_job(Domain::Fa::Job::ScanPostJob, {
defer_job(Domain::Fa::Job::ScanPostJob, {
post: post,
caused_by_entry: caused_by_entry,
}, { priority: enqueue_pri })
elsif !post.have_file?
logger.info "enqueue file scan for fa_id #{fa_id_str}"
enqueue_job(Domain::Fa::Job::ScanFileJob, {
defer_job(Domain::Fa::Job::ScanFileJob, {
post: post,
caused_by_entry: caused_by_entry,
}, { priority: enqueue_pri })

View File

@@ -9,7 +9,7 @@ class Domain::Fa::Job::ScanFileJob < Domain::Fa::Job::FaJobBase
if @post.nil?
logger.error "no post model - fa_id: #{args[:fa_id]}, enqueue scan"
enqueue_job(Domain::Fa::Job::ScanPostJob, {
defer_job(Domain::Fa::Job::ScanPostJob, {
fa_id: args[:fa_id],
caused_by_entry: @caused_by_entry,
}) if args[:fa_id]
@@ -82,7 +82,7 @@ class Domain::Fa::Job::ScanFileJob < Domain::Fa::Job::FaJobBase
end
if response.status_code != 200
enqueue_job(Domain::Fa::Job::ScanPostJob, {
defer_job(Domain::Fa::Job::ScanPostJob, {
post: @post,
caused_by_entry: response.log_entry,
force_scan: true,

View File

@@ -23,7 +23,7 @@ class Domain::Fa::Job::ScanPostJob < Domain::Fa::Job::FaJobBase
if (@post.state == "ok" && @post.file_uri && @post.file.nil?) || @force_scan
logger.info("enqueue file job (#{self.priority})")
enqueue_job(Domain::Fa::Job::ScanFileJob, {
defer_job(Domain::Fa::Job::ScanFileJob, {
post: @post,
caused_by_entry: @submission_entry || @caused_by_entry,
}, { priority: self.priority })

View File

@@ -161,7 +161,7 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
end
def enqueue_media_file(media)
enqueue_job(Domain::Twitter::Job::MediaJob, {
defer_job(Domain::Twitter::Job::MediaJob, {
media: media || raise,
caused_by_entry: @first_twitter_caused_by,
}, { priority: self.priority })

View File

@@ -1,9 +1,8 @@
class Scraper::JobBase < ApplicationJob
attr_reader :http_client, :enqueue_jobs
class JobError < RuntimeError; end
def initialize(...)
@http_client = Scraper::HttpFactory.send(self.class.http_factory_method)
@enqueue_jobs = []
@deferred_jobs = []
super(...)
end
@@ -11,6 +10,10 @@ class Scraper::JobBase < ApplicationJob
raise NotImplementedError.new("implement in #{self.name}")
end
def http_client
@http_client ||= Scraper::HttpFactory.send(self.class.http_factory_method)
end
good_job_control_concurrency_with(
total_limit: 1,
key: proc do
@@ -35,14 +38,6 @@ class Scraper::JobBase < ApplicationJob
end,
)
around_perform do |job, block|
level = job.logger.level
job.logger.level = :info
block.call
ensure
job.logger.level = level
end
def write_point(name, tags: {}, fields: {})
Metrics::Reporter.singleton.write_point(
name,
@@ -172,7 +167,7 @@ class Scraper::JobBase < ApplicationJob
desc,
].join(" "))
enqueue_job(
defer_job(
job_class,
params.merge({ caused_by_entry: log_entry }),
)
@@ -207,25 +202,27 @@ class Scraper::JobBase < ApplicationJob
)
end
def enqueue_job(job_class, params, set_args = {})
@enqueue_jobs << [job_class, params, set_args]
after_perform do |job|
ColorLogger.quiet do
job.enqueue_deferred_jobs!
end
end
after_perform do |job|
level = logger.level
logger.level = :fatal
def defer_job(job_class, params, set_args = {})
@deferred_jobs << [job_class, params, set_args]
end
def enqueue_deferred_jobs!
GoodJob::Bulk.enqueue do
job.enqueue_jobs.each do |job_class, params, set_args|
@deferred_jobs.each do |job_class, params, set_args|
job_class.set(set_args).perform_later(params)
end
end
ensure
logger.level = level
end
def fatal_error(msg)
logger.error(msg)
raise msg.uncolorize
raise JobError, msg.uncolorize
end
DATE_HELPER = Class.new.extend(ActionView::Helpers::DateHelper)
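Taken together, the Scraper::JobBase changes split enqueueing into two phases: call sites record work with defer_job during perform, and an after_perform hook flushes everything in a single GoodJob::Bulk.enqueue batch, silenced by ColorLogger.quiet. A minimal sketch of the flow (ExampleJob and its arguments are hypothetical):
class ExampleJob < Scraper::JobBase
  def perform(args)
    # appended to @deferred_jobs; nothing hits the queue yet
    defer_job(Domain::Fa::Job::ScanPostJob, { fa_id: 123 }, { priority: 5 })
  end
end
# After perform returns, after_perform calls enqueue_deferred_jobs!, which runs
# job_class.set(set_args).perform_later(params) for each recorded entry inside
# one GoodJob::Bulk.enqueue block, so all deferred jobs insert as a single batch.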

View File

@@ -1,6 +1,14 @@
class ColorLogger
def self.make(sink, instance)
@quiet = Concurrent::ThreadLocalVar.new { 0 }
def self.quiet(&block)
@quiet.value += 1
block.call
ensure
@quiet.value -= 1
end
def self.make(sink, instance)
# clean up common class names
klass_name = instance.class.name.dup
klass_name.delete_prefix!("Domain::")
@@ -22,9 +30,7 @@ class ColorLogger
end
Logger.new(sink).tap do |logger|
if Rails.env.test?
logger.level = :error
end
# logger.level = Logger::ERROR if Rails.env.test?
def logger.prefix=(p)
@logger_prefix = p
@@ -45,7 +51,12 @@ class ColorLogger
if prefix.is_a?(Proc)
prefix = prefix.call
end
[klass_name_str, prefix, msg].reject(&:blank?).join(" ") + "\n"
if @quiet.value > 0
""
else
[klass_name_str, prefix, msg].reject(&:blank?).join(" ") + "\n"
end
end
end
end
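ColorLogger.quiet is a per-thread, re-entrant mute: the Concurrent::ThreadLocalVar counter increments around the block, and the formatter emits an empty string whenever the counter is positive. A usage sketch (do_noisy_work is a placeholder):
ColorLogger.quiet do
  ColorLogger.quiet do   # nesting is safe: the counter goes 1 -> 2 -> 1 -> 0
    do_noisy_work        # any ColorLogger output here formats to ""
  end
end
# Because the counter is thread-local, quieting one thread (e.g. a spec) leaves
# loggers on other threads untouched.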

View File

@@ -1,4 +1,6 @@
class Metrics::Reporter
REPORT = !Rails.env.test? && !Rails.env.worker?
attr_reader :logger
def self.singleton
@@ -9,6 +11,8 @@ class Metrics::Reporter
end
def initialize(host: nil, token: nil, org: nil, bucket: nil, default_tags: {})
return unless REPORT
host ||= Rails.application.config.x.influxdb.host || raise("no host")
token ||= Rails.application.config.x.influxdb.token || raise("no token")
org ||= Rails.application.config.x.influxdb.org || raise("no org")
@@ -65,10 +69,12 @@ class Metrics::Reporter
end
def close!
return unless REPORT
@client.close!
end
def write_point(name, tags: {}, fields: {})
return unless REPORT
@writer.write(data: { name: name, tags: tags, fields: fields })
end
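The REPORT constant turns the reporter into a no-op outside real environments. Rails.env is an ActiveSupport::StringInquirer, so the custom RAILS_ENV=worker set by the rake task above answers truthily, and initialize, write_point, and close! all return before touching the InfluxDB client. A quick illustration:
# In the worker process (RAILS_ENV=worker):
Rails.env                  # => "worker"
Rails.env.worker?          # => true (StringInquirer matches any env name)
Metrics::Reporter::REPORT  # => false, so write_point returns immediately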

View File

@@ -1,9 +1,100 @@
require "rails_helper"
describe Domain::Fa::Job::BrowsePageJob do
let(:http_client_mock) { instance_double("::Scraper::HttpClient") }
before do
@http_client_mock = instance_double("::Scraper::HttpClient")
Scraper::HttpFactory.http_client_mock = @http_client_mock
Scraper::HttpFactory.http_client_mock = http_client_mock
end
around do |block|
ColorLogger.quiet(&block)
end
shared_context "user and post getters" do
let(:user) { proc { Domain::Fa::User.find_by(url_name: "ruby69r") } }
let(:post) { proc { Domain::Fa::Post.find_by(fa_id: 51509268) } }
before do
expect(post.call).to be_nil
expect(user.call).to be_nil
end
end
shared_context "create user and post" do
before do
creator = Domain::Fa::User.create!({
url_name: "ruby69r",
name: "Ruby_68r",
})
Domain::Fa::Post.create!({
fa_id: 51509268,
creator: creator,
})
end
end
shared_context "test queue adapter" do
before do
ActiveJob::Base.queue_adapter = :test
end
end
shared_examples "enqueue post scan" do |expect_to_enqueue|
it "enqueues post scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob)).to match([
including(args: [{
post: post.call,
caused_by_entry: log_entries[0],
}]),
])
end if expect_to_enqueue
it "does not enqueue post scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob)).to eq([])
end unless expect_to_enqueue
end
shared_examples "enqueue file scan" do |expect_to_enqueue|
it "enqueues file scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanFileJob)).to match([
including(args: [{
post: post.call,
caused_by_entry: log_entries[0],
}]),
])
end if expect_to_enqueue
it "does not enqueue post scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanFileJob)).to eq([])
end unless expect_to_enqueue
end
shared_examples "enqueue user page scan" do |expect_to_enqueue|
it "enqueues user page job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob)).to match([
including(args: [{
user: user.call,
caused_by_entry: log_entries[0],
}]),
])
end if expect_to_enqueue
it "does not enqueue user page job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob)).to eq([])
end unless expect_to_enqueue
end
shared_examples "enqueue user gallery scan" do |expect_to_enqueue|
it "enqueues user gallery job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserGalleryJob)).to match([
including(args: [{
user: user.call,
caused_by_entry: log_entries[0],
}]),
])
end if expect_to_enqueue
it "does not enqueue user gallery job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserGalleryJob)).to eq([])
end unless expect_to_enqueue
end
it "enqueues one" do
@@ -29,11 +120,10 @@ describe Domain::Fa::Job::BrowsePageJob do
end
context "with no posts found on page" do
before do
ActiveJob::Base.queue_adapter = :test
@log_entries = SpecUtil.init_http_client_mock(
@http_client_mock, [
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [
{
uri: "https://www.furaffinity.net/browse/",
status_code: 200,
@@ -50,81 +140,12 @@ describe Domain::Fa::Job::BrowsePageJob do
end
end
shared_context "user and post" do
let(:user) { proc { Domain::Fa::User.find_by(url_name: "ruby69r") } }
let(:post) { proc { Domain::Fa::Post.find_by(fa_id: 51509268) } }
before do
expect(post.call).to be_nil
expect(user.call).to be_nil
end
end
shared_examples "enqueue post scan" do |does_enqueue|
it "enqueues post scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob)).to match([
including(args: [{
post: post.call,
caused_by_entry: @log_entries[0],
}]),
])
end if does_enqueue
it "does not enqueue post scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob)).to eq([])
end unless does_enqueue
end
shared_examples "enqueue file scan" do |does_enqueue|
it "enqueues file scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanFileJob)).to match([
including(args: [{
post: post.call,
caused_by_entry: @log_entries[0],
}]),
])
end if does_enqueue
it "does not enqueue post scan job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanFileJob)).to eq([])
end unless does_enqueue
end
shared_examples "enqueue user page scan" do |does_enqueue|
it "enqueues user page job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob)).to match([
including(args: [{
user: user.call,
caused_by_entry: @log_entries[0],
}]),
])
end if does_enqueue
it "does not enqueue user page job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob)).to eq([])
end unless does_enqueue
end
shared_examples "enqueue user gallery scan" do |does_enqueue|
it "enqueues user gallery job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserGalleryJob)).to match([
including(args: [{
user: user.call,
caused_by_entry: @log_entries[0],
}]),
])
end if does_enqueue
it "does not enqueue user gallery job" do
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserGalleryJob)).to eq([])
end unless does_enqueue
end
context "with one unseen post" do
include_context "user and post"
before do
ActiveJob::Base.queue_adapter = :test
@log_entries = SpecUtil.init_http_client_mock(
@http_client_mock, [
include_context "user and post getters"
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [
{
uri: "https://www.furaffinity.net/browse/",
status_code: 200,
@@ -173,21 +194,12 @@ describe Domain::Fa::Job::BrowsePageJob do
end
context "with one seen post" do
include_context "user and post"
before do
ActiveJob::Base.queue_adapter = :test
creator = Domain::Fa::User.create!({
url_name: "ruby69r",
name: "Ruby_68r",
})
Domain::Fa::Post.create!({
fa_id: 51509268,
creator: creator,
})
@log_entries = SpecUtil.init_http_client_mock(
@http_client_mock, [
include_context "user and post getters"
include_context "create user and post"
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [
{
uri: "https://www.furaffinity.net/browse/",
status_code: 200,
@@ -266,4 +278,26 @@ describe Domain::Fa::Job::BrowsePageJob do
include_examples "enqueue user gallery scan", true
end
end
context "with a page that responds with an error" do
include_context "test queue adapter"
let! :log_entries do
SpecUtil.init_http_client_mock(
http_client_mock, [
{
uri: "https://www.furaffinity.net/browse/",
status_code: 503,
content_type: "text/html",
contents: SpecUtil.read_fixture_file("domain/fa/job/browse_page_no_submissions.html"),
},
]
)
end
it "fails with a fatal error" do
expect(described_class.perform_now({})).to(
be_a(Scraper::JobBase::JobError)
)
end
end
end