Files
redux-scraper/app/jobs/domain/bluesky/job/scan_user_follows_job.rb
Dylan Knutson 4d456ee73d Improve logging and add rake task for monitoring user follows
- Enhanced logging format in scan_user_follows_job and monitor tasks using format_tags
- Added new rake task 'bluesky:watch_follows' to monitor users that a given user follows
- Improved log formatting consistency across Bluesky monitoring components
2025-08-15 21:55:18 +00:00

267 lines
7.0 KiB
Ruby

# typed: strict
# frozen_string_literal: true
require "uri"
# Background job that refreshes both directions of a Bluesky user's follow
# graph: the accounts the user follows ("follows") and the accounts following
# the user ("followed_by").
#
# Each direction is skipped when a completed scan newer than one month exists,
# unless `force_scan?` returns true. A scan pages through the public XRPC
# endpoint, creates local users for DIDs that are not yet known, upserts new
# follow edges, stamps `removed_at` on edges missing from the latest listing,
# and records summary counts on a `Domain::UserJobEvent::FollowScan` row.
class Domain::Bluesky::Job::ScanUserFollowsJob < Domain::Bluesky::Job::Base
  self.default_priority = -10

  # Runs the "follows" scan and then the "followed_by" scan for the job's user.
  #
  # `args` is not read directly here; the user is resolved through
  # `user_from_args!`.
  sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
  def perform(args)
    user = user_from_args!

    last_follows_scan = user.follows_scans.where(state: "completed").last
    if (ca = last_follows_scan&.created_at) && (ca > 1.month.ago) &&
         !force_scan?
      logger.info(
        format_tags(
          "skipping user #{user.did} follows scan",
          make_tags(
            ago: time_ago_in_words(ca),
            last_scan_id: last_follows_scan.id,
          ),
        ),
      )
    else
      perform_scan_type(
        user,
        "follows",
        bsky_method: "app.bsky.graph.getFollows",
        bsky_field: "follows",
        edge_name: :user_user_follows_from,
        user_attr: :from_id,
        other_attr: :to_id,
      )
    end

    # Looked up only after the "follows" scan above has finished.
    last_followed_by_scan =
      user.followed_by_scans.where(state: "completed").last
    if (ca = last_followed_by_scan&.created_at) && (ca > 1.month.ago) &&
         !force_scan?
      logger.info(
        format_tags(
          "skipping user #{user.did} followed by scan",
          make_tags(
            ago: time_ago_in_words(ca),
            last_scan_id: last_followed_by_scan.id,
          ),
        ),
      )
    else
      perform_scan_type(
        user,
        "followed_by",
        bsky_method: "app.bsky.graph.getFollowers",
        bsky_field: "followers",
        edge_name: :user_user_follows_to,
        user_attr: :to_id,
        other_attr: :from_id,
      )
    end
  end

  private

  # Fetches every page of one follow listing and reconciles it with the
  # database via `handle_subjects_data`.
  #
  # Parameters:
  # - user:        the user whose graph is being scanned
  # - kind:        stored on the FollowScan record and used in log messages
  #                (callers pass "follows" or "followed_by")
  # - bsky_method: XRPC method name appended to the public API base URL
  # - bsky_field:  key of the JSON response that holds the list of subjects
  # - edge_name:   association on `user` whose rows are the existing edges
  # - user_attr:   follow-row column that receives `user.id`
  # - other_attr:  follow-row column that receives the other user's id
  #
  # Any error raised after the scan record exists marks that record as
  # "error" and is then re-raised.
  sig do
    params(
      user: Domain::User::BlueskyUser,
      kind: String,
      bsky_method: String,
      bsky_field: String,
      edge_name: Symbol,
      user_attr: Symbol,
      other_attr: Symbol,
    ).void
  end
  def perform_scan_type(
    user,
    kind,
    bsky_method:,
    bsky_field:,
    edge_name:,
    user_attr:,
    other_attr:
  )
    scan = Domain::UserJobEvent::FollowScan.create!(user:, kind:)
    cursor = T.let(nil, T.nilable(String))
    page = 0
    subjects_data = T.let([], T::Array[Bluesky::Graph::Subject])

    loop do
      xrpc_url =
        "https://public.api.bsky.app/xrpc/#{bsky_method}?actor=#{user.did!}&limit=100"
      if cursor
        # The cursor is an opaque server token; escape it so any reserved
        # characters survive being placed in the query string.
        xrpc_url =
          "#{xrpc_url}&cursor=#{URI.encode_www_form_component(cursor)}"
      end

      response = http_client.get(xrpc_url)
      # Only the first page's HTTP log entry is attached to the scan record.
      scan.update!(log_entry: response.log_entry) if page == 0
      page += 1

      if response.status_code != 200
        fatal_error(
          format_tags(
            "failed to get user #{kind}",
            make_tags(status_code: response.status_code),
          ),
        )
      end

      data = JSON.parse(response.body)
      if data["error"]
        fatal_error(
          format_tags(
            "failed to get user #{kind}",
            make_tags(error: data["error"]),
          ),
        )
      end

      # A response without the expected list must fail loudly. Treating it as
      # an empty page would make the reconciliation below mark every existing
      # edge as removed.
      page_subjects = data[bsky_field]
      unless page_subjects.is_a?(Array)
        fatal_error(
          format_tags(
            "failed to get user #{kind}",
            make_tags(error: "response is missing #{bsky_field}"),
          ),
        )
      end

      subjects_data.concat(
        page_subjects.map do |subject_data|
          Bluesky::Graph::Subject.from_json(subject_data)
        end,
      )

      # The loop ends when the response carries no cursor.
      cursor = data["cursor"]
      break if cursor.nil?
    end

    handle_subjects_data(
      user,
      subjects_data,
      scan,
      edge_name:,
      user_attr:,
      other_attr:,
    )
    scan.update!(state: "completed", completed_at: Time.current)
    logger.info(
      format_tags(
        "completed user #{kind} scan",
        make_tags(num_subjects: subjects_data.size),
      ),
    )
  rescue StandardError
    # `scan` is nil when the initial create! itself failed.
    scan&.update!(state: "error", completed_at: Time.current)
    raise
  end

  # Reconciles a complete listing of subjects with the stored follow edges.
  #
  # Steps: create local users for unknown DIDs, upsert edges that are new,
  # stamp `removed_at` on stored edges absent from the listing, reset the
  # follow counter caches of every user touched, write summary counts to
  # `scan`, and touch `user`.
  #
  # Parameters:
  # - user:       the user that was scanned
  # - subjects:   every subject returned by the API for this direction
  # - scan:       scan record that receives the summary counts
  # - edge_name:  association on `user` whose rows are the existing edges
  # - user_attr:  follow-row column that receives `user.id`
  # - other_attr: follow-row column that receives the other user's id
  sig do
    params(
      user: Domain::User::BlueskyUser,
      subjects: T::Array[Bluesky::Graph::Subject],
      scan: Domain::UserJobEvent::FollowScan,
      edge_name: Symbol,
      user_attr: Symbol,
      other_attr: Symbol,
    ).void
  end
  def handle_subjects_data(
    user,
    subjects,
    scan,
    edge_name:,
    user_attr:,
    other_attr:
  )
    subjects_by_did =
      T.cast(subjects.index_by(&:did), T::Hash[String, Bluesky::Graph::Subject])
    users_by_did =
      T.cast(
        Domain::User::BlueskyUser.where(did: subjects_by_did.keys).index_by(
          &:did
        ),
        T::Hash[String, Domain::User::BlueskyUser],
      )

    # Create local records for subjects that have no user row yet.
    missing_user_dids = subjects_by_did.keys - users_by_did.keys
    missing_user_dids.each do |did|
      subject = subjects_by_did[did] || next
      users_by_did[did] = create_user_from_subject(subject)
    end

    users_by_id = users_by_did.values.map { |u| [T.must(u.id), u] }.to_h
    listed_user_ids = users_by_id.keys

    # NOTE(review): if this association also returns rows whose removed_at is
    # already set, a re-follow is never treated as "new" (so removed_at is not
    # cleared) and stale rows get removed_at re-stamped on every scan. Confirm
    # the association's scope.
    existing_subject_ids =
      T.cast(user.send(edge_name).pluck(other_attr), T::Array[Integer])
    new_user_ids = listed_user_ids - existing_subject_ids
    removed_user_ids = existing_subject_ids - listed_user_ids

    follow_upsert_attrs =
      new_user_ids.map do |new_user_id|
        new_user_did = users_by_id[new_user_id]&.did
        followed_at = new_user_did && subjects_by_did[new_user_did]&.created_at
        {
          user_attr => user.id,
          other_attr => new_user_id,
          :followed_at => followed_at,
          :removed_at => nil,
        }
      end

    removed_at = Time.current
    unfollow_upsert_attrs =
      removed_user_ids.map do |removed_user_id|
        {
          user_attr => user.id,
          other_attr => removed_user_id,
          :removed_at => removed_at,
        }
      end

    # Every user whose follow counters may have changed: the scanned user plus
    # both ends of each added or removed edge.
    referenced_user_ids = Set.new([user.id])
    referenced_user_ids.merge(new_user_ids)
    referenced_user_ids.merge(removed_user_ids)

    Domain::User.transaction do
      follow_upsert_attrs.each_slice(5000) do |slice|
        Domain::UserUserFollow.upsert_all(slice, unique_by: %i[from_id to_id])
      end
      unfollow_upsert_attrs.each_slice(5000) do |slice|
        Domain::UserUserFollow.upsert_all(slice, unique_by: %i[from_id to_id])
      end
    end

    # reset counter caches
    Domain::User.transaction do
      referenced_user_ids.each do |user_id|
        Domain::User.reset_counters(
          user_id,
          :user_user_follows_from,
          :user_user_follows_to,
        )
      end
    end

    update_attrs = {
      num_created_users: missing_user_dids.size,
      num_existing_assocs: existing_subject_ids.size,
      num_new_assocs: new_user_ids.size,
      num_removed_assocs: removed_user_ids.size,
      num_total_assocs: subjects.size,
    }
    logger.info(
      format_tags("updated user #{edge_name}", make_tags(update_attrs)),
    )
    scan.update_json_attributes!(update_attrs)
    user.touch
  end

  # Creates a local user (and an avatar record) from a subject returned by the
  # API, then defers a ScanUserJob for the user and a UserAvatarJob for the
  # avatar. Returns the newly created user.
  sig do
    params(subject: Bluesky::Graph::Subject).returns(Domain::User::BlueskyUser)
  end
  def create_user_from_subject(subject)
    user =
      Domain::User::BlueskyUser.create!(
        did: subject.did,
        handle: subject.handle,
        display_name: subject.display_name,
        description: subject.description,
      )
    avatar = user.create_avatar(url_str: subject.avatar)
    defer_job(Domain::Bluesky::Job::ScanUserJob, { user: }, { priority: 0 })
    defer_job(Domain::UserAvatarJob, { avatar: }, { priority: -1 })
    user
  end
end