From 543f9e9397915e893d7b747ceccd9b76f9d571aa Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 23 Apr 2024 23:03:24 +0500 Subject: [PATCH] feat(vm-runner): implement VM runner storage layer (#1651) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Adds an abstraction that can load unprocessed batches as well as implement `ReadStorageFactory`. Implementation largely reuses the existing primitives from state keeper and they had to be generalized slightly. ## Why ❔ One of the components for the upcoming VM runner ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. - [x] Linkcheck has been run via `zk linkcheck`. --- Cargo.lock | 16 + Cargo.toml | 5 +- checks-config/era.dic | 1 + ...59ff8d1eef6198390b84127a50c8f460fd2de.json | 22 + ...8f6f962ff3f07aa1cf7f0ffcefffccb633cdd.json | 23 + ...e4e7cd09ca6a5f2b5cac5b130f1476214f403.json | 26 + core/lib/dal/src/factory_deps_dal.rs | 19 + core/lib/dal/src/storage_logs_dedup_dal.rs | 26 + core/lib/dal/src/transactions_dal.rs | 34 +- core/lib/state/Cargo.toml | 3 + core/lib/state/src/catchup.rs | 80 +++ core/lib/state/src/lib.rs | 4 + core/lib/state/src/postgres/mod.rs | 5 +- core/lib/state/src/rocksdb/mod.rs | 35 +- core/lib/state/src/rocksdb/recovery.rs | 2 +- core/lib/state/src/rocksdb/tests.rs | 32 +- core/lib/state/src/storage_factory.rs | 232 +++++++++ core/lib/types/src/block.rs | 2 +- core/lib/zksync_core/Cargo.toml | 3 +- core/lib/zksync_core/src/lib.rs | 1 + .../batch_executor/main_executor.rs | 7 +- .../tests/read_storage_factory.rs | 14 +- .../batch_executor/tests/tester.rs | 2 +- core/lib/zksync_core/src/state_keeper/mod.rs | 2 +- .../src/state_keeper/state_keeper_storage.rs | 207 +------- core/lib/zksync_core/src/vm_runner/mod.rs | 6 + core/lib/zksync_core/src/vm_runner/storage.rs | 406 +++++++++++++++ .../zksync_core/src/vm_runner/tests/mod.rs | 483 ++++++++++++++++++ .../state_keeper/main_batch_executor.rs | 3 +- prover/Cargo.lock | 12 + 30 files changed, 1481 insertions(+), 232 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-0d6916c4bd6ef223f921723642059ff8d1eef6198390b84127a50c8f460fd2de.json create mode 100644 core/lib/dal/.sqlx/query-443b5c62c2c274369764ac5279d8f6f962ff3f07aa1cf7f0ffcefffccb633cdd.json create mode 100644 core/lib/dal/.sqlx/query-9615d45082a848792bca181a3b4e4e7cd09ca6a5f2b5cac5b130f1476214f403.json create mode 100644 core/lib/state/src/catchup.rs create mode 100644 core/lib/state/src/storage_factory.rs create mode 100644 core/lib/zksync_core/src/vm_runner/mod.rs create mode 100644 core/lib/zksync_core/src/vm_runner/storage.rs create mode 100644 core/lib/zksync_core/src/vm_runner/tests/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 8b7359a6d3f1..54bb4af27a26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,6 +400,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backon" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0" +dependencies = [ + "fastrand", + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -8415,6 +8427,7 @@ dependencies = [ "assert_matches", "async-trait", "axum", + "backon", "bitflags 1.3.2", 
"chrono", "ctrlc", @@ -8982,9 +8995,11 @@ version = "0.1.0" dependencies = [ "anyhow", "assert_matches", + "async-trait", "chrono", "itertools 0.10.5", "mini-moka", + "once_cell", "rand 0.8.5", "tempfile", "test-casing", @@ -8992,6 +9007,7 @@ dependencies = [ "tracing", "vise", "zksync_dal", + "zksync_shared_metrics", "zksync_storage", "zksync_types", "zksync_utils", diff --git a/Cargo.toml b/Cargo.toml index 622b711ddb24..fdfb70086a5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ anyhow = "1" assert_matches = "1.5" async-trait = "0.1" axum = "0.6.19" +backon = "0.4.4" bigdecimal = "0.3.0" bincode = "1" bitflags = "1.3.2" @@ -226,6 +227,6 @@ zksync_crypto_primitives = { path = "core/lib/crypto_primitives" } zksync_node_framework = { path = "core/node/node_framework" } zksync_eth_watch = { path = "core/node/eth_watch" } zksync_shared_metrics = { path = "core/node/shared_metrics" } -zksync_block_reverter = { path = "core/node/block_reverter"} +zksync_block_reverter = { path = "core/node/block_reverter" } zksync_commitment_generator = { path = "core/node/commitment_generator" } -zksync_house_keeper = { path = "core/node/house_keeper" } +zksync_house_keeper = { path = "core/node/house_keeper" } diff --git a/checks-config/era.dic b/checks-config/era.dic index 51b1ab017ef0..66ecf0a86809 100644 --- a/checks-config/era.dic +++ b/checks-config/era.dic @@ -943,3 +943,4 @@ superset 80M 780kb hyperchain +storages diff --git a/core/lib/dal/.sqlx/query-0d6916c4bd6ef223f921723642059ff8d1eef6198390b84127a50c8f460fd2de.json b/core/lib/dal/.sqlx/query-0d6916c4bd6ef223f921723642059ff8d1eef6198390b84127a50c8f460fd2de.json new file mode 100644 index 000000000000..a2f4973c86dd --- /dev/null +++ b/core/lib/dal/.sqlx/query-0d6916c4bd6ef223f921723642059ff8d1eef6198390b84127a50c8f460fd2de.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n number\n FROM\n miniblocks\n WHERE\n miniblocks.l1_batch_number = $1\n AND l1_tx_count = 0\n AND l2_tx_count = 0\n ORDER BY\n number\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "0d6916c4bd6ef223f921723642059ff8d1eef6198390b84127a50c8f460fd2de" +} diff --git a/core/lib/dal/.sqlx/query-443b5c62c2c274369764ac5279d8f6f962ff3f07aa1cf7f0ffcefffccb633cdd.json b/core/lib/dal/.sqlx/query-443b5c62c2c274369764ac5279d8f6f962ff3f07aa1cf7f0ffcefffccb633cdd.json new file mode 100644 index 000000000000..3e2a8752dd2c --- /dev/null +++ b/core/lib/dal/.sqlx/query-443b5c62c2c274369764ac5279d8f6f962ff3f07aa1cf7f0ffcefffccb633cdd.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n INDEX\n FROM\n initial_writes\n WHERE\n hashed_key = $1\n AND l1_batch_number <= $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "index", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "443b5c62c2c274369764ac5279d8f6f962ff3f07aa1cf7f0ffcefffccb633cdd" +} diff --git a/core/lib/dal/.sqlx/query-9615d45082a848792bca181a3b4e4e7cd09ca6a5f2b5cac5b130f1476214f403.json b/core/lib/dal/.sqlx/query-9615d45082a848792bca181a3b4e4e7cd09ca6a5f2b5cac5b130f1476214f403.json new file mode 100644 index 000000000000..0a05683acf81 --- /dev/null +++ b/core/lib/dal/.sqlx/query-9615d45082a848792bca181a3b4e4e7cd09ca6a5f2b5cac5b130f1476214f403.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bytecode,\n 
bytecode_hash\n FROM\n factory_deps\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bytecode", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "bytecode_hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "9615d45082a848792bca181a3b4e4e7cd09ca6a5f2b5cac5b130f1476214f403" +} diff --git a/core/lib/dal/src/factory_deps_dal.rs b/core/lib/dal/src/factory_deps_dal.rs index ad0c0c7c1312..b6294026005b 100644 --- a/core/lib/dal/src/factory_deps_dal.rs +++ b/core/lib/dal/src/factory_deps_dal.rs @@ -180,4 +180,23 @@ impl FactoryDepsDal<'_, '_> { .await?; Ok(()) } + + /// Retrieves all factory deps entries for testing purposes. + pub async fn dump_all_factory_deps_for_tests(&mut self) -> HashMap> { + sqlx::query!( + r#" + SELECT + bytecode, + bytecode_hash + FROM + factory_deps + "# + ) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| (H256::from_slice(&row.bytecode_hash), row.bytecode)) + .collect() + } } diff --git a/core/lib/dal/src/storage_logs_dedup_dal.rs b/core/lib/dal/src/storage_logs_dedup_dal.rs index f204df1da29f..6304869a8261 100644 --- a/core/lib/dal/src/storage_logs_dedup_dal.rs +++ b/core/lib/dal/src/storage_logs_dedup_dal.rs @@ -221,6 +221,32 @@ impl StorageLogsDedupDal<'_, '_> { .map(|row| row.index as u64)) } + pub async fn get_enumeration_index_in_l1_batch( + &mut self, + hashed_key: H256, + l1_batch_number: L1BatchNumber, + ) -> DalResult> { + Ok(sqlx::query!( + r#" + SELECT + INDEX + FROM + initial_writes + WHERE + hashed_key = $1 + AND l1_batch_number <= $2 + "#, + hashed_key.as_bytes(), + l1_batch_number.0 as i32, + ) + .instrument("get_enumeration_index_in_l1_batch") + .with_arg("hashed_key", &hashed_key) + .with_arg("l1_batch_number", &l1_batch_number) + .fetch_optional(self.storage) + .await? + .map(|row| row.index as u64)) + } + /// Returns `hashed_keys` that are both present in the input and in `initial_writes` table. pub async fn filter_written_slots(&mut self, hashed_keys: &[H256]) -> DalResult> { let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect(); diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index 5da65b297ce7..23c162d24c51 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -1252,7 +1252,8 @@ impl TransactionsDal<'_, '_> { .fetch_all(self.storage) .await?; - self.map_transactions_to_execution_data(transactions).await + self.map_transactions_to_execution_data(transactions, None) + .await } /// Returns L2 blocks with their transactions to be used in VM execution. @@ -1282,14 +1283,37 @@ impl TransactionsDal<'_, '_> { .fetch_all(self.storage) .await?; - self.map_transactions_to_execution_data(transactions).await + let fictive_l2_block = sqlx::query!( + r#" + SELECT + number + FROM + miniblocks + WHERE + miniblocks.l1_batch_number = $1 + AND l1_tx_count = 0 + AND l2_tx_count = 0 + ORDER BY + number + "#, + i64::from(l1_batch_number.0) + ) + .instrument("get_l2_blocks_to_execute_for_l1_batch#fictive_l2_block") + .with_arg("l1_batch_number", &l1_batch_number) + .fetch_optional(self.storage) + .await? 
+ .map(|row| L2BlockNumber(row.number as u32)); + + self.map_transactions_to_execution_data(transactions, fictive_l2_block) + .await } async fn map_transactions_to_execution_data( &mut self, transactions: Vec, + fictive_l2_block: Option, ) -> DalResult> { - let transactions_by_l2_block: Vec<(L2BlockNumber, Vec)> = transactions + let mut transactions_by_l2_block: Vec<(L2BlockNumber, Vec)> = transactions .into_iter() .group_by(|tx| tx.miniblock_number.unwrap()) .into_iter() @@ -1300,6 +1324,10 @@ impl TransactionsDal<'_, '_> { ) }) .collect(); + // Fictive L2 block is always at the end of a batch so it is safe to append it + if let Some(fictive_l2_block) = fictive_l2_block { + transactions_by_l2_block.push((fictive_l2_block, Vec::new())); + } if transactions_by_l2_block.is_empty() { return Ok(Vec::new()); } diff --git a/core/lib/state/Cargo.toml b/core/lib/state/Cargo.toml index b461b2704bce..fd1742788eff 100644 --- a/core/lib/state/Cargo.toml +++ b/core/lib/state/Cargo.toml @@ -14,14 +14,17 @@ vise.workspace = true zksync_dal.workspace = true zksync_types.workspace = true zksync_utils.workspace = true +zksync_shared_metrics.workspace = true zksync_storage.workspace = true anyhow.workspace = true +async-trait.workspace = true mini-moka.workspace = true tokio = { workspace = true, features = ["rt"] } tracing.workspace = true itertools.workspace = true chrono.workspace = true +once_cell.workspace = true [dev-dependencies] assert_matches.workspace = true diff --git a/core/lib/state/src/catchup.rs b/core/lib/state/src/catchup.rs new file mode 100644 index 000000000000..60c1b03f974e --- /dev/null +++ b/core/lib/state/src/catchup.rs @@ -0,0 +1,80 @@ +use std::{sync::Arc, time::Instant}; + +use anyhow::Context; +use once_cell::sync::OnceCell; +use tokio::sync::watch; +use zksync_dal::{ConnectionPool, Core}; +use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS}; +use zksync_storage::RocksDB; +use zksync_types::L1BatchNumber; + +use crate::{RocksdbStorage, StateKeeperColumnFamily}; + +/// A runnable task that blocks until the provided RocksDB cache instance is caught up with +/// Postgres. +/// +/// See [`ReadStorageFactory`] for more context. +#[derive(Debug)] +pub struct AsyncCatchupTask { + pool: ConnectionPool, + state_keeper_db_path: String, + rocksdb_cell: Arc>>, + to_l1_batch_number: Option, +} + +impl AsyncCatchupTask { + /// Create a new catch-up task with the provided Postgres and RocksDB instances. Optionally + /// accepts the last L1 batch number to catch up to (defaults to latest if not specified). + pub fn new( + pool: ConnectionPool, + state_keeper_db_path: String, + rocksdb_cell: Arc>>, + to_l1_batch_number: Option, + ) -> Self { + Self { + pool, + state_keeper_db_path, + rocksdb_cell, + to_l1_batch_number, + } + } + + /// Block until RocksDB cache instance is caught up with Postgres. + /// + /// # Errors + /// + /// Propagates RocksDB and Postgres errors. 
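+    ///
+    /// # Example
+    ///
+    /// A minimal wiring sketch (not part of this patch). It assumes the caller already has a
+    /// `ConnectionPool<Core>` (`pool`), a state keeper DB path (`db_path: String`) and a
+    /// `watch::Receiver<bool>` stop channel; only the constructor and `run` declared here are used:
+    ///
+    /// ```ignore
+    /// let rocksdb_cell = Arc::new(OnceCell::new());
+    /// let task = AsyncCatchupTask::new(pool.clone(), db_path, rocksdb_cell.clone(), None);
+    /// tokio::spawn(task.run(stop_receiver.clone()));
+    /// // Once the task completes, the caught-up RocksDB instance is available via
+    /// // `rocksdb_cell.get()`; until then, callers fall back to Postgres.
+    /// ```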
+ pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let started_at = Instant::now(); + tracing::debug!("Catching up RocksDB asynchronously"); + + let mut rocksdb_builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref()) + .await + .context("Failed creating RocksDB storage builder")?; + let mut connection = self.pool.connection().await?; + let was_recovered_from_snapshot = rocksdb_builder + .ensure_ready(&mut connection, &stop_receiver) + .await + .context("failed initializing state keeper RocksDB from snapshot or scratch")?; + if was_recovered_from_snapshot { + let elapsed = started_at.elapsed(); + APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::StateKeeperCache] + .set(elapsed); + tracing::info!("Recovered state keeper RocksDB from snapshot in {elapsed:?}"); + } + + let rocksdb = rocksdb_builder + .synchronize(&mut connection, &stop_receiver, self.to_l1_batch_number) + .await + .context("Failed to catch up RocksDB to Postgres")?; + drop(connection); + if let Some(rocksdb) = rocksdb { + self.rocksdb_cell + .set(rocksdb.into_rocksdb()) + .map_err(|_| anyhow::anyhow!("Async RocksDB cache was initialized twice"))?; + } else { + tracing::info!("Synchronizing RocksDB interrupted"); + } + Ok(()) + } +} diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs index b40c4fabf0a8..cca1570b22b1 100644 --- a/core/lib/state/src/lib.rs +++ b/core/lib/state/src/lib.rs @@ -18,10 +18,12 @@ use zksync_types::{ }; mod cache; +mod catchup; mod in_memory; mod postgres; mod rocksdb; mod shadow_storage; +mod storage_factory; mod storage_view; #[cfg(test)] mod test_utils; @@ -29,10 +31,12 @@ mod witness; pub use self::{ cache::sequential_cache::SequentialCache, + catchup::AsyncCatchupTask, in_memory::InMemoryStorage, postgres::{PostgresStorage, PostgresStorageCaches, PostgresStorageCachesTask}, rocksdb::{RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily}, shadow_storage::ShadowStorage, + storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory}, storage_view::{StorageView, StorageViewMetrics}, witness::WitnessStorage, }; diff --git a/core/lib/state/src/postgres/mod.rs b/core/lib/state/src/postgres/mod.rs index 5cac86a41d49..17163af0d56f 100644 --- a/core/lib/state/src/postgres/mod.rs +++ b/core/lib/state/src/postgres/mod.rs @@ -592,7 +592,10 @@ impl ReadStorage for PostgresStorage<'_> { let mut dal = self.connection.storage_logs_dedup_dal(); let value = self .rt_handle - .block_on(dal.get_enumeration_index_for_key(key.hashed_key())); + .block_on(dal.get_enumeration_index_in_l1_batch( + key.hashed_key(), + self.l1_batch_number_for_l2_block, + )); value.expect("failed getting enumeration index for key") } } diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index 8971d430b687..3f4f2b47c563 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -125,7 +125,7 @@ impl From for RocksdbSyncError { } /// [`ReadStorage`] implementation backed by RocksDB. 
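+///
+/// The builder's `synchronize` method below now takes an optional target L1 batch; a sketch of
+/// catching the cache up only to a specific batch (the `pool`, `db_path` and `stop_receiver`
+/// values are assumed to come from the caller):
+///
+/// ```ignore
+/// let builder = RocksdbStorage::builder(db_path.as_ref()).await?;
+/// let mut conn = pool.connection().await?;
+/// // `None` keeps the previous behavior (catch up to the latest sealed batch);
+/// // `Some(n)` stops once batch `n` has been applied.
+/// let storage = builder
+///     .synchronize(&mut conn, &stop_receiver, Some(L1BatchNumber(42)))
+///     .await?;
+/// ```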
-#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RocksdbStorage { db: RocksDB, pending_patch: InMemoryStorage, @@ -202,9 +202,13 @@ impl RocksdbStorageBuilder { self, storage: &mut Connection<'_, Core>, stop_receiver: &watch::Receiver, + to_l1_batch_number: Option, ) -> anyhow::Result> { let mut inner = self.0; - match inner.update_from_postgres(storage, stop_receiver).await { + match inner + .update_from_postgres(storage, stop_receiver, to_l1_batch_number) + .await + { Ok(()) => Ok(Some(inner)), Err(RocksdbSyncError::Interrupted) => Ok(None), Err(RocksdbSyncError::Internal(err)) => Err(err), @@ -268,6 +272,7 @@ impl RocksdbStorage { &mut self, storage: &mut Connection<'_, Core>, stop_receiver: &watch::Receiver, + to_l1_batch_number: Option, ) -> Result<(), RocksdbSyncError> { let (_, mut current_l1_batch_number) = self .ensure_ready(storage, Self::DESIRED_LOG_CHUNK_SIZE, stop_receiver) @@ -283,21 +288,33 @@ impl RocksdbStorage { // No L1 batches are persisted in Postgres; update is not necessary. return Ok(()); }; - tracing::debug!("Loading storage for l1 batch number {latest_l1_batch_number}"); + let to_l1_batch_number = if let Some(to_l1_batch_number) = to_l1_batch_number { + if to_l1_batch_number > latest_l1_batch_number { + let err = anyhow::anyhow!( + "Requested to update RocksDB to L1 batch number ({current_l1_batch_number}) that \ + is greater than the last sealed L1 batch number in Postgres ({latest_l1_batch_number})" + ); + return Err(err.into()); + } + to_l1_batch_number + } else { + latest_l1_batch_number + }; + tracing::debug!("Loading storage for l1 batch number {to_l1_batch_number}"); - if current_l1_batch_number > latest_l1_batch_number + 1 { + if current_l1_batch_number > to_l1_batch_number + 1 { let err = anyhow::anyhow!( "L1 batch number in state keeper cache ({current_l1_batch_number}) is greater than \ - the last sealed L1 batch number in Postgres ({latest_l1_batch_number})" + the requested batch number ({to_l1_batch_number})" ); return Err(err.into()); } - while current_l1_batch_number <= latest_l1_batch_number { + while current_l1_batch_number <= to_l1_batch_number { if *stop_receiver.borrow() { return Err(RocksdbSyncError::Interrupted); } - let current_lag = latest_l1_batch_number.0 - current_l1_batch_number.0 + 1; + let current_lag = to_l1_batch_number.0 - current_l1_batch_number.0 + 1; METRICS.lag.set(current_lag.into()); tracing::debug!("Loading state changes for l1 batch {current_l1_batch_number}"); @@ -323,7 +340,7 @@ impl RocksdbStorage { .await .with_context(|| format!("failed saving L1 batch #{current_l1_batch_number}"))?; #[cfg(test)] - (self.listener.on_l1_batch_synced)(current_l1_batch_number - 1); + (self.listener.on_l1_batch_synced.write().await)(current_l1_batch_number - 1); } latency.observe(); @@ -331,7 +348,7 @@ impl RocksdbStorage { let estimated_size = self.estimated_map_size(); METRICS.size.set(estimated_size); tracing::info!( - "Secondary storage for L1 batch #{latest_l1_batch_number} initialized, size is {estimated_size}" + "Secondary storage for L1 batch #{to_l1_batch_number} initialized, size is {estimated_size}" ); Ok(()) diff --git a/core/lib/state/src/rocksdb/recovery.rs b/core/lib/state/src/rocksdb/recovery.rs index 742183a31675..7a268564c60a 100644 --- a/core/lib/state/src/rocksdb/recovery.rs +++ b/core/lib/state/src/rocksdb/recovery.rs @@ -134,7 +134,7 @@ impl RocksdbStorage { })?; #[cfg(test)] - (self.listener.on_logs_chunk_recovered)(chunk_id); + (self.listener.on_logs_chunk_recovered.write().await)(chunk_id); } 
RECOVERY_METRICS.recovered_chunk_count.inc_by(1); } diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs index d2ccd239fc10..c95389079345 100644 --- a/core/lib/state/src/rocksdb/tests.rs +++ b/core/lib/state/src/rocksdb/tests.rs @@ -1,10 +1,11 @@ //! Tests for [`RocksdbStorage`]. -use std::fmt; +use std::{fmt, sync::Arc}; use assert_matches::assert_matches; use tempfile::TempDir; use test_casing::test_casing; +use tokio::sync::RwLock; use zksync_dal::{ConnectionPool, Core}; use zksync_types::{L2BlockNumber, StorageLog}; @@ -14,11 +15,12 @@ use crate::test_utils::{ prepare_postgres_for_snapshot_recovery, }; +#[derive(Clone)] pub(super) struct RocksdbStorageEventListener { /// Called when an L1 batch is synced. - pub on_l1_batch_synced: Box, + pub on_l1_batch_synced: Arc>, /// Called when an storage logs chunk is recovered from a snapshot. - pub on_logs_chunk_recovered: Box, + pub on_logs_chunk_recovered: Arc>, } impl fmt::Debug for RocksdbStorageEventListener { @@ -32,8 +34,8 @@ impl fmt::Debug for RocksdbStorageEventListener { impl Default for RocksdbStorageEventListener { fn default() -> Self { Self { - on_l1_batch_synced: Box::new(|_| { /* do nothing */ }), - on_logs_chunk_recovered: Box::new(|_| { /* do nothing */ }), + on_l1_batch_synced: Arc::new(RwLock::new(|_| { /* do nothing */ })), + on_logs_chunk_recovered: Arc::new(RwLock::new(|_| { /* do nothing */ })), } } } @@ -82,7 +84,7 @@ async fn sync_test_storage(dir: &TempDir, conn: &mut Connection<'_, Core>) -> Ro .await .expect("Failed initializing RocksDB"); builder - .synchronize(conn, &stop_receiver) + .synchronize(conn, &stop_receiver, None) .await .unwrap() .expect("Storage synchronization unexpectedly stopped") @@ -100,7 +102,7 @@ async fn sync_test_storage_and_check_recovery( let was_recovered = builder.ensure_ready(conn, &stop_receiver).await.unwrap(); assert_eq!(was_recovered, expect_recovery); builder - .synchronize(conn, &stop_receiver) + .synchronize(conn, &stop_receiver, None) .await .unwrap() .expect("Storage synchronization unexpectedly stopped") @@ -142,15 +144,15 @@ async fn rocksdb_storage_syncing_fault_tolerance() { .await .expect("Failed initializing RocksDB"); let mut expected_l1_batch_number = L1BatchNumber(0); - storage.0.listener.on_l1_batch_synced = Box::new(move |number| { + storage.0.listener.on_l1_batch_synced = Arc::new(RwLock::new(move |number| { assert_eq!(number, expected_l1_batch_number); expected_l1_batch_number += 1; if number == L1BatchNumber(2) { stop_sender.send_replace(true); } - }); + })); let storage = storage - .synchronize(&mut conn, &stop_receiver) + .synchronize(&mut conn, &stop_receiver, None) .await .unwrap(); assert!(storage.is_none()); @@ -163,7 +165,7 @@ async fn rocksdb_storage_syncing_fault_tolerance() { let (_stop_sender, stop_receiver) = watch::channel(false); let mut storage = storage - .synchronize(&mut conn, &stop_receiver) + .synchronize(&mut conn, &stop_receiver, None) .await .unwrap() .expect("Storage synchronization unexpectedly stopped"); @@ -400,13 +402,13 @@ async fn recovery_fault_tolerance() { let mut storage = RocksdbStorage::new(dir.path().into()).await.unwrap(); let (stop_sender, stop_receiver) = watch::channel(false); let mut synced_chunk_count = 0_u64; - storage.listener.on_logs_chunk_recovered = Box::new(move |chunk_id| { + storage.listener.on_logs_chunk_recovered = Arc::new(RwLock::new(move |chunk_id| { assert_eq!(chunk_id, synced_chunk_count); synced_chunk_count += 1; if synced_chunk_count == 2 { stop_sender.send_replace(true); 
} - }); + })); let err = storage .ensure_ready(&mut conn, log_chunk_size, &stop_receiver) @@ -418,9 +420,9 @@ async fn recovery_fault_tolerance() { // Resume recovery and check that no chunks are recovered twice. let (_stop_sender, stop_receiver) = watch::channel(false); let mut storage = RocksdbStorage::new(dir.path().into()).await.unwrap(); - storage.listener.on_logs_chunk_recovered = Box::new(|chunk_id| { + storage.listener.on_logs_chunk_recovered = Arc::new(RwLock::new(|chunk_id| { assert!(chunk_id >= 2); - }); + })); storage .ensure_ready(&mut conn, log_chunk_size, &stop_receiver) .await diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory.rs new file mode 100644 index 000000000000..625867b82c45 --- /dev/null +++ b/core/lib/state/src/storage_factory.rs @@ -0,0 +1,232 @@ +use std::{collections::HashMap, fmt::Debug}; + +use anyhow::Context as _; +use async_trait::async_trait; +use tokio::{runtime::Handle, sync::watch}; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_storage::RocksDB; +use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256}; + +use crate::{ + PostgresStorage, ReadStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily, +}; + +/// Factory that can produce a [`ReadStorage`] implementation on demand. +#[async_trait] +pub trait ReadStorageFactory: Debug + Send + Sync + 'static { + /// Creates a [`PgOrRocksdbStorage`] entity over either a Postgres connection or RocksDB + /// instance. The specific criteria on which one are left up to the implementation. + /// + /// The idea is that in either case this provides a valid [`ReadStorage`] implementation + /// that can be used by the caller. + async fn access_storage( + &self, + stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result>>; +} + +/// DB difference introduced by one batch. +#[derive(Debug, Clone)] +pub struct BatchDiff { + /// Storage slots touched by this batch along with new values there. + pub state_diff: HashMap, + /// Initial write indices introduced by this batch. + pub enum_index_diff: HashMap, + /// Factory dependencies introduced by this batch. + pub factory_dep_diff: HashMap>, +} + +/// A RocksDB cache instance with in-memory DB diffs that gives access to DB state at batches `N` to +/// `N + K`, where `K` is the number of diffs. +#[derive(Debug)] +pub struct RocksdbWithMemory { + /// RocksDB cache instance caught up to batch `N`. + pub rocksdb: RocksdbStorage, + /// Diffs for batches `N + 1` to `N + K`. + pub batch_diffs: Vec, +} + +/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`] +/// underneath. +#[derive(Debug)] +pub enum PgOrRocksdbStorage<'a> { + /// Implementation over a Postgres connection. + Postgres(PostgresStorage<'a>), + /// Implementation over a RocksDB cache instance. + Rocksdb(RocksdbStorage), + /// Implementation over a RocksDB cache instance with in-memory DB diffs. + RocksdbWithMemory(RocksdbWithMemory), +} + +impl<'a> PgOrRocksdbStorage<'a> { + /// Returns a [`ReadStorage`] implementation backed by Postgres + /// + /// # Errors + /// + /// Propagates Postgres errors. + pub async fn access_storage_pg( + pool: &'a ConnectionPool, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + let mut connection = pool.connection().await?; + let l2_block_number = if let Some((_, l2_block_number)) = connection + .blocks_dal() + .get_l2_block_range_of_l1_batch(l1_batch_number) + .await? 
+ { + l2_block_number + } else { + tracing::info!("Could not find latest sealed L2 block, loading from snapshot"); + let snapshot_recovery = connection + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await? + .context("Could not find snapshot, no state available")?; + if snapshot_recovery.l1_batch_number != l1_batch_number { + anyhow::bail!( + "Snapshot contains L1 batch #{} while #{} was expected", + snapshot_recovery.l1_batch_number, + l1_batch_number + ); + } + snapshot_recovery.l2_block_number + }; + tracing::debug!(%l1_batch_number, %l2_block_number, "Using Postgres-based storage"); + Ok( + PostgresStorage::new_async(Handle::current(), connection, l2_block_number, true) + .await? + .into(), + ) + } + + /// Catches up RocksDB synchronously (i.e. assumes the gap is small) and + /// returns a [`ReadStorage`] implementation backed by caught-up RocksDB. + /// + /// # Errors + /// + /// Propagates RocksDB and Postgres errors. + pub async fn access_storage_rocksdb( + connection: &mut Connection<'_, Core>, + rocksdb: RocksDB, + stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result>> { + tracing::debug!("Catching up RocksDB synchronously"); + let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb); + let rocksdb = rocksdb_builder + .synchronize(connection, stop_receiver, None) + .await + .context("Failed to catch up state keeper RocksDB storage to Postgres")?; + let Some(rocksdb) = rocksdb else { + tracing::info!("Synchronizing RocksDB interrupted"); + return Ok(None); + }; + let rocksdb_l1_batch_number = rocksdb + .l1_batch_number() + .await + .ok_or_else(|| anyhow::anyhow!("No L1 batches available in Postgres"))?; + if l1_batch_number + 1 != rocksdb_l1_batch_number { + anyhow::bail!( + "RocksDB synchronized to L1 batch #{} while #{} was expected", + rocksdb_l1_batch_number, + l1_batch_number + ); + } + tracing::debug!(%rocksdb_l1_batch_number, "Using RocksDB-based storage"); + Ok(Some(rocksdb.into())) + } +} + +impl ReadStorage for RocksdbWithMemory { + fn read_value(&mut self, key: &StorageKey) -> StorageValue { + match self + .batch_diffs + .iter() + .rev() + .find_map(|b| b.state_diff.get(key)) + { + None => self.rocksdb.read_value(key), + Some(value) => *value, + } + } + + fn is_write_initial(&mut self, key: &StorageKey) -> bool { + match self + .batch_diffs + .iter() + .find_map(|b| b.enum_index_diff.get(&key.hashed_key())) + { + None => self.rocksdb.is_write_initial(key), + Some(_) => false, + } + } + + fn load_factory_dep(&mut self, hash: H256) -> Option> { + match self + .batch_diffs + .iter() + .find_map(|b| b.factory_dep_diff.get(&hash)) + { + None => self.rocksdb.load_factory_dep(hash), + Some(value) => Some(value.clone()), + } + } + + fn get_enumeration_index(&mut self, key: &StorageKey) -> Option { + match self + .batch_diffs + .iter() + .find_map(|b| b.enum_index_diff.get(&key.hashed_key())) + { + None => self.rocksdb.get_enumeration_index(key), + Some(value) => Some(*value), + } + } +} + +impl ReadStorage for PgOrRocksdbStorage<'_> { + fn read_value(&mut self, key: &StorageKey) -> zksync_types::StorageValue { + match self { + Self::Postgres(postgres) => postgres.read_value(key), + Self::Rocksdb(rocksdb) => rocksdb.read_value(key), + Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.read_value(key), + } + } + + fn is_write_initial(&mut self, key: &StorageKey) -> bool { + match self { + Self::Postgres(postgres) => postgres.is_write_initial(key), + Self::Rocksdb(rocksdb) => rocksdb.is_write_initial(key), + 
                Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.is_write_initial(key),
+        }
+    }
+
+    fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
+        match self {
+            Self::Postgres(postgres) => postgres.load_factory_dep(hash),
+            Self::Rocksdb(rocksdb) => rocksdb.load_factory_dep(hash),
+            Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.load_factory_dep(hash),
+        }
+    }
+
+    fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
+        match self {
+            Self::Postgres(postgres) => postgres.get_enumeration_index(key),
+            Self::Rocksdb(rocksdb) => rocksdb.get_enumeration_index(key),
+            Self::RocksdbWithMemory(rocksdb_mem) => rocksdb_mem.get_enumeration_index(key),
+        }
+    }
+}
+
+impl<'a> From<PostgresStorage<'a>> for PgOrRocksdbStorage<'a> {
+    fn from(value: PostgresStorage<'a>) -> Self {
+        Self::Postgres(value)
+    }
+}
+
+impl<'a> From<RocksdbStorage> for PgOrRocksdbStorage<'a> {
+    fn from(value: RocksdbStorage) -> Self {
+        Self::Rocksdb(value)
+    }
+}
diff --git a/core/lib/types/src/block.rs b/core/lib/types/src/block.rs
index 22ddfa5f2498..3cf09f9645f1 100644
--- a/core/lib/types/src/block.rs
+++ b/core/lib/types/src/block.rs
@@ -93,7 +93,7 @@ pub struct StorageOracleInfo {
 }
 
 /// Data needed to execute an L2 block in the VM.
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Clone)]
 pub struct L2BlockExecutionData {
     pub number: L2BlockNumber,
     pub timestamp: u64,
diff --git a/core/lib/zksync_core/Cargo.toml b/core/lib/zksync_core/Cargo.toml
index e3e05226a751..f565d6a1d56e 100644
--- a/core/lib/zksync_core/Cargo.toml
+++ b/core/lib/zksync_core/Cargo.toml
@@ -85,7 +85,7 @@ lru.workspace = true
 governor.workspace = true
 tower-http = { workspace = true, features = ["full"] }
 tower = { workspace = true, features = ["full"] }
-axum = { workspace = true,features = [
+axum = { workspace = true, features = [
     "http1",
     "json",
     "tokio",
@@ -102,6 +102,7 @@ jsonrpsee.workspace = true
 tempfile.workspace = true
 test-casing.workspace = true
 test-log.workspace = true
+backon.workspace = true
 
 [build-dependencies]
 zksync_protobuf_build.workspace = true
diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs
index 680c1f3ff5ef..f1a078ac8922 100644
--- a/core/lib/zksync_core/src/lib.rs
+++ b/core/lib/zksync_core/src/lib.rs
@@ -113,6 +113,7 @@ pub mod state_keeper;
 pub mod sync_layer;
 pub mod temp_config_store;
 pub mod utils;
+pub mod vm_runner;
 
 /// Inserts the initial information about zkSync tokens into the database.
pub async fn genesis_init( diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs index 4e596f644eaf..1d0c3b203b1f 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/main_executor.rs @@ -16,14 +16,13 @@ use tokio::{ sync::{mpsc, watch}, }; use zksync_shared_metrics::{InteractionType, TxStage, APP_METRICS}; -use zksync_state::{ReadStorage, StorageView, WriteStorage}; +use zksync_state::{ReadStorage, ReadStorageFactory, StorageView, WriteStorage}; use zksync_types::{vm_trace::Call, Transaction}; use zksync_utils::bytecode::CompressedBytecodeInfo; use super::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult}; use crate::state_keeper::{ metrics::{TxExecutionStage, BATCH_TIP_METRICS, EXECUTOR_METRICS, KEEPER_METRICS}, - state_keeper_storage::ReadStorageFactory, types::ExecutionMetricsForCriteria, }; @@ -71,7 +70,9 @@ impl BatchExecutor for MainBatchExecutor { let stop_receiver = stop_receiver.clone(); let handle = tokio::task::spawn_blocking(move || { if let Some(storage) = Handle::current() - .block_on(storage_factory.access_storage(&stop_receiver)) + .block_on( + storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1), + ) .expect("failed getting access to state keeper storage") { executor.run(storage, l1_batch_params, system_env); diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/read_storage_factory.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/read_storage_factory.rs index 5bab53b634ab..f23829ed5203 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/read_storage_factory.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/read_storage_factory.rs @@ -2,12 +2,8 @@ use anyhow::Context; use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; -use zksync_state::RocksdbStorage; - -use crate::state_keeper::{ - state_keeper_storage::{PgOrRocksdbStorage, ReadStorageFactory}, - AsyncRocksdbCache, -}; +use zksync_state::{PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; +use zksync_types::L1BatchNumber; #[derive(Debug, Clone)] pub struct PostgresFactory { @@ -19,9 +15,10 @@ impl ReadStorageFactory for PostgresFactory { async fn access_storage( &self, _stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, ) -> anyhow::Result>> { Ok(Some( - AsyncRocksdbCache::access_storage_pg(&self.pool).await?, + PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number).await?, )) } } @@ -43,6 +40,7 @@ impl ReadStorageFactory for RocksdbFactory { async fn access_storage( &self, stop_receiver: &watch::Receiver, + _l1_batch_number: L1BatchNumber, ) -> anyhow::Result>> { let builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref()) .await @@ -53,7 +51,7 @@ impl ReadStorageFactory for RocksdbFactory { .await .context("Failed getting a connection to Postgres")?; let Some(rocksdb_storage) = builder - .synchronize(&mut conn, stop_receiver) + .synchronize(&mut conn, stop_receiver, None) .await .context("Failed synchronizing state keeper's RocksDB to Postgres")? 
else { diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs index 6d70e7646dd3..b70e9dedfeb8 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs @@ -12,6 +12,7 @@ use tokio::{sync::watch, task::JoinHandle}; use zksync_config::configs::chain::StateKeeperConfig; use zksync_contracts::{get_loadnext_contract, test_contracts::LoadnextContractExecutionParams}; use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_state::ReadStorageFactory; use zksync_test_account::{Account, DeployContractsTx, TxType}; use zksync_types::{ block::L2BlockHasher, ethabi::Token, fee::Fee, snapshots::SnapshotRecoveryStatus, @@ -31,7 +32,6 @@ use crate::{ genesis::create_genesis_l1_batch, state_keeper::{ batch_executor::{BatchExecutorHandle, TxExecutionResult}, - state_keeper_storage::ReadStorageFactory, tests::{default_l1_batch_env, default_system_env, BASE_SYSTEM_CONTRACTS}, AsyncRocksdbCache, BatchExecutor, MainBatchExecutor, }, diff --git a/core/lib/zksync_core/src/state_keeper/mod.rs b/core/lib/zksync_core/src/state_keeper/mod.rs index 399b9ee39aa9..9affd5696d1a 100644 --- a/core/lib/zksync_core/src/state_keeper/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/mod.rs @@ -17,7 +17,7 @@ pub use self::{ keeper::ZkSyncStateKeeper, mempool_actor::MempoolFetcher, seal_criteria::SequencerSealer, - state_keeper_storage::{AsyncCatchupTask, AsyncRocksdbCache}, + state_keeper_storage::AsyncRocksdbCache, types::MempoolGuard, }; use crate::fee_model::BatchFeeModelInputProvider; diff --git a/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs b/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs index 55be1e015f97..114f5644bd99 100644 --- a/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs +++ b/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs @@ -1,75 +1,15 @@ -use std::{fmt::Debug, sync::Arc, time::Instant}; +use std::{fmt::Debug, sync::Arc}; use anyhow::Context; use async_trait::async_trait; use once_cell::sync::OnceCell; -use tokio::{runtime::Handle, sync::watch}; -use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; -use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS}; +use tokio::sync::watch; +use zksync_dal::{ConnectionPool, Core}; use zksync_state::{ - PostgresStorage, ReadStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily, + AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, StateKeeperColumnFamily, }; use zksync_storage::RocksDB; -use zksync_types::{L1BatchNumber, L2BlockNumber}; - -/// Factory that can produce a [`ReadStorage`] implementation on demand. -#[async_trait] -pub trait ReadStorageFactory: Debug + Send + Sync + 'static { - async fn access_storage( - &self, - stop_receiver: &watch::Receiver, - ) -> anyhow::Result>>; -} - -/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`] -/// underneath. 
-#[derive(Debug)] -pub enum PgOrRocksdbStorage<'a> { - Postgres(PostgresStorage<'a>), - Rocksdb(RocksdbStorage), -} - -impl ReadStorage for PgOrRocksdbStorage<'_> { - fn read_value(&mut self, key: &zksync_types::StorageKey) -> zksync_types::StorageValue { - match self { - Self::Postgres(postgres) => postgres.read_value(key), - Self::Rocksdb(rocksdb) => rocksdb.read_value(key), - } - } - - fn is_write_initial(&mut self, key: &zksync_types::StorageKey) -> bool { - match self { - Self::Postgres(postgres) => postgres.is_write_initial(key), - Self::Rocksdb(rocksdb) => rocksdb.is_write_initial(key), - } - } - - fn load_factory_dep(&mut self, hash: zksync_types::H256) -> Option> { - match self { - Self::Postgres(postgres) => postgres.load_factory_dep(hash), - Self::Rocksdb(rocksdb) => rocksdb.load_factory_dep(hash), - } - } - - fn get_enumeration_index(&mut self, key: &zksync_types::StorageKey) -> Option { - match self { - Self::Postgres(postgres) => postgres.get_enumeration_index(key), - Self::Rocksdb(rocksdb) => rocksdb.get_enumeration_index(key), - } - } -} - -impl<'a> From> for PgOrRocksdbStorage<'a> { - fn from(value: PostgresStorage<'a>) -> Self { - Self::Postgres(value) - } -} - -impl<'a> From for PgOrRocksdbStorage<'a> { - fn from(value: RocksdbStorage) -> Self { - Self::Rocksdb(value) - } -} +use zksync_types::L1BatchNumber; /// A [`ReadStorageFactory`] implementation that can produce short-lived [`ReadStorage`] handles /// backed by either Postgres or RocksDB (if it's caught up). Always initialized as a `Postgres` @@ -83,78 +23,10 @@ pub struct AsyncRocksdbCache { } impl AsyncRocksdbCache { - /// Load latest sealed L2 block from the latest sealed L1 batch (ignores sealed L2 blocks - /// from unsealed batches). - async fn load_latest_sealed_l2_block( - connection: &mut Connection<'_, Core>, - ) -> anyhow::Result> { - let mut dal = connection.blocks_dal(); - let Some(l1_batch_number) = dal.get_sealed_l1_batch_number().await? else { - return Ok(None); - }; - let (_, l2_block_number) = dal - .get_l2_block_range_of_l1_batch(l1_batch_number) - .await? - .context("The latest sealed L1 batch does not have a L2 block range")?; - Ok(Some((l2_block_number, l1_batch_number))) - } - - /// Returns a [`ReadStorage`] implementation backed by Postgres - pub(crate) async fn access_storage_pg( - pool: &ConnectionPool, - ) -> anyhow::Result> { - let mut connection = pool.connection().await?; - - let (l2_block_number, l1_batch_number) = - match Self::load_latest_sealed_l2_block(&mut connection).await? { - Some((l2_block_number, l1_batch_number)) => (l2_block_number, l1_batch_number), - None => { - tracing::info!("Could not find latest sealed L2 block, loading from snapshot"); - let snapshot_recovery = connection - .snapshot_recovery_dal() - .get_applied_snapshot_status() - .await? - .context("Could not find snapshot, no state available")?; - ( - snapshot_recovery.l2_block_number, - snapshot_recovery.l1_batch_number, - ) - } - }; - - tracing::debug!(%l1_batch_number, %l2_block_number, "Using Postgres-based storage"); - Ok( - PostgresStorage::new_async(Handle::current(), connection, l2_block_number, true) - .await? - .into(), - ) - } - - /// Catches up RocksDB synchronously (i.e. assumes the gap is small) and - /// returns a [`ReadStorage`] implementation backed by caught-up RocksDB. 
- async fn access_storage_rocksdb<'a>( - connection: &mut Connection<'_, Core>, - rocksdb: RocksDB, - stop_receiver: &watch::Receiver, - ) -> anyhow::Result>> { - tracing::debug!("Catching up RocksDB synchronously"); - let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb); - let rocksdb = rocksdb_builder - .synchronize(connection, stop_receiver) - .await - .context("Failed to catch up state keeper RocksDB storage to Postgres")?; - let Some(rocksdb) = rocksdb else { - tracing::info!("Synchronizing RocksDB interrupted"); - return Ok(None); - }; - let rocksdb_l1_batch_number = rocksdb.l1_batch_number().await.unwrap_or_default(); - tracing::debug!(%rocksdb_l1_batch_number, "Using RocksDB-based storage"); - Ok(Some(rocksdb.into())) - } - async fn access_storage_inner( &self, stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, ) -> anyhow::Result>> { if let Some(rocksdb) = self.rocksdb_cell.get() { let mut connection = self @@ -162,12 +34,17 @@ impl AsyncRocksdbCache { .connection_tagged("state_keeper") .await .context("Failed getting a Postgres connection")?; - Self::access_storage_rocksdb(&mut connection, rocksdb.clone(), stop_receiver) - .await - .context("Failed accessing RocksDB storage") + PgOrRocksdbStorage::access_storage_rocksdb( + &mut connection, + rocksdb.clone(), + stop_receiver, + l1_batch_number, + ) + .await + .context("Failed accessing RocksDB storage") } else { Ok(Some( - Self::access_storage_pg(&self.pool) + PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number) .await .context("Failed accessing Postgres storage")?, )) @@ -179,11 +56,12 @@ impl AsyncRocksdbCache { state_keeper_db_path: String, ) -> (Self, AsyncCatchupTask) { let rocksdb_cell = Arc::new(OnceCell::new()); - let task = AsyncCatchupTask { - pool: pool.clone(), + let task = AsyncCatchupTask::new( + pool.clone(), state_keeper_db_path, - rocksdb_cell: rocksdb_cell.clone(), - }; + rocksdb_cell.clone(), + None, + ); (Self { pool, rocksdb_cell }, task) } } @@ -193,50 +71,9 @@ impl ReadStorageFactory for AsyncRocksdbCache { async fn access_storage( &self, stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, ) -> anyhow::Result>> { - self.access_storage_inner(stop_receiver).await - } -} - -#[derive(Debug)] -pub struct AsyncCatchupTask { - pool: ConnectionPool, - state_keeper_db_path: String, - rocksdb_cell: Arc>>, -} - -impl AsyncCatchupTask { - pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { - let started_at = Instant::now(); - tracing::debug!("Catching up RocksDB asynchronously"); - - let mut rocksdb_builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref()) + self.access_storage_inner(stop_receiver, l1_batch_number) .await - .context("Failed creating RocksDB storage builder")?; - let mut connection = self.pool.connection().await?; - let was_recovered_from_snapshot = rocksdb_builder - .ensure_ready(&mut connection, &stop_receiver) - .await - .context("failed initializing state keeper RocksDB from snapshot or scratch")?; - if was_recovered_from_snapshot { - let elapsed = started_at.elapsed(); - APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::StateKeeperCache] - .set(elapsed); - tracing::info!("Recovered state keeper RocksDB from snapshot in {elapsed:?}"); - } - - let rocksdb = rocksdb_builder - .synchronize(&mut connection, &stop_receiver) - .await - .context("Failed to catch up RocksDB to Postgres")?; - drop(connection); - if let Some(rocksdb) = rocksdb { - self.rocksdb_cell - .set(rocksdb.into_rocksdb()) - 
.map_err(|_| anyhow::anyhow!("Async RocksDB cache was initialized twice"))?; - } else { - tracing::info!("Synchronizing RocksDB interrupted"); - } - Ok(()) } } diff --git a/core/lib/zksync_core/src/vm_runner/mod.rs b/core/lib/zksync_core/src/vm_runner/mod.rs new file mode 100644 index 000000000000..01eb3541fca8 --- /dev/null +++ b/core/lib/zksync_core/src/vm_runner/mod.rs @@ -0,0 +1,6 @@ +mod storage; + +#[cfg(test)] +mod tests; + +pub use storage::{BatchExecuteData, VmRunnerStorage, VmRunnerStorageLoader}; diff --git a/core/lib/zksync_core/src/vm_runner/storage.rs b/core/lib/zksync_core/src/vm_runner/storage.rs new file mode 100644 index 000000000000..019d5f6e80b1 --- /dev/null +++ b/core/lib/zksync_core/src/vm_runner/storage.rs @@ -0,0 +1,406 @@ +use std::{ + collections::{BTreeMap, HashMap}, + fmt::Debug, + marker::PhantomData, + sync::Arc, + time::Duration, +}; + +use anyhow::Context as _; +use async_trait::async_trait; +use multivm::{interface::L1BatchEnv, vm_1_4_2::SystemEnv}; +use once_cell::sync::OnceCell; +use tokio::sync::{watch, RwLock}; +use vm_utils::storage::L1BatchParamsProvider; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_state::{ + AsyncCatchupTask, BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage, + RocksdbStorageBuilder, RocksdbWithMemory, StateKeeperColumnFamily, +}; +use zksync_storage::RocksDB; +use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId}; + +/// Data needed to execute an L1 batch. +#[derive(Debug, Clone)] +pub struct BatchExecuteData { + /// Parameters for L1 batch this data belongs to. + pub l1_batch_env: L1BatchEnv, + /// Execution process parameters. + pub system_env: SystemEnv, + /// List of L2 blocks and corresponding transactions that were executed within batch. + pub l2_blocks: Vec, +} + +#[derive(Debug, Clone)] +struct BatchData { + execute_data: BatchExecuteData, + diff: BatchDiff, +} + +/// Functionality to fetch data about processed/unprocessed batches for a particular VM runner +/// instance. +#[async_trait] +pub trait VmRunnerStorageLoader: Debug + Send + Sync + 'static { + /// Unique name of the VM runner instance. + fn name() -> &'static str; + + /// Returns the last L1 batch number that has been processed by this VM runner instance. + /// + /// # Errors + /// + /// Propagates DB errors. + async fn latest_processed_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result; + + /// Returns the last L1 batch number that is ready to be loaded by this VM runner instance. + /// + /// # Errors + /// + /// Propagates DB errors. + async fn last_ready_to_be_loaded_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result; +} + +/// Abstraction for VM runner's storage layer that provides two main features: +/// +/// 1. A [`ReadStorageFactory`] implementation backed by either Postgres or RocksDB (if it's +/// caught up). Always initialized as a `Postgres` variant and is then mutated into `Rocksdb` +/// once RocksDB cache is caught up. +/// 2. Loads data needed to re-execute the next unprocessed L1 batch. +/// +/// Users of `VmRunnerStorage` are not supposed to retain storage access to batches that are less +/// than `L::latest_processed_batch`. Holding one is considered to be an undefined behavior. 
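+///
+/// A usage sketch (not part of this patch); `pool`, `rocksdb_path`, `loader`, `chain_id`,
+/// `stop_receiver` and `next_batch` are assumed to be supplied by the concrete VM runner
+/// instance, and only APIs declared in this file are used:
+///
+/// ```ignore
+/// let (storage, sync_task) =
+///     VmRunnerStorage::new(pool.clone(), rocksdb_path, loader, chain_id).await?;
+/// tokio::spawn(sync_task.run(stop_receiver.clone()));
+/// if let Some(execute_data) = storage.load_batch(next_batch).await? {
+///     // The view reflects state right before `next_batch`; `None` means the batch is not
+///     // retained in memory (yet) or the task is shutting down.
+///     if let Some(view) = storage.access_storage(&stop_receiver, next_batch - 1).await? {
+///         // ... re-execute `execute_data.l2_blocks` in the VM on top of `view` ...
+///     }
+/// }
+/// ```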
+#[derive(Debug)] +pub struct VmRunnerStorage { + pool: ConnectionPool, + l1_batch_params_provider: L1BatchParamsProvider, + chain_id: L2ChainId, + state: Arc>, + _marker: PhantomData, +} + +#[derive(Debug)] +struct State { + rocksdb: Option, + l1_batch_number: L1BatchNumber, + storage: BTreeMap, +} + +impl State { + /// Whether this state can serve as a `ReadStorage` source for the given L1 batch. + fn can_be_used_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> bool { + l1_batch_number == self.l1_batch_number || self.storage.contains_key(&l1_batch_number) + } +} + +impl VmRunnerStorage { + /// Creates a new VM runner storage using provided Postgres pool and RocksDB path. + pub async fn new( + pool: ConnectionPool, + rocksdb_path: String, + loader: L, + chain_id: L2ChainId, + ) -> anyhow::Result<(Self, StorageSyncTask)> { + let mut conn = pool.connection_tagged(L::name()).await?; + let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn) + .await + .context("Failed initializing L1 batch params provider")?; + drop(conn); + let state = Arc::new(RwLock::new(State { + rocksdb: None, + l1_batch_number: L1BatchNumber(0), + storage: BTreeMap::new(), + })); + let task = + StorageSyncTask::new(pool.clone(), chain_id, rocksdb_path, loader, state.clone()) + .await?; + Ok(( + Self { + pool, + l1_batch_params_provider, + chain_id, + state, + _marker: PhantomData, + }, + task, + )) + } + + async fn access_storage_inner( + &self, + _stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result>> { + let state = self.state.read().await; + let Some(rocksdb) = &state.rocksdb else { + return Ok(Some( + PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number) + .await + .context("Failed accessing Postgres storage")?, + )); + }; + if !state.can_be_used_for_l1_batch(l1_batch_number) { + tracing::debug!( + %l1_batch_number, + min_l1_batch = %state.l1_batch_number, + max_l1_batch = %state.storage.last_key_value().map(|(k, _)| *k).unwrap_or(state.l1_batch_number), + "Trying to access VM runner storage with L1 batch that is not available", + ); + return Ok(None); + } + let batch_diffs = state + .storage + .iter() + .filter_map(|(x, y)| { + if x <= &l1_batch_number { + Some(y.diff.clone()) + } else { + None + } + }) + .collect::>(); + Ok(Some(PgOrRocksdbStorage::RocksdbWithMemory( + RocksdbWithMemory { + rocksdb: rocksdb.clone(), + batch_diffs, + }, + ))) + } + + /// Loads next unprocessed L1 batch along with all transactions that VM runner needs to + /// re-execute. These are the transactions that are included in a sealed L2 block belonging + /// to a sealed L1 batch (with state keeper being the source of truth). The order of the + /// transactions is the same as it was when state keeper executed them. + /// + /// Can return `None` if there are no batches to be processed. + /// + /// # Errors + /// + /// Propagates DB errors. 
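+    ///
+    /// A sketch of how a VM runner loop might drain ready batches (the starting point comes from
+    /// the instance's [`VmRunnerStorageLoader`]; `process` is a hypothetical callback):
+    ///
+    /// ```ignore
+    /// let mut next = loader.latest_processed_batch(&mut conn).await? + 1;
+    /// while let Some(execute_data) = storage.load_batch(next).await? {
+    ///     process(execute_data).await?;
+    ///     next += 1;
+    /// }
+    /// ```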
+ pub async fn load_batch( + &self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + let state = self.state.read().await; + if state.rocksdb.is_none() { + let mut conn = self.pool.connection_tagged(L::name()).await?; + return StorageSyncTask::::load_batch_execute_data( + &mut conn, + l1_batch_number, + &self.l1_batch_params_provider, + self.chain_id, + ) + .await; + } + match state.storage.get(&l1_batch_number) { + None => { + tracing::debug!( + %l1_batch_number, + min_l1_batch = %(state.l1_batch_number + 1), + max_l1_batch = %state.storage.last_key_value().map(|(k, _)| *k).unwrap_or(state.l1_batch_number), + "Trying to load an L1 batch that is not available" + ); + Ok(None) + } + Some(batch_data) => Ok(Some(batch_data.execute_data.clone())), + } + } +} + +#[async_trait] +impl ReadStorageFactory for VmRunnerStorage { + async fn access_storage( + &self, + stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result>> { + self.access_storage_inner(stop_receiver, l1_batch_number) + .await + } +} + +/// A runnable task that catches up the provided RocksDB cache instance to the latest processed +/// batch and then continuously makes sure that this invariant is held for the foreseeable future. +/// In the meanwhile, `StorageSyncTask` also loads the next `max_batches_to_load` batches in memory +/// so that they are immediately accessible by [`VmRunnerStorage`]. +#[derive(Debug)] +pub struct StorageSyncTask { + pool: ConnectionPool, + l1_batch_params_provider: L1BatchParamsProvider, + chain_id: L2ChainId, + rocksdb_cell: Arc>>, + loader: L, + state: Arc>, + catchup_task: AsyncCatchupTask, +} + +impl StorageSyncTask { + async fn new( + pool: ConnectionPool, + chain_id: L2ChainId, + rocksdb_path: String, + loader: L, + state: Arc>, + ) -> anyhow::Result { + let mut conn = pool.connection_tagged(L::name()).await?; + let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn) + .await + .context("Failed initializing L1 batch params provider")?; + let rocksdb_cell = Arc::new(OnceCell::new()); + let catchup_task = AsyncCatchupTask::new( + pool.clone(), + rocksdb_path, + rocksdb_cell.clone(), + Some(loader.latest_processed_batch(&mut conn).await?), + ); + drop(conn); + Ok(Self { + pool, + l1_batch_params_provider, + chain_id, + rocksdb_cell, + loader, + state, + catchup_task, + }) + } + + pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + self.catchup_task.run(stop_receiver.clone()).await?; + let rocksdb = self.rocksdb_cell.get().ok_or_else(|| { + anyhow::anyhow!("Expected RocksDB to be initialized by `AsyncCatchupTask`") + })?; + loop { + if *stop_receiver.borrow() { + tracing::info!("`StorageSyncTask` was interrupted"); + return Ok(()); + } + let mut conn = self.pool.connection_tagged(L::name()).await?; + let latest_processed_batch = self.loader.latest_processed_batch(&mut conn).await?; + let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb.clone()); + if rocksdb_builder.l1_batch_number().await == Some(latest_processed_batch + 1) { + // RocksDB is already caught up, we might not need to do anything. + // Just need to check that the memory diff is up-to-date in case this is a fresh start. + let state = self.state.read().await; + if state + .storage + .contains_key(&self.loader.last_ready_to_be_loaded_batch(&mut conn).await?) + { + // No need to do anything, killing time until last processed batch is updated. 
+ drop(conn); + drop(state); + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + } + } + // We rely on the assumption that no one is holding storage access to a batch with + // number less than `latest_processed_batch`. If they do, RocksDB synchronization below + // will cause them to have an inconsistent view on DB which we consider to be an + // undefined behavior. + let rocksdb = rocksdb_builder + .synchronize(&mut conn, &stop_receiver, Some(latest_processed_batch)) + .await + .context("Failed to catch up state keeper RocksDB storage to Postgres")?; + let Some(rocksdb) = rocksdb else { + tracing::info!("`StorageSyncTask` was interrupted during RocksDB synchronization"); + return Ok(()); + }; + let mut state = self.state.write().await; + state.rocksdb = Some(rocksdb); + state.l1_batch_number = latest_processed_batch; + state + .storage + .retain(|l1_batch_number, _| l1_batch_number > &latest_processed_batch); + let max_present = state + .storage + .last_entry() + .map(|e| *e.key()) + .unwrap_or(latest_processed_batch); + drop(state); + let max_desired = self.loader.last_ready_to_be_loaded_batch(&mut conn).await?; + for l1_batch_number in max_present.0 + 1..=max_desired.0 { + let l1_batch_number = L1BatchNumber(l1_batch_number); + let Some(execute_data) = Self::load_batch_execute_data( + &mut conn, + l1_batch_number, + &self.l1_batch_params_provider, + self.chain_id, + ) + .await? + else { + break; + }; + let state_diff = conn + .storage_logs_dal() + .get_touched_slots_for_l1_batch(l1_batch_number) + .await?; + let enum_index_diff = conn + .storage_logs_dedup_dal() + .initial_writes_for_batch(l1_batch_number) + .await? + .into_iter() + .collect::>(); + let factory_dep_diff = conn + .blocks_dal() + .get_l1_batch_factory_deps(l1_batch_number) + .await?; + let diff = BatchDiff { + state_diff, + enum_index_diff, + factory_dep_diff, + }; + + let mut state = self.state.write().await; + state + .storage + .insert(l1_batch_number, BatchData { execute_data, diff }); + drop(state); + } + drop(conn); + } + } + + async fn load_batch_execute_data( + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + l1_batch_params_provider: &L1BatchParamsProvider, + chain_id: L2ChainId, + ) -> anyhow::Result> { + let first_l2_block_in_batch = l1_batch_params_provider + .load_first_l2_block_in_batch(conn, l1_batch_number) + .await + .with_context(|| { + format!( + "Failed loading first L2 block for L1 batch #{}", + l1_batch_number + ) + })?; + let Some(first_l2_block_in_batch) = first_l2_block_in_batch else { + return Ok(None); + }; + let (system_env, l1_batch_env) = l1_batch_params_provider + .load_l1_batch_params( + conn, + &first_l2_block_in_batch, + // `validation_computational_gas_limit` is only relevant when rejecting txs, but we + // are re-executing so none of them should be rejected + u32::MAX, + chain_id, + ) + .await + .with_context(|| format!("Failed loading params for L1 batch #{}", l1_batch_number))?; + let l2_blocks = conn + .transactions_dal() + .get_l2_blocks_to_execute_for_l1_batch(l1_batch_number) + .await?; + Ok(Some(BatchExecuteData { + l1_batch_env, + system_env, + l2_blocks, + })) + } +} diff --git a/core/lib/zksync_core/src/vm_runner/tests/mod.rs b/core/lib/zksync_core/src/vm_runner/tests/mod.rs new file mode 100644 index 000000000000..48f4d5073069 --- /dev/null +++ b/core/lib/zksync_core/src/vm_runner/tests/mod.rs @@ -0,0 +1,483 @@ +use std::{collections::HashMap, ops, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use 
backon::{ConstantBuilder, ExponentialBuilder, Retryable};
+use rand::Rng;
+use tempfile::TempDir;
+use tokio::{
+ runtime::Handle,
+ sync::{watch, RwLock},
+ task::JoinHandle,
+};
+use zksync_contracts::BaseSystemContractsHashes;
+use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
+use zksync_state::{PgOrRocksdbStorage, PostgresStorage, ReadStorage, ReadStorageFactory};
+use zksync_types::{
+ block::{BlockGasCount, L1BatchHeader},
+ fee::TransactionExecutionMetrics,
+ AccountTreeId, L1BatchNumber, L2ChainId, ProtocolVersionId, StorageKey, StorageLog,
+ StorageLogKind, StorageValue, H160, H256,
+};
+
+use super::{BatchExecuteData, VmRunnerStorage, VmRunnerStorageLoader};
+use crate::{
+ genesis::{insert_genesis_batch, GenesisParams},
+ utils::testonly::{
+ create_l1_batch_metadata, create_l2_transaction, create_miniblock, execute_l2_transaction,
+ l1_batch_metadata_to_commitment_artifacts,
+ },
+};
+
+#[derive(Debug, Default)]
+struct LoaderMock {
+ current: L1BatchNumber,
+ max: L1BatchNumber,
+}
+
+#[async_trait]
+impl VmRunnerStorageLoader for Arc<RwLock<LoaderMock>> {
+ fn name() -> &'static str {
+ "loader_mock"
+ }
+
+ async fn latest_processed_batch(
+ &self,
+ _conn: &mut Connection<'_, Core>,
+ ) -> anyhow::Result<L1BatchNumber> {
+ Ok(self.read().await.current)
+ }
+
+ async fn last_ready_to_be_loaded_batch(
+ &self,
+ _conn: &mut Connection<'_, Core>,
+ ) -> anyhow::Result<L1BatchNumber> {
+ Ok(self.read().await.max)
+ }
+}
+
+#[derive(Debug)]
+struct VmRunnerTester {
+ db_dir: TempDir,
+ pool: ConnectionPool<Core>,
+ tasks: Vec<JoinHandle<()>>,
+}
+
+impl VmRunnerTester {
+ fn new(pool: ConnectionPool<Core>) -> Self {
+ Self {
+ db_dir: TempDir::new().unwrap(),
+ pool,
+ tasks: Vec::new(),
+ }
+ }
+
+ async fn create_storage(
+ &mut self,
+ loader_mock: Arc<RwLock<LoaderMock>>,
+ ) -> anyhow::Result<VmRunnerStorage<Arc<RwLock<LoaderMock>>>> {
+ let (vm_runner_storage, task) = VmRunnerStorage::new(
+ self.pool.clone(),
+ self.db_dir.path().to_str().unwrap().to_owned(),
+ loader_mock,
+ L2ChainId::from(270),
+ )
+ .await?;
+ let handle = tokio::task::spawn(async move {
+ let (_stop_sender, stop_receiver) = watch::channel(false);
+ task.run(stop_receiver).await.unwrap()
+ });
+ self.tasks.push(handle);
+ Ok(vm_runner_storage)
+ }
+}
+
+impl VmRunnerStorage<Arc<RwLock<LoaderMock>>> {
+ async fn load_batch_eventually(
+ &self,
+ number: L1BatchNumber,
+ ) -> anyhow::Result<BatchExecuteData> {
+ (|| async {
+ self.load_batch(number)
+ .await?
+ .ok_or_else(|| anyhow::anyhow!("Batch #{} is not available yet", number))
+ })
+ .retry(&ExponentialBuilder::default())
+ .await
+ }
+
+ async fn access_storage_eventually(
+ &self,
+ stop_receiver: &watch::Receiver<bool>,
+ number: L1BatchNumber,
+ ) -> anyhow::Result<PgOrRocksdbStorage<'static>> {
+ (|| async {
+ self.access_storage(stop_receiver, number)
+ .await?
+ .ok_or_else(|| {
+ anyhow::anyhow!("Storage for batch #{} is not available yet", number)
+ })
+ })
+ .retry(&ExponentialBuilder::default())
+ .await
+ }
+
+ async fn ensure_batch_unloads_eventually(&self, number: L1BatchNumber) -> anyhow::Result<()> {
+ (|| async {
+ Ok(anyhow::ensure!(
+ self.load_batch(number).await?.is_none(),
+ "Batch #{} is still available",
+ number
+ ))
+ })
+ .retry(&ExponentialBuilder::default())
+ .await
+ }
+
+ async fn batch_stays_unloaded(&self, number: L1BatchNumber) -> bool {
+ (|| async {
+ self.load_batch(number)
+ .await?
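+ // An absent batch surfaces as an error from the closure; the batch only counts as
+ // "staying unloaded" if every attempt of the bounded constant-backoff retry fails.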
+ .ok_or_else(|| anyhow::anyhow!("Batch #{} is not available yet", number)) + }) + .retry( + &ConstantBuilder::default() + .with_delay(Duration::from_millis(100)) + .with_max_times(3), + ) + .await + .is_err() + } +} + +async fn store_l2_blocks( + conn: &mut Connection<'_, Core>, + numbers: ops::RangeInclusive, + contract_hashes: BaseSystemContractsHashes, +) -> anyhow::Result> { + let mut rng = rand::thread_rng(); + let mut batches = Vec::new(); + let mut l2_block_number = conn + .blocks_dal() + .get_last_sealed_l2_block_header() + .await? + .map(|m| m.number) + .unwrap_or_default() + + 1; + for l1_batch_number in numbers { + let l1_batch_number = L1BatchNumber(l1_batch_number); + let tx = create_l2_transaction(10, 100); + conn.transactions_dal() + .insert_transaction_l2(&tx, TransactionExecutionMetrics::default()) + .await?; + let mut logs = Vec::new(); + let mut written_keys = Vec::new(); + for _ in 0..10 { + let key = StorageKey::new(AccountTreeId::new(H160::random()), H256::random()); + let value = StorageValue::random(); + written_keys.push(key); + logs.push(StorageLog { + kind: StorageLogKind::Write, + key, + value, + }); + } + let mut factory_deps = HashMap::new(); + for _ in 0..10 { + factory_deps.insert(H256::random(), rng.gen::<[u8; 32]>().into()); + } + conn.storage_logs_dal() + .insert_storage_logs(l2_block_number, &[(tx.hash(), logs)]) + .await?; + conn.storage_logs_dedup_dal() + .insert_initial_writes(l1_batch_number, &written_keys) + .await?; + conn.factory_deps_dal() + .insert_factory_deps(l2_block_number, &factory_deps) + .await?; + let mut new_l2_block = create_miniblock(l2_block_number.0); + l2_block_number += 1; + new_l2_block.base_system_contracts_hashes = contract_hashes; + new_l2_block.l2_tx_count = 1; + conn.blocks_dal().insert_l2_block(&new_l2_block).await?; + let tx_result = execute_l2_transaction(tx); + conn.transactions_dal() + .mark_txs_as_executed_in_l2_block(new_l2_block.number, &[tx_result], 1.into()) + .await?; + + // Insert a fictive L2 block at the end of the batch + let fictive_l2_block = create_miniblock(l2_block_number.0); + l2_block_number += 1; + conn.blocks_dal().insert_l2_block(&fictive_l2_block).await?; + + let header = L1BatchHeader::new( + l1_batch_number, + l2_block_number.0 as u64 - 2, // Matches the first L2 block in the batch + BaseSystemContractsHashes::default(), + ProtocolVersionId::default(), + ); + let predicted_gas = BlockGasCount { + commit: 2, + prove: 3, + execute: 10, + }; + conn.blocks_dal() + .insert_l1_batch(&header, &[], predicted_gas, &[], &[], Default::default()) + .await?; + conn.blocks_dal() + .mark_l2_blocks_as_executed_in_l1_batch(l1_batch_number) + .await?; + + let metadata = create_l1_batch_metadata(l1_batch_number.0); + conn.blocks_dal() + .save_l1_batch_tree_data(l1_batch_number, &metadata.tree_data()) + .await?; + conn.blocks_dal() + .save_l1_batch_commitment_artifacts( + l1_batch_number, + &l1_batch_metadata_to_commitment_artifacts(&metadata), + ) + .await?; + batches.push(header); + } + + Ok(batches) +} + +#[tokio::test] +async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { + let connection_pool = ConnectionPool::::test_pool().await; + let mut conn = connection_pool.connection().await.unwrap(); + let genesis_params = GenesisParams::mock(); + insert_genesis_batch(&mut conn, &genesis_params) + .await + .unwrap(); + drop(conn); + + // Generate 10 batches worth of data and persist it in Postgres + let batches = store_l2_blocks( + &mut connection_pool.connection().await?, + 1u32..=10u32, + 
genesis_params.base_system_contracts().hashes(), + ) + .await?; + + let mut tester = VmRunnerTester::new(connection_pool.clone()); + let loader_mock = Arc::new(RwLock::new(LoaderMock { + current: 0.into(), + max: 10.into(), + })); + let storage = tester.create_storage(loader_mock.clone()).await?; + // Check that existing batches are returned in the exact same order with the exact same data + for batch in &batches { + let batch_data = storage.load_batch_eventually(batch.number).await?; + let mut conn = connection_pool.connection().await.unwrap(); + let (previous_batch_hash, _) = conn + .blocks_dal() + .get_l1_batch_state_root_and_timestamp(batch_data.l1_batch_env.number - 1) + .await? + .unwrap(); + assert_eq!( + batch_data.l1_batch_env.previous_batch_hash, + Some(previous_batch_hash) + ); + assert_eq!(batch_data.l1_batch_env.number, batch.number); + assert_eq!(batch_data.l1_batch_env.timestamp, batch.timestamp); + let (first_l2_block_number, _) = conn + .blocks_dal() + .get_l2_block_range_of_l1_batch(batch.number) + .await? + .unwrap(); + let previous_l2_block_header = conn + .blocks_dal() + .get_l2_block_header(first_l2_block_number - 1) + .await? + .unwrap(); + let l2_block_header = conn + .blocks_dal() + .get_l2_block_header(first_l2_block_number) + .await? + .unwrap(); + assert_eq!( + batch_data.l1_batch_env.first_l2_block.number, + l2_block_header.number.0 + ); + assert_eq!( + batch_data.l1_batch_env.first_l2_block.timestamp, + l2_block_header.timestamp + ); + assert_eq!( + batch_data.l1_batch_env.first_l2_block.prev_block_hash, + previous_l2_block_header.hash + ); + let l2_blocks = conn + .transactions_dal() + .get_l2_blocks_to_execute_for_l1_batch(batch_data.l1_batch_env.number) + .await?; + assert_eq!(batch_data.l2_blocks, l2_blocks); + } + + // "Mark" these batches as processed + loader_mock.write().await.current += batches.len() as u32; + + // All old batches should no longer be loadable + for batch in batches { + storage + .ensure_batch_unloads_eventually(batch.number) + .await?; + } + + Ok(()) +} + +#[tokio::test] +async fn continuously_load_new_batches() -> anyhow::Result<()> { + let connection_pool = ConnectionPool::::test_pool().await; + let mut conn = connection_pool.connection().await.unwrap(); + let genesis_params = GenesisParams::mock(); + insert_genesis_batch(&mut conn, &genesis_params) + .await + .unwrap(); + drop(conn); + + let mut tester = VmRunnerTester::new(connection_pool.clone()); + let loader_mock = Arc::new(RwLock::new(LoaderMock::default())); + let storage = tester.create_storage(loader_mock.clone()).await?; + // No batches available yet + assert!(storage.load_batch(L1BatchNumber(1)).await?.is_none()); + + // Generate one batch and persist it in Postgres + store_l2_blocks( + &mut connection_pool.connection().await?, + 1u32..=1u32, + genesis_params.base_system_contracts().hashes(), + ) + .await?; + loader_mock.write().await.max += 1; + + // Load batch and mark it as processed + assert_eq!( + storage + .load_batch_eventually(L1BatchNumber(1)) + .await? 
+ .l1_batch_env + .number, + L1BatchNumber(1) + ); + loader_mock.write().await.current += 1; + + // No more batches after that + assert!(storage.batch_stays_unloaded(L1BatchNumber(2)).await); + + // Generate one more batch and persist it in Postgres + store_l2_blocks( + &mut connection_pool.connection().await?, + 2u32..=2u32, + genesis_params.base_system_contracts().hashes(), + ) + .await?; + loader_mock.write().await.max += 1; + + // Load batch and mark it as processed + + assert_eq!( + storage + .load_batch_eventually(L1BatchNumber(2)) + .await? + .l1_batch_env + .number, + L1BatchNumber(2) + ); + loader_mock.write().await.current += 1; + + // No more batches after that + assert!(storage.batch_stays_unloaded(L1BatchNumber(3)).await); + + Ok(()) +} + +#[tokio::test] +async fn access_vm_runner_storage() -> anyhow::Result<()> { + let connection_pool = ConnectionPool::::test_pool().await; + let mut conn = connection_pool.connection().await.unwrap(); + let genesis_params = GenesisParams::mock(); + insert_genesis_batch(&mut conn, &genesis_params) + .await + .unwrap(); + drop(conn); + + // Generate 10 batches worth of data and persist it in Postgres + let batch_range = 1u32..=10u32; + store_l2_blocks( + &mut connection_pool.connection().await?, + batch_range, + genesis_params.base_system_contracts().hashes(), + ) + .await?; + + let mut conn = connection_pool.connection().await?; + let storage_logs = conn + .storage_logs_dal() + .dump_all_storage_logs_for_tests() + .await; + let factory_deps = conn + .factory_deps_dal() + .dump_all_factory_deps_for_tests() + .await; + drop(conn); + + let (_sender, receiver) = watch::channel(false); + let mut tester = VmRunnerTester::new(connection_pool.clone()); + let loader_mock = Arc::new(RwLock::new(LoaderMock { + current: 0.into(), + max: 10.into(), + })); + let rt_handle = Handle::current(); + let handle = tokio::task::spawn_blocking(move || { + let vm_runner_storage = + rt_handle.block_on(async { tester.create_storage(loader_mock.clone()).await.unwrap() }); + for i in 1..=10 { + let mut conn = rt_handle.block_on(connection_pool.connection()).unwrap(); + let (_, last_l2_block_number) = rt_handle + .block_on( + conn.blocks_dal() + .get_l2_block_range_of_l1_batch(L1BatchNumber(i)), + )? 
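+ // Every batch persisted above has an L2 block range, hence the `unwrap()`.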
+ .unwrap(); + let mut pg_storage = + PostgresStorage::new(rt_handle.clone(), conn, last_l2_block_number, true); + let mut vm_storage = rt_handle.block_on(async { + vm_runner_storage + .access_storage_eventually(&receiver, L1BatchNumber(i)) + .await + })?; + // Check that both storages have identical key-value pairs written in them + for storage_log in &storage_logs { + let storage_key = + StorageKey::new(AccountTreeId::new(storage_log.address), storage_log.key); + assert_eq!( + pg_storage.read_value(&storage_key), + vm_storage.read_value(&storage_key) + ); + assert_eq!( + pg_storage.get_enumeration_index(&storage_key), + vm_storage.get_enumeration_index(&storage_key) + ); + assert_eq!( + pg_storage.is_write_initial(&storage_key), + vm_storage.is_write_initial(&storage_key) + ); + } + for hash in factory_deps.keys() { + assert_eq!( + pg_storage.load_factory_dep(*hash), + vm_storage.load_factory_dep(*hash) + ); + } + } + + anyhow::Ok(()) + }); + handle.await??; + + Ok(()) +} diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs index 821acddf7404..0862b3c080d6 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use zksync_config::{configs::chain::StateKeeperConfig, DBConfig}; -use zksync_core::state_keeper::{AsyncCatchupTask, AsyncRocksdbCache, MainBatchExecutor}; +use zksync_core::state_keeper::{AsyncRocksdbCache, MainBatchExecutor}; +use zksync_state::AsyncCatchupTask; use crate::{ implementations::resources::{pools::MasterPoolResource, state_keeper::BatchExecutorResource}, diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 6f416776889a..f19ff517d9df 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -7938,18 +7938,30 @@ dependencies = [ "zksync_utils", ] +[[package]] +name = "zksync_shared_metrics" +version = "0.1.0" +dependencies = [ + "vise", + "zksync_dal", + "zksync_types", +] + [[package]] name = "zksync_state" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "chrono", "itertools 0.10.5", "mini-moka", + "once_cell", "tokio", "tracing", "vise", "zksync_dal", + "zksync_shared_metrics", "zksync_storage", "zksync_types", "zksync_utils",