Skip to content

Commit

Permalink
fix(pool): reimplement pool internals with futures-intrusive
Browse files Browse the repository at this point in the history
  • Loading branch information
abonander committed Jul 16, 2021
1 parent e33e451 commit c685a96
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 233 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ encoding_rs = { version = "0.8.23", optional = true }
either = "1.5.3"
futures-channel = { version = "0.3.5", default-features = false, features = ["sink", "alloc", "std"] }
futures-core = { version = "0.3.5", default-features = false }
futures-intrusive = "0.4.0"
futures-util = { version = "0.3.5", features = ["sink"] }
generic-array = { version = "0.14.4", default-features = false, optional = true }
hex = "0.4.2"
Expand Down
42 changes: 29 additions & 13 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::inner::{DecrementSizeGuard, SharedPool};
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use sqlx_rt::spawn;
use std::fmt::{self, Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Instant;

use futures_intrusive::sync::SemaphoreReleaser;

use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;

use super::inner::{DecrementSizeGuard, SharedPool};

/// A connection managed by a [`Pool`][crate::pool::Pool].
///
/// Will be returned to the pool on-drop.
Expand All @@ -28,8 +31,8 @@ pub(super) struct Idle<DB: Database> {

/// RAII wrapper for connections being handled by functions that may drop them
pub(super) struct Floating<'p, C> {
inner: C,
guard: DecrementSizeGuard<'p>,
pub(super) inner: C,
pub(super) guard: DecrementSizeGuard<'p>,
}

const DEREF_ERR: &str = "(bug) connection already released to pool";
Expand Down Expand Up @@ -71,7 +74,7 @@ impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(live) = self.live.take() {
let pool = self.pool.clone();
spawn(async move {
sqlx_rt::spawn(async move {
let mut floating = live.float(&pool);

// test the connection on-release to ensure it is still viable
Expand Down Expand Up @@ -102,7 +105,8 @@ impl<DB: Database> Live<DB> {
pub fn float(self, pool: &SharedPool<DB>) -> Floating<'_, Self> {
Floating {
inner: self,
guard: DecrementSizeGuard::new(pool),
// create a new guard from a previously leaked permit
guard: DecrementSizeGuard::new_permit(pool),
}
}

Expand Down Expand Up @@ -161,6 +165,11 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
}
}

pub async fn close(self) -> Result<(), Error> {
// `guard` is dropped as intended
self.inner.raw.close().await
}

pub fn detach(self) -> DB::Connection {
self.inner.raw
}
Expand All @@ -174,10 +183,14 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
}

impl<'s, DB: Database> Floating<'s, Idle<DB>> {
pub fn from_idle(idle: Idle<DB>, pool: &'s SharedPool<DB>) -> Self {
pub fn from_idle(
idle: Idle<DB>,
pool: &'s SharedPool<DB>,
permit: SemaphoreReleaser<'s>,
) -> Self {
Self {
inner: idle,
guard: DecrementSizeGuard::new(pool),
guard: DecrementSizeGuard::from_permit(pool, permit),
}
}

Expand All @@ -192,9 +205,12 @@ impl<'s, DB: Database> Floating<'s, Idle<DB>> {
}
}

pub async fn close(self) -> Result<(), Error> {
pub async fn close(self) -> DecrementSizeGuard<'s> {
// `guard` is dropped as intended
self.inner.live.raw.close().await
if let Err(e) = self.inner.live.raw.close().await {
log::debug!("error occurred while closing the pool connection: {}", e);
}
self.guard
}
}

Expand Down
Loading

0 comments on commit c685a96

Please sign in to comment.