Files
redux-scraper/app/jobs/domain/bluesky/job/scan_user_job.rb
2025-08-18 05:59:26 +00:00

233 lines
6.5 KiB
Ruby

# typed: strict
# Scans a Bluesky user's profile through the public
# `app.bsky.actor.getProfile` XRPC endpoint and copies the description,
# display name, raw profile, registration time and avatar onto the user.
class Domain::Bluesky::Job::ScanUserJob < Domain::Bluesky::Job::Base
  self.default_priority = -20

  # Job entry point. Skips the scan when the account is already marked
  # disabled or the previous profile scan is not yet due, unless the job is
  # forced (force_scan?). The user is saved in +ensure+ so partial updates
  # (scan log entry, disabled state, ...) persist even if the scan raises.
  sig { override.params(args: T::Hash[Symbol, T.untyped]).returns(T.untyped) }
  def perform(args)
    user = user_from_args!
    logger.push_tags(make_arg_tag(user))
    logger.info(format_tags("starting profile scan"))

    if user.state_account_disabled? && !force_scan?
      logger.info(format_tags("account is disabled, skipping profile scan"))
      return
    end

    if !user.profile_scan.due? && !force_scan?
      logger.info(
        format_tags(
          "skipping profile scan",
          make_tags(scanned_at: user.profile_scan.ago_in_words),
        ),
      )
      return
    end

    scan_user_profile(user)
    user.scanned_profile_at = Time.zone.now
    logger.info(format_tags("completed profile scan"))
  ensure
    user.save! if user
  end

  private

  # Fetches the profile for +user+ and applies it to the record. Each fetch is
  # recorded as a ProfileScan event linked to the HTTP log entry.
  # A 400 response whose XRPC error is "InvalidRequest" marks the account as
  # disabled; any other non-200 status, an unparseable or non-object body, or
  # an XRPC error payload is reported through fatal_error.
  sig { params(user: Domain::User::BlueskyUser).void }
  def scan_user_profile(user)
    logger.info(format_tags("scanning user profile"))
    profile_scan = Domain::UserJobEvent::ProfileScan.create!(user:)

    # Use Bluesky Actor API to get user profile
    profile_url =
      "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=#{user.did}"
    response = http_client.get(profile_url)
    user.last_scan_log_entry = response.log_entry
    profile_scan.update!(log_entry: response.log_entry)

    if response.status_code == 400 &&
         xrpc_error?(response.body, "InvalidRequest")
      logger.error(format_tags("account is disabled / does not exist"))
      user.state = "account_disabled"
      return
    end

    if response.status_code != 200
      fatal_error(
        format_tags(
          "failed to get user profile",
          make_tags(status_code: response.status_code),
        ),
      )
    end

    begin
      profile_data = JSON.parse(response.body)
    rescue JSON::ParserError => e
      fatal_error(
        format_tags(
          "failed to parse profile JSON",
          make_tags(error: e.message),
        ),
      )
    end

    # Valid JSON is not necessarily an object (e.g. `null` or an array);
    # indexing such a value would crash with an unrelated error.
    unless profile_data.is_a?(Hash)
      fatal_error(
        format_tags(
          "unexpected profile JSON",
          make_tags(type: profile_data.class.name),
        ),
      )
    end

    if profile_data["error"]
      fatal_error(
        format_tags(
          "profile API error",
          make_tags(error: profile_data["error"]),
        ),
      )
    end

    # The getProfile endpoint returns the profile data directly, not wrapped
    # in "value".
    apply_profile_record(user, profile_data)
  end

  # Returns true when +body+ is a JSON object whose "error" field equals
  # +code+. A body that is not valid JSON (e.g. an HTML error page) yields
  # false instead of raising, so the caller falls through to its generic
  # status-code handling.
  sig { params(body: String, code: String).returns(T::Boolean) }
  def xrpc_error?(body, code)
    parsed = JSON.parse(body)
    parsed.is_a?(Hash) && parsed["error"] == code
  rescue JSON::ParserError
    false
  end

  # Copies the profile fields from +record+ onto +user+ and hands any avatar
  # URL to process_user_avatar_url. A "createdAt" value that Time.parse
  # rejects is logged and skipped so the rest of the profile still applies.
  sig do
    params(
      user: Domain::User::BlueskyUser,
      record: T::Hash[String, T.untyped],
    ).void
  end
  def apply_profile_record(user, record)
    user.description = record["description"]
    user.display_name = record["displayName"]
    user.profile_raw = record

    # Set registration time from profile createdAt
    created_at = record["createdAt"]
    if created_at
      begin
        user.registered_at = Time.parse(created_at).in_time_zone("UTC")
      rescue ArgumentError => e
        logger.warn(
          format_tags(
            "invalid createdAt, leaving registration time unchanged",
            make_tags(created_at:, error: e.message),
          ),
        )
      else
        logger.info(
          format_tags(
            "set user registration time",
            make_tags(registered_at: user.registered_at),
          ),
        )
      end
    end

    avatar_url = record["avatar"]
    process_user_avatar_url(user, avatar_url) if avatar_url
  end

  # Builds the blob URL for an avatar blob reference (the CID under
  # avatar_data["ref"]["$link"]) and processes it like a plain avatar URL.
  # Returns without doing anything when the blob link or the user's DID is
  # missing.
  # NOTE(review): nothing in this class calls this method; consider removing
  # it if no subclass relies on it.
  sig do
    params(
      user: Domain::User::BlueskyUser,
      avatar_data: T::Hash[String, T.untyped],
    ).void
  end
  def process_user_avatar(user, avatar_data)
    logger.debug(format_tags("processing user avatar", make_tags(avatar_data:)))
    blob_link = avatar_data.dig("ref", "$link")
    return unless blob_link
    user_did = user.did
    return unless user_did

    avatar_url =
      Bluesky::ProcessPostHelper.construct_blob_url(user_did, blob_link)
    logger.debug(format_tags("extract avatar url", make_tags(avatar_url:)))
    process_user_avatar_url(user, avatar_url)
  end

  # Ensures +user+ has an avatar record for +avatar_url+ with a download
  # queued. A new avatar is created (and its download enqueued) when the user
  # has none or the URL differs from the current one. An existing avatar that
  # is still pending is re-enqueued. Blank URLs are ignored.
  sig { params(user: Domain::User::BlueskyUser, avatar_url: String).void }
  def process_user_avatar_url(user, avatar_url)
    logger.debug(
      format_tags("processing user avatar url", make_tags(avatar_url:)),
    )
    return if avatar_url.blank?

    # Check if avatar already exists and is downloaded
    existing_avatar = user.avatar
    if existing_avatar.present?
      logger.debug(
        format_tags(
          "existing avatar found",
          make_tags(state: existing_avatar.state),
        ),
      )
      # Only enqueue if the avatar URL has changed or it's not downloaded yet
      if existing_avatar.url_str != avatar_url
        avatar = user.avatars.create!(url_str: avatar_url)
        logger.info(
          format_tags(
            "avatar url changed, creating new avatar",
            make_arg_tag(avatar),
          ),
        )
        defer_job(
          Domain::UserAvatarJob,
          { avatar: avatar },
          { queue: "bluesky", priority: -30 },
        )
      elsif existing_avatar.state_pending?
        defer_job(
          Domain::UserAvatarJob,
          { avatar: existing_avatar },
          { queue: "bluesky", priority: -30 },
        )
        logger.info(format_tags("re-enqueued pending avatar download"))
      end
    else
      # Create new avatar and enqueue download
      avatar = user.avatars.create!(url_str: avatar_url)
      defer_job(
        Domain::UserAvatarJob,
        { avatar: },
        { queue: "bluesky", priority: -30 },
      )
      logger.info(
        format_tags(
          "created avatar and enqueued download",
          make_arg_tag(avatar),
        ),
      )
    end
  end
end