From 0b8e8cb379da77c78011f013ad034dda5f431488 Mon Sep 17 00:00:00 2001 From: Max Bruckner Date: Fri, 19 Feb 2021 17:32:30 +0100 Subject: [PATCH 1/5] sync: Add acquire_many_owned methods to Semaphore --- tokio/src/sync/semaphore.rs | 42 ++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index bff5de98cda..8611781124a 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -143,7 +143,7 @@ impl Semaphore { } } - /// Tries to acquire n permits from the semaphore. + /// Tries to acquire `n` permits from the semaphore. /// /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, @@ -180,6 +180,24 @@ impl Semaphore { }) } + /// Acquires `n` permits from the semaphore. + /// + /// The semaphore must be wrapped in an [`Arc`] to call this method. + /// If the semaphore has been closed, this returns an [`AcquireError`]. + /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the + /// acquired permit. + /// + /// [`Arc`]: std::sync::Arc + /// [`AcquireError`]: crate::sync::AcquireError + /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit + pub async fn acquire_many_owned(self: Arc, n: u32) -> Result { + self.ll_sem.acquire(n).await?; + Ok(OwnedSemaphorePermit { + sem: self, + permits: n, + }) + } + /// Tries to acquire a permit from the semaphore. /// /// The semaphore must be wrapped in an [`Arc`] to call this method. If @@ -202,6 +220,28 @@ impl Semaphore { } } + /// Tries to acquire `n` permits from the semaphore. + /// + /// The semaphore must be wrapped in an [`Arc`] to call this method. If + /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`] + /// and a [`TryAcquireError::NoPermits`] if there are no permits left. + /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the + /// acquired permit. + /// + /// [`Arc`]: std::sync::Arc + /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed + /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits + /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit + pub fn try_acquire_many_owned(self: Arc, n: u32) -> Result { + match self.ll_sem.try_acquire(n) { + Ok(_) => Ok(OwnedSemaphorePermit { + sem: self, + permits: n, + }), + Err(e) => Err(e), + } + } + /// Closes the semaphore. /// /// This prevents the semaphore from issuing new permits and notifies all pending waiters. From c0eeaf4711cf9009cdbb65d1b8ae2af97bc1dc88 Mon Sep 17 00:00:00 2001 From: Max Bruckner Date: Fri, 19 Feb 2021 17:48:02 +0100 Subject: [PATCH 2/5] fix formatting --- tokio/src/sync/semaphore.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 8611781124a..2d3ac3ca8ec 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -190,7 +190,10 @@ impl Semaphore { /// [`Arc`]: std::sync::Arc /// [`AcquireError`]: crate::sync::AcquireError /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit - pub async fn acquire_many_owned(self: Arc, n: u32) -> Result { + pub async fn acquire_many_owned( + self: Arc, + n: u32, + ) -> Result { self.ll_sem.acquire(n).await?; Ok(OwnedSemaphorePermit { sem: self, @@ -232,7 +235,10 @@ impl Semaphore { /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit - pub fn try_acquire_many_owned(self: Arc, n: u32) -> Result { + pub fn try_acquire_many_owned( + self: Arc, + n: u32, + ) -> Result { match self.ll_sem.try_acquire(n) { Ok(_) => Ok(OwnedSemaphorePermit { sem: self, From d9287845553bd157434b8888a412ec6bcebec518 Mon Sep 17 00:00:00 2001 From: Max Bruckner Date: Fri, 19 Feb 2021 18:26:10 +0100 Subject: [PATCH 3/5] Test acquire_many_owned --- tokio/tests/sync_semaphore_owned.rs | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tokio/tests/sync_semaphore_owned.rs b/tokio/tests/sync_semaphore_owned.rs index 8ed6209f3b9..e92634bd4fb 100644 --- a/tokio/tests/sync_semaphore_owned.rs +++ b/tokio/tests/sync_semaphore_owned.rs @@ -16,6 +16,22 @@ fn try_acquire() { assert!(p3.is_ok()); } +#[test] +fn try_acquire_many() { + let sem = Arc::new(Semaphore::new(42)); + { + let p1 = sem.clone().try_acquire_many_owned(42); + assert!(p1.is_ok()); + let p2 = sem.clone().try_acquire_owned(); + assert!(p2.is_err()); + } + let p3 = sem.clone().try_acquire_many_owned(32); + assert!(p3.is_ok()); + let p4 = sem.clone().try_acquire_many_owned(10); + assert!(p4.is_ok()); + assert!(sem.try_acquire_owned().is_err()); +} + #[tokio::test] async fn acquire() { let sem = Arc::new(Semaphore::new(1)); @@ -28,6 +44,21 @@ async fn acquire() { j.await.unwrap(); } +#[tokio::test] +async fn acquire_many() { + let semaphore = Arc::new(Semaphore::new(42)); + let permit32 = semaphore.clone().try_acquire_many_owned(32).unwrap(); + let (sender, receiver) = tokio::sync::oneshot::channel(); + let join_handle = tokio::spawn(async move { + let _permit10 = semaphore.clone().acquire_many_owned(10).await; + sender.send(()); + let _permit32 = semaphore.acquire_many_owned(32).await; + }); + receiver.await; + drop(permit32); + join_handle.await.unwrap(); +} + #[tokio::test] async fn add_permits() { let sem = Arc::new(Semaphore::new(0)); From ee4db2363a7f901a207b8ded73132901cc19aa61 Mon Sep 17 00:00:00 2001 From: Max Bruckner Date: Fri, 19 Feb 2021 18:32:31 +0100 Subject: [PATCH 4/5] acquire_owned test: Don't ignore Result type --- tokio/tests/sync_semaphore_owned.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/sync_semaphore_owned.rs b/tokio/tests/sync_semaphore_owned.rs index e92634bd4fb..2c7153ce611 100644 --- a/tokio/tests/sync_semaphore_owned.rs +++ b/tokio/tests/sync_semaphore_owned.rs @@ -54,7 +54,7 @@ async fn acquire_many() { sender.send(()); let _permit32 = semaphore.acquire_many_owned(32).await; }); - receiver.await; + receiver.await.unwrap(); drop(permit32); join_handle.await.unwrap(); } From 4422bd8f0be2a10a9d7e6d7b89bffc084024d5d0 Mon Sep 17 00:00:00 2001 From: Max Bruckner Date: Fri, 19 Feb 2021 18:38:24 +0100 Subject: [PATCH 5/5] acquire_many_owned tests: Unwrap more unused results --- tokio/tests/sync_semaphore_owned.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/tests/sync_semaphore_owned.rs b/tokio/tests/sync_semaphore_owned.rs index 2c7153ce611..478c3a3326e 100644 --- a/tokio/tests/sync_semaphore_owned.rs +++ b/tokio/tests/sync_semaphore_owned.rs @@ -50,9 +50,9 @@ async fn acquire_many() { let permit32 = semaphore.clone().try_acquire_many_owned(32).unwrap(); let (sender, receiver) = tokio::sync::oneshot::channel(); let join_handle = tokio::spawn(async move { - let _permit10 = semaphore.clone().acquire_many_owned(10).await; - sender.send(()); - let _permit32 = semaphore.acquire_many_owned(32).await; + let _permit10 = semaphore.clone().acquire_many_owned(10).await.unwrap(); + sender.send(()).unwrap(); + let _permit32 = semaphore.acquire_many_owned(32).await.unwrap(); }); receiver.await.unwrap(); drop(permit32);