Skip to content

Commit

Permalink
subscriber: emit waker stats (#44)
Browse files Browse the repository at this point in the history
Closes #42
  • Loading branch information
seanmonstar authored Jun 14, 2021
1 parent e45fca0 commit 2d2716b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 1 deletion.
40 changes: 39 additions & 1 deletion console-subscriber/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Event, Watch};
use super::{Event, WakeOp, Watch};
use console_api as proto;
use tokio::sync::{mpsc, Notify};

Expand Down Expand Up @@ -58,13 +58,20 @@ pub(crate) struct Flush {

#[derive(Default)]
struct Stats {
// task stats
polls: u64,
current_polls: u64,
created_at: Option<SystemTime>,
first_poll: Option<SystemTime>,
last_poll: Option<SystemTime>,
busy_time: Duration,
closed_at: Option<SystemTime>,

// waker stats
wakes: u64,
waker_clones: u64,
waker_drops: u64,
last_wake: Option<SystemTime>,
}

#[derive(Default)]
Expand Down Expand Up @@ -275,6 +282,29 @@ impl Aggregator {
Event::Close { id, at } => {
self.stats.update_or_default(id).closed_at = Some(at);
}

Event::Waker { id, op, at } => {
// It's possible for wakers to exist long after a task has
// finished. We don't want those cases to create a "new"
// task that isn't closed, just to insert some waker stats.
//
// It may be useful to eventually be able to report about
// "wasted" waker ops, but we'll leave that for another time.
if let Some(mut stats) = self.stats.update(&id) {
match op {
WakeOp::Wake | WakeOp::WakeByRef => {
stats.wakes += 1;
stats.last_wake = Some(at);
}
WakeOp::Clone => {
stats.waker_clones += 1;
}
WakeOp::Drop => {
stats.waker_drops += 1;
}
}
}
}
}
}

Expand Down Expand Up @@ -364,6 +394,10 @@ impl<T> TaskData<T> {
Updating(self.data.entry(id).or_default())
}

fn update(&mut self, id: &span::Id) -> Option<Updating<'_, T>> {
self.data.get_mut(id).map(Updating)
}

fn insert(&mut self, id: span::Id, data: T) {
self.data.insert(id, (data, true));
}
Expand Down Expand Up @@ -432,6 +466,10 @@ impl Stats {
last_poll: self.last_poll.map(Into::into),
busy_time: Some(self.busy_time.into()),
total_time: self.total_time().map(Into::into),
wakes: self.wakes,
waker_clones: self.waker_clones,
waker_drops: self.waker_drops,
last_wake: self.last_wake.map(Into::into),
}
}
}
Expand Down
63 changes: 63 additions & 0 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ struct FieldVisitor {
meta_id: proto::MetaId,
}

struct WakerVisitor {
id: Option<span::Id>,
op: Option<WakeOp>,
}

struct Watch(mpsc::Sender<Result<proto::tasks::TaskUpdate, tonic::Status>>);

enum Event {
Expand All @@ -64,6 +69,18 @@ enum Event {
id: span::Id,
at: SystemTime,
},
Waker {
id: span::Id,
op: WakeOp,
at: SystemTime,
},
}

enum WakeOp {
Wake,
WakeByRef,
Clone,
Drop,
}

impl TasksLayer {
Expand Down Expand Up @@ -226,6 +243,28 @@ where
}
}

fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
let meta = event.metadata();
// make faster like spawn metadata pointer check?
if meta.target() == "tokio::task::waker" {
let at = SystemTime::now();
let mut visitor = WakerVisitor { id: None, op: None };
event.record(&mut visitor);

match visitor {
WakerVisitor {
id: Some(id),
op: Some(op),
} => {
self.send(Event::Waker { id, op, at });
}
_ => {
tracing::warn!("unknown waker event: {:?}", event);
}
}
}
}

fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
if !self.is_id_spawned(id, &cx) {
return;
Expand Down Expand Up @@ -349,3 +388,27 @@ impl Visit for FieldVisitor {
});
}
}

impl Visit for WakerVisitor {
fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {
// don't care (yet?)
}

fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
if field.name() == "task.id" {
self.id = Some(span::Id::from_u64(value));
}
}

fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
if field.name() == "op" {
self.op = Some(match value {
"waker.wake" => WakeOp::Wake,
"waker.wake_by_ref" => WakeOp::WakeByRef,
"waker.clone" => WakeOp::Clone,
"waker.drop" => WakeOp::Drop,
_ => return,
});
}
}
}
10 changes: 10 additions & 0 deletions proto/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,14 @@ message Stats {
// Subtracting `busy_time` from `total_time` calculates the task's idle
// time, the amount of time it has spent *waiting* to be polled.
google.protobuf.Duration total_time = 6;
// The total number of times this task's waker has been woken.
uint64 wakes = 7;
// The total number of times this task's waker has been cloned.
uint64 waker_clones = 8;
// The total number of times this task's waker has been dropped.
uint64 waker_drops = 9;
// The timestamp of the most recent time this task has been woken.
//
// If this is `None`, the task has not yet been woken.
optional google.protobuf.Timestamp last_wake = 10;
}

0 comments on commit 2d2716b

Please sign in to comment.