# Files: redux-scraper/Rakefile (483 lines, 14 KiB, Ruby)

# Add your own tasks in files placed in lib/tasks ending in .rake,
# for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.
require "rake/testtask"
require_relative "config/application"
Rails.application.load_tasks
Dir.glob(Rails.root.join("rake", "*.rake")).each { |rake_file| load rake_file }
# Points ActiveRecord's logger at a new Logger writing to standard output.
task set_ar_stdout: :environment do
  stdout_logger = Logger.new($stdout)
  ActiveRecord::Base.logger = stdout_logger
end
# Replaces Rails.logger with a stdout logger that prints "SEVERITY: message",
# clears the ActiveRecord and ActiveJob loggers, and hands the same logger
# instance to GoodJob.
task set_logger_stdout: :environment do
  # Only severity and message are rendered; timestamp and progname are dropped.
  terse_format =
    proc { |severity, _timestamp, _progname, message| "#{severity}: #{message}\n" }

  Rails.logger = Logger.new($stdout)
  Rails.logger.formatter = terse_format

  ActiveRecord::Base.logger = nil
  ActiveJob::Base.logger = nil
  GoodJob.logger = Rails.logger
end
# Starts two background threads that repeatedly execute the pghero capture
# tasks (space stats every 6 hours, query stats every 5 minutes), then keeps
# the main thread alive indefinitely so those threads can keep running.
task periodic_tasks: %i[environment set_logger_stdout] do
  # Spawns a thread that executes `task_name`, prints `message`, sleeps for
  # `interval`, and repeats forever.
  run_every = ->(interval, task_name, message) do
    Thread.new do
      loop do
        Rake::Task[task_name].execute
        puts message
        sleep interval
      end
    end
  end

  run_every.call(6.hours, "pghero:capture_space_stats", "logged space stats")
  run_every.call(5.minutes, "pghero:capture_query_stats", "logged query stats")

  # Park the main thread; the workers above never finish on their own.
  loop { sleep 10 }
end
# TODO: migrate the disabled db_sampler tasks below to Domain::Post / Domain::User
# namespace :db_sampler do
# task export: :environment do
# url_names = ENV["url_names"] || raise("need 'url_names' (comma-separated)")
# outfile = $stdout
# DbSampler.new(outfile).export(url_names.split(","))
# ensure
# outfile.close if outfile
# end
# task import: [:environment] do
# infile = $stdin
# DbSampler.new(infile).import
# ensure
# infile.close if infile
# end
# end
# Exports the GoodJob worker settings into ENV, echoes each one, and then
# replaces the current process with `bundle exec good_job`.
#
# GOOD_JOB_QUEUES may be supplied by the caller; every other setting is fixed.
task good_job: %i[environment set_ar_stdout set_logger_stdout] do
  default_queues = %w[manual:4 fa_post,e621:2 *:6].join(";")
  settings = {
    "RAILS_ENV" => "worker",
    "GOOD_JOB_POLL_INTERVAL" => "5",
    "GOOD_JOB_MAX_CACHE" => "10000",
    "GOOD_JOB_QUEUE_SELECT_LIMIT" => "4096",
    "GOOD_JOB_MAX_THREADS" => "4",
    "GOOD_JOB_ENABLE_CRON" => "1",
    "GOOD_JOB_QUEUES" => ENV["GOOD_JOB_QUEUES"] || default_queues,
  }

  settings.each_pair do |name, setting|
    ENV[name] = setting
    puts "$> #{name.light_black.bold} = #{setting.bold}"
  end

  command = "bundle exec good_job"
  puts "$> #{command.bold}"
  # exec never returns: the rake process becomes the GoodJob worker.
  exec(command)
end
# Writes a copy of a CSV file with its data rows in reverse order.
#
# The input path is read from the `file` environment variable (raises if it is
# missing). The output is written next to the input, using the input's
# basename prefixed with "rev_", and starts with the input's header row.
task :reverse_csv do
  # This task has no :environment prerequisite, so load CSV explicitly.
  require "csv"

  file = ENV["file"] || raise("need 'file' (file path)")
  # Prefix only the basename so paths with a directory component still resolve.
  out_path = File.join(File.dirname(file), "rev_" + File.basename(file))

  # CSV.read opens the input read-only and closes it once parsed.
  in_csv = CSV.read(file, headers: true)

  # Block form guarantees the output is flushed and closed, even on error.
  CSV.open(out_path, "w", write_headers: true, headers: in_csv.headers) do |out_csv|
    in_csv.reverse_each { |row| out_csv << row.fields }
  end
end
# Runs Domain::Fa::Job::ScanPostJob inline (perform_now) for every FaPost in
# state "ok" that has no associated `file` record, iterating in descending
# batch order.
task enqueue_fa_posts_missing_files: %i[environment set_logger_stdout] do
  posts_without_file =
    Domain::Post::FaPost.where(state: "ok").where.missing(:file)

  posts_without_file.find_each(order: :desc) do |fa_post|
    Domain::Fa::Job::ScanPostJob.perform_now(post: fa_post)
  end
end
# Runs Domain::E621::Task::FixE621PostMissingFiles on every E621Post in state
# "ok" that has no associated `files` records.
#
# Environment:
#   limit - optional positive integer; stop after this many posts.
#
# Raises if `limit` is given but is not a positive integer.
task fix_e621_post_files: :environment do
  limit = ENV["limit"]&.to_i
  # Previously a zero limit (including non-numeric input, which to_i maps to 0)
  # was decremented past zero and silently ignored; reject it up front instead.
  raise "'limit' must be a positive integer" if limit && limit <= 0

  query = Domain::Post::E621Post.where(state: "ok").where.missing(:files)
  puts "query: #{query.to_sql}"

  processed = 0
  query.find_each(batch_size: 10) do |post|
    Domain::E621::Task::FixE621PostMissingFiles.new.run(post)
    processed += 1
    next unless limit && processed >= limit

    puts "limit reached"
    break
  end
end
# Runs Domain::E621::Job::ScanPostJob inline for every E621Post in state "ok"
# that has no associated `file` record, showing a progress bar sized from the
# initial count.
task fix_ok_e621_posts_missing_files: :environment do
  posts_without_file =
    Domain::Post::E621Post.where(state: "ok").where.missing(:file)
  bar =
    ProgressBar.create(
      total: posts_without_file.count,
      format: "%t: %c/%C %B %p%% %a %e",
    )

  posts_without_file.find_each(batch_size: 10) do |e621_post|
    Domain::E621::Job::ScanPostJob.perform_now(post: e621_post)
    # Clamp so the bar is never advanced past its total.
    advanced = bar.progress + 1
    bar.progress = [advanced, bar.total].min
  end
end
# Performs GoodJob jobs inline in this process instead of through a worker.
#
# Environment:
#   job_id    - id of a GoodJob::Job, or of a GoodJob::Execution whose job
#               should be run. Takes precedence over job_class.
#   job_class - run queued jobs of this class.
#   limit     - optional positive integer; stop after this many jobs.
#
# For each job this creates a GoodJob::Execution row, runs the job with its
# class's concurrency config temporarily replaced by `{ total_limit: nil }`,
# and records success or failure on both the execution and the job. A failing
# job is re-raised after being recorded, which aborts the task.
#
# Raises if neither job_id nor job_class is given, or if limit is not a
# positive integer. Exits with status 1 when job_id matches nothing.
task perform_good_jobs: :environment do
  job_class_name = ENV["job_class"]
  job_id = ENV["job_id"]
  limit = ENV["limit"]&.to_i

  if job_id.blank? && job_class_name.blank?
    raise "need 'job_id' or 'job_class'"
  end
  # A zero limit would be decremented past zero and never stop the loop.
  raise "'limit' must be a positive integer" if limit && limit <= 0

  relation =
    # Use the same presence test as the guard above, so an empty job_id
    # falls through to the job_class branch instead of a failed lookup.
    if job_id.present?
      found =
        GoodJob::Job.find_by(id: job_id) ||
          GoodJob::Execution.find_by(id: job_id)&.job
      if found.nil?
        puts "no job found with id #{job_id}"
        exit 1
      end
      puts "found job with id #{found.id}" if found.id != job_id
      GoodJob::Job.where(id: found.id)
    else
      # NOTE(review): find_each batches by primary key, so this created_at
      # ordering may not be honored - confirm against the Rails version in use.
      GoodJob::Job.queued.where(job_class: job_class_name).order(
        created_at: :asc,
      )
    end

  processed = 0
  relation.find_each(batch_size: 1) do |job|
    job = T.cast(job, GoodJob::Job)

    # Get the actual job instance and deserialize arguments
    serialized_args = job.serialized_params["arguments"]
    if serialized_args.nil?
      puts "No arguments found for job #{job.id}"
      next
    end

    deserialized_args = ActiveJob::Arguments.deserialize(serialized_args)
    # Resolve the class once; a distinct local keeps the ENV value intact.
    active_job_class = job.job_class.constantize
    job_instance = active_job_class.new
    job_instance.deserialize(job.serialized_params)

    puts "Running job #{job.id} (#{job.job_class})"

    # Create execution record
    execution =
      GoodJob::Execution.create!(
        active_job_id: job.active_job_id,
        job_class: job.job_class,
        queue_name: job.queue_name,
        serialized_params: job.serialized_params,
        scheduled_at: job.scheduled_at,
        created_at: Time.current,
        updated_at: Time.current,
        process_id: SecureRandom.uuid,
      )
    start_time = Time.current

    # Temporarily disable concurrency limits
    old_config = active_job_class.good_job_concurrency_config
    active_job_class.good_job_concurrency_config = { total_limit: nil }

    begin
      # Perform the job with deserialized arguments
      GoodJob::CurrentThread.job = job
      job.update!(performed_at: Time.current)
      job_instance.arguments = deserialized_args
      job_instance.perform_now

      # Update execution and job records
      execution.update!(
        finished_at: Time.current,
        error: nil,
        error_event: nil,
        duration: Time.current - start_time,
      )
      job.update!(finished_at: Time.current)
      puts "Job completed successfully"
    rescue => e
      puts "Job failed: #{e.message}"

      # Update execution and job records with error
      execution.update!(
        finished_at: Time.current,
        error: e.message,
        error_event: "execution_failed",
        error_backtrace: e.backtrace,
        duration: Time.current - start_time,
      )
      job.update!(
        error: "#{e.class}: #{e.message}",
        error_event: "execution_failed",
      )
      raise e
    ensure
      job.update!(
        executions_count: GoodJob::Execution.where(active_job_id: job.id).count,
      )
      # Restore original concurrency config
      active_job_class.good_job_concurrency_config = old_config
      GoodJob::CurrentThread.job = nil
    end

    processed += 1
    next unless limit && processed >= limit

    puts "limit reached"
    break
  end
end
# Re-scans FaPosts that are in state "removed" but still have a title, one at
# a time in descending fa_id order, printing the post before and after each
# forced scan and pausing 2 seconds between posts.
#
# Environment:
#   start_at - optional fa_id; only posts with a smaller fa_id are visited.
#
# Any error is printed and then drops into a pry session.
task fix_removed_fa_posts: :environment do
  paint_state = ->(state) do
    label =
      case state
      when "ok" then "ok".green
      when "removed" then "removed".red
      else state.to_s
      end
    label.bold
  end

  summarize = ->(phase, record) do
    "[#{phase}] [post.state: #{paint_state.call(record.state)}] [post.file.id: #{record.file&.id}] [post.id: #{record.id}] [post.fa_id: #{record.fa_id}] [post.title: #{record.title}]"
  end

  # Highest-fa_id matching post strictly below the cursor (or overall when
  # there is no cursor yet); nil once nothing is left.
  next_removed_post = ->(below_fa_id) do
    scope =
      Domain::Post::FaPost
        .where(state: "removed")
        .where.not(title: nil)
        .order(fa_id: :desc)
    scope = scope.where(fa_id: ...below_fa_id) if below_fa_id
    scope.first
  end

  cursor = ENV["start_at"]&.to_i
  while (post = next_removed_post.call(cursor))
    cursor = post.fa_id
    puts summarize.call("before", post)
    Domain::Fa::Job::ScanPostJob.perform_now(post: post, force_scan: true)
    post.reload
    puts summarize.call("after", post)
    sleep 2
  end
rescue => e
  puts "error: #{e.message}"
  binding.pry
end
# Runs Domain::Fa::Job::UserAvatarJob inline for every pending UserAvatar that
# belongs to an FA user, printing the avatar's state after each run.
task run_fa_user_avatar_jobs: :environment do
  fa_user_type = Domain::User::FaUser.name
  pending_avatars =
    Domain::UserAvatar
      .where(state: "pending")
      .joins(:user)
      .where(user: { type: fa_user_type })

  puts "count: #{pending_avatars.count}"

  pending_avatars.each do |pending_avatar|
    Domain::Fa::Job::UserAvatarJob.perform_now(avatar: pending_avatar)
    # Reload so the state printed below reflects what the job wrote.
    pending_avatar.reload
    puts "perform avatar job for #{pending_avatar.user.url_name} - #{pending_avatar.state.bold}"
  end
end
# Runs Domain::PostFileThumbnailJob inline over a selection of post files.
#
# Exactly one mode is chosen from the environment, checked in this order:
#   post_file_descending - every PostFile in state "ok", in descending order;
#                          `start_at` optionally sets the starting id.
#   posts_descending     - every Domain::Post in descending order, skipping
#                          InkbunnyPost records.
#   user                 - all posts of the user identified by this param.
#   users_descending     - all FA users, ordered by
#                          user_user_follows_to_count descending; finished
#                          users are appended to migrated_files.txt and
#                          skipped on later runs.
#
# Raises if none of the modes is selected, or if `user` cannot be found.
#
# NOTE: the `def`s below run inside the task block, so they become methods on
# the top-level object once the task is invoked.
task create_post_file_fingerprints: %i[environment set_logger_stdout] do
  # Processes every file of every post of `user`, 64 posts per transaction.
  def migrate_posts_for_user(user)
    puts "migrating posts for #{user.to_param}"
    posts = user.posts.includes(files: %i[blob thumbnails bit_fingerprints])
    pb =
      ProgressBar.create(
        total: posts.count,
        progress_mark: " ",
        remainder_mark: " ",
        format: "%B %c/%C (%r/sec) %J%% %a %E",
      )
    posts.find_in_batches(batch_size: 64) do |batch|
      ReduxApplicationRecord.transaction do
        batch.each do |post|
          migrate_post(post)
          pb.progress = [pb.progress + 1, pb.total].min
        end
      end
    end
  end

  # Processes each file of `post`; a failure on one file is printed and does
  # not stop the remaining files.
  def migrate_post(post)
    puts "#{post.creator&.url_name} (#{post.creator&.user_user_follows_to_count}) :: #{post.to_param} / '#{post.title_for_view}'"
    ColorLogger.quiet do
      post.files.each do |file|
        migrate_post_file(file)
      rescue StandardError => e
        puts "error: #{e.message}"
      end
    end
  end

  # Runs the thumbnail job for one post file with logging quieted; errors are
  # printed rather than raised.
  def migrate_post_file(post_file)
    ColorLogger.quiet do
      Domain::PostFileThumbnailJob.new.perform({ post_file: })
    rescue => e
      puts "error: #{e.message}"
    end
  end

  if ENV["post_file_descending"].present?
    total = 49_783_962 # cache this value
    pb =
      ProgressBar.create(
        total:,
        progress_mark: " ",
        remainder_mark: " ",
        format: "%B %c/%C (%r/sec) %J%% %a %E",
      )
    i = 0
    Domain::PostFile
      .where(state: "ok")
      .includes(:blob)
      .find_each(
        order: :desc,
        batch_size: 32,
        start: ENV["start_at"],
      ) do |post_file|
        i += 1
        if i % 100 == 0
          puts "migrating #{post_file.id} / #{post_file.post.title_for_view}"
        end
        migrate_post_file(post_file)
        pb.progress = [pb.progress + 1, pb.total].min
      end
  elsif ENV["posts_descending"].present?
    # total = Domain::Post.count
    total = 66_431_808 # cache this value
    pb =
      ProgressBar.create(
        total:,
        progress_mark: " ",
        remainder_mark: " ",
        format: "%B %c/%C (%r/sec) %J%% %a %E",
      )
    Domain::Post.find_each(order: :desc) do |post|
      migrate_post(post) unless post.is_a?(Domain::Post::InkbunnyPost)
      pb.progress = [pb.progress + 1, pb.total].min
    end
  elsif ENV["user"].present?
    # Presence was just checked, so no fallback raise is needed here.
    for_user = ENV["user"]
    user = DomainController.find_model_from_param(Domain::User, for_user)
    raise "user '#{for_user}' not found" unless user
    migrate_posts_for_user(user)
  elsif ENV["users_descending"].present?
    # all users with posts, ordered by post count descending
    # Block form closes the checkpoint file even if a migration raises.
    File.open("migrated_files.txt", "a+") do |migrated_file|
      migrated_file.seek(0)
      # A Set keeps the per-user "already migrated?" check constant-time.
      migrated_users = migrated_file.readlines.map(&:strip).to_set
      users =
        Domain::User::FaUser.order(
          Arel.sql("user_user_follows_to_count DESC NULLS LAST"),
        ).pluck(:id)
      users.each do |user_id|
        user = Domain::User::FaUser.find(user_id)
        next if migrated_users.include?(user.to_param)
        puts "migrating posts for #{user.to_param} (#{user.num_watched_by} watched by)"
        migrate_posts_for_user(user)
        # Flush after every user so an interrupted run can resume from here.
        migrated_file.write("#{user.to_param}\n")
        migrated_file.flush
      end
    end
  else
    raise "need one of 'post_file_descending', 'posts_descending', 'user', or 'users_descending'"
  end
end
# Enqueues a Job::PostFileJob (priority 10) for every PostFile in state
# "pending", in batches of 100. Before each batch it waits, polling every 10
# seconds, until at most 100 matching Job::PostFileJob rows remain.
#
# Environment:
#   start_at - optional value passed as `start:` to find_in_batches.
task enqueue_pending_post_files: :environment do
  pending_files = Domain::PostFile.where(state: "pending")
  puts "enqueueing #{pending_files.count} pending post files"

  # Counts Job::PostFileJob rows with no performed_at, scheduled_at or error.
  backlog_size = -> do
    GoodJob::Job.where(
      job_class: "Job::PostFileJob",
      performed_at: nil,
      scheduled_at: nil,
      error: nil,
    ).count
  end

  pending_files.find_in_batches(
    batch_size: 100,
    start: ENV["start_at"],
  ) do |file_batch|
    until (queue_size = backlog_size.call) <= 100
      puts "queue size: #{queue_size}"
      sleep 10
    end

    file_batch.each do |pending_file|
      Job::PostFileJob.set(priority: 10).perform_later(post_file: pending_file)
    end
  end
end
# Finds PostFiles in state "ok" with retry_count 0 whose log entry has the
# empty-file response SHA256, resets each one to pending, and runs
# Job::PostFileJob for it inline.
task find_post_files_with_empty_response: :environment do
  empty_sha = BlobFile::EMPTY_FILE_SHA256
  empty_response_files =
    Domain::PostFile
      .where(state: "ok", retry_count: 0)
      .joins(:log_entry)
      .where(http_log_entries: { response_sha256: empty_sha })

  bar =
    ProgressBar.create(
      total: empty_response_files.count,
      format: "%t: %c/%C %B %p%% %a %e",
    )

  empty_response_files.find_each(batch_size: 10) do |empty_file|
    # puts "post_file: #{empty_file.id} / '#{empty_file.post.to_param}'"
    empty_file.state_pending!
    empty_file.save!
    Job::PostFileJob.perform_now(post_file: empty_file)
    # Clamp so the bar is never advanced past its total.
    bar.progress = [bar.progress + 1, bar.total].min
  end
end
desc "Enqueue pending post file jobs"
# Thin wrapper: all of the work happens in EnqueueDuePostFileJobs#run.
task enqueue_pending_post_file_jobs: :environment do
  enqueuer = EnqueueDuePostFileJobs.new
  enqueuer.run
end
desc "Compute null counter caches for all users"
# For every Domain::User with at least one NULL counter-cache column, resets
# just the NULL counters via Domain::User.reset_counters, 32 users per
# transaction.
task compute_null_user_counter_caches: :environment do
  # counter-cache column => association passed to reset_counters
  association_for = {
    user_post_creations_count: :user_post_creations,
    user_post_favs_count: :user_post_favs,
    user_user_follows_from_count: :user_user_follows_from,
    user_user_follows_to_count: :user_user_follows_to,
  }
  cache_columns = association_for.keys

  any_null_sql =
    cache_columns.map { |column| %{("#{column}" IS NULL)} }.join(" OR ")
  users_with_nulls = Domain::User.where(any_null_sql)
  total = users_with_nulls.count
  # Load only the id and the counter columns that are inspected below.
  users_with_nulls = users_with_nulls.select(:id, *cache_columns)

  puts "computing #{cache_columns.join(", ")} for #{total} users"
  bar = ProgressBar.create(total:, format: "%t: %c/%C %B %p%% %a %e")

  users_with_nulls.find_in_batches(batch_size: 32) do |user_batch|
    ReduxApplicationRecord.transaction do
      user_batch.each do |user|
        stale_associations =
          association_for
            .select { |column, _association| user.public_send(column).nil? }
            .values
        Domain::User.reset_counters(user.id, *stale_associations)
        # Clamp so the bar is never advanced past its total.
        bar.progress = [bar.progress + 1, total].min
      end
    end
  end
end