Skip to content

Commit

Permalink
Use AtomicWaker in mpsc
Browse files Browse the repository at this point in the history
AFAIU AtomicWaker is a perfect candidate to use in mpsc receiver.

This patch is good because:
* it enabled `AtomicWaker` dogfooding
* it is a good example of `AtomicWaker` proper (I hope) usage
* it kills overcomplicated logic of `ReceiverTask`
  • Loading branch information
stepancheg committed Nov 10, 2018
1 parent bf092b3 commit 3fb3baf
Showing 1 changed file with 16 additions and 88 deletions.
104 changes: 16 additions & 88 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

use futures_core::stream::Stream;
use futures_core::task::{LocalWaker, Waker, Poll};
use futures_core::task::__internal::AtomicWaker;
use std::any::Any;
use std::error::Error;
use std::fmt;
Expand Down Expand Up @@ -291,7 +292,7 @@ struct Inner<T> {
num_senders: AtomicUsize,

// Handle to the receiver's task.
recv_task: Mutex<ReceiverTask>,
recv_task: AtomicWaker,
}

// Struct representation of `Inner::state`.
Expand All @@ -304,18 +305,6 @@ struct State {
num_messages: usize,
}

#[derive(Debug)]
struct ReceiverTask {
unparked: bool,
task: Option<Waker>,
}

// Returned from Receiver::try_park()
enum TryPark {
Parked,
NotEmpty,
}

// The `is_open` flag is stored in the left-most bit of `Inner::state`
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);

Expand Down Expand Up @@ -394,10 +383,7 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
message_queue: Queue::new(),
parked_queue: Queue::new(),
num_senders: AtomicUsize::new(1),
recv_task: Mutex::new(ReceiverTask {
unparked: false,
task: None,
}),
recv_task: AtomicWaker::new(),
});

let tx = Sender {
Expand Down Expand Up @@ -512,7 +498,7 @@ impl<T> Sender<T> {

// Signal to the receiver that a message has been enqueued. If the
// receiver is parked, this will unpark the task.
self.signal();
self.inner.recv_task.wake();
}

// Increment the number of queued messages. Returns the resulting number.
Expand Down Expand Up @@ -545,35 +531,6 @@ impl<T> Sender<T> {
}
}

// Signal to the receiver task that a message has been enqueued
fn signal(&self) {
// TODO
// This logic can probably be improved by guarding the lock with an
// atomic.
//
// Do this step first so that the lock is dropped when
// `unpark` is called
let task = {
let mut recv_task = self.inner.recv_task.lock().unwrap();

// If the receiver has already been unparked, then there is nothing
// more to do
if recv_task.unparked {
return;
}

// Setting this flag enables the receiving end to detect that
// an unpark event happened in order to avoid unnecessarily
// parking.
recv_task.unparked = true;
recv_task.task.take()
};

if let Some(task) = task {
task.wake();
}
}

fn park(&mut self, lw: Option<&LocalWaker>) {
// TODO: clean up internal state if the task::current will fail

Expand Down Expand Up @@ -633,7 +590,7 @@ impl<T> Sender<T> {
// that stuff from `do_send`.

self.inner.set_closed();
self.signal();
self.inner.recv_task.wake();
}

fn poll_unparked(&mut self, lw: Option<&LocalWaker>) -> Poll<()> {
Expand Down Expand Up @@ -680,7 +637,7 @@ impl<T> UnboundedSender<T> {
/// Closes this channel from the sender side, preventing any new messages.
pub fn close_channel(&self) {
self.0.inner.set_closed();
self.0.signal();
self.0.inner.recv_task.wake();
}

// Do the send without parking current task.
Expand Down Expand Up @@ -847,21 +804,6 @@ impl<T> Receiver<T> {
}
}

// Try to park the receiver task
fn try_park(&self, lw: &LocalWaker) -> TryPark {
// First, track the task in the `recv_task` slot
let mut recv_task = self.inner.recv_task.lock().unwrap();

if recv_task.unparked {
// Consume the `unpark` signal without actually parking
recv_task.unparked = false;
return TryPark::NotEmpty;
}

recv_task.task = Some(lw.clone().into_waker());
TryPark::Parked
}

fn dec_num_messages(&self) {
// OPEN_MASK is highest bit, so it's unaffected by subtraction
// unless there's underflow, and we know there's no underflow
Expand All @@ -880,31 +822,17 @@ impl<T> Stream for Receiver<T> {
mut self: Pin<&mut Self>,
lw: &LocalWaker,
) -> Poll<Option<T>> {
loop {
// Try to read a message off of the message queue.
let msg = match self.next_message() {
Poll::Ready(msg) => msg,
Poll::Pending => {
// There are no messages to read, in this case, attempt to
// park. The act of parking will verify that the channel is
// still empty after the park operation has completed.
match self.try_park(lw) {
TryPark::Parked => {
// The task was parked, and the channel is still
// empty, return Pending.
return Poll::Pending;
}
TryPark::NotEmpty => {
// A message has been sent while attempting to
// park. Loop again, the next iteration is
// guaranteed to get the message.
continue;
}
}
}
};
// Return the message
return Poll::Ready(msg);
match self.next_message() {
Poll::Ready(msg) => Poll::Ready(msg),
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.recv_task.register(lw);
// Check queue again after parking to prevent race condition:
// a message could be added to the queue after previous `next_message`
// before `register` call.
self.next_message()
}
}
}
}
Expand Down

0 comments on commit 3fb3baf

Please sign in to comment.