create flat sst entries

2023-02-04 04:40:48 +00:00
parent c5c77b885b
commit 187e840571
12 changed files with 191 additions and 77 deletions

Rakefile

@@ -10,39 +10,102 @@ task :log => :environment do
ActiveRecord::Base.logger = Logger.new(STDOUT)
end
task :ractor => [:environment] do
require "http_log_entry"
require "xdiff"
r = Ractor.new {
XDiff.diff("foo", "bar")
}
r.take
task :write_log_store_sst_entry_parent_keys => [:environment] do |t, args|
batch_size = 1000
query = ::LogStoreSstEntry.where(base_key: nil)
progress = 0
query.find_in_batches(batch_size: batch_size) do |sst_entries|
progress += sst_entries.size
ssts_to_upsert = sst_entries.reject do |sst_entry|
sst_entry.parent_key.nil?
end.map do |sst_entry|
{
key: sst_entry.key,
base_key: sst_entry.parent_key,
value: sst_entry.value,
}
end
::LogStoreSstEntry.upsert_all(
ssts_to_upsert,
returning: false,
update_only: [:base_key],
) if ssts_to_upsert.any?
puts "finish batch, last id #{sst_entries.last&.hex_key} - #{progress}"
end
end
# Find the last HttpLogEntry that was migrated
task :find_last_inserted => :environment do
last_found = nil
Legacy::HttpLogEntry.order(id: :asc).find_each do |lhle|
contents = lhle.response_body
next if contents.nil?
task :write_log_store_sst_contents, [:start_at, :batch_size] => [:environment] do |t, args|
batch_size = args[:batch_size]&.to_i || ENV["batch_size"]&.to_i || 100
start_at = args[:start_at] || ENV["start_at"]
start_at = HexUtil.hex2bin(start_at) if start_at
use_upsert = false
model = HttpLogEntry.find_by(id: lhle.id)
if model.nil?
puts "last found id: #{last_found&.id || "(none)"}"
break
start_time = Time.now
query = ::LogStoreSstEntry.where(contents: nil).includes(:base)
progress = 0
query.find_in_batches(batch_size: batch_size, start: start_at) do |sst_entries|
if use_upsert
ssts_to_update = sst_entries.map do |sst_entry|
{
key: sst_entry.key,
contents: sst_entry.patched_value,
base_key: sst_entry.base_key,
value: sst_entry.value,
}
end
::LogStoreSstEntry.upsert_all(ssts_to_update) if ssts_to_update.any?
else
::LogStoreSstEntry.transaction do
sst_entries.each do |sst_entry|
sst_entry.update_columns(contents: sst_entry.patched_value)
end
end
end
last_found = model
progress += sst_entries.size
rate = progress.to_f / (Time.now - start_time)
puts "finish batch, last id #{sst_entries.last&.hex_key} - #{progress} - #{rate.round(1)} / second"
end
end
task :write_flat_sst_entries => [:environment] do
batch_size = ENV["batch_size"]&.to_i || 100
start_at = ENV["start_at"]
if start_at == "last"
start_at = ::FlatSstEntry.last&.key
else
start_at = HexUtil.hex2bin(start_at) if start_at
end
query = ::LogStoreSstEntry.includes(:base)
total = ::LogStoreSstEntry.count - ::FlatSstEntry.count
progress = 0
start_time = Time.now
query.find_in_batches(batch_size: batch_size, start: start_at) do |sst_entries|
to_upsert = sst_entries.map do |sst_entry|
{
key: sst_entry.key,
contents: sst_entry.patched_value,
}
end
ForkFuture.each_slice(10, to_upsert) do |slice|
::FlatSstEntry.upsert_all(slice) if slice.any?
end
progress += sst_entries.size
rate = progress.to_f / (Time.now - start_time)
puts "finish batch, last id #{sst_entries.last&.hex_key} - #{progress} / #{total} - #{(100.0 * progress / total).round(1)}% - #{rate.round(1)} / second"
end
end
task :http_log_entry_bulk_importer, [:batch_size, :cache_size, :start_at, :finish_at] => [:environment] do |t, args|
# last inserted - 179899
# last inserted - 1424758
PROFILE = false
# model id to start inserting from
batch_size = args[:batch_size]&.to_i || 100
cache_size = args[:cache_size]&.to_i || 16
start_at = args[:start_at]&.to_i || 0
finish_at = args[:finish_at]&.to_i || 100
batch_size = args[:batch_size]&.to_i || ENV["batch_size"]&.to_i || 8192
cache_size = args[:cache_size]&.to_i || ENV["cache_size"]&.to_i || 6
start_at = args[:start_at]&.to_i || ENV["start_at"]&.to_i || 0
finish_at = args[:finish_at]&.to_i || ENV["finish_at"]&.to_i
LegacyImport::HttpLogEntryBulkImporter.
new(batch_size, cache_size, start_at, finish_at).
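
These backfill tasks all follow the same shape: scan in key order with find_in_batches, build an array of attribute hashes, bulk-write with upsert_all, and print a resumable key plus a throughput figure. A condensed sketch of the pattern, with hypothetical names (MyModel, :derived_column, and compute_value are placeholders, not identifiers from this repo):

  task :backfill_example => [:environment] do
    batch_size = ENV["batch_size"]&.to_i || 100
    start_at = ENV["start_at"] # paste the last logged key here to resume after a crash
    progress = 0
    start_time = Time.now
    MyModel.where(derived_column: nil).find_in_batches(batch_size: batch_size, start: start_at) do |batch|
      rows = batch.map { |m| { key: m.key, derived_column: m.compute_value } }
      # ON CONFLICT (key) DO UPDATE, restricted to the backfilled column (Rails 7+)
      MyModel.upsert_all(rows, update_only: [:derived_column]) if rows.any?
      progress += batch.size
      rate = progress.to_f / (Time.now - start_time)
      puts "finish batch, last id #{batch.last&.key} - #{progress} - #{rate.round(1)} / second"
    end
  end

Logging the last key of each batch is what makes these tasks restartable: find_in_batches accepts a start: primary-key value, so an interrupted run can resume without rescanning completed rows.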


@@ -21,17 +21,22 @@ class ForkFuture
end
def self.parallel_map(num_processes, enumerator, &block)
size = enumerator.size
slice_size = (size.to_f / num_processes).ceil
slice_size = [1, slice_size].max
raise if (slice_size * num_processes) < size
enumerator.each_slice(slice_size).map do |slice|
ForkFuture.each_slice(num_processes, enumerator).map do |slice|
ForkFuture.new do
slice.map(&block)
end
end.to_a.map(&:join).flatten(1)
end
def self.parallel_each(num_processes, enumerator, &block)
ForkFuture.each_slice(num_processes, enumerator).each do |slice|
ForkFuture.new do
slice.each(&block)
nil
end
end.to_a.map(&:join)
end
def join
wait!
r = @result[:result]
@@ -46,6 +51,14 @@ class ForkFuture
private
def self.each_slice(num_processes, enumerator, &block)
size = enumerator.size
slice_size = (size.to_f / num_processes).ceil
slice_size = [1, slice_size].max
raise "slices must cover the enumerator" if (slice_size * num_processes) < size
enumerator.each_slice(slice_size, &block) # forward the block so callers like the Rakefile can iterate directly
end
def wait!
@result ||= begin
result_buffer = @read.read
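
ForkFuture.parallel_map splits the input into at most num_processes contiguous slices, forks one child process per slice, and joins the children in slice order, so the result order matches the input order; parallel_each does the same but discards results. Note that the private keyword before each_slice has no effect on a def self. method in Ruby, which is why the Rakefile can call ForkFuture.each_slice directly. A hypothetical usage (urls and warm_cache are illustrative, not from this repo):

  squares = ForkFuture.parallel_map(4, (1..10).to_a) { |n| n * n }
  # => [1, 4, 9, ..., 100], computed in up to 4 forked children

  ForkFuture.parallel_each(4, urls) { |url| warm_cache(url) } # side effects only

One caveat with fork-based parallelism: results travel back to the parent through a pipe (the @read.read above), so block return values must be serializable, and resources like ActiveRecord connections should not be shared across the fork boundary.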


@@ -8,7 +8,7 @@ class LegacyImport::HttpLogEntryBulkImporter
@cache_size = cache_size
@start_id = start_id
@end_id = end_id
@fork_amount = 8
@fork_amount = 10
@insert_stats = InsertStats.new
@timings = Timings.new
@@ -30,39 +30,25 @@ class LegacyImport::HttpLogEntryBulkImporter
last_model_id = nil
stats_printer = Thread.new do
Thread.current.name = "stats-printer"
last_num_inserted = 0
last_time = Time.now
rates = []
i = 0
loop do
sleep 3
now = Time.now
duration = now - last_time
last_time = now
rate = (@insert_stats.http_entries_inserted - last_num_inserted) / duration
last_num_inserted = @insert_stats.http_entries_inserted
# rolling average over last 10 seconds
rates.unshift(rate)
while rates.size > 5
rates.pop
end
rate = rates.sum / rates.size
duration = Time.now - start_at
rate = @insert_stats.http_entries_inserted / duration
hr
puts "Inserted #{@insert_stats} - " +
puts "insert stats: #{@insert_stats} - " +
"#{rate.round(2)}/sec (last id: #{last_model_id})"
dump_timings
hr
i += 1
if i % 5 == 0
hr
dump_timings
end
end
end
query = Legacy::HttpLogEntry.
order(id: :asc).
includes(:parent_log_entry, :blob_entry)
includes({ :parent_log_entry => :blob_entry }, :blob_entry)
@timings.start :bulk_load
query.find_in_batches(
@@ -135,7 +121,13 @@ class LegacyImport::HttpLogEntryBulkImporter
@timings.start :reject_empty_legacy
legacy_models = ForkFuture.parallel_map(@fork_amount, legacy_models) do |legacy_model|
next nil if already_exist_ids.include?(legacy_model.id)
next nil if legacy_model.response_body.nil?
begin
next nil if legacy_model.response_body.nil?
rescue
puts "legacy model #{legacy_model.id} (#{legacy_model.full_path}): error reading response body"
next nil
end
# legacy model now has response body loaded
legacy_model
end
@@ -234,7 +226,7 @@ class LegacyImport::HttpLogEntryBulkImporter
end
end
blob_entry.valid? || raise("invalid blob entry")
blob_entry.valid? || raise("invalid blob entry (legacy model id #{legacy_model.id}): #{blob_entry.errors.full_messages}")
cache.send(cache_op[1], *cache_op[2..]) if cache_op
[blob_entry, cache_op]
end.reject(&:nil?).map do |pair|
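
The begin/rescue guard matters because response_body is itself a derived read: resolving a legacy body can raise (for example on a missing parent or a digest mismatch while reconstructing the stored value), and inside ForkFuture.parallel_map an unhandled exception would abort that slice's child process, or surface at join time, taking the whole batch with it. Returning nil instead logs the bad record and lets reject(&:nil?) drop it while the rest of the batch proceeds.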


@@ -9,9 +9,9 @@ class BlobEntry < ReduxApplicationRecord
validates_presence_of(
:sha256,
:content_type,
:contents,
:size
)
validates :contents, length: { minimum: 0, allow_nil: false, message: "can't be nil" }
validates :sha256, length: { is: 32 }
validates :base_sha256, length: { is: 32 }, if: :base_sha256
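
The switch away from validates_presence_of :contents appears deliberate rather than cosmetic: presence is defined via blank?, and an empty string is blank, so a legitimately empty blob would have failed validation. The length validation with minimum: 0 and allow_nil: false accepts "" while still rejecting nil.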


@@ -0,0 +1,9 @@
class FlatSstEntry < ReduxApplicationRecord
self.primary_key = :key
# key - bytea
# contents - bytea
def self.find_by_hex_key(hex_key)
find_by(key: HexUtil.hex2bin(hex_key))
end
end
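
A flat entry stores fully materialized contents keyed by what, judging from the digest checks in LogStoreSstEntry below, is the SHA-256 of those contents, so a read is a single indexed lookup with no patch chain to walk. A hypothetical read path with an integrity check (the hex key is illustrative):

  entry = FlatSstEntry.find_by_hex_key("AB12...") # key column is binary; hex is for humans
  raise ActiveRecord::RecordNotFound unless entry
  Digest::SHA256.digest(entry.contents) == entry.key # => true for an intact row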


@@ -362,6 +362,7 @@ class Legacy::HttpLogEntry < LegacyApplicationRecord
# new:
@response_body_native ||=
::FlatSstEntry.find_by_hex_key(resp_body)&.contents ||
::LogStoreSstEntry.find_by_hex_key(resp_body).patched_value
end
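
The lookup order makes FlatSstEntry a read-through fast path: if the flat row exists its contents are returned directly, otherwise the code falls back to LogStoreSstEntry and pays for patched_value's chain resolution. The fallback has no safe navigation, so a key missing from both tables fails loudly with a NoMethodError rather than returning nil; presumably that is the intended behavior while the flat table is still being populated.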


@@ -2,17 +2,36 @@ require "xdiff"
require "digest"
class LogStoreSstEntry < ReduxApplicationRecord
self.primary_key = :key
# columns:
# key - bytea
# base_key - bytea (optional)
# value - bytea
# contents - bytea
belongs_to :base,
foreign_key: :base_key,
primary_key: :key,
class_name: "::LogStoreSstEntry",
optional: true
def hex_key
self.class.bin2hex(self.key)
end
def patched_value
if contents
contents_digest = Digest::SHA256.digest(contents)
if contents_digest != key
raise RuntimeError("digest mismatch: #{HexUtil.bin2hex(contents_digest)} != #{hex_key}")
end
return contents
end
@patched_value ||= begin
if has_parent?
if base
# format is:
# 0..4 - version
# 4..8 - flags
@@ -20,8 +39,7 @@ class LogStoreSstEntry < ReduxApplicationRecord
# 12..44 - key of the parent
# 44..rest - patch to apply to parent
patch_value = self.value[44..]
parent_entry = self.parent_entry
parent_value = parent_entry.patched_value
parent_value = base.patched_value
patched_value = XDiff.patch(parent_value, patch_value)
if patched_value.length != value_length
raise RuntimeError.new("length mismatch: #{patched_value.length} != #{value_length}")
@@ -34,9 +52,9 @@ class LogStoreSstEntry < ReduxApplicationRecord
patched_value = self.value[8..]
end
value_digest = Digest::SHA256.hexdigest(patched_value).upcase
if value_digest != hex_key
raise RuntimeError("digest mismatch: #{value_digest} != #{hex_key}")
value_digest = Digest::SHA256.digest(patched_value)
if value_digest != key
raise RuntimeError("digest mismatch: #{HexUtil.bin2hex(value_digest)} != #{hex_key}")
end
patched_value
@@ -44,15 +62,7 @@ class LogStoreSstEntry < ReduxApplicationRecord
end
def parent_entry
@parent_entry ||= if has_parent?
entry = LogStoreSstEntry.find_by(key: parent_key)
if !entry
raise ActiveRecord::RecordNotFound.new("parent with #{self.class.bin2hex(parent_key)} not found")
end
entry
else
nil
end
base
end
def has_parent?


@@ -0,0 +1,7 @@
class CreateParentKeyOnSstEntries < ActiveRecord::Migration[7.0]
def change
change_table :log_store_sst_entries do |t|
t.binary :base_key
end
end
end


@@ -0,0 +1,7 @@
class CreateFullValueOnSstEntries < ActiveRecord::Migration[7.0]
def change
change_table :log_store_sst_entries do |t|
t.binary :contents
end
end
end


@@ -0,0 +1,9 @@
class CreateFlatSstEntries < ActiveRecord::Migration[7.0]
def change
create_table :flat_sst_entries, id: false, primary_key: :key do |t|
t.binary :key, null: false
t.binary :contents, null: false
t.index :key, unique: true
end
end
end
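
A note on this table definition: with id: false, Rails appears to ignore the primary_key: :key option, which is why the generated schema below shows no primary key at all. Uniqueness of key is enforced only by the unique index, and the model's self.primary_key = :key tells ActiveRecord which column to treat as the identifier.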

db/schema.rb (generated)

@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.0].define(version: 2023_02_03_203205) do
ActiveRecord::Schema[7.0].define(version: 2023_02_04_023258) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_stat_statements"
enable_extension "plpgsql"
@@ -26,6 +26,12 @@ ActiveRecord::Schema[7.0].define(version: 2023_02_03_203205) do
t.index ["sha256"], name: "index_blob_entries_on_sha256", unique: true
end
create_table "flat_sst_entries", id: false, force: :cascade do |t|
t.binary "key", null: false
t.binary "contents", null: false
t.index ["key"], name: "index_flat_sst_entries_on_key", unique: true
end
create_table "http_log_entries", force: :cascade do |t|
t.string "uri_scheme", null: false
t.string "uri_host", null: false
@@ -57,11 +63,8 @@ ActiveRecord::Schema[7.0].define(version: 2023_02_03_203205) do
create_table "log_store_sst_entries", id: false, force: :cascade do |t|
t.binary "key", null: false
t.binary "value", null: false
t.binary "base_key"
t.binary "contents"
t.index ["key"], name: "index_log_store_sst_entries_on_key", unique: true
end
add_foreign_key "blob_entries", "blob_entries", column: "base_sha256", primary_key: "sha256"
add_foreign_key "http_log_entries", "blob_entries", column: "response_sha256", primary_key: "sha256"
add_foreign_key "http_log_entries", "http_log_entry_headers", column: "request_headers_id"
add_foreign_key "http_log_entries", "http_log_entry_headers", column: "response_headers_id"
end