From d4f7c1744ade45fd36cc93b4fdcb6e659f246134 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Fri, 21 Jun 2024 09:49:53 +0400 Subject: [PATCH 01/12] Support shutdown hooks in the node --- .../node_framework/src/service/context.rs | 12 +++++++++++ core/node/node_framework/src/service/mod.rs | 20 ++++++++++++++++++- .../node_framework/src/service/runnables.rs | 7 +++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs index 81d094630c32..9a1b51830cc3 100644 --- a/core/node/node_framework/src/service/context.rs +++ b/core/node/node_framework/src/service/context.rs @@ -8,6 +8,8 @@ use crate::{ wiring_layer::WiringError, }; +use super::runnables::ShutdownHook; + /// An interface to the service's resources provided to the tasks during initialization. /// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle. #[derive(Debug)] @@ -95,6 +97,16 @@ impl<'a> ServiceContext<'a> { self } + /// Adds a function to be invoked after node shutdown. + /// May be used to perform cleanup tasks. + /// + /// All the collected shutdown hooks will be invoked sequentially after all the node tasks are stopped. + pub fn add_shutdown_hook(&mut self, hook: ShutdownHook) -> &mut Self { + tracing::info!("Layer {} has added a new shutdown hook", self.layer); + self.service.runnables.shutdown_hooks.push(hook); + self + } + /// Attempts to retrieve the resource with the specified name. /// Internally the resources are stored as [`std::any::Any`], and this method does the downcasting /// on behalf of the caller. diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs index 4a504f393c3a..009ac5acbae9 100644 --- a/core/node/node_framework/src/service/mod.rs +++ b/core/node/node_framework/src/service/mod.rs @@ -138,6 +138,7 @@ impl ZkStackService { let TaskReprs { mut long_running_tasks, oneshot_tasks, + shutdown_hooks, } = self .runnables .prepare_tasks(task_barrier.clone(), stop_receiver.clone()); @@ -166,11 +167,16 @@ impl ZkStackService { let (resolved, _, remaining) = self .runtime .block_on(futures::future::select_all(join_handles)); + // Extract the result and report it to logs early, before waiting for any other task to shutdown. let result = match resolved { Ok(Ok(())) => Ok(()), - Ok(Err(err)) => Err(err).context("Task failed"), + Ok(Err(err)) => { + tracing::error!("Task failed: {err}"); + Err(err).context("Task failed") + } Err(panic_err) => { let panic_msg = try_extract_panic_message(panic_err); + tracing::error!("One of the tasks panicked: {panic_msg}"); Err(anyhow::format_err!( "One of the tasks panicked: {panic_msg}" )) @@ -197,6 +203,18 @@ impl ZkStackService { tracing::info!("Remaining tasks finished without reaching timeouts"); } + // Run shutdown hooks sequentially. + for hook in shutdown_hooks { + // Limit each shutdown hook to the same timeout as the tasks. 
+            let hook_with_timeout = tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook());
+            match self.runtime.block_on(hook_with_timeout) {
+                Ok(()) => {}
+                Err(_) => {
+                    tracing::error!("One of the shutdown hooks timed out");
+                }
+            }
+        }
+
         tracing::info!("Exiting the service");
         result?;
         Ok(())
diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs
index 7f35e384d6cc..f6a913fc3194 100644
--- a/core/node/node_framework/src/service/runnables.rs
+++ b/core/node/node_framework/src/service/runnables.rs
@@ -10,6 +10,8 @@ use crate::{
     task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
 };
 
+pub type ShutdownHook = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>;
+
 /// A collection of different flavors of tasks.
 #[derive(Default)]
 pub(super) struct Runnables {
@@ -23,6 +25,8 @@ pub(super) struct Runnables {
     pub(super) unconstrained_tasks: Vec<Box<dyn UnconstrainedTask>>,
     /// Unconstrained oneshot tasks added to the service.
     pub(super) unconstrained_oneshot_tasks: Vec<Box<dyn UnconstrainedOneshotTask>>,
+    /// List of hooks to be invoked after node shutdown.
+    pub(super) shutdown_hooks: Vec<ShutdownHook>,
 }
 
 impl fmt::Debug for Runnables {
@@ -52,6 +56,7 @@ impl fmt::Debug for Runnables {
 pub(super) struct TaskReprs {
     pub(super) long_running_tasks: Vec<BoxFuture<'static, anyhow::Result<()>>>,
     pub(super) oneshot_tasks: Vec<BoxFuture<'static, anyhow::Result<()>>>,
+    pub(super) shutdown_hooks: Vec<ShutdownHook>,
 }
 
 impl fmt::Debug for TaskReprs {
@@ -59,6 +64,7 @@ impl fmt::Debug for TaskReprs {
         f.debug_struct("TaskReprs")
             .field("long_running_tasks", &self.long_running_tasks.len())
             .field("oneshot_tasks", &self.oneshot_tasks.len())
+            .field("shutdown_hooks", &self.shutdown_hooks.len())
             .finish()
     }
 }
@@ -118,6 +124,7 @@ impl Runnables {
         TaskReprs {
             long_running_tasks,
             oneshot_tasks,
+            shutdown_hooks: self.shutdown_hooks,
        }
    }
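For context, a wiring layer would register a cleanup hook through this first version of the API roughly as shown below. This is an illustrative sketch, not part of the series; the cleanup body is hypothetical, and `.boxed()` comes from `futures::FutureExt`:

    use futures::FutureExt as _;

    // Inside some `WiringLayer::wire` implementation holding a `ServiceContext`:
    context.add_shutdown_hook(Box::new(|| {
        async {
            // Hypothetical cleanup work. Note that this version of the hook type
            // (`Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>`)
            // has no way to report errors yet.
            tracing::info!("cleanup finished");
        }
        .boxed()
    }));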
From b79524c0821e539e60a9495cdec3d5648a9681bc Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 10:09:34 +0400
Subject: [PATCH 02/12] Use shutdown hooks in metadata calculator and state
 keeper

---
 .../layers/metadata_calculator.rs             | 27 ++++----
 .../layers/state_keeper/mod.rs                | 18 +++---
 core/node/node_framework/src/precondition.rs  | 10 ++-
 .../node_framework/src/service/context.rs     | 13 +++-
 core/node/node_framework/src/service/mod.rs   | 11 +++-
 .../node_framework/src/service/runnables.rs   | 62 ++++++++++++++-----
 core/node/node_framework/src/task.rs          | 32 +++++++++-
 7 files changed, 127 insertions(+), 46 deletions(-)

diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
index bc1244410bf2..5b5decfe9061 100644
--- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
+++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
@@ -118,34 +118,29 @@ impl WiringLayer for MetadataCalculatorLayer {
             metadata_calculator.tree_reader(),
         )))?;
 
-        let metadata_calculator_task = Box::new(MetadataCalculatorTask {
-            metadata_calculator,
+        context.add_task(Box::new(metadata_calculator));
+
+        context.add_shutdown_hook("rocksdb_terminaton", || {
+            Box::pin(async {
+                // Wait for all the instances of RocksDB to be destroyed.
+                tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
+                    .await
+                    .context("failed terminating RocksDB instances")
+            })
         });
-        context.add_task(metadata_calculator_task);
 
         Ok(())
     }
 }
 
-#[derive(Debug)]
-pub struct MetadataCalculatorTask {
-    metadata_calculator: MetadataCalculator,
-}
-
 #[async_trait::async_trait]
-impl Task for MetadataCalculatorTask {
+impl Task for MetadataCalculator {
     fn id(&self) -> TaskId {
         "metadata_calculator".into()
     }
 
     async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
-        let result = self.metadata_calculator.run(stop_receiver.0).await;
-
-        // Wait for all the instances of RocksDB to be destroyed.
-        tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
-            .await
-            .context("failed terminating RocksDB instances")?;
-        result
+        (*self).run(stop_receiver.0).await
     }
 }

diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
index 97364f6388cd..7a77c219bb49 100644
--- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
+++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
@@ -91,6 +91,15 @@ impl WiringLayer for StateKeeperLayer {
             sealer,
             storage_factory: Arc::new(storage_factory),
         }));
+
+        context.add_shutdown_hook("rocksdb_terminaton", || {
+            Box::pin(async {
+                // Wait for all the instances of RocksDB to be destroyed.
+                tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
+                    .await
+                    .context("failed terminating RocksDB instances")
+            })
+        });
         Ok(())
     }
 }
@@ -119,14 +128,7 @@ impl Task for StateKeeperTask {
             self.sealer,
             self.storage_factory,
         );
-        let result = state_keeper.run().await;
-
-        // Wait for all the instances of RocksDB to be destroyed.
-        tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
-            .await
-            .unwrap();
-
-        result
+        state_keeper.run().await
     }
 }

diff --git a/core/node/node_framework/src/precondition.rs b/core/node/node_framework/src/precondition.rs
index a612c5b90a8b..d81e0328bb62 100644
--- a/core/node/node_framework/src/precondition.rs
+++ b/core/node/node_framework/src/precondition.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{fmt, sync::Arc};
 
 use tokio::sync::Barrier;
 
@@ -31,3 +31,11 @@ impl dyn Precondition {
         }
     }
 }
+
+impl fmt::Debug for dyn Precondition {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Precondition")
+            .field("name", &self.id())
+            .finish()
+    }
+}

diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs
index 9a1b51830cc3..926e168536e9 100644
--- a/core/node/node_framework/src/service/context.rs
+++ b/core/node/node_framework/src/service/context.rs
@@ -8,7 +8,7 @@ use crate::{
     wiring_layer::WiringError,
 };
 
-use super::runnables::ShutdownHook;
+use super::runnables::{ShutdownHook, ShutdownHookFn};
 
 /// An interface to the service's resources provided to the tasks during initialization.
 /// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle.
@@ -101,9 +101,16 @@ impl<'a> ServiceContext<'a> {
     /// May be used to perform cleanup tasks.
     ///
     /// All the collected shutdown hooks will be invoked sequentially after all the node tasks are stopped.
-    pub fn add_shutdown_hook(&mut self, hook: ShutdownHook) -> &mut Self {
+    pub fn add_shutdown_hook(
+        &mut self,
+        name: &'static str,
+        hook: impl ShutdownHookFn,
+    ) -> &mut Self {
         tracing::info!("Layer {} has added a new shutdown hook", self.layer);
-        self.service.runnables.shutdown_hooks.push(hook);
+        self.service
+            .runnables
+            .shutdown_hooks
+            .push(ShutdownHook::new(name, hook));
         self
     }

diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index 009ac5acbae9..93d6d8935886 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -205,10 +205,17 @@ impl ZkStackService {
 
         // Run shutdown hooks sequentially.
         for hook in shutdown_hooks {
+            let name = hook.id().clone();
             // Limit each shutdown hook to the same timeout as the tasks.
-            let hook_with_timeout = tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook());
+            let hook_with_timeout = tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook.invoke());
             match self.runtime.block_on(hook_with_timeout) {
-                Ok(()) => {}
+                Ok(Ok(())) => {
+                    tracing::info!("Shutdown hook {name} completed");
+                }
+                Ok(Err(err)) => {
+                    tracing::error!("Shutdown hook {name} failed: {err}");
+                    // We still have to invoke all the remaining hooks, so we don't return early.
+                }
                 Err(_) => {
                     tracing::error!("One of the shutdown hooks timed out");
                 }

diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs
index f6a913fc3194..5f61065505db 100644
--- a/core/node/node_framework/src/service/runnables.rs
+++ b/core/node/node_framework/src/service/runnables.rs
@@ -7,10 +7,49 @@ use tokio::sync::Barrier;
 use super::StopReceiver;
 use crate::{
     precondition::Precondition,
-    task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
+    task::{OneshotTask, Task, TaskId, UnconstrainedOneshotTask, UnconstrainedTask},
 };
 
-pub type ShutdownHook = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>;
+/// Alias for a shutdown hook function type.
+pub trait ShutdownHookFn:
+    FnOnce() -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
+{
+}
+
+impl<T> ShutdownHookFn for T where
+    T: FnOnce() -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
+{
+}
+
+pub struct ShutdownHook {
+    id: TaskId,
+    hook: Box<dyn ShutdownHookFn>,
+}
+
+impl ShutdownHook {
+    pub fn new(id: impl Into<TaskId>, hook: impl ShutdownHookFn) -> Self {
+        Self {
+            id: id.into(),
+            hook: Box::new(hook),
+        }
+    }
+
+    pub fn id(&self) -> &TaskId {
+        &self.id
+    }
+
+    pub async fn invoke(self) -> anyhow::Result<()> {
+        (self.hook)().await
+    }
+}
+
+impl fmt::Debug for ShutdownHook {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ShutdownHook")
+            .field("name", &self.id)
+            .finish()
+    }
+}
 
 /// A collection of different flavors of tasks.
 #[derive(Default)]
@@ -31,23 +70,16 @@ pub(super) struct Runnables {
 
 impl fmt::Debug for Runnables {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // Macro that iterates over a `Vec`, invokes `.id()` method and collects the results into a `Vec`.
-        // Returns a reference to created `Vec` to satisfy the `.field` method signature.
-        macro_rules! ids {
-            ($vec:expr) => {
-                &$vec.iter().map(|x| x.id()).collect::<Vec<_>>()
-            };
-        }
-
         f.debug_struct("Runnables")
-            .field("preconditions", ids!(self.preconditions))
-            .field("tasks", ids!(self.tasks))
-            .field("oneshot_tasks", ids!(self.oneshot_tasks))
-            .field("unconstrained_tasks", ids!(self.unconstrained_tasks))
+            .field("preconditions", &self.preconditions)
+            .field("tasks", &self.tasks)
+            .field("oneshot_tasks", &self.oneshot_tasks)
+            .field("unconstrained_tasks", &self.unconstrained_tasks)
             .field(
                 "unconstrained_oneshot_tasks",
-                ids!(self.unconstrained_oneshot_tasks),
+                &self.unconstrained_oneshot_tasks,
             )
+            .field("shutdown_hooks", &self.shutdown_hooks)
             .finish()
     }
 }

diff --git a/core/node/node_framework/src/task.rs b/core/node/node_framework/src/task.rs
index 8ff73d75d8fa..8bb7bbd2c702 100644
--- a/core/node/node_framework/src/task.rs
+++ b/core/node/node_framework/src/task.rs
@@ -29,7 +29,7 @@
 //! - A task that may be a driving force for some precondition to be met.
 
 use std::{
-    fmt::{Display, Formatter},
+    fmt::{self, Display, Formatter},
     ops::Deref,
     sync::Arc,
 };
@@ -117,6 +117,12 @@ impl dyn Task {
     }
 }
 
+impl fmt::Debug for dyn Task {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Task").field("name", &self.id()).finish()
+    }
+}
+
 /// A oneshot task implementation.
 /// The difference from [`Task`] is that this kind of task may exit without causing the service to shutdown.
 ///
@@ -160,6 +166,14 @@ impl dyn OneshotTask {
     }
 }
 
+impl fmt::Debug for dyn OneshotTask {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("OneshotTask")
+            .field("name", &self.id())
+            .finish()
+    }
+}
+
 /// A task implementation that is not constrained by preconditions.
 ///
 /// This trait is used to define tasks that should start immediately after the wiring phase, without waiting for
@@ -176,6 +190,14 @@ pub trait UnconstrainedTask: 'static + Send {
     async fn run_unconstrained(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()>;
 }
 
+impl fmt::Debug for dyn UnconstrainedTask {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("UnconstrainedTask")
+            .field("name", &self.id())
+            .finish()
+    }
+}
+
 /// An unconstrained analog of [`OneshotTask`].
 /// See [`UnconstrainedTask`] and [`OneshotTask`] for more details.
 #[async_trait::async_trait]
@@ -189,3 +211,11 @@ pub trait UnconstrainedOneshotTask: 'static + Send {
         stop_receiver: StopReceiver,
     ) -> anyhow::Result<()>;
 }
+
+impl fmt::Debug for dyn UnconstrainedOneshotTask {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("UnconstrainedOneshotTask")
+            .field("name", &self.id())
+            .finish()
+    }
+}
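With the named, fallible variant introduced by this patch, a layer passes an identifier alongside the closure. An illustrative sketch (the hook name and body are hypothetical):

    use futures::FutureExt as _;

    context.add_shutdown_hook("cache_flush", || {
        async {
            // Hypothetical cleanup; the `anyhow::Result<()>` it returns is
            // now logged per hook name by the shutdown loop.
            tokio::task::spawn_blocking(|| { /* flush caches */ })
                .await
                .map_err(Into::into)
        }
        .boxed()
    });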
From f8b78759f02d88ee7cdc8d2005bc75c8e1f5c7c2 Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 10:13:31 +0400
Subject: [PATCH 03/12] Increase number of emitted logs

---
 .../node_framework/src/implementations/layers/sigint.rs | 4 +++-
 core/node/node_framework/src/service/context.rs         | 6 +++++-
 core/node/node_framework/src/service/mod.rs             | 1 +
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/core/node/node_framework/src/implementations/layers/sigint.rs b/core/node/node_framework/src/implementations/layers/sigint.rs
index 2d11f1525370..255305629c64 100644
--- a/core/node/node_framework/src/implementations/layers/sigint.rs
+++ b/core/node/node_framework/src/implementations/layers/sigint.rs
@@ -51,7 +51,9 @@ impl UnconstrainedTask for SigintHandlerTask {
         // Wait for either SIGINT or stop signal.
         tokio::select! {
-            _ = sigint_receiver => {},
+            _ = sigint_receiver => {
+                tracing::info!("Received SIGINT signal");
+            },
             _ = stop_receiver.0.changed() => {},
         };
 
diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs
index 926e168536e9..a5882753e075 100644
--- a/core/node/node_framework/src/service/context.rs
+++ b/core/node/node_framework/src/service/context.rs
@@ -106,7 +106,11 @@ impl<'a> ServiceContext<'a> {
         name: &'static str,
         hook: impl ShutdownHookFn,
     ) -> &mut Self {
-        tracing::info!("Layer {} has added a new shutdown hook", self.layer);
+        tracing::info!(
+            "Layer {} has added a new shutdown hook: {}",
+            self.layer,
+            name
+        );
         self.service
             .runnables
             .shutdown_hooks
diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index 93d6d8935886..4990436773b3 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -182,6 +182,7 @@ impl ZkStackService {
                 ))
             }
         };
+        tracing::info!("One of the tasks has exited, shutting down the node");
 
         let remaining_tasks_with_timeout: Vec<_> = remaining
             .into_iter()

From 3ee2941f8b88cb33307b46ece27631589bb92fce Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 10:53:47 +0400
Subject: [PATCH 04/12] Create timeout future in context of tokio runtime

---
 core/node/node_framework/src/service/context.rs | 3 +--
 core/node/node_framework/src/service/mod.rs     | 3 ++-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs
index a5882753e075..6db7101822b7 100644
--- a/core/node/node_framework/src/service/context.rs
+++ b/core/node/node_framework/src/service/context.rs
@@ -1,5 +1,6 @@
 use std::any::type_name;
 
+use super::runnables::{ShutdownHook, ShutdownHookFn};
 use crate::{
     precondition::Precondition,
     resource::{Resource, ResourceId, StoredResource},
@@ -8,8 +9,6 @@ use crate::{
     wiring_layer::WiringError,
 };
 
-use super::runnables::{ShutdownHook, ShutdownHookFn};
-
 /// An interface to the service's resources provided to the tasks during initialization.
 /// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle.
 #[derive(Debug)]
diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index 4990436773b3..d87ac88b11df 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -208,7 +208,8 @@ impl ZkStackService {
         for hook in shutdown_hooks {
             let name = hook.id().clone();
             // Limit each shutdown hook to the same timeout as the tasks.
-            let hook_with_timeout = tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook.invoke());
+            let hook_with_timeout =
+                async move { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook.invoke()).await };
             match self.runtime.block_on(hook_with_timeout) {
                 Ok(Ok(())) => {
                     tracing::info!("Shutdown hook {name} completed");

From 776ce8284b2c0f0d4c9348a0374327f03d27a806 Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 10:58:11 +0400
Subject: [PATCH 05/12] Improve logging

---
 core/node/node_framework/src/service/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index d87ac88b11df..a051591af39c 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -197,7 +197,7 @@ impl ZkStackService {
             .block_on(futures::future::join_all(remaining_tasks_with_timeout));
         let execution_timeouts_count = execution_results.iter().filter(|&r| r.is_err()).count();
         if execution_timeouts_count > 0 {
-            tracing::warn!(
+            tracing::error!(
                 "{execution_timeouts_count} tasks didn't finish in {TASK_SHUTDOWN_TIMEOUT:?} and were dropped"
             );
         } else {
@@ -219,7 +219,7 @@ impl ZkStackService {
                     // We still have to invoke all the remaining hooks, so we don't return early.
                 }
                 Err(_) => {
-                    tracing::error!("One of the shutdown hooks timed out");
+                    tracing::error!("Shutdown hook {name} timed out");
                 }
             }
         }
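The motivation for patch 04 above: `tokio::time::timeout` constructs its `Sleep` timer immediately, which requires an active Tokio runtime context, so calling it from the plain (non-async) shutdown loop and only later passing the future to `block_on` can panic. Wrapping the call in an `async move` block defers it until `block_on` polls the future. A self-contained sketch of the same pattern (illustrative, not from the patch):

    use std::time::Duration;

    let rt = tokio::runtime::Runtime::new().unwrap();

    // Calling `tokio::time::timeout(...)` here, outside the runtime, would panic
    // with "there is no reactor running".

    // Safe: the timer is only created once `block_on` polls the async block.
    let result = rt.block_on(async {
        tokio::time::timeout(Duration::from_secs(1), async { "done" }).await
    });
    assert_eq!(result.unwrap(), "done");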
From d45167a6d0822f633710899fe2d234b94674affa Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 13:53:24 +0400
Subject: [PATCH 06/12] Report names for tasks during their lifecycle

---
 Cargo.lock                                    |  1 +
 core/node/node_framework/Cargo.toml           |  1 +
 core/node/node_framework/src/service/mod.rs   | 81 ++++++++++++++-----
 .../src/service/named_future.rs               | 41 ++++++++++
 .../node_framework/src/service/runnables.rs   | 66 ++++++---------
 5 files changed, 127 insertions(+), 63 deletions(-)
 create mode 100644 core/node/node_framework/src/service/named_future.rs

diff --git a/Cargo.lock b/Cargo.lock
index 3582fbe51319..7ca00e48e49d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8765,6 +8765,7 @@ dependencies = [
  "async-trait",
  "ctrlc",
  "futures 0.3.28",
+ "pin-project-lite",
  "prometheus_exporter",
  "prover_dal",
  "thiserror",
diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml
index d48522fb8116..08eabd6def6d 100644
--- a/core/node/node_framework/Cargo.toml
+++ b/core/node/node_framework/Cargo.toml
@@ -47,6 +47,7 @@ zksync_reorg_detector.workspace = true
 zksync_vm_runner.workspace = true
 zksync_node_db_pruner.workspace = true
 
+pin-project-lite.workspace = true
 tracing.workspace = true
 thiserror.workspace = true
 async-trait.workspace = true
diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index a051591af39c..a8f9ebb1247a 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -1,7 +1,8 @@
 use std::{collections::HashMap, time::Duration};
 
 use anyhow::Context;
-use futures::{future::BoxFuture, FutureExt};
+use futures::FutureExt;
+use runnables::NamedBoxFuture;
 use tokio::{runtime::Runtime, sync::watch};
 use zksync_utils::panic_extractor::try_extract_panic_message;
 
@@ -15,6 +16,7 @@ use crate::{
 
 mod context;
 mod error;
+mod named_future;
 mod runnables;
 mod stop_receiver;
 #[cfg(test)]
@@ -160,48 +162,80 @@ impl ZkStackService {
         let rt_handle = self.runtime.handle().clone();
         let join_handles: Vec<_> = long_running_tasks
             .into_iter()
-            .map(|task| rt_handle.spawn(task).fuse())
+            .map(|task| {
+                let name = task.name().to_string();
+                NamedBoxFuture::new(rt_handle.spawn(task.into_inner()).fuse().boxed(), name)
+            })
             .collect();
 
+        // Collect names for remaining tasks for reporting purposes.
+        let tasks_names = join_handles
+            .iter()
+            .map(|task| task.name().to_string())
+            .collect::<Vec<_>>();
+
         // Run the tasks until one of them exits.
-        let (resolved, _, remaining) = self
+        let (resolved, resolved_idx, remaining) = self
             .runtime
             .block_on(futures::future::select_all(join_handles));
         // Extract the result and report it to logs early, before waiting for any other task to shutdown.
         let result = match resolved {
-            Ok(Ok(())) => Ok(()),
+            Ok(Ok(())) => {
+                tracing::info!("Task {} finished", tasks_names[resolved_idx]);
+                Ok(())
+            }
             Ok(Err(err)) => {
-                tracing::error!("Task failed: {err}");
+                tracing::error!("Task {} failed: {err}", tasks_names[resolved_idx]);
                 Err(err).context("Task failed")
             }
             Err(panic_err) => {
                 let panic_msg = try_extract_panic_message(panic_err);
-                tracing::error!("One of the tasks panicked: {panic_msg}");
+                tracing::error!("Task {}: {panic_msg}", tasks_names[resolved_idx]);
                 Err(anyhow::format_err!(
-                    "One of the tasks panicked: {panic_msg}"
+                    "Task {} panicked: {panic_msg}",
+                    tasks_names[resolved_idx]
                 ))
             }
         };
         tracing::info!("One of the tasks has exited, shutting down the node");
 
+        // Collect names for remaining tasks for reporting purposes.
+        // We have to re-collect, because `select_all` does not guarantee the order of returned remaining futures.
+        let remaining_tasks_names = remaining
+            .iter()
+            .map(|task| task.name().to_string())
+            .collect::<Vec<_>>();
+
         let remaining_tasks_with_timeout: Vec<_> = remaining
             .into_iter()
             .map(|task| async { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, task).await })
             .collect();
 
         // Send stop signal to remaining tasks and wait for them to finish.
-        // Given that we are shutting down, we do not really care about returned values.
         self.stop_sender.send(true).ok();
         let execution_results = self
             .runtime
             .block_on(futures::future::join_all(remaining_tasks_with_timeout));
-        let execution_timeouts_count = execution_results.iter().filter(|&r| r.is_err()).count();
-        if execution_timeouts_count > 0 {
-            tracing::error!(
-                "{execution_timeouts_count} tasks didn't finish in {TASK_SHUTDOWN_TIMEOUT:?} and were dropped"
-            );
-        } else {
-            tracing::info!("Remaining tasks finished without reaching timeouts");
+
+        // Report the results of the remaining tasks.
+        for (name, result) in remaining_tasks_names
+            .into_iter()
+            .zip(execution_results.into_iter())
+        {
+            match result {
+                Ok(Ok(Ok(()))) => {
+                    tracing::info!("Task {name} finished");
+                }
+                Ok(Ok(Err(err))) => {
+                    tracing::error!("Task {name} failed: {err}");
+                }
+                Ok(Err(err)) => {
+                    let panic_msg = try_extract_panic_message(err);
+                    tracing::error!("Task {name} panicked: {panic_msg}");
+                }
+                Err(_) => {
+                    tracing::error!("Task {name} timed out");
+                }
+            }
         }
 
         // Run shutdown hooks sequentially.
@@ -231,22 +265,25 @@ impl ZkStackService {
 }
 
 fn oneshot_runner_task(
-    oneshot_tasks: Vec<BoxFuture<'static, anyhow::Result<()>>>,
+    oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
     mut stop_receiver: StopReceiver,
     only_oneshot_tasks: bool,
-) -> BoxFuture<'static, anyhow::Result<()>> {
-    Box::pin(async move {
+) -> NamedBoxFuture<anyhow::Result<()>> {
+    let future = async move {
         let oneshot_tasks = oneshot_tasks.into_iter().map(|fut| async move {
             // Spawn each oneshot task as a separate tokio task.
             // This way we can handle the cases when such a task panics and propagate the message
             // to the service.
             let handle = tokio::runtime::Handle::current();
+            let name = fut.name().to_string();
             match handle.spawn(fut).await {
                 Ok(Ok(())) => Ok(()),
-                Ok(Err(err)) => Err(err),
+                Ok(Err(err)) => Err(err).with_context(|| format!("Oneshot task {name} failed")),
                 Err(panic_err) => {
                     let panic_msg = try_extract_panic_message(panic_err);
-                    Err(anyhow::format_err!("Oneshot task panicked: {panic_msg}"))
+                    Err(anyhow::format_err!(
+                        "Oneshot task {name} panicked: {panic_msg}"
+                    ))
                 }
             }
         });
@@ -267,5 +304,7 @@ fn oneshot_runner_task(
         // Note that we don't have to `select` on the stop signal explicitly:
         // Each prerequisite is given a stop signal, and if everyone respects it, this future
         // will still resolve once the stop signal is received.
-    })
+    };
+
+    NamedBoxFuture::new(future.boxed(), "Oneshot runner".to_string())
 }
diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs
new file mode 100644
index 000000000000..0f34bfec944e
--- /dev/null
+++ b/core/node/node_framework/src/service/named_future.rs
@@ -0,0 +1,41 @@
+use pin_project_lite::pin_project;
+use std::{future::Future, pin::Pin, task};
+
+pin_project! {
+    /// Implements a future with the name tag attached.
+    #[derive(Debug)]
+    pub struct NamedFuture<F> {
+        #[pin]
+        inner: F,
+        name: String,
+    }
+}
+
+impl<T, F> NamedFuture<F>
+where
+    F: Future<Output = T>,
+{
+    /// Creates a new future with the name tag attached.
+    pub fn new(inner: F, name: String) -> Self {
+        Self { inner, name }
+    }
+
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    pub fn into_inner(self) -> F {
+        self.inner
+    }
+}
+
+impl<T, F> Future for NamedFuture<F>
+where
+    F: Future<Output = T>,
+{
+    type Output = T;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+        self.project().inner.poll(cx)
+    }
+}
diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs
index 5f61065505db..0b9903733c91 100644
--- a/core/node/node_framework/src/service/runnables.rs
+++ b/core/node/node_framework/src/service/runnables.rs
@@ -1,15 +1,17 @@
 use std::{fmt, sync::Arc};
 
-use anyhow::Context as _;
 use futures::future::BoxFuture;
 use tokio::sync::Barrier;
 
-use super::StopReceiver;
+use super::{named_future::NamedFuture, StopReceiver};
 use crate::{
     precondition::Precondition,
     task::{OneshotTask, Task, TaskId, UnconstrainedOneshotTask, UnconstrainedTask},
 };
 
+/// Alias for futures with the name assigned.
+pub type NamedBoxFuture<T> = NamedFuture<BoxFuture<'static, T>>;
+
 /// Alias for a shutdown hook function type.
 pub trait ShutdownHookFn:
@@ -86,8 +88,8 @@ impl fmt::Debug for Runnables {
 
 /// A unified representation of tasks that can be run by the service.
 pub(super) struct TaskReprs {
-    pub(super) long_running_tasks: Vec<BoxFuture<'static, anyhow::Result<()>>>,
-    pub(super) oneshot_tasks: Vec<BoxFuture<'static, anyhow::Result<()>>>,
+    pub(super) long_running_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
+    pub(super) oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
     pub(super) shutdown_hooks: Vec<ShutdownHook>,
 }
 
@@ -162,24 +164,20 @@ impl Runnables {
 
     fn collect_unconstrained_tasks(
         &mut self,
-        tasks: &mut Vec<BoxFuture<'static, anyhow::Result<()>>>,
+        tasks: &mut Vec<NamedBoxFuture<anyhow::Result<()>>>,
         stop_receiver: StopReceiver,
     ) {
         for task in std::mem::take(&mut self.unconstrained_tasks) {
             let name = task.id();
             let stop_receiver = stop_receiver.clone();
-            let task_future = Box::pin(async move {
-                task.run_unconstrained(stop_receiver)
-                    .await
-                    .with_context(|| format!("Task {name} failed"))
-            });
-            tasks.push(task_future);
+            let task_future = Box::pin(task.run_unconstrained(stop_receiver));
+            tasks.push(NamedFuture::new(task_future, name.to_string()));
         }
     }
 
     fn collect_tasks(
         &mut self,
-        tasks: &mut Vec<BoxFuture<'static, anyhow::Result<()>>>,
+        tasks: &mut Vec<NamedBoxFuture<anyhow::Result<()>>>,
         task_barrier: Arc<Barrier>,
         stop_receiver: StopReceiver,
     ) {
         for task in std::mem::take(&mut self.tasks) {
             let name = task.id();
             let stop_receiver = stop_receiver.clone();
             let task_barrier = task_barrier.clone();
-            let task_future = Box::pin(async move {
-                task.run_with_barrier(stop_receiver, task_barrier)
-                    .await
-                    .with_context(|| format!("Task {name} failed"))
-            });
-            tasks.push(task_future);
+            let task_future = Box::pin(task.run_with_barrier(stop_receiver, task_barrier));
+            tasks.push(NamedFuture::new(task_future, name.to_string()));
         }
     }
 
     fn collect_preconditions(
         &mut self,
-        oneshot_tasks: &mut Vec<BoxFuture<'static, anyhow::Result<()>>>,
+        oneshot_tasks: &mut Vec<NamedBoxFuture<anyhow::Result<()>>>,
         task_barrier: Arc<Barrier>,
         stop_receiver: StopReceiver,
     ) {
         for precondition in std::mem::take(&mut self.preconditions) {
             let name = precondition.id();
             let stop_receiver = stop_receiver.clone();
             let task_barrier = task_barrier.clone();
-            let task_future = Box::pin(async move {
-                precondition
-                    .check_with_barrier(stop_receiver, task_barrier)
-                    .await
-                    .with_context(|| format!("Precondition {name} failed"))
-            });
-            oneshot_tasks.push(task_future);
+            let task_future =
+                Box::pin(precondition.check_with_barrier(stop_receiver, task_barrier));
+            oneshot_tasks.push(NamedFuture::new(task_future, name.to_string()));
         }
     }
 
     fn collect_oneshot_tasks(
         &mut self,
-        oneshot_tasks: &mut Vec<BoxFuture<'static, anyhow::Result<()>>>,
+        oneshot_tasks: &mut Vec<NamedBoxFuture<anyhow::Result<()>>>,
         task_barrier: Arc<Barrier>,
         stop_receiver: StopReceiver,
     ) {
         for oneshot_task in std::mem::take(&mut self.oneshot_tasks) {
             let name = oneshot_task.id();
             let stop_receiver = stop_receiver.clone();
             let task_barrier = task_barrier.clone();
-            let task_future = Box::pin(async move {
-                oneshot_task
-                    .run_oneshot_with_barrier(stop_receiver, task_barrier)
-                    .await
-                    .with_context(|| format!("Oneshot task {name} failed"))
-            });
-            oneshot_tasks.push(task_future);
+            let task_future =
+                Box::pin(oneshot_task.run_oneshot_with_barrier(stop_receiver, task_barrier));
+            oneshot_tasks.push(NamedFuture::new(task_future, name.to_string()));
         }
     }
 
     fn collect_unconstrained_oneshot_tasks(
         &mut self,
-        oneshot_tasks: &mut Vec<BoxFuture<'static, anyhow::Result<()>>>,
+        oneshot_tasks: &mut Vec<NamedBoxFuture<anyhow::Result<()>>>,
         stop_receiver: StopReceiver,
     ) {
         for unconstrained_oneshot_task in std::mem::take(&mut self.unconstrained_oneshot_tasks) {
             let name = unconstrained_oneshot_task.id();
             let stop_receiver = stop_receiver.clone();
-            let task_future = Box::pin(async move {
-                unconstrained_oneshot_task
-                    .run_unconstrained_oneshot(stop_receiver)
-                    .await
-                    .with_context(|| format!("Unconstrained oneshot task {name} failed"))
-            });
-            oneshot_tasks.push(task_future);
+            let task_future =
+                Box::pin(unconstrained_oneshot_task.run_unconstrained_oneshot(stop_receiver));
+            oneshot_tasks.push(NamedFuture::new(task_future, name.to_string()));
        }
    }
}
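A quick illustration of the `NamedFuture` wrapper in isolation (sketch; assumes the types were visible outside the `service` module, and uses `.boxed()` from `futures::FutureExt`):

    use futures::FutureExt as _;

    // `NamedBoxFuture<T>` is `NamedFuture<BoxFuture<'static, T>>`.
    let task: NamedBoxFuture<u32> =
        NamedFuture::new(async { 42 }.boxed(), "answer".to_string());
    assert_eq!(task.name(), "answer");
    // `NamedFuture` is itself a future and polls the inner one transparently:
    let value = tokio::runtime::Runtime::new().unwrap().block_on(task);
    assert_eq!(value, 42);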
From 02d0b35a4435e1f2d14f37ee6be78d48d142721a Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 14:07:06 +0400
Subject: [PATCH 07/12] Collect all the errors that happen in the service

---
 core/node/node_framework/src/service/error.rs | 20 ++++++--
 core/node/node_framework/src/service/mod.rs   | 49 ++++++++---------
 .../src/service/named_future.rs               | 10 ++--
 .../node_framework/src/service/runnables.rs   | 10 ++--
 4 files changed, 53 insertions(+), 36 deletions(-)

diff --git a/core/node/node_framework/src/service/error.rs b/core/node/node_framework/src/service/error.rs
index 173745e74c75..9e95b437419b 100644
--- a/core/node/node_framework/src/service/error.rs
+++ b/core/node/node_framework/src/service/error.rs
@@ -1,4 +1,18 @@
-use crate::wiring_layer::WiringError;
+use crate::{task::TaskId, wiring_layer::WiringError};
+
+#[derive(Debug, thiserror::Error)]
+pub enum TaskError {
+    #[error("Task {0} failed: {1}")]
+    TaskFailed(TaskId, anyhow::Error),
+    #[error("Task {0} panicked: {1}")]
+    TaskPanicked(TaskId, String),
+    #[error("Shutdown for task {0} timed out")]
+    TaskShutdownTimedOut(TaskId),
+    #[error("Shutdown hook {0} failed: {1}")]
+    ShutdownHookFailed(TaskId, anyhow::Error),
+    #[error("Shutdown hook {0} timed out")]
+    ShutdownHookTimedOut(TaskId),
+}
 
 #[derive(Debug, thiserror::Error)]
 pub enum ZkStackServiceError {
@@ -8,6 +22,6 @@ pub enum ZkStackServiceError {
     NoTasks,
     #[error("One or more wiring layers failed to initialize: {0:?}")]
     Wiring(Vec<(String, WiringError)>),
-    #[error(transparent)]
-    Task(#[from] anyhow::Error),
+    #[error("One or more tasks failed: {0:?}")]
+    Task(Vec<TaskError>),
 }
diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index a8f9ebb1247a..9d0c0ba540c2 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -1,6 +1,7 @@
 use std::{collections::HashMap, time::Duration};
 
 use anyhow::Context;
+use error::TaskError;
 use futures::FutureExt;
 use runnables::NamedBoxFuture;
 use tokio::{runtime::Runtime, sync::watch};
 use zksync_utils::panic_extractor::try_extract_panic_message;
@@ -163,48 +164,41 @@ impl ZkStackService {
         let join_handles: Vec<_> = long_running_tasks
             .into_iter()
             .map(|task| {
-                let name = task.name().to_string();
+                let name = task.id();
                 NamedBoxFuture::new(rt_handle.spawn(task.into_inner()).fuse().boxed(), name)
             })
             .collect();
 
         // Collect names for remaining tasks for reporting purposes.
-        let tasks_names = join_handles
-            .iter()
-            .map(|task| task.name().to_string())
-            .collect::<Vec<_>>();
+        let mut tasks_names: Vec<_> = join_handles.iter().map(|task| task.id()).collect();
 
         // Run the tasks until one of them exits.
         let (resolved, resolved_idx, remaining) = self
             .runtime
             .block_on(futures::future::select_all(join_handles));
         // Extract the result and report it to logs early, before waiting for any other task to shutdown.
-        let result = match resolved {
+        // We will also collect the errors from the remaining tasks, hence a vector.
+        let mut errors = Vec::new();
+        let task_name = tasks_names.swap_remove(resolved_idx);
+        match resolved {
             Ok(Ok(())) => {
-                tracing::info!("Task {} finished", tasks_names[resolved_idx]);
-                Ok(())
+                tracing::info!("Task {task_name} finished");
             }
             Ok(Err(err)) => {
-                tracing::error!("Task {} failed: {err}", tasks_names[resolved_idx]);
-                Err(err).context("Task failed")
+                tracing::error!("Task {task_name} failed: {err}");
+                errors.push(TaskError::TaskFailed(task_name, err));
             }
             Err(panic_err) => {
                 let panic_msg = try_extract_panic_message(panic_err);
-                tracing::error!("Task {}: {panic_msg}", tasks_names[resolved_idx]);
-                Err(anyhow::format_err!(
-                    "Task {} panicked: {panic_msg}",
-                    tasks_names[resolved_idx]
-                ))
+                tracing::error!("Task {task_name}: {panic_msg}");
+                errors.push(TaskError::TaskPanicked(task_name, panic_msg));
             }
         };
         tracing::info!("One of the tasks has exited, shutting down the node");
 
         // Collect names for remaining tasks for reporting purposes.
         // We have to re-collect, because `select_all` does not guarantee the order of returned remaining futures.
-        let remaining_tasks_names = remaining
-            .iter()
-            .map(|task| task.name().to_string())
-            .collect::<Vec<_>>();
+        let remaining_tasks_names: Vec<_> = remaining.iter().map(|task| task.id()).collect();
 
         let remaining_tasks_with_timeout: Vec<_> = remaining
             .into_iter()
             .map(|task| async { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, task).await })
             .collect();
 
         // Report the results of the remaining tasks.
         for (name, result) in remaining_tasks_names
             .into_iter()
             .zip(execution_results.into_iter())
         {
             match result {
                 Ok(Ok(Ok(()))) => {
                     tracing::info!("Task {name} finished");
                 }
                 Ok(Ok(Err(err))) => {
                     tracing::error!("Task {name} failed: {err}");
+                    errors.push(TaskError::TaskFailed(name, err));
                 }
                 Ok(Err(err)) => {
                     let panic_msg = try_extract_panic_message(err);
                     tracing::error!("Task {name} panicked: {panic_msg}");
+                    errors.push(TaskError::TaskPanicked(name, panic_msg));
                 }
                 Err(_) => {
                     tracing::error!("Task {name} timed out");
+                    errors.push(TaskError::TaskShutdownTimedOut(name));
                 }
             }
         }
 
                 Ok(Ok(())) => {
                     tracing::info!("Shutdown hook {name} completed");
                 }
                 Ok(Err(err)) => {
                     tracing::error!("Shutdown hook {name} failed: {err}");
-                    // We still have to invoke all the remaining hooks, so we don't return early.
+                    errors.push(TaskError::ShutdownHookFailed(name, err));
                 }
                 Err(_) => {
                     tracing::error!("Shutdown hook {name} timed out");
+                    errors.push(TaskError::ShutdownHookTimedOut(name));
                 }
             }
         }
 
         tracing::info!("Exiting the service");
-        result?;
-        Ok(())
+        if errors.is_empty() {
+            Ok(())
+        } else {
+            Err(ZkStackServiceError::Task(errors))
+        }
     }
 }
 
 fn oneshot_runner_task(
-    oneshot_tasks: Vec<BoxFuture<'static, anyhow::Result<()>>>,
+    oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
     mut stop_receiver: StopReceiver,
     only_oneshot_tasks: bool,
 ) -> NamedBoxFuture<anyhow::Result<()>> {
             let handle = tokio::runtime::Handle::current();
-            let name = fut.name().to_string();
+            let name = fut.id().to_string();
             match handle.spawn(fut).await {
 
-    NamedBoxFuture::new(future.boxed(), "Oneshot runner".to_string())
+    NamedBoxFuture::new(future.boxed(), "oneshot_runner".into())
 }
diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs
index 0f34bfec944e..b988548b810d 100644
--- a/core/node/node_framework/src/service/named_future.rs
+++ b/core/node/node_framework/src/service/named_future.rs
@@ -1,13 +1,15 @@
 use pin_project_lite::pin_project;
 use std::{future::Future, pin::Pin, task};
 
+use crate::task::TaskId;
+
 pin_project! {
     /// Implements a future with the name tag attached.
     #[derive(Debug)]
     pub struct NamedFuture<F> {
         #[pin]
         inner: F,
-        name: String,
+        name: TaskId,
     }
 }
 
@@ -16,12 +18,12 @@ where
     F: Future<Output = T>,
 {
     /// Creates a new future with the name tag attached.
-    pub fn new(inner: F, name: String) -> Self {
+    pub fn new(inner: F, name: TaskId) -> Self {
         Self { inner, name }
     }
 
-    pub fn name(&self) -> &str {
-        &self.name
+    pub fn id(&self) -> TaskId {
+        self.name.clone()
     }
 
     pub fn into_inner(self) -> F {
diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs
index 0b9903733c91..c649a8983791 100644
--- a/core/node/node_framework/src/service/runnables.rs
+++ b/core/node/node_framework/src/service/runnables.rs
@@ -171,7 +171,7 @@ impl Runnables {
             let name = task.id();
             let stop_receiver = stop_receiver.clone();
             let task_future = Box::pin(task.run_unconstrained(stop_receiver));
-            tasks.push(NamedFuture::new(task_future, name.to_string()));
+            tasks.push(NamedFuture::new(task_future, name));
         }
     }
 
@@ -186,7 +186,7 @@ impl Runnables {
             let stop_receiver = stop_receiver.clone();
             let task_barrier = task_barrier.clone();
             let task_future = Box::pin(task.run_with_barrier(stop_receiver, task_barrier));
-            tasks.push(NamedFuture::new(task_future, name.to_string()));
+            tasks.push(NamedFuture::new(task_future, name));
         }
     }
 
@@ -202,7 +202,7 @@ impl Runnables {
             let task_barrier = task_barrier.clone();
             let task_future =
                 Box::pin(precondition.check_with_barrier(stop_receiver, task_barrier));
-            oneshot_tasks.push(NamedFuture::new(task_future, name.to_string()));
+            oneshot_tasks.push(NamedFuture::new(task_future, name));
         }
     }
 
@@ -218,7 +218,7 @@ impl Runnables {
             let task_barrier = task_barrier.clone();
             let task_future =
                 Box::pin(oneshot_task.run_oneshot_with_barrier(stop_receiver, task_barrier));
-            oneshot_tasks.push(NamedFuture::new(task_future, name.to_string()));
+            oneshot_tasks.push(NamedFuture::new(task_future, name));
         }
     }
 
@@ -232,7 +232,7 @@ impl Runnables {
             let stop_receiver = stop_receiver.clone();
             let task_future =
                 Box::pin(unconstrained_oneshot_task.run_unconstrained_oneshot(stop_receiver));
-            oneshot_tasks.push(NamedFuture::new(task_future, name.to_string()));
+            oneshot_tasks.push(NamedFuture::new(task_future, name));
        }
    }
}

From 74fd1207552d52562429dfc75c8f923dbeebdf1d Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 14:09:04 +0400
Subject: [PATCH 08/12] Fix the logic of the API garbage collector task

---
 .../src/implementations/layers/web3_api/server.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server.rs b/core/node/node_framework/src/implementations/layers/web3_api/server.rs
index da0d9d3cc33a..428e5c88503d 100644
--- a/core/node/node_framework/src/implementations/layers/web3_api/server.rs
+++ b/core/node/node_framework/src/implementations/layers/web3_api/server.rs
@@ -250,7 +250,10 @@ impl Task for ApiTaskGarbageCollector {
         // We can ignore the stop signal here, since we're tied to the main API task through the channel:
         // it'll either get dropped if API cannot be built or will send something through the channel.
         // The tasks it sends are aware of the stop receiver themselves.
-        let tasks = self.task_receiver.await?;
+        let Ok(tasks) = self.task_receiver.await else {
+            // API cannot be built, so there are no tasks to wait for.
+            return Ok(());
+        };
         let _ = futures::future::join_all(tasks).await;
         Ok(())
     }
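With patch 07, the service reports every failure instead of short-circuiting on the first one. A caller could inspect them like this (illustrative sketch; `node` is a hypothetical `ZkStackService` instance):

    match node.run() {
        Ok(()) => tracing::info!("Node shut down cleanly"),
        Err(ZkStackServiceError::Task(errors)) => {
            // All task failures, panics, timeouts and hook errors are retained.
            for error in &errors {
                tracing::error!("{error}");
            }
        }
        Err(err) => tracing::error!("Node failed to start: {err}"),
    }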
From 997964ccf1c7ade7b2d49a9f114efdb4a6b509e6 Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 14:34:24 +0400
Subject: [PATCH 09/12] Make queued job processor respect timeouts more

---
 core/lib/queued_job_processor/src/lib.rs | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/core/lib/queued_job_processor/src/lib.rs b/core/lib/queued_job_processor/src/lib.rs
index 569a2b7f59da..702fd557187b 100644
--- a/core/lib/queued_job_processor/src/lib.rs
+++ b/core/lib/queued_job_processor/src/lib.rs
@@ -5,7 +5,7 @@ use std::{
 
 use anyhow::Context as _;
 pub use async_trait::async_trait;
-use tokio::{sync::watch, task::JoinHandle, time::sleep};
+use tokio::{sync::watch, task::JoinHandle};
 use vise::{Buckets, Counter, Histogram, LabeledFamily, Metrics};
 use zksync_utils::panic_extractor::try_extract_panic_message;
 
@@ -57,7 +57,7 @@ pub trait JobProcessor: Sync + Send {
     /// To process a batch, pass `Some(batch_size)`.
     async fn run(
         self,
-        stop_receiver: watch::Receiver<bool>,
+        mut stop_receiver: watch::Receiver<bool>,
         mut iterations_left: Option<usize>,
     ) -> anyhow::Result<()>
     where
@@ -86,7 +86,7 @@ pub trait JobProcessor: Sync + Send {
                 );
                 let task = self.process_job(&job_id, job, started_at).await;
 
-                self.wait_for_task(job_id, started_at, task)
+                self.wait_for_task(job_id, started_at, task, &mut stop_receiver)
                     .await
                     .context("wait_for_task")?;
             } else if iterations_left.is_some() {
@@ -94,7 +94,10 @@ pub trait JobProcessor: Sync + Send {
                 return Ok(());
             } else {
                 tracing::trace!("Backing off for {} ms", backoff);
-                sleep(Duration::from_millis(backoff)).await;
+                // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this.
+                tokio::time::timeout(Duration::from_millis(backoff), stop_receiver.changed())
+                    .await
+                    .ok();
                 backoff = (backoff * Self::BACKOFF_MULTIPLIER).min(Self::MAX_BACKOFF_MS);
             }
         }
@@ -108,6 +111,7 @@ pub trait JobProcessor: Sync + Send {
         job_id: Self::JobId,
         started_at: Instant,
         task: JoinHandle<anyhow::Result<Self::JobArtifacts>>,
+        stop_receiver: &mut watch::Receiver<bool>,
     ) -> anyhow::Result<()> {
         let attempts = self.get_job_attempts(&job_id).await?;
         let max_attempts = self.max_attempts();
@@ -130,7 +134,17 @@ pub trait JobProcessor: Sync + Send {
             if task.is_finished() {
                 break task.await;
             }
-            sleep(Duration::from_millis(Self::POLLING_INTERVAL_MS)).await;
+            if tokio::time::timeout(
+                Duration::from_millis(Self::POLLING_INTERVAL_MS),
+                stop_receiver.changed(),
+            )
+            .await
+            .is_err()
+            {
+                // Stop signal received, return early.
+                // Exit will be processed/reported by the main loop.
+                return Ok(());
+            }
         };
         let error_message = match result {
             Ok(Ok(data)) => {

From bf640fe3291a1b329a5818bd2e7d1b4b404ae78b Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 14:35:45 +0400
Subject: [PATCH 10/12] zk fmt

---
 core/node/node_framework/src/service/named_future.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs
index b988548b810d..8c8afdd20e53 100644
--- a/core/node/node_framework/src/service/named_future.rs
+++ b/core/node/node_framework/src/service/named_future.rs
@@ -1,6 +1,7 @@
-use pin_project_lite::pin_project;
 use std::{future::Future, pin::Pin, task};
 
+use pin_project_lite::pin_project;
+
 use crate::task::TaskId;
 
 pin_project! {
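The pattern from patch 09, replacing an unconditional `sleep` with a `timeout` over `stop_receiver.changed()`, generalizes to any polling loop that must stay responsive to shutdown. A standalone sketch (note the `Ok`/`Err` polarity, which patch 12 below corrects in the original code):

    use std::time::Duration;
    use tokio::sync::watch;

    /// Sleeps for `period`, waking up early if a stop signal arrives.
    /// Returns `true` if the loop should stop.
    async fn sleep_or_stop(period: Duration, stop: &mut watch::Receiver<bool>) -> bool {
        // `Ok(_)`: `changed()` resolved before the deadline, i.e. stop was requested.
        // `Err(_)`: the timeout simply elapsed, which is the normal backoff case.
        tokio::time::timeout(period, stop.changed()).await.is_ok()
    }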
From af32a3475a5337d375a3b6f34c8b99575927b864 Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 14:42:29 +0400
Subject: [PATCH 11/12] Change hook type from a function to a future

---
 .../layers/metadata_calculator.rs             | 12 ++---
 .../layers/state_keeper/mod.rs                | 12 ++---
 .../node_framework/src/service/context.rs     | 16 ++++---
 core/node/node_framework/src/service/mod.rs   |  2 +-
 .../src/service/named_future.rs               | 11 ++++-
 .../node_framework/src/service/runnables.rs   | 47 ++-----------------
 6 files changed, 32 insertions(+), 68 deletions(-)

diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
index 5b5decfe9061..9fe954c91e4f 100644
--- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
+++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
@@ -120,13 +120,11 @@ impl WiringLayer for MetadataCalculatorLayer {
 
         context.add_task(Box::new(metadata_calculator));
 
-        context.add_shutdown_hook("rocksdb_terminaton", || {
-            Box::pin(async {
-                // Wait for all the instances of RocksDB to be destroyed.
-                tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
-                    .await
-                    .context("failed terminating RocksDB instances")
-            })
+        context.add_shutdown_hook("rocksdb_terminaton", async {
+            // Wait for all the instances of RocksDB to be destroyed.
+            tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
+                .await
+                .context("failed terminating RocksDB instances")
         });
 
         Ok(())
diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
index 7a77c219bb49..46e56eca0e65 100644
--- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
+++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
@@ -92,13 +92,11 @@ impl WiringLayer for StateKeeperLayer {
             storage_factory: Arc::new(storage_factory),
         }));
 
-        context.add_shutdown_hook("rocksdb_terminaton", || {
-            Box::pin(async {
-                // Wait for all the instances of RocksDB to be destroyed.
-                tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
-                    .await
-                    .context("failed terminating RocksDB instances")
-            })
+        context.add_shutdown_hook("rocksdb_terminaton", async {
+            // Wait for all the instances of RocksDB to be destroyed.
+            tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
+                .await
+                .context("failed terminating RocksDB instances")
         });
         Ok(())
     }
diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs
index 6db7101822b7..9507c2287752 100644
--- a/core/node/node_framework/src/service/context.rs
+++ b/core/node/node_framework/src/service/context.rs
@@ -1,10 +1,11 @@
-use std::any::type_name;
+use std::{any::type_name, future::Future};
+
+use futures::FutureExt as _;
 
-use super::runnables::{ShutdownHook, ShutdownHookFn};
 use crate::{
     precondition::Precondition,
     resource::{Resource, ResourceId, StoredResource},
-    service::ZkStackService,
+    service::{named_future::NamedFuture, ZkStackService},
     task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
     wiring_layer::WiringError,
 };
@@ -96,14 +97,15 @@ impl<'a> ServiceContext<'a> {
         self
     }
 
-    /// Adds a function to be invoked after node shutdown.
+    /// Adds a future to be invoked after node shutdown.
     /// May be used to perform cleanup tasks.
     ///
-    /// All the collected shutdown hooks will be invoked sequentially after all the node tasks are stopped.
+    /// The future is guaranteed to only be polled after all the node tasks are stopped or timed out.
+    /// All the futures will be awaited sequentially.
     pub fn add_shutdown_hook(
         &mut self,
         name: &'static str,
-        hook: impl ShutdownHookFn,
+        hook: impl Future<Output = anyhow::Result<()>> + Send + 'static,
     ) -> &mut Self {
         tracing::info!(
             "Layer {} has added a new shutdown hook: {}",
            self.layer,
            name
        );
         self.service
             .runnables
             .shutdown_hooks
-            .push(ShutdownHook::new(name, hook));
+            .push(NamedFuture::new(hook.boxed(), name.into()));
         self
     }
diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index 9d0c0ba540c2..4acb18d14e48 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -240,7 +240,7 @@ impl ZkStackService {
             let name = hook.id().clone();
             // Limit each shutdown hook to the same timeout as the tasks.
             let hook_with_timeout =
-                async move { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook.invoke()).await };
+                async move { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook).await };
             match self.runtime.block_on(hook_with_timeout) {
                 Ok(Ok(())) => {
                     tracing::info!("Shutdown hook {name} completed");
diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs
index 8c8afdd20e53..b6fdb905f8d0 100644
--- a/core/node/node_framework/src/service/named_future.rs
+++ b/core/node/node_framework/src/service/named_future.rs
@@ -1,4 +1,4 @@
-use std::{future::Future, pin::Pin, task};
+use std::{fmt, future::Future, pin::Pin, task};
 
 use pin_project_lite::pin_project;
 
@@ -6,7 +6,6 @@ use crate::task::TaskId;
 
 pin_project! {
     /// Implements a future with the name tag attached.
-    #[derive(Debug)]
     pub struct NamedFuture<F> {
         #[pin]
         inner: F,
@@ -42,3 +41,11 @@ where
         self.project().inner.poll(cx)
     }
 }
+
+impl<F> fmt::Debug for NamedFuture<F> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("NamedFuture")
+            .field("name", &self.name)
+            .finish_non_exhaustive()
+    }
+}
diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs
index c649a8983791..8d240a8cffab 100644
--- a/core/node/node_framework/src/service/runnables.rs
+++ b/core/node/node_framework/src/service/runnables.rs
@@ -6,53 +6,12 @@ use tokio::sync::Barrier;
 use super::{named_future::NamedFuture, StopReceiver};
 use crate::{
     precondition::Precondition,
-    task::{OneshotTask, Task, TaskId, UnconstrainedOneshotTask, UnconstrainedTask},
+    task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
 };
 
 /// Alias for futures with the name assigned.
 pub type NamedBoxFuture<T> = NamedFuture<BoxFuture<'static, T>>;
 
-/// Alias for a shutdown hook function type.
-pub trait ShutdownHookFn:
-    FnOnce() -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
-{
-}
-
-impl<T> ShutdownHookFn for T where
-    T: FnOnce() -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
-{
-}
-
-pub struct ShutdownHook {
-    id: TaskId,
-    hook: Box<dyn ShutdownHookFn>,
-}
-
-impl ShutdownHook {
-    pub fn new(id: impl Into<TaskId>, hook: impl ShutdownHookFn) -> Self {
-        Self {
-            id: id.into(),
-            hook: Box::new(hook),
-        }
-    }
-
-    pub fn id(&self) -> &TaskId {
-        &self.id
-    }
-
-    pub async fn invoke(self) -> anyhow::Result<()> {
-        (self.hook)().await
-    }
-}
-
-impl fmt::Debug for ShutdownHook {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("ShutdownHook")
-            .field("name", &self.id)
-            .finish()
-    }
-}
-
 /// A collection of different flavors of tasks.
 #[derive(Default)]
 pub(super) struct Runnables {
@@ -67,7 +26,7 @@ pub(super) struct Runnables {
     /// Unconstrained oneshot tasks added to the service.
     pub(super) unconstrained_oneshot_tasks: Vec<Box<dyn UnconstrainedOneshotTask>>,
     /// List of hooks to be invoked after node shutdown.
-    pub(super) shutdown_hooks: Vec<ShutdownHook>,
+    pub(super) shutdown_hooks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
 }
 
 impl fmt::Debug for Runnables {
@@ -90,7 +49,7 @@ impl fmt::Debug for Runnables {
 pub(super) struct TaskReprs {
     pub(super) long_running_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
     pub(super) oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
-    pub(super) shutdown_hooks: Vec<ShutdownHook>,
+    pub(super) shutdown_hooks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
 }

From 5472ebc4135650258e3a4f699eec7007d674f2bc Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Fri, 21 Jun 2024 16:34:39 +0400
Subject: [PATCH 12/12] Review fixes

---
 core/lib/queued_job_processor/src/lib.rs     |  2 +-
 core/node/node_framework/src/service/mod.rs  | 55 +++++++++----------
 .../src/service/named_future.rs              | 15 ++---
 3 files changed, 35 insertions(+), 37 deletions(-)

diff --git a/core/lib/queued_job_processor/src/lib.rs b/core/lib/queued_job_processor/src/lib.rs
index 702fd557187b..a5a4fa39fcae 100644
--- a/core/lib/queued_job_processor/src/lib.rs
+++ b/core/lib/queued_job_processor/src/lib.rs
@@ -139,7 +139,7 @@ pub trait JobProcessor: Sync + Send {
                 stop_receiver.changed(),
             )
             .await
-            .is_err()
+            .is_ok()
             {
                 // Stop signal received, return early.
                 // Exit will be processed/reported by the main loop.
diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs
index 4acb18d14e48..57035a048d86 100644
--- a/core/node/node_framework/src/service/mod.rs
+++ b/core/node/node_framework/src/service/mod.rs
@@ -12,6 +12,7 @@ pub use self::{context::ServiceContext, error::ZkStackServiceError, stop_receiver::StopReceiver};
 use crate::{
     resource::{ResourceId, StoredResource},
     service::runnables::TaskReprs,
+    task::TaskId,
     wiring_layer::{WiringError, WiringLayer},
 };
 
@@ -180,20 +181,7 @@ impl ZkStackService {
         // We will also collect the errors from the remaining tasks, hence a vector.
         let mut errors = Vec::new();
         let task_name = tasks_names.swap_remove(resolved_idx);
-        match resolved {
-            Ok(Ok(())) => {
-                tracing::info!("Task {task_name} finished");
-            }
-            Ok(Err(err)) => {
-                tracing::error!("Task {task_name} failed: {err}");
-                errors.push(TaskError::TaskFailed(task_name, err));
-            }
-            Err(panic_err) => {
-                let panic_msg = try_extract_panic_message(panic_err);
-                tracing::error!("Task {task_name}: {panic_msg}");
-                errors.push(TaskError::TaskPanicked(task_name, panic_msg));
-            }
-        };
+        handle_task_exit(resolved, task_name, &mut errors);
         tracing::info!("One of the tasks has exited, shutting down the node");
 
         // Collect names for remaining tasks for reporting purposes.
         // We have to re-collect, because `select_all` does not guarantee the order of returned remaining futures.
         let remaining_tasks_names: Vec<_> = remaining.iter().map(|task| task.id()).collect();
 
         let remaining_tasks_with_timeout: Vec<_> = remaining
@@ -211,22 +199,10 @@ impl ZkStackService {
             .block_on(futures::future::join_all(remaining_tasks_with_timeout));
 
         // Report the results of the remaining tasks.
-        for (name, result) in remaining_tasks_names
-            .into_iter()
-            .zip(execution_results.into_iter())
-        {
+        for (name, result) in remaining_tasks_names.into_iter().zip(execution_results) {
             match result {
-                Ok(Ok(Ok(()))) => {
-                    tracing::info!("Task {name} finished");
-                }
-                Ok(Ok(Err(err))) => {
-                    tracing::error!("Task {name} failed: {err}");
-                    errors.push(TaskError::TaskFailed(name, err));
-                }
-                Ok(Err(err)) => {
-                    let panic_msg = try_extract_panic_message(err);
-                    tracing::error!("Task {name} panicked: {panic_msg}");
-                    errors.push(TaskError::TaskPanicked(name, panic_msg));
+                Ok(resolved) => {
+                    handle_task_exit(resolved, name, &mut errors);
                 }
                 Err(_) => {
                     tracing::error!("Task {name} timed out");
@@ -265,6 +241,27 @@ impl ZkStackService {
     }
 }
 
+fn handle_task_exit(
+    task_result: Result<anyhow::Result<()>, tokio::task::JoinError>,
+    task_name: TaskId,
+    errors: &mut Vec<TaskError>,
+) {
+    match task_result {
+        Ok(Ok(())) => {
+            tracing::info!("Task {task_name} finished");
+        }
+        Ok(Err(err)) => {
+            tracing::error!("Task {task_name} failed: {err}");
+            errors.push(TaskError::TaskFailed(task_name, err));
+        }
+        Err(panic_err) => {
+            let panic_msg = try_extract_panic_message(panic_err);
+            tracing::error!("Task {task_name} panicked: {panic_msg}");
+            errors.push(TaskError::TaskPanicked(task_name, panic_msg));
+        }
+    };
+}
+
 fn oneshot_runner_task(
     oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
     mut stop_receiver: StopReceiver,
diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs
index b6fdb905f8d0..9aa715b0a74b 100644
--- a/core/node/node_framework/src/service/named_future.rs
+++ b/core/node/node_framework/src/service/named_future.rs
@@ -13,9 +13,9 @@ pin_project! {
     }
 }
 
-impl<T, F> NamedFuture<F>
+impl<F> NamedFuture<F>
 where
-    F: Future<Output = T>,
+    F: Future,
 {
     /// Creates a new future with the name tag attached.
     pub fn new(inner: F, name: TaskId) -> Self {
@@ -31,18 +31,19 @@ where
     }
 }
 
-impl<T, F> Future for NamedFuture<F>
+impl<F> Future for NamedFuture<F>
 where
-    F: Future<Output = T>,
+    F: Future,
 {
-    type Output = T;
+    type Output = F::Output;
 
     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
-        self.project().inner.poll(cx)
+        tracing::info_span!("NamedFuture", name = %self.name)
+            .in_scope(|| self.project().inner.poll(cx))
     }
 }
 
 impl<F> fmt::Debug for NamedFuture<F> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("NamedFuture")
             .field("name", &self.name)