user follows/followed by scans for bluesky

This commit is contained in:
Dylan Knutson
2025-08-14 17:03:36 +00:00
parent e1933104b3
commit 1d248c1f23
37 changed files with 6145 additions and 38 deletions

View File

@@ -93,11 +93,18 @@ module GoodJobHelper
sig { params(job: GoodJob::Job).returns(T::Array[JobArg]) }
def arguments_for_job(job)
deserialized =
T.cast(
ActiveJob::Arguments.deserialize(job.serialized_params).to_h,
T::Hash[String, T.untyped],
begin
deserialized =
T.cast(
ActiveJob::Arguments.deserialize(job.serialized_params).to_h,
T::Hash[String, T.untyped],
)
rescue ActiveJob::DeserializationError => e
Rails.logger.error(
"error deserializing job arguments: #{e.class.name} - #{e.message}",
)
return [JobArg.new(key: :error, value: e.message, inferred: true)]
end
args_hash =
T.cast(deserialized["arguments"].first, T::Hash[Symbol, T.untyped])
args =

View File

@@ -106,11 +106,12 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
num_filtered_posts = 0
num_created_posts = 0
num_pages = 0
posts_scan = Domain::UserJobEvent::PostsScan.create!(user:)
loop do
url = cursor ? "#{posts_url}&cursor=#{cursor}" : posts_url
response = http_client.get(url)
posts_scan.update!(log_entry: response.log_entry) if num_pages == 0
num_pages += 1
if response.status_code != 200
@@ -176,6 +177,10 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
end
user.scanned_posts_at = Time.current
posts_scan.update!(
total_posts_seen: num_processed_posts,
new_posts_seen: num_created_posts,
)
logger.info(
format_tags(
"scanned posts",

View File

@@ -0,0 +1,264 @@
# typed: strict
# frozen_string_literal: true
class Domain::Bluesky::Job::ScanUserFollowsJob < Domain::Bluesky::Job::Base
self.default_priority = -10
# Scans both directions of the user's follow graph: who the user follows
# ("follows") and who follows the user ("followed_by"). A direction is
# skipped when its most recent *completed* scan is less than a month old,
# unless `force_scan?` is true.
#
# @param args [Hash{Symbol => Object}] job arguments; the target user is
#   resolved through `user_from_args!` (defined outside this file).
# @return [Object] untyped per the signature; callers should not rely on it.
sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
def perform(args)
  user = user_from_args!

  # Only completed scans count as fresh. `perform_scan_type` creates the scan
  # row before fetching and flips it to "error" on failure, so considering
  # every scan here would turn a retry of a failed run into a no-op for a
  # month.
  last_follows_scan = user.follows_scans.where(state: "completed").last
  if (ca = last_follows_scan&.created_at) && (ca > 1.month.ago) &&
       !force_scan?
    logger.info(
      format_tags(
        "skipping user #{user.did} follows scan",
        make_tags(
          ago: time_ago_in_words(ca),
          last_scan_id: last_follows_scan.id,
        ),
      ),
    )
  else
    perform_scan_type(
      user,
      "follows",
      bsky_method: "app.bsky.graph.getFollows",
      bsky_field: "follows",
      edge_name: :user_user_follows_from,
      user_attr: :from_id,
      other_attr: :to_id,
    )
  end

  # Same freshness rule for the reverse direction.
  last_followed_by_scan =
    user.followed_by_scans.where(state: "completed").last
  if (ca = last_followed_by_scan&.created_at) && (ca > 1.month.ago) &&
       !force_scan?
    logger.info(
      format_tags(
        "skipping user #{user.did} followed by scan",
        make_tags(
          ago: time_ago_in_words(ca),
          last_scan_id: last_followed_by_scan.id,
        ),
      ),
    )
  else
    perform_scan_type(
      user,
      "followed_by",
      bsky_method: "app.bsky.graph.getFollowers",
      bsky_field: "followers",
      edge_name: :user_user_follows_to,
      user_attr: :to_id,
      other_attr: :from_id,
    )
  end
end
private
# Fetches every page of one follow-graph listing for `user` from the public
# Bluesky XRPC endpoint, records the run as a FollowScan, and passes the
# collected subjects to `handle_subjects_data`.
#
# The scan row is created first (state defaults to "running"), gets the log
# entry of the first response attached, and ends as "completed" or "error".
#
# @param user [Domain::User::BlueskyUser] user whose graph is scanned
# @param kind [String] FollowScan kind: "follows" or "followed_by"
# @param bsky_method [String] XRPC method name placed in the URL path
# @param bsky_field [String] response key holding the array of subjects
# @param edge_name [Symbol] association on `user` holding the follow edges
# @param user_attr [Symbol] edge column that stores `user.id`
# @param other_attr [Symbol] edge column that stores the other user's id
# @return [void]
# @raise [StandardError] any failure is re-raised after the scan row is
#   marked "error"
sig do
  params(
    user: Domain::User::BlueskyUser,
    kind: String,
    bsky_method: String,
    bsky_field: String,
    edge_name: Symbol,
    user_attr: Symbol,
    other_attr: Symbol,
  ).void
end
def perform_scan_type(
  user,
  kind,
  bsky_method:,
  bsky_field:,
  edge_name:,
  user_attr:,
  other_attr:
)
  scan = Domain::UserJobEvent::FollowScan.create!(user:, kind:)
  cursor = T.let(nil, T.nilable(String))
  page = 0
  subjects_data = T.let([], T::Array[Bluesky::Graph::Subject])

  loop do
    xrpc_url =
      "https://public.api.bsky.app/xrpc/#{bsky_method}?actor=#{user.did!}&limit=100"
    # The cursor is an opaque, server-supplied token: escape it so reserved
    # characters cannot corrupt the query string.
    if cursor
      xrpc_url =
        "#{xrpc_url}&cursor=#{URI.encode_www_form_component(cursor)}"
    end

    response = http_client.get(xrpc_url)
    # Only the first page's log entry is attached to the scan record.
    scan.update!(log_entry: response.log_entry) if page == 0
    page += 1

    # NOTE(review): `fatal_error` is defined outside this file; the code after
    # each call assumes it raises — confirm.
    if response.status_code != 200
      fatal_error(
        format_tags(
          "failed to get user #{kind}",
          make_tags(status_code: response.status_code),
        ),
      )
    end

    data = JSON.parse(response.body)
    if data["error"]
      fatal_error(
        format_tags(
          "failed to get user #{kind}",
          make_tags(error: data["error"]),
        ),
      )
    end

    # Fail with a descriptive error instead of a NoMethodError on nil when the
    # expected list is absent or malformed.
    subjects_json = data[bsky_field]
    unless subjects_json.is_a?(Array)
      fatal_error(
        format_tags(
          "failed to get user #{kind}",
          make_tags(error: "response has no #{bsky_field} array"),
        ),
      )
    end

    subjects_data.concat(
      subjects_json.map do |subject_data|
        Bluesky::Graph::Subject.from_json(subject_data)
      end,
    )

    cursor = data["cursor"]
    break if cursor.nil?
  end

  handle_subjects_data(
    user,
    subjects_data,
    scan,
    edge_name:,
    user_attr:,
    other_attr:,
  )

  scan.update!(state: "completed", completed_at: Time.current)
  logger.info(
    format_tags(
      "completed user #{kind} scan",
      make_tags(num_subjects: subjects_data.size),
    ),
  )
rescue StandardError
  # `scan` is nil when the initial create! itself failed.
  scan.update!(state: "error", completed_at: Time.current) if scan
  # Bare raise re-raises the in-flight exception unchanged.
  raise
end
# Reconciles the follow edges stored for `user` with the `subjects` returned
# by a scan: creates users for unknown DIDs, upserts new edges, stamps
# `removed_at` on edges whose subject is no longer listed, resets the
# follow counter caches of every touched user, and writes summary counts to
# `scan`.
#
# @param user [Domain::User::BlueskyUser] user whose edges are reconciled
# @param subjects [Array<Bluesky::Graph::Subject>] subjects seen by the scan
# @param scan [Domain::UserJobEvent::FollowScan] scan row receiving the stats
# @param edge_name [Symbol] association on `user` holding the edges
# @param user_attr [Symbol] edge column that stores `user.id`
# @param other_attr [Symbol] edge column that stores the other user's id
# @return [void]
sig do
  params(
    user: Domain::User::BlueskyUser,
    subjects: T::Array[Bluesky::Graph::Subject],
    scan: Domain::UserJobEvent::FollowScan,
    edge_name: Symbol,
    user_attr: Symbol,
    other_attr: Symbol,
  ).void
end
def handle_subjects_data(
  user,
  subjects,
  scan,
  edge_name:,
  user_attr:,
  other_attr:
)
  subjects_by_did =
    T.cast(subjects.index_by(&:did), T::Hash[String, Bluesky::Graph::Subject])
  seen_dids = subjects_by_did.keys
  users_by_did =
    T.cast(
      Domain::User::BlueskyUser.where(did: seen_dids).index_by(&:did),
      T::Hash[String, Domain::User::BlueskyUser],
    )

  # Create a local user for every DID we have not stored yet.
  missing_user_dids = seen_dids - users_by_did.keys
  missing_user_dids.each do |did|
    subject = subjects_by_did[did]
    next unless subject
    users_by_did[did] = create_user_from_subject(subject)
  end

  seen_users = users_by_did.values
  users_by_id = seen_users.to_h { |u| [T.must(u.id), u] }
  seen_user_ids = seen_users.map(&:id).compact

  # NOTE(review): this plucks every row the association returns. If that
  # includes edges whose removed_at is already set, a re-follow would never
  # clear removed_at and old removals would be re-stamped on each scan —
  # confirm the association's scope.
  existing_subject_ids =
    T.cast(user.send(edge_name).pluck(other_attr), T::Array[Integer])
  new_user_ids = seen_user_ids - existing_subject_ids
  removed_user_ids = existing_subject_ids - seen_user_ids

  # NOTE(review): followed_at is taken from the subject's createdAt, which in
  # the Bluesky profile view appears to be the account creation time rather
  # than the time of the follow — confirm.
  follow_upsert_attrs =
    new_user_ids.map do |new_user_id|
      did = users_by_id[new_user_id]&.did
      {
        user_attr => user.id,
        other_attr => new_user_id,
        :followed_at => did && subjects_by_did[did]&.created_at,
        :removed_at => nil,
      }
    end

  removed_at = Time.current
  unfollow_upsert_attrs =
    removed_user_ids.map do |removed_user_id|
      {
        user_attr => user.id,
        other_attr => removed_user_id,
        :removed_at => removed_at,
      }
    end

  # Insertion order: the scanned user, then new edges, then removed edges.
  referenced_user_ids =
    Set.new([user.id, *new_user_ids, *removed_user_ids])

  Domain::User.transaction do
    [follow_upsert_attrs, unfollow_upsert_attrs].each do |rows|
      rows.each_slice(5000) do |slice|
        Domain::UserUserFollow.upsert_all(slice, unique_by: %i[from_id to_id])
      end
    end
  end

  # upsert_all is a bulk write, so the follow counter caches are recomputed
  # explicitly for every touched user.
  Domain::User.transaction do
    referenced_user_ids.each do |user_id|
      Domain::User.reset_counters(
        user_id,
        :user_user_follows_from,
        :user_user_follows_to,
      )
    end
  end

  update_attrs = {
    num_created_users: missing_user_dids.size,
    num_existing_assocs: existing_subject_ids.size,
    num_new_assocs: new_user_ids.size,
    num_removed_assocs: removed_user_ids.size,
    num_total_assocs: subjects.size,
  }
  logger.info(
    format_tags("updated user #{edge_name}", make_tags(update_attrs)),
  )
  scan.update_json_attributes!(update_attrs)
end
# Creates a BlueskyUser (and its avatar record) from a graph subject and
# defers a profile scan job and an avatar job for it.
#
# NOTE(review): `subject.avatar` is nilable, so the avatar record may be
# created with a nil `url_str` — confirm the avatar model and job accept that.
#
# @param subject [Bluesky::Graph::Subject] profile data returned by the API
# @return [Domain::User::BlueskyUser] the newly created user
sig do
  params(subject: Bluesky::Graph::Subject).returns(Domain::User::BlueskyUser)
end
def create_user_from_subject(subject)
  Domain::User::BlueskyUser
    .create!(
      did: subject.did,
      handle: subject.handle,
      display_name: subject.display_name,
      description: subject.description,
    )
    .tap do |created_user|
      avatar = created_user.create_avatar(url_str: subject.avatar)
      defer_job(
        Domain::Bluesky::Job::ScanUserJob,
        { user: created_user },
        { priority: 0 },
      )
      defer_job(Domain::UserAvatarJob, { avatar: avatar }, { priority: -1 })
    end
end
end

View File

@@ -8,7 +8,6 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
logger.push_tags(make_arg_tag(user))
logger.info(format_tags("starting profile scan"))
return if buggy_user?(user)
if !user.profile_scan.due? && !force_scan?
logger.info(
format_tags(
@@ -16,12 +15,10 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
make_tags(scanned_at: user.profile_scan.ago_in_words),
),
)
enqueue_scan_posts_job_if_due(user)
return
end
scan_user_profile(user)
enqueue_scan_posts_job_if_due(user)
logger.info(format_tags("completed profile scan"))
ensure
user.save! if user
@@ -55,6 +52,7 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
sig { params(user: Domain::User::BlueskyUser).void }
def scan_user_profile(user)
logger.info(format_tags("scanning user profile"))
profile_scan = Domain::UserJobEvent::ProfileScan.create!(user:)
# Use AT Protocol API to get user profile
profile_url =
@@ -62,6 +60,7 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
response = http_client.get(profile_url)
user.last_scan_log_entry = response.log_entry
profile_scan.update!(log_entry: response.log_entry)
if response.status_code != 200
fatal_error(

View File

@@ -2,6 +2,7 @@
class Domain::StaticFileJob < Scraper::JobBase
include Domain::StaticFileJobHelper
queue_as :static_file
discard_on ActiveJob::DeserializationError
sig { override.returns(Symbol) }
def self.http_factory_method

View File

@@ -1,6 +1,7 @@
# typed: strict
class Domain::UserAvatarJob < Scraper::JobBase
queue_as :static_file
discard_on ActiveJob::DeserializationError
sig { override.returns(Symbol) }
def self.http_factory_method
@@ -43,6 +44,10 @@ class Domain::UserAvatarJob < Scraper::JobBase
end
end
ensure
avatar.save! if avatar
if avatar
avatar.save!
user = avatar.user
user.touch if user
end
end
end

7
app/lib/bluesky/graph.rb Normal file
View File

@@ -0,0 +1,7 @@
# typed: strict
# frozen_string_literal: true
# Namespace for Bluesky social-graph value types; intentionally empty here.
# Members such as Bluesky::Graph::Subject are defined in their own files.
module Bluesky
module Graph
end
end

View File

@@ -0,0 +1,31 @@
# typed: strict
# frozen_string_literal: true
module Bluesky
  module Graph
    # Immutable value object for one entry of a follow-graph listing
    # (a followed account or a follower), built from the API's JSON.
    class Subject < T::ImmutableStruct
      extend T::Sig
      include T::Struct::ActsAsComparable

      const :did, String
      const :handle, String
      const :display_name, T.nilable(String)
      const :description, T.nilable(String)
      const :avatar, T.nilable(String)
      const :indexed_at, T.nilable(Time)
      # NOTE(review): in the Bluesky profile view, createdAt appears to be the
      # account's creation time, not the time of a follow — confirm before
      # using it as a follow timestamp.
      const :created_at, T.nilable(Time)

      # Builds a Subject from a decoded JSON object. Reads the keys "did",
      # "handle", "displayName", "description", "avatar", "indexedAt" and
      # "createdAt"; the two timestamps are parsed with Time.zone when present.
      #
      # @param json [Hash{String => Object}] decoded JSON for one subject
      # @return [Subject]
      sig { params(json: T::Hash[String, T.untyped]).returns(Subject) }
      def self.from_json(json)
        raw_indexed_at, raw_created_at =
          json.values_at("indexedAt", "createdAt")

        new(
          did: json["did"],
          handle: json["handle"],
          display_name: json["displayName"],
          description: json["description"],
          avatar: json["avatar"],
          indexed_at: raw_indexed_at && Time.zone.parse(raw_indexed_at),
          created_at: raw_created_at && Time.zone.parse(raw_created_at),
        )
      end
    end
  end
end

View File

@@ -127,6 +127,16 @@ class Domain::User < ReduxApplicationRecord
has_many :followed_users, through: :user_user_follows_from, source: :to
has_many :followed_by_users, through: :user_user_follows_to, source: :from
has_many :follows_scans,
-> { where(kind: "follows").order(created_at: :asc) },
inverse_of: :user,
class_name: "Domain::UserJobEvent::FollowScan"
has_many :followed_by_scans,
-> { where(kind: "followed_by").order(created_at: :asc) },
inverse_of: :user,
class_name: "Domain::UserJobEvent::FollowScan"
sig { params(klass: T.class_of(Domain::Post)).void }
def self.has_created_posts!(klass)
self.class_has_created_posts = klass

View File

@@ -8,6 +8,9 @@ class Domain::User::BlueskyUser < Domain::User
has_created_posts! Domain::Post::BlueskyPost
# TODO - when we scrape liked posts, add this back in
# has_faved_posts! Domain::Post::BlueskyPost
#
has_followed_users! Domain::User::BlueskyUser
has_followed_by_users! Domain::User::BlueskyUser
belongs_to :last_scan_log_entry, class_name: "HttpLogEntry", optional: true
belongs_to :last_posts_scan_log_entry,

View File

@@ -6,5 +6,5 @@ class Domain::UserJobEvent < ReduxApplicationRecord
abstract!
self.abstract_class = true
belongs_to :user, class_name: Domain::User.name
belongs_to :user, class_name: "Domain::User"
end

View File

@@ -0,0 +1,32 @@
# typed: strict
# frozen_string_literal: true
# One run of a follow-graph scan for a user, in either direction
# (`kind`: "follows" or "followed_by").
#
# A new record defaults to state "running" with `started_at` set to the
# current time. Leaving the "running" state requires `completed_at`, and the
# "completed" state additionally requires a `log_entry`.
class Domain::UserJobEvent::FollowScan < Domain::UserJobEvent
  self.table_name = "domain_user_job_event_follow_scans"

  belongs_to :log_entry, class_name: "HttpLogEntry", optional: true

  enum :state,
       { running: "running", error: "error", completed: "completed" },
       prefix: true
  enum :kind, { followed_by: "followed_by", follows: "follows" }, prefix: true

  validates :state,
            presence: true,
            inclusion: {
              in: %w[running error completed],
            }
  validates :kind, presence: true, inclusion: { in: %w[followed_by follows] }
  validates :started_at, presence: true
  validates :completed_at, presence: true, unless: :state_running?
  validates :log_entry, presence: true, if: :state_completed?

  # Fill in defaults for new records only; explicit values are kept.
  before_validation do
    self.state ||= "running" if new_record?
    self.started_at ||= Time.current if new_record?
  end

  # Merges `json_attributes` into the stored JSON attributes and saves.
  # Values passed in win over values already stored for the same key. Keys are
  # normalized to strings so a symbol key and its string twin cannot coexist.
  #
  # @param json_attributes [Hash{Symbol => Object}] attributes to write
  # @return [void]
  # @raise [ActiveRecord::RecordInvalid] if the record fails validation
  sig { params(json_attributes: T::Hash[Symbol, T.untyped]).void }
  def update_json_attributes!(json_attributes)
    existing = (self.json_attributes || {}).stringify_keys
    self.json_attributes = existing.merge(json_attributes.stringify_keys)
    save!
  end
end

View File

@@ -0,0 +1,4 @@
# Event row recording one posts scan for a user, stored in
# domain_user_job_event_posts_scans. ScanPostsJob creates one per run,
# attaches the log entry of the first page fetched, and later writes
# total_posts_seen / new_posts_seen to it.
class Domain::UserJobEvent::PostsScan < Domain::UserJobEvent
self.table_name = "domain_user_job_event_posts_scans"
# Optional: the row is created before any HTTP request has been made.
belongs_to :log_entry, class_name: "HttpLogEntry", optional: true
end

View File

@@ -0,0 +1,4 @@
# Event row recording one profile scan for a user, stored in
# domain_user_job_event_profile_scans. ScanUserJob#scan_user_profile creates
# one per scan and attaches the log entry of the profile request.
class Domain::UserJobEvent::ProfileScan < Domain::UserJobEvent
self.table_name = "domain_user_job_event_profile_scans"
# Optional: the row is created before the profile request is issued.
belongs_to :log_entry, class_name: "HttpLogEntry", optional: true
end

View File

@@ -64,7 +64,7 @@ class ReduxApplicationRecord < ActiveRecord::Base
end
after_save do
T.bind(self, ReduxApplicationRecord)
# T.bind(self, ReduxApplicationRecord)
@after_save_deferred_jobs ||=
T.let([], T.nilable(T::Array[[DeferredJob, T.nilable(Scraper::JobBase)]]))

View File

@@ -1,14 +1,14 @@
<div class="flex items-center gap-2">
<% if user.display_name.present? %>
<span class="text-slate-800 text-lg font-semibold">
<% if user.display_name.present? %>
<div class="flex flex-col mb-1">
<span class="text-slate-800 text-lg font-semibold leading-none">
<%= user.display_name %>
</span>
<span class="text-sm font-normal text-slate-500" title="<%= user.did %>">
@<%= user.handle %>
</span>
<% else %>
<span class="text-slate-800 text-lg font-semibold" title="<%= user.did %>">
<%= user.handle %>
</span>
<% end %>
</div>
</div>
<% else %>
<span class="text-slate-800 text-lg font-semibold" title="<%= user.did %>">
<%= user.handle %>
</span>
<% end %>

View File

@@ -6,12 +6,12 @@
<% case @index_type %>
<% when :followed_by %>
<%= pluralize(@users.total_count, "user") %> following
<%= link_to @user.name,
<%= link_to @user.name_for_view,
domain_user_following_path(@user),
class: "text-blue-600 hover:underline" %>
<% when :following %>
<%= pluralize(@users.total_count, "user") %> followed by
<%= link_to @user.name,
<%= link_to @user.name_for_view,
domain_user_following_path(@user),
class: "text-blue-600 hover:underline" %>
<% when :users_faving_post %>
@@ -36,17 +36,21 @@
<% if user.avatar&.log_entry.present? %>
<%= image_tag domain_user_avatar_img_src_path(user.avatar, thumb: "64-avatar"),
class: "h-12 w-12 rounded-md border object-cover",
alt: user.name %>
alt: user.name_for_view %>
<% else %>
<div
class="flex h-12 w-12 items-center justify-center rounded-full bg-slate-200"
>
<i class="bi bi-person text-slate-400"></i>
<i class="fa-solid fa-user text-slate-400"></i>
</div>
<% end %>
<div class="min-w-0">
<div class="font-medium text-slate-900"><%= user.name %></div>
<div class="text-sm text-slate-500">@<%= user.url_name %></div>
<div class="font-medium text-slate-900"><%= user.name_for_view %></div>
<%# Secondary "@handle" line: Bluesky users show their handle, FA users their url_name; other user types show nothing. %>
<% if user.is_a?(Domain::User::BlueskyUser) %>
  <div class="text-sm text-slate-500">@<%= user.handle %></div>
<% elsif user.is_a?(Domain::User::FaUser) %>
  <div class="text-sm text-slate-500">@<%= user.url_name %></div>
<% end %>
</div>
<% end %>
<% end %>