diff --git a/Cargo.lock b/Cargo.lock index 2e1a7412ad..5fc6f92423 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10185,6 +10185,7 @@ dependencies = [ "serde", "starknet_batcher_types", "starknet_sequencer_infra", + "tokio", "tracing", "validator", ] diff --git a/crates/consensus_manager/Cargo.toml b/crates/consensus_manager/Cargo.toml index cf76cc877b..88b13d65d1 100644 --- a/crates/consensus_manager/Cargo.toml +++ b/crates/consensus_manager/Cargo.toml @@ -21,5 +21,6 @@ papyrus_protobuf.workspace = true serde.workspace = true starknet_batcher_types.workspace = true starknet_sequencer_infra.workspace = true +tokio.workspace = true tracing.workspace = true validator.workspace = true diff --git a/crates/consensus_manager/src/consensus_manager.rs b/crates/consensus_manager/src/consensus_manager.rs index 973c0503ad..bbabb8f6d0 100644 --- a/crates/consensus_manager/src/consensus_manager.rs +++ b/crates/consensus_manager/src/consensus_manager.rs @@ -8,7 +8,7 @@ use futures::SinkExt; use libp2p::PeerId; use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; -use papyrus_network::network_manager::BroadcastTopicClient; +use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager}; use papyrus_network_types::network_types::BroadcastedMessageMetadata; use papyrus_protobuf::consensus::ConsensusMessage; use starknet_batcher_types::communication::SharedBatcherClient; @@ -18,6 +18,10 @@ use tracing::{error, info}; use crate::config::ConsensusManagerConfig; +// TODO(Dan, Guy): move to config. +pub const BROADCAST_BUFFER_SIZE: usize = 100; +pub const NETWORK_TOPIC: &str = "consensus_proposals"; + #[derive(Clone)] pub struct ConsensusManager { pub config: ConsensusManagerConfig, @@ -30,12 +34,16 @@ impl ConsensusManager { } pub async fn run(&self) -> Result<(), ConsensusError> { + let network_manager = + NetworkManager::new(self.config.consensus_config.network_config.clone(), None); let context = SequencerConsensusContext::new( Arc::clone(&self.batcher_client), self.config.consensus_config.num_validators, ); + let network_handle = tokio::task::spawn(network_manager.run()); papyrus_consensus::run_consensus( + network_handle, context, self.config.consensus_config.start_height, self.config.consensus_config.validator_id, diff --git a/crates/papyrus_node/src/bin/run_consensus.rs b/crates/papyrus_node/src/bin/run_consensus.rs index e7bd28ccbc..b4e697b6d9 100644 --- a/crates/papyrus_node/src/bin/run_consensus.rs +++ b/crates/papyrus_node/src/bin/run_consensus.rs @@ -4,6 +4,8 @@ //! Expects to receive 2 groupings of arguments: //! 1. TestConfig - these are prefixed with `--test.` in the command. //! 2. NodeConfig - any argument lacking the above prefix is assumed to be in NodeConfig. +use std::future; + use clap::Parser; use futures::stream::StreamExt; use papyrus_consensus::config::ConsensusConfig; @@ -86,6 +88,7 @@ fn build_consensus( Ok(Some(tokio::spawn(async move { Ok(papyrus_consensus::run_consensus( + tokio::spawn(future::pending()), context, consensus_config.start_height, consensus_config.validator_id, diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index 97714a79fa..50b1a27605 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -2,7 +2,7 @@ #[path = "run_test.rs"] mod run_test; -use std::future::pending; +use std::future; use std::process::exit; use std::sync::Arc; use std::time::Duration; @@ -152,7 +152,7 @@ async fn spawn_rpc_server( _pending_classes: Arc>, _storage_reader: StorageReader, ) -> anyhow::Result>> { - Ok(tokio::spawn(pending())) + Ok(tokio::spawn(future::pending())) } fn spawn_monitoring_server( @@ -178,7 +178,7 @@ fn spawn_consensus( ) -> anyhow::Result>> { let (Some(config), Some(network_manager)) = (config, network_manager) else { info!("Consensus is disabled."); - return Ok(tokio::spawn(pending())); + return Ok(tokio::spawn(future::pending())); }; let config = config.clone(); debug!("Consensus configuration: {config:?}"); @@ -193,6 +193,7 @@ fn spawn_consensus( ); Ok(tokio::spawn(async move { Ok(papyrus_consensus::run_consensus( + tokio::spawn(future::pending()), context, config.start_height, config.validator_id, @@ -248,7 +249,7 @@ async fn spawn_sync_client( (Some(_), Some(_)) => { panic!("One of --sync.#is_none or --p2p_sync.#is_none must be turned on"); } - (None, None) => tokio::spawn(pending()), + (None, None) => tokio::spawn(future::pending()), (Some(sync_config), None) => { let configs = (sync_config, config.central.clone(), config.base_layer.clone()); let storage = (storage_reader.clone(), storage_writer); @@ -294,7 +295,7 @@ fn spawn_p2p_sync_server( ) -> JoinHandle> { let Some(network_manager) = network_manager else { info!("P2P Sync is disabled."); - return tokio::spawn(pending()); + return tokio::spawn(future::pending()); }; let header_server_receiver = network_manager @@ -404,7 +405,7 @@ async fn run_threads( } else { match resources.maybe_network_manager { Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }), - None => tokio::spawn(pending()), + None => tokio::spawn(future::pending()), } }; tokio::select! { @@ -460,7 +461,7 @@ fn spawn_storage_metrics_collector( interval: Duration, ) -> JoinHandle> { if !collect_metrics { - return tokio::spawn(pending()); + return tokio::spawn(future::pending()); } tokio::spawn( diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 94a1606803..b18cd78443 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -11,10 +11,11 @@ use futures::channel::{mpsc, oneshot}; use futures::stream::FuturesUnordered; use futures::{Stream, StreamExt}; use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT}; -use papyrus_network::network_manager::BroadcastTopicClientTrait; +use papyrus_network::network_manager::{BroadcastTopicClientTrait, NetworkError}; use papyrus_protobuf::consensus::{ConsensusMessage, ProposalWrapper}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; +use tokio::task::JoinHandle; use tracing::{debug, info, instrument}; use crate::config::TimeoutsConfig; @@ -30,7 +31,9 @@ use crate::types::{ // TODO(dvir): add test for this. #[instrument(skip_all, level = "info")] #[allow(missing_docs)] +#[allow(clippy::too_many_arguments)] pub async fn run_consensus( + mut network_handle: JoinHandle>, mut context: ContextT, start_height: BlockNumber, validator_id: ValidatorId, @@ -70,6 +73,10 @@ where // built and risk equivocating. Therefore, we must only enter the other select branches if // we are certain to leave this height. tokio::select! { + _ = &mut network_handle => { + panic!("Network handle finished unexpectedly"); + }, + decision = run_height => { let decision = decision?; context.decision_reached(decision.block, decision.precommits).await?; diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index e40de91845..a4dc5bddd5 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -1,5 +1,5 @@ use std::time::Duration; -use std::vec; +use std::{future, vec}; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; @@ -165,6 +165,7 @@ async fn run_consensus_sync() { let (mut sync_sender, mut sync_receiver) = mpsc::unbounded(); let consensus_handle = tokio::spawn(async move { run_consensus( + tokio::spawn(future::pending()), context, BlockNumber(1), *VALIDATOR_ID, @@ -224,6 +225,7 @@ async fn run_consensus_sync_cancellation_safety() { let consensus_handle = tokio::spawn(async move { run_consensus( + tokio::spawn(future::pending()), context, BlockNumber(1), *VALIDATOR_ID,