tests for hints
This commit is contained in:
@@ -21,7 +21,7 @@ impl Display for Sha256Error {
|
|||||||
}
|
}
|
||||||
impl Error for Sha256Error {}
|
impl Error for Sha256Error {}
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq, Eq, Default)]
|
#[derive(Clone, Copy, PartialEq, Eq, Default, Hash)]
|
||||||
pub struct Sha256([u8; 32]);
|
pub struct Sha256([u8; 32]);
|
||||||
impl Sha256 {
|
impl Sha256 {
|
||||||
pub fn from_hex_string(hex: &str) -> Result<Self, Box<dyn Error>> {
|
pub fn from_hex_string(hex: &str) -> Result<Self, Box<dyn Error>> {
|
||||||
|
|||||||
@@ -48,23 +48,17 @@ impl Shard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CompressedRowResult = (String, usize, UtcDateTime, CompressionId, Vec<u8>);
|
||||||
fn get_compressed_row(
|
fn get_compressed_row(
|
||||||
conn: &mut rusqlite::Connection,
|
conn: &mut rusqlite::Connection,
|
||||||
sha256: &Sha256,
|
sha256: &Sha256,
|
||||||
) -> Result<Option<(String, usize, UtcDateTime, CompressionId, Vec<u8>)>, rusqlite::Error> {
|
) -> Result<Option<CompressedRowResult>, rusqlite::Error> {
|
||||||
conn.query_row(
|
conn.query_row(
|
||||||
"SELECT content_type, compressed_size, created_at, compression_id, data
|
"SELECT content_type, compressed_size, created_at, compression_id, data
|
||||||
FROM entries
|
FROM entries
|
||||||
WHERE sha256 = ?",
|
WHERE sha256 = ?",
|
||||||
params![sha256],
|
params![sha256],
|
||||||
|row| {
|
|row| CompressedRowResult::try_from(row),
|
||||||
let content_type = row.get(0)?;
|
|
||||||
let stored_size = row.get(1)?;
|
|
||||||
let created_at = row.get(2)?;
|
|
||||||
let compression_id = row.get(3)?;
|
|
||||||
let data: Vec<u8> = row.get(4)?;
|
|
||||||
Ok((content_type, stored_size, created_at, compression_id, data))
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
.optional()
|
.optional()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -60,7 +60,8 @@ fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Err
|
|||||||
debug!("migrating to version 1");
|
debug!("migrating to version 1");
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS entries (
|
"CREATE TABLE IF NOT EXISTS entries (
|
||||||
sha256 BLOB PRIMARY KEY,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
sha256 BLOB NOT NULL,
|
||||||
content_type TEXT NOT NULL,
|
content_type TEXT NOT NULL,
|
||||||
compression_id INTEGER NOT NULL,
|
compression_id INTEGER NOT NULL,
|
||||||
uncompressed_size INTEGER NOT NULL,
|
uncompressed_size INTEGER NOT NULL,
|
||||||
@@ -71,11 +72,16 @@ fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Err
|
|||||||
[],
|
[],
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
conn.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS entries_sha256_idx ON entries (sha256)",
|
||||||
|
[],
|
||||||
|
)?;
|
||||||
|
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS compression_hints (
|
"CREATE TABLE IF NOT EXISTS compression_hints (
|
||||||
name TEXT NOT NULL,
|
name TEXT NOT NULL,
|
||||||
ordering INTEGER NOT NULL,
|
ordering INTEGER NOT NULL,
|
||||||
sha256 BLOB NOT NULL
|
entry_id INTEGER NOT NULL
|
||||||
)",
|
)",
|
||||||
[],
|
[],
|
||||||
)?;
|
)?;
|
||||||
|
|||||||
@@ -20,9 +20,9 @@ impl Shard {
|
|||||||
.conn
|
.conn
|
||||||
.call(move |conn| {
|
.call(move |conn| {
|
||||||
let mut stmt = conn.prepare(
|
let mut stmt = conn.prepare(
|
||||||
"SELECT sha256, data FROM entries WHERE sha256 IN (
|
"SELECT sha256, data FROM entries WHERE id IN (
|
||||||
SELECT sha256 FROM compression_hints WHERE name = ? ORDER BY ordering
|
SELECT entry_id FROM compression_hints WHERE name = ?1 ORDER BY ordering
|
||||||
) LIMIT ?",
|
) LIMIT ?2",
|
||||||
)?;
|
)?;
|
||||||
let rows = stmt.query_map(params![compression_hint, limit], |row| {
|
let rows = stmt.query_map(params![compression_hint, limit], |row| {
|
||||||
let sha256: Sha256 = row.get(0)?;
|
let sha256: Sha256 = row.get(0)?;
|
||||||
|
|||||||
@@ -50,9 +50,10 @@ impl Shard {
|
|||||||
|
|
||||||
let uncompressed_size = data.len();
|
let uncompressed_size = data.len();
|
||||||
|
|
||||||
let compressor = self.compressor.read().await;
|
let (compression_id, data) = {
|
||||||
let (compression_id, data) =
|
let compressor = self.compressor.read().await;
|
||||||
compressor.compress(compression_hint.as_deref(), &content_type, data)?;
|
compressor.compress(compression_hint.as_deref(), &content_type, data)?
|
||||||
|
};
|
||||||
|
|
||||||
self.conn
|
self.conn
|
||||||
.call(move |conn| {
|
.call(move |conn| {
|
||||||
@@ -102,28 +103,31 @@ fn insert(
|
|||||||
let created_at = UtcDateTime::now();
|
let created_at = UtcDateTime::now();
|
||||||
let compressed_size = data.len();
|
let compressed_size = data.len();
|
||||||
|
|
||||||
conn.execute("INSERT INTO entries
|
let entry_id: i64 = conn.query_row(
|
||||||
(sha256, content_type, compression_id, uncompressed_size, compressed_size, data, created_at)
|
"INSERT INTO entries
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
(sha256, content_type, compression_id, uncompressed_size, compressed_size, data, created_at)
|
||||||
",
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
params![
|
RETURNING id
|
||||||
sha256,
|
",
|
||||||
content_type,
|
params![
|
||||||
compression_id,
|
sha256,
|
||||||
uncompressed_size,
|
content_type,
|
||||||
compressed_size,
|
compression_id,
|
||||||
data.as_ref(),
|
uncompressed_size,
|
||||||
created_at,
|
compressed_size,
|
||||||
],
|
data.as_ref(),
|
||||||
)?;
|
created_at,
|
||||||
|
],
|
||||||
|
|row| row.get(0)
|
||||||
|
)?;
|
||||||
|
|
||||||
if let Some(compression_hint) = compression_hint {
|
if let Some(compression_hint) = compression_hint {
|
||||||
let rand_ordering = rand::random::<i64>();
|
let rand_ordering = rand::random::<i64>();
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"INSERT INTO compression_hints
|
"INSERT INTO compression_hints
|
||||||
(name, ordering, sha256)
|
(name, ordering, entry_id)
|
||||||
VALUES (?, ?, ?)",
|
VALUES (?, ?, ?)",
|
||||||
params![compression_hint, rand_ordering, sha256],
|
params![compression_hint, rand_ordering, entry_id],
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,16 +2,16 @@ mod fn_get;
|
|||||||
mod fn_migrate;
|
mod fn_migrate;
|
||||||
mod fn_samples_for_hint;
|
mod fn_samples_for_hint;
|
||||||
mod fn_store;
|
mod fn_store;
|
||||||
mod shard;
|
|
||||||
pub mod shard_error;
|
pub mod shard_error;
|
||||||
|
mod shard_struct;
|
||||||
|
|
||||||
pub use fn_get::{GetArgs, GetResult};
|
pub use fn_get::{GetArgs, GetResult};
|
||||||
pub use fn_store::{StoreArgs, StoreResult};
|
pub use fn_store::{StoreArgs, StoreResult};
|
||||||
pub use shard::Shard;
|
pub use shard_struct::Shard;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test {
|
pub mod test {
|
||||||
pub use super::shard::test::*;
|
pub use super::shard_struct::test::*;
|
||||||
}
|
}
|
||||||
|
|
||||||
use crate::{sha256::Sha256, shard::shard_error::ShardError};
|
use crate::{sha256::Sha256, shard::shard_error::ShardError};
|
||||||
|
|||||||
@@ -63,8 +63,10 @@ async fn get_num_entries(conn: &Connection) -> Result<usize, tokio_rusqlite::Err
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test {
|
pub mod test {
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
use crate::compressor::test::make_compressor_with;
|
use crate::compressor::test::make_compressor_with;
|
||||||
use crate::compressor::{Compressor, CompressorArc};
|
use crate::compressor::CompressorArc;
|
||||||
use crate::{
|
use crate::{
|
||||||
compressor::test::make_compressor,
|
compressor::test::make_compressor,
|
||||||
sha256::Sha256,
|
sha256::Sha256,
|
||||||
@@ -74,6 +76,8 @@ pub mod test {
|
|||||||
|
|
||||||
use rstest::rstest;
|
use rstest::rstest;
|
||||||
|
|
||||||
|
use super::Shard;
|
||||||
|
|
||||||
pub async fn make_shard_with(compressor: CompressorArc) -> super::Shard {
|
pub async fn make_shard_with(compressor: CompressorArc) -> super::Shard {
|
||||||
let conn = tokio_rusqlite::Connection::open_in_memory().await.unwrap();
|
let conn = tokio_rusqlite::Connection::open_in_memory().await.unwrap();
|
||||||
super::Shard::open(0, conn, compressor).await.unwrap()
|
super::Shard::open(0, conn, compressor).await.unwrap()
|
||||||
@@ -229,7 +233,6 @@ pub mod test {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_compression_hint() {
|
async fn test_compression_hint() {
|
||||||
let shard = make_shard().await;
|
let shard = make_shard().await;
|
||||||
let compressor = make_compressor().into_arc();
|
|
||||||
let data = "hello, world!".as_bytes();
|
let data = "hello, world!".as_bytes();
|
||||||
let sha256 = Sha256::from_bytes(data);
|
let sha256 = Sha256::from_bytes(data);
|
||||||
let store_result = shard
|
let store_result = shard
|
||||||
@@ -248,4 +251,86 @@ pub mod test {
|
|||||||
assert_eq!(results[0].sha256, sha256);
|
assert_eq!(results[0].sha256, sha256);
|
||||||
assert_eq!(results[0].data, data);
|
assert_eq!(results[0].data, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn store_random(shard: &Shard, hint: &str) -> (Sha256, Vec<u8>) {
|
||||||
|
let data = (0..1024).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
|
||||||
|
let sha256 = Sha256::from_bytes(&data);
|
||||||
|
let store_result = shard
|
||||||
|
.store(StoreArgs {
|
||||||
|
sha256,
|
||||||
|
content_type: "text/plain".to_string(),
|
||||||
|
data: data.clone().into(),
|
||||||
|
compression_hint: Some(hint.to_string()),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(store_result, StoreResult::Created { .. }));
|
||||||
|
(sha256, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compression_hint_limits() {
|
||||||
|
let get_keys_set = |hash_map: &HashMap<Sha256, Vec<u8>>| {
|
||||||
|
hash_map
|
||||||
|
.keys()
|
||||||
|
.into_iter()
|
||||||
|
.map(|k| *k)
|
||||||
|
.collect::<HashSet<_>>()
|
||||||
|
};
|
||||||
|
|
||||||
|
let insert_num = 500;
|
||||||
|
let sample_num = 100;
|
||||||
|
let shard = make_shard().await;
|
||||||
|
let mut hint1 = HashMap::new();
|
||||||
|
let mut hint2 = HashMap::new();
|
||||||
|
|
||||||
|
for _ in 0..insert_num {
|
||||||
|
let (a, b) = store_random(&shard, "hint1").await;
|
||||||
|
hint1.insert(a, b);
|
||||||
|
}
|
||||||
|
|
||||||
|
for _ in 0..insert_num {
|
||||||
|
let (a, b) = store_random(&shard, "hint2").await;
|
||||||
|
hint2.insert(a, b);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(hint1.len(), insert_num);
|
||||||
|
assert_eq!(hint2.len(), insert_num);
|
||||||
|
|
||||||
|
let hint1samples = shard
|
||||||
|
.samples_for_hint("hint1", sample_num)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| (r.sha256, r.data))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
let hint2samples = shard
|
||||||
|
.samples_for_hint("hint2", sample_num)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| (r.sha256, r.data))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
let hint1_keys = get_keys_set(&hint1);
|
||||||
|
let hint2_keys = get_keys_set(&hint2);
|
||||||
|
let hint1samples_keys = get_keys_set(&hint1samples);
|
||||||
|
let hint2samples_keys = get_keys_set(&hint2samples);
|
||||||
|
|
||||||
|
assert_eq!(hint1_keys.len(), insert_num);
|
||||||
|
assert_eq!(hint2_keys.len(), insert_num);
|
||||||
|
assert!(hint1_keys.is_disjoint(&hint2_keys));
|
||||||
|
assert!(
|
||||||
|
hint1samples_keys.is_disjoint(&hint2samples_keys),
|
||||||
|
"hint1: {:?}, hint2: {:?}",
|
||||||
|
hint1samples_keys,
|
||||||
|
hint2samples_keys
|
||||||
|
);
|
||||||
|
assert_eq!(hint1samples.len(), sample_num);
|
||||||
|
assert_eq!(hint2samples.len(), sample_num);
|
||||||
|
assert_eq!(hint2samples_keys.len(), sample_num);
|
||||||
|
assert!(hint1_keys.is_superset(&hint1samples_keys));
|
||||||
|
assert!(hint2_keys.is_superset(&hint2samples_keys));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -11,14 +11,19 @@ impl UtcDateTime {
|
|||||||
pub fn now() -> Self {
|
pub fn now() -> Self {
|
||||||
Self(chrono::Utc::now())
|
Self(chrono::Utc::now())
|
||||||
}
|
}
|
||||||
pub fn to_string(&self) -> String {
|
|
||||||
self.0.to_rfc3339()
|
#[cfg(test)]
|
||||||
}
|
|
||||||
pub fn from_string(s: &str) -> Result<Self, chrono::ParseError> {
|
pub fn from_string(s: &str) -> Result<Self, chrono::ParseError> {
|
||||||
Ok(Self(DateTime::parse_from_rfc3339(s)?.to_utc()))
|
Ok(Self(DateTime::parse_from_rfc3339(s)?.to_utc()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ToString for UtcDateTime {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
|
self.0.to_rfc3339()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PartialEq<DateTime<chrono::Utc>> for UtcDateTime {
|
impl PartialEq<DateTime<chrono::Utc>> for UtcDateTime {
|
||||||
fn eq(&self, other: &DateTime<chrono::Utc>) -> bool {
|
fn eq(&self, other: &DateTime<chrono::Utc>) -> bool {
|
||||||
self.0 == *other
|
self.0 == *other
|
||||||
|
|||||||
Reference in New Issue
Block a user