Skip to content

Commit

Permalink
rt: export metrics about the blocking thread pool (#5161)
Browse files Browse the repository at this point in the history
Publish the blocking thread pool metrics as thread-safe values, written
under the blocking thread pool's lock and able to be read in a lock-free
fashion by any reader.

Fixes #5156
  • Loading branch information
duarten authored Nov 4, 2022
1 parent 5b46395 commit 23fdd32
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 17 deletions.
104 changes: 87 additions & 17 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, Handle};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

pub(crate) struct BlockingPool {
Expand All @@ -23,6 +24,53 @@ pub(crate) struct Spawner {
inner: Arc<Inner>,
}

#[derive(Default)]
pub(crate) struct SpawnerMetrics {
num_threads: AtomicUsize,
num_idle_threads: AtomicUsize,
queue_depth: AtomicUsize,
}

impl SpawnerMetrics {
fn num_threads(&self) -> usize {
self.num_threads.load(Ordering::Relaxed)
}

fn num_idle_threads(&self) -> usize {
self.num_idle_threads.load(Ordering::Relaxed)
}

cfg_metrics! {
fn queue_depth(&self) -> usize {
self.queue_depth.load(Ordering::Relaxed)
}
}

fn inc_num_threads(&self) {
self.num_threads.fetch_add(1, Ordering::Relaxed);
}

fn dec_num_threads(&self) {
self.num_threads.fetch_sub(1, Ordering::Relaxed);
}

fn inc_num_idle_threads(&self) {
self.num_idle_threads.fetch_add(1, Ordering::Relaxed);
}

fn dec_num_idle_threads(&self) -> usize {
self.num_idle_threads.fetch_sub(1, Ordering::Relaxed)
}

fn inc_queue_depth(&self) {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
}

fn dec_queue_depth(&self) {
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
}
}

struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
Expand All @@ -47,12 +95,13 @@ struct Inner {

// Customizable wait timeout.
keep_alive: Duration,

// Metrics about the pool.
metrics: SpawnerMetrics,
}

struct Shared {
queue: VecDeque<Task>,
num_th: usize,
num_idle: u32,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
Expand Down Expand Up @@ -165,8 +214,6 @@ impl BlockingPool {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_th: 0,
num_idle: 0,
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
Expand All @@ -181,6 +228,7 @@ impl BlockingPool {
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
metrics: Default::default(),
}),
},
shutdown_rx,
Expand Down Expand Up @@ -350,11 +398,12 @@ impl Spawner {
}

shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();

if shared.num_idle == 0 {
if self.inner.metrics.num_idle_threads() == 0 {
// No threads are able to process the task.

if shared.num_th == self.inner.thread_cap {
if self.inner.metrics.num_threads() == self.inner.thread_cap {
// At max number of threads
} else {
assert!(shared.shutdown_tx.is_some());
Expand All @@ -365,11 +414,14 @@ impl Spawner {

match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
shared.num_th += 1;
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => {
Err(ref e)
if is_temporary_os_thread_error(e)
&& self.inner.metrics.num_threads() > 0 =>
{
// OS temporarily failed to spawn a new thread.
// The task will be picked up eventually by a currently
// busy thread.
Expand All @@ -388,7 +440,7 @@ impl Spawner {
// exactly. Thread libraries may generate spurious
// wakeups, this counter is used to keep us in a
// consistent state.
shared.num_idle -= 1;
self.inner.metrics.dec_num_idle_threads();
shared.num_notify += 1;
self.inner.condvar.notify_one();
}
Expand Down Expand Up @@ -419,6 +471,22 @@ impl Spawner {
}
}

cfg_metrics! {
impl Spawner {
pub(crate) fn num_threads(&self) -> usize {
self.inner.metrics.num_threads()
}

pub(crate) fn num_idle_threads(&self) -> usize {
self.inner.metrics.num_idle_threads()
}

pub(crate) fn queue_depth(&self) -> usize {
self.inner.metrics.queue_depth()
}
}
}

// Tells whether the error when spawning a thread is temporary.
#[inline]
fn is_temporary_os_thread_error(error: &std::io::Error) -> bool {
Expand All @@ -437,14 +505,15 @@ impl Inner {
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();

shared = self.shared.lock();
}

// IDLE
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();

while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
Expand Down Expand Up @@ -478,6 +547,7 @@ impl Inner {
if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);

task.shutdown_or_run_if_mandatory();
Expand All @@ -488,25 +558,25 @@ impl Inner {
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
}
}

// Thread exit
shared.num_th -= 1;
self.metrics.dec_num_threads();

// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
shared.num_idle = shared
.num_idle
.checked_sub(1)
.expect("num_idle underflowed on thread exit");
let prev_idle = self.metrics.dec_num_idle_threads();
if prev_idle < self.metrics.num_idle_threads() {
panic!("num_idle_threads underflowed on thread exit")
}

if shared.shutdown && shared.num_th == 0 {
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}

Expand Down
74 changes: 74 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,56 @@ impl RuntimeMetrics {
self.handle.inner.num_workers()
}

/// Returns the number of additional threads spawned by the runtime.
///
/// The number of workers is set by configuring `max_blocking_threads` on
/// `runtime::Builder`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_blocking_threads();
/// println!("Runtime has created {} threads", n);
/// }
/// ```
pub fn num_blocking_threads(&self) -> usize {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of idle threads, which hve spawned by the runtime
/// for `spawn_blocking` calls.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_idle_blocking_threads();
/// println!("Runtime has {} idle blocking thread pool threads", n);
/// }
/// ```
pub fn num_idle_blocking_threads(&self) -> usize {
self.handle.inner.num_idle_blocking_threads()
}

/// Returns the number of tasks scheduled from **outside** of the runtime.
///
/// The remote schedule count starts at zero when the runtime is created and
Expand Down Expand Up @@ -446,6 +496,30 @@ impl RuntimeMetrics {
pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.handle.inner.worker_local_queue_depth(worker)
}

/// Returns the number of tasks currently scheduled in the blocking
/// thread pool, spawned using `spawn_blocking`.
///
/// This metric returns the **current** number of tasks pending in
/// blocking thread pool. As such, the returned value may increase
/// or decrease as new tasks are scheduled and processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.blocking_queue_depth();
/// println!("{} tasks currently pending in the blocking thread pool", n);
/// }
/// ```
pub fn blocking_queue_depth(&self) -> usize {
self.handle.inner.blocking_queue_depth()
}
}

cfg_net! {
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,18 @@ cfg_metrics! {
assert_eq!(0, worker);
&self.shared.worker_metrics
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

Expand Down
24 changes: 24 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,22 @@ cfg_rt! {
}
}

pub(crate) fn num_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_blocking_threads(),
}
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_idle_blocking_threads(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
Expand Down Expand Up @@ -142,6 +158,14 @@ cfg_rt! {
Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker),
}
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.blocking_queue_depth(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.blocking_queue_depth(),
}
}
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ cfg_metrics! {
self.shared.worker_metrics.len()
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand All @@ -76,6 +84,10 @@ cfg_metrics! {
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

Expand Down
Loading

0 comments on commit 23fdd32

Please sign in to comment.