fix progress saving for for-user backfill_favs_and_dates_task
This commit is contained in:
@@ -123,7 +123,7 @@ class Domain::Fa::Parser::Page < Domain::Fa::Parser::Base
|
||||
sig { returns(ActiveSupport::TimeZone) }
|
||||
def logged_in_user_tz
|
||||
case logged_in_user
|
||||
when "zzreg"
|
||||
when "zzreg", "cottoniq"
|
||||
ActiveSupport::TimeZone.new("America/Los_Angeles")
|
||||
when "ddwhatnow", "vipvillageworker"
|
||||
ActiveSupport::TimeZone.new("America/New_York")
|
||||
|
||||
@@ -25,14 +25,24 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
|
||||
start_at: T.nilable(String),
|
||||
log_sink: T.any(IO, StringIO),
|
||||
user_url_name: T.nilable(String),
|
||||
batch_size: T.nilable(Integer),
|
||||
).void
|
||||
end
|
||||
def initialize(mode:, start_at:, log_sink: $stderr, user_url_name: nil)
|
||||
def initialize(
|
||||
mode:,
|
||||
start_at:,
|
||||
log_sink: $stderr,
|
||||
user_url_name: nil,
|
||||
batch_size: nil
|
||||
)
|
||||
super(log_sink:)
|
||||
@mode = mode
|
||||
@start_at = T.let(get_progress(start_at&.to_s)&.to_i, T.nilable(Integer))
|
||||
@batch_size = T.let(batch_size || 32, Integer)
|
||||
|
||||
if @mode == Mode::ForUser && user_url_name.present?
|
||||
if @mode == Mode::ForUser
|
||||
unless user_url_name.present?
|
||||
raise("user_url_name is required for mode: #{@mode}")
|
||||
end
|
||||
@user =
|
||||
T.let(
|
||||
Domain::User::FaUser.find_by(url_name: user_url_name),
|
||||
@@ -40,6 +50,8 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
|
||||
)
|
||||
raise "user not found for #{user_url_name}" unless @user
|
||||
end
|
||||
|
||||
@start_at = T.let(get_progress(start_at&.to_s)&.to_i, T.nilable(Integer))
|
||||
end
|
||||
|
||||
class Stats < T::ImmutableStruct
|
||||
@@ -65,6 +77,15 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
|
||||
def self.zero
|
||||
Stats.new()
|
||||
end
|
||||
|
||||
sig { returns(String) }
|
||||
def to_s
|
||||
[
|
||||
"[user pages: #{num_user_pages}]",
|
||||
"[favs pages: #{num_favs_pages}]",
|
||||
"[total: #{favs_dates_stats.to_s}]",
|
||||
].join(" ")
|
||||
end
|
||||
end
|
||||
|
||||
sig { void }
|
||||
@@ -99,7 +120,7 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
|
||||
|
||||
query
|
||||
.includes(:response)
|
||||
.in_batches(start: @start_at, of: 32) do |batch|
|
||||
.in_batches(start: @start_at, of: @batch_size) do |batch|
|
||||
batch = batch.to_a
|
||||
batch_stats = Stats.zero
|
||||
|
||||
@@ -134,7 +155,7 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
|
||||
break if interrupted?
|
||||
end
|
||||
|
||||
log("total stats: #{total_stats}")
|
||||
log("total stats: #{total_stats.to_s}")
|
||||
end
|
||||
|
||||
sig { params(hle: HttpLogEntry).returns(Stats) }
|
||||
@@ -145,8 +166,10 @@ class Tasks::Fa::BackfillFavsAndDatesTask < Tasks::InterruptableTask
|
||||
user = Domain::User::FaUser.find_by(url_name:)
|
||||
raise "no user found for #{url_name}" unless user
|
||||
|
||||
page_parser = Domain::Fa::Parser::Page.from_log_entry(hle)
|
||||
page_parser =
|
||||
Domain::Fa::Parser::Page.from_log_entry(hle, require_logged_in: false)
|
||||
return Stats.zero if page_parser.account_disabled?
|
||||
return Stats.zero if page_parser.logged_in_user.blank?
|
||||
|
||||
ReduxApplicationRecord.transaction do
|
||||
case hle.uri_path
|
||||
|
||||
@@ -31,9 +31,11 @@ module Tasks
|
||||
end
|
||||
|
||||
if value
|
||||
log("resuming from saved progress: #{value}")
|
||||
log(
|
||||
"[progress key: #{progress_key.bold}] [resuming from: #{value.to_s.bold}]",
|
||||
)
|
||||
else
|
||||
log("no saved progress found, starting from beginning")
|
||||
log("[progress key: #{progress_key.bold}] [no saved progress]")
|
||||
end
|
||||
|
||||
value
|
||||
|
||||
@@ -137,10 +137,12 @@ namespace :fa do
|
||||
mode = ENV["mode"] || "both"
|
||||
mode = Tasks::Fa::BackfillFavsAndDatesTask::Mode.deserialize(mode)
|
||||
user_url_name = ENV["user_url_name"]
|
||||
batch_size = ENV["batch_size"]&.to_i
|
||||
Tasks::Fa::BackfillFavsAndDatesTask.new(
|
||||
mode:,
|
||||
start_at:,
|
||||
user_url_name:,
|
||||
batch_size:,
|
||||
).run
|
||||
end
|
||||
|
||||
@@ -173,12 +175,12 @@ namespace :fa do
|
||||
# end
|
||||
|
||||
desc "Enqueue pending favs jobs"
|
||||
task enqueue_pending_favs: :environment do
|
||||
task enqueue_due_user_favs: :environment do
|
||||
Tasks::Fa::EnqueueDueUserFavsScansTask.new.run
|
||||
end
|
||||
|
||||
desc "Enqueue pending page jobs"
|
||||
task enqueue_pending_user_pages: :environment do
|
||||
task enqueue_due_user_pages: :environment do
|
||||
Tasks::Fa::EnqueueDueUserPageScansTask.new.run
|
||||
end
|
||||
|
||||
|
||||
@@ -77,9 +77,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
|
||||
start_at: start_at_id,
|
||||
)
|
||||
|
||||
expect(log_sink.string).to include(
|
||||
"resuming from saved progress: #{start_at_id}",
|
||||
)
|
||||
expect(log_sink.string).to include("resuming from:")
|
||||
end
|
||||
|
||||
context "with start_at='last' and existing progress" do
|
||||
@@ -98,9 +96,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
|
||||
Tasks::CreatePostFileFingerprintsTask::Mode::PostFileDescending,
|
||||
start_at: "last",
|
||||
)
|
||||
expect(log_sink.string).to include(
|
||||
"resuming from saved progress: #{post_files[2].id}",
|
||||
)
|
||||
expect(log_sink.string).to include("resuming from:")
|
||||
end
|
||||
end
|
||||
|
||||
@@ -112,9 +108,7 @@ RSpec.describe Tasks::CreatePostFileFingerprintsTask do
|
||||
start_at: "last",
|
||||
)
|
||||
|
||||
expect(log_sink.string).to include(
|
||||
"no saved progress found, starting from beginning",
|
||||
)
|
||||
expect(log_sink.string).to include("no saved progress")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user