# typed: false
# frozen_string_literal: true

T.bind(self, T.all(Rake::DSL, Object))
namespace :fa do
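  # Example invocation (assumed value; start_at is read from ENV and
  # defaults to 0):
  #   rake fa:enqueue_waiting_posts start_at=100000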
desc "enqueue waiting posts"
|
|
task enqueue_waiting_posts: %i[set_logger_stdout environment] do |t, args|
|
|
start_at = (ENV["start_at"] || 0).to_i
|
|
low_water_mark = 50
|
|
high_water_mark = 300
|
|
poll_duration = 10
|
|
|
|
enqueuer =
|
|
Domain::Fa::PostEnqueuer.new(
|
|
reverse_scan_holes: false,
|
|
start_at: start_at,
|
|
low_water_mark: low_water_mark,
|
|
high_water_mark: high_water_mark,
|
|
)
|
|
|
|
loop { sleep poll_duration if enqueuer.run_once == :sleep }
|
|
end
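
  # Example invocations (assumed values; the scan runs from start_at down
  # to stop_at, and start_at=last derives the bounds from the newest ok
  # post and the GlobalState key recorded on the previous run):
  #   rake fa:enqueue_missing_posts start_at=250000 stop_at=240000
  #   rake fa:enqueue_missing_posts start_at=last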
  task enqueue_missing_posts: %i[set_logger_stdout environment] do |t, args|
    helper = Class.new.extend(ActionView::Helpers::NumberHelper)
    global_state_key = "task-fa-enqueue-missing-posts-incremental"

    start_at = ENV["start_at"]
    if start_at == "last"
      # Start a little below the highest known-ok fa_id; stop at the
      # explicit stop_at or the high mark recorded on the last run.
      start_at = DomainPostsFaAux.where(state: :ok).maximum(:fa_id) - 1000
      start_at = 0 if start_at < 0
      stop_at =
        ENV["stop_at"]&.to_i || GlobalState.get(global_state_key)&.to_i ||
          raise("need explicitly set stop_at")
    else
      stop_at = ENV["stop_at"]&.to_i || raise("need stop_at")
      start_at =
        start_at&.to_i || raise("need start_at (highest fa_id already present)")
    end

    # The scan runs in reverse order, so start_at must be > stop_at.
    puts "start fa id: #{start_at.to_s.bold}"
    puts "stop fa id: #{stop_at.to_s.bold}"
    if start_at <= stop_at
      puts "start_at <= stop_at, nothing to do"
      next
    end

    puts "counting missing posts..."

    num_posts = start_at - stop_at + 1
    num_existing_posts =
      Domain::Post::FaPost.where(fa_id: stop_at..start_at).count
    num_missing_posts = num_posts - num_existing_posts
    puts "total posts: #{helper.number_with_delimiter(num_posts).bold}"
    puts "existing posts: #{helper.number_with_delimiter(num_existing_posts).bold}"
    puts "missing posts: #{helper.number_with_delimiter(num_missing_posts).bold}"

    low_water_mark = 50
    high_water_mark = 300
    poll_duration = 10

    enqueuer =
      Domain::Fa::PostEnqueuer.new(
        start_at: start_at,
        stop_at: stop_at,
        low_water_mark: low_water_mark,
        high_water_mark: high_water_mark,
      )

    loop { sleep poll_duration if enqueuer.run_once == :sleep }
    puts "setting global state key: #{global_state_key} to #{start_at.to_s.bold}"
    GlobalState.set(global_state_key, start_at.to_s)
  end
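
  # Example (assumed value; start_at may be a numeric fa_id or "last"):
  #   rake fa:enqueue_unscanned_ok_posts start_at=last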
  task enqueue_unscanned_ok_posts: %i[set_logger_stdout environment] do
    start_at = ENV["start_at"]
    start_at = start_at.to_i if start_at && start_at != "last"
    Tasks::Fa::EnqueueUnscannedOkPostsTask.new(start_at: start_at).run
  end
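
  # Example (assumed value; start_at defaults to 0):
  #   rake fa:enqueue_waiting_users start_at=5000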
  desc "enqueue waiting users"
  task enqueue_waiting_users: %i[set_logger_stdout environment] do |t, args|
    start_at = (ENV["start_at"] || 0).to_i
    low_water_mark = 50
    high_water_mark = 300
    poll_duration = 10

    enqueuer =
      Domain::Fa::UserEnqueuer.new(
        start_at: start_at,
        low_water_mark: low_water_mark,
        high_water_mark: high_water_mark,
      )

    loop { sleep poll_duration if enqueuer.run_once == :sleep }
  end
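
  # Example (assumed value; start_at is passed through to the task unparsed):
  #   rake fa:pull_missing_post_info_from_fuzzysearch start_at=100000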
  desc "Pull missing post information from FuzzySearch"
  task pull_missing_post_info_from_fuzzysearch: %i[
         set_logger_stdout
         environment
       ] do
    Tasks::Fa::QueryMissingPostsFromFuzzysearch.new(
      start_at: ENV["start_at"],
    ).run
  end

  desc "run a single browse page job"
  task browse_page_job: %i[set_logger_stdout environment] do
    Domain::Fa::Job::BrowsePageJob.set(
      priority: -20,
      queue: "manual",
    ).perform_later({})
    puts "#{Time.now} - browse_page_job - Domain::Fa::Job::BrowsePageJob"
  end
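
  # Example (assumed value; fa_id is required and the scan is forced):
  #   rake fa:scan_post_job fa_id=12345678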
  desc "run a single post scan job"
  task scan_post_job: %i[set_logger_stdout environment] do |t, args|
    fa_id = ENV["fa_id"] || raise("must provide fa_id")
    Domain::Fa::Job::ScanPostJob.set(
      priority: -10,
      queue: "manual",
    ).perform_later({ fa_id: fa_id, force_scan: true })
  end
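
  # Examples (assumed values; pass either a single fa_id or an inclusive
  # fa_id_start..fa_id_end range):
  #   rake fa:scan_post_job_noforce fa_id=12345678
  #   rake fa:scan_post_job_noforce fa_id_start=100 fa_id_end=200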
  desc "run a post scan job (skip force)"
  task scan_post_job_noforce: %i[set_logger_stdout environment] do |t, args|
    fa_id_start = ENV["fa_id_start"]
    fa_id_end = ENV["fa_id_end"]

    if fa_id_start || fa_id_end
      if !fa_id_start || !fa_id_end
        raise("need both fa_id_start and fa_id_end")
      end
      fa_id_start = fa_id_start.to_i
      fa_id_end = fa_id_end.to_i
      if fa_id_start > fa_id_end
        raise("fa_id_start must not be greater than fa_id_end")
      end
    else
      fa_id = ENV["fa_id"] || raise("must provide fa_id")
      fa_id_start = fa_id.to_i
      fa_id_end = fa_id.to_i
    end

    puts "enqueue #{fa_id_start}..#{fa_id_end} (#{fa_id_end - fa_id_start + 1})"

    (fa_id_start..fa_id_end).each do |fa_id|
      Domain::Fa::Job::ScanPostJob.set(
        priority: -10,
        queue: "manual",
      ).perform_later({ fa_id: fa_id })
    end
  end
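
  # Example (assumed value; url_name is the FA username as it appears in
  # profile URLs):
  #   rake fa:user_page_job url_name=someartist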
  desc "run a user page scan job"
  task user_page_job: %i[set_logger_stdout environment] do
    url_name = ENV["url_name"] || raise("must provide url_name")
    Domain::Fa::Job::UserPageJob.set(
      priority: -10,
      queue: "manual",
    ).perform_later({ url_name: url_name, force_scan: true })
  end
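
  # Example (assumed value; same url_name convention as user_page_job):
  #   rake fa:user_gallery_job url_name=someartist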
  desc "run a user gallery scan job"
  task user_gallery_job: %i[set_logger_stdout environment] do
    url_name = ENV["url_name"] || raise("must provide url_name")
    Domain::Fa::Job::UserGalleryJob.set(
      priority: -10,
      queue: "manual",
    ).perform_later({ url_name: url_name, force_scan: true })
  end
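
  # Example (assumed values; mode must deserialize to a
  # BackfillFavsAndDatesTask::Mode and defaults to "both"; start_at,
  # user_url_name, and batch_size are optional):
  #   rake fa:backfill_favs_and_dates mode=both batch_size=500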
  desc "backfill FaUserPostFav from existing user page and favs scans"
  task backfill_favs_and_dates: %i[set_logger_stdout environment] do
    start_at = ENV["start_at"]
    mode = ENV["mode"] || "both"
    mode = Tasks::Fa::BackfillFavsAndDatesTask::Mode.deserialize(mode)
    user_url_name = ENV["user_url_name"]
    batch_size = ENV["batch_size"]&.to_i
    Tasks::Fa::BackfillFavsAndDatesTask.new(
      mode:,
      start_at:,
      user_url_name:,
      batch_size:,
    ).run
  end

  # task export_to_sqlite: %i[environment set_logger_stdout] do
  #   profile = !!ENV["profile"]
  #   sample = !!ENV["sample"]
  #   outfile = ENV["outfile"] || raise("'outfile' required")
  #
  #   tables =
  #     ENV["tables"] ||
  #       raise(
  #         "'tables' required (all, #{Domain::Fa::SqliteExporter::TABLES.keys.join(", ")})",
  #       )
  #   tables = tables.split(",").map(&:to_sym)
  #
  #   db = SQLite3::Database.new(outfile)
  #   exporter = Domain::Fa::SqliteExporter.new(db, sample, tables)
  #   exporter.start_profiling! if profile
  #   exporter.run
  #   exporter.end_profiling! if profile
  # end

  # desc "Backfill favs by scanning historical HTTP logs for favorites pages"
  # task backfill_favs: :environment do
  #   FaBackfillFavs.new(
  #     start_at: ENV["start_at"]&.to_i,
  #     limit: ENV["limit"]&.to_i,
  #     batch_size: ENV["batch_size"]&.to_i,
  #   ).run
  # end

  desc "Enqueue pending favs jobs"
  task enqueue_due_user_favs: :environment do
    Tasks::Fa::EnqueueDueUserFavsScansTask.new.run
  end

  desc "Enqueue pending page jobs"
  task enqueue_due_user_pages: :environment do
    Tasks::Fa::EnqueueDueUserPageScansTask.new.run
  end
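
  # Examples (assumed; with url_name it walks that one user's posts,
  # otherwise it scans every FA post file still in terminal_error/404):
  #   rake fa:get_404_files_from_fur_archiver
  #   rake fa:get_404_files_from_fur_archiver url_name=someartist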
  desc "Get 404 files from FurArchiver"
  task get_404_files_from_fur_archiver: :set_logger_stdout do
    url_name = ENV["url_name"]

    if url_name
      # Single user: collect the 404'd files across their posts. flat_map
      # yields an Array, so iterate with #each.
      user =
        Domain::User.find_by_param("fa@#{url_name}") || raise("user not found")
      query =
        user.posts.flat_map do |post|
          post.files.where(state: "terminal_error", last_status_code: 404)
        end
      method = :each
    else
      # All FA posts: a relation, so batch with #find_each. Skip posts
      # already tried against FurArchiver.
      query =
        Domain::PostFile
          .joins(:post)
          .for_post_type(Domain::Post::FaPost)
          .where(state: "terminal_error", last_status_code: 404)
          .where(<<~SQL)
            (("post"."json_attributes"->>'tried_from_fur_archiver')::bool) IS NULL
              OR (("post"."json_attributes"->>'tried_from_fur_archiver')::bool) != TRUE
          SQL
      method = :find_each
    end

    puts "counting..."
    total = query.count
    puts "total: #{total}"
    pb = ProgressBar.create(total: total, format: "%t: %c/%C %B %p%% %a %e")

    query.send(method) do |post_file|
      next if post_file.url_str.include?("/stories/")
      Job::FaPostFurArchiverPostFileJob.perform_now({ post_file: })
      post = post_file.post
      puts "processed #{post.to_param} / #{post.title_for_view}".bold
      # Defensive clamp so the bar never advances past its total.
      pb.progress = [pb.progress + 1, total].min
    end
  end
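
  # Example (assumed value; url_name is required):
  #   rake fa:backfill_add_tracked_object_favs_scans url_name=someartist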
  desc "Backfill Domain::UserJobEvent::AddTrackedObject favs scans for a user"
  task backfill_add_tracked_object_favs_scans: :set_logger_stdout do
    url_name = ENV["url_name"] || raise("must provide url_name")
    user =
      Domain::User.find_by_param("fa@#{url_name}") || raise("user not found")
    Domain::Fa::BackfillTrackedObjectUserFavs.new(user:).run
  end
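
  # Examples (assumed semantics; url_name migrates a single user, otherwise
  # posts are walked from start_at in batches of batch_size, default 100):
  #   rake fa:migrate_fa_user_post_favs url_name=someartist
  #   rake fa:migrate_fa_user_post_favs start_at=0 batch_size=500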
  desc "Migrate Domain::UserPostFav::FaUserPostFav"
  task migrate_fa_user_post_favs: :set_logger_stdout do
    batch_size = ENV["batch_size"]&.to_i || 100
    start_at = ENV["start_at"]
    if (url_name = ENV["url_name"])
      user =
        Domain::User.find_by_param("fa@#{url_name}") || raise("user not found")
      Tasks::Fa::MigrateFaUserPostFavs.new.run_for_user(user:, batch_size:)
    else
      Tasks::Fa::MigrateFaUserPostFavs.new.run_for_posts(start_at:, batch_size:)
    end
  end

  desc "Backfill posted_at for FaPosts from url_str"
  task backfill_posted_at_from_url_str: :environment do
    progress = ProgressBar.create(total: nil, format: "%t: %c/%C %B %p%% %a %e")
    Domain::Post::FaPost
      .where(posted_at: nil)
      .includes(:files)
      .find_in_batches(batch_size: 100) do |batch|
        ReduxApplicationRecord.transaction do
          batch.each do |post|
            # The posted_at column is nil in this scope, so #posted_at
            # presumably derives a value (per the desc, from url_str);
            # persist it when one is found.
            if (posted_at = post.posted_at)
              post.update(posted_at:)
              progress.increment
            end
          end
        end
      end
  end
end