Skip to content

Commit

Permalink
Merge branch 'master' into tokio-reform
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Jan 26, 2018
2 parents b0509c5 + 92924d5 commit 0f6c661
Show file tree
Hide file tree
Showing 35 changed files with 1,006 additions and 115 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ matrix:
after_success:
- travis-cargo doc-upload
- os: linux
rust: 1.13.0
rust: 1.15.0
script: cargo test
sudo: false
script:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures"
version = "0.1.17"
version = "0.1.18"
authors = ["Alex Crichton <[email protected]>"]
license = "MIT/Apache-2.0"
readme = "README.md"
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ First, add this to your `Cargo.toml`:

```toml
[dependencies]
futures = "0.1.14"
futures = "0.1.17"
```

Next, add this to your crate:
Expand All @@ -39,7 +39,7 @@ a `#[no_std]` environment, use:

```toml
[dependencies]
futures = { version = "0.1.14", default-features = false }
futures = { version = "0.1.17", default-features = false }
```

# License
Expand All @@ -56,5 +56,5 @@ at your option.
### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Serde by you, as defined in the Apache-2.0 license, shall be
for inclusion in Futures by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.
6 changes: 3 additions & 3 deletions benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn unbounded_1_tx(b: &mut Bencher) {
// Poll, not ready, park
assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(&notify_noop(), 1));

UnboundedSender::send(&tx, i).unwrap();
UnboundedSender::unbounded_send(&tx, i).unwrap();

// Now poll ready
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
Expand All @@ -68,7 +68,7 @@ fn unbounded_100_tx(b: &mut Bencher) {
for i in 0..tx.len() {
assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(&notify_noop(), 1));

UnboundedSender::send(&tx[i], i).unwrap();
UnboundedSender::unbounded_send(&tx[i], i).unwrap();

assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
}
Expand All @@ -82,7 +82,7 @@ fn unbounded_uncontended(b: &mut Bencher) {
let (tx, mut rx) = unbounded();

for i in 0..1000 {
UnboundedSender::send(&tx, i).expect("send");
UnboundedSender::unbounded_send(&tx, i).expect("send");
// No need to create a task, because poll is not going to park.
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
}
Expand Down
2 changes: 1 addition & 1 deletion futures-cpupool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-cpupool"
version = "0.1.7"
version = "0.1.8"
authors = ["Alex Crichton <[email protected]>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/alexcrichton/futures-rs"
Expand Down
1 change: 1 addition & 0 deletions futures-cpupool/LICENSE-APACHE
1 change: 1 addition & 0 deletions futures-cpupool/LICENSE-MIT
63 changes: 47 additions & 16 deletions futures-cpupool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//! use futures::Future;
//! use futures_cpupool::CpuPool;
//!
//! # fn long_running_future(a: u32) -> futures::future::BoxFuture<u32, ()> {
//! # futures::future::result(Ok(a)).boxed()
//! # fn long_running_future(a: u32) -> Box<futures::future::Future<Item = u32, Error = ()> + Send> {
//! # Box::new(futures::future::result(Ok(a)))
//! # }
//! # fn main() {
//!
Expand Down Expand Up @@ -80,6 +80,7 @@ pub struct CpuPool {
/// of CPUs on the host. But you can change it until you call `create()`.
pub struct Builder {
pool_size: usize,
stack_size: usize,
name_prefix: Option<String>,
after_start: Option<Arc<Fn() + Send + Sync>>,
before_stop: Option<Arc<Fn() + Send + Sync>>,
Expand All @@ -99,8 +100,6 @@ struct Inner {
rx: Mutex<mpsc::Receiver<Message>>,
cnt: AtomicUsize,
size: usize,
after_start: Option<Arc<Fn() + Send + Sync>>,
before_stop: Option<Arc<Fn() + Send + Sync>>,
}

impl fmt::Debug for CpuPool {
Expand Down Expand Up @@ -250,16 +249,16 @@ impl Inner {
self.tx.lock().unwrap().send(msg).unwrap();
}

fn work(&self) {
self.after_start.as_ref().map(|fun| fun());
fn work(&self, after_start: Option<Arc<Fn() + Send + Sync>>, before_stop: Option<Arc<Fn() + Send + Sync>>) {
after_start.map(|fun| fun());
loop {
let msg = self.rx.lock().unwrap().recv().unwrap();
match msg {
Message::Run(r) => r.run(),
Message::Close => break,
}
}
self.before_stop.as_ref().map(|fun| fun());
before_stop.map(|fun| fun());
}
}

Expand Down Expand Up @@ -302,7 +301,7 @@ impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> {
type Error = E;

fn poll(&mut self) -> Poll<T, E> {
match self.inner.poll().expect("shouldn't be canceled") {
match self.inner.poll().expect("cannot poll CpuFuture twice") {
Async::Ready(Ok(Ok(e))) => Ok(e.into()),
Async::Ready(Ok(Err(e))) => Err(e),
Async::Ready(Err(e)) => panic::resume_unwind(e),
Expand Down Expand Up @@ -342,6 +341,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
pool_size: num_cpus::get(),
stack_size: 0,
name_prefix: None,
after_start: None,
before_stop: None,
Expand All @@ -356,6 +356,12 @@ impl Builder {
self
}

/// Set stack size of threads in the pool.
pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
self.stack_size = stack_size;
self
}

/// Set thread name prefix of a future CpuPool
///
/// Thread name prefix is used for generating thread names. For example, if prefix is
Expand All @@ -366,19 +372,23 @@ impl Builder {
}

/// Execute function `f` right after each thread is started but before
/// running any jobs on it
/// running any jobs on it.
///
/// This is initially intended for bookkeeping and monitoring uses
/// This is initially intended for bookkeeping and monitoring uses.
/// The `f` will be deconstructed after the `builder` is deconstructed
/// and all threads in the pool has executed it.
pub fn after_start<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
{
self.after_start = Some(Arc::new(f));
self
}

/// Execute function `f` before each worker thread stops
/// Execute function `f` before each worker thread stops.
///
/// This is initially intended for bookkeeping and monitoring uses
/// This is initially intended for bookkeeping and monitoring uses.
/// The `f` will be deconstructed after the `builder` is deconstructed
/// and all threads in the pool has executed it.
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
{
Expand All @@ -399,21 +409,42 @@ impl Builder {
rx: Mutex::new(rx),
cnt: AtomicUsize::new(1),
size: self.pool_size,
after_start: self.after_start.clone(),
before_stop: self.before_stop.clone(),
}),
};
assert!(self.pool_size > 0);

for counter in 0..self.pool_size {
let inner = pool.inner.clone();
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
let mut thread_builder = thread::Builder::new();
if let Some(ref name_prefix) = self.name_prefix {
thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
}
thread_builder.spawn(move || inner.work()).unwrap();
if self.stack_size > 0 {
thread_builder = thread_builder.stack_size(self.stack_size);
}
thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap();
}

return pool
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::mpsc;

#[test]
fn test_drop_after_start() {
let (tx, rx) = mpsc::sync_channel(2);
let _cpu_pool = Builder::new()
.pool_size(2)
.after_start(move || tx.send(1).unwrap()).create();

// After Builder is deconstructed, the tx should be droped
// so that we can use rx as an iterator.
let count = rx.into_iter().count();
assert_eq!(count, 2);
}
}
6 changes: 3 additions & 3 deletions futures-cpupool/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::thread;
use std::time::Duration;

use futures::future::{Future, BoxFuture};
use futures::future::Future;
use futures_cpupool::{CpuPool, Builder};

fn done<T: Send + 'static>(t: T) -> BoxFuture<T, ()> {
futures::future::ok(t).boxed()
fn done<T: Send + 'static>(t: T) -> Box<Future<Item = T, Error = ()> + Send> {
Box::new(futures::future::ok(t))
}

#[test]
Expand Down
6 changes: 3 additions & 3 deletions src/future/join_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ impl<I> fmt::Debug for JoinAll<I>
/// });
///
/// let f = join_all(vec![
/// ok::<u32, u32>(1).boxed(),
/// err::<u32, u32>(2).boxed(),
/// ok::<u32, u32>(3).boxed(),
/// Box::new(ok::<u32, u32>(1)),
/// Box::new(err::<u32, u32>(2)),
/// Box::new(ok::<u32, u32>(3)),
/// ]);
/// let f = f.then(|x| {
/// assert_eq!(x, Err(2));
Expand Down
2 changes: 1 addition & 1 deletion src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ pub trait Future {
/// Create a cloneable handle to this future where all handles will resolve
/// to the same result.
///
/// The shared() method provides a mean to convert any future into a
/// The shared() method provides a method to convert any future into a
/// cloneable future. It enables a future to be polled by multiple threads.
///
/// The returned `Shared` future resolves successfully with
Expand Down
6 changes: 6 additions & 0 deletions src/future/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ impl<T, E> Future for FutureResult<T, E> {
self.inner.take().expect("cannot poll Result twice").map(Async::Ready)
}
}

impl<T, E> From<Result<T, E>> for FutureResult<T, E> {
fn from(r: Result<T, E>) -> Self {
result(r)
}
}
6 changes: 4 additions & 2 deletions src/future/select2.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use {Future, Poll, Async};
use future::Either;

/// Future for the `merge` combinator, waiting for one of two differently-typed
/// Future for the `select2` combinator, waiting for one of two differently-typed
/// futures to complete.
///
/// This is created by the `Future::merge` method.
/// This is created by the [`Future::select2`] method.
///
/// [`Future::select2`]: trait.Future.html#method.select2
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Select2<A, B> {
Expand Down
22 changes: 21 additions & 1 deletion src/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use {Future, Poll, Async};
use task::{self, Task};
use executor::{self, Notify, Spawn};

use std::{fmt, mem, ops};
use std::{error, fmt, mem, ops};
use std::cell::UnsafeCell;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -278,3 +278,23 @@ impl<E> ops::Deref for SharedError<E> {
&self.error.as_ref()
}
}

impl<E> fmt::Display for SharedError<E>
where E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.error.fmt(f)
}
}

impl<E> error::Error for SharedError<E>
where E: error::Error,
{
fn description(&self) -> &str {
self.error.description()
}

fn cause(&self) -> Option<&error::Error> {
self.error.cause()
}
}
Loading

0 comments on commit 0f6c661

Please sign in to comment.