untangle blob and header creation code for http entry import

2023-02-03 18:28:38 +00:00
parent b7f33dbe20
commit 238cd9c4ae
4 changed files with 128 additions and 64 deletions

View File

@@ -54,13 +54,7 @@ class LegacyImport::HttpLogEntryBulkImporter
hr
puts "Inserted #{@insert_stats} - " +
"#{rate.round(2)}/sec (last id: #{last_model_id})"
tp(@timings.entries.map do |entry|
{
key: entry[:key],
duration: "#{entry[:key_secs].round(1)} sec",
percent: "#{(100 * entry[:proportion]).round(1)}%".rjust(5),
}
end)
dump_timings
hr
end
end
@@ -82,7 +76,7 @@ class LegacyImport::HttpLogEntryBulkImporter
end
@timings.finish :bulk_load
stats_printer.kill
stats_printer.kill if stats_printer
duration = Time.now - start_at
bytes_stored = @insert_stats.bytes_stored
@@ -90,6 +84,8 @@ class LegacyImport::HttpLogEntryBulkImporter
ratio = bytes_stored.to_f / bytes_length
rate = @insert_stats.http_entries_inserted / duration
hr
dump_timings
hr
puts "Last id: #{last_model_id}"
puts "Cache size: #{@cache_size}"
@@ -117,7 +113,15 @@ class LegacyImport::HttpLogEntryBulkImporter
private
EMPTY_HEADER = ::HttpLogEntryHeader.find_or_create(headers: {})
def dump_timings
tp(@timings.entries.map do |entry|
{
key: entry[:key],
duration: "#{entry[:key_secs].round(1)} sec",
percent: "#{(100 * entry[:proportion]).round(1)}%".rjust(5),
}
end)
end
def import_legacy_models(legacy_models)
@timings.start :lookup_existing_http
@@ -134,19 +138,41 @@ class LegacyImport::HttpLogEntryBulkImporter
end
@timings.finish :reject_empty_legacy
legacy_model_id_to_response_sha256 = bulk_import_blob_entries(legacy_models)
legacy_model_id_to_header_sha256s, header_sha256_to_header_id = bulk_import_headers(legacy_models)
@timings.start :insert_new_https
http_models = legacy_models.map do |legacy_model|
request_headers_id = header_sha256_to_header_id[legacy_model_id_to_header_sha256s[legacy_model.id][:req_sha256]]
response_headers_id = header_sha256_to_header_id[legacy_model_id_to_header_sha256s[legacy_model.id][:res_sha256]]
response_sha256 = legacy_model_id_to_response_sha256[legacy_model.id]
request_headers_id || raise("no request header id")
response_headers_id || raise("no response header id")
response_sha256 || raise("no response sha256")
build_http_log_entry(legacy_model, request_headers_id, response_headers_id, response_sha256)
end
::HttpLogEntry.insert_all!(http_models.map(&:to_bulk_insert_hash)) if http_models.any?
@insert_stats.http_entries_inserted += http_models.size
@timings.finish :insert_new_https
end
def bulk_import_blob_entries(legacy_models)
# compute all blob entries for the legacy models, removing duplicates
@timings.start :lookup_existing_bes
legacy_models_and_sha256 = legacy_models.map do |m|
[m, Digest::SHA256.digest(m.response_body)]
end
existing_bes_by_sha256 = ::BlobEntry.where(sha256: legacy_models_and_sha256.map(&:second)).map do |be|
legacy_model_id_to_response_sha256 = legacy_models.map do |m|
[m.id, Digest::SHA256.digest(m.response_body)]
end.to_h
sha256_to_existing_blob_entry = ::BlobEntry.where(sha256: legacy_model_id_to_response_sha256.values).map do |be|
[be.sha256, be]
end.to_h
@timings.finish :lookup_existing_bes
@timings.start :build_new_bes
response_blobs = legacy_models_and_sha256.map do |pair|
legacy_model, sha256 = pair
blob_entries_to_insert = legacy_models.map do |legacy_model|
sha256 = legacy_model_id_to_response_sha256[legacy_model.id] || raise
next nil if sha256_to_existing_blob_entry[sha256]
content_type = legacy_model.content_type
cache_key = "#{legacy_model.host}|#{content_type}"
@@ -154,13 +180,13 @@ class LegacyImport::HttpLogEntryBulkImporter
# N% chance (even when we're at cache capacity) to not supply any candidates,
# to give new entries in the cache a chance to replace poor-performing ones
candidates = if cache.at_capacity? && rand(0..100) >= 5
candidates = if cache.at_capacity? # && rand(0..100) >= 5
cache.candidates
else
[]
end
blob = existing_bes_by_sha256[sha256] || ::BlobEntry.build_entry(
blob = ::BlobEntry.build_record(
content_type: content_type,
sha256: sha256,
contents: legacy_model.response_body,
@@ -169,59 +195,79 @@ class LegacyImport::HttpLogEntryBulkImporter
# reward the base if it was used; if not, insert this blob into the
# cache so it'll be a future candidate (skipped if the blob is already persisted)
# cache keys are hex-encoded for easier viewing / debugging
if blob.base_sha256
cache.reward(HexUtil.bin2hex(blob.base_sha256)[0..8])
else
cache.insert(HexUtil.bin2hex(blob.sha256)[0..8], blob, legacy_model.full_path)
end unless blob.persisted?
blob.valid? || raise("invalid blob entry")
blob
end.reject(&:nil?).uniq do |blob_entry|
blob_entry.sha256
end
@timings.finish :build_new_bes
legacy_models_with_response_sha256 = legacy_models.zip(response_blobs.map(&:sha256))
# drop blobs that are already persisted, and dedupe the rest
blobs_without_dups = {}
response_blobs.each do |be|
next if be.persisted?
found = blobs_without_dups[be.sha256]
if found.nil? || found.bytes_stored > be.bytes_stored
# use the smaller of the two if there's an unpersisted duplicate
blobs_without_dups[be.sha256] = be
end
end
# bulk-insert all the new blob entries
@timings.start :insert_new_bes
blobs_to_insert = blobs_without_dups.values
BlobEntry.insert_all!(blobs_to_insert.map(&:to_bulk_insert_hash)) if blobs_to_insert.any?
@insert_stats.blob_entries_inserted += blobs_to_insert.size
@insert_stats.bytes_length += blobs_to_insert.map(&:contents).map(&:size).sum
@insert_stats.bytes_stored += blobs_to_insert.map(&:bytes_stored).sum
BlobEntry.insert_all!(blob_entries_to_insert.map(&:to_bulk_insert_hash)) if blob_entries_to_insert.any?
@insert_stats.blob_entries_inserted += blob_entries_to_insert.size
@insert_stats.bytes_length += blob_entries_to_insert.map(&:contents).map(&:size).sum
@insert_stats.bytes_stored += blob_entries_to_insert.map(&:bytes_stored).sum
@timings.finish :insert_new_bes
@timings.start :insert_new_headers
http_models = ReduxApplicationRecord.transaction do
legacy_models_with_response_sha256.map do |pair|
legacy_model, response_sha256 = pair
request_headers = ::HttpLogEntryHeader.find_or_create(headers: legacy_model.req_headers)
response_headers = ::HttpLogEntryHeader.find_or_create(headers: legacy_model.res_headers)
# request_headers = EMPTY_HEADER
# response_headers = EMPTY_HEADER
build_http_model(legacy_model, request_headers, response_headers, response_sha256)
end
end
@timings.finish :insert_new_headers
@timings.start :insert_new_https
::HttpLogEntry.insert_all!(http_models.map(&:to_bulk_insert_hash)) if http_models.any?
@insert_stats.http_entries_inserted += http_models.size
@timings.finish :insert_new_https
legacy_model_id_to_response_sha256
end
def build_http_model(legacy_model, request_headers, response_headers, response_sha256)
log_entry = ::HttpLogEntry.new(
def bulk_import_headers(legacy_models)
@timings.start :build_new_headers
header_sha256_to_header_model = {}
legacy_model_id_to_header_sha256s =
legacy_models.map do |legacy_model|
request_headers = ::HttpLogEntryHeader.build_record(headers: legacy_model.req_headers)
response_headers = ::HttpLogEntryHeader.build_record(headers: legacy_model.res_headers)
req_sha256 = request_headers.sha256
res_sha256 = response_headers.sha256
header_sha256_to_header_model[req_sha256] = request_headers
header_sha256_to_header_model[res_sha256] = response_headers
[legacy_model.id, { req_sha256: req_sha256, res_sha256: res_sha256 }]
end.to_h
@timings.finish :build_new_headers
# exclude headers that already exist, then bulk-insert the new ones
@timings.start :insert_new_headers
header_sha256_to_header_id = ::HttpLogEntryHeader.where(sha256: header_sha256_to_header_model.keys).map do |model|
[model.sha256, model.id]
end.to_h
headers_to_insert = header_sha256_to_header_model.map do |sha256, header_model|
next nil if header_sha256_to_header_id[sha256]
header_model.valid? || raise
header_model
end.reject(&:nil?).uniq do |header_model|
header_model.sha256
end
::HttpLogEntryHeader.insert_all!(
headers_to_insert.map(&:to_bulk_insert_hash),
returning: [:id, :sha256],
).rows.each do |row|
id, sha256 = row
# Rails does not deserialize the returned sha256 - we have to do that ourselves.
# Postgres prefixes hex-encoded binary values with "\x"; strip that before decoding.
raise("invariant") unless sha256[0..1] == "\\x"
sha256 = ::HexUtil.hex2bin(sha256[2..])
header_sha256_to_header_id[sha256] = id
end if headers_to_insert.any?
@insert_stats.header_entries_inserted += headers_to_insert.size
@timings.finish :insert_new_headers
[legacy_model_id_to_header_sha256s, header_sha256_to_header_id]
end
def build_http_log_entry(legacy_model, request_headers_id, response_headers_id, response_sha256)
model = ::HttpLogEntry.new(
id: legacy_model.id,
uri_scheme: legacy_model.scheme,
uri_host: legacy_model.host,
@@ -231,13 +277,15 @@ class LegacyImport::HttpLogEntryBulkImporter
content_type: legacy_model.content_type,
status_code: legacy_model.status,
response_time_ms: legacy_model.response_time,
request_headers: request_headers,
response_headers: response_headers,
request_headers_id: request_headers_id,
response_headers_id: response_headers_id,
response_sha256: response_sha256,
requested_at: legacy_model.requested_at,
created_at: legacy_model.created_at,
updated_at: legacy_model.updated_at,
)
model.valid? || raise
model
end
def dump_cache_stats
@@ -312,21 +360,24 @@ class LegacyImport::HttpLogEntryBulkImporter
InsertStats = Struct.new(
:http_entries_inserted,
:blob_entries_inserted,
:header_entries_inserted,
:bytes_stored,
:bytes_length,
) do
def initialize
self.http_entries_inserted = 0
self.blob_entries_inserted = 0
self.header_entries_inserted = 0
self.bytes_stored = 0
self.bytes_length = 0
end
def to_s
ratio = self.bytes_stored.to_f / self.bytes_length
"+#{self.http_entries_inserted} reqs, +#{self.blob_entries_inserted} blobs - " +
"#{self.class.humansize(self.bytes_stored)}/#{self.class.humansize(self.bytes_length)} " +
"(#{ratio.round(2)})"
[
"+#{self.http_entries_inserted} requests, +#{self.blob_entries_inserted} blobs, +#{self.header_entries_inserted} headers",
"size ratio: #{ratio.round(2)} - #{self.class.humansize(self.bytes_stored)}/#{self.class.humansize(self.bytes_length)}",
].join("\n")
end
def self.humansize(size)

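Since added and removed lines are interleaved above, here is a rough sketch of
the import pipeline as it stands after this change. Method and variable names
come from the hunks above; timing calls, stats, and error handling are elided:

def import_legacy_models(legacy_models)
  # 1. content-address response bodies, bulk-insert any blobs not yet stored
  legacy_model_id_to_response_sha256 = bulk_import_blob_entries(legacy_models)
  # 2. content-address request/response headers, bulk-insert the new ones,
  #    and get back a sha256 -> id mapping for every header set
  legacy_model_id_to_header_sha256s, header_sha256_to_header_id =
    bulk_import_headers(legacy_models)
  # 3. build the log entries from the resolved ids and bulk-insert them
  http_models = legacy_models.map do |legacy_model|
    sha256s = legacy_model_id_to_header_sha256s[legacy_model.id]
    build_http_log_entry(
      legacy_model,
      header_sha256_to_header_id[sha256s[:req_sha256]],
      header_sha256_to_header_id[sha256s[:res_sha256]],
      legacy_model_id_to_response_sha256[legacy_model.id],
    )
  end
  ::HttpLogEntry.insert_all!(http_models.map(&:to_bulk_insert_hash)) if http_models.any?
end

The notable trick is in bulk_import_headers: insert_all! with
returning: [:id, :sha256] hands back raw Postgres row values, which is why the
"\x" prefix is stripped and the hex decoded by hand.
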
View File

@@ -7,10 +7,13 @@ class BlobEntry < ReduxApplicationRecord
class_name: "::BlobEntry"
validates_presence_of(
:sha256,
:content_type,
:contents,
:size
)
validates :sha256, length: { is: 32 }
validates :base_sha256, length: { is: 32 }, if: :base_sha256
def to_bulk_insert_hash
{
@@ -35,7 +38,7 @@ class BlobEntry < ReduxApplicationRecord
self.read_attribute(:contents).size
end
def self.build_entry(content_type:, sha256:, contents:, candidates: [])
def self.build_record(content_type:, sha256:, contents:, candidates: [])
record = BlobEntry.new(sha256: sha256, content_type: content_type, size: contents.size)
smallest_patch_size = nil

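build_record (formerly build_entry) returns an unsaved record and leaves
persistence to the caller, which is what lets the importer batch everything
through insert_all!. A minimal usage sketch; the body and content type here
are made up, and candidates appear to be optional existing blobs that a new
entry may delta-encode against:

body = "<html>...</html>"
blob = ::BlobEntry.build_record(
  content_type: "text/html",
  sha256: Digest::SHA256.digest(body),
  contents: body,
  candidates: [],
)
raise("invalid blob entry") unless blob.valid?
blob.save!  # or collect many and bulk-insert via to_bulk_insert_hash
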
View File

@@ -9,6 +9,8 @@ class HttpLogEntry < ReduxApplicationRecord
belongs_to :response_headers,
class_name: "::HttpLogEntryHeader"
validates :response_sha256, length: { is: 32 }
validates_inclusion_of(:verb, in: ::HttpLogEntry.verbs.keys)
validates_presence_of(
:uri_scheme,

View File

@@ -1,7 +1,8 @@
class HttpLogEntryHeader < ReduxApplicationRecord
validates_presence_of(:sha256)
validates :sha256, length: { is: 32 }
def self.find_or_create(headers:)
def self.build_record(headers:)
raise("must be a hash") unless headers.is_a?(Hash)
headers = headers.dup
@@ -10,9 +11,16 @@ class HttpLogEntryHeader < ReduxApplicationRecord
headers.delete("cf-ray")
headers = headers.sort.to_h
sha256 = Digest::SHA256.digest(headers.to_s)
HttpLogEntryHeader.new(
sha256: sha256,
headers: headers,
)
end
HttpLogEntryHeader.find_or_create_by!(sha256: sha256) do |c|
c.headers = headers
end
def to_bulk_insert_hash
{
sha256: sha256,
headers: headers,
}
end
end
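
Because build_record strips volatile headers such as cf-ray and sorts the rest
before hashing, logically identical header sets collapse to the same sha256,
which is the property the importer's dedup relies on. A small illustration
with made-up values:

a = HttpLogEntryHeader.build_record(
  headers: { "content-type" => "text/html", "cf-ray" => "aaaa-IAD" },
)
b = HttpLogEntryHeader.build_record(
  headers: { "cf-ray" => "bbbb-SJC", "content-type" => "text/html" },
)
a.sha256 == b.sha256  # => true; neither record is persisted yet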