diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 20d16a1dc7..4dd54fbe48 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -80,6 +80,7 @@ use futures_core::stream::Stream; use futures_core::task::{LocalWaker, Waker, Poll}; +use futures_core::task::__internal::AtomicWaker; use std::any::Any; use std::error::Error; use std::fmt; @@ -291,7 +292,7 @@ struct Inner { num_senders: AtomicUsize, // Handle to the receiver's task. - recv_task: Mutex, + recv_task: AtomicWaker, } // Struct representation of `Inner::state`. @@ -304,18 +305,6 @@ struct State { num_messages: usize, } -#[derive(Debug)] -struct ReceiverTask { - unparked: bool, - task: Option, -} - -// Returned from Receiver::try_park() -enum TryPark { - Parked, - NotEmpty, -} - // The `is_open` flag is stored in the left-most bit of `Inner::state` const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); @@ -394,10 +383,7 @@ fn channel2(buffer: Option) -> (Sender, Receiver) { message_queue: Queue::new(), parked_queue: Queue::new(), num_senders: AtomicUsize::new(1), - recv_task: Mutex::new(ReceiverTask { - unparked: false, - task: None, - }), + recv_task: AtomicWaker::new(), }); let tx = Sender { @@ -512,7 +498,7 @@ impl Sender { // Signal to the receiver that a message has been enqueued. If the // receiver is parked, this will unpark the task. - self.signal(); + self.inner.recv_task.wake(); } // Increment the number of queued messages. Returns the resulting number. @@ -545,35 +531,6 @@ impl Sender { } } - // Signal to the receiver task that a message has been enqueued - fn signal(&self) { - // TODO - // This logic can probably be improved by guarding the lock with an - // atomic. - // - // Do this step first so that the lock is dropped when - // `unpark` is called - let task = { - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - // If the receiver has already been unparked, then there is nothing - // more to do - if recv_task.unparked { - return; - } - - // Setting this flag enables the receiving end to detect that - // an unpark event happened in order to avoid unnecessarily - // parking. - recv_task.unparked = true; - recv_task.task.take() - }; - - if let Some(task) = task { - task.wake(); - } - } - fn park(&mut self, lw: Option<&LocalWaker>) { // TODO: clean up internal state if the task::current will fail @@ -633,7 +590,7 @@ impl Sender { // that stuff from `do_send`. self.inner.set_closed(); - self.signal(); + self.inner.recv_task.wake(); } fn poll_unparked(&mut self, lw: Option<&LocalWaker>) -> Poll<()> { @@ -680,7 +637,7 @@ impl UnboundedSender { /// Closes this channel from the sender side, preventing any new messages. pub fn close_channel(&self) { self.0.inner.set_closed(); - self.0.signal(); + self.0.inner.recv_task.wake(); } // Do the send without parking current task. @@ -847,21 +804,6 @@ impl Receiver { } } - // Try to park the receiver task - fn try_park(&self, lw: &LocalWaker) -> TryPark { - // First, track the task in the `recv_task` slot - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - if recv_task.unparked { - // Consume the `unpark` signal without actually parking - recv_task.unparked = false; - return TryPark::NotEmpty; - } - - recv_task.task = Some(lw.clone().into_waker()); - TryPark::Parked - } - fn dec_num_messages(&self) { // OPEN_MASK is highest bit, so it's unaffected by subtraction // unless there's underflow, and we know there's no underflow @@ -880,31 +822,17 @@ impl Stream for Receiver { mut self: Pin<&mut Self>, lw: &LocalWaker, ) -> Poll> { - loop { // Try to read a message off of the message queue. - let msg = match self.next_message() { - Poll::Ready(msg) => msg, - Poll::Pending => { - // There are no messages to read, in this case, attempt to - // park. The act of parking will verify that the channel is - // still empty after the park operation has completed. - match self.try_park(lw) { - TryPark::Parked => { - // The task was parked, and the channel is still - // empty, return Pending. - return Poll::Pending; - } - TryPark::NotEmpty => { - // A message has been sent while attempting to - // park. Loop again, the next iteration is - // guaranteed to get the message. - continue; - } - } - } - }; - // Return the message - return Poll::Ready(msg); + match self.next_message() { + Poll::Ready(msg) => Poll::Ready(msg), + Poll::Pending => { + // There are no messages to read, in this case, park. + self.inner.recv_task.register(lw); + // Check queue again after parking to prevent race condition: + // a message could be added to the queue after previous `next_message` + // before `register` call. + self.next_message() + } } } }