refactor http log entry migration task
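Replace the modulo-sharded, single-threaded import loop with a
Concurrent::ThreadPoolExecutor that imports batches of legacy entries in
parallel. The task now takes an id range instead of
[start_at, num_shards, this_shard]; per-key blob caches are wrapped in a
read-write lock (ConcurrentBlobCache), counters move into an InsertStats
struct behind a lock of their own, and a background thread prints a rolling
insert rate. Example invocation (the id range here is illustrative):

    rake "migrate_legacy_http_entries[0,179899]"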

2023-02-02 18:52:40 +00:00
parent 08bbacca65
commit 49cd601862
7 changed files with 236 additions and 111 deletions

Gemfile

@@ -2,6 +2,7 @@ source "https://rubygems.org"
 git_source(:github) { |repo| "https://github.com/#{repo}.git" }
 ruby "3.2.0"
+# ruby "3.0.3"
 # Bundle edge Rails instead: gem "rails", github: "rails/rails", branch: "main"
 gem "rails", "~> 7.0.4", ">= 7.0.4.2"
@@ -73,11 +74,12 @@ group :test do
   gem "webdrivers"
 end
-gem 'xdiff', path: '../xdiff-rb'
+gem "xdiff", path: "../xdiff-rb"
 # for legacy import
-gem 'diffy'
-gem 'rb-bsdiff', path: '../rb-bsdiff'
+gem "diffy"
+gem "rb-bsdiff", path: "../rb-bsdiff"
-gem 'ruby-prof'
-# gem 'concurrent-ruby-ext', require: 'concurrent'
+gem "ruby-prof"
+# gem "concurrent-ruby-ext", require: "concurrent"
 # gem 'cli-ui'

Gemfile.lock

@@ -125,6 +125,7 @@ GEM
     matrix (0.4.2)
     method_source (1.0.0)
     mini_mime (1.1.2)
+    mini_portile2 (2.8.1)
     minitest (5.17.0)
     msgpack (1.6.0)
     net-imap (0.3.4)
@@ -137,7 +138,8 @@ GEM
     net-smtp (0.3.3)
       net-protocol
     nio4r (2.5.8)
-    nokogiri (1.14.0-x86_64-linux)
+    nokogiri (1.14.1)
+      mini_portile2 (~> 2.8.0)
       racc (~> 1.4)
     pg (1.4.5)
     pry (0.14.2)
@@ -194,16 +196,17 @@ GEM
       actionpack (>= 5.2)
       activesupport (>= 5.2)
       sprockets (>= 3.0.0)
-    sqlite3 (1.6.0-x86_64-linux)
+    sqlite3 (1.6.0)
+      mini_portile2 (~> 2.8.0)
     stimulus-rails (1.2.1)
       railties (>= 6.0.0)
     thor (1.2.1)
     timeout (0.3.1)
-    turbo-rails (1.3.2)
+    turbo-rails (1.3.3)
       actionpack (>= 6.0.0)
       activejob (>= 6.0.0)
       railties (>= 6.0.0)
-    tzinfo (2.0.5)
+    tzinfo (2.0.6)
       concurrent-ruby (~> 1.0)
     web-console (4.2.0)
       actionview (>= 6.0.0)
@@ -223,7 +226,7 @@ GEM
   zeitwerk (2.6.6)

 PLATFORMS
-  x86_64-linux
+  ruby

 DEPENDENCIES
   bootsnap

@@ -252,4 +255,4 @@ RUBY VERSION
   ruby 3.2.0p0

 BUNDLED WITH
-   2.4.5
+   2.2.32

Rakefile

@@ -5,6 +5,15 @@ require_relative "config/application"
 Rails.application.load_tasks

+task :ractor => [:environment] do
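+  # Quick check that the xdiff native extension can be called from inside a
+  # non-main Ractor.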
+  require "http_log_entry"
+  require "xdiff"
+  r = Ractor.new {
+    XDiff.diff("foo", "bar")
+  }
+  r.take
+end

 # Find the last HttpLogEntry that was migrated
 task :find_last_inserted => :environment do
   last_found = nil
@@ -21,107 +30,128 @@ task :find_last_inserted => :environment do
   end
 end

-task :migrate_legacy_http_entries, [:start_at, :num_shards, :this_shard] => [:environment] do |t, args|
-  start_at = args[:start_at].to_i
-  num_shards = args[:num_shards].to_i
-  this_shard = args[:this_shard].to_i
-  do_profile = false
-  RubyProf.start if do_profile
+task :migrate_legacy_http_entries, [:start_at, :finish_at] => [:environment] do |t, args|
   # last inserted - 179899
-  start = Time.now
-  first_start = start
-  i = 0
-  to_insert = []
+  BATCH_SIZE = 50
+  PROFILE = false
+  MAX_WORKERS = 4
+  # model id to start inserting from
+  START_AT = args[:start_at]&.to_i || 0
+  FINISH_AT = args[:finish_at]&.to_i || nil
-  lhle_query = Legacy::HttpLogEntry.
-    order(id: :asc).
-    includes(:req_headers_ref, :res_headers_ref, :parent_log_entry, :blob_entry)
+  puts "migrate_legacy_http_entries: " +
+    "#{START_AT || "(nil)"} -> #{FINISH_AT || "(nil)"}"
   # map from url (String) -> [BlobCache]
-  candidate_blob_entry_cache = Hash.new
+  first_start = Time.now
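+  # With a bounded queue, fallback_policy: :caller_runs makes the posting
+  # thread run the batch itself, so find_in_batches is throttled to the
+  # workers' pace instead of piling up batches in memory.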
+  pool = Concurrent::ThreadPoolExecutor.new(
+    min_threads: MAX_WORKERS - 1,
+    max_threads: MAX_WORKERS - 1,
+    max_queue: MAX_WORKERS - 1,
+    fallback_policy: :caller_runs,
+  )
-  total_contents_length = 0
-  total_contents_stored = 0
-  num_inserted = 0
+  RubyProf.start if PROFILE
+  blob_entry_cache = Concurrent::Map.new do |hash, key|
+    hash[key] = ConcurrentBlobCache.new
+  end
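+  # Note: Concurrent::Map's default block is not atomic, so two threads can
+  # race to create a cache for the same key; that should be harmless here
+  # since the cache only seeds delta-compression candidates.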
   dump_cache_stats = -> {
     puts "-" * 40
     puts "Cache state: "
-    candidate_blob_entry_cache.each do |key, cache|
-      if cache.candidates.count > 0
-        puts "for #{key}: "
-        puts cache.to_s
-        puts
+    blob_entry_cache.each_pair do |key, cache|
+      cache.with_read_lock do |cache|
+        if cache.candidates.count > 0
+          puts "for #{key}: "
+          puts cache.to_s
+          puts
+        end
+      end
     end
   }
-  dump_compression_stats = -> {
-    puts "Total content length: #{total_contents_length}"
-    puts "Total content stored: #{total_contents_stored}"
-    puts "Compression factor: #{(total_contents_stored / total_contents_length.to_f).round(3)}"
-  }
+  # insert_stats & last_model_id are guarded by insert_stats_guard
+  insert_stats_guard = Concurrent::ReadWriteLock.new
+  insert_stats = InsertStats.new
+  last_model_id = nil
-  last_time = Time.now
-  last_num_inserted = num_inserted
-  last_model = nil
+  stats_printer = Thread.new do
+    Thread.current.name = "stats-printer"
+    last_num_inserted = 0
+    last_time = Time.now
+    rates = []
-  dump_timing_stats = -> {
-    now = Time.now
-    duration = now - last_time
-    last_time = now
-    rate = (num_inserted - last_num_inserted) / duration
-    last_num_inserted = num_inserted
-    puts "Inserted #{num_inserted} - #{rate.round(2)}/sec (last id: #{last_model&.id})"
-  }
+    loop do
+      sleep 2
-  lhle_query.find_in_batches(batch_size: 100, start: start_at) do |lhles|
-    lhles.each do |lhle|
-      next unless lhle.id % num_shards == this_shard
+      now = Time.now
+      duration = now - last_time
+      last_time = now
-      last_model = lhle
-      tries = 0
+      insert_stats_guard.with_read_lock do
+        rate = (insert_stats.num_inserted - last_num_inserted) / duration
+        last_num_inserted = insert_stats.num_inserted
-      blob_entry = begin
-        ReduxApplicationRecord.transaction do
-          create_for_lhle(lhle, candidate_blob_entry_cache)
-        end
-      rescue
-        if tries < 2
-          tries += 1
-          retry
-        else
-          raise
-        end
-      end
+        # rolling average over last 10 seconds
+        rates.unshift(rate)
+        while rates.size > 5
+          rates.pop
+        end
-      next if blob_entry.nil?
-      num_inserted += 1
-      total_contents_length += blob_entry.contents.size
-      total_contents_stored += blob_entry.contents_stored_size
+        rate = rates.sum / rates.size
-      if num_inserted % 100 == 0
-        puts "-" * 40
-        dump_timing_stats.call
-        dump_compression_stats.call
-      end
-      if num_inserted % 1000 == 0
-        puts "-" * 40
-        dump_cache_stats.call
+        puts "#{Thread.current.name} - " +
+          "Inserted #{insert_stats} - " +
+          "#{rate.round(2)}/sec (last id: #{last_model_id})"
+      end
     end
   end
-  duration = Time.now - first_start
-  rate = num_inserted / duration
-  puts "-" * 40
-  puts "Done, last id inserted: #{last_model&.id}, " +
-    "total inserted: #{num_inserted}, " +
-    "#{duration.round(0)} seconds, " +
-    "#{rate.round(2)}/second"
+  worker_exception = nil
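+  # find_in_batches' start:/finish: bound the primary key range (inclusive on
+  # both ends), so a rerun can resume from the last printed id.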
+  Legacy::HttpLogEntry.order(id: :asc).find_in_batches(
+    batch_size: BATCH_SIZE,
+    start: START_AT,
+    finish: FINISH_AT,
+  ) do |lhles|
+    pool.post do
+      # puts "#{Thread.current.name} - post batch of #{lhles.size}"
+      sub_insert_stats = import_legacy_http_log_entries(lhles, blob_entry_cache)
+      # puts "#{Thread.current.name} - finish batch: #{sub_insert_stats}"
+      insert_stats_guard.with_write_lock do
+        insert_stats.add!(sub_insert_stats)
+        last_model_id = lhles.last.id
+      end
+    rescue
+      puts "#{Thread.current.name} - exception processing batch: #{$!}"
+      worker_exception = $!
+    end
+    raise worker_exception if worker_exception
+  end
-  if do_profile
-    File.mkdir("profiler") unless File.exists?("profiler")
+  puts "Waiting for worker threads to finish..."
+  pool.shutdown
+  pool.wait_for_termination
+  stats_printer.kill
+  stats_printer.join
+  raise worker_exception if worker_exception
+  insert_stats_guard.with_read_lock do
+    bytes_stored = insert_stats.bytes_stored
+    bytes_length = insert_stats.bytes_length
+    ratio = bytes_stored.to_f / bytes_length
+    duration = Time.now - first_start
+    rate = insert_stats.num_inserted / duration
+    puts "-" * 40
+    puts "Total content stored: #{humansize(bytes_stored)}"
+    puts "Total content length: #{humansize(bytes_length)}"
+    puts "Size ratio: #{ratio.round(2)}"
+    puts "Total inserted: #{insert_stats.num_inserted}"
+    puts "Total duration: #{duration.round(0)} seconds (#{rate.round(2)}/second)"
+  end
+  if PROFILE
+    Dir.mkdir("profiler") unless File.exist?("profiler")
     result = RubyProf.stop
     File.open("profiler/migrate_legacy_http_entries.txt", "w") do |f|
       RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
@@ -132,30 +162,105 @@ task :migrate_legacy_http_entries, [:start_at, :num_shards, :this_shard] => [:environment] do |t, args|
   end
 end

-def create_for_lhle(lhle, all_blobs_cache)
+class ConcurrentBlobCache
+  def initialize
+    @guard = Concurrent::ReadWriteLock.new
+    @cache = BlobCache.new
+  end
+
+  def with_read_lock(&block)
+    @guard.with_read_lock do
+      block.call(@cache)
+    end
+  end
+
+  def with_write_lock(&block)
+    @guard.with_write_lock do
+      block.call(@cache)
+    end
+  end
+end
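+# One ReadWriteLock per cache key: readers snapshot candidates concurrently,
+# while reward/insert take the exclusive write lock; the wrapped BlobCache
+# itself remains single-threaded.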
+def humansize(size)
+  units = %w[B KiB MiB GiB TiB PiB EiB ZiB]
+  return "0.0 B" if size == 0
+  exp = (Math.log(size) / Math.log(1024)).to_i
+  # bump to the next unit if rounding would display >= 1024
+  exp += 1 if (size.to_f / 1024 ** exp >= 1024 - 0.05)
+  exp = units.size - 1 if exp > units.size - 1
+  "%.1f %s" % [size.to_f / 1024 ** exp, units[exp]]
+end
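+# e.g. humansize(1536) == "1.5 KiB"; humansize(0) == "0.0 B"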
+InsertStats = Struct.new(
+  :num_inserted,
+  :bytes_stored,
+  :bytes_length,
+) do
+  def initialize
+    self.num_inserted = 0
+    self.bytes_stored = 0
+    self.bytes_length = 0
+  end
+
+  def add!(other)
+    self.num_inserted += other.num_inserted
+    self.bytes_stored += other.bytes_stored
+    self.bytes_length += other.bytes_length
+  end
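+  # e.g. "+50, 1.2 MiB/4.8 MiB (0.25)"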
+  def to_s
+    ratio = self.bytes_stored.to_f / self.bytes_length
+    "+#{self.num_inserted}, " +
+      "#{humansize(self.bytes_stored)}/#{humansize(self.bytes_length)} " +
+      "(#{ratio.round(2)})"
+  end
+end
+def import_legacy_http_log_entries(lhles, blob_entry_cache)
+  insert_stats = InsertStats.new
+  lhles.each do |lhle|
+    retries = 0
+    sub_insert_stats = nil
+    begin
+      sub_insert_stats = import_legacy_http_log_entry(lhle, blob_entry_cache)
+    rescue
+      if retries < 2
+        retries += 1
+        # add some random delay to avoid conflicting with other threads
+        sleep rand(0.0..0.1)
+        retry
+      else
+        raise
+      end
+    end
+    insert_stats.add!(sub_insert_stats)
+  end
+  insert_stats
+end
+def import_legacy_http_log_entry(lhle, blob_entry_cache)
   content_type = lhle.content_type
   cache_key = "#{content_type}|#{lhle.host}"
-  all_blobs_cache[cache_key] ||= BlobCache.new
-  blob_cache = all_blobs_cache[cache_key]
+  blob_cache = blob_entry_cache[cache_key]
   contents = lhle.response_body
   if contents.nil?
-    return nil
+    return InsertStats.new
   end
   if ::HttpLogEntry.where(id: lhle.id).select(:id).first
-    return nil
+    return InsertStats.new
   end
-  request_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.req_headers)
-  response_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.res_headers)
   # small (~5%) chance we don't supply any candidates to new blob creation,
   # so the candidate pool doesn't get stale.
-  candidates = if blob_cache.at_capacity? && rand(0..100) >= 5
-    blob_cache.candidates
+  candidates = blob_cache.with_read_lock do |cache|
+    if cache.at_capacity? && rand(0..100) >= 5
+      cache.candidates
     else
       []
     end
+  end
   blob_entry = ::BlobEntry.build_entry(
     content_type: content_type,
@@ -164,12 +269,17 @@ def create_for_lhle(lhle, all_blobs_cache)
   )
   blob_entry.save!
-  if blob_entry.base
-    blob_cache.reward(blob_entry.base)
-  else
-    blob_cache.insert(blob_entry, extra: lhle.full_path)
+  blob_cache.with_write_lock do |cache|
+    if blob_entry.base
+      cache.reward(blob_entry.base)
+    else
+      cache.insert(blob_entry, extra: lhle.full_path)
+    end
   end
+  request_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.req_headers)
+  response_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.res_headers)
   log_entry = ::HttpLogEntry.new(
     id: lhle.id,
     scheme: lhle.scheme,
@@ -190,12 +300,11 @@ def create_for_lhle(lhle, all_blobs_cache)
   log_entry.save!
-  contents_length = blob_entry.contents.size
-  contents_stored = blob_entry.contents_stored_size
-  # puts "inserted: #{log_entry.id} - #{log_entry.full_path} " +
-  #   "(#{(contents_stored / contents_length.to_f).round(2)} compressed)"
-  blob_entry
+  insert_stats = InsertStats.new
+  insert_stats.num_inserted = 1
+  insert_stats.bytes_length = blob_entry.contents.size
+  insert_stats.bytes_stored = blob_entry.contents_stored_size
+  insert_stats
 end
 class BlobCache

@@ -213,7 +322,7 @@ class BlobCache
   end
   def candidates
-    @candidates.map { |c| c[OBJ] }
+    @candidates.map { |c| c[OBJ] }.to_a
   end
   def reward(candidate)

app/models/blob_entry.rb

@@ -10,6 +10,16 @@ class BlobEntry < ReduxApplicationRecord
   :size
 )
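+  # Raw column values for bulk inserts; read_attribute bypasses the
+  # delta-decoding #contents accessor below (presumably for insert_all-style
+  # use).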
+  def to_bulk_insert_hash
+    {
+      sha256: self.read_attribute(:sha256),
+      content_type: self.read_attribute(:content_type),
+      contents: self.read_attribute(:contents),
+      size: self.read_attribute(:size),
+      base_id: self.read_attribute(:base_id),
+    }
+  end

   def contents
     contents_raw = self.read_attribute(:contents)
     if self.base

app/models/http_log_entry_header.rb

@@ -3,7 +3,7 @@ class HttpLogEntryHeader < ReduxApplicationRecord
   :sha256,
   :headers
 )

 def self.find_or_create(headers:)
   raise("must be a hash") unless headers.is_a?(Hash)

config/database.yml

@@ -17,6 +17,7 @@ redux_prod: &redux_prod
   username: scraper_redux
   password: pdkFLqRmQwPUPaDDC4pX
   migrations_paths: db/redux_migrate
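+  # bigger connection pool for the multi-threaded migration task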
+  pool: 64

 legacy_prod: &legacy_prod
   adapter: postgresql

@@ -26,6 +27,7 @@ legacy_prod: &legacy_prod
   username: scraper_redux
   password: pdkFLqRmQwPUPaDDC4pX
   database_tasks: false
+  pool: 64

 development:
   redux:

db/schema.rb (generated)

@@ -62,5 +62,4 @@ ActiveRecord::Schema[7.0].define(version: 2023_01_31_012417) do
     t.binary "value", null: false
     t.index ["key"], name: "index_log_store_sst_entries_on_key", unique: true
   end
 end