Compare commits
3 Commits
7eada9588c
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4396d367a8 | ||
|
|
212a72c511 | ||
|
|
a955acbdce |
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -1724,6 +1724,7 @@ dependencies = [
|
||||
"teloxide-core",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3290,9 +3291,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.18.0"
|
||||
version = "1.18.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be"
|
||||
checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2"
|
||||
dependencies = [
|
||||
"getrandom 0.3.3",
|
||||
"js-sys",
|
||||
|
||||
@@ -34,6 +34,7 @@ seq-macro = "0.3.6"
|
||||
base64 = "0.22.1"
|
||||
mockall = "0.13.1"
|
||||
reqwest = "0.12.23"
|
||||
uuid = "1.18.1"
|
||||
|
||||
[dev-dependencies]
|
||||
rstest = "0.26.1"
|
||||
|
||||
@@ -16,6 +16,9 @@ CREATE TABLE users (
|
||||
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX idx_users_telegram_id ON users(telegram_id);
|
||||
CREATE INDEX idx_users_username ON users(username);
|
||||
|
||||
-- Main listing table (handles all listing types)
|
||||
CREATE TABLE listings (
|
||||
id INTEGER PRIMARY KEY,
|
||||
@@ -34,6 +37,7 @@ CREATE TABLE listings (
|
||||
slots_available INTEGER DEFAULT 1,
|
||||
|
||||
-- Timing
|
||||
is_active INTEGER DEFAULT 1,
|
||||
starts_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
||||
ends_at TEXT NOT NULL,
|
||||
anti_snipe_minutes INTEGER DEFAULT 5,
|
||||
@@ -43,6 +47,10 @@ CREATE TABLE listings (
|
||||
FOREIGN KEY (seller_id) REFERENCES users(id)
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX idx_listings_seller_id ON listings(seller_id);
|
||||
CREATE INDEX idx_listings_type ON listings(listing_type);
|
||||
CREATE INDEX idx_listings_ends_at ON listings(ends_at);
|
||||
|
||||
-- Proxy bid strategies (NOT actual bids, but bidding strategies)
|
||||
CREATE TABLE proxy_bids (
|
||||
id INTEGER PRIMARY KEY,
|
||||
@@ -59,6 +67,8 @@ CREATE TABLE proxy_bids (
|
||||
UNIQUE(listing_id, buyer_id) -- One active proxy per user per listing
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX idx_proxy_bids_listing_buyer ON proxy_bids(listing_id, buyer_id);
|
||||
|
||||
-- Actual bids that happened (events)
|
||||
CREATE TABLE bids (
|
||||
id INTEGER PRIMARY KEY,
|
||||
@@ -83,6 +93,10 @@ CREATE TABLE bids (
|
||||
FOREIGN KEY (proxy_bid_id) REFERENCES proxy_bids(id)
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX idx_bids_listing_id ON bids(listing_id);
|
||||
CREATE INDEX idx_bids_buyer_id ON bids(buyer_id);
|
||||
CREATE INDEX idx_bids_amount ON bids(bid_amount);
|
||||
|
||||
-- Media attachments
|
||||
CREATE TABLE listing_medias (
|
||||
id INTEGER PRIMARY KEY,
|
||||
@@ -95,6 +109,8 @@ CREATE TABLE listing_medias (
|
||||
FOREIGN KEY (listing_id) REFERENCES listings(id)
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX idx_listing_medias_listing_id ON listing_medias(listing_id);
|
||||
|
||||
-- User preferences
|
||||
CREATE TABLE user_settings (
|
||||
user_id INTEGER PRIMARY KEY,
|
||||
@@ -106,12 +122,19 @@ CREATE TABLE user_settings (
|
||||
FOREIGN KEY (user_id) REFERENCES users(id)
|
||||
) STRICT;
|
||||
|
||||
-- Create indexes for better performance
|
||||
CREATE INDEX idx_listings_seller_id ON listings(seller_id);
|
||||
CREATE INDEX idx_listings_type ON listings(listing_type);
|
||||
CREATE INDEX idx_listings_ends_at ON listings(ends_at);
|
||||
CREATE INDEX idx_bids_listing_id ON bids(listing_id);
|
||||
CREATE INDEX idx_bids_buyer_id ON bids(buyer_id);
|
||||
CREATE INDEX idx_bids_amount ON bids(bid_amount);
|
||||
CREATE INDEX idx_proxy_bids_listing_buyer ON proxy_bids(listing_id, buyer_id);
|
||||
CREATE INDEX idx_listing_medias_listing_id ON listing_medias(listing_id);
|
||||
-- Message about a listing that were forwarded from the bot to a channel or user
|
||||
CREATE TABLE forwarded_listings (
|
||||
id INTEGER PRIMARY KEY,
|
||||
listing_id INTEGER NOT NULL,
|
||||
telegram_message_id INTEGER NOT NULL,
|
||||
telegram_chat_id INTEGER NOT NULL,
|
||||
forwarding_user_id INTEGER NOT NULL,
|
||||
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
||||
|
||||
FOREIGN KEY (listing_id) REFERENCES listings(id),
|
||||
FOREIGN KEY (forwarding_user_id) REFERENCES users(id)
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX idx_forwarded_listings_listing_id ON forwarded_listings(listing_id);
|
||||
CREATE INDEX idx_forwarded_listings_forwarding_user_id ON forwarded_listings(forwarding_user_id);
|
||||
15
migrations/02_add_bid_active_listing_constraint.sql
Normal file
15
migrations/02_add_bid_active_listing_constraint.sql
Normal file
@@ -0,0 +1,15 @@
|
||||
-- Add constraint to prevent bids on inactive listings
|
||||
-- This ensures atomicity at the database level
|
||||
|
||||
-- Create a trigger to prevent bids on inactive listings
|
||||
-- This is more reliable than CHECK constraints in SQLite
|
||||
CREATE TRIGGER prevent_bid_on_inactive_listing
|
||||
BEFORE INSERT ON bids
|
||||
FOR EACH ROW
|
||||
WHEN (
|
||||
SELECT COALESCE(is_active, 0) FROM listings WHERE id = NEW.listing_id
|
||||
) != 1
|
||||
BEGIN
|
||||
SELECT RAISE(ABORT, 'Cannot place bid on inactive listing');
|
||||
END;
|
||||
|
||||
@@ -154,7 +154,7 @@ mod tests {
|
||||
listing: PersistedListing,
|
||||
}
|
||||
async fn set_up_fixtures() -> Fixtures {
|
||||
let deps = create_deps().await;
|
||||
let Deps { deps, .. } = create_deps().await;
|
||||
let seller = with_test_user(&deps, |seller| {
|
||||
seller.username = Some("seller".to_string());
|
||||
seller.telegram_id = 123.into()
|
||||
|
||||
@@ -14,7 +14,7 @@ use crate::{
|
||||
dptree_utils::MapTwo,
|
||||
handle_error::with_error_handler,
|
||||
handler_utils::find_listing_by_id,
|
||||
message_utils::buyer_name_or_link,
|
||||
message_utils::user_name_or_link,
|
||||
start_command_data::StartCommandData,
|
||||
App, BotError, BotHandler, BotResult, DialogueRootState, RootDialogue,
|
||||
};
|
||||
@@ -113,7 +113,7 @@ async fn handle_place_bid_on_listing(
|
||||
response_lines.push(format!(
|
||||
"Placing a bid <b>{title}</b>, ran by {seller}",
|
||||
title = listing.base.title,
|
||||
seller = buyer_name_or_link(&seller)
|
||||
seller = user_name_or_link(&seller)
|
||||
));
|
||||
let currency_type = listing.base.currency_type;
|
||||
response_lines.push(format!("You are bidding on this listing as: {user:?}"));
|
||||
@@ -164,11 +164,11 @@ async fn handle_view_listing_bids(
|
||||
if let Some(current_bid) = bids.first() {
|
||||
let buyer = bidding_users
|
||||
.get(¤t_bid.buyer_id)
|
||||
.ok_or(BotError::internal("Buyer not found"))?;
|
||||
.ok_or(BotError::internal_str("Buyer not found"))?;
|
||||
response_lines.push(format!(
|
||||
"💰 Current highest bid: <b>{current_bid}</b> from {buyer_name}",
|
||||
current_bid = current_bid.bid_amount.with_type(currency_type),
|
||||
buyer_name = buyer_name_or_link(&buyer)
|
||||
buyer_name = user_name_or_link(buyer)
|
||||
));
|
||||
} else {
|
||||
response_lines.push("💰 No bids yet".to_string());
|
||||
@@ -177,11 +177,11 @@ async fn handle_view_listing_bids(
|
||||
for bid in bids.iter() {
|
||||
let bidder = bidding_users
|
||||
.get(&bid.buyer_id)
|
||||
.ok_or(BotError::internal("Bidder not found"))?;
|
||||
.ok_or(BotError::internal_str("Bidder not found"))?;
|
||||
response_lines.push(format!(
|
||||
"💰 Bid: <b>{bid_amount}</b> from {buyer_name}",
|
||||
bid_amount = bid.bid_amount.with_type(currency_type),
|
||||
buyer_name = buyer_name_or_link(&bidder)
|
||||
buyer_name = user_name_or_link(bidder)
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -11,14 +11,14 @@ use crate::{
|
||||
db::{bid::PersistedBid, listing::PersistedListing, user::PersistedUser},
|
||||
message::MessageType,
|
||||
message_sender::{BoxedMessageSender, MessageSender},
|
||||
message_utils::buyer_name_or_link,
|
||||
message_utils::user_name_or_link,
|
||||
start_command_data::StartCommandData,
|
||||
BotError, BotResult, MessageTarget,
|
||||
};
|
||||
|
||||
pub struct BotMessageSender(Bot, MessageTarget);
|
||||
pub struct BotMessageSender(Bot, Option<MessageTarget>);
|
||||
impl BotMessageSender {
|
||||
pub fn new(bot: Bot, message_target: MessageTarget) -> Self {
|
||||
pub fn new(bot: Bot, message_target: Option<MessageTarget>) -> Self {
|
||||
Self(bot, message_target)
|
||||
}
|
||||
}
|
||||
@@ -26,7 +26,7 @@ impl BotMessageSender {
|
||||
#[async_trait]
|
||||
impl MessageSender for BotMessageSender {
|
||||
fn with_target(&self, target: MessageTarget) -> BoxedMessageSender {
|
||||
let clone = Self(self.0.clone(), target);
|
||||
let clone = Self(self.0.clone(), Some(target));
|
||||
Box::new(clone)
|
||||
}
|
||||
async fn send_html_message(
|
||||
@@ -34,7 +34,10 @@ impl MessageSender for BotMessageSender {
|
||||
text: String,
|
||||
keyboard: Option<InlineKeyboardMarkup>,
|
||||
) -> BotResult {
|
||||
let target = self.1.clone();
|
||||
let target = self
|
||||
.1
|
||||
.clone()
|
||||
.ok_or(BotError::internal_str("No message target"))?;
|
||||
if let Some(message_id) = target.message_id {
|
||||
log::info!("Editing message in chat: {target:?}");
|
||||
let mut message = self
|
||||
@@ -107,6 +110,36 @@ impl MessageSender for BotMessageSender {
|
||||
self.send_bid_invalid_listing_expired(listing, buyer)
|
||||
.await?;
|
||||
}
|
||||
MessageType::UserHasWonListingForSeller {
|
||||
listing,
|
||||
buyer,
|
||||
seller,
|
||||
bid,
|
||||
} => {
|
||||
self.send_user_has_won_listing_for_seller(listing, buyer, seller, bid)
|
||||
.await?;
|
||||
}
|
||||
MessageType::UserHasWonListingForBuyer {
|
||||
listing,
|
||||
buyer,
|
||||
seller,
|
||||
bid,
|
||||
} => {
|
||||
self.send_user_has_won_listing_for_buyer(listing, buyer, seller, bid)
|
||||
.await?;
|
||||
}
|
||||
MessageType::UserHasLostListingForBuyer {
|
||||
listing,
|
||||
buyer,
|
||||
seller,
|
||||
} => {
|
||||
self.send_user_has_lost_listing_for_buyer(listing, buyer, seller)
|
||||
.await?;
|
||||
}
|
||||
MessageType::ListingExpiredWithNoWinnerForSeller { listing, seller } => {
|
||||
self.send_listing_expired_with_no_winner_for_seller(listing, seller)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -142,7 +175,7 @@ impl BotMessageSender {
|
||||
"A bid was placed on {title} for <b>{bid_amount}</b> by {buyer}",
|
||||
bid_amount = bid.bid_amount.with_type(listing.base.currency_type),
|
||||
title = listing.base.title,
|
||||
buyer = buyer_name_or_link(&buyer),
|
||||
buyer = user_name_or_link(&buyer),
|
||||
),
|
||||
Some(
|
||||
InlineKeyboardMarkup::default().append_row([InlineKeyboardButton::callback(
|
||||
@@ -178,7 +211,82 @@ impl BotMessageSender {
|
||||
self.with_target(buyer.into())
|
||||
.send_html_message(
|
||||
format!(
|
||||
"Auction <b>{title}</b> has already ended",
|
||||
"Auction <b>{title}</b> already ended on <b>{ends_at}</b>",
|
||||
title = listing.base.title,
|
||||
ends_at = listing.base.ends_at.format("%Y-%m-%d %H:%M:%S")
|
||||
),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send_user_has_won_listing_for_seller(
|
||||
&self,
|
||||
listing: PersistedListing,
|
||||
buyer: PersistedUser,
|
||||
seller: PersistedUser,
|
||||
bid: PersistedBid,
|
||||
) -> BotResult {
|
||||
self.with_target(seller.into())
|
||||
.send_html_message(
|
||||
format!(
|
||||
"<b>{buyer}</b> has won {title} for <b>{bid_amount}</b>",
|
||||
buyer = user_name_or_link(&buyer),
|
||||
title = listing.base.title,
|
||||
bid_amount = bid.bid_amount.with_type(listing.base.currency_type),
|
||||
),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send_user_has_won_listing_for_buyer(
|
||||
&self,
|
||||
listing: PersistedListing,
|
||||
buyer: PersistedUser,
|
||||
seller: PersistedUser,
|
||||
bid: PersistedBid,
|
||||
) -> BotResult {
|
||||
self.with_target(buyer.into())
|
||||
.send_html_message(
|
||||
format!(
|
||||
"You have won {title} by {seller} for <b>{bid_amount}</b>",
|
||||
title = listing.base.title,
|
||||
seller = user_name_or_link(&seller),
|
||||
bid_amount = bid.bid_amount.with_type(listing.base.currency_type),
|
||||
),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send_user_has_lost_listing_for_buyer(
|
||||
&self,
|
||||
listing: PersistedListing,
|
||||
buyer: PersistedUser,
|
||||
seller: PersistedUser,
|
||||
) -> BotResult {
|
||||
self.with_target(buyer.into())
|
||||
.send_html_message(
|
||||
format!(
|
||||
"Auction {title} by {seller} has ended, and you were not the winner",
|
||||
title = listing.base.title,
|
||||
seller = user_name_or_link(&seller),
|
||||
),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send_listing_expired_with_no_winner_for_seller(
|
||||
&self,
|
||||
listing: PersistedListing,
|
||||
seller: PersistedUser,
|
||||
) -> BotResult {
|
||||
self.with_target(seller.into())
|
||||
.send_html_message(
|
||||
format!(
|
||||
"Auction {title} has ended with no winner",
|
||||
title = listing.base.title
|
||||
),
|
||||
None,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use anyhow::Error;
|
||||
use teloxide::dispatching::DpHandlerDescription;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
@@ -11,7 +12,10 @@ impl BotError {
|
||||
pub fn user_visible(msg: impl Into<String>) -> Self {
|
||||
Self::UserVisibleError(msg.into())
|
||||
}
|
||||
pub fn internal(msg: impl Into<String>) -> Self {
|
||||
pub fn internal(msg: impl Into<Error>) -> Self {
|
||||
Self::InternalError(msg.into())
|
||||
}
|
||||
pub fn internal_str(msg: impl Into<String>) -> Self {
|
||||
Self::InternalError(anyhow::anyhow!(msg.into()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::{
|
||||
},
|
||||
handle_error::with_error_handler,
|
||||
handler_utils::{find_listing_by_id, find_or_create_db_user_from_update},
|
||||
message_utils::{extract_callback_data, pluralize_with_count},
|
||||
message_utils::{extract_callback_data, pluralize_with_count, user_name_or_link},
|
||||
start_command_data::StartCommandData,
|
||||
App, BotError, BotResult, Command, DialogueRootState, RootDialogue,
|
||||
};
|
||||
@@ -97,7 +97,10 @@ async fn inline_query_extract_forward_listing(
|
||||
) -> Option<PersistedListing> {
|
||||
let query = &inline_query.query;
|
||||
info!("Try to extract forward listing from query: {query}");
|
||||
let listing_id_str = query.split("forward_listing:").nth(1)?;
|
||||
if !query.starts_with("forward_listing:") {
|
||||
return None;
|
||||
}
|
||||
let listing_id_str = query.split(":").nth(1)?;
|
||||
let listing_id = DbListingId::new(listing_id_str.parse::<i64>().ok()?);
|
||||
let listing = app
|
||||
.daos
|
||||
@@ -138,22 +141,28 @@ async fn handle_forward_listing(
|
||||
|
||||
// Get the current price based on listing type
|
||||
let current_price = get_listing_current_price(&listing);
|
||||
let seller = app
|
||||
.daos
|
||||
.user
|
||||
.find_by_id(listing.base.seller_id)
|
||||
.await?
|
||||
.ok_or(BotError::internal_str("Seller not found"))?;
|
||||
|
||||
// Create a more detailed message content for the shared listing
|
||||
let message_content = format!(
|
||||
"🎯 <b>{}</b>\n\n\
|
||||
📝 {}\n\n\
|
||||
💰 <b>Current Price:</b> ${}\n\
|
||||
⏰ <b>Ends:</b> {}\n\n\
|
||||
"🎯 <b>{title}</b> by {seller}\n\n\
|
||||
📝 {description}\n\n\
|
||||
💰 <b>Current Price:</b> ${current_price}\n\
|
||||
⏰ <b>Ends:</b> {ends_at}\n\n\
|
||||
<i>Use the buttons below to interact! ⬇️</i>",
|
||||
listing.base.title,
|
||||
listing
|
||||
title = listing.base.title,
|
||||
seller = user_name_or_link(&seller),
|
||||
description = listing
|
||||
.base
|
||||
.description
|
||||
.as_deref()
|
||||
.unwrap_or("No description"),
|
||||
current_price,
|
||||
listing.base.ends_at.format("%b %d, %Y at %H:%M UTC")
|
||||
ends_at = listing.base.ends_at.format("%b %d, %Y at %H:%M UTC")
|
||||
);
|
||||
|
||||
app.bot
|
||||
@@ -301,7 +310,10 @@ async fn enter_show_listing_details(
|
||||
ManageListingButtons::PreviewMessage.to_button(),
|
||||
InlineKeyboardButton::switch_inline_query(
|
||||
ManageListingButtons::ForwardListing.title(),
|
||||
format!("forward_listing:{listing_id}"),
|
||||
format!(
|
||||
"forward_listing:{listing_id}:{nonce}",
|
||||
nonce = uuid::Uuid::new_v4()
|
||||
),
|
||||
),
|
||||
])
|
||||
.append_row([
|
||||
|
||||
@@ -25,6 +25,7 @@ fn create_test_draft() -> ListingDraft {
|
||||
currency_type: CurrencyType::Usd,
|
||||
starts_at: chrono::Utc::now(),
|
||||
ends_at: chrono::Utc::now() + chrono::Duration::hours(1),
|
||||
is_active: true,
|
||||
},
|
||||
fields: ListingFields::FixedPriceListing(FixedPriceListingFields {
|
||||
buy_now_price: MoneyAmount::default(),
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{
|
||||
BasicAuctionFields, BlindAuctionFields, FixedPriceListingFields, ListingBase,
|
||||
ListingFields, MultiSlotAuctionFields, PersistedListing, PersistedListingFields,
|
||||
},
|
||||
CurrencyType, ListingType, MoneyAmount, DbUserId,
|
||||
CurrencyType, DbUserId, ListingType, MoneyAmount,
|
||||
},
|
||||
DialogueRootState,
|
||||
};
|
||||
@@ -58,6 +58,7 @@ impl ListingDraft {
|
||||
description: None,
|
||||
starts_at: Utc::now(),
|
||||
ends_at: Utc::now() + Duration::days(3),
|
||||
is_active: true,
|
||||
},
|
||||
fields,
|
||||
}
|
||||
|
||||
@@ -20,6 +20,20 @@ impl BidDAO {
|
||||
#[allow(unused)]
|
||||
impl BidDAO {
|
||||
pub async fn insert_bid(&self, bid: &NewBid) -> Result<PersistedBid> {
|
||||
let mut tx = self.0.begin().await?;
|
||||
|
||||
// First, validate that the listing is active within the transaction
|
||||
// Note: SQLite doesn't support FOR UPDATE, but transactions provide isolation
|
||||
let is_active: bool = sqlx::query_scalar("SELECT is_active FROM listings WHERE id = ?")
|
||||
.bind(bid.listing_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("Listing not found"))?;
|
||||
|
||||
if !is_active {
|
||||
return Err(anyhow::anyhow!("Cannot place bid on inactive listing"));
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
let binds = BindFields::default()
|
||||
.push("listing_id", &bid.listing_id)
|
||||
@@ -32,7 +46,7 @@ impl BidDAO {
|
||||
.push("created_at", &now)
|
||||
.push("updated_at", &now);
|
||||
|
||||
let query_str = format!(
|
||||
let insert_bid_query_str = format!(
|
||||
r#"
|
||||
INSERT INTO bids ({}) VALUES ({})
|
||||
RETURNING *
|
||||
@@ -40,12 +54,11 @@ impl BidDAO {
|
||||
binds.bind_names().join(", "),
|
||||
binds.bind_placeholders().join(", ")
|
||||
);
|
||||
let insert_bid_query = binds.bind_to_query(sqlx::query(&insert_bid_query_str));
|
||||
|
||||
let row = binds
|
||||
.bind_to_query(sqlx::query(&query_str))
|
||||
.fetch_one(&self.0)
|
||||
.await?;
|
||||
Ok(FromRow::from_row(&row)?)
|
||||
let row = insert_bid_query.fetch_one(&mut *tx).await?;
|
||||
tx.commit().await?;
|
||||
Ok(PersistedBid::from_row(&row)?)
|
||||
}
|
||||
|
||||
pub async fn bidder_ids_for_listing(&self, listing_id: DbListingId) -> Result<Vec<DbUserId>> {
|
||||
@@ -99,6 +112,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::db::{
|
||||
dao::listing_dao::{ListingEventSender, ListingUpdatedEvent},
|
||||
listing::{BasicAuctionFields, ListingFields},
|
||||
models::{listing::NewListing, user::NewUser},
|
||||
CurrencyType, ListingDAO, MoneyAmount, UserDAO,
|
||||
@@ -114,9 +128,10 @@ mod tests {
|
||||
crate::db::DbUserId,
|
||||
crate::db::DbListingId,
|
||||
) {
|
||||
let (tx, _) = tokio::sync::mpsc::channel::<ListingUpdatedEvent>(1);
|
||||
let pool = create_test_pool().await;
|
||||
let user_dao = UserDAO::new(pool.clone());
|
||||
let listing_dao = ListingDAO::new(pool.clone());
|
||||
let listing_dao = ListingDAO::new(pool.clone(), ListingEventSender::new(tx));
|
||||
let bid_dao = BidDAO::new(pool);
|
||||
|
||||
// Create a test user
|
||||
@@ -144,6 +159,7 @@ mod tests {
|
||||
currency_type: CurrencyType::Usd,
|
||||
starts_at: Utc::now(),
|
||||
ends_at: Utc::now() + chrono::Duration::hours(24),
|
||||
is_active: true,
|
||||
},
|
||||
fields: ListingFields::BasicAuction(BasicAuctionFields {
|
||||
starting_bid: MoneyAmount::from_str("10.00").unwrap(),
|
||||
@@ -168,7 +184,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_bid() {
|
||||
async fn test_insert_bid_on_active_listing() {
|
||||
let (_user_dao, _listing_dao, bid_dao, user_id, listing_id) =
|
||||
create_test_user_and_listing().await;
|
||||
|
||||
@@ -183,11 +199,11 @@ mod tests {
|
||||
proxy_bid_id: None,
|
||||
};
|
||||
|
||||
// Insert bid
|
||||
// Insert bid on active listing should succeed
|
||||
let inserted_bid = bid_dao
|
||||
.insert_bid(&new_bid)
|
||||
.await
|
||||
.expect("Failed to insert bid");
|
||||
.expect("Failed to insert bid on active listing");
|
||||
|
||||
// Verify the inserted bid has the correct values
|
||||
assert_eq!(inserted_bid.listing_id, new_bid.listing_id);
|
||||
@@ -207,4 +223,50 @@ mod tests {
|
||||
inserted_bid.persisted.updated_at
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_bid_on_inactive_listing_fails() {
|
||||
let (_user_dao, listing_dao, bid_dao, user_id, listing_id) =
|
||||
create_test_user_and_listing().await;
|
||||
|
||||
// Mark the listing as inactive
|
||||
|
||||
listing_dao
|
||||
.set_listing_is_active(listing_id, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let new_bid = NewBid {
|
||||
persisted: (),
|
||||
listing_id,
|
||||
buyer_id: user_id,
|
||||
bid_amount: MoneyAmount::from_str("25.50").unwrap(),
|
||||
description: Some("Test bid description".to_string()),
|
||||
is_cancelled: false,
|
||||
slot_number: Some(1),
|
||||
proxy_bid_id: None,
|
||||
};
|
||||
|
||||
// Verify the listing is actually inactive
|
||||
let updated_listing = listing_dao.find_by_id(listing_id).await.unwrap().unwrap();
|
||||
assert!(
|
||||
!updated_listing.base.is_active,
|
||||
"Listing should be inactive"
|
||||
);
|
||||
|
||||
// Insert bid on inactive listing should fail
|
||||
let result = bid_dao.insert_bid(&new_bid).await;
|
||||
match result {
|
||||
Ok(_) => panic!("Expected bid insertion to fail on inactive listing, but it succeeded"),
|
||||
Err(e) => {
|
||||
let error_msg = e.to_string();
|
||||
assert!(
|
||||
error_msg.contains("inactive listing")
|
||||
|| error_msg.contains("Cannot place bid"),
|
||||
"Expected error about inactive listing, got: {}",
|
||||
error_msg
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use chrono::Utc;
|
||||
use itertools::Itertools;
|
||||
use sqlx::{sqlite::SqliteRow, FromRow, Row, SqlitePool};
|
||||
use std::fmt::Debug;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
use crate::db::{
|
||||
bind_fields::BindFields,
|
||||
@@ -18,20 +19,50 @@ use crate::db::{
|
||||
DbListingId, DbUserId, ListingType,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum ListingUpdatedEvent {
|
||||
Created(DbListingId),
|
||||
Updated(DbListingId),
|
||||
Deleted(DbListingId),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ListingEventSender(Sender<ListingUpdatedEvent>);
|
||||
impl ListingEventSender {
|
||||
pub fn new(sender: Sender<ListingUpdatedEvent>) -> Self {
|
||||
Self(sender)
|
||||
}
|
||||
|
||||
pub fn channel() -> (ListingEventSender, Receiver<ListingUpdatedEvent>) {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel::<ListingUpdatedEvent>(1);
|
||||
(ListingEventSender::new(sender), receiver)
|
||||
}
|
||||
|
||||
pub fn send(&self, event: ListingUpdatedEvent) {
|
||||
if let Err(e) = self.0.try_send(event) {
|
||||
log::error!("Error sending listing updated event: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Data Access Object for Listing operations
|
||||
#[derive(Clone)]
|
||||
pub struct ListingDAO(SqlitePool);
|
||||
pub struct ListingDAO(SqlitePool, ListingEventSender);
|
||||
|
||||
impl ListingDAO {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self(pool)
|
||||
pub fn new(pool: SqlitePool, sender: ListingEventSender) -> Self {
|
||||
Self(pool, sender)
|
||||
}
|
||||
|
||||
fn send_event(&self, event: ListingUpdatedEvent) {
|
||||
self.1.send(event);
|
||||
}
|
||||
|
||||
/// Insert a new listing into the database
|
||||
pub async fn insert_listing(&self, listing: &NewListing) -> Result<PersistedListing> {
|
||||
let now = Utc::now();
|
||||
|
||||
let binds = binds_for_listing(&listing)
|
||||
let binds = binds_for_listing(listing)
|
||||
.push("seller_id", &listing.base.seller_id)
|
||||
.push("starts_at", &listing.base.starts_at)
|
||||
.push("ends_at", &listing.base.ends_at)
|
||||
@@ -51,12 +82,14 @@ impl ListingDAO {
|
||||
.bind_to_query(sqlx::query(&query_str))
|
||||
.fetch_one(&self.0)
|
||||
.await?;
|
||||
Ok(FromRow::from_row(&row)?)
|
||||
let listing = PersistedListing::from_row(&row)?;
|
||||
self.send_event(ListingUpdatedEvent::Created(listing.persisted.id));
|
||||
Ok(listing)
|
||||
}
|
||||
|
||||
pub async fn update_listing(&self, listing: &PersistedListing) -> Result<PersistedListing> {
|
||||
let now = Utc::now();
|
||||
let binds = binds_for_listing(&listing).push("updated_at", &now);
|
||||
let binds = binds_for_listing(listing).push("updated_at", &now);
|
||||
|
||||
let query_str = format!(
|
||||
r#"
|
||||
@@ -78,7 +111,9 @@ impl ListingDAO {
|
||||
.bind(listing.base.seller_id)
|
||||
.fetch_one(&self.0)
|
||||
.await?;
|
||||
Ok(FromRow::from_row(&row)?)
|
||||
let listing = PersistedListing::from_row(&row)?;
|
||||
self.send_event(ListingUpdatedEvent::Updated(listing.persisted.id));
|
||||
Ok(listing)
|
||||
}
|
||||
|
||||
/// Find a listing by its ID
|
||||
@@ -108,8 +143,43 @@ impl ListingDAO {
|
||||
.execute(&self.0)
|
||||
.await?;
|
||||
|
||||
self.send_event(ListingUpdatedEvent::Deleted(listing_id));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_listing_is_active(
|
||||
&self,
|
||||
listing_id: DbListingId,
|
||||
is_active: bool,
|
||||
) -> Result<PersistedListing> {
|
||||
let result = sqlx::query_as::<_, PersistedListing>(
|
||||
r#"
|
||||
UPDATE listings
|
||||
SET is_active = ?
|
||||
WHERE id = ?
|
||||
RETURNING *"#,
|
||||
)
|
||||
.bind(is_active)
|
||||
.bind(listing_id)
|
||||
.fetch_one(&self.0)
|
||||
.await?;
|
||||
self.send_event(ListingUpdatedEvent::Updated(result.persisted.id));
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn find_next_ending_listing(&self) -> Result<Option<PersistedListing>> {
|
||||
let result = sqlx::query_as(
|
||||
r#"
|
||||
SELECT * FROM listings
|
||||
WHERE is_active = 1
|
||||
ORDER BY ends_at ASC
|
||||
LIMIT 1"#,
|
||||
)
|
||||
.fetch_optional(&self.0)
|
||||
.await?;
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
fn binds_for_listing<P: Debug + Clone>(listing: &Listing<P>) -> BindFields {
|
||||
@@ -125,6 +195,7 @@ fn binds_for_base(base: &ListingBase) -> BindFields {
|
||||
.push("currency_type", &base.currency_type)
|
||||
.push("starts_at", &base.starts_at)
|
||||
.push("ends_at", &base.ends_at)
|
||||
.push("is_active", &base.is_active)
|
||||
}
|
||||
|
||||
fn binds_for_fields(fields: &ListingFields) -> BindFields {
|
||||
@@ -167,6 +238,7 @@ impl FromRow<'_, SqliteRow> for PersistedListing {
|
||||
currency_type: row.get("currency_type"),
|
||||
starts_at: row.get("starts_at"),
|
||||
ends_at: row.get("ends_at"),
|
||||
is_active: row.get("is_active"),
|
||||
};
|
||||
let fields = match listing_type {
|
||||
ListingType::BasicAuction => ListingFields::BasicAuction(BasicAuctionFields {
|
||||
@@ -201,3 +273,175 @@ impl FromRow<'_, SqliteRow> for PersistedListing {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test_utils::{create_deps, with_test_listing, with_test_user};
|
||||
use chrono::{Duration, Utc};
|
||||
use rstest::rstest;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_find_next_ending_listing_no_active_listings() {
|
||||
let deps = create_deps().await;
|
||||
let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
|
||||
|
||||
let result = listing_dao
|
||||
.find_next_ending_listing()
|
||||
.await
|
||||
.expect("Failed to query for next ending listing");
|
||||
|
||||
assert!(
|
||||
result.is_none(),
|
||||
"Expected no listings when database is empty"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_find_next_ending_listing_single_active_listing() {
|
||||
let deps = create_deps().await;
|
||||
let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
|
||||
|
||||
let seller = with_test_user(&deps.deps, |_| {}).await;
|
||||
let ends_at = Utc::now() + Duration::hours(1);
|
||||
|
||||
let inserted_listing = with_test_listing(&deps.deps, &seller, |listing| {
|
||||
listing.base.ends_at = ends_at;
|
||||
})
|
||||
.await;
|
||||
|
||||
let result = listing_dao
|
||||
.find_next_ending_listing()
|
||||
.await
|
||||
.expect("Failed to query for next ending listing");
|
||||
|
||||
assert!(result.is_some(), "Expected to find the active listing");
|
||||
let found_listing = result.unwrap();
|
||||
assert_eq!(found_listing.persisted.id, inserted_listing.persisted.id);
|
||||
assert_eq!(found_listing.base.ends_at, ends_at);
|
||||
assert!(found_listing.base.is_active);
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[case(3, vec![2, 1, 3], 1)] // Multiple listings, should return the one ending soonest (index 1)
|
||||
#[case(2, vec![1, 2], 0)] // Two listings, should return the one ending first (index 0)
|
||||
#[case(4, vec![4, 1, 3, 2], 1)] // Four listings, should return the one ending soonest (index 1)
|
||||
#[tokio::test]
|
||||
async fn test_find_next_ending_listing_multiple_active_listings(
|
||||
#[case] _num_listings: usize,
|
||||
#[case] hours_from_now: Vec<i64>,
|
||||
#[case] expected_index: usize,
|
||||
) {
|
||||
let deps = create_deps().await;
|
||||
let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
|
||||
let seller = with_test_user(&deps.deps, |_| {}).await;
|
||||
|
||||
let mut inserted_listings = Vec::new();
|
||||
let base_time = Utc::now();
|
||||
|
||||
// Create multiple listings with different end times
|
||||
for (_i, hours) in hours_from_now.iter().enumerate() {
|
||||
let ends_at = base_time + Duration::hours(*hours);
|
||||
let inserted_listing = with_test_listing(&deps.deps, &seller, |listing| {
|
||||
listing.base.ends_at = ends_at;
|
||||
})
|
||||
.await;
|
||||
inserted_listings.push(inserted_listing);
|
||||
}
|
||||
|
||||
let result = listing_dao
|
||||
.find_next_ending_listing()
|
||||
.await
|
||||
.expect("Failed to query for next ending listing");
|
||||
|
||||
assert!(result.is_some(), "Expected to find an active listing");
|
||||
let found_listing = result.unwrap();
|
||||
|
||||
// Should return the listing that ends soonest
|
||||
let expected_listing = &inserted_listings[expected_index];
|
||||
assert_eq!(found_listing.persisted.id, expected_listing.persisted.id);
|
||||
assert_eq!(found_listing.base.ends_at, expected_listing.base.ends_at);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_find_next_ending_listing_ignores_inactive_listings() {
|
||||
let deps = create_deps().await;
|
||||
let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
|
||||
let seller = with_test_user(&deps.deps, |_| {}).await;
|
||||
|
||||
// Create an inactive listing that ends sooner
|
||||
let inactive_ends_at = Utc::now() + Duration::hours(1);
|
||||
with_test_listing(&deps.deps, &seller, |listing| {
|
||||
listing.base.ends_at = inactive_ends_at;
|
||||
listing.base.is_active = false;
|
||||
})
|
||||
.await;
|
||||
|
||||
// Create an active listing that ends later
|
||||
let active_ends_at = Utc::now() + Duration::hours(2);
|
||||
let inserted_active_listing = with_test_listing(&deps.deps, &seller, |listing| {
|
||||
listing.base.ends_at = active_ends_at;
|
||||
})
|
||||
.await;
|
||||
|
||||
let result = listing_dao
|
||||
.find_next_ending_listing()
|
||||
.await
|
||||
.expect("Failed to query for next ending listing");
|
||||
|
||||
assert!(result.is_some(), "Expected to find the active listing");
|
||||
let found_listing = result.unwrap();
|
||||
|
||||
// Should return the active listing, not the inactive one that ends sooner
|
||||
assert_eq!(
|
||||
found_listing.persisted.id,
|
||||
inserted_active_listing.persisted.id
|
||||
);
|
||||
assert_eq!(found_listing.base.ends_at, active_ends_at);
|
||||
assert!(found_listing.base.is_active);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_find_next_ending_listing_with_mixed_active_inactive() {
|
||||
let deps = create_deps().await;
|
||||
let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
|
||||
let seller = with_test_user(&deps.deps, |_| {}).await;
|
||||
|
||||
let base_time = Utc::now();
|
||||
let mut inserted_active_listings = Vec::new();
|
||||
|
||||
// Create a mix of active and inactive listings
|
||||
let listing_configs = vec![
|
||||
(1, false), // inactive, ends in 1 hour
|
||||
(2, true), // active, ends in 2 hours
|
||||
(3, false), // inactive, ends in 3 hours
|
||||
(4, true), // active, ends in 4 hours
|
||||
(5, true), // active, ends in 5 hours
|
||||
];
|
||||
|
||||
for (hours, is_active) in listing_configs {
|
||||
let ends_at = base_time + Duration::hours(hours);
|
||||
let inserted_listing = with_test_listing(&deps.deps, &seller, |listing| {
|
||||
listing.base.ends_at = ends_at;
|
||||
listing.base.is_active = is_active;
|
||||
})
|
||||
.await;
|
||||
|
||||
if is_active {
|
||||
inserted_active_listings.push(inserted_listing);
|
||||
}
|
||||
}
|
||||
|
||||
let result = listing_dao
|
||||
.find_next_ending_listing()
|
||||
.await
|
||||
.expect("Failed to query for next ending listing");
|
||||
|
||||
assert!(result.is_some(), "Expected to find an active listing");
|
||||
let found_listing = result.unwrap();
|
||||
|
||||
// Should return the first active listing (ends in 2 hours), ignoring inactive ones
|
||||
let expected_listing = &inserted_active_listings[0]; // The one ending in 2 hours
|
||||
assert_eq!(found_listing.persisted.id, expected_listing.persisted.id);
|
||||
assert!(found_listing.base.is_active);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ mod user_dao;
|
||||
// Re-export DAO structs for easy access
|
||||
pub use bid_dao::BidDAO;
|
||||
pub use listing_dao::ListingDAO;
|
||||
pub use listing_dao::{ListingEventSender, ListingUpdatedEvent};
|
||||
use sqlx::SqlitePool;
|
||||
pub use user_dao::UserDAO;
|
||||
|
||||
@@ -16,10 +17,10 @@ pub struct DAOs {
|
||||
}
|
||||
|
||||
impl DAOs {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
pub fn new(pool: SqlitePool, sender: ListingEventSender) -> Self {
|
||||
Self {
|
||||
user: UserDAO::new(pool.clone()),
|
||||
listing: ListingDAO::new(pool.clone()),
|
||||
listing: ListingDAO::new(pool.clone(), sender),
|
||||
bid: BidDAO::new(pool),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ pub struct ListingBase {
|
||||
pub starts_at: DateTime<Utc>,
|
||||
pub ends_at: DateTime<Utc>,
|
||||
pub currency_type: CurrencyType,
|
||||
pub is_active: bool,
|
||||
}
|
||||
|
||||
impl ListingBase {
|
||||
@@ -171,6 +172,7 @@ mod tests {
|
||||
currency_type,
|
||||
starts_at: Utc::now(),
|
||||
ends_at: Utc::now() + Duration::days(3),
|
||||
is_active: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,11 +183,15 @@ mod tests {
|
||||
#[case(ListingFields::FixedPriceListing(FixedPriceListingFields { buy_now_price: MoneyAmount::from_str("100.00").unwrap(), slots_available: 10 }))]
|
||||
#[tokio::test]
|
||||
async fn test_blind_auction_crud(#[case] fields: ListingFields) {
|
||||
use crate::{db::ListingDAO, test_utils::create_test_pool};
|
||||
use crate::{
|
||||
db::{ListingDAO, ListingEventSender},
|
||||
test_utils::create_test_pool,
|
||||
};
|
||||
|
||||
let (tx, _) = ListingEventSender::channel();
|
||||
let pool = create_test_pool().await;
|
||||
let user_dao = UserDAO::new(pool.clone());
|
||||
let listing_dao = ListingDAO::new(pool.clone());
|
||||
let listing_dao = ListingDAO::new(pool.clone(), tx);
|
||||
let seller_id = create_test_user(&user_dao, 99999.into(), Some("testuser")).await;
|
||||
let new_listing = build_base_listing(
|
||||
seller_id,
|
||||
|
||||
271
src/listing_expiry_checker_task.rs
Normal file
271
src/listing_expiry_checker_task.rs
Normal file
@@ -0,0 +1,271 @@
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use std::collections::VecDeque;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use crate::{
|
||||
db::{dao::ListingUpdatedEvent, listing::PersistedListing, DAOs},
|
||||
*,
|
||||
};
|
||||
|
||||
pub async fn task(
|
||||
message_sender: BoxedMessageSender,
|
||||
daos: DAOs,
|
||||
mut listing_event: Receiver<ListingUpdatedEvent>,
|
||||
) -> BotResult<()> {
|
||||
const ERROR_WINDOW: Duration = Duration::seconds(3);
|
||||
const COOLDOWN_DELAY: Duration = Duration::seconds(3);
|
||||
|
||||
let mut error_timestamps: VecDeque<DateTime<Utc>> = VecDeque::new();
|
||||
|
||||
loop {
|
||||
match task_impl(&message_sender, &daos, &mut listing_event).await {
|
||||
Ok(r) => return Ok(r),
|
||||
Err(e) => {
|
||||
log::error!("Error in listing expiry checker task: {e}");
|
||||
|
||||
let now = Utc::now();
|
||||
error_timestamps.push_back(now);
|
||||
|
||||
// Remove errors older than the error window
|
||||
while let Some(&front_time) = error_timestamps.front() {
|
||||
if now - front_time > ERROR_WINDOW {
|
||||
error_timestamps.pop_front();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If we have 2 or more errors within the window, apply cooldown
|
||||
if error_timestamps.len() >= 2 {
|
||||
log::warn!(
|
||||
"Multiple errors ({}) within {} seconds, applying cooldown of {} seconds",
|
||||
error_timestamps.len(),
|
||||
ERROR_WINDOW.num_seconds(),
|
||||
COOLDOWN_DELAY.num_seconds()
|
||||
);
|
||||
|
||||
tokio::time::sleep(COOLDOWN_DELAY.to_std().map_err(BotError::internal)?).await;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn task_impl(
|
||||
message_sender: &BoxedMessageSender,
|
||||
daos: &DAOs,
|
||||
listing_event: &mut Receiver<ListingUpdatedEvent>,
|
||||
) -> BotResult<()> {
|
||||
const MAX_CHECK_INTERVAL: Duration = Duration::seconds(10);
|
||||
|
||||
loop {
|
||||
let check_again_in =
|
||||
if let Some(listing) = daos.listing.find_next_ending_listing().await? {
|
||||
if handle_listing_expiry(message_sender, daos, &listing).await? {
|
||||
// if this listing was expired, immediately check again on the next listing
|
||||
continue;
|
||||
}
|
||||
// wait until the listing ends, or a listing is updated
|
||||
(Utc::now() - listing.base.ends_at).clamp(Duration::zero(), MAX_CHECK_INTERVAL)
|
||||
} else {
|
||||
MAX_CHECK_INTERVAL
|
||||
}
|
||||
.to_std()
|
||||
.map_err(BotError::internal)?;
|
||||
|
||||
tokio::select! {
|
||||
_ = listing_event.recv() => {
|
||||
log::info!("listing updated, rechecking expiry");
|
||||
continue;
|
||||
}
|
||||
_ = tokio::time::sleep(check_again_in) => {
|
||||
log::info!("listing expired or timeout, rechecking expiry");
|
||||
continue;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_listing_expiry(
|
||||
message_sender: &BoxedMessageSender,
|
||||
daos: &DAOs,
|
||||
listing: &PersistedListing,
|
||||
) -> BotResult<bool> {
|
||||
if listing.base.ends_at < Utc::now() {
|
||||
let expired_ago = Utc::now() - listing.base.ends_at;
|
||||
log::info!(
|
||||
"listing expired: {listing_id} (expired {expired_ago} ago)",
|
||||
listing_id = listing.persisted.id,
|
||||
);
|
||||
let listing = daos
|
||||
.listing
|
||||
.set_listing_is_active(listing.persisted.id, false)
|
||||
.await?;
|
||||
|
||||
let seller = daos
|
||||
.user
|
||||
.find_by_id(listing.base.seller_id)
|
||||
.await?
|
||||
.ok_or(BotError::internal_str("Seller not found"))?;
|
||||
|
||||
let bids = daos.bid.bids_for_listing(listing.persisted.id).await?;
|
||||
|
||||
if let Some(bid) = bids.first() {
|
||||
let buyer = daos
|
||||
.user
|
||||
.find_by_id(bid.buyer_id)
|
||||
.await?
|
||||
.ok_or(BotError::internal_str("Winning buyer not found"))?;
|
||||
|
||||
message_sender
|
||||
.send_message(MessageType::UserHasWonListingForBuyer {
|
||||
listing: listing.clone(),
|
||||
buyer: buyer.clone(),
|
||||
seller: seller.clone(),
|
||||
bid: bid.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
message_sender
|
||||
.send_message(MessageType::UserHasWonListingForSeller {
|
||||
listing: listing.clone(),
|
||||
buyer: buyer.clone(),
|
||||
seller: seller.clone(),
|
||||
bid: bid.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
// send message to all other buyers who did not win
|
||||
for bid in bids.iter() {
|
||||
if bid.buyer_id == buyer.persisted.id {
|
||||
continue;
|
||||
}
|
||||
let buyer = daos
|
||||
.user
|
||||
.find_by_id(bid.buyer_id)
|
||||
.await?
|
||||
.ok_or(BotError::internal_str("Losing buyer not found"))?;
|
||||
message_sender
|
||||
.send_message(MessageType::UserHasLostListingForBuyer {
|
||||
listing: listing.clone(),
|
||||
buyer: buyer.clone(),
|
||||
seller: seller.clone(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
// there was no winner, send notification to the seller
|
||||
message_sender
|
||||
.send_message(MessageType::ListingExpiredWithNoWinnerForSeller {
|
||||
listing: listing.clone(),
|
||||
seller: seller.clone(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
return Ok(true);
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::db::bid::NewBid;
|
||||
use crate::db::MoneyAmount;
|
||||
use crate::message_sender::MockMessageSender;
|
||||
use crate::test_utils::*;
|
||||
use mockall::predicate::function;
|
||||
use std::str::FromStr;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_listing_expiry() {
|
||||
let Deps { deps, .. } = create_deps().await;
|
||||
let seller = with_test_user(&deps, |user| {
|
||||
user.telegram_id = 123.into();
|
||||
user.username = Some("seller".to_string())
|
||||
})
|
||||
.await;
|
||||
let losing_buyer = with_test_user(&deps, |user| {
|
||||
user.telegram_id = 789.into();
|
||||
user.username = Some("losing_buyer".to_string())
|
||||
})
|
||||
.await;
|
||||
let winning_buyer = with_test_user(&deps, |user| {
|
||||
user.telegram_id = 456.into();
|
||||
user.username = Some("buyer".to_string())
|
||||
})
|
||||
.await;
|
||||
let listing = with_test_listing(&deps, &seller, |listing| {
|
||||
listing.base.ends_at = Utc::now() - Duration::seconds(1);
|
||||
})
|
||||
.await;
|
||||
|
||||
// the losing bid
|
||||
deps.get::<DAOs>()
|
||||
.bid
|
||||
.insert_bid(&NewBid::new_basic(
|
||||
listing.persisted.id,
|
||||
losing_buyer.persisted.id,
|
||||
MoneyAmount::from_str("50.00").unwrap(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// the winning bid
|
||||
deps.get::<DAOs>()
|
||||
.bid
|
||||
.insert_bid(&NewBid::new_basic(
|
||||
listing.persisted.id,
|
||||
winning_buyer.persisted.id,
|
||||
MoneyAmount::from_str("100.00").unwrap(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let winner_id = winning_buyer.persisted.id;
|
||||
let loser_id = losing_buyer.persisted.id;
|
||||
let mut message_sender = MockMessageSender::new();
|
||||
|
||||
message_sender
|
||||
.expect_send_message()
|
||||
.with(function(move |m| match m {
|
||||
MessageType::UserHasWonListingForSeller { buyer, .. } => {
|
||||
buyer.persisted.id == winner_id
|
||||
}
|
||||
_ => false,
|
||||
}))
|
||||
.times(1)
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
message_sender
|
||||
.expect_send_message()
|
||||
.with(function(move |m| match m {
|
||||
MessageType::UserHasWonListingForBuyer { buyer, .. } => {
|
||||
buyer.persisted.id == winner_id
|
||||
}
|
||||
_ => false,
|
||||
}))
|
||||
.times(1)
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
message_sender
|
||||
.expect_send_message()
|
||||
.with(function(move |m| match m {
|
||||
MessageType::UserHasLostListingForBuyer { buyer, .. } => {
|
||||
buyer.persisted.id == loser_id
|
||||
}
|
||||
_ => false,
|
||||
}))
|
||||
.times(1)
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
let daos = (*deps.get::<DAOs>()).clone();
|
||||
let message_sender: BoxedMessageSender = Box::new(message_sender);
|
||||
handle_listing_expiry(&message_sender, &daos, &listing)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
41
src/main.rs
41
src/main.rs
@@ -8,6 +8,7 @@ mod dptree_utils;
|
||||
mod handle_error;
|
||||
mod handler_utils;
|
||||
mod keyboard_utils;
|
||||
mod listing_expiry_checker_task;
|
||||
mod message;
|
||||
mod message_sender;
|
||||
mod message_target;
|
||||
@@ -26,7 +27,8 @@ use crate::commands::{
|
||||
my_listings::{my_listings_handler, my_listings_inline_handler, MyListingsState},
|
||||
new_listing::{new_listing_handler, NewListingState},
|
||||
};
|
||||
use crate::db::DAOs;
|
||||
use crate::db::user::PersistedUser;
|
||||
use crate::db::{DAOs, ListingEventSender};
|
||||
use crate::handle_error::with_error_handler;
|
||||
use crate::handler_utils::{find_or_create_db_user_from_update, update_into_message_target};
|
||||
use crate::message::MessageType;
|
||||
@@ -35,12 +37,14 @@ use crate::sqlite_storage::SqliteStorage;
|
||||
use crate::start_command_data::StartCommandData;
|
||||
use anyhow::{anyhow, Result};
|
||||
pub use bot_result::*;
|
||||
use chrono::{Duration, Utc};
|
||||
use commands::*;
|
||||
use config::Config;
|
||||
use log::info;
|
||||
pub use message_target::MessageTarget;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use teloxide::dispatching::dialogue::serializer::Json;
|
||||
use teloxide::types::Me;
|
||||
use teloxide::{prelude::*, types::BotCommand, utils::command::BotCommands};
|
||||
pub use wrap_endpoint::*;
|
||||
|
||||
@@ -130,6 +134,11 @@ pub fn main_handler() -> BotHandler {
|
||||
.map(|daos: DAOs| daos.listing.clone())
|
||||
.map(|daos: DAOs| daos.bid.clone())
|
||||
.filter_map_async(find_or_create_db_user_from_update)
|
||||
.branch(
|
||||
Update::filter_message()
|
||||
.filter(filter_forwarded_from_pawctioneer_bot)
|
||||
.endpoint(handle_forwarded_from_pawctioneer_bot),
|
||||
)
|
||||
.branch(my_listings_inline_handler())
|
||||
.branch(
|
||||
dptree::entry()
|
||||
@@ -177,6 +186,16 @@ async fn main() -> Result<()> {
|
||||
info!("Starting Pawctioneer Bot...");
|
||||
let bot = Box::new(Bot::new(&config.telegram_token));
|
||||
|
||||
// set up the listing expiry checker task
|
||||
let (listing_expiry_checker_tx, listing_expiry_checker_rx) = ListingEventSender::channel();
|
||||
let daos = DAOs::new(db_pool.clone(), listing_expiry_checker_tx);
|
||||
|
||||
tokio::spawn(listing_expiry_checker_task::task(
|
||||
Box::new(BotMessageSender::new(*bot.clone(), None)),
|
||||
daos.clone(),
|
||||
listing_expiry_checker_rx,
|
||||
));
|
||||
|
||||
// Set up the bot's command menu
|
||||
setup_bot_commands(&bot).await?;
|
||||
|
||||
@@ -185,7 +204,7 @@ async fn main() -> Result<()> {
|
||||
|bot: Box<Bot>, update: Update, daos: DAOs, bot_username: BotUsername| {
|
||||
let target = update_into_message_target(update)?;
|
||||
Some(App::new(
|
||||
Box::new(BotMessageSender::new(*bot, target)),
|
||||
Box::new(BotMessageSender::new(*bot, Some(target))),
|
||||
daos.clone(),
|
||||
bot_username.0,
|
||||
))
|
||||
@@ -194,7 +213,6 @@ async fn main() -> Result<()> {
|
||||
.chain(main_handler());
|
||||
|
||||
let dialog_storage = SqliteStorage::new(db_pool.clone(), Json).await?;
|
||||
let daos = DAOs::new(db_pool.clone());
|
||||
let bot_username = bot
|
||||
.get_me()
|
||||
.await?
|
||||
@@ -227,6 +245,20 @@ async fn unknown_message_handler(msg: Message) -> BotResult {
|
||||
)))
|
||||
}
|
||||
|
||||
fn filter_forwarded_from_pawctioneer_bot(message: Message, me: Me) -> bool {
|
||||
if let Some(bot) = message.via_bot {
|
||||
if bot.id == me.user.id {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn handle_forwarded_from_pawctioneer_bot(user: PersistedUser) -> BotResult {
|
||||
info!("Received forwarded message from Pawctioneer Bot from {user:?}, ignoring...");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -247,7 +279,8 @@ mod tests {
|
||||
always(),
|
||||
)
|
||||
.returning(|_, _| Ok(()));
|
||||
let mut deps = with_message_sender(create_deps().await, message_sender).await;
|
||||
let Deps { deps, .. } = create_deps().await;
|
||||
let mut deps = with_message_sender(deps, message_sender).await;
|
||||
deps.insert(create_tele_update("/help"));
|
||||
let handler = main_handler();
|
||||
dptree::type_check(handler.sig(), &deps, &[]);
|
||||
|
||||
@@ -20,4 +20,25 @@ pub enum MessageType {
|
||||
listing: PersistedListing,
|
||||
buyer: PersistedUser,
|
||||
},
|
||||
UserHasWonListingForSeller {
|
||||
listing: PersistedListing,
|
||||
seller: PersistedUser,
|
||||
buyer: PersistedUser,
|
||||
bid: PersistedBid,
|
||||
},
|
||||
UserHasWonListingForBuyer {
|
||||
listing: PersistedListing,
|
||||
buyer: PersistedUser,
|
||||
seller: PersistedUser,
|
||||
bid: PersistedBid,
|
||||
},
|
||||
UserHasLostListingForBuyer {
|
||||
listing: PersistedListing,
|
||||
buyer: PersistedUser,
|
||||
seller: PersistedUser,
|
||||
},
|
||||
ListingExpiredWithNoWinnerForSeller {
|
||||
listing: PersistedListing,
|
||||
seller: PersistedUser,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -57,8 +57,8 @@ pub fn format_datetime(dt: DateTime<Utc>) -> String {
|
||||
dt.format("%b %d, %Y %H:%M UTC").to_string()
|
||||
}
|
||||
|
||||
pub fn buyer_name_or_link(user: &PersistedUser) -> String {
|
||||
let link = format!("https://t.me/{}", user.telegram_id);
|
||||
pub fn user_name_or_link(user: &PersistedUser) -> String {
|
||||
let link = format!("tg://user?id={}", user.telegram_id);
|
||||
let name = if let Some(last_name) = &user.last_name {
|
||||
format!("{} {}", user.first_name, last_name)
|
||||
} else {
|
||||
|
||||
@@ -6,6 +6,7 @@ use sqlx::SqlitePool;
|
||||
use std::{ops::Deref, str::FromStr, sync::Arc};
|
||||
use teloxide::dispatching::dialogue::serializer::Json;
|
||||
use teloxide::types::*;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
@@ -15,7 +16,8 @@ use crate::{
|
||||
NewListing, PersistedListing,
|
||||
},
|
||||
user::{NewUser, PersistedUser},
|
||||
CurrencyType, DAOs, ListingDAO, MoneyAmount, UserDAO,
|
||||
CurrencyType, DAOs, ListingDAO, ListingEventSender, ListingUpdatedEvent, MoneyAmount,
|
||||
UserDAO,
|
||||
},
|
||||
message_sender::MockMessageSender,
|
||||
sqlite_storage::SqliteStorage,
|
||||
@@ -202,10 +204,15 @@ pub fn create_tele_callback_query(callback_data: &str, from: User) -> CallbackQu
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_deps() -> DependencyMap {
|
||||
pub struct Deps {
|
||||
pub deps: DependencyMap,
|
||||
pub listing_event_receiver: Receiver<ListingUpdatedEvent>,
|
||||
}
|
||||
pub async fn create_deps() -> Deps {
|
||||
let pool = create_test_pool().await;
|
||||
let dialog_storage = SqliteStorage::new(pool.clone(), Json).await.unwrap();
|
||||
let daos = DAOs::new(pool);
|
||||
let (tx, rx) = ListingEventSender::channel();
|
||||
let daos = DAOs::new(pool, tx);
|
||||
let me_user = create_tele_user(|user| user.username = Some("me".to_string()));
|
||||
let me = Me {
|
||||
user: me_user,
|
||||
@@ -215,7 +222,10 @@ pub async fn create_deps() -> DependencyMap {
|
||||
can_connect_to_business: true,
|
||||
has_main_web_app: true,
|
||||
};
|
||||
dptree::deps![dialog_storage, me, daos]
|
||||
Deps {
|
||||
deps: dptree::deps![dialog_storage, me, daos],
|
||||
listing_event_receiver: rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn with_dialogue(mut deps: DependencyMap, user: &PersistedUser) -> DependencyMap {
|
||||
@@ -239,19 +249,6 @@ pub async fn with_message_sender(
|
||||
deps
|
||||
}
|
||||
|
||||
pub async fn create_test_user(dao: &UserDAO) -> PersistedUser {
|
||||
dao.insert_user(&NewUser {
|
||||
persisted: (),
|
||||
telegram_id: 12345.into(),
|
||||
first_name: "Test User".to_string(),
|
||||
last_name: None,
|
||||
username: Some("testuser".to_string()),
|
||||
is_banned: false,
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn with_test_user(
|
||||
deps: &DependencyMap,
|
||||
user_fn: impl FnOnce(&mut NewUser) -> (),
|
||||
@@ -284,6 +281,7 @@ pub async fn with_test_listing(
|
||||
starts_at: Utc::now(),
|
||||
ends_at: Utc::now() + Duration::days(3),
|
||||
currency_type: CurrencyType::Usd,
|
||||
is_active: true,
|
||||
},
|
||||
fields: ListingFields::BasicAuction(BasicAuctionFields {
|
||||
starting_bid: MoneyAmount::from_str("100.00").unwrap(),
|
||||
@@ -306,6 +304,7 @@ pub async fn create_test_listing(dao: &ListingDAO, seller: &PersistedUser) -> Pe
|
||||
starts_at: Utc::now(),
|
||||
ends_at: Utc::now() + Duration::days(3),
|
||||
currency_type: CurrencyType::Usd,
|
||||
is_active: true,
|
||||
},
|
||||
fields: BasicAuction(BasicAuctionFields {
|
||||
starting_bid: MoneyAmount::from_str("100.00").unwrap(),
|
||||
|
||||
Reference in New Issue
Block a user