From 7842bc4842c5c92437639105d8edac5f775ad0e6 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Wed, 19 Jun 2024 13:53:04 +0400 Subject: [PATCH] feat(node): Port (most of) Node to the Node Framework (#2196) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Ports most of the EN parts to the framework, except: - Reorg detector - Genesis / snapshot recovery - Unique metrics stuff ## Why ❔ This forms the "main" body of the framework-based EN. The binary can already run, it can proxy and re-execute transactions, etc. The remaining work may be somewhat complex, so I would like to ship it separately -- so that it receives thorough review. There are some TODOs without a task number in this PR, expect them either to be fixed in the next one, or otherwise an issue to be added. ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. - [ ] Spellcheck has been run via `zk spellcheck`. --- Cargo.lock | 2 + checks-config/era.dic | 1 + core/bin/external_node/Cargo.toml | 1 + core/bin/external_node/src/main.rs | 31 +- core/bin/external_node/src/node_builder.rs | 508 ++++++++++++++++++ core/bin/external_node/src/tests.rs | 2 + core/bin/zksync_server/src/node_builder.rs | 50 +- core/node/api_server/src/tx_sender/proxy.rs | 22 +- core/node/node_framework/Cargo.toml | 1 + .../node/node_framework/examples/main_node.rs | 27 +- .../layers/batch_status_updater.rs | 52 ++ .../layers/commitment_generator.rs | 24 +- .../src/implementations/layers/consensus.rs | 4 +- .../layers/consistency_checker.rs | 12 +- .../implementations/layers/house_keeper.rs | 32 -- .../l1_batch_commitment_mode_validation.rs | 59 ++ .../layers/main_node_client.rs | 19 +- .../layers/main_node_fee_params_fetcher.rs | 46 ++ .../layers/metadata_calculator.rs | 31 +- .../src/implementations/layers/mod.rs | 8 + .../layers/postgres_metrics.rs | 57 ++ .../layers/prometheus_exporter.rs | 8 +- .../src/implementations/layers/pruning.rs | 75 +++ .../layers/reorg_detector_checker.rs | 19 +- .../layers/state_keeper/external_io.rs | 68 +++ .../state_keeper/main_batch_executor.rs | 12 +- .../layers/state_keeper/mempool_io.rs | 51 +- .../layers/state_keeper/mod.rs | 29 +- .../layers/state_keeper/output_handler.rs | 121 +++++ .../layers/sync_state_updater.rs | 75 +++ .../layers/tree_data_fetcher.rs | 67 +++ .../layers/validate_chain_ids.rs | 61 +++ .../implementations/layers/web3_api/server.rs | 14 +- .../layers/web3_api/tx_sender.rs | 70 ++- .../layers/web3_api/tx_sink.rs | 31 +- .../node_sync/src/validate_chain_ids_task.rs | 17 + .../src/batch_executor/main_executor.rs | 14 +- etc/env/configs/ext-node.toml | 2 + 38 files changed, 1569 insertions(+), 154 deletions(-) create mode 100644 core/bin/external_node/src/node_builder.rs create mode 100644 core/node/node_framework/src/implementations/layers/batch_status_updater.rs create mode 100644 core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs create mode 100644 core/node/node_framework/src/implementations/layers/main_node_fee_params_fetcher.rs create mode 100644 core/node/node_framework/src/implementations/layers/postgres_metrics.rs create mode 100644 core/node/node_framework/src/implementations/layers/pruning.rs create mode 100644 core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs 
create mode 100644 core/node/node_framework/src/implementations/layers/state_keeper/output_handler.rs create mode 100644 core/node/node_framework/src/implementations/layers/sync_state_updater.rs create mode 100644 core/node/node_framework/src/implementations/layers/tree_data_fetcher.rs create mode 100644 core/node/node_framework/src/implementations/layers/validate_chain_ids.rs diff --git a/Cargo.lock b/Cargo.lock index c41faf9d1faf..be0ffd1566b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8643,6 +8643,7 @@ dependencies = [ "zksync_node_consensus", "zksync_node_db_pruner", "zksync_node_fee_model", + "zksync_node_framework", "zksync_node_genesis", "zksync_node_sync", "zksync_object_store", @@ -8954,6 +8955,7 @@ dependencies = [ "zksync_metadata_calculator", "zksync_node_api_server", "zksync_node_consensus", + "zksync_node_db_pruner", "zksync_node_fee_model", "zksync_node_sync", "zksync_object_store", diff --git a/checks-config/era.dic b/checks-config/era.dic index a93a467f956e..0b55a55c83ea 100644 --- a/checks-config/era.dic +++ b/checks-config/era.dic @@ -962,6 +962,7 @@ zksync_merkle_tree TreeMetadata delegator decrement +whitelisted Bbellman Sbellman DCMAKE diff --git a/core/bin/external_node/Cargo.toml b/core/bin/external_node/Cargo.toml index d4a883b190f4..ee6aa08be9da 100644 --- a/core/bin/external_node/Cargo.toml +++ b/core/bin/external_node/Cargo.toml @@ -42,6 +42,7 @@ zksync_metadata_calculator.workspace = true zksync_node_sync.workspace = true zksync_node_api_server.workspace = true zksync_node_consensus.workspace = true +zksync_node_framework.workspace = true vlog.workspace = true zksync_concurrency.workspace = true diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index cca61889ff92..04435f66bf4b 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -3,6 +3,7 @@ use std::{collections::HashSet, net::Ipv4Addr, str::FromStr, sync::Arc, time::Du use anyhow::Context as _; use clap::Parser; use metrics::EN_METRICS; +use node_builder::ExternalNodeBuilder; use tokio::{ sync::{oneshot, watch, RwLock}, task::{self, JoinHandle}, @@ -63,6 +64,7 @@ mod config; mod init; mod metadata; mod metrics; +mod node_builder; #[cfg(test)] mod tests; @@ -426,10 +428,11 @@ async fn run_api( .build() .await .context("failed to build a proxy_cache_updater_pool")?; - task_handles.push(tokio::spawn(tx_proxy.run_account_nonce_sweeper( - proxy_cache_updater_pool.clone(), - stop_receiver.clone(), - ))); + task_handles.push(tokio::spawn( + tx_proxy + .account_nonce_sweeper_task(proxy_cache_updater_pool.clone()) + .run(stop_receiver.clone()), + )); let fee_params_fetcher_handle = tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone())); @@ -701,6 +704,10 @@ struct Cli { /// Comma-separated list of components to launch. #[arg(long, default_value = "all")] components: ComponentsToRun, + + /// Run the node using the node framework. + #[arg(long)] + use_node_framework: bool, } #[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)] @@ -784,6 +791,22 @@ async fn main() -> anyhow::Result<()> { .fetch_remote(main_node_client.as_ref()) .await .context("failed fetching remote part of node config from main node")?; + + // If the node framework is used, run the node. + if opt.use_node_framework { + // We run the node from a different thread, since the current thread is in tokio context. 
+ std::thread::spawn(move || { + let node = + ExternalNodeBuilder::new(config).build(opt.components.0.into_iter().collect())?; + node.run()?; + anyhow::Ok(()) + }) + .join() + .expect("Failed to run the node")?; + + return Ok(()); + } + if let Some(threshold) = config.optional.slow_query_threshold() { ConnectionPool::::global_config().set_slow_query_threshold(threshold)?; } diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs new file mode 100644 index 000000000000..5eaff63d20a0 --- /dev/null +++ b/core/bin/external_node/src/node_builder.rs @@ -0,0 +1,508 @@ +//! This module provides a "builder" for the external node, +//! as well as an interface to run the node with the specified components. + +use anyhow::Context as _; +use zksync_config::{ + configs::{ + api::{HealthCheckConfig, MerkleTreeApiConfig}, + database::MerkleTreeMode, + DatabaseSecrets, + }, + PostgresConfig, +}; +use zksync_metadata_calculator::{MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig}; +use zksync_node_api_server::{tx_sender::ApiContracts, web3::Namespace}; +use zksync_node_framework::{ + implementations::layers::{ + batch_status_updater::BatchStatusUpdaterLayer, + commitment_generator::CommitmentGeneratorLayer, + consensus::{ConsensusLayer, Mode}, + consistency_checker::ConsistencyCheckerLayer, + healtcheck_server::HealthCheckLayer, + l1_batch_commitment_mode_validation::L1BatchCommitmentModeValidationLayer, + main_node_client::MainNodeClientLayer, + main_node_fee_params_fetcher::MainNodeFeeParamsFetcherLayer, + metadata_calculator::MetadataCalculatorLayer, + pools_layer::PoolsLayerBuilder, + postgres_metrics::PostgresMetricsLayer, + prometheus_exporter::PrometheusExporterLayer, + pruning::PruningLayer, + query_eth_client::QueryEthClientLayer, + sigint::SigintHandlerLayer, + state_keeper::{ + external_io::ExternalIOLayer, main_batch_executor::MainBatchExecutorLayer, + output_handler::OutputHandlerLayer, StateKeeperLayer, + }, + sync_state_updater::SyncStateUpdaterLayer, + tree_data_fetcher::TreeDataFetcherLayer, + validate_chain_ids::ValidateChainIdsLayer, + web3_api::{ + caches::MempoolCacheLayer, + server::{Web3ServerLayer, Web3ServerOptionalConfig}, + tree_api_client::TreeApiClientLayer, + tx_sender::{PostgresStorageCachesConfig, TxSenderLayer}, + tx_sink::TxSinkLayer, + }, + }, + service::{ZkStackService, ZkStackServiceBuilder}, +}; +use zksync_state::RocksdbStorageOptions; + +use crate::{ + config::{self, ExternalNodeConfig}, + Component, +}; + +/// Builder for the external node. +#[derive(Debug)] +pub(crate) struct ExternalNodeBuilder { + node: ZkStackServiceBuilder, + config: ExternalNodeConfig, +} + +impl ExternalNodeBuilder { + pub fn new(config: ExternalNodeConfig) -> Self { + Self { + node: ZkStackServiceBuilder::new(), + config, + } + } + + fn add_sigint_handler_layer(mut self) -> anyhow::Result { + self.node.add_layer(SigintHandlerLayer); + Ok(self) + } + + fn add_pools_layer(mut self) -> anyhow::Result { + // Note: the EN config doesn't currently support specifying configuration for replicas, + // so we reuse the master configuration for that purpose. + // Settings unconditionally set to `None` are either not supported by the EN configuration layer + // or are not used in the context of the external node. 
+ let config = PostgresConfig { + max_connections: Some(self.config.postgres.max_connections), + max_connections_master: Some(self.config.postgres.max_connections), + acquire_timeout_sec: None, + statement_timeout_sec: None, + long_connection_threshold_ms: None, + slow_query_threshold_ms: self + .config + .optional + .slow_query_threshold() + .map(|d| d.as_millis() as u64), + test_server_url: None, + test_prover_url: None, + }; + let secrets = DatabaseSecrets { + server_url: Some(self.config.postgres.database_url()), + server_replica_url: Some(self.config.postgres.database_url()), + prover_url: None, + }; + let pools_layer = PoolsLayerBuilder::empty(config, secrets) + .with_master(true) + .with_replica(true) + .build(); + self.node.add_layer(pools_layer); + Ok(self) + } + + fn add_postgres_metrics_layer(mut self) -> anyhow::Result { + self.node.add_layer(PostgresMetricsLayer); + Ok(self) + } + + fn add_main_node_client_layer(mut self) -> anyhow::Result { + let layer = MainNodeClientLayer::new( + self.config.required.main_node_url.clone(), + self.config.optional.main_node_rate_limit_rps, + self.config.required.l2_chain_id, + ); + self.node.add_layer(layer); + Ok(self) + } + + fn add_healthcheck_layer(mut self) -> anyhow::Result { + let healthcheck_config = HealthCheckConfig { + port: self.config.required.healthcheck_port, + slow_time_limit_ms: self + .config + .optional + .healthcheck_slow_time_limit() + .map(|d| d.as_millis() as u64), + hard_time_limit_ms: self + .config + .optional + .healthcheck_hard_time_limit() + .map(|d| d.as_millis() as u64), + }; + self.node.add_layer(HealthCheckLayer(healthcheck_config)); + Ok(self) + } + + fn add_prometheus_exporter_layer(mut self) -> anyhow::Result { + if let Some(prom_config) = self.config.observability.prometheus() { + self.node.add_layer(PrometheusExporterLayer(prom_config)); + } else { + tracing::info!("No configuration for prometheus exporter, skipping"); + } + Ok(self) + } + + fn add_query_eth_client_layer(mut self) -> anyhow::Result { + let query_eth_client_layer = QueryEthClientLayer::new( + self.config.required.l1_chain_id, + self.config.required.eth_client_url.clone(), + ); + self.node.add_layer(query_eth_client_layer); + Ok(self) + } + + fn add_state_keeper_layer(mut self) -> anyhow::Result { + // While optional bytecode compression may be disabled on the main node, there are batches where + // optional bytecode compression was enabled. To process these batches (and also for the case where + // compression will become optional on the sequencer again), EN has to allow txs without bytecode + // compression. + const OPTIONAL_BYTECODE_COMPRESSION: bool = true; + + let persistence_layer = OutputHandlerLayer::new( + self.config + .remote + .l2_shared_bridge_addr + .expect("L2 shared bridge address is not set"), + self.config.optional.l2_block_seal_queue_capacity, + ) + .with_pre_insert_txs(true) // EN requires txs to be pre-inserted. + .with_protective_reads_persistence_enabled( + self.config.optional.protective_reads_persistence_enabled, + ); + + let io_layer = ExternalIOLayer::new(self.config.required.l2_chain_id); + + // We only need call traces on the external node if the `debug_` namespace is enabled. 
+ let save_call_traces = self + .config + .optional + .api_namespaces() + .contains(&Namespace::Debug); + let main_node_batch_executor_builder_layer = + MainBatchExecutorLayer::new(save_call_traces, OPTIONAL_BYTECODE_COMPRESSION); + + let rocksdb_options = RocksdbStorageOptions { + block_cache_capacity: self + .config + .experimental + .state_keeper_db_block_cache_capacity(), + max_open_files: self.config.experimental.state_keeper_db_max_open_files, + }; + let state_keeper_layer = StateKeeperLayer::new( + self.config.required.state_cache_path.clone(), + rocksdb_options, + ); + self.node + .add_layer(persistence_layer) + .add_layer(io_layer) + .add_layer(main_node_batch_executor_builder_layer) + .add_layer(state_keeper_layer); + Ok(self) + } + + fn add_consensus_layer(mut self) -> anyhow::Result { + let config = self.config.consensus.clone(); + let secrets = + config::read_consensus_secrets().context("config::read_consensus_secrets()")?; + let layer = ConsensusLayer { + mode: Mode::External, + config, + secrets, + }; + self.node.add_layer(layer); + Ok(self) + } + + fn add_pruning_layer(mut self) -> anyhow::Result { + if self.config.optional.pruning_enabled { + let layer = PruningLayer::new( + self.config.optional.pruning_removal_delay(), + self.config.optional.pruning_chunk_size, + self.config.optional.pruning_data_retention(), + ); + self.node.add_layer(layer); + } else { + tracing::info!("Pruning is disabled"); + } + Ok(self) + } + + fn add_l1_batch_commitment_mode_validation_layer(mut self) -> anyhow::Result { + let layer = L1BatchCommitmentModeValidationLayer::new( + self.config.remote.diamond_proxy_addr, + self.config.optional.l1_batch_commit_data_generator_mode, + ); + self.node.add_layer(layer); + Ok(self) + } + + fn add_validate_chain_ids_layer(mut self) -> anyhow::Result { + let layer = ValidateChainIdsLayer::new( + self.config.required.l1_chain_id, + self.config.required.l2_chain_id, + ); + self.node.add_layer(layer); + Ok(self) + } + + fn add_consistency_checker_layer(mut self) -> anyhow::Result { + let max_batches_to_recheck = 10; // TODO (BFT-97): Make it a part of a proper EN config + let layer = ConsistencyCheckerLayer::new( + self.config.remote.diamond_proxy_addr, + max_batches_to_recheck, + self.config.optional.l1_batch_commit_data_generator_mode, + ); + self.node.add_layer(layer); + Ok(self) + } + + fn add_commitment_generator_layer(mut self) -> anyhow::Result { + let layer = + CommitmentGeneratorLayer::new(self.config.optional.l1_batch_commit_data_generator_mode) + .with_max_parallelism( + self.config + .experimental + .commitment_generator_max_parallelism, + ); + self.node.add_layer(layer); + Ok(self) + } + + fn add_batch_status_updater_layer(mut self) -> anyhow::Result { + let layer = BatchStatusUpdaterLayer; + self.node.add_layer(layer); + Ok(self) + } + + fn add_tree_data_fetcher_layer(mut self) -> anyhow::Result { + let layer = TreeDataFetcherLayer::new(self.config.remote.diamond_proxy_addr); + self.node.add_layer(layer); + Ok(self) + } + + fn add_sync_state_updater_layer(mut self) -> anyhow::Result { + // This layer may be used as a fallback for EN API if API server runs without the core component. 
+ self.node.add_layer(SyncStateUpdaterLayer); + Ok(self) + } + + fn add_metadata_calculator_layer(mut self, with_tree_api: bool) -> anyhow::Result { + let metadata_calculator_config = MetadataCalculatorConfig { + db_path: self.config.required.merkle_tree_path.clone(), + max_open_files: self.config.optional.merkle_tree_max_open_files, + mode: MerkleTreeMode::Lightweight, + delay_interval: self.config.optional.merkle_tree_processing_delay(), + max_l1_batches_per_iter: self.config.optional.merkle_tree_max_l1_batches_per_iter, + multi_get_chunk_size: self.config.optional.merkle_tree_multi_get_chunk_size, + block_cache_capacity: self.config.optional.merkle_tree_block_cache_size(), + include_indices_and_filters_in_block_cache: self + .config + .optional + .merkle_tree_include_indices_and_filters_in_block_cache, + memtable_capacity: self.config.optional.merkle_tree_memtable_capacity(), + stalled_writes_timeout: self.config.optional.merkle_tree_stalled_writes_timeout(), + recovery: MetadataCalculatorRecoveryConfig { + desired_chunk_size: self.config.experimental.snapshots_recovery_tree_chunk_size, + parallel_persistence_buffer: self + .config + .experimental + .snapshots_recovery_tree_parallel_persistence_buffer, + }, + }; + + // Configure basic tree layer. + let mut layer = MetadataCalculatorLayer::new(metadata_calculator_config); + + // Add tree API if needed. + if with_tree_api { + let merkle_tree_api_config = MerkleTreeApiConfig { + port: self + .config + .tree_component + .api_port + .context("should contain tree api port")?, + }; + layer = layer.with_tree_api_config(merkle_tree_api_config); + } + + // Add tree pruning if needed. + if self.config.optional.pruning_enabled { + layer = layer.with_pruning_config(self.config.optional.pruning_removal_delay()); + } + + self.node.add_layer(layer); + Ok(self) + } + + fn add_tx_sender_layer(mut self) -> anyhow::Result { + let postgres_storage_config = PostgresStorageCachesConfig { + factory_deps_cache_size: self.config.optional.factory_deps_cache_size() as u64, + initial_writes_cache_size: self.config.optional.initial_writes_cache_size() as u64, + latest_values_cache_size: self.config.optional.latest_values_cache_size() as u64, + }; + let max_vm_concurrency = self.config.optional.vm_concurrency_limit; + let api_contracts = ApiContracts::load_from_disk_blocking(); // TODO (BFT-138): Allow to dynamically reload API contracts; + let tx_sender_layer = TxSenderLayer::new( + (&self.config).into(), + postgres_storage_config, + max_vm_concurrency, + api_contracts, + ) + .with_whitelisted_tokens_for_aa_cache(true); + + self.node.add_layer(TxSinkLayer::ProxySink); + self.node.add_layer(tx_sender_layer); + Ok(self) + } + + fn add_mempool_cache_layer(mut self) -> anyhow::Result { + self.node.add_layer(MempoolCacheLayer::new( + self.config.optional.mempool_cache_size, + self.config.optional.mempool_cache_update_interval(), + )); + Ok(self) + } + + fn add_tree_api_client_layer(mut self) -> anyhow::Result { + self.node.add_layer(TreeApiClientLayer::http( + self.config.api_component.tree_api_remote_url.clone(), + )); + Ok(self) + } + + fn add_main_node_fee_params_fetcher_layer(mut self) -> anyhow::Result { + self.node.add_layer(MainNodeFeeParamsFetcherLayer); + Ok(self) + } + + fn web3_api_optional_config(&self) -> Web3ServerOptionalConfig { + // The refresh interval should be several times lower than the pruning removal delay, so that + // soft-pruning will timely propagate to the API server. 
+ let pruning_info_refresh_interval = self.config.optional.pruning_removal_delay() / 5; + + Web3ServerOptionalConfig { + namespaces: Some(self.config.optional.api_namespaces()), + filters_limit: Some(self.config.optional.filters_limit), + subscriptions_limit: Some(self.config.optional.filters_limit), + batch_request_size_limit: Some(self.config.optional.max_batch_request_size), + response_body_size_limit: Some(self.config.optional.max_response_body_size()), + with_extended_tracing: self.config.optional.extended_rpc_tracing, + pruning_info_refresh_interval: Some(pruning_info_refresh_interval), + websocket_requests_per_minute_limit: None, // To be set by WS server layer method if required. + replication_lag_limit: None, // TODO: Support replication lag limit + } + } + + fn add_http_web3_api_layer(mut self) -> anyhow::Result<Self> { + let optional_config = self.web3_api_optional_config(); + self.node.add_layer(Web3ServerLayer::http( + self.config.required.http_port, + (&self.config).into(), + optional_config, + )); + + Ok(self) + } + + fn add_ws_web3_api_layer(mut self) -> anyhow::Result<Self> { + // TODO: Support websocket requests per minute limit + let optional_config = self.web3_api_optional_config(); + self.node.add_layer(Web3ServerLayer::ws( + self.config.required.ws_port, + (&self.config).into(), + optional_config, + )); + + Ok(self) + } + + pub fn build(mut self, mut components: Vec<Component>) -> anyhow::Result<ZkStackService> { + // Add "base" layers + self = self + .add_sigint_handler_layer()? + .add_healthcheck_layer()? + .add_prometheus_exporter_layer()? + .add_pools_layer()? + .add_main_node_client_layer()? + .add_query_eth_client_layer()?; + + // Add preconditions for all the components. + self = self + .add_l1_batch_commitment_mode_validation_layer()? + .add_validate_chain_ids_layer()?; + + // Sort the components so that components that may depend on each other are added in the correct order. + components.sort_unstable_by_key(|component| match component { + // API consumes the resources provided by other layers (multiple ones), so it has to come last. + Component::HttpApi | Component::WsApi => 1, + // Default priority. + _ => 0, + }); + + for component in &components { + match component { + Component::HttpApi => { + self = self + .add_sync_state_updater_layer()? + .add_mempool_cache_layer()? + .add_tree_api_client_layer()? + .add_main_node_fee_params_fetcher_layer()? + .add_tx_sender_layer()? + .add_http_web3_api_layer()?; + } + Component::WsApi => { + self = self + .add_sync_state_updater_layer()? + .add_mempool_cache_layer()? + .add_tree_api_client_layer()? + .add_main_node_fee_params_fetcher_layer()? + .add_tx_sender_layer()? + .add_ws_web3_api_layer()?; + } + Component::Tree => { + // Right now, distributed mode for EN is not fully supported, e.g. there are some + // issues with reorg detection and snapshot recovery. + // So we require the core component to be present, i.e. forcing the EN to run in monolithic mode. + anyhow::ensure!( + components.contains(&Component::Core), + "Tree must run on the same machine as Core" + ); + let with_tree_api = components.contains(&Component::TreeApi); + self = self.add_metadata_calculator_layer(with_tree_api)?; + } + Component::TreeApi => { + anyhow::ensure!( + components.contains(&Component::Tree), + "Merkle tree API cannot be started without a tree component" + ); + // Do nothing, will be handled by the `Tree` component.
+ } + Component::TreeFetcher => { + self = self.add_tree_data_fetcher_layer()?; + } + Component::Core => { + // Core is a singleton & mandatory component, + // so until we have a dedicated component for "auxiliary" tasks, + // it's responsible for things like metrics. + self = self.add_postgres_metrics_layer()?; + + // Main tasks + self = self + .add_state_keeper_layer()? + .add_consensus_layer()? + .add_pruning_layer()? + .add_consistency_checker_layer()? + .add_commitment_generator_layer()? + .add_batch_status_updater_layer()?; + } + } + } + + Ok(self.node.build()?) + } +} diff --git a/core/bin/external_node/src/tests.rs b/core/bin/external_node/src/tests.rs index c78c5329386e..8966a7ac3f3b 100644 --- a/core/bin/external_node/src/tests.rs +++ b/core/bin/external_node/src/tests.rs @@ -157,6 +157,7 @@ async fn external_node_basics(components_str: &'static str) { let opt = Cli { enable_consensus: false, components, + use_node_framework: false, }; let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool); if opt.components.0.contains(&Component::TreeApi) { @@ -265,6 +266,7 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() { let opt = Cli { enable_consensus: false, components: "core".parse().unwrap(), + use_node_framework: false, }; let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool); if opt.components.0.contains(&Component::TreeApi) { diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index 551683605479..096d5e783551 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -23,18 +23,20 @@ use zksync_node_framework::{ eth_watch::EthWatchLayer, healtcheck_server::HealthCheckLayer, house_keeper::HouseKeeperLayer, + l1_batch_commitment_mode_validation::L1BatchCommitmentModeValidationLayer, l1_gas::SequencerL1GasLayer, metadata_calculator::MetadataCalculatorLayer, object_store::ObjectStoreLayer, pk_signing_eth_client::PKSigningEthClientLayer, pools_layer::PoolsLayerBuilder, + postgres_metrics::PostgresMetricsLayer, prometheus_exporter::PrometheusExporterLayer, proof_data_handler::ProofDataHandlerLayer, query_eth_client::QueryEthClientLayer, sigint::SigintHandlerLayer, state_keeper::{ main_batch_executor::MainBatchExecutorLayer, mempool_io::MempoolIOLayer, - StateKeeperLayer, + output_handler::OutputHandlerLayer, RocksdbStorageOptions, StateKeeperLayer, }, tee_verifier_input_producer::TeeVerifierInputProducerLayer, vm_runner::protective_reads::ProtectiveReadsWriterLayer, @@ -111,6 +113,11 @@ impl MainNodeBuilder { Ok(self) } + fn add_postgres_metrics_layer(mut self) -> anyhow::Result { + self.node.add_layer(PostgresMetricsLayer); + Ok(self) + } + fn add_pk_signing_client_layer(mut self) -> anyhow::Result { let eth_config = try_load_config!(self.configs.eth); let wallets = try_load_config!(self.wallets.eth_sender); @@ -155,6 +162,15 @@ impl MainNodeBuilder { Ok(self) } + fn add_l1_batch_commitment_mode_validation_layer(mut self) -> anyhow::Result { + let layer = L1BatchCommitmentModeValidationLayer::new( + self.contracts_config.diamond_proxy_addr, + self.genesis_config.l1_batch_commit_data_generator_mode, + ); + self.node.add_layer(layer); + Ok(self) + } + fn add_metadata_calculator_layer(mut self, with_tree_api: bool) -> anyhow::Result { let merkle_tree_env_config = try_load_config!(self.configs.db_config).merkle_tree; let operations_manager_env_config = @@ -173,19 +189,37 @@ impl MainNodeBuilder { } fn add_state_keeper_layer(mut self) -> 
anyhow::Result { + // Bytecode compression is currently mandatory for the transactions processed by the sequencer. + const OPTIONAL_BYTECODE_COMPRESSION: bool = false; + let wallets = self.wallets.clone(); let sk_config = try_load_config!(self.configs.state_keeper_config); + let persistence_layer = OutputHandlerLayer::new( + self.contracts_config + .l2_shared_bridge_addr + .context("L2 shared bridge address")?, + sk_config.l2_block_seal_queue_capacity, + ); let mempool_io_layer = MempoolIOLayer::new( self.genesis_config.l2_chain_id, - self.contracts_config.clone(), sk_config.clone(), try_load_config!(self.configs.mempool_config), try_load_config!(wallets.state_keeper), ); let db_config = try_load_config!(self.configs.db_config); - let main_node_batch_executor_builder_layer = MainBatchExecutorLayer::new(sk_config); - let state_keeper_layer = StateKeeperLayer::new(db_config); + let main_node_batch_executor_builder_layer = + MainBatchExecutorLayer::new(sk_config.save_call_traces, OPTIONAL_BYTECODE_COMPRESSION); + + let rocksdb_options = RocksdbStorageOptions { + block_cache_capacity: db_config + .experimental + .state_keeper_db_block_cache_capacity(), + max_open_files: db_config.experimental.state_keeper_db_max_open_files, + }; + let state_keeper_layer = + StateKeeperLayer::new(db_config.state_keeper_db_path, rocksdb_options); self.node + .add_layer(persistence_layer) .add_layer(mempool_io_layer) .add_layer(main_node_batch_executor_builder_layer) .add_layer(state_keeper_layer); @@ -308,6 +342,7 @@ impl MainNodeBuilder { rpc_config.websocket_requests_per_minute_limit(), ), replication_lag_limit: circuit_breaker_config.replication_lag_limit(), + ..Default::default() }; self.node.add_layer(Web3ServerLayer::ws( rpc_config.ws_port, @@ -419,7 +454,8 @@ impl MainNodeBuilder { .add_healthcheck_layer()? .add_prometheus_exporter_layer()? .add_query_eth_client_layer()? - .add_sequencer_l1_gas_layer()?; + .add_sequencer_l1_gas_layer()? + .add_l1_batch_commitment_mode_validation_layer()?; // Sort the components, so that the components they may depend on each other are added in the correct order. components.sort_unstable_by_key(|component| match component { @@ -479,7 +515,9 @@ impl MainNodeBuilder { self = self.add_tee_verifier_input_producer_layer()?; } Component::Housekeeper => { - self = self.add_house_keeper_layer()?; + self = self + .add_house_keeper_layer()? 
+ .add_postgres_metrics_layer()?; } Component::ProofDataHandler => { self = self.add_proof_data_handler_layer()?; diff --git a/core/node/api_server/src/tx_sender/proxy.rs b/core/node/api_server/src/tx_sender/proxy.rs index a1fa77d2f1b2..52fcc8a1a8b0 100644 --- a/core/node/api_server/src/tx_sender/proxy.rs +++ b/core/node/api_server/src/tx_sender/proxy.rs @@ -1,6 +1,5 @@ use std::{ collections::{BTreeSet, HashMap, HashSet}, - future::Future, sync::Arc, time::Duration, }; @@ -282,13 +281,24 @@ impl TxProxy { pending_nonce } - pub fn run_account_nonce_sweeper( + pub fn account_nonce_sweeper_task( &self, pool: ConnectionPool, - stop_receiver: watch::Receiver, - ) -> impl Future> { - let tx_cache = self.tx_cache.clone(); - tx_cache.run_updates(pool, stop_receiver) + ) -> AccountNonceSweeperTask { + let cache = self.tx_cache.clone(); + AccountNonceSweeperTask { cache, pool } + } +} + +#[derive(Debug)] +pub struct AccountNonceSweeperTask { + cache: TxCache, + pool: ConnectionPool, +} + +impl AccountNonceSweeperTask { + pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + self.cache.run_updates(self.pool, stop_receiver).await } } diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index 8e2c915d5749..d48522fb8116 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -45,6 +45,7 @@ zksync_tee_verifier_input_producer.workspace = true zksync_queued_job_processor.workspace = true zksync_reorg_detector.workspace = true zksync_vm_runner.workspace = true +zksync_node_db_pruner.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/core/node/node_framework/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs index a62f04af0334..f0cb8417ff97 100644 --- a/core/node/node_framework/examples/main_node.rs +++ b/core/node/node_framework/examples/main_node.rs @@ -43,7 +43,7 @@ use zksync_node_framework::{ sigint::SigintHandlerLayer, state_keeper::{ main_batch_executor::MainBatchExecutorLayer, mempool_io::MempoolIOLayer, - StateKeeperLayer, + output_handler::OutputHandlerLayer, StateKeeperLayer, }, web3_api::{ caches::MempoolCacheLayer, @@ -55,6 +55,7 @@ use zksync_node_framework::{ }, service::{ZkStackService, ZkStackServiceBuilder, ZkStackServiceError}, }; +use zksync_state::RocksdbStorageOptions; struct MainNodeBuilder { node: ZkStackServiceBuilder, @@ -145,17 +146,32 @@ impl MainNodeBuilder { fn add_state_keeper_layer(mut self) -> anyhow::Result { let wallets = Wallets::from_env()?; + let contracts_config = ContractsConfig::from_env()?; + let sk_config = StateKeeperConfig::from_env()?; + let persisence_layer = OutputHandlerLayer::new( + contracts_config.l2_shared_bridge_addr.unwrap(), + sk_config.l2_block_seal_queue_capacity, + ); let mempool_io_layer = MempoolIOLayer::new( NetworkConfig::from_env()?.zksync_network_id, - ContractsConfig::from_env()?, - StateKeeperConfig::from_env()?, + sk_config, MempoolConfig::from_env()?, wallets.state_keeper.context("State keeper wallets")?, ); let main_node_batch_executor_builder_layer = - MainBatchExecutorLayer::new(StateKeeperConfig::from_env()?); - let state_keeper_layer = StateKeeperLayer::new(DBConfig::from_env()?); + MainBatchExecutorLayer::new(StateKeeperConfig::from_env()?.save_call_traces, true); + let db_config = DBConfig::from_env()?; + + let rocksdb_options = RocksdbStorageOptions { + block_cache_capacity: db_config + .experimental + .state_keeper_db_block_cache_capacity(), + max_open_files: 
db_config.experimental.state_keeper_db_max_open_files, + }; + let state_keeper_layer = + StateKeeperLayer::new(db_config.state_keeper_db_path, rocksdb_options); self.node + .add_layer(persisence_layer) .add_layer(mempool_io_layer) .add_layer(main_node_batch_executor_builder_layer) .add_layer(state_keeper_layer); @@ -286,6 +302,7 @@ impl MainNodeBuilder { rpc_config.websocket_requests_per_minute_limit(), ), replication_lag_limit: circuit_breaker_config.replication_lag_limit(), + ..Default::default() }; self.node.add_layer(Web3ServerLayer::ws( rpc_config.ws_port, diff --git a/core/node/node_framework/src/implementations/layers/batch_status_updater.rs b/core/node/node_framework/src/implementations/layers/batch_status_updater.rs new file mode 100644 index 000000000000..ba328facc8a3 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/batch_status_updater.rs @@ -0,0 +1,52 @@ +use zksync_node_sync::batch_status_updater::BatchStatusUpdater; + +use crate::{ + implementations::resources::{ + healthcheck::AppHealthCheckResource, + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + }, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct BatchStatusUpdaterLayer; + +#[async_trait::async_trait] +impl WiringLayer for BatchStatusUpdaterLayer { + fn layer_name(&self) -> &'static str { + "batch_status_updater_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let pool = context.get_resource::>().await?; + let MainNodeClientResource(client) = context.get_resource().await?; + + let updater = BatchStatusUpdater::new(client, pool.get().await?); + + // Insert healthcheck + let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; + app_health + .insert_component(updater.health_check()) + .map_err(WiringError::internal)?; + + // Insert task + context.add_task(Box::new(updater)); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Task for BatchStatusUpdater { + fn id(&self) -> TaskId { + "batch_status_updater".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await?; + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/commitment_generator.rs b/core/node/node_framework/src/implementations/layers/commitment_generator.rs index 5d2f63931295..cc57599759eb 100644 --- a/core/node/node_framework/src/implementations/layers/commitment_generator.rs +++ b/core/node/node_framework/src/implementations/layers/commitment_generator.rs @@ -1,3 +1,5 @@ +use std::num::NonZero; + use zksync_commitment_generator::CommitmentGenerator; use zksync_types::commitment::L1BatchCommitmentMode; @@ -14,11 +16,20 @@ use crate::{ #[derive(Debug)] pub struct CommitmentGeneratorLayer { mode: L1BatchCommitmentMode, + max_parallelism: Option>, } impl CommitmentGeneratorLayer { pub fn new(mode: L1BatchCommitmentMode) -> Self { - Self { mode } + Self { + mode, + max_parallelism: None, + } + } + + pub fn with_max_parallelism(mut self, max_parallelism: Option>) -> Self { + self.max_parallelism = max_parallelism; + self } } @@ -30,10 +41,17 @@ impl WiringLayer for CommitmentGeneratorLayer { async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { let pool_resource = context.get_resource::>().await?; - let pool_size = CommitmentGenerator::default_parallelism().get(); + + let pool_size = self + 
.max_parallelism + .unwrap_or(CommitmentGenerator::default_parallelism()) + .get(); let main_pool = pool_resource.get_custom(pool_size).await?; - let commitment_generator = CommitmentGenerator::new(main_pool, self.mode); + let mut commitment_generator = CommitmentGenerator::new(main_pool, self.mode); + if let Some(max_parallelism) = self.max_parallelism { + commitment_generator.set_max_parallelism(max_parallelism); + } let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; app_health diff --git a/core/node/node_framework/src/implementations/layers/consensus.rs b/core/node/node_framework/src/implementations/layers/consensus.rs index 06bca1bba3ae..8cc7ea4098de 100644 --- a/core/node/node_framework/src/implementations/layers/consensus.rs +++ b/core/node/node_framework/src/implementations/layers/consensus.rs @@ -161,14 +161,14 @@ impl Task for FetcherTask { let root_ctx = ctx::root(); scope::run!(&root_ctx, |ctx, s| async { s.spawn_bg(consensus::era::run_en( - &root_ctx, + ctx, self.config, self.pool, self.sync_state, self.main_node_client, self.action_queue_sender, )); - ctx.wait(stop_receiver.0.wait_for(|stop| *stop)).await??; + let _ = stop_receiver.0.wait_for(|stop| *stop).await?; Ok(()) }) .await diff --git a/core/node/node_framework/src/implementations/layers/consistency_checker.rs b/core/node/node_framework/src/implementations/layers/consistency_checker.rs index a387fc19ead1..fb4b6d8f5eed 100644 --- a/core/node/node_framework/src/implementations/layers/consistency_checker.rs +++ b/core/node/node_framework/src/implementations/layers/consistency_checker.rs @@ -61,25 +61,19 @@ impl WiringLayer for ConsistencyCheckerLayer { .map_err(WiringError::internal)?; // Create and add tasks. - context.add_task(Box::new(ConsistencyCheckerTask { - consistency_checker, - })); + context.add_task(Box::new(consistency_checker)); Ok(()) } } -pub struct ConsistencyCheckerTask { - consistency_checker: ConsistencyChecker, -} - #[async_trait::async_trait] -impl Task for ConsistencyCheckerTask { +impl Task for ConsistencyChecker { fn id(&self) -> TaskId { "consistency_checker".into() } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { - self.consistency_checker.run(stop_receiver.0).await + (*self).run(stop_receiver.0).await } } diff --git a/core/node/node_framework/src/implementations/layers/house_keeper.rs b/core/node/node_framework/src/implementations/layers/house_keeper.rs index 7b3e52c7ed5d..416d80691a31 100644 --- a/core/node/node_framework/src/implementations/layers/house_keeper.rs +++ b/core/node/node_framework/src/implementations/layers/house_keeper.rs @@ -1,10 +1,7 @@ -use std::time::Duration; - use zksync_config::configs::{ fri_prover_group::FriProverGroupConfig, house_keeper::HouseKeeperConfig, FriProofCompressorConfig, FriProverConfig, FriWitnessGeneratorConfig, }; -use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core}; use zksync_house_keeper::{ blocks_state_reporter::L1BatchMetricsReporter, periodic_job::PeriodicJob, @@ -23,8 +20,6 @@ use crate::{ wiring_layer::{WiringError, WiringLayer}, }; -const SCRAPE_INTERVAL: Duration = Duration::from_secs(60); - #[derive(Debug)] pub struct HouseKeeperLayer { house_keeper_config: HouseKeeperConfig, @@ -67,9 +62,6 @@ impl WiringLayer for HouseKeeperLayer { let prover_pool = prover_pool_resource.get().await?; // initialize and add tasks - let pool_for_metrics = replica_pool_resource.get_singleton().await?; - context.add_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics })); - let 
l1_batch_metrics_reporter = L1BatchMetricsReporter::new( self.house_keeper_config .l1_batch_metrics_reporting_interval_ms, @@ -172,30 +164,6 @@ impl WiringLayer for HouseKeeperLayer { } } -#[derive(Debug)] -struct PostgresMetricsScrapingTask { - pool_for_metrics: ConnectionPool, -} - -#[async_trait::async_trait] -impl Task for PostgresMetricsScrapingTask { - fn id(&self) -> TaskId { - "postgres_metrics_scraping".into() - } - - async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { - tokio::select! { - () = PostgresMetrics::run_scraping(self.pool_for_metrics, SCRAPE_INTERVAL) => { - tracing::warn!("Postgres metrics scraping unexpectedly stopped"); - } - _ = stop_receiver.0.changed() => { - tracing::info!("Stop signal received, Postgres metrics scraping is shutting down"); - } - } - Ok(()) - } -} - #[derive(Debug)] struct L1BatchMetricsReporterTask { l1_batch_metrics_reporter: L1BatchMetricsReporter, diff --git a/core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs b/core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs new file mode 100644 index 000000000000..e333eda51192 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/l1_batch_commitment_mode_validation.rs @@ -0,0 +1,59 @@ +use zksync_commitment_generator::validation_task::L1BatchCommitmentModeValidationTask; +use zksync_types::{commitment::L1BatchCommitmentMode, Address}; + +use crate::{ + implementations::resources::eth_interface::EthInterfaceResource, + precondition::Precondition, + service::{ServiceContext, StopReceiver}, + task::TaskId, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct L1BatchCommitmentModeValidationLayer { + diamond_proxy_addr: Address, + l1_batch_commit_data_generator_mode: L1BatchCommitmentMode, +} + +impl L1BatchCommitmentModeValidationLayer { + pub fn new( + diamond_proxy_addr: Address, + l1_batch_commit_data_generator_mode: L1BatchCommitmentMode, + ) -> Self { + Self { + diamond_proxy_addr, + l1_batch_commit_data_generator_mode, + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for L1BatchCommitmentModeValidationLayer { + fn layer_name(&self) -> &'static str { + "l1_batch_commitment_mode_validation_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let EthInterfaceResource(query_client) = context.get_resource().await?; + let task = L1BatchCommitmentModeValidationTask::new( + self.diamond_proxy_addr, + self.l1_batch_commit_data_generator_mode, + query_client, + ); + + context.add_precondition(Box::new(task)); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Precondition for L1BatchCommitmentModeValidationTask { + fn id(&self) -> TaskId { + "l1_batch_commitment_mode_validation".into() + } + + async fn check(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).exit_on_success().run(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/main_node_client.rs b/core/node/node_framework/src/implementations/layers/main_node_client.rs index 80e5d44c350f..a694eb831330 100644 --- a/core/node/node_framework/src/implementations/layers/main_node_client.rs +++ b/core/node/node_framework/src/implementations/layers/main_node_client.rs @@ -1,11 +1,14 @@ -use std::num::NonZeroUsize; +use std::{num::NonZeroUsize, sync::Arc}; use anyhow::Context; +use zksync_node_sync::MainNodeHealthCheck; use zksync_types::{url::SensitiveUrl, L2ChainId}; use 
zksync_web3_decl::client::{Client, DynClient, L2}; use crate::{ - implementations::resources::main_node_client::MainNodeClientResource, + implementations::resources::{ + healthcheck::AppHealthCheckResource, main_node_client::MainNodeClientResource, + }, service::ServiceContext, wiring_layer::{WiringError, WiringLayer}, }; @@ -40,9 +43,15 @@ impl WiringLayer for MainNodeClientLayer { .with_allowed_requests_per_second(self.rate_limit_rps) .build(); - context.insert_resource(MainNodeClientResource( - Box::new(main_node_client) as Box> - ))?; + let client = Box::new(main_node_client) as Box>; + context.insert_resource(MainNodeClientResource(client.clone()))?; + + // Insert healthcheck + let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; + app_health + .insert_custom_component(Arc::new(MainNodeHealthCheck::from(client))) + .map_err(WiringError::internal)?; + Ok(()) } } diff --git a/core/node/node_framework/src/implementations/layers/main_node_fee_params_fetcher.rs b/core/node/node_framework/src/implementations/layers/main_node_fee_params_fetcher.rs new file mode 100644 index 000000000000..11bfab18a4c6 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/main_node_fee_params_fetcher.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use zksync_node_fee_model::l1_gas_price::MainNodeFeeParamsFetcher; + +use crate::{ + implementations::resources::{ + fee_input::FeeInputResource, main_node_client::MainNodeClientResource, + }, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct MainNodeFeeParamsFetcherLayer; + +#[async_trait::async_trait] +impl WiringLayer for MainNodeFeeParamsFetcherLayer { + fn layer_name(&self) -> &'static str { + "main_node_fee_params_fetcher_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let MainNodeClientResource(main_node_client) = context.get_resource().await?; + let fetcher = Arc::new(MainNodeFeeParamsFetcher::new(main_node_client)); + context.insert_resource(FeeInputResource(fetcher.clone()))?; + context.add_task(Box::new(MainNodeFeeParamsFetcherTask { fetcher })); + Ok(()) + } +} + +#[derive(Debug)] +struct MainNodeFeeParamsFetcherTask { + fetcher: Arc, +} + +#[async_trait::async_trait] +impl Task for MainNodeFeeParamsFetcherTask { + fn id(&self) -> TaskId { + "main_node_fee_params_fetcher".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.fetcher.run(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs index 935bb283fe81..bc1244410bf2 100644 --- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -1,12 +1,13 @@ use std::{ net::{Ipv4Addr, SocketAddr}, sync::Arc, + time::Duration, }; use anyhow::Context as _; use zksync_config::configs::{api::MerkleTreeApiConfig, database::MerkleTreeMode}; use zksync_metadata_calculator::{ - LazyAsyncTreeReader, MetadataCalculator, MetadataCalculatorConfig, + LazyAsyncTreeReader, MerkleTreePruningTask, MetadataCalculator, MetadataCalculatorConfig, }; use zksync_storage::RocksDB; @@ -35,6 +36,7 @@ use crate::{ pub struct MetadataCalculatorLayer { config: MetadataCalculatorConfig, tree_api_config: Option, + pruning_config: Option, } impl 
MetadataCalculatorLayer { @@ -42,6 +44,7 @@ impl MetadataCalculatorLayer { Self { config, tree_api_config: None, + pruning_config: None, } } @@ -49,6 +52,11 @@ impl MetadataCalculatorLayer { self.tree_api_config = Some(tree_api_config); self } + + pub fn with_pruning_config(mut self, pruning_config: Duration) -> Self { + self.pruning_config = Some(pruning_config); + self + } } #[async_trait::async_trait] @@ -76,7 +84,7 @@ impl WiringLayer for MetadataCalculatorLayer { } }; - let metadata_calculator = MetadataCalculator::new( + let mut metadata_calculator = MetadataCalculator::new( self.config, object_store.map(|store_resource| store_resource.0), main_pool, @@ -98,6 +106,14 @@ impl WiringLayer for MetadataCalculatorLayer { })); } + if let Some(pruning_removal_delay) = self.pruning_config { + let pruning_task = Box::new(metadata_calculator.pruning_task(pruning_removal_delay)); + app_health + .insert_component(pruning_task.health_check()) + .map_err(|err| WiringError::Internal(err.into()))?; + context.add_task(pruning_task); + } + context.insert_resource(TreeApiClientResource(Arc::new( metadata_calculator.tree_reader(), )))?; @@ -154,3 +170,14 @@ impl Task for TreeApiTask { .await } } + +#[async_trait::async_trait] +impl Task for MerkleTreePruningTask { + fn id(&self) -> TaskId { + "merkle_tree_pruning_task".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index 1c171e84b5ba..8637f15459d5 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -1,3 +1,4 @@ +pub mod batch_status_updater; pub mod circuit_breaker_checker; pub mod commitment_generator; pub mod consensus; @@ -7,19 +8,26 @@ pub mod eth_sender; pub mod eth_watch; pub mod healtcheck_server; pub mod house_keeper; +pub mod l1_batch_commitment_mode_validation; pub mod l1_gas; pub mod main_node_client; +pub mod main_node_fee_params_fetcher; pub mod metadata_calculator; pub mod object_store; pub mod pk_signing_eth_client; pub mod pools_layer; +pub mod postgres_metrics; pub mod prometheus_exporter; pub mod proof_data_handler; +pub mod pruning; pub mod query_eth_client; pub mod reorg_detector_checker; pub mod reorg_detector_runner; pub mod sigint; pub mod state_keeper; +pub mod sync_state_updater; pub mod tee_verifier_input_producer; +pub mod tree_data_fetcher; +pub mod validate_chain_ids; pub mod vm_runner; pub mod web3_api; diff --git a/core/node/node_framework/src/implementations/layers/postgres_metrics.rs b/core/node/node_framework/src/implementations/layers/postgres_metrics.rs new file mode 100644 index 000000000000..09d81844dd5a --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/postgres_metrics.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core}; + +use crate::{ + implementations::resources::pools::{PoolResource, ReplicaPool}, + service::{ServiceContext, StopReceiver}, + task::{TaskId, UnconstrainedTask}, + wiring_layer::{WiringError, WiringLayer}, +}; + +const SCRAPE_INTERVAL: Duration = Duration::from_secs(60); + +#[derive(Debug)] +pub struct PostgresMetricsLayer; + +#[async_trait::async_trait] +impl WiringLayer for PostgresMetricsLayer { + fn layer_name(&self) -> &'static str { + "postgres_metrics_layer" + } + + async fn wire(self: Box, mut context: 
ServiceContext<'_>) -> Result<(), WiringError> { + let replica_pool_resource = context.get_resource::>().await?; + let pool_for_metrics = replica_pool_resource.get_singleton().await?; + context.add_unconstrained_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics })); + + Ok(()) + } +} + +#[derive(Debug)] +struct PostgresMetricsScrapingTask { + pool_for_metrics: ConnectionPool, +} + +#[async_trait::async_trait] +impl UnconstrainedTask for PostgresMetricsScrapingTask { + fn id(&self) -> TaskId { + "postgres_metrics_scraping".into() + } + + async fn run_unconstrained( + self: Box, + mut stop_receiver: StopReceiver, + ) -> anyhow::Result<()> { + tokio::select! { + () = PostgresMetrics::run_scraping(self.pool_for_metrics, SCRAPE_INTERVAL) => { + tracing::warn!("Postgres metrics scraping unexpectedly stopped"); + } + _ = stop_receiver.0.changed() => { + tracing::info!("Stop signal received, Postgres metrics scraping is shutting down"); + } + } + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs index 6c7d4f915df4..4b7451348235 100644 --- a/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs +++ b/core/node/node_framework/src/implementations/layers/prometheus_exporter.rs @@ -4,7 +4,7 @@ use zksync_health_check::{HealthStatus, HealthUpdater, ReactiveHealthCheck}; use crate::{ implementations::resources::healthcheck::AppHealthCheckResource, service::{ServiceContext, StopReceiver}, - task::{Task, TaskId}, + task::{TaskId, UnconstrainedTask}, wiring_layer::{WiringError, WiringLayer}, }; @@ -43,18 +43,18 @@ impl WiringLayer for PrometheusExporterLayer { prometheus_health_updater, }); - node.add_task(task); + node.add_unconstrained_task(task); Ok(()) } } #[async_trait::async_trait] -impl Task for PrometheusExporterTask { +impl UnconstrainedTask for PrometheusExporterTask { fn id(&self) -> TaskId { "prometheus_exporter".into() } - async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn run_unconstrained(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { let prometheus_task = self.config.run(stop_receiver.0); self.prometheus_health_updater .update(HealthStatus::Ready.into()); diff --git a/core/node/node_framework/src/implementations/layers/pruning.rs b/core/node/node_framework/src/implementations/layers/pruning.rs new file mode 100644 index 000000000000..3ad52606083b --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/pruning.rs @@ -0,0 +1,75 @@ +use std::time::Duration; + +use zksync_node_db_pruner::{DbPruner, DbPrunerConfig}; + +use crate::{ + implementations::resources::{ + healthcheck::AppHealthCheckResource, + pools::{MasterPool, PoolResource}, + }, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct PruningLayer { + pruning_removal_delay: Duration, + pruning_chunk_size: u32, + minimum_l1_batch_age: Duration, +} + +impl PruningLayer { + pub fn new( + pruning_removal_delay: Duration, + pruning_chunk_size: u32, + minimum_l1_batch_age: Duration, + ) -> Self { + Self { + pruning_removal_delay, + pruning_chunk_size, + minimum_l1_batch_age, + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for PruningLayer { + fn layer_name(&self) -> &'static str { + "pruning_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let pool_resource 
= context.get_resource::>().await?; + let main_pool = pool_resource.get().await?; + + let db_pruner = DbPruner::new( + DbPrunerConfig { + removal_delay: self.pruning_removal_delay, + pruned_batch_chunk_size: self.pruning_chunk_size, + minimum_l1_batch_age: self.minimum_l1_batch_age, + }, + main_pool, + ); + + let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; + app_health + .insert_component(db_pruner.health_check()) + .map_err(WiringError::internal)?; + + context.add_task(Box::new(db_pruner)); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Task for DbPruner { + fn id(&self) -> TaskId { + "db_pruner".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs index 64454b63998b..eee63e6763b1 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs @@ -1,6 +1,7 @@ use std::time::Duration; use anyhow::Context; +use zksync_dal::{ConnectionPool, Core}; use zksync_reorg_detector::{self, ReorgDetector}; use crate::{ @@ -36,6 +37,7 @@ impl WiringLayer for ReorgDetectorCheckerLayer { // Create and insert precondition. context.add_precondition(Box::new(CheckerPrecondition { + pool: pool.clone(), reorg_detector: ReorgDetector::new(main_node_client, pool), })); @@ -44,6 +46,7 @@ impl WiringLayer for ReorgDetectorCheckerLayer { } pub struct CheckerPrecondition { + pool: ConnectionPool, reorg_detector: ReorgDetector, } @@ -53,7 +56,21 @@ impl Precondition for CheckerPrecondition { "reorg_detector_checker".into() } - async fn check(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn check(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + // Given that this is a precondition -- i.e. something that starts before some invariants are met, + // we need to first ensure that there is at least one batch in the database (there may be none if + // either genesis or snapshot recovery has not been performed yet). + let earliest_batch = zksync_dal::helpers::wait_for_l1_batch( + &self.pool, + REORG_DETECTED_SLEEP_INTERVAL, + &mut stop_receiver.0, + ) + .await?; + if earliest_batch.is_none() { + // Stop signal received. 
+ return Ok(()); + } + loop { match self.reorg_detector.run_once(stop_receiver.0.clone()).await { Ok(()) => return Ok(()), diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs b/core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs new file mode 100644 index 000000000000..1ec80fef4272 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/state_keeper/external_io.rs @@ -0,0 +1,68 @@ +use std::sync::Arc; + +use anyhow::Context as _; +use zksync_node_sync::{ActionQueue, ExternalIO, SyncState}; +use zksync_state_keeper::seal_criteria::NoopSealer; +use zksync_types::L2ChainId; + +use crate::{ + implementations::resources::{ + action_queue::ActionQueueSenderResource, + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + state_keeper::{ConditionalSealerResource, StateKeeperIOResource}, + sync_state::SyncStateResource, + }, + resource::Unique, + service::ServiceContext, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct ExternalIOLayer { + chain_id: L2ChainId, +} + +impl ExternalIOLayer { + pub fn new(chain_id: L2ChainId) -> Self { + Self { chain_id } + } +} + +#[async_trait::async_trait] +impl WiringLayer for ExternalIOLayer { + fn layer_name(&self) -> &'static str { + "external_io_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + // Fetch required resources. + let master_pool = context.get_resource::>().await?; + let MainNodeClientResource(main_node_client) = context.get_resource().await?; + + // Create `SyncState` resource. + let sync_state = SyncState::default(); + context.insert_resource(SyncStateResource(sync_state))?; + + // Create `ActionQueueSender` resource. + let (action_queue_sender, action_queue) = ActionQueue::new(); + context.insert_resource(ActionQueueSenderResource(Unique::new(action_queue_sender)))?; + + // Create external IO resource. + let io_pool = master_pool.get().await.context("Get master pool")?; + let io = ExternalIO::new( + io_pool, + action_queue, + Box::new(main_node_client.for_component("external_io")), + self.chain_id, + ) + .await + .context("Failed initializing I/O for external node state keeper")?; + context.insert_resource(StateKeeperIOResource(Unique::new(Box::new(io))))?; + + // Create sealer. 
+ context.insert_resource(ConditionalSealerResource(Arc::new(NoopSealer)))?; + + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs index 2fb35fb201ab..82e6e52274aa 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs @@ -1,4 +1,3 @@ -use zksync_config::configs::chain::StateKeeperConfig; use zksync_state_keeper::MainBatchExecutor; use crate::{ @@ -10,13 +9,15 @@ use crate::{ #[derive(Debug)] pub struct MainBatchExecutorLayer { - state_keeper_config: StateKeeperConfig, + save_call_traces: bool, + optional_bytecode_compression: bool, } impl MainBatchExecutorLayer { - pub fn new(state_keeper_config: StateKeeperConfig) -> Self { + pub fn new(save_call_traces: bool, optional_bytecode_compression: bool) -> Self { Self { - state_keeper_config, + save_call_traces, + optional_bytecode_compression, } } } @@ -28,7 +29,8 @@ impl WiringLayer for MainBatchExecutorLayer { } async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { - let builder = MainBatchExecutor::new(self.state_keeper_config.save_call_traces, false); + let builder = + MainBatchExecutor::new(self.save_call_traces, self.optional_bytecode_compression); context.insert_resource(BatchExecutorResource(Unique::new(Box::new(builder))))?; Ok(()) diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs index 65e86bef5204..1a913fd990bf 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/mempool_io.rs @@ -1,24 +1,18 @@ use std::sync::Arc; use anyhow::Context as _; -use zksync_config::{ - configs::{ - chain::{MempoolConfig, StateKeeperConfig}, - wallets, - }, - ContractsConfig, -}; -use zksync_state_keeper::{ - io::seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, MempoolFetcher, MempoolGuard, - MempoolIO, OutputHandler, SequencerSealer, StateKeeperPersistence, TreeWritesPersistence, +use zksync_config::configs::{ + chain::{MempoolConfig, StateKeeperConfig}, + wallets, }; +use zksync_state_keeper::{MempoolFetcher, MempoolGuard, MempoolIO, SequencerSealer}; use zksync_types::L2ChainId; use crate::{ implementations::resources::{ fee_input::FeeInputResource, pools::{MasterPool, PoolResource}, - state_keeper::{ConditionalSealerResource, OutputHandlerResource, StateKeeperIOResource}, + state_keeper::{ConditionalSealerResource, StateKeeperIOResource}, }, resource::Unique, service::{ServiceContext, StopReceiver}, @@ -29,7 +23,6 @@ use crate::{ #[derive(Debug)] pub struct MempoolIOLayer { zksync_network_id: L2ChainId, - contracts_config: ContractsConfig, state_keeper_config: StateKeeperConfig, mempool_config: MempoolConfig, wallets: wallets::StateKeeper, @@ -38,14 +31,12 @@ pub struct MempoolIOLayer { impl MempoolIOLayer { pub fn new( zksync_network_id: L2ChainId, - contracts_config: ContractsConfig, state_keeper_config: StateKeeperConfig, mempool_config: MempoolConfig, wallets: wallets::StateKeeper, ) -> Self { Self { zksync_network_id, - contracts_config, state_keeper_config, mempool_config, wallets, @@ -81,23 +72,6 @@ impl WiringLayer for MempoolIOLayer { let batch_fee_input_provider = 
context.get_resource::().await?.0; let master_pool = context.get_resource::>().await?; - // Create L2 block sealer task and output handler. - // L2 Block sealing process is parallelized, so we have to provide enough pooled connections. - let persistence_pool = master_pool - .get_custom(L2BlockSealProcess::subtasks_len()) - .await - .context("Get master pool")?; - let (persistence, l2_block_sealer) = StateKeeperPersistence::new( - persistence_pool.clone(), - self.contracts_config.l2_shared_bridge_addr.unwrap(), - self.state_keeper_config.l2_block_seal_queue_capacity, - ); - let tree_writes_persistence = TreeWritesPersistence::new(persistence_pool); - let output_handler = OutputHandler::new(Box::new(persistence)) - .with_handler(Box::new(tree_writes_persistence)); - context.insert_resource(OutputHandlerResource(Unique::new(output_handler)))?; - context.add_task(Box::new(L2BlockSealerTask(l2_block_sealer))); - // Create mempool fetcher task. let mempool_guard = self.build_mempool_guard(&master_pool).await?; let mempool_fetcher_pool = master_pool @@ -137,21 +111,6 @@ impl WiringLayer for MempoolIOLayer { } } -#[derive(Debug)] -struct L2BlockSealerTask(zksync_state_keeper::L2BlockSealerTask); - -#[async_trait::async_trait] -impl Task for L2BlockSealerTask { - fn id(&self) -> TaskId { - "state_keeper/l2_block_sealer".into() - } - - async fn run(self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { - // Miniblock sealer will exit itself once sender is dropped. - self.0.run().await - } -} - #[derive(Debug)] struct MempoolFetcherTask(MempoolFetcher); diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs index edbe1d6e12f7..97364f6388cd 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs @@ -1,16 +1,20 @@ use std::sync::Arc; use anyhow::Context; -use zksync_config::DBConfig; -use zksync_state::{AsyncCatchupTask, ReadStorageFactory, RocksdbStorageOptions}; +use zksync_state::{AsyncCatchupTask, ReadStorageFactory}; use zksync_state_keeper::{ seal_criteria::ConditionalSealer, AsyncRocksdbCache, BatchExecutor, OutputHandler, StateKeeperIO, ZkSyncStateKeeper, }; use zksync_storage::RocksDB; +pub mod external_io; pub mod main_batch_executor; pub mod mempool_io; +pub mod output_handler; + +// Public re-export to not require the user to directly depend on `zksync_state`. 
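+//
+// A caller can then build the layer without importing `zksync_state` directly, e.g.
+// (illustrative sketch; the field values are placeholders):
+//
+// let options = RocksdbStorageOptions { block_cache_capacity: 128 << 20, max_open_files: None };
+// let layer = StateKeeperLayer::new("./db/state_keeper".to_string(), options);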
+pub use zksync_state::RocksdbStorageOptions; use crate::{ implementations::resources::{ @@ -32,12 +36,16 @@ use crate::{ /// #[derive(Debug)] pub struct StateKeeperLayer { - db_config: DBConfig, + state_keeper_db_path: String, + rocksdb_options: RocksdbStorageOptions, } impl StateKeeperLayer { - pub fn new(db_config: DBConfig) -> Self { - Self { db_config } + pub fn new(state_keeper_db_path: String, rocksdb_options: RocksdbStorageOptions) -> Self { + Self { + state_keeper_db_path, + rocksdb_options, + } } } @@ -69,17 +77,10 @@ impl WiringLayer for StateKeeperLayer { let sealer = context.get_resource::().await?.0; let master_pool = context.get_resource::>().await?; - let cache_options = RocksdbStorageOptions { - block_cache_capacity: self - .db_config - .experimental - .state_keeper_db_block_cache_capacity(), - max_open_files: self.db_config.experimental.state_keeper_db_max_open_files, - }; let (storage_factory, task) = AsyncRocksdbCache::new( master_pool.get_custom(2).await?, - self.db_config.state_keeper_db_path, - cache_options, + self.state_keeper_db_path, + self.rocksdb_options, ); context.add_task(Box::new(RocksdbCatchupTask(task))); diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/output_handler.rs b/core/node/node_framework/src/implementations/layers/state_keeper/output_handler.rs new file mode 100644 index 000000000000..d0e94f637e08 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/state_keeper/output_handler.rs @@ -0,0 +1,121 @@ +use anyhow::Context as _; +use zksync_state_keeper::{ + io::seal_logic::l2_block_seal_subtasks::L2BlockSealProcess, OutputHandler, + StateKeeperPersistence, TreeWritesPersistence, +}; +use zksync_types::Address; + +use crate::{ + implementations::resources::{ + pools::{MasterPool, PoolResource}, + state_keeper::OutputHandlerResource, + sync_state::SyncStateResource, + }, + resource::Unique, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct OutputHandlerLayer { + l2_shared_bridge_addr: Address, + l2_block_seal_queue_capacity: usize, + /// Whether transactions should be pre-inserted to DB. + /// Should be set to `true` for EN's IO as EN doesn't store transactions in DB + /// before they are included into L2 blocks. + pre_insert_txs: bool, + /// Whether protective reads persistence is enabled. + /// Must be `true` for any node that maintains a full Merkle Tree (e.g. any instance of main node). + /// May be set to `false` for nodes that do not participate in the sequencing process (e.g. external nodes). 
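+ /// Defaults to `true` in `OutputHandlerLayer::new`; override it via
+ /// `with_protective_reads_persistence_enabled` for nodes that do not need protective reads.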
+ protective_reads_persistence_enabled: bool, +} + +impl OutputHandlerLayer { + pub fn new(l2_shared_bridge_addr: Address, l2_block_seal_queue_capacity: usize) -> Self { + Self { + l2_shared_bridge_addr, + l2_block_seal_queue_capacity, + pre_insert_txs: false, + protective_reads_persistence_enabled: true, + } + } + + pub fn with_pre_insert_txs(mut self, pre_insert_txs: bool) -> Self { + self.pre_insert_txs = pre_insert_txs; + self + } + + pub fn with_protective_reads_persistence_enabled( + mut self, + protective_reads_persistence_enabled: bool, + ) -> Self { + self.protective_reads_persistence_enabled = protective_reads_persistence_enabled; + self + } +} + +#[async_trait::async_trait] +impl WiringLayer for OutputHandlerLayer { + fn layer_name(&self) -> &'static str { + "state_keeper_output_handler_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + // Fetch required resources. + let master_pool = context.get_resource::>().await?; + // Use `SyncState` if provided. + let sync_state = match context.get_resource::().await { + Ok(sync_state) => Some(sync_state.0), + Err(WiringError::ResourceLacking { .. }) => None, + Err(err) => return Err(err), + }; + + // Create L2 block sealer task and output handler. + // L2 Block sealing process is parallelized, so we have to provide enough pooled connections. + let persistence_pool = master_pool + .get_custom(L2BlockSealProcess::subtasks_len()) + .await + .context("Get master pool")?; + let (mut persistence, l2_block_sealer) = StateKeeperPersistence::new( + persistence_pool.clone(), + self.l2_shared_bridge_addr, + self.l2_block_seal_queue_capacity, + ); + if self.pre_insert_txs { + persistence = persistence.with_tx_insertion(); + } + if !self.protective_reads_persistence_enabled { + // **Important:** Disabling protective reads persistence is only sound if the node will never + // run a full Merkle tree. + tracing::warn!("Disabling persisting protective reads; this should be safe, but is considered an experimental option at the moment"); + persistence = persistence.without_protective_reads(); + } + + let tree_writes_persistence = TreeWritesPersistence::new(persistence_pool); + let mut output_handler = OutputHandler::new(Box::new(persistence)) + .with_handler(Box::new(tree_writes_persistence)); + if let Some(sync_state) = sync_state { + output_handler = output_handler.with_handler(Box::new(sync_state)); + } + context.insert_resource(OutputHandlerResource(Unique::new(output_handler)))?; + context.add_task(Box::new(L2BlockSealerTask(l2_block_sealer))); + + Ok(()) + } +} + +#[derive(Debug)] +struct L2BlockSealerTask(zksync_state_keeper::L2BlockSealerTask); + +#[async_trait::async_trait] +impl Task for L2BlockSealerTask { + fn id(&self) -> TaskId { + "state_keeper/l2_block_sealer".into() + } + + async fn run(self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { + // Miniblock sealer will exit itself once sender is dropped. 
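+ // (The sending half of the seal queue is owned by `StateKeeperPersistence`, so the sealer
+ // drains its remaining commands and stops once the state keeper's output handler is dropped.)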
+ self.0.run().await + } +} diff --git a/core/node/node_framework/src/implementations/layers/sync_state_updater.rs b/core/node/node_framework/src/implementations/layers/sync_state_updater.rs new file mode 100644 index 000000000000..fcbe51f581e1 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/sync_state_updater.rs @@ -0,0 +1,75 @@ +use zksync_dal::{ConnectionPool, Core}; +use zksync_node_sync::SyncState; +use zksync_web3_decl::client::{DynClient, L2}; + +use crate::{ + implementations::resources::{ + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + sync_state::SyncStateResource, + }, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +/// Runs the dynamic sync state updater for `SyncState` if no `SyncState` was provided before. +/// This layer may be used as a fallback for EN API if API server runs without the core component. +#[derive(Debug)] +pub struct SyncStateUpdaterLayer; + +#[async_trait::async_trait] +impl WiringLayer for SyncStateUpdaterLayer { + fn layer_name(&self) -> &'static str { + "sync_state_updater_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + if context.get_resource::().await.is_ok() { + // `SyncState` was provided by some other layer -- we assume that the layer that added this resource + // will be responsible for its maintenance. + tracing::info!( + "SyncState was provided by another layer, skipping SyncStateUpdaterLayer" + ); + return Ok(()); + } + + let pool = context.get_resource::>().await?; + let MainNodeClientResource(main_node_client) = context.get_resource().await?; + + let sync_state = SyncState::default(); + + // Insert resource. + context.insert_resource(SyncStateResource(sync_state.clone()))?; + + // Insert task + context.add_task(Box::new(SyncStateUpdater { + sync_state, + connection_pool: pool.get().await?, + main_node_client, + })); + + Ok(()) + } +} + +#[derive(Debug)] +struct SyncStateUpdater { + sync_state: SyncState, + connection_pool: ConnectionPool, + main_node_client: Box>, +} + +#[async_trait::async_trait] +impl Task for SyncStateUpdater { + fn id(&self) -> TaskId { + "sync_state_updater".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.sync_state + .run_updater(self.connection_pool, self.main_node_client, stop_receiver.0) + .await?; + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/tree_data_fetcher.rs b/core/node/node_framework/src/implementations/layers/tree_data_fetcher.rs new file mode 100644 index 000000000000..c45071ce418b --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/tree_data_fetcher.rs @@ -0,0 +1,67 @@ +use zksync_node_sync::tree_data_fetcher::TreeDataFetcher; +use zksync_types::Address; + +use crate::{ + implementations::resources::{ + eth_interface::EthInterfaceResource, + healthcheck::AppHealthCheckResource, + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + }, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct TreeDataFetcherLayer { + diamond_proxy_addr: Address, +} + +impl TreeDataFetcherLayer { + pub fn new(diamond_proxy_addr: Address) -> Self { + Self { diamond_proxy_addr } + } +} + +#[async_trait::async_trait] +impl WiringLayer for TreeDataFetcherLayer { + fn layer_name(&self) -> &'static str { + "tree_data_fetcher_layer" + 
} + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let pool = context.get_resource::>().await?; + let MainNodeClientResource(client) = context.get_resource().await?; + let EthInterfaceResource(eth_client) = context.get_resource().await?; + + tracing::warn!( + "Running tree data fetcher (allows a node to operate w/o a Merkle tree or w/o waiting the tree to catch up). \ + This is an experimental feature; do not use unless you know what you're doing" + ); + let fetcher = TreeDataFetcher::new(client, pool.get().await?) + .with_l1_data(eth_client, self.diamond_proxy_addr)?; + + // Insert healthcheck + let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; + app_health + .insert_component(fetcher.health_check()) + .map_err(WiringError::internal)?; + + // Insert task + context.add_task(Box::new(fetcher)); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Task for TreeDataFetcher { + fn id(&self) -> TaskId { + "tree_data_fetcher".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/validate_chain_ids.rs b/core/node/node_framework/src/implementations/layers/validate_chain_ids.rs new file mode 100644 index 000000000000..0f04a35d484a --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/validate_chain_ids.rs @@ -0,0 +1,61 @@ +use zksync_node_sync::validate_chain_ids_task::ValidateChainIdsTask; +use zksync_types::{L1ChainId, L2ChainId}; + +use crate::{ + implementations::resources::{ + eth_interface::EthInterfaceResource, main_node_client::MainNodeClientResource, + }, + precondition::Precondition, + service::{ServiceContext, StopReceiver}, + task::TaskId, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct ValidateChainIdsLayer { + l1_chain_id: L1ChainId, + l2_chain_id: L2ChainId, +} + +impl ValidateChainIdsLayer { + pub fn new(l1_chain_id: L1ChainId, l2_chain_id: L2ChainId) -> Self { + Self { + l1_chain_id, + l2_chain_id, + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for ValidateChainIdsLayer { + fn layer_name(&self) -> &'static str { + "validate_chain_ids_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + let EthInterfaceResource(query_client) = context.get_resource().await?; + let MainNodeClientResource(main_node_client) = context.get_resource().await?; + + let task = ValidateChainIdsTask::new( + self.l1_chain_id, + self.l2_chain_id, + query_client, + main_node_client, + ); + + context.add_precondition(Box::new(task)); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Precondition for ValidateChainIdsTask { + fn id(&self) -> TaskId { + "validate_chain_ids".into() + } + + async fn check(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run_once(stop_receiver.0).await + } +} diff --git a/core/node/node_framework/src/implementations/layers/web3_api/server.rs b/core/node/node_framework/src/implementations/layers/web3_api/server.rs index c81b475c3ec4..da0d9d3cc33a 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/server.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/server.rs @@ -27,8 +27,11 @@ pub struct Web3ServerOptionalConfig { pub batch_request_size_limit: Option, pub response_body_size_limit: Option, pub websocket_requests_per_minute_limit: Option, - // used by circuit breaker. 
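+ // Enables more detailed tracing spans for API requests (passed to the API builder via
+ // `with_extended_tracing` below).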
+ pub with_extended_tracing: bool, + // Used by circuit breaker. pub replication_lag_limit: Option, + // Used by the external node. + pub pruning_info_refresh_interval: Option, } impl Web3ServerOptionalConfig { @@ -132,7 +135,8 @@ impl WiringLayer for Web3ServerLayer { ApiBuilder::jsonrpsee_backend(self.internal_api_config, replica_pool.clone()) .with_updaters_pool(updaters_pool) .with_tx_sender(tx_sender) - .with_mempool_cache(mempool_cache); + .with_mempool_cache(mempool_cache) + .with_extended_tracing(self.optional_config.with_extended_tracing); if let Some(client) = tree_api_client { api_builder = api_builder.with_tree_api(client); } @@ -147,6 +151,12 @@ impl WiringLayer for Web3ServerLayer { if let Some(sync_state) = sync_state { api_builder = api_builder.with_sync_state(sync_state); } + if let Some(pruning_info_refresh_interval) = + self.optional_config.pruning_info_refresh_interval + { + api_builder = + api_builder.with_pruning_info_refresh_interval(pruning_info_refresh_interval); + } let replication_lag_limit = self.optional_config.replication_lag_limit; api_builder = self.optional_config.apply(api_builder); let server = api_builder.build()?; diff --git a/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs b/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs index 8a717258cb46..010778315e58 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/tx_sender.rs @@ -1,14 +1,22 @@ -use std::{fmt, sync::Arc}; +use std::{fmt, sync::Arc, time::Duration}; +use tokio::sync::RwLock; use zksync_node_api_server::{ execution_sandbox::{VmConcurrencyBarrier, VmConcurrencyLimiter}, tx_sender::{ApiContracts, TxSenderBuilder, TxSenderConfig}, }; use zksync_state::PostgresStorageCaches; +use zksync_types::Address; +use zksync_web3_decl::{ + client::{DynClient, L2}, + jsonrpsee, + namespaces::EnNamespaceClient as _, +}; use crate::{ implementations::resources::{ fee_input::FeeInputResource, + main_node_client::MainNodeClientResource, pools::{PoolResource, ReplicaPool}, state_keeper::ConditionalSealerResource, web3_api::{TxSenderResource, TxSinkResource}, @@ -31,6 +39,7 @@ pub struct TxSenderLayer { postgres_storage_caches_config: PostgresStorageCachesConfig, max_vm_concurrency: usize, api_contracts: ApiContracts, + whitelisted_tokens_for_aa_cache: bool, } impl TxSenderLayer { @@ -45,8 +54,18 @@ impl TxSenderLayer { postgres_storage_caches_config, max_vm_concurrency, api_contracts, + whitelisted_tokens_for_aa_cache: false, } } + + /// Enables the task for fetching the whitelisted tokens for the AA cache from the main node. + /// Disabled by default. + /// + /// Requires `MainNodeClientResource` to be present. + pub fn with_whitelisted_tokens_for_aa_cache(mut self, value: bool) -> Self { + self.whitelisted_tokens_for_aa_cache = value; + self + } } #[async_trait::async_trait] @@ -96,6 +115,18 @@ impl WiringLayer for TxSenderLayer { if let Some(sealer) = sealer { tx_sender = tx_sender.with_sealer(sealer); } + + // Add the task for updating the whitelisted tokens for the AA cache. 
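+ // The task periodically polls the main node and refreshes the shared `Arc<RwLock<_>>`,
+ // while the tx sender only ever reads from it.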
+ if self.whitelisted_tokens_for_aa_cache { + let MainNodeClientResource(main_node_client) = context.get_resource().await?; + let whitelisted_tokens = Arc::new(RwLock::new(Default::default())); + context.add_task(Box::new(WhitelistedTokensForAaUpdateTask { + whitelisted_tokens: whitelisted_tokens.clone(), + main_node_client, + })); + tx_sender = tx_sender.with_whitelisted_tokens_for_aa(whitelisted_tokens); + } + let tx_sender = tx_sender.build( fee_input, Arc::new(vm_concurrency_limiter), @@ -153,3 +184,40 @@ impl Task for VmConcurrencyBarrierTask { Ok(()) } } + +#[derive(Debug)] +struct WhitelistedTokensForAaUpdateTask { + whitelisted_tokens: Arc>>, + main_node_client: Box>, +} + +#[async_trait::async_trait] +impl Task for WhitelistedTokensForAaUpdateTask { + fn id(&self) -> TaskId { + "whitelisted_tokens_for_aa_update_task".into() + } + + async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + while !*stop_receiver.0.borrow_and_update() { + match self.main_node_client.whitelisted_tokens_for_aa().await { + Ok(tokens) => { + *self.whitelisted_tokens.write().await = tokens; + } + Err(jsonrpsee::core::client::Error::Call(error)) + if error.code() == jsonrpsee::types::error::METHOD_NOT_FOUND_CODE => + { + // Method is not supported by the main node, do nothing. + } + Err(err) => { + tracing::error!("Failed to query `whitelisted_tokens_for_aa`, error: {err:?}"); + } + } + + // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this. + tokio::time::timeout(Duration::from_secs(30), stop_receiver.0.changed()) + .await + .ok(); + } + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/layers/web3_api/tx_sink.rs b/core/node/node_framework/src/implementations/layers/web3_api/tx_sink.rs index df4812b3c098..98ed50ba9e45 100644 --- a/core/node/node_framework/src/implementations/layers/web3_api/tx_sink.rs +++ b/core/node/node_framework/src/implementations/layers/web3_api/tx_sink.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use zksync_node_api_server::tx_sender::{master_pool_sink::MasterPoolSink, proxy::TxProxy}; +use zksync_node_api_server::tx_sender::{ + master_pool_sink::MasterPoolSink, + proxy::{AccountNonceSweeperTask, TxProxy}, +}; use crate::{ implementations::resources::{ @@ -8,7 +11,8 @@ use crate::{ pools::{MasterPool, PoolResource}, web3_api::TxSinkResource, }, - service::ServiceContext, + service::{ServiceContext, StopReceiver}, + task::{Task, TaskId}, wiring_layer::{WiringError, WiringLayer}, }; @@ -37,10 +41,31 @@ impl WiringLayer for TxSinkLayer { } TxSinkLayer::ProxySink => { let MainNodeClientResource(client) = context.get_resource().await?; - TxSinkResource(Arc::new(TxProxy::new(client))) + let proxy = TxProxy::new(client); + + let pool = context + .get_resource::>() + .await? 
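+ // A single connection suffices for the nonce sweeper, hence the singleton pool.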
+ .get_singleton() + .await?; + let task = proxy.account_nonce_sweeper_task(pool); + context.add_task(Box::new(task)); + + TxSinkResource(Arc::new(proxy)) } }; context.insert_resource(tx_sink)?; Ok(()) } } + +#[async_trait::async_trait] +impl Task for AccountNonceSweeperTask { + fn id(&self) -> TaskId { + "account_nonce_sweeper_task".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await + } +} diff --git a/core/node/node_sync/src/validate_chain_ids_task.rs b/core/node/node_sync/src/validate_chain_ids_task.rs index 5a75cb384aec..1414b5ab6014 100644 --- a/core/node/node_sync/src/validate_chain_ids_task.rs +++ b/core/node/node_sync/src/validate_chain_ids_task.rs @@ -138,6 +138,23 @@ impl ValidateChainIdsTask { } } + /// Runs the task once, exiting either when all the checks are performed or when the stop signal is received. + pub async fn run_once(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let eth_client_check = Self::check_eth_client(self.eth_client, self.l1_chain_id); + let main_node_l1_check = + Self::check_l1_chain_using_main_node(self.main_node_client.clone(), self.l1_chain_id); + let main_node_l2_check = + Self::check_l2_chain_using_main_node(self.main_node_client, self.l2_chain_id); + let joined_futures = + futures::future::try_join3(eth_client_check, main_node_l1_check, main_node_l2_check) + .fuse(); + tokio::select! { + res = joined_futures => res.map(drop), + _ = stop_receiver.changed() => Ok(()), + } + } + + /// Runs the task until the stop signal is received. pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { // Since check futures are fused, they are safe to poll after getting resolved; they will never resolve again, // so we'll just wait for another check or a stop signal. diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/node/state_keeper/src/batch_executor/main_executor.rs index a16b9920dd6e..f3f947d0d1e6 100644 --- a/core/node/state_keeper/src/batch_executor/main_executor.rs +++ b/core/node/state_keeper/src/batch_executor/main_executor.rs @@ -32,6 +32,12 @@ use crate::{ #[derive(Debug, Clone)] pub struct MainBatchExecutor { save_call_traces: bool, + /// Whether batch executor would allow transactions with bytecode that cannot be compressed. + /// For new blocks, bytecode compression is mandatory -- if bytecode compression is not supported, + /// the transaction will be rejected. + /// Note that this flag, if set to `true`, is strictly more permissive than if set to `false`. It means + /// that in cases where the node is expected to process any transactions processed by the sequencer + /// regardless of its configuration, this flag should be set to `true`. optional_bytecode_compression: bool, } @@ -218,6 +224,8 @@ impl CommandReceiver { result } + /// Attempts to execute transaction with or without bytecode compression. + /// If compression fails, the transaction will be re-executed without compression. fn execute_tx_in_vm_with_optional_compression( &self, tx: &Transaction, @@ -283,10 +291,8 @@ impl CommandReceiver { (result.1, compressed_bytecodes, trace) } - // Err when transaction is rejected. - // `Ok(TxExecutionStatus::Success)` when the transaction succeeded - // `Ok(TxExecutionStatus::Failure)` when the transaction failed. - // Note that failed transactions are considered properly processed and are included in blocks + /// Attempts to execute transaction with mandatory bytecode compression. 
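+ /// This is the path taken when `optional_bytecode_compression` is disabled.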
+ /// If bytecode compression fails, the transaction will be rejected. fn execute_tx_in_vm( &self, tx: &Transaction, diff --git a/etc/env/configs/ext-node.toml b/etc/env/configs/ext-node.toml index eb07aa387542..145b1455ab93 100644 --- a/etc/env/configs/ext-node.toml +++ b/etc/env/configs/ext-node.toml @@ -55,6 +55,8 @@ url = "http://127.0.0.1:3050" # Here we use TOML multiline strings: newlines will be trimmed. log = """\ warn,\ +zksync_node_framework=info,\ +zksync_node_consensus=info,\ zksync_consensus_bft=info,\ zksync_consensus_network=info,\ zksync_consensus_storage=info,\