incremental favs on user pages, user page enqueue job

This commit is contained in:
Dylan Knutson
2025-06-19 22:35:53 +00:00
parent 9d2bde629a
commit 7ee3b30180
19 changed files with 2857 additions and 69 deletions

View File

@@ -617,3 +617,8 @@ task find_post_files_with_empty_response: :environment do
pb.progress = [pb.progress + 1, pb.total].min
end
end
# Entry point for the post-file backfill: delegates to the watermark-throttled
# EnqueueJobBase runner, which schedules Job::PostFileJob for unfetched files.
desc "Enqueue pending post file jobs"
task enqueue_pending_post_file_jobs: :environment do
EnqueueDuePostFileJobs.new.run
end

View File

@@ -369,10 +369,9 @@ class Domain::Fa::Job::Base < Scraper::JobBase
).returns(T.nilable(Domain::Fa::Parser::Page))
end
def update_user_from_user_page(user, response)
disabled_or_not_found = user_disabled_or_not_found?(user, response)
user.scanned_page_at = Time.current
user.last_user_page_log_entry = response.log_entry
return nil if disabled_or_not_found
return nil if user_disabled_or_not_found?(user, response)
page = Domain::Fa::Parser::Page.new(response.body)
return nil unless page.probably_user_page?
@@ -529,6 +528,7 @@ class Domain::Fa::Job::Base < Scraper::JobBase
T.let(
[
/User ".+" has voluntarily disabled access/,
/User .+ has voluntarily disabled access/,
/The page you are trying to reach is currently pending deletion/,
],
T::Array[Regexp],

View File

@@ -29,6 +29,11 @@ class Domain::Fa::Job::FavsJob < Domain::Fa::Job::Base
return
end
if user.state_account_disabled?
logger.warn(format_tags("user is disabled, skipping"))
return
end
faved_post_ids = T.let(Set.new, T::Set[Integer])
existing_faved_post_ids =
T.let(Set.new(user.user_post_favs.pluck(:post_id)), T::Set[Integer])

View File

@@ -137,6 +137,16 @@ class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::Base
end
def check_skip_favs_scan(user, user_page)
recent_faved_fa_ids = user_page.recent_fav_fa_ids
recent_faved_posts =
Domain::Post::FaPost.where(fa_id: recent_faved_fa_ids).to_a
# create missing posts in the favorites section
(recent_faved_fa_ids - recent_faved_posts.map(&:fa_id)).each do |fa_id|
post = Domain::Post::FaPost.find_or_create_by(fa_id:)
defer_job(Domain::Fa::Job::ScanPostJob, { post: })
recent_faved_posts << post
end
if recent_faved_fa_ids.empty?
logger.info(format_tags("skipping favs scan, 0 favorites"))
user.scanned_favs_at = Time.current
@@ -146,23 +156,67 @@ class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::Base
"skipping favs scan, #{recent_faved_fa_ids.count} favorites < threshold",
),
)
faved_posts =
recent_faved_fa_ids.map do |fa_id|
Domain::Post::FaPost.find_or_create_by(fa_id:)
end
user.user_post_favs.upsert_all(
faved_posts.map(&:id).compact.map { |post_id| { post_id: } },
recent_faved_posts.map(&:id).compact.map { |post_id| { post_id: } },
unique_by: %i[user_id post_id],
)
# Use reset_counters to update the counter cache after upserting favs
Domain::User.reset_counters(user.id, :user_post_favs)
user.scanned_favs_at = Time.current
elsif user.scanned_favs_at.present?
known_faved_post_fa_ids =
user
.faved_posts
.where(id: recent_faved_posts.map(&:id))
.map(&:fa_id)
.to_set
first_known_idx =
recent_faved_fa_ids.index do |fa_id|
known_faved_post_fa_ids.include?(fa_id)
end
if first_known_idx.nil?
logger.info("no known favs, full favs scan")
return
end
# are all the favs after the first known fav known?
any_unknown_after_first_known =
(recent_faved_fa_ids[first_known_idx..] || []).any? do |fa_id|
!known_faved_post_fa_ids.include?(fa_id)
end
if any_unknown_after_first_known
logger.info("unknown favs after first known fav, full favs scan")
return
end
unknown_fav_fa_ids =
(recent_faved_fa_ids[..first_known_idx] || []).filter do |fa_id|
!known_faved_post_fa_ids.include?(fa_id)
end
logger.info(
format_tags(
make_tag("reset user_post_favs counter cache for user", user.id),
),
"unknown favs are all at beginning, adding fa_ids to favs: #{unknown_fav_fa_ids.join(", ")}",
)
if unknown_fav_fa_ids.any?
unknown_faved_posts =
unknown_fav_fa_ids
.map do |fa_id|
recent_faved_posts.find { |post| post.fa_id == fa_id }
end
.compact
user.user_post_favs.upsert_all(
unknown_faved_posts.map(&:id).compact.map { |post_id| { post_id: } },
unique_by: %i[user_id post_id],
)
Domain::User.reset_counters(user.id, :user_post_favs)
end
user.scanned_favs_at = Time.current
end
end
@@ -247,7 +301,6 @@ class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::Base
end
def check_skip_followed_by_users_scan(user, user_page)
recent_watchers = user_page.recent_watchers
page_num_watched_by = user_page.num_watched_by
if recent_watchers.empty?
logger.info(format_tags("skipping followed by scan, 0 watched"))

View File

@@ -1,67 +1,45 @@
# typed: strict
class Domain::Fa::EnqueueDueUserFavsScans
class Domain::Fa::EnqueueDueUserFavsScans < EnqueueJobBase
extend T::Sig
include HasColorLogger
QUEUE_HIGH_WATER_MARK = 100
QUEUE_LOW_WATER_MARK = 50
DELAY_TIME = T.let(10.seconds, ActiveSupport::Duration)
class DateHelper
extend ActionView::Helpers::DateHelper
end
sig { void }
def self.run
sig { override.void }
def start_enqueuing
loop do
while (qs = queue_size) > QUEUE_LOW_WATER_MARK
logger.info(
"waiting for queue to drop to #{QUEUE_LOW_WATER_MARK} (currently #{qs})",
)
sleep DELAY_TIME.in_seconds
end
logger.info("queue size is #{queue_size}")
while (qs = queue_size) < QUEUE_LOW_WATER_MARK
to_enqueue = QUEUE_HIGH_WATER_MARK - qs
logger.info("queue is at #{qs}, enqueuing #{to_enqueue} due favs scans")
users =
Domain::User::FaUser
.where(state: "ok")
.order(Arel.sql "json_attributes->>'scanned_favs_at' asc nulls first")
.limit(to_enqueue)
.each do |user|
logger.tagged(make_arg_tag(user)) do
logger.info(
format_tags(
make_tag(
"scanned_favs_at",
time_ago_in_words(user.scanned_favs_at),
),
"enqueuing favs job",
),
)
Domain::Fa::Job::FavsJob.perform_later({ user: })
end
.limit(10)
users.each do |user|
enqueue do
logger.tagged(
make_arg_tag(user),
make_tag(
"scanned_favs_at",
time_ago_in_words(user.scanned_favs_at),
),
) do
logger.info("enqueuing favs job")
Domain::Fa::Job::FavsJob.perform_later({ user: })
end
end
end
logger.info("queue size is #{queue_size}")
logger.info("sleeping for #{DELAY_TIME.in_seconds} seconds")
sleep DELAY_TIME.in_seconds
if users.empty? ||
((sfa = users.first&.scanned_favs_at) && sfa > 14.days.ago)
break
end
end
end
sig { returns(Integer) }
def self.queue_size
sig { override.returns(Integer) }
def queue_size
GoodJob::Job.where(
job_class: "Domain::Fa::Job::FavsJob",
performed_at: nil,
error: nil,
).count
end
sig { params(time: T.nilable(ActiveSupport::TimeWithZone)).returns(String) }
def self.time_ago_in_words(time)
return "never" if time.nil?
"#{DateHelper.time_ago_in_words(time)} ago"
end
end

View File

@@ -0,0 +1,56 @@
# typed: strict

# Enqueues Domain::Fa::Job::UserPageJob for FA users whose profile page has
# not been scanned recently, oldest (or never-scanned) first. Queue
# high/low-water-mark throttling is inherited from EnqueueJobBase#enqueue.
class Domain::Fa::EnqueueDueUserPageScans < EnqueueJobBase
  extend T::Sig
  include HasColorLogger

  sig { override.void }
  def start_enqueuing
    # url_names already enqueued this run. The query below has no offset and
    # scanned_page_at only changes once a job actually performs, so the same
    # rows come back on every iteration; this set (plus the empty-batch break
    # below) is what prevents re-enqueueing them.
    processed_url_names = T.let(Set.new, T::Set[String])
    loop do
      users =
        Domain::User::FaUser
          .where(state: "ok")
          .order(Arel.sql "json_attributes->>'scanned_page_at' asc nulls first")
          .limit(150)
          .to_a
      # Done when everyone remaining was scanned within the last ~3 months.
      if users.empty? ||
           ((sfa = users.first&.scanned_page_at) && sfa > 89.days.ago)
        break
      end
      users =
        users.reject do |user|
          url_name = user.url_name
          url_name && processed_url_names.include?(url_name)
        end
      # BUGFIX: if every user in this batch was already processed, the query
      # keeps returning the same rows and the loop would otherwise spin
      # forever without enqueueing anything.
      break if users.empty?
      users.each do |user|
        enqueue do
          logger.tagged(
            make_arg_tag(user),
            make_tag(
              "scanned_page_at",
              time_ago_in_words(user.scanned_page_at),
            ),
          ) do
            logger.info("enqueuing page job")
            Domain::Fa::Job::UserPageJob.perform_later({ user: })
          end
        end
      end
      processed_url_names.merge(users.map(&:url_name).compact)
    end
  end

  # Number of UserPageJob rows GoodJob has accepted but not yet performed
  # (and not errored) — the depth the watermark throttling measures.
  sig { override.returns(Integer) }
  def queue_size
    GoodJob::Job.where(
      job_class: "Domain::Fa::Job::UserPageJob",
      performed_at: nil,
      error: nil,
    ).count
  end
end

View File

@@ -0,0 +1,31 @@
# typed: strict

# Walks Domain::PostFile records that have never been fetched (state "ok",
# no recorded status code, no log entry) and schedules a Job::PostFileJob
# for each one, throttled by EnqueueJobBase's queue watermarks.
class EnqueueDuePostFileJobs < EnqueueJobBase
  extend T::Sig
  include HasColorLogger

  sig { override.void }
  def start_enqueuing
    pending =
      Domain::PostFile.where(
        state: "ok",
        last_status_code: nil,
        log_entry_id: nil,
      )
    pending.find_each(batch_size: 10) do |file|
      enqueue do
        tags = [make_arg_tag(file.post), make_arg_tag(file)]
        logger.tagged(*tags) do
          logger.info("enqueuing post file")
          Job::PostFileJob.perform_later(post_file: file)
        end
      end
    end
  end

  # Pending (accepted, unperformed, unerrored) Job::PostFileJob count in
  # GoodJob — the queue depth the watermark logic throttles on.
  sig { override.returns(Integer) }
  def queue_size
    GoodJob::Job.where(
      job_class: "Job::PostFileJob",
      performed_at: nil,
      error: nil,
    ).count
  end
end

View File

@@ -0,0 +1,79 @@
# typed: strict

# Base class for long-running "enqueue due jobs" loops. Subclasses implement
# #start_enqueuing (calling #enqueue once per job to schedule) and
# #queue_size (how many of the target job class are still waiting in
# GoodJob). #enqueue applies high/low water-mark throttling so the queue is
# topped up in bursts rather than flooded.
class EnqueueJobBase
  extend T::Sig
  extend T::Helpers
  include HasColorLogger

  abstract!

  # Host for ActionView's time_ago_in_words without mixing the whole helper
  # module into this class's namespace.
  class DateHelper
    extend ActionView::Helpers::DateHelper
  end

  sig { params(perform_max: T.nilable(Integer)).void }
  def initialize(perform_max: nil)
    @total_performed = T.let(0, Integer)
    # NOTE(review): @perform_max is stored but never consulted in this
    # class — confirm whether a max-jobs cutoff was intended in #enqueue.
    @perform_max = T.let(perform_max, T.nilable(Integer))
    # Local estimate of queue depth: refreshed from #queue_size only while
    # waiting, otherwise bumped by one per enqueued job.
    @inferred_queue_size = T.let(0, Integer)
  end

  # Depth at which enqueuing pauses until the queue drains.
  sig { returns(Integer) }
  def high_water_mark
    100
  end

  # Depth the queue must fall back to before enqueuing resumes.
  sig { returns(Integer) }
  def low_water_mark
    50
  end

  # Poll interval while waiting for the queue to drain.
  sig { returns(ActiveSupport::Duration) }
  def delay_time
    10.seconds
  end

  sig { void }
  def run
    @inferred_queue_size = queue_size
    logger.info(
      "initial queue size is #{@inferred_queue_size}, starting enqueuing",
    )
    start_enqueuing
  end

  sig { abstract.void }
  def start_enqueuing
  end

  # Runs +block+ (which is expected to schedule exactly one job), blocking
  # first if the inferred queue depth has reached the high water mark.
  sig { params(block: T.proc.void).void }
  def enqueue(&block)
    # Under the high water mark we enqueue immediately; at or above it we
    # poll until the queue has drained back down to the low water mark.
    # (Rewrote the original empty-then-branch if/else as a single guard.)
    if @inferred_queue_size >= high_water_mark
      while @inferred_queue_size > low_water_mark
        logger.info(
          "waiting for queue to drop to #{low_water_mark} (currently #{@inferred_queue_size}, total performed #{@total_performed})",
        )
        sleep delay_time.in_seconds
        @inferred_queue_size = queue_size
      end
    end
    block.call
    @inferred_queue_size += 1
    @total_performed += 1
  end

  sig { abstract.returns(Integer) }
  def queue_size
  end

  # "3 days ago"-style rendering; "never" when the timestamp is nil.
  sig { params(time: T.nilable(ActiveSupport::TimeWithZone)).returns(String) }
  def time_ago_in_words(time)
    return "never" if time.nil?
    "#{DateHelper.time_ago_in_words(time)} ago"
  end
end

View File

@@ -1,10 +1,11 @@
# typed: true
# typed: strict
class Scraper::FaHttpClientConfig < Scraper::HttpClientConfig
DEFAULT_ALLOWED_DOMAINS = %w[*.furaffinity.net *.facdn.net ipinfo.io]
UUID_PATTERN =
/\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\z/i
OAID_PATTERN = /\A[0-9a-f]{32}\z/i
sig { override.returns(T.nilable(T::Array[T.untyped])) }
def cookies
a_cookie = GlobalState.get("furaffinity-cookie-a")
b_cookie = GlobalState.get("furaffinity-cookie-b")
@@ -29,21 +30,25 @@ class Scraper::FaHttpClientConfig < Scraper::HttpClientConfig
]
end
sig { override.returns(T::Array[[String, Numeric]]) }
def ratelimit
# number represents minimum delay in seconds between requests to the same domain
[["d.furaffinity.net", :none], ["*.facdn.net", :none], ["*", 1.5]]
[["d.furaffinity.net", 1.0], ["*.facdn.net", 1.0], ["*", 1.5]]
end
sig { override.returns(T::Array[String]) }
def allowed_domains
DEFAULT_ALLOWED_DOMAINS
end
sig { override.returns(Integer) }
def redirect_limit
4
end
private
sig { params(name: String, value: T.nilable(String), pattern: Regexp).void }
def validate_cookie!(name, value, pattern)
raise "#{name} cookie is not set" if value.nil? || value.empty?
raise "#{name} cookie has invalid format" unless value.match?(pattern)

View File

@@ -29,6 +29,12 @@ class Domain::PostFile < ReduxApplicationRecord
attr_json :last_status_code, :integer
attr_json :retry_count, :integer
# Restricts PostFile rows to those whose owning post is an instance of the
# given Domain::Post subclass (matched against the STI `type` column).
scope :for_post_type,
->(post_klass) do
post_klass = T.cast(post_klass, T.class_of(Domain::Post))
joins(:post).where(post: { type: post_klass.name })
end
enum :state,
{
pending: "pending",

View File

@@ -295,6 +295,11 @@ namespace :fa do
desc "Enqueue pending favs jobs"
task enqueue_pending_favs: :environment do
Domain::Fa::EnqueueDueUserFavsScans.run
Domain::Fa::EnqueueDueUserFavsScans.new.run
end
# Runs the watermark-throttled loop that schedules UserPageJob for
# stale or never-scanned FA users.
desc "Enqueue pending page jobs"
task enqueue_pending_user_pages: :environment do
Domain::Fa::EnqueueDueUserPageScans.new.run
end
end

View File

@@ -597,6 +597,9 @@ class Domain::PostFile
sig { params(association: Symbol).returns(T::Array[T.untyped]) }
def extract_associated(association); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def for_post_type(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def from(*args, &blk); end
@@ -1626,6 +1629,9 @@ class Domain::PostFile
sig { params(association: Symbol).returns(T::Array[T.untyped]) }
def extract_associated(association); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def for_post_type(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def from(*args, &blk); end

View File

@@ -580,6 +580,9 @@ class Domain::PostFile::InkbunnyPostFile
sig { params(association: Symbol).returns(T::Array[T.untyped]) }
def extract_associated(association); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def for_post_type(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateAssociationRelation) }
def from(*args, &blk); end
@@ -2075,6 +2078,9 @@ class Domain::PostFile::InkbunnyPostFile
sig { params(association: Symbol).returns(T::Array[T.untyped]) }
def extract_associated(association); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def for_post_type(*args, &blk); end
sig { params(args: T.untyped, blk: T.untyped).returns(PrivateRelation) }
def from(*args, &blk); end

View File

@@ -0,0 +1,16 @@
# typed: true
# DO NOT EDIT MANUALLY
# This is an autogenerated file for dynamic methods in `EnqueueDuePostFileJobs`.
# Please instead update this file by running `bin/tapioca dsl EnqueueDuePostFileJobs`.
# NOTE(review): this RBI was generated for the misspelled constant
# `EnqeueDuePostFileJobs`, which does not exist — the class is defined as
# `EnqueueDuePostFileJobs`. Corrected here; re-run tapioca to regenerate
# and delete the stale misspelled .rbi file if one remains on disk.
class EnqueueDuePostFileJobs
sig { returns(ColorLogger) }
def logger; end
class << self
sig { returns(ColorLogger) }
def logger; end
end
end

16
sorbet/rbi/dsl/enqueue_job_base.rbi generated Normal file
View File

@@ -0,0 +1,16 @@
# typed: true
# DO NOT EDIT MANUALLY
# This is an autogenerated file for dynamic methods in `EnqueueJobBase`.
# Please instead update this file by running `bin/tapioca dsl EnqueueJobBase`.
class EnqueueJobBase
sig { returns(ColorLogger) }
def logger; end
class << self
sig { returns(ColorLogger) }
def logger; end
end
end

View File

@@ -12,6 +12,28 @@ describe Domain::Fa::Job::UserPageJob do
)
end
# Regression coverage: a profile page served for a voluntarily-disabled
# account must flip the user's state to "account_disabled" rather than
# being parsed as a normal user page.
context "the user is disabled" do
let(:client_mock_config) do
[
{
uri: "https://www.furaffinity.net/user/christianallree/",
status_code: 200,
content_type: "text/html",
contents:
SpecUtil.read_fixture_file(
"domain/fa/job/user_page_christianallree_account_disabled.html",
),
},
]
end
it "marks the user as disabled" do
perform_now({ url_name: "christianallree" })
user = Domain::User::FaUser.find_by(url_name: "christianallree")
expect(user.state).to eq("account_disabled")
end
end
context "scanning a normal user" do
let(:client_mock_config) do
[
@@ -775,6 +797,184 @@ describe Domain::Fa::Job::UserPageJob do
end
end
context "more favorites than fits in the recent faved section" do
let(:client_mock_config) do
[
{
uri: "https://www.furaffinity.net/user/dilgear/",
status_code: 200,
content_type: "text/html",
contents:
SpecUtil.read_fixture_file(
"domain/fa/user_page/user_page_dilgear_many_recent_favorites.html",
),
},
]
end
context "user has not had a favs scan in the past" do
it "enqueues a favs job" do
perform_now({ url_name: "dilgear" })
user = Domain::User::FaUser.find_by(url_name: "dilgear")
expect(user.scanned_favs_at).to be_nil
expect(user.user_post_favs.count).to eq(0)
expect(SpecUtil.enqueued_job_args(Domain::Fa::Job::FavsJob)).to match(
[hash_including(user:, caused_by_entry: @log_entries[0])],
)
end
end
context "the user has had a favs scan in the past" do
let(:scanned_favs_at) { 1.year.ago }
let(:user) do
create(:domain_user_fa_user, url_name: "dilgear", scanned_favs_at:)
end
let(:faved_post_fa_ids) do
[
# newer favs
49_881,
72_900,
60_088,
60_086,
58_000,
57_298,
57_282,
56_811,
55_187,
54_431,
58_488,
31_830,
31_425,
31_413,
31_409,
29_859,
28_961,
26_304,
24_977,
24_451,
# older favs
]
end
# No FavsJob should be scheduled after scanning the user page.
shared_examples "does not enqueue a favs job" do
it "does not enqueue a favs job" do
perform_now({ user: })
user.reload
expect(
SpecUtil.enqueued_job_args(Domain::Fa::Job::FavsJob),
).to be_empty
end
end
# Exactly one FavsJob should be scheduled, attributed to the page-scan
# log entry.
shared_examples "enqueues a favs job" do
it "enqueues a favs job" do
perform_now({ user: })
expect(SpecUtil.enqueued_job_args(Domain::Fa::Job::FavsJob)).to match(
[hash_including(user:, caused_by_entry: @log_entries[0])],
)
end
end
# The incremental path handled favs inline, so scanned_favs_at is bumped
# to roughly now.
shared_examples "marks scanned_favs_at as recent" do
it "marks scanned_favs_at as recent" do
expect do
perform_now({ user: })
user.reload
end.to change { user.scanned_favs_at }.to be_within(3.seconds).of(
Time.current,
)
end
end
# A full favs scan was deferred to FavsJob, so the timestamp is untouched.
shared_examples "does not mark scanned_favs_at as recent" do
it "does not mark scanned_favs_at as recent" do
expect {
perform_now({ user: })
user.reload
}.to_not change { user.scanned_favs_at }
end
end
# The set of faved posts (by fa_id) must be unchanged by the page scan.
shared_examples "does not change the user's favorites" do
it "does not change the user's favorites" do
expect {
perform_now({ user: })
user.reload
}.to_not change { user.faved_posts.map(&:fa_id).sort }
end
end
# No overlap between page favs and known favs -> full scan required.
context "and the user has no known favorites" do
include_examples "enqueues a favs job"
include_examples "does not mark scanned_favs_at as recent"
include_examples "does not change the user's favorites"
end
# Everything on the page is already recorded -> nothing to do.
context "all user's recent favorites are known" do
before do
faved_post_fa_ids.each do |fa_post_id|
post = create(:domain_post_fa_post, fa_id: fa_post_id)
user.user_post_favs.create!(post_id: post.id)
end
end
include_examples "does not enqueue a favs job"
include_examples "marks scanned_favs_at as recent"
include_examples "does not change the user's favorites"
end
# An unknown fav AFTER a known one means history diverged -> full scan.
context "all but the last favorite are known" do
before do
faved_post_fa_ids[0..-2].each do |fa_post_id|
post = create(:domain_post_fa_post, fa_id: fa_post_id)
user.user_post_favs.create!(post_id: post.id)
end
end
include_examples "enqueues a favs job"
include_examples "does not mark scanned_favs_at as recent"
include_examples "does not change the user's favorites"
end
# A gap in the middle of the list also forces a full scan.
context "favorites in the middle are unknown" do
before do
(faved_post_fa_ids[..5] + faved_post_fa_ids[8..]).each do |fa_post_id|
post = create(:domain_post_fa_post, fa_id: fa_post_id)
user.user_post_favs.create!(post_id: post.id)
end
end
include_examples "enqueues a favs job"
include_examples "does not mark scanned_favs_at as recent"
include_examples "does not change the user's favorites"
end
# Only the newest entries are unknown -> incremental path adds just those
# five favs inline, with no FavsJob.
context "favorites at the start are unknown" do
before do
faved_post_fa_ids[5..].each do |fa_post_id|
post = create(:domain_post_fa_post, fa_id: fa_post_id)
user.user_post_favs.create!(post_id: post.id)
end
end
include_examples "does not enqueue a favs job"
include_examples "marks scanned_favs_at as recent"
it "adds the new favorites to the user's favorites" do
expect { perform_now({ user: }) }.to change {
user.reload
user.faved_posts.count
}.by(5)
expect(user.faved_posts.count).to eq(faved_post_fa_ids.count)
expect(user.faved_posts.map(&:fa_id)).to contain_exactly(
*faved_post_fa_ids,
)
end
end
end
end
context "with a user with buggy favcount" do
let(:client_mock_config) do
[

View File

@@ -104,13 +104,4 @@ RSpec.describe Scraper::FaHttpClientConfig do
expect(config.redirect_limit).to eq(4)
end
end
describe "#ratelimit" do
it "returns the configured rate limits" do
config = described_class.new
expect(config.ratelimit).to eq(
[["d.furaffinity.net", :none], ["*.facdn.net", :none], ["*", 1.5]],
)
end
end
end

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long