Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node_framework): Unify Task types + misc improvements #2325

Merged
merged 9 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/node/node_framework/examples/showcase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ struct DatabaseResource(pub Arc<dyn Database>);
///
/// For the latter requirement, there exists an `Unique` wrapper that can be used to store non-`Clone`
/// resources. It's not used in this example, but it's a useful thing to know about.
///
/// Finally, there are other wrappers for resources as well, like `ResourceCollection` and `LazyResource`.
impl Resource for DatabaseResource {
fn name() -> String {
// The convention for resource names is `<scope>/<name>`. In this case, the scope is `common`, but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use zksync_config::configs::chain::CircuitBreakerConfig;
use crate::{
implementations::resources::circuit_breakers::CircuitBreakersResource,
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -44,7 +44,7 @@ impl WiringLayer for CircuitBreakerCheckerLayer {
circuit_breaker_checker,
};

node.add_unconstrained_task(Box::new(task));
node.add_task(Box::new(task));
Ok(())
}
}
Expand All @@ -55,15 +55,16 @@ struct CircuitBreakerCheckerTask {
}

#[async_trait::async_trait]
impl UnconstrainedTask for CircuitBreakerCheckerTask {
impl Task for CircuitBreakerCheckerTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"circuit_breaker_checker".into()
}

async fn run_unconstrained(
mut self: Box<Self>,
stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.circuit_breaker_checker.run(stop_receiver.0).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use zksync_node_api_server::healthcheck::HealthCheckHandle;
use crate::{
implementations::resources::healthcheck::AppHealthCheckResource,
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -41,7 +41,7 @@ impl WiringLayer for HealthCheckLayer {
app_health_check,
};

node.add_unconstrained_task(Box::new(task));
node.add_task(Box::new(task));
Ok(())
}
}
Expand All @@ -53,15 +53,16 @@ struct HealthCheckTask {
}

#[async_trait::async_trait]
impl UnconstrainedTask for HealthCheckTask {
impl Task for HealthCheckTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"healthcheck_server".into()
}

async fn run_unconstrained(
mut self: Box<Self>,
mut stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
let handle =
HealthCheckHandle::spawn_server(self.config.bind_addr(), self.app_health_check.clone());
stop_receiver.0.changed().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use zksync_types::{commitment::L1BatchCommitmentMode, Address};

use crate::{
implementations::resources::eth_interface::EthInterfaceResource,
precondition::Precondition,
service::{ServiceContext, StopReceiver},
task::TaskId,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -51,19 +50,23 @@ impl WiringLayer for L1BatchCommitmentModeValidationLayer {
query_client,
);

context.add_precondition(Box::new(task));
context.add_task(Box::new(task));

Ok(())
}
}

#[async_trait::async_trait]
impl Precondition for L1BatchCommitmentModeValidationTask {
impl Task for L1BatchCommitmentModeValidationTask {
fn kind(&self) -> TaskKind {
TaskKind::Precondition
}

fn id(&self) -> TaskId {
"l1_batch_commitment_mode_validation".into()
}

async fn check(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
(*self).exit_on_success().run(stop_receiver.0).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core};
use crate::{
implementations::resources::pools::{PoolResource, ReplicaPool},
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand All @@ -32,7 +32,7 @@ impl WiringLayer for PostgresMetricsLayer {
async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let replica_pool_resource = context.get_resource::<PoolResource<ReplicaPool>>().await?;
let pool_for_metrics = replica_pool_resource.get_singleton().await?;
context.add_unconstrained_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics }));
context.add_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics }));

Ok(())
}
Expand All @@ -44,15 +44,16 @@ struct PostgresMetricsScrapingTask {
}

#[async_trait::async_trait]
impl UnconstrainedTask for PostgresMetricsScrapingTask {
impl Task for PostgresMetricsScrapingTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"postgres_metrics_scraping".into()
}

async fn run_unconstrained(
self: Box<Self>,
mut stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
tokio::select! {
() = PostgresMetrics::run_scraping(self.pool_for_metrics, SCRAPE_INTERVAL) => {
tracing::warn!("Postgres metrics scraping unexpectedly stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use zksync_vlog::prometheus::PrometheusExporterConfig;
use crate::{
implementations::resources::healthcheck::AppHealthCheckResource,
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -46,18 +46,22 @@ impl WiringLayer for PrometheusExporterLayer {
prometheus_health_updater,
});

node.add_unconstrained_task(task);
node.add_task(task);
Ok(())
}
}

#[async_trait::async_trait]
impl UnconstrainedTask for PrometheusExporterTask {
impl Task for PrometheusExporterTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"prometheus_exporter".into()
}

async fn run_unconstrained(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
let prometheus_task = self.config.run(stop_receiver.0);
self.prometheus_health_updater
.update(HealthStatus::Ready.into());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use crate::{
main_node_client::MainNodeClientResource,
pools::{MasterPool, PoolResource},
},
precondition::Precondition,
service::{ServiceContext, StopReceiver},
task::TaskId,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -45,7 +44,7 @@ impl WiringLayer for ReorgDetectorCheckerLayer {
let pool = pool_resource.get().await?;

// Create and insert precondition.
context.add_precondition(Box::new(CheckerPrecondition {
context.add_task(Box::new(CheckerPrecondition {
pool: pool.clone(),
reorg_detector: ReorgDetector::new(main_node_client, pool),
}));
Expand All @@ -60,12 +59,16 @@ pub struct CheckerPrecondition {
}

#[async_trait::async_trait]
impl Precondition for CheckerPrecondition {
impl Task for CheckerPrecondition {
fn kind(&self) -> TaskKind {
TaskKind::Precondition
}

fn id(&self) -> TaskId {
"reorg_detector_checker".into()
}

async fn check(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
// Given that this is a precondition -- i.e. something that starts before some invariants are met,
// we need to first ensure that there is at least one batch in the database (there may be none if
// either genesis or snapshot recovery has not been performed yet).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
reverter::BlockReverterResource,
},
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedOneshotTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -46,7 +46,7 @@ impl WiringLayer for ReorgDetectorRunnerLayer {
let reverter = context.get_resource::<BlockReverterResource>().await?.0;

// Create and insert task.
context.add_unconstrained_oneshot_task(Box::new(RunnerUnconstrainedOneshotTask {
context.add_task(Box::new(RunnerUnconstrainedOneshotTask {
reorg_detector: ReorgDetector::new(main_node_client, pool),
reverter,
}));
Expand All @@ -61,15 +61,16 @@ pub struct RunnerUnconstrainedOneshotTask {
}

#[async_trait::async_trait]
impl UnconstrainedOneshotTask for RunnerUnconstrainedOneshotTask {
impl Task for RunnerUnconstrainedOneshotTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedOneshotTask
}

fn id(&self) -> TaskId {
"reorg_detector_runner".into()
}

async fn run_unconstrained_oneshot(
mut self: Box<Self>,
stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
match self.reorg_detector.run_once(stop_receiver.0.clone()).await {
Ok(()) => {}
Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => {
Expand Down
15 changes: 8 additions & 7 deletions core/node/node_framework/src/implementations/layers/sigint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use tokio::sync::oneshot;

use crate::{
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand All @@ -23,7 +23,7 @@ impl WiringLayer for SigintHandlerLayer {

async fn wire(self: Box<Self>, mut node: ServiceContext<'_>) -> Result<(), WiringError> {
// SIGINT may happen at any time, so we must handle it as soon as it happens.
node.add_unconstrained_task(Box::new(SigintHandlerTask));
node.add_task(Box::new(SigintHandlerTask));
Ok(())
}
}
Expand All @@ -32,15 +32,16 @@ impl WiringLayer for SigintHandlerLayer {
struct SigintHandlerTask;

#[async_trait::async_trait]
impl UnconstrainedTask for SigintHandlerTask {
impl Task for SigintHandlerTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"sigint_handler".into()
}

async fn run_unconstrained(
self: Box<Self>,
mut stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
let (sigint_sender, sigint_receiver) = oneshot::channel();
let mut sigint_sender = Some(sigint_sender); // Has to be done this way since `set_handler` requires `FnMut`.
ctrlc::set_handler(move || {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use crate::{
implementations::resources::{
eth_interface::EthInterfaceResource, main_node_client::MainNodeClientResource,
},
precondition::Precondition,
service::{ServiceContext, StopReceiver},
task::TaskId,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -54,19 +53,23 @@ impl WiringLayer for ValidateChainIdsLayer {
main_node_client,
);

context.add_precondition(Box::new(task));
context.add_task(Box::new(task));

Ok(())
}
}

#[async_trait::async_trait]
impl Precondition for ValidateChainIdsTask {
impl Task for ValidateChainIdsTask {
fn kind(&self) -> TaskKind {
TaskKind::Precondition
}

fn id(&self) -> TaskId {
"validate_chain_ids".into()
}

async fn check(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
(*self).run_once(stop_receiver.0).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ pub struct SyncStateResource(pub SyncState);

impl Resource for SyncStateResource {
fn name() -> String {
"sync_state".into()
"common/sync_state".into()
}
}
11 changes: 1 addition & 10 deletions core/node/node_framework/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
//! # ZK Stack node initialization framework.
//!
//! ## Introduction
//!
//! This crate provides core abstractions that allow one to compose a ZK Stack node.
//! Main concepts used in this crate are:
//! - [`WiringLayer`](wiring_layer::WiringLayer) - builder interface for tasks.
//! - [`Task`](task::Task) - a unit of work that can be executed by the node.
//! - [`Resource`](resource::Resource) - a piece of logic that can be shared between tasks. Most resources are
//! represented by generic interfaces and also serve as points of customization for tasks.
//! - [`ResourceProvider`](resource::ResourceProvider) - a trait that allows one to provide resources to the node.
//! - [`ZkStackService`](service::ZkStackService) - a container for tasks and resources that takes care of initialization, running
//! and shutting down.
//!
//! The general flow to compose a node is as follows:
//! - Create a [`ResourceProvider`](resource::ResourceProvider) that can provide all the resources that the node needs.
//! - Create a [`ZkStackService`](node::ZkStackService) with that [`ResourceProvider`](resource::ResourceProvider).
//! - Add tasks to the node.
//! - Run it.
//! - [`ZkStackServiceBuilder`](service::ZkStackServiceBuilder) - a builder for the service.

pub mod implementations;
pub mod precondition;
pub mod resource;
pub mod service;
pub mod task;
Expand Down
Loading
Loading