Skip to content

Commit

Permalink
metrics: stabilize injection_queue_depth metric (#6854)
Browse files Browse the repository at this point in the history
  • Loading branch information
Owen-CH-Leung authored Sep 22, 2024
1 parent a302367 commit 542197c
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 92 deletions.
52 changes: 26 additions & 26 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,32 @@ impl RuntimeMetrics {
self.handle.inner.num_alive_tasks()
}

/// Returns the number of tasks currently scheduled in the runtime's
/// injection queue.
///
/// Tasks that are spawned or notified from a non-runtime thread are
/// scheduled using the runtime's injection queue. This metric returns the
/// **current** number of tasks pending in the injection queue. As such, the
/// returned value may increase or decrease as new tasks are scheduled and
/// processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.injection_queue_depth();
/// println!("{} tasks currently pending in the runtime's injection queue", n);
/// }
/// ```
pub fn injection_queue_depth(&self) -> usize {
self.handle.inner.injection_queue_depth()
}

cfg_unstable_metrics! {

/// Returns the number of additional threads spawned by the runtime.
Expand Down Expand Up @@ -655,32 +681,6 @@ impl RuntimeMetrics {
}
}

/// Returns the number of tasks currently scheduled in the runtime's
/// injection queue.
///
/// Tasks that are spawned or notified from a non-runtime thread are
/// scheduled using the runtime's injection queue. This metric returns the
/// **current** number of tasks pending in the injection queue. As such, the
/// returned value may increase or decrease as new tasks are scheduled and
/// processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.injection_queue_depth();
/// println!("{} tasks currently pending in the runtime's injection queue", n);
/// }
/// ```
pub fn injection_queue_depth(&self) -> usize {
self.handle.inner.injection_queue_depth()
}

/// Returns the number of tasks currently scheduled in the given worker's
/// local queue.
///
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,10 @@ impl Handle {
pub(crate) fn num_alive_tasks(&self) -> usize {
self.shared.owned.num_alive_tasks()
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}
}

cfg_unstable_metrics! {
Expand All @@ -536,10 +540,6 @@ cfg_unstable_metrics! {
&self.shared.scheduler_metrics
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/runtime/scheduler/inject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ cfg_rt_multi_thread! {
mod rt_multi_thread;
}

cfg_unstable_metrics! {
mod metrics;
}
mod metrics;

/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size, array queue overflows.
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ cfg_rt! {
pub(crate) fn num_alive_tasks(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
}

pub(crate) fn injection_queue_depth(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
}
}

cfg_unstable_metrics! {
Expand Down Expand Up @@ -217,10 +221,6 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
}

pub(crate) fn injection_queue_depth(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
}
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ impl Handle {
self.shared.owned.num_alive_tasks()
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}

cfg_unstable_metrics! {
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
Expand All @@ -39,10 +43,6 @@ impl Handle {
&self.shared.worker_metrics[worker]
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ use std::task::Waker;
use std::thread;
use std::time::Duration;

cfg_unstable_metrics! {
mod metrics;
}
mod metrics;

cfg_taskdump! {
mod taskdump;
Expand Down
8 changes: 6 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ impl Shared {
pub(crate) fn injection_queue_depth(&self) -> usize {
self.inject.len()
}
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.remotes[worker].steal.len()
cfg_unstable_metrics! {
impl Shared {
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.remotes[worker].steal.len()
}
}
}
46 changes: 46 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))]

use std::sync::{Arc, Barrier};
use tokio::runtime::Runtime;

#[test]
Expand Down Expand Up @@ -45,6 +46,51 @@ fn num_alive_tasks() {
assert_eq!(0, rt.metrics().num_alive_tasks());
}

#[test]
fn injection_queue_depth_current_thread() {
use std::thread;

let rt = current_thread();
let handle = rt.handle().clone();
let metrics = rt.metrics();

thread::spawn(move || {
handle.spawn(async {});
})
.join()
.unwrap();

assert_eq!(1, metrics.injection_queue_depth());
}

#[test]
fn injection_queue_depth_multi_thread() {
let rt = threaded();
let metrics = rt.metrics();

let barrier1 = Arc::new(Barrier::new(3));
let barrier2 = Arc::new(Barrier::new(3));

// Spawn a task per runtime worker to block it.
for _ in 0..2 {
let barrier1 = barrier1.clone();
let barrier2 = barrier2.clone();
rt.spawn(async move {
barrier1.wait();
barrier2.wait();
});
}

barrier1.wait();

for i in 0..10 {
assert_eq!(i, metrics.injection_queue_depth());
rt.spawn(async {});
}

barrier2.wait();
}

fn current_thread() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down
47 changes: 1 addition & 46 deletions tokio/tests/rt_unstable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
))]

use std::future::Future;
use std::sync::{Arc, Barrier, Mutex};
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::thread;
use tokio::macros::support::poll_fn;
Expand Down Expand Up @@ -622,51 +622,6 @@ fn worker_overflow_count() {
assert_eq!(1, n);
}

#[test]
fn injection_queue_depth_current_thread() {
use std::thread;

let rt = current_thread();
let handle = rt.handle().clone();
let metrics = rt.metrics();

thread::spawn(move || {
handle.spawn(async {});
})
.join()
.unwrap();

assert_eq!(1, metrics.injection_queue_depth());
}

#[test]
fn injection_queue_depth_multi_thread() {
let rt = threaded();
let metrics = rt.metrics();

let barrier1 = Arc::new(Barrier::new(3));
let barrier2 = Arc::new(Barrier::new(3));

// Spawn a task per runtime worker to block it.
for _ in 0..2 {
let barrier1 = barrier1.clone();
let barrier2 = barrier2.clone();
rt.spawn(async move {
barrier1.wait();
barrier2.wait();
});
}

barrier1.wait();

for i in 0..10 {
assert_eq!(i, metrics.injection_queue_depth());
rt.spawn(async {});
}

barrier2.wait();
}

#[test]
fn worker_local_queue_depth() {
const N: usize = 100;
Expand Down

0 comments on commit 542197c

Please sign in to comment.