Skip to content

Commit

Permalink
fix: pool internals improvements
Browse files Browse the repository at this point in the history
* fix `DecrementSizeGuard::drop()` only waking one `Waiter` regardless of whether that waiter was already woken
* fix connect-backoff loop giving up the size guard
* don't cut in line to open a new connection
* have tasks waiting on `acquire()` wake periodically to check if there's a connection in the queue

Signed-off-by: Austin Bonander <[email protected]>
  • Loading branch information
abonander authored and mehcode committed Apr 6, 2021
1 parent 64e872f commit 5295ff1
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 135 deletions.
2 changes: 1 addition & 1 deletion sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<DB: Database> PoolConnection<DB> {
/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(mut live) = self.live.take() {
if let Some(live) = self.live.take() {
let pool = self.pool.clone();
spawn(async move {
let mut floating = live.float(&pool);
Expand Down
231 changes: 121 additions & 110 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use crate::pool::{deadline_as_timeout, PoolOptions};
use crossbeam_queue::{ArrayQueue, SegQueue};
use futures_core::task::{Poll, Waker};
use futures_util::future;
use sqlx_rt::{sleep, spawn, timeout};
use std::cmp;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::task::Context;
use std::time::Instant;
use std::time::{Duration, Instant};

/// Tasks waiting in `acquire()` wake up at least this often, even if nobody woke them,
/// to re-check whether a connection has become available since they went to sleep.
const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500);

pub(crate) struct SharedPool<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
Expand All @@ -26,6 +29,26 @@ pub(crate) struct SharedPool<DB: Database> {
}

impl<DB: Database> SharedPool<DB> {
/// Builds the shared pool state from `options` and `connect_options`, wraps it in an
/// `Arc`, and passes it to `spawn_reaper` before handing it back to the caller.
///
/// The pool starts with a size of zero, not closed, and with no waiters queued.
pub(super) fn new_arc(
    options: PoolOptions<DB>,
    connect_options: <DB::Connection as Connection>::Options,
) -> Arc<Self> {
    // Read the queue capacity before `options` is moved into the struct below.
    let idle_capacity = options.max_connections as usize;

    let shared = Arc::new(Self {
        connect_options,
        idle_conns: ArrayQueue::new(idle_capacity),
        waiters: SegQueue::new(),
        size: AtomicU32::new(0),
        is_closed: AtomicBool::new(false),
        options,
    });

    spawn_reaper(&shared);
    shared
}

/// Returns the current value of the pool's `size` counter.
///
/// The counter is read with `Ordering::Acquire`.
pub(super) fn size(&self) -> u32 {
self.size.load(Ordering::Acquire)
}
Expand Down Expand Up @@ -94,12 +117,7 @@ impl<DB: Database> SharedPool<DB> {
panic!("BUG: connection queue overflow in release()");
}

while let Some(waker) = self.waiters.pop() {
if let Some(waker) = waker.upgrade() {
waker.wake();
break;
}
}
wake_one(&self.waiters);
}

/// Try to atomically increment the pool size for a new connection.
Expand All @@ -125,68 +143,20 @@ impl<DB: Database> SharedPool<DB> {
None
}

/// Wait for a connection, if either `size` drops below `max_connections` so we can
/// open a new connection, or if an idle connection is returned to the pool.
///
/// On the first poll a `Waiter` is created from the task's context and a `Weak` reference
/// to it is pushed onto `self.waiters`; the future completes once that waiter reports
/// it has been woken.
///
/// # Errors
///
/// * `Error::PoolClosed` if the pool is already closed when this is called.
/// * Whatever error `deadline_as_timeout` returns for `deadline` (propagated with `?`).
/// * `Error::PoolTimedOut` if the timeout derived from `deadline` elapses before we are woken.
async fn wait_for_conn(&self, deadline: Instant) -> Result<(), Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}

// strong reference to our `Waiter`; only a `Weak` goes into the queue,
// so this local is what keeps the queued entry upgradable
let mut waiter = None;

timeout(
deadline_as_timeout::<DB>(deadline)?,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|cx| -> Poll<()> {
// only the first poll creates and enqueues the waiter; later polls reuse it
let waiter = waiter.get_or_insert_with(|| {
let waiter = Waiter::new(cx);
self.waiters.push(Arc::downgrade(&waiter));
waiter
});

if waiter.is_woken() {
Poll::Ready(())
} else {
Poll::Pending
}
}),
)
.await
// the timeout error carries no extra information; report it as a pool timeout
.map_err(|_| Error::PoolTimedOut)
}

/// Builds the shared pool state from `options` and `connect_options`, wraps it in an
/// `Arc`, and passes it to `spawn_reaper` before returning it.
pub(super) fn new_arc(
options: PoolOptions<DB>,
connect_options: <DB::Connection as Connection>::Options,
) -> Arc<Self> {
let pool = Self {
connect_options,
// the idle queue is bounded by `max_connections`;
// this read must happen before `options` is moved in below
idle_conns: ArrayQueue::new(options.max_connections as usize),
waiters: SegQueue::new(),
// the pool starts out with a size of zero and not closed
size: AtomicU32::new(0),
is_closed: AtomicBool::new(false),
options,
};

let pool = Arc::new(pool);

spawn_reaper(&pool);

pool
}

#[allow(clippy::needless_lifetimes)]
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> {
let start = Instant::now();
let deadline = start + self.options.connect_timeout;
let mut waited = !self.options.fair;
let mut backoff = 0.01;

// the strong ref of the `Weak<Waiter>` that we push to the queue
// initialized during the `timeout()` call below
// as long as we own this, we keep our place in line
let mut waiter = None;

// Unless the pool has been closed ...
while !self.is_closed() {
// Don't cut in line
// Don't cut in line unless no one is waiting
if waited || self.waiters.is_empty() {
// Attempt to immediately acquire a connection. This will return Some
// if there is an idle connection in our channel.
Expand All @@ -195,28 +165,40 @@ impl<DB: Database> SharedPool<DB> {
return Ok(live);
}
}
}

if let Some(guard) = self.try_increment_size() {
// pool has slots available; open a new connection
match self.connection(deadline, guard).await {
Ok(Some(conn)) => return Ok(conn),
// [size] is internally decremented on _retry_ and _error_
Ok(None) => {
// If the connection is refused wait in exponentially
// increasing steps for the server to come up, capped by
// two seconds.
sqlx_rt::sleep(std::time::Duration::from_secs_f64(backoff)).await;
backoff = f64::min(backoff * 2.0, 2.0);
continue;
}
Err(e) => return Err(e),
// check if we can open a new connection
if let Some(guard) = self.try_increment_size() {
// pool has slots available; open a new connection
return self.connection(deadline, guard).await;
}
}

// Wait for a connection to become available (or we are allowed to open a new one)
// Returns an error if `deadline` passes
self.wait_for_conn(deadline).await?;
let timeout_duration = cmp::min(
// Returns an error if `deadline` passes
deadline_as_timeout::<DB>(deadline)?,
MIN_WAKE_PERIOD,
);

sqlx_rt::timeout(
timeout_duration,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|cx| -> Poll<()> {
let waiter = waiter.get_or_insert_with(|| {
let waiter = Waiter::new(cx);
self.waiters.push(Arc::downgrade(&waiter));
waiter
});

if waiter.is_woken() {
Poll::Ready(())
} else {
Poll::Pending
}
}),
)
.await
.ok(); // timeout is no longer fatal here; we check if the deadline expired above

waited = true;
}
Expand All @@ -228,39 +210,51 @@ impl<DB: Database> SharedPool<DB> {
&'s self,
deadline: Instant,
guard: DecrementSizeGuard<'s>,
) -> Result<Option<Floating<'s, Live<DB>>>, Error> {
) -> Result<Floating<'s, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}

let timeout = super::deadline_as_timeout::<DB>(deadline)?;
let mut backoff = Duration::from_millis(10);
let max_backoff = deadline_as_timeout::<DB>(deadline)? / 5;

loop {
let timeout = deadline_as_timeout::<DB>(deadline)?;

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
if let Some(callback) = &self.options.after_connect {
callback(&mut raw).await?;
}

// result here is `Result<Result<C, Error>, TimeoutError>`
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
if let Some(callback) = &self.options.after_connect {
callback(&mut raw).await?;
return Ok(Floating::new_live(raw, guard));
}

Ok(Some(Floating::new_live(raw, guard)))
}
// an IO error while connecting is assumed to be the system starting up
Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => (),

// an IO error while connecting is assumed to be the system starting up
Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => Ok(None),
// TODO: Handle other database "boot period"s

// TODO: Handle other database "boot period"s
// [postgres] the database system is starting up
// TODO: Make this check actually check if this is postgres
Ok(Err(Error::Database(error))) if error.code().as_deref() == Some("57P03") => (),

// [postgres] the database system is starting up
// TODO: Make this check actually check if this is postgres
Ok(Err(Error::Database(error))) if error.code().as_deref() == Some("57P03") => Ok(None),
// Any other error while connection should immediately
// terminate and bubble the error up
Ok(Err(e)) => return Err(e),

// Any other error while connection should immediately
// terminate and bubble the error up
Ok(Err(e)) => Err(e),
// timed out
Err(_) => return Err(Error::PoolTimedOut),
}

// timed out
Err(_) => Err(Error::PoolTimedOut),
// If the connection is refused wait in exponentially
// increasing steps for the server to come up,
// capped by a factor of the remaining time until the deadline
sqlx_rt::sleep(backoff).await;
backoff = cmp::min(backoff * 2, max_backoff);
}
}
}
Expand Down Expand Up @@ -334,7 +328,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {

let pool = Arc::clone(&pool);

spawn(async move {
sqlx_rt::spawn(async move {
while !pool.is_closed.load(Ordering::Acquire) {
// reap at most the current size minus the minimum idle
let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
Expand All @@ -360,11 +354,21 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
let _ = conn.close().await;
}

sleep(period).await;
sqlx_rt::sleep(period).await;
}
});
}

/// Pops entries off `waiters` until one waiter is newly woken or the queue runs empty.
///
/// Entries whose `Weak` no longer upgrades, and waiters whose `wake()` returns `false`,
/// are discarded and the next entry is tried.
fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
    // `any` short-circuits, so nothing more is popped once a `wake()` call returns `true`.
    let _ = std::iter::from_fn(|| waiters.pop())
        .filter_map(|weak| weak.upgrade())
        .any(|waiter| waiter.wake());
}

/// RAII guard returned by `Pool::try_increment_size()` and others.
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
Expand Down Expand Up @@ -399,11 +403,7 @@ impl Drop for DecrementSizeGuard<'_> {
assert!(!self.dropped, "double-dropped!");
self.dropped = true;
self.size.fetch_sub(1, Ordering::SeqCst);
if let Some(waker) = self.waiters.pop() {
if let Some(waker) = waker.upgrade() {
waker.wake();
}
}
wake_one(&self.waiters);
}
}

Expand All @@ -420,9 +420,20 @@ impl Waiter {
})
}

fn wake(&self) {
self.woken.store(true, Ordering::Release);
self.waker.wake_by_ref();
/// Wake this waiter if it has not previously been woken.
///
/// Returns `true` if this call newly woke the waiter, or `false` if it was already woken.
fn wake(&self) -> bool {
    // Only the caller that flips `woken` from `false` to `true` invokes the waker,
    // so concurrent or repeated calls wake the task at most once.
    let newly_woken = self
        .woken
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();

    if newly_woken {
        self.waker.wake_by_ref();
    }

    newly_woken
}

fn is_woken(&self) -> bool {
Expand Down
16 changes: 7 additions & 9 deletions sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,14 @@ async fn init_min_connections<DB: Database>(pool: &SharedPool<DB>) -> Result<(),
// this guard will prevent us from exceeding `max_size`
if let Some(guard) = pool.try_increment_size() {
// [connect] will raise an error when past deadline
// [connect] returns None if its okay to retry
if let Some(conn) = pool.connection(deadline, guard).await? {
let is_ok = pool
.idle_conns
.push(conn.into_idle().into_leakable())
.is_ok();
let conn = pool.connection(deadline, guard).await?;
let is_ok = pool
.idle_conns
.push(conn.into_idle().into_leakable())
.is_ok();

if !is_ok {
panic!("BUG: connection queue overflow in init_min_connections");
}
if !is_ok {
panic!("BUG: connection queue overflow in init_min_connections");
}
}
}
Expand Down
Loading

0 comments on commit 5295ff1

Please sign in to comment.