add page-specific stats for extension, dedup delayed jobs

2023-02-26 12:35:16 -08:00
parent b9a1921067
commit 84b30ac4d5
16 changed files with 241 additions and 34 deletions

View File

@@ -25,28 +25,29 @@ class Domain::Fa::ApiController < ApplicationController
fa_ids.each do |fa_id|
post = fa_id_to_post[fa_id]
try_enqueue_post_scan(post, fa_id)
post_response = {
terminal_state: false,
seen_at: time_ago_or_never(post&.created_at),
scanned_at: "never",
downloaded_at: "never",
}
if post
post_response[:info_url] = domain_fa_posts_url(fa_id: post.fa_id)
post_response[:created_at] = time_ago_or_never(post.created_at)
post_response[:info_url] = domain_fa_post_url(fa_id: post.fa_id)
post_response[:scanned_at] = time_ago_or_never(post.scanned_at)
if post.file.present?
post_response[:scanned_at] = time_ago_or_never(post.file.created_at)
post_response[:downloaded_at] = time_ago_or_never(post.file.created_at)
post_response[:state] = "have_file"
post_response[:terminal_state] = true
post_response[:state_string] = "downloaded #{post_response[:scanned_at]} ago"
elsif post.scanned?
post_response[:scanned_at] = time_ago_or_never(post.scanned_at || post.updated_at)
post_response[:state] = "scanned_post"
post_response[:state_string] = "scanned #{post_response[:scanned_at]} ago"
else
post_response[:state] = post.state
post_response[:state_string] = "seen #{time_ago_or_never(post.created_at)} ago, #{post.state}"
end
else
post_response[:state] = "not_seen"
post_response[:state_string] = "not yet seen"
end
posts_response[fa_id] = post_response
@@ -59,8 +60,8 @@ class Domain::Fa::ApiController < ApplicationController
if user
user_response = {
created_at: time_ago_or_never(user.created_at),
scanned_gallery: time_ago_or_never(user.scanned_gallery_at),
scanned_page: time_ago_or_never(user.scanned_page_at),
scanned_gallery_at: time_ago_or_never(user.scanned_gallery_at),
scanned_page_at: time_ago_or_never(user.scanned_page_at),
}
states = []
states << "page" unless user.due_for_page_scan?
@@ -81,9 +82,32 @@ class Domain::Fa::ApiController < ApplicationController
users_response[url_name] = user_response
end
queue_depths = Hash.new do |hash, key|
hash[key] = 0
end
Delayed::Backend::ActiveRecord::Job.select(:id, :queue, :handler).where(queue: "manual").find_each do |job|
queue_depths[job.payload_object.job_data["job_class"]] += 1
end
queue_depths = queue_depths.map do |key, value|
[key.
delete_prefix("Domain::Fa::Job::").
split("::").
last.
underscore.
delete_suffix("_job").
gsub("_", " "),
value]
end.to_h
render json: {
posts: posts_response,
users: users_response,
queues: {
total_depth: queue_depths.values.sum,
depths: queue_depths,
},
}
end
@@ -131,7 +155,7 @@ class Domain::Fa::ApiController < ApplicationController
def time_ago_or_never(time)
if time
helpers.time_ago_in_words(time, include_seconds: true)
helpers.time_ago_in_words(time, include_seconds: true) + " ago"
else
"never"
end
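
Note on the queue_depths hunk above: the map chain collapses each fully qualified job class name into a short label for the extension UI. A minimal sketch of the transformation, assuming ActiveSupport's String#underscore (already available in a Rails app):

require "active_support/core_ext/string/inflections"

name = "Domain::Fa::Job::ScanPostJob"
label = name.
  delete_prefix("Domain::Fa::Job::"). # "ScanPostJob"
  split("::").last.                   # still "ScanPostJob"; handles other namespaces too
  underscore.                         # "scan_post_job"
  delete_suffix("_job").              # "scan_post"
  gsub("_", " ")                      # "scan post"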

View File

@@ -1,5 +1,6 @@
class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::FaJobBase
queue_as :fa_browse_page
ignore_signature_args [:caused_by_entry]
def perform(args)
if self.class.ran_recently?
@@ -17,6 +18,7 @@ class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::FaJobBase
while true
break unless scan_browse_page
break if @page_number > 150
@page_number += 1
end
@@ -57,14 +59,11 @@ class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::FaJobBase
)
if listing_page_stats.total_seen == 0
fatal_error("zero posts on browse page #{log_entry.id.to_s.bold}")
fatal_error("0 posts on browse page - log entry #{log_entry.id.to_s.bold}")
end
logger.info("saw #{listing_page_stats.new_seen.to_s.bold} new #{listing_page_stats.total_seen.to_s.bold} total posts on page #{@page_number.to_s.bold}")
@total_num_new_posts_seen += listing_page_stats.new_seen
@total_num_posts_seen += listing_page_stats.total_seen
# listing_page_stats.last_was_new
listing_page_stats.new_seen == 0
listing_page_stats.new_seen > 0
end
end
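
Note on the last hunk: scan_browse_page's return value drives the `break unless scan_browse_page` loop in perform, so returning `new_seen > 0` (instead of `new_seen == 0`) stops the browse scan at the first page that yields nothing new. The resulting control flow, paraphrased from the loop above with comments added:

# keep paging only while pages still contain unseen posts
while true
  break unless scan_browse_page   # now false once a page has zero new posts
  break if @page_number > 150     # hard page cap
  @page_number += 1
end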

View File

@@ -1,5 +1,6 @@
class Domain::Fa::Job::ScanFileJob < Domain::Fa::Job::FaJobBase
queue_as :static_file
ignore_signature_args [:caused_by_entry]
def perform(args)
propagate_priority!

View File

@@ -1,5 +1,6 @@
class Domain::Fa::Job::ScanPostJob < Domain::Fa::Job::FaJobBase
queue_as :fa_post
ignore_signature_args [:caused_by_entry]
def perform(args)
propagate_priority!

View File

@@ -1,5 +1,7 @@
class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::FaJobBase
queue_as :fa_user_gallery
ignore_signature_args [:caused_by_entry]
MAX_PAGE_NUMBER = 350
def perform(args)

View File

@@ -1,5 +1,6 @@
class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::FaJobBase
queue_as :fa_user_page
ignore_signature_args [:caused_by_entry]
def perform(args)
propagate_priority!

View File

@@ -1,4 +1,10 @@
class Scraper::JobBase < ApplicationJob
def self.ignore_signature_args(args = nil)
@ignore_signature_args ||= []
@ignore_signature_args = args if args
@ignore_signature_args
end
def initialize(http_client = nil)
@http_client = http_client || self.class.build_http_client
@logger_prefix = ""
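
The ignore_signature_args method above doubles as getter and setter: called with an argument it records the list on the class, called bare it returns it (defaulting to []). A usage sketch with a hypothetical job class, mirroring how the Domain::Fa jobs above declare it:

class ExampleJob < Scraper::JobBase
  ignore_signature_args [:caused_by_entry]   # setter form
end

ExampleJob.ignore_signature_args       # => [:caused_by_entry], read by the dedup plugin
Scraper::JobBase.ignore_signature_args # => [] — each class has its own @ignore_signature_args

Because the list lives in a per-class instance variable, subclasses do not inherit a parent's declaration.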

View File

@@ -0,0 +1,40 @@
require "delayed_job"
class DelayedDuplicatePreventionPlugin < Delayed::Plugin
module SignatureConcern
extend ActiveSupport::Concern
included do
before_validation :set_sig_and_args
validates_uniqueness_of :signature
end
private
def set_sig_and_args
self.signature = generate_signature
self.args = self.payload_object.job_data["arguments"]
end
def generate_signature
job_data = payload_object.job_data
ignore_signature_args = job_data["job_class"].
constantize.
ignore_signature_args
# use += rather than << so the class-level array isn't mutated on every call
ignore_signature_args += [:_aj_symbol_keys]
arguments = job_data["arguments"]&.first
arguments = arguments.reject do |key, value|
ignore_signature_args.include?(key.to_sym)
end.to_h if arguments
sig = []
sig << job_data["job_class"]
sig << job_data["priority"] || "*"
sig << job_data["queue_name"] || "*"
sig << Digest::SHA256.hexdigest(arguments.inspect)[0...16]
sig.join("|")
end
end
end
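
To make the signature format concrete, a worked example with hypothetical job data (the hash fragment is illustrative, not computed):

job_data = {
  "job_class"  => "Domain::Fa::Job::ScanPostJob",
  "priority"   => nil,
  "queue_name" => "fa_post",
  "arguments"  => [{ "fa_id" => 123, "_aj_symbol_keys" => ["fa_id"] }],
}
# filtered arguments: { "fa_id" => 123 }  (:_aj_symbol_keys and :caused_by_entry dropped)
# generate_signature => "Domain::Fa::Job::ScanPostJob|*|fa_post|3f1a9c0d2b445e67"

Combined with validates_uniqueness_of :signature, two enqueues of the same job class with the same priority, queue, and filtered arguments collide on this value, and only the first row is saved.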

View File

@@ -75,7 +75,14 @@ class Domain::Fa::Post < ReduxApplicationRecord
end
def scanned_at
state_detail["scanned_at"] && Time.at(state_detail["scanned_at"])
# `scanned_at` started being populated at some point to avoid looking up the
# post's `last_submission_page` log entry; fall back to that entry when
# `scanned_at` isn't populated yet
if state_detail["scanned_at"]
Time.at(state_detail["scanned_at"])
else
last_submission_page&.created_at
end
end
def scanned_at=(time)
@@ -90,7 +97,7 @@ class Domain::Fa::Post < ReduxApplicationRecord
end
def last_submission_page
HttpLogEntry.find(self.log_entry_detail["last_submission_page_id"])
HttpLogEntry.find_by_id(self.log_entry_detail["last_submission_page_id"])
end
def have_file?
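
The find -> find_by_id switch is what makes the scanned_at fallback safe: find raises on a missing (or nil) id, while find_by_id returns nil, which `last_submission_page&.created_at` absorbs. A sketch of the difference:

HttpLogEntry.find(nil)        # raises ActiveRecord::RecordNotFound
HttpLogEntry.find_by_id(nil)  # => nil, safe under &.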

View File

@@ -0,0 +1,4 @@
require "delayed_duplicate_prevention_plugin"
Delayed::Backend::ActiveRecord::Job.send(:include, DelayedDuplicatePreventionPlugin::SignatureConcern)
Delayed::Worker.plugins << DelayedDuplicatePreventionPlugin
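
With the concern included and the plugin registered, duplicate enqueues are dropped at insert time. A sketch of the intended effect with hypothetical arguments, assuming delayed_job persists enqueued jobs with save (not save!), so a failed uniqueness validation silently discards the row:

Domain::Fa::Job::ScanPostJob.perform_later(fa_id: 123)  # row saved
Domain::Fa::Job::ScanPostJob.perform_later(fa_id: 123)  # same signature -> validation fails, not saved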

View File

@@ -0,0 +1,6 @@
class DelayedJobSignature < ActiveRecord::Migration[7.0]
def change
add_column :delayed_jobs, :signature, :string, index: true, using: "hash"
add_column :delayed_jobs, :args, :jsonb
end
end
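
Worth flagging: add_column does not take index options, so index: true, using: "hash" here appear to be silently ignored — the schema.rb hunk below shows the signature column with no index. If the uniqueness lookups need one, a separate add_index call would do it; a sketch of that alternative:

class DelayedJobSignature < ActiveRecord::Migration[7.0]
  def change
    add_column :delayed_jobs, :signature, :string
    add_column :delayed_jobs, :args, :jsonb
    # hash indexes support equality lookups, which is all the signature check needs
    add_index :delayed_jobs, :signature, using: "hash"
  end
end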

db/schema.rb (generated)
View File

@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.0].define(version: 2023_02_24_065330) do
ActiveRecord::Schema[7.0].define(version: 2023_02_26_062154) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_stat_statements"
enable_extension "plpgsql"
@@ -38,7 +38,10 @@ ActiveRecord::Schema[7.0].define(version: 2023_02_24_065330) do
t.string "queue"
t.datetime "created_at"
t.datetime "updated_at"
t.index ["priority", "run_at"], name: "delayed_jobs_priority"
t.string "signature"
t.jsonb "args"
t.index ["priority", "run_at"], name: "delayed_jobs_priority_run_at_idx"
t.index ["queue"], name: "delayed_jobs_queue_idx"
end
create_table "domain_fa_posts", force: :cascade do |t|

View File

@@ -334,33 +334,105 @@ function faIdFromViewHref(href) {
elemsCountsNode.style.height = "100%";
elemsCountsNode.style.display = "flex";
elemsCountsNode.style.flexDirection = "row";
elemsCountsNode.style.gap = "1em";
let extraStats = [
let postsStats = [
{ name: "not seen", value: "---" },
{ name: "ok", value: "---" },
{ name: "scanned", value: "---" },
{ name: "have file", value: "---" },
];
let queueStats = [
{ name: "queue depths", value: "---" },
{ name: "", value: "" },
{ name: "", value: "" },
{ name: "", value: "" },
];
let pageSpecificStats = [];
function renderStats() {
navbarStatusNode.innerHTML = "";
navbarStatusNode.appendChild(elemsCountsNode);
function renderTable(stats, lineHeight = "1em", fontSize = "1em") {
let table = `<table style="border-collapse: collapse; line-height: ${lineHeight}; width: 8em;font-size: ${fontSize}">`
stats.each(({ name, value }) => {
table += `<tr><td style="padding-right:5px">${name}:</td><td>${value}</td></tr>`
function renderTable(stats, styleOpts = {}) {
if (stats.length == 0) {
return null;
}
if (styleOpts['border-collapse'] == null) {
styleOpts['border-collapse'] = "collapse";
}
let table = document.createElement("table");
for (const [property, value] of Object.entries(styleOpts)) {
table.style.setProperty(property, value);
}
let tbody = document.createElement("tbody");
table.appendChild(tbody);
stats.each(({ name, value, sep, nameAlign, valueAlign }) => {
if (name == "" && value == "") {
tbody.innerHTML += `<tr><td>---</td><td>---</td></tr>`;
}
else {
if (sep == null) {
sep = ":";
}
if (nameAlign == null) {
nameAlign = "right";
}
if (valueAlign == null) {
valueAlign = "right";
}
tbody.innerHTML += `<tr>
<td style="text-align:${nameAlign};padding-right:5px">${name}${sep}</td>
<td style="text-align:${valueAlign}">${value}</td>
</tr>`;
}
});
table += `</table>`;
return table;
}
function optsForNumRows(numRows) {
switch (numRows) {
case 0:
case 1:
case 2:
case 3:
return { 'line-height': '1.0em', 'font-size': "1.0em" };
case 4:
return { 'line-height': '0.9em', 'font-size': "0.8em" };
default:
return { 'line-height': "0.9em", 'font-size': "0.6em" }
}
}
const baseStatsTable = renderTable([
{ name: "num users", value: urlNames.length },
{ name: "num posts", value: faIds.length }
]);
const extraStatsTable = extraStats.length > 0 ? renderTable(extraStats, "0.5em", "0.8em") : "";
elemsCountsNode.innerHTML = baseStatsTable + extraStatsTable;
], { ...optsForNumRows(2), width: "auto" });
const postsStatsTable = renderTable(postsStats, {
...optsForNumRows(postsStats.length), width: "auto"
});
const queueStatsTable = renderTable(queueStats, {
...optsForNumRows(queueStats.length), width: "auto"
});
const pageStatsTable = renderTable(pageSpecificStats, {
...optsForNumRows(pageSpecificStats.length), width: "auto"
});
elemsCountsNode.innerHTML = "";
baseStatsTable && elemsCountsNode.appendChild(baseStatsTable);
postsStatsTable && elemsCountsNode.appendChild(postsStatsTable);
queueStatsTable && elemsCountsNode.appendChild(queueStatsTable);
pageStatsTable && elemsCountsNode.appendChild(pageStatsTable);
}
renderStats();
@@ -385,6 +457,8 @@ function faIdFromViewHref(href) {
else {
console.log("reached terminal state on all objects");
}
} else {
navbarStatusNode.innerHTML = `<b>${response.status} from scraper</b>`;
}
},
});
@@ -419,12 +493,49 @@ function faIdFromViewHref(href) {
});
}
extraStats = [
postsStats = [
{ name: "not seen", value: numNotSeenPosts },
{ name: "ok", value: numOkPosts },
{ name: "scanned", value: numScannedPosts },
{ name: "have file", value: numHaveFile },
];
queueStats = Object
.entries(jsonResponse.queues.depths)
.map(([queue, depth]) => ({ name: queue, value: depth }));
queueStats = [
{ name: "queue depths", value: `${jsonResponse.queues.total_depth} total` },
...queueStats
];
while (queueStats.length < 4) {
queueStats.push({ name: "", value: "" });
}
allTerminalState &&= jsonResponse.queues.total_depth == 0;
const thisPageFaId = faIdFromViewHref(window.location.href);
const pssCommon = { sep: '', valueAlign: 'left' };
if (thisPageFaId != null) {
const postData = jsonResponse.posts[thisPageFaId];
pageSpecificStats = [
{ name: 'link', value: `<a target="_blank" href="${postData.info_url}">${thisPageFaId}</a>`, ...pssCommon },
{ name: `seen`, value: postData.seen_at, ...pssCommon },
{ name: `scanned`, value: postData.scanned_at, ...pssCommon },
{ name: `downloaded`, value: postData.downloaded_at, ...pssCommon }
];
}
const thisPageUrlName = urlNameFromUserHref(window.location.href);
if (thisPageUrlName != null) {
const userData = jsonResponse.users[thisPageUrlName];
pageSpecificStats = [
{ name: '', value: thisPageUrlName, ...pssCommon },
{ name: 'first seen', value: userData.created_at, ...pssCommon },
{ name: 'page scan', value: userData.scanned_page_at, ...pssCommon },
{ name: 'gallery scan', value: userData.scanned_gallery_at, ...pssCommon }
];
}
renderStats();
return !allTerminalState;

View File

@@ -9,8 +9,9 @@ end
# This runs in the worker processes after it has been forked
on_worker_boot do |worker_info|
ActiveJob::Base.logger = Logger.new("/dev/null")
Delayed::Worker.logger = Logger.new("/dev/null")
Rails.logger = Logger.new(STDOUT)
Delayed::Worker.logger = Logger.new(STDOUT)
puts "Worker #{Process.pid} started"
# Reconnect to the database

View File

@@ -33,7 +33,7 @@ end
worker_group(:manual) do |g|
g.read_ahead = 8
g.sleep_delay = 5
g.workers = 1
g.workers = 4
g.queues = ["manual"]
end

View File

@@ -9,8 +9,9 @@ end
# This runs in the worker processes after it has been forked
on_worker_boot do |worker_info|
ActiveJob::Base.logger = Logger.new("/dev/null")
Delayed::Worker.logger = Logger.new("/dev/null")
Rails.logger = Logger.new(STDOUT)
Delayed::Worker.logger = Logger.new(STDOUT)
puts "Worker #{Process.pid} started"
# Reconnect to the database
@@ -32,7 +33,7 @@ end
worker_group(:static_file) do |g|
g.read_ahead = 1
g.sleep_delay = 5
g.workers = 1
g.workers = 8
g.queues = ["static_file"]
end