add periodic favs scanner
This commit is contained in:
67
app/lib/domain/fa/enqueue_due_user_favs_scans.rb
Normal file
67
app/lib/domain/fa/enqueue_due_user_favs_scans.rb
Normal file
@@ -0,0 +1,67 @@
|
||||
# typed: strict
|
||||
class Domain::Fa::EnqueueDueUserFavsScans
|
||||
extend T::Sig
|
||||
include HasColorLogger
|
||||
|
||||
QUEUE_HIGH_WATER_MARK = 100
|
||||
QUEUE_LOW_WATER_MARK = 50
|
||||
DELAY_TIME = T.let(10.seconds, ActiveSupport::Duration)
|
||||
class DateHelper
|
||||
extend ActionView::Helpers::DateHelper
|
||||
end
|
||||
|
||||
sig { void }
|
||||
def self.run
|
||||
loop do
|
||||
while (qs = queue_size) > QUEUE_LOW_WATER_MARK
|
||||
logger.info(
|
||||
"waiting for queue to drop to #{QUEUE_LOW_WATER_MARK} (currently #{qs})",
|
||||
)
|
||||
sleep DELAY_TIME.in_seconds
|
||||
end
|
||||
logger.info("queue size is #{queue_size}")
|
||||
|
||||
while (qs = queue_size) < QUEUE_LOW_WATER_MARK
|
||||
to_enqueue = QUEUE_HIGH_WATER_MARK - qs
|
||||
logger.info("queue is at #{qs}, enqueuing #{to_enqueue} due favs scans")
|
||||
Domain::User::FaUser
|
||||
.where(state: "ok")
|
||||
.order(Arel.sql "json_attributes->>'scanned_favs_at' asc nulls first")
|
||||
.limit(to_enqueue)
|
||||
.each do |user|
|
||||
logger.tagged(make_arg_tag(user)) do
|
||||
logger.info(
|
||||
format_tags(
|
||||
make_tag(
|
||||
"scanned_favs_at",
|
||||
time_ago_in_words(user.scanned_favs_at),
|
||||
),
|
||||
"enqueuing favs job",
|
||||
),
|
||||
)
|
||||
Domain::Fa::Job::FavsJob.perform_later({ user: })
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
logger.info("queue size is #{queue_size}")
|
||||
logger.info("sleeping for #{DELAY_TIME.in_seconds} seconds")
|
||||
sleep DELAY_TIME.in_seconds
|
||||
end
|
||||
end
|
||||
|
||||
sig { returns(Integer) }
|
||||
def self.queue_size
|
||||
GoodJob::Job.where(
|
||||
job_class: "Domain::Fa::Job::FavsJob",
|
||||
performed_at: nil,
|
||||
error: nil,
|
||||
).count
|
||||
end
|
||||
|
||||
sig { params(time: T.nilable(ActiveSupport::TimeWithZone)).returns(String) }
|
||||
def self.time_ago_in_words(time)
|
||||
return "never" if time.nil?
|
||||
"#{DateHelper.time_ago_in_words(time)} ago"
|
||||
end
|
||||
end
|
||||
@@ -4,6 +4,7 @@ require "active_support/concern"
|
||||
module HasColorLogger
|
||||
extend T::Sig
|
||||
extend T::Helpers
|
||||
requires_ancestor { Kernel }
|
||||
|
||||
sig { params(sink: T.any(IO, StringIO)).returns(Module) }
|
||||
def self.[](sink)
|
||||
@@ -23,61 +24,88 @@ module HasColorLogger
|
||||
|
||||
sig { params(tag_name: String, tag_value: T.untyped).returns(String) }
|
||||
def make_tag(tag_name, tag_value)
|
||||
tag_value_str = tag_value ? tag_value.to_s.bold : "(nil)".italic
|
||||
"#{tag_name}: #{tag_value_str}"
|
||||
self.class.make_tag(tag_name, tag_value)
|
||||
end
|
||||
|
||||
sig { params(tags: String).returns(String) }
|
||||
def format_tags(*tags)
|
||||
format_tags_arr(tags)
|
||||
self.class.format_tags(*T.unsafe(tags))
|
||||
end
|
||||
|
||||
sig { params(tags: T::Array[String]).returns(String) }
|
||||
def format_tags_arr(tags)
|
||||
tags.map { |tag| "[#{tag}]" }.join(" ")
|
||||
self.class.format_tags_arr(tags)
|
||||
end
|
||||
|
||||
sig do
|
||||
params(arg: T.untyped, name: T.nilable(String)).returns(T::Array[String])
|
||||
end
|
||||
def make_arg_tag(arg, name: nil)
|
||||
tags = []
|
||||
return tags if arg.nil?
|
||||
self.class.make_arg_tag(arg, name:)
|
||||
end
|
||||
|
||||
case arg
|
||||
when Domain::User
|
||||
name ||= "user"
|
||||
prefix, attr = arg.class.param_prefix_and_attribute
|
||||
tags << make_tag("#{name}.kind", prefix)
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.#{attr}", arg.send(attr))
|
||||
when Domain::Post
|
||||
name ||= "post"
|
||||
prefix, attr = arg.class.param_prefix_and_attribute
|
||||
tags << make_tag("#{name}.kind", prefix)
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.#{attr}", arg.send(attr))
|
||||
when Domain::PostFile
|
||||
name ||= "file"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.state", arg.state)
|
||||
when Domain::UserAvatar
|
||||
name ||= "avatar"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.state", arg.state)
|
||||
when Domain::PostGroup
|
||||
name ||= "pool"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
when HttpLogEntry
|
||||
name ||= "hle"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.code", arg.status_code)
|
||||
else
|
||||
tags << make_tag("unknown", arg.class.name)
|
||||
module ClassMethods
|
||||
extend T::Sig
|
||||
|
||||
sig { params(tag_name: String, tag_value: T.untyped).returns(String) }
|
||||
def make_tag(tag_name, tag_value)
|
||||
tag_value_str = tag_value ? tag_value.to_s.bold : "(nil)".italic
|
||||
"#{tag_name}: #{tag_value_str}"
|
||||
end
|
||||
|
||||
tags
|
||||
sig { params(tags: String).returns(String) }
|
||||
def format_tags(*tags)
|
||||
format_tags_arr(tags)
|
||||
end
|
||||
|
||||
sig { params(tags: T::Array[String]).returns(String) }
|
||||
def format_tags_arr(tags)
|
||||
tags.map { |tag| "[#{tag}]" }.join(" ")
|
||||
end
|
||||
|
||||
sig do
|
||||
params(arg: T.untyped, name: T.nilable(String)).returns(T::Array[String])
|
||||
end
|
||||
def make_arg_tag(arg, name: nil)
|
||||
tags = []
|
||||
return tags if arg.nil?
|
||||
|
||||
case arg
|
||||
when Domain::User
|
||||
name ||= "user"
|
||||
prefix, attr = arg.class.param_prefix_and_attribute
|
||||
tags << make_tag("#{name}.kind", prefix)
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.#{attr}", arg.send(attr))
|
||||
when Domain::Post
|
||||
name ||= "post"
|
||||
prefix, attr = arg.class.param_prefix_and_attribute
|
||||
tags << make_tag("#{name}.kind", prefix)
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.#{attr}", arg.send(attr))
|
||||
when Domain::PostFile
|
||||
name ||= "file"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.state", arg.state)
|
||||
when Domain::UserAvatar
|
||||
name ||= "avatar"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.state", arg.state)
|
||||
when Domain::PostGroup
|
||||
name ||= "pool"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
when HttpLogEntry
|
||||
name ||= "hle"
|
||||
tags << make_tag("#{name}.id", arg.id)
|
||||
tags << make_tag("#{name}.code", arg.status_code)
|
||||
else
|
||||
tags << make_tag("unknown", arg.class.name)
|
||||
end
|
||||
|
||||
tags
|
||||
end
|
||||
end
|
||||
mixes_in_class_methods(ClassMethods)
|
||||
|
||||
# by default, write to stdout
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
@@ -291,4 +291,9 @@ namespace :fa do
|
||||
batch_size: ENV["batch_size"]&.to_i,
|
||||
).run
|
||||
end
|
||||
|
||||
desc "Enqueue pending favs jobs"
|
||||
task enqueue_pending_favs: :environment do
|
||||
Domain::Fa::EnqueueDueUserFavsScans.run
|
||||
end
|
||||
end
|
||||
|
||||
16
sorbet/rbi/dsl/domain/fa/enqueue_due_user_favs_scans.rbi
generated
Normal file
16
sorbet/rbi/dsl/domain/fa/enqueue_due_user_favs_scans.rbi
generated
Normal file
@@ -0,0 +1,16 @@
|
||||
# typed: true
|
||||
|
||||
# DO NOT EDIT MANUALLY
|
||||
# This is an autogenerated file for dynamic methods in `Domain::Fa::EnqueueDueUserFavsScans`.
|
||||
# Please instead update this file by running `bin/tapioca dsl Domain::Fa::EnqueueDueUserFavsScans`.
|
||||
|
||||
|
||||
class Domain::Fa::EnqueueDueUserFavsScans
|
||||
sig { returns(ColorLogger) }
|
||||
def logger; end
|
||||
|
||||
class << self
|
||||
sig { returns(ColorLogger) }
|
||||
def logger; end
|
||||
end
|
||||
end
|
||||
@@ -43,16 +43,17 @@ RSpec.configure do |config|
|
||||
end
|
||||
end
|
||||
|
||||
config.before(:all) do
|
||||
# safeguard against running this test in a non-test environment
|
||||
root_dir =
|
||||
File.absolute_path(Rails.application.config_for("blob_file_location"))
|
||||
if root_dir.match?(%r{^#{Rails.root}/tmp})
|
||||
FileUtils.rm_rf(root_dir)
|
||||
else
|
||||
raise "blob_file_location is not in the tmp directory"
|
||||
end
|
||||
end
|
||||
# this breaks parallel tests because it's not thread safe
|
||||
# config.before(:all) do
|
||||
# # safeguard against running this test in a non-test environment
|
||||
# root_dir =
|
||||
# File.absolute_path(Rails.application.config_for("blob_file_location"))
|
||||
# if root_dir.match?(%r{^#{Rails.root}/tmp})
|
||||
# FileUtils.rm_rf(root_dir)
|
||||
# else
|
||||
# raise "blob_file_location is not in the tmp directory"
|
||||
# end
|
||||
# end
|
||||
|
||||
# rspec-expectations config goes here. You can use an alternate
|
||||
# assertion/expectation library such as wrong or the stdlib/minitest
|
||||
|
||||
Reference in New Issue
Block a user