Refactor and enhance scraper metrics and job processing
- Introduced new metrics tracking for HTTP client requests and job processing, including `HttpClientMetrics` and `JobBaseMetrics`.
- Updated `Scraper::HttpClient` to use the new metrics for observing request start and finish events.
- Enhanced `Scraper::JobBase` with metrics for job execution, including start, finish, and enqueued jobs.
- Refactored `InkbunnyHttpClientConfig` to enforce strict typing and improve the handling of session IDs in URIs.
- Added tests for the new metrics functionality and improved existing tests for job processing and URI mapping.

These changes improve the observability and maintainability of the scraper and job processing systems.
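For context on the pattern these hooks share: a minimal, self-contained sketch of the "flag success, record in ensure" timing wrapper that `Scraper::JobBase` adopts below. The metric name and wrapper method here are hypothetical; the `register`/`increment` calls mirror the prometheus_exporter client API used in the diff.

    require "prometheus_exporter/client"

    # Hypothetical standalone wrapper: success flips to true only if the block
    # returns, and the ensure records a finish metric even when it raises.
    EXAMPLE_FINISH_COUNT =
      PrometheusExporter::Client.default.register(
        :counter,
        "example_finish_count",
        "count of finished blocks, labeled by success",
      )

    def with_finish_metric
      success = false
      ret = yield
      success = true
      ret
    ensure
      EXAMPLE_FINISH_COUNT.increment({ success: success })
    end

    with_finish_metric { 1 + 1 } # increments example_finish_count with success: true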
@@ -43,6 +43,8 @@ module Domain::Inkbunny::Job
           caused_by_entry: log_entry,
         )

+      rid ||= T.cast(result[:rid], String)
+
       logger.info(
         [
          "[rid: #{rid}]",
@@ -53,7 +55,6 @@ module Domain::Inkbunny::Job
         ].join(" "),
       )

-      rid ||= T.cast(result[:rid], String)
       break if result[:num_pages] == page
       page += 1
     end

@@ -250,16 +250,20 @@ class Scraper::JobBase < ApplicationJob
     )
   end
+
+  # Delay a little bit on Net::ReadTimeout or Errno::ECONNREFUSED
+  around_perform do |job, block|
+    block.call
+  rescue Net::ReadTimeout, Errno::ECONNREFUSED => e
+    logger.error "#{e.class.name} - sleep for a bit"
+    sleep rand(2.0..5.0)
+    raise e
+  end
+
   # Collect log lines from the job
   around_perform do |job, block|
     log_lines = T.let([], T::Array[String])
     ColorLogger.log_line_accumulator = proc { |line| log_lines << line }
     block.call
-  rescue Net::ReadTimeout, Errno::ECONNREFUSED => e
-    logger.error "#{e.class.name} - sleep for a bit"
-    sleep rand(2.0..7.0)
-    raise e
-  rescue => e
-    raise e
   ensure
     ColorLogger.log_line_accumulator = nil
     ColorLogger.quiet { job.enqueue_deferred_jobs! }
@@ -273,6 +277,23 @@ class Scraper::JobBase < ApplicationJob
     end
   end
+
+  # Log job metrics
+  around_perform do |job, block|
+    Scraper::Metrics::JobBaseMetrics.observe_job_start(job_class: self.class)
+    success = T.let(false, T::Boolean)
+    start_time = T.let(Time.now, Time)
+    ret = block.call
+    success = true
+    ret
+  ensure
+    duration_ms = (1000 * (Time.now - T.must(start_time))).to_i
+    Scraper::Metrics::JobBaseMetrics.observe_job_finish(
+      job_class: self.class,
+      time_ms: duration_ms,
+      success: T.must(success),
+    )
+  end

   sig do
     params(
       job_class: T.class_of(Scraper::JobBase),
@@ -290,6 +311,10 @@ class Scraper::JobBase < ApplicationJob
       @deferred_jobs.each do |deferred_job|
         args = deferred_job.params.merge({ caused_by_entry: causing_log_entry })
         deferred_job.job_class.set(deferred_job.set_args).perform_later(args)
+        Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
+          source_class: self.class,
+          enqueued_class: deferred_job.job_class,
+        )
       end
     end
   end
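As a usage sketch of the deferred-enqueue path above (the job class here is hypothetical; `defer_job` and the `http_factory_method` hook appear in the spec near the end of this diff):

    # Hypothetical paginated crawl job: jobs deferred during perform are enqueued
    # afterwards by enqueue_deferred_jobs!, and each enqueue increments
    # job_enqueued_count with source_class/enqueued_class labels.
    class ExampleCrawlJob < Scraper::JobBase
      def self.http_factory_method
        :get_fa_http_client
      end

      def perform(args)
        page = args.fetch(:page, 1)
        # ... scrape the page ...
        defer_job(ExampleCrawlJob, { page: page + 1 }) if page < 3
      end
    end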
@@ -154,7 +154,7 @@ class Scraper::HttpClient
        ),
      }

-    Metrics.observe_request_start(method, uri.host)
+    Scraper::Metrics::HttpClientMetrics.observe_request_start(method, uri.host)

     response = @http_performer.do_request(method, url, request_headers)

@@ -203,7 +203,7 @@ class Scraper::HttpClient
     BlobFile.find_or_initialize_from_blob_entry(response_blob_entry)
     total_time_ms = ((Time.now - requested_at) * 1000).to_i

-    Metrics.observe_request_finish(
+    Scraper::Metrics::HttpClientMetrics.observe_request_finish(
       method: method,
       host: uri.host,
       content_type: T.must(content_type.split(";").first),
@@ -10,7 +10,7 @@ class Scraper::HttpClientConfig
   def cookies
   end

-  sig { overridable.params(performer: T.untyped).void }
+  sig { overridable.params(performer: Scraper::CurlHttpPerformer).void }
   def do_login(performer)
     # nop
   end
@@ -1,7 +1,13 @@
-# typed: false
+# typed: strict
 class Scraper::InkbunnyHttpClientConfig < Scraper::HttpClientConfig
   DEFAULT_ALLOWED_DOMAINS = %w[inkbunny.net *.ib.metapix.net]

+  sig { void }
+  def initialize
+    @sid = T.let(nil, T.nilable(String))
+  end
+
+  sig { override.params(performer: Scraper::CurlHttpPerformer).void }
   def do_login(performer)
     sid_model =
       GlobalState.find_or_create_by(key: "inkbunny-sid") do |gs|
@@ -16,14 +22,18 @@ class Scraper::InkbunnyHttpClientConfig < Scraper::HttpClientConfig
       end
   end

+  sig { override.params(uri: Addressable::URI).returns(Addressable::URI) }
   def map_uri(uri)
+    raise("no sid") unless @sid
+    return uri if uri.host&.end_with?("ib.metapix.net")

     new_query_ar = URI.decode_www_form(uri.query || "") << ["sid", @sid]
     uri = uri.dup
     uri.query = URI.encode_www_form(new_query_ar).gsub("%2C", ",")
     uri
   end

+  sig { override.params(uri: Addressable::URI).returns(Addressable::URI) }
   def scrub_stored_uri(uri)
     if uri.path == "/api_login.php"
       uri = uri.dup
@@ -38,23 +48,30 @@ class Scraper::InkbunnyHttpClientConfig < Scraper::HttpClientConfig
     uri
   end

+  sig { override.returns(T.nilable(T::Array[T.untyped])) }
   def cookies
+    nil
   end

+  sig { override.returns(T::Array[[String, Numeric]]) }
   def ratelimit
     [["inkbunny.net", 2], ["*.ib.metapix.net", 0.25]]
   end

+  sig { override.returns(T::Array[String]) }
   def allowed_domains
     DEFAULT_ALLOWED_DOMAINS
   end

+  sig { override.returns(Integer) }
   def redirect_limit
     2
   end

   private

+  sig { params(performer: Scraper::CurlHttpPerformer).returns(String) }
   def do_ib_login(performer)
     username =
       GlobalState.find_by(key: "inkbunny-username")&.value ||
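A quick behavioral sketch of `map_uri`, matching what the specs below assert (the sid value is illustrative; `@sid` is normally populated by `do_login`):

    config = Scraper::InkbunnyHttpClientConfig.new
    config.instance_variable_set(:@sid, "test_sid_123") # stand-in for do_login

    api = Addressable::URI.parse("https://inkbunny.net/api_search.php?keywords=tag1,tag2")
    config.map_uri(api).to_s
    # => "https://inkbunny.net/api_search.php?keywords=tag1,tag2&sid=test_sid_123"
    # the gsub("%2C", ",") keeps commas literal rather than percent-encoded

    cdn = Addressable::URI.parse("https://tx.ib.metapix.net/files/full/123456.jpg")
    config.map_uri(cdn) == cdn # => true; metapix CDN hosts pass through unchanged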
@@ -46,5 +46,49 @@ module Scraper::Metrics
     30_000, # 30 sec
   ].freeze

+  JOB_TIME_MS_BUCKETS = [
+    10, # 10 ms
+    25, # 25 ms
+    50, # 50 ms
+    75, # 75 ms
+    100, # 100 ms
+    150, # 150 ms
+    200, # 200 ms
+    250, # 250 ms
+    500, # 500 ms
+    750, # 750 ms
+    1_000, # 1 sec
+    1_250, # 1.25 sec
+    1_500, # 1.5 sec
+    1_750, # 1.75 sec
+    2_000, # 2 sec
+    2_250, # 2.25 sec
+    2_500, # 2.5 sec
+    2_750, # 2.75 sec
+    3_000, # 3 sec
+    3_500, # 3.5 sec
+    4_000, # 4 sec
+    4_500, # 4.5 sec
+    5_000, # 5 sec
+    5_500, # 5.5 sec
+    6_000, # 6 sec
+    6_500, # 6.5 sec
+    7_000, # 7 sec
+    7_500, # 7.5 sec
+    8_000, # 8 sec
+    8_500, # 8.5 sec
+    9_000, # 9 sec
+    9_500, # 9.5 sec
+    10_000, # 10 sec
+    30_000, # 30 sec
+    60_000, # 1 min
+    120_000, # 2 min
+    300_000, # 5 min
+    600_000, # 10 min
+    900_000, # 15 min
+    1_200_000, # 20 min
+    1_800_000, # 30 min
+  ].freeze
+
   QUARTILES = [0.99, 0.95, 0.90, 0.75, 0.50].freeze
 end
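For intuition about the bucket list: Prometheus histogram buckets are cumulative upper bounds, so an observation is counted in the first bucket at or above its value and every larger one. An illustrative lookup:

    # A 1,234 ms job falls into the 1_250 bucket (and, cumulatively, all larger ones).
    Scraper::Metrics::JOB_TIME_MS_BUCKETS.find { |upper_ms| 1_234 <= upper_ms } # => 1_250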
@@ -1,5 +1,5 @@
 # typed: strict
-module Scraper::HttpClient::Metrics
+module Scraper::Metrics::HttpClientMetrics
   extend T::Sig

   sig { params(method: Symbol, host: String).void }
app/lib/scraper/metrics/job_base_metrics.rb (new file, 91 lines)
@@ -0,0 +1,91 @@
+# typed: strict
+module Scraper::Metrics::JobBaseMetrics
+  extend T::Sig
+
+  # a job has begun
+  sig { params(job_class: T.class_of(Scraper::JobBase)).void }
+  def self.observe_job_start(job_class:)
+    JOB_START_COUNT.increment({ job_class: job_class.name })
+  end
+
+  # a job has finished
+  sig do
+    params(
+      job_class: T.class_of(Scraper::JobBase),
+      time_ms: Integer,
+      success: T::Boolean,
+    ).void
+  end
+  def self.observe_job_finish(job_class:, time_ms:, success:)
+    keys = { job_class: job_class.name, success: success }
+    JOB_FINISH_COUNT.increment(keys)
+    JOB_FINISH_TIME_HISTOGRAM.observe(time_ms, keys)
+    JOB_FINISH_TIME_SUMMARY.observe(time_ms, keys)
+  end
+
+  sig do
+    params(
+      source_class: T.class_of(Scraper::JobBase),
+      enqueued_class: T.class_of(Scraper::JobBase),
+    ).void
+  end
+  def self.observe_job_enqueued(source_class:, enqueued_class:)
+    JOB_ENQUEUED_COUNT.increment(
+      { source_class: source_class.name, enqueued_class: enqueued_class.name },
+    )
+  end
+
+  private
+
+  JOB_ENQUEUED_COUNT =
+    T.let(
+      PrometheusExporter::Client.default.register(
+        :counter,
+        "job_enqueued_count",
+        "count of jobs enqueued, labeled by source and enqueued classes",
+      ),
+      PrometheusExporter::Client::RemoteMetric,
+    )
+
+  JOB_START_COUNT =
+    T.let(
+      PrometheusExporter::Client.default.register(
+        :counter,
+        "job_start_count",
+        "count of jobs started, labeled by job class",
+      ),
+      PrometheusExporter::Client::RemoteMetric,
+    )
+
+  JOB_FINISH_COUNT =
+    T.let(
+      PrometheusExporter::Client.default.register(
+        :counter,
+        "job_finish_count",
+        "count of jobs finished, labeled by job class and success",
+      ),
+      PrometheusExporter::Client::RemoteMetric,
+    )
+
+  JOB_FINISH_TIME_HISTOGRAM =
+    T.let(
+      PrometheusExporter::Client.default.register(
+        :histogram,
+        "job_finish_time_histogram",
+        "histogram of job finish times, labeled by job class and success",
+        buckets: Scraper::Metrics::JOB_TIME_MS_BUCKETS,
+      ),
+      PrometheusExporter::Client::RemoteMetric,
+    )
+
+  JOB_FINISH_TIME_SUMMARY =
+    T.let(
+      PrometheusExporter::Client.default.register(
+        :summary,
+        "job_finish_time_summary",
+        "summary of job finish times, labeled by job class and success",
+        quantiles: Scraper::Metrics::QUARTILES,
+      ),
+      PrometheusExporter::Client::RemoteMetric,
+    )
+end
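Called directly, the module reads like this (a sketch with illustrative values; in practice only the `around_perform` hooks in `Scraper::JobBase` above invoke it):

    Scraper::Metrics::JobBaseMetrics.observe_job_start(job_class: Scraper::JobBase)
    Scraper::Metrics::JobBaseMetrics.observe_job_finish(
      job_class: Scraper::JobBase,
      time_ms: 125, # illustrative duration
      success: true,
    )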
@@ -51,31 +51,68 @@ describe Scraper::InkbunnyHttpClientConfig do
   end

   describe "#map_uri" do
-    before { create(:global_state, key: "inkbunny-sid", value: "the-sid") }
+    let(:sid) { "test_sid_123" }

-    it "raises if not logged in yet" do
-      expect do
-        subject.map_uri(
-          URI.parse(
-            "https://inkbunny.net/api_search.php?orderby=create_datetime",
-          ),
-        )
-      end.to raise_error("no sid")
+    before do
+      create(
+        :global_state,
+        key: "inkbunny-sid",
+        value: sid,
+        value_type: :string,
+      )
+      subject.do_login(instance_double("Scraper::CurlHttpPerformer"))
     end

-    it "adds sid to query string" do
-      subject.do_login(performer)
-      uri =
-        subject.map_uri(
-          URI.parse(
-            "https://inkbunny.net/api_search.php?orderby=create_datetime",
-          ),
-        )
-      expect(uri).to eq(
-        URI.parse(
-          "https://inkbunny.net/api_search.php?orderby=create_datetime&sid=the-sid",
-        ),
-      )
+    context "when mapping an inkbunny.net URI" do
+      it "adds the sid parameter to the query string" do
+        uri =
+          Addressable::URI.parse(
+            "https://inkbunny.net/api_search.php?text=test",
+          )
+        mapped_uri = subject.map_uri(uri)
+
+        expect(mapped_uri.query_values).to include("sid" => sid)
+        expect(mapped_uri.query_values["text"]).to eq("test")
+      end
+
+      it "adds sid parameter when no query string exists" do
+        uri = Addressable::URI.parse("https://inkbunny.net/api_search.php")
+        mapped_uri = subject.map_uri(uri)
+
+        expect(mapped_uri.query_values).to eq("sid" => sid)
+      end
+
+      it "preserves commas in query parameters" do
+        uri =
+          Addressable::URI.parse(
+            "https://inkbunny.net/api_search.php?keywords=tag1,tag2",
+          )
+        mapped_uri = subject.map_uri(uri)
+
+        expect(mapped_uri.query).to include("keywords=tag1,tag2")
+        expect(mapped_uri.query).to include("sid=#{sid}")
+      end
+    end
+
+    context "when mapping a metapix URI" do
+      it "does not modify URIs from tx.ib.metapix.net" do
+        uri =
+          Addressable::URI.parse(
+            "https://tx.ib.metapix.net/files/full/123456.jpg",
+          )
+        mapped_uri = subject.map_uri(uri)
+
+        expect(mapped_uri).to eq(uri)
+      end
+    end
+
+    context "when sid is not set" do
+      before { subject.instance_variable_set(:@sid, nil) }
+
+      it "raises an error" do
+        uri = Addressable::URI.parse("https://inkbunny.net/api_search.php")
+        expect { subject.map_uri(uri) }.to raise_error("no sid")
+      end
     end
   end

@@ -83,12 +120,12 @@ describe Scraper::InkbunnyHttpClientConfig do
     it "scrubs username and password" do
       uri =
         subject.scrub_stored_uri(
-          URI.parse(
+          Addressable::URI.parse(
             "https://inkbunny.net/api_login.php?username=foo&password=bar",
           ),
         )
       expect(uri).to eq(
-        URI.parse(
+        Addressable::URI.parse(
           "https://inkbunny.net/api_login.php?username=*****&password=*****",
         ),
       )
spec/lib/scraper/metrics/job_base_metrics_spec.rb (new file, 104 lines)
@@ -0,0 +1,104 @@
+# typed: false
+require "rails_helper"
+
+describe Scraper::Metrics::JobBaseMetrics do
+  # Create concrete test job classes
+  class TargetJob < Scraper::JobBase
+    def self.http_factory_method
+      :get_fa_http_client
+    end
+
+    def perform(args)
+      sleep 0.1
+    end
+  end
+
+  class TestJob < Scraper::JobBase
+    def self.http_factory_method
+      :get_fa_http_client
+    end
+
+    def perform(args)
+      # Simulate some work
+      sleep 0.1
+      # Enqueue another job
+      defer_job(TargetJob, {}) if args[:enqueue_target]
+    end
+  end
+
+  let(:test_job_class) { TestJob }
+  let(:job_class_name) { test_job_class.name }
+
+  describe "job execution metrics" do
+    it "records metrics for successful job execution" do
+      expect(described_class::JOB_START_COUNT).to receive(:increment).with(
+        { job_class: job_class_name },
+      )
+
+      # The around_perform block in JobBase ensures these are called
+      expect(described_class::JOB_FINISH_COUNT).to receive(:increment).with(
+        { job_class: job_class_name, success: true },
+      )
+      expect(described_class::JOB_FINISH_TIME_HISTOGRAM).to receive(
+        :observe,
+      ).with(
+        be_between(90, 200), # 0.1 seconds = ~100ms with some buffer
+        { job_class: job_class_name, success: true },
+      )
+      expect(described_class::JOB_FINISH_TIME_SUMMARY).to receive(
+        :observe,
+      ).with(be_between(90, 200), { job_class: job_class_name, success: true })
+
+      test_job_class.perform_now({})
+    end
+
+    it "records metrics for failed job execution" do
+      error_job =
+        Class.new(test_job_class) do
+          def perform(args)
+            raise "test error"
+          end
+        end
+
+      expect(described_class::JOB_START_COUNT).to receive(:increment).with(
+        { job_class: error_job.name },
+      )
+      expect(described_class::JOB_FINISH_COUNT).to receive(:increment).with(
+        { job_class: error_job.name, success: false },
+      )
+      expect(described_class::JOB_FINISH_TIME_HISTOGRAM).to receive(
+        :observe,
+      ).with(
+        be_between(0, 100), # Should be quick since it errors immediately
+        { job_class: error_job.name, success: false },
+      )
+      expect(described_class::JOB_FINISH_TIME_SUMMARY).to receive(
+        :observe,
+      ).with(be_between(0, 100), { job_class: error_job.name, success: false })
+
+      expect(error_job.perform_now({})).to be_a(RuntimeError)
+    end
+
+    it "records metrics when a job enqueues another job" do
+      expect(described_class::JOB_START_COUNT).to receive(:increment).with(
+        { job_class: job_class_name },
+      )
+      expect(described_class::JOB_FINISH_COUNT).to receive(:increment).with(
+        { job_class: job_class_name, success: true },
+      )
+      expect(described_class::JOB_FINISH_TIME_HISTOGRAM).to receive(
+        :observe,
+      ).with(be_between(90, 200), { job_class: job_class_name, success: true })
+      expect(described_class::JOB_FINISH_TIME_SUMMARY).to receive(
+        :observe,
+      ).with(be_between(90, 200), { job_class: job_class_name, success: true })
+
+      # Expect the enqueued job metric to be recorded
+      expect(described_class::JOB_ENQUEUED_COUNT).to receive(:increment).with(
+        { source_class: job_class_name, enqueued_class: TargetJob.name },
+      )
+
+      test_job_class.perform_now({ enqueue_target: true })
+    end
+  end
+end