add fa user follows job

Dylan Knutson
2023-03-31 15:12:16 +09:00
parent c63f1dffcb
commit c587aabbbe
20 changed files with 3718 additions and 29 deletions

View File

@@ -40,18 +40,17 @@ task :good_job do
   env_hash = {
     "RAILS_ENV" => "worker",
-    "GOOD_JOB_POLL_INTERVAL" => "2",
+    "GOOD_JOB_POLL_INTERVAL" => "5",
     "GOOD_JOB_MAX_CACHE" => "10000",
     "GOOD_JOB_QUEUE_SELECT_LIMIT" => "4096",
     "GOOD_JOB_MAX_THREADS" => "4",
     "GOOD_JOB_QUEUES" => [
       "manual:4",
-      "static_file:4",
-      "+static_file,fa_post:2",
-      "fa_post:2",
-      "+fa_user_page,fa_user_gallery:2",
+      "static_file:3",
+      "+static_file,fa_post:1",
+      "+fa_user_page,fa_user_gallery,fa_post:2",
+      "fa_user_follows:1",
       proxy == "serverhost-1" ? nil : "twitter_timeline_tweets:1",
       "-static_file,fa_post,manual,fa_user_page,fa_user_gallery,twitter_timeline_tweets:2",
     ].reject(&:nil?).join(";"),
   }
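
Note: the array above collapses into the single GOOD_JOB_QUEUES string the worker parses; per GoodJob's queue-string syntax, a "+" prefix pins an ordered queue group and a "-" prefix excludes queues from a catch-all pool. A minimal standalone sketch of the construction (plain Ruby; the proxy value is an assumed input, normally supplied by the rake invocation):

proxy = "serverhost-1" # assumed input for illustration

queues = [
  "manual:4",
  "fa_user_follows:1",
  proxy == "serverhost-1" ? nil : "twitter_timeline_tweets:1",
].reject(&:nil?).join(";")

puts queues # prints "manual:4;fa_user_follows:1" -- the twitter queue drops out on serverhost-1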

View File

@@ -7,13 +7,22 @@ class Domain::Fa::Job::FaJobBase < Scraper::JobBase
   protected
-  def find_or_intitialize_user_from_args(args, caused_by_entry: nil)
+  def find_or_create_user_from_args(...)
+    user = find_or_build_user_from_args(...)
+    if user.new_record?
+      user.save!
+    end
+    user
+  end
+
+  def find_or_build_user_from_args(args, caused_by_entry: nil)
     args[:user] || begin
-      url_name = args[:url_name]&.downcase
-      fatal_error("arg 'url_name' is required if arg 'user' is nil") if url_name.blank?
-      Domain::Fa::User.find_or_create_by(url_name: url_name) do |user|
+      fatal_error("arg 'url_name' is required if arg 'user' is nil") if args[:url_name].blank?
+      url_name = Domain::Fa::User.name_to_url_name(args[:url_name])
+      Domain::Fa::User.find_or_initialize_by(url_name: url_name) do |user|
         user.state_detail ||= {}
         user.state_detail["first_seen_entry"] = caused_by_entry.id if caused_by_entry
         user.name ||= url_name
       end
     end
   end
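
The new find_or_create_user_from_args uses Ruby's bare (...) forwarding (2.7+), which passes positional, keyword, and block arguments through unchanged. A self-contained sketch of the pattern (stand-in method names, not the real job API):

def build(args, caused_by_entry: nil)
  { args: args, caused_by_entry: caused_by_entry }
end

def create(...)
  build(...) # forwards every argument, keywords included
end

p create({ url_name: "zzreg" }, caused_by_entry: nil)
# => {:args=>{:url_name=>"zzreg"}, :caused_by_entry=>nil}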

View File

@@ -0,0 +1,167 @@
# Gathers and records all of the follows for a user. The result is used to
# build a follower -> followed index for recommender training.
class Domain::Fa::Job::UserFollowsJob < Domain::Fa::Job::FaJobBase
queue_as :fa_user_follows
ignore_signature_args :caused_by_entry
def perform(args)
@caused_by_entry = args[:caused_by_entry]
@first_job_entry = nil
@force_scan = !!args[:force_scan]
@user = find_or_build_user_from_args(args, caused_by_entry: best_caused_by_entry)
is_new_record = @user.new_record?
@user.save!
logger.prefix = "[#{(@user.url_name || @user.name).bold} / #{@user.state.bold}]"
if !@user.due_for_follows_scan? && !@force_scan
logger.warn("scanned #{time_ago_in_words(@user.scanned_follows_at)}, skipping")
return
end
@page_number = 1
@total_follows_seen = 0
@last_in_user_list = nil
@scanned_followed_ids = Set.new
while true
break if scan_follows_page == :break
# bail out at 100,000 users
break if @page_number > 500
@page_number += 1
end
to_add = nil
to_remove = nil
duration, _ignore = measure do
existing_followed_ids = Set.new(@user.follows.pluck(:followed_id))
to_remove = existing_followed_ids - @scanned_followed_ids
to_add = @scanned_followed_ids - existing_followed_ids
end
duration_ms = (duration * 1000).to_i
logger.info("add #{to_add.size.to_s.bold} follows, " +
"remove #{to_remove.size.to_s.bold} follows " +
"(took #{duration_ms.to_s.bold} ms)")
duration, _ignore = measure do
ReduxApplicationRecord.transaction do
@user.follows.where(followed_id: to_remove).delete_all
@user.follows.insert_all!(to_add.map do |id|
{ followed_id: id }
end) unless to_add.empty?
@user.scanned_follows_at = Time.now
@user.save!
end
end
if is_new_record
logger.info("user was new record, enqueue page scan job")
Domain::Fa::Job::UserPageJob.perform_later({
user: @user,
caused_by_entry: best_caused_by_entry,
})
end
duration_ms = (1000 * duration).to_i
logger.info(
"bulk set follows list to #{@user.follows.count.to_s.bold} users " +
"(took #{duration_ms.to_s.bold} ms)"
)
end
private
def scan_follows_page
ret = nil
url = if @page_number > 1
"https://www.furaffinity.net/watchlist/by/#{@user.url_name}/#{@page_number}/?"
else
"https://www.furaffinity.net/watchlist/by/#{@user.url_name}/"
end
response = http_client.get(url, caused_by_entry: best_caused_by_entry)
@first_job_entry ||= response.log_entry
if response.status_code != 200
fatal_error(
"http #{response.status_code.to_s.red.bold}, " +
"log entry #{response.log_entry.id.to_s.bold}"
)
end
page = Domain::Fa::Parser::Page.new(response.body, require_logged_in: false)
user_list = page.user_list.uniq
if user_list.empty?
logger.info("page #{@page_number.to_s.bold} has no users, break")
return :break
end
if user_list.last.url_name == @last_in_user_list
logger.info("page #{@page_number.to_s.bold} saw same user as last page, break")
return :break
end
# a full page holds 200 users; treat a short page (190 leaves a small margin) as the last one
ret = :break if user_list.length < 190
@last_in_user_list = user_list.last.url_name
@total_follows_seen += user_list.length
users_to_create = []
duration, followed_user_ids = measure do
existing_url_name_to_id = Domain::Fa::User.where(
url_name: user_list.map(&:url_name),
).pluck(:id, :url_name).map do |id, url_name|
[url_name, id]
end.to_h
users_to_create = user_list.reject do |user|
existing_url_name_to_id[user.url_name]
end.map do |user|
{
url_name: user.url_name,
name: user.name,
state_detail: { "first_seen_entry" => response.log_entry.id },
}
end
created_user_ids = Domain::Fa::User.upsert_all(
users_to_create,
unique_by: :url_name,
update_only: :url_name,
returning: %i[id url_name],
).map do |row|
row["id"]
end unless users_to_create.empty?
users_to_create.each do |user_hash|
Domain::Fa::Job::UserPageJob.perform_later(
url_name: user_hash[:url_name],
caused_by_entry: best_caused_by_entry,
)
end
(created_user_ids || []) + existing_url_name_to_id.values
end
duration_ms = (duration * 1000).to_i
logger.info(
"page #{@page_number.to_s.bold} - " +
"#{user_list.length.to_s.bold} users on page, " +
"created #{users_to_create.size.to_s.bold} " +
"(took #{duration_ms.to_s.bold} ms)"
)
followed_user_ids.each do |user_id|
@scanned_followed_ids.add(user_id)
end
ret
end
def best_caused_by_entry
@first_job_entry || @caused_by_entry
end
end
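
The add/remove bookkeeping in perform is a plain set difference between the follows already in the database and the ids observed during this scan; in isolation:

require "set"

existing_followed_ids = Set.new([101, 102, 103]) # what the DB currently holds
scanned_followed_ids  = Set.new([102, 103, 104]) # what this scan observed

to_remove = existing_followed_ids - scanned_followed_ids # => #<Set: {101}>
to_add    = scanned_followed_ids - existing_followed_ids # => #<Set: {104}>

Deleting to_remove and bulk-inserting to_add inside one transaction keeps the follows table consistent with the latest scan without rewriting unchanged rows.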

View File

@@ -7,7 +7,7 @@ class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::FaJobBase
def perform(args)
@force_scan = !!args[:force_scan]
@caused_by_entry = args[:caused_by_entry]
-    @user = find_or_intitialize_user_from_args(args, caused_by_entry: @caused_by_entry)
+    @user = find_or_create_user_from_args(args, caused_by_entry: @caused_by_entry)
logger.prefix = "[user #{(@user.url_name || @user.name).bold} / #{@user.state.bold}]"
if @user.state != "ok" && @user.scanned_gallery_at

View File

@@ -4,7 +4,7 @@ class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::FaJobBase
def perform(args)
@caused_by_entry = args[:caused_by_entry]
-    @user = find_or_intitialize_user_from_args(args, caused_by_entry: @caused_by_entry)
+    @user = find_or_create_user_from_args(args, caused_by_entry: @caused_by_entry)
@force_scan = !!args[:force_scan]
logger.prefix = "[#{(@user.url_name || @user.name).bold} / #{@user.state.bold}]"

View File

@@ -5,7 +5,7 @@ class Domain::Twitter::Job::TwitterJobBase < Scraper::JobBase
:get_twitter_http_client
end
-  def find_or_intitialize_user_from_args(args)
+  def find_or_create_user_from_args(args)
if args[:user]
args[:user]
elsif args[:tw_id].present?

View File

@@ -4,7 +4,7 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
def perform(args)
@name = args[:name]
-    @user = find_or_intitialize_user_from_args(args)
+    @user = find_or_create_user_from_args(args)
logger.prefix = proc { "[user: #{(@user.name || args[:name])&.bold}]" }
@proxy_name = Rails.application.config.x.proxy_name.to_s

View File

@@ -18,6 +18,13 @@ class Scraper::JobBase < ApplicationJob
     @gallery_dl_client ||= Scraper::ClientFactory.get_gallery_dl_client
   end
+
+  def measure
+    now = Time.now
+    ret = yield
+    dur = Time.now - now
+    [dur, ret]
+  end
   good_job_control_concurrency_with(
     total_limit: 1,
     key: proc do
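
The measure helper returns the elapsed seconds alongside the block's own return value, which lets callers log timings without restructuring their code. A standalone copy for illustration:

def measure
  now = Time.now
  ret = yield
  [Time.now - now, ret]
end

duration, sum = measure { (1..1_000_000).sum }
puts "took #{(duration * 1000).to_i} ms, got #{sum}"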

View File

@@ -145,6 +145,10 @@ class Domain::Fa::Parser::Page < Domain::Fa::Parser::Base
   def submission
     @submission ||= Domain::Fa::Parser::SubmissionParserHelper.new(@page, @phtml, @page_version)
   end
+
+  def user_list
+    @user_list ||= Domain::Fa::Parser::UserListParserHelper.user_list(@page)
+  end
 end
 private

View File

@@ -0,0 +1,30 @@
class Domain::Fa::Parser::UserListParserHelper
User = Struct.new(
:name,
:url_name,
:href,
keyword_init: true,
)
def self.user_list(page)
page.css(".watch-list .watch-list-items").map do |elem|
watch_list_item_to_user_struct(elem)
end
end
# NB: a bare `private` does not apply to `def self.` methods, so mark it explicitly
private_class_method def self.watch_list_item_to_user_struct(elem)
link = elem.css("a").first
href = link["href"]
raise("unexpected watchlist href: #{href}") unless href.starts_with?("/user/")
# strip off '/user/' prefix
url_name = href.split("/").reject(&:blank?).last
User.new(
name: link.text,
url_name: url_name,
href: href,
)
end
end
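
For reference, the helper boils down to a couple of Nokogiri css calls. A self-contained run against a guessed fragment of the watchlist markup (the real fixture is watchlist_zzreg.html; this standalone version uses plain-Ruby start_with?/empty? where the helper relies on Rails' starts_with?/blank? extensions):

require "nokogiri"

html = <<~HTML
  <div class="watch-list">
    <div class="watch-list-items"><a href="/user/zzreg/">Zzreg</a></div>
  </div>
HTML

Nokogiri::HTML(html).css(".watch-list .watch-list-items").each do |elem|
  link = elem.css("a").first
  url_name = link["href"].split("/").reject(&:empty?).last
  puts "#{link.text} -> #{url_name}" # prints "Zzreg -> zzreg"
end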

View File

@@ -18,6 +18,10 @@ class Domain::Fa::User < ReduxApplicationRecord
     :scan_error, # user has been removed or otherwise, see state_detail
   ]
+  has_many :follows,
+    class_name: "::Domain::Fa::Follow",
+    foreign_key: :follower_id
+
   validates_presence_of(:name, :url_name)
   before_validation do
     self.url_name ||= self.class.name_to_url_name(self.name) if self.name
@@ -29,13 +33,17 @@ class Domain::Fa::User < ReduxApplicationRecord
   end
   def due_for_page_scan?
-    scanned_page_at.nil? || scanned_page_at < 1.year.ago
+    scanned_page_at.nil? || scanned_page_at < 1.month.ago
   end
   def due_for_gallery_scan?
     scanned_gallery_at.nil? || scanned_gallery_at < 1.year.ago
   end
+  def due_for_follows_scan?
+    scanned_follows_at.nil? || scanned_follows_at < 1.month.ago
+  end
   def self.find_or_build_from_legacy(legacy_user)
     existing = find_by(url_name: legacy_user.url_name)
     return existing if existing

View File

@@ -88,6 +88,12 @@ development:
   dedipath-1: *testcookies
   serverhost-1: *testcookies
+production:
+  direct: *ddwhatnow
+  proxy-1: *vipvillageworker
+  dedipath-1: *blazeandwish
+  serverhost-1: *cottoniq
+
 worker:
   direct: *ddwhatnow
   proxy-1: *vipvillageworker

View File

@@ -1,9 +1,8 @@
 class CreateDomainFaFollows < ActiveRecord::Migration[7.0]
   def change
     create_table :domain_fa_follows do |t|
-      t.references :follower
-      t.references :followed
-      t.timestamps
+      t.references :follower, null: false
+      t.references :followed, null: false
     end
     add_foreign_key :domain_fa_follows, :domain_fa_users, column: :follower_id
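
The Domain::Fa::Follow model itself is not part of this diff; a minimal sketch consistent with this migration and the has_many :follows declaration above would be:

class Domain::Fa::Follow < ReduxApplicationRecord
  belongs_to :follower, class_name: "::Domain::Fa::User"
  belongs_to :followed, class_name: "::Domain::Fa::User"
end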

View File

@@ -0,0 +1,7 @@
class AddDomainFaUsersScannedFollowsAt < ActiveRecord::Migration[7.0]
def change
change_table :domain_fa_users do |t|
t.datetime :scanned_follows_at
end
end
end

db/schema.rb (generated)
View File

@@ -10,7 +10,7 @@
 #
 # It's strongly recommended that you check this file into your version control system.
-ActiveRecord::Schema[7.0].define(version: 2023_03_30_134212) do
+ActiveRecord::Schema[7.0].define(version: 2023_03_31_023807) do
   # These are extensions that must be enabled in order to support this database
   enable_extension "pg_stat_statements"
   enable_extension "pg_trgm"
@@ -105,10 +105,8 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_30_134212) do
   end
   create_table "domain_fa_follows", force: :cascade do |t|
-    t.bigint "follower_id"
-    t.bigint "followed_id"
-    t.datetime "created_at", null: false
-    t.datetime "updated_at", null: false
+    t.bigint "follower_id", null: false
+    t.bigint "followed_id", null: false
     t.index ["followed_id"], name: "index_domain_fa_follows_on_followed_id"
     t.index ["follower_id"], name: "index_domain_fa_follows_on_follower_id"
   end
@@ -168,6 +166,7 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_30_134212) do
     t.jsonb "log_entry_detail"
     t.integer "state"
     t.jsonb "state_detail"
+    t.datetime "scanned_follows_at"
     t.index ["name"], name: "index_domain_fa_users_on_name", unique: true
     t.index ["url_name"], name: "index_domain_fa_users_on_url_name", unique: true
   end

View File

@@ -126,7 +126,7 @@ describe Domain::Fa::Job::BrowsePageJob do
status_code: 200,
content_type: "text/html",
contents: SpecUtil.read_fixture_file("domain/fa/job/browse_page_no_submissions.html"),
-        caused_by_entry: nil,
+        caused_by_entry_idx: nil,
},
]
)

View File

@@ -0,0 +1,187 @@
require "rails_helper"
describe Domain::Fa::Job::UserFollowsJob do
let(:http_client_mock) { instance_double("::Scraper::HttpClient") }
let(:set_zzreg_http_mock) {
proc {
SpecUtil.init_http_client_mock(
http_client_mock, [
{
uri: "https://www.furaffinity.net/watchlist/by/zzreg/",
status_code: 200,
content_type: "text/html",
contents: SpecUtil.read_fixture_file("domain/fa/parser/redux/watchlist_zzreg.html"),
caused_by_entry_idx: nil,
},
{
uri: "https://www.furaffinity.net/watchlist/by/zzreg/2/?",
status_code: 200,
content_type: "text/html",
contents: SpecUtil.read_fixture_file("domain/fa/parser/redux/watchlist_zzreg.html"),
caused_by_entry_idx: 0,
},
]
)
}
}
before do
ActiveJob::Base.queue_adapter = :test
Scraper::ClientFactory.http_client_mock = http_client_mock
@zzreg_mock_log_entries = set_zzreg_http_mock.call
end
shared_context "zzreg user exists" do
let!(:user) { Domain::Fa::User.create!(name: "Zzreg", url_name: "zzreg") }
end
shared_examples "zzreg follow creation" do
it "creates the right follows" do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
user.reload
expect(user.follows.length).to eq(770)
expect(user.scanned_follows_at).to_not be_nil
end
end
context "performed with a user that doesn't exist yet" do
it "creates the scanned user and followed users" do
expect do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
expect(ret).to_not be_a(Exception)
end.to change { Domain::Fa::User.count }.by(771)
end
it "enqueues a user page job" do
Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
zzreg = Domain::Fa::User.find_by(url_name: "zzreg")
expect(zzreg).to_not be_nil
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).find do |job|
job[:args][0][:user] == zzreg
end).to_not be_nil
end
end
context "when the scanned user already exists" do
include_context "zzreg user exists"
it "can be performed by url_name" do
expect do
Domain::Fa::Job::UserFollowsJob.perform_now({ url_name: "zzreg" })
end.to change { Domain::Fa::User.count }.by(770)
end
it "can be performed by direct post object" do
expect do
Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
end.to change { Domain::Fa::User.count }.by(770)
end
it "does not enqueue a user page job" do
Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).find do |job|
job[:args][0][:user] == user
end).to be_nil
end
it "can be ran twice to no ill effect" do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
set_zzreg_http_mock.call
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user, force_scan: true })
expect(ret).to_not be_a(Exception)
end
include_examples "zzreg follow creation"
end
context "some scanned users already exist" do
include_context "zzreg user exists"
let!(:followed) {
Domain::Fa::User.create!(
# name in html is Agi_Type01, it's intentionally changed here
name: "AGI_Type01",
url_name: "agitype01",
num_submissions: 10,
)
}
it "does not ovewrite the existing user" do
original_updated_at = followed.updated_at
expect do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
end.to change { Domain::Fa::User.count }.by(769)
followed.reload
expect(followed.num_submissions).to eq(10)
expect(followed.updated_at).to eq(original_updated_at)
# the upsert should only create new user models, not modify
# existing ones, even for casing changes
expect(followed.name).to eq("AGI_Type01")
expect(followed.state_detail).to eq({})
# newly created users should have the right 'first_seen_entry' id
stripes_user = Domain::Fa::User.find_by url_name: "stripes"
expect(stripes_user).to_not be_nil
expect(stripes_user.state_detail["first_seen_entry"]).to eq(@zzreg_mock_log_entries[0].id)
end
it "newly inserted users have a name associated with them" do
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
user = Domain::Fa::User.find_by(url_name: "aimi")
expect(user).to_not be_nil
expect(user.name).to eq("Aimi")
end
include_examples "zzreg follow creation"
end
context "the set of followed users changes" do
include_context "zzreg user exists"
# this is a user that is not in the watchlist
let!(:smaz_user) { Domain::Fa::User.create!(name: "Smaz", url_name: "smaz") }
# this is a user already in the watchlist
let!(:agi_type01_user) {
Domain::Fa::User.create!(
name: "Agi_Type01",
url_name: "agitype01",
num_submissions: 10,
)
}
it "correctly adds and removes follows" do
follow_1 = Domain::Fa::Follow.create!(follower: user, followed: smaz_user)
follow_2 = Domain::Fa::Follow.create!(follower: user, followed: agi_type01_user)
expect(user.follows.length).to eq(2)
ret = Domain::Fa::Job::UserFollowsJob.perform_now({ user: user })
expect(ret).to_not be_a(Exception)
user.reload
expect(user.follows.length).to eq(770)
expect(user.follows.where(followed: smaz_user).first).to be_nil
expect(user.follows.where(followed: agi_type01_user).first).to eq(follow_2)
# correct user page jobs should be enqueued
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).find do |job|
job[:args][0][:user] == smaz_user
end).to be_nil
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).find do |job|
job[:args][0][:user] == agi_type01_user
end).to be_nil
# newly created users are enqueued by url name
expect(SpecUtil.enqueued_jobs(Domain::Fa::Job::UserPageJob).find do |job|
job[:args][0][:url_name] == "meesh"
end).to_not be_nil
end
end
end

File diff suppressed because it is too large.

View File

@@ -250,6 +250,17 @@ class Domain::Fa::Parser::ReduxPageTest < ActiveSupport::TestCase
assert_equal "//d.furaffinity.net/art/soulhunt3r/1433541729/1433541729.soulhunt3r_image.jpg", sub.full_res_img
end
+  def test_watchlist_zzreg
+    parser = get_parser("watchlist_zzreg.html", require_logged_in: false)
+    user_list = parser.user_list
+    assert_equal 770, user_list.length
+    assert_equal "-creeps", user_list[0].url_name
+    assert_equal "-creeps", user_list[0].name
+    assert_equal "~nicky~", user_list.last.url_name
+    assert_equal "~Nicky~", user_list.last.name
+  end
def get_parser(file, require_logged_in: true)
path = File.join("domain/fa/parser/redux", file)
contents = read_fixture_file(path) || raise("Couldn't open #{path}")

View File

@@ -1,5 +1,5 @@
-cron: bundle exec rake good_job_cron
-direct: bundle exec rake good_job proxy=direct
-proxy-1: bundle exec rake good_job proxy=proxy-1
-dedipath-1: bundle exec rake good_job proxy=dedipath-1
-serverhost-1: bundle exec rake good_job proxy=serverhost-1
+cron: RAILS_ENV=worker bundle exec rake good_job_cron
+direct: RAILS_ENV=worker bundle exec rake good_job proxy=direct
+proxy-1: RAILS_ENV=worker bundle exec rake good_job proxy=proxy-1
+dedipath-1: RAILS_ENV=worker bundle exec rake good_job proxy=dedipath-1
+serverhost-1: RAILS_ENV=worker bundle exec rake good_job proxy=serverhost-1