Files
redux-scraper/app/jobs/domain/bluesky/job/scan_posts_job.rb
2025-08-14 21:13:37 +00:00

250 lines
6.9 KiB
Ruby

# typed: strict
class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
MEDIA_EMBED_TYPES = %w[app.bsky.embed.images app.bsky.embed.video]
self.default_priority = -10
sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
# Job entry point: runs a posts scan for the user resolved by user_from_args!.
#
# The scan is skipped when buggy_user? flags the user, when the user's state
# is not ok, or when the posts scan is not yet due and force_scan? is false.
# Whichever path is taken (including a raised exception), the ensure clause
# saves the user if one was resolved.
def perform(args)
  user = user_from_args!
  logger.push_tags(make_arg_tag(user))
  logger.info(format_tags("starting posts scan"))
  return if buggy_user?(user)

  if !user.state_ok?
    state_tags = make_tags(state: user.state)
    logger.error(format_tags("skipping posts scan", state_tags))
    return
  end

  # A forced scan bypasses the "is it due yet?" check.
  unless user.posts_scan.due? || force_scan?
    scanned_tags = make_tags(scanned_at: user.posts_scan.ago_in_words)
    logger.info(format_tags("skipping posts scan", scanned_tags))
    return
  end

  scan_user_posts(user)
  user.last_posts_scan_log_entry = first_log_entry
  user.touch
  logger.info(format_tags("completed posts scan"))
ensure
  # user is nil only when user_from_args! itself raised.
  user&.save!
end
private
sig do
params(
user: Domain::User::BlueskyUser,
record_data: T::Hash[String, T.untyped],
).returns(T::Boolean)
end
# Decides whether a post belongs to the user's own content.
#
# Returns false for a quote of another account's post and for a reply whose
# thread root was authored by another account; returns true otherwise
# (including replies whose root URI is missing).
#
# The author DID is taken from the AT URI, whose format is
# at://did:plc:xyz/app.bsky.feed.post/rkey, so it is the third "/" segment.
def should_record_post?(user, record_data)
  # Quotes are checked first: quoting someone else disqualifies the post.
  quoted = extract_quote_uri(record_data)
  return false if quoted && quoted.split("/")[2] != user.did

  # Anything that is not a reply is a root post and is recorded.
  reply = record_data.dig("value", "reply")
  return true unless reply

  # If the thread root cannot be determined, allow the post.
  thread_root = reply.dig("root", "uri")
  return true unless thread_root

  # Replies are recorded only when the thread was started by the same user.
  thread_root.split("/")[2] == user.did
end
sig { params(record: T::Hash[String, T.untyped]).returns(T.nilable(String)) }
# Returns the AT URI of the post quoted by +record+, or nil when the record
# does not quote a post.
#
# +record+ may be either the post value itself (embed at record["embed"]) or
# a full listRecords entry (embed at record["value"]["embed"]). The callers
# in this class pass the full entry, so looking only at the top level would
# never find a quote.
#
# Handles both embed types that can carry a quote:
# - app.bsky.embed.record:          embed["record"]["uri"]
# - app.bsky.embed.recordWithMedia: embed["record"]["record"]["uri"]
def extract_quote_uri(record)
  embed = record["embed"]
  if embed.nil?
    value = record["value"]
    embed = value["embed"] if value.is_a?(Hash)
  end
  return nil unless embed.is_a?(Hash)

  quoted =
    case embed["$type"]
    when "app.bsky.embed.record"
      # Direct quote
      embed["record"]
    when "app.bsky.embed.recordWithMedia"
      # Quote with media: the quoted record is wrapped one level deeper
      wrapper = embed["record"]
      wrapper["record"] if wrapper.is_a?(Hash)
    end
  return nil unless quoted.is_a?(Hash)

  # Embedded records can also point at non-post records (feeds, lists, ...);
  # only post URIs count as quotes.
  uri = quoted["uri"]
  uri if uri.is_a?(String) && uri.include?("app.bsky.feed.post")
end
sig { params(user: Domain::User::BlueskyUser).void }
# Pages through the user's app.bsky.feed.post collection via
# com.atproto.repo.listRecords (100 records per request) and records every
# media post that passes should_record_post?.
#
# A PostsScan event is created up front, linked to the first page's log
# entry, and finally updated with the seen / newly created counts. Paging
# stops when the API returns no cursor, an empty page, an "error" field, or
# a body that is not valid JSON. user.scanned_posts_at is set in all of
# those cases.
def scan_user_posts(user)
  # Use AT Protocol API to list user's posts
  list_records_url =
    "https://bsky.social/xrpc/com.atproto.repo.listRecords?repo=#{user.did}&collection=app.bsky.feed.post&limit=100"
  page_cursor = T.let(nil, T.nilable(String))
  seen_total = 0
  media_total = 0
  filtered_total = 0
  created_total = 0
  pages_fetched = 0
  scan_event = Domain::UserJobEvent::PostsScan.create!(user:)

  loop do
    page_url =
      if page_cursor
        "#{list_records_url}&cursor=#{page_cursor}"
      else
        list_records_url
      end
    response = http_client.get(page_url)

    # Only the first page's log entry is attached to the scan event.
    scan_event.update!(log_entry: response.log_entry) if pages_fetched.zero?
    pages_fetched += 1

    # NOTE(review): fatal_error presumably raises and aborts the job; if it
    # does not, the non-200 body is still parsed below - confirm.
    unless response.status_code == 200
      status_tags = make_tags(status_code: response.status_code)
      fatal_error(format_tags("failed to get user posts", status_tags))
    end

    begin
      payload = JSON.parse(response.body)
      api_error = payload["error"]
      if api_error
        logger.error(
          format_tags("posts API error", make_tags(error: api_error)),
        )
        break
      end

      page_records = payload["records"]
      page_records ||= []
      page_records.each do |record_data|
        seen_total += 1
        embed_type = record_data.dig("value", "embed", "$type")

        if MEDIA_EMBED_TYPES.include?(embed_type)
          # Only process posts with media
          media_total += 1

          if should_record_post?(user, record_data)
            created =
              process_historical_post(user, record_data, response.log_entry)
            created_total += 1 if created
          else
            # Replies to other users or quotes of other users' posts
            filtered_total += 1
          end
        else
          logger.info(
            format_tags(
              "skipping post, non-media embed type",
              make_tags(embed_type:),
            ),
          )
        end
      end

      page_cursor = payload["cursor"]
      break if page_cursor.nil? || page_records.empty?
    rescue JSON::ParserError => e
      parse_tags = make_tags(error: e.message)
      logger.error(format_tags("failed to parse posts JSON", parse_tags))
      break
    end
  end

  user.scanned_posts_at = Time.current
  scan_event.update!(
    total_posts_seen: seen_total,
    new_posts_seen: created_total,
  )

  # Tag keys are spelled out so the log output does not depend on local names.
  summary_tags =
    make_tags(
      num_processed_posts: seen_total,
      num_posts_with_media: media_total,
      num_filtered_posts: filtered_total,
      num_created_posts: created_total,
      num_pages: pages_fetched,
    )
  logger.info(format_tags("scanned posts", summary_tags))
end
sig do
params(
user: Domain::User::BlueskyUser,
record_data: T::Hash[String, T.untyped],
log_entry: HttpLogEntry,
).returns(T::Boolean)
end
# Persists one listRecords entry as a BlueskyPost owned by +user+.
#
# Returns false when the user already has a post with this AT URI (in which
# case only its pending files are re-enqueued), and true after a new post
# has been saved and its embed, if any, handed to ProcessPostHelper.
def process_historical_post(user, record_data, log_entry)
  at_uri = record_data["uri"]

  # Already known: just make sure its pending files get fetched.
  if (known_post = user.posts.find_by(at_uri:))
    enqueue_pending_files_job(known_post)
    return false
  end

  # Extract reply and quote URIs from the raw post data
  reply_to_uri = record_data.dig("value", "reply", "root", "uri")
  quote_uri = extract_quote_uri(record_data)

  new_post =
    Domain::Post::BlueskyPost.build(
      state: "ok",
      at_uri:,
      first_seen_entry: log_entry,
      text: record_data.dig("value", "text") || "",
      posted_at: Time.parse(record_data.dig("value", "createdAt")),
      post_raw: record_data["value"],
      reply_to_uri:,
      quote_uri:,
    )
  new_post.creator = user
  new_post.save!

  # Process media if present
  embed = record_data.dig("value", "embed")
  media_helper = Bluesky::ProcessPostHelper.new(@deferred_job_sink)
  media_helper.process_post_media(new_post, embed, user.did!) if embed

  logger.debug(format_tags("created post", make_tags(at_uri:)))
  true
end
sig { params(post: Domain::Post::BlueskyPost).void }
# Defers a Domain::StaticFileJob for every file of +post+ that is still in
# the pending state; files in any other state are left alone.
def enqueue_pending_files_job(post)
  post.files.each do |file|
    next unless file.state_pending?

    defer_job(Domain::StaticFileJob, { post_file: file })
  end
end
end