diff --git a/autoconnect/autoconnect-settings/src/app_state.rs b/autoconnect/autoconnect-settings/src/app_state.rs index f36c4e3ed..c91315dc3 100644 --- a/autoconnect/autoconnect-settings/src/app_state.rs +++ b/autoconnect/autoconnect-settings/src/app_state.rs @@ -81,7 +81,9 @@ impl AppState { StorageType::DynamoDb => Box::new(DdbClientImpl::new(metrics.clone(), &db_settings)?), #[cfg(feature = "bigtable")] StorageType::BigTable => { - Box::new(BigTableClientImpl::new(metrics.clone(), &db_settings)?) + let client = BigTableClientImpl::new(metrics.clone(), &db_settings)?; + client.spawn_sweeper(Duration::from_secs(30)); + Box::new(client) } #[cfg(all(feature = "bigtable", feature = "dynamodb"))] StorageType::Dual => Box::new(DualClientImpl::new(metrics.clone(), &db_settings)?), diff --git a/autoendpoint/src/server.rs b/autoendpoint/src/server.rs index 3be7cedbd..5c51a9f39 100644 --- a/autoendpoint/src/server.rs +++ b/autoendpoint/src/server.rs @@ -75,7 +75,9 @@ impl Server { #[cfg(feature = "bigtable")] StorageType::BigTable => { debug!("Using BigTable"); - Box::new(BigTableClientImpl::new(metrics.clone(), &db_settings)?) + let client = BigTableClientImpl::new(metrics.clone(), &db_settings)?; + client.spawn_sweeper(Duration::from_secs(30)); + Box::new(client) } #[cfg(all(feature = "bigtable", feature = "dual"))] StorageType::Dual => Box::new(DualClientImpl::new(metrics.clone(), &db_settings)?), diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 86c314a1a..3c3eeec1a 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -305,6 +305,11 @@ impl BigTableClientImpl { }) } + /// Spawn a task to periodically evict idle connections + pub fn spawn_sweeper(&self, interval: Duration) { + self.pool.spawn_sweeper(interval); + } + /// Return a ReadRowsRequest for a given row key fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest { read_row_request(&self.settings.table_name, row_key) diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs index ebae53ee8..ac916e754 100644 --- a/autopush-common/src/db/bigtable/pool.rs +++ b/autopush-common/src/db/bigtable/pool.rs @@ -1,9 +1,13 @@ -use std::time::Instant; -use std::{fmt, sync::Arc}; +use std::{ + fmt, + sync::Arc, + time::{Duration, Instant}, +}; +use actix_web::rt; use async_trait::async_trait; use cadence::StatsdClient; -use deadpool::managed::{Manager, PoolConfig, Timeouts}; +use deadpool::managed::{Manager, PoolConfig, QueueMode, Timeouts}; use grpcio::{Channel, ChannelBuilder, ChannelCredentials, EnvBuilder}; use crate::db::bigtable::{bigtable_client::BigtableDb, BigTableDbSettings, BigTableError}; @@ -93,7 +97,12 @@ impl BigTablePool { connection.clone(), metrics.clone(), )?; - let mut config = PoolConfig::default(); + let mut config = PoolConfig { + // Prefer LIFO to allow the sweeper task to evict least frequently + // used connections + queue_mode: QueueMode::Lifo, + ..Default::default() + }; if let Some(size) = bt_settings.database_pool_max_size { debug!("🏊 Setting pool max size {}", &size); config.max_size = size as usize; @@ -116,6 +125,24 @@ impl BigTablePool { _metrics: metrics.clone(), }) } + + /// Spawn a task to periodically evict idle connections + pub fn spawn_sweeper(&self, interval: Duration) { + let Some(max_idle) = self.pool.manager().settings.database_pool_max_idle else { + return; + }; + let pool = self.pool.clone(); + rt::spawn(async move { + loop { + sweeper(&pool, max_idle); + rt::time::sleep(interval).await; + } + }); + } +} + +fn sweeper(pool: &deadpool::managed::Pool, max_idle: Duration) { + pool.retain(|_, metrics| metrics.last_used() < max_idle); } /// BigTable Pool Manager. This contains everything needed to create a new connection.