
feat(en): Unify snapshot recovery and recovery from L1 #2256

Merged · 23 commits · Jun 27, 2024
Changes from 13 commits
3 changes: 2 additions & 1 deletion .github/workflows/ci-core-reusable.yml
@@ -228,9 +228,10 @@ jobs:
fi
ENABLE_CONSENSUS=${{ matrix.consensus }} \
DEPLOYMENT_MODE=${{ matrix.deployment_mode }} \
SNAPSHOTS_CREATOR_VERSION=${{ matrix.deployment_mode == 'Validium' && '0' || '1' }} \
DISABLE_TREE_DURING_PRUNING=${{ matrix.base_token == 'Eth' }} \
ETH_CLIENT_WEB3_URL="http://reth:8545" \
PASSED_ENV_VARS="ENABLE_CONSENSUS,DEPLOYMENT_MODE,DISABLE_TREE_DURING_PRUNING,ETH_CLIENT_WEB3_URL" \
PASSED_ENV_VARS="ENABLE_CONSENSUS,DEPLOYMENT_MODE,DISABLE_TREE_DURING_PRUNING,SNAPSHOTS_CREATOR_VERSION,ETH_CLIENT_WEB3_URL" \
ci_run yarn recovery-test snapshot-recovery-test

- name: Genesis recovery test
2 changes: 1 addition & 1 deletion checks-config/era.dic
@@ -332,7 +332,7 @@ eth_call
versa
blake2
AR16MT
Preimages
preimages
EN's
SystemContext
StorageOracle
16 changes: 13 additions & 3 deletions core/bin/snapshots_creator/README.md
@@ -51,9 +51,9 @@ Creating a snapshot is a part of the [snapshot recovery integration test]. You c

Each snapshot consists of three types of data (see [`snapshots.rs`] for exact definitions):

- **Header:** Includes basic information, such as the miniblock / L1 batch of the snapshot, miniblock / L1 batch
timestamps, miniblock hash and L1 batch root hash. Returned by the methods in the `snapshots` namespace of the
JSON-RPC API of the main node.
- **Header:** Includes basic information, such as the L2 block / L1 batch of the snapshot, L2 block / L1 batch
timestamps, L2 block hash and L1 batch root hash. Returned by the methods in the `snapshots` namespace of the JSON-RPC
API of the main node.
- **Storage log chunks:** Latest values for all VM storage slots ever written to at the time the snapshot is made.
Besides key–value pairs, each storage log record also contains the L1 batch number of its initial write and its
enumeration index; both are used to restore the contents of the `initial_writes` table. Chunking storage logs is
@@ -64,6 +64,16 @@ Each snapshot consists of three types of data (see [`snapshots.rs`] for exact de
- **Factory dependencies:** All bytecodes deployed on L2 at the time the snapshot is made. Stored as a single gzipped
Protobuf message in an object store.
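Sketched below is the idea behind the uniform chunking of the hashed-key space mentioned above. This is a simplified stand-in, not the actual `uniform_hashed_keys_chunk` implementation: it splits the space of 8-byte key prefixes rather than full 32-byte hashed keys, and the function name is made up for illustration.

```rust
/// Splits the space of 8-byte key prefixes into `chunk_count` uniform,
/// non-overlapping inclusive ranges (the real code operates on 32-byte keys).
fn uniform_prefix_chunk(chunk_id: u64, chunk_count: u64) -> (u64, u64) {
    assert!(chunk_id < chunk_count, "chunk_id out of bounds");
    // u128 arithmetic so that 2^64 and the intermediate products don't overflow.
    const DOMAIN: u128 = 1 << 64;
    let start = (DOMAIN * chunk_id as u128 / chunk_count as u128) as u64;
    let end = (DOMAIN * (chunk_id as u128 + 1) / chunk_count as u128 - 1) as u64;
    (start, end)
}

fn main() {
    // A single chunk covers the whole prefix space...
    assert_eq!(uniform_prefix_chunk(0, 1), (0, u64::MAX));
    // ...and adjacent chunks tile it without gaps or overlaps.
    assert_eq!(uniform_prefix_chunk(4, 10).1 + 1, uniform_prefix_chunk(5, 10).0);
}
```

Uniform ranges over the hashed-key space give roughly equal chunk sizes because cryptographic key hashes are distributed approximately evenly.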

### Versioning

There are currently two versions of the snapshot format, which differ in how keys are represented in storage logs.

- Version 0 includes key preimages (EVM-compatible keys), i.e. address / contract slot tuples.
- Version 1 includes only hashed keys as used in Era ZKP circuits and in the Merkle tree. Besides reducing the snapshot
size (keys occupy 32 bytes instead of 52), this makes it possible to unify snapshot recovery with recovery from L1
data. Having only hashed keys for snapshot storage logs is safe; key preimages are only required by a couple of
components to sort keys in a batch, and those cases only need preimages for L1 batches executed locally on the node.
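The 52-byte vs 32-byte figures follow directly from the two key layouts; the struct names below are illustrative, not the actual `zksync_types` definitions.

```rust
/// Version 0 key preimage: an EVM-style (address, slot) tuple.
#[allow(dead_code)]
struct KeyPreimage {
    address: [u8; 20], // account the storage slot belongs to
    slot: [u8; 32],    // 256-bit slot index within that account
}

/// Version 1 key: only the 32-byte hash used by the Merkle tree and circuits.
#[allow(dead_code)]
struct HashedKey([u8; 32]);

fn main() {
    // 20 + 32 = 52 bytes for a preimage vs 32 bytes for a hashed key.
    assert_eq!(std::mem::size_of::<KeyPreimage>(), 52);
    assert_eq!(std::mem::size_of::<HashedKey>(), 32);
}
```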

[`snapshots.rs`]: ../../lib/types/src/snapshots.rs
[object store]: ../../lib/object_store
[snapshot recovery integration test]: ../../tests/recovery-test/tests/snapshot-recovery.test.ts
145 changes: 100 additions & 45 deletions core/bin/snapshots_creator/src/creator.rs
@@ -1,16 +1,17 @@
//! [`SnapshotCreator`] and tightly related types.

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use anyhow::Context as _;
use tokio::sync::Semaphore;
use zksync_config::SnapshotsCreatorConfig;
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalResult};
use zksync_object_store::ObjectStore;
use zksync_object_store::{ObjectStore, StoredObject};
use zksync_types::{
snapshots::{
uniform_hashed_keys_chunk, SnapshotFactoryDependencies, SnapshotFactoryDependency,
SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, SnapshotVersion,
SnapshotMetadata, SnapshotStorageLog, SnapshotStorageLogsChunk,
SnapshotStorageLogsStorageKey, SnapshotVersion,
},
L1BatchNumber, L2BlockNumber,
};
@@ -22,6 +23,7 @@ use crate::tests::HandleEvent;
/// Encapsulates progress of creating a particular storage snapshot.
#[derive(Debug)]
struct SnapshotProgress {
version: SnapshotVersion,
l1_batch_number: L1BatchNumber,
/// `true` if the snapshot is new (i.e., its progress is not recovered from Postgres).
is_new_snapshot: bool,
@@ -30,8 +32,9 @@
}

impl SnapshotProgress {
fn new(l1_batch_number: L1BatchNumber, chunk_count: u64) -> Self {
fn new(version: SnapshotVersion, l1_batch_number: L1BatchNumber, chunk_count: u64) -> Self {
Self {
version,
l1_batch_number,
is_new_snapshot: true,
chunk_count,
@@ -48,6 +51,7 @@ impl SnapshotProgress {
.collect();

Self {
version: snapshot.version,
l1_batch_number: snapshot.l1_batch_number,
is_new_snapshot: false,
chunk_count: snapshot.storage_logs_filepaths.len() as u64,
@@ -76,11 +80,13 @@ impl SnapshotCreator {
async fn process_storage_logs_single_chunk(
&self,
semaphore: &Semaphore,
progress: &SnapshotProgress,
l2_block_number: L2BlockNumber,
l1_batch_number: L1BatchNumber,
chunk_id: u64,
chunk_count: u64,
) -> anyhow::Result<()> {
let chunk_count = progress.chunk_count;
let l1_batch_number = progress.l1_batch_number;

let _permit = semaphore.acquire().await?;
#[cfg(test)]
if self.event_listener.on_chunk_started().should_exit() {
@@ -92,35 +98,45 @@

let latency =
METRICS.storage_logs_processing_duration[&StorageChunkStage::LoadFromPostgres].start();
let logs = conn
.snapshots_creator_dal()
.get_storage_logs_chunk(l2_block_number, l1_batch_number, hashed_keys_range)
.await
.context("Error fetching storage logs count")?;
drop(conn);
let latency = latency.observe();
tracing::info!(
"Loaded chunk {chunk_id} ({} logs) from Postgres in {latency:?}",
logs.len()
);

let latency =
METRICS.storage_logs_processing_duration[&StorageChunkStage::SaveToGcs].start();
let storage_logs_chunk = SnapshotStorageLogsChunk { storage_logs: logs };
let key = SnapshotStorageLogsStorageKey {
l1_batch_number,
chunk_id,
let (output_filepath, latency) = match progress.version {
SnapshotVersion::Version0 => {
#[allow(deprecated)] // support of version 0 snapshots will be removed eventually
let logs = conn
.snapshots_creator_dal()
.get_storage_logs_chunk_with_key_preimages(
l2_block_number,
l1_batch_number,
hashed_keys_range,
)
.await
.context("error fetching storage logs")?;
drop(conn);

let latency = latency.observe();
tracing::info!(
"Loaded chunk {chunk_id} ({} logs) from Postgres in {latency:?}",
logs.len()
);
self.store_storage_logs_chunk(l1_batch_number, chunk_id, logs)
.await?
}
SnapshotVersion::Version1 => {
let logs = conn
.snapshots_creator_dal()
.get_storage_logs_chunk(l2_block_number, l1_batch_number, hashed_keys_range)
.await
.context("error fetching storage logs")?;
drop(conn);

let latency = latency.observe();
tracing::info!(
"Loaded chunk {chunk_id} ({} logs) from Postgres in {latency:?}",
logs.len()
);
self.store_storage_logs_chunk(l1_batch_number, chunk_id, logs)
.await?
}
};
let filename = self
.blob_store
.put(key, &storage_logs_chunk)
.await
.context("Error storing storage logs chunk in blob store")?;
let output_filepath_prefix = self
.blob_store
.get_storage_prefix::<SnapshotStorageLogsChunk>();
let output_filepath = format!("{output_filepath_prefix}/{filename}");
let latency = latency.observe();

let mut master_conn = self
.master_pool
@@ -141,6 +157,35 @@ impl SnapshotCreator {
Ok(())
}

async fn store_storage_logs_chunk<K>(
&self,
l1_batch_number: L1BatchNumber,
chunk_id: u64,
logs: Vec<SnapshotStorageLog<K>>,
) -> anyhow::Result<(String, Duration)>
where
for<'a> SnapshotStorageLogsChunk<K>: StoredObject<Key<'a> = SnapshotStorageLogsStorageKey>,
{
let latency =
METRICS.storage_logs_processing_duration[&StorageChunkStage::SaveToGcs].start();
let storage_logs_chunk = SnapshotStorageLogsChunk { storage_logs: logs };
let key = SnapshotStorageLogsStorageKey {
l1_batch_number,
chunk_id,
};
let filename = self
.blob_store
.put(key, &storage_logs_chunk)
.await
.context("Error storing storage logs chunk in blob store")?;
let output_filepath_prefix = self
.blob_store
.get_storage_prefix::<SnapshotStorageLogsChunk<K>>();
let output_filepath = format!("{output_filepath_prefix}/{filename}");
let latency = latency.observe();
Ok((output_filepath, latency))
}

async fn process_factory_deps(
&self,
l2_block_number: L2BlockNumber,
@@ -194,6 +239,9 @@ impl SnapshotCreator {
latest_snapshot: Option<&SnapshotMetadata>,
conn: &mut Connection<'_, Core>,
) -> anyhow::Result<Option<SnapshotProgress>> {
let snapshot_version = SnapshotVersion::try_from(config.version)
.context("invalid snapshot version specified in config")?;

// We subtract 1 so that after restore, EN node has at least one L1 batch to fetch.
let sealed_l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await?;
let sealed_l1_batch_number = sealed_l1_batch_number.context("No L1 batches in Postgres")?;
@@ -238,7 +286,11 @@
"Selected storage logs chunking for L1 batch {l1_batch_number}: \
{chunk_count} chunks of expected size {chunk_size}"
);
Ok(Some(SnapshotProgress::new(l1_batch_number, chunk_count)))
Ok(Some(SnapshotProgress::new(
snapshot_version,
l1_batch_number,
chunk_count,
)))
}

/// Returns `Ok(None)` if a snapshot should not be created / resumed.
@@ -319,7 +371,7 @@ impl SnapshotCreator {
master_conn
.snapshots_dal()
.add_snapshot(
SnapshotVersion::Version0,
progress.version,
progress.l1_batch_number,
progress.chunk_count,
&factory_deps_output_file,
@@ -331,15 +383,18 @@
.storage_logs_chunks_left_to_process
.set(progress.remaining_chunk_ids.len());
let semaphore = Semaphore::new(config.concurrent_queries_count as usize);
let tasks = progress.remaining_chunk_ids.into_iter().map(|chunk_id| {
self.process_storage_logs_single_chunk(
&semaphore,
last_l2_block_number_in_batch,
progress.l1_batch_number,
chunk_id,
progress.chunk_count,
)
});
let tasks = progress
.remaining_chunk_ids
.iter()
.copied()
.map(|chunk_id| {
self.process_storage_logs_single_chunk(
&semaphore,
&progress,
last_l2_block_number_in_batch,
chunk_id,
)
});
futures::future::try_join_all(tasks).await?;

METRICS
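The version selection in `initialize_snapshot_progress` above parses the raw config value via `SnapshotVersion::try_from(config.version)` and fails fast on unknown versions. A hypothetical mirror of that conversion (not the actual `zksync_types` definition) looks like this:

```rust
/// Hypothetical mirror of the snapshot format versions; see the snapshots
/// creator README for what each version stores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SnapshotVersion {
    /// Keys are stored as (address, slot) preimages.
    Version0 = 0,
    /// Keys are stored as 32-byte hashes.
    Version1 = 1,
}

impl TryFrom<u16> for SnapshotVersion {
    type Error = String;

    fn try_from(raw: u16) -> Result<Self, Self::Error> {
        match raw {
            0 => Ok(Self::Version0),
            1 => Ok(Self::Version1),
            other => Err(format!("invalid snapshot version: {other}")),
        }
    }
}

fn main() {
    assert_eq!(SnapshotVersion::try_from(1), Ok(SnapshotVersion::Version1));
    assert!(SnapshotVersion::try_from(2).is_err());
}
```

Rejecting unknown versions at startup (rather than defaulting to one of them) matches the `.context("invalid snapshot version specified in config")?` handling in the diff.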
50 changes: 48 additions & 2 deletions core/bin/snapshots_creator/src/tests.rs
@@ -25,11 +25,13 @@ use zksync_types::{
use super::*;

const TEST_CONFIG: SnapshotsCreatorConfig = SnapshotsCreatorConfig {
version: 1,
storage_logs_chunk_size: 1_000_000,
concurrent_queries_count: 10,
object_store: None,
};
const SEQUENTIAL_TEST_CONFIG: SnapshotsCreatorConfig = SnapshotsCreatorConfig {
version: 1,
storage_logs_chunk_size: 1_000_000,
concurrent_queries_count: 1,
object_store: None,
@@ -181,6 +183,7 @@ async fn create_l1_batch(

let mut written_keys: Vec<_> = logs_for_initial_writes.iter().map(|log| log.key).collect();
written_keys.sort_unstable();
let written_keys: Vec<_> = written_keys.iter().map(StorageKey::hashed_key).collect();
conn.storage_logs_dedup_dal()
.insert_initial_writes(l1_batch_number, &written_keys)
.await
@@ -241,7 +244,7 @@ async fn prepare_postgres(
let (l1_batch_number_of_initial_write, enumeration_index) =
expected_l1_batches_and_indices[&log.key.hashed_key()];
SnapshotStorageLog {
key: log.key,
key: log.key.hashed_key(),
value: log.value,
l1_batch_number_of_initial_write,
enumeration_index,
@@ -350,7 +353,50 @@ async fn assert_storage_logs(
chunk_id,
};
let chunk: SnapshotStorageLogsChunk = object_store.get(key).await.unwrap();
actual_logs.extend(chunk.storage_logs.into_iter());
actual_logs.extend(chunk.storage_logs);
}
assert_eq!(actual_logs, expected_outputs.storage_logs);
}

#[tokio::test]
async fn persisting_snapshot_logs_for_v0_snapshot() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store = MockObjectStore::arc();
let mut conn = pool.connection().await.unwrap();
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;

let config = SnapshotsCreatorConfig {
version: 0,
..TEST_CONFIG
};
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(config, MIN_CHUNK_COUNT)
.await
.unwrap();
let snapshot_l1_batch_number = L1BatchNumber(8);

// Logs must be compatible with version 1 `SnapshotStorageLog` format
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;

// ...and must be compatible with version 0 format as well
let mut actual_logs = HashSet::new();
for chunk_id in 0..MIN_CHUNK_COUNT {
let key = SnapshotStorageLogsStorageKey {
l1_batch_number: snapshot_l1_batch_number,
chunk_id,
};
let chunk: SnapshotStorageLogsChunk<StorageKey> = object_store.get(key).await.unwrap();
let logs_with_hashed_key = chunk
.storage_logs
.into_iter()
.map(|log| SnapshotStorageLog {
key: log.key.hashed_key(),
value: log.value,
l1_batch_number_of_initial_write: log.l1_batch_number_of_initial_write,
enumeration_index: log.enumeration_index,
});
actual_logs.extend(logs_with_hashed_key);
}
assert_eq!(actual_logs, expected_outputs.storage_logs);
}