From e2573000d43e9ae05a5420fa0074e75b90aecd78 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Wed, 10 Jul 2024 15:48:35 +0400 Subject: [PATCH 01/15] Make it possible to override health check time limits --- core/lib/health_check/src/lib.rs | 93 +++++++++++++------ core/lib/health_check/src/tests.rs | 8 +- core/node/api_server/src/healthcheck.rs | 1 + .../layers/healtcheck_server.rs | 2 + 4 files changed, 73 insertions(+), 31 deletions(-) diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index 8a3068d661d9..e4e8ba3c9a58 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -106,7 +106,12 @@ pub enum AppHealthCheckError { /// Application health check aggregating health from multiple components. #[derive(Debug)] pub struct AppHealthCheck { - components: Mutex>>, + inner: Mutex, +} + +#[derive(Debug, Clone)] +struct AppHealthCheckInner { + components: Vec>, slow_time_limit: Duration, hard_time_limit: Duration, } @@ -118,17 +123,52 @@ impl Default for AppHealthCheck { } impl AppHealthCheck { - pub fn new(slow_time_limit: Option, hard_time_limit: Option) -> Self { - const DEFAULT_SLOW_TIME_LIMIT: Duration = Duration::from_millis(500); - const DEFAULT_HARD_TIME_LIMIT: Duration = Duration::from_secs(3); + const DEFAULT_SLOW_TIME_LIMIT: Duration = Duration::from_millis(500); + const DEFAULT_HARD_TIME_LIMIT: Duration = Duration::from_secs(3); - let slow_time_limit = slow_time_limit.unwrap_or(DEFAULT_SLOW_TIME_LIMIT); - let hard_time_limit = hard_time_limit.unwrap_or(DEFAULT_HARD_TIME_LIMIT); + pub fn new(slow_time_limit: Option, hard_time_limit: Option) -> Self { + let slow_time_limit = slow_time_limit.unwrap_or(Self::DEFAULT_SLOW_TIME_LIMIT); + let hard_time_limit = hard_time_limit.unwrap_or(Self::DEFAULT_HARD_TIME_LIMIT); tracing::debug!("Created app health with time limits: slow={slow_time_limit:?}, hard={hard_time_limit:?}"); - let config = AppHealthCheckConfig { - slow_time_limit: slow_time_limit.into(), - hard_time_limit: hard_time_limit.into(), + let inner = AppHealthCheckInner { + components: Vec::default(), + slow_time_limit, + hard_time_limit, + }; + Self { + inner: Mutex::new(inner), + } + } + + pub fn override_limits( + &self, + slow_time_limit: Option, + hard_time_limit: Option, + ) { + let mut guard = self.inner.lock().expect("`AppHealthCheck` is poisoned"); + if let Some(slow_time_limit) = slow_time_limit { + guard.slow_time_limit = slow_time_limit; + } + if let Some(hard_time_limit) = hard_time_limit { + guard.hard_time_limit = hard_time_limit; + } + tracing::debug!( + "Overridden app health time limits: slow={:?}, hard={:?}", + guard.slow_time_limit, + guard.hard_time_limit + ); + } + + /// Sets the info metrics for the metrics time limits. + /// This method should be called at most once when all the health checks are collected. + pub fn expose_metrics(&self) { + let config = { + let inner = self.inner.lock().expect("`AppHealthCheck` is poisoned"); + AppHealthCheckConfig { + slow_time_limit: inner.slow_time_limit.into(), + hard_time_limit: inner.hard_time_limit.into(), + } }; if METRICS.info.set(config).is_err() { tracing::warn!( @@ -136,12 +176,6 @@ impl AppHealthCheck { METRICS.info.get() ); } - - Self { - components: Mutex::default(), - slow_time_limit, - hard_time_limit, - } } /// Inserts health check for a component. 
@@ -166,32 +200,33 @@ impl AppHealthCheck {
         health_check: Arc<dyn CheckHealth>,
     ) -> Result<(), AppHealthCheckError> {
         let health_check_name = health_check.name();
-        let mut guard = self
+        let mut guard = self.inner.lock().expect("`AppHealthCheck` is poisoned");
+        if guard
             .components
-            .lock()
-            .expect("`AppHealthCheck` is poisoned");
-        if guard.iter().any(|check| check.name() == health_check_name) {
+            .iter()
+            .any(|check| check.name() == health_check_name)
+        {
             return Err(AppHealthCheckError::RedefinedComponent(health_check_name));
         }
-        guard.push(health_check);
+        guard.components.push(health_check);
         Ok(())
     }
 
     /// Checks the overall application health. This will query all component checks concurrently.
     pub async fn check_health(&self) -> AppHealth {
-        // Clone checks so that we don't hold a lock for them across a wait point.
-        let health_checks = self
-            .components
+        // Clone `inner` so that we don't hold the lock across an await point.
+        let AppHealthCheckInner {
+            components,
+            slow_time_limit,
+            hard_time_limit,
+        } = self
+            .inner
             .lock()
             .expect("`AppHealthCheck` is poisoned")
             .clone();
 
-        let check_futures = health_checks.iter().map(|check| {
-            Self::check_health_with_time_limit(
-                check.as_ref(),
-                self.slow_time_limit,
-                self.hard_time_limit,
-            )
+        let check_futures = components.iter().map(|check| {
+            Self::check_health_with_time_limit(check.as_ref(), slow_time_limit, hard_time_limit)
         });
         let components: HashMap<_, _> =
             future::join_all(check_futures).await.into_iter().collect();
diff --git a/core/lib/health_check/src/tests.rs b/core/lib/health_check/src/tests.rs
index 46c276372ae1..14c610e9fd83 100644
--- a/core/lib/health_check/src/tests.rs
+++ b/core/lib/health_check/src/tests.rs
@@ -81,9 +81,13 @@ async fn updating_health_status_return_value() {
 async fn aggregating_health_checks() {
     let (first_check, first_updater) = ReactiveHealthCheck::new("first");
     let (second_check, second_updater) = ReactiveHealthCheck::new("second");
+    let inner = AppHealthCheckInner {
+        components: vec![Arc::new(first_check), Arc::new(second_check)],
+        slow_time_limit: AppHealthCheck::DEFAULT_SLOW_TIME_LIMIT,
+        hard_time_limit: AppHealthCheck::DEFAULT_HARD_TIME_LIMIT,
+    };
     let checks = AppHealthCheck {
-        components: Mutex::new(vec![Arc::new(first_check), Arc::new(second_check)]),
-        ..AppHealthCheck::default()
+        inner: Mutex::new(inner),
     };
 
     let app_health = checks.check_health().await;
diff --git a/core/node/api_server/src/healthcheck.rs b/core/node/api_server/src/healthcheck.rs
index bb97b87bdfbc..414c2dbc21e9 100644
--- a/core/node/api_server/src/healthcheck.rs
+++ b/core/node/api_server/src/healthcheck.rs
@@ -25,6 +25,7 @@ async fn run_server(
         "Starting healthcheck server with checks {app_health_check:?} on {bind_address}"
     );
 
+    app_health_check.expose_metrics();
     let app = Router::new()
         .route("/health", get(check_health))
         .with_state(app_health_check);
diff --git a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs
index 227048c0f545..83a74c63cb45 100644
--- a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs
+++ b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs
@@ -45,6 +45,8 @@ impl WiringLayer for HealthCheckLayer {
     async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
         let AppHealthCheckResource(app_health_check) = input.app_health_check;
 
+        app_health_check.override_limits(self.0.slow_time_limit(), self.0.hard_time_limit());
+
         let health_check_task = HealthCheckTask {
config: self.0, app_health_check, From b256197a2890d4330414ef7f9c6fff032d245b1e Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Wed, 10 Jul 2024 16:04:04 +0400 Subject: [PATCH 02/15] Add EN metrics as a layer --- Cargo.lock | 1 + core/bin/external_node/Cargo.toml | 1 + core/bin/external_node/src/main.rs | 6 +- .../external_node/src/metrics/framework.rs | 79 +++++++++++++++++++ .../src/{metrics.rs => metrics/mod.rs} | 18 +++-- core/bin/external_node/src/node_builder.rs | 14 +++- 6 files changed, 112 insertions(+), 7 deletions(-) create mode 100644 core/bin/external_node/src/metrics/framework.rs rename core/bin/external_node/src/{metrics.rs => metrics/mod.rs} (84%) diff --git a/Cargo.lock b/Cargo.lock index 5f0b288caa22..581185abe4c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8636,6 +8636,7 @@ version = "24.8.0" dependencies = [ "anyhow", "assert_matches", + "async-trait", "clap 4.4.6", "envy", "futures 0.3.28", diff --git a/core/bin/external_node/Cargo.toml b/core/bin/external_node/Cargo.toml index 06bc8c203373..6d0114354f66 100644 --- a/core/bin/external_node/Cargo.toml +++ b/core/bin/external_node/Cargo.toml @@ -48,6 +48,7 @@ zksync_concurrency.workspace = true zksync_consensus_roles.workspace = true vise.workspace = true +async-trait.workspace = true anyhow.workspace = true tokio = { workspace = true, features = ["full"] } futures.workspace = true diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 75c3a7b8861b..a198d8f84546 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -848,7 +848,11 @@ async fn main() -> anyhow::Result<()> { } RUST_METRICS.initialize(); - EN_METRICS.observe_config(&config); + EN_METRICS.observe_config( + config.required.l1_chain_id, + config.required.l2_chain_id, + config.postgres.max_connections, + ); let singleton_pool_builder = ConnectionPool::singleton(config.postgres.database_url()); let connection_pool = ConnectionPool::::builder( diff --git a/core/bin/external_node/src/metrics/framework.rs b/core/bin/external_node/src/metrics/framework.rs new file mode 100644 index 000000000000..f153562ecf23 --- /dev/null +++ b/core/bin/external_node/src/metrics/framework.rs @@ -0,0 +1,79 @@ +use std::time::Duration; + +use super::EN_METRICS; +use zksync_dal::{ConnectionPool, Core, CoreDal as _}; +use zksync_node_framework::{ + implementations::resources::pools::{MasterPool, PoolResource}, + FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer, +}; +use zksync_types::{L1ChainId, L2ChainId}; + +#[derive(Debug)] +pub struct ExternalNodeMetricsLayer { + pub l1_chain_id: L1ChainId, + pub l2_chain_id: L2ChainId, + pub postgres_pool_size: u32, +} + +#[derive(Debug, FromContext)] +pub struct Input { + pub master_pool: PoolResource, +} + +#[derive(Debug, IntoContext)] +pub struct Output { + #[context(task)] + pub task: ProtocolVersionMetricsTask, +} + +#[async_trait::async_trait] +impl WiringLayer for ExternalNodeMetricsLayer { + type Input = Input; + type Output = Output; + + fn layer_name(&self) -> &'static str { + "external_node_metrics" + } + + async fn wire(self, input: Self::Input) -> Result { + EN_METRICS.observe_config(self.l1_chain_id, self.l2_chain_id, self.postgres_pool_size); + + let pool = input.master_pool.get_singleton().await?; + let task = ProtocolVersionMetricsTask { pool }; + Ok(Output { task }) + } +} + +#[derive(Debug)] +pub struct ProtocolVersionMetricsTask { + pool: ConnectionPool, +} + +#[async_trait::async_trait] +impl Task for 
ProtocolVersionMetricsTask { + fn id(&self) -> TaskId { + "en_protocol_version_metrics".into() + } + + async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + const QUERY_INTERVAL: Duration = Duration::from_secs(10); + + while !*stop_receiver.0.borrow_and_update() { + let maybe_protocol_version = self + .pool + .connection() + .await? + .protocol_versions_dal() + .last_used_version_id() + .await; + if let Some(version) = maybe_protocol_version { + EN_METRICS.protocol_version.set(version as u64); + } + + tokio::time::timeout(QUERY_INTERVAL, stop_receiver.0.changed()) + .await + .ok(); + } + Ok(()) + } +} diff --git a/core/bin/external_node/src/metrics.rs b/core/bin/external_node/src/metrics/mod.rs similarity index 84% rename from core/bin/external_node/src/metrics.rs rename to core/bin/external_node/src/metrics/mod.rs index ca4495180226..fe1b81adc266 100644 --- a/core/bin/external_node/src/metrics.rs +++ b/core/bin/external_node/src/metrics/mod.rs @@ -3,8 +3,11 @@ use std::time::Duration; use tokio::sync::watch; use vise::{EncodeLabelSet, Gauge, Info, Metrics}; use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_types::{L1ChainId, L2ChainId}; -use crate::{config::ExternalNodeConfig, metadata::SERVER_VERSION}; +use crate::metadata::SERVER_VERSION; + +pub(crate) mod framework; /// Immutable EN parameters that affect multiple components. #[derive(Debug, Clone, Copy, EncodeLabelSet)] @@ -26,12 +29,17 @@ pub(crate) struct ExternalNodeMetrics { } impl ExternalNodeMetrics { - pub(crate) fn observe_config(&self, config: &ExternalNodeConfig) { + pub(crate) fn observe_config( + &self, + l1_chain_id: L1ChainId, + l2_chain_id: L2ChainId, + postgres_pool_size: u32, + ) { let info = ExternalNodeInfo { server_version: SERVER_VERSION, - l1_chain_id: config.required.l1_chain_id.0, - l2_chain_id: config.required.l2_chain_id.as_u64(), - postgres_pool_size: config.postgres.max_connections, + l1_chain_id: l1_chain_id.0, + l2_chain_id: l2_chain_id.as_u64(), + postgres_pool_size, }; tracing::info!("Setting general node information: {info:?}"); diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index 43325be7441b..f50c8f4eddeb 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -55,6 +55,7 @@ use zksync_state::RocksdbStorageOptions; use crate::{ config::{self, ExternalNodeConfig}, + metrics::framework::ExternalNodeMetricsLayer, Component, }; @@ -115,6 +116,15 @@ impl ExternalNodeBuilder { Ok(self) } + fn add_external_node_metrics_layer(mut self) -> anyhow::Result { + self.node.add_layer(ExternalNodeMetricsLayer { + l1_chain_id: self.config.required.l1_chain_id, + l2_chain_id: self.config.required.l2_chain_id, + postgres_pool_size: self.config.postgres.max_connections, + }); + Ok(self) + } + fn add_main_node_client_layer(mut self) -> anyhow::Result { let layer = MainNodeClientLayer::new( self.config.required.main_node_url.clone(), @@ -539,7 +549,9 @@ impl ExternalNodeBuilder { // Core is a singleton & mandatory component, // so until we have a dedicated component for "auxiliary" tasks, // it's responsible for things like metrics. - self = self.add_postgres_metrics_layer()?; + self = self + .add_postgres_metrics_layer()? 
+ .add_external_node_metrics_layer()?; // Main tasks self = self From b367bf2bfcad4d4d5614d6df646e066fecc62e24 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 09:07:34 +0400 Subject: [PATCH 03/15] Adapt tests to use framework & remove db health check --- core/bin/external_node/src/main.rs | 7 +- core/bin/external_node/src/node_builder.rs | 2 +- core/bin/external_node/src/tests/framework.rs | 156 ++++++++++++++++++ .../src/{tests.rs => tests/mod.rs} | 74 +++++---- core/lib/db_connection/src/connection_pool.rs | 5 + core/lib/db_connection/src/healthcheck.rs | 58 ------- core/lib/db_connection/src/lib.rs | 1 - .../src/implementations/layers/pools_layer.rs | 40 +---- 8 files changed, 210 insertions(+), 133 deletions(-) create mode 100644 core/bin/external_node/src/tests/framework.rs rename core/bin/external_node/src/{tests.rs => tests/mod.rs} (86%) delete mode 100644 core/lib/db_connection/src/healthcheck.rs diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index a198d8f84546..7d0885de338c 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -17,9 +17,7 @@ use zksync_config::configs::{api::MerkleTreeApiConfig, database::MerkleTreeMode} use zksync_consistency_checker::ConsistencyChecker; use zksync_core_leftovers::setup_sigint_handler; use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core}; -use zksync_db_connection::{ - connection_pool::ConnectionPoolBuilder, healthcheck::ConnectionPoolHealthCheck, -}; +use zksync_db_connection::connection_pool::ConnectionPoolBuilder; use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck}; use zksync_metadata_calculator::{ api_server::{TreeApiClient, TreeApiHttpClient}, @@ -915,9 +913,6 @@ async fn run_node( app_health.insert_custom_component(Arc::new(MainNodeHealthCheck::from( main_node_client.clone(), )))?; - app_health.insert_custom_component(Arc::new(ConnectionPoolHealthCheck::new( - connection_pool.clone(), - )))?; // Start the health check server early into the node lifecycle so that its health can be monitored from the very start. let healthcheck_handle = HealthCheckHandle::spawn_server( diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index f50c8f4eddeb..d66dbcd23f46 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -62,7 +62,7 @@ use crate::{ /// Builder for the external node. 
#[derive(Debug)] pub(crate) struct ExternalNodeBuilder { - node: ZkStackServiceBuilder, + pub(crate) node: ZkStackServiceBuilder, config: ExternalNodeConfig, } diff --git a/core/bin/external_node/src/tests/framework.rs b/core/bin/external_node/src/tests/framework.rs new file mode 100644 index 000000000000..e3292cc73d7c --- /dev/null +++ b/core/bin/external_node/src/tests/framework.rs @@ -0,0 +1,156 @@ +use std::sync::Arc; + +use tokio::sync::oneshot; +use zksync_health_check::AppHealthCheck; +use zksync_node_framework::{ + implementations::{ + layers::{ + main_node_client::MainNodeClientLayer, query_eth_client::QueryEthClientLayer, + sigint::SigintHandlerLayer, + }, + resources::{ + eth_interface::EthInterfaceResource, healthcheck::AppHealthCheckResource, + main_node_client::MainNodeClientResource, + }, + }, + service::ServiceContext, + FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer, +}; +use zksync_types::{L1ChainId, L2ChainId}; +use zksync_web3_decl::client::{MockClient, L1, L2}; + +use super::{ExternalNodeBuilder, TestEnvironment}; + +pub(super) fn inject_test_layers( + node: &mut ExternalNodeBuilder, + test_env: TestEnvironment, + l1_client: MockClient, + l2_client: MockClient, +) { + node.node + .add_layer(TestSigintLayer { + receiver: test_env.sigint_receiver.unwrap(), + }) + .add_layer(AppHealthHijackLayer { + receiver: test_env.app_health_sender.unwrap(), + }) + .add_layer(MockL1ClientLayer { client: l1_client }) + .add_layer(MockL2ClientLayer { client: l2_client }); +} + +/// A test layer that would stop the node upon request. +/// Replaces the `SigintHandlerLayer` in tests. +#[derive(Debug)] +struct TestSigintLayer { + receiver: oneshot::Receiver<()>, +} + +#[async_trait::async_trait] +impl WiringLayer for TestSigintLayer { + type Input = (); + type Output = TestSigintTask; + + fn layer_name(&self) -> &'static str { + // We want to override layer by inserting it first. + SigintHandlerLayer.layer_name() + } + + async fn wire(self, _: Self::Input) -> Result { + Ok(TestSigintTask(self.receiver)) + } +} + +struct TestSigintTask(oneshot::Receiver<()>); + +#[async_trait::async_trait] +impl Task for TestSigintTask { + fn id(&self) -> TaskId { + "test_sigint_task".into() + } + + async fn run(self: Box, _: StopReceiver) -> anyhow::Result<()> { + self.0.await?; + Ok(()) + } +} + +impl IntoContext for TestSigintTask { + fn into_context(self, context: &mut ServiceContext<'_>) -> Result<(), WiringError> { + context.add_task(self); + Ok(()) + } +} + +/// Hijacks the `AppHealthCheck` from the context and passes it to the test. +/// Note: It's a separate layer to get access to the app health check, not an override. 
+#[derive(Debug)] +struct AppHealthHijackLayer { + receiver: oneshot::Sender>, +} + +#[derive(Debug, FromContext)] +struct AppHealthHijackInput { + #[context(default)] + app_health_check: AppHealthCheckResource, +} + +#[async_trait::async_trait] +impl WiringLayer for AppHealthHijackLayer { + type Input = AppHealthHijackInput; + type Output = (); + + fn layer_name(&self) -> &'static str { + "app_health_hijack" + } + + async fn wire(self, input: Self::Input) -> Result { + self.receiver.send(input.app_health_check.0).unwrap(); + tracing::error!("Submitted health"); + Ok(()) + } +} + +#[derive(Debug)] +struct MockL1ClientLayer { + client: MockClient, +} + +#[async_trait::async_trait] +impl WiringLayer for MockL1ClientLayer { + type Input = (); + type Output = EthInterfaceResource; + + fn layer_name(&self) -> &'static str { + // We don't care about values, we just want to hijack the layer name. + QueryEthClientLayer::new(L1ChainId(1), "https://example.com".parse().unwrap()).layer_name() + } + + async fn wire(self, _: Self::Input) -> Result { + Ok(EthInterfaceResource(Box::new(self.client))) + } +} + +#[derive(Debug)] +struct MockL2ClientLayer { + client: MockClient, +} + +#[async_trait::async_trait] +impl WiringLayer for MockL2ClientLayer { + type Input = (); + type Output = MainNodeClientResource; + + fn layer_name(&self) -> &'static str { + // We don't care about values, we just want to hijack the layer name. + MainNodeClientLayer::new( + "https://example.com".parse().unwrap(), + 100.try_into().unwrap(), + L2ChainId::default(), + ) + .layer_name() + } + + async fn wire(self, _: Self::Input) -> Result { + Ok(MainNodeClientResource(Box::new(self.client))) + } +} diff --git a/core/bin/external_node/src/tests.rs b/core/bin/external_node/src/tests/mod.rs similarity index 86% rename from core/bin/external_node/src/tests.rs rename to core/bin/external_node/src/tests/mod.rs index 6d3e8f278f32..8a20dc31189c 100644 --- a/core/bin/external_node/src/tests.rs +++ b/core/bin/external_node/src/tests/mod.rs @@ -1,8 +1,10 @@ //! High-level tests for EN. use assert_matches::assert_matches; +use framework::inject_test_layers; use test_casing::test_casing; use zksync_dal::CoreDal; +use zksync_db_connection::connection_pool::TestTemplate; use zksync_eth_client::clients::MockEthereum; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_types::{ @@ -16,6 +18,8 @@ use zksync_web3_decl::{ use super::*; +mod framework; + const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); const POLL_INTERVAL: Duration = Duration::from_millis(100); @@ -129,7 +133,7 @@ fn mock_eth_client(diamond_proxy_addr: Address) -> MockClient { mock.build().into_client() } -#[test_casing(5, ["all", "core", "api", "tree", "tree,tree_api"])] +#[test_casing(3, ["all", "core", "api"])] #[tokio::test] #[tracing::instrument] // Add args to the test logs async fn external_node_basics(components_str: &'static str) { @@ -138,8 +142,10 @@ async fn external_node_basics(components_str: &'static str) { // Simplest case to mock: the EN already has a genesis L1 batch / L2 block, and it's the only L1 batch / L2 block // in the network. 
- let connection_pool = ConnectionPool::test_pool().await; - let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone()); + let test_db: ConnectionPoolBuilder = + TestTemplate::empty().unwrap().create_db(100).await.unwrap(); + let connection_pool = test_db.build().await.unwrap(); + // let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone()); let mut storage = connection_pool.connection().await.unwrap(); let genesis_params = insert_genesis_batch(&mut storage, &GenesisParams::mock()) .await @@ -164,6 +170,8 @@ async fn external_node_basics(components_str: &'static str) { use_node_framework: false, }; let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool); + drop(connection_pool); + if opt.components.0.contains(&Component::TreeApi) { config.tree_component.api_port = Some(0); } @@ -195,23 +203,25 @@ async fn external_node_basics(components_str: &'static str) { .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) .build(); - let l2_client = Box::new(l2_client); - let eth_client = Box::new(mock_eth_client(diamond_proxy_addr)); + // let l2_client = Box::new(l2_client); + let eth_client = mock_eth_client(diamond_proxy_addr); let (env, env_handles) = TestEnvironment::new(); - let node_handle = tokio::spawn(async move { - run_node( - env, - &opt, - &config, - connection_pool, - singleton_pool_builder, - l2_client, - eth_client, - ) - .await + let node_handle = tokio::task::spawn_blocking(move || { + std::thread::spawn(move || { + let mut node = ExternalNodeBuilder::new(config); + inject_test_layers(&mut node, env, eth_client, l2_client); + + let node = node.build(opt.components.0.into_iter().collect())?; + node.run()?; + anyhow::Ok(()) + }) + .join() + .unwrap() }); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // Wait until the node is ready. 
let app_health = match env_handles.app_health_receiver.await { Ok(app_health) => app_health, @@ -259,8 +269,9 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging let temp_dir = tempfile::TempDir::new().unwrap(); - let connection_pool = ConnectionPool::test_pool().await; - let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone()); + let test_db: ConnectionPoolBuilder = + TestTemplate::empty().unwrap().create_db(100).await.unwrap(); + let connection_pool = test_db.build().await.unwrap(); let mut storage = connection_pool.connection().await.unwrap(); insert_genesis_batch(&mut storage, &GenesisParams::mock()) .await @@ -277,6 +288,8 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { use_node_framework: false, }; let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool); + drop(connection_pool); + if opt.components.0.contains(&Component::TreeApi) { config.tree_component.api_port = Some(0); } @@ -293,22 +306,21 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) .build(); - let l2_client = Box::new(l2_client); let diamond_proxy_addr = config.remote.diamond_proxy_addr; - let eth_client = Box::new(mock_eth_client(diamond_proxy_addr)); + let eth_client = mock_eth_client(diamond_proxy_addr); let (env, env_handles) = TestEnvironment::new(); - let mut node_handle = tokio::spawn(async move { - run_node( - env, - &opt, - &config, - connection_pool, - singleton_pool_builder, - l2_client, - eth_client, - ) - .await + let mut node_handle = tokio::task::spawn_blocking(move || { + std::thread::spawn(move || { + let mut node = ExternalNodeBuilder::new(config); + inject_test_layers(&mut node, env, eth_client, l2_client); + + let node = node.build(opt.components.0.into_iter().collect())?; + node.run()?; + anyhow::Ok(()) + }) + .join() + .unwrap() }); // Check that the node doesn't stop on its own. diff --git a/core/lib/db_connection/src/connection_pool.rs b/core/lib/db_connection/src/connection_pool.rs index 78d9184222dc..c6e15e26b15b 100644 --- a/core/lib/db_connection/src/connection_pool.rs +++ b/core/lib/db_connection/src/connection_pool.rs @@ -48,6 +48,11 @@ impl fmt::Debug for ConnectionPoolBuilder { } impl ConnectionPoolBuilder { + /// Returns the URL for the database. + pub fn database_url(&self) -> &SensitiveUrl { + &self.database_url + } + /// Overrides the maximum number of connections that can be allocated by the pool. pub fn set_max_size(&mut self, max_size: u32) -> &mut Self { self.max_size = max_size; diff --git a/core/lib/db_connection/src/healthcheck.rs b/core/lib/db_connection/src/healthcheck.rs deleted file mode 100644 index 81be78a64f1d..000000000000 --- a/core/lib/db_connection/src/healthcheck.rs +++ /dev/null @@ -1,58 +0,0 @@ -use serde::Serialize; -use zksync_health_check::{async_trait, CheckHealth, Health, HealthStatus}; - -use crate::{connection::DbMarker, connection_pool::ConnectionPool}; - -#[derive(Debug, Serialize)] -struct ConnectionPoolHealthDetails { - pool_size: u32, - max_size: u32, -} - -impl ConnectionPoolHealthDetails { - fn new(pool: &ConnectionPool) -> Self { - Self { - pool_size: pool.inner.size(), - max_size: pool.max_size(), - } - } -} - -// HealthCheck used to verify if we can connect to the database. 
-// This guarantees that the app can use it's main "communication" channel. -// Used in the /health endpoint -#[derive(Clone, Debug)] -pub struct ConnectionPoolHealthCheck { - connection_pool: ConnectionPool, -} - -impl ConnectionPoolHealthCheck { - pub fn new(connection_pool: ConnectionPool) -> ConnectionPoolHealthCheck { - Self { connection_pool } - } -} - -#[async_trait] -impl CheckHealth for ConnectionPoolHealthCheck { - fn name(&self) -> &'static str { - "connection_pool" - } - - async fn check_health(&self) -> Health { - // This check is rather feeble, plan to make reliable here: - // https://linear.app/matterlabs/issue/PLA-255/revamp-db-connection-health-check - match self.connection_pool.connection().await { - Ok(_) => { - let details = ConnectionPoolHealthDetails::new(&self.connection_pool); - Health::from(HealthStatus::Ready).with_details(details) - } - Err(err) => { - tracing::warn!("Failed acquiring DB connection for health check: {err:?}"); - let details = serde_json::json!({ - "error": format!("{err:?}"), - }); - Health::from(HealthStatus::NotReady).with_details(details) - } - } - } -} diff --git a/core/lib/db_connection/src/lib.rs b/core/lib/db_connection/src/lib.rs index 649af477e636..908a310c72ba 100644 --- a/core/lib/db_connection/src/lib.rs +++ b/core/lib/db_connection/src/lib.rs @@ -3,7 +3,6 @@ pub mod connection; pub mod connection_pool; pub mod error; -pub mod healthcheck; pub mod instrument; pub mod metrics; #[macro_use] diff --git a/core/node/node_framework/src/implementations/layers/pools_layer.rs b/core/node/node_framework/src/implementations/layers/pools_layer.rs index 54ebdcb2fa9c..734f6f0ccf69 100644 --- a/core/node/node_framework/src/implementations/layers/pools_layer.rs +++ b/core/node/node_framework/src/implementations/layers/pools_layer.rs @@ -1,16 +1,10 @@ -use std::sync::Arc; - use zksync_config::configs::{DatabaseSecrets, PostgresConfig}; use zksync_dal::{ConnectionPool, Core}; -use zksync_db_connection::healthcheck::ConnectionPoolHealthCheck; use crate::{ - implementations::resources::{ - healthcheck::AppHealthCheckResource, - pools::{MasterPool, PoolResource, ProverPool, ReplicaPool}, - }, + implementations::resources::pools::{MasterPool, PoolResource, ProverPool, ReplicaPool}, wiring_layer::{WiringError, WiringLayer}, - FromContext, IntoContext, + IntoContext, }; /// Builder for the [`PoolsLayer`]. @@ -69,10 +63,6 @@ impl PoolsLayerBuilder { /// Wiring layer for connection pools. /// During wiring, also prepares the global configuration for the connection pools. 
/// -/// ## Requests resources -/// -/// - `AppHealthCheckResource` (adds a health check) -/// /// ## Adds resources /// /// - `PoolResource::` (if master pool is enabled) @@ -87,13 +77,6 @@ pub struct PoolsLayer { with_prover: bool, } -#[derive(Debug, FromContext)] -#[context(crate = crate)] -pub struct Input { - #[context(default)] - pub app_health: AppHealthCheckResource, -} - #[derive(Debug, IntoContext)] #[context(crate = crate)] pub struct Output { @@ -104,14 +87,14 @@ pub struct Output { #[async_trait::async_trait] impl WiringLayer for PoolsLayer { - type Input = Input; + type Input = (); type Output = Output; fn layer_name(&self) -> &'static str { "pools_layer" } - async fn wire(self, input: Self::Input) -> Result { + async fn wire(self, _input: Self::Input) -> Result { if !self.with_master && !self.with_replica && !self.with_prover { return Err(WiringError::Configuration( "At least one pool should be enabled".to_string(), @@ -165,21 +148,6 @@ impl WiringLayer for PoolsLayer { None }; - // Insert health checks for the core pool. - // Replica pool is preferred here. - let healthcheck_pool = match (&replica_pool, &master_pool) { - (Some(replica), _) => Some(replica.get().await?), - (_, Some(master)) => Some(master.get().await?), - _ => None, - }; - if let Some(pool) = healthcheck_pool { - let db_health_check = ConnectionPoolHealthCheck::new(pool); - let AppHealthCheckResource(app_health) = input.app_health; - app_health - .insert_custom_component(Arc::new(db_health_check)) - .map_err(WiringError::internal)?; - } - Ok(Output { master_pool, replica_pool, From b20460ab0897ae3dac8b776c3a1789086fe1d4a3 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 09:08:00 +0400 Subject: [PATCH 04/15] Initialize rust metrics --- core/bin/external_node/src/metrics/framework.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/bin/external_node/src/metrics/framework.rs b/core/bin/external_node/src/metrics/framework.rs index f153562ecf23..a5faae04f7cd 100644 --- a/core/bin/external_node/src/metrics/framework.rs +++ b/core/bin/external_node/src/metrics/framework.rs @@ -6,6 +6,7 @@ use zksync_node_framework::{ implementations::resources::pools::{MasterPool, PoolResource}, FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer, }; +use zksync_shared_metrics::rustc::RUST_METRICS; use zksync_types::{L1ChainId, L2ChainId}; #[derive(Debug)] @@ -36,6 +37,7 @@ impl WiringLayer for ExternalNodeMetricsLayer { } async fn wire(self, input: Self::Input) -> Result { + RUST_METRICS.initialize(); EN_METRICS.observe_config(self.l1_chain_id, self.l2_chain_id, self.postgres_pool_size); let pool = input.master_pool.get_singleton().await?; From 8f24c1d18303929dd90f61a582a1296e2fed922d Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 09:36:52 +0400 Subject: [PATCH 05/15] Refactor tests --- core/bin/external_node/src/tests/framework.rs | 13 +- core/bin/external_node/src/tests/mod.rs | 234 +++--------------- core/bin/external_node/src/tests/utils.rs | 147 +++++++++++ core/node/genesis/src/lib.rs | 1 + 4 files changed, 187 insertions(+), 208 deletions(-) create mode 100644 core/bin/external_node/src/tests/utils.rs diff --git a/core/bin/external_node/src/tests/framework.rs b/core/bin/external_node/src/tests/framework.rs index e3292cc73d7c..09ea2da95c4f 100644 --- a/core/bin/external_node/src/tests/framework.rs +++ b/core/bin/external_node/src/tests/framework.rs @@ -19,20 +19,21 @@ use zksync_node_framework::{ use zksync_types::{L1ChainId, L2ChainId}; use 
zksync_web3_decl::client::{MockClient, L1, L2}; -use super::{ExternalNodeBuilder, TestEnvironment}; +use super::ExternalNodeBuilder; pub(super) fn inject_test_layers( node: &mut ExternalNodeBuilder, - test_env: TestEnvironment, + sigint_receiver: oneshot::Receiver<()>, + app_health_sender: oneshot::Sender>, l1_client: MockClient, l2_client: MockClient, ) { node.node .add_layer(TestSigintLayer { - receiver: test_env.sigint_receiver.unwrap(), + receiver: sigint_receiver, }) .add_layer(AppHealthHijackLayer { - receiver: test_env.app_health_sender.unwrap(), + sender: app_health_sender, }) .add_layer(MockL1ClientLayer { client: l1_client }) .add_layer(MockL2ClientLayer { client: l2_client }); @@ -85,7 +86,7 @@ impl IntoContext for TestSigintTask { /// Note: It's a separate layer to get access to the app health check, not an override. #[derive(Debug)] struct AppHealthHijackLayer { - receiver: oneshot::Sender>, + sender: oneshot::Sender>, } #[derive(Debug, FromContext)] @@ -104,7 +105,7 @@ impl WiringLayer for AppHealthHijackLayer { } async fn wire(self, input: Self::Input) -> Result { - self.receiver.send(input.app_health_check.0).unwrap(); + self.sender.send(input.app_health_check.0).unwrap(); tracing::error!("Submitted health"); Ok(()) } diff --git a/core/bin/external_node/src/tests/mod.rs b/core/bin/external_node/src/tests/mod.rs index 8a20dc31189c..59a1aa29632f 100644 --- a/core/bin/external_node/src/tests/mod.rs +++ b/core/bin/external_node/src/tests/mod.rs @@ -3,180 +3,27 @@ use assert_matches::assert_matches; use framework::inject_test_layers; use test_casing::test_casing; -use zksync_dal::CoreDal; -use zksync_db_connection::connection_pool::TestTemplate; -use zksync_eth_client::clients::MockEthereum; -use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; -use zksync_types::{ - api, ethabi, fee_model::FeeParams, Address, L1BatchNumber, L2BlockNumber, ProtocolVersionId, - H256, U64, -}; -use zksync_web3_decl::{ - client::{MockClient, L1}, - jsonrpsee::core::ClientError, -}; +use zksync_types::{api, fee_model::FeeParams, Address, L1BatchNumber, U64}; +use zksync_web3_decl::{client::MockClient, jsonrpsee::core::ClientError}; use super::*; mod framework; +mod utils; const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); const POLL_INTERVAL: Duration = Duration::from_millis(100); -fn block_details_base(hash: H256) -> api::BlockDetailsBase { - api::BlockDetailsBase { - timestamp: 0, - l1_tx_count: 0, - l2_tx_count: 0, - root_hash: Some(hash), - status: api::BlockStatus::Sealed, - commit_tx_hash: None, - committed_at: None, - prove_tx_hash: None, - proven_at: None, - execute_tx_hash: None, - executed_at: None, - l1_gas_price: 0, - l2_fair_gas_price: 0, - fair_pubdata_price: None, - base_system_contracts_hashes: Default::default(), - } -} - -#[derive(Debug)] -struct TestEnvironment { - sigint_receiver: Option>, - app_health_sender: Option>>, -} - -impl TestEnvironment { - fn new() -> (Self, TestEnvironmentHandles) { - let (sigint_sender, sigint_receiver) = oneshot::channel(); - let (app_health_sender, app_health_receiver) = oneshot::channel(); - let this = Self { - sigint_receiver: Some(sigint_receiver), - app_health_sender: Some(app_health_sender), - }; - let handles = TestEnvironmentHandles { - sigint_sender, - app_health_receiver, - }; - (this, handles) - } -} - -impl NodeEnvironment for TestEnvironment { - fn setup_sigint_handler(&mut self) -> oneshot::Receiver<()> { - self.sigint_receiver - .take() - .expect("requested to setup sigint handler twice") - } - - fn 
set_app_health(&mut self, health: Arc) { - self.app_health_sender - .take() - .expect("set app health twice") - .send(health) - .ok(); - } -} - -#[derive(Debug)] -struct TestEnvironmentHandles { - sigint_sender: oneshot::Sender<()>, - app_health_receiver: oneshot::Receiver>, -} - -// The returned components have the fully implemented health check life cycle (i.e., signal their shutdown). -fn expected_health_components(components: &ComponentsToRun) -> Vec<&'static str> { - let mut output = vec!["reorg_detector"]; - if components.0.contains(&Component::Core) { - output.extend(["consistency_checker", "commitment_generator"]); - } - if components.0.contains(&Component::Tree) { - output.push("tree"); - } - if components.0.contains(&Component::HttpApi) { - output.push("http_api"); - } - if components.0.contains(&Component::WsApi) { - output.push("ws_api"); - } - output -} - -fn mock_eth_client(diamond_proxy_addr: Address) -> MockClient { - let mock = MockEthereum::builder().with_call_handler(move |call, _| { - tracing::info!("L1 call: {call:?}"); - if call.to == Some(diamond_proxy_addr) { - let packed_semver = ProtocolVersionId::latest().into_packed_semver_with_patch(0); - let call_signature = &call.data.as_ref().unwrap().0[..4]; - let contract = zksync_contracts::hyperchain_contract(); - let pricing_mode_sig = contract - .function("getPubdataPricingMode") - .unwrap() - .short_signature(); - let protocol_version_sig = contract - .function("getProtocolVersion") - .unwrap() - .short_signature(); - match call_signature { - sig if sig == pricing_mode_sig => { - return ethabi::Token::Uint(0.into()); // "rollup" mode encoding - } - sig if sig == protocol_version_sig => return ethabi::Token::Uint(packed_semver), - _ => { /* unknown call; panic below */ } - } - } - panic!("Unexpected L1 call: {call:?}"); - }); - mock.build().into_client() -} - #[test_casing(3, ["all", "core", "api"])] #[tokio::test] #[tracing::instrument] // Add args to the test logs async fn external_node_basics(components_str: &'static str) { let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging - let temp_dir = tempfile::TempDir::new().unwrap(); - - // Simplest case to mock: the EN already has a genesis L1 batch / L2 block, and it's the only L1 batch / L2 block - // in the network. 
- let test_db: ConnectionPoolBuilder = - TestTemplate::empty().unwrap().create_db(100).await.unwrap(); - let connection_pool = test_db.build().await.unwrap(); - // let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone()); - let mut storage = connection_pool.connection().await.unwrap(); - let genesis_params = insert_genesis_batch(&mut storage, &GenesisParams::mock()) - .await - .unwrap(); - let genesis_l2_block = storage - .blocks_dal() - .get_l2_block_header(L2BlockNumber(0)) - .await - .unwrap() - .expect("No genesis L2 block"); - drop(storage); - - let components: ComponentsToRun = components_str.parse().unwrap(); - let expected_health_components = expected_health_components(&components); - let opt = Cli { - enable_consensus: false, - components, - config_path: None, - secrets_path: None, - external_node_config_path: None, - consensus_path: None, - use_node_framework: false, - }; - let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool); - drop(connection_pool); - if opt.components.0.contains(&Component::TreeApi) { - config.tree_component.api_port = Some(0); - } + let (env, env_handles) = utils::TestEnvironment::with_genesis_block(components_str).await; - let diamond_proxy_addr = config.remote.diamond_proxy_addr; + let expected_health_components = utils::expected_health_components(&env.components); + let diamond_proxy_addr = env.config.remote.diamond_proxy_addr; let l2_client = MockClient::builder(L2::default()) .method("eth_chainId", || Ok(U64::from(270))) @@ -186,7 +33,7 @@ async fn external_node_basics(components_str: &'static str) { assert_eq!(number, L1BatchNumber(0)); Ok(api::L1BatchDetails { number: L1BatchNumber(0), - base: block_details_base(genesis_params.root_hash), + base: utils::block_details_base(env.genesis_params.root_hash), }) }) .method("eth_blockNumber", || Ok(U64::from(0))) @@ -195,7 +42,7 @@ async fn external_node_basics(components_str: &'static str) { move |number: api::BlockNumber, _with_txs: bool| { assert_eq!(number, api::BlockNumber::Number(0.into())); Ok(api::Block:: { - hash: genesis_l2_block.hash, + hash: env.genesis_l2_block.hash, ..api::Block::default() }) }, @@ -204,15 +51,20 @@ async fn external_node_basics(components_str: &'static str) { .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) .build(); // let l2_client = Box::new(l2_client); - let eth_client = mock_eth_client(diamond_proxy_addr); + let eth_client = utils::mock_eth_client(diamond_proxy_addr); - let (env, env_handles) = TestEnvironment::new(); let node_handle = tokio::task::spawn_blocking(move || { std::thread::spawn(move || { - let mut node = ExternalNodeBuilder::new(config); - inject_test_layers(&mut node, env, eth_client, l2_client); - - let node = node.build(opt.components.0.into_iter().collect())?; + let mut node = ExternalNodeBuilder::new(env.config); + inject_test_layers( + &mut node, + env.sigint_receiver, + env.app_health_sender, + eth_client, + l2_client, + ); + + let node = node.build(env.components.0.into_iter().collect())?; node.run()?; anyhow::Ok(()) }) @@ -220,8 +72,6 @@ async fn external_node_basics(components_str: &'static str) { .unwrap() }); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // Wait until the node is ready. 
let app_health = match env_handles.app_health_receiver.await { Ok(app_health) => app_health, @@ -267,32 +117,7 @@ async fn external_node_basics(components_str: &'static str) { #[tokio::test] async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging - let temp_dir = tempfile::TempDir::new().unwrap(); - - let test_db: ConnectionPoolBuilder = - TestTemplate::empty().unwrap().create_db(100).await.unwrap(); - let connection_pool = test_db.build().await.unwrap(); - let mut storage = connection_pool.connection().await.unwrap(); - insert_genesis_batch(&mut storage, &GenesisParams::mock()) - .await - .unwrap(); - drop(storage); - - let opt = Cli { - enable_consensus: false, - components: "core".parse().unwrap(), - config_path: None, - secrets_path: None, - external_node_config_path: None, - consensus_path: None, - use_node_framework: false, - }; - let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool); - drop(connection_pool); - - if opt.components.0.contains(&Component::TreeApi) { - config.tree_component.api_port = Some(0); - } + let (env, env_handles) = utils::TestEnvironment::with_genesis_block("core").await; let l2_client = MockClient::builder(L2::default()) .method("eth_chainId", || Ok(U64::from(270))) @@ -306,16 +131,21 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) .build(); - let diamond_proxy_addr = config.remote.diamond_proxy_addr; - let eth_client = mock_eth_client(diamond_proxy_addr); + let diamond_proxy_addr = env.config.remote.diamond_proxy_addr; + let eth_client = utils::mock_eth_client(diamond_proxy_addr); - let (env, env_handles) = TestEnvironment::new(); let mut node_handle = tokio::task::spawn_blocking(move || { std::thread::spawn(move || { - let mut node = ExternalNodeBuilder::new(config); - inject_test_layers(&mut node, env, eth_client, l2_client); - - let node = node.build(opt.components.0.into_iter().collect())?; + let mut node = ExternalNodeBuilder::new(env.config); + inject_test_layers( + &mut node, + env.sigint_receiver, + env.app_health_sender, + eth_client, + l2_client, + ); + + let node = node.build(env.components.0.into_iter().collect())?; node.run()?; anyhow::Ok(()) }) diff --git a/core/bin/external_node/src/tests/utils.rs b/core/bin/external_node/src/tests/utils.rs new file mode 100644 index 000000000000..b8e02b4f3d96 --- /dev/null +++ b/core/bin/external_node/src/tests/utils.rs @@ -0,0 +1,147 @@ +use tempfile::TempDir; +use zksync_dal::CoreDal; +use zksync_db_connection::connection_pool::TestTemplate; +use zksync_eth_client::clients::MockEthereum; +use zksync_node_genesis::{insert_genesis_batch, GenesisBatchParams, GenesisParams}; +use zksync_types::{ + api, block::L2BlockHeader, ethabi, Address, L2BlockNumber, ProtocolVersionId, H256, +}; +use zksync_web3_decl::client::{MockClient, L1}; + +use super::*; + +pub(super) fn block_details_base(hash: H256) -> api::BlockDetailsBase { + api::BlockDetailsBase { + timestamp: 0, + l1_tx_count: 0, + l2_tx_count: 0, + root_hash: Some(hash), + status: api::BlockStatus::Sealed, + commit_tx_hash: None, + committed_at: None, + prove_tx_hash: None, + proven_at: None, + execute_tx_hash: None, + executed_at: None, + l1_gas_price: 0, + l2_fair_gas_price: 0, + fair_pubdata_price: None, + base_system_contracts_hashes: Default::default(), + } +} + 
+#[derive(Debug)]
+pub(super) struct TestEnvironment {
+    pub(super) sigint_receiver: oneshot::Receiver<()>,
+    pub(super) app_health_sender: oneshot::Sender<Arc<AppHealthCheck>>,
+    pub(super) components: ComponentsToRun,
+    pub(super) config: ExternalNodeConfig,
+    pub(super) genesis_params: GenesisBatchParams,
+    pub(super) genesis_l2_block: L2BlockHeader,
+    // We have to prevent the temp dir from being dropped, so we store it here.
+    _temp_dir: TempDir,
+}
+
+impl TestEnvironment {
+    pub async fn with_genesis_block(components_str: &str) -> (Self, TestEnvironmentHandles) {
+        // Generate a new environment with a genesis block.
+        let temp_dir = tempfile::TempDir::new().unwrap();
+
+        // Simplest case to mock: the EN already has a genesis L1 batch / L2 block, and it's the only L1 batch / L2 block
+        // in the network.
+        let test_db: ConnectionPoolBuilder<Core> =
+            TestTemplate::empty().unwrap().create_db(100).await.unwrap();
+        let connection_pool = test_db.build().await.unwrap();
+        // let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone());
+        let mut storage = connection_pool.connection().await.unwrap();
+        let genesis_params = insert_genesis_batch(&mut storage, &GenesisParams::mock())
+            .await
+            .unwrap();
+        let genesis_l2_block = storage
+            .blocks_dal()
+            .get_l2_block_header(L2BlockNumber(0))
+            .await
+            .unwrap()
+            .expect("No genesis L2 block");
+        drop(storage);
+
+        let components: ComponentsToRun = components_str.parse().unwrap();
+        let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool);
+        if components.0.contains(&Component::TreeApi) {
+            config.tree_component.api_port = Some(0);
+        }
+        drop(connection_pool);
+
+        // Generate channels to control the node.
+
+        let (sigint_sender, sigint_receiver) = oneshot::channel();
+        let (app_health_sender, app_health_receiver) = oneshot::channel();
+        let this = Self {
+            sigint_receiver,
+            app_health_sender,
+            components,
+            config,
+            genesis_params,
+            genesis_l2_block,
+            _temp_dir: temp_dir,
+        };
+        let handles = TestEnvironmentHandles {
+            sigint_sender,
+            app_health_receiver,
+        };
+
+        (this, handles)
+    }
+}
+
+#[derive(Debug)]
+pub(super) struct TestEnvironmentHandles {
+    pub(super) sigint_sender: oneshot::Sender<()>,
+    pub(super) app_health_receiver: oneshot::Receiver<Arc<AppHealthCheck>>,
+}
+
+// The returned components have the fully implemented health check life cycle (i.e., signal their shutdown).
+pub(super) fn expected_health_components(components: &ComponentsToRun) -> Vec<&'static str> { + let mut output = vec!["reorg_detector"]; + if components.0.contains(&Component::Core) { + output.extend(["consistency_checker", "commitment_generator"]); + } + if components.0.contains(&Component::Tree) { + output.push("tree"); + } + if components.0.contains(&Component::HttpApi) { + output.push("http_api"); + } + if components.0.contains(&Component::WsApi) { + output.push("ws_api"); + } + output +} + +pub(super) fn mock_eth_client(diamond_proxy_addr: Address) -> MockClient { + let mock = MockEthereum::builder().with_call_handler(move |call, _| { + tracing::info!("L1 call: {call:?}"); + if call.to == Some(diamond_proxy_addr) { + let packed_semver = ProtocolVersionId::latest().into_packed_semver_with_patch(0); + let call_signature = &call.data.as_ref().unwrap().0[..4]; + let contract = zksync_contracts::hyperchain_contract(); + let pricing_mode_sig = contract + .function("getPubdataPricingMode") + .unwrap() + .short_signature(); + let protocol_version_sig = contract + .function("getProtocolVersion") + .unwrap() + .short_signature(); + match call_signature { + sig if sig == pricing_mode_sig => { + return ethabi::Token::Uint(0.into()); // "rollup" mode encoding + } + sig if sig == protocol_version_sig => return ethabi::Token::Uint(packed_semver), + _ => { /* unknown call; panic below */ } + } + } + panic!("Unexpected L1 call: {call:?}"); + }); + mock.build().into_client() +} diff --git a/core/node/genesis/src/lib.rs b/core/node/genesis/src/lib.rs index 49762f5000d5..a04153a63fc6 100644 --- a/core/node/genesis/src/lib.rs +++ b/core/node/genesis/src/lib.rs @@ -151,6 +151,7 @@ impl GenesisParams { } } +#[derive(Debug)] pub struct GenesisBatchParams { pub root_hash: H256, pub commitment: H256, From 2b764766c4ae03e2ed5668e5f3908ddd7d76fcee Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 09:44:28 +0400 Subject: [PATCH 06/15] Add more tests for node builder --- core/bin/external_node/src/tests/mod.rs | 104 ++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 6 deletions(-) diff --git a/core/bin/external_node/src/tests/mod.rs b/core/bin/external_node/src/tests/mod.rs index 59a1aa29632f..bbfe36e27825 100644 --- a/core/bin/external_node/src/tests/mod.rs +++ b/core/bin/external_node/src/tests/mod.rs @@ -23,8 +23,6 @@ async fn external_node_basics(components_str: &'static str) { let (env, env_handles) = utils::TestEnvironment::with_genesis_block(components_str).await; let expected_health_components = utils::expected_health_components(&env.components); - let diamond_proxy_addr = env.config.remote.diamond_proxy_addr; - let l2_client = MockClient::builder(L2::default()) .method("eth_chainId", || Ok(U64::from(270))) .method("zks_L1ChainId", || Ok(U64::from(9))) @@ -50,8 +48,7 @@ async fn external_node_basics(components_str: &'static str) { .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) .build(); - // let l2_client = Box::new(l2_client); - let eth_client = utils::mock_eth_client(diamond_proxy_addr); + let eth_client = utils::mock_eth_client(env.config.remote.diamond_proxy_addr); let node_handle = tokio::task::spawn_blocking(move || { std::thread::spawn(move || { @@ -131,8 +128,7 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) .build(); - let 
diamond_proxy_addr = env.config.remote.diamond_proxy_addr;
-    let eth_client = utils::mock_eth_client(diamond_proxy_addr);
+    let eth_client = utils::mock_eth_client(env.config.remote.diamond_proxy_addr);
 
     let mut node_handle = tokio::task::spawn_blocking(move || {
         std::thread::spawn(move || {
@@ -161,3 +157,99 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() {
     env_handles.sigint_sender.send(()).unwrap();
     node_handle.await.unwrap().unwrap();
 }
+
+#[tokio::test]
+async fn running_tree_without_core_is_not_allowed() {
+    let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging
+    let (env, _env_handles) = utils::TestEnvironment::with_genesis_block("tree").await;
+
+    let l2_client = MockClient::builder(L2::default())
+        .method("eth_chainId", || Ok(U64::from(270)))
+        .method("zks_L1ChainId", || Ok(U64::from(9)))
+        .method("zks_L1BatchNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("eth_blockNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default()))
+        .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0]))
+        .build();
+    let eth_client = utils::mock_eth_client(env.config.remote.diamond_proxy_addr);
+
+    let node_handle = tokio::task::spawn_blocking(move || {
+        std::thread::spawn(move || {
+            let mut node = ExternalNodeBuilder::new(env.config);
+            inject_test_layers(
+                &mut node,
+                env.sigint_receiver,
+                env.app_health_sender,
+                eth_client,
+                l2_client,
+            );
+
+            // We're only interested in the error, so we drop the result.
+            node.build(env.components.0.into_iter().collect()).map(drop)
+        })
+        .join()
+        .unwrap()
+    });
+
+    // Check that we cannot build the node without the core component.
+    let result = node_handle.await.expect("Building the node panicked");
+    let err = result.expect_err("Building the node with tree but without core should fail");
+    assert!(
+        err.to_string()
+            .contains("Tree must run on the same machine as Core"),
+        "Unexpected error: {}",
+        err
+    );
+}
+
+#[tokio::test]
+async fn running_tree_api_without_tree_is_not_allowed() {
+    let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging
+    let (env, _env_handles) = utils::TestEnvironment::with_genesis_block("core,tree_api").await;
+
+    let l2_client = MockClient::builder(L2::default())
+        .method("eth_chainId", || Ok(U64::from(270)))
+        .method("zks_L1ChainId", || Ok(U64::from(9)))
+        .method("zks_L1BatchNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("eth_blockNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default()))
+        .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0]))
+        .build();
+    let eth_client = utils::mock_eth_client(env.config.remote.diamond_proxy_addr);
+
+    let node_handle = tokio::task::spawn_blocking(move || {
+        std::thread::spawn(move || {
+            let mut node = ExternalNodeBuilder::new(env.config);
+            inject_test_layers(
+                &mut node,
+                env.sigint_receiver,
+                env.app_health_sender,
+                eth_client,
+                l2_client,
+            );
+
+            // We're only interested in the error, so we drop the result.
+            node.build(env.components.0.into_iter().collect()).map(drop)
+        })
+        .join()
+        .unwrap()
+    });
+
+    // Check that we cannot build the node with the tree API but without the tree component.
+    let result = node_handle.await.expect("Building the node panicked");
+    let err = result.expect_err("Building the node with tree api but without tree should fail");
+    assert!(
+        err.to_string()
+            .contains("Merkle tree API cannot be started without a tree component"),
+        "Unexpected error: {}",
+        err
+    );
+}

From 806cb404791f619b5a2f2dfdd5b1f87c8c7e17aa Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Thu, 11 Jul 2024 09:46:32 +0400
Subject: [PATCH 07/15] Add a way to still run the EN old way

---
 core/bin/external_node/src/main.rs | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 7d0885de338c..634699080bbd 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -723,10 +723,6 @@ struct Cli {
     external_node_config_path: Option,
     /// Path to the yaml with consensus.
     consensus_path: Option,
-
-    /// Run the node using the node framework.
-    #[arg(long)]
-    use_node_framework: bool,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
@@ -823,8 +819,11 @@ async fn main() -> anyhow::Result<()> {
         .await
         .context("failed fetching remote part of node config from main node")?;
 
+    // Can be used to force the old approach to the external node.
+    let force_old_approach = std::env::var("EXTERNAL_NODE_OLD_APPROACH").is_ok();
+
     // If the node framework is used, run the node.
-    if opt.use_node_framework {
+    if !force_old_approach {
         // We run the node from a different thread, since the current thread is in tokio context.
         std::thread::spawn(move || {
             let node =
@@ -838,6 +837,8 @@ async fn main() -> anyhow::Result<()> {
         return Ok(());
     }
 
+    tracing::info!("Running the external node in the old approach");
+
     if let Some(threshold) = config.optional.slow_query_threshold() {
         ConnectionPool::<Core>::global_config().set_slow_query_threshold(threshold)?;
     }

From e476b5de6c5a03e1834cc4ff266b1a06c9338b4a Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Thu, 11 Jul 2024 09:47:05 +0400
Subject: [PATCH 08/15] zk fmt

---
 core/bin/external_node/src/metrics/framework.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/bin/external_node/src/metrics/framework.rs b/core/bin/external_node/src/metrics/framework.rs
index a5faae04f7cd..82f9263e44db 100644
--- a/core/bin/external_node/src/metrics/framework.rs
+++ b/core/bin/external_node/src/metrics/framework.rs
@@ -1,6 +1,5 @@
 use std::time::Duration;
 
-use super::EN_METRICS;
 use zksync_dal::{ConnectionPool, Core, CoreDal as _};
 use zksync_node_framework::{
     implementations::resources::pools::{MasterPool, PoolResource},
@@ -9,6 +8,8 @@ use zksync_node_framework::{
 use zksync_shared_metrics::rustc::RUST_METRICS;
 use zksync_types::{L1ChainId, L2ChainId};
 
+use super::EN_METRICS;
+
 #[derive(Debug)]
 pub struct ExternalNodeMetricsLayer {
     pub l1_chain_id: L1ChainId,

From ab2067da020498adff88726da27b604186f6adc8 Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Thu, 11 Jul 2024 09:53:12 +0400
Subject: [PATCH 09/15] Remove development artifact

---
 core/lib/db_connection/src/connection_pool.rs | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/core/lib/db_connection/src/connection_pool.rs b/core/lib/db_connection/src/connection_pool.rs
index c6e15e26b15b..78d9184222dc 100644
--- a/core/lib/db_connection/src/connection_pool.rs
+++ b/core/lib/db_connection/src/connection_pool.rs
@@ -48,11 +48,6 @@ impl fmt::Debug for ConnectionPoolBuilder {
 }
 
 impl ConnectionPoolBuilder {
-    /// Returns the URL for the database.
- pub fn database_url(&self) -> &SensitiveUrl { - &self.database_url - } - /// Overrides the maximum number of connections that can be allocated by the pool. pub fn set_max_size(&mut self, max_size: u32) -> &mut Self { self.max_size = max_size; From 8b199ed52337103aeca66863c37c1ab75557511b Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 12:35:30 +0400 Subject: [PATCH 10/15] Initialize L1BatchParamsProvider explicitly --- core/bin/external_node/src/main.rs | 1 - core/lib/vm_utils/src/lib.rs | 5 +-- core/lib/vm_utils/src/storage.rs | 33 ++++++++++++++++--- core/node/consensus/src/testonly.rs | 6 ++-- .../layers/state_keeper/external_io.rs | 1 - .../layers/state_keeper/mempool_io.rs | 3 +- core/node/node_sync/src/external_io.rs | 13 ++++---- core/node/node_sync/src/tests.rs | 1 - core/node/state_keeper/src/io/common/tests.rs | 24 +++++++++----- core/node/state_keeper/src/io/mempool.rs | 14 ++++---- core/node/state_keeper/src/io/tests/tester.rs | 1 - core/node/state_keeper/src/lib.rs | 1 - .../tee_verifier_input_producer/src/lib.rs | 4 ++- core/node/vm_runner/src/storage.rs | 8 +++-- 14 files changed, 71 insertions(+), 44 deletions(-) diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 634699080bbd..55b2133250ac 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -103,7 +103,6 @@ async fn build_state_keeper( Box::new(main_node_client.for_component("external_io")), chain_id, ) - .await .context("Failed initializing I/O for external node state keeper")?; Ok(ZkSyncStateKeeper::new( diff --git a/core/lib/vm_utils/src/lib.rs b/core/lib/vm_utils/src/lib.rs index d3f294afd9e7..9cec0e13be8b 100644 --- a/core/lib/vm_utils/src/lib.rs +++ b/core/lib/vm_utils/src/lib.rs @@ -24,8 +24,9 @@ pub fn create_vm( mut connection: Connection<'_, Core>, l2_chain_id: L2ChainId, ) -> anyhow::Result { - let l1_batch_params_provider = rt_handle - .block_on(L1BatchParamsProvider::new(&mut connection)) + let mut l1_batch_params_provider = L1BatchParamsProvider::new(); + rt_handle + .block_on(l1_batch_params_provider.initialize(&mut connection)) .context("failed initializing L1 batch params provider")?; let first_l2_block_in_batch = rt_handle .block_on( diff --git a/core/lib/vm_utils/src/storage.rs b/core/lib/vm_utils/src/storage.rs index 6eeaf92b718e..fbf52a67623d 100644 --- a/core/lib/vm_utils/src/storage.rs +++ b/core/lib/vm_utils/src/storage.rs @@ -83,18 +83,41 @@ pub fn l1_batch_params( /// Provider of L1 batch parameters for state keeper I/O implementations. The provider is stateless; i.e., it doesn't /// enforce a particular order of method calls. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct L1BatchParamsProvider { snapshot: Option, } impl L1BatchParamsProvider { - pub async fn new(storage: &mut Connection<'_, Core>) -> anyhow::Result { - let snapshot = storage + pub fn new() -> Self { + Self { snapshot: None } + } + + /// Performs the provider initialization. Must only be called with the initialized storage (e.g. + /// either after genesis or snapshot recovery). + pub async fn initialize(&mut self, storage: &mut Connection<'_, Core>) -> anyhow::Result<()> { + if storage + .blocks_dal() + .get_earliest_l1_batch_number() + .await? + .is_some() + { + // We have batches in the storage, no need for special treatment. 
+ return Ok(()); + } + + let Some(snapshot) = storage .snapshot_recovery_dal() .get_applied_snapshot_status() - .await?; - Ok(Self { snapshot }) + .await + .context("failed getting snapshot recovery status")? + else { + anyhow::bail!( + "Storage is not initialized, it doesn't have batches or snapshot recovery status" + ) + }; + self.snapshot = Some(snapshot); + Ok(()) } /// Returns state root hash and timestamp of an L1 batch with the specified number waiting for the hash to be computed diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 7ca518a183a7..6c0c48a4ca22 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -493,8 +493,7 @@ impl StateKeeperRunner { self.actions_queue, Box::::default(), L2ChainId::default(), - ) - .await?; + )?; s.spawn_bg(async { Ok(l2_block_sealer @@ -606,8 +605,7 @@ impl StateKeeperRunner { self.actions_queue, Box::::default(), L2ChainId::default(), - ) - .await?; + )?; s.spawn_bg(async { Ok(l2_block_sealer .run() diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs b/core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs index ba7e87dcca74..31b76550767c 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs @@ -69,7 +69,6 @@ impl WiringLayer for ExternalIOLayer { Box::new(input.main_node_client.0.for_component("external_io")), self.chain_id, ) - .await .context("Failed initializing I/O for external node state keeper")?; // Create sealer. diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs index cfab1f186438..6be6544ee3df 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs @@ -129,8 +129,7 @@ impl WiringLayer for MempoolIOLayer { self.wallets.fee_account.address(), self.mempool_config.delay_interval(), self.zksync_network_id, - ) - .await?; + )?; // Create sealer. 
let sealer = SequencerSealer::new(self.state_keeper_config); diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 8ad143861450..50734421341e 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -43,18 +43,13 @@ pub struct ExternalIO { } impl ExternalIO { - pub async fn new( + pub fn new( pool: ConnectionPool, actions: ActionQueue, main_node_client: Box, chain_id: L2ChainId, ) -> anyhow::Result { - let mut storage = pool.connection_tagged("sync_layer").await?; - let l1_batch_params_provider = L1BatchParamsProvider::new(&mut storage) - .await - .context("failed initializing L1 batch params provider")?; - drop(storage); - + let l1_batch_params_provider = L1BatchParamsProvider::new(); Ok(Self { pool, l1_batch_params_provider, @@ -137,6 +132,10 @@ impl StateKeeperIO for ExternalIO { async fn initialize(&mut self) -> anyhow::Result<(IoCursor, Option)> { let mut storage = self.pool.connection_tagged("sync_layer").await?; let cursor = IoCursor::new(&mut storage).await?; + self.l1_batch_params_provider + .initialize(&mut storage) + .await + .context("failed initializing L1 batch params provider")?; tracing::info!( "Initialized the ExternalIO: current L1 batch number {}, current L2 block number {}", cursor.l1_batch, diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index 7c57e04a3404..510f9124c297 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -118,7 +118,6 @@ impl StateKeeperHandles { Box::new(main_node_client), L2ChainId::default(), ) - .await .unwrap(); let (stop_sender, stop_receiver) = watch::channel(false); diff --git a/core/node/state_keeper/src/io/common/tests.rs b/core/node/state_keeper/src/io/common/tests.rs index 7e6fbdc795ab..f3b3f6e0fb4b 100644 --- a/core/node/state_keeper/src/io/common/tests.rs +++ b/core/node/state_keeper/src/io/common/tests.rs @@ -102,7 +102,8 @@ async fn waiting_for_l1_batch_params_with_genesis() { .await .unwrap(); - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + provider.initialize(&mut storage).await.unwrap(); let (hash, timestamp) = provider .wait_for_l1_batch_params(&mut storage, L1BatchNumber(0)) .await @@ -141,7 +142,8 @@ async fn waiting_for_l1_batch_params_after_snapshot_recovery() { let snapshot_recovery = prepare_recovery_snapshot(&mut storage, L1BatchNumber(23), L2BlockNumber(42), &[]).await; - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + provider.initialize(&mut storage).await.unwrap(); let (hash, timestamp) = provider .wait_for_l1_batch_params(&mut storage, snapshot_recovery.l1_batch_number) .await @@ -189,7 +191,8 @@ async fn getting_first_l2_block_in_batch_with_genesis() { .await .unwrap(); - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + provider.initialize(&mut storage).await.unwrap(); let mut batches_and_l2_blocks = HashMap::from([ (L1BatchNumber(0), Ok(Some(L2BlockNumber(0)))), (L1BatchNumber(1), Ok(Some(L2BlockNumber(1)))), @@ -260,7 +263,8 @@ async fn getting_first_l2_block_in_batch_after_snapshot_recovery() { let snapshot_recovery = prepare_recovery_snapshot(&mut storage, L1BatchNumber(23), L2BlockNumber(42), &[]).await; - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + 
provider.initialize(&mut storage).await.unwrap(); let mut batches_and_l2_blocks = HashMap::from([ (L1BatchNumber(1), Err(())), (snapshot_recovery.l1_batch_number, Err(())), @@ -316,7 +320,8 @@ async fn loading_pending_batch_with_genesis() { ) .await; - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + provider.initialize(&mut storage).await.unwrap(); let first_l2_block_in_batch = provider .load_first_l2_block_in_batch(&mut storage, L1BatchNumber(1)) .await @@ -397,7 +402,8 @@ async fn loading_pending_batch_after_snapshot_recovery() { ) .await; - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + provider.initialize(&mut storage).await.unwrap(); let first_l2_block_in_batch = provider .load_first_l2_block_in_batch(&mut storage, snapshot_recovery.l1_batch_number + 1) .await @@ -459,7 +465,8 @@ async fn getting_batch_version_with_genesis() { .await .unwrap(); - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + provider.initialize(&mut storage).await.unwrap(); let version = provider .load_l1_batch_protocol_version(&mut storage, L1BatchNumber(0)) .await @@ -498,7 +505,8 @@ async fn getting_batch_version_after_snapshot_recovery() { let snapshot_recovery = prepare_recovery_snapshot(&mut storage, L1BatchNumber(23), L2BlockNumber(42), &[]).await; - let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap(); + let mut provider = L1BatchParamsProvider::new(); + provider.initialize(&mut storage).await.unwrap(); let version = provider .load_l1_batch_protocol_version(&mut storage, snapshot_recovery.l1_batch_number) .await diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index a35b8e031e2b..c3d8dc1dee4d 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -90,6 +90,10 @@ impl StateKeeperIO for MempoolIO { async fn initialize(&mut self) -> anyhow::Result<(IoCursor, Option)> { let mut storage = self.pool.connection_tagged("state_keeper").await?; let cursor = IoCursor::new(&mut storage).await?; + self.l1_batch_params_provider + .initialize(&mut storage) + .await + .context("failed initializing L1 batch params provider")?; L2BlockSealProcess::clear_pending_l2_block(&mut storage, cursor.next_l2_block - 1).await?; @@ -416,7 +420,7 @@ async fn sleep_past(timestamp: u64, l2_block: L2BlockNumber) -> u64 { } impl MempoolIO { - pub async fn new( + pub fn new( mempool: MempoolGuard, batch_fee_input_provider: Arc, pool: ConnectionPool, @@ -425,12 +429,6 @@ impl MempoolIO { delay_interval: Duration, chain_id: L2ChainId, ) -> anyhow::Result { - let mut storage = pool.connection_tagged("state_keeper").await?; - let l1_batch_params_provider = L1BatchParamsProvider::new(&mut storage) - .await - .context("failed initializing L1 batch params provider")?; - drop(storage); - Ok(Self { mempool, pool, @@ -438,7 +436,7 @@ impl MempoolIO { l2_block_max_payload_size_sealer: L2BlockMaxPayloadSizeSealer::new(config), filter: L2TxFilter::default(), // ^ Will be initialized properly on the first newly opened batch - l1_batch_params_provider, + l1_batch_params_provider: L1BatchParamsProvider::new(), fee_account, validation_computational_gas_limit: config.validation_computational_gas_limit, max_allowed_tx_gas_limit: config.max_allowed_l2_tx_gas_limit.into(), diff --git 
a/core/node/state_keeper/src/io/tests/tester.rs b/core/node/state_keeper/src/io/tests/tester.rs index c056191736f2..28fcbd51822e 100644 --- a/core/node/state_keeper/src/io/tests/tester.rs +++ b/core/node/state_keeper/src/io/tests/tester.rs @@ -129,7 +129,6 @@ impl Tester { Duration::from_secs(1), L2ChainId::from(270), ) - .await .unwrap(); (io, mempool) diff --git a/core/node/state_keeper/src/lib.rs b/core/node/state_keeper/src/lib.rs index 4920e2514b0a..1c12f7825486 100644 --- a/core/node/state_keeper/src/lib.rs +++ b/core/node/state_keeper/src/lib.rs @@ -63,7 +63,6 @@ pub async fn create_state_keeper( mempool_config.delay_interval(), l2chain_id, ) - .await .expect("Failed initializing main node I/O for state keeper"); let sealer = SequencerSealer::new(state_keeper_config); diff --git a/core/node/tee_verifier_input_producer/src/lib.rs b/core/node/tee_verifier_input_producer/src/lib.rs index 501681346bac..0cd28ee5ce79 100644 --- a/core/node/tee_verifier_input_producer/src/lib.rs +++ b/core/node/tee_verifier_input_producer/src/lib.rs @@ -77,7 +77,9 @@ impl TeeVerifierInputProducer { .with_context(|| format!("header is missing for L1 batch #{l1_batch_number}"))? .unwrap(); - let l1_batch_params_provider = L1BatchParamsProvider::new(&mut connection) + let mut l1_batch_params_provider = L1BatchParamsProvider::new(); + l1_batch_params_provider + .initialize(&mut connection) .await .context("failed initializing L1 batch params provider")?; diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index a7a4c6c18a6d..f3e304d7d4ff 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -101,7 +101,9 @@ impl VmRunnerStorage { chain_id: L2ChainId, ) -> anyhow::Result<(Self, StorageSyncTask)> { let mut conn = pool.connection_tagged(io.name()).await?; - let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn) + let mut l1_batch_params_provider = L1BatchParamsProvider::new(); + l1_batch_params_provider + .initialize(&mut conn) .await .context("Failed initializing L1 batch params provider")?; drop(conn); @@ -246,7 +248,9 @@ impl StorageSyncTask { state: Arc>, ) -> anyhow::Result { let mut conn = pool.connection_tagged(io.name()).await?; - let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn) + let mut l1_batch_params_provider = L1BatchParamsProvider::new(); + l1_batch_params_provider + .initialize(&mut conn) .await .context("Failed initializing L1 batch params provider")?; let target_l1_batch_number = io.latest_processed_batch(&mut conn).await?; From a00ed86af985675dcaf1b2e9129c5c193c0ed714 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 13:26:58 +0400 Subject: [PATCH 11/15] Rework eth_syncing integration test --- core/tests/ts-integration/tests/api/web3.test.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/tests/ts-integration/tests/api/web3.test.ts b/core/tests/ts-integration/tests/api/web3.test.ts index a538eb3a6dff..c660c31ad986 100644 --- a/core/tests/ts-integration/tests/api/web3.test.ts +++ b/core/tests/ts-integration/tests/api/web3.test.ts @@ -148,13 +148,26 @@ describe('web3 API compatibility tests', () => { await expect(alice.provider.send('net_version', [])).resolves.toMatch(chainId.toString()); }); + test('Should check the syncing status', async () => { + // We can't know whether the node is synced, so we just check the validity of the response. 
+ const response = await alice.provider.send('eth_syncing', []); + // Sync status is either `false` or an object with the following fields. + if (response !== false) { + const expectedObject = { + currentBlock: expect.stringMatching(HEX_VALUE_REGEX), + highestBlock: expect.stringMatching(HEX_VALUE_REGEX), + startingBlock: expect.stringMatching(HEX_VALUE_REGEX), + } + expect(response).toMatchObject(expectedObject); + } + }) + // @ts-ignore test.each([ ['net_peerCount', [], '0x0'], ['net_listening', [], false], ['web3_clientVersion', [], 'zkSync/v2.0'], ['eth_protocolVersion', [], 'zks/1'], - ['eth_syncing', [], false], ['eth_accounts', [], []], ['eth_coinbase', [], '0x0000000000000000000000000000000000000000'], ['eth_getCompilers', [], []], From c0440d8ae427c8669187894683cef54c3242561d Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 13:27:07 +0400 Subject: [PATCH 12/15] Rework eth_syncing integration test --- core/tests/ts-integration/tests/api/web3.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/tests/ts-integration/tests/api/web3.test.ts b/core/tests/ts-integration/tests/api/web3.test.ts index c660c31ad986..9b334488fcb8 100644 --- a/core/tests/ts-integration/tests/api/web3.test.ts +++ b/core/tests/ts-integration/tests/api/web3.test.ts @@ -149,18 +149,18 @@ describe('web3 API compatibility tests', () => { }); test('Should check the syncing status', async () => { - // We can't know whether the node is synced, so we just check the validity of the response. + // We can't know whether the node is synced (in EN case), so we just check the validity of the response. const response = await alice.provider.send('eth_syncing', []); // Sync status is either `false` or an object with the following fields. if (response !== false) { const expectedObject = { currentBlock: expect.stringMatching(HEX_VALUE_REGEX), highestBlock: expect.stringMatching(HEX_VALUE_REGEX), - startingBlock: expect.stringMatching(HEX_VALUE_REGEX), - } + startingBlock: expect.stringMatching(HEX_VALUE_REGEX) + }; expect(response).toMatchObject(expectedObject); } - }) + }); // @ts-ignore test.each([ From 7098e02b507627202dc44918b13e10423c16af0c Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 14:18:33 +0400 Subject: [PATCH 13/15] Add block reverter layer --- core/bin/external_node/src/node_builder.rs | 40 +++++--- .../implementations/layers/block_reverter.rs | 95 +++++++++++++++++++ .../src/implementations/layers/mod.rs | 1 + .../external_node_strategy.rs | 13 ++- .../src/implementations/resources/reverter.rs | 6 ++ .../src/external_node/revert.rs | 10 +- 6 files changed, 145 insertions(+), 20 deletions(-) create mode 100644 core/node/node_framework/src/implementations/layers/block_reverter.rs diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index d66dbcd23f46..ff851999f623 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -2,6 +2,7 @@ //! as well as an interface to run the node with the specified components. 
use anyhow::Context as _; +use zksync_block_reverter::NodeRole; use zksync_config::{ configs::{ api::{HealthCheckConfig, MerkleTreeApiConfig}, @@ -15,6 +16,7 @@ use zksync_node_api_server::{tx_sender::ApiContracts, web3::Namespace}; use zksync_node_framework::{ implementations::layers::{ batch_status_updater::BatchStatusUpdaterLayer, + block_reverter::BlockReverterLayer, commitment_generator::CommitmentGeneratorLayer, consensus::ExternalNodeConsensusLayer, consistency_checker::ConsistencyCheckerLayer, @@ -441,6 +443,18 @@ impl ExternalNodeBuilder { Ok(self) } + fn add_block_reverter_layer(mut self) -> anyhow::Result { + let mut layer = BlockReverterLayer::new(NodeRole::External); + // Reverting executed batches is more-or-less safe for external nodes. + layer + .allow_rolling_back_executed_batches() + .enable_rolling_back_postgres() + .enable_rolling_back_merkle_tree(self.config.required.merkle_tree_path.clone()) + .enable_rolling_back_state_keeper_cache(self.config.required.state_cache_path.clone()); + self.node.add_layer(layer); + Ok(self) + } + /// This layer will make sure that the database is initialized correctly, /// e.g.: /// - genesis or snapshot recovery will be performed if it's required. @@ -490,6 +504,21 @@ impl ExternalNodeBuilder { .add_query_eth_client_layer()? .add_reorg_detector_layer()?; + // Add layers that must run only on a single component. + if components.contains(&Component::Core) { + // Core is a singleton & mandatory component, + // so until we have a dedicated component for "auxiliary" tasks, + // it's responsible for things like metrics. + self = self + .add_postgres_metrics_layer()? + .add_external_node_metrics_layer()?; + // We assign the storage initialization to the core, as it's considered to be + // the "main" component. + self = self + .add_block_reverter_layer()? + .add_storage_initialization_layer(LayerKind::Task)?; + } + // Add preconditions for all the components. self = self .add_l1_batch_commitment_mode_validation_layer()? @@ -546,13 +575,6 @@ impl ExternalNodeBuilder { self = self.add_tree_data_fetcher_layer()?; } Component::Core => { - // Core is a singleton & mandatory component, - // so until we have a dedicated component for "auxiliary" tasks, - // it's responsible for things like metrics. - self = self - .add_postgres_metrics_layer()? - .add_external_node_metrics_layer()?; - // Main tasks self = self .add_state_keeper_layer()? @@ -561,10 +583,6 @@ impl ExternalNodeBuilder { .add_consistency_checker_layer()? .add_commitment_generator_layer()? .add_batch_status_updater_layer()?; - - // We assign the storage initialization to the core, as it's considered to be - // the "main" component. - self = self.add_storage_initialization_layer(LayerKind::Task)?; } } } diff --git a/core/node/node_framework/src/implementations/layers/block_reverter.rs b/core/node/node_framework/src/implementations/layers/block_reverter.rs new file mode 100644 index 000000000000..4cfe4212e4d8 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/block_reverter.rs @@ -0,0 +1,95 @@ +use zksync_block_reverter::{BlockReverter, NodeRole}; + +use crate::{ + implementations::resources::{ + pools::{MasterPool, PoolResource}, + reverter::BlockReverterResource, + }, + FromContext, IntoContext, WiringError, WiringLayer, +}; + +/// Layer for the block reverter resource. +/// For documentation on the methods see the corresponding methods in [`BlockReverter`]. 
+#[derive(Debug)] +pub struct BlockReverterLayer { + node_role: NodeRole, + allow_rolling_back_executed_batches: bool, + should_roll_back_postgres: bool, + state_keeper_cache_path: Option, + merkle_tree_path: Option, +} + +impl BlockReverterLayer { + pub fn new(node_role: NodeRole) -> Self { + Self { + node_role, + allow_rolling_back_executed_batches: false, + should_roll_back_postgres: false, + state_keeper_cache_path: None, + merkle_tree_path: None, + } + } + + pub fn allow_rolling_back_executed_batches(&mut self) -> &mut Self { + self.allow_rolling_back_executed_batches = true; + self + } + + pub fn enable_rolling_back_postgres(&mut self) -> &mut Self { + self.should_roll_back_postgres = true; + self + } + + pub fn enable_rolling_back_merkle_tree(&mut self, path: String) -> &mut Self { + self.merkle_tree_path = Some(path); + self + } + + pub fn enable_rolling_back_state_keeper_cache(&mut self, path: String) -> &mut Self { + self.state_keeper_cache_path = Some(path); + self + } +} + +#[derive(Debug, FromContext)] +#[context(crate = crate)] +pub struct Input { + pub master_pool: PoolResource, +} + +#[derive(Debug, IntoContext)] +#[context(crate = crate)] +pub struct Output { + pub block_reverter: BlockReverterResource, +} + +#[async_trait::async_trait] +impl WiringLayer for BlockReverterLayer { + type Input = Input; + type Output = Output; + + fn layer_name(&self) -> &'static str { + "block_reverter_layer" + } + + async fn wire(self, input: Self::Input) -> Result { + let pool = input.master_pool.get().await?; + let mut block_reverter = BlockReverter::new(self.node_role, pool); + if self.allow_rolling_back_executed_batches { + block_reverter.allow_rolling_back_executed_batches(); + } + if self.should_roll_back_postgres { + block_reverter.enable_rolling_back_postgres(); + } + if let Some(path) = self.merkle_tree_path { + block_reverter.enable_rolling_back_merkle_tree(path); + } + if let Some(path) = self.state_keeper_cache_path { + block_reverter.enable_rolling_back_state_keeper_cache(path); + } + + Ok(Output { + block_reverter: block_reverter.into(), + }) + } +} diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index 4d2be9b11367..55bc0a40ca73 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -1,5 +1,6 @@ pub mod base_token; pub mod batch_status_updater; +pub mod block_reverter; pub mod circuit_breaker_checker; pub mod commitment_generator; pub mod consensus; diff --git a/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs b/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs index 0358d30a3133..0b98d0e2b556 100644 --- a/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs +++ b/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs @@ -81,13 +81,12 @@ impl WiringLayer for ExternalNodeInitStrategyLayer { app_health, }) as Arc }); - let block_reverter = block_reverter.map(|block_reverter| { - Arc::new(ExternalNodeReverter { - client, - pool: pool.clone(), - reverter: block_reverter, - }) as Arc - }); + // We always want to detect reorgs, even if we can't roll them back. 
+ let block_reverter = Some(Arc::new(ExternalNodeReverter { + client, + pool: pool.clone(), + reverter: block_reverter, + }) as Arc); let strategy = NodeInitializationStrategy { genesis, snapshot_recovery, diff --git a/core/node/node_framework/src/implementations/resources/reverter.rs b/core/node/node_framework/src/implementations/resources/reverter.rs index 8a453b71659b..9186c727800f 100644 --- a/core/node/node_framework/src/implementations/resources/reverter.rs +++ b/core/node/node_framework/src/implementations/resources/reverter.rs @@ -11,3 +11,9 @@ impl Resource for BlockReverterResource { "common/block_reverter".into() } } + +impl From for BlockReverterResource { + fn from(reverter: BlockReverter) -> Self { + Self(Unique::new(reverter)) + } +} diff --git a/core/node/node_storage_init/src/external_node/revert.rs b/core/node/node_storage_init/src/external_node/revert.rs index 0310f525572f..86d137c6b660 100644 --- a/core/node/node_storage_init/src/external_node/revert.rs +++ b/core/node/node_storage_init/src/external_node/revert.rs @@ -12,7 +12,7 @@ use crate::RevertStorage; pub struct ExternalNodeReverter { pub client: Box>, pub pool: ConnectionPool, - pub reverter: BlockReverter, + pub reverter: Option, } #[async_trait::async_trait] @@ -22,8 +22,14 @@ impl RevertStorage for ExternalNodeReverter { to_batch: L1BatchNumber, _stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { + let Some(block_reverter) = self.reverter.as_ref() else { + anyhow::bail!( + "Revert to block {to_batch} was requested, but the reverter was not provided." + ); + }; + tracing::info!("Reverting to l1 batch number {to_batch}"); - self.reverter.roll_back(to_batch).await?; + block_reverter.roll_back(to_batch).await?; tracing::info!("Revert successfully completed"); Ok(()) } From a31b245b388eb21a84a1d626f9bc8bfb1acc831b Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Thu, 11 Jul 2024 15:15:46 +0400 Subject: [PATCH 14/15] Set correct type for test sigint task --- core/bin/external_node/src/tests/framework.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/bin/external_node/src/tests/framework.rs b/core/bin/external_node/src/tests/framework.rs index 09ea2da95c4f..ea0cc366ca64 100644 --- a/core/bin/external_node/src/tests/framework.rs +++ b/core/bin/external_node/src/tests/framework.rs @@ -14,6 +14,7 @@ use zksync_node_framework::{ }, }, service::ServiceContext, + task::TaskKind, FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer, }; use zksync_types::{L1ChainId, L2ChainId}; @@ -65,6 +66,10 @@ struct TestSigintTask(oneshot::Receiver<()>); #[async_trait::async_trait] impl Task for TestSigintTask { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedTask + } + fn id(&self) -> TaskId { "test_sigint_task".into() } @@ -106,7 +111,6 @@ impl WiringLayer for AppHealthHijackLayer { async fn wire(self, input: Self::Input) -> Result { self.sender.send(input.app_health_check.0).unwrap(); - tracing::error!("Submitted health"); Ok(()) } } From cb05111433c3928a87e920278c3e86ee3e15720e Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Fri, 12 Jul 2024 10:31:08 +0400 Subject: [PATCH 15/15] Tests: Move building mock clients to utils --- core/bin/external_node/src/tests/mod.rs | 69 ++--------------------- core/bin/external_node/src/tests/utils.rs | 48 ++++++++++++++++ 2 files changed, 54 insertions(+), 63 deletions(-) diff --git a/core/bin/external_node/src/tests/mod.rs b/core/bin/external_node/src/tests/mod.rs index bbfe36e27825..e2b7edc174c4 100644 --- 
a/core/bin/external_node/src/tests/mod.rs +++ b/core/bin/external_node/src/tests/mod.rs @@ -3,8 +3,8 @@ use assert_matches::assert_matches; use framework::inject_test_layers; use test_casing::test_casing; -use zksync_types::{api, fee_model::FeeParams, Address, L1BatchNumber, U64}; -use zksync_web3_decl::{client::MockClient, jsonrpsee::core::ClientError}; +use zksync_types::{fee_model::FeeParams, L1BatchNumber, U64}; +use zksync_web3_decl::jsonrpsee::core::ClientError; use super::*; @@ -23,31 +23,7 @@ async fn external_node_basics(components_str: &'static str) { let (env, env_handles) = utils::TestEnvironment::with_genesis_block(components_str).await; let expected_health_components = utils::expected_health_components(&env.components); - let l2_client = MockClient::builder(L2::default()) - .method("eth_chainId", || Ok(U64::from(270))) - .method("zks_L1ChainId", || Ok(U64::from(9))) - .method("zks_L1BatchNumber", || Ok(U64::from(0))) - .method("zks_getL1BatchDetails", move |number: L1BatchNumber| { - assert_eq!(number, L1BatchNumber(0)); - Ok(api::L1BatchDetails { - number: L1BatchNumber(0), - base: utils::block_details_base(env.genesis_params.root_hash), - }) - }) - .method("eth_blockNumber", || Ok(U64::from(0))) - .method( - "eth_getBlockByNumber", - move |number: api::BlockNumber, _with_txs: bool| { - assert_eq!(number, api::BlockNumber::Number(0.into())); - Ok(api::Block:: { - hash: env.genesis_l2_block.hash, - ..api::Block::default() - }) - }, - ) - .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) - .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) - .build(); + let l2_client = utils::mock_l2_client(&env); let eth_client = utils::mock_eth_client(env.config.remote.diamond_proxy_addr); let node_handle = tokio::task::spawn_blocking(move || { @@ -116,18 +92,7 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging let (env, env_handles) = utils::TestEnvironment::with_genesis_block("core").await; - let l2_client = MockClient::builder(L2::default()) - .method("eth_chainId", || Ok(U64::from(270))) - .method("zks_L1ChainId", || Ok(U64::from(9))) - .method("zks_L1BatchNumber", || { - Err::<(), _>(ClientError::RequestTimeout) - }) - .method("eth_blockNumber", || { - Err::<(), _>(ClientError::RequestTimeout) - }) - .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) - .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) - .build(); + let l2_client = utils::mock_l2_client_hanging(); let eth_client = utils::mock_eth_client(env.config.remote.diamond_proxy_addr); let mut node_handle = tokio::task::spawn_blocking(move || { @@ -163,18 +128,7 @@ async fn running_tree_without_core_is_not_allowed() { let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging let (env, _env_handles) = utils::TestEnvironment::with_genesis_block("tree").await; - let l2_client = MockClient::builder(L2::default()) - .method("eth_chainId", || Ok(U64::from(270))) - .method("zks_L1ChainId", || Ok(U64::from(9))) - .method("zks_L1BatchNumber", || { - Err::<(), _>(ClientError::RequestTimeout) - }) - .method("eth_blockNumber", || { - Err::<(), _>(ClientError::RequestTimeout) - }) - .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) - .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) - .build(); + let l2_client = utils::mock_l2_client(&env); let eth_client = 
utils::mock_eth_client(env.config.remote.diamond_proxy_addr); let node_handle = tokio::task::spawn_blocking(move || { @@ -211,18 +165,7 @@ async fn running_tree_api_without_tree_is_not_allowed() { let _guard = zksync_vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging let (env, _env_handles) = utils::TestEnvironment::with_genesis_block("core,tree_api").await; - let l2_client = MockClient::builder(L2::default()) - .method("eth_chainId", || Ok(U64::from(270))) - .method("zks_L1ChainId", || Ok(U64::from(9))) - .method("zks_L1BatchNumber", || { - Err::<(), _>(ClientError::RequestTimeout) - }) - .method("eth_blockNumber", || { - Err::<(), _>(ClientError::RequestTimeout) - }) - .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) - .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) - .build(); + let l2_client = utils::mock_l2_client(&env); let eth_client = utils::mock_eth_client(env.config.remote.diamond_proxy_addr); let node_handle = tokio::task::spawn_blocking(move || { diff --git a/core/bin/external_node/src/tests/utils.rs b/core/bin/external_node/src/tests/utils.rs index b8e02b4f3d96..3784fea4763b 100644 --- a/core/bin/external_node/src/tests/utils.rs +++ b/core/bin/external_node/src/tests/utils.rs @@ -145,3 +145,51 @@ pub(super) fn mock_eth_client(diamond_proxy_addr: Address) -> MockClient { }); mock.build().into_client() } + +/// Creates a mock L2 client with the genesis block information. +pub(super) fn mock_l2_client(env: &TestEnvironment) -> MockClient { + let genesis_root_hash = env.genesis_params.root_hash; + let genesis_l2_block_hash = env.genesis_l2_block.hash; + + MockClient::builder(L2::default()) + .method("eth_chainId", || Ok(U64::from(270))) + .method("zks_L1ChainId", || Ok(U64::from(9))) + .method("zks_L1BatchNumber", || Ok(U64::from(0))) + .method("zks_getL1BatchDetails", move |number: L1BatchNumber| { + assert_eq!(number, L1BatchNumber(0)); + Ok(api::L1BatchDetails { + number: L1BatchNumber(0), + base: utils::block_details_base(genesis_root_hash), + }) + }) + .method("eth_blockNumber", || Ok(U64::from(0))) + .method( + "eth_getBlockByNumber", + move |number: api::BlockNumber, _with_txs: bool| { + assert_eq!(number, api::BlockNumber::Number(0.into())); + Ok(api::Block:: { + hash: genesis_l2_block_hash, + ..api::Block::default() + }) + }, + ) + .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) + .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) + .build() +} + +/// Creates a mock L2 client that will mimic request timeouts on block info requests. +pub(super) fn mock_l2_client_hanging() -> MockClient { + MockClient::builder(L2::default()) + .method("eth_chainId", || Ok(U64::from(270))) + .method("zks_L1ChainId", || Ok(U64::from(9))) + .method("zks_L1BatchNumber", || { + Err::<(), _>(ClientError::RequestTimeout) + }) + .method("eth_blockNumber", || { + Err::<(), _>(ClientError::RequestTimeout) + }) + .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default())) + .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0])) + .build() +}
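
Editor's note: a minimal sketch of the construct-then-initialize flow that patch 10 introduces for `L1BatchParamsProvider`, since each call site in the series repeats the same steps. This is illustrative only and not part of the patches; the `zksync_vm_utils::storage` import path and the `provider_for` helper are assumptions made for the example.

    use anyhow::Context as _;
    use zksync_dal::{Connection, Core};
    // Assumed import path for the provider reworked in patch 10.
    use zksync_vm_utils::storage::L1BatchParamsProvider;

    // Hypothetical helper mirroring the pattern at the call sites above: the provider
    // is now created without touching the database, and only `initialize` queries
    // storage, which must already contain batches or a snapshot recovery status.
    async fn provider_for(
        storage: &mut Connection<'_, Core>,
    ) -> anyhow::Result<L1BatchParamsProvider> {
        let mut provider = L1BatchParamsProvider::new();
        provider
            .initialize(storage)
            .await
            .context("failed initializing L1 batch params provider")?;
        Ok(provider)
    }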