Files
redux-scraper/app/helpers/log_entries_helper.rb
Dylan Knutson 572c61cebb add proxies
2025-07-23 04:51:44 +00:00

388 lines
11 KiB
Ruby

# typed: strict
module LogEntriesHelper
extend T::Sig
extend T::Helpers
include HelpersInterface
abstract!
sig { params(content_type: String).returns(T::Boolean) }
# True when the content type is a renderable image, video, or audio type, or
# Flash. Checks run in that order and stop at the first match.
def is_send_data_content_type?(content_type)
  return true if is_renderable_image_type?(content_type)
  return true if is_renderable_video_type?(content_type)
  return true if is_renderable_audio_type?(content_type)

  is_flash_content_type?(content_type)
end
sig { params(uri_path: String).returns(T::Array[[String, String]]) }
# Splits a "/"-separated path and, skipping the first segment, pairs every
# remaining segment with the path prefix that ends at it. Each prefix gets a
# trailing "/" except the one belonging to the final segment.
#
# Example: "/a/b/c" => [["a", "/a/"], ["b", "/a/b/"], ["c", "/a/b/c"]]
def path_iterative_parts(uri_path)
  segments = uri_path.split("/")
  final_index = segments.length - 1

  segments.each_with_index.drop(1).map do |segment, index|
    prefix = segments.take(index + 1).join("/")
    prefix += "/" unless index == final_index
    [segment, prefix]
  end
end
sig { params(content_type: String).returns(T.nilable(String)) }
# Chooses the file extension for a thumbnail of the given content type.
# Returns nil when the content type is not thumbable, "gif" when the content
# type's own extension is "gif", and "jpeg" in every other case.
def thumbnail_extension_for_content_type(content_type)
  return nil unless is_thumbable_content_type?(content_type)

  extension_for_content_type(content_type) == "gif" ? "gif" : "jpeg"
end
sig { params(content_type: String).returns(T.nilable(String)) }
# Maps a content type to a file extension, or nil when no mapping is known.
# Anything from the first ";" onward (e.g. a charset parameter) is ignored.
#
# Rack's MIME table is consulted first, inverted so it can be looked up by
# media type; the first character of the matching key is dropped (presumably
# the leading dot of an extension such as ".png"). The pattern table below is
# only reached for media types Rack does not list, and is checked in order.
def extension_for_content_type(content_type)
  media_type = content_type.split(";").first
  return nil if media_type.nil?

  rack_extension = Rack::Mime::MIME_TYPES.invert[media_type]
  return rack_extension[1..] unless rack_extension.nil?

  fallback_extensions = {
    %r{image/jpeg} => "jpeg",
    %r{image/jpg} => "jpg",
    %r{image/png} => "png",
    %r{image/gif} => "gif",
    %r{video/webm} => "webm",
    %r{audio/mpeg} => "mp3",
    %r{audio/mp3} => "mp3",
    %r{audio/wav} => "wav",
    %r{application/pdf} => "pdf",
    %r{application/rtf} => "rtf",
    %r{application/msword} => "doc",
    %r{application/vnd\.openxmlformats-officedocument\.wordprocessingml\.document} => "docx",
    %r{application/vnd\.oasis\.opendocument\.text} => "odt",
  }
  fallback_extensions.each do |pattern, extension|
    return extension if pattern.match?(media_type)
  end
  nil
end
sig { params(content_type: String).returns(T::Boolean) }
# True when the content type begins with one of the renderable image MIME
# types: image/jpeg, image/jpg, image/png, or image/gif.
def is_renderable_image_type?(content_type)
  content_type.start_with?("image/jpeg", "image/jpg", "image/png", "image/gif")
end
sig { params(content_type: String).returns(T::Boolean) }
# True when the content type begins with "application/json"; trailing
# parameters such as a charset do not affect the result.
def is_json_content_type?(content_type)
  json_prefix = "application/json"
  content_type.start_with?(json_prefix)
end
sig { params(content_type: String).returns(T::Boolean) }
# True when the content type begins with one of the document MIME types
# treated as rich text: PDF, RTF, MS Word (.doc/.docx), OpenDocument text,
# or plain text.
def is_rich_text_content_type?(content_type)
  rich_text_prefixes = %w[
    application/pdf
    application/rtf
    application/msword
    text/plain
    application/vnd.oasis.opendocument.text
    application/vnd.openxmlformats-officedocument.wordprocessingml.document
  ]
  content_type.start_with?(*rich_text_prefixes)
end
sig { params(content_type: String).returns(T::Boolean) }
# True when the content type begins with "video/mp4" or "video/webm".
def is_renderable_video_type?(content_type)
  content_type.start_with?("video/mp4", "video/webm")
end
sig { params(content_type: String).returns(T::Boolean) }
# True when the content type begins with one of the renderable audio MIME
# types: audio/mpeg, audio/mp3, audio/wav, or audio/ogg.
def is_renderable_audio_type?(content_type)
  audio_prefixes = %w[audio/mpeg audio/mp3 audio/wav audio/ogg]
  content_type.start_with?(*audio_prefixes)
end
sig { params(content_type: String).returns(T::Boolean) }
# True when "application/x-shockwave-flash" appears anywhere in the content
# type. The pattern is not anchored, so it need not be at the start.
def is_flash_content_type?(content_type)
  flash_pattern = %r{application/x-shockwave-flash}
  flash_pattern.match?(content_type)
end
sig { params(content_type: String).returns(T::Boolean) }
# True when the content type is a renderable video or a renderable image.
# The video check runs first.
def is_thumbable_content_type?(content_type)
  return true if is_renderable_video_type?(content_type)

  is_renderable_image_type?(content_type)
end
sig { params(rich_text_body: String).returns(T.nilable(String)) }
# Converts a PDF to a single HTML page by piping it through the `pdftohtml`
# command-line tool.
#
# @param rich_text_body [String] raw bytes of the PDF document
# @return [String, nil] the generated HTML with non-breaking spaces (the
#   U+00A0 character and its HTML entity spellings) replaced by plain spaces,
#   or nil when pdftohtml exits unsuccessfully
def convert_with_pdftohtml(rich_text_body)
  # capture3 feeds stdin while draining stdout and stderr concurrently, so a
  # child that writes a lot of output or warnings cannot fill a pipe and
  # deadlock against us (which write-everything-then-read popen3 code can).
  stdout_str, _stderr_str, exit_status =
    Open3.capture3(
      "pdftohtml",
      "-i", # ignore images
      "-s", # generate single HTML page
      "-nodrm", # ignore drm
      "-enc",
      "UTF-8",
      "-stdout",
      "-", # read from stdin
      "-", # write to stdout (???)
      stdin_data: rich_text_body,
      binmode: true, # the PDF must reach the child byte-for-byte
    )
  return nil unless exit_status.success?

  # binmode also tags the captured output as binary; pdftohtml was asked for
  # UTF-8 via -enc, so label it accordingly before matching U+00A0.
  stdout_str.force_encoding(Encoding::UTF_8)

  # For PDFs, handle both HTML entities and Unicode NBSPs.
  # First replace the actual unicode NBSP character (U+00A0) ...
  stdout_str.gsub!(/\u00A0/, " ")
  # ... then its named, decimal and hexadecimal HTML entity spellings.
  stdout_str.gsub!(/&nbsp;|&#160;|&#xa0;/i, " ")
  stdout_str
end
sig { params(rich_text_body: String).returns(T.nilable(String)) }
# Converts a word-processor document to HTML by piping it through the
# `abiword` command-line tool (input on fd 0, HTML output on fd 1).
#
# @param rich_text_body [String] raw bytes of the document
# @return [String, nil] the HTML with AbiWord's placeholder title text
#   removed, BBCode converted on a best-effort basis, and self-closing <br/>
#   tags stripped; nil when abiword exits unsuccessfully
def convert_with_abiword(rich_text_body)
  # capture3 feeds stdin while draining stdout and stderr concurrently, so a
  # child that writes a lot of output or warnings cannot fill a pipe and
  # deadlock against us (which write-everything-then-read popen3 code can).
  stdout_str, _stderr_str, exit_status =
    Open3.capture3(
      "abiword",
      "--display=0",
      "--to=html",
      "--to-name=fd://1",
      "fd://0",
      stdin_data: rich_text_body,
      binmode: true, # the document must reach the child byte-for-byte
    )
  return nil unless exit_status.success?

  # binmode also tags the captured output as binary; relabel it with the
  # default external encoding, which is what a plain pipe read would yield.
  stdout_str.force_encoding(Encoding.default_external)

  stdout_str.gsub!(/Abiword HTML Document/, "")
  stdout_str = try_convert_bbcode_to_html(stdout_str)
  stdout_str.gsub!(%r{<br\s*/>}, "")
  stdout_str
end
sig { params(rich_text_body: String).returns(T.nilable(String)) }
# Converts a word-processor document to HTML with headless LibreOffice. The
# document is written to a temporary ".doc" file whose path is passed on the
# command line, and the converted HTML is read from LibreOffice's stdout.
#
# @param rich_text_body [String] raw bytes of the document
# @return [String, nil] LibreOffice's stdout, or nil when it exits
#   unsuccessfully
def convert_with_libreoffice(rich_text_body)
  # The block form removes the temporary file when the block exits, even on
  # error, so converted documents do not pile up in the temp directory.
  Tempfile.create(%w[test .doc], binmode: true) do |tempfile|
    tempfile.write(rich_text_body)
    tempfile.flush

    # LibreOffice reads the document from the temp file, so nothing is piped
    # to its stdin: writing a large body to a pipe nobody reads would block.
    # capture3 closes stdin right away and drains stdout/stderr concurrently.
    stdout_str, _stderr_str, exit_status =
      Open3.capture3(
        "libreoffice",
        "--display",
        "0",
        "--headless",
        "--convert-to",
        "html",
        tempfile.path,
        "--cat",
      )
    exit_status.success? ? stdout_str : nil
  end
end
sig { params(str: String).returns(String) }
# Transcodes the string to UTF-8, dropping (replacing with "") any bytes that
# are invalid in the source encoding or undefined in UTF-8. If transcoding
# raises a StandardError the original string is returned untouched.
def reencode_as_utf8_lossy(str)
  lossy_options = { invalid: :replace, undef: :replace, replace: "" }
  begin
    str.encode("UTF-8", **lossy_options)
  rescue StandardError
    str
  end
end
sig { params(rich_text_body: String).returns(String) }
# Best-effort BBCode-to-HTML conversion: returns the result of
# String#bbcode_to_html(false), or the input unchanged if that call raises a
# StandardError.
# NOTE(review): the `false` argument presumably disables HTML escaping in the
# BBCode library — confirm against its API.
def try_convert_bbcode_to_html(rich_text_body)
  converted = rich_text_body
  begin
    converted = rich_text_body.bbcode_to_html(false)
  rescue StandardError
    # Conversion failed; fall through with the original text.
  end
  converted
end
sig { params(rich_text_body: String).returns(String) }
# Guesses the character encoding of the given bytes with CharlockHolmes.
#
# @param rich_text_body [String] the bytes to inspect
# @return [String] the detected encoding name, or "UTF-8" when the detector
#   returns nothing or a result without an :encoding entry (which would
#   otherwise make this method return nil despite its String signature)
def try_detect_encoding(rich_text_body)
  detection = CharlockHolmes::EncodingDetector.detect(rich_text_body)
  (detection && detection[:encoding]) || "UTF-8"
end
sig { params(log_entry: HttpLogEntry).returns(T.nilable(String)) }
# Renders the body of a logged response as sanitized HTML.
#
# Plain text is relabelled with its detected encoding, lossily transcoded to
# UTF-8 and run through best-effort BBCode conversion. PDFs go through
# pdftohtml. Every other content type is tried with AbiWord first and
# LibreOffice second.
#
# @param log_entry [HttpLogEntry] entry whose content_type and response_bytes
#   are read
# @return [String, nil] sanitized HTML, or nil when the entry has no body or
#   content type, or when conversion produced nothing
def render_rich_text_content(log_entry)
  content_type = log_entry.content_type
  rich_text_body = log_entry.response_bytes
  return nil if rich_text_body.blank? || content_type.blank?

  is_plain_text = content_type.starts_with?("text/plain")
  document_html =
    if is_plain_text
      # force_encoding mutates its receiver, so work on a copy: the log
      # entry's own bytes stay untouched, and a frozen body is not a problem.
      text = rich_text_body.dup
      begin
        text.force_encoding(try_detect_encoding(text))
      rescue ArgumentError
        # The detector named an encoding Ruby does not know; treat the bytes
        # as UTF-8 and let the lossy re-encode below drop anything invalid.
        text.force_encoding("UTF-8")
      end
      try_convert_bbcode_to_html(reencode_as_utf8_lossy(text))
    elsif content_type.starts_with?("application/pdf")
      convert_with_pdftohtml(rich_text_body)
    else
      convert_with_abiword(rich_text_body) ||
        convert_with_libreoffice(rich_text_body)
    end
  return nil if document_html.blank?

  sanitize_rich_text_document_html(document_html, is_plain_text)
end
sig do
params(document_html: String, is_plain_text: T::Boolean).returns(String)
end
# Sanitizes converted document HTML down to a small allowlist of elements,
# attributes and CSS properties, applying custom Sanitize transformers on the
# way, and returns the result passed through `raw`.
#
# document_html - HTML produced by one of the converters.
# is_plain_text - when true, bare text nodes are additionally wrapped in <p>
#                 elements and literal "<br>" / "<br />" strings are removed
#                 from the sanitized output.
def sanitize_rich_text_document_html(document_html, is_plain_text)
# Transformer: drops the class attribute from every node that has one, and
# renames the node to <blockquote> when one of its classes was "quote".
quote_transformer =
Kernel.lambda do |env|
node = env[:node]
if node["class"]
classes = node["class"].split(" ").map(&:strip).compact
node.remove_attribute("class")
if classes.include?("quote")
# write div to be a blockquote
node.name = "blockquote"
end
end
node
end
# Helper for the plain-text transformer: handles a single text node.
# Whitespace-only text nodes are removed; any other text node is replaced by
# a <p> holding its stripped text plus the siblings that follow it.
clean_plain_text_node =
Kernel.lambda do |node|
node = T.cast(node, Nokogiri::XML::Node)
if node.text?
node_text = node.text.strip
if node_text.empty?
node.unlink
else
# collect all the subsequent nodes that are not a block element
# and replace the current node with a <p> containing the text
# and the collected nodes
# (as written, collection stops only at a <br>, a <p>, or the last sibling)
current_node = node
inline_elements = []
while (next_sibling = current_node.next_sibling) &&
(next_sibling.name != "br") && (next_sibling.name != "p")
inline_elements << next_sibling
current_node = next_sibling
end
node_html = [node_text]
inline_elements.each do |inline_element|
inline_element.unlink
node_html << inline_element.to_html
end
# NOTE(review): node_text comes from node.text and is interpolated into
# markup without escaping — confirm the nodes created here are still visited
# by the sanitizer before the output is marked raw.
node.replace("<p>#{node_html.join(" ")}</p>")
end
end
end
# Transformer (plain text only): applies clean_plain_text_node to the
# children of every <div> and to text nodes whose parent is the document
# fragment, then returns the current node in a :node_allowlist.
plain_text_transformer =
Kernel.lambda do |env|
# within a div, wrap bare text nodes in a <p>
node = T.cast(env[:node], Nokogiri::XML::Node)
node_name = T.cast(env[:node_name], String)
if node_name == "div"
current_child = T.unsafe(node.children.first)
# NOTE(review): clean_plain_text_node may unlink or replace current_child;
# next_sibling is then read from the detached node, which presumably ends
# this walk after the first text node that gets handled — confirm.
while current_child.present?
clean_plain_text_node.call(current_child)
current_child = current_child.next_sibling
end
elsif node.text? && node.parent&.name == "#document-fragment"
clean_plain_text_node.call(node)
end
{ node_allowlist: [node] }
end
# remove_empty_newline_transformer =
# Kernel.lambda do |env|
# node = env[:node]
# node.unlink if node.text? && node.text.strip.chomp.blank?
# end
# remove_multiple_br_transformer =
# Kernel.lambda do |env|
# node = env[:node]
# if node.name == "br"
# node.unlink if node.previous_sibling&.name == "br"
# end
# end
# "class" is allowlisted below, but quote_transformer removes it from every
# node that carries one.
sanitizer =
Sanitize.new(
elements: %w[span div p i b strong em blockquote br],
attributes: {
all: %w[style class],
},
css: {
properties: %w[color text-align margin-bottom],
},
transformers: [
quote_transformer,
# is_plain_text ? remove_empty_newline_transformer : nil,
is_plain_text ? plain_text_transformer : nil,
# is_plain_text ? remove_multiple_br_transformer : nil,
].compact,
)
fragment = sanitizer.fragment(document_html).strip
if is_plain_text
# Strip the literal "<br>" and "<br />" strings left in the serialized output.
fragment.gsub!("<br>", "")
fragment.gsub!("<br />", "")
end
# NOTE(review): `raw` is presumably the Rails view helper that marks the
# string as HTML-safe — confirm.
raw fragment
end
sig { params(performed_by: String).returns(String) }
# Abbreviates a `performed_by` identifier to a two-character code:
# "direct" => "DR", "airvpn-1-netherlands" => "NL",
# "airvpn-2-san-jose" => "SJ", and "??" for anything else.
def performed_by_to_short_code(performed_by)
  short_codes = {
    "direct" => "DR",
    "airvpn-1-netherlands" => "NL",
    "airvpn-2-san-jose" => "SJ",
  }
  short_codes.fetch(performed_by, "??")
end
end