Skip to content

Commit

Permalink
feat: switch spanner's db pool to deadpool
Browse files Browse the repository at this point in the history
This is a quick integration of deadpool, to be revisited with some more
cleanup.

Issue #794
  • Loading branch information
pjenvey committed Aug 21, 2020
1 parent ce905cf commit 0e5a2c4
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 34 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ bytes = "0.5"
cadence = "0.20.0"
chrono = "0.4"
config = "0.10"
deadpool = "0.5.2"
diesel = { version = "1.4.4", features = ["mysql", "r2d2"] }
diesel_logger = "0.1.1"
diesel_migrations = { version = "1.4.0", features = ["mysql"] }
Expand Down
9 changes: 9 additions & 0 deletions src/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ impl From<bb8::State> for PoolState {
}
}

/// Adapt deadpool's pool status to the common `PoolState` report shape
/// (mirrors the `From<bb8::State>` impl above).
impl From<deadpool::Status> for PoolState {
    fn from(status: deadpool::Status) -> PoolState {
        PoolState {
            connections: status.size as u32,
            // deadpool's `available` is an `isize` that goes negative when
            // callers are waiting on an empty pool; clamp to zero so the
            // cast can't wrap around to a huge idle-connection count.
            idle_connections: status.available.max(0) as u32,
        }
    }
}

#[cfg(test)]
pub type GetCollectionId = i32;

Expand Down
16 changes: 8 additions & 8 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
};

pub async fn create_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
params: params::CreateBatch,
) -> Result<results::CreateBatch> {
let batch_id = Uuid::new_v4().to_simple().to_string();
Expand Down Expand Up @@ -57,7 +57,7 @@ pub async fn create_async(
Ok(batch_id)
}

pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) -> Result<bool> {
pub async fn validate_async(db: &SpannerDb, params: params::ValidateBatch) -> Result<bool> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
let exists = db
.sql(
Expand All @@ -81,7 +81,7 @@ pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) -
Ok(exists.is_some())
}

pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) -> Result<()> {
pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> {
let mut metrics = db.metrics.clone();
metrics.start_timer("storage.spanner.append_items_to_batch", None);

Expand All @@ -106,7 +106,7 @@ pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) ->
}

pub async fn get_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
params: params::GetBatch,
) -> Result<Option<results::GetBatch>> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
Expand Down Expand Up @@ -142,7 +142,7 @@ pub async fn get_async(
Ok(batch)
}

pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Result<()> {
pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
// Also deletes child batch_bsos rows (INTERLEAVE IN PARENT batches ON
// DELETE CASCADE)
Expand All @@ -165,7 +165,7 @@ pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Re
}

pub async fn commit_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
params: params::CommitBatch,
) -> Result<results::CommitBatch> {
let mut metrics = db.metrics.clone();
Expand Down Expand Up @@ -239,7 +239,7 @@ pub async fn commit_async(
}

pub async fn do_append_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
user_id: HawkIdentifier,
collection_id: i32,
batch_id: String,
Expand Down Expand Up @@ -335,7 +335,7 @@ pub async fn do_append_async(
/// For the special case of a user creating a batch for a collection with no
/// prior data.
async fn pretouch_collection_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
user_id: &HawkIdentifier,
collection_id: i32,
) -> Result<()> {
Expand Down
9 changes: 6 additions & 3 deletions src/db/spanner/manager.rs → src/db/spanner/manager/bb8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use crate::{
settings::Settings,
};

const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";
// XXX:
pub const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";

pub struct SpannerConnectionManager<T> {
database_name: String,
Expand All @@ -33,6 +34,7 @@ impl<_T> fmt::Debug for SpannerConnectionManager<_T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("SpannerConnectionManager")
.field("database_name", &self.database_name)
.field("test_transactions", &self.test_transactions)
.finish()
}
}
Expand Down Expand Up @@ -65,7 +67,7 @@ pub struct SpannerSession {
pub client: SpannerClient,
pub session: Session,

pub(super) use_test_transactions: bool,
pub(in crate::db::spanner) use_test_transactions: bool,
}

#[async_trait]
Expand Down Expand Up @@ -130,7 +132,8 @@ impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
}
}

async fn create_session(
// XXX:
pub async fn create_session(
client: &SpannerClient,
database_name: &str,
) -> Result<Session, grpcio::Error> {
Expand Down
119 changes: 119 additions & 0 deletions src/db/spanner/manager/deadpool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::{fmt, sync::Arc};

use actix_web::web::block;
use async_trait::async_trait;
use deadpool::managed::{RecycleError, RecycleResult};
use googleapis_raw::spanner::v1::{spanner::GetSessionRequest, spanner_grpc::SpannerClient};
use grpcio::{ChannelBuilder, ChannelCredentials, EnvBuilder, Environment};

use crate::{
db::error::{DbError, DbErrorKind},
server::metrics::Metrics,
settings::Settings,
};

// XXX:
use super::bb8::{create_session, SpannerSession, SPANNER_ADDRESS};

// TODO (cleanup, see issue #794):
// - rename to SpannerSessionManager (and rename the bb8 manager too)
// - bb8's manager doesn't need its PhantomData
// - kill the lifetimes for now, or the PhantomData one
/// deadpool connection manager for Spanner sessions.
pub struct Manager {
    // Spanner database name parsed from the `spanner://` database URL.
    database_name: String,
    /// The gRPC environment
    env: Arc<Environment>,
    // Metrics sink, cloned into each connection-creation task.
    metrics: Metrics,
    // When true, sessions run test transactions (test builds only).
    test_transactions: bool,
}

impl fmt::Debug for Manager {
    /// Manual `Debug` that shows only the plain fields, omitting the gRPC
    /// `env` and the `metrics` handle.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug = formatter.debug_struct("Manager");
        debug.field("database_name", &self.database_name);
        debug.field("test_transactions", &self.test_transactions);
        debug.finish()
    }
}

impl Manager {
    /// Build a `Manager` from the application settings.
    ///
    /// The Spanner database name is everything after the `spanner://`
    /// scheme in `settings.database_url`; any other scheme is rejected
    /// with `DbErrorKind::InvalidUrl`.
    pub fn new(settings: &Settings, metrics: &Metrics) -> Result<Self, DbError> {
        let url = &settings.database_url;
        // Explicit early return instead of the `Err(..)?;` statement idiom
        // (clippy::try_err); `DbErrorKind` converts into `DbError` via
        // `into()`.
        if !url.starts_with("spanner://") {
            return Err(DbErrorKind::InvalidUrl(url.to_owned()).into());
        }
        let database_name = url["spanner://".len()..].to_owned();
        // One shared gRPC environment for every connection this manager
        // creates.
        let env = Arc::new(EnvBuilder::new().build());

        // Test transactions are only honored in test builds.
        #[cfg(not(test))]
        let test_transactions = false;
        #[cfg(test)]
        let test_transactions = settings.database_use_test_transactions;

        Ok(Manager {
            database_name,
            env,
            metrics: metrics.clone(),
            test_transactions,
        })
    }
}

#[async_trait]
impl deadpool::managed::Manager<SpannerSession, DbError> for Manager {
    /// Create a new pooled connection: authenticate a gRPC channel to
    /// Spanner, then open a session on it.
    async fn create(&self) -> Result<SpannerSession, DbError> {
        let env = self.env.clone();
        let mut metrics = self.metrics.clone();
        // XXX: issue732: Could google_default_credentials (or
        // ChannelBuilder::secure_connect) block?!
        // Channel setup runs on actix's blocking-thread pool in case the
        // credential lookup does block.
        let chan = block(move || -> Result<grpcio::Channel, grpcio::Error> {
            metrics.start_timer("storage.pool.grpc_auth", None);
            // Requires
            // GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
            let creds = ChannelCredentials::google_default_credentials()?;
            // 100 MB message limits in both directions (Spanner payloads
            // can be large).
            Ok(ChannelBuilder::new(env)
                .max_send_message_len(100 << 20)
                .max_receive_message_len(100 << 20)
                .secure_connect(SPANNER_ADDRESS, creds))
        })
        .await
        // Unwrap web::block's wrapper: surface the inner grpcio error, and
        // map a canceled blocking task to an internal DbError.
        .map_err(|e| match e {
            actix_web::error::BlockingError::Error(e) => e.into(),
            actix_web::error::BlockingError::Canceled => {
                DbError::internal("web::block Manager operation canceled")
            }
        })?;
        let client = SpannerClient::new(chan);

        // Connect to the instance and create a Spanner session.
        let session = create_session(&client, &self.database_name).await?;

        Ok(SpannerSession {
            client,
            session,
            use_test_transactions: self.test_transactions,
        })
    }

    /// Health-check a connection before reuse by fetching its session.
    /// If the server reports the session NOT_FOUND (e.g. it expired), a
    /// fresh session is created in place rather than discarding the
    /// channel; any other error fails the recycle.
    async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult<DbError> {
        let mut req = GetSessionRequest::new();
        req.set_name(conn.session.get_name().to_owned());
        // Note the two error layers: `get_session_async` may fail to start
        // the RPC (the `?`), and the awaited RPC itself may fail (the
        // `if let Err`).
        if let Err(e) = conn
            .client
            .get_session_async(&req)
            .map_err(|e| RecycleError::Backend(e.into()))?
            .await
        {
            match e {
                grpcio::Error::RpcFailure(ref status)
                    if status.status == grpcio::RpcStatusCode::NOT_FOUND =>
                {
                    // Session expired server-side: replace it and keep the
                    // connection.
                    conn.session = create_session(&conn.client, &self.database_name)
                        .await
                        .map_err(|e| RecycleError::Backend(e.into()))?;
                }
                _ => return Err(RecycleError::Backend(e.into())),
            }
        }
        Ok(())
    }
}
2 changes: 2 additions & 0 deletions src/db/spanner/manager/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// bb8-backed Spanner connection manager (the original implementation).
pub mod bb8;
/// deadpool-backed Spanner connection manager.
pub mod deadpool;
20 changes: 10 additions & 10 deletions src/db/spanner/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,37 +80,37 @@ struct SpannerDbSession {
}

#[derive(Clone, Debug)]
pub struct SpannerDb<'a> {
pub(super) inner: Arc<SpannerDbInner<'a>>,
pub struct SpannerDb {
pub(super) inner: Arc<SpannerDbInner>,

/// Pool level cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,

pub metrics: Metrics,
}

pub struct SpannerDbInner<'a> {
pub(super) conn: Conn<'a>,
pub struct SpannerDbInner {
pub(super) conn: Conn,

session: RefCell<SpannerDbSession>,
}

impl fmt::Debug for SpannerDbInner<'_> {
impl fmt::Debug for SpannerDbInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SpannerDbInner")
}
}

impl<'a> Deref for SpannerDb<'a> {
type Target = SpannerDbInner<'a>;
impl Deref for SpannerDb {
type Target = SpannerDbInner;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<'a> SpannerDb<'a> {
pub fn new(conn: Conn<'a>, coll_cache: Arc<CollectionCache>, metrics: &Metrics) -> Self {
impl SpannerDb {
pub fn new(conn: Conn, coll_cache: Arc<CollectionCache>, metrics: &Metrics) -> Self {
let inner = SpannerDbInner {
conn,
session: RefCell::new(Default::default()),
Expand Down Expand Up @@ -1604,7 +1604,7 @@ impl<'a> SpannerDb<'a> {
}
}

impl<'a> Db<'a> for SpannerDb<'a> {
impl<'a> Db<'a> for SpannerDb {
fn commit(&self) -> DbFuture<'_, ()> {
let db = self.clone();
Box::pin(async move { db.commit_async().map_err(Into::into).await })
Expand Down
Loading

0 comments on commit 0e5a2c4

Please sign in to comment.