refactor store request/response, use IntoResponse
This commit is contained in:
@@ -1,18 +1,80 @@
|
|||||||
use crate::{
|
use crate::{sha256::Sha256, shard::StoreResult, shards::Shards};
|
||||||
request_response::store_request::{StoreRequest, StoreResult},
|
use axum::{body::Bytes, response::IntoResponse, Extension, Json};
|
||||||
sha256::Sha256,
|
use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart};
|
||||||
shards::Shards,
|
|
||||||
};
|
|
||||||
use axum::Extension;
|
|
||||||
use axum_typed_multipart::TypedMultipart;
|
|
||||||
|
|
||||||
|
use axum::http::StatusCode;
|
||||||
|
use serde::Serialize;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
|
|
||||||
|
#[derive(TryFromMultipart)]
|
||||||
|
pub struct StoreRequest {
|
||||||
|
pub sha256: Option<String>,
|
||||||
|
pub content_type: String,
|
||||||
|
pub data: FieldData<Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Debug, PartialEq)]
|
||||||
|
#[serde(rename_all = "snake_case", tag = "status")]
|
||||||
|
pub enum StoreResponse {
|
||||||
|
Created {
|
||||||
|
stored_size: usize,
|
||||||
|
data_size: usize,
|
||||||
|
},
|
||||||
|
Exists {
|
||||||
|
stored_size: usize,
|
||||||
|
data_size: usize,
|
||||||
|
},
|
||||||
|
Sha256Mismatch {
|
||||||
|
expected_sha256: String,
|
||||||
|
},
|
||||||
|
InternalError {
|
||||||
|
message: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoreResponse {
|
||||||
|
fn status_code(&self) -> StatusCode {
|
||||||
|
match self {
|
||||||
|
StoreResponse::Created { .. } => StatusCode::CREATED,
|
||||||
|
StoreResponse::Exists { .. } => StatusCode::OK,
|
||||||
|
StoreResponse::Sha256Mismatch { .. } => StatusCode::BAD_REQUEST,
|
||||||
|
StoreResponse::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<StoreResult> for StoreResponse {
|
||||||
|
fn from(result: StoreResult) -> Self {
|
||||||
|
match result {
|
||||||
|
StoreResult::Created {
|
||||||
|
stored_size,
|
||||||
|
data_size,
|
||||||
|
} => StoreResponse::Created {
|
||||||
|
stored_size,
|
||||||
|
data_size,
|
||||||
|
},
|
||||||
|
StoreResult::Exists {
|
||||||
|
stored_size,
|
||||||
|
data_size,
|
||||||
|
} => StoreResponse::Exists {
|
||||||
|
stored_size,
|
||||||
|
data_size,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoResponse for StoreResponse {
|
||||||
|
fn into_response(self) -> axum::response::Response {
|
||||||
|
(self.status_code(), Json(self)).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[axum::debug_handler]
|
#[axum::debug_handler]
|
||||||
pub async fn store_handler(
|
pub async fn store_handler(
|
||||||
Extension(shards): Extension<Shards>,
|
Extension(shards): Extension<Shards>,
|
||||||
TypedMultipart(request): TypedMultipart<StoreRequest>,
|
TypedMultipart(request): TypedMultipart<StoreRequest>,
|
||||||
) -> StoreResult {
|
) -> StoreResponse {
|
||||||
let sha256 = Sha256::from_bytes(&request.data.contents);
|
let sha256 = Sha256::from_bytes(&request.data.contents);
|
||||||
let shard = shards.shard_for(&sha256);
|
let shard = shards.shard_for(&sha256);
|
||||||
|
|
||||||
@@ -23,12 +85,21 @@ pub async fn store_handler(
|
|||||||
"client sent mismatched sha256: (client) {} != (computed) {}",
|
"client sent mismatched sha256: (client) {} != (computed) {}",
|
||||||
req_sha256, sha256_str
|
req_sha256, sha256_str
|
||||||
);
|
);
|
||||||
return StoreResult::Sha256Mismatch { sha256: sha256_str };
|
|
||||||
|
return StoreResponse::Sha256Mismatch {
|
||||||
|
expected_sha256: sha256_str,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
shard
|
match shard
|
||||||
.store(sha256, request.content_type, request.data.contents)
|
.store(sha256, request.content_type, request.data.contents)
|
||||||
.await
|
.await
|
||||||
|
{
|
||||||
|
Ok(store_result) => store_result.into(),
|
||||||
|
Err(err) => StoreResponse::InternalError {
|
||||||
|
message: err.to_string(),
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -42,7 +113,7 @@ mod test {
|
|||||||
Shards::new(vec![make_shard().await]).unwrap()
|
Shards::new(vec![make_shard().await]).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_request(sha256: Option<Sha256>, data: Bytes) -> StoreResult {
|
async fn send_request(sha256: Option<Sha256>, data: Bytes) -> StoreResponse {
|
||||||
store_handler(
|
store_handler(
|
||||||
Extension(make_shards().await),
|
Extension(make_shards().await),
|
||||||
TypedMultipart(StoreRequest {
|
TypedMultipart(StoreRequest {
|
||||||
@@ -60,7 +131,8 @@ mod test {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_store_handler() {
|
async fn test_store_handler() {
|
||||||
let result = send_request(None, "hello, world!".as_bytes().into()).await;
|
let result = send_request(None, "hello, world!".as_bytes().into()).await;
|
||||||
assert!(matches!(result, StoreResult::Created { .. }));
|
assert_eq!(result.status_code(), StatusCode::CREATED);
|
||||||
|
assert!(matches!(result, StoreResponse::Created { .. }));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -68,10 +140,11 @@ mod test {
|
|||||||
let not_hello_world = Sha256::from_bytes("not hello, world!".as_bytes());
|
let not_hello_world = Sha256::from_bytes("not hello, world!".as_bytes());
|
||||||
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
||||||
let result = send_request(Some(not_hello_world), "hello, world!".as_bytes().into()).await;
|
let result = send_request(Some(not_hello_world), "hello, world!".as_bytes().into()).await;
|
||||||
|
assert_eq!(result.status_code(), StatusCode::BAD_REQUEST);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result,
|
result,
|
||||||
StoreResult::Sha256Mismatch {
|
StoreResponse::Sha256Mismatch {
|
||||||
sha256: hello_world.hex_string()
|
expected_sha256: hello_world.hex_string()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -80,6 +153,7 @@ mod test {
|
|||||||
async fn test_store_handler_matching_sha256() {
|
async fn test_store_handler_matching_sha256() {
|
||||||
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
let hello_world = Sha256::from_bytes("hello, world!".as_bytes());
|
||||||
let result = send_request(Some(hello_world), "hello, world!".as_bytes().into()).await;
|
let result = send_request(Some(hello_world), "hello, world!".as_bytes().into()).await;
|
||||||
assert!(matches!(result, StoreResult::Created { .. }));
|
assert_eq!(result.status_code(), StatusCode::CREATED);
|
||||||
|
assert!(matches!(result, StoreResponse::Created { .. }));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
mod handlers;
|
mod handlers;
|
||||||
mod request_response;
|
|
||||||
mod sha256;
|
mod sha256;
|
||||||
mod shard;
|
mod shard;
|
||||||
mod shards;
|
mod shards;
|
||||||
|
|||||||
@@ -1 +0,0 @@
|
|||||||
pub mod store_request;
|
|
||||||
@@ -1,41 +0,0 @@
|
|||||||
use axum::{body::Bytes, http::StatusCode, response::IntoResponse, Json};
|
|
||||||
use axum_typed_multipart::{FieldData, TryFromMultipart};
|
|
||||||
use serde::Serialize;
|
|
||||||
|
|
||||||
#[derive(TryFromMultipart)]
|
|
||||||
pub struct StoreRequest {
|
|
||||||
pub sha256: Option<String>,
|
|
||||||
pub content_type: String,
|
|
||||||
pub data: FieldData<Bytes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, PartialEq, Debug)]
|
|
||||||
#[serde(tag = "status", rename_all = "snake_case")]
|
|
||||||
pub enum StoreResult {
|
|
||||||
Created {
|
|
||||||
stored_size: usize,
|
|
||||||
data_size: usize,
|
|
||||||
},
|
|
||||||
Exists {
|
|
||||||
stored_size: usize,
|
|
||||||
data_size: usize,
|
|
||||||
},
|
|
||||||
Sha256Mismatch {
|
|
||||||
sha256: String,
|
|
||||||
},
|
|
||||||
InternalError {
|
|
||||||
message: String,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IntoResponse for StoreResult {
|
|
||||||
fn into_response(self) -> axum::response::Response {
|
|
||||||
let status_code = match &self {
|
|
||||||
StoreResult::Created { .. } => StatusCode::CREATED,
|
|
||||||
StoreResult::Exists { .. } => StatusCode::OK,
|
|
||||||
StoreResult::Sha256Mismatch { .. } => StatusCode::BAD_REQUEST,
|
|
||||||
StoreResult::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
};
|
|
||||||
(status_code, Json(self)).into_response()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -17,7 +17,7 @@ impl Display for Sha256Error {
|
|||||||
}
|
}
|
||||||
impl Error for Sha256Error {}
|
impl Error for Sha256Error {}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy, PartialEq, Eq)]
|
||||||
pub struct Sha256([u8; 32]);
|
pub struct Sha256([u8; 32]);
|
||||||
impl Sha256 {
|
impl Sha256 {
|
||||||
pub fn from_hex_string(hex: &str) -> Result<Self, Box<dyn Error>> {
|
pub fn from_hex_string(hex: &str) -> Result<Self, Box<dyn Error>> {
|
||||||
@@ -65,3 +65,9 @@ impl PartialEq<String> for Sha256 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl core::fmt::Debug for Sha256 {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "Sha256({})", self.hex_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
36
src/shard/fn_get.rs
Normal file
36
src/shard/fn_get.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
use super::*;
|
||||||
|
|
||||||
|
impl Shard {
|
||||||
|
pub async fn get(&self, sha256: Sha256) -> Result<Option<GetResult>, Box<dyn Error>> {
|
||||||
|
self.conn
|
||||||
|
.call(move |conn| get_impl(conn, sha256).map_err(|e| e.into()))
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("get failed: {}", e);
|
||||||
|
e.into()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_impl(
|
||||||
|
conn: &mut rusqlite::Connection,
|
||||||
|
sha256: Sha256,
|
||||||
|
) -> Result<Option<GetResult>, rusqlite::Error> {
|
||||||
|
conn.query_row(
|
||||||
|
"SELECT content_type, compressed_size, created_at, data FROM entries WHERE sha256 = ?",
|
||||||
|
params![sha256.hex_string()],
|
||||||
|
|row| {
|
||||||
|
let content_type = row.get(0)?;
|
||||||
|
let stored_size = row.get(1)?;
|
||||||
|
let created_at = parse_created_at_str(row.get(2)?)?;
|
||||||
|
let data = row.get(3)?;
|
||||||
|
Ok(GetResult {
|
||||||
|
content_type,
|
||||||
|
stored_size,
|
||||||
|
created_at,
|
||||||
|
data,
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.optional()
|
||||||
|
}
|
||||||
78
src/shard/fn_store.rs
Normal file
78
src/shard/fn_store.rs
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(PartialEq, Debug)]
|
||||||
|
pub enum StoreResult {
|
||||||
|
Created {
|
||||||
|
stored_size: usize,
|
||||||
|
data_size: usize,
|
||||||
|
},
|
||||||
|
Exists {
|
||||||
|
stored_size: usize,
|
||||||
|
data_size: usize,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Shard {
|
||||||
|
pub async fn store(
|
||||||
|
&self,
|
||||||
|
sha256: Sha256,
|
||||||
|
content_type: String,
|
||||||
|
data: Bytes,
|
||||||
|
) -> Result<StoreResult, Box<dyn Error>> {
|
||||||
|
self.conn
|
||||||
|
.call(move |conn| store(conn, sha256, content_type, data).map_err(|e| e.into()))
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("store failed: {}", e);
|
||||||
|
e.into()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn store(
|
||||||
|
conn: &mut rusqlite::Connection,
|
||||||
|
sha256: Sha256,
|
||||||
|
content_type: String,
|
||||||
|
data: Bytes,
|
||||||
|
) -> Result<StoreResult, rusqlite::Error> {
|
||||||
|
let sha256 = sha256.hex_string();
|
||||||
|
|
||||||
|
// check for existing entry
|
||||||
|
let maybe_existing: Option<StoreResult> = conn
|
||||||
|
.query_row(
|
||||||
|
"SELECT uncompressed_size, compressed_size, created_at FROM entries WHERE sha256 = ?",
|
||||||
|
params![sha256],
|
||||||
|
|row| {
|
||||||
|
Ok(StoreResult::Exists {
|
||||||
|
stored_size: row.get(0)?,
|
||||||
|
data_size: row.get(1)?,
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.optional()?;
|
||||||
|
|
||||||
|
if let Some(existing) = maybe_existing {
|
||||||
|
return Ok(existing);
|
||||||
|
}
|
||||||
|
|
||||||
|
let created_at = chrono::Utc::now().to_rfc3339();
|
||||||
|
let data_size = data.len();
|
||||||
|
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO entries (sha256, content_type, compression, uncompressed_size, compressed_size, data, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||||
|
params![
|
||||||
|
sha256,
|
||||||
|
content_type,
|
||||||
|
0,
|
||||||
|
data_size,
|
||||||
|
data_size,
|
||||||
|
&data[..],
|
||||||
|
created_at,
|
||||||
|
],
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(StoreResult::Created {
|
||||||
|
stored_size: data_size,
|
||||||
|
data_size,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -1,10 +1,11 @@
|
|||||||
|
mod fn_get;
|
||||||
mod fn_migrate;
|
mod fn_migrate;
|
||||||
|
mod fn_store;
|
||||||
pub mod shard_error;
|
pub mod shard_error;
|
||||||
|
|
||||||
use crate::{
|
use crate::{sha256::Sha256, shard::shard_error::ShardError};
|
||||||
request_response::store_request::StoreResult, sha256::Sha256, shard::shard_error::ShardError,
|
|
||||||
};
|
|
||||||
use axum::body::Bytes;
|
use axum::body::Bytes;
|
||||||
|
pub use fn_store::StoreResult;
|
||||||
|
|
||||||
use rusqlite::{params, types::FromSql, OptionalExtension};
|
use rusqlite::{params, types::FromSql, OptionalExtension};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
@@ -61,79 +62,6 @@ impl Shard {
|
|||||||
.map_err(|e| e.into())
|
.map_err(|e| e.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn store(&self, sha256: Sha256, content_type: String, data: Bytes) -> StoreResult {
|
|
||||||
let sha256 = sha256.hex_string();
|
|
||||||
self.conn.call(move |conn| {
|
|
||||||
// check for existing entry
|
|
||||||
let maybe_existing: Option<StoreResult> = conn
|
|
||||||
.query_row(
|
|
||||||
"SELECT uncompressed_size, compressed_size, created_at FROM entries WHERE sha256 = ?",
|
|
||||||
params![sha256],
|
|
||||||
|row| Ok(StoreResult::Exists {
|
|
||||||
stored_size: row.get(0)?,
|
|
||||||
data_size: row.get(1)?,
|
|
||||||
})
|
|
||||||
)
|
|
||||||
.optional()?;
|
|
||||||
|
|
||||||
if let Some(existing) = maybe_existing {
|
|
||||||
return Ok(existing);
|
|
||||||
}
|
|
||||||
|
|
||||||
let created_at = chrono::Utc::now().to_rfc3339();
|
|
||||||
let data_size = data.len();
|
|
||||||
|
|
||||||
conn.execute(
|
|
||||||
"INSERT INTO entries (sha256, content_type, compression, uncompressed_size, compressed_size, data, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
||||||
params![
|
|
||||||
sha256,
|
|
||||||
content_type,
|
|
||||||
0,
|
|
||||||
data_size,
|
|
||||||
data_size,
|
|
||||||
&data[..],
|
|
||||||
created_at,
|
|
||||||
],
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(StoreResult::Created { stored_size: data_size, data_size })
|
|
||||||
})
|
|
||||||
.await.unwrap_or_else(|e| {
|
|
||||||
error!("store failed: {}", e);
|
|
||||||
StoreResult::InternalError { message: e.to_string() }
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get(&self, sha256: Sha256) -> Result<Option<GetResult>, Box<dyn Error>> {
|
|
||||||
self.conn
|
|
||||||
.call(move |conn| {
|
|
||||||
let get_result = conn
|
|
||||||
.query_row(
|
|
||||||
"SELECT content_type, compressed_size, created_at, data FROM entries WHERE sha256 = ?",
|
|
||||||
params![sha256.hex_string()],
|
|
||||||
|row| {
|
|
||||||
let content_type = row.get(0)?;
|
|
||||||
let stored_size = row.get(1)?;
|
|
||||||
let created_at = parse_created_at_str(row.get(2)?)?;
|
|
||||||
let data = row.get(3)?;
|
|
||||||
Ok(GetResult {
|
|
||||||
content_type,
|
|
||||||
stored_size,
|
|
||||||
created_at,
|
|
||||||
data,
|
|
||||||
})
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.optional()?;
|
|
||||||
Ok(get_result)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
error!("get failed: {}", e);
|
|
||||||
e.into()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn num_entries(&self) -> Result<usize, Box<dyn Error>> {
|
pub async fn num_entries(&self) -> Result<usize, Box<dyn Error>> {
|
||||||
get_num_entries(&self.conn).await.map_err(|e| e.into())
|
get_num_entries(&self.conn).await.map_err(|e| e.into())
|
||||||
}
|
}
|
||||||
@@ -191,7 +119,8 @@ pub mod test {
|
|||||||
let sha256 = Sha256::from_bytes(data);
|
let sha256 = Sha256::from_bytes(data);
|
||||||
let store_result = shard
|
let store_result = shard
|
||||||
.store(sha256, "text/plain".to_string(), data.into())
|
.store(sha256, "text/plain".to_string(), data.into())
|
||||||
.await;
|
.await
|
||||||
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
store_result,
|
store_result,
|
||||||
super::StoreResult::Created {
|
super::StoreResult::Created {
|
||||||
@@ -215,7 +144,8 @@ pub mod test {
|
|||||||
|
|
||||||
let store_result = shard
|
let store_result = shard
|
||||||
.store(sha256, "text/plain".to_string(), data.into())
|
.store(sha256, "text/plain".to_string(), data.into())
|
||||||
.await;
|
.await
|
||||||
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
store_result,
|
store_result,
|
||||||
super::StoreResult::Created {
|
super::StoreResult::Created {
|
||||||
@@ -226,7 +156,8 @@ pub mod test {
|
|||||||
|
|
||||||
let store_result = shard
|
let store_result = shard
|
||||||
.store(sha256, "text/plain".to_string(), data.into())
|
.store(sha256, "text/plain".to_string(), data.into())
|
||||||
.await;
|
.await
|
||||||
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
store_result,
|
store_result,
|
||||||
super::StoreResult::Exists {
|
super::StoreResult::Exists {
|
||||||
|
|||||||
Reference in New Issue
Block a user