Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_closed, is_empty and len to mpsc::Receiver and mpsc::UnboundedReceiver #6348

Merged
merged 15 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tokio/src/sync/mpsc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,20 @@ impl<T> Block<T> {
Some(Read::Value(value.assume_init()))
}

/// Returns true if the sender half of the list is closed and there is no
/// value in the slot to be consumed
///
/// # Safety
///
/// To maintain safety, the caller must ensure:
///
/// * No concurrent access to the slot.
pub(crate) fn is_closed_and_empty(&self, slot_index: usize) -> bool {
let offset = offset(slot_index);
let ready_bits = self.header.ready_slots.load(Acquire);
!is_ready(ready_bits, offset) && is_tx_closed(ready_bits)
}

/// Writes a value to the block at the given offset.
///
/// # Safety
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,42 @@ impl<T> Receiver<T> {
self.chan.close();
}

/// Checks if a channel is closed.
///
/// This method returns `true` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. The channel is closed
/// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
///
/// [`Sender`]: crate::sync::mpsc::Sender
/// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(10);
/// assert!(!rx.is_closed());
///
/// tx.send(100).await.unwrap();
/// tx.send(200).await.unwrap();
///
/// rx.close();
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(100));
///
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(200));
/// assert!(rx.recv().await.is_none());
///
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&mut self) -> bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% happy with this name, since in the end it also checks for outstanding messages in channel buffer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I was going through this one more time and I realized that you may be right here. Currently, it's possible to have a situation where Sender::is_closed returns true but Receiver::is_closed() returns false. That's probably not desirable. It probably makes more sense to change it to match the existing method on the sender.

Of course, another option is to rename this method to reflect its actual behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it's possible to have a situation where Sender::is_closed returns true but Receiver::is_closed() returns false. That's probably not desirable. It probably makes more sense to change it to match the existing method on the sender.

Yeah, I totally agree with this. I think the is_closed should return true if close was called or all the senders are dropped, regardless of having any messages available in the channel.

I believe it would be a good idea to provide an additional function, maybe is_empty or has_messages, that checks if there are any messages available in the channel, this way the user would be able to check both conditions with rx.is_closed() && rx.is_empty().

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems reasonable to me. Adding an is_empty method could make even more sense together with adding a len method (which would close #6314). After all, clippy emits a warning if you add len without is_empty.

But if you only want to add is_empty, that's also okay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added the is_empty and len functions.

The len implementation only checks for the tail_position. I am not sure if it is necessary to check the ready bits for valid values in each position, since that would be relatively costly because of the linked list implementation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, regarding the ready bits, I guess the main question here is concurrency-related. For example, imagine if you have the following two threads running in parallel:

sender.send("message1").await;
assert!(receiver.len() != 0); // or similar with `is_empty`
sender.send("message2").await;

Then, depending on how you implement len, it's possible that the assertion could fail. This would happen if message1 ends up being stored after message2 in the channel queue, but when the assert runs, message2 is has not yet been fully sent.

We once had a similar bug with try_recv. See #1997. Basically, this assert could fail:

sender.send("message1").await;
assert!(receiver.try_recv().is_some());
sender.send("message2").await;

The assert would sometimes fail when message2 ends up before message1 in the queue. Here, try_recv can't return message1 because we guarantee that messages are returned in order. It also can't return message2 because the call to send("message2") is still running. So it returns None instead, even though we know that a message has been successfully sent. When we fixed it in #4113, we did so by having try_recv sleep until send("message2") finished when there are fully sent messages later in the queue.

To test this, you can add these loom tests:

// tokio/src/sync/tests/loom_mpsc.rs
#[test]
fn len_nonzero_after_send() {
    loom::model(|| {
        let (send, recv) = mpsc::unbounded_channel();
	let send2 = send.clone();

	let join = thread::spawn(move || {
	    block_on(send2.send("message2")).unwrap();
	});

        block_on(send.send("message1")).unwrap();
        assert!(recv.len() != 0);

        join.join().unwrap();
    });
}

#[test]
fn nonempty_after_send() {
    loom::model(|| {
        let (send, recv) = mpsc::unbounded_channel();
	let send2 = send.clone();

	let join = thread::spawn(move || {
	    block_on(send2.send("message2")).unwrap();
	});

        block_on(send.send("message1")).unwrap();
        assert!(!recv.is_empty());

        join.join().unwrap();
    });
}

I don't know whether these tests will fail, but if they do, please look at try_recv to see how it determines whether it should sleep or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both tests failed. I am looking on how to fix it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added both loom tests and fixed the is_empty implementation.

About len_nonzero_after_send test, I think the len implementation was already correct, but I ran the wrong test initially.
At first I had written the assertion assert!(recv.len() == 2), which should fail since the join() is after the assert!.

With the assert!(recv.len() != 0); assertion, the test passes

self.chan.is_closed_and_empty()
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,24 @@ impl<T, S: Semaphore> Rx<T, S> {
self.inner.notify_rx_closed.notify_waiters();
}

pub(crate) fn is_closed_and_empty(&mut self) -> bool {
// There two internal states that can represent a closed channel that has no messages
//
// 1. When `close` is called.
// In this case, the inner semaphore will be closed, and the channel has no
// outstanding messages if the list is at the tail position.
//
// 2. When all senders are dropped.
// In this case, the semaphore remains unclosed, and the `index` in the list won't
// reach the tail position. It is necessary to check the list if the last block is
// `closed` and has no value available.
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
(self.inner.semaphore.is_closed() && rx_fields.list.is_at_tail_position(&self.inner.tx))
|| rx_fields.list.is_closed_and_empty(&self.inner.tx)
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
})
}

/// Receive the next value
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read;
Expand Down
17 changes: 17 additions & 0 deletions tokio/src/sync/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,23 @@ impl<T> fmt::Debug for Tx<T> {
}

impl<T> Rx<T> {
pub(crate) fn is_closed_and_empty(&mut self, tx: &Tx<T>) -> bool {
// Advance `head`, if needed
if self.try_advancing_head() {
self.reclaim_blocks(tx);
}

unsafe {
let block = self.head.as_ref();
block.is_closed_and_empty(self.index)
}
}

pub(crate) fn is_at_tail_position(&mut self, tx: &Tx<T>) -> bool {
let tail_position = tx.tail_position.load(Acquire);
tail_position == self.index
}

/// Pops the next value off the queue.
pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
// Advance `head`, if needed
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,42 @@ impl<T> UnboundedReceiver<T> {
self.chan.close();
}

/// Checks if a channel is closed.
///
/// This method returns `true` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. The channel is closed
/// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called.
///
/// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
/// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::unbounded_channel();
/// assert!(!rx.is_closed());
///
/// let _ = tx.send(100);
/// let _ = tx.send(200);
///
/// rx.close();
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(100));
///
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(200));
/// assert!(rx.recv().await.is_none());
///
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&mut self) -> bool {
self.chan.is_closed_and_empty()
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
Expand Down
77 changes: 77 additions & 0 deletions tokio/tests/sync_mpsc.rs
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a lot of tests! That's awesome.

Original file line number Diff line number Diff line change
Expand Up @@ -1017,4 +1017,81 @@ async fn test_tx_capacity() {
assert_eq!(tx.max_capacity(), 10);
}

#[tokio::test]
async fn test_rx_is_closed_when_calling_close_with_sender() {
// is_closed should return true after calling close but still has a sender
let (_tx, mut rx) = mpsc::channel::<()>(10);
rx.close();

assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_closed_when_dropping_all_senders() {
// is_closed should return true after dropping all senders
let (tx, mut rx) = mpsc::channel::<()>(10);
let another_tx = tx.clone();
let task = tokio::spawn(async move {
drop(another_tx);
});

drop(tx);
let _ = task.await;

assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_not_closed_when_there_are_senders() {
// is_closed should return false when there is a sender
let (_tx, mut rx) = mpsc::channel::<()>(10);
assert!(!rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() {
// is_closed should return false when there is a sender, even if enough messages have been sent to fill the channel
let (tx, mut rx) = mpsc::channel(10);
for i in 0..10 {
assert!(tx.send(i).await.is_ok());
}
assert!(!rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_not_closed_when_there_are_messages_but_not_senders() {
// is_closed should return false when there is a permit (but no senders)
let (tx, mut rx) = mpsc::channel(10);
for i in 0..10 {
assert!(tx.send(i).await.is_ok());
}
drop(tx);
assert!(!rx.is_closed());
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
}

#[tokio::test]
async fn test_rx_is_not_closed_when_there_are_messages_and_close_is_called() {
// is_closed should return false when there is a permit (but no senders)
let (tx, mut rx) = mpsc::channel(10);
for i in 0..10 {
assert!(tx.send(i).await.is_ok());
}
rx.close();
assert!(!rx.is_closed());
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
}

#[tokio::test]
async fn test_rx_is_closed_after_consuming_messages() {
// is_closed should return false when there is a permit (but no senders)
let (tx, mut rx) = mpsc::channel(10);
for i in 0..10 {
assert!(tx.send(i).await.is_ok());
}
drop(tx);

assert!(!rx.is_closed());
while (rx.recv().await).is_some() {}
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
assert!(rx.is_closed());
}

fn is_debug<T: fmt::Debug>(_: &T) {}
9 changes: 9 additions & 0 deletions tokio/tests/sync_mpsc_weak.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,12 @@ fn test_tx_count_weak_unbounded_sender() {

assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none());
}

#[tokio::test]
async fn test_rx_is_closed_when_dropping_all_senders_except_weak_senders() {
// is_closed should return true after dropping all senders except for a weak sender
let (tx, mut rx) = mpsc::channel::<()>(10);
let _weak_sender = tx.clone().downgrade();
drop(tx);
assert!(rx.is_closed());
}
Loading