Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OwnedRwLockReadGuard and OwnedRwLockWriteGuard #3340

Merged
merged 7 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 121 additions & 1 deletion tokio/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
use crate::sync::mutex::TryLockError;
use std::cell::UnsafeCell;
use std::marker;
use std::mem::ManuallyDrop;
use std::sync::Arc;

pub(crate) mod owned_read_guard;
pub(crate) mod owned_write_guard;
pub(crate) mod read_guard;
pub(crate) mod write_guard;
pub(crate) mod write_guard_mapped;
pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
pub(crate) use read_guard::RwLockReadGuard;
pub(crate) use write_guard::RwLockWriteGuard;
pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
Expand Down Expand Up @@ -101,13 +107,23 @@ fn bounds() {
check_sync::<RwLockReadGuard<'_, u32>>();
check_unpin::<RwLockReadGuard<'_, u32>>();

check_send::<OwnedRwLockReadGuard<u32>>();
check_sync::<OwnedRwLockReadGuard<u32>>();
check_unpin::<OwnedRwLockReadGuard<u32>>();

check_send::<RwLockWriteGuard<'_, u32>>();
check_sync::<RwLockWriteGuard<'_, u32>>();
check_unpin::<RwLockWriteGuard<'_, u32>>();

let rwlock = RwLock::new(0);
check_send::<OwnedRwLockWriteGuard<u32>>();
check_sync::<OwnedRwLockWriteGuard<u32>>();
check_unpin::<OwnedRwLockWriteGuard<u32>>();

let rwlock = Arc::new(RwLock::new(0));
check_send_sync_val(rwlock.read());
check_send_sync_val(Arc::clone(&rwlock).read_owned());
check_send_sync_val(rwlock.write());
check_send_sync_val(Arc::clone(&rwlock).write_owned());
}

// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
Expand All @@ -120,13 +136,19 @@ unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
// `T` is `Send`.
unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
// T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
// the RwLock, unlike RwLockReadGuard.
unsafe impl<T> Send for OwnedRwLockReadGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedRwLockReadGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
// `T` is `Send` - but since this is also provides mutable access, we need to
// make sure that `T` is `Send` since its value can be sent across thread
// boundaries.
unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}

impl<T: ?Sized> RwLock<T> {
Expand Down Expand Up @@ -222,6 +244,63 @@ impl<T: ?Sized> RwLock<T> {
}
}

/// Locks this `RwLock` with shared read access, causing the current task
/// to yield until the lock has been acquired.
///
/// The calling task will yield until there are no writers which hold the
/// lock. There may be other readers inside the lock when the task resumes.
///
/// This method is identical to [`RwLock::read`], except that the returned
/// guard references the `RwLock` with an [`Arc`] rather than by borrowing
/// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
/// method, and the guard will live for the `'static` lifetime, as it keeps
/// the `RwLock` alive by holding an `Arc`.
///
/// Note that under the priority policy of [`RwLock`], read locks are not
/// granted until prior write locks, to prevent starvation. Therefore
/// deadlock may occur if a read lock is held by the current task, a write
/// lock attempt is made, and then a subsequent read lock attempt is made
/// by the current task.
///
/// Returns an RAII guard which will drop this read access of the `RwLock`
/// when dropped.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
///
/// #[tokio::main]
/// async fn main() {
/// let lock = Arc::new(RwLock::new(1));
/// let c_lock = lock.clone();
///
/// let n = lock.read_owned().await;
/// assert_eq!(*n, 1);
///
/// tokio::spawn(async move {
/// // While main has an active read lock, we acquire one too.
/// let r = c_lock.read_owned().await;
/// assert_eq!(*r, 1);
/// }).await.expect("The spawned task has panicked");
///
/// // Drop the guard after the spawned task finishes.
/// drop(n);
///}
/// ```
pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
self.s.acquire(1).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
OwnedRwLockReadGuard {
data: self.c.get(),
lock: self,
}
}

/// Attempts to acquire this `RwLock` with shared read access.
///
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
Expand Down Expand Up @@ -303,6 +382,47 @@ impl<T: ?Sized> RwLock<T> {
}
}

/// Locks this `RwLock` with exclusive write access, causing the current
/// task to yield until the lock has been acquired.
///
/// The calling task will yield while other writers or readers currently
/// have access to the lock.
///
/// This method is identical to [`RwLock::write`], except that the returned
/// guard references the `RwLock` with an [`Arc`] rather than by borrowing
/// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
/// method, and the guard will live for the `'static` lifetime, as it keeps
/// the `RwLock` alive by holding an `Arc`.
///
/// Returns an RAII guard which will drop the write access of this `RwLock`
/// when dropped.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
///
/// #[tokio::main]
/// async fn main() {
/// let lock = Arc::new(RwLock::new(1));
///
/// let mut n = lock.write_owned().await;
/// *n = 2;
///}
/// ```
pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
OwnedRwLockWriteGuard {
data: self.c.get(),
lock: ManuallyDrop::new(self),
}
}

/// Attempts to acquire this `RwLock` with exclusive write access.
///
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
Expand Down
49 changes: 49 additions & 0 deletions tokio/src/sync/rwlock/owned_read_guard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::sync::rwlock::RwLock;
use std::fmt;
use std::ops;
use std::sync::Arc;

/// Owned RAII structure used to release the shared read access of a lock when
/// dropped.
///
/// This structure is created by the [`read_owned`] method on
/// [`RwLock`].
///
/// [`read_owned`]: method@crate::sync::RwLock::read_owned
/// [`RwLock`]: struct@crate::sync::RwLock
pub struct OwnedRwLockReadGuard<T: ?Sized> {
pub(super) lock: Arc<RwLock<T>>,
pub(super) data: *const T,
}

impl<T: ?Sized> ops::Deref for OwnedRwLockReadGuard<T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.data }
}
}

impl<T: ?Sized> fmt::Debug for OwnedRwLockReadGuard<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}

impl<T: ?Sized> fmt::Display for OwnedRwLockReadGuard<T>
where
T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&**self, f)
}
}

impl<T: ?Sized> Drop for OwnedRwLockReadGuard<T> {
fn drop(&mut self) {
self.lock.s.release(1);
}
}
107 changes: 107 additions & 0 deletions tokio/src/sync/rwlock/owned_write_guard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use crate::sync::rwlock::owned_read_guard::OwnedRwLockReadGuard;
use crate::sync::rwlock::RwLock;
use std::fmt;
use std::mem::{self, ManuallyDrop};
use std::ops;
use std::sync::Arc;

/// Owned RAII structure used to release the exclusive write access of a lock when
/// dropped.
///
/// This structure is created by the [`write_owned`] method
/// on [`RwLock`].
///
/// [`write_owned`]: method@crate::sync::RwLock::write_owned
/// [`RwLock`]: struct@crate::sync::RwLock
pub struct OwnedRwLockWriteGuard<T: ?Sized> {
// ManuallyDrop allows us to destructure into this field without running the destructor.
pub(super) lock: ManuallyDrop<Arc<RwLock<T>>>,
pub(super) data: *mut T,
}
SabrinaJewson marked this conversation as resolved.
Show resolved Hide resolved

impl<T: ?Sized> OwnedRwLockWriteGuard<T> {
/// Atomically downgrades a write lock into a read lock without allowing
/// any writers to take exclusive access of the lock in the meantime.
///
/// **Note:** This won't *necessarily* allow any additional readers to acquire
/// locks, since [`RwLock`] is fair and it is possible that a writer is next
/// in line.
///
/// Returns an RAII guard which will drop this read access of the `RwLock`
/// when dropped.
///
/// # Examples
///
/// ```
/// # use tokio::sync::RwLock;
/// # use std::sync::Arc;
/// #
/// # #[tokio::main]
/// # async fn main() {
/// let lock = Arc::new(RwLock::new(1));
///
/// let n = lock.clone().write_owned().await;
///
/// let cloned_lock = lock.clone();
/// let handle = tokio::spawn(async move {
/// *cloned_lock.write_owned().await = 2;
/// });
///
/// let n = n.downgrade();
/// assert_eq!(*n, 1, "downgrade is atomic");
///
/// drop(n);
/// handle.await.unwrap();
/// assert_eq!(*lock.read().await, 2, "second writer obtained write lock");
/// # }
/// ```
pub fn downgrade(mut self) -> OwnedRwLockReadGuard<T> {
let lock = unsafe { ManuallyDrop::take(&mut self.lock) };
let data = self.data;

// Release all but one of the permits held by the write guard
lock.s.release(super::MAX_READS - 1);
// NB: Forget to avoid drop impl from being called.
mem::forget(self);
OwnedRwLockReadGuard { lock, data }
}
}

impl<T: ?Sized> ops::Deref for OwnedRwLockWriteGuard<T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.data }
}
}

impl<T: ?Sized> ops::DerefMut for OwnedRwLockWriteGuard<T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.data }
}
}

impl<T: ?Sized> fmt::Debug for OwnedRwLockWriteGuard<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}

impl<T: ?Sized> fmt::Display for OwnedRwLockWriteGuard<T>
where
T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&**self, f)
}
}

impl<T: ?Sized> Drop for OwnedRwLockWriteGuard<T> {
fn drop(&mut self) {
self.lock.s.release(super::MAX_READS);
unsafe { ManuallyDrop::drop(&mut self.lock) };
}
}
2 changes: 1 addition & 1 deletion tokio/src/sync/rwlock/read_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct RwLockReadGuard<'a, T: ?Sized> {
pub(super) marker: marker::PhantomData<&'a T>,
}

impl<'a, T> RwLockReadGuard<'a, T> {
impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
/// Make a new `RwLockReadGuard` for a component of the locked data.
///
/// This operation cannot fail as the `RwLockReadGuard` passed in already
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/rwlock/write_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::ops;
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
///
/// This structure is created by the [`write`] and method
/// This structure is created by the [`write`] method
/// on [`RwLock`].
///
/// [`write`]: method@crate::sync::RwLock::write
Expand Down
8 changes: 7 additions & 1 deletion tokio/src/sync/tests/loom_rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn concurrent_read_write() {
let rwclone = rwlock.clone();
let t2 = thread::spawn(move || {
block_on(async {
let mut guard = rwclone.write().await;
let mut guard = rwclone.write_owned().await;
*guard += 5;
});
});
Expand All @@ -67,6 +67,12 @@ fn concurrent_read_write() {
});
});

{
let guard = block_on(rwlock.clone().read_owned());
//at this state the value on the lock may either be 0, 5, or 10
assert!(*guard == 0 || *guard == 5 || *guard == 10);
}

t1.join().expect("thread 1 write should not panic");
t2.join().expect("thread 2 write should not panic");
t3.join().expect("thread 3 read should not panic");
Expand Down