Improve logging and add rake task for monitoring user follows
- Enhanced logging format in scan_user_follows_job and monitor tasks using format_tags
- Added new rake task 'bluesky:watch_follows' to monitor users that a given user follows
- Improved log formatting consistency across Bluesky monitoring components
This commit is contained in:
@@ -8,7 +8,7 @@ class Domain::Bluesky::Job::ScanUserFollowsJob < Domain::Bluesky::Job::Base
|
||||
def perform(args)
|
||||
user = user_from_args!
|
||||
|
||||
last_follows_scan = user.follows_scans.last
|
||||
last_follows_scan = user.follows_scans.where(state: "completed").last
|
||||
if (ca = last_follows_scan&.created_at) && (ca > 1.month.ago) &&
|
||||
!force_scan?
|
||||
logger.info(
|
||||
@@ -32,7 +32,8 @@ class Domain::Bluesky::Job::ScanUserFollowsJob < Domain::Bluesky::Job::Base
|
||||
)
|
||||
end
|
||||
|
||||
last_followed_by_scan = user.followed_by_scans.last
|
||||
last_followed_by_scan =
|
||||
user.followed_by_scans.where(state: "completed").last
|
||||
if (ca = last_followed_by_scan&.created_at) && (ca > 1.month.ago) &&
|
||||
!force_scan?
|
||||
logger.info(
|
||||
|
||||
@@ -23,8 +23,18 @@ module Tasks::Bluesky
|
||||
.map(&:downcase),
|
||||
)
|
||||
|
||||
logger.info("dids: #{@dids.to_a.join(", ")}")
|
||||
logger.info("hashtags: #{@hashtags.to_a.join(", ")}")
|
||||
logger.info(
|
||||
format_tags("loaded dids", make_tags(num_dids: @dids.to_a.size)),
|
||||
)
|
||||
logger.info(
|
||||
format_tags(
|
||||
"loaded hashtags",
|
||||
make_tags(
|
||||
num_hashtags: @hashtags.to_a.size,
|
||||
hashtags: @hashtags.to_a.inspect,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
@bluesky_client =
|
||||
T.let(
|
||||
@@ -48,23 +58,29 @@ module Tasks::Bluesky
|
||||
|
||||
sig { void }
|
||||
def run
|
||||
@bluesky_client.on_connecting { logger.info("connecting...") }
|
||||
@bluesky_client.on_connect { logger.info("connected") }
|
||||
@bluesky_client.on_disconnect { logger.info("disconnected") }
|
||||
@bluesky_client.on_connecting do
|
||||
logger.info(format_tags("connecting..."))
|
||||
end
|
||||
@bluesky_client.on_connect { logger.info(format_tags("connected")) }
|
||||
@bluesky_client.on_disconnect { logger.info(format_tags("disconnected")) }
|
||||
@bluesky_client.on_reconnect do
|
||||
logger.info("connection lost, trying to reconnect...")
|
||||
logger.info(format_tags("connection lost, trying to reconnect..."))
|
||||
end
|
||||
@bluesky_client.on_timeout do
|
||||
logger.info("connection stalled, triggering a reconnect...")
|
||||
logger.info(
|
||||
format_tags("connection stalled, triggering a reconnect..."),
|
||||
)
|
||||
end
|
||||
@bluesky_client.on_message do |msg|
|
||||
handle_message(msg)
|
||||
if msg.seq % 10_000 == 0
|
||||
logger.info("saving cursor: #{msg.seq.to_s.bold}")
|
||||
logger.info(format_tags("saving cursor", make_tags(seq: msg.seq)))
|
||||
save_cursor(msg.seq)
|
||||
end
|
||||
end
|
||||
@bluesky_client.on_error { |e| logger.error("ERROR: #{e.to_s.red.bold}") }
|
||||
@bluesky_client.on_error do |e|
|
||||
logger.error(format_tags("ERROR", make_tags(error: e)))
|
||||
end
|
||||
|
||||
# Start the thread to listen to postgres NOTIFYs to add to the @dids set
|
||||
pg_notify_thread =
|
||||
@@ -72,12 +88,12 @@ module Tasks::Bluesky
|
||||
|
||||
@bluesky_client.connect
|
||||
rescue Interrupt
|
||||
logger.info("shutting down...")
|
||||
logger.info(format_tags("shutting down..."))
|
||||
@bluesky_client.disconnect
|
||||
@bluesky_client.close
|
||||
pg_notify_thread&.raise(Interrupt)
|
||||
pg_notify_thread&.join
|
||||
logger.info("shutdown complete")
|
||||
logger.info(format_tags("shutdown complete"))
|
||||
end
|
||||
|
||||
sig { params(msg: Skyfall::Jetstream::Message).void }
|
||||
@@ -100,7 +116,17 @@ module Tasks::Bluesky
|
||||
post_text = T.let(op.raw_record["text"], T.nilable(String)) || ""
|
||||
post_hashtags = extract_hashtags(post_text)
|
||||
has_monitored_hashtag = !(post_hashtags & @hashtags.to_a).empty?
|
||||
|
||||
log_args =
|
||||
(
|
||||
if from_monitored_did
|
||||
{ reason: "did", did: msg.did }
|
||||
else
|
||||
{
|
||||
reason: "hashtag",
|
||||
hashtags: (post_hashtags & @hashtags.to_a).inspect,
|
||||
}
|
||||
end
|
||||
)
|
||||
next unless from_monitored_did || has_monitored_hashtag
|
||||
|
||||
deferred_job_sink = DeferredJobSink.new(self.class)
|
||||
@@ -109,29 +135,24 @@ module Tasks::Bluesky
|
||||
T.let(op.raw_record["embed"], T.nilable(T::Hash[String, T.untyped]))
|
||||
next unless embed_data
|
||||
unless helper.should_process_post?(embed_data)
|
||||
reason =
|
||||
(
|
||||
if from_monitored_did
|
||||
"monitored DID"
|
||||
else
|
||||
"monitored hashtag(s): #{(post_hashtags & @hashtags.to_a).join(", ")}"
|
||||
end
|
||||
)
|
||||
logger.info(
|
||||
"skipping post from #{reason}: #{op.uri} - #{embed_data["$type"]}",
|
||||
format_tags(
|
||||
"skipping post",
|
||||
make_tags(**log_args),
|
||||
make_tags(uri: op.uri),
|
||||
make_tags(type: embed_data["$type"]),
|
||||
),
|
||||
)
|
||||
next
|
||||
end
|
||||
|
||||
reason =
|
||||
(
|
||||
if from_monitored_did
|
||||
"monitored DID"
|
||||
else
|
||||
"monitored hashtag(s): #{(post_hashtags & @hashtags.to_a).join(", ")}"
|
||||
end
|
||||
logger.info(
|
||||
format_tags(
|
||||
"processing post",
|
||||
make_tags(**log_args),
|
||||
make_tags(uri: op.uri),
|
||||
),
|
||||
)
|
||||
logger.info("processing post from #{reason}: #{op.uri}")
|
||||
|
||||
post =
|
||||
Domain::Post::BlueskyPost.find_or_create_by!(at_uri: op.uri) do |post|
|
||||
@@ -145,7 +166,13 @@ module Tasks::Bluesky
|
||||
|
||||
helper.process_post_media(post, embed_data, msg.did)
|
||||
|
||||
logger.info("created bluesky post: `#{post.rkey}` / `#{post.at_uri}`")
|
||||
logger.info(
|
||||
format_tags(
|
||||
"created bluesky post",
|
||||
make_tags(rkey: post.rkey),
|
||||
make_tags(at_uri: post.at_uri),
|
||||
),
|
||||
)
|
||||
ensure
|
||||
deferred_job_sink.enqueue_deferred_jobs! if deferred_job_sink
|
||||
end
|
||||
@@ -170,7 +197,11 @@ module Tasks::Bluesky
|
||||
if creator.new_record?
|
||||
creator.handle = @resolver.get_validated_handle(did) || did
|
||||
logger.info(
|
||||
"created bluesky user: `#{creator.handle}` / `#{creator.did}`",
|
||||
format_tags(
|
||||
"created bluesky user",
|
||||
make_tags(handle: creator.handle),
|
||||
make_tags(did: creator.did),
|
||||
),
|
||||
)
|
||||
creator.save!
|
||||
Domain::Bluesky::Job::ScanUserJob.perform_later(user: creator)
|
||||
@@ -190,7 +221,7 @@ module Tasks::Bluesky
|
||||
|
||||
sig { void }
|
||||
def listen_to_postgres_notifies
|
||||
logger.info("listening to postgres NOTIFYs")
|
||||
logger.info(format_tags("listening to postgres NOTIFYs"))
|
||||
ActiveRecord::Base.connection_pool.with_connection do |conn|
|
||||
conn = T.cast(conn, ActiveRecord::ConnectionAdapters::PostgreSQLAdapter)
|
||||
conn.exec_query(
|
||||
@@ -203,7 +234,14 @@ module Tasks::Bluesky
|
||||
conn.raw_connection.wait_for_notify do |event, pid, payload|
|
||||
kind, value = payload.split("/", 2)
|
||||
logger.info(
|
||||
"NOTIFY: #{event} / pid: #{pid} / payload: #{payload} / kind: #{kind} / value: #{value}",
|
||||
format_tags(
|
||||
"NOTIFY",
|
||||
make_tags(event: event),
|
||||
make_tags(pid: pid),
|
||||
make_tags(payload: payload),
|
||||
make_tags(kind: kind),
|
||||
make_tags(value: value),
|
||||
),
|
||||
)
|
||||
next unless kind && value
|
||||
case event
|
||||
@@ -217,9 +255,9 @@ module Tasks::Bluesky
|
||||
end
|
||||
end
|
||||
rescue Interrupt
|
||||
logger.info("interrupt in notify thread...")
|
||||
logger.info(format_tags("interrupt in notify thread"))
|
||||
ensure
|
||||
logger.info("unlistening to postgres NOTIFYs")
|
||||
logger.info(format_tags("unlistening to postgres NOTIFYs"))
|
||||
conn.exec_query(
|
||||
"UNLISTEN #{Domain::Bluesky::MonitoredObject::ADDED_NOTIFY_CHANNEL}",
|
||||
)
|
||||
|
||||
@@ -2,16 +2,40 @@
|
||||
# frozen_string_literal: true
|
||||
T.bind(self, T.all(Rake::DSL, Object))
|
||||
|
||||
# Helpers shared by the `bluesky:*` rake tasks for turning the `handle` / `did`
# environment variables into DIDs and Bluesky user records.
module BlueskyRakeHelpers
  extend T::Sig

  # Resolves a handle to its DID through DIDKit.
  #
  # Returns nil when the resolver yields no result for the handle.
  sig { params(handle: String).returns(T.nilable(String)) }
  def self.resolve_did(handle)
    DIDKit::Resolver.new.resolve_handle(handle)&.did
  end

  # Resolves a DID through DIDKit and returns whatever `get_validated_handle`
  # reports for it (nilable per the signature).
  sig { params(did: String).returns(T.nilable(String)) }
  def self.resolve_handle(did)
    DIDKit::Resolver.new.resolve_did(did).get_validated_handle
  end

  # Finds or creates the user selected by the `handle` or `did` environment
  # variable; `handle` takes precedence when both are set.
  #
  # Blank values are treated as absent, so `handle=` or `did=` with an empty
  # value does not look up (or create) a user keyed on an empty DID.
  #
  # Returns nil when neither variable is provided.
  # Raises RuntimeError when the handle cannot be resolved to a DID.
  sig { returns(T.nilable(Domain::User::BlueskyUser)) }
  def self.user_from_env
    if (handle = ENV["handle"].presence)
      did = resolve_did(handle)
      # Without this guard a failed resolution would run
      # `find_or_create_by!(did: nil)` and match or create an unrelated record.
      raise "could not resolve did for handle: #{handle}" if did.blank?

      Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
        user.handle = handle
      end
    elsif (did = ENV["did"].presence)
      Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
        user.handle = resolve_handle(did)
      end
    end
  end
end
|
||||
|
||||
namespace :bluesky do
|
||||
desc "Start the Bluesky monitor"
task monitor: :environment do
  # Build the monitor, then hand control to it.
  monitor = Tasks::Bluesky::Monitor.new
  monitor.run
end
|
||||
|
||||
# Resolves a handle to its DID through DIDKit; nil when the resolver
# yields no result for the handle.
def resolve_did(handle)
  resolved = DIDKit::Resolver.new.resolve_handle(handle)
  resolved&.did
end
|
||||
|
||||
desc "Add a DID or hashtag to the Bluesky monitor"
|
||||
task add: :environment do
|
||||
if (hashtag = ENV["hashtag"])
|
||||
@@ -20,26 +44,10 @@ namespace :bluesky do
|
||||
|
||||
Domain::Bluesky::MonitoredObject.create!(value: hashtag, kind: :hashtag)
|
||||
puts "Added hashtag: ##{hashtag}"
|
||||
elsif (handle = ENV["handle"])
|
||||
did = resolve_did(handle)
|
||||
puts "resolved did: #{did}"
|
||||
raise "did is required" if did.blank?
|
||||
user =
|
||||
Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
|
||||
user.handle = handle
|
||||
end
|
||||
elsif (user = BlueskyRakeHelpers.user_from_env)
|
||||
Domain::Bluesky::Job::ScanUserJob.perform_later(user:)
|
||||
Domain::Bluesky::Job::ScanPostsJob.perform_later(user:)
|
||||
Domain::Bluesky::MonitoredObject.create!(value: did, kind: :user_did)
|
||||
elsif (did = ENV["did"])
|
||||
raise "did is required" if did.blank?
|
||||
user =
|
||||
Domain::User::BlueskyUser.find_or_create_by!(did:) do |user|
|
||||
user.handle = nil
|
||||
end
|
||||
Domain::Bluesky::Job::ScanUserJob.perform_later(user:)
|
||||
Domain::Bluesky::Job::ScanPostsJob.perform_later(user:)
|
||||
Domain::Bluesky::MonitoredObject.create!(value: did, kind: :user_did)
|
||||
Domain::Bluesky::MonitoredObject.create!(value: user.did, kind: :user_did)
|
||||
else
|
||||
raise "hashtag, handle, or did is required"
|
||||
end
|
||||
@@ -57,7 +65,7 @@ namespace :bluesky do
|
||||
)&.destroy!
|
||||
puts "Removed hashtag: ##{hashtag}"
|
||||
elsif (handle = ENV["handle"])
|
||||
did = resolve_did(handle)
|
||||
did = BlueskyRakeHelpers.resolve_did(handle)
|
||||
puts "resolved did: #{did}"
|
||||
raise "did is required" if did.blank?
|
||||
|
||||
@@ -76,4 +84,24 @@ namespace :bluesky do
|
||||
raise "hashtag, handle, or did is required"
|
||||
end
|
||||
end
|
||||
|
||||
desc "Watch users that user follows"
task watch_follows: :environment do
  # The user comes from the `handle` / `did` environment variables.
  user = BlueskyRakeHelpers.user_from_env
  raise("user is required, use `handle` or `did`") unless user

  # Run the follows scan inline, then reload the user before reading
  # its follow records.
  Domain::Bluesky::Job::ScanUserFollowsJob.perform_now(user:)
  user.reload

  user.user_user_follows_from.each do |follow|
    followed = T.cast(follow.to, Domain::User::BlueskyUser)
    # Non-bang `create`: a record that fails to save is skipped silently
    # (presumably DIDs that are already monitored; confirm against the
    # model's validations).
    monitored =
      Domain::Bluesky::MonitoredObject.create(
        value: followed.did!,
        kind: :user_did,
      )
    next unless monitored.persisted?

    puts "added #{followed.did} / #{followed.handle} to monitor"
  end
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user