Skip to content

Commit

Permalink
feat: Add set_connect_options method to Pool (#2088)
Browse files Browse the repository at this point in the history
* feat: Add set_connect_options method to Pool

This allows external updates of the ConnectionOptions used when a new
connection needs to be opened for the pool.  The primary use case
is to support dynamically updated (read: rotated) credentials used
by systems like AWS RDS.

* Use Arc wrapper for ConnectOptions to reduce lock contention

* sqlite fix

* Use direct assignment instead of mem::swap

Co-authored-by: Austin Bonander <[email protected]>

Co-authored-by: Austin Bonander <[email protected]>
  • Loading branch information
moatra and abonander committed Feb 18, 2023
1 parent e1080ce commit c0f73e9
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
7 changes: 6 additions & 1 deletion sqlx-core/src/mysql/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Write;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
Expand Down Expand Up @@ -152,7 +153,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
// Close connections ASAP if left in the idle queue.
.idle_timeout(Some(Duration::from_secs(1)))
.parent(master_pool.clone()),
connect_opts: master_pool.connect_options().clone().database(&new_db_name),
connect_opts: master_pool
.connect_options()
.deref()
.clone()
.database(&new_db_name),
db_name: new_db_name,
})
}
Expand Down
16 changes: 12 additions & 4 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_intrusive::sync::{Semaphore, SemaphoreReleaser};
use std::cmp;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::Poll;

use crate::pool::options::PoolConnectionMetadata;
Expand All @@ -20,7 +20,7 @@ use futures_util::FutureExt;
use std::time::{Duration, Instant};

pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
pub(super) semaphore: Semaphore,
pub(super) size: AtomicU32,
Expand All @@ -47,7 +47,7 @@ impl<DB: Database> PoolInner<DB> {
};

let pool = Self {
connect_options,
connect_options: RwLock::new(Arc::new(connect_options)),
idle_conns: ArrayQueue::new(capacity),
semaphore: Semaphore::new(options.fair, semaphore_capacity),
size: AtomicU32::new(0),
Expand Down Expand Up @@ -292,9 +292,17 @@ impl<DB: Database> PoolInner<DB> {
loop {
let timeout = deadline_as_timeout::<DB>(deadline)?;

// clone the connect options arc so it can be used without holding the RwLockReadGuard
// across an async await point
let connect_options = self
.connect_options
.read()
.expect("write-lock holder panicked")
.clone();

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
match sqlx_rt::timeout(timeout, connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
Expand Down
30 changes: 26 additions & 4 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use futures_core::FusedFuture;
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -489,9 +490,26 @@ impl<DB: Database> Pool<DB> {
self.0.num_idle()
}

/// Get the connection options for this pool
pub fn connect_options(&self) -> &<DB::Connection as Connection>::Options {
&self.0.connect_options
/// Gets a clone of the connection options for this pool
pub fn connect_options(&self) -> Arc<<DB::Connection as Connection>::Options> {
self.0
.connect_options
.read()
.expect("write-lock holder panicked")
.clone()
}

/// Updates the connection options this pool will use when opening any future connections. Any
/// existing open connection in the pool will be left as-is.
pub fn set_connect_options(&self, connect_options: <DB::Connection as Connection>::Options) {
// technically write() could also panic if the current thread already holds the lock,
// but because this method can't be re-entered by the same thread that shouldn't be a problem
let mut guard = self
.0
.connect_options
.write()
.expect("write-lock holder panicked");
*guard = Arc::new(connect_options);
}

/// Get the options for this pool
Expand All @@ -514,7 +532,11 @@ impl Pool<Any> {
///
/// Determined by the connection URL.
pub fn any_kind(&self) -> AnyKind {
self.0.connect_options.kind()
self.0
.connect_options
.read()
.expect("write-lock holder panicked")
.kind()
}
}

Expand Down
7 changes: 6 additions & 1 deletion sqlx-core/src/postgres/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Write;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
Expand Down Expand Up @@ -159,7 +160,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
// Close connections ASAP if left in the idle queue.
.idle_timeout(Some(Duration::from_secs(1)))
.parent(master_pool.clone()),
connect_opts: master_pool.connect_options().clone().database(&new_db_name),
connect_opts: master_pool
.connect_options()
.deref()
.clone()
.database(&new_db_name),
db_name: new_db_name,
})
}
Expand Down

0 comments on commit c0f73e9

Please sign in to comment.