From 5992d791d90cb2cb45f4416e25b8c3a227cdd5ec Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Wed, 26 Jun 2024 09:35:54 +0400 Subject: [PATCH] feat(node_framework): Unify Task types + misc improvements (#2325) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - Unifies `Task` types. Now we only have a single `Task` type with different `TaskKind` specifiers. - Refactors `ZkStackService::run` so that it's more readable. - Updates the framework documentation. - Minor improvements here and there. ## Why ❔ - Preparing framework for the previously proposed refactoring (e.g. `FromContext` / `IntoContext` IO flow). - Preparing framework for the publishing. ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- core/node/node_framework/examples/showcase.rs | 2 - .../layers/circuit_breaker_checker.rs | 15 +- .../layers/healtcheck_server.rs | 15 +- .../l1_batch_commitment_mode_validation.rs | 13 +- .../layers/postgres_metrics.rs | 15 +- .../layers/prometheus_exporter.rs | 12 +- .../layers/reorg_detector_checker.rs | 13 +- .../layers/reorg_detector_runner.rs | 15 +- .../src/implementations/layers/sigint.rs | 15 +- .../layers/validate_chain_ids.rs | 13 +- .../implementations/resources/sync_state.rs | 2 +- core/node/node_framework/src/lib.rs | 11 +- core/node/node_framework/src/precondition.rs | 41 ---- .../src/resource/lazy_resource.rs | 194 --------------- core/node/node_framework/src/resource/mod.rs | 41 +++- .../src/resource/resource_collection.rs | 172 -------------- .../node_framework/src/resource/unique.rs | 1 + .../node_framework/src/service/context.rs | 91 +++----- core/node/node_framework/src/service/error.rs | 2 + core/node/node_framework/src/service/mod.rs | 206 ++++++++-------- .../src/service/named_future.rs | 23 +- .../node_framework/src/service/runnables.rs | 204 +++++++--------- .../src/service/stop_receiver.rs | 6 - core/node/node_framework/src/task.rs | 221 ------------------ core/node/node_framework/src/task/mod.rs | 138 +++++++++++ core/node/node_framework/src/task/types.rs | 60 +++++ 26 files changed, 530 insertions(+), 1011 deletions(-) delete mode 100644 core/node/node_framework/src/precondition.rs delete mode 100644 core/node/node_framework/src/resource/lazy_resource.rs delete mode 100644 core/node/node_framework/src/resource/resource_collection.rs delete mode 100644 core/node/node_framework/src/task.rs create mode 100644 core/node/node_framework/src/task/mod.rs create mode 100644 core/node/node_framework/src/task/types.rs diff --git a/core/node/node_framework/examples/showcase.rs b/core/node/node_framework/examples/showcase.rs index 98baa5bc9683..67fa819880bc 100644 --- a/core/node/node_framework/examples/showcase.rs +++ b/core/node/node_framework/examples/showcase.rs @@ -63,8 +63,6 @@ struct DatabaseResource(pub Arc); /// /// For the latter requirement, there exists an `Unique` wrapper that can be used to store non-`Clone` /// resources. It's not used in this example, but it's a useful thing to know about. -/// -/// Finally, there are other wrappers for resources as well, like `ResourceCollection` and `LazyResource`. impl Resource for DatabaseResource { fn name() -> String { // The convention for resource names is `/`. In this case, the scope is `common`, but diff --git a/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs b/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs index 808ac7f57774..d7334147bdc2 100644 --- a/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs +++ b/core/node/node_framework/src/implementations/layers/circuit_breaker_checker.rs @@ -4,7 +4,7 @@ use zksync_config::configs::chain::CircuitBreakerConfig; use crate::{ implementations::resources::circuit_breakers::CircuitBreakersResource, service::{ServiceContext, StopReceiver}, - task::{TaskId, UnconstrainedTask}, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -44,7 +44,7 @@ impl WiringLayer for CircuitBreakerCheckerLayer { circuit_breaker_checker, }; - node.add_unconstrained_task(Box::new(task)); + node.add_task(Box::new(task)); Ok(()) } } @@ -55,15 +55,16 @@ struct CircuitBreakerCheckerTask { } #[async_trait::async_trait] -impl UnconstrainedTask for CircuitBreakerCheckerTask { +impl Task for CircuitBreakerCheckerTask { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedTask + } + fn id(&self) -> TaskId { "circuit_breaker_checker".into() } - async fn run_unconstrained( - mut self: Box, - stop_receiver: StopReceiver, - ) -> anyhow::Result<()> { + async fn run(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { self.circuit_breaker_checker.run(stop_receiver.0).await } } diff --git a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs index 10f98d8f9e5a..3982044c3f96 100644 --- a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs +++ b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs @@ -7,7 +7,7 @@ use zksync_node_api_server::healthcheck::HealthCheckHandle; use crate::{ implementations::resources::healthcheck::AppHealthCheckResource, service::{ServiceContext, StopReceiver}, - task::{TaskId, UnconstrainedTask}, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -41,7 +41,7 @@ impl WiringLayer for HealthCheckLayer { app_health_check, }; - node.add_unconstrained_task(Box::new(task)); + node.add_task(Box::new(task)); Ok(()) } } @@ -53,15 +53,16 @@ struct HealthCheckTask { } #[async_trait::async_trait] -impl UnconstrainedTask for HealthCheckTask { +impl Task for HealthCheckTask { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedTask + } + fn id(&self) -> TaskId { "healthcheck_server".into() } - async fn run_unconstrained( - mut self: Box, - mut stop_receiver: StopReceiver, - ) -> anyhow::Result<()> { + async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { let handle = HealthCheckHandle::spawn_server(self.config.bind_addr(), self.app_health_check.clone()); stop_receiver.0.changed().await?; diff --git a/core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs b/core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs index b9a83cc06cb6..3bb82dde98b6 100644 --- a/core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs +++ b/core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs @@ -3,9 +3,8 @@ use zksync_types::{commitment::L1BatchCommitmentMode, Address}; use crate::{ implementations::resources::eth_interface::EthInterfaceResource, - precondition::Precondition, service::{ServiceContext, StopReceiver}, - task::TaskId, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -51,19 +50,23 @@ impl WiringLayer for L1BatchCommitmentModeValidationLayer { query_client, ); - context.add_precondition(Box::new(task)); + context.add_task(Box::new(task)); Ok(()) } } #[async_trait::async_trait] -impl Precondition for L1BatchCommitmentModeValidationTask { +impl Task for L1BatchCommitmentModeValidationTask { + fn kind(&self) -> TaskKind { + TaskKind::Precondition + } + fn id(&self) -> TaskId { "l1_batch_commitment_mode_validation".into() } - async fn check(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { (*self).exit_on_success().run(stop_receiver.0).await } } diff --git a/core/node/node_framework/src/implementations/layers/postgres_metrics.rs b/core/node/node_framework/src/implementations/layers/postgres_metrics.rs index a0c80d4e9d42..b0690880a4c1 100644 --- a/core/node/node_framework/src/implementations/layers/postgres_metrics.rs +++ b/core/node/node_framework/src/implementations/layers/postgres_metrics.rs @@ -5,7 +5,7 @@ use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core}; use crate::{ implementations::resources::pools::{PoolResource, ReplicaPool}, service::{ServiceContext, StopReceiver}, - task::{TaskId, UnconstrainedTask}, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -32,7 +32,7 @@ impl WiringLayer for PostgresMetricsLayer { async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { let replica_pool_resource = context.get_resource::>().await?; let pool_for_metrics = replica_pool_resource.get_singleton().await?; - context.add_unconstrained_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics })); + context.add_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics })); Ok(()) } @@ -44,15 +44,16 @@ struct PostgresMetricsScrapingTask { } #[async_trait::async_trait] -impl UnconstrainedTask for PostgresMetricsScrapingTask { +impl Task for PostgresMetricsScrapingTask { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedTask + } + fn id(&self) -> TaskId { "postgres_metrics_scraping".into() } - async fn run_unconstrained( - self: Box, - mut stop_receiver: StopReceiver, - ) -> anyhow::Result<()> { + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { tokio::select! { () = PostgresMetrics::run_scraping(self.pool_for_metrics, SCRAPE_INTERVAL) => { tracing::warn!("Postgres metrics scraping unexpectedly stopped"); diff --git a/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs index 0742de55e2db..91b205f38cd8 100644 --- a/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs +++ b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs @@ -4,7 +4,7 @@ use zksync_vlog::prometheus::PrometheusExporterConfig; use crate::{ implementations::resources::healthcheck::AppHealthCheckResource, service::{ServiceContext, StopReceiver}, - task::{TaskId, UnconstrainedTask}, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -46,18 +46,22 @@ impl WiringLayer for PrometheusExporterLayer { prometheus_health_updater, }); - node.add_unconstrained_task(task); + node.add_task(task); Ok(()) } } #[async_trait::async_trait] -impl UnconstrainedTask for PrometheusExporterTask { +impl Task for PrometheusExporterTask { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedTask + } + fn id(&self) -> TaskId { "prometheus_exporter".into() } - async fn run_unconstrained(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { let prometheus_task = self.config.run(stop_receiver.0); self.prometheus_health_updater .update(HealthStatus::Ready.into()); diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs index 31b93a1b566e..a55c8a5e74ab 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs @@ -9,9 +9,8 @@ use crate::{ main_node_client::MainNodeClientResource, pools::{MasterPool, PoolResource}, }, - precondition::Precondition, service::{ServiceContext, StopReceiver}, - task::TaskId, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -45,7 +44,7 @@ impl WiringLayer for ReorgDetectorCheckerLayer { let pool = pool_resource.get().await?; // Create and insert precondition. - context.add_precondition(Box::new(CheckerPrecondition { + context.add_task(Box::new(CheckerPrecondition { pool: pool.clone(), reorg_detector: ReorgDetector::new(main_node_client, pool), })); @@ -60,12 +59,16 @@ pub struct CheckerPrecondition { } #[async_trait::async_trait] -impl Precondition for CheckerPrecondition { +impl Task for CheckerPrecondition { + fn kind(&self) -> TaskKind { + TaskKind::Precondition + } + fn id(&self) -> TaskId { "reorg_detector_checker".into() } - async fn check(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { // Given that this is a precondition -- i.e. something that starts before some invariants are met, // we need to first ensure that there is at least one batch in the database (there may be none if // either genesis or snapshot recovery has not been performed yet). diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs index 2ffc33d3145b..ab0995f10211 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs @@ -11,7 +11,7 @@ use crate::{ reverter::BlockReverterResource, }, service::{ServiceContext, StopReceiver}, - task::{TaskId, UnconstrainedOneshotTask}, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -46,7 +46,7 @@ impl WiringLayer for ReorgDetectorRunnerLayer { let reverter = context.get_resource::().await?.0; // Create and insert task. - context.add_unconstrained_oneshot_task(Box::new(RunnerUnconstrainedOneshotTask { + context.add_task(Box::new(RunnerUnconstrainedOneshotTask { reorg_detector: ReorgDetector::new(main_node_client, pool), reverter, })); @@ -61,15 +61,16 @@ pub struct RunnerUnconstrainedOneshotTask { } #[async_trait::async_trait] -impl UnconstrainedOneshotTask for RunnerUnconstrainedOneshotTask { +impl Task for RunnerUnconstrainedOneshotTask { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedOneshotTask + } + fn id(&self) -> TaskId { "reorg_detector_runner".into() } - async fn run_unconstrained_oneshot( - mut self: Box, - stop_receiver: StopReceiver, - ) -> anyhow::Result<()> { + async fn run(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { match self.reorg_detector.run_once(stop_receiver.0.clone()).await { Ok(()) => {} Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { diff --git a/core/node/node_framework/src/implementations/layers/sigint.rs b/core/node/node_framework/src/implementations/layers/sigint.rs index c3200139aba9..5c1fab73fa16 100644 --- a/core/node/node_framework/src/implementations/layers/sigint.rs +++ b/core/node/node_framework/src/implementations/layers/sigint.rs @@ -2,7 +2,7 @@ use tokio::sync::oneshot; use crate::{ service::{ServiceContext, StopReceiver}, - task::{TaskId, UnconstrainedTask}, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -23,7 +23,7 @@ impl WiringLayer for SigintHandlerLayer { async fn wire(self: Box, mut node: ServiceContext<'_>) -> Result<(), WiringError> { // SIGINT may happen at any time, so we must handle it as soon as it happens. - node.add_unconstrained_task(Box::new(SigintHandlerTask)); + node.add_task(Box::new(SigintHandlerTask)); Ok(()) } } @@ -32,15 +32,16 @@ impl WiringLayer for SigintHandlerLayer { struct SigintHandlerTask; #[async_trait::async_trait] -impl UnconstrainedTask for SigintHandlerTask { +impl Task for SigintHandlerTask { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedTask + } + fn id(&self) -> TaskId { "sigint_handler".into() } - async fn run_unconstrained( - self: Box, - mut stop_receiver: StopReceiver, - ) -> anyhow::Result<()> { + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { let (sigint_sender, sigint_receiver) = oneshot::channel(); let mut sigint_sender = Some(sigint_sender); // Has to be done this way since `set_handler` requires `FnMut`. ctrlc::set_handler(move || { diff --git a/core/node/node_framework/src/implementations/layers/validate_chain_ids.rs b/core/node/node_framework/src/implementations/layers/validate_chain_ids.rs index a9f5a61c65f1..5d3a9b9e82f5 100644 --- a/core/node/node_framework/src/implementations/layers/validate_chain_ids.rs +++ b/core/node/node_framework/src/implementations/layers/validate_chain_ids.rs @@ -5,9 +5,8 @@ use crate::{ implementations::resources::{ eth_interface::EthInterfaceResource, main_node_client::MainNodeClientResource, }, - precondition::Precondition, service::{ServiceContext, StopReceiver}, - task::TaskId, + task::{Task, TaskId, TaskKind}, wiring_layer::{WiringError, WiringLayer}, }; @@ -54,19 +53,23 @@ impl WiringLayer for ValidateChainIdsLayer { main_node_client, ); - context.add_precondition(Box::new(task)); + context.add_task(Box::new(task)); Ok(()) } } #[async_trait::async_trait] -impl Precondition for ValidateChainIdsTask { +impl Task for ValidateChainIdsTask { + fn kind(&self) -> TaskKind { + TaskKind::Precondition + } + fn id(&self) -> TaskId { "validate_chain_ids".into() } - async fn check(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { (*self).run_once(stop_receiver.0).await } } diff --git a/core/node/node_framework/src/implementations/resources/sync_state.rs b/core/node/node_framework/src/implementations/resources/sync_state.rs index 25df1d94d99f..a65342dd38d6 100644 --- a/core/node/node_framework/src/implementations/resources/sync_state.rs +++ b/core/node/node_framework/src/implementations/resources/sync_state.rs @@ -8,6 +8,6 @@ pub struct SyncStateResource(pub SyncState); impl Resource for SyncStateResource { fn name() -> String { - "sync_state".into() + "common/sync_state".into() } } diff --git a/core/node/node_framework/src/lib.rs b/core/node/node_framework/src/lib.rs index 4f688ab56adb..da788609b57c 100644 --- a/core/node/node_framework/src/lib.rs +++ b/core/node/node_framework/src/lib.rs @@ -1,25 +1,16 @@ //! # ZK Stack node initialization framework. //! -//! ## Introduction -//! //! This crate provides core abstractions that allow one to compose a ZK Stack node. //! Main concepts used in this crate are: //! - [`WiringLayer`](wiring_layer::WiringLayer) - builder interface for tasks. //! - [`Task`](task::Task) - a unit of work that can be executed by the node. //! - [`Resource`](resource::Resource) - a piece of logic that can be shared between tasks. Most resources are //! represented by generic interfaces and also serve as points of customization for tasks. -//! - [`ResourceProvider`](resource::ResourceProvider) - a trait that allows one to provide resources to the node. //! - [`ZkStackService`](service::ZkStackService) - a container for tasks and resources that takes care of initialization, running //! and shutting down. -//! -//! The general flow to compose a node is as follows: -//! - Create a [`ResourceProvider`](resource::ResourceProvider) that can provide all the resources that the node needs. -//! - Create a [`ZkStackService`](node::ZkStackService) with that [`ResourceProvider`](resource::ResourceProvider). -//! - Add tasks to the node. -//! - Run it. +//! - [`ZkStackServiceBuilder`](service::ZkStackServiceBuilder) - a builder for the service. pub mod implementations; -pub mod precondition; pub mod resource; pub mod service; pub mod task; diff --git a/core/node/node_framework/src/precondition.rs b/core/node/node_framework/src/precondition.rs deleted file mode 100644 index d81e0328bb62..000000000000 --- a/core/node/node_framework/src/precondition.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::{fmt, sync::Arc}; - -use tokio::sync::Barrier; - -use crate::{service::StopReceiver, task::TaskId}; - -#[async_trait::async_trait] -pub trait Precondition: 'static + Send + Sync { - /// Unique name of the precondition. - fn id(&self) -> TaskId; - - async fn check(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; -} - -impl dyn Precondition { - /// An internal helper method that runs a precondition check and lifts the barrier as soon - /// as the check is finished. - pub(super) async fn check_with_barrier( - self: Box, - mut stop_receiver: StopReceiver, - preconditions_barrier: Arc, - ) -> anyhow::Result<()> { - self.check(stop_receiver.clone()).await?; - tokio::select! { - _ = preconditions_barrier.wait() => { - Ok(()) - } - _ = stop_receiver.0.changed() => { - Ok(()) - } - } - } -} - -impl fmt::Debug for dyn Precondition { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Precondition") - .field("name", &self.id()) - .finish() - } -} diff --git a/core/node/node_framework/src/resource/lazy_resource.rs b/core/node/node_framework/src/resource/lazy_resource.rs deleted file mode 100644 index 3f70187627b8..000000000000 --- a/core/node/node_framework/src/resource/lazy_resource.rs +++ /dev/null @@ -1,194 +0,0 @@ -use std::sync::Arc; - -use thiserror::Error; -use tokio::sync::watch; - -use super::Resource; -use crate::service::StopReceiver; - -/// A lazy resource represents a resource that isn't available at the time when the tasks start. -/// -/// Normally it's used to represent the resources that should be provided by one task to another one. -/// Lazy resources are aware of the node lifecycle, so attempt to resolve the resource won't hang -/// if the resource is never provided: the resolve future will fail once the stop signal is sent by the node. -#[derive(Debug)] -pub struct LazyResource { - resolve_sender: Arc>>, - stop_receiver: StopReceiver, -} - -impl Resource for LazyResource { - fn name() -> String { - format!("lazy {}", T::name()) - } -} - -impl Clone for LazyResource { - fn clone(&self) -> Self { - Self { - resolve_sender: self.resolve_sender.clone(), - stop_receiver: self.stop_receiver.clone(), - } - } -} - -impl LazyResource { - /// Creates a new lazy resource. - /// Provided stop receiver will be used to prevent resolving from hanging if the resource is never provided. - pub fn new(stop_receiver: StopReceiver) -> Self { - let (resolve_sender, _resolve_receiver) = watch::channel(None); - - Self { - resolve_sender: Arc::new(resolve_sender), - stop_receiver, - } - } - - /// Returns a future that resolves to the resource once it is provided. - /// If the resource is never provided, the method will return an error once the node is shutting down. - pub async fn resolve(mut self) -> Result { - let mut resolve_receiver = self.resolve_sender.subscribe(); - if let Some(resource) = resolve_receiver.borrow().as_ref() { - return Ok(resource.clone()); - } - - let result = tokio::select! { - _ = self.stop_receiver.0.changed() => { - Err(LazyResourceError::NodeShutdown) - } - _ = resolve_receiver.changed() => { - // ^ we can ignore the error on `changed`, since we hold a strong reference to the sender. - let resource = resolve_receiver.borrow().as_ref().expect("Can only change if provided").clone(); - Ok(resource) - } - }; - - if result.is_ok() { - tracing::info!("Lazy resource {} has been resolved", T::name()); - } - - result - } - - /// Provides the resource. - /// May be called at most once. Subsequent calls will return an error. - pub async fn provide(&mut self, resource: T) -> Result<(), LazyResourceError> { - let sent = self.resolve_sender.send_if_modified(|current| { - if current.is_some() { - return false; - } - *current = Some(resource.clone()); - true - }); - - if !sent { - return Err(LazyResourceError::ResourceAlreadyProvided); - } - - tracing::info!("Lazy resource {} has been provided", T::name()); - - Ok(()) - } -} - -#[derive(Debug, Error, PartialEq)] -pub enum LazyResourceError { - #[error("Node is shutting down")] - NodeShutdown, - #[error("Resource is already provided")] - ResourceAlreadyProvided, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[derive(Debug, Clone, PartialEq)] - struct TestResource(Arc); - - impl Resource for TestResource { - fn name() -> String { - "test_resource".into() - } - } - - struct TestContext { - test_resource: TestResource, - lazy_resource: LazyResource, - stop_sender: watch::Sender, - } - - impl TestContext { - fn new() -> Self { - let (stop_sender, stop_receiver) = watch::channel(false); - Self { - test_resource: TestResource(Arc::new(1)), - lazy_resource: LazyResource::::new(StopReceiver(stop_receiver)), - stop_sender, - } - } - } - - #[tokio::test] - async fn test_already_provided_resource_case() { - let TestContext { - test_resource, - lazy_resource, - stop_sender: _, - } = TestContext::new(); - - lazy_resource - .clone() - .provide(test_resource.clone()) - .await - .unwrap(); - - assert_eq!( - lazy_resource.clone().provide(test_resource.clone()).await, - Err(LazyResourceError::ResourceAlreadyProvided), - "Incorrect result for providing same resource twice" - ); - } - - #[tokio::test] - async fn test_successful_resolve_case() { - let TestContext { - test_resource, - lazy_resource, - stop_sender: _, - } = TestContext::new(); - - lazy_resource - .clone() - .provide(test_resource.clone()) - .await - .unwrap(); - - assert_eq!( - lazy_resource.clone().resolve().await, - Ok(test_resource.clone()), - "Incorrect result for resolving the resource before node shutdown" - ); - } - - #[tokio::test] - async fn test_node_shutdown_case() { - let TestContext { - test_resource: _, - lazy_resource, - stop_sender, - } = TestContext::new(); - - let resolve_task = tokio::spawn(async move { lazy_resource.resolve().await }); - - stop_sender.send(true).unwrap(); - - let result = resolve_task.await.unwrap(); - - assert_eq!( - result, - Err(LazyResourceError::NodeShutdown), - "Incorrect result for resolving the resource after the node shutdown" - ); - } -} diff --git a/core/node/node_framework/src/resource/mod.rs b/core/node/node_framework/src/resource/mod.rs index cf000acf8bb0..2e62d8421f89 100644 --- a/core/node/node_framework/src/resource/mod.rs +++ b/core/node/node_framework/src/resource/mod.rs @@ -1,12 +1,7 @@ use std::{any::TypeId, fmt}; -pub use self::{ - lazy_resource::LazyResource, resource_collection::ResourceCollection, resource_id::ResourceId, - unique::Unique, -}; +pub use self::{resource_id::ResourceId, unique::Unique}; -mod lazy_resource; -mod resource_collection; mod resource_id; mod unique; @@ -14,9 +9,39 @@ mod unique; /// Typically, the type that implements this trait also should implement `Clone` /// since the same resource may be requested by several tasks and thus it would be an additional /// bound on most methods that work with [`Resource`]. +/// +/// # Example +/// +/// ``` +/// # use zksync_node_framework::resource::Resource; +/// # use std::sync::Arc; +/// +/// /// An abstract interface you want to share. +/// /// Normally you want the interface to be thread-safe. +/// trait MyInterface: 'static + Send + Sync { +/// fn do_something(&self); +/// } +/// +/// /// Resource wrapper. +/// #[derive(Clone)] +/// struct MyResource(Arc); +/// +/// impl Resource for MyResource { +/// fn name() -> String { +/// // It is a helpful practice to follow a structured naming pattern for resource names. +/// // For example, you can use a certain prefix for all resources related to a some component, e.g. `api`. +/// "common/my_resource".to_string() +/// } +/// } +/// ``` pub trait Resource: 'static + Send + Sync + std::any::Any { + /// Invoked after the wiring phase of the service is done. + /// Can be used to perform additional resource preparation, knowing that the resource + /// is guaranteed to be requested by all the tasks that need it. fn on_resource_wired(&mut self) {} + /// Returns the name of the resource. + /// Used for logging purposes. fn name() -> String; } @@ -26,10 +51,10 @@ pub trait Resource: 'static + Send + Sync + std::any::Any { /// This trait is implemented for any type that implements [`Resource`], so there is no need to /// implement it manually. pub(crate) trait StoredResource: 'static + std::any::Any + Send + Sync { - /// An object-safe version of [`Resource::resource_id`]. + /// An object-safe version of [`Resource::name`]. fn stored_resource_id(&self) -> ResourceId; - /// An object-safe version of [`Resource::on_resoure_wired`]. + /// An object-safe version of [`Resource::on_resource_wired`]. fn stored_resource_wired(&mut self); } diff --git a/core/node/node_framework/src/resource/resource_collection.rs b/core/node/node_framework/src/resource/resource_collection.rs deleted file mode 100644 index 7f867f236d95..000000000000 --- a/core/node/node_framework/src/resource/resource_collection.rs +++ /dev/null @@ -1,172 +0,0 @@ -use std::{ - fmt, - sync::{Arc, Mutex}, -}; - -use thiserror::Error; -use tokio::sync::watch; - -use super::Resource; - -/// Collection of resources that can be extended during the initialization phase, and then resolved once -/// the wiring is complete. -/// -/// During component initialization, resource collections can be requested by the components in order to push new -/// elements there. Once the initialization is complete, it is no longer possible to push new elements, and the -/// collection can be resolved into a vector of resources. -/// -/// Collections implement `Clone`, so they can be consumed by several tasks. Every task that resolves the collection -/// is guaranteed to have the same set of resources. -/// -/// The purpose of this container is to allow different tasks to register their resource in a single place for some -/// other task to consume. For example, tasks may register their healthchecks, and then healthcheck task will observe -/// all the provided healthchecks. -pub struct ResourceCollection { - /// Collection of the resources. - resources: Arc>>, - /// Sender indicating that the wiring is complete. - wiring_complete_sender: Arc>, - /// Flag indicating that the collection has been resolved. - wired: watch::Receiver, -} - -impl Resource for ResourceCollection { - fn on_resource_wired(&mut self) { - self.wiring_complete_sender.send(true).ok(); - } - - fn name() -> String { - format!("collection of {}", T::name()) - } -} - -impl Default for ResourceCollection { - fn default() -> Self { - Self::new() - } -} - -impl Clone for ResourceCollection { - fn clone(&self) -> Self { - Self { - resources: self.resources.clone(), - wiring_complete_sender: self.wiring_complete_sender.clone(), - wired: self.wired.clone(), - } - } -} - -impl fmt::Debug for ResourceCollection { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ResourceCollection") - .field("resources", &"{..}") - .finish_non_exhaustive() - } -} - -#[derive(Debug, Error)] -pub enum ResourceCollectionError { - #[error("Adding resources to the collection is not allowed after the wiring is complete")] - AlreadyWired, -} - -impl ResourceCollection { - pub(crate) fn new() -> Self { - let (wiring_complete_sender, wired) = watch::channel(false); - Self { - resources: Arc::default(), - wiring_complete_sender: Arc::new(wiring_complete_sender), - wired, - } - } - - /// Adds a new element to the resource collection. - /// Returns an error if the wiring is already complete. - pub fn push(&self, resource: T) -> Result<(), ResourceCollectionError> { - // This check is sufficient, since no task is guaranteed to be running when the value changes. - if *self.wired.borrow() { - return Err(ResourceCollectionError::AlreadyWired); - } - - let mut handle = self.resources.lock().unwrap(); - handle.push(resource); - tracing::info!( - "A new item has been added to the resource collection {}", - Self::name() - ); - Ok(()) - } - - /// Waits until the wiring is complete, and resolves the collection into a vector of resources. - pub async fn resolve(mut self) -> Vec { - // Guaranteed not to hang on server shutdown, since the node will invoke the `on_wiring_complete` before any task - // is actually spawned (per framework rules). For most cases, this check will resolve immediately, unless - // some tasks would spawn something from the `IntoZkSyncTask` impl. - self.wired.changed().await.expect("Sender can't be dropped"); - - tracing::info!("Resource collection {} has been resolved", Self::name()); - - let handle = self.resources.lock().unwrap(); - (*handle).clone() - } -} - -#[cfg(test)] -mod tests { - use assert_matches::assert_matches; - use futures::FutureExt; - - use super::*; - - #[derive(Debug, Clone, PartialEq)] - struct TestResource(Arc); - - impl Resource for TestResource { - fn name() -> String { - "test_resource".into() - } - } - - #[test] - fn test_push() { - let collection = ResourceCollection::::new(); - let resource1 = TestResource(Arc::new(1)); - collection.clone().push(resource1.clone()).unwrap(); - - let resource2 = TestResource(Arc::new(2)); - collection.clone().push(resource2.clone()).unwrap(); - - assert_eq!( - *collection.resources.lock().unwrap(), - vec![resource1, resource2] - ); - } - - #[test] - fn test_already_wired() { - let mut collection = ResourceCollection::::new(); - let resource = TestResource(Arc::new(1)); - - let rc_clone = collection.clone(); - - collection.on_resource_wired(); - - assert_matches!( - rc_clone.push(resource), - Err(ResourceCollectionError::AlreadyWired) - ); - } - - #[test] - fn test_resolve() { - let mut collection = ResourceCollection::::new(); - let result = collection.clone().resolve().now_or_never(); - - assert!(result.is_none()); - - collection.on_resource_wired(); - - let resolved = collection.resolve().now_or_never(); - assert_eq!(resolved.unwrap(), vec![]); - } -} diff --git a/core/node/node_framework/src/resource/unique.rs b/core/node/node_framework/src/resource/unique.rs index 9a256d8f55f3..5c9bdcfe0e12 100644 --- a/core/node/node_framework/src/resource/unique.rs +++ b/core/node/node_framework/src/resource/unique.rs @@ -29,6 +29,7 @@ impl Unique { } /// Takes the resource from the container. + /// Will return `None` if the resource was already taken. pub fn take(&self) -> Option { let result = self.inner.lock().unwrap().take(); diff --git a/core/node/node_framework/src/service/context.rs b/core/node/node_framework/src/service/context.rs index 9507c2287752..d4bb4db95464 100644 --- a/core/node/node_framework/src/service/context.rs +++ b/core/node/node_framework/src/service/context.rs @@ -3,15 +3,16 @@ use std::{any::type_name, future::Future}; use futures::FutureExt as _; use crate::{ - precondition::Precondition, resource::{Resource, ResourceId, StoredResource}, service::{named_future::NamedFuture, ZkStackService}, - task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask}, + task::Task, wiring_layer::WiringError, }; -/// An interface to the service's resources provided to the tasks during initialization. -/// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle. +/// An interface to the service provided to the tasks during initialization. +/// This the main point of interaction between with the service. +/// +/// The context provides access to the runtime, resources, and allows adding new tasks. #[derive(Debug)] pub struct ServiceContext<'a> { layer: &'a str, @@ -19,16 +20,26 @@ pub struct ServiceContext<'a> { } impl<'a> ServiceContext<'a> { + /// Instantiates a new context. + /// The context keeps information about the layer that created it for reporting purposes. pub(super) fn new(layer: &'a str, service: &'a mut ZkStackService) -> Self { Self { layer, service } } /// Provides access to the runtime used by the service. + /// /// Can be used to spawn additional tasks within the same runtime. /// If some tasks stores the handle to spawn additional tasks, it is expected to do all the required /// cleanup. /// - /// In most cases, however, it is recommended to use [`add_task`] method instead. + /// In most cases, however, it is recommended to use [`add_task`](ServiceContext::add_task) or its alternative + /// instead. + /// + /// ## Note + /// + /// While `tokio::spawn` and `tokio::spawn_blocking` will work as well, using the runtime handle + /// from the context is still a recommended way to get access to runtime, as it tracks the access + /// to the runtimes by layers. pub fn runtime_handle(&self) -> &tokio::runtime::Handle { tracing::info!( "Layer {} has requested access to the Tokio runtime", @@ -38,6 +49,7 @@ impl<'a> ServiceContext<'a> { } /// Adds a task to the service. + /// /// Added tasks will be launched after the wiring process will be finished and all the preconditions /// are met. pub fn add_task(&mut self, task: Box) -> &mut Self { @@ -46,57 +58,6 @@ impl<'a> ServiceContext<'a> { self } - /// Adds an unconstrained task to the service. - /// Unconstrained tasks will be launched immediately after the wiring process is finished. - pub fn add_unconstrained_task(&mut self, task: Box) -> &mut Self { - tracing::info!( - "Layer {} has added a new unconstrained task: {}", - self.layer, - task.id() - ); - self.service.runnables.unconstrained_tasks.push(task); - self - } - - /// Adds a precondition to the service. - pub fn add_precondition(&mut self, precondition: Box) -> &mut Self { - tracing::info!( - "Layer {} has added a new precondition: {}", - self.layer, - precondition.id() - ); - self.service.runnables.preconditions.push(precondition); - self - } - - /// Adds an oneshot task to the service. - pub fn add_oneshot_task(&mut self, task: Box) -> &mut Self { - tracing::info!( - "Layer {} has added a new oneshot task: {}", - self.layer, - task.id() - ); - self.service.runnables.oneshot_tasks.push(task); - self - } - - /// Adds an unconstrained oneshot task to the service. - pub fn add_unconstrained_oneshot_task( - &mut self, - task: Box, - ) -> &mut Self { - tracing::info!( - "Layer {} has added a new unconstrained oneshot task: {}", - self.layer, - task.id() - ); - self.service - .runnables - .unconstrained_oneshot_tasks - .push(task); - self - } - /// Adds a future to be invoked after node shutdown. /// May be used to perform cleanup tasks. /// @@ -119,14 +80,15 @@ impl<'a> ServiceContext<'a> { self } - /// Attempts to retrieve the resource with the specified name. - /// Internally the resources are stored as [`std::any::Any`], and this method does the downcasting - /// on behalf of the caller. + /// Attempts to retrieve the resource of the specified type. /// /// ## Panics /// - /// Panics if the resource with the specified name exists, but is not of the requested type. + /// Panics if the resource with the specified [`ResourceId`] exists, but is not of the requested type. pub async fn get_resource(&mut self) -> Result { + // Implementation details: + // Internally the resources are stored as [`std::any::Any`], and this method does the downcasting + // on behalf of the caller. #[allow(clippy::borrowed_box)] let downcast_clone = |resource: &Box| { resource @@ -167,7 +129,7 @@ impl<'a> ServiceContext<'a> { }) } - /// Attempts to retrieve the resource with the specified name. + /// Attempts to retrieve the resource of the specified type. /// If the resource is not available, it is created using the provided closure. pub async fn get_resource_or_insert_with T>( &mut self, @@ -190,18 +152,19 @@ impl<'a> ServiceContext<'a> { resource } - /// Attempts to retrieve the resource with the specified name. + /// Attempts to retrieve the resource of the specified type. /// If the resource is not available, it is created using `T::default()`. pub async fn get_resource_or_default(&mut self) -> T { self.get_resource_or_insert_with(T::default).await } /// Adds a resource to the service. - /// If the resource with the same name is already provided, the method will return an error. + /// + /// If the resource with the same type is already provided, the method will return an error. pub fn insert_resource(&mut self, resource: T) -> Result<(), WiringError> { let id = ResourceId::of::(); if self.service.resources.contains_key(&id) { - tracing::warn!( + tracing::info!( "Layer {} has attempted to provide resource {} of type {}, but it is already available", self.layer, T::name(), diff --git a/core/node/node_framework/src/service/error.rs b/core/node/node_framework/src/service/error.rs index 9e95b437419b..890cc6b7d4b6 100644 --- a/core/node/node_framework/src/service/error.rs +++ b/core/node/node_framework/src/service/error.rs @@ -1,5 +1,6 @@ use crate::{task::TaskId, wiring_layer::WiringError}; +/// An error that can occur during the task lifecycle. #[derive(Debug, thiserror::Error)] pub enum TaskError { #[error("Task {0} failed: {1}")] @@ -14,6 +15,7 @@ pub enum TaskError { ShutdownHookTimedOut(TaskId), } +/// An error that can occur during the service lifecycle. #[derive(Debug, thiserror::Error)] pub enum ZkStackServiceError { #[error("Detected a Tokio Runtime. ZkStackService manages its own runtime and does not support nested runtimes")] diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs index 57035a048d86..e727a536e9c4 100644 --- a/core/node/node_framework/src/service/mod.rs +++ b/core/node/node_framework/src/service/mod.rs @@ -1,17 +1,17 @@ use std::{collections::HashMap, time::Duration}; -use anyhow::Context; use error::TaskError; -use futures::FutureExt; -use runnables::NamedBoxFuture; -use tokio::{runtime::Runtime, sync::watch}; +use futures::future::Fuse; +use tokio::{runtime::Runtime, sync::watch, task::JoinHandle}; use zksync_utils::panic_extractor::try_extract_panic_message; -use self::runnables::Runnables; pub use self::{context::ServiceContext, error::ZkStackServiceError, stop_receiver::StopReceiver}; use crate::{ resource::{ResourceId, StoredResource}, - service::runnables::TaskReprs, + service::{ + named_future::NamedFuture, + runnables::{NamedBoxFuture, Runnables, TaskReprs}, + }, task::TaskId, wiring_layer::{WiringError, WiringLayer}, }; @@ -40,6 +40,7 @@ impl ZkStackServiceBuilder { } /// Adds a wiring layer. + /// /// During the [`run`](ZkStackService::run) call the service will invoke /// `wire` method of every layer in the order they were added. /// @@ -58,6 +59,10 @@ impl ZkStackServiceBuilder { self } + /// Builds the service. + /// + /// In case of errors during wiring phase, will return the list of all the errors that happened, in the order + /// of their occurrence. pub fn build(&mut self) -> Result { if tokio::runtime::Handle::try_current().is_ok() { return Err(ZkStackServiceError::RuntimeDetected); @@ -75,6 +80,7 @@ impl ZkStackServiceBuilder { runnables: Default::default(), stop_sender, runtime, + errors: Vec::new(), }) } } @@ -94,11 +100,38 @@ pub struct ZkStackService { stop_sender: watch::Sender, /// Tokio runtime used to spawn tasks. runtime: Runtime, + + /// Collector for the task errors met during the service execution. + errors: Vec, } +type TaskFuture = NamedFuture>>>; + impl ZkStackService { /// Runs the system. pub fn run(mut self) -> Result<(), ZkStackServiceError> { + self.wire()?; + + let TaskReprs { + tasks, + shutdown_hooks, + } = self.prepare_tasks(); + + let remaining = self.run_tasks(tasks); + self.shutdown_tasks(remaining); + self.run_shutdown_hooks(shutdown_hooks); + + tracing::info!("Exiting the service"); + if self.errors.is_empty() { + Ok(()) + } else { + Err(ZkStackServiceError::Task(self.errors)) + } + } + + /// Performs wiring of the service. + /// After invoking this method, the collected tasks will be collected in `self.runnables`. + fn wire(&mut self) -> Result<(), ZkStackServiceError> { // Initialize tasks. let wiring_layers = std::mem::take(&mut self.layers); @@ -108,8 +141,7 @@ impl ZkStackService { for layer in wiring_layers { let name = layer.layer_name().to_string(); // We must process wiring layers sequentially and in the same order as they were added. - let task_result = - runtime_handle.block_on(layer.wire(ServiceContext::new(&name, &mut self))); + let task_result = runtime_handle.block_on(layer.wire(ServiceContext::new(&name, self))); if let Err(err) = task_result { // We don't want to bail on the first error, since it'll provide worse DevEx: // People likely want to fix as much problems as they can in one go, rather than have @@ -131,43 +163,37 @@ impl ZkStackService { return Err(ZkStackServiceError::NoTasks); } - let only_oneshot_tasks = self.runnables.is_oneshot_only(); + // Wiring is now complete. + for resource in self.resources.values_mut() { + resource.stored_resource_wired(); + } + self.resources = HashMap::default(); // Decrement reference counters for resources. + tracing::info!("Wiring complete"); + + Ok(()) + } + /// Prepares collected tasks for running. + fn prepare_tasks(&mut self) -> TaskReprs { // Barrier that will only be lifted once all the preconditions are met. // It will be awaited by the tasks before they start running and by the preconditions once they are fulfilled. let task_barrier = self.runnables.task_barrier(); // Collect long-running tasks. let stop_receiver = StopReceiver(self.stop_sender.subscribe()); - let TaskReprs { - mut long_running_tasks, - oneshot_tasks, - shutdown_hooks, - } = self - .runnables - .prepare_tasks(task_barrier.clone(), stop_receiver.clone()); - - // Wiring is now complete. - for resource in self.resources.values_mut() { - resource.stored_resource_wired(); - } - drop(self.resources); // Decrement reference counters for resources. - tracing::info!("Wiring complete"); - - // Create a system task that is cancellation-aware and will only exit on either oneshot task failure or - // stop signal. - let oneshot_runner_system_task = - oneshot_runner_task(oneshot_tasks, stop_receiver, only_oneshot_tasks); - long_running_tasks.push(oneshot_runner_system_task); + self.runnables + .prepare_tasks(task_barrier.clone(), stop_receiver.clone()) + } + /// Spawn the provided tasks and runs them until at least one task exits, and returns the list + /// of remaining tasks. + /// Adds error, if any, to the `errors` vector. + fn run_tasks(&mut self, tasks: Vec>>) -> Vec { // Prepare tasks for running. let rt_handle = self.runtime.handle().clone(); - let join_handles: Vec<_> = long_running_tasks + let join_handles: Vec<_> = tasks .into_iter() - .map(|task| { - let name = task.id(); - NamedBoxFuture::new(rt_handle.spawn(task.into_inner()).fuse().boxed(), name) - }) + .map(|task| task.spawn(&rt_handle).fuse()) .collect(); // Collect names for remaining tasks for reporting purposes. @@ -179,11 +205,18 @@ impl ZkStackService { .block_on(futures::future::select_all(join_handles)); // Extract the result and report it to logs early, before waiting for any other task to shutdown. // We will also collect the errors from the remaining tasks, hence a vector. - let mut errors = Vec::new(); let task_name = tasks_names.swap_remove(resolved_idx); - handle_task_exit(resolved, task_name, &mut errors); + self.handle_task_exit(resolved, task_name); tracing::info!("One of the task has exited, shutting down the node"); + remaining + } + + /// Sends the stop signal and waits for the remaining tasks to finish. + fn shutdown_tasks(&mut self, remaining: Vec) { + // Send stop signal to remaining tasks and wait for them to finish. + self.stop_sender.send(true).ok(); + // Collect names for remaining tasks for reporting purposes. // We have to re-collect, becuase `select_all` does not guarantes the order of returned remaining futures. let remaining_tasks_names: Vec<_> = remaining.iter().map(|task| task.id()).collect(); @@ -192,8 +225,6 @@ impl ZkStackService { .map(|task| async { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, task).await }) .collect(); - // Send stop signal to remaining tasks and wait for them to finish. - self.stop_sender.send(true).ok(); let execution_results = self .runtime .block_on(futures::future::join_all(remaining_tasks_with_timeout)); @@ -202,15 +233,18 @@ impl ZkStackService { for (name, result) in remaining_tasks_names.into_iter().zip(execution_results) { match result { Ok(resolved) => { - handle_task_exit(resolved, name, &mut errors); + self.handle_task_exit(resolved, name); } Err(_) => { tracing::error!("Task {name} timed out"); - errors.push(TaskError::TaskShutdownTimedOut(name)); + self.errors.push(TaskError::TaskShutdownTimedOut(name)); } } } + } + /// Runs the provided shutdown hooks. + fn run_shutdown_hooks(&mut self, shutdown_hooks: Vec>>) { // Run shutdown hooks sequentially. for hook in shutdown_hooks { let name = hook.id().clone(); @@ -223,86 +257,36 @@ impl ZkStackService { } Ok(Err(err)) => { tracing::error!("Shutdown hook {name} failed: {err}"); - errors.push(TaskError::ShutdownHookFailed(name, err)); + self.errors.push(TaskError::ShutdownHookFailed(name, err)); } Err(_) => { tracing::error!("Shutdown hook {name} timed out"); - errors.push(TaskError::ShutdownHookTimedOut(name)); + self.errors.push(TaskError::ShutdownHookTimedOut(name)); } } } - - tracing::info!("Exiting the service"); - if errors.is_empty() { - Ok(()) - } else { - Err(ZkStackServiceError::Task(errors)) - } } -} - -fn handle_task_exit( - task_result: Result, tokio::task::JoinError>, - task_name: TaskId, - errors: &mut Vec, -) { - match task_result { - Ok(Ok(())) => { - tracing::info!("Task {task_name} finished"); - } - Ok(Err(err)) => { - tracing::error!("Task {task_name} failed: {err}"); - errors.push(TaskError::TaskFailed(task_name, err)); - } - Err(panic_err) => { - let panic_msg = try_extract_panic_message(panic_err); - tracing::error!("Task {task_name} panicked: {panic_msg}"); - errors.push(TaskError::TaskPanicked(task_name, panic_msg)); - } - }; -} -fn oneshot_runner_task( - oneshot_tasks: Vec>>, - mut stop_receiver: StopReceiver, - only_oneshot_tasks: bool, -) -> NamedBoxFuture> { - let future = async move { - let oneshot_tasks = oneshot_tasks.into_iter().map(|fut| async move { - // Spawn each oneshot task as a separate tokio task. - // This way we can handle the cases when such a task panics and propagate the message - // to the service. - let handle = tokio::runtime::Handle::current(); - let name = fut.id().to_string(); - match handle.spawn(fut).await { - Ok(Ok(())) => Ok(()), - Ok(Err(err)) => Err(err).with_context(|| format!("Oneshot task {name} failed")), - Err(panic_err) => { - let panic_msg = try_extract_panic_message(panic_err); - Err(anyhow::format_err!( - "Oneshot task {name} panicked: {panic_msg}" - )) - } + /// Checks the result of the task execution, logs the result, and stores the error if any. + fn handle_task_exit( + &mut self, + task_result: Result, tokio::task::JoinError>, + task_name: TaskId, + ) { + match task_result { + Ok(Ok(())) => { + tracing::info!("Task {task_name} finished"); } - }); - - match futures::future::try_join_all(oneshot_tasks).await { - Err(err) => Err(err), - Ok(_) if only_oneshot_tasks => { - // We only run oneshot tasks in this service, so we can exit now. - Ok(()) + Ok(Err(err)) => { + tracing::error!("Task {task_name} failed: {err}"); + self.errors.push(TaskError::TaskFailed(task_name, err)); } - Ok(_) => { - // All oneshot tasks have exited and we have at least one long-running task. - // Simply wait for the stop signal. - stop_receiver.0.changed().await.ok(); - Ok(()) + Err(panic_err) => { + let panic_msg = try_extract_panic_message(panic_err); + tracing::error!("Task {task_name} panicked: {panic_msg}"); + self.errors + .push(TaskError::TaskPanicked(task_name, panic_msg)); } - } - // Note that we don't have to `select` on the stop signal explicitly: - // Each prerequisite is given a stop signal, and if everyone respects it, this future - // will still resolve once the stop signal is received. - }; - - NamedBoxFuture::new(future.boxed(), "oneshot_runner".into()) + }; + } } diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs index 9aa715b0a74b..283fbbb327c9 100644 --- a/core/node/node_framework/src/service/named_future.rs +++ b/core/node/node_framework/src/service/named_future.rs @@ -1,6 +1,8 @@ use std::{fmt, future::Future, pin::Pin, task}; +use futures::future::{Fuse, FutureExt}; use pin_project_lite::pin_project; +use tokio::task::JoinHandle; use crate::task::TaskId; @@ -15,19 +17,34 @@ pin_project! { impl NamedFuture where - F: Future, + F: Future + Send + 'static, + F::Output: Send + 'static, { /// Creates a new future with the name tag attached. pub fn new(inner: F, name: TaskId) -> Self { Self { inner, name } } + /// Returns the ID of the task attached to the future. pub fn id(&self) -> TaskId { self.name.clone() } - pub fn into_inner(self) -> F { - self.inner + /// Fuses the wrapped future. + pub fn fuse(self) -> NamedFuture> { + NamedFuture { + name: self.name, + inner: self.inner.fuse(), + } + } + + /// Spawns the wrapped future on the provided runtime handle. + /// Returns a named wrapper over the join handle. + pub fn spawn(self, handle: &tokio::runtime::Handle) -> NamedFuture> { + NamedFuture { + name: self.name, + inner: handle.spawn(self.inner), + } } } diff --git a/core/node/node_framework/src/service/runnables.rs b/core/node/node_framework/src/service/runnables.rs index 8d240a8cffab..c3a7c21d2e80 100644 --- a/core/node/node_framework/src/service/runnables.rs +++ b/core/node/node_framework/src/service/runnables.rs @@ -1,30 +1,21 @@ use std::{fmt, sync::Arc}; -use futures::future::BoxFuture; +use anyhow::Context as _; +use futures::{future::BoxFuture, FutureExt as _}; use tokio::sync::Barrier; +use zksync_utils::panic_extractor::try_extract_panic_message; use super::{named_future::NamedFuture, StopReceiver}; -use crate::{ - precondition::Precondition, - task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask}, -}; +use crate::task::{Task, TaskKind}; /// Alias for futures with the name assigned. -pub type NamedBoxFuture = NamedFuture>; +pub(crate) type NamedBoxFuture = NamedFuture>; /// A collection of different flavors of tasks. #[derive(Default)] pub(super) struct Runnables { - /// Preconditions added to the service. - pub(super) preconditions: Vec>, /// Tasks added to the service. pub(super) tasks: Vec>, - /// Oneshot tasks added to the service. - pub(super) oneshot_tasks: Vec>, - /// Unconstrained tasks added to the service. - pub(super) unconstrained_tasks: Vec>, - /// Unconstrained oneshot tasks added to the service. - pub(super) unconstrained_oneshot_tasks: Vec>, /// List of hooks to be invoked after node shutdown. pub(super) shutdown_hooks: Vec>>, } @@ -32,14 +23,7 @@ pub(super) struct Runnables { impl fmt::Debug for Runnables { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Runnables") - .field("preconditions", &self.preconditions) .field("tasks", &self.tasks) - .field("oneshot_tasks", &self.oneshot_tasks) - .field("unconstrained_tasks", &self.unconstrained_tasks) - .field( - "unconstrained_oneshot_tasks", - &self.unconstrained_oneshot_tasks, - ) .field("shutdown_hooks", &self.shutdown_hooks) .finish() } @@ -47,16 +31,14 @@ impl fmt::Debug for Runnables { /// A unified representation of tasks that can be run by the service. pub(super) struct TaskReprs { - pub(super) long_running_tasks: Vec>>, - pub(super) oneshot_tasks: Vec>>, + pub(super) tasks: Vec>>, pub(super) shutdown_hooks: Vec>>, } impl fmt::Debug for TaskReprs { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("TaskReprs") - .field("long_running_tasks", &self.long_running_tasks.len()) - .field("oneshot_tasks", &self.oneshot_tasks.len()) + .field("long_running_tasks", &self.tasks.len()) .field("shutdown_hooks", &self.shutdown_hooks.len()) .finish() } @@ -68,130 +50,104 @@ impl Runnables { pub(super) fn is_empty(&self) -> bool { // We don't consider preconditions to be tasks. self.tasks.is_empty() - && self.oneshot_tasks.is_empty() - && self.unconstrained_tasks.is_empty() - && self.unconstrained_oneshot_tasks.is_empty() - } - - /// Returns `true` if there are no long-running tasks in the collection. - pub(super) fn is_oneshot_only(&self) -> bool { - self.tasks.is_empty() && self.unconstrained_tasks.is_empty() } /// Prepares a barrier that should be shared between tasks and preconditions. /// The barrier is configured to wait for all the participants to be ready. /// Barrier does not assume the existence of unconstrained tasks. pub(super) fn task_barrier(&self) -> Arc { - Arc::new(Barrier::new( - self.tasks.len() + self.preconditions.len() + self.oneshot_tasks.len(), - )) + let barrier_size = self + .tasks + .iter() + .filter(|t| { + matches!( + t.kind(), + TaskKind::Precondition | TaskKind::OneshotTask | TaskKind::Task + ) + }) + .count(); + Arc::new(Barrier::new(barrier_size)) } /// Transforms the collection of tasks into a set of universal futures. pub(super) fn prepare_tasks( - mut self, + &mut self, task_barrier: Arc, stop_receiver: StopReceiver, ) -> TaskReprs { let mut long_running_tasks = Vec::new(); - self.collect_unconstrained_tasks(&mut long_running_tasks, stop_receiver.clone()); - self.collect_tasks( - &mut long_running_tasks, - task_barrier.clone(), - stop_receiver.clone(), - ); - let mut oneshot_tasks = Vec::new(); - self.collect_preconditions( - &mut oneshot_tasks, - task_barrier.clone(), - stop_receiver.clone(), - ); - self.collect_oneshot_tasks( - &mut oneshot_tasks, - task_barrier.clone(), - stop_receiver.clone(), - ); - self.collect_unconstrained_oneshot_tasks(&mut oneshot_tasks, stop_receiver.clone()); - - TaskReprs { - long_running_tasks, - oneshot_tasks, - shutdown_hooks: self.shutdown_hooks, - } - } - fn collect_unconstrained_tasks( - &mut self, - tasks: &mut Vec>>, - stop_receiver: StopReceiver, - ) { - for task in std::mem::take(&mut self.unconstrained_tasks) { - let name = task.id(); - let stop_receiver = stop_receiver.clone(); - let task_future = Box::pin(task.run_unconstrained(stop_receiver)); - tasks.push(NamedFuture::new(task_future, name)); - } - } - - fn collect_tasks( - &mut self, - tasks: &mut Vec>>, - task_barrier: Arc, - stop_receiver: StopReceiver, - ) { for task in std::mem::take(&mut self.tasks) { let name = task.id(); + let kind = task.kind(); let stop_receiver = stop_receiver.clone(); let task_barrier = task_barrier.clone(); - let task_future = Box::pin(task.run_with_barrier(stop_receiver, task_barrier)); - tasks.push(NamedFuture::new(task_future, name)); + let task_future: BoxFuture<'static, _> = + Box::pin(task.run_internal(stop_receiver, task_barrier)); + let named_future = NamedFuture::new(task_future, name); + if kind.is_oneshot() { + oneshot_tasks.push(named_future); + } else { + long_running_tasks.push(named_future); + } } - } - fn collect_preconditions( - &mut self, - oneshot_tasks: &mut Vec>>, - task_barrier: Arc, - stop_receiver: StopReceiver, - ) { - for precondition in std::mem::take(&mut self.preconditions) { - let name = precondition.id(); - let stop_receiver = stop_receiver.clone(); - let task_barrier = task_barrier.clone(); - let task_future = - Box::pin(precondition.check_with_barrier(stop_receiver, task_barrier)); - oneshot_tasks.push(NamedFuture::new(task_future, name)); - } - } + let only_oneshot_tasks = long_running_tasks.is_empty(); + // Create a system task that is cancellation-aware and will only exit on either oneshot task failure or + // stop signal. + let oneshot_runner_system_task = + oneshot_runner_task(oneshot_tasks, stop_receiver, only_oneshot_tasks); + long_running_tasks.push(oneshot_runner_system_task); - fn collect_oneshot_tasks( - &mut self, - oneshot_tasks: &mut Vec>>, - task_barrier: Arc, - stop_receiver: StopReceiver, - ) { - for oneshot_task in std::mem::take(&mut self.oneshot_tasks) { - let name = oneshot_task.id(); - let stop_receiver = stop_receiver.clone(); - let task_barrier = task_barrier.clone(); - let task_future = - Box::pin(oneshot_task.run_oneshot_with_barrier(stop_receiver, task_barrier)); - oneshot_tasks.push(NamedFuture::new(task_future, name)); + TaskReprs { + tasks: long_running_tasks, + shutdown_hooks: std::mem::take(&mut self.shutdown_hooks), } } +} - fn collect_unconstrained_oneshot_tasks( - &mut self, - oneshot_tasks: &mut Vec>>, - stop_receiver: StopReceiver, - ) { - for unconstrained_oneshot_task in std::mem::take(&mut self.unconstrained_oneshot_tasks) { - let name = unconstrained_oneshot_task.id(); - let stop_receiver = stop_receiver.clone(); - let task_future = - Box::pin(unconstrained_oneshot_task.run_unconstrained_oneshot(stop_receiver)); - oneshot_tasks.push(NamedFuture::new(task_future, name)); +fn oneshot_runner_task( + oneshot_tasks: Vec>>, + mut stop_receiver: StopReceiver, + only_oneshot_tasks: bool, +) -> NamedBoxFuture> { + let future = async move { + let oneshot_tasks = oneshot_tasks.into_iter().map(|fut| async move { + // Spawn each oneshot task as a separate tokio task. + // This way we can handle the cases when such a task panics and propagate the message + // to the service. + let handle = tokio::runtime::Handle::current(); + let name = fut.id().to_string(); + match handle.spawn(fut).await { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err).with_context(|| format!("Oneshot task {name} failed")), + Err(panic_err) => { + let panic_msg = try_extract_panic_message(panic_err); + Err(anyhow::format_err!( + "Oneshot task {name} panicked: {panic_msg}" + )) + } + } + }); + + match futures::future::try_join_all(oneshot_tasks).await { + Err(err) => Err(err), + Ok(_) if only_oneshot_tasks => { + // We only run oneshot tasks in this service, so we can exit now. + Ok(()) + } + Ok(_) => { + // All oneshot tasks have exited and we have at least one long-running task. + // Simply wait for the stop signal. + stop_receiver.0.changed().await.ok(); + Ok(()) + } } - } + // Note that we don't have to `select` on the stop signal explicitly: + // Each prerequisite is given a stop signal, and if everyone respects it, this future + // will still resolve once the stop signal is received. + }; + + NamedBoxFuture::new(future.boxed(), "oneshot_runner".into()) } diff --git a/core/node/node_framework/src/service/stop_receiver.rs b/core/node/node_framework/src/service/stop_receiver.rs index 7a181b49a80d..e174cf62ba36 100644 --- a/core/node/node_framework/src/service/stop_receiver.rs +++ b/core/node/node_framework/src/service/stop_receiver.rs @@ -8,9 +8,3 @@ use tokio::sync::watch; /// and prevent tasks from hanging by accident. #[derive(Debug, Clone)] pub struct StopReceiver(pub watch::Receiver); - -impl StopReceiver { - pub fn new(receiver: watch::Receiver) -> Self { - Self(receiver) - } -} diff --git a/core/node/node_framework/src/task.rs b/core/node/node_framework/src/task.rs deleted file mode 100644 index 8bb7bbd2c702..000000000000 --- a/core/node/node_framework/src/task.rs +++ /dev/null @@ -1,221 +0,0 @@ -//! Tasks define the "runnable" concept of the service, e.g. a unit of work that can be executed by the service. -//! -//! ## Task kinds -//! -//! This module defines different flavors of tasks. -//! The most basic one is [`Task`], which is only launched after all the preconditions are met (more on this later), -//! and is expected to run until the node is shut down. This is the most common type of task, e.g. API server, -//! state keeper, and metadata calculator are examples of such tasks. -//! -//! Then there exists an [`OneshotTask`], which has a clear exit condition that does not cause the node to shut down. -//! This is useful for tasks that are expected to run once and then exit, e.g. a task that performs a programmatic -//! migration. -//! -//! Finally, the task can be unconstrained by preconditions, which means that it will start immediately without -//! waiting for any preconditions to be met. This kind of tasks is represent by [`UnconstrainedTask`] and -//! [`UnconstrainedOneshotTask`]. -//! -//! ## Tasks and preconditions -//! -//! Besides tasks, service also has a concept of preconditions(crate::precondition::Precondition). Precondition is a -//! piece of logic that is expected to be met before the task can start. One can think of preconditions as a way to -//! express invariants that the tasks may rely on. -//! -//! In this notion, the difference between a task and an unconstrained task is that the former has all the invariants -//! checked already, and unrestricted task is responsible for *manually checking any invariants it may rely on*. -//! -//! The unrestricted tasks are rarely needed, but two common cases for them are: -//! - A task that must be started as soon as possible, e.g. healthcheck server. -//! - A task that may be a driving force for some precondition to be met. - -use std::{ - fmt::{self, Display, Formatter}, - ops::Deref, - sync::Arc, -}; - -use tokio::sync::Barrier; - -use crate::service::StopReceiver; - -/// A unique human-readable identifier of a task. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TaskId(String); - -impl TaskId { - pub fn new(value: String) -> Self { - TaskId(value) - } -} - -impl Display for TaskId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) - } -} - -impl From<&str> for TaskId { - fn from(value: &str) -> Self { - TaskId(value.to_owned()) - } -} - -impl From for TaskId { - fn from(value: String) -> Self { - TaskId(value) - } -} - -impl Deref for TaskId { - type Target = str; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// A task implementation. -/// -/// Note: any `Task` added to the service will only start after all the [preconditions](crate::precondition::Precondition) -/// are met. If a task should start immediately, one should use [`UnconstrainedTask`](crate::task::UnconstrainedTask). -#[async_trait::async_trait] -pub trait Task: 'static + Send { - /// Unique name of the task. - fn id(&self) -> TaskId; - - /// Runs the task. - /// - /// Once any of the task returns, the node will shutdown. - /// If the task returns an error, the node will spawn an error-level log message and will return a non-zero - /// exit code. - /// - /// `stop_receiver` argument contains a channel receiver that will change its value once the node requests - /// a shutdown. Every task is expected to either await or periodically check the state of channel and stop - /// its execution once the channel is changed. - /// - /// Each task is expected to perform the required cleanup after receiving the stop signal. - async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; -} - -impl dyn Task { - /// An internal helper method that guards running the task with a tokio Barrier. - /// Used to make sure that the task is not started until all the preconditions are met. - pub(super) async fn run_with_barrier( - self: Box, - mut stop_receiver: StopReceiver, - preconditions_barrier: Arc, - ) -> anyhow::Result<()> { - // Wait either for barrier to be lifted or for the stop signal to be received. - tokio::select! { - _ = preconditions_barrier.wait() => { - self.run(stop_receiver).await - } - _ = stop_receiver.0.changed() => { - Ok(()) - } - } - } -} - -impl fmt::Debug for dyn Task { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("Task").field("name", &self.id()).finish() - } -} - -/// A oneshot task implementation. -/// The difference from [`Task`] is that this kind of task may exit without causing the service to shutdown. -/// -/// Note: any `Task` added to the service will only start after all the [preconditions](crate::precondition::Precondition) -/// are met. If a task should start immediately, one should use [`UnconstrainedTask`](crate::task::UnconstrainedTask). -#[async_trait::async_trait] -pub trait OneshotTask: 'static + Send { - /// Unique name of the task. - fn id(&self) -> TaskId; - - /// Runs the task. - /// - /// Unlike [`Task::run`], this method is expected to return once the task is finished, without causing the - /// node to shutdown. - /// - /// `stop_receiver` argument contains a channel receiver that will change its value once the node requests - /// a shutdown. Every task is expected to either await or periodically check the state of channel and stop - /// its execution once the channel is changed. - /// - /// Each task is expected to perform the required cleanup after receiving the stop signal. - async fn run_oneshot(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; -} - -impl dyn OneshotTask { - /// An internal helper method that guards running the task with a tokio Barrier. - /// Used to make sure that the task is not started until all the preconditions are met. - pub(super) async fn run_oneshot_with_barrier( - self: Box, - mut stop_receiver: StopReceiver, - preconditions_barrier: Arc, - ) -> anyhow::Result<()> { - // Wait either for barrier to be lifted or for the stop signal to be received. - tokio::select! { - _ = preconditions_barrier.wait() => { - self.run_oneshot(stop_receiver).await - } - _ = stop_receiver.0.changed() => { - Ok(()) - } - } - } -} - -impl fmt::Debug for dyn OneshotTask { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("OneshotTask") - .field("name", &self.id()) - .finish() - } -} - -/// A task implementation that is not constrained by preconditions. -/// -/// This trait is used to define tasks that should start immediately after the wiring phase, without waiting for -/// any preconditions to be met. -/// -/// *Warning*. An unconstrained task may not be aware of the state of the node and is expected to cautiously check -/// any invariants it may rely on. -#[async_trait::async_trait] -pub trait UnconstrainedTask: 'static + Send { - /// Unique name of the task. - fn id(&self) -> TaskId; - - /// Runs the task without waiting for any precondition to be met. - async fn run_unconstrained(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; -} - -impl fmt::Debug for dyn UnconstrainedTask { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("UnconstrainedTask") - .field("name", &self.id()) - .finish() - } -} - -/// An unconstrained analog of [`OneshotTask`]. -/// See [`UnconstrainedTask`] and [`OneshotTask`] for more details. -#[async_trait::async_trait] -pub trait UnconstrainedOneshotTask: 'static + Send { - /// Unique name of the task. - fn id(&self) -> TaskId; - - /// Runs the task without waiting for any precondition to be met. - async fn run_unconstrained_oneshot( - self: Box, - stop_receiver: StopReceiver, - ) -> anyhow::Result<()>; -} - -impl fmt::Debug for dyn UnconstrainedOneshotTask { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("UnconstrainedOneshotTask") - .field("name", &self.id()) - .finish() - } -} diff --git a/core/node/node_framework/src/task/mod.rs b/core/node/node_framework/src/task/mod.rs new file mode 100644 index 000000000000..8113a751441a --- /dev/null +++ b/core/node/node_framework/src/task/mod.rs @@ -0,0 +1,138 @@ +//! Tasks define the "runnable" concept of the service, e.g. a unit of work that can be executed by the service. + +use std::{ + fmt::{self, Formatter}, + sync::Arc, +}; + +use tokio::sync::Barrier; + +pub use self::types::{TaskId, TaskKind}; +use crate::service::StopReceiver; + +mod types; + +/// A task implementation. +/// Task defines the "runnable" concept of the service, e.g. a unit of work that can be executed by the service. +/// +/// Based on the task kind, the implemenation will be treated differently by the service. +/// +/// ## Task kinds +/// +/// There may be different kinds of tasks: +/// +/// ### `Task` +/// +/// A regular task. Returning from this task will cause the service to stop. [`Task::kind`] has a default +/// implementation that returns `TaskKind::Task`. +/// +/// Typically, the implementation of [`Task::run`] will be some form of loop that runs until either an +/// irrecoverable error happens (then task should return an error), or stop signal is received (then task should +/// return `Ok(())`). +/// +/// ### `OneshotTask` +/// +/// A task that can exit when completed without causing the service to terminate. +/// In case of `OneshotTask`s, the service will only exit when all the `OneshotTask`s have exited and there are +/// no more tasks running. +/// +/// ### `Precondition` +/// +/// A "barrier" task that is supposed to check invariants before the main tasks are started. +/// An example of a precondition task could be a task that checks if the database has all the required data. +/// Precondition tasks are often paired with some other kind of task that will make sure that the precondition +/// can be satisfied. This is required for a distributed service setup, where the precondition task will be +/// present on all the nodes, while a task that satisfies the precondition will be present only on one node. +/// +/// ### `UnconstrainedTask` +/// +/// A task that can run without waiting for preconditions. +/// Tasks of this kind are expected to check all the invariants they rely on themselves. +/// Usually, this kind of task is used either for tasks that must start as early as possible (e.g. healthcheck server), +/// or for tasks that cannot rely on preconditions. +/// +/// ### `UnconstrainedOneshotTask` +/// +/// A task that can run without waiting for preconditions and can exit without stopping the service. +/// Usually such tasks may be used for satisfying a precondition, for example, they can perform the database +/// setup. +#[async_trait::async_trait] +pub trait Task: 'static + Send { + /// Returns the kind of the task. + /// The returned values is expected to be static, and it will be used by the service + /// to determine how to handle the task. + fn kind(&self) -> TaskKind { + TaskKind::Task + } + + /// Unique name of the task. + fn id(&self) -> TaskId; + + /// Runs the task. + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()>; +} + +impl dyn Task { + /// An internal helper method that guards running the task with a tokio Barrier. + /// Used to make sure that the task is not started until all the preconditions are met. + pub(super) async fn run_internal( + self: Box, + stop_receiver: StopReceiver, + preconditions_barrier: Arc, + ) -> anyhow::Result<()> { + match self.kind() { + TaskKind::Task | TaskKind::OneshotTask => { + self.run_with_barrier(stop_receiver, preconditions_barrier) + .await + } + TaskKind::UnconstrainedTask | TaskKind::UnconstrainedOneshotTask => { + self.run(stop_receiver).await + } + TaskKind::Precondition => { + self.check_precondition(stop_receiver, preconditions_barrier) + .await + } + } + } + + async fn run_with_barrier( + self: Box, + mut stop_receiver: StopReceiver, + preconditions_barrier: Arc, + ) -> anyhow::Result<()> { + // Wait either for barrier to be lifted or for the stop signal to be received. + tokio::select! { + _ = preconditions_barrier.wait() => { + self.run(stop_receiver).await + } + _ = stop_receiver.0.changed() => { + Ok(()) + } + } + } + + async fn check_precondition( + self: Box, + mut stop_receiver: StopReceiver, + preconditions_barrier: Arc, + ) -> anyhow::Result<()> { + self.run(stop_receiver.clone()).await?; + tokio::select! { + _ = preconditions_barrier.wait() => { + Ok(()) + } + _ = stop_receiver.0.changed() => { + Ok(()) + } + } + } +} + +impl fmt::Debug for dyn Task { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("Task") + .field("kind", &self.kind()) + .field("name", &self.id()) + .finish() + } +} diff --git a/core/node/node_framework/src/task/types.rs b/core/node/node_framework/src/task/types.rs new file mode 100644 index 000000000000..70df61e56989 --- /dev/null +++ b/core/node/node_framework/src/task/types.rs @@ -0,0 +1,60 @@ +use std::{ + fmt::{Display, Formatter}, + ops::Deref, +}; + +/// Task kind. +/// See [`Task`](super::Task) documentation for more details. +#[derive(Debug, Clone, Copy)] +pub enum TaskKind { + Task, + OneshotTask, + UnconstrainedTask, + UnconstrainedOneshotTask, + Precondition, +} + +impl TaskKind { + pub(crate) fn is_oneshot(self) -> bool { + matches!( + self, + TaskKind::OneshotTask | TaskKind::UnconstrainedOneshotTask | TaskKind::Precondition + ) + } +} + +/// A unique human-readable identifier of a task. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct TaskId(String); + +impl TaskId { + pub fn new(value: String) -> Self { + TaskId(value) + } +} + +impl Display for TaskId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +impl From<&str> for TaskId { + fn from(value: &str) -> Self { + TaskId(value.to_owned()) + } +} + +impl From for TaskId { + fn from(value: String) -> Self { + TaskId(value) + } +} + +impl Deref for TaskId { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +}