split out common bsky post creation logic into Bluesky::ProcessPostHelper

This commit is contained in:
Dylan Knutson
2025-08-14 17:55:17 +00:00
parent cfffe50541
commit e9ac97be29
11 changed files with 300 additions and 341 deletions

View File

@@ -87,9 +87,4 @@ class Domain::Bluesky::Job::Base < Scraper::JobBase
# Add any known problematic handles/DIDs here
false
end
sig { params(did: String, cid: String).returns(String) }
def construct_blob_url(did, cid)
"https://bsky.social/xrpc/com.atproto.sync.getBlob?did=#{did}&cid=#{cid}"
end
end

View File

@@ -232,36 +232,12 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
# Process media if present
embed = record_data.dig("value", "embed")
process_post_media(post, embed, user.did!) if embed
helper = Bluesky::ProcessPostHelper.new(@deferred_job_sink)
helper.process_post_media(post, embed, user.did!) if embed
logger.debug(format_tags("created post", make_tags(at_uri:)))
true
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_media(post, embed_data, did)
case embed_data["$type"]
when "app.bsky.embed.images"
process_post_images(post, embed_data, did)
when "app.bsky.embed.video"
process_post_video(post, embed_data, did)
when "app.bsky.embed.recordWithMedia"
embed_type = embed_data.dig("media", "$type")
if embed_type == "app.bsky.embed.images"
process_post_images(post, embed_data["media"], did)
elsif embed_type == "app.bsky.embed.video"
process_post_video(post, embed_data["media"], did)
end
end
end
sig { params(post: Domain::Post::BlueskyPost).void }
def enqueue_pending_files_job(post)
post.files.each do |post_file|
@@ -270,91 +246,4 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
end
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_images(post, embed_data, did)
images = embed_data.dig("images") || []
images.each_with_index do |image_data, index|
post_file = post.files.build(file_order: index)
set_blob_ref_and_url(post_file, image_data["image"], did)
set_aspect_ratio(post_file, image_data["aspectRatio"])
set_alt_text(post_file, image_data["alt"])
post_file.save!
defer_job(Domain::StaticFileJob, { post_file: })
logger.debug(
format_tags(
"created image for post",
make_tags(at_uri: post.at_uri, post_file_id: post_file.id),
),
)
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_video(post, embed_data, did)
post_file = post.files.build(file_order: 0)
set_blob_ref_and_url(post_file, embed_data["video"], did)
set_aspect_ratio(post_file, embed_data["aspectRatio"])
set_alt_text(post_file, embed_data["alt"])
post_file.save!
defer_job(Domain::StaticFileJob, { post_file: })
logger.debug(
format_tags(
"created video for post",
make_tags(at_uri: post.at_uri, post_file_id: post_file.id),
),
)
end
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
file_data: T::Hash[String, T.untyped],
did: String,
).void
end
def set_blob_ref_and_url(post_file, file_data, did)
return unless file_data.dig("$type") == "blob"
blob_ref = file_data.dig("ref", "$link")
return unless blob_ref
post_file.blob_ref = blob_ref
post_file.url_str = construct_blob_url(did, blob_ref)
end
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
aspect_ratio: T.nilable(T::Hash[String, T.untyped]),
).void
end
def set_aspect_ratio(post_file, aspect_ratio)
return unless aspect_ratio
post_file.aspect_ratio_width = aspect_ratio.dig("width")
post_file.aspect_ratio_height = aspect_ratio.dig("height")
end
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
alt_text: T.nilable(String),
).void
end
def set_alt_text(post_file, alt_text)
post_file.alt_text = alt_text if alt_text
end
end

View File

@@ -121,7 +121,11 @@ class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
user_did = user.did
return unless user_did
avatar_url = construct_blob_url(user_did, avatar_data["ref"]["$link"])
avatar_url =
Bluesky::ProcessPostHelper.construct_blob_url(
user_did,
avatar_data["ref"]["$link"],
)
logger.debug(format_tags("extract avatar url", make_tags(avatar_url:)))
# Check if avatar already exists and is downloaded

View File

@@ -70,8 +70,7 @@ class Scraper::JobBase < ApplicationJob
sig { params(args: T.untyped).void }
def initialize(*args)
super(*T.unsafe(args))
@deferred_jobs = T.let(Set.new, T::Set[DeferredJob])
@suppressed_jobs = T.let(Set.new, T::Set[SuppressedJob])
@deferred_job_sink = T.let(DeferredJobSink.new(self.class), DeferredJobSink)
@http_client = T.let(nil, T.nilable(Scraper::HttpClient))
@tor_http_client = T.let(nil, T.nilable(Scraper::HttpClient))
@gallery_dl_client = T.let(nil, T.nilable(Scraper::GalleryDlClient))
@@ -256,78 +255,12 @@ class Scraper::JobBase < ApplicationJob
)
end
sig do
params(
job_class: T.class_of(Scraper::JobBase),
params: T::Hash[Symbol, T.untyped],
set_args: T::Hash[Symbol, T.untyped],
).returns(T::Boolean)
end
def defer_job(job_class, params, set_args = {})
!!@deferred_jobs.add?(DeferredJob.new(job_class:, params:, set_args:))
end
sig do
params(
job_class: T.class_of(Scraper::JobBase),
params: T::Hash[Symbol, T.untyped],
).void
end
def suppress_deferred_job(job_class, params)
ignore_args = job_class.gather_ignore_signature_args
params_cleared =
params.reject { |key, value| ignore_args.include?(key.to_sym) }
!!@suppressed_jobs.add?(
SuppressedJob.new(job_class:, params: params_cleared),
)
end
delegate :defer_job, to: :@deferred_job_sink
delegate :suppress_deferred_job, to: :@deferred_job_sink
sig { void }
def enqueue_deferred_jobs!
jobs_to_enqueue =
@deferred_jobs.filter_map do |deferred_job|
if @suppressed_jobs.any? { |suppressed_job|
if suppressed_job.matches?(deferred_job)
logger.info(
"suppressing deferred job #{deferred_job.job_class.name} with params #{deferred_job.describe_params}",
)
true
end
}
nil
else
deferred_job
end
end
GoodJob::Bulk.enqueue do
jobs_to_enqueue.each do |deferred_job|
args =
deferred_job.params.merge(
{
caused_by_entry: causing_log_entry,
caused_by_job_id: self.job_id,
},
)
set_args = deferred_job.set_args
job = deferred_job.job_class.set(set_args).perform_later(args)
Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
source_class: self.class,
enqueued_class: deferred_job.job_class,
)
if job
logger.info(
format_tags(
make_tag("job_class", deferred_job.job_class.name),
(make_tag("job_id", job.job_id)),
"enqueue deferred job",
),
)
end
end
rescue StandardError => e
logger.error("error enqueueing jobs: #{e.class.name} - #{e.message}")
end
@deferred_job_sink.enqueue_deferred_jobs!(causing_log_entry, self.job_id)
end
sig { params(msg: T.untyped).returns(T.noreturn) }

View File

@@ -0,0 +1,142 @@
# typed: strict
# frozen_string_literal: true
#
class Bluesky::ProcessPostHelper
extend T::Sig
include HasColorLogger
IMAGE_OR_VIDEO =
T.let(%w[app.bsky.embed.images app.bsky.embed.video], T::Array[String])
sig { params(deferred_job_sink: DeferredJobSink).void }
def initialize(deferred_job_sink)
@deferred_job_sink = deferred_job_sink
end
sig { params(did: String, cid: String).returns(String) }
def self.construct_blob_url(did, cid)
"https://bsky.social/xrpc/com.atproto.sync.getBlob?did=#{did}&cid=#{cid}"
end
sig { params(embed_data: T::Hash[String, T.untyped]).returns(T::Boolean) }
def should_process_post?(embed_data)
type = embed_data["$type"]
if IMAGE_OR_VIDEO.include?(type)
true
elsif type == "app.bsky.embed.recordWithMedia"
embed_type = embed_data.dig("media", "$type")
IMAGE_OR_VIDEO.include?(embed_type)
else
false
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_media(post, embed_data, did)
case embed_data["$type"]
when "app.bsky.embed.images"
process_post_images(post, embed_data, did)
when "app.bsky.embed.video"
process_post_video(post, embed_data, did)
when "app.bsky.embed.recordWithMedia"
embed_type = embed_data.dig("media", "$type")
if embed_type == "app.bsky.embed.images"
process_post_images(post, embed_data["media"], did)
elsif embed_type == "app.bsky.embed.video"
process_post_video(post, embed_data["media"], did)
end
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_images(post, embed_data, did)
images = embed_data.dig("images") || []
images.each_with_index do |image_data, index|
post_file = post.files.build(file_order: index)
set_blob_ref_and_url(post_file, image_data["image"], did)
set_aspect_ratio(post_file, image_data["aspectRatio"])
set_alt_text(post_file, image_data["alt"])
post_file.save!
@deferred_job_sink.defer_job(Domain::StaticFileJob, { post_file: })
logger.debug(
format_tags(
"created image for post",
make_tags(at_uri: post.at_uri, post_file_id: post_file.id),
),
)
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_video(post, embed_data, did)
post_file = post.files.build(file_order: 0)
set_blob_ref_and_url(post_file, embed_data["video"], did)
set_aspect_ratio(post_file, embed_data["aspectRatio"])
set_alt_text(post_file, embed_data["alt"])
post_file.save!
@deferred_job_sink.defer_job(Domain::StaticFileJob, { post_file: })
logger.debug(
format_tags(
"created video for post",
make_tags(at_uri: post.at_uri, post_file_id: post_file.id),
),
)
end
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
file_data: T::Hash[String, T.untyped],
did: String,
).void
end
def set_blob_ref_and_url(post_file, file_data, did)
return unless file_data.dig("$type") == "blob"
blob_ref = file_data.dig("ref", "$link")
return unless blob_ref
post_file.blob_ref = blob_ref
post_file.url_str = self.class.construct_blob_url(did, blob_ref)
end
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
aspect_ratio: T.nilable(T::Hash[String, T.untyped]),
).void
end
def set_aspect_ratio(post_file, aspect_ratio)
return unless aspect_ratio
post_file.aspect_ratio_width = aspect_ratio.dig("width")
post_file.aspect_ratio_height = aspect_ratio.dig("height")
end
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
alt_text: T.nilable(String),
).void
end
def set_alt_text(post_file, alt_text)
post_file.alt_text = alt_text if alt_text
end
end

View File

@@ -0,0 +1,88 @@
# typed: strict
# frozen_string_literal: true
class DeferredJobSink
extend T::Sig
include HasColorLogger
sig { params(source_class: T.untyped).void }
def initialize(source_class)
@suppressed_jobs = T.let(Set.new, T::Set[SuppressedJob])
@deferred_jobs = T.let(Set.new, T::Set[DeferredJob])
@source_class = source_class
end
sig do
params(
job_class: T.class_of(Scraper::JobBase),
params: T::Hash[Symbol, T.untyped],
set_args: T::Hash[Symbol, T.untyped],
).returns(T::Boolean)
end
def defer_job(job_class, params, set_args = {})
!!@deferred_jobs.add?(DeferredJob.new(job_class:, params:, set_args:))
end
sig do
params(
job_class: T.class_of(Scraper::JobBase),
params: T::Hash[Symbol, T.untyped],
).void
end
def suppress_deferred_job(job_class, params)
ignore_args = job_class.gather_ignore_signature_args
params_cleared =
params.reject { |key, value| ignore_args.include?(key.to_sym) }
!!@suppressed_jobs.add?(
SuppressedJob.new(job_class:, params: params_cleared),
)
end
sig do
params(
caused_by_entry: T.nilable(HttpLogEntry),
caused_by_job_id: T.nilable(String),
).void
end
def enqueue_deferred_jobs!(caused_by_entry = nil, caused_by_job_id = nil)
jobs_to_enqueue =
@deferred_jobs.filter_map do |deferred_job|
if @suppressed_jobs.any? { |suppressed_job|
if suppressed_job.matches?(deferred_job)
logger.info(
"suppressing deferred job #{deferred_job.job_class.name} with params #{deferred_job.describe_params}",
)
true
end
}
nil
else
deferred_job
end
end
GoodJob::Bulk.enqueue do
jobs_to_enqueue.each do |deferred_job|
args =
deferred_job.params.merge({ caused_by_entry:, caused_by_job_id: })
set_args = deferred_job.set_args
job = deferred_job.job_class.set(set_args).perform_later(args)
Scraper::Metrics::JobBaseMetrics.observe_job_enqueued(
source_class: @source_class,
enqueued_class: deferred_job.job_class,
)
if job
logger.info(
format_tags(
make_tag("job_class", deferred_job.job_class.name),
(make_tag("job_id", job.job_id)),
"enqueue deferred job",
),
)
end
end
rescue StandardError => e
logger.error("error enqueueing jobs: #{e.class.name} - #{e.message}")
end
end
end

View File

@@ -25,8 +25,7 @@ module Scraper::Metrics::JobBaseMetrics
sig do
params(
source_class:
T.any(T.class_of(Scraper::JobBase), T.class_of(ReduxApplicationRecord)),
source_class: T.untyped,
enqueued_class: T.class_of(Scraper::JobBase),
).void
end

View File

@@ -28,6 +28,7 @@ module Tasks::Bluesky
wanted_collections: %w[
app.bsky.feed.post
app.bsky.embed.images
app.bsky.embed.video
app.bsky.embed.recordWithMedia
],
},
@@ -85,9 +86,15 @@ module Tasks::Bluesky
return unless @dids.include?(msg.did)
msg.operations.each do |op|
next unless op.action == :create && op.type == :bsky_post
deferred_job_sink = DeferredJobSink.new(self.class)
helper = Bluesky::ProcessPostHelper.new(deferred_job_sink)
embed_data =
T.let(op.raw_record["embed"], T.nilable(T::Hash[String, T.untyped]))
next unless embed_data
unless helper.should_process_post?(embed_data)
logger.info("skipping post: #{op.uri} - #{embed_data["$type"]}")
next
end
post =
Domain::Post::BlueskyPost.find_or_create_by!(at_uri: op.uri) do |post|
@@ -99,9 +106,11 @@ module Tasks::Bluesky
post.monitor_scanned_at = Time.current
end
process_media(post, embed_data, msg.did)
helper.process_post_media(post, embed_data, msg.did)
logger.info("created bluesky post: `#{post.rkey}` / `#{post.at_uri}`")
ensure
deferred_job_sink.enqueue_deferred_jobs! if deferred_job_sink
end
end
@@ -162,106 +171,5 @@ module Tasks::Bluesky
)
end
end
private
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_media(post, embed_data, did)
case embed_data["$type"]
when "app.bsky.embed.images"
process_images(post, embed_data["images"], did)
when "app.bsky.embed.recordWithMedia"
# Handle quote posts with media
if embed_data["media"] &&
embed_data["media"]["$type"] == "app.bsky.embed.images"
process_images(post, embed_data["media"]["images"], did)
end
when "app.bsky.embed.external"
# Handle external embeds (website cards) - could have thumbnail images
process_external_embed(post, embed_data["external"], did)
else
logger.debug("unknown embed type: #{embed_data["$type"]}")
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
images: T::Array[T::Hash[String, T.untyped]],
did: String,
).void
end
def process_images(post, images, did)
files = []
images.each_with_index do |image_data, index|
blob_data = image_data["image"]
next unless blob_data && blob_data["ref"]
# Create PostFile record for the image
post_file =
post.files.build(
type: "Domain::PostFile::BlueskyPostFile",
file_order: index,
url_str: construct_blob_url(did, blob_data["ref"]["$link"]),
state: "pending",
alt_text: image_data["alt"],
blob_ref: blob_data["ref"]["$link"],
)
# Store aspect ratio if present
if image_data["aspectRatio"]
post_file.aspect_ratio_width = image_data["aspectRatio"]["width"]
post_file.aspect_ratio_height = image_data["aspectRatio"]["height"]
end
post_file.save!
Domain::StaticFileJob.perform_later({ post_file: })
files << post_file
end
logger.info(
"created #{files.size} #{"file".pluralize(files.size)} for post: #{post.rkey} / #{did}",
)
end
sig do
params(
post: Domain::Post::BlueskyPost,
external_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_external_embed(post, external_data, did)
# Handle thumbnail image from external embeds (website cards)
thumb_data = external_data["thumb"]
return unless thumb_data && thumb_data["ref"]
post_file =
post.files.build(
type: "Domain::PostFile::BlueskyPostFile",
file_order: 0,
url_str: construct_blob_url(did, thumb_data["ref"]["$link"]),
state: "pending",
)
# Store metadata
post_file.alt_text = "Website preview thumbnail"
post_file.blob_ref = thumb_data["ref"]["$link"]
post_file.save!
logger.info("created bluesky external thumbnail: #{post_file.url_str}")
end
sig { params(did: String, cid: String).returns(String) }
def construct_blob_url(did, cid)
# Construct the Bluesky blob URL using the AT Protocol getBlob endpoint
"https://bsky.social/xrpc/com.atproto.sync.getBlob?did=#{did}&cid=#{cid}"
end
end
end