From 563110919733ad3c3d167571bb3a76cb8f2ea546 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Mon, 3 Jun 2024 16:17:38 +0300
Subject: [PATCH 1/7] Sketch specifying snapshot L1 batch

---
 core/lib/snapshots_applier/src/lib.rs         | 89 +++++++++++++++----
 core/lib/snapshots_applier/src/tests/mod.rs   | 12 ++-
 core/lib/snapshots_applier/src/tests/utils.rs | 19 +++-
 3 files changed, 102 insertions(+), 18 deletions(-)

diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs
index bcf4b3c14329..6334e59f2507 100644
--- a/core/lib/snapshots_applier/src/lib.rs
+++ b/core/lib/snapshots_applier/src/lib.rs
@@ -123,7 +123,14 @@ pub trait SnapshotsApplierMainNodeClient: fmt::Debug + Send + Sync {
         number: L2BlockNumber,
     ) -> EnrichedClientResult<Option<api::BlockDetails>>;
 
-    async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>>;
+    async fn fetch_newest_snapshot_l1_batch_number(
+        &self,
+    ) -> EnrichedClientResult<Option<L1BatchNumber>>;
+
+    async fn fetch_snapshot(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> EnrichedClientResult<Option<SnapshotHeader>>;
 
     async fn fetch_tokens(
         &self,
@@ -153,17 +160,23 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
             .await
     }
 
-    async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
+    async fn fetch_newest_snapshot_l1_batch_number(
+        &self,
+    ) -> EnrichedClientResult<Option<L1BatchNumber>> {
         let snapshots = self
             .get_all_snapshots()
             .rpc_context("get_all_snapshots")
             .await?;
-        let Some(newest_snapshot) = snapshots.snapshots_l1_batch_numbers.first() else {
-            return Ok(None);
-        };
-        self.get_snapshot_by_l1_batch_number(*newest_snapshot)
+        Ok(snapshots.snapshots_l1_batch_numbers.first().copied())
+    }
+
+    async fn fetch_snapshot(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> EnrichedClientResult<Option<SnapshotHeader>> {
+        self.get_snapshot_by_l1_batch_number(l1_batch_number)
             .rpc_context("get_snapshot_by_l1_batch_number")
-            .with_arg("number", newest_snapshot)
+            .with_arg("number", &l1_batch_number)
             .await
     }
 
@@ -223,6 +236,7 @@ pub struct SnapshotApplierTaskStats {
 
 #[derive(Debug)]
 pub struct SnapshotsApplierTask {
+    snapshot_l1_batch: Option<L1BatchNumber>,
     config: SnapshotsApplierConfig,
     health_updater: HealthUpdater,
     connection_pool: ConnectionPool<Core>,
@@ -238,6 +252,7 @@ impl SnapshotsApplierTask {
         blob_store: Arc<dyn ObjectStore>,
     ) -> Self {
         Self {
+            snapshot_l1_batch: None,
             config,
             health_updater: ReactiveHealthCheck::new("snapshot_recovery").1,
             connection_pool,
@@ -246,6 +261,12 @@ impl SnapshotsApplierTask {
         }
     }
 
+    /// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete, but is checked
+    /// if recovery is in progress (so if a node started recovering from another snapshot, it will error).
+    pub fn set_snapshot_l1_batch(&mut self, number: L1BatchNumber) {
+        self.snapshot_l1_batch = Some(number);
+    }
+
     /// Returns the health check for snapshot recovery.
     pub fn health_check(&self) -> ReactiveHealthCheck {
         self.health_updater.subscribe()
     }
@@ -270,6 +291,7 @@ impl SnapshotsApplierTask {
             self.main_node_client.as_ref(),
             &self.blob_store,
             &self.health_updater,
+            self.snapshot_l1_batch,
             self.config.max_concurrency.get(),
         )
         .await;
@@ -324,6 +346,7 @@ impl SnapshotRecoveryStrategy {
     async fn new(
         storage: &mut Connection<'_, Core>,
         main_node_client: &dyn SnapshotsApplierMainNodeClient,
+        snapshot_l1_batch: Option<L1BatchNumber>,
     ) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
         let latency =
             METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
@@ -338,6 +361,18 @@ impl SnapshotRecoveryStrategy {
             return Ok((Self::Completed, applied_snapshot_status));
         }
 
+        // Check whether the snapshot L1 batch number is the expected one. Note that we intentionally skip this check
+        // if snapshot recovery is completed.
+        if let Some(expected_l1_batch) = snapshot_l1_batch {
+            if applied_snapshot_status.l1_batch_number != expected_l1_batch {
+                let err = anyhow::anyhow!(
+                    "snapshot recovery is requested for L1 batch #{expected_l1_batch}, but it is already started for L1 batch #{}",
+                    applied_snapshot_status.l1_batch_number
+                );
+                return Err(SnapshotsApplierError::Fatal(err));
+            }
+        }
+
         let latency = latency.observe();
         tracing::info!("Re-initialized snapshots applier after reset/failure in {latency:?}");
         Ok((Self::Resumed, applied_snapshot_status))
@@ -350,7 +385,8 @@ impl SnapshotRecoveryStrategy {
             return Err(SnapshotsApplierError::Fatal(err));
         }
 
-        let recovery_status = Self::create_fresh_recovery_status(main_node_client).await?;
+        let recovery_status =
+            Self::create_fresh_recovery_status(main_node_client, snapshot_l1_batch).await?;
 
         let storage_logs_count = storage
             .storage_logs_dal()
@@ -373,16 +409,34 @@ impl SnapshotRecoveryStrategy {
 
     async fn create_fresh_recovery_status(
         main_node_client: &dyn SnapshotsApplierMainNodeClient,
+        snapshot_l1_batch: Option<L1BatchNumber>,
     ) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
-        let snapshot_response = main_node_client.fetch_newest_snapshot().await?;
+        let (l1_batch_number, is_latest) = match snapshot_l1_batch {
+            Some(num) => (num, false),
+            None => {
+                let num = main_node_client
+                    .fetch_newest_snapshot_l1_batch_number()
+                    .await?
+                    .context("no snapshots on main node; snapshot recovery is impossible")?;
+                (num, true)
+            }
+        };
+        let snapshot_response = main_node_client.fetch_snapshot(l1_batch_number).await?;
 
-        let snapshot = snapshot_response
-            .context("no snapshots on main node; snapshot recovery is impossible")?;
-        let l1_batch_number = snapshot.l1_batch_number;
+        let snapshot = snapshot_response.with_context(|| {
+            if is_latest {
+                format!(
+                    "latest snapshot for L1 batch #{l1_batch_number} disappeared from main node"
+                )
+            } else {
+                format!("snapshot for L1 batch #{l1_batch_number} is not present on main node")
+            }
+        })?;
         let l2_block_number = snapshot.l2_block_number;
         tracing::info!(
-            "Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
+            "Found {snapshot_kind} snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
             version {version}, storage logs are divided into {chunk_count} chunk(s)",
+            snapshot_kind = if is_latest { "latest" } else { "requested" },
             version = snapshot.version,
             chunk_count = snapshot.storage_logs_chunks.len()
         );
@@ -461,6 +515,7 @@ impl<'a> SnapshotsApplier<'a> {
         main_node_client: &'a dyn SnapshotsApplierMainNodeClient,
         blob_store: &'a dyn ObjectStore,
         health_updater: &'a HealthUpdater,
+        snapshot_l1_batch: Option<L1BatchNumber>,
         max_concurrency: usize,
     ) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
         // While the recovery is in progress, the node is healthy (no error has occurred),
@@ -472,8 +527,12 @@ impl<'a> SnapshotsApplier<'a> {
             .await?;
         let mut storage_transaction = storage.start_transaction().await?;
 
-        let (strategy, applied_snapshot_status) =
-            SnapshotRecoveryStrategy::new(&mut storage_transaction, main_node_client).await?;
+        let (strategy, applied_snapshot_status) = SnapshotRecoveryStrategy::new(
+            &mut storage_transaction,
+            main_node_client,
+            snapshot_l1_batch,
+        )
+        .await?;
         tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}");
         let created_from_scratch = match strategy {
             SnapshotRecoveryStrategy::Completed => return Ok((strategy, applied_snapshot_status)),
diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs
index 59a95792c1ca..2bbf65a580ec 100644
--- a/core/lib/snapshots_applier/src/tests/mod.rs
+++ b/core/lib/snapshots_applier/src/tests/mod.rs
@@ -165,7 +165,17 @@ async fn health_status_immediately_after_task_start() {
             future::pending().await
         }
 
-        async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
+        async fn fetch_newest_snapshot_l1_batch_number(
+            &self,
+        ) -> EnrichedClientResult<Option<L1BatchNumber>> {
+            self.0.wait().await;
+            future::pending().await
+        }
+
+        async fn fetch_snapshot(
+            &self,
+            _l1_batch_number: L1BatchNumber,
+        ) -> EnrichedClientResult<Option<SnapshotHeader>> {
             self.0.wait().await;
             future::pending().await
         }
diff --git a/core/lib/snapshots_applier/src/tests/utils.rs b/core/lib/snapshots_applier/src/tests/utils.rs
index 4629d8c0a2fb..c8c00001e576 100644
--- a/core/lib/snapshots_applier/src/tests/utils.rs
+++ b/core/lib/snapshots_applier/src/tests/utils.rs
@@ -45,8 +45,23 @@ impl SnapshotsApplierMainNodeClient for MockMainNodeClient {
         Ok(self.fetch_l2_block_responses.get(&number).cloned())
     }
 
-    async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
-        Ok(self.fetch_newest_snapshot_response.clone())
+    async fn fetch_newest_snapshot_l1_batch_number(
+        &self,
+    ) -> EnrichedClientResult<Option<L1BatchNumber>> {
+        Ok(self
+            .fetch_newest_snapshot_response
+            .as_ref()
+            .map(|response| response.l1_batch_number))
+    }
+
+    async fn fetch_snapshot(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> EnrichedClientResult<Option<SnapshotHeader>> {
+        Ok(self
+            .fetch_newest_snapshot_response
+            .clone()
+            .filter(|response| response.l1_batch_number == l1_batch_number))
+    }
 
     async fn fetch_tokens(
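Note on patch 1: the old `fetch_newest_snapshot` call is split into two narrower client methods, so "use the newest snapshot" becomes a special case of "use this specific snapshot". A minimal sketch of how the two methods compose back into the old behavior (the free-standing helper below is hypothetical, not part of the patch, and assumes the crate's existing imports):

    // Hypothetical helper, not part of the patch: reproduces the old
    // `fetch_newest_snapshot` behavior on top of the split client API.
    async fn fetch_newest_snapshot(
        client: &dyn SnapshotsApplierMainNodeClient,
    ) -> EnrichedClientResult<Option<SnapshotHeader>> {
        // Resolve the newest L1 batch that has a snapshot, if any...
        let Some(l1_batch_number) = client.fetch_newest_snapshot_l1_batch_number().await? else {
            return Ok(None);
        };
        // ...then fetch its header. The snapshot may disappear between the two calls,
        // which is why `create_fresh_recovery_status` distinguishes its two error messages.
        client.fetch_snapshot(l1_batch_number).await
    }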
From ff356d70ede6922c63866b25808829e78794f2eb Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Mon, 3 Jun 2024 16:32:00 +0300
Subject: [PATCH 2/7] Test specifying snapshot L1 batch

---
 core/lib/snapshots_applier/src/tests/mod.rs | 47 +++++++++++++++++++++
 1 file changed, 47 insertions(+)

diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs
index 2bbf65a580ec..1875bb7fdd13 100644
--- a/core/lib/snapshots_applier/src/tests/mod.rs
+++ b/core/lib/snapshots_applier/src/tests/mod.rs
@@ -142,6 +142,53 @@ async fn snapshots_creator_can_successfully_recover_db(
     assert!(!stats.done_work);
 }
 
+#[tokio::test]
+async fn applier_recovers_explicitly_specified_snapshot() {
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let expected_status = mock_recovery_status();
+    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
+    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
+
+    let mut task = SnapshotsApplierTask::new(
+        SnapshotsApplierConfig::for_tests(),
+        pool.clone(),
+        Box::new(client),
+        object_store,
+    );
+    task.set_snapshot_l1_batch(expected_status.l1_batch_number);
+    let stats = task.run().await.unwrap();
+    assert!(stats.done_work);
+
+    let mut storage = pool.connection().await.unwrap();
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert_eq!(all_storage_logs.len(), storage_logs.len());
+}
+
+#[tokio::test]
+async fn applier_error_for_missing_explicitly_specified_snapshot() {
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let expected_status = mock_recovery_status();
+    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
+    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
+
+    let mut task = SnapshotsApplierTask::new(
+        SnapshotsApplierConfig::for_tests(),
+        pool,
+        Box::new(client),
+        object_store,
+    );
+    task.set_snapshot_l1_batch(expected_status.l1_batch_number + 1);
+
+    let err = task.run().await.unwrap_err();
+    assert!(
+        format!("{err:#}").contains("not present on main node"),
+        "{err:#}"
+    );
+}
+
 #[tokio::test]
 async fn health_status_immediately_after_task_start() {
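As the tests in patch 2 show, pinning recovery to a snapshot is a single extra call for library users. A condensed usage sketch (assuming `config`, `pool`, `client`, and `object_store` are set up as in the tests above; the batch number is illustrative):

    // Inside an async fn returning anyhow::Result<()>.
    let mut task = SnapshotsApplierTask::new(config, pool, Box::new(client), object_store);
    // Pin the snapshot; omitting this call keeps the default "newest snapshot" behavior.
    task.set_snapshot_l1_batch(L1BatchNumber(123));
    let stats = task.run().await?;
    assert!(stats.done_work);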
From 83b625e453ceed6c0152ac4f2bdc95362f188df0 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Mon, 3 Jun 2024 17:44:08 +0300
Subject: [PATCH 3/7] Test snapshot applier recovery

---
 core/lib/snapshots_applier/src/lib.rs         |   2 +-
 core/lib/snapshots_applier/src/tests/mod.rs   | 111 ++++++++++++++++++
 core/lib/snapshots_applier/src/tests/utils.rs |  80 +++++++++++--
 3 files changed, 181 insertions(+), 12 deletions(-)

diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs
index 6334e59f2507..ae9489e6dd41 100644
--- a/core/lib/snapshots_applier/src/lib.rs
+++ b/core/lib/snapshots_applier/src/lib.rs
@@ -192,7 +192,7 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
 }
 
 /// Snapshot applier configuration options.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SnapshotsApplierConfig {
     /// Number of retries for transient errors before giving up on recovery (i.e., returning an error
     /// from [`Self::run()`]).
diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs
index 1875bb7fdd13..1e8635566a76 100644
--- a/core/lib/snapshots_applier/src/tests/mod.rs
+++ b/core/lib/snapshots_applier/src/tests/mod.rs
@@ -21,6 +21,7 @@ use self::utils::{
     random_storage_logs, MockMainNodeClient, ObjectStoreWithErrors,
 };
 use super::*;
+use crate::tests::utils::HangingObjectStore;
 
 mod utils;
 
@@ -189,6 +190,116 @@ async fn applier_error_for_missing_explicitly_specified_snapshot() {
     );
 }
 
+#[tokio::test]
+async fn snapshot_applier_recovers_after_stopping() {
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let mut expected_status = mock_recovery_status();
+    expected_status.storage_logs_chunks_processed = vec![true; 10];
+    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
+    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
+    let (stopping_object_store, mut stop_receiver) =
+        HangingObjectStore::new(object_store.clone(), 1);
+
+    let mut config = SnapshotsApplierConfig::for_tests();
+    config.max_concurrency = NonZeroUsize::new(1).unwrap();
+    let task = SnapshotsApplierTask::new(
+        config.clone(),
+        pool.clone(),
+        Box::new(client.clone()),
+        Arc::new(stopping_object_store),
+    );
+    let task_handle = tokio::spawn(task.run());
+
+    // Wait until the first storage logs chunk is requested (the object store hangs at this point)
+    stop_receiver.wait_for(|&count| count > 1).await.unwrap();
+    assert!(!task_handle.is_finished());
+    task_handle.abort();
+
+    // Check that factory deps have been persisted, but no storage logs.
+    let mut storage = pool.connection().await.unwrap();
+    let all_factory_deps = storage
+        .factory_deps_dal()
+        .dump_all_factory_deps_for_tests()
+        .await;
+    assert!(!all_factory_deps.is_empty());
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert!(all_storage_logs.is_empty(), "{all_storage_logs:?}");
+
+    // Recover 3 storage log chunks and stop again
+    let (stopping_object_store, mut stop_receiver) =
+        HangingObjectStore::new(object_store.clone(), 3);
+
+    let task = SnapshotsApplierTask::new(
+        config.clone(),
+        pool.clone(),
+        Box::new(client.clone()),
+        Arc::new(stopping_object_store),
+    );
+    let task_handle = tokio::spawn(task.run());
+
+    stop_receiver.wait_for(|&count| count > 3).await.unwrap();
+    assert!(!task_handle.is_finished());
+    task_handle.abort();
+
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert!(all_storage_logs.len() < storage_logs.len());
+
+    // Recover the remaining 7 (10 - 3) storage log chunks.
+    let (stopping_object_store, _) = HangingObjectStore::new(object_store.clone(), 7);
+    let mut task = SnapshotsApplierTask::new(
+        config,
+        pool.clone(),
+        Box::new(client),
+        Arc::new(stopping_object_store),
+    );
+    task.set_snapshot_l1_batch(expected_status.l1_batch_number); // check that this works fine
+    task.run().await.unwrap();
+
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert_eq!(all_storage_logs.len(), storage_logs.len());
+}
+
+#[tokio::test]
+async fn applier_errors_if_snapshot_l1_batch_differs_from_previous_one() {
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let expected_status = mock_recovery_status();
+    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
+    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
+    let (stopping_object_store, mut stop_receiver) =
+        HangingObjectStore::new(object_store.clone(), 1);
+
+    let mut config = SnapshotsApplierConfig::for_tests();
+    config.max_concurrency = NonZeroUsize::new(1).unwrap();
+    let task = SnapshotsApplierTask::new(
+        config.clone(),
+        pool.clone(),
+        Box::new(client.clone()),
+        Arc::new(stopping_object_store),
+    );
+    let task_handle = tokio::spawn(task.run());
+
+    stop_receiver.wait_for(|&count| count > 1).await.unwrap();
+    assert!(!task_handle.is_finished());
+    task_handle.abort();
+
+    let mut task = SnapshotsApplierTask::new(config, pool, Box::new(client), object_store);
+    task.set_snapshot_l1_batch(expected_status.l1_batch_number + 1);
+    let err = task.run().await.unwrap_err();
+    assert!(
+        format!("{err:#}").contains("is already started for L1 batch"),
+        "{err:#}"
+    );
+}
+
 #[tokio::test]
 async fn health_status_immediately_after_task_start() {
     #[derive(Debug, Clone)]
diff --git a/core/lib/snapshots_applier/src/tests/utils.rs b/core/lib/snapshots_applier/src/tests/utils.rs
index c8c00001e576..c853481ab53b 100644
--- a/core/lib/snapshots_applier/src/tests/utils.rs
+++ b/core/lib/snapshots_applier/src/tests/utils.rs
@@ -1,8 +1,9 @@
 //! Test utils.
 
-use std::{collections::HashMap, fmt, sync::Arc};
+use std::{collections::HashMap, fmt, future, sync::Arc};
 
 use async_trait::async_trait;
+use tokio::sync::watch;
 use zksync_object_store::{Bucket, ObjectStore, ObjectStoreError, ObjectStoreFactory};
 use zksync_types::{
     api,
@@ -238,16 +239,12 @@ pub(super) fn mock_snapshot_header(status: &SnapshotRecoveryStatus) -> SnapshotH
         version: SnapshotVersion::Version0.into(),
         l1_batch_number: status.l1_batch_number,
         l2_block_number: status.l2_block_number,
-        storage_logs_chunks: vec![
-            SnapshotStorageLogsChunkMetadata {
-                chunk_id: 0,
-                filepath: "file0".to_string(),
-            },
-            SnapshotStorageLogsChunkMetadata {
-                chunk_id: 1,
-                filepath: "file1".to_string(),
-            },
-        ],
+        storage_logs_chunks: (0..status.storage_logs_chunks_processed.len() as u64)
+            .map(|chunk_id| SnapshotStorageLogsChunkMetadata {
+                chunk_id,
+                filepath: format!("file{chunk_id}"),
+            })
+            .collect(),
         factory_deps_filepath: "some_filepath".to_string(),
     }
 }
@@ -304,3 +301,64 @@ pub(super) async fn prepare_clients(
     );
     (object_store, client)
 }
+
+/// Object store wrapper that hangs after serving the specified number of requests.
+/// Used to emulate stopping and restarting the snapshot applier: with concurrency 1, the applier
+/// requests an object from the store only after fully processing all previously requested objects.
+#[derive(Debug)]
+pub(super) struct HangingObjectStore {
+    inner: Arc<dyn ObjectStore>,
+    stop_after_count: usize,
+    count_sender: watch::Sender<usize>,
+}
+
+impl HangingObjectStore {
+    pub fn new(
+        inner: Arc<dyn ObjectStore>,
+        stop_after_count: usize,
+    ) -> (Self, watch::Receiver<usize>) {
+        let (count_sender, count_receiver) = watch::channel(0);
+        let this = Self {
+            inner,
+            stop_after_count,
+            count_sender,
+        };
+        (this, count_receiver)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for HangingObjectStore {
+    async fn get_raw(&self, bucket: Bucket, key: &str) -> Result<Vec<u8>, ObjectStoreError> {
+        let mut should_proceed = true;
+        self.count_sender.send_modify(|count| {
+            *count += 1;
+            if *count > self.stop_after_count {
+                should_proceed = false;
+            }
+        });
+
+        if should_proceed {
+            self.inner.get_raw(bucket, key).await
+        } else {
+            future::pending().await // Hang the snapshot applier task
+        }
+    }
+
+    async fn put_raw(
+        &self,
+        _bucket: Bucket,
+        _key: &str,
+        _value: Vec<u8>,
+    ) -> Result<(), ObjectStoreError> {
+        unreachable!("Should not be used in snapshot applier")
+    }
+
+    async fn remove_raw(&self, _bucket: Bucket, _key: &str) -> Result<(), ObjectStoreError> {
+        unreachable!("Should not be used in snapshot applier")
+    }
+
+    fn storage_prefix_raw(&self, bucket: Bucket) -> String {
+        self.inner.storage_prefix_raw(bucket)
+    }
+}
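The `HangingObjectStore` introduced in patch 3 coordinates with tests via a Tokio watch channel: the wrapper bumps a request counter with `send_modify`, and the test blocks on `wait_for` until the counter crosses a threshold. A self-contained sketch of that pattern (names are illustrative):

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (count_sender, mut count_receiver) = watch::channel(0_usize);
        tokio::spawn(async move {
            // Producer side (the object store wrapper): bump the counter
            // atomically and notify all receivers.
            count_sender.send_modify(|count| *count += 1);
        });
        // Consumer side (the test): suspend until the counter crosses the threshold.
        count_receiver.wait_for(|&count| count >= 1).await.unwrap();
    }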
From c338e352b4192f4ee8729101d2b4d93558770254 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 4 Jun 2024 09:58:52 +0300
Subject: [PATCH 4/7] Allow configuring snapshot L1 batch on EN

---
 core/bin/external_node/src/config/mod.rs | 27 ++++++++-------------
 core/bin/external_node/src/init.rs       | 30 +++++++++++++++++-------
 core/bin/external_node/src/main.rs       | 11 +++++++--
 3 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs
index 08fd955297ed..3d94e833217a 100644
--- a/core/bin/external_node/src/config/mod.rs
+++ b/core/bin/external_node/src/config/mod.rs
@@ -25,8 +25,8 @@ use zksync_node_api_server::{
 use zksync_protobuf_config::proto;
 use zksync_snapshots_applier::SnapshotsApplierConfig;
 use zksync_types::{
-    api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address, L1ChainId,
-    L2ChainId, ETHEREUM_ADDRESS,
+    api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address,
+    L1BatchNumber, L1ChainId, L2ChainId, ETHEREUM_ADDRESS,
 };
 use zksync_web3_decl::{
     client::{DynClient, L2},
@@ -746,6 +746,8 @@ pub(crate) struct ExperimentalENConfig {
     pub state_keeper_db_max_open_files: Option<NonZeroU32>,
 
     // Snapshot recovery
+    /// L1 batch number of the snapshot to use during recovery. Specifying this parameter is mostly useful for testing.
+    pub snapshots_recovery_l1_batch: Option<L1BatchNumber>,
     /// Approximate chunk size (measured in the number of entries) to recover in a single iteration.
     /// Reasonable values are order of 100,000 (meaning an iteration takes several seconds).
     ///
@@ -775,6 +777,7 @@ impl ExperimentalENConfig {
             state_keeper_db_block_cache_capacity_mb:
                 Self::default_state_keeper_db_block_cache_capacity_mb(),
             state_keeper_db_max_open_files: None,
+            snapshots_recovery_l1_batch: None,
             snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
             commitment_generator_max_parallelism: None,
         }
@@ -807,21 +810,11 @@ pub(crate) fn read_consensus_config() -> anyhow::Result<Option<ConsensusConfig>>
     ))
 }
 
-/// Configuration for snapshot recovery. Loaded optionally, only if snapshot recovery is enabled.
-#[derive(Debug)]
-pub(crate) struct SnapshotsRecoveryConfig {
-    pub snapshots_object_store: ObjectStoreConfig,
-}
-
-impl SnapshotsRecoveryConfig {
-    pub fn new() -> anyhow::Result<Self> {
-        let snapshots_object_store = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
-            .from_env::<ObjectStoreConfig>()
-            .context("failed loading snapshot object store config from env variables")?;
-        Ok(Self {
-            snapshots_object_store,
-        })
-    }
+/// Configuration for snapshot recovery. Should be loaded optionally, only if snapshot recovery is enabled.
+pub(crate) fn snapshot_recovery_object_store_config() -> anyhow::Result<ObjectStoreConfig> {
+    envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
+        .from_env::<ObjectStoreConfig>()
+        .context("failed loading snapshot object store config from env variables")
 }
 
 #[derive(Debug, Deserialize)]
diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs
index 0f4ae9a80362..3ad69783594a 100644
--- a/core/bin/external_node/src/init.rs
+++ b/core/bin/external_node/src/init.rs
@@ -12,7 +12,13 @@
 use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
 use zksync_types::{L1BatchNumber, L2ChainId};
 use zksync_web3_decl::client::{DynClient, L2};
 
-use crate::config::SnapshotsRecoveryConfig;
+use crate::config::snapshot_recovery_object_store_config;
+
+#[derive(Debug)]
+pub(crate) struct SnapshotRecoveryConfig {
+    /// If not specified, the latest snapshot will be used.
+    pub snapshot_l1_batch: Option<L1BatchNumber>,
+}
 
 #[derive(Debug)]
 enum InitDecision {
@@ -27,7 +33,7 @@ pub(crate) async fn ensure_storage_initialized(
     main_node_client: Box<DynClient<L2>>,
     app_health: &AppHealthCheck,
     l2_chain_id: L2ChainId,
-    consider_snapshot_recovery: bool,
+    recovery_config: Option<SnapshotRecoveryConfig>,
 ) -> anyhow::Result<()> {
     let mut storage = pool.connection_tagged("en").await?;
     let genesis_l1_batch = storage
@@ -57,7 +63,7 @@ pub(crate) async fn ensure_storage_initialized(
         }
         (None, None) => {
             tracing::info!("Node has neither genesis L1 batch, nor snapshot recovery info");
-            if consider_snapshot_recovery {
+            if recovery_config.is_some() {
                 InitDecision::SnapshotRecovery
             } else {
                 InitDecision::Genesis
@@ -78,25 +84,31 @@ pub(crate) async fn ensure_storage_initialized(
             .context("performing genesis failed")?;
         }
         InitDecision::SnapshotRecovery => {
-            anyhow::ensure!(
-                consider_snapshot_recovery,
+            let recovery_config = recovery_config.context(
                 "Snapshot recovery is required to proceed, but it is not enabled. Enable by setting \
                  `EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery"
-            );
+            )?;
 
             tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
-            let recovery_config = SnapshotsRecoveryConfig::new()?;
-            let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
+            let object_store_config = snapshot_recovery_object_store_config()?;
+            let blob_store = ObjectStoreFactory::new(object_store_config)
                 .create_store()
                 .await;
 
             let config = SnapshotsApplierConfig::default();
-            let snapshots_applier_task = SnapshotsApplierTask::new(
+            let mut snapshots_applier_task = SnapshotsApplierTask::new(
                 config,
                 pool,
                 Box::new(main_node_client.for_component("snapshot_recovery")),
                 blob_store,
             );
+            if let Some(snapshot_l1_batch) = recovery_config.snapshot_l1_batch {
+                tracing::info!(
+                    "Using a specific snapshot with L1 batch #{snapshot_l1_batch}; this may not work \
+                     if the snapshot is too old or non-existent"
+                );
+                snapshots_applier_task.set_snapshot_l1_batch(snapshot_l1_batch);
+            }
             app_health.insert_component(snapshots_applier_task.health_check())?;
 
             let recovery_started_at = Instant::now();
diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 584356e755bf..6845bca236d7 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -55,7 +55,7 @@ use zksync_web3_decl::{
 use crate::{
     config::ExternalNodeConfig,
     helpers::{MainNodeHealthCheck, ValidateChainIdsTask},
-    init::ensure_storage_initialized,
+    init::{ensure_storage_initialized, SnapshotRecoveryConfig},
     metrics::RUST_METRICS,
 };
@@ -908,12 +908,19 @@ async fn run_node(
     task_handles.extend(prometheus_task);
 
     // Make sure that the node storage is initialized either via genesis or snapshot recovery.
+    let recovery_config =
+        config
+            .optional
+            .snapshots_recovery_enabled
+            .then_some(SnapshotRecoveryConfig {
+                snapshot_l1_batch: config.experimental.snapshots_recovery_l1_batch,
+            });
     ensure_storage_initialized(
         connection_pool.clone(),
         main_node_client.clone(),
         &app_health,
         config.required.l2_chain_id,
-        config.optional.snapshots_recovery_enabled,
+        recovery_config,
     )
     .await?;
     let sigint_receiver = env.setup_sigint_handler();
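Patch 4's `snapshot_recovery_object_store_config` keeps the EN convention of reading config sections from prefixed environment variables via `envy`. A self-contained sketch of the same pattern (the struct, field, and prefix here are illustrative, not the real EN schema):

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct RecoveryEnvConfig {
        // Filled from `EN_SNAPSHOTS_RECOVERY_L1_BATCH` given the prefix below.
        l1_batch: Option<u32>,
    }

    fn main() -> Result<(), envy::Error> {
        // `envy::prefixed` strips the prefix and matches the rest of the
        // variable name against the struct's field names.
        let config = envy::prefixed("EN_SNAPSHOTS_RECOVERY_").from_env::<RecoveryEnvConfig>()?;
        println!("{config:?}");
        Ok(())
    }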
From 5d42ca3ebcde58c7b23daea65566021a53badb80 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 4 Jun 2024 16:51:40 +0300
Subject: [PATCH 5/7] Remove L1 batch check on recovery reinit

---
 core/lib/snapshots_applier/src/lib.rs       | 15 +---------
 core/lib/snapshots_applier/src/tests/mod.rs | 32 ---------------------
 2 files changed, 1 insertion(+), 46 deletions(-)

diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs
index ae9489e6dd41..2ad4a57b9c30 100644
--- a/core/lib/snapshots_applier/src/lib.rs
+++ b/core/lib/snapshots_applier/src/lib.rs
@@ -261,8 +261,7 @@ impl SnapshotsApplierTask {
         }
     }
 
-    /// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete, but is checked
-    /// if recovery is in progress (so if a node started recovering from another snapshot, it will error).
+    /// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete or resumed.
     pub fn set_snapshot_l1_batch(&mut self, number: L1BatchNumber) {
         self.snapshot_l1_batch = Some(number);
     }
@@ -361,18 +360,6 @@ impl SnapshotRecoveryStrategy {
             return Ok((Self::Completed, applied_snapshot_status));
         }
 
-        // Check whether the snapshot L1 batch number is the expected one. Note that we intentionally skip this check
-        // if snapshot recovery is completed.
-        if let Some(expected_l1_batch) = snapshot_l1_batch {
-            if applied_snapshot_status.l1_batch_number != expected_l1_batch {
-                let err = anyhow::anyhow!(
-                    "snapshot recovery is requested for L1 batch #{expected_l1_batch}, but it is already started for L1 batch #{}",
-                    applied_snapshot_status.l1_batch_number
-                );
-                return Err(SnapshotsApplierError::Fatal(err));
-            }
-        }
-
         let latency = latency.observe();
         tracing::info!("Re-initialized snapshots applier after reset/failure in {latency:?}");
         Ok((Self::Resumed, applied_snapshot_status))
diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs
index 1e8635566a76..e61f76455372 100644
--- a/core/lib/snapshots_applier/src/tests/mod.rs
+++ b/core/lib/snapshots_applier/src/tests/mod.rs
@@ -268,38 +268,6 @@ async fn snapshot_applier_recovers_after_stopping() {
     assert_eq!(all_storage_logs.len(), storage_logs.len());
 }
 
-#[tokio::test]
-async fn applier_errors_if_snapshot_l1_batch_differs_from_previous_one() {
-    let pool = ConnectionPool::<Core>::test_pool().await;
-    let expected_status = mock_recovery_status();
-    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
-    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
-    let (stopping_object_store, mut stop_receiver) =
-        HangingObjectStore::new(object_store.clone(), 1);
-
-    let mut config = SnapshotsApplierConfig::for_tests();
-    config.max_concurrency = NonZeroUsize::new(1).unwrap();
-    let task = SnapshotsApplierTask::new(
-        config.clone(),
-        pool.clone(),
-        Box::new(client.clone()),
-        Arc::new(stopping_object_store),
-    );
-    let task_handle = tokio::spawn(task.run());
-
-    stop_receiver.wait_for(|&count| count > 1).await.unwrap();
-    assert!(!task_handle.is_finished());
-    task_handle.abort();
-
-    let mut task = SnapshotsApplierTask::new(config, pool, Box::new(client), object_store);
-    task.set_snapshot_l1_batch(expected_status.l1_batch_number + 1);
-    let err = task.run().await.unwrap_err();
-    assert!(
-        format!("{err:#}").contains("is already started for L1 batch"),
-        "{err:#}"
-    );
-}
-
 #[tokio::test]
 async fn health_status_immediately_after_task_start() {
     #[derive(Debug, Clone)]
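After patch 5 the override only matters on a fresh start, which simplifies the caller-visible contract. A sketch of the resulting behavior (assumed from the updated doc comment; the batch number is illustrative):

    // Inside an async fn returning anyhow::Result<()>.
    let mut task = SnapshotsApplierTask::new(config, pool, Box::new(client), object_store);
    task.set_snapshot_l1_batch(L1BatchNumber(42));
    // - Fresh node: recovery starts from the snapshot for L1 batch #42
    //   (and errors if no such snapshot exists on the main node).
    // - Recovery previously interrupted: the override is ignored and the
    //   originally chosen snapshot is resumed.
    // - Recovery already complete: the override is ignored entirely.
    let _stats = task.run().await?;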
From 658642f35cdb633b974e905527c459e4a3fba634 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 4 Jun 2024 16:55:09 +0300
Subject: [PATCH 6/7] Fix minor nits

---
 core/bin/external_node/src/init.rs | 6 +++---
 core/bin/external_node/src/main.rs | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs
index 3ad69783594a..fb30628e3890 100644
--- a/core/bin/external_node/src/init.rs
+++ b/core/bin/external_node/src/init.rs
@@ -17,7 +17,7 @@ use crate::config::snapshot_recovery_object_store_config;
 #[derive(Debug)]
 pub(crate) struct SnapshotRecoveryConfig {
     /// If not specified, the latest snapshot will be used.
-    pub snapshot_l1_batch: Option<L1BatchNumber>,
+    pub snapshot_l1_batch_override: Option<L1BatchNumber>,
 }
 
 #[derive(Debug)]
@@ -102,10 +102,10 @@ pub(crate) async fn ensure_storage_initialized(
                 Box::new(main_node_client.for_component("snapshot_recovery")),
                 blob_store,
             );
-            if let Some(snapshot_l1_batch) = recovery_config.snapshot_l1_batch {
+            if let Some(snapshot_l1_batch) = recovery_config.snapshot_l1_batch_override {
                 tracing::info!(
                     "Using a specific snapshot with L1 batch #{snapshot_l1_batch}; this may not work \
-                     if the snapshot is too old or non-existent"
+                     if the snapshot is too old (on the order of several weeks) or non-existent"
                 );
                 snapshots_applier_task.set_snapshot_l1_batch(snapshot_l1_batch);
             }
diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 6845bca236d7..05f4b2ba9d43 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -913,7 +913,7 @@ async fn run_node(
             .optional
             .snapshots_recovery_enabled
             .then_some(SnapshotRecoveryConfig {
-                snapshot_l1_batch: config.experimental.snapshots_recovery_l1_batch,
+                snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch,
             });
     ensure_storage_initialized(
         connection_pool.clone(),
From 5fea9f7850489a5c69979253da429d0180e1d72e Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 4 Jun 2024 18:12:22 +0300
Subject: [PATCH 7/7] Simplify error handling

---
 core/lib/snapshots_applier/src/lib.rs | 26 ++++++++------------------
 1 file changed, 8 insertions(+), 18 deletions(-)

diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs
index 2ad4a57b9c30..b0024f78433f 100644
--- a/core/lib/snapshots_applier/src/lib.rs
+++ b/core/lib/snapshots_applier/src/lib.rs
@@ -398,32 +398,22 @@ impl SnapshotRecoveryStrategy {
         main_node_client: &dyn SnapshotsApplierMainNodeClient,
         snapshot_l1_batch: Option<L1BatchNumber>,
     ) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
-        let (l1_batch_number, is_latest) = match snapshot_l1_batch {
-            Some(num) => (num, false),
-            None => {
-                let num = main_node_client
-                    .fetch_newest_snapshot_l1_batch_number()
-                    .await?
-                    .context("no snapshots on main node; snapshot recovery is impossible")?;
-                (num, true)
-            }
+        let l1_batch_number = match snapshot_l1_batch {
+            Some(num) => num,
+            None => main_node_client
+                .fetch_newest_snapshot_l1_batch_number()
+                .await?
+                .context("no snapshots on main node; snapshot recovery is impossible")?,
         };
         let snapshot_response = main_node_client.fetch_snapshot(l1_batch_number).await?;
 
         let snapshot = snapshot_response.with_context(|| {
-            if is_latest {
-                format!(
-                    "latest snapshot for L1 batch #{l1_batch_number} disappeared from main node"
-                )
-            } else {
-                format!("snapshot for L1 batch #{l1_batch_number} is not present on main node")
-            }
+            format!("snapshot for L1 batch #{l1_batch_number} is not present on main node")
         })?;
         let l2_block_number = snapshot.l2_block_number;
         tracing::info!(
-            "Found {snapshot_kind} snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
+            "Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
             version {version}, storage logs are divided into {chunk_count} chunk(s)",
-            snapshot_kind = if is_latest { "latest" } else { "requested" },
             version = snapshot.version,
             chunk_count = snapshot.storage_logs_chunks.len()
         );
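Patch 7's `with_context` usage is the standard `anyhow` idiom for turning an `Option` into an error with a lazily built message. A stand-alone sketch of the idiom (illustrative values, not code from the patch):

    use anyhow::Context as _;

    fn main() -> anyhow::Result<()> {
        let l1_batch_number = 42;
        let snapshot_response: Option<&str> = Some("snapshot header");
        // The closure runs only on the `None` path, so the formatted message
        // costs nothing when the snapshot is present.
        let snapshot = snapshot_response.with_context(|| {
            format!("snapshot for L1 batch #{l1_batch_number} is not present on main node")
        })?;
        println!("{snapshot}");
        Ok(())
    }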