# typed: strict
#
# redux-scraper/app/lib/domain/migrate_to_domain.rb
class Domain::MigrateToDomain
extend T::Sig
include HasColorLogger
sig { params(pb_sink: IO).void }
def initialize(pb_sink = $stderr)
@pb_sink = pb_sink
end
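  # Example invocation (e.g. from a Rails console). The migrate_* methods are
  # written to be restartable: source queries skip rows that already have a
  # migrated counterpart, and writes go through upsert_all wherever a unique
  # index is available.
  #
  #   migrator = Domain::MigrateToDomain.new
  #   migrator.migrate_e621_users
  #   migrator.migrate_fa_posts(only_user: "somename")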
sig { params(only_user: T.nilable(String)).void }
def migrate_e621_users(only_user: nil)
logger.info "migrating e621 users"
if only_user
query = Domain::E621::User.where(name: only_user)
else
query =
Domain::E621::User.where.not(
e621_user_id: Domain::User::E621User.select(:e621_id),
)
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: query.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
query.find_in_batches(batch_size: 10_000) do |batch|
migrate_batch(
Domain::User::E621User,
batch,
unique_by: [:idx_domain_e621_users_on_e621_id],
) { |old_model| initialize_e621_user_from(old_model) }
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_e621_posts(only_user: nil)
logger.info "migrating e621 posts"
if only_user
user = Domain::E621::User.find_by(name: only_user)
if user.nil?
logger.info "e621 user not found: #{only_user}"
return
end
query = user.faved_posts
else
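      # Anti-join: old posts with no migrated counterpart. The new schema keeps
      # e621_id inside json_attributes, so the join key is a jsonb extraction.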
query =
Domain::E621::Post.joins(
"LEFT JOIN domain_posts ON domain_e621_posts.e621_id =
(domain_posts.json_attributes->>'e621_id')::integer
AND domain_posts.type = 'Domain::Post::E621Post'",
).where("domain_posts.id IS NULL")
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: query.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
query
.includes(:file)
.find_in_batches(batch_size: 10_000) do |batch|
ReduxApplicationRecord.transaction do
models =
migrate_batch(
Domain::Post::E621Post,
batch,
unique_by: [:idx_domain_e621_posts_on_e621_id],
) { |old_model| initialize_e621_post_from(old_model) }
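          # Second pass: Domain::PostFile rows carry a post_id FK, so they can
          # only be written after migrate_batch has backfilled ids onto the
          # freshly upserted posts.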
migrate_batch(
Domain::PostFile,
models.filter(&:file),
unique_by: [:index_domain_post_files_on_log_entry_id],
) do |post|
file = T.must(post.file)
file.post_id = T.must(post.id)
file
end
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_fa_users(only_user: nil)
logger.info "migrating fa users"
if only_user
user = Domain::Fa::User.find_by(url_name: only_user)
if user.nil?
logger.info "fa user not found: #{only_user}"
return
end
query =
Domain::Fa::User.where(url_name: only_user).or(
Domain::Fa::User.where(id: user.follows.select(:id)),
)
else
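      # Same anti-join shape as the e621 post migration: old FA users whose
      # url_name has no Domain::User::FaUser row in domain_users yet.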
query =
Domain::Fa::User.joins(
"LEFT JOIN domain_users ON domain_fa_users.url_name =
domain_users.json_attributes->>'url_name'
AND domain_users.type = 'Domain::User::FaUser'",
).where("domain_users.id IS NULL")
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: query.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
query.find_in_batches(batch_size: 10_000) do |batch|
ReduxApplicationRecord.transaction do
models =
migrate_batch(
Domain::User::FaUser,
batch,
unique_by: [:idx_domain_fa_users_on_url_name],
) { |old_user| initialize_fa_user_from(old_user) }
migrate_batch(Domain::UserAvatar, models.filter(&:avatar)) do |user|
avatar = T.must(user.avatar)
avatar.user_id = user.id
avatar
end
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_fa_posts(only_user: nil)
logger.info "migrating fa posts"
if only_user
user = Domain::Fa::User.find_by(url_name: only_user)
if user.nil?
logger.info "fa user not found: #{only_user}"
return
end
old_fa_ids =
(user.posts.pluck(:fa_id) + user.fav_posts.pluck(:fa_id)).uniq
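      # Single-user mode re-processes all of the user's ids; the upserts below
      # make re-migrating already-copied posts harmless.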
missing_fa_ids = old_fa_ids
logger.info "missing_fa_ids: #{missing_fa_ids.size}"
missing_fa_ids.sort!
else
old_fa_ids = Domain::Fa::Post.pluck(:fa_id)
logger.info "old_fa_ids: #{old_fa_ids.size}"
new_fa_ids = Domain::Post::FaPost.pluck(:fa_id)
logger.info "new_fa_ids: #{new_fa_ids.size}"
missing_fa_ids = old_fa_ids - new_fa_ids
missing_fa_ids.sort!
logger.info "missing_fa_ids: #{missing_fa_ids.size}"
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: missing_fa_ids.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
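    # Walk the sorted id list in slices, reloading each slice with its creator
    # and file preloaded so the per-batch work below never lazy-loads.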
missing_fa_ids.each_slice(1_000) do |fa_ids|
batch =
Domain::Fa::Post.where(fa_id: fa_ids).includes(:creator, :file).to_a
ReduxApplicationRecord.transaction do
if only_user
migrate_batch(
Domain::User::FaUser,
batch.map(&:creator).compact.uniq,
unique_by: [:idx_domain_fa_users_on_url_name],
) { |user| initialize_fa_user_from(user) }
end
initialized_models =
migrate_batch(
Domain::Post::FaPost,
batch,
unique_by: [:idx_domain_fa_posts_on_fa_id],
) { |old_post| initialize_fa_post_from(old_post) }
migrate_batch(
Domain::PostFile,
initialized_models.filter(&:file),
unique_by: [:index_domain_post_files_on_log_entry_id],
) do |post|
file = T.must(post.file)
file.post_id = post.id
file
end
migrate_batch(
Domain::UserPostCreation,
initialized_models.filter(&:primary_user_post_creation),
unique_by: [:index_domain_user_post_creations_on_user_id_and_post_id],
) do |post|
user_post_creation = T.must(post.primary_user_post_creation)
user_post_creation.post_id = post.id
user_post_creation
end
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_e621_users_favs(only_user: nil)
logger.info "migrating e621 users favs"
    # A set-based variant of this migration was prototyped and left disabled:
    # build e621_user_map / e621_post_map mapping tables, then bulk-insert favs
    # from a batched PL/pgSQL DO block. It mirrors the live SQL in
    # #migrate_fa_users_favs below, with the fa_* tables swapped for their
    # e621_* counterparts; the per-user Ruby path below is used instead.
if only_user
query = Domain::User::E621User.where(name: only_user)
else
query = Domain::User::E621User.where(migrated_user_favs_at: nil)
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: query.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
query.find_in_batches(batch_size: 100) do |batch|
ReduxApplicationRecord.transaction do
batch.each { |user| migrate_e621_user_favs(user) }
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_fa_users_favs(only_user: nil)
logger.info "migrating fa users favs"
ReduxApplicationRecord.connection.execute(<<~SQL)
-- Map old user IDs to new user IDs:
DROP TABLE IF EXISTS user_map;
CREATE TABLE user_map TABLESPACE mirai AS
SELECT old_users.id AS old_user_id, new_users.id AS new_user_id
FROM domain_fa_users old_users
JOIN domain_users new_users
ON new_users.json_attributes->>'url_name' = old_users.url_name
WHERE new_users.type = 'Domain::User::FaUser';
CREATE INDEX idx_user_map_old_user_id ON user_map(old_user_id, new_user_id) TABLESPACE mirai;
CREATE INDEX idx_user_map_new_user_id ON user_map(new_user_id, old_user_id) TABLESPACE mirai;
ANALYZE user_map;
-- Map old post IDs to new post IDs:
DROP TABLE IF EXISTS post_map;
CREATE TABLE post_map TABLESPACE mirai AS
SELECT old_posts.id AS old_post_id, new_posts.id AS new_post_id
FROM domain_fa_posts old_posts
JOIN domain_posts new_posts
ON (new_posts.json_attributes->>'fa_id')::integer = old_posts.fa_id
WHERE new_posts.type = 'Domain::Post::FaPost';
CREATE INDEX idx_post_map_old_post_id ON post_map(old_post_id, new_post_id) TABLESPACE mirai;
CREATE INDEX idx_post_map_new_post_id ON post_map(new_post_id, old_post_id) TABLESPACE mirai;
ANALYZE post_map;
DO $$
DECLARE
v_user_ids bigint[];
v_batch_size integer := 10; -- Adjust batch size as needed
v_total_count integer;
v_processed_count integer := 0;
v_progress numeric;
v_batch bigint[];
BEGIN
RAISE NOTICE 'Counting users...';
-- Fetch all distinct user_ids into an array
SELECT array_agg(domain_fa_users.id ORDER BY domain_fa_users.id)
INTO v_user_ids
FROM domain_fa_users
INNER JOIN user_map um ON domain_fa_users.id = um.old_user_id
INNER JOIN domain_users du ON um.new_user_id = du.id
WHERE du.type = 'Domain::User::FaUser'
AND du.json_attributes->>'migrated_user_favs_at' IS NULL;
IF v_user_ids IS NULL THEN
RAISE NOTICE 'No users found to process';
RETURN;
END IF;
-- Get total user count for progress tracking
v_total_count := array_length(v_user_ids, 1);
RAISE NOTICE 'Total users to process: %', v_total_count;
-- Loop over user IDs in batches
FOR i IN 1..v_total_count BY v_batch_size LOOP
-- Extract the current batch of users
v_batch := v_user_ids[i:LEAST(i + v_batch_size - 1, v_total_count)];
-- Insert batch for the current set of users
INSERT INTO domain_user_post_favs (user_id, post_id)
SELECT um.new_user_id, pm.new_post_id
FROM domain_fa_favs old_favs
JOIN post_map pm ON old_favs.post_id = pm.old_post_id
JOIN user_map um ON old_favs.user_id = um.old_user_id
WHERE old_favs.user_id = ANY(v_batch)
ON CONFLICT (user_id, post_id) DO NOTHING;
UPDATE domain_users
SET json_attributes = jsonb_set(json_attributes, '{migrated_user_favs_at}', to_jsonb(now()))
FROM user_map um
WHERE domain_users.id = um.new_user_id
AND um.old_user_id = ANY(v_batch)
AND domain_users.type = 'Domain::User::FaUser';
-- Update progress tracking
v_processed_count := LEAST(i + v_batch_size - 1, v_total_count);
v_progress := (v_processed_count::numeric / v_total_count::numeric) * 100;
-- Log progress
RAISE NOTICE 'Processed users % of % - user ids: % (Progress: % %%)',
v_processed_count, v_total_count, v_batch, ROUND(v_progress, 2);
-- COMMIT;
END LOOP;
END $$;
SQL
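    # NOTE: the COMMIT inside the DO block stays commented out because a plain
    # DO block runs in a single transaction; per-batch commits would need a
    # procedure invoked via CALL (PostgreSQL 11+).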
# previous attempt that does not batch:
# INSERT INTO domain_user_post_favs (user_id, post_id)
# SELECT um.new_user_id, pm.new_post_id
# FROM domain_fa_favs old_favs
# JOIN post_map pm ON old_favs.post_id = pm.old_post_id
# JOIN user_map um ON old_favs.user_id = um.old_user_id
# ON CONFLICT (user_id, post_id) DO NOTHING;
# ReduxApplicationRecord.connection.execute(<<~SQL)
# UPDATE domain_users
# SET json_attributes = jsonb_set(json_attributes, '{migrated_user_favs_at}', to_jsonb(now()))
# WHERE domain_users.type = 'Domain::User::FaUser'
# SQL
end
sig { params(only_user: T.nilable(String)).void }
def migrate_fa_users_followed_users(only_user: nil)
logger.info "migrating fa followed users"
if only_user
query = Domain::User::FaUser.where(url_name: only_user)
else
query = Domain::User::FaUser.where(migrated_followed_users_at: nil)
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: query.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
query.find_in_batches(batch_size: 10) do |batch|
ReduxApplicationRecord.transaction do
batch.each { |user| migrate_fa_user_followed_users(user) }
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_inkbunny_users(only_user: nil)
logger.info "migrating inkbunny users"
if only_user
query = Domain::Inkbunny::User.where(name: only_user)
else
query =
Domain::Inkbunny::User.where.not(
ib_user_id: Domain::User::InkbunnyUser.select(:ib_id),
)
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: query.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
query.find_in_batches(batch_size: 10_000) do |batch|
ReduxApplicationRecord.transaction do
models =
migrate_batch(
Domain::User::InkbunnyUser,
batch,
unique_by: [:idx_domain_inkbunny_users_on_ib_id],
) { |old_user| initialize_inkbunny_user_from(old_user) }
migrate_batch(Domain::UserAvatar, models.filter(&:avatar)) do |user|
avatar = T.must(user.avatar)
avatar.user_id = user.id
avatar
end
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_inkbunny_posts(only_user: nil)
logger.info "migrating inkbunny posts"
if only_user
user = Domain::Inkbunny::User.find_by(name: only_user)
if user.nil?
logger.info "inkbunny user not found: #{only_user}"
return
end
missing_ib_post_ids = user.posts.pluck(:ib_post_id)
else
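      # Compute the missing ids as an in-memory set difference; an equivalent
      # subquery-based relation is kept commented out below for reference.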
existing_ib_post_ids = Domain::Inkbunny::Post.pluck(:ib_post_id).to_set
logger.info "existing_ib_post_ids: #{existing_ib_post_ids.size}"
new_ib_post_ids = Domain::Post::InkbunnyPost.pluck(:ib_id).to_set
logger.info "new_ib_post_ids: #{new_ib_post_ids.size}"
missing_ib_post_ids = (existing_ib_post_ids - new_ib_post_ids).to_a
missing_ib_post_ids.sort!
logger.info "missing_ib_post_ids: #{missing_ib_post_ids.size}"
# query =
# Domain::Inkbunny::Post
# .where.not(ib_post_id: Domain::Post::InkbunnyPost.select(:ib_id))
# .includes(:creator, { files: :log_entry }, :pools)
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: missing_ib_post_ids.size,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
missing_ib_post_ids.each_slice(100) do |id_batch|
batch =
Domain::Inkbunny::Post
.where(ib_post_id: id_batch)
.includes(:creator, { files: :log_entry }, :pools)
.to_a
ReduxApplicationRecord.transaction do
models =
migrate_batch(
Domain::Post::InkbunnyPost,
batch,
unique_by: [:idx_domain_inkbunny_posts_on_ib_id],
) { |old_post| initialize_inkbunny_post(old_post) }
migrate_batch(
Domain::UserPostCreation,
models.filter(&:primary_user_post_creation),
unique_by: [:index_domain_user_post_creations_on_user_id_and_post_id],
) do |post|
user_post_creation = T.must(post.primary_user_post_creation)
user_post_creation.post_id = post.id
user_post_creation
end
models.each do |model|
model.files.each { |file| file.post_id = model.id }
end
migrate_batch(
Domain::PostFile::InkbunnyPostFile,
models.flat_map(&:files),
unique_by: [:idx_domain_inkbunny_post_files_on_ib_id],
        )
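        # NOTE: rescuing here swallows the error before the transaction block
        # sees it, so ActiveRecord still attempts to COMMIT; PostgreSQL turns
        # that into a ROLLBACK for the aborted batch.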
rescue ActiveRecord::StatementInvalid => e
logger.error "StatementInvalid: #{e.message}"
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig { params(only_user: T.nilable(String)).void }
def migrate_inkbunny_pools(only_user: nil)
logger.info "migrating inkbunny pools"
if only_user
user = Domain::Inkbunny::User.find_by(name: only_user)
if user.nil?
logger.info "inkbunny user not found: #{only_user}"
return
end
query =
Domain::Inkbunny::Pool.where(
ib_pool_id:
Domain::Inkbunny::PoolJoin.where(post: user.posts).select(
:ib_pool_id,
),
).includes(pool_joins: %i[post left_post right_post])
else
query =
Domain::Inkbunny::Pool
.where.not(ib_pool_id: Domain::PostGroup::InkbunnyPool.select(:ib_id))
.includes(pool_joins: %i[post left_post right_post])
end
pb =
ProgressBar.create(
throttle_rate: 0.2,
total: query.count,
format: "%t: %c/%C %B %p%% %a %e",
output: @pb_sink,
)
query.find_in_batches(batch_size: 100) do |batch|
ReduxApplicationRecord.transaction do
models =
migrate_batch(Domain::PostGroup::InkbunnyPool, batch) do |old_pool|
initialize_inkbunny_pool(old_pool)
end
models.each do |model|
model.post_group_joins.each do |post_group_join|
post_group_join.group_id = model.id
end
end
migrate_batch(
Domain::PostGroupJoin::InkbunnyPoolJoin,
models.flat_map(&:post_group_joins),
        )
end
pb.progress = [pb.progress + batch.size, pb.total].min
end
end
sig do
params(old_user: Domain::Inkbunny::User).returns(Domain::User::InkbunnyUser)
end
def initialize_inkbunny_user_from(old_user)
new_user = Domain::User::InkbunnyUser.new
new_user.ib_id = old_user.ib_user_id
new_user.name = old_user.name
new_user.state = old_user.state
new_user.scanned_gallery_at = old_user.scanned_gallery_at
new_user.deep_update_log_entry_id = old_user.deep_update_log_entry_id
new_user.shallow_update_log_entry_id = old_user.shallow_update_log_entry_id
new_user.created_at = old_user.created_at
if avatar_url_str = old_user.avatar_url_str
new_avatar = Domain::UserAvatar.new
new_avatar.log_entry_id = old_user.avatar_file_log_entry_id
new_avatar.url_str = avatar_url_str
new_avatar.downloaded_at = old_user.avatar_downloaded_at
new_avatar.state =
case old_user.avatar_state
when "ok"
old_user.avatar_file_log_entry_id.present? ? "ok" : "pending"
when "not_found"
new_avatar.error_message = old_user.avatar_state
"file_404"
else
new_avatar.error_message = old_user.avatar_state
"http_error"
end
new_user.avatar = new_avatar
end
new_user
end
sig do
params(old_post: Domain::Inkbunny::Post).returns(Domain::Post::InkbunnyPost)
end
def initialize_inkbunny_post(old_post)
new_post = Domain::Post::InkbunnyPost.new
new_post.ib_id = old_post.ib_post_id
new_post.state = old_post.state
new_post.rating = old_post.rating
new_post.submission_type = old_post.submission_type
new_post.created_at = old_post.created_at
new_post.posted_at = old_post.posted_at
new_post.title = old_post.title
new_post.writing = old_post.writing
new_post.num_views = old_post.num_views
new_post.num_files = old_post.num_files
new_post.num_favs = old_post.num_favs
new_post.num_comments = old_post.num_comments
new_post.keywords = old_post.keywords
new_post.last_file_updated_at = old_post.last_file_updated_at
new_post.deep_update_log_entry_id = old_post.deep_update_log_entry_id
new_post.shallow_update_log_entry_id = old_post.shallow_update_log_entry_id
new_post.shallow_updated_at = old_post.shallow_updated_at
new_post.deep_updated_at = old_post.deep_updated_at
new_post.ib_detail_raw = old_post.ib_detail_raw
if old_creator = old_post.creator
new_post.creator =
Domain::User::InkbunnyUser.find_by!(ib_id: old_creator.ib_user_id)
end
new_post.files =
old_post.files.map do |old_file|
new_state =
case old_file.state
when "ok"
old_file.log_entry_id.present? ? "ok" : "pending"
else
"terminal_error"
end
new_file = Domain::PostFile::InkbunnyPostFile.new
new_file.ib_id = old_file.ib_file_id
new_file.log_entry = old_file.log_entry
new_file.last_status_code = old_file.log_entry&.status_code
new_file.url_str = old_file.url_str
new_file.state = new_state
new_file.file_name = old_file.file_name
new_file.md5_initial = old_file.md5_initial
new_file.md5_full = old_file.md5_full
new_file.md5s = old_file.md5s
new_file.file_order = old_file.file_order
new_file.ib_detail_raw = old_file.ib_detail_raw
new_file.ib_created_at = old_file.ib_created_at
new_file
end
new_post
end
sig do
params(old_pool: Domain::Inkbunny::Pool).returns(
Domain::PostGroup::InkbunnyPool,
)
end
def initialize_inkbunny_pool(old_pool)
new_pool = Domain::PostGroup::InkbunnyPool.new
new_pool.ib_id = old_pool.ib_pool_id
new_pool.post_group_joins =
old_pool.pool_joins.map do |old_pool_join|
new_pool_join = Domain::PostGroupJoin::InkbunnyPoolJoin.new
new_pool_join.post_id =
Domain::Post::InkbunnyPost.find_by!(
ib_id: T.must(old_pool_join.post).ib_post_id,
).id
if old_left_post = old_pool_join.left_post
new_pool_join.left_post_id =
Domain::Post::InkbunnyPost.find_by!(
ib_id: old_left_post.ib_post_id,
).id
end
if old_right_post = old_pool_join.right_post
new_pool_join.right_post_id =
Domain::Post::InkbunnyPost.find_by!(
ib_id: old_right_post.ib_post_id,
).id
end
new_pool_join
end
new_pool
end
private
sig { params(old_user: Domain::E621::User).returns(Domain::User::E621User) }
def initialize_e621_user_from(old_user)
new_user = Domain::User::E621User.new
new_user.e621_id = old_user.e621_user_id
new_user.name = old_user.name
new_user.favs_are_hidden = old_user.favs_are_hidden
new_user.num_other_favs_cached = old_user.num_other_favs_cached
new_user.scanned_favs_status = old_user.scanned_favs_status
new_user.scanned_favs_at = old_user.scanned_favs_at
new_user
end
sig { params(old_post: Domain::E621::Post).returns(Domain::Post::E621Post) }
def initialize_e621_post_from(old_post)
new_post = Domain::Post::E621Post.new
new_post.state = old_post.state
new_post.e621_id = old_post.e621_id
new_post.scanned_post_favs_at = old_post.scanned_post_favs_at
new_post.rating = old_post.rating
new_post.tags_array = old_post.tags_array
new_post.flags_array = old_post.flags_array
new_post.pools_array = old_post.pools_array
new_post.sources_array = old_post.sources_array
new_post.artists_array = old_post.artists_array
new_post.e621_updated_at = old_post.e621_updated_at
new_post.posted_at = old_post.posted_at
new_post.last_index_page_id = old_post.last_index_page_id
new_post.caused_by_entry_id = old_post.caused_by_entry_id
new_post.scan_log_entry_id = old_post.scan_log_entry_id
new_post.index_page_ids = old_post.index_page_ids
new_post.md5 = old_post.md5
new_post.prev_md5s = old_post.prev_md5s
new_post.scan_error = old_post.scan_error
new_post.created_at = old_post.created_at
new_post.parent_post_e621_id = old_post.parent_e621_id
new_post.description = old_post.description
new_post.rating = old_post.rating
new_post.score = old_post.score
new_post.score_up = old_post.score_up
new_post.score_down = old_post.score_down
new_post.num_favorites = old_post.num_favorites
new_post.num_comments = old_post.num_comments
new_post.change_seq = old_post.change_seq
old_file = old_post.file
file_url_str = old_post.file_url_str
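    # Fold the old file / file_error pair into the single Domain::PostFile
    # state field: "ok", "terminal_error", or "pending".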
if old_file || file_url_str
new_file = Domain::PostFile.new
new_file.url_str = file_url_str
new_file.log_entry_id = old_file&.id || old_post.file_error&.log_entry_id
new_file.last_status_code =
old_file&.status_code || old_post.file_error&.status_code
new_file.retry_count = old_post.file_error&.retry_count
if old_file.present? && old_file.status_code == 200
new_file.state = "ok"
elsif old_file.present?
new_file.state = "terminal_error"
new_file.error_message = "status_code: #{old_file.status_code}"
else
new_file.state = "pending"
end
new_post.file = new_file
end
new_post
end
sig { params(old_user: Domain::Fa::User).returns(Domain::User::FaUser) }
def initialize_fa_user_from(old_user)
new_user = Domain::User::FaUser.new
new_user.state =
case old_user.state
when "ok"
"ok"
when "scan_error"
if /disabled or not found/ =~ old_user.state_detail["scan_error"]
"account_disabled"
else
"error"
end
else
raise("unknown fa user state: #{old_user.state}")
end
new_user.url_name = old_user.url_name
new_user.name = old_user.name
new_user.full_name = old_user.full_name
new_user.artist_type = old_user.artist_type
new_user.mood = old_user.mood
new_user.profile_html = old_user.profile_html
new_user.num_pageviews = old_user.num_pageviews
new_user.num_submissions = old_user.num_submissions
new_user.num_comments_recieved = old_user.num_comments_recieved
new_user.num_comments_given = old_user.num_comments_given
new_user.num_journals = old_user.num_journals
new_user.num_favorites = old_user.num_favorites
new_user.scanned_gallery_at = old_user.scanned_gallery_at
new_user.scanned_page_at = old_user.scanned_page_at
new_user.scanned_follows_at = old_user.scanned_follows_at
new_user.scanned_favs_at = old_user.scanned_favs_at
new_user.scanned_incremental_at = old_user.scanned_incremental_at
new_user.registered_at = old_user.registered_at
if old_avatar = old_user.avatar
new_avatar = Domain::UserAvatar.new
new_avatar.log_entry_id = old_avatar.log_entry_id
new_avatar.url_str = old_avatar.file_url_str
new_avatar.state =
case old_avatar.state
when "ok"
old_avatar.log_entry_id.present? ? "ok" : "pending"
when "file_not_found"
new_avatar.error_message = old_avatar.state
"file_404"
else
new_avatar.error_message = old_avatar.state
"http_error"
end
new_user.avatar = new_avatar
end
new_user
end
sig { params(post: Domain::Fa::Post).returns(Domain::Post::FaPost) }
def initialize_fa_post_from(post)
new_post = Domain::Post::FaPost.new
new_post.state = post.state
new_post.title = post.title
new_post.fa_id = post.fa_id
new_post.category = post.category
new_post.theme = post.theme
new_post.species = post.species
new_post.gender = post.gender
new_post.description = post.description
new_post.keywords = post.keywords
new_post.num_favorites = post.num_favorites
new_post.num_comments = post.num_comments
new_post.num_views = post.num_views
new_post.posted_at = post.posted_at
new_post.scanned_at = post.scanned_at
new_post.scan_file_error = post.scan_file_error
new_post.last_user_page_id = post.last_user_page_id
new_post.last_submission_log_entry_id = post.last_submission_page_id
new_post.first_browse_page_id = post.first_browse_page_id
new_post.first_gallery_page_id = post.first_gallery_page_id
new_post.first_seen_entry_id = post.first_seen_entry_id
new_post.created_at = post.created_at
if post.creator.present?
new_post.creator =
Domain::User::FaUser.find_by(url_name: post.creator&.url_name)
end
if post.file.present? || post.file_uri.present?
new_file = Domain::PostFile.new
new_file.log_entry = post.file
new_file.last_status_code = post.file&.status_code
new_file.url_str = post.file_uri.to_s
new_file.state = post.state
new_post.file = new_file
end
new_post
end
sig { params(user: Domain::User::FaUser).void }
def migrate_fa_user_favs(user)
user_url_name = user.url_name
old_user = Domain::Fa::User.find_by!(url_name: user_url_name)
Domain::UserPostFav.connection.execute(<<~SQL)
INSERT INTO domain_user_post_favs (user_id, post_id)
SELECT #{user.id}, domain_posts.id
FROM domain_fa_posts old_posts
INNER JOIN domain_posts ON
(domain_posts.json_attributes->>'fa_id')::integer = old_posts.fa_id
AND domain_posts.type = 'Domain::Post::FaPost'
INNER JOIN domain_fa_favs ON
domain_fa_favs.post_id = old_posts.id
AND domain_fa_favs.user_id = #{old_user.id}
ON CONFLICT (user_id, post_id) DO NOTHING
SQL
# Use reset_counters to update the counter cache after inserting favs
Domain::User.reset_counters(user.id, :user_post_favs)
logger.info(
"Reset user_post_favs counter cache for user #{user.name} (ID: #{user.id})",
)
if user.faved_posts.count != old_user.fav_posts.count
logger.error(
"favs mismatch for #{user.name}: (#{user.faved_posts.count} != #{old_user.fav_posts.count})",
)
else
user.migrated_user_favs_at = Time.current
user.save!
end
end
sig { params(user: Domain::User::FaUser).void }
def migrate_fa_user_followed_users(user)
user_url_name = user.url_name
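    # If the old row has vanished there is nothing to copy; stamp the user as
    # migrated and return early.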
old_user =
Domain::Fa::User.find_by(url_name: user_url_name) ||
begin
user.migrated_followed_users_at = Time.current
user.save!
return
end
followed_user_url_names = old_user.follows.pluck(:url_name)
new_user_ids =
Domain::User::FaUser.where(url_name: followed_user_url_names).pluck(:id)
new_user_ids.each_slice(10_000) do |user_ids|
Domain::UserUserFollow.upsert_all(
user_ids.map { |user_id| { from_id: user.id, to_id: user_id } },
unique_by: %i[from_id to_id],
)
end
# Use reset_counters to update follows_from for the current user
Domain::User.reset_counters(user.id, :user_user_follows_from)
logger.info(
"Reset user_user_follows_from counter cache for user #{user.name} (ID: #{user.id})",
)
# Update follows_to counts for users who were followed
new_user_ids.each_slice(1000) do |batch_ids|
batch_ids.each do |user_id|
Domain::User.reset_counters(user_id, :user_user_follows_to)
end
end
logger.info(
"Reset user_user_follows_to counter cache for #{new_user_ids.size} followed users",
)
if new_user_ids.size != old_user.follows.count
logger.error(
"followers mismatch for #{user.name}: (#{user.followed_users.count} != #{old_user.follows.count})",
)
else
user.migrated_followed_users_at = Time.current
user.save!
end
end
sig { params(new_user: Domain::User::E621User).void }
def migrate_e621_user_favs(new_user)
user_e621_id = new_user.e621_id
old_user = Domain::E621::User.find_by!(e621_user_id: user_e621_id)
old_post_e621_ids = old_user.faved_posts.select(:e621_id)
new_post_ids =
Domain::Post::E621Post.where(e621_id: old_post_e621_ids).pluck(:id)
new_post_ids.each_slice(10_000) do |post_ids|
Domain::UserPostFav.upsert_all(
post_ids.map { |post_id| { user_id: new_user.id, post_id: } },
unique_by: %i[user_id post_id],
)
end
# Use reset_counters to update the counter cache after upserting favs
Domain::User.reset_counters(new_user.id, :user_post_favs)
logger.info(
"Reset user_post_favs counter cache for user #{new_user.name} (ID: #{new_user.id})",
)
if new_user.faved_posts.count != old_user.faved_posts.count
logger.error(
"favs mismatch for #{new_user.name}: (#{new_user.faved_posts.count} != #{old_user.faved_posts.count})",
)
else
new_user.migrated_user_favs_at = Time.current
new_user.save!
end
end
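  # Strip attributes that upsert_all/insert_all! should not receive: virtual
  # columns, attr_json alias attributes (their data already lives in
  # json_attributes), and the DB-managed id/created_at/updated_at columns.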
sig do
params(
klass: T.class_of(ActiveRecord::Base),
attributes: T::Hash[String, T.untyped],
).returns(T::Hash[String, T.untyped])
end
def clean_attributes(klass, attributes)
reject_attrs = klass.columns.filter(&:virtual?).map(&:name)
if klass < AttrJsonRecordAliases
reject_attrs +=
T.unsafe(klass).attr_json_registry.attribute_names.map(&:to_s)
end
attributes.except(*reject_attrs).except("id", "created_at", "updated_at")
end
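  # Generic bulk-migration core: map each old record to a new unsaved model,
  # bulk-write the batch (upsert_all when a unique index name is given,
  # insert_all! otherwise), then copy the returned primary keys back onto the
  # in-memory models so callers can wire up dependent rows. A minimal sketch of
  # the calling convention, names as at the call sites above:
  #
  #   migrate_batch(
  #     Domain::User::FaUser,
  #     old_users,
  #     unique_by: [:idx_domain_fa_users_on_url_name],
  #   ) { |old_user| initialize_fa_user_from(old_user) }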
sig do
type_parameters(:Old, :New)
.params(
klass: T.class_of(ActiveRecord::Base),
batch: T::Array[T.all(T.type_parameter(:Old), ActiveRecord::Base)],
unique_by: T.nilable(T::Array[Symbol]),
model_mapper:
T.nilable(
T
.proc
.params(record: T.all(T.type_parameter(:Old), ActiveRecord::Base))
.returns(T.all(T.type_parameter(:New), ActiveRecord::Base)),
),
)
.returns(T::Array[T.type_parameter(:New)])
end
def migrate_batch(klass, batch, unique_by: nil, &model_mapper)
attributes = []
models = []
klass.transaction do
batch.each do |record|
model = model_mapper ? model_mapper.call(record) : record
if model.is_a?(Domain::User)
model.names_for_search_values.each do |name|
model.user_search_names.build(name:)
end
end
models << model
attributes << clean_attributes(klass, model.attributes)
end
if unique_by
returned = klass.upsert_all(attributes, unique_by:)
else
returned = klass.insert_all!(attributes)
end
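      # Both bulk writes return the generated primary keys; the zip below
      # assumes they come back in input order, which holds on PostgreSQL.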
returned.zip(models).each { |hash, model| model.id = hash["id"] }
end
if klass < Domain::User
models.each do |model|
model.user_search_names.each do |user_search_name|
user_search_name.user_id = model.id
end
end
migrate_batch(
Domain::UserSearchName,
models.flat_map(&:user_search_names),
unique_by: [:index_domain_user_search_names_on_user_id_and_name],
)
end
models
end
end