diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index dd166109469..f3f206ce1f2 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, Handle}; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::io; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; pub(crate) struct BlockingPool { @@ -23,6 +24,40 @@ pub(crate) struct Spawner { inner: Arc, } +#[derive(Default)] +pub(crate) struct SpawnerMetrics { + num_threads: AtomicUsize, + queue_depth: AtomicUsize, +} + +impl SpawnerMetrics { + fn num_threads(&self) -> usize { + self.num_threads.load(Ordering::Relaxed) + } + + cfg_metrics! { + fn queue_depth(&self) -> usize { + self.queue_depth.load(Ordering::Relaxed) + } + } + + fn inc_num_threads(&self) { + self.num_threads.fetch_add(1, Ordering::Relaxed); + } + + fn dec_num_threads(&self) { + self.num_threads.fetch_sub(1, Ordering::Relaxed); + } + + fn inc_queue_depth(&self) { + self.queue_depth.fetch_add(1, Ordering::Relaxed); + } + + fn dec_queue_depth(&self) { + self.queue_depth.fetch_sub(1, Ordering::Relaxed); + } +} + struct Inner { /// State shared between worker threads. shared: Mutex, @@ -47,11 +82,13 @@ struct Inner { // Customizable wait timeout. keep_alive: Duration, + + // Metrics about the pool. + metrics: SpawnerMetrics, } struct Shared { queue: VecDeque, - num_th: usize, num_idle: u32, num_notify: u32, shutdown: bool, @@ -165,7 +202,6 @@ impl BlockingPool { inner: Arc::new(Inner { shared: Mutex::new(Shared { queue: VecDeque::new(), - num_th: 0, num_idle: 0, num_notify: 0, shutdown: false, @@ -181,6 +217,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, + metrics: Default::default(), }), }, shutdown_rx, @@ -350,11 +387,12 @@ impl Spawner { } shared.queue.push_back(task); + self.inner.metrics.inc_queue_depth(); if shared.num_idle == 0 { // No threads are able to process the task. - if shared.num_th == self.inner.thread_cap { + if self.inner.metrics.num_threads() == self.inner.thread_cap { // At max number of threads } else { assert!(shared.shutdown_tx.is_some()); @@ -365,11 +403,14 @@ impl Spawner { match self.spawn_thread(shutdown_tx, rt, id) { Ok(handle) => { - shared.num_th += 1; + self.inner.metrics.inc_num_threads(); shared.worker_thread_index += 1; shared.worker_threads.insert(id, handle); } - Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => { + Err(ref e) + if is_temporary_os_thread_error(e) + && self.inner.metrics.num_threads() > 0 => + { // OS temporarily failed to spawn a new thread. // The task will be picked up eventually by a currently // busy thread. @@ -419,6 +460,18 @@ impl Spawner { } } +cfg_metrics! { + impl Spawner { + pub(crate) fn num_threads(&self) -> usize { + self.inner.metrics.num_threads() + } + + pub(crate) fn queue_depth(&self) -> usize { + self.inner.metrics.queue_depth() + } + } +} + // Tells whether the error when spawning a thread is temporary. #[inline] fn is_temporary_os_thread_error(error: &std::io::Error) -> bool { @@ -437,6 +490,7 @@ impl Inner { 'main: loop { // BUSY while let Some(task) = shared.queue.pop_front() { + self.metrics.dec_queue_depth(); drop(shared); task.run(); @@ -478,6 +532,7 @@ impl Inner { if shared.shutdown { // Drain the queue while let Some(task) = shared.queue.pop_front() { + self.metrics.dec_queue_depth(); drop(shared); task.shutdown_or_run_if_mandatory(); @@ -496,7 +551,7 @@ impl Inner { } // Thread exit - shared.num_th -= 1; + self.metrics.dec_num_threads(); // num_idle should now be tracked exactly, panic // with a descriptive message if it is not the @@ -506,7 +561,7 @@ impl Inner { .checked_sub(1) .expect("num_idle underflowed on thread exit"); - if shared.shutdown && shared.num_th == 0 { + if shared.shutdown && self.metrics.num_threads() == 0 { self.condvar.notify_one(); } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 49c926302f5..cda162bb0c0 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -42,6 +42,32 @@ impl RuntimeMetrics { self.handle.inner.num_workers() } + /// Returns the number of additional threads spawned by the runtime. + /// + /// The number of workers is set by configuring `max_blocking_threads` on + /// `runtime::Builder`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let _ = tokio::task::spawn_blocking(move || { + /// // Stand-in for compute-heavy work or using synchronous APIs + /// 1 + 1 + /// }).await; + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_blocking_threads(); + /// println!("Runtime has created {} threads", n); + /// } + /// ``` + pub fn num_blocking_threads(&self) -> usize { + self.handle.inner.num_blocking_threads() + } + /// Returns the number of tasks scheduled from **outside** of the runtime. /// /// The remote schedule count starts at zero when the runtime is created and @@ -446,6 +472,30 @@ impl RuntimeMetrics { pub fn worker_local_queue_depth(&self, worker: usize) -> usize { self.handle.inner.worker_local_queue_depth(worker) } + + /// Returns the number of tasks currently scheduled in the blocking + /// thread pool, spawned using `spawn_blocking`. + /// + /// This metric returns the **current** number of tasks pending in + /// blocking thread pool. As such, the returned value may increase + /// or decrease as new tasks are scheduled and processed. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.blocking_queue_depth(); + /// println!("{} tasks currently pending in the blocking thread pool", n); + /// } + /// ``` + pub fn blocking_queue_depth(&self) -> usize { + self.handle.inner.blocking_queue_depth() + } } cfg_net! { diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 778f93a84c5..f466a83e9e4 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -424,6 +424,14 @@ cfg_metrics! { assert_eq!(0, worker); &self.shared.worker_metrics } + + pub(crate) fn num_blocking_threads(&self) -> usize { + self.blocking_spawner.num_threads() + } + + pub(crate) fn blocking_queue_depth(&self) -> usize { + self.blocking_spawner.queue_depth() + } } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 18ac474fa75..eabf5a77248 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -111,6 +111,14 @@ cfg_rt! { } } + pub(crate) fn num_blocking_threads(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.num_blocking_threads(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_blocking_threads(), + } + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { match self { Handle::CurrentThread(handle) => handle.scheduler_metrics(), @@ -142,6 +150,14 @@ cfg_rt! { Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker), } } + + pub(crate) fn blocking_queue_depth(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.blocking_queue_depth(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.blocking_queue_depth(), + } + } } } } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 884f400bf00..c0815231bb2 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -61,6 +61,10 @@ cfg_metrics! { self.shared.worker_metrics.len() } + pub(crate) fn num_blocking_threads(&self) -> usize { + self.blocking_spawner.num_threads() + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics } @@ -76,6 +80,10 @@ cfg_metrics! { pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { self.shared.worker_local_queue_depth(worker) } + + pub(crate) fn blocking_queue_depth(&self) -> usize { + self.blocking_spawner.queue_depth() + } } } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index cffc117bce2..47dea08ea4c 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -1,6 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))] +use std::sync::{Arc, Mutex}; + use tokio::runtime::Runtime; use tokio::time::{self, Duration}; @@ -13,6 +15,44 @@ fn num_workers() { assert_eq!(2, rt.metrics().num_workers()); } +#[test] +fn num_blocking_threads() { + let rt = current_thread(); + assert_eq!(0, rt.metrics().num_blocking_threads()); + let _ = rt.block_on(rt.spawn_blocking(move || {})); + assert_eq!(1, rt.metrics().num_blocking_threads()); +} + +#[test] +fn blocking_queue_depth() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(1) + .build() + .unwrap(); + + assert_eq!(0, rt.metrics().blocking_queue_depth()); + + let ready = Arc::new(Mutex::new(())); + let guard = ready.lock().unwrap(); + + let ready_cloned = ready.clone(); + let wait_until_ready = move || { + let _unused = ready_cloned.lock().unwrap(); + }; + + let h1 = rt.spawn_blocking(wait_until_ready.clone()); + let h2 = rt.spawn_blocking(wait_until_ready); + assert!(rt.metrics().blocking_queue_depth() > 0); + + drop(guard); + + let _ = rt.block_on(h1); + let _ = rt.block_on(h2); + + assert_eq!(0, rt.metrics().blocking_queue_depth()); +} + #[test] fn remote_schedule_count() { use std::thread;