From 8f8b4fea1859c5ef7e879b0a3c217520482c1d29 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sat, 20 May 2023 14:19:32 -0700 Subject: [PATCH 1/2] Bump to event-listener v3.0.0 The new version of event-listener comes with several new changes, including less heap allocation and no_std support. This is a breaking change as it makes a handful of futures !Unpin. Signed-off-by: John Nunley --- Cargo.toml | 8 +- src/barrier.rs | 67 +++---- src/mutex.rs | 408 ++++++++++++++++++++++-------------------- src/once_cell.rs | 69 +++---- src/rwlock/futures.rs | 252 ++++++++++++++------------ src/rwlock/raw.rs | 252 +++++++++++++------------- src/semaphore.rs | 37 ++-- tests/common/mod.rs | 8 +- 8 files changed, 565 insertions(+), 536 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 82ab302..6c0fd8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,9 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -event-listener = "2.5.1" +event-listener = "2" +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener.git" } +pin-project-lite = "0.2.11" [dev-dependencies] async-channel = "1.5.0" @@ -25,3 +27,7 @@ waker-fn = "1.1.0" [target.'cfg(any(target_arch = "wasm32", target_arch = "wasm64"))'.dev-dependencies] wasm-bindgen-test = "0.3" + +[patch.crates-io] +async-channel = { git = "https://github.com/smol-rs/async-channel.git", branch = "notgull/evl-3.0" } +event-listener = { git = "https://github.com/smol-rs/event-listener.git" } diff --git a/src/barrier.rs b/src/barrier.rs index e993db2..e01a563 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -82,21 +82,29 @@ impl Barrier { BarrierWait { barrier: self, lock: Some(self.state.lock()), + evl: EventListener::new(&self.event), state: WaitState::Initial, } } } -/// The future returned by [`Barrier::wait()`]. -pub struct BarrierWait<'a> { - /// The barrier to wait on. - barrier: &'a Barrier, +pin_project_lite::pin_project! { + /// The future returned by [`Barrier::wait()`]. + pub struct BarrierWait<'a> { + // The barrier to wait on. + barrier: &'a Barrier, - /// The ongoing mutex lock operation we are blocking on. - lock: Option>, + // The ongoing mutex lock operation we are blocking on. + #[pin] + lock: Option>, - /// The current state of the future. - state: WaitState, + // An event listener for the `barrier.event` event. + #[pin] + evl: EventListener, + + // The current state of the future. + state: WaitState, + } } impl fmt::Debug for BarrierWait<'_> { @@ -110,34 +118,32 @@ enum WaitState { Initial, /// We are waiting for the listener to complete. - Waiting { evl: EventListener, local_gen: u64 }, + Waiting { local_gen: u64 }, /// Waiting to re-acquire the lock to check the state again. - Reacquiring(u64), + Reacquiring { local_gen: u64 }, } impl Future for BarrierWait<'_> { type Output = BarrierWaitResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.project(); loop { match this.state { WaitState::Initial => { // See if the lock is ready yet. - let mut state = ready!(Pin::new(this.lock.as_mut().unwrap()).poll(cx)); - this.lock = None; + let mut state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx)); + this.lock.set(None); let local_gen = state.generation_id; state.count += 1; if state.count < this.barrier.n { // We need to wait for the event. - this.state = WaitState::Waiting { - evl: this.barrier.event.listen(), - local_gen, - }; + this.evl.as_mut().listen(); + *this.state = WaitState::Waiting { local_gen }; } else { // We are the last one. state.count = 0; @@ -147,27 +153,26 @@ impl Future for BarrierWait<'_> { } } - WaitState::Waiting { - ref mut evl, - local_gen, - } => { - ready!(Pin::new(evl).poll(cx)); + WaitState::Waiting { local_gen } => { + ready!(this.evl.as_mut().poll(cx)); // We are now re-acquiring the mutex. - this.lock = Some(this.barrier.state.lock()); - this.state = WaitState::Reacquiring(local_gen); + this.lock.set(Some(this.barrier.state.lock())); + *this.state = WaitState::Reacquiring { + local_gen: *local_gen, + }; } - WaitState::Reacquiring(local_gen) => { + WaitState::Reacquiring { local_gen } => { // Acquire the local state again. - let state = ready!(Pin::new(this.lock.as_mut().unwrap()).poll(cx)); - this.lock = None; + let state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx)); + this.lock.set(None); - if local_gen == state.generation_id && state.count < this.barrier.n { + if *local_gen == state.generation_id && state.count < this.barrier.n { // We need to wait for the event again. - this.state = WaitState::Waiting { - evl: this.barrier.event.listen(), - local_gen, + this.evl.as_mut().listen(); + *this.state = WaitState::Waiting { + local_gen: *local_gen, }; } else { // We are ready, but not the leader. diff --git a/src/mutex.rs b/src/mutex.rs index 37d5ad3..df9e154 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,15 +1,13 @@ use std::borrow::Borrow; use std::cell::UnsafeCell; use std::fmt; -use std::future::Future; use std::marker::PhantomData; -use std::mem; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::process; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::Poll; // Note: we cannot use `target_family = "wasm"` here because it requires Rust 1.54. #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] @@ -18,6 +16,7 @@ use std::time::{Duration, Instant}; use std::usize; use event_listener::{Event, EventListener}; +use event_listener_strategy::{easy_wrapper, EventListenerFuture}; /// An async mutex. /// @@ -109,10 +108,10 @@ impl Mutex { /// ``` #[inline] pub fn lock(&self) -> Lock<'_, T> { - Lock { + Lock::_new(LockInner { mutex: self, acquire_slow: None, - } + }) } /// Attempts to acquire the mutex. @@ -197,7 +196,9 @@ impl Mutex { /// ``` #[inline] pub fn lock_arc(self: &Arc) -> LockArc { - LockArc(LockArcInnards::Unpolled(self.clone())) + LockArc::_new(LockArcInnards::Unpolled { + mutex: Some(self.clone()), + }) } /// Attempts to acquire the mutex and clone a reference to it. @@ -259,158 +260,190 @@ impl Default for Mutex { } } -/// The future returned by [`Mutex::lock`]. -pub struct Lock<'a, T: ?Sized> { - /// Reference to the mutex. - mutex: &'a Mutex, +easy_wrapper! { + /// The future returned by [`Mutex::lock`]. + pub struct Lock<'a, T: ?Sized>(LockInner<'a, T> => MutexGuard<'a, T>); + pub(crate) wait(); +} + +pin_project_lite::pin_project! { + /// Inner future for acquiring the mutex. + struct LockInner<'a, T: ?Sized> { + // Reference to the mutex. + mutex: &'a Mutex, - /// The future that waits for the mutex to become available. - acquire_slow: Option, T>>, + // The future that waits for the mutex to become available. + #[pin] + acquire_slow: Option, T>>, + } } unsafe impl Send for Lock<'_, T> {} unsafe impl Sync for Lock<'_, T> {} -impl<'a, T: ?Sized> Unpin for Lock<'a, T> {} - -impl fmt::Debug for Lock<'_, T> { +impl fmt::Debug for LockInner<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Lock { .. }") } } -impl<'a, T: ?Sized> Future for Lock<'a, T> { +impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> { type Output = MutexGuard<'a, T>; #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - loop { - match this.acquire_slow.as_mut() { + fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>( + self: Pin<&'x mut Self>, + strategy: &mut S, + context: &mut S::Context, + ) -> Poll { + let mut this = self.project(); + + // This may seem weird, but the borrow checker complains otherwise. + if this.acquire_slow.is_none() { + match this.mutex.try_lock() { + Some(guard) => return Poll::Ready(guard), None => { - // Try the fast path before trying to register slowly. - match this.mutex.try_lock() { - Some(guard) => return Poll::Ready(guard), - None => { - this.acquire_slow = Some(AcquireSlow::new(this.mutex)); - } - } - } - - Some(acquire_slow) => { - // Continue registering slowly. - let value = ready!(Pin::new(acquire_slow).poll(cx)); - return Poll::Ready(MutexGuard(value)); + this.acquire_slow.set(Some(AcquireSlow::new(this.mutex))); } } } + + ready!(this + .acquire_slow + .as_pin_mut() + .unwrap() + .poll_with_strategy(strategy, context)); + Poll::Ready(MutexGuard(this.mutex)) } } -/// The future returned by [`Mutex::lock_arc`]. -pub struct LockArc(LockArcInnards); - -enum LockArcInnards { - /// We have not tried to poll the fast path yet. - Unpolled(Arc>), +easy_wrapper! { + /// The future returned by [`Mutex::lock_arc`]. + pub struct LockArc(LockArcInnards => MutexGuardArc); + pub(crate) wait(); +} - /// We are acquiring the mutex through the slow path. - AcquireSlow(AcquireSlow>, T>), +pin_project_lite::pin_project! { + #[project = LockArcInnardsProj] + enum LockArcInnards { + /// We have not tried to poll the fast path yet. + Unpolled { mutex: Option>> }, - /// Empty hole to make taking easier. - Empty, + /// We are acquiring the mutex through the slow path. + AcquireSlow { + #[pin] + inner: AcquireSlow>, T> + }, + } } unsafe impl Send for LockArc {} unsafe impl Sync for LockArc {} -impl Unpin for LockArc {} - -impl fmt::Debug for LockArc { +impl fmt::Debug for LockArcInnards { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("LockArc { .. }") } } -impl Future for LockArc { +impl EventListenerFuture for LockArcInnards { type Output = MutexGuardArc; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - loop { - match mem::replace(&mut this.0, LockArcInnards::Empty) { - LockArcInnards::Unpolled(mutex) => { - // Try the fast path before trying to register slowly. - match mutex.try_lock_arc() { - Some(guard) => return Poll::Ready(guard), - None => { - *this = LockArc(LockArcInnards::AcquireSlow(AcquireSlow::new( - mutex.clone(), - ))); - } - } - } + fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>( + mut self: Pin<&'a mut Self>, + strategy: &mut S, + context: &mut S::Context, + ) -> Poll { + // Set the inner future if needed. + if let LockArcInnardsProj::Unpolled { mutex } = self.as_mut().project() { + let mutex = mutex.take().expect("mutex taken more than once"); + + // Try the fast path before trying to register slowly. + if let Some(guard) = mutex.try_lock_arc() { + return Poll::Ready(guard); + } - LockArcInnards::AcquireSlow(mut acquire_slow) => { - // Continue registering slowly. - let value = match Pin::new(&mut acquire_slow).poll(cx) { - Poll::Pending => { - *this = LockArc(LockArcInnards::AcquireSlow(acquire_slow)); - return Poll::Pending; - } - Poll::Ready(value) => value, - }; - return Poll::Ready(MutexGuardArc(value)); - } + // Set the inner future to the slow acquire path. + self.as_mut().set(LockArcInnards::AcquireSlow { + inner: AcquireSlow::new(mutex), + }); + } - LockArcInnards::Empty => panic!("future polled after completion"), + // Poll the inner future. + let value = match self.project() { + LockArcInnardsProj::AcquireSlow { inner } => { + ready!(inner.poll_with_strategy(strategy, context)) } - } + _ => unreachable!(), + }; + + Poll::Ready(MutexGuardArc(value)) } } -/// Future for acquiring the mutex slowly. -struct AcquireSlow>, T: ?Sized> { - /// Reference to the mutex. - mutex: Option, +pin_project_lite::pin_project! { + /// Future for acquiring the mutex slowly. + struct AcquireSlow>, T: ?Sized> { + // Reference to the mutex. + mutex: Option, - /// The event listener waiting on the mutex. - listener: Option, + // The event listener waiting on the mutex. + #[pin] + listener: EventListener, - /// The point at which the mutex lock was started. - #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] - start: Option, + // The point at which the mutex lock was started. + start: Start, - /// This lock operation is starving. - starved: bool, + // This lock operation is starving. + starved: bool, - /// Capture the `T` lifetime. - _marker: PhantomData, + // Capture the `T` lifetime. + #[pin] + _marker: PhantomData, + } + + impl>> PinnedDrop for AcquireSlow { + fn drop(this: Pin<&mut Self>) { + // Make sure the starvation counter is decremented. + this.take_mutex(); + } + } } -impl> + Unpin, T: ?Sized> Unpin for AcquireSlow {} +/// `pin_project_lite` doesn't support `#[cfg]` yet, so we have to do this manually. +struct Start { + #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] + start: Option, +} impl>> AcquireSlow { /// Create a new `AcquireSlow` future. #[cold] fn new(mutex: B) -> Self { + // Create a new instance of the listener. + let listener = { + let mutex = Borrow::>::borrow(&mutex); + EventListener::new(&mutex.lock_ops) + }; + AcquireSlow { mutex: Some(mutex), - listener: None, - #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] - start: None, + listener, + start: Start { + #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] + start: None, + }, starved: false, _marker: PhantomData, } } /// Take the mutex reference out, decrementing the counter if necessary. - fn take_mutex(&mut self) -> Option { - let mutex = self.mutex.take(); + fn take_mutex(self: Pin<&mut Self>) -> Option { + let this = self.project(); + let mutex = this.mutex.take(); - if self.starved { + if *this.starved { if let Some(mutex) = mutex.as_ref() { // Decrement this counter before we exit. mutex.borrow().state.fetch_sub(2, Ordering::Release); @@ -421,78 +454,74 @@ impl>> AcquireSlow { } } -impl>> Future for AcquireSlow { +impl>> EventListenerFuture for AcquireSlow { type Output = B; #[cold] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; + fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>( + mut self: Pin<&'a mut Self>, + strategy: &mut S, + context: &mut S::Context, + ) -> Poll { + let mut this = self.as_mut().project(); #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] - let start = *this.start.get_or_insert_with(Instant::now); - let mutex = this - .mutex - .as_ref() - .expect("future polled after completion") - .borrow(); + let start = *this.start.start.get_or_insert_with(Instant::now); + let mutex = Borrow::>::borrow( + this.mutex.as_ref().expect("future polled after completion"), + ); // Only use this hot loop if we aren't currently starved. - if !this.starved { + if !*this.starved { loop { // Start listening for events. - match &mut this.listener { - None => { - // Start listening for events. - this.listener = Some(mutex.lock_ops.listen()); - - // Try locking if nobody is being starved. - match mutex - .state - .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) - .unwrap_or_else(|x| x) - { - // Lock acquired! - 0 => return Poll::Ready(this.take_mutex().unwrap()), - - // Lock is held and nobody is starved. - 1 => {} - - // Somebody is starved. - _ => break, - } + if !this.listener.is_listening() { + this.listener.as_mut().listen(); + + // Try locking if nobody is being starved. + match mutex + .state + .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) + .unwrap_or_else(|x| x) + { + // Lock acquired! + 0 => return Poll::Ready(self.take_mutex().unwrap()), + + // Lock is held and nobody is starved. + 1 => {} + + // Somebody is starved. + _ => break, } - Some(ref mut listener) => { - // Wait for a notification. - ready!(Pin::new(listener).poll(cx)); - this.listener = None; - - // Try locking if nobody is being starved. - match mutex - .state - .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) - .unwrap_or_else(|x| x) - { - // Lock acquired! - 0 => return Poll::Ready(this.take_mutex().unwrap()), - - // Lock is held and nobody is starved. - 1 => {} - - // Somebody is starved. - _ => { - // Notify the first listener in line because we probably received a - // notification that was meant for a starved task. - mutex.lock_ops.notify(1); - break; - } - } + } else { + ready!(strategy.poll(this.listener.as_mut(), context)); + + // Try locking if nobody is being starved. + match mutex + .state + .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) + .unwrap_or_else(|x| x) + { + // Lock acquired! + 0 => return Poll::Ready(self.take_mutex().unwrap()), - // If waiting for too long, fall back to a fairer locking strategy that will prevent - // newer lock operations from starving us forever. - #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] - if start.elapsed() > Duration::from_micros(500) { + // Lock is held and nobody is starved. + 1 => {} + + // Somebody is starved. + _ => { + // Notify the first listener in line because we probably received a + // notification that was meant for a starved task. + mutex.lock_ops.notify(1); break; } } + + // If waiting for too long, fall back to a fairer locking strategy that will prevent + // newer lock operations from starving us forever. + #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] + if start.elapsed() > Duration::from_micros(500) { + break; + } } } @@ -503,57 +532,46 @@ impl>> Future for AcquireSlow { } // Indicate that we are now starving and will use a fairer locking strategy. - this.starved = true; + *this.starved = true; } // Fairer locking loop. loop { - match &mut this.listener { - None => { - // Start listening for events. - this.listener = Some(mutex.lock_ops.listen()); - - // Try locking if nobody else is being starved. - match mutex - .state - .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire) - .unwrap_or_else(|x| x) - { - // Lock acquired! - 2 => return Poll::Ready(this.take_mutex().unwrap()), - - // Lock is held by someone. - s if s % 2 == 1 => {} - - // Lock is available. - _ => { - // Be fair: notify the first listener and then go wait in line. - mutex.lock_ops.notify(1); - } + if !this.listener.is_listening() { + // Start listening for events. + this.listener.as_mut().listen(); + + // Try locking if nobody else is being starved. + match mutex + .state + .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire) + .unwrap_or_else(|x| x) + { + // Lock acquired! + 2 => return Poll::Ready(self.take_mutex().unwrap()), + + // Lock is held by someone. + s if s % 2 == 1 => {} + + // Lock is available. + _ => { + // Be fair: notify the first listener and then go wait in line. + mutex.lock_ops.notify(1); } } - Some(ref mut listener) => { - // Wait for a notification. - ready!(Pin::new(listener).poll(cx)); - this.listener = None; - - // Try acquiring the lock without waiting for others. - if mutex.state.fetch_or(1, Ordering::Acquire) % 2 == 0 { - return Poll::Ready(this.take_mutex().unwrap()); - } + } else { + // Wait for a notification. + ready!(strategy.poll(this.listener.as_mut(), context)); + + // Try acquiring the lock without waiting for others. + if mutex.state.fetch_or(1, Ordering::Acquire) % 2 == 0 { + return Poll::Ready(self.take_mutex().unwrap()); } } } } } -impl>> Drop for AcquireSlow { - fn drop(&mut self) { - // Make sure the starvation counter is decremented. - self.take_mutex(); - } -} - /// A guard that releases the mutex when dropped. #[clippy::has_significant_drop] pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex); diff --git a/src/once_cell.rs b/src/once_cell.rs index 978f57c..42348f8 100644 --- a/src/once_cell.rs +++ b/src/once_cell.rs @@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use event_listener::{Event, EventListener}; +use event_listener_strategy::{Blocking, NonBlocking, Strategy}; /// The current state of the `OnceCell`. #[derive(Copy, Clone, PartialEq, Eq)] @@ -84,12 +85,15 @@ pub struct OnceCell { /// /// These are the users of get_or_init() and similar functions. active_initializers: Event, + /// Listeners waiting for the cell to be initialized. /// /// These are the users of wait(). passive_waiters: Event, + /// State associated with the cell. state: AtomicUsize, + /// The value of the cell. value: UnsafeCell>, } @@ -268,7 +272,9 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = self.passive_waiters.listen(); + let listener = EventListener::new(&self.passive_waiters); + pin!(listener); + listener.as_mut().listen(); // Try again. if let Some(value) = self.get() { @@ -320,7 +326,9 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = self.passive_waiters.listen(); + let listener = EventListener::new(&self.passive_waiters); + pin!(listener); + listener.as_mut().listen(); // Try again. if let Some(value) = self.get() { @@ -372,7 +380,8 @@ impl OnceCell { } // Slow path: initialize the value. - self.initialize_or_wait(closure, &mut NonBlocking).await?; + self.initialize_or_wait(closure, &mut NonBlocking::default()) + .await?; debug_assert!(self.is_initialized()); // SAFETY: We know that the value is initialized, so it is safe to @@ -425,9 +434,10 @@ impl OnceCell { // Slow path: initialize the value. // The futures provided should never block, so we can use `now_or_never`. - now_or_never( - self.initialize_or_wait(move || std::future::ready(closure()), &mut Blocking), - )?; + now_or_never(self.initialize_or_wait( + move || std::future::ready(closure()), + &mut Blocking::default(), + ))?; debug_assert!(self.is_initialized()); // SAFETY: We know that the value is initialized, so it is safe to @@ -572,10 +582,11 @@ impl OnceCell { async fn initialize_or_wait>, F: FnOnce() -> Fut>( &self, closure: F, - strategy: &mut impl Strategy, + strategy: &mut impl for<'a> Strategy<'a>, ) -> Result<(), E> { // The event listener we're currently waiting on. - let mut event_listener = None; + let event_listener = EventListener::new(&self.active_initializers); + pin!(event_listener); let mut closure = Some(closure); @@ -594,12 +605,10 @@ impl OnceCell { // but we do not have the ability to initialize it. // // We need to wait the initialization to complete. - match event_listener.take() { - None => { - event_listener = Some(self.active_initializers.listen()); - } - - Some(evl) => strategy.poll(evl).await, + if event_listener.is_listening() { + strategy.wait(event_listener.as_mut()).await; + } else { + event_listener.as_mut().listen(); } } State::Uninitialized => { @@ -771,35 +780,3 @@ fn now_or_never(f: impl Future) -> T { Poll::Pending => unreachable!("future not ready"), } } - -/// The strategy for polling an `event_listener::EventListener`. -trait Strategy { - /// The future that can be polled to wait on the listener. - type Fut: Future; - - /// Poll the event listener. - fn poll(&mut self, evl: EventListener) -> Self::Fut; -} - -/// The strategy for blocking the current thread on an `EventListener`. -struct Blocking; - -impl Strategy for Blocking { - type Fut = std::future::Ready<()>; - - fn poll(&mut self, evl: EventListener) -> Self::Fut { - evl.wait(); - std::future::ready(()) - } -} - -/// The strategy for polling an `EventListener` in an async context. -struct NonBlocking; - -impl Strategy for NonBlocking { - type Fut = EventListener; - - fn poll(&mut self, evl: EventListener) -> Self::Fut { - evl - } -} diff --git a/src/rwlock/futures.rs b/src/rwlock/futures.rs index a65d7dc..b9aea7b 100644 --- a/src/rwlock/futures.rs +++ b/src/rwlock/futures.rs @@ -11,13 +11,16 @@ use super::{ RwLockUpgradableReadGuardArc, RwLockWriteGuard, RwLockWriteGuardArc, }; -/// The future returned by [`RwLock::read`]. -pub struct Read<'a, T: ?Sized> { - /// Raw read lock acquisition future, doesn't depend on `T`. - pub(super) raw: RawRead<'a>, - - /// Pointer to the value protected by the lock. Covariant in `T`. - pub(super) value: *const T, +pin_project_lite::pin_project! { + /// The future returned by [`RwLock::read`]. + pub struct Read<'a, T: ?Sized> { + // Raw read lock acquisition future, doesn't depend on `T`. + #[pin] + pub(super) raw: RawRead<'a>, + + // Pointer to the value protected by the lock. Covariant in `T`. + pub(super) value: *const T, + } } unsafe impl Send for Read<'_, T> {} @@ -30,29 +33,31 @@ impl fmt::Debug for Read<'_, T> { } } -impl Unpin for Read<'_, T> {} - impl<'a, T: ?Sized> Future for Read<'a, T> { type Output = RwLockReadGuard<'a, T>; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(Pin::new(&mut self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.raw.as_mut().poll(cx)); Poll::Ready(RwLockReadGuard { - lock: self.raw.lock, - value: self.value, + lock: this.raw.lock, + value: *this.value, }) } } -/// The future returned by [`RwLock::read_arc`]. -pub struct ReadArc<'a, T> { - /// Raw read lock acquisition future, doesn't depend on `T`. - pub(super) raw: RawRead<'a>, +pin_project_lite::pin_project! { + /// The future returned by [`RwLock::read_arc`]. + pub struct ReadArc<'a, T> { + // Raw read lock acquisition future, doesn't depend on `T`. + #[pin] + pub(super) raw: RawRead<'a>, - // FIXME: Could be covariant in T - pub(super) lock: &'a Arc>, + // FIXME: Could be covariant in T + pub(super) lock: &'a Arc>, + } } unsafe impl Send for ReadArc<'_, T> {} @@ -65,28 +70,30 @@ impl fmt::Debug for ReadArc<'_, T> { } } -impl Unpin for ReadArc<'_, T> {} - impl<'a, T> Future for ReadArc<'a, T> { type Output = RwLockReadGuardArc; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(Pin::new(&mut self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.raw.as_mut().poll(cx)); // SAFETY: we just acquired a read lock - Poll::Ready(unsafe { RwLockReadGuardArc::from_arc(self.lock.clone()) }) + Poll::Ready(unsafe { RwLockReadGuardArc::from_arc(this.lock.clone()) }) } } -/// The future returned by [`RwLock::upgradable_read`]. -pub struct UpgradableRead<'a, T: ?Sized> { - /// Raw upgradable read lock acquisition future, doesn't depend on `T`. - pub(super) raw: RawUpgradableRead<'a>, +pin_project_lite::pin_project! { + /// The future returned by [`RwLock::upgradable_read`]. + pub struct UpgradableRead<'a, T: ?Sized> { + // Raw upgradable read lock acquisition future, doesn't depend on `T`. + #[pin] + pub(super) raw: RawUpgradableRead<'a>, - /// Pointer to the value protected by the lock. Invariant in `T` - /// as the upgradable lock could provide write access. - pub(super) value: *mut T, + // Pointer to the value protected by the lock. Invariant in `T` + // as the upgradable lock could provide write access. + pub(super) value: *mut T, + } } unsafe impl Send for UpgradableRead<'_, T> {} @@ -99,28 +106,30 @@ impl fmt::Debug for UpgradableRead<'_, T> { } } -impl Unpin for UpgradableRead<'_, T> {} - impl<'a, T: ?Sized> Future for UpgradableRead<'a, T> { type Output = RwLockUpgradableReadGuard<'a, T>; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(Pin::new(&mut self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.raw.as_mut().poll(cx)); Poll::Ready(RwLockUpgradableReadGuard { - lock: self.raw.lock, - value: self.value, + lock: this.raw.lock, + value: *this.value, }) } } -/// The future returned by [`RwLock::upgradable_read_arc`]. -pub struct UpgradableReadArc<'a, T: ?Sized> { - /// Raw upgradable read lock acquisition future, doesn't depend on `T`. - pub(super) raw: RawUpgradableRead<'a>, +pin_project_lite::pin_project! { + /// The future returned by [`RwLock::upgradable_read_arc`]. + pub struct UpgradableReadArc<'a, T: ?Sized> { + // Raw upgradable read lock acquisition future, doesn't depend on `T`. + #[pin] + pub(super) raw: RawUpgradableRead<'a>, - pub(super) lock: &'a Arc>, + pub(super) lock: &'a Arc>, + } } unsafe impl Send for UpgradableReadArc<'_, T> {} @@ -133,27 +142,29 @@ impl fmt::Debug for UpgradableReadArc<'_, T> { } } -impl Unpin for UpgradableReadArc<'_, T> {} - impl<'a, T: ?Sized> Future for UpgradableReadArc<'a, T> { type Output = RwLockUpgradableReadGuardArc; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(Pin::new(&mut self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.raw.as_mut().poll(cx)); Poll::Ready(RwLockUpgradableReadGuardArc { - lock: self.lock.clone(), + lock: this.lock.clone(), }) } } -/// The future returned by [`RwLock::write`]. -pub struct Write<'a, T: ?Sized> { - /// Raw write lock acquisition future, doesn't depend on `T`. - pub(super) raw: RawWrite<'a>, +pin_project_lite::pin_project! { + /// The future returned by [`RwLock::write`]. + pub struct Write<'a, T: ?Sized> { + // Raw write lock acquisition future, doesn't depend on `T`. + #[pin] + pub(super) raw: RawWrite<'a>, - /// Pointer to the value protected by the lock. Invariant in `T`. - pub(super) value: *mut T, + // Pointer to the value protected by the lock. Invariant in `T`. + pub(super) value: *mut T, + } } unsafe impl Send for Write<'_, T> {} @@ -166,28 +177,30 @@ impl fmt::Debug for Write<'_, T> { } } -impl Unpin for Write<'_, T> {} - impl<'a, T: ?Sized> Future for Write<'a, T> { type Output = RwLockWriteGuard<'a, T>; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(Pin::new(&mut self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.raw.as_mut().poll(cx)); Poll::Ready(RwLockWriteGuard { - lock: self.raw.lock, - value: self.value, + lock: this.raw.lock, + value: *this.value, }) } } -/// The future returned by [`RwLock::write_arc`]. -pub struct WriteArc<'a, T: ?Sized> { - /// Raw write lock acquisition future, doesn't depend on `T`. - pub(super) raw: RawWrite<'a>, +pin_project_lite::pin_project! { + /// The future returned by [`RwLock::write_arc`]. + pub struct WriteArc<'a, T: ?Sized> { + // Raw write lock acquisition future, doesn't depend on `T`. + #[pin] + pub(super) raw: RawWrite<'a>, - pub(super) lock: &'a Arc>, + pub(super) lock: &'a Arc>, + } } unsafe impl Send for WriteArc<'_, T> {} @@ -200,28 +213,30 @@ impl fmt::Debug for WriteArc<'_, T> { } } -impl Unpin for WriteArc<'_, T> {} - impl<'a, T: ?Sized> Future for WriteArc<'a, T> { type Output = RwLockWriteGuardArc; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(Pin::new(&mut self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + ready!(this.raw.as_mut().poll(cx)); Poll::Ready(RwLockWriteGuardArc { - lock: self.lock.clone(), + lock: this.lock.clone(), }) } } -/// The future returned by [`RwLockUpgradableReadGuard::upgrade`]. -pub struct Upgrade<'a, T: ?Sized> { - /// Raw read lock upgrade future, doesn't depend on `T`. - pub(super) raw: RawUpgrade<'a>, +pin_project_lite::pin_project! { + /// The future returned by [`RwLockUpgradableReadGuard::upgrade`]. + pub struct Upgrade<'a, T: ?Sized> { + // Raw read lock upgrade future, doesn't depend on `T`. + #[pin] + pub(super) raw: RawUpgrade<'a>, - /// Pointer to the value protected by the lock. Invariant in `T`. - pub(super) value: *mut T, + // Pointer to the value protected by the lock. Invariant in `T`. + pub(super) value: *mut T, + } } unsafe impl Send for Upgrade<'_, T> {} @@ -234,38 +249,56 @@ impl fmt::Debug for Upgrade<'_, T> { } } -impl Unpin for Upgrade<'_, T> {} - impl<'a, T: ?Sized> Future for Upgrade<'a, T> { type Output = RwLockWriteGuard<'a, T>; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let lock = ready!(Pin::new(&mut self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let lock = ready!(this.raw.as_mut().poll(cx)); Poll::Ready(RwLockWriteGuard { lock, - value: self.value, + value: *this.value, }) } } -/// The future returned by [`RwLockUpgradableReadGuardArc::upgrade`]. -pub struct UpgradeArc { - /// Raw read lock upgrade future, doesn't depend on `T`. - /// `'static` is a lie, this field is actually referencing the - /// `Arc` data. But since this struct also stores said `Arc`, we know - /// this value will be alive as long as the struct is. - /// - /// Yes, one field of the `ArcUpgrade` struct is referencing another. - /// Such self-references are usually not sound without pinning. - /// However, in this case, there is an indirection via the heap; - /// moving the `ArcUpgrade` won't move the heap allocation of the `Arc`, - /// so the reference inside `RawUpgrade` isn't invalidated. - pub(super) raw: ManuallyDrop>, - - /// Pointer to the value protected by the lock. Invariant in `T`. - pub(super) lock: ManuallyDrop>>, +pin_project_lite::pin_project! { + /// The future returned by [`RwLockUpgradableReadGuardArc::upgrade`]. + pub struct UpgradeArc { + // Raw read lock upgrade future, doesn't depend on `T`. + // `'static` is a lie, this field is actually referencing the + // `Arc` data. But since this struct also stores said `Arc`, we know + // this value will be alive as long as the struct is. + // + // Yes, one field of the `ArcUpgrade` struct is referencing another. + // Such self-references are usually not sound without pinning. + // However, in this case, there is an indirection via the heap; + // moving the `ArcUpgrade` won't move the heap allocation of the `Arc`, + // so the reference inside `RawUpgrade` isn't invalidated. + #[pin] + pub(super) raw: ManuallyDrop>, + + // Pointer to the value protected by the lock. Invariant in `T`. + pub(super) lock: ManuallyDrop>>, + } + + impl PinnedDrop for UpgradeArc { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + if !this.raw.is_ready() { + // SAFETY: we drop the `Arc` (decrementing the reference count) + // only if this future was cancelled before returning an + // upgraded lock. + unsafe { + // SAFETY: The drop impl for raw assumes that it is pinned. + ManuallyDrop::drop(this.raw.get_unchecked_mut()); + ManuallyDrop::drop(this.lock); + }; + } + } + } } impl fmt::Debug for UpgradeArc { @@ -275,32 +308,19 @@ impl fmt::Debug for UpgradeArc { } } -impl Unpin for UpgradeArc {} - impl Future for UpgradeArc { type Output = RwLockWriteGuardArc; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(Pin::new(&mut *self.raw).poll(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + unsafe { + // SAFETY: Practically, this is a pin projection. + ready!(Pin::new_unchecked(&mut **this.raw.get_unchecked_mut()).poll(cx)); + } Poll::Ready(RwLockWriteGuardArc { - lock: unsafe { ManuallyDrop::take(&mut self.lock) }, + lock: unsafe { ManuallyDrop::take(this.lock) }, }) } } - -impl Drop for UpgradeArc { - #[inline] - fn drop(&mut self) { - if !self.raw.is_ready() { - // SAFETY: we drop the `Arc` (decrementing the reference count) - // only if this future was cancelled before returning an - // upgraded lock. - unsafe { - ManuallyDrop::drop(&mut self.raw); - ManuallyDrop::drop(&mut self.lock); - }; - } - } -} diff --git a/src/rwlock/raw.rs b/src/rwlock/raw.rs index 6dd4731..b5f042a 100644 --- a/src/rwlock/raw.rs +++ b/src/rwlock/raw.rs @@ -88,7 +88,7 @@ impl RawRwLock { RawRead { lock: self, state: self.state.load(Ordering::Acquire), - listener: None, + listener: EventListener::new(&self.no_writer), } } @@ -163,7 +163,10 @@ impl RawRwLock { pub(super) fn write(&self) -> RawWrite<'_> { RawWrite { lock: self, - state: WriteState::Acquiring(self.mutex.lock()), + no_readers: EventListener::new(&self.no_readers), + state: WriteState::Acquiring { + lock: self.mutex.lock(), + }, } } @@ -192,7 +195,7 @@ impl RawRwLock { RawUpgrade { lock: Some(self), - listener: None, + listener: EventListener::new(&self.no_readers), } } @@ -280,93 +283,95 @@ impl RawRwLock { } } -/// The future returned by [`RawRwLock::read`]. +pin_project_lite::pin_project! { + /// The future returned by [`RawRwLock::read`]. -pub(super) struct RawRead<'a> { - /// The lock that is being acquired. - pub(super) lock: &'a RawRwLock, + pub(super) struct RawRead<'a> { + // The lock that is being acquired. + pub(super) lock: &'a RawRwLock, - /// The last-observed state of the lock. - state: usize, + // The last-observed state of the lock. + state: usize, - /// The listener for the "no writers" event. - listener: Option, + // The listener for the "no writers" event. + #[pin] + listener: EventListener, + } } impl<'a> Future for RawRead<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let this = self.get_mut(); + let mut this = self.project(); loop { - if this.state & WRITER_BIT == 0 { + if *this.state & WRITER_BIT == 0 { // Make sure the number of readers doesn't overflow. - if this.state > std::isize::MAX as usize { + if *this.state > std::isize::MAX as usize { process::abort(); } // If nobody is holding a write lock or attempting to acquire it, increment the // number of readers. match this.lock.state.compare_exchange( - this.state, - this.state + ONE_READER, + *this.state, + *this.state + ONE_READER, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => return Poll::Ready(()), - Err(s) => this.state = s, + Err(s) => *this.state = s, } } else { // Start listening for "no writer" events. - let load_ordering = match &mut this.listener { - None => { - this.listener = Some(this.lock.no_writer.listen()); - - // Make sure there really is no writer. - Ordering::SeqCst - } + let load_ordering = if !this.listener.is_listening() { + this.listener.as_mut().listen(); - Some(ref mut listener) => { - // Wait for the writer to finish. - ready!(Pin::new(listener).poll(cx)); - this.listener = None; + // Make sure there really is no writer. + Ordering::SeqCst + } else { + // Wait for the writer to finish. + ready!(this.listener.as_mut().poll(cx)); - // Notify the next reader waiting in list. - this.lock.no_writer.notify(1); + // Notify the next reader waiting in list. + this.lock.no_writer.notify(1); - // Check the state again. - Ordering::Acquire - } + // Check the state again. + Ordering::Acquire }; // Reload the state. - this.state = this.lock.state.load(load_ordering); + *this.state = this.lock.state.load(load_ordering); } } } } -/// The future returned by [`RawRwLock::upgradable_read`]. +pin_project_lite::pin_project! { + /// The future returned by [`RawRwLock::upgradable_read`]. -pub(super) struct RawUpgradableRead<'a> { - /// The lock that is being acquired. - pub(super) lock: &'a RawRwLock, + pub(super) struct RawUpgradableRead<'a> { + // The lock that is being acquired. + pub(super) lock: &'a RawRwLock, - /// The mutex we are trying to acquire. - acquire: Lock<'a, ()>, + // The mutex we are trying to acquire. + #[pin] + acquire: Lock<'a, ()>, + } } impl<'a> Future for RawUpgradableRead<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let this = self.get_mut(); + let this = self.project(); // Acquire the mutex. - let mutex_guard = ready!(Pin::new(&mut this.acquire).poll(cx)); + let mutex_guard = ready!(this.acquire.poll(cx)); forget(mutex_guard); + // Load the current state. let mut state = this.lock.state.load(Ordering::Acquire); // Make sure the number of readers doesn't overflow. @@ -391,41 +396,61 @@ impl<'a> Future for RawUpgradableRead<'a> { } } -/// The future returned by [`RawRwLock::write`]. +pin_project_lite::pin_project! { + /// The future returned by [`RawRwLock::write`]. + + pub(super) struct RawWrite<'a> { + // The lock that is being acquired. + pub(super) lock: &'a RawRwLock, -pub(super) struct RawWrite<'a> { - /// The lock that is being acquired. - pub(super) lock: &'a RawRwLock, + // Our listener for the "no readers" event. + #[pin] + no_readers: EventListener, + + // Current state fof this future. + #[pin] + state: WriteState<'a>, + } - /// Current state fof this future. - state: WriteState<'a>, + impl PinnedDrop for RawWrite<'_> { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + + if matches!(this.state.project(), WriteStateProj::WaitingReaders) { + // Safety: we hold a write lock, more or less. + unsafe { + this.lock.write_unlock(); + } + } + } + } } -enum WriteState<'a> { - /// We are currently acquiring the inner mutex. - Acquiring(Lock<'a, ()>), +pin_project_lite::pin_project! { + #[project = WriteStateProj] + enum WriteState<'a> { + // We are currently acquiring the inner mutex. + Acquiring { #[pin] lock: Lock<'a, ()> }, - /// We are currently waiting for readers to finish. - WaitingReaders { - /// The listener for the "no readers" event. - listener: Option, - }, + // We are currently waiting for readers to finish. + WaitingReaders, - /// The future has completed. - Acquired, + // The future has completed. + Acquired, + } } impl<'a> Future for RawWrite<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let this = self.get_mut(); + let mut this = self.project(); loop { - match &mut this.state { - WriteState::Acquiring(lock) => { + match this.state.as_mut().project() { + WriteStateProj::Acquiring { lock } => { // First grab the mutex. - let mutex_guard = ready!(Pin::new(lock).poll(cx)); + let mutex_guard = ready!(lock.poll(cx)); forget(mutex_guard); // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled. @@ -433,18 +458,17 @@ impl<'a> Future for RawWrite<'a> { // If we just acquired the lock, return. if new_state == WRITER_BIT { - this.state = WriteState::Acquired; + this.state.as_mut().set(WriteState::Acquired); return Poll::Ready(()); } // Start waiting for the readers to finish. - this.state = WriteState::WaitingReaders { - listener: Some(this.lock.no_readers.listen()), - }; + this.no_readers.as_mut().listen(); + this.state.as_mut().set(WriteState::WaitingReaders); } - WriteState::WaitingReaders { ref mut listener } => { - let load_ordering = if listener.is_some() { + WriteStateProj::WaitingReaders => { + let load_ordering = if this.no_readers.is_listening() { Ordering::Acquire } else { Ordering::SeqCst @@ -453,60 +477,60 @@ impl<'a> Future for RawWrite<'a> { // Check the state again. if this.lock.state.load(load_ordering) == WRITER_BIT { // We are the only ones holding the lock, return `Ready`. - this.state = WriteState::Acquired; + this.state.as_mut().set(WriteState::Acquired); return Poll::Ready(()); } // Wait for the readers to finish. - match listener { - None => { - // Register a listener. - *listener = Some(this.lock.no_readers.listen()); - } - - Some(ref mut evl) => { - // Wait for the readers to finish. - ready!(Pin::new(evl).poll(cx)); - *listener = None; - } + if !this.no_readers.is_listening() { + // Register a listener. + this.no_readers.as_mut().listen(); + } else { + // Wait for the readers to finish. + ready!(this.no_readers.as_mut().poll(cx)); }; } - WriteState::Acquired => panic!("Write lock already acquired"), + WriteStateProj::Acquired => panic!("Write lock already acquired"), } } } } -impl<'a> Drop for RawWrite<'a> { - fn drop(&mut self) { - if matches!(self.state, WriteState::WaitingReaders { .. }) { - // Safety: we hold a write lock, more or less. - unsafe { - self.lock.write_unlock(); - } - } - } -} +pin_project_lite::pin_project! { + /// The future returned by [`RawRwLock::upgrade`]. -/// The future returned by [`RawRwLock::upgrade`]. + pub(super) struct RawUpgrade<'a> { + lock: Option<&'a RawRwLock>, -pub(super) struct RawUpgrade<'a> { - lock: Option<&'a RawRwLock>, + // The event listener we are waiting on. + #[pin] + listener: EventListener, + } - /// The event listener we are waiting on. - listener: Option, + impl PinnedDrop for RawUpgrade<'_> { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + if let Some(lock) = this.lock { + // SAFETY: we are dropping the future that would give us a write lock, + // so we don't need said lock anymore. + unsafe { + lock.write_unlock(); + } + } + } + } } impl<'a> Future for RawUpgrade<'a> { type Output = &'a RawRwLock; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'a RawRwLock> { - let this = self.get_mut(); + let mut this = self.project(); let lock = this.lock.expect("cannot poll future after completion"); // If there are readers, we need to wait for them to finish. loop { - let load_ordering = if this.listener.is_some() { + let load_ordering = if this.listener.is_listening() { Ordering::Acquire } else { Ordering::SeqCst @@ -519,18 +543,13 @@ impl<'a> Future for RawUpgrade<'a> { } // If there are readers, wait for them to finish. - match &mut this.listener { - None => { - // Start listening for "no readers" events. - this.listener = Some(lock.no_readers.listen()); - } - - Some(ref mut listener) => { - // Wait for the readers to finish. - ready!(Pin::new(listener).poll(cx)); - this.listener = None; - } - } + if !this.listener.is_listening() { + // Start listening for "no readers" events. + this.listener.as_mut().listen(); + } else { + // Wait for the readers to finish. + ready!(this.listener.as_mut().poll(cx)); + }; } // We are done. @@ -538,19 +557,6 @@ impl<'a> Future for RawUpgrade<'a> { } } -impl<'a> Drop for RawUpgrade<'a> { - #[inline] - fn drop(&mut self) { - if let Some(lock) = self.lock { - // SAFETY: we are dropping the future that would give us a write lock, - // so we don't need said lock anymore. - unsafe { - lock.write_unlock(); - } - } - } -} - impl<'a> RawUpgrade<'a> { /// Whether the future returned `Poll::Ready(..)` at some point. #[inline] diff --git a/src/semaphore.rs b/src/semaphore.rs index d036305..6d0690e 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -86,7 +86,7 @@ impl Semaphore { pub fn acquire(&self) -> Acquire<'_> { Acquire { semaphore: self, - listener: None, + listener: EventListener::new(&self.event), } } @@ -176,13 +176,16 @@ impl Semaphore { } } -/// The future returned by [`Semaphore::acquire`]. -pub struct Acquire<'a> { - /// The semaphore being acquired. - semaphore: &'a Semaphore, +pin_project_lite::pin_project! { + /// The future returned by [`Semaphore::acquire`]. + pub struct Acquire<'a> { + // The semaphore being acquired. + semaphore: &'a Semaphore, - /// The listener waiting on the semaphore. - listener: Option, + // The listener waiting on the semaphore. + #[pin] + listener: EventListener, + } } impl fmt::Debug for Acquire<'_> { @@ -191,27 +194,21 @@ impl fmt::Debug for Acquire<'_> { } } -impl Unpin for Acquire<'_> {} - impl<'a> Future for Acquire<'a> { type Output = SemaphoreGuard<'a>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.project(); loop { match this.semaphore.try_acquire() { Some(guard) => return Poll::Ready(guard), None => { // Wait on the listener. - match &mut this.listener { - None => { - this.listener = Some(this.semaphore.event.listen()); - } - Some(ref mut listener) => { - ready!(Pin::new(listener).poll(cx)); - this.listener = None; - } + if !this.listener.is_listening() { + this.listener.as_mut().listen(); + } else { + ready!(this.listener.as_mut().poll(cx)); } } } @@ -225,7 +222,9 @@ pub struct AcquireArc { semaphore: Arc, /// The listener waiting on the semaphore. - listener: Option, + /// + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + listener: Option>>, } impl fmt::Debug for AcquireArc { diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 2a7fd5b..51b6783 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -5,10 +5,7 @@ use std::task::Context; use futures_lite::prelude::*; use waker_fn::waker_fn; -pub fn check_yields_when_contended( - contending_guard: G, - mut acquire_future: impl Future + Unpin, -) { +pub fn check_yields_when_contended(contending_guard: G, acquire_future: impl Future) { let was_woken = Arc::new(AtomicBool::new(false)); let waker = { let was_woken = Arc::clone(&was_woken); @@ -16,7 +13,8 @@ pub fn check_yields_when_contended( }; let mut cx = Context::from_waker(&waker); - assert!(acquire_future.poll(&mut cx).is_pending()); + futures_lite::pin!(acquire_future); + assert!(acquire_future.as_mut().poll(&mut cx).is_pending()); drop(contending_guard); assert!(was_woken.load(Ordering::SeqCst)); assert!(acquire_future.poll(&mut cx).is_ready()); From 3a82590a69a6ec5ba90c997734f4c438c97b6c8e Mon Sep 17 00:00:00 2001 From: John Nunley Date: Wed, 16 Aug 2023 22:10:58 -0700 Subject: [PATCH 2/2] Add a default "std" feature for no_std usage This is a breaking change for no-default-features users. Signed-off-by: John Nunley --- .github/workflows/ci.yml | 11 ++++++--- Cargo.toml | 14 +++++------ src/barrier.rs | 10 ++++---- src/lib.rs | 27 +++++++++++++++++++- src/mutex.rs | 40 +++++++++++++++--------------- src/once_cell.rs | 33 +++++++++++++++---------- src/rwlock.rs | 13 +++++----- src/rwlock/futures.rs | 13 +++++----- src/rwlock/raw.rs | 27 ++++++++++---------- src/semaphore.rs | 53 +++++++++++++++++----------------------- 10 files changed, 136 insertions(+), 105 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 96f76bc..4224ab6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,13 +38,18 @@ jobs: - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - run: rustup target add wasm32-unknown-unknown - - name: Install WASM Test Tools - uses: taiki-e/install-action@wasm-pack + - name: Install WASM Test Tools and Cargo Hack + uses: taiki-e/install-action@v2 + with: + tool: cargo-hack,wasm-pack - name: Run cargo check run: cargo check --all --all-features --all-targets + - run: cargo check --all --no-default-features - name: Run cargo check (without dev-dependencies to catch missing feature flags) if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep + - run: rustup target add thumbv7m-none-eabi + - run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps - name: Run cargo check for WASM run: cargo check --all --all-features --all-targets --target wasm32-unknown-unknown - name: Test WASM @@ -57,7 +62,7 @@ jobs: matrix: # When updating this, the reminder to update the minimum supported # Rust version in Cargo.toml. - rust: ['1.48'] + rust: ['1.59'] steps: - uses: actions/checkout@v3 - name: Install Rust diff --git a/Cargo.toml b/Cargo.toml index 6c0fd8f..38f9c3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-lock" version = "2.7.0" authors = ["Stjepan Glavina "] edition = "2018" -rust-version = "1.48" +rust-version = "1.59" description = "Async synchronization primitives" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-lock" @@ -15,10 +15,14 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -event-listener = "2" -event-listener-strategy = { git = "https://github.com/smol-rs/event-listener.git" } +event-listener = { version = "3.0.0", default-features = false } +event-listener-strategy = { version = "0.2.0", default-features = false } pin-project-lite = "0.2.11" +[features] +default = ["std"] +std = ["event-listener/std", "event-listener-strategy/std"] + [dev-dependencies] async-channel = "1.5.0" fastrand = "2.0.0" @@ -27,7 +31,3 @@ waker-fn = "1.1.0" [target.'cfg(any(target_arch = "wasm32", target_arch = "wasm64"))'.dev-dependencies] wasm-bindgen-test = "0.3" - -[patch.crates-io] -async-channel = { git = "https://github.com/smol-rs/async-channel.git", branch = "notgull/evl-3.0" } -event-listener = { git = "https://github.com/smol-rs/event-listener.git" } diff --git a/src/barrier.rs b/src/barrier.rs index e01a563..9259603 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -1,9 +1,9 @@ use event_listener::{Event, EventListener}; -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; use crate::futures::Lock; use crate::Mutex; @@ -148,7 +148,7 @@ impl Future for BarrierWait<'_> { // We are the last one. state.count = 0; state.generation_id = state.generation_id.wrapping_add(1); - this.barrier.event.notify(std::usize::MAX); + this.barrier.event.notify(core::usize::MAX); return Poll::Ready(BarrierWaitResult { is_leader: true }); } } diff --git a/src/lib.rs b/src/lib.rs index 04ab51a..244d421 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ //! * [`RwLock`] - a reader-writer lock, allowing any number of readers or a single writer. //! * [`Semaphore`] - limits the number of concurrent operations. +#![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc( html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" @@ -15,6 +16,8 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +extern crate alloc; + /// Simple macro to extract the value of `Poll` or return `Pending`. /// /// TODO: Drop in favor of `core::task::ready`, once MSRV is bumped to 1.64. @@ -38,7 +41,7 @@ macro_rules! pin { let mut $x = $x; #[allow(unused_mut)] let mut $x = unsafe { - std::pin::Pin::new_unchecked(&mut $x) + core::pin::Pin::new_unchecked(&mut $x) }; )* } @@ -69,3 +72,25 @@ pub mod futures { }; pub use crate::semaphore::{Acquire, AcquireArc}; } + +#[cold] +fn abort() -> ! { + // For no_std targets, panicking while panicking is defined as an abort + #[cfg(not(feature = "std"))] + { + struct Bomb; + + impl Drop for Bomb { + fn drop(&mut self) { + panic!("Panicking while panicking to abort") + } + } + + let _bomb = Bomb; + panic!("Panicking while panicking to abort") + } + + // For libstd targets, abort using std::process::abort + #[cfg(feature = "std")] + std::process::abort() +} diff --git a/src/mutex.rs b/src/mutex.rs index df9e154..a463dae 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,20 +1,18 @@ -use std::borrow::Borrow; -use std::cell::UnsafeCell; -use std::fmt; -use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::process; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::Poll; - -// Note: we cannot use `target_family = "wasm"` here because it requires Rust 1.54. -#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] +use core::borrow::Borrow; +use core::cell::UnsafeCell; +use core::fmt; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::Poll; +use core::usize; + +use alloc::sync::Arc; + +#[cfg(all(feature = "std", not(target_family = "wasm")))] use std::time::{Duration, Instant}; -use std::usize; - use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture}; @@ -263,6 +261,7 @@ impl Default for Mutex { easy_wrapper! { /// The future returned by [`Mutex::lock`]. pub struct Lock<'a, T: ?Sized>(LockInner<'a, T> => MutexGuard<'a, T>); + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub(crate) wait(); } @@ -320,6 +319,7 @@ impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> { easy_wrapper! { /// The future returned by [`Mutex::lock_arc`]. pub struct LockArc(LockArcInnards => MutexGuardArc); + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub(crate) wait(); } @@ -412,7 +412,7 @@ pin_project_lite::pin_project! { /// `pin_project_lite` doesn't support `#[cfg]` yet, so we have to do this manually. struct Start { - #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] + #[cfg(all(feature = "std", not(target_family = "wasm")))] start: Option, } @@ -430,7 +430,7 @@ impl>> AcquireSlow { mutex: Some(mutex), listener, start: Start { - #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] + #[cfg(all(feature = "std", not(target_family = "wasm")))] start: None, }, starved: false, @@ -464,7 +464,7 @@ impl>> EventListenerFuture for AcquireSlow context: &mut S::Context, ) -> Poll { let mut this = self.as_mut().project(); - #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] + #[cfg(all(feature = "std", not(target_family = "wasm")))] let start = *this.start.start.get_or_insert_with(Instant::now); let mutex = Borrow::>::borrow( this.mutex.as_ref().expect("future polled after completion"), @@ -518,7 +518,7 @@ impl>> EventListenerFuture for AcquireSlow // If waiting for too long, fall back to a fairer locking strategy that will prevent // newer lock operations from starving us forever. - #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] + #[cfg(all(feature = "std", not(target_family = "wasm")))] if start.elapsed() > Duration::from_micros(500) { break; } @@ -528,7 +528,7 @@ impl>> EventListenerFuture for AcquireSlow // Increment the number of starved lock operations. if mutex.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 { // In case of potential overflow, abort. - process::abort(); + crate::abort(); } // Indicate that we are now starving and will use a fairer locking strategy. diff --git a/src/once_cell.rs b/src/once_cell.rs index 42348f8..8fcb7cb 100644 --- a/src/once_cell.rs +++ b/src/once_cell.rs @@ -1,14 +1,16 @@ -use std::cell::UnsafeCell; -use std::convert::Infallible; -use std::fmt; -use std::future::Future; -use std::mem::{forget, MaybeUninit}; -use std::ptr; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use core::cell::UnsafeCell; +use core::convert::Infallible; +use core::fmt; +use core::future::Future; +use core::mem::{forget, MaybeUninit}; +use core::ptr; +use core::sync::atomic::{AtomicUsize, Ordering}; + +#[cfg(all(feature = "std", not(target_family = "wasm")))] +use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use event_listener::{Event, EventListener}; -use event_listener_strategy::{Blocking, NonBlocking, Strategy}; +use event_listener_strategy::{NonBlocking, Strategy}; /// The current state of the `OnceCell`. #[derive(Copy, Clone, PartialEq, Eq)] @@ -319,6 +321,7 @@ impl OnceCell { /// /// assert_eq!(cell.wait_blocking(), &1); /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub fn wait_blocking(&self) -> &T { // Fast path: see if the value is already initialized. if let Some(value) = self.get() { @@ -423,6 +426,7 @@ impl OnceCell { /// /// assert_eq!(result.unwrap(), &1); /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub fn get_or_try_init_blocking( &self, closure: impl FnOnce() -> Result, @@ -435,8 +439,8 @@ impl OnceCell { // Slow path: initialize the value. // The futures provided should never block, so we can use `now_or_never`. now_or_never(self.initialize_or_wait( - move || std::future::ready(closure()), - &mut Blocking::default(), + move || core::future::ready(closure()), + &mut event_listener_strategy::Blocking::default(), ))?; debug_assert!(self.is_initialized()); @@ -497,6 +501,7 @@ impl OnceCell { /// assert_eq!(cell.get_or_init_blocking(|| 1), &1); /// assert_eq!(cell.get_or_init_blocking(|| 2), &1); /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub fn get_or_init_blocking(&self, closure: impl FnOnce() -> T + Unpin) -> &T { match self.get_or_try_init_blocking(move || { let result: Result = Ok(closure()); @@ -563,6 +568,7 @@ impl OnceCell { /// assert_eq!(cell.get(), Some(&1)); /// assert_eq!(cell.set_blocking(2), Err(2)); /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] pub fn set_blocking(&self, value: T) -> Result<&T, T> { let mut value = Some(value); self.get_or_init_blocking(|| value.take().unwrap()); @@ -643,8 +649,8 @@ impl OnceCell { .store(State::Initialized.into(), Ordering::Release); // Notify the listeners that the value is initialized. - self.active_initializers.notify_additional(std::usize::MAX); - self.passive_waiters.notify_additional(std::usize::MAX); + self.active_initializers.notify_additional(core::usize::MAX); + self.passive_waiters.notify_additional(core::usize::MAX); return Ok(()); } @@ -758,6 +764,7 @@ impl Drop for OnceCell { } /// Either return the result of a future now, or panic. +#[cfg(all(feature = "std", not(target_family = "wasm")))] fn now_or_never(f: impl Future) -> T { const NOOP_WAKER: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop); diff --git a/src/rwlock.rs b/src/rwlock.rs index 49222f6..ea6e000 100644 --- a/src/rwlock.rs +++ b/src/rwlock.rs @@ -1,9 +1,10 @@ -use std::cell::UnsafeCell; -use std::fmt; -use std::mem::{self, ManuallyDrop}; -use std::ops::{Deref, DerefMut}; -use std::ptr::{self, NonNull}; -use std::sync::Arc; +use core::cell::UnsafeCell; +use core::fmt; +use core::mem::{self, ManuallyDrop}; +use core::ops::{Deref, DerefMut}; +use core::ptr::{self, NonNull}; + +use alloc::sync::Arc; pub(crate) mod futures; mod raw; diff --git a/src/rwlock/futures.rs b/src/rwlock/futures.rs index b9aea7b..613d30a 100644 --- a/src/rwlock/futures.rs +++ b/src/rwlock/futures.rs @@ -1,9 +1,10 @@ -use std::fmt; -use std::future::Future; -use std::mem::ManuallyDrop; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; +use core::fmt; +use core::future::Future; +use core::mem::ManuallyDrop; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use alloc::sync::Arc; use super::raw::{RawRead, RawUpgradableRead, RawUpgrade, RawWrite}; use super::{ diff --git a/src/rwlock/raw.rs b/src/rwlock/raw.rs index b5f042a..816e491 100644 --- a/src/rwlock/raw.rs +++ b/src/rwlock/raw.rs @@ -6,12 +6,11 @@ //! the locking code only once, and also lets us make //! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`. -use std::future::Future; -use std::mem::forget; -use std::pin::Pin; -use std::process; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll}; +use core::future::Future; +use core::mem::forget; +use core::pin::Pin; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::{Context, Poll}; use event_listener::{Event, EventListener}; @@ -66,8 +65,8 @@ impl RawRwLock { } // Make sure the number of readers doesn't overflow. - if state > std::isize::MAX as usize { - process::abort(); + if state > core::isize::MAX as usize { + crate::abort(); } // Increment the number of readers. @@ -107,8 +106,8 @@ impl RawRwLock { let mut state = self.state.load(Ordering::Acquire); // Make sure the number of readers doesn't overflow. - if state > std::isize::MAX as usize { - process::abort(); + if state > core::isize::MAX as usize { + crate::abort(); } // Increment the number of readers. @@ -308,8 +307,8 @@ impl<'a> Future for RawRead<'a> { loop { if *this.state & WRITER_BIT == 0 { // Make sure the number of readers doesn't overflow. - if *this.state > std::isize::MAX as usize { - process::abort(); + if *this.state > core::isize::MAX as usize { + crate::abort(); } // If nobody is holding a write lock or attempting to acquire it, increment the @@ -375,8 +374,8 @@ impl<'a> Future for RawUpgradableRead<'a> { let mut state = this.lock.state.load(Ordering::Acquire); // Make sure the number of readers doesn't overflow. - if state > std::isize::MAX as usize { - process::abort(); + if state > core::isize::MAX as usize { + crate::abort(); } // Increment the number of readers. diff --git a/src/semaphore.rs b/src/semaphore.rs index 6d0690e..dedf531 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -1,9 +1,10 @@ -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::{Context, Poll}; + +use alloc::sync::Arc; use event_listener::{Event, EventListener}; @@ -147,7 +148,7 @@ impl Semaphore { pub fn acquire_arc(self: &Arc) -> AcquireArc { AcquireArc { semaphore: self.clone(), - listener: None, + listener: EventListener::new(&self.event), } } @@ -216,15 +217,16 @@ impl<'a> Future for Acquire<'a> { } } -/// The future returned by [`Semaphore::acquire_arc`]. -pub struct AcquireArc { - /// The semaphore being acquired. - semaphore: Arc, +pin_project_lite::pin_project! { + /// The future returned by [`Semaphore::acquire_arc`]. + pub struct AcquireArc { + // The semaphore being acquired. + semaphore: Arc, - /// The listener waiting on the semaphore. - /// - /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. - listener: Option>>, + // The listener waiting on the semaphore. + #[pin] + listener: EventListener, + } } impl fmt::Debug for AcquireArc { @@ -233,30 +235,21 @@ impl fmt::Debug for AcquireArc { } } -impl Unpin for AcquireArc {} - impl Future for AcquireArc { type Output = SemaphoreGuardArc; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.project(); loop { match this.semaphore.try_acquire_arc() { - Some(guard) => { - this.listener = None; - return Poll::Ready(guard); - } + Some(guard) => return Poll::Ready(guard), None => { // Wait on the listener. - match &mut this.listener { - None => { - this.listener = Some(this.semaphore.event.listen()); - } - Some(ref mut listener) => { - ready!(Pin::new(listener).poll(cx)); - this.listener = None; - } + if !this.listener.is_listening() { + this.listener.as_mut().listen(); + } else { + ready!(this.listener.as_mut().poll(cx)); } } }