more migration to log tagging

Dylan Knutson
2025-02-14 19:24:03 +00:00
parent 97f7e50d61
commit 5737e5790e
6 changed files with 237 additions and 191 deletions
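The diffs below replace string-based logger.prefix assignments with structured log tags. A minimal sketch of the pattern, assuming the make_tag/make_arg_tag helpers from HasColorLogger and an ActiveSupport::TaggedLogging-style push_tags/pop_tags/tagged API as used in the diff (the exact tag rendering depends on make_tag, which is not shown in this commit):

    def perform(args)
      pool = T.let(args[:pool], Domain::PostGroup::InkbunnyPool)
      logger.push_tags(make_arg_tag(pool))      # job-wide tags, e.g. ["pool.id 42"]
      logger.tagged(make_tag("page", 1)) do     # extra tag scoped to one step
        logger.info("fetching search page")     # rendered with both tags prepended
      end
    ensure
      logger.pop_tags                           # pair every push_tags with a pop_tags
    end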

View File

@@ -4,12 +4,22 @@ module Domain::Inkbunny::Job
sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
def perform(args)
pool = T.let(args[:pool], Domain::PostGroup::InkbunnyPool)
logger.prefix = "[pool #{pool.id.to_s.bold}]"
logger.push_tags(make_tag("pool", pool.id))
logger.push_tags(make_arg_tag(pool))
if pool.deep_update_log_entry.present?
logger.info("skipping, already deep updated")
return
if (dule = pool.deep_update_log_entry)
if dule.status_code == 200
logger.info("pool already deep updated, skipping")
return
else
logger.info(
format_tags_arr(
[
make_arg_tag(dule, name: "last_hle"),
"retrying pool deep update",
].flatten,
),
)
end
end
processor = ApiSearchPageProcessor.new
@@ -20,7 +30,7 @@ module Domain::Inkbunny::Job
while true
loop_count += 1
if loop_count > ApiSearchPageProcessor::MAX_LOOP_COUNT
raise("loop_count: #{loop_count}")
fatal_error("loop_count > #{ApiSearchPageProcessor::MAX_LOOP_COUNT}")
end
url =
@@ -29,12 +39,11 @@ module Domain::Inkbunny::Job
rid: rid,
page: page,
)
response = http_client.post(url)
pool.deep_update_log_entry = causing_log_entry
if response.status_code != 200
fatal_error("api_search failed: #{response.status_code}")
end
response = http_client.post(url)
pool.deep_update_log_entry = causing_log_entry
fatal_error("api_search failed") if response.status_code != 200
result =
processor.process!(
@@ -45,13 +54,15 @@ module Domain::Inkbunny::Job
rid ||= T.cast(result[:rid], String)
logger.info(
[
"[rid: #{rid}]",
"[page #{page}]",
"[total changed posts: #{result[:num_total_changed_posts]}]",
"[total posts: #{result[:num_total_posts]}]",
"[total users: #{result[:num_total_users]}]",
].join(" "),
format_tags_arr(
[
make_tag("rid", rid),
make_tag("page", page),
make_tag("total changed posts", result[:num_total_changed_posts]),
make_tag("total posts", result[:num_total_posts]),
make_tag("total users", result[:num_total_users]),
],
),
)
break if result[:num_pages] == page
@@ -68,6 +79,7 @@ module Domain::Inkbunny::Job
)
end
ensure
pool.save! if pool
logger.pop_tags
end
end

View File

@@ -8,7 +8,7 @@ class Domain::Inkbunny::Job::UpdatePostsJob < Domain::Inkbunny::Job::Base
pools_to_update = T::Set[Domain::PostGroup::InkbunnyPool].new
if ib_post_ids.empty?
logger.info "empty ib_post_ids"
logger.warn "empty ib_post_ids"
return
end
@@ -59,25 +59,25 @@ class Domain::Inkbunny::Job::UpdatePostsJob < Domain::Inkbunny::Job::Base
)
url = build_api_submissions_url(ib_post_ids_chunk)
response = http_client.get(url)
if response.status_code != 200
fatal_error("api_submissions failed: #{response.status_code}")
end
api_submissions_json = JSON.parse(response.body)
submission_jsons = api_submissions_json["submissions"]
logger.info("api_submissions page has #{submission_jsons.size} posts")
logger.tagged(make_arg_tag(response, name: "submissions_hle")) do
if response.status_code != 200
fatal_error("api_submissions failed: #{response.status_code}")
end
api_submissions_json = JSON.parse(response.body)
submission_jsons = api_submissions_json["submissions"]
logger.info("api_submissions page has #{submission_jsons.size} posts")
submission_jsons.each do |submission_json|
Domain::Post::InkbunnyPost.transaction do
deep_update_post_from_submission_json(
submission_json,
response.log_entry,
missing_pool_post_ib_ids,
pools_to_update,
)
submission_jsons.each do |submission_json|
Domain::Post::InkbunnyPost.transaction do
deep_update_post_from_submission_json(
submission_json,
response.log_entry,
missing_pool_post_ib_ids,
pools_to_update,
)
end
end
end
logger.prefix = ""
end
sig do
@@ -94,111 +94,135 @@ class Domain::Inkbunny::Job::UpdatePostsJob < Domain::Inkbunny::Job::Base
missing_pool_post_ib_ids,
pools_to_update
)
logger.prefix = "ib_id #{submission_json["submission_id"].to_s.bold}"
logger.info "update post #{submission_json["submission_id"].to_s.bold}"
ib_post_id = submission_json["submission_id"]
post =
Domain::Post::InkbunnyPost.includes(:pools).find_by!(
ib_id: submission_json["submission_id"],
)
logger.info "deep update post #{post.ib_id.to_s.bold}"
post.deep_updated_at = Time.zone.now
post.description = submission_json["description"]
post.writing = submission_json["writing"]
post.rating = submission_json["rating"]
post.submission_type = submission_json["submission_type"]
post.num_views = submission_json["views"]
post.num_files = submission_json["pagecount"]
post.num_favs = submission_json["favorites_count"]&.to_i
post.num_comments = submission_json["comments_count"]&.to_i
if last_updated = submission_json["last_file_update_datetime"]
post.last_file_updated_at = Time.zone.parse(last_updated)
end
post.keywords = submission_json["keywords"]
post.deep_update_log_entry = log_entry
Domain::Post::InkbunnyPost.includes(:pools).find_by(ib_id: ib_post_id)
if pools_json = submission_json["pools"]
update_submission_pools(
post,
pools_json,
missing_pool_post_ib_ids,
pools_to_update,
if post.blank?
logger.error(
format_tags(make_tag("ib_post_id", ib_post_id), "post not found"),
)
return
end
if submission_json["user_icon_url_large"]
user = T.must(post.creator)
avatar = user.avatar
avatar_url_str = submission_json["user_icon_url_large"]
if !avatar || avatar.url_str != avatar_url_str
avatar = user.build_avatar
avatar.url_str = avatar_url_str
user.deep_update_log_entry = log_entry
logger.info "avatar url changed, enqueuing download for avatar #{user.name}"
avatar.enqueue_job_after_save(
Domain::Inkbunny::Job::UserAvatarJob,
{ avatar: avatar, caused_by_entry: log_entry },
logger.tagged(make_arg_tag(post)) do
logger.info "begin deep update post"
post.deep_updated_at = Time.zone.now
post.description = submission_json["description"]
post.writing = submission_json["writing"]
post.rating = submission_json["rating"]
post.submission_type = submission_json["submission_type"]
post.num_views = submission_json["views"]
post.num_files = submission_json["pagecount"]
post.num_favs = submission_json["favorites_count"]&.to_i
post.num_comments = submission_json["comments_count"]&.to_i
if last_updated = submission_json["last_file_update_datetime"]
post.last_file_updated_at = Time.zone.parse(last_updated)
end
post.keywords = submission_json["keywords"]
post.deep_update_log_entry = log_entry
if pools_json = submission_json["pools"]
update_submission_pools(
post,
pools_json,
missing_pool_post_ib_ids,
pools_to_update,
)
end
user.save!
end
post_files_by_ib_id = post.files.index_by(&:ib_id)
file_jsons = submission_json["files"] || fatal_error("no files[] array")
post.save!
file_jsons.each_with_index do |file_json, index|
ib_file_id = file_json["file_id"]&.to_i
if ib_file_id.blank?
logger.error "files[#{index}] for post #{post.ib_id.to_s.bold} has no ib_id, skipping"
next
end
next if post_files_by_ib_id[ib_file_id]
md5_initial = file_json["initial_file_md5"]
# We create all files, even those with null MD5 sums (which also do not have
# a valid download URL), so that post.files.count will be accurate and match
# pagecount.
file =
post.files.create do |file|
md5_initial.present? ? file.state_ok! : file.state_terminal_error!
file.ib_id = ib_file_id
file.ib_created_at = Time.zone.parse(file_json["create_datetime"])
file.file_order = file_json["submission_file_order"]&.to_i
file.ib_detail_raw = file_json
file.file_name = file_json["file_name"]
file.url_str = file_json["file_url_full"]
file.md5_initial = md5_initial
file.md5_full = file_json["full_file_md5"]
file.md5s = {
initial_file_md5: md5_initial,
full_file_md5: file_json["full_file_md5"],
large_file_md5: file_json["large_file_md5"],
small_file_md5: file_json["small_file_md5"],
thumbnail_md5: file_json["thumbnail_md5"],
}
if submission_json["user_icon_url_large"]
user = T.must(post.creator)
logger.tagged(make_arg_tag(user)) do
avatar = user.avatar
avatar_url_str = submission_json["user_icon_url_large"]
if !avatar || avatar.url_str != avatar_url_str
avatar = user.build_avatar
avatar.url_str = avatar_url_str
user.deep_update_log_entry = log_entry
logger.info "avatar url changed, enqueuing avatar download"
avatar.enqueue_job_after_save(
Domain::Inkbunny::Job::UserAvatarJob,
{ avatar: avatar, caused_by_entry: log_entry },
)
end
user.save!
end
if file.state_terminal_error?
logger.error "file #{file.ib_id.to_s.bold} is poorly formed, skipping enqueue"
next
end
if file.invalid?
logger.error "file #{file.ib_id.to_s.bold} (ib_id #{post.ib_id.to_s.bold}) is invalid: #{file.errors.full_messages.join(", ")}"
fatal_error(
"file #{file.ib_id.to_s.bold} is invalid: #{file.errors.full_messages.join(", ")}",
post_files_by_ib_id =
T.cast(
post.files.index_by(&:ib_id),
T::Hash[Integer, Domain::PostFile::InkbunnyPostFile],
)
file_jsons = submission_json["files"] || fatal_error("no files[] array")
post.save!
file_jsons.each_with_index do |file_json, index|
ib_file_id = file_json["file_id"]&.to_i
logger.tagged(
make_tag("ib_file_id", ib_file_id),
make_tag("file_idx", index),
) do
if ib_file_id.blank?
logger.error("file has no ib_id, skipping")
next
end
md5_initial = file_json["initial_file_md5"]
logger.error("file has no md5") if md5_initial.blank?
file = post_files_by_ib_id[ib_file_id]
if file.present? && (file.md5_initial != md5_initial)
fatal_error(
"md5_initial changed #{file.md5_initial} -> #{md5_initial}",
)
elsif file.present?
logger.info("file already exists, skipping")
next
end
# We create all files, even those with null MD5 sums (which also do not have
# a valid download URL), so that post.files.count will be accurate and match
# pagecount.
file =
post.files.create do |file|
md5_initial.present? ? file.state_ok! : file.state_terminal_error!
file.ib_id = ib_file_id
file.ib_created_at = Time.zone.parse(file_json["create_datetime"])
file.file_order = file_json["submission_file_order"]&.to_i
file.ib_detail_raw = file_json
file.file_name = file_json["file_name"]
file.url_str = file_json["file_url_full"]
file.md5_initial = md5_initial
file.md5_full = file_json["full_file_md5"]
file.md5s = {
initial_file_md5: md5_initial,
full_file_md5: file_json["full_file_md5"],
large_file_md5: file_json["large_file_md5"],
small_file_md5: file_json["small_file_md5"],
thumbnail_md5: file_json["thumbnail_md5"],
}
end
if file.state_terminal_error?
logger.error "file is in terminal error state, skipping enqueue"
next
end
if file.invalid?
fatal_error "file is invalid: #{format_tags_arr(file.errors.full_messages)}"
end
logger.info "created new file"
defer_job(
Domain::Inkbunny::Job::StaticFileJob,
{ file: file },
{ priority: 1 },
)
end
end
logger.info "[post ib_id #{post.ib_id.to_s.bold}] " +
"new file #{file.ib_id.to_s.bold} - #{file.file_name&.black&.bold}"
defer_job(
Domain::Inkbunny::Job::StaticFileJob,
{ file: file },
{ priority: 1 },
)
end
end
@@ -227,10 +251,7 @@ class Domain::Inkbunny::Job::UpdatePostsJob < Domain::Inkbunny::Job::Base
p =
Domain::Post::InkbunnyPost.find_or_initialize_by(
ib_id: ib_id,
) do |p|
p.creator = post.creator
# p.state_detail = { "created_from" => "pool_mention" }
end
) { |p| p.creator = post.creator }
if p.new_record?
missing_pool_post_ib_ids.add(ib_id)
p.save!

View File

@@ -7,9 +7,7 @@ module Domain::Inkbunny::Job
def perform(args)
user = user_from_args!
avatar = T.must(user.avatar)
logger.prefix =
proc { "[user #{user.name.to_s.bold} / #{user.ib_id.to_s.bold}]" }
logger.push_tags(make_arg_tag(user), make_arg_tag(avatar))
url_str = avatar.url_str
if url_str.blank?
@@ -18,26 +16,28 @@ module Domain::Inkbunny::Job
end
response = http_client.get(url_str)
self.first_log_entry ||= response.log_entry
avatar.last_log_entry = response.log_entry
logger.tagged(make_arg_tag(response.log_entry)) do
self.first_log_entry ||= response.log_entry
avatar.last_log_entry = response.log_entry
case response.status_code
when 200
avatar.state = "ok"
avatar.error_message = nil
avatar.downloaded_at = response.log_entry.created_at
avatar.log_entry = response.log_entry
logger.info("downloaded avatar")
when 404
avatar.state = "file_404"
avatar.error_message = "http #{response.status_code}"
logger.info("avatar 404")
else
avatar.state = "http_error"
avatar.error_message = "http #{response.status_code}"
fatal_error(
"http #{response.status_code}, log entry #{response.log_entry.id}",
)
case response.status_code
when 200
avatar.state = "ok"
avatar.error_message = nil
avatar.downloaded_at = response.log_entry.created_at
avatar.log_entry = response.log_entry
logger.info("downloaded avatar")
when 404
avatar.state = "file_404"
avatar.error_message = "http #{response.status_code}"
logger.info("avatar 404")
else
avatar.state = "http_error"
avatar.error_message = "http #{response.status_code}"
fatal_error(
"http #{response.status_code}, log entry #{response.log_entry.id}",
)
end
end
ensure
avatar.save! if avatar

View File

@@ -327,7 +327,7 @@ class Scraper::JobBase < ApplicationJob
block.call
ensure
ColorLogger.log_line_accumulator = nil
ColorLogger.quiet { job.enqueue_deferred_jobs! }
job.enqueue_deferred_jobs!
log_lines = T.must(log_lines)
good_job = GoodJob::CurrentThread.job
last_execution = Scraper::JobBase.last_good_job_execution
@@ -371,11 +371,19 @@ class Scraper::JobBase < ApplicationJob
GoodJob::Bulk.enqueue do
@deferred_jobs.each do |deferred_job|
args = deferred_job.params.merge({ caused_by_entry: causing_log_entry })
deferred_job.job_class.set(deferred_job.set_args).perform_later(args)
job =
deferred_job.job_class.set(deferred_job.set_args).perform_later(args)
Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
source_class: self.class,
enqueued_class: deferred_job.job_class,
)
logger.info(
format_tags(
make_tag("job_class", deferred_job.job_class.name),
make_tag("job_id", job.job_id),
"enqueue deferred job",
),
)
end
end
end

View File

@@ -29,47 +29,51 @@ module HasColorLogger
sig { params(tags: String).returns(String) }
def format_tags(*tags)
format_tags_arr(tags)
end
sig { params(tags: T::Array[String]).returns(String) }
def format_tags_arr(tags)
tags.map { |tag| "[#{tag}]" }.join(" ")
end
sig { params(arg: T.untyped).returns(T::Array[String]) }
def make_arg_tag(arg)
sig do
params(arg: T.untyped, name: T.nilable(String)).returns(T::Array[String])
end
def make_arg_tag(arg, name: nil)
tags = []
return tags if arg.nil?
if arg.is_a?(Domain::User)
prefix_and_attr = arg.class.param_prefix_and_attribute
tags << make_tag("user.kind", arg.class.view_prefix)
tags << make_tag("user.id", arg.id)
tags << make_tag(
"user.#{prefix_and_attr[0]}",
arg.send(prefix_and_attr[1]),
)
end
if arg.is_a?(Domain::Post)
prefix_and_attr = arg.class.param_prefix_and_attribute
tags << make_tag("post.kind", arg.class.view_prefix)
tags << make_tag("post.id", arg.id)
tags << make_tag(
"post.#{prefix_and_attr[0]}",
arg.send(prefix_and_attr[1]),
)
end
if arg.is_a?(Domain::PostFile)
tags << make_tag("file.id", arg.id)
tags << make_tag("file.state", arg.state)
end
if arg.is_a?(Domain::UserAvatar)
tags << make_tag("avatar.id", arg.id)
tags << make_tag("avatar.state", arg.state)
end
if arg.is_a?(HttpLogEntry)
tags << make_tag("hle.id", arg.id)
tags << make_tag("hle.code", arg.status_code)
case arg
when Domain::User
name ||= "user"
prefix, attr = arg.class.param_prefix_and_attribute
tags << make_tag("#{name}.kind", prefix)
tags << make_tag("#{name}.id", arg.id)
tags << make_tag("#{name}.#{attr}", arg.send(attr))
when Domain::Post
name ||= "post"
prefix, attr = arg.class.param_prefix_and_attribute
tags << make_tag("#{name}.kind", prefix)
tags << make_tag("#{name}.id", arg.id)
tags << make_tag("#{name}.#{attr}", arg.send(attr))
when Domain::PostFile
name ||= "file"
tags << make_tag("#{name}.id", arg.id)
tags << make_tag("#{name}.state", arg.state)
when Domain::UserAvatar
name ||= "avatar"
tags << make_tag("#{name}.id", arg.id)
tags << make_tag("#{name}.state", arg.state)
when Domain::PostGroup
name ||= "pool"
tags << make_tag("#{name}.id", arg.id)
when HttpLogEntry
name ||= "hle"
tags << make_tag("#{name}.id", arg.id)
tags << make_tag("#{name}.code", arg.status_code)
else
tags << make_tag("unknown", arg.class.name)
end
tags
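For reference, a sketch of how the reworked make_arg_tag composes with format_tags_arr when the caller overrides the default name, as the pool job does with name: "last_hle". The rendered strings are illustrative, since the output format of make_tag is not part of this diff, and log_entry is a hypothetical HttpLogEntry:

    tags = make_arg_tag(log_entry, name: "last_hle")
    # => roughly ["last_hle.id 123", "last_hle.code 200"]
    logger.info(format_tags_arr(tags + ["retrying pool deep update"]))
    # => "[last_hle.id 123] [last_hle.code 200] [retrying pool deep update]"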

View File

@@ -46,7 +46,7 @@ class Scraper::CurlHttpPerformer
).returns(Response)
end
def do_request(method, url, request_headers)
Timeout.timeout(60) { do_request_impl(method, url, request_headers) }
do_request_impl(method, url, request_headers)
end
private
@@ -61,6 +61,7 @@ class Scraper::CurlHttpPerformer
def do_request_impl(method, url, request_headers)
t, curl = get_curl
start_at = Time.now
curl.timeout = 30
curl.url = Addressable::URI.encode url
curl.follow_location = true
request_headers.each { |key, value| curl.headers[key.to_s] = value }
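The last hunk drops the outer Timeout.timeout(60) wrapper in favor of a timeout on the curl handle itself. A standalone sketch of the equivalent curb call; only curl.timeout = 30 comes from this diff, the URL is hypothetical:

    require "curb"

    curl = Curl::Easy.new
    curl.timeout = 30                      # overall transfer timeout in seconds
    curl.follow_location = true
    curl.url = "https://example.test/api"  # hypothetical URL, not from this commit
    curl.perform                           # raises Curl::Err::TimeoutError if the budget is exceeded
    puts curl.response_code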