diff --git a/Cargo.lock b/Cargo.lock index 451ce335fc..e19d7d7f5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10196,6 +10196,7 @@ dependencies = [ "serde", "starknet_batcher_types", "starknet_sequencer_infra", + "tokio", "tracing", "validator", ] diff --git a/crates/consensus_manager/Cargo.toml b/crates/consensus_manager/Cargo.toml index cf76cc877b..88b13d65d1 100644 --- a/crates/consensus_manager/Cargo.toml +++ b/crates/consensus_manager/Cargo.toml @@ -21,5 +21,6 @@ papyrus_protobuf.workspace = true serde.workspace = true starknet_batcher_types.workspace = true starknet_sequencer_infra.workspace = true +tokio.workspace = true tracing.workspace = true validator.workspace = true diff --git a/crates/consensus_manager/src/consensus_manager.rs b/crates/consensus_manager/src/consensus_manager.rs index 973c0503ad..9a1f1407ac 100644 --- a/crates/consensus_manager/src/consensus_manager.rs +++ b/crates/consensus_manager/src/consensus_manager.rs @@ -8,7 +8,7 @@ use futures::SinkExt; use libp2p::PeerId; use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; -use papyrus_network::network_manager::BroadcastTopicClient; +use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager}; use papyrus_network_types::network_types::BroadcastedMessageMetadata; use papyrus_protobuf::consensus::ConsensusMessage; use starknet_batcher_types::communication::SharedBatcherClient; @@ -18,6 +18,10 @@ use tracing::{error, info}; use crate::config::ConsensusManagerConfig; +// TODO(Dan, Guy): move to config. +pub const BROADCAST_BUFFER_SIZE: usize = 100; +pub const NETWORK_TOPIC: &str = "consensus_proposals"; + #[derive(Clone)] pub struct ConsensusManager { pub config: ConsensusManagerConfig, @@ -30,12 +34,15 @@ impl ConsensusManager { } pub async fn run(&self) -> Result<(), ConsensusError> { + let network_manager = + NetworkManager::new(self.config.consensus_config.network_config.clone(), None); let context = SequencerConsensusContext::new( Arc::clone(&self.batcher_client), self.config.consensus_config.num_validators, ); - papyrus_consensus::run_consensus( + let mut network_handle = tokio::task::spawn(network_manager.run()); + let consensus_task = papyrus_consensus::run_consensus( context, self.config.consensus_config.start_height, self.config.consensus_config.validator_id, @@ -43,8 +50,17 @@ impl ConsensusManager { self.config.consensus_config.timeouts.clone(), create_fake_network_channels(), futures::stream::pending(), - ) - .await + ); + + tokio::select! { + consensus_result = consensus_task => { + match consensus_result { + Ok(_) => panic!("Consensus task finished unexpectedly"), + Err(e) => Err(e), + } + }, + _ = &mut network_handle => panic!("Consensus Network handle finished unexpectedly"), + } } } diff --git a/crates/papyrus_node/src/bin/run_consensus.rs b/crates/papyrus_node/src/bin/run_consensus.rs index e7bd28ccbc..ec62abc1d9 100644 --- a/crates/papyrus_node/src/bin/run_consensus.rs +++ b/crates/papyrus_node/src/bin/run_consensus.rs @@ -4,6 +4,7 @@ //! Expects to receive 2 groupings of arguments: //! 1. TestConfig - these are prefixed with `--test.` in the command. //! 2. NodeConfig - any argument lacking the above prefix is assumed to be in NodeConfig. + use clap::Parser; use futures::stream::StreamExt; use papyrus_consensus::config::ConsensusConfig; diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index 97714a79fa..d36d045e37 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -2,7 +2,7 @@ #[path = "run_test.rs"] mod run_test; -use std::future::pending; +use std::future; use std::process::exit; use std::sync::Arc; use std::time::Duration; @@ -152,7 +152,7 @@ async fn spawn_rpc_server( _pending_classes: Arc>, _storage_reader: StorageReader, ) -> anyhow::Result>> { - Ok(tokio::spawn(pending())) + Ok(tokio::spawn(future::pending())) } fn spawn_monitoring_server( @@ -178,7 +178,7 @@ fn spawn_consensus( ) -> anyhow::Result>> { let (Some(config), Some(network_manager)) = (config, network_manager) else { info!("Consensus is disabled."); - return Ok(tokio::spawn(pending())); + return Ok(tokio::spawn(future::pending())); }; let config = config.clone(); debug!("Consensus configuration: {config:?}"); @@ -248,7 +248,7 @@ async fn spawn_sync_client( (Some(_), Some(_)) => { panic!("One of --sync.#is_none or --p2p_sync.#is_none must be turned on"); } - (None, None) => tokio::spawn(pending()), + (None, None) => tokio::spawn(future::pending()), (Some(sync_config), None) => { let configs = (sync_config, config.central.clone(), config.base_layer.clone()); let storage = (storage_reader.clone(), storage_writer); @@ -294,7 +294,7 @@ fn spawn_p2p_sync_server( ) -> JoinHandle> { let Some(network_manager) = network_manager else { info!("P2P Sync is disabled."); - return tokio::spawn(pending()); + return tokio::spawn(future::pending()); }; let header_server_receiver = network_manager @@ -404,7 +404,7 @@ async fn run_threads( } else { match resources.maybe_network_manager { Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }), - None => tokio::spawn(pending()), + None => tokio::spawn(future::pending()), } }; tokio::select! { @@ -460,7 +460,7 @@ fn spawn_storage_metrics_collector( interval: Duration, ) -> JoinHandle> { if !collect_metrics { - return tokio::spawn(pending()); + return tokio::spawn(future::pending()); } tokio::spawn( diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 9645d40c70..926b23f3a6 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -30,6 +30,7 @@ use crate::types::{ // TODO(dvir): add test for this. #[instrument(skip_all, level = "info")] #[allow(missing_docs)] +#[allow(clippy::too_many_arguments)] pub async fn run_consensus( mut context: ContextT, start_height: BlockNumber,