From 67244da0844c05d34d22e1b2eaea2f8dc86f545a Mon Sep 17 00:00:00 2001 From: KaiJewson Date: Sat, 13 Feb 2021 12:38:03 +0000 Subject: [PATCH] sync: add OwnedRwLockReadGuard and OwnedRwLockWriteGuard I did not add mapping functions as it's not possible to implement them soundly without adding another generic parameter to the guards, which I will leave to another PR. Fixes: #3327 --- tokio/src/sync/rwlock.rs | 122 ++++++++++++++++++++- tokio/src/sync/rwlock/owned_read_guard.rs | 49 +++++++++ tokio/src/sync/rwlock/owned_write_guard.rs | 107 ++++++++++++++++++ tokio/src/sync/rwlock/read_guard.rs | 2 +- tokio/src/sync/rwlock/write_guard.rs | 2 +- tokio/src/sync/tests/loom_rwlock.rs | 8 +- 6 files changed, 286 insertions(+), 4 deletions(-) create mode 100644 tokio/src/sync/rwlock/owned_read_guard.rs create mode 100644 tokio/src/sync/rwlock/owned_write_guard.rs diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 30e69fde04d..4987a76b7be 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -2,10 +2,16 @@ use crate::sync::batch_semaphore::{Semaphore, TryAcquireError}; use crate::sync::mutex::TryLockError; use std::cell::UnsafeCell; use std::marker; +use std::mem::ManuallyDrop; +use std::sync::Arc; +pub(crate) mod owned_read_guard; +pub(crate) mod owned_write_guard; pub(crate) mod read_guard; pub(crate) mod write_guard; pub(crate) mod write_guard_mapped; +pub(crate) use owned_read_guard::OwnedRwLockReadGuard; +pub(crate) use owned_write_guard::OwnedRwLockWriteGuard; pub(crate) use read_guard::RwLockReadGuard; pub(crate) use write_guard::RwLockWriteGuard; pub(crate) use write_guard_mapped::RwLockMappedWriteGuard; @@ -101,13 +107,23 @@ fn bounds() { check_sync::>(); check_unpin::>(); + check_send::>(); + check_sync::>(); + check_unpin::>(); + check_send::>(); check_sync::>(); check_unpin::>(); - let rwlock = RwLock::new(0); + check_send::>(); + check_sync::>(); + check_unpin::>(); + + let rwlock = Arc::new(RwLock::new(0)); 
check_send_sync_val(rwlock.read()); + check_send_sync_val(Arc::clone(&rwlock).read_owned()); check_send_sync_val(rwlock.write()); + check_send_sync_val(Arc::clone(&rwlock).write_owned()); } // As long as T: Send + Sync, it's fine to send and share RwLock between threads. @@ -120,13 +136,19 @@ unsafe impl Sync for RwLock where T: ?Sized + Send + Sync {} // `T` is `Send`. unsafe impl Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {} unsafe impl Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {} +// T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in +// the RwLock, unlike RwLockReadGuard. +unsafe impl Send for OwnedRwLockReadGuard where T: ?Sized + Send + Sync {} +unsafe impl Sync for OwnedRwLockReadGuard where T: ?Sized + Send + Sync {} unsafe impl Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {} +unsafe impl Sync for OwnedRwLockWriteGuard where T: ?Sized + Send + Sync {} unsafe impl Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {} // Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over // `T` is `Send` - but since this is also provides mutable access, we need to // make sure that `T` is `Send` since its value can be sent across thread // boundaries. unsafe impl Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {} +unsafe impl Send for OwnedRwLockWriteGuard where T: ?Sized + Send + Sync {} unsafe impl Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {} impl RwLock { @@ -222,6 +244,63 @@ impl RwLock { } } + /// Locks this `RwLock` with shared read access, causing the current task + /// to yield until the lock has been acquired. + /// + /// The calling task will yield until there are no writers which hold the + /// lock. There may be other readers inside the lock when the task resumes. 
+ /// + /// This method is identical to [`RwLock::read`], except that the returned + /// guard references the `RwLock` with an [`Arc`] rather than by borrowing + /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this + /// method, and the guard will live for the `'static` lifetime, as it keeps + /// the `RwLock` alive by holding an `Arc`. + /// + /// Note that under the priority policy of [`RwLock`], read locks are not + /// granted until prior write locks, to prevent starvation. Therefore + /// deadlock may occur if a read lock is held by the current task, a write + /// lock attempt is made, and then a subsequent read lock attempt is made + /// by the current task. + /// + /// Returns an RAII guard which will drop this read access of the `RwLock` + /// when dropped. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::RwLock; + /// + /// #[tokio::main] + /// async fn main() { + /// let lock = Arc::new(RwLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read_owned().await; + /// assert_eq!(*n, 1); + /// + /// tokio::spawn(async move { + /// // While main has an active read lock, we acquire one too. + /// let r = c_lock.read_owned().await; + /// assert_eq!(*r, 1); + /// }).await.expect("The spawned task has panicked"); + /// + /// // Drop the guard after the spawned task finishes. + /// drop(n); + ///} + /// ``` + pub async fn read_owned(self: Arc) -> OwnedRwLockReadGuard { + self.s.acquire(1).await.unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); + OwnedRwLockReadGuard { + data: self.c.get(), + lock: self, + } + } + /// Attempts to acquire this `RwLock` with shared read access. /// /// If the access couldn't be acquired immediately, returns [`TryLockError`]. 
@@ -303,6 +382,47 @@ impl RwLock { } } + /// Locks this `RwLock` with exclusive write access, causing the current + /// task to yield until the lock has been acquired. + /// + /// The calling task will yield while other writers or readers currently + /// have access to the lock. + /// + /// This method is identical to [`RwLock::write`], except that the returned + /// guard references the `RwLock` with an [`Arc`] rather than by borrowing + /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this + /// method, and the guard will live for the `'static` lifetime, as it keeps + /// the `RwLock` alive by holding an `Arc`. + /// + /// Returns an RAII guard which will drop the write access of this `RwLock` + /// when dropped. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::RwLock; + /// + /// #[tokio::main] + /// async fn main() { + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let mut n = lock.write_owned().await; + /// *n = 2; + ///} + /// ``` + pub async fn write_owned(self: Arc) -> OwnedRwLockWriteGuard { + self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); + OwnedRwLockWriteGuard { + data: self.c.get(), + lock: ManuallyDrop::new(self), + } + } + /// Attempts to acquire this `RwLock` with exclusive write access. /// /// If the access couldn't be acquired immediately, returns [`TryLockError`]. diff --git a/tokio/src/sync/rwlock/owned_read_guard.rs b/tokio/src/sync/rwlock/owned_read_guard.rs new file mode 100644 index 00000000000..22162aa605b --- /dev/null +++ b/tokio/src/sync/rwlock/owned_read_guard.rs @@ -0,0 +1,49 @@ +use crate::sync::rwlock::RwLock; +use std::fmt; +use std::ops; +use std::sync::Arc; + +/// Owned RAII structure used to release the shared read access of a lock when +/// dropped. 
+/// +/// This structure is created by the [`read_owned`] method on +/// [`RwLock`]. +/// +/// [`read_owned`]: method@crate::sync::RwLock::read_owned +/// [`RwLock`]: struct@crate::sync::RwLock +pub struct OwnedRwLockReadGuard { + pub(super) lock: Arc>, + pub(super) data: *const T, +} + +impl ops::Deref for OwnedRwLockReadGuard { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.data } + } +} + +impl fmt::Debug for OwnedRwLockReadGuard +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for OwnedRwLockReadGuard +where + T: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl Drop for OwnedRwLockReadGuard { + fn drop(&mut self) { + self.lock.s.release(1); + } +} diff --git a/tokio/src/sync/rwlock/owned_write_guard.rs b/tokio/src/sync/rwlock/owned_write_guard.rs new file mode 100644 index 00000000000..deb66e3c7df --- /dev/null +++ b/tokio/src/sync/rwlock/owned_write_guard.rs @@ -0,0 +1,107 @@ +use crate::sync::rwlock::owned_read_guard::OwnedRwLockReadGuard; +use crate::sync::rwlock::RwLock; +use std::fmt; +use std::mem::{self, ManuallyDrop}; +use std::ops; +use std::sync::Arc; + +/// Owned RAII structure used to release the exclusive write access of a lock when +/// dropped. +/// +/// This structure is created by the [`write_owned`] method +/// on [`RwLock`]. +/// +/// [`write_owned`]: method@crate::sync::RwLock::write_owned +/// [`RwLock`]: struct@crate::sync::RwLock +pub struct OwnedRwLockWriteGuard { + // ManuallyDrop allows us to destructure into this field without running the destructor. + pub(super) lock: ManuallyDrop>>, + pub(super) data: *mut T, +} + +impl OwnedRwLockWriteGuard { + /// Atomically downgrades a write lock into a read lock without allowing + /// any writers to take exclusive access of the lock in the meantime. 
+ /// + /// **Note:** This won't *necessarily* allow any additional readers to acquire + /// locks, since [`RwLock`] is fair and it is possible that a writer is next + /// in line. + /// + /// Returns an RAII guard which will drop this read access of the `RwLock` + /// when dropped. + /// + /// # Examples + /// + /// ``` + /// # use tokio::sync::RwLock; + /// # use std::sync::Arc; + /// # + /// # #[tokio::main] + /// # async fn main() { + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let n = lock.clone().write_owned().await; + /// + /// let cloned_lock = lock.clone(); + /// let handle = tokio::spawn(async move { + /// *cloned_lock.write_owned().await = 2; + /// }); + /// + /// let n = n.downgrade(); + /// assert_eq!(*n, 1, "downgrade is atomic"); + /// + /// drop(n); + /// handle.await.unwrap(); + /// assert_eq!(*lock.read().await, 2, "second writer obtained write lock"); + /// # } + /// ``` + pub fn downgrade(mut self) -> OwnedRwLockReadGuard { + let lock = unsafe { ManuallyDrop::take(&mut self.lock) }; + let data = self.data; + + // Release all but one of the permits held by the write guard + lock.s.release(super::MAX_READS - 1); + // NB: Forget to avoid drop impl from being called. 
+ mem::forget(self); + OwnedRwLockReadGuard { lock, data } + } +} + +impl ops::Deref for OwnedRwLockWriteGuard { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.data } + } +} + +impl ops::DerefMut for OwnedRwLockWriteGuard { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.data } + } +} + +impl fmt::Debug for OwnedRwLockWriteGuard +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for OwnedRwLockWriteGuard +where + T: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl Drop for OwnedRwLockWriteGuard { + fn drop(&mut self) { + self.lock.s.release(super::MAX_READS); + unsafe { ManuallyDrop::drop(&mut self.lock) }; + } +} diff --git a/tokio/src/sync/rwlock/read_guard.rs b/tokio/src/sync/rwlock/read_guard.rs index d648944b2d4..38eec7727c3 100644 --- a/tokio/src/sync/rwlock/read_guard.rs +++ b/tokio/src/sync/rwlock/read_guard.rs @@ -18,7 +18,7 @@ pub struct RwLockReadGuard<'a, T: ?Sized> { pub(super) marker: marker::PhantomData<&'a T>, } -impl<'a, T> RwLockReadGuard<'a, T> { +impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { /// Make a new `RwLockReadGuard` for a component of the locked data. /// /// This operation cannot fail as the `RwLockReadGuard` passed in already diff --git a/tokio/src/sync/rwlock/write_guard.rs b/tokio/src/sync/rwlock/write_guard.rs index 9f52d046176..1737d203c0e 100644 --- a/tokio/src/sync/rwlock/write_guard.rs +++ b/tokio/src/sync/rwlock/write_guard.rs @@ -9,7 +9,7 @@ use std::ops; /// RAII structure used to release the exclusive write access of a lock when /// dropped. /// -/// This structure is created by the [`write`] and method +/// This structure is created by the [`write`] method /// on [`RwLock`]. 
/// /// [`write`]: method@crate::sync::RwLock::write diff --git a/tokio/src/sync/tests/loom_rwlock.rs b/tokio/src/sync/tests/loom_rwlock.rs index 2834a26356c..f2a360c50e9 100644 --- a/tokio/src/sync/tests/loom_rwlock.rs +++ b/tokio/src/sync/tests/loom_rwlock.rs @@ -53,7 +53,7 @@ fn concurrent_read_write() { let rwclone = rwlock.clone(); let t2 = thread::spawn(move || { block_on(async { - let mut guard = rwclone.write().await; + let mut guard = rwclone.write_owned().await; *guard += 5; }); }); @@ -67,6 +67,12 @@ }); }); + { + let guard = block_on(rwlock.clone().read_owned()); + // at this point the value in the lock may be either 0, 5, or 10 + assert!(*guard == 0 || *guard == 5 || *guard == 10); + } + t1.join().expect("thread 1 write should not panic"); t2.join().expect("thread 2 write should not panic"); t3.join().expect("thread 3 read should not panic");