remove blob entry
@@ -1,137 +0,0 @@
-class BlobEntrySampleExporter
-  include HasColorLogger
-  include HasMeasureDuration
-
-  def export_samples(limit, file)
-    @num_written = 0
-    @bytes_written = 0
-    measure(proc {
-      "wrote #{@num_written} blob entries, #{HexUtil.humansize(@bytes_written)} to #{file}"
-    }) do
-      File.open(file, "w") do |file|
-        ::BlobEntry.limit(limit).find_each(batch_size: 32) do |blob_entry|
-          write_blob_entry(file, blob_entry)
-          write_blob_entry(file, blob_entry.base) if blob_entry.base
-        end
-      end
-    end
-  end
-
-  def import_samples(file)
-    @num_read = 0
-    measure(proc {
-      "read #{@num_read} blob entries from #{file}"
-    }) do
-      File.open(file, "r") do |file|
-        while (line = file.gets) # gets returns nil at EOF; readline would raise EOFError here
-          be = read_blob_entry(
-            line.chomp.strip,
-            file.readline.chomp.strip
-          )
-
-          if be.base_sha256.present?
-            base = read_blob_entry(
-              file.readline.chomp.strip,
-              file.readline.chomp.strip
-            )
-            base.save! unless base.persisted?
-          end
-          be.save! unless be.persisted?
-        end
-      end
-    end
-  end
-
-  def migrate_blob_entries(start_at, batch_size)
-    offset = start_at && HexUtil.hex2bin(start_at) || "\0"
-    total_inserted = 0
-    connection = ReduxApplicationRecord.connection
-
-    measure(proc { |ignore, dur|
-      rate = (total_inserted / dur).round(2).to_s.bold
-      "inserted #{total_inserted.to_s.bold} blob entry models, #{rate}/sec"
-    }) do
-      relation = BlobEntry.where("sha256 >= ?", connection.escape_bytea(offset))
-      relation.pluck_in_batches(:sha256, batch_size: batch_size) do |batch|
-        offset = batch.last
-        measure(proc { |num, dur|
-          rate = (num / dur).round(2).to_s.bold
-          "migrated #{num.to_s.bold} blobs (#{rate}/sec), #{total_inserted.to_s.bold} total, " +
-            "last offset #{HexUtil.bin2hex(offset).to_s.bold}"
-        }) do
-          binds = batch.map do |sha256|
-            connection.escape_bytea(sha256)
-          end
-          bind_nums = batch.size.times.map { |idx| "$#{idx + 1}" }
-
-          query = <<-SQL
-            INSERT INTO blob_entries_p (
-              SELECT
-                be.sha256,
-                be.base_sha256,
-                be.content_type,
-                be.size,
-                be.contents,
-                be.created_at
-              FROM
-                blob_entries be
-              LEFT JOIN
-                blob_entries_p bep ON be.sha256 = bep.sha256
-              WHERE
-                bep.sha256 IS NULL
-              AND
-                be.sha256 IN (#{bind_nums.join(", ")})
-            ) RETURNING sha256
-          SQL
-
-          result = connection.exec_query query, "SQL", binds, prepare: true
-          num_inserted = result.rows.length
-          total_inserted += num_inserted
-          num_inserted
-        end
-      end
-    end
-
-    total_inserted
-  end
-
-  def write_blob_entry(file, blob_entry)
-    hash = blob_entry.to_bulk_insert_hash
-    file.puts({
-      sha256: HexUtil.bin2hex(hash[:sha256]),
-      base_sha256: hash[:base_sha256] ? HexUtil.bin2hex(hash[:base_sha256]) : nil,
-      content_type: hash[:content_type],
-      size: hash[:size],
-      created_at: blob_entry.created_at,
-    }.to_json)
-    file.puts(Base64.strict_encode64(blob_entry.read_attribute(:contents)))
-    logger.info(
-      "#{HexUtil.bin2hex(blob_entry.sha256)} - " +
-      "#{blob_entry.base_sha256.present?} - " +
-      "#{blob_entry.content_type} - #{HexUtil.humansize(blob_entry.size)} " +
-      "(#{HexUtil.humansize(blob_entry.bytes_stored)} actual)"
-    )
-    @bytes_written += blob_entry.bytes_stored
-    @num_written += 1
-  end
-
-  def read_blob_entry(line1, line2)
-    hash = JSON.parse(line1)
-    sha256 = HexUtil.hex2bin(hash["sha256"])
-    be = BlobEntry.find_by(sha256: sha256)
-    return be if be
-
-    contents = Base64.strict_decode64(line2)
-    be = BlobEntry.find_by(sha256: sha256) || BlobEntry.new({
-      sha256: sha256,
-      base_sha256: hash["base_sha256"] ? HexUtil.hex2bin(hash["base_sha256"]) : nil,
-      created_at: Time.parse(hash["created_at"]),
-      contents: contents,
-      size: hash["size"],
-      content_type: hash["content_type"],
-    })
-    logger.info("#{hash["sha256"]} - #{hash["content_type"]} - #{HexUtil.humansize(hash["size"])}")
-    @num_read += 1
-    be
-  end
-end
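Aside: the `LEFT JOIN ... WHERE bep.sha256 IS NULL` in the removed `migrate_blob_entries` is a standard anti-join, which is what makes the migration safe to re-run — only rows with no counterpart in `blob_entries_p` are copied, and `RETURNING sha256` counts the rows actually inserted. The same pattern in isolation, as a minimal sketch (table and column names are taken from the diff; the single-shot, unbatched query is illustrative):

    # Copy every blob_entries row that is missing from blob_entries_p.
    # Running this twice inserts nothing on the second pass.
    conn = ReduxApplicationRecord.connection
    copied = conn.exec_query(<<-SQL).rows.length
      INSERT INTO blob_entries_p (
        SELECT be.sha256, be.base_sha256, be.content_type, be.size, be.contents, be.created_at
        FROM blob_entries be
        LEFT JOIN blob_entries_p bep ON be.sha256 = bep.sha256
        WHERE bep.sha256 IS NULL
      ) RETURNING sha256
    SQL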
@@ -183,7 +183,7 @@ class LegacyImport::HttpLogEntryBulkImporter
       [m.id, Digest::SHA256.digest(m.response_body)]
     end.to_h
 
-    sha256_to_existing_blob_entry = ::BlobEntry.where(sha256: legacy_model_id_to_response_sha256.values).map do |be|
+    sha256_to_existing_blob_entry = ::BlobEntryP.where(sha256: legacy_model_id_to_response_sha256.values).map do |be|
       [be.sha256, be]
     end.to_h
     timings.finish :lookup_existing_bes
@@ -205,7 +205,7 @@ class LegacyImport::HttpLogEntryBulkImporter
         []
       end
 
-      blob_entry = ::BlobEntry.build_record(
+      blob_entry = ::BlobEntryP.build_record(
         content_type: content_type,
         sha256: sha256,
         contents: legacy_model.response_body,
@@ -242,7 +242,7 @@ class LegacyImport::HttpLogEntryBulkImporter
     slice_size = [(blob_entries_to_insert.size.to_f / @fork_amount).ceil, 1].max
     blob_entries_to_insert.each_slice(slice_size).map do |slice|
       ForkFuture.new do
-        BlobEntry.insert_all!(slice.map(&:to_bulk_insert_hash)) if slice.any?
+        BlobEntryP.insert_all!(slice.map(&:to_bulk_insert_hash)) if slice.any?
       end
     end.to_a.map(&:join) if blob_entries_to_insert.any?
     insert_stats.blob_entries_inserted += blob_entries_to_insert.size
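The `slice_size` arithmetic above splits the pending inserts as evenly as `@fork_amount` allows, and the `[..., 1].max` guard keeps the slice size positive when there are fewer entries than forks. A quick illustration with hypothetical numbers:

    # 100 pending inserts across 8 forks -> seven slices of 13 and one of 9
    entries = (1..100).to_a
    fork_amount = 8
    slice_size = [(entries.size.to_f / fork_amount).ceil, 1].max   # => 13
    entries.each_slice(slice_size).map(&:size)                     # => [13, 13, 13, 13, 13, 13, 13, 9]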
@@ -1,53 +0,0 @@
-class BlobEntry < ReduxApplicationRecord
-  include ImmutableModel
-  before_destroy { raise ActiveRecord::ReadOnlyRecord }
-  before_create { raise ActiveRecord::ReadOnlyRecord }
-
-  self.primary_key = :sha256
-  EMPTY_FILE_SHA256 = HexUtil.hex2bin("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
-
-  belongs_to :base,
-    optional: true,
-    foreign_key: :base_sha256,
-    class_name: "::BlobEntry"
-
-  validates_presence_of(
-    :sha256,
-    :content_type,
-    :size
-  )
-  validates :contents, length: { minimum: 0, allow_nil: false, message: "can't be nil" }
-  validates :sha256, length: { is: 32 }
-  validates :base_sha256, length: { is: 32 }, if: :base_sha256
-
-  after_create do
-    BlobEntryP.create!(to_bulk_insert_hash.merge(created_at: created_at))
-  end
-
-  def to_bulk_insert_hash
-    [
-      :sha256,
-      :content_type,
-      :contents,
-      :size,
-      :base_sha256,
-    ].map do |attr| # map (not each), so to_h receives [key, value] pairs
-      [attr, self.read_attribute(attr)]
-    end.to_h
-  end
-
-  def contents
-    @contents ||= begin
-      contents_raw = self.read_attribute(:contents)
-      if self.base
-        XDiff.patch(self.base.contents, contents_raw)
-      else
-        contents_raw
-      end
-    end
-  end
-
-  def bytes_stored
-    self.read_attribute(:contents).size
-  end
-end
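For context on the model being deleted: when `base_sha256` is set, the raw `contents` column holds only a binary delta, and the reader rebuilds the full payload by patching against the base entry — which is why `bytes_stored` (the raw column size) can be much smaller than `size` (the logical size). A round-trip sketch; `XDiff.patch` appears in the model above, while the matching `XDiff.diff` encoder is assumed here purely for illustration:

    base_contents = File.binread("v1.bin")
    new_contents  = File.binread("v2.bin")
    delta = XDiff.diff(base_contents, new_contents)      # assumed encoder API
    XDiff.patch(base_contents, delta) == new_contents    # => true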
@@ -31,19 +31,7 @@ class BlobEntryP < ReduxApplicationRecord
   validates :base_sha256, length: { is: 32 }, if: :base_sha256
 
   def self.ensure(sha256)
-    find_by(sha256: sha256) || begin
-      be = BlobEntry.find_by(sha256: sha256) || raise("not found: #{HexUtil.bin2hex(sha256)}")
-      create!([
-        :sha256,
-        :base_sha256,
-        :content_type,
-        :size,
-        :contents,
-        :created_at,
-      ].map do |attr|
-        [attr, be.read_attribute(attr)]
-      end.to_h)
-    end
+    find_by(sha256: sha256) || raise("blob #{HexUtil.bin2hex(sha256)} does not exist")
   end
 
   def contents
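With the backfill path removed (and the one-off migrator deleted in this same commit), `BlobEntryP.ensure` is reduced to a strict lookup: it no longer copies a missing row over from the legacy `blob_entries` table on demand, it just fails fast. Sketched from the new code above:

    bep = BlobEntryP.ensure(sha256)
    # => the migrated BlobEntryP row, or
    # => RuntimeError: blob <hex> does not exist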
@@ -102,7 +102,7 @@ class Domain::E621::Post < ReduxApplicationRecord
       })
     end
 
-    http_log_entry.response ||= ::BlobEntry.find_or_build_from_legacy(legacy_model.blob_entry)
+    http_log_entry.response ||= ::BlobEntryP.find_or_build_from_legacy(legacy_model.blob_entry)
     blob_entry = http_log_entry.response
 
     if blob_entry && http_log_entry
@@ -12,11 +12,6 @@ class HttpLogEntry < ReduxApplicationRecord
     serverhost-1
   ], _prefix: true
 
-  belongs_to :response_legacy,
-    foreign_key: :response_sha256,
-    class_name: "::BlobEntry",
-    optional: true
-
   belongs_to :response,
     foreign_key: :response_sha256,
     class_name: "::BlobEntryP",
@@ -80,11 +80,11 @@ class Legacy::BlobEntry < LegacyApplicationRecord
   end
 
   def inc_refcount
-    BlobEntry.increment_counter(:refcount, id)
+    ::Legacy::BlobEntry.increment_counter(:refcount, id)
   end
 
   def dec_refcount
-    BlobEntry.decrement_counter(:refcount, id)
+    ::Legacy::BlobEntry.decrement_counter(:refcount, id)
   end
 
   def self.create_from_blob(blob:, opts: {})
@@ -107,14 +107,14 @@ class Legacy::BlobEntry < LegacyApplicationRecord
     }
 
     be = nil
-    BlobEntry.transaction do
-      be = BlobEntry.find_by(sha256: sha256)
+    ::Legacy::BlobEntry.transaction do
+      be = ::Legacy::BlobEntry.find_by(sha256: sha256)
       if be && !be.ensure_file_path
         # correct directory depth as well
         Legacy::SConfig.logger.warn("file doesn't exist for #{be.id}, writing again...")
         write_out.call(be, blob)
       elsif !be
-        new_be = BlobEntry.new(opts.merge(sha256: sha256))
+        new_be = ::Legacy::BlobEntry.new(opts.merge(sha256: sha256))
         write_out.call(new_be, blob)
         new_be.save!
         be = new_be
@@ -75,7 +75,7 @@ class Legacy::E621::Post < LegacyApplicationRecord
   def resized_file_path(style)
     raise("no md5") unless md5
 
-    hashed_path = BlobEntry.file_path_at_depth(
+    hashed_path = Legacy::BlobEntry.file_path_at_depth(
       sha256: md5,
       depth: 4, stride: 2, hash_length: 32,
     )
@@ -298,7 +298,7 @@ class Legacy::HttpLogEntry < LegacyApplicationRecord
       self.resp_body = use_string
       SConfig.logger.info "Storing data internally"
     else
-      self.blob_entry = BlobEntry.create_from_blob(blob: use_string, opts: { dir_depth: 4 })
+      self.blob_entry = Legacy::BlobEntry.create_from_blob(blob: use_string, opts: { dir_depth: 4 })
       blob_entry.inc_refcount
       SConfig.logger.info "Storing data in blob entry #{blob_entry.id}..."
     end
@@ -13,7 +13,7 @@ class CreateIndexGoodJobsJobsOnPriorityCreatedAtWhenUnfinished < ActiveRecord::M
     end
 
     add_index :good_jobs, [:priority, :created_at], order: { priority: "DESC NULLS LAST", created_at: :asc },
-      where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished,
-      algorithm: :concurrently
+      where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished,
+      algorithm: :concurrently
   end
 end
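One note on this hunk: `algorithm: :concurrently` asks Postgres to build the index without blocking writes, and Postgres refuses to do that inside a transaction, so migrations of this shape normally also declare `disable_ddl_transaction!`. A minimal sketch of the usual structure (the `[7.0]` migration version and method layout are assumptions; the index options come from the diff):

    class CreateIndexGoodJobsJobsOnPriorityCreatedAtWhenUnfinished < ActiveRecord::Migration[7.0]
      disable_ddl_transaction!   # CREATE INDEX CONCURRENTLY cannot run in a transaction

      def change
        add_index :good_jobs, [:priority, :created_at],
          order: { priority: "DESC NULLS LAST", created_at: :asc },
          where: "finished_at IS NULL",
          name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished,
          algorithm: :concurrently
      end
    end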
@@ -1,102 +0,0 @@
-require "rails_helper"
-
-describe BlobEntrySampleExporter do
-  def raw_insert_be(contents)
-    sha256 = Digest::SHA256.digest(contents)
-    ReduxApplicationRecord.connection.exec_insert(
-      [
-        "INSERT INTO blob_entries",
-        "(sha256, content_type, size, contents, created_at, updated_at)",
-        "VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
-      ].join(" "),
-      "SQL",
-      [
-        ReduxApplicationRecord.connection.escape_bytea(sha256),
-        "text",
-        contents.size,
-        ReduxApplicationRecord.connection.escape_bytea(contents),
-      ]
-    )
-    BlobEntry.find(sha256)
-  end
-
-  def expect_same_model(bep, be, except: [])
-    ([
-      :sha256, :base_sha256,
-      :contents, :size, :bytes_stored,
-      :content_type, :created_at,
-    ] - except).each do |attr|
-      expect(bep.send(attr)).to eq(be.send(attr)), "'#{attr}' does not match"
-    end
-  end
-
-  it "migrates entries" do
-    be_model = raw_insert_be("this is a test #{SpecUtil.random_string}")
-    expect(BlobEntryP.find_by(sha256: be_model.sha256)).to be_nil
-
-    expect do
-      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 1)
-      expect(imported).to eq(1)
-    end.to change(BlobEntryP, :count).by(1)
-
-    bep_model = BlobEntryP.find_by(sha256: be_model.sha256)
-    expect(bep_model).not_to be_nil
-    expect_same_model(bep_model, be_model)
-  end
-
-  it "migrates entries in batches" do
-    num_models = 16
-    be_models = num_models.times.map do
-      raw_insert_be("this is a test #{SpecUtil.random_string}")
-    end
-    expect do
-      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 2)
-      expect(imported).to eq(num_models)
-    end.to change(BlobEntryP, :count).by(num_models)
-
-    be_models.each do |be_model|
-      bep_model = BlobEntryP.find(be_model.sha256)
-      expect_same_model(bep_model, be_model)
-    end
-  end
-
-  it "does not migrate existing entries" do
-    contents = "this is a test #{SpecUtil.random_string}"
-    be_model = raw_insert_be(contents)
-    bep_model = BlobEntryP.build_record(contents: contents, content_type: "text")
-    bep_model.save!
-    expect_same_model(bep_model, be_model, except: [:created_at])
-
-    expect do
-      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 1)
-      expect(imported).to eq(0)
-    end.to change(BlobEntryP, :count).by(0)
-  end
-
-  it "handles a subset of bep already being migrated" do
-    num_models = 128
-    num_migrated = 32
-
-    be_models = num_models.times.map do
-      raw_insert_be("this is a test #{SpecUtil.random_string}")
-    end
-
-    already_migrated = be_models.sample(num_migrated)
-    expect do
-      already_migrated.each do |be_model|
-        bep_model = BlobEntryP.ensure(be_model.sha256)
-        expect_same_model(bep_model, be_model)
-      end
-    end.to change(BlobEntryP, :count).by(num_migrated)
-
-    expect do
-      imported = BlobEntrySampleExporter.new.migrate_blob_entries(nil, 16)
-      expect(imported).to eq(num_models - num_migrated)
-    end.to change(BlobEntryP, :count).by(num_models - num_migrated)
-
-    be_models.each do |be_model|
-      bep_model = BlobEntryP.find(be_model.sha256)
-      expect_same_model(bep_model, be_model)
-    end
-  end
-end
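A closing observation on the deleted spec: `raw_insert_be` seeds rows through `exec_insert` rather than `BlobEntry.create!` because the (also deleted) model raises from its creation hook, so raw SQL is the only way to get a legacy row into the table under test:

    # BlobEntry.create!(...) would raise ActiveRecord::ReadOnlyRecord
    # (the model's before_create hook), so the spec inserts via raw SQL
    # and then loads the persisted row:
    be = raw_insert_be("some contents")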