feat(snapshots_applier): Add a method to check whether snapshot recovery is done (#2338)

## What ❔

Adds `SnapshotsApplierTask::is_recovery_completed()`, a method reporting whether snapshot recovery has finished.

## Why ❔

Such a method is useful, for example, to understand whether it's safe to
start the API server in distributed mode.
It will become part of the external node (EN) check for storage initialization; see the sketch below.
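
For illustration, a minimal sketch of such an EN-side check. This is not part of the PR: the helper name `is_storage_ready_for_api`, the crate path `zksync_snapshots_applier`, and the policy choices in the `match` are assumptions.

```rust
use zksync_dal::{ConnectionPool, Core};
use zksync_snapshots_applier::{
    RecoveryCompletionStatus, SnapshotsApplierMainNodeClient, SnapshotsApplierTask,
};

/// Hypothetical EN-side readiness check (sketch only).
async fn is_storage_ready_for_api(
    pool: &ConnectionPool<Core>,
    client: &dyn SnapshotsApplierMainNodeClient,
) -> anyhow::Result<bool> {
    let mut conn = pool.connection().await?;
    let ready = match SnapshotsApplierTask::is_recovery_completed(&mut conn, client).await? {
        // Recovery finished; storage is consistent and the API server may start.
        RecoveryCompletionStatus::Completed => true,
        // Recovery started but hasn't finished; starting the API now would expose partial state.
        RecoveryCompletionStatus::InProgress => false,
        // No recovery info at all; this sketch conservatively treats it as not ready,
        // though a genesis-initialized node would also report this status.
        RecoveryCompletionStatus::NoRecoveryDetected => false,
    };
    Ok(ready)
}
```

The three-way status (rather than a boolean) lets a caller distinguish "recovery never started" from "recovery underway", which matters when deciding between genesis initialization and waiting.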

## Checklist


- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
popzxc authored Jul 1, 2024
1 parent 6c308d2 commit 610a7cf
Showing 3 changed files with 143 additions and 4 deletions.
54 changes: 53 additions & 1 deletion core/lib/snapshots_applier/src/lib.rs
@@ -1,6 +1,8 @@
//! Logic for applying application-level snapshots to Postgres storage.
-use std::{collections::HashMap, fmt, mem, num::NonZeroUsize, sync::Arc, time::Duration};
+use std::{
+    cmp::Ordering, collections::HashMap, fmt, mem, num::NonZeroUsize, sync::Arc, time::Duration,
+};

use anyhow::Context as _;
use async_trait::async_trait;
@@ -191,6 +193,17 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
    }
}

/// Reported status of the snapshot recovery progress.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RecoveryCompletionStatus {
    /// There is no information about snapshot recovery in the database.
    NoRecoveryDetected,
    /// Snapshot recovery is not finished yet.
    InProgress,
    /// Snapshot recovery is completed.
    Completed,
}

/// Snapshot applier configuration options.
#[derive(Debug, Clone)]
pub struct SnapshotsApplierConfig {
@@ -263,6 +276,45 @@ impl SnapshotsApplierTask {
        }
    }

    /// Checks whether snapshot recovery is already completed.
    ///
    /// Returns [`RecoveryCompletionStatus::NoRecoveryDetected`] if no snapshot recovery information is present in the DB.
    /// Returns [`RecoveryCompletionStatus::InProgress`] if recovery has started but is not finished yet.
    /// Returns [`RecoveryCompletionStatus::Completed`] if recovery is completed.
    pub async fn is_recovery_completed(
        conn: &mut Connection<'_, Core>,
        client: &dyn SnapshotsApplierMainNodeClient,
    ) -> anyhow::Result<RecoveryCompletionStatus> {
        let Some(applied_snapshot_status) = conn
            .snapshot_recovery_dal()
            .get_applied_snapshot_status()
            .await?
        else {
            return Ok(RecoveryCompletionStatus::NoRecoveryDetected);
        };
        // If there are unprocessed storage log chunks, the recovery is not complete.
        if applied_snapshot_status.storage_logs_chunks_left_to_process() != 0 {
            return Ok(RecoveryCompletionStatus::InProgress);
        }
        // Currently, migrating tokens is the last step of the recovery.
        // The number of tokens is not a part of the snapshot header, so we have to re-query the main node.
        let added_tokens = conn
            .tokens_web3_dal()
            .get_all_tokens(Some(applied_snapshot_status.l2_block_number))
            .await?
            .len();
        let tokens_on_main_node = client
            .fetch_tokens(applied_snapshot_status.l2_block_number)
            .await?
            .len();

        match added_tokens.cmp(&tokens_on_main_node) {
            Ordering::Less => Ok(RecoveryCompletionStatus::InProgress),
            Ordering::Equal => Ok(RecoveryCompletionStatus::Completed),
            Ordering::Greater => anyhow::bail!("DB contains more tokens than the main node"),
        }
    }

    /// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete or resumed.
    pub fn set_snapshot_l1_batch(&mut self, number: L1BatchNumber) {
        self.snapshot_l1_batch = Some(number);
68 changes: 67 additions & 1 deletion core/lib/snapshots_applier/src/tests/mod.rs
@@ -25,6 +25,16 @@ use crate::tests::utils::HangingObjectStore;

mod utils;

async fn is_recovery_completed(
    pool: &ConnectionPool<Core>,
    client: &MockMainNodeClient,
) -> RecoveryCompletionStatus {
    let mut connection = pool.connection().await.unwrap();
    SnapshotsApplierTask::is_recovery_completed(&mut connection, client)
        .await
        .unwrap()
}

#[test_casing(3, [(None, false), (Some(2), false), (None, true)])]
#[tokio::test]
async fn snapshots_creator_can_successfully_recover_db(
@@ -36,6 +46,7 @@ async fn snapshots_creator_can_successfully_recover_db(
    } else {
        ConnectionPool::<Core>::test_pool().await
    };

    let expected_status = mock_recovery_status();
    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
@@ -60,6 +71,12 @@ async fn snapshots_creator_can_successfully_recover_db(
        object_store
    };

    assert_eq!(
        is_recovery_completed(&pool, &client).await,
        RecoveryCompletionStatus::NoRecoveryDetected,
        "No snapshot information in the DB"
    );

    let task = SnapshotsApplierTask::new(
        SnapshotsApplierConfig::for_tests(),
        pool.clone(),
@@ -74,6 +91,12 @@ async fn snapshots_creator_can_successfully_recover_db(
        HealthStatus::Ready
    );

    assert_eq!(
        is_recovery_completed(&pool, &client).await,
        RecoveryCompletionStatus::Completed,
        "Recovery has been completed"
    );

    let mut storage = pool.connection().await.unwrap();
    let mut recovery_dal = storage.snapshot_recovery_dal();

@@ -261,6 +284,12 @@ async fn snapshot_applier_recovers_after_stopping() {
    assert!(!task_handle.is_finished());
    task_handle.abort();

    assert_eq!(
        is_recovery_completed(&pool, &client).await,
        RecoveryCompletionStatus::InProgress,
        "Recovery has been aborted"
    );

    // Check that factory deps have been persisted, but no storage logs.
    let mut storage = pool.connection().await.unwrap();
    let all_factory_deps = storage
@@ -290,6 +319,12 @@ async fn snapshot_applier_recovers_after_stopping() {
    assert!(!task_handle.is_finished());
    task_handle.abort();

    assert_eq!(
        is_recovery_completed(&pool, &client).await,
        RecoveryCompletionStatus::InProgress,
        "Not all logs have been recovered"
    );

    let all_storage_logs = storage
        .storage_logs_dal()
        .dump_all_storage_logs_for_tests()
@@ -301,12 +336,18 @@ async fn snapshot_applier_recovers_after_stopping() {
    let mut task = SnapshotsApplierTask::new(
        config,
        pool.clone(),
-        Box::new(client),
+        Box::new(client.clone()),
        Arc::new(stopping_object_store),
    );
    task.set_snapshot_l1_batch(expected_status.l1_batch_number); // check that this works fine
    task.run().await.unwrap();

    assert_eq!(
        is_recovery_completed(&pool, &client).await,
        RecoveryCompletionStatus::Completed,
        "Recovery has been completed"
    );

    let all_storage_logs = storage
        .storage_logs_dal()
        .dump_all_storage_logs_for_tests()
@@ -535,6 +576,25 @@ async fn recovering_tokens() {

    client.tokens_response.clone_from(&tokens);

    // Make sure that the task fails once it starts migrating tokens.
    client.set_token_response_error(EnrichedClientError::custom("Error", "not_important"));

    let task = SnapshotsApplierTask::new(
        SnapshotsApplierConfig::for_tests(),
        pool.clone(),
        Box::new(client.clone()),
        object_store.clone(),
    );
    let task_result = task.run().await;
    assert!(task_result.is_err());

    assert_eq!(
        is_recovery_completed(&pool, &client).await,
        RecoveryCompletionStatus::InProgress,
        "Tokens are not migrated"
    );

    // Now perform the recovery again; tokens should be migrated.
    let task = SnapshotsApplierTask::new(
        SnapshotsApplierConfig::for_tests(),
        pool.clone(),
@@ -543,6 +603,12 @@ async fn recovering_tokens() {
    );
    task.run().await.unwrap();

    assert_eq!(
        is_recovery_completed(&pool, &client).await,
        RecoveryCompletionStatus::Completed,
        "Recovery is completed"
    );

    // Check that tokens are successfully restored.
    let mut storage = pool.connection().await.unwrap();
    let recovered_tokens = storage
25 changes: 23 additions & 2 deletions core/lib/snapshots_applier/src/tests/utils.rs
@@ -1,6 +1,10 @@
//! Test utils.
-use std::{collections::HashMap, fmt, future, sync::Arc};
+use std::{
+    collections::HashMap,
+    fmt, future,
+    sync::{Arc, RwLock},
+};

use async_trait::async_trait;
use tokio::sync::watch;
@@ -18,7 +22,7 @@ use zksync_types::{
    AccountTreeId, Address, L1BatchNumber, L2BlockNumber, ProtocolVersionId, StorageKey,
    StorageValue, H256,
};
-use zksync_web3_decl::error::EnrichedClientResult;
+use zksync_web3_decl::error::{EnrichedClientError, EnrichedClientResult};

use crate::SnapshotsApplierMainNodeClient;

@@ -50,6 +54,19 @@ pub(super) struct MockMainNodeClient {
    pub fetch_l2_block_responses: HashMap<L2BlockNumber, api::BlockDetails>,
    pub fetch_newest_snapshot_response: Option<SnapshotHeader>,
    pub tokens_response: Vec<TokenInfo>,
    pub tokens_response_error: Arc<RwLock<Option<EnrichedClientError>>>,
}

impl MockMainNodeClient {
    /// Sets the error to be returned by the `fetch_tokens` method.
    /// The error is returned just once; subsequent requests succeed.
    pub(super) fn set_token_response_error(&self, error: EnrichedClientError) {
        *self.tokens_response_error.write().unwrap() = Some(error);
    }

    fn take_token_response_error(&self) -> Option<EnrichedClientError> {
        self.tokens_response_error.write().unwrap().take()
    }
}

#[async_trait]
@@ -91,6 +108,10 @@ impl SnapshotsApplierMainNodeClient for MockMainNodeClient {
        &self,
        _at_l2_block: L2BlockNumber,
    ) -> EnrichedClientResult<Vec<TokenInfo>> {
        if let Some(error) = self.take_token_response_error() {
            return Err(error);
        }

        Ok(self.tokens_response.clone())
    }
}
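As an aside, the one-shot error injection used by `MockMainNodeClient` above is a reusable testing pattern: store the error in a shared slot and move it out on first use. A self-contained sketch follows; the `FaultSlot` type is hypothetical, not from this repository.

```rust
use std::sync::{Arc, RwLock};

/// One-shot fault slot: `take()` yields the stored error once, after which
/// calls return `None` (so the next request succeeds).
#[derive(Default, Clone)]
struct FaultSlot(Arc<RwLock<Option<String>>>);

impl FaultSlot {
    fn set(&self, err: impl Into<String>) {
        *self.0.write().unwrap() = Some(err.into());
    }

    fn take(&self) -> Option<String> {
        // `Option::take` moves the value out, leaving `None` behind.
        self.0.write().unwrap().take()
    }
}

fn main() {
    let slot = FaultSlot::default();
    slot.set("transient failure");
    assert_eq!(slot.take().as_deref(), Some("transient failure")); // fires once
    assert_eq!(slot.take(), None); // subsequent calls see no error
}
```

Using interior mutability (`Arc<RwLock<...>>`) lets the test mutate the mock's behavior through a shared reference, which is why `set_token_response_error` above takes `&self` rather than `&mut self`.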
