- Modified `.gitignore` to include and manage `.devcontainer/signoz/data/*` while preserving `.keep` files. - Updated `.prettierrc` to include the `@prettier/plugin-xml` plugin and configured XML formatting options. - Added OpenTelemetry SDK and exporter gems to the `Gemfile` for enhanced monitoring capabilities. - Removed `package-lock.json` as part of the transition to Yarn for dependency management. - Enhanced `.devcontainer` configuration with new services for SigNoz, including ClickHouse and related configurations. - Introduced new ClickHouse configuration files for user and cluster settings. - Updated Nginx and OpenTelemetry collector configurations to support new logging and monitoring features. - Improved user experience in the `UserSearchBar` component by updating the placeholder text. These changes aim to improve project maintainability, monitoring capabilities, and user experience.
206 lines
6.1 KiB
Ruby
206 lines
6.1 KiB
Ruby
# typed: strict
|
|
# Walks a user's favorites listing on furaffinity.net page by page and
# reconciles the user's `fav_post_joins` rows with the posts that were observed.
#
# Job arguments read directly by `perform` (others may be consumed by
# `init_from_args!`, which is defined outside this file):
#   :url_name       - passed to UserPageJob when no user record is found
#   :full_scan      - truthy: visit every listing page; falsy: stop at the
#                     first page that contains no not-yet-recorded favorites
#   :use_http_cache - forwarded to the HTTP client for each page fetch
class Domain::Fa::Job::FavsJob < Domain::Fa::Job::Base
  include HasBulkEnqueueJobs
  queue_as :fa_user_favs

  # NOTE(review): not referenced anywhere in this class; kept because other
  # code may read it.
  USERS_PER_FULL_PAGE = T.let(Rails.env.test? ? 9 : 190, Integer)

  sig { params(args: T.untyped).void }
  def initialize(*args)
    super(*T.unsafe(args))
    # Ids of every Domain::Fa::Post seen across all pages of this run.
    @seen_post_ids = T.let(Set.new, T::Set[Integer])
    # Id used to build the "next page" URL; nil means "start at first page".
    @page_id = T.let(nil, T.nilable(String))
    @page_number = T.let(0, Integer)
    @total_items_seen = T.let(0, Integer)
    @first_job_entry = T.let(nil, T.nilable(HttpLogEntry))
    @full_scan = T.let(false, T::Boolean)
    @force_scan = T.let(false, T::Boolean)
    # Ids of the posts seen on the most recently scanned page only.
    @last_page_post_ids = T.let(Set.new, T::Set[Integer])
    @use_http_cache = T.let(false, T::Boolean)
  end

  # Scans the favorites listing and persists the resulting additions/removals.
  #
  # Three ways the page loop can end:
  #   * `scan_page` returns :stop  -> abort without touching the database
  #   * partial scan catches up    -> only add newly seen favorites
  #   * listing ends / page cap    -> add new favorites; mark favorites that
  #                                   were not seen as removed, but ONLY when
  #                                   the listing was read to its end
  sig { override.params(args: T::Hash[Symbol, T.untyped]).void }
  def perform(args)
    @first_job_entry = nil
    user = init_from_args!(args, build_user: false)
    @full_scan = !!args[:full_scan]
    @use_http_cache = !!args[:use_http_cache]

    # No user record: schedule a user page scan and fail this job.
    user ||
      begin
        defer_job(Domain::Fa::Job::UserPageJob, { url_name: args[:url_name] })
        fatal_error("user does not exist: #{args}")
      end
    user = T.must(user)

    logger.prefix = "[#{user.url_name&.bold} / #{user.state&.bold}]"
    return unless user_due_for_scan?(:favs)

    # Safety cap on the number of pages visited; never lower than 100.
    # NOTE(review): 48 is presumably the number of favorites per listing
    # page - confirm against the site.
    max_page_number =
      T.let([((user.num_favorites || 0) + 1) / 48, 100].max, Integer)
    logger.info "[max page number] [#{max_page_number.to_s.bold}]"

    existing_faved_ids =
      T.let(
        Set.new(user.fav_post_joins.active.pluck(:post_id)),
        T::Set[Integer],
      )

    # True only when scan_page reported that there is no next page, i.e. the
    # whole listing was observed.
    listing_exhausted = T.let(false, T::Boolean)

    loop do
      case scan_page(user: user)
      when :stop
        return
      when :break
        listing_exhausted = true
        break
      end

      # Partial scan: a page without any unrecorded favorite means the rest
      # of the listing is assumed to be already recorded.
      if !@full_scan && (@last_page_post_ids - existing_faved_ids).empty?
        finish_partial_scan(user, existing_faved_ids)
        return
      end

      break if @page_number > max_page_number
      @page_number += 1
    end

    to_add = @seen_post_ids - existing_faved_ids
    to_remove =
      if listing_exhausted
        existing_faved_ids - @seen_post_ids
      else
        # The page cap stopped the scan early, so favorites on the unvisited
        # pages were simply not observed. Treating them as removed would
        # wrongly flag still-valid favorites, so no removals are applied.
        logger.info "[page limit reached before end of listing] [skipping removals]"
        T.let(Set.new, T::Set[Integer])
      end
    logger.info "[calc change favs] [add #{to_add.size.to_s.bold}] [remove #{to_remove.size.to_s.bold}]"

    ReduxApplicationRecord.transaction do
      if to_remove.any?
        user
          .fav_post_joins
          .active
          .where(post_id: to_remove)
          .update_all(removed: true)
      end

      upsert_active_favs(user, to_add)

      user.scanned_favs_at = Time.zone.now
      user.save!
    end
    logger.info "[updated favs list] [posts: #{user.fav_post_joins.count.to_s.bold}]"
  end

  private

  # Persists the outcome of a partial scan: records every seen post that was
  # not already an active favorite, stamps `scanned_favs_at`, and never marks
  # anything as removed.
  sig do
    params(
      user: Domain::Fa::User,
      existing_faved_ids: T::Set[Integer],
    ).void
  end
  def finish_partial_scan(user, existing_faved_ids)
    user.scanned_favs_at = Time.zone.now

    to_add = @seen_post_ids - existing_faved_ids
    logger.info "[partial scan] [add #{to_add.size.to_s.bold}] [remove none]"
    ReduxApplicationRecord.transaction do
      upsert_active_favs(user, to_add)
      user.save!
    end
    logger.info "[reached end of unobserved favs] [stopping scan]"
  end

  # Upserts one `fav_post_joins` row per post id with `removed: false`, in
  # batches of 1000 ids. Does nothing when `post_ids` is empty.
  sig { params(user: Domain::Fa::User, post_ids: T::Enumerable[Integer]).void }
  def upsert_active_favs(user, post_ids)
    post_ids.each_slice(1000) do |slice|
      user.fav_post_joins.upsert_all(
        slice.map { |id| { post_id: id, removed: false } },
        unique_by: :index_domain_fa_favs_on_user_id_and_post_id,
        update_only: [:removed],
      )
    end
  end

  # Fetches and processes a single favorites listing page.
  #
  # Side effects: updates `@page_id`, `@total_items_seen`, `@seen_post_ids`
  # and `@last_page_post_ids`; inserts Domain::Fa::Post rows for submissions
  # that have none yet and defers a ScanPostJob for each of them.
  #
  # Returns:
  #   :stop  - the account is disabled / not found; the caller should abort
  #   :break - this page has no "next" button id, i.e. it is the last page
  #   nil    - more pages follow
  #
  # Calls `fatal_error` on a non-200 response or when the page does not look
  # like a listing page.
  sig { params(user: Domain::Fa::User).returns(T.nilable(Symbol)) }
  def scan_page(user:)
    url =
      if @page_id
        "https://www.furaffinity.net/favorites/#{user.url_name}/#{@page_id}/next"
      else
        "https://www.furaffinity.net/favorites/#{user.url_name}/"
      end
    response =
      http_client.get(
        url,
        caused_by_entry: causing_log_entry,
        use_http_cache: @use_http_cache,
      )
    self.first_log_entry ||= response.log_entry

    if response.status_code != 200
      fatal_error(
        "http #{response.status_code.to_s.red.bold}, log entry #{response.log_entry.id.to_s.bold}",
      )
    end

    if Domain::Fa::Job::ScanUserUtils.user_disabled_or_not_found?(
         user,
         response,
       )
      logger.error("account disabled / not found, abort")
      return :stop
    end

    page = Domain::Fa::Parser::Page.new(response.body)
    fatal_error("not a favs listing page") unless page.probably_listings_page?

    submissions = page.submissions_parsed
    @page_id = page.favorites_next_button_id
    @total_items_seen += submissions.length

    # Map of site-side submission id -> local post id for posts already stored.
    existing_fa_id_to_post_id =
      Domain::Fa::Post
        .where(fa_id: submissions.map(&:id))
        .pluck(:fa_id, :id)
        .to_h

    posts_to_create_hashes =
      submissions
        .reject { |submission| existing_fa_id_to_post_id[submission.id] }
        .map do |submission|
          Domain::Fa::Post.hash_from_submission_parser_helper(
            submission,
            first_seen_log_entry: response.log_entry,
          )
        end

    created_post_ids =
      if posts_to_create_hashes.empty?
        []
      else
        Domain::Fa::Post
          .insert_all!(posts_to_create_hashes, returning: %i[id fa_id])
          .map { |row| row["id"] }
      end

    enqueue_new_post_scan_jobs(
      posts_to_create_hashes.map { |hash| hash[:fa_id] },
    )

    # Posts on this page = freshly inserted posts + posts that already existed.
    @last_page_post_ids =
      Set.new(created_post_ids + existing_fa_id_to_post_id.values)
    @seen_post_ids.merge(@last_page_post_ids)

    logger.info [
                  "[page #{@page_number.to_s.bold}]",
                  "[posts: #{submissions.length.to_s.bold}]",
                  "[created: #{posts_to_create_hashes.size.to_s.bold}]",
                ].join(" ")

    # No next-button id means this was the last page of the listing.
    @page_id.nil? ? :break : nil
  end

  # Defers one ScanPostJob per given submission id, wrapped in
  # `bulk_enqueue_jobs` (provided by HasBulkEnqueueJobs).
  sig { params(fa_ids: T::Array[Integer]).void }
  def enqueue_new_post_scan_jobs(fa_ids)
    bulk_enqueue_jobs do
      fa_ids.each do |fa_id|
        defer_job(Domain::Fa::Job::ScanPostJob, { fa_id: fa_id })
      end
    end
  end
end
|