From 046665526d11d57dc91b80b0d854da8b5a95b13a Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 25 Jun 2024 18:42:12 +1000 Subject: [PATCH] feat(metadata-calculator): option to use VM runner for protective reads (#2318) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ ## Why ❔ ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- core/bin/external_node/src/main.rs | 1 + core/bin/external_node/src/node_builder.rs | 4 + core/bin/zksync_server/src/node_builder.rs | 2 + ...6862d6ad0de7c7ca1d5320800f317428f07e1.json | 14 ++ core/lib/dal/src/vm_runner_dal.rs | 28 +++ core/node/consensus/src/testonly.rs | 13 +- .../src/api_server/tests.rs | 4 +- core/node/metadata_calculator/src/lib.rs | 14 +- .../metadata_calculator/src/recovery/tests.rs | 8 +- core/node/metadata_calculator/src/tests.rs | 203 +++++++++++++----- core/node/metadata_calculator/src/updater.rs | 75 ++++--- .../node/node_framework/examples/main_node.rs | 2 + 12 files changed, 277 insertions(+), 91 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-daa330d43f150824f2195cdbfb96862d6ad0de7c7ca1d5320800f317428f07e1.json diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 0adf3ddf8cb5..5d5de22cabf9 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -139,6 +139,7 @@ async fn run_tree( .merkle_tree_include_indices_and_filters_in_block_cache, memtable_capacity: config.optional.merkle_tree_memtable_capacity(), stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(), + sealed_batches_have_protective_reads: config.optional.protective_reads_persistence_enabled, recovery: MetadataCalculatorRecoveryConfig { desired_chunk_size: config.experimental.snapshots_recovery_tree_chunk_size, parallel_persistence_buffer: config diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index 5eaff63d20a0..cfe8f1ea7c01 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -306,6 +306,10 @@ impl ExternalNodeBuilder { .merkle_tree_include_indices_and_filters_in_block_cache, memtable_capacity: self.config.optional.merkle_tree_memtable_capacity(), stalled_writes_timeout: self.config.optional.merkle_tree_stalled_writes_timeout(), + sealed_batches_have_protective_reads: self + .config + .optional + .protective_reads_persistence_enabled, recovery: MetadataCalculatorRecoveryConfig { desired_chunk_size: self.config.experimental.snapshots_recovery_tree_chunk_size, parallel_persistence_buffer: self diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index d1fecb1e3d7e..32c7daf82ceb 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -175,9 +175,11 @@ impl MainNodeBuilder { let merkle_tree_env_config = try_load_config!(self.configs.db_config).merkle_tree; let operations_manager_env_config = try_load_config!(self.configs.operations_manager_config); + let state_keeper_env_config = try_load_config!(self.configs.state_keeper_config); let metadata_calculator_config = MetadataCalculatorConfig::for_main_node( &merkle_tree_env_config, &operations_manager_env_config, + &state_keeper_env_config, ); let mut layer = MetadataCalculatorLayer::new(metadata_calculator_config); if with_tree_api { diff --git a/core/lib/dal/.sqlx/query-daa330d43f150824f2195cdbfb96862d6ad0de7c7ca1d5320800f317428f07e1.json b/core/lib/dal/.sqlx/query-daa330d43f150824f2195cdbfb96862d6ad0de7c7ca1d5320800f317428f07e1.json new file mode 100644 index 000000000000..836bbc435f00 --- /dev/null +++ b/core/lib/dal/.sqlx/query-daa330d43f150824f2195cdbfb96862d6ad0de7c7ca1d5320800f317428f07e1.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM vm_runner_protective_reads\n WHERE\n l1_batch_number > $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "daa330d43f150824f2195cdbfb96862d6ad0de7c7ca1d5320800f317428f07e1" +} diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs index 2d17ff3f9fca..4c07901c32bc 100644 --- a/core/lib/dal/src/vm_runner_dal.rs +++ b/core/lib/dal/src/vm_runner_dal.rs @@ -84,4 +84,32 @@ impl VmRunnerDal<'_, '_> { .await?; Ok(()) } + + pub async fn delete_protective_reads( + &mut self, + last_batch_to_keep: L1BatchNumber, + ) -> DalResult<()> { + self.delete_protective_reads_inner(Some(last_batch_to_keep)) + .await + } + + async fn delete_protective_reads_inner( + &mut self, + last_batch_to_keep: Option, + ) -> DalResult<()> { + let l1_batch_number = last_batch_to_keep.map_or(-1, |number| i64::from(number.0)); + sqlx::query!( + r#" + DELETE FROM vm_runner_protective_reads + WHERE + l1_batch_number > $1 + "#, + l1_batch_number + ) + .instrument("delete_protective_reads") + .with_arg("l1_batch_number", &l1_batch_number) + .execute(self.storage) + .await?; + Ok(()) + } } diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index ce16efed2225..d20c379a5d66 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -7,7 +7,7 @@ use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_config::{ configs, configs::{ - chain::OperationsManagerConfig, + chain::{OperationsManagerConfig, StateKeeperConfig}, consensus as config, database::{MerkleTreeConfig, MerkleTreeMode}, }, @@ -166,8 +166,15 @@ impl StateKeeper { let operation_manager_config = OperationsManagerConfig { delay_interval: 100, //`100ms` }; - let config = - MetadataCalculatorConfig::for_main_node(&merkle_tree_config, &operation_manager_config); + let state_keeper_config = StateKeeperConfig { + protective_reads_persistence_enabled: true, + ..Default::default() + }; + let config = MetadataCalculatorConfig::for_main_node( + &merkle_tree_config, + &operation_manager_config, + &state_keeper_config, + ); let metadata_calculator = MetadataCalculator::new(config, None, pool.0.clone()) .await .context("MetadataCalculator::new()")?; diff --git a/core/node/metadata_calculator/src/api_server/tests.rs b/core/node/metadata_calculator/src/api_server/tests.rs index 26782e446f3f..614e06b55023 100644 --- a/core/node/metadata_calculator/src/api_server/tests.rs +++ b/core/node/metadata_calculator/src/api_server/tests.rs @@ -17,7 +17,7 @@ use crate::tests::{gen_storage_logs, reset_db_state, run_calculator, setup_calcu async fn merkle_tree_api() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone(), true).await; let api_addr = (Ipv4Addr::LOCALHOST, 0).into(); reset_db_state(&pool, 5).await; @@ -114,7 +114,7 @@ async fn api_client_unparesable_response_error() { async fn local_merkle_tree_client() { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone(), true).await; reset_db_state(&pool, 5).await; let tree_reader = calculator.tree_reader(); diff --git a/core/node/metadata_calculator/src/lib.rs b/core/node/metadata_calculator/src/lib.rs index b57f0dfacb70..451090694b2c 100644 --- a/core/node/metadata_calculator/src/lib.rs +++ b/core/node/metadata_calculator/src/lib.rs @@ -10,7 +10,7 @@ use std::{ use anyhow::Context as _; use tokio::sync::{oneshot, watch}; use zksync_config::configs::{ - chain::OperationsManagerConfig, + chain::{OperationsManagerConfig, StateKeeperConfig}, database::{MerkleTreeConfig, MerkleTreeMode}, }; use zksync_dal::{ConnectionPool, Core}; @@ -89,6 +89,8 @@ pub struct MetadataCalculatorConfig { pub memtable_capacity: usize, /// Timeout to wait for the Merkle tree database to run compaction on stalled writes. pub stalled_writes_timeout: Duration, + /// Whether state keeper writes protective reads when it seals a batch. + pub sealed_batches_have_protective_reads: bool, /// Configuration specific to the Merkle tree recovery. pub recovery: MetadataCalculatorRecoveryConfig, } @@ -97,6 +99,7 @@ impl MetadataCalculatorConfig { pub fn for_main_node( merkle_tree_config: &MerkleTreeConfig, operation_config: &OperationsManagerConfig, + state_keeper_config: &StateKeeperConfig, ) -> Self { Self { db_path: merkle_tree_config.path.clone(), @@ -109,6 +112,8 @@ impl MetadataCalculatorConfig { include_indices_and_filters_in_block_cache: false, memtable_capacity: merkle_tree_config.memtable_capacity(), stalled_writes_timeout: merkle_tree_config.stalled_writes_timeout(), + sealed_batches_have_protective_reads: state_keeper_config + .protective_reads_persistence_enabled, // The main node isn't supposed to be recovered yet, so this value doesn't matter much recovery: MetadataCalculatorRecoveryConfig::default(), } @@ -248,7 +253,12 @@ impl MetadataCalculator { self.health_updater .update(MerkleTreeHealth::MainLoop(tree_info).into()); - let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); + let updater = TreeUpdater::new( + tree, + self.max_l1_batches_per_iter, + self.object_store, + self.config.sealed_batches_have_protective_reads, + ); updater .loop_updating_tree(self.delayer, &self.pool, stop_receiver) .await diff --git a/core/node/metadata_calculator/src/recovery/tests.rs b/core/node/metadata_calculator/src/recovery/tests.rs index f8edd3e5678d..dc333a30fa2e 100644 --- a/core/node/metadata_calculator/src/recovery/tests.rs +++ b/core/node/metadata_calculator/src/recovery/tests.rs @@ -7,7 +7,7 @@ use tempfile::TempDir; use test_casing::{test_casing, Product}; use tokio::sync::mpsc; use zksync_config::configs::{ - chain::OperationsManagerConfig, + chain::{OperationsManagerConfig, StateKeeperConfig}, database::{MerkleTreeConfig, MerkleTreeMode}, }; use zksync_dal::CoreDal; @@ -113,7 +113,7 @@ async fn prepare_recovery_snapshot_with_genesis( drop(storage); // Ensure that metadata for L1 batch #1 is present in the DB. - let (calculator, _) = setup_calculator(&temp_dir.path().join("init"), pool).await; + let (calculator, _) = setup_calculator(&temp_dir.path().join("init"), pool, true).await; let l1_batch_root_hash = run_calculator(calculator).await; SnapshotRecoveryStatus { @@ -306,6 +306,10 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { let calculator_config = MetadataCalculatorConfig::for_main_node( &merkle_tree_config, &OperationsManagerConfig { delay_interval: 50 }, + &StateKeeperConfig { + protective_reads_persistence_enabled: true, + ..Default::default() + }, ); let mut calculator = MetadataCalculator::new(calculator_config, None, pool.clone()) .await diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs index 38e1a09d1091..d462511829dc 100644 --- a/core/node/metadata_calculator/src/tests.rs +++ b/core/node/metadata_calculator/src/tests.rs @@ -8,7 +8,7 @@ use tempfile::TempDir; use test_casing::{test_casing, Product}; use tokio::sync::{mpsc, watch}; use zksync_config::configs::{ - chain::OperationsManagerConfig, + chain::{OperationsManagerConfig, StateKeeperConfig}, database::{MerkleTreeConfig, MerkleTreeMode}, }; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; @@ -57,18 +57,21 @@ pub(super) fn mock_config(db_path: &Path) -> MetadataCalculatorConfig { include_indices_and_filters_in_block_cache: false, memtable_capacity: 16 << 20, // 16 MiB stalled_writes_timeout: Duration::ZERO, // writes should never be stalled in tests + sealed_batches_have_protective_reads: true, recovery: MetadataCalculatorRecoveryConfig::default(), } } +#[test_casing(2, [false, true])] #[tokio::test] -async fn genesis_creation() { +async fn genesis_creation(sealed_protective_reads: bool) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, _) = + setup_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; run_calculator(calculator).await; - let (calculator, _) = setup_calculator(temp_dir.path(), pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool, sealed_protective_reads).await; let tree = calculator.create_tree().await.unwrap(); let GenericAsyncTree::Ready(tree) = tree else { @@ -100,11 +103,12 @@ async fn low_level_genesis_creation() { assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1)); } -#[test_casing(8, Product(([1, 4, 7, 9], [false, true])))] +#[test_casing(16, Product(([1, 4, 7, 9], [false, true], [false, true])))] #[tokio::test] async fn tree_truncation_on_l1_batch_divergence( last_common_l1_batch: u32, overwrite_tree_data: bool, + sealed_protective_reads: bool, ) { const INITIAL_BATCH_COUNT: usize = 10; @@ -113,7 +117,8 @@ async fn tree_truncation_on_l1_batch_divergence( let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, INITIAL_BATCH_COUNT).await; run_calculator(calculator).await; @@ -137,7 +142,8 @@ async fn tree_truncation_on_l1_batch_divergence( } } - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; let tree = calculator.create_tree().await.unwrap(); let GenericAsyncTree::Ready(mut tree) = tree else { panic!("Unexpected tree state: {tree:?}"); @@ -154,9 +160,12 @@ async fn tree_truncation_on_l1_batch_divergence( assert_eq!(tree.next_l1_batch_number(), last_common_l1_batch + 1); } -#[test_casing(4, [1, 4, 6, 7])] +#[test_casing(8, Product(([1, 4, 6, 7], [false, true])))] #[tokio::test] -async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch: u32) { +async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree( + retained_l1_batch: u32, + sealed_protective_reads: bool, +) { const INITIAL_BATCH_COUNT: usize = 10; const LAST_COMMON_L1_BATCH: L1BatchNumber = L1BatchNumber(6); @@ -164,7 +173,8 @@ async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, INITIAL_BATCH_COUNT).await; run_calculator(calculator).await; @@ -186,7 +196,8 @@ async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch .unwrap(); } - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; let tree = calculator.create_tree().await.unwrap(); let GenericAsyncTree::Ready(mut tree) = tree else { panic!("Unexpected tree state: {tree:?}"); @@ -221,18 +232,20 @@ async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch } } +#[test_casing(2, [false, true])] #[tokio::test] -async fn basic_workflow() { +async fn basic_workflow(sealed_protective_reads: bool) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, object_store) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, object_store) = + setup_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, 1).await; let merkle_tree_hash = run_calculator(calculator).await; // Check the hash against the reference. - let expected_tree_hash = expected_tree_hash(&pool).await; + let expected_tree_hash = expected_tree_hash(&pool, sealed_protective_reads).await; assert_eq!(merkle_tree_hash, expected_tree_hash); let job: PrepareBasicCircuitsJob = object_store.get(L1BatchNumber(1)).await.unwrap(); @@ -242,7 +255,7 @@ async fn basic_workflow() { // ^ The exact values depend on ops in genesis block assert!(merkle_paths.iter().all(|log| log.is_write)); - let (calculator, _) = setup_calculator(temp_dir.path(), pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool, sealed_protective_reads).await; let tree = calculator.create_tree().await.unwrap(); let GenericAsyncTree::Ready(tree) = tree else { panic!("Unexpected tree state: {tree:?}"); @@ -250,16 +263,24 @@ async fn basic_workflow() { assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(2)); } -async fn expected_tree_hash(pool: &ConnectionPool) -> H256 { +async fn expected_tree_hash(pool: &ConnectionPool, sealed_protective_reads: bool) -> H256 { let mut storage = pool.connection().await.unwrap(); - let sealed_l1_batch_number = storage - .blocks_dal() - .get_sealed_l1_batch_number() - .await - .unwrap() - .expect("No L1 batches in Postgres"); + let processed_l1_batch_number = if sealed_protective_reads { + storage + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .unwrap() + .expect("No L1 batches in Postgres") + } else { + storage + .vm_runner_dal() + .get_protective_reads_latest_processed_batch(L1BatchNumber(0)) + .await + .unwrap() + }; let mut all_logs = vec![]; - for i in 0..=sealed_l1_batch_number.0 { + for i in 0..=processed_l1_batch_number.0 { let logs = L1BatchWithLogs::new(&mut storage, L1BatchNumber(i), MerkleTreeMode::Lightweight) .await @@ -271,12 +292,14 @@ async fn expected_tree_hash(pool: &ConnectionPool) -> H256 { ZkSyncTree::process_genesis_batch(&all_logs).root_hash } +#[test_casing(2, [false, true])] #[tokio::test] -async fn status_receiver_has_correct_states() { +async fn status_receiver_has_correct_states(sealed_protective_reads: bool) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (mut calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (mut calculator, _) = + setup_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; let tree_health_check = calculator.tree_health_check(); assert_eq!(tree_health_check.name(), "tree"); let health = tree_health_check.check_health().await; @@ -324,19 +347,22 @@ async fn status_receiver_has_correct_states() { .unwrap(); } +#[test_casing(2, [false, true])] #[tokio::test] -async fn multi_l1_batch_workflow() { +async fn multi_l1_batch_workflow(sealed_protective_reads: bool) { let pool = ConnectionPool::::test_pool().await; // Collect all storage logs in a single L1 batch let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, _) = + setup_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, 1).await; let root_hash = run_calculator(calculator).await; // Collect the same logs in multiple L1 batches let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, object_store) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, object_store) = + setup_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, 10).await; let multi_block_root_hash = run_calculator(calculator).await; assert_eq!(multi_block_root_hash, root_hash); @@ -360,11 +386,13 @@ async fn multi_l1_batch_workflow() { } } +#[test_casing(2, [false, true])] #[tokio::test] -async fn error_on_pruned_next_l1_batch() { +async fn error_on_pruned_next_l1_batch(sealed_protective_reads: bool) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, _) = + setup_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, 1).await; run_calculator(calculator).await; @@ -390,7 +418,8 @@ async fn error_on_pruned_next_l1_batch() { .unwrap(); assert!(next_l1_batch_header.is_none()); - let (calculator, _) = setup_calculator(temp_dir.path(), pool.clone()).await; + let (calculator, _) = + setup_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; let (_stop_sender, stop_receiver) = watch::channel(false); let err = calculator.run(stop_receiver).await.unwrap_err(); let err = format!("{err:#}"); @@ -400,16 +429,19 @@ async fn error_on_pruned_next_l1_batch() { ); } +#[test_casing(2, [false, true])] #[tokio::test] -async fn running_metadata_calculator_with_additional_blocks() { +async fn running_metadata_calculator_with_additional_blocks(sealed_protective_reads: bool) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, 5).await; run_calculator(calculator).await; - let mut calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let mut calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; let (stop_sx, stop_rx) = watch::channel(false); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; @@ -445,7 +477,7 @@ async fn running_metadata_calculator_with_additional_blocks() { .unwrap(); // Switch to the full tree. It should pick up from the same spot and result in the same tree root hash. - let (calculator, _) = setup_calculator(temp_dir.path(), pool).await; + let (calculator, _) = setup_calculator(temp_dir.path(), pool, true).await; let root_hash_for_full_tree = run_calculator(calculator).await; assert_eq!(root_hash_for_full_tree, updated_root_hash); } @@ -458,9 +490,17 @@ async fn shutting_down_calculator() { create_config(temp_dir.path(), MerkleTreeMode::Lightweight); operation_config.delay_interval = 30_000; // ms; chosen to be larger than `RUN_TIMEOUT` - let calculator = - setup_calculator_with_options(&merkle_tree_config, &operation_config, pool.clone(), None) - .await; + let calculator = setup_calculator_with_options( + &merkle_tree_config, + &operation_config, + &StateKeeperConfig { + protective_reads_persistence_enabled: true, + ..Default::default() + }, + pool.clone(), + None, + ) + .await; reset_db_state(&pool, 5).await; @@ -477,10 +517,12 @@ async fn shutting_down_calculator() { async fn test_postgres_backup_recovery( sleep_between_batches: bool, insert_batch_without_metadata: bool, + sealed_protective_reads: bool, ) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, 5).await; run_calculator(calculator).await; @@ -501,11 +543,17 @@ async fn test_postgres_backup_recovery( .insert_mock_l1_batch(batch_without_metadata) .await .unwrap(); + storage + .vm_runner_dal() + .mark_protective_reads_batch_as_completed(batch_without_metadata.number) + .await + .unwrap(); insert_initial_writes_for_batch(&mut storage, batch_without_metadata.number).await; } drop(storage); - let mut calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let mut calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; let (stop_sx, stop_rx) = watch::channel(false); let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); calculator.delayer.delay_notifier = delay_sx; @@ -526,6 +574,10 @@ async fn test_postgres_backup_recovery( .insert_mock_l1_batch(batch_header) .await .unwrap(); + txn.vm_runner_dal() + .mark_protective_reads_batch_as_completed(batch_header.number) + .await + .unwrap(); insert_initial_writes_for_batch(&mut txn, batch_header.number).await; txn.commit().await.unwrap(); if sleep_between_batches { @@ -552,30 +604,38 @@ async fn test_postgres_backup_recovery( .unwrap(); } +#[test_casing(2, [false, true])] #[tokio::test] -async fn postgres_backup_recovery() { - test_postgres_backup_recovery(false, false).await; +async fn postgres_backup_recovery(sealed_protective_reads: bool) { + test_postgres_backup_recovery(false, false, sealed_protective_reads).await; } +#[test_casing(2, [false, true])] #[tokio::test] -async fn postgres_backup_recovery_with_delay_between_batches() { - test_postgres_backup_recovery(true, false).await; +async fn postgres_backup_recovery_with_delay_between_batches(sealed_protective_reads: bool) { + test_postgres_backup_recovery(true, false, sealed_protective_reads).await; } +#[test_casing(2, [false, true])] #[tokio::test] -async fn postgres_backup_recovery_with_excluded_metadata() { - test_postgres_backup_recovery(false, true).await; +async fn postgres_backup_recovery_with_excluded_metadata(sealed_protective_reads: bool) { + test_postgres_backup_recovery(false, true, sealed_protective_reads).await; } pub(crate) async fn setup_calculator( db_path: &Path, pool: ConnectionPool, + sealed_protective_reads: bool, ) -> (MetadataCalculator, Arc) { let store = MockObjectStore::arc(); let (merkle_tree_config, operation_manager) = create_config(db_path, MerkleTreeMode::Full); let calculator = setup_calculator_with_options( &merkle_tree_config, &operation_manager, + &StateKeeperConfig { + protective_reads_persistence_enabled: sealed_protective_reads, + ..Default::default() + }, pool, Some(store.clone()), ) @@ -586,9 +646,20 @@ pub(crate) async fn setup_calculator( async fn setup_lightweight_calculator( db_path: &Path, pool: ConnectionPool, + sealed_protective_reads: bool, ) -> MetadataCalculator { let (db_config, operation_config) = create_config(db_path, MerkleTreeMode::Lightweight); - setup_calculator_with_options(&db_config, &operation_config, pool, None).await + setup_calculator_with_options( + &db_config, + &operation_config, + &StateKeeperConfig { + protective_reads_persistence_enabled: sealed_protective_reads, + ..Default::default() + }, + pool, + None, + ) + .await } fn create_config( @@ -610,6 +681,7 @@ fn create_config( async fn setup_calculator_with_options( merkle_tree_config: &MerkleTreeConfig, operation_config: &OperationsManagerConfig, + state_keeper_config: &StateKeeperConfig, pool: ConnectionPool, object_store: Option>, ) -> MetadataCalculator { @@ -621,8 +693,11 @@ async fn setup_calculator_with_options( } drop(storage); - let calculator_config = - MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config); + let calculator_config = MetadataCalculatorConfig::for_main_node( + merkle_tree_config, + operation_config, + state_keeper_config, + ); MetadataCalculator::new(calculator_config, object_store, pool) .await .unwrap() @@ -676,6 +751,11 @@ pub(crate) async fn reset_db_state(pool: &ConnectionPool, num_batches: usi .delete_initial_writes(L1BatchNumber(0)) .await .unwrap(); + storage + .vm_runner_dal() + .delete_protective_reads(L1BatchNumber(0)) + .await + .unwrap(); let logs = gen_storage_logs(0..100, num_batches); extend_db_state(&mut storage, logs).await; @@ -730,6 +810,11 @@ pub(super) async fn extend_db_state_from_l1_batch( .mark_l2_blocks_as_executed_in_l1_batch(batch_number) .await .unwrap(); + storage + .vm_runner_dal() + .mark_protective_reads_batch_as_completed(batch_number) + .await + .unwrap(); insert_initial_writes_for_batch(storage, batch_number).await; } } @@ -854,6 +939,11 @@ async fn remove_l1_batches( .delete_initial_writes(last_l1_batch_to_keep) .await .unwrap(); + storage + .vm_runner_dal() + .delete_protective_reads(last_l1_batch_to_keep) + .await + .unwrap(); batch_headers } @@ -945,9 +1035,12 @@ async fn deduplication_works_as_expected() { } } -#[test_casing(3, [3, 5, 8])] +#[test_casing(6, Product(([3, 5, 8], [false, true])))] #[tokio::test] -async fn l1_batch_divergence_entire_workflow(last_common_l1_batch: u32) { +async fn l1_batch_divergence_entire_workflow( + last_common_l1_batch: u32, + sealed_protective_reads: bool, +) { const INITIAL_BATCH_COUNT: usize = 10; assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT); @@ -955,7 +1048,8 @@ async fn l1_batch_divergence_entire_workflow(last_common_l1_batch: u32) { let pool = ConnectionPool::::test_pool().await; let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; reset_db_state(&pool, INITIAL_BATCH_COUNT).await; run_calculator(calculator).await; @@ -964,9 +1058,10 @@ async fn l1_batch_divergence_entire_workflow(last_common_l1_batch: u32) { // Extend the state with new L1 batches. let logs = gen_storage_logs(100..200, 5); extend_db_state(&mut storage, logs).await; - let expected_root_hash = expected_tree_hash(&pool).await; + let expected_root_hash = expected_tree_hash(&pool, sealed_protective_reads).await; - let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let calculator = + setup_lightweight_calculator(temp_dir.path(), pool.clone(), sealed_protective_reads).await; let final_root_hash = run_calculator(calculator).await; assert_eq!(final_root_hash, expected_root_hash); } diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index bfb6ad1912a0..4878ab381a07 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -5,6 +5,7 @@ use std::{ops, sync::Arc, time::Instant}; use anyhow::Context as _; use futures::{future, FutureExt}; use tokio::sync::watch; +use zksync_config::configs::database::MerkleTreeMode; use zksync_dal::{helpers::wait_for_l1_batch, Connection, ConnectionPool, Core, CoreDal}; use zksync_merkle_tree::domain::TreeMetadata; use zksync_object_store::ObjectStore; @@ -24,6 +25,7 @@ pub(super) struct TreeUpdater { tree: AsyncTree, max_l1_batches_per_iter: usize, object_store: Option>, + sealed_batches_have_protective_reads: bool, } impl TreeUpdater { @@ -31,11 +33,13 @@ impl TreeUpdater { tree: AsyncTree, max_l1_batches_per_iter: usize, object_store: Option>, + sealed_batches_have_protective_reads: bool, ) -> Self { Self { tree, max_l1_batches_per_iter, object_store, + sealed_batches_have_protective_reads, } } @@ -184,28 +188,40 @@ impl TreeUpdater { async fn step( &mut self, mut storage: Connection<'_, Core>, - next_l1_batch_to_seal: &mut L1BatchNumber, + next_l1_batch_to_process: &mut L1BatchNumber, ) -> anyhow::Result<()> { - let Some(last_sealed_l1_batch) = storage - .blocks_dal() - .get_sealed_l1_batch_number() - .await - .context("failed loading sealed L1 batch number")? - else { - tracing::trace!("No L1 batches to seal: Postgres storage is empty"); - return Ok(()); + let last_l1_batch_with_protective_reads = if self.tree.mode() == MerkleTreeMode::Lightweight + || self.sealed_batches_have_protective_reads + { + let Some(last_sealed_l1_batch) = storage + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .context("failed loading sealed L1 batch number")? + else { + tracing::trace!("No L1 batches to seal: Postgres storage is empty"); + return Ok(()); + }; + last_sealed_l1_batch + } else { + storage + .vm_runner_dal() + .get_protective_reads_latest_processed_batch(L1BatchNumber(0)) + .await + .context("failed loading latest L1 batch number with protective reads")? }; let last_requested_l1_batch = - next_l1_batch_to_seal.0 + self.max_l1_batches_per_iter as u32 - 1; - let last_requested_l1_batch = last_requested_l1_batch.min(last_sealed_l1_batch.0); - let l1_batch_numbers = next_l1_batch_to_seal.0..=last_requested_l1_batch; + next_l1_batch_to_process.0 + self.max_l1_batches_per_iter as u32 - 1; + let last_requested_l1_batch = + last_requested_l1_batch.min(last_l1_batch_with_protective_reads.0); + let l1_batch_numbers = next_l1_batch_to_process.0..=last_requested_l1_batch; if l1_batch_numbers.is_empty() { tracing::trace!( - "No L1 batches to seal: batch numbers range to be loaded {l1_batch_numbers:?} is empty" + "No L1 batches to process: batch numbers range to be loaded {l1_batch_numbers:?} is empty" ); } else { tracing::info!("Updating Merkle tree with L1 batches #{l1_batch_numbers:?}"); - *next_l1_batch_to_seal = self + *next_l1_batch_to_process = self .process_multiple_batches(&mut storage, l1_batch_numbers) .await?; } @@ -220,10 +236,10 @@ impl TreeUpdater { mut stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { let tree = &mut self.tree; - let mut next_l1_batch_to_seal = tree.next_l1_batch_number(); + let mut next_l1_batch_to_process = tree.next_l1_batch_number(); tracing::info!( "Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \ - Next L1 batch for Merkle tree: {next_l1_batch_to_seal}", + Next L1 batch for Merkle tree: {next_l1_batch_to_process}", max_batches_per_iter = self.max_l1_batches_per_iter ); @@ -234,17 +250,17 @@ impl TreeUpdater { } let storage = pool.connection_tagged("metadata_calculator").await?; - let snapshot = *next_l1_batch_to_seal; - self.step(storage, &mut next_l1_batch_to_seal).await?; - let delay = if snapshot == *next_l1_batch_to_seal { + let snapshot = *next_l1_batch_to_process; + self.step(storage, &mut next_l1_batch_to_process).await?; + let delay = if snapshot == *next_l1_batch_to_process { tracing::trace!( - "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) \ + "Metadata calculator (next L1 batch: #{next_l1_batch_to_process}) \ didn't make any progress; delaying it using {delayer:?}" ); delayer.wait(&self.tree).left_future() } else { tracing::trace!( - "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) made progress from #{snapshot}" + "Metadata calculator (next L1 batch: #{next_l1_batch_to_process}) made progress from #{snapshot}" ); future::ready(()).right_future() }; @@ -394,9 +410,12 @@ impl AsyncTree { let mut storage = pool.connection_tagged("metadata_calculator").await?; self.ensure_genesis(&mut storage, earliest_l1_batch).await?; - let next_l1_batch_to_seal = self.next_l1_batch_number(); + let next_l1_batch_to_process = self.next_l1_batch_number(); - let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; + let current_db_batch = storage + .vm_runner_dal() + .get_protective_reads_latest_processed_batch(L1BatchNumber(0)) + .await?; let last_l1_batch_with_tree_data = storage .blocks_dal() .get_last_l1_batch_number_with_tree_data() @@ -404,7 +423,7 @@ impl AsyncTree { drop(storage); tracing::info!( - "Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ + "Next L1 batch for Merkle tree: {next_l1_batch_to_process}, current Postgres L1 batch: {current_db_batch:?}, \ last L1 batch with metadata: {last_l1_batch_with_tree_data:?}" ); @@ -413,18 +432,18 @@ impl AsyncTree { // responsible for their appearance!), but fortunately most of the updater doesn't depend on it. if let Some(last_l1_batch_with_tree_data) = last_l1_batch_with_tree_data { let backup_lag = - (last_l1_batch_with_tree_data.0 + 1).saturating_sub(next_l1_batch_to_seal.0); + (last_l1_batch_with_tree_data.0 + 1).saturating_sub(next_l1_batch_to_process.0); METRICS.backup_lag.set(backup_lag.into()); - if next_l1_batch_to_seal > last_l1_batch_with_tree_data + 1 { + if next_l1_batch_to_process > last_l1_batch_with_tree_data + 1 { tracing::warn!( - "Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \ + "Next L1 batch of the tree ({next_l1_batch_to_process}) is greater than last L1 batch with metadata in Postgres \ ({last_l1_batch_with_tree_data}); this may be a result of restoring Postgres from a snapshot. \ Truncating Merkle tree versions so that this mismatch is fixed..." ); self.roll_back_logs(last_l1_batch_with_tree_data)?; self.save().await?; - tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); + tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_process}"); } self.ensure_no_l1_batch_divergence(pool).await?; diff --git a/core/node/node_framework/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs index 4c0ef626927c..fe111155d829 100644 --- a/core/node/node_framework/examples/main_node.rs +++ b/core/node/node_framework/examples/main_node.rs @@ -135,9 +135,11 @@ impl MainNodeBuilder { fn add_metadata_calculator_layer(mut self) -> anyhow::Result { let merkle_tree_env_config = DBConfig::from_env()?.merkle_tree; let operations_manager_env_config = OperationsManagerConfig::from_env()?; + let state_keeper_env_config = StateKeeperConfig::from_env()?; let metadata_calculator_config = MetadataCalculatorConfig::for_main_node( &merkle_tree_env_config, &operations_manager_env_config, + &state_keeper_env_config, ); self.node .add_layer(MetadataCalculatorLayer::new(metadata_calculator_config));