Skip to content

Commit

Permalink
Reduce current_thread API surface (#717)
Browse files Browse the repository at this point in the history
This patch removes the concept of daemon tasks from the current thread
executor as well as the global function to cancel all spawned tasks.

If appropriate, these APIs can be brought back at a later time, but in
an effort to initially be conservative and ship a release, they are
being held back.

The main hesitation being that these APIs are related to shutting down
an executor and do not seem appropriate to be accessible from a global
context. If they remain accessible, then once the executor
implementation is pluggable, it will require that all executors
implement the same logic.

Before making this commitment, an analysis of real use cases should be
done.
  • Loading branch information
carllerche authored Jan 31, 2018
1 parent 0f6c661 commit 01a4ccf
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 167 deletions.
192 changes: 40 additions & 152 deletions src/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//!
//! Before being able to spawn futures with this module, an executor
//! context must be setup by calling [`run`]. From within that context [`spawn`]
//! or [`spawn_daemon`] may be called with the future to run in the background.
//! may be called with the future to run in the background.
//!
//! ```
//! # use futures::current_thread::*;
Expand Down Expand Up @@ -36,26 +36,12 @@
//! All futures managed by this module will remain on the current thread,
//! as such, this module is able to safely execute futures that are not `Send`.
//!
//! Once a future is complete, it is dropped. Once all [non
//! daemon](#daemon-futures) futures are completed, [`run`] will unblock and
//! return.
//! Once a future is complete, it is dropped. Once all futures are completed,
//! [`run`] will unblock and return.
//!
//! This module makes a best effort to fairly schedule futures that it manages.
//!
//! # Daemon Futures
//!
//! A daemon future is a future that does not require to be complete in order
//! for [`run`] to complete running. These are useful for background
//! "maintenance" tasks that are not critical to the completion of the primary
//! computation.
//!
//! When the execution functions finish running and unblock, any daemon futures
//! that have not yet completed are immediately dropped.
//!
//! A daemon future can be executed with [`spawn_daemon`].
//!
//! [here]: https://tokio.rs/docs/going-deeper-futures/tasks/
//! [`spawn_daemon`]: fn.spawn_daemon.html
//! [`spawn`]: fn.spawn.html
//! [`run`]: fn.run.html
Expand All @@ -74,8 +60,8 @@ use std::rc::Rc;
/// Executes futures on the current thread.
///
/// All futures executed using this executor will be executed on the current
/// thread as non-daemon futures. As such, `run` will wait for these futures to
/// complete before returning.
/// thread. As such, `run` will wait for these futures to complete before
/// returning.
///
/// For more details, see the [module level](index.html) documentation.
#[derive(Debug, Clone)]
Expand All @@ -84,19 +70,6 @@ pub struct TaskExecutor {
_p: ::std::marker::PhantomData<Rc<()>>,
}

/// Executes daemon futures on the current thread.
///
/// All futures executed using this executor will be executed on the current
/// thread as daemon futures. As such, `run` will **not** wait for these
/// futures to complete before returning from `run`.
///
/// For more details, see the [module level](index.html) documentation.
#[derive(Debug, Clone)]
pub struct DaemonExecutor {
// Prevent the handle from moving across threads.
_p: ::std::marker::PhantomData<Rc<()>>,
}

/// A context yielded to the closure provided to `run`.
///
/// This context is mostly a future-proofing of the library to add future
Expand All @@ -105,7 +78,7 @@ pub struct DaemonExecutor {
#[derive(Debug)]
pub struct Context<'a> {
enter: executor::Enter,
_p: ::std::marker::PhantomData<&'a ()>,
cancel: &'a Cell<bool>,
}

/// Implements the "blocking" logic for the current thread executor. A
Expand All @@ -119,37 +92,27 @@ struct TaskRunner<T> {

struct CurrentRunner {
/// When set to true, the executor should return immediately, even if there
/// still are non-daemon futures to run.
/// still futures to run.
cancel: Cell<bool>,

/// Number of non-daemon futures currently being executed by the runner.
non_daemons: Cell<usize>,
/// Number of futures currently being executed by the runner.
num_futures: Cell<usize>,

/// Raw pointer to the current scheduler pusher.
///
/// The raw pointer is required in order to store it in a thread-local slot.
schedule: Cell<Option<*mut Schedule>>,
}

type Scheduler<T> = scheduler::Scheduler<SpawnedFuture, T>;
type Schedule = scheduler::Schedule<SpawnedFuture>;

#[derive(Debug)]
struct SpawnedFuture {
/// True if the executed future should not prevent the executor from
/// terminating.
daemon: bool,

/// The task to execute.
inner: Task,
}
type Scheduler<T> = scheduler::Scheduler<Task, T>;
type Schedule = scheduler::Schedule<Task>;

struct Task(Spawn<Box<Future<Item = (), Error = ()>>>);

/// Current thread's task runner. This is set in `TaskRunner::with`
thread_local!(static CURRENT: CurrentRunner = CurrentRunner {
cancel: Cell::new(false),
non_daemons: Cell::new(0),
num_futures: Cell::new(0),
schedule: Cell::new(None),
});

Expand All @@ -158,7 +121,7 @@ thread_local!(static CURRENT: CurrentRunner = CurrentRunner {
///
/// In more detail, this function will block until:
/// - All executing futures are complete, or
/// - `cancel_all_executing` is invoked.
/// - `cancel_all_spawned` is invoked.
pub fn run<F, R>(f: F) -> R
where F: FnOnce(&mut Context) -> R
{
Expand All @@ -180,57 +143,21 @@ where F: FnOnce(&mut Context) -> R,

/// Executes a future on the current thread.
///
/// The provided future must complete or be canceled before
/// `run` will return.
/// The provided future must complete or be canceled before `run` will return.
///
/// # Panics
///
/// This function can only be invoked from the context of a
/// `run` call; any other use will result in a panic.
/// This function can only be invoked from the context of a `run` call; any
/// other use will result in a panic.
pub fn spawn<F>(future: F)
where F: Future<Item = (), Error = ()> + 'static
{
execute(future, false).unwrap_or_else(|_| {
panic!("cannot call `execute` unless the thread is already \
in the context of a call to `run`")
})
}

/// Executes a daemon future on the current thread.
///
/// Completion of the provided future is not required for the pending
/// `run` call to complete. If `run` returns before `future` completes, it
/// will be dropped.
///
/// # Panics
///
/// This function can only be invoked from the context of a
/// `run` call; any other use will result in a panic.
pub fn spawn_daemon<F>(future: F)
where F: Future<Item = (), Error = ()> + 'static
{
execute(future, true).unwrap_or_else(|_| {
execute(future).unwrap_or_else(|_| {
panic!("cannot call `execute` unless the thread is already \
in the context of a call to `run`")
})
}

/// Cancels *all* executing futures.
///
/// This cancels both daemon and non-daemon futures.
///
/// # Panics
///
/// This function can only be invoked from the context of a
/// `run` call; any other use will result in a panic.
pub fn cancel_all_spawned() {
CurrentRunner::with(|runner| runner.cancel_all_executing())
.unwrap_or_else(|()| {
panic!("cannot call `cancel_all_executing` unless the thread is already \
in the context of a call to `run`")
})
}

/// Returns an executor that executes futures on the current thread.
///
/// The user of `TaskExecutor` must ensure that when a future is submitted,
Expand All @@ -247,27 +174,7 @@ impl<F> Executor<F> for TaskExecutor
where F: Future<Item = (), Error = ()> + 'static
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
execute(future, false)
}
}

/// Returns an executor that executes daemon futures on the current thread.
///
/// The user of `DaemonExecutor` must ensure that when a future is submitted,
/// that it is done from the context a call to `run`.
///
/// For more details, see the [module level](index.html) documentation.
pub fn daemon_executor() -> DaemonExecutor {
DaemonExecutor {
_p: ::std::marker::PhantomData,
}
}

impl<F> Executor<F> for DaemonExecutor
where F: Future<Item = (), Error = ()> + 'static
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
execute(future, true)
execute(future)
}
}

Expand All @@ -280,6 +187,11 @@ impl<'a> Context<'a> {
pub fn enter(&self) -> &executor::Enter {
&self.enter
}

/// Cancels *all* executing futures.
pub fn cancel_all_spawned(&self) {
self.cancel.set(true);
}
}

/// Submits a future to the current executor. This is done by
Expand All @@ -289,21 +201,16 @@ impl<'a> Context<'a> {
/// `run`, then `Err` is returned.
///
/// This function does not panic.
fn execute<F>(future: F, daemon: bool) -> Result<(), ExecuteError<F>>
fn execute<F>(future: F) -> Result<(), ExecuteError<F>>
where F: Future<Item = (), Error = ()> + 'static,
{
CURRENT.with(|current| {
match current.schedule.get() {
Some(schedule) => {
let spawned = SpawnedFuture {
daemon: daemon,
inner: Task::new(future),
};

if !daemon {
let non_daemons = current.non_daemons.get();
current.non_daemons.set(non_daemons + 1);
}
let spawned = Task::new(future);

let num_futures = current.num_futures.get();
current.num_futures.set(num_futures + 1);

unsafe { (*schedule).schedule(spawned); }

Expand Down Expand Up @@ -346,9 +253,8 @@ where T: Wakeup,
/// Once all context is setup, the init closure is invoked. This is the
/// "boostrapping" process that executes the initial futures into the
/// scheduler. After this, the function loops and advances the scheduler
/// state until all non daemon futures complete. When no scheduled futures
/// are ready to be advanced, the thread is blocked using
/// `ThreadNotify::park`.
/// state until all futures complete. When no scheduled futures are ready to
/// be advanced, the thread is blocked using `S: Sleep`.
fn enter<S, F, R>(sleep: &mut S, f: F) -> R
where F: FnOnce(&mut Context) -> R,
S: Sleep<Wakeup = T>,
Expand All @@ -369,7 +275,7 @@ where T: Wakeup,
// Enter an execution scope
let mut ctx = Context {
enter: enter,
_p: ::std::marker::PhantomData,
cancel: &current.cancel,
};

// Set the scheduler to the TLS and perform setup work,
Expand All @@ -384,8 +290,8 @@ where T: Wakeup,
//
// This function will not return until either
//
// a) All non daemon futures have completed execution
// b) `cancel_all_executing` is called, forcing the executor to
// a) All futures have completed execution
// b) `cancel_all_spawned` is called, forcing the executor to
// return.
runner.run(sleep, current);

Expand Down Expand Up @@ -418,9 +324,9 @@ where T: Wakeup,
// See `set_schedule` documentation for more details on how we
// guard against mutable pointer aliasing.
current.set_schedule(scheduler as &mut Schedule, || {
match spawned.inner.0.poll_future_notify(notify, 0) {
match spawned.0.poll_future_notify(notify, 0) {
Ok(Async::Ready(_)) | Err(_) => {
Async::Ready(spawned.daemon)
Async::Ready(())
}
Ok(Async::NotReady) => Async::NotReady,
}
Expand All @@ -431,12 +337,10 @@ where T: Wakeup,
match res {
// A future completed. `is_daemon` is true when the future was
// submitted as a daemon future.
Tick::Data(is_daemon) => {
if !is_daemon {
let non_daemons = current.non_daemons.get();
debug_assert!(non_daemons > 0);
current.non_daemons.set(non_daemons - 1);
}
Tick::Data(_) => {
let num_futures = current.num_futures.get();
debug_assert!(num_futures > 0);
current.num_futures.set(num_futures - 1);
},
Tick::Empty => {
// The scheduler did not have any work to process.
Expand All @@ -461,18 +365,6 @@ where T: Wakeup,
}

impl CurrentRunner {
fn with<F, R>(f: F) -> Result<R, ()>
where F: FnOnce(&Self) -> R,
{
CURRENT.with(|current| {
if current.schedule.get().is_some() {
Ok(f(current))
} else {
Err(())
}
})
}

/// Set the provided schedule handle to the TLS slot for the duration of the
/// closure.
///
Expand Down Expand Up @@ -516,11 +408,7 @@ impl CurrentRunner {
}

fn is_running(&self) -> bool {
self.non_daemons.get() > 0 && !self.cancel.get()
}

fn cancel_all_executing(&self) {
self.cancel.set(true);
self.num_futures.get() > 0 && !self.cancel.get()
}
}

Expand Down
Loading

0 comments on commit 01a4ccf

Please sign in to comment.