add tests for blob entry migration script
@@ -43,48 +43,46 @@ class BlobEntrySampleExporter
   end
 
   def migrate_blob_entries(start_at, batch_size)
-    offset = start_at && HexUtil.hex2bin(start_at)
-    keep_going = true
-    total_imported = 0
-    measure(proc {
-      "imported #{total_imported} blob entry models"
+    offset = start_at && HexUtil.hex2bin(start_at) || "\0"
+    total_inserted = 0
+    connection = ReduxApplicationRecord.connection
+
+    measure(proc { |ignore, dur|
+      rate = (total_inserted / dur).round(2).to_s.bold
+      "inserted #{total_inserted.to_s.bold} blob entry models, #{rate}/sec"
     }) do
-      while keep_going
-        missing = []
-        measure(proc {
-          "migrated #{missing.size} blob entries, #{total_imported} total, " +
-            "last offset #{HexUtil.bin2hex(offset)}"
+      relation = BlobEntry.where("sha256 >= ?", connection.escape_bytea(offset))
+      relation.pluck_in_batches(:sha256, batch_size: batch_size) do |batch|
+        offset = batch.last
+        measure(proc { |num, dur|
+          rate = (num / dur).round(2).to_s.bold
+          "migrated #{num.to_s.bold} blobs (#{rate}/sec), #{total_inserted.to_s.bold} total, " +
+            "last offset #{HexUtil.bin2hex(offset).to_s.bold}"
         }) do
-          be_sha256s = if offset
-            BlobEntry.where("sha256 > E'\\\\x#{HexUtil.bin2hex(offset)}'")
-          else
-            BlobEntry
-          end.order(sha256: :asc).limit(batch_size).pluck(:sha256)
-          if be_sha256s.empty?
-            keep_going = false
-            break
-          end
-          offset = be_sha256s.last
-
-          bep_sha256s = BlobEntryP.where(sha256: be_sha256s).pluck(:sha256)
-          missing = be_sha256s - bep_sha256s
-          next if missing.empty?
-          total_imported += missing.size
-
-          missing_formatted = missing.map do |sha256|
-            "E'\\\\x#{HexUtil.bin2hex(sha256)}'"
-          end.join(", ")
-          ReduxApplicationRecord.connection.execute <<-SQL
-            INSERT INTO blob_entries_p
-            (
-              SELECT sha256, base_sha256, content_type, size, contents, created_at
-              FROM blob_entries
-              WHERE sha256 IN (#{missing_formatted})
-            ) RETURNING sha256
-          SQL
+          binds = batch.map do |sha256|
+            connection.escape_bytea(sha256)
+          end
+          bind_nums = batch.size.times.map { |idx| "$#{idx + 1}" }
+
+          query = <<-SQL
+            INSERT INTO blob_entries_p
+            (
+              SELECT sha256, base_sha256, content_type, size, contents, created_at
+              FROM blob_entries
+              WHERE sha256 IN (#{bind_nums.join(", ")})
+              AND sha256 NOT IN (select sha256 from blob_entries_p)
+            ) RETURNING sha256
+          SQL
+
+          result = connection.exec_query query, "SQL", binds
+          num_inserted = result.rows.length
+          total_inserted += num_inserted
+          num_inserted
         end
       end
     end
+
+    total_inserted
   end
 
   def write_blob_entry(file, blob_entry)
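Editor's note: the rewritten method replaces interpolated E'\\x…' hex literals with numbered placeholders and bind parameters passed to exec_query. Below is a minimal sketch of that placeholder/bind pairing in isolation; it reuses names from the diff (ReduxApplicationRecord, BlobEntry, blob_entries) but is illustrative only and not part of this commit.

# Pair "$1..$n" placeholders with an array of escaped bytea binds
# (assumes blob_entries is non-empty so the IN list is not blank).
connection = ReduxApplicationRecord.connection
sha256s    = BlobEntry.order(sha256: :asc).limit(3).pluck(:sha256)

placeholders = sha256s.size.times.map { |i| "$#{i + 1}" }.join(", ")
binds        = sha256s.map { |sha| connection.escape_bytea(sha) }

result = connection.exec_query(<<-SQL, "SQL", binds)
  SELECT sha256, size FROM blob_entries WHERE sha256 IN (#{placeholders})
SQL
result.rows.length # => up to 3

Binding values this way also sidesteps the quoting and escaping concerns of building the byte literals by string interpolation, which is the main point of the change above.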
@@ -17,7 +17,7 @@ module HasMeasureDuration
     else
       duration_str = "#{(duration * 1000).round(0).to_s.bold} ms"
     end
-    title = title.call(ret) if title.respond_to?(:call)
+    title = title.call(ret, duration) if title.respond_to?(:call)
     logger.info "#{title} - #{duration_str}"
     ret
   end
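Editor's note: with this change a callable title receives the measured duration as a second argument, which is what lets the exporter above report a rate. A hypothetical caller inside a class that includes HasMeasureDuration might look like this (the block and title proc are made up):

measure(proc { |rows, dur| "loaded #{rows.size} rows in #{dur.round(2)}s" }) do
  BlobEntry.limit(10).to_a
end

Because the titles are plain procs rather than lambdas, existing callers whose procs take only the return value (or nothing) continue to work unchanged.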
@@ -79,8 +79,9 @@ class BlobEntryP < ReduxApplicationRecord
     /application\/json/,
   ]
 
-  def self.build_record(content_type:, sha256:, contents:, candidates: [])
-    record = BlobEntryP.new(sha256: sha256, content_type: content_type, size: contents.size)
+  def self.build_record(content_type:, sha256: nil, contents:, candidates: [])
+    sha256 ||= Digest::SHA256.digest(contents)
+    record = self.new(sha256: sha256, content_type: content_type, size: contents.size)
 
     smallest_patch_size = nil
     smallest_patch = nil
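Editor's note: sha256 is now optional and derived from the contents, which the new spec below relies on. A small illustrative usage (the contents and "text" content type are example values, not from this commit):

contents = "hello blob"
record   = BlobEntryP.build_record(contents: contents, content_type: "text")
record.sha256 == Digest::SHA256.digest(contents) # => true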
spec/lib/blob_entry_sample_exporter_spec.rb (new file, 102 lines)
@@ -0,0 +1,102 @@
require "rails_helper"

describe BlobEntrySampleExporter do
  def raw_insert_be(contents)
    sha256 = Digest::SHA256.digest(contents)
    ReduxApplicationRecord.connection.exec_insert(
      [
        "INSERT INTO blob_entries",
        "(sha256, content_type, size, contents, created_at, updated_at)",
        "VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
      ].join(" "),
      "SQL",
      [
        ReduxApplicationRecord.connection.escape_bytea(sha256),
        "text",
        contents.size,
        ReduxApplicationRecord.connection.escape_bytea(contents),
      ]
    )
    model = BlobEntry.find(sha256)
  end

  def expect_same_model(bep, be, except: [])
    ([
      :sha256, :base_sha256,
      :contents, :size, :bytes_stored,
      :content_type, :created_at,
    ] - except).each do |attr|
      expect(bep.send(attr)).to eq(be.send(attr)), "'#{attr}' does not match"
    end
  end

  it "migrates entries" do
    be_model = raw_insert_be("this is a test #{SpecUtil.random_string}")
    expect(BlobEntryP.find_by(sha256: be_model.sha256)).to be_nil

    expect do
      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 1)
      expect(imported).to eq(1)
    end.to change(BlobEntryP, :count).by(1)

    bep_model = BlobEntryP.find_by(sha256: be_model.sha256)
    expect(bep_model).not_to be_nil
    expect_same_model(bep_model, be_model)
  end

  it "migrates entries in batches" do
    num_models = 16
    be_models = num_models.times.map do
      raw_insert_be("this is a test #{SpecUtil.random_string}")
    end
    expect do
      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 2)
      expect(imported).to eq(num_models)
    end.to change(BlobEntryP, :count).by(num_models)

    be_models.each do |be_model|
      bep_model = BlobEntryP.find(be_model.sha256)
      expect_same_model(bep_model, be_model)
    end
  end

  it "does not migrate existing entries" do
    contents = "this is a test #{SpecUtil.random_string}"
    be_model = raw_insert_be(contents)
    bep_model = BlobEntryP.build_record(contents: contents, content_type: "text")
    bep_model.save!
    expect_same_model(bep_model, be_model, except: [:created_at])

    expect do
      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 1)
      expect(imported).to eq(0)
    end.to change(BlobEntryP, :count).by(0)
  end

  it "handles a subset of bep already being migrated" do
    num_models = 128
    num_migrated = 32

    be_models = num_models.times.map do
      raw_insert_be("this is a test #{SpecUtil.random_string}")
    end

    already_migrated = be_models.sample(num_migrated)
    expect do
      already_migrated.each do |be_model|
        bep_model = BlobEntryP.ensure(be_model.sha256)
        expect_same_model(bep_model, be_model)
      end
    end.to change(BlobEntryP, :count).by(num_migrated)

    expect do
      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 16)
      expect(imported).to eq(num_models - num_migrated)
    end.to change(BlobEntryP, :count).by(num_models - num_migrated)

    be_models.each do |be_model|
      bep_model = BlobEntryP.find(be_model.sha256)
      expect_same_model(bep_model, be_model)
    end
  end
end
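Editor's note: assuming the project's standard RSpec setup, these examples run with `bundle exec rspec spec/lib/blob_entry_sample_exporter_spec.rb`. Outside the tests, a console invocation of the method under test might look like the sketch below (variable names are hypothetical; the signature and return value come from the diff above):

exporter = BlobEntrySampleExporter.new
inserted = exporter.migrate_blob_entries(nil, 1000)  # full run from the start, batches of 1000
# To resume, pass the last logged offset back in as a hex string:
# exporter.migrate_blob_entries(last_logged_offset_hex, 1000)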