# Add your own tasks in files placed in lib/tasks ending in .rake,
# for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.
|
|
|
|
require "rake/testtask"
|
|
require_relative "config/application"
|
|
|
|
# Load the tasks registered with the Rails application.
Rails.application.load_tasks

# Additionally load every *.rake file found directly under <Rails.root>/rake.
extra_rake_files = Dir.glob(Rails.root.join("rake", "*.rake"))
extra_rake_files.each { |path| load path }
|
|
|
|
# Points ActiveRecord's logger at standard output.
task(set_ar_stdout: :environment) do
  stdout_logger = Logger.new($stdout)
  ActiveRecord::Base.logger = stdout_logger
end
|
|
|
|
# Replaces Rails.logger with a stdout logger that prints "SEVERITY: message",
# disables the ActiveRecord and ActiveJob loggers, and hands the new Rails
# logger to GoodJob.
task set_logger_stdout: :environment do
  # Timestamp and progname are deliberately left out of the output.
  plain_formatter =
    proc { |severity, _datetime, _progname, msg| "#{severity}: #{msg}\n" }

  Rails.logger = Logger.new($stdout)
  Rails.logger.formatter = plain_formatter

  [ActiveRecord::Base, ActiveJob::Base].each { |base| base.logger = nil }
  GoodJob.logger = Rails.logger
end
|
|
|
|
# Runs the pghero capture tasks on fixed intervals until the process is stopped:
# space stats every 6 hours, query stats every 5 minutes.
task periodic_tasks: %i[environment set_logger_stdout] do
  # Starts a background thread that executes the named rake task, prints
  # `done_message`, sleeps for `interval`, and repeats.
  run_every =
    lambda do |interval, task_name, done_message|
      Thread.new do
        loop do
          begin
            Rake::Task[task_name].execute
            puts done_message
          rescue StandardError => e
            # A single failed capture must not end the thread: the main thread
            # below never checks on it, so the schedule would stop silently.
            puts "#{task_name} failed: #{e.class}: #{e.message}"
          end
          sleep interval
        end
      end
    end

  run_every.call(6.hours, "pghero:capture_space_stats", "logged space stats")
  run_every.call(5.minutes, "pghero:capture_query_stats", "logged query stats")

  # Keep the main thread alive; all work happens in the threads above.
  loop { sleep 10 }
end
|
|
|
|
namespace :db_sampler do
  # Passes the comma-separated names in ENV["url_names"] to DbSampler#export,
  # with standard output as the sampler's IO. The stream is closed afterwards.
  task export: :environment do
    names_csv =
      ENV.fetch("url_names") { raise("need 'url_names' (comma-separated)") }
    sink = $stdout
    DbSampler.new(sink).export(names_csv.split(","))
  ensure
    sink&.close
  end

  # Runs DbSampler#import with standard input as the sampler's IO. The stream
  # is closed afterwards.
  task import: [:environment] do
    source = $stdin
    DbSampler.new(source).import
  ensure
    source&.close
  end
end
|
|
|
|
# Exports the GoodJob settings below into the environment, echoes them, and
# then replaces the current process with `bundle exec good_job`.
# ENV["GOOD_JOB_QUEUES"], when already set, wins over the built-in queue list.
task good_job: %i[environment set_ar_stdout set_logger_stdout] do
  default_queues = %w[manual:4 fa_post,e621:2 *:6].join(";")

  worker_env = {
    "RAILS_ENV" => "worker",
    "GOOD_JOB_POLL_INTERVAL" => "5",
    "GOOD_JOB_MAX_CACHE" => "10000",
    "GOOD_JOB_QUEUE_SELECT_LIMIT" => "4096",
    "GOOD_JOB_MAX_THREADS" => "4",
    "GOOD_JOB_ENABLE_CRON" => "1",
    "GOOD_JOB_QUEUES" => ENV["GOOD_JOB_QUEUES"] || default_queues,
  }

  worker_env.each_pair do |name, setting|
    ENV[name] = setting
    puts "$> #{name.light_black.bold} = #{setting.bold}"
  end

  command = "bundle exec good_job"
  puts "$> #{command.bold}"
  # exec never returns: the rake process becomes the good_job process.
  exec(command)
end
|
|
|
|
# Writes a copy of a CSV file with its data rows in reverse order.
#
# Environment:
#   file - path of the CSV to read (required). Its first row is treated as the
#          header row and is written first in the output as well.
#
# The result is written next to the input file as "rev_<basename>".
# Raises RuntimeError when 'file' is not given.
task :reverse_csv do
  # This task does not depend on :environment, so load CSV explicitly.
  require "csv"

  file = ENV["file"] || raise("need 'file' (file path)")
  table = CSV.read(file, headers: true)

  # Prefix only the basename so inputs given with a directory still work.
  out_path = File.join(File.dirname(file), "rev_#{File.basename(file)}")

  # Block form closes the output file even if writing a row fails.
  CSV.open(out_path, "w", write_headers: true, headers: table.headers) do |out_csv|
    table.reverse_each { |row| out_csv << row.fields }
  end
end
|
|
|
|
# Runs the currently enabled Domain::MigrateToDomain steps.
#
# Environment:
#   only_user    - optional; forwarded to each migration step as `only_user:`.
#   only_domains - optional comma-separated subset of e621, fa, ib
#                  (all three when omitted or empty).
task migrate_to_domain: :environment do
  only_user = ENV["only_user"]

  known_domains = %w[e621 fa ib]
  requested = ENV.fetch("only_domains", "").split(",")
  selected = requested.empty? ? known_domains : requested

  unknown = selected - known_domains
  unless unknown.empty?
    raise "only_domains must be a subset of #{known_domains.join(", ")}"
  end

  migrator = Domain::MigrateToDomain.new
  selected_domain = ->(name) { selected.include?(name) }

  if selected_domain.call("e621")
    # migrator.migrate_e621_users(only_user: only_user)
    # migrator.migrate_e621_posts(only_user: only_user)
    migrator.migrate_e621_users_favs(only_user: only_user)
  end

  if selected_domain.call("fa")
    # migrator.migrate_fa_users(only_user: only_user)
    # migrator.migrate_fa_posts(only_user: only_user)
    # migrator.migrate_fa_users_favs(only_user: only_user)
    migrator.migrate_fa_users_followed_users(only_user: only_user)
  end

  if selected_domain.call("ib")
    migrator.migrate_inkbunny_users(only_user: only_user)
    migrator.migrate_inkbunny_posts(only_user: only_user)
    # Pools are only migrated when no single user was requested.
    migrator.migrate_inkbunny_pools(only_user: nil) if only_user.nil?
  end
end
|
|
|
|
# For each selected Domain::Fa::Post, takes the log entry returned by
# `guess_last_submission_page`, parses its response contents, and stores the
# result on the post: state :removed when the page reports the submission as
# not found, otherwise posted_at (only if unset), plus last_submission_page_id.
# One summary line is printed per post, including on errors.
#
# Environment:
#   only_fa_id - optional; restrict the run to the post with this fa_id.
#   start_at   - optional; passed to find_each as `start:`.
task infer_last_submission_log_entries: :environment do
  fa_id_filter = ENV["only_fa_id"]
  first_id = ENV["start_at"]&.to_i

  posts =
    if fa_id_filter
      Domain::Fa::Post.where(fa_id: fa_id_filter)
    else
      # "ok" posts lacking either a last submission page or a posted_at.
      ok_posts = Domain::Fa::Post.where(state: :ok)
      ok_posts.where(last_submission_page_id: nil).or(
        ok_posts.where(posted_at: nil),
      )
    end

  posts.find_each(batch_size: 10, start: first_id) do |post|
    summary = ["[id: #{post.id}]", "[fa_id: #{post.fa_id}]"]

    page_entry = post.guess_last_submission_page
    unless page_entry
      summary << "[no log entry]"
      next
    end

    html = page_entry.response&.contents
    unless html
      summary << "[no contents]"
      next
    end

    page = Domain::Fa::Parser::Page.new(html)
    if page.submission_not_found?
      summary << "[removed]"
      post.state = :removed
    else
      posted_at = page.submission.posted_date
      post.posted_at ||= posted_at
      summary << "[posted at: #{posted_at}]"
    end

    previous_page_id = post.last_submission_page_id
    if previous_page_id.present? && page_entry.id != previous_page_id
      summary << "[overwrite]"
    end
    post.last_submission_page_id = page_entry.id

    summary << "[log entry: #{page_entry.id}]"
    summary << "[uri: #{page_entry.uri.to_s}]"
    post.save!
  rescue => e
    summary << "[error: #{e.message}]"
  ensure
    # Runs for skipped (`next`) and failed posts too.
    puts summary.join(" ")
  end
end
|
|
|
|
# Calls `fix_file_by_uri!` on every Domain::Fa::Post whose file_id is listed in
# ENV["file_ids"] (comma-separated, required).
task fix_fa_post_files: :environment do
  raw_ids = ENV["file_ids"] || raise("need 'file_ids'")
  posts = Domain::Fa::Post.where(file_id: raw_ids.split(","))
  posts.find_each(&:fix_file_by_uri!)
end
|
|
|
|
# Calls `fix_file_by_uri!` on every Domain::Fa::Post listed in a CSV file.
#
# Environment:
#   csv_file - path to a CSV with a header row containing an "id" column
#              (required).
#
# Raises RuntimeError when 'csv_file' is not given; a missing post raises
# whatever Domain::Fa::Post.find raises.
task fix_fa_post_files_by_csv: :environment do
  require "csv"

  csv_file = ENV["csv_file"] || raise("need 'csv_file'")

  # CSV.foreach closes the file when iteration ends or raises.
  CSV.foreach(csv_file, headers: true) do |row|
    post = Domain::Fa::Post.find(row["id"].to_i)
    post.fix_file_by_uri!
  end
end
|
|
|
|
# Re-checks a fixed list of FA posts against the page returned by
# `guess_last_submission_page`.
#
# For each post that has a file:
#   - if no page contents are available, the post is skipped with a message;
#   - if the page reports the submission as not found, the post's file is
#     cleared and the post saved;
#   - otherwise the page's full-resolution image URL (defaulting to https when
#     it has no scheme) is compared with the post's file_url_str and the
#     outcome is printed.
task fix_buggy_fa_posts: :environment do
  post_fa_ids = [7_704_069, 7_704_068, 6_432_347, 6_432_346]

  post_fa_ids.each do |fa_id|
    post = Domain::Fa::Post.find_by(fa_id: fa_id)
    next unless post&.file

    # Either the log entry or its response may be missing; skip explicitly
    # instead of failing with NoMethodError.
    contents = post.guess_last_submission_page&.response&.contents
    unless contents
      puts "no submission page contents for fa_id #{fa_id}"
      next
    end

    parser = Domain::Fa::Parser::Page.new(contents)
    if parser.submission_not_found?
      post.file = nil
      post.save!
      puts "submission not found"
    else
      full_res_img = Addressable::URI.parse(parser.submission.full_res_img)
      full_res_img.scheme = "https" if full_res_img.scheme.blank?
      matches = full_res_img.to_s == post.file_url_str
      puts "fa_id #{fa_id}: file url matches submission: #{matches}"
    end
  end
end
|
|
|
|
# Runs Domain::Fa::Job::ScanPostJob inline (perform_now) for every FaPost in
# state "ok" that has no associated file, iterating in descending order.
task enqueue_fa_posts_missing_files: %i[environment set_logger_stdout] do
  posts_without_file =
    Domain::Post::FaPost.where(state: "ok").where.missing(:file)

  posts_without_file.find_each(order: :desc) do |fa_post|
    Domain::Fa::Job::ScanPostJob.perform_now(post: fa_post)
  end
end
|
|
|
|
# Runs Domain::E621::Task::FixE621PostMissingFiles on each E621Post in state
# "ok" that is missing its `files` association.
#
# Environment:
#   limit - optional; stop once exactly this many posts have been handled.
task fix_e621_post_files: :environment do
  posts = Domain::Post::E621Post.where(state: "ok").where.missing(:files)
  max_posts = ENV["limit"]&.to_i
  puts "query: #{posts.to_sql}"

  handled = 0
  posts.find_each(batch_size: 10) do |post|
    Domain::E621::Task::FixE621PostMissingFiles.new.run(post)

    handled += 1
    next unless max_posts && handled == max_posts

    puts "limit reached"
    break
  end
end
|
|
|
|
# Runs Domain::E621::Job::ScanPostJob inline for each E621Post in state "ok"
# that is missing its `file` association, showing a progress bar.
task fix_ok_e621_posts_missing_files: :environment do
  posts = Domain::Post::E621Post.where(state: "ok").where.missing(:file)
  bar = ProgressBar.create(total: posts.count, format: "%t: %c/%C %B %p%% %a %e")

  # Clamp so the bar never moves past its total.
  advance = -> { bar.progress = [bar.progress + 1, bar.total].min }

  posts.find_each(batch_size: 10) do |post|
    Domain::E621::Job::ScanPostJob.perform_now(post:)
    advance.call
  end
end
|
|
|
|
# Performs GoodJob jobs inline in this process, creating a GoodJob::Execution
# row for each run and updating the job record with the outcome.
#
# Environment (at least one of job_id / job_class is required):
#   job_id    - id of a GoodJob::Job, or of a GoodJob::Execution whose job
#               should be run.
#   job_class - run queued jobs of this class, oldest first.
#   limit     - optional; stop once exactly this many jobs have been run.
#
# A failing job is recorded on the execution and job rows and then re-raised,
# which aborts the task.
task perform_good_jobs: :environment do
  requested_class = ENV["job_class"]
  job_id = ENV["job_id"]
  max_jobs = ENV["limit"]&.to_i

  if job_id.blank? && requested_class.blank?
    raise "need 'job_id' or 'job_class'"
  end

  jobs =
    if job_id
      found =
        GoodJob::Job.find_by(id: job_id) ||
          GoodJob::Execution.find_by(id: job_id)&.job
      if found.nil?
        puts "no job found with id #{job_id}"
        exit 1
      end
      puts "found job with id #{found.id}" if found.id != job_id
      GoodJob::Job.where(id: found.id)
    else
      GoodJob::Job.queued.where(job_class: requested_class).order(
        created_at: :asc,
      )
    end

  performed = 0
  jobs.find_each(batch_size: 1) do |record|
    job = T.cast(record, GoodJob::Job)

    # Rebuild the ActiveJob instance and its arguments from the stored params.
    raw_args = job.serialized_params["arguments"]
    if raw_args.nil?
      puts "No arguments found for job #{job.id}"
      next
    end

    args = ActiveJob::Arguments.deserialize(raw_args)
    klass = job.job_class.constantize
    instance = klass.new
    instance.deserialize(job.serialized_params)

    puts "Running job #{job.id} (#{job.job_class})"

    # Record this run as its own execution row.
    execution =
      GoodJob::Execution.create!(
        active_job_id: job.active_job_id,
        job_class: job.job_class,
        queue_name: job.queue_name,
        serialized_params: job.serialized_params,
        scheduled_at: job.scheduled_at,
        created_at: Time.current,
        updated_at: Time.current,
        process_id: SecureRandom.uuid,
      )

    start_time = Time.current

    # Lift the class's concurrency limits for the duration of this run; the
    # previous configuration is restored in the ensure below.
    old_config = klass.good_job_concurrency_config
    klass.good_job_concurrency_config = { total_limit: nil }

    begin
      GoodJob::CurrentThread.job = job
      job.update!(performed_at: Time.current)
      instance.arguments = args
      instance.perform_now

      # Success: stamp the execution and the job as finished.
      execution.update!(
        finished_at: Time.current,
        error: nil,
        error_event: nil,
        duration: Time.current - start_time,
      )
      job.update!(finished_at: Time.current)
      puts "Job completed successfully"
    rescue => e
      puts "Job failed: #{e.message}"
      # Failure: record the error on both rows, then let it propagate.
      execution.update!(
        finished_at: Time.current,
        error: e.message,
        error_event: "execution_failed",
        error_backtrace: e.backtrace,
        duration: Time.current - start_time,
      )
      job.update!(
        error: "#{e.class}: #{e.message}",
        error_event: "execution_failed",
      )
      raise
    ensure
      job.update!(
        executions_count: GoodJob::Execution.where(active_job_id: job.id).count,
      )
      klass.good_job_concurrency_config = old_config
      GoodJob::CurrentThread.job = nil
    end

    performed += 1
    if max_jobs && performed == max_jobs
      puts "limit reached"
      break
    end
  end
end
|
|
|
|
# Force-rescans FaPosts that are in state "removed" but still have a title,
# walking fa_id downwards and printing each post's state before and after.
# Sleeps 2 seconds between posts. On any error the message is printed and a
# pry session is opened.
#
# Environment:
#   start_at - optional; only posts with fa_id below this value are visited.
task fix_removed_fa_posts: :environment do
  colorize_state =
    lambda do |state|
      label =
        case state
        when "ok"
          "ok".green
        when "removed"
          "removed".red
        else
          state.to_s
        end
      label.bold
    end

  removed_posts =
    Domain::Post::FaPost
      .where(state: "removed")
      .where.not(title: nil)
      .order(fa_id: :desc)

  # fa_id of the most recently visited post; the next lookup goes below it.
  cursor = ENV["start_at"]&.to_i
  next_post =
    -> { (cursor ? removed_posts.where(fa_id: ...cursor) : removed_posts).first }

  while (post = next_post.call)
    cursor = post.fa_id

    puts "[before] [post.state: #{colorize_state.call(post.state)}] [post.file.id: #{post.file&.id}] [post.id: #{post.id}] [post.fa_id: #{post.fa_id}] [post.title: #{post.title}]"
    Domain::Fa::Job::ScanPostJob.perform_now(post: post, force_scan: true)
    post.reload
    puts "[after] [post.state: #{colorize_state.call(post.state)}] [post.file.id: #{post.file&.id}] [post.id: #{post.id}] [post.fa_id: #{post.fa_id}] [post.title: #{post.title}]"
    sleep 2
  end
rescue => e
  puts "error: #{e.message}"
  binding.pry
end
|
|
|
|
# Gives FaUsers that lack an avatar one derived from the Domain::Fa::User
# record with the same url_name.
#
# When the old avatar has no log entry, a blank avatar is attached, the avatar
# job is run inline, and a pry session is opened. Otherwise the old avatar's
# log entry, URL and state are copied onto a new Domain::UserAvatar.
# Any error prints its message and opens a pry session.
task fix_fa_user_avatars: :environment do
  missing_avatar_names =
    Domain::User::FaUser.where.missing(:avatar).select(:url_name)
  legacy_users =
    Domain::Fa::User
      .where(url_name: missing_avatar_names)
      .includes(:avatar)
      .filter(&:avatar)

  legacy_users.each do |old_user|
    old_avatar = old_user.avatar
    new_user = Domain::User::FaUser.find_by(url_name: old_user.url_name)

    if old_avatar.log_entry.nil?
      puts "enqueue fresh download for #{old_user.url_name}"
      fresh_avatar = Domain::UserAvatar.new
      new_user.avatar = fresh_avatar
      new_user.save!
      Domain::Fa::Job::UserAvatarJob.perform_now(avatar: fresh_avatar)
      fresh_avatar.reload

      binding.pry
      next
    end

    migrated = Domain::UserAvatar.new
    migrated.log_entry_id = old_avatar.log_entry_id
    migrated.last_log_entry_id = old_avatar.log_entry_id
    migrated.url_str = old_avatar.file_url_str
    migrated.downloaded_at = old_avatar.log_entry&.created_at

    # Translate the old state; any non-"ok" state is kept as the error message.
    legacy_state = old_avatar.state
    if legacy_state == "ok"
      migrated.state = old_avatar.log_entry_id.present? ? "ok" : "pending"
    else
      migrated.error_message = legacy_state
      migrated.state =
        legacy_state == "file_not_found" ? "file_404" : "http_error"
    end

    new_user.avatar = migrated
    new_user.save!
    puts "migrated #{old_user.url_name}"
  rescue => e
    puts "error: #{e.message}"
    binding.pry
  end
end
|
|
|
|
# Runs Domain::Fa::Job::UserAvatarJob inline for every pending avatar that
# belongs to an FaUser, printing the resulting state of each.
task run_fa_user_avatar_jobs: :environment do
  fa_user_type = Domain::User::FaUser.name
  pending_avatars =
    Domain::UserAvatar
      .where(state: "pending")
      .joins(:user)
      .where(user: { type: fa_user_type })

  puts "count: #{pending_avatars.count}"

  pending_avatars.each do |avatar|
    Domain::Fa::Job::UserAvatarJob.perform_now(avatar:)
    avatar.reload
    puts "perform avatar job for #{avatar.user.url_name} - #{avatar.state.bold}"
  end
end
|
|
|
|
# Prints the old and new fav counts for the last FaUser that has
# migrated_user_favs_at set, using the Domain::Fa::User with the same url_name
# as the "old" side.
task sample_migrated_favs: :environment do
  migrated_user =
    Domain::User::FaUser.where.not(migrated_user_favs_at: nil).last
  legacy_user = Domain::Fa::User.find_by(url_name: migrated_user.url_name)

  puts "user: #{migrated_user.url_name}"
  puts "old fav count: #{legacy_user.fav_posts.count}"
  puts "new fav count: #{migrated_user.faved_posts.count}"
end
|
|
|
|
# Runs Domain::PostFileThumbnailJob over post files. Exactly one mode is
# chosen by the first of these environment variables that is present:
#
#   post_file_descending - every "ok" Domain::PostFile, in descending order
#                          (optional start_at is passed to find_each).
#   posts_descending     - every Domain::Post except InkbunnyPosts, descending.
#   user                 - all posts of the user resolved from this param.
#   users_descending     - all FaUsers ordered by num_watched_by descending;
#                          finished users are appended to migrated_files.txt
#                          and skipped on later runs.
#
# Raises RuntimeError when no mode is given or the requested user is unknown.
task create_post_file_fingerprints: :environment do
  # Processes every post of `user` in batches of 64, one transaction per batch.
  def migrate_posts_for_user(user)
    puts "migrating posts for #{user.to_param}"
    pb =
      ProgressBar.create(
        total: user.posts.count,
        format: "%t: %c/%C %B %p%% %a %e",
      )

    user
      .posts
      .includes(:files)
      .find_in_batches(batch_size: 64) do |batch|
        ReduxApplicationRecord.transaction do
          batch.each { |post| migrate_post(post) }
        end
        # The bar's total is a post count, so advance by the batch's size.
        pb.progress = [pb.progress + batch.size, pb.total].min
      end
  end

  # Processes each file of `post`; a failure on one file is printed and the
  # remaining files are still processed.
  def migrate_post(post)
    puts "migrating #{post.id} / #{post.to_param} / '#{post.title_for_view}'"
    ColorLogger.quiet do
      post.files.each do |file|
        migrate_post_file(file)
      rescue StandardError => e
        puts "error: #{e.message}"
      end
    end
  end

  # Performs the thumbnail job for a single post file; errors are printed,
  # not raised.
  def migrate_post_file(post_file)
    job = Domain::PostFileThumbnailJob.new
    ColorLogger.quiet do
      job.perform({ post_file: })
    rescue => e
      puts "error: #{e.message}"
    end
  end

  if ENV["post_file_descending"].present?
    total = 49_783_962 # cache this value
    pb = ProgressBar.create(total:, format: "%t: %c/%C %B %p%% %a %e")
    i = 0
    Domain::PostFile
      .where(state: "ok")
      .includes(:blob)
      .find_each(
        order: :desc,
        batch_size: 32,
        start: ENV["start_at"],
      ) do |post_file|
        i += 1
        # Only print every 100th file to keep the output readable.
        if i % 100 == 0
          puts "migrating #{post_file.id} / #{post_file.post.title_for_view}"
        end
        migrate_post_file(post_file)
        pb.progress = [pb.progress + 1, pb.total].min
      end
  elsif ENV["posts_descending"].present?
    # total = Domain::Post.count
    total = 66_431_808 # cache this value
    pb = ProgressBar.create(total:, format: "%t: %c/%C %B %p%% %a %e")
    Domain::Post.find_each(order: :desc) do |post|
      migrate_post(post) unless post.is_a?(Domain::Post::InkbunnyPost)
      pb.progress = [pb.progress + 1, pb.total].min
    end
  elsif ENV["user"].present?
    for_user = ENV["user"]
    user = DomainController.find_model_from_param(Domain::User, for_user)
    raise "user '#{for_user}' not found" unless user
    migrate_posts_for_user(user)
  elsif ENV["users_descending"].present?
    # "a+" keeps earlier entries readable while every write appends; the block
    # form closes the file even when a migration raises.
    File.open("migrated_files.txt", "a+") do |migrated_file|
      migrated_file.seek(0)
      migrated_users = migrated_file.readlines.map(&:strip).to_set

      user_ids =
        Domain::User::FaUser.order(
          Arel.sql("json_attributes->>'num_watched_by' DESC NULLS LAST"),
        ).pluck(:id)

      user_ids.each do |user_id|
        user = Domain::User::FaUser.find(user_id)
        next if migrated_users.include?(user.to_param)
        puts "migrating posts for #{user.to_param} (#{user.num_watched_by} watched by)"
        migrate_posts_for_user(user)
        # Flush after each user so an interrupted run can resume from here.
        migrated_file.write("#{user.to_param}\n")
        migrated_file.flush
      end
    end
  else
    raise "need 'post_file_descending', 'posts_descending', 'user' or 'users_descending'"
  end
end
|
|
|
|
# Enqueues a Job::PostFileJob (priority 10) for every pending Domain::PostFile,
# 100 at a time. Before each batch it waits, polling every 10 seconds, until
# the count of matching Job::PostFileJob rows is 100 or fewer.
#
# Environment:
#   start_at - optional; passed to find_in_batches as `start:`.
task enqueue_pending_post_files: :environment do
  pending_files = Domain::PostFile.where(state: "pending")
  puts "enqueueing #{pending_files.count} pending post files"

  # Counts Job::PostFileJob rows with no performed_at, scheduled_at or error.
  backlog =
    lambda do
      GoodJob::Job.where(
        job_class: "Job::PostFileJob",
        performed_at: nil,
        scheduled_at: nil,
        error: nil,
      ).count
    end

  pending_files.find_in_batches(
    batch_size: 100,
    start: ENV["start_at"],
  ) do |batch|
    while (queue_size = backlog.call) > 100
      puts "queue size: #{queue_size}"
      sleep 10
    end

    batch.each do |post_file|
      Job::PostFileJob.set(priority: 10).perform_later(post_file:)
    end
  end
end
|
|
|
|
# Finds "ok" post files with retry_count 0 whose log entry's response_sha256
# equals BlobFile::EMPTY_FILE_SHA256, marks each one pending again, and re-runs
# Job::PostFileJob for it inline.
task find_post_files_with_empty_response: :environment do
  empty_response = { response_sha256: BlobFile::EMPTY_FILE_SHA256 }
  suspect_files =
    Domain::PostFile
      .where(state: "ok", retry_count: 0)
      .joins(:log_entry)
      .where(http_log_entries: empty_response)

  bar =
    ProgressBar.create(
      total: suspect_files.count,
      format: "%t: %c/%C %B %p%% %a %e",
    )

  suspect_files.find_each(batch_size: 10) do |post_file|
    # puts "post_file: #{post_file.id} / '#{post_file.post.to_param}'"
    post_file.state_pending!
    post_file.save!
    Job::PostFileJob.perform_now(post_file:)
    bar.progress = [bar.progress + 1, bar.total].min
  end
end
|