From 610a7cf037c6c655564deffebbf5a3fe5533783b Mon Sep 17 00:00:00 2001
From: Igor Aleksanov
Date: Mon, 1 Jul 2024 08:11:37 +0400
Subject: [PATCH] feat(snapshots_applier): Add a method to check whether
 snapshot recovery is done (#2338)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Adds `SnapshotsApplierTask::is_recovery_completed()` (together with the
`RecoveryCompletionStatus` enum), which reports whether snapshot recovery
has not been started, is still in progress, or is completed.

## Why ❔

Such a method is useful, for example, to understand whether it's safe to
start the API server in distributed mode.
It will be part of the EN check for storage initialization.

## Checklist

- [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
---
 core/lib/snapshots_applier/src/lib.rs         | 54 ++++++++++++++-
 core/lib/snapshots_applier/src/tests/mod.rs   | 68 ++++++++++++++++++-
 core/lib/snapshots_applier/src/tests/utils.rs | 25 ++++++-
 3 files changed, 143 insertions(+), 4 deletions(-)

diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs
index e160a2b96275..0ee4b2a901f6 100644
--- a/core/lib/snapshots_applier/src/lib.rs
+++ b/core/lib/snapshots_applier/src/lib.rs
@@ -1,6 +1,8 @@
 //! Logic for applying application-level snapshots to Postgres storage.
 
-use std::{collections::HashMap, fmt, mem, num::NonZeroUsize, sync::Arc, time::Duration};
+use std::{
+    cmp::Ordering, collections::HashMap, fmt, mem, num::NonZeroUsize, sync::Arc, time::Duration,
+};
 
 use anyhow::Context as _;
 use async_trait::async_trait;
@@ -191,6 +193,17 @@ impl SnapshotsApplierMainNodeClient for Box<dyn SnapshotsApplierMainNodeClient> {
     }
 }
 
+/// Reported status of the snapshot recovery progress.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum RecoveryCompletionStatus {
+    /// There is no information about snapshot recovery in the database.
+    NoRecoveryDetected,
+    /// Snapshot recovery is not finished yet.
+    InProgress,
+    /// Snapshot recovery is completed.
+    Completed,
+}
+
 /// Snapshot applier configuration options.
 #[derive(Debug, Clone)]
 pub struct SnapshotsApplierConfig {
@@ -263,6 +276,45 @@ impl SnapshotsApplierTask {
         }
     }
 
+    /// Checks whether the snapshot recovery is already completed.
+    ///
+    /// Returns [`RecoveryCompletionStatus::NoRecoveryDetected`] if no snapshot recovery information is detected in the DB,
+    /// [`RecoveryCompletionStatus::InProgress`] if recovery has started but is not finished yet,
+    /// and [`RecoveryCompletionStatus::Completed`] if recovery is completed.
+    pub async fn is_recovery_completed(
+        conn: &mut Connection<'_, Core>,
+        client: &dyn SnapshotsApplierMainNodeClient,
+    ) -> anyhow::Result<RecoveryCompletionStatus> {
+        let Some(applied_snapshot_status) = conn
+            .snapshot_recovery_dal()
+            .get_applied_snapshot_status()
+            .await?
+        else {
+            return Ok(RecoveryCompletionStatus::NoRecoveryDetected);
+        };
+        // If there are unprocessed storage logs chunks, the recovery is not complete.
+        if applied_snapshot_status.storage_logs_chunks_left_to_process() != 0 {
+            return Ok(RecoveryCompletionStatus::InProgress);
+        }
+        // Currently, migrating tokens is the last step of the recovery.
+        // The number of tokens is not a part of the snapshot header, so we have to re-query the main node.
+        let added_tokens = conn
+            .tokens_web3_dal()
+            .get_all_tokens(Some(applied_snapshot_status.l2_block_number))
+            .await?
+            .len();
+        let tokens_on_main_node = client
+            .fetch_tokens(applied_snapshot_status.l2_block_number)
+            .await?
+            .len();
+
+        match added_tokens.cmp(&tokens_on_main_node) {
+            Ordering::Less => Ok(RecoveryCompletionStatus::InProgress),
+            Ordering::Equal => Ok(RecoveryCompletionStatus::Completed),
+            Ordering::Greater => anyhow::bail!("DB contains more tokens than the main node"),
+        }
+    }
+
     /// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete or resumed.
     pub fn set_snapshot_l1_batch(&mut self, number: L1BatchNumber) {
         self.snapshot_l1_batch = Some(number);
diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs
index 2f78bdc274d6..51578b5090d8 100644
--- a/core/lib/snapshots_applier/src/tests/mod.rs
+++ b/core/lib/snapshots_applier/src/tests/mod.rs
@@ -25,6 +25,16 @@ use crate::tests::utils::HangingObjectStore;
 
 mod utils;
 
+async fn is_recovery_completed(
+    pool: &ConnectionPool<Core>,
+    client: &MockMainNodeClient,
+) -> RecoveryCompletionStatus {
+    let mut connection = pool.connection().await.unwrap();
+    SnapshotsApplierTask::is_recovery_completed(&mut connection, client)
+        .await
+        .unwrap()
+}
+
 #[test_casing(3, [(None, false), (Some(2), false), (None, true)])]
 #[tokio::test]
 async fn snapshots_creator_can_successfully_recover_db(
@@ -36,6 +46,7 @@ async fn snapshots_creator_can_successfully_recover_db(
     } else {
         ConnectionPool::<Core>::test_pool().await
     };
+
     let expected_status = mock_recovery_status();
     let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
     let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
@@ -60,6 +71,12 @@ async fn snapshots_creator_can_successfully_recover_db(
         object_store
     };
 
+    assert_eq!(
+        is_recovery_completed(&pool, &client).await,
+        RecoveryCompletionStatus::NoRecoveryDetected,
+        "No snapshot information in the DB"
+    );
+
     let task = SnapshotsApplierTask::new(
         SnapshotsApplierConfig::for_tests(),
         pool.clone(),
@@ -74,6 +91,12 @@ async fn snapshots_creator_can_successfully_recover_db(
         HealthStatus::Ready
     );
 
+    assert_eq!(
+        is_recovery_completed(&pool, &client).await,
+        RecoveryCompletionStatus::Completed,
+        "Recovery has been completed"
+    );
+
     let mut storage = pool.connection().await.unwrap();
     let mut recovery_dal = storage.snapshot_recovery_dal();
 
@@ -261,6 +284,12 @@ async fn snapshot_applier_recovers_after_stopping() {
     assert!(!task_handle.is_finished());
     task_handle.abort();
 
+    assert_eq!(
+        is_recovery_completed(&pool, &client).await,
+        RecoveryCompletionStatus::InProgress,
+        "Recovery has been aborted"
+    );
+
     // Check that factory deps have been persisted, but no storage logs.
     let mut storage = pool.connection().await.unwrap();
     let all_factory_deps = storage
@@ -290,6 +319,12 @@ async fn snapshot_applier_recovers_after_stopping() {
     assert!(!task_handle.is_finished());
     task_handle.abort();
 
+    assert_eq!(
+        is_recovery_completed(&pool, &client).await,
+        RecoveryCompletionStatus::InProgress,
+        "Not all logs have been recovered"
+    );
+
     let all_storage_logs = storage
         .storage_logs_dal()
         .dump_all_storage_logs_for_tests()
@@ -301,12 +336,18 @@ async fn snapshot_applier_recovers_after_stopping() {
     let mut task = SnapshotsApplierTask::new(
         config,
         pool.clone(),
-        Box::new(client),
+        Box::new(client.clone()),
         Arc::new(stopping_object_store),
     );
     task.set_snapshot_l1_batch(expected_status.l1_batch_number); // check that this works fine
     task.run().await.unwrap();
 
+    assert_eq!(
+        is_recovery_completed(&pool, &client).await,
+        RecoveryCompletionStatus::Completed,
+        "Recovery has been completed"
+    );
+
     let all_storage_logs = storage
         .storage_logs_dal()
         .dump_all_storage_logs_for_tests()
@@ -535,6 +576,25 @@ async fn recovering_tokens() {
 
     client.tokens_response.clone_from(&tokens);
 
+    // Make sure that the task will fail once it starts migrating tokens.
+    client.set_token_response_error(EnrichedClientError::custom("Error", "not_important"));
+
+    let task = SnapshotsApplierTask::new(
+        SnapshotsApplierConfig::for_tests(),
+        pool.clone(),
+        Box::new(client.clone()),
+        object_store.clone(),
+    );
+    let task_result = task.run().await;
+    assert!(task_result.is_err());
+
+    assert_eq!(
+        is_recovery_completed(&pool, &client).await,
+        RecoveryCompletionStatus::InProgress,
+        "Tokens are not migrated"
+    );
+
+    // Now perform the recovery again; tokens should be migrated.
     let task = SnapshotsApplierTask::new(
         SnapshotsApplierConfig::for_tests(),
         pool.clone(),
@@ -543,6 +603,12 @@ async fn recovering_tokens() {
     );
     task.run().await.unwrap();
 
+    assert_eq!(
+        is_recovery_completed(&pool, &client).await,
+        RecoveryCompletionStatus::Completed,
+        "Recovery is completed"
+    );
+
     // Check that tokens are successfully restored.
     let mut storage = pool.connection().await.unwrap();
     let recovered_tokens = storage
diff --git a/core/lib/snapshots_applier/src/tests/utils.rs b/core/lib/snapshots_applier/src/tests/utils.rs
index 3374e62452d8..c546fb60c09b 100644
--- a/core/lib/snapshots_applier/src/tests/utils.rs
+++ b/core/lib/snapshots_applier/src/tests/utils.rs
@@ -1,6 +1,10 @@
 //! Test utils.
 
-use std::{collections::HashMap, fmt, future, sync::Arc};
+use std::{
+    collections::HashMap,
+    fmt, future,
+    sync::{Arc, RwLock},
+};
 
 use async_trait::async_trait;
 use tokio::sync::watch;
@@ -18,7 +22,7 @@ use zksync_types::{
     AccountTreeId, Address, L1BatchNumber, L2BlockNumber, ProtocolVersionId, StorageKey,
     StorageValue, H256,
 };
-use zksync_web3_decl::error::EnrichedClientResult;
+use zksync_web3_decl::error::{EnrichedClientError, EnrichedClientResult};
 
 use crate::SnapshotsApplierMainNodeClient;
 
@@ -50,6 +54,19 @@ pub(super) struct MockMainNodeClient {
     pub fetch_l2_block_responses: HashMap,
     pub fetch_newest_snapshot_response: Option<SnapshotHeader>,
     pub tokens_response: Vec<TokenInfo>,
+    pub tokens_response_error: Arc<RwLock<Option<EnrichedClientError>>>,
+}
+
+impl MockMainNodeClient {
+    /// Sets the error to be returned by the `fetch_tokens` method.
+    /// The error will be returned just once; the next request will succeed.
+    pub(super) fn set_token_response_error(&self, error: EnrichedClientError) {
+        *self.tokens_response_error.write().unwrap() = Some(error);
+    }
+
+    fn take_token_response_error(&self) -> Option<EnrichedClientError> {
+        self.tokens_response_error.write().unwrap().take()
+    }
 }
 
 #[async_trait]
@@ -91,6 +108,10 @@ impl SnapshotsApplierMainNodeClient for MockMainNodeClient {
         &self,
         _at_l2_block: L2BlockNumber,
     ) -> EnrichedClientResult<Vec<TokenInfo>> {
+        if let Some(error) = self.take_token_response_error() {
+            return Err(error);
+        }
+
         Ok(self.tokens_response.clone())
     }
 }
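
As a usage illustration (not part of the patch itself): a minimal sketch of how the new check could be wired into the EN storage-initialization logic mentioned in the description. The `can_start_distributed_api` helper, its mapping of each status to a boolean, and the exact import paths are assumptions made for this example only.

```rust
use zksync_dal::{ConnectionPool, Core};
use zksync_snapshots_applier::{
    RecoveryCompletionStatus, SnapshotsApplierMainNodeClient, SnapshotsApplierTask,
};

/// Hypothetical EN-side helper: decides whether it is safe to start the API server
/// in distributed mode, based on the recovery status reported by the applier.
async fn can_start_distributed_api(
    pool: &ConnectionPool<Core>,
    main_node_client: &dyn SnapshotsApplierMainNodeClient,
) -> anyhow::Result<bool> {
    let mut conn = pool.connection().await?;
    let status = SnapshotsApplierTask::is_recovery_completed(&mut conn, main_node_client).await?;
    Ok(match status {
        // No recovery info in the DB: presumably the node was initialized from genesis.
        RecoveryCompletionStatus::NoRecoveryDetected => true,
        // Recovery has started, but storage logs and/or tokens are not fully applied yet.
        RecoveryCompletionStatus::InProgress => false,
        // Recovery is finished, so the storage is fully initialized.
        RecoveryCompletionStatus::Completed => true,
    })
}
```

The actual EN integration is left to a follow-up, per the description above; this snippet only illustrates the intended call pattern.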