Files
redux-scraper/rake/bluesky.rake
2025-08-18 16:28:53 +00:00

203 lines
5.9 KiB
Ruby

# typed: true
# frozen_string_literal: true
# Tell Sorbet that top-level `self` in this rake file is an Object that also
# has the Rake DSL mixed in, so DSL calls such as `namespace`, `desc` and
# `task` type-check under `typed: true`.
T.bind(self, T.all(Rake::DSL, Object))
# Helpers used by the bluesky rake tasks to translate between Bluesky handles
# and DIDs (via DIDKit) and to find or create the matching
# Domain::User::BlueskyUser record.
module BlueskyRakeHelpers
  extend T::Sig

  # Resolves a handle to its DID using DIDKit.
  #
  # @param handle [String] the handle to resolve
  # @return [String, nil] the DID, or nil when DIDKit returns no result
  sig { params(handle: String).returns(T.nilable(String)) }
  def self.resolve_did(handle)
    DIDKit::Resolver.new.resolve_handle(handle)&.did
  end

  # Resolves a DID to a handle using DIDKit.
  #
  # @param did [String] the DID to resolve
  # @return [String, nil] whatever DIDKit's `get_validated_handle` yields for
  #   the resolved DID (nilable per the signature)
  sig { params(did: String).returns(T.nilable(String)) }
  def self.resolve_handle(did)
    DIDKit::Resolver.new.resolve_did(did).get_validated_handle
  end

  # Finds the BlueskyUser for a handle or a DID, creating the record when it
  # does not exist yet and the identifier can be resolved through DIDKit.
  #
  # Input starting with "did:" is treated as a DID; anything else is treated
  # as a handle.
  #
  # @param handle_or_did [String, nil] a handle or a "did:..." identifier
  # @return [Domain::User::BlueskyUser, nil] the user, or nil when the input
  #   is blank or the other half of the identity cannot be resolved
  sig do
    params(handle_or_did: T.nilable(String)).returns(
      T.nilable(Domain::User::BlueskyUser),
    )
  end
  def self.find_bsky_user(handle_or_did)
    return nil if handle_or_did.blank?

    if handle_or_did.starts_with?("did:")
      did = handle_or_did
      Domain::User::BlueskyUser.find_by(did:) ||
        begin
          handle = resolve_handle(did)
          return nil if handle.blank?
          Domain::User::BlueskyUser.create!(did:, handle:)
        end
    else
      # Look up by the handle that was actually passed in. The shorthand
      # `find_by(handle:)` must not be used here: `handle` is a local that is
      # only assigned in the DID branch above, so it is always nil on this
      # path and the lookup would never match the requested handle.
      Domain::User::BlueskyUser.find_by(handle: handle_or_did) ||
        begin
          did = resolve_did(handle_or_did)
          return nil if did.blank?
          # Key on the DID so an existing row for this DID is reused; the
          # block assigns the handle only when a new record is built.
          Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
            user.handle = handle_or_did
          end
        end
    end
  end
end
namespace :bluesky do
desc "Start the Bluesky monitor"
task monitor: :environment do
  # Instantiate the monitor and invoke its `run` method.
  monitor = Tasks::Bluesky::Monitor.new
  monitor.run
end
desc "Add a DID or hashtag to the Bluesky monitor"
# Reads `hashtag`, `handle` or `did` from the environment. A hashtag takes
# precedence; otherwise the handle (or, failing that, the DID) is looked up,
# scan jobs are enqueued for the user and the user is added to the monitor.
task add: :environment do
  if (tag = ENV["hashtag"])
    # Remove # if provided
    tag = tag.gsub(/^#/, "")
    Domain::Bluesky::MonitoredObject.build_for_hashtag(tag).save!
    puts "Added hashtag: ##{tag}"
    next
  end

  identifier = ENV["handle"] || ENV["did"]
  raise "hashtag or handle/did is required" unless identifier

  user = BlueskyRakeHelpers.find_bsky_user(identifier)
  if user.nil?
    puts "user not found"
    next
  end

  # Enqueue the profile and posts scans before registering the user.
  Domain::Bluesky::Job::ScanUserJob.perform_later(user:)
  Domain::Bluesky::Job::ScanPostsJob.perform_later(user:)
  Domain::Bluesky::MonitoredObject.build_for_user(user).save!
end
desc "Add DID or user handle, bulk from stdin"
# Reads one handle or DID per line and adds each resolved user to the monitor,
# enqueueing profile/posts scans for users that have not been scanned yet.
#
# Environment:
#   file - optional path to read from instead of STDIN
#   skip - optional integer used to resume part-way through the input
#
# Any error while handling a line is reported together with the offending line
# and then re-raised, aborting the task.
task add_bulk: :environment do
  added = 0
  path = ENV["file"]
  input = path ? File.open(path, "r") : STDIN
  skip = ENV["skip"]&.to_i

  # Blocks while more than 150 jobs are waiting on the "bluesky" queue, i.e.
  # jobs with no finished_at / performed_at / error and no recorded exception
  # executions. Polls every 10 seconds.
  wait_for_queue =
    lambda do
      loop do
        queue_size =
          GoodJob::Job
            .where(
              finished_at: nil,
              performed_at: nil,
              error: nil,
              queue_name: "bluesky",
            )
            .where(
              [
                "(serialized_params->'exception_executions' = '{}')",
                "(serialized_params->'exception_executions' is null)",
              ].join(" OR "),
            )
            .count
        puts "queue size: #{queue_size}"
        break if queue_size <= 150
        sleep 10
      end
    end

  begin
    while (line = input.gets)
      if skip
        # NOTE(review): with skip=N this skips N-1 lines (processing resumes
        # at the Nth line); behavior kept as-is, confirm that it is intended.
        skip -= 1
        next if skip > 0
      end

      begin
        # `strip` already removes the trailing newline.
        line = line.strip
        next if line.blank?

        user = BlueskyRakeHelpers.find_bsky_user(line)
        if user.nil?
          puts "user not found: #{line}"
          next
        end

        unless user.scanned_profile_at.present?
          Domain::Bluesky::Job::ScanUserJob.perform_later(user:)
          puts "enqueued profile scan: #{user.did} / #{user.handle}"
        end
        unless user.scanned_posts_at.present?
          Domain::Bluesky::Job::ScanPostsJob.perform_later(user:)
          puts "enqueued posts scan: #{user.did} / #{user.handle}"
        end

        obj = Domain::Bluesky::MonitoredObject.build_for_user(user)
        if obj.save
          puts "added #{user.did} / #{user.handle} to monitor"
          added += 1
        end

        # NOTE(review): this also fires for every resolved user while `added`
        # sits on a multiple of 25 (including 0); kept as-is because scan jobs
        # are enqueued even when the save above does not succeed.
        wait_for_queue.call if added % 25 == 0
      rescue StandardError => e
        puts "error: #{e.message}"
        puts "line: #{line}"
        raise
      end
    end
  ensure
    # Close the input when it is a file this task opened; leave STDIN alone.
    input.close unless input.equal?(STDIN)
  end

  puts "added #{added} users to monitor"
end
desc "Remove a DID or hashtag from the Bluesky monitor"
# Reads `hashtag`, `handle` or `did` from the environment, in that order of
# precedence, and destroys the matching monitored object if one exists.
task remove: :environment do
  if (tag = ENV["hashtag"])
    # Remove # if provided
    tag = tag.gsub(/^#/, "")
    Domain::Bluesky::MonitoredObject.find_by(value: tag, kind: :hashtag)&.destroy!
    puts "Removed hashtag: ##{tag}"
    next
  end

  did = ENV["did"]
  if (handle = ENV["handle"])
    # A handle wins over an explicit DID and is resolved to its DID first.
    did = BlueskyRakeHelpers.resolve_did(handle)
    puts "resolved did: #{did}"
  elsif did.nil?
    raise "hashtag, handle, or did is required"
  end
  raise "did is required" if did.blank?

  Domain::Bluesky::MonitoredObject.find_by(value: did, kind: :user_did)&.destroy!
end
desc "Watch users that user follows"
# Looks up the user given by `handle` (or `did`), runs the follows scan for
# that user inline, then adds every user they follow to the monitor.
task watch_follows: :environment do
  identifier = ENV["handle"] || ENV["did"]
  user = BlueskyRakeHelpers.find_bsky_user(identifier)
  raise("user is required, need a handle/did") unless user

  # Run synchronously so the follow records exist before they are read below.
  Domain::Bluesky::Job::ScanUserFollowsJob.perform_now(user:)
  user.reload

  user.user_user_follows_from.each do |follow|
    followed = T.cast(follow.to, Domain::User::BlueskyUser)
    monitored = Domain::Bluesky::MonitoredObject.build_for_user(followed)
    next unless monitored.save
    puts "added #{followed.did} / #{followed.handle} to monitor"
  end
end
desc "Extract bluesky post rkeys / user dids from E621 sources"
# Prints each distinct source URL containing "https://bsky.app" found on E621
# posts, one per line.
task extract_e621_bluesky_posts_and_users: :environment do
  bsky_marker = "https://bsky.app"
  seen = Set.new
  candidates =
    Domain::Post::E621Post.where(
      "sources_array::text LIKE '%https://bsky.app%'",
    )

  candidates.find_each(batch_size: 32) do |post|
    post.sources_array.each do |source|
      next unless source.include?(bsky_marker)
      # Set#add? returns nil when the source was already recorded.
      puts source if seen.add?(source)
    end
  end
end
end