add twitter scraper infra

2023-03-08 18:15:05 -08:00
parent 9bd3feeac6
commit 5a4595cd57
40 changed files with 792 additions and 77 deletions

View File

@@ -12,6 +12,7 @@ Rake.application.rake_require "worker"
Rake.application.rake_require "metrics"
Rake.application.rake_require "fa"
Rake.application.rake_require "e621"
Rake.application.rake_require "twitter"
task :set_ar_stdout => :environment do
ActiveRecord::Base.logger = Logger.new(STDOUT)

View File

@@ -46,15 +46,28 @@ class LogEntriesController < ApplicationController
}
end
@by_domain_counts = Hash.new do |hash, key|
hash[key] = {
count: 0,
bytes: 0,
bytes_stored: 0,
}
end
HttpLogEntry.includes(:response).find_each(batch_size: 100, order: :desc) do |log_entry|
break if log_entry.created_at < @time_window.ago
@last_window_count += 1
@last_window_bytes += log_entry.response.size
@last_window_bytes_stored += log_entry.response.bytes_stored
content_type = log_entry.content_type.split(";").first
@content_type_counts[content_type][:count] += 1
@content_type_counts[content_type][:bytes] += log_entry.response.size
@content_type_counts[content_type][:bytes_stored] += log_entry.response.bytes_stored
@by_domain_counts[log_entry.uri_host][:count] += 1
@by_domain_counts[log_entry.uri_host][:bytes] += log_entry.response.size
@by_domain_counts[log_entry.uri_host][:bytes_stored] += log_entry.response.bytes_stored
end
end
@@ -79,8 +92,11 @@ class LogEntriesController < ApplicationController
)
elsif response.content_type =~ /text\/plain/
render plain: response.contents
elsif response.content_type =~ /text\/html/
elsif response.content_type.starts_with? "text/html"
render html: response.contents.html_safe
elsif response.content_type.starts_with? "application/json"
pretty_json = JSON.pretty_generate(JSON.parse response.contents)
render html: "<html><body><pre>#{pretty_json}</pre></body></html>".html_safe
else
render plain: "no renderer for #{response.content_type}"
end

View File

@@ -1,23 +1,25 @@
class Domain::Fa::Job::FaJobBase < Scraper::JobBase
discard_on ActiveJob::DeserializationError
DATE_HELPER = Class.new.extend(ActionView::Helpers::DateHelper)
def self.build_http_client
@@fa_base_http_client ||= begin
proxy = ENV["proxy"]
puts "Building FA HTTP client (proxy: #{proxy})"
Domain::Fa::Scraper::HttpClient.new(proxy || Scraper::CurlHttpPerformer.new)
Scraper::FaHttpClient.new(proxy || Scraper::CurlHttpPerformer.new)
end
end
def self.reset_http_client
def self.reset_clients
@@fa_base_http_client.close!
@@fa_base_http_client = nil
end
protected
def http_client
@http_client ||= self.class.build_http_client
end
def find_or_intitialize_user_from_args(args)
args[:user] || begin
fatal_error("arg 'url_name' is required if arg 'user' is nil") if args[:url_name].blank?
@@ -165,9 +167,4 @@ class Domain::Fa::Job::FaJobBase < Scraper::JobBase
end
end
end
def time_ago_in_words(maybe_time)
return "never".bold if maybe_time.nil?
"#{DATE_HELPER.time_ago_in_words(maybe_time)} ago".bold
end
end

View File

@@ -64,7 +64,7 @@ class Domain::Fa::Job::UserGalleryJob < Domain::Fa::Job::FaJobBase
folder_href += "/" unless folder_href.end_with?("/")
folder_href = "/" + folder_href unless folder_href.start_with?("/")
page_url = "https://www.furaffinity.net#{folder_href}#{page_number}?perpage=72"
response = @http_client.get(page_url, caused_by_entry: @caused_by_entry)
response = http_client.get(page_url, caused_by_entry: @caused_by_entry)
@first_gallery_page_entry ||= response.log_entry
@caused_by_entry = @first_gallery_page_entry

View File

@@ -16,7 +16,7 @@ class Domain::Fa::Job::UserPageJob < Domain::Fa::Job::FaJobBase
return
end
response = @http_client.get(
response = http_client.get(
"https://www.furaffinity.net/user/#{@user.url_name}/",
caused_by_entry: @caused_by_entry,
)

View File

@@ -0,0 +1,33 @@
class Domain::Twitter::Job::MediaJob < Domain::Twitter::Job::TwitterJobBase
queue_as :twitter_static_file
ignore_signature_args [:caused_by_entry]
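# Downloads a single tweet media file and attaches the resulting HttpLogEntry
# to the Domain::Twitter::Media record; media that already have a file are skipped.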
def perform(args)
logger.level = :warn
@media = args[:media]
@caused_by_entry = args[:caused_by_entry]
logger.prefix = "[tweet #{@media.tweet_id.to_s.bold} / media #{@media.id.bold}]"
if @media.file
logger.warn("already have file, skipping")
return
end
response = http_client.get(
@media.url_str,
caused_by_entry: @caused_by_entry,
)
logger.debug "#{HexUtil.humansize(response.log_entry.response.size)} / " +
"#{response.log_entry.content_type} / " +
"#{response.log_entry.response_time_ms} ms"
if response.status_code != 200
fatal_error("#{response.status_code} getting media")
end
@media.file = response.log_entry
@media.save
end
end

View File

@@ -0,0 +1,41 @@
class Domain::Twitter::Job::TwitterJobBase < Scraper::JobBase
discard_on ActiveJob::DeserializationError
def self.build_http_client
@@twitter_http_client ||= begin
proxy = ENV["proxy"]
puts "Building Twitter HTTP client (proxy: #{proxy})"
Scraper::TwitterHttpClient.new(proxy || Scraper::CurlHttpPerformer.new)
end
end
def self.build_gallery_dl_client
@@gallery_dl_client ||= begin
proxy = ENV["proxy"]
puts "Building GalleryDlClient (TODO: proxy: #{proxy})"
Scraper::GalleryDlClient.new("http://localhost:5000/")
end
end
def find_or_initialize_user_from_args(args)
if args[:user]
args[:user]
elsif args[:id].present?
Domain::Twitter::User.find_or_initialize_by(id: args[:id])
elsif args[:name].present?
Domain::Twitter::User.find_or_initialize_by(name: args[:name])
else
fatal_error("arg 'name' or 'id' is required if arg 'user' is nil")
end
end
protected
def http_client
@http_client ||= self.class.build_http_client
end
def gallery_dl_client
@gallery_dl_client ||= self.class.build_gallery_dl_client
end
end

View File

@@ -0,0 +1,144 @@
class Domain::Twitter::Job::UserTimelineTweetsJob < Domain::Twitter::Job::TwitterJobBase
GDL = Scraper::GalleryDlClient
queue_as :twitter_timeline_tweets
ignore_signature_args [:caused_by_entry]
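# Walks a user's tweet timeline via the gallery-dl sidecar, creating Tweet and
# Media rows for anything new and enqueueing a MediaJob for each new media file.
# Stops early once it reaches tweets covered by a previous scan (unless force_scan).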
def perform(args)
@user = find_or_initialize_user_from_args(args)
logger.prefix = "[user: #{(@user.name || args[:name])&.bold}]"
@caused_by_entry = args[:caused_by_entry]
@force_scan = !!args[:force_scan]
@first_twitter_caused_by = nil
@updated_user_from_tweet = false
@num_scanned_tweets = 0
@num_created_tweets = 0
@num_created_medias = 0
logger.info("start tweet timeline scan (force: #{@force_scan.to_s.bold})")
if !@force_scan && !@user.due_for_timeline_tweets_scan?
logger.warn("scanned #{time_ago_in_words(@user.scanned_timeline_at)}, skipping")
return
end
gallery_dl_client.start_twitter_user(
@user.name, caused_by_entry: @caused_by_entry,
)
while true
event = gallery_dl_client.next_message(
caused_by_entry: @first_twitter_caused_by || @caused_by_entry,
)
case event
when GDL::StartEvent
logger.info("start tweets(#{event.extractor})")
when GDL::FinishEvent
logger.info("end tweets - #{@num_created_tweets} total")
break
when GDL::HttpRequestEvent
@first_twitter_caused_by ||= event.log_entry
when GDL::TweetEvent
update_user_from_tweet(event)
break if handle_tweet_event(event) == :break
when GDL::TweetMediaEvent
handle_media_event(event)
else
raise("unhandled event #{event}")
end
end
logger.info(
"created #{@num_created_tweets.to_s.bold} / " +
"scanned #{@num_scanned_tweets.to_s.bold} tweets, " +
"created #{@num_created_medias} medias"
)
@user.scanned_timeline_at = Time.now
@user.save!
end
private
def handle_tweet_event(tweet_event)
tweet_hash = tweet_event.tweet
tweet = Domain::Twitter::Tweet.find_by(id: tweet_hash[:id])
@num_scanned_tweets += 1
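# Reaching a tweet we already have means the rest of the timeline was covered
# by an earlier scan: stop paginating, unless force_scan is set or this user
# has never completed a timeline scan before.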
if tweet
logger.info("found existing tweet: #{tweet.id}")
return nil if @force_scan
return :break if @user.scanned_timeline_at
return nil
end
logger.info("+ tweet (#{@num_created_tweets.to_s.bold}) #{tweet_hash[:id].to_s.bold}")
Domain::Twitter::Tweet.new({
id: tweet_hash[:id],
author: @user,
content: tweet_hash[:content],
reply_to_tweet_id: tweet_hash[:reply_to],
tweeted_at: Time.at(tweet_hash[:date]),
}).save!
@num_created_tweets += 1
rescue
binding.pry
end
def handle_media_event(media_event)
media = Domain::Twitter::Media.find_by(id: media_event.filename)
if media
logger.info("skip existing media #{media.id}")
enqueue_media_file(media) unless media.file
return
end
logger.info("+ media (#{@num_created_medias.to_s.bold}) #{media_event.filename.bold}")
media = Domain::Twitter::Media.new({
id: media_event.filename,
tweet_id: media_event.tweet_id,
url_str: media_event.file_url,
})
media.save!
@num_created_medias += 1
enqueue_media_file(media)
rescue
binding.pry
end
def enqueue_media_file(media)
enqueue_job(Domain::Twitter::Job::MediaJob, {
media: media || raise,
caused_by_entry: @first_twitter_caused_by,
}, { priority: self.priority })
end
def update_user_from_tweet(tweet_event)
return if @updated_user_from_tweet
author = tweet_event.author
if @user.id
raise unless @user.id == author[:id]
else
@user.id = author[:id]
end
@user.name = author[:name]
@user.nick = author[:nick]
@user.description = author[:description]
@user.num_fav_count = author[:favourites_count]
@user.num_followers = author[:followers_count]
@user.num_following = author[:friends_count]
@user.registered_at = Time.at(author[:date])
@user.save!
@updated_user_from_tweet = true
rescue
binding.pry
end
end

View File

@@ -7,9 +7,8 @@ class Scraper::JobBase < ApplicationJob
@ignore_signature_args
end
def initialize(http_client = nil)
@http_client = http_client || self.class.build_http_client
super
def self.reset_clients
# for subclasses
end
def write_point(name, tags: {}, fields: {})
@@ -27,9 +26,9 @@ class Scraper::JobBase < ApplicationJob
start = Time.now
block.call
rescue Net::ReadTimeout => e
logger.error "Net::ReadTimeout - resetting http client"
logger.error "Net::ReadTimeout - resetting clients"
error = e
job.class.reset_http_client
job.class.reset_clients
raise
rescue Exception => e
error = e
@@ -57,9 +56,10 @@ class Scraper::JobBase < ApplicationJob
raise msg.uncolorize
end
protected
DATE_HELPER = Class.new.extend(ActionView::Helpers::DateHelper)
def http_client
@http_client
def time_ago_in_words(maybe_time)
return "never".bold if maybe_time.nil?
"#{DATE_HELPER.time_ago_in_words(maybe_time)} ago".bold
end
end

View File

@@ -15,11 +15,18 @@ class ColorLogger
klass_name = "Fa::#{klass_name}"
end
Logger.new(sink).tap do |logger|
prefix = ""
if klass_name.start_with?("Twitter::Job::")
klass_name.delete_prefix!("Twitter::Job::")
klass_name.delete_suffix!("Job")
klass_name = "Twitter::#{klass_name}"
end
Logger.new(sink).tap do |logger|
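# Keep the prefix in an instance variable on the logger: a singleton `def`
# does not capture surrounding locals, so assigning a plain local here would
# never be visible to the formatter proc below.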
def logger.prefix=(p)
prefix = p
@logger_prefix = p
end
def logger.prefix
@logger_prefix || ""
end
logger.formatter = proc do |severity, datetime, progname, msg|
@@ -30,7 +37,7 @@ class ColorLogger
end
klass_name_str = "[#{klass_name.send(color)}]".ljust(32)
[klass_name_str, prefix, msg].reject(&:blank?).join(" ") + "\n"
[klass_name_str, logger.prefix, msg].reject(&:blank?).join(" ") + "\n"
end
end
end

View File

@@ -52,21 +52,6 @@ module LiteTrail::ActiveRecordClassMethods
autosave: false
end
after_create do
if self.respond_to?(:created_at)
model_created_at = self.created_at
else
model_created_at = Time.now
end
self.versions << lite_trail_class.new({
event: "create",
item: self,
schema_version: schema_version,
created_at: model_created_at,
})
end
after_update do
changes = self.saved_changes
if changes.any?

View File

@@ -1,4 +1,4 @@
class Domain::Fa::Scraper::HttpClient < Scraper::BaseHttpClient
class Scraper::FaHttpClient < Scraper::BaseHttpClient
DEFAULT_ALLOWED_DOMAINS = [
"*.furaffinity.net",
"*.facdn.net",

View File

@@ -0,0 +1,155 @@
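# Thin JSON-RPC client (via Ripcord) for the local gallery-dl sidecar. It pulls
# scrape events one at a time and mirrors each underlying HTTP request into an
# HttpLogEntry so gallery-dl traffic is logged like direct scraper traffic.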
class Scraper::GalleryDlClient
include HasColorLogger
StartEvent = Struct.new(:url, :extractor)
FinishEvent = Struct.new(:ignore)
HttpRequestEvent = Struct.new(
:method,
:url,
:kwargs,
:requested_at,
:request_headers,
:response_headers,
:response_code,
:response_time_ms,
:body,
:log_entry,
)
TweetEvent = Struct.new(
:tweet,
:author
)
TweetMediaEvent = Struct.new(
:tweet_id,
:file_url,
:filename,
:media_num,
:extension,
:height,
:width,
)
def initialize(host)
@client = Ripcord::Client.new(host)
@max_cache_size = 8
@blob_entry_cache = Hash.new do |hash, key|
hash[key] = LegacyImport::AdaptiveCache.new(@max_cache_size, 1.0, 0.1)
end
end
def start_twitter_user(username, caused_by_entry: nil)
rpc = @client.call("start_user", ["https://twitter.com/#{username}/tweets"])
raise unless rpc.successful?
decode_message(rpc.result, caused_by_entry)
end
def next_message(caused_by_entry: nil)
rpc = @client.call("next_message", [])
raise unless rpc.successful?
decode_message(rpc.result, caused_by_entry)
end
private
def decode_message(response, caused_by_entry)
case response[:event]
when "start" then StartEvent.new(response[:url], response[:extractor])
when "finish" then FinishEvent.new(nil)
when "http_request"
http_request = response[:http_request]
event = HttpRequestEvent.new(
http_request[:method],
http_request[:url],
http_request[:kwargs],
Time.at(http_request[:requested_at]),
http_request[:kwargs][:headers],
http_request[:response_headers],
http_request[:status_code],
(http_request[:duration] * 1000).to_i,
Base64.decode64(http_request[:content_base64]),
nil
)
log_http_request_event(event, caused_by_entry)
event
when "tweet" then TweetEvent.new(response[:tweet], response[:author])
when "tweet_media"
media = response[:media]
TweetMediaEvent.new(
media[:tweet_id],
media[:file_url],
media[:filename],
media[:media_num],
media[:extension],
media[:height],
media[:width],
)
end
end
def log_http_request_event(http_event, caused_by_entry)
request_headers = http_event.request_headers
response_headers = http_event.response_headers
content_type = response_headers[:"Content-Type"] ||
response_headers[:"content-type"] ||
raise("no content type provided: #{response_headers}")
url = http_event.url
uri = Addressable::URI.parse(url)
if http_event.kwargs && http_event.kwargs[:params] && http_event.kwargs[:params][:variables]
uri.query = JSON.parse(http_event.kwargs[:params][:variables]).to_query
end
url = uri.to_s
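# Gather candidate base blobs for BlobEntry.find_or_build: recently stored
# responses for the same host/content-type, plus the causing entry's response
# and its base. The cached candidates are skipped until the cache is full and
# ~5% of the time thereafter, so new base blobs still get inserted.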
cache_key = "#{uri.host}|#{content_type}"
blob_entry_cache = @blob_entry_cache[cache_key]
candidates = if blob_entry_cache.at_capacity? && rand(0..100) >= 5
blob_entry_cache.candidates
else
[]
end
candidates << caused_by_entry.response if caused_by_entry&.response
candidates << caused_by_entry.response.base if caused_by_entry&.response&.base
retries = 0
begin
response_blob_entry = BlobEntry.find_or_build(
content_type: content_type,
contents: http_event.body,
candidates: candidates,
)
log_entry = HttpLogEntry.new({
uri: url,
verb: http_event.method.downcase,
content_type: content_type,
status_code: http_event.response_code,
request_headers: HttpLogEntryHeader.find_or_build(headers: request_headers),
response_headers: HttpLogEntryHeader.find_or_build(headers: response_headers),
response: response_blob_entry,
response_time_ms: http_event.response_time_ms,
requested_at: http_event.requested_at,
caused_by_entry: caused_by_entry,
performed_by: "direct-gdl",
})
log_entry.save!
rescue
retries += 1
retry if retries < 2
raise
end
logger.debug "insert http log entry #{log_entry.id.to_s.bold}"
http_event.log_entry = log_entry
if response_blob_entry.base_sha256
blob_entry_cache.reward(HexUtil.bin2hex(response_blob_entry.base_sha256)[0..8])
else
blob_entry_cache.insert(
HexUtil.bin2hex(response_blob_entry.sha256)[0..8], response_blob_entry, url
)
end
end
end

View File

@@ -24,7 +24,7 @@ class Scraper::ProxyHttpPerformer < Scraper::HttpPerformer
def close!
begin
@client.instance_variable_get("@http_client").finish
@client.instance_variable_get("@http_client")&.finish
rescue IOError => e
Rails.logger.error("Failed to close http client: #{e.inspect}")
end

View File

@@ -0,0 +1,30 @@
class Scraper::TwitterHttpClient < Scraper::BaseHttpClient
DEFAULT_ALLOWED_DOMAINS = [
"*.twimg.com",
"ipinfo.io",
]
def initialize(http_performer_or_proxy)
# if http_performer_or_proxy.is_a?(String)
# @cookies = Rails.application.config.x.cookies.fa[http_performer_or_proxy]
# else
# @cookies = Rails.application.config.x.cookies.fa["direct"]
# end
super(http_performer_or_proxy)
end
def cookies
[]
end
def ratelimit
[
["*.twimg.com", 0.5],
["*", 1],
]
end
def allowed_domains
DEFAULT_ALLOWED_DOMAINS
end
end

View File

@@ -94,7 +94,9 @@ class BlobEntry < ReduxApplicationRecord
DIFFABLE_CONTENT_TYPES = [
/text\/html/,
/text\/plain/,
/application\/json/,
]
def self.build_record(content_type:, sha256:, contents:, candidates: [])
record = BlobEntry.new(sha256: sha256, content_type: content_type, size: contents.size)

View File

@@ -0,0 +1,13 @@
class Domain::Twitter::Media < ReduxApplicationRecord
self.table_name = "domain_twitter_medias"
self.primary_key = :id
enum state: %i[ok error]
after_initialize do
self.state_detail ||= {}
self.raw_data ||= {}
end
belongs_to :tweet, class_name: "Domain::Twitter::Tweet"
belongs_to :file, class_name: "HttpLogEntry", optional: true
end

View File

@@ -0,0 +1,17 @@
class Domain::Twitter::Tweet < ReduxApplicationRecord
self.table_name = "domain_twitter_tweets"
belongs_to :author,
class_name: "::Domain::Twitter::User"
has_many :medias,
class_name: "Domain::Twitter::Media",
foreign_key: :tweet_id
enum state: %i[ok error]
after_initialize do
self.state ||= "ok"
self.state_detail ||= {}
self.raw_data ||= {}
end
end

View File

@@ -0,0 +1,22 @@
class Domain::Twitter::User < ReduxApplicationRecord
self.table_name = "domain_twitter_users"
has_lite_trail(schema_version: 1, separate_versions_table: true)
has_many :tweets,
class_name: "Domain::Twitter::Tweet",
foreign_key: "author_id"
has_many :medias, through: :tweets
enum state: %i[ok error]
validates_presence_of(:name)
after_initialize do
self.state ||= "ok"
self.state_detail ||= {}
self.raw_data ||= {}
end
def due_for_timeline_tweets_scan?
scanned_timeline_at.nil? || scanned_timeline_at < 1.week.ago
end
end

View File

@@ -2,7 +2,13 @@ class HttpLogEntry < ReduxApplicationRecord
include ImmutableModel
enum verb: %i[get post], _prefix: true
enum performed_by: %i[direct legacy proxy-1 dedipath-1], _prefix: true
enum performed_by: %i[
direct
legacy
proxy-1
dedipath-1
direct-gdl
], _prefix: true
belongs_to :response,
foreign_key: :response_sha256,

View File

@@ -6,15 +6,18 @@ class HttpLogEntryHeader < ReduxApplicationRecord
raise("must be a hash") unless headers.is_a?(Hash)
headers = headers.dup
scrub_header(headers, "date")
scrub_header(headers, "expires")
scrub_header(headers, "etag")
scrub_header(headers, "content-length")
scrub_header(headers, "cache-control")
if headers["last-modified"]
headers["last-modified"].gsub!(/\d\d:\d\d:\d\d/, "(scrubbed)")
end
scrub_header(headers, "perf")
scrub_header(headers, "x-connection-hash")
scrub_header(headers, "x-transaction-id")
scrub_header(headers, "x-rate-limit-remaining")
scrub_header(headers, "x-rate-limit-reset")
scrub_header(headers, "x-response-time")
scrub_datetime_header(headers, "expires")
scrub_datetime_header(headers, "last-modified")
scrub_datetime_header(headers, "date")
if headers["cf-ray"]
ray = headers["cf-ray"].split("-")
@@ -45,6 +48,12 @@ class HttpLogEntryHeader < ReduxApplicationRecord
hash[key] = "(scrubbed)" if hash[key]
end
def self.scrub_datetime_header(hash, key)
if hash[key]
hash[key].gsub!(/\d\d:\d\d:\d\d/, "(scrubbed)")
end
end
def to_bulk_insert_hash
{
sha256: sha256,

View File

@@ -46,6 +46,7 @@
<%= HexUtil.humansize(@last_window_bytes_stored) %> bytes stored (<%= stored_ratio %>x) -
<%= HexUtil.humansize(@last_window_bytes / @time_window.in_seconds) %>/sec
</h3>
<h2>By content type</h2>
<table>
<% @content_type_counts.sort_by { |_ignore, stats| -stats[:count] }.each do |content_type, stats| %>
<tr>
@@ -57,3 +58,15 @@
</tr>
<% end %>
</table>
<h2>By domain</h2>
<table>
<% @by_domain_counts.sort_by { |_ignore, stats| -stats[:bytes] }.each do |domain, stats| %>
<tr>
<td><%= domain %></td>
<td><%= stats[:count] %> requests</td>
<td><%= HexUtil.humansize(stats[:bytes]) %> transferred</td>
<td><%= HexUtil.humansize(stats[:bytes_stored]) %> stored</td>
<td><%= (stats[:bytes_stored].to_f / stats[:bytes]).round(2) %>x storage ratio</td>
</tr>
<% end %>
</table>

View File

@@ -2,6 +2,8 @@
# All other migrations PT provides are optional.
class CreateVersions < ActiveRecord::Migration[7.0]
def change
raise("boom")
create_table :versions do |t|
t.string :item_type, null: false
t.bigint :item_id, null: false

View File

@@ -1,5 +1,7 @@
class CreateDomainFaPosts < ActiveRecord::Migration[7.0]
def change
raise("boom")
create_table :domain_fa_posts do |t|
t.integer :fa_id
t.index :fa_id, unique: true

View File

@@ -1,5 +1,7 @@
class CreateDomainFaUsers < ActiveRecord::Migration[7.0]
def change
raise("boom")
create_table :domain_fa_users do |t|
t.string :name, null: false, index: { unique: true }
t.string :full_name

View File

@@ -1,5 +1,7 @@
class CreateDelayedJobs < ActiveRecord::Migration[7.0]
def self.up
raise("boom")
create_table :delayed_jobs do |table|
table.integer :priority, default: 0, null: false # Allows some jobs to jump to the front of the queue
table.integer :attempts, default: 0, null: false # Provides for retries, but still fail eventually.
@@ -17,6 +19,8 @@ class CreateDelayedJobs < ActiveRecord::Migration[7.0]
end
def self.down
raise("boom")
drop_table :delayed_jobs
end
end

View File

@@ -1,5 +1,6 @@
class CreateDomainE621Posts < ActiveRecord::Migration[7.0]
def change
raise("boom!")
create_table :domain_e621_posts do |t|
t.integer :e621_id, null: false
t.integer :state, null: false
@@ -25,10 +26,11 @@ class CreateDomainE621Posts < ActiveRecord::Migration[7.0]
t.references :file
t.references :parent_e621
t.datetime :deleted_at
t.timestamps
t.index :e621_id, unique: :true
t.index :md5, unique: :true
t.index :md5, where: "deleted_at is null", unique: :true
end
create_versions_table :domain_e621_posts

View File

@@ -1,5 +1,7 @@
class CreateDomainE621Tags < ActiveRecord::Migration[7.0]
def change
raise("boom")
create_table :domain_e621_tags do |t|
t.string :name, null: false
t.integer :e621_id

View File

@@ -1,5 +1,7 @@
class CreateDomainE621Taggings < ActiveRecord::Migration[7.0]
def change
raise("boom")
create_table :domain_e621_taggings do |t|
t.integer :type
t.references :post

View File

@@ -1,10 +0,0 @@
class AddDeletedAtToE6Posts < ActiveRecord::Migration[7.0]
def change
change_table :domain_e621_posts do |t|
t.datetime :deleted_at
end
remove_index :domain_e621_posts, :md5
add_index :domain_e621_posts, :md5, where: "deleted_at is null", unique: true
end
end

View File

@@ -0,0 +1,82 @@
class CreateTwitterTweets < ActiveRecord::Migration[7.0]
def change
raise("boom")
# for twitter tables -
# state - enum (ok, error)
# state_detail - json, contents depend on state; grab-bag of debugging info
# raw_data - json, more or less the raw pre-interpreted data surfaced
# from gallery-dl-server
create_table :domain_twitter_users do |t|
t.integer :state
t.json :state_detail
t.json :raw_data
t.string :name, null: false
t.string :nick
t.string :description
t.integer :num_fav_count
t.integer :num_followers
t.integer :num_following
t.datetime :registered_at
t.datetime :scanned_timeline_at
t.timestamps
t.index :name, unique: true
end
create_versions_table :domain_twitter_users
create_table :domain_twitter_tweets do |t|
t.integer :state
t.json :state_detail
t.json :raw_data
t.references :author
t.string :content
t.references :reply_to_tweet # another tweet id
t.timestamp :tweeted_at
t.timestamps
end
add_foreign_key(
:domain_twitter_tweets,
:domain_twitter_users,
column: :author_id,
validate: true,
)
create_table :domain_twitter_medias, id: false do |t|
# id (primary key) - the media filename from Twitter, e.g. FjTpW15VEAA7FKz
t.string :id, null: false
t.index :id, unique: true
t.integer :state
t.json :state_detail
t.json :raw_data
# url of the file to request
t.string :url_str
# fk of the tweet this media belongs to
t.references :tweet, null: false
t.references :file # null file -> not yet downloaded
t.timestamps
end
add_foreign_key(
:domain_twitter_medias,
:domain_twitter_tweets,
column: :tweet_id,
validate: true,
)
add_foreign_key(
:domain_twitter_medias,
:http_log_entries,
column: :file_id,
validate: true,
)
end
end

db/schema.rb (generated, 63 lines changed)
View File

@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.0].define(version: 2023_03_06_181250) do
ActiveRecord::Schema[7.0].define(version: 2023_03_08_044852) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_stat_statements"
enable_extension "pg_trgm"
@@ -76,9 +76,9 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_06_181250) do
t.jsonb "tags_array"
t.bigint "file_id"
t.bigint "parent_e621_id"
t.datetime "deleted_at"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.datetime "deleted_at"
t.index ["e621_id"], name: "index_domain_e621_posts_on_e621_id", unique: true
t.index ["file_id"], name: "index_domain_e621_posts_on_file_id"
t.index ["md5"], name: "index_domain_e621_posts_on_md5", unique: true, where: "(deleted_at IS NULL)"
@@ -155,6 +155,61 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_06_181250) do
t.index ["url_name"], name: "index_domain_fa_users_on_url_name", unique: true
end
create_table "domain_twitter_medias", id: false, force: :cascade do |t|
t.string "id", null: false
t.integer "state"
t.json "state_detail"
t.json "raw_data"
t.string "url_str"
t.bigint "tweet_id", null: false
t.bigint "file_id"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["file_id"], name: "index_domain_twitter_medias_on_file_id"
t.index ["id"], name: "index_domain_twitter_medias_on_id", unique: true
t.index ["tweet_id"], name: "index_domain_twitter_medias_on_tweet_id"
end
create_table "domain_twitter_tweets", force: :cascade do |t|
t.integer "state"
t.json "state_detail"
t.json "raw_data"
t.bigint "author_id"
t.string "content"
t.bigint "reply_to_tweet_id"
t.datetime "tweeted_at", precision: nil
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["author_id"], name: "index_domain_twitter_tweets_on_author_id"
t.index ["reply_to_tweet_id"], name: "index_domain_twitter_tweets_on_reply_to_tweet_id"
end
create_table "domain_twitter_user_versions", force: :cascade do |t|
t.bigint "item_id"
t.integer "schema_version"
t.string "event", null: false
t.jsonb "diff"
t.datetime "created_at", null: false
t.index ["item_id"], name: "index_domain_twitter_user_versions_on_item_id"
end
create_table "domain_twitter_users", force: :cascade do |t|
t.integer "state"
t.json "state_detail"
t.json "raw_data"
t.string "name", null: false
t.string "nick"
t.string "description"
t.integer "num_fav_count"
t.integer "num_followers"
t.integer "num_following"
t.datetime "registered_at"
t.datetime "scanned_timeline_at"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["name"], name: "index_domain_twitter_users_on_name", unique: true
end
create_table "flat_sst_entries", id: false, force: :cascade do |t|
t.binary "key", null: false
t.binary "contents", null: false
@@ -216,6 +271,10 @@ ActiveRecord::Schema[7.0].define(version: 2023_03_06_181250) do
add_foreign_key "domain_e621_post_versions", "domain_e621_posts", column: "item_id"
add_foreign_key "domain_fa_posts", "domain_fa_users", column: "creator_id"
add_foreign_key "domain_fa_posts", "http_log_entries", column: "file_id"
add_foreign_key "domain_twitter_medias", "domain_twitter_tweets", column: "tweet_id"
add_foreign_key "domain_twitter_medias", "http_log_entries", column: "file_id"
add_foreign_key "domain_twitter_tweets", "domain_twitter_users", column: "author_id"
add_foreign_key "domain_twitter_user_versions", "domain_twitter_users", column: "item_id"
add_foreign_key "http_log_entries", "blob_entries", column: "response_sha256", primary_key: "sha256"
add_foreign_key "http_log_entries", "http_log_entries", column: "caused_by_id"
add_foreign_key "http_log_entries", "http_log_entry_headers", column: "request_headers_id"

View File

@@ -33,14 +33,21 @@ after_worker_shutdown do |worker_info|
"with process id #{worker_info.process_id}"
end
{
queues = {
default: 1,
fa_user_page: 1,
fa_user_gallery: 1,
fa_post: 3,
manual: 4,
static_file: 6,
}.each do |queue, workers|
twitter_static_file: 4,
}
# if ENV["proxy"] == "direct" || ENV["proxy"].nil?
# queues[:twitter_timeline_tweets] = 1
# end
queues.each do |queue, workers|
worker_group(queue) do |g|
g.read_ahead = 8
g.sleep_delay = 5

pool_twitter.rb (new file, 53 lines)
View File

@@ -0,0 +1,53 @@
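# Worker pool dedicated to the Twitter timeline queue. It is only expected to
# run on the direct (non-proxied) host: the queue is enabled when the proxy
# env var is unset or "direct".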
# This runs in the master process after it preloads the app
after_preload_app do
puts "Master #{Process.pid} preloaded app"
# Don't hang on to database connections from the master after we've
# completed initialization
ActiveRecord::Base.connection_pool.disconnect!
end
# This runs in the worker processes after it has been forked
on_worker_boot do |worker_info|
ActiveJob::Base.logger = Logger.new("/dev/null")
Delayed::Worker.logger = Logger.new(STDOUT)
Delayed::Worker.logger.level = :error
Rails.logger = Logger.new(STDOUT)
puts "Worker #{Process.pid} started"
sleep rand(1..5)
# Reconnect to the database
ActiveRecord::Base.establish_connection
end
# This runs in the master process after a worker starts
after_worker_boot do |worker_info|
puts "Master #{Process.pid} booted worker #{worker_info.name} with " \
"process id #{worker_info.process_id}"
end
# This runs in the master process after a worker shuts down
after_worker_shutdown do |worker_info|
puts "Master #{Process.pid} detected dead worker #{worker_info.name} " \
"with process id #{worker_info.process_id}"
end
queues = {
# twitter_static_file: 4,
}
if ENV["proxy"] == "direct" || ENV["proxy"].nil?
queues[:twitter_timeline_tweets] = 1
end
queues.each do |queue, workers|
worker_group(queue) do |g|
g.read_ahead = 8
g.sleep_delay = 5
g.workers = workers
g.queues = [queue.to_s]
end
end
preload_app

View File

@@ -15,6 +15,7 @@ namespace :fa do
Domain::Fa::Job::BrowsePageJob.
set(priority: -20, queue: "manual").
perform_later({})
puts "enqueued browse page job"
end
desc "run a single post scan job"

rake/twitter.rake (new file, 22 lines)
View File

@@ -0,0 +1,22 @@
namespace :twitter do
desc "scan timeline of a user"
task :timeline => [:set_logger_stdout, :environment] do |t, args|
force_scan = ENV["force_scan"] || false
name = ENV["name"] || raise("must provide name")
Domain::Twitter::Job::UserTimelineTweetsJob.
set(priority: -10).
perform_later({ name: name, force_scan: force_scan })
puts "timeline for #{name}"
end
task :timeline_file => [:set_logger_stdout, :environment] do
file = ENV["file"]
names = File.read(file).split("\n").map(&:strip).map(&:chomp)
names.each do |name|
Domain::Twitter::Job::UserTimelineTweetsJob.
set(priority: -10).
perform_later({ name: name, force_scan: false })
puts "timeline for #{name}"
end
end
end
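# Example invocations (user name and file path below are placeholders):
#   rake twitter:timeline name=some_user force_scan=1
#   rake twitter:timeline_file file=tmp/twitter_usernames.txt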

View File

@@ -1,8 +1,8 @@
require "rails_helper"
describe Domain::Fa::Scraper::HttpClient do
describe Scraper::FaHttpClient do
it "creates an http log entry" do
client = Domain::Fa::Scraper::HttpClient.new(SpecUtil.mock_http_performer(
client = Scraper::FaHttpClient.new(SpecUtil.mock_http_performer(
"https://www.furaffinity.net/",
request_headers: Hash,
response_code: 200,

View File

@@ -10,19 +10,13 @@ class Domain::E621::PostTest < ActiveSupport::TestCase
})
assert post.valid?, post.errors.full_messages
assert post.save
assert_equal 1, post.versions.length
check_create = proc {
created_at_version = post.versions.last
assert_equal "create", created_at_version.event
assert_equal post.created_at, created_at_version.created_at
}
check_create.call
assert_equal 0, post.versions.length
post.reload
check_create.call
assert_equal 0, post.versions.length
check_update = proc {
assert_equal 1, post.versions.length
updated_rating_version = post.versions.last
assert_equal "update", updated_rating_version.event
assert_equal post.updated_at, updated_rating_version.created_at

View File

@@ -28,12 +28,12 @@ class Domain::Fa::PostTest < ActiveSupport::TestCase
assert_match /with a handful that have special edits/, new_post.description
new_post.save!
assert_equal 1, new_post.versions.count
assert_equal 0, new_post.versions.count
assert new_post.file.id
new_post.num_views = 1000
new_post.save!
assert_equal 2, new_post.versions.count
assert_equal 1, new_post.versions.count
old_post = new_post.versions.last.reify
assert_equal 904, old_post.num_views

View File

@@ -9,6 +9,6 @@ class HttpLogEntryHeaderTest < ActiveSupport::TestCase
},
)
assert_equal "bar", header.headers["foo"]
assert_equal "(scrubbed)", header.headers["date"]
assert_equal "Tue, 15 Nov 1994 (scrubbed) GMT", header.headers["date"]
end
end