montior hashtag impl
This commit is contained in:
@@ -17,7 +17,10 @@ module Tasks::Bluesky
|
||||
Domain::Bluesky::MonitoredObject.where(kind: :user_did).pluck(:value),
|
||||
)
|
||||
@hashtags.merge(
|
||||
Domain::Bluesky::MonitoredObject.where(kind: :hashtag).pluck(:value),
|
||||
Domain::Bluesky::MonitoredObject
|
||||
.where(kind: :hashtag)
|
||||
.pluck(:value)
|
||||
.map(&:downcase),
|
||||
)
|
||||
|
||||
logger.info("dids: #{@dids.to_a.join(", ")}")
|
||||
@@ -88,23 +91,52 @@ module Tasks::Bluesky
|
||||
sig { params(msg: Skyfall::Jetstream::CommitMessage).void }
|
||||
def handle_commit_message(msg)
|
||||
return unless msg.type == :commit
|
||||
return unless @dids.include?(msg.did)
|
||||
|
||||
msg.operations.each do |op|
|
||||
next unless op.action == :create && op.type == :bsky_post
|
||||
|
||||
# Check if we should process this post (either from monitored DID or contains monitored hashtags)
|
||||
from_monitored_did = @dids.include?(msg.did)
|
||||
post_text = T.let(op.raw_record["text"], T.nilable(String)) || ""
|
||||
post_hashtags = extract_hashtags(post_text)
|
||||
has_monitored_hashtag = !(post_hashtags & @hashtags.to_a).empty?
|
||||
|
||||
next unless from_monitored_did || has_monitored_hashtag
|
||||
|
||||
deferred_job_sink = DeferredJobSink.new(self.class)
|
||||
helper = Bluesky::ProcessPostHelper.new(deferred_job_sink)
|
||||
embed_data =
|
||||
T.let(op.raw_record["embed"], T.nilable(T::Hash[String, T.untyped]))
|
||||
next unless embed_data
|
||||
unless helper.should_process_post?(embed_data)
|
||||
logger.info("skipping post: #{op.uri} - #{embed_data["$type"]}")
|
||||
reason =
|
||||
(
|
||||
if from_monitored_did
|
||||
"monitored DID"
|
||||
else
|
||||
"monitored hashtag(s): #{(post_hashtags & @hashtags.to_a).join(", ")}"
|
||||
end
|
||||
)
|
||||
logger.info(
|
||||
"skipping post from #{reason}: #{op.uri} - #{embed_data["$type"]}",
|
||||
)
|
||||
next
|
||||
end
|
||||
|
||||
reason =
|
||||
(
|
||||
if from_monitored_did
|
||||
"monitored DID"
|
||||
else
|
||||
"monitored hashtag(s): #{(post_hashtags & @hashtags.to_a).join(", ")}"
|
||||
end
|
||||
)
|
||||
logger.info("processing post from #{reason}: #{op.uri}")
|
||||
|
||||
post =
|
||||
Domain::Post::BlueskyPost.find_or_create_by!(at_uri: op.uri) do |post|
|
||||
post.rkey = op.rkey
|
||||
post.text = op.raw_record["text"]
|
||||
post.text = post_text
|
||||
post.posted_at = msg.time.in_time_zone("UTC")
|
||||
post.creator = creator_for(msg)
|
||||
post.post_raw = op.raw_record
|
||||
@@ -119,6 +151,14 @@ module Tasks::Bluesky
|
||||
end
|
||||
end
|
||||
|
||||
sig { params(text: String).returns(T::Array[String]) }
|
||||
def extract_hashtags(text)
|
||||
# Extract hashtags from text (matches #word or #word123 but not #123)
|
||||
hashtags = text.scan(/#([a-zA-Z]\w*)/).flatten
|
||||
# Convert to lowercase for case-insensitive matching
|
||||
hashtags.map(&:downcase)
|
||||
end
|
||||
|
||||
sig do
|
||||
params(msg: Skyfall::Jetstream::CommitMessage).returns(
|
||||
T.nilable(Domain::User::BlueskyUser),
|
||||
@@ -126,12 +166,16 @@ module Tasks::Bluesky
|
||||
end
|
||||
def creator_for(msg)
|
||||
did = msg.did
|
||||
Domain::User::BlueskyUser.find_or_create_by!(did:) do |creator|
|
||||
creator = Domain::User::BlueskyUser.find_or_initialize_by(did:)
|
||||
if creator.new_record?
|
||||
creator.handle = @resolver.get_validated_handle(did) || did
|
||||
logger.info(
|
||||
"created bluesky user: `#{creator.handle}` / `#{creator.did}`",
|
||||
)
|
||||
creator.save!
|
||||
Domain::Bluesky::Job::ScanUserJob.perform_later(user: creator)
|
||||
end
|
||||
creator
|
||||
end
|
||||
|
||||
sig { returns(T.nilable(Integer)) }
|
||||
@@ -165,10 +209,10 @@ module Tasks::Bluesky
|
||||
case event
|
||||
when Domain::Bluesky::MonitoredObject::ADDED_NOTIFY_CHANNEL
|
||||
@dids.add(value) if kind == "user_did"
|
||||
@hashtags.add(value) if kind == "hashtag"
|
||||
@hashtags.add(value.downcase) if kind == "hashtag"
|
||||
when Domain::Bluesky::MonitoredObject::REMOVED_NOTIFY_CHANNEL
|
||||
@dids.delete(value) if kind == "user_did"
|
||||
@hashtags.delete(value) if kind == "hashtag"
|
||||
@hashtags.delete(value.downcase) if kind == "hashtag"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -12,41 +12,68 @@ namespace :bluesky do
|
||||
DIDKit::Resolver.new.resolve_handle(handle)&.did
|
||||
end
|
||||
|
||||
desc "Add a DID to the Bluesky monitor"
|
||||
desc "Add a DID or hashtag to the Bluesky monitor"
|
||||
task add: :environment do
|
||||
if (handle = ENV["handle"])
|
||||
if (hashtag = ENV["hashtag"])
|
||||
# Remove # if provided
|
||||
hashtag = hashtag.gsub(/^#/, "")
|
||||
|
||||
Domain::Bluesky::MonitoredObject.create!(value: hashtag, kind: :hashtag)
|
||||
puts "Added hashtag: ##{hashtag}"
|
||||
elsif (handle = ENV["handle"])
|
||||
did = resolve_did(handle)
|
||||
puts "resolved did: #{did}"
|
||||
else
|
||||
did = ENV["did"]
|
||||
end
|
||||
raise "did is required" if did.blank?
|
||||
raise "did is required" if did.blank?
|
||||
|
||||
Domain::Bluesky::MonitoredObject.create!(value: did, kind: :user_did)
|
||||
user =
|
||||
Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
|
||||
user.handle = handle
|
||||
end
|
||||
Domain::Bluesky::Job::ScanUserJob.perform_later(user:)
|
||||
Domain::Bluesky::MonitoredObject.create!(value: did, kind: :user_did)
|
||||
user =
|
||||
Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
|
||||
user.handle = handle
|
||||
end
|
||||
Domain::Bluesky::Job::ScanUserJob.perform_later(user:)
|
||||
elsif (did = ENV["did"])
|
||||
raise "did is required" if did.blank?
|
||||
|
||||
Domain::Bluesky::MonitoredObject.create!(value: did, kind: :user_did)
|
||||
user =
|
||||
Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
|
||||
user.handle = nil
|
||||
end
|
||||
Domain::Bluesky::Job::ScanUserJob.perform_later(user:)
|
||||
else
|
||||
raise "hashtag, handle, or did is required"
|
||||
end
|
||||
end
|
||||
|
||||
desc "Remove a DID from the Bluesky monitor"
|
||||
desc "Remove a DID or hashtag from the Bluesky monitor"
|
||||
task remove: :environment do
|
||||
if (handle = ENV["handle"])
|
||||
if (hashtag = ENV["hashtag"])
|
||||
# Remove # if provided
|
||||
hashtag = hashtag.gsub(/^#/, "")
|
||||
|
||||
Domain::Bluesky::MonitoredObject.find_by(
|
||||
value: hashtag,
|
||||
kind: :hashtag,
|
||||
)&.destroy!
|
||||
puts "Removed hashtag: ##{hashtag}"
|
||||
elsif (handle = ENV["handle"])
|
||||
did = resolve_did(handle)
|
||||
puts "resolved did: #{did}"
|
||||
raise "did is required" if did.blank?
|
||||
|
||||
Domain::Bluesky::MonitoredObject.find_by(
|
||||
value: did,
|
||||
kind: :user_did,
|
||||
)&.destroy!
|
||||
elsif (did = ENV["did"])
|
||||
raise "did is required" if did.blank?
|
||||
|
||||
Domain::Bluesky::MonitoredObject.find_by(
|
||||
value: did,
|
||||
kind: :user_did,
|
||||
)&.destroy!
|
||||
else
|
||||
did = ENV["did"]
|
||||
raise "hashtag, handle, or did is required"
|
||||
end
|
||||
raise "did is required" if did.blank?
|
||||
|
||||
Domain::Bluesky::MonitoredObject.find_by(value: did)&.destroy!
|
||||
end
|
||||
|
||||
desc "Delete all bluesky posts/files"
|
||||
task delete_all: :environment do
|
||||
raise unless Rails.env.development?
|
||||
Domain::PostFile::BlueskyPostFile.destroy_all
|
||||
Domain::Post::BlueskyPost.destroy_all
|
||||
end
|
||||
end
|
||||
|
||||
@@ -8,6 +8,7 @@ RSpec.describe Tasks::Bluesky::Monitor do
|
||||
subject(:monitor) { described_class.new(pg_notify: false) }
|
||||
|
||||
let(:test_did) { "did:plc:test123456789" }
|
||||
let(:unmonitored_did) { "did:plc:unmonitored123" }
|
||||
let(:base_time) { Time.parse("2025-01-08 12:00:00 UTC") }
|
||||
|
||||
before do
|
||||
@@ -20,6 +21,13 @@ RSpec.describe Tasks::Bluesky::Monitor do
|
||||
did: test_did,
|
||||
handle: "testuser.bsky.social",
|
||||
)
|
||||
|
||||
# Create a Bluesky user for the unmonitored DID (for hashtag tests)
|
||||
create(
|
||||
:domain_user_bluesky_user,
|
||||
did: unmonitored_did,
|
||||
handle: "unmonitored.bsky.social",
|
||||
)
|
||||
end
|
||||
|
||||
# Helper method to create real CommitMessage objects
|
||||
|
||||
Reference in New Issue
Block a user