Skip to content

Commit

Permalink
sync: add {Receiver,UnboundedReceiver}::poll_recv_many (#6236)
Browse files Browse the repository at this point in the history
  • Loading branch information
Owen-CH-Leung authored Jan 1, 2024
1 parent 02b779e commit 7341004
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 4 deletions.
81 changes: 79 additions & 2 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,8 @@ impl<T> Receiver<T> {
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
Expand All @@ -475,6 +475,83 @@ impl<T> Receiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`.
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).
///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::Receiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(32);
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).await.unwrap();
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down
81 changes: 79 additions & 2 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ impl<T> UnboundedReceiver<T> {
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
Expand All @@ -354,6 +354,83 @@ impl<T> UnboundedReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`.
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).
///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::UnboundedReceiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel::<i32>();
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).expect("Unable to send integer");
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> UnboundedSender<T> {
Expand Down

0 comments on commit 7341004

Please sign in to comment.