require "csv"
|
|
|
|
# Bulk-imports e621 posts from a CSV database dump (one post per row).
class Domain::E621::CsvPostImporter < LegacyImport::BulkImportJob
  # @param csv_path [String] path to the e621 posts CSV dump
  # @param start_at [Integer, nil] skip rows whose e621 id is below this value
  #   (nil means start from the beginning)
  # @param limit [Integer, nil] stop after this many new posts were created
  def initialize(csv_path:, start_at:, limit: nil)
    logger.info "loading..."
    # Read-only access is sufficient; the importer never writes the dump.
    @csv_file = CSV.new(File.open(csv_path, "r"), headers: true)
    logger.info "loaded"

    @start_at = start_at || 0
    @start_time = Time.now
    @limit = limit

    logger.info "start_at=#{@start_at} limit=#{limit}"
  end

  def name
    "e621_csv_post_importer"
  end

  def profile?
    false
  end

  # Iterates the CSV and imports every row at or beyond @start_at.
  # @return [Integer] the number of newly created posts
  def run_impl
    progress = 0

    @csv_file.each do |row|
      row = row.to_h
      e621_id = row["id"].to_i

      # BUGFIX: skip rows *before* the requested starting id. The previous
      # condition (`e621_id >= @start_at`) skipped everything at or past the
      # start point, so with the default @start_at of 0 nothing was imported.
      if @start_at && e621_id < @start_at
        @start_time = Time.now
        next
      end

      progress += self.class.import_row(row)
      write_last_id e621_id
      break if @limit && progress >= @limit
    end

    progress
  end

  # Imports a single CSV row into a Domain::E621::Post, creating or updating
  # the record and its tags. Newly created posts get a StaticFileJob enqueued.
  #
  # @param row [Hash] a CSV row converted to a string-keyed hash
  # @return [Integer] 1 when a new post record was created, 0 otherwise
  #   (row skipped, already up to date, or import failed)
  def self.import_row(row)
    e621_id = row["id"].to_i
    md5 = row["md5"]

    logger.prefix = proc { "[e621_id #{e621_id.to_s.bold}]" }

    post =
      Domain::E621::Post.find_by({ e621_id: e621_id }) ||
        Domain::E621::Post.new({ e621_id: e621_id, md5: md5 })

    # Never overwrite a post whose stored md5 disagrees with the CSV row.
    if post.md5 && post.md5 != md5
      logger.error("md5 mismatch, skipping")
      return 0
    end

    if (updated_at = row["updated_at"])
      csv_e621_updated_at = Time.parse(updated_at)
      # Keep the local record when it is already newer than the CSV dump.
      if post.e621_updated_at && post.e621_updated_at > csv_e621_updated_at
        logger.info("model more recent than csv, skipping")
        return 0
      end
      post.e621_updated_at = csv_e621_updated_at
    else
      logger.warn("no updated_at in csv row")
    end

    file_ext = row["file_ext"]
    post.file_url_str =
      "https://static1.e621.net/data/#{md5[0...2]}/#{md5[2...4]}/#{md5}.#{file_ext}"
    post.description = row["description"]
    post.rating = row["rating"]
    post.score = row["score"].to_i
    post.score_up = row["up_score"].to_i
    post.score_down = row["down_score"].to_i
    post.num_favorites = row["fav_count"].to_i
    post.num_comments = row["comment_count"].to_i
    post.change_seq = row["change_seq"].to_i
    post.parent_e621_id = row["parent_id"]&.to_i

    flags_array = []
    flags_array << "deleted" if row["is_deleted"] == "t"
    flags_array << "pending" if row["is_pending"] == "t"
    flags_array << "flagged" if row["is_flagged"] == "t"
    post.flags_array = flags_array

    # `.to_s` guards against nil fields (empty CSV cells parse as nil).
    post.sources_array = row["source"].to_s.split(/[\n\s]/).map(&:strip)
    post.tags_array = row["tag_string"].to_s.split(/[\n\s]/).map(&:strip).sort

    # Resolve tag names to ids, creating any tags the dump references that we
    # have not seen before.
    name_to_tag_id, missing_tags =
      Domain::E621::TagUtil.tag_names_to_id_map(post.tags_array, posts: [post])
    name_to_tag_id.merge!(
      Domain::E621::TagUtil.create_tags_from_names(missing_tags)
    )

    new_record = post.new_record?
    Domain::E621::Post.transaction do
      post.save!
      Domain::E621::TagUtil.update_tags_on_post(
        post,
        post.tags_array,
        name_to_tag_id
      )
    end

    Domain::E621::Job::StaticFileJob.perform_later({ post: post }) if new_record

    logger.info("updated post (new? #{new_record ? "yes" : "no"})")

    new_record ? 1 : 0
  rescue => e
    # Best-effort import: log and move on so one bad row doesn't abort the run.
    logger.error("error importing post: #{e.message}")
    0
  end
end
|