blob file migrator speedup
This commit is contained in:
27
.devcontainer/fish-shell-conf-d/utils.fish
Executable file
27
.devcontainer/fish-shell-conf-d/utils.fish
Executable file
@@ -0,0 +1,27 @@
|
||||
# Print the blob-file root directory for the current $RAILS_ENV on stdout.
#
# Returns 0 for "production" and "development". For any other value
# (including an unset or empty RAILS_ENV) prints a message on stderr and
# returns 1.
function blob-files-dir
    # Quote the variable: unquoted, an unset RAILS_ENV expands to nothing and
    # `test` complains about missing arguments instead of reaching the
    # "unknown RAILS_ENV" branch cleanly.
    switch "$RAILS_ENV"
        case production
            echo "/mnt/blob_files_production/v1"
            return 0
        case development
            echo "/mnt/blob_files_development/v1"
            return 0
        case '*'
            echo "unknown RAILS_ENV: $RAILS_ENV" >&2
            return 1
    end
end
|
||||
|
||||
# Print the full path of a blob file given its file name.
#
# The path is <blob-files-dir>/<chars 1-2>/<chars 3-4>/<char 5>/<file name>.
# No trailing newline is printed.
#
# Returns 1 when no file name is given or when blob-files-dir fails.
function blob-file-path
    set -l file_name $argv[1]
    # Without a name the pieces below would all be empty and the function
    # would print "<prefix>////" while reporting success.
    if test -z "$file_name"
        echo "usage: blob-file-path FILE_NAME" >&2
        return 1
    end

    # `set` passes through the status of the command substitution, so a
    # failing blob-files-dir is detected here and propagated to the caller.
    set -l prefix (blob-files-dir); or return 1

    set -l p0 (string sub -l 2 -- $file_name)
    set -l p1 (string sub -s 3 -l 2 -- $file_name)
    set -l p2 (string sub -s 5 -l 1 -- $file_name)
    printf "%s/%s/%s/%s/%s" $prefix $p0 $p1 $p2 $file_name
end
|
||||
|
||||
# Print the apparent and on-disk size of the blob-file directory, as
# reported by `du -sh --apparent-size` and `du -sh` respectively.
function blob-files-stats
    set -l root (blob-files-dir || return 1)

    set -l apparent (du -sh --apparent-size $root)
    printf "apparent size: %s\n" $apparent

    set -l actual (du -sh $root)
    printf "actual size: %s\n" $actual
end
|
||||
@@ -108,7 +108,16 @@ class Scraper::HttpClient
|
||||
}
|
||||
)
|
||||
|
||||
# double write blob_file while migrating
|
||||
response_blob_file =
|
||||
BlobFile.find_or_initialize_from_blob_entry(response_blob_entry)
|
||||
|
||||
log_entry.save!
|
||||
begin
|
||||
response_blob_file.save unless response_blob_file.persisted?
|
||||
rescue => e
|
||||
puts "error saving blob file #{response_blob_file.sha256_hex}: #{e}"
|
||||
end
|
||||
rescue StandardError
|
||||
retries += 1
|
||||
retry if retries < 2
|
||||
|
||||
@@ -4,6 +4,8 @@ class BlobFile < ReduxApplicationRecord
|
||||
ROOT_DIR =
|
||||
Rails.application.config_for("blob_file_location") ||
|
||||
raise("no blob_file_location config")
|
||||
TMP_DIR = File.join(ROOT_DIR, "tmp-files")
|
||||
|
||||
FILE_PATH_PATTERNS = { v1: [2, 2, 1] }
|
||||
# consider sha256 `e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855`
|
||||
# breaking it into 3 parts, becomes:
|
||||
@@ -35,7 +37,11 @@ class BlobFile < ReduxApplicationRecord
|
||||
if not self.persisted?
|
||||
unless File.exist?(self.absolute_file_path)
|
||||
FileUtils.mkdir_p(File.dirname(self.absolute_file_path))
|
||||
File.binwrite(self.absolute_file_path, self.content_bytes)
|
||||
FileUtils.mkdir_p(TMP_DIR)
|
||||
|
||||
tmp_file_path = File.join(TMP_DIR, "blob-file-#{SecureRandom.uuid}")
|
||||
File.binwrite(tmp_file_path, self.content_bytes)
|
||||
FileUtils.mv(tmp_file_path, self.absolute_file_path)
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -45,6 +51,7 @@ class BlobFile < ReduxApplicationRecord
|
||||
BlobFile.find_or_initialize_by(sha256: blob_entry.sha256) do |blob_file|
|
||||
blob_file.content_type = blob_entry.content_type
|
||||
blob_file.content_bytes = blob_entry.contents
|
||||
blob_file.created_at = blob_entry.created_at
|
||||
end
|
||||
blob_file
|
||||
end
|
||||
|
||||
@@ -1,15 +1,38 @@
|
||||
require "find"
|
||||
|
||||
namespace :blob_file do
|
||||
desc "migrate blob files to the new format"
|
||||
task migrate_blob_entries: :environment do
|
||||
task migrate_blob_entries: %i[environment] do
|
||||
batch_size = ENV["batch_size"]&.to_i || 1000
|
||||
start_at = ENV["start_at"] || ("00" * 32)
|
||||
profile = ENV["profile"] == "true" || false
|
||||
num_migrated = 0
|
||||
puts "batch_size: #{batch_size}"
|
||||
|
||||
BlobEntryP
|
||||
.where("sha256 >= decode(?, 'hex')", start_at)
|
||||
.where("sha256 NOT IN (SELECT sha256 FROM blob_files)")
|
||||
.find_in_batches(batch_size: batch_size) do |batch|
|
||||
puts "loaded #{batch.size} blob entries starting at #{HexUtil.bin2hex(batch.first.sha256)}"
|
||||
RubyProf.start if profile
|
||||
|
||||
# Migrates the BlobEntryP rows that have no matching blob_files row, in
# batches, within one key range.
#
# batch_size - number of rows loaded per batch.
# start_at   - hex string; converted with HexUtil.hex2bin and passed as the
#              `start` bound of find_in_batches.
# stop_at    - hex string; converted with HexUtil.hex2bin and passed as the
#              `finish` bound of find_in_batches.
#
# Prints one progress line per batch and returns the sum of the values
# returned by insert_blob_entries_batch.
def migrate_impl(batch_size, start_at, stop_at)
  num_migrated = 0
  # Durations are measured on the monotonic clock so that wall-clock
  # adjustments cannot produce negative or wildly wrong rates.
  batch_started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  BlobEntryP
    .where("sha256 NOT IN (SELECT sha256 FROM blob_files)")
    .includes(:base)
    .find_in_batches(
      batch_size: batch_size,
      start: HexUtil.hex2bin(start_at),
      finish: HexUtil.hex2bin(stop_at)
    ) do |batch|
      batch_migrated = insert_blob_entries_batch(batch)
      num_migrated += batch_migrated

      # The interval runs from the end of the previous batch, so it includes
      # the time spent loading this batch as well as inserting it.
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - batch_started
      # Avoid Infinity/NaN in the log line if the clock did not advance.
      rate = elapsed.positive? ? batch_migrated.to_f / elapsed : 0.0
      puts "migrated #{batch_migrated} @ #{rate.round(1)}/second blob entries [last: #{HexUtil.bin2hex(batch.last.sha256)}]"

      batch_started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end

  num_migrated
end
|
||||
|
||||
def insert_blob_entries_batch(batch)
|
||||
num_migrated = 0
|
||||
BlobFile.transaction do
|
||||
batch.each do |blob_entry|
|
||||
blob_file = BlobFile.find_or_initialize_from_blob_entry(blob_entry)
|
||||
sha256_hex = HexUtil.bin2hex(blob_file.sha256)
|
||||
@@ -22,10 +45,61 @@ namespace :blob_file do
|
||||
puts "error saving blob file #{sha256_hex}: #{e}"
|
||||
end
|
||||
end
|
||||
|
||||
puts "migrated #{batch.size} blob entries [last: #{HexUtil.bin2hex(batch.last.sha256)}]"
|
||||
end
|
||||
num_migrated
|
||||
end
|
||||
|
||||
puts "migrated #{num_migrated} blob entries"
|
||||
# Starts a new Thread that runs migrate_impl with the given batch size and
# key range, and returns the Thread. The thread's value is whatever
# migrate_impl returns.
def start_thread(batch_size, start_at, stop_at)
  Thread.new(batch_size, start_at, stop_at) do |size, first_key, last_key|
    migrate_impl(size, first_key, last_key)
  end
end
|
||||
|
||||
# Number of worker threads (started via start_thread) the key space is
# split across.
num_ractors = 4
# A sha256 digest is 32 bytes, i.e. 64 hex digits, so the key space tops out
# at 2**256 - 1. Bounds are rendered at that full width so they line up with
# the digests they are compared against.
sha256_hex_digits = 64
max_sha256 = (1 << (sha256_hex_digits * 4)) - 1
skip = (max_sha256 / num_ractors) + 1
num_migrated =
  (0...num_ractors)
    .map do |i|
      # partition the entire sha256 space into num_ractors contiguous chunks,
      # each covering roughly 2**256 / num_ractors keys
      start_at = (skip * i).to_s(16).rjust(sha256_hex_digits, "0")
      # Clamp so the last chunk never exceeds the largest possible digest
      # when num_ractors does not divide the key space evenly.
      stop_at =
        [(skip * (i + 1)) - 1, max_sha256].min.to_s(16).rjust(sha256_hex_digits, "0")
      puts "migrate #{start_at} -> #{stop_at}"
      start_thread(batch_size, start_at, stop_at)
    end
    .map(&:value) # wait for every worker and collect its migrated count
    .sum
|
||||
|
||||
# When profiling was requested, stop ruby-prof and write its result in three
# formats (graph text, call-stack HTML, speedscope) under
# profiler/blob_file_migrate.
if profile
  base = "profiler/blob_file_migrate"
  FileUtils.mkdir_p(base) unless File.exist?(base)
  result = RubyProf.stop

  File.open("#{base}/profile.txt", "w") do |io|
    RubyProf::GraphPrinter.new(result).print(io, { min_percent: 1 })
  end

  File.open("#{base}/profile.html", "w") do |io|
    RubyProf::CallStackPrinter.new(result).print(io, { min_percent: 1 })
  end

  File.open("#{base}/profile.rubyprof", "w") do |io|
    RubyProf::SpeedscopePrinter.new(result).print(io, { min_percent: 1 })
  end

  puts "wrote profile to #{base}"
end
|
||||
|
||||
puts "migrated #{num_migrated} total blob entries"
|
||||
end
|
||||
|
||||
# Walks every non-directory entry under the configured blob_file_location,
# recomputes its SHA-256, and reports any file whose base name differs from
# its digest. Prints the number of files checked at the end.
task verify_fs_files: :environment do
  dir = Rails.application.config_for("blob_file_location")

  num_verified =
    Find.find(dir).count do |entry|
      # Directories are traversed but not themselves verified or counted.
      next false if File.directory?(entry)

      expected_sha256 = File.basename(entry)
      actual_sha256 = Digest::SHA256.file(entry).hexdigest
      unless expected_sha256 == actual_sha256
        puts "file #{entry} has mismatching sha256: #{expected_sha256} != #{actual_sha256}"
      end
      true
    end

  puts "(#{Rails.env}, #{dir}) verified #{num_verified} files"
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user