From b45aa9168dd66d07ca61c8bb4c01f73dda822040 Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 6 Aug 2024 09:37:43 +0100
Subject: [PATCH 1/2] feat: Poll the main node API for attestation status - relaxed (BFT-496) (#2583)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Reverts https://github.com/matter-labs/zksync-era/pull/2574 to re-establish the attestation status API integration, but also bumps the era-consensus version to [0.1.0-rc.6](https://github.com/matter-labs/era-consensus/pull/174), which includes changes to make it backwards compatible:

* the `Executor` is started without waiting for the initial attestation batch number to become available, so that a missing main node API does not stop the node from participating in gossip altogether (this behaviour is internal to the `AttestationStatusRunner`; see the sketch at the end of this description)
* invalid batch vote signatures are not rejected, just ignored, so the node doesn't break the connection with a peer that signed a different payload while the feature is in flux

TODO:
- [x] Update once `0.1.0-rc.6` is published to crates.io

## Why ❔

The first reason is to avoid eagerly dropping a gossip peer over incompatibilities between newer and existing versions of the software while we're actively working on the features involved.

The second is that the rollout strategy to mainnet appears to be to roll out external nodes first and main nodes second. The external node is expected to work with an older version of the main node, or at least not fail to start; it is okay if a new feature doesn't work.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
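For illustration, here is a minimal, self-contained sketch of the polling behaviour described above. It is not the era-consensus API: `StatusClient`, `AttestationStatus`, and `run_status_poller` are hypothetical stand-ins for the `AttestationStatusClient` / `AttestationStatusRunner` pair used in the diff below, but the error handling mirrors the relaxed semantics — a missing status or a failed RPC call is simply retried on the next poll instead of stopping the node.

```rust
use std::time::Duration;

use tokio::{sync::watch, time};

/// Hypothetical stand-in for the attestation status returned by the main node.
#[derive(Clone, Copy, Debug)]
struct AttestationStatus {
    next_batch_to_attest: u64,
}

/// Hypothetical stand-in for the main node RPC client.
#[async_trait::async_trait]
trait StatusClient: Send + Sync {
    async fn fetch_attestation_status(&self) -> anyhow::Result<Option<AttestationStatus>>;
}

/// Polls the status in the background; the executor starts without waiting for it.
async fn run_status_poller(
    client: &dyn StatusClient,
    poll_interval: Duration,
    status_tx: watch::Sender<Option<AttestationStatus>>,
) {
    loop {
        match client.fetch_attestation_status().await {
            // Status available: publish it for the attester component to pick up.
            Ok(Some(status)) => {
                status_tx.send_replace(Some(status));
            }
            // Not available yet (e.g. an older main node): keep gossiping, retry later.
            Ok(None) => {}
            // An RPC failure is logged and retried; it never tears the node down.
            Err(err) => tracing::warn!("attestation status call failed: {err}"),
        }
        time::sleep(poll_interval).await;
    }
}
```

In the actual diff, `AttestationStatusRunner::init` plays this role: it returns a status handle plus a background runner that is spawned via `s.spawn_bg(...)`, and the `Executor` receives the handle without waiting for the first status to arrive.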
--------- Co-authored-by: Bruno França --- Cargo.lock | 41 ++++++------- Cargo.toml | 20 +++---- core/lib/dal/src/consensus_dal.rs | 1 + core/node/consensus/src/config.rs | 1 + core/node/consensus/src/en.rs | 52 +++++++++++++++- core/node/consensus/src/mn.rs | 19 +++++- core/node/consensus/src/storage/connection.rs | 11 ++++ core/node/consensus/src/storage/store.rs | 59 ++++++------------- core/node/consensus/src/tests.rs | 17 +++++- prover/Cargo.lock | 28 ++++----- zk_toolbox/Cargo.lock | 16 ++--- zk_toolbox/Cargo.toml | 8 ++- 12 files changed, 173 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4dd4713dd303..1852386b18c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8142,9 +8142,9 @@ dependencies = [ [[package]] name = "zksync_concurrency" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50302b77192891256d180ff2551dc0c3bc4144958b49e9a16c50a0dc218958ba" +checksum = "209b83578357184ab72af4d4cb2eca76f85e5f2f35d739a47e3fd5931eb9252d" dependencies = [ "anyhow", "once_cell", @@ -8177,9 +8177,9 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2325c7486a8280db1c26c10020350bead6eecb3de03f8bbfd878060f000cdce7" +checksum = "a7c7f1613bdb9d02b21d66ab60bdf6523456dcc5006290cd67702d3f729f549e" dependencies = [ "anyhow", "async-trait", @@ -8199,9 +8199,9 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5cb8ed0d59593f6147085b77142628e459ba673aa4d48fce064d5b96e31eb36" +checksum = "9e1abf1f3d9c8109da32a6d5e61a2a64a61b0bff90fdd355992522a4e8a57e69" dependencies = [ "anyhow", "blst", @@ -8223,11 +8223,12 @@ dependencies = [ [[package]] name = "zksync_consensus_executor" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "247b70ec255781b3b740acb744236e771a192922ffbaa52c462b84c4ea67609f" +checksum = "8495b9056a895ee4e720b803d3e26ffad18776ae374805bab34a5ff5b648be6e" dependencies = [ "anyhow", + "async-trait", "rand 0.8.5", "tracing", "vise", @@ -8243,9 +8244,9 @@ dependencies = [ [[package]] name = "zksync_consensus_network" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f10626b79885a9b096cd19ee83d85ef9b0554f061a9db6946f2b7c9d1b2f49ea" +checksum = "42ec4a076c63c76599711a7dc28cdf3a7923b6bc7720bc572ea11e92fb2b526f" dependencies = [ "anyhow", "async-trait", @@ -8278,9 +8279,9 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ffe3e47d99eb943eb94f2f5c9d929b1192bf3e8d1434de0fa6f0090f9c1197e" +checksum = "32e0b04d43a542a3bb1af0ac4c0a17acf6b743607c3cb9028192df0c7d2f5b24" dependencies = [ "anyhow", "bit-vec", @@ -8300,9 +8301,9 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ae9a0ec64ce9c0af346e50cc87dc257c30259101ce9675b408cb883e096087" +checksum = "0c39f79628bd6685f7ec5561874c007f5d26693d6ba7e5595dfa260981e8f006" dependencies = [ "anyhow", "async-trait", @@ -8320,9 +8321,9 @@ dependencies = [ [[package]] name = 
"zksync_consensus_utils" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24dc6135abeefa80f617eb2903fe43d137d362bf673f0651b4894b17069d1fb1" +checksum = "c4514629a34abdf943ef911c16228dfec656edb02d8412db4febd4df5ccf3f91" dependencies = [ "anyhow", "rand 0.8.5", @@ -9260,9 +9261,9 @@ dependencies = [ [[package]] name = "zksync_protobuf" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e7c7820f290db565a1b4ff73aa1175cd7d31498fca8d859eb5aceebd33468c" +checksum = "53128384270314bfbd4e044c15138af63cb3a505ab95bb3339f3b866ccbe211c" dependencies = [ "anyhow", "bit-vec", @@ -9281,9 +9282,9 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6cafeec1150ae91f1a37c8f0dce6b71b92b93e0c4153d32b4c37e2fd71bce2f" +checksum = "0d7dfb4dcdd48ab5fa1ccff25f585d73b58cf95e0fb74e96618dd666f198a005" dependencies = [ "anyhow", "heck 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index 253ffea824b9..4210911a2596 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -216,16 +216,16 @@ zk_evm_1_4_1 = { package = "zk_evm", version = "0.141.0" } zk_evm_1_5_0 = { package = "zk_evm", version = "=0.150.0" } # Consensus dependencies. -zksync_concurrency = "=0.1.0-rc.4" -zksync_consensus_bft = "=0.1.0-rc.4" -zksync_consensus_crypto = "=0.1.0-rc.4" -zksync_consensus_executor = "=0.1.0-rc.4" -zksync_consensus_network = "=0.1.0-rc.4" -zksync_consensus_roles = "=0.1.0-rc.4" -zksync_consensus_storage = "=0.1.0-rc.4" -zksync_consensus_utils = "=0.1.0-rc.4" -zksync_protobuf = "=0.1.0-rc.4" -zksync_protobuf_build = "=0.1.0-rc.4" +zksync_concurrency = "=0.1.0-rc.8" +zksync_consensus_bft = "=0.1.0-rc.8" +zksync_consensus_crypto = "=0.1.0-rc.8" +zksync_consensus_executor = "=0.1.0-rc.8" +zksync_consensus_network = "=0.1.0-rc.8" +zksync_consensus_roles = "=0.1.0-rc.8" +zksync_consensus_storage = "=0.1.0-rc.8" +zksync_consensus_utils = "=0.1.0-rc.8" +zksync_protobuf = "=0.1.0-rc.8" +zksync_protobuf_build = "=0.1.0-rc.8" # "Local" dependencies zksync_multivm = { version = "0.1.0", path = "core/lib/multivm" } diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 28559e8a62d2..15c4c18b5d88 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -536,6 +536,7 @@ impl ConsensusDal<'_, '_> { } .await? 
else { + tracing::info!(%genesis.first_block, "genesis block not found"); return Ok(None); }; Ok(Some(AttestationStatus { diff --git a/core/node/consensus/src/config.rs b/core/node/consensus/src/config.rs index a46b1ab5afa7..c2fa13472066 100644 --- a/core/node/consensus/src/config.rs +++ b/core/node/consensus/src/config.rs @@ -147,5 +147,6 @@ pub(super) fn executor( rpc, // TODO: Add to configuration debug_page: None, + batch_poll_interval: time::Duration::seconds(1), }) } diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index d14893042f5b..81402cd1d47a 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -1,8 +1,13 @@ use anyhow::Context as _; +use async_trait::async_trait; use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; -use zksync_consensus_executor as executor; -use zksync_consensus_roles::validator; +use zksync_consensus_executor::{ + self as executor, + attestation::{AttestationStatusClient, AttestationStatusRunner}, +}; +use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; +use zksync_dal::consensus_dal; use zksync_node_sync::{ fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState, }; @@ -47,6 +52,7 @@ impl EN { // Initialize genesis. let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?; + let genesis_hash = genesis.hash(); let mut conn = self.pool.connection(ctx).await.wrap("connection()")?; conn.try_update_genesis(ctx, &genesis) @@ -99,6 +105,18 @@ impl EN { .wrap("BatchStore::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let (attestation_status, runner) = { + AttestationStatusRunner::init( + ctx, + Box::new(MainNodeAttestationStatus(self.client.clone())), + time::Duration::seconds(5), + genesis_hash, + ) + .await + .wrap("AttestationStatusRunner::init()")? + }; + s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, @@ -111,7 +129,9 @@ impl EN { payload_manager: Box::new(store.clone()), }), attester, + attestation_status, }; + tracing::info!("running the external node executor"); executor.run(ctx).await?; Ok(()) @@ -239,3 +259,31 @@ impl EN { Ok(()) } } + +/// Wrapper to call [MainNodeClient::fetch_attestation_status] and adapt the return value to [AttestationStatusClient]. +struct MainNodeAttestationStatus(Box>); + +#[async_trait] +impl AttestationStatusClient for MainNodeAttestationStatus { + async fn attestation_status( + &self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + match ctx.wait(self.0.fetch_attestation_status()).await? { + Ok(Some(status)) => { + // If this fails the AttestationStatusRunner will log it an retry it later, + // but it won't stop the whole node. 
+ let status: consensus_dal::AttestationStatus = + zksync_protobuf::serde::deserialize(&status.0) + .context("deserialize(AttestationStatus")?; + + Ok(Some((status.genesis, status.next_batch_to_attest))) + } + Ok(None) => Ok(None), + Err(err) => { + tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}"); + Ok(None) + } + } + } +} diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs index 29cacf7a548f..b5e76afd63e1 100644 --- a/core/node/consensus/src/mn.rs +++ b/core/node/consensus/src/mn.rs @@ -1,7 +1,7 @@ use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, scope}; +use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; -use zksync_consensus_executor::{self as executor, Attester}; +use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester}; use zksync_consensus_roles::validator; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -61,6 +61,18 @@ pub async fn run_main_node( .wrap("BatchStore::new()")?; s.spawn_bg(runner.run(ctx)); + let (attestation_status, runner) = { + AttestationStatusRunner::init_from_store( + ctx, + batch_store.clone(), + time::Duration::seconds(1), + block_store.genesis().hash(), + ) + .await + .wrap("AttestationStatusRunner::init_from_store()")? + }; + s.spawn_bg(runner.run(ctx)); + let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, @@ -71,7 +83,10 @@ pub async fn run_main_node( payload_manager: Box::new(store.clone()), }), attester, + attestation_status, }; + + tracing::info!("running the main node executor"); executor.run(ctx).await }) .await diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 8c8992b4d01d..0e2039ae6bc0 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -435,4 +435,15 @@ impl<'a> Connection<'a> { last, }) } + + /// Wrapper for `consensus_dal().attestation_status()`. + pub async fn attestation_status( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().attestation_status()) + .await? + .context("attestation_status()")?) + } } diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index 70744988390d..0e08811c237f 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -523,44 +523,18 @@ impl storage::PersistentBatchStore for Store { self.batches_persisted.clone() } - /// Get the earliest L1 batch number which has to be signed by attesters. - async fn earliest_batch_number_to_sign( + /// Get the next L1 batch number which has to be signed by attesters. + async fn next_batch_to_attest( &self, ctx: &ctx::Ctx, ) -> ctx::Result> { - // This is the rough roadmap of how this logic will evolve: - // 1. Make best effort at gossiping and collecting votes; the `BatchVotes` in consensus only considers the last vote per attesters. - // Still, we can re-sign more than the last batch, anticipating step 2. - // 2. Ask the Main Node what is the earliest batch number that it still expects votes for (ie. what is the last submission + 1). - // 3. Change `BatchVotes` to handle multiple pending batch numbers, anticipating that batch intervals might decrease dramatically. - // 4. Once QC is required to submit to L1, Look at L1 to figure out what is the last submission, and sign after that. 
- - // Originally this method returned all unsigned batch numbers by doing a DAL query, but we decided it should be okay and cheap - // to resend signatures for already signed batches, and we don't have to worry about skipping them. Because of that, we also - // didn't think it makes sense to query the database for the earliest unsigned batch *after* the submission, because we might - // as well just re-sign everything. Until we have a way to argue about the "last submission" we just re-sign the last 10 to - // try to produce as many QCs as the voting register allows, within reason. - - // The latest decision is not to store batches with gaps between in the database *of the main node*. - // Once we have an API to serve to external nodes the earliest number the main node wants them to sign, - // we can get rid of this method: on the main node we can sign from what `last_batch_qc` returns, and - // while external nodes we can go from whatever the API returned. - - const NUM_BATCHES_TO_SIGN: u64 = 10; - - let Some(last_batch_number) = self + Ok(self .conn(ctx) .await? - .get_last_batch_number(ctx) + .attestation_status(ctx) .await - .wrap("get_last_batch_number")? - else { - return Ok(None); - }; - - Ok(Some(attester::BatchNumber( - last_batch_number.0.saturating_sub(NUM_BATCHES_TO_SIGN), - ))) + .wrap("next_batch_to_attest")? + .map(|s| s.next_batch_to_attest)) } /// Get the L1 batch QC from storage with the highest number. @@ -603,16 +577,21 @@ impl storage::PersistentBatchStore for Store { ctx: &ctx::Ctx, number: attester::BatchNumber, ) -> ctx::Result> { - let Some(hash) = self - .conn(ctx) - .await? - .batch_hash(ctx, number) - .await - .wrap("batch_hash()")? - else { + let mut conn = self.conn(ctx).await?; + + let Some(hash) = conn.batch_hash(ctx, number).await.wrap("batch_hash()")? else { return Ok(None); }; - Ok(Some(attester::Batch { number, hash })) + + let Some(genesis) = conn.genesis(ctx).await.wrap("genesis()")? else { + return Ok(None); + }; + + Ok(Some(attester::Batch { + number, + hash, + genesis: genesis.hash(), + })) } /// Returns the QC of the batch with the given number. diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 27c3a7175c7a..8e1594393eac 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -616,8 +616,16 @@ async fn test_with_pruning(version: ProtocolVersionId) { .wait_for_batch(ctx, validator.last_sealed_batch()) .await?; + // The main node is not supposed to be pruned. In particular `ConsensusDal::attestation_status` + // does not look for where the last prune happened at, and thus if we prune the block genesis + // points at, we might never be able to start the Executor. 
+ tracing::info!("Wait until the external node has all the batches we want to prune"); + node_pool + .wait_for_batch(ctx, to_prune.next()) + .await + .context("wait_for_batch()")?; tracing::info!("Prune some blocks and sync more"); - validator_pool + node_pool .prune_batches(ctx, to_prune) .await .context("prune_batches")?; @@ -725,9 +733,14 @@ async fn test_attestation_status_api(version: ProtocolVersionId) { let mut conn = pool.connection(ctx).await?; let number = status.next_batch_to_attest; let hash = conn.batch_hash(ctx, number).await?.unwrap(); + let genesis = conn.genesis(ctx).await?.unwrap().hash(); let cert = attester::BatchQC { signatures: attester::MultiSig::default(), - message: attester::Batch { number, hash }, + message: attester::Batch { + number, + hash, + genesis, + }, }; conn.insert_batch_certificate(ctx, &cert) .await diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 54e60640d7bb..a88155be9024 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -7722,9 +7722,9 @@ dependencies = [ [[package]] name = "zksync_concurrency" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50302b77192891256d180ff2551dc0c3bc4144958b49e9a16c50a0dc218958ba" +checksum = "209b83578357184ab72af4d4cb2eca76f85e5f2f35d739a47e3fd5931eb9252d" dependencies = [ "anyhow", "once_cell", @@ -7757,9 +7757,9 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5cb8ed0d59593f6147085b77142628e459ba673aa4d48fce064d5b96e31eb36" +checksum = "9e1abf1f3d9c8109da32a6d5e61a2a64a61b0bff90fdd355992522a4e8a57e69" dependencies = [ "anyhow", "blst", @@ -7781,9 +7781,9 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ffe3e47d99eb943eb94f2f5c9d929b1192bf3e8d1434de0fa6f0090f9c1197e" +checksum = "32e0b04d43a542a3bb1af0ac4c0a17acf6b743607c3cb9028192df0c7d2f5b24" dependencies = [ "anyhow", "bit-vec", @@ -7803,9 +7803,9 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ae9a0ec64ce9c0af346e50cc87dc257c30259101ce9675b408cb883e096087" +checksum = "0c39f79628bd6685f7ec5561874c007f5d26693d6ba7e5595dfa260981e8f006" dependencies = [ "anyhow", "async-trait", @@ -7823,9 +7823,9 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24dc6135abeefa80f617eb2903fe43d137d362bf673f0651b4894b17069d1fb1" +checksum = "c4514629a34abdf943ef911c16228dfec656edb02d8412db4febd4df5ccf3f91" dependencies = [ "anyhow", "rand 0.8.5", @@ -8133,9 +8133,9 @@ dependencies = [ [[package]] name = "zksync_protobuf" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e7c7820f290db565a1b4ff73aa1175cd7d31498fca8d859eb5aceebd33468c" +checksum = "53128384270314bfbd4e044c15138af63cb3a505ab95bb3339f3b866ccbe211c" dependencies = [ "anyhow", "bit-vec", @@ -8154,9 +8154,9 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"f6cafeec1150ae91f1a37c8f0dce6b71b92b93e0c4153d32b4c37e2fd71bce2f" +checksum = "0d7dfb4dcdd48ab5fa1ccff25f585d73b58cf95e0fb74e96618dd666f198a005" dependencies = [ "anyhow", "heck 0.5.0", diff --git a/zk_toolbox/Cargo.lock b/zk_toolbox/Cargo.lock index cc2640f1f029..51d6af249f71 100644 --- a/zk_toolbox/Cargo.lock +++ b/zk_toolbox/Cargo.lock @@ -6347,9 +6347,9 @@ dependencies = [ [[package]] name = "zksync_concurrency" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50302b77192891256d180ff2551dc0c3bc4144958b49e9a16c50a0dc218958ba" +checksum = "209b83578357184ab72af4d4cb2eca76f85e5f2f35d739a47e3fd5931eb9252d" dependencies = [ "anyhow", "once_cell", @@ -6381,9 +6381,9 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24dc6135abeefa80f617eb2903fe43d137d362bf673f0651b4894b17069d1fb1" +checksum = "c4514629a34abdf943ef911c16228dfec656edb02d8412db4febd4df5ccf3f91" dependencies = [ "anyhow", "rand", @@ -6432,9 +6432,9 @@ dependencies = [ [[package]] name = "zksync_protobuf" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e7c7820f290db565a1b4ff73aa1175cd7d31498fca8d859eb5aceebd33468c" +checksum = "53128384270314bfbd4e044c15138af63cb3a505ab95bb3339f3b866ccbe211c" dependencies = [ "anyhow", "bit-vec", @@ -6453,9 +6453,9 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" -version = "0.1.0-rc.4" +version = "0.1.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6cafeec1150ae91f1a37c8f0dce6b71b92b93e0c4153d32b4c37e2fd71bce2f" +checksum = "0d7dfb4dcdd48ab5fa1ccff25f585d73b58cf95e0fb74e96618dd666f198a005" dependencies = [ "anyhow", "heck", diff --git a/zk_toolbox/Cargo.toml b/zk_toolbox/Cargo.toml index e1b11d8495bc..5a08b56cce7d 100644 --- a/zk_toolbox/Cargo.toml +++ b/zk_toolbox/Cargo.toml @@ -30,7 +30,7 @@ types = { path = "crates/types" } zksync_config = { path = "../core/lib/config" } zksync_protobuf_config = { path = "../core/lib/protobuf_config" } zksync_basic_types = { path = "../core/lib/basic_types" } -zksync_protobuf = "=0.1.0-rc.4" +zksync_protobuf = "=0.1.0-rc.8" # External dependencies anyhow = "1.0.82" @@ -47,7 +47,11 @@ rand = "0.8.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" -sqlx = { version = "0.8.0", features = ["runtime-tokio", "migrate", "postgres"] } +sqlx = { version = "0.8.0", features = [ + "runtime-tokio", + "migrate", + "postgres", +] } strum = { version = "0.26", features = ["derive"] } thiserror = "1.0.57" tokio = { version = "1.37", features = ["full"] } From 1810b78594083f1f98d2901f3643b5687ce9d8e8 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 6 Aug 2024 14:56:32 +0300 Subject: [PATCH 2/2] refactor(state-keeper): Use owned VM storage for batch executor (#2559) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - Refactors `ReadStorageFactory` and `BatchExecutor`. - Tests the VM runner on multiple batch workloads and fixes bugs discovered by the tests (e.g., a data race in the storage loader). ## Why ❔ - `ReadStorageFactory` and `BatchExecutor` become more composable and work in VM runner with less crutches. - More tests allow fixing bugs in VM runner earlier. 
## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. fix(vm-runner): Fix data race in storage loader test(vm-runner): Test VM runner with multiple L1 batches --- Cargo.lock | 1 + core/lib/state/src/lib.rs | 5 +- core/lib/state/src/rocksdb/mod.rs | 6 + core/lib/state/src/storage_factory.rs | 100 ++++++-- core/node/consensus/src/testonly.rs | 7 +- core/node/node_sync/src/tests.rs | 6 +- core/node/state_keeper/Cargo.toml | 2 +- .../src/batch_executor/main_executor.rs | 36 ++- .../state_keeper/src/batch_executor/mod.rs | 13 +- .../tests/read_storage_factory.rs | 36 +-- .../src/batch_executor/tests/tester.rs | 32 +-- core/node/state_keeper/src/keeper.rs | 41 ++-- .../state_keeper/src/state_keeper_storage.rs | 14 +- core/node/state_keeper/src/testonly/mod.rs | 17 +- .../src/testonly/test_batch_executor.rs | 74 ++---- core/node/state_keeper/src/tests/mod.rs | 2 +- core/node/vm_runner/Cargo.toml | 1 + core/node/vm_runner/src/process.rs | 22 +- core/node/vm_runner/src/storage.rs | 196 ++++++---------- core/node/vm_runner/src/tests/mod.rs | 52 ++++- core/node/vm_runner/src/tests/process.rs | 44 ++-- core/node/vm_runner/src/tests/storage.rs | 70 ++---- .../vm_runner/src/tests/storage_writer.rs | 215 ++++++++++++++++++ 23 files changed, 565 insertions(+), 427 deletions(-) create mode 100644 core/node/vm_runner/src/tests/storage_writer.rs diff --git a/Cargo.lock b/Cargo.lock index 1852386b18c9..d5fa605649df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9735,6 +9735,7 @@ dependencies = [ "once_cell", "rand 0.8.5", "tempfile", + "test-casing", "tokio", "tracing", "vise", diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs index 66577841fd45..5044490c46dd 100644 --- a/core/lib/state/src/lib.rs +++ b/core/lib/state/src/lib.rs @@ -28,7 +28,10 @@ pub use self::{ RocksdbStorage, RocksdbStorageBuilder, RocksdbStorageOptions, StateKeeperColumnFamily, }, shadow_storage::ShadowStorage, - storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory}, + storage_factory::{ + BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, + RocksdbWithMemory, + }, storage_view::{StorageView, StorageViewCache, StorageViewMetrics}, witness::WitnessStorage, }; diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index aab33c7dfe83..61a1eb362be1 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -257,6 +257,12 @@ impl RocksdbStorageBuilder { ) -> anyhow::Result<()> { self.0.revert(storage, last_l1_batch_to_keep).await } + + /// Returns the underlying storage without any checks. Should only be used in test code. + #[doc(hidden)] + pub fn build_unchecked(self) -> RocksdbStorage { + self.0 + } } impl RocksdbStorage { diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory.rs index 307fa465a7c9..c506bf7042d1 100644 --- a/core/lib/state/src/storage_factory.rs +++ b/core/lib/state/src/storage_factory.rs @@ -11,19 +11,33 @@ use crate::{ PostgresStorage, ReadStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily, }; -/// Factory that can produce a [`ReadStorage`] implementation on demand. +/// Factory that can produce [`OwnedStorage`] instances on demand. 
#[async_trait] pub trait ReadStorageFactory: Debug + Send + Sync + 'static { - /// Creates a [`PgOrRocksdbStorage`] entity over either a Postgres connection or RocksDB + /// Creates an [`OwnedStorage`] entity over either a Postgres connection or RocksDB /// instance. The specific criteria on which one are left up to the implementation. /// - /// The idea is that in either case this provides a valid [`ReadStorage`] implementation - /// that can be used by the caller. + /// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives + /// a stop signal; this is the only case in which `Ok(None)` should be returned. async fn access_storage( &self, stop_receiver: &watch::Receiver, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>>; + ) -> anyhow::Result>; +} + +/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced +/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing). +#[async_trait] +impl ReadStorageFactory for ConnectionPool { + async fn access_storage( + &self, + _stop_receiver: &watch::Receiver, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + let storage = OwnedPostgresStorage::new(self.clone(), l1_batch_number); + Ok(Some(storage.into())) + } } /// DB difference introduced by one batch. @@ -47,29 +61,31 @@ pub struct RocksdbWithMemory { pub batch_diffs: Vec, } -/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`] -/// underneath. +/// Owned Postgres-backed VM storage for a certain L1 batch. #[derive(Debug)] -pub enum PgOrRocksdbStorage<'a> { - /// Implementation over a Postgres connection. - Postgres(PostgresStorage<'a>), - /// Implementation over a RocksDB cache instance. - Rocksdb(RocksdbStorage), - /// Implementation over a RocksDB cache instance with in-memory DB diffs. - RocksdbWithMemory(RocksdbWithMemory), +pub struct OwnedPostgresStorage { + connection_pool: ConnectionPool, + l1_batch_number: L1BatchNumber, } -impl<'a> PgOrRocksdbStorage<'a> { +impl OwnedPostgresStorage { + /// Creates a VM storage for the specified batch number. + pub fn new(connection_pool: ConnectionPool, l1_batch_number: L1BatchNumber) -> Self { + Self { + connection_pool, + l1_batch_number, + } + } + /// Returns a [`ReadStorage`] implementation backed by Postgres /// /// # Errors /// /// Propagates Postgres errors. - pub async fn access_storage_pg( - pool: &'a ConnectionPool, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { - let mut connection = pool.connection().await?; + pub async fn borrow(&self) -> anyhow::Result> { + let l1_batch_number = self.l1_batch_number; + let mut connection = self.connection_pool.connection().await?; + let l2_block_number = if let Some((_, l2_block_number)) = connection .blocks_dal() .get_l2_block_range_of_l1_batch(l1_batch_number) @@ -85,9 +101,8 @@ impl<'a> PgOrRocksdbStorage<'a> { .context("Could not find snapshot, no state available")?; if snapshot_recovery.l1_batch_number != l1_batch_number { anyhow::bail!( - "Snapshot contains L1 batch #{} while #{} was expected", - snapshot_recovery.l1_batch_number, - l1_batch_number + "Snapshot contains L1 batch #{} while #{l1_batch_number} was expected", + snapshot_recovery.l1_batch_number ); } snapshot_recovery.l2_block_number @@ -99,19 +114,54 @@ impl<'a> PgOrRocksdbStorage<'a> { .into(), ) } +} + +/// Owned version of [`PgOrRocksdbStorage`]. It is thus possible to send to blocking tasks for VM execution. 
+#[derive(Debug)] +pub enum OwnedStorage { + /// Readily initialized storage with a static lifetime. + Static(PgOrRocksdbStorage<'static>), + /// Storage that must be `borrow()`ed from. + Lending(OwnedPostgresStorage), +} + +impl From for OwnedStorage { + fn from(storage: OwnedPostgresStorage) -> Self { + Self::Lending(storage) + } +} + +impl From> for OwnedStorage { + fn from(storage: PgOrRocksdbStorage<'static>) -> Self { + Self::Static(storage) + } +} + +/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`] +/// underneath. +#[derive(Debug)] +pub enum PgOrRocksdbStorage<'a> { + /// Implementation over a Postgres connection. + Postgres(PostgresStorage<'a>), + /// Implementation over a RocksDB cache instance. + Rocksdb(RocksdbStorage), + /// Implementation over a RocksDB cache instance with in-memory DB diffs. + RocksdbWithMemory(RocksdbWithMemory), +} +impl PgOrRocksdbStorage<'static> { /// Catches up RocksDB synchronously (i.e. assumes the gap is small) and /// returns a [`ReadStorage`] implementation backed by caught-up RocksDB. /// /// # Errors /// /// Propagates RocksDB and Postgres errors. - pub async fn access_storage_rocksdb( + pub async fn rocksdb( connection: &mut Connection<'_, Core>, rocksdb: RocksDB, stop_receiver: &watch::Receiver, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { tracing::debug!("Catching up RocksDB synchronously"); let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb); let rocksdb = rocksdb_builder diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 8ed9b933d164..411d1044df3b 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -33,10 +33,7 @@ use zksync_state::RocksdbStorageOptions; use zksync_state_keeper::{ io::{IoCursor, L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, - testonly::{ - fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory, - MockBatchExecutor, - }, + testonly::{fund, l1_transaction, l2_transaction, MockBatchExecutor}, AsyncRocksdbCache, MainBatchExecutor, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper, }; @@ -638,7 +635,7 @@ impl StateKeeperRunner { .with_handler(Box::new(tree_writes_persistence)) .with_handler(Box::new(self.sync_state.clone())), Arc::new(NoopSealer), - Arc::new(MockReadStorageFactory), + Arc::new(self.pool.0.clone()), ) .run() .await diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index 510f9124c297..e091472ad512 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -13,7 +13,7 @@ use zksync_node_test_utils::{ use zksync_state_keeper::{ io::{L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, - testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder}, + testonly::test_batch_executor::TestBatchExecutorBuilder, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, ZkSyncStateKeeper, }; use zksync_types::{ @@ -113,7 +113,7 @@ impl StateKeeperHandles { tokio::spawn(l2_block_sealer.run()); let io = ExternalIO::new( - pool, + pool.clone(), actions, Box::new(main_node_client), L2ChainId::default(), @@ -132,7 +132,7 @@ impl StateKeeperHandles { Box::new(batch_executor_base), output_handler, Arc::new(NoopSealer), - Arc::new(MockReadStorageFactory), + Arc::new(pool), ); Self { diff --git a/core/node/state_keeper/Cargo.toml b/core/node/state_keeper/Cargo.toml index 
904d17718503..890543bcd910 100644 --- a/core/node/state_keeper/Cargo.toml +++ b/core/node/state_keeper/Cargo.toml @@ -33,6 +33,7 @@ zksync_base_token_adjuster.workspace = true anyhow.workspace = true async-trait.workspace = true +tempfile.workspace = true # used in `testonly` module tokio = { workspace = true, features = ["time"] } thiserror.workspace = true tracing.workspace = true @@ -45,7 +46,6 @@ hex.workspace = true assert_matches.workspace = true test-casing.workspace = true futures.workspace = true -tempfile.workspace = true zksync_eth_client.workspace = true zksync_system_constants.workspace = true diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/node/state_keeper/src/batch_executor/main_executor.rs index 2434e92e812f..4c85fc5bb1fc 100644 --- a/core/node/state_keeper/src/batch_executor/main_executor.rs +++ b/core/node/state_keeper/src/batch_executor/main_executor.rs @@ -1,12 +1,8 @@ use std::sync::Arc; use anyhow::Context as _; -use async_trait::async_trait; use once_cell::sync::OnceCell; -use tokio::{ - runtime::Handle, - sync::{mpsc, watch}, -}; +use tokio::{runtime::Handle, sync::mpsc}; use zksync_multivm::{ interface::{ ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, @@ -17,7 +13,7 @@ use zksync_multivm::{ MultiVMTracer, VmInstance, }; use zksync_shared_metrics::{InteractionType, TxStage, APP_METRICS}; -use zksync_state::{ReadStorage, ReadStorageFactory, StorageView, WriteStorage}; +use zksync_state::{OwnedStorage, ReadStorage, StorageView, WriteStorage}; use zksync_types::{vm_trace::Call, Transaction}; use zksync_utils::bytecode::CompressedBytecodeInfo; @@ -50,15 +46,13 @@ impl MainBatchExecutor { } } -#[async_trait] impl BatchExecutor for MainBatchExecutor { - async fn init_batch( + fn init_batch( &mut self, - storage_factory: Arc, + storage: OwnedStorage, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - stop_receiver: &watch::Receiver, - ) -> Option { + ) -> BatchExecutorHandle { // Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued // until a previous command is processed), capacity 1 is enough for the commands channel. let (commands_sender, commands_receiver) = mpsc::channel(1); @@ -68,21 +62,17 @@ impl BatchExecutor for MainBatchExecutor { commands: commands_receiver, }; - let stop_receiver = stop_receiver.clone(); let handle = tokio::task::spawn_blocking(move || { - if let Some(storage) = Handle::current() - .block_on( - storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1), - ) - .context("failed accessing state keeper storage")? 
- { - executor.run(storage, l1_batch_params, system_env); - } else { - tracing::info!("Interrupted while trying to access state keeper storage"); - } + let storage = match storage { + OwnedStorage::Static(storage) => storage, + OwnedStorage::Lending(ref storage) => Handle::current() + .block_on(storage.borrow()) + .context("failed accessing state keeper storage")?, + }; + executor.run(storage, l1_batch_params, system_env); anyhow::Ok(()) }); - Some(BatchExecutorHandle::from_raw(handle, commands_sender)) + BatchExecutorHandle::from_raw(handle, commands_sender) } } diff --git a/core/node/state_keeper/src/batch_executor/mod.rs b/core/node/state_keeper/src/batch_executor/mod.rs index da4eb194bff5..d4fea2e9dfd5 100644 --- a/core/node/state_keeper/src/batch_executor/mod.rs +++ b/core/node/state_keeper/src/batch_executor/mod.rs @@ -1,15 +1,14 @@ use std::{error::Error as StdError, fmt, sync::Arc}; use anyhow::Context as _; -use async_trait::async_trait; use tokio::{ - sync::{mpsc, oneshot, watch}, + sync::{mpsc, oneshot}, task::JoinHandle, }; use zksync_multivm::interface::{ FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionResultAndLogs, }; -use zksync_state::{ReadStorageFactory, StorageViewCache}; +use zksync_state::{OwnedStorage, StorageViewCache}; use zksync_types::{vm_trace::Call, Transaction}; use zksync_utils::bytecode::CompressedBytecodeInfo; @@ -55,15 +54,13 @@ impl TxExecutionResult { /// An abstraction that allows us to create different kinds of batch executors. /// The only requirement is to return a [`BatchExecutorHandle`], which does its work /// by communicating with the externally initialized thread. -#[async_trait] pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug { - async fn init_batch( + fn init_batch( &mut self, - storage_factory: Arc, + storage: OwnedStorage, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - stop_receiver: &watch::Receiver, - ) -> Option; + ) -> BatchExecutorHandle; } #[derive(Debug)] diff --git a/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs b/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs index f23829ed5203..838b92407673 100644 --- a/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs +++ b/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs @@ -2,46 +2,22 @@ use anyhow::Context; use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; -use zksync_state::{PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; +use zksync_state::{OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; use zksync_types::L1BatchNumber; #[derive(Debug, Clone)] -pub struct PostgresFactory { - pool: ConnectionPool, -} - -#[async_trait] -impl ReadStorageFactory for PostgresFactory { - async fn access_storage( - &self, - _stop_receiver: &watch::Receiver, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { - Ok(Some( - PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number).await?, - )) - } -} - -impl PostgresFactory { - pub fn new(pool: ConnectionPool) -> Self { - Self { pool } - } -} - -#[derive(Debug, Clone)] -pub struct RocksdbFactory { +pub struct RocksdbStorageFactory { pool: ConnectionPool, state_keeper_db_path: String, } #[async_trait] -impl ReadStorageFactory for RocksdbFactory { +impl ReadStorageFactory for RocksdbStorageFactory { async fn access_storage( &self, stop_receiver: &watch::Receiver, _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { 
let builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref()) .await .context("Failed opening state keeper RocksDB")?; @@ -57,11 +33,11 @@ impl ReadStorageFactory for RocksdbFactory { else { return Ok(None); }; - Ok(Some(PgOrRocksdbStorage::Rocksdb(rocksdb_storage))) + Ok(Some(PgOrRocksdbStorage::Rocksdb(rocksdb_storage).into())) } } -impl RocksdbFactory { +impl RocksdbStorageFactory { pub fn new(pool: ConnectionPool, state_keeper_db_path: String) -> Self { Self { pool, diff --git a/core/node/state_keeper/src/batch_executor/tests/tester.rs b/core/node/state_keeper/src/batch_executor/tests/tester.rs index 579f3bee4819..961ccf9db16f 100644 --- a/core/node/state_keeper/src/batch_executor/tests/tester.rs +++ b/core/node/state_keeper/src/batch_executor/tests/tester.rs @@ -29,10 +29,7 @@ use zksync_types::{ }; use zksync_utils::u256_to_h256; -use super::{ - read_storage_factory::{PostgresFactory, RocksdbFactory}, - StorageType, -}; +use super::{read_storage_factory::RocksdbStorageFactory, StorageType}; use crate::{ batch_executor::{BatchExecutorHandle, TxExecutionResult}, testonly, @@ -121,7 +118,7 @@ impl Tester { } StorageType::Rocksdb => { self.create_batch_executor_inner( - Arc::new(RocksdbFactory::new( + Arc::new(RocksdbStorageFactory::new( self.pool(), self.state_keeper_db_path(), )), @@ -131,12 +128,8 @@ impl Tester { .await } StorageType::Postgres => { - self.create_batch_executor_inner( - Arc::new(PostgresFactory::new(self.pool())), - l1_batch_env, - system_env, - ) - .await + self.create_batch_executor_inner(Arc::new(self.pool()), l1_batch_env, system_env) + .await } } } @@ -149,10 +142,12 @@ impl Tester { ) -> BatchExecutorHandle { let mut batch_executor = MainBatchExecutor::new(self.config.save_call_traces, false); let (_stop_sender, stop_receiver) = watch::channel(false); - batch_executor - .init_batch(storage_factory, l1_batch_env, system_env, &stop_receiver) + let storage = storage_factory + .access_storage(&stop_receiver, l1_batch_env.number - 1) .await - .expect("Batch executor was interrupted") + .expect("failed creating VM storage") + .unwrap(); + batch_executor.init_batch(storage, l1_batch_env, system_env) } pub(super) async fn recover_batch_executor( @@ -180,7 +175,7 @@ impl Tester { StorageType::AsyncRocksdbCache => self.recover_batch_executor(snapshot).await, StorageType::Rocksdb => { self.recover_batch_executor_inner( - Arc::new(RocksdbFactory::new( + Arc::new(RocksdbStorageFactory::new( self.pool(), self.state_keeper_db_path(), )), @@ -189,11 +184,8 @@ impl Tester { .await } StorageType::Postgres => { - self.recover_batch_executor_inner( - Arc::new(PostgresFactory::new(self.pool())), - snapshot, - ) - .await + self.recover_batch_executor_inner(Arc::new(self.pool()), snapshot) + .await } } } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 7675960ecbdc..934ed9493f86 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -146,16 +146,8 @@ impl ZkSyncStateKeeper { .await?; let mut batch_executor = self - .batch_executor_base - .init_batch( - self.storage_factory.clone(), - l1_batch_env.clone(), - system_env.clone(), - &self.stop_receiver, - ) - .await - .ok_or(Error::Canceled)?; - + .create_batch_executor(l1_batch_env.clone(), system_env.clone()) + .await?; self.restore_state(&mut batch_executor, &mut updates_manager, pending_l2_blocks) .await?; @@ -203,15 +195,8 @@ impl ZkSyncStateKeeper { (system_env, l1_batch_env) = self.wait_for_new_batch_env(&next_cursor).await?; 
updates_manager = UpdatesManager::new(&l1_batch_env, &system_env); batch_executor = self - .batch_executor_base - .init_batch( - self.storage_factory.clone(), - l1_batch_env.clone(), - system_env.clone(), - &self.stop_receiver, - ) - .await - .ok_or(Error::Canceled)?; + .create_batch_executor(l1_batch_env.clone(), system_env.clone()) + .await?; let version_changed = system_env.version != sealed_batch_protocol_version; protocol_upgrade_tx = if version_changed { @@ -223,6 +208,24 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } + async fn create_batch_executor( + &mut self, + l1_batch_env: L1BatchEnv, + system_env: SystemEnv, + ) -> Result { + let Some(storage) = self + .storage_factory + .access_storage(&self.stop_receiver, l1_batch_env.number - 1) + .await + .context("failed creating VM storage")? + else { + return Err(Error::Canceled); + }; + Ok(self + .batch_executor_base + .init_batch(storage, l1_batch_env, system_env)) + } + /// This function is meant to be called only once during the state-keeper initialization. /// It will check if we should load a protocol upgrade or a `setChainId` transaction, /// perform some checks and return it. diff --git a/core/node/state_keeper/src/state_keeper_storage.rs b/core/node/state_keeper/src/state_keeper_storage.rs index 13cedc3a58a8..fbda064b5d71 100644 --- a/core/node/state_keeper/src/state_keeper_storage.rs +++ b/core/node/state_keeper/src/state_keeper_storage.rs @@ -5,7 +5,8 @@ use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; use zksync_state::{ - AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, RocksdbCell, RocksdbStorageOptions, + AsyncCatchupTask, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, + RocksdbCell, RocksdbStorageOptions, }; use zksync_types::L1BatchNumber; @@ -41,7 +42,7 @@ impl ReadStorageFactory for AsyncRocksdbCache { &self, stop_receiver: &watch::Receiver, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { let initial_state = self.rocksdb_cell.ensure_initialized().await?; let rocksdb = if initial_state.l1_batch_number >= Some(l1_batch_number) { tracing::info!( @@ -63,19 +64,18 @@ impl ReadStorageFactory for AsyncRocksdbCache { .connection_tagged("state_keeper") .await .context("Failed getting a Postgres connection")?; - PgOrRocksdbStorage::access_storage_rocksdb( + let storage = PgOrRocksdbStorage::rocksdb( &mut connection, rocksdb, stop_receiver, l1_batch_number, ) .await - .context("Failed accessing RocksDB storage") + .context("Failed accessing RocksDB storage")?; + Ok(storage.map(Into::into)) } else { Ok(Some( - PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number) - .await - .context("Failed accessing Postgres storage")?, + OwnedPostgresStorage::new(self.pool.clone(), l1_batch_number).into(), )) } } diff --git a/core/node/state_keeper/src/testonly/mod.rs b/core/node/state_keeper/src/testonly/mod.rs index c287bc97407f..465042a602df 100644 --- a/core/node/state_keeper/src/testonly/mod.rs +++ b/core/node/state_keeper/src/testonly/mod.rs @@ -1,11 +1,8 @@ //! Test utilities that can be used for testing sequencer that may //! be useful outside of this crate. 
-use std::sync::Arc; - -use async_trait::async_trait; use once_cell::sync::Lazy; -use tokio::sync::{mpsc, watch}; +use tokio::sync::mpsc; use zksync_contracts::BaseSystemContracts; use zksync_dal::{ConnectionPool, Core, CoreDal as _}; use zksync_multivm::{ @@ -15,7 +12,7 @@ use zksync_multivm::{ }, vm_latest::VmExecutionLogs, }; -use zksync_state::{ReadStorageFactory, StorageViewCache}; +use zksync_state::{OwnedStorage, StorageViewCache}; use zksync_test_account::Account; use zksync_types::{ fee::Fee, utils::storage_key_for_standard_token_balance, AccountTreeId, Address, Execute, @@ -87,15 +84,13 @@ pub(crate) fn storage_view_cache() -> StorageViewCache { #[derive(Debug)] pub struct MockBatchExecutor; -#[async_trait] impl BatchExecutor for MockBatchExecutor { - async fn init_batch( + fn init_batch( &mut self, - _storage_factory: Arc, + _storage: OwnedStorage, _l1batch_params: L1BatchEnv, _system_env: SystemEnv, - _stop_receiver: &watch::Receiver, - ) -> Option { + ) -> BatchExecutorHandle { let (send, recv) = mpsc::channel(1); let handle = tokio::task::spawn(async { let mut recv = recv; @@ -116,7 +111,7 @@ impl BatchExecutor for MockBatchExecutor { } anyhow::Ok(()) }); - Some(BatchExecutorHandle::from_raw(handle, send)) + BatchExecutorHandle::from_raw(handle, send) } } diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index 1be84cfbf54e..aefc8d50bc7d 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -13,14 +13,14 @@ use std::{ }; use async_trait::async_trait; -use tokio::sync::{mpsc, watch, watch::Receiver}; +use tokio::sync::{mpsc, watch}; use zksync_contracts::BaseSystemContracts; use zksync_multivm::{ interface::{ExecutionResult, L1BatchEnv, SystemEnv, VmExecutionResultAndLogs}, vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; use zksync_node_test_utils::create_l2_transaction; -use zksync_state::{PgOrRocksdbStorage, ReadStorageFactory}; +use zksync_state::{OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; use zksync_types::{ fee_model::BatchFeeInput, protocol_upgrade::ProtocolUpgradeTx, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, @@ -207,7 +207,7 @@ impl TestScenario { Box::new(batch_executor_base), output_handler, Arc::new(sealer), - Arc::new(MockReadStorageFactory), + Arc::::default(), ); let sk_thread = tokio::spawn(state_keeper.run()); @@ -410,15 +410,13 @@ impl TestBatchExecutorBuilder { } } -#[async_trait] impl BatchExecutor for TestBatchExecutorBuilder { - async fn init_batch( + fn init_batch( &mut self, - _storage_factory: Arc, - _l1batch_params: L1BatchEnv, + _storage: OwnedStorage, + _l1_batch_params: L1BatchEnv, _system_env: SystemEnv, - _stop_receiver: &watch::Receiver, - ) -> Option { + ) -> BatchExecutorHandle { let (commands_sender, commands_receiver) = mpsc::channel(1); let executor = TestBatchExecutor::new( @@ -430,7 +428,7 @@ impl BatchExecutor for TestBatchExecutorBuilder { executor.run(); Ok(()) }); - Some(BatchExecutorHandle::from_raw(handle, commands_sender)) + BatchExecutorHandle::from_raw(handle, commands_sender) } } @@ -805,55 +803,31 @@ impl StateKeeperIO for TestIO { } } -/// `BatchExecutor` which doesn't check anything at all. Accepts all transactions. -// FIXME: move to `utils`? +/// Storage factory that produces empty VM storage for any batch. 
Should only be used with a mock batch executor +/// that doesn't read from the storage. Prefer using `ConnectionPool` as a factory if it's available. #[derive(Debug)] -pub(crate) struct MockBatchExecutor; +pub struct MockReadStorageFactory(tempfile::TempDir); -#[async_trait] -impl BatchExecutor for MockBatchExecutor { - async fn init_batch( - &mut self, - _storage_factory: Arc, - _l1batch_params: L1BatchEnv, - _system_env: SystemEnv, - _stop_receiver: &watch::Receiver, - ) -> Option { - let (send, recv) = mpsc::channel(1); - let handle = tokio::task::spawn(async { - let mut recv = recv; - while let Some(cmd) = recv.recv().await { - match cmd { - Command::ExecuteTx(_, resp) => resp.send(successful_exec()).unwrap(), - Command::StartNextL2Block(_, resp) => resp.send(()).unwrap(), - Command::RollbackLastTx(_) => panic!("unexpected rollback"), - Command::FinishBatch(resp) => { - // Blanket result, it doesn't really matter. - resp.send(default_vm_batch_result()).unwrap(); - break; - } - Command::FinishBatchWithCache(resp) => resp - .send((default_vm_batch_result(), storage_view_cache())) - .unwrap(), - } - } - anyhow::Ok(()) - }); - Some(BatchExecutorHandle::from_raw(handle, send)) +impl Default for MockReadStorageFactory { + fn default() -> Self { + Self( + tempfile::TempDir::new() + .expect("failed creating temporary directory for `MockReadStorageFactory`"), + ) } } -#[derive(Debug)] -pub struct MockReadStorageFactory; - #[async_trait] impl ReadStorageFactory for MockReadStorageFactory { async fn access_storage( &self, - _stop_receiver: &Receiver, + _stop_receiver: &watch::Receiver, _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>> { - // Presume that the storage is never accessed in mocked environment - unimplemented!() + ) -> anyhow::Result> { + let storage = RocksdbStorage::builder(self.0.path()) + .await + .expect("Cannot create mock RocksDB storage") + .build_unchecked(); + Ok(Some(PgOrRocksdbStorage::Rocksdb(storage).into())) } } diff --git a/core/node/state_keeper/src/tests/mod.rs b/core/node/state_keeper/src/tests/mod.rs index 8bfc53c8f7b1..2d0af7dd281b 100644 --- a/core/node/state_keeper/src/tests/mod.rs +++ b/core/node/state_keeper/src/tests/mod.rs @@ -438,7 +438,7 @@ async fn load_upgrade_tx() { Box::new(batch_executor_base), output_handler, Arc::new(sealer), - Arc::new(MockReadStorageFactory), + Arc::::default(), ); // Since the version hasn't changed, and we are not using shared bridge, we should not load any diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index 3af52ed4688e..52a8e4676437 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -39,3 +39,4 @@ backon.workspace = true futures = { workspace = true, features = ["compat"] } rand.workspace = true tempfile.workspace = true +test-casing.workspace = true diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index e84ec76d0726..3c5a00e074c0 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -173,26 +173,18 @@ impl VmRunner { tokio::time::sleep(SLEEP_INTERVAL).await; continue; } - let Some(batch_data) = self.loader.load_batch(next_batch).await? else { + let Some((batch_data, storage)) = self.loader.load_batch(next_batch).await? 
else { // Next batch has not been loaded yet tokio::time::sleep(SLEEP_INTERVAL).await; continue; }; let updates_manager = UpdatesManager::new(&batch_data.l1_batch_env, &batch_data.system_env); - let Some(batch_executor) = self - .batch_processor - .init_batch( - self.loader.clone().upcast(), - batch_data.l1_batch_env, - batch_data.system_env, - stop_receiver, - ) - .await - else { - tracing::info!("VM runner was interrupted"); - break; - }; + let batch_executor = self.batch_processor.init_batch( + storage, + batch_data.l1_batch_env, + batch_data.system_env, + ); let output_handler = self .output_handler_factory .create_handler(next_batch) @@ -211,7 +203,5 @@ impl VmRunner { next_batch += 1; } - - Ok(()) } } diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index f3e304d7d4ff..75ed4cb57a9c 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -1,6 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, - fmt::Debug, + fmt, sync::Arc, time::Duration, }; @@ -11,8 +11,8 @@ use tokio::sync::{watch, RwLock}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_multivm::{interface::L1BatchEnv, vm_1_4_2::SystemEnv}; use zksync_state::{ - AsyncCatchupTask, BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbCell, - RocksdbStorage, RocksdbStorageBuilder, RocksdbWithMemory, + AsyncCatchupTask, BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, + RocksdbCell, RocksdbStorage, RocksdbStorageBuilder, RocksdbWithMemory, }; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId}; use zksync_vm_utils::storage::L1BatchParamsProvider; @@ -20,7 +20,7 @@ use zksync_vm_utils::storage::L1BatchParamsProvider; use crate::{metrics::METRICS, VmRunnerIo}; #[async_trait] -pub trait StorageLoader: ReadStorageFactory { +pub trait StorageLoader: 'static + Send + Sync + fmt::Debug { /// Loads next unprocessed L1 batch along with all transactions that VM runner needs to /// re-execute. These are the transactions that are included in a sealed L2 block belonging /// to a sealed L1 batch (with state keeper being the source of truth). The order of the @@ -34,13 +34,7 @@ pub trait StorageLoader: ReadStorageFactory { async fn load_batch( &self, l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>; - - /// A workaround for Rust's limitations on upcasting coercion. See - /// https://github.com/rust-lang/rust/issues/65991. - /// - /// Should always be implementable as [`StorageLoader`] requires [`ReadStorageFactory`]. - fn upcast(self: Arc) -> Arc; + ) -> anyhow::Result>; } /// Data needed to execute an L1 batch. @@ -85,13 +79,6 @@ struct State { storage: BTreeMap, } -impl State { - /// Whether this state can serve as a `ReadStorage` source for the given L1 batch. - fn can_be_used_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> bool { - l1_batch_number == self.l1_batch_number || self.storage.contains_key(&l1_batch_number) - } -} - impl VmRunnerStorage { /// Creates a new VM runner storage using provided Postgres pool and RocksDB path. 
     pub async fn new(
@@ -133,66 +120,34 @@ impl<Io: VmRunnerIo> VmRunnerStorage<Io> {
     }
 }
 
-impl<Io: VmRunnerIo> VmRunnerStorage<Io> {
-    async fn access_storage_inner(
-        &self,
-        _stop_receiver: &watch::Receiver<bool>,
-        l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<PgOrRocksdbStorage<'static>>> {
-        let state = self.state.read().await;
-        let Some(rocksdb) = &state.rocksdb else {
-            return Ok(Some(
-                PgOrRocksdbStorage::access_storage_pg(&self.pool, l1_batch_number)
-                    .await
-                    .context("Failed accessing Postgres storage")?,
-            ));
-        };
-        if !state.can_be_used_for_l1_batch(l1_batch_number) {
-            tracing::debug!(
-                %l1_batch_number,
-                min_l1_batch = %state.l1_batch_number,
-                max_l1_batch = %state.storage.last_key_value().map(|(k, _)| *k).unwrap_or(state.l1_batch_number),
-                "Trying to access VM runner storage with L1 batch that is not available",
-            );
-            return Ok(None);
-        }
-        let batch_diffs = state
-            .storage
-            .iter()
-            .filter_map(|(x, y)| {
-                if x <= &l1_batch_number {
-                    Some(y.diff.clone())
-                } else {
-                    None
-                }
-            })
-            .collect::<Vec<_>>();
-        Ok(Some(PgOrRocksdbStorage::RocksdbWithMemory(
-            RocksdbWithMemory {
-                rocksdb: rocksdb.clone(),
-                batch_diffs,
-            },
-        )))
-    }
-}
-
 #[async_trait]
 impl<Io: VmRunnerIo> StorageLoader for VmRunnerStorage<Io> {
     async fn load_batch(
         &self,
         l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<BatchExecuteData>> {
+    ) -> anyhow::Result<Option<(BatchExecuteData, OwnedStorage)>> {
         let state = self.state.read().await;
-        if state.rocksdb.is_none() {
+        let rocksdb = if let Some(rocksdb) = &state.rocksdb {
+            rocksdb
+        } else {
+            drop(state);
             let mut conn = self.pool.connection_tagged(self.io.name()).await?;
-            return StorageSyncTask::<Io>::load_batch_execute_data(
+            let batch_data = load_batch_execute_data(
                 &mut conn,
                 l1_batch_number,
                 &self.l1_batch_params_provider,
                 self.chain_id,
             )
-            .await;
-        }
+            .await?;
+
+            return Ok(batch_data.map(|data| {
+                (
+                    data,
+                    OwnedPostgresStorage::new(self.pool.clone(), l1_batch_number - 1).into(),
+                )
+            }));
+        };
+
         match state.storage.get(&l1_batch_number) {
             None => {
                 tracing::debug!(
@@ -203,25 +158,22 @@ impl<Io: VmRunnerIo> StorageLoader for VmRunnerStorage<Io> {
                 );
                 Ok(None)
             }
-            Some(batch_data) => Ok(Some(batch_data.execute_data.clone())),
+            Some(batch_data) => {
+                let data = batch_data.execute_data.clone();
+                let batch_diffs = state
+                    .storage
+                    .iter()
+                    .filter(|(&num, _)| num < l1_batch_number)
+                    .map(|(_, data)| data.diff.clone())
+                    .collect::<Vec<_>>();
+                let storage = PgOrRocksdbStorage::RocksdbWithMemory(RocksdbWithMemory {
+                    rocksdb: rocksdb.clone(),
+                    batch_diffs,
+                });
+                Ok(Some((data, storage.into())))
+            }
         }
     }
-
-    fn upcast(self: Arc<Self>) -> Arc<dyn ReadStorageFactory> {
-        self
-    }
-}
-
-#[async_trait]
-impl<Io: VmRunnerIo> ReadStorageFactory for VmRunnerStorage<Io> {
-    async fn access_storage(
-        &self,
-        stop_receiver: &watch::Receiver<bool>,
-        l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<PgOrRocksdbStorage<'static>>> {
-        self.access_storage_inner(stop_receiver, l1_batch_number)
-            .await
-    }
 }
 
 /// A runnable task that catches up the provided RocksDB cache instance to the latest processed
@@ -335,7 +287,7 @@ impl<Io: VmRunnerIo> StorageSyncTask<Io> {
         for l1_batch_number in max_present.0 + 1..=max_desired.0 {
             let latency = METRICS.storage_load_time.start();
             let l1_batch_number = L1BatchNumber(l1_batch_number);
-            let Some(execute_data) = Self::load_batch_execute_data(
+            let Some(execute_data) = load_batch_execute_data(
                 &mut conn,
                 l1_batch_number,
                 &self.l1_batch_params_provider,
@@ -375,44 +327,44 @@ impl<Io: VmRunnerIo> StorageSyncTask<Io> {
             drop(conn);
         }
     }
+}
 
-    async fn load_batch_execute_data(
-        conn: &mut Connection<'_, Core>,
-        l1_batch_number: L1BatchNumber,
-        l1_batch_params_provider: &L1BatchParamsProvider,
-        chain_id: L2ChainId,
-    ) -> anyhow::Result<Option<BatchExecuteData>> {
-        let first_l2_block_in_batch =
l1_batch_params_provider - .load_first_l2_block_in_batch(conn, l1_batch_number) - .await - .with_context(|| { - format!( - "Failed loading first L2 block for L1 batch #{}", - l1_batch_number - ) - })?; - let Some(first_l2_block_in_batch) = first_l2_block_in_batch else { - return Ok(None); - }; - let (system_env, l1_batch_env) = l1_batch_params_provider - .load_l1_batch_params( - conn, - &first_l2_block_in_batch, - // `validation_computational_gas_limit` is only relevant when rejecting txs, but we - // are re-executing so none of them should be rejected - u32::MAX, - chain_id, +pub(crate) async fn load_batch_execute_data( + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + l1_batch_params_provider: &L1BatchParamsProvider, + chain_id: L2ChainId, +) -> anyhow::Result> { + let first_l2_block_in_batch = l1_batch_params_provider + .load_first_l2_block_in_batch(conn, l1_batch_number) + .await + .with_context(|| { + format!( + "Failed loading first L2 block for L1 batch #{}", + l1_batch_number ) - .await - .with_context(|| format!("Failed loading params for L1 batch #{}", l1_batch_number))?; - let l2_blocks = conn - .transactions_dal() - .get_l2_blocks_to_execute_for_l1_batch(l1_batch_number) - .await?; - Ok(Some(BatchExecuteData { - l1_batch_env, - system_env, - l2_blocks, - })) - } + })?; + let Some(first_l2_block_in_batch) = first_l2_block_in_batch else { + return Ok(None); + }; + let (system_env, l1_batch_env) = l1_batch_params_provider + .load_l1_batch_params( + conn, + &first_l2_block_in_batch, + // `validation_computational_gas_limit` is only relevant when rejecting txs, but we + // are re-executing so none of them should be rejected + u32::MAX, + chain_id, + ) + .await + .with_context(|| format!("Failed loading params for L1 batch #{}", l1_batch_number))?; + let l2_blocks = conn + .transactions_dal() + .get_l2_blocks_to_execute_for_l1_batch(l1_batch_number) + .await?; + Ok(Some(BatchExecuteData { + l1_batch_env, + system_env, + l2_blocks, + })) } diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs index 50acba610ba5..e9dbebfa24d5 100644 --- a/core/node/vm_runner/src/tests/mod.rs +++ b/core/node/vm_runner/src/tests/mod.rs @@ -9,6 +9,7 @@ use zksync_node_test_utils::{ create_l1_batch_metadata, create_l2_block, execute_l2_transaction, l1_batch_metadata_to_commitment_artifacts, }; +use zksync_state::{OwnedPostgresStorage, OwnedStorage}; use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_test_account::Account; use zksync_types::{ @@ -17,16 +18,48 @@ use zksync_types::{ get_intrinsic_constants, l2::L2Tx, utils::storage_key_for_standard_token_balance, - AccountTreeId, Address, Execute, L1BatchNumber, L2BlockNumber, ProtocolVersionId, StorageKey, - StorageLog, StorageLogKind, StorageValue, H160, H256, L2_BASE_TOKEN_ADDRESS, U256, + AccountTreeId, Address, Execute, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, + StorageKey, StorageLog, StorageLogKind, StorageValue, H160, H256, L2_BASE_TOKEN_ADDRESS, U256, }; use zksync_utils::u256_to_h256; +use zksync_vm_utils::storage::L1BatchParamsProvider; -use super::{OutputHandlerFactory, VmRunnerIo}; +use super::{BatchExecuteData, OutputHandlerFactory, VmRunnerIo}; +use crate::storage::{load_batch_execute_data, StorageLoader}; mod output_handler; mod process; mod storage; +mod storage_writer; + +const TEST_TIMEOUT: Duration = Duration::from_secs(10); + +/// Simplified storage loader that always gets data from Postgres (i.e., doesn't do RocksDB caching). 
+#[derive(Debug)]
+struct PostgresLoader(ConnectionPool<Core>);
+
+#[async_trait]
+impl StorageLoader for PostgresLoader {
+    async fn load_batch(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<Option<(BatchExecuteData, OwnedStorage)>> {
+        let mut conn = self.0.connection().await?;
+        let Some(data) = load_batch_execute_data(
+            &mut conn,
+            l1_batch_number,
+            &L1BatchParamsProvider::new(),
+            L2ChainId::default(),
+        )
+        .await?
+        else {
+            return Ok(None);
+        };
+
+        let storage = OwnedPostgresStorage::new(self.0.clone(), l1_batch_number - 1);
+        Ok(Some((data, storage.into())))
+    }
+}
 
 #[derive(Debug, Default)]
 struct IoMock {
@@ -141,7 +174,7 @@ mod wait {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Default)]
 struct TestOutputFactory {
     delays: HashMap<L1BatchNumber, Duration>,
 }
@@ -152,11 +185,11 @@ impl OutputHandlerFactory for TestOutputFactory {
         &mut self,
         l1_batch_number: L1BatchNumber,
     ) -> anyhow::Result<Box<dyn StateKeeperOutputHandler>> {
-        let delay = self.delays.get(&l1_batch_number).copied();
         #[derive(Debug)]
         struct TestOutputHandler {
             delay: Option<Duration>,
         }
+
         #[async_trait]
         impl StateKeeperOutputHandler for TestOutputHandler {
             async fn handle_l2_block(
@@ -176,6 +209,8 @@ impl OutputHandlerFactory for TestOutputFactory {
                 Ok(())
             }
         }
+
+        let delay = self.delays.get(&l1_batch_number).copied();
         Ok(Box::new(TestOutputHandler { delay }))
     }
 }
@@ -289,12 +324,11 @@ async fn store_l1_batches(
         // Insert a fictive L2 block at the end of the batch
         let mut fictive_l2_block = create_l2_block(l2_block_number.0);
-        let mut digest = L2BlockHasher::new(
+        let digest = L2BlockHasher::new(
            fictive_l2_block.number,
            fictive_l2_block.timestamp,
            last_l2_block_hash,
        );
-        digest.push_tx_hash(tx.hash());
         fictive_l2_block.hash = digest.finalize(ProtocolVersionId::latest());
         l2_block_number += 1;
         conn.blocks_dal().insert_l2_block(&fictive_l2_block).await?;
@@ -337,9 +371,7 @@ async fn store_l1_batches(
     Ok(batches)
 }
 
-async fn fund(pool: &ConnectionPool<Core>, accounts: &[Account]) {
-    let mut conn = pool.connection().await.unwrap();
-
+async fn fund(conn: &mut Connection<'_, Core>, accounts: &[Account]) {
     let eth_amount = U256::from(10).pow(U256::from(32)); //10^32 wei
 
     for account in accounts {
diff --git a/core/node/vm_runner/src/tests/process.rs b/core/node/vm_runner/src/tests/process.rs
index 664bdeebf855..7ea1335db71f 100644
--- a/core/node/vm_runner/src/tests/process.rs
+++ b/core/node/vm_runner/src/tests/process.rs
@@ -1,26 +1,20 @@
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc};
 
 use tempfile::TempDir;
+use test_casing::test_casing;
 use tokio::sync::{watch, RwLock};
 use zksync_dal::{ConnectionPool, Core};
 use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
 use zksync_state_keeper::MainBatchExecutor;
 use zksync_test_account::Account;
-use zksync_types::L2ChainId;
+use zksync_types::{L1BatchNumber, L2ChainId};
 
-use crate::{
-    tests::{fund, store_l1_batches, wait, IoMock, TestOutputFactory},
-    ConcurrentOutputHandlerFactory, VmRunner, VmRunnerStorage,
-};
+use super::*;
+use crate::{ConcurrentOutputHandlerFactory, VmRunner, VmRunnerStorage};
 
-// Testing more than a one-batch scenario is pretty difficult as that requires storage to have
-// completely valid state after each L2 block execution (current block number, hash, rolling txs
-// hash etc written to the correct places). To achieve this we could run state keeper e2e but that
-// is pretty difficult to set up.
-//
-// Instead, we rely on integration tests to verify the correctness of VM runner main process.
-#[tokio::test] -async fn process_one_batch() -> anyhow::Result<()> { +#[test_casing(4, [(1, 1), (5, 1), (5, 3), (5, 5)])] +#[tokio::test(flavor = "multi_thread")] +async fn process_batches((batch_count, window): (u32, u32)) -> anyhow::Result<()> { let rocksdb_dir = TempDir::new()?; let connection_pool = ConnectionPool::::test_pool().await; let mut conn = connection_pool.connection().await.unwrap(); @@ -28,23 +22,24 @@ async fn process_one_batch() -> anyhow::Result<()> { insert_genesis_batch(&mut conn, &genesis_params) .await .unwrap(); - let alice = Account::random(); - let bob = Account::random(); - let mut accounts = vec![alice, bob]; - fund(&connection_pool, &accounts).await; + let mut accounts = vec![Account::random(), Account::random()]; + fund(&mut conn, &accounts).await; - let batches = store_l1_batches( + store_l1_batches( &mut conn, - 1..=1, + 1..=batch_count, genesis_params.base_system_contracts().hashes(), &mut accounts, ) .await?; drop(conn); + // Fill in missing storage logs for all batches so that running VM for all of them works correctly. + storage_writer::write_storage_logs(connection_pool.clone()).await; + let io = Arc::new(RwLock::new(IoMock { current: 0.into(), - max: 1, + max: window, })); let (storage, task) = VmRunnerStorage::new( connection_pool.clone(), @@ -53,7 +48,7 @@ async fn process_one_batch() -> anyhow::Result<()> { L2ChainId::default(), ) .await?; - let (_, stop_receiver) = watch::channel(false); + let (_stop_sender, stop_receiver) = watch::channel(false); let storage_stop_receiver = stop_receiver.clone(); tokio::task::spawn(async move { task.run(storage_stop_receiver).await.unwrap() }); let test_factory = TestOutputFactory { @@ -75,9 +70,6 @@ async fn process_one_batch() -> anyhow::Result<()> { ); tokio::task::spawn(async move { vm_runner.run(&stop_receiver).await.unwrap() }); - for batch in batches { - wait::for_batch(io.clone(), batch.number, Duration::from_secs(1)).await?; - } - + wait::for_batch_progressively(io, L1BatchNumber(batch_count), TEST_TIMEOUT).await?; Ok(()) } diff --git a/core/node/vm_runner/src/tests/storage.rs b/core/node/vm_runner/src/tests/storage.rs index 52de43801ff0..90aeda335e1d 100644 --- a/core/node/vm_runner/src/tests/storage.rs +++ b/core/node/vm_runner/src/tests/storage.rs @@ -9,7 +9,7 @@ use tokio::{ }; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; -use zksync_state::{PgOrRocksdbStorage, PostgresStorage, ReadStorage, ReadStorageFactory}; +use zksync_state::{OwnedStorage, PostgresStorage, ReadStorage}; use zksync_test_account::Account; use zksync_types::{AccountTreeId, L1BatchNumber, L2ChainId, StorageKey}; @@ -59,7 +59,7 @@ impl VmRunnerStorage { async fn load_batch_eventually( &self, number: L1BatchNumber, - ) -> anyhow::Result { + ) -> anyhow::Result<(BatchExecuteData, OwnedStorage)> { (|| async { self.load_batch(number) .await? @@ -69,22 +69,6 @@ impl VmRunnerStorage { .await } - async fn access_storage_eventually( - &self, - stop_receiver: &watch::Receiver, - number: L1BatchNumber, - ) -> anyhow::Result> { - (|| async { - self.access_storage(stop_receiver, number) - .await? 
- .ok_or_else(|| { - anyhow::anyhow!("Storage for batch #{} is not available yet", number) - }) - }) - .retry(&ExponentialBuilder::default()) - .await - } - async fn ensure_batch_unloads_eventually(&self, number: L1BatchNumber) -> anyhow::Result<()> { (|| async { Ok(anyhow::ensure!( @@ -121,11 +105,11 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { insert_genesis_batch(&mut conn, &genesis_params) .await .unwrap(); - drop(conn); let alice = Account::random(); let bob = Account::random(); let mut accounts = vec![alice, bob]; - fund(&connection_pool, &accounts).await; + fund(&mut conn, &accounts).await; + drop(conn); // Generate 10 batches worth of data and persist it in Postgres let batches = store_l1_batches( @@ -144,7 +128,7 @@ async fn rerun_storage_on_existing_data() -> anyhow::Result<()> { let storage = tester.create_storage(io_mock.clone()).await?; // Check that existing batches are returned in the exact same order with the exact same data for batch in &batches { - let batch_data = storage.load_batch_eventually(batch.number).await?; + let (batch_data, _) = storage.load_batch_eventually(batch.number).await?; let mut conn = connection_pool.connection().await.unwrap(); let (previous_batch_hash, _) = conn .blocks_dal() @@ -212,11 +196,11 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { insert_genesis_batch(&mut conn, &genesis_params) .await .unwrap(); - drop(conn); let alice = Account::random(); let bob = Account::random(); let mut accounts = vec![alice, bob]; - fund(&connection_pool, &accounts).await; + fund(&mut conn, &accounts).await; + drop(conn); let mut tester = StorageTester::new(connection_pool.clone()); let io_mock = Arc::new(RwLock::new(IoMock::default())); @@ -235,14 +219,8 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { io_mock.write().await.max += 1; // Load batch and mark it as processed - assert_eq!( - storage - .load_batch_eventually(L1BatchNumber(1)) - .await? - .l1_batch_env - .number, - L1BatchNumber(1) - ); + let (batch_data, _) = storage.load_batch_eventually(L1BatchNumber(1)).await?; + assert_eq!(batch_data.l1_batch_env.number, L1BatchNumber(1)); io_mock.write().await.current += 1; // No more batches after that @@ -259,15 +237,8 @@ async fn continuously_load_new_batches() -> anyhow::Result<()> { io_mock.write().await.max += 1; // Load batch and mark it as processed - - assert_eq!( - storage - .load_batch_eventually(L1BatchNumber(2)) - .await? 
-            .l1_batch_env
-            .number,
-        L1BatchNumber(2)
-    );
+    let (batch_data, _) = storage.load_batch_eventually(L1BatchNumber(2)).await?;
+    assert_eq!(batch_data.l1_batch_env.number, L1BatchNumber(2));
     io_mock.write().await.current += 1;
 
     // No more batches after that
@@ -284,11 +255,11 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> {
     insert_genesis_batch(&mut conn, &genesis_params)
         .await
         .unwrap();
-    drop(conn);
     let alice = Account::random();
     let bob = Account::random();
     let mut accounts = vec![alice, bob];
-    fund(&connection_pool, &accounts).await;
+    fund(&mut conn, &accounts).await;
+    drop(conn);
 
     // Generate 10 batches worth of data and persist it in Postgres
     let batch_range = 1..=10;
@@ -311,7 +282,6 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> {
         .await;
     drop(conn);
 
-    let (_sender, receiver) = watch::channel(false);
     let mut tester = StorageTester::new(connection_pool.clone());
     let io_mock = Arc::new(RwLock::new(IoMock {
         current: 0.into(),
@@ -321,7 +291,7 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> {
     let handle = tokio::task::spawn_blocking(move || {
         let vm_runner_storage =
             rt_handle.block_on(async { tester.create_storage(io_mock.clone()).await.unwrap() });
-        for i in 1..=10 {
+        for i in 1..=9 {
            let mut conn = rt_handle.block_on(connection_pool.connection()).unwrap();
            let (_, last_l2_block_number) = rt_handle
                .block_on(
@@ -331,11 +301,13 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> {
                .unwrap();
            let mut pg_storage =
                PostgresStorage::new(rt_handle.clone(), conn, last_l2_block_number, true);
-            let mut vm_storage = rt_handle.block_on(async {
-                vm_runner_storage
-                    .access_storage_eventually(&receiver, L1BatchNumber(i))
-                    .await
-            })?;
+            let (_, vm_storage) = rt_handle
+                .block_on(vm_runner_storage.load_batch_eventually(L1BatchNumber(i + 1)))?;
+            let mut vm_storage = match vm_storage {
+                OwnedStorage::Lending(ref storage) => rt_handle.block_on(storage.borrow()).unwrap(),
+                OwnedStorage::Static(storage) => storage,
+            };
+
             // Check that both storages have identical key-value pairs written in them
             for storage_log in &storage_logs {
                 let storage_key = StorageKey::new(
diff --git a/core/node/vm_runner/src/tests/storage_writer.rs b/core/node/vm_runner/src/tests/storage_writer.rs
new file mode 100644
index 000000000000..4c7a6e0d6612
--- /dev/null
+++ b/core/node/vm_runner/src/tests/storage_writer.rs
@@ -0,0 +1,215 @@
+use tokio::sync::watch;
+use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
+use zksync_state_keeper::MainBatchExecutor;
+
+use super::*;
+use crate::{ConcurrentOutputHandlerFactory, VmRunner};
+
+#[derive(Debug, Clone)]
+struct StorageWriterIo {
+    last_processed_batch: Arc<watch::Sender<L1BatchNumber>>,
+    pool: ConnectionPool<Core>,
+}
+
+impl StorageWriterIo {
+    fn batch(&self) -> L1BatchNumber {
+        *self.last_processed_batch.borrow()
+    }
+}
+
+#[async_trait]
+impl VmRunnerIo for StorageWriterIo {
+    fn name(&self) -> &'static str {
+        "storage_writer"
+    }
+
+    async fn latest_processed_batch(
+        &self,
+        _conn: &mut Connection<'_, Core>,
+    ) -> anyhow::Result<L1BatchNumber> {
+        Ok(self.batch())
+    }
+
+    async fn last_ready_to_be_loaded_batch(
+        &self,
+        conn: &mut Connection<'_, Core>,
+    ) -> anyhow::Result<L1BatchNumber> {
+        let sealed_batch = conn
+            .blocks_dal()
+            .get_sealed_l1_batch_number()
+            .await?
+ .expect("No L1 batches in storage"); + Ok(sealed_batch.min(self.batch() + 1)) + } + + async fn mark_l1_batch_as_processing( + &self, + _conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + assert_eq!(l1_batch_number, self.batch() + 1); + Ok(()) + } + + async fn mark_l1_batch_as_completed( + &self, + _conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + assert_eq!(l1_batch_number, self.batch()); + Ok(()) + } +} + +impl StorageWriterIo { + async fn write_storage_logs( + conn: &mut Connection<'_, Core>, + updates_manager: &UpdatesManager, + ) -> anyhow::Result<()> { + let storage_logs = updates_manager + .l2_block + .storage_logs + .iter() + .filter_map(|log| log.log.is_write().then_some(log.log)); + let storage_logs: Vec<_> = storage_logs.collect(); + conn.storage_logs_dal() + .append_storage_logs(updates_manager.l2_block.number, &storage_logs) + .await?; + Ok(()) + } +} + +#[async_trait] +impl StateKeeperOutputHandler for StorageWriterIo { + async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { + let mut conn = self.pool.connection().await?; + Self::write_storage_logs(&mut conn, updates_manager).await?; + Ok(()) + } + + async fn handle_l1_batch( + &mut self, + updates_manager: Arc, + ) -> anyhow::Result<()> { + let mut conn = self.pool.connection().await?; + // Storage logs are added to the fictive block *after* `handle_l2_block()` is called for it, so we need to call it again here. + Self::write_storage_logs(&mut conn, &updates_manager).await?; + + let finished_batch = updates_manager + .l1_batch + .finished + .as_ref() + .expect("L1 batch is not finished"); + let state_diffs = finished_batch.state_diffs.as_ref().expect("no state diffs"); + let initial_writes: Vec<_> = state_diffs + .iter() + .filter(|diff| diff.is_write_initial()) + .map(|diff| { + H256(StorageKey::raw_hashed_key( + &diff.address, + &u256_to_h256(diff.key), + )) + }) + .collect(); + conn.storage_logs_dedup_dal() + .insert_initial_writes(updates_manager.l1_batch.number, &initial_writes) + .await?; + + self.last_processed_batch + .send_replace(updates_manager.l1_batch.number); + Ok(()) + } +} + +#[async_trait] +impl OutputHandlerFactory for StorageWriterIo { + async fn create_handler( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result> { + assert_eq!(l1_batch_number, self.batch() + 1); + Ok(Box::new(self.clone())) + } +} + +/// Writes missing storage logs into Postgres by executing all transactions from it. Useful both for testing `VmRunner`, +/// and to fill the storage for multi-batch tests for other components. 
+pub(super) async fn write_storage_logs(pool: ConnectionPool) { + let mut conn = pool.connection().await.unwrap(); + let sealed_batch = conn + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .unwrap() + .expect("No L1 batches in storage"); + drop(conn); + let io = Box::new(StorageWriterIo { + last_processed_batch: Arc::new(watch::channel(L1BatchNumber(0)).0), + pool: pool.clone(), + }); + let mut processed_batch = io.last_processed_batch.subscribe(); + + let loader = Arc::new(PostgresLoader(pool.clone())); + let batch_executor = Box::new(MainBatchExecutor::new(false, false)); + let vm_runner = VmRunner::new(pool, io.clone(), loader, io, batch_executor); + let (stop_sender, stop_receiver) = watch::channel(false); + let vm_runner_handle = tokio::spawn(async move { vm_runner.run(&stop_receiver).await }); + + processed_batch + .wait_for(|&number| number >= sealed_batch) + .await + .unwrap(); + stop_sender.send_replace(true); + vm_runner_handle.await.unwrap().unwrap(); +} + +#[tokio::test] +async fn storage_writer_works() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + let genesis_params = GenesisParams::mock(); + insert_genesis_batch(&mut conn, &genesis_params) + .await + .unwrap(); + + let mut accounts = [Account::random()]; + fund(&mut conn, &accounts).await; + store_l1_batches( + &mut conn, + 1..=5, + genesis_params.base_system_contracts().hashes(), + &mut accounts, + ) + .await + .unwrap(); + drop(conn); + + write_storage_logs(pool.clone()).await; + + // Re-run the VM on all batches to check that storage logs are persisted correctly + let (stop_sender, stop_receiver) = watch::channel(false); + let io = Arc::new(RwLock::new(IoMock { + current: L1BatchNumber(0), + max: 5, + })); + let loader = Arc::new(PostgresLoader(pool.clone())); + let (output_factory, output_factory_task) = + ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), TestOutputFactory::default()); + let output_factory_handle = tokio::spawn(output_factory_task.run(stop_receiver.clone())); + let batch_executor = Box::new(MainBatchExecutor::new(false, false)); + let vm_runner = VmRunner::new( + pool, + Box::new(io.clone()), + loader, + Box::new(output_factory), + batch_executor, + ); + + let vm_runner_handle = tokio::spawn(async move { vm_runner.run(&stop_receiver).await }); + wait::for_batch_progressively(io, L1BatchNumber(5), TEST_TIMEOUT) + .await + .unwrap(); + stop_sender.send_replace(true); + output_factory_handle.await.unwrap().unwrap(); + vm_runner_handle.await.unwrap().unwrap(); +}
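Note on the loader contract introduced above (illustration only): `StorageLoader::load_batch()` now returns the batch execution data *together with* an `OwnedStorage` snapshot, so callers no longer go through `ReadStorageFactory`. Below is a minimal sketch of how a consumer is expected to use it, assuming the in-crate context of `core/node/vm_runner` and the trait/type shapes shown in this patch; `run_next_batch` is a hypothetical helper that only mirrors the `VmRunner::run()` loop changed earlier in this diff, not code from the PR.

```rust
// Hypothetical illustration; assumes it lives inside `core/node/vm_runner`,
// where `StorageLoader`, `BatchExecuteData`, and `OwnedStorage` are in scope.
use zksync_state_keeper::UpdatesManager;
use zksync_types::L1BatchNumber;

use crate::storage::StorageLoader;

/// Tries to process one batch; returns `Ok(false)` if the batch is not loadable yet.
async fn run_next_batch(
    loader: &dyn StorageLoader,
    next_batch: L1BatchNumber,
) -> anyhow::Result<bool> {
    let Some((batch_data, storage)) = loader.load_batch(next_batch).await? else {
        // Batch not available yet; the real loop sleeps for `SLEEP_INTERVAL` and retries.
        return Ok(false);
    };
    // `storage` reflects the state *before* `next_batch` (cf. `l1_batch_number - 1` in the
    // Postgres-backed loaders) and is what the batch executor is initialized with in the PR:
    // `batch_processor.init_batch(storage, batch_data.l1_batch_env, batch_data.system_env)`.
    let _updates_manager =
        UpdatesManager::new(&batch_data.l1_batch_env, &batch_data.system_env);
    drop(storage); // the sketch stops short of actually running the VM
    Ok(true)
}
```

The same "previous batch" convention explains the off-by-one in `access_vm_runner_storage()`: the storage returned for batch `i + 1` is compared against a Postgres view taken at the last L2 block of batch `i`.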