Export metrics about the blocking thread pool
duarten committed Nov 3, 2022
1 parent 74a29be commit 659b4c9
Showing 6 changed files with 184 additions and 7 deletions.
69 changes: 62 additions & 7 deletions tokio/src/runtime/blocking/pool.rs
@@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, Handle};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

pub(crate) struct BlockingPool {
@@ -23,6 +24,40 @@ pub(crate) struct Spawner {
inner: Arc<Inner>,
}

#[derive(Default)]
pub(crate) struct SpawnerMetrics {
num_threads: AtomicUsize,
queue_depth: AtomicUsize,
}

impl SpawnerMetrics {
fn num_threads(&self) -> usize {
self.num_threads.load(Ordering::Relaxed)
}

cfg_metrics! {
fn queue_depth(&self) -> usize {
self.queue_depth.load(Ordering::Relaxed)
}
}

fn inc_num_threads(&self) {
self.num_threads.fetch_add(1, Ordering::Relaxed);
}

fn dec_num_threads(&self) {
self.num_threads.fetch_sub(1, Ordering::Relaxed);
}

fn inc_queue_depth(&self) {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
}

fn dec_queue_depth(&self) {
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
}
}

struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
@@ -47,11 +82,13 @@ struct Inner {

// Customizable wait timeout.
keep_alive: Duration,

// Metrics about the pool.
metrics: SpawnerMetrics,
}

struct Shared {
queue: VecDeque<Task>,
num_th: usize,
num_idle: u32,
num_notify: u32,
shutdown: bool,
@@ -165,7 +202,6 @@ impl BlockingPool {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_th: 0,
num_idle: 0,
num_notify: 0,
shutdown: false,
@@ -181,6 +217,7 @@ impl BlockingPool {
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
metrics: Default::default(),
}),
},
shutdown_rx,
@@ -350,11 +387,12 @@ impl Spawner {
}

shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();

if shared.num_idle == 0 {
// No threads are able to process the task.

if shared.num_th == self.inner.thread_cap {
if self.inner.metrics.num_threads() == self.inner.thread_cap {
// At max number of threads
} else {
assert!(shared.shutdown_tx.is_some());
@@ -365,11 +403,14 @@

match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
shared.num_th += 1;
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => {
Err(ref e)
if is_temporary_os_thread_error(e)
&& self.inner.metrics.num_threads() > 0 =>
{
// OS temporarily failed to spawn a new thread.
// The task will be picked up eventually by a currently
// busy thread.
@@ -419,6 +460,18 @@ impl Spawner {
}
}

cfg_metrics! {
impl Spawner {
pub(crate) fn num_threads(&self) -> usize {
self.inner.metrics.num_threads()
}

pub(crate) fn queue_depth(&self) -> usize {
self.inner.metrics.queue_depth()
}
}
}

// Tells whether the error when spawning a thread is temporary.
#[inline]
fn is_temporary_os_thread_error(error: &std::io::Error) -> bool {
@@ -437,6 +490,7 @@ impl Inner {
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();

@@ -478,6 +532,7 @@ impl Inner {
if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);

task.shutdown_or_run_if_mandatory();
@@ -496,7 +551,7 @@
}

// Thread exit
shared.num_th -= 1;
self.metrics.dec_num_threads();

// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
@@ -506,7 +561,7 @@
.checked_sub(1)
.expect("num_idle underflowed on thread exit");

if shared.shutdown && shared.num_th == 0 {
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}

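Note on the diff above: the `SpawnerMetrics` counters use `Ordering::Relaxed` throughout. Each counter is a standalone gauge that readers only sample for reporting, so no ordering with other memory operations (such as the queue mutex) is required. A minimal standalone sketch of the same pattern (the `Gauge` name is illustrative, not part of this commit):

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative stand-in for a single SpawnerMetrics counter.
#[derive(Default)]
struct Gauge(AtomicUsize);

impl Gauge {
    fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    fn dec(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }

    fn get(&self) -> usize {
        // Relaxed is enough: callers want a recent value, not
        // synchronization with the pool's internal state.
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let queue_depth = Gauge::default();
    queue_depth.inc();
    queue_depth.inc();
    queue_depth.dec();
    assert_eq!(queue_depth.get(), 1);
}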
50 changes: 50 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
@@ -42,6 +42,32 @@ impl RuntimeMetrics {
self.handle.inner.num_workers()
}

/// Returns the number of additional threads spawned by the runtime.
///
/// The maximum number of such blocking threads is set by configuring
/// `max_blocking_threads` on `runtime::Builder`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_blocking_threads();
/// println!("Runtime has created {} threads", n);
/// }
/// ```
pub fn num_blocking_threads(&self) -> usize {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of tasks scheduled from **outside** of the runtime.
///
/// The remote schedule count starts at zero when the runtime is created and
@@ -446,6 +472,30 @@ impl RuntimeMetrics {
pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.handle.inner.worker_local_queue_depth(worker)
}

/// Returns the number of tasks currently scheduled in the blocking
/// thread pool, spawned using `spawn_blocking`.
///
/// This metric returns the **current** number of tasks pending in
/// the blocking thread pool. As such, the returned value may increase
/// or decrease as new tasks are scheduled and processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.blocking_queue_depth();
/// println!("{} tasks currently pending in the blocking thread pool", n);
/// }
/// ```
pub fn blocking_queue_depth(&self) -> usize {
self.handle.inner.blocking_queue_depth()
}
}

cfg_net! {
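Together, the two new accessors let an application sample blocking-pool pressure while it runs. A usage sketch, assuming a build compiled with the `tokio_unstable` cfg (which gates `RuntimeMetrics`, as in the doc examples above) and the `full` feature set:

use std::time::Duration;
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    // Give the blocking pool something to do so the numbers are non-trivial.
    let task = tokio::task::spawn_blocking(|| {
        std::thread::sleep(Duration::from_millis(50));
    });

    let metrics = Handle::current().metrics();
    println!(
        "blocking threads spawned: {}, tasks waiting in the blocking pool: {}",
        metrics.num_blocking_threads(),
        metrics.blocking_queue_depth(),
    );

    let _ = task.await;
}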
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
@@ -424,6 +424,14 @@ cfg_metrics! {
assert_eq!(0, worker);
&self.shared.worker_metrics
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

16 changes: 16 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
@@ -111,6 +111,14 @@ cfg_rt! {
}
}

pub(crate) fn num_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_blocking_threads(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
@@ -142,6 +150,14 @@
Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker),
}
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.blocking_queue_depth(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.blocking_queue_depth(),
}
}
}
}
}
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
@@ -61,6 +61,10 @@ cfg_metrics! {
self.shared.worker_metrics.len()
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
@@ -76,6 +80,10 @@
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

40 changes: 40 additions & 0 deletions tokio/tests/rt_metrics.rs
@@ -1,6 +1,8 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))]

use std::sync::{Arc, Mutex};

use tokio::runtime::Runtime;
use tokio::time::{self, Duration};

@@ -13,6 +15,44 @@ fn num_workers() {
assert_eq!(2, rt.metrics().num_workers());
}

#[test]
fn num_blocking_threads() {
let rt = current_thread();
assert_eq!(0, rt.metrics().num_blocking_threads());
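// The pool spawns blocking threads lazily: the first spawn_blocking
// call below creates the first one.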
let _ = rt.block_on(rt.spawn_blocking(move || {}));
assert_eq!(1, rt.metrics().num_blocking_threads());
}

#[test]
fn blocking_queue_depth() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(1)
.build()
.unwrap();

assert_eq!(0, rt.metrics().blocking_queue_depth());

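// Hold the lock so that the pool's only blocking thread parks inside
// the first task, keeping later tasks queued.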
let ready = Arc::new(Mutex::new(()));
let guard = ready.lock().unwrap();

let ready_cloned = ready.clone();
let wait_until_ready = move || {
let _ = ready_cloned.lock().unwrap();
};

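// With max_blocking_threads(1), the first task occupies the single
// thread and the second must wait in the queue.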
let h1 = rt.spawn_blocking(wait_until_ready.clone());
let h2 = rt.spawn_blocking(wait_until_ready);
assert!(rt.metrics().blocking_queue_depth() > 0);

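// Release the lock so both tasks can run to completion.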
drop(guard);

let _ = rt.block_on(h1);
let _ = rt.block_on(h2);

assert_eq!(0, rt.metrics().blocking_queue_depth());
}

#[test]
fn remote_schedule_count() {
use std::thread;
