Skip to content

Commit

Permalink
sync: add {try_,}acquire_many_owned to Semaphore (#3535)
Browse files Browse the repository at this point in the history
  • Loading branch information
FSMaxB authored Mar 1, 2021
1 parent fd93ecf commit fc23f8a
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 1 deletion.
48 changes: 47 additions & 1 deletion tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Semaphore {
}
}

/// Tries to acquire n permits from the semaphore.
/// Tries to acquire `n` permits from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
Expand Down Expand Up @@ -180,6 +180,27 @@ impl Semaphore {
})
}

/// Acquires `n` permits from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// [`Arc`]: std::sync::Arc
/// [`AcquireError`]: crate::sync::AcquireError
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub async fn acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, AcquireError> {
self.ll_sem.acquire(n).await?;
Ok(OwnedSemaphorePermit {
sem: self,
permits: n,
})
}

/// Tries to acquire a permit from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method. If
Expand All @@ -202,6 +223,31 @@ impl Semaphore {
}
}

/// Tries to acquire `n` permits from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method. If
/// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left.
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// [`Arc`]: std::sync::Arc
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub fn try_acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, TryAcquireError> {
match self.ll_sem.try_acquire(n) {
Ok(_) => Ok(OwnedSemaphorePermit {
sem: self,
permits: n,
}),
Err(e) => Err(e),
}
}

/// Closes the semaphore.
///
/// This prevents the semaphore from issuing new permits and notifies all pending waiters.
Expand Down
31 changes: 31 additions & 0 deletions tokio/tests/sync_semaphore_owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ fn try_acquire() {
assert!(p3.is_ok());
}

#[test]
fn try_acquire_many() {
let sem = Arc::new(Semaphore::new(42));
{
let p1 = sem.clone().try_acquire_many_owned(42);
assert!(p1.is_ok());
let p2 = sem.clone().try_acquire_owned();
assert!(p2.is_err());
}
let p3 = sem.clone().try_acquire_many_owned(32);
assert!(p3.is_ok());
let p4 = sem.clone().try_acquire_many_owned(10);
assert!(p4.is_ok());
assert!(sem.try_acquire_owned().is_err());
}

#[tokio::test]
async fn acquire() {
let sem = Arc::new(Semaphore::new(1));
Expand All @@ -28,6 +44,21 @@ async fn acquire() {
j.await.unwrap();
}

#[tokio::test]
async fn acquire_many() {
let semaphore = Arc::new(Semaphore::new(42));
let permit32 = semaphore.clone().try_acquire_many_owned(32).unwrap();
let (sender, receiver) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let _permit10 = semaphore.clone().acquire_many_owned(10).await.unwrap();
sender.send(()).unwrap();
let _permit32 = semaphore.acquire_many_owned(32).await.unwrap();
});
receiver.await.unwrap();
drop(permit32);
join_handle.await.unwrap();
}

#[tokio::test]
async fn add_permits() {
let sem = Arc::new(Semaphore::new(0));
Expand Down

0 comments on commit fc23f8a

Please sign in to comment.