From 95c78326d28993b74962679d1d0839b3300c4bb8 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Wed, 29 Nov 2023 13:36:15 +0800 Subject: [PATCH] storage: use connection pool for sqlite Sqlite connection is not thread safe, so use connection pool to support multi-threading. Signed-off-by: Jiang Liu --- Cargo.lock | 98 +++++++++++++++++++++++++++----- storage/Cargo.toml | 7 ++- storage/src/backend/localdisk.rs | 2 +- storage/src/cache/dedup/db.rs | 59 ++++++++++++------- storage/src/cache/dedup/mod.rs | 13 ++++- storage/src/cache/mod.rs | 1 + 6 files changed, 141 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e32af89d62..522f5b06e7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,12 +155,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "build_const" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7" - [[package]] name = "bumpalo" version = "3.12.0" @@ -299,13 +293,19 @@ dependencies = [ [[package]] name = "crc" -version = "1.8.1" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" dependencies = [ - "build_const", + "crc-catalog", ] +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.3.2" @@ -637,11 +637,11 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "gpt" -version = "3.0.0" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd7365d734a70ac5dd7be791b0c96083852188df015b8c665bb2dadb108a743" +checksum = "8283e7331b8c93b9756e0cfdbcfb90312852f953c6faf9bf741e684cc3b6ad69" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.0", "crc", "log", "uuid", @@ -1324,6 +1324,8 @@ dependencies = [ "nix", "nydus-api", "nydus-utils", + "r2d2", + "r2d2_sqlite", "regex", "reqwest", "rusqlite", @@ -1514,6 +1516,12 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1556,6 +1564,58 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_sqlite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dc290b669d30e20751e813517bbe13662d020419c5c8818ff10b6e8bb7777f6" +dependencies = [ + "r2d2", + "rusqlite", + "uuid", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -1696,6 +1756,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -2097,11 +2166,12 @@ dependencies = [ [[package]] name = "uuid" -version = "0.8.2" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom", + "rand", ] [[package]] diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 8b7005a2fb0..2c2f1fa8c6c 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -24,7 +24,9 @@ libc = "0.2" log = "0.4.8" nix = "0.24" reqwest = { version = "0.11.14", features = ["blocking", "json"], optional = true } -rusqlite = { version = "0.30", features = ["bundled"] } +rusqlite = { version = "0.30", features = ["bundled"], optional = true } +r2d2 = { version = "0.8", optional = true } +r2d2_sqlite = { version = "0.23", optional = true } serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.53" sha1 = { version = "0.10.5", optional = true } @@ -35,7 +37,7 @@ tokio = { version = "1.19.0", features = ["macros", "rt", "rt-multi-thread", "sy url = { version = "2.1.1", optional = true } vm-memory = "0.10" fuse-backend-rs = "^0.10.3" -gpt = { version = "3.0.0", optional = true } +gpt = { version = "3.1.0", optional = true } nydus-api = { version = "0.3", path = "../api" } nydus-utils = { version = "0.4", path = "../utils", features = ["encryption", "zran"] } @@ -54,6 +56,7 @@ backend-oss = ["base64", "httpdate", "hmac", "sha1", "reqwest", "url"] backend-registry = ["base64", "reqwest", "url"] backend-s3 = ["base64", "hmac", "http", "reqwest", "sha2", "time", "url"] backend-http-proxy = ["hyper", "hyperlocal", "http", "reqwest", "url"] +dedup = ["rusqlite", "r2d2", "r2d2_sqlite"] prefetch-rate-limit = ["leaky-bucket"] [package.metadata.docs.rs] diff --git a/storage/src/backend/localdisk.rs b/storage/src/backend/localdisk.rs index 329af02c35c..0c83bfba9db 100644 --- a/storage/src/backend/localdisk.rs +++ b/storage/src/backend/localdisk.rs @@ -297,7 +297,7 @@ impl LocalDisk { v.name.clone() } else { // The 64-byte blob_id is stored in two parts - v.name.clone() + guid.to_simple().to_string().as_str() + v.name.clone() + guid.simple().to_string().as_str() }; if name.is_empty() { diff --git a/storage/src/cache/dedup/db.rs b/storage/src/cache/dedup/db.rs index 36b62883461..f57889d4fa6 100644 --- a/storage/src/cache/dedup/db.rs +++ b/storage/src/cache/dedup/db.rs @@ -6,22 +6,27 @@ use std::path::Path; +use r2d2::{Pool, PooledConnection}; +use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{Connection, DropBehavior, OptionalExtension, Transaction}; use super::Result; pub struct CasDb { - conn: Connection, + pool: Pool, } impl CasDb { pub fn new(path: impl AsRef) -> Result { let mut db_path = path.as_ref().to_owned(); db_path.push("cas.db"); - let conn = Connection::open(db_path)?; + Self::from_file(db_path) + } - // Always wait in case of busy. - conn.busy_handler(Some(|_v| true))?; + pub fn from_file(db_path: impl AsRef) -> Result { + let mgr = SqliteConnectionManager::file(db_path); + let pool = r2d2::Pool::new(mgr)?; + let conn = pool.get()?; conn.execute( "CREATE TABLE IF NOT EXISTS Blobs ( @@ -58,7 +63,7 @@ impl CasDb { )?; */ - Ok(CasDb { conn }) + Ok(CasDb { pool }) } pub fn get_blob_id_with_tx(tran: &Transaction, blob: &str) -> Result> { @@ -78,7 +83,7 @@ impl CasDb { let sql = "SELECT BlobId FROM Blobs WHERE FilePath = ?"; if let Some(id) = self - .conn + .get_connection()? .query_row(sql, [blob], |row| row.get::(0)) .optional()? { @@ -92,7 +97,7 @@ impl CasDb { let sql = "SELECT FilePath FROM Blobs WHERE BlobId = ?"; if let Some(path) = self - .conn + .get_connection()? .query_row(sql, [id], |row| row.get::(0)) .optional()? { @@ -103,7 +108,8 @@ impl CasDb { } pub fn get_all_blobs(&self) -> Result> { - let mut stmt = self.conn.prepare("SELECT BlobId, FilePath FROM Blobs")?; + let conn = self.get_connection()?; + let mut stmt = conn.prepare_cached("SELECT BlobId, FilePath FROM Blobs")?; let rows = stmt.query_map([], |row| Ok((row.get::(0)?, row.get(1)?)))?; let mut results: Vec<(u64, String)> = Vec::new(); for row in rows { @@ -114,8 +120,9 @@ impl CasDb { pub fn add_blobs(&mut self, blobs: &[String]) -> Result<()> { let sql = "INSERT OR IGNORE INTO Blobs (FilePath) VALUES (?1)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; - let tran = self.begin_transaction()?; for blob in blobs { if let Err(e) = tran.execute(sql, [blob]) { return Err(e.into()); @@ -126,17 +133,19 @@ impl CasDb { Ok(()) } - pub fn add_blob(&mut self, blob: &str) -> Result { + pub fn add_blob(&self, blob: &str) -> Result { let sql = "INSERT OR IGNORE INTO Blobs (FilePath) VALUES (?1)"; - self.conn.execute(sql, [blob])?; - Ok(self.conn.last_insert_rowid() as u64) + let conn = self.get_connection()?; + conn.execute(sql, [blob])?; + Ok(conn.last_insert_rowid() as u64) } pub fn delete_blobs(&mut self, blobs: &[String]) -> Result<()> { let delete_blobs_sql = "DELETE FROM Blobs WHERE FilePath = (?1)"; let delete_chunks_sql = "DELETE FROM Chunks WHERE BlobId = (?1)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; - let tran = self.begin_transaction()?; for blob in blobs { if let Some(id) = Self::get_blob_id_with_tx(&tran, blob)? { if let Err(e) = tran.execute(delete_chunks_sql, [id]) { @@ -160,7 +169,7 @@ impl CasDb { ORDER BY Blobs.BlobId LIMIT 1 OFFSET 0"; if let Some((new_blob_id, chunk_info)) = self - .conn + .get_connection()? .query_row(sql, [chunk_id], |row| { Ok((row.get(0)?, row.get::(1)?)) }) @@ -174,8 +183,9 @@ impl CasDb { pub fn add_chunks(&mut self, chunks: &[(String, u64, String)]) -> Result<()> { let sql = "INSERT OR IGNORE INTO Chunks (ChunkId, ChunkOffset, BlobId) VALUES (?1, ?2, ?3)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; - let tran = self.begin_transaction()?; for chunk in chunks { match Self::get_blob_id_with_tx(&tran, &chunk.2) { Err(e) => return Err(e), @@ -191,10 +201,11 @@ impl CasDb { Ok(()) } - pub fn add_chunk(&mut self, chunk_id: &str, chunk_offset: u64, blob_id: &str) -> Result<()> { + pub fn add_chunk(&self, chunk_id: &str, chunk_offset: u64, blob_id: &str) -> Result<()> { let sql = "INSERT OR IGNORE INTO Chunks (ChunkId, ChunkOffset, BlobId) VALUES (?1, ?2, ?3)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; - let tran = self.begin_transaction()?; match Self::get_blob_id_with_tx(&tran, blob_id) { Err(e) => return Err(e), Ok(id) => { @@ -208,13 +219,19 @@ impl CasDb { Ok(()) } - fn begin_transaction(&mut self) -> Result { - let mut tx = self - .conn - .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?; + fn begin_transaction( + conn: &mut PooledConnection, + ) -> Result { + let mut tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?; tx.set_drop_behavior(DropBehavior::Rollback); Ok(tx) } + + fn get_connection(&self) -> Result> { + let conn = self.pool.get()?; + conn.busy_handler(Some(|_v| true))?; + Ok(conn) + } } #[cfg(test)] diff --git a/storage/src/cache/dedup/mod.rs b/storage/src/cache/dedup/mod.rs index 09723666cb1..f52a8fcc1de 100644 --- a/storage/src/cache/dedup/mod.rs +++ b/storage/src/cache/dedup/mod.rs @@ -12,11 +12,16 @@ mod db; pub enum CasError { Io(Error), Db(rusqlite::Error), + R2D2(r2d2::Error), } impl Display for CasError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self) + match self { + CasError::Io(e) => write!(f, "{}", e), + CasError::Db(e) => write!(f, "{}", e), + CasError::R2D2(e) => write!(f, "{}", e), + } } } @@ -28,6 +33,12 @@ impl From for CasError { } } +impl From for CasError { + fn from(e: r2d2::Error) -> Self { + CasError::R2D2(e) + } +} + impl From for CasError { fn from(e: Error) -> Self { CasError::Io(e) diff --git a/storage/src/cache/mod.rs b/storage/src/cache/mod.rs index 2f86f5fd047..1ae6dda497e 100644 --- a/storage/src/cache/mod.rs +++ b/storage/src/cache/mod.rs @@ -36,6 +36,7 @@ use crate::utils::{alloc_buf, check_digest}; use crate::{StorageResult, RAFS_MAX_CHUNK_SIZE}; mod cachedfile; +#[cfg(feature = "dedup")] mod dedup; mod dummycache; mod filecache;