feat: implement Bluesky scan posts job and enhance user scanning

- Add new ScanPostsJob for scanning Bluesky posts
- Enhance ScanUserJob with improved error handling and logging
- Update BlueskyPost model with new fields and validation
- Add auxiliary tables for Bluesky posts
- Improve job base classes with better color logging
- Update specs with proper HTTP mocking patterns
- Add factory for BlueskyPost testing
This commit is contained in:
Dylan Knutson
2025-08-10 18:41:01 +00:00
parent 5c71fc6b15
commit eba4b58666
20 changed files with 796 additions and 548 deletions

View File

@@ -16,15 +16,20 @@ class Domain::Bluesky::Job::Base < Scraper::JobBase
if (user = arguments[0][:user]).is_a?(Domain::User::BlueskyUser)
user
elsif (did = arguments[0][:did]).present?
Domain::User::BlueskyUser.find_or_initialize_by(did: did)
Domain::User::BlueskyUser.find_or_create_by(did:) do |user|
resolver = DIDKit::Resolver.new
if (resolved = resolver.resolve_did(did))
user.handle = resolved.get_validated_handle
end
end
elsif (handle = arguments[0][:handle]).present?
resolver = DIDKit::Resolver.new
resolved =
resolver.resolve_handle(handle) ||
fatal_error("failed to resolve handle: #{handle}")
Domain::User::BlueskyUser.find_or_initialize_by(
did: resolved.did,
) { |user| user.handle = handle }
did = resolver.resolve_handle(handle)&.did
fatal_error("failed to resolve handle: #{handle}") if did.nil?
user = Domain::User::BlueskyUser.find_or_initialize_by(did:)
user.handle = handle
user.save!
user
else
nil
end
@@ -40,4 +45,9 @@ class Domain::Bluesky::Job::Base < Scraper::JobBase
# Add any known problematic handles/DIDs here
false
end
sig { params(did: String, cid: String).returns(String) }
def construct_blob_url(did, cid)
"https://bsky.social/xrpc/com.atproto.sync.getBlob?did=#{did}&cid=#{cid}"
end
end

View File

@@ -0,0 +1,253 @@
# typed: strict
class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
self.default_priority = -25
sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
def perform(args)
user = user_from_args!
logger.push_tags(make_arg_tag(user))
logger.info("starting posts scan")
return if buggy_user?(user)
return unless user.state_ok?
if !user.posts_scan.due? && !force_scan?
logger.info(
format_tags(
"skipping posts scan",
make_tags(scanned_at: user.posts_scan.ago_in_words),
),
)
return
end
scan_user_posts(user)
logger.info(format_tags("completed posts scan"))
ensure
user.save! if user
end
private
sig { params(user: Domain::User::BlueskyUser).void }
def scan_user_posts(user)
# Use AT Protocol API to list user's posts
posts_url =
"https://bsky.social/xrpc/com.atproto.repo.listRecords?repo=#{user.did}&collection=app.bsky.feed.post&limit=100"
cursor = T.let(nil, T.nilable(String))
num_processed_posts = 0
num_posts_with_media = 0
num_created_posts = 0
num_pages = 0
loop do
url = cursor ? "#{posts_url}&cursor=#{cursor}" : posts_url
response = http_client.get(url)
num_pages += 1
if response.status_code != 200
fatal_error(
format_tags(
"failed to get user posts",
make_tags(status_code: response.status_code),
),
)
end
begin
data = JSON.parse(response.body)
if data["error"]
logger.error(
format_tags("posts API error", make_tags(error: data["error"])),
)
break
end
records = data["records"] || []
records.each do |record_data|
num_processed_posts += 1
record = record_data["value"]
next unless record && record["embed"]
# Only process posts with media
num_posts_with_media += 1
user_did = user.did
next unless user_did
if process_historical_post(user, record_data, record, user_did)
num_created_posts += 1
end
end
cursor = data["cursor"]
break if cursor.nil? || records.empty?
rescue JSON::ParserError => e
logger.error(
format_tags(
"failed to parse posts JSON",
make_tags(error: e.message),
),
)
break
end
end
user.scanned_posts_at = Time.current
logger.info(
format_tags(
"scanned posts",
make_tags(
num_processed_posts:,
num_posts_with_media:,
num_created_posts:,
num_pages:,
),
),
)
end
sig do
params(
user: Domain::User::BlueskyUser,
record_data: T::Hash[String, T.untyped],
record: T::Hash[String, T.untyped],
user_did: String,
).returns(T::Boolean)
end
def process_historical_post(user, record_data, record, user_did)
uri = record_data["uri"]
rkey = record_data["uri"].split("/").last
# Check if we already have this post
existing_post = Domain::Post::BlueskyPost.find_by(at_uri: uri)
return false if existing_post
begin
post =
Domain::Post::BlueskyPost.create!(
at_uri: uri,
bluesky_rkey: rkey,
text: record["text"] || "",
posted_at: Time.parse(record["createdAt"]),
post_raw: record,
)
post.creator = user
post.save!
# Process media if present
process_post_media(post, record["embed"], user_did) if record["embed"]
logger.debug(
format_tags(
"created historical post",
make_tags(bluesky_rkey: post.bluesky_rkey),
),
)
rescue => e
logger.error(
format_tags(
"failed to create historical post",
make_tags(bluesky_rkey: rkey, error: e.message),
),
)
false
end
true
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_media(post, embed_data, did)
case embed_data["$type"]
when "app.bsky.embed.images"
process_post_images(post, embed_data["images"], did)
when "app.bsky.embed.recordWithMedia"
if embed_data["media"] &&
embed_data["media"]["$type"] == "app.bsky.embed.images"
process_post_images(post, embed_data["media"]["images"], did)
end
when "app.bsky.embed.external"
process_external_embed(post, embed_data["external"], did)
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
images: T::Array[T::Hash[String, T.untyped]],
did: String,
).void
end
def process_post_images(post, images, did)
files = []
images.each_with_index do |image_data, index|
blob_data = image_data["image"]
next unless blob_data && blob_data["ref"]
post_file =
post.files.build(
type: "Domain::PostFile::BlueskyPostFile",
file_order: index,
url_str: construct_blob_url(did, blob_data["ref"]["$link"]),
state: "pending",
alt_text: image_data["alt"],
blob_ref: blob_data["ref"]["$link"],
)
# Store aspect ratio if present
if image_data["aspectRatio"]
post_file.aspect_ratio_width = image_data["aspectRatio"]["width"]
post_file.aspect_ratio_height = image_data["aspectRatio"]["height"]
end
post_file.save!
Domain::StaticFileJob.perform_later({ post_file: })
files << post_file
end
logger.debug(
format_tags(
"created files for historical post",
make_tags(bluesky_rkey: post.bluesky_rkey, num_files: files.size),
),
)
end
sig do
params(
post: Domain::Post::BlueskyPost,
external_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_external_embed(post, external_data, did)
thumb_data = external_data["thumb"]
return unless thumb_data && thumb_data["ref"]
post_file =
post.files.build(
file_order: 0,
url_str: construct_blob_url(did, thumb_data["ref"]["$link"]),
state: "pending",
blob_ref: thumb_data["ref"]["$link"],
)
post_file.save!
Domain::StaticFileJob.perform_later({ post_file: })
logger.debug(
format_tags(
"created thumbnail for post",
make_tags(bluesky_rkey: post.bluesky_rkey),
),
)
end
end

View File

@@ -6,56 +6,67 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
def perform(args)
user = user_from_args!
logger.push_tags(make_arg_tag(user))
logger.info("Starting Bluesky user scan for #{user.handle}")
logger.info("starting profile scan")
return if buggy_user?(user)
# Scan user profile/bio
user = scan_user_profile(user) if force_scan? ||
user.scanned_profile_at.nil? || due_for_profile_scan?(user)
# Scan user's historical posts
if user.state_ok? &&
(
force_scan? || user.scanned_posts_at.nil? ||
due_for_posts_scan?(user)
)
scan_user_posts(user)
if !user.profile_scan.due? && !force_scan?
logger.info(
format_tags(
"skipping profile scan",
make_tags(scanned_at: user.profile_scan.ago_in_words),
),
)
return
end
logger.info("Completed Bluesky user scan")
# Scan user profile/bio
scan_user_profile(user)
logger.info(format_tags("completed profile scan"))
if user.posts_scan.due? || force_scan?
logger.info(
format_tags(
"enqueue posts scan",
make_tags(posts_scan: user.posts_scan.ago_in_words),
),
)
defer_job(Domain::Bluesky::Job::ScanPostsJob, { user: })
end
ensure
user.save! if user
end
private
sig do
params(user: Domain::User::BlueskyUser).returns(Domain::User::BlueskyUser)
end
sig { params(user: Domain::User::BlueskyUser).void }
def scan_user_profile(user)
logger.info("Scanning user profile for #{user.handle}")
logger.info(format_tags("scanning user profile"))
# Use AT Protocol API to get user profile
profile_url =
"https://bsky.social/xrpc/com.atproto.repo.getRecord?repo=#{user.did}&collection=app.bsky.actor.profile&rkey=self"
response = http_client.get(profile_url)
if response.status_code != 200
logger.error("Failed to get user profile: #{response.status_code}")
user.state_error!
return user
end
# Note: Store log entry reference if needed for debugging
if response.status_code != 200
fatal_error(
format_tags(
"failed to get user profile",
make_tags(status_code: response.status_code),
),
)
end
begin
profile_data = JSON.parse(response.body)
if profile_data["error"]
logger.error("Profile API error: #{profile_data["error"]}")
user.state_error!
return user
fatal_error(
format_tags(
"profile API error",
make_tags(error: profile_data["error"]),
),
)
end
record = profile_data["value"]
@@ -71,201 +82,15 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
end
end
user.scanned_profile_at = Time.current
user.state_ok! unless user.state_error?
user.scanned_profile_at = Time.zone.now
rescue JSON::ParserError => e
logger.error("Failed to parse profile JSON: #{e.message}")
user.state_error!
end
user
end
sig { params(user: Domain::User::BlueskyUser).void }
def scan_user_posts(user)
logger.info("Scanning historical posts for #{user.handle}")
# Use AT Protocol API to list user's posts
posts_url =
"https://bsky.social/xrpc/com.atproto.repo.listRecords?repo=#{user.did}&collection=app.bsky.feed.post&limit=100"
cursor = T.let(nil, T.nilable(String))
posts_processed = 0
posts_with_media = 0
loop do
url = cursor ? "#{posts_url}&cursor=#{cursor}" : posts_url
response = http_client.get(url)
if response.status_code != 200
logger.error("Failed to get user posts: #{response.status_code}")
break
end
begin
data = JSON.parse(response.body)
if data["error"]
logger.error("Posts API error: #{data["error"]}")
break
end
records = data["records"] || []
records.each do |record_data|
posts_processed += 1
record = record_data["value"]
next unless record && record["embed"]
# Only process posts with media
posts_with_media += 1
user_did = user.did
next unless user_did
process_historical_post(user, record_data, record, user_did)
end
cursor = data["cursor"]
break if cursor.nil? || records.empty?
# Add small delay to avoid rate limiting
sleep(0.1)
rescue JSON::ParserError => e
logger.error("Failed to parse posts JSON: #{e.message}")
break
end
end
user.scanned_posts_at = Time.current
logger.info(
"Processed #{posts_processed} posts, #{posts_with_media} with media",
)
end
sig do
params(
user: Domain::User::BlueskyUser,
record_data: T::Hash[String, T.untyped],
record: T::Hash[String, T.untyped],
user_did: String,
).void
end
def process_historical_post(user, record_data, record, user_did)
uri = record_data["uri"]
rkey = record_data["uri"].split("/").last
# Check if we already have this post
existing_post = Domain::Post::BlueskyPost.find_by(at_uri: uri)
return if existing_post
begin
post =
Domain::Post::BlueskyPost.create!(
at_uri: uri,
bluesky_rkey: rkey,
text: record["text"] || "",
bluesky_created_at: Time.parse(record["createdAt"]),
post_raw: record,
)
post.creator = user
post.save!
# Process media if present
process_post_media(post, record["embed"], user_did) if record["embed"]
logger.debug("Created historical post: #{post.bluesky_rkey}")
rescue => e
logger.error("Failed to create historical post #{rkey}: #{e.message}")
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_media(post, embed_data, did)
case embed_data["$type"]
when "app.bsky.embed.images"
process_post_images(post, embed_data["images"], did)
when "app.bsky.embed.recordWithMedia"
if embed_data["media"] &&
embed_data["media"]["$type"] == "app.bsky.embed.images"
process_post_images(post, embed_data["media"]["images"], did)
end
when "app.bsky.embed.external"
process_external_embed(post, embed_data["external"], did)
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
images: T::Array[T::Hash[String, T.untyped]],
did: String,
).void
end
def process_post_images(post, images, did)
files = []
images.each_with_index do |image_data, index|
blob_data = image_data["image"]
next unless blob_data && blob_data["ref"]
post_file =
post.files.build(
type: "Domain::PostFile::BlueskyPostFile",
file_order: index,
url_str: construct_blob_url(did, blob_data["ref"]["$link"]),
state: "pending",
alt_text: image_data["alt"],
blob_ref: blob_data["ref"]["$link"],
)
# Store aspect ratio if present
if image_data["aspectRatio"]
post_file.aspect_ratio_width = image_data["aspectRatio"]["width"]
post_file.aspect_ratio_height = image_data["aspectRatio"]["height"]
end
post_file.save!
Domain::StaticFileJob.perform_later({ post_file: })
files << post_file
end
logger.debug(
"Created #{files.size} #{"file".pluralize(files.size)} for historical post: #{post.bluesky_rkey}",
)
end
sig do
params(
post: Domain::Post::BlueskyPost,
external_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_external_embed(post, external_data, did)
thumb_data = external_data["thumb"]
return unless thumb_data && thumb_data["ref"]
post_file =
post.files.build(
type: "Domain::PostFile::BlueskyPostFile",
file_order: 0,
url_str: construct_blob_url(did, thumb_data["ref"]["$link"]),
state: "pending",
blob_ref: thumb_data["ref"]["$link"],
fatal_error(
format_tags(
"failed to parse profile JSON",
make_tags(error: e.message),
),
)
post_file.save!
Domain::StaticFileJob.perform_later({ post_file: })
logger.debug(
"Created external thumbnail for historical post: #{post.bluesky_rkey}",
)
end
end
sig do
@@ -275,61 +100,45 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
).void
end
def process_user_avatar(user, avatar_data)
logger.debug("process_user_avatar called for user: #{user.handle}")
logger.debug(format_tags("processing user avatar", make_tags(avatar_data:)))
return unless avatar_data["ref"]
user_did = user.did
return unless user_did
avatar_url = construct_blob_url(user_did, avatar_data["ref"]["$link"])
logger.debug("Avatar URL: #{avatar_url}")
logger.debug(format_tags("extract avatar url", make_tags(avatar_url:)))
# Check if avatar already exists and is downloaded
existing_avatar = user.avatar
if existing_avatar.present?
logger.debug("Existing avatar found with state: #{existing_avatar.state}")
logger.debug(
format_tags(
"existing avatar found",
make_tags(state: existing_avatar.state),
),
)
# Only enqueue if the avatar URL has changed or it's not downloaded yet
if existing_avatar.url_str != avatar_url
existing_avatar.update!(url_str: avatar_url, state: "pending")
defer_result =
defer_job(Domain::UserAvatarJob, { avatar: existing_avatar })
logger.debug(
"Updated avatar URL and enqueued download for user: #{user.handle}, defer_result: #{defer_result}",
avatar = user.avatars.create!(url_str: avatar_url)
logger.info(
format_tags(
"avatar url changed, creating new avatar",
make_tags(avatar:),
),
)
defer_job(Domain::UserAvatarJob, { avatar: avatar })
elsif existing_avatar.state_pending?
defer_result =
defer_job(Domain::UserAvatarJob, { avatar: existing_avatar })
logger.debug(
"Re-enqueued pending avatar download for user: #{user.handle}, defer_result: #{defer_result}",
)
defer_job(Domain::UserAvatarJob, { avatar: existing_avatar })
logger.info(format_tags("re-enqueued pending avatar download"))
end
else
# Create new avatar and enqueue download
logger.debug("Creating new avatar for user: #{user.handle}")
avatar = user.create_avatar!(url_str: avatar_url, state: "pending")
defer_result = defer_job(Domain::UserAvatarJob, { avatar: avatar })
logger.debug(
"Created avatar and enqueued download for user: #{user.handle}, defer_result: #{defer_result}",
avatar = user.avatars.create!(url_str: avatar_url)
defer_job(Domain::UserAvatarJob, { avatar: })
logger.info(
format_tags("created avatar and enqueued download", make_tags(avatar:)),
)
end
end
sig { params(did: String, cid: String).returns(String) }
def construct_blob_url(did, cid)
"https://bsky.social/xrpc/com.atproto.sync.getBlob?did=#{did}&cid=#{cid}"
end
sig { params(user: Domain::User::BlueskyUser).returns(T::Boolean) }
def due_for_profile_scan?(user)
scanned_at = user.scanned_profile_at
return true if scanned_at.nil?
scanned_at < 1.month.ago
end
sig { params(user: Domain::User::BlueskyUser).returns(T::Boolean) }
def due_for_posts_scan?(user)
scanned_at = user.scanned_posts_at
return true if scanned_at.nil?
scanned_at < 1.week.ago
end
end

View File

@@ -122,7 +122,13 @@ class Scraper::JobBase < ApplicationJob
sig { returns(Domain::UserAvatar) }
def avatar_from_args!
T.cast(arguments[0][:avatar], Domain::UserAvatar)
if (avatar = arguments[0][:avatar])
T.cast(avatar, Domain::UserAvatar)
elsif (user = arguments[0][:user])
T.must(T.cast(user, Domain::User).avatar)
else
raise("no avatar found in arguments: #{arguments.inspect}")
end
end
sig { returns(Domain::PostFile) }

View File

@@ -27,6 +27,11 @@ module HasColorLogger
self.class.make_tag(tag_name, tag_value)
end
sig { params(tags: T::Hash[Symbol, T.untyped]).returns(T::Array[String]) }
def make_tags(tags)
tags.map { |tag_name, tag_value| make_tag(tag_name.to_s, tag_value) }
end
sig { params(tags: T.any(String, T::Array[String])).returns(String) }
def format_tags(*tags)
self.class.format_tags(*T.unsafe([tags].flatten))

View File

@@ -93,7 +93,7 @@ module Tasks::Bluesky
Domain::Post::BlueskyPost.find_or_create_by!(at_uri: op.uri) do |post|
post.bluesky_rkey = op.rkey
post.text = op.raw_record["text"]
post.bluesky_created_at = msg.time.in_time_zone("UTC")
post.posted_at = msg.time.in_time_zone("UTC")
post.creator = creator_for(msg)
post.post_raw = op.raw_record
end

View File

@@ -15,7 +15,6 @@ class Domain::Post::BlueskyPost < Domain::Post
validates :state, presence: true
validates :at_uri, presence: true, uniqueness: true
validates :bluesky_rkey, presence: true
validates :bluesky_created_at, presence: true
sig { override.returns([String, Symbol]) }
def self.param_prefix_and_attribute
@@ -29,7 +28,7 @@ class Domain::Post::BlueskyPost < Domain::Post
sig { override.returns(Symbol) }
def self.post_order_attribute
:bluesky_created_at
:posted_at
end
sig { override.returns(Domain::DomainType) }

View File

@@ -217,6 +217,7 @@ class Domain::User < ReduxApplicationRecord
inverse_of: :user
has_many :avatars,
-> { order(created_at: :desc) },
class_name: "::Domain::UserAvatar",
inverse_of: :user,
dependent: :destroy

View File

@@ -2,8 +2,8 @@
class Domain::User::BlueskyUser < Domain::User
aux_table :bluesky
due_timestamp :scanned_profile_at, 1.month
due_timestamp :scanned_posts_at, 1.week
due_timestamp :scanned_profile_at, 3.years
due_timestamp :scanned_posts_at, 3.years
has_created_posts! Domain::Post::BlueskyPost
has_faved_posts! Domain::Post::BlueskyPost