Skip to content

Commit

Permalink
put back JSON recording
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw committed Dec 28, 2021
1 parent 94f541e commit 235b2c4
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 136 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ tracing = "0.1.26"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["fmt", "registry"] }
futures = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# The parking_lot dependency is renamed, because we want our `parking_lot`
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }
humantime = "2.1.0"

# Required for recording:
serde = { version = "1", features = ["derive"] }
serde_json = "1"
crossbeam-channel = "0.5"

[dev-dependencies]
tokio = { version = "^1.7", features = ["full", "rt-multi-thread"] }
futures = "0.3"
Expand Down
11 changes: 0 additions & 11 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ pub(crate) struct Aggregator {
/// This is emptied on every state update.
new_poll_ops: Vec<proto::resources::PollOp>,

/// A sink to record all events to a file.
recorder: Option<Recorder>,

/// The time "state" of the aggregator, such as paused or live.
temporality: Temporality,
}
Expand Down Expand Up @@ -168,10 +165,6 @@ impl Aggregator {
async_op_stats: IdData::default(),
all_poll_ops: Default::default(),
new_poll_ops: Default::default(),
recorder: builder
.recording_path
.as_ref()
.map(|path| Recorder::new(path).expect("creating recorder")),
temporality: Temporality::Live,
}
}
Expand Down Expand Up @@ -232,10 +225,6 @@ impl Aggregator {
while let Some(event) = self.events.recv().now_or_never() {
match event {
Some(event) => {
// always be recording...
if let Some(ref recorder) = self.recorder {
recorder.record(&event);
}
self.update_state(event);
drained = true;
}
Expand Down
52 changes: 44 additions & 8 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod visitors;
use aggregator::Aggregator;
pub use builder::Builder;
use callsites::Callsites;
use record::Recorder;
use stack::SpanStack;
use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};

Expand Down Expand Up @@ -110,6 +111,9 @@ pub struct ConsoleLayer {

/// Used for unsetting the default dispatcher inside of span callbacks.
no_dispatch: Dispatch,

/// A sink to record all events to a file.
recorder: Option<Recorder>,
}

/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
Expand Down Expand Up @@ -293,7 +297,10 @@ impl ConsoleLayer {
// Conservatively, start to trigger a flush when half the channel is full.
// This tries to reduce the chance of losing events to a full channel.
let flush_under_capacity = config.event_buffer_capacity / 2;

let recorder = config
.recording_path
.as_ref()
.map(|path| Recorder::new(path).expect("creating recorder"));
let server = Server {
aggregator: Some(aggregator),
addr: config.server_addr,
Expand All @@ -314,6 +321,7 @@ impl ConsoleLayer {
resource_state_update_callsites: Callsites::default(),
async_op_state_update_callsites: Callsites::default(),
no_dispatch: Dispatch::new(NoSubscriber::default()),
recorder,
};
(layer, server)
}
Expand Down Expand Up @@ -462,6 +470,12 @@ impl ConsoleLayer {

sent
}

fn record(&self, event: impl FnOnce() -> record::Event) {
if let Some(ref recorder) = self.recorder {
recorder.record(event());
}
}
}

impl<S> Layer<S> for ConsoleLayer
Expand Down Expand Up @@ -516,6 +530,11 @@ where
let mut task_visitor = TaskVisitor::new(metadata.into());
attrs.record(&mut task_visitor);
let (fields, location) = task_visitor.result();
self.record(|| record::Event::Spawn {
id: id.into_u64(),
at,
fields: record::SerializeFields(fields.clone()),
});
if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
let stats = Arc::new(stats::TaskStats::new(at));
let event = Event::Spawn {
Expand Down Expand Up @@ -628,6 +647,11 @@ where
}

stats.record_wake_op(op, at);
self.record(|| record::Event::Waker {
id: id.into_u64(),
at,
op,
});
}
}
}
Expand Down Expand Up @@ -747,6 +771,11 @@ where
.get_or_default()
.borrow_mut()
.push(id.clone());

self.record(|| record::Event::Enter {
id: id.into_u64(),
at: now,
});
}
}
}
Expand Down Expand Up @@ -775,24 +804,31 @@ where
if let Some(parent) = span.parent() {
update(&parent, Some(now));
}
self.current_spans
.get_or_default()
.borrow_mut()
.push(id.clone());
self.current_spans.get_or_default().borrow_mut().pop(id);

self.record(|| record::Event::Exit {
id: id.into_u64(),
at: now,
});
}
}
}

fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
if let Some(span) = cx.span(&id) {
let now = SystemTime::now();
let exts = span.extensions();
if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
stats.drop_task(SystemTime::now());
stats.drop_task(now);
} else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
stats.drop_async_op(SystemTime::now());
stats.drop_async_op(now);
} else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
stats.drop_resource(SystemTime::now());
stats.drop_resource(now);
}
self.record(|| record::Event::Close {
id: id.into_u64(),
at: now,
});
}
}
}
Expand Down
Loading

0 comments on commit 235b2c4

Please sign in to comment.