diff --git a/src/db/mod.rs b/src/db/mod.rs index 9d3a221787..89b2773a5a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -57,9 +57,6 @@ pub const FIRST_CUSTOM_COLLECTION_ID: i32 = 101; /// Rough guesstimate of the maximum reasonable life span of a batch pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds -/// DbPools' worker ThreadPool size -pub const DB_THREAD_POOL_SIZE: usize = 50; - type DbFuture<'a, T> = LocalBoxFuture<'a, Result>; #[async_trait(?Send)] diff --git a/src/db/mysql/pool.rs b/src/db/mysql/pool.rs index 322366d793..f8bdf9c43e 100644 --- a/src/db/mysql/pool.rs +++ b/src/db/mysql/pool.rs @@ -17,7 +17,11 @@ use diesel::{ use super::models::{MysqlDb, Result}; #[cfg(test)] use super::test::TestTransactionCustomizer; -use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS}; +use crate::db::{ + error::DbError, + results::{self, PoolState}, + Db, DbPool, STD_COLLS, +}; use crate::error::{ApiError, ApiResult}; use crate::server::metrics::Metrics; use crate::settings::Settings; @@ -105,8 +109,10 @@ impl DbPool for MysqlDbPool { } impl fmt::Debug for MysqlDbPool { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "MysqlDbPool {{ coll_cache: {:?} }}", self.coll_cache) + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("MysqlDbPool") + .field("coll_cache", &self.coll_cache) + .finish() } } @@ -175,3 +181,12 @@ impl Default for CollectionCache { } } } + +impl From for PoolState { + fn from(state: diesel::r2d2::State) -> PoolState { + PoolState { + connections: state.connections, + idle_connections: state.idle_connections, + } + } +} diff --git a/src/db/results.rs b/src/db/results.rs index 93f49f706c..6f24dc82d2 100644 --- a/src/db/results.rs +++ b/src/db/results.rs @@ -76,33 +76,6 @@ pub struct PoolState { pub idle_connections: u32, } -impl From for PoolState { - fn from(state: diesel::r2d2::State) -> PoolState { - PoolState { - connections: state.connections, - idle_connections: state.idle_connections, - } - } -} - -impl From for PoolState { - fn from(state: bb8::State) -> PoolState { - PoolState { - connections: state.connections, - idle_connections: state.idle_connections, - } - } -} - -impl From for PoolState { - fn from(status: deadpool::Status) -> PoolState { - PoolState { - connections: status.size as u32, - idle_connections: status.available as u32, - } - } -} - #[cfg(test)] pub type GetCollectionId = i32; diff --git a/src/db/spanner/manager/bb8.rs b/src/db/spanner/manager/bb8.rs index b154889059..ee18a5e282 100644 --- a/src/db/spanner/manager/bb8.rs +++ b/src/db/spanner/manager/bb8.rs @@ -1,26 +1,25 @@ use std::marker::PhantomData; use std::{fmt, sync::Arc}; -use actix_web::web::block; use async_trait::async_trait; -use bb8::ManageConnection; -use googleapis_raw::spanner::v1::{ - spanner::{CreateSessionRequest, GetSessionRequest, Session}, - spanner_grpc::SpannerClient, -}; -use grpcio::{ - CallOption, ChannelBuilder, ChannelCredentials, EnvBuilder, Environment, MetadataBuilder, -}; +use bb8::{ManageConnection, PooledConnection}; +use grpcio::{EnvBuilder, Environment}; use crate::{ - db::error::{DbError, DbErrorKind}, + db::{ + error::{DbError, DbErrorKind}, + results::PoolState, + }, server::metrics::Metrics, settings::Settings, }; -pub const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; +use super::session::{create_spanner_session, recycle_spanner_session, SpannerSession}; -pub struct SpannerConnectionManager { +#[allow(dead_code)] +pub type Conn<'a> = PooledConnection<'a, SpannerSessionManager>; + +pub struct SpannerSessionManager { database_name: String, /// The gRPC environment env: Arc, @@ -29,22 +28,22 @@ pub struct SpannerConnectionManager { phantom: PhantomData, } -impl<_T> fmt::Debug for SpannerConnectionManager<_T> { +impl<_T> fmt::Debug for SpannerSessionManager<_T> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("SpannerConnectionManager") + fmt.debug_struct("bb8::SpannerSessionManager") .field("database_name", &self.database_name) .field("test_transactions", &self.test_transactions) .finish() } } -impl SpannerConnectionManager { +impl SpannerSessionManager { + #[allow(dead_code)] pub fn new(settings: &Settings, metrics: &Metrics) -> Result { - let url = &settings.database_url; - if !url.starts_with("spanner://") { - Err(DbErrorKind::InvalidUrl(url.to_owned()))?; - } - let database_name = url["spanner://".len()..].to_owned(); + let database_name = settings + .spanner_database_name() + .ok_or_else(|| DbErrorKind::InvalidUrl(settings.database_url.to_owned()))? + .to_owned(); let env = Arc::new(EnvBuilder::new().build()); #[cfg(not(test))] @@ -52,7 +51,7 @@ impl SpannerConnectionManager { #[cfg(test)] let test_transactions = settings.database_use_test_transactions; - Ok(SpannerConnectionManager:: { + Ok(SpannerSessionManager:: { database_name, env, metrics: metrics.clone(), @@ -62,67 +61,23 @@ impl SpannerConnectionManager { } } -pub struct SpannerSession { - pub client: SpannerClient, - pub session: Session, - - pub(in crate::db::spanner) use_test_transactions: bool, -} - #[async_trait] -impl ManageConnection - for SpannerConnectionManager -{ +impl ManageConnection for SpannerSessionManager { type Connection = SpannerSession; type Error = DbError; async fn connect(&self) -> Result { - let env = self.env.clone(); - let mut metrics = self.metrics.clone(); - // XXX: issue732: Could google_default_credentials (or - // ChannelBuilder::secure_connect) block?! - let chan = block(move || -> Result { - metrics.start_timer("storage.pool.grpc_auth", None); - // Requires - // GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json - let creds = ChannelCredentials::google_default_credentials()?; - Ok(ChannelBuilder::new(env) - .max_send_message_len(100 << 20) - .max_receive_message_len(100 << 20) - .secure_connect(SPANNER_ADDRESS, creds)) - }) + create_spanner_session( + Arc::clone(&self.env), + self.metrics.clone(), + &self.database_name, + self.test_transactions, + ) .await - .map_err(|e| match e { - actix_web::error::BlockingError::Error(e) => e.into(), - actix_web::error::BlockingError::Canceled => { - DbError::internal("web::block Manager operation canceled") - } - })?; - let client = SpannerClient::new(chan); - - // Connect to the instance and create a Spanner session. - let session = create_session(&client, &self.database_name).await?; - - Ok(SpannerSession { - client, - session, - use_test_transactions: self.test_transactions, - }) } async fn is_valid(&self, mut conn: Self::Connection) -> Result { - let mut req = GetSessionRequest::new(); - req.set_name(conn.session.get_name().to_owned()); - if let Err(e) = conn.client.get_session_async(&req)?.await { - match e { - grpcio::Error::RpcFailure(ref status) - if status.status == grpcio::RpcStatusCode::NOT_FOUND => - { - conn.session = create_session(&conn.client, &self.database_name).await?; - } - _ => return Err(e.into()), - } - } + recycle_spanner_session(&mut conn, &self.database_name).await?; Ok(conn) } @@ -131,15 +86,11 @@ impl ManageConnection } } -pub async fn create_session( - client: &SpannerClient, - database_name: &str, -) -> Result { - let mut req = CreateSessionRequest::new(); - req.database = database_name.to_owned(); - let mut meta = MetadataBuilder::new(); - meta.add_str("google-cloud-resource-prefix", database_name)?; - meta.add_str("x-goog-api-client", "gcp-grpc-rs")?; - let opt = CallOption::default().headers(meta.build()); - client.create_session_async_opt(&req, opt)?.await +impl From for PoolState { + fn from(state: bb8::State) -> PoolState { + PoolState { + connections: state.connections, + idle_connections: state.idle_connections, + } + } } diff --git a/src/db/spanner/manager/deadpool.rs b/src/db/spanner/manager/deadpool.rs index e75e7dd3ad..baaaf4249d 100644 --- a/src/db/spanner/manager/deadpool.rs +++ b/src/db/spanner/manager/deadpool.rs @@ -1,20 +1,23 @@ use std::{fmt, sync::Arc}; -use actix_web::web::block; use async_trait::async_trait; -use deadpool::managed::{RecycleError, RecycleResult}; -use googleapis_raw::spanner::v1::{spanner::GetSessionRequest, spanner_grpc::SpannerClient}; -use grpcio::{ChannelBuilder, ChannelCredentials, EnvBuilder, Environment}; +use deadpool::managed::{Manager, RecycleError, RecycleResult}; +use grpcio::{EnvBuilder, Environment}; use crate::{ - db::error::{DbError, DbErrorKind}, + db::{ + error::{DbError, DbErrorKind}, + results::PoolState, + }, server::metrics::Metrics, settings::Settings, }; -use super::bb8::{create_session, SpannerSession, SPANNER_ADDRESS}; +use super::session::{create_spanner_session, recycle_spanner_session, SpannerSession}; -pub struct Manager { +pub type Conn = deadpool::managed::Object; + +pub struct SpannerSessionManager { database_name: String, /// The gRPC environment env: Arc, @@ -22,22 +25,21 @@ pub struct Manager { test_transactions: bool, } -impl fmt::Debug for Manager { +impl fmt::Debug for SpannerSessionManager { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Manager") + fmt.debug_struct("deadpool::SpannerSessionManager") .field("database_name", &self.database_name) .field("test_transactions", &self.test_transactions) .finish() } } -impl Manager { +impl SpannerSessionManager { pub fn new(settings: &Settings, metrics: &Metrics) -> Result { - let url = &settings.database_url; - if !url.starts_with("spanner://") { - Err(DbErrorKind::InvalidUrl(url.to_owned()))?; - } - let database_name = url["spanner://".len()..].to_owned(); + let database_name = settings + .spanner_database_name() + .ok_or_else(|| DbErrorKind::InvalidUrl(settings.database_url.to_owned()))? + .to_owned(); let env = Arc::new(EnvBuilder::new().build()); #[cfg(not(test))] @@ -45,7 +47,7 @@ impl Manager { #[cfg(test)] let test_transactions = settings.database_use_test_transactions; - Ok(Manager { + Ok(Self { database_name, env, metrics: metrics.clone(), @@ -55,61 +57,29 @@ impl Manager { } #[async_trait] -impl deadpool::managed::Manager for Manager { +impl Manager for SpannerSessionManager { async fn create(&self) -> Result { - let env = self.env.clone(); - let mut metrics = self.metrics.clone(); - // XXX: issue732: Could google_default_credentials (or - // ChannelBuilder::secure_connect) block?! - let chan = block(move || -> Result { - metrics.start_timer("storage.pool.grpc_auth", None); - // Requires - // GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json - let creds = ChannelCredentials::google_default_credentials()?; - Ok(ChannelBuilder::new(env) - .max_send_message_len(100 << 20) - .max_receive_message_len(100 << 20) - .secure_connect(SPANNER_ADDRESS, creds)) - }) + create_spanner_session( + Arc::clone(&self.env), + self.metrics.clone(), + &self.database_name, + self.test_transactions, + ) .await - .map_err(|e| match e { - actix_web::error::BlockingError::Error(e) => e.into(), - actix_web::error::BlockingError::Canceled => { - DbError::internal("web::block Manager operation canceled") - } - })?; - let client = SpannerClient::new(chan); - - // Connect to the instance and create a Spanner session. - let session = create_session(&client, &self.database_name).await?; - - Ok(SpannerSession { - client, - session, - use_test_transactions: self.test_transactions, - }) } async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult { - let mut req = GetSessionRequest::new(); - req.set_name(conn.session.get_name().to_owned()); - if let Err(e) = conn - .client - .get_session_async(&req) - .map_err(|e| RecycleError::Backend(e.into()))? + recycle_spanner_session(conn, &self.database_name) .await - { - match e { - grpcio::Error::RpcFailure(ref status) - if status.status == grpcio::RpcStatusCode::NOT_FOUND => - { - conn.session = create_session(&conn.client, &self.database_name) - .await - .map_err(|e| RecycleError::Backend(e.into()))?; - } - _ => return Err(RecycleError::Backend(e.into())), - } + .map_err(RecycleError::Backend) + } +} + +impl From for PoolState { + fn from(status: deadpool::Status) -> PoolState { + PoolState { + connections: status.size as u32, + idle_connections: status.available.max(0) as u32, } - Ok(()) } } diff --git a/src/db/spanner/manager/mod.rs b/src/db/spanner/manager/mod.rs index d29dca6f4b..02d34ecfc7 100644 --- a/src/db/spanner/manager/mod.rs +++ b/src/db/spanner/manager/mod.rs @@ -1,2 +1,6 @@ -pub mod bb8; -pub mod deadpool; +mod bb8; +mod deadpool; +mod session; + +pub use self::deadpool::{Conn, SpannerSessionManager}; +pub use self::session::SpannerSession; diff --git a/src/db/spanner/manager/session.rs b/src/db/spanner/manager/session.rs new file mode 100644 index 0000000000..4efb637146 --- /dev/null +++ b/src/db/spanner/manager/session.rs @@ -0,0 +1,93 @@ +use actix_web::web::block; +use googleapis_raw::spanner::v1::{ + spanner::{CreateSessionRequest, GetSessionRequest, Session}, + spanner_grpc::SpannerClient, +}; +use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment, MetadataBuilder}; +use std::sync::Arc; + +use crate::{db::error::DbError, server::metrics::Metrics}; + +const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; + +/// Represents a communication channel w/ Spanner +/// +/// Session creation is expensive in Spanner so sessions should be long-lived +/// and cached for reuse. +pub struct SpannerSession { + pub session: Session, + /// The underlying client (Connection/Channel) for interacting with spanner + pub client: SpannerClient, + pub(in crate::db::spanner) use_test_transactions: bool, +} + +/// Create a Session (and the underlying gRPC Channel) +pub async fn create_spanner_session( + env: Arc, + mut metrics: Metrics, + database_name: &str, + use_test_transactions: bool, +) -> Result { + // XXX: issue732: Could google_default_credentials (or + // ChannelBuilder::secure_connect) block?! + let chan = block(move || -> Result { + metrics.start_timer("storage.pool.grpc_auth", None); + // Requires + // GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json + let creds = ChannelCredentials::google_default_credentials()?; + Ok(ChannelBuilder::new(env) + .max_send_message_len(100 << 20) + .max_receive_message_len(100 << 20) + .secure_connect(SPANNER_ADDRESS, creds)) + }) + .await + .map_err(|e| match e { + actix_web::error::BlockingError::Error(e) => e.into(), + actix_web::error::BlockingError::Canceled => { + DbError::internal("web::block Manager operation canceled") + } + })?; + let client = SpannerClient::new(chan); + + // Connect to the instance and create a Spanner session. + let session = create_session(&client, database_name).await?; + + Ok(SpannerSession { + session, + client, + use_test_transactions, + }) +} + +/// Recycle a cached Session for reuse +pub async fn recycle_spanner_session( + conn: &mut SpannerSession, + database_name: &str, +) -> Result<(), DbError> { + let mut req = GetSessionRequest::new(); + req.set_name(conn.session.get_name().to_owned()); + if let Err(e) = conn.client.get_session_async(&req)?.await { + match e { + grpcio::Error::RpcFailure(ref status) + if status.status == grpcio::RpcStatusCode::NOT_FOUND => + { + conn.session = create_session(&conn.client, database_name).await?; + } + _ => return Err(e.into()), + } + } + Ok(()) +} + +async fn create_session( + client: &SpannerClient, + database_name: &str, +) -> Result { + let mut req = CreateSessionRequest::new(); + req.database = database_name.to_owned(); + let mut meta = MetadataBuilder::new(); + meta.add_str("google-cloud-resource-prefix", database_name)?; + meta.add_str("x-goog-api-client", "gcp-grpc-rs")?; + let opt = CallOption::default().headers(meta.build()); + client.create_session_async_opt(&req, opt)?.await +} diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index cba4bbb8ee..023db0ca94 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -35,10 +35,9 @@ use super::{ batch, pool::{CollectionCache, Conn}, support::{ - as_list_value, as_type, as_value, bso_from_row, ExecuteSqlRequestBuilder, - StreamedResultSetAsync, + as_list_value, as_type, as_value, bso_from_row, bso_to_insert_row, bso_to_update_row, + ExecuteSqlRequestBuilder, StreamedResultSetAsync, }, - support::{bso_to_insert_row, bso_to_update_row}, }; #[derive(Debug, Eq, PartialEq)] diff --git a/src/db/spanner/pool.rs b/src/db/spanner/pool.rs index 74150e09dd..689fbbf9d6 100644 --- a/src/db/spanner/pool.rs +++ b/src/db/spanner/pool.rs @@ -12,11 +12,11 @@ use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS}; use crate::server::metrics::Metrics; use crate::settings::Settings; -use super::manager::bb8::SpannerSession; +use super::manager::{SpannerSession, SpannerSessionManager}; use super::models::SpannerDb; use crate::error::ApiResult; -pub(super) type Conn = deadpool::managed::Object; +pub use super::manager::Conn; embed_migrations!(); @@ -32,7 +32,6 @@ embed_migrations!(); #[derive(Clone)] pub struct SpannerDbPool { /// Pool of db connections - //pool: Pool>, pool: deadpool::managed::Pool, /// In-memory cache of collection_ids and their names coll_cache: Arc, @@ -49,7 +48,7 @@ impl SpannerDbPool { pub async fn new_without_migrations(settings: &Settings, metrics: &Metrics) -> Result { let max_size = settings.database_pool_max_size.unwrap_or(10); - let manager = super::manager::deadpool::Manager::new(settings, metrics)?; + let manager = SpannerSessionManager::new(settings, metrics)?; let config = deadpool::managed::PoolConfig::new(max_size as usize); let pool = deadpool::managed::Pool::from_config(manager, config); @@ -98,8 +97,10 @@ impl DbPool for SpannerDbPool { } impl fmt::Debug for SpannerDbPool { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SpannerDbPool {{ coll_cache: {:?} }}", self.coll_cache) + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("SpannerDbPool") + .field("coll_cache", &self.coll_cache) + .finish() } } diff --git a/src/settings.rs b/src/settings.rs index 17b22d35b3..683fa793d0 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -28,6 +28,7 @@ pub struct Settings { pub host: String, pub database_url: String, pub database_pool_max_size: Option, + // NOTE: Not supported by deadpool! pub database_pool_min_idle: Option, #[cfg(test)] pub database_use_test_transactions: bool, @@ -163,7 +164,15 @@ impl Settings { } pub fn uses_spanner(&self) -> bool { - self.database_url.as_str().starts_with("spanner") + self.database_url.as_str().starts_with("spanner://") + } + + pub fn spanner_database_name(&self) -> Option<&str> { + if !self.uses_spanner() { + None + } else { + Some(&self.database_url["spanner://".len()..]) + } } /// A simple banner for display of certain settings at startup diff --git a/src/web/extractors.rs b/src/web/extractors.rs index 4a0896671f..bb856f5345 100644 --- a/src/web/extractors.rs +++ b/src/web/extractors.rs @@ -38,9 +38,6 @@ use crate::web::{ X_WEAVE_RECORDS, }; -#[cfg(test)] -use crate::db::Db; - const BATCH_MAX_IDS: usize = 100; // BSO const restrictions @@ -1798,7 +1795,10 @@ mod tests { use serde_json::{self, json}; use sha2::Sha256; - use crate::db::mock::{MockDb, MockDbPool}; + use crate::db::{ + mock::{MockDb, MockDbPool}, + Db, + }; use crate::server::{metrics, ServerState}; use crate::settings::{Secrets, ServerLimits, Settings};