From 791fa0f80f65c56f9bc1ad8c308377f73e56cd0f Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 1 Jul 2024 18:41:56 +0100 Subject: [PATCH] refactor: Rename consensus tasks and split storage (BFT-476) (#2357) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ * Renames `FetcherTask` to `ExternalNodeTask` * Moves `run_main_node` to `mn::run_main_node` to match `en::run_external_node` * Splits `consensus::storage` into `consensus::storage::connection` and `consensus::storage::store` ## Why ❔ I'm working on https://github.com/matter-labs/zksync-era/pull/2340 where I made these changes either because the naming was confusing or the module was getting very long and I thought it would make it easier to have it in two before adding more trait implementations to it. The PR was getting huge even before I did any actual work, so I decided to make a pure refactoring PR to make the other one easier to review later. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- contracts | 2 +- core/bin/external_node/src/main.rs | 2 +- core/node/consensus/src/batch.rs | 2 +- core/node/consensus/src/en.rs | 4 +- core/node/consensus/src/era.rs | 6 +- core/node/consensus/src/lib.rs | 65 +- core/node/consensus/src/mn.rs | 72 ++ core/node/consensus/src/storage/connection.rs | 255 +++++++ core/node/consensus/src/storage/mod.rs | 635 +----------------- core/node/consensus/src/storage/store.rs | 381 +++++++++++ core/node/consensus/src/testonly.rs | 3 +- core/node/consensus/src/tests.rs | 5 +- .../src/implementations/layers/consensus.rs | 14 +- 13 files changed, 743 insertions(+), 703 deletions(-) create mode 100644 core/node/consensus/src/mn.rs create mode 100644 core/node/consensus/src/storage/connection.rs create mode 100644 core/node/consensus/src/storage/store.rs diff --git a/contracts b/contracts index 8172969672cc..db9387690502 160000 --- a/contracts +++ b/contracts @@ -1 +1 @@ -Subproject commit 8172969672cc6a38542cd8f5578c74b7e30cd3b4 +Subproject commit db9387690502937de081a959b164db5a5262ce0a diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index bb19b5670aac..e3ee987a6e62 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -286,7 +286,7 @@ async fn run_core( // but we only need to wait for stop signal once, and it will be propagated to all child contexts. let ctx = ctx::root(); scope::run!(&ctx, |ctx, s| async move { - s.spawn_bg(consensus::era::run_en( + s.spawn_bg(consensus::era::run_external_node( ctx, cfg, pool, diff --git a/core/node/consensus/src/batch.rs b/core/node/consensus/src/batch.rs index d393a845ec6d..08246c4e5c04 100644 --- a/core/node/consensus/src/batch.rs +++ b/core/node/consensus/src/batch.rs @@ -14,7 +14,7 @@ use zksync_types::{ }; use zksync_utils::{h256_to_u256, u256_to_h256}; -use crate::ConnectionPool; +use crate::storage::ConnectionPool; /// Commitment to the last block of a batch. pub(crate) struct LastBlockCommit { diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 3a3263d41b70..66326756fb77 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -9,8 +9,8 @@ use zksync_node_sync::{ use zksync_types::L2BlockNumber; use zksync_web3_decl::client::{DynClient, L2}; -use super::{config, storage::Store, ConnectionPool, ConsensusConfig, ConsensusSecrets}; -use crate::storage; +use super::{config, storage::Store, ConsensusConfig, ConsensusSecrets}; +use crate::storage::{self, ConnectionPool}; /// External node. pub(super) struct EN { diff --git a/core/node/consensus/src/era.rs b/core/node/consensus/src/era.rs index 0e73c29f7741..6d69432d8e1b 100644 --- a/core/node/consensus/src/era.rs +++ b/core/node/consensus/src/era.rs @@ -10,7 +10,7 @@ use zksync_dal::Core; use zksync_node_sync::{sync_action::ActionQueueSender, SyncState}; use zksync_web3_decl::client::{DynClient, L2}; -use super::{en, storage::ConnectionPool}; +use super::{en, mn, storage::ConnectionPool}; /// Runs the consensus task in the main node mode. pub async fn run_main_node( @@ -22,7 +22,7 @@ pub async fn run_main_node( // Consensus is a new component. // For now in case of error we just log it and allow the server // to continue running. - if let Err(err) = super::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await { + if let Err(err) = mn::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await { tracing::error!("Consensus actor failed: {err:#}"); } else { tracing::info!("Consensus actor stopped"); @@ -33,7 +33,7 @@ pub async fn run_main_node( /// Runs the consensus node for the external node. /// If `cfg` is `None`, it will just fetch blocks from the main node /// using JSON RPC, without starting the consensus node. -pub async fn run_en( +pub async fn run_external_node( ctx: &ctx::Ctx, cfg: Option<(ConsensusConfig, ConsensusSecrets)>, pool: zksync_dal::ConnectionPool, diff --git a/core/node/consensus/src/lib.rs b/core/node/consensus/src/lib.rs index 82604d6f817c..13d918b5b6ee 100644 --- a/core/node/consensus/src/lib.rs +++ b/core/node/consensus/src/lib.rs @@ -2,14 +2,8 @@ #![allow(clippy::redundant_locals)] #![allow(clippy::needless_pass_by_ref_mut)] -use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, scope}; -use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; -use zksync_consensus_executor as executor; -use zksync_consensus_roles::validator; -use zksync_consensus_storage::{BatchStore, BlockStore}; -use crate::storage::{ConnectionPool, Store}; +use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; // Currently `batch` module is only used in tests, // but will be used in production once batch syncing is implemented in consensus. @@ -18,64 +12,9 @@ mod batch; mod config; mod en; pub mod era; +mod mn; mod storage; #[cfg(test)] pub(crate) mod testonly; #[cfg(test)] mod tests; - -/// Task running a consensus validator for the main node. -/// Main node is currently the only leader of the consensus - i.e. it proposes all the -/// L2 blocks (generated by `Statekeeper`). -async fn run_main_node( - ctx: &ctx::Ctx, - cfg: ConsensusConfig, - secrets: ConsensusSecrets, - pool: ConnectionPool, -) -> anyhow::Result<()> { - let validator_key = config::validator_key(&secrets) - .context("validator_key")? - .context("missing validator_key")?; - scope::run!(&ctx, |ctx, s| async { - if let Some(spec) = &cfg.genesis_spec { - let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; - pool.connection(ctx) - .await - .wrap("connection()")? - .adjust_genesis(ctx, &spec) - .await - .wrap("adjust_genesis()")?; - } - let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; - s.spawn_bg(async { runner.run(ctx).await.context("Store::runner()") }); - let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) - .await - .wrap("BlockStore::new()")?; - s.spawn_bg(async { runner.run(ctx).await.context("BlockStore::runner()") }); - anyhow::ensure!( - block_store.genesis().leader_selection - == validator::LeaderSelectionMode::Sticky(validator_key.public()), - "unsupported leader selection mode - main node has to be the leader" - ); - - // Dummy batch store - we don't gossip batches yet, but we need one anyway. - let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) - .await - .wrap("BatchStore::new()")?; - s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") }); - - let executor = executor::Executor { - config: config::executor(&cfg, &secrets)?, - block_store, - batch_store, - attester: None, - validator: Some(executor::Validator { - key: validator_key, - replica_store: Box::new(store.clone()), - payload_manager: Box::new(store.clone()), - }), - }; - executor.run(ctx).await.context("executor.run()") - }) - .await -} diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs new file mode 100644 index 000000000000..0aac43b8ef87 --- /dev/null +++ b/core/node/consensus/src/mn.rs @@ -0,0 +1,72 @@ +use anyhow::Context as _; +use zksync_concurrency::{ctx, error::Wrap as _, scope}; +use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; +use zksync_consensus_executor::{self as executor}; +use zksync_consensus_roles::validator; +use zksync_consensus_storage::{BatchStore, BlockStore}; + +use crate::{ + config, + storage::{ConnectionPool, Store}, +}; + +/// Task running a consensus validator for the main node. +/// Main node is currently the only leader of the consensus - i.e. it proposes all the +/// L2 blocks (generated by `Statekeeper`). +pub async fn run_main_node( + ctx: &ctx::Ctx, + cfg: ConsensusConfig, + secrets: ConsensusSecrets, + pool: ConnectionPool, +) -> anyhow::Result<()> { + let validator_key = config::validator_key(&secrets) + .context("validator_key")? + .context("missing validator_key")?; + + scope::run!(&ctx, |ctx, s| async { + if let Some(spec) = &cfg.genesis_spec { + let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; + + pool.connection(ctx) + .await + .wrap("connection()")? + .adjust_genesis(ctx, &spec) + .await + .wrap("adjust_genesis()")?; + } + + let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; + s.spawn_bg(runner.run(ctx)); + + let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) + .await + .wrap("BlockStore::new()")?; + s.spawn_bg(runner.run(ctx)); + + anyhow::ensure!( + block_store.genesis().leader_selection + == validator::LeaderSelectionMode::Sticky(validator_key.public()), + "unsupported leader selection mode - main node has to be the leader" + ); + + // Dummy batch store - we don't gossip batches yet, but we need one anyway. + let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) + .await + .wrap("BatchStore::new()")?; + s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") }); + + let executor = executor::Executor { + config: config::executor(&cfg, &secrets)?, + block_store, + batch_store, + attester: None, + validator: Some(executor::Validator { + key: validator_key, + replica_store: Box::new(store.clone()), + payload_manager: Box::new(store.clone()), + }), + }; + executor.run(ctx).await + }) + .await +} diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs new file mode 100644 index 000000000000..673cb87d2f4e --- /dev/null +++ b/core/node/consensus/src/storage/connection.rs @@ -0,0 +1,255 @@ +use anyhow::Context as _; +use zksync_concurrency::{ctx, error::Wrap as _, time}; +use zksync_consensus_roles::validator; +use zksync_consensus_storage as storage; +use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; +use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState}; +use zksync_state_keeper::io::common::IoCursor; +use zksync_types::{commitment::L1BatchWithMetadata, L1BatchNumber}; + +use super::{InsertCertificateError, PayloadQueue}; +use crate::config; + +/// Context-aware `zksync_dal::ConnectionPool` wrapper. +#[derive(Debug, Clone)] +pub(crate) struct ConnectionPool(pub(crate) zksync_dal::ConnectionPool); + +impl ConnectionPool { + /// Wrapper for `connection_tagged()`. + pub(crate) async fn connection<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result> { + Ok(Connection( + ctx.wait(self.0.connection_tagged("consensus")) + .await? + .map_err(DalError::generalize)?, + )) + } + + /// Waits for the `number` L2 block. + pub async fn wait_for_payload( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + if let Some(payload) = self + .connection(ctx) + .await + .wrap("connection()")? + .payload(ctx, number) + .await + .with_wrap(|| format!("payload({number})"))? + { + return Ok(payload); + } + ctx.sleep(POLL_INTERVAL).await?; + } + } +} + +/// Context-aware `zksync_dal::Connection` wrapper. +pub(crate) struct Connection<'a>(pub(crate) zksync_dal::Connection<'a, Core>); + +impl<'a> Connection<'a> { + /// Wrapper for `start_transaction()`. + pub async fn start_transaction<'b, 'c: 'b>( + &'c mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(Connection( + ctx.wait(self.0.start_transaction()) + .await? + .context("sqlx")?, + )) + } + + /// Wrapper for `commit()`. + pub async fn commit(self, ctx: &ctx::Ctx) -> ctx::Result<()> { + Ok(ctx.wait(self.0.commit()).await?.context("sqlx")?) + } + + /// Wrapper for `consensus_dal().block_payload()`. + pub async fn payload( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().block_payload(number)) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().block_payloads()`. + pub async fn payloads( + &mut self, + ctx: &ctx::Ctx, + numbers: std::ops::Range, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().block_payloads(numbers)) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().certificate()`. + pub async fn certificate( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().certificate(number)) + .await??) + } + + /// Wrapper for `consensus_dal().insert_certificate()`. + pub async fn insert_certificate( + &mut self, + ctx: &ctx::Ctx, + cert: &validator::CommitQC, + ) -> Result<(), InsertCertificateError> { + Ok(ctx + .wait(self.0.consensus_dal().insert_certificate(cert)) + .await??) + } + + /// Wrapper for `consensus_dal().replica_state()`. + pub async fn replica_state(&mut self, ctx: &ctx::Ctx) -> ctx::Result { + Ok(ctx + .wait(self.0.consensus_dal().replica_state()) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().set_replica_state()`. + pub async fn set_replica_state( + &mut self, + ctx: &ctx::Ctx, + state: &storage::ReplicaState, + ) -> ctx::Result<()> { + Ok(ctx + .wait(self.0.consensus_dal().set_replica_state(state)) + .await? + .context("sqlx")?) + } + + /// Wrapper for `consensus_dal().get_l1_batch_metadata()`. + pub async fn batch( + &mut self, + ctx: &ctx::Ctx, + number: L1BatchNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.blocks_dal().get_l1_batch_metadata(number)) + .await? + .context("get_l1_batch_metadata()")?) + } + + /// Wrapper for `FetcherCursor::new()`. + pub async fn new_payload_queue( + &mut self, + ctx: &ctx::Ctx, + actions: ActionQueueSender, + sync_state: SyncState, + ) -> ctx::Result { + Ok(PayloadQueue { + inner: ctx.wait(IoCursor::for_fetcher(&mut self.0)).await??, + actions, + sync_state, + }) + } + + /// Wrapper for `consensus_dal().genesis()`. + pub async fn genesis(&mut self, ctx: &ctx::Ctx) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().genesis()) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().try_update_genesis()`. + pub async fn try_update_genesis( + &mut self, + ctx: &ctx::Ctx, + genesis: &validator::Genesis, + ) -> ctx::Result<()> { + Ok(ctx + .wait(self.0.consensus_dal().try_update_genesis(genesis)) + .await??) + } + + /// Wrapper for `consensus_dal().next_block()`. + async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result { + Ok(ctx.wait(self.0.consensus_dal().next_block()).await??) + } + + /// Wrapper for `consensus_dal().certificates_range()`. + pub(crate) async fn certificates_range( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result { + Ok(ctx + .wait(self.0.consensus_dal().certificates_range()) + .await??) + } + + /// (Re)initializes consensus genesis to start at the last L2 block in storage. + /// Noop if `spec` matches the current genesis. + pub(crate) async fn adjust_genesis( + &mut self, + ctx: &ctx::Ctx, + spec: &config::GenesisSpec, + ) -> ctx::Result<()> { + let mut txn = self + .start_transaction(ctx) + .await + .wrap("start_transaction()")?; + let old = txn.genesis(ctx).await.wrap("genesis()")?; + if let Some(old) = &old { + if &config::GenesisSpec::from_genesis(old) == spec { + // Hard fork is not needed. + return Ok(()); + } + } + tracing::info!("Performing a hard fork of consensus."); + let genesis = validator::GenesisRaw { + chain_id: spec.chain_id, + fork_number: old + .as_ref() + .map_or(validator::ForkNumber(0), |old| old.fork_number.next()), + first_block: txn.next_block(ctx).await.context("next_block()")?, + + protocol_version: spec.protocol_version, + validators: spec.validators.clone(), + attesters: None, + leader_selection: spec.leader_selection.clone(), + } + .with_hash(); + txn.try_update_genesis(ctx, &genesis) + .await + .wrap("try_update_genesis()")?; + txn.commit(ctx).await.wrap("commit()")?; + Ok(()) + } + + /// Fetches a block from storage. + pub(crate) async fn block( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { + let Some(justification) = self.certificate(ctx, number).await.wrap("certificate()")? else { + return Ok(None); + }; + let payload = self + .payload(ctx, number) + .await + .wrap("payload()")? + .context("L2 block disappeared from storage")?; + Ok(Some(validator::FinalBlock { + payload: payload.encode(), + justification, + })) + } +} diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index 894c0c1c05e7..58238f4b601b 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -1,32 +1,24 @@ //! Storage implementation based on DAL. -use std::sync::Arc; -use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; -use zksync_consensus_bft::PayloadManager; -use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage as storage; -use zksync_dal::{ - consensus_dal::{self, Payload}, - Core, CoreDal, DalError, -}; +use zksync_concurrency::ctx; +use zksync_consensus_roles::validator; +use zksync_dal::consensus_dal; use zksync_node_sync::{ - fetcher::{FetchedBlock, FetchedTransaction, IoCursorExt as _}, + fetcher::{FetchedBlock, IoCursorExt as _}, sync_action::ActionQueueSender, SyncState, }; use zksync_state_keeper::io::common::IoCursor; -use zksync_types::{commitment::L1BatchWithMetadata, L1BatchNumber, L2BlockNumber}; -use super::config; +mod connection; +mod store; + +pub(crate) use connection::*; +pub(crate) use store::*; #[cfg(test)] pub(crate) mod testonly; -/// Context-aware `zksync_dal::ConnectionPool` wrapper. -#[derive(Debug, Clone)] -pub(super) struct ConnectionPool(pub(super) zksync_dal::ConnectionPool); - #[derive(thiserror::Error, Debug)] pub enum InsertCertificateError { #[error(transparent)] @@ -35,255 +27,15 @@ pub enum InsertCertificateError { Inner(#[from] consensus_dal::InsertCertificateError), } -impl ConnectionPool { - /// Wrapper for `connection_tagged()`. - pub(super) async fn connection<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result> { - Ok(Connection( - ctx.wait(self.0.connection_tagged("consensus")) - .await? - .map_err(DalError::generalize)?, - )) - } - - /// Waits for the `number` L2 block. - pub async fn wait_for_payload( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result { - const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); - loop { - if let Some(payload) = self - .connection(ctx) - .await - .wrap("connection()")? - .payload(ctx, number) - .await - .with_wrap(|| format!("payload({number})"))? - { - return Ok(payload); - } - ctx.sleep(POLL_INTERVAL).await?; - } - } -} - -/// Context-aware `zksync_dal::Connection` wrapper. -pub(super) struct Connection<'a>(pub(super) zksync_dal::Connection<'a, Core>); - -impl<'a> Connection<'a> { - /// Wrapper for `start_transaction()`. - pub async fn start_transaction<'b, 'c: 'b>( - &'c mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(Connection( - ctx.wait(self.0.start_transaction()) - .await? - .context("sqlx")?, - )) - } - - /// Wrapper for `commit()`. - pub async fn commit(self, ctx: &ctx::Ctx) -> ctx::Result<()> { - Ok(ctx.wait(self.0.commit()).await?.context("sqlx")?) - } - - /// Wrapper for `consensus_dal().block_payload()`. - pub async fn payload( - &mut self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().block_payload(number)) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().block_payloads()`. - pub async fn payloads( - &mut self, - ctx: &ctx::Ctx, - numbers: std::ops::Range, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().block_payloads(numbers)) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().certificate()`. - pub async fn certificate( - &mut self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().certificate(number)) - .await??) - } - - /// Wrapper for `consensus_dal().insert_certificate()`. - pub async fn insert_certificate( - &mut self, - ctx: &ctx::Ctx, - cert: &validator::CommitQC, - ) -> Result<(), InsertCertificateError> { - Ok(ctx - .wait(self.0.consensus_dal().insert_certificate(cert)) - .await??) - } - - /// Wrapper for `consensus_dal().replica_state()`. - pub async fn replica_state(&mut self, ctx: &ctx::Ctx) -> ctx::Result { - Ok(ctx - .wait(self.0.consensus_dal().replica_state()) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().set_replica_state()`. - pub async fn set_replica_state( - &mut self, - ctx: &ctx::Ctx, - state: &storage::ReplicaState, - ) -> ctx::Result<()> { - Ok(ctx - .wait(self.0.consensus_dal().set_replica_state(state)) - .await? - .context("sqlx")?) - } - - /// Wrapper for `consensus_dal().get_l1_batch_metadata()`. - pub async fn batch( - &mut self, - ctx: &ctx::Ctx, - number: L1BatchNumber, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.blocks_dal().get_l1_batch_metadata(number)) - .await? - .context("get_l1_batch_metadata()")?) - } - - /// Wrapper for `FetcherCursor::new()`. - pub async fn new_payload_queue( - &mut self, - ctx: &ctx::Ctx, - actions: ActionQueueSender, - sync_state: SyncState, - ) -> ctx::Result { - Ok(PayloadQueue { - inner: ctx.wait(IoCursor::for_fetcher(&mut self.0)).await??, - actions, - sync_state, - }) - } - - /// Wrapper for `consensus_dal().genesis()`. - pub async fn genesis(&mut self, ctx: &ctx::Ctx) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().genesis()) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().try_update_genesis()`. - pub async fn try_update_genesis( - &mut self, - ctx: &ctx::Ctx, - genesis: &validator::Genesis, - ) -> ctx::Result<()> { - Ok(ctx - .wait(self.0.consensus_dal().try_update_genesis(genesis)) - .await??) - } - - /// Wrapper for `consensus_dal().next_block()`. - async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result { - Ok(ctx.wait(self.0.consensus_dal().next_block()).await??) - } - - /// Wrapper for `consensus_dal().certificates_range()`. - async fn certificates_range( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result { - Ok(ctx - .wait(self.0.consensus_dal().certificates_range()) - .await??) - } - - /// (Re)initializes consensus genesis to start at the last L2 block in storage. - /// Noop if `spec` matches the current genesis. - pub(super) async fn adjust_genesis( - &mut self, - ctx: &ctx::Ctx, - spec: &config::GenesisSpec, - ) -> ctx::Result<()> { - let mut txn = self - .start_transaction(ctx) - .await - .wrap("start_transaction()")?; - let old = txn.genesis(ctx).await.wrap("genesis()")?; - if let Some(old) = &old { - if &config::GenesisSpec::from_genesis(old) == spec { - // Hard fork is not needed. - return Ok(()); - } - } - tracing::info!("Performing a hard fork of consensus."); - let genesis = validator::GenesisRaw { - chain_id: spec.chain_id, - fork_number: old - .as_ref() - .map_or(validator::ForkNumber(0), |old| old.fork_number.next()), - first_block: txn.next_block(ctx).await.context("next_block()")?, - - protocol_version: spec.protocol_version, - validators: spec.validators.clone(), - attesters: None, - leader_selection: spec.leader_selection.clone(), - } - .with_hash(); - txn.try_update_genesis(ctx, &genesis) - .await - .wrap("try_update_genesis()")?; - txn.commit(ctx).await.wrap("commit()")?; - Ok(()) - } - - /// Fetches a block from storage. - pub(super) async fn block( - &mut self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result> { - let Some(justification) = self.certificate(ctx, number).await.wrap("certificate()")? else { - return Ok(None); - }; - let payload = self - .payload(ctx, number) - .await - .wrap("payload()")? - .context("L2 block disappeared from storage")?; - Ok(Some(validator::FinalBlock { - payload: payload.encode(), - justification, - })) - } -} - #[derive(Debug)] -pub(super) struct PayloadQueue { +pub(crate) struct PayloadQueue { inner: IoCursor, actions: ActionQueueSender, sync_state: SyncState, } impl PayloadQueue { - pub(super) fn next(&self) -> validator::BlockNumber { + pub(crate) fn next(&self) -> validator::BlockNumber { validator::BlockNumber(self.inner.next_l2_block.0.into()) } @@ -291,7 +43,7 @@ impl PayloadQueue { /// to the actions queue. /// Does nothing and returns Ok() if the block has been already processed. /// Returns an error if a block with an earlier block number was expected. - pub(super) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> { + pub(crate) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> { let want = self.inner.next_l2_block; // Some blocks are missing. if block.number > want { @@ -305,366 +57,3 @@ impl PayloadQueue { Ok(()) } } - -fn to_fetched_block( - number: validator::BlockNumber, - payload: &validator::Payload, -) -> anyhow::Result { - let number = L2BlockNumber( - number - .0 - .try_into() - .context("Integer overflow converting block number")?, - ); - let payload = Payload::decode(payload).context("Payload::decode()")?; - Ok(FetchedBlock { - number, - l1_batch_number: payload.l1_batch_number, - last_in_batch: payload.last_in_batch, - protocol_version: payload.protocol_version, - timestamp: payload.timestamp, - reference_hash: Some(payload.hash), - l1_gas_price: payload.l1_gas_price, - l2_fair_gas_price: payload.l2_fair_gas_price, - fair_pubdata_price: payload.fair_pubdata_price, - virtual_blocks: payload.virtual_blocks, - operator_address: payload.operator_address, - transactions: payload - .transactions - .into_iter() - .map(FetchedTransaction::new) - .collect(), - }) -} - -/// Wrapper of `ConnectionPool` implementing `ReplicaStore`, `PayloadManager` and -/// `PersistentBlockStore`. -#[derive(Clone, Debug)] -pub(super) struct Store { - pub(super) pool: ConnectionPool, - payloads: Arc>>, - certificates: ctx::channel::UnboundedSender, - persisted: sync::watch::Receiver, -} - -struct PersistedState(sync::watch::Sender); - -/// Background task of the `Store`. -pub struct StoreRunner { - pool: ConnectionPool, - persisted: PersistedState, - certificates: ctx::channel::UnboundedReceiver, -} - -impl Store { - pub(super) async fn new( - ctx: &ctx::Ctx, - pool: ConnectionPool, - payload_queue: Option, - ) -> ctx::Result<(Store, StoreRunner)> { - let persisted = pool - .connection(ctx) - .await - .wrap("connection()")? - .certificates_range(ctx) - .await - .wrap("certificates_range()")?; - let persisted = sync::watch::channel(persisted).0; - let (certs_send, certs_recv) = ctx::channel::unbounded(); - Ok(( - Store { - pool: pool.clone(), - certificates: certs_send, - payloads: Arc::new(sync::Mutex::new(payload_queue)), - persisted: persisted.subscribe(), - }, - StoreRunner { - pool, - persisted: PersistedState(persisted), - certificates: certs_recv, - }, - )) - } -} - -impl PersistedState { - /// Updates `persisted` to new. - /// Ends of the range can only be moved forward. - /// If `persisted.first` is moved forward, it means that blocks have been pruned. - /// If `persisted.last` is moved forward, it means that new blocks with certificates have been - /// persisted. - fn update(&self, new: storage::BlockStoreState) { - self.0.send_if_modified(|p| { - if &new == p { - return false; - } - p.first = p.first.max(new.first); - if p.next() < new.next() { - p.last = new.last; - } - true - }); - } - - /// Checks if the given certificate is exactly the next one that should - /// be persisted. - fn should_be_persisted(&self, cert: &validator::CommitQC) -> bool { - self.0.borrow().next() == cert.header().number - } - - /// Appends the `cert` to `persisted` range. - fn advance(&self, cert: validator::CommitQC) { - self.0.send_if_modified(|p| { - if p.next() != cert.header().number { - return false; - } - p.last = Some(cert); - true - }); - } -} - -impl StoreRunner { - pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let res = scope::run!(ctx, |ctx, s| async { - s.spawn::<()>(async { - // Loop updating `persisted` whenever blocks get pruned. - const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); - loop { - let range = self - .pool - .connection(ctx) - .await - .wrap("connection")? - .certificates_range(ctx) - .await - .wrap("certificates_range()")?; - self.persisted.update(range); - ctx.sleep(POLL_INTERVAL).await?; - } - }); - - // Loop inserting certs to storage. - const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); - loop { - let cert = self.certificates.recv(ctx).await?; - // Wait for the block to be persisted, so that we can attach a cert to it. - // We may exit this loop without persisting the certificate in case the - // corresponding block has been pruned in the meantime. - while self.persisted.should_be_persisted(&cert) { - use consensus_dal::InsertCertificateError as E; - // Try to insert the cert. - let res = self - .pool - .connection(ctx) - .await - .wrap("connection")? - .insert_certificate(ctx, &cert) - .await; - match res { - Ok(()) => { - // Insertion succeeded: update persisted state - // and wait for the next cert. - self.persisted.advance(cert); - break; - } - Err(InsertCertificateError::Inner(E::MissingPayload)) => { - // the payload is not in storage, it's either not yet persisted - // or already pruned. We will retry after a delay. - ctx.sleep(POLL_INTERVAL).await?; - } - Err(InsertCertificateError::Canceled(err)) => { - return Err(ctx::Error::Canceled(err)) - } - Err(InsertCertificateError::Inner(err)) => { - return Err(ctx::Error::Internal(anyhow::Error::from(err))) - } - } - } - } - }) - .await; - match res { - Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), - Err(ctx::Error::Internal(err)) => Err(err), - } - } -} - -#[async_trait::async_trait] -impl storage::PersistentBlockStore for Store { - async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result { - Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? - .genesis(ctx) - .await? - .context("not found")?) - } - - fn persisted(&self) -> sync::watch::Receiver { - self.persisted.clone() - } - - async fn block( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result { - Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? - .block(ctx, number) - .await? - .context("not found")?) - } - - /// If actions queue is set (and the block has not been stored yet), - /// the block will be translated into a sequence of actions. - /// The received actions should be fed - /// to `ExternalIO`, so that `StateKeeper` will store the corresponding L2 block in the db. - /// - /// `store_next_block()` call will wait synchronously for the L2 block. - /// Once the L2 block is observed in storage, `store_next_block()` will store a cert for this - /// L2 block. - async fn queue_next_block( - &self, - ctx: &ctx::Ctx, - block: validator::FinalBlock, - ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); - if let Some(payloads) = &mut *payloads { - payloads - .send(to_fetched_block(block.number(), &block.payload).context("to_fetched_block")?) - .await - .context("payload_queue.send()")?; - } - self.certificates.send(block.justification); - Ok(()) - } -} - -#[async_trait::async_trait] -impl storage::ReplicaStore for Store { - async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result { - self.pool - .connection(ctx) - .await - .wrap("connection()")? - .replica_state(ctx) - .await - .wrap("replica_state()") - } - - async fn set_state(&self, ctx: &ctx::Ctx, state: &storage::ReplicaState) -> ctx::Result<()> { - self.pool - .connection(ctx) - .await - .wrap("connection()")? - .set_replica_state(ctx, state) - .await - .wrap("set_replica_state()") - } -} - -#[async_trait::async_trait] -impl PayloadManager for Store { - /// Currently (for the main node) proposing is implemented as just converting an L2 block from db (without a cert) into a payload. - async fn propose( - &self, - ctx: &ctx::Ctx, - block_number: validator::BlockNumber, - ) -> ctx::Result { - const LARGE_PAYLOAD_SIZE: usize = 1 << 20; - let payload = self - .pool - .wait_for_payload(ctx, block_number) - .await - .wrap("wait_for_payload")?; - let encoded_payload = payload.encode(); - if encoded_payload.0.len() > LARGE_PAYLOAD_SIZE { - tracing::warn!( - "large payload ({}B) with {} transactions", - encoded_payload.0.len(), - payload.transactions.len() - ); - } - Ok(encoded_payload) - } - - /// Verify that `payload` is a correct proposal for the block `block_number`. - /// * for the main node it checks whether the same block is already present in storage. - /// * for the EN validator - /// * if the block with this number was already applied, it checks that it was the - /// same block. It should always be true, because main node is the only proposer and - /// to propose a different block a hard fork is needed. - /// * otherwise, EN attempts to apply the received block. If the block was incorrect - /// the statekeeper is expected to crash the whole EN. Otherwise OK is returned. - async fn verify( - &self, - ctx: &ctx::Ctx, - block_number: validator::BlockNumber, - payload: &validator::Payload, - ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); - if let Some(payloads) = &mut *payloads { - let block = to_fetched_block(block_number, payload).context("to_fetched_block")?; - let n = block.number; - payloads.send(block).await.context("payload_queue.send()")?; - // Wait for the block to be processed, without waiting for it to be stored. - // TODO(BFT-459): this is not ideal, because we don't check here whether the - // processed block is the same as `payload`. It will work correctly - // with the current implementation of EN, but we should make it more - // precise when block reverting support is implemented. - ctx.wait(payloads.sync_state.wait_for_local_block(n)) - .await?; - } else { - let want = self.pool.wait_for_payload(ctx, block_number).await?; - let got = Payload::decode(payload).context("Payload::decode(got)")?; - if got != want { - return Err( - anyhow::format_err!("unexpected payload: got {got:?} want {want:?}").into(), - ); - } - } - Ok(()) - } -} - -// Dummy implementation -#[async_trait::async_trait] -impl storage::PersistentBatchStore for Store { - async fn last_batch(&self) -> attester::BatchNumber { - unimplemented!() - } - async fn last_batch_qc(&self) -> attester::BatchQC { - unimplemented!() - } - async fn get_batch(&self, _number: attester::BatchNumber) -> Option { - None - } - async fn get_batch_qc(&self, _number: attester::BatchNumber) -> Option { - None - } - async fn store_qc(&self, _qc: attester::BatchQC) { - unimplemented!() - } - fn persisted(&self) -> sync::watch::Receiver { - sync::watch::channel(storage::BatchStoreState { - first: attester::BatchNumber(0), - last: None, - }) - .1 - } - async fn queue_next_batch( - &self, - _ctx: &ctx::Ctx, - _batch: attester::SyncBatch, - ) -> ctx::Result<()> { - Err(anyhow::format_err!("unimplemented").into()) - } -} diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs new file mode 100644 index 000000000000..fa6309bc2ef7 --- /dev/null +++ b/core/node/consensus/src/storage/store.rs @@ -0,0 +1,381 @@ +use std::sync::Arc; + +use anyhow::Context as _; +use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; +use zksync_consensus_bft::PayloadManager; +use zksync_consensus_roles::{attester, validator}; +use zksync_consensus_storage as storage; +use zksync_dal::consensus_dal::{self, Payload}; +use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction}; +use zksync_types::L2BlockNumber; + +use super::PayloadQueue; +use crate::storage::{ConnectionPool, InsertCertificateError}; + +fn to_fetched_block( + number: validator::BlockNumber, + payload: &validator::Payload, +) -> anyhow::Result { + let number = L2BlockNumber( + number + .0 + .try_into() + .context("Integer overflow converting block number")?, + ); + let payload = Payload::decode(payload).context("Payload::decode()")?; + Ok(FetchedBlock { + number, + l1_batch_number: payload.l1_batch_number, + last_in_batch: payload.last_in_batch, + protocol_version: payload.protocol_version, + timestamp: payload.timestamp, + reference_hash: Some(payload.hash), + l1_gas_price: payload.l1_gas_price, + l2_fair_gas_price: payload.l2_fair_gas_price, + fair_pubdata_price: payload.fair_pubdata_price, + virtual_blocks: payload.virtual_blocks, + operator_address: payload.operator_address, + transactions: payload + .transactions + .into_iter() + .map(FetchedTransaction::new) + .collect(), + }) +} + +/// Wrapper of `ConnectionPool` implementing `ReplicaStore`, `PayloadManager`, +/// `PersistentBlockStore` and `PersistentBatchStore`. +/// +/// Contains queues to save Quorum Certificates received over gossip to the store +/// as and when the payload they are over becomes available. +#[derive(Clone, Debug)] +pub(crate) struct Store { + pub(super) pool: ConnectionPool, + payloads: Arc>>, + /// L2 block QCs received over gossip + certificates: ctx::channel::UnboundedSender, + /// Range of L2 blocks for which we have a QC persisted. + persisted: sync::watch::Receiver, +} + +struct PersistedState(sync::watch::Sender); + +/// Background task of the `Store`. +pub struct StoreRunner { + pool: ConnectionPool, + persisted: PersistedState, + certificates: ctx::channel::UnboundedReceiver, +} + +impl Store { + pub(crate) async fn new( + ctx: &ctx::Ctx, + pool: ConnectionPool, + payload_queue: Option, + ) -> ctx::Result<(Store, StoreRunner)> { + let persisted = pool + .connection(ctx) + .await + .wrap("connection()")? + .certificates_range(ctx) + .await + .wrap("certificates_range()")?; + let persisted = sync::watch::channel(persisted).0; + let (certs_send, certs_recv) = ctx::channel::unbounded(); + Ok(( + Store { + pool: pool.clone(), + certificates: certs_send, + payloads: Arc::new(sync::Mutex::new(payload_queue)), + persisted: persisted.subscribe(), + }, + StoreRunner { + pool, + persisted: PersistedState(persisted), + certificates: certs_recv, + }, + )) + } +} + +impl PersistedState { + /// Updates `persisted` to new. + /// Ends of the range can only be moved forward. + /// If `persisted.first` is moved forward, it means that blocks have been pruned. + /// If `persisted.last` is moved forward, it means that new blocks with certificates have been + /// persisted. + fn update(&self, new: storage::BlockStoreState) { + self.0.send_if_modified(|p| { + if &new == p { + return false; + } + p.first = p.first.max(new.first); + if p.next() < new.next() { + p.last = new.last; + } + true + }); + } + + /// Checks if the given certificate is exactly the next one that should + /// be persisted. + fn should_be_persisted(&self, cert: &validator::CommitQC) -> bool { + self.0.borrow().next() == cert.header().number + } + + /// Appends the `cert` to `persisted` range. + fn advance(&self, cert: validator::CommitQC) { + self.0.send_if_modified(|p| { + if p.next() != cert.header().number { + return false; + } + p.last = Some(cert); + true + }); + } +} + +impl StoreRunner { + pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let res = scope::run!(ctx, |ctx, s| async { + s.spawn::<()>(async { + // Loop updating `persisted` whenever blocks get pruned. + const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); + loop { + let range = self + .pool + .connection(ctx) + .await + .wrap("connection")? + .certificates_range(ctx) + .await + .wrap("certificates_range()")?; + self.persisted.update(range); + ctx.sleep(POLL_INTERVAL).await?; + } + }); + + // Loop inserting certs to storage. + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + let cert = self.certificates.recv(ctx).await?; + // Wait for the block to be persisted, so that we can attach a cert to it. + // We may exit this loop without persisting the certificate in case the + // corresponding block has been pruned in the meantime. + while self.persisted.should_be_persisted(&cert) { + use consensus_dal::InsertCertificateError as E; + // Try to insert the cert. + let res = self + .pool + .connection(ctx) + .await + .wrap("connection")? + .insert_certificate(ctx, &cert) + .await; + match res { + Ok(()) => { + // Insertion succeeded: update persisted state + // and wait for the next cert. + self.persisted.advance(cert); + break; + } + Err(InsertCertificateError::Inner(E::MissingPayload)) => { + // the payload is not in storage, it's either not yet persisted + // or already pruned. We will retry after a delay. + ctx.sleep(POLL_INTERVAL).await?; + } + Err(InsertCertificateError::Canceled(err)) => { + return Err(ctx::Error::Canceled(err)) + } + Err(InsertCertificateError::Inner(err)) => { + return Err(ctx::Error::Internal(anyhow::Error::from(err))) + } + } + } + } + }) + .await; + match res { + Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } + } +} + +#[async_trait::async_trait] +impl storage::PersistentBlockStore for Store { + async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result { + Ok(self + .pool + .connection(ctx) + .await + .wrap("connection")? + .genesis(ctx) + .await? + .context("not found")?) + } + + fn persisted(&self) -> sync::watch::Receiver { + self.persisted.clone() + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result { + Ok(self + .pool + .connection(ctx) + .await + .wrap("connection")? + .block(ctx, number) + .await? + .context("not found")?) + } + + /// If actions queue is set (and the block has not been stored yet), + /// the block will be translated into a sequence of actions. + /// The received actions should be fed + /// to `ExternalIO`, so that `StateKeeper` will store the corresponding L2 block in the db. + /// + /// `store_next_block()` call will wait synchronously for the L2 block. + /// Once the L2 block is observed in storage, `store_next_block()` will store a cert for this + /// L2 block. + async fn queue_next_block( + &self, + ctx: &ctx::Ctx, + block: validator::FinalBlock, + ) -> ctx::Result<()> { + let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + if let Some(payloads) = &mut *payloads { + payloads + .send(to_fetched_block(block.number(), &block.payload).context("to_fetched_block")?) + .await + .context("payload_queue.send()")?; + } + self.certificates.send(block.justification); + Ok(()) + } +} + +#[async_trait::async_trait] +impl storage::ReplicaStore for Store { + async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result { + self.pool + .connection(ctx) + .await + .wrap("connection()")? + .replica_state(ctx) + .await + .wrap("replica_state()") + } + + async fn set_state(&self, ctx: &ctx::Ctx, state: &storage::ReplicaState) -> ctx::Result<()> { + self.pool + .connection(ctx) + .await + .wrap("connection()")? + .set_replica_state(ctx, state) + .await + .wrap("set_replica_state()") + } +} + +#[async_trait::async_trait] +impl PayloadManager for Store { + /// Currently (for the main node) proposing is implemented as just converting an L2 block from db (without a cert) into a payload. + async fn propose( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + ) -> ctx::Result { + const LARGE_PAYLOAD_SIZE: usize = 1 << 20; + let payload = self + .pool + .wait_for_payload(ctx, block_number) + .await + .wrap("wait_for_payload")?; + let encoded_payload = payload.encode(); + if encoded_payload.0.len() > LARGE_PAYLOAD_SIZE { + tracing::warn!( + "large payload ({}B) with {} transactions", + encoded_payload.0.len(), + payload.transactions.len() + ); + } + Ok(encoded_payload) + } + + /// Verify that `payload` is a correct proposal for the block `block_number`. + /// * for the main node it checks whether the same block is already present in storage. + /// * for the EN validator + /// * if the block with this number was already applied, it checks that it was the + /// same block. It should always be true, because main node is the only proposer and + /// to propose a different block a hard fork is needed. + /// * otherwise, EN attempts to apply the received block. If the block was incorrect + /// the statekeeper is expected to crash the whole EN. Otherwise OK is returned. + async fn verify( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + payload: &validator::Payload, + ) -> ctx::Result<()> { + let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + if let Some(payloads) = &mut *payloads { + let block = to_fetched_block(block_number, payload).context("to_fetched_block")?; + let n = block.number; + payloads.send(block).await.context("payload_queue.send()")?; + // Wait for the block to be processed, without waiting for it to be stored. + // TODO(BFT-459): this is not ideal, because we don't check here whether the + // processed block is the same as `payload`. It will work correctly + // with the current implementation of EN, but we should make it more + // precise when block reverting support is implemented. + ctx.wait(payloads.sync_state.wait_for_local_block(n)) + .await?; + } else { + let want = self.pool.wait_for_payload(ctx, block_number).await?; + let got = Payload::decode(payload).context("Payload::decode(got)")?; + if got != want { + return Err( + anyhow::format_err!("unexpected payload: got {got:?} want {want:?}").into(), + ); + } + } + Ok(()) + } +} + +// Dummy implementation +#[async_trait::async_trait] +impl storage::PersistentBatchStore for Store { + async fn last_batch(&self) -> attester::BatchNumber { + unimplemented!() + } + async fn last_batch_qc(&self) -> attester::BatchQC { + unimplemented!() + } + async fn get_batch(&self, _number: attester::BatchNumber) -> Option { + None + } + async fn get_batch_qc(&self, _number: attester::BatchNumber) -> Option { + None + } + async fn store_qc(&self, _qc: attester::BatchQC) { + unimplemented!() + } + fn persisted(&self) -> sync::watch::Receiver { + sync::watch::channel(storage::BatchStoreState { + first: attester::BatchNumber(0), + last: None, + }) + .1 + } + async fn queue_next_batch( + &self, + _ctx: &ctx::Ctx, + _batch: attester::SyncBatch, + ) -> ctx::Result<()> { + Err(anyhow::format_err!("unimplemented").into()) + } +} diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 514e66c81fea..f2c51521b3f4 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -49,7 +49,8 @@ use zksync_web3_decl::client::{Client, DynClient, L2}; use crate::{ batch::{L1BatchCommit, L1BatchWithWitness, LastBlockCommit}, - en, ConnectionPool, + en, + storage::ConnectionPool, }; /// Fake StateKeeper for tests. diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index acff2365585f..3f57e4beeade 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -1,7 +1,8 @@ use anyhow::Context as _; +use storage::Store; use test_casing::{test_casing, Product}; use tracing::Instrument as _; -use zksync_concurrency::{ctx, scope}; +use zksync_concurrency::{ctx, error::Wrap, scope}; use zksync_config::configs::consensus::{ValidatorPublicKey, WeightedValidator}; use zksync_consensus_crypto::TextFmt as _; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; @@ -9,9 +10,11 @@ use zksync_consensus_roles::{ validator, validator::testonly::{Setup, SetupSpec}, }; +use zksync_consensus_storage::BlockStore; use zksync_types::{L1BatchNumber, ProtocolVersionId}; use super::*; +use crate::{mn::run_main_node, storage::ConnectionPool}; const VERSIONS: [ProtocolVersionId; 2] = [ProtocolVersionId::latest(), ProtocolVersionId::next()]; const FROM_SNAPSHOT: [bool; 2] = [true, false]; diff --git a/core/node/node_framework/src/implementations/layers/consensus.rs b/core/node/node_framework/src/implementations/layers/consensus.rs index 14b20aaa3c38..d1d7fa3b7de1 100644 --- a/core/node/node_framework/src/implementations/layers/consensus.rs +++ b/core/node/node_framework/src/implementations/layers/consensus.rs @@ -37,7 +37,7 @@ pub enum Mode { /// ## Adds tasks /// /// - `MainNodeConsensusTask` (if `Mode::Main`) -/// - `FetcherTask` (if `Mode::External`) +/// - `ExternalNodeTask` (if `Mode::External`) #[derive(Debug)] pub struct ConsensusLayer { pub mode: Mode, @@ -99,7 +99,7 @@ impl WiringLayer for ConsensusLayer { } }; - let task = FetcherTask { + let task = ExternalNodeTask { config, pool, main_node_client, @@ -128,7 +128,7 @@ impl Task for MainNodeConsensusTask { async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { // We instantiate the root context here, since the consensus task is the only user of the - // structured concurrency framework (`MainNodeConsensusTask` and `FetcherTask` are considered mutually + // structured concurrency framework (`MainNodeConsensusTask` and `ExternalNodeTask` are considered mutually // exclusive). // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, // not the consensus task itself. There may have been any number of tasks running in the root context, @@ -149,7 +149,7 @@ impl Task for MainNodeConsensusTask { } #[derive(Debug)] -pub struct FetcherTask { +pub struct ExternalNodeTask { config: Option<(ConsensusConfig, ConsensusSecrets)>, pool: ConnectionPool, main_node_client: Box>, @@ -158,21 +158,21 @@ pub struct FetcherTask { } #[async_trait::async_trait] -impl Task for FetcherTask { +impl Task for ExternalNodeTask { fn id(&self) -> TaskId { "consensus_fetcher".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { // We instantiate the root context here, since the consensus task is the only user of the - // structured concurrency framework (`MainNodeConsensusTask` and `FetcherTask` are considered mutually + // structured concurrency framework (`MainNodeConsensusTask` and `ExternalNodeTask` are considered mutually // exclusive). // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, // not the consensus task itself. There may have been any number of tasks running in the root context, // but we only need to wait for stop signal once, and it will be propagated to all child contexts. let root_ctx = ctx::root(); scope::run!(&root_ctx, |ctx, s| async { - s.spawn_bg(consensus::era::run_en( + s.spawn_bg(consensus::era::run_external_node( ctx, self.config, self.pool,