refactor: Rename consensus tasks and split storage (BFT-476) (#2357)
## What ❔
* Renames `FetcherTask` to `ExternalNodeTask`
* Moves `run_main_node` to `mn::run_main_node` to match
`en::run_external_node`
* Splits `consensus::storage` into `consensus::storage::connection` and
`consensus::storage::store` (see the layout sketch below)
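
For orientation, here is a rough sketch of the resulting module layout (the module names match the `lib.rs` diff below; the inline comments are my shorthand, not the literal file contents):

```rust
// core/node/consensus/src/lib.rs after this PR (sketch)
mod batch;   // currently only used in tests
mod config;
mod en;      // ExternalNodeTask (formerly FetcherTask); en::run_external_node
pub mod era; // public entry points: era::run_main_node, era::run_external_node
mod mn;      // new: mn::run_main_node, moved out of lib.rs
mod storage; // split into storage::connection and storage::store
#[cfg(test)]
pub(crate) mod testonly;
#[cfg(test)]
mod tests;
```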

## Why ❔

I'm working on #2340, where I made these changes either because the
naming was confusing (the task renames) or because the `storage` module
was getting very long; splitting it in two should make it easier to add
more trait implementations later. That PR was getting huge even before I
did any actual work, so I decided to split out a pure refactoring PR to
make the other one easier to review.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
aakoshh authored Jul 1, 2024
1 parent 2dd35dd commit 107e1a7
Showing 13 changed files with 743 additions and 703 deletions.
2 changes: 1 addition & 1 deletion core/bin/external_node/src/main.rs
@@ -286,7 +286,7 @@ async fn run_core(
// but we only need to wait for stop signal once, and it will be propagated to all child contexts.
let ctx = ctx::root();
scope::run!(&ctx, |ctx, s| async move {
-s.spawn_bg(consensus::era::run_en(
+s.spawn_bg(consensus::era::run_external_node(
ctx,
cfg,
pool,
2 changes: 1 addition & 1 deletion core/node/consensus/src/batch.rs
@@ -14,7 +14,7 @@ use zksync_types::{
};
use zksync_utils::{h256_to_u256, u256_to_h256};

-use crate::ConnectionPool;
+use crate::storage::ConnectionPool;

/// Commitment to the last block of a batch.
pub(crate) struct LastBlockCommit {
4 changes: 2 additions & 2 deletions core/node/consensus/src/en.rs
@@ -9,8 +9,8 @@ use zksync_node_sync::{
use zksync_types::L2BlockNumber;
use zksync_web3_decl::client::{DynClient, L2};

-use super::{config, storage::Store, ConnectionPool, ConsensusConfig, ConsensusSecrets};
-use crate::storage;
+use super::{config, storage::Store, ConsensusConfig, ConsensusSecrets};
+use crate::storage::{self, ConnectionPool};

/// External node.
pub(super) struct EN {
6 changes: 3 additions & 3 deletions core/node/consensus/src/era.rs
@@ -10,7 +10,7 @@ use zksync_dal::Core;
use zksync_node_sync::{sync_action::ActionQueueSender, SyncState};
use zksync_web3_decl::client::{DynClient, L2};

-use super::{en, storage::ConnectionPool};
+use super::{en, mn, storage::ConnectionPool};

/// Runs the consensus task in the main node mode.
pub async fn run_main_node(
@@ -22,7 +22,7 @@ pub async fn run_main_node(
// Consensus is a new component.
// For now in case of error we just log it and allow the server
// to continue running.
-if let Err(err) = super::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await {
+if let Err(err) = mn::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await {
tracing::error!("Consensus actor failed: {err:#}");
} else {
tracing::info!("Consensus actor stopped");
@@ -33,7 +33,7 @@
/// Runs the consensus node for the external node.
/// If `cfg` is `None`, it will just fetch blocks from the main node
/// using JSON RPC, without starting the consensus node.
-pub async fn run_en(
+pub async fn run_external_node(
ctx: &ctx::Ctx,
cfg: Option<(ConsensusConfig, ConsensusSecrets)>,
pool: zksync_dal::ConnectionPool<Core>,
65 changes: 2 additions & 63 deletions core/node/consensus/src/lib.rs
@@ -2,14 +2,8 @@
#![allow(clippy::redundant_locals)]
#![allow(clippy::needless_pass_by_ref_mut)]
-use anyhow::Context as _;
-use zksync_concurrency::{ctx, error::Wrap as _, scope};
-use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
-use zksync_consensus_executor as executor;
-use zksync_consensus_roles::validator;
-use zksync_consensus_storage::{BatchStore, BlockStore};
-
-use crate::storage::{ConnectionPool, Store};
+use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};

// Currently `batch` module is only used in tests,
// but will be used in production once batch syncing is implemented in consensus.
@@ -18,64 +12,9 @@ mod batch;
mod config;
mod en;
pub mod era;
+mod mn;
mod storage;
#[cfg(test)]
pub(crate) mod testonly;
#[cfg(test)]
mod tests;

/// Task running a consensus validator for the main node.
/// Main node is currently the only leader of the consensus - i.e. it proposes all the
/// L2 blocks (generated by `Statekeeper`).
async fn run_main_node(
ctx: &ctx::Ctx,
cfg: ConsensusConfig,
secrets: ConsensusSecrets,
pool: ConnectionPool,
) -> anyhow::Result<()> {
let validator_key = config::validator_key(&secrets)
.context("validator_key")?
.context("missing validator_key")?;
scope::run!(&ctx, |ctx, s| async {
if let Some(spec) = &cfg.genesis_spec {
let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?;
pool.connection(ctx)
.await
.wrap("connection()")?
.adjust_genesis(ctx, &spec)
.await
.wrap("adjust_genesis()")?;
}
let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("Store::runner()") });
let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("BlockStore::runner()") });
anyhow::ensure!(
block_store.genesis().leader_selection
== validator::LeaderSelectionMode::Sticky(validator_key.public()),
"unsupported leader selection mode - main node has to be the leader"
);

// Dummy batch store - we don't gossip batches yet, but we need one anyway.
let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BatchStore::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
batch_store,
attester: None,
validator: Some(executor::Validator {
key: validator_key,
replica_store: Box::new(store.clone()),
payload_manager: Box::new(store.clone()),
}),
};
executor.run(ctx).await.context("executor.run()")
})
.await
}
72 changes: 72 additions & 0 deletions core/node/consensus/src/mn.rs
@@ -0,0 +1,72 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

use crate::{
config,
storage::{ConnectionPool, Store},
};

/// Task running a consensus validator for the main node.
/// Main node is currently the only leader of the consensus - i.e. it proposes all the
/// L2 blocks (generated by `Statekeeper`).
pub async fn run_main_node(
ctx: &ctx::Ctx,
cfg: ConsensusConfig,
secrets: ConsensusSecrets,
pool: ConnectionPool,
) -> anyhow::Result<()> {
let validator_key = config::validator_key(&secrets)
.context("validator_key")?
.context("missing validator_key")?;

scope::run!(&ctx, |ctx, s| async {
if let Some(spec) = &cfg.genesis_spec {
let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?;

pool.connection(ctx)
.await
.wrap("connection()")?
.adjust_genesis(ctx, &spec)
.await
.wrap("adjust_genesis()")?;
}

let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?;
s.spawn_bg(runner.run(ctx));

let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(runner.run(ctx));

anyhow::ensure!(
block_store.genesis().leader_selection
== validator::LeaderSelectionMode::Sticky(validator_key.public()),
"unsupported leader selection mode - main node has to be the leader"
);

// Dummy batch store - we don't gossip batches yet, but we need one anyway.
let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BatchStore::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
batch_store,
attester: None,
validator: Some(executor::Validator {
key: validator_key,
replica_store: Box::new(store.clone()),
payload_manager: Box::new(store.clone()),
}),
};
executor.run(ctx).await
})
.await
}