From 4d7cab21cfbb62493013347320120b85911d7523 Mon Sep 17 00:00:00 2001 From: Taylor Neely Date: Tue, 15 Oct 2024 20:36:04 +0000 Subject: [PATCH] Reap expired connections on drop The reaper only runs against the connections in its idle pool. This is fine for reaping idle connections, but for hotly contested connections beyond their maximum lifetime this can prove problematic. Consider an active connection beyond its lifetime and a reaper that runs every 3 seconds: - [t0] Connection is idle - [t1] Connection is active - [t2] Reaper runs, does not see connection - [t3] Connection is idle This pattern can repeat infinitely with the connection never being reaped. By checking the max lifetime on drop, we can ensure that expired connections are reaped in a reason amount of time (assuming they eventually do get dropped). --- bb8/src/inner.rs | 14 +++++++++++--- bb8/tests/test.rs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index be2ef52..e4bfb63 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -149,12 +149,20 @@ where "handled in caller" ); + let max_lifetime = self.inner.statics.max_lifetime; + let is_expired = max_lifetime.map_or(false, |lt| conn.is_expired(Instant::now() - lt)); + let is_broken = self.inner.manager.has_broken(&mut conn.conn); + let mut locked = self.inner.internals.lock(); - match (state, self.inner.manager.has_broken(&mut conn.conn)) { - (ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()), - (_, is_broken) => { + match (state, is_broken || is_expired) { + (ConnectionState::Present, false) if !is_expired => { + locked.put(conn, None, self.inner.clone()) + } + _ => { if is_broken { self.inner.statistics.record(StatsKind::ClosedBroken); + } else if is_expired { + self.inner.statistics.record_connections_reaped(0, 1); } let approvals = locked.dropped(1, &self.inner.statics); self.spawn_replenishing_approvals(approvals); diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 0e1225e..581c2b9 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -1,4 +1,5 @@ use bb8::*; +use tokio::time::sleep; use std::future::Future; use std::marker::PhantomData; @@ -585,6 +586,40 @@ async fn test_max_lifetime() { assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5); } +#[tokio::test] +async fn test_max_lifetime_reap_on_drop() { + static DROPPED: AtomicUsize = AtomicUsize::new(0); + + #[derive(Default)] + struct Connection; + + impl Drop for Connection { + fn drop(&mut self) { + DROPPED.fetch_add(1, Ordering::SeqCst); + } + } + + let manager = OkManager::::new(); + let pool = Pool::builder() + .max_lifetime(Some(Duration::from_secs(1))) + .connection_timeout(Duration::from_secs(1)) + .reaper_rate(Duration::from_secs(999)) + .build(manager) + .await + .unwrap(); + + let conn = pool.get().await; + + // And wait. + sleep(Duration::from_secs(2)).await; + assert_eq!(DROPPED.load(Ordering::SeqCst), 0); + + // Connection is reaped on drop. + drop(conn); + assert_eq!(DROPPED.load(Ordering::SeqCst), 1); + assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 1); +} + #[tokio::test] async fn test_min_idle() { let pool = Pool::builder()