Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OwnedRwLockReadGuard and OwnedRwLockWriteGuard #3340

Merged
merged 7 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ cfg_sync! {

mod rwlock;
pub use rwlock::RwLock;
pub use rwlock::owned_read_guard::OwnedRwLockReadGuard;
pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard;
pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
pub use rwlock::read_guard::RwLockReadGuard;
pub use rwlock::write_guard::RwLockWriteGuard;
pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard;
Expand Down
248 changes: 247 additions & 1 deletion tokio/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
use crate::sync::mutex::TryLockError;
use std::cell::UnsafeCell;
use std::marker;
use std::mem::ManuallyDrop;
use std::sync::Arc;

pub(crate) mod owned_read_guard;
pub(crate) mod owned_write_guard;
pub(crate) mod owned_write_guard_mapped;
pub(crate) mod read_guard;
pub(crate) mod write_guard;
pub(crate) mod write_guard_mapped;
pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
pub(crate) use owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
pub(crate) use read_guard::RwLockReadGuard;
pub(crate) use write_guard::RwLockWriteGuard;
pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
Expand Down Expand Up @@ -101,13 +109,31 @@ fn bounds() {
check_sync::<RwLockReadGuard<'_, u32>>();
check_unpin::<RwLockReadGuard<'_, u32>>();

check_send::<OwnedRwLockReadGuard<u32, i32>>();
check_sync::<OwnedRwLockReadGuard<u32, i32>>();
check_unpin::<OwnedRwLockReadGuard<u32, i32>>();

check_send::<RwLockWriteGuard<'_, u32>>();
check_sync::<RwLockWriteGuard<'_, u32>>();
check_unpin::<RwLockWriteGuard<'_, u32>>();

let rwlock = RwLock::new(0);
check_send::<RwLockMappedWriteGuard<'_, u32>>();
check_sync::<RwLockMappedWriteGuard<'_, u32>>();
check_unpin::<RwLockMappedWriteGuard<'_, u32>>();

check_send::<OwnedRwLockWriteGuard<u32>>();
check_sync::<OwnedRwLockWriteGuard<u32>>();
check_unpin::<OwnedRwLockWriteGuard<u32>>();

check_send::<OwnedRwLockMappedWriteGuard<u32, i32>>();
check_sync::<OwnedRwLockMappedWriteGuard<u32, i32>>();
check_unpin::<OwnedRwLockMappedWriteGuard<u32, i32>>();

let rwlock = Arc::new(RwLock::new(0));
check_send_sync_val(rwlock.read());
check_send_sync_val(Arc::clone(&rwlock).read_owned());
check_send_sync_val(rwlock.write());
check_send_sync_val(Arc::clone(&rwlock).write_owned());
}

// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
Expand All @@ -120,14 +146,42 @@ unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
// `T` is `Send`.
unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
// T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
// the RwLock, unlike RwLockReadGuard.
unsafe impl<T, U> Send for OwnedRwLockReadGuard<T, U>
where
T: ?Sized + Send + Sync,
U: ?Sized + Sync,
{
}
unsafe impl<T, U> Sync for OwnedRwLockReadGuard<T, U>
where
T: ?Sized + Send + Sync,
U: ?Sized + Send + Sync,
{
}
unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T, U> Sync for OwnedRwLockMappedWriteGuard<T, U>
where
T: ?Sized + Send + Sync,
U: ?Sized + Send + Sync,
{
}
// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
// `T` is `Send` - but since this is also provides mutable access, we need to
// make sure that `T` is `Send` since its value can be sent across thread
// boundaries.
unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T, U> Send for OwnedRwLockMappedWriteGuard<T, U>
where
T: ?Sized + Send + Sync,
U: ?Sized + Send + Sync,
{
}

impl<T: ?Sized> RwLock<T> {
/// Creates a new instance of an `RwLock<T>` which is unlocked.
Expand Down Expand Up @@ -222,6 +276,63 @@ impl<T: ?Sized> RwLock<T> {
}
}

/// Locks this `RwLock` with shared read access, causing the current task
/// to yield until the lock has been acquired.
///
/// The calling task will yield until there are no writers which hold the
/// lock. There may be other readers inside the lock when the task resumes.
///
/// This method is identical to [`RwLock::read`], except that the returned
/// guard references the `RwLock` with an [`Arc`] rather than by borrowing
/// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
/// method, and the guard will live for the `'static` lifetime, as it keeps
/// the `RwLock` alive by holding an `Arc`.
///
/// Note that under the priority policy of [`RwLock`], read locks are not
/// granted until prior write locks, to prevent starvation. Therefore
/// deadlock may occur if a read lock is held by the current task, a write
/// lock attempt is made, and then a subsequent read lock attempt is made
/// by the current task.
///
/// Returns an RAII guard which will drop this read access of the `RwLock`
/// when dropped.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
///
/// #[tokio::main]
/// async fn main() {
/// let lock = Arc::new(RwLock::new(1));
/// let c_lock = lock.clone();
///
/// let n = lock.read_owned().await;
/// assert_eq!(*n, 1);
///
/// tokio::spawn(async move {
/// // While main has an active read lock, we acquire one too.
/// let r = c_lock.read_owned().await;
/// assert_eq!(*r, 1);
/// }).await.expect("The spawned task has panicked");
///
/// // Drop the guard after the spawned task finishes.
/// drop(n);
///}
/// ```
pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
self.s.acquire(1).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
OwnedRwLockReadGuard {
data: self.c.get(),
lock: ManuallyDrop::new(self),
}
}

/// Attempts to acquire this `RwLock` with shared read access.
///
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
Expand Down Expand Up @@ -268,6 +379,57 @@ impl<T: ?Sized> RwLock<T> {
})
}

/// Attempts to acquire this `RwLock` with shared read access.
///
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
/// Otherwise, an RAII guard is returned which will release read access
/// when dropped.
///
/// This method is identical to [`RwLock::try_read`], except that the
/// returned guard references the `RwLock` with an [`Arc`] rather than by
/// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
/// call this method, and the guard will live for the `'static` lifetime,
/// as it keeps the `RwLock` alive by holding an `Arc`.
///
/// [`TryLockError`]: TryLockError
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
///
/// #[tokio::main]
/// async fn main() {
/// let lock = Arc::new(RwLock::new(1));
/// let c_lock = lock.clone();
///
/// let v = lock.try_read_owned().unwrap();
/// assert_eq!(*v, 1);
///
/// tokio::spawn(async move {
/// // While main has an active read lock, we acquire one too.
/// let n = c_lock.read_owned().await;
/// assert_eq!(*n, 1);
/// }).await.expect("The spawned task has panicked");
///
/// // Drop the guard when spawned task finishes.
/// drop(v);
/// }
/// ```
pub fn try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError> {
match self.s.try_acquire(1) {
Ok(permit) => permit,
Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
Err(TryAcquireError::Closed) => unreachable!(),
}

Ok(OwnedRwLockReadGuard {
data: self.c.get(),
lock: ManuallyDrop::new(self),
})
}

/// Locks this `RwLock` with exclusive write access, causing the current
/// task to yield until the lock has been acquired.
///
Expand Down Expand Up @@ -303,6 +465,47 @@ impl<T: ?Sized> RwLock<T> {
}
}

/// Locks this `RwLock` with exclusive write access, causing the current
/// task to yield until the lock has been acquired.
///
/// The calling task will yield while other writers or readers currently
/// have access to the lock.
///
/// This method is identical to [`RwLock::write`], except that the returned
/// guard references the `RwLock` with an [`Arc`] rather than by borrowing
/// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
/// method, and the guard will live for the `'static` lifetime, as it keeps
/// the `RwLock` alive by holding an `Arc`.
///
/// Returns an RAII guard which will drop the write access of this `RwLock`
/// when dropped.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
///
/// #[tokio::main]
/// async fn main() {
/// let lock = Arc::new(RwLock::new(1));
///
/// let mut n = lock.write_owned().await;
/// *n = 2;
///}
/// ```
pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
OwnedRwLockWriteGuard {
data: self.c.get(),
lock: ManuallyDrop::new(self),
}
}

/// Attempts to acquire this `RwLock` with exclusive write access.
///
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
Expand Down Expand Up @@ -340,6 +543,49 @@ impl<T: ?Sized> RwLock<T> {
})
}

/// Attempts to acquire this `RwLock` with exclusive write access.
///
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
/// Otherwise, an RAII guard is returned which will release write access
/// when dropped.
///
/// This method is identical to [`RwLock::try_write`], except that the
/// returned guard references the `RwLock` with an [`Arc`] rather than by
/// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
/// call this method, and the guard will live for the `'static` lifetime,
/// as it keeps the `RwLock` alive by holding an `Arc`.
///
/// [`TryLockError`]: TryLockError
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
///
/// #[tokio::main]
/// async fn main() {
/// let rw = Arc::new(RwLock::new(1));
///
/// let v = Arc::clone(&rw).read_owned().await;
/// assert_eq!(*v, 1);
///
/// assert!(rw.try_write_owned().is_err());
/// }
/// ```
pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
match self.s.try_acquire(MAX_READS as u32) {
Ok(permit) => permit,
Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
Err(TryAcquireError::Closed) => unreachable!(),
}

Ok(OwnedRwLockWriteGuard {
data: self.c.get(),
lock: ManuallyDrop::new(self),
})
}

/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `RwLock` mutably, no actual locking needs to
Expand Down
Loading