Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add group name in task metrics #10196

Merged
merged 18 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 45 additions & 19 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl SpawnTaskHandle {
/// In other words, it would be a bad idea for someone to do for example
/// `spawn(format!("{:?}", some_public_key))`.
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.spawn_inner(name, task, TaskType::Async)
// No subsystem is passed here, so `spawn_inner` falls back to its default subsystem label.
self.spawn_inner(name, None, task, TaskType::Async)
}

/// Spawns the blocking task with the given name. See also `spawn`.
Expand All @@ -66,38 +66,46 @@ impl SpawnTaskHandle {
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawn_inner(name, task, TaskType::Blocking)
self.spawn_inner(name, None, task, TaskType::Blocking)
}

/// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`.
fn spawn_inner(
&self,
name: &'static str,
subsystem: Option<&'static str>,
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
if self.task_notifier.is_closed() {
debug!("Attempt to spawn a new task has been prevented: {}", name);
return
return;
}

let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
// Provide a default subsystem name.
let subsystem_name = subsystem.unwrap_or("substrate-unspecified");
sandreim marked this conversation as resolved.
Show resolved Hide resolved

// Note that we increase the started counter here and not within the future. This way,
// we could properly visualize on Prometheus situations where the spawning doesn't work.
if let Some(metrics) = &self.metrics {
metrics.tasks_spawned.with_label_values(&[name]).inc();
metrics.tasks_spawned.with_label_values(&[name, subsystem_name]).inc();
// We do a dummy increase in order for the task to show up in metrics.
metrics.tasks_ended.with_label_values(&[name, "finished"]).inc_by(0);
metrics
.tasks_ended
.with_label_values(&[name, "finished", subsystem_name])
.inc_by(0);
}

let future = async move {
if let Some(metrics) = metrics {
// Add some wrappers around `task`.
let task = {
let poll_duration = metrics.poll_duration.with_label_values(&[name]);
let poll_start = metrics.poll_start.with_label_values(&[name]);
let poll_duration =
metrics.poll_duration.with_label_values(&[name, subsystem_name]);
let poll_start =
metrics.poll_start.with_label_values(&[name, subsystem_name]);
let inner =
prometheus_future::with_poll_durations(poll_duration, poll_start, task);
// The logic of `AssertUnwindSafe` here is ok considering that we throw
Expand All @@ -108,16 +116,25 @@ impl SpawnTaskHandle {

match select(on_exit, task).await {
Either::Right((Err(payload), _)) => {
metrics.tasks_ended.with_label_values(&[name, "panic"]).inc();
metrics
.tasks_ended
.with_label_values(&[name, "panic", subsystem_name])
.inc();
panic::resume_unwind(payload)
},
}
Either::Right((Ok(()), _)) => {
metrics.tasks_ended.with_label_values(&[name, "finished"]).inc();
},
metrics
.tasks_ended
.with_label_values(&[name, "finished", subsystem_name])
.inc();
}
Either::Left(((), _)) => {
// The `on_exit` has triggered.
metrics.tasks_ended.with_label_values(&[name, "interrupted"]).inc();
},
metrics
.tasks_ended
.with_label_values(&[name, "interrupted", subsystem_name])
.inc();
}
}
} else {
futures::pin_mut!(task);
Expand All @@ -133,7 +150,7 @@ impl SpawnTaskHandle {
self.tokio_handle.spawn_blocking(move || {
handle.block_on(future);
})
},
}
};

let _ = self.task_notifier.unbounded_send(join_handle);
Expand All @@ -148,6 +165,15 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle {
fn spawn(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn(name, future);
}

/// Spawns `future` as a blocking task identified by `name` and `subsystem`.
///
/// Unlike `spawn_blocking`, the subsystem is forwarded explicitly to
/// `spawn_inner` instead of leaving it unset.
fn spawn_blocking_with_subsystem(
    &self,
    name: &'static str,
    subsystem: &'static str,
    future: BoxFuture<'static, ()>,
) {
    // Wrap the caller-provided subsystem so `spawn_inner` does not use its default.
    let group = Some(subsystem);
    self.spawn_inner(name, group, future, TaskType::Blocking)
}

/// Spawns `future` as an async task identified by `name` and `subsystem`.
///
/// Unlike `spawn`, the subsystem is forwarded explicitly to `spawn_inner`
/// instead of leaving it unset.
fn spawn_with_subsystem(
    &self,
    name: &'static str,
    subsystem: &'static str,
    future: BoxFuture<'static, ()>,
) {
    // Wrap the caller-provided subsystem so `spawn_inner` does not use its default.
    let group = Some(subsystem);
    self.spawn_inner(name, group, future, TaskType::Async)
}

}

/// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any
Expand Down Expand Up @@ -199,7 +225,7 @@ impl SpawnEssentialTaskHandle {
let _ = essential_failed.close_channel();
});

let _ = self.inner.spawn_inner(name, essential_task, task_type);
let _ = self.inner.spawn_inner(name, None, essential_task, task_type);
}
}

Expand Down Expand Up @@ -396,28 +422,28 @@ impl Metrics {
buckets: exponential_buckets(0.001, 4.0, 9)
.expect("function parameters are constant and always valid; qed"),
},
&["task_name"]
&["task_name", "subsystem"]
)?, registry)?,
poll_start: register(CounterVec::new(
Opts::new(
"tasks_polling_started_total",
"Total number of times we started invoking Future::poll"
),
&["task_name"]
&["task_name", "subsystem"]
)?, registry)?,
tasks_spawned: register(CounterVec::new(
Opts::new(
"tasks_spawned_total",
"Total number of tasks that have been spawned on the Service"
),
&["task_name"]
&["task_name", "subsystem"]
)?, registry)?,
tasks_ended: register(CounterVec::new(
Opts::new(
"tasks_ended_total",
"Total number of tasks for which Future::poll has returned Ready(()) or panicked"
),
&["task_name", "reason"]
&["task_name", "reason", "subsystem"]
)?, registry)?,
})
}
Expand Down
43 changes: 42 additions & 1 deletion primitives/core/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ sp_externalities::decl_extension! {
pub struct RuntimeSpawnExt(Box<dyn RuntimeSpawn>);
}

/// Something that can spawn tasks (blocking and non-blocking) with an assigned name.
/// Something that can spawn tasks (blocking and non-blocking) with an assigned name
/// and subsystem.
#[dyn_clonable::clonable]
pub trait SpawnNamed: Clone + Send + Sync {
/// Spawn the given blocking future.
Expand All @@ -201,6 +202,30 @@ pub trait SpawnNamed: Clone + Send + Sync {
///
/// The given `name` is used to identify the future in tracing.
fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
/// Spawn the given blocking future.
///
/// The given `subsystem` and `name` is used to identify the future in tracing.
fn spawn_blocking_with_subsystem(
&self,
name: &'static str,
_subsystem: &'static str,
future: futures::future::BoxFuture<'static, ()>,
) {
// Default impl doesn't trace subsystem.
self.spawn_blocking(name, future);
}
sandreim marked this conversation as resolved.
Show resolved Hide resolved
/// Spawn the given non-blocking future.
///
/// The given `subsystem` and `name` is used to identify the future in tracing.
fn spawn_with_subsystem(
&self,
name: &'static str,
_subsystem: &'static str,
future: futures::future::BoxFuture<'static, ()>,
) {
// Default impl doesn't trace subsystem.
self.spawn(name, future);
}
sandreim marked this conversation as resolved.
Show resolved Hide resolved
}

impl SpawnNamed for Box<dyn SpawnNamed> {
Expand All @@ -211,6 +236,22 @@ impl SpawnNamed for Box<dyn SpawnNamed> {
fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn(name, future)
}
fn spawn_blocking_with_subsystem(
&self,
name: &'static str,
subsystem: &'static str,
future: futures::future::BoxFuture<'static, ()>,
) {
(**self).spawn_blocking_with_subsystem(name, subsystem, future)
}
fn spawn_with_subsystem(
&self,
name: &'static str,
subsystem: &'static str,
future: futures::future::BoxFuture<'static, ()>,
) {
(**self).spawn_with_subsystem(name, subsystem, future)
}
}

/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name.
Expand Down