From ebeb78ed40027032feb77c89a1de4b58d2dcafbf Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 16 Sep 2022 14:05:09 -0700 Subject: [PATCH] rt: split internal runtime::Handle concerns (#5022) The `runtime::Handle` struct is part of the public API but is also used internally. This has created a bit of tension. An earlier patch made defined Handle as a private struct in some cases when `rt` is not enabled. This patch splits out internal handle concerns into a new `scheduler::Handle` type, which will only be internal. This also defines a `Handle` type for each scheduler variant. Eventually, the per-scheduler `Handle` types will replace the per-scheduler `Spawner` types, but more work is needed before we can make that change. --- tokio/src/runtime/blocking/pool.rs | 4 +- tokio/src/runtime/builder.rs | 21 +- tokio/src/runtime/context.rs | 27 +- tokio/src/runtime/handle.rs | 644 ++++++++---------- tokio/src/runtime/metrics/runtime.rs | 18 +- tokio/src/runtime/mod.rs | 10 +- tokio/src/runtime/scheduler/current_thread.rs | 33 +- tokio/src/runtime/scheduler/mod.rs | 159 ++++- .../runtime/scheduler/multi_thread/handle.rs | 45 ++ .../src/runtime/scheduler/multi_thread/mod.rs | 29 +- tokio/src/runtime/spawner.rs | 84 --- tokio/src/runtime/time/entry.rs | 13 +- tokio/src/runtime/time/tests/mod.rs | 36 +- tokio/src/task/builder.rs | 2 +- tokio/src/task/spawn.rs | 9 +- tokio/src/time/sleep.rs | 117 ++-- 16 files changed, 649 insertions(+), 602 deletions(-) create mode 100644 tokio/src/runtime/scheduler/multi_thread/handle.rs delete mode 100644 tokio/src/runtime/spawner.rs diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index d892ac111b4..79bc15c6ecd 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -154,7 +154,7 @@ cfg_fs! { R: Send + 'static, { let rt = context::current(); - rt.as_inner().blocking_spawner.spawn_mandatory_blocking(&rt, func) + rt.inner.blocking_spawner().spawn_mandatory_blocking(&rt, func) } } @@ -419,7 +419,7 @@ impl Spawner { builder.spawn(move || { // Only the reference should be moved into the closure let _enter = crate::runtime::context::enter(rt.clone()); - rt.as_inner().blocking_spawner.inner.run(id); + rt.inner.blocking_spawner().inner.run(id); drop(shutdown_tx); }) } diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 4e93251b2f1..103203b0dba 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,5 +1,5 @@ use crate::runtime::handle::Handle; -use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; +use crate::runtime::{blocking, driver, Callback, Runtime}; use crate::util::{RngSeed, RngSeedGenerator}; use std::fmt; @@ -874,7 +874,8 @@ impl Builder { } fn build_current_thread_runtime(&mut self) -> io::Result { - use crate::runtime::{Config, CurrentThread, HandleInner, Scheduler}; + use crate::runtime::scheduler::{self, current_thread, CurrentThread}; + use crate::runtime::{Config, Scheduler}; use std::sync::Arc; let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; @@ -900,14 +901,13 @@ impl Builder { seed_generator: self.seed_generator.next_generator(), }, ); - let spawner = Spawner::CurrentThread(scheduler.spawner().clone()); - - let inner = Arc::new(HandleInner { - spawner, + let inner = Arc::new(current_thread::Handle { + spawner: scheduler.spawner().clone(), driver: driver_handle, blocking_spawner, seed_generator: self.seed_generator.next_generator(), }); + let inner = scheduler::Handle::CurrentThread(inner); Ok(Runtime { scheduler: Scheduler::CurrentThread(scheduler), @@ -993,7 +993,8 @@ cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; - use crate::runtime::{Config, HandleInner, Scheduler, MultiThread}; + use crate::runtime::{Config, Scheduler}; + use crate::runtime::scheduler::{self, multi_thread, MultiThread}; use std::sync::Arc; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); @@ -1019,14 +1020,14 @@ cfg_rt_multi_thread! { seed_generator: self.seed_generator.next_generator(), }, ); - let spawner = Spawner::MultiThread(scheduler.spawner().clone()); - let inner = Arc::new(HandleInner { - spawner, + let inner = Arc::new(multi_thread::Handle { + spawner: scheduler.spawner().clone(), driver: driver_handle, blocking_spawner, seed_generator: self.seed_generator.next_generator(), }); + let inner = scheduler::Handle::MultiThread(inner); // Create the runtime handle let handle = Handle { inner }; diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 53a01cfdaeb..c35bf806c8c 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -31,8 +31,8 @@ cfg_io_driver! { let ctx = ctx.borrow(); ctx.as_ref() .expect(crate::util::error::CONTEXT_MISSING_ERROR) - .as_inner() - .driver + .inner + .driver() .io .clone() }) { @@ -49,9 +49,8 @@ cfg_signal_internal! { let ctx = ctx.borrow(); ctx.as_ref() .expect(crate::util::error::CONTEXT_MISSING_ERROR) - .as_inner() - .driver - .signal + .inner + .signal() .clone() }) { Ok(signal_handle) => signal_handle, @@ -65,10 +64,9 @@ cfg_time! { pub(crate) fn clock() -> Option { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref() - .map(|ctx| { - ctx.as_inner().driver.clock.clone() - }) + ctx + .as_ref() + .map(|ctx| ctx.inner.clock().clone()) }) { Ok(clock) => clock, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -77,15 +75,6 @@ cfg_time! { } } -cfg_rt! { - pub(crate) fn spawn_handle() -> Option { - match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.inner.spawner.clone())) { - Ok(spawner) => spawner, - Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), - } - } -} - /// Sets this [`Handle`] as the current active [`Handle`]. /// /// [`Handle`]: Handle @@ -100,7 +89,7 @@ pub(crate) fn enter(new: Handle) -> EnterGuard { /// /// [`Handle`]: Handle pub(crate) fn try_enter(new: Handle) -> Option { - let rng_seed = new.as_inner().seed_generator.next_seed(); + let rng_seed = new.inner.seed_generator().next_seed(); let old_handle = CONTEXT.try_with(|ctx| ctx.borrow_mut().replace(new)).ok()?; let old_seed = replace_thread_rng(rng_seed); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 98b06fc071a..c7e3ce9ec37 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,12 +1,4 @@ -// When the runtime refactor is done, this should be removed. -#![cfg_attr(not(feature = "rt"), allow(dead_code))] - -use crate::runtime::driver; - -#[cfg(feature = "rt")] -use crate::util::RngSeedGenerator; - -use std::sync::Arc; +use crate::runtime::scheduler; /// Handle to the runtime. /// @@ -17,386 +9,338 @@ use std::sync::Arc; #[derive(Debug, Clone)] // When the `rt` feature is *not* enabled, this type is still defined, but not // included in the public API. -#[cfg_attr(not(feature = "rt"), allow(unreachable_pub))] pub struct Handle { - pub(super) inner: Arc, + pub(crate) inner: scheduler::Handle, } -/// All internal handles that are *not* the scheduler's spawner. -#[derive(Debug)] -pub(crate) struct HandleInner { - #[cfg(feature = "rt")] - pub(super) spawner: Spawner, - - /// Resource driver handles - #[cfg_attr(not(feature = "full"), allow(dead_code))] - pub(super) driver: driver::Handle, +use crate::runtime::context; +use crate::runtime::task::JoinHandle; +use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; - /// Blocking pool spawner - #[cfg(feature = "rt")] - pub(crate) blocking_spawner: blocking::Spawner, +use std::future::Future; +use std::marker::PhantomData; +use std::{error, fmt}; - /// Current random number generator seed - #[cfg(feature = "rt")] - pub(super) seed_generator: RngSeedGenerator, +/// Runtime context guard. +/// +/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits +/// the runtime context on drop. +/// +/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter +#[derive(Debug)] +#[must_use = "Creating and dropping a guard does nothing"] +pub struct EnterGuard<'a> { + _guard: context::EnterGuard, + _handle_lifetime: PhantomData<&'a Handle>, } -cfg_rt! { - use crate::runtime::task::JoinHandle; - use crate::runtime::{blocking, context, Spawner}; - use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; - - use std::future::Future; - use std::marker::PhantomData; - use std::{error, fmt}; +impl Handle { + /// Enters the runtime context. This allows you to construct types that must + /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. + /// It will also allow you to call methods such as [`tokio::spawn`] and [`Handle::current`] + /// without panicking. + /// + /// [`Sleep`]: struct@crate::time::Sleep + /// [`TcpStream`]: struct@crate::net::TcpStream + /// [`tokio::spawn`]: fn@crate::spawn + pub fn enter(&self) -> EnterGuard<'_> { + EnterGuard { + _guard: context::enter(self.clone()), + _handle_lifetime: PhantomData, + } + } - /// Runtime context guard. + /// Returns a `Handle` view over the currently running `Runtime`. + /// + /// # Panics + /// + /// This will panic if called outside the context of a Tokio runtime. That means that you must + /// call this on one of the threads **being run by the runtime**, or from a thread with an active + /// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example) + /// will cause a panic unless that thread has an active `EnterGuard`. + /// + /// # Examples + /// + /// This can be used to obtain the handle of the surrounding runtime from an async + /// block or function running on that runtime. /// - /// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits - /// the runtime context on drop. + /// ``` + /// # use std::thread; + /// # use tokio::runtime::Runtime; + /// # fn dox() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.spawn(async { + /// use tokio::runtime::Handle; /// - /// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter - #[derive(Debug)] - #[must_use = "Creating and dropping a guard does nothing"] - pub struct EnterGuard<'a> { - _guard: context::EnterGuard, - _handle_lifetime: PhantomData<&'a Handle>, + /// // Inside an async block or function. + /// let handle = Handle::current(); + /// handle.spawn(async { + /// println!("now running in the existing Runtime"); + /// }); + /// + /// # let handle = + /// thread::spawn(move || { + /// // Notice that the handle is created outside of this thread and then moved in + /// handle.spawn(async { /* ... */ }); + /// // This next line would cause a panic because we haven't entered the runtime + /// // and created an EnterGuard + /// // let handle2 = Handle::current(); // panic + /// // So we create a guard here with Handle::enter(); + /// let _guard = handle.enter(); + /// // Now we can call Handle::current(); + /// let handle2 = Handle::current(); + /// }); + /// # handle.join().unwrap(); + /// # }); + /// # } + /// ``` + #[track_caller] + pub fn current() -> Self { + context::current() } - impl Handle { - /// Enters the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. - /// It will also allow you to call methods such as [`tokio::spawn`] and [`Handle::current`] - /// without panicking. - /// - /// [`Sleep`]: struct@crate::time::Sleep - /// [`TcpStream`]: struct@crate::net::TcpStream - /// [`tokio::spawn`]: fn@crate::spawn - pub fn enter(&self) -> EnterGuard<'_> { - EnterGuard { - _guard: context::enter(self.clone()), - _handle_lifetime: PhantomData, - } - } - - /// Returns a `Handle` view over the currently running `Runtime`. - /// - /// # Panics - /// - /// This will panic if called outside the context of a Tokio runtime. That means that you must - /// call this on one of the threads **being run by the runtime**, or from a thread with an active - /// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example) - /// will cause a panic unless that thread has an active `EnterGuard`. - /// - /// # Examples - /// - /// This can be used to obtain the handle of the surrounding runtime from an async - /// block or function running on that runtime. - /// - /// ``` - /// # use std::thread; - /// # use tokio::runtime::Runtime; - /// # fn dox() { - /// # let rt = Runtime::new().unwrap(); - /// # rt.spawn(async { - /// use tokio::runtime::Handle; - /// - /// // Inside an async block or function. - /// let handle = Handle::current(); - /// handle.spawn(async { - /// println!("now running in the existing Runtime"); - /// }); - /// - /// # let handle = - /// thread::spawn(move || { - /// // Notice that the handle is created outside of this thread and then moved in - /// handle.spawn(async { /* ... */ }); - /// // This next line would cause a panic because we haven't entered the runtime - /// // and created an EnterGuard - /// // let handle2 = Handle::current(); // panic - /// // So we create a guard here with Handle::enter(); - /// let _guard = handle.enter(); - /// // Now we can call Handle::current(); - /// let handle2 = Handle::current(); - /// }); - /// # handle.join().unwrap(); - /// # }); - /// # } - /// ``` - #[track_caller] - pub fn current() -> Self { - context::current() - } - - /// Returns a Handle view over the currently running Runtime - /// - /// Returns an error if no Runtime has been started - /// - /// Contrary to `current`, this never panics - pub fn try_current() -> Result { - context::try_current() - } - - /// Spawns a future onto the Tokio runtime. - /// - /// This spawns the given future onto the runtime's executor, usually a - /// thread pool. The thread pool is then responsible for polling the future - /// until it completes. - /// - /// See [module level][mod] documentation for more details. - /// - /// [mod]: index.html - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// // Get a handle from this runtime - /// let handle = rt.handle(); - /// - /// // Spawn a future onto the runtime using the handle - /// handle.spawn(async { - /// println!("now running on a worker thread"); - /// }); - /// # } - /// ``` - #[track_caller] - pub fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawn_named(future, None) - } - - /// Runs the provided function on an executor dedicated to blocking. - /// operations. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// // Get a handle from this runtime - /// let handle = rt.handle(); - /// - /// // Spawn a blocking function onto the runtime using the handle - /// handle.spawn_blocking(|| { - /// println!("now running on a worker thread"); - /// }); - /// # } - #[track_caller] - pub fn spawn_blocking(&self, func: F) -> JoinHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.as_inner().blocking_spawner.spawn_blocking(self, func) - } - - pub(crate) fn as_inner(&self) -> &HandleInner { - &self.inner - } - - /// Runs a future to completion on this `Handle`'s associated `Runtime`. - /// - /// This runs the given future on the current thread, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// When this is used on a `current_thread` runtime, only the - /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the - /// `Handle::block_on` method cannot drive them. This means that, when using - /// this method on a current_thread runtime, anything that relies on IO or - /// timers will not work unless there is another thread currently calling - /// [`Runtime::block_on`] on the same runtime. - /// - /// # If the runtime has been shut down - /// - /// If the `Handle`'s associated `Runtime` has been shut down (through - /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by - /// dropping it) and `Handle::block_on` is used it might return an error or - /// panic. Specifically IO resources will return an error and timers will - /// panic. Runtime independent futures will run as normal. - /// - /// # Panics - /// - /// This function panics if the provided future panics, if called within an - /// asynchronous execution context, or if a timer future is executed on a - /// runtime that has been shut down. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// - /// // Get a handle from this runtime - /// let handle = rt.handle(); - /// - /// // Execute the future, blocking the current thread until completion - /// handle.block_on(async { - /// println!("hello"); - /// }); - /// ``` - /// - /// Or using `Handle::current`: - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main () { - /// let handle = Handle::current(); - /// std::thread::spawn(move || { - /// // Using Handle::block_on to run async code in the new thread. - /// handle.block_on(async { - /// println!("hello"); - /// }); - /// }); - /// } - /// ``` - /// - /// [`JoinError`]: struct@crate::task::JoinError - /// [`JoinHandle`]: struct@crate::task::JoinHandle - /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on - /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background - /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout - /// [`spawn_blocking`]: crate::task::spawn_blocking - /// [`tokio::fs`]: crate::fs - /// [`tokio::net`]: crate::net - /// [`tokio::time`]: crate::time - #[track_caller] - pub fn block_on(&self, future: F) -> F::Output { - #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = - crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64()); - - // Enter the **runtime** context. This configures spawning, the current I/O driver, ... - let _rt_enter = self.enter(); - - // Enter a **blocking** context. This prevents blocking from a runtime. - let mut blocking_enter = crate::runtime::enter(true); - - // Block on the future - blocking_enter - .block_on(future) - .expect("failed to park thread") - } - - #[track_caller] - pub(crate) fn spawn_named(&self, future: F, _name: Option<&str>) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let id = crate::runtime::task::Id::next(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", _name, id.as_u64()); - self.inner.spawner.spawn(future, id) - } + /// Returns a Handle view over the currently running Runtime + /// + /// Returns an error if no Runtime has been started + /// + /// Contrary to `current`, this never panics + pub fn try_current() -> Result { + context::try_current() + } - pub(crate) fn shutdown(&self) { - self.inner.spawner.shutdown(); - } + /// Spawns a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Spawn a future onto the runtime using the handle + /// handle.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + #[track_caller] + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawn_named(future, None) } - cfg_metrics! { - use crate::runtime::RuntimeMetrics; + /// Runs the provided function on an executor dedicated to blocking. + /// operations. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Spawn a blocking function onto the runtime using the handle + /// handle.spawn_blocking(|| { + /// println!("now running on a worker thread"); + /// }); + /// # } + #[track_caller] + pub fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.inner.blocking_spawner().spawn_blocking(self, func) + } - impl Handle { - /// Returns a view that lets you get information about how the runtime - /// is performing. - pub fn metrics(&self) -> RuntimeMetrics { - RuntimeMetrics::new(self.clone()) - } - } + /// Runs a future to completion on this `Handle`'s associated `Runtime`. + /// + /// This runs the given future on the current thread, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// When this is used on a `current_thread` runtime, only the + /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the + /// `Handle::block_on` method cannot drive them. This means that, when using + /// this method on a current_thread runtime, anything that relies on IO or + /// timers will not work unless there is another thread currently calling + /// [`Runtime::block_on`] on the same runtime. + /// + /// # If the runtime has been shut down + /// + /// If the `Handle`'s associated `Runtime` has been shut down (through + /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by + /// dropping it) and `Handle::block_on` is used it might return an error or + /// panic. Specifically IO resources will return an error and timers will + /// panic. Runtime independent futures will run as normal. + /// + /// # Panics + /// + /// This function panics if the provided future panics, if called within an + /// asynchronous execution context, or if a timer future is executed on a + /// runtime that has been shut down. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Execute the future, blocking the current thread until completion + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// Or using `Handle::current`: + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main () { + /// let handle = Handle::current(); + /// std::thread::spawn(move || { + /// // Using Handle::block_on to run async code in the new thread. + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// }); + /// } + /// ``` + /// + /// [`JoinError`]: struct@crate::task::JoinError + /// [`JoinHandle`]: struct@crate::task::JoinHandle + /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on + /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background + /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// [`spawn_blocking`]: crate::task::spawn_blocking + /// [`tokio::fs`]: crate::fs + /// [`tokio::net`]: crate::net + /// [`tokio::time`]: crate::time + #[track_caller] + pub fn block_on(&self, future: F) -> F::Output { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let future = + crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64()); + + // Enter the **runtime** context. This configures spawning, the current I/O driver, ... + let _rt_enter = self.enter(); + + // Enter a **blocking** context. This prevents blocking from a runtime. + let mut blocking_enter = crate::runtime::enter(true); + + // Block on the future + blocking_enter + .block_on(future) + .expect("failed to park thread") } - /// Error returned by `try_current` when no Runtime has been started - #[derive(Debug)] - pub struct TryCurrentError { - kind: TryCurrentErrorKind, + #[track_caller] + pub(crate) fn spawn_named(&self, future: F, _name: Option<&str>) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let id = crate::runtime::task::Id::next(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let future = crate::util::trace::task(future, "task", _name, id.as_u64()); + self.inner.spawn(future, id) } +} - impl TryCurrentError { - pub(crate) fn new_no_context() -> Self { - Self { - kind: TryCurrentErrorKind::NoContext, - } - } +cfg_metrics! { + use crate::runtime::RuntimeMetrics; - pub(crate) fn new_thread_local_destroyed() -> Self { - Self { - kind: TryCurrentErrorKind::ThreadLocalDestroyed, - } + impl Handle { + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn metrics(&self) -> RuntimeMetrics { + RuntimeMetrics::new(self.clone()) } + } +} - /// Returns true if the call failed because there is currently no runtime in - /// the Tokio context. - pub fn is_missing_context(&self) -> bool { - matches!(self.kind, TryCurrentErrorKind::NoContext) - } +/// Error returned by `try_current` when no Runtime has been started +#[derive(Debug)] +pub struct TryCurrentError { + kind: TryCurrentErrorKind, +} - /// Returns true if the call failed because the Tokio context thread-local - /// had been destroyed. This can usually only happen if in the destructor of - /// other thread-locals. - pub fn is_thread_local_destroyed(&self) -> bool { - matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed) +impl TryCurrentError { + pub(crate) fn new_no_context() -> Self { + Self { + kind: TryCurrentErrorKind::NoContext, } } - enum TryCurrentErrorKind { - NoContext, - ThreadLocalDestroyed, + pub(crate) fn new_thread_local_destroyed() -> Self { + Self { + kind: TryCurrentErrorKind::ThreadLocalDestroyed, + } } - impl fmt::Debug for TryCurrentErrorKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use TryCurrentErrorKind::*; - match self { - NoContext => f.write_str("NoContext"), - ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"), - } - } + /// Returns true if the call failed because there is currently no runtime in + /// the Tokio context. + pub fn is_missing_context(&self) -> bool { + matches!(self.kind, TryCurrentErrorKind::NoContext) } - impl fmt::Display for TryCurrentError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use TryCurrentErrorKind::*; - match self.kind { - NoContext => f.write_str(CONTEXT_MISSING_ERROR), - ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR), - } - } + /// Returns true if the call failed because the Tokio context thread-local + /// had been destroyed. This can usually only happen if in the destructor of + /// other thread-locals. + pub fn is_thread_local_destroyed(&self) -> bool { + matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed) } +} - impl error::Error for TryCurrentError {} +enum TryCurrentErrorKind { + NoContext, + ThreadLocalDestroyed, } -cfg_not_rt! { - impl Handle { - pub(crate) fn current() -> Handle { - panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) +impl fmt::Debug for TryCurrentErrorKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use TryCurrentErrorKind::*; + match self { + NoContext => f.write_str("NoContext"), + ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"), } } } -cfg_time! { - impl Handle { - #[track_caller] - pub(crate) fn as_time_handle(&self) -> &crate::runtime::time::Handle { - self.inner.driver.time.as_ref() - .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") +impl fmt::Display for TryCurrentError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use TryCurrentErrorKind::*; + match self.kind { + NoContext => f.write_str(CONTEXT_MISSING_ERROR), + ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR), } } } + +impl error::Error for TryCurrentError {} diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index c3ebff369ab..6ad5735b0f7 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -39,7 +39,7 @@ impl RuntimeMetrics { /// } /// ``` pub fn num_workers(&self) -> usize { - self.handle.inner.spawner.num_workers() + self.handle.inner.num_workers() } /// Returns the number of tasks scheduled from **outside** of the runtime. @@ -69,7 +69,6 @@ impl RuntimeMetrics { pub fn remote_schedule_count(&self) -> u64 { self.handle .inner - .spawner .scheduler_metrics() .remote_schedule_count .load(Relaxed) @@ -113,7 +112,6 @@ impl RuntimeMetrics { pub fn worker_park_count(&self, worker: usize) -> u64 { self.handle .inner - .spawner .worker_metrics(worker) .park_count .load(Relaxed) @@ -157,7 +155,6 @@ impl RuntimeMetrics { pub fn worker_noop_count(&self, worker: usize) -> u64 { self.handle .inner - .spawner .worker_metrics(worker) .noop_count .load(Relaxed) @@ -203,7 +200,6 @@ impl RuntimeMetrics { pub fn worker_steal_count(&self, worker: usize) -> u64 { self.handle .inner - .spawner .worker_metrics(worker) .steal_count .load(Relaxed) @@ -245,7 +241,6 @@ impl RuntimeMetrics { pub fn worker_poll_count(&self, worker: usize) -> u64 { self.handle .inner - .spawner .worker_metrics(worker) .poll_count .load(Relaxed) @@ -291,7 +286,6 @@ impl RuntimeMetrics { let nanos = self .handle .inner - .spawner .worker_metrics(worker) .busy_duration_total .load(Relaxed); @@ -338,7 +332,6 @@ impl RuntimeMetrics { pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { self.handle .inner - .spawner .worker_metrics(worker) .local_schedule_count .load(Relaxed) @@ -385,7 +378,6 @@ impl RuntimeMetrics { pub fn worker_overflow_count(&self, worker: usize) -> u64 { self.handle .inner - .spawner .worker_metrics(worker) .overflow_count .load(Relaxed) @@ -414,7 +406,7 @@ impl RuntimeMetrics { /// } /// ``` pub fn injection_queue_depth(&self) -> usize { - self.handle.inner.spawner.injection_queue_depth() + self.handle.inner.injection_queue_depth() } /// Returns the number of tasks currently scheduled in the given worker's @@ -452,7 +444,7 @@ impl RuntimeMetrics { /// } /// ``` pub fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.handle.inner.spawner.worker_local_queue_depth(worker) + self.handle.inner.worker_local_queue_depth(worker) } } @@ -534,8 +526,8 @@ cfg_net! { // TODO: Investigate if this should return 0, most of our metrics always increase // thus this breaks that guarantee. self.handle - .as_inner() - .driver + .inner + .driver() .io .as_ref() .map(|h| f(h.metrics())) diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index d6863b5044f..1ca9b944152 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -178,7 +178,7 @@ mod tests; mod driver; -pub(crate) mod handle; +pub(crate) mod scheduler; cfg_io_driver_impl! { pub(crate) mod io; @@ -193,7 +193,6 @@ cfg_rt! { pub(crate) mod task; - pub(crate) mod scheduler; use scheduler::CurrentThread; mod config; @@ -223,11 +222,8 @@ cfg_rt! { use self::enter::enter; + mod handle; pub use handle::{EnterGuard, Handle, TryCurrentError}; - pub(crate) use handle::HandleInner; - - mod spawner; - use self::spawner::Spawner; cfg_metrics! { mod metrics; @@ -572,7 +568,7 @@ cfg_rt! { /// ``` pub fn shutdown_timeout(mut self, duration: Duration) { // Wakeup and shutdown all the worker threads - self.handle.shutdown(); + self.handle.inner.shutdown(); self.blocking_pool.shutdown(Some(duration)); } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index c57fddd0900..431e2f1f083 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -2,13 +2,13 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::{Arc, Mutex}; use crate::runtime::context::EnterGuard; -use crate::runtime::driver::{Driver, Unpark}; +use crate::runtime::driver::{self, Driver, Unpark}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; -use crate::runtime::Config; +use crate::runtime::{blocking, Config}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; -use crate::util::{waker_ref, Wake, WakerRef}; +use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; @@ -37,6 +37,22 @@ pub(crate) struct CurrentThread { context_guard: Option, } +/// Handle to the current thread scheduler +#[derive(Debug)] +pub(crate) struct Handle { + /// Task spawner + pub(crate) spawner: Spawner, + + /// Resource driver handles + pub(crate) driver: driver::Handle, + + /// Blocking pool spawner + pub(crate) blocking_spawner: blocking::Spawner, + + /// Current random number generator seed + pub(crate) seed_generator: RngSeedGenerator, +} + /// Data required for executing the scheduler. The struct is passed around to /// a function that will perform the scheduling work and acts as a capability token. struct Core { @@ -391,15 +407,18 @@ impl Spawner { } cfg_metrics! { - impl Spawner { + impl Handle { pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { - &self.shared.scheduler_metrics + &self.spawner.shared.scheduler_metrics } pub(crate) fn injection_queue_depth(&self) -> usize { // TODO: avoid having to lock. The multi-threaded injection queue // could probably be used here. - self.shared.queue.lock() + self.spawner + .shared + .queue + .lock() .as_ref() .map(|queue| queue.len()) .unwrap_or(0) @@ -407,7 +426,7 @@ cfg_metrics! { pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { assert_eq!(0, worker); - &self.shared.worker_metrics + &self.spawner.shared.worker_metrics } } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 6ca18614c49..fc277782dbb 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -1,7 +1,162 @@ -pub(crate) mod current_thread; -pub(crate) use current_thread::CurrentThread; +cfg_rt! { + pub(crate) mod current_thread; + pub(crate) use current_thread::CurrentThread; +} cfg_rt_multi_thread! { pub(crate) mod multi_thread; pub(crate) use multi_thread::MultiThread; } + +use crate::runtime::driver; + +#[derive(Debug, Clone)] +pub(crate) enum Handle { + #[cfg(feature = "rt")] + CurrentThread(Arc), + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + MultiThread(Arc), + + // TODO: This is to avoid triggering "dead code" warnings many other places + // in the codebase. Remove this during a later cleanup + #[cfg(not(feature = "rt"))] + #[allow(dead_code)] + Disabled, +} + +impl Handle { + #[cfg_attr(not(feature = "full"), allow(dead_code))] + pub(crate) fn driver(&self) -> &driver::Handle { + match *self { + #[cfg(feature = "rt")] + Handle::CurrentThread(ref h) => &h.driver, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(ref h) => &h.driver, + + #[cfg(not(feature = "rt"))] + Handle::Disabled => unreachable!(), + } + } + + cfg_time! { + #[track_caller] + pub(crate) fn time(&self) -> &crate::runtime::time::Handle { + self.driver() + .time + .as_ref() + .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") + } + + cfg_test_util! { + pub(crate) fn clock(&self) -> &driver::Clock { + &self.driver().clock + } + } + } +} + +cfg_rt! { + use crate::future::Future; + use crate::runtime::{blocking, task::Id}; + use crate::task::JoinHandle; + use crate::util::RngSeedGenerator; + + use std::sync::Arc; + + impl Handle { + pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { + match self { + Handle::CurrentThread(h) => &h.blocking_spawner, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(h) => &h.blocking_spawner, + } + } + + pub(crate) fn spawn(&self, future: F, id: Id) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Handle::CurrentThread(h) => h.spawner.spawn(future, id), + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(h) => h.spawner.spawn(future, id), + } + } + + pub(crate) fn shutdown(&self) { + match *self { + Handle::CurrentThread(_) => {}, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(ref h) => h.spawner.shutdown(), + } + } + + pub(crate) fn seed_generator(&self) -> &RngSeedGenerator { + match self { + Handle::CurrentThread(h) => &h.seed_generator, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(h) => &h.seed_generator, + } + } + + #[cfg(unix)] + cfg_signal_internal! { + pub(crate) fn signal(&self) -> &driver::SignalHandle { + &self.driver().signal + } + } + } + + cfg_metrics! { + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + + impl Handle { + pub(crate) fn num_workers(&self) -> usize { + match self { + Handle::CurrentThread(_) => 1, + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_workers(), + } + } + + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + match self { + Handle::CurrentThread(handle) => handle.scheduler_metrics(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.scheduler_metrics(), + } + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + match self { + Handle::CurrentThread(handle) => handle.worker_metrics(worker), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.worker_metrics(worker), + } + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.injection_queue_depth(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.injection_queue_depth(), + } + } + + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + match self { + Handle::CurrentThread(handle) => handle.worker_metrics(worker).queue_depth(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker), + } + } + } + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs new file mode 100644 index 00000000000..1b02afd4848 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -0,0 +1,45 @@ +use crate::runtime::scheduler::multi_thread::Spawner; +use crate::runtime::{blocking, driver}; +use crate::util::RngSeedGenerator; + +/// Handle to the multi thread scheduler +#[derive(Debug)] +pub(crate) struct Handle { + /// Task spawner + pub(crate) spawner: Spawner, + + /// Resource driver handles + pub(crate) driver: driver::Handle, + + /// Blocking pool spawner + pub(crate) blocking_spawner: blocking::Spawner, + + /// Current random number generator seed + pub(crate) seed_generator: RngSeedGenerator, +} + +cfg_metrics! { + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + + impl Handle { + pub(crate) fn num_workers(&self) -> usize { + self.spawner.shared.worker_metrics.len() + } + + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + &self.spawner.shared.scheduler_metrics + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + &self.spawner.shared.worker_metrics[worker] + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + self.spawner.shared.injection_queue_depth() + } + + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.spawner.shared.worker_local_queue_depth(worker) + } + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 4946698fc2c..1c71f1d2913 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -1,5 +1,8 @@ //! Multi-threaded runtime +mod handle; +pub(crate) use handle::Handle; + mod idle; use self::idle::Idle; @@ -104,32 +107,6 @@ impl Spawner { } } -cfg_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; - - impl Spawner { - pub(crate) fn num_workers(&self) -> usize { - self.shared.worker_metrics.len() - } - - pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { - &self.shared.scheduler_metrics - } - - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - &self.shared.worker_metrics[worker] - } - - pub(crate) fn injection_queue_depth(&self) -> usize { - self.shared.injection_queue_depth() - } - - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.shared.worker_local_queue_depth(worker) - } - } -} - impl fmt::Debug for Spawner { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Spawner").finish() diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs deleted file mode 100644 index e64b0cb2113..00000000000 --- a/tokio/src/runtime/spawner.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::future::Future; -use crate::runtime::scheduler::current_thread; -use crate::runtime::task::Id; -use crate::task::JoinHandle; - -cfg_rt_multi_thread! { - use crate::runtime::scheduler::multi_thread; -} - -#[derive(Debug, Clone)] -pub(crate) enum Spawner { - CurrentThread(current_thread::Spawner), - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - MultiThread(multi_thread::Spawner), -} - -impl Spawner { - pub(crate) fn shutdown(&self) { - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - { - if let Spawner::MultiThread(spawner) = self { - spawner.shutdown(); - } - } - } - - pub(crate) fn spawn(&self, future: F, id: Id) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - match self { - Spawner::CurrentThread(spawner) => spawner.spawn(future, id), - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Spawner::MultiThread(spawner) => spawner.spawn(future, id), - } - } -} - -cfg_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; - - impl Spawner { - pub(crate) fn num_workers(&self) -> usize { - match self { - Spawner::CurrentThread(_) => 1, - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Spawner::MultiThread(spawner) => spawner.num_workers(), - } - } - - pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { - match self { - Spawner::CurrentThread(spawner) => spawner.scheduler_metrics(), - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Spawner::MultiThread(spawner) => spawner.scheduler_metrics(), - } - } - - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - match self { - Spawner::CurrentThread(spawner) => spawner.worker_metrics(worker), - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Spawner::MultiThread(spawner) => spawner.worker_metrics(worker), - } - } - - pub(crate) fn injection_queue_depth(&self) -> usize { - match self { - Spawner::CurrentThread(spawner) => spawner.injection_queue_depth(), - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Spawner::MultiThread(spawner) => spawner.injection_queue_depth(), - } - } - - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { - match self { - Spawner::CurrentThread(spawner) => spawner.worker_metrics(worker).queue_depth(), - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Spawner::MultiThread(spawner) => spawner.worker_local_queue_depth(worker), - } - } - } -} diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index f4fcd254fc7..3b8907add7d 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -58,7 +58,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicU64; use crate::loom::sync::atomic::Ordering; -use crate::runtime::handle::Handle; +use crate::runtime::scheduler; use crate::sync::AtomicWaker; use crate::time::Instant; use crate::util::linked_list; @@ -285,7 +285,7 @@ impl StateCell { pub(crate) struct TimerEntry { /// Arc reference to the runtime handle. We can only free the driver after /// deregistering everything from their respective timer wheels. - driver: Handle, + driver: scheduler::Handle, /// Shared inner structure; this is part of an intrusive linked list, and /// therefore other references can exist to it while mutable references to /// Entry exist. @@ -490,9 +490,9 @@ unsafe impl linked_list::Link for TimerShared { impl TimerEntry { #[track_caller] - pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self { + pub(crate) fn new(handle: &scheduler::Handle, deadline: Instant) -> Self { // Panic if the time driver is not enabled - let _ = handle.as_time_handle(); + let _ = handle.time(); let driver = handle.clone(); @@ -550,7 +550,7 @@ impl TimerEntry { unsafe { self.driver() - .reregister(&self.driver.inner.driver.io, tick, self.inner().into()); + .reregister(&self.driver.driver().io, tick, self.inner().into()); } } @@ -572,8 +572,7 @@ impl TimerEntry { } fn driver(&self) -> &super::Handle { - // At this point, we know the time_handle is Some. - self.driver.inner.driver.time.as_ref().unwrap() + self.driver.time() } } diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 3d62c753a06..1ba95f13de5 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -49,8 +49,8 @@ fn single_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_, - handle_.inner.driver.clock.now() + Duration::from_secs(1), + &handle_.inner, + handle_.inner.clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -62,7 +62,7 @@ fn single_timer() { thread::yield_now(); - let handle = handle.as_time_handle(); + let handle = handle.inner.time(); // This may or may not return Some (depending on how it races with the // thread). If it does return None, however, the timer should complete @@ -82,8 +82,8 @@ fn drop_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_, - handle_.inner.driver.clock.now() + Duration::from_secs(1), + &handle_.inner, + handle_.inner.clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -97,7 +97,7 @@ fn drop_timer() { thread::yield_now(); - let handle = handle.as_time_handle(); + let handle = handle.inner.time(); // advance 2s in the future. handle.process_at_time(handle.time_source().now() + 2_000_000_000); @@ -115,8 +115,8 @@ fn change_waker() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_, - handle_.inner.driver.clock.now() + Duration::from_secs(1), + &handle_.inner, + handle_.inner.clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -132,7 +132,7 @@ fn change_waker() { thread::yield_now(); - let handle = handle.as_time_handle(); + let handle = handle.inner.time(); // advance 2s handle.process_at_time(handle.time_source().now() + 2_000_000_000); @@ -151,10 +151,10 @@ fn reset_future() { let handle_ = handle.clone(); let finished_early_ = finished_early.clone(); - let start = handle.inner.driver.clock.now(); + let start = handle.inner.clock().now(); let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1)); + let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1)); pin!(entry); let _ = entry @@ -174,7 +174,7 @@ fn reset_future() { thread::yield_now(); - let handle = handle.as_time_handle(); + let handle = handle.inner.time(); // This may or may not return a wakeup time. handle.process_at_time( @@ -216,8 +216,8 @@ fn poll_process_levels() { for i in 0..normal_or_miri(1024, 64) { let mut entry = Box::pin(TimerEntry::new( - &handle, - handle.inner.driver.clock.now() + Duration::from_millis(i), + &handle.inner, + handle.inner.clock().now() + Duration::from_millis(i), )); let _ = entry @@ -228,7 +228,7 @@ fn poll_process_levels() { } for t in 1..normal_or_miri(1024, 64) { - handle.as_time_handle().process_at_time(t as u64); + handle.inner.time().process_at_time(t as u64); for (deadline, future) in entries.iter_mut().enumerate() { let mut context = Context::from_waker(noop_waker_ref()); @@ -250,12 +250,12 @@ fn poll_process_levels_targeted() { let handle = rt.handle(); let e1 = TimerEntry::new( - &handle, - handle.inner.driver.clock.now() + Duration::from_millis(193), + &handle.inner, + handle.inner.clock().now() + Duration::from_millis(193), ); pin!(e1); - let handle = handle.as_time_handle(); + let handle = handle.inner.time(); handle.process_at_time(62); assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index e18636d96ae..007d8a4474a 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -187,7 +187,7 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, spawn_result) = handle.as_inner().blocking_spawner.spawn_blocking_inner( + let (join_handle, spawn_result) = handle.inner.blocking_spawner().spawn_blocking_inner( function, Mandatory::NonMandatory, self.name, diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 5a60f9d66e6..3fdd0357e80 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,4 +1,5 @@ -use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR}; +use crate::runtime::Handle; +use crate::task::JoinHandle; use std::future::Future; @@ -142,10 +143,10 @@ cfg_rt! { T: Future + Send + 'static, T::Output: Send + 'static, { - use crate::runtime::{task, context}; + use crate::runtime::task; let id = task::Id::next(); - let spawn_handle = context::spawn_handle().expect(CONTEXT_MISSING_ERROR); let task = crate::util::trace::task(future, "task", name, id.as_u64()); - spawn_handle.spawn(task, id) + let handle = Handle::current(); + handle.inner.spawn(task, id) } } diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 11f82de874c..b2e77515982 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -1,4 +1,3 @@ -use crate::runtime::handle::Handle; #[cfg(all(tokio_unstable, feature = "tracing"))] use crate::runtime::time::TimeSource; use crate::runtime::time::TimerEntry; @@ -252,63 +251,77 @@ cfg_not_trace! { } impl Sleep { - #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] - #[track_caller] - pub(crate) fn new_timeout( - deadline: Instant, - location: Option<&'static Location<'static>>, - ) -> Sleep { - let handle = Handle::current(); - let entry = TimerEntry::new(&handle, deadline); - - #[cfg(all(tokio_unstable, feature = "tracing"))] - let inner = { - let handle = &handle.as_time_handle(); - let time_source = handle.time_source().clone(); - let deadline_tick = time_source.deadline_to_tick(deadline); - let duration = deadline_tick.saturating_sub(time_source.now()); - - let location = location.expect("should have location if tracing"); - let resource_span = tracing::trace_span!( - "runtime.resource", - concrete_type = "Sleep", - kind = "timer", - loc.file = location.file(), - loc.line = location.line(), - loc.col = location.column(), - ); - - let async_op_span = resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - duration = duration, - duration.unit = "ms", - duration.op = "override", + cfg_rt! { + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + #[track_caller] + pub(crate) fn new_timeout( + deadline: Instant, + location: Option<&'static Location<'static>>, + ) -> Sleep { + use crate::runtime::Handle; + + let handle = Handle::current().inner; + let entry = TimerEntry::new(&handle, deadline); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = { + let handle = &handle.time(); + let time_source = handle.time_source().clone(); + let deadline_tick = time_source.deadline_to_tick(deadline); + let duration = deadline_tick.saturating_sub(time_source.now()); + + let location = location.expect("should have location if tracing"); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sleep", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), ); - tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") - }); - - let async_op_poll_span = - async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); - - let ctx = trace::AsyncOpTracingCtx { - async_op_span, - async_op_poll_span, - resource_span, + let async_op_span = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span, + }; + + Inner { + deadline, + ctx, + time_source, + } }; - Inner { - deadline, - ctx, - time_source, - } - }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = Inner { deadline }; - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let inner = Inner { deadline }; + Sleep { inner, entry } + } + } - Sleep { inner, entry } + cfg_not_rt! { + #[track_caller] + pub(crate) fn new_timeout( + _deadline: Instant, + _location: Option<&'static Location<'static>>, + ) -> Sleep { + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) + } } pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {