From a7129c180a50142d03087073615757432e720b64 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 1 Jul 2024 13:34:28 +0100 Subject: [PATCH] BFT-476: Refactor consensus tasks and storage --- core/bin/external_node/src/main.rs | 2 +- core/node/consensus/src/batch.rs | 2 +- core/node/consensus/src/en.rs | 4 +- core/node/consensus/src/era.rs | 6 +- core/node/consensus/src/lib.rs | 57 +- core/node/consensus/src/mn.rs | 64 ++ core/node/consensus/src/storage/connection.rs | 319 ++++++++++ core/node/consensus/src/storage/mod.rs | 575 +----------------- core/node/consensus/src/storage/store.rs | 266 ++++++++ core/node/consensus/src/testonly.rs | 3 +- core/node/consensus/src/tests.rs | 5 +- .../src/implementations/layers/consensus.rs | 14 +- prover/Cargo.lock | 17 +- 13 files changed, 690 insertions(+), 644 deletions(-) create mode 100644 core/node/consensus/src/mn.rs create mode 100644 core/node/consensus/src/storage/connection.rs create mode 100644 core/node/consensus/src/storage/store.rs diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index bb19b5670aac..e3ee987a6e62 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -286,7 +286,7 @@ async fn run_core( // but we only need to wait for stop signal once, and it will be propagated to all child contexts. let ctx = ctx::root(); scope::run!(&ctx, |ctx, s| async move { - s.spawn_bg(consensus::era::run_en( + s.spawn_bg(consensus::era::run_external_node( ctx, cfg, pool, diff --git a/core/node/consensus/src/batch.rs b/core/node/consensus/src/batch.rs index d393a845ec6d..08246c4e5c04 100644 --- a/core/node/consensus/src/batch.rs +++ b/core/node/consensus/src/batch.rs @@ -14,7 +14,7 @@ use zksync_types::{ }; use zksync_utils::{h256_to_u256, u256_to_h256}; -use crate::ConnectionPool; +use crate::storage::ConnectionPool; /// Commitment to the last block of a batch. pub(crate) struct LastBlockCommit { diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 685bc982bd07..f69e14d738d5 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -9,8 +9,8 @@ use zksync_node_sync::{ use zksync_types::L2BlockNumber; use zksync_web3_decl::client::{DynClient, L2}; -use super::{config, storage::Store, ConnectionPool, ConsensusConfig, ConsensusSecrets}; -use crate::storage; +use super::{config, storage::Store, ConsensusConfig, ConsensusSecrets}; +use crate::storage::{self, ConnectionPool}; /// External node. pub(super) struct EN { diff --git a/core/node/consensus/src/era.rs b/core/node/consensus/src/era.rs index 0e73c29f7741..6d69432d8e1b 100644 --- a/core/node/consensus/src/era.rs +++ b/core/node/consensus/src/era.rs @@ -10,7 +10,7 @@ use zksync_dal::Core; use zksync_node_sync::{sync_action::ActionQueueSender, SyncState}; use zksync_web3_decl::client::{DynClient, L2}; -use super::{en, storage::ConnectionPool}; +use super::{en, mn, storage::ConnectionPool}; /// Runs the consensus task in the main node mode. pub async fn run_main_node( @@ -22,7 +22,7 @@ pub async fn run_main_node( // Consensus is a new component. // For now in case of error we just log it and allow the server // to continue running. 
- if let Err(err) = super::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await { + if let Err(err) = mn::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await { tracing::error!("Consensus actor failed: {err:#}"); } else { tracing::info!("Consensus actor stopped"); @@ -33,7 +33,7 @@ pub async fn run_main_node( /// Runs the consensus node for the external node. /// If `cfg` is `None`, it will just fetch blocks from the main node /// using JSON RPC, without starting the consensus node. -pub async fn run_en( +pub async fn run_external_node( ctx: &ctx::Ctx, cfg: Option<(ConsensusConfig, ConsensusSecrets)>, pool: zksync_dal::ConnectionPool, diff --git a/core/node/consensus/src/lib.rs b/core/node/consensus/src/lib.rs index bc9776c42df5..13d918b5b6ee 100644 --- a/core/node/consensus/src/lib.rs +++ b/core/node/consensus/src/lib.rs @@ -2,14 +2,8 @@ #![allow(clippy::redundant_locals)] #![allow(clippy::needless_pass_by_ref_mut)] -use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, scope}; -use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; -use zksync_consensus_executor as executor; -use zksync_consensus_roles::validator; -use zksync_consensus_storage::BlockStore; -use crate::storage::{ConnectionPool, Store}; +use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; // Currently `batch` module is only used in tests, // but will be used in production once batch syncing is implemented in consensus. @@ -18,56 +12,9 @@ mod batch; mod config; mod en; pub mod era; +mod mn; mod storage; #[cfg(test)] pub(crate) mod testonly; #[cfg(test)] mod tests; - -/// Task running a consensus validator for the main node. -/// Main node is currently the only leader of the consensus - i.e. it proposes all the -/// L2 blocks (generated by `Statekeeper`). -async fn run_main_node( - ctx: &ctx::Ctx, - cfg: ConsensusConfig, - secrets: ConsensusSecrets, - pool: ConnectionPool, -) -> anyhow::Result<()> { - let validator_key = config::validator_key(&secrets) - .context("validator_key")? - .context("missing validator_key")?; - scope::run!(&ctx, |ctx, s| async { - if let Some(spec) = &cfg.genesis_spec { - let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; - pool.connection(ctx) - .await - .wrap("connection()")? 
- .adjust_genesis(ctx, &spec) - .await - .wrap("adjust_genesis()")?; - } - let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; - s.spawn_bg(runner.run(ctx)); - let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) - .await - .wrap("BlockStore::new()")?; - s.spawn_bg(runner.run(ctx)); - anyhow::ensure!( - block_store.genesis().leader_selection - == validator::LeaderSelectionMode::Sticky(validator_key.public()), - "unsupported leader selection mode - main node has to be the leader" - ); - - let executor = executor::Executor { - config: config::executor(&cfg, &secrets)?, - block_store, - validator: Some(executor::Validator { - key: validator_key, - replica_store: Box::new(store.clone()), - payload_manager: Box::new(store.clone()), - }), - }; - executor.run(ctx).await - }) - .await -} diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs new file mode 100644 index 000000000000..3ae8ef8d439f --- /dev/null +++ b/core/node/consensus/src/mn.rs @@ -0,0 +1,64 @@ +use anyhow::Context as _; +use zksync_concurrency::{ctx, error::Wrap as _, scope}; +use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; +use zksync_consensus_executor::{self as executor}; +use zksync_consensus_roles::validator; +use zksync_consensus_storage::BlockStore; + +use crate::{ + config, + storage::{ConnectionPool, Store}, +}; + +/// Task running a consensus validator for the main node. +/// Main node is currently the only leader of the consensus - i.e. it proposes all the +/// L2 blocks (generated by `Statekeeper`). +pub async fn run_main_node( + ctx: &ctx::Ctx, + cfg: ConsensusConfig, + secrets: ConsensusSecrets, + pool: ConnectionPool, +) -> anyhow::Result<()> { + let validator_key = config::validator_key(&secrets) + .context("validator_key")? + .context("missing validator_key")?; + + scope::run!(&ctx, |ctx, s| async { + if let Some(spec) = &cfg.genesis_spec { + let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; + + pool.connection(ctx) + .await + .wrap("connection()")? 
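+                // `adjust_genesis` below is a no-op when the stored genesis
+                // already matches `spec`; otherwise it hard-forks consensus to
+                // restart at the last L2 block in storage.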
+ .adjust_genesis(ctx, &spec) + .await + .wrap("adjust_genesis()")?; + } + + let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; + s.spawn_bg(runner.run(ctx)); + + let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) + .await + .wrap("BlockStore::new()")?; + s.spawn_bg(runner.run(ctx)); + + anyhow::ensure!( + block_store.genesis().leader_selection + == validator::LeaderSelectionMode::Sticky(validator_key.public()), + "unsupported leader selection mode - main node has to be the leader" + ); + + let executor = executor::Executor { + config: config::executor(&cfg, &secrets)?, + block_store, + validator: Some(executor::Validator { + key: validator_key, + replica_store: Box::new(store.clone()), + payload_manager: Box::new(store.clone()), + }), + }; + executor.run(ctx).await + }) + .await +} diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs new file mode 100644 index 000000000000..ba9d7721b6f2 --- /dev/null +++ b/core/node/consensus/src/storage/connection.rs @@ -0,0 +1,319 @@ +use anyhow::Context as _; +use zksync_concurrency::{ctx, error::Wrap as _, time}; +use zksync_consensus_roles::validator; +use zksync_consensus_storage as storage; +use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; +use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState}; +use zksync_state_keeper::io::common::IoCursor; +use zksync_types::{commitment::L1BatchWithMetadata, L1BatchNumber}; + +use super::PayloadQueue; +use crate::config; + +/// Context-aware `zksync_dal::ConnectionPool` wrapper. +#[derive(Debug, Clone)] +pub(crate) struct ConnectionPool(pub(crate) zksync_dal::ConnectionPool); + +impl ConnectionPool { + /// Wrapper for `connection_tagged()`. + pub(crate) async fn connection<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result> { + Ok(Connection( + ctx.wait(self.0.connection_tagged("consensus")) + .await? + .map_err(DalError::generalize)?, + )) + } + + /// Waits for the `number` L2 block. + pub async fn wait_for_payload( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + if let Some(payload) = self + .connection(ctx) + .await + .wrap("connection()")? + .payload(ctx, number) + .await + .wrap("payload()")? + { + return Ok(payload); + } + ctx.sleep(POLL_INTERVAL).await?; + } + } +} + +/// Context-aware `zksync_dal::Connection` wrapper. +pub(crate) struct Connection<'a>(pub(crate) zksync_dal::Connection<'a, Core>); + +impl<'a> Connection<'a> { + /// Wrapper for `start_transaction()`. + pub async fn start_transaction<'b, 'c: 'b>( + &'c mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(Connection( + ctx.wait(self.0.start_transaction()) + .await? + .context("sqlx")?, + )) + } + + /// Wrapper for `commit()`. + pub async fn commit(self, ctx: &ctx::Ctx) -> ctx::Result<()> { + Ok(ctx.wait(self.0.commit()).await?.context("sqlx")?) + } + + /// Wrapper for `consensus_dal().block_range()`. + pub async fn block_range( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().block_range()) + .await? + .context("sqlx")?) + } + + /// Wrapper for `consensus_dal().block_payload()`. + pub async fn payload( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().block_payload(number)) + .await? + .map_err(DalError::generalize)?) 
+ } + + /// Wrapper for `consensus_dal().block_payloads()`. + pub async fn payloads( + &mut self, + ctx: &ctx::Ctx, + numbers: std::ops::Range, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().block_payloads(numbers)) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().first_certificate()`. + pub async fn first_certificate( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().first_certificate()) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().last_certificate()`. + pub async fn last_certificate( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().last_certificate()) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().certificate()`. + pub async fn certificate( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().certificate(number)) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().insert_certificate()`. + pub async fn insert_certificate( + &mut self, + ctx: &ctx::Ctx, + cert: &validator::CommitQC, + ) -> ctx::Result<()> { + Ok(ctx + .wait(self.0.consensus_dal().insert_certificate(cert)) + .await??) + } + + /// Wrapper for `consensus_dal().replica_state()`. + pub async fn replica_state(&mut self, ctx: &ctx::Ctx) -> ctx::Result { + Ok(ctx + .wait(self.0.consensus_dal().replica_state()) + .await? + .map_err(DalError::generalize)?) + } + + /// Wrapper for `consensus_dal().set_replica_state()`. + pub async fn set_replica_state( + &mut self, + ctx: &ctx::Ctx, + state: &storage::ReplicaState, + ) -> ctx::Result<()> { + Ok(ctx + .wait(self.0.consensus_dal().set_replica_state(state)) + .await? + .context("sqlx")?) + } + + /// Wrapper for `consensus_dal().get_l1_batch_metadata()`. + pub async fn batch( + &mut self, + ctx: &ctx::Ctx, + number: L1BatchNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.blocks_dal().get_l1_batch_metadata(number)) + .await? + .context("get_l1_batch_metadata()")?) + } + + /// Wrapper for `FetcherCursor::new()`. + pub async fn new_payload_queue( + &mut self, + ctx: &ctx::Ctx, + actions: ActionQueueSender, + sync_state: SyncState, + ) -> ctx::Result { + Ok(PayloadQueue { + inner: ctx.wait(IoCursor::for_fetcher(&mut self.0)).await??, + actions, + sync_state, + }) + } + + pub async fn genesis(&mut self, ctx: &ctx::Ctx) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().genesis()) + .await? + .map_err(DalError::generalize)?) + } + + pub async fn try_update_genesis( + &mut self, + ctx: &ctx::Ctx, + genesis: &validator::Genesis, + ) -> ctx::Result<()> { + Ok(ctx + .wait(self.0.consensus_dal().try_update_genesis(genesis)) + .await??) + } + + /// Fetches and verifies consistency of certificates in storage. + pub async fn certificates_range( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result { + // Fetch the range of L2 blocks in storage. + let block_range = self.block_range(ctx).await.context("block_range")?; + + // Fetch the range of certificates in storage. + let genesis = self + .genesis(ctx) + .await + .wrap("genesis()")? 
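+            // A genesis record must already exist here; `first_expected_cert`
+            // below is derived from `genesis.first_block`.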
+ .context("genesis missing")?; + let first_expected_cert = genesis.first_block.max(block_range.start); + let last_cert = self + .last_certificate(ctx) + .await + .wrap("last_certificate()")?; + let next_expected_cert = last_cert + .as_ref() + .map_or(first_expected_cert, |cert| cert.header().number.next()); + + // Check that the first certificate in storage has the expected L2 block number. + if let Some(got) = self + .first_certificate(ctx) + .await + .wrap("first_certificate()")? + { + if got.header().number != first_expected_cert { + return Err(anyhow::format_err!( + "inconsistent storage: certificates should start at {first_expected_cert}, while they start at {}", + got.header().number, + ).into()); + } + } + + // Check that the node has all the blocks before the next expected certificate, because + // the node needs to know the state of the chain up to block `X` to process block `X+1`. + if block_range.end < next_expected_cert { + return Err(anyhow::format_err!("inconsistent storage: cannot start consensus for L2 block {next_expected_cert}, because earlier blocks are missing").into()); + } + Ok(storage::BlockStoreState { + first: first_expected_cert, + last: last_cert, + }) + } + + /// (Re)initializes consensus genesis to start at the last L2 block in storage. + /// Noop if `spec` matches the current genesis. + pub(crate) async fn adjust_genesis( + &mut self, + ctx: &ctx::Ctx, + spec: &config::GenesisSpec, + ) -> ctx::Result<()> { + let block_range = self.block_range(ctx).await.wrap("block_range()")?; + let mut txn = self + .start_transaction(ctx) + .await + .wrap("start_transaction()")?; + let old = txn.genesis(ctx).await.wrap("genesis()")?; + if let Some(old) = &old { + if &config::GenesisSpec::from_genesis(old) == spec { + // Hard fork is not needed. + return Ok(()); + } + } + tracing::info!("Performing a hard fork of consensus."); + let genesis = validator::GenesisRaw { + chain_id: spec.chain_id, + fork_number: old + .as_ref() + .map_or(validator::ForkNumber(0), |old| old.fork_number.next()), + first_block: block_range.end, + + protocol_version: spec.protocol_version, + committee: spec.validators.clone(), + leader_selection: spec.leader_selection.clone(), + } + .with_hash(); + txn.try_update_genesis(ctx, &genesis) + .await + .wrap("try_update_genesis()")?; + txn.commit(ctx).await.wrap("commit()")?; + Ok(()) + } + + pub(super) async fn block( + &mut self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { + let Some(justification) = self.certificate(ctx, number).await.wrap("certificate()")? else { + return Ok(None); + }; + let payload = self + .payload(ctx, number) + .await + .wrap("payload()")? + .context("L2 block disappeared from storage")?; + Ok(Some(validator::FinalBlock { + payload: payload.encode(), + justification, + })) + } +} diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index bc8a0b8b8409..da1fab17cc1c 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -1,342 +1,31 @@ //! Storage implementation based on DAL. 
-use std::sync::Arc; -use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, sync, time}; -use zksync_consensus_bft::PayloadManager; use zksync_consensus_roles::validator; -use zksync_consensus_storage as storage; -use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; use zksync_node_sync::{ - fetcher::{FetchedBlock, FetchedTransaction, IoCursorExt as _}, + fetcher::{FetchedBlock, IoCursorExt as _}, sync_action::ActionQueueSender, SyncState, }; use zksync_state_keeper::io::common::IoCursor; -use zksync_types::{commitment::L1BatchWithMetadata, L1BatchNumber, L2BlockNumber}; -use super::config; +mod connection; +mod store; + +pub(crate) use connection::*; +pub(crate) use store::*; #[cfg(test)] pub(crate) mod testonly; -/// Context-aware `zksync_dal::ConnectionPool` wrapper. -#[derive(Debug, Clone)] -pub(super) struct ConnectionPool(pub(super) zksync_dal::ConnectionPool); - -impl ConnectionPool { - /// Wrapper for `connection_tagged()`. - pub(super) async fn connection<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result> { - Ok(Connection( - ctx.wait(self.0.connection_tagged("consensus")) - .await? - .map_err(DalError::generalize)?, - )) - } - - /// Waits for the `number` L2 block. - pub async fn wait_for_payload( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result { - const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); - loop { - if let Some(payload) = self - .connection(ctx) - .await - .wrap("connection()")? - .payload(ctx, number) - .await - .wrap("payload()")? - { - return Ok(payload); - } - ctx.sleep(POLL_INTERVAL).await?; - } - } -} - -/// Context-aware `zksync_dal::Connection` wrapper. -pub(super) struct Connection<'a>(pub(super) zksync_dal::Connection<'a, Core>); - -impl<'a> Connection<'a> { - /// Wrapper for `start_transaction()`. - pub async fn start_transaction<'b, 'c: 'b>( - &'c mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(Connection( - ctx.wait(self.0.start_transaction()) - .await? - .context("sqlx")?, - )) - } - - /// Wrapper for `commit()`. - pub async fn commit(self, ctx: &ctx::Ctx) -> ctx::Result<()> { - Ok(ctx.wait(self.0.commit()).await?.context("sqlx")?) - } - - /// Wrapper for `consensus_dal().block_range()`. - pub async fn block_range( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().block_range()) - .await? - .context("sqlx")?) - } - - /// Wrapper for `consensus_dal().block_payload()`. - pub async fn payload( - &mut self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().block_payload(number)) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().block_payloads()`. - pub async fn payloads( - &mut self, - ctx: &ctx::Ctx, - numbers: std::ops::Range, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().block_payloads(numbers)) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().first_certificate()`. - pub async fn first_certificate( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().first_certificate()) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().last_certificate()`. - pub async fn last_certificate( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().last_certificate()) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().certificate()`. 
- pub async fn certificate( - &mut self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().certificate(number)) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().insert_certificate()`. - pub async fn insert_certificate( - &mut self, - ctx: &ctx::Ctx, - cert: &validator::CommitQC, - ) -> ctx::Result<()> { - Ok(ctx - .wait(self.0.consensus_dal().insert_certificate(cert)) - .await??) - } - - /// Wrapper for `consensus_dal().replica_state()`. - pub async fn replica_state(&mut self, ctx: &ctx::Ctx) -> ctx::Result { - Ok(ctx - .wait(self.0.consensus_dal().replica_state()) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().set_replica_state()`. - pub async fn set_replica_state( - &mut self, - ctx: &ctx::Ctx, - state: &storage::ReplicaState, - ) -> ctx::Result<()> { - Ok(ctx - .wait(self.0.consensus_dal().set_replica_state(state)) - .await? - .context("sqlx")?) - } - - /// Wrapper for `consensus_dal().get_l1_batch_metadata()`. - pub async fn batch( - &mut self, - ctx: &ctx::Ctx, - number: L1BatchNumber, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.blocks_dal().get_l1_batch_metadata(number)) - .await? - .context("get_l1_batch_metadata()")?) - } - - /// Wrapper for `FetcherCursor::new()`. - pub async fn new_payload_queue( - &mut self, - ctx: &ctx::Ctx, - actions: ActionQueueSender, - sync_state: SyncState, - ) -> ctx::Result { - Ok(PayloadQueue { - inner: ctx.wait(IoCursor::for_fetcher(&mut self.0)).await??, - actions, - sync_state, - }) - } - - pub async fn genesis(&mut self, ctx: &ctx::Ctx) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().genesis()) - .await? - .map_err(DalError::generalize)?) - } - - pub async fn try_update_genesis( - &mut self, - ctx: &ctx::Ctx, - genesis: &validator::Genesis, - ) -> ctx::Result<()> { - Ok(ctx - .wait(self.0.consensus_dal().try_update_genesis(genesis)) - .await??) - } - - /// Fetches and verifies consistency of certificates in storage. - async fn certificates_range( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result { - // Fetch the range of L2 blocks in storage. - let block_range = self.block_range(ctx).await.context("block_range")?; - - // Fetch the range of certificates in storage. - let genesis = self - .genesis(ctx) - .await - .wrap("genesis()")? - .context("genesis missing")?; - let first_expected_cert = genesis.first_block.max(block_range.start); - let last_cert = self - .last_certificate(ctx) - .await - .wrap("last_certificate()")?; - let next_expected_cert = last_cert - .as_ref() - .map_or(first_expected_cert, |cert| cert.header().number.next()); - - // Check that the first certificate in storage has the expected L2 block number. - if let Some(got) = self - .first_certificate(ctx) - .await - .wrap("first_certificate()")? - { - if got.header().number != first_expected_cert { - return Err(anyhow::format_err!( - "inconsistent storage: certificates should start at {first_expected_cert}, while they start at {}", - got.header().number, - ).into()); - } - } - - // Check that the node has all the blocks before the next expected certificate, because - // the node needs to know the state of the chain up to block `X` to process block `X+1`. 
- if block_range.end < next_expected_cert { - return Err(anyhow::format_err!("inconsistent storage: cannot start consensus for L2 block {next_expected_cert}, because earlier blocks are missing").into()); - } - Ok(storage::BlockStoreState { - first: first_expected_cert, - last: last_cert, - }) - } - - /// (Re)initializes consensus genesis to start at the last L2 block in storage. - /// Noop if `spec` matches the current genesis. - pub(super) async fn adjust_genesis( - &mut self, - ctx: &ctx::Ctx, - spec: &config::GenesisSpec, - ) -> ctx::Result<()> { - let block_range = self.block_range(ctx).await.wrap("block_range()")?; - let mut txn = self - .start_transaction(ctx) - .await - .wrap("start_transaction()")?; - let old = txn.genesis(ctx).await.wrap("genesis()")?; - if let Some(old) = &old { - if &config::GenesisSpec::from_genesis(old) == spec { - // Hard fork is not needed. - return Ok(()); - } - } - tracing::info!("Performing a hard fork of consensus."); - let genesis = validator::GenesisRaw { - chain_id: spec.chain_id, - fork_number: old - .as_ref() - .map_or(validator::ForkNumber(0), |old| old.fork_number.next()), - first_block: block_range.end, - - protocol_version: spec.protocol_version, - committee: spec.validators.clone(), - leader_selection: spec.leader_selection.clone(), - } - .with_hash(); - txn.try_update_genesis(ctx, &genesis) - .await - .wrap("try_update_genesis()")?; - txn.commit(ctx).await.wrap("commit()")?; - Ok(()) - } - - pub(super) async fn block( - &mut self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result> { - let Some(justification) = self.certificate(ctx, number).await.wrap("certificate()")? else { - return Ok(None); - }; - let payload = self - .payload(ctx, number) - .await - .wrap("payload()")? - .context("L2 block disappeared from storage")?; - Ok(Some(validator::FinalBlock { - payload: payload.encode(), - justification, - })) - } -} - #[derive(Debug)] -pub(super) struct PayloadQueue { +pub(crate) struct PayloadQueue { inner: IoCursor, actions: ActionQueueSender, sync_state: SyncState, } impl PayloadQueue { - pub(super) fn next(&self) -> validator::BlockNumber { + pub(crate) fn next(&self) -> validator::BlockNumber { validator::BlockNumber(self.inner.next_l2_block.0.into()) } @@ -344,7 +33,7 @@ impl PayloadQueue { /// to the actions queue. /// Does nothing and returns Ok() if the block has been already processed. /// Returns an error if a block with an earlier block number was expected. - pub(super) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> { + pub(crate) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> { let want = self.inner.next_l2_block; // Some blocks are missing. 
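+        // That is, the incoming block skips ahead of `want`, the next L2 block
+        // expected by the fetcher cursor.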
if block.number > want { @@ -358,249 +47,3 @@ impl PayloadQueue { Ok(()) } } - -fn to_fetched_block( - number: validator::BlockNumber, - payload: &validator::Payload, -) -> anyhow::Result { - let number = L2BlockNumber( - number - .0 - .try_into() - .context("Integer overflow converting block number")?, - ); - let payload = Payload::decode(payload).context("Payload::decode()")?; - Ok(FetchedBlock { - number, - l1_batch_number: payload.l1_batch_number, - last_in_batch: payload.last_in_batch, - protocol_version: payload.protocol_version, - timestamp: payload.timestamp, - reference_hash: Some(payload.hash), - l1_gas_price: payload.l1_gas_price, - l2_fair_gas_price: payload.l2_fair_gas_price, - fair_pubdata_price: payload.fair_pubdata_price, - virtual_blocks: payload.virtual_blocks, - operator_address: payload.operator_address, - transactions: payload - .transactions - .into_iter() - .map(FetchedTransaction::new) - .collect(), - }) -} - -/// Wrapper of `ConnectionPool` implementing `ReplicaStore`, `PayloadManager` and -/// `PersistentBlockStore`. -#[derive(Clone, Debug)] -pub(super) struct Store { - pub(super) pool: ConnectionPool, - payloads: Arc>>, - certificates: ctx::channel::UnboundedSender, - persisted: sync::watch::Receiver, -} - -/// Background task of the `Store`. -pub struct StoreRunner { - pool: ConnectionPool, - persisted: sync::watch::Sender, - certificates: ctx::channel::UnboundedReceiver, -} - -impl Store { - pub(super) async fn new( - ctx: &ctx::Ctx, - pool: ConnectionPool, - payload_queue: Option, - ) -> ctx::Result<(Store, StoreRunner)> { - let persisted = pool - .connection(ctx) - .await - .wrap("connection()")? - .certificates_range(ctx) - .await - .wrap("certificates_range()")?; - let persisted = sync::watch::channel(persisted).0; - let (certs_send, certs_recv) = ctx::channel::unbounded(); - Ok(( - Store { - pool: pool.clone(), - certificates: certs_send, - payloads: Arc::new(sync::Mutex::new(payload_queue)), - persisted: persisted.subscribe(), - }, - StoreRunner { - pool, - persisted, - certificates: certs_recv, - }, - )) - } -} - -impl StoreRunner { - pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let res = async { - loop { - let cert = self.certificates.recv(ctx).await?; - self.pool - .wait_for_payload(ctx, cert.header().number) - .await - .wrap("wait_for_payload()")?; - self.pool - .connection(ctx) - .await - .wrap("connection()")? - .insert_certificate(ctx, &cert) - .await - .wrap("insert_certificate()")?; - self.persisted.send_modify(|p| p.last = Some(cert)); - } - } - .await; - match res { - Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), - Err(ctx::Error::Internal(err)) => Err(err), - } - } -} - -#[async_trait::async_trait] -impl storage::PersistentBlockStore for Store { - async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result { - Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? - .genesis(ctx) - .await? - .context("not found")?) - } - - fn persisted(&self) -> sync::watch::Receiver { - self.persisted.clone() - } - - async fn block( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result { - Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? - .block(ctx, number) - .await? - .context("not found")?) - } - - /// If actions queue is set (and the block has not been stored yet), - /// the block will be translated into a sequence of actions. - /// The received actions should be fed - /// to `ExternalIO`, so that `StateKeeper` will store the corresponding L2 block in the db. 
- /// - /// `store_next_block()` call will wait synchronously for the L2 block. - /// Once the L2 block is observed in storage, `store_next_block()` will store a cert for this - /// L2 block. - async fn queue_next_block( - &self, - ctx: &ctx::Ctx, - block: validator::FinalBlock, - ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); - if let Some(payloads) = &mut *payloads { - payloads - .send(to_fetched_block(block.number(), &block.payload).context("to_fetched_block")?) - .await - .context("payload_queue.send()")?; - } - self.certificates.send(block.justification); - Ok(()) - } -} - -#[async_trait::async_trait] -impl storage::ReplicaStore for Store { - async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result { - self.pool - .connection(ctx) - .await - .wrap("connection()")? - .replica_state(ctx) - .await - .wrap("replica_state()") - } - - async fn set_state(&self, ctx: &ctx::Ctx, state: &storage::ReplicaState) -> ctx::Result<()> { - self.pool - .connection(ctx) - .await - .wrap("connection()")? - .set_replica_state(ctx, state) - .await - .wrap("set_replica_state()") - } -} - -#[async_trait::async_trait] -impl PayloadManager for Store { - /// Currently (for the main node) proposing is implemented as just converting an L2 block from db (without a cert) into a payload. - async fn propose( - &self, - ctx: &ctx::Ctx, - block_number: validator::BlockNumber, - ) -> ctx::Result { - const LARGE_PAYLOAD_SIZE: usize = 1 << 20; - let payload = self.pool.wait_for_payload(ctx, block_number).await?; - let encoded_payload = payload.encode(); - if encoded_payload.0.len() > LARGE_PAYLOAD_SIZE { - tracing::warn!( - "large payload ({}B) with {} transactions", - encoded_payload.0.len(), - payload.transactions.len() - ); - } - Ok(encoded_payload) - } - - /// Verify that `payload` is a correct proposal for the block `block_number`. - /// * for the main node it checks whether the same block is already present in storage. - /// * for the EN validator - /// * if the block with this number was already applied, it checks that it was the - /// same block. It should always be true, because main node is the only proposer and - /// to propose a different block a hard fork is needed. - /// * otherwise, EN attempts to apply the received block. If the block was incorrect - /// the statekeeper is expected to crash the whole EN. Otherwise OK is returned. - async fn verify( - &self, - ctx: &ctx::Ctx, - block_number: validator::BlockNumber, - payload: &validator::Payload, - ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); - if let Some(payloads) = &mut *payloads { - let block = to_fetched_block(block_number, payload).context("to_fetched_block")?; - let n = block.number; - payloads.send(block).await.context("payload_queue.send()")?; - // Wait for the block to be processed, without waiting for it to be stored. - // TODO(BFT-459): this is not ideal, because we don't check here whether the - // processed block is the same as `payload`. It will work correctly - // with the current implementation of EN, but we should make it more - // precise when block reverting support is implemented. 
- ctx.wait(payloads.sync_state.wait_for_local_block(n)) - .await?; - } else { - let want = self.pool.wait_for_payload(ctx, block_number).await?; - let got = Payload::decode(payload).context("Payload::decode(got)")?; - if got != want { - return Err( - anyhow::format_err!("unexpected payload: got {got:?} want {want:?}").into(), - ); - } - } - Ok(()) - } -} diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs new file mode 100644 index 000000000000..3b579402580c --- /dev/null +++ b/core/node/consensus/src/storage/store.rs @@ -0,0 +1,266 @@ +use std::sync::Arc; + +use anyhow::Context as _; +use zksync_concurrency::{ctx, error::Wrap as _, sync}; +use zksync_consensus_bft::PayloadManager; +use zksync_consensus_roles::validator; +use zksync_consensus_storage as storage; +use zksync_dal::consensus_dal::Payload; +use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction}; +use zksync_types::L2BlockNumber; + +use super::PayloadQueue; +use crate::storage::ConnectionPool; + +fn to_fetched_block( + number: validator::BlockNumber, + payload: &validator::Payload, +) -> anyhow::Result { + let number = L2BlockNumber( + number + .0 + .try_into() + .context("Integer overflow converting block number")?, + ); + let payload = Payload::decode(payload).context("Payload::decode()")?; + Ok(FetchedBlock { + number, + l1_batch_number: payload.l1_batch_number, + last_in_batch: payload.last_in_batch, + protocol_version: payload.protocol_version, + timestamp: payload.timestamp, + reference_hash: Some(payload.hash), + l1_gas_price: payload.l1_gas_price, + l2_fair_gas_price: payload.l2_fair_gas_price, + fair_pubdata_price: payload.fair_pubdata_price, + virtual_blocks: payload.virtual_blocks, + operator_address: payload.operator_address, + transactions: payload + .transactions + .into_iter() + .map(FetchedTransaction::new) + .collect(), + }) +} + +/// Wrapper of `ConnectionPool` implementing `ReplicaStore`, `PayloadManager`, +/// `PersistentBlockStore` and `PersistentBatchStore`. +/// +/// Contains queues to save Quorum Certificates received over gossip to the store +/// as and when the payload they are over becomes available. +#[derive(Clone, Debug)] +pub(crate) struct Store { + pub(super) pool: ConnectionPool, + payloads: Arc>>, + /// L2 block QCs received over gossip + certificates: ctx::channel::UnboundedSender, + /// Range of L2 blocks for which we have a QC persisted. + persisted: sync::watch::Receiver, +} + +/// Background task of the `Store`. +pub struct StoreRunner { + pool: ConnectionPool, + persisted: sync::watch::Sender, + certificates: ctx::channel::UnboundedReceiver, +} + +impl Store { + pub async fn new( + ctx: &ctx::Ctx, + pool: ConnectionPool, + payload_queue: Option, + ) -> ctx::Result<(Store, StoreRunner)> { + let persisted = pool + .connection(ctx) + .await + .wrap("connection()")? 
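+            // Seed the `persisted` watch channel with the range of
+            // certificates already in the database.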
+ .certificates_range(ctx) + .await + .wrap("certificates_range()")?; + let persisted = sync::watch::channel(persisted).0; + let (certs_send, certs_recv) = ctx::channel::unbounded(); + Ok(( + Store { + pool: pool.clone(), + certificates: certs_send, + payloads: Arc::new(sync::Mutex::new(payload_queue)), + persisted: persisted.subscribe(), + }, + StoreRunner { + pool, + persisted, + certificates: certs_recv, + }, + )) + } +} + +impl StoreRunner { + /// Background tasks for the store: + /// * save L2 quorum certificates into the persistent backend + pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let res = async { + loop { + let cert = self.certificates.recv(ctx).await?; + self.pool + .wait_for_payload(ctx, cert.header().number) + .await + .wrap("wait_for_payload()")?; + self.pool + .connection(ctx) + .await + .wrap("connection()")? + .insert_certificate(ctx, &cert) + .await + .wrap("insert_certificate()")?; + self.persisted.send_modify(|p| p.last = Some(cert)); + } + } + .await; + match res { + Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } + } +} + +#[async_trait::async_trait] +impl storage::PersistentBlockStore for Store { + async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result { + Ok(self + .pool + .connection(ctx) + .await + .wrap("connection")? + .genesis(ctx) + .await? + .context("not found")?) + } + + fn persisted(&self) -> sync::watch::Receiver { + self.persisted.clone() + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result { + Ok(self + .pool + .connection(ctx) + .await + .wrap("connection")? + .block(ctx, number) + .await? + .context("not found")?) + } + + /// If actions queue is set (and the block has not been stored yet), + /// the block will be translated into a sequence of actions. + /// The received actions should be fed + /// to `ExternalIO`, so that `StateKeeper` will store the corresponding L2 block in the db. + /// + /// `store_next_block()` call will wait synchronously for the L2 block. + /// Once the L2 block is observed in storage, `store_next_block()` will store a cert for this + /// L2 block. + async fn queue_next_block( + &self, + ctx: &ctx::Ctx, + block: validator::FinalBlock, + ) -> ctx::Result<()> { + let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + if let Some(payloads) = &mut *payloads { + payloads + .send(to_fetched_block(block.number(), &block.payload).context("to_fetched_block")?) + .await + .context("payload_queue.send()")?; + } + self.certificates.send(block.justification); + Ok(()) + } +} + +#[async_trait::async_trait] +impl storage::ReplicaStore for Store { + async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result { + self.pool + .connection(ctx) + .await + .wrap("connection()")? + .replica_state(ctx) + .await + .wrap("replica_state()") + } + + async fn set_state(&self, ctx: &ctx::Ctx, state: &storage::ReplicaState) -> ctx::Result<()> { + self.pool + .connection(ctx) + .await + .wrap("connection()")? + .set_replica_state(ctx, state) + .await + .wrap("set_replica_state()") + } +} + +#[async_trait::async_trait] +impl PayloadManager for Store { + /// Currently (for the main node) proposing is implemented as just converting an L2 block from db (without a cert) into a payload. 
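+    ///
+    /// Payloads larger than `LARGE_PAYLOAD_SIZE` (1 MiB) only produce a
+    /// warning; they are still proposed.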
+ async fn propose( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + ) -> ctx::Result { + const LARGE_PAYLOAD_SIZE: usize = 1 << 20; + let payload = self.pool.wait_for_payload(ctx, block_number).await?; + let encoded_payload = payload.encode(); + if encoded_payload.0.len() > LARGE_PAYLOAD_SIZE { + tracing::warn!( + "large payload ({}B) with {} transactions", + encoded_payload.0.len(), + payload.transactions.len() + ); + } + Ok(encoded_payload) + } + + /// Verify that `payload` is a correct proposal for the block `block_number`. + /// * for the main node it checks whether the same block is already present in storage. + /// * for the EN validator + /// * if the block with this number was already applied, it checks that it was the + /// same block. It should always be true, because main node is the only proposer and + /// to propose a different block a hard fork is needed. + /// * otherwise, EN attempts to apply the received block. If the block was incorrect + /// the statekeeper is expected to crash the whole EN. Otherwise OK is returned. + async fn verify( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + payload: &validator::Payload, + ) -> ctx::Result<()> { + let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + if let Some(payloads) = &mut *payloads { + let block = to_fetched_block(block_number, payload).context("to_fetched_block")?; + let n = block.number; + payloads.send(block).await.context("payload_queue.send()")?; + // Wait for the block to be processed, without waiting for it to be stored. + // TODO(BFT-459): this is not ideal, because we don't check here whether the + // processed block is the same as `payload`. It will work correctly + // with the current implementation of EN, but we should make it more + // precise when block reverting support is implemented. + ctx.wait(payloads.sync_state.wait_for_local_block(n)) + .await?; + } else { + let want = self.pool.wait_for_payload(ctx, block_number).await?; + let got = Payload::decode(payload).context("Payload::decode(got)")?; + if got != want { + return Err( + anyhow::format_err!("unexpected payload: got {got:?} want {want:?}").into(), + ); + } + } + Ok(()) + } +} diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index d20c379a5d66..499bdbe22b62 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -49,7 +49,8 @@ use zksync_web3_decl::client::{Client, DynClient, L2}; use crate::{ batch::{L1BatchCommit, L1BatchWithWitness, LastBlockCommit}, - en, ConnectionPool, + en, + storage::ConnectionPool, }; /// Fake StateKeeper for tests. 
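
As context for the test changes below, this is the wiring pattern that `mn.rs`
(above) and the tests use to stand up the refactored storage layer. A sketch
only: it assumes `ctx`, `s` and `pool` (a `storage::ConnectionPool`) are
provided by an enclosing `scope::run!`.

    // `Store` wraps the DAL pool; its `StoreRunner` persists incoming
    // certificates in the background.
    let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?;
    s.spawn_bg(runner.run(ctx));

    // `BlockStore` is the consensus-side view built on top of `Store`.
    let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
        .await
        .wrap("BlockStore::new()")?;
    s.spawn_bg(runner.run(ctx));
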
diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 5db6e250da6c..37034ecb65cf 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -1,7 +1,8 @@ use anyhow::Context as _; +use storage::Store; use test_casing::{test_casing, Product}; use tracing::Instrument as _; -use zksync_concurrency::{ctx, scope}; +use zksync_concurrency::{ctx, error::Wrap, scope}; use zksync_config::configs::consensus::{ValidatorPublicKey, WeightedValidator}; use zksync_consensus_crypto::TextFmt as _; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; @@ -9,9 +10,11 @@ use zksync_consensus_roles::{ validator, validator::testonly::{Setup, SetupSpec}, }; +use zksync_consensus_storage::BlockStore; use zksync_types::{L1BatchNumber, ProtocolVersionId}; use super::*; +use crate::{mn::run_main_node, storage::ConnectionPool}; const VERSIONS: [ProtocolVersionId; 2] = [ProtocolVersionId::latest(), ProtocolVersionId::next()]; const FROM_SNAPSHOT: [bool; 2] = [true, false]; diff --git a/core/node/node_framework/src/implementations/layers/consensus.rs b/core/node/node_framework/src/implementations/layers/consensus.rs index 421e13115ef2..d5b65078b03b 100644 --- a/core/node/node_framework/src/implementations/layers/consensus.rs +++ b/core/node/node_framework/src/implementations/layers/consensus.rs @@ -37,7 +37,7 @@ pub enum Mode { /// ## Adds tasks /// /// - `MainNodeConsensusTask` (if `Mode::Main`) -/// - `FetcherTask` (if `Mode::External`) +/// - `ExternalNodeTask` (if `Mode::External`) #[derive(Debug)] pub struct ConsensusLayer { pub mode: Mode, @@ -101,7 +101,7 @@ impl WiringLayer for ConsensusLayer { } }; - let task = FetcherTask { + let task = ExternalNodeTask { config, pool, main_node_client, @@ -130,7 +130,7 @@ impl Task for MainNodeConsensusTask { async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { // We instantiate the root context here, since the consensus task is the only user of the - // structured concurrency framework (`MainNodeConsensusTask` and `FetcherTask` are considered mutually + // structured concurrency framework (`MainNodeConsensusTask` and `ExternalNodeTask` are considered mutually // exclusive). // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, // not the consensus task itself. There may have been any number of tasks running in the root context, @@ -151,7 +151,7 @@ impl Task for MainNodeConsensusTask { } #[derive(Debug)] -pub struct FetcherTask { +pub struct ExternalNodeTask { config: Option<(ConsensusConfig, ConsensusSecrets)>, pool: ConnectionPool, main_node_client: Box>, @@ -160,21 +160,21 @@ pub struct FetcherTask { } #[async_trait::async_trait] -impl Task for FetcherTask { +impl Task for ExternalNodeTask { fn id(&self) -> TaskId { "consensus_fetcher".into() } async fn run(self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { // We instantiate the root context here, since the consensus task is the only user of the - // structured concurrency framework (`MainNodeConsensusTask` and `FetcherTask` are considered mutually + // structured concurrency framework (`MainNodeConsensusTask` and `ExternalNodeTask` are considered mutually // exclusive). // Note, however, that awaiting for the `stop_receiver` is related to the root context behavior, // not the consensus task itself. 
There may have been any number of tasks running in the root context, // but we only need to wait for stop signal once, and it will be propagated to all child contexts. let root_ctx = ctx::root(); scope::run!(&root_ctx, |ctx, s| async { - s.spawn_bg(consensus::era::run_en( + s.spawn_bg(consensus::era::run_external_node( ctx, self.config, self.pool, diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 8719e133ed74..5bec4d092707 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -7712,7 +7712,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "once_cell", @@ -7743,13 +7743,15 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "blst", "ed25519-dalek", + "elliptic-curve 0.13.8", "ff_ce", "hex", + "k256 0.13.3", "num-bigint 0.4.5", "num-traits", "pairing_ce 0.28.5 (git+https://github.com/matter-labs/pairing.git?rev=d24f2c5871089c4cd4f54c0ca266bb9fef6115eb)", @@ -7764,7 +7766,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "bit-vec", @@ -7785,7 +7787,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "async-trait", @@ -7803,8 +7805,9 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ + "anyhow", "rand 0.8.5", "thiserror", "zksync_concurrency", @@ -8102,7 +8105,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "bit-vec", @@ -8122,7 +8125,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" version = "0.1.0" -source = 
"git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "heck 0.5.0",