Skip to content

Commit

Permalink
feat(consensus): run network manager in consensus
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-starkware committed Nov 4, 2024
1 parent 8f5b1d4 commit 898a552
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/consensus_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ papyrus_protobuf.workspace = true
serde.workspace = true
starknet_batcher_types.workspace = true
starknet_sequencer_infra.workspace = true
tokio.workspace = true
tracing.workspace = true
validator.workspace = true
10 changes: 9 additions & 1 deletion crates/consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::SinkExt;
use libp2p::PeerId;
use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError};
use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::ConsensusMessage;
use starknet_batcher_types::communication::SharedBatcherClient;
Expand All @@ -18,6 +18,10 @@ use tracing::{error, info};

use crate::config::ConsensusManagerConfig;

// TODO(Dan, Guy): move to config.
pub const BROADCAST_BUFFER_SIZE: usize = 100;
pub const NETWORK_TOPIC: &str = "consensus_proposals";

#[derive(Clone)]
pub struct ConsensusManager {
pub config: ConsensusManagerConfig,
Expand All @@ -30,12 +34,16 @@ impl ConsensusManager {
}

pub async fn run(&self) -> Result<(), ConsensusError> {
let network_manager =
NetworkManager::new(self.config.consensus_config.network_config.clone(), None);
let context = SequencerConsensusContext::new(
Arc::clone(&self.batcher_client),
self.config.consensus_config.num_validators,
);
let network_handle = tokio::task::spawn(network_manager.run());

papyrus_consensus::run_consensus(
network_handle,
context,
self.config.consensus_config.start_height,
self.config.consensus_config.validator_id,
Expand Down
3 changes: 3 additions & 0 deletions crates/papyrus_node/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! Expects to receive 2 groupings of arguments:
//! 1. TestConfig - these are prefixed with `--test.` in the command.
//! 2. NodeConfig - any argument lacking the above prefix is assumed to be in NodeConfig.
use std::future;

use clap::Parser;
use futures::stream::StreamExt;
use papyrus_consensus::config::ConsensusConfig;
Expand Down Expand Up @@ -86,6 +88,7 @@ fn build_consensus(

Ok(Some(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
tokio::spawn(future::pending()),
context,
consensus_config.start_height,
consensus_config.validator_id,
Expand Down
15 changes: 8 additions & 7 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#[path = "run_test.rs"]
mod run_test;

use std::future::pending;
use std::future;
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -152,7 +152,7 @@ async fn spawn_rpc_server(
_pending_classes: Arc<RwLock<PendingClasses>>,
_storage_reader: StorageReader,
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
Ok(tokio::spawn(pending()))
Ok(tokio::spawn(future::pending()))
}

fn spawn_monitoring_server(
Expand All @@ -178,7 +178,7 @@ fn spawn_consensus(
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
let (Some(config), Some(network_manager)) = (config, network_manager) else {
info!("Consensus is disabled.");
return Ok(tokio::spawn(pending()));
return Ok(tokio::spawn(future::pending()));
};
let config = config.clone();
debug!("Consensus configuration: {config:?}");
Expand All @@ -193,6 +193,7 @@ fn spawn_consensus(
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
tokio::spawn(future::pending()),
context,
config.start_height,
config.validator_id,
Expand Down Expand Up @@ -248,7 +249,7 @@ async fn spawn_sync_client(
(Some(_), Some(_)) => {
panic!("One of --sync.#is_none or --p2p_sync.#is_none must be turned on");
}
(None, None) => tokio::spawn(pending()),
(None, None) => tokio::spawn(future::pending()),
(Some(sync_config), None) => {
let configs = (sync_config, config.central.clone(), config.base_layer.clone());
let storage = (storage_reader.clone(), storage_writer);
Expand Down Expand Up @@ -294,7 +295,7 @@ fn spawn_p2p_sync_server(
) -> JoinHandle<anyhow::Result<()>> {
let Some(network_manager) = network_manager else {
info!("P2P Sync is disabled.");
return tokio::spawn(pending());
return tokio::spawn(future::pending());
};

let header_server_receiver = network_manager
Expand Down Expand Up @@ -404,7 +405,7 @@ async fn run_threads(
} else {
match resources.maybe_network_manager {
Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }),
None => tokio::spawn(pending()),
None => tokio::spawn(future::pending()),
}
};
tokio::select! {
Expand Down Expand Up @@ -460,7 +461,7 @@ fn spawn_storage_metrics_collector(
interval: Duration,
) -> JoinHandle<anyhow::Result<()>> {
if !collect_metrics {
return tokio::spawn(pending());
return tokio::spawn(future::pending());
}

tokio::spawn(
Expand Down
9 changes: 8 additions & 1 deletion crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use futures::channel::{mpsc, oneshot};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_network::network_manager::{BroadcastTopicClientTrait, NetworkError};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalWrapper};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
Expand All @@ -30,7 +31,9 @@ use crate::types::{
// TODO(dvir): add test for this.
#[instrument(skip_all, level = "info")]
#[allow(missing_docs)]
#[allow(clippy::too_many_arguments)]
pub async fn run_consensus<ContextT, SyncReceiverT>(
mut network_handle: JoinHandle<Result<(), NetworkError>>,
mut context: ContextT,
start_height: BlockNumber,
validator_id: ValidatorId,
Expand Down Expand Up @@ -70,6 +73,10 @@ where
// built and risk equivocating. Therefore, we must only enter the other select branches if
// we are certain to leave this height.
tokio::select! {
_ = &mut network_handle => {
panic!("Network handle finished unexpectedly");
},

decision = run_height => {
let decision = decision?;
context.decision_reached(decision.block, decision.precommits).await?;
Expand Down
4 changes: 3 additions & 1 deletion crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::time::Duration;
use std::vec;
use std::{future, vec};

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
Expand Down Expand Up @@ -165,6 +165,7 @@ async fn run_consensus_sync() {
let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();
let consensus_handle = tokio::spawn(async move {
run_consensus(
tokio::spawn(future::pending()),
context,
BlockNumber(1),
*VALIDATOR_ID,
Expand Down Expand Up @@ -224,6 +225,7 @@ async fn run_consensus_sync_cancellation_safety() {

let consensus_handle = tokio::spawn(async move {
run_consensus(
tokio::spawn(future::pending()),
context,
BlockNumber(1),
*VALIDATOR_ID,
Expand Down

0 comments on commit 898a552

Please sign in to comment.