refactor(state-keeper): Use owned VM storage for batch executor (#2559)
## What ❔

- Refactors `ReadStorageFactory` and `BatchExecutor`.
- Tests the VM runner on multiple batch workloads and fixes bugs
discovered by the tests (e.g., a data race in the storage loader).

## Why ❔

- `ReadStorageFactory` and `BatchExecutor` become more composable and integrate with the VM runner with fewer workarounds (see the sketch below).
- Broader test coverage surfaces VM runner bugs earlier.
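
At a glance, the refactored interfaces look roughly like this (a condensed sketch abbreviated from the diff below; imports, doc comments, and the `ConnectionPool<Core>` blanket impl are omitted):

```rust
/// Factory that produces `OwnedStorage` instances on demand.
#[async_trait]
pub trait ReadStorageFactory: Debug + Send + Sync + 'static {
    /// Returns `Ok(None)` only if a stop signal is received via `stop_receiver`.
    async fn access_storage(
        &self,
        stop_receiver: &watch::Receiver<bool>,
        l1_batch_number: L1BatchNumber,
    ) -> anyhow::Result<Option<OwnedStorage>>;
}

/// Owned storage that can be sent to a blocking task for VM execution.
#[derive(Debug)]
pub enum OwnedStorage {
    /// Readily initialized storage with a static lifetime (e.g., caught-up RocksDB).
    Static(PgOrRocksdbStorage<'static>),
    /// Postgres-backed storage that a `PgOrRocksdbStorage<'_>` is `borrow()`ed from.
    Lending(OwnedPostgresStorage),
}

/// Batch executors now receive an already-resolved `OwnedStorage`,
/// so `init_batch` is synchronous and infallible.
pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
    fn init_batch(
        &mut self,
        storage: OwnedStorage,
        l1_batch_params: L1BatchEnv,
        system_env: SystemEnv,
    ) -> BatchExecutorHandle;
}
```

Since `ConnectionPool<Core>` itself now implements `ReadStorageFactory` (returning a `Lending` Postgres-backed storage), tests can pass `Arc::new(pool)` where a dedicated `MockReadStorageFactory` was needed before.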

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.

fix(vm-runner): Fix data race in storage loader
test(vm-runner): Test VM runner with multiple L1 batches
slowli authored Aug 6, 2024
1 parent b45aa91 commit 1810b78
Showing 23 changed files with 565 additions and 427 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


5 changes: 4 additions & 1 deletion core/lib/state/src/lib.rs
@@ -28,7 +28,10 @@ pub use self::{
RocksdbStorage, RocksdbStorageBuilder, RocksdbStorageOptions, StateKeeperColumnFamily,
},
shadow_storage::ShadowStorage,
storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory},
storage_factory::{
BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory,
RocksdbWithMemory,
},
storage_view::{StorageView, StorageViewCache, StorageViewMetrics},
witness::WitnessStorage,
};
6 changes: 6 additions & 0 deletions core/lib/state/src/rocksdb/mod.rs
@@ -257,6 +257,12 @@ impl RocksdbStorageBuilder {
) -> anyhow::Result<()> {
self.0.revert(storage, last_l1_batch_to_keep).await
}

/// Returns the underlying storage without any checks. Should only be used in test code.
#[doc(hidden)]
pub fn build_unchecked(self) -> RocksdbStorage {
self.0
}
}

impl RocksdbStorage {
100 changes: 75 additions & 25 deletions core/lib/state/src/storage_factory.rs
@@ -11,19 +11,33 @@ use crate::{
PostgresStorage, ReadStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily,
};

/// Factory that can produce a [`ReadStorage`] implementation on demand.
/// Factory that can produce [`OwnedStorage`] instances on demand.
#[async_trait]
pub trait ReadStorageFactory: Debug + Send + Sync + 'static {
/// Creates a [`PgOrRocksdbStorage`] entity over either a Postgres connection or RocksDB
/// Creates an [`OwnedStorage`] entity over either a Postgres connection or RocksDB
/// instance. The specific criteria on which one are left up to the implementation.
///
/// The idea is that in either case this provides a valid [`ReadStorage`] implementation
/// that can be used by the caller.
/// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
/// a stop signal; this is the only case in which `Ok(None)` should be returned.
async fn access_storage(
&self,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<PgOrRocksdbStorage<'_>>>;
) -> anyhow::Result<Option<OwnedStorage>>;
}

/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
#[async_trait]
impl ReadStorageFactory for ConnectionPool<Core> {
async fn access_storage(
&self,
_stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>> {
let storage = OwnedPostgresStorage::new(self.clone(), l1_batch_number);
Ok(Some(storage.into()))
}
}

/// DB difference introduced by one batch.
@@ -47,29 +61,31 @@ pub struct RocksdbWithMemory {
pub batch_diffs: Vec<BatchDiff>,
}

/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
/// underneath.
/// Owned Postgres-backed VM storage for a certain L1 batch.
#[derive(Debug)]
pub enum PgOrRocksdbStorage<'a> {
/// Implementation over a Postgres connection.
Postgres(PostgresStorage<'a>),
/// Implementation over a RocksDB cache instance.
Rocksdb(RocksdbStorage),
/// Implementation over a RocksDB cache instance with in-memory DB diffs.
RocksdbWithMemory(RocksdbWithMemory),
pub struct OwnedPostgresStorage {
connection_pool: ConnectionPool<Core>,
l1_batch_number: L1BatchNumber,
}

impl<'a> PgOrRocksdbStorage<'a> {
impl OwnedPostgresStorage {
/// Creates a VM storage for the specified batch number.
pub fn new(connection_pool: ConnectionPool<Core>, l1_batch_number: L1BatchNumber) -> Self {
Self {
connection_pool,
l1_batch_number,
}
}

/// Returns a [`ReadStorage`] implementation backed by Postgres
///
/// # Errors
///
/// Propagates Postgres errors.
pub async fn access_storage_pg(
pool: &'a ConnectionPool<Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<PgOrRocksdbStorage<'a>> {
let mut connection = pool.connection().await?;
pub async fn borrow(&self) -> anyhow::Result<PgOrRocksdbStorage<'_>> {
let l1_batch_number = self.l1_batch_number;
let mut connection = self.connection_pool.connection().await?;

let l2_block_number = if let Some((_, l2_block_number)) = connection
.blocks_dal()
.get_l2_block_range_of_l1_batch(l1_batch_number)
@@ -85,9 +101,8 @@ impl<'a> PgOrRocksdbStorage<'a> {
.context("Could not find snapshot, no state available")?;
if snapshot_recovery.l1_batch_number != l1_batch_number {
anyhow::bail!(
"Snapshot contains L1 batch #{} while #{} was expected",
snapshot_recovery.l1_batch_number,
l1_batch_number
"Snapshot contains L1 batch #{} while #{l1_batch_number} was expected",
snapshot_recovery.l1_batch_number
);
}
snapshot_recovery.l2_block_number
@@ -99,19 +114,54 @@ impl<'a> PgOrRocksdbStorage<'a> {
.into(),
)
}
}

/// Owned version of [`PgOrRocksdbStorage`]. It is thus possible to send to blocking tasks for VM execution.
#[derive(Debug)]
pub enum OwnedStorage {
/// Readily initialized storage with a static lifetime.
Static(PgOrRocksdbStorage<'static>),
/// Storage that must be `borrow()`ed from.
Lending(OwnedPostgresStorage),
}

impl From<OwnedPostgresStorage> for OwnedStorage {
fn from(storage: OwnedPostgresStorage) -> Self {
Self::Lending(storage)
}
}

impl From<PgOrRocksdbStorage<'static>> for OwnedStorage {
fn from(storage: PgOrRocksdbStorage<'static>) -> Self {
Self::Static(storage)
}
}

/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
/// underneath.
#[derive(Debug)]
pub enum PgOrRocksdbStorage<'a> {
/// Implementation over a Postgres connection.
Postgres(PostgresStorage<'a>),
/// Implementation over a RocksDB cache instance.
Rocksdb(RocksdbStorage),
/// Implementation over a RocksDB cache instance with in-memory DB diffs.
RocksdbWithMemory(RocksdbWithMemory),
}

impl PgOrRocksdbStorage<'static> {
/// Catches up RocksDB synchronously (i.e. assumes the gap is small) and
/// returns a [`ReadStorage`] implementation backed by caught-up RocksDB.
///
/// # Errors
///
/// Propagates RocksDB and Postgres errors.
pub async fn access_storage_rocksdb(
pub async fn rocksdb(
connection: &mut Connection<'_, Core>,
rocksdb: RocksDB<StateKeeperColumnFamily>,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<PgOrRocksdbStorage<'a>>> {
) -> anyhow::Result<Option<Self>> {
tracing::debug!("Catching up RocksDB synchronously");
let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb);
let rocksdb = rocksdb_builder
7 changes: 2 additions & 5 deletions core/node/consensus/src/testonly.rs
@@ -33,10 +33,7 @@ use zksync_state::RocksdbStorageOptions;
use zksync_state_keeper::{
io::{IoCursor, L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::{
fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory,
MockBatchExecutor,
},
testonly::{fund, l1_transaction, l2_transaction, MockBatchExecutor},
AsyncRocksdbCache, MainBatchExecutor, OutputHandler, StateKeeperPersistence,
TreeWritesPersistence, ZkSyncStateKeeper,
};
@@ -638,7 +635,7 @@ impl StateKeeperRunner {
.with_handler(Box::new(tree_writes_persistence))
.with_handler(Box::new(self.sync_state.clone())),
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
Arc::new(self.pool.0.clone()),
)
.run()
.await
6 changes: 3 additions & 3 deletions core/node/node_sync/src/tests.rs
@@ -13,7 +13,7 @@ use zksync_node_test_utils::{
use zksync_state_keeper::{
io::{L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder},
testonly::test_batch_executor::TestBatchExecutorBuilder,
OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper,
};
use zksync_types::{
@@ -113,7 +113,7 @@ impl StateKeeperHandles {

tokio::spawn(l2_block_sealer.run());
let io = ExternalIO::new(
pool,
pool.clone(),
actions,
Box::new(main_node_client),
L2ChainId::default(),
@@ -132,7 +132,7 @@ impl StateKeeperHandles {
Box::new(batch_executor_base),
output_handler,
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
Arc::new(pool),
);

Self {
2 changes: 1 addition & 1 deletion core/node/state_keeper/Cargo.toml
@@ -33,6 +33,7 @@ zksync_base_token_adjuster.workspace = true

anyhow.workspace = true
async-trait.workspace = true
tempfile.workspace = true # used in `testonly` module
tokio = { workspace = true, features = ["time"] }
thiserror.workspace = true
tracing.workspace = true
@@ -45,7 +46,6 @@ hex.workspace = true
assert_matches.workspace = true
test-casing.workspace = true
futures.workspace = true
tempfile.workspace = true

zksync_eth_client.workspace = true
zksync_system_constants.workspace = true
36 changes: 13 additions & 23 deletions core/node/state_keeper/src/batch_executor/main_executor.rs
@@ -1,12 +1,8 @@
use std::sync::Arc;

use anyhow::Context as _;
use async_trait::async_trait;
use once_cell::sync::OnceCell;
use tokio::{
runtime::Handle,
sync::{mpsc, watch},
};
use tokio::{runtime::Handle, sync::mpsc};
use zksync_multivm::{
interface::{
ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv,
@@ -17,7 +13,7 @@ use zksync_multivm::{
MultiVMTracer, VmInstance,
};
use zksync_shared_metrics::{InteractionType, TxStage, APP_METRICS};
use zksync_state::{ReadStorage, ReadStorageFactory, StorageView, WriteStorage};
use zksync_state::{OwnedStorage, ReadStorage, StorageView, WriteStorage};
use zksync_types::{vm_trace::Call, Transaction};
use zksync_utils::bytecode::CompressedBytecodeInfo;

@@ -50,15 +46,13 @@ impl MainBatchExecutor {
}
}

#[async_trait]
impl BatchExecutor for MainBatchExecutor {
async fn init_batch(
fn init_batch(
&mut self,
storage_factory: Arc<dyn ReadStorageFactory>,
storage: OwnedStorage,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
) -> Option<BatchExecutorHandle> {
) -> BatchExecutorHandle {
// Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued
// until a previous command is processed), capacity 1 is enough for the commands channel.
let (commands_sender, commands_receiver) = mpsc::channel(1);
@@ -68,21 +62,17 @@ impl BatchExecutor for MainBatchExecutor {
commands: commands_receiver,
};

let stop_receiver = stop_receiver.clone();
let handle = tokio::task::spawn_blocking(move || {
if let Some(storage) = Handle::current()
.block_on(
storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1),
)
.context("failed accessing state keeper storage")?
{
executor.run(storage, l1_batch_params, system_env);
} else {
tracing::info!("Interrupted while trying to access state keeper storage");
}
let storage = match storage {
OwnedStorage::Static(storage) => storage,
OwnedStorage::Lending(ref storage) => Handle::current()
.block_on(storage.borrow())
.context("failed accessing state keeper storage")?,
};
executor.run(storage, l1_batch_params, system_env);
anyhow::Ok(())
});
Some(BatchExecutorHandle::from_raw(handle, commands_sender))
BatchExecutorHandle::from_raw(handle, commands_sender)
}
}

13 changes: 5 additions & 8 deletions core/node/state_keeper/src/batch_executor/mod.rs
@@ -1,15 +1,14 @@
use std::{error::Error as StdError, fmt, sync::Arc};

use anyhow::Context as _;
use async_trait::async_trait;
use tokio::{
sync::{mpsc, oneshot, watch},
sync::{mpsc, oneshot},
task::JoinHandle,
};
use zksync_multivm::interface::{
FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionResultAndLogs,
};
use zksync_state::{ReadStorageFactory, StorageViewCache};
use zksync_state::{OwnedStorage, StorageViewCache};
use zksync_types::{vm_trace::Call, Transaction};
use zksync_utils::bytecode::CompressedBytecodeInfo;

@@ -55,15 +54,13 @@ impl TxExecutionResult {
/// An abstraction that allows us to create different kinds of batch executors.
/// The only requirement is to return a [`BatchExecutorHandle`], which does its work
/// by communicating with the externally initialized thread.
#[async_trait]
pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
async fn init_batch(
fn init_batch(
&mut self,
storage_factory: Arc<dyn ReadStorageFactory>,
storage: OwnedStorage,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
) -> Option<BatchExecutorHandle>;
) -> BatchExecutorHandle;
}

#[derive(Debug)]