remove sst code
This commit is contained in:
@@ -1,106 +0,0 @@
|
||||
# typed: false
|
||||
require "xdiff"
|
||||
require "digest"
|
||||
|
||||
class LogStoreSstEntry < ReduxApplicationRecord
|
||||
self.primary_key = :key
|
||||
|
||||
# columns:
|
||||
# key - bytea
|
||||
# base_key - bytea (optional)
|
||||
# value - bytea
|
||||
# contents - bytea
|
||||
|
||||
belongs_to :base,
|
||||
foreign_key: :base_key,
|
||||
primary_key: :key,
|
||||
class_name: "::LogStoreSstEntry",
|
||||
optional: true
|
||||
|
||||
def hex_key
|
||||
self.class.bin2hex(self.key)
|
||||
end
|
||||
|
||||
def patched_value
|
||||
if contents
|
||||
contents_digest = Digest::SHA256.digest(contents)
|
||||
if contents_digest != key
|
||||
raise RuntimeError(
|
||||
"digest mismatch: #{HexUtil.bin2hex(contents_digest)} != #{hex_key}",
|
||||
)
|
||||
end
|
||||
|
||||
return contents
|
||||
end
|
||||
|
||||
@patched_value ||=
|
||||
begin
|
||||
if base
|
||||
# format is:
|
||||
# 0..4 - version
|
||||
# 4..8 - flags
|
||||
# 8..12 - value length after patching
|
||||
# 12..44 - key of the parent
|
||||
# 44..rest - patch to apply to parent
|
||||
patch_value = self.value[44..]
|
||||
parent_value = base.patched_value
|
||||
patched_value = XDiff.patch(parent_value, patch_value)
|
||||
if patched_value.length != value_length
|
||||
raise RuntimeError.new(
|
||||
"length mismatch: #{patched_value.length} != #{value_length}",
|
||||
)
|
||||
end
|
||||
else
|
||||
# format is:
|
||||
# 0..4 - version
|
||||
# 4..8 - flags
|
||||
# 8..rest - complete value
|
||||
patched_value = self.value[8..]
|
||||
end
|
||||
|
||||
value_digest = Digest::SHA256.digest(patched_value)
|
||||
if value_digest != key
|
||||
raise RuntimeError(
|
||||
"digest mismatch: #{HexUtil.bin2hex(value_digest)} != #{hex_key}",
|
||||
)
|
||||
end
|
||||
|
||||
patched_value
|
||||
end
|
||||
end
|
||||
|
||||
def parent_entry
|
||||
base
|
||||
end
|
||||
|
||||
def has_parent?
|
||||
@has_parent ||= (self.value_flags & 0x01) != 0
|
||||
end
|
||||
|
||||
def value_version
|
||||
self.value[0...4].unpack("L>").first
|
||||
end
|
||||
|
||||
def value_flags
|
||||
self.value[4...8].unpack("L>").first
|
||||
end
|
||||
|
||||
def value_length
|
||||
self.value[8...12].unpack("L>").first
|
||||
end
|
||||
|
||||
def parent_key
|
||||
has_parent? ? self.value[12...44] : nil
|
||||
end
|
||||
|
||||
def self.find_by_hex_key(hex_key)
|
||||
self.find_by(key: self.hex2bin(hex_key))
|
||||
end
|
||||
|
||||
def self.hex2bin(str)
|
||||
[str].pack("H*")
|
||||
end
|
||||
def self.bin2hex(bin)
|
||||
bin.unpack("H*").first.upcase
|
||||
end
|
||||
end
|
||||
112
rake/sst.rake
112
rake/sst.rake
@@ -1,112 +0,0 @@
|
||||
namespace :sst do
|
||||
desc "Write `entry.parent_key` (hex encoded) into `entry.base_key` (binary encoded, can be joined on)"
|
||||
task :write_parent_key_into_base_key => [:environment] do |t, args|
|
||||
batch_size = 1000
|
||||
query = ::LogStoreSstEntry.where(base_key: nil)
|
||||
progress = 0
|
||||
|
||||
query.find_in_batches(batch_size: batch_size) do |sst_entries|
|
||||
progress += sst_entries.size
|
||||
ssts_to_upsert = sst_entries.reject do |sst_entry|
|
||||
sst_entry.parent_key.nil?
|
||||
end.map do |sst_entry|
|
||||
{
|
||||
key: sst_entry.key,
|
||||
base_key: sst_entry.parent_key,
|
||||
value: sst_entry.value,
|
||||
}
|
||||
end
|
||||
::LogStoreSstEntry.upsert_all(
|
||||
ssts_to_upsert,
|
||||
returning: false,
|
||||
update_only: [:base_key],
|
||||
) if ssts_to_upsert.any?
|
||||
puts "finish batch, last id #{sst_entries.last&.hex_key} - #{progress}"
|
||||
end
|
||||
end
|
||||
|
||||
desc "Write a FlatSstEntry for each LogStoreSstEntry"
|
||||
task :write_flat_entries => [:environment] do
|
||||
parallelism = ENV["parallelism"]&.to_i || 4
|
||||
batch_size = ENV["batch_size"]&.to_i || 1000
|
||||
start_at = ENV["start_at"]
|
||||
if start_at == "last"
|
||||
start_at = ::FlatSstEntry.last&.key
|
||||
else
|
||||
start_at = HexUtil.hex2bin(start_at) if start_at
|
||||
end
|
||||
|
||||
query = ::LogStoreSstEntry.includes(:base).order(key: :asc)
|
||||
total = ::LogStoreSstEntry.count - ::FlatSstEntry.count
|
||||
progress = 0
|
||||
|
||||
start_time = Time.now
|
||||
query.find_in_batches(batch_size: batch_size, start: start_at) do |sst_entries|
|
||||
to_upsert = sst_entries.reject do |sst_entry|
|
||||
sst_entry.key == "1:cache_contents"
|
||||
end.map do |sst_entry|
|
||||
{
|
||||
key: sst_entry.key,
|
||||
contents: sst_entry.patched_value,
|
||||
}
|
||||
end
|
||||
ForkFuture.parallel_each_slice(parallelism, to_upsert) do |slice|
|
||||
::FlatSstEntry.insert_all(slice) if slice.any?
|
||||
end
|
||||
progress += sst_entries.size
|
||||
rate = progress.to_f / (Time.now - start_time)
|
||||
puts "finish batch, last id #{sst_entries.last&.hex_key} - #{progress} / #{total} - #{(100.0 * progress / total).round(1)}% - #{rate.round(1)} / second"
|
||||
end
|
||||
end
|
||||
|
||||
desc "Insert a LogStoreSstEntry, one for each line from the rocksdb `sst_dump` tool"
|
||||
task :insert_from_pipe => [:environment] do
|
||||
start = Time.now
|
||||
first_start = start
|
||||
i = 0
|
||||
progress_batch_size = 5000
|
||||
|
||||
to_insert = []
|
||||
|
||||
while line = $stdin.gets
|
||||
next unless line.include?(" => ")
|
||||
|
||||
line = line.chomp
|
||||
key, value = line.split(" => ")
|
||||
key = key.scan(/'(\w+)'/)&.first&.first
|
||||
value = value.strip
|
||||
|
||||
next if key.nil? || key.empty?
|
||||
next if value.empty?
|
||||
|
||||
# puts "key: #{key}"
|
||||
# puts "value: #{value[0..20]}... (#{value.size})"
|
||||
|
||||
to_insert << {
|
||||
key: HexUtil.hex2bin(key),
|
||||
value: HexUtil.hex2bin(value),
|
||||
}
|
||||
i += 1
|
||||
|
||||
if to_insert.size >= progress_batch_size
|
||||
to_insert_len = to_insert.size
|
||||
LogStoreSstEntry.insert_all(to_insert)
|
||||
to_insert = []
|
||||
|
||||
now = Time.now
|
||||
duration = now - start
|
||||
start = now
|
||||
per_sec = to_insert_len / duration
|
||||
puts "Inserted #{i} entries, #{per_sec.round(1)}/sec - #{key}"
|
||||
end
|
||||
end
|
||||
|
||||
if to_insert.count > 0
|
||||
LogStoreSstEntry.insert_all(to_insert)
|
||||
end
|
||||
|
||||
duration = Time.now - first_start
|
||||
per_sec = i / duration
|
||||
puts "Inserted #{i} total entries, took #{duration.round(1)} seconds, #{per_sec.round(1)}/sec"
|
||||
end
|
||||
end
|
||||
1178
sorbet/rbi/dsl/log_store_sst_entry.rbi
generated
1178
sorbet/rbi/dsl/log_store_sst_entry.rbi
generated
File diff suppressed because it is too large
Load Diff
@@ -1,12 +0,0 @@
|
||||
# typed: false
|
||||
require "rails_helper"
|
||||
|
||||
RSpec.describe LogStoreSstEntry, type: :model do
|
||||
describe "version string parsing" do
|
||||
it "correctly parses version and flags" do
|
||||
entry = LogStoreSstEntry.new(value: "\x00\x00\x00\x01\x00\x00\x00\x02")
|
||||
expect(entry.value_version).to eq(1)
|
||||
expect(entry.value_flags).to eq(2)
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user