remove unused UserIncrementalJob
@@ -1,223 +0,0 @@
# typed: strict
module Domain::Fa::Job
  class UserIncrementalJob < Base
    queue_as :fa_user_page

    sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
    def perform(args)
      user = user_from_args!
      logger.push_tags(make_arg_tag(user))
      return if buggy_user?(user)

      # this is similar to a user page job, and will update the user page
      # however, it will incrementally update user favs & follows / following:
      # - favs: look at the 'favorites' section and add new favs,
      #   checking that we can see at least one already recorded fav.
      #   if not, enqueue a full favs scan job.
      # - follows / following: look at the 'watchers' / 'watching' section,
      #   and add new follows.
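
      # skip unless the incremental scan is due or a rescan was forced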
      if !user.incremental_scan.due? && !force_scan?
        logger.warn(
          format_tags(
            make_tag("incremental scanned", user.incremental_scan.ago_in_words),
            "scanned recently, skipping",
          ),
        )
        return
      end

      response =
        http_client.get("https://www.furaffinity.net/user/#{user.url_name}/")
      logger.tagged(make_arg_tag(response.log_entry)) do
        page = update_user_from_user_page(user, response)
        if page
          check_favs(user, page.user_page.recent_fav_fa_ids)
          check_watchers(user, page.user_page.recent_watchers)
          check_watching(user, page.user_page.recent_watching)
        end
        user.scanned_incremental_at = Time.current
        logger.info(format_tags("completed page scan"))
      end
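    # persist whatever was updated even if parsing raised; only a successful
    # (HTTP 200) fetch suppresses the queued duplicate of this job and
    # enqueues follow-up jobs for links found on the page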
    ensure
      user.save! if user

      if response && response.status_code == 200 && user.present?
        suppress_deferred_job(Domain::Fa::Job::UserIncrementalJob, { user: })
        enqueue_jobs_from_found_links(response.log_entry)
      end
    end

    sig do
      params(
        user: Domain::User::FaUser,
        recent_fav_fa_ids: T::Array[Integer],
      ).void
    end
    def check_favs(user, recent_fav_fa_ids)
      recent_fav_posts = find_or_create_posts_by_fa_ids(recent_fav_fa_ids)
      recent_fav_post_ids = recent_fav_posts.map(&:id)

      existing_fav_post_ids =
        user.user_post_favs.where(post_id: recent_fav_post_ids).pluck(:post_id)

      missing_fav_post_ids = recent_fav_post_ids - existing_fav_post_ids
      if missing_fav_post_ids.empty?
        logger.info("no new favs for user")
        user.scanned_favs_at = Time.current
        return
      end

      if missing_fav_post_ids.any?
        logger.info(
          format_tags(make_tag("add favs", missing_fav_post_ids.size)),
        )
        Domain::UserPostFav.insert_all!(
          missing_fav_post_ids.map do |post_id|
            { user_id: user.id, post_id: post_id }
          end,
        )

        # Use reset_counters to update the counter cache after using
        # insert_all, which skips Active Record callbacks and would otherwise
        # leave the cached count stale
        Domain::User.reset_counters(user.id, :user_post_favs)
        logger.info(
          format_tags(
            make_tag("reset user_post_favs counter cache for user", user.id),
          ),
        )
      end
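
      # if even the oldest fav visible on the page is new to us, older unseen
      # favs may exist beyond it, so fall back to a full favs scan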
      if missing_fav_post_ids.include? recent_fav_post_ids.last
        logger.info(format_tags("last fav is new, enqueue full favs scan"))
        defer_job(Domain::Fa::Job::FavsJob, { user: user })
      else
        user.scanned_favs_at = Time.current
      end
    end

    # who watches this user - does not update scanned_follows_at timestamp
    # nor enqueue a full follows scan job
    # TODO - may be useful to have a separate 'scan full followed by' job
    # to handle users who are watched by a large number of others
    sig do
      params(
        user: Domain::User::FaUser,
        recent_watched_by:
          T::Array[Domain::Fa::Parser::UserPageHelper::RecentUser],
      ).void
    end
    def check_watchers(user, recent_watched_by)
      recent_watched_by_ids =
        find_or_create_users_by_recent_users(recent_watched_by).map do |m|
          T.must(m.id)
        end

      known_watcher_ids =
        T.cast(
          user
            .user_user_follows_to
            .where(from_id: recent_watched_by_ids)
            .pluck(:from_id),
          T::Array[Integer],
        )

      missing_watcher_ids = recent_watched_by_ids - known_watcher_ids
      if missing_watcher_ids.empty?
        logger.info("no new watchers")
        return
      end

      num_missing = missing_watcher_ids.size
      Domain::UserUserFollow.insert_all!(
        missing_watcher_ids.map do |watcher_id|
          { from_id: watcher_id, to_id: user.id }
        end,
      )

      # Use reset_counters to update follows_to for the current user
      Domain::User.reset_counters(user.id, :user_user_follows_to)
      logger.info(
        format_tags(
          make_tag(
            "reset user_user_follows_to counter cache for user",
            user.id,
          ),
        ),
      )

      # Update follows_from counts for watcher users
      missing_watcher_ids.each_slice(100) do |batch_ids|
        batch_ids.each do |watcher_id|
          Domain::User.reset_counters(watcher_id, :user_user_follows_from)
        end
      end

      logger.info("added #{num_missing.to_s.bold} new watchers")
    end

    sig do
      params(
        user: Domain::User::FaUser,
        recent_watched:
          T::Array[Domain::Fa::Parser::UserPageHelper::RecentUser],
      ).void
    end
    def check_watching(user, recent_watched)
      recent_watched_users =
        find_or_create_users_by_recent_users(recent_watched)
      raise("invariant") unless recent_watched_users.size == recent_watched.size
      recent_watched_user_ids = recent_watched_users.map { |m| T.must(m.id) }
      known_watched_user_ids =
        user.followed_users.where(id: recent_watched_user_ids).pluck(:id)
      missing_watched_users =
        recent_watched_users.reject do |u|
          known_watched_user_ids.include?(u.id)
        end
      missing_watched_user_ids = missing_watched_users.map { |m| T.must(m.id) }

      if missing_watched_user_ids.empty?
        logger.info("no new users watched")
        user.scanned_follows_at = Time.current
        return
      end

      num_missing = missing_watched_user_ids.size
      Domain::UserUserFollow.insert_all!(
        missing_watched_user_ids.map do |watched_user_id|
          { from_id: user.id, to_id: watched_user_id }
        end,
      )

      # Use reset_counters to update follows_from for the current user
      Domain::User.reset_counters(user.id, :user_user_follows_from)
      logger.info(
        format_tags(
          make_tag(
            "reset user_user_follows_from counter cache for user",
            user.id,
          ),
        ),
      )

      # Update follows_to counts for users who were followed
      missing_watched_user_ids.each_slice(100) do |batch_ids|
        batch_ids.each do |watched_user_id|
          Domain::User.reset_counters(watched_user_id, :user_user_follows_to)
        end
      end

      logger.info("added #{num_missing.to_s.bold} new users watched")
      last_watched_user = recent_watched_users.last
      if last_watched_user && missing_watched_users.include?(last_watched_user)
        logger.info("last user watched is new, enqueue full follows scan")
        defer_job(Domain::Fa::Job::UserFollowsJob, { user: })
      else
        logger.info(
          "last user watched was known, no need for full follows scan",
        )
        user.scanned_follows_at = Time.current
      end
    end
  end
end
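
A note on the pattern above: insert_all! writes all rows in a single SQL
statement and skips Active Record callbacks, so counter_cache columns are not
maintained automatically; reset_counters then recomputes them with a COUNT
query. A minimal sketch (variable names hypothetical, Rails APIs real):

    new_post_ids = [1, 2, 3] # posts not yet recorded as favs (hypothetical)
    rows = new_post_ids.map { |post_id| { user_id: user.id, post_id: post_id } }
    Domain::UserPostFav.insert_all!(rows)                  # bulk insert, no callbacks
    Domain::User.reset_counters(user.id, :user_post_favs)  # recompute cached count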
@@ -44,12 +44,6 @@ class Domain::Fa::UserEnqueuer
     rows.each do |user|
       types = []
       if user.state == "ok"
-        if user.favs_scan.due? || user.page_scan.due? ||
-             user.follows_scan.due?
-          Domain::Fa::Job::UserIncrementalJob.perform_later({ user: user })
-          types << "incremental"
-        end
-
         if user.favs_scan.due?
           Domain::Fa::Job::FavsJob.perform_later({ user: user })
           types << "favs"
sorbet/rbi/dsl/domain/fa/enqueue_due_user_page_scans.rbi (generated, new file, +16)
@@ -0,0 +1,16 @@
# typed: true

# DO NOT EDIT MANUALLY
# This is an autogenerated file for dynamic methods in `Domain::Fa::EnqueueDueUserPageScans`.
# Please instead update this file by running `bin/tapioca dsl Domain::Fa::EnqueueDueUserPageScans`.


class Domain::Fa::EnqueueDueUserPageScans
  sig { returns(ColorLogger) }
  def logger; end

  class << self
    sig { returns(ColorLogger) }
    def logger; end
  end
end
@@ -1,27 +0,0 @@
# typed: true

# DO NOT EDIT MANUALLY
# This is an autogenerated file for dynamic methods in `Domain::Fa::Job::UserIncrementalJob`.
# Please instead update this file by running `bin/tapioca dsl Domain::Fa::Job::UserIncrementalJob`.


class Domain::Fa::Job::UserIncrementalJob
  sig { returns(ColorLogger) }
  def logger; end

  class << self
    sig { returns(ColorLogger) }
    def logger; end

    sig do
      params(
        args: T::Hash[::Symbol, T.untyped],
        block: T.nilable(T.proc.params(job: Domain::Fa::Job::UserIncrementalJob).void)
      ).returns(T.any(Domain::Fa::Job::UserIncrementalJob, FalseClass))
    end
    def perform_later(args, &block); end

    sig { params(args: T::Hash[::Symbol, T.untyped]).returns(T.untyped) }
    def perform_now(args); end
  end
end
@@ -1,11 +1,11 @@
 # typed: true
 
 # DO NOT EDIT MANUALLY
-# This is an autogenerated file for dynamic methods in `EnqeueDuePostFileJobs`.
-# Please instead update this file by running `bin/tapioca dsl EnqeueDuePostFileJobs`.
+# This is an autogenerated file for dynamic methods in `EnqueueDuePostFileJobs`.
+# Please instead update this file by running `bin/tapioca dsl EnqueueDuePostFileJobs`.
 
 
-class EnqeueDuePostFileJobs
+class EnqueueDuePostFileJobs
   sig { returns(ColorLogger) }
   def logger; end
@@ -1,118 +0,0 @@
# typed: false
require "rails_helper"

describe Domain::Fa::Job::UserIncrementalJob do
  let(:http_client_mock) { instance_double("::Scraper::HttpClient") }
  before do
    Scraper::ClientFactory.http_client_mock = http_client_mock
    @log_entries =
      HttpClientMockHelpers.init_http_client_mock(
        http_client_mock,
        client_mock_config,
      )
  end

  let!(:meesh) do
    Domain::User::FaUser.create!(name: "meesh", url_name: "meesh")
  end

  context "scanning a normal user" do
    let(:client_mock_config) do
      [
        {
          uri: "https://www.furaffinity.net/user/meesh/",
          status_code: 200,
          content_type: "text/html",
          contents:
            SpecUtil.read_fixture_file("domain/fa/job/user_page_meesh.html"),
        },
      ]
    end

    it "updates the user model" do
      perform_now({ user: meesh })
      meesh.reload
      expect(meesh.name).to eq("Meesh")
    end

    it "enqueues avatar jobs" do
      perform_now({ user: meesh })
      expect(
        SpecUtil
          .enqueued_job_args(Domain::Fa::Job::UserAvatarJob)
          .map { |args| args[:avatar] },
      ).to eq([meesh.avatar])
    end

    it "enqueues page scan jobs" do
      perform_now({ user: meesh })
      # 12 new watchers, 12 new watched
      expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).length).to be(
        24,
      )
    end

    it "enqueues follows scan jobs" do
      perform_now({ user: meesh })
      # new watch in last position, so enqueue scan
      expect(
        SpecUtil
          .enqueued_job_args(Domain::Fa::Job::UserFollowsJob)
          .map { |args| args[:user] },
      ).to eq([meesh])
      expect(meesh.scanned_follows_at).to be_nil
    end

    it "enqueues post scan jobs" do
      perform_now({ user: meesh })
      # 20 newly seen faved posts
      expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to be(
        20,
      )
    end

    it "enqueues favs scan jobs" do
      # new fav in last position, so should enqueue scan
      perform_now({ user: meesh })
      expect(
        SpecUtil
          .enqueued_job_args(Domain::Fa::Job::FavsJob)
          .map { |args| args[:user] },
      ).to eq([meesh])
      expect(meesh.scanned_favs_at).to be_nil
    end

    it "incrementally adds new watchers and favs" do
      celeste =
        Domain::User::FaUser.create!(name: "Celeste~", url_name: "celeste~")
      post_51594821 = Domain::Post::FaPost.create!(fa_id: 51_594_821)

      meesh.scanned_favs_at = 1.day.ago
      meesh.scanned_follows_at = 1.day.ago
      meesh.followed_users << celeste
      meesh.faved_posts << post_51594821
      meesh.save!

      perform_now({ url_name: "meesh" })
      meesh.reload

      # 12 new watchers, 11 new watched
      expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).length).to be(
        23,
      )
      # No new watch in last position, can skip scan
      expect(
        SpecUtil.enqueued_jobs(Domain::Fa::Job::UserFollowsJob),
      ).to be_empty
      expect(meesh.scanned_follows_at).to be_within(3.seconds).of(Time.current)

      # 19 newly seen faved posts
      expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::ScanPostJob).length).to be(
        19,
      )
      # No new fav in last position, so don't enqueue scan
      expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::FavsJob)).to be_empty
      expect(meesh.scanned_favs_at).to be_within(3.seconds).of(Time.current)
    end
  end
end