Skip to content

Commit

Permalink
tokio: Add initial io driver metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Feb 17, 2022
1 parent 43c224f commit ffa5add
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
target
Cargo.lock

.cargo/config.toml
2 changes: 1 addition & 1 deletion tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ required-features = ["rt-process-signal"]
# For mem check
rt-net = ["tokio/rt", "tokio/rt-multi-thread", "tokio/net"]
# For test-process-signal
rt-process-signal = ["rt", "tokio/process", "tokio/signal"]
rt-process-signal = ["rt-net", "tokio/process", "tokio/signal"]

full = [
"macros",
Expand Down
22 changes: 22 additions & 0 deletions tokio/src/io/driver/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! This file contains mocks of the metrics types used in the I/O driver.
//!
//! The reason these mocks don't live in `src/runtime/mock.rs` is because
//! these need to be available in the case when `net` is enabled but
//! `rt` is not.

cfg_not_rt_and_metrics! {
#[derive(Default)]
pub(crate) struct IoDriverMetrics {}

impl IoDriverMetrics {
pub(crate) fn incr_fd_count(&self) {}
pub(crate) fn dec_fd_count(&self) {}
pub(crate) fn incr_ready_count_by(&self, _amt: u64) {}
}
}

cfg_rt! {
cfg_metrics! {
pub(crate) use crate::runtime::IoDriverMetrics;
}
}
38 changes: 37 additions & 1 deletion tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ pub(crate) use registration::Registration;
mod scheduled_io;
use scheduled_io::ScheduledIo;

mod metrics;

use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
use crate::{loom::sync::Mutex, util::bit};

use metrics::IoDriverMetrics;

use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -74,6 +78,8 @@ pub(super) struct Inner {

/// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,

metrics: IoDriverMetrics,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -130,6 +136,7 @@ impl Driver {
registry,
io_dispatch: allocator,
waker,
metrics: IoDriverMetrics::default(),
}),
})
}
Expand Down Expand Up @@ -167,14 +174,18 @@ impl Driver {
}

// Process all the events that came in, dispatching appropriately
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();

if token != TOKEN_WAKEUP {
self.dispatch(token, Ready::from_mio(event));
ready_count += 1;
}
}

self.inner.metrics.incr_ready_count_by(ready_count);

self.events = Some(events);

Ok(())
Expand Down Expand Up @@ -279,6 +290,25 @@ cfg_not_rt! {
}
}

cfg_metrics! {
impl Handle {
// TODO: Remove this when handle contains `Arc<Inner>` so that we can return
// &IoDriverMetrics instead of using a closure.
//
// Related issue: https://github.com/tokio-rs/tokio/issues/4509
pub(crate) fn with_io_driver_metrics<F, R>(&self, f: F) -> Option<R>
where
F: Fn(&IoDriverMetrics) -> R,
{
if let Some(inner) = self.inner() {
Some(f(&inner.metrics))
} else {
None
}
}
}
}

impl Handle {
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
/// makes the next call to `turn` return immediately.
Expand Down Expand Up @@ -335,12 +365,18 @@ impl Inner {
self.registry
.register(source, mio::Token(token), interest.to_mio())?;

self.metrics.incr_fd_count();

Ok(shared)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)
self.registry.deregister(source)?;

self.metrics.dec_fd_count();

Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ pub(crate) use self::doc::winapi;

#[cfg(all(not(docsrs), windows, feature = "net"))]
#[allow(unused)]
pub(crate) use ::winapi;
pub(crate) use winapi;

cfg_macros! {
/// Implementation detail of the `select!` macro. This macro is **not**
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ macro_rules! cfg_not_metrics {
}
}

macro_rules! cfg_not_rt_and_metrics {
($($item:item)*) => {
$( #[cfg(not(all(feature = "rt", all(tokio_unstable, not(loom)))))] $item )*
}
}

macro_rules! cfg_net {
($($item:item)*) => {
$(
Expand Down
29 changes: 29 additions & 0 deletions tokio/src/runtime/metrics/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#![cfg_attr(not(feature = "net"), allow(dead_code))]

use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

#[derive(Default)]
pub(crate) struct IoDriverMetrics {
pub(super) fd_count: AtomicU64,
pub(super) ready_count: AtomicU64,
}

impl IoDriverMetrics {
pub(crate) fn incr_fd_count(&self) {
let prev = self.fd_count.load(Relaxed);
let new = prev.wrapping_add(1);
self.fd_count.store(new, Relaxed);
}

pub(crate) fn dec_fd_count(&self) {
let prev = self.fd_count.load(Relaxed);
let new = prev.wrapping_sub(1);
self.fd_count.store(new, Relaxed);
}

pub(crate) fn incr_ready_count_by(&self, amt: u64) {
let prev = self.fd_count.load(Relaxed);
let new = prev.wrapping_add(amt);
self.ready_count.store(new, Relaxed);
}
}
5 changes: 5 additions & 0 deletions tokio/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ cfg_metrics! {

mod worker;
pub(crate) use worker::WorkerMetrics;

cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
}
}

cfg_not_metrics! {
Expand Down
58 changes: 58 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,61 @@ impl RuntimeMetrics {
self.handle.spawner.worker_local_queue_depth(worker)
}
}

cfg_net! {
impl RuntimeMetrics {
/// Returns the number of file descriptors currently registered with the
/// runtime's I/O driver.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.io_driver_fd_count();
/// println!("{} fds currently registered with the runtime's I/O driver.", n);
/// }
/// ```
pub fn io_driver_fd_count(&self) -> u64 {
self.with_io_driver_metrics(|m| m.fd_count.load(Relaxed))
}

/// Returns the number of ready events processed by the runtime's
/// I/O driver.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.io_driver_ready_count();
/// println!("{} ready events procssed by the runtime's I/O driver.", n);
/// }
/// ```
pub fn io_driver_ready_count(&self) -> u64 {
self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed))
}

fn with_io_driver_metrics<F>(&self, f: F) -> u64
where
F: Fn(&super::IoDriverMetrics) -> u64,
{
// TODO: Investigate if this should return 0, most of our metrics always increase
// thus this breaks that guarantee.
self.handle
.io_handle
.as_ref()
.map(|h| h.with_io_driver_metrics(f))
.flatten()
.unwrap_or(0)
}
}
}
4 changes: 4 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ cfg_metrics! {
pub use metrics::RuntimeMetrics;

pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics};

cfg_net! {
pub(crate) use metrics::IoDriverMetrics;
}
}

cfg_not_metrics! {
Expand Down
26 changes: 26 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,32 @@ fn worker_local_queue_depth() {
});
}

#[test]
fn io_driver_fd_count() {
let rt = basic();
let metrics = rt.metrics();

let stream = tokio::net::TcpStream::connect("google.com:80");
let stream = rt.block_on(async move { stream.await.unwrap() });

assert_eq!(metrics.io_driver_fd_count(), 2);

drop(stream);

assert_eq!(metrics.io_driver_fd_count(), 1);
}

#[test]
fn io_driver_ready_count() {
let rt = basic();
let metrics = rt.metrics();

let stream = tokio::net::TcpStream::connect("google.com:80");
let _stream = rt.block_on(async move { stream.await.unwrap() });

assert_eq!(metrics.io_driver_ready_count(), 3);
}

fn basic() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down

0 comments on commit ffa5add

Please sign in to comment.