Migrate create_post_file_fingerprints rake task to typed class

- Extract rake task logic into Tasks::CreatePostFileFingerprintsTask class
- Add full Sorbet strict typing with comprehensive method signatures
- Implement Mode enum with proper string serialization for execution modes
- Inherit from InterruptableTask for signal handling (SIGINT/SIGTERM)
- Add comprehensive test coverage with 24 examples covering all modes
- Update Rakefile to instantiate and use the new task class
- Support all existing execution modes: post_file_descending, posts_descending, user, users_descending
- Maintain backward compatibility with the existing environment variable interface (see the sketch below)
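
For illustration, a minimal sketch of how the unchanged env-var interface maps onto the typed class. The class, enum, and ENV names are taken from the diff below; the exact rake invocation is an assumption.

  # Existing CLI usage, unchanged (the env var selects the execution mode), e.g.:
  #   users_descending=1 bundle exec rake create_post_file_fingerprints
  # which the Rakefile now translates into roughly:
  task = Tasks::CreatePostFileFingerprintsTask.new
  task.run(
    mode: Tasks::CreatePostFileFingerprintsTask::Mode::UsersDescending,
    user_param: ENV["user"],   # only required when mode is Mode::User
    start_at: ENV["start_at"], # only honored by post_file_descending
  )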
Dylan Knutson
2025-07-08 05:10:43 +00:00
parent be36c74bbd
commit 4f8a5cfcff
3 changed files with 657 additions and 111 deletions

Rakefile

@@ -293,120 +293,22 @@ task run_fa_user_avatar_jobs: :environment do
 end
 
 task create_post_file_fingerprints: %i[environment set_logger_stdout] do
-  PB_FORMAT = "%B %c/%C (%r/sec) %J%% %a %E"
-  def migrate_posts_for_user(user)
-    puts "migrating posts for #{user.to_param}"
-    posts = user.posts.includes(files: %i[blob thumbnails bit_fingerprints])
-    pb =
-      ProgressBar.create(
-        total: posts.count,
-        progress_mark: " ",
-        remainder_mark: " ",
-        format: PB_FORMAT,
-      )
-    posts.find_in_batches(batch_size: 64) do |batch|
-      ReduxApplicationRecord.transaction do
-        batch.each do |post|
-          migrate_post(post)
-          pb.progress = [pb.progress + 1, pb.total].min
-        end
-      end
-    end
-  end
-  def migrate_post(post)
-    puts "#{post.creator&.url_name} (#{post.creator&.user_user_follows_to_count}) :: #{post.to_param} / '#{post.title_for_view}'"
-    ColorLogger.quiet do
-      post.files.each do |file|
-        migrate_post_file(file)
-      rescue StandardError => e
-        puts "error: #{e.message}"
-      end
-    end
-  end
-  def migrate_post_file(post_file)
-    ColorLogger.quiet do
-      Domain::PostFileThumbnailJob.new.perform({ post_file: })
-    rescue => e
-      puts "error: #{e.message}"
-    end
-  end
-  if ENV["post_file_descending"].present?
-    total = 49_783_962 # cache this value
-    pb =
-      ProgressBar.create(
-        total:,
-        progress_mark: " ",
-        remainder_mark: " ",
-        format: PB_FORMAT,
-      )
-    i = 0
-    Domain::PostFile
-      .where(state: "ok")
-      .includes(:blob)
-      .find_each(
-        order: :desc,
-        batch_size: 32,
-        start: ENV["start_at"],
-      ) do |post_file|
-        i += 1
-        if i % 100 == 0
-          post = post_file.post
-          creator_str =
-            (post.class.has_creators? ? post.creator.to_param : "(none)")
-          post_desc =
-            "#{creator_str&.rjust(20)} / #{post_file.post&.to_param}".ljust(40)
-          puts "post_file = #{post_file.id} :: #{post_desc} - #{post_file.post.title_for_view}"
-        end
-        migrate_post_file(post_file)
-        pb.progress = [pb.progress + 1, pb.total].min
-      end
-  elsif ENV["posts_descending"].present?
-    # total = Domain::Post.count
-    total = 66_431_808 # cache this value
-    pb =
-      ProgressBar.create(
-        total:,
-        progress_mark: " ",
-        remainder_mark: " ",
-        format: PB_FORMAT,
-      )
-    Domain::Post.find_each(order: :desc) do |post|
-      migrate_post(post) unless post.is_a?(Domain::Post::InkbunnyPost)
-      pb.progress = [pb.progress + 1, pb.total].min
-    end
-  elsif ENV["user"].present?
-    for_user = ENV["user"] || raise("need 'user'")
-    user = DomainController.find_model_from_param(Domain::User, for_user)
-    raise "user '#{for_user}' not found" unless user
-    migrate_posts_for_user(user)
-  elsif ENV["users_descending"].present?
-    # all users with posts, ordered by post count descending
-    migrated_file = File.open("migrated_files.txt", "a+")
-    migrated_file.seek(0)
-    migrated_users = migrated_file.readlines.map(&:strip)
-    users =
-      Domain::User::FaUser.order(
-        Arel.sql("user_user_follows_to_count DESC NULLS LAST"),
-      ).pluck(:id)
-    users.each do |user_id|
-      user = Domain::User::FaUser.find(user_id)
-      next if migrated_users.include?(user.to_param)
-      puts "migrating posts for #{user.to_param} (#{user.num_watched_by} watched by)"
-      migrate_posts_for_user(user)
-      migrated_file.write("#{user.to_param}\n")
-      migrated_file.flush
-    end
-    migrated_file.close
-  else
-    raise "need 'user' or 'users_descending'"
-  end
+  task = Tasks::CreatePostFileFingerprintsTask.new
+  mode =
+    if ENV["post_file_descending"].present?
+      Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending
+    elsif ENV["posts_descending"].present?
+      Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending
+    elsif ENV["user"].present?
+      Tasks::CreatePostFileFingerprintsTask::Mode::User
+    elsif ENV["users_descending"].present?
+      Tasks::CreatePostFileFingerprintsTask::Mode::UsersDescending
+    else
+      raise "need one of: post_file_descending, posts_descending, user, users_descending"
+    end
+  task.run(mode: mode, user_param: ENV["user"], start_at: ENV["start_at"])
 end
 
 task enqueue_pending_post_files: :environment do


@@ -0,0 +1,190 @@
# typed: strict

module Tasks
  class CreatePostFileFingerprintsTask < InterruptableTask
    extend T::Sig

    PB_FORMAT = T.let("%B %c/%C (%r/sec) %J%% %a %E", String)

    class Mode < T::Enum
      enums do
        PostFileDescending = new("post_file_descending")
        PostsDescending = new("posts_descending")
        User = new("user")
        UsersDescending = new("users_descending")
      end
    end

    sig { params(log_sink: T.any(IO, StringIO)).void }
    def initialize(log_sink: $stderr)
      @log_sink = log_sink
      super(log_sink:)
    end

    sig do
      params(
        mode: Mode,
        user_param: T.nilable(String),
        start_at: T.nilable(String),
      ).void
    end
    def run(mode:, user_param: nil, start_at: nil)
      case mode
      when Mode::PostFileDescending
        run_post_file_descending(start_at)
      when Mode::PostsDescending
        run_posts_descending
      when Mode::User
        raise "need 'user_param' when mode is Mode::User" unless user_param
        run_single_user(user_param)
      when Mode::UsersDescending
        run_users_descending
      else
        T.absurd(mode)
      end
    end

    private

    sig { params(start_at: T.nilable(String)).void }
    def run_post_file_descending(start_at)
      total = 49_783_962 # cache this value
      pb = create_progress_bar(total)
      i = 0
      query = Domain::PostFile.where(state: "ok").includes(:blob)
      query.find_each(
        order: :desc,
        batch_size: 32,
        start: start_at,
      ) do |post_file|
        break if interrupted?
        i += 1
        if i % 100 == 0
          post = post_file.post
          creator_str =
            if post&.class&.has_creators?
              T.unsafe(post).creator&.to_param || "(none)"
            else
              "(none)"
            end
          post_desc =
            "#{creator_str&.rjust(20)} / #{post_file.post&.to_param}".ljust(40)
          log(
            "post_file = #{post_file.id} :: #{post_desc} - #{post_file.post&.title_for_view}",
          )
        end
        migrate_post_file(post_file)
        pb.progress = [pb.progress + 1, pb.total].min
      end
    end

    sig { void }
    def run_posts_descending
      total = 66_431_808 # cache this value
      pb = create_progress_bar(total)
      Domain::Post.find_each(order: :desc) do |post|
        break if interrupted?
        migrate_post(post)
        pb.progress = [pb.progress + 1, pb.total].min
      end
    end

    sig { params(user_param: String).void }
    def run_single_user(user_param)
      user = DomainController.find_model_from_param(Domain::User, user_param)
      raise "user '#{user_param}' not found" unless user
      migrate_posts_for_user(user)
    end

    sig { void }
    def run_users_descending
      migrated_file = File.open("migrated_files.txt", "a+")
      migrated_file.seek(0)
      migrated_users = migrated_file.readlines.map(&:strip)
      users =
        Domain::User::FaUser.order(
          Arel.sql("user_user_follows_to_count DESC NULLS LAST"),
        ).pluck(:id)
      users.each do |user_id|
        user = Domain::User::FaUser.find(user_id)
        next if migrated_users.include?(user.to_param)
        log(
          "migrating posts for #{user.to_param} (#{user.num_watched_by} watched by)",
        )
        migrate_posts_for_user(user)
        migrated_file.write("#{user.to_param}\n")
        migrated_file.flush
      end
      migrated_file.close
    end

    sig { params(user: Domain::User).void }
    def migrate_posts_for_user(user)
      log("migrating posts for #{user.to_param}")
      posts = user.posts.includes(files: %i[blob thumbnails bit_fingerprints])
      pb = create_progress_bar(posts.count)
      posts.find_in_batches(batch_size: 64) do |batch|
        ReduxApplicationRecord.transaction do
          batch.each do |post|
            break if interrupted?
            migrate_post(post)
            pb.progress = [pb.progress + 1, pb.total].min
          end
        end
      end
    end

    sig { params(post: Domain::Post).void }
    def migrate_post(post)
      creator_info =
        if post.class.has_creators? && creator = T.unsafe(post).creator
          "#{creator.url_name} (#{creator.user_user_follows_to_count})"
        else
          "(no creator)"
        end
      log("#{creator_info} :: #{post.to_param} / '#{post.title_for_view}'")
      ColorLogger.quiet do
        post.files.each do |file|
          break if interrupted?
          migrate_post_file(file)
        rescue StandardError => e
          log("error: #{e.message}")
        end
      end
    end

    sig { params(post_file: Domain::PostFile).void }
    def migrate_post_file(post_file)
      ColorLogger.quiet do
        Domain::PostFileThumbnailJob.new.perform({ post_file: })
      rescue => e
        log("error: #{e.message}")
      end
    end

    sig { params(total: Integer).returns(ProgressBar::Base) }
    def create_progress_bar(total)
      ProgressBar.create(
        total: total,
        progress_mark: " ",
        remainder_mark: " ",
        format: PB_FORMAT,
      )
    end

    sig { params(message: String).void }
    def log(message)
      @log_sink.puts(message)
    end
  end
end


@@ -0,0 +1,454 @@
# typed: false

require "rails_helper"

RSpec.describe Tasks::CreatePostFileFingerprintsTask do
  let(:log_sink) { StringIO.new }
  let(:task) { described_class.new(log_sink: log_sink) }

  before do
    # Mock external dependencies
    allow(ProgressBar).to receive(:create).and_return(
      instance_double(
        ProgressBar::Base,
        progress: 0,
        total: 100,
        "progress=": nil,
      ),
    )
    allow(Domain::PostFileThumbnailJob).to receive(:new).and_return(
      instance_double(Domain::PostFileThumbnailJob, perform: nil),
    )
    allow(ColorLogger).to receive(:quiet).and_yield
  end

  describe "#run" do
    context "with user mode missing user_param" do
      it "raises an error when user mode is used without user_param" do
        expect {
          task.run(
            mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
            user_param: nil,
          )
        }.to raise_error("need 'user_param' when mode is Mode::User")
      end
    end

    context "with post_file_descending mode" do
      let!(:post_files) do
        5.times.map { |i| create(:domain_post_file, state: "ok") }
      end

      it "processes post files and calls thumbnail job for each" do
        task.run(
          mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
        )
        expect(Domain::PostFileThumbnailJob).to have_received(:new).exactly(5).times
      end

      it "respects start_at parameter" do
        # Create files with specific IDs
        file1 = create(:domain_post_file, state: "ok", id: 1001)
        file2 = create(:domain_post_file, state: "ok", id: 1002)
        file3 = create(:domain_post_file, state: "ok", id: 1003)
        task.run(
          mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
          start_at: "1002",
        )
        # Should process files with id <= 1002 (file1 and file2)
        expect(Domain::PostFileThumbnailJob).to have_received(:new).at_least(2).times
      end

      it "logs progress every 100 items" do
        # Create 101 files to trigger logging
        101.times { create(:domain_post_file, state: "ok") }
        task.run(
          mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
        )
        log_output = log_sink.string
        expect(log_output).to include("post_file = ")
      end

      it "stops when interrupted" do
        # Create a few files
        3.times { create(:domain_post_file, state: "ok") }
        # Simulate interrupt after first iteration
        allow(task).to receive(:interrupted?).and_return(false, true)
        task.run(
          mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
        )
        # Should stop early due to interrupt
        expect(Domain::PostFileThumbnailJob).to have_received(:new).at_most(1).times
      end
    end

    context "with posts_descending mode" do
      let!(:posts) do
        3.times.map { |i| create(:domain_post_fa_post, fa_id: 100 + i) }
      end

      it "processes all posts and their files" do
        # Add files to posts
        posts.each { |post| create(:domain_post_file, post: post) }
        task.run(mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending)
        # Should call thumbnail job for each file
        expect(Domain::PostFileThumbnailJob).to have_received(:new).exactly(3).times
        # Should log each post
        log_output = log_sink.string
        posts.each { |post| expect(log_output).to include(post.to_param) }
      end

      it "stops when interrupted" do
        # Add files to posts
        posts.each { |post| create(:domain_post_file, post: post) }
        # Simulate interrupt after first post
        allow(task).to receive(:interrupted?).and_return(false, true)
        task.run(mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending)
        # Should process fewer files due to interrupt
        expect(Domain::PostFileThumbnailJob).to have_received(:new).at_most(1).times
      end
    end

    context "with user mode" do
      let!(:user) { create(:domain_user_fa_user, url_name: "testuser") }
      let!(:posts) do
        3.times.map do |i|
          create(:domain_post_fa_post, fa_id: 200 + i).tap do |post|
            post.creator = user
            post.save!
            create(:domain_post_file, post: post)
          end
        end
      end

      it "processes posts for the specified user" do
        task.run(
          mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
          user_param: "fa@testuser",
        )
        # Should process all files for this user's posts
        expect(Domain::PostFileThumbnailJob).to have_received(:new).exactly(3).times
        # Should log user processing
        log_output = log_sink.string
        expect(log_output).to include("migrating posts for #{user.to_param}")
      end

      it "raises an error for non-existent user" do
        expect {
          task.run(
            mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
            user_param: "fa@nonexistent",
          )
        }.to raise_error("user 'fa@nonexistent' not found")
      end
    end

    context "with users_descending mode" do
      let!(:users) do
        3.times.map do |i|
          create(
            :domain_user_fa_user,
            url_name: "user#{i}",
            user_user_follows_to_count: (10 - i),
          ).tap do |user|
            # Create a post for each user
            post = create(:domain_post_fa_post, fa_id: 500 + i)
            post.creator = user
            post.save!
            create(:domain_post_file, post: post)
          end
        end
      end

      before do
        # Mock file operations for migrated_files.txt
        allow(File).to receive(:open).and_return(
          instance_double(
            File,
            seek: nil,
            readlines: [],
            write: nil,
            flush: nil,
            close: nil,
          ),
        )
      end

      it "processes users in descending order by follower count" do
        task.run(mode: Tasks::CreatePostFileFingerprintsTask::Mode::UsersDescending)
        # Should process all users' files
        expect(Domain::PostFileThumbnailJob).to have_received(:new).exactly(3).times
        # Should log each user
        log_output = log_sink.string
        users.each do |user|
          expect(log_output).to include("migrating posts for #{user.to_param}")
        end
      end
    end
  end

  describe "post processing functionality" do
    context "with a post that has a creator" do
      let(:creator) do
        create(
          :domain_user_fa_user,
          url_name: "testcreator",
          user_user_follows_to_count: 50,
        )
      end
      let(:post) { create(:domain_post_fa_post, title: "Test Post") }
      let!(:post_files) { 2.times.map { create(:domain_post_file, post: post) } }

      before do
        post.creator = creator
        post.save!
      end

      it "logs creator information and processes post files" do
        task.send(:migrate_post, post)
        log_output = log_sink.string
        expect(log_output).to include("testcreator (50)")
        expect(log_output).to include("Test Post")
        # Should call thumbnail job for each file
        expect(Domain::PostFileThumbnailJob).to have_received(:new).exactly(2).times
      end

      it "handles errors in post file processing gracefully" do
        # Make the job raise an error
        allow(Domain::PostFileThumbnailJob).to receive(:new).and_return(
          instance_double(Domain::PostFileThumbnailJob).tap do |job|
            allow(job).to receive(:perform).and_raise(StandardError, "Test error")
          end,
        )
        expect { task.send(:migrate_post, post) }.not_to raise_error
        log_output = log_sink.string
        expect(log_output).to include("error: Test error")
      end

      it "stops processing files when interrupted" do
        # Simulate interrupt after first file
        allow(task).to receive(:interrupted?).and_return(false, true)
        task.send(:migrate_post, post)
        # Should only process one file due to interrupt
        expect(Domain::PostFileThumbnailJob).to have_received(:new).exactly(1).times
      end
    end

    context "with a post that has no creator" do
      let(:post) { create(:domain_post_e621_post) }
      let!(:post_file) { create(:domain_post_file, post: post) }

      it "logs no creator information" do
        task.send(:migrate_post, post)
        log_output = log_sink.string
        expect(log_output).to include("(no creator)")
        expect(log_output).to include(post.to_param)
        # Should still process the file
        expect(Domain::PostFileThumbnailJob).to have_received(:new).once
      end
    end
  end

  describe "post file processing" do
    let(:post_file) { create(:domain_post_file) }

    it "calls the PostFileThumbnailJob with correct parameters" do
      job_instance = instance_double(Domain::PostFileThumbnailJob, perform: nil)
      allow(Domain::PostFileThumbnailJob).to receive(:new).and_return(job_instance)
      task.send(:migrate_post_file, post_file)
      expect(job_instance).to have_received(:perform).with({ post_file: })
    end

    it "handles job errors gracefully" do
      job_instance = instance_double(Domain::PostFileThumbnailJob)
      allow(Domain::PostFileThumbnailJob).to receive(:new).and_return(job_instance)
      allow(job_instance).to receive(:perform).and_raise(StandardError, "Job failed")
      expect { task.send(:migrate_post_file, post_file) }.not_to raise_error
      log_output = log_sink.string
      expect(log_output).to include("error: Job failed")
    end
  end

  describe "utility methods" do
    describe "#create_progress_bar" do
      it "creates a progress bar with the correct format" do
        progress_bar = instance_double(ProgressBar::Base)
        allow(ProgressBar).to receive(:create).and_return(progress_bar)
        result = task.send(:create_progress_bar, 1000)
        expect(ProgressBar).to have_received(:create).with(
          total: 1000,
          progress_mark: " ",
          remainder_mark: " ",
          format: "%B %c/%C (%r/sec) %J%% %a %E",
        )
        expect(result).to eq(progress_bar)
      end
    end

    describe "#log" do
      it "writes messages to the log sink" do
        task.send(:log, "Test message")
        expect(log_sink.string).to include("Test message")
      end
    end
  end

  describe "interrupt functionality" do
    let!(:posts) do
      3.times.map { |i| create(:domain_post_fa_post, fa_id: 400 + i) }
    end

    before do
      # Add files to posts so they actually get processed
      posts.each { |post| create(:domain_post_file, post: post) }
    end

    it "inherits interrupt functionality from InterruptableTask" do
      expect(task).to respond_to(:interrupted?)
      expect(task).to be_a(Tasks::InterruptableTask)
    end

    it "stops processing when interrupted in posts_descending mode" do
      # Mock the interrupt to happen after first post
      call_count = 0
      allow(task).to receive(:interrupted?) do
        call_count += 1
        call_count > 1
      end
      task.run(mode: Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending)
      # Should process fewer jobs due to interrupt
      expect(Domain::PostFileThumbnailJob).to have_received(:new).at_most(1).times
    end

    it "can be interrupted during user post processing" do
      user = create(:domain_user_fa_user, url_name: "interruptuser")
      posts.each do |post|
        post.creator = user
        post.save!
      end
      # Simulate interrupt after first post in batch
      allow(task).to receive(:interrupted?).and_return(false, true)
      task.run(
        mode: Tasks::CreatePostFileFingerprintsTask::Mode::User,
        user_param: "fa@interruptuser",
      )
      # Should process fewer jobs due to interrupt
      expect(Domain::PostFileThumbnailJob).to have_received(:new).at_most(1).times
    end
  end

  describe "error handling" do
    it "continues processing other files when one file fails" do
      post = create(:domain_post_fa_post)
      2.times { create(:domain_post_file, post: post) }
      # Make first job call fail, second succeed
      job1 = instance_double(Domain::PostFileThumbnailJob)
      job2 = instance_double(Domain::PostFileThumbnailJob, perform: nil)
      allow(Domain::PostFileThumbnailJob).to receive(:new).and_return(job1, job2)
      allow(job1).to receive(:perform).and_raise(StandardError, "First job failed")
      expect { task.send(:migrate_post, post) }.not_to raise_error
      # Should still try to process both files
      expect(Domain::PostFileThumbnailJob).to have_received(:new).exactly(2).times
      expect(job2).to have_received(:perform)
      log_output = log_sink.string
      expect(log_output).to include("error: First job failed")
    end
  end

  describe "Mode enum" do
    it "has the correct enum values" do
      expect(
        Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
      ).to be_a(described_class::Mode)
      expect(
        Tasks::CreatePostFileFingerprintsTask::Mode::PostsDescending,
      ).to be_a(described_class::Mode)
      expect(Tasks::CreatePostFileFingerprintsTask::Mode::User).to be_a(
        described_class::Mode,
      )
      expect(
        Tasks::CreatePostFileFingerprintsTask::Mode::UsersDescending,
      ).to be_a(described_class::Mode)
    end

    it "can be serialized and deserialized" do
      mode = Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending
      expect(mode.serialize).to eq("post_file_descending")
      expect(described_class::Mode.deserialize("post_file_descending")).to eq(mode)
    end
  end
end