Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support spawning asynchronous tasks #212

Merged
merged 16 commits into from
Feb 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/cpu_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ fn task_stall_scope(args: &Args) {
}

#[cfg(not(feature = "unstable"))]
fn task_stall_scope(args: &Args) {
fn task_stall_scope(_args: &Args) {
println!("try `cargo run` with `--features unstable`");
process::exit(1);
}
59 changes: 59 additions & 0 deletions src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#[allow(unused_imports)]
use log::Event::*;
use std::any::Any;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use registry;

/// Custom error type for the rayon thread pool configuration.
Expand Down Expand Up @@ -46,15 +48,26 @@ impl Error for InitError {
pub struct Configuration {
/// The number of threads in the rayon thread pool. Must not be zero.
num_threads: Option<usize>,

/// Custom closure, if any, to handle a panic that we cannot propagate
/// anywhere else.
panic_handler: Option<PanicHandler>,

/// Closure to compute the name of a thread.
get_thread_name: Option<Box<FnMut(usize) -> String>>,
}

/// The type for a panic handling closure. Note that this same closure
/// may be invoked multiple times in parallel.
pub type PanicHandler = Arc<Fn(Box<Any + Send>) + Send + Sync>;

impl Configuration {
/// Creates and return a valid rayon thread pool configuration, but does not initialize it.
pub fn new() -> Configuration {
Configuration {
num_threads: None,
get_thread_name: None,
panic_handler: None,
}
}

Expand Down Expand Up @@ -86,6 +99,32 @@ impl Configuration {
self
}

/// Returns (and takes ownership of) the current panic handler.
/// After this call, no panic handler is registered in the
/// configuration anymore.
pub fn panic_handler(&self) -> Option<PanicHandler> {
self.panic_handler.clone()
}

/// Normally, whenever Rayon catches a panic, it tries to
/// propagate it to someplace sensible, to try and reflect the
/// semantics of sequential execution. But in some cases,
/// particularly with the `spawn_async()` APIs, there is no
/// obvious place where we should propagate the panic to.
/// In that case, this panic handler is invoked.
///
/// If no panic handler is set, the default is to abort the
/// process, under the principle that panics should not go
/// unobserved.
///
/// If the panic handler itself panics, this will abort the
/// process. To prevent this, wrap the body of your panic handler
/// in a call to `std::panic::catch_unwind()`.
pub fn set_panic_handler(mut self, panic_handler: PanicHandler) -> Configuration {
self.panic_handler = Some(panic_handler);
self
}

/// Checks whether the configuration is valid.
pub fn validate(&self) -> Result<(), InitError> {
if let Some(value) = self.num_threads {
Expand Down Expand Up @@ -142,3 +181,23 @@ pub fn dump_stats() {
dump_stats!();
}

impl fmt::Debug for Configuration {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let Configuration { ref num_threads, ref get_thread_name, ref panic_handler } = *self;

// Just print `Some("<closure>")` or `None` to the debug
// output.
let get_thread_name = get_thread_name.as_ref().map(|_| "<closure>");

// Just print `Some("<closure>")` or `None` to the debug
// output.
let panic_handler = panic_handler.as_ref().map(|_| "<closure>");

f.debug_struct("Configuration")
.field("num_threads", num_threads)
.field("get_thread_name", &get_thread_name)
.field("panic_handler", &panic_handler)
.finish()
}
}

26 changes: 16 additions & 10 deletions src/scope/future/README.md → src/future/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,22 @@ Let's walk through them:
the `Registry` from being dropped. In particular, this doesn't
prevent the threads in a registry from terminating while the future
is unscheduled etc (though other fields in the future do).
- The `scope` field is a `*const Scope<'scope>`, and the idea is that
when the future is created one job is allocated for this future in
the scope. Once the future has finished executing completely (and
hence its data is ready), this count will be marked as
completed. Until this time, the scope will not terminate. This has
an interesting relationship to the future `F`, as the type `F` is
known to be valid for *at least* the call to `scope()` (i.e., we
know that it outlives `'scope`, though that is all we know; it may
have references into the stack which become invalidated once
`scope()` returns).
- The `scope` field (of type `S`) is the "enclosing scope". This scope
is an abstract value that implements the `FutureScope<'scope>` trait
-- this means that it is responsible for ensuring that `'scope` does
not end until one of the `FutureScope` methods are invoked (which
occurs when the future has finished executing). For example, if the
future is spawned inside a `scope()` call, then the `S` will be a
wrapper (`ScopeFutureScope`) around a `*const Scope<'scope>`. When
the future is created one job is allocated for this future in the
scope, and the scope counter is decremented once the future is
marked as completing.
- In general, the job of the `scope` field is to ensure that the
future type (`F`) remains valid. After all, since `F: 'scope`, `F`
is known to be valid until the lifetime `'scope` ends, and that
lifetime cannot end until the `scope` methods are invoked, so we
know that `F` must stay valid until one of those methods are
invoked.
- All of our data of type `F` is stored in the field `spawn` (not
shown here). This field is always set to `None` before the scope
counter is decremented. See the section on lifetime safety for more
Expand Down
138 changes: 88 additions & 50 deletions src/scope/future/mod.rs → src/future/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
//! Future support in Rayon. This module *primary* consists of
//! internal APIs that are exposed through `Scope::spawn_future` and
//! `spawn_future_async`. However, the type `RayonFuture` is a public
//! type exposed to all users.
//!
//! See `README.md` for details.

use latch::{LatchProbe};
#[allow(warnings)]
use log::Event::*;
use futures::{Async, Future, Poll};
use futures::{Async, Poll};
use futures::executor;
use futures::future::CatchUnwind;
use futures::task::{self, Spawn, Task, Unpark};
Expand All @@ -16,39 +23,58 @@ use std::sync::atomic::Ordering::*;
use std::sync::Mutex;
use unwind;

use super::Scope;
pub use futures::Future;

const STATE_PARKED: usize = 0;
const STATE_UNPARKED: usize = 1;
const STATE_EXECUTING: usize = 2;
const STATE_EXECUTING_UNPARKED: usize = 3;
const STATE_COMPLETE: usize = 4;

// Warning: Public end-user API.
/// Represents the result of a future that has been spawned in the
/// Rayon threadpool.
///
/// # Panic behavior
///
/// Any panics that occur while computing the spawned future will be
/// propagated when this future is polled.
pub struct RayonFuture<T, E> {
// Warning: Public end-user API!
inner: Arc<ScopeFutureTrait<Result<T, E>, Box<Any + Send + 'static>>>,
}

/// Unsafe because implementor must guarantee:
///
/// 1. That the type `Self` remains dynamically valid until one of the
/// completion methods is called.
/// 2. That the lifetime `'scope` cannot end until one of those
/// methods is called.
///
/// NB. Although this is public, it is not exposed to outside users.
pub unsafe trait FutureScope<'scope> {
fn registry(&self) -> Arc<Registry>;
fn future_panicked(self, err: Box<Any + Send>);
fn future_completed(self);
}

/// Create a `RayonFuture` that will execute `F` and yield its result,
/// propagating any panics.
///
/// Unsafe because caller asserts that all references in `F` will
/// remain valid at least until `counter` is decremented via `set()`.
/// In practice, this is ensured by the `scope()` API, which ensures
/// that `F: 'scope` and that `'scope` does not end until `counter`
/// reaches 0.
///
/// NB. This is a free fn so that we can expose `RayonFuture` as public API.
pub unsafe fn new_rayon_future<'scope, F>(future: F,
scope: *const Scope<'scope>)
-> RayonFuture<F::Item, F::Error>
where F: Future + Send + 'scope,
/// NB. Although this is public, it is not exposed to outside users.
pub fn new_rayon_future<'scope, F, S>(future: F, scope: S) -> RayonFuture<F::Item, F::Error>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
// We always have to have a non-null scope (for now, anyway):
debug_assert!(!scope.is_null());

let inner = ScopeFuture::spawn(future, scope);
return RayonFuture { inner: hide_lifetime(inner) };

// We assert that it is safe to hide the type `F` (and, in
// particular, the lifetimes in it). This is true because the API
// offered by a `RayonFuture` only permits access to the result of
// the future (of type `F::Item` or `F::Error`) and those types
// *are* exposed in the `RayonFuture<F::Item, F::Error>` type. See
// README.md for details.
unsafe {
return RayonFuture { inner: hide_lifetime(inner) };
}

unsafe fn hide_lifetime<'l, T, E>(x: Arc<ScopeFutureTrait<T, E> + 'l>)
-> Arc<ScopeFutureTrait<T, E>> {
Expand All @@ -58,6 +84,7 @@ pub unsafe fn new_rayon_future<'scope, F>(future: F,

impl<T, E> RayonFuture<T, E> {
pub fn rayon_wait(mut self) -> Result<T, E> {
// NB: End-user API!
let worker_thread = WorkerThread::current();
if worker_thread.is_null() {
self.wait()
Expand Down Expand Up @@ -106,28 +133,32 @@ impl<T, E> Drop for RayonFuture<T, E> {

/// ////////////////////////////////////////////////////////////////////////

struct ScopeFuture<'scope, F: Future + Send + 'scope> {
struct ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
state: AtomicUsize,
registry: Arc<Registry>,
contents: Mutex<ScopeFutureContents<'scope, F>>,
contents: Mutex<ScopeFutureContents<'scope, F, S>>,
}

type CU<F> = CatchUnwind<AssertUnwindSafe<F>>;
type CUItem<F> = <CU<F> as Future>::Item;
type CUError<F> = <CU<F> as Future>::Error;

struct ScopeFutureContents<'scope, F: Future + Send + 'scope> {
struct ScopeFutureContents<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
spawn: Option<Spawn<CU<F>>>,
unpark: Option<Arc<Unpark>>,

// Pointer to ourselves. We `None` this out when we are finished
// executing, but it's convenient to keep around normally.
this: Option<Arc<ScopeFuture<'scope, F>>>,
this: Option<Arc<ScopeFuture<'scope, F, S>>>,

// the counter in the scope; since the scope doesn't terminate until
// counter reaches zero, and we hold a ref in this counter, we are
// assured that this pointer remains valid
scope: *const Scope<'scope>,
scope: Option<S>,

waiting_task: Option<Task>,
result: Poll<CUItem<F>, CUError<F>>,
Expand All @@ -136,29 +167,30 @@ struct ScopeFutureContents<'scope, F: Future + Send + 'scope> {
}

// Assert that the `*const` is safe to transmit between threads:
unsafe impl<'scope, F: Future + Send> Send for ScopeFuture<'scope, F> {}
unsafe impl<'scope, F: Future + Send> Sync for ScopeFuture<'scope, F> {}

impl<'scope, F: Future + Send> ScopeFuture<'scope, F> {
// Unsafe: Caller asserts that `future` and `counter` will remain
// valid until we invoke `counter.set()`.
unsafe fn spawn(future: F, scope: *const Scope<'scope>) -> Arc<Self> {
let worker_thread = WorkerThread::current();
debug_assert!(!worker_thread.is_null());

unsafe impl<'scope, F, S> Send for ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{}
unsafe impl<'scope, F, S> Sync for ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{}

impl<'scope, F, S> ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
fn spawn(future: F, scope: S) -> Arc<Self> {
// Using `AssertUnwindSafe` is valid here because (a) the data
// is `Send + Sync`, which is our usual boundary and (b)
// panics will be propagated when the `RayonFuture` is polled.
let spawn = task::spawn(AssertUnwindSafe(future).catch_unwind());

let future: Arc<Self> = Arc::new(ScopeFuture::<F> {
let future: Arc<Self> = Arc::new(ScopeFuture::<F, S> {
state: AtomicUsize::new(STATE_PARKED),
registry: (*worker_thread).registry().clone(),
registry: scope.registry(),
contents: Mutex::new(ScopeFutureContents {
spawn: None,
unpark: None,
this: None,
scope: scope,
scope: Some(scope),
waiting_task: None,
result: Ok(Async::NotReady),
canceled: false,
Expand Down Expand Up @@ -233,7 +265,7 @@ impl<'scope, F: Future + Send> ScopeFuture<'scope, F> {
// references in the future are valid.
unsafe {
let job_ref = Self::into_job_ref(contents.this.clone().unwrap());
self.registry.inject(&[job_ref]);
self.registry.inject_or_push(job_ref);
}
return;
}
Expand Down Expand Up @@ -306,13 +338,17 @@ impl<'scope, F: Future + Send> ScopeFuture<'scope, F> {
}
}

impl<'scope, F: Future + Send> Unpark for ScopeFuture<'scope, F> {
impl<'scope, F, S> Unpark for ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
fn unpark(&self) {
self.unpark_inherent();
}
}

impl<'scope, F: Future + Send> Job for ScopeFuture<'scope, F> {
impl<'scope, F, S> Job for ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
unsafe fn execute(this: *const Self) {
let this: Arc<Self> = mem::transmute(this);

Expand Down Expand Up @@ -349,7 +385,9 @@ impl<'scope, F: Future + Send> Job for ScopeFuture<'scope, F> {
}
}

impl<'scope, F: Future + Send> ScopeFutureContents<'scope, F> {
impl<'scope, F, S> ScopeFutureContents<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
fn poll(&mut self) -> Poll<CUItem<F>, CUError<F>> {
let unpark = self.unpark.clone().unwrap();
self.spawn.as_mut().unwrap().poll_future(unpark)
Expand Down Expand Up @@ -392,31 +430,31 @@ impl<'scope, F: Future + Send> ScopeFutureContents<'scope, F> {
// Allow the enclosing scope to end. Asserts that
// `self.counter` is still valid, which we know because caller
// to `new_rayon_future()` ensures it for us.
unsafe {
if let Some(err) = err {
(*self.scope).job_panicked(err);
} else {
(*self.scope).job_completed_ok();
}
let scope = self.scope.take().unwrap();
if let Some(err) = err {
scope.future_panicked(err);
} else {
scope.future_completed();
}
}
}

impl<'scope, F> LatchProbe for ScopeFuture<'scope, F>
where F: Future + Send
impl<'scope, F, S> LatchProbe for ScopeFuture<'scope, F, S>
where F: Future + Send, S: FutureScope<'scope>,
{
fn probe(&self) -> bool {
self.state.load(Acquire) == STATE_COMPLETE
}
}

/// NB. Although this is public, it is not exposed to outside users.
pub trait ScopeFutureTrait<T, E>: Send + Sync + LatchProbe {
fn poll(&self) -> Poll<T, E>;
fn cancel(&self);
}

impl<'scope, F> ScopeFutureTrait<CUItem<F>, CUError<F>> for ScopeFuture<'scope, F>
where F: Future + Send
impl<'scope, F, S> ScopeFutureTrait<CUItem<F>, CUError<F>> for ScopeFuture<'scope, F, S>
where F: Future + Send, S: FutureScope<'scope>,
{
fn poll(&self) -> Poll<CUItem<F>, CUError<F>> {
// Important: due to transmute hackery, not all the fields are
Expand Down
Loading