# typed: strict
# frozen_string_literal: true

module Tasks::Bluesky
  # Long-running task that consumes a Bluesky Jetstream feed and records posts
  # that either come from a monitored DID or contain a monitored hashtag.
  #
  # The monitored DIDs and hashtags are loaded from
  # Domain::Bluesky::MonitoredObject when the monitor is constructed and, when
  # +pg_notify+ is enabled, are updated from Postgres NOTIFY events while the
  # monitor is running.
  class Monitor
    include HasColorLogger
    extend T::Sig

    # GlobalState key under which the Jetstream cursor is persisted.
    CURSOR_KEY = "task-bluesky-jetstream-cursor-1"

    # Loads the monitored DIDs / hashtags and builds the Jetstream client,
    # resuming from the persisted cursor when one exists.
    #
    # @param pg_notify [Boolean] whether #run should start a thread that
    #   listens for Postgres NOTIFYs and updates the monitored sets.
    sig { params(pg_notify: T::Boolean).void }
    def initialize(pg_notify: true)
      @pg_notify = pg_notify
      @resolver = T.let(DIDKit::Resolver.new, DIDKit::Resolver)
      @dids = T.let(Concurrent::Set.new, Concurrent::Set)
      @hashtags = T.let(Concurrent::Set.new, Concurrent::Set)

      @dids.merge(
        Domain::Bluesky::MonitoredObject.where(kind: :user_did).pluck(:value),
      )
      # Hashtags are stored lowercased so matching is case-insensitive
      # (extract_hashtags lowercases the post side as well).
      @hashtags.merge(
        Domain::Bluesky::MonitoredObject
          .where(kind: :hashtag)
          .pluck(:value)
          .map(&:downcase),
      )

      logger.info(format_tags("loaded dids", make_tags(num_dids: @dids.size)))
      logger.info(
        format_tags(
          "loaded hashtags",
          make_tags(
            num_hashtags: @hashtags.size,
            hashtags: @hashtags.to_a.inspect,
          ),
        ),
      )

      cursor = load_cursor
      logger.info(format_tags("using cursor", make_tags(cursor:)))

      @bluesky_client =
        T.let(
          Skyfall::Jetstream.new(
            "jetstream2.us-east.bsky.network",
            {
              cursor:,
              wanted_collections: %w[
                app.bsky.feed.post
                app.bsky.embed.images
                app.bsky.embed.video
                app.bsky.embed.recordWithMedia
              ],
            },
          ),
          Skyfall::Jetstream,
        )
      @bluesky_client.user_agent = "ReFurrer/1.0 (bsky:delta.refurrer.com)"
    end

    # Registers the client callbacks, optionally starts the Postgres NOTIFY
    # listener thread, and connects to Jetstream.
    #
    # On Interrupt the client is disconnected and closed, and the listener
    # thread (if any) is interrupted and joined before returning.
    sig { void }
    def run
      @bluesky_client.on_connecting do
        logger.info(format_tags("connecting..."))
      end
      @bluesky_client.on_connect { logger.info(format_tags("connected")) }
      @bluesky_client.on_disconnect { logger.info(format_tags("disconnected")) }
      @bluesky_client.on_reconnect do
        logger.info(format_tags("connection lost, trying to reconnect..."))
      end
      @bluesky_client.on_timeout do
        logger.info(
          format_tags("connection stalled, triggering a reconnect..."),
        )
      end

      @bluesky_client.on_message do |msg|
        handle_message(msg)

        # Persist the cursor only when seq is a multiple of 10_000 rather than
        # on every message.
        if msg.seq % 10_000 == 0
          logger.info(format_tags("saving cursor", make_tags(seq: msg.seq)))
          save_cursor(msg.seq)
        end
      end

      @bluesky_client.on_error do |e|
        logger.error(format_tags("ERROR", make_tags(error: e)))
      end

      # Start the thread that listens to postgres NOTIFYs and keeps the @dids
      # and @hashtags sets up to date.
      pg_notify_thread =
        Thread.new { listen_to_postgres_notifies } if @pg_notify

      @bluesky_client.connect
    rescue Interrupt
      logger.info(format_tags("shutting down..."))
      @bluesky_client.disconnect
      @bluesky_client.close
      # The listener thread rescues Interrupt itself and runs its UNLISTEN
      # cleanup, so raise into it and wait for it to finish.
      pg_notify_thread&.raise(Interrupt)
      pg_notify_thread&.join
      logger.info(format_tags("shutdown complete"))
    end

    # Dispatches a Jetstream message; only commit messages are handled, every
    # other message class is ignored.
    sig { params(msg: Skyfall::Jetstream::Message).void }
    def handle_message(msg)
      case msg
      when Skyfall::Jetstream::CommitMessage
        handle_commit_message(msg)
      end
    end

    # Processes the post-creation operations of a commit message.
    #
    # An operation is kept when the author DID is monitored or the post text
    # contains a monitored hashtag, the record carries an "embed", and the
    # helper accepts that embed. For kept operations a
    # Domain::Post::BlueskyPost is found or created and its media processed.
    sig { params(msg: Skyfall::Jetstream::CommitMessage).void }
    def handle_commit_message(msg)
      return unless msg.type == :commit

      msg.operations.each do |op|
        next unless op.action == :create && op.type == :bsky_post

        # Check if we should process this post (either from monitored DID or
        # contains monitored hashtags). The hashtag set is snapshotted once so
        # the decision and the logged hashtags always agree, even if the
        # listener thread mutates @hashtags concurrently.
        from_monitored_did = @dids.include?(msg.did)
        post_text = T.let(op.raw_record["text"], T.nilable(String)) || ""
        matched_hashtags = extract_hashtags(post_text) & @hashtags.to_a
        next unless from_monitored_did || !matched_hashtags.empty?

        # Built only for posts that passed the filter above.
        log_args =
          if from_monitored_did
            { reason: "did", did: msg.did }
          else
            { reason: "hashtag", hashtags: matched_hashtags.inspect }
          end

        deferred_job_sink = DeferredJobSink.new(self.class)
        helper = Bluesky::ProcessPostHelper.new(deferred_job_sink)
        embed_data =
          T.let(op.raw_record["embed"], T.nilable(T::Hash[String, T.untyped]))
        next unless embed_data

        unless helper.should_process_post?(embed_data)
          logger.info(
            format_tags(
              "skipping post",
              make_tags(**log_args),
              make_tags(uri: op.uri),
              make_tags(type: embed_data["$type"]),
            ),
          )
          next
        end

        logger.info(
          format_tags(
            "processing post",
            make_tags(**log_args),
            make_tags(uri: op.uri),
          ),
        )

        # The block only runs when the record does not exist yet.
        post =
          Domain::Post::BlueskyPost.find_or_create_by!(
            at_uri: op.uri,
          ) do |new_post|
            new_post.rkey = op.rkey
            new_post.text = post_text
            new_post.posted_at = msg.time.in_time_zone("UTC")
            new_post.creator = creator_for(msg)
            new_post.post_raw = op.raw_record
            new_post.monitor_scanned_at = Time.current
          end

        helper.process_post_media(post, embed_data, msg.did)

        logger.info(
          format_tags(
            "created bluesky post",
            make_tags(rkey: post.rkey),
            make_tags(at_uri: post.at_uri),
          ),
        )
      ensure
        # Runs for every iteration, including the early `next`s; the sink is
        # nil when the operation was filtered out before it was created.
        deferred_job_sink&.enqueue_deferred_jobs!
      end
    end

    # Returns the lowercased hashtags found in +text+.
    #
    # @param text [String] post text to scan
    # @return [Array<String>] hashtag names without the leading "#", in order
    #   of appearance (duplicates are kept)
    sig { params(text: String).returns(T::Array[String]) }
    def extract_hashtags(text)
      # Extract hashtags from text (matches #word or #word123 but not #123)
      hashtags = text.scan(/#([a-zA-Z]\w*)/).flatten
      # Convert to lowercase for case-insensitive matching
      hashtags.map(&:downcase)
    end

    # Finds the Domain::User::BlueskyUser for the message's DID, creating it
    # (and enqueueing a ScanUserJob for it) when it does not exist yet.
    #
    # A new user's handle is resolved through the DID resolver and falls back
    # to the DID itself when no validated handle is returned.
    sig do
      params(msg: Skyfall::Jetstream::CommitMessage).returns(
        T.nilable(Domain::User::BlueskyUser),
      )
    end
    def creator_for(msg)
      did = msg.did
      creator = Domain::User::BlueskyUser.find_or_initialize_by(did:)
      if creator.new_record?
        creator.handle = @resolver.get_validated_handle(did) || did
        creator.save!
        # Logged after save! so a failed save does not leave a "created" entry.
        logger.info(
          format_tags(
            "created bluesky user",
            make_tags(handle: creator.handle),
            make_tags(did: creator.did),
          ),
        )
        Domain::Bluesky::Job::ScanUserJob.perform_later(user: creator)
      end
      creator
    end

    # Reads the persisted Jetstream cursor.
    #
    # @return [Integer, nil] the cursor, or nil when none has been stored
    sig { returns(T.nilable(Integer)) }
    def load_cursor
      GlobalState.get(CURSOR_KEY)&.to_i
    end

    # Persists the Jetstream cursor (stored as a string).
    sig { params(cursor: Integer).void }
    def save_cursor(cursor)
      GlobalState.set(CURSOR_KEY, cursor.to_s)
    end

    # Listens on the MonitoredObject added/removed NOTIFY channels and applies
    # each event to @dids / @hashtags. Payloads have the form "kind/value".
    #
    # Loops until interrupted; UNLISTENs on both channels on the way out.
    sig { void }
    def listen_to_postgres_notifies
      logger.info(format_tags("listening to postgres NOTIFYs"))
      ActiveRecord::Base.connection_pool.with_connection do |conn|
        conn = T.cast(conn, ActiveRecord::ConnectionAdapters::PostgreSQLAdapter)
        conn.exec_query(
          "LISTEN #{Domain::Bluesky::MonitoredObject::ADDED_NOTIFY_CHANNEL}",
        )
        conn.exec_query(
          "LISTEN #{Domain::Bluesky::MonitoredObject::REMOVED_NOTIFY_CHANNEL}",
        )

        loop do
          conn.raw_connection.wait_for_notify do |event, pid, payload|
            # NOTE(review): the payload is believed to be nil for a NOTIFY
            # sent without one; to_s keeps such an event from raising and
            # killing this thread (it is skipped by the guard below).
            kind, value = payload.to_s.split("/", 2)
            logger.info(
              format_tags(
                "NOTIFY",
                make_tags(event: event),
                make_tags(pid: pid),
                make_tags(payload: payload),
                make_tags(kind: kind),
                make_tags(value: value),
              ),
            )
            next unless kind && value

            case event
            when Domain::Bluesky::MonitoredObject::ADDED_NOTIFY_CHANNEL
              @dids.add(value) if kind == "user_did"
              @hashtags.add(value.downcase) if kind == "hashtag"
            when Domain::Bluesky::MonitoredObject::REMOVED_NOTIFY_CHANNEL
              @dids.delete(value) if kind == "user_did"
              @hashtags.delete(value.downcase) if kind == "hashtag"
            end
          end
        end
      rescue Interrupt
        logger.info(format_tags("interrupt in notify thread"))
      ensure
        logger.info(format_tags("unlistening to postgres NOTIFYs"))
        conn.exec_query(
          "UNLISTEN #{Domain::Bluesky::MonitoredObject::ADDED_NOTIFY_CHANNEL}",
        )
        conn.exec_query(
          "UNLISTEN #{Domain::Bluesky::MonitoredObject::REMOVED_NOTIFY_CHANNEL}",
        )
      end
    end
  end
end