Skip to content

Commit

Permalink
Merge pull request #33 from smol-rs/send-recv-types
Browse files Browse the repository at this point in the history
Add `Send` and `Recv` futures
  • Loading branch information
yoshuawuyts authored Feb 17, 2021
2 parents fba5d5b + 5fea266 commit a9787b9
Showing 1 changed file with 114 additions and 31 deletions.
145 changes: 114 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,37 +526,10 @@ impl<T> Receiver<T> {
/// assert_eq!(r.recv().await, Err(RecvError));
/// # });
/// ```
pub async fn recv(&self) -> Result<T, RecvError> {
let mut listener = None;

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// If the capacity is larger than 1, notify another blocked receive operation.
// There is no need to notify stream operations because all of them get
// notified every time a message is sent into the channel.
match self.channel.queue.capacity() {
Some(1) => {}
Some(_) | None => self.channel.recv_ops.notify(1),
}
return Ok(msg);
}
Err(TryRecvError::Closed) => return Err(RecvError),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match listener.take() {
None => {
// Start listening and then try receiving again.
listener = Some(self.channel.recv_ops.listen());
}
Some(l) => {
// Wait for a notification.
l.await;
}
}
pub fn recv(&self) -> Recv<'_, T> {
Recv {
receiver: self,
listener: None,
}
}

Expand Down Expand Up @@ -934,3 +907,113 @@ impl fmt::Display for TryRecvError {
}
}
}

/// A future returned by [`Sender::send()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
msg: Option<T>,
}

impl<'a, T> Unpin for Send<'a, T> {}

impl<'a, T> Future for Send<'a, T> {
type Output = Result<(), SendError<T>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);

loop {
let msg = this.msg.take().unwrap();
// Attempt to send a message.
match this.sender.try_send(msg) {
Ok(()) => {
// If the capacity is larger than 1, notify another blocked send operation.
match this.sender.channel.queue.capacity() {
Some(1) => {}
Some(_) | None => this.sender.channel.send_ops.notify(1),
}
return Poll::Ready(Ok(()));
}
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => this.msg = Some(m),
}

// Sending failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
this.listener = Some(this.sender.channel.send_ops.listen());
}
Some(l) => {
// Wait for a notification.
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
this.listener = None;
continue;
}

Poll::Pending => return Poll::Pending,
}
}
}
}
}
}

/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
receiver: &'a Receiver<T>,
listener: Option<EventListener>,
}

impl<'a, T> Unpin for Recv<'a, T> {}

impl<'a, T> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);

loop {
// Attempt to receive a message.
match this.receiver.try_recv() {
Ok(msg) => {
// If the capacity is larger than 1, notify another blocked receive operation.
// There is no need to notify stream operations because all of them get
// notified every time a message is sent into the channel.
match this.receiver.channel.queue.capacity() {
Some(1) => {}
Some(_) | None => this.receiver.channel.recv_ops.notify(1),
}
return Poll::Ready(Ok(msg));
}
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
this.listener = Some(this.receiver.channel.recv_ops.listen());
}
Some(l) => {
// Wait for a notification.
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
this.listener = None;
continue;
}

Poll::Pending => return Poll::Pending,
}
}
}
}
}
}

0 comments on commit a9787b9

Please sign in to comment.