Skip to content

Commit

Permalink
feat: kill old or excessively idled connections (#1006)
Browse files Browse the repository at this point in the history
* feat: kill old or excessivly idled connections

* Two new optional arguments:

 _database_pool_connection_lifespan_ = max connection lifespan (in seconds)
 _database_pool_connection_max_idle_ = max idle time (in seconds)

* add reporting (sorta) of connection lifespans to appropriate errors

Issue: #861
  • Loading branch information
jrconlin authored Feb 23, 2021
1 parent 56cadcb commit 082dd1f
Show file tree
Hide file tree
Showing 16 changed files with 236 additions and 64 deletions.
3 changes: 3 additions & 0 deletions src/db/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ pub enum DbErrorKind {

#[fail(display = "User over quota")]
Quota,

#[fail(display = "Connection expired")]
Expired,
}

impl DbError {
Expand Down
4 changes: 4 additions & 0 deletions src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl<'a> Db<'a> for MockDb {
mock_db_method!(get_batch, GetBatch, Option<results::GetBatch>);
mock_db_method!(commit_batch, CommitBatch);

fn get_connection_info(&self) -> results::ConnectionInfo {
results::ConnectionInfo::default()
}

mock_db_method!(get_collection_id, GetCollectionId);
#[cfg(test)]
mock_db_method!(create_collection, CreateCollection);
Expand Down
2 changes: 2 additions & 0 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ pub trait Db<'a>: Debug + 'a {

fn check(&self) -> DbFuture<'_, results::Check>;

fn get_connection_info(&self) -> results::ConnectionInfo;

/// Retrieve the timestamp for an item/collection
///
/// Modeled on the Python `get_resource_timestamp` function.
Expand Down
4 changes: 4 additions & 0 deletions src/db/mysql/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,10 @@ impl<'a> Db<'a> for MysqlDb {
Box::pin(block(move || db.get_collection_id(&name).map_err(Into::into)).map_err(Into::into))
}

fn get_connection_info(&self) -> results::ConnectionInfo {
results::ConnectionInfo::default()
}

#[cfg(test)]
fn create_collection(&self, name: String) -> DbFuture<'_, i32> {
let db = self.clone();
Expand Down
6 changes: 6 additions & 0 deletions src/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ pub struct PoolState {
pub idle_connections: u32,
}

#[derive(Debug, Default)]
pub struct ConnectionInfo {
pub age: i64,
pub idle: i64,
}

pub type GetCollectionId = i32;

#[cfg(test)]
Expand Down
21 changes: 16 additions & 5 deletions src/db/spanner/manager/deadpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct SpannerSessionManager {
env: Arc<Environment>,
metrics: Metrics,
test_transactions: bool,
max_lifespan: Option<u32>,
max_idle: Option<u32>,
}

impl fmt::Debug for SpannerSessionManager {
Expand Down Expand Up @@ -52,26 +54,35 @@ impl SpannerSessionManager {
env,
metrics: metrics.clone(),
test_transactions,
max_lifespan: settings.database_pool_connection_lifespan,
max_idle: settings.database_pool_connection_max_idle,
})
}
}

#[async_trait]
impl Manager<SpannerSession, DbError> for SpannerSessionManager {
async fn create(&self) -> Result<SpannerSession, DbError> {
create_spanner_session(
let session = create_spanner_session(
Arc::clone(&self.env),
self.metrics.clone(),
&self.database_name,
self.test_transactions,
)
.await
.await?;
Ok(session)
}

async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult<DbError> {
recycle_spanner_session(conn, &self.database_name)
.await
.map_err(RecycleError::Backend)
recycle_spanner_session(
conn,
&self.database_name,
&self.metrics,
self.max_lifespan,
self.max_idle,
)
.await
.map_err(RecycleError::Backend)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/db/spanner/manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod bb8;
// mod bb8;
mod deadpool;
mod session;

Expand Down
37 changes: 36 additions & 1 deletion src/db/spanner/manager/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ use googleapis_raw::spanner::v1::{
};
use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment, MetadataBuilder};
use std::sync::Arc;
use std::time::SystemTime;

use crate::{db::error::DbError, server::metrics::Metrics};
use crate::{
db::error::{DbError, DbErrorKind},
server::metrics::Metrics,
};

const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";

Expand Down Expand Up @@ -63,7 +67,38 @@ pub async fn create_spanner_session(
pub async fn recycle_spanner_session(
conn: &mut SpannerSession,
database_name: &str,
metrics: &Metrics,
max_lifetime: Option<u32>,
max_idle: Option<u32>,
) -> Result<(), DbError> {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if let Some(max_life) = max_lifetime {
// get the current UTC seconds
if let Some(age) = conn.session.create_time.clone().into_option() {
let age = now - age.seconds as u64;
if age > max_life as u64 {
metrics.incr("db.connection.max_life");
dbg!("### aging out", conn.session.get_name());
return Err(DbErrorKind::Expired.into());
}
}
}
// check how long that this has been idle...
if let Some(max_idle) = max_idle {
if let Some(idle) = conn.session.approximate_last_use_time.clone().into_option() {
// get current UTC seconds
let idle = std::cmp::max(0, now as i64 - idle.seconds);
if idle > max_idle as i64 {
metrics.incr("db.connection.max_idle");
dbg!("### idling out", conn.session.get_name());
return Err(DbErrorKind::Expired.into());
}
}
}

let mut req = GetSessionRequest::new();
req.set_name(conn.session.get_name().to_owned());
if let Err(e) = conn.client.get_session_async(&req)?.await {
Expand Down
21 changes: 21 additions & 0 deletions src/db/spanner/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
fmt,
ops::Deref,
sync::Arc,
time::SystemTime,
};

use futures::future::TryFutureExt;
Expand Down Expand Up @@ -2060,6 +2061,26 @@ impl<'a> Db<'a> for SpannerDb {
Box::pin(async move { db.get_collection_id_async(&name).map_err(Into::into).await })
}

fn get_connection_info(&self) -> results::ConnectionInfo {
let session = self.conn.session.clone();
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
results::ConnectionInfo {
age: session
.create_time
.into_option()
.map(|time| now - time.seconds)
.unwrap_or_default(),
idle: session
.approximate_last_use_time
.into_option()
.map(|time| now - time.seconds)
.unwrap_or_default(),
}
}

#[cfg(test)]
fn create_collection(&self, name: String) -> DbFuture<'_, i32> {
let db = self.clone();
Expand Down
38 changes: 33 additions & 5 deletions src/db/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::db::results::ConnectionInfo;
use crate::db::{params, Db, DbPool};
use crate::error::{ApiError, ApiErrorKind};
use crate::server::metrics::Metrics;
Expand All @@ -6,15 +7,18 @@ use crate::web::extractors::{
BsoParam, CollectionParam, HawkIdentifier, PreConditionHeader, PreConditionHeaderOpt,
};
use crate::web::middleware::SyncServerRequest;
use crate::web::tags::Tags;
use crate::web::X_LAST_MODIFIED;

use actix_http::http::{HeaderValue, Method, StatusCode};
use actix_http::Error;
use actix_http::{Error, Extensions};
use actix_web::dev::{Payload, PayloadStream};
use actix_web::http::header;
use actix_web::web::Data;
use actix_web::{FromRequest, HttpRequest, HttpResponse};
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use std::cell::RefMut;
use std::future::Future;

#[derive(Clone)]
Expand All @@ -27,13 +31,21 @@ pub struct DbTransactionPool {
precondition: PreConditionHeaderOpt,
}

fn set_extra(exts: &mut RefMut<'_, Extensions>, connection_info: ConnectionInfo) {
let mut tags = Tags::default();
tags.add_extra("connection_age", &connection_info.age.to_string());
tags.add_extra("connection_idle", &connection_info.idle.to_string());
tags.commit(exts);
}

impl DbTransactionPool {
/// Perform an action inside of a DB transaction. If the action fails, the
/// transaction is rolled back. If the action succeeds, the transaction is
/// NOT committed. Further processing is required before we are sure the
/// action has succeeded (ex. check HTTP response for internal error).
async fn transaction_internal<'a, A: 'a, R, F>(
&'a self,
request: HttpRequest,
action: A,
) -> Result<(R, Box<dyn Db<'a>>), Error>
where
Expand All @@ -53,6 +65,8 @@ impl DbTransactionPool {

// Handle lock error
if let Err(e) = result {
// Update the extra info fields.
set_extra(&mut request.extensions_mut(), db.get_connection_info());
db.rollback().await?;
return Err(e.into());
}
Expand All @@ -75,12 +89,16 @@ impl DbTransactionPool {
}

/// Perform an action inside of a DB transaction.
pub async fn transaction<'a, A: 'a, R, F>(&'a self, action: A) -> Result<R, Error>
pub async fn transaction<'a, A: 'a, R, F>(
&'a self,
request: HttpRequest,
action: A,
) -> Result<R, Error>
where
A: FnOnce(Box<dyn Db<'a>>) -> F,
F: Future<Output = Result<R, Error>> + 'a,
{
let (resp, db) = self.transaction_internal(action).await?;
let (resp, db) = self.transaction_internal(request, action).await?;

// No further processing before commit is possible
db.commit().await?;
Expand All @@ -89,13 +107,20 @@ impl DbTransactionPool {

/// Perform an action inside of a DB transaction. This method will rollback
/// if the HTTP response is an error.
pub async fn transaction_http<'a, A: 'a, F>(&'a self, action: A) -> Result<HttpResponse, Error>
pub async fn transaction_http<'a, A: 'a, F>(
&'a self,
request: HttpRequest,
action: A,
) -> Result<HttpResponse, Error>
where
A: FnOnce(Box<dyn Db<'a>>) -> F,
F: Future<Output = Result<HttpResponse, Error>> + 'a,
{
let mreq = request.clone();
let check_precondition = move |db: Box<dyn Db<'a>>| {
async move {
// set the extra information for all requests so we capture default err handlers.
set_extra(&mut mreq.extensions_mut(), db.get_connection_info());
let resource_ts = db
.extract_resource(
self.user_id.clone(),
Expand Down Expand Up @@ -144,7 +169,10 @@ impl DbTransactionPool {
}
};

let (resp, db) = self.transaction_internal(check_precondition).await?;
let (resp, db) = self
.transaction_internal(request.clone(), check_precondition)
.await?;
// match on error and return a composed HttpResponse (so we can use the tags?)

// HttpResponse can contain an internal error
match resp.error() {
Expand Down
10 changes: 5 additions & 5 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Drop for Metrics {
impl From<&HttpRequest> for Metrics {
fn from(req: &HttpRequest) -> Self {
let exts = req.extensions();
let def_tags = Tags::from_request_head(req.head());
let def_tags = Tags::from(req.head());
let tags = exts.get::<Tags>().unwrap_or(&def_tags);
Metrics {
client: match req.app_data::<Data<ServerState>>() {
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Metrics {
pub fn start_timer(&mut self, label: &str, tags: Option<Tags>) {
let mut mtags = self.tags.clone().unwrap_or_default();
if let Some(t) = tags {
mtags.extend(t.tags)
mtags.extend(t)
}

trace!("⌚ Starting timer... {:?}", &label; &mtags);
Expand Down Expand Up @@ -138,7 +138,7 @@ impl Metrics {
let mut tagged = client.count_with_tags(label, count);
let mut mtags = self.tags.clone().unwrap_or_default();
if let Some(tags) = tags {
mtags.extend(tags.tags);
mtags.extend(tags);
}
for key in mtags.tags.keys().clone() {
if let Some(val) = mtags.tags.get(key) {
Expand Down Expand Up @@ -207,7 +207,7 @@ mod tests {
),
);

let tags = Tags::from_request_head(&rh);
let tags = Tags::from(&rh);

let mut result = HashMap::<String, String>::new();
result.insert("ua.os.ver".to_owned(), "NT 10.0".to_owned());
Expand All @@ -233,7 +233,7 @@ mod tests {
header::HeaderValue::from_static("Mozilla/5.0 (curl) Gecko/20100101 curl"),
);

let tags = Tags::from_request_head(&rh);
let tags = Tags::from(&rh);
assert!(!tags.tags.contains_key("ua.os.ver"));
println!("{:?}", tags);
}
Expand Down
2 changes: 2 additions & 0 deletions src/server/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ async fn test_endpoint_with_body(
.call(req)
.await
.expect("Could not get sresponse in test_endpoint_with_body");
dbg!("got response", sresponse.response().status());
assert!(sresponse.response().status().is_success());
dbg!("all good");
test::read_body(sresponse).await
}

Expand Down
Loading

0 comments on commit 082dd1f

Please sign in to comment.