Skip to content

Commit

Permalink
sync: name mpsc semaphore types (#5146)
Browse files Browse the repository at this point in the history
Make code easier to read. No functional/perf changes.
  • Loading branch information
stepancheg authored Oct 30, 2022
1 parent 620880f commit 15b362d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 23 deletions.
23 changes: 15 additions & 8 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ pub struct Receiver<T> {
#[track_caller]
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
let semaphore = (semaphore::Semaphore::new(buffer), buffer);
let semaphore = Semaphore {
semaphore: semaphore::Semaphore::new(buffer),
bound: buffer,
};
let (tx, rx) = chan::channel(semaphore);

let tx = Sender::new(tx);
Expand All @@ -154,7 +157,11 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {

/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
/// representing the channel bound.
type Semaphore = (semaphore::Semaphore, usize);
#[derive(Debug)]
pub(crate) struct Semaphore {
pub(crate) semaphore: semaphore::Semaphore,
pub(crate) bound: usize,
}

impl<T> Receiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
Expand Down Expand Up @@ -572,7 +579,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
match self.chan.semaphore().0.try_acquire(1) {
match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(_) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
Expand Down Expand Up @@ -852,7 +859,7 @@ impl<T> Sender<T> {
}

async fn reserve_inner(&self) -> Result<(), SendError<()>> {
match self.chan.semaphore().0.acquire(1).await {
match self.chan.semaphore().semaphore.acquire(1).await {
Ok(_) => Ok(()),
Err(_) => Err(SendError(())),
}
Expand Down Expand Up @@ -902,7 +909,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
match self.chan.semaphore().0.try_acquire(1) {
match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(_) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
Expand Down Expand Up @@ -967,7 +974,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
match self.chan.semaphore().0.try_acquire(1) {
match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(_) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
Expand Down Expand Up @@ -1028,7 +1035,7 @@ impl<T> Sender<T> {
/// [`channel`]: channel
/// [`max_capacity`]: Sender::max_capacity
pub fn capacity(&self) -> usize {
self.chan.semaphore().0.available_permits()
self.chan.semaphore().semaphore.available_permits()
}

/// Converts the `Sender` to a [`WeakSender`] that does not count
Expand Down Expand Up @@ -1074,7 +1081,7 @@ impl<T> Sender<T> {
/// [`max_capacity`]: Sender::max_capacity
/// [`capacity`]: Sender::capacity
pub fn max_capacity(&self) -> usize {
self.chan.semaphore().1
self.chan.semaphore().bound
}
}

Expand Down
23 changes: 11 additions & 12 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::park::thread::CachedParkThread;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
use crate::sync::mpsc::{bounded, list, unbounded};
use crate::sync::notify::Notify;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;

/// Channel sender.
pub(crate) struct Tx<T, S> {
Expand Down Expand Up @@ -382,29 +381,29 @@ impl<T, S> Drop for Chan<T, S> {

// ===== impl Semaphore for (::Semaphore, capacity) =====

impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
impl Semaphore for bounded::Semaphore {
fn add_permit(&self) {
self.0.release(1)
self.semaphore.release(1)
}

fn is_idle(&self) -> bool {
self.0.available_permits() == self.1
self.semaphore.available_permits() == self.bound
}

fn close(&self) {
self.0.close();
self.semaphore.close();
}

fn is_closed(&self) -> bool {
self.0.is_closed()
self.semaphore.is_closed()
}
}

// ===== impl Semaphore for AtomicUsize =====

impl Semaphore for AtomicUsize {
impl Semaphore for unbounded::Semaphore {
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
let prev = self.0.fetch_sub(2, Release);

if prev >> 1 == 0 {
// Something went wrong
Expand All @@ -413,14 +412,14 @@ impl Semaphore for AtomicUsize {
}

fn is_idle(&self) -> bool {
self.load(Acquire) >> 1 == 0
self.0.load(Acquire) >> 1 == 0
}

fn close(&self) {
self.fetch_or(1, Release);
self.0.fetch_or(1, Release);
}

fn is_closed(&self) -> bool {
self.load(Acquire) & 1 == 1
self.0.load(Acquire) & 1 == 1
}
}
8 changes: 5 additions & 3 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<T> fmt::Debug for UnboundedReceiver<T> {
/// the channel. Using an `unbounded` channel has the ability of causing the
/// process to run out of memory. In this case, the process will be aborted.
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (tx, rx) = chan::channel(AtomicUsize::new(0));
let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0)));

let tx = UnboundedSender::new(tx);
let rx = UnboundedReceiver::new(rx);
Expand All @@ -70,7 +70,8 @@ pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
}

/// No capacity
type Semaphore = AtomicUsize;
#[derive(Debug)]
pub(crate) struct Semaphore(pub(crate) AtomicUsize);

impl<T> UnboundedReceiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
Expand Down Expand Up @@ -279,7 +280,7 @@ impl<T> UnboundedSender<T> {
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Acquire};

let mut curr = self.chan.semaphore().load(Acquire);
let mut curr = self.chan.semaphore().0.load(Acquire);

loop {
if curr & 1 == 1 {
Expand All @@ -295,6 +296,7 @@ impl<T> UnboundedSender<T> {
match self
.chan
.semaphore()
.0
.compare_exchange(curr, curr + 2, AcqRel, Acquire)
{
Ok(_) => return true,
Expand Down

0 comments on commit 15b362d

Please sign in to comment.