From 2d2716badf35e3c887c8ab8dfd6ab64a721c6cf5 Mon Sep 17 00:00:00 2001
From: Sean McArthur
Date: Mon, 14 Jun 2021 10:59:46 -0700
Subject: [PATCH] subscriber: emit waker stats (#44)

Closes #42
---
 console-subscriber/src/aggregator.rs | 40 +++++++++++++++++-
 console-subscriber/src/lib.rs        | 63 ++++++++++++++++++++++++++++
 proto/tasks.proto                    | 10 +++++
 3 files changed, 112 insertions(+), 1 deletion(-)

diff --git a/console-subscriber/src/aggregator.rs b/console-subscriber/src/aggregator.rs
index 6421b28b2..4bd6d575d 100644
--- a/console-subscriber/src/aggregator.rs
+++ b/console-subscriber/src/aggregator.rs
@@ -1,4 +1,4 @@
-use super::{Event, Watch};
+use super::{Event, WakeOp, Watch};
 use console_api as proto;
 use tokio::sync::{mpsc, Notify};
 
@@ -58,6 +58,7 @@ pub(crate) struct Flush {
 
 #[derive(Default)]
 struct Stats {
+    // task stats
     polls: u64,
     current_polls: u64,
     created_at: Option<SystemTime>,
@@ -65,6 +66,12 @@ struct Stats {
     last_poll: Option<SystemTime>,
     busy_time: Duration,
     closed_at: Option<SystemTime>,
+
+    // waker stats
+    wakes: u64,
+    waker_clones: u64,
+    waker_drops: u64,
+    last_wake: Option<SystemTime>,
 }
 
 #[derive(Default)]
@@ -275,6 +282,29 @@ impl Aggregator {
             Event::Close { id, at } => {
                 self.stats.update_or_default(id).closed_at = Some(at);
             }
+
+            Event::Waker { id, op, at } => {
+                // It's possible for wakers to exist long after a task has
+                // finished. We don't want those cases to create a "new"
+                // task that isn't closed, just to insert some waker stats.
+                //
+                // It may be useful to eventually be able to report about
+                // "wasted" waker ops, but we'll leave that for another time.
+                if let Some(mut stats) = self.stats.update(&id) {
+                    match op {
+                        WakeOp::Wake | WakeOp::WakeByRef => {
+                            stats.wakes += 1;
+                            stats.last_wake = Some(at);
+                        }
+                        WakeOp::Clone => {
+                            stats.waker_clones += 1;
+                        }
+                        WakeOp::Drop => {
+                            stats.waker_drops += 1;
+                        }
+                    }
+                }
+            }
         }
     }
 
@@ -364,6 +394,10 @@ impl<T> TaskData<T> {
         Updating(self.data.entry(id).or_default())
     }
 
+    fn update(&mut self, id: &span::Id) -> Option<Updating<'_, T>> {
+        self.data.get_mut(id).map(Updating)
+    }
+
     fn insert(&mut self, id: span::Id, data: T) {
         self.data.insert(id, (data, true));
     }
@@ -432,6 +466,10 @@ impl Stats {
             last_poll: self.last_poll.map(Into::into),
             busy_time: Some(self.busy_time.into()),
             total_time: self.total_time().map(Into::into),
+            wakes: self.wakes,
+            waker_clones: self.waker_clones,
+            waker_drops: self.waker_drops,
+            last_wake: self.last_wake.map(Into::into),
         }
     }
 }
diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs
index 4cedea72f..5f98d3ba8 100644
--- a/console-subscriber/src/lib.rs
+++ b/console-subscriber/src/lib.rs
@@ -42,6 +42,11 @@ struct FieldVisitor {
     meta_id: proto::MetaId,
 }
 
+struct WakerVisitor {
+    id: Option<span::Id>,
+    op: Option<WakeOp>,
+}
+
 struct Watch(mpsc::Sender<Result<proto::tasks::TaskUpdate, tonic::Status>>);
 
 enum Event {
@@ -64,6 +69,18 @@ enum Event {
         id: span::Id,
         at: SystemTime,
     },
+    Waker {
+        id: span::Id,
+        op: WakeOp,
+        at: SystemTime,
+    },
+}
+
+enum WakeOp {
+    Wake,
+    WakeByRef,
+    Clone,
+    Drop,
 }
 
 impl TasksLayer {
@@ -226,6 +243,28 @@ where
         }
     }
 
+    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
+        let meta = event.metadata();
+        // make faster like spawn metadata pointer check?
+        if meta.target() == "tokio::task::waker" {
+            let at = SystemTime::now();
+            let mut visitor = WakerVisitor { id: None, op: None };
+            event.record(&mut visitor);
+
+            match visitor {
+                WakerVisitor {
+                    id: Some(id),
+                    op: Some(op),
+                } => {
+                    self.send(Event::Waker { id, op, at });
+                }
+                _ => {
+                    tracing::warn!("unknown waker event: {:?}", event);
+                }
+            }
+        }
+    }
+
     fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
         if !self.is_id_spawned(id, &cx) {
             return;
@@ -349,3 +388,27 @@ impl Visit for FieldVisitor {
         });
     }
 }
+
+impl Visit for WakerVisitor {
+    fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {
+        // don't care (yet?)
+    }
+
+    fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
+        if field.name() == "task.id" {
+            self.id = Some(span::Id::from_u64(value));
+        }
+    }
+
+    fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
+        if field.name() == "op" {
+            self.op = Some(match value {
+                "waker.wake" => WakeOp::Wake,
+                "waker.wake_by_ref" => WakeOp::WakeByRef,
+                "waker.clone" => WakeOp::Clone,
+                "waker.drop" => WakeOp::Drop,
+                _ => return,
+            });
+        }
+    }
+}
diff --git a/proto/tasks.proto b/proto/tasks.proto
index 0fb2f8d19..897d58bf1 100644
--- a/proto/tasks.proto
+++ b/proto/tasks.proto
@@ -117,4 +117,14 @@ message Stats {
     // Subtracting `busy_time` from `total_time` calculates the task's idle
     // time, the amount of time it has spent *waiting* to be polled.
     google.protobuf.Duration total_time = 6;
+    // The total number of times this task's waker has been woken.
+    uint64 wakes = 7;
+    // The total number of times this task's waker has been cloned.
+    uint64 waker_clones = 8;
+    // The total number of times this task's waker has been dropped.
+    uint64 waker_drops = 9;
+    // The timestamp of the most recent time this task has been woken.
+    //
+    // If this is `None`, the task has not yet been woken.
+    optional google.protobuf.Timestamp last_wake = 10;
 }