add compression column

This commit is contained in:
Dylan Knutson
2024-04-23 15:06:37 -07:00
parent c772661cf2
commit d91e3c8105
4 changed files with 122 additions and 26 deletions

43
Cargo.lock generated
View File

@@ -277,6 +277,7 @@ dependencies = [
"tokio-rusqlite", "tokio-rusqlite",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"zstd",
] ]
[[package]] [[package]]
@@ -305,6 +306,11 @@ name = "cc"
version = "1.0.95" version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b"
dependencies = [
"jobserver",
"libc",
"once_cell",
]
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@@ -882,6 +888,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.69" version = "0.3.69"
@@ -2196,3 +2211,31 @@ dependencies = [
"quote", "quote",
"syn 2.0.60", "syn 2.0.60",
] ]
[[package]]
name = "zstd"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.10+zstd.1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa"
dependencies = [
"cc",
"pkg-config",
]

View File

@@ -20,7 +20,7 @@ clap = { version = "4.5.4", features = ["derive"] }
futures = "0.3.30" futures = "0.3.30"
kdam = "0.5.1" kdam = "0.5.1"
rand = "0.8.5" rand = "0.8.5"
rusqlite = "0.31.0" rusqlite = { version = "0.31.0", features = ["vtab"] }
serde = { version = "1.0.198", features = ["serde_derive"] } serde = { version = "1.0.198", features = ["serde_derive"] }
serde_json = "1.0.116" serde_json = "1.0.116"
sha2 = "0.10.8" sha2 = "0.10.8"
@@ -30,3 +30,4 @@ tracing = "0.1.40"
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"
reqwest = { version = "0.12.4", features = ["json", "multipart", "blocking"] } reqwest = { version = "0.12.4", features = ["json", "multipart", "blocking"] }
hex = "0.4.3" hex = "0.4.3"
zstd = "0.13.1"

View File

@@ -1,7 +1,10 @@
pub mod shard_error;
use crate::{ use crate::{
axum_result_type::AxumJsonResultOf, axum_result_type::AxumJsonResultOf,
request_response::store_request::{StoreRequestWithSha256, StoreResponse}, request_response::store_request::{StoreRequestWithSha256, StoreResponse},
sha256::Sha256, sha256::Sha256,
shard::shard_error::ShardError,
}; };
use axum::{http::StatusCode, Json}; use axum::{http::StatusCode, Json};
@@ -157,37 +160,28 @@ impl Shard {
// create tables, indexes, etc // create tables, indexes, etc
self.sqlite self.sqlite
.call(move |conn| { .call(move |conn| {
conn.execute( ensure_schema_versions_table(conn)?;
"CREATE TABLE IF NOT EXISTS schema_version ( let schema_rows = load_schema_rows(conn)?;
version INTEGER PRIMARY KEY,
created_at TEXT NOT NULL
)",
[],
)?;
let schema_row: Option<(i64, UtcDateTime)> = conn.query_row( if let Some((version, date_time)) = schema_rows.first() {
"SELECT version, created_at FROM schema_version ORDER BY version DESC LIMIT 1",
[],
|row| {
let ver = row.get(0)?;
// let created_at_str: String = row.get(1)?;
let created_at = parse_created_at_str(row.get(1)?)?;
Ok((ver, created_at))
}
).optional()?;
if let Some((version, date_time)) = schema_row {
debug!( debug!(
"shard {}: latest schema version: {} @ {}", "shard {}: latest schema version: {} @ {}",
shard_id, version, date_time shard_id, version, date_time
); );
if version < 1 { if *version == 1 {
migrate_to_version_1(conn)?; migrate_to_version_2(conn)?;
} else if *version == 2 {
// no-op
} else {
return Err(tokio_rusqlite::Error::Other(Box::new(ShardError::new(
format!("shard {}: unsupported schema version {}", shard_id, version),
))));
} }
} else { } else {
debug!("shard {}: no schema version found, initializing", shard_id); debug!("shard {}: no schema version found, initializing", shard_id);
migrate_to_version_1(conn)?; migrate_to_version_1(conn)?;
migrate_to_version_2(conn)?;
} }
Ok(()) Ok(())
}) })
@@ -220,12 +214,29 @@ fn is_duplicate_entry_err(error: &rusqlite::Error) -> bool {
false false
} }
fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> { fn ensure_schema_versions_table(conn: &rusqlite::Connection) -> Result<usize, rusqlite::Error> {
conn.execute( conn.execute(
"INSERT INTO schema_version (version, created_at) VALUES (1, ?)", "CREATE TABLE IF NOT EXISTS schema_version (
[chrono::Utc::now().to_rfc3339()], version INTEGER PRIMARY KEY,
)?; created_at TEXT NOT NULL
)",
[],
)
}
fn load_schema_rows(conn: &rusqlite::Connection) -> Result<Vec<(i64, String)>, rusqlite::Error> {
let mut stmt = conn
.prepare("SELECT version, created_at FROM schema_version ORDER BY version DESC LIMIT 1")?;
let rows = stmt.query_map([], |row| {
let version = row.get(0)?;
let created_at = row.get(1)?;
Ok((version, created_at))
})?;
rows.collect()
}
fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
debug!("migrating to version 1");
conn.execute( conn.execute(
"CREATE TABLE IF NOT EXISTS entries ( "CREATE TABLE IF NOT EXISTS entries (
sha256 BLOB PRIMARY KEY, sha256 BLOB PRIMARY KEY,
@@ -237,6 +248,27 @@ fn migrate_to_version_1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Err
[], [],
)?; )?;
conn.execute(
"INSERT INTO schema_version (version, created_at) VALUES (1, ?)",
[chrono::Utc::now().to_rfc3339()],
)?;
Ok(())
}
fn migrate_to_version_2(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
debug!("migrating to version 2");
conn.execute(
"INSERT INTO schema_version (version, created_at) VALUES (2, ?)",
[chrono::Utc::now().to_rfc3339()],
)?;
// add `compression` column to entries table
conn.execute(
"ALTER TABLE entries ADD COLUMN compression INTEGER NOT NULL DEFAULT 0",
[],
)?;
Ok(()) Ok(())
} }

20
src/shard/shard_error.rs Normal file
View File

@@ -0,0 +1,20 @@
use std::{
error::Error,
fmt::{Display, Formatter},
};
#[derive(Debug)]
pub struct ShardError {
pub message: String,
}
impl ShardError {
pub fn new(message: String) -> Self {
Self { message }
}
}
impl Display for ShardError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl Error for ShardError {}