Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: export metrics about the blocking thread pool #5161

Merged
merged 3 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 87 additions & 17 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, Handle};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

pub(crate) struct BlockingPool {
Expand All @@ -23,6 +24,53 @@ pub(crate) struct Spawner {
inner: Arc<Inner>,
}

#[derive(Default)]
pub(crate) struct SpawnerMetrics {
num_threads: AtomicUsize,
num_idle_threads: AtomicUsize,
queue_depth: AtomicUsize,
}

impl SpawnerMetrics {
fn num_threads(&self) -> usize {
self.num_threads.load(Ordering::Relaxed)
}

fn num_idle_threads(&self) -> usize {
self.num_idle_threads.load(Ordering::Relaxed)
}

cfg_metrics! {
fn queue_depth(&self) -> usize {
self.queue_depth.load(Ordering::Relaxed)
}
}

fn inc_num_threads(&self) {
self.num_threads.fetch_add(1, Ordering::Relaxed);
}

fn dec_num_threads(&self) {
self.num_threads.fetch_sub(1, Ordering::Relaxed);
}

fn inc_num_idle_threads(&self) {
self.num_idle_threads.fetch_add(1, Ordering::Relaxed);
}

fn dec_num_idle_threads(&self) -> usize {
self.num_idle_threads.fetch_sub(1, Ordering::Relaxed)
}

fn inc_queue_depth(&self) {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
}

fn dec_queue_depth(&self) {
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
}
}

struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
Expand All @@ -47,12 +95,13 @@ struct Inner {

// Customizable wait timeout.
keep_alive: Duration,

// Metrics about the pool.
metrics: SpawnerMetrics,
}

struct Shared {
queue: VecDeque<Task>,
num_th: usize,
num_idle: u32,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
Expand Down Expand Up @@ -165,8 +214,6 @@ impl BlockingPool {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_th: 0,
num_idle: 0,
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
Expand All @@ -181,6 +228,7 @@ impl BlockingPool {
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
metrics: Default::default(),
}),
},
shutdown_rx,
Expand Down Expand Up @@ -350,11 +398,12 @@ impl Spawner {
}

shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();

if shared.num_idle == 0 {
if self.inner.metrics.num_idle_threads() == 0 {
// No threads are able to process the task.

if shared.num_th == self.inner.thread_cap {
if self.inner.metrics.num_threads() == self.inner.thread_cap {
// At max number of threads
} else {
assert!(shared.shutdown_tx.is_some());
Expand All @@ -365,11 +414,14 @@ impl Spawner {

match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
shared.num_th += 1;
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => {
Err(ref e)
if is_temporary_os_thread_error(e)
&& self.inner.metrics.num_threads() > 0 =>
{
// OS temporarily failed to spawn a new thread.
// The task will be picked up eventually by a currently
// busy thread.
Expand All @@ -388,7 +440,7 @@ impl Spawner {
// exactly. Thread libraries may generate spurious
// wakeups, this counter is used to keep us in a
// consistent state.
shared.num_idle -= 1;
self.inner.metrics.dec_num_idle_threads();
shared.num_notify += 1;
self.inner.condvar.notify_one();
}
Expand Down Expand Up @@ -419,6 +471,22 @@ impl Spawner {
}
}

cfg_metrics! {
impl Spawner {
pub(crate) fn num_threads(&self) -> usize {
self.inner.metrics.num_threads()
}

pub(crate) fn num_idle_threads(&self) -> usize {
self.inner.metrics.num_idle_threads()
}

pub(crate) fn queue_depth(&self) -> usize {
self.inner.metrics.queue_depth()
}
}
}

// Tells whether the error when spawning a thread is temporary.
#[inline]
fn is_temporary_os_thread_error(error: &std::io::Error) -> bool {
Expand All @@ -437,14 +505,15 @@ impl Inner {
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();

shared = self.shared.lock();
}

// IDLE
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();

while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
Expand Down Expand Up @@ -478,6 +547,7 @@ impl Inner {
if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);

task.shutdown_or_run_if_mandatory();
Expand All @@ -488,25 +558,25 @@ impl Inner {
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
}
}

// Thread exit
shared.num_th -= 1;
self.metrics.dec_num_threads();

// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
shared.num_idle = shared
.num_idle
.checked_sub(1)
.expect("num_idle underflowed on thread exit");
let prev_idle = self.metrics.dec_num_idle_threads();
if prev_idle < self.metrics.num_idle_threads() {
panic!("num_idle_threads underflowed on thread exit")
}

if shared.shutdown && shared.num_th == 0 {
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}

Expand Down
74 changes: 74 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,56 @@ impl RuntimeMetrics {
self.handle.inner.num_workers()
}

/// Returns the number of additional threads spawned by the runtime.
///
/// The number of workers is set by configuring `max_blocking_threads` on
/// `runtime::Builder`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_blocking_threads();
/// println!("Runtime has created {} threads", n);
/// }
/// ```
pub fn num_blocking_threads(&self) -> usize {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of idle threads, which hve spawned by the runtime
/// for `spawn_blocking` calls.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_idle_blocking_threads();
/// println!("Runtime has {} idle blocking thread pool threads", n);
/// }
/// ```
pub fn num_idle_blocking_threads(&self) -> usize {
self.handle.inner.num_idle_blocking_threads()
}

/// Returns the number of tasks scheduled from **outside** of the runtime.
///
/// The remote schedule count starts at zero when the runtime is created and
Expand Down Expand Up @@ -446,6 +496,30 @@ impl RuntimeMetrics {
pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.handle.inner.worker_local_queue_depth(worker)
}

/// Returns the number of tasks currently scheduled in the blocking
/// thread pool, spawned using `spawn_blocking`.
///
/// This metric returns the **current** number of tasks pending in
/// blocking thread pool. As such, the returned value may increase
/// or decrease as new tasks are scheduled and processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.blocking_queue_depth();
/// println!("{} tasks currently pending in the blocking thread pool", n);
/// }
/// ```
pub fn blocking_queue_depth(&self) -> usize {
self.handle.inner.blocking_queue_depth()
}
}

cfg_net! {
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,18 @@ cfg_metrics! {
assert_eq!(0, worker);
&self.shared.worker_metrics
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

Expand Down
24 changes: 24 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,22 @@ cfg_rt! {
}
}

pub(crate) fn num_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_blocking_threads(),
}
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_idle_blocking_threads(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
Expand Down Expand Up @@ -142,6 +158,14 @@ cfg_rt! {
Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker),
}
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.blocking_queue_depth(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.blocking_queue_depth(),
}
}
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ cfg_metrics! {
self.shared.worker_metrics.len()
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand All @@ -76,6 +84,10 @@ cfg_metrics! {
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

Expand Down
Loading