Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriber): support multiple task callsites #68

Merged
merged 5 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions console-subscriber/src/callsites.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::{
ptr,
sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use tracing_core::Metadata;

#[derive(Debug, Default)]
pub(crate) struct Callsites {
// In practice each of these will have like, 1-5 callsites in it, max, so
// 32 is probably fine...if it ever becomes not fine, we'll fix that.
ptrs: [AtomicPtr<Metadata<'static>>; 32],
len: AtomicUsize,
}

impl Callsites {
#[track_caller]
pub(crate) fn insert(&self, callsite: &'static Metadata<'static>) {
// The callsite may already have been inserted, if the callsite cache
// was invalidated and is being rebuilt. In that case, don't insert it
// again.'
if self.contains(callsite) {
return;
}

let idx = self.len.fetch_add(1, Ordering::AcqRel);
assert!(
idx < 64,
hawkw marked this conversation as resolved.
Show resolved Hide resolved
"you tried to store more than 64 callsites, \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still says 64 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops lol

Suggested change
"you tried to store more than 64 callsites, \
"you tried to store more than 32 callsites, \

time to make the callsite sets bigger i guess \
(please open an issue for this)"
);
self.ptrs[idx]
.compare_exchange(
ptr::null_mut(),
callsite as *const _ as *mut _,
Ordering::AcqRel,
Ordering::Acquire,
)
.expect("a callsite would have been clobbered by `insert` (this is a bug)");
}

pub(crate) fn contains(&self, callsite: &'static Metadata<'static>) -> bool {
let len = self.len.load(Ordering::Acquire);
for cs in &self.ptrs[..len + 1] {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
if ptr::eq(cs.load(Ordering::Acquire), callsite) {
return true;
}
}
false
}
}
58 changes: 24 additions & 34 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use tokio::sync::{mpsc, oneshot};

use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
ptr,
sync::{
atomic::{AtomicPtr, Ordering::*},
Arc,
},
sync::Arc,
time::{Duration, SystemTime},
};
use tracing_core::{
Expand All @@ -20,18 +16,21 @@ use tracing_core::{
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

mod aggregator;
use aggregator::Aggregator;
mod builder;
mod callsites;
mod init;

use aggregator::Aggregator;
pub use builder::Builder;
use callsites::Callsites;

mod init;
pub use init::{build, init};

pub struct TasksLayer {
task_meta: AtomicPtr<Metadata<'static>>,
blocking_meta: AtomicPtr<Metadata<'static>>,
tx: mpsc::Sender<Event>,
flush: Arc<aggregator::Flush>,
spawn_callsites: Callsites,
waker_callsites: Callsites,
}

pub struct Server {
Expand Down Expand Up @@ -139,8 +138,8 @@ impl TasksLayer {
let layer = Self {
tx,
flush,
task_meta: AtomicPtr::new(ptr::null_mut()),
blocking_meta: AtomicPtr::new(ptr::null_mut()),
spawn_callsites: Callsites::default(),
waker_callsites: Callsites::default(),
};
(layer, server)
}
Expand All @@ -159,10 +158,8 @@ impl TasksLayer {
// chosen by fair die roll, guaranteed to be random :)
const FLUSH_AT_CAPACITY: usize = 100;

#[inline(always)]
fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
ptr::eq(self.task_meta.load(Relaxed), meta as *const _ as *mut _)
// || ptr::eq(self.blocking_meta.load(Relaxed), meta as *const _ as *mut _)
self.spawn_callsites.contains(meta)
}

fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
Expand Down Expand Up @@ -216,24 +213,19 @@ where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
if meta.target() == "tokio::task" && meta.name() == "task" {
if meta.fields().iter().any(|f| f.name() == "function") {
let _ = self.blocking_meta.compare_exchange(
ptr::null_mut(),
meta as *const _ as *mut _,
AcqRel,
Acquire,
);
} else {
let _ = self.task_meta.compare_exchange(
ptr::null_mut(),
meta as *const _ as *mut _,
AcqRel,
Acquire,
);
}
if meta.name() == "runtime.spawn"
// back compat until tokio is updated to use the standardized naming
// scheme
|| (meta.name() == "task" && meta.target() == "tokio::task")
{
self.spawn_callsites.insert(meta);
} else if meta.target() == "runtime::waker"
// back compat until tokio is updated to use the standardized naming
// scheme
|| meta.target() == "tokio::task::waker"
{
self.waker_callsites.insert(meta);
}

self.send(Event::Metadata(meta));

subscriber::Interest::always()
Expand All @@ -259,9 +251,7 @@ where
}

fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
let meta = event.metadata();
// make faster like spawn metadata pointer check?
if meta.target() == "tokio::task::waker" {
if self.waker_callsites.contains(event.metadata()) {
let at = SystemTime::now();
let mut visitor = WakerVisitor { id: None, op: None };
event.record(&mut visitor);
Expand Down
33 changes: 28 additions & 5 deletions console/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub(crate) struct Task {
kind: &'static str,
stats: Stats,
completed_for: usize,
target: Arc<str>,
}

#[derive(Debug, Default)]
Expand All @@ -56,6 +57,7 @@ pub(crate) struct Details {
#[derive(Debug)]
pub(crate) struct Metadata {
field_names: Vec<Arc<str>>,
target: Arc<str>,
//TODO: add more metadata as needed
}

Expand Down Expand Up @@ -141,6 +143,20 @@ impl State {
proto::tasks::task::Kind::Blocking => "B",
};

let meta_id = match task.metadata.as_ref() {
Some(id) => id.id,
None => {
tracing::warn!(?task, "task has no metadata ID, skipping");
return None;
}
};
let meta = match metas.get(&meta_id) {
Some(meta) => meta,
None => {
tracing::warn!(?task, meta_id, "no metadata for task, skipping");
return None;
}
};
let fields: Vec<Field> = task
.fields
.drain(..)
Expand All @@ -149,11 +165,12 @@ impl State {
let name: Option<Arc<str>> = match field_name {
proto::field::Name::StrName(n) => Some(n.clone().into()),
proto::field::Name::NameIdx(idx) => {
let meta_id = f.metadata_id.as_ref()?;
metas
.get(&meta_id.id)
.and_then(|meta| meta.field_names.get(*idx as usize))
.cloned()
debug_assert_eq!(
f.metadata_id.map(|m| m.id),
Some(meta_id),
"malformed field name: metadata ID mismatch!"
);
meta.field_names.get(*idx as usize).cloned()
}
};
let value = f.value.as_ref().expect("no value").clone().into();
Expand All @@ -180,6 +197,7 @@ impl State {
kind,
stats,
completed_for: 0,
target: meta.target.clone(),
};
task.update();
let task = Rc::new(RefCell::new(task));
Expand Down Expand Up @@ -246,6 +264,10 @@ impl Task {
&self.id_hex
}

pub(crate) fn target(&self) -> &str {
&self.target
}

pub(crate) fn formatted_fields(&self) -> &[Vec<Span<'static>>] {
&self.formatted_fields
}
Expand Down Expand Up @@ -373,6 +395,7 @@ impl From<proto::Metadata> for Metadata {
fn from(pb: proto::Metadata) -> Self {
Self {
field_names: pb.field_names.into_iter().map(|n| n.into()).collect(),
target: pb.target.into(),
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions console/src/view/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl TaskView {
.constraints(
[
layout::Constraint::Length(1),
layout::Constraint::Length(6),
layout::Constraint::Length(7),
layout::Constraint::Length(9),
layout::Constraint::Percentage(60),
]
Expand Down Expand Up @@ -102,6 +102,7 @@ impl TaskView {
]);

let attrs = Spans::from(vec![bold("ID: "), Span::raw(task.id_hex())]);
let target = Spans::from(vec![bold("Target: "), Span::raw(task.target())]);

let mut total = vec![
bold("Total Time: "),
Expand All @@ -124,7 +125,7 @@ impl TaskView {
Span::from(format!("{:.prec$?}", task.idle(now), prec = DUR_PRECISION,)),
]);

let metrics = vec![attrs, total, busy, idle];
let metrics = vec![attrs, target, total, busy, idle];

let wakers = Spans::from(vec![
bold("Current wakers: "),
Expand Down
8 changes: 6 additions & 2 deletions console/src/view/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ pub(crate) struct List {
}

impl List {
const HEADER: &'static [&'static str] =
&["TID", "KIND", "TOTAL", "BUSY", "IDLE", "POLLS", "FIELDS"];
const HEADER: &'static [&'static str] = &[
"TID", "KIND", "TOTAL", "BUSY", "IDLE", "POLLS", "TARGET", "FIELDS",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the target useful when the fields are showing spawn.location?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, spawn.location indicates which line in the source code the task was spawned from, while the target tells us which library the span was generated in. For example, when using Tokio, the target will be tokio::task, while the spawn location field will be the line in the user code where tokio::task::spawn was called. This lets us easily distinguish between task spans provided by different crates with runtime instrumentation --- in the screenshot I posted, we can use the target to tell which entries in the task list correspond to Tokio tasks and which ones correspond to Rayon tasks, without memorizing which source locations in the application are calls to tokio::spawn and which are calls to rayon::spawn. The target field also allows us to sort (and, eventually, filter) the task list by which runtime the task spans came from, so that (for example) we can see all the Rayon tasks followed by all the Tokio tasks.

Also, we don't currently require any particular fields from the runtime providing the task spans...so just because Tokio currently provides a spawn.location field, other runtimes may not.

];

pub(crate) fn update_input(&mut self, event: input::Event) {
// Clippy likes to remind us that we could use an `if let` here, since
Expand Down Expand Up @@ -83,6 +84,7 @@ impl List {
// there's room for the unit!)
const DUR_PRECISION: usize = 4;
const POLLS_LEN: usize = 5;
const MIN_TARGET_LEN: usize = 15;

self.sorted_tasks.extend(state.take_new_tasks());
self.sort_by.sort(now, &mut self.sorted_tasks);
Expand Down Expand Up @@ -115,6 +117,7 @@ impl List {
prec = DUR_PRECISION,
)),
Cell::from(format!("{:>width$}", task.total_polls(), width = POLLS_LEN)),
Cell::from(task.target().to_owned()),
Cell::from(Spans::from(
task.formatted_fields()
.iter()
Expand Down Expand Up @@ -169,6 +172,7 @@ impl List {
layout::Constraint::Min(DUR_LEN as u16),
layout::Constraint::Min(DUR_LEN as u16),
layout::Constraint::Min(POLLS_LEN as u16),
layout::Constraint::Min(MIN_TARGET_LEN as u16),
layout::Constraint::Min(10),
])
.highlight_symbol(">> ")
Expand Down