Skip to content

Commit

Permalink
metrics: add a new metric for budget exhaustion yields (#5517)
Browse files Browse the repository at this point in the history
  • Loading branch information
Noah-Kennedy authored Mar 1, 2023
1 parent ee1c940 commit 52da177
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 5 deletions.
47 changes: 42 additions & 5 deletions tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ use crate::runtime::context;
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);

pub(crate) struct BudgetDecrement {
success: bool,
hit_zero: bool,
}

impl Budget {
/// Budget assigned to a task on each poll.
///
Expand Down Expand Up @@ -172,9 +177,17 @@ cfg_coop! {
context::budget(|cell| {
let mut budget = cell.get();

if budget.decrement() {
let decrement = budget.decrement();

if decrement.success {
let restore = RestoreOnPending(Cell::new(cell.get()));
cell.set(budget);

// avoid double counting
if decrement.hit_zero {
inc_budget_forced_yield_count();
}

Poll::Ready(restore)
} else {
cx.waker().wake_by_ref();
Expand All @@ -183,19 +196,43 @@ cfg_coop! {
}).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
}

cfg_rt! {
cfg_metrics! {
#[inline(always)]
fn inc_budget_forced_yield_count() {
if let Ok(handle) = context::try_current() {
handle.scheduler_metrics().inc_budget_forced_yield_count();
}
}
}

cfg_not_metrics! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}
}
}

cfg_not_rt! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}
}

impl Budget {
/// Decrements the budget. Returns `true` if successful. Decrementing fails
/// when there is not enough remaining budget.
fn decrement(&mut self) -> bool {
fn decrement(&mut self) -> BudgetDecrement {
if let Some(num) = &mut self.0 {
if *num > 0 {
*num -= 1;
true

let hit_zero = *num == 0;

BudgetDecrement { success: true, hit_zero }
} else {
false
BudgetDecrement { success: false, hit_zero: false }
}
} else {
true
BudgetDecrement { success: true, hit_zero: false }
}
}

Expand Down
15 changes: 15 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the number of times that tasks have been forced to yield back to the scheduler
/// after exhausting their task budgets.
///
/// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
pub fn budget_forced_yield_count(&self) -> u64 {
self.handle
.inner
.scheduler_metrics()
.budget_forced_yield_count
.load(Relaxed)
}

/// Returns the total number of times the given worker thread has parked.
///
/// The worker park count starts at zero when the runtime is created and
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/runtime/metrics/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@ use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
pub(crate) struct SchedulerMetrics {
/// Number of tasks that are scheduled from outside the runtime.
pub(super) remote_schedule_count: AtomicU64,
pub(super) budget_forced_yield_count: AtomicU64,
}

impl SchedulerMetrics {
pub(crate) fn new() -> SchedulerMetrics {
SchedulerMetrics {
remote_schedule_count: AtomicU64::new(0),
budget_forced_yield_count: AtomicU64::new(0),
}
}

/// Increment the number of tasks scheduled externally
pub(crate) fn inc_remote_schedule_count(&self) {
self.remote_schedule_count.fetch_add(1, Relaxed);
}

/// Increment the number of tasks forced to yield due to budget exhaustion
pub(crate) fn inc_budget_forced_yield_count(&self) {
self.budget_forced_yield_count.fetch_add(1, Relaxed);
}
}
76 changes: 76 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

use tokio::runtime::Runtime;
use tokio::task::consume_budget;
use tokio::time::{self, Duration};

#[test]
Expand Down Expand Up @@ -433,6 +437,78 @@ fn worker_local_queue_depth() {
});
}

#[test]
fn budget_exhaustion_yield() {
let rt = current_thread();
let metrics = rt.metrics();

assert_eq!(0, metrics.budget_forced_yield_count());

let mut did_yield = false;

// block on a task which consumes budget until it yields
rt.block_on(poll_fn(|cx| loop {
if did_yield {
return Poll::Ready(());
}

let fut = consume_budget();
tokio::pin!(fut);

if fut.poll(cx).is_pending() {
did_yield = true;
return Poll::Pending;
}
}));

assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[test]
fn budget_exhaustion_yield_with_joins() {
let rt = current_thread();
let metrics = rt.metrics();

assert_eq!(0, metrics.budget_forced_yield_count());

let mut did_yield_1 = false;
let mut did_yield_2 = false;

// block on a task which consumes budget until it yields
rt.block_on(async {
tokio::join!(
poll_fn(|cx| loop {
if did_yield_1 {
return Poll::Ready(());
}

let fut = consume_budget();
tokio::pin!(fut);

if fut.poll(cx).is_pending() {
did_yield_1 = true;
return Poll::Pending;
}
}),
poll_fn(|cx| loop {
if did_yield_2 {
return Poll::Ready(());
}

let fut = consume_budget();
tokio::pin!(fut);

if fut.poll(cx).is_pending() {
did_yield_2 = true;
return Poll::Pending;
}
})
)
});

assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
Expand Down

0 comments on commit 52da177

Please sign in to comment.