start e621 legacy import
This commit is contained in:
@@ -2,6 +2,7 @@ class LegacyImport::BulkImportJob
|
||||
attr_reader :logger_prefix
|
||||
|
||||
def logger
|
||||
@logger_prefix ||= "[last_id (nil)]"
|
||||
@logger ||= ColorLogger.make($stdout, self)
|
||||
end
|
||||
|
||||
@@ -9,9 +10,22 @@ class LegacyImport::BulkImportJob
|
||||
raise NotImplementedError.new("implement #name")
|
||||
end
|
||||
|
||||
def write_progress(progress)
|
||||
@logger_prefix = "[progress #{progress.to_s.bold}]"
|
||||
File.write("tmp/#{name}_progress", progress.to_s)
|
||||
def run_impl
|
||||
raise NotImplementedError.new("implement #run_impl")
|
||||
end
|
||||
|
||||
def run
|
||||
start_profiling!
|
||||
start_at = Time.now
|
||||
total_work = run_impl
|
||||
duration = (Time.now - start_at)
|
||||
logger.info "finish, total #{duration.round(1)}s, #{total_work} items, #{(total_work / duration).round(1)} items/s"
|
||||
end_profiling!
|
||||
end
|
||||
|
||||
def write_last_id(last_id)
|
||||
@logger_prefix = "[last_id #{last_id.to_s.bold}]"
|
||||
File.write("tmp/#{name}_progress", last_id.to_s)
|
||||
end
|
||||
|
||||
def start_profiling!
|
||||
|
||||
@@ -9,19 +9,19 @@ class LegacyImport::E621LegacyPostImporter < LegacyImport::BulkImportJob
|
||||
end
|
||||
|
||||
def name
|
||||
"e621_post_importer"
|
||||
"e621_legacy_post_importer"
|
||||
end
|
||||
|
||||
def profile?
|
||||
false
|
||||
end
|
||||
|
||||
def run
|
||||
start_profiling!
|
||||
|
||||
def run_impl
|
||||
progress = 0
|
||||
query = ::Legacy::E621::Post.includes(:blob_entry, :description_ref)
|
||||
query.find_in_batches(start: @start_at, batch_size: @batch_size) do |batch|
|
||||
query = ::Legacy::E621::Post.includes(:blob_entry, { taggings: :tag })
|
||||
# finish = @start_at + (2 * 32 * 32)
|
||||
finish = nil
|
||||
query.find_in_batches(start: @start_at, finish: finish, batch_size: @batch_size * @forks) do |batch|
|
||||
last_id = batch.last&.id
|
||||
|
||||
if @forks <= 1
|
||||
@@ -33,11 +33,10 @@ class LegacyImport::E621LegacyPostImporter < LegacyImport::BulkImportJob
|
||||
end
|
||||
|
||||
rate = progress.to_f / (Time.now - @start_time)
|
||||
puts "finish batch, last id #{last_id} - #{progress} - #{rate.round(1)} / second"
|
||||
write_progress last_id
|
||||
logger.info "finish batch, last id #{last_id} - #{progress} - #{rate.round(1)} / second"
|
||||
write_last_id last_id
|
||||
end
|
||||
|
||||
stop_profiling!
|
||||
progress
|
||||
end
|
||||
|
||||
private
|
||||
@@ -63,6 +62,7 @@ class LegacyImport::E621LegacyPostImporter < LegacyImport::BulkImportJob
|
||||
unless post.valid?
|
||||
raise("error building post #{post.id} / #{post.e621_id}: #{post.errors.full_messages}")
|
||||
end
|
||||
# binding.pry
|
||||
post.save!
|
||||
progress += 1
|
||||
end
|
||||
|
||||
@@ -36,7 +36,7 @@ module LiteTrail::ActiveRecordClassMethods
|
||||
map_attribute: map_attribute,
|
||||
}
|
||||
|
||||
if separate_versions_table.nil?
|
||||
if !separate_versions_table
|
||||
# using the polymorphic versions table
|
||||
has_many :versions,
|
||||
-> { order(created_at: :asc) },
|
||||
|
||||
@@ -28,12 +28,14 @@ class BlobEntry < ReduxApplicationRecord
|
||||
end
|
||||
|
||||
def contents
|
||||
contents_raw = self.read_attribute(:contents)
|
||||
if self.base
|
||||
@contents_base_patched ||= XDiff.patch(self.base.contents, contents_raw)
|
||||
else
|
||||
contents_raw
|
||||
end
|
||||
@contents ||= begin
|
||||
contents_raw = self.read_attribute(:contents)
|
||||
if self.base
|
||||
XDiff.patch(self.base.contents, contents_raw)
|
||||
else
|
||||
contents_raw
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def bytes_stored
|
||||
|
||||
@@ -3,7 +3,8 @@ class Domain::E621::Post < ReduxApplicationRecord
|
||||
has_lite_trail(schema_version: 1, separate_versions_table: true)
|
||||
|
||||
# see state_detail for scan_error/file_error
|
||||
enum state: [:ok, :scan_error, :file_error]
|
||||
enum state: %i[ok scan_error, file_error]
|
||||
enum rating: %i[safe questionable explicit]
|
||||
|
||||
validates_presence_of(
|
||||
:e621_id,
|
||||
@@ -11,12 +12,108 @@ class Domain::E621::Post < ReduxApplicationRecord
|
||||
)
|
||||
after_initialize do
|
||||
self.state ||= :ok
|
||||
self.state_detail ||= {}
|
||||
self.flags_array ||= []
|
||||
self.pools_array ||= []
|
||||
self.sources_array ||= []
|
||||
self.artists_array ||= []
|
||||
self.tags_array ||= []
|
||||
end
|
||||
|
||||
has_many :taggings,
|
||||
class_name: "Domain::E621::Tagging"
|
||||
class_name: "Domain::E621::Tagging"
|
||||
|
||||
has_many :tags,
|
||||
class_name: "Domain::E621::Tag",
|
||||
through: :taggings
|
||||
class_name: "Domain::E621::Tag",
|
||||
through: :taggings
|
||||
|
||||
# If the file was scraped, this is the blob entry that represents it
|
||||
belongs_to :file,
|
||||
class_name: "::HttpLogEntry",
|
||||
optional: :true,
|
||||
autosave: true
|
||||
|
||||
SKIP_MISMATCH_LEGACY_IDS = Set.new([
|
||||
836414,
|
||||
])
|
||||
|
||||
def self.find_or_build_from_legacy(legacy_model)
|
||||
model = self.find_by(e621_id: legacy_model.e621_id)
|
||||
return model if model
|
||||
model = self.new({
|
||||
state: :ok,
|
||||
file_url_str: legacy_model.file_url,
|
||||
rating: legacy_model.class.ratings[legacy_model.rating],
|
||||
sources_array: legacy_model.sources,
|
||||
tags_array: legacy_model.tags.map(&:value),
|
||||
artists_array: legacy_model.artists || [],
|
||||
})
|
||||
|
||||
if legacy_model.e621_status != "active"
|
||||
model.flags_array << legacy_model.e621_status
|
||||
model.flags_array.uniq!
|
||||
end
|
||||
|
||||
[
|
||||
:e621_id,
|
||||
:md5,
|
||||
:description,
|
||||
:score,
|
||||
:created_at,
|
||||
].each do |attr|
|
||||
model.send(:"#{attr}=", legacy_model.send(attr))
|
||||
end
|
||||
|
||||
http_log_entries = ::HttpLogEntry.where(
|
||||
uri_host: model.file_uri.host,
|
||||
uri_path: model.file_uri.path,
|
||||
)
|
||||
http_log_entry = http_log_entries.first
|
||||
|
||||
if !http_log_entry && legacy_model.blob_entry
|
||||
legacy_hles = ::Legacy::HttpLogEntry.where(
|
||||
host: model.file_uri.host,
|
||||
path: model.file_uri.path,
|
||||
)
|
||||
legacy_hle = legacy_hles.first
|
||||
|
||||
if legacy_hle
|
||||
http_log_entry = ::HttpLogEntry.build_from_legacy(legacy_hle)
|
||||
else
|
||||
http_log_entry = ::HttpLogEntry.new({
|
||||
uri: model.file_uri || raise,
|
||||
status_code: 200,
|
||||
verb: "get",
|
||||
response_time_ms: -1,
|
||||
requested_at: Time.now,
|
||||
request_headers: ::HttpLogEntryHeader.empty,
|
||||
response_headers: ::HttpLogEntryHeader.empty,
|
||||
performed_by: "legacy",
|
||||
})
|
||||
end
|
||||
|
||||
http_log_entry.response ||= ::BlobEntry.find_or_build_from_legacy(legacy_model.blob_entry)
|
||||
blob_entry = http_log_entry.response
|
||||
http_log_entry.content_type ||= blob_entry.content_type
|
||||
|
||||
raise("#{http_log_entry.content_type} != #{blob_entry.content_type}") unless blob_entry.content_type.start_with?(http_log_entry.content_type)
|
||||
end
|
||||
|
||||
if http_log_entry
|
||||
blob_entry = http_log_entry.response
|
||||
|
||||
if model.md5 != Digest::MD5.hexdigest(blob_entry.contents)
|
||||
if http_log_entry.status_code != 404
|
||||
raise("#{model.md5} != #{Digest::MD5.hexdigest(blob_entry.contents)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
model.file = http_log_entry
|
||||
model
|
||||
end
|
||||
|
||||
def file_uri
|
||||
Addressable::URI.parse(self.file_url_str) if self.file_url_str.present?
|
||||
end
|
||||
end
|
||||
|
||||
@@ -40,39 +40,40 @@ class HttpLogEntry < ReduxApplicationRecord
|
||||
|
||||
def self.build_from_legacy(legacy_model)
|
||||
response_body = legacy_model.response_body
|
||||
can_reconstruct_from_be =
|
||||
can_reconstruct_be =
|
||||
response_body.nil? &&
|
||||
legacy_model.parent_log_entry_id.nil? &&
|
||||
legacy_model.resp_body.present? &&
|
||||
legacy_model.blob_entry.present?
|
||||
|
||||
if can_reconstruct_from_be
|
||||
if can_reconstruct_be
|
||||
blob_entry = ::BlobEntry.find_or_build_from_legacy(legacy_model.blob_entry)
|
||||
blob_sha256 = HexUtil.hex2bin(legacy_model.resp_body)
|
||||
unless blob_entry.sha256 == blob_sha256
|
||||
raise("mismatch for legacy http entry #{legacy_model.id} / legacy blob entry #{legacy_model.blob_entry.id}")
|
||||
end
|
||||
|
||||
uri = Addressable::URI.parse(legacy_model.full_path)
|
||||
uri.scheme ||= "https"
|
||||
uri.path ||= "/"
|
||||
|
||||
record = ::HttpLogEntry.new({
|
||||
verb: legacy_model.verb,
|
||||
uri: uri,
|
||||
content_type: legacy_model.content_type,
|
||||
status_code: legacy_model.status,
|
||||
response_time_ms: legacy_model.response_time,
|
||||
request_headers: ::HttpLogEntryHeader.find_or_build(headers: legacy_model.req_headers),
|
||||
response_headers: ::HttpLogEntryHeader.find_or_build(headers: legacy_model.res_headers),
|
||||
response: blob_entry,
|
||||
requested_at: legacy_model.requested_at,
|
||||
created_at: legacy_model.created_at,
|
||||
updated_at: legacy_model.updated_at,
|
||||
performed_by: "legacy",
|
||||
})
|
||||
return record
|
||||
else
|
||||
blob_entry = nil
|
||||
end
|
||||
|
||||
uri = Addressable::URI.parse(legacy_model.full_path)
|
||||
uri.scheme ||= "https"
|
||||
uri.path ||= "/"
|
||||
|
||||
::HttpLogEntry.new({
|
||||
verb: legacy_model.verb,
|
||||
uri: uri,
|
||||
content_type: legacy_model.content_type,
|
||||
status_code: legacy_model.status,
|
||||
response_time_ms: legacy_model.response_time,
|
||||
request_headers: ::HttpLogEntryHeader.find_or_build(headers: legacy_model.req_headers),
|
||||
response_headers: ::HttpLogEntryHeader.find_or_build(headers: legacy_model.res_headers),
|
||||
response: blob_entry,
|
||||
requested_at: legacy_model.requested_at,
|
||||
created_at: legacy_model.created_at,
|
||||
updated_at: legacy_model.updated_at,
|
||||
performed_by: "legacy",
|
||||
})
|
||||
end
|
||||
|
||||
def uri=(uri)
|
||||
|
||||
@@ -51,4 +51,8 @@ class HttpLogEntryHeader < ReduxApplicationRecord
|
||||
headers: headers,
|
||||
}
|
||||
end
|
||||
|
||||
def self.empty
|
||||
@empty_model ||= self.find_or_create(headers: {})
|
||||
end
|
||||
end
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
#
|
||||
|
||||
class Legacy::E621::Tagging < LegacyApplicationRecord
|
||||
self.table_name = "e621_taggings"
|
||||
|
||||
belongs_to :post, class_name: "Legacy::E621::Post"
|
||||
belongs_to :tag, class_name: "Legacy::E621::Tag"
|
||||
|
||||
|
||||
@@ -5,51 +5,32 @@ class CreateDomainE621Posts < ActiveRecord::Migration[7.0]
|
||||
t.integer :state, null: false
|
||||
t.jsonb :state_detail
|
||||
|
||||
t.string :md5
|
||||
t.string :file_url_str
|
||||
t.string :description
|
||||
t.integer :rating
|
||||
t.integer :score
|
||||
t.integer :up_score
|
||||
t.integer :down_score
|
||||
t.integer :status
|
||||
t.integer :favorites
|
||||
|
||||
t.integer :file_width
|
||||
t.integer :file_height
|
||||
t.integer :score_up
|
||||
t.integer :score_down
|
||||
t.integer :num_favorites
|
||||
t.integer :num_comments
|
||||
t.integer :change_seq
|
||||
|
||||
t.jsonb :flags_array
|
||||
t.jsonb :pools_array
|
||||
t.jsonb :sources_array
|
||||
t.jsonb :artists_array
|
||||
t.jsonb :tags_array
|
||||
|
||||
t.references :file
|
||||
t.references :parent_e621
|
||||
|
||||
t.timestamps
|
||||
|
||||
t.index :e621_id, unique: :true
|
||||
t.index :md5, unique: :true
|
||||
end
|
||||
|
||||
create_versions_table :domain_e621_posts
|
||||
end
|
||||
end
|
||||
|
||||
# id :integer not null, primary key
|
||||
# e621_id :integer not null
|
||||
# md5 :string not null
|
||||
# sources :string
|
||||
# file_url :string not null
|
||||
# file_ext :string not null
|
||||
# description :string
|
||||
# rating :integer
|
||||
# width :integer
|
||||
# height :integer not null
|
||||
# tags_string :string not null
|
||||
# status :integer
|
||||
# score :integer
|
||||
# removed :boolean
|
||||
# created_at :datetime not null
|
||||
# updated_at :datetime not null
|
||||
# artists :string
|
||||
# e621_count :integer
|
||||
# author :string
|
||||
# e621_status :string
|
||||
# blob_entry_id :integer
|
||||
# imgsearch_entry_id :integer
|
||||
|
||||
18
db/schema.rb
generated
18
db/schema.rb
generated
@@ -51,7 +51,7 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_01_013456) do
|
||||
t.integer "schema_version"
|
||||
t.string "event", null: false
|
||||
t.jsonb "diff"
|
||||
t.datetime "created_at"
|
||||
t.datetime "created_at", null: false
|
||||
t.index ["item_id"], name: "index_domain_e621_post_versions_on_item_id"
|
||||
end
|
||||
|
||||
@@ -59,17 +59,20 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_01_013456) do
|
||||
t.integer "e621_id", null: false
|
||||
t.integer "state", null: false
|
||||
t.jsonb "state_detail"
|
||||
t.string "md5"
|
||||
t.string "file_url_str"
|
||||
t.string "description"
|
||||
t.integer "rating"
|
||||
t.integer "score"
|
||||
t.integer "up_score"
|
||||
t.integer "down_score"
|
||||
t.integer "status"
|
||||
t.integer "favorites"
|
||||
t.integer "file_width"
|
||||
t.integer "file_height"
|
||||
t.integer "score_up"
|
||||
t.integer "score_down"
|
||||
t.integer "num_favorites"
|
||||
t.integer "num_comments"
|
||||
t.integer "change_seq"
|
||||
t.jsonb "flags_array"
|
||||
t.jsonb "pools_array"
|
||||
t.jsonb "sources_array"
|
||||
t.jsonb "artists_array"
|
||||
t.jsonb "tags_array"
|
||||
t.bigint "file_id"
|
||||
t.bigint "parent_e621_id"
|
||||
@@ -77,6 +80,7 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_01_013456) do
|
||||
t.datetime "updated_at", null: false
|
||||
t.index ["e621_id"], name: "index_domain_e621_posts_on_e621_id", unique: true
|
||||
t.index ["file_id"], name: "index_domain_e621_posts_on_file_id"
|
||||
t.index ["md5"], name: "index_domain_e621_posts_on_md5", unique: true
|
||||
t.index ["parent_e621_id"], name: "index_domain_e621_posts_on_parent_e621_id"
|
||||
end
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ class Domain::E621::PostTest < ActiveSupport::TestCase
|
||||
assert_equal post.updated_at, updated_rating_version.created_at
|
||||
}
|
||||
|
||||
post.rating = 10
|
||||
post.rating = "safe"
|
||||
|
||||
assert post.save
|
||||
check_update.call
|
||||
|
||||
Reference in New Issue
Block a user