Skip to content

Commit

Permalink
Add blocking equivalents to non-blocking functions
Browse files Browse the repository at this point in the history
Needs event-listener-strategy#6 first

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Sep 20, 2023
1 parent c788964 commit 070d6a2
Show file tree
Hide file tree
Showing 7 changed files with 547 additions and 120 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ waker-fn = "1.1.0"

[target.'cfg(any(target_arch = "wasm32", target_arch = "wasm64"))'.dev-dependencies]
wasm-bindgen-test = "0.3"

[patch.crates-io]
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy", branch = "notgull/no-lt" }
86 changes: 74 additions & 12 deletions src/barrier.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};

use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::task::Poll;

use crate::futures::Lock;
use crate::Mutex;
Expand Down Expand Up @@ -79,18 +79,66 @@ impl Barrier {
/// }
/// ```
pub fn wait(&self) -> BarrierWait<'_> {
BarrierWait {
BarrierWait::_new(BarrierWaitInner {
barrier: self,
lock: Some(self.state.lock()),
evl: EventListener::new(&self.event),
state: WaitState::Initial,
}
})
}

/// Blocks the current thread until all tasks reach this point.
///
/// Barriers are reusable after all tasks have synchronized, and can be used continuously.
///
/// Returns a [`BarrierWaitResult`] indicating whether this task is the "leader", meaning the
/// last task to call this method.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`wait`] method, this method will
/// block the current thread until the wait is complete.
///
/// This method should not be used in a synchronous context. It is intended to be
/// used in a way that a barrier can be used in both asynchronous and synchronous contexts.
/// Calling this method in an `async` function or block may result in a deadlock.
///
/// # Examples
///
/// ```
/// use async_lock::Barrier;
/// use futures_lite::future;
/// use std::sync::Arc;
/// use std::thread;
///
/// let barrier = Arc::new(Barrier::new(5));
///
/// for _ in 0..5 {
/// let b = barrier.clone();
/// thread::spawn(move || {
/// // The same messages will be printed together.
/// // There will NOT be interleaving of "before" and "after".
/// println!("before wait");
/// b.wait_blocking();
/// println!("after wait");
/// });
/// }
/// ```
pub fn wait_blocking(&self) -> BarrierWaitResult {
self.wait().wait()
}
}

easy_wrapper! {
/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a>(BarrierWaitInner<'a> => BarrierWaitResult);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}

pin_project_lite::pin_project! {
/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a> {
struct BarrierWaitInner<'a> {
// The barrier to wait on.
barrier: &'a Barrier,

Expand Down Expand Up @@ -124,18 +172,27 @@ enum WaitState {
Reacquiring { local_gen: u64 },
}

impl Future for BarrierWait<'_> {
impl EventListenerFuture for BarrierWaitInner<'_> {
type Output = BarrierWaitResult;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'a, S: Strategy<'a>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.project();

loop {
match this.state {
WaitState::Initial => {
// See if the lock is ready yet.
let mut state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
this.lock.set(None);
let mut state = ready!(this
.lock
.as_mut()
.as_pin_mut()
.unwrap()
.poll_with_strategy(strategy, cx));
this.lock.as_mut().set(None);

let local_gen = state.generation_id;
state.count += 1;
Expand All @@ -154,18 +211,23 @@ impl Future for BarrierWait<'_> {
}

WaitState::Waiting { local_gen } => {
ready!(this.evl.as_mut().poll(cx));
ready!(strategy.poll(this.evl.as_mut(), cx));

// We are now re-acquiring the mutex.
this.lock.set(Some(this.barrier.state.lock()));
this.lock.as_mut().set(Some(this.barrier.state.lock()));
*this.state = WaitState::Reacquiring {
local_gen: *local_gen,
};
}

WaitState::Reacquiring { local_gen } => {
// Acquire the local state again.
let state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
let state = ready!(this
.lock
.as_mut()
.as_pin_mut()
.unwrap()
.poll_with_strategy(strategy, cx));
this.lock.set(None);

if *local_gen == state.generation_id && state.count < this.barrier.n {
Expand Down
61 changes: 58 additions & 3 deletions src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,33 @@ impl<T: ?Sized> Mutex<T> {
})
}

/// Acquires the mutex using the blocking strategy.
///
/// Returns a guard that releases the mutex when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`lock`] method, this method will
/// block the current thread until the lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a mutex can be used in both asynchronous and synchronous contexts.
/// Calling this method in an `async` function or block may result in a deadlock.
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
///
/// let mutex = Mutex::new(10);
/// let guard = mutex.lock_blocking();
/// assert_eq!(*guard, 10);
/// ```
#[inline]
pub fn lock_blocking(&self) -> MutexGuard<'_, T> {
self.lock().wait()
}

/// Attempts to acquire the mutex.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
Expand Down Expand Up @@ -199,6 +226,34 @@ impl<T: ?Sized> Mutex<T> {
})
}

/// Acquires the mutex and clones a reference to it using the blocking strategy.
///
/// Returns an owned guard that releases the mutex when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`lock_arc`] method, this method will
/// block the current thread until the lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a mutex can be used in both asynchronous and synchronous contexts.
/// Calling this method in an `async` function or block may result in a deadlock.
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
/// use std::sync::Arc;
///
/// let mutex = Arc::new(Mutex::new(10));
/// let guard = mutex.lock_arc_blocking();
/// assert_eq!(*guard, 10);
/// ```
#[inline]
pub fn lock_arc_blocking(self: &Arc<Self>) -> MutexGuardArc<T> {
self.lock_arc().wait()
}

/// Attempts to acquire the mutex and clone a reference to it.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
Expand Down Expand Up @@ -291,7 +346,7 @@ impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> {

#[inline]
fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand Down Expand Up @@ -350,7 +405,7 @@ impl<T: ?Sized> EventListenerFuture for LockArcInnards<T> {
type Output = MutexGuardArc<T>;

fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
mut self: Pin<&'a mut Self>,
mut self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand Down Expand Up @@ -459,7 +514,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow

#[cold]
fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
mut self: Pin<&'a mut Self>,
mut self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand Down
Loading

0 comments on commit 070d6a2

Please sign in to comment.