Skip to content

Commit

Permalink
storage: use connection pool for sqlite
Browse files Browse the repository at this point in the history
SQLite connections are not thread-safe, so use a connection pool to
support multi-threading.

Signed-off-by: Jiang Liu <[email protected]>
  • Loading branch information
jiangliu committed Nov 29, 2023
1 parent 5568e3a commit 36f7153
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 37 deletions.
98 changes: 84 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ libc = "0.2"
log = "0.4.8"
nix = "0.24"
reqwest = { version = "0.11.14", features = ["blocking", "json"], optional = true }
rusqlite = { version = "0.30", features = ["bundled"] }
rusqlite = { version = "0.30", features = ["bundled"], optional = true }
r2d2 = { version = "0.8", optional = true }
r2d2_sqlite = { version = "0.23", optional = true }
serde = { version = "1.0.110", features = ["serde_derive", "rc"] }
serde_json = "1.0.53"
sha1 = { version = "0.10.5", optional = true }
Expand All @@ -35,7 +37,7 @@ tokio = { version = "1.19.0", features = ["macros", "rt", "rt-multi-thread", "sy
url = { version = "2.1.1", optional = true }
vm-memory = "0.10"
fuse-backend-rs = "^0.10.3"
gpt = { version = "3.0.0", optional = true }
gpt = { version = "3.1.0", optional = true }

nydus-api = { version = "0.3", path = "../api" }
nydus-utils = { version = "0.4", path = "../utils", features = ["encryption", "zran"] }
Expand All @@ -47,13 +49,15 @@ regex = "1.7.0"
toml = "0.5"

[features]
default = ["dedup"]
backend-localdisk = []
backend-localdisk-gpt = ["gpt", "backend-localdisk"]
backend-localfs = []
backend-oss = ["base64", "httpdate", "hmac", "sha1", "reqwest", "url"]
backend-registry = ["base64", "reqwest", "url"]
backend-s3 = ["base64", "hmac", "http", "reqwest", "sha2", "time", "url"]
backend-http-proxy = ["hyper", "hyperlocal", "http", "reqwest", "url"]
dedup = ["rusqlite", "r2d2", "r2d2_sqlite"]
prefetch-rate-limit = ["leaky-bucket"]

[package.metadata.docs.rs]
Expand Down
2 changes: 1 addition & 1 deletion storage/src/backend/localdisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl LocalDisk {
v.name.clone()
} else {
// The 64-byte blob_id is stored in two parts
v.name.clone() + guid.to_simple().to_string().as_str()
v.name.clone() + guid.simple().to_string().as_str()
};

if name.is_empty() {
Expand Down
56 changes: 37 additions & 19 deletions storage/src/cache/dedup/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,28 @@

use std::path::Path;

use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{Connection, DropBehavior, OptionalExtension, Transaction};

use super::Result;
use crate::cache::dedup::CasError;

pub struct CasDb {
conn: Connection,
pool: Pool<SqliteConnectionManager>,
}

impl CasDb {
pub fn new(path: impl AsRef<Path>) -> Result<CasDb> {
let mut db_path = path.as_ref().to_owned();
db_path.push("cas.db");
let conn = Connection::open(db_path)?;
Self::from_file(db_path)
}

// Always wait in case of busy.
conn.busy_handler(Some(|_v| true))?;
pub fn from_file(db_path: impl AsRef<Path>) -> Result<CasDb> {
let mgr = SqliteConnectionManager::file(db_path);
let pool = r2d2::Pool::new(mgr)?;
let conn = pool.get()?;

conn.execute(
"CREATE TABLE IF NOT EXISTS Blobs (
Expand Down Expand Up @@ -58,7 +64,7 @@ impl CasDb {
)?;
*/

Ok(CasDb { conn })
Ok(CasDb { pool })
}

pub fn get_blob_id_with_tx(tran: &Transaction, blob: &str) -> Result<Option<u64>> {
Expand All @@ -78,7 +84,7 @@ impl CasDb {
let sql = "SELECT BlobId FROM Blobs WHERE FilePath = ?";

if let Some(id) = self
.conn
.get_connection()?
.query_row(sql, [blob], |row| row.get::<usize, u64>(0))
.optional()?
{
Expand All @@ -92,7 +98,7 @@ impl CasDb {
let sql = "SELECT FilePath FROM Blobs WHERE BlobId = ?";

if let Some(path) = self
.conn
.get_connection()?
.query_row(sql, [id], |row| row.get::<usize, String>(0))
.optional()?
{
Expand All @@ -103,7 +109,8 @@ impl CasDb {
}

pub fn get_all_blobs(&self) -> Result<Vec<(u64, String)>> {
let mut stmt = self.conn.prepare("SELECT BlobId, FilePath FROM Blobs")?;
let conn = self.get_connection()?;
let mut stmt = conn.prepare_cached("SELECT BlobId, FilePath FROM Blobs")?;
let rows = stmt.query_map([], |row| Ok((row.get::<usize, u64>(0)?, row.get(1)?)))?;
let mut results: Vec<(u64, String)> = Vec::new();
for row in rows {
Expand All @@ -114,8 +121,9 @@ impl CasDb {

pub fn add_blobs(&mut self, blobs: &[String]) -> Result<()> {
let sql = "INSERT OR IGNORE INTO Blobs (FilePath) VALUES (?1)";
let mut conn = self.get_connection()?;
let tran = Self::begin_transaction(&mut conn)?;

let tran = self.begin_transaction()?;
for blob in blobs {
if let Err(e) = tran.execute(sql, [blob]) {
return Err(e.into());
Expand All @@ -128,15 +136,17 @@ impl CasDb {

pub fn add_blob(&mut self, blob: &str) -> Result<u64> {
let sql = "INSERT OR IGNORE INTO Blobs (FilePath) VALUES (?1)";
self.conn.execute(sql, [blob])?;
Ok(self.conn.last_insert_rowid() as u64)
let conn = self.get_connection()?;
conn.execute(sql, [blob])?;
Ok(conn.last_insert_rowid() as u64)
}

pub fn delete_blobs(&mut self, blobs: &[String]) -> Result<()> {
let delete_blobs_sql = "DELETE FROM Blobs WHERE FilePath = (?1)";
let delete_chunks_sql = "DELETE FROM Chunks WHERE BlobId = (?1)";
let mut conn = self.get_connection()?;
let tran = Self::begin_transaction(&mut conn)?;

let tran = self.begin_transaction()?;
for blob in blobs {
if let Some(id) = Self::get_blob_id_with_tx(&tran, blob)? {
if let Err(e) = tran.execute(delete_chunks_sql, [id]) {
Expand All @@ -160,7 +170,7 @@ impl CasDb {
ORDER BY Blobs.BlobId LIMIT 1 OFFSET 0";

if let Some((new_blob_id, chunk_info)) = self
.conn
.get_connection()?
.query_row(sql, [chunk_id], |row| {
Ok((row.get(0)?, row.get::<usize, u64>(1)?))
})
Expand All @@ -174,8 +184,9 @@ impl CasDb {

pub fn add_chunks(&mut self, chunks: &[(String, u64, String)]) -> Result<()> {
let sql = "INSERT OR IGNORE INTO Chunks (ChunkId, ChunkOffset, BlobId) VALUES (?1, ?2, ?3)";
let mut conn = self.get_connection()?;
let tran = Self::begin_transaction(&mut conn)?;

let tran = self.begin_transaction()?;
for chunk in chunks {
match Self::get_blob_id_with_tx(&tran, &chunk.2) {
Err(e) => return Err(e),
Expand All @@ -193,8 +204,9 @@ impl CasDb {

pub fn add_chunk(&mut self, chunk_id: &str, chunk_offset: u64, blob_id: &str) -> Result<()> {
let sql = "INSERT OR IGNORE INTO Chunks (ChunkId, ChunkOffset, BlobId) VALUES (?1, ?2, ?3)";
let mut conn = self.get_connection()?;
let tran = Self::begin_transaction(&mut conn)?;

let tran = self.begin_transaction()?;
match Self::get_blob_id_with_tx(&tran, blob_id) {
Err(e) => return Err(e),
Ok(id) => {
Expand All @@ -208,13 +220,19 @@ impl CasDb {
Ok(())
}

fn begin_transaction(&mut self) -> Result<Transaction> {
let mut tx = self
.conn
.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
fn begin_transaction(
conn: &mut PooledConnection<SqliteConnectionManager>,
) -> Result<Transaction> {
let mut tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
tx.set_drop_behavior(DropBehavior::Rollback);
Ok(tx)
}

/// Check out a connection from the pool and configure it to keep waiting
/// whenever the database reports that it is busy.
///
/// Returns an error if the pool cannot hand out a connection or if the
/// busy handler cannot be installed.
fn get_connection(&self) -> Result<PooledConnection<SqliteConnectionManager>> {
    let pooled = self.pool.get()?;
    // Always wait in case of busy: the handler unconditionally asks for a retry.
    pooled.busy_handler(Some(|_attempts| true))?;
    Ok(pooled)
}
}

#[cfg(test)]
Expand Down
13 changes: 12 additions & 1 deletion storage/src/cache/dedup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ mod db;
/// Error type used by the dedup cache code; each variant wraps the error of
/// one underlying layer.
pub enum CasError {
/// Wraps the `Error` type imported by this module
/// (presumably `std::io::Error` — confirm against the imports).
Io(Error),
/// Wraps an error reported by `rusqlite`.
Db(rusqlite::Error),
/// Wraps an error reported by the `r2d2` connection pool.
R2D2(r2d2::Error),
}

impl Display for CasError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
match self {
CasError::Io(e) => write!(f, "{}", e),
CasError::Db(e) => write!(f, "{}", e),
CasError::R2D2(e) => write!(f, "{}", e),
}
}
}

Expand All @@ -28,6 +33,12 @@ impl From<rusqlite::Error> for CasError {
}
}

impl From<r2d2::Error> for CasError {
fn from(e: r2d2::Error) -> Self {
CasError::R2D2(e)
}
}

impl From<Error> for CasError {
fn from(e: Error) -> Self {
CasError::Io(e)
Expand Down

0 comments on commit 36f7153

Please sign in to comment.