Skip to content

Commit

Permalink
Switch back to synchronous Mutex (fixes #74)
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Nov 4, 2020
1 parent eb6d276 commit 2a62b0e
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 44 deletions.
47 changes: 21 additions & 26 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ use tokio::spawn;
use tokio::time::interval_at;

use crate::inner::{
add_connection, drop_connections, schedule_reaping, Conn, IdleConn, PoolInternals, SharedPool,
add_connection, drop_connections, schedule_reaping, ApprovalIter, Conn, IdleConn,
PoolInternals, SharedPool,
};

/// A trait which provides connection-specific functionality.
Expand Down Expand Up @@ -304,7 +305,7 @@ impl<M: ManageConnection> Builder<M> {
/// minimum number of connections, or it times out.
pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
let pool = self.build_inner(manager);
let stream = pool.replenish_idle_connections();
let stream = pool.replenish_idle_connections(pool.inner.wanted());
stream.try_fold((), |_, _| ok(())).await.map(|()| pool)
}

Expand All @@ -314,7 +315,7 @@ impl<M: ManageConnection> Builder<M> {
/// before returning.
pub fn build_unchecked(self, manager: M) -> Pool<M> {
let p = self.build_inner(manager);
p.clone().spawn_replenishing();
p.clone().spawn_replenishing(p.inner.wanted());
p
}
}
Expand Down Expand Up @@ -376,17 +377,19 @@ impl<M: ManageConnection> Pool<M> {

fn replenish_idle_connections(
&self,
approvals: ApprovalIter,
) -> FuturesUnordered<impl Future<Output = Result<(), M::Error>>> {
let stream = FuturesUnordered::new();
for _ in 0..self.inner.wanted() {
stream.push(add_connection(self.inner.clone()));
for approval in approvals {
stream.push(add_connection(self.inner.clone(), approval));
}
stream
}

pub(crate) fn spawn_replenishing(self) {
pub(crate) fn spawn_replenishing(self, approvals: ApprovalIter) {
spawn(async move {
while let Some(result) = self.replenish_idle_connections().next().await {
let mut stream = self.replenish_idle_connections(approvals);
while let Some(result) = stream.next().await {
self.inner.sink_error(result);
}
});
Expand Down Expand Up @@ -425,13 +428,13 @@ impl<M: ManageConnection> Pool<M> {
Err((e, conn)) => (Err(e), conn),
};

self.put_back(birth, conn).await;
self.put_back(birth, conn);

r.map_err(RunError::User)
}

/// Return connection back in to the pool
async fn put_back(&self, birth: Instant, mut conn: M::Connection) {
fn put_back(&self, birth: Instant, mut conn: M::Connection) {
let inner = self.inner.clone();

// Supposed to be fast, but do it before locking anyways.
Expand All @@ -450,14 +453,13 @@ impl<M: ManageConnection> Pool<M> {
let inner = self.inner.clone();

loop {
let mut internals = inner.internals.lock();
if let Some(conn) = internals.conns.pop_front() {
if let Some((conn, approvals)) = inner.get() {
// Spin up a new connection if necessary to retain our minimum idle count
if internals.num_conns + internals.pending_conns < inner.statics.max_size {
if approvals.len() > 0 {
Pool {
inner: inner.clone(),
}
.spawn_replenishing();
.spawn_replenishing(approvals);
}

if inner.statics.test_on_check_out {
Expand All @@ -467,6 +469,7 @@ impl<M: ManageConnection> Pool<M> {
Ok(()) => return Ok(Conn { conn, birth }),
Err(_) => {
mem::drop(conn);
let mut internals = inner.internals.lock();
drop_connections(&inner, &mut internals, 1);
}
}
Expand All @@ -480,14 +483,9 @@ impl<M: ManageConnection> Pool<M> {
}

let (tx, rx) = oneshot::channel();
let current_and_pending_conns = {
let mut locked = inner.internals.lock();
locked.waiters.push_back(tx);
locked.num_conns + locked.pending_conns
};

if current_and_pending_conns < inner.statics.max_size {
spawn(async move { inner.sink_error(add_connection(inner.clone()).await) });
if let Some(approval) = inner.can_add_more(Some(tx)) {
let inner = inner.clone();
spawn(async move { inner.sink_error(add_connection(inner.clone(), approval).await) });
}

match inner.or_timeout(rx).await {
Expand Down Expand Up @@ -566,10 +564,7 @@ where
M: ManageConnection,
{
fn drop(&mut self) {
futures::executor::block_on(async {
self.pool
.put_back(self.checkout, self.conn.take().unwrap().conn)
.await;
})
self.pool
.put_back(self.checkout, self.conn.take().unwrap().conn);
}
}
103 changes: 85 additions & 18 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::mem;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -91,17 +90,64 @@ impl<M> SharedPool<M>
where
M: ManageConnection,
{
pub(crate) fn wanted(&self) -> u32 {
let desired = self.statics.min_idle.unwrap_or(0);
let internals = self.internals.lock();
let idle_or_pending = internals.conns.len() as u32 + internals.pending_conns;
if idle_or_pending < desired {
desired - idle_or_pending
pub(crate) fn get(&self) -> Option<(IdleConn<M::Connection>, ApprovalIter)> {
let mut locked = self.internals.lock();
locked.conns.pop_front().map(|conn| {
let wanted = self.want_more(&mut locked);
(conn, self.approvals(&mut locked, wanted))
})
}

pub(crate) fn can_add_more(
&self,
waiter: Option<oneshot::Sender<Conn<M::Connection>>>,
) -> Option<Approval> {
let mut locked = self.internals.lock();
if let Some(waiter) = waiter {
locked.waiters.push_back(waiter);
}

if locked.num_conns + locked.pending_conns < self.statics.max_size {
locked.pending_conns += 1;
Some(Approval { _priv: () })
} else {
None
}
}

pub(crate) fn wanted(&self) -> ApprovalIter {
let mut internals = self.internals.lock();
let num = self.want_more(&mut internals);
self.approvals(&mut internals, num)
}

fn want_more(&self, locked: &mut MutexGuard<PoolInternals<M::Connection>>) -> u32 {
let available = locked.conns.len() as u32 + locked.pending_conns;
let min_idle = self.statics.min_idle.unwrap_or(0);
if available < min_idle {
min_idle - available
} else {
0
}
}

fn approvals(
&self,
locked: &mut MutexGuard<PoolInternals<M::Connection>>,
num: u32,
) -> ApprovalIter {
let current = locked.num_conns + locked.pending_conns;
let allowed = if current < self.statics.max_size {
self.statics.max_size - current
} else {
0
};

let num = min(num, allowed);
locked.pending_conns += num;
ApprovalIter { num: num as usize }
}

pub(crate) fn sink_error(&self, result: Result<(), M::Error>) {
match result {
Ok(()) => {}
Expand All @@ -126,18 +172,10 @@ where
}

// Outside of Pool to avoid borrow splitting issues on self
pub(crate) async fn add_connection<M>(pool: Arc<SharedPool<M>>) -> Result<(), M::Error>
pub(crate) async fn add_connection<M>(pool: Arc<SharedPool<M>>, _: Approval) -> Result<(), M::Error>
where
M: ManageConnection,
{
let mut internals = pool.internals.lock();
if internals.num_conns + internals.pending_conns >= pool.statics.max_size {
return Ok(());
}

internals.pending_conns += 1;
mem::drop(internals);

let new_shared = Arc::downgrade(&pool);
let shared = match new_shared.upgrade() {
None => return Ok(()),
Expand Down Expand Up @@ -176,6 +214,34 @@ where
}
}

pub(crate) struct ApprovalIter {
num: usize,
}

impl Iterator for ApprovalIter {
type Item = Approval;

fn next(&mut self) -> Option<Self::Item> {
match self.num {
0 => None,
_ => {
self.num -= 1;
Some(Approval { _priv: () })
}
}
}
}

impl ExactSizeIterator for ApprovalIter {
fn len(&self) -> usize {
self.num
}
}

pub(crate) struct Approval {
_priv: (),
}

// Drop connections
// NB: This is called with the pool lock held.
pub(crate) fn drop_connections<'a, M>(
Expand All @@ -188,11 +254,12 @@ pub(crate) fn drop_connections<'a, M>(
internals.num_conns -= dropped as u32;
// We might need to spin up more connections to maintain the idle limit, e.g.
// if we hit connection lifetime limits
if internals.num_conns + internals.pending_conns < pool.statics.max_size {
let num = pool.want_more(internals);
if num > 0 {
Pool {
inner: pool.clone(),
}
.spawn_replenishing();
.spawn_replenishing(pool.approvals(internals, num));
}
}

Expand Down

0 comments on commit 2a62b0e

Please sign in to comment.