Skip to content

Commit

Permalink
Implement the Send future
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Feb 17, 2021
1 parent ede84ad commit 2d847cc
Showing 1 changed file with 60 additions and 10 deletions.
70 changes: 60 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,10 @@ impl<T> Receiver<T> {
/// # });
/// ```
pub fn recv(&self) -> Recv<'_, T> {
Recv::new(self)
Recv {
receiver: self,
listener: None,
}
}

/// Closes the channel.
Expand Down Expand Up @@ -907,23 +910,70 @@ impl fmt::Display for TryRecvError {
}

pin_project! {
/// A future returned by [`Receiver::recv()`].
/// A future returned by [`Sender::send()`].
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
receiver: &'a Receiver<T>,
pub struct Send<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
msg: Option<T>,
}
}

impl<'a, T> Recv<'a, T> {
fn new(receiver: &'a Receiver<T>) -> Self {
Self {
receiver,
listener: None,
impl<'a, T> Future for Send<'a, T> {
type Output = Result<(), SendError<T>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
loop {
let msg = this.msg.take().unwrap();
// Attempt to send a message.
match this.sender.try_send(msg) {
Ok(()) => {
// If the capacity is larger than 1, notify another blocked send operation.
match this.sender.channel.queue.capacity() {
Some(1) => {}
Some(_) | None => this.sender.channel.send_ops.notify(1),
}
return Poll::Ready(Ok(()));
}
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => *this.msg = Some(m),
}

// Sending failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
*this.listener = Some(this.sender.channel.send_ops.listen());
}
Some(l) => {
// Wait for a notification.
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
*this.listener = None;
continue;
}

Poll::Pending => return Poll::Pending,
}
}
}
}
}
}
}

pin_project! {
/// A future returned by [`Receiver::recv()`].
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
receiver: &'a Receiver<T>,
listener: Option<EventListener>,
}
}

impl<'a, T> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;

Expand Down Expand Up @@ -955,7 +1005,7 @@ impl<'a, T> Future for Recv<'a, T> {
}
Some(l) => {
// Wait for a notification.
match dbg!(Pin::new(l).poll(cx)) {
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
*this.listener = None;
continue;
Expand Down

0 comments on commit 2d847cc

Please sign in to comment.