diff --git a/Cargo.lock b/Cargo.lock index 43ce8a1..b5d7018 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -277,6 +277,7 @@ dependencies = [ "tokio-rusqlite", "tracing", "tracing-subscriber", + "zstd", ] [[package]] @@ -305,6 +306,11 @@ name = "cc" version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" +dependencies = [ + "jobserver", + "libc", + "once_cell", +] [[package]] name = "cfg-if" @@ -882,6 +888,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -2196,3 +2211,31 @@ dependencies = [ "quote", "syn 2.0.60", ] + +[[package]] +name = "zstd" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.10+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index b650165..6938c79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ clap = { version = "4.5.4", features = ["derive"] } futures = "0.3.30" kdam = "0.5.1" rand = "0.8.5" -rusqlite = "0.31.0" +rusqlite = { version = "0.31.0", features = ["vtab"] } serde = { version = "1.0.198", features = ["serde_derive"] } serde_json = "1.0.116" sha2 = "0.10.8" @@ -30,3 +30,4 @@ tracing = "0.1.40" tracing-subscriber = "0.3.18" reqwest = { version = "0.12.4", features = ["json", "multipart", "blocking"] } hex = "0.4.3" +zstd = "0.13.1" diff --git a/src/shard/mod.rs b/src/shard/mod.rs index a1cf4eb..1b2eb46 100644 --- a/src/shard/mod.rs +++ b/src/shard/mod.rs @@ -1,7 +1,10 @@ +pub mod shard_error; + use crate::{ axum_result_type::AxumJsonResultOf, request_response::store_request::{StoreRequestWithSha256, StoreResponse}, sha256::Sha256, + shard::shard_error::ShardError, }; use axum::{http::StatusCode, Json}; @@ -157,37 +160,28 @@ impl Shard { // create tables, indexes, etc self.sqlite .call(move |conn| { - conn.execute( - "CREATE TABLE IF NOT EXISTS schema_version ( - version INTEGER PRIMARY KEY, - created_at TEXT NOT NULL - )", - [], - )?; + ensure_schema_versions_table(conn)?; + let schema_rows = load_schema_rows(conn)?; - let schema_row: Option<(i64, UtcDateTime)> = conn.query_row( - "SELECT version, created_at FROM schema_version ORDER BY version DESC LIMIT 1", - [], - |row| { - let ver = row.get(0)?; - // let created_at_str: String = row.get(1)?; - let created_at = parse_created_at_str(row.get(1)?)?; - Ok((ver, created_at)) - } - ).optional()?; - - if let Some((version, date_time)) = schema_row { + if let Some((version, date_time)) = schema_rows.first() { debug!( "shard {}: latest schema version: {} @ {}", shard_id, version, date_time ); - if version < 1 { - migrate_to_version_1(conn)?; + if *version == 1 { + migrate_to_version_2(conn)?; + } else if *version == 2 { + // no-op + } else { + return Err(tokio_rusqlite::Error::Other(Box::new(ShardError::new( + format!("shard {}: unsupported schema version {}", shard_id, version), + )))); } } else { debug!("shard {}: no schema version found, initializing", shard_id); migrate_to_version_1(conn)?; + migrate_to_version_2(conn)?; } Ok(()) }) @@ -220,12 +214,29 @@ fn is_duplicate_entry_err(error: &rusqlite::Error) -> bool { false } -fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> { +fn ensure_schema_versions_table(conn: &rusqlite::Connection) -> Result { conn.execute( - "INSERT INTO schema_version (version, created_at) VALUES (1, ?)", - [chrono::Utc::now().to_rfc3339()], - )?; + "CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY, + created_at TEXT NOT NULL + )", + [], + ) +} +fn load_schema_rows(conn: &rusqlite::Connection) -> Result, rusqlite::Error> { + let mut stmt = conn + .prepare("SELECT version, created_at FROM schema_version ORDER BY version DESC LIMIT 1")?; + let rows = stmt.query_map([], |row| { + let version = row.get(0)?; + let created_at = row.get(1)?; + Ok((version, created_at)) + })?; + rows.collect() +} + +fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> { + debug!("migrating to version 1"); conn.execute( "CREATE TABLE IF NOT EXISTS entries ( sha256 BLOB PRIMARY KEY, @@ -237,6 +248,27 @@ fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Err [], )?; + conn.execute( + "INSERT INTO schema_version (version, created_at) VALUES (1, ?)", + [chrono::Utc::now().to_rfc3339()], + )?; + + Ok(()) +} + +fn migrate_to_version_2(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> { + debug!("migrating to version 2"); + conn.execute( + "INSERT INTO schema_version (version, created_at) VALUES (2, ?)", + [chrono::Utc::now().to_rfc3339()], + )?; + + // add `compression` column to entries table + conn.execute( + "ALTER TABLE entries ADD COLUMN compression INTEGER NOT NULL DEFAULT 0", + [], + )?; + Ok(()) } diff --git a/src/shard/shard_error.rs b/src/shard/shard_error.rs new file mode 100644 index 0000000..90ad91c --- /dev/null +++ b/src/shard/shard_error.rs @@ -0,0 +1,20 @@ +use std::{ + error::Error, + fmt::{Display, Formatter}, +}; + +#[derive(Debug)] +pub struct ShardError { + pub message: String, +} +impl ShardError { + pub fn new(message: String) -> Self { + Self { message } + } +} +impl Display for ShardError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} +impl Error for ShardError {}