add zstd compression, tests for store/get
This commit is contained in:
@@ -1,4 +1,8 @@
|
|||||||
use crate::{sha256::Sha256, shard::StoreResult, shards::Shards};
|
use crate::{
|
||||||
|
sha256::Sha256,
|
||||||
|
shard::{StoreArgs, StoreResult},
|
||||||
|
shards::Shards,
|
||||||
|
};
|
||||||
use axum::{body::Bytes, response::IntoResponse, Extension, Json};
|
use axum::{body::Bytes, response::IntoResponse, Extension, Json};
|
||||||
use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart};
|
use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart};
|
||||||
|
|
||||||
@@ -19,10 +23,12 @@ pub enum StoreResponse {
|
|||||||
Created {
|
Created {
|
||||||
stored_size: usize,
|
stored_size: usize,
|
||||||
data_size: usize,
|
data_size: usize,
|
||||||
|
created_at: String,
|
||||||
},
|
},
|
||||||
Exists {
|
Exists {
|
||||||
stored_size: usize,
|
stored_size: usize,
|
||||||
data_size: usize,
|
data_size: usize,
|
||||||
|
created_at: String,
|
||||||
},
|
},
|
||||||
Sha256Mismatch {
|
Sha256Mismatch {
|
||||||
expected_sha256: String,
|
expected_sha256: String,
|
||||||
@@ -49,16 +55,20 @@ impl From<StoreResult> for StoreResponse {
|
|||||||
StoreResult::Created {
|
StoreResult::Created {
|
||||||
stored_size,
|
stored_size,
|
||||||
data_size,
|
data_size,
|
||||||
|
created_at,
|
||||||
} => StoreResponse::Created {
|
} => StoreResponse::Created {
|
||||||
stored_size,
|
stored_size,
|
||||||
data_size,
|
data_size,
|
||||||
|
created_at: created_at.to_rfc3339(),
|
||||||
},
|
},
|
||||||
StoreResult::Exists {
|
StoreResult::Exists {
|
||||||
stored_size,
|
stored_size,
|
||||||
data_size,
|
data_size,
|
||||||
|
created_at,
|
||||||
} => StoreResponse::Exists {
|
} => StoreResponse::Exists {
|
||||||
stored_size,
|
stored_size,
|
||||||
data_size,
|
data_size,
|
||||||
|
created_at: created_at.to_rfc3339(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -92,7 +102,11 @@ pub async fn store_handler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
match shard
|
match shard
|
||||||
.store(sha256, request.content_type, request.data.contents)
|
.store(StoreArgs {
|
||||||
|
sha256,
|
||||||
|
content_type: request.content_type,
|
||||||
|
data: request.data.contents,
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(store_result) => store_result.into(),
|
Ok(store_result) => store_result.into(),
|
||||||
@@ -105,19 +119,25 @@ pub async fn store_handler(
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test {
|
pub mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::shards::test::make_shards;
|
use crate::{shards::test::make_shards_with_compression, UseCompression};
|
||||||
use axum::body::Bytes;
|
use axum::body::Bytes;
|
||||||
use axum_typed_multipart::FieldData;
|
use axum_typed_multipart::FieldData;
|
||||||
|
use rstest::rstest;
|
||||||
|
|
||||||
async fn send_request(sha256: Option<Sha256>, data: Bytes) -> StoreResponse {
|
async fn send_request<D: Into<Bytes>>(
|
||||||
|
sha256: Option<Sha256>,
|
||||||
|
content_type: &str,
|
||||||
|
use_compression: UseCompression,
|
||||||
|
data: D,
|
||||||
|
) -> StoreResponse {
|
||||||
store_handler(
|
store_handler(
|
||||||
Extension(make_shards().await),
|
Extension(make_shards_with_compression(use_compression).await),
|
||||||
TypedMultipart(StoreRequest {
|
TypedMultipart(StoreRequest {
|
||||||
sha256: sha256.map(|s| s.hex_string()),
|
sha256: sha256.map(|s| s.hex_string()),
|
||||||
content_type: "text/plain".to_string(),
|
content_type: content_type.to_string(),
|
||||||
data: FieldData {
|
data: FieldData {
|
||||||
metadata: Default::default(),
|
metadata: Default::default(),
|
||||||
contents: data,
|
contents: data.into(),
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
@@ -126,7 +146,7 @@ pub mod test {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_store_handler() {
|
async fn test_store_handler() {
|
||||||
let result = send_request(None, "hello, world!".as_bytes().into()).await;
|
let result = send_request(None, "text/plain", UseCompression::Auto, "hello, world!").await;
|
||||||
assert_eq!(result.status_code(), StatusCode::CREATED);
|
assert_eq!(result.status_code(), StatusCode::CREATED);
|
||||||
assert!(matches!(result, StoreResponse::Created { .. }));
|
assert!(matches!(result, StoreResponse::Created { .. }));
|
||||||
}
|
}
|
||||||
@@ -135,7 +155,13 @@ pub mod test {
|
|||||||
async fn test_store_handler_mismatched_sha256() {
|
async fn test_store_handler_mismatched_sha256() {
|
||||||
let not_hello_world = Sha256::from_bytes("goodbye, planet!".as_bytes());
|
let not_hello_world = Sha256::from_bytes("goodbye, planet!".as_bytes());
|
||||||
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
||||||
let result = send_request(Some(not_hello_world), "hello, world!".as_bytes().into()).await;
|
let result = send_request(
|
||||||
|
Some(not_hello_world),
|
||||||
|
"text/plain",
|
||||||
|
UseCompression::Auto,
|
||||||
|
"hello, world!",
|
||||||
|
)
|
||||||
|
.await;
|
||||||
assert_eq!(result.status_code(), StatusCode::BAD_REQUEST);
|
assert_eq!(result.status_code(), StatusCode::BAD_REQUEST);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result,
|
result,
|
||||||
@@ -148,8 +174,51 @@ pub mod test {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_store_handler_matching_sha256() {
|
async fn test_store_handler_matching_sha256() {
|
||||||
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
||||||
let result = send_request(Some(hello_world), "hello, world!".as_bytes().into()).await;
|
let result = send_request(
|
||||||
|
Some(hello_world),
|
||||||
|
"text/plain",
|
||||||
|
UseCompression::Auto,
|
||||||
|
"hello, world!",
|
||||||
|
)
|
||||||
|
.await;
|
||||||
assert_eq!(result.status_code(), StatusCode::CREATED);
|
assert_eq!(result.status_code(), StatusCode::CREATED);
|
||||||
assert!(matches!(result, StoreResponse::Created { .. }));
|
assert!(matches!(result, StoreResponse::Created { .. }));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn make_assert_eq<T: PartialEq + std::fmt::Debug>(value: T) -> impl Fn(T) {
|
||||||
|
move |actual| assert_eq!(actual, value)
|
||||||
|
}
|
||||||
|
fn make_assert_lt<T: PartialOrd + std::fmt::Debug>(value: T) -> impl Fn(T) {
|
||||||
|
move |actual| assert!(actual < value)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[rstest]
|
||||||
|
// textual should be compressed by default
|
||||||
|
#[case("text/plain", UseCompression::Auto, make_assert_lt(1024))]
|
||||||
|
#[case("text/plain", UseCompression::Zstd, make_assert_lt(1024))]
|
||||||
|
#[case("text/plain", UseCompression::None, make_assert_eq(1024))]
|
||||||
|
// images, etc should not be compressed by default
|
||||||
|
#[case("image/jpg", UseCompression::Auto, make_assert_eq(1024))]
|
||||||
|
#[case("image/jpg", UseCompression::Zstd, make_assert_lt(1024))]
|
||||||
|
#[case("image/jpg", UseCompression::None, make_assert_eq(1024))]
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compressible_data<F: Fn(usize)>(
|
||||||
|
#[case] content_type: &str,
|
||||||
|
#[case] use_compression: UseCompression,
|
||||||
|
#[case] assert_stored_size: F,
|
||||||
|
) {
|
||||||
|
let result = send_request(None, content_type, use_compression, vec![0; 1024]).await;
|
||||||
|
assert_eq!(result.status_code(), StatusCode::CREATED);
|
||||||
|
match result {
|
||||||
|
StoreResponse::Created {
|
||||||
|
stored_size,
|
||||||
|
data_size,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
assert_stored_size(stored_size);
|
||||||
|
assert_eq!(data_size, 1024);
|
||||||
|
}
|
||||||
|
_ => panic!("expected StoreResponse::Created"),
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
16
src/main.rs
16
src/main.rs
@@ -9,7 +9,7 @@ use axum::{
|
|||||||
routing::{get, post},
|
routing::{get, post},
|
||||||
Extension, Router,
|
Extension, Router,
|
||||||
};
|
};
|
||||||
use clap::Parser;
|
use clap::{Parser, ValueEnum};
|
||||||
use shard::Shard;
|
use shard::Shard;
|
||||||
use std::{error::Error, path::PathBuf};
|
use std::{error::Error, path::PathBuf};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
@@ -34,6 +34,18 @@ struct Args {
|
|||||||
/// Number of shards
|
/// Number of shards
|
||||||
#[arg(short, long)]
|
#[arg(short, long)]
|
||||||
shards: Option<usize>,
|
shards: Option<usize>,
|
||||||
|
|
||||||
|
/// How to compress stored data
|
||||||
|
#[arg(short, long, default_value = "auto")]
|
||||||
|
compression: UseCompression,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, PartialEq, Debug, Copy, Clone, ValueEnum)]
|
||||||
|
pub enum UseCompression {
|
||||||
|
#[default]
|
||||||
|
Auto,
|
||||||
|
None,
|
||||||
|
Zstd,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
@@ -67,7 +79,7 @@ fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
for shard_id in 0..num_shards {
|
for shard_id in 0..num_shards {
|
||||||
let shard_sqlite_path = db_path.join(format!("shard{}.sqlite", shard_id));
|
let shard_sqlite_path = db_path.join(format!("shard{}.sqlite", shard_id));
|
||||||
let shard_sqlite_conn = Connection::open(&shard_sqlite_path).await?;
|
let shard_sqlite_conn = Connection::open(&shard_sqlite_path).await?;
|
||||||
let shard = Shard::open(shard_id, shard_sqlite_conn).await?;
|
let shard = Shard::open(shard_id, UseCompression::Auto, shard_sqlite_conn).await?;
|
||||||
info!(
|
info!(
|
||||||
"shard {} has {} entries",
|
"shard {} has {} entries",
|
||||||
shard.id(),
|
shard.id(),
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ impl Display for Sha256Error {
|
|||||||
}
|
}
|
||||||
impl Error for Sha256Error {}
|
impl Error for Sha256Error {}
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq, Eq)]
|
#[derive(Clone, Copy, PartialEq, Eq, Default)]
|
||||||
pub struct Sha256([u8; 32]);
|
pub struct Sha256([u8; 32]);
|
||||||
impl Sha256 {
|
impl Sha256 {
|
||||||
pub fn from_hex_string(hex: &str) -> Result<Self, Box<dyn Error>> {
|
pub fn from_hex_string(hex: &str) -> Result<Self, Box<dyn Error>> {
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
use std::io::Read;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
pub struct GetResult {
|
pub struct GetResult {
|
||||||
@@ -11,7 +13,7 @@ pub struct GetResult {
|
|||||||
impl Shard {
|
impl Shard {
|
||||||
pub async fn get(&self, sha256: Sha256) -> Result<Option<GetResult>, Box<dyn Error>> {
|
pub async fn get(&self, sha256: Sha256) -> Result<Option<GetResult>, Box<dyn Error>> {
|
||||||
self.conn
|
self.conn
|
||||||
.call(move |conn| get_impl(conn, sha256).map_err(|e| e.into()))
|
.call(move |conn| get_impl(conn, sha256))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
error!("get failed: {}", e);
|
error!("get failed: {}", e);
|
||||||
@@ -23,29 +25,49 @@ impl Shard {
|
|||||||
fn get_impl(
|
fn get_impl(
|
||||||
conn: &mut rusqlite::Connection,
|
conn: &mut rusqlite::Connection,
|
||||||
sha256: Sha256,
|
sha256: Sha256,
|
||||||
) -> Result<Option<GetResult>, rusqlite::Error> {
|
) -> Result<Option<GetResult>, tokio_rusqlite::Error> {
|
||||||
conn.query_row(
|
let maybe_row = conn
|
||||||
"SELECT content_type, compressed_size, created_at, data FROM entries WHERE sha256 = ?",
|
.query_row(
|
||||||
params![sha256.hex_string()],
|
"SELECT content_type, compressed_size, created_at, compression, data
|
||||||
|row| {
|
FROM entries
|
||||||
let content_type = row.get(0)?;
|
WHERE sha256 = ?",
|
||||||
let stored_size = row.get(1)?;
|
params![sha256.hex_string()],
|
||||||
let created_at = parse_created_at_str(row.get(2)?)?;
|
|row| {
|
||||||
let data = row.get(3)?;
|
let content_type = row.get(0)?;
|
||||||
Ok(GetResult {
|
let stored_size = row.get(1)?;
|
||||||
sha256,
|
let created_at = parse_created_at_str(row.get(2)?)?;
|
||||||
content_type,
|
let compression = row.get(3)?;
|
||||||
stored_size,
|
let data: Vec<u8> = row.get(4)?;
|
||||||
created_at,
|
Ok((content_type, stored_size, created_at, compression, data))
|
||||||
data,
|
},
|
||||||
})
|
)
|
||||||
},
|
.optional()
|
||||||
)
|
.map_err(into_tokio_rusqlite_err)?;
|
||||||
.optional()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_created_at_str(created_at_str: String) -> Result<UtcDateTime, rusqlite::Error> {
|
let row = match maybe_row {
|
||||||
let parsed = chrono::DateTime::parse_from_rfc3339(&created_at_str)
|
Some(row) => row,
|
||||||
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?;
|
None => return Ok(None),
|
||||||
Ok(parsed.to_utc())
|
};
|
||||||
|
|
||||||
|
let (content_type, stored_size, created_at, compression, data) = row;
|
||||||
|
let data = match compression {
|
||||||
|
Compression::None => data,
|
||||||
|
Compression::Zstd => {
|
||||||
|
let mut decoder =
|
||||||
|
zstd::Decoder::new(data.as_slice()).map_err(into_tokio_rusqlite_err)?;
|
||||||
|
let mut decompressed = vec![];
|
||||||
|
decoder
|
||||||
|
.read_to_end(&mut decompressed)
|
||||||
|
.map_err(into_tokio_rusqlite_err)?;
|
||||||
|
decompressed
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Some(GetResult {
|
||||||
|
sha256,
|
||||||
|
content_type,
|
||||||
|
stored_size,
|
||||||
|
created_at,
|
||||||
|
data,
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,22 +5,27 @@ pub enum StoreResult {
|
|||||||
Created {
|
Created {
|
||||||
stored_size: usize,
|
stored_size: usize,
|
||||||
data_size: usize,
|
data_size: usize,
|
||||||
|
created_at: UtcDateTime,
|
||||||
},
|
},
|
||||||
Exists {
|
Exists {
|
||||||
stored_size: usize,
|
stored_size: usize,
|
||||||
data_size: usize,
|
data_size: usize,
|
||||||
|
created_at: UtcDateTime,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct StoreArgs {
|
||||||
|
pub sha256: Sha256,
|
||||||
|
pub content_type: String,
|
||||||
|
pub data: Bytes,
|
||||||
|
}
|
||||||
|
|
||||||
impl Shard {
|
impl Shard {
|
||||||
pub async fn store(
|
pub async fn store(&self, store_args: StoreArgs) -> Result<StoreResult, Box<dyn Error>> {
|
||||||
&self,
|
let use_compression = self.use_compression;
|
||||||
sha256: Sha256,
|
|
||||||
content_type: String,
|
|
||||||
data: Bytes,
|
|
||||||
) -> Result<StoreResult, Box<dyn Error>> {
|
|
||||||
self.conn
|
self.conn
|
||||||
.call(move |conn| store(conn, sha256, content_type, data).map_err(|e| e.into()))
|
.call(move |conn| store(conn, use_compression, store_args))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
error!("store failed: {}", e);
|
error!("store failed: {}", e);
|
||||||
@@ -31,10 +36,13 @@ impl Shard {
|
|||||||
|
|
||||||
fn store(
|
fn store(
|
||||||
conn: &mut rusqlite::Connection,
|
conn: &mut rusqlite::Connection,
|
||||||
sha256: Sha256,
|
use_compression: UseCompression,
|
||||||
content_type: String,
|
StoreArgs {
|
||||||
data: Bytes,
|
sha256,
|
||||||
) -> Result<StoreResult, rusqlite::Error> {
|
content_type,
|
||||||
|
data,
|
||||||
|
}: StoreArgs,
|
||||||
|
) -> Result<StoreResult, tokio_rusqlite::Error> {
|
||||||
let sha256 = sha256.hex_string();
|
let sha256 = sha256.hex_string();
|
||||||
|
|
||||||
// check for existing entry
|
// check for existing entry
|
||||||
@@ -46,6 +54,7 @@ fn store(
|
|||||||
Ok(StoreResult::Exists {
|
Ok(StoreResult::Exists {
|
||||||
stored_size: row.get(0)?,
|
stored_size: row.get(0)?,
|
||||||
data_size: row.get(1)?,
|
data_size: row.get(1)?,
|
||||||
|
created_at: parse_created_at_str(row.get(2)?)?,
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@@ -55,24 +64,59 @@ fn store(
|
|||||||
return Ok(existing);
|
return Ok(existing);
|
||||||
}
|
}
|
||||||
|
|
||||||
let created_at = chrono::Utc::now().to_rfc3339();
|
let created_at = chrono::Utc::now();
|
||||||
let data_size = data.len();
|
let uncompressed_size = data.len();
|
||||||
|
let tmp_data_holder;
|
||||||
|
|
||||||
|
let use_compression = match use_compression {
|
||||||
|
UseCompression::None => false,
|
||||||
|
UseCompression::Auto => auto_compressible_content_type(&content_type),
|
||||||
|
UseCompression::Zstd => true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (compression, data) = if use_compression {
|
||||||
|
tmp_data_holder = zstd::encode_all(&data[..], 0).map_err(into_tokio_rusqlite_err)?;
|
||||||
|
if tmp_data_holder.len() < data.len() {
|
||||||
|
(Compression::Zstd, &tmp_data_holder[..])
|
||||||
|
} else {
|
||||||
|
(Compression::None, &data[..])
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
(Compression::None, &data[..])
|
||||||
|
};
|
||||||
|
let compressed_size = data.len();
|
||||||
|
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"INSERT INTO entries (sha256, content_type, compression, uncompressed_size, compressed_size, data, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
"INSERT INTO entries
|
||||||
|
(sha256, content_type, compression, uncompressed_size, compressed_size, data, created_at)
|
||||||
|
VALUES
|
||||||
|
(?, ?, ?, ?, ?, ?, ?)
|
||||||
|
",
|
||||||
params![
|
params![
|
||||||
sha256,
|
sha256,
|
||||||
content_type,
|
content_type,
|
||||||
0,
|
compression,
|
||||||
data_size,
|
uncompressed_size,
|
||||||
data_size,
|
compressed_size,
|
||||||
&data[..],
|
data,
|
||||||
created_at,
|
created_at.to_rfc3339(),
|
||||||
],
|
],
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(StoreResult::Created {
|
Ok(StoreResult::Created {
|
||||||
stored_size: data_size,
|
stored_size: compressed_size,
|
||||||
data_size,
|
data_size: uncompressed_size,
|
||||||
|
created_at,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn auto_compressible_content_type(content_type: &str) -> bool {
|
||||||
|
[
|
||||||
|
"text/",
|
||||||
|
"application/xml",
|
||||||
|
"application/json",
|
||||||
|
"application/javascript",
|
||||||
|
]
|
||||||
|
.iter()
|
||||||
|
.any(|ct| content_type.starts_with(ct))
|
||||||
|
}
|
||||||
|
|||||||
153
src/shard/mod.rs
153
src/shard/mod.rs
@@ -4,11 +4,11 @@ mod fn_store;
|
|||||||
pub mod shard_error;
|
pub mod shard_error;
|
||||||
|
|
||||||
pub use fn_get::GetResult;
|
pub use fn_get::GetResult;
|
||||||
pub use fn_store::StoreResult;
|
pub use fn_store::{StoreArgs, StoreResult};
|
||||||
|
|
||||||
use crate::{sha256::Sha256, shard::shard_error::ShardError};
|
use crate::{sha256::Sha256, shard::shard_error::ShardError, UseCompression};
|
||||||
use axum::body::Bytes;
|
use axum::body::Bytes;
|
||||||
use rusqlite::{params, types::FromSql, OptionalExtension};
|
use rusqlite::{params, types::FromSql, OptionalExtension, ToSql};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use tokio_rusqlite::Connection;
|
use tokio_rusqlite::Connection;
|
||||||
use tracing::{debug, error};
|
use tracing::{debug, error};
|
||||||
@@ -19,11 +19,43 @@ pub type UtcDateTime = chrono::DateTime<chrono::Utc>;
|
|||||||
pub struct Shard {
|
pub struct Shard {
|
||||||
id: usize,
|
id: usize,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
|
use_compression: UseCompression,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
enum Compression {
|
||||||
|
None,
|
||||||
|
Zstd,
|
||||||
|
}
|
||||||
|
impl ToSql for Compression {
|
||||||
|
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||||
|
match self {
|
||||||
|
Compression::None => 0.to_sql(),
|
||||||
|
Compression::Zstd => 1.to_sql(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl FromSql for Compression {
|
||||||
|
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||||
|
match value.as_i64()? {
|
||||||
|
0 => Ok(Compression::None),
|
||||||
|
1 => Ok(Compression::Zstd),
|
||||||
|
_ => Err(rusqlite::types::FromSqlError::InvalidType),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Shard {
|
impl Shard {
|
||||||
pub async fn open(id: usize, conn: Connection) -> Result<Self, Box<dyn Error>> {
|
pub async fn open(
|
||||||
let shard = Self { id, conn };
|
id: usize,
|
||||||
|
use_compression: UseCompression,
|
||||||
|
conn: Connection,
|
||||||
|
) -> Result<Self, Box<dyn Error>> {
|
||||||
|
let shard = Self {
|
||||||
|
id,
|
||||||
|
use_compression,
|
||||||
|
conn,
|
||||||
|
};
|
||||||
shard.migrate().await?;
|
shard.migrate().await?;
|
||||||
Ok(shard)
|
Ok(shard)
|
||||||
}
|
}
|
||||||
@@ -69,14 +101,32 @@ async fn get_num_entries(conn: &Connection) -> Result<usize, tokio_rusqlite::Err
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_created_at_str(created_at_str: String) -> Result<UtcDateTime, rusqlite::Error> {
|
||||||
|
let parsed = chrono::DateTime::parse_from_rfc3339(&created_at_str)
|
||||||
|
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?;
|
||||||
|
Ok(parsed.to_utc())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_tokio_rusqlite_err<E: Into<Box<dyn Error + Send + Sync + 'static>>>(
|
||||||
|
e: E,
|
||||||
|
) -> tokio_rusqlite::Error {
|
||||||
|
tokio_rusqlite::Error::Other(e.into())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test {
|
pub mod test {
|
||||||
use super::StoreResult;
|
use rstest::rstest;
|
||||||
use crate::sha256::Sha256;
|
|
||||||
|
use super::{StoreResult, UseCompression};
|
||||||
|
use crate::{sha256::Sha256, shard::StoreArgs};
|
||||||
|
|
||||||
|
pub async fn make_shard_with_compression(use_compression: UseCompression) -> super::Shard {
|
||||||
|
let conn = tokio_rusqlite::Connection::open_in_memory().await.unwrap();
|
||||||
|
super::Shard::open(0, use_compression, conn).await.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn make_shard() -> super::Shard {
|
pub async fn make_shard() -> super::Shard {
|
||||||
let conn = tokio_rusqlite::Connection::open_in_memory().await.unwrap();
|
make_shard_with_compression(UseCompression::Auto).await
|
||||||
super::Shard::open(0, conn).await.unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -107,16 +157,25 @@ pub mod test {
|
|||||||
let data = "hello, world!".as_bytes();
|
let data = "hello, world!".as_bytes();
|
||||||
let sha256 = Sha256::from_bytes(data);
|
let sha256 = Sha256::from_bytes(data);
|
||||||
let store_result = shard
|
let store_result = shard
|
||||||
.store(sha256, "text/plain".to_string(), data.into())
|
.store(StoreArgs {
|
||||||
|
sha256,
|
||||||
|
content_type: "text/plain".to_string(),
|
||||||
|
data: data.into(),
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(
|
match store_result {
|
||||||
store_result,
|
|
||||||
StoreResult::Created {
|
StoreResult::Created {
|
||||||
data_size: data.len(),
|
stored_size,
|
||||||
stored_size: data.len()
|
data_size,
|
||||||
|
created_at,
|
||||||
|
} => {
|
||||||
|
assert_eq!(stored_size, data.len());
|
||||||
|
assert_eq!(data_size, data.len());
|
||||||
|
assert!(created_at > chrono::Utc::now() - chrono::Duration::seconds(1));
|
||||||
}
|
}
|
||||||
);
|
_ => panic!("expected StoreResult::Created"),
|
||||||
|
}
|
||||||
assert_eq!(shard.num_entries().await.unwrap(), 1);
|
assert_eq!(shard.num_entries().await.unwrap(), 1);
|
||||||
|
|
||||||
let get_result = shard.get(sha256).await.unwrap().unwrap();
|
let get_result = shard.get(sha256).await.unwrap().unwrap();
|
||||||
@@ -132,28 +191,76 @@ pub mod test {
|
|||||||
let sha256 = Sha256::from_bytes(data);
|
let sha256 = Sha256::from_bytes(data);
|
||||||
|
|
||||||
let store_result = shard
|
let store_result = shard
|
||||||
.store(sha256, "text/plain".to_string(), data.into())
|
.store(StoreArgs {
|
||||||
|
sha256,
|
||||||
|
content_type: "text/plain".to_string(),
|
||||||
|
data: data.into(),
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(
|
assert!(matches!(store_result, StoreResult::Created { .. }));
|
||||||
store_result,
|
|
||||||
|
let created_at = match store_result {
|
||||||
StoreResult::Created {
|
StoreResult::Created {
|
||||||
data_size: data.len(),
|
stored_size,
|
||||||
stored_size: data.len()
|
data_size,
|
||||||
|
created_at,
|
||||||
|
} => {
|
||||||
|
assert_eq!(stored_size, data.len());
|
||||||
|
assert_eq!(data_size, data.len());
|
||||||
|
created_at
|
||||||
}
|
}
|
||||||
);
|
_ => panic!("expected StoreResult::Created"),
|
||||||
|
};
|
||||||
|
|
||||||
let store_result = shard
|
let store_result = shard
|
||||||
.store(sha256, "text/plain".to_string(), data.into())
|
.store(StoreArgs {
|
||||||
|
sha256,
|
||||||
|
content_type: "text/plain".to_string(),
|
||||||
|
data: data.into(),
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
store_result,
|
store_result,
|
||||||
StoreResult::Exists {
|
StoreResult::Exists {
|
||||||
data_size: data.len(),
|
data_size: data.len(),
|
||||||
stored_size: data.len()
|
stored_size: data.len(),
|
||||||
|
created_at
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert_eq!(shard.num_entries().await.unwrap(), 1);
|
assert_eq!(shard.num_entries().await.unwrap(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[rstest]
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compression_store_get(
|
||||||
|
#[values(UseCompression::Auto, UseCompression::None, UseCompression::Zstd)]
|
||||||
|
use_compression: UseCompression,
|
||||||
|
#[values(true, false)] incompressible_data: bool,
|
||||||
|
#[values("text/string", "image/jpg", "application/octet-stream")] content_type: String,
|
||||||
|
) {
|
||||||
|
let shard = make_shard_with_compression(use_compression).await;
|
||||||
|
let mut data = vec![b'.'; 1024];
|
||||||
|
if incompressible_data {
|
||||||
|
for byte in data.iter_mut() {
|
||||||
|
*byte = rand::random();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let sha256 = Sha256::from_bytes(&data);
|
||||||
|
let store_result = shard
|
||||||
|
.store(StoreArgs {
|
||||||
|
sha256,
|
||||||
|
content_type: content_type.clone(),
|
||||||
|
data: data.clone().into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(store_result, StoreResult::Created { .. }));
|
||||||
|
|
||||||
|
let get_result = shard.get(sha256).await.unwrap().unwrap();
|
||||||
|
assert_eq!(get_result.content_type, content_type);
|
||||||
|
assert_eq!(get_result.data, data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,11 +34,15 @@ impl Shards {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test {
|
pub mod test {
|
||||||
use crate::shard::test::make_shard;
|
use crate::{shard::test::make_shard_with_compression, UseCompression};
|
||||||
|
|
||||||
use super::Shards;
|
use super::Shards;
|
||||||
|
|
||||||
|
pub async fn make_shards_with_compression(use_compression: UseCompression) -> Shards {
|
||||||
|
Shards::new(vec![make_shard_with_compression(use_compression).await]).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn make_shards() -> Shards {
|
pub async fn make_shards() -> Shards {
|
||||||
Shards::new(vec![make_shard().await]).unwrap()
|
make_shards_with_compression(UseCompression::Auto).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user