Skip to content

Commit

Permalink
Merge branch 'master' into feat/drop-incomplete-records-SYNC-4172
Browse files Browse the repository at this point in the history
  • Loading branch information
pjenvey authored Mar 12, 2024
2 parents 7465fc2 + 70b1e7b commit 2555a2f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
4 changes: 3 additions & 1 deletion autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ impl AppState {
StorageType::DynamoDb => Box::new(DdbClientImpl::new(metrics.clone(), &db_settings)?),
#[cfg(feature = "bigtable")]
StorageType::BigTable => {
Box::new(BigTableClientImpl::new(metrics.clone(), &db_settings)?)
let client = BigTableClientImpl::new(metrics.clone(), &db_settings)?;
client.spawn_sweeper(Duration::from_secs(30));
Box::new(client)
}
#[cfg(all(feature = "bigtable", feature = "dynamodb"))]
StorageType::Dual => Box::new(DualClientImpl::new(metrics.clone(), &db_settings)?),
Expand Down
4 changes: 3 additions & 1 deletion autoendpoint/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ impl Server {
#[cfg(feature = "bigtable")]
StorageType::BigTable => {
debug!("Using BigTable");
Box::new(BigTableClientImpl::new(metrics.clone(), &db_settings)?)
let client = BigTableClientImpl::new(metrics.clone(), &db_settings)?;
client.spawn_sweeper(Duration::from_secs(30));
Box::new(client)
}
#[cfg(all(feature = "bigtable", feature = "dual"))]
StorageType::Dual => Box::new(DualClientImpl::new(metrics.clone(), &db_settings)?),
Expand Down
5 changes: 5 additions & 0 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ impl BigTableClientImpl {
})
}

/// Spawn a task to periodically evict idle connections
pub fn spawn_sweeper(&self, interval: Duration) {
self.pool.spawn_sweeper(interval);
}

/// Return a ReadRowsRequest for a given row key
fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest {
read_row_request(&self.settings.table_name, row_key)
Expand Down
35 changes: 31 additions & 4 deletions autopush-common/src/db/bigtable/pool.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::time::Instant;
use std::{fmt, sync::Arc};
use std::{
fmt,
sync::Arc,
time::{Duration, Instant},
};

use actix_web::rt;
use async_trait::async_trait;
use cadence::StatsdClient;
use deadpool::managed::{Manager, PoolConfig, Timeouts};
use deadpool::managed::{Manager, PoolConfig, QueueMode, Timeouts};
use grpcio::{Channel, ChannelBuilder, ChannelCredentials, EnvBuilder};

use crate::db::bigtable::{bigtable_client::BigtableDb, BigTableDbSettings, BigTableError};
Expand Down Expand Up @@ -93,7 +97,12 @@ impl BigTablePool {
connection.clone(),
metrics.clone(),
)?;
let mut config = PoolConfig::default();
let mut config = PoolConfig {
// Prefer LIFO to allow the sweeper task to evict least frequently
// used connections
queue_mode: QueueMode::Lifo,
..Default::default()
};
if let Some(size) = bt_settings.database_pool_max_size {
debug!("🏊 Setting pool max size {}", &size);
config.max_size = size as usize;
Expand All @@ -116,6 +125,24 @@ impl BigTablePool {
_metrics: metrics.clone(),
})
}

/// Spawn a task to periodically evict idle connections
pub fn spawn_sweeper(&self, interval: Duration) {
let Some(max_idle) = self.pool.manager().settings.database_pool_max_idle else {
return;
};
let pool = self.pool.clone();
rt::spawn(async move {
loop {
sweeper(&pool, max_idle);
rt::time::sleep(interval).await;
}
});
}
}

fn sweeper(pool: &deadpool::managed::Pool<BigtableClientManager>, max_idle: Duration) {
pool.retain(|_, metrics| metrics.last_used() < max_idle);
}

/// BigTable Pool Manager. This contains everything needed to create a new connection.
Expand Down

0 comments on commit 2555a2f

Please sign in to comment.