merge futureized cache operations

2023-02-03 19:03:10 +00:00
parent 238cd9c4ae
commit 56b3fd33d8


@@ -97,7 +97,6 @@ class LegacyImport::HttpLogEntryBulkImporter
puts "Total blobs inserted: #{@insert_stats.blob_entries_inserted}"
puts "Total duration: #{duration.round(0)} seconds (#{rate.round(2)}/second)"
hr
# dump_cache_stats
if PROFILE
Dir.mkdir("profiler") unless File.exist?("profiler")
@@ -138,8 +137,23 @@ class LegacyImport::HttpLogEntryBulkImporter
end
@timings.finish :reject_empty_legacy
legacy_model_id_to_response_sha256 = bulk_import_blob_entries(legacy_models)
legacy_model_id_to_header_sha256s, header_sha256_to_header_id = bulk_import_headers(legacy_models)
blob_creation_future = ForkFuture.new do
bulk_import_blob_entries(legacy_models)
end
header_creation_future = ForkFuture.new do
bulk_import_headers(legacy_models)
end
insert_stats, timings, cache_ops, legacy_model_id_to_response_sha256 = blob_creation_future.join
@insert_stats.merge!(insert_stats)
@timings.merge!(timings)
cache_ops.each do |op|
@blob_entry_cache[op[0]].send(op[1], *op[2..])
end
insert_stats, timings, legacy_model_id_to_header_sha256s, header_sha256_to_header_id = header_creation_future.join
@insert_stats.merge!(insert_stats)
@timings.merge!(timings)
@timings.start :insert_new_https
http_models = legacy_models.map do |legacy_model|
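ForkFuture itself does not appear in this diff. Assuming it is the usual fork-plus-pipe future (run the block in a child process, marshal the result back, block on join), a minimal sketch of that pattern could look like the following; only the class name comes from the diff, the rest is illustrative:

class ForkFuture
  def initialize(&block)
    @reader, writer = IO.pipe
    @pid = Process.fork do
      @reader.close
      # run the work in the child and send the marshalled result to the parent
      writer.write(Marshal.dump(block.call))
      writer.close
      exit!(0) # skip at_exit hooks (shared DB connections etc.) in the child
    end
    writer.close
  end

  def join
    # read the child's marshalled result, then reap the process
    result = Marshal.load(@reader.read)
    @reader.close
    Process.wait(@pid)
    result
  end
end

With that shape, blob_creation_future.join and header_creation_future.join above simply return whatever array the block produced in the child.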
@@ -157,8 +171,12 @@ class LegacyImport::HttpLogEntryBulkImporter
end
def bulk_import_blob_entries(legacy_models)
insert_stats = InsertStats.new
timings = Timings.new
cache_ops = []
# compute all blob entries for the legacy models, removing duplicates
@timings.start :lookup_existing_bes
timings.start :lookup_existing_bes
legacy_model_id_to_response_sha256 = legacy_models.map do |m|
[m.id, Digest::SHA256.digest(m.response_body)]
@@ -167,9 +185,9 @@ class LegacyImport::HttpLogEntryBulkImporter
sha256_to_existing_blob_entry = ::BlobEntry.where(sha256: legacy_model_id_to_response_sha256.values).map do |be|
[be.sha256, be]
end.to_h
@timings.finish :lookup_existing_bes
timings.finish :lookup_existing_bes
@timings.start :build_new_bes
timings.start :build_new_bes
blob_entries_to_insert = legacy_models.map do |legacy_model|
sha256 = legacy_model_id_to_response_sha256[legacy_model.id] || raise
next nil if sha256_to_existing_blob_entry[sha256]
@@ -196,32 +214,39 @@ class LegacyImport::HttpLogEntryBulkImporter
# reward the base if it was used, if not, insert this blob into the
# cache so it'll be a future candidate (unless it's not a new model)
# cache keys are hex encoded for easier viewing / debugging
if blob.base_sha256
cache.reward(HexUtil.bin2hex(blob.base_sha256)[0..8])
else
cache.insert(HexUtil.bin2hex(blob.sha256)[0..8], blob, legacy_model.full_path)
end unless blob.persisted?
if !blob.persisted? && @cache_size > 0
op = if blob.base_sha256
[cache_key, :reward, HexUtil.bin2hex(blob.base_sha256)[0..8]]
else
[cache_key, :insert, HexUtil.bin2hex(blob.sha256)[0..8], blob, legacy_model.full_path]
end
cache_ops << op
cache.send(op[1], *op[2..])
end
blob.valid? || raise("invalid blob entry")
blob
end.reject(&:nil?).uniq do |blob_entry|
blob_entry.sha256
end
@timings.finish :build_new_bes
timings.finish :build_new_bes
# bulk-insert all the new blob entries
@timings.start :insert_new_bes
timings.start :insert_new_bes
BlobEntry.insert_all!(blob_entries_to_insert.map(&:to_bulk_insert_hash)) if blob_entries_to_insert.any?
@insert_stats.blob_entries_inserted += blob_entries_to_insert.size
@insert_stats.bytes_length += blob_entries_to_insert.map(&:contents).map(&:size).sum
@insert_stats.bytes_stored += blob_entries_to_insert.map(&:bytes_stored).sum
@timings.finish :insert_new_bes
insert_stats.blob_entries_inserted += blob_entries_to_insert.size
insert_stats.bytes_length += blob_entries_to_insert.map(&:contents).map(&:size).sum
insert_stats.bytes_stored += blob_entries_to_insert.map(&:bytes_stored).sum
timings.finish :insert_new_bes
legacy_model_id_to_response_sha256
[insert_stats, timings, cache_ops, legacy_model_id_to_response_sha256]
end
def bulk_import_headers(legacy_models)
@timings.start :build_new_headers
insert_stats = InsertStats.new
timings = Timings.new
timings.start :build_new_headers
header_sha256_to_header_model = {}
legacy_model_id_to_header_sha256s =
legacy_models.map do |legacy_model|
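Because the blob import now runs in a forked child, any reward/insert calls it makes touch only the child's copy of the cache. The hunk above therefore records each call as a plain [cache_key, method, *args] tuple in cache_ops, applies it locally via send, and returns the list so the parent can replay the same calls after join. Illustrative example only (the hex prefix and arguments are made up):

# recorded in the child
op = [cache_key, :reward, "8f2a1c0d9"]
# replayed in the parent, as in the earlier hunk
@blob_entry_cache[op[0]].send(op[1], *op[2..])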
@@ -233,10 +258,10 @@ class LegacyImport::HttpLogEntryBulkImporter
header_sha256_to_header_model[res_sha256] = response_headers
[legacy_model.id, { req_sha256: req_sha256, res_sha256: res_sha256 }]
end.to_h
@timings.finish :build_new_headers
timings.finish :build_new_headers
# excluding existing headers, and bulk-insert the new headers
@timings.start :insert_new_headers
timings.start :insert_new_headers
header_sha256_to_header_id = ::HttpLogEntryHeader.where(sha256: header_sha256_to_header_model.keys).map do |model|
[model.sha256, model.id]
end.to_h
@@ -260,10 +285,10 @@ class LegacyImport::HttpLogEntryBulkImporter
sha256 = ::HexUtil.hex2bin(sha256[2..])
header_sha256_to_header_id[sha256] = id
end if headers_to_insert.any?
@insert_stats.header_entries_inserted += headers_to_insert.size
@timings.finish :insert_new_headers
insert_stats.header_entries_inserted += headers_to_insert.size
timings.finish :insert_new_headers
[legacy_model_id_to_header_sha256s, header_sha256_to_header_id]
[insert_stats, timings, legacy_model_id_to_header_sha256s, header_sha256_to_header_id]
end
def build_http_log_entry(legacy_model, request_headers_id, response_headers_id, response_sha256)
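Both helpers now build local InsertStats and Timings objects and return them instead of updating @insert_stats and @timings in place, which matches how fork behaves: the child works on a copy of the parent's memory, so in-place mutations never reach the parent. A tiny illustration of that behaviour:

stats = { inserted: 0 }
pid = fork { stats[:inserted] += 10 } # updates only the child's copy
Process.wait(pid)
stats[:inserted]                      # => 0 in the parent

Hence the merge! methods added to Timings and InsertStats further down, which fold the child-computed counters back into the parent's totals.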
@@ -288,18 +313,6 @@ class LegacyImport::HttpLogEntryBulkImporter
model
end
def dump_cache_stats
hr
puts "Cache state: "
@blob_entry_cache.each_pair do |key, cache|
if cache.candidates.count > 0
puts "for #{key}: "
puts cache.to_s
puts
end
end
end
def hr
puts "-" * 40
end
@@ -311,14 +324,20 @@ class LegacyImport::HttpLogEntryBulkImporter
def initialize
@start_at = Time.now
self.keys = []
self.totals = Hash.new do |h, key|
h[key] = { started: nil, secs: 0.0 }
self.totals = {}
end
def merge!(other)
raise if other.nil?
other.keys.each do |key|
self.keys << key unless self.keys.include?(key)
self.entry_for(key)[:secs] += other.totals[key][:secs]
end
end
def start(key)
self.keys << key unless self.keys.include?(key)
entry = self.totals[key]
entry = self.entry_for(key)
raise("#{key} already started") if entry[:started]
entry[:started] = Time.now
end
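A likely reason totals loses its Hash.new default block and gains the entry_for helper below: the Timings object now has to cross the fork boundary via Marshal, and Ruby refuses to dump a hash that carries a default proc. Quick illustration:

h = Hash.new { |hash, key| hash[key] = { started: nil, secs: 0.0 } }
Marshal.dump(h) # => TypeError: can't dump hash with default proc

A plain {} plus lazy initialization in entry_for keeps the same behaviour while staying serializable.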
@@ -355,6 +374,10 @@ class LegacyImport::HttpLogEntryBulkImporter
},
])
end
def entry_for(key)
self.totals[key] ||= { started: nil, secs: 0.0 }
end
end
InsertStats = Struct.new(
@@ -372,6 +395,14 @@ class LegacyImport::HttpLogEntryBulkImporter
self.bytes_length = 0
end
def merge!(other)
self.http_entries_inserted += other.http_entries_inserted
self.blob_entries_inserted += other.blob_entries_inserted
self.header_entries_inserted += other.header_entries_inserted
self.bytes_stored += other.bytes_stored
self.bytes_length += other.bytes_length
end
def to_s
ratio = self.bytes_stored.to_f / self.bytes_length
[