redux-scraper/app/lib/domain/fa/sqlite_exporter.rb

# typed: false
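# Dumps a subset of the FA domain models (users, follows, favs, posts) into a
# standalone SQLite database using prepared bulk inserts.
#
# Example usage (a sketch; it assumes the handle is a SQLite3::Database from the
# sqlite3 gem, which is what #prepare, #execute_batch2 and #transaction imply):
#
#   db = SQLite3::Database.new("fa_export.sqlite3")
#   Domain::Fa::SqliteExporter.new(db, false, [:all]).run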
class Domain::Fa::SqliteExporter
include HasMeasureDuration
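# Each entry of TABLES describes one export target:
#   model:      ActiveRecord class (or relation) the rows are read from
#   columns:    [name, sqlite type, ...constraints] tuples, used both for the
#               CREATE TABLE statement and for the pluck
#   indexes:    indexes created after the table exists
#   fk:         column => [foreign table, foreign column] constraints
#   batch_size: rows fetched and inserted per transaction
#   each_row:   optional in-place transform applied to every plucked row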
TABLES = {
fa_users: {
model: Domain::Fa::User,
columns: [
%w[id int primary key],
%w[url_name text],
%w[name text],
%w[artist_type text],
%w[mood text],
%w[num_pageviews int],
%w[num_submissions int],
%w[num_comments_recieved int],
%w[num_comments_given int],
%w[num_journals int],
%w[num_favorites int],
%w[registered_at text],
],
indexes: [{ on: "id", unique: true }, { on: "url_name", unique: true }],
batch_size: 512,
# format registered_at column
each_row: ->(row) { row[11] = row[11]&.iso8601 },
},
fa_follows: {
model: Domain::Fa::Follow,
columns: [%w[follower_id int], %w[followed_id int]],
indexes: [{ on: %w[follower_id followed_id], unique: true }],
fk: {
follower_id: %w[fa_users id],
followed_id: %w[fa_users id],
},
batch_size: 4096,
},
fa_favs: {
model: Domain::Fa::Fav,
columns: [%w[user_id int], %w[post_id int]],
indexes: [{ on: %w[user_id post_id], unique: true }],
fk: {
user_id: %w[fa_users id],
post_id: %w[fa_posts id],
},
batch_size: 4096,
},
fa_posts: {
model: Domain::Fa::Post.where("file_url_str is not null"),
columns: [
%w[id int],
%w[fa_id int],
%w[creator_id int],
%w[title text],
%w[category text],
%w[theme text],
%w[species text],
%w[gender text],
%w[file_url_str text],
%w[num_views int],
%w[num_comments int],
%w[num_favorites int],
%w[posted_at text],
],
batch_size: 4096,
indexes: [{ on: "id", unique: true }, { on: "fa_id", unique: true }],
fk: {
creator_id: %w[fa_users id],
},
# format posted_at column
each_row: ->(row) { row[12] = row[12]&.iso8601 },
},
}
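# db:     an open SQLite database handle; it is closed again at the end of #run
# sample: when truthy, only a few batches per table are exported (see #pluck_rows)
# tables: list of TABLES keys to export, or [:all] for every table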
def initialize(db, sample, tables)
@db = db
@sample = sample
@tables = tables.include?(:all) ? TABLES.keys : tables
@tables.each do |table|
raise("unknown table: #{table}") unless TABLES.key?(table)
end
end
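# Creates all tables and indexes, then dumps each requested table in batches,
# logging per-table row counts and throughput. The handle is closed even if a
# dump fails.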
def run
measure("create tables") { migrate }
@tables.each do |table|
config = TABLES[table]
measure(
proc do |num|
num ? "dumped #{table}, #{num} rows" : "dumping #{table}..."
end,
) do
dump_table_common(
table: table,
model: config[:model],
columns: config[:columns],
batch_size: config[:batch_size],
each_row: config[:each_row],
)
end
end
ensure
@db.close
end
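# Optional ruby-prof instrumentation: call start_profiling! before the export
# and end_profiling! afterwards to write graph, call-stack and speedscope
# reports under profiler/fa_sqlite_exporter.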
def start_profiling!
RubyProf.start
end
def end_profiling!
base = "profiler/fa_sqlite_exporter"
FileUtils.mkdir_p(base)
result = RubyProf.stop
File.open("#{base}/profile.txt", "w") do |f|
RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
end
File.open("#{base}/profile.html", "w") do |f|
RubyProf::CallStackPrinter.new(result).print(f, { min_percent: 1 })
end
File.open("#{base}/profile.rubyprof", "w") do |f|
RubyProf::SpeedscopePrinter.new(result).print(f, { min_percent: 1 })
end
end
private
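# Creates every table and index declared in TABLES (idempotent via
# "if not exists"), including foreign key clauses where configured.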
def migrate
TABLES.each do |table, config|
columns = config[:columns]
fk = config[:fk] || {}
columns_and_fks = [
columns.map { |name, *rest| "#{name} #{rest.join(" ")}" }.join(",\n"),
fk.map do |name, foreign|
foreign_table, foreign_column = foreign
"foreign key (#{name}) references #{foreign_table}(#{foreign_column})"
end,
].flatten.join(",\n")
sql = <<-SQL
create table if not exists #{table} (
#{columns_and_fks}
);
SQL
# logger.info(sql)
@db.execute_batch2(sql)
config[:indexes].each do |index|
unique = index[:unique] ? "unique" : ""
cols = [index[:on]].flatten
col_names = cols.join("_")
sql = <<-SQL
create #{unique} index if not exists #{col_names}_on_#{table}
on #{table} (#{cols.join(", ")});
SQL
logger.info(sql)
@db.execute_batch2(sql)
end
end
end
# ====== common infra ====== #
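# Streams rows out of the relation in batches, optionally transforms each row,
# and writes every batch inside its own transaction, tracking load / map /
# insert time separately for the summary log line.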
def dump_table_common(model:, table:, columns:, batch_size:, each_row: nil)
num_models = 0
logger.info("[#{table.to_s.bold}] [batch size: #{batch_size.to_s.bold}]")
inserter = create_inserter(batch_size, table, columns)
load_duration = 0.0
insert_duration = 0.0
map_duration = 0.0
dump_start = Time.now
load_start = Time.now
pluck_rows(model, columns, batch_size: batch_size) do |batch|
@db.transaction do
load_duration += Time.now - load_start
map_start = Time.now
batch.each { |row| each_row.call(row) } if each_row
map_duration += Time.now - map_start
insert_start = Time.now
inserter.insert(batch)
insert_duration += Time.now - insert_start
num_models += batch.size
load_start = Time.now
end
end
dump_duration = Time.now - dump_start
logger.info(
"[#{table.to_s.bold}] " +
"[#{(num_models / dump_duration).round(0).to_s.bold}/sec] " +
"[load: #{load_duration.round(2).to_s.bold} sec] " +
"[map: #{map_duration.round(2).to_s.bold} sec] " +
"[insert: #{insert_duration.round(2).to_s.bold} sec]",
)
num_models
ensure
inserter.close if inserter
end
def create_inserter(bulk_size, table, columns)
Inserter.new(@db, bulk_size, table, columns)
end
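# Wraps two prepared INSERT statements: a multi-row one for full batches and a
# single-row one for the remainder, both using "on conflict do nothing" so that
# re-running an export does not duplicate rows.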
class Inserter
include HasColorLogger
include HasMeasureDuration
def initialize(db, bulk_size, table, columns)
@db = db
# cap the batch so a single statement never needs more than 999 bound
# parameters (SQLite's historical default limit)
@bulk_size = [bulk_size, 999 / columns.size].min
@table = table
@columns = columns
binds = "(" + (["?"] * @columns.size).join(", ") + ")"
@single = @db.prepare <<-SQL
insert into #{@table} (#{@columns.map(&:first).join(", ")})
values #{binds} on conflict do nothing
SQL
@bulk = @db.prepare <<-SQL
insert into #{@table} (#{@columns.map(&:first).join(", ")})
values #{([binds] * @bulk_size).join(", ")} on conflict do nothing
SQL
end
def close
@single.close
@bulk.close
end
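# Inserts full @bulk_size chunks through the multi-row statement and falls back
# to the single-row statement for whatever is left over.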
def insert(colss)
while colss.size >= @bulk_size
insert_bulk(colss[0...@bulk_size])
colss = colss[@bulk_size...]
end
colss.each { |col| insert_single(col) } if colss.any?
end
private
def insert_single(cols)
bad_dims! if cols.size != @columns.size
@bind_index = 1
@single.reset!
bind_single(@single, cols)
@single.execute
end
def insert_bulk(colss)
bad_dims! if colss.size != @bulk_size
bad_dims! if colss.any? { |col| col.size != @columns.size }
@bind_index = 1
@bulk.reset!
bind_bulk(@bulk, colss)
@bulk.execute
end
def bind_single(stmt, binds)
# stmt.bind_params(binds)
binds.each do |value|
stmt.bind_param(@bind_index, value)
@bind_index += 1
end
end
def bind_bulk(stmt, binds)
# stmt.bind_params(binds)
binds.each { |arr| bind_single(stmt, arr) }
end
def bad_dims!
raise("incorrect dimensions")
end
end
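# Plucks the configured columns from the relation in primary-key batches and
# yields each batch as an array of row arrays, printing a progress dot per
# batch and a rate line every 128 batches. In sample mode at most ~32 batches
# per table are read.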
def pluck_rows(relation, cols, batch_size:)
num_batches = 0
num_models = 0
start_time = Time.now
models_in_measure = 0
relation.in_batches(of: batch_size, use_ranges: true) do |batch|
batch = batch.pluck(*cols.map(&:first).map(&:to_sym)).to_a
yield batch
num_models += batch.size
models_in_measure += batch.size
num_batches += 1
print "."
if num_batches % 128 == 0
rate = (models_in_measure / (Time.now - start_time)).round(1)
start_time = Time.now
models_in_measure = 0
puts " #{num_models.to_s.bold} - #{rate.to_s.bold}/sec"
end
break if @sample && num_models >= batch_size * 32
end
puts ""
end
end