diff --git a/Cargo.toml b/Cargo.toml index 2cb9a47..1d562ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,21 +12,14 @@ keywords = ["asynchronous", "executor", "single", "multi", "spawn"] categories = ["asynchronous", "concurrency"] readme = "README.md" -[features] -default = ["async-io"] - [dependencies] -futures-lite = "0.1.8" -multitask = "0.2.0" -parking = "2.0.0" -scoped-tls = "1.0.0" -waker-fn = "1.0.0" - -# Optional optimization: executor waits on I/O when idle. -[dependencies.async-io] -version = "0.1.5" -optional = true +async-task = "3.0.0" +concurrent-queue = "1.2.2" +fastrand = "1.3.4" +futures-lite = "1.0.0" +once_cell = "1.4.1" [dev-dependencies] -async-channel = "1.1.1" +async-channel = "1.4.1" +async-io = "0.2.0" easy-parallel = "3.1.0" diff --git a/README.md b/README.md index 9c1e68d..16fa62d 100644 --- a/README.md +++ b/README.md @@ -9,31 +9,24 @@ https://crates.io/crates/async-executor) [![Documentation](https://docs.rs/async-executor/badge.svg)]( https://docs.rs/async-executor) -Async executor. - -This crate offers two kinds of executors: single-threaded and multi-threaded. +Async executors. ## Examples -Run a single-threaded and a multi-threaded executor at the same time: - ```rust -use async_channel::unbounded; -use async_executor::{Executor, LocalExecutor}; -use easy_parallel::Parallel; +use async_executor::Executor; +use futures_lite::future; +// Create a new executor. let ex = Executor::new(); -let local_ex = LocalExecutor::new(); -let (trigger, shutdown) = unbounded::<()>(); - -Parallel::new() - // Run four executor threads. - .each(0..4, |_| ex.run(shutdown.recv())) - // Run local executor on the current thread. - .finish(|| local_ex.run(async { - println!("Hello world!"); - drop(trigger); - })); + +// Spawn a task. +let task = ex.spawn(async { + println!("Hello world"); +}); + +// Run the executor until the task complets. +future::block_on(ex.run(task)); ``` ## License diff --git a/src/lib.rs b/src/lib.rs index e23b4c9..59f6ebe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,58 +1,298 @@ -//! Async executor. -//! -//! This crate offers two kinds of executors: single-threaded and multi-threaded. +//! Async executors. //! //! # Examples //! -//! Run a single-threaded and a multi-threaded executor at the same time: -//! //! ``` -//! use async_channel::unbounded; -//! use async_executor::{Executor, LocalExecutor}; -//! use easy_parallel::Parallel; +//! use async_executor::Executor; +//! use futures_lite::future; //! +//! // Create a new executor. //! let ex = Executor::new(); -//! let local_ex = LocalExecutor::new(); -//! let (trigger, shutdown) = unbounded::<()>(); //! -//! Parallel::new() -//! // Run four executor threads. -//! .each(0..4, |_| ex.run(shutdown.recv())) -//! // Run local executor on the current thread. -//! .finish(|| local_ex.run(async { -//! println!("Hello world!"); -//! drop(trigger); -//! })); +//! // Spawn a task. +//! let task = ex.spawn(async { +//! println!("Hello world"); +//! }); +//! +//! // Run the executor until the task complets. +//! future::block_on(ex.run(task)); //! ``` #![forbid(unsafe_code)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +use std::cell::Cell; use std::future::Future; +use std::marker::PhantomData; +use std::panic::{RefUnwindSafe, UnwindSafe}; use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Duration; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use std::task::{Context, Poll, Waker}; -use futures_lite::pin; -use scoped_tls::scoped_thread_local; -use waker_fn::waker_fn; +use concurrent_queue::ConcurrentQueue; +use futures_lite::future; -#[cfg(feature = "async-io")] -use async_io::parking; -#[cfg(not(feature = "async-io"))] -use parking; - -scoped_thread_local!(static EX: Executor); -scoped_thread_local!(static LOCAL_EX: LocalExecutor); +/// A runnable future, ready for execution. +/// +/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`, +/// we get back two values: +/// +/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable` +/// 2. an `async_task::JoinHandle`, which is wrapped inside a `Task` +/// +/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's +/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task +/// queue in an executor. +type Runnable = async_task::Task<()>; -/// Multi-threaded executor. +/// A spawned future. +/// +/// Tasks are also futures themselves and yield the output of the spawned future. +/// +/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit +/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method. +/// +/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. +/// +/// If a task panics, the panic will be thrown into the [`Executor::run()`] or +/// [`LocalExecutor::run()`] invocation that polled it. +/// +/// # Examples +/// +/// ``` +/// use async_executor::Executor; +/// use futures_lite::future; +/// use std::thread; +/// +/// let ex = Executor::new(); /// -/// The executor does not spawn threads on its own. Instead, you need to call [`Executor::run()`] -/// on manually spawned executor threads. +/// // Spawn a future onto the executor. +/// let task = ex.spawn(async { +/// println!("Hello from a task!"); +/// 1 + 2 +/// }); +/// +/// // Run an executor thread. +/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); +/// +/// // Wait for the result. +/// assert_eq!(future::block_on(task), 3); +/// ``` +#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] +#[derive(Debug)] +pub struct Task(Option>); + +impl Task { + /// Detaches the task to let it keep running in the background. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use async_io::Timer; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// ex.spawn(async { + /// loop { + /// println!("I'm a daemon task looping forever."); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }) + /// .detach(); + /// ``` + pub fn detach(mut self) { + self.0.take().unwrap(); + } + + /// Cancels the task and waits for it to stop running. + /// + /// Returns the task's output if it was completed just before it got canceled, or [`None`] if + /// it didn't complete. + /// + /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of + /// canceling because it also waits for the task to stop running. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use async_io::Timer; + /// use futures_lite::future; + /// use std::thread; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// let task = ex.spawn(async { + /// loop { + /// println!("Even though I'm in an infinite loop, you can still cancel me!"); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }); + /// + /// // Run an executor thread. + /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); + /// + /// future::block_on(async { + /// Timer::after(Duration::from_secs(3)).await; + /// task.cancel().await; + /// }); + /// ``` + pub async fn cancel(self) -> Option { + let mut task = self; + let handle = task.0.take().unwrap(); + handle.cancel(); + handle.await + } +} + +impl Drop for Task { + fn drop(&mut self) { + if let Some(handle) = &self.0 { + handle.cancel(); + } + } +} + +impl Future for Task { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(output) => Poll::Ready(output.expect("task has failed")), + } + } +} + +/// The state of a executor. +#[derive(Debug)] +struct State { + /// The global queue. + queue: ConcurrentQueue, + + /// Shards of the global queue created by tickers. + shards: RwLock>>>, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + /// A list of sleeping tickers. + sleepers: Mutex, +} + +impl State { + /// Creates state for a new executor. + fn new() -> State { + State { + queue: ConcurrentQueue::unbounded(), + shards: RwLock::new(Vec::new()), + notified: AtomicBool::new(true), + sleepers: Mutex::new(Sleepers { + count: 0, + wakers: Vec::new(), + id_gen: 0, + }), + } + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify(&self) { + if !self + .notified + .compare_and_swap(false, true, Ordering::SeqCst) + { + let waker = self.sleepers.lock().unwrap().notify(); + if let Some(w) = waker { + w.wake(); + } + } + } +} + +/// A list of sleeping tickers. +#[derive(Debug)] +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). + count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec<(u64, Waker)>, + + /// ID generator for sleepers. + id_gen: u64, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> u64 { + let id = self.id_gen; + self.id_gen += 1; + self.count += 1; + self.wakers.push((id, waker.clone())); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: u64, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.0 == id { + if !item.1.will_wake(waker) { + item.1 = waker.clone(); + } + return false; + } + } + + self.wakers.push((id, waker.clone())); + true + } + + /// Removes a previously inserted sleeping ticker. + fn remove(&mut self, id: u64) { + self.count -= 1; + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].0 == id { + self.wakers.remove(i); + return; + } + } + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. + fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.1) + } else { + None + } + } +} + +/// An async executor. /// /// # Examples /// +/// A multi-threaded executor: +/// /// ``` /// use async_channel::unbounded; /// use async_executor::Executor; @@ -64,7 +304,7 @@ scoped_thread_local!(static LOCAL_EX: LocalExecutor); /// /// Parallel::new() /// // Run four executor threads. -/// .each(0..4, |_| ex.run(shutdown.recv())) +/// .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) /// // Run the main future on the current thread. /// .finish(|| future::block_on(async { /// println!("Hello world!"); @@ -73,11 +313,14 @@ scoped_thread_local!(static LOCAL_EX: LocalExecutor); /// ``` #[derive(Debug)] pub struct Executor { - ex: Arc, + state: once_cell::sync::OnceCell>, } +impl UnwindSafe for Executor {} +impl RefUnwindSafe for Executor {} + impl Executor { - /// Creates a multi-threaded executor. + /// Creates a new executor. /// /// # Examples /// @@ -86,25 +329,9 @@ impl Executor { /// /// let ex = Executor::new(); /// ``` - pub fn new() -> Executor { + pub const fn new() -> Executor { Executor { - ex: Arc::new(multitask::Executor::new()), - } - } - - /// Creates a spawner for this executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; - /// - /// let ex = Executor::new(); - /// let spawner = ex.spawner(); - /// ``` - pub fn spawner(&self) -> Spawner { - Spawner { - ex: self.ex.clone(), + state: once_cell::sync::OnceCell::new(), } } @@ -125,30 +352,10 @@ impl Executor { &self, future: impl Future + Send + 'static, ) -> Task { - Task(self.ex.spawn(future)) - } - - /// Enters the context of an executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::{Executor, Task}; - /// - /// let ex = Executor::new(); - /// - /// ex.enter(|| { - /// // `Task::spawn()` now knows which executor to spawn onto. - /// let task = Task::spawn(async { - /// println!("Hello world"); - /// }); - /// }); - /// ``` - pub fn enter(&self, f: impl FnOnce() -> T) -> T { - if EX.is_set() { - panic!("cannot call `Executor::enter()` if already inside an `Executor`"); - } - EX.set(self, f) + // Create a task, push it into the queue by scheduling it, and return its `Task` handle. + let (runnable, handle) = async_task::spawn(future, self.schedule(), ()); + runnable.schedule(); + Task(Some(handle)) } /// Runs the executor until the given future completes. @@ -157,41 +364,49 @@ impl Executor { /// /// ``` /// use async_executor::Executor; + /// use futures_lite::future; /// /// let ex = Executor::new(); /// /// let task = ex.spawn(async { 1 + 2 }); - /// let res = ex.run(async { task.await * 2 }); + /// let res = future::block_on(ex.run(async { task.await * 2 })); /// /// assert_eq!(res, 6); /// ``` - pub fn run(&self, future: impl Future) -> T { - self.enter(|| { - let (p, u) = parking::pair(); - - let ticker = self.ex.ticker({ - let u = u.clone(); - move || u.unpark() - }); - - pin!(future); - let waker = waker_fn(move || u.unpark()); - let cx = &mut Context::from_waker(&waker); - - 'start: loop { - if let Poll::Ready(t) = future.as_mut().poll(cx) { - break t; - } + pub async fn run(&self, future: impl Future) -> T { + let ticker = Ticker::new(self.state()); + future::race( + future, + future::poll_fn(|cx| { + // Run a batch of tasks. for _ in 0..200 { - if !ticker.tick() { - p.park(); - continue 'start; + if !ticker.tick(cx.waker()) { + return Poll::Pending; } } - p.park_timeout(Duration::from_secs(0)); - } - }) + + // If there are more tasks, yield. + cx.waker().wake_by_ref(); + Poll::Pending + }), + ) + .await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state().clone(); + + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } + + /// Returns a reference to the inner state. + fn state(&self) -> &Arc { + self.state.get_or_init(|| Arc::new(State::new())) } } @@ -201,63 +416,203 @@ impl Default for Executor { } } -/// A spawner for a multi-threaded executor. +/// Executes tasks in a work-stealing executor. +/// +/// A ticker represents a "worker" in a work-stealing executor. #[derive(Debug)] -pub struct Spawner { - ex: Arc, -} +struct Ticker<'a> { + /// The executor state. + state: &'a State, -impl Spawner { - /// Gets a spawner for the current multi-threaded executor. - /// - /// If called from an [`Executor`], returns its [`Spawner`]. - /// - /// Otherwise, this method panics. - /// - /// # Examples - /// - /// ``` - /// use async_executor::{Executor, Spawner}; + /// A shard of the global queue. + shard: Arc>, + + /// Set to `true` when in sleeping state. /// - /// let ex = Executor::new(); + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: Cell>, + + /// Bumped every time a task is run. + ticks: Cell, +} + +impl UnwindSafe for Ticker<'_> {} +impl RefUnwindSafe for Ticker<'_> {} + +impl Ticker<'_> { + /// Creates a ticker and registers it in the executor state. + fn new(state: &State) -> Ticker<'_> { + let ticker = Ticker { + state, + shard: Arc::new(ConcurrentQueue::bounded(512)), + sleeping: Cell::new(None), + ticks: Cell::new(0), + }; + state.shards.write().unwrap().push(ticker.shard.clone()); + ticker + } + + /// Moves the ticker into sleeping and unnotified state. /// - /// ex.run(async { - /// let spawner = Spawner::current(); - /// let task = spawner.spawn(async { 1 + 2 }); - /// assert_eq!(task.await, 3); - /// }); - /// ``` - pub fn current() -> Spawner { - if EX.is_set() { - EX.with(|ex| ex.spawner()) - } else { - panic!("`Spawner::current()` must be called from an `Executor`") + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&self, waker: &Waker) -> bool { + let mut sleepers = self.state.sleepers.lock().unwrap(); + + match self.sleeping.get() { + // Move to sleeping state. + None => self.sleeping.set(Some(sleepers.insert(waker))), + + // Already sleeping, check if notified. + Some(id) => { + if !sleepers.update(id, waker) { + return false; + } + } } + + self.state + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + + true } - /// Spawns a task onto the executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; + /// Moves the ticker into woken state. /// - /// let ex = Executor::new(); - /// let spawner = ex.spawner(); - /// - /// let task = spawner.spawn(async { - /// println!("Hello world"); - /// }); - /// ``` - pub fn spawn( - &self, - future: impl Future + Send + 'static, - ) -> Task { - Task(self.ex.spawn(future)) + /// Returns `false` if the ticker was already woken. + fn wake(&self) { + if let Some(id) = self.sleeping.take() { + let mut sleepers = self.state.sleepers.lock().unwrap(); + sleepers.remove(id); + + self.state + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + } + } + + /// Executes a single task. + /// + /// This method takes a scheduled task and polls its future. It returns `true` if a scheduled + /// task was found, or `false` otherwise. + pub fn tick(&self, waker: &Waker) -> bool { + loop { + match self.search() { + None => { + // Move to sleeping and unnotified state. + if !self.sleep(waker) { + // If already sleeping and unnotified, return. + return false; + } + } + Some(r) => { + // Wake up. + self.wake(); + + // Notify another ticker now to pick up where this ticker left off, just in + // case running the task takes a long time. + self.state.notify(); + + // Bump the ticker. + let ticks = self.ticks.get(); + self.ticks.set(ticks.wrapping_add(1)); + + // Steal tasks from the global queue to ensure fair task scheduling. + if ticks % 64 == 0 { + steal(&self.state.queue, &self.shard); + } + + // Run the task. + r.run(); + + return true; + } + } + } + } + + /// Finds the next task to run. + fn search(&self) -> Option { + if let Ok(r) = self.shard.pop() { + return Some(r); + } + + // Try stealing from the global queue. + if let Ok(r) = self.state.queue.pop() { + steal(&self.state.queue, &self.shard); + return Some(r); + } + + // Try stealing from other shards. + let shards = self.state.shards.read().unwrap(); + + // Pick a random starting point in the iterator list and rotate the list. + let n = shards.len(); + let start = fastrand::usize(..n); + let iter = shards.iter().chain(shards.iter()).skip(start).take(n); + + // Remove this ticker's shard. + let iter = iter.filter(|shard| !Arc::ptr_eq(shard, &self.shard)); + + // Try stealing from each shard in the list. + for shard in iter { + steal(shard, &self.shard); + if let Ok(r) = self.shard.pop() { + return Some(r); + } + } + + None + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // Wake and unregister the ticker. + self.wake(); + self.state + .shards + .write() + .unwrap() + .retain(|shard| !Arc::ptr_eq(shard, &self.shard)); + + // Re-schedule remaining tasks in the shard. + while let Ok(r) = self.shard.pop() { + r.schedule(); + } + // Notify another ticker to start searching for tasks. + self.state.notify(); + + // TODO(stjepang): Cancel all remaining tasks. } } -/// Single-threaded executor. +/// Steals some items from one queue into another. +fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { + // Half of `src`'s length rounded up. + let mut count = (src.len() + 1) / 2; + + if count > 0 { + // Don't steal more than fits into the queue. + if let Some(cap) = dest.capacity() { + count = count.min(cap - dest.len()); + } + + // Steal tasks. + for _ in 0..count { + if let Ok(t) = src.pop() { + assert!(dest.push(t).is_ok()); + } else { + break; + } + } + } +} + +/// A thread-local executor. /// /// The executor can only be run on the thread that created it. /// @@ -265,17 +620,21 @@ impl Spawner { /// /// ``` /// use async_executor::LocalExecutor; +/// use futures_lite::future; /// /// let local_ex = LocalExecutor::new(); /// -/// local_ex.run(async { +/// future::block_on(local_ex.run(async { /// println!("Hello world!"); -/// }); +/// })); /// ``` #[derive(Debug)] pub struct LocalExecutor { - ex: multitask::LocalExecutor, - parker: parking::Parker, + /// The inner executor. + inner: once_cell::unsync::OnceCell, + + /// Make sure the type is `!Send` and `!Sync`. + _marker: PhantomData>, } impl LocalExecutor { @@ -288,11 +647,10 @@ impl LocalExecutor { /// /// let local_ex = LocalExecutor::new(); /// ``` - pub fn new() -> LocalExecutor { - let (p, u) = parking::pair(); + pub const fn new() -> LocalExecutor { LocalExecutor { - ex: multitask::LocalExecutor::new(move || u.unpark()), - parker: p, + inner: once_cell::unsync::OnceCell::new(), + _marker: PhantomData, } } @@ -310,7 +668,10 @@ impl LocalExecutor { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'static) -> Task { - Task(self.ex.spawn(future)) + // Create a task, push it into the queue by scheduling it, and return its `Task` handle. + let (runnable, handle) = async_task::spawn_local(future, self.schedule(), ()); + runnable.schedule(); + Task(Some(handle)) } /// Runs the executor until the given future completes. @@ -319,207 +680,37 @@ impl LocalExecutor { /// /// ``` /// use async_executor::LocalExecutor; + /// use futures_lite::future; /// /// let local_ex = LocalExecutor::new(); /// /// let task = local_ex.spawn(async { 1 + 2 }); - /// let res = local_ex.run(async { task.await * 2 }); + /// let res = future::block_on(local_ex.run(async { task.await * 2 })); /// /// assert_eq!(res, 6); /// ``` - pub fn run(&self, future: impl Future) -> T { - pin!(future); - - let u = self.parker.unparker(); - let waker = waker_fn(move || u.unpark()); - let cx = &mut Context::from_waker(&waker); - - LOCAL_EX.set(self, || { - 'start: loop { - if let Poll::Ready(t) = future.as_mut().poll(cx) { - break t; - } - - for _ in 0..200 { - if !self.ex.tick() { - self.parker.park(); - continue 'start; - } - } - self.parker.park_timeout(Duration::from_secs(0)); - } - }) - } -} - -impl Default for LocalExecutor { - fn default() -> LocalExecutor { - LocalExecutor::new() + pub async fn run(&self, future: impl Future) -> T { + self.inner().run(future).await } -} - -/// A spawned future. -/// -/// Tasks are also futures themselves and yield the output of the spawned future. -/// -/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit -/// more gracefully and wait until it stops running, use the [`cancel()`][`Task::cancel()`] method. -/// -/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. -/// -/// # Examples -/// -/// ``` -/// use async_executor::{Executor, Task}; -/// -/// let ex = Executor::new(); -/// -/// ex.run(async { -/// let task = Task::spawn(async { -/// println!("Hello from a task!"); -/// 1 + 2 -/// }); -/// -/// assert_eq!(task.await, 3); -/// }); -/// ``` -#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -#[derive(Debug)] -pub struct Task(multitask::Task); -impl Task { - /// Spawns a task onto the current multi-threaded or single-threaded executor. - /// - /// If called from an [`Executor`] (preferred) or from a [`LocalExecutor`], the task is spawned - /// on it. - /// - /// Otherwise, this method panics. - /// - /// # Examples - /// - /// ``` - /// use async_executor::{Executor, Task}; - /// - /// let ex = Executor::new(); - /// - /// ex.run(async { - /// let task = Task::spawn(async { 1 + 2 }); - /// assert_eq!(task.await, 3); - /// }); - /// ``` - /// - /// ``` - /// use async_executor::{LocalExecutor, Task}; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// local_ex.run(async { - /// let task = Task::spawn(async { 1 + 2 }); - /// assert_eq!(task.await, 3); - /// }); - /// ``` - pub fn spawn(future: impl Future + Send + 'static) -> Task - where - T: Send + 'static, - { - if EX.is_set() { - EX.with(|ex| ex.spawn(future)) - } else if LOCAL_EX.is_set() { - LOCAL_EX.with(|local_ex| local_ex.spawn(future)) - } else { - panic!("`Task::spawn()` must be called from an `Executor` or `LocalExecutor`") - } - } + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.inner().state().clone(); - /// Spawns a task onto the current single-threaded executor. - /// - /// If called from a [`LocalExecutor`], the task is spawned on it. - /// - /// Otherwise, this method panics. - /// - /// # Examples - /// - /// ``` - /// use async_executor::{LocalExecutor, Task}; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// local_ex.run(async { - /// let task = Task::local(async { 1 + 2 }); - /// assert_eq!(task.await, 3); - /// }); - /// ``` - pub fn local(future: impl Future + 'static) -> Task - where - T: 'static, - { - if LOCAL_EX.is_set() { - LOCAL_EX.with(|local_ex| local_ex.spawn(future)) - } else { - panic!("`Task::local()` must be called from a `LocalExecutor`") + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); } } - /// Detaches the task to let it keep running in the background. - /// - /// # Examples - /// - /// ``` - /// use async_executor::{Executor, Task}; - /// use futures_lite::future; - /// - /// let ex = Executor::new(); - /// - /// ex.spawn(async { - /// loop { - /// println!("I'm a background task looping forever."); - /// future::yield_now().await; - /// } - /// }) - /// .detach(); - /// - /// ex.run(future::yield_now()); - /// ``` - pub fn detach(self) { - self.0.detach(); - } - - /// Cancels the task and waits for it to stop running. - /// - /// Returns the task's output if it was completed just before it got canceled, or [`None`] if - /// it didn't complete. - /// - /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of - /// canceling because it also waits for the task to stop running. - /// - /// # Examples - /// - /// ``` - /// use async_executor::{Executor, Task}; - /// use futures_lite::future; - /// - /// let ex = Executor::new(); - /// - /// let task = ex.spawn(async { - /// loop { - /// println!("Even though I'm in an infinite loop, you can still cancel me!"); - /// future::yield_now().await; - /// } - /// }); - /// - /// ex.run(async { - /// task.cancel().await; - /// }); - /// ``` - pub async fn cancel(self) -> Option { - self.0.cancel().await + /// Returns a reference to the inner executor. + fn inner(&self) -> &Executor { + self.inner.get_or_init(|| Executor::new()) } } -impl Future for Task { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.0).poll(cx) +impl Default for LocalExecutor { + fn default() -> LocalExecutor { + LocalExecutor::new() } }