move http log entry migration task into lib class, bulk import blob entries / http entries

2023-02-03 16:35:51 +00:00
parent 49cd601862
commit b276357244
24 changed files with 615 additions and 472 deletions

3
.gitignore vendored
View File

@@ -34,4 +34,5 @@
# Ignore master key for decrypting credentials and more.
/config/master.key
/profiler/
/profiler/
/flamegraph.svg

Gemfile
View File

@@ -81,5 +81,6 @@ gem "diffy"
gem "rb-bsdiff", path: "../rb-bsdiff"
gem "ruby-prof"
gem "table_print"
# gem "concurrent-ruby-ext", require: "concurrent"
# gem 'cli-ui'

Gemfile.lock
View File

@@ -200,6 +200,7 @@ GEM
mini_portile2 (~> 2.8.0)
stimulus-rails (1.2.1)
railties (>= 6.0.0)
table_print (1.5.7)
thor (1.2.1)
timeout (0.3.1)
turbo-rails (1.3.3)
@@ -245,6 +246,7 @@ DEPENDENCIES
sprockets-rails
sqlite3 (~> 1.4)
stimulus-rails
table_print
turbo-rails
tzinfo-data
web-console
@@ -255,4 +257,4 @@ RUBY VERSION
ruby 3.2.0p0
BUNDLED WITH
2.2.32
2.4.6

356
Rakefile
View File

@@ -1,10 +1,15 @@
# Add your own tasks in files placed in lib/tasks ending in .rake,
# for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.
require "rake/testtask"
require_relative "config/application"
Rails.application.load_tasks
task :log => :environment do
ActiveRecord::Base.logger = Logger.new(STDOUT)
end
task :ractor => [:environment] do
require "http_log_entry"
require "xdiff"
@@ -30,338 +35,18 @@ task :find_last_inserted => :environment do
end
end
task :migrate_legacy_http_entries, [:start_at, :finish_at] => [:environment] do |t, args|
task :http_log_entry_bulk_importer, [:batch_size, :cache_size, :start_at, :finish_at] => [:environment] do |t, args|
# last inserted - 179899
BATCH_SIZE = 50
PROFILE = false
MAX_WORKERS = 4
# model id to start inserting from
START_AT = args[:start_at]&.to_i || 0
FINISH_AT = args[:finish_at]&.to_i || nil
batch_size = args[:batch_size]&.to_i || 100
cache_size = args[:cache_size]&.to_i || 16
start_at = args[:start_at]&.to_i || 0
finish_at = args[:finish_at]&.to_i || 100
puts "migrate_legacy_http_entries: " +
"#{START_AT || "(nil)"} -> #{FINISH_AT || "(nil)"}"
first_start = Time.now
pool = Concurrent::ThreadPoolExecutor.new(
min_threads: MAX_WORKERS - 1,
max_threads: MAX_WORKERS - 1,
max_queue: MAX_WORKERS - 1,
fallback_policy: :caller_runs,
)
RubyProf.start if PROFILE
blob_entry_cache = Concurrent::Map.new do |hash, key|
hash[key] = ConcurrentBlobCache.new
end
dump_cache_stats = -> {
puts "-" * 40
puts "Cache state: "
blob_entry_cache.each_pair do |key, cache|
cache.with_read_lock do |cache|
if cache.candidates.count > 0
puts "for #{key}: "
puts cache.to_s
puts
end
end
end
}
# insert_stats & last_model_id are guarded by insert_stats_guard
insert_stats_guard = Concurrent::ReadWriteLock.new
insert_stats = InsertStats.new
last_model_id = nil
stats_printer = Thread.new do
Thread.current.name = "stats-printer"
last_num_inserted = 0
last_time = Time.now
rates = []
loop do
sleep 2
now = Time.now
duration = now - last_time
last_time = now
insert_stats_guard.with_read_lock do
rate = (insert_stats.num_inserted - last_num_inserted) / duration
last_num_inserted = insert_stats.num_inserted
# rolling average over last 10 seconds
rates.unshift(rate)
while rates.size > 5
rates.pop
end
rate = rates.sum / rates.size
puts "#{Thread.current.name} - " +
"Inserted #{insert_stats} - " +
"#{rate.round(2)}/sec (last id: #{last_model_id})"
end
end
end
worker_exception = nil
Legacy::HttpLogEntry.order(id: :asc).find_in_batches(
batch_size: BATCH_SIZE,
start: START_AT,
finish: FINISH_AT,
) do |lhles|
pool.post do
# puts "#{Thread.current.name} - post batch of #{lhles.size}"
sub_insert_stats = import_legacy_http_log_entries(lhles, blob_entry_cache)
# puts "#{Thread.current.name} - finish batch: #{sub_insert_stats}"
insert_stats_guard.with_write_lock do
insert_stats.add!(sub_insert_stats)
last_model_id = lhles.last.id
end
rescue
puts "#{Thread.current.name} - exception processing batch: #{$!}"
worker_exception = $!
end
raise worker_exception if worker_exception
end
puts "Waiting for worker threads to finish..."
pool.shutdown
pool.wait_for_termination
stats_printer.kill
stats_printer.join
raise worker_exception if worker_exception
insert_stats_guard.with_read_lock do
bytes_stored = insert_stats.bytes_stored
bytes_length = insert_stats.bytes_length
ratio = bytes_stored.to_f / bytes_length
duration = Time.now - first_start
rate = insert_stats.num_inserted / duration
puts "-" * 40
puts "Total content stored: #{humansize(bytes_stored)}"
puts "Total content length: #{humansize(bytes_length)}"
puts "Size ratio: #{ratio.round(2)}"
puts "Total inserted: #{insert_stats.num_inserted}"
puts "Total duration: #{duration.round(0)} seconds (#{rate.round(2)}/second)"
end
if PROFILE
Dir.mkdir("profiler") unless File.exist?("profiler")
result = RubyProf.stop
File.open("profiler/migrate_legacy_http_entries.txt", "w") do |f|
RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
end
File.open("profiler/migrate_legacy_http_entries.html", "w") do |f|
RubyProf::CallStackPrinter.new(result).print(f, { min_percent: 1 })
end
end
end
class ConcurrentBlobCache
def initialize
@guard = Concurrent::ReadWriteLock.new
@cache = BlobCache.new
end
def with_read_lock(&block)
@guard.with_read_lock do
block.call(@cache)
end
end
def with_write_lock(&block)
@guard.with_write_lock do
block.call(@cache)
end
end
end
def humansize(size)
units = %w[B KiB MiB GiB TiB Pib EiB ZiB]
return "0.0 B" if size == 0
exp = (Math.log(size) / Math.log(1024)).to_i
exp += 1 if (size.to_f / 1024 ** exp >= 1024 - 0.05)
exp = units.size - 1 if exp > units.size - 1
"%.1f %s" % [size.to_f / 1024 ** exp, units[exp]]
end
InsertStats = Struct.new(
:num_inserted,
:bytes_stored,
:bytes_length,
) do
def initialize
self.num_inserted = 0
self.bytes_stored = 0
self.bytes_length = 0
end
def add!(other)
self.num_inserted += other.num_inserted
self.bytes_stored += other.bytes_stored
self.bytes_length += other.bytes_length
end
def to_s
ratio = self.bytes_stored.to_f / self.bytes_length
"+#{self.num_inserted}, " +
"#{humansize(self.bytes_stored)}/#{humansize(self.bytes_length)} " +
"(#{ratio.round(2)})"
end
end
def import_legacy_http_log_entries(lhles, blob_entry_cache)
insert_stats = InsertStats.new
lhles.each do |lhle|
retries = 0
sub_insert_stats = nil
begin
sub_insert_stats = import_legacy_http_log_entry(lhle, blob_entry_cache)
rescue
if retries < 2
retries += 1
# add some random delay to avoid conflicting with other threads
sleep rand(0.0..0.1)
retry
else
raise
end
end
insert_stats.add!(sub_insert_stats)
end
insert_stats
end
def import_legacy_http_log_entry(lhle, blob_entry_cache)
content_type = lhle.content_type
cache_key = "#{content_type}|#{lhle.host}"
blob_cache = blob_entry_cache[cache_key]
contents = lhle.response_body
if contents.nil?
return InsertStats.new
end
if ::HttpLogEntry.where(id: lhle.id).select(:id).first
return InsertStats.new
end
# 2% chance we don't supply any candidates to new blob creation, so the
# candidate pool doesn't get stale.
candidates = blob_cache.with_read_lock do |cache|
if cache.at_capacity? && rand(0..100) >= 5
cache.candidates
else
[]
end
end
blob_entry = ::BlobEntry.build_entry(
content_type: content_type,
contents: contents,
candidates: candidates,
)
blob_entry.save!
blob_cache.with_write_lock do |cache|
if blob_entry.base
cache.reward(blob_entry.base)
else
cache.insert(blob_entry, extra: lhle.full_path)
end
end
request_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.req_headers)
response_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.res_headers)
log_entry = ::HttpLogEntry.new(
id: lhle.id,
scheme: lhle.scheme,
host: lhle.host,
path: lhle.path,
query: lhle.query,
verb: lhle.verb,
content_type: content_type,
status_code: lhle.status,
response_time_ms: lhle.response_time,
request_headers: request_headers,
response_headers: response_headers,
blob_entry: blob_entry,
requested_at: lhle.requested_at,
created_at: lhle.created_at,
updated_at: lhle.updated_at,
)
log_entry.save!
insert_stats = InsertStats.new()
insert_stats.num_inserted = 1
insert_stats.bytes_length = blob_entry.contents.size
insert_stats.bytes_stored = blob_entry.contents_stored_size
insert_stats
end
class BlobCache
COUNT = 0
OBJ = 1
EXTRA = 2
def initialize(max_size = 32)
@max_size = max_size
@candidates = []
end
def at_capacity?
@candidates.count == @max_size
end
def candidates
@candidates.map { |c| c[OBJ] }.to_a
end
def reward(candidate)
@candidates.each do |entry|
if entry[OBJ].id == candidate.id
entry[COUNT] += 1.0
else
entry[COUNT] -= 0.1
end
end
sort!
end
def insert(candidate, extra: nil)
new_entry = [0.0, candidate, extra]
idx = @candidates.bsearch_index { |entry| entry[COUNT] <= 0 }
if idx == nil
@candidates.push(new_entry)
else
@candidates.insert(idx, new_entry)
end
while @candidates.size > @max_size
@candidates.pop
end
end
def to_s
@candidates.map do |entry|
" - #{entry[COUNT].round(1)} score, id #{entry[OBJ].id} (#{entry[EXTRA]})"
end.join("\n")
end
private
def sort!
@candidates.sort_by! { |entry| -entry[COUNT] }
end
LegacyImport::HttpLogEntryBulkImporter.
new(batch_size, cache_size, start_at, finish_at).
run
end
task :insert_sst_entry => [:environment] do
@@ -387,8 +72,8 @@ task :insert_sst_entry => [:environment] do
# puts "value: #{value[0..20]}... (#{value.size})"
to_insert << {
key: LogStoreSstEntry.hex2bin(key),
value: LogStoreSstEntry.hex2bin(value),
key: HexUtil.hex2bin(key),
value: HexUtil.hex2bin(value),
}
i += 1
@@ -413,3 +98,14 @@ task :insert_sst_entry => [:environment] do
per_sec = i / duration
puts "Inserted #{i} total entries, took #{duration.round(1)} seconds, #{per_sec.round(1)}/sec"
end
namespace :test do
desc "Test lib source"
Rake::TestTask.new(:lib) do |t|
t.libs << "test"
t.pattern = "test/lib/**/*_test.rb"
t.verbose = true
end
end
Rake::Task[:test].enhance { Rake::Task["test:lib"].invoke }
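For reference, the new task takes rake bracket arguments, so an invocation would look something like rake "http_log_entry_bulk_importer[500,32,0,100000]"; the importer it wraps can equally be driven from a console or runner. The values below are illustrative only, not defaults from this commit:

# batch_size, cache_size, start_at, finish_at -- illustrative values
LegacyImport::HttpLogEntryBulkImporter.new(500, 32, 0, 100_000).run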

8
app/lib/hex_util.rb Normal file
View File

@@ -0,0 +1,8 @@
class HexUtil
def self.hex2bin(str)
[str].pack("H*")
end
def self.bin2hex(bin)
bin.unpack("H*").first.upcase
end
end
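A quick sketch of the round trip these helpers are meant to provide (values illustrative):

HexUtil.hex2bin("DEADBEEF")                   # => "\xDE\xAD\xBE\xEF" (4 raw bytes)
HexUtil.bin2hex(HexUtil.hex2bin("DEADBEEF"))  # => "DEADBEEF" (upper-cased hex)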

2
app/lib/legacy_import.rb Normal file
View File

@@ -0,0 +1,2 @@
module LegacyImport
end

app/lib/legacy_import/adaptive_cache.rb Normal file
View File

@@ -0,0 +1,64 @@
class LegacyImport::AdaptiveCache
Entry = Struct.new(:score, :id, :obj, :extra)
def initialize(max_size = 32, reward = 1.0, punish = 0.1)
@max_size = max_size
@candidates = []
@reward = reward
@punish = punish
end
def at_capacity?
@candidates.count == @max_size
end
def candidates
@candidates.map { |c| c.obj }
end
def scores
@candidates.map { |c| c.score }
end
def reward(candidate_id)
@candidates.each do |entry|
if entry.id == candidate_id
entry.score += @reward
else
entry.score -= @punish
end
end
sort!
end
def contains?(candidate_id)
!!@candidates.find { |entry| entry.id == candidate_id }
end
def insert(id, candidate, extra = nil)
new_entry = Entry.new(0.0, id, candidate, extra)
idx = @candidates.bsearch_index { |entry| entry.score <= 0 }
if idx == nil
@candidates.push(new_entry)
else
@candidates.insert(idx, new_entry)
end
while @candidates.size > @max_size
@candidates.pop
end
end
def to_s
@candidates.map do |entry|
" - #{entry.score.round(1)} score, id #{entry.id} - #{entry.extra}"
end.join("\n")
end
private
def sort!
@candidates.sort_by! { |entry| -entry.score }
end
end
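A minimal sketch of the intended scoring behaviour, assuming the class is loaded (e.g. in a Rails console). The ids and payloads here are placeholders; the importer below keys real entries by truncated blob sha256 digests:

cache = LegacyImport::AdaptiveCache.new(2, 1.0, 0.1)
cache.insert(:a, "blob A", "example.com/a")
cache.insert(:b, "blob B", "example.com/b")
cache.reward(:b)              # :b gains 1.0, :a is punished by 0.1 and drops below it
cache.candidates              # => ["blob B", "blob A"], best score first
cache.insert(:c, "blob C")    # at capacity: the lowest-scoring entry (:a) falls off the end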

app/lib/legacy_import/http_log_entry_bulk_importer.rb Normal file
View File

@@ -0,0 +1,341 @@
require "set"
class LegacyImport::HttpLogEntryBulkImporter
PROFILE = false
def initialize(batch_size, cache_size, start_id, end_id)
@batch_size = batch_size
@cache_size = cache_size
@start_id = start_id
@end_id = end_id
@insert_stats = InsertStats.new
@timings = Timings.new
# key is "host|content_type"
# value is the adaptive cache
@blob_entry_cache = Hash.new do |hash, key|
hash[key] = LegacyImport::AdaptiveCache.new(cache_size, 1.0, 0.1)
end
end
def run
RubyProf.start if PROFILE
puts "HttpLogEntryBulkImporter: " +
"#{@start_id || "(nil)"} -> #{@end_id || "(nil)"}, batch size #{@batch_size}, cache size #{@cache_size}"
start_at = Time.now
last_model_id = nil
stats_printer = Thread.new do
Thread.current.name = "stats-printer"
last_num_inserted = 0
last_time = Time.now
rates = []
loop do
sleep 3
now = Time.now
duration = now - last_time
last_time = now
rate = (@insert_stats.http_entries_inserted - last_num_inserted) / duration
last_num_inserted = @insert_stats.http_entries_inserted
# rolling average over the last 5 samples (~15 seconds at a 3s interval)
rates.unshift(rate)
while rates.size > 5
rates.pop
end
rate = rates.sum / rates.size
hr
puts "Inserted #{@insert_stats} - " +
"#{rate.round(2)}/sec (last id: #{last_model_id})"
tp(@timings.entries.map do |entry|
{
key: entry[:key],
duration: "#{entry[:key_secs].round(1)} sec",
percent: "#{(100 * entry[:proportion]).round(1)}%".rjust(5),
}
end)
hr
end
end
query = Legacy::HttpLogEntry.
order(id: :asc).
includes(:parent_log_entry, :blob_entry)
@timings.start :bulk_load
query.find_in_batches(
batch_size: @batch_size,
start: @start_id,
finish: @end_id,
) do |legacy_models|
@timings.finish :bulk_load
import_legacy_models(legacy_models)
last_model_id = legacy_models.last&.id
@timings.start :bulk_load
end
@timings.finish :bulk_load
stats_printer.kill
duration = Time.now - start_at
bytes_stored = @insert_stats.bytes_stored
bytes_length = @insert_stats.bytes_length
ratio = bytes_stored.to_f / bytes_length
rate = @insert_stats.http_entries_inserted / duration
hr
puts "Last id: #{last_model_id}"
puts "Cache size: #{@cache_size}"
puts "Batch size: #{@batch_size}"
puts "Total content stored: #{InsertStats.humansize(bytes_stored)}"
puts "Total content length: #{InsertStats.humansize(bytes_length)}"
puts "Size ratio: #{ratio.round(2)}"
puts "Total http inserted: #{@insert_stats.http_entries_inserted}"
puts "Total blobs inserted: #{@insert_stats.blob_entries_inserted}"
puts "Total duration: #{duration.round(0)} seconds (#{rate.round(2)}/second)"
hr
# dump_cache_stats
if PROFILE
Dir.mkdir("profiler") unless File.exist?("profiler")
result = RubyProf.stop
File.open("profiler/migrate_legacy_http_entries.txt", "w") do |f|
RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
end
File.open("profiler/migrate_legacy_http_entries.html", "w") do |f|
RubyProf::CallStackPrinter.new(result).print(f, { min_percent: 1 })
end
end
end
private
EMPTY_HEADER = ::HttpLogEntryHeader.find_or_create(headers: {})
def import_legacy_models(legacy_models)
@timings.start :lookup_existing_http
already_exist_ids = ::HttpLogEntry.
where(id: legacy_models.map(&:id)).
pluck(:id).
to_set
@timings.finish :lookup_existing_http
# skip models that already exist in the new table or have no stored content (for now)
@timings.start :reject_empty_legacy
legacy_models.reject! do |m|
already_exist_ids.include?(m.id) || m.response_body.nil?
end
@timings.finish :reject_empty_legacy
# compute all blob entries for the legacy models, removing duplicates
@timings.start :lookup_existing_bes
legacy_models_and_sha256 = legacy_models.map do |m|
[m, Digest::SHA256.digest(m.response_body)]
end
existing_bes_by_sha256 = ::BlobEntry.where(sha256: legacy_models_and_sha256.map(&:second)).map do |be|
[be.sha256, be]
end.to_h
@timings.finish :lookup_existing_bes
@timings.start :build_new_bes
response_blobs = legacy_models_and_sha256.map do |pair|
legacy_model, sha256 = pair
content_type = legacy_model.content_type
cache_key = "#{legacy_model.host}|#{content_type}"
cache = @blob_entry_cache[cache_key]
# ~5% chance to supply no candidates even when the cache is at capacity, so
# new entries get a chance to replace poorly performing ones (below capacity
# we never supply candidates)
candidates = if cache.at_capacity? && rand(0..100) >= 5
cache.candidates
else
[]
end
blob = existing_bes_by_sha256[sha256] || ::BlobEntry.build_entry(
content_type: content_type,
sha256: sha256,
contents: legacy_model.response_body,
candidates: candidates,
)
# for newly built blobs only: reward the base if one was used, otherwise
# insert this blob into the cache so it becomes a future candidate
if blob.base_sha256
cache.reward(HexUtil.bin2hex(blob.base_sha256)[0..8])
else
cache.insert(HexUtil.bin2hex(blob.sha256)[0..8], blob, legacy_model.full_path)
end unless blob.persisted?
blob
end
@timings.finish :build_new_bes
legacy_models_with_response_sha256 = legacy_models.zip(response_blobs.map(&:sha256))
# remove all the already created blobs, and duplicates
blobs_without_dups = {}
response_blobs.each do |be|
next if be.persisted?
found = blobs_without_dups[be.sha256]
if found.nil? || found.bytes_stored > be.bytes_stored
# use the smaller of the two if there's an unpersisted duplicate
blobs_without_dups[be.sha256] = be
end
end
# bulk-insert all the new blob entries
@timings.start :insert_new_bes
blobs_to_insert = blobs_without_dups.values
BlobEntry.insert_all!(blobs_to_insert.map(&:to_bulk_insert_hash)) if blobs_to_insert.any?
@insert_stats.blob_entries_inserted += blobs_to_insert.size
@insert_stats.bytes_length += blobs_to_insert.map(&:contents).map(&:size).sum
@insert_stats.bytes_stored += blobs_to_insert.map(&:bytes_stored).sum
@timings.finish :insert_new_bes
@timings.start :insert_new_headers
http_models = ReduxApplicationRecord.transaction do
legacy_models_with_response_sha256.map do |pair|
legacy_model, response_sha256 = pair
request_headers = ::HttpLogEntryHeader.find_or_create(headers: legacy_model.req_headers)
response_headers = ::HttpLogEntryHeader.find_or_create(headers: legacy_model.res_headers)
# request_headers = EMPTY_HEADER
# response_headers = EMPTY_HEADER
build_http_model(legacy_model, request_headers, response_headers, response_sha256)
end
end
@timings.finish :insert_new_headers
@timings.start :insert_new_https
::HttpLogEntry.insert_all!(http_models.map(&:to_bulk_insert_hash)) if http_models.any?
@insert_stats.http_entries_inserted += http_models.size
@timings.finish :insert_new_https
end
def build_http_model(legacy_model, request_headers, response_headers, response_sha256)
log_entry = ::HttpLogEntry.new(
id: legacy_model.id,
uri_scheme: legacy_model.scheme,
uri_host: legacy_model.host,
uri_path: legacy_model.path,
uri_query: legacy_model.query,
verb: legacy_model.verb,
content_type: legacy_model.content_type,
status_code: legacy_model.status,
response_time_ms: legacy_model.response_time,
request_headers: request_headers,
response_headers: response_headers,
response_sha256: response_sha256,
requested_at: legacy_model.requested_at,
created_at: legacy_model.created_at,
updated_at: legacy_model.updated_at,
)
end
def dump_cache_stats
hr
puts "Cache state: "
@blob_entry_cache.each_pair do |key, cache|
if cache.candidates.count > 0
puts "for #{key}: "
puts cache.to_s
puts
end
end
end
def hr
puts "-" * 40
end
Timings = Struct.new(
:keys,
:totals
) do
def initialize
@start_at = Time.now
self.keys = []
self.totals = Hash.new do |h, key|
h[key] = { started: nil, secs: 0.0 }
end
end
def start(key)
self.keys << key unless self.keys.include?(key)
entry = self.totals[key]
raise("#{key} already started") if entry[:started]
entry[:started] = Time.now
end
def finish(key)
entry = self.totals[key]
raise("#{key} does not exist") unless entry
started = entry[:started]
entry[:started] = nil
raise("#{key} not started") unless started
entry[:secs] += Time.now - started
end
def entries
total_secs = Time.now - @start_at
total_measured_secs = self.totals.values.map { |e| e[:secs] }.sum
self.keys.map do |key|
key_secs = self.totals[key][:secs]
{
key: key,
key_secs: key_secs,
proportion: key_secs / total_measured_secs,
}
end.chain([
{
key: :measured_total,
key_secs: total_measured_secs,
proportion: total_measured_secs / total_secs,
},
{
key: :actual_total,
key_secs: total_secs,
proportion: 1.0,
},
])
end
end
InsertStats = Struct.new(
:http_entries_inserted,
:blob_entries_inserted,
:bytes_stored,
:bytes_length,
) do
def initialize
self.http_entries_inserted = 0
self.blob_entries_inserted = 0
self.bytes_stored = 0
self.bytes_length = 0
end
def to_s
ratio = self.bytes_stored.to_f / self.bytes_length
"+#{self.http_entries_inserted} reqs, +#{self.blob_entries_inserted} blobs - " +
"#{self.class.humansize(self.bytes_stored)}/#{self.class.humansize(self.bytes_length)} " +
"(#{ratio.round(2)})"
end
def self.humansize(size)
units = %w[B KiB MiB GiB TiB PiB EiB ZiB]
return "0.0 B" if size == 0
exp = (Math.log(size) / Math.log(1024)).to_i
exp += 1 if (size.to_f / 1024 ** exp >= 1024 - 0.05)
exp = units.size - 1 if exp > units.size - 1
"%.1f %s" % [size.to_f / 1024 ** exp, units[exp]]
end
end
end
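A couple of quick sanity checks of the humansize helper above (illustrative values; the constant path assumes the nesting shown in this file):

stats = LegacyImport::HttpLogEntryBulkImporter::InsertStats
stats.humansize(0)          # => "0.0 B"
stats.humansize(1_536)      # => "1.5 KiB"
stats.humansize(1_572_864)  # => "1.5 MiB"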

app/models/blob_entry.rb
View File

@@ -1,10 +1,12 @@
class BlobEntry < ReduxApplicationRecord
self.primary_key = :sha256
belongs_to :base,
optional: true,
foreign_key: :base_sha256,
class_name: "::BlobEntry"
validates_presence_of(
:sha256,
:content_type,
:contents,
:size
@@ -16,7 +18,7 @@ class BlobEntry < ReduxApplicationRecord
content_type: self.read_attribute(:content_type),
contents: self.read_attribute(:contents),
size: self.read_attribute(:size),
base_id: self.read_attribute(:base_id),
base_sha256: self.read_attribute(:base_sha256),
}
end
@@ -29,15 +31,11 @@ class BlobEntry < ReduxApplicationRecord
end
end
def contents_stored_size
def bytes_stored
self.read_attribute(:contents).size
end
def self.build_entry(content_type:, contents:, candidates: [])
sha256 = Digest::SHA256.digest(contents)
existing = BlobEntry.find_by(sha256: sha256, content_type: content_type)
return existing unless existing.nil?
def self.build_entry(content_type:, sha256:, contents:, candidates: [])
record = BlobEntry.new(sha256: sha256, content_type: content_type, size: contents.size)
smallest_patch_size = nil

app/models/http_log_entry.rb
View File

@@ -1,17 +1,19 @@
class HttpLogEntry < ReduxApplicationRecord
enum verb: %i[get post]
belongs_to :blob_entry, class_name: "::BlobEntry"
belongs_to :response,
foreign_key: :response_sha256,
class_name: "::BlobEntry"
belongs_to :request_headers,
class_name: "::HttpLogEntryHeader"
class_name: "::HttpLogEntryHeader"
belongs_to :response_headers,
class_name: "::HttpLogEntryHeader"
class_name: "::HttpLogEntryHeader"
validates_inclusion_of(:verb, in: ::HttpLogEntry.verbs.keys)
validates_presence_of(
:scheme,
:host,
:path,
:uri_scheme,
:uri_host,
:uri_path,
:status_code,
:response_time_ms,
:content_type,
@@ -21,4 +23,24 @@ class HttpLogEntry < ReduxApplicationRecord
def full_path
"#{scheme}://#{host}#{path}#{query ? "?#{query}" : ""}"
end
def to_bulk_insert_hash
{
id: self.id,
uri_scheme: self.uri_scheme,
uri_host: self.uri_host,
uri_path: self.uri_path,
uri_query: self.uri_query,
verb: self.verb,
content_type: self.content_type,
status_code: self.status_code,
response_time_ms: self.response_time_ms,
request_headers_id: self.request_headers_id,
response_headers_id: self.response_headers_id,
response_sha256: self.response_sha256,
requested_at: self.requested_at,
created_at: self.created_at,
updated_at: self.updated_at,
}
end
end

app/models/http_log_entry_header.rb
View File

@@ -1,8 +1,5 @@
class HttpLogEntryHeader < ReduxApplicationRecord
validates_presence_of(
:sha256,
:headers
)
validates_presence_of(:sha256)
def self.find_or_create(headers:)
raise("must be a hash") unless headers.is_a?(Hash)

app/models/legacy/http_log_entry.rb
View File

@@ -320,30 +320,32 @@ class Legacy::HttpLogEntry < LegacyApplicationRecord
# -> string
def response_body
return response_body_native if diff_type == "native"
@response_body ||= begin
return response_body_native if diff_type == "native"
our_string = if blob_entry
path = blob_entry.ensure_file_path
unless path
raise NoBEPathException, "no path for blob entry " \
"#{blob_entry_id} (HLE id: #{id}) (#{blob_entry.file_path})"
our_string = if blob_entry
path = blob_entry.ensure_file_path
unless path
raise NoBEPathException, "no path for blob entry " \
"#{blob_entry_id} (HLE id: #{id}) (#{blob_entry.file_path})"
end
File.read(path)
else
resp_body
end
our_string = self.class.gunzip(our_string) if gzipped
return nil if our_string.nil?
# our_string = our_string.force_encoding("UTF-8")
if parent_log_entry
self.class.apply_patch(parent_log_entry.response_body, our_string, diff_type)
else
our_string
end
File.read(path)
else
resp_body
end
our_string = self.class.gunzip(our_string) if gzipped
return nil if our_string.nil?
# our_string = our_string.force_encoding("UTF-8")
if parent_log_entry
self.class.apply_patch(parent_log_entry.response_body, our_string, diff_type)
else
our_string
end
end
def response_body_native

config/application.rb
View File

@@ -11,6 +11,7 @@ module ReduxScraper
# Initialize configuration defaults for originally generated Rails version.
config.load_defaults 7.0
config.active_record.legacy_connection_handling = false
config.autoload_paths << config.root.join("app/lib")
# Configuration for the application, engines, and railties goes here.
#

config/database.yml
View File

@@ -13,7 +13,17 @@ redux_prod: &redux_prod
adapter: postgresql
host: scraper-postgres.local
port: 5432
database: scraper_redux
database: redux_prod
username: scraper_redux
password: pdkFLqRmQwPUPaDDC4pX
migrations_paths: db/redux_migrate
pool: 64
redux_test: &redux_test
adapter: postgresql
host: scraper-postgres.local
port: 5432
database: redux_test
username: scraper_redux
password: pdkFLqRmQwPUPaDDC4pX
migrations_paths: db/redux_migrate
@@ -23,9 +33,9 @@ legacy_prod: &legacy_prod
adapter: postgresql
host: scraper-postgres.local
port: 5432
database: site_scraper_prod
username: scraper_redux
password: pdkFLqRmQwPUPaDDC4pX
database: legacy_prod
username: scraper_legacy
password: zL7zDRXycLhLFJLQj5Zh
database_tasks: false
pool: 64
@@ -40,8 +50,7 @@ development:
# Do not set this db to the same as development or production.
test:
redux:
<<: *default
database: db/test.sqlite3
<<: *redux_test
legacy:
<<: *legacy_prod

View File

@@ -1,9 +1,13 @@
class CreateLogStoreSstEntries < ActiveRecord::Migration[7.0]
def change
def up
create_table :log_store_sst_entries, id: false, primary_key: :key do |t|
t.binary :key, null: false
t.binary :value, null: false
t.index :key, unique: true
end
end
def down
raise("don't revert this migration!")
end
end

View File

@@ -1,47 +1,49 @@
class CreateHttpLogEntries < ActiveRecord::Migration[7.0]
def change
enable_extension "hstore"
create_table :http_log_entries do |t|
t.string :scheme
t.string :host
t.string :path
t.string :query
t.integer :verb
t.string :uri_scheme, null: false
t.string :uri_host, null: false
t.string :uri_path, null: false
t.string :uri_query
t.string :uri_hash
t.integer :verb, null: false
# request/response headers
t.references :request_headers
t.references :response_headers
t.references :request_headers, null: false
t.references :response_headers, null: false
t.integer :status_code
t.integer :response_time_ms
t.string :content_type
t.references :blob_entry
t.integer :status_code, null: false
t.integer :response_time_ms, null: false
t.string :content_type, null: false
t.binary :response_sha256, null: false
t.datetime :requested_at
t.timestamps
t.index [:host, :path]
t.index [:content_type]
t.datetime :requested_at, null: false
t.timestamps null: false
end
create_table :http_log_entry_headers do |t|
t.binary :sha256, null: false
t.hstore :headers, null: false
t.jsonb :headers, null: false
t.index :sha256, unique: true
t.timestamps
t.timestamps null: false
end
create_table :blob_entries do |t|
t.binary :sha256
t.references :base, index: false
create_table :blob_entries, id: false, primary_key: :sha256 do |t|
t.binary :sha256, null: false
t.binary :base_sha256
t.string :content_type
t.integer :size
t.binary :contents
t.timestamps
t.string :content_type, null: false
t.integer :size, null: false
t.binary :contents, null: false
t.timestamps null: false
t.index :sha256, unique: true
end
add_foreign_key :http_log_entries, :blob_entries, column: :response_sha256, primary_key: :sha256
add_foreign_key :http_log_entries, :http_log_entry_headers, column: :request_headers_id, primary_key: :id
add_foreign_key :http_log_entries, :http_log_entry_headers, column: :response_headers_id, primary_key: :id
add_foreign_key :blob_entries, :blob_entries, column: :base_sha256, primary_key: :sha256
end
end

48
db/schema.rb generated
View File

@@ -12,46 +12,43 @@
ActiveRecord::Schema[7.0].define(version: 2023_01_31_012417) do
# These are extensions that must be enabled in order to support this database
enable_extension "hstore"
enable_extension "pg_stat_statements"
enable_extension "plpgsql"
create_table "blob_entries", force: :cascade do |t|
t.binary "sha256"
t.bigint "base_id"
t.string "content_type"
t.integer "size"
t.binary "contents"
create_table "blob_entries", id: false, force: :cascade do |t|
t.binary "sha256", null: false
t.binary "base_sha256"
t.string "content_type", null: false
t.integer "size", null: false
t.binary "contents", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["sha256"], name: "index_blob_entries_on_sha256", unique: true
end
create_table "http_log_entries", force: :cascade do |t|
t.string "scheme"
t.string "host"
t.string "path"
t.string "query"
t.integer "verb"
t.bigint "request_headers_id"
t.bigint "response_headers_id"
t.integer "status_code"
t.integer "response_time_ms"
t.string "content_type"
t.bigint "blob_entry_id"
t.datetime "requested_at"
t.string "uri_scheme", null: false
t.string "uri_host", null: false
t.string "uri_path", null: false
t.string "uri_query"
t.string "uri_hash"
t.integer "verb", null: false
t.bigint "request_headers_id", null: false
t.bigint "response_headers_id", null: false
t.integer "status_code", null: false
t.integer "response_time_ms", null: false
t.string "content_type", null: false
t.binary "response_sha256", null: false
t.datetime "requested_at", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["blob_entry_id"], name: "index_http_log_entries_on_blob_entry_id"
t.index ["content_type"], name: "index_http_log_entries_on_content_type"
t.index ["host", "path"], name: "index_http_log_entries_on_host_and_path"
t.index ["request_headers_id"], name: "index_http_log_entries_on_request_headers_id"
t.index ["response_headers_id"], name: "index_http_log_entries_on_response_headers_id"
end
create_table "http_log_entry_headers", force: :cascade do |t|
t.binary "sha256", null: false
t.hstore "headers", null: false
t.jsonb "headers", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["sha256"], name: "index_http_log_entry_headers_on_sha256", unique: true
@@ -62,4 +59,9 @@ ActiveRecord::Schema[7.0].define(version: 2023_01_31_012417) do
t.binary "value", null: false
t.index ["key"], name: "index_log_store_sst_entries_on_key", unique: true
end
add_foreign_key "blob_entries", "blob_entries", column: "base_sha256", primary_key: "sha256"
add_foreign_key "http_log_entries", "blob_entries", column: "response_sha256", primary_key: "sha256"
add_foreign_key "http_log_entries", "http_log_entry_headers", column: "request_headers_id"
add_foreign_key "http_log_entries", "http_log_entry_headers", column: "response_headers_id"
end

View File

@@ -1,11 +0,0 @@
# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html
# This model initially had no columns defined. If you add columns to the
# model remove the "{}" from the fixture names and add the columns immediately
# below each fixture, per the syntax in the comments below
#
one: {}
# column: value
#
two: {}
# column: value

View File

@@ -1,11 +0,0 @@
# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html
# This model initially had no columns defined. If you add columns to the
# model remove the "{}" from the fixture names and add the columns immediately
# below each fixture, per the syntax in the comments below
#
one: {}
# column: value
#
two: {}
# column: value

View File

@@ -1,11 +0,0 @@
# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html
# This model initially had no columns defined. If you add columns to the
# model remove the "{}" from the fixture names and add the columns immediately
# below each fixture, per the syntax in the comments below
#
one: {}
# column: value
#
two: {}
# column: value

View File

@@ -1,11 +0,0 @@
# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html
# This model initially had no columns defined. If you add columns to the
# model remove the "{}" from the fixture names and add the columns immediately
# below each fixture, per the syntax in the comments below
#
# one: {}
# column: value
#
# two: {}
# column: value

View File

@@ -0,0 +1,35 @@
require "test_helper"
class AdaptiveCacheTest < ActiveSupport::TestCase
def test_works
cache = LegacyImport::AdaptiveCache.new(4, 1.0, 0.0)
refute cache.at_capacity?
assert_equal [], cache.candidates
# works even when the candidate doesn't exist
cache.reward 1
assert_equal [], cache.candidates
cache.insert :a, "a"
cache.insert :b, "b"
cache.insert :c, "c"
cache.insert :d, "d"
assert cache.at_capacity?
assert_equal ["d", "c", "b", "a"], cache.candidates
5.times { cache.reward :a }
3.times { cache.reward :b }
1.times { cache.reward :c }
assert_equal ["a", "b", "c", "d"], cache.candidates
assert_equal [5.0, 3.0, 1.0, 0.0], cache.scores
3.times { cache.reward :c } # 1 => 4
assert_equal ["a", "c", "b", "d"], cache.candidates
assert_equal [5.0, 4.0, 3.0, 0.0], cache.scores
# new 'e' should bump off 'd' which has a 0 score
cache.insert :e, "e"
assert_equal ["a", "c", "b", "e"], cache.candidates
assert_equal [5.0, 4.0, 3.0, 0.0], cache.scores
end
end