fix metrics reporting for http clients
This commit is contained in:
@@ -50,16 +50,6 @@ class Scraper::JobBase < ApplicationJob
|
||||
@@gjcc_tl.value
|
||||
end
|
||||
|
||||
def write_point(name, tags: {}, fields: {})
|
||||
Metrics::Reporter.singleton.write_point(
|
||||
name,
|
||||
tags: tags.merge({
|
||||
class_name: self.class.name,
|
||||
}),
|
||||
fields: fields,
|
||||
)
|
||||
end
|
||||
|
||||
PERMITTED_CONTENT_TYPES = [
|
||||
/text\/html/,
|
||||
/application\/json/,
|
||||
@@ -191,7 +181,11 @@ class Scraper::JobBase < ApplicationJob
|
||||
around_perform do |job, block|
|
||||
error = nil
|
||||
start = Time.now
|
||||
block.call
|
||||
begin
|
||||
block.call
|
||||
ensure
|
||||
duration_ms = (Time.now - start) * 1000
|
||||
end
|
||||
rescue Net::ReadTimeout, Errno::ECONNREFUSED => e
|
||||
logger.error "#{e.class.name} - sleep for a bit"
|
||||
sleep rand(2.0..7.0)
|
||||
@@ -201,10 +195,12 @@ class Scraper::JobBase < ApplicationJob
|
||||
error = e
|
||||
raise e
|
||||
ensure
|
||||
duration_ms = (Time.now - start) * 1000
|
||||
job.write_point(
|
||||
"delayed_job_performed",
|
||||
Metrics::Client.singleton.write_point(
|
||||
self,
|
||||
"job_performed",
|
||||
tags: {
|
||||
job_queue: job.queue_name,
|
||||
job_class: job.class.name,
|
||||
success: error.nil?,
|
||||
error_class: error&.class&.name,
|
||||
},
|
||||
|
||||
54
app/lib/metrics/client.rb
Normal file
54
app/lib/metrics/client.rb
Normal file
@@ -0,0 +1,54 @@
|
||||
class Metrics::Client
|
||||
include HasColorLogger
|
||||
|
||||
REPORT = !Rails.env.test?
|
||||
|
||||
def self.singleton
|
||||
@singleton ||= Metrics::Client.new
|
||||
end
|
||||
def self.singleton=(instance)
|
||||
@singleton = instance
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def initialize(host: nil, token: nil, org: nil, bucket: nil, default_tags: {})
|
||||
unless REPORT
|
||||
logger.warn "!!! not reporting for this environment !!!"
|
||||
return
|
||||
end
|
||||
|
||||
host ||= Rails.application.config.x.influxdb.host || raise("no host")
|
||||
token ||= Rails.application.config.x.influxdb.token || raise("no token")
|
||||
org ||= Rails.application.config.x.influxdb.org || raise("no org")
|
||||
bucket ||= Rails.application.config.x.influxdb.bucket || raise("no bucket")
|
||||
|
||||
@client = InfluxDB2::Client.new(
|
||||
host, token,
|
||||
org: org,
|
||||
bucket: bucket,
|
||||
use_ssl: false,
|
||||
precision: InfluxDB2::WritePrecision::MILLISECOND,
|
||||
)
|
||||
|
||||
write_options = InfluxDB2::WriteOptions.new(
|
||||
write_type: InfluxDB2::WriteType::BATCHING,
|
||||
batch_size: 100, flush_interval: 5_000,
|
||||
max_retries: 3, max_retry_delay: 15_000,
|
||||
exponential_base: 2,
|
||||
)
|
||||
|
||||
point_settings = InfluxDB2::PointSettings.new(default_tags: default_tags)
|
||||
@writer = @client.create_write_api(
|
||||
write_options: write_options,
|
||||
point_settings: point_settings,
|
||||
)
|
||||
end
|
||||
|
||||
public
|
||||
|
||||
def write_point(caller, name, tags: {}, fields: {})
|
||||
return unless REPORT
|
||||
@writer.write(data: { name: name, tags: tags, fields: fields })
|
||||
end
|
||||
end
|
||||
@@ -1,5 +1,10 @@
|
||||
class Metrics::EstimateDbRowsReporter < Metrics::Reporter
|
||||
def report_impl
|
||||
def initialize
|
||||
log_writes!
|
||||
super
|
||||
end
|
||||
|
||||
def report
|
||||
extra_tables = [
|
||||
"http_log_entries",
|
||||
"http_log_entry_headers",
|
||||
@@ -19,8 +24,7 @@ class Metrics::EstimateDbRowsReporter < Metrics::Reporter
|
||||
).rows.to_h
|
||||
|
||||
write_point(
|
||||
"estimate_db_rows",
|
||||
fields: row_estimates,
|
||||
"estimate_db_rows", fields: row_estimates,
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
class Metrics::GoodJobReporter < Metrics::Reporter
|
||||
def report_impl
|
||||
def initialize
|
||||
super
|
||||
end
|
||||
|
||||
def report
|
||||
total = GoodJob::Job.count
|
||||
by_queue = GoodJob::Job.group("queue_name").count
|
||||
by_state = GoodJob::JobsFilter.new({}).states
|
||||
logger.info "write metrics: total=#{total}, by_queue=#{by_queue}, by_state=#{by_state}"
|
||||
logger.info "job queue metrics: total=#{total}, by_queue=#{by_queue}, by_state=#{by_state}"
|
||||
|
||||
write_point(
|
||||
"job_queues",
|
||||
|
||||
@@ -1,86 +1,20 @@
|
||||
class Metrics::Reporter
|
||||
include HasColorLogger
|
||||
|
||||
REPORT = !Rails.env.test?
|
||||
|
||||
def self.singleton
|
||||
@singleton ||= Metrics::Reporter.new
|
||||
end
|
||||
def self.singleton=(instance)
|
||||
@singleton = instance
|
||||
def initialize
|
||||
@client = Metrics::Client.singleton
|
||||
end
|
||||
|
||||
def initialize(host: nil, token: nil, org: nil, bucket: nil, default_tags: {})
|
||||
unless REPORT
|
||||
logger.warn "!!! not reporting for this environment !!!"
|
||||
return
|
||||
end
|
||||
|
||||
host ||= Rails.application.config.x.influxdb.host || raise("no host")
|
||||
token ||= Rails.application.config.x.influxdb.token || raise("no token")
|
||||
org ||= Rails.application.config.x.influxdb.org || raise("no org")
|
||||
bucket ||= Rails.application.config.x.influxdb.bucket || raise("no bucket")
|
||||
|
||||
@client = InfluxDB2::Client.new(
|
||||
host, token,
|
||||
org: org,
|
||||
bucket: bucket,
|
||||
use_ssl: false,
|
||||
precision: InfluxDB2::WritePrecision::MILLISECOND,
|
||||
)
|
||||
|
||||
write_options = InfluxDB2::WriteOptions.new(
|
||||
write_type: InfluxDB2::WriteType::BATCHING,
|
||||
batch_size: 100, flush_interval: 5_000,
|
||||
max_retries: 3, max_retry_delay: 15_000,
|
||||
exponential_base: 2,
|
||||
)
|
||||
|
||||
point_settings = InfluxDB2::PointSettings.new(default_tags: default_tags)
|
||||
@writer = @client.create_write_api(
|
||||
write_options: write_options,
|
||||
point_settings: point_settings,
|
||||
)
|
||||
end
|
||||
|
||||
def report
|
||||
start_at = Time.now
|
||||
success = true
|
||||
begin
|
||||
self.report_impl
|
||||
rescue Exception => e
|
||||
success = false
|
||||
ex = e
|
||||
end
|
||||
duration = Time.now - start_at
|
||||
|
||||
write_point(
|
||||
"metrics_reporter",
|
||||
tags: { class_name: self.class.name },
|
||||
fields: { success: success, duration: duration },
|
||||
)
|
||||
close!
|
||||
|
||||
color = success ? :light_blue : :red
|
||||
if success
|
||||
logger.info "reporter finished (#{duration.round(2)} sec)"
|
||||
else
|
||||
logger.error "reporter failed (#{duration.round(2)} sec) - #{ex.message}"
|
||||
end
|
||||
raise ex unless success
|
||||
end
|
||||
|
||||
def close!
|
||||
return unless REPORT
|
||||
@client.close!
|
||||
def log_writes!
|
||||
@log_writes = true
|
||||
end
|
||||
|
||||
def write_point(name, tags: {}, fields: {})
|
||||
return unless REPORT
|
||||
@writer.write(data: { name: name, tags: tags, fields: fields })
|
||||
@client.write_point(self, name, tags: tags, fields: fields)
|
||||
logger.info("reporter wrote point '#{name.bold}'") if @log_writes
|
||||
end
|
||||
|
||||
def report_impl
|
||||
raise NotImplementedError, "need to implement #report on #{self.class.name}"
|
||||
def report
|
||||
raise NotImplementedError, "implement in subclass"
|
||||
end
|
||||
end
|
||||
|
||||
@@ -31,10 +31,6 @@ class Scraper::HttpClient
|
||||
get_impl(url, caused_by_entry)
|
||||
end
|
||||
|
||||
def reporter
|
||||
Metrics::Reporter.singleton
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get_impl(url, caused_by_entry)
|
||||
@@ -154,7 +150,8 @@ class Scraper::HttpClient
|
||||
raise
|
||||
end
|
||||
|
||||
reporter.write_point(
|
||||
Metrics::Client.singleton.write_point(
|
||||
self,
|
||||
"http_client_response",
|
||||
tags: {
|
||||
method: "GET",
|
||||
@@ -167,6 +164,7 @@ class Scraper::HttpClient
|
||||
total_time_ms: total_time_ms,
|
||||
content_size: response_blob_entry.size,
|
||||
content_stored: response_blob_entry.bytes_stored,
|
||||
uri: uri.to_s,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user