FA importer job refactors and bug fixes

This commit is contained in:
2023-02-12 17:29:25 -08:00
parent f24e361e34
commit 9e0f0798aa
10 changed files with 218 additions and 81 deletions

View File

@@ -83,6 +83,7 @@ gem "diffy"
gem "rb-bsdiff", path: "../rb-bsdiff"
gem "ruby-prof"
gem "ruby-prof-speedscope"
gem "table_print"
gem "addressable"
# gem "concurrent-ruby-ext", require: "concurrent"

View File

@@ -190,6 +190,8 @@ GEM
io-console (~> 0.5)
rexml (3.2.5)
ruby-prof (1.4.5)
ruby-prof-speedscope (0.3.0)
ruby-prof (~> 1.0)
rubyzip (2.3.2)
selenium-webdriver (4.8.0)
rexml (~> 3.2, >= 3.2.5)
@@ -250,6 +252,7 @@ DEPENDENCIES
rails (~> 7.0.4, >= 7.0.4.2)
rb-bsdiff!
ruby-prof
ruby-prof-speedscope
selenium-webdriver
sprockets-rails
sqlite3 (~> 1.4)

View File

@@ -46,6 +46,14 @@ class ForkFuture
end.to_a.map(&:join)
end
# Splits +enumerator+ into slices (one per worker, via each_slice_impl) and
# runs the block over each slice in its own forked child process.
# Returns the joined per-slice results as an array.
def self.parallel_map_slice(num_processes, enumerator, &block)
  futures = ForkFuture.each_slice_impl(num_processes, enumerator).map do |slice|
    ForkFuture.new { block.call(slice) }
  end
  futures.to_a.map(&:join)
end
def join
wait!
r = @result[:result]

View File

@@ -0,0 +1,30 @@
require "fileutils"

# Abstract base class for one-shot bulk legacy-import jobs.
# Subclasses must implement:
#   #name     - short snake_case identifier used in progress/profile paths
#   #profile? - whether to wrap the run in a RubyProf profile
class LegacyImport::BulkImportJob
  # Job identifier; abstract - subclasses must override.
  def name
    raise("implement #name")
  end

  # Persist a resume marker (e.g. the last imported id) to
  # tmp/<name>_progress so an interrupted run can be restarted by hand.
  def write_progress(progress)
    File.write("tmp/#{name}_progress", progress.to_s)
  end

  # Start collecting a RubyProf profile if the subclass opted in.
  def start_profiling!
    RubyProf.start if profile?
  end

  # Stop profiling and write graph / call-stack / speedscope reports under
  # profiler/<name>/. No-op when #profile? is false.
  def end_profiling!
    return unless profile?

    base = "profiler/#{name}"
    # BUG FIX: Ruby's Dir has no .mkdir_p method - the original
    # `Dir.mkdir_p(base)` raised NoMethodError as soon as profiling was
    # enabled. FileUtils.mkdir_p is the correct API, and it is idempotent,
    # so the `unless File.exist?(base)` guard is unnecessary.
    FileUtils.mkdir_p(base)

    result = RubyProf.stop
    File.open("#{base}/profile.txt", "w") do |f|
      RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
    end
    File.open("#{base}/profile.html", "w") do |f|
      RubyProf::CallStackPrinter.new(result).print(f, { min_percent: 1 })
    end
    # NOTE(review): SpeedscopePrinter emits speedscope JSON; the
    # ".rubyprof" extension is a little misleading but kept for
    # compatibility with whatever consumes these files.
    File.open("#{base}/profile.rubyprof", "w") do |f|
      RubyProf::SpeedscopePrinter.new(result).print(f, { min_percent: 1 })
    end
  end
end

View File

@@ -0,0 +1,89 @@
# Imports legacy FA posts into the new Domain::Fa::Post table in batches,
# optionally fanning each batch out across forked worker processes.
# Inherits progress-file and profiling plumbing from BulkImportJob.
class LegacyImport::FaPostImporter < LegacyImport::BulkImportJob
  # All keywords are required but nil-able; nil falls back to the defaults
  # below (32 forks, batch of forks*32 rows, start at id 0).
  def initialize(batch_size:, forks:, start_at:)
    @forks = forks || 32
    @batch_size = batch_size || @forks * 32
    @start_at = start_at || 0
    @start_time = Time.now
  end

  # Identifier used for tmp/<name>_progress and profiler/<name>/ paths.
  def name
    "fa_post_importer"
  end

  # Profiling disabled for this job (see BulkImportJob#start_profiling!).
  def profile?
    false
  end

  # Main entry point: walk the legacy posts table in primary-key batches,
  # import each batch (forked or inline), and report throughput after each.
  def run
    start_profiling!
    progress = 0
    query = ::Legacy::Fa::Post.includes(:blob_entry, :description_ref)
    query.find_in_batches(start: @start_at, batch_size: @batch_size) do |batch|
      # try to preload the blob entry file associated with the post
      # NOTE(review): this whole preload pass is dead code - the trailing
      # `if false` disables it. Presumably an experiment to warm the page
      # cache before the forked workers read the files; delete or re-enable.
      ForkFuture.each_slice_impl(32, batch).map do |smaller_batch|
        Thread.new do
          smaller_batch.each do |legacy_fa_post|
            next if legacy_fa_post.blob_entry.nil?
            path = legacy_fa_post.blob_entry.ensure_file_path
            next unless path && File.exist?(path)
            # try to read the file and then throw it away - we're just preloading it
            IO.binread(path)
            nil
          rescue
            puts "error preloading fa post #{legacy_fa_post.id} file: #{$!}"
          end
        end
      end.map(&:join) if false
      # Single-process path avoids fork overhead for debugging/small runs;
      # otherwise split the batch across @forks child processes and sum the
      # per-fork imported counts.
      if @forks <= 1
        progress += import_fa_posts(batch)
      else
        progress += ForkFuture.parallel_map_slice(@forks, batch) do |fork_batch|
          import_fa_posts(fork_batch)
        end.sum
      end
      # Overall rows/second since the job started (not per-batch rate).
      rate = progress.to_f / (Time.now - @start_time)
      puts "finish batch, last id #{batch.last&.id} - #{progress} - #{rate.round(1)} / second"
      # Progress marker is the last legacy id seen, suitable for start_at on
      # a resumed run.
      write_progress batch.last&.id
    end
    end_profiling!
  end

  # Imports one slice of legacy posts; returns the number actually created.
  # Runs inside a forked child in the parallel path, so it clears AR
  # connection pools before returning.
  def import_fa_posts(legacy_posts)
    progress = 0
    # Skip posts already migrated (single query for the whole slice).
    existing_new_post_ids = Set.new(
      ::Domain::Fa::Post.select(:fa_id).
        where(fa_id: legacy_posts.map(&:fa_id)).
        pluck(:fa_id)
    )
    # NOTE(review): reject! mutates the caller's array in place - harmless
    # for the fork path (child has a copy) but observable when @forks <= 1.
    legacy_posts.reject! do |legacy_post|
      existing_new_post_ids.include?(legacy_post.fa_id)
    end
    legacy_posts.each do |legacy_post|
      retries = 0
      begin
        ReduxApplicationRecord.transaction do
          new_post = ::Domain::Fa::Post.build_from_legacy(legacy_post)
          unless new_post.valid?
            raise("errors on #{legacy_post.id}: #{new_post.errors.full_messages}")
          end
          new_post.save!
          progress += 1
        end
      rescue
        # Retry up to 3 attempts with a short backoff (intended for
        # transient DB errors), then re-raise the original exception.
        # NOTE(review): validation failures raised above are permanent but
        # still get retried twice before propagating.
        retries += 1
        sleep 0.1 and retry if retries < 3
        raise
      end
    end
    # Forked children must not leak checked-out connections back to the OS;
    # release both database pools before the process exits.
    ReduxApplicationRecord.clear_active_connections!
    LegacyApplicationRecord.clear_active_connections!
    progress
  end
end

View File

@@ -1,5 +1,6 @@
class BlobEntry < ReduxApplicationRecord
self.primary_key = :sha256
EMPTY_FILE_SHA256 = HexUtil.hex2bin("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
belongs_to :base,
optional: true,
@@ -49,21 +50,27 @@ class BlobEntry < ReduxApplicationRecord
return entry if entry
end
return nil unless File.exist?(file_path)
file_contents = IO.binread(file_path)
file_mime = `file -ib #{file_path}`
raise("error running `file` on #{file_path}: #{file_mime}") if $?.exitstatus != 0
file_mime.chomp!
file_contents = IO.binread(file_path)
record = find_or_build(file_mime, file_contents)
# guess the name is a sha256 hash
if file_name.length == 64
if record.sha256 != file_sha256_assumed
raise("checksum mismatch for #{file_path}: #{HexUtil.bin2hex(record.sha256)} != #{file_name}")
# checksum for an empty file
if record.sha256 == EMPTY_FILE_SHA256
return nil
else
raise("checksum mismatch for #{file_path}: #{HexUtil.bin2hex(record.sha256)} != #{file_name}")
end
end
end
puts ("[blob entry] built #{file_mime} (#{HexUtil.humansize(record.size)})")
# puts ("[blob entry] built #{file_mime} (#{HexUtil.humansize(record.size)})")
record.created_at = legacy_be.created_at
record.updated_at = legacy_be.updated_at
record

View File

@@ -40,52 +40,59 @@ class Domain::Fa::Post < ReduxApplicationRecord
post = Domain::Fa::Post.new
if legacy_post.blob_entry
legacy_be = legacy_post.blob_entry
uri = Addressable::URI.parse(legacy_post.file_url) if legacy_post.file_url
uri.scheme ||= "https" if uri
# may be able to be found by original blob entry id
original_le = ::Legacy::HttpLogEntry.find_by(blob_entry_id: legacy_be.id)
if uri && !uri.host && (uri.path =~ /^\/full\//)
puts "[domain fa post] (#{legacy_post.id}) invalid path for file, skipping: #{legacy_post.file_url}"
uri = nil
elsif uri && legacy_post.blob_entry
legacy_be = legacy_post.blob_entry
# yay, check if we need to make a new blob entry
blob_entry = ::BlobEntry.find_or_build_from_legacy(legacy_be)
uri = Addressable::URI.parse(legacy_post.file_url)
uri.scheme ||= "https"
uri.path ||= "/"
# although the blob entry backing file may be missing
if blob_entry
# may be able to be found by original blob entry id
original_le = ::Legacy::HttpLogEntry.find_by(blob_entry_id: legacy_be.id)
log_entry = ::HttpLogEntry.find_by(id: original_le.id) || begin
::HttpLogEntry.build_from_legacy(original_le)
end if original_le
log_entry = ::HttpLogEntry.find_by(id: original_le.id) || begin
::HttpLogEntry.build_from_legacy(original_le)
end if original_le
log_entry ||= begin
legacy_hle = ::Legacy::HttpLogEntry.find_by(
host: uri.host,
path: uri.path,
)
legacy_hle && ::HttpLogEntry.find_by(
id: legacy_hle.id,
response_sha256: blob_entry.sha256,
)
end
log_entry ||= begin
legacy_hle = ::Legacy::HttpLogEntry.find_by(
host: uri.host,
path: uri.path,
)
legacy_hle && ::HttpLogEntry.find_by(
id: legacy_hle.id,
response_sha256: blob_entry.sha256,
)
end
# couldn't reconstruct from a legacy http log entry, try to guess and make a new one
log_entry ||=
::HttpLogEntry.new({
uri: uri,
status_code: 200,
verb: :get,
response_time_ms: -1,
content_type: blob_entry.content_type,
requested_at: Time.now,
request_headers: ::HttpLogEntryHeader.find_or_create({}),
response_headers: ::HttpLogEntryHeader.find_or_create({}),
response: blob_entry,
})
# couldn't reconstruct from a legacy http log entry, try to guess and make a new one
log_entry ||=
::HttpLogEntry.new({
uri: uri,
status_code: 200,
verb: :get,
response_time_ms: -1,
content_type: blob_entry.content_type,
requested_at: Time.now,
request_headers: ::HttpLogEntryHeader.find_or_create({}),
response_headers: ::HttpLogEntryHeader.find_or_create({}),
response: blob_entry,
})
raise("mismatch") unless log_entry.response == blob_entry
raise("mismatch") unless HexUtil.bin2hex(log_entry.response.sha256) == legacy_be.sha256
raise("mismatch") unless log_entry.response == blob_entry
raise("mismatch") unless HexUtil.bin2hex(log_entry.response.sha256) == legacy_be.sha256
post.file = log_entry
post.file = log_entry
else
puts "[domain fa post] (#{legacy_post.id}) unable to reconstruct blob entry from #{legacy_be.id}"
end
end
# TODO: populate creator_id as well - need to build the model
@@ -106,7 +113,7 @@ class Domain::Fa::Post < ReduxApplicationRecord
].each do |field|
post.send(:"#{field}=", legacy_post.send(field))
end
post.file_url_str = legacy_post.file_url
post.file_url_str = uri.to_s if uri
post
end

View File

@@ -17,7 +17,7 @@ redux_prod: &redux_prod
username: scraper_redux
password: pdkFLqRmQwPUPaDDC4pX
migrations_paths: db/redux_migrate
pool: 64
pool: 4
redux_test: &redux_test
adapter: postgresql
@@ -27,7 +27,7 @@ redux_test: &redux_test
username: scraper_redux
password: pdkFLqRmQwPUPaDDC4pX
migrations_paths: db/redux_migrate
pool: 64
pool: 4
legacy_prod: &legacy_prod
adapter: postgresql
@@ -40,7 +40,7 @@ legacy_prod: &legacy_prod
password: pdkFLqRmQwPUPaDDC4pX
migrations_paths: db/legacy_migrate
database_tasks: false
pool: 64
pool: 4
legacy_prod_readonly: &legacy_prod_readonly
adapter: postgresql
@@ -51,7 +51,7 @@ legacy_prod_readonly: &legacy_prod_readonly
password: zL7zDRXycLhLFJLQj5Zh
migrations_paths: db/legacy_migrate
database_tasks: false
pool: 64
pool: 4
development:
redux:

View File

@@ -1,42 +1,12 @@
namespace :fa_post do
desc "Import existing FA posts"
task :import_existing, [:start_at] => [:environment] do |t, args|
batch_size = args[:batch_size]&.to_i || ENV["batch_size"]&.to_i || 100
start_at = args[:start_at]&.to_i || ENV["start_at"]&.to_i || 0
start_time = Time.now
progress = 0
batch_size = args[:batch_size]&.to_i || ENV["batch_size"]&.to_i
forks = args[:forks]&.to_i || ENV["forks"]&.to_i
start_at = args[:start_at]&.to_i || ENV["start_at"]&.to_i
query = ::Legacy::Fa::Post.includes(:blob_entry, :description_ref)
query.find_in_batches(start: start_at, batch_size: batch_size) do |batch|
existing_ids = Set.new(
::Domain::Fa::Post.select(:fa_id).
where(fa_id: batch.map(&:fa_id)).
pluck(:fa_id)
)
batch.reject! do |legacy_post|
existing_ids.include?(legacy_post.fa_id)
end
ForkFuture.parallel_each(4, batch) do |legacy_post|
retries = 0
begin
ReduxApplicationRecord.transaction do
new_post = ::Domain::Fa::Post.build_from_legacy(legacy_post)
unless new_post.valid?
raise("errors on #{legacy_post.id}: #{new_post.errors.full_messages}")
end
new_post.save!
end
rescue
sleep 0.1 and retry if (retries += 1) < 3
raise
end
end
progress += batch.size
rate = progress.to_f / (Time.now - start_time)
puts "finish batch, last id #{batch.last&.id} - #{progress} - #{rate.round(1)} / second"
end
LegacyImport::FaPostImporter.
new(batch_size: batch_size, forks: forks, start_at: start_at).
run
end
end

View File

@@ -59,6 +59,12 @@ class Domain::Fa::PostTest < ActiveSupport::TestCase
refute new_post.file.persisted?
assert_equal expected_sha256, new_post.file.response.sha256
assert_equal 21826851, new_post.fa_id
# TODO - populate creator
# assert_equal "Drake_Ergenthal", new_post.creator.name
assert_equal "Story", new_post.category
assert_equal "All", new_post.theme
[
:verb,
:content_type,
@@ -73,6 +79,14 @@ class Domain::Fa::PostTest < ActiveSupport::TestCase
assert_equal legacy_le.status, new_post.file.status_code
end
test "remove buggy prefixes" do
# TODO - implement this
# Some posts have a title prefixed with "Font size adjustment: smallerlarger"
# which should be removed
# Legacy::Fa::Post.where("title like ?", "Font size adjustment: smallerlarger%").count
# => 7056
end
test "can manipulate associated log entries" do
post = ::Domain::Fa::Post.new(fa_id: 12345)
assert post.valid?, post.errors.full_messages
@@ -83,4 +97,12 @@ class Domain::Fa::PostTest < ActiveSupport::TestCase
assert post.valid?, post.errors.full_messages
post.save!
end
test "skips posts with an invalid full file url" do
legacy_post = ::Legacy::Fa::Post.find(3234144)
new_post = ::Domain::Fa::Post.build_from_legacy(legacy_post)
assert new_post.valid?, new_post.errors.full_messages
assert new_post.file.nil?
new_post.save!
end
end