Files
pawctioneer-bot/src/db/dao/listing_dao.rs
2025-09-10 23:51:01 +00:00

448 lines
16 KiB
Rust

//! Listing Data Access Object (DAO)
//!
//! Provides encapsulated CRUD operations for Listing entities
use anyhow::Result;
use chrono::Utc;
use itertools::Itertools;
use sqlx::{sqlite::SqliteRow, FromRow, Row, SqlitePool};
use std::fmt::Debug;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::db::{
bind_fields::BindFields,
listing::{
BasicAuctionFields, BlindAuctionFields, FixedPriceListingFields, Listing, ListingBase,
ListingFields, MultiSlotAuctionFields, NewListing, PersistedListing,
PersistedListingFields,
},
DbListingId, DbUserId, ListingType,
};
/// Notification describing a change to a single listing.
///
/// Every variant carries the ID of the listing that was affected.
#[derive(Clone, Copy, Debug)]
pub enum ListingUpdatedEvent {
    /// A listing with this ID was inserted.
    Created(DbListingId),
    /// The listing with this ID was modified.
    Updated(DbListingId),
    /// A delete was issued for the listing with this ID.
    Deleted(DbListingId),
}
#[derive(Clone)]
pub struct ListingEventSender(Sender<ListingUpdatedEvent>);
impl ListingEventSender {
pub fn new(sender: Sender<ListingUpdatedEvent>) -> Self {
Self(sender)
}
pub fn channel() -> (ListingEventSender, Receiver<ListingUpdatedEvent>) {
let (sender, receiver) = tokio::sync::mpsc::channel::<ListingUpdatedEvent>(1);
(ListingEventSender::new(sender), receiver)
}
pub fn send(&self, event: ListingUpdatedEvent) {
if let Err(e) = self.0.try_send(event) {
log::error!("Error sending listing updated event: {e:?}");
}
}
}
/// Data Access Object for Listing operations.
///
/// Field `0` is the SQLite pool every query runs against; field `1` is the
/// sender used to publish a [`ListingUpdatedEvent`] after each write.
#[derive(Clone)]
pub struct ListingDAO(SqlitePool, ListingEventSender);

impl ListingDAO {
    /// Creates a DAO that queries `pool` and reports changes through `sender`.
    pub fn new(pool: SqlitePool, sender: ListingEventSender) -> Self {
        Self(pool, sender)
    }

    /// Forwards `event` to the wrapped event sender.
    fn send_event(&self, event: ListingUpdatedEvent) {
        self.1.send(event);
    }

    /// Insert a new listing into the database.
    ///
    /// `created_at` and `updated_at` are both set to the current UTC time.
    /// On success a `Created` event is sent and the stored row is returned.
    ///
    /// # Errors
    /// Returns an error if the statement fails or the returned row cannot be
    /// converted into a [`PersistedListing`].
    pub async fn insert_listing(&self, listing: &NewListing) -> Result<PersistedListing> {
        let now = Utc::now();
        // `binds_for_listing` already binds `starts_at` and `ends_at` (via
        // `binds_for_base`), so only the columns it does not cover are added
        // here. Pushing them again would repeat the column names in the
        // generated INSERT.
        let binds = binds_for_listing(listing)
            .push("seller_id", &listing.base.seller_id)
            .push("created_at", &now)
            .push("updated_at", &now);
        let query_str = format!(
            r#"
            INSERT INTO listings ({}) VALUES ({})
            RETURNING *
            "#,
            binds.bind_names().join(", "),
            binds.bind_placeholders().join(", "),
        );
        let row = binds
            .bind_to_query(sqlx::query(&query_str))
            .fetch_one(&self.0)
            .await?;
        let listing = PersistedListing::from_row(&row)?;
        self.send_event(ListingUpdatedEvent::Created(listing.persisted.id));
        Ok(listing)
    }

    /// Overwrite the stored columns of an existing listing.
    ///
    /// Writes every column produced by `binds_for_listing` plus a fresh
    /// `updated_at`. The row is matched on both `id` and `seller_id`, so the
    /// seller of a listing is never changed by this call. On success an
    /// `Updated` event is sent and the stored row is returned.
    ///
    /// # Errors
    /// Returns an error if the statement fails, if no row matches (the query
    /// uses `fetch_one`), or if the returned row cannot be converted.
    pub async fn update_listing(&self, listing: &PersistedListing) -> Result<PersistedListing> {
        let now = Utc::now();
        let binds = binds_for_listing(listing).push("updated_at", &now);
        let query_str = format!(
            r#"
            UPDATE listings
            SET {}
            WHERE id = ?
            AND seller_id = ?
            RETURNING *
            "#,
            binds
                .bind_names()
                .map(|name| format!("{name} = ?"))
                .join(", "),
        );
        // The SET placeholders are bound first, followed by the two WHERE
        // placeholders in the order they appear in the statement.
        let row = binds
            .bind_to_query(sqlx::query(&query_str))
            .bind(listing.persisted.id)
            .bind(listing.base.seller_id)
            .fetch_one(&self.0)
            .await?;
        let listing = PersistedListing::from_row(&row)?;
        self.send_event(ListingUpdatedEvent::Updated(listing.persisted.id));
        Ok(listing)
    }

    /// Find a listing by its ID.
    ///
    /// Returns `Ok(None)` when no row has the given ID.
    pub async fn find_by_id(&self, listing_id: DbListingId) -> Result<Option<PersistedListing>> {
        let result = sqlx::query_as::<_, PersistedListing>("SELECT * FROM listings WHERE id = ?")
            .bind(listing_id)
            .fetch_optional(&self.0)
            .await?;
        Ok(result)
    }

    /// Find all listings by a seller, ordered by `created_at` descending.
    pub async fn find_by_seller(&self, seller_id: DbUserId) -> Result<Vec<PersistedListing>> {
        let rows = sqlx::query_as::<_, PersistedListing>(
            "SELECT * FROM listings WHERE seller_id = ? ORDER BY created_at DESC",
        )
        .bind(seller_id)
        .fetch_all(&self.0)
        .await?;
        Ok(rows)
    }

    /// Delete a listing.
    ///
    /// A `Deleted` event is sent whenever the statement succeeds; the number
    /// of affected rows is not inspected, so this also holds when no row
    /// matched `listing_id`.
    pub async fn delete_listing(&self, listing_id: DbListingId) -> Result<()> {
        sqlx::query("DELETE FROM listings WHERE id = ?")
            .bind(listing_id)
            .execute(&self.0)
            .await?;
        self.send_event(ListingUpdatedEvent::Deleted(listing_id));
        Ok(())
    }

    /// Set the `is_active` flag of a listing and return the stored row.
    ///
    /// Only `is_active` is written. On success an `Updated` event is sent.
    ///
    /// # Errors
    /// Returns an error if the statement fails or no row has the given ID
    /// (the query uses `fetch_one`).
    pub async fn set_listing_is_active(
        &self,
        listing_id: DbListingId,
        is_active: bool,
    ) -> Result<PersistedListing> {
        let result = sqlx::query_as::<_, PersistedListing>(
            r#"
            UPDATE listings
            SET is_active = ?
            WHERE id = ?
            RETURNING *"#,
        )
        .bind(is_active)
        .bind(listing_id)
        .fetch_one(&self.0)
        .await?;
        self.send_event(ListingUpdatedEvent::Updated(result.persisted.id));
        Ok(result)
    }

    /// Find the active listing (`is_active = 1`) with the earliest `ends_at`.
    ///
    /// Returns `Ok(None)` when there is no active listing.
    pub async fn find_next_ending_listing(&self) -> Result<Option<PersistedListing>> {
        let result = sqlx::query_as::<_, PersistedListing>(
            r#"
            SELECT * FROM listings
            WHERE is_active = 1
            ORDER BY ends_at ASC
            LIMIT 1"#,
        )
        .fetch_optional(&self.0)
        .await?;
        Ok(result)
    }
}
/// Collects the column binds for a listing: the shared base columns first,
/// followed by the columns specific to the listing's type.
fn binds_for_listing<P: Debug + Clone>(listing: &Listing<P>) -> BindFields {
    let base_binds = binds_for_base(&listing.base);
    let type_binds = binds_for_fields(&listing.fields);
    BindFields::default().extend(base_binds).extend(type_binds)
}
fn binds_for_base(base: &ListingBase) -> BindFields {
BindFields::default()
.push("title", &base.title)
.push("description", &base.description)
.push("currency_type", &base.currency_type)
.push("starts_at", &base.starts_at)
.push("ends_at", &base.ends_at)
.push("is_active", &base.is_active)
}
fn binds_for_fields(fields: &ListingFields) -> BindFields {
match fields {
ListingFields::BasicAuction(fields) => BindFields::default()
.push("listing_type", &ListingType::BasicAuction)
.push("starting_bid", &fields.starting_bid)
.push("buy_now_price", &fields.buy_now_price)
.push("min_increment", &fields.min_increment)
.push("anti_snipe_minutes", &fields.anti_snipe_minutes),
ListingFields::MultiSlotAuction(fields) => BindFields::default()
.push("listing_type", &ListingType::MultiSlotAuction)
.push("starting_bid", &fields.starting_bid)
.push("buy_now_price", &fields.buy_now_price)
.push("min_increment", &fields.min_increment)
.push("slots_available", &fields.slots_available)
.push("anti_snipe_minutes", &fields.anti_snipe_minutes),
ListingFields::FixedPriceListing(fields) => BindFields::default()
.push("listing_type", &ListingType::FixedPriceListing)
.push("buy_now_price", &fields.buy_now_price)
.push("slots_available", &fields.slots_available),
ListingFields::BlindAuction(fields) => BindFields::default()
.push("listing_type", &ListingType::BlindAuction)
.push("starting_bid", &fields.starting_bid),
}
}
impl FromRow<'_, SqliteRow> for PersistedListing {
fn from_row(row: &'_ SqliteRow) -> std::result::Result<Self, sqlx::Error> {
let listing_type = row.get("listing_type");
let persisted = PersistedListingFields {
id: row.get("id"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
};
let base = ListingBase {
seller_id: row.get("seller_id"),
title: row.get("title"),
description: row.get("description"),
currency_type: row.get("currency_type"),
starts_at: row.get("starts_at"),
ends_at: row.get("ends_at"),
is_active: row.get("is_active"),
};
let fields = match listing_type {
ListingType::BasicAuction => ListingFields::BasicAuction(BasicAuctionFields {
starting_bid: row.get("starting_bid"),
buy_now_price: row.get("buy_now_price"),
min_increment: row.get("min_increment"),
anti_snipe_minutes: row.get("anti_snipe_minutes"),
}),
ListingType::MultiSlotAuction => {
ListingFields::MultiSlotAuction(MultiSlotAuctionFields {
starting_bid: row.get("starting_bid"),
buy_now_price: row.get("buy_now_price"),
min_increment: row.get("min_increment"),
slots_available: row.get("slots_available"),
anti_snipe_minutes: row.get("anti_snipe_minutes"),
})
}
ListingType::FixedPriceListing => {
ListingFields::FixedPriceListing(FixedPriceListingFields {
buy_now_price: row.get("buy_now_price"),
slots_available: row.get("slots_available"),
})
}
ListingType::BlindAuction => ListingFields::BlindAuction(BlindAuctionFields {
starting_bid: row.get("starting_bid"),
}),
};
Ok(PersistedListing {
persisted,
base,
fields,
})
}
}
/// Tests for `ListingDAO::find_next_ending_listing`.
#[cfg(test)]
mod tests {
    use crate::test_utils::{create_deps, with_test_listing, with_test_user};
    use chrono::{Duration, Utc};
    use rstest::rstest;

    /// With no listings stored, the query yields `None`.
    #[tokio::test]
    async fn test_find_next_ending_listing_no_active_listings() {
        let deps = create_deps().await;
        let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;

        let result = listing_dao
            .find_next_ending_listing()
            .await
            .expect("Failed to query for next ending listing");

        assert!(
            result.is_none(),
            "Expected no listings when database is empty"
        );
    }

    /// A single listing created by the test helper is returned as-is.
    #[tokio::test]
    async fn test_find_next_ending_listing_single_active_listing() {
        let deps = create_deps().await;
        let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
        let seller = with_test_user(&deps.deps, |_| {}).await;

        let ends_at = Utc::now() + Duration::hours(1);
        let inserted_listing = with_test_listing(&deps.deps, &seller, |listing| {
            listing.base.ends_at = ends_at;
        })
        .await;

        let found_listing = listing_dao
            .find_next_ending_listing()
            .await
            .expect("Failed to query for next ending listing")
            .expect("Expected to find the active listing");

        assert_eq!(found_listing.persisted.id, inserted_listing.persisted.id);
        assert_eq!(found_listing.base.ends_at, ends_at);
        assert!(found_listing.base.is_active);
    }

    /// Among several listings, the one with the earliest end time wins
    /// regardless of insertion order.
    #[rstest]
    #[case(3, vec![2, 1, 3], 1)] // Multiple listings, should return the one ending soonest (index 1)
    #[case(2, vec![1, 2], 0)] // Two listings, should return the one ending first (index 0)
    #[case(4, vec![4, 1, 3, 2], 1)] // Four listings, should return the one ending soonest (index 1)
    #[tokio::test]
    async fn test_find_next_ending_listing_multiple_active_listings(
        #[case] _num_listings: usize,
        #[case] hours_from_now: Vec<i64>,
        #[case] expected_index: usize,
    ) {
        let deps = create_deps().await;
        let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
        let seller = with_test_user(&deps.deps, |_| {}).await;

        let mut inserted_listings = Vec::with_capacity(hours_from_now.len());
        let base_time = Utc::now();

        // Create multiple listings with different end times
        for &hours in &hours_from_now {
            let ends_at = base_time + Duration::hours(hours);
            let inserted_listing = with_test_listing(&deps.deps, &seller, |listing| {
                listing.base.ends_at = ends_at;
            })
            .await;
            inserted_listings.push(inserted_listing);
        }

        let found_listing = listing_dao
            .find_next_ending_listing()
            .await
            .expect("Failed to query for next ending listing")
            .expect("Expected to find an active listing");

        // Should return the listing that ends soonest
        let expected_listing = &inserted_listings[expected_index];
        assert_eq!(found_listing.persisted.id, expected_listing.persisted.id);
        assert_eq!(found_listing.base.ends_at, expected_listing.base.ends_at);
    }

    /// An inactive listing is skipped even when it ends before the active one.
    #[tokio::test]
    async fn test_find_next_ending_listing_ignores_inactive_listings() {
        let deps = create_deps().await;
        let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
        let seller = with_test_user(&deps.deps, |_| {}).await;

        // Create an inactive listing that ends sooner
        let inactive_ends_at = Utc::now() + Duration::hours(1);
        with_test_listing(&deps.deps, &seller, |listing| {
            listing.base.ends_at = inactive_ends_at;
            listing.base.is_active = false;
        })
        .await;

        // Create an active listing that ends later
        let active_ends_at = Utc::now() + Duration::hours(2);
        let inserted_active_listing = with_test_listing(&deps.deps, &seller, |listing| {
            listing.base.ends_at = active_ends_at;
        })
        .await;

        let found_listing = listing_dao
            .find_next_ending_listing()
            .await
            .expect("Failed to query for next ending listing")
            .expect("Expected to find the active listing");

        // Should return the active listing, not the inactive one that ends sooner
        assert_eq!(
            found_listing.persisted.id,
            inserted_active_listing.persisted.id
        );
        assert_eq!(found_listing.base.ends_at, active_ends_at);
        assert!(found_listing.base.is_active);
    }

    /// With interleaved active and inactive listings, the earliest-ending
    /// active one is returned.
    #[tokio::test]
    async fn test_find_next_ending_listing_with_mixed_active_inactive() {
        let deps = create_deps().await;
        let listing_dao = &deps.deps.get::<crate::db::DAOs>().listing;
        let seller = with_test_user(&deps.deps, |_| {}).await;

        let base_time = Utc::now();
        let mut inserted_active_listings = Vec::new();

        // Create a mix of active and inactive listings
        let listing_configs = [
            (1, false), // inactive, ends in 1 hour
            (2, true),  // active, ends in 2 hours
            (3, false), // inactive, ends in 3 hours
            (4, true),  // active, ends in 4 hours
            (5, true),  // active, ends in 5 hours
        ];
        for (hours, is_active) in listing_configs {
            let ends_at = base_time + Duration::hours(hours);
            let inserted_listing = with_test_listing(&deps.deps, &seller, |listing| {
                listing.base.ends_at = ends_at;
                listing.base.is_active = is_active;
            })
            .await;
            if is_active {
                inserted_active_listings.push(inserted_listing);
            }
        }

        let found_listing = listing_dao
            .find_next_ending_listing()
            .await
            .expect("Failed to query for next ending listing")
            .expect("Expected to find an active listing");

        // Should return the first active listing (ends in 2 hours), ignoring inactive ones
        let expected_listing = &inserted_active_listings[0]; // The one ending in 2 hours
        assert_eq!(found_listing.persisted.id, expected_listing.persisted.id);
        assert!(found_listing.base.is_active);
    }
}