From 077bf091ecaededfa3c937ce5ac5a5f6f95015f3 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Wed, 19 Aug 2020 16:20:59 -0700 Subject: [PATCH] feat: switch spanner's db pool to deadpool this is a quick integration of deadpool to be revisited with some more cleanup Issue #794 --- Cargo.lock | 15 +++ Cargo.toml | 1 + src/db/results.rs | 9 ++ src/db/spanner/batch.rs | 16 +-- src/db/spanner/{manager.rs => manager/bb8.rs} | 9 +- src/db/spanner/manager/deadpool.rs | 119 ++++++++++++++++++ src/db/spanner/manager/mod.rs | 2 + src/db/spanner/models.rs | 20 +-- src/db/spanner/pool.rs | 31 +++-- src/db/spanner/support.rs | 6 +- 10 files changed, 194 insertions(+), 34 deletions(-) rename src/db/spanner/{manager.rs => manager/bb8.rs} (95%) create mode 100644 src/db/spanner/manager/deadpool.rs create mode 100644 src/db/spanner/manager/mod.rs diff --git a/Cargo.lock b/Cargo.lock index add81b9a3c..7549eeea6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -777,6 +777,20 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "deadpool" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aaff9a7a1de9893f4004fa08527b31cb2ae4121c44e053cf53f29203c73bd23" +dependencies = [ + "async-trait", + "config", + "crossbeam-queue", + "num_cpus", + "serde 1.0.114", + "tokio", +] + [[package]] name = "debugid" version = "0.7.2" @@ -2719,6 +2733,7 @@ dependencies = [ "cadence", "chrono", "config", + "deadpool", "diesel", "diesel_logger", "diesel_migrations", diff --git a/Cargo.toml b/Cargo.toml index ea17f1a7da..102cb80688 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ bytes = "0.5" cadence = "0.20.0" chrono = "0.4" config = "0.10" +deadpool = "0.5.2" diesel = { version = "1.4.4", features = ["mysql", "r2d2"] } diesel_logger = "0.1.1" diesel_migrations = { version = "1.4.0", features = ["mysql"] } diff --git a/src/db/results.rs b/src/db/results.rs index 3b063f44cf..93f49f706c 100644 --- a/src/db/results.rs +++ b/src/db/results.rs @@ -94,6 +94,15 @@ impl From for PoolState { } } +impl From for PoolState { + fn from(status: deadpool::Status) -> PoolState { + PoolState { + connections: status.size as u32, + idle_connections: status.available as u32, + } + } +} + #[cfg(test)] pub type GetCollectionId = i32; diff --git a/src/db/spanner/batch.rs b/src/db/spanner/batch.rs index 6869e4805d..66ba26b41f 100644 --- a/src/db/spanner/batch.rs +++ b/src/db/spanner/batch.rs @@ -18,7 +18,7 @@ use crate::{ }; pub async fn create_async( - db: &SpannerDb<'_>, + db: &SpannerDb, params: params::CreateBatch, ) -> Result { let batch_id = Uuid::new_v4().to_simple().to_string(); @@ -57,7 +57,7 @@ pub async fn create_async( Ok(batch_id) } -pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) -> Result { +pub async fn validate_async(db: &SpannerDb, params: params::ValidateBatch) -> Result { let collection_id = db.get_collection_id_async(¶ms.collection).await?; let exists = db .sql( @@ -81,7 +81,7 @@ pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) - Ok(exists.is_some()) } -pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) -> Result<()> { +pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> { let mut metrics = db.metrics.clone(); metrics.start_timer("storage.spanner.append_items_to_batch", None); @@ -106,7 +106,7 @@ pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) -> } pub async fn get_async( - db: &SpannerDb<'_>, + db: &SpannerDb, params: params::GetBatch, ) -> Result> { let collection_id = db.get_collection_id_async(¶ms.collection).await?; @@ -142,7 +142,7 @@ pub async fn get_async( Ok(batch) } -pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Result<()> { +pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> { let collection_id = db.get_collection_id_async(¶ms.collection).await?; // Also deletes child batch_bsos rows (INTERLEAVE IN PARENT batches ON // DELETE CASCADE) @@ -165,7 +165,7 @@ pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Re } pub async fn commit_async( - db: &SpannerDb<'_>, + db: &SpannerDb, params: params::CommitBatch, ) -> Result { let mut metrics = db.metrics.clone(); @@ -239,7 +239,7 @@ pub async fn commit_async( } pub async fn do_append_async( - db: &SpannerDb<'_>, + db: &SpannerDb, user_id: HawkIdentifier, collection_id: i32, batch_id: String, @@ -335,7 +335,7 @@ pub async fn do_append_async( /// For the special case of a user creating a batch for a collection with no /// prior data. async fn pretouch_collection_async( - db: &SpannerDb<'_>, + db: &SpannerDb, user_id: &HawkIdentifier, collection_id: i32, ) -> Result<()> { diff --git a/src/db/spanner/manager.rs b/src/db/spanner/manager/bb8.rs similarity index 95% rename from src/db/spanner/manager.rs rename to src/db/spanner/manager/bb8.rs index 344625a968..181e01548a 100644 --- a/src/db/spanner/manager.rs +++ b/src/db/spanner/manager/bb8.rs @@ -18,7 +18,8 @@ use crate::{ settings::Settings, }; -const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; +// XXX: +pub const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; pub struct SpannerConnectionManager { database_name: String, @@ -33,6 +34,7 @@ impl<_T> fmt::Debug for SpannerConnectionManager<_T> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("SpannerConnectionManager") .field("database_name", &self.database_name) + .field("test_transactions", &self.test_transactions) .finish() } } @@ -65,7 +67,7 @@ pub struct SpannerSession { pub client: SpannerClient, pub session: Session, - pub(super) use_test_transactions: bool, + pub(in crate::db::spanner) use_test_transactions: bool, } #[async_trait] @@ -130,7 +132,8 @@ impl ManageConnection } } -async fn create_session( +// XXX: +pub async fn create_session( client: &SpannerClient, database_name: &str, ) -> Result { diff --git a/src/db/spanner/manager/deadpool.rs b/src/db/spanner/manager/deadpool.rs new file mode 100644 index 0000000000..0077b6705c --- /dev/null +++ b/src/db/spanner/manager/deadpool.rs @@ -0,0 +1,119 @@ +use std::{fmt, sync::Arc}; + +use actix_web::web::block; +use async_trait::async_trait; +use deadpool::managed::{RecycleError, RecycleResult}; +use googleapis_raw::spanner::v1::{spanner::GetSessionRequest, spanner_grpc::SpannerClient}; +use grpcio::{ChannelBuilder, ChannelCredentials, EnvBuilder, Environment}; + +use crate::{ + db::error::{DbError, DbErrorKind}, + server::metrics::Metrics, + settings::Settings, +}; + +// XXX: +use super::bb8::{create_session, SpannerSession, SPANNER_ADDRESS}; + +// - -> SpannerSessionManager (and bb8 too) +// - bb8s doesn't need the PhantomData +// - kill the lifetimes for now or PhantomData one +pub struct Manager { + database_name: String, + /// The gRPC environment + env: Arc, + metrics: Metrics, + test_transactions: bool, +} + +impl fmt::Debug for Manager { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Manager") + .field("database_name", &self.database_name) + .field("test_transactions", &self.test_transactions) + .finish() + } +} + +impl Manager { + pub fn new(settings: &Settings, metrics: &Metrics) -> Result { + let url = &settings.database_url; + if !url.starts_with("spanner://") { + Err(DbErrorKind::InvalidUrl(url.to_owned()))?; + } + let database_name = url["spanner://".len()..].to_owned(); + let env = Arc::new(EnvBuilder::new().build()); + + #[cfg(not(test))] + let test_transactions = false; + #[cfg(test)] + let test_transactions = settings.database_use_test_transactions; + + Ok(Manager { + database_name, + env, + metrics: metrics.clone(), + test_transactions, + }) + } +} + +#[async_trait] +impl deadpool::managed::Manager for Manager { + async fn create(&self) -> Result { + let env = self.env.clone(); + let mut metrics = self.metrics.clone(); + // XXX: issue732: Could google_default_credentials (or + // ChannelBuilder::secure_connect) block?! + let chan = block(move || -> Result { + metrics.start_timer("storage.pool.grpc_auth", None); + // Requires + // GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json + let creds = ChannelCredentials::google_default_credentials()?; + Ok(ChannelBuilder::new(env) + .max_send_message_len(100 << 20) + .max_receive_message_len(100 << 20) + .secure_connect(SPANNER_ADDRESS, creds)) + }) + .await + .map_err(|e| match e { + actix_web::error::BlockingError::Error(e) => e.into(), + actix_web::error::BlockingError::Canceled => { + DbError::internal("web::block Manager operation canceled") + } + })?; + let client = SpannerClient::new(chan); + + // Connect to the instance and create a Spanner session. + let session = create_session(&client, &self.database_name).await?; + + Ok(SpannerSession { + client, + session, + use_test_transactions: self.test_transactions, + }) + } + + async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult { + let mut req = GetSessionRequest::new(); + req.set_name(conn.session.get_name().to_owned()); + if let Err(e) = conn + .client + .get_session_async(&req) + .map_err(|e| RecycleError::Backend(e.into()))? + .await + { + match e { + grpcio::Error::RpcFailure(ref status) + if status.status == grpcio::RpcStatusCode::NOT_FOUND => + { + conn.session = create_session(&conn.client, &self.database_name) + .await + .map_err(|e| RecycleError::Backend(e.into()))?; + } + _ => return Err(RecycleError::Backend(e.into())), + } + } + Ok(()) + } +} diff --git a/src/db/spanner/manager/mod.rs b/src/db/spanner/manager/mod.rs new file mode 100644 index 0000000000..d29dca6f4b --- /dev/null +++ b/src/db/spanner/manager/mod.rs @@ -0,0 +1,2 @@ +pub mod bb8; +pub mod deadpool; diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index 17fd169c6c..cba4bbb8ee 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -80,8 +80,8 @@ struct SpannerDbSession { } #[derive(Clone, Debug)] -pub struct SpannerDb<'a> { - pub(super) inner: Arc>, +pub struct SpannerDb { + pub(super) inner: Arc, /// Pool level cache of collection_ids and their names coll_cache: Arc, @@ -89,28 +89,28 @@ pub struct SpannerDb<'a> { pub metrics: Metrics, } -pub struct SpannerDbInner<'a> { - pub(super) conn: Conn<'a>, +pub struct SpannerDbInner { + pub(super) conn: Conn, session: RefCell, } -impl fmt::Debug for SpannerDbInner<'_> { +impl fmt::Debug for SpannerDbInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "SpannerDbInner") } } -impl<'a> Deref for SpannerDb<'a> { - type Target = SpannerDbInner<'a>; +impl Deref for SpannerDb { + type Target = SpannerDbInner; fn deref(&self) -> &Self::Target { &self.inner } } -impl<'a> SpannerDb<'a> { - pub fn new(conn: Conn<'a>, coll_cache: Arc, metrics: &Metrics) -> Self { +impl SpannerDb { + pub fn new(conn: Conn, coll_cache: Arc, metrics: &Metrics) -> Self { let inner = SpannerDbInner { conn, session: RefCell::new(Default::default()), @@ -1604,7 +1604,7 @@ impl<'a> SpannerDb<'a> { } } -impl<'a> Db<'a> for SpannerDb<'a> { +impl<'a> Db<'a> for SpannerDb { fn commit(&self) -> DbFuture<'_, ()> { let db = self.clone(); Box::pin(async move { db.commit_async().map_err(Into::into).await }) diff --git a/src/db/spanner/pool.rs b/src/db/spanner/pool.rs index 43304456bd..94bcd5009f 100644 --- a/src/db/spanner/pool.rs +++ b/src/db/spanner/pool.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use bb8::{ErrorSink, Pool, PooledConnection}; +use bb8::ErrorSink; use std::{ collections::HashMap, @@ -12,11 +12,12 @@ use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS}; use crate::server::metrics::Metrics; use crate::settings::Settings; -use super::manager::{SpannerConnectionManager, SpannerSession}; +use super::manager::bb8::SpannerSession; use super::models::SpannerDb; use crate::error::ApiResult; -pub(super) type Conn<'a> = PooledConnection<'a, SpannerConnectionManager>; +//pub(super) type Conn<'a> = PooledConnection<'a, SpannerConnectionManager>; +pub(super) type Conn = deadpool::managed::Object; embed_migrations!(); @@ -32,7 +33,8 @@ embed_migrations!(); #[derive(Clone)] pub struct SpannerDbPool { /// Pool of db connections - pool: Pool>, + //pool: Pool>, + pool: deadpool::managed::Pool, /// In-memory cache of collection_ids and their names coll_cache: Arc, @@ -47,24 +49,33 @@ impl SpannerDbPool { } pub async fn new_without_migrations(settings: &Settings, metrics: &Metrics) -> Result { - let manager = SpannerConnectionManager::::new(settings, metrics)?; + //let manager = SpannerConnectionManager::::new(settings, metrics)?; let max_size = settings.database_pool_max_size.unwrap_or(10); + /* let builder = bb8::Pool::builder() .max_size(max_size) .min_idle(settings.database_pool_min_idle) .error_sink(Box::new(LoggingErrorSink)); + */ + let manager = super::manager::deadpool::Manager::new(settings, metrics)?; + let config = deadpool::managed::PoolConfig::new(max_size as usize); + let pool = deadpool::managed::Pool::from_config(manager, config); Ok(Self { - pool: builder.build(manager).await?, + pool, coll_cache: Default::default(), metrics: metrics.clone(), }) } - pub async fn get_async(&self) -> Result> { + pub async fn get_async(&self) -> Result { let conn = self.pool.get().await.map_err(|e| match e { - bb8::RunError::User(dbe) => dbe, - bb8::RunError::TimedOut => DbError::internal("bb8:TimedOut"), + //bb8::RunError::User(dbe) => dbe, + //bb8::RunError::TimedOut => DbError::internal("bb8:TimedOut"), + deadpool::managed::PoolError::Backend(dbe) => dbe, + deadpool::managed::PoolError::Timeout(timeout_type) => { + DbError::internal(&format!("deadpool Timeout: {:?}", timeout_type)) + } })?; Ok(SpannerDb::new( conn, @@ -84,7 +95,7 @@ impl DbPool for SpannerDbPool { } fn state(&self) -> results::PoolState { - self.pool.state().into() + self.pool.status().into() } fn validate_batch_id(&self, id: String) -> Result<()> { diff --git a/src/db/spanner/support.rs b/src/db/spanner/support.rs index 5766cb4748..6c6ca0ccfd 100644 --- a/src/db/spanner/support.rs +++ b/src/db/spanner/support.rs @@ -88,7 +88,7 @@ impl ExecuteSqlRequestBuilder { self } - fn prepare_request(self, conn: &Conn<'_>) -> ExecuteSqlRequest { + fn prepare_request(self, conn: &Conn) -> ExecuteSqlRequest { let mut request = self.execute_sql; request.set_session(conn.session.get_name().to_owned()); if let Some(params) = self.params { @@ -103,7 +103,7 @@ impl ExecuteSqlRequestBuilder { } /// Execute a SQL read statement but return a non-blocking streaming result - pub fn execute_async(self, conn: &Conn<'_>) -> Result { + pub fn execute_async(self, conn: &Conn) -> Result { let stream = conn .client .execute_streaming_sql(&self.prepare_request(conn))?; @@ -111,7 +111,7 @@ impl ExecuteSqlRequestBuilder { } /// Execute a DML statement, returning the exact count of modified rows - pub async fn execute_dml_async(self, conn: &Conn<'_>) -> Result { + pub async fn execute_dml_async(self, conn: &Conn) -> Result { let rs = conn .client .execute_sql_async(&self.prepare_request(conn))?