Files
redux-scraper/app/lib/tasks/telegram_bot_task.rb
2025-08-05 05:05:21 +00:00

515 lines
15 KiB
Ruby

# typed: strict
require "telegram/bot"
require "tempfile"
require "net/http"
require "uri"
require_relative "../stopwatch"
module Tasks
class TelegramBotTask
extend T::Sig
include Domain::VisualSearchHelper
include Domain::DomainModelHelper
# Creates the task.
#
# log_sink receives every line written through #log (via `puts`); it defaults
# to the process-wide $stderr stream.
sig { params(log_sink: T.any(IO, StringIO)).void }
def initialize(log_sink: $stderr)
@log_sink = log_sink
end
# Starts the bot and blocks while it listens for incoming messages.
#
# When no bot token is available (nil or empty) a configuration hint is logged
# and the method returns without connecting. Otherwise the client is started
# with Telegram::Bot::Client.run and every received message is passed to
# #handle_message.
#
# Error handling:
# - a StandardError raised while handling one message is logged and the
#   listen loop carries on with the next message;
# - a Telegram::Bot::Exceptions::ResponseError or any other StandardError
#   raised by the client itself is logged and ends the run;
# - "Telegram bot stopped" is logged whenever the client block is left.
sig { void }
def run
  bot_token = get_bot_token
  if bot_token.nil? || bot_token.empty?
    log(
      "❌ Telegram bot token not configured. Please set it in GlobalStates.",
    )
    log("Go to /state/telegram-config to configure the bot token.")
    return
  end

  log("🤖 Starting Telegram bot...")
  # Only the leading characters are shown; the remainder is masked. The mask
  # length is clamped at zero because String#* raises ArgumentError for a
  # negative count, which a token shorter than 11 characters would produce.
  masked_length = [bot_token.length - 11, 0].max
  log("Bot token: #{bot_token[0..10]}..." + "*" * masked_length)
  log("Press Ctrl+C to stop the bot")

  begin
    Telegram::Bot::Client.run(bot_token) do |bot|
      log("✅ Telegram bot connected successfully")
      bot.listen do |message|
        begin
          handle_message(bot, message)
        rescue StandardError => e
          # One bad message must not take the whole listener down.
          log("❌ Error handling message: #{e.message}")
          log("Backtrace: #{e.backtrace&.first(5)&.join("\n")}")
        end
      end
    end
  rescue Telegram::Bot::Exceptions::ResponseError => e
    log("❌ Telegram API error: #{e.message}")
    log("Please check your bot token configuration.")
  rescue StandardError => e
    log("❌ Unexpected error: #{e.message}")
    log("Backtrace: #{e.backtrace&.first(5)&.join("\n")}")
  ensure
    log("🛑 Telegram bot stopped")
  end
end
private
# Writes one message to the log sink supplied to the constructor, using
# `puts` on that sink.
sig { params(message: String).void }
def log(message)
@log_sink.puts(message)
end
# Looks up the bot token stored under the "telegram-bot-token" key through
# GlobalState.get. Callers in this class treat a nil result as "token not
# configured".
sig { returns(T.nilable(String)) }
def get_bot_token
GlobalState.get("telegram-bot-token")
end
# Handles one incoming message: runs a visual search on the attached image
# and reports the outcome back to the chat.
#
# Messages without a photo or document are ignored. For every other message:
# 1. a TelegramBotLog row is created;
# 2. a "please wait" reply is sent;
# 3. the image is processed by #process_image_message_with_logging;
# 4. the log row is updated according to the outcome (matches found, no
#    matches, or image could not be processed);
# 5. the reply is edited to show the result text (Markdown parse mode);
# 6. the total request time is stored on the log row and logged.
#
# Any StandardError raised in steps 2-6 marks the log row as an error, makes a
# best-effort attempt to tell the user, and still records the total time.
#
# bot     - connected client used to send and edit the reply.
# message - the incoming Telegram message.
sig do
  params(
    bot: Telegram::Bot::Client,
    message: Telegram::Bot::Types::Message,
  ).void
end
def handle_message(bot, message)
  # An empty photo list carries no image, so it counts as "no photo".
  return unless message.photo&.any? || message.document

  # Start timing the total request
  total_request_timer = Stopwatch.start
  chat_id = message.chat.id
  # Create initial log record
  telegram_log = create_telegram_log(message)
  response_message = nil

  begin
    # The placeholder reply is sent inside the guarded section so that a
    # failure here is recorded on the log row instead of leaving it in its
    # initial state.
    response_message =
      bot.api.send_message(
        chat_id: chat_id,
        text: "🔍 Analyzing image... Please wait...",
        reply_to_message_id: message.message_id,
      )

    # Process the image and perform visual search
    search_result, processed_blob =
      process_image_message_with_logging(bot, message, telegram_log)

    if search_result && !search_result.empty?
      result_text = format_search_results(search_result)
      update_telegram_log_success(
        telegram_log,
        search_result,
        result_text,
        processed_blob,
      )
    elsif search_result
      result_text = "❌ No close matches found."
      update_telegram_log_no_results(
        telegram_log,
        result_text,
        processed_blob,
      )
    else
      result_text =
        "❌ Could not process the image. Please make sure it's a valid image file."
      update_telegram_log_invalid_image(telegram_log, result_text)
    end

    # Update the response with results
    bot.api.edit_message_text(
      chat_id: chat_id,
      message_id: response_message.message_id,
      text: result_text,
      parse_mode: "Markdown",
    )

    # Record total request time
    total_request_time = total_request_timer.elapsed
    telegram_log.update!(total_request_time: total_request_time)
    log("⏱️ Total request completed in #{total_request_timer.elapsed_s}")
  rescue StandardError => e
    log("Error processing image: #{e.message}")
    update_telegram_log_error(telegram_log, e)

    # Telling the user is best effort: there may be no reply to edit yet, and
    # the edit can fail for the same reason the request failed. Neither case
    # may prevent the timing below from being recorded.
    if response_message
      begin
        bot.api.edit_message_text(
          chat_id: chat_id,
          message_id: response_message.message_id,
          text:
            "❌ An error occurred while processing your image. Please try again.",
        )
      rescue StandardError => notify_error
        log("❌ Could not deliver error notice: #{notify_error.message}")
      end
    end

    # Record total request time even for errors
    total_request_time = total_request_timer.elapsed
    telegram_log.update!(total_request_time: total_request_time)
    log(
      "⏱️ Total request (with error) completed in #{total_request_timer.elapsed_s}",
    )
  end
end
# Downloads the image attached to a message, stores it as a BlobFile,
# fingerprints it and searches for visually similar posts.
#
# Returns a two-element array [matches, blob]:
# - [nil, nil] when the message carries no usable image or the download
#   failed;
# - [nil, blob_or_nil] when a StandardError was raised after the download
#   (the error is logged here, not re-raised);
# - [matches, blob] otherwise, where matches holds only the results whose
#   similarity_percentage is above 90 (possibly none).
#
# On the successful path the download, processing, fingerprint and search
# timings are written to telegram_log. The temporary download is always
# removed before returning.
sig do
  params(
    bot: Telegram::Bot::Client,
    message: Telegram::Bot::Types::Message,
    telegram_log: TelegramBotLog,
  ).returns(
    [
      T.nilable(
        T::Array[Domain::VisualSearchHelper::SimilarFingerprintResult],
      ),
      T.nilable(BlobFile),
    ],
  )
end
def process_image_message_with_logging(bot, message, telegram_log)
  log("📥 Received image message from chat #{message.chat.id}")

  # Largest photo variant, or an image document.
  source_file = get_image_file_from_message(message)
  return nil, nil unless source_file

  # Fetch the image into a temporary file, timing the transfer.
  download_timer = Stopwatch.start
  local_copy = download_telegram_image(bot, source_file)
  download_time = download_timer.elapsed
  return nil, nil unless local_copy
  log("📥 Downloaded image in #{download_timer.elapsed_s}")

  stored_blob = nil
  begin
    # Stage 1: read the bytes and persist them as a BlobFile.
    processing_timer = Stopwatch.start
    local_path = T.must(local_copy.path)
    raw_bytes = File.binread(local_path)
    mime_type =
      case source_file
      when Telegram::Bot::Types::Document
        source_file.mime_type || "application/octet-stream"
      when Telegram::Bot::Types::PhotoSize
        "image/jpeg" # Telegram photos are typically JPEG
      else
        "application/octet-stream"
      end
    stored_blob =
      BlobFile.find_or_initialize_from_contents(raw_bytes) do |blob|
        blob.content_type = mime_type
      end
    stored_blob.save! unless stored_blob.persisted?
    image_processing_time = processing_timer.elapsed
    log("🔧 Processed image in #{processing_timer.elapsed_s}")

    # Stage 2: compute both fingerprints from the file on disk.
    fingerprint_timer = Stopwatch.start
    coarse_fingerprint =
      Domain::PostFile::BitFingerprint.from_file_path(local_path)
    detail_fingerprint =
      Domain::PostFile::BitFingerprint.detail_from_file_path(local_path)
    fingerprint_computation_time = fingerprint_timer.elapsed
    log(
      "🔍 Generated fingerprints in #{fingerprint_timer.elapsed_s}, searching for similar images...",
    )

    # Stage 3: look up similar fingerprints.
    search_timer = Stopwatch.start
    candidates =
      find_similar_fingerprints(
        fingerprint_value: coarse_fingerprint,
        fingerprint_detail_value: detail_fingerprint,
        limit: 10,
        oversearch: 3,
        includes: {
          post_file: :post,
        },
      )
    search_computation_time = search_timer.elapsed

    # Persist the per-stage timings on the log row.
    telegram_log.update!(
      download_time: download_time,
      image_processing_time: image_processing_time,
      fingerprint_computation_time: fingerprint_computation_time,
      search_computation_time: search_computation_time,
    )

    # Keep only results above 90% similarity.
    close_matches =
      candidates.select { |candidate| candidate.similarity_percentage > 90 }
    log(
      "✅ Found #{close_matches.length} high-quality matches (>90% similarity) in #{search_timer.elapsed_s}",
    )
    [close_matches, stored_blob]
  rescue StandardError => e
    log("❌ Error processing image: #{e.message}")
    [nil, stored_blob]
  ensure
    # Remove the temporary download whatever happened above.
    local_copy.unlink if local_copy
  end
end
# Logging helper methods
# Creates the TelegramBotLog row for an incoming message and returns it.
#
# Sender fields are copied from message.from when it is present; the user id
# falls back to 0 otherwise. The row starts out with status :success, zero
# results and empty response data, all of which later update_telegram_log_*
# calls overwrite.
sig do
  params(message: Telegram::Bot::Types::Message).returns(TelegramBotLog)
end
def create_telegram_log(message)
  sender = message.from
  source_chat = message.chat
  initial_attributes = {
    telegram_user_id: sender&.id || 0,
    telegram_username: sender&.username,
    telegram_first_name: sender&.first_name,
    telegram_last_name: sender&.last_name,
    telegram_chat_id: source_chat.id,
    request_timestamp: Time.current,
    status: :success, # placeholder until the request outcome is known
    search_results_count: 0,
    response_data: {
    },
  }
  TelegramBotLog.create!(initial_attributes)
end
# Marks the log row as :success and stores what was sent back to the user.
#
# The stored response data contains the reply text, the number of matches,
# the similarity threshold (90) and, for at most the first five matches,
# their rounded similarity plus the id and view URL of the matched post
# (both nil when the fingerprint has no post file or post).
sig do
  params(
    telegram_log: TelegramBotLog,
    search_results:
      T::Array[Domain::VisualSearchHelper::SimilarFingerprintResult],
    response_text: String,
    processed_blob: T.nilable(BlobFile),
  ).void
end
def update_telegram_log_success(
  telegram_log,
  search_results,
  response_text,
  processed_blob
)
  match_count = search_results.length
  top_matches =
    search_results
      .first(5)
      .map do |match|
        matched_post = match.fingerprint.post_file&.post
        {
          similarity: match.similarity_percentage.round(1),
          post_id: matched_post&.id,
          url: matched_post&.external_url_for_view,
        }
      end

  telegram_log.update!(
    status: :success,
    search_results_count: match_count,
    processed_image: processed_blob,
    response_data: {
      response_text: response_text,
      matches: match_count,
      threshold: 90,
      results: top_matches,
    },
  )
end
# Marks the log row as :no_results: the image was processed but nothing
# passed the similarity threshold. Stores the processed image, the reply text
# and the threshold (90) alongside a zero match count.
sig do
  params(
    telegram_log: TelegramBotLog,
    response_text: String,
    processed_blob: T.nilable(BlobFile),
  ).void
end
def update_telegram_log_no_results(
  telegram_log,
  response_text,
  processed_blob
)
  response_payload = {
    response_text: response_text,
    matches: 0,
    threshold: 90,
  }
  telegram_log.update!(
    status: :no_results,
    search_results_count: 0,
    processed_image: processed_blob,
    response_data: response_payload,
  )
end
# Marks the log row as :invalid_image, recording a fixed error message and
# the reply text that was sent to the user.
sig { params(telegram_log: TelegramBotLog, response_text: String).void }
def update_telegram_log_invalid_image(telegram_log, response_text)
  response_payload = {
    response_text: response_text,
    error: "Invalid image format",
  }
  telegram_log.update!(
    status: :invalid_image,
    search_results_count: 0,
    error_message: "Invalid or unsupported image format",
    response_data: response_payload,
  )
end
# Marks the log row as :error, storing the exception's message and class
# name.
sig { params(telegram_log: TelegramBotLog, error: StandardError).void }
def update_telegram_log_error(telegram_log, error)
  failure_message = error.message
  telegram_log.update!(
    status: :error,
    search_results_count: 0,
    error_message: failure_message,
    response_data: {
      error: failure_message,
      error_class: error.class.name,
    },
  )
end
# Renders search results as the Markdown text sent back to the user.
#
# Returns the "no close matches" message for an empty list. Otherwise the
# text starts with a header line giving the total number of matches, followed
# by one "- <similarity>% - <post> by <creator>" line for each of the first
# five results. Results whose fingerprint has no post file or post are
# skipped. Post and creator are rendered as links when a view URL exists.
#
# The text is sent with Telegram's legacy "Markdown" parse mode. Per the Bot
# API formatting notes, that mode allows backslash-escaping of _ * ` [ only
# outside an entity and has no escape syntax inside one, so:
# - text that is not part of a link gets those four characters escaped;
# - link labels get square brackets replaced with parentheses, because a
#   "]" would end the label early.
sig do
  params(
    results: T::Array[Domain::VisualSearchHelper::SimilarFingerprintResult],
  ).returns(String)
end
def format_search_results(results)
  return "❌ No close matches found." if results.empty?

  escape_plain = ->(text) { text.to_s.gsub(/[_*`\[]/) { |char| "\\#{char}" } }
  link_label = ->(text) { text.to_s.tr("[]", "()") }

  header = "🎯 Found #{results.length} #{"match".pluralize(results.length)}"
  header += " (showing first 5)" if results.length > 5
  lines = [header]

  results
    .take(5)
    .each do |result|
      post = result.fingerprint.post_file&.post
      next unless post

      post_url = post.external_url_for_view
      title = post.title
      post_part =
        if title && post_url
          "[#{link_label.call("#{post.to_param} - #{title}")}](#{post_url})"
        elsif post_url
          "[#{link_label.call(post.to_param)}](#{post_url})"
        else
          escape_plain.call(post.to_param)
        end
      line = "- #{result.similarity_percentage.round(1)}% - #{post_part}"

      # Not every post type exposes a creator; respond_to? only reports
      # public methods, so public_send is sufficient for the dynamic call.
      creator = post.respond_to?(:creator) ? post.public_send(:creator) : nil
      if creator
        creator_url = creator.external_url_for_view
        creator_part =
          if creator_url
            "[#{link_label.call(creator.name_for_view)}](#{creator_url})"
          else
            escape_plain.call(creator.name_for_view)
          end
        line += " by #{creator_part}"
      end

      lines << line
    end

  # Every line, including the last, is newline-terminated.
  lines.map { |line| "#{line}\n" }.join
end
# Picks the attachment to analyse from a Telegram message.
#
# - When the message has a non-empty photo list, returns the variant with the
#   largest file_size (a missing size counts as 0).
# - Otherwise, when it has a document whose MIME type starts with "image/",
#   returns that document.
# - Otherwise logs why nothing was found and returns nil.
sig do
  params(message: Telegram::Bot::Types::Message).returns(
    T.nilable(
      T.any(
        Telegram::Bot::Types::PhotoSize,
        Telegram::Bot::Types::Document,
      ),
    ),
  )
end
def get_image_file_from_message(message)
  photo_variants = message.photo
  if photo_variants && photo_variants.any?
    return photo_variants.max_by { |variant| variant.file_size || 0 }
  end

  attached_document = message.document
  unless attached_document
    log("❌ No image found in message")
    return nil
  end

  # Documents are accepted only when Telegram reports an image MIME type.
  mime_type = attached_document.mime_type
  return attached_document if mime_type&.start_with?("image/")

  log("❌ Document is not an image: #{mime_type}")
  nil
end
# Downloads a Telegram file into a closed temporary file.
#
# Resolves the file's server-side path with the Bot API getFile call,
# fetches it from the Telegram file endpoint and writes the bytes to a
# Tempfile whose extension matches the remote path (".jpg" when it has none).
#
# Returns the Tempfile on success; the caller is responsible for unlinking
# it. Returns nil, after logging the reason, when no bot token is available,
# Telegram provides no file path, the HTTP response is not a success, or any
# StandardError is raised. A temporary file created before a failure is
# removed.
#
# The download URL embeds the bot token, so the token is never written to the
# log: the URL is logged in redacted form and the token is scrubbed from
# error messages.
sig do
  params(
    bot: Telegram::Bot::Client,
    file_info:
      T.any(
        Telegram::Bot::Types::PhotoSize,
        Telegram::Bot::Types::Document,
      ),
  ).returns(T.nilable(Tempfile))
end
def download_telegram_image(bot, file_info)
  bot_token = get_bot_token
  return nil if bot_token.nil? || bot_token.empty?

  temp_file = nil
  begin
    # Get file path from Telegram
    file_response = bot.api.get_file(file_id: file_info.file_id)
    file_path = file_response.file_path
    unless file_path
      log("❌ Could not get file path from Telegram")
      return nil
    end

    # Download the file
    log(
      "📥 Downloading image from: https://api.telegram.org/file/bot<redacted>/#{file_path}...",
    )
    uri = URI("https://api.telegram.org/file/bot#{bot_token}/#{file_path}")
    response = Net::HTTP.get_response(uri)
    # Without this check an error page would be saved and treated as an image.
    unless response.is_a?(Net::HTTPSuccess)
      log("❌ Telegram returned HTTP #{response.code} for the image download")
      return nil
    end

    # Create temporary file
    extension = File.extname(file_path)
    extension = ".jpg" if extension.empty?
    temp_file = Tempfile.new(["telegram_image", extension])
    temp_file.binmode
    temp_file.write(response.body)
    temp_file.close
    log("✅ Downloaded image to: #{temp_file.path}")
    temp_file
  rescue StandardError => e
    log(
      "❌ Error downloading image: #{e.message.gsub(bot_token, "<redacted>")}",
    )
    # Close and delete a partially written file instead of leaving it behind.
    temp_file&.close!
    nil
  end
end
end
end