diff --git a/app/jobs/domain/bluesky/job/scan_user_follows_job.rb b/app/jobs/domain/bluesky/job/scan_user_follows_job.rb index acec0703..a1434bb9 100644 --- a/app/jobs/domain/bluesky/job/scan_user_follows_job.rb +++ b/app/jobs/domain/bluesky/job/scan_user_follows_job.rb @@ -8,7 +8,7 @@ class Domain::Bluesky::Job::ScanUserFollowsJob < Domain::Bluesky::Job::Base def perform(args) user = user_from_args! - last_follows_scan = user.follows_scans.last + last_follows_scan = user.follows_scans.where(state: "completed").last if (ca = last_follows_scan&.created_at) && (ca > 1.month.ago) && !force_scan? logger.info( @@ -32,7 +32,8 @@ class Domain::Bluesky::Job::ScanUserFollowsJob < Domain::Bluesky::Job::Base ) end - last_followed_by_scan = user.followed_by_scans.last + last_followed_by_scan = + user.followed_by_scans.where(state: "completed").last if (ca = last_followed_by_scan&.created_at) && (ca > 1.month.ago) && !force_scan? logger.info( diff --git a/app/lib/tasks/bluesky/monitor.rb b/app/lib/tasks/bluesky/monitor.rb index 0b267074..8a5a8aa7 100644 --- a/app/lib/tasks/bluesky/monitor.rb +++ b/app/lib/tasks/bluesky/monitor.rb @@ -23,8 +23,18 @@ module Tasks::Bluesky .map(&:downcase), ) - logger.info("dids: #{@dids.to_a.join(", ")}") - logger.info("hashtags: #{@hashtags.to_a.join(", ")}") + logger.info( + format_tags("loaded dids", make_tags(num_dids: @dids.to_a.size)), + ) + logger.info( + format_tags( + "loaded hashtags", + make_tags( + num_hashtags: @hashtags.to_a.size, + hashtags: @hashtags.to_a.inspect, + ), + ), + ) @bluesky_client = T.let( @@ -48,23 +58,29 @@ module Tasks::Bluesky sig { void } def run - @bluesky_client.on_connecting { logger.info("connecting...") } - @bluesky_client.on_connect { logger.info("connected") } - @bluesky_client.on_disconnect { logger.info("disconnected") } + @bluesky_client.on_connecting do + logger.info(format_tags("connecting...")) + end + @bluesky_client.on_connect { logger.info(format_tags("connected")) } + @bluesky_client.on_disconnect { logger.info(format_tags("disconnected")) } @bluesky_client.on_reconnect do - logger.info("connection lost, trying to reconnect...") + logger.info(format_tags("connection lost, trying to reconnect...")) end @bluesky_client.on_timeout do - logger.info("connection stalled, triggering a reconnect...") + logger.info( + format_tags("connection stalled, triggering a reconnect..."), + ) end @bluesky_client.on_message do |msg| handle_message(msg) if msg.seq % 10_000 == 0 - logger.info("saving cursor: #{msg.seq.to_s.bold}") + logger.info(format_tags("saving cursor", make_tags(seq: msg.seq))) save_cursor(msg.seq) end end - @bluesky_client.on_error { |e| logger.error("ERROR: #{e.to_s.red.bold}") } + @bluesky_client.on_error do |e| + logger.error(format_tags("ERROR", make_tags(error: e))) + end # Start the thread to listen to postgres NOTIFYs to add to the @dids set pg_notify_thread = @@ -72,12 +88,12 @@ module Tasks::Bluesky @bluesky_client.connect rescue Interrupt - logger.info("shutting down...") + logger.info(format_tags("shutting down...")) @bluesky_client.disconnect @bluesky_client.close pg_notify_thread&.raise(Interrupt) pg_notify_thread&.join - logger.info("shutdown complete") + logger.info(format_tags("shutdown complete")) end sig { params(msg: Skyfall::Jetstream::Message).void } @@ -100,7 +116,17 @@ module Tasks::Bluesky post_text = T.let(op.raw_record["text"], T.nilable(String)) || "" post_hashtags = extract_hashtags(post_text) has_monitored_hashtag = !(post_hashtags & @hashtags.to_a).empty? - + log_args = + ( + if from_monitored_did + { reason: "did", did: msg.did } + else + { + reason: "hashtag", + hashtags: (post_hashtags & @hashtags.to_a).inspect, + } + end + ) next unless from_monitored_did || has_monitored_hashtag deferred_job_sink = DeferredJobSink.new(self.class) @@ -109,29 +135,24 @@ module Tasks::Bluesky T.let(op.raw_record["embed"], T.nilable(T::Hash[String, T.untyped])) next unless embed_data unless helper.should_process_post?(embed_data) - reason = - ( - if from_monitored_did - "monitored DID" - else - "monitored hashtag(s): #{(post_hashtags & @hashtags.to_a).join(", ")}" - end - ) logger.info( - "skipping post from #{reason}: #{op.uri} - #{embed_data["$type"]}", + format_tags( + "skipping post", + make_tags(**log_args), + make_tags(uri: op.uri), + make_tags(type: embed_data["$type"]), + ), ) next end - reason = - ( - if from_monitored_did - "monitored DID" - else - "monitored hashtag(s): #{(post_hashtags & @hashtags.to_a).join(", ")}" - end - ) - logger.info("processing post from #{reason}: #{op.uri}") + logger.info( + format_tags( + "processing post", + make_tags(**log_args), + make_tags(uri: op.uri), + ), + ) post = Domain::Post::BlueskyPost.find_or_create_by!(at_uri: op.uri) do |post| @@ -145,7 +166,13 @@ module Tasks::Bluesky helper.process_post_media(post, embed_data, msg.did) - logger.info("created bluesky post: `#{post.rkey}` / `#{post.at_uri}`") + logger.info( + format_tags( + "created bluesky post", + make_tags(rkey: post.rkey), + make_tags(at_uri: post.at_uri), + ), + ) ensure deferred_job_sink.enqueue_deferred_jobs! if deferred_job_sink end @@ -170,7 +197,11 @@ module Tasks::Bluesky if creator.new_record? creator.handle = @resolver.get_validated_handle(did) || did logger.info( - "created bluesky user: `#{creator.handle}` / `#{creator.did}`", + format_tags( + "created bluesky user", + make_tags(handle: creator.handle), + make_tags(did: creator.did), + ), ) creator.save! Domain::Bluesky::Job::ScanUserJob.perform_later(user: creator) @@ -190,7 +221,7 @@ module Tasks::Bluesky sig { void } def listen_to_postgres_notifies - logger.info("listening to postgres NOTIFYs") + logger.info(format_tags("listening to postgres NOTIFYs")) ActiveRecord::Base.connection_pool.with_connection do |conn| conn = T.cast(conn, ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) conn.exec_query( @@ -203,7 +234,14 @@ module Tasks::Bluesky conn.raw_connection.wait_for_notify do |event, pid, payload| kind, value = payload.split("/", 2) logger.info( - "NOTIFY: #{event} / pid: #{pid} / payload: #{payload} / kind: #{kind} / value: #{value}", + format_tags( + "NOTIFY", + make_tags(event: event), + make_tags(pid: pid), + make_tags(payload: payload), + make_tags(kind: kind), + make_tags(value: value), + ), ) next unless kind && value case event @@ -217,9 +255,9 @@ module Tasks::Bluesky end end rescue Interrupt - logger.info("interrupt in notify thread...") + logger.info(format_tags("interrupt in notify thread")) ensure - logger.info("unlistening to postgres NOTIFYs") + logger.info(format_tags("unlistening to postgres NOTIFYs")) conn.exec_query( "UNLISTEN #{Domain::Bluesky::MonitoredObject::ADDED_NOTIFY_CHANNEL}", ) diff --git a/rake/bluesky.rake b/rake/bluesky.rake index 8e64723a..cbadde11 100644 --- a/rake/bluesky.rake +++ b/rake/bluesky.rake @@ -2,16 +2,40 @@ # frozen_string_literal: true T.bind(self, T.all(Rake::DSL, Object)) +module BlueskyRakeHelpers + extend T::Sig + + sig { params(handle: String).returns(T.nilable(String)) } + def self.resolve_did(handle) + DIDKit::Resolver.new.resolve_handle(handle)&.did + end + + sig { params(did: String).returns(T.nilable(String)) } + def self.resolve_handle(did) + DIDKit::Resolver.new.resolve_did(did).get_validated_handle + end + + sig { returns(T.nilable(Domain::User::BlueskyUser)) } + def self.user_from_env + if (handle = ENV["handle"]) + did = self.resolve_did(handle) + Domain::User::BlueskyUser.find_or_create_by!(did:) do |user| + user.handle = handle + end + elsif (did = ENV["did"]) + Domain::User::BlueskyUser.find_or_create_by!(did:) do |user| + user.handle = resolve_handle(did) + end + end + end +end + namespace :bluesky do desc "Start the Bluesky monitor" task monitor: :environment do Tasks::Bluesky::Monitor.new.run end - def resolve_did(handle) - DIDKit::Resolver.new.resolve_handle(handle)&.did - end - desc "Add a DID or hashtag to the Bluesky monitor" task add: :environment do if (hashtag = ENV["hashtag"]) @@ -20,26 +44,10 @@ namespace :bluesky do Domain::Bluesky::MonitoredObject.create!(value: hashtag, kind: :hashtag) puts "Added hashtag: ##{hashtag}" - elsif (handle = ENV["handle"]) - did = resolve_did(handle) - puts "resolved did: #{did}" - raise "did is required" if did.blank? - user = - Domain::User::BlueskyUser.find_or_create_by!(did:) do |user| - user.handle = handle - end + elsif (user = BlueskyRakeHelpers.user_from_env) Domain::Bluesky::Job::ScanUserJob.perform_later(user:) Domain::Bluesky::Job::ScanPostsJob.perform_later(user:) - Domain::Bluesky::MonitoredObject.create!(value: did, kind: :user_did) - elsif (did = ENV["did"]) - raise "did is required" if did.blank? - user = - Domain::User::BlueskyUser.find_or_create_by!(did:) do |user| - user.handle = nil - end - Domain::Bluesky::Job::ScanUserJob.perform_later(user:) - Domain::Bluesky::Job::ScanPostsJob.perform_later(user:) - Domain::Bluesky::MonitoredObject.create!(value: did, kind: :user_did) + Domain::Bluesky::MonitoredObject.create!(value: user.did, kind: :user_did) else raise "hashtag, handle, or did is required" end @@ -57,7 +65,7 @@ namespace :bluesky do )&.destroy! puts "Removed hashtag: ##{hashtag}" elsif (handle = ENV["handle"]) - did = resolve_did(handle) + did = BlueskyRakeHelpers.resolve_did(handle) puts "resolved did: #{did}" raise "did is required" if did.blank? @@ -76,4 +84,24 @@ namespace :bluesky do raise "hashtag, handle, or did is required" end end + + desc "Watch users that user follows" + task watch_follows: :environment do + user = + BlueskyRakeHelpers.user_from_env || + raise("user is required, use `handle` or `did`") + Domain::Bluesky::Job::ScanUserFollowsJob.perform_now(user:) + user.reload + user.user_user_follows_from.each do |follow| + to_user = T.cast(follow.to, Domain::User::BlueskyUser) + model = + Domain::Bluesky::MonitoredObject.create( + value: to_user.did!, + kind: :user_did, + ) + if model.persisted? + puts "added #{to_user.did} / #{to_user.handle} to monitor" + end + end + end end