From 2b2c790b64beb59a885ce785ab01d5c1bd089c43 Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Tue, 25 Jun 2024 12:20:16 +0400
Subject: [PATCH] feat(node_framework): Support shutdown hooks + more (#2293)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

- Adds support for shutdown hooks. These are executed sequentially after all the tasks have either completed or been dropped.
  - Note: I didn't spend too much time designing this and went with a "good enough" approach, as there are a ton of other things to do right now. We can revisit the design later, unless there are critical issues here.
  - One known caveat is that the hooks are not very reusable: two tasks can add the same hook, and it will be executed twice. Not sure if it's an issue, given that the second execution would be a no-op.
- Moves waiting for RocksDB termination from the state keeper and metadata calculator tasks to shutdown hooks.
- Increases the number of logs we emit.
- Adds task names to many service logs where they were missing.
- Collects all the errors that occurred in the framework.
- Improves handling of stop signals in the queued job processor.

## Why ❔

- Performing the shutdown routine in the task itself is deadlock-prone.
- The added logs would have helped with identifying the issues we've already encountered.

## Checklist

- [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
---
 Cargo.lock                                    |   1 +
 core/lib/queued_job_processor/src/lib.rs      |  24 +++-
 core/node/node_framework/Cargo.toml           |   1 +
 .../layers/metadata_calculator.rs             |  25 ++--
 .../src/implementations/layers/sigint.rs      |   4 +-
 .../layers/state_keeper/mod.rs                |  16 +--
 .../implementations/layers/web3_api/server.rs |   5 +-
 core/node/node_framework/src/precondition.rs  |  10 +-
 .../node_framework/src/service/context.rs     |  28 +++-
 core/node/node_framework/src/service/error.rs |  20 ++-
 core/node/node_framework/src/service/mod.rs   | 122 +++++++++++++-----
 .../src/service/named_future.rs               |  52 ++++++++
 .../node_framework/src/service/runnables.rs   |  90 +++++--------
 core/node/node_framework/src/task.rs          |  32 ++++-
 14 files changed, 308 insertions(+), 122 deletions(-)
 create mode 100644 core/node/node_framework/src/service/named_future.rs

diff --git a/Cargo.lock b/Cargo.lock
index d2e139bb48ed..f21b4c393d0c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8691,6 +8691,7 @@ dependencies = [
  "async-trait",
  "ctrlc",
  "futures 0.3.28",
+ "pin-project-lite",
  "thiserror",
  "tokio",
  "tracing",
diff --git a/core/lib/queued_job_processor/src/lib.rs b/core/lib/queued_job_processor/src/lib.rs
index 569a2b7f59da..a5a4fa39fcae 100644
--- a/core/lib/queued_job_processor/src/lib.rs
+++ b/core/lib/queued_job_processor/src/lib.rs
@@ -5,7 +5,7 @@ use std::{
 use anyhow::Context as _;
 pub use async_trait::async_trait;
-use tokio::{sync::watch, task::JoinHandle, time::sleep};
+use tokio::{sync::watch, task::JoinHandle};
 use vise::{Buckets, Counter, Histogram, LabeledFamily, Metrics};
 use zksync_utils::panic_extractor::try_extract_panic_message;
@@ -57,7 +57,7 @@ pub trait JobProcessor: Sync + Send {
     /// To process a batch, pass `Some(batch_size)`.
     async fn run(
         self,
-        stop_receiver: watch::Receiver<bool>,
+        mut stop_receiver: watch::Receiver<bool>,
         mut iterations_left: Option<usize>,
     ) -> anyhow::Result<()>
     where
@@ -86,7 +86,7 @@ pub trait JobProcessor: Sync + Send {
                 );
                 let task = self.process_job(&job_id, job, started_at).await;
-                self.wait_for_task(job_id, started_at, task)
+                self.wait_for_task(job_id, started_at, task, &mut stop_receiver)
                     .await
                     .context("wait_for_task")?;
             } else if iterations_left.is_some() {
@@ -94,7 +94,10 @@ pub trait JobProcessor: Sync + Send {
                 return Ok(());
             } else {
                 tracing::trace!("Backing off for {} ms", backoff);
-                sleep(Duration::from_millis(backoff)).await;
+                // An error here corresponds to the timeout elapsing without `stop_receiver`
+                // having changed; we're OK with this.
+                tokio::time::timeout(Duration::from_millis(backoff), stop_receiver.changed())
+                    .await
+                    .ok();
                 backoff = (backoff * Self::BACKOFF_MULTIPLIER).min(Self::MAX_BACKOFF_MS);
             }
         }
@@ -108,6 +111,7 @@ pub trait JobProcessor: Sync + Send {
         job_id: Self::JobId,
         started_at: Instant,
         task: JoinHandle<anyhow::Result<Self::JobArtifacts>>,
+        stop_receiver: &mut watch::Receiver<bool>,
     ) -> anyhow::Result<()> {
         let attempts = self.get_job_attempts(&job_id).await?;
         let max_attempts = self.max_attempts();
@@ -130,7 +134,17 @@ pub trait JobProcessor: Sync + Send {
             if task.is_finished() {
                 break task.await;
             }
-            sleep(Duration::from_millis(Self::POLLING_INTERVAL_MS)).await;
+            if tokio::time::timeout(
+                Duration::from_millis(Self::POLLING_INTERVAL_MS),
+                stop_receiver.changed(),
+            )
+            .await
+            .is_ok()
+            {
+                // Stop signal received; return early.
+                // The exit will be processed/reported by the main loop.
+                return Ok(());
+            }
         };
         let error_message = match result {
             Ok(Ok(data)) => {
diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml
index 5bed78e4b601..f5b5d9c89165 100644
--- a/core/node/node_framework/Cargo.toml
+++ b/core/node/node_framework/Cargo.toml
@@ -47,6 +47,7 @@ zksync_reorg_detector.workspace = true
 zksync_vm_runner.workspace = true
 zksync_node_db_pruner.workspace = true
 
+pin-project-lite.workspace = true
 tracing.workspace = true
 thiserror.workspace = true
 async-trait.workspace = true
diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
index bc1244410bf2..9fe954c91e4f 100644
--- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
+++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
@@ -118,34 +118,27 @@ impl WiringLayer for MetadataCalculatorLayer {
             metadata_calculator.tree_reader(),
         )))?;
 
-        let metadata_calculator_task = Box::new(MetadataCalculatorTask {
-            metadata_calculator,
+        context.add_task(Box::new(metadata_calculator));
+
+        context.add_shutdown_hook("rocksdb_termination", async {
+            // Wait for all the instances of RocksDB to be destroyed.
+            tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
+                .await
+                .context("failed terminating RocksDB instances")
         });
-        context.add_task(metadata_calculator_task);
 
         Ok(())
     }
 }
 
-#[derive(Debug)]
-pub struct MetadataCalculatorTask {
-    metadata_calculator: MetadataCalculator,
-}
-
 #[async_trait::async_trait]
-impl Task for MetadataCalculatorTask {
+impl Task for MetadataCalculator {
     fn id(&self) -> TaskId {
         "metadata_calculator".into()
     }
 
     async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
-        let result = self.metadata_calculator.run(stop_receiver.0).await;
-
-        // Wait for all the instances of RocksDB to be destroyed.
-        tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
-            .await
-            .context("failed terminating RocksDB instances")?;
-        result
+        (*self).run(stop_receiver.0).await
     }
 }
diff --git a/core/node/node_framework/src/implementations/layers/sigint.rs b/core/node/node_framework/src/implementations/layers/sigint.rs
index 2d11f1525370..255305629c64 100644
--- a/core/node/node_framework/src/implementations/layers/sigint.rs
+++ b/core/node/node_framework/src/implementations/layers/sigint.rs
@@ -51,7 +51,9 @@ impl UnconstrainedTask for SigintHandlerTask {
         // Wait for either SIGINT or stop signal.
         tokio::select! {
-            _ = sigint_receiver => {},
+            _ = sigint_receiver => {
+                tracing::info!("Received SIGINT signal");
+            },
             _ = stop_receiver.0.changed() => {},
         };
diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
index 97364f6388cd..46e56eca0e65 100644
--- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
+++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
@@ -91,6 +91,13 @@ impl WiringLayer for StateKeeperLayer {
             sealer,
             storage_factory: Arc::new(storage_factory),
         }));
+
+        context.add_shutdown_hook("rocksdb_termination", async {
+            // Wait for all the instances of RocksDB to be destroyed.
+            tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
+                .await
+                .context("failed terminating RocksDB instances")
+        });
         Ok(())
     }
 }
@@ -119,14 +126,7 @@ impl Task for StateKeeperTask {
             self.sealer,
             self.storage_factory,
         );
-        let result = state_keeper.run().await;
-
-        // Wait for all the instances of RocksDB to be destroyed.
-        tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
-            .await
-            .unwrap();
-
-        result
+        state_keeper.run().await
     }
 }
diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server.rs b/core/node/node_framework/src/implementations/layers/web3_api/server.rs
index da0d9d3cc33a..428e5c88503d 100644
--- a/core/node/node_framework/src/implementations/layers/web3_api/server.rs
+++ b/core/node/node_framework/src/implementations/layers/web3_api/server.rs
@@ -250,7 +250,10 @@ impl Task for ApiTaskGarbageCollector {
         // We can ignore the stop signal here, since we're tied to the main API task through the channel:
         // it'll either get dropped if API cannot be built or will send something through the channel.
         // The tasks it sends are aware of the stop receiver themselves.
-        let tasks = self.task_receiver.await?;
+        let Ok(tasks) = self.task_receiver.await else {
+            // API cannot be built, so there are no tasks to wait for.
+            return Ok(());
+        };
         let _ = futures::future::join_all(tasks).await;
         Ok(())
     }
diff --git a/core/node/node_framework/src/precondition.rs b/core/node/node_framework/src/precondition.rs
index a612c5b90a8b..d81e0328bb62 100644
--- a/core/node/node_framework/src/precondition.rs
+++ b/core/node/node_framework/src/precondition.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{fmt, sync::Arc};
 
 use tokio::sync::Barrier;
 
@@ -31,3 +31,11 @@ impl dyn Precondition {
         }
     }
 }
+
+impl fmt::Debug for dyn Precondition {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Precondition")
+            .field("name", &self.id())
+            .finish()
+    }
+}
diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs
index 81d094630c32..9507c2287752 100644
--- a/core/node/node_framework/src/service/context.rs
+++ b/core/node/node_framework/src/service/context.rs
@@ -1,9 +1,11 @@
-use std::any::type_name;
+use std::{any::type_name, future::Future};
+
+use futures::FutureExt as _;
 
 use crate::{
     precondition::Precondition,
     resource::{Resource, ResourceId, StoredResource},
-    service::ZkStackService,
+    service::{named_future::NamedFuture, ZkStackService},
     task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
     wiring_layer::WiringError,
 };
@@ -95,6 +97,28 @@ impl<'a> ServiceContext<'a> {
         self
     }
 
+    /// Adds a future to be invoked after node shutdown.
+    /// May be used to perform cleanup tasks.
+    ///
+    /// The future is guaranteed to only be polled after all the node tasks are stopped or timed out.
+    /// All the futures will be awaited sequentially.
+    pub fn add_shutdown_hook(
+        &mut self,
+        name: &'static str,
+        hook: impl Future<Output = anyhow::Result<()>> + Send + 'static,
+    ) -> &mut Self {
+        tracing::info!(
+            "Layer {} has added a new shutdown hook: {}",
+            self.layer,
+            name
+        );
+        self.service
+            .runnables
+            .shutdown_hooks
+            .push(NamedFuture::new(hook.boxed(), name.into()));
+        self
+    }
+
     /// Attempts to retrieve the resource with the specified name.
     /// Internally the resources are stored as [`std::any::Any`], and this method does the downcasting
     /// on behalf of the caller.
diff --git a/core/node/node_framework/src/service/error.rs b/core/node/node_framework/src/service/error.rs
index 173745e74c75..9e95b437419b 100644
--- a/core/node/node_framework/src/service/error.rs
+++ b/core/node/node_framework/src/service/error.rs
@@ -1,4 +1,18 @@
-use crate::wiring_layer::WiringError;
+use crate::{task::TaskId, wiring_layer::WiringError};
+
+#[derive(Debug, thiserror::Error)]
+pub enum TaskError {
+    #[error("Task {0} failed: {1}")]
+    TaskFailed(TaskId, anyhow::Error),
+    #[error("Task {0} panicked: {1}")]
+    TaskPanicked(TaskId, String),
+    #[error("Shutdown for task {0} timed out")]
+    TaskShutdownTimedOut(TaskId),
+    #[error("Shutdown hook {0} failed: {1}")]
+    ShutdownHookFailed(TaskId, anyhow::Error),
+    #[error("Shutdown hook {0} timed out")]
+    ShutdownHookTimedOut(TaskId),
+}
 
 #[derive(Debug, thiserror::Error)]
 pub enum ZkStackServiceError {
@@ -8,6 +22,6 @@ pub enum ZkStackServiceError {
     NoTasks,
     #[error("One or more wiring layers failed to initialize: {0:?}")]
     Wiring(Vec<(String, WiringError)>),
-    #[error(transparent)]
-    Task(#[from] anyhow::Error),
+    #[error("One or more tasks failed: {0:?}")]
+    Task(Vec<TaskError>),
 }
diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index 4a504f393c3a..57035a048d86 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -1,7 +1,9 @@
 use std::{collections::HashMap, time::Duration};
 
 use anyhow::Context;
-use futures::{future::BoxFuture, FutureExt};
+use error::TaskError;
+use futures::FutureExt;
+use runnables::NamedBoxFuture;
 use tokio::{runtime::Runtime, sync::watch};
 use zksync_utils::panic_extractor::try_extract_panic_message;
 
@@ -10,11 +12,13 @@ pub use self::{context::ServiceContext, error::ZkStackServiceError, stop_receive
 use crate::{
     resource::{ResourceId, StoredResource},
     service::runnables::TaskReprs,
+    task::TaskId,
     wiring_layer::{WiringError, WiringLayer},
 };
 
 mod context;
 mod error;
+mod named_future;
 mod runnables;
 mod stop_receiver;
 #[cfg(test)]
@@ -138,6 +142,7 @@ impl ZkStackService {
         let TaskReprs {
             mut long_running_tasks,
             oneshot_tasks,
+            shutdown_hooks,
         } = self
             .runnables
             .prepare_tasks(task_barrier.clone(), stop_receiver.clone());
@@ -159,67 +164,124 @@ impl ZkStackService {
         let rt_handle = self.runtime.handle().clone();
         let join_handles: Vec<_> = long_running_tasks
             .into_iter()
-            .map(|task| rt_handle.spawn(task).fuse())
+            .map(|task| {
+                let name = task.id();
+                NamedBoxFuture::new(rt_handle.spawn(task.into_inner()).fuse().boxed(), name)
+            })
             .collect();
 
+        // Collect names for the remaining tasks for reporting purposes.
+        let mut tasks_names: Vec<_> = join_handles.iter().map(|task| task.id()).collect();
+
         // Run the tasks until one of them exits.
-        let (resolved, _, remaining) = self
+        let (resolved, resolved_idx, remaining) = self
             .runtime
             .block_on(futures::future::select_all(join_handles));
-        let result = match resolved {
-            Ok(Ok(())) => Ok(()),
-            Ok(Err(err)) => Err(err).context("Task failed"),
-            Err(panic_err) => {
-                let panic_msg = try_extract_panic_message(panic_err);
-                Err(anyhow::format_err!(
-                    "One of the tasks panicked: {panic_msg}"
-                ))
-            }
-        };
+        // Extract the result and report it to the logs early, before waiting for any other task
+        // to shut down.
+        // We will also collect the errors from the remaining tasks, hence a vector.
+        let mut errors = Vec::new();
+        let task_name = tasks_names.swap_remove(resolved_idx);
+        handle_task_exit(resolved, task_name, &mut errors);
+        tracing::info!("One of the tasks has exited, shutting down the node");
+
+        // Collect names for the remaining tasks for reporting purposes.
+        // We have to re-collect, because `select_all` does not guarantee the order of the returned remaining futures.
+        let remaining_tasks_names: Vec<_> = remaining.iter().map(|task| task.id()).collect();
         let remaining_tasks_with_timeout: Vec<_> = remaining
             .into_iter()
             .map(|task| async { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, task).await })
             .collect();
 
         // Send stop signal to remaining tasks and wait for them to finish.
-        // Given that we are shutting down, we do not really care about returned values.
         self.stop_sender.send(true).ok();
         let execution_results = self
             .runtime
             .block_on(futures::future::join_all(remaining_tasks_with_timeout));
-        let execution_timeouts_count = execution_results.iter().filter(|&r| r.is_err()).count();
-        if execution_timeouts_count > 0 {
-            tracing::warn!(
-                "{execution_timeouts_count} tasks didn't finish in {TASK_SHUTDOWN_TIMEOUT:?} and were dropped"
-            );
-        } else {
-            tracing::info!("Remaining tasks finished without reaching timeouts");
+
+        // Report the results of the remaining tasks.
+        for (name, result) in remaining_tasks_names.into_iter().zip(execution_results) {
+            match result {
+                Ok(resolved) => {
+                    handle_task_exit(resolved, name, &mut errors);
+                }
+                Err(_) => {
+                    tracing::error!("Task {name} timed out");
+                    errors.push(TaskError::TaskShutdownTimedOut(name));
+                }
+            }
+        }
+
+        // Run shutdown hooks sequentially.
+        for hook in shutdown_hooks {
+            let name = hook.id().clone();
+            // Limit each shutdown hook to the same timeout as the tasks.
+            let hook_with_timeout =
+                async move { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook).await };
+            match self.runtime.block_on(hook_with_timeout) {
+                Ok(Ok(())) => {
+                    tracing::info!("Shutdown hook {name} completed");
+                }
+                Ok(Err(err)) => {
+                    tracing::error!("Shutdown hook {name} failed: {err}");
+                    errors.push(TaskError::ShutdownHookFailed(name, err));
+                }
+                Err(_) => {
+                    tracing::error!("Shutdown hook {name} timed out");
+                    errors.push(TaskError::ShutdownHookTimedOut(name));
+                }
+            }
         }
 
         tracing::info!("Exiting the service");
-        result?;
-        Ok(())
+        if errors.is_empty() {
+            Ok(())
+        } else {
+            Err(ZkStackServiceError::Task(errors))
+        }
     }
 }
 
+fn handle_task_exit(
+    task_result: Result<anyhow::Result<()>, tokio::task::JoinError>,
+    task_name: TaskId,
+    errors: &mut Vec<TaskError>,
+) {
+    match task_result {
+        Ok(Ok(())) => {
+            tracing::info!("Task {task_name} finished");
+        }
+        Ok(Err(err)) => {
+            tracing::error!("Task {task_name} failed: {err}");
+            errors.push(TaskError::TaskFailed(task_name, err));
+        }
+        Err(panic_err) => {
+            let panic_msg = try_extract_panic_message(panic_err);
+            tracing::error!("Task {task_name} panicked: {panic_msg}");
+            errors.push(TaskError::TaskPanicked(task_name, panic_msg));
+        }
+    };
+}
+
 fn oneshot_runner_task(
-    oneshot_tasks: Vec<BoxFuture<'static, anyhow::Result<()>>>,
+    oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
     mut stop_receiver: StopReceiver,
     only_oneshot_tasks: bool,
-) -> BoxFuture<'static, anyhow::Result<()>> {
-    Box::pin(async move {
+) -> NamedBoxFuture<anyhow::Result<()>> {
+    let future = async move {
         let oneshot_tasks = oneshot_tasks.into_iter().map(|fut| async move {
             // Spawn each oneshot task as a separate tokio task.
             // This way we can handle the cases when such a task panics and propagate the message
             // to the service.
             let handle = tokio::runtime::Handle::current();
+            let name = fut.id().to_string();
             match handle.spawn(fut).await {
                 Ok(Ok(())) => Ok(()),
-                Ok(Err(err)) => Err(err),
+                Ok(Err(err)) => Err(err).with_context(|| format!("Oneshot task {name} failed")),
                 Err(panic_err) => {
                     let panic_msg = try_extract_panic_message(panic_err);
-                    Err(anyhow::format_err!("Oneshot task panicked: {panic_msg}"))
+                    Err(anyhow::format_err!(
+                        "Oneshot task {name} panicked: {panic_msg}"
+                    ))
                 }
             }
         });
@@ -240,5 +302,7 @@ fn oneshot_runner_task(
         // Note that we don't have to `select` on the stop signal explicitly:
         // Each prerequisite is given a stop signal, and if everyone respects it, this future
         // will still resolve once the stop signal is received.
-    })
+    };
+
+    NamedBoxFuture::new(future.boxed(), "oneshot_runner".into())
 }
diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs
new file mode 100644
index 000000000000..9aa715b0a74b
--- /dev/null
+++ b/core/node/node_framework/src/service/named_future.rs
@@ -0,0 +1,52 @@
+use std::{fmt, future::Future, pin::Pin, task};
+
+use pin_project_lite::pin_project;
+
+use crate::task::TaskId;
+
+pin_project! {
+    /// Implements a future with the name tag attached.
+    pub struct NamedFuture<F> {
+        #[pin]
+        inner: F,
+        name: TaskId,
+    }
+}
+
+impl<F> NamedFuture<F>
+where
+    F: Future,
+{
+    /// Creates a new future with the name tag attached.
+    pub fn new(inner: F, name: TaskId) -> Self {
+        Self { inner, name }
+    }
+
+    pub fn id(&self) -> TaskId {
+        self.name.clone()
+    }
+
+    pub fn into_inner(self) -> F {
+        self.inner
+    }
+}
+
+impl<F> Future for NamedFuture<F>
+where
+    F: Future,
+{
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+        tracing::info_span!("NamedFuture", name = %self.name)
+            .in_scope(|| self.project().inner.poll(cx))
+    }
+}
+
+impl<F> fmt::Debug for NamedFuture<F> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("NamedFuture")
+            .field("name", &self.name)
+            .finish_non_exhaustive()
+    }
+}
diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs
index 7f35e384d6cc..8d240a8cffab 100644
--- a/core/node/node_framework/src/service/runnables.rs
+++ b/core/node/node_framework/src/service/runnables.rs
@@ -1,15 +1,17 @@
 use std::{fmt, sync::Arc};
 
-use anyhow::Context as _;
 use futures::future::BoxFuture;
 use tokio::sync::Barrier;
 
-use super::StopReceiver;
+use super::{named_future::NamedFuture, StopReceiver};
 use crate::{
     precondition::Precondition,
     task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
 };
 
+/// Alias for futures with the name assigned.
+pub type NamedBoxFuture<T> = NamedFuture<BoxFuture<'static, T>>;
+
 /// A collection of different flavors of tasks.
 #[derive(Default)]
 pub(super) struct Runnables {
@@ -23,35 +25,31 @@ pub(super) struct Runnables {
     pub(super) unconstrained_tasks: Vec<Box<dyn UnconstrainedTask>>,
     /// Unconstrained oneshot tasks added to the service.
     pub(super) unconstrained_oneshot_tasks: Vec<Box<dyn UnconstrainedOneshotTask>>,
+    /// List of hooks to be invoked after node shutdown.
+    pub(super) shutdown_hooks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
 }
 
 impl fmt::Debug for Runnables {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // Macro that iterates over a `Vec`, invokes `.id()` method and collects the results into a `Vec`.
-        // Returns a reference to created `Vec` to satisfy the `.field` method signature.
-        macro_rules! ids {
ids { - ($vec:expr) => { - &$vec.iter().map(|x| x.id()).collect::>() - }; - } - f.debug_struct("Runnables") - .field("preconditions", ids!(self.preconditions)) - .field("tasks", ids!(self.tasks)) - .field("oneshot_tasks", ids!(self.oneshot_tasks)) - .field("unconstrained_tasks", ids!(self.unconstrained_tasks)) + .field("preconditions", &self.preconditions) + .field("tasks", &self.tasks) + .field("oneshot_tasks", &self.oneshot_tasks) + .field("unconstrained_tasks", &self.unconstrained_tasks) .field( "unconstrained_oneshot_tasks", - ids!(self.unconstrained_oneshot_tasks), + &self.unconstrained_oneshot_tasks, ) + .field("shutdown_hooks", &self.shutdown_hooks) .finish() } } /// A unified representation of tasks that can be run by the service. pub(super) struct TaskReprs { - pub(super) long_running_tasks: Vec>>, - pub(super) oneshot_tasks: Vec>>, + pub(super) long_running_tasks: Vec>>, + pub(super) oneshot_tasks: Vec>>, + pub(super) shutdown_hooks: Vec>>, } impl fmt::Debug for TaskReprs { @@ -59,6 +57,7 @@ impl fmt::Debug for TaskReprs { f.debug_struct("TaskReprs") .field("long_running_tasks", &self.long_running_tasks.len()) .field("oneshot_tasks", &self.oneshot_tasks.len()) + .field("shutdown_hooks", &self.shutdown_hooks.len()) .finish() } } @@ -118,29 +117,26 @@ impl Runnables { TaskReprs { long_running_tasks, oneshot_tasks, + shutdown_hooks: self.shutdown_hooks, } } fn collect_unconstrained_tasks( &mut self, - tasks: &mut Vec>>, + tasks: &mut Vec>>, stop_receiver: StopReceiver, ) { for task in std::mem::take(&mut self.unconstrained_tasks) { let name = task.id(); let stop_receiver = stop_receiver.clone(); - let task_future = Box::pin(async move { - task.run_unconstrained(stop_receiver) - .await - .with_context(|| format!("Task {name} failed")) - }); - tasks.push(task_future); + let task_future = Box::pin(task.run_unconstrained(stop_receiver)); + tasks.push(NamedFuture::new(task_future, name)); } } fn collect_tasks( &mut self, - tasks: &mut Vec>>, + tasks: &mut Vec>>, task_barrier: Arc, stop_receiver: StopReceiver, ) { @@ -148,18 +144,14 @@ impl Runnables { let name = task.id(); let stop_receiver = stop_receiver.clone(); let task_barrier = task_barrier.clone(); - let task_future = Box::pin(async move { - task.run_with_barrier(stop_receiver, task_barrier) - .await - .with_context(|| format!("Task {name} failed")) - }); - tasks.push(task_future); + let task_future = Box::pin(task.run_with_barrier(stop_receiver, task_barrier)); + tasks.push(NamedFuture::new(task_future, name)); } } fn collect_preconditions( &mut self, - oneshot_tasks: &mut Vec>>, + oneshot_tasks: &mut Vec>>, task_barrier: Arc, stop_receiver: StopReceiver, ) { @@ -167,19 +159,15 @@ impl Runnables { let name = precondition.id(); let stop_receiver = stop_receiver.clone(); let task_barrier = task_barrier.clone(); - let task_future = Box::pin(async move { - precondition - .check_with_barrier(stop_receiver, task_barrier) - .await - .with_context(|| format!("Precondition {name} failed")) - }); - oneshot_tasks.push(task_future); + let task_future = + Box::pin(precondition.check_with_barrier(stop_receiver, task_barrier)); + oneshot_tasks.push(NamedFuture::new(task_future, name)); } } fn collect_oneshot_tasks( &mut self, - oneshot_tasks: &mut Vec>>, + oneshot_tasks: &mut Vec>>, task_barrier: Arc, stop_receiver: StopReceiver, ) { @@ -187,31 +175,23 @@ impl Runnables { let name = oneshot_task.id(); let stop_receiver = stop_receiver.clone(); let task_barrier = task_barrier.clone(); - let task_future = Box::pin(async move { - oneshot_task - 
-                    .run_oneshot_with_barrier(stop_receiver, task_barrier)
-                    .await
-                    .with_context(|| format!("Oneshot task {name} failed"))
-            });
-            oneshot_tasks.push(task_future);
+            let task_future =
+                Box::pin(oneshot_task.run_oneshot_with_barrier(stop_receiver, task_barrier));
+            oneshot_tasks.push(NamedFuture::new(task_future, name));
         }
     }
 
     fn collect_unconstrained_oneshot_tasks(
         &mut self,
-        oneshot_tasks: &mut Vec<BoxFuture<'static, anyhow::Result<()>>>,
+        oneshot_tasks: &mut Vec<NamedBoxFuture<anyhow::Result<()>>>,
         stop_receiver: StopReceiver,
     ) {
         for unconstrained_oneshot_task in std::mem::take(&mut self.unconstrained_oneshot_tasks) {
             let name = unconstrained_oneshot_task.id();
             let stop_receiver = stop_receiver.clone();
-            let task_future = Box::pin(async move {
-                unconstrained_oneshot_task
-                    .run_unconstrained_oneshot(stop_receiver)
-                    .await
-                    .with_context(|| format!("Unconstrained oneshot task {name} failed"))
-            });
-            oneshot_tasks.push(task_future);
+            let task_future =
+                Box::pin(unconstrained_oneshot_task.run_unconstrained_oneshot(stop_receiver));
+            oneshot_tasks.push(NamedFuture::new(task_future, name));
         }
     }
 }
diff --git a/core/node/node_framework/src/task.rs b/core/node/node_framework/src/task.rs
index 8ff73d75d8fa..8bb7bbd2c702 100644
--- a/core/node/node_framework/src/task.rs
+++ b/core/node/node_framework/src/task.rs
@@ -29,7 +29,7 @@
 //! - A task that may be a driving force for some precondition to be met.
 
 use std::{
-    fmt::{Display, Formatter},
+    fmt::{self, Display, Formatter},
     ops::Deref,
     sync::Arc,
 };
@@ -117,6 +117,12 @@ impl dyn Task {
     }
 }
 
+impl fmt::Debug for dyn Task {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Task").field("name", &self.id()).finish()
+    }
+}
+
 /// A oneshot task implementation.
 /// The difference from [`Task`] is that this kind of task may exit without causing the service to shutdown.
 ///
@@ -160,6 +166,14 @@ impl dyn OneshotTask {
     }
 }
 
+impl fmt::Debug for dyn OneshotTask {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("OneshotTask")
+            .field("name", &self.id())
+            .finish()
+    }
+}
+
 /// A task implementation that is not constrained by preconditions.
 ///
 /// This trait is used to define tasks that should start immediately after the wiring phase, without waiting for
@@ -176,6 +190,14 @@ pub trait UnconstrainedTask: 'static + Send {
     async fn run_unconstrained(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()>;
 }
 
+impl fmt::Debug for dyn UnconstrainedTask {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("UnconstrainedTask")
+            .field("name", &self.id())
+            .finish()
+    }
+}
+
 /// An unconstrained analog of [`OneshotTask`].
 /// See [`UnconstrainedTask`] and [`OneshotTask`] for more details.
 #[async_trait::async_trait]
@@ -189,3 +211,11 @@ pub trait UnconstrainedOneshotTask: 'static + Send {
         stop_receiver: StopReceiver,
     ) -> anyhow::Result<()>;
 }
+
+impl fmt::Debug for dyn UnconstrainedOneshotTask {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("UnconstrainedOneshotTask")
+            .field("name", &self.id())
+            .finish()
+    }
+}
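
## Usage sketch: registering a shutdown hook

A minimal sketch of how a wiring layer might use the new `ServiceContext::add_shutdown_hook` API from this PR. `MyLayer`, `MyTask`, and the hook body are hypothetical; only `add_shutdown_hook` itself (and the guarantee that hooks run after all tasks have completed, timed out, or been dropped) comes from this patch, and import paths are assumed to match the framework layout above.

```rust
use zksync_node_framework::{
    service::{ServiceContext, StopReceiver},
    task::{Task, TaskId},
    wiring_layer::{WiringError, WiringLayer},
};

// Hypothetical layer used purely for illustration.
struct MyLayer;

#[async_trait::async_trait]
impl WiringLayer for MyLayer {
    fn layer_name(&self) -> &'static str {
        "my_layer"
    }

    async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
        context.add_task(Box::new(MyTask));
        // The hook is polled only after every task has finished or been dropped,
        // so cleanup performed here cannot deadlock with a still-running task.
        context.add_shutdown_hook("my_cleanup", async {
            // Release external resources here; a returned error surfaces as a
            // `ShutdownHookFailed` entry in `ZkStackServiceError::Task`.
            Ok(())
        });
        Ok(())
    }
}

// Hypothetical long-running task that simply waits for the stop signal.
struct MyTask;

#[async_trait::async_trait]
impl Task for MyTask {
    fn id(&self) -> TaskId {
        "my_task".into()
    }

    async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
        stop_receiver.0.changed().await?;
        Ok(())
    }
}
```

Note the caveat from the description: hooks are keyed by nothing, so two layers adding the same hook will run it twice; the second run is expected to be a no-op.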
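## Usage sketch: stop-aware backoff

The `queued_job_processor` change replaces an unconditional `sleep` with a `tokio::time::timeout` over `stop_receiver.changed()`, so a backoff or polling wait is interrupted as soon as a stop signal arrives instead of running to completion. A self-contained sketch of the same pattern (requires only `tokio` with the `full` feature set; all names here are local to the example):

```rust
use std::time::Duration;

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (stop_sender, mut stop_receiver) = watch::channel(false);

    // Simulate a stop signal arriving mid-backoff.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        stop_sender.send(true).ok();
    });

    let backoff = Duration::from_secs(60);
    // `Err(_)` means the timeout elapsed without the stop signal changing (keep going);
    // `Ok(_)` means the receiver observed a change (or the sender was dropped) and we
    // should exit early rather than sleep out the full backoff.
    match tokio::time::timeout(backoff, stop_receiver.changed()).await {
        Ok(_) => println!("stop signal received, exiting early"),
        Err(_) => println!("backoff elapsed, continuing"),
    }
}
```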
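## Usage sketch: the `NamedFuture` pattern

The new `named_future.rs` attaches a name to a future so the service can report which task resolved, failed, or timed out. A dependency-free sketch of the same idea, with `String` standing in for `TaskId` and `println!` standing in for the `tracing` span used by the real implementation (deps: `pin-project-lite`, `tokio`):

```rust
use std::{future::Future, pin::Pin, task};

use pin_project_lite::pin_project;

pin_project! {
    // Same shape as the `NamedFuture` added by this patch.
    struct NamedFuture<F> {
        #[pin]
        inner: F,
        name: String,
    }
}

impl<F: Future> Future for NamedFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        // The real implementation enters a `tracing::info_span!` here instead.
        println!("polling future {}", this.name);
        this.inner.poll(cx)
    }
}

#[tokio::main]
async fn main() {
    let named = NamedFuture {
        inner: async { 42 },
        name: "the_answer".into(),
    };
    // The wrapper is transparent: awaiting it yields the inner future's output.
    println!("result: {}", named.await);
}
```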