From 2a62b0ed926afd779a115b3813f70a385088da8d Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Wed, 4 Nov 2020 11:27:18 +0100 Subject: [PATCH] Switch back to synchronous Mutex (fixes #74) --- bb8/src/api.rs | 47 ++++++++++----------- bb8/src/inner.rs | 103 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 106 insertions(+), 44 deletions(-) diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 7a48a70..deaad6c 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -55,7 +55,8 @@ use tokio::spawn; use tokio::time::interval_at; use crate::inner::{ - add_connection, drop_connections, schedule_reaping, Conn, IdleConn, PoolInternals, SharedPool, + add_connection, drop_connections, schedule_reaping, ApprovalIter, Conn, IdleConn, + PoolInternals, SharedPool, }; /// A trait which provides connection-specific functionality. @@ -304,7 +305,7 @@ impl Builder { /// minimum number of connections, or it times out. pub async fn build(self, manager: M) -> Result, M::Error> { let pool = self.build_inner(manager); - let stream = pool.replenish_idle_connections(); + let stream = pool.replenish_idle_connections(pool.inner.wanted()); stream.try_fold((), |_, _| ok(())).await.map(|()| pool) } @@ -314,7 +315,7 @@ impl Builder { /// before returning. pub fn build_unchecked(self, manager: M) -> Pool { let p = self.build_inner(manager); - p.clone().spawn_replenishing(); + p.clone().spawn_replenishing(p.inner.wanted()); p } } @@ -376,17 +377,19 @@ impl Pool { fn replenish_idle_connections( &self, + approvals: ApprovalIter, ) -> FuturesUnordered>> { let stream = FuturesUnordered::new(); - for _ in 0..self.inner.wanted() { - stream.push(add_connection(self.inner.clone())); + for approval in approvals { + stream.push(add_connection(self.inner.clone(), approval)); } stream } - pub(crate) fn spawn_replenishing(self) { + pub(crate) fn spawn_replenishing(self, approvals: ApprovalIter) { spawn(async move { - while let Some(result) = self.replenish_idle_connections().next().await { + let mut stream = self.replenish_idle_connections(approvals); + while let Some(result) = stream.next().await { self.inner.sink_error(result); } }); @@ -425,13 +428,13 @@ impl Pool { Err((e, conn)) => (Err(e), conn), }; - self.put_back(birth, conn).await; + self.put_back(birth, conn); r.map_err(RunError::User) } /// Return connection back in to the pool - async fn put_back(&self, birth: Instant, mut conn: M::Connection) { + fn put_back(&self, birth: Instant, mut conn: M::Connection) { let inner = self.inner.clone(); // Supposed to be fast, but do it before locking anyways. @@ -450,14 +453,13 @@ impl Pool { let inner = self.inner.clone(); loop { - let mut internals = inner.internals.lock(); - if let Some(conn) = internals.conns.pop_front() { + if let Some((conn, approvals)) = inner.get() { // Spin up a new connection if necessary to retain our minimum idle count - if internals.num_conns + internals.pending_conns < inner.statics.max_size { + if approvals.len() > 0 { Pool { inner: inner.clone(), } - .spawn_replenishing(); + .spawn_replenishing(approvals); } if inner.statics.test_on_check_out { @@ -467,6 +469,7 @@ impl Pool { Ok(()) => return Ok(Conn { conn, birth }), Err(_) => { mem::drop(conn); + let mut internals = inner.internals.lock(); drop_connections(&inner, &mut internals, 1); } } @@ -480,14 +483,9 @@ impl Pool { } let (tx, rx) = oneshot::channel(); - let current_and_pending_conns = { - let mut locked = inner.internals.lock(); - locked.waiters.push_back(tx); - locked.num_conns + locked.pending_conns - }; - - if current_and_pending_conns < inner.statics.max_size { - spawn(async move { inner.sink_error(add_connection(inner.clone()).await) }); + if let Some(approval) = inner.can_add_more(Some(tx)) { + let inner = inner.clone(); + spawn(async move { inner.sink_error(add_connection(inner.clone(), approval).await) }); } match inner.or_timeout(rx).await { @@ -566,10 +564,7 @@ where M: ManageConnection, { fn drop(&mut self) { - futures::executor::block_on(async { - self.pool - .put_back(self.checkout, self.conn.take().unwrap().conn) - .await; - }) + self.pool + .put_back(self.checkout, self.conn.take().unwrap().conn); } } diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index bdfe106..a325af6 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -1,6 +1,5 @@ use std::cmp::{max, min}; use std::collections::VecDeque; -use std::mem; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; @@ -91,17 +90,64 @@ impl SharedPool where M: ManageConnection, { - pub(crate) fn wanted(&self) -> u32 { - let desired = self.statics.min_idle.unwrap_or(0); - let internals = self.internals.lock(); - let idle_or_pending = internals.conns.len() as u32 + internals.pending_conns; - if idle_or_pending < desired { - desired - idle_or_pending + pub(crate) fn get(&self) -> Option<(IdleConn, ApprovalIter)> { + let mut locked = self.internals.lock(); + locked.conns.pop_front().map(|conn| { + let wanted = self.want_more(&mut locked); + (conn, self.approvals(&mut locked, wanted)) + }) + } + + pub(crate) fn can_add_more( + &self, + waiter: Option>>, + ) -> Option { + let mut locked = self.internals.lock(); + if let Some(waiter) = waiter { + locked.waiters.push_back(waiter); + } + + if locked.num_conns + locked.pending_conns < self.statics.max_size { + locked.pending_conns += 1; + Some(Approval { _priv: () }) + } else { + None + } + } + + pub(crate) fn wanted(&self) -> ApprovalIter { + let mut internals = self.internals.lock(); + let num = self.want_more(&mut internals); + self.approvals(&mut internals, num) + } + + fn want_more(&self, locked: &mut MutexGuard>) -> u32 { + let available = locked.conns.len() as u32 + locked.pending_conns; + let min_idle = self.statics.min_idle.unwrap_or(0); + if available < min_idle { + min_idle - available } else { 0 } } + fn approvals( + &self, + locked: &mut MutexGuard>, + num: u32, + ) -> ApprovalIter { + let current = locked.num_conns + locked.pending_conns; + let allowed = if current < self.statics.max_size { + self.statics.max_size - current + } else { + 0 + }; + + let num = min(num, allowed); + locked.pending_conns += num; + ApprovalIter { num: num as usize } + } + pub(crate) fn sink_error(&self, result: Result<(), M::Error>) { match result { Ok(()) => {} @@ -126,18 +172,10 @@ where } // Outside of Pool to avoid borrow splitting issues on self -pub(crate) async fn add_connection(pool: Arc>) -> Result<(), M::Error> +pub(crate) async fn add_connection(pool: Arc>, _: Approval) -> Result<(), M::Error> where M: ManageConnection, { - let mut internals = pool.internals.lock(); - if internals.num_conns + internals.pending_conns >= pool.statics.max_size { - return Ok(()); - } - - internals.pending_conns += 1; - mem::drop(internals); - let new_shared = Arc::downgrade(&pool); let shared = match new_shared.upgrade() { None => return Ok(()), @@ -176,6 +214,34 @@ where } } +pub(crate) struct ApprovalIter { + num: usize, +} + +impl Iterator for ApprovalIter { + type Item = Approval; + + fn next(&mut self) -> Option { + match self.num { + 0 => None, + _ => { + self.num -= 1; + Some(Approval { _priv: () }) + } + } + } +} + +impl ExactSizeIterator for ApprovalIter { + fn len(&self) -> usize { + self.num + } +} + +pub(crate) struct Approval { + _priv: (), +} + // Drop connections // NB: This is called with the pool lock held. pub(crate) fn drop_connections<'a, M>( @@ -188,11 +254,12 @@ pub(crate) fn drop_connections<'a, M>( internals.num_conns -= dropped as u32; // We might need to spin up more connections to maintain the idle limit, e.g. // if we hit connection lifetime limits - if internals.num_conns + internals.pending_conns < pool.statics.max_size { + let num = pool.want_more(internals); + if num > 0 { Pool { inner: pool.clone(), } - .spawn_replenishing(); + .spawn_replenishing(pool.approvals(internals, num)); } }