tw_id for users
This commit is contained in:
@@ -20,12 +20,12 @@ class Domain::Twitter::Job::TwitterJobBase < Scraper::JobBase
|
||||
# Resolves the Domain::Twitter::User a job should operate on from its args.
#
# Lookup order, as written below: args[:user] is returned as-is when given;
# otherwise the first present key among :id, :tw_id and :name is passed to
# Domain::Twitter::User.find_or_initialize_by. When none is present,
# fatal_error is called.
#
# NOTE(review): this text sits under a diff hunk header, so some adjacent
# lines may be the before/after versions of the same statement (for example
# the :id and :tw_id branches) -- confirm against the real file.
def find_or_intitialize_user_from_args(args)
|
||||
if args[:user]
|
||||
args[:user]
|
||||
elsif args[:id].present?
|
||||
Domain::Twitter::User.find_or_initialize_by(id: args[:id])
|
||||
elsif args[:tw_id].present?
|
||||
Domain::Twitter::User.find_or_initialize_by(tw_id: args[:tw_id])
|
||||
elsif args[:name].present?
|
||||
Domain::Twitter::User.find_or_initialize_by(name: args[:name])
|
||||
else
|
||||
# NOTE(review): two fatal_error calls appear back to back with different
# messages; presumably the first is the replaced line and only the second
# is live -- confirm.
fatal_error("arg 'name' or 'id' is required if arg 'user' is nil")
|
||||
fatal_error("arg 'name' or 'tw_id' is required if arg 'user' is nil")
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -5,10 +5,11 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
|
||||
ignore_signature_args [:caused_by_entry]
|
||||
|
||||
def perform(args)
|
||||
@name = args[:name]
|
||||
@user = find_or_intitialize_user_from_args(args)
|
||||
logger.prefix = "[user: #{(@user.name || args[:name])&.bold}]"
|
||||
logger.prefix = proc { "[user: #{(@user.name || args[:name])&.bold}]" }
|
||||
|
||||
@proxy_name = Rails.application.config.x.proxy_name
|
||||
@proxy_name = Rails.application.config.x.proxy_name.to_s
|
||||
@caused_by_entry = args[:caused_by_entry]
|
||||
@force_scan = !!args[:force_scan]
|
||||
@first_twitter_caused_by = nil
|
||||
@@ -29,7 +30,7 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
|
||||
end
|
||||
|
||||
gallery_dl_client.start_twitter_user(
|
||||
@user.name, caused_by_entry: @caused_by_entry,
|
||||
@name || @user.name, caused_by_entry: @caused_by_entry,
|
||||
)
|
||||
|
||||
while true
|
||||
@@ -45,6 +46,7 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
|
||||
break
|
||||
when GDL::HttpRequestEvent
|
||||
@first_twitter_caused_by ||= event.log_entry
|
||||
maybe_extract_user_info(event)
|
||||
when GDL::TweetEvent
|
||||
update_user_from_tweet(event)
|
||||
break if handle_tweet_event(event) == :break
|
||||
@@ -66,15 +68,52 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
|
||||
rescue => e
|
||||
if e.message =~ /Tweets are protected/
|
||||
@user.state = "error"
|
||||
@user.state_detail[:error] = e.message
|
||||
@user.tweets_protected_error_proxies << @proxy_name
|
||||
@user.tweets_protected_error_proxies.uniq!
|
||||
@user.tweets_protected_error_proxies.sort!
|
||||
@user.save!
|
||||
raise "tweets are protected, tried on proxies: #{@user.tweets_protected_error_proxies}"
|
||||
elsif e.message =~ /Requested user could not be found/
|
||||
logger.error("user could not be found: #{@user.name}")
|
||||
@user.state = "error"
|
||||
@user.state_detail[:error] = e.message
|
||||
@user.save!
|
||||
# for users which can't be found, no point in attempting to re-scan
|
||||
return
|
||||
end
|
||||
raise e
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Best-effort extraction of the Twitter user id (tw_id) from an HTTP response
# observed while scraping.
#
# Does nothing when @user already has an id, when the response is not a 200
# with an "application/json" content type, or when the payload does not hold
# data.user.result with __typename "User" and a numeric rest_id.
#
# When a Domain::Twitter::User with that tw_id already exists, @user is
# replaced by it and @updated_user_from_tweet is reset to false. Otherwise the
# id is stored on @user, or checked against the tw_id @user already carries.
#
# @param http_event [#response_code, #response_headers, #body]
# @return [void]
# @raise [RuntimeError] "tw_id mismatch" when @user has a different tw_id
# @raise [JSON::ParserError] when a JSON-typed body is not valid JSON
def maybe_extract_user_info(http_event)
  return if @user&.id
  return unless http_event.response_code == 200

  # A response without a content-type header is treated as "not JSON"
  # instead of failing on nil.
  content_type = http_event.response_headers[:'content-type'].to_s
  return unless content_type.start_with?("application/json")

  json = JSON.parse(http_event.body)
  # Only an object payload can carry the user result; digging string keys
  # into an array or scalar would raise.
  return unless json.is_a?(Hash)

  result = json.dig("data", "user", "result")
  return unless result.is_a?(Hash) && result["__typename"] == "User"

  # Strict base-10 parse: a missing or malformed rest_id is skipped rather
  # than silently becoming 0.
  user_id = Integer(result["rest_id"].to_s, 10, exception: false)
  return unless user_id

  existing = Domain::Twitter::User.find_by(tw_id: user_id)
  if existing
    logger.info "found existing user model"
    @user = existing
    @updated_user_from_tweet = false
  elsif @user.tw_id
    raise "tw_id mismatch" unless @user.tw_id == user_id
  else
    logger.info "extracted tw_id for user"
    @user.tw_id = user_id
  end
end
|
||||
|
||||
def handle_tweet_event(tweet_event)
|
||||
tweet_hash = tweet_event.tweet
|
||||
tweet = Domain::Twitter::Tweet.find_by(id: tweet_hash[:id])
|
||||
@@ -83,6 +122,8 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
|
||||
if tweet
|
||||
logger.info("found existing tweet: #{tweet.id}")
|
||||
return nil if @force_scan
|
||||
# if we've done a full timeline scan, and now encountered a tweet we've
|
||||
# already seen, stop scanning
|
||||
return :break if @user.scanned_timeline_at
|
||||
return nil
|
||||
end
|
||||
@@ -131,12 +172,8 @@ class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::Twitte
|
||||
return if @updated_user_from_tweet
|
||||
|
||||
author = tweet_event.author
|
||||
|
||||
if @user.id
|
||||
raise unless @user.id == author[:id]
|
||||
else
|
||||
@user.id = author[:id]
|
||||
end
|
||||
raise("no tw_id on user") unless @user.tw_id
|
||||
raise("tw_id mismatch: #{@user.tw_id} != #{author[:id]}") unless @user.tw_id == author[:id]
|
||||
|
||||
@user.name = author[:name]
|
||||
@user.nick = author[:nick]
|
||||
|
||||
@@ -37,7 +37,11 @@ class ColorLogger
|
||||
end
|
||||
|
||||
klass_name_str = "[#{klass_name.send(color)}]".ljust(32)
|
||||
[klass_name_str, logger.prefix, msg].reject(&:blank?).join(" ") + "\n"
|
||||
prefix = logger.prefix
|
||||
if prefix.is_a?(Proc)
|
||||
prefix = prefix.call
|
||||
end
|
||||
[klass_name_str, prefix, msg].reject(&:blank?).join(" ") + "\n"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -2,7 +2,9 @@ class Domain::Twitter::Tweet < ReduxApplicationRecord
|
||||
self.table_name = "domain_twitter_tweets"
|
||||
|
||||
belongs_to :author,
|
||||
class_name: "::Domain::Twitter::User"
|
||||
class_name: "::Domain::Twitter::User",
|
||||
foreign_key: :author_id,
|
||||
primary_key: :tw_id
|
||||
|
||||
has_many :medias,
|
||||
class_name: "Domain::Twitter::Media",
|
||||
|
||||
@@ -4,7 +4,8 @@ class Domain::Twitter::User < ReduxApplicationRecord
|
||||
|
||||
has_many :tweets,
|
||||
class_name: "Domain::Twitter::Tweet",
|
||||
foreign_key: "author_id"
|
||||
foreign_key: "author_id",
|
||||
primary_key: "tw_id"
|
||||
|
||||
has_many :medias, through: :tweets
|
||||
|
||||
@@ -18,9 +19,10 @@ class Domain::Twitter::User < ReduxApplicationRecord
|
||||
|
||||
# Returns the value stored under state_detail["tweets_protected_error_proxies"],
# first setting it to an empty array when nothing truthy is stored there.
def tweets_protected_error_proxies
|
||||
self.state_detail["tweets_protected_error_proxies"] ||= []
|
||||
# Read the key back so the return value is whatever state_detail now holds.
self.state_detail["tweets_protected_error_proxies"]
|
||||
end
|
||||
|
||||
# True when scanned_timeline_at is nil or older than the cutoff below.
def due_for_timeline_tweets_scan?
|
||||
# NOTE(review): two variants of the check follow (1.week.ago, then 1.day.ago).
# This text sits under a diff hunk header, so presumably the first is the
# replaced line -- confirm which cutoff is live. As written, the last
# expression is the one whose value is returned.
scanned_timeline_at.nil? || scanned_timeline_at < 1.week.ago
|
||||
scanned_timeline_at.nil? || scanned_timeline_at < 1.day.ago
|
||||
end
|
||||
end
|
||||
|
||||
@@ -12,6 +12,10 @@ class CreateTwitterTweets < ActiveRecord::Migration[7.0]
|
||||
t.json :state_detail
|
||||
t.json :raw_data
|
||||
|
||||
# twitter id associated with the user
|
||||
t.integer :tw_id
|
||||
t.index :tw_id, unique: true
|
||||
|
||||
t.string :name, null: false
|
||||
t.string :nick
|
||||
t.string :description
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
PROXY = ENV["proxy"]
|
||||
VALID_PROXIES = ["direct", "proxy-1", "dedipath-1", "serverhost-1"]
|
||||
raise("'proxy' env must be one of #{VALID_PROXIES}") unless VALID_PROXIES.include?(ENV["proxy"])
|
||||
raise("'proxy' env must be one of #{VALID_PROXIES}") unless VALID_PROXIES.include?(PROXY)
|
||||
|
||||
# This runs in the master process after it preloads the app
|
||||
after_preload_app do
|
||||
@@ -43,7 +44,7 @@ worker_configs = [
|
||||
workers: 3,
|
||||
},
|
||||
{
|
||||
name: "twitter timelines / fa user pages / galleries",
|
||||
name: "twitter / fa galleries",
|
||||
queues: [:twitter_timeline_tweets, :fa_user_page, :fa_user_gallery],
|
||||
workers: 2,
|
||||
},
|
||||
@@ -60,7 +61,7 @@ worker_configs = [
|
||||
]
|
||||
|
||||
worker_configs.each do |config|
|
||||
worker_group(config[:name]) do |g|
|
||||
worker_group("#{PROXY} - #{config[:name]}") do |g|
|
||||
g.read_ahead = 8
|
||||
g.sleep_delay = 5
|
||||
g.workers = config[:workers]
|
||||
|
||||
Reference in New Issue
Block a user