Files
redux-scraper/app/lib/tasks/telegram_bot_task.rb
Dylan Knutson b6e2e5e502 Update telegram bot task, user view, and type definitions
- Modified telegram bot task implementation
- Updated domain users index view
- Updated telegram-bot-ruby type shims
2025-08-15 05:59:11 +00:00

530 lines
16 KiB
Ruby

# typed: strict
require "telegram/bot"
require "tempfile"
require "tmpdir"
require "fileutils"
require "net/http"
require "uri"
require_relative "../stopwatch"
module Tasks
  # Long-running Telegram bot: users send an image or video, the bot runs a
  # visual similarity search over stored fingerprints and replies with the
  # closest matches. Each media request is recorded as a TelegramBotLog row.
  class TelegramBotTask
    extend T::Sig
    include Domain::VisualSearchHelper
    include Domain::DomainModelHelper

    # Matches must have a similarity percentage strictly above this to be
    # reported to the user and stored in the log.
    SIMILARITY_THRESHOLD = 90
    # Maximum number of matches rendered in a reply and stored in the log.
    MAX_DISPLAYED_RESULTS = 5
    # Number of leading bot-token characters that may appear in logs.
    TOKEN_VISIBLE_CHARS = 11

    # @param log_sink [IO, StringIO] where human-readable progress lines go
    sig { params(log_sink: T.any(IO, StringIO)).void }
    def initialize(log_sink: $stderr)
      @log_sink = log_sink
    end

    # Connects to Telegram and blocks while polling for updates.
    #
    # Returns right away (after logging setup instructions) when no usable
    # token is configured. Errors raised while handling a single message are
    # logged and do not stop polling; Ctrl+C (Interrupt) stops the bot; a
    # rate-limit (429) response waits and reconnects; any other error is
    # logged and re-raised.
    sig { void }
    def run
      bot_token = get_bot_token
      # An empty token is as unusable as a missing one.
      if bot_token.nil? || bot_token.empty?
        log(
          "❌ Telegram bot token not configured. Please set it in GlobalStates.",
        )
        log("Go to /state/telegram-config to configure the bot token.")
        return
      end

      log("🤖 Starting Telegram bot...")
      log("Bot token: #{mask_token(bot_token)}")
      log("Press Ctrl+C to stop the bot")

      begin
        Telegram::Bot::Client.run(bot_token) do |bot|
          log("✅ Telegram bot connected successfully")
          bot.listen do |message|
            begin
              handle_message(bot, message)
            rescue StandardError => e
              # One bad message must not stop the polling loop.
              log("❌ Error handling message: #{e.message}")
              log("Backtrace: #{e.backtrace&.first(5)&.join("\n")}")
            end
          end
        end
      rescue Telegram::Bot::Exceptions::ResponseError => e
        log("❌ Telegram API error: #{e.message}")
        raise e unless e.error_code == 429

        # NOTE(review): relies on e.response supporting #dig (presumably as the
        # project's telegram-bot-ruby shims declare) - confirm against the
        # installed gem version. Retries are intentionally unbounded so the
        # daemon keeps polling through rate limits.
        retry_after = e.response.dig("parameters", "retry_after")&.to_i || 10
        log(
          "❌ Rate limit exceeded, waiting #{retry_after} seconds and trying again.",
        )
        sleep(retry_after)
        retry
      rescue Interrupt
        log("User interrupted the bot")
      rescue StandardError => e
        log("❌ Unexpected error: #{e.message}")
        log("Backtrace: #{e.backtrace&.first(5)&.join("\n")}")
        raise e
      ensure
        log("🛑 Telegram bot stopped")
      end
    end

    private

    # Writes one line to the configured log sink.
    sig { params(message: String).void }
    def log(message)
      @log_sink.puts(message)
    end

    # Reads the bot token from the "telegram-bot-token" GlobalState entry.
    sig { returns(T.nilable(String)) }
    def get_bot_token
      GlobalState.get("telegram-bot-token")
    end

    # Returns a log-safe rendering of the token: the first TOKEN_VISIBLE_CHARS
    # characters, "...", then one "*" per hidden character. Tokens that are no
    # longer than the visible prefix are masked entirely, so nothing secret is
    # shown and the "*" count can never go negative.
    sig { params(token: String).returns(String) }
    def mask_token(token)
      return "*" * token.length if token.length <= TOKEN_VISIBLE_CHARS

      hidden = "*" * (token.length - TOKEN_VISIBLE_CHARS)
      "#{token[0, TOKEN_VISIBLE_CHARS]}...#{hidden}"
    end

    # Handles one incoming message.
    #
    # Messages without media get a short usage hint. For media messages, the
    # bot posts an "Analyzing..." reply and edits it in place with the search
    # results, or with an error text. The exchange (status, results, total
    # time) is recorded on a TelegramBotLog row.
    sig do
      params(
        bot: Telegram::Bot::Client,
        message: Telegram::Bot::Types::Message,
      ).void
    end
    def handle_message(bot, message)
      # An empty photo list counts as "no photo", matching
      # get_media_file_from_message.
      has_media =
        (message.photo && message.photo.any?) || message.document ||
          message.video
      unless has_media
        bot.api.send_message(
          chat_id: message.chat.id,
          text: [
            "👋 Hi! I'm a visual search bot.",
            "📷 Send me an <b>image</b> or <b>video</b> and I'll search for similar content in my database.",
          ].join("\n\n"),
          parse_mode: "HTML",
          reply_to_message_id: message.message_id,
        )
        return
      end

      total_request_timer = Stopwatch.start
      chat_id = message.chat.id
      telegram_log = create_telegram_log(message)
      # Placeholder reply; edited below once processing finishes.
      response_message =
        bot.api.send_message(
          chat_id: chat_id,
          text: "🔍 Analyzing... Please wait...",
          reply_to_message_id: message.message_id,
        )

      begin
        search_result, processed_blob =
          process_media_message_with_logging(bot, message, telegram_log)
        if search_result
          # format_search_results renders the "no matches" text for an empty
          # list; both outcomes count as a successful request.
          result_text = format_search_results(search_result)
          update_telegram_log_success(
            telegram_log,
            search_result,
            result_text,
            processed_blob,
          )
        else
          result_text =
            "❌ Could not process the file. Please make sure it's a valid image or video file."
          update_telegram_log_invalid_image(telegram_log, result_text)
        end

        bot.api.edit_message_text(
          chat_id: chat_id,
          message_id: response_message.message_id,
          text: result_text,
          parse_mode: "HTML",
        )

        telegram_log.update!(total_request_time: total_request_timer.elapsed)
        log("⏱️ Total request completed in #{total_request_timer.elapsed_s}")
      rescue StandardError => e
        log("Error processing file: #{e.message}")
        update_telegram_log_error(telegram_log, e)
        bot.api.edit_message_text(
          chat_id: chat_id,
          message_id: response_message.message_id,
          text:
            "❌ An error occurred while processing your file. Please try again.",
        )
        # Record total request time even for errors.
        telegram_log.update!(total_request_time: total_request_timer.elapsed)
        log(
          "⏱️ Total request (with error) completed in #{total_request_timer.elapsed_s}",
        )
      end
    end

    # Downloads the message's media, stores it as a BlobFile, fingerprints it
    # and searches for similar fingerprints. When the search completes, the
    # stage timings (download, processing, fingerprinting, search) are written
    # to telegram_log.
    #
    # @return [Array] a pair `[matches, blob]`, where matches contains only
    #   results above SIMILARITY_THRESHOLD. It is `[nil, nil]` when there is no
    #   usable media, the download fails, or no fingerprints could be
    #   generated. It is `[nil, blob_or_nil]` when a processing step raised;
    #   the error is logged, not re-raised.
    sig do
      params(
        bot: Telegram::Bot::Client,
        message: Telegram::Bot::Types::Message,
        telegram_log: TelegramBotLog,
      ).returns(
        [
          T.nilable(
            T::Array[Domain::VisualSearchHelper::SimilarFingerprintResult],
          ),
          T.nilable(BlobFile),
        ],
      )
    end
    def process_media_message_with_logging(bot, message, telegram_log)
      log("📥 Received message from chat #{message.chat.id}")
      media_file = get_media_file_from_message(message)
      return nil, nil unless media_file

      download_stopwatch = Stopwatch.start
      temp_file = download_telegram_file(bot, media_file)
      download_time = download_stopwatch.elapsed
      return nil, nil unless temp_file
      log("📥 Downloaded file in #{download_stopwatch.elapsed_s}")

      processed_blob = nil
      begin
        # Time file reading and BlobFile creation.
        image_processing_stopwatch = Stopwatch.start
        file_path = T.must(temp_file.path)
        file_content = File.binread(file_path)
        content_type = content_type_for(media_file)
        processed_blob =
          BlobFile.find_or_initialize_from_contents(file_content) do |blob|
            blob.content_type = content_type
          end
        processed_blob.save! unless processed_blob.persisted?
        image_processing_time = image_processing_stopwatch.elapsed
        log("🔧 Processed file in #{image_processing_stopwatch.elapsed_s}")

        fingerprint_stopwatch = Stopwatch.start
        temp_dir = Dir.mktmpdir("telegram-bot-task-visual-search")
        fingerprints = generate_fingerprints(file_path, content_type, temp_dir)
        fingerprint_computation_time = fingerprint_stopwatch.elapsed
        if fingerprints.nil?
          log("❌ Error generating fingerprints")
          return nil, nil
        end
        log(
          "🔍 Generated fingerprints in #{fingerprint_stopwatch.elapsed_s}, searching for similar images...",
        )

        search_stopwatch = Stopwatch.start
        similar_results =
          find_similar_fingerprints(
            fingerprints.map(&:to_fingerprint_and_detail),
            limit: 10,
            oversearch: 3,
            includes: {
              post_file: :post,
            },
          )
        search_computation_time = search_stopwatch.elapsed

        telegram_log.update!(
          download_time: download_time,
          image_processing_time: image_processing_time,
          fingerprint_computation_time: fingerprint_computation_time,
          search_computation_time: search_computation_time,
        )

        # Keep only matches strictly above the similarity threshold.
        high_quality_matches =
          similar_results.select do |result|
            result.similarity_percentage > SIMILARITY_THRESHOLD
          end
        log(
          "✅ Found #{high_quality_matches.length} high-quality matches (>#{SIMILARITY_THRESHOLD}% similarity) in #{search_stopwatch.elapsed_s}",
        )
        [high_quality_matches, processed_blob]
      rescue StandardError => e
        log("❌ Error processing file: #{e.message}")
        [nil, processed_blob]
      ensure
        # Clean up temp files whether or not processing succeeded.
        FileUtils.rm_rf(temp_dir) if temp_dir
        temp_file.unlink if temp_file
      end
    end

    # Content type recorded on the BlobFile for a downloaded media file: the
    # MIME type Telegram reported when available, otherwise a per-kind default.
    sig do
      params(
        media_file:
          T.any(
            Telegram::Bot::Types::PhotoSize,
            Telegram::Bot::Types::Video,
            Telegram::Bot::Types::Document,
          ),
      ).returns(String)
    end
    def content_type_for(media_file)
      case media_file
      when Telegram::Bot::Types::Document
        media_file.mime_type || "application/octet-stream"
      when Telegram::Bot::Types::PhotoSize
        "image/jpeg" # Telegram photos are typically JPEG
      when Telegram::Bot::Types::Video
        media_file.mime_type || "video/mp4"
      else
        "application/octet-stream"
      end
    end

    # Creates the TelegramBotLog row for a media request in :processing state.
    # Sender fields come from message.from; telegram_user_id falls back to 0
    # when the message has no sender.
    sig do
      params(message: Telegram::Bot::Types::Message).returns(TelegramBotLog)
    end
    def create_telegram_log(message)
      user = message.from
      chat = message.chat
      TelegramBotLog.create!(
        telegram_user_id: user&.id || 0,
        telegram_username: user&.username,
        telegram_first_name: user&.first_name,
        telegram_last_name: user&.last_name,
        telegram_chat_id: chat.id,
        request_timestamp: Time.current,
        status: :processing, # Updated when the request completes.
        search_results_count: 0,
        response_data: {
        },
      )
    end

    # Marks the log as :success and stores the reply text, the match count, the
    # threshold used, and a summary of up to MAX_DISPLAYED_RESULTS matches.
    sig do
      params(
        telegram_log: TelegramBotLog,
        search_results:
          T::Array[Domain::VisualSearchHelper::SimilarFingerprintResult],
        response_text: String,
        processed_blob: T.nilable(BlobFile),
      ).void
    end
    def update_telegram_log_success(
      telegram_log,
      search_results,
      response_text,
      processed_blob
    )
      telegram_log.update!(
        status: :success,
        search_results_count: search_results.length,
        processed_image: processed_blob,
        response_data: {
          response_text: response_text,
          matches: search_results.length,
          threshold: SIMILARITY_THRESHOLD,
          results:
            search_results
              .take(MAX_DISPLAYED_RESULTS)
              .map do |result|
                post_file = result.fingerprint.post_file
                {
                  similarity: result.similarity_percentage.round(1),
                  post_file_id: post_file&.id,
                }
              end,
        },
      )
    end

    # Marks the log as :invalid_image, storing the reply text sent to the user.
    sig { params(telegram_log: TelegramBotLog, response_text: String).void }
    def update_telegram_log_invalid_image(telegram_log, response_text)
      telegram_log.update!(
        status: :invalid_image,
        search_results_count: 0,
        error_message: "Invalid or unsupported file format",
        response_data: {
          response_text: response_text,
          error: "Invalid file format",
        },
      )
    end

    # Marks the log as :error, storing the exception's message and class name.
    sig { params(telegram_log: TelegramBotLog, error: StandardError).void }
    def update_telegram_log_error(telegram_log, error)
      telegram_log.update!(
        status: :error,
        search_results_count: 0,
        error_message: error.message,
        response_data: {
          error: error.message,
          error_class: error.class.name,
        },
      )
    end

    # Renders matches as Telegram HTML.
    #
    # The output is a header line followed by one line per displayed match,
    # each line ending in "\n". At most MAX_DISPLAYED_RESULTS matches are
    # shown. Matches whose post file or post is missing are skipped.
    sig do
      params(
        results: T::Array[Domain::VisualSearchHelper::SimilarFingerprintResult],
      ).returns(String)
    end
    def format_search_results(results)
      return "❌ No close matches found." if results.empty?

      header =
        "🎯 Found #{results.length} #{"match".pluralize(results.length)}"
      if results.length > MAX_DISPLAYED_RESULTS
        header += " (showing first #{MAX_DISPLAYED_RESULTS})"
      end
      match_lines =
        results
          .take(MAX_DISPLAYED_RESULTS)
          .filter_map { |result| format_search_result_line(result) }
      [header, *match_lines].map { |line| "#{line}\n" }.join
    end

    # Formats one match as "- <pct>% - <post> [by <creator>]", without a
    # trailing newline. The post and creator are linked when they have an
    # external URL. Returns nil when the match has no post file or post.
    sig do
      params(
        result: Domain::VisualSearchHelper::SimilarFingerprintResult,
      ).returns(T.nilable(String))
    end
    def format_search_result_line(result)
      post_file = result.fingerprint.post_file
      return nil unless post_file
      post = post_file.post
      return nil unless post

      parts = ["- #{result.similarity_percentage.round(1)}%"]
      external_url = post.external_url_for_view
      title = post.title
      parts <<
        if title && external_url
          " - #{html_link(external_url, "#{post.to_param} - #{html_escape(title)}")}"
        elsif external_url
          " - #{html_link(external_url, post.to_param.to_s)}"
        else
          " - #{post.to_param}"
        end

      if post.respond_to?(:creator) && (creator = post.send(:creator))
        creator_url = creator.external_url_for_view
        creator_name = html_escape(creator.name_for_view)
        parts <<
          if creator_url
            " by #{html_link(creator_url.to_s, creator_name)}"
          else
            " by #{creator_name}"
          end
      end
      parts.join
    end

    # Builds a Telegram-HTML link. The URL is escaped for use inside a
    # double-quoted attribute; label must already be HTML-escaped.
    sig { params(url: String, label: String).returns(String) }
    def html_link(url, label)
      "<a href=\"#{html_attribute_escape(url)}\">#{label}</a>"
    end

    # Escapes text for Telegram's HTML parse mode. Only <, > and & are
    # replaced, which is what the Bot API requires outside of tags/entities.
    sig { params(text: String).returns(String) }
    def html_escape(text)
      # API supports only these named HTML entities: &lt;, &gt;, &amp; and &quot;
      text
        .gsub("&", "&amp;") # Ampersand (must be first to avoid double-escaping)
        .gsub("<", "&lt;") # Less than
        .gsub(">", "&gt;") # Greater than
    end

    # Like html_escape, but also escapes double quotes so the result is safe
    # inside a double-quoted attribute value such as href="...".
    sig { params(text: String).returns(String) }
    def html_attribute_escape(text)
      html_escape(text).gsub("\"", "&quot;")
    end

    # Picks the media to search from a message. In order of preference:
    # the largest photo variant (by file_size), the video, or a document whose
    # MIME type is image/* or video/mp4. Returns nil (after logging why) when
    # none applies.
    sig do
      params(message: Telegram::Bot::Types::Message).returns(
        T.nilable(
          T.any(
            Telegram::Bot::Types::PhotoSize,
            Telegram::Bot::Types::Video,
            Telegram::Bot::Types::Document,
          ),
        ),
      )
    end
    def get_media_file_from_message(message)
      if message.photo && message.photo.any?
        # Get the largest photo variant.
        message.photo.max_by { |photo| photo.file_size || 0 }
      elsif message.video
        message.video
      elsif message.document
        content_type = message.document.mime_type
        if content_type&.start_with?("image/") || content_type == "video/mp4"
          message.document
        else
          log("❌ Document is not an image or video: #{content_type}")
          nil
        end
      else
        log("❌ No image or video found in message")
        nil
      end
    end

    # Downloads a Telegram file into a closed, binary-mode Tempfile.
    #
    # The tempfile keeps the extension of Telegram's file path, defaulting to
    # ".jpg". The caller is responsible for unlinking the returned Tempfile.
    # Returns nil, after logging, when:
    # - no token is configured;
    # - Telegram returns no file path;
    # - the download responds with a non-success status;
    # - any error is raised (a partially written tempfile is removed).
    sig do
      params(
        bot: Telegram::Bot::Client,
        file_info:
          T.any(
            Telegram::Bot::Types::PhotoSize,
            Telegram::Bot::Types::Video,
            Telegram::Bot::Types::Document,
          ),
      ).returns(T.nilable(Tempfile))
    end
    def download_telegram_file(bot, file_info)
      bot_token = get_bot_token
      return nil unless bot_token

      temp_file = T.let(nil, T.nilable(Tempfile))
      begin
        file_response = bot.api.get_file(file_id: file_info.file_id)
        file_path = file_response.file_path
        unless file_path
          log("❌ Could not get file path from Telegram")
          return nil
        end

        # The download URL embeds the bot token, so only the file path is
        # logged.
        log("📥 Downloading file: #{file_path}...")
        uri = URI("https://api.telegram.org/file/bot#{bot_token}/#{file_path}")
        response = Net::HTTP.get_response(uri)
        unless response.is_a?(Net::HTTPSuccess)
          log("❌ Error downloading file: HTTP #{response.code}")
          return nil
        end

        extension = File.extname(file_path)
        extension = ".jpg" if extension.empty?
        temp_file = Tempfile.new(["telegram_file", extension])
        temp_file.binmode
        temp_file.write(response.body)
        temp_file.close
        log("✅ Downloaded file to: #{temp_file.path}")
        temp_file
      rescue StandardError => e
        log("❌ Error downloading file: #{e.message}")
        # Don't leak a partially written tempfile.
        temp_file&.close!
        nil
      end
    end
  end
end