Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AtomicWaker in mpsc #1317

Merged
merged 2 commits into from
Nov 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 16 additions & 88 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

use futures_core::stream::Stream;
use futures_core::task::{LocalWaker, Waker, Poll};
use futures_core::task::__internal::AtomicWaker;
use std::any::Any;
use std::error::Error;
use std::fmt;
Expand Down Expand Up @@ -291,7 +292,7 @@ struct Inner<T> {
num_senders: AtomicUsize,

// Handle to the receiver's task.
recv_task: Mutex<ReceiverTask>,
recv_task: AtomicWaker,
}

// Struct representation of `Inner::state`.
Expand All @@ -304,18 +305,6 @@ struct State {
num_messages: usize,
}

#[derive(Debug)]
struct ReceiverTask {
unparked: bool,
task: Option<Waker>,
}

// Returned from Receiver::try_park()
enum TryPark {
Parked,
NotEmpty,
}

// The `is_open` flag is stored in the left-most bit of `Inner::state`
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);

Expand Down Expand Up @@ -394,10 +383,7 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
message_queue: Queue::new(),
parked_queue: Queue::new(),
num_senders: AtomicUsize::new(1),
recv_task: Mutex::new(ReceiverTask {
unparked: false,
task: None,
}),
recv_task: AtomicWaker::new(),
});

let tx = Sender {
Expand Down Expand Up @@ -512,7 +498,7 @@ impl<T> Sender<T> {

// Signal to the receiver that a message has been enqueued. If the
// receiver is parked, this will unpark the task.
self.signal();
self.inner.recv_task.wake();
}

// Increment the number of queued messages. Returns the resulting number.
Expand Down Expand Up @@ -545,35 +531,6 @@ impl<T> Sender<T> {
}
}

// Signal to the receiver task that a message has been enqueued
fn signal(&self) {
// TODO
// This logic can probably be improved by guarding the lock with an
// atomic.
//
// Do this step first so that the lock is dropped when
// `unpark` is called
let task = {
let mut recv_task = self.inner.recv_task.lock().unwrap();

// If the receiver has already been unparked, then there is nothing
// more to do
if recv_task.unparked {
return;
}

// Setting this flag enables the receiving end to detect that
// an unpark event happened in order to avoid unnecessarily
// parking.
recv_task.unparked = true;
recv_task.task.take()
};

if let Some(task) = task {
task.wake();
}
}

fn park(&mut self, lw: Option<&LocalWaker>) {
// TODO: clean up internal state if the task::current will fail

Expand Down Expand Up @@ -633,7 +590,7 @@ impl<T> Sender<T> {
// that stuff from `do_send`.

self.inner.set_closed();
self.signal();
self.inner.recv_task.wake();
}

fn poll_unparked(&mut self, lw: Option<&LocalWaker>) -> Poll<()> {
Expand Down Expand Up @@ -680,7 +637,7 @@ impl<T> UnboundedSender<T> {
/// Closes this channel from the sender side, preventing any new messages.
pub fn close_channel(&self) {
self.0.inner.set_closed();
self.0.signal();
self.0.inner.recv_task.wake();
}

// Do the send without parking current task.
Expand Down Expand Up @@ -847,21 +804,6 @@ impl<T> Receiver<T> {
}
}

// Try to park the receiver task
fn try_park(&self, lw: &LocalWaker) -> TryPark {
// First, track the task in the `recv_task` slot
let mut recv_task = self.inner.recv_task.lock().unwrap();

if recv_task.unparked {
// Consume the `unpark` signal without actually parking
recv_task.unparked = false;
return TryPark::NotEmpty;
}

recv_task.task = Some(lw.clone().into_waker());
TryPark::Parked
}

fn dec_num_messages(&self) {
// OPEN_MASK is highest bit, so it's unaffected by subtraction
// unless there's underflow, and we know there's no underflow
Expand All @@ -880,31 +822,17 @@ impl<T> Stream for Receiver<T> {
mut self: Pin<&mut Self>,
lw: &LocalWaker,
) -> Poll<Option<T>> {
loop {
// Try to read a message off of the message queue.
let msg = match self.next_message() {
Poll::Ready(msg) => msg,
Poll::Pending => {
// There are no messages to read, in this case, attempt to
// park. The act of parking will verify that the channel is
// still empty after the park operation has completed.
match self.try_park(lw) {
TryPark::Parked => {
// The task was parked, and the channel is still
// empty, return Pending.
return Poll::Pending;
}
TryPark::NotEmpty => {
// A message has been sent while attempting to
// park. Loop again, the next iteration is
// guaranteed to get the message.
continue;
}
}
}
};
// Return the message
return Poll::Ready(msg);
match self.next_message() {
Poll::Ready(msg) => Poll::Ready(msg),
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.recv_task.register(lw);
// Check queue again after parking to prevent race condition:
// a message could be added to the queue after previous `next_message`
// before `register` call.
self.next_message()
}
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions futures-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
cargo-features = ["rename-dependency"]

[package]
name = "futures-core-preview"
edition = "2018"
Expand All @@ -17,6 +19,10 @@ name = "futures_core"
[features]
default = ["std"]
std = ["either/use_std"]
nightly = []

[dependencies]
either = { version = "1.4", default-features = false, optional = true }

[dev-dependencies]
futures-preview = { path = "../futures", version = "0.3.0-alpha.9" }
1 change: 1 addition & 0 deletions futures-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Core traits and types for asynchronous operations in Rust.

#![feature(pin, arbitrary_self_types, futures_api)]
#![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))]

#![cfg_attr(not(feature = "std"), no_std)]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::fmt;
use core::cell::UnsafeCell;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::{Acquire, Release, AcqRel};
use futures_core::task::{LocalWaker, Waker};
use crate::task::{LocalWaker, Waker};

/// A synchronization primitive for task wakeup.
///
Expand Down
10 changes: 10 additions & 0 deletions futures-core/src/task/__internal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#[cfg_attr(
feature = "nightly",
cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr"))
)]
mod atomic_waker;
#[cfg_attr(
feature = "nightly",
cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr"))
)]
pub use self::atomic_waker::AtomicWaker;
2 changes: 2 additions & 0 deletions futures-core/src/task/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Task notification.

mod spawn;
#[doc(hidden)]
pub mod __internal;
pub use self::spawn::{Spawn, LocalSpawn, SpawnError};

pub use core::task::{Poll, Waker, LocalWaker, UnsafeWake};
Expand Down
7 changes: 1 addition & 6 deletions futures-util/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,4 @@ pub use self::local_waker_ref::{local_waker_ref, local_waker_ref_from_nonlocal,
feature = "nightly",
cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr"))
)]
mod atomic_waker;
#[cfg_attr(
feature = "nightly",
cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr"))
)]
pub use self::atomic_waker::AtomicWaker;
pub use futures_core::task::__internal::AtomicWaker;
2 changes: 1 addition & 1 deletion futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ futures-test-preview = { path = "../futures-test", version = "0.3.0-alpha.9", de
tokio = "0.1.11"

[features]
nightly = ["futures-util-preview/nightly"]
nightly = ["futures-util-preview/nightly", "futures-core-preview/nightly"]
std = ["futures-core-preview/std", "futures-executor-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-util-preview/std"]
default = ["std"]
compat = ["std", "futures-util-preview/compat"]
Expand Down