Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Rename consensus tasks and split storage (BFT-476) #2366

Merged
merged 5 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ async fn run_core(
// but we only need to wait for stop signal once, and it will be propagated to all child contexts.
let ctx = ctx::root();
scope::run!(&ctx, |ctx, s| async move {
s.spawn_bg(consensus::era::run_en(
s.spawn_bg(consensus::era::run_external_node(
ctx,
cfg,
pool,
Expand Down
2 changes: 1 addition & 1 deletion core/node/consensus/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use zksync_types::{
};
use zksync_utils::{h256_to_u256, u256_to_h256};

use crate::ConnectionPool;
use crate::storage::ConnectionPool;

/// Commitment to the last block of a batch.
pub(crate) struct LastBlockCommit {
Expand Down
4 changes: 2 additions & 2 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use zksync_node_sync::{
use zksync_types::L2BlockNumber;
use zksync_web3_decl::client::{DynClient, L2};

use super::{config, storage::Store, ConnectionPool, ConsensusConfig, ConsensusSecrets};
use crate::storage;
use super::{config, storage::Store, ConsensusConfig, ConsensusSecrets};
use crate::storage::{self, ConnectionPool};

/// External node.
pub(super) struct EN {
Expand Down
6 changes: 3 additions & 3 deletions core/node/consensus/src/era.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use zksync_dal::Core;
use zksync_node_sync::{sync_action::ActionQueueSender, SyncState};
use zksync_web3_decl::client::{DynClient, L2};

use super::{en, storage::ConnectionPool};
use super::{en, mn, storage::ConnectionPool};

/// Runs the consensus task in the main node mode.
pub async fn run_main_node(
Expand All @@ -22,7 +22,7 @@ pub async fn run_main_node(
// Consensus is a new component.
// For now in case of error we just log it and allow the server
// to continue running.
if let Err(err) = super::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await {
if let Err(err) = mn::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await {
tracing::error!("Consensus actor failed: {err:#}");
} else {
tracing::info!("Consensus actor stopped");
Expand All @@ -33,7 +33,7 @@ pub async fn run_main_node(
/// Runs the consensus node for the external node.
/// If `cfg` is `None`, it will just fetch blocks from the main node
/// using JSON RPC, without starting the consensus node.
pub async fn run_en(
pub async fn run_external_node(
ctx: &ctx::Ctx,
cfg: Option<(ConsensusConfig, ConsensusSecrets)>,
pool: zksync_dal::ConnectionPool<Core>,
Expand Down
65 changes: 2 additions & 63 deletions core/node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@

#![allow(clippy::redundant_locals)]
#![allow(clippy::needless_pass_by_ref_mut)]
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

use crate::storage::{ConnectionPool, Store};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};

// Currently `batch` module is only used in tests,
// but will be used in production once batch syncing is implemented in consensus.
Expand All @@ -18,64 +12,9 @@ mod batch;
mod config;
mod en;
pub mod era;
mod mn;
mod storage;
#[cfg(test)]
pub(crate) mod testonly;
#[cfg(test)]
mod tests;

/// Task running a consensus validator for the main node.
/// Main node is currently the only leader of the consensus - i.e. it proposes all the
/// L2 blocks (generated by `Statekeeper`).
async fn run_main_node(
ctx: &ctx::Ctx,
cfg: ConsensusConfig,
secrets: ConsensusSecrets,
pool: ConnectionPool,
) -> anyhow::Result<()> {
let validator_key = config::validator_key(&secrets)
.context("validator_key")?
.context("missing validator_key")?;
scope::run!(&ctx, |ctx, s| async {
if let Some(spec) = &cfg.genesis_spec {
let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?;
pool.connection(ctx)
.await
.wrap("connection()")?
.adjust_genesis(ctx, &spec)
.await
.wrap("adjust_genesis()")?;
}
let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("Store::runner()") });
let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("BlockStore::runner()") });
anyhow::ensure!(
block_store.genesis().leader_selection
== validator::LeaderSelectionMode::Sticky(validator_key.public()),
"unsupported leader selection mode - main node has to be the leader"
);

// Dummy batch store - we don't gossip batches yet, but we need one anyway.
let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BatchStore::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
batch_store,
attester: None,
validator: Some(executor::Validator {
key: validator_key,
replica_store: Box::new(store.clone()),
payload_manager: Box::new(store.clone()),
}),
};
executor.run(ctx).await.context("executor.run()")
})
.await
}
72 changes: 72 additions & 0 deletions core/node/consensus/src/mn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

use crate::{
config,
storage::{ConnectionPool, Store},
};

/// Task running a consensus validator for the main node.
/// Main node is currently the only leader of the consensus - i.e. it proposes all the
/// L2 blocks (generated by `Statekeeper`).
pub async fn run_main_node(
ctx: &ctx::Ctx,
cfg: ConsensusConfig,
secrets: ConsensusSecrets,
pool: ConnectionPool,
) -> anyhow::Result<()> {
let validator_key = config::validator_key(&secrets)
.context("validator_key")?
.context("missing validator_key")?;

scope::run!(&ctx, |ctx, s| async {
if let Some(spec) = &cfg.genesis_spec {
let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?;

pool.connection(ctx)
.await
.wrap("connection()")?
.adjust_genesis(ctx, &spec)
.await
.wrap("adjust_genesis()")?;
}

let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?;
s.spawn_bg(runner.run(ctx));

let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(runner.run(ctx));

anyhow::ensure!(
block_store.genesis().leader_selection
== validator::LeaderSelectionMode::Sticky(validator_key.public()),
"unsupported leader selection mode - main node has to be the leader"
);

// Dummy batch store - we don't gossip batches yet, but we need one anyway.
let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BatchStore::new()")?;
s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
batch_store,
attester: None,
validator: Some(executor::Validator {
key: validator_key,
replica_store: Box::new(store.clone()),
payload_manager: Box::new(store.clone()),
}),
};
executor.run(ctx).await
})
.await
}
Loading
Loading