flatten ib job namespaces

Dylan Knutson
2025-06-24 17:58:34 +00:00
parent 70c65ffdbd
commit e1c21fb2df
6 changed files with 255 additions and 257 deletions
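
Every file in this commit follows the same two-step rewrite: the nested
module Domain::Inkbunny::Job / class SomeJob < Base wrapper collapses into a
single compact class definition, and jobs that referred to
ApiSearchPageProcessor by its short name gain an explicit constant alias.
The alias is load-bearing: Ruby resolves bare constants through the lexical
nesting (plus the class's ancestors), and a compact definition like
class A::B::C puts only A::B::C itself on the nesting chain, not A::B. A
minimal sketch of the difference, using illustrative names rather than this
codebase's:

    module Domain
      module Job
        HELPER = "resolved lexically"
      end
    end

    module Domain
      module Job
        class Nested
          # Module.nesting is [Domain::Job::Nested, Domain::Job, Domain],
          # so the bare name finds Domain::Job::HELPER.
          def helper
            HELPER
          end
        end
      end
    end

    class Domain::Job::Compact
      # Module.nesting is just [Domain::Job::Compact]; a bare HELPER would
      # raise NameError. Re-exposing it as a class-level constant, as this
      # commit does with ApiSearchPageProcessor, restores the short name
      # for the whole class body.
      HELPER = Domain::Job::HELPER

      def helper
        HELPER
      end
    end

The same rule is why each superclass position changes from Base to
Domain::Inkbunny::Job::Base: once the module wrapper is gone, no enclosing
scope remains to resolve the short name.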

View File

@@ -1,72 +1,72 @@
 # typed: strict
-module Domain::Inkbunny::Job
-  class LatestPostsJob < Base
+class Domain::Inkbunny::Job::LatestPostsJob < Domain::Inkbunny::Job::Base
+  ApiSearchPageProcessor = Domain::Inkbunny::Job::ApiSearchPageProcessor
+
   sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
   def perform(args)
     processor = ApiSearchPageProcessor.new
     first_log_entry = nil
     rid = T.let(nil, T.nilable(String))
     page = T.let(1, Integer)
     loop_count = T.let(0, Integer)
     while true
       loop_count += 1
       if loop_count > ApiSearchPageProcessor::MAX_PAGE_COUNT
         fatal_error("loop_count > #{ApiSearchPageProcessor::MAX_PAGE_COUNT}")
       end
       url = ApiSearchPageProcessor.build_api_search_url(rid: rid, page: page)
       response = http_client.post(url)
       if response.status_code != 200
         fatal_error("api_search failed: #{response.status_code}")
       end
       result =
         processor.process!(
           JSON.parse(response.body),
           caused_by_entry: response.log_entry,
         )
       num_new_posts = T.cast(result[:num_new_posts], Integer)
       logger.info(
         [
           "[rid: #{rid}]",
           "[page #{page}]",
           "[new posts: #{num_new_posts}]",
           "[total changed posts: #{result[:num_total_changed_posts]}]",
           "[total changed users: #{result[:num_total_changed_users]}]",
           "[total posts: #{result[:num_total_posts]}]",
           "[total users: #{result[:num_total_users]}]",
         ].join(" "),
       )
       if num_new_posts < ApiSearchPageProcessor::SUBMISSIONS_PER_PAGE
         logger.info("[no new posts, stopping]")
         break
       end
       rid ||= T.cast(result[:rid], String)
       page += 1
     end
     processor.all_users.each do |user|
       if user.due_for_gallery_scan?
         defer_job(
           Domain::Inkbunny::Job::UserGalleryJob,
           { user: user, caused_by_entry: first_log_entry },
           { priority: 2 },
         )
       end
     end
     posts_to_update = processor.all_posts.reject(&:deep_updated_at)
     if posts_to_update.any?
       defer_job(
         Domain::Inkbunny::Job::UpdatePostsJob,
         {
           ib_post_ids: posts_to_update.map(&:ib_id),
           caused_by_entry: first_log_entry,
         },
       )
     end
   end
-  end
 end
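
The loop above is the crawl contract the jobs in this commit share: the
first api_search response supplies a result id (rid) that is echoed back on
every later request so subsequent pages read from the same result set, and
crawling stops early once a page yields fewer new posts than a full page.
A schematic of that shape; fetch_page and the result keys here are
illustrative stand-ins, not this codebase's API:

    # Stub standing in for the HTTP call; returns a short page at page 3.
    def fetch_page(rid:, page:)
      { rid: rid || "r1", num_new_posts: page < 3 ? 10 : 2, per_page: 10 }
    end

    def crawl(max_pages:)
      rid = nil
      (1..max_pages).each do |page|
        result = fetch_page(rid: rid, page: page)
        rid ||= result[:rid] # anchor later pages to the first response
        yield result
        # A short page means the crawl has caught up with known posts.
        break if result[:num_new_posts] < result[:per_page]
      end
    end

    crawl(max_pages: 2000) { |r| puts r[:num_new_posts] } # => 10, 10, 2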

View File

@@ -1,14 +1,12 @@
 # typed: strict
-module Domain::Inkbunny::Job
-  class StaticFileJob < Base
+class Domain::Inkbunny::Job::StaticFileJob < Domain::Inkbunny::Job::Base
   include Domain::StaticFileJobHelper
   queue_as :static_file

   sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
   def perform(args)
     file = post_file_from_args!
     logger.push_tags(make_arg_tag(file), make_arg_tag(file.post))
     download_post_file(file)
   end
-  end
 end
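
All six jobs override perform with the same sig { override... } line, which
implies Domain::Inkbunny::Job::Base declares perform as abstract. The base
class is not part of this diff; a plausible minimal shape, assuming an
ActiveJob-style parent:

    # Assumed sketch only - the real Base is not shown in this commit.
    # abstract! forbids direct instantiation, and the abstract sig makes
    # Sorbet require every subclass to implement a compatible perform.
    class Domain::Inkbunny::Job::Base < ApplicationJob
      extend T::Sig
      extend T::Helpers
      abstract!

      sig { abstract.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
      def perform(args)
      end
    end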

View File

@@ -1,86 +1,86 @@
 # typed: strict
-module Domain::Inkbunny::Job
-  class UpdatePoolJob < Base
+class Domain::Inkbunny::Job::UpdatePoolJob < Domain::Inkbunny::Job::Base
+  ApiSearchPageProcessor = Domain::Inkbunny::Job::ApiSearchPageProcessor
+
   sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
   def perform(args)
     pool = T.let(args[:pool], Domain::PostGroup::InkbunnyPool)
     logger.push_tags(make_arg_tag(pool))
     if (dule = pool.deep_update_log_entry)
       if dule.status_code == 200
         logger.info("pool already deep updated, skipping")
         return
       else
         logger.info(
           format_tags_arr(
             [
               make_arg_tag(dule, name: "last_hle"),
               "retrying pool deep update",
             ].flatten,
           ),
         )
       end
     end
     processor = ApiSearchPageProcessor.new
     rid = T.let(nil, T.nilable(String))
     page = T.let(1, Integer)
     loop_count = T.let(0, Integer)
     while true
       loop_count += 1
       if loop_count > ApiSearchPageProcessor::MAX_PAGE_COUNT
         fatal_error("loop_count > #{ApiSearchPageProcessor::MAX_PAGE_COUNT}")
       end
       url =
         ApiSearchPageProcessor.build_api_search_url(
           pool_id: pool.ib_id,
           rid: rid,
           page: page,
         )
       response = http_client.post(url)
       pool.deep_update_log_entry = causing_log_entry
       fatal_error("api_search failed") if response.status_code != 200
       result =
         processor.process!(
           JSON.parse(response.body),
           caused_by_entry: response.log_entry,
         )
       rid ||= T.cast(result[:rid], String)
       logger.info(
         format_tags_arr(
           [
             make_tag("rid", rid),
             make_tag("page", page),
             make_tag("total changed posts", result[:num_total_changed_posts]),
             make_tag("total posts", result[:num_total_posts]),
             make_tag("total users", result[:num_total_users]),
           ],
         ),
       )
       break if result[:num_pages] == page
       page += 1
     end
     pool.save!
     posts_to_update = processor.all_posts.reject(&:deep_updated_at)
     if posts_to_update.any?
       defer_job(
         Domain::Inkbunny::Job::UpdatePostsJob,
         { ib_post_ids: posts_to_update.map(&:ib_id) },
       )
     end
   ensure
     pool.save! if pool
     logger.pop_tags
   end
-  end
 end
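
perform(args) receives an untyped hash, so UpdatePoolJob's first move is
T.let(args[:pool], Domain::PostGroup::InkbunnyPool). With sorbet-runtime
loaded, T.let is a checked assertion rather than a bare annotation: a
mis-enqueued job fails immediately with a TypeError instead of carrying an
untyped value deeper into the crawl. A self-contained illustration:

    require "sorbet-runtime"

    args = { page: 1 }
    page = T.let(args[:page], Integer) # passes; page is Integer to Sorbet
    begin
      T.let(args[:pool], String)       # nil is not a String
    rescue TypeError => e
      puts e.message
    end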

View File

@@ -1,5 +1,7 @@
 # typed: strict
 class Domain::Inkbunny::Job::UpdatePostsJob < Domain::Inkbunny::Job::Base
+  ApiSearchPageProcessor = Domain::Inkbunny::Job::ApiSearchPageProcessor
+
   sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
   def perform(args)
     ib_post_ids = T.cast(args[:ib_post_ids], T.nilable(T::Array[Integer]))
@@ -99,7 +101,7 @@ class Domain::Inkbunny::Job::UpdatePostsJob < Domain::Inkbunny::Job::Base
       Domain::Post::InkbunnyPost.includes(:pools).find_by(ib_id: ib_post_id)
     if post.blank?
-      processor = Domain::Inkbunny::Job::ApiSearchPageProcessor.new
+      processor = ApiSearchPageProcessor.new
       processor.upsert_post_from_submission_json!(
         submission_json,
         caused_by_entry: log_entry,
View File

@@ -1,45 +1,43 @@
 # typed: strict
-module Domain::Inkbunny::Job
-  class UserAvatarJob < Base
+class Domain::Inkbunny::Job::UserAvatarJob < Domain::Inkbunny::Job::Base
   queue_as :static_file

   sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
   def perform(args)
     avatar = avatar_from_args!
     logger.push_tags(make_arg_tag(avatar))
     url_str = avatar.url_str
     if url_str.blank?
       logger.warn("avatar has no url_str")
       return
     end
     response = http_client.get(url_str)
     logger.tagged(make_arg_tag(response.log_entry)) do
       self.first_log_entry ||= response.log_entry
       avatar.last_log_entry = response.log_entry
       case response.status_code
       when 200
         avatar.state = "ok"
         avatar.error_message = nil
         avatar.downloaded_at = response.log_entry.created_at
         avatar.log_entry = response.log_entry
         logger.info("downloaded avatar")
       when 404
         avatar.state = "file_404"
         avatar.error_message = "http #{response.status_code}"
         logger.info("avatar 404")
       else
         avatar.state = "http_error"
         avatar.error_message = "http #{response.status_code}"
         fatal_error(
           "http #{response.status_code}, log entry #{response.log_entry.id}",
         )
       end
     end
   ensure
     avatar.save! if avatar
   end
-  end
 end
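
UserAvatarJob leans on two Ruby guarantees to persist the outcome: ensure
runs whether perform returns early, completes, or raises through
fatal_error, and a local assigned anywhere in a method body exists (as nil)
from the method's start, which is why avatar.save! if avatar is safe even
when avatar_from_args! itself raised. A stripped-down sketch with
hypothetical helper names:

    def perform
      record = fetch_record!  # if this raises, record is nil in ensure
      record.state = "http_error"
      raise "fatal"           # stands in for fatal_error in the else branch
    ensure
      record.save! if record  # still persists the error state
    end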

View File

@@ -1,82 +1,82 @@
 # typed: strict
-module Domain::Inkbunny::Job
-  class UserGalleryJob < Base
+class Domain::Inkbunny::Job::UserGalleryJob < Domain::Inkbunny::Job::Base
+  ApiSearchPageProcessor = Domain::Inkbunny::Job::ApiSearchPageProcessor
+
   sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
   def perform(args)
     user = user_from_args!
     logger.push_tags(
       make_tag("id", user.id),
       make_tag("user_ib_id", user.ib_id),
     )
     if user.scanned_gallery_at&.after?(1.week.ago)
       logger.warn(
         "gallery scanned #{time_ago_in_words(user.scanned_gallery_at)}, skipping",
       )
       return
     end
     processor = ApiSearchPageProcessor.new
     rid = T.let(nil, T.nilable(String))
     page = T.let(1, Integer)
     loop_count = T.let(0, Integer)
     max_loop_count = T.let(2000, Integer)
     loop do
       loop_count += 1
       raise("loop_count: #{loop_count}") if loop_count > max_loop_count
       url =
         ApiSearchPageProcessor.build_api_search_url(
           ib_user_id: user.ib_id,
           rid: rid,
           page: page,
         )
       response = http_client.post(url)
       if response.status_code != 200
         fatal_error("api_search failed: #{response.status_code}")
       end
       result =
         processor.process!(
           JSON.parse(response.body),
           caused_by_entry: response.log_entry,
         )
       num_new_posts = result[:num_new_posts]
       logger.info(
         [
           "[rid: #{rid}]",
           "[page: #{page}]",
           "[new posts: #{num_new_posts}]",
           "[total new posts: #{result[:num_total_new_posts]}]",
           "[total changed posts: #{result[:num_total_changed_posts]}]",
           "[total posts: #{result[:num_total_posts]}]",
         ].join(" "),
       )
       user.reload
       if user.scanned_gallery_at.present? && num_new_posts == 0
         logger.info("[no new posts, stopping]")
         break
       end
       rid = T.cast(result[:rid], String)
       break if T.cast(result[:num_pages], Integer) <= page
       page += 1
     end
     user.scanned_gallery_at = Time.current
     user.save!
     if processor.changed_posts.any?
       defer_job(
         Domain::Inkbunny::Job::UpdatePostsJob,
         { ib_post_ids: processor.changed_posts.map(&:ib_id) },
       )
     end
     Domain::User.reset_counters(user.id, :user_post_creations)
   ensure
     logger.pop_tags
   end
-  end
 end
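
The closing Domain::User.reset_counters(user.id, :user_post_creations) is
the standard Rails repair for counter-cache drift: bulk upserts through the
page processor would bypass the ActiveRecord callbacks that normally keep a
*_count column in sync, so the job recomputes it from a fresh association
count. In effect, and assuming the conventional column name:

    # Roughly what reset_counters does for one user; the column name
    # user_post_creations_count is the Rails default, assumed here.
    user = Domain::User.find(user_id)
    user.update_columns(
      user_post_creations_count: user.user_post_creations.count,
    )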