diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 11f765f618..bbf6ed8168 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -12,14 +12,14 @@ use std::cmp; use std::mem; use std::ptr; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::task::Context; use std::time::Instant; pub(crate) struct SharedPool { pub(super) connect_options: ::Options, pub(super) idle_conns: ArrayQueue>, - waiters: SegQueue>, + waiters: SegQueue>, pub(super) size: AtomicU32, is_closed: AtomicBool, pub(super) options: PoolOptions, @@ -42,7 +42,9 @@ impl SharedPool { pub(super) async fn close(&self) { self.is_closed.store(true, Ordering::Release); while let Some(waker) = self.waiters.pop() { - waker.wake(); + if let Some(waker) = waker.upgrade() { + waker.wake(); + } } // ensure we wait until the pool is actually closed @@ -92,8 +94,11 @@ impl SharedPool { panic!("BUG: connection queue overflow in release()"); } - if let Some(waker) = self.waiters.pop() { - waker.wake(); + while let Some(waker) = self.waiters.pop() { + if let Some(waker) = waker.upgrade() { + waker.wake(); + break; + } } } @@ -137,7 +142,7 @@ impl SharedPool { future::poll_fn(|cx| -> Poll<()> { let waiter = waiter.get_or_insert_with(|| { let waiter = Waiter::new(cx); - self.waiters.push(waiter.clone()); + self.waiters.push(Arc::downgrade(&waiter)); waiter }); @@ -358,7 +363,7 @@ fn spawn_reaper(pool: &Arc>) { /// (where the pool thinks it has more connections than it does). pub(in crate::pool) struct DecrementSizeGuard<'a> { size: &'a AtomicU32, - waiters: &'a SegQueue>, + waiters: &'a SegQueue>, dropped: bool, } @@ -387,7 +392,9 @@ impl Drop for DecrementSizeGuard<'_> { self.dropped = true; self.size.fetch_sub(1, Ordering::SeqCst); if let Some(waker) = self.waiters.pop() { - waker.wake(); + if let Some(waker) = waker.upgrade() { + waker.wake(); + } } } }