unify listing page scanning logic

This commit is contained in:
Dylan Knutson
2025-02-21 18:55:49 +00:00
parent fe0711c7d9
commit 50d875982a
9 changed files with 198 additions and 253 deletions

View File

@@ -21,7 +21,7 @@
- [ ] limit number of users, or paginate for "users who favorited this post" page
- [ ] manual good job runner does not indicate if the job threw an exception - check return value of #perform, maybe?
- [ ] FA user favs job should stop when in incremental mode when all posts on the page are already known favs (e.g. pages with only 47 posts are not a false positive)
- [ ] Factor out FA listings page enqueue logic into common location; use in Gallery and Favs jobs
- [x] Factor out FA listings page enqueue logic into common location; use in Gallery and Favs jobs
- [ ] Add followers / following to FA user show page
- [ ] Parse E621 source url for inkbunny posts & users
- [ ] Parse E621 source url for fa users

View File

@@ -1,18 +1,15 @@
# typed: strict
class Domain::Fa::Job::Base < Scraper::JobBase
abstract!
discard_on ActiveJob::DeserializationError
include HasBulkEnqueueJobs
# Overrides the base-class hook and returns the symbol :get_fa_http_client.
# NOTE(review): presumably the base job uses this symbol to look up the
# factory that builds the HTTP client — confirm against Scraper::JobBase.
sig { override.returns(Symbol) }
def self.http_factory_method
:get_fa_http_client
end
# Forwards every argument to the superclass initializer, then creates the
# per-instance set used to remember which post fa_ids have been enqueued.
sig { params(args: T.untyped).void }
def initialize(*args)
# T.unsafe hides the splat from the static type checker so that arbitrary
# arguments can be passed through to super unchanged.
super(*T.unsafe(args))
# Integer fa_ids that this job instance has already tried to enqueue.
@posts_enqueued_for_scan = T.let(Set.new, T::Set[Integer])
end
protected
sig { params(build_post: T::Boolean).returns(Domain::Post::FaPost) }
@@ -106,109 +103,108 @@ class Domain::Fa::Job::Base < Scraper::JobBase
return true
end
ListingsPageScanStats = Struct.new(:new_seen, :total_seen, :last_was_new)
# Immutable result of scanning a single listing page.
class ListingPageScanStats < T::Struct
include T::Struct::ActsAsComparable
# Posts from the page that are reported as new.
# NOTE(review): presumably those that were unsaved records when seen —
# confirm against the code that builds this struct.
const :new_posts, T::Array[Domain::Post::FaPost]
# All posts reported for the page.
const :all_posts, T::Array[Domain::Post::FaPost]
end
# Tagged descriptions of the kinds of listing page a job can scan. Each
# variant is a small struct carrying the details specific to that page kind;
# `Type` is the union of all variants.
module ListingPageType
  extend T::Sig

  # A page of the browse listing.
  class BrowsePage < T::Struct
    extend T::Sig
    const :page_number, Integer
  end

  # A page of a gallery folder; `folder` is the text shown by `describe`.
  class GalleryPage < T::Struct
    const :page_number, Integer
    const :folder, String
  end

  # A page of `user`'s favorites. Unlike the other variants, `page_number`
  # is an optional String rather than an Integer.
  class FavsPage < T::Struct
    const :page_number, T.nilable(String)
    const :user, Domain::User::FaUser
  end

  Type = T.type_alias { T.any(BrowsePage, GalleryPage, FavsPage) }

  # Returns a short human-readable label for the given page type:
  # "browse", "folder '<folder>'" or "favs".
  sig { params(page_type: Type).returns(String) }
  def self.describe(page_type)
    case page_type
    when BrowsePage
      "browse"
    when GalleryPage
      "folder '#{page_type.folder}'"
    when FavsPage
      "favs"
    else
      # Exhaustiveness guard: if a new variant is added to `Type` without a
      # branch here, the type checker flags this line instead of letting the
      # case silently evaluate to nil.
      T.absurd(page_type)
    end
  end
end
sig do
params(
job_type: Symbol,
page: T.untyped,
enqueue_posts_pri: Symbol,
enqueue_page_scan: T::Boolean,
enqueue_gallery_scan: T::Boolean,
page_desc: T.nilable(String),
).returns(ListingsPageScanStats)
page_type: ListingPageType::Type,
page_parser: Domain::Fa::Parser::Page,
).returns(ListingPageScanStats)
end
def update_and_enqueue_posts_from_listings_page(
job_type,
page,
enqueue_posts_pri:,
enqueue_page_scan: true,
enqueue_gallery_scan: true,
page_desc: nil
)
fatal_error("not a listings page") unless page.probably_listings_page?
submissions = page.submissions_parsed
def update_and_enqueue_posts_from_listings_page(page_type, page_parser:)
fatal_error("not a listing page") unless page_parser.probably_listings_page?
page_desc = (page_desc ? "page #{page_desc.to_s.bold}" : "page")
all_posts = T.let([], T::Array[Domain::Post::FaPost])
new_posts = T.let([], T::Array[Domain::Post::FaPost])
posts_to_save = T.let([], T::Array[Domain::Post::FaPost])
listing_page_stats = ListingsPageScanStats.new(0, 0, false)
submissions.each do |submission|
post = Domain::Post::FaPost.find_or_initialize_by(fa_id: submission.id)
listing_page_stats.last_was_new = post.new_record?
listing_page_stats.new_seen += 1 if post.new_record?
listing_page_stats.total_seen += 1
page_parser.submissions_parsed.each do |submission|
post =
Domain::Post::FaPost.find_or_initialize_by_submission_parser(
submission,
first_seen_log_entry: last_log_entry,
)
update_and_save_post_from_listings_page(job_type, post, submission)
if creator = post.creator
enqueue_user_scan(creator, enqueue_page_scan:, enqueue_gallery_scan:)
case page_type
when ListingPageType::BrowsePage
post.first_browse_page ||= last_log_entry
when ListingPageType::GalleryPage
post.first_gallery_page ||= last_log_entry
end
if post.state_ok?
enqueue_post_scan(post, enqueue_posts_pri)
elsif post.state_removed?
logger.info "(todo) removed post seen in listing page, enqueue scan for fa_id #{post.fa_id}"
elsif post.state_scan_error?
logger.info "(todo) scan_error'd post seen in listing page for fa_id #{post.fa_id}"
elsif post.state_file_error?
logger.info "(todo) file_error'd post seen in listing page for fa_id #{post.fa_id}"
else
logger.info(
format_tags(
"unknown post state",
make_tag("post.state", post.state),
make_tag("post.fa_id", post.fa_id),
),
)
all_posts << post
new_posts << post if post.new_record?
if post.new_record? || !post.state_ok? || post.scanned_at.blank? ||
post.file&.state_terminal_error?
post.state_ok!
posts_to_save << post
defer_job(Domain::Fa::Job::ScanPostJob, { post: })
end
if (post_file = post.file) && post_file.url_str.present? &&
post_file.log_entry.nil? && !post_file.state_terminal_error?
defer_job(Domain::Fa::Job::ScanFileJob, { post_file: })
end
if creator = post.creator
enqueue_user_scan(creator)
end
end
posts_to_save.each(&:save!)
logger.info(
format_tags(
make_tag("page_desc", page_desc),
make_tag("num_posts", submissions.count.to_s.bold),
make_tag("num_new_posts", listing_page_stats.new_seen.to_s.bold),
make_tag("page_number", page_type.page_number),
make_tag("page_type", ListingPageType.describe(page_type)),
make_tag("all_posts.count", all_posts.count),
make_tag("new_posts.count", new_posts.count),
),
)
listing_page_stats
ListingPageScanStats.new(new_posts:, all_posts:)
end
# Fills in listing-derived attributes on `post` and persists it.
#
# job_type   - :browse_page or :gallery_page; any other value is a fatal error.
# post       - the post record to update and save.
# submission - parsed submission entry from the listing page (untyped here).
sig do
params(
job_type: Symbol,
post: Domain::Post::FaPost,
submission: T.untyped,
).void
end
def update_and_save_post_from_listings_page(job_type, post, submission)
# Remember the causing log entry as the first page of this kind the post was
# seen on; `||=` keeps any value that is already recorded.
if job_type == :browse_page
post.first_browse_page_id ||= causing_log_entry&.id
elsif job_type == :gallery_page
post.first_gallery_page_id ||= causing_log_entry&.id
else
fatal_error("unhandled job_type: #{job_type}")
end
# Only assign a creator when the post does not already have one.
post.creator ||=
Domain::User::FaUser.find_or_build_from_submission_parser(submission)
# `||` triggers the fatal error only for a nil/false title; an empty string
# is truthy in Ruby and is assigned as-is.
post.title = submission.title || fatal_error("blank title")
post.save!
end
sig do
params(
user: Domain::User::FaUser,
enqueue_page_scan: T::Boolean,
enqueue_gallery_scan: T::Boolean,
enqueue_favs_scan: T::Boolean,
).void
end
def enqueue_user_scan(
user,
enqueue_page_scan: true,
enqueue_gallery_scan: true,
enqueue_favs_scan: true
)
sig { params(user: Domain::User::FaUser).void }
def enqueue_user_scan(user)
users_enqueued_for_page_scan ||= Set.new
users_enqueued_for_gallery_scan ||= Set.new
users_enqueued_for_favs_scan ||= Set.new
@@ -219,16 +215,16 @@ class Domain::Fa::Job::Base < Scraper::JobBase
{ user: user }
else
unless user.url_name
logger.warn format_tags(
"no user url_name and is new record, skipping",
)
logger.warn(
format_tags("url_name or id, skipping enqueue_user_scan"),
)
return
end
{ url_name: user.url_name }
end
if enqueue_page_scan && user.due_for_page_scan? &&
if user.due_for_page_scan? &&
defer_job(Domain::Fa::Job::UserPageJob, args)
logger.info(
format_tags(
@@ -238,7 +234,7 @@ class Domain::Fa::Job::Base < Scraper::JobBase
)
end
if enqueue_gallery_scan && user.due_for_gallery_scan? &&
if user.due_for_gallery_scan? &&
defer_job(Domain::Fa::Job::UserGalleryJob, args)
logger.info(
format_tags(
@@ -251,8 +247,7 @@ class Domain::Fa::Job::Base < Scraper::JobBase
)
end
if enqueue_favs_scan && user.due_for_favs_scan? &&
defer_job(Domain::Fa::Job::FavsJob, args)
if user.due_for_favs_scan? && defer_job(Domain::Fa::Job::FavsJob, args)
logger.info(
format_tags(
"enqueue user favs job",
@@ -263,70 +258,6 @@ class Domain::Fa::Job::Base < Scraper::JobBase
end
end
sig { params(enqueue_pri: T.nilable(Symbol)).returns(Integer) }
def self.normalize_enqueue_pri(enqueue_pri)
  # Symbolic priority -> numeric priority. Anything not listed here
  # (including nil) falls back to the default of -10.
  numeric_by_symbol = { low: -5, high: -15 }
  numeric_by_symbol.fetch(enqueue_pri, -10)
end
# Defers a ScanPostJob for the given fa_id, at most once per job instance.
#
# fa_id       - id of the post to scan.
# enqueue_pri - optional symbolic priority, converted to a number by
#               normalize_enqueue_pri.
sig { params(fa_id: Integer, enqueue_pri: T.nilable(Symbol)).void }
def enqueue_fa_id_scan(fa_id, enqueue_pri = nil)
enqueue_pri = self.class.normalize_enqueue_pri(enqueue_pri)
logger.tagged(make_tag("fa_id", fa_id)) do
# Set#add? returns nil when the id is already present, so a repeated fa_id
# short-circuits before defer_job is called. The log line is written only
# when defer_job returns a truthy value.
if @posts_enqueued_for_scan.add?(fa_id) &&
defer_job(
Domain::Fa::Job::ScanPostJob,
{ fa_id: fa_id },
{ priority: enqueue_pri },
)
logger.info format_tags("enqueue post scan", make_tag("fa_id", fa_id))
end
end
end
# Defers follow-up work for `post`, at most once per fa_id per job instance.
# A post with no scanned_at gets a ScanPostJob; otherwise, a post whose file
# has a URL but no log entry gets a ScanFileJob. Nothing is deferred in any
# other case.
#
# post        - the post to consider.
# enqueue_pri - optional symbolic priority, converted to a number by
#               normalize_enqueue_pri.
sig do
params(post: Domain::Post::FaPost, enqueue_pri: T.nilable(Symbol)).void
end
def enqueue_post_scan(post, enqueue_pri = nil)
enqueue_pri = self.class.normalize_enqueue_pri(enqueue_pri)
logger.tagged(make_arg_tag(post)) do
# Set#add? returns nil for an id that was already recorded, which skips the
# whole body for posts this job has handled before.
if @posts_enqueued_for_scan.add?(T.must(post.fa_id))
# NOTE(review): T.must above already rejects a nil fa_id, so the "(nil)"
# fallback here looks unreachable — confirm.
fa_id_str = (post.fa_id || "(nil)").to_s.bold
if !post.scanned_at.present?
# Log only when defer_job reports (via a truthy return) that it enqueued.
if defer_job(
Domain::Fa::Job::ScanPostJob,
{ post: post },
{ priority: enqueue_pri },
)
logger.info format_tags(
"enqueue post scan",
make_tag("fa_id", fa_id_str),
)
end
# Post already scanned: fall back to the file when it has a URL but no
# associated log entry.
elsif (post_file = post.file) && post_file.url_str.present? &&
post_file.log_entry.nil?
if defer_job(
Domain::Fa::Job::ScanFileJob,
{ post_file: },
{ priority: enqueue_pri },
)
logger.info format_tags(
"enqueue file scan",
make_tag("fa_id", fa_id_str),
make_tag("post_file.id", post_file.id),
)
end
end
end
end
end
sig do
params(fa_ids: T::Array[Integer]).returns(T::Array[Domain::Post::FaPost])
end

View File

@@ -60,14 +60,12 @@ class Domain::Fa::Job::BrowsePageJob < Domain::Fa::Job::Base
page = Domain::Fa::Parser::Page.new(response.body)
listing_page_stats =
update_and_enqueue_posts_from_listings_page(
:browse_page,
page,
enqueue_posts_pri: :high,
page_desc: "Browse@#{@page_number}",
ListingPageType::BrowsePage.new(page_number: @page_number),
page_parser: page,
)
@total_num_new_posts_seen += listing_page_stats.new_seen
@total_num_posts_seen += listing_page_stats.total_seen
listing_page_stats.new_seen > 0
@total_num_new_posts_seen += listing_page_stats.new_posts.count
@total_num_posts_seen += listing_page_stats.all_posts.count
listing_page_stats.new_posts.count > 0
end
end

View File

@@ -111,7 +111,6 @@ class Domain::Fa::Job::FavsJob < Domain::Fa::Job::Base
const :faved_post_ids_on_page, T::Set[Integer]
const :posts_created_ids, T::Set[Integer]
const :keep_scanning, T::Boolean
const :listings_page, Domain::Fa::Parser::Page
end
Result = T.type_alias { T.any(Stop, Ok) }
@@ -125,6 +124,7 @@ class Domain::Fa::Job::FavsJob < Domain::Fa::Job::Base
else
"https://www.furaffinity.net/favorites/#{user.url_name}/"
end
response = http_client.get(url)
if response.status_code != 200
fatal_error(
@@ -142,48 +142,62 @@ class Domain::Fa::Job::FavsJob < Domain::Fa::Job::Base
return ScanPageResult::Stop.new
end
page = Domain::Fa::Parser::Page.new(response.body)
fatal_error("not a favs listing page") unless page.probably_listings_page?
submissions = page.submissions_parsed
@page_id = page.favorites_next_button_id
existing_fa_id_to_post_id =
Domain::Post::FaPost
.where(fa_id: submissions.map(&:id))
.pluck(:fa_id, :id)
.to_h
created_posts = T.let([], T::Array[Domain::Post::FaPost])
posts_to_save = T.let([], T::Array[Domain::Post::FaPost])
submissions.each do |submission_parser_helper|
post =
Domain::Post::FaPost.find_or_initialize_by_submission_parser(
submission_parser_helper,
first_seen_log_entry: response.log_entry,
)
created_posts << post if post.new_record?
if post.new_record? || !post.state_ok?
posts_to_save << post
post.state_ok!
post.enqueue_job_after_save(
Domain::Fa::Job::ScanPostJob,
{ post:, caused_by_entry: causing_log_entry },
)
end
end
bulk_enqueue_jobs { posts_to_save.each(&:save!) }
last_page_post_ids = T.let(Set.new, T::Set[Integer])
created_posts.each { |post| last_page_post_ids.add(T.must(post.id)) }
existing_fa_id_to_post_id.values.each { |id| last_page_post_ids.add(id) }
page_parser = Domain::Fa::Parser::Page.new(response.body)
listing_page_stats =
update_and_enqueue_posts_from_listings_page(
ListingPageType::FavsPage.new(page_number: @page_id, user:),
page_parser:,
)
@page_id = page_parser.favorites_next_button_id
ScanPageResult::Ok.new(
faved_post_ids_on_page: last_page_post_ids,
posts_created_ids: created_posts.map(&:id).compact.to_set,
faved_post_ids_on_page:
listing_page_stats.all_posts.map(&:id).compact.to_set,
posts_created_ids: listing_page_stats.new_posts.map(&:id).compact.to_set,
keep_scanning: @page_id.present?,
listings_page: page,
)
# unless page_parser.probably_listings_page?
# fatal_error("not a favs listing page")
# end
# submissions = page_parser.submissions_parsed
# existing_fa_id_to_post_id =
# Domain::Post::FaPost
# .where(fa_id: submissions.map(&:id))
# .pluck(:fa_id, :id)
# .to_h
# created_posts = T.let([], T::Array[Domain::Post::FaPost])
# posts_to_save = T.let([], T::Array[Domain::Post::FaPost])
# submissions.each do |submission_parser_helper|
# post =
# Domain::Post::FaPost.find_or_initialize_by_submission_parser(
# submission_parser_helper,
# first_seen_log_entry: response.log_entry,
# )
# created_posts << post if post.new_record?
# if post.new_record? || !post.state_ok?
# posts_to_save << post
# post.state_ok!
# post.enqueue_job_after_save(
# Domain::Fa::Job::ScanPostJob,
# { post:, caused_by_entry: causing_log_entry },
# )
# end
# end
# bulk_enqueue_jobs { posts_to_save.each(&:save!) }
# last_page_post_ids = T.let(Set.new, T::Set[Integer])
# created_posts.each { |post| last_page_post_ids.add(T.must(post.id)) }
# existing_fa_id_to_post_id.values.each { |id| last_page_post_ids.add(id) }
# ScanPageResult::Ok.new(
# faved_post_ids_on_page: last_page_post_ids,
# posts_created_ids: created_posts.map(&:id).compact.to_set,
# keep_scanning: @page_id.present?,
# page_parser: page,
# )
end
end

View File

@@ -121,21 +121,19 @@ class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::Base
listing_page_stats =
update_and_enqueue_posts_from_listings_page(
:gallery_page,
page,
enqueue_posts_pri: :low,
enqueue_gallery_scan: false,
enqueue_page_scan: false,
page_desc: "#{folder.title}@#{page_number}",
ListingPageType::GalleryPage.new(page_number:, folder: folder.title),
page_parser: page,
)
total_num_new_posts_seen += listing_page_stats.new_seen
total_num_posts_seen += listing_page_stats.total_seen
total_num_new_posts_seen += listing_page_stats.new_posts.count
total_num_posts_seen += listing_page_stats.all_posts.count
logger.info format_tags(
make_tag("page_number", page_number),
make_tag("new on page", listing_page_stats.new_seen),
make_tag("total on page", listing_page_stats.total_seen),
)
logger.info(
format_tags(
make_tag("page_number", page_number),
make_tag("new on page", listing_page_stats.new_posts.count),
make_tag("total on page", listing_page_stats.all_posts.count),
),
)
if scan_folders?
page.submission_folders.each do |sf|
@@ -146,15 +144,18 @@ class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::Base
end
page_number += 1
break if listing_page_stats.new_seen == 0 && !@go_until_end
break if listing_page_stats.total_seen < LISTINGS_PER_PAGE_THRESHOLD
break if listing_page_stats.new_posts.empty? && !@go_until_end
break if listing_page_stats.all_posts.count < LISTINGS_PER_PAGE_THRESHOLD
end
logger.info format_tags(
"complete",
make_tag("num_new", total_num_new_posts_seen),
make_tag("num_total", total_num_posts_seen),
)
logger.info(
format_tags(
"complete",
make_tag("num_new", total_num_new_posts_seen),
make_tag("num_total", total_num_posts_seen),
),
)
:continue
end

View File

@@ -54,6 +54,7 @@ class Scraper::JobBase < ApplicationJob
# Calls the wrapped request and records its log entry on the owning job,
# then returns the response unchanged.
def around_request(proc)
response = proc.call
# first_log_entry is set only once (kept if already present) ...
@job.first_log_entry ||= response.log_entry
# ... while last_log_entry is overwritten on every request.
@job.last_log_entry = response.log_entry
response
end
end
@@ -65,6 +66,7 @@ class Scraper::JobBase < ApplicationJob
@http_client = T.let(nil, T.nilable(Scraper::HttpClient))
@gallery_dl_client = T.let(nil, T.nilable(Scraper::GalleryDlClient))
@first_log_entry = T.let(nil, T.nilable(HttpLogEntry))
@last_log_entry = T.let(nil, T.nilable(HttpLogEntry))
end
sig { abstract.returns(Symbol) }
@@ -104,15 +106,12 @@ class Scraper::JobBase < ApplicationJob
# The primary log entry for this job. Typically, this is the first request
# that was performed by this job.
sig { params(log_entry: HttpLogEntry).void }
def first_log_entry=(log_entry)
@first_log_entry = log_entry
end
sig { returns(T.nilable(HttpLogEntry)) }
def first_log_entry
@first_log_entry
end
attr_accessor :first_log_entry
# The last log entry for this job.
sig { returns(T.nilable(HttpLogEntry)) }
attr_accessor :last_log_entry
# The log entry considered to be the cause of jobs that this job enqueues.
sig { returns(T.nilable(HttpLogEntry)) }

View File

@@ -75,19 +75,6 @@ RSpec.describe BlobEntriesController, type: :controller do
expect(response.body).not_to be_nil
end
it "has the x-sendfile header" do
get :show, params: { sha256: sha256_hex }
expect(response.headers["X-Sendfile"]).to be_present
expect(response.headers["X-Sendfile"]).to eq(
Rails
.root
.join(
"tmp/blob_files_test/v1/b3/5d/a/b35dadbac6eafe0a1357253704bae78c3ee605e3a3061600f778f9678a5534b0",
)
.to_s,
)
end
context "with thumbnail request" do
it "generates thumbnail for valid size" do
get :show, params: { sha256: sha256_hex, thumb: "tiny" }

View File

@@ -33,7 +33,7 @@ RSpec.describe Domain::Inkbunny::PostsController, type: :controller do
expect(response.body).to include(posts[0].title)
expect(response.body).to include(posts[1].title)
expect(response.body).to include(user.name)
files.each { |file| expect(response.body).not_to include("<img") }
expect(response.body).not_to match(/<img.+blobs/)
end
end

View File

@@ -345,7 +345,7 @@ describe Domain::Fa::Job::BrowsePageJob do
context "and post file scanned" do
before do
post = find_post.call
create(:domain_post_file, post: post)
create(:domain_post_file, post:)
post.scanned_at = 1.hour.ago
post.save!
perform_now({})
@@ -357,6 +357,21 @@ describe Domain::Fa::Job::BrowsePageJob do
include_examples "enqueue user gallery scan", true
end
context "and post scanned but file is a terminal error state" do
before do
post = find_post.call
create(:domain_post_file, post:, state: "terminal_error")
post.scanned_at = 1.hour.ago
post.save!
perform_now({})
end
include_examples "enqueue post scan", true
include_examples "enqueue file scan", false
include_examples "enqueue user page scan", true
include_examples "enqueue user gallery scan", true
end
context "and user gallery already scanned" do
before do
creator = find_creator.call