From 1810b78594083f1f98d2901f3643b5687ce9d8e8 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 6 Aug 2024 14:56:32 +0300
Subject: [PATCH] refactor(state-keeper): Use owned VM storage for batch executor (#2559)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

- Refactors `ReadStorageFactory` and `BatchExecutor`.
- Tests the VM runner on multiple batch workloads and fixes bugs discovered by the tests (e.g., a data race in the storage loader).

## Why ❔

- `ReadStorageFactory` and `BatchExecutor` become more composable and work in the VM runner with fewer crutches.
- More tests allow fixing bugs in the VM runner earlier.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.

fix(vm-runner): Fix data race in storage loader

test(vm-runner): Test VM runner with multiple L1 batches
---
 Cargo.lock                                    |   1 +
 core/lib/state/src/lib.rs                     |   5 +-
 core/lib/state/src/rocksdb/mod.rs             |   6 +
 core/lib/state/src/storage_factory.rs         | 100 ++++++--
 core/node/consensus/src/testonly.rs           |   7 +-
 core/node/node_sync/src/tests.rs              |   6 +-
 core/node/state_keeper/Cargo.toml             |   2 +-
 .../src/batch_executor/main_executor.rs       |  36 ++-
 .../state_keeper/src/batch_executor/mod.rs    |  13 +-
 .../tests/read_storage_factory.rs             |  36 +--
 .../src/batch_executor/tests/tester.rs        |  32 +--
 core/node/state_keeper/src/keeper.rs          |  41 ++--
 .../state_keeper/src/state_keeper_storage.rs  |  14 +-
 core/node/state_keeper/src/testonly/mod.rs    |  17 +-
 .../src/testonly/test_batch_executor.rs       |  74 ++----
 core/node/state_keeper/src/tests/mod.rs       |   2 +-
 core/node/vm_runner/Cargo.toml                |   1 +
 core/node/vm_runner/src/process.rs            |  22 +-
 core/node/vm_runner/src/storage.rs            | 196 ++++++----------
 core/node/vm_runner/src/tests/mod.rs          |  52 ++++-
 core/node/vm_runner/src/tests/process.rs      |  44 ++--
 core/node/vm_runner/src/tests/storage.rs      |  70 ++----
 .../vm_runner/src/tests/storage_writer.rs     | 215 ++++++++++++++++++
 23 files changed, 565 insertions(+), 427 deletions(-)
 create mode 100644 core/node/vm_runner/src/tests/storage_writer.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1852386b18c9..d5fa605649df 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9735,6 +9735,7 @@ dependencies = [
  "once_cell",
  "rand 0.8.5",
  "tempfile",
+ "test-casing",
  "tokio",
  "tracing",
  "vise",
diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs
index 66577841fd45..5044490c46dd 100644
--- a/core/lib/state/src/lib.rs
+++ b/core/lib/state/src/lib.rs
@@ -28,7 +28,10 @@ pub use self::{
         RocksdbStorage, RocksdbStorageBuilder, RocksdbStorageOptions, StateKeeperColumnFamily,
     },
     shadow_storage::ShadowStorage,
-    storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory},
+    storage_factory::{
+        BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory,
+        RocksdbWithMemory,
+    },
     storage_view::{StorageView, StorageViewCache, StorageViewMetrics},
     witness::WitnessStorage,
 };
diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs
index aab33c7dfe83..61a1eb362be1 100644
--- a/core/lib/state/src/rocksdb/mod.rs
+++ b/core/lib/state/src/rocksdb/mod.rs
@@ -257,6 +257,12 @@ impl RocksdbStorageBuilder {
     ) -> anyhow::Result<()> {
         self.0.revert(storage, last_l1_batch_to_keep).await
     }
+
+    /// Returns the underlying storage without any checks. Should only be used in test code.
+    #[doc(hidden)]
+    pub fn build_unchecked(self) -> RocksdbStorage {
+        self.0
+    }
 }
 
 impl RocksdbStorage {
diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory.rs
index 307fa465a7c9..c506bf7042d1 100644
--- a/core/lib/state/src/storage_factory.rs
+++ b/core/lib/state/src/storage_factory.rs
@@ -11,19 +11,33 @@ use crate::{
     PostgresStorage, ReadStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily,
 };
 
-/// Factory that can produce a [`ReadStorage`] implementation on demand.
+/// Factory that can produce [`OwnedStorage`] instances on demand.
 #[async_trait]
 pub trait ReadStorageFactory: Debug + Send + Sync + 'static {
-    /// Creates a [`PgOrRocksdbStorage`] entity over either a Postgres connection or RocksDB
+    /// Creates an [`OwnedStorage`] entity over either a Postgres connection or RocksDB
     /// instance. The specific criteria on which one are left up to the implementation.
     ///
-    /// The idea is that in either case this provides a valid [`ReadStorage`] implementation
-    /// that can be used by the caller.
+    /// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
+    /// a stop signal; this is the only case in which `Ok(None)` should be returned.
     async fn access_storage(
         &self,
         stop_receiver: &watch::Receiver<bool>,
         l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<PgOrRocksdbStorage<'static>>>;
+    ) -> anyhow::Result<Option<OwnedStorage>>;
+}
+
+/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
+/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
+#[async_trait]
+impl ReadStorageFactory for ConnectionPool<Core> {
+    async fn access_storage(
+        &self,
+        _stop_receiver: &watch::Receiver<bool>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<Option<OwnedStorage>> {
+        let storage = OwnedPostgresStorage::new(self.clone(), l1_batch_number);
+        Ok(Some(storage.into()))
+    }
 }
 
 /// DB difference introduced by one batch.
@@ -47,29 +61,31 @@ pub struct RocksdbWithMemory {
     pub batch_diffs: Vec<BatchDiff>,
 }
 
-/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
-/// underneath.
+/// Owned Postgres-backed VM storage for a certain L1 batch.
 #[derive(Debug)]
-pub enum PgOrRocksdbStorage<'a> {
-    /// Implementation over a Postgres connection.
-    Postgres(PostgresStorage<'a>),
-    /// Implementation over a RocksDB cache instance.
-    Rocksdb(RocksdbStorage),
-    /// Implementation over a RocksDB cache instance with in-memory DB diffs.
-    RocksdbWithMemory(RocksdbWithMemory),
+pub struct OwnedPostgresStorage {
+    connection_pool: ConnectionPool<Core>,
+    l1_batch_number: L1BatchNumber,
 }
 
-impl<'a> PgOrRocksdbStorage<'a> {
+impl OwnedPostgresStorage {
+    /// Creates a VM storage for the specified batch number.
+    pub fn new(connection_pool: ConnectionPool<Core>, l1_batch_number: L1BatchNumber) -> Self {
+        Self {
+            connection_pool,
+            l1_batch_number,
+        }
+    }
+
     /// Returns a [`ReadStorage`] implementation backed by Postgres
     ///
     /// # Errors
     ///
     /// Propagates Postgres errors.
-    pub async fn access_storage_pg(
-        pool: &'a ConnectionPool<Core>,
-        l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<PgOrRocksdbStorage<'a>> {
-        let mut connection = pool.connection().await?;
+    pub async fn borrow(&self) -> anyhow::Result<PgOrRocksdbStorage<'_>> {
+        let l1_batch_number = self.l1_batch_number;
+        let mut connection = self.connection_pool.connection().await?;
+
         let l2_block_number = if let Some((_, l2_block_number)) = connection
             .blocks_dal()
             .get_l2_block_range_of_l1_batch(l1_batch_number)
             .await?
@@ -85,9 +101,8 @@
                 .context("Could not find snapshot, no state available")?;
             if snapshot_recovery.l1_batch_number != l1_batch_number {
                 anyhow::bail!(
-                    "Snapshot contains L1 batch #{} while #{} was expected",
-                    snapshot_recovery.l1_batch_number,
-                    l1_batch_number
+                    "Snapshot contains L1 batch #{} while #{l1_batch_number} was expected",
+                    snapshot_recovery.l1_batch_number
                 );
             }
             snapshot_recovery.l2_block_number
@@ -99,19 +114,54 @@
             .into(),
         )
     }
+}
+
+/// Owned version of [`PgOrRocksdbStorage`]. It is thus possible to send to blocking tasks for VM execution.
+#[derive(Debug)]
+pub enum OwnedStorage {
+    /// Readily initialized storage with a static lifetime.
+    Static(PgOrRocksdbStorage<'static>),
+    /// Storage that must be `borrow()`ed from.
+    Lending(OwnedPostgresStorage),
+}
+
+impl From<OwnedPostgresStorage> for OwnedStorage {
+    fn from(storage: OwnedPostgresStorage) -> Self {
+        Self::Lending(storage)
+    }
+}
+
+impl From<PgOrRocksdbStorage<'static>> for OwnedStorage {
+    fn from(storage: PgOrRocksdbStorage<'static>) -> Self {
+        Self::Static(storage)
+    }
+}
+
+/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
+/// underneath.
+#[derive(Debug)]
+pub enum PgOrRocksdbStorage<'a> {
+    /// Implementation over a Postgres connection.
+    Postgres(PostgresStorage<'a>),
+    /// Implementation over a RocksDB cache instance.
+    Rocksdb(RocksdbStorage),
+    /// Implementation over a RocksDB cache instance with in-memory DB diffs.
+    RocksdbWithMemory(RocksdbWithMemory),
+}
 
+impl PgOrRocksdbStorage<'static> {
     /// Catches up RocksDB synchronously (i.e. assumes the gap is small) and
     /// returns a [`ReadStorage`] implementation backed by caught-up RocksDB.
     ///
     /// # Errors
     ///
     /// Propagates RocksDB and Postgres errors.
- pub async fn access_storage_rocksdb( + pub async fn rocksdb( connection: &mut Connection<'_, Core>, rocksdb: RocksDB, stop_receiver: &watch::Receiver, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { tracing::debug!("Catching up RocksDB synchronously"); let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb); let rocksdb = rocksdb_builder diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 8ed9b933d164..411d1044df3b 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -33,10 +33,7 @@ use zksync_state::RocksdbStorageOptions; use zksync_state_keeper::{ io::{IoCursor, L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, - testonly::{ - fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory, - MockBatchExecutor, - }, + testonly::{fund, l1_transaction, l2_transaction, MockBatchExecutor}, AsyncRocksdbCache, MainBatchExecutor, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper, }; @@ -638,7 +635,7 @@ impl StateKeeperRunner { .with_handler(Box::new(tree_writes_persistence)) .with_handler(Box::new(self.sync_state.clone())), Arc::new(NoopSealer), - Arc::new(MockReadStorageFactory), + Arc::new(self.pool.0.clone()), ) .run() .await diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index 510f9124c297..e091472ad512 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -13,7 +13,7 @@ use zksync_node_test_utils::{ use zksync_state_keeper::{ io::{L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, - testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder}, + testonly::test_batch_executor::TestBatchExecutorBuilder, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper, }; use zksync_types::{ @@ -113,7 +113,7 @@ impl StateKeeperHandles { tokio::spawn(l2_block_sealer.run()); let io = ExternalIO::new( - pool, + pool.clone(), actions, Box::new(main_node_client), L2ChainId::default(), @@ -132,7 +132,7 @@ impl StateKeeperHandles { Box::new(batch_executor_base), output_handler, Arc::new(NoopSealer), - Arc::new(MockReadStorageFactory), + Arc::new(pool), ); Self { diff --git a/core/node/state_keeper/Cargo.toml b/core/node/state_keeper/Cargo.toml index 904d17718503..890543bcd910 100644 --- a/core/node/state_keeper/Cargo.toml +++ b/core/node/state_keeper/Cargo.toml @@ -33,6 +33,7 @@ zksync_base_token_adjuster.workspace = true anyhow.workspace = true async-trait.workspace = true +tempfile.workspace = true # used in `testonly` module tokio = { workspace = true, features = ["time"] } thiserror.workspace = true tracing.workspace = true @@ -45,7 +46,6 @@ hex.workspace = true assert_matches.workspace = true test-casing.workspace = true futures.workspace = true -tempfile.workspace = true zksync_eth_client.workspace = true zksync_system_constants.workspace = true diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/node/state_keeper/src/batch_executor/main_executor.rs index 2434e92e812f..4c85fc5bb1fc 100644 --- a/core/node/state_keeper/src/batch_executor/main_executor.rs +++ b/core/node/state_keeper/src/batch_executor/main_executor.rs @@ -1,12 +1,8 @@ use std::sync::Arc; use anyhow::Context as _; -use async_trait::async_trait; use once_cell::sync::OnceCell; -use tokio::{ - runtime::Handle, - sync::{mpsc, watch}, -}; +use tokio::{runtime::Handle, sync::mpsc}; use zksync_multivm::{ interface::{ 
ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, @@ -17,7 +13,7 @@ use zksync_multivm::{ MultiVMTracer, VmInstance, }; use zksync_shared_metrics::{InteractionType, TxStage, APP_METRICS}; -use zksync_state::{ReadStorage, ReadStorageFactory, StorageView, WriteStorage}; +use zksync_state::{OwnedStorage, ReadStorage, StorageView, WriteStorage}; use zksync_types::{vm_trace::Call, Transaction}; use zksync_utils::bytecode::CompressedBytecodeInfo; @@ -50,15 +46,13 @@ impl MainBatchExecutor { } } -#[async_trait] impl BatchExecutor for MainBatchExecutor { - async fn init_batch( + fn init_batch( &mut self, - storage_factory: Arc, + storage: OwnedStorage, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - stop_receiver: &watch::Receiver, - ) -> Option { + ) -> BatchExecutorHandle { // Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued // until a previous command is processed), capacity 1 is enough for the commands channel. let (commands_sender, commands_receiver) = mpsc::channel(1); @@ -68,21 +62,17 @@ impl BatchExecutor for MainBatchExecutor { commands: commands_receiver, }; - let stop_receiver = stop_receiver.clone(); let handle = tokio::task::spawn_blocking(move || { - if let Some(storage) = Handle::current() - .block_on( - storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1), - ) - .context("failed accessing state keeper storage")? - { - executor.run(storage, l1_batch_params, system_env); - } else { - tracing::info!("Interrupted while trying to access state keeper storage"); - } + let storage = match storage { + OwnedStorage::Static(storage) => storage, + OwnedStorage::Lending(ref storage) => Handle::current() + .block_on(storage.borrow()) + .context("failed accessing state keeper storage")?, + }; + executor.run(storage, l1_batch_params, system_env); anyhow::Ok(()) }); - Some(BatchExecutorHandle::from_raw(handle, commands_sender)) + BatchExecutorHandle::from_raw(handle, commands_sender) } } diff --git a/core/node/state_keeper/src/batch_executor/mod.rs b/core/node/state_keeper/src/batch_executor/mod.rs index da4eb194bff5..d4fea2e9dfd5 100644 --- a/core/node/state_keeper/src/batch_executor/mod.rs +++ b/core/node/state_keeper/src/batch_executor/mod.rs @@ -1,15 +1,14 @@ use std::{error::Error as StdError, fmt, sync::Arc}; use anyhow::Context as _; -use async_trait::async_trait; use tokio::{ - sync::{mpsc, oneshot, watch}, + sync::{mpsc, oneshot}, task::JoinHandle, }; use zksync_multivm::interface::{ FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionResultAndLogs, }; -use zksync_state::{ReadStorageFactory, StorageViewCache}; +use zksync_state::{OwnedStorage, StorageViewCache}; use zksync_types::{vm_trace::Call, Transaction}; use zksync_utils::bytecode::CompressedBytecodeInfo; @@ -55,15 +54,13 @@ impl TxExecutionResult { /// An abstraction that allows us to create different kinds of batch executors. /// The only requirement is to return a [`BatchExecutorHandle`], which does its work /// by communicating with the externally initialized thread. 
-#[async_trait] pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug { - async fn init_batch( + fn init_batch( &mut self, - storage_factory: Arc, + storage: OwnedStorage, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - stop_receiver: &watch::Receiver, - ) -> Option; + ) -> BatchExecutorHandle; } #[derive(Debug)] diff --git a/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs b/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs index f23829ed5203..838b92407673 100644 --- a/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs +++ b/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs @@ -2,46 +2,22 @@ use anyhow::Context; use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; -use zksync_state::{PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; +use zksync_state::{OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; use zksync_types::L1BatchNumber; #[derive(Debug, Clone)] -pub struct PostgresFactory { - pool: ConnectionPool, -} - -#[async_trait] -impl ReadStorageFactory for PostgresFactory { - async fn access_storage( - &self, - _stop_receiver: &watch::Receiver, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { - Ok(Some( - PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number).await?, - )) - } -} - -impl PostgresFactory { - pub fn new(pool: ConnectionPool) -> Self { - Self { pool } - } -} - -#[derive(Debug, Clone)] -pub struct RocksdbFactory { +pub struct RocksdbStorageFactory { pool: ConnectionPool, state_keeper_db_path: String, } #[async_trait] -impl ReadStorageFactory for RocksdbFactory { +impl ReadStorageFactory for RocksdbStorageFactory { async fn access_storage( &self, stop_receiver: &watch::Receiver, _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { let builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref()) .await .context("Failed opening state keeper RocksDB")?; @@ -57,11 +33,11 @@ impl ReadStorageFactory for RocksdbFactory { else { return Ok(None); }; - Ok(Some(PgOrRocksdbStorage::Rocksdb(rocksdb_storage))) + Ok(Some(PgOrRocksdbStorage::Rocksdb(rocksdb_storage).into())) } } -impl RocksdbFactory { +impl RocksdbStorageFactory { pub fn new(pool: ConnectionPool, state_keeper_db_path: String) -> Self { Self { pool, diff --git a/core/node/state_keeper/src/batch_executor/tests/tester.rs b/core/node/state_keeper/src/batch_executor/tests/tester.rs index 579f3bee4819..961ccf9db16f 100644 --- a/core/node/state_keeper/src/batch_executor/tests/tester.rs +++ b/core/node/state_keeper/src/batch_executor/tests/tester.rs @@ -29,10 +29,7 @@ use zksync_types::{ }; use zksync_utils::u256_to_h256; -use super::{ - read_storage_factory::{PostgresFactory, RocksdbFactory}, - StorageType, -}; +use super::{read_storage_factory::RocksdbStorageFactory, StorageType}; use crate::{ batch_executor::{BatchExecutorHandle, TxExecutionResult}, testonly, @@ -121,7 +118,7 @@ impl Tester { } StorageType::Rocksdb => { self.create_batch_executor_inner( - Arc::new(RocksdbFactory::new( + Arc::new(RocksdbStorageFactory::new( self.pool(), self.state_keeper_db_path(), )), @@ -131,12 +128,8 @@ impl Tester { .await } StorageType::Postgres => { - self.create_batch_executor_inner( - Arc::new(PostgresFactory::new(self.pool())), - l1_batch_env, - system_env, - ) - .await + self.create_batch_executor_inner(Arc::new(self.pool()), l1_batch_env, system_env) + .await } } } @@ -149,10 +142,12 @@ impl Tester { ) -> 
BatchExecutorHandle { let mut batch_executor = MainBatchExecutor::new(self.config.save_call_traces, false); let (_stop_sender, stop_receiver) = watch::channel(false); - batch_executor - .init_batch(storage_factory, l1_batch_env, system_env, &stop_receiver) + let storage = storage_factory + .access_storage(&stop_receiver, l1_batch_env.number - 1) .await - .expect("Batch executor was interrupted") + .expect("failed creating VM storage") + .unwrap(); + batch_executor.init_batch(storage, l1_batch_env, system_env) } pub(super) async fn recover_batch_executor( @@ -180,7 +175,7 @@ impl Tester { StorageType::AsyncRocksdbCache => self.recover_batch_executor(snapshot).await, StorageType::Rocksdb => { self.recover_batch_executor_inner( - Arc::new(RocksdbFactory::new( + Arc::new(RocksdbStorageFactory::new( self.pool(), self.state_keeper_db_path(), )), @@ -189,11 +184,8 @@ impl Tester { .await } StorageType::Postgres => { - self.recover_batch_executor_inner( - Arc::new(PostgresFactory::new(self.pool())), - snapshot, - ) - .await + self.recover_batch_executor_inner(Arc::new(self.pool()), snapshot) + .await } } } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 7675960ecbdc..934ed9493f86 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -146,16 +146,8 @@ impl ZkSyncStateKeeper { .await?; let mut batch_executor = self - .batch_executor_base - .init_batch( - self.storage_factory.clone(), - l1_batch_env.clone(), - system_env.clone(), - &self.stop_receiver, - ) - .await - .ok_or(Error::Canceled)?; - + .create_batch_executor(l1_batch_env.clone(), system_env.clone()) + .await?; self.restore_state(&mut batch_executor, &mut updates_manager, pending_l2_blocks) .await?; @@ -203,15 +195,8 @@ impl ZkSyncStateKeeper { (system_env, l1_batch_env) = self.wait_for_new_batch_env(&next_cursor).await?; updates_manager = UpdatesManager::new(&l1_batch_env, &system_env); batch_executor = self - .batch_executor_base - .init_batch( - self.storage_factory.clone(), - l1_batch_env.clone(), - system_env.clone(), - &self.stop_receiver, - ) - .await - .ok_or(Error::Canceled)?; + .create_batch_executor(l1_batch_env.clone(), system_env.clone()) + .await?; let version_changed = system_env.version != sealed_batch_protocol_version; protocol_upgrade_tx = if version_changed { @@ -223,6 +208,24 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } + async fn create_batch_executor( + &mut self, + l1_batch_env: L1BatchEnv, + system_env: SystemEnv, + ) -> Result { + let Some(storage) = self + .storage_factory + .access_storage(&self.stop_receiver, l1_batch_env.number - 1) + .await + .context("failed creating VM storage")? + else { + return Err(Error::Canceled); + }; + Ok(self + .batch_executor_base + .init_batch(storage, l1_batch_env, system_env)) + } + /// This function is meant to be called only once during the state-keeper initialization. /// It will check if we should load a protocol upgrade or a `setChainId` transaction, /// perform some checks and return it. 
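The `keeper.rs` hunk above is the crux of the state keeper side of this refactoring: cancellation is resolved once, while obtaining storage, and `BatchExecutor::init_batch` becomes synchronous and infallible. The sketch below illustrates that control flow with hypothetical stand-in types — `OwnedStorage`, `access_storage`, and `init_batch` here are simplified imitations assuming `tokio` and `anyhow`, not the crate's real definitions:

```rust
use tokio::{sync::watch, task::JoinHandle};

/// Stand-in for `zksync_state::OwnedStorage`: owns its backing resources,
/// so it can be moved into a blocking VM task.
#[derive(Debug)]
struct OwnedStorage {
    l1_batch_number: u32,
}

/// Stand-in for `ReadStorageFactory::access_storage`; per the new contract,
/// `Ok(None)` is returned only when a stop signal is received.
async fn access_storage(
    stop_receiver: &watch::Receiver<bool>,
    l1_batch_number: u32,
) -> anyhow::Result<Option<OwnedStorage>> {
    if *stop_receiver.borrow() {
        return Ok(None);
    }
    Ok(Some(OwnedStorage { l1_batch_number }))
}

/// Stand-in for the new `BatchExecutor::init_batch`: it receives ready
/// storage instead of a factory plus a stop receiver, so it cannot fail.
fn init_batch(storage: OwnedStorage) -> JoinHandle<()> {
    tokio::task::spawn_blocking(move || {
        // The real executor runs the VM against `storage` here.
        println!("executing L1 batch #{}", storage.l1_batch_number);
    })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (_stop_sender, stop_receiver) = watch::channel(false);
    // Mirrors `ZkSyncStateKeeper::create_batch_executor` above: cancellation
    // is handled before the executor is initialized.
    let Some(storage) = access_storage(&stop_receiver, 1).await? else {
        return Ok(()); // the real keeper maps this case to `Error::Canceled`
    };
    init_batch(storage).await?;
    Ok(())
}
```

Because `Ok(None)` is reserved for stop signals, the keeper can treat it uniformly as cancellation instead of guessing whether a `None` meant a missing batch or an interrupted factory.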
diff --git a/core/node/state_keeper/src/state_keeper_storage.rs b/core/node/state_keeper/src/state_keeper_storage.rs index 13cedc3a58a8..fbda064b5d71 100644 --- a/core/node/state_keeper/src/state_keeper_storage.rs +++ b/core/node/state_keeper/src/state_keeper_storage.rs @@ -5,7 +5,8 @@ use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; use zksync_state::{ - AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, RocksdbCell, RocksdbStorageOptions, + AsyncCatchupTask, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, + RocksdbCell, RocksdbStorageOptions, }; use zksync_types::L1BatchNumber; @@ -41,7 +42,7 @@ impl ReadStorageFactory for AsyncRocksdbCache { &self, stop_receiver: &watch::Receiver, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { let initial_state = self.rocksdb_cell.ensure_initialized().await?; let rocksdb = if initial_state.l1_batch_number >= Some(l1_batch_number) { tracing::info!( @@ -63,19 +64,18 @@ impl ReadStorageFactory for AsyncRocksdbCache { .connection_tagged("state_keeper") .await .context("Failed getting a Postgres connection")?; - PgOrRocksdbStorage::access_storage_rocksdb( + let storage = PgOrRocksdbStorage::rocksdb( &mut connection, rocksdb, stop_receiver, l1_batch_number, ) .await - .context("Failed accessing RocksDB storage") + .context("Failed accessing RocksDB storage")?; + Ok(storage.map(Into::into)) } else { Ok(Some( - PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number) - .await - .context("Failed accessing Postgres storage")?, + OwnedPostgresStorage::new(self.pool.clone(), l1_batch_number).into(), )) } } diff --git a/core/node/state_keeper/src/testonly/mod.rs b/core/node/state_keeper/src/testonly/mod.rs index c287bc97407f..465042a602df 100644 --- a/core/node/state_keeper/src/testonly/mod.rs +++ b/core/node/state_keeper/src/testonly/mod.rs @@ -1,11 +1,8 @@ //! Test utilities that can be used for testing sequencer that may //! be useful outside of this crate. 
-use std::sync::Arc; - -use async_trait::async_trait; use once_cell::sync::Lazy; -use tokio::sync::{mpsc, watch}; +use tokio::sync::mpsc; use zksync_contracts::BaseSystemContracts; use zksync_dal::{ConnectionPool, Core, CoreDal as _}; use zksync_multivm::{ @@ -15,7 +12,7 @@ use zksync_multivm::{ }, vm_latest::VmExecutionLogs, }; -use zksync_state::{ReadStorageFactory, StorageViewCache}; +use zksync_state::{OwnedStorage, StorageViewCache}; use zksync_test_account::Account; use zksync_types::{ fee::Fee, utils::storage_key_for_standard_token_balance, AccountTreeId, Address, Execute, @@ -87,15 +84,13 @@ pub(crate) fn storage_view_cache() -> StorageViewCache { #[derive(Debug)] pub struct MockBatchExecutor; -#[async_trait] impl BatchExecutor for MockBatchExecutor { - async fn init_batch( + fn init_batch( &mut self, - _storage_factory: Arc, + _storage: OwnedStorage, _l1batch_params: L1BatchEnv, _system_env: SystemEnv, - _stop_receiver: &watch::Receiver, - ) -> Option { + ) -> BatchExecutorHandle { let (send, recv) = mpsc::channel(1); let handle = tokio::task::spawn(async { let mut recv = recv; @@ -116,7 +111,7 @@ impl BatchExecutor for MockBatchExecutor { } anyhow::Ok(()) }); - Some(BatchExecutorHandle::from_raw(handle, send)) + BatchExecutorHandle::from_raw(handle, send) } } diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index 1be84cfbf54e..aefc8d50bc7d 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -13,14 +13,14 @@ use std::{ }; use async_trait::async_trait; -use tokio::sync::{mpsc, watch, watch::Receiver}; +use tokio::sync::{mpsc, watch}; use zksync_contracts::BaseSystemContracts; use zksync_multivm::{ interface::{ExecutionResult, L1BatchEnv, SystemEnv, VmExecutionResultAndLogs}, vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; use zksync_node_test_utils::create_l2_transaction; -use zksync_state::{PgOrRocksdbStorage, ReadStorageFactory}; +use zksync_state::{OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; use zksync_types::{ fee_model::BatchFeeInput, protocol_upgrade::ProtocolUpgradeTx, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, @@ -207,7 +207,7 @@ impl TestScenario { Box::new(batch_executor_base), output_handler, Arc::new(sealer), - Arc::new(MockReadStorageFactory), + Arc::::default(), ); let sk_thread = tokio::spawn(state_keeper.run()); @@ -410,15 +410,13 @@ impl TestBatchExecutorBuilder { } } -#[async_trait] impl BatchExecutor for TestBatchExecutorBuilder { - async fn init_batch( + fn init_batch( &mut self, - _storage_factory: Arc, - _l1batch_params: L1BatchEnv, + _storage: OwnedStorage, + _l1_batch_params: L1BatchEnv, _system_env: SystemEnv, - _stop_receiver: &watch::Receiver, - ) -> Option { + ) -> BatchExecutorHandle { let (commands_sender, commands_receiver) = mpsc::channel(1); let executor = TestBatchExecutor::new( @@ -430,7 +428,7 @@ impl BatchExecutor for TestBatchExecutorBuilder { executor.run(); Ok(()) }); - Some(BatchExecutorHandle::from_raw(handle, commands_sender)) + BatchExecutorHandle::from_raw(handle, commands_sender) } } @@ -805,55 +803,31 @@ impl StateKeeperIO for TestIO { } } -/// `BatchExecutor` which doesn't check anything at all. Accepts all transactions. -// FIXME: move to `utils`? +/// Storage factory that produces empty VM storage for any batch. 
Should only be used with a mock batch executor +/// that doesn't read from the storage. Prefer using `ConnectionPool` as a factory if it's available. #[derive(Debug)] -pub(crate) struct MockBatchExecutor; +pub struct MockReadStorageFactory(tempfile::TempDir); -#[async_trait] -impl BatchExecutor for MockBatchExecutor { - async fn init_batch( - &mut self, - _storage_factory: Arc, - _l1batch_params: L1BatchEnv, - _system_env: SystemEnv, - _stop_receiver: &watch::Receiver, - ) -> Option { - let (send, recv) = mpsc::channel(1); - let handle = tokio::task::spawn(async { - let mut recv = recv; - while let Some(cmd) = recv.recv().await { - match cmd { - Command::ExecuteTx(_, resp) => resp.send(successful_exec()).unwrap(), - Command::StartNextL2Block(_, resp) => resp.send(()).unwrap(), - Command::RollbackLastTx(_) => panic!("unexpected rollback"), - Command::FinishBatch(resp) => { - // Blanket result, it doesn't really matter. - resp.send(default_vm_batch_result()).unwrap(); - break; - } - Command::FinishBatchWithCache(resp) => resp - .send((default_vm_batch_result(), storage_view_cache())) - .unwrap(), - } - } - anyhow::Ok(()) - }); - Some(BatchExecutorHandle::from_raw(handle, send)) +impl Default for MockReadStorageFactory { + fn default() -> Self { + Self( + tempfile::TempDir::new() + .expect("failed creating temporary directory for `MockReadStorageFactory`"), + ) } } -#[derive(Debug)] -pub struct MockReadStorageFactory; - #[async_trait] impl ReadStorageFactory for MockReadStorageFactory { async fn access_storage( &self, - _stop_receiver: &Receiver, + _stop_receiver: &watch::Receiver, _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { - // Presume that the storage is never accessed in mocked environment - unimplemented!() + ) -> anyhow::Result> { + let storage = RocksdbStorage::builder(self.0.path()) + .await + .expect("Cannot create mock RocksDB storage") + .build_unchecked(); + Ok(Some(PgOrRocksdbStorage::Rocksdb(storage).into())) } } diff --git a/core/node/state_keeper/src/tests/mod.rs b/core/node/state_keeper/src/tests/mod.rs index 8bfc53c8f7b1..2d0af7dd281b 100644 --- a/core/node/state_keeper/src/tests/mod.rs +++ b/core/node/state_keeper/src/tests/mod.rs @@ -438,7 +438,7 @@ async fn load_upgrade_tx() { Box::new(batch_executor_base), output_handler, Arc::new(sealer), - Arc::new(MockReadStorageFactory), + Arc::::default(), ); // Since the version hasn't changed, and we are not using shared bridge, we should not load any diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index 3af52ed4688e..52a8e4676437 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -39,3 +39,4 @@ backon.workspace = true futures = { workspace = true, features = ["compat"] } rand.workspace = true tempfile.workspace = true +test-casing.workspace = true diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index e84ec76d0726..3c5a00e074c0 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -173,26 +173,18 @@ impl VmRunner { tokio::time::sleep(SLEEP_INTERVAL).await; continue; } - let Some(batch_data) = self.loader.load_batch(next_batch).await? else { + let Some((batch_data, storage)) = self.loader.load_batch(next_batch).await? 
else { // Next batch has not been loaded yet tokio::time::sleep(SLEEP_INTERVAL).await; continue; }; let updates_manager = UpdatesManager::new(&batch_data.l1_batch_env, &batch_data.system_env); - let Some(batch_executor) = self - .batch_processor - .init_batch( - self.loader.clone().upcast(), - batch_data.l1_batch_env, - batch_data.system_env, - stop_receiver, - ) - .await - else { - tracing::info!("VM runner was interrupted"); - break; - }; + let batch_executor = self.batch_processor.init_batch( + storage, + batch_data.l1_batch_env, + batch_data.system_env, + ); let output_handler = self .output_handler_factory .create_handler(next_batch) @@ -211,7 +203,5 @@ impl VmRunner { next_batch += 1; } - - Ok(()) } } diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index f3e304d7d4ff..75ed4cb57a9c 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -1,6 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, - fmt::Debug, + fmt, sync::Arc, time::Duration, }; @@ -11,8 +11,8 @@ use tokio::sync::{watch, RwLock}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_multivm::{interface::L1BatchEnv, vm_1_4_2::SystemEnv}; use zksync_state::{ - AsyncCatchupTask, BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbCell, - RocksdbStorage, RocksdbStorageBuilder, RocksdbWithMemory, + AsyncCatchupTask, BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, + RocksdbCell, RocksdbStorage, RocksdbStorageBuilder, RocksdbWithMemory, }; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId}; use zksync_vm_utils::storage::L1BatchParamsProvider; @@ -20,7 +20,7 @@ use zksync_vm_utils::storage::L1BatchParamsProvider; use crate::{metrics::METRICS, VmRunnerIo}; #[async_trait] -pub trait StorageLoader: ReadStorageFactory { +pub trait StorageLoader: 'static + Send + Sync + fmt::Debug { /// Loads next unprocessed L1 batch along with all transactions that VM runner needs to /// re-execute. These are the transactions that are included in a sealed L2 block belonging /// to a sealed L1 batch (with state keeper being the source of truth). The order of the @@ -34,13 +34,7 @@ pub trait StorageLoader: ReadStorageFactory { async fn load_batch( &self, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>; - - /// A workaround for Rust's limitations on upcasting coercion. See - /// https://github.com/rust-lang/rust/issues/65991. - /// - /// Should always be implementable as [`StorageLoader`] requires [`ReadStorageFactory`]. - fn upcast(self: Arc) -> Arc; + ) -> anyhow::Result>; } /// Data needed to execute an L1 batch. @@ -85,13 +79,6 @@ struct State { storage: BTreeMap, } -impl State { - /// Whether this state can serve as a `ReadStorage` source for the given L1 batch. - fn can_be_used_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> bool { - l1_batch_number == self.l1_batch_number || self.storage.contains_key(&l1_batch_number) - } -} - impl VmRunnerStorage { /// Creates a new VM runner storage using provided Postgres pool and RocksDB path. 
pub async fn new( @@ -133,66 +120,34 @@ impl VmRunnerStorage { } } -impl VmRunnerStorage { - async fn access_storage_inner( - &self, - _stop_receiver: &watch::Receiver, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { - let state = self.state.read().await; - let Some(rocksdb) = &state.rocksdb else { - return Ok(Some( - PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number) - .await - .context("Failed accessing Postgres storage")?, - )); - }; - if !state.can_be_used_for_l1_batch(l1_batch_number) { - tracing::debug!( - %l1_batch_number, - min_l1_batch = %state.l1_batch_number, - max_l1_batch = %state.storage.last_key_value().map(|(k, _)| *k).unwrap_or(state.l1_batch_number), - "Trying to access VM runner storage with L1 batch that is not available", - ); - return Ok(None); - } - let batch_diffs = state - .storage - .iter() - .filter_map(|(x, y)| { - if x <= &l1_batch_number { - Some(y.diff.clone()) - } else { - None - } - }) - .collect::>(); - Ok(Some(PgOrRocksdbStorage::RocksdbWithMemory( - RocksdbWithMemory { - rocksdb: rocksdb.clone(), - batch_diffs, - }, - ))) - } -} - #[async_trait] impl StorageLoader for VmRunnerStorage { async fn load_batch( &self, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let state = self.state.read().await; - if state.rocksdb.is_none() { + let rocksdb = if let Some(rocksdb) = &state.rocksdb { + rocksdb + } else { + drop(state); let mut conn = self.pool.connection_tagged(self.io.name()).await?; - return StorageSyncTask::::load_batch_execute_data( + let batch_data = load_batch_execute_data( &mut conn, l1_batch_number, &self.l1_batch_params_provider, self.chain_id, ) - .await; - } + .await?; + + return Ok(batch_data.map(|data| { + ( + data, + OwnedPostgresStorage::new(self.pool.clone(), l1_batch_number - 1).into(), + ) + })); + }; + match state.storage.get(&l1_batch_number) { None => { tracing::debug!( @@ -203,25 +158,22 @@ impl StorageLoader for VmRunnerStorage { ); Ok(None) } - Some(batch_data) => Ok(Some(batch_data.execute_data.clone())), + Some(batch_data) => { + let data = batch_data.execute_data.clone(); + let batch_diffs = state + .storage + .iter() + .filter(|(&num, _)| num < l1_batch_number) + .map(|(_, data)| data.diff.clone()) + .collect::>(); + let storage = PgOrRocksdbStorage::RocksdbWithMemory(RocksdbWithMemory { + rocksdb: rocksdb.clone(), + batch_diffs, + }); + Ok(Some((data, storage.into()))) + } } } - - fn upcast(self: Arc) -> Arc { - self - } -} - -#[async_trait] -impl ReadStorageFactory for VmRunnerStorage { - async fn access_storage( - &self, - stop_receiver: &watch::Receiver, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { - self.access_storage_inner(stop_receiver, l1_batch_number) - .await - } } /// A runnable task that catches up the provided RocksDB cache instance to the latest processed @@ -335,7 +287,7 @@ impl StorageSyncTask { for l1_batch_number in max_present.0 + 1..=max_desired.0 { let latency = METRICS.storage_load_time.start(); let l1_batch_number = L1BatchNumber(l1_batch_number); - let Some(execute_data) = Self::load_batch_execute_data( + let Some(execute_data) = load_batch_execute_data( &mut conn, l1_batch_number, &self.l1_batch_params_provider, @@ -375,44 +327,44 @@ impl StorageSyncTask { drop(conn); } } +} - async fn load_batch_execute_data( - conn: &mut Connection<'_, Core>, - l1_batch_number: L1BatchNumber, - l1_batch_params_provider: &L1BatchParamsProvider, - chain_id: L2ChainId, - ) -> anyhow::Result> { - let first_l2_block_in_batch = 
l1_batch_params_provider - .load_first_l2_block_in_batch(conn, l1_batch_number) - .await - .with_context(|| { - format!( - "Failed loading first L2 block for L1 batch #{}", - l1_batch_number - ) - })?; - let Some(first_l2_block_in_batch) = first_l2_block_in_batch else { - return Ok(None); - }; - let (system_env, l1_batch_env) = l1_batch_params_provider - .load_l1_batch_params( - conn, - &first_l2_block_in_batch, - // `validation_computational_gas_limit` is only relevant when rejecting txs, but we - // are re-executing so none of them should be rejected - u32::MAX, - chain_id, +pub(crate) async fn load_batch_execute_data( + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + l1_batch_params_provider: &L1BatchParamsProvider, + chain_id: L2ChainId, +) -> anyhow::Result> { + let first_l2_block_in_batch = l1_batch_params_provider + .load_first_l2_block_in_batch(conn, l1_batch_number) + .await + .with_context(|| { + format!( + "Failed loading first L2 block for L1 batch #{}", + l1_batch_number ) - .await - .with_context(|| format!("Failed loading params for L1 batch #{}", l1_batch_number))?; - let l2_blocks = conn - .transactions_dal() - .get_l2_blocks_to_execute_for_l1_batch(l1_batch_number) - .await?; - Ok(Some(BatchExecuteData { - l1_batch_env, - system_env, - l2_blocks, - })) - } + })?; + let Some(first_l2_block_in_batch) = first_l2_block_in_batch else { + return Ok(None); + }; + let (system_env, l1_batch_env) = l1_batch_params_provider + .load_l1_batch_params( + conn, + &first_l2_block_in_batch, + // `validation_computational_gas_limit` is only relevant when rejecting txs, but we + // are re-executing so none of them should be rejected + u32::MAX, + chain_id, + ) + .await + .with_context(|| format!("Failed loading params for L1 batch #{}", l1_batch_number))?; + let l2_blocks = conn + .transactions_dal() + .get_l2_blocks_to_execute_for_l1_batch(l1_batch_number) + .await?; + Ok(Some(BatchExecuteData { + l1_batch_env, + system_env, + l2_blocks, + })) } diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs index 50acba610ba5..e9dbebfa24d5 100644 --- a/core/node/vm_runner/src/tests/mod.rs +++ b/core/node/vm_runner/src/tests/mod.rs @@ -9,6 +9,7 @@ use zksync_node_test_utils::{ create_l1_batch_metadata, create_l2_block, execute_l2_transaction, l1_batch_metadata_to_commitment_artifacts, }; +use zksync_state::{OwnedPostgresStorage, OwnedStorage}; use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_test_account::Account; use zksync_types::{ @@ -17,16 +18,48 @@ use zksync_types::{ get_intrinsic_constants, l2::L2Tx, utils::storage_key_for_standard_token_balance, - AccountTreeId, Address, Execute, L1BatchNumber, L2BlockNumber, ProtocolVersionId, StorageKey, - StorageLog, StorageLogKind, StorageValue, H160, H256, L2_BASE_TOKEN_ADDRESS, U256, + AccountTreeId, Address, Execute, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, + StorageKey, StorageLog, StorageLogKind, StorageValue, H160, H256, L2_BASE_TOKEN_ADDRESS, U256, }; use zksync_utils::u256_to_h256; +use zksync_vm_utils::storage::L1BatchParamsProvider; -use super::{OutputHandlerFactory, VmRunnerIo}; +use super::{BatchExecuteData, OutputHandlerFactory, VmRunnerIo}; +use crate::storage::{load_batch_execute_data, StorageLoader}; mod output_handler; mod process; mod storage; +mod storage_writer; + +const TEST_TIMEOUT: Duration = Duration::from_secs(10); + +/// Simplified storage loader that always gets data from Postgres (i.e., doesn't do RocksDB caching). 
+#[derive(Debug)] +struct PostgresLoader(ConnectionPool); + +#[async_trait] +impl StorageLoader for PostgresLoader { + async fn load_batch( + &self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + let mut conn = self.0.connection().await?; + let Some(data) = load_batch_execute_data( + &mut conn, + l1_batch_number, + &L1BatchParamsProvider::new(), + L2ChainId::default(), + ) + .await? + else { + return Ok(None); + }; + + let storage = OwnedPostgresStorage::new(self.0.clone(), l1_batch_number - 1); + Ok(Some((data, storage.into()))) + } +} #[derive(Debug, Default)] struct IoMock { @@ -141,7 +174,7 @@ mod wait { } } -#[derive(Debug)] +#[derive(Debug, Default)] struct TestOutputFactory { delays: HashMap, } @@ -152,11 +185,11 @@ impl OutputHandlerFactory for TestOutputFactory { &mut self, l1_batch_number: L1BatchNumber, ) -> anyhow::Result> { - let delay = self.delays.get(&l1_batch_number).copied(); #[derive(Debug)] struct TestOutputHandler { delay: Option, } + #[async_trait] impl StateKeeperOutputHandler for TestOutputHandler { async fn handle_l2_block( @@ -176,6 +209,8 @@ impl OutputHandlerFactory for TestOutputFactory { Ok(()) } } + + let delay = self.delays.get(&l1_batch_number).copied(); Ok(Box::new(TestOutputHandler { delay })) } } @@ -289,12 +324,11 @@ async fn store_l1_batches( // Insert a fictive L2 block at the end of the batch let mut fictive_l2_block = create_l2_block(l2_block_number.0); - let mut digest = L2BlockHasher::new( + let digest = L2BlockHasher::new( fictive_l2_block.number, fictive_l2_block.timestamp, last_l2_block_hash, ); - digest.push_tx_hash(tx.hash()); fictive_l2_block.hash = digest.finalize(ProtocolVersionId::latest()); l2_block_number += 1; conn.blocks_dal().insert_l2_block(&fictive_l2_block).await?; @@ -337,9 +371,7 @@ async fn store_l1_batches( Ok(batches) } -async fn fund(pool: &ConnectionPool, accounts: &[Account]) { - let mut conn = pool.connection().await.unwrap(); - +async fn fund(conn: &mut Connection<'_, Core>, accounts: &[Account]) { let eth_amount = U256::from(10).pow(U256::from(32)); //10^32 wei for account in accounts { diff --git a/core/node/vm_runner/src/tests/process.rs b/core/node/vm_runner/src/tests/process.rs index 664bdeebf855..7ea1335db71f 100644 --- a/core/node/vm_runner/src/tests/process.rs +++ b/core/node/vm_runner/src/tests/process.rs @@ -1,26 +1,20 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; use tempfile::TempDir; +use test_casing::test_casing; use tokio::sync::{watch, RwLock}; use zksync_dal::{ConnectionPool, Core}; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_state_keeper::MainBatchExecutor; use zksync_test_account::Account; -use zksync_types::L2ChainId; +use zksync_types::{L1BatchNumber, L2ChainId}; -use crate::{ - tests::{fund, store_l1_batches, wait, IoMock, TestOutputFactory}, - ConcurrentOutputHandlerFactory, VmRunner, VmRunnerStorage, -}; +use super::*; +use crate::{ConcurrentOutputHandlerFactory, VmRunner, VmRunnerStorage}; -// Testing more than a one-batch scenario is pretty difficult as that requires storage to have -// completely valid state after each L2 block execution (current block number, hash, rolling txs -// hash etc written to the correct places). To achieve this we could run state keeper e2e but that -// is pretty difficult to set up. -// -// Instead, we rely on integration tests to verify the correctness of VM runner main process. 
-#[tokio::test] -async fn process_one_batch() -> anyhow::Result<()> { +#[test_casing(4, [(1, 1), (5, 1), (5, 3), (5, 5)])] +#[tokio::test(flavor = "multi_thread")] +async fn process_batches((batch_count, window): (u32, u32)) -> anyhow::Result<()> { let rocksdb_dir = TempDir::new()?; let connection_pool = ConnectionPool::::test_pool().await; let mut conn = connection_pool.connection().await.unwrap(); @@ -28,23 +22,24 @@ async fn process_one_batch() -> anyhow::Result<()> { insert_genesis_batch(&mut conn, &genesis_params) .await .unwrap(); - let alice = Account::random(); - let bob = Account::random(); - let mut accounts = vec![alice, bob]; - fund(&connection_pool, &accounts).await; + let mut accounts = vec![Account::random(), Account::random()]; + fund(&mut conn, &accounts).await; - let batches = store_l1_batches( + store_l1_batches( &mut conn, - 1..=1, + 1..=batch_count, genesis_params.base_system_contracts().hashes(), &mut accounts, ) .await?; drop(conn); + // Fill in missing storage logs for all batches so that running VM for all of them works correctly. + storage_writer::write_storage_logs(connection_pool.clone()).await; + let io = Arc::new(RwLock::new(IoMock { current: 0.into(), - max: 1, + max: window, })); let (storage, task) = VmRunnerStorage::new( connection_pool.clone(), @@ -53,7 +48,7 @@ async fn process_one_batch() -> anyhow::Result<()> { L2ChainId::default(), ) .await?; - let (_, stop_receiver) = watch::channel(false); + let (_stop_sender, stop_receiver) = watch::channel(false); let storage_stop_receiver = stop_receiver.clone(); tokio::task::spawn(async move { task.run(storage_stop_receiver).await.unwrap() }); let test_factory = TestOutputFactory { @@ -75,9 +70,6 @@ async fn process_one_batch() -> anyhow::Result<()> { ); tokio::task::spawn(async move { vm_runner.run(&stop_receiver).await.unwrap() }); - for batch in batches { - wait::for_batch(io.clone(), batch.number, Duration::from_secs(1)).await?; - } - + wait::for_batch_progressively(io, L1BatchNumber(batch_count), TEST_TIMEOUT).await?; Ok(()) } diff --git a/core/node/vm_runner/src/tests/storage.rs b/core/node/vm_runner/src/tests/storage.rs index 52de43801ff0..90aeda335e1d 100644 --- a/core/node/vm_runner/src/tests/storage.rs +++ b/core/node/vm_runner/src/tests/storage.rs @@ -9,7 +9,7 @@ use tokio::{ }; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; -use zksync_state::{PgOrRocksdbStorage, PostgresStorage, ReadStorage, ReadStorageFactory}; +use zksync_state::{OwnedStorage, PostgresStorage, ReadStorage}; use zksync_test_account::Account; use zksync_types::{AccountTreeId, L1BatchNumber, L2ChainId, StorageKey}; @@ -59,7 +59,7 @@ impl VmRunnerStorage { async fn load_batch_eventually( &self, number: L1BatchNumber, - ) -> anyhow::Result { + ) -> anyhow::Result<(BatchExecuteData, OwnedStorage)> { (|| async { self.load_batch(number) .await? @@ -69,22 +69,6 @@ impl VmRunnerStorage { .await } - async fn access_storage_eventually( - &self, - stop_receiver: &watch::Receiver, - number: L1BatchNumber, - ) -> anyhow::Result> { - (|| async { - self.access_storage(stop_receiver, number) - .await? 
- .ok_or_else(|| { - anyhow::anyhow!("Storage for batch #{} is not available yet", number) - }) - }) - .retry(&ExponentialBuilder::default()) - .await - } - async fn ensure_batch_unloads_eventually(&self, number: L1BatchNumber) -> anyhow::Result<()> { (|| async { Ok(anyhow::ensure!( @@ -121,11 +105,11 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { insert_genesis_batch(&mut conn, &genesis_params) .await .unwrap(); - drop(conn); let alice = Account::random(); let bob = Account::random(); let mut accounts = vec![alice, bob]; - fund(&connection_pool, &accounts).await; + fund(&mut conn, &accounts).await; + drop(conn); // Generate 10 batches worth of data and persist it in Postgres let batches = store_l1_batches( @@ -144,7 +128,7 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { let storage = tester.create_storage(io_mock.clone()).await?; // Check that existing batches are returned in the exact same order with the exact same data for batch in &batches { - let batch_data = storage.load_batch_eventually(batch.number).await?; + let (batch_data, _) = storage.load_batch_eventually(batch.number).await?; let mut conn = connection_pool.connection().await.unwrap(); let (previous_batch_hash, _) = conn .blocks_dal() @@ -212,11 +196,11 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { insert_genesis_batch(&mut conn, &genesis_params) .await .unwrap(); - drop(conn); let alice = Account::random(); let bob = Account::random(); let mut accounts = vec![alice, bob]; - fund(&connection_pool, &accounts).await; + fund(&mut conn, &accounts).await; + drop(conn); let mut tester = StorageTester::new(connection_pool.clone()); let io_mock = Arc::new(RwLock::new(IoMock::default())); @@ -235,14 +219,8 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { io_mock.write().await.max += 1; // Load batch and mark it as processed - assert_eq!( - storage - .load_batch_eventually(L1BatchNumber(1)) - .await? - .l1_batch_env - .number, - L1BatchNumber(1) - ); + let (batch_data, _) = storage.load_batch_eventually(L1BatchNumber(1)).await?; + assert_eq!(batch_data.l1_batch_env.number, L1BatchNumber(1)); io_mock.write().await.current += 1; // No more batches after that @@ -259,15 +237,8 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { io_mock.write().await.max += 1; // Load batch and mark it as processed - - assert_eq!( - storage - .load_batch_eventually(L1BatchNumber(2)) - .await? 
- .l1_batch_env - .number, - L1BatchNumber(2) - ); + let (batch_data, _) = storage.load_batch_eventually(L1BatchNumber(2)).await?; + assert_eq!(batch_data.l1_batch_env.number, L1BatchNumber(2)); io_mock.write().await.current += 1; // No more batches after that @@ -284,11 +255,11 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> { insert_genesis_batch(&mut conn, &genesis_params) .await .unwrap(); - drop(conn); let alice = Account::random(); let bob = Account::random(); let mut accounts = vec![alice, bob]; - fund(&connection_pool, &accounts).await; + fund(&mut conn, &accounts).await; + drop(conn); // Generate 10 batches worth of data and persist it in Postgres let batch_range = 1..=10; @@ -311,7 +282,6 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> { .await; drop(conn); - let (_sender, receiver) = watch::channel(false); let mut tester = StorageTester::new(connection_pool.clone()); let io_mock = Arc::new(RwLock::new(IoMock { current: 0.into(), @@ -321,7 +291,7 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> { let handle = tokio::task::spawn_blocking(move || { let vm_runner_storage = rt_handle.block_on(async { tester.create_storage(io_mock.clone()).await.unwrap() }); - for i in 1..=10 { + for i in 1..=9 { let mut conn = rt_handle.block_on(connection_pool.connection()).unwrap(); let (_, last_l2_block_number) = rt_handle .block_on( @@ -331,11 +301,13 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> { .unwrap(); let mut pg_storage = PostgresStorage::new(rt_handle.clone(), conn, last_l2_block_number, true); - let mut vm_storage = rt_handle.block_on(async { - vm_runner_storage - .access_storage_eventually(&receiver, L1BatchNumber(i)) - .await - })?; + let (_, vm_storage) = rt_handle + .block_on(vm_runner_storage.load_batch_eventually(L1BatchNumber(i + 1)))?; + let mut vm_storage = match vm_storage { + OwnedStorage::Lending(ref storage) => rt_handle.block_on(storage.borrow()).unwrap(), + OwnedStorage::Static(storage) => storage, + }; + // Check that both storages have identical key-value pairs written in them for storage_log in &storage_logs { let storage_key = StorageKey::new( diff --git a/core/node/vm_runner/src/tests/storage_writer.rs b/core/node/vm_runner/src/tests/storage_writer.rs new file mode 100644 index 000000000000..4c7a6e0d6612 --- /dev/null +++ b/core/node/vm_runner/src/tests/storage_writer.rs @@ -0,0 +1,215 @@ +use tokio::sync::watch; +use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; +use zksync_state_keeper::MainBatchExecutor; + +use super::*; +use crate::{ConcurrentOutputHandlerFactory, VmRunner}; + +#[derive(Debug, Clone)] +struct StorageWriterIo { + last_processed_batch: Arc>, + pool: ConnectionPool, +} + +impl StorageWriterIo { + fn batch(&self) -> L1BatchNumber { + *self.last_processed_batch.borrow() + } +} + +#[async_trait] +impl VmRunnerIo for StorageWriterIo { + fn name(&self) -> &'static str { + "storage_writer" + } + + async fn latest_processed_batch( + &self, + _conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + Ok(self.batch()) + } + + async fn last_ready_to_be_loaded_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + let sealed_batch = conn + .blocks_dal() + .get_sealed_l1_batch_number() + .await? 
+ .expect("No L1 batches in storage"); + Ok(sealed_batch.min(self.batch() + 1)) + } + + async fn mark_l1_batch_as_processing( + &self, + _conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + assert_eq!(l1_batch_number, self.batch() + 1); + Ok(()) + } + + async fn mark_l1_batch_as_completed( + &self, + _conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + assert_eq!(l1_batch_number, self.batch()); + Ok(()) + } +} + +impl StorageWriterIo { + async fn write_storage_logs( + conn: &mut Connection<'_, Core>, + updates_manager: &UpdatesManager, + ) -> anyhow::Result<()> { + let storage_logs = updates_manager + .l2_block + .storage_logs + .iter() + .filter_map(|log| log.log.is_write().then_some(log.log)); + let storage_logs: Vec<_> = storage_logs.collect(); + conn.storage_logs_dal() + .append_storage_logs(updates_manager.l2_block.number, &storage_logs) + .await?; + Ok(()) + } +} + +#[async_trait] +impl StateKeeperOutputHandler for StorageWriterIo { + async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { + let mut conn = self.pool.connection().await?; + Self::write_storage_logs(&mut conn, updates_manager).await?; + Ok(()) + } + + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { + let mut conn = self.pool.connection().await?; + // Storage logs are added to the fictive block *after* `handle_l2_block()` is called for it, so we need to call it again here. + Self::write_storage_logs(&mut conn, &updates_manager).await?; + + let finished_batch = updates_manager + .l1_batch + .finished + .as_ref() + .expect("L1 batch is not finished"); + let state_diffs = finished_batch.state_diffs.as_ref().expect("no state diffs"); + let initial_writes: Vec<_> = state_diffs + .iter() + .filter(|diff| diff.is_write_initial()) + .map(|diff| { + H256(StorageKey::raw_hashed_key( + &diff.address, + &u256_to_h256(diff.key), + )) + }) + .collect(); + conn.storage_logs_dedup_dal() + .insert_initial_writes(updates_manager.l1_batch.number, &initial_writes) + .await?; + + self.last_processed_batch + .send_replace(updates_manager.l1_batch.number); + Ok(()) + } +} + +#[async_trait] +impl OutputHandlerFactory for StorageWriterIo { + async fn create_handler( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + assert_eq!(l1_batch_number, self.batch() + 1); + Ok(Box::new(self.clone())) + } +} + +/// Writes missing storage logs into Postgres by executing all transactions from it. Useful both for testing `VmRunner`, +/// and to fill the storage for multi-batch tests for other components. 
+pub(super) async fn write_storage_logs(pool: ConnectionPool) { + let mut conn = pool.connection().await.unwrap(); + let sealed_batch = conn + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .unwrap() + .expect("No L1 batches in storage"); + drop(conn); + let io = Box::new(StorageWriterIo { + last_processed_batch: Arc::new(watch::channel(L1BatchNumber(0)).0), + pool: pool.clone(), + }); + let mut processed_batch = io.last_processed_batch.subscribe(); + + let loader = Arc::new(PostgresLoader(pool.clone())); + let batch_executor = Box::new(MainBatchExecutor::new(false, false)); + let vm_runner = VmRunner::new(pool, io.clone(), loader, io, batch_executor); + let (stop_sender, stop_receiver) = watch::channel(false); + let vm_runner_handle = tokio::spawn(async move { vm_runner.run(&stop_receiver).await }); + + processed_batch + .wait_for(|&number| number >= sealed_batch) + .await + .unwrap(); + stop_sender.send_replace(true); + vm_runner_handle.await.unwrap().unwrap(); +} + +#[tokio::test] +async fn storage_writer_works() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + let genesis_params = GenesisParams::mock(); + insert_genesis_batch(&mut conn, &genesis_params) + .await + .unwrap(); + + let mut accounts = [Account::random()]; + fund(&mut conn, &accounts).await; + store_l1_batches( + &mut conn, + 1..=5, + genesis_params.base_system_contracts().hashes(), + &mut accounts, + ) + .await + .unwrap(); + drop(conn); + + write_storage_logs(pool.clone()).await; + + // Re-run the VM on all batches to check that storage logs are persisted correctly + let (stop_sender, stop_receiver) = watch::channel(false); + let io = Arc::new(RwLock::new(IoMock { + current: L1BatchNumber(0), + max: 5, + })); + let loader = Arc::new(PostgresLoader(pool.clone())); + let (output_factory, output_factory_task) = + ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), TestOutputFactory::default()); + let output_factory_handle = tokio::spawn(output_factory_task.run(stop_receiver.clone())); + let batch_executor = Box::new(MainBatchExecutor::new(false, false)); + let vm_runner = VmRunner::new( + pool, + Box::new(io.clone()), + loader, + Box::new(output_factory), + batch_executor, + ); + + let vm_runner_handle = tokio::spawn(async move { vm_runner.run(&stop_receiver).await }); + wait::for_batch_progressively(io, L1BatchNumber(5), TEST_TIMEOUT) + .await + .unwrap(); + stop_sender.send_replace(true); + output_factory_handle.await.unwrap().unwrap(); + vm_runner_handle.await.unwrap().unwrap(); +}
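The storage side of the refactoring hinges on `OwnedStorage::Lending`: a Postgres-backed storage is *sent* to the blocking task as an owned value and only turned into a usable, lifetime-bound view once the VM thread runs, by driving the async `borrow()` through the runtime handle. Below is a minimal sketch of that pattern with hypothetical stand-in types (assuming `tokio` and `anyhow`; the real `borrow()` acquires a Postgres connection and resolves the L2 block to read from):

```rust
use tokio::runtime::Handle;

/// Stand-in for `OwnedPostgresStorage`; the real type holds a
/// `ConnectionPool<Core>` plus an `L1BatchNumber`.
#[derive(Debug)]
struct OwnedPostgresStorage {
    l1_batch_number: u32,
}

impl OwnedPostgresStorage {
    /// Lends out a short-lived storage view tied to `self`.
    async fn borrow(&self) -> anyhow::Result<StorageView<'_>> {
        Ok(StorageView(self))
    }
}

/// Stand-in for `PgOrRocksdbStorage<'a>`.
#[derive(Debug)]
struct StorageView<'a>(&'a OwnedPostgresStorage);

/// Stand-in for `zksync_state::OwnedStorage`.
#[derive(Debug)]
enum OwnedStorage {
    /// Readily initialized storage (e.g., caught-up RocksDB).
    Static(StorageView<'static>),
    /// Storage that must be `borrow()`ed from inside the VM task.
    Lending(OwnedPostgresStorage),
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let storage = OwnedStorage::Lending(OwnedPostgresStorage { l1_batch_number: 1 });
    tokio::task::spawn_blocking(move || {
        // Mirrors `MainBatchExecutor`: the async `borrow()` is driven from
        // the blocking thread through the runtime handle.
        match storage {
            OwnedStorage::Static(view) => println!("using ready-made storage: {view:?}"),
            OwnedStorage::Lending(ref storage) => {
                let view = Handle::current()
                    .block_on(storage.borrow())
                    .expect("failed accessing storage");
                println!("borrowed per-batch view: {view:?}");
            }
        }
    })
    .await?;
    Ok(())
}
```

Deferring `borrow()` to the blocking thread is what makes the pieces composable: the factory no longer has to produce a `'static` storage up front, and the VM runner's loader can return batch data and storage from a single `load_batch()` call (see `storage.rs` above) instead of two separate lock acquisitions — the data race fix mentioned in the PR description.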