Migrate SST entries, start on migrating HTTP log entries

commit 08bbacca65
parent 654ff8b71a
Date: 2023-02-01 04:10:49 +00:00
24 changed files with 1247 additions and 7 deletions

.gitignore vendored (2 lines changed)

@@ -33,3 +33,5 @@
# Ignore master key for decrypting credentials and more.
/config/master.key
/profiler/

Gemfile (10 lines changed)

@@ -12,6 +12,7 @@ gem "sprockets-rails"
# Use sqlite3 as the database for Active Record
gem "sqlite3", "~> 1.4"
gem "pg"
gem "pry"
# Use the Puma web server [https://github.com/puma/puma]
gem "puma", "~> 5.0"
@@ -72,4 +73,11 @@ group :test do
gem "webdrivers"
end
gem 'xdiff', path: '../xdiff-rb'
# for legacy import
gem 'diffy'
gem 'rb-bsdiff', path: '../rb-bsdiff'
gem 'ruby-prof'
# gem 'concurrent-ruby-ext', require: 'concurrent'

Gemfile.lock

@@ -1,3 +1,8 @@
PATH
remote: ../rb-bsdiff
specs:
rb-bsdiff (0.1.0)
PATH
remote: ../xdiff-rb
specs:
@@ -86,12 +91,14 @@ GEM
rack-test (>= 0.6.3)
regexp_parser (>= 1.5, < 3.0)
xpath (~> 3.2)
coderay (1.1.3)
concurrent-ruby (1.2.0)
crass (1.0.6)
date (3.3.3)
debug (1.7.1)
irb (>= 1.5.0)
reline (>= 0.3.1)
diffy (3.4.2)
erubi (1.12.0)
globalid (1.1.0)
activesupport (>= 5.0)
@@ -133,6 +140,9 @@ GEM
nokogiri (1.14.0-x86_64-linux)
racc (~> 1.4)
pg (1.4.5)
pry (0.14.2)
coderay (~> 1.1)
method_source (~> 1.0)
public_suffix (5.0.1)
puma (5.6.5)
nio4r (~> 2.0)
@@ -171,6 +181,7 @@ GEM
reline (0.3.2)
io-console (~> 0.5)
rexml (3.2.5)
ruby-prof (1.4.5)
rubyzip (2.3.2)
selenium-webdriver (4.8.0)
rexml (~> 3.2, >= 3.2.5)
@@ -218,11 +229,15 @@ DEPENDENCIES
bootsnap
capybara
debug
diffy
importmap-rails
jbuilder
pg
pry
puma (~> 5.0)
rails (~> 7.0.4, >= 7.0.4.2)
rb-bsdiff!
ruby-prof
selenium-webdriver
sprockets-rails
sqlite3 (~> 1.4)

Rakefile (250 lines changed)

@@ -5,6 +5,256 @@ require_relative "config/application"
Rails.application.load_tasks
# Find the last HttpLogEntry that was migrated
task :find_last_inserted => :environment do
last_found = nil
Legacy::HttpLogEntry.order(id: :asc).find_each do |lhle|
contents = lhle.response_body
next if contents.nil?
model = HttpLogEntry.find_by(id: lhle.id)
if model.nil?
puts "last found id: #{last_found&.id || "(none)"}"
break
end
last_found = model
end
end
task :migrate_legacy_http_entries, [:start_at, :num_shards, :this_shard] => [:environment] do |t, args|
start_at = args[:start_at].to_i
num_shards = args[:num_shards].to_i
this_shard = args[:this_shard].to_i
do_profile = false
RubyProf.start if do_profile
start = Time.now
first_start = start
i = 0
to_insert = []
lhle_query = Legacy::HttpLogEntry.
order(id: :asc).
includes(:req_headers_ref, :res_headers_ref, :parent_log_entry, :blob_entry)
# map from url (String) -> [BlobCache]
candidate_blob_entry_cache = Hash.new
total_contents_length = 0
total_contents_stored = 0
num_inserted = 0
dump_cache_stats = -> {
puts "Cache state: "
candidate_blob_entry_cache.each do |key, cache|
if cache.candidates.count > 0
puts "for #{key}: "
puts cache.to_s
puts
end
end
}
dump_compression_stats = -> {
puts "Total content length: #{total_contents_length}"
puts "Total content stored: #{total_contents_stored}"
puts "Compression factor: #{(total_contents_stored / total_contents_length.to_f).round(3)}"
}
last_time = Time.now
last_num_inserted = num_inserted
last_model = nil
dump_timing_stats = -> {
now = Time.now
duration = now - last_time
last_time = now
rate = (num_inserted - last_num_inserted) / duration
last_num_inserted = num_inserted
puts "Inserted #{num_inserted} - #{rate.round(2)}/sec (last id: #{last_model&.id})"
}
lhle_query.find_in_batches(batch_size: 100, start: start_at) do |lhles|
lhles.each do |lhle|
next unless lhle.id % num_shards == this_shard
last_model = lhle
tries = 0
blob_entry = begin
ReduxApplicationRecord.transaction do
create_for_lhle(lhle, candidate_blob_entry_cache)
end
rescue
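# transient failure: retry the whole transaction up to two more times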
if tries < 2
tries += 1
retry
else
raise
end
end
next if blob_entry.nil?
num_inserted += 1
total_contents_length += blob_entry.contents.size
total_contents_stored += blob_entry.contents_stored_size
if num_inserted % 100 == 0
puts "-" * 40
dump_timing_stats.call
dump_compression_stats.call
end
if num_inserted % 1000 == 0
puts "-" * 40
dump_cache_stats.call
end
end
end
duration = Time.now - first_start
rate = num_inserted / duration
puts "-" * 40
puts "Done, last id inserted: #{last_model&.id}, " +
"total inserted: #{num_inserted}, " +
"#{duration.round(0)} seconds, " +
"#{rate.round(2)}/second"
if do_profile
FileUtils.mkdir_p("profiler")
result = RubyProf.stop
File.open("profiler/migrate_legacy_http_entries.txt", "w") do |f|
RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
end
File.open("profiler/migrate_legacy_http_entries.html", "w") do |f|
RubyProf::CallStackPrinter.new(result).print(f, { min_percent: 1 })
end
end
end
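# Example invocation (a sketch; shard indices run 0..num_shards-1, matching
# the `lhle.id % num_shards == this_shard` filter above):
#
#   bin/rake "migrate_legacy_http_entries[0,4,1]"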
def create_for_lhle(lhle, all_blobs_cache)
content_type = lhle.content_type
cache_key = "#{content_type}|#{lhle.host}"
all_blobs_cache[cache_key] ||= BlobCache.new
blob_cache = all_blobs_cache[cache_key]
contents = lhle.response_body
if contents.nil?
return nil
end
if ::HttpLogEntry.where(id: lhle.id).select(:id).first
return nil
end
request_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.req_headers)
response_headers = ::HttpLogEntryHeader.find_or_create(headers: lhle.res_headers)
# Roughly 5% of the time (rand(0..100) < 5) we skip supplying candidates to
# new blob creation, so the candidate pool doesn't go stale.
candidates = if blob_cache.at_capacity? && rand(0..100) >= 5
blob_cache.candidates
else
[]
end
blob_entry = ::BlobEntry.build_entry(
content_type: content_type,
contents: contents,
candidates: candidates,
)
blob_entry.save!
if blob_entry.base
blob_cache.reward(blob_entry.base)
else
blob_cache.insert(blob_entry, extra: lhle.full_path)
end
log_entry = ::HttpLogEntry.new(
id: lhle.id,
scheme: lhle.scheme,
host: lhle.host,
path: lhle.path,
query: lhle.query,
verb: lhle.verb,
content_type: content_type,
status_code: lhle.status,
response_time_ms: lhle.response_time,
request_headers: request_headers,
response_headers: response_headers,
blob_entry: blob_entry,
requested_at: lhle.requested_at,
created_at: lhle.created_at,
updated_at: lhle.updated_at,
)
log_entry.save!
contents_length = blob_entry.contents.size
contents_stored = blob_entry.contents_stored_size
# puts "inserted: #{log_entry.id} - #{log_entry.full_path} " +
# "(#{(contents_stored / contents_length.to_f).round(2)} compressed)"
blob_entry
end
class BlobCache
COUNT = 0
OBJ = 1
EXTRA = 2
def initialize(max_size = 32)
@max_size = max_size
@candidates = []
end
def at_capacity?
@candidates.count == @max_size
end
def candidates
@candidates.map { |c| c[OBJ] }
end
def reward(candidate)
@candidates.each do |entry|
if entry[OBJ].id == candidate.id
entry[COUNT] += 1.0
else
entry[COUNT] -= 0.1
end
end
sort!
end
def insert(candidate, extra: nil)
new_entry = [0.0, candidate, extra]
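# @candidates stays sorted by descending score (see sort!), so the first
# entry with a non-positive score is where a fresh 0.0-score entry belongs.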
idx = @candidates.bsearch_index { |entry| entry[COUNT] <= 0 }
if idx == nil
@candidates.push(new_entry)
else
@candidates.insert(idx, new_entry)
end
while @candidates.size > @max_size
@candidates.pop
end
end
def to_s
@candidates.map do |entry|
" - #{entry[COUNT].round(1)} score, id #{entry[OBJ].id} (#{entry[EXTRA]})"
end.join("\n")
end
private
def sort!
@candidates.sort_by! { |entry| -entry[COUNT] }
end
end
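# Usage sketch for BlobCache (FakeBlob is a hypothetical stand-in; reward and
# insert only need an object that responds to #id):
#
#   FakeBlob = Struct.new(:id)
#   cache = BlobCache.new(2)
#   cache.insert(FakeBlob.new(1), extra: "/a")
#   cache.insert(FakeBlob.new(2), extra: "/b")
#   cache.reward(FakeBlob.new(1)) # id 1 -> 1.0, id 2 decays to -0.1
#   cache.insert(FakeBlob.new(3)) # sorts in at 0.0 ahead of id 2, which is evicted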
task :insert_sst_entry => [:environment] do
start = Time.now
first_start = start

app/models/blob_entry.rb Normal file (65 lines changed)

@@ -0,0 +1,65 @@
class BlobEntry < ReduxApplicationRecord
belongs_to :base,
optional: true,
class_name: "::BlobEntry"
validates_presence_of(
:sha256,
:content_type,
:contents,
:size
)
def contents
contents_raw = self.read_attribute(:contents)
if self.base
@contents_base_patched ||= XDiff.patch(self.base.contents, contents_raw)
else
contents_raw
end
end
def contents_stored_size
self.read_attribute(:contents).size
end
def self.build_entry(content_type:, contents:, candidates: [])
sha256 = Digest::SHA256.digest(contents)
existing = BlobEntry.find_by(sha256: sha256, content_type: content_type)
return existing unless existing.nil?
record = BlobEntry.new(sha256: sha256, content_type: content_type, size: contents.size)
smallest_patch_size = nil
smallest_patch = nil
smallest_candidate = nil
candidates.map do |candidate|
# only consider candidates with the same content type (may relax this later)
next if candidate.content_type != content_type
# only consider candidates who themselves aren't patch-based
next unless candidate.base.nil?
[candidate, XDiff.diff(candidate.contents, contents)]
end.compact.each do |pair|
candidate, patch = pair
if smallest_patch_size.nil? || patch.size < smallest_patch_size
smallest_patch_size = patch.size
smallest_patch = patch
smallest_candidate = candidate
end
end
# only use a patch if it's <= 60% the original content size
if smallest_patch_size && smallest_patch_size <= (contents.size * 0.6)
record.base = smallest_candidate
record.contents = smallest_patch
else
# no candidate present, store the whole contents directly in the record
record.contents = contents
end
raise("invariant violated: reconstructed contents don't match original") if record.contents != contents
record
end
end
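A sketch of the delta-encoding flow, assuming the XDiff.diff/XDiff.patch API used above (values are illustrative; for small inputs the 60% rule usually falls back to storing contents directly):

  base = BlobEntry.build_entry(content_type: "text/html", contents: "<p>hello</p>" * 500)
  base.save!
  patched = BlobEntry.build_entry(
    content_type: "text/html",
    contents: ("<p>hello</p>" * 500) + "<p>bye</p>",
    candidates: [base],
  )
  patched.save!
  patched.base     # => base, when the xdiff patch is <= 60% of the raw size
  patched.contents # => the full new contents, rebuilt via XDiff.patch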

app/models/http_log_entry.rb Normal file

@@ -0,0 +1,24 @@
class HttpLogEntry < ReduxApplicationRecord
enum verb: %i[get post]
belongs_to :blob_entry, class_name: "::BlobEntry"
belongs_to :request_headers,
class_name: "::HttpLogEntryHeader"
belongs_to :response_headers,
class_name: "::HttpLogEntryHeader"
validates_inclusion_of(:verb, in: ::HttpLogEntry.verbs.keys)
validates_presence_of(
:scheme,
:host,
:path,
:status_code,
:response_time_ms,
:content_type,
:requested_at
)
def full_path
"#{scheme}://#{host}#{path}#{query ? "?#{query}" : ""}"
end
end
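For reference, full_path composes the request URL (hypothetical values):

  HttpLogEntry.new(scheme: "https", host: "example.com", path: "/a", query: "b=1").full_path
  # => "https://example.com/a?b=1"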

app/models/http_log_entry_header.rb Normal file

@@ -0,0 +1,21 @@
class HttpLogEntryHeader < ReduxApplicationRecord
validates_presence_of(
:sha256,
:headers
)
def self.find_or_create(headers:)
raise("must be a hash") unless headers.is_a?(Hash)
headers = headers.dup
headers.delete("date")
headers.delete("expires")
headers.delete("cf-ray")
headers = headers.sort.to_h
sha256 = Digest::SHA256.digest(headers.to_s)
HttpLogEntryHeader.find_or_create_by!(sha256: sha256) do |c|
c.headers = headers
end
end
end
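A sketch of the dedup behavior: volatile headers ("date", "expires", "cf-ray") are dropped and keys sorted before hashing, so both calls below return the same row (header values hypothetical):

  a = HttpLogEntryHeader.find_or_create(headers: { "server" => "nginx", "date" => "Wed, 01 Feb 2023 04:10:49 GMT" })
  b = HttpLogEntryHeader.find_or_create(headers: { "date" => "Thu, 02 Feb 2023 00:00:00 GMT", "server" => "nginx" })
  a.id == b.id # => true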

app/models/legacy/blob_entry.rb Normal file

@@ -0,0 +1,125 @@
# frozen_string_literal: true
# == Schema Information
#
# Table name: blob_entries
#
# id :integer not null, primary key
# created_at :datetime
# updated_at :datetime
# file_size :integer
# refcount :integer
# sha256 :string(64)
# dir_depth :integer default(2), not null
#
require "digest"
class Legacy::BlobEntry < LegacyApplicationRecord
self.table_name = "blob_entries"
validates_presence_of :sha256
validates_presence_of :refcount
validates_presence_of :file_size
validates_uniqueness_of :sha256
validates_length_of :sha256, is: 64
validates_presence_of :dir_depth
before_validation do
self.dir_depth ||= 2
self.file_size = File.size(file_path)
self.refcount ||= 0
end
def file_relative_path
sha256 || raise
dir_depth || raise
self.class.file_path_at_depth(sha256: sha256, depth: dir_depth)
end
def ensure_file_path
sha256 || raise
dir_depth || raise
unless File.exist?(file_path)
found = false
(2..5).each do |depth|
path = File.join(
Legacy::SConfig.blob_static_dir,
self.class.file_path_at_depth(sha256: sha256, depth: depth)
)
next unless File.exist?(path)
self.dir_depth = depth
save!
found = true
Legacy::SConfig.logger.warn("found fixed path at #{depth} for BE id #{id}")
break
end
return nil unless found
end
file_path
end
def self.file_path_at_depth(sha256:, depth:, stride: 1, hash_length: 64)
# generate something like sha256[0]/sha256[1]/sha256
raise("invalid sha256: #{sha256}") unless sha256.length == hash_length
parts = (0...depth).map do |idx|
sha256[(idx * stride)...((idx + 1) * stride)]
end + [sha256]
File.join(*parts)
end
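# Worked example (assuming `digest` is a full 64-char hex digest starting "ab3f"):
#   file_path_at_depth(sha256: digest, depth: 3) # => "a/b/3/#{digest}"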
def file_path
File.join Legacy::SConfig.blob_static_dir, file_relative_path
end
def inc_refcount
BlobEntry.increment_counter(:refcount, id)
end
def dec_refcount
BlobEntry.decrement_counter(:refcount, id)
end
def self.create_from_blob(blob:, opts: {})
sha256 = Digest::SHA256.hexdigest blob
write_out = lambda { |be, _contents|
dir = File.dirname be.file_path
FileUtils.mkdir_p dir
f = File.open(be.file_path, "wb")
begin
f.write(blob)
f.fsync
unless File.exist?(be.file_path)
raise("error ensuring blob exists for #{be.id}")
end
ensure
f.close
end
}
be = nil
BlobEntry.transaction do
be = BlobEntry.find_by(sha256: sha256)
if be && !be.ensure_file_path
# correct directory depth as well
Legacy::SConfig.logger.warn("file doesn't exist for #{be.id}, writing again...")
write_out.call(be, blob)
elsif !be
new_be = BlobEntry.new(opts.merge(sha256: sha256))
write_out.call(new_be, blob)
new_be.save!
be = new_be
end
end
be
end
end
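The call site in Legacy::HttpLogEntry#set_response_body (below) uses this roughly as in this sketch (path hypothetical):

  be = Legacy::BlobEntry.create_from_blob(blob: File.binread("page.html"), opts: { dir_depth: 4 })
  be.inc_refcount # callers track liveness via the refcount column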

app/models/legacy/http_log_entry.rb Normal file

@@ -0,0 +1,444 @@
# frozen_string_literal: true
# == Schema Information
#
# Table name: cache_http_log_entries
#
# id :integer not null, primary key
# scheme :string
# host :string
# path :string
# query :string
# verb :integer
# status :integer
# response_time :integer
# content_type :string
# response_size :integer
# parent_log_entry_id :integer
# blob_entry_id :integer
# gzipped :boolean
# requested_at :datetime
# created_at :datetime not null
# updated_at :datetime not null
# resp_body :binary
# imported_from_file :string
# req_headers_id :integer
# res_headers_id :integer
# diff_type :integer default(0)
#
require "zlib"
require "stringio"
module Diffy
class Diff
def tempfile(string)
t = Tempfile.new("diffy")
# ensure tempfiles aren't unlinked when GC runs by maintaining a
# reference to them.
@tempfiles ||= []
@tempfiles.push(t)
t.binmode
t.print(string)
t.flush
t.close
t.path
end
end
end
class Legacy::HttpLogEntry < LegacyApplicationRecord
self.table_name = "cache_http_log_entries"
# threshold or less: store in database directly
KEEP_INTERNALLY_THRESHOLD = 1024 * 64
belongs_to :parent_log_entry, class_name: "Legacy::HttpLogEntry"
belongs_to :blob_entry, class_name: "Legacy::BlobEntry"
validates_presence_of(
:scheme,
:host,
:path,
:status,
:response_time,
:content_type,
:response_size
)
enum verb: %i[get post]
validates_inclusion_of :verb, in: Legacy::HttpLogEntry.verbs.keys
# text: use Diffy diffing
# binary: use BSDiff
# native: use the native LogStore server to store the entry
enum diff_type: %i[text binary native]
validates_inclusion_of :diff_type, in: Legacy::HttpLogEntry.diff_types.keys
after_initialize do
self.diff_type = "native" if new_record?
end
# out of line req/response headers
belongs_to :req_headers_ref,
foreign_key: :req_headers_id, class_name: "Legacy::HttpLogEntryHeader"
belongs_to :res_headers_ref,
foreign_key: :res_headers_id, class_name: "Legacy::HttpLogEntryHeader"
attr_accessor :can_force_update
before_update do
if can_force_update
true
else
raise("HttpLogEntry is immutable!")
false
end
end
private
def set_header_impl(method, headers)
headers.delete("date")
headers.delete("expires")
headers.delete("cf-ray")
send(
"#{method}=",
Legacy::HttpLogEntryHeader.find_or_create(headers: headers)
)
end
public
def req_headers=(headers)
set_header_impl(:req_headers_ref, headers)
end
def res_headers=(headers)
set_header_impl(:res_headers_ref, headers)
end
private
def get_header_impl(method)
ref = send("#{method}_ref")
if ref
ref.headers
else
self.class.superclass.instance_method(method).bind(self).call
end
end
public
def req_headers
ref = req_headers_ref
if ref
ref.headers
else
{}
end
end
def res_headers
ref = res_headers_ref
if ref
ref.headers
else
{}
end
end
before_create do
self.requested_at ||= DateTime.now
end
def body_stored?
!!(blob_entry_id || parent_log_entry_id || resp_body)
end
def response_body=(body_string)
if diff_type == "native"
set_response_body_native(body_string, {})
else
set_response_body(body_string)
end
end
def full_path
"#{scheme}://#{host}#{path}#{query ? "?#{query}" : ""}"
end
def set_response_body_native(body_string, opts = {})
raise("legacy can't write")
# try and find a good HttpLogEntry to diff this against
candidate_keys = if !opts[:skip_find_candidates]
Legacy::HttpLogEntry.where(
host: host,
path: path,
diff_type: self.class.diff_types["native"],
).limit(5).to_a.map(&:resp_body).reject(&:nil?).reject(&:empty?)
else
[]
end
SConfig.with_log_store_client do |lsc|
ret = lsc.put_binary(
LogStore::PutBinaryArgs.new(hint_hashes: candidate_keys, contents: body_string)
)
self.resp_body = ret.key
end
body_string
end
# string ->
def set_response_body(body_string, opts = {})
return set_response_body_native(body_string, opts) if diff_type == "native"
# try and find a good HttpLogEntry to diff this against
candidate_entries = Legacy::HttpLogEntry.where(
host: host,
path: path,
parent_log_entry_id: nil,
).limit(3).to_a
# add or remove trailing slash to each of the paths
hint_paths = opts[:similar_content_path_hints] || []
hint_paths += hint_paths.map do |p|
if p == "/"
p
elsif p[-1] == "/"
p[0..-2]
else
p + "/"
end
end
body_string = body_string.force_encoding("UTF-8")
good_ce = nil
use_string = body_string
gzipped = false
if body_string.valid_encoding?
if hint_paths.any?
candidate_entries += Legacy::HttpLogEntry.where(
host: host,
path: hint_paths,
parent_log_entry_id: nil,
).limit(50).to_a
end
SConfig.logger.info("Comparing against #{candidate_entries.length} " \
"candidates: #{candidate_entries.map(&:path).join(", ")}")
candidate_entries.each do |ce|
SConfig.logger.info "Comparing diff against HLE (#{ce.id}: #{ce.path})"
ce_body = ce.response_body
if !ce_body || (!ce_body.valid_encoding? && diff_type == "text")
SConfig.logger.info "HLE #{ce.id} has invalid encoded response body"
next
end
ce_diff = self.class.get_diff(ce_body, body_string, diff_type)
if (diff_type == "text") && (/^Binary files .+ and .+ differ/ =~ ce_diff)
SConfig.logger.warn("diff detected HLE #{ce.id} was a binary, skipping...")
next
end
# verify we can reconstruct the original body string
if self.class.apply_patch(ce_body, ce_diff, diff_type) != body_string
SConfig.logger.error("couldn't succesfully apply patch to get orig...")
next
end
gzipped_diff = self.class.gzip(ce_diff)
ce_use_string = nil
ce_gzipped = nil
if gzipped_diff.length < ce_diff.length
ce_gzipped = true
ce_use_string = gzipped_diff
else
ce_gzipped = false
ce_use_string = ce_diff
end
# the current best encoding is already smaller than this candidate's; skip it
if use_string.length < ce_use_string.length
SConfig.logger.info(
"Previous config was still smaller (#{use_string.length} vs" \
" #{ce_use_string.length} bytes)"
)
next
else
SConfig.logger.info(
"HLE (#{ce.id}) is good candidate: #{ce_use_string.length} bytes " \
"(gz: #{ce_gzipped})"
)
end
good_ce = ce
gzipped = ce_gzipped
use_string = ce_use_string
end
else
SConfig.logger.error("Invalid encoding detected, not storing diff")
end
self.parent_log_entry = good_ce # or nil, if none found
self.gzipped = gzipped
if use_string.length < self.class::KEEP_INTERNALLY_THRESHOLD
self.resp_body = use_string
SConfig.logger.info "Storing data interally"
else
self.blob_entry = BlobEntry.create_from_blob(blob: use_string, opts: { dir_depth: 4 })
blob_entry.inc_refcount
SConfig.logger.info "Storing data in blob entry #{blob_entry.id}..."
end
if response_body != body_string
raise("internal error, response_body != body_string")
end
stored_bytes = use_string.length
total_bytes = body_string.length
SConfig.logger.info(
"Stored #{stored_bytes}/#{total_bytes} bytes" \
" (#{(stored_bytes.to_f / total_bytes.to_f * 100.0).round(1)}\% of original)"
)
response_body
rescue StandardError
blob_entry && blob_entry.dec_refcount
raise
end
class NoBEPathException < RuntimeError
end
# -> string
def response_body
return response_body_native if diff_type == "native"
our_string = if blob_entry
path = blob_entry.ensure_file_path
unless path
raise NoBEPathException, "no path for blob entry " \
"#{blob_entry_id} (HLE id: #{id}) (#{blob_entry.file_path})"
end
File.read(path)
else
resp_body
end
our_string = self.class.gunzip(our_string) if gzipped
return nil if our_string.nil?
# our_string = our_string.force_encoding("UTF-8")
if parent_log_entry
self.class.apply_patch(parent_log_entry.response_body, our_string, diff_type)
else
our_string
end
end
def response_body_native
raise unless diff_type == "native"
return "" unless resp_body
# old:
# ret = nil
# SConfig.with_log_store_client do |lsc|
# args = LogStore::GetBinaryArgs.new(key: resp_body)
# ret = lsc.get_binary(args).contents
# end
# ret
# new:
@response_body_native ||=
::LogStoreSstEntry.find_by_hex_key(resp_body).patched_value
end
def self.encode_str(str)
str.encode(Encoding.find("UTF-8"), invalid: :replace, undef: :replace, replace: "")
end
def self.gunzip(data)
io = StringIO.new(data, "rb")
Zlib::GzipReader.new(io).read
end
def self.gzip(string)
wio = StringIO.new(+"", "wb") # unfrozen, writable buffer; passing "w" as the buffer was a bug under frozen_string_literal
w_gz = Zlib::GzipWriter.new(wio)
w_gz.write(string)
w_gz.close
wio.string
end
def self.get_diff(old_bytes, new_bytes, diff_type)
if diff_type == "text"
return Diffy::Diff.new(old_bytes, new_bytes, diff: "-e").to_s
end
raise("unknown diff type '#{diff_type}'") if diff_type != "binary"
tf_old = Tempfile.new("old-file")
tf_new = Tempfile.new("new-file")
tf_out = Tempfile.new("patch")
files = [tf_old, tf_new, tf_out]
begin
files.each(&:binmode)
tf_old.write(old_bytes)
tf_new.write(new_bytes)
files.each(&:close)
if BSDiff.diff(tf_old.path, tf_new.path, tf_out.path)
tf_out.open
bytes = tf_out.read
tf_out.close
return bytes
else
return nil
end
ensure
files.each(&:unlink)
end
end
def self.apply_patch(old_text, patch, diff_type)
tf_orig = Tempfile.new("apply-patch", encoding: "ascii-8bit")
tf_patch = Tempfile.new("apply-patch", encoding: "ascii-8bit")
tf_out = Tempfile.new("applied-patch", encoding: "ascii-8bit")
files = [tf_orig, tf_patch, tf_out]
begin
tf_out.close
tf_orig.write(old_text)
tf_patch.write(patch)
tf_orig.close
tf_patch.close
if diff_type == "text"
`patch -e #{tf_orig.path} #{tf_patch.path} -o #{tf_out.path}`
tf_out.open
ret = tf_out.read
tf_out.close
ret
elsif diff_type == "binary"
if BSDiff.patch(tf_orig.path, tf_out.path, tf_patch.path)
tf_out.open
ret = tf_out.read
tf_out.close
ret
end
else
raise("invalid diff type #{diff_type}")
end
ensure
files.each(&:unlink)
end
end
end
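A round-trip sketch of the storage helpers above (assumes the BSDiff module from the rb-bsdiff gem, as used in get_diff/apply_patch):

  gz = Legacy::HttpLogEntry.gzip("hello " * 1000)
  Legacy::HttpLogEntry.gunzip(gz) # => "hello hello ..."

  old_body = "v1 " * 100
  new_body = ("v1 " * 99) + "v2 "
  patch = Legacy::HttpLogEntry.get_diff(old_body, new_body, "binary")
  Legacy::HttpLogEntry.apply_patch(old_body, patch, "binary") # => new_body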

app/models/legacy/http_log_entry_header.rb Normal file

@@ -0,0 +1,25 @@
# frozen_string_literal: true
# == Schema Information
#
# Table name: cache_http_log_entry_headers
#
# id :integer not null, primary key
# headers :hstore not null
# sha256 :binary not null
# created_at :datetime
# updated_at :datetime
#
class Legacy::HttpLogEntryHeader < LegacyApplicationRecord
self.table_name = "cache_http_log_entry_headers"
def self.find_or_create(headers:)
temp = Legacy::HttpLogEntryHeader.new(headers: headers)
sha256 = Digest::SHA256.digest(temp.headers.to_s)
Legacy::HttpLogEntryHeader.find_or_create_by!(sha256: sha256) do |c|
c.headers = temp.headers
end
end
end

app/models/legacy_application_record.rb Normal file

@@ -0,0 +1,94 @@
class LegacyApplicationRecord < ActiveRecord::Base
# self.primary_abstract_class = true
self.abstract_class = true
connects_to database: { writing: :legacy, reading: :legacy }
end
module Legacy
end
class Legacy::SConfig
def self.data_dir
"/home/scraper/scraper_data_original/scraper_data"
end
def self.blob_data_dir
File.join data_dir, "blobs"
end
def self.e621_data_dir
File.join data_dir, "e621"
end
def self.fa_data_dir
File.join data_dir, "fa"
end
def self.ib_data_dir
File.join data_dir, "ib"
end
def self.blob_static_dir
File.join blob_data_dir, "static"
end
def self.e621_static_dir
File.join e621_data_dir, "static"
end
def self.fa_post_static_dir
File.join fa_data_dir, "static/posts"
end
def self.fa_icons_static_dir
File.join fa_data_dir, "static/icons"
end
def self.ib_post_static_dir
File.join ib_data_dir, "static/posts"
end
def self.e621_json_dir
File.join e621_data_dir, "json"
end
def self.fa_html_dir
File.join fa_data_dir, "html"
end
def self.fa_cookie_jar_dir
File.join fa_data_dir, "cookies"
end
def self.ib_logs_dir
File.join ib_data_dir, "logs"
end
def self.ib_cookie_jar_dir
File.join ib_data_dir, "cookies"
end
def self.http_logger_data_dir
File.join data_dir, "http_logger"
end
def self.logger
@@logger ||= begin
l = Logger.new(STDOUT)
l.level = Logger::INFO
l.datetime_format = "%Y-%m-%d %H:%M:%S"
l.formatter = proc do |sev, datetime, _prog, msg|
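# assumes a String color helper (e.g. the colorize gem) providing #blue, #red, #yellow, #white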
color = case sev
when "INFO" then :blue
when "ERROR" then :red
when "DEBUG" then :yellow
else :white
end
date_format = datetime.strftime("%Y-%m-%d %H:%M:%S")
"[#{date_format}] #{sev.ljust(5).send(color)}: #{msg}\n"
end
l
end
end
end

app/models/redux_application_record.rb

@@ -1,4 +1,5 @@
class ReduxApplicationRecord < ActiveRecord::Base
primary_abstract_class
# self.primary_abstract_class = true
self.abstract_class = true
connects_to database: { writing: :redux, reading: :redux }
end

app/views/layouts/application.html.erb

@@ -1,7 +1,7 @@
<!DOCTYPE html>
<html>
<head>
<title>LegacyExplorer</title>
<title>ReduxScraper</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<%= csrf_meta_tags %>
<%= csp_meta_tag %>

config/application.rb

@@ -6,10 +6,11 @@ require "rails/all"
# you've limited to :test, :development, or :production.
Bundler.require(*Rails.groups)
module LegacyExplorer
module ReduxScraper
class Application < Rails::Application
# Initialize configuration defaults for originally generated Rails version.
config.load_defaults 7.0
config.active_record.legacy_connection_handling = false
# Configuration for the application, engines, and railties goes here.
#

config/database.yml

@@ -23,8 +23,8 @@ legacy_prod: &legacy_prod
host: scraper-postgres.local
port: 5432
database: site_scraper_prod
username: scraper
password: zL7zDRXycLhLFJLQj5Zh
username: scraper_redux
password: pdkFLqRmQwPUPaDDC4pX
database_tasks: false
development:

db/migrate/20230131012417_create_http_log_entries.rb Normal file

@@ -0,0 +1,47 @@
class CreateHttpLogEntries < ActiveRecord::Migration[7.0]
def change
enable_extension "hstore"
create_table :http_log_entries do |t|
t.string :scheme
t.string :host
t.string :path
t.string :query
t.integer :verb
# request/response headers
t.references :request_headers
t.references :response_headers
t.integer :status_code
t.integer :response_time_ms
t.string :content_type
t.references :blob_entry
t.datetime :requested_at
t.timestamps
t.index [:host, :path]
t.index [:content_type]
end
create_table :http_log_entry_headers do |t|
t.binary :sha256, null: false
t.hstore :headers, null: false
t.index :sha256, unique: true
t.timestamps
end
create_table :blob_entries do |t|
t.binary :sha256
t.references :base, index: false
t.string :content_type
t.integer :size
t.binary :contents
t.timestamps
t.index :sha256, unique: true
end
end
end

db/schema.rb generated (46 lines changed)

@@ -10,13 +10,57 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.0].define(version: 2023_01_29_050320) do
ActiveRecord::Schema[7.0].define(version: 2023_01_31_012417) do
# These are extensions that must be enabled in order to support this database
enable_extension "hstore"
enable_extension "pg_stat_statements"
enable_extension "plpgsql"
create_table "blob_entries", force: :cascade do |t|
t.binary "sha256"
t.bigint "base_id"
t.string "content_type"
t.integer "size"
t.binary "contents"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["sha256"], name: "index_blob_entries_on_sha256", unique: true
end
create_table "http_log_entries", force: :cascade do |t|
t.string "scheme"
t.string "host"
t.string "path"
t.string "query"
t.integer "verb"
t.bigint "request_headers_id"
t.bigint "response_headers_id"
t.integer "status_code"
t.integer "response_time_ms"
t.string "content_type"
t.bigint "blob_entry_id"
t.datetime "requested_at"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["blob_entry_id"], name: "index_http_log_entries_on_blob_entry_id"
t.index ["content_type"], name: "index_http_log_entries_on_content_type"
t.index ["host", "path"], name: "index_http_log_entries_on_host_and_path"
t.index ["request_headers_id"], name: "index_http_log_entries_on_request_headers_id"
t.index ["response_headers_id"], name: "index_http_log_entries_on_response_headers_id"
end
create_table "http_log_entry_headers", force: :cascade do |t|
t.binary "sha256", null: false
t.hstore "headers", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["sha256"], name: "index_http_log_entry_headers_on_sha256", unique: true
end
create_table "log_store_sst_entries", id: false, force: :cascade do |t|
t.binary "key", null: false
t.binary "value", null: false
t.index ["key"], name: "index_log_store_sst_entries_on_key", unique: true
end
end

start_migration.fish Executable file (20 lines changed)

@@ -0,0 +1,20 @@
#!/usr/bin/env fish
set SESSION migration
set NUM_SHARDS $argv[1]
set START_AT $argv[2]
tmux new-session -d -s $SESSION
for i in (seq 2 $NUM_SHARDS)
tmux select-layout tiled
tmux split-window -t $SESSION:0.0 -h -d
end
tmux select-layout tiled
for i in (seq 0 (math $NUM_SHARDS - 1))
tmux send-keys -t $SESSION:0.$i "bin/rake 'migrate_legacy_http_entries[$START_AT,$NUM_SHARDS,$i]'" C-m
end
tmux attach-session -t $SESSION
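Example (a sketch): running ./start_migration.fish 4 0 opens a four-pane tmux session named "migration", one shard per pane, each invoking the migrate_legacy_http_entries task starting at legacy id 0.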

test/fixtures/blob_entries.yml vendored Normal file (11 lines changed)

@@ -0,0 +1,11 @@
# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html
# This model initially had no columns defined. If you add columns to the
# model remove the "{}" from the fixture names and add the columns immediately
# below each fixture, per the syntax in the comments below
#
one: {}
# column: value
#
two: {}
# column: value

test/fixtures/http_log_entries.yml vendored Normal file (11 lines changed)

@@ -0,0 +1,11 @@
# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html
# This model initially had no columns defined. If you add columns to the
# model remove the "{}" from the fixture names and add the columns immediately
# below each fixture, per the syntax in the comments below
#
one: {}
# column: value
#
two: {}
# column: value

test/fixtures/http_log_entry_headers.yml vendored Normal file

@@ -0,0 +1,11 @@
# Read about fixtures at https://api.rubyonrails.org/classes/ActiveRecord/FixtureSet.html
# This model initially had no columns defined. If you add columns to the
# model remove the "{}" from the fixture names and add the columns immediately
# below each fixture, per the syntax in the comments below
#
one: {}
# column: value
#
two: {}
# column: value

test/models/blob_entry_test.rb Normal file

@@ -0,0 +1,7 @@
require "test_helper"
class BlobEntryTest < ActiveSupport::TestCase
# test "the truth" do
# assert true
# end
end

test/models/http_log_entry_header_test.rb Normal file

@@ -0,0 +1,7 @@
require "test_helper"
class HttpLogEntryHeaderTest < ActiveSupport::TestCase
# test "the truth" do
# assert true
# end
end

test/models/http_log_entry_test.rb Normal file

@@ -0,0 +1,7 @@
require "test_helper"
class HttpLogEntryTest < ActiveSupport::TestCase
# test "the truth" do
# assert true
# end
end