make sorbet structs comparable

Dylan Knutson
2025-02-17 19:35:45 +00:00
parent cfb8d6e714
commit 27253ff50b
20 changed files with 201 additions and 98 deletions

View File

@@ -172,3 +172,4 @@ gem "prometheus_exporter", "~> 2.2"
 gem "sorbet-static-and-runtime"
 gem "tapioca", require: false
 gem "rspec-sorbet", group: [:test]
+gem "sorbet-struct-comparable"

View File

@@ -451,6 +451,8 @@ GEM
     sorbet-static-and-runtime (0.5.11711)
       sorbet (= 0.5.11711)
       sorbet-runtime (= 0.5.11711)
+    sorbet-struct-comparable (1.3.0)
+      sorbet-runtime (>= 0.5)
     spoom (1.5.0)
       erubi (>= 1.10.0)
       prism (>= 0.28.0)
@@ -596,6 +598,7 @@ DEPENDENCIES
   shakapacker (~> 6.6)
   shoulda-matchers
   sorbet-static-and-runtime
+  sorbet-struct-comparable
   sprockets-rails
   sqlite3 (~> 1.4)
   stackprof

View File

@@ -27,6 +27,8 @@ module Domain::PostsHelper
   end

   class DomainData < T::Struct
+    include T::Struct::ActsAsComparable
+
     const :domain_icon_path, String
     const :domain_icon_title, String
   end

View File

@@ -7,6 +7,8 @@ module GoodJobHelper
   abstract!

   class AnsiSegment < T::Struct
+    include T::Struct::ActsAsComparable
+
     const :text, String
     const :class_names, T::Array[String]
   end

View File

@@ -5,6 +5,8 @@ class Domain::E621::Job::ScanPostFavsJob < Domain::E621::Job::Base
   MAX_USERS_PER_SLICE = 1000

   class UserRow < T::Struct
+    include T::Struct::ActsAsComparable
+
     const :e621_id, Integer
     const :name, String
     const :num_other_favs, Integer

View File

@@ -275,44 +275,37 @@ class Domain::Fa::Job::Base < Scraper::JobBase
           { url_name: user.url_name }
         end

-        if enqueue_page_scan && users_enqueued_for_page_scan.add?(user.url_name)
-          if user.due_for_page_scan?
-            logger.info(
-              format_tags(
-                "enqueue user page job",
-                make_tag("last scanned", time_ago_in_words(user.scanned_page_at)),
-              ),
-            )
-            defer_job(Domain::Fa::Job::UserPageJob, args)
-          end
+        if enqueue_page_scan && user.due_for_page_scan? &&
+             defer_job(Domain::Fa::Job::UserPageJob, args)
+          logger.info(
+            format_tags(
+              "enqueue user page job",
+              make_tag("last scanned", time_ago_in_words(user.scanned_page_at)),
+            ),
+          )
         end

-        if enqueue_gallery_scan &&
-             users_enqueued_for_gallery_scan.add?(user.url_name)
-          if user.due_for_gallery_scan?
-            logger.info(
-              format_tags(
-                "enqueue user gallery job",
-                make_tag(
-                  "last scanned",
-                  time_ago_in_words(user.scanned_gallery_at),
-                ),
-              ),
-            )
-            defer_job(Domain::Fa::Job::UserGalleryJob, args)
-          end
+        if enqueue_gallery_scan && user.due_for_gallery_scan? &&
+             defer_job(Domain::Fa::Job::UserGalleryJob, args)
+          logger.info(
+            format_tags(
+              "enqueue user gallery job",
+              make_tag(
+                "last scanned",
+                time_ago_in_words(user.scanned_gallery_at),
+              ),
+            ),
+          )
         end

-        if enqueue_favs_scan && users_enqueued_for_favs_scan.add?(user.url_name)
-          if user.due_for_favs_scan?
-            logger.info(
-              format_tags(
-                "enqueue user favs job",
-                make_tag("last scanned", time_ago_in_words(user.scanned_favs_at)),
-              ),
-            )
-            defer_job(Domain::Fa::Job::FavsJob, args)
-          end
+        if enqueue_favs_scan && user.due_for_favs_scan? &&
+             defer_job(Domain::Fa::Job::FavsJob, args)
+          logger.info(
+            format_tags(
+              "enqueue user favs job",
+              make_tag("last scanned", time_ago_in_words(user.scanned_favs_at)),
+            ),
+          )
         end
       end
     end
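
Note that `defer_job` now appears inside the condition itself: it both enqueues the job and reports whether it was newly added. Thanks to `&&` short-circuiting, the chained form above is equivalent to this expanded sketch (log message shortened for illustration):

```ruby
# Expanded form of the chained condition: defer_job only runs when the
# earlier checks pass, and the log line only fires when it returns true.
if enqueue_page_scan && user.due_for_page_scan?
  if defer_job(Domain::Fa::Job::UserPageJob, args)
    logger.info("enqueue user page job")
  end
end
```
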
@@ -333,14 +326,13 @@ class Domain::Fa::Job::Base < Scraper::JobBase
   def enqueue_fa_id_scan(fa_id, enqueue_pri = nil)
     enqueue_pri = self.class.normalize_enqueue_pri(enqueue_pri)
     logger.tagged(make_tag("fa_id", fa_id)) do
-      if @posts_enqueued_for_scan.add?(fa_id)
+      if @posts_enqueued_for_scan.add?(fa_id) &&
+           defer_job(
+             Domain::Fa::Job::ScanPostJob,
+             { fa_id: fa_id },
+             { priority: enqueue_pri },
+           )
         logger.info format_tags("enqueue post scan", make_tag("fa_id", fa_id))
-        defer_job(
-          Domain::Fa::Job::ScanPostJob,
-          { fa_id: fa_id },
-          { priority: enqueue_pri },
-        )
       end
     end
   end
@@ -354,27 +346,29 @@ class Domain::Fa::Job::Base < Scraper::JobBase
       if @posts_enqueued_for_scan.add?(T.must(post.fa_id))
         fa_id_str = (post.fa_id || "(nil)").to_s.bold
         if !post.scanned_at.present?
-          logger.info format_tags(
-            "enqueue post scan",
-            make_tag("fa_id", fa_id_str),
-          )
-          defer_job(
-            Domain::Fa::Job::ScanPostJob,
-            { post: post },
-            { priority: enqueue_pri },
-          )
+          if defer_job(
+               Domain::Fa::Job::ScanPostJob,
+               { post: post },
+               { priority: enqueue_pri },
+             )
+            logger.info format_tags(
+              "enqueue post scan",
+              make_tag("fa_id", fa_id_str),
+            )
+          end
         elsif (post_file = post.file) && post_file.url_str.present? &&
             post_file.log_entry.nil?
-          logger.info format_tags(
-            "enqueue file scan",
-            make_tag("fa_id", fa_id_str),
-            make_tag("post_file.id", post_file.id),
-          )
-          defer_job(
-            Domain::Fa::Job::ScanFileJob,
-            { post_file: },
-            { priority: enqueue_pri },
-          )
+          if defer_job(
+               Domain::Fa::Job::ScanFileJob,
+               { post_file: },
+               { priority: enqueue_pri },
+             )
+            logger.info format_tags(
+              "enqueue file scan",
+              make_tag("fa_id", fa_id_str),
+              make_tag("post_file.id", post_file.id),
+            )
+          end
         end
       end
     end

View File

@@ -32,14 +32,20 @@ class Domain::Fa::Job::ScanUserUtils
   module DisabledOrNotFoundResult
     class Fatal < T::Struct
+      include T::Struct::ActsAsComparable
+
       const :message, String
     end

     class Stop < T::Struct
+      include T::Struct::ActsAsComparable
+
       const :message, String
     end

     class Ok < T::Struct
+      include T::Struct::ActsAsComparable
+
       const :page, Domain::Fa::Parser::Page
     end
   end
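
Comparable result structs also pay off in specs: with `ActsAsComparable`, a test can assert on a whole result value rather than unpacking its fields. A hypothetical expectation (the method call is illustrative, not this codebase's actual API):

```ruby
# Hypothetical spec snippet; `check_user_page` stands in for whatever
# method returns a DisabledOrNotFoundResult member.
result = Domain::Fa::Job::ScanUserUtils.check_user_page(page)
expect(result).to eq(
  Domain::Fa::Job::ScanUserUtils::DisabledOrNotFoundResult::Stop.new(
    message: "account disabled",
  ),
)
```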

View File

@@ -4,7 +4,9 @@ class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::Base
   MAX_PAGE_NUMBER = 350

-  class Folder < T::Struct
+  class Folder < T::ImmutableStruct
+    include T::Struct::ActsAsComparable
+
     const :href, String
     const :title, String
   end
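
`T::ImmutableStruct` (from sorbet-runtime) freezes each instance at the end of construction, which suits a value object that now participates in equality checks:

```ruby
folder = Folder.new(href: "/gallery/foo", title: "Foo")
folder.frozen? # => true; mutation attempts raise FrozenError
```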

View File

@@ -69,7 +69,7 @@ class Scraper::JobBase < ApplicationJob
   sig { params(args: T.untyped).void }
   def initialize(*args)
     super(*T.unsafe(args))
-    @deferred_jobs = T.let([], T::Array[DeferredJob])
+    @deferred_jobs = T.let(Set.new, T::Set[DeferredJob])
     @http_client = T.let(nil, T.nilable(Scraper::HttpClient))
     @gallery_dl_client = T.let(nil, T.nilable(Scraper::GalleryDlClient))
     @first_log_entry = T.let(nil, T.nilable(HttpLogEntry))
@@ -364,10 +364,10 @@ class Scraper::JobBase < ApplicationJob
         job_class: T.class_of(Scraper::JobBase),
         params: T::Hash[Symbol, T.untyped],
         set_args: T::Hash[Symbol, T.untyped],
-      ).void
+      ).returns(T::Boolean)
     end
     def defer_job(job_class, params, set_args = {})
-      @deferred_jobs << DeferredJob.new(job_class:, params:, set_args:)
+      !!@deferred_jobs.add?(DeferredJob.new(job_class:, params:, set_args:))
     end

     sig { void }
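
The Array-to-Set switch is what makes the new return value meaningful: `Set#add?` returns `nil` when an equal element is already present, and two `DeferredJob` instances only count as equal because `ActsAsComparable` defines `eql?` and `hash` over their attributes. A sketch of the mechanism (argument values are illustrative):

```ruby
jobs = Set.new

job_a = DeferredJob.new(
  job_class: Domain::Fa::Job::ScanPostJob, params: { fa_id: 1 }, set_args: {},
)
job_b = DeferredJob.new(
  job_class: Domain::Fa::Job::ScanPostJob, params: { fa_id: 1 }, set_args: {},
)

!!jobs.add?(job_a) # => true: first time this job is seen
!!jobs.add?(job_b) # => false: job_b is eql? to job_a, so add? returns nil
```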

View File

@@ -1,6 +1,8 @@
 # typed: strict

-class DeferredJob < T::Struct
+class DeferredJob < T::ImmutableStruct
+  include T::Struct::ActsAsComparable
+
   const :job_class, T.class_of(Scraper::JobBase)
   const :params, T::Hash[Symbol, T.untyped]
   const :set_args, T::Hash[Symbol, T.untyped]

View File

@@ -6,36 +6,74 @@ class Domain::BlobFile::MigrateBlobEntryToBlobFile
   sig { params(log_sink: T.any(IO, StringIO)).void }
   def initialize(log_sink: $stderr)
-    @num_migrated = T.let(0, Integer)
-    @num_processed = T.let(0, Integer)
-    @start_time = T.let(Time.current, ActiveSupport::TimeWithZone)
     @log_sink = log_sink
+    @channel = T.let(Concurrent::Channel.new(capacity: 8), Concurrent::Channel)
   end

   sig { params(batch_size: Integer, start_sha256: String).void }
   def run(batch_size: 16, start_sha256: ZERO_SHA256)
-    start_sha256_bin = HexUtil.hex2bin(start_sha256)
-    BlobEntry.in_batches(
-      of: batch_size,
-      start: start_sha256_bin,
-      order: :asc,
-      use_ranges: true,
-    ) do |batch|
+    reader_thread =
+      Thread.new do
+        Thread.current.name = "reader"
+        start_sha256_bin = HexUtil.hex2bin(start_sha256)
+        BlobEntry
+          .includes(:base)
+          .in_batches(
+            of: batch_size,
+            start: start_sha256_bin,
+            order: :asc,
+            use_ranges: true,
+          ) do |batch|
+          @channel.put(batch)
+          # puts "#{Thread.current.name.rjust(10)} | put #{batch.size} batch"
+        end
+        @channel.put(nil)
+        @channel.close
+      end
+
+    writer_thread_1 =
+      Thread.new do
+        Thread.current.name = "writer 1"
+        writer_thread_loop
+      end
+    writer_thread_2 =
+      Thread.new do
+        Thread.current.name = "writer 2"
+        writer_thread_loop
+      end
+
+    reader_thread.join
+    writer_thread_1.join
+    writer_thread_2.join
+  end
+
+  sig { void }
+  def writer_thread_loop
+    num_migrated = 0
+    num_processed = 0
+    start_time = Time.current
+    while (batch = @channel.take)
+      # puts "#{Thread.current.name.rjust(10)} | take #{batch.size} batch"
       batch_migrated = insert_blob_entries_batch(batch)
-      @num_migrated += batch_migrated
-      @num_processed += batch.size
-      rate = batch_migrated.to_f / (Time.current - @start_time)
+      num_migrated += batch_migrated
+      num_processed += batch.size
+      rate = batch_migrated.to_f / (Time.current - start_time)
       last = batch.last&.sha256
       last_hex = last ? HexUtil.bin2hex(last) : "nil"
       @log_sink.puts(
         [
-          "[migrated: #{n2d(@num_migrated)}]",
-          "[processed: #{n2d(@num_processed)}]",
+          "[#{Thread.current.name.rjust(10)}]",
+          "[migrated: #{n2d(num_migrated)}]",
+          "[processed: #{n2d(num_processed)}]",
           "[rate: #{rate.round(1).to_s.rjust(5)}/second]",
           "[last: '#{last_hex}']",
         ].join(" "),
       )
-      @start_time = Time.current
+      start_time = Time.current
     end
   end
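
The reader/writer split uses `Concurrent::Channel` (from the concurrent-ruby-edge gem) as a bounded queue: with `capacity: 8`, `put` blocks once eight batches are in flight, so database reads are backpressured by the writers. A stripped-down sketch of the same pattern, including the `nil` sentinel the loop above uses to end consumption:

```ruby
require "concurrent-edge" # Concurrent::Channel lives in the edge gem

channel = Concurrent::Channel.new(capacity: 2) # put blocks when the buffer is full

producer =
  Thread.new do
    5.times { |i| channel.put("batch #{i}") }
    channel.put(nil) # sentinel: the while-take loop treats it as end-of-stream
    channel.close
  end

consumer =
  Thread.new do
    while (batch = channel.take)
      puts "processing #{batch}"
    end
  end

producer.join
consumer.join
```
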
@@ -45,24 +83,16 @@ class Domain::BlobFile::MigrateBlobEntryToBlobFile
   def insert_blob_entries_batch(batch)
     num_migrated = 0
     blob_entry_sha256s = batch.pluck(:sha256)
-    blob_file_sha256s =
-      BlobFile.where(sha256: blob_entry_sha256s).pluck(:sha256)
-    missing_sha256s = blob_entry_sha256s - blob_file_sha256s
-    BlobFile.transaction do
-      BlobEntry
-        .where(sha256: missing_sha256s)
-        .each do |blob_entry|
+    blob_file_attributes =
+      batch.map do |blob_entry|
         blob_file = BlobFile.initialize_from_blob_entry(blob_entry)
-        sha256_hex = HexUtil.bin2hex(T.must(blob_file.sha256))
-        begin
-          blob_file.save!
-          num_migrated += 1
-        rescue ActiveRecord::RecordInvalid => e
-          @log_sink.puts "error saving blob file #{sha256_hex}: #{e}"
-        end
+        blob_file.write_to_disk!
+        blob_file.attributes.except("created_at", "updated_at")
       end
-    end
+    BlobFile.insert_all(blob_file_attributes, unique_by: %i[sha256])
+    num_migrated += blob_file_attributes.size
     num_migrated
   end
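
`insert_all` with `unique_by:` replaces the per-row save-and-rescue with a single idempotent bulk statement: on PostgreSQL it compiles to `ON CONFLICT ... DO NOTHING`, so sha256s that already exist are skipped and a batch can safely be re-run. One subtlety: `insert_all` bypasses validations and callbacks, and `num_migrated` above counts attempted rows rather than rows actually inserted. A sketch of the call (attribute hashes are illustrative):

```ruby
sha_a = "\x00" * 32 # 32-byte binary digests, as stored in the sha256 column
sha_b = "\x01" * 32

rows = [
  { sha256: sha_a, file_size: 123 }, # illustrative attributes; all hashes
  { sha256: sha_b, file_size: 456 }, # passed to insert_all share the same keys
]

# Requires a unique index on sha256; conflicting rows are skipped, not raised.
BlobFile.insert_all(rows, unique_by: %i[sha256])
```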

View File

@@ -231,7 +231,9 @@ class Domain::Fa::Parser::UserPageHelper < Domain::Fa::Parser::Base
   end

   class RecentUser < T::Struct
+    include T::Struct::ActsAsComparable
+
     extend T::Sig

     const :name, String
     const :url_name, String

View File

@@ -241,7 +241,7 @@ class Domain::MigrateToDomain
     ReduxApplicationRecord.connection.execute(<<~SQL)
       -- Map old user IDs to new user IDs:
       DROP TABLE IF EXISTS user_map;
-      CREATE TEMP TABLE user_map TABLESPACE mirai AS
+      CREATE TABLE user_map TABLESPACE mirai AS
       SELECT old_users.id AS old_user_id, new_users.id AS new_user_id
       FROM domain_fa_users old_users
       JOIN domain_users new_users
@@ -254,7 +254,7 @@ class Domain::MigrateToDomain
       -- Map old post IDs to new post IDs:
       DROP TABLE IF EXISTS post_map;
-      CREATE TEMP TABLE post_map TABLESPACE mirai AS
+      CREATE TABLE post_map TABLESPACE mirai AS
       SELECT old_posts.id AS old_post_id, new_posts.id AS new_post_id
       FROM domain_fa_posts old_posts
       JOIN domain_posts new_posts

View File

@@ -3,6 +3,8 @@ module Domain::RouteHelper
   extend T::Sig

   class RouteData < T::Struct
+    include T::Struct::ActsAsComparable
+
     const :klass, T.class_of(ActiveRecord::Base)
     const :resource_name, Symbol
     const :param, Symbol

View File

@@ -6,6 +6,8 @@ class Scraper::CurlHttpPerformer
   # Struct.new(:response_code, :response_headers, :response_time_ms, :body)
   class Response < T::Struct
+    include T::Struct::ActsAsComparable
+
     const :response_code, Integer
     const :response_headers, T::Hash[String, String]
     const :response_time_ms, Integer

View File

@@ -7,6 +7,8 @@ class Scraper::HttpClient
   attr_reader :config

   class Response < T::Struct
+    include T::Struct::ActsAsComparable
+
     const :status_code, Integer
     const :body, String
     const :log_entry, HttpLogEntry

View File

@@ -41,7 +41,9 @@ class BlobFile < ReduxApplicationRecord
               message: "can't be nil",
             }

-  before_save do
+  before_save :write_to_disk!
+
+  sig { void }
+  def write_to_disk!
     if not self.persisted?
       unless File.exist?(self.absolute_file_path)
         FileUtils.mkdir_p(File.dirname(self.absolute_file_path))
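
Promoting the inline `before_save` block to a named public method matters for the migration above: `insert_all` bypasses ActiveRecord callbacks entirely, so the migrator has to invoke the disk write explicitly. For ordinary `save!` calls the behavior is unchanged:

```ruby
# As in the migrator's batch loop above.
blob_file = BlobFile.initialize_from_blob_entry(blob_entry)
blob_file.write_to_disk! # same code path the before_save hook runs on save!
```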

View File

@@ -1,6 +1,15 @@
 require "find"

 namespace :blob_file do
+  desc "migrate blob entries in parallel"
+  task migrate_blob_entries_parallel: %i[environment] do
+    batch_size = ENV["batch_size"]&.to_i || 16
+    start_at = ENV["start_at"] || "0" * 64
+    migrator = Domain::BlobFile::MigrateBlobEntryToBlobFile.new
+    migrator.run(batch_size: batch_size, start_sha256: start_at)
+  end
+
   desc "migrate blob files to the new format"
   task migrate_blob_entries: %i[environment] do
     batch_size = ENV["batch_size"]&.to_i || 1000
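
The task reads its knobs from the environment, so a long migration can be resumed from the last logged sha256. A shell invocation would look like `batch_size=32 start_at=<64 hex chars> bin/rails blob_file:migrate_blob_entries_parallel`; the body boils down to:

```ruby
# What the rake task executes, with illustrative values filled in.
migrator = Domain::BlobFile::MigrateBlobEntryToBlobFile.new
migrator.run(batch_size: 32, start_sha256: "0" * 64) # all-zero sha256 = start from the beginning
```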

View File

@@ -0,0 +1,35 @@
+# typed: true
+
+# DO NOT EDIT MANUALLY
+# This is an autogenerated file for types exported from the `sorbet-struct-comparable` gem.
+# Please instead update this file by running `bin/tapioca gem sorbet-struct-comparable`.
+
+# source://sorbet-struct-comparable//lib/sorbet-struct-comparable/version.rb#4
+module SorbetStructComparable; end
+
+# source://sorbet-struct-comparable//lib/sorbet-struct-comparable/version.rb#5
+SorbetStructComparable::VERSION = T.let(T.unsafe(nil), String)
+
+# source://sorbet-struct-comparable//lib/t/struct/acts_as_comparable.rb#6
+module T::Struct::ActsAsComparable
+  include ::Comparable
+
+  # source://sorbet-struct-comparable//lib/t/struct/acts_as_comparable.rb#14
+  sig { params(other: ::Object).returns(T.nilable(::Integer)) }
+  def <=>(other); end
+
+  # source://sorbet-struct-comparable//lib/t/struct/acts_as_comparable.rb#26
+  sig { params(other: ::Object).returns(T::Boolean) }
+  def eql?(other); end
+
+  # source://sorbet-struct-comparable//lib/t/struct/acts_as_comparable.rb#31
+  sig { returns(::Integer) }
+  def hash; end
+end
+
+# source://sorbet-struct-comparable//lib/t/struct/acts_as_comparable.rb#10
+T::Struct::ActsAsComparable::EQUAL = T.let(T.unsafe(nil), Integer)
+
+# source://sorbet-struct-comparable//lib/t/struct/acts_as_comparable.rb#11
+T::Struct::ActsAsComparable::NOT_COMPARABLE = T.let(T.unsafe(nil), T.untyped)
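
Since the mixin implements the full `Comparable` protocol (`<=>` plus `eql?`/`hash`), comparable structs also sort and deduplicate naturally; `<=>` returns `nil` when the other object is not comparable. A sketch using the `UserRow` struct from the e621 job above, assuming the gem's attribute-order comparison:

```ruby
rows = [
  UserRow.new(e621_id: 2, name: "b", num_other_favs: 0),
  UserRow.new(e621_id: 1, name: "a", num_other_favs: 0),
]

rows.sort.map(&:e621_id) # => [1, 2], comparing attributes in declaration order
rows.uniq.length         # => 2; eql?/hash give value-based identity
```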

View File

@@ -19,6 +19,11 @@ RSpec.describe Domain::BlobFile::MigrateBlobEntryToBlobFile do
   context "when migrating multiple BlobEntries" do
     let!(:blob_entries) { 3.times.map { |i| create(:blob_entry) } }

+    it "can migrate with a batch size of 1" do
+      migrator.run(batch_size: 1)
+      expect(BlobFile.count).to eq(3)
+    end
+
     it "migrates all entries in batches" do
       expect { migrator.run }.to change(BlobFile, :count).by(3)
       blob_entries.each do |blob_entry|