69 lines
2.0 KiB
Ruby
69 lines
2.0 KiB
Ruby
# typed: strict
|
|
# frozen_string_literal: true
|
|
|
|
class MigrateDomainUserPostFavsFa < ActiveRecord::Migration[7.2]
|
|
disable_ddl_transaction!
|
|
|
|
sig { void }
|
|
def up
|
|
min_user_id = Domain::User.minimum(:id)
|
|
max_user_id = Domain::User.maximum(:id) + 1
|
|
max_batch_size = 1000
|
|
batch_count = ((max_user_id - min_user_id) / max_batch_size.to_f).ceil
|
|
|
|
puts "Migrating #{batch_count} batches..."
|
|
shards =
|
|
batch_count.times.map do |batch_index|
|
|
start_user_id = min_user_id + batch_index * max_batch_size
|
|
end_user_id = [start_user_id + max_batch_size, max_user_id].min
|
|
[start_user_id, end_user_id]
|
|
end
|
|
|
|
num_threads = 4
|
|
pool =
|
|
T.let(
|
|
Concurrent::FixedThreadPool.new(num_threads),
|
|
Concurrent::FixedThreadPool,
|
|
)
|
|
shards.each_with_index do |shard, index|
|
|
pool.post do
|
|
puts "migrate shard #{index + 1} of #{shards.size}: #{shard.join(" -> ")}"
|
|
migrate_shard(*shard)
|
|
puts "done: shard #{index + 1} of #{shards.size}: #{shard.join(" -> ")}"
|
|
end
|
|
end
|
|
|
|
pool.shutdown
|
|
pool.wait_for_termination
|
|
end
|
|
|
|
sig { params(start_user_id: Integer, end_user_id: Integer).void }
|
|
def migrate_shard(start_user_id, end_user_id)
|
|
ActiveRecord::Base.with_connection do |connection|
|
|
connection.execute <<-SQL
|
|
INSERT INTO
|
|
domain_user_post_favs_fa (
|
|
user_id,
|
|
post_id,
|
|
fa_fav_id,
|
|
removed,
|
|
explicit_time,
|
|
inferred_time
|
|
)
|
|
SELECT
|
|
user_id,
|
|
post_id,
|
|
(json_attributes->>'fav_id')::integer as fa_fav_id,
|
|
removed,
|
|
to_timestamp((json_attributes->>'explicit_time')::integer) as explicit_time,
|
|
to_timestamp((json_attributes->>'inferred_time')::integer) as inferred_time
|
|
FROM domain_user_post_favs
|
|
WHERE type = 'Domain::UserPostFav::FaUserPostFav'
|
|
AND user_id >= #{start_user_id}
|
|
AND user_id < #{end_user_id}
|
|
ON CONFLICT (user_id, post_id) DO NOTHING
|
|
SQL
|
|
end
|
|
end
|
|
end
|