blob file migration task refactor
.cursorrules (36 changed lines)
@@ -1,5 +1,14 @@
# How to use this codebase

- To typecheck the codebase, run `just tc`
- To run tests, run `just test`
- When modifying a spec file, run `bin/rspec <path_to_spec_file>` to run just that spec file.
- After making changes, run `just tc` to ensure the codebase is typechecked.
- After making changes, run `just test` to ensure the tests pass.
- If specs are failing, then fix the failures, and rerun with `bin/rspec <path_to_spec_file>`.

# === BACKLOG.MD GUIDELINES START ===

# Instructions for the usage of Backlog.md CLI Tool

## 1. Source of Truth
@@ -25,7 +34,7 @@ should explain the purpose and context of the task. Code snippets should be avoi
List specific, measurable outcomes that define what means to reach the goal from the description. Use checkboxes (`- [ ]`) for tracking.
When defining `## Acceptance Criteria` for a task, focus on **outcomes, behaviors, and verifiable requirements** rather
than step-by-step implementation details.
Acceptance Criteria (AC) define *what* conditions must be met for the task to be considered complete.
Acceptance Criteria (AC) define _what_ conditions must be met for the task to be considered complete.
They should be testable and confirm that the core purpose of the task is achieved.
**Key Principles for Good ACs:**

@@ -34,10 +43,9 @@ They should be testable and confirm that the core purpose of the task is achieve
- **Clear and Concise:** Unambiguous language.
- **Complete:** Collectively, ACs should cover the scope of the task.
- **User-Focused (where applicable):** Frame ACs from the perspective of the end-user or the system's external behavior.

- *Good Example:* "- [ ] User can successfully log in with valid credentials."
- *Good Example:* "- [ ] System processes 1000 requests per second without errors."
- *Bad Example (Implementation Step):* "- [ ] Add a new function `handleLogin()` in `auth.ts`."
- _Good Example:_ "- [ ] User can successfully log in with valid credentials."
- _Good Example:_ "- [ ] System processes 1000 requests per second without errors."
- _Bad Example (Implementation Step):_ "- [ ] Add a new function `handleLogin()` in `auth.ts`."

### Task file

@@ -53,9 +61,9 @@ Once a task is created it will be stored in `backlog/tasks/` directory as a Mark
previous
tasks (id < current task id).

- When creating multiple tasks, ensure they are **independent** and they do not depend on future tasks.
- When creating multiple tasks, ensure they are **independent** and they do not depend on future tasks.
Example of wrong tasks splitting: task 1: "Add API endpoint for user data", task 2: "Define the user model and DB
schema".
schema".
Example of correct tasks splitting: task 1: "Add system for handling API requests", task 2: "Add user model and DB
schema", task 3: "Add API endpoint for user data".

@@ -146,12 +154,12 @@ A task is **Done** only when **ALL** of the following are complete:
3. **Automated tests** (unit + integration) cover new logic.
4. **Static analysis**: linter & formatter succeed.
5. **Documentation**:
   - All relevant docs updated (any relevant README file, backlog/docs, backlog/decisions, etc.).
   - Task file **MUST** have an `## Implementation Notes` section added summarising:
     - Approach taken
     - Features implemented or modified
     - Technical decisions and trade-offs
     - Modified or added files
   - All relevant docs updated (any relevant README file, backlog/docs, backlog/decisions, etc.).
   - Task file **MUST** have an `## Implementation Notes` section added summarising:
     - Approach taken
     - Features implemented or modified
     - Technical decisions and trade-offs
     - Modified or added files
6. **Review**: self review code.
7. **Task hygiene**: status set to **Done** via CLI (`backlog task edit <id> -s Done`).
8. **No regressions**: performance, security and licence checks green.
@@ -161,7 +169,7 @@ A task is **Done** only when **ALL** of the following are complete:
## 9. Handy CLI Commands

| Purpose | Command |
|------------------|------------------------------------------------------------------------|
| ---------------- | ---------------------------------------------------------------------- |
| Create task | `backlog task create "Add OAuth"` |
| Create with desc | `backlog task create "Feature" -d "Enables users to use this feature"` |
| Create with AC | `backlog task create "Feature" --ac "Must work,Must be tested"` |

@@ -6,5 +6,5 @@ else
end

if test $AGENT_MODE = true
    /usr/bin/bash
    # /usr/bin/bash -l
end

@@ -1,104 +0,0 @@
# typed: strict
class Domain::BlobFile::MigrateBlobEntryToBlobFile
  extend T::Sig

  ZERO_SHA256 = T.let("00" * 32, String)

  sig { params(log_sink: T.any(IO, StringIO)).void }
  def initialize(log_sink: $stderr)
    @log_sink = log_sink
    @channel = T.let(Concurrent::Channel.new(capacity: 8), Concurrent::Channel)
  end

  sig { params(batch_size: Integer, start_sha256: String).void }
  def run(batch_size: 16, start_sha256: ZERO_SHA256)
    reader_thread =
      Thread.new do
        Thread.current.name = "reader"
        start_sha256_bin = HexUtil.hex2bin(start_sha256)
        BlobEntry
          .includes(:base)
          .in_batches(
            of: batch_size,
            start: start_sha256_bin,
            order: :asc,
            use_ranges: true,
          ) do |batch|
            @channel.put(batch)
            # puts "#{Thread.current.name.rjust(10)} | put #{batch.size} batch"
          end
        @channel.put(nil)
        @channel.close
      end

    writer_thread_1 =
      Thread.new do
        Thread.current.name = "writer 1"
        writer_thread_loop
      end
    writer_thread_2 =
      Thread.new do
        Thread.current.name = "writer 2"
        writer_thread_loop
      end

    reader_thread.join
    writer_thread_1.join
    writer_thread_2.join
  end

  sig { void }
  def writer_thread_loop
    num_migrated = 0
    num_processed = 0
    start_time = Time.now

    while (batch = @channel.take)
      # puts "#{Thread.current.name.rjust(10)} | take #{batch.size} batch"
      batch_migrated = insert_blob_entries_batch(batch)
      num_migrated += batch_migrated
      num_processed += batch.size
      rate = batch_migrated.to_f / (Time.now - start_time)
      last = batch.last&.sha256
      last_hex = last ? HexUtil.bin2hex(last) : "nil"
      @log_sink.puts(
        [
          "[#{Thread.current.name.rjust(10)}]",
          "[migrated: #{n2d(num_migrated)}]",
          "[processed: #{n2d(num_processed)}]",
          "[rate: #{rate.round(1).to_s.rjust(5)}/second]",
          "[last: '#{last_hex}']",
        ].join(" "),
      )

      num_migrated += batch_migrated
      num_processed += batch.size
      start_time = Time.now
    end
  end

  private

  sig { params(batch: T.untyped).returns(Integer) }
  def insert_blob_entries_batch(batch)
    num_migrated = 0

    BlobFile.transaction do
      blob_file_attributes =
        batch.map do |blob_entry|
          blob_file = BlobFile.initialize_from_blob_entry(blob_entry)
          blob_file.write_to_disk!
          blob_file.attributes.except("created_at", "updated_at")
        end

      BlobFile.insert_all(blob_file_attributes, unique_by: %i[sha256])
      num_migrated += blob_file_attributes.size
    end
    num_migrated
  end

  sig { params(n: Integer).returns(String) }
  def n2d(n)
    ActiveSupport::NumberHelper.number_to_delimited(n).rjust(8)
  end
end

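The deleted migrator fanned work out over a bounded channel: one reader thread streamed batches in, two writer threads drained them, and a `nil` put marked end-of-stream. A minimal sketch of that producer/consumer shape, using the standard library's `SizedQueue` rather than `Concurrent::Channel`, with the app-specific batch handling stubbed out:

```ruby
# Sketch only: the same reader/writer pipeline as the removed class,
# with a bounded SizedQueue standing in for Concurrent::Channel.new(capacity: 8).
queue = SizedQueue.new(8)

reader = Thread.new do
  # Stand-in for BlobEntry.in_batches(...) { |batch| @channel.put(batch) }
  [%w[a b], %w[c d], %w[e]].each { |batch| queue.push(batch) }
  queue.push(nil) # end-of-stream marker, mirroring @channel.put(nil)
end

writers = 2.times.map do |i|
  Thread.new do
    while (batch = queue.pop)
      # Stand-in for insert_blob_entries_batch(batch)
      puts "writer #{i + 1} handled #{batch.size} entries"
    end
    queue.push(nil) # re-post the marker so the other writer also terminates
  end
end

([reader] + writers).each(&:join)
```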
app/lib/tasks.rb (new file, 4 lines)
@@ -0,0 +1,4 @@
# typed: strict

module Tasks
end

app/lib/tasks/blob_file_migration_task.rb (new file, 184 lines)
@@ -0,0 +1,184 @@
# typed: strict
module Tasks
  class BlobFileMigrationTask
    extend T::Sig

    ZERO_SHA256 = T.let("00" * 32, String)
    PROGRESS_KEY = T.let("blob-file-migration-task", String)

    sig { params(log_sink: T.any(IO, StringIO)).void }
    def initialize(log_sink: $stderr)
      @log_sink = log_sink
      @interrupted = T.let(false, T::Boolean)
      setup_signal_handlers
    end

    sig { params(batch_size: Integer, start_sha256: String).returns(Integer) }
    def run(batch_size: 1000, start_sha256: ZERO_SHA256)
      # Handle "last" to resume from saved progress
      actual_start_sha256 = resolve_start_sha256(start_sha256)

      @log_sink.puts "batch_size: #{batch_size}"
      if actual_start_sha256 != HexUtil.hex2bin(ZERO_SHA256)
        @log_sink.puts "starting from: #{HexUtil.bin2hex(actual_start_sha256)}"
      end
      @log_sink.puts "Press Ctrl+C to cleanly interrupt and save progress"
      num_migrated = migrate_impl(batch_size, actual_start_sha256)
      @log_sink.puts "migrated #{num_migrated} total blob entries"
      num_migrated
    end

    private

    sig { returns(T::Boolean) }
    def check_interrupted
      if @interrupted
        @log_sink.puts "Migration interrupted by user. Progress has been saved."
      end
      @interrupted
    end

    sig { void }
    def setup_signal_handlers
      # Handle Ctrl+C (SIGINT) for clean shutdown
      Signal.trap("INT") do
        @log_sink.puts "\nReceived interrupt signal. Finishing current batch and saving progress..."
        @interrupted = true
        Signal.trap("INT", "DEFAULT")
      end

      # Handle SIGTERM for clean shutdown (useful for systemd/docker)
      Signal.trap("TERM") do
        @log_sink.puts "\nReceived termination signal. Finishing current batch and saving progress..."
        @interrupted = true
        Signal.trap("TERM", "DEFAULT")
      end
    end

    sig { params(start_sha256: String).returns(String) }
    def resolve_start_sha256(start_sha256)
      if start_sha256 == "last"
        if progress_value = GlobalState.get(PROGRESS_KEY)
          @log_sink.puts "resuming from saved progress: #{progress_value}"
          HexUtil.hex2bin(progress_value)
        else
          @log_sink.puts "no saved progress found, starting from beginning"
          HexUtil.hex2bin(ZERO_SHA256)
        end
      else
        HexUtil.hex2bin(start_sha256)
      end
    end

    sig { params(batch_size: Integer, start_sha256: String).returns(Integer) }
    def migrate_impl(batch_size, start_sha256)
      num_migrated = 0
      num_processed = 0
      start_time = Time.now
      last_migrated_sha256 = T.let(nil, T.nilable(String))

      BlobEntry.in_batches(
        of: batch_size,
        start: start_sha256,
        order: :asc,
        use_ranges: true,
      ) do |batch|
        # Check for interruption before processing each batch
        break if check_interrupted

        batch_migrated = insert_blob_entries_batch(batch)
        num_migrated += batch_migrated
        num_processed += T.cast(batch.size, Integer)
        rate = batch_migrated.to_f / (Time.now - start_time)

        last_migrated_sha256 = batch.last&.sha256
        log_progress(num_migrated, num_processed, rate, last_migrated_sha256)

        # Save progress after each batch if we have migrated files
        save_progress(last_migrated_sha256) if last_migrated_sha256

        start_time = Time.now

        # Check for interruption after processing each batch
        break if check_interrupted
      end

      num_migrated
    end

    sig { params(batch: ActiveRecord::Relation).returns(Integer) }
    def insert_blob_entries_batch(batch)
      num_migrated = 0

      blob_entry_sha256s = batch.pluck(:sha256)
      blob_file_sha256s =
        BlobFile.where(sha256: blob_entry_sha256s).pluck(:sha256)
      missing_sha256s = blob_entry_sha256s - blob_file_sha256s

      BlobFile.transaction do
        BlobEntry
          .where(sha256: missing_sha256s)
          .each do |blob_entry|
            blob_file = BlobFile.initialize_from_blob_entry(blob_entry)

            begin
              blob_file.save!
              num_migrated += 1
            rescue => e
              if sha256 = blob_file.sha256
                sha256_hex = HexUtil.bin2hex(sha256)
                @log_sink.puts "error saving blob file #{sha256_hex}: #{e}"
              else
                @log_sink.puts "error saving blob file: #{e}"
              end
            end
          end
      rescue => e
        missing_sha256s_hex =
          missing_sha256s.map { |sha256| HexUtil.bin2hex(sha256) }
        @log_sink.puts "error migrating blob entry: #{missing_sha256s_hex}"
        raise e
      end

      num_migrated
    end

    sig { params(sha256: String).void }
    def save_progress(sha256)
      sha256_hex = HexUtil.bin2hex(sha256)
      GlobalState.set(PROGRESS_KEY, sha256_hex)
    rescue => e
      @log_sink.puts "error saving progress: #{e}"
    end

    sig do
      params(
        num_migrated: Integer,
        num_processed: Integer,
        rate: Float,
        last_sha256: T.nilable(String),
      ).void
    end
    def log_progress(num_migrated, num_processed, rate, last_sha256)
      last_hex =
        case last_sha256
        when String
          HexUtil.bin2hex(last_sha256)
        else
          "nil"
        end

      @log_sink.puts [
        "migrated: #{format_number(num_migrated)}",
        "processed: #{format_number(num_processed)}",
        "rate: #{rate.round(1).to_s.rjust(5)}/second",
        "last: #{last_hex}",
      ].join(" | ")
    end

    sig { params(number: Integer).returns(String) }
    def format_number(number)
      ActiveSupport::NumberHelper.number_to_delimited(number).rjust(8)
    end
  end
end

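The new task's Ctrl+C handling is cooperative: the trap only flips a flag and restores the default handler, and the batch loop checks the flag between batches, so the current batch always finishes and its progress is saved. A stripped-down sketch of that pattern (the loop body here is a placeholder for `insert_blob_entries_batch` plus `save_progress`):

```ruby
# Sketch only: cooperative interruption, as in Tasks::BlobFileMigrationTask.
interrupted = false

Signal.trap("INT") do
  interrupted = true            # just record the request...
  Signal.trap("INT", "DEFAULT") # ...and let a second Ctrl+C kill the process
end

10.times do |i|
  break if interrupted # checked between batches, never mid-batch
  sleep 0.5            # placeholder for "process one batch and save progress"
  puts "finished batch #{i}"
end
puts "interrupted, progress saved up to the last finished batch" if interrupted
```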
@@ -1,6 +1,7 @@
# typed: strict
ID_CONSTRAINT = %r{([^/]+)}
ID_CONSTRAINT = T.let(%r{([^/]+)}, Regexp)
Rails.application.routes.draw do
  mount ActionCable.server => "/cable"

  # Authentication routes

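The route file change is the same Sorbet pattern used for the task's constants: under `# typed: strict`, a constant needs a declared type, so the bare regexp literal is wrapped in `T.let`. A minimal sketch of the pattern:

```ruby
# typed: strict
# Sketch only: declaring constant types for Sorbet's strict mode.
ID_CONSTRAINT = T.let(%r{([^/]+)}, Regexp)
ZERO_SHA256 = T.let("00" * 32, String)
```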
justfile (2 changed lines)
@@ -20,7 +20,7 @@ test:
    bundle exec srb tc
    rm -rf tmp/blob_files_test/thumbnails
    RAILS_ENV=test bin/rspec ./spec/lib/has_bulk_enqueue_jobs_spec.rb
    RAILS_ENV=test bin/parallel_rspec --exclude-pattern "./spec/lib/has_bulk_enqueue_jobs_spec.rb"
    RAILS_ENV=test bin/parallel_rspec -n4 --exclude-pattern "spec/lib/has_bulk_enqueue_jobs_spec.rb"

tc *args:
    bundle exec srb tc {{args}}

@@ -1,118 +1,12 @@
require "find"

namespace :blob_file do
  desc "migrate blob entries in parallel"
  task migrate_blob_entries_parallel: %i[environment] do
    batch_size = ENV["batch_size"]&.to_i || 16
    start_at = ENV["start_at"] || "0" * 64

    migrator = Domain::BlobFile::MigrateBlobEntryToBlobFile.new
    migrator.run(batch_size: batch_size, start_sha256: start_at)
  end

  desc "migrate blob files to the new format"
  desc "migrate blob entries to blob files"
  task migrate_blob_entries: %i[environment] do
    batch_size = ENV["batch_size"]&.to_i || 1000
    profile = ENV["profile"] == "true" || false
    start_at = ENV["start_at"] || "0" * 64
    num_migrated = 0
    puts "batch_size: #{batch_size}"
    start_at = ENV["start_at"] || Tasks::BlobFileMigrationTask::ZERO_SHA256

    RubyProf.start if profile

    def migrate_impl(batch_size, start_at)
      def n2d(n)
        ActiveSupport::NumberHelper.number_to_delimited(n).rjust(8)
      end

      num_migrated = 0
      num_processed = 0
      start_time = Time.now
      BlobEntry.in_batches(
        of: batch_size,
        start: HexUtil.hex2bin(start_at),
        order: :asc,
        use_ranges: true,
      ) do |batch|
        batch_migrated = insert_blob_entries_batch(batch)
        num_migrated += batch_migrated
        num_processed += batch.size
        rate = batch_migrated.to_f / (Time.now - start_time)
        puts [
          "[migrated: #{n2d(num_migrated)}]",
          "[processed: #{n2d(num_processed)}]",
          "[rate: #{rate.round(1).to_s.rjust(5)}/second]",
          "[last: '#{HexUtil.bin2hex(batch.last.sha256)}']",
        ].join(" ")
        start_time = Time.now
      end
      num_migrated
    end

    def insert_blob_entries_batch(batch)
      num_migrated = 0

      blob_entry_sha256s = batch.pluck(:sha256)
      blob_file_sha256s =
        BlobFile.where(sha256: blob_entry_sha256s).pluck(:sha256)
      missing_sha256s = blob_entry_sha256s - blob_file_sha256s

      BlobFile.transaction do
        BlobEntry
          .where(sha256: missing_sha256s)
          .each do |blob_entry|
            blob_file = BlobFile.initialize_from_blob_entry(blob_entry)
            sha256_hex = HexUtil.bin2hex(blob_file.sha256)
            begin
              blob_file.save!
              num_migrated += 1
            rescue => e
              puts "error saving blob file #{sha256_hex}: #{e}"
            end
          end
      rescue => e
        puts "error migrating blob entry: #{missing_sha256s.map { |sha256| HexUtil.bin2hex(sha256) }}"
        raise e
      end
      num_migrated
    end

    def start_thread(batch_size, start_at)
      Thread.new { migrate_impl(batch_size, start_at) }
    end

    num_threads = 1
    # skip = ((0xFFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF_FFFF) / num_threads) + 1
    num_migrated =
      (0...num_threads)
        .map do |i|
          # partition the entire sha256 space into num_threads chunks
          # each chunk is 256 / num_threads in size
          # start_at = (skip * i).to_s(16).rjust(32, "0")
          # stop_at = ((skip * (i + 1)) - 1).to_s(16).rjust(32, "0")
          puts "migrate #{start_at}"
          start_thread(batch_size, start_at)
        end
        .map(&:value)
        .sum

    begin
      base = "profiler/blob_file_migrate"
      FileUtils.mkdir_p(base) unless File.exist?(base)
      result = RubyProf.stop
      File.open("#{base}/profile.txt", "w") do |f|
        RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
      end
      File.open("#{base}/profile.html", "w") do |f|
        RubyProf::CallStackPrinter.new(result).print(f, { min_percent: 1 })
      end
      File.open("#{base}/profile.rubyprof", "w") do |f|
        RubyProf::SpeedscopePrinter.new(result).print(f, { min_percent: 1 })
      end
      puts "wrote profile to #{base}"
    end if profile

    puts "migrated #{num_migrated} total blob entries"
    Tasks::BlobFileMigrationTask.new.run(batch_size:, start_sha256: start_at)
  end

  task verify_fs_files: :environment do

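After the refactor the rake task is a thin wrapper: it reads `batch_size` and `start_at` from ENV and delegates to `Tasks::BlobFileMigrationTask`. The class can also be driven directly, for example from a Rails console; a small sketch (the keyword names and the `"last"` resume value come from the class above, the specific numbers are only illustrative):

```ruby
# Sketch only: driving the migration without the rake wrapper.
task = Tasks::BlobFileMigrationTask.new(log_sink: $stdout)

# Fresh run over everything, 500 entries per batch.
task.run(batch_size: 500)

# Resume from the progress saved in GlobalState by a previous (interrupted) run.
task.run(batch_size: 500, start_sha256: "last")
```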
@@ -2,6 +2,7 @@
# frozen_string_literal: true

require "./app/lib/has_color_logger"
require "./lib/tasks"
require "./app/lib/domain"
require "./app/lib/domain/e621/tag_util"
require "./spec/helpers/debug_helpers"

@@ -1,38 +0,0 @@
# typed: false
require "rails_helper"

RSpec.describe Domain::BlobFile::MigrateBlobEntryToBlobFile do
  let(:log_sink) { StringIO.new }
  let(:migrator) { described_class.new(log_sink:) }

  describe "#run" do
    context "when migrating a single BlobEntry" do
      it "skips already migrated entries" do
        # First migration
        migrator.run

        # Second migration attempt
        expect { migrator.run }.not_to change(BlobFile, :count)
      end
    end

    context "when migrating multiple BlobEntries" do
      let!(:blob_entries) { 3.times.map { |i| create(:blob_entry) } }

      it "can migrate with a batch size of 1" do
        migrator.run(batch_size: 1)
        expect(BlobFile.count).to eq(3)
      end

      it "migrates all entries in batches" do
        expect { migrator.run }.to change(BlobFile, :count).by(3)
        blob_entries.each do |blob_entry|
          blob_file = BlobFile.find_by(sha256: blob_entry.sha256)
          expect(blob_file).to be_present
          expect(blob_file.content_bytes).to eq(blob_entry.contents)
          expect(File.exist?(blob_file.absolute_file_path)).to be true
        end
      end
    end
  end
end

spec/lib/tasks/blob_file_migration_task_spec.rb (new file, 421 lines)
@@ -0,0 +1,421 @@
# typed: false
require "rails_helper"

RSpec.describe Tasks::BlobFileMigrationTask do
  let(:log_sink) { StringIO.new }
  let(:migrator) { described_class.new(log_sink: log_sink) }

  describe "#run" do
    context "with no blob entries" do
      it "runs migration with default settings and logs correctly" do
        result = migrator.run

        expect(result).to eq(0)
        expect(log_sink.string).to include("batch_size: 1000")
        expect(log_sink.string).to include("migrated 0 total blob entries")
      end
    end

    context "with custom batch size" do
      it "uses the specified batch size in logs" do
        result = migrator.run(batch_size: 500)

        expect(result).to eq(0)
        expect(log_sink.string).to include("batch_size: 500")
      end
    end

    context "with custom start SHA256" do
      it "accepts custom start SHA256 parameter" do
        start_sha256 = "ff" * 32

        result = migrator.run(start_sha256: start_sha256)

        expect(result).to eq(0)
        expect(log_sink.string).to include("batch_size: 1000")
        expect(log_sink.string).to include("migrated 0 total blob entries")
      end
    end

    context "with start_sha256='last' and no saved progress" do
      it "starts from beginning when no progress is saved" do
        result = migrator.run(start_sha256: "last")

        expect(result).to eq(0)
        expect(log_sink.string).to include(
          "no saved progress found, starting from beginning",
        )
        expect(log_sink.string).to include("migrated 0 total blob entries")
      end
    end

    context "with start_sha256='last' and existing progress" do
      let!(:blob_entries) do
        5.times.map { |i| create(:blob_entry, content: "progress #{i}") }
      end

      before do
        # Simulate saved progress - use the SHA256 of the 3rd blob entry
        progress_sha256_hex = HexUtil.bin2hex(blob_entries[2].sha256)
        GlobalState.set("blob-file-migration-task", progress_sha256_hex)
      end

      after do
        # Clean up the GlobalState
        GlobalState.find_by(key: "blob-file-migration-task")&.destroy
      end

      it "resumes from saved progress" do
        result = migrator.run(start_sha256: "last")

        # Should skip the first 3 entries and migrate the remaining 2
        expect(result).to be >= 0
        expected_progress = HexUtil.bin2hex(blob_entries[2].sha256)
        expect(log_sink.string).to include(
          "resuming from saved progress: #{expected_progress}",
        )
      end
    end

    context "with existing blob entries that need migration" do
      let!(:blob_entries) do
        3.times.map { |i| create(:blob_entry, content: "content #{i}") }
      end

      after do
        # Clean up any saved progress
        GlobalState.find_by(key: "blob-file-migration-task")&.destroy
      end

      it "migrates all blob entries to blob files" do
        expect(BlobFile.count).to eq(0)

        result = migrator.run

        expect(result).to eq(3)
        expect(BlobFile.count).to eq(3)
        expect(log_sink.string).to include("migrated 3 total blob entries")
      end

      it "creates blob files with correct attributes" do
        migrator.run

        blob_entries.each do |blob_entry|
          blob_file = BlobFile.find_by(sha256: blob_entry.sha256)
          expect(blob_file).to be_present
          expect(blob_file.content_type).to eq(blob_entry.content_type)
          expect(blob_file.size_bytes).to eq(blob_entry.size)
          expect(blob_file.content_bytes).to eq(blob_entry.contents)
        end
      end

      it "saves progress to GlobalState after migration" do
        migrator.run

        # Check that progress was saved
        saved_progress = GlobalState.get("blob-file-migration-task")
        expect(saved_progress).to be_present
        expect(saved_progress.length).to eq(64) # Hex SHA256 length

        # Should be the SHA256 of the last migrated entry
        last_migrated_sha256_hex = HexUtil.bin2hex(BlobFile.last.sha256)
        expect(saved_progress).to eq(last_migrated_sha256_hex)
      end

      it "logs progress during migration" do
        migrator.run(batch_size: 2)

        log_output = log_sink.string
        expect(log_output).to include("migrated:")
        expect(log_output).to include("processed:")
        expect(log_output).to include("rate:")
      end
    end

    context "with existing blob entries that already have blob files" do
      let!(:blob_entry) { create(:blob_entry, content: "existing content") }
      let!(:existing_blob_file) do
        create(
          :blob_file,
          contents: blob_entry.contents,
          content_type: blob_entry.content_type,
        )
      end

      after { GlobalState.find_by(key: "blob-file-migration-task")&.destroy }

      it "does not create duplicate blob files" do
        expect(BlobFile.count).to eq(1)

        result = migrator.run

        expect(result).to eq(0) # No new migrations
        expect(BlobFile.count).to eq(1) # Same count
        expect(log_sink.string).to include("migrated 0 total blob entries")
      end

      it "saves the last migrated sha256, even if it has already been migrated" do
        migrator.run
        final_progress = GlobalState.get("blob-file-migration-task")
        expect(final_progress).to eq(HexUtil.bin2hex(blob_entry.sha256))
      end
    end

    context "with mixed scenarios" do
      let!(:blob_entry_needing_migration) do
        create(:blob_entry, content: "needs migration")
      end
      let!(:blob_entry_with_file) { create(:blob_entry, content: "has file") }
      let!(:existing_blob_file) do
        create(
          :blob_file,
          contents: blob_entry_with_file.contents,
          content_type: blob_entry_with_file.content_type,
        )
      end

      after { GlobalState.find_by(key: "blob-file-migration-task")&.destroy }

      it "only migrates entries that need migration" do
        expect(BlobFile.count).to eq(1)

        result = migrator.run

        expect(result).to eq(1) # Only one new migration
        expect(BlobFile.count).to eq(2) # Now has both

        # Verify the new blob file was created correctly
        new_blob_file =
          BlobFile.find_by(sha256: blob_entry_needing_migration.sha256)
        expect(new_blob_file).to be_present
        expect(new_blob_file.content_bytes).to eq(
          blob_entry_needing_migration.contents,
        )
      end

      it "saves progress for the actually migrated entry" do
        migrator.run

        saved_progress = GlobalState.get("blob-file-migration-task")
        expected_progress = HexUtil.bin2hex(blob_entry_needing_migration.sha256)
        expect(saved_progress).to eq(expected_progress)
      end
    end

    context "with different batch sizes" do
      let!(:blob_entries) do
        5.times.map { |i| create(:blob_entry, content: "batch content #{i}") }
      end

      after { GlobalState.find_by(key: "blob-file-migration-task")&.destroy }

      it "handles batch size of 1" do
        result = migrator.run(batch_size: 1)

        expect(result).to eq(5)
        expect(BlobFile.count).to eq(5)
      end

      it "handles batch size larger than total entries" do
        result = migrator.run(batch_size: 100)

        expect(result).to eq(5)
        expect(BlobFile.count).to eq(5)
      end

      it "handles custom batch size smaller than total entries" do
        result = migrator.run(batch_size: 2)

        expect(result).to eq(5)
        expect(BlobFile.count).to eq(5)

        # Verify all entries were migrated correctly
        blob_entries.each do |blob_entry|
          blob_file = BlobFile.find_by(sha256: blob_entry.sha256)
          expect(blob_file).to be_present
        end
      end

      it "saves progress multiple times with small batch size" do
        migrator.run(batch_size: 2)

        # Should see multiple progress saves in the log
        expect(log_sink.string.scan(/migrated:/).count).to be > 1
      end
    end

    context "with idempotent runs" do
      let!(:blob_entries) do
        3.times.map { |i| create(:blob_entry, content: "idempotent #{i}") }
      end

      after { GlobalState.find_by(key: "blob-file-migration-task")&.destroy }

      it "handles duplicate runs gracefully" do
        first_result = migrator.run
        expect(first_result).to eq(3)
        expect(BlobFile.count).to eq(3)

        # Reset log sink for second run
        log_sink.truncate(0)
        log_sink.rewind

        second_result = migrator.run
        expect(second_result).to eq(0) # No new migrations
        expect(BlobFile.count).to eq(3) # Same count

        expect(log_sink.string).to include("migrated 0 total blob entries")
      end
    end

    context "with large datasets" do
      let!(:blob_entries) do
        10.times.map { |i| create(:blob_entry, content: "large dataset #{i}") }
      end

      after { GlobalState.find_by(key: "blob-file-migration-task")&.destroy }

      it "successfully migrates large datasets in batches" do
        result = migrator.run(batch_size: 3)

        expect(result).to eq(10)
        expect(BlobFile.count).to eq(10)

        # Verify progress logging occurred multiple times
        log_output = log_sink.string
        expect(log_output.scan(/migrated:/).count).to be > 1
      end
    end
  end

  describe "#log_progress" do
    it "logs migration progress with numbers" do
      migrator.send(:log_progress, 100, 150, 25.5, "abc123")

      log_output = log_sink.string
      expect(log_output).to include("migrated:")
      expect(log_output).to include("processed:")
      expect(log_output).to include("rate:")
      expect(log_output).to include("last:")
    end

    it "handles nil last_sha256" do
      migrator.send(:log_progress, 100, 150, 25.5, nil)

      log_output = log_sink.string
      expect(log_output).to include("last: nil")
    end

    it "includes formatted numbers" do
      migrator.send(:log_progress, 1000, 2500, 10.5, nil)

      log_output = log_sink.string
      expect(log_output).to include("1,000")
      expect(log_output).to include("2,500")
      expect(log_output).to include("10.5")
    end
  end

  describe "#format_number" do
    it "formats numbers with delimiters" do
      result = migrator.send(:format_number, 1_234_567)

      expect(result).to include("1,234,567")
    end

    it "right-justifies numbers" do
      result = migrator.send(:format_number, 123)

      expect(result.length).to be >= 8
    end

    it "handles zero" do
      result = migrator.send(:format_number, 0)

      expect(result).to include("0")
      expect(result.length).to be >= 8
    end

    it "handles large numbers" do
      result = migrator.send(:format_number, 999_999_999)

      expect(result).to include("999,999,999")
    end
  end

  describe "constructor and basic functionality" do
    it "initializes with log_sink" do
      expect(migrator).to be_a(Tasks::BlobFileMigrationTask)
    end

    it "uses provided log_sink" do
      custom_sink = StringIO.new
      custom_migrator = described_class.new(log_sink: custom_sink)

      custom_migrator.run

      expect(custom_sink.string).to include("batch_size:")
    end

    it "defaults to $stderr when no log_sink provided" do
      expect { described_class.new }.not_to raise_error
    end
  end

  describe "ZERO_SHA256 constant" do
    it "defines the zero SHA256 constant" do
      expect(Tasks::BlobFileMigrationTask::ZERO_SHA256).to eq("00" * 32)
    end
  end

  describe "PROGRESS_KEY constant" do
    it "defines the progress key constant" do
      expect(Tasks::BlobFileMigrationTask::PROGRESS_KEY).to eq(
        "blob-file-migration-task",
      )
    end
  end

  describe "integration scenarios" do
    it "handles multiple sequential runs without errors" do
      first_result = migrator.run(batch_size: 100)
      expect(first_result).to eq(0)

      # Reset log sink for second run
      log_sink.truncate(0)
      log_sink.rewind

      second_result = migrator.run(batch_size: 200)
      expect(second_result).to eq(0)

      log_output = log_sink.string
      expect(log_output).to include("batch_size: 200")
    end

    it "properly formats logs with different parameters" do
      migrator.run(batch_size: 42, start_sha256: "aa" * 32)

      log_output = log_sink.string
      expect(log_output).to include("batch_size: 42")
      expect(log_output).to include("migrated 0 total blob entries")
    end
  end

  describe "parameter validation" do
    it "accepts valid batch_size parameter" do
      expect { migrator.run(batch_size: 1) }.not_to raise_error
      expect { migrator.run(batch_size: 1000) }.not_to raise_error
      expect { migrator.run(batch_size: 10_000) }.not_to raise_error
    end

    it "accepts valid start_sha256 parameter" do
      expect { migrator.run(start_sha256: "00" * 32) }.not_to raise_error
      expect { migrator.run(start_sha256: "ff" * 32) }.not_to raise_error
      expect { migrator.run(start_sha256: "ab" * 32) }.not_to raise_error
    end

    it "accepts 'last' as start_sha256 parameter" do
      expect { migrator.run(start_sha256: "last") }.not_to raise_error
    end
  end
end