video post downloading

This commit is contained in:
Dylan Knutson
2025-08-11 18:09:49 +00:00
parent 40c6d44100
commit 390f0939b0
13 changed files with 717 additions and 380 deletions

View File

@@ -283,7 +283,7 @@ class Domain::PostsController < DomainController
relation = T.unsafe(policy_scope(relation)).page(params[:page]).per(50)
relation =
relation.order(
relation.klass.post_order_attribute => :desc,
"#{relation.klass.post_order_attribute} DESC, id DESC",
) unless skip_ordering
relation
end

View File

@@ -1,5 +1,7 @@
# typed: strict
class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
MEDIA_EMBED_TYPES = %w[app.bsky.embed.images app.bsky.embed.video]
self.default_priority = -25
sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
@@ -34,6 +36,62 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
private
sig do
params(
user: Domain::User::BlueskyUser,
record_data: T::Hash[String, T.untyped],
).returns(T::Boolean)
end
def should_record_post?(user, record_data)
# Check for quotes first - skip quotes of other users' posts
quote_uri = extract_quote_uri(record_data)
if quote_uri
# Extract DID from the quoted post URI
quoted_did = quote_uri.split("/")[2]
return false unless quoted_did == user.did
end
# Check for replies - only record if it's a root post or reply to user's own post
return true unless record_data.dig("value", "reply")
# For replies, check if the root post is by the same user
reply_data = record_data.dig("value", "reply")
root_uri = reply_data.dig("root", "uri")
return true unless root_uri # If we can't determine root, allow it
# Extract DID from the root post URI
# AT URI format: at://did:plc:xyz/app.bsky.feed.post/rkey
root_did = root_uri.split("/")[2]
# Only record if the root post is by the same user
root_did == user.did
end
sig { params(record: T::Hash[String, T.untyped]).returns(T.nilable(String)) }
def extract_quote_uri(record)
# Check for quote in embed data
embed = record["embed"]
return nil unless embed
case embed["$type"]
when "app.bsky.embed.record"
# Direct quote - check if it's actually a quote of a post
record_data = embed["record"]
if record_data && record_data["uri"]&.include?("app.bsky.feed.post")
record_data["uri"]
end
when "app.bsky.embed.recordWithMedia"
# Quote with media
record_data = embed.dig("record", "record")
if record_data && record_data["uri"]&.include?("app.bsky.feed.post")
record_data["uri"]
end
else
nil
end
end
sig { params(user: Domain::User::BlueskyUser).void }
def scan_user_posts(user)
# Use AT Protocol API to list user's posts
@@ -43,6 +101,7 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
cursor = T.let(nil, T.nilable(String))
num_processed_posts = 0
num_posts_with_media = 0
num_filtered_posts = 0
num_created_posts = 0
num_pages = 0
@@ -75,12 +134,27 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
records.each do |record_data|
num_processed_posts += 1
record = record_data["value"]
next unless record && record["embed"]
embed_type = record_data.dig("value", "embed", "$type")
unless MEDIA_EMBED_TYPES.include?(embed_type)
logger.info(
format_tags(
"skipping post, non-media embed type",
make_tags(embed_type:),
),
)
next
end
# Only process posts with media
num_posts_with_media += 1
if process_historical_post(user, record_data, record)
# Skip posts that are replies to other users or quotes
unless should_record_post?(user, record_data)
num_filtered_posts += 1
next
end
if process_historical_post(user, record_data, response.log_entry)
num_created_posts += 1
end
end
@@ -105,6 +179,7 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
make_tags(
num_processed_posts:,
num_posts_with_media:,
num_filtered_posts:,
num_created_posts:,
num_pages:,
),
@@ -116,51 +191,42 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
params(
user: Domain::User::BlueskyUser,
record_data: T::Hash[String, T.untyped],
record: T::Hash[String, T.untyped],
log_entry: HttpLogEntry,
).returns(T::Boolean)
end
def process_historical_post(user, record_data, record)
uri = record_data["uri"]
rkey = record_data["uri"].split("/").last
def process_historical_post(user, record_data, log_entry)
at_uri = record_data["uri"]
# Check if we already have this post
existing_post = user.posts.find_by(bluesky_rkey: rkey)
existing_post = user.posts.find_by(at_uri:)
if existing_post
enqueue_pending_files_job(existing_post)
return false
end
begin
post =
Domain::Post::BlueskyPost.create!(
at_uri: uri,
bluesky_rkey: rkey,
text: record["text"] || "",
posted_at: Time.parse(record["createdAt"]),
post_raw: record,
)
# Extract reply and quote URIs from the raw post data
reply_to_uri = record_data.dig("value", "reply", "root", "uri")
quote_uri = extract_quote_uri(record_data)
post.creator = user
post.save!
# Process media if present
process_post_media(post, record["embed"], user.did!) if record["embed"]
logger.debug(
format_tags(
"created historical post",
make_tags(bluesky_rkey: post.bluesky_rkey),
),
post =
Domain::Post::BlueskyPost.build(
state: "ok",
at_uri: at_uri,
first_seen_entry: log_entry,
text: record_data.dig("value", "text") || "",
posted_at: Time.parse(record_data.dig("value", "createdAt")),
post_raw: record_data,
reply_to_uri: reply_to_uri,
quote_uri: quote_uri,
)
rescue => e
logger.error(
format_tags(
"failed to create historical post",
make_tags(bluesky_rkey: rkey, error: e.message),
),
)
false
end
post.creator = user
post.save!
# Process media if present
embed = record_data.dig("value", "embed")
process_post_media(post, embed, user.did!) if embed
logger.debug(format_tags("created post", make_tags(at_uri:)))
true
end
@@ -175,14 +241,16 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
def process_post_media(post, embed_data, did)
case embed_data["$type"]
when "app.bsky.embed.images"
process_post_images(post, embed_data["images"], did)
process_post_images(post, embed_data, did)
when "app.bsky.embed.video"
process_post_video(post, embed_data, did)
when "app.bsky.embed.recordWithMedia"
if embed_data["media"] &&
embed_data["media"]["$type"] == "app.bsky.embed.images"
process_post_images(post, embed_data["media"]["images"], did)
embed_type = embed_data.dig("media", "$type")
if embed_type == "app.bsky.embed.images"
process_post_images(post, embed_data["media"], did)
elsif embed_type == "app.bsky.embed.video"
process_post_video(post, embed_data["media"], did)
end
when "app.bsky.embed.external"
process_external_embed(post, embed_data["external"], did)
end
end
@@ -202,69 +270,87 @@ class Domain::Bluesky::Job::ScanPostsJob < Domain::Bluesky::Job::Base
sig do
params(
post: Domain::Post::BlueskyPost,
images: T::Array[T::Hash[String, T.untyped]],
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_images(post, images, did)
files = []
def process_post_images(post, embed_data, did)
images = embed_data.dig("images") || []
images.each_with_index do |image_data, index|
blob_data = image_data["image"]
next unless blob_data && blob_data["ref"]
post_file =
post.files.build(
file_order: index,
url_str: construct_blob_url(did, blob_data["ref"]["$link"]),
alt_text: image_data["alt"],
blob_ref: blob_data["ref"]["$link"],
)
# Store aspect ratio if present
if image_data["aspectRatio"]
post_file.aspect_ratio_width = image_data["aspectRatio"]["width"]
post_file.aspect_ratio_height = image_data["aspectRatio"]["height"]
end
post_file = post.files.build(file_order: index)
set_blob_ref_and_url(post_file, image_data["image"], did)
set_aspect_ratio(post_file, image_data["aspectRatio"])
set_alt_text(post_file, image_data["alt"])
post_file.save!
defer_job(Domain::StaticFileJob, { post_file: }, { queue: "bluesky" })
files << post_file
logger.debug(
format_tags(
"created image for post",
make_tags(at_uri: post.at_uri, post_file_id: post_file.id),
),
)
end
end
sig do
params(
post: Domain::Post::BlueskyPost,
embed_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_post_video(post, embed_data, did)
post_file = post.files.build(file_order: 0)
set_blob_ref_and_url(post_file, embed_data["video"], did)
set_aspect_ratio(post_file, embed_data["aspectRatio"])
set_alt_text(post_file, embed_data["alt"])
post_file.save!
defer_job(Domain::StaticFileJob, { post_file: }, { queue: "bluesky" })
logger.debug(
format_tags(
"created files for historical post",
make_tags(bluesky_rkey: post.bluesky_rkey, num_files: files.size),
"created video for post",
make_tags(at_uri: post.at_uri, post_file_id: post_file.id),
),
)
end
sig do
params(
post: Domain::Post::BlueskyPost,
external_data: T::Hash[String, T.untyped],
post_file: Domain::PostFile::BlueskyPostFile,
file_data: T::Hash[String, T.untyped],
did: String,
).void
end
def process_external_embed(post, external_data, did)
thumb_data = external_data["thumb"]
return unless thumb_data && thumb_data["ref"]
def set_blob_ref_and_url(post_file, file_data, did)
return unless file_data.dig("$type") == "blob"
blob_ref = file_data.dig("ref", "$link")
return unless blob_ref
post_file.blob_ref = blob_ref
post_file.url_str = construct_blob_url(did, blob_ref)
end
post_file =
post.files.build(
file_order: 0,
url_str: construct_blob_url(did, thumb_data["ref"]["$link"]),
blob_ref: thumb_data["ref"]["$link"],
)
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
aspect_ratio: T.nilable(T::Hash[String, T.untyped]),
).void
end
def set_aspect_ratio(post_file, aspect_ratio)
return unless aspect_ratio
post_file.aspect_ratio_width = aspect_ratio.dig("width")
post_file.aspect_ratio_height = aspect_ratio.dig("height")
end
post_file.save!
defer_job(Domain::StaticFileJob, { post_file: }, { queue: "bluesky" })
logger.debug(
format_tags(
"created thumbnail for post",
make_tags(bluesky_rkey: post.bluesky_rkey),
),
)
sig do
params(
post_file: Domain::PostFile::BlueskyPostFile,
alt_text: T.nilable(String),
).void
end
def set_alt_text(post_file, alt_text)
post_file.alt_text = alt_text if alt_text
end
end

View File

@@ -91,7 +91,7 @@ module Tasks::Bluesky
post =
Domain::Post::BlueskyPost.find_or_create_by!(at_uri: op.uri) do |post|
post.bluesky_rkey = op.rkey
post.rkey = op.rkey
post.text = op.raw_record["text"]
post.posted_at = msg.time.in_time_zone("UTC")
post.creator = creator_for(msg)
@@ -100,9 +100,7 @@ module Tasks::Bluesky
process_media(post, embed_data, msg.did)
logger.info(
"created bluesky post: `#{post.bluesky_rkey}` / `#{post.at_uri}`",
)
logger.info("created bluesky post: `#{post.rkey}` / `#{post.at_uri}`")
end
end
@@ -225,7 +223,7 @@ module Tasks::Bluesky
end
logger.info(
"created #{files.size} #{"file".pluralize(files.size)} for post: #{post.bluesky_rkey} / #{did}",
"created #{files.size} #{"file".pluralize(files.size)} for post: #{post.rkey} / #{did}",
)
end

View File

@@ -14,11 +14,21 @@ class Domain::Post::BlueskyPost < Domain::Post
validates :state, presence: true
validates :at_uri, presence: true, uniqueness: true
validates :bluesky_rkey, presence: true
validates :rkey, presence: true
before_validation { self.rkey = at_uri&.split("/")&.last }
validate :rkey_matches_at_uri
sig { void }
def rkey_matches_at_uri
return true if rkey.blank? && at_uri.blank?
return true if rkey == at_uri&.split("/")&.last
errors.add(:rkey, "does not match AT URI")
end
sig { override.returns([String, Symbol]) }
def self.param_prefix_and_attribute
["bsky", :bluesky_rkey]
["bsky", :rkey]
end
sig { override.returns(String) }
@@ -43,16 +53,14 @@ class Domain::Post::BlueskyPost < Domain::Post
sig { override.returns(T.nilable(T.any(String, Integer))) }
def domain_id_for_view
self.bluesky_rkey
self.rkey
end
sig { override.returns(T.nilable(Addressable::URI)) }
def external_url_for_view
handle = self.creator&.handle
if bluesky_rkey.present? && handle.present?
Addressable::URI.parse(
"https://bsky.app/profile/#{handle}/post/#{bluesky_rkey}",
)
if rkey.present? && handle.present?
Addressable::URI.parse("https://bsky.app/profile/#{handle}/post/#{rkey}")
end
end
@@ -61,9 +69,14 @@ class Domain::Post::BlueskyPost < Domain::Post
self.files.first
end
sig { override.returns(T.nilable(HttpLogEntry)) }
def scanned_post_log_entry_for_view
self.first_seen_entry
end
sig { override.returns(T::Boolean) }
def pending_scan?
scanned_at.nil?
false
end
sig { override.returns(T.nilable(String)) }