# typed: strict
# frozen_string_literal: true

# Backfills `domain_user_post_favs_e621` from `domain_user_post_favs` for
# every user whose `type` is "Domain::User::E621User".
#
# The user id range is cut into fixed-width id windows ("shards"); each shard
# is copied with a single INSERT ... SELECT, and shards are processed on a
# small thread pool. Rows already present in the target table are skipped via
# ON CONFLICT (user_id, post_id) DO NOTHING.
class MigrateDomainUserPostFavsE621 < ActiveRecord::Migration[7.2]
  # Run outside a wrapping transaction: the shard inserts execute on
  # connections checked out separately by the worker threads.
  disable_ddl_transaction!

  # Copies the favs of all E621 users, shard by shard, on a thread pool.
  #
  # Returns early when there are no E621 users. If any shard fails, the first
  # collected error is re-raised after all shards have finished, so a failed
  # copy fails the migration instead of being dropped inside the pool.
  sig { void }
  def up
    puts "Getting min/max user id..."
    e621_users = Domain::User.where(type: "Domain::User::E621User")
    min_user_id = e621_users.minimum(:id)
    # `minimum` returns nil for an empty relation (e.g. a fresh database);
    # without this guard `maximum(:id) + 1` below would raise on nil.
    if min_user_id.nil?
      puts "No E621 users found, nothing to migrate."
      return
    end
    # +1 turns the largest id into an exclusive upper bound for the windows.
    max_user_id = e621_users.maximum(:id) + 1
    max_batch_size = 1000
    batch_count = ((max_user_id - min_user_id) / max_batch_size.to_f).ceil

    puts "Migrating #{batch_count} batches..."
    shards =
      T.cast(
        batch_count
          .times
          .map do |batch_index|
            start_user_id = min_user_id + batch_index * max_batch_size
            end_user_id = [start_user_id + max_batch_size, max_user_id].min

            e621_users.where(id: start_user_id...end_user_id).pluck(:id)
          end
          # An id window may contain no E621 users at all; an empty id list
          # would render as `IN ()`, which is not valid SQL, so drop it here.
          .reject(&:empty?),
        T::Array[T::Array[Integer]],
      )

    num_threads = 4
    pool =
      T.let(
        Concurrent::FixedThreadPool.new(num_threads),
        Concurrent::FixedThreadPool,
      )
    # Exceptions raised inside a posted task are not propagated to the
    # caller, so they are collected in a thread-safe queue and re-raised
    # once the pool has drained.
    failures = Queue.new
    shards.each_with_index do |shard, index|
      label =
        "shard #{index + 1} of #{shards.size}: #{shard.minmax.join(" -> ")}"
      pool.post do
        puts "migrate #{label} (#{shard.size} users)"
        migrate_shard(shard)
        puts "done: #{label}"
      rescue StandardError => e
        puts "failed: #{label}: #{e.class}: #{e.message}"
        failures << e
      end
    end

    pool.shutdown
    pool.wait_for_termination

    raise failures.pop unless failures.empty?
  end

  # Copies all fav rows of the given users into `domain_user_post_favs_e621`
  # with one INSERT ... SELECT on a connection checked out for this call.
  #
  # user_ids - ids of the users whose favs are copied; an empty list is a
  #            no-op (it would otherwise produce the invalid SQL `IN ()`).
  sig { params(user_ids: T::Array[Integer]).void }
  def migrate_shard(user_ids)
    return if user_ids.empty?

    ActiveRecord::Base.with_connection do |connection|
      # The ids are Integers (per the sig), so joining them into the
      # statement cannot introduce arbitrary SQL.
      connection.execute <<~SQL
        INSERT INTO
          domain_user_post_favs_e621 (
            user_id,
            post_id,
            removed
          )
        SELECT
          user_id,
          post_id,
          removed
        FROM domain_user_post_favs
        WHERE user_id IN (#{user_ids.join(", ")})
        ON CONFLICT (user_id, post_id) DO NOTHING
      SQL
    end
  end
end