Files
redux-scraper/app/lib/tasks/blob_file_migration_task.rb
2025-07-08 16:29:46 +00:00

136 lines
3.9 KiB
Ruby

# typed: strict
module Tasks
class BlobFileMigrationTask < InterruptableTask
extend T::Sig
ZERO_SHA256 = T.let("00" * 32, String)
PROGRESS_KEY = T.let("blob-file-migration-task", String)
sig { override.returns(String) }
def progress_key
PROGRESS_KEY
end
sig { params(batch_size: Integer, start_sha256: String).returns(Integer) }
def run(batch_size: 1000, start_sha256: ZERO_SHA256)
# Handle "last" to resume from saved progress
actual_start_sha256 = get_progress(start_sha256) || ZERO_SHA256
log "batch_size: #{batch_size}"
if actual_start_sha256 != ZERO_SHA256
log "starting from: #{actual_start_sha256}"
end
num_migrated = migrate_impl(batch_size, actual_start_sha256)
log "migrated #{num_migrated} total blob entries"
num_migrated
end
private
sig { params(batch_size: Integer, start_sha256: String).returns(Integer) }
def migrate_impl(batch_size, start_sha256)
num_migrated = 0
num_processed = 0
start_time = Time.now
last_migrated_sha256 = T.let(nil, T.nilable(String))
BlobEntry.in_batches(
of: batch_size,
start: HexUtil.hex2bin(start_sha256),
order: :asc,
use_ranges: true,
) do |batch|
# Check for interruption before processing each batch
break if @interrupt_monitor.interrupted?
batch_migrated = insert_blob_entries_batch(batch)
num_migrated += batch_migrated
num_processed += T.cast(batch.size, Integer)
rate = batch_migrated.to_f / (Time.now - start_time)
last_migrated_sha256 = batch.last&.sha256
log_progress(num_migrated, num_processed, rate, last_migrated_sha256)
if last_migrated_sha256
save_progress(HexUtil.bin2hex(last_migrated_sha256))
end
start_time = Time.now
# Check for interruption after processing each batch
break if @interrupt_monitor.interrupted?
end
num_migrated
end
sig { params(batch: ActiveRecord::Relation).returns(Integer) }
def insert_blob_entries_batch(batch)
num_migrated = 0
blob_entry_sha256s = batch.pluck(:sha256)
blob_file_sha256s =
BlobFile.where(sha256: blob_entry_sha256s).pluck(:sha256)
missing_sha256s = blob_entry_sha256s - blob_file_sha256s
BlobFile.transaction do
BlobEntry
.where(sha256: missing_sha256s)
.each do |blob_entry|
blob_file = BlobFile.initialize_from_blob_entry(blob_entry)
begin
blob_file.save!
num_migrated += 1
rescue => e
if sha256 = blob_file.sha256
sha256_hex = HexUtil.bin2hex(sha256)
log "error saving blob file #{sha256_hex}: #{e}"
else
log "error saving blob file: #{e}"
end
end
end
rescue => e
missing_sha256s_hex =
missing_sha256s.map { |sha256| HexUtil.bin2hex(sha256) }
log "error migrating blob entry: #{missing_sha256s_hex}"
raise e
end
num_migrated
end
sig do
params(
num_migrated: Integer,
num_processed: Integer,
rate: Float,
last_sha256: T.nilable(String),
).void
end
def log_progress(num_migrated, num_processed, rate, last_sha256)
last_hex =
case last_sha256
when String
HexUtil.bin2hex(last_sha256)
else
"nil"
end
log [
"migrated: #{format_number(num_migrated)}",
"processed: #{format_number(num_processed)}",
"rate: #{rate.round(1).to_s.rjust(5)}/second",
"last: #{last_hex}",
].join(" | ")
end
sig { params(number: Integer).returns(String) }
def format_number(number)
ActiveSupport::NumberHelper.number_to_delimited(number).rjust(8)
end
end
end