Skip to content

Commit

Permalink
Add missing Arc blocking methods (#71)
Browse files Browse the repository at this point in the history
* Add missing `Arc` blocking methods

Adds:

- `Rwlock::read_arc_blocking`
- `RwLock::upgradable_read_arc_blocking`
- `RwLock::write_arc_blocking`
- `RwLockUpgradableReadGuardArc::upgrade_blocking`
- `Semaphore::acquire_arc_blocking`

* Restore missing future `fmt::Debug` impls
  • Loading branch information
Jules-Bertholet authored Nov 26, 2023
1 parent 751c7b8 commit d1cff63
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pin_project_lite::pin_project! {
unsafe impl<T: Send + ?Sized> Send for Lock<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for Lock<'_, T> {}

impl<T: ?Sized> fmt::Debug for LockInner<'_, T> {
impl<T: ?Sized> fmt::Debug for Lock<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Lock { .. }")
}
Expand Down
132 changes: 132 additions & 0 deletions src/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,41 @@ impl<T> RwLock<T> {
pub fn read_arc<'a>(self: &'a Arc<Self>) -> ReadArc<'a, T> {
ReadArc::new(self.raw.read(), self)
}

/// Acquires an owned, reference-counted read lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Note that attempts to acquire a read lock will block if there are also concurrent attempts
/// to acquire a write lock.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`read_arc`][`RwLock::read_arc`] method,
/// this method will block the current thread until the read lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::RwLock;
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.read_arc_blocking();
/// assert_eq!(*reader, 1);
///
/// assert!(lock.try_read().is_some());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn read_arc_blocking(self: &Arc<Self>) -> RwLockReadGuardArc<T> {
self.read_arc().wait()
}
}

impl<T: ?Sized> RwLock<T> {
Expand Down Expand Up @@ -347,6 +382,47 @@ impl<T: ?Sized> RwLock<T> {
self.upgradable_read().wait()
}

/// Attempts to acquire an owned, reference-counted read lock
/// with the possiblity to upgrade to a write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Upgradable read lock reserves the right to be upgraded to a write lock, which means there
/// can be at most one upgradable read lock at a time.
///
/// Note that attempts to acquire an upgradable read lock will block if there are concurrent
/// attempts to acquire another upgradable read lock or a write lock.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`upgradable_read_arc`][`RwLock::upgradable_read_arc`]
/// method, this method will block the current thread until the read lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.upgradable_read_arc_blocking();
/// assert_eq!(*reader, 1);
/// assert_eq!(*lock.try_read().unwrap(), 1);
///
/// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader);
/// *writer = 2;
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn upgradable_read_arc_blocking(self: &Arc<Self>) -> RwLockUpgradableReadGuardArc<T> {
self.upgradable_read_arc().wait()
}

/// Attempts to acquire an owned, reference-counted read lock with the possiblity to
/// upgrade to a write lock.
///
Expand Down Expand Up @@ -545,6 +621,36 @@ impl<T: ?Sized> RwLock<T> {
WriteArc::new(self.raw.write(), self)
}

/// Acquires an owned, reference-counted write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`write_arc`][RwLock::write_arc] method, this method will
/// block the current thread until the write lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::RwLock;
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let writer = lock.write_arc_blocking();
/// assert!(lock.try_read().is_none());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn write_arc_blocking(self: &Arc<Self>) -> RwLockWriteGuardArc<T> {
self.write_arc().wait()
}

/// Returns a mutable reference to the inner value.
///
/// Since this call borrows the lock mutably, no actual locking takes place. The mutable borrow
Expand Down Expand Up @@ -1067,6 +1173,32 @@ impl<T: ?Sized> RwLockUpgradableReadGuardArc<T> {
)
}
}

/// Upgrades into a write lock.
///
/// # Blocking
///
/// This function will block the current thread until it is able to acquire the write lock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.upgradable_read_arc_blocking();
/// assert_eq!(*reader, 1);
///
/// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader);
/// *writer = 2;
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn upgrade_blocking(guard: Self) -> RwLockWriteGuardArc<T> {
RwLockUpgradableReadGuardArc::upgrade(guard).wait()
}
}

/// A guard that releases the write lock when dropped.
Expand Down
57 changes: 47 additions & 10 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
use core::task::Poll;

use alloc::sync::Arc;

Expand Down Expand Up @@ -174,10 +173,38 @@ impl Semaphore {
/// # });
/// ```
pub fn acquire_arc(self: &Arc<Self>) -> AcquireArc {
AcquireArc {
AcquireArc::_new(AcquireArcInner {
semaphore: self.clone(),
listener: EventListener::new(),
}
})
}

/// Waits for an owned permit for a concurrent operation.
///
/// Returns a guard that releases the permit when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`acquire_arc`][Semaphore::acquire_arc] method,
/// this method will block the current thread until the permit is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a semaphore can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::Semaphore;
///
/// let s = Arc::new(Semaphore::new(2));
/// let guard = s.acquire_arc_blocking();
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn acquire_arc_blocking(self: &Arc<Self>) -> SemaphoreGuardArc {
self.acquire_arc().wait()
}

/// Adds `n` additional permits to the semaphore.
Expand Down Expand Up @@ -223,7 +250,7 @@ pin_project_lite::pin_project! {
}
}

impl fmt::Debug for AcquireInner<'_> {
impl fmt::Debug for Acquire<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Acquire { .. }")
}
Expand Down Expand Up @@ -255,9 +282,15 @@ impl<'a> EventListenerFuture for AcquireInner<'a> {
}
}

pin_project_lite::pin_project! {
easy_wrapper! {
/// The future returned by [`Semaphore::acquire_arc`].
pub struct AcquireArc {
pub struct AcquireArc(AcquireArcInner => SemaphoreGuardArc);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}

pin_project_lite::pin_project! {
struct AcquireArcInner {
// The semaphore being acquired.
semaphore: Arc<Semaphore>,

Expand All @@ -273,10 +306,14 @@ impl fmt::Debug for AcquireArc {
}
}

impl Future for AcquireArc {
impl EventListenerFuture for AcquireArcInner {
type Output = SemaphoreGuardArc;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.project();

loop {
Expand All @@ -287,7 +324,7 @@ impl Future for AcquireArc {
if !this.listener.is_listening() {
this.listener.as_mut().listen(&this.semaphore.event);
} else {
ready!(this.listener.as_mut().poll(cx));
ready!(strategy.poll(this.listener.as_mut(), cx));
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions tests/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ fn smoke_blocking() {
drop(m.lock_blocking());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let m = Arc::new(Mutex::new(()));
drop(m.lock_arc_blocking());
drop(m.lock_arc_blocking());
}

#[test]
fn try_lock() {
let m = Mutex::new(());
Expand Down
18 changes: 18 additions & 0 deletions tests/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,27 @@ fn smoke_blocking() {
drop(lock.read_blocking());
drop(lock.write_blocking());
drop((lock.read_blocking(), lock.read_blocking()));
let read = lock.read_blocking();
let upgradabe = lock.upgradable_read_blocking();
drop(read);
drop(RwLockUpgradableReadGuard::upgrade_blocking(upgradabe));
drop(lock.write_blocking());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let lock = Arc::new(RwLock::new(()));
drop(lock.read_arc_blocking());
drop(lock.write_arc_blocking());
drop((lock.read_arc_blocking(), lock.read_arc_blocking()));
let read = lock.read_arc_blocking();
let upgradabe = lock.upgradable_read_arc_blocking();
drop(read);
drop(RwLockUpgradableReadGuardArc::upgrade_blocking(upgradabe));
drop(lock.write_arc_blocking());
}

#[test]
fn try_write() {
future::block_on(async {
Expand Down
11 changes: 11 additions & 0 deletions tests/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ fn smoke_blocking() {
assert!(s.try_acquire().is_some());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let s = Arc::new(Semaphore::new(2));
let g1 = s.acquire_arc_blocking();
let _g2 = s.acquire_arc_blocking();
assert!(s.try_acquire().is_none());
drop(g1);
assert!(s.try_acquire().is_some());
}

#[test]
fn add_permits() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
Expand Down

0 comments on commit d1cff63

Please sign in to comment.