From a4b9d2c796055f46b074fe8bc33722edc06e689b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 28 Dec 2021 10:50:36 -0800 Subject: [PATCH] basic functionality works Signed-off-by: Eliza Weisman --- console-subscriber/src/aggregator/id_data.rs | 66 +-- console-subscriber/src/aggregator/mod.rs | 458 +++++-------------- console-subscriber/src/lib.rs | 35 +- console-subscriber/src/record.rs | 41 +- console-subscriber/src/stats.rs | 91 +++- console-subscriber/src/sync.rs | 2 +- 6 files changed, 260 insertions(+), 433 deletions(-) diff --git a/console-subscriber/src/aggregator/id_data.rs b/console-subscriber/src/aggregator/id_data.rs index 92417a584..ebb783c8c 100644 --- a/console-subscriber/src/aggregator/id_data.rs +++ b/console-subscriber/src/aggregator/id_data.rs @@ -1,14 +1,12 @@ -use super::{shrink::ShrinkMap, DroppedAt, Id, ToProto}; +use super::{shrink::ShrinkMap, Id, ToProto}; +use crate::stats::{DroppedAt, Unsent}; use std::collections::HashMap; -use std::ops::{Deref, DerefMut}; use std::time::{Duration, SystemTime}; pub(crate) struct IdData { - data: ShrinkMap, + data: ShrinkMap, } -pub(crate) struct Updating<'a, T>(&'a mut (T, bool)); - pub(crate) enum Include { All, UpdatedOnly, @@ -19,31 +17,19 @@ pub(crate) enum Include { impl Default for IdData { fn default() -> Self { IdData { - data: ShrinkMap::::new(), + data: ShrinkMap::::new(), } } } -impl IdData { - pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T> - where - T: Default, - { - Updating(self.data.entry(id).or_default()) - } - - pub(crate) fn update(&mut self, id: &Id) -> Option> { - self.data.get_mut(id).map(Updating) - } - +impl IdData { pub(crate) fn insert(&mut self, id: Id, data: T) { - self.data.insert(id, (data, true)); + self.data.insert(id, data); } pub(crate) fn since_last_update(&mut self) -> impl Iterator { - self.data.iter_mut().filter_map(|(id, (data, dirty))| { - if *dirty { - *dirty = false; + self.data.iter_mut().filter_map(|(id, data)| { + if data.take_unsent() { Some((id, data)) } else { None @@ -52,11 +38,11 @@ impl IdData { } pub(crate) fn all(&self) -> impl Iterator { - self.data.iter().map(|(id, (data, _))| (id, data)) + self.data.iter() } pub(crate) fn get(&self, id: &Id) -> Option<&T> { - self.data.get(id).map(|(data, _)| data) + self.data.get(id) } pub(crate) fn as_proto(&mut self, include: Include) -> HashMap @@ -75,7 +61,7 @@ impl IdData { } } - pub(crate) fn drop_closed( + pub(crate) fn drop_closed( &mut self, stats: &mut IdData, now: SystemTime, @@ -92,18 +78,19 @@ impl IdData { // drop closed entities tracing::trace!(?retention, has_watchers, "dropping closed"); - stats.data.retain_and_shrink(|id, (stats, dirty)| { + stats.data.retain_and_shrink(|id, stats| { if let Some(dropped_at) = stats.dropped_at() { let dropped_for = now.duration_since(dropped_at).unwrap_or_default(); + let dirty = stats.is_unsent(); let should_drop = // if there are any clients watching, retain all dirty tasks regardless of age - (*dirty && has_watchers) + (dirty && has_watchers) || dropped_for > retention; tracing::trace!( stats.id = ?id, stats.dropped_at = ?dropped_at, stats.dropped_for = ?dropped_for, - stats.dirty = *dirty, + stats.dirty = dirty, should_drop, ); return !should_drop; @@ -114,27 +101,6 @@ impl IdData { // drop closed entities which no longer have stats. self.data - .retain_and_shrink(|id, (_, _)| stats.data.contains_key(id)); - } -} - -// === impl Updating === - -impl<'a, T> Deref for Updating<'a, T> { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.0 .0 - } -} - -impl<'a, T> DerefMut for Updating<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 .0 - } -} - -impl<'a, T> Drop for Updating<'a, T> { - fn drop(&mut self) { - self.0 .1 = true; + .retain_and_shrink(|id, _| stats.data.contains_key(id)); } } diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 7b97ae5b2..a7e367b8c 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -1,7 +1,9 @@ -use super::{ - AttributeUpdate, AttributeUpdateOp, Command, Event, Shared, UpdateType, WakeOp, Watch, +use super::{AttributeUpdate, AttributeUpdateOp, Command, Event, Shared, Watch}; +use crate::{ + record::Recorder, + stats::{self, Unsent}, + ToProto, WatchRequest, }; -use crate::{record::Recorder, WatchRequest}; use console_api as proto; use proto::resources::resource; use proto::Attribute; @@ -66,19 +68,19 @@ pub(crate) struct Aggregator { tasks: IdData, /// Map of task IDs to task stats. - task_stats: IdData, + task_stats: IdData>, /// Map of resource IDs to resource static data. resources: IdData, /// Map of resource IDs to resource stats. - resource_stats: IdData, + resource_stats: IdData>, /// Map of AsyncOp IDs to AsyncOp static data. async_ops: IdData, /// Map of AsyncOp IDs to AsyncOp stats. - async_op_stats: IdData, + async_op_stats: IdData>, /// *All* PollOp events for AsyncOps on Resources. /// @@ -104,19 +106,6 @@ pub(crate) struct Flush { triggered: AtomicBool, } -// An entity (e.g Task, Resource) that at some point in -// time can be dropped. This generally refers to spans that -// have been closed indicating that a task, async op or a -// resource is not in use anymore -pub(crate) trait DroppedAt { - fn dropped_at(&self) -> Option; -} - -pub(crate) trait ToProto { - type Output; - fn to_proto(&self) -> Self::Output; -} - #[derive(Debug)] enum Temporality { Live, @@ -125,6 +114,7 @@ enum Temporality { // Represent static data for resources struct Resource { id: Id, + is_dirty: AtomicBool, parent_id: Option, metadata: &'static Metadata<'static>, concrete_type: String, @@ -134,25 +124,10 @@ struct Resource { inherit_child_attrs: bool, } -/// Represents a key for a `proto::field::Name`. Because the -/// proto::field::Name might not be unique we also include the -/// resource id in this key -#[derive(Hash, PartialEq, Eq)] -struct FieldKey { - update_id: Id, - field_name: proto::field::Name, -} - -#[derive(Default)] -struct ResourceStats { - created_at: Option, - dropped_at: Option, - attributes: HashMap, -} - /// Represents static data for tasks struct Task { id: Id, + is_dirty: AtomicBool, metadata: &'static Metadata<'static>, fields: Vec, location: Option, @@ -160,6 +135,7 @@ struct Task { struct AsyncOp { id: Id, + is_dirty: AtomicBool, parent_id: Option, resource_id: Id, metadata: &'static Metadata<'static>, @@ -167,80 +143,6 @@ struct AsyncOp { inherit_child_attrs: bool, } -#[derive(Default)] -struct AsyncOpStats { - created_at: Option, - dropped_at: Option, - task_id: Option, - poll_stats: PollStats, - attributes: HashMap, -} - -impl DroppedAt for ResourceStats { - fn dropped_at(&self) -> Option { - self.dropped_at - } -} - -impl DroppedAt for TaskStats { - fn dropped_at(&self) -> Option { - self.dropped_at - } -} - -impl DroppedAt for AsyncOpStats { - fn dropped_at(&self) -> Option { - self.dropped_at - } -} - -impl PollStats { - fn update_on_span_enter(&mut self, timestamp: SystemTime) { - if self.current_polls == 0 { - self.last_poll_started = Some(timestamp); - if self.first_poll == None { - self.first_poll = Some(timestamp); - } - self.polls += 1; - } - self.current_polls += 1; - } - - fn update_on_span_exit(&mut self, timestamp: SystemTime) { - self.current_polls -= 1; - if self.current_polls == 0 { - if let Some(last_poll_started) = self.last_poll_started { - let elapsed = timestamp.duration_since(last_poll_started).unwrap(); - self.last_poll_ended = Some(timestamp); - self.busy_time += elapsed; - } - } - } - - fn since_last_poll(&self, timestamp: SystemTime) -> Option { - self.last_poll_started - .map(|lps| timestamp.duration_since(lps).unwrap()) - } -} - -impl Default for TaskStats { - fn default() -> Self { - TaskStats { - created_at: None, - dropped_at: None, - wakes: 0, - waker_clones: 0, - waker_drops: 0, - self_wakes: 0, - last_wake: None, - // significant figures should be in the [0-5] range and memory usage - // grows exponentially with higher a sigfig - poll_times_histogram: Histogram::::new(2).unwrap(), - poll_stats: PollStats::default(), - } - } -} - impl Aggregator { pub(crate) fn new( events: mpsc::Receiver, @@ -438,7 +340,8 @@ impl Aggregator { && subscription.update(&proto::tasks::TaskDetails { task_id: Some(id.clone().into()), now: Some(now.into()), - poll_times_histogram: serialize_histogram(&stats.poll_times_histogram).ok(), + poll_times_histogram: None, // TODO(eliza): put back + // poll_times_histogram: serialize_histogram(&stats.poll_times_histogram).ok(), }) { self.details_watchers @@ -513,8 +416,9 @@ impl Aggregator { let details = proto::tasks::TaskDetails { task_id: Some(id.clone().into()), now: Some(now.into()), - poll_times_histogram: serialize_histogram(&task_stats.poll_times_histogram) - .ok(), + // poll_times_histogram: serialize_histogram(&task_stats.poll_times_histogram) + // .ok(), + poll_times_histogram: None, // TODO(eliza): put back }; watchers.retain(|watch| watch.update(&details)); !watchers.is_empty() @@ -544,6 +448,7 @@ impl Aggregator { id.clone(), Task { id: id.clone(), + is_dirty: AtomicBool::new(false), metadata, fields, location, @@ -554,95 +459,7 @@ impl Aggregator { self.task_stats.insert(id, stats); } - Event::Enter { id, parent_id, at } => { - if let Some(mut task_stats) = self.task_stats.update(&id) { - task_stats.poll_stats.update_on_span_enter(at); - return; - } - - if let Some(mut async_op_stats) = - parent_id.and_then(|parent_id| self.async_op_stats.update(&parent_id)) - { - async_op_stats.poll_stats.update_on_span_enter(at); - } - } - - Event::Exit { id, parent_id, at } => { - if let Some(mut task_stats) = self.task_stats.update(&id) { - task_stats.poll_stats.update_on_span_exit(at); - if let Some(since_last_poll) = task_stats.poll_stats.since_last_poll(at) { - task_stats - .poll_times_histogram - .record(since_last_poll.as_nanos().try_into().unwrap_or(u64::MAX)) - .unwrap(); - } - return; - } - - if let Some(mut async_op_stats) = - parent_id.and_then(|parent_id| self.async_op_stats.update(&parent_id)) - { - async_op_stats.poll_stats.update_on_span_exit(at); - } - } - - Event::Close { id, at } => { - if let Some(mut task_stats) = self.task_stats.update(&id) { - task_stats.dropped_at = Some(at); - } - - if let Some(mut resource_stats) = self.resource_stats.update(&id) { - resource_stats.dropped_at = Some(at); - } - - if let Some(mut async_op_stats) = self.async_op_stats.update(&id) { - async_op_stats.dropped_at = Some(at); - } - } - - Event::Waker { id, op, at } => { - // It's possible for wakers to exist long after a task has - // finished. We don't want those cases to create a "new" - // task that isn't closed, just to insert some waker stats. - // - // It may be useful to eventually be able to report about - // "wasted" waker ops, but we'll leave that for another time. - if let Some(mut task_stats) = self.task_stats.update(&id) { - match op { - WakeOp::Wake { self_wake } | WakeOp::WakeByRef { self_wake } => { - task_stats.wakes += 1; - task_stats.last_wake = Some(at); - - // If the task has woken itself, increment the - // self-wake count. - if self_wake { - task_stats.self_wakes += 1; - } - - // Note: `Waker::wake` does *not* call the `drop` - // implementation, so waking by value doesn't - // trigger a drop event. so, count this as a `drop` - // to ensure the task's number of wakers can be - // calculated as `clones` - `drops`. - // - // see - // https://github.com/rust-lang/rust/blob/673d0db5e393e9c64897005b470bfeb6d5aec61b/library/core/src/task/wake.rs#L211-L212 - if let WakeOp::Wake { .. } = op { - task_stats.waker_drops += 1; - } - } - WakeOp::Clone => { - task_stats.waker_clones += 1; - } - WakeOp::Drop => { - task_stats.waker_drops += 1; - } - } - } - } - Event::Resource { - at, id, parent_id, metadata, @@ -651,12 +468,13 @@ impl Aggregator { location, is_internal, inherit_child_attrs, - .. + stats, } => { self.resources.insert( id.clone(), Resource { id: id.clone(), + is_dirty: AtomicBool::new(false), parent_id, kind, metadata, @@ -667,13 +485,7 @@ impl Aggregator { }, ); - self.resource_stats.insert( - id, - ResourceStats { - created_at: Some(at), - ..Default::default() - }, - ); + self.resource_stats.insert(id, stats); } Event::PollOp { @@ -684,20 +496,22 @@ impl Aggregator { task_id, is_ready, } => { - let mut async_op_stats = self.async_op_stats.update_or_default(async_op_id.clone()); - async_op_stats.task_id.get_or_insert(task_id.clone()); - - let poll_op = proto::resources::PollOp { - metadata: Some(metadata.into()), - resource_id: Some(resource_id.into()), - name: op_name, - task_id: Some(task_id.into()), - async_op_id: Some(async_op_id.into()), - is_ready, - }; - - self.all_poll_ops.push(poll_op.clone()); - self.new_poll_ops.push(poll_op); + // TODO(eliza): put back + + // let mut async_op_stats = self.async_op_stats.update_or_default(async_op_id.clone()); + // async_op_stats.task_id.get_or_insert(task_id.clone()); + + // let poll_op = proto::resources::PollOp { + // metadata: Some(metadata.into()), + // resource_id: Some(resource_id.into()), + // name: op_name, + // task_id: Some(task_id.into()), + // async_op_id: Some(async_op_id.into()), + // is_ready, + // }; + + // self.all_poll_ops.push(poll_op.clone()); + // self.new_poll_ops.push(poll_op); } Event::StateUpdate { @@ -706,83 +520,84 @@ impl Aggregator { update, .. } => { - let mut to_update = vec![(update_id.clone(), update_type.clone())]; - - fn update_entry(e: Entry<'_, FieldKey, Attribute>, upd: &AttributeUpdate) { - e.and_modify(|attr| update_attribute(attr, upd)) - .or_insert_with(|| upd.clone().into()); - } - - match update_type { - UpdateType::Resource => { - if let Some(parent) = self - .resources - .get(&update_id) - .and_then(|r| self.resources.get(r.parent_id.as_ref()?)) - .filter(|parent| parent.inherit_child_attrs) - { - to_update.push((parent.id.clone(), UpdateType::Resource)); - } - } - UpdateType::AsyncOp => { - if let Some(parent) = self - .async_ops - .get(&update_id) - .and_then(|r| self.async_ops.get(r.parent_id.as_ref()?)) - .filter(|parent| parent.inherit_child_attrs) - { - to_update.push((parent.id.clone(), UpdateType::AsyncOp)); - } - } - } - - for (update_id, update_type) in to_update { - let field_name = match update.field.name.as_ref() { - Some(name) => name.clone(), - None => { - tracing::warn!(?update.field, "field missing name, skipping..."); - return; - } - }; - - let upd_key = FieldKey { - update_id: update_id.clone(), - field_name, - }; - - match update_type { - UpdateType::Resource => { - let mut stats = self.resource_stats.update(&update_id); - let entry = stats.as_mut().map(|s| s.attributes.entry(upd_key)); - if let Some(entry) = entry { - update_entry(entry, &update); - } - } - UpdateType::AsyncOp => { - let mut stats = self.async_op_stats.update(&update_id); - let entry = stats.as_mut().map(|s| s.attributes.entry(upd_key)); - if let Some(entry) = entry { - update_entry(entry, &update); - } - } - }; - } + // let update_id = self.ids.id_for(update_id); + // let mut to_update = vec![(update_id, update_type.clone())]; + + // fn update_entry(e: Entry<'_, FieldKey, Attribute>, upd: &AttributeUpdate) { + // e.and_modify(|attr| update_attribute(attr, upd)) + // .or_insert_with(|| upd.clone().into()); + // } + + // match update_type { + // UpdateType::Resource => { + // if let Some(parent) = self + // .resources + // .get(&update_id) + // .and_then(|r| self.resources.get(r.parent_id.as_ref()?)) + // .filter(|parent| parent.inherit_child_attrs) + // { + // to_update.push((parent.id, UpdateType::Resource)); + // } + // } + // UpdateType::AsyncOp => { + // if let Some(parent) = self + // .async_ops + // .get(&update_id) + // .and_then(|r| self.async_ops.get(r.parent_id.as_ref()?)) + // .filter(|parent| parent.inherit_child_attrs) + // { + // to_update.push((parent.id, UpdateType::AsyncOp)); + // } + // } + // } + + // for (update_id, update_type) in to_update { + // let field_name = match update.field.name.as_ref() { + // Some(name) => name.clone(), + // None => { + // tracing::warn!(?update.field, "field missing name, skipping..."); + // return; + // } + // }; + + // let upd_key = FieldKey { + // update_id, + // field_name, + // }; + + // match update_type { + // UpdateType::Resource => { + // let mut stats = self.resource_stats.update(&update_id); + // let entry = stats.as_mut().map(|s| s.attributes.entry(upd_key)); + // if let Some(entry) = entry { + // update_entry(entry, &update); + // } + // } + // UpdateType::AsyncOp => { + // let mut stats = self.async_op_stats.update(&update_id); + // let entry = stats.as_mut().map(|s| s.attributes.entry(upd_key)); + // if let Some(entry) = entry { + // update_entry(entry, &update); + // } + // } + // }; + // } } Event::AsyncResourceOp { - at, id, source, resource_id, metadata, parent_id, inherit_child_attrs, - .. + stats, } => { self.async_ops.insert( id.clone(), AsyncOp { id: id.clone(), + is_dirty: AtomicBool::new(false), resource_id, metadata, source, @@ -791,13 +606,7 @@ impl Aggregator { }, ); - self.async_op_stats.insert( - id, - AsyncOpStats { - created_at: Some(at), - ..Default::default() - }, - ); + self.async_op_stats.insert(id, stats); } } } @@ -837,20 +646,6 @@ impl Watch { } } -impl ToProto for PollStats { - type Output = proto::PollStats; - - fn to_proto(&self) -> Self::Output { - proto::PollStats { - polls: self.polls, - first_poll: self.first_poll.map(Into::into), - last_poll_started: self.last_poll_started.map(Into::into), - last_poll_ended: self.last_poll_ended.map(Into::into), - busy_time: Some(self.busy_time.into()), - } - } -} - impl ToProto for Task { type Output = proto::tasks::Task; @@ -867,20 +662,13 @@ impl ToProto for Task { } } -impl ToProto for TaskStats { - type Output = proto::tasks::Stats; +impl Unsent for Task { + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) + } - fn to_proto(&self) -> Self::Output { - proto::tasks::Stats { - poll_stats: Some(self.poll_stats.to_proto()), - created_at: self.created_at.map(Into::into), - dropped_at: self.dropped_at.map(Into::into), - wakes: self.wakes, - waker_clones: self.waker_clones, - self_wakes: self.self_wakes, - waker_drops: self.waker_drops, - last_wake: self.last_wake.map(Into::into), - } + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) } } @@ -900,16 +688,13 @@ impl ToProto for Resource { } } -impl ToProto for ResourceStats { - type Output = proto::resources::Stats; +impl Unsent for Resource { + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) + } - fn to_proto(&self) -> Self::Output { - let attributes = self.attributes.values().cloned().collect(); - proto::resources::Stats { - created_at: self.created_at.map(Into::into), - dropped_at: self.dropped_at.map(Into::into), - attributes, - } + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) } } @@ -927,18 +712,13 @@ impl ToProto for AsyncOp { } } -impl ToProto for AsyncOpStats { - type Output = proto::async_ops::Stats; +impl Unsent for AsyncOp { + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) + } - fn to_proto(&self) -> Self::Output { - let attributes = self.attributes.values().cloned().collect(); - proto::async_ops::Stats { - poll_stats: Some(self.poll_stats.to_proto()), - created_at: self.created_at.map(Into::into), - dropped_at: self.dropped_at.map(Into::into), - task_id: self.task_id.clone().map(Into::into), - attributes, - } + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) } } diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 78924256f..0077d6121 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -130,6 +130,11 @@ pub struct Server { client_buffer: usize, } +pub(crate) trait ToProto { + type Output; + fn to_proto(&self) -> Self::Output; +} + /// State shared between the `ConsoleLayer` and the `Aggregator` task. #[derive(Debug, Default)] struct Shared { @@ -609,7 +614,7 @@ where if let Some((id, mut op)) = visitor.result() { if let Some(span) = ctx.span(&id) { let exts = span.extensions(); - if let Some(stats) = exts.get::() { + if let Some(stats) = exts.get::>() { if op.is_wake() { // Are we currently inside the task's span? If so, the task // has woken itself. @@ -716,15 +721,15 @@ where fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) { fn update LookupSpan<'a>>( - span: SpanRef, + span: &SpanRef, at: Option, ) -> Option { let exts = span.extensions(); - if let Some(stats) = exts.get::() { + if let Some(stats) = exts.get::>() { let at = at.unwrap_or_else(SystemTime::now); stats.start_poll(at); Some(at) - } else if let Some(stats) = exts.get::() { + } else if let Some(stats) = exts.get::>() { let at = at.unwrap_or_else(SystemTime::now); stats.start_poll(at); Some(at) @@ -734,10 +739,9 @@ where } if let Some(span) = cx.span(id) { - let now = SystemTime::now(); - if let Some(at) = update(span, None) { + if let Some(now) = update(&span, None) { if let Some(parent) = span.parent() { - update(parent, Some(at)); + update(&parent, Some(now)); } self.current_spans .get_or_default() @@ -749,15 +753,15 @@ where fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) { fn update LookupSpan<'a>>( - span: SpanRef, + span: &SpanRef, at: Option, ) -> Option { let exts = span.extensions(); - if let Some(stats) = exts.get::() { + if let Some(stats) = exts.get::>() { let at = at.unwrap_or_else(SystemTime::now); stats.end_poll(at); Some(at) - } else if let Some(stats) = exts.get::() { + } else if let Some(stats) = exts.get::>() { let at = at.unwrap_or_else(SystemTime::now); stats.end_poll(at); Some(at) @@ -767,10 +771,9 @@ where } if let Some(span) = cx.span(id) { - let now = SystemTime::now(); - if let Some(at) = update(span, None) { + if let Some(now) = update(&span, None) { if let Some(parent) = span.parent() { - update(parent, Some(at)); + update(&parent, Some(now)); } self.current_spans .get_or_default() @@ -783,11 +786,11 @@ where fn on_close(&self, id: span::Id, cx: Context<'_, S>) { if let Some(span) = cx.span(&id) { let exts = span.extensions(); - if let Some(stats) = exts.get::() { + if let Some(stats) = exts.get::>() { stats.drop_task(SystemTime::now()); - } else if let Some(stats) = exts.get::() { + } else if let Some(stats) = exts.get::>() { stats.drop_async_op(SystemTime::now()); - } else if let Some(stats) = exts.get::() { + } else if let Some(stats) = exts.get::>() { stats.drop_resource(SystemTime::now()); } } diff --git a/console-subscriber/src/record.rs b/console-subscriber/src/record.rs index dc7b5708f..fc44a9720 100644 --- a/console-subscriber/src/record.rs +++ b/console-subscriber/src/record.rs @@ -98,28 +98,31 @@ impl Recorder { pub(crate) fn record(&self, event: &crate::Event) { let event = match event { - crate::Event::Spawn { id, at, fields, .. } => Event::Spawn { + crate::Event::Spawn { + id, fields, stats, .. + } => Event::Spawn { id: id.into_u64(), - at: *at, + at: stats.created_at, fields: SerializeFields(fields), }, - crate::Event::Enter { id, at, .. } => Event::Enter { - id: id.into_u64(), - at: *at, - }, - crate::Event::Exit { id, at, .. } => Event::Exit { - id: id.into_u64(), - at: *at, - }, - crate::Event::Close { id, at } => Event::Close { - id: id.into_u64(), - at: *at, - }, - crate::Event::Waker { id, op, at } => Event::Waker { - id: id.into_u64(), - at: *at, - op: *op, - }, + // TODO(eliza): this more or less bricks recording, lol...put this back. + // crate::Event::Enter { id, at, .. } => Event::Enter { + // id: id.into_u64(), + // at: *at, + // }, + // crate::Event::Exit { id, at, .. } => Event::Exit { + // id: id.into_u64(), + // at: *at, + // }, + // crate::Event::Close { id, at } => Event::Close { + // id: id.into_u64(), + // at: *at, + // }, + // crate::Event::Waker { id, op, at } => Event::Waker { + // id: id.into_u64(), + // at: *at, + // op: *op, + // }, _ => return, }; diff --git a/console-subscriber/src/stats.rs b/console-subscriber/src/stats.rs index dac690e5d..160f9dc43 100644 --- a/console-subscriber/src/stats.rs +++ b/console-subscriber/src/stats.rs @@ -1,17 +1,15 @@ -use crate::{attribute, sync::Mutex}; +use crate::{attribute, sync::Mutex, ToProto}; use hdrhistogram::Histogram; use std::cmp; use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::*}; +use std::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering::*}, + Arc, +}; use std::time::{Duration, SystemTime}; use console_api as proto; -pub(crate) trait ToProto { - type Output; - fn to_proto(&self) -> Self::Output; -} - /// A type which records whether it has unsent updates. /// /// If something implementing this trait has been changed since the last time @@ -25,6 +23,7 @@ pub(crate) trait Unsent { /// current update. If this returns `true`, it will be included, so it /// becomes no longer dirty. fn take_unsent(&self) -> bool; + fn is_unsent(&self) -> bool; } // An entity (e.g Task, Resource) that at some point in @@ -35,12 +34,47 @@ pub(crate) trait DroppedAt { fn dropped_at(&self) -> Option; } +impl DroppedAt for Arc { + fn dropped_at(&self) -> Option { + T::dropped_at(self) + } +} + +impl Unsent for Arc { + fn take_unsent(&self) -> bool { + T::take_unsent(self) + } + + fn is_unsent(&self) -> bool { + T::is_unsent(self) + } +} + +impl ToProto for Arc { + type Output = T::Output; + fn to_proto(&self) -> T::Output { + T::to_proto(self) + } +} + +// pub(crate) trait Stats: ToProto { +// fn dropped_at(&self) -> Option; + +// fn to_proto_if_unsent(&self) -> Option<::Output> { +// if self.take_unsent() { +// Some(self.to_proto) +// } else { +// None +// } +// } +// } + #[derive(Debug)] pub(crate) struct TaskStats { is_dirty: AtomicBool, is_dropped: AtomicBool, // task stats - created_at: SystemTime, + pub(crate) created_at: SystemTime, timestamps: Mutex, // waker stats @@ -163,6 +197,10 @@ impl TaskStats { self.make_dirty(); } + pub(crate) fn since_last_poll(&self, now: SystemTime) -> Option { + self.poll_stats.since_last_poll(now) + } + pub(crate) fn drop_task(&self, dropped_at: SystemTime) { if self.is_dropped.swap(true, AcqRel) { // The task was already dropped. @@ -206,6 +244,10 @@ impl Unsent for TaskStats { fn take_unsent(&self) -> bool { self.is_dirty.swap(false, AcqRel) } + + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) + } } impl DroppedAt for TaskStats { @@ -254,6 +296,10 @@ impl AsyncOpStats { self.make_dirty(); } + pub(crate) fn since_last_poll(&self, now: SystemTime) -> Option { + self.poll_stats.since_last_poll(now) + } + #[inline] fn make_dirty(&self) { self.stats.make_dirty() @@ -265,6 +311,11 @@ impl Unsent for AsyncOpStats { fn take_unsent(&self) -> bool { self.stats.take_unsent() } + + #[inline] + fn is_unsent(&self) -> bool { + self.stats.is_unsent() + } } impl DroppedAt for AsyncOpStats { @@ -329,6 +380,10 @@ impl Unsent for ResourceStats { fn take_unsent(&self) -> bool { self.is_dirty.swap(false, AcqRel) } + + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) + } } impl DroppedAt for ResourceStats { @@ -343,6 +398,19 @@ impl DroppedAt for ResourceStats { } } +impl ToProto for ResourceStats { + type Output = proto::resources::Stats; + + fn to_proto(&self) -> Self::Output { + let attributes = self.attributes.lock().values().cloned().collect(); + proto::resources::Stats { + created_at: Some(self.created_at.into()), + dropped_at: self.dropped_at.lock().map(Into::into), + attributes, + } + } +} + // === impl PollStats === impl PollStats { @@ -382,6 +450,13 @@ impl PollStats { } } } + + fn since_last_poll(&self, timestamp: SystemTime) -> Option { + self.timestamps + .lock() + .last_poll_started + .map(|lps| timestamp.duration_since(lps).unwrap()) + } } impl ToProto for PollStats { diff --git a/console-subscriber/src/sync.rs b/console-subscriber/src/sync.rs index 3267adedc..c2907e325 100644 --- a/console-subscriber/src/sync.rs +++ b/console-subscriber/src/sync.rs @@ -22,7 +22,7 @@ mod std_impl { } impl Mutex { - pub(crate) fn read(&self) -> MutexGuard<'_, T> { + pub(crate) fn lock(&self) -> MutexGuard<'_, T> { self.0.lock().unwrap_or_else(PoisonError::into_inner) } }