Skip to content

Commit

Permalink
Merge pull request #763 from aturon/exec-rfc
Browse files Browse the repository at this point in the history
implement the executor RFC
  • Loading branch information
aturon authored Feb 16, 2018
2 parents a11e78a + 4b562a1 commit eb9f603
Show file tree
Hide file tree
Showing 47 changed files with 1,260 additions and 2,117 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ matrix:
- cargo build --manifest-path futures-core/Cargo.toml --no-default-features
- cargo build --manifest-path futures/Cargo.toml --no-default-features
- cargo build --manifest-path futures-channel/Cargo.toml --no-default-features
- cargo build --manifest-path futures-cpupool/Cargo.toml --no-default-features
- cargo build --manifest-path futures-executor/Cargo.toml --no-default-features
- cargo build --manifest-path futures-sink/Cargo.toml --no-default-features
- cargo build --manifest-path futures-util/Cargo.toml --no-default-features
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ members = [
"futures",
"futures-core",
"futures-channel",
"futures-cpupool",
"futures-executor",
"futures-util",
"futures-sink",
Expand Down
42 changes: 26 additions & 16 deletions futures-channel/benches/sync_mpsc.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,39 @@ extern crate futures;
extern crate futures_channel;
extern crate test;

use futures::task::{self, Notify, NotifyHandle};
use futures::task::{self, Wake, Waker};
use futures::executor::LocalPool;
use futures::prelude::*;

use futures_channel::mpsc::unbounded;
use futures_channel::mpsc::channel;
use futures_channel::mpsc::Sender;
use futures_channel::mpsc::UnboundedSender;


use test::Bencher;

fn notify_noop() -> NotifyHandle {
fn notify_noop() -> Waker {
struct Noop;

impl Notify for Noop {
fn notify(&self, _id: usize) {}
impl Wake for Noop {
fn wake(&self) {}
}

const NOOP : &'static Noop = &Noop;

NotifyHandle::from(NOOP)
Waker::from(NOOP)
}

/// Single producer, single consumer
#[bench]
fn unbounded_1_tx(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded();
let mut notify = || notify_noop().into();
let pool = LocalPool::new();
let mut exec = pool.executor();
let waker = notify_noop();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);
let mut cx = task::Context::new(&mut map, &waker, &mut exec);

// 1000 iterations to avoid measuring overhead of initialization
// Result should be divided by 1000
Expand All @@ -56,9 +58,11 @@ fn unbounded_1_tx(b: &mut Bencher) {
fn unbounded_100_tx(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded();
let mut notify = || notify_noop().into();
let pool = LocalPool::new();
let mut exec = pool.executor();
let waker = notify_noop();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);
let mut cx = task::Context::new(&mut map, &waker, &mut exec);

let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();

Expand All @@ -77,9 +81,11 @@ fn unbounded_100_tx(b: &mut Bencher) {

#[bench]
fn unbounded_uncontended(b: &mut Bencher) {
let mut notify = || notify_noop().into();
let pool = LocalPool::new();
let mut exec = pool.executor();
let waker = notify_noop();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);
let mut cx = task::Context::new(&mut map, &waker, &mut exec);

b.iter(|| {
let (tx, mut rx) = unbounded();
Expand Down Expand Up @@ -123,9 +129,11 @@ impl Stream for TestSender {
/// Single producers, single consumer
#[bench]
fn bounded_1_tx(b: &mut Bencher) {
let mut notify = || notify_noop().into();
let pool = LocalPool::new();
let mut exec = pool.executor();
let waker = notify_noop();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);
let mut cx = task::Context::new(&mut map, &waker, &mut exec);

b.iter(|| {
let (tx, mut rx) = channel(0);
Expand All @@ -149,9 +157,11 @@ fn bounded_100_tx(b: &mut Bencher) {
b.iter(|| {
// Each sender can send one item after specified capacity
let (tx, mut rx) = channel(0);
let mut notify = || notify_noop().into();
let pool = LocalPool::new();
let mut exec = pool.executor();
let waker = notify_noop();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);
let mut cx = task::Context::new(&mut map, &waker, &mut exec);

let mut tx: Vec<_> = (0..100).map(|_| {
TestSender {
Expand Down
4 changes: 2 additions & 2 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl SenderTask {
self.is_parked = false;

if let Some(task) = self.task.take() {
task.notify();
task.wake();
}
}
}
Expand Down Expand Up @@ -571,7 +571,7 @@ impl<T> Sender<T> {
};

if let Some(task) = task {
task.notify();
task.wake();
}
}

Expand Down
6 changes: 3 additions & 3 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl<T> Inner<T> {
if let Some(mut slot) = self.rx_task.try_lock() {
if let Some(task) = slot.take() {
drop(slot);
task.notify();
task.wake();
}
}
}
Expand All @@ -233,7 +233,7 @@ impl<T> Inner<T> {
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
drop(handle);
task.notify()
task.wake()
}
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ impl<T> Inner<T> {
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
drop(handle);
task.notify()
task.wake()
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions futures-channel/tests/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::thread;

use futures::prelude::*;
use futures::future::{result, poll_fn};
use futures_executor::current_thread::run;
use futures_executor::block_on;
use futures_channel::mpsc;

#[test]
Expand All @@ -16,15 +16,15 @@ fn sequence() {

let amt = 20;
let t = thread::spawn(move || {
run(|c| c.block_on(send(amt, tx)))
block_on(send(amt, tx)).unwrap()
});
let mut list = run(|c| c.block_on(rx.collect())).unwrap().into_iter();
let mut list = block_on(rx.collect()).unwrap().into_iter();
for i in (1..amt + 1).rev() {
assert_eq!(list.next(), Some(i));
}
assert_eq!(list.next(), None);

t.join().unwrap().unwrap();
t.join().unwrap();

fn send(n: u32, sender: mpsc::Sender<u32>)
-> Box<Future<Item=(), Error=()> + Send> {
Expand All @@ -44,15 +44,15 @@ fn drop_sender() {
let f = poll_fn(|cx| {
rx.poll(cx)
});
assert_eq!(run(|c| c.block_on(f)).unwrap(), None)
assert_eq!(block_on(f).unwrap(), None)
}

#[test]
fn drop_rx() {
let (tx, rx) = mpsc::channel::<u32>(1);
let tx = run(|c| c.block_on(tx.send(1))).unwrap();
let tx = block_on(tx.send(1)).unwrap();
drop(rx);
assert!(run(|c| c.block_on(tx.send(1))).is_err());
assert!(block_on(tx.send(1)).is_err());
}

#[test]
Expand All @@ -68,10 +68,10 @@ fn drop_order() {
}
}

let tx = run(|c| c.block_on(tx.send(A))).unwrap();
let tx = block_on(tx.send(A)).unwrap();
assert_eq!(DROPS.load(Ordering::SeqCst), 0);
drop(rx);
assert_eq!(DROPS.load(Ordering::SeqCst), 1);
assert!(run(|c| c.block_on(tx.send(A))).is_err());
assert!(block_on(tx.send(A)).is_err());
assert_eq!(DROPS.load(Ordering::SeqCst), 2);
}
12 changes: 5 additions & 7 deletions futures-channel/tests/mpsc-close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,20 @@ use std::thread;

use futures::prelude::*;
use futures_channel::mpsc::*;
use futures_executor::current_thread::run;
use futures_executor::block_on;

#[test]
fn smoke() {
let (mut sender, receiver) = channel(1);

let t = thread::spawn(move || {
run(|c| {
while let Ok(s) = c.block_on(sender.send(42)) {
sender = s;
}
});
while let Ok(s) = block_on(sender.send(42)) {
sender = s;
}
});

// `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
let _ = run(|c| c.block_on(receiver.take(3).for_each(|_| Ok(())))).unwrap();
block_on(receiver.take(3).for_each(|_| Ok(()))).unwrap();

t.join().unwrap()
}
Loading

0 comments on commit eb9f603

Please sign in to comment.