From ede84ad95c503186abf09f7c7a2f5baead273ae8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 17 Feb 2021 18:17:07 +0100 Subject: [PATCH 1/2] implement the Recv future --- Cargo.toml | 1 + src/lib.rs | 98 ++++++++++++++++++++++++++++++++++++------------------ 2 files changed, 67 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0f93193..46985d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ categories = ["asynchronous", "concurrency"] concurrent-queue = "1.2.2" event-listener = "2.4.0" futures-core = "0.3.5" +pin-project-lite = "0.2.4" [dev-dependencies] blocking = "1" diff --git a/src/lib.rs b/src/lib.rs index ba1b439..acc717f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,7 @@ use std::usize; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; use futures_core::stream::Stream; +use pin_project_lite::pin_project; struct Channel { /// Inner message queue. @@ -526,38 +527,8 @@ impl Receiver { /// assert_eq!(r.recv().await, Err(RecvError)); /// # }); /// ``` - pub async fn recv(&self) -> Result { - let mut listener = None; - - loop { - // Attempt to receive a message. - match self.try_recv() { - Ok(msg) => { - // If the capacity is larger than 1, notify another blocked receive operation. - // There is no need to notify stream operations because all of them get - // notified every time a message is sent into the channel. - match self.channel.queue.capacity() { - Some(1) => {} - Some(_) | None => self.channel.recv_ops.notify(1), - } - return Ok(msg); - } - Err(TryRecvError::Closed) => return Err(RecvError), - Err(TryRecvError::Empty) => {} - } - - // Receiving failed - now start listening for notifications or wait for one. - match listener.take() { - None => { - // Start listening and then try receiving again. - listener = Some(self.channel.recv_ops.listen()); - } - Some(l) => { - // Wait for a notification. - l.await; - } - } - } + pub fn recv(&self) -> Recv<'_, T> { + Recv::new(self) } /// Closes the channel. @@ -934,3 +905,66 @@ impl fmt::Display for TryRecvError { } } } + +pin_project! { + /// A future returned by [`Receiver::recv()`]. + #[must_use = "futures do nothing unless .awaited"] + pub struct Recv<'a, T> { + receiver: &'a Receiver, + listener: Option, + } +} + +impl<'a, T> Recv<'a, T> { + fn new(receiver: &'a Receiver) -> Self { + Self { + receiver, + listener: None, + } + } +} + +impl<'a, T> Future for Recv<'a, T> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + // Attempt to receive a message. + match this.receiver.try_recv() { + Ok(msg) => { + // If the capacity is larger than 1, notify another blocked receive operation. + // There is no need to notify stream operations because all of them get + // notified every time a message is sent into the channel. + match this.receiver.channel.queue.capacity() { + Some(1) => {} + Some(_) | None => this.receiver.channel.recv_ops.notify(1), + } + return Poll::Ready(Ok(msg)); + } + Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), + Err(TryRecvError::Empty) => {} + } + + // Receiving failed - now start listening for notifications or wait for one. + match &mut this.listener { + None => { + // Start listening and then try receiving again. + *this.listener = Some(this.receiver.channel.recv_ops.listen()); + } + Some(l) => { + // Wait for a notification. + match dbg!(Pin::new(l).poll(cx)) { + Poll::Ready(_) => { + *this.listener = None; + continue; + } + + Poll::Pending => return Poll::Pending, + } + } + } + } + } +} From 5fea2669a1fb80f39e052653d42369db83652303 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 17 Feb 2021 18:29:02 +0100 Subject: [PATCH 2/2] Implement the Send future --- Cargo.toml | 1 - src/lib.rs | 85 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 67 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 46985d4..0f93193 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ categories = ["asynchronous", "concurrency"] concurrent-queue = "1.2.2" event-listener = "2.4.0" futures-core = "0.3.5" -pin-project-lite = "0.2.4" [dev-dependencies] blocking = "1" diff --git a/src/lib.rs b/src/lib.rs index acc717f..b1b4254 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,7 +41,6 @@ use std::usize; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; use futures_core::stream::Stream; -use pin_project_lite::pin_project; struct Channel { /// Inner message queue. @@ -528,7 +527,10 @@ impl Receiver { /// # }); /// ``` pub fn recv(&self) -> Recv<'_, T> { - Recv::new(self) + Recv { + receiver: self, + listener: None, + } } /// Closes the channel. @@ -906,29 +908,76 @@ impl fmt::Display for TryRecvError { } } -pin_project! { - /// A future returned by [`Receiver::recv()`]. - #[must_use = "futures do nothing unless .awaited"] - pub struct Recv<'a, T> { - receiver: &'a Receiver, - listener: Option, - } +/// A future returned by [`Sender::send()`]. +#[derive(Debug)] +#[must_use = "futures do nothing unless .awaited"] +pub struct Send<'a, T> { + sender: &'a Sender, + listener: Option, + msg: Option, } -impl<'a, T> Recv<'a, T> { - fn new(receiver: &'a Receiver) -> Self { - Self { - receiver, - listener: None, +impl<'a, T> Unpin for Send<'a, T> {} + +impl<'a, T> Future for Send<'a, T> { + type Output = Result<(), SendError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = Pin::new(self); + + loop { + let msg = this.msg.take().unwrap(); + // Attempt to send a message. + match this.sender.try_send(msg) { + Ok(()) => { + // If the capacity is larger than 1, notify another blocked send operation. + match this.sender.channel.queue.capacity() { + Some(1) => {} + Some(_) | None => this.sender.channel.send_ops.notify(1), + } + return Poll::Ready(Ok(())); + } + Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), + Err(TrySendError::Full(m)) => this.msg = Some(m), + } + + // Sending failed - now start listening for notifications or wait for one. + match &mut this.listener { + None => { + // Start listening and then try receiving again. + this.listener = Some(this.sender.channel.send_ops.listen()); + } + Some(l) => { + // Wait for a notification. + match Pin::new(l).poll(cx) { + Poll::Ready(_) => { + this.listener = None; + continue; + } + + Poll::Pending => return Poll::Pending, + } + } + } } } } +/// A future returned by [`Receiver::recv()`]. +#[derive(Debug)] +#[must_use = "futures do nothing unless .awaited"] +pub struct Recv<'a, T> { + receiver: &'a Receiver, + listener: Option, +} + +impl<'a, T> Unpin for Recv<'a, T> {} + impl<'a, T> Future for Recv<'a, T> { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let mut this = Pin::new(self); loop { // Attempt to receive a message. @@ -951,13 +1000,13 @@ impl<'a, T> Future for Recv<'a, T> { match &mut this.listener { None => { // Start listening and then try receiving again. - *this.listener = Some(this.receiver.channel.recv_ops.listen()); + this.listener = Some(this.receiver.channel.recv_ops.listen()); } Some(l) => { // Wait for a notification. - match dbg!(Pin::new(l).poll(cx)) { + match Pin::new(l).poll(cx) { Poll::Ready(_) => { - *this.listener = None; + this.listener = None; continue; }