Initial SQLite exporter script

Dylan Knutson
2023-05-20 10:15:22 -07:00
parent e1c10b150c
commit cb3aaadd29
3 changed files with 378 additions and 2 deletions

View File

@@ -0,0 +1,357 @@
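# Exports the Domain::Fa ActiveRecord models (users, follows, favs, posts)
# into a standalone SQLite database, with resume-from-last-id support,
# optional sampling, and per-table selection.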
class Domain::Fa::SqliteExporter
  include HasColorLogger
  include HasMeasureDuration

  def initialize(db, sample, tables)
    @db = db
    @sample = sample
    @tables = tables
  end
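
  # Pipeline: create destination tables, drop indexes so bulk inserts stay
  # fast, dump each requested table, then rebuild the indexes at the end.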
  def run
    measure("created tables") do
      migrate
    end
    measure("drop indexes") do
      drop_indexes
    end
    measure(proc do |num|
      "dumped #{num&.to_s&.bold} fa users"
    end) do
      dump_fa_users
    end if dump_table?(:users)
    measure(proc do |num|
      "dumped #{num&.to_s&.bold} fa follows"
    end) do
      dump_fa_follows
    end if dump_table?(:follows)
    measure(proc do |num|
      "dumped #{num&.to_s&.bold} fa favs"
    end) do
      dump_fa_favs
    end if dump_table?(:favs)
    measure(proc do |num|
      "dumped #{num&.to_s&.bold} fa posts"
    end) do
      dump_fa_posts
    end if dump_table?(:posts)
    measure("created indexes") do
      create_indexes
    end
  ensure
    @db.close
  end
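
  # Optional RubyProf instrumentation; reports land in
  # profiler/fa_sqlite_exporter/ as graph, call-stack, and speedscope output.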
  def start_profiling!
    RubyProf.start
  end

  def end_profiling!
    base = "profiler/fa_sqlite_exporter"
    FileUtils.mkdir_p(base)
    result = RubyProf.stop
    File.open("#{base}/profile.txt", "w") do |f|
      RubyProf::GraphPrinter.new(result).print(f, { min_percent: 1 })
    end
    File.open("#{base}/profile.html", "w") do |f|
      RubyProf::CallStackPrinter.new(result).print(f, { min_percent: 1 })
    end
    File.open("#{base}/profile.rubyprof", "w") do |f|
      RubyProf::SpeedscopePrinter.new(result).print(f, { min_percent: 1 })
    end
  end
  private

  def migrate
    @db.execute_batch2 <<-SQL
      create table if not exists fa_users (
        id int primary key,
        url_name text,
        name text,
        num_favorites int,
        registered_at text
      );
      create table if not exists fa_follows (
        id int primary key,
        follower_id int,
        followed_id int
      );
      create table if not exists fa_favs (
        id int primary key,
        user_id int,
        post_id int
      );
      create table if not exists fa_posts (
        id int primary key,
        fa_id int,
        creator_id int,
        title text,
        num_views int,
        num_comments int,
        num_favorites int,
        posted_at text
      );
    SQL
  end
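
  # [table, column, unique] triples, dropped before a bulk load and
  # recreated once all rows are in.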
  INDEXES = [
    ["fa_users", "url_name", true],
    ["fa_follows", "follower_id", false],
    ["fa_follows", "followed_id", false],
    ["fa_favs", "user_id", false],
    ["fa_favs", "post_id", false],
    ["fa_posts", "creator_id", false],
    ["fa_posts", "fa_id", true],
  ]

  def create_indexes
    @db.execute_batch2(INDEXES.map do |table, col, unique|
      <<-SQL
        create #{unique ? "unique " : ""}index if not exists #{col}_on_#{table}
          on #{table} (#{col});
      SQL
    end.join("\n"))
  end

  def drop_indexes
    @db.execute_batch2(INDEXES.map do |table, col, _unique|
      <<-SQL
        drop index if exists #{col}_on_#{table};
      SQL
    end.join("\n"))
  end
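
  # One dump_* method per table: each maps a model to its SQLite table and,
  # where needed, converts timestamp columns to ISO 8601 strings.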
  def dump_fa_users
    dump_table_common(
      model: Domain::Fa::User,
      table: "fa_users",
      columns: %w[id url_name name num_favorites registered_at],
      batch_size: 512,
    ) do |batch|
      # format registered_at
      batch.each do |row|
        row[4] = row[4]&.iso8601
      end
    end
  end

  def dump_fa_follows
    dump_table_common(
      model: Domain::Fa::Follow,
      table: "fa_follows",
      columns: ["id", "follower_id", "followed_id"],
      batch_size: 4096,
    )
  end

  def dump_fa_favs
    dump_table_common(
      model: Domain::Fa::Fav,
      table: "fa_favs",
      columns: ["id", "user_id", "post_id"],
      batch_size: 4096,
    )
  end

  def dump_fa_posts
    dump_table_common(
      model: Domain::Fa::Post.where("file_url_str is not null"),
      table: "fa_posts",
      columns: %w[
        id fa_id title creator_id
        num_views num_comments num_favorites
        posted_at
      ],
      batch_size: 4096,
    ) do |batch|
      # format posted_at
      batch.each do |row|
        row[7] = row[7]&.iso8601
      end
    end
  end

  # ====== common infra ====== #
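
  # Shared dump loop: resumes from max(id) already present in the SQLite
  # table, plucks source rows in batches (optionally rewritten in place by
  # the given block), and reports time spent loading, mapping, and inserting.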
  def dump_table_common(model:, table:, columns:, batch_size:)
    num_models = 0
    start_id = max_id(table, "id")
    start_id += 1 if start_id
    logger.info("dumping #{table.bold}, start at #{(start_id || "first").to_s.bold}...")
    inserter = create_inserter(batch_size, table, columns)
    load_duration = 0.0
    insert_duration = 0.0
    map_duration = 0.0
    dump_start = Time.now
    load_start = Time.now
    @db.transaction
    pluck_rows(model, columns, start_id: start_id, batch_size: batch_size) do |rows|
      load_duration += Time.now - load_start
      map_start = Time.now
      yield rows if block_given?
      map_duration += Time.now - map_start
      insert_start = Time.now
      inserter.insert(rows)
      insert_duration += Time.now - insert_start
      num_models += rows.size
      load_start = Time.now
    end
    dump_duration = Time.now - dump_start
    logger.info(
      "time spent on #{table.bold} " +
      "(#{(num_models / dump_duration).round(0).to_s.bold}/sec): " +
      "#{load_duration.round(2).to_s.bold} sec loading, " +
      "#{map_duration.round(2).to_s.bold} sec mapping, " +
      "#{insert_duration.round(2).to_s.bold} sec inserting"
    )
    num_models
  ensure
    inserter.close if inserter
    @db.commit
  end

  def create_inserter(bulk_size, table, columns)
    Inserter.new(@db, bulk_size, table, columns)
  end
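
  # Wraps two prepared statements: a multi-row insert for full batches and a
  # single-row insert for the remainder, so each statement is compiled once.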
  class Inserter
    include HasColorLogger
    include HasMeasureDuration

    def initialize(db, bulk_size, table, columns)
      @db = db
      @bulk_size = bulk_size
      @table = table
      @columns = columns
      binds = "(" + (["?"] * columns.size).join(", ") + ")"
      @single = @db.prepare <<-SQL
        insert into #{table} (#{columns.join(", ")})
        values #{binds}
      SQL
      @bulk = @db.prepare <<-SQL
        insert into #{table} (#{columns.join(", ")})
        values #{([binds] * bulk_size).join(", ")}
      SQL
    end

    def close
      @single.close
      @bulk.close
    end

    def insert(colss)
      while colss.size >= @bulk_size
        insert_bulk(colss[0...@bulk_size])
        colss = colss[@bulk_size...]
      end
      colss.each do |cols|
        insert_single(cols)
      end
    end

    private

    def insert_single(cols)
      bad_dims! if cols.size != @columns.size
      @bind_index = 1
      @single.reset!
      bind_single(@single, cols)
      @single.execute
    end

    def insert_bulk(colss)
      bad_dims! if colss.size != @bulk_size
      bad_dims! if colss.any? { |col| col.size != @columns.size }
      @bind_index = 1
      @bulk.reset!
      bind_bulk(@bulk, colss)
      @bulk.execute
    end

    def bind_single(stmt, binds)
      # bind values one at a time so the running @bind_index can span
      # multiple rows of the bulk statement
      binds.each do |value|
        stmt.bind_param(@bind_index, value)
        @bind_index += 1
      end
    end

    def bind_bulk(stmt, binds)
      binds.each do |arr|
        bind_single(stmt, arr)
      end
    end

    def bad_dims!
      raise("incorrect dimensions")
    end
  end
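
  # Streams id-ordered batches out of the source database, printing a
  # progress dot per batch; in sample mode the export stops after
  # batch_size * 32 rows.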
  def pluck_rows(relation, cols, start_id:, batch_size:)
    num_batches = 0
    num_models = 0
    start_time = Time.now
    models_in_measure = 0
    relation = relation.all unless relation.is_a?(ActiveRecord::Relation)
    relation = relation.where("id >= ?", start_id) if start_id
    relation.pluck_in_batches(*cols.map(&:to_sym), batch_size: batch_size) do |batch|
      yield batch
      num_models += batch.size
      models_in_measure += batch.size
      num_batches += 1
      print "."
      if num_batches % 128 == 0
        rate = (models_in_measure / (Time.now - start_time)).round(1)
        start_time = Time.now
        models_in_measure = 0
        puts " #{num_models.to_s.bold} - #{rate.to_s.bold}/sec"
      end
      break if @sample && num_models >= batch_size * 32
    end
    puts ""
  end

  def max_id(table, column)
    @db.get_first_value <<-SQL
      select max(#{column}) from #{table}
    SQL
  end

  def dump_table?(table)
    ret = @tables.include?(:all) || @tables.include?(table)
    logger.info("skipping #{table.to_s.bold}...") unless ret
    ret
  end
end

View File

@@ -8,10 +8,14 @@ module HasMeasureDuration
    now = Time.now
    ret = yield
    duration = Time.now - now
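    # pick the largest unit that keeps the number readable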
    if duration >= 1.hour
      duration_str = "#{(duration / 1.hour).round(2).to_s.bold} hr"
    elsif duration >= 5.minutes
      duration_str = "#{(duration / 1.minute).round(2).to_s.bold} min"
    elsif duration >= 5.seconds
      duration_str = "#{duration.round(2).to_s.bold} sec"
    else
      duration_str = "#{(duration * 1000).round(0).to_s.bold} ms"
    end
    title = title.call(ret) if title.respond_to?(:call)
    logger.info "#{title} - #{duration_str}"

View File

@@ -238,4 +238,19 @@ namespace :fa do
      })
    end
  end
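
  # Usage (rake passes name=value arguments through as ENV), e.g.:
  #   rake fa:export_to_sqlite outfile=fa.sqlite3 tables=all
  #   rake fa:export_to_sqlite outfile=fa.sqlite3 tables=users,posts sample=1 profile=1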
  task :export_to_sqlite => [:environment, :set_logger_stdout] do
    profile = !!ENV["profile"]
    sample = !!ENV["sample"]
    outfile = ENV["outfile"] || raise("'outfile' required")
    tables = ENV["tables"] || raise("'tables' required (all, users, follows, favs, posts)")
    tables = tables.split(",").map(&:to_sym)

    db = SQLite3::Database.new(outfile)
    exporter = Domain::Fa::SqliteExporter.new(db, sample, tables)
    exporter.start_profiling! if profile
    exporter.run
    exporter.end_profiling! if profile
  end
end