update csv import job

Dylan Knutson
2023-08-24 15:03:00 -07:00
parent 01f8d0b962
commit 1fa22351d5
10 changed files with 360 additions and 196 deletions

View File

@@ -155,3 +155,13 @@ task :workoff_failed_jobs => [:environment, :set_ar_stdout, :set_logger_stdout]
worker.run(job)
end
end
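# write a "rev_" copy of a csv with its data rows reversed (header row kept
# first); presumably a companion to the csv import job, which resumes by
# skipping ids >= start_at and so expects rows in descending-id order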
task :reverse_csv do
file = ENV["file"] || raise("need 'file' (file path)")
in_csv = CSV.parse(File.read(file), headers: true)
out_csv = CSV.new(File.open("rev_" + file, "w"), write_headers: true, headers: in_csv.headers)
in_csv.reverse_each do |row|
out_csv << row.fields
end
out_csv.close
end

View File

@@ -1,5 +1,7 @@
module Domain::E621::Job
class PostsIndexJob < Base
TagAndCategory = Domain::E621::TagUtil::TagAndCategory
queue_as :e621
ignore_signature_args :caused_by_entry
@@ -28,35 +30,18 @@ module Domain::E621::Job
@num_created = 0
@num_seen = 0
-all_tags = Set.new json["posts"].map { |post_json|
+all_tag_names = Set.new json["posts"].map { |post_json|
tag_and_cat_for_json(post_json)
-}.flatten
-all_tag_names = all_tags.map(&:name)
+}.flatten.map(&:name)
-existing_tags = Domain::E621::Tag.where(name: all_tag_names)
-@name_to_tag_id = existing_tags.map { |tag| [tag.name, tag.id] }.to_h
-e621_id_to_post.each do |e621_id, post|
-post.tags.each do |tag|
-@name_to_tag_id[tag.name] = tag.id
-end
-end
-missing_tags = all_tag_names - existing_tags.map(&:name)
-if missing_tags.any?
-logger.info("creating #{missing_tags.size.to_s.bold} missing tags")
-upsert_hashes = missing_tags.map do |name|
-{ name: name }
-end
-Domain::E621::Tag.upsert_all(
-upsert_hashes,
-unique_by: :name,
-update_only: :name,
-returning: %i[id name],
-).each do |row|
-@name_to_tag_id[row["name"]] = row["id"]
-end
-end
+@name_to_tag_id, missing_tags =
+Domain::E621::TagUtil.tag_names_to_id_map(
+all_tag_names,
+posts: e621_id_to_post.values,
+)
+@name_to_tag_id.merge!(
+Domain::E621::TagUtil.create_tags_from_names(missing_tags)
+)
json["posts"].each do |post_json|
@num_seen += 1
@@ -108,54 +93,27 @@ module Domain::E621::Job
post.pools_array = post_json["pools"]
post.sources_array = post_json["sources"]
post.tags_array = post_json["tags"]
post.artists_array = post_json["tags"]["artist"]
-truth_tag_names = tag_and_cat_for_json(post_json)
-existing_tag_names = tag_and_cat_for_model(post)
-to_add = truth_tag_names - existing_tag_names
-to_remove = existing_tag_names - truth_tag_names
+new_record = post.new_record?
Domain::E621::Post.transaction do
-is_new = post.new_record?
post.save!
-if to_remove.any?
-to_remove_ids = to_remove.
-map(&:name).
-map { |name| @name_to_tag_id[name] }
-post.
-taggings.
-where(tag_id: to_remove_ids).
-delete_all
-end
-post.taggings.insert_all!(to_add.map do |tag_and_cat|
-id = @name_to_tag_id[tag_and_cat.name]
-{ tag_id: id, category: tag_and_cat.category }
-end) if to_add.any?
-defer_job(Domain::E621::Job::StaticFileJob, {
-post: post,
-caused_by_entry: @log_entry,
-}) if is_new
+Domain::E621::TagUtil.update_tags_on_post_with_cat(
+post,
+tag_and_cat_for_json(post_json),
+@name_to_tag_id,
+)
end
+defer_job(Domain::E621::Job::StaticFileJob, {
+post: post,
+caused_by_entry: @log_entry,
+}) if new_record
true
end
-def tag_names_to_ids(names)
-names.map { |name| @name_to_tag_id[name] }
-end
TAG_CATEGORIES = %w[general species character copyright artist lore meta]
-TagAndCategory = Struct.new(:name, :category)
-def tag_and_cat_for_model(model)
-model.taggings.map do |tagging|
-TagAndCategory.new(tagging.tag.name, tagging.category)
-end
-end
def tag_and_cat_for_json(post_json)
TAG_CATEGORIES.map do |tc|

View File

@@ -61,7 +61,12 @@ class ColorLogger
end
def self.klass_name_from_instance(instance)
-klass_name = instance.class.name.dup
+if instance.is_a?(String)
+klass_name = instance.dup
+else
+klass_name = instance.class.name.dup
+end
klass_name.delete_prefix!("Domain::")
prefixes = [

View File

@@ -0,0 +1,133 @@
require "csv"
class Domain::E621::CsvPostImporter < LegacyImport::BulkImportJob
def initialize(
csv_path:,
start_at:,
limit: nil
)
logger.info "loading..."
@csv_file = CSV.new(File.open(csv_path, "r"), headers: true)
logger.info "loaded"
# keep nil when unset: the skip check below is `e621_id >= @start_at`, so a
# default of 0 would skip every row
@start_at = start_at
@start_time = Time.now
@limit = limit
logger.info "start_at=#{@start_at} limit=#{limit}"
end
def name
"e621_csv_post_importer"
end
def profile?
false
end
def run_impl
progress = 0
@csv_file.each do |row|
row = row.to_h
e621_id = row["id"].to_i
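# resume: rows with id >= start_at are treated as already imported, which
# presumes the csv is sorted by id descending (see the reverse_csv rake task)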
if @start_at && e621_id >= @start_at
@start_time = Time.now
next
end
progress += self.class.import_row(row)
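# checkpoint after every row so a crashed run can resume via start_at
# (write_last_id is assumed to come from LegacyImport::BulkImportJob)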
write_last_id e621_id
break if @limit && progress >= @limit
end
progress
end
def self.import_row(row)
e621_id = row["id"].to_i
md5 = row["md5"]
logger.prefix = proc { "[e621_id #{e621_id.to_s.bold}]" }
post = Domain::E621::Post.find_by({
e621_id: e621_id,
}) || Domain::E621::Post.new({
e621_id: e621_id,
md5: md5,
})
if post.md5 && post.md5 != md5
logger.error("md5 mismatch, skipping")
return 0
end
if (updated_at = row["updated_at"])
csv_e621_updated_at = Time.parse(updated_at)
if post.e621_updated_at && post.e621_updated_at > csv_e621_updated_at
logger.info("model more recent than csv, skipping")
return 0
end
post.e621_updated_at = csv_e621_updated_at
else
logger.warn("no updated_at in csv row")
end
file_ext = row["file_ext"]
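# e621 shards static files by the first two byte pairs of the md5, e.g.
# md5 "1c6169..." lands under /data/1c/61/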
post.file_url_str = "https://static1.e621.net/data/#{md5[0...2]}/#{md5[2...4]}/#{md5}.#{file_ext}"
post.description = row["description"]
post.rating = row["rating"]
post.score = row["score"].to_i
post.score_up = row["up_score"].to_i
post.score_down = row["down_score"].to_i
post.num_favorites = row["fav_count"].to_i
post.num_comments = row["comment_count"].to_i
post.change_seq = row["change_seq"].to_i
post.parent_e621_id = row["parent_id"]&.to_i
flags_array = []
flags_array << "deleted" if row["is_deleted"] == "t"
flags_array << "pending" if row["is_pending"] == "t"
flags_array << "flagged" if row["is_flagged"] == "t"
post.flags_array = flags_array
post.sources_array = row["source"].split
post.tags_array = row["tag_string"].split.sort
name_to_tag_id, missing_tags =
Domain::E621::TagUtil.tag_names_to_id_map(
post.tags_array,
posts: [post],
)
name_to_tag_id.merge!(
Domain::E621::TagUtil.create_tags_from_names(missing_tags)
)
new_record = post.new_record?
Domain::E621::Post.transaction do
post.save!
Domain::E621::TagUtil.update_tags_on_post(
post,
post.tags_array,
name_to_tag_id,
)
end
Domain::E621::Job::StaticFileJob.perform_later({
post: post,
}) if new_record
logger.info("updated post (new? #{new_record ? "yes" : "no"})")
new_record ? 1 : 0
rescue => e
logger.error("error importing post: #{e.message}")
0
end
end

View File

@@ -0,0 +1,105 @@
module Domain::E621::TagUtil
include HasColorLogger
TagAndCategory = Struct.new(:name, :category)
# convert tag names [String] into a {String => Integer} id map; returns
# [id_map, missing_tag_names], folding in tags already loaded on `posts`
def self.tag_names_to_id_map(tag_names, posts: [])
tags = Domain::E621::Tag.where(name: tag_names)
name_to_tag_id = tags.map { |tag| [tag.name, tag.id] }.to_h
posts.each do |post|
post.tags.each do |tag|
name_to_tag_id[tag.name] = tag.id
end
end
[
# main tag map
name_to_tag_id,
# missing tags
tag_names - tags.map(&:name),
]
end
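# bulk-create missing tags and return their {name => id} map; `update_only:
# :name` turns a conflicting upsert into a no-op write, so `returning` still
# yields ids for rows that already existed or were created concurrently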
def self.create_tags_from_names(tag_names)
return {} unless tag_names.any?
logger.info("creating #{tag_names.size.to_s.bold} missing tags")
upsert_hashes = tag_names.map do |name|
{ name: name }
end
name_to_tag_id = {}
Domain::E621::Tag.upsert_all(
upsert_hashes,
unique_by: :name,
update_only: :name,
returning: %i[id name],
).each do |row|
name_to_tag_id[row["name"]] = row["id"]
end
name_to_tag_id
end
def self.update_tags_on_post(post, tags, name_to_tag_id)
model_tags = tag_and_cat_for_model(post).map(&:name)
to_add = tags - model_tags
to_remove = model_tags - tags
add_and_remove_tags(post, to_add, to_remove, name_to_tag_id)
end
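# like update_tags_on_post, but diffs (name, category) pairs instead of bare
# names, so a category change on an existing tag also retags the post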
def self.update_tags_on_post_with_cat(post, tag_and_cats, name_to_tag_id)
model_tag_and_cats = tag_and_cat_for_model(post)
to_add = tag_and_cats - model_tag_and_cats
to_remove = model_tag_and_cats - tag_and_cats
add_and_remove_tags(post, to_add, to_remove, name_to_tag_id)
end
def self.tag_and_cat_for_model(model)
model.taggings.map do |tagging|
TagAndCategory.new(tagging.tag.name, tagging.category)
end
end
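# apply a tag diff to a persisted post; entries may be TagAndCategory structs
# or bare name strings (bare names are inserted with category "cat_general")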
def self.add_and_remove_tags(post, to_add, to_remove, name_to_tag_id)
raise("post must be persisted") if post.new_record?
if to_remove.any?
to_remove_ids = to_remove.map { |tacos|
name = tag_and_cat_or_str_to_name(tacos)
name_to_tag_id[name] || raise(
"invariant: #{name} not in id map"
)
}
post.
taggings.
where(tag_id: to_remove_ids).
delete_all
end
post.taggings.insert_all!(to_add.map do |tacos|
name = tag_and_cat_or_str_to_name(tacos)
id = name_to_tag_id[name] || raise(
"invariant: #{name} not in id map"
)
if tacos.is_a?(TagAndCategory)
{ tag_id: id, category: tacos.category }
else
{ tag_id: id, category: "cat_general" }
end
end) if to_add.any?
end
def self.tag_and_cat_or_str_to_name(tacos)
if tacos.is_a?(TagAndCategory)
tacos.name
else
tacos
end
end
end

View File

@@ -13,6 +13,10 @@ module HasColorLogger
define_method(:logger) do
@logger ||= ColorLogger.make(sink, self)
end
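# also expose a class-level logger so module functions (e.g. the self.
# methods on Domain::E621::TagUtil) can log without an instance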
define_singleton_method(:logger) do
@logger ||= ColorLogger.make(sink, self.name)
end
end
end
end

View File

@@ -1,125 +0,0 @@
require "csv"
class LegacyImport::E621CsvPostImporter < LegacyImport::BulkImportJob
def initialize(
csv_path:,
batch_size:,
forks:,
start_at:
)
@csv_file = CSV.new(File.open(csv_path, "r+"), headers: true)
@forks = forks || 2
@batch_size = batch_size || @forks * 64
@start_at = start_at || 0
@start_time = Time.now
logger.info "forks=#{@forks} batch_size=#{@batch_size} start_at=#{@start_at}"
end
def name
"e621_csv_post_importer"
end
def profile?
false
end
def run_impl
progress = 0
while true
batch = []
while row = @csv_file.shift&.to_h
e621_id = row["id"].to_i
if @start_at && e621_id < @start_at
@start_time = Time.now
next
end
batch << row
break if batch.size >= @batch_size
end
break if batch.empty?
last_e621_id = batch.last["id"].to_i
if @forks == 1
progress += import_e621_rows(batch)
else
progress += ForkFuture.parallel_map_slice(@forks, batch) do |fork_batch|
import_e621_rows(fork_batch)
end.sum
end
rate = progress.to_f / (Time.now - @start_time)
puts "finish batch, last id #{e621_id} - #{progress} - #{rate.round(1)} / second"
write_last_id last_e621_id
end
progress
end
private
def import_e621_rows(rows)
progress = 0
rows.each do |row|
progress += import_e621_row(row)
end
progress
end
def import_e621_row(row)
e621_id = row["id"].to_i
post = Domain::E621::Post.find_by(e621_id: e621_id)
md5 = row["md5"]
if post
unless post.md5 == md5
post.file = nil
post.md5 = md5
end
else
post = Domain::E621::Post.new({
e621_id: e621_id,
md5: md5,
})
existing = Domain::E621::Post.find_by(md5: md5)
if existing
existing.state_detail["discarded_reason"] = "duplicate found during csv import: #{e621_id}",
existing.discard
puts "discard #{existing.id} / #{existing.e621_id} / #{existing.md5} in favor of #{e621_id}"
end
end
file_ext = row["file_ext"]
post.file_url_str = "https://static1.e621.net/data/#{md5[0...2]}/#{md5[2...4]}/#{md5}.#{file_ext}"
post.file ||= begin
le = HttpLogEntry.find_by_uri_host_path(post.file_url_str)
# puts "look up log entry for #{post.e621_id} (found: #{!le.nil?})"
le
end
post.sources_array = row["source"].split("\n")
post.rating = row["rating"]
post.tags_array = row["tag_string"].split(" ").sort
post.num_favorites = row["fav_count"].to_i
post.num_comments = row["comment_count"].to_i
post.description = row["description"]
post.score = row["score"].to_i
post.score_up = row["up_score"].to_i
post.score_down = row["down_score"].to_i
post.parent_e621_id = row["parent_id"]&.to_i
flags_array = []
flags_array << "deleted" if row["is_deleted"] == "t"
flags_array << "pending" if row["is_pending"] == "t"
flags_array << "flagged" if row["is_flagged"] == "t"
post.flags_array = flags_array
if post.changed?
post.save!
return 1
else
return 0
end
rescue
binding.pry
raise
end
end

View File

@@ -136,4 +136,14 @@ class Domain::E621::Post < ReduxApplicationRecord
def file_uri
Addressable::URI.parse(self.file_url_str) if self.file_url_str.present?
end
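# e621's own updated_at is persisted inside the state_detail json blob as an
# iso8601 string (assumption: state_detail is a json/jsonb column) rather
# than as a dedicated timestamp column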
def e621_updated_at
str = state_detail["e621_updated_at"]
Time.parse(str) if str
end
def e621_updated_at=(time)
time = Time.parse(time) if time.is_a?(String)
state_detail["e621_updated_at"] = time.iso8601
end
end

View File

@@ -12,16 +12,14 @@ namespace :e621 do
desc "import e621 data from csv"
task :import_csv => :environment do |t, args|
-batch_size = ENV["batch_size"]&.to_i
-forks = ENV["forks"]&.to_i
start_at = ENV["start_at"]&.to_i
+limit = ENV["limit"]&.to_i
csv_path = ENV["csv"] || raise("must supply `csv`")
-LegacyImport::E621CsvPostImporter.new(
+Domain::E621::CsvPostImporter.new(
csv_path: csv_path,
-batch_size: batch_size,
-forks: forks,
start_at: start_at,
+limit: limit,
).run
end
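# hypothetical invocation (env var names from the task above):
#   rake e621:import_csv csv=rev_posts.csv start_at=4000000 limit=1000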

View File

@@ -0,0 +1,66 @@
require "rails_helper"
describe Domain::E621::CsvPostImporter do
let(:csv_row) do
{
"id" => 12345,
"md5" => "1c6169aa51668681e9697a48144d7c78",
"updated_at" => "2023-08-24 07:35:09.171905",
"file_ext" => "jpg",
"description" => "a description",
"rating" => "s",
"score" => 67,
"up_score" => 69,
"down_score" => 2,
"fav_count" => 420,
"comment_count" => 1,
"change_seq" => 0,
"parent_id" => nil,
"is_deleted" => "f",
"is_pending" => "t",
"is_flagged" => "f",
"source" => "src1\nsrc2",
"tag_string" => "tag1 tag2",
}
end
it "imports new posts" do
expect do
described_class.import_row(csv_row)
end.to change(Domain::E621::Post, :count).by(1)
post = Domain::E621::Post.find_by(e621_id: csv_row["id"])
expect(SpecUtil.enqueued_jobs(Domain::E621::Job::StaticFileJob)).to match([
including(args: [{
post: post,
}]),
])
expect(post.md5).to eq(csv_row["md5"])
expect(post.file_url_str).to eq("https://static1.e621.net/data/1c/61/1c6169aa51668681e9697a48144d7c78.jpg")
expect(post.description).to eq(csv_row["description"])
expect(post.sources_array).to eq(["src1", "src2"])
expect(post.tags_array).to eq(["tag1", "tag2"])
expect(post.tags.map(&:name)).to match(["tag1", "tag2"])
expect(post.taggings.map(&:category).to_set).to eq(["cat_general"].to_set)
expect(post.e621_updated_at).to eq(Time.parse "2023-08-24T07:35:09-07:00")
end
it "does not touch posts updated after the csv" do
post = Domain::E621::Post.create!({
e621_id: csv_row["id"],
md5: csv_row["md5"],
tags_array: ["tag3", "tag4"],
})
new_time = "2023-08-25T07:35:09-07:00"
post.e621_updated_at = new_time
post.save!
expect do
described_class.import_row(csv_row)
end.to_not change(Domain::E621::Post, :count)
post.reload
expect(post.e621_updated_at).to eq(Time.parse new_time)
expect(post.tags_array).to eq(["tag3", "tag4"])
end
end