initial sqlite exporter script
This commit is contained in:
357
app/lib/domain/fa/sqlite_exporter.rb
Normal file
357
app/lib/domain/fa/sqlite_exporter.rb
Normal file
@@ -0,0 +1,357 @@
|
||||
class Domain::Fa::SqliteExporter
|
||||
include HasMeasureDuration
|
||||
|
||||
# @param db     [SQLite3::Database] destination database handle (closed by #run)
# @param sample [Boolean] when true, each table dump stops after a sample
# @param tables [Array<Symbol>] which tables to export (:all or a subset)
def initialize(db, sample, tables)
  @db, @sample, @tables = db, sample, tables
end
|
||||
|
||||
# Full export pipeline: create tables, drop indexes (faster bulk inserts),
# dump each requested table, then rebuild the indexes. The database handle
# is always closed, even when a step raises.
def run
  measure("created tables") { migrate }
  measure("drop indexes") { drop_indexes }

  # The four table dumps are identical except for their name, so drive
  # them from a list. dump_table? is checked first, exactly as before,
  # so skipped tables produce no measure output.
  [:users, :follows, :favs, :posts].each do |key|
    next unless dump_table?(key)

    label = proc { |num| "dumped #{num&.to_s&.bold} fa #{key}" }
    measure(label) { send("dump_fa_#{key}") }
  end

  measure("created indexes") { create_indexes }
ensure
  @db.close
end
|
||||
|
||||
# Begins a ruby-prof capture; pair with #end_profiling! to write reports.
def start_profiling!
  RubyProf.start
end
|
||||
|
||||
# Stops the ruby-prof capture and writes one report per printer into
# profiler/fa_sqlite_exporter/ (created on demand).
#
# Changes from the original:
# - dropped the redundant `unless File.exist?(base)` guard: FileUtils.mkdir_p
#   is already a no-op when the directory exists (and the guard was racy);
# - the three copy-pasted File.open/printer stanzas are driven from a
#   filename => printer table. Output files and contents are unchanged.
def end_profiling!
  base = "profiler/fa_sqlite_exporter"
  FileUtils.mkdir_p(base)

  result = RubyProf.stop

  {
    "profile.txt" => RubyProf::GraphPrinter,
    "profile.html" => RubyProf::CallStackPrinter,
    "profile.rubyprof" => RubyProf::SpeedscopePrinter,
  }.each do |filename, printer_class|
    File.open("#{base}/#{filename}", "w") do |f|
      printer_class.new(result).print(f, { min_percent: 1 })
    end
  end
end
|
||||
|
||||
private
|
||||
|
||||
# Creates the four destination tables. Idempotent via "if not exists", so a
# re-run against an existing export file resumes instead of failing.
# Timestamp columns (registered_at, posted_at) are text: the dump methods
# serialize Time values to iso8601 strings before insertion.
def migrate
  @db.execute_batch2 <<-SQL
    create table if not exists fa_users (
      id int primary key,
      url_name text,
      name text,
      num_favorites int,
      registered_at text
    );

    create table if not exists fa_follows (
      id int primary key,
      follower_id int,
      followed_id int
    );

    create table if not exists fa_favs (
      id int primary key,
      user_id int,
      post_id int
    );

    create table if not exists fa_posts (
      id int primary key,
      fa_id int,
      creator_id int,
      title text,
      num_views int,
      num_comments int,
      num_favorites int,
      posted_at text
    );
  SQL
end
|
||||
|
||||
# [table, column, unique?] triples driving create_indexes / drop_indexes.
# Index names are derived as "#{column}_on_#{table}".
# Frozen: mutable shared constants are a Ruby style-guide anti-pattern.
INDEXES = [
  ["fa_users", "url_name", true],
  ["fa_follows", "follower_id", false],
  ["fa_follows", "followed_id", false],
  ["fa_favs", "user_id", false],
  ["fa_favs", "post_id", false],
  ["fa_posts", "creator_id", false],
  ["fa_posts", "fa_id", true],
].freeze
|
||||
|
||||
# (Re)creates every index listed in INDEXES. Run after the bulk dump so
# inserts don't pay for index maintenance.
def create_indexes
  statements = INDEXES.map do |table, column, unique|
    kind = unique ? "unique" : ""
    <<-SQL
      create #{kind} index if not exists #{column}_on_#{table}
        on #{table} (#{column});
    SQL
  end
  @db.execute_batch2(statements.join("\n"))
end
|
||||
|
||||
# Drops every index listed in INDEXES ("if exists", so safe on a fresh file)
# before the bulk dump; create_indexes rebuilds them afterwards.
#
# Fix: the original block destructured an unused `unique` element; it is now
# underscore-prefixed per Ruby convention for unused block params.
def drop_indexes
  statements = INDEXES.map do |table, column, _unique|
    "drop index if exists #{column}_on_#{table};"
  end
  @db.execute_batch2(statements.join("\n"))
end
|
||||
|
||||
# Exports Domain::Fa::User rows into fa_users. registered_at (column
# index 4) is serialized to an iso8601 string before insertion.
def dump_fa_users
  dump_table_common(
    model: Domain::Fa::User,
    table: "fa_users",
    columns: ["id", "url_name", "name", "num_favorites", "registered_at"],
    batch_size: 512,
  ) do |batch|
    batch.each { |row| row[4] = row[4]&.iso8601 }
  end
end
|
||||
|
||||
# Exports Domain::Fa::Follow rows into fa_follows; no per-row mapping needed.
def dump_fa_follows
  dump_table_common(
    model: Domain::Fa::Follow,
    table: "fa_follows",
    columns: %w[id follower_id followed_id],
    batch_size: 4096,
  )
end
|
||||
|
||||
# Exports Domain::Fa::Fav rows into fa_favs; no per-row mapping needed.
def dump_fa_favs
  dump_table_common(
    model: Domain::Fa::Fav,
    table: "fa_favs",
    columns: %w[id user_id post_id],
    batch_size: 4096,
  )
end
|
||||
|
||||
# Exports Domain::Fa::Post rows (only those with a file_url_str) into
# fa_posts. posted_at (column index 7) is serialized to iso8601 text.
def dump_fa_posts
  columns = %w[id fa_id title creator_id num_views num_comments num_favorites posted_at]
  dump_table_common(
    model: Domain::Fa::Post.where("file_url_str is not null"),
    table: "fa_posts",
    columns: columns,
    batch_size: 4096,
  ) do |batch|
    batch.each { |row| row[7] = row[7]&.iso8601 }
  end
end
|
||||
|
||||
# ====== common infra ====== #
|
||||
|
||||
# Streams every row of +model+ into the sqlite +table+.
#
# Resumable: starts after the highest id already present in the destination
# table (max_id), so an interrupted export can be re-run.
# Accumulates and logs three phase timings: loading (ActiveRecord pluck),
# mapping (the optional caller block, e.g. iso8601 formatting), and
# inserting (prepared-statement writes).
#
# @param model      [ActiveRecord::Relation, Class] source scope/model
# @param table      [String] destination sqlite table
# @param columns    [Array<String>] columns to pluck & insert, in order
# @param batch_size [Integer] rows per pluck batch / bulk insert
# @yield [rows] optional in-place transform applied to each batch before insert
# @return [Integer] number of rows written
def dump_table_common(model:, table:, columns:, batch_size:)
  num_models = 0
  # Resume point: skip everything at or below the max id already exported.
  start_id = max_id(table, "id")
  start_id += 1 if start_id
  logger.info("dumping #{table.bold}, start at #{(start_id || "first").to_s.bold}...")
  inserter = create_inserter(batch_size, table, columns)

  load_duration = 0.0
  insert_duration = 0.0
  map_duration = 0.0

  dump_start = Time.now
  load_start = Time.now

  # One transaction around the whole dump — batching commits is what makes
  # bulk sqlite inserts fast.
  @db.transaction

  pluck_rows(model, columns, start_id: start_id, batch_size: batch_size) do |rows|
    # Time since the previous iteration ended = time spent loading this batch.
    load_duration += Time.now - load_start

    map_start = Time.now
    yield rows if block_given?
    map_duration += Time.now - map_start

    insert_start = Time.now
    inserter.insert(rows)
    insert_duration += Time.now - insert_start

    num_models += rows.size
    load_start = Time.now
  end

  dump_duration = Time.now - dump_start
  logger.info(
    "time spent on #{table.bold} " +
    "(#{(num_models / dump_duration).round(0).to_s.bold}/sec): " +
    "#{load_duration.round(2).to_s.bold} sec loading, " +
    "#{map_duration.round(2).to_s.bold} sec mapping, " +
    "#{insert_duration.round(2).to_s.bold} sec inserting"
  )
  num_models
ensure
  inserter.close if inserter
  # NOTE(review): commit runs even when an exception aborts the dump midway,
  # persisting the partial batch set. That appears intentional — the resume
  # logic above picks up from max_id — but if @db.transaction itself raised,
  # this commit will raise again and mask the original error. Confirm.
  @db.commit
end
|
||||
|
||||
# Builds a prepared-statement Inserter bound to this export's database.
# @return [Inserter]
def create_inserter(bulk_size, table, columns)
  Inserter.new(@db, bulk_size, table, columns)
end
|
||||
|
||||
# Batched prepared-statement inserter for a single sqlite table.
#
# Two statements are prepared up front: one inserting a single row and one
# inserting +bulk_size+ rows per execute. #insert drains full bulks through
# the multi-row statement and falls back to single-row inserts for the tail.
class Inserter
  include HasColorLogger
  include HasMeasureDuration

  # @param db        [SQLite3::Database]
  # @param bulk_size [Integer] rows per bulk insert statement
  # @param table     [String] destination table name
  # @param columns   [Array<String>] column names, in bind order
  def initialize(db, bulk_size, table, columns)
    @db = db
    @bulk_size = bulk_size
    @table = table
    @columns = columns

    # "(?, ?, ..., ?)" — one placeholder group per row.
    binds = "(" + (["?"] * columns.size).join(", ") + ")"

    @single = @db.prepare <<-SQL
      insert into #{table} (#{columns.join(", ")})
      values #{binds}
    SQL

    @bulk = @db.prepare <<-SQL
      insert into #{table} (#{columns.join(", ")})
      values #{([binds] * bulk_size).join(", ")}
    SQL
  end

  # Finalizes both prepared statements; call exactly once when done.
  def close
    @single.close
    @bulk.close
  end

  # Inserts an array of row arrays (each row ordered like @columns).
  def insert(colss)
    while colss.size >= @bulk_size
      insert_bulk(colss[0...@bulk_size])
      colss = colss[@bulk_size...]
    end

    # each on an empty array is a no-op, so no emptiness guard is needed.
    colss.each do |col|
      insert_single(col)
    end
  end

  private

  def insert_single(cols)
    bad_dims! if cols.size != @columns.size
    @bind_index = 1
    @single.reset!
    bind_single(@single, cols)
    @single.execute
  end

  def insert_bulk(colss)
    bad_dims! if colss.size != @bulk_size
    bad_dims! if colss.any? { |col| col.size != @columns.size }
    @bind_index = 1
    @bulk.reset!
    bind_bulk(@bulk, colss)
    @bulk.execute
  end

  # Binds one row's values at consecutive 1-based placeholder positions,
  # advancing the shared @bind_index cursor.
  def bind_single(stmt, binds)
    binds.each do |value|
      stmt.bind_param(@bind_index, value)
      @bind_index += 1
    end
  end

  # Binds every row of a bulk insert back-to-back into one statement.
  def bind_bulk(stmt, binds)
    binds.each do |arr|
      bind_single(stmt, arr)
    end
  end

  # BUG FIX: this guard was defined as `dimensionality!` while every call
  # site used `bad_dims!`, so a row with the wrong number of values raised
  # NoMethodError instead of this error. Renamed to match the call sites.
  def bad_dims!
    raise("incorrect dimensions")
  end
end
|
||||
|
||||
# Plucks +cols+ from +relation+ in batches and yields each batch, printing
# progress dots (with a rows/sec line every 128 batches). In sample mode
# (@sample) it stops after batch_size * 32 rows.
def pluck_rows(relation, cols, start_id:, batch_size:)
  batches_seen = 0
  total_rows = 0

  window_started_at = Time.now
  rows_in_window = 0

  scope = relation.is_a?(ActiveRecord::Relation) ? relation : relation.all
  scope = scope.where("id >= ?", start_id) if start_id

  scope.pluck_in_batches(*cols.map(&:to_sym), batch_size: batch_size) do |batch|
    yield batch

    total_rows += batch.size
    rows_in_window += batch.size
    batches_seen += 1
    print "."

    if (batches_seen % 128).zero?
      rate = (rows_in_window / (Time.now - window_started_at)).round(1)
      window_started_at = Time.now
      rows_in_window = 0
      puts " #{total_rows.to_s.bold} - #{rate.to_s.bold}/sec"
    end

    break if @sample && total_rows >= batch_size * 32
  end

  puts ""
end
|
||||
|
||||
# Largest value of +column+ already present in +table+, or nil when the
# table is empty (used as the resume point for incremental dumps).
def max_id(table, column)
  query = <<-SQL
    select max(#{column}) from #{table}
  SQL
  @db.get_first_value(query)
end
|
||||
|
||||
# Whether +table+ (a Symbol) was requested for this export, either
# explicitly or via :all. Logs a skip notice when it wasn't.
def dump_table?(table)
  wanted = @tables.include?(:all) || @tables.include?(table)
  logger.info("skipping #{table.to_s.bold}...") unless wanted
  wanted
end
|
||||
end
|
||||
@@ -8,10 +8,14 @@ module HasMeasureDuration
|
||||
now = Time.now
|
||||
ret = yield
|
||||
duration = Time.now - now
|
||||
if duration >= 5
|
||||
if duration >= 1.hour
|
||||
duration_str = "#{(duration / 1.hour).round(2).to_s.bold} hr"
|
||||
elsif duration >= 5.minute
|
||||
duration_str = "#{(duration / 1.minute).round(2).to_s.bold} min"
|
||||
elsif duration >= 5.second
|
||||
duration_str = "#{duration.round(2).to_s.bold} sec"
|
||||
else
|
||||
duration_str = "#{(1000 * duration).to_i.to_s.bold} ms"
|
||||
duration_str = "#{(duration * 1000).round(0).to_s.bold} ms"
|
||||
end
|
||||
title = title.call(ret) if title.respond_to?(:call)
|
||||
logger.info "#{title} - #{duration_str}"
|
||||
|
||||
15
rake/fa.rake
15
rake/fa.rake
@@ -238,4 +238,19 @@ namespace :fa do
|
||||
})
|
||||
end
|
||||
end
|
||||
|
||||
# Exports FA data to a standalone sqlite file.
#   rake fa:export_to_sqlite outfile=dump.sqlite tables=all [sample=1] [profile=1]
task :export_to_sqlite => [:environment, :set_logger_stdout] do
  profile = !!ENV["profile"]
  sample = !!ENV["sample"]

  outfile = ENV.fetch("outfile") { raise("'outfile' required") }
  tables = ENV.fetch("tables") { raise("'tables' required (all, users, follows, favs, posts)") }
            .split(",")
            .map(&:to_sym)

  db = SQLite3::Database.new(outfile)
  exporter = Domain::Fa::SqliteExporter.new(db, sample, tables)

  exporter.start_profiling! if profile
  exporter.run
  exporter.end_profiling! if profile
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user