diff --git a/Cargo.lock b/Cargo.lock index a5093d36a7cf..750f64f794af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9032,6 +9032,7 @@ dependencies = [ "zksync_node_db_pruner", "zksync_node_fee_model", "zksync_node_framework_derive", + "zksync_node_storage_init", "zksync_node_sync", "zksync_object_store", "zksync_proof_data_handler", @@ -9080,6 +9081,28 @@ dependencies = [ "zksync_utils", ] +[[package]] +name = "zksync_node_storage_init" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "tokio", + "tracing", + "zksync_block_reverter", + "zksync_config", + "zksync_dal", + "zksync_health_check", + "zksync_node_genesis", + "zksync_node_sync", + "zksync_object_store", + "zksync_reorg_detector", + "zksync_shared_metrics", + "zksync_snapshots_applier", + "zksync_types", + "zksync_web3_decl", +] + [[package]] name = "zksync_node_sync" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 2095ce536d87..34e5cb6141c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "core/node/consistency_checker", "core/node/metadata_calculator", "core/node/node_sync", + "core/node/node_storage_init", "core/node/consensus", "core/node/contract_verification_server", "core/node/api_server", @@ -277,6 +278,7 @@ zksync_reorg_detector = { path = "core/node/reorg_detector" } zksync_consistency_checker = { path = "core/node/consistency_checker" } zksync_metadata_calculator = { path = "core/node/metadata_calculator" } zksync_node_sync = { path = "core/node/node_sync" } +zksync_node_storage_init = { path = "core/node/node_storage_init" } zksync_node_consensus = { path = "core/node/consensus" } zksync_contract_verification_server = { path = "core/node/contract_verification_server" } zksync_node_api_server = { path = "core/node/api_server" } diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs index 28f9aa2c422b..a56e51953899 100644 --- a/core/bin/external_node/src/init.rs +++ b/core/bin/external_node/src/init.rs @@ -3,6 +3,7 
@@ use std::time::Instant; use anyhow::Context as _; +use tokio::sync::watch; use zksync_config::ObjectStoreConfig; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_health_check::AppHealthCheck; @@ -30,6 +31,7 @@ enum InitDecision { } pub(crate) async fn ensure_storage_initialized( + stop_receiver: watch::Receiver, pool: ConnectionPool, main_node_client: Box>, app_health: &AppHealthCheck, @@ -120,7 +122,7 @@ pub(crate) async fn ensure_storage_initialized( let recovery_started_at = Instant::now(); let stats = snapshots_applier_task - .run() + .run(stop_receiver) .await .context("snapshot recovery failed")?; if stats.done_work { @@ -129,6 +131,10 @@ pub(crate) async fn ensure_storage_initialized( .set(latency); tracing::info!("Recovered Postgres from snapshot in {latency:?}"); } + assert!( + !stats.canceled, + "Snapshot recovery task cannot be canceled in the current implementation" + ); } } Ok(()) diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index e3ee987a6e62..75c3a7b8861b 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -976,7 +976,10 @@ async fn run_node( .snapshots_recovery_drop_storage_key_preimages, object_store_config: config.optional.snapshots_recovery_object_store.clone(), }); + // Note: while the stop receiver is passed here, it won't be respected, since we wait for this task + // to complete. Will be fixed after migration to the node framework.
ensure_storage_initialized( + stop_receiver.clone(), connection_pool.clone(), main_node_client.clone(), &app_health, diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index e58ece5fdf66..43325be7441b 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -23,11 +23,16 @@ use zksync_node_framework::{ main_node_client::MainNodeClientLayer, main_node_fee_params_fetcher::MainNodeFeeParamsFetcherLayer, metadata_calculator::MetadataCalculatorLayer, + node_storage_init::{ + external_node_strategy::{ExternalNodeInitStrategyLayer, SnapshotRecoveryConfig}, + NodeStorageInitializerLayer, + }, pools_layer::PoolsLayerBuilder, postgres_metrics::PostgresMetricsLayer, prometheus_exporter::PrometheusExporterLayer, pruning::PruningLayer, query_eth_client::QueryEthClientLayer, + reorg_detector::ReorgDetectorLayer, sigint::SigintHandlerLayer, state_keeper::{ external_io::ExternalIOLayer, main_batch_executor::MainBatchExecutorLayer, @@ -421,6 +426,49 @@ impl ExternalNodeBuilder { Ok(self) } + fn add_reorg_detector_layer(mut self) -> anyhow::Result { + self.node.add_layer(ReorgDetectorLayer); + Ok(self) + } + + /// This layer will make sure that the database is initialized correctly, + /// e.g.: + /// - genesis or snapshot recovery will be performed if it's required. + /// - we perform the storage rollback if required (e.g. if reorg is detected). + /// + /// Depending on the `kind` provided, either a task or a precondition will be added. + /// + /// *Important*: the task should be added by at most one component, because + /// it assumes unique control over the database. Multiple components adding this + /// layer in a distributed mode may result in the database corruption. + /// + /// This task works in pair with precondition, which must be present in every component: + /// the precondition will prevent node from starting until the database is initialized. 
+ fn add_storage_initialization_layer(mut self, kind: LayerKind) -> anyhow::Result { + let config = &self.config; + let snapshot_recovery_config = + config + .optional + .snapshots_recovery_enabled + .then_some(SnapshotRecoveryConfig { + snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch, + drop_storage_key_preimages: config + .experimental + .snapshots_recovery_drop_storage_key_preimages, + object_store_config: config.optional.snapshots_recovery_object_store.clone(), + }); + self.node.add_layer(ExternalNodeInitStrategyLayer { + l2_chain_id: self.config.required.l2_chain_id, + snapshot_recovery_config, + }); + let mut layer = NodeStorageInitializerLayer::new(); + if matches!(kind, LayerKind::Precondition) { + layer = layer.as_precondition(); + } + self.node.add_layer(layer); + Ok(self) + } + pub fn build(mut self, mut components: Vec) -> anyhow::Result { // Add "base" layers self = self @@ -429,12 +477,14 @@ impl ExternalNodeBuilder { .add_prometheus_exporter_layer()? .add_pools_layer()? .add_main_node_client_layer()? - .add_query_eth_client_layer()?; + .add_query_eth_client_layer()? + .add_reorg_detector_layer()?; // Add preconditions for all the components. self = self .add_l1_batch_commitment_mode_validation_layer()? - .add_validate_chain_ids_layer()?; + .add_validate_chain_ids_layer()? + .add_storage_initialization_layer(LayerKind::Precondition)?; // Sort the components, so that the components they may depend on each other are added in the correct order. components.sort_unstable_by_key(|component| match component { @@ -499,6 +549,10 @@ impl ExternalNodeBuilder { .add_consistency_checker_layer()? .add_commitment_generator_layer()? .add_batch_status_updater_layer()?; + + // We assign the storage initialization to the core, as it's considered to be + // the "main" component. + self = self.add_storage_initialization_layer(LayerKind::Task)?; } } } @@ -506,3 +560,10 @@ impl ExternalNodeBuilder { Ok(self.node.build()?) 
} } + +/// Marker for layers that can add either a task or a precondition. +#[derive(Debug)] +enum LayerKind { + Task, + Precondition, +} diff --git a/core/bin/zksync_server/src/main.rs b/core/bin/zksync_server/src/main.rs index 654d4b772006..4612a737bacc 100644 --- a/core/bin/zksync_server/src/main.rs +++ b/core/bin/zksync_server/src/main.rs @@ -21,12 +21,10 @@ use zksync_config::{ SnapshotsCreatorConfig, }; use zksync_core_leftovers::{ - genesis_init, is_genesis_needed, temp_config_store::{decode_yaml_repr, TempConfigStore}, Component, Components, }; use zksync_env_config::FromEnv; -use zksync_eth_client::clients::Client; use crate::node_builder::MainNodeBuilder; @@ -42,9 +40,6 @@ struct Cli { /// Generate genesis block for the first contract deployment using temporary DB. #[arg(long)] genesis: bool, - /// Rebuild tree. - #[arg(long)] - rebuild_tree: bool, /// Comma-separated list of components to launch. #[arg( long, @@ -180,18 +175,6 @@ fn main() -> anyhow::Result<()> { } }; - run_genesis_if_needed(opt.genesis, &genesis, &contracts_config, &secrets)?; - if opt.genesis { - // If genesis is requested, we don't need to run the node. 
- return Ok(()); - } - - let components = if opt.rebuild_tree { - vec![Component::Tree] - } else { - opt.components.0 - }; - let node = MainNodeBuilder::new( configs, wallets, @@ -199,46 +182,16 @@ fn main() -> anyhow::Result<()> { contracts_config, secrets, consensus, - ) - .build(components)?; - node.run()?; - Ok(()) -} + ); -fn run_genesis_if_needed( - force_genesis: bool, - genesis: &GenesisConfig, - contracts_config: &ContractsConfig, - secrets: &Secrets, -) -> anyhow::Result<()> { - let tokio_runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build()?; - tokio_runtime.block_on(async move { - let database_secrets = secrets.database.clone().context("DatabaseSecrets")?; - if force_genesis || is_genesis_needed(&database_secrets).await { - genesis_init(genesis.clone(), &database_secrets) - .await - .context("genesis_init")?; + if opt.genesis { + // If genesis is requested, we don't need to run the node. + node.only_genesis()?.run()?; + return Ok(()); + } - if let Some(ecosystem_contracts) = &contracts_config.ecosystem_contracts { - let l1_secrets = secrets.l1.as_ref().context("l1_screts")?; - let query_client = Client::http(l1_secrets.l1_rpc_url.clone()) - .context("Ethereum client")? 
- .for_network(genesis.l1_chain_id.into()) - .build(); - zksync_node_genesis::save_set_chain_id_tx( - &query_client, - contracts_config.diamond_proxy_addr, - ecosystem_contracts.state_transition_proxy_addr, - &database_secrets, - ) - .await - .context("Failed to save SetChainId upgrade transaction")?; - } - } - Ok(()) - }) + node.build(opt.components.0)?.run()?; + Ok(()) } fn load_env_config() -> anyhow::Result { diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index 3f8995d2efdc..46cafe227f9a 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -35,6 +35,9 @@ use zksync_node_framework::{ l1_batch_commitment_mode_validation::L1BatchCommitmentModeValidationLayer, l1_gas::SequencerL1GasLayer, metadata_calculator::MetadataCalculatorLayer, + node_storage_init::{ + main_node_strategy::MainNodeInitStrategyLayer, NodeStorageInitializerLayer, + }, object_store::ObjectStoreLayer, pk_signing_eth_client::PKSigningEthClientLayer, pools_layer::PoolsLayerBuilder, @@ -532,6 +535,41 @@ impl MainNodeBuilder { Ok(self) } + /// This layer will make sure that the database is initialized correctly, + /// e.g. genesis will be performed if it's required. + /// + /// Depending on the `kind` provided, either a task or a precondition will be added. + /// + /// *Important*: the task should be added by at most one component, because + /// it assumes unique control over the database. Multiple components adding this + /// layer in a distributed mode may result in the database corruption. + /// + /// This task works in pair with precondition, which must be present in every component: + /// the precondition will prevent node from starting until the database is initialized. 
+ fn add_storage_initialization_layer(mut self, kind: LayerKind) -> anyhow::Result { + self.node.add_layer(MainNodeInitStrategyLayer { + genesis: self.genesis_config.clone(), + contracts: self.contracts_config.clone(), + }); + let mut layer = NodeStorageInitializerLayer::new(); + if matches!(kind, LayerKind::Precondition) { + layer = layer.as_precondition(); + } + self.node.add_layer(layer); + Ok(self) + } + + /// Builds the node with the genesis initialization task only. + pub fn only_genesis(mut self) -> anyhow::Result { + self = self + .add_pools_layer()? + .add_query_eth_client_layer()? + .add_storage_initialization_layer(LayerKind::Task)?; + + Ok(self.node.build()?) + } + + /// Builds the node with the specified components. pub fn build(mut self, mut components: Vec) -> anyhow::Result { // Add "base" layers (resources and helper tasks). self = self @@ -542,8 +580,12 @@ impl MainNodeBuilder { .add_healthcheck_layer()? .add_prometheus_exporter_layer()? .add_query_eth_client_layer()? - .add_sequencer_l1_gas_layer()? - .add_l1_batch_commitment_mode_validation_layer()?; + .add_sequencer_l1_gas_layer()?; + + // Add preconditions for all the components. + self = self + .add_l1_batch_commitment_mode_validation_layer()? + .add_storage_initialization_layer(LayerKind::Precondition)?; // Sort the components, so that the components they may depend on each other are added in the correct order. components.sort_unstable_by_key(|component| match component { @@ -557,6 +599,13 @@ impl MainNodeBuilder { // Note that the layers are added only once, so it's fine to add the same layer multiple times. for component in &components { match component { + Component::StateKeeper => { + // State keeper is the core component of the sequencer, + // which is why we consider it to be responsible for the storage initialization. + self = self + .add_storage_initialization_layer(LayerKind::Task)? + .add_state_keeper_layer()?; + } Component::HttpApi => { self = self .add_tx_sender_layer()? 
@@ -596,9 +645,6 @@ impl MainNodeBuilder { Component::EthTxManager => { self = self.add_eth_tx_manager_layer()?; } - Component::StateKeeper => { - self = self.add_state_keeper_layer()?; - } Component::TeeVerifierInputProducer => { self = self.add_tee_verifier_input_producer_layer()?; } @@ -633,3 +679,10 @@ impl MainNodeBuilder { Ok(self.node.build()?) } } + +/// Marker for layers that can add either a task or a precondition. +#[derive(Debug)] +enum LayerKind { + Task, + Precondition, +} diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs index 0ee4b2a901f6..d2231f730b17 100644 --- a/core/lib/snapshots_applier/src/lib.rs +++ b/core/lib/snapshots_applier/src/lib.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::Context as _; use async_trait::async_trait; use serde::Serialize; -use tokio::sync::Semaphore; +use tokio::sync::{watch, Semaphore}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError, SqlxError}; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_object_store::{ObjectStore, ObjectStoreError}; @@ -76,6 +76,8 @@ enum SnapshotsApplierError { Fatal(#[from] anyhow::Error), #[error(transparent)] Retryable(anyhow::Error), + #[error("Snapshot recovery has been canceled")] + Canceled, } impl SnapshotsApplierError { @@ -245,6 +247,8 @@ impl SnapshotsApplierConfig { pub struct SnapshotApplierTaskStats { /// Did the task do any work? pub done_work: bool, + /// Was the task canceled? 
+ pub canceled: bool, } #[derive(Debug)] @@ -339,13 +343,23 @@ impl SnapshotsApplierTask { /// or under any of the following conditions: /// /// - There are no snapshots on the main node - pub async fn run(self) -> anyhow::Result { + pub async fn run( + self, + mut stop_receiver: watch::Receiver, + ) -> anyhow::Result { tracing::info!("Starting snapshot recovery with config: {:?}", self.config); let mut backoff = self.config.initial_retry_backoff; let mut last_error = None; for retry_id in 0..self.config.retry_count { - let result = SnapshotsApplier::load_snapshot(&self).await; + if *stop_receiver.borrow() { + return Ok(SnapshotApplierTaskStats { + done_work: false, // Not really relevant, since the node will be shut down. + canceled: true, + }); + } + + let result = SnapshotsApplier::load_snapshot(&self, &mut stop_receiver).await; match result { Ok((strategy, final_status)) => { @@ -357,6 +371,7 @@ impl SnapshotsApplierTask { self.health_updater.freeze(); return Ok(SnapshotApplierTaskStats { done_work: !matches!(strategy, SnapshotRecoveryStrategy::Completed), + canceled: false, }); } Err(SnapshotsApplierError::Fatal(err)) => { @@ -370,9 +385,19 @@ impl SnapshotsApplierTask { "Recovering from error; attempt {retry_id} / {}, retrying in {backoff:?}", self.config.retry_count ); - tokio::time::sleep(backoff).await; + tokio::time::timeout(backoff, stop_receiver.changed()) + .await + .ok(); + // Stop receiver will be checked on the next iteration. backoff = backoff.mul_f32(self.config.retry_backoff_multiplier); } + Err(SnapshotsApplierError::Canceled) => { + tracing::info!("Snapshot recovery has been canceled"); + return Ok(SnapshotApplierTaskStats { + done_work: false, + canceled: true, + }); + } } } @@ -637,6 +662,7 @@ impl<'a> SnapshotsApplier<'a> { /// Returns final snapshot recovery status. 
async fn load_snapshot( task: &'a SnapshotsApplierTask, + stop_receiver: &mut watch::Receiver, ) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> { let health_updater = &task.health_updater; let connection_pool = &task.connection_pool; @@ -717,7 +743,7 @@ impl<'a> SnapshotsApplier<'a> { this.factory_deps_recovered = true; this.update_health(); - this.recover_storage_logs().await?; + this.recover_storage_logs(stop_receiver).await?; for is_chunk_processed in &mut this.applied_snapshot_status.storage_logs_chunks_processed { *is_chunk_processed = true; } @@ -900,7 +926,10 @@ impl<'a> SnapshotsApplier<'a> { Ok(()) } - async fn recover_storage_logs(&self) -> Result<(), SnapshotsApplierError> { + async fn recover_storage_logs( + &self, + stop_receiver: &mut watch::Receiver, + ) -> Result<(), SnapshotsApplierError> { let effective_concurrency = (self.connection_pool.max_size() as usize).min(self.max_concurrency); tracing::info!( @@ -917,7 +946,16 @@ impl<'a> SnapshotsApplier<'a> { .map(|(chunk_id, _)| { self.recover_storage_logs_single_chunk(&semaphore, chunk_id as u64) }); - futures::future::try_join_all(tasks).await?; + let job_completion = futures::future::try_join_all(tasks); + + tokio::select! 
{ + res = job_completion => { + res?; + }, + _ = stop_receiver.changed() => { + return Err(SnapshotsApplierError::Canceled); + } + } let mut storage = self .connection_pool diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs index 51578b5090d8..379808b365ca 100644 --- a/core/lib/snapshots_applier/src/tests/mod.rs +++ b/core/lib/snapshots_applier/src/tests/mod.rs @@ -84,7 +84,8 @@ async fn snapshots_creator_can_successfully_recover_db( object_store.clone(), ); let task_health = task.health_check(); - let stats = task.run().await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let stats = task.run(stop_receiver).await.unwrap(); assert!(stats.done_work); assert_matches!( task_health.check_health().await.status(), @@ -138,7 +139,9 @@ async fn snapshots_creator_can_successfully_recover_db( Box::new(client.clone()), object_store.clone(), ); - task.run().await.unwrap(); + + let (_stop_sender, stop_receiver) = watch::channel(false); + task.run(stop_receiver).await.unwrap(); // Here, stats would unfortunately have `done_work: true` because work detection isn't smart enough. // Emulate a node processing data after recovery. 
@@ -161,7 +164,8 @@ async fn snapshots_creator_can_successfully_recover_db( Box::new(client), object_store, ); - let stats = task.run().await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let stats = task.run(stop_receiver).await.unwrap(); assert!(!stats.done_work); } @@ -182,7 +186,8 @@ async fn applier_recovers_v0_snapshot(drop_storage_key_preimages: bool) { if drop_storage_key_preimages { task.drop_storage_key_preimages(); } - let stats = task.run().await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let stats = task.run(stop_receiver).await.unwrap(); assert!(stats.done_work); let mut storage = pool.connection().await.unwrap(); @@ -226,7 +231,8 @@ async fn applier_recovers_explicitly_specified_snapshot() { object_store, ); task.set_snapshot_l1_batch(expected_status.l1_batch_number); - let stats = task.run().await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let stats = task.run(stop_receiver).await.unwrap(); assert!(stats.done_work); let mut storage = pool.connection().await.unwrap(); @@ -252,7 +258,8 @@ async fn applier_error_for_missing_explicitly_specified_snapshot() { ); task.set_snapshot_l1_batch(expected_status.l1_batch_number + 1); - let err = task.run().await.unwrap_err(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let err = task.run(stop_receiver).await.unwrap_err(); assert!( format!("{err:#}").contains("not present on main node"), "{err:#}" @@ -277,7 +284,8 @@ async fn snapshot_applier_recovers_after_stopping() { Box::new(client.clone()), Arc::new(stopping_object_store), ); - let task_handle = tokio::spawn(task.run()); + let (_stop_sender, task_stop_receiver) = watch::channel(false); + let task_handle = tokio::spawn(task.run(task_stop_receiver)); // Wait until the first storage logs chunk is requested (the object store hangs up at this point) stop_receiver.wait_for(|&count| count > 1).await.unwrap(); @@ -313,7 +321,8 @@ async fn 
snapshot_applier_recovers_after_stopping() { Box::new(client.clone()), Arc::new(stopping_object_store), ); - let task_handle = tokio::spawn(task.run()); + let (_stop_sender, task_stop_receiver) = watch::channel(false); + let task_handle = tokio::spawn(task.run(task_stop_receiver)); stop_receiver.wait_for(|&count| count > 3).await.unwrap(); assert!(!task_handle.is_finished()); @@ -340,7 +349,8 @@ async fn snapshot_applier_recovers_after_stopping() { Arc::new(stopping_object_store), ); task.set_snapshot_l1_batch(expected_status.l1_batch_number); // check that this works fine - task.run().await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + task.run(stop_receiver).await.unwrap(); assert_eq!( is_recovery_completed(&pool, &client).await, @@ -411,7 +421,8 @@ async fn health_status_immediately_after_task_start() { object_store, ); let task_health = task.health_check(); - let task_handle = tokio::spawn(task.run()); + let (_stop_sender, task_stop_receiver) = watch::channel(false); + let task_handle = tokio::spawn(task.run(task_stop_receiver)); client.0.wait().await; // Wait for the first L2 client call (at which point, the task is certainly initialized) assert_matches!( @@ -465,7 +476,8 @@ async fn applier_errors_after_genesis() { Box::new(client), object_store, ); - task.run().await.unwrap_err(); + let (_stop_sender, task_stop_receiver) = watch::channel(false); + task.run(task_stop_receiver).await.unwrap_err(); } #[tokio::test] @@ -480,7 +492,8 @@ async fn applier_errors_without_snapshots() { Box::new(client), object_store, ); - task.run().await.unwrap_err(); + let (_stop_sender, stop_receiver) = watch::channel(false); + task.run(stop_receiver).await.unwrap_err(); } #[tokio::test] @@ -499,7 +512,8 @@ async fn applier_errors_with_unrecognized_snapshot_version() { Box::new(client), object_store, ); - task.run().await.unwrap_err(); + let (_stop_sender, stop_receiver) = watch::channel(false); + task.run(stop_receiver).await.unwrap_err(); } 
#[tokio::test] @@ -518,7 +532,8 @@ async fn applier_returns_error_on_fatal_object_store_error() { Box::new(client), Arc::new(object_store), ); - let err = task.run().await.unwrap_err(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let err = task.run(stop_receiver).await.unwrap_err(); assert!(err.chain().any(|cause| { matches!( cause.downcast_ref::(), @@ -546,7 +561,8 @@ async fn applier_returns_error_after_too_many_object_store_retries() { Box::new(client), Arc::new(object_store), ); - let err = task.run().await.unwrap_err(); + let (_stop_sender, stop_receiver) = watch::channel(false); + let err = task.run(stop_receiver).await.unwrap_err(); assert!(err.chain().any(|cause| { matches!( cause.downcast_ref::(), @@ -585,7 +601,8 @@ async fn recovering_tokens() { Box::new(client.clone()), object_store.clone(), ); - let task_result = task.run().await; + let (_stop_sender, stop_receiver) = watch::channel(false); + let task_result = task.run(stop_receiver).await; assert!(task_result.is_err()); assert_eq!( @@ -601,7 +618,8 @@ async fn recovering_tokens() { Box::new(client.clone()), object_store.clone(), ); - task.run().await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + task.run(stop_receiver).await.unwrap(); assert_eq!( is_recovery_completed(&pool, &client).await, @@ -635,5 +653,41 @@ async fn recovering_tokens() { Box::new(client), object_store, ); - task.run().await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + task.run(stop_receiver).await.unwrap(); +} + +#[tokio::test] +async fn snapshot_applier_can_be_canceled() { + let pool = ConnectionPool::::test_pool().await; + let mut expected_status = mock_recovery_status(); + expected_status.storage_logs_chunks_processed = vec![true; 10]; + let storage_logs = random_storage_logs::(expected_status.l1_batch_number, 200); + let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await; + let (stopping_object_store, mut stop_receiver) = + 
HangingObjectStore::new(object_store.clone(), 1); + + let mut config = SnapshotsApplierConfig::for_tests(); + config.max_concurrency = NonZeroUsize::new(1).unwrap(); + let task = SnapshotsApplierTask::new( + config.clone(), + pool.clone(), + Box::new(client.clone()), + Arc::new(stopping_object_store), + ); + let (task_stop_sender, task_stop_receiver) = watch::channel(false); + let task_handle = tokio::spawn(task.run(task_stop_receiver)); + + // Wait until the first storage logs chunk is requested (the object store hangs up at this point) + stop_receiver.wait_for(|&count| count > 1).await.unwrap(); + assert!(!task_handle.is_finished()); + + task_stop_sender.send(true).unwrap(); + let result = tokio::time::timeout(Duration::from_secs(5), task_handle) + .await + .expect("Task wasn't canceled") + .unwrap() + .expect("Task erred"); + assert!(result.canceled); + assert!(!result.done_work); } diff --git a/core/lib/zksync_core_leftovers/src/lib.rs b/core/lib/zksync_core_leftovers/src/lib.rs index 4e63a39d6c64..b79b86d718d0 100644 --- a/core/lib/zksync_core_leftovers/src/lib.rs +++ b/core/lib/zksync_core_leftovers/src/lib.rs @@ -2,42 +2,10 @@ use std::str::FromStr; -use anyhow::Context as _; use tokio::sync::oneshot; -use zksync_config::{configs::DatabaseSecrets, GenesisConfig}; -use zksync_dal::{ConnectionPool, Core, CoreDal as _}; -use zksync_node_genesis::{ensure_genesis_state, GenesisParams}; pub mod temp_config_store; -/// Inserts the initial information about ZKsync tokens into the database. 
-pub async fn genesis_init( - genesis_config: GenesisConfig, - database_secrets: &DatabaseSecrets, -) -> anyhow::Result<()> { - let db_url = database_secrets.master_url()?; - let pool = ConnectionPool::::singleton(db_url) - .build() - .await - .context("failed to build connection_pool")?; - let mut storage = pool.connection().await.context("connection()")?; - - let params = GenesisParams::load_genesis_params(genesis_config)?; - ensure_genesis_state(&mut storage, &params).await?; - - Ok(()) -} - -pub async fn is_genesis_needed(database_secrets: &DatabaseSecrets) -> bool { - let db_url = database_secrets.master_url().unwrap(); - let pool = ConnectionPool::::singleton(db_url) - .build() - .await - .expect("failed to build connection_pool"); - let mut storage = pool.connection().await.expect("connection()"); - storage.blocks_dal().is_genesis_needed().await.unwrap() -} - /// Sets up an interrupt handler and returns a future that resolves once an interrupt signal /// is received. pub fn setup_sigint_handler() -> oneshot::Receiver<()> { diff --git a/core/node/genesis/src/lib.rs b/core/node/genesis/src/lib.rs index de0fc14b177b..49762f5000d5 100644 --- a/core/node/genesis/src/lib.rs +++ b/core/node/genesis/src/lib.rs @@ -5,9 +5,9 @@ use std::fmt::Formatter; use anyhow::Context as _; -use zksync_config::{configs::DatabaseSecrets, GenesisConfig}; +use zksync_config::GenesisConfig; use zksync_contracts::{BaseSystemContracts, BaseSystemContractsHashes, SET_CHAIN_ID_EVENT}; -use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError}; +use zksync_dal::{Connection, Core, CoreDal, DalError}; use zksync_eth_client::EthInterface; use zksync_merkle_tree::{domain::ZkSyncTree, TreeInstruction}; use zksync_multivm::utils::get_max_gas_per_pubdata_byte; @@ -270,6 +270,10 @@ pub async fn insert_genesis_batch( }) } +pub async fn is_genesis_needed(storage: &mut Connection<'_, Core>) -> Result { + Ok(storage.blocks_dal().is_genesis_needed().await?)
+} + pub async fn ensure_genesis_state( storage: &mut Connection<'_, Core>, genesis_params: &GenesisParams, @@ -411,15 +415,11 @@ pub async fn create_genesis_l1_batch( // Save chain id transaction into the database // We keep returning anyhow and will refactor it later pub async fn save_set_chain_id_tx( + storage: &mut Connection<'_, Core>, query_client: &dyn EthInterface, diamond_proxy_address: Address, state_transition_manager_address: Address, - database_secrets: &DatabaseSecrets, ) -> anyhow::Result<()> { - let db_url = database_secrets.master_url()?; - let pool = ConnectionPool::::singleton(db_url).build().await?; - let mut storage = pool.connection().await?; - let to = query_client.block_number().await?.as_u64(); let from = to.saturating_sub(PRIORITY_EXPIRATION); let filter = FilterBuilder::default() diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index 554083b830c5..0edbe680ca83 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -50,6 +50,7 @@ zksync_reorg_detector.workspace = true zksync_vm_runner.workspace = true zksync_node_db_pruner.workspace = true zksync_base_token_adjuster.workspace = true +zksync_node_storage_init.workspace = true pin-project-lite.workspace = true tracing.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index 7cf05f1aa06c..acfe6c53417a 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -16,6 +16,7 @@ pub mod l1_gas; pub mod main_node_client; pub mod main_node_fee_params_fetcher; pub mod metadata_calculator; +pub mod node_storage_init; pub mod object_store; pub mod pk_signing_eth_client; pub mod pools_layer; @@ -24,6 +25,7 @@ pub mod prometheus_exporter; pub mod proof_data_handler; pub mod pruning; pub mod query_eth_client; +pub mod reorg_detector; pub mod sigint; pub 
mod state_keeper; pub mod sync_state_updater; diff --git a/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs b/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs new file mode 100644 index 000000000000..0358d30a3133 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/node_storage_init/external_node_strategy.rs @@ -0,0 +1,101 @@ +use std::sync::Arc; + +// Re-export to initialize the layer without having to depend on the crate directly. +pub use zksync_node_storage_init::SnapshotRecoveryConfig; +use zksync_node_storage_init::{ + external_node::{ExternalNodeGenesis, ExternalNodeReverter, ExternalNodeSnapshotRecovery}, + InitializeStorage, NodeInitializationStrategy, RevertStorage, +}; +use zksync_types::L2ChainId; + +use super::NodeInitializationStrategyResource; +use crate::{ + implementations::resources::{ + healthcheck::AppHealthCheckResource, + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + reverter::BlockReverterResource, + }, + wiring_layer::{WiringError, WiringLayer}, + FromContext, IntoContext, +}; + +/// Wiring layer for external node initialization strategy. 
+#[derive(Debug)] +pub struct ExternalNodeInitStrategyLayer { + pub l2_chain_id: L2ChainId, + pub snapshot_recovery_config: Option, +} + +#[derive(Debug, FromContext)] +#[context(crate = crate)] +pub struct Input { + pub master_pool: PoolResource, + pub main_node_client: MainNodeClientResource, + pub block_reverter: Option, + #[context(default)] + pub app_health: AppHealthCheckResource, +} + +#[derive(Debug, IntoContext)] +#[context(crate = crate)] +pub struct Output { + pub strategy: NodeInitializationStrategyResource, +} + +#[async_trait::async_trait] +impl WiringLayer for ExternalNodeInitStrategyLayer { + type Input = Input; + type Output = Output; + + fn layer_name(&self) -> &'static str { + "external_node_role_layer" + } + + async fn wire(self, input: Self::Input) -> Result { + let pool = input.master_pool.get().await?; + let MainNodeClientResource(client) = input.main_node_client; + let AppHealthCheckResource(app_health) = input.app_health; + let block_reverter = match input.block_reverter { + Some(reverter) => { + // If reverter was provided, we intend to be its sole consumer. + // We don't want multiple components to attempt reverting blocks. 
+ let reverter = reverter.0.take().ok_or(WiringError::Configuration( + "BlockReverterResource is taken".into(), + ))?; + Some(reverter) + } + None => None, + }; + + let genesis = Arc::new(ExternalNodeGenesis { + l2_chain_id: self.l2_chain_id, + client: client.clone(), + pool: pool.clone(), + }); + let snapshot_recovery = self.snapshot_recovery_config.map(|recovery_config| { + Arc::new(ExternalNodeSnapshotRecovery { + client: client.clone(), + pool: pool.clone(), + recovery_config, + app_health, + }) as Arc + }); + let block_reverter = block_reverter.map(|block_reverter| { + Arc::new(ExternalNodeReverter { + client, + pool: pool.clone(), + reverter: block_reverter, + }) as Arc + }); + let strategy = NodeInitializationStrategy { + genesis, + snapshot_recovery, + block_reverter, + }; + + Ok(Output { + strategy: strategy.into(), + }) + } +} diff --git a/core/node/node_framework/src/implementations/layers/node_storage_init/main_node_strategy.rs b/core/node/node_framework/src/implementations/layers/node_storage_init/main_node_strategy.rs new file mode 100644 index 000000000000..ef43aaf1aee0 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/node_storage_init/main_node_strategy.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use zksync_config::{ContractsConfig, GenesisConfig}; +use zksync_node_storage_init::{main_node::MainNodeGenesis, NodeInitializationStrategy}; + +use super::NodeInitializationStrategyResource; +use crate::{ + implementations::resources::{ + eth_interface::EthInterfaceResource, + pools::{MasterPool, PoolResource}, + }, + wiring_layer::{WiringError, WiringLayer}, + FromContext, IntoContext, +}; + +/// Wiring layer for main node initialization strategy. 
+#[derive(Debug)] +pub struct MainNodeInitStrategyLayer { + pub genesis: GenesisConfig, + pub contracts: ContractsConfig, +} + +#[derive(Debug, FromContext)] +#[context(crate = crate)] +pub struct Input { + pub master_pool: PoolResource, + pub eth_interface: EthInterfaceResource, +} + +#[derive(Debug, IntoContext)] +#[context(crate = crate)] +pub struct Output { + pub strategy: NodeInitializationStrategyResource, +} + +#[async_trait::async_trait] +impl WiringLayer for MainNodeInitStrategyLayer { + type Input = Input; + type Output = Output; + + fn layer_name(&self) -> &'static str { + "main_node_role_layer" + } + + async fn wire(self, input: Self::Input) -> Result { + let pool = input.master_pool.get().await?; + let EthInterfaceResource(l1_client) = input.eth_interface; + let genesis = Arc::new(MainNodeGenesis { + contracts: self.contracts, + genesis: self.genesis, + l1_client, + pool, + }); + let strategy = NodeInitializationStrategy { + genesis, + snapshot_recovery: None, + block_reverter: None, + }; + + Ok(Output { + strategy: strategy.into(), + }) + } +} diff --git a/core/node/node_framework/src/implementations/layers/node_storage_init/mod.rs b/core/node/node_framework/src/implementations/layers/node_storage_init/mod.rs new file mode 100644 index 000000000000..5fed50e0f53d --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/node_storage_init/mod.rs @@ -0,0 +1,160 @@ +use zksync_node_storage_init::{NodeInitializationStrategy, NodeStorageInitializer}; + +use crate::{ + implementations::resources::pools::{MasterPool, PoolResource}, + resource::Resource, + service::StopReceiver, + task::{Task, TaskId, TaskKind}, + wiring_layer::{WiringError, WiringLayer}, + FromContext, IntoContext, +}; + +pub mod external_node_strategy; +pub mod main_node_strategy; + +/// Wiring layer for `NodeStorageInitializer`. 
+/// +/// ## Requests resources +/// +/// - `PoolResource` +/// - `NodeInitializationStrategyResource` +/// +/// ## Adds tasks +/// +/// Depending on the mode, either `NodeStorageInitializer` or `NodeStorageInitializerPrecondition` +#[derive(Debug, Default)] +pub struct NodeStorageInitializerLayer { + as_precondition: bool, +} + +impl NodeStorageInitializerLayer { + pub fn new() -> Self { + Self::default() + } + + /// Changes the wiring logic to treat the initializer as a precondition. + pub fn as_precondition(mut self) -> Self { + self.as_precondition = true; + self + } +} + +#[derive(Debug, FromContext)] +#[context(crate = crate)] +pub struct Input { + pub master_pool: PoolResource, + pub strategy: NodeInitializationStrategyResource, +} + +#[derive(Debug, IntoContext)] +#[context(crate = crate)] +pub struct Output { + #[context(task)] + pub initializer: Option, + #[context(task)] + pub precondition: Option, +} + +impl Output { + fn initializer(initializer: NodeStorageInitializer) -> Self { + Self { + initializer: Some(initializer), + precondition: None, + } + } + + fn precondition(precondition: NodeStorageInitializer) -> Self { + Self { + initializer: None, + precondition: Some(NodeStorageInitializerPrecondition(precondition)), + } + } +} + +#[async_trait::async_trait] +impl WiringLayer for NodeStorageInitializerLayer { + type Input = Input; + type Output = Output; + + fn layer_name(&self) -> &'static str { + if self.as_precondition { + return "node_storage_initializer_precondition_layer"; + } + "node_storage_initializer_layer" + } + + async fn wire(self, input: Self::Input) -> Result { + let pool = input.master_pool.get().await?; + let NodeInitializationStrategyResource(strategy) = input.strategy; + + let initializer = NodeStorageInitializer::new(strategy, pool); + + // Insert either task or precondition. 
+ let output = if self.as_precondition { + Output::precondition(initializer) + } else { + Output::initializer(initializer) + }; + + Ok(output) + } +} + +#[async_trait::async_trait] +impl Task for NodeStorageInitializer { + fn kind(&self) -> TaskKind { + TaskKind::UnconstrainedOneshotTask + } + + fn id(&self) -> TaskId { + "node_storage_initializer".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + tracing::info!("Starting the node storage initialization task"); + (*self).run(stop_receiver.0).await?; + tracing::info!("Node storage initialization task completed"); + Ok(()) + } +} + +/// Runs [`NodeStorageInitializer`] as a precondition, blocking +/// tasks from starting until the storage is initialized. +#[derive(Debug)] +pub struct NodeStorageInitializerPrecondition(NodeStorageInitializer); + +#[async_trait::async_trait] +impl Task for NodeStorageInitializerPrecondition { + fn kind(&self) -> TaskKind { + TaskKind::Precondition + } + + fn id(&self) -> TaskId { + "node_storage_initializer_precondition".into() + } + + async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + tracing::info!("Waiting for node storage to be initialized"); + let result = self.0.wait_for_initialized_storage(stop_receiver.0).await; + tracing::info!("Node storage initialization precondition completed"); + result + } +} + +// Note: unlike with other modules, this one is kept within the same file to simplify +// moving the implementations out of the framework soon. +/// Resource representing the node initialization strategy. 
+#[derive(Debug, Clone)] +pub struct NodeInitializationStrategyResource(NodeInitializationStrategy); + +impl Resource for NodeInitializationStrategyResource { + fn name() -> String { + "node_initialization_strategy".into() + } +} + +impl From for NodeInitializationStrategyResource { + fn from(strategy: NodeInitializationStrategy) -> Self { + Self(strategy) + } +} diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector.rs b/core/node/node_framework/src/implementations/layers/reorg_detector.rs new file mode 100644 index 000000000000..0d4cf8dd5220 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/reorg_detector.rs @@ -0,0 +1,72 @@ +use zksync_reorg_detector::{self, ReorgDetector}; + +use crate::{ + implementations::resources::{ + healthcheck::AppHealthCheckResource, + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + }, + service::StopReceiver, + task::{Task, TaskId}, + wiring_layer::{WiringError, WiringLayer}, + FromContext, IntoContext, +}; + +/// Wiring layer for [`ReorgDetector`] checker. +/// This layer is responsible for detecting reorgs and shutting down the node if one is detected. +/// +/// This layer assumes that the node starts with the initialized state. 
+#[derive(Debug)] +pub struct ReorgDetectorLayer; + +#[derive(Debug, FromContext)] +#[context(crate = crate)] +pub struct Input { + pub main_node_client: MainNodeClientResource, + pub master_pool: PoolResource, + #[context(default)] + pub app_health: AppHealthCheckResource, +} + +#[derive(Debug, IntoContext)] +#[context(crate = crate)] +pub struct Output { + #[context(task)] + pub reorg_detector: ReorgDetector, +} + +#[async_trait::async_trait] +impl WiringLayer for ReorgDetectorLayer { + type Input = Input; + type Output = Output; + + fn layer_name(&self) -> &'static str { + "reorg_detector_layer" + } + + async fn wire(self, input: Self::Input) -> Result { + let MainNodeClientResource(main_node_client) = input.main_node_client; + let pool = input.master_pool.get().await?; + + let reorg_detector = ReorgDetector::new(main_node_client, pool); + + let AppHealthCheckResource(app_health) = input.app_health; + app_health + .insert_component(reorg_detector.health_check().clone()) + .map_err(WiringError::internal)?; + + Ok(Output { reorg_detector }) + } +} + +#[async_trait::async_trait] +impl Task for ReorgDetector { + fn id(&self) -> TaskId { + "reorg_detector".into() + } + + async fn run(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + (*self).run(stop_receiver.0).await?; + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/resources/reverter.rs b/core/node/node_framework/src/implementations/resources/reverter.rs index 2d24f8fbbaf7..8a453b71659b 100644 --- a/core/node/node_framework/src/implementations/resources/reverter.rs +++ b/core/node/node_framework/src/implementations/resources/reverter.rs @@ -1,12 +1,10 @@ -use std::sync::Arc; - use zksync_block_reverter::BlockReverter; -use crate::resource::Resource; +use crate::resource::{Resource, Unique}; /// A resource that provides [`BlockReverter`] to the service. 
#[derive(Debug, Clone)] -pub struct BlockReverterResource(pub Arc); +pub struct BlockReverterResource(pub Unique); impl Resource for BlockReverterResource { fn name() -> String { diff --git a/core/node/node_storage_init/Cargo.toml b/core/node/node_storage_init/Cargo.toml new file mode 100644 index 000000000000..b3fdefbfbe60 --- /dev/null +++ b/core/node/node_storage_init/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "zksync_node_storage_init" +version = "0.1.0" +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true + +[dependencies] +zksync_config.workspace = true +zksync_dal.workspace = true +zksync_health_check.workspace = true +zksync_node_sync.workspace = true +zksync_node_genesis.workspace = true +zksync_object_store.workspace = true +zksync_shared_metrics.workspace = true +zksync_snapshots_applier.workspace = true +zksync_types.workspace = true +zksync_web3_decl.workspace = true +zksync_reorg_detector.workspace = true +zksync_block_reverter.workspace = true + +anyhow.workspace = true +async-trait.workspace = true +tokio.workspace = true +tracing.workspace = true diff --git a/core/node/node_storage_init/README.md b/core/node/node_storage_init/README.md new file mode 100644 index 000000000000..e1b6768878ec --- /dev/null +++ b/core/node/node_storage_init/README.md @@ -0,0 +1,5 @@ +# `zksync_node_storage_init` + +A set of actions to ensure that any ZKsync node has initialized storage and can start running. + +This includes genesis, but is not limited to it, and may involve other steps. 
diff --git a/core/node/node_storage_init/src/external_node/genesis.rs b/core/node/node_storage_init/src/external_node/genesis.rs new file mode 100644 index 000000000000..b7a7efa9cf53 --- /dev/null +++ b/core/node/node_storage_init/src/external_node/genesis.rs @@ -0,0 +1,39 @@ +use anyhow::Context as _; +use tokio::sync::watch; +use zksync_dal::{ConnectionPool, Core}; +use zksync_types::L2ChainId; +use zksync_web3_decl::client::{DynClient, L2}; + +use crate::InitializeStorage; + +#[derive(Debug)] +pub struct ExternalNodeGenesis { + pub l2_chain_id: L2ChainId, + pub client: Box>, + pub pool: ConnectionPool, +} + +#[async_trait::async_trait] +impl InitializeStorage for ExternalNodeGenesis { + /// Will perform genesis initialization if it's required. + /// If genesis is already performed, this method will do nothing. + async fn initialize_storage( + &self, + _stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + let mut storage = self.pool.connection_tagged("en").await?; + zksync_node_sync::genesis::perform_genesis_if_needed( + &mut storage, + self.l2_chain_id, + &self.client.clone().for_component("genesis"), + ) + .await + .context("performing genesis failed") + } + + async fn is_initialized(&self) -> anyhow::Result { + let mut storage = self.pool.connection_tagged("en").await?; + let needed = zksync_node_sync::genesis::is_genesis_needed(&mut storage).await?; + Ok(!needed) + } +} diff --git a/core/node/node_storage_init/src/external_node/mod.rs b/core/node/node_storage_init/src/external_node/mod.rs new file mode 100644 index 000000000000..b04635bf3ccd --- /dev/null +++ b/core/node/node_storage_init/src/external_node/mod.rs @@ -0,0 +1,8 @@ +pub use self::{ + genesis::ExternalNodeGenesis, revert::ExternalNodeReverter, + snapshot_recovery::ExternalNodeSnapshotRecovery, +}; + +mod genesis; +mod revert; +mod snapshot_recovery; diff --git a/core/node/node_storage_init/src/external_node/revert.rs b/core/node/node_storage_init/src/external_node/revert.rs new file mode 
100644 index 000000000000..0310f525572f --- /dev/null +++ b/core/node/node_storage_init/src/external_node/revert.rs @@ -0,0 +1,50 @@ +use anyhow::Context as _; +use tokio::sync::watch; +use zksync_block_reverter::BlockReverter; +use zksync_dal::{ConnectionPool, Core}; +use zksync_reorg_detector::ReorgDetector; +use zksync_types::L1BatchNumber; +use zksync_web3_decl::client::{DynClient, L2}; + +use crate::RevertStorage; + +#[derive(Debug)] +pub struct ExternalNodeReverter { + pub client: Box>, + pub pool: ConnectionPool, + pub reverter: BlockReverter, +} + +#[async_trait::async_trait] +impl RevertStorage for ExternalNodeReverter { + async fn revert_storage( + &self, + to_batch: L1BatchNumber, + _stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + tracing::info!("Reverting to l1 batch number {to_batch}"); + self.reverter.roll_back(to_batch).await?; + tracing::info!("Revert successfully completed"); + Ok(()) + } + + async fn last_correct_batch_for_reorg( + &self, + stop_receiver: watch::Receiver, + ) -> anyhow::Result> { + let mut reorg_detector = ReorgDetector::new(self.client.clone(), self.pool.clone()); + let batch = match reorg_detector.run_once(stop_receiver).await { + Ok(()) => { + // Even if stop signal was received, the node will shut down without launching any tasks. 
+ tracing::info!("No rollback was detected"); + None + } + Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { + tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}"); + Some(last_correct_l1_batch) + } + Err(err) => return Err(err).context("reorg_detector.check_consistency()"), + }; + Ok(batch) + } +} diff --git a/core/node/node_storage_init/src/external_node/snapshot_recovery.rs b/core/node/node_storage_init/src/external_node/snapshot_recovery.rs new file mode 100644 index 000000000000..d9ba60a1bcbf --- /dev/null +++ b/core/node/node_storage_init/src/external_node/snapshot_recovery.rs @@ -0,0 +1,82 @@ +use std::{sync::Arc, time::Instant}; + +use anyhow::Context as _; +use tokio::sync::watch; +use zksync_dal::{ConnectionPool, Core}; +use zksync_health_check::AppHealthCheck; +use zksync_object_store::ObjectStoreFactory; +use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS}; +use zksync_snapshots_applier::{ + RecoveryCompletionStatus, SnapshotsApplierConfig, SnapshotsApplierTask, +}; +use zksync_web3_decl::client::{DynClient, L2}; + +use crate::{InitializeStorage, SnapshotRecoveryConfig}; + +#[derive(Debug)] +pub struct ExternalNodeSnapshotRecovery { + pub client: Box>, + pub pool: ConnectionPool, + pub recovery_config: SnapshotRecoveryConfig, + pub app_health: Arc, +} + +#[async_trait::async_trait] +impl InitializeStorage for ExternalNodeSnapshotRecovery { + async fn initialize_storage(&self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let pool = self.pool.clone(); + tracing::warn!("Proceeding with snapshot recovery. 
This is an experimental feature; use at your own risk"); + let object_store_config = + self.recovery_config.object_store_config.clone().context( + "Snapshot object store must be presented if snapshot recovery is activated", + )?; + let object_store = ObjectStoreFactory::new(object_store_config) + .create_store() + .await?; + + let config = SnapshotsApplierConfig::default(); + let mut snapshots_applier_task = SnapshotsApplierTask::new( + config, + pool, + Box::new(self.client.clone().for_component("snapshot_recovery")), + object_store, + ); + if let Some(snapshot_l1_batch) = self.recovery_config.snapshot_l1_batch_override { + tracing::info!( + "Using a specific snapshot with L1 batch #{snapshot_l1_batch}; this may not work \ + if the snapshot is too old (order of several weeks old) or non-existent" + ); + snapshots_applier_task.set_snapshot_l1_batch(snapshot_l1_batch); + } + if self.recovery_config.drop_storage_key_preimages { + tracing::info!("Dropping storage key preimages for snapshot storage logs"); + snapshots_applier_task.drop_storage_key_preimages(); + } + self.app_health + .insert_component(snapshots_applier_task.health_check())?; + + let recovery_started_at = Instant::now(); + let stats = snapshots_applier_task + .run(stop_receiver) + .await + .context("snapshot recovery failed")?; + if stats.done_work { + let latency = recovery_started_at.elapsed(); + APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Postgres].set(latency); + tracing::info!("Recovered Postgres from snapshot in {latency:?}"); + } + // We don't really care if the task was canceled. + // If it was, all the other tasks are canceled as well. 
+ + Ok(()) + } + + async fn is_initialized(&self) -> anyhow::Result { + let mut storage = self.pool.connection_tagged("en").await?; + let completed = matches!( + SnapshotsApplierTask::is_recovery_completed(&mut storage, &self.client).await?, + RecoveryCompletionStatus::Completed + ); + Ok(completed) + } +} diff --git a/core/node/node_storage_init/src/lib.rs b/core/node/node_storage_init/src/lib.rs new file mode 100644 index 000000000000..10b0131908ca --- /dev/null +++ b/core/node/node_storage_init/src/lib.rs @@ -0,0 +1,213 @@ +use std::{future::Future, sync::Arc, time::Duration}; + +use tokio::sync::watch; +use zksync_config::ObjectStoreConfig; +use zksync_dal::{ConnectionPool, Core, CoreDal as _}; +use zksync_types::L1BatchNumber; + +pub use crate::traits::{InitializeStorage, RevertStorage}; + +pub mod external_node; +pub mod main_node; +mod traits; + +#[derive(Debug)] +pub struct SnapshotRecoveryConfig { + /// If not specified, the latest snapshot will be used. + pub snapshot_l1_batch_override: Option, + pub drop_storage_key_preimages: bool, + pub object_store_config: Option, +} + +#[derive(Debug, Clone, Copy)] +enum InitDecision { + /// Perform or check genesis. + Genesis, + /// Perform or check snapshot recovery. + SnapshotRecovery, +} + +#[derive(Debug, Clone)] +pub struct NodeInitializationStrategy { + pub genesis: Arc, + pub snapshot_recovery: Option>, + pub block_reverter: Option>, +} + +/// Node storage initializer. +/// This structure is responsible for making sure that the node storage is initialized. +/// +/// This structure operates together with [`NodeInitializationStrategy`] to achieve that: +/// `NodeStorageInitializer` understands what initialized storage means, but it defers +/// any actual initialization to the provided `NodeInitializationStrategy`. This allows having different +/// initialization strategies for different node types, while keeping common invariants +/// for the whole system. 
+#[derive(Debug)] +pub struct NodeStorageInitializer { + strategy: NodeInitializationStrategy, + pool: ConnectionPool, +} + +impl NodeStorageInitializer { + pub fn new(strategy: NodeInitializationStrategy, pool: ConnectionPool) -> Self { + Self { strategy, pool } + } + + /// Returns the preferred kind of storage initialization. + /// The decision is based on the current state of the storage. + /// Note that the decision does not guarantee that the initialization has not been performed + /// already, so any returned decision should be checked before performing the initialization. + async fn decision(&self) -> anyhow::Result { + let mut storage = self.pool.connection_tagged("node_init").await?; + let genesis_l1_batch = storage + .blocks_dal() + .get_l1_batch_header(L1BatchNumber(0)) + .await?; + let snapshot_recovery = storage + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await?; + drop(storage); + + let decision = match (genesis_l1_batch, snapshot_recovery) { + (Some(batch), Some(snapshot_recovery)) => { + anyhow::bail!( + "Node has both genesis L1 batch: {batch:?} and snapshot recovery information: {snapshot_recovery:?}. \ + This is not supported and can be caused by broken snapshot recovery." + ); + } + (Some(batch), None) => { + tracing::info!( + "Node has a genesis L1 batch: {batch:?} and no snapshot recovery info" + ); + InitDecision::Genesis + } + (None, Some(snapshot_recovery)) => { + tracing::info!("Node has no genesis L1 batch and snapshot recovery information: {snapshot_recovery:?}"); + InitDecision::SnapshotRecovery + } + (None, None) => { + tracing::info!("Node has neither genesis L1 batch, nor snapshot recovery info"); + if self.strategy.snapshot_recovery.is_some() { + InitDecision::SnapshotRecovery + } else { + InitDecision::Genesis + } + } + }; + Ok(decision) + } + + /// Initializes the storage for the node. + /// After the initialization, the node can safely start operating. 
+ pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let decision = self.decision().await?; + + // Make sure that we have state to work with. + match decision { + InitDecision::Genesis => { + tracing::info!("Performing genesis initialization"); + self.strategy + .genesis + .initialize_storage(stop_receiver.clone()) + .await?; + } + InitDecision::SnapshotRecovery => { + tracing::info!("Performing snapshot recovery initialization"); + if let Some(recovery) = &self.strategy.snapshot_recovery { + recovery.initialize_storage(stop_receiver.clone()).await?; + } else { + anyhow::bail!( + "Snapshot recovery should be performed, but the strategy is not provided" + ); + } + } + } + + // Now we may check whether we're in the invalid state and should perform a rollback. + if let Some(reverter) = &self.strategy.block_reverter { + if let Some(to_batch) = reverter + .last_correct_batch_for_reorg(stop_receiver.clone()) + .await? + { + tracing::info!(l1_batch = %to_batch, "State must be rolled back to L1 batch"); + tracing::info!("Performing the rollback"); + reverter.revert_storage(to_batch, stop_receiver).await?; + } + } + + Ok(()) + } + + /// Checks if the node can safely start operating. + pub async fn wait_for_initialized_storage( + &self, + stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + const POLLING_INTERVAL: Duration = Duration::from_secs(1); + + // Wait until data is added to the database. + poll(stop_receiver.clone(), POLLING_INTERVAL, || { + self.is_database_initialized() + }) + .await?; + if *stop_receiver.borrow() { + return Ok(()); + } + + // Wait until the rollback is no longer needed. + poll(stop_receiver.clone(), POLLING_INTERVAL, || { + self.is_chain_tip_correct(stop_receiver.clone()) + }) + .await?; + + Ok(()) + } + + async fn is_database_initialized(&self) -> anyhow::Result { + // We're fine if the database is initialized in any meaningful way we can check. + if self.strategy.genesis.is_initialized().await? 
{ + return Ok(true); + } + if let Some(snapshot_recovery) = &self.strategy.snapshot_recovery { + return snapshot_recovery.is_initialized().await; + } + Ok(false) + } + + /// Checks if the head of the chain has correct state, e.g. no rollback needed. + async fn is_chain_tip_correct( + &self, + stop_receiver: watch::Receiver, + ) -> anyhow::Result { + // May be `true` if stop signal is received, but the node will shut down without launching any tasks anyway. + let initialized = if let Some(reverter) = &self.strategy.block_reverter { + reverter + .last_correct_batch_for_reorg(stop_receiver) + .await? + .is_none() + } else { + true + }; + Ok(initialized) + } +} + +async fn poll( + mut stop_receiver: watch::Receiver, + polling_interval: Duration, + mut check: F, +) -> anyhow::Result<()> +where + F: FnMut() -> Fut, + Fut: Future>, +{ + while !*stop_receiver.borrow() && !check().await? { + // Return value will be checked on the next iteration. + tokio::time::timeout(polling_interval, stop_receiver.changed()) + .await + .ok(); + } + + Ok(()) +} diff --git a/core/node/node_storage_init/src/main_node/genesis.rs b/core/node/node_storage_init/src/main_node/genesis.rs new file mode 100644 index 000000000000..db2eef51912e --- /dev/null +++ b/core/node/node_storage_init/src/main_node/genesis.rs @@ -0,0 +1,54 @@ +use anyhow::Context as _; +use tokio::sync::watch; +use zksync_config::{ContractsConfig, GenesisConfig}; +use zksync_dal::{ConnectionPool, Core, CoreDal as _}; +use zksync_node_genesis::GenesisParams; +use zksync_web3_decl::client::{DynClient, L1}; + +use crate::traits::InitializeStorage; + +#[derive(Debug)] +pub struct MainNodeGenesis { + pub genesis: GenesisConfig, + pub contracts: ContractsConfig, + pub l1_client: Box>, + pub pool: ConnectionPool, +} + +#[async_trait::async_trait] +impl InitializeStorage for MainNodeGenesis { + /// Will perform genesis initialization if it's required. + /// If genesis is already performed, this method will do nothing. 
+ async fn initialize_storage( + &self, + _stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + let mut storage = self.pool.connection_tagged("genesis").await?; + + if !storage.blocks_dal().is_genesis_needed().await? { + return Ok(()); + } + + let params = GenesisParams::load_genesis_params(self.genesis.clone())?; + zksync_node_genesis::ensure_genesis_state(&mut storage, &params).await?; + + if let Some(ecosystem_contracts) = &self.contracts.ecosystem_contracts { + zksync_node_genesis::save_set_chain_id_tx( + &mut storage, + &self.l1_client, + self.contracts.diamond_proxy_addr, + ecosystem_contracts.state_transition_proxy_addr, + ) + .await + .context("Failed to save SetChainId upgrade transaction")?; + } + + Ok(()) + } + + async fn is_initialized(&self) -> anyhow::Result { + let mut storage = self.pool.connection_tagged("genesis").await?; + let needed = zksync_node_genesis::is_genesis_needed(&mut storage).await?; + Ok(!needed) + } +} diff --git a/core/node/node_storage_init/src/main_node/mod.rs b/core/node/node_storage_init/src/main_node/mod.rs new file mode 100644 index 000000000000..4254e7b08d87 --- /dev/null +++ b/core/node/node_storage_init/src/main_node/mod.rs @@ -0,0 +1,3 @@ +pub use self::genesis::MainNodeGenesis; + +mod genesis; diff --git a/core/node/node_storage_init/src/traits.rs b/core/node/node_storage_init/src/traits.rs new file mode 100644 index 000000000000..3b6467764d97 --- /dev/null +++ b/core/node/node_storage_init/src/traits.rs @@ -0,0 +1,33 @@ +use std::fmt; + +use tokio::sync::watch; +use zksync_types::L1BatchNumber; + +/// An abstract storage initialization strategy. +#[async_trait::async_trait] +pub trait InitializeStorage: fmt::Debug + Send + Sync + 'static { + /// Checks if the storage is already initialized. + async fn is_initialized(&self) -> anyhow::Result; + + /// Initializes the storage. + /// Implementors of this method may assume that they have unique access to the storage. 
+ async fn initialize_storage(&self, stop_receiver: watch::Receiver) -> anyhow::Result<()>; +} + +/// An abstract storage revert strategy. +/// This trait assumes that for any invalid state there exists a batch number to which the storage can be rolled back. +#[async_trait::async_trait] +pub trait RevertStorage: fmt::Debug + Send + Sync + 'static { + /// Checks if the storage is in an invalid state and has to be rolled back. + async fn last_correct_batch_for_reorg( + &self, + stop_receiver: watch::Receiver, + ) -> anyhow::Result>; + + /// Reverts the storage to the provided batch number. + async fn revert_storage( + &self, + to_batch: L1BatchNumber, + stop_receiver: watch::Receiver, + ) -> anyhow::Result<()>; +} diff --git a/core/node/node_sync/src/genesis.rs b/core/node/node_sync/src/genesis.rs index c1b45f8ade93..ccc26b417e98 100644 --- a/core/node/node_sync/src/genesis.rs +++ b/core/node/node_sync/src/genesis.rs @@ -8,6 +8,10 @@ use zksync_types::{ use super::client::MainNodeClient; +pub async fn is_genesis_needed(storage: &mut Connection<'_, Core>) -> anyhow::Result { + Ok(storage.blocks_dal().is_genesis_needed().await?) 
+} + pub async fn perform_genesis_if_needed( storage: &mut Connection<'_, Core>, zksync_chain_id: L2ChainId, diff --git a/infrastructure/zk/src/server.ts b/infrastructure/zk/src/server.ts index 2ed74deca98e..8b10559361ae 100644 --- a/infrastructure/zk/src/server.ts +++ b/infrastructure/zk/src/server.ts @@ -14,10 +14,6 @@ export async function server(rebuildTree: boolean, uring: boolean, components?: if (rebuildTree || components || useNodeFramework) { options += ' --'; } - if (rebuildTree) { - clean('db'); - options += ' --rebuild-tree'; - } if (components) { options += ` --components=${components}`; } @@ -75,7 +71,6 @@ export async function genesisFromBinary() { export const serverCommand = new Command('server') .description('start zksync server') .option('--genesis', 'generate genesis data via server') - .option('--rebuild-tree', 'rebuilds merkle tree from database logs', 'rebuild_tree') .option('--uring', 'enables uring support for RocksDB') .option('--components ', 'comma-separated list of components to run') .option('--chain-name ', 'environment name')