diff --git a/crates/example-types/src/node_types.rs b/crates/example-types/src/node_types.rs index 18de95cc22..b84c2da482 100644 --- a/crates/example-types/src/node_types.rs +++ b/crates/example-types/src/node_types.rs @@ -8,12 +8,13 @@ use crate::{ use hotshot::traits::{ election::static_committee::{StaticCommittee, StaticElectionConfig}, implementations::{ - CombinedCommChannel, Libp2pCommChannel, MemoryCommChannel, MemoryStorage, WebCommChannel, + CombinedNetworks, Libp2pNetwork, MemoryNetwork, MemoryStorage, WebServerNetwork, }, NodeImplementation, }; use hotshot_types::{ - data::ViewNumber, signature_key::BLSPubKey, traits::node_implementation::NodeType, + data::ViewNumber, message::Message, signature_key::BLSPubKey, + traits::node_implementation::NodeType, }; use serde::{Deserialize, Serialize}; @@ -65,34 +66,31 @@ pub struct CombinedImpl; pub type StaticMembership = StaticCommittee; /// memory network -pub type StaticMemoryDAComm = MemoryCommChannel; +pub type StaticMemoryDAComm = + MemoryNetwork, ::SignatureKey>; /// libp2p network -type StaticLibp2pDAComm = Libp2pCommChannel; +type StaticLibp2pDAComm = Libp2pNetwork, ::SignatureKey>; /// web server network communication channel -type StaticWebDAComm = WebCommChannel; +type StaticWebDAComm = WebServerNetwork; /// combined network -type StaticCombinedDAComm = CombinedCommChannel; +type StaticCombinedDAComm = CombinedNetworks; /// memory comm channel -pub type StaticMemoryQuorumComm = MemoryCommChannel; +pub type StaticMemoryQuorumComm = + MemoryNetwork, ::SignatureKey>; /// libp2p comm channel -type StaticLibp2pQuorumComm = Libp2pCommChannel; +type StaticLibp2pQuorumComm = + Libp2pNetwork, ::SignatureKey>; /// web server comm channel -type StaticWebQuorumComm = WebCommChannel; +type StaticWebQuorumComm = WebServerNetwork; /// combined network (libp2p + web server) -type StaticCombinedQuorumComm = CombinedCommChannel; - -/// memory network -pub type StaticMemoryViewSyncComm = MemoryCommChannel; - -/// memory network -pub type StaticMemoryVIDComm = MemoryCommChannel; +type StaticCombinedQuorumComm = CombinedNetworks; impl NodeImplementation for Libp2pImpl { type Storage = MemoryStorage; diff --git a/crates/examples/combined/all.rs b/crates/examples/combined/all.rs index 1d6f1c0a89..6249abb909 100644 --- a/crates/examples/combined/all.rs +++ b/crates/examples/combined/all.rs @@ -20,7 +20,7 @@ use tracing::{error, instrument}; use crate::{ infra::run_orchestrator, infra::{ConfigArgs, OrchestratorArgs}, - types::{DANetwork, NodeImpl, QuorumNetwork, VIDNetwork, ViewSyncNetwork}, + types::{DANetwork, NodeImpl, QuorumNetwork}, }; /// general infra used for this example @@ -78,8 +78,6 @@ async fn main() { TestTypes, DANetwork, QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, NodeImpl, >(OrchestratorArgs { url: orchestrator_url.clone(), @@ -96,19 +94,13 @@ async fn main() { for _ in 0..config.config.total_nodes.into() { let orchestrator_url = orchestrator_url.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs { - url: orchestrator_url, - public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - network_config_file: None, - }) + infra::main_entry_point::( + ValidatorArgs { + url: orchestrator_url, + public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + network_config_file: None, + }, + ) .await; }); nodes.push(node); diff --git a/crates/examples/combined/multi-validator.rs b/crates/examples/combined/multi-validator.rs index d97a7f7bc3..f903f1e57c 100644 --- a/crates/examples/combined/multi-validator.rs +++ b/crates/examples/combined/multi-validator.rs @@ -7,9 +7,8 @@ use clap::Parser; use hotshot_example_types::state_types::TestTypes; use hotshot_orchestrator::client::{MultiValidatorArgs, ValidatorArgs}; use tracing::instrument; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; /// types used for this example pub mod types; @@ -34,15 +33,9 @@ async fn main() { let args = args.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs::from_multi_args(args, node_index)) + infra::main_entry_point::( + ValidatorArgs::from_multi_args(args, node_index), + ) .await; }); nodes.push(node); diff --git a/crates/examples/combined/orchestrator.rs b/crates/examples/combined/orchestrator.rs index d4ced1536c..0069093505 100644 --- a/crates/examples/combined/orchestrator.rs +++ b/crates/examples/combined/orchestrator.rs @@ -6,11 +6,10 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::instrument; -use types::VIDNetwork; use crate::infra::run_orchestrator; use crate::infra::OrchestratorArgs; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork}; /// general infra used for this example #[path = "../infra/mod.rs"] @@ -27,8 +26,5 @@ async fn main() { setup_backtrace(); let args = OrchestratorArgs::parse(); - run_orchestrator::( - args, - ) - .await; + run_orchestrator::(args).await; } diff --git a/crates/examples/combined/types.rs b/crates/examples/combined/types.rs index 94c316f7ab..9a904b592f 100644 --- a/crates/examples/combined/types.rs +++ b/crates/examples/combined/types.rs @@ -1,5 +1,5 @@ use crate::infra::CombinedDARun; -use hotshot::traits::implementations::{CombinedCommChannel, MemoryStorage}; +use hotshot::traits::implementations::{CombinedNetworks, MemoryStorage}; use hotshot_example_types::state_types::TestTypes; use hotshot_types::traits::node_implementation::NodeImplementation; use serde::{Deserialize, Serialize}; @@ -10,13 +10,13 @@ use std::fmt::Debug; pub struct NodeImpl {} /// convenience type alias -pub type DANetwork = CombinedCommChannel; +pub type DANetwork = CombinedNetworks; /// convenience type alias -pub type VIDNetwork = CombinedCommChannel; +pub type VIDNetwork = CombinedNetworks; /// convenience type alias -pub type QuorumNetwork = CombinedCommChannel; +pub type QuorumNetwork = CombinedNetworks; /// convenience type alias -pub type ViewSyncNetwork = CombinedCommChannel; +pub type ViewSyncNetwork = CombinedNetworks; impl NodeImplementation for NodeImpl { type Storage = MemoryStorage; diff --git a/crates/examples/combined/validator.rs b/crates/examples/combined/validator.rs index d0493134d6..38c8dbe0b8 100644 --- a/crates/examples/combined/validator.rs +++ b/crates/examples/combined/validator.rs @@ -3,9 +3,8 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::{info, instrument}; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; use hotshot_orchestrator::client::ValidatorArgs; @@ -27,14 +26,5 @@ async fn main() { setup_backtrace(); let args = ValidatorArgs::parse(); info!("connecting to orchestrator at {:?}", args.url); - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(args) - .await; + infra::main_entry_point::(args).await; } diff --git a/crates/examples/infra/mod.rs b/crates/examples/infra/mod.rs index cdc42122a4..80ef89bb02 100644 --- a/crates/examples/infra/mod.rs +++ b/crates/examples/infra/mod.rs @@ -5,13 +5,10 @@ use async_lock::RwLock; use async_trait::async_trait; use clap::Parser; use futures::StreamExt; -use hotshot::traits::implementations::{CombinedCommChannel, CombinedNetworks}; +use hotshot::traits::implementations::{CombinedNetworks, UnderlyingCombinedNetworks}; use hotshot::{ traits::{ - implementations::{ - Libp2pCommChannel, Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, - WebCommChannel, WebServerNetwork, - }, + implementations::{Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, WebServerNetwork}, NodeImplementation, }, types::{SignatureKey, SystemContextHandle}, @@ -37,7 +34,6 @@ use hotshot_types::{ traits::{ block_contents::TestableBlock, election::Membership, - network::CommunicationChannel, node_implementation::{ConsensusTime, NodeType}, states::TestableState, }, @@ -130,10 +126,8 @@ pub fn load_config_from_file( /// Runs the orchestrator pub async fn run_orchestrator< TYPES: NodeType, - DACHANNEL: CommunicationChannel + Debug, - QUORUMCHANNEL: CommunicationChannel + Debug, - VIEWSYNCCHANNEL: CommunicationChannel + Debug, - VIDCHANNEL: CommunicationChannel + Debug, + DACHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, + QUORUMCHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, NODE: NodeImplementation>, >( OrchestratorArgs { url, config_file }: OrchestratorArgs, @@ -305,14 +299,12 @@ async fn libp2p_network_from_config( #[async_trait] pub trait RunDA< TYPES: NodeType, - DACHANNEL: CommunicationChannel + Debug, - QUORUMCHANNEL: CommunicationChannel + Debug, - VIEWSYNCCHANNEL: CommunicationChannel + Debug, - VIDCHANNEL: CommunicationChannel + Debug, + DANET: ConnectedNetwork, TYPES::SignatureKey> + Debug, + QUORUMNET: ConnectedNetwork, TYPES::SignatureKey> + Debug, NODE: NodeImplementation< TYPES, - QuorumNetwork = QUORUMCHANNEL, - CommitteeNetwork = DACHANNEL, + QuorumNetwork = QUORUMNET, + CommitteeNetwork = DANET, Storage = MemoryStorage, >, > where @@ -354,8 +346,8 @@ pub trait RunDA< config.config.da_committee_size.try_into().unwrap(), ); let networks_bundle = Networks { - quorum_network: quorum_network.clone(), - da_network: da_network.clone(), + quorum_network: quorum_network.clone().into(), + da_network: da_network.clone().into(), _pd: PhantomData, }; @@ -489,16 +481,10 @@ pub trait RunDA< } /// Returns the da network for this run - fn get_da_channel(&self) -> DACHANNEL; + fn get_da_channel(&self) -> DANET; /// Returns the quorum network for this run - fn get_quorum_channel(&self) -> QUORUMCHANNEL; - - ///Returns view sync network for this run - fn get_view_sync_channel(&self) -> VIEWSYNCCHANNEL; - - ///Returns VID network for this run - fn get_vid_channel(&self) -> VIDCHANNEL; + fn get_quorum_channel(&self) -> QUORUMNET; /// Returns the config for this run fn get_config(&self) -> NetworkConfig; @@ -511,13 +497,9 @@ pub struct WebServerDARun { /// the network configuration config: NetworkConfig, /// quorum channel - quorum_channel: WebCommChannel, + quorum_channel: WebServerNetwork, /// data availability channel - da_channel: WebCommChannel, - /// view sync channel - view_sync_channel: WebCommChannel, - /// vid channel - vid_channel: WebCommChannel, + da_channel: WebServerNetwork, } #[async_trait] @@ -530,19 +512,11 @@ impl< >, NODE: NodeImplementation< TYPES, - QuorumNetwork = WebCommChannel, - CommitteeNetwork = WebCommChannel, + QuorumNetwork = WebServerNetwork, + CommitteeNetwork = WebServerNetwork, Storage = MemoryStorage, >, - > - RunDA< - TYPES, - WebCommChannel, - WebCommChannel, - WebCommChannel, - WebCommChannel, - NODE, - > for WebServerDARun + > RunDA, WebServerNetwork, NODE> for WebServerDARun where ::ValidatedState: TestableState, ::BlockPayload: TestableBlock, @@ -567,46 +541,24 @@ where underlying_quorum_network.wait_for_ready().await; - // create communication channels - let quorum_channel: WebCommChannel = - WebCommChannel::new(underlying_quorum_network.clone().into()); - - let view_sync_channel: WebCommChannel = - WebCommChannel::new(underlying_quorum_network.into()); - - let da_channel: WebCommChannel = WebCommChannel::new( - WebServerNetwork::create(url.clone(), wait_between_polls, pub_key.clone(), true).into(), - ); - - let vid_channel: WebCommChannel = WebCommChannel::new( - WebServerNetwork::create(url, wait_between_polls, pub_key, true).into(), - ); + let da_channel: WebServerNetwork = + WebServerNetwork::create(url.clone(), wait_between_polls, pub_key.clone(), true); WebServerDARun { config, - quorum_channel, + quorum_channel: underlying_quorum_network, da_channel, - view_sync_channel, - vid_channel, } } - fn get_da_channel(&self) -> WebCommChannel { + fn get_da_channel(&self) -> WebServerNetwork { self.da_channel.clone() } - fn get_quorum_channel(&self) -> WebCommChannel { + fn get_quorum_channel(&self) -> WebServerNetwork { self.quorum_channel.clone() } - fn get_view_sync_channel(&self) -> WebCommChannel { - self.view_sync_channel.clone() - } - - fn get_vid_channel(&self) -> WebCommChannel { - self.vid_channel.clone() - } - fn get_config(&self) -> NetworkConfig { self.config.clone() } @@ -619,13 +571,9 @@ pub struct Libp2pDARun { /// the network configuration config: NetworkConfig, /// quorum channel - quorum_channel: Libp2pCommChannel, + quorum_channel: Libp2pNetwork, TYPES::SignatureKey>, /// data availability channel - da_channel: Libp2pCommChannel, - /// view sync channel - view_sync_channel: Libp2pCommChannel, - /// vid channel - vid_channel: Libp2pCommChannel, + da_channel: Libp2pNetwork, TYPES::SignatureKey>, } #[async_trait] @@ -638,17 +586,15 @@ impl< >, NODE: NodeImplementation< TYPES, - QuorumNetwork = Libp2pCommChannel, - CommitteeNetwork = Libp2pCommChannel, + QuorumNetwork = Libp2pNetwork, TYPES::SignatureKey>, + CommitteeNetwork = Libp2pNetwork, TYPES::SignatureKey>, Storage = MemoryStorage, >, > RunDA< TYPES, - Libp2pCommChannel, - Libp2pCommChannel, - Libp2pCommChannel, - Libp2pCommChannel, + Libp2pNetwork, TYPES::SignatureKey>, + Libp2pNetwork, TYPES::SignatureKey>, NODE, > for Libp2pDARun where @@ -663,49 +609,26 @@ where let pub_key = config.config.my_own_validator_config.public_key.clone(); // create and wait for underlying network - let underlying_quorum_network = - libp2p_network_from_config::(config.clone(), pub_key).await; - - underlying_quorum_network.wait_for_ready().await; - - // create communication channels - let quorum_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); - - let view_sync_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); + let quorum_channel = libp2p_network_from_config::(config.clone(), pub_key).await; - let da_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); - - let vid_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); + let da_channel = quorum_channel.clone(); + quorum_channel.wait_for_ready().await; Libp2pDARun { config, quorum_channel, da_channel, - view_sync_channel, - vid_channel, } } - fn get_da_channel(&self) -> Libp2pCommChannel { + fn get_da_channel(&self) -> Libp2pNetwork, TYPES::SignatureKey> { self.da_channel.clone() } - fn get_quorum_channel(&self) -> Libp2pCommChannel { + fn get_quorum_channel(&self) -> Libp2pNetwork, TYPES::SignatureKey> { self.quorum_channel.clone() } - fn get_view_sync_channel(&self) -> Libp2pCommChannel { - self.view_sync_channel.clone() - } - - fn get_vid_channel(&self) -> Libp2pCommChannel { - self.vid_channel.clone() - } - fn get_config(&self) -> NetworkConfig { self.config.clone() } @@ -718,13 +641,9 @@ pub struct CombinedDARun { /// the network configuration config: NetworkConfig, /// quorum channel - quorum_channel: CombinedCommChannel, + quorum_channel: CombinedNetworks, /// data availability channel - da_channel: CombinedCommChannel, - /// view sync channel - view_sync_channel: CombinedCommChannel, - /// vid channel - vid_channel: CombinedCommChannel, + da_channel: CombinedNetworks, } #[async_trait] @@ -738,18 +657,10 @@ impl< NODE: NodeImplementation< TYPES, Storage = MemoryStorage, - QuorumNetwork = CombinedCommChannel, - CommitteeNetwork = CombinedCommChannel, + QuorumNetwork = CombinedNetworks, + CommitteeNetwork = CombinedNetworks, >, - > - RunDA< - TYPES, - CombinedCommChannel, - CombinedCommChannel, - CombinedCommChannel, - CombinedCommChannel, - NODE, - > for CombinedDARun + > RunDA, CombinedNetworks, NODE> for CombinedDARun where ::ValidatedState: TestableState, ::BlockPayload: TestableBlock, @@ -779,61 +690,39 @@ where }: WebServerConfig = config.clone().da_web_server_config.unwrap(); // create and wait for underlying webserver network - let webserver_underlying_quorum_network = + let web_quorum_network = webserver_network_from_config::(config.clone(), pub_key.clone()); - let webserver_underlying_da_network = - WebServerNetwork::create(url, wait_between_polls, pub_key, true); + let web_da_network = WebServerNetwork::create(url, wait_between_polls, pub_key, true); - webserver_underlying_quorum_network.wait_for_ready().await; + web_quorum_network.wait_for_ready().await; - // combine the two communication channels - let quorum_channel = CombinedCommChannel::new(Arc::new(CombinedNetworks( - webserver_underlying_quorum_network.clone(), - libp2p_underlying_quorum_network.clone(), - ))); + // combine the two communication channel - let view_sync_channel = CombinedCommChannel::new(Arc::new(CombinedNetworks( - webserver_underlying_quorum_network.clone(), + let da_channel = CombinedNetworks::new(Arc::new(UnderlyingCombinedNetworks( + web_da_network.clone(), libp2p_underlying_quorum_network.clone(), ))); - - let da_channel: CombinedCommChannel = - CombinedCommChannel::new(Arc::new(CombinedNetworks( - webserver_underlying_da_network, - libp2p_underlying_quorum_network.clone(), - ))); - - let vid_channel = CombinedCommChannel::new(Arc::new(CombinedNetworks( - webserver_underlying_quorum_network, - libp2p_underlying_quorum_network, + let quorum_channel = CombinedNetworks::new(Arc::new(UnderlyingCombinedNetworks( + web_quorum_network.clone(), + libp2p_underlying_quorum_network.clone(), ))); CombinedDARun { config, quorum_channel, da_channel, - view_sync_channel, - vid_channel, } } - fn get_da_channel(&self) -> CombinedCommChannel { + fn get_da_channel(&self) -> CombinedNetworks { self.da_channel.clone() } - fn get_quorum_channel(&self) -> CombinedCommChannel { + fn get_quorum_channel(&self) -> CombinedNetworks { self.quorum_channel.clone() } - fn get_view_sync_channel(&self) -> CombinedCommChannel { - self.view_sync_channel.clone() - } - - fn get_vid_channel(&self) -> CombinedCommChannel { - self.vid_channel.clone() - } - fn get_config(&self) -> NetworkConfig { self.config.clone() } @@ -849,17 +738,15 @@ pub async fn main_entry_point< BlockHeader = TestBlockHeader, InstanceState = TestInstanceState, >, - DACHANNEL: CommunicationChannel + Debug, - QUORUMCHANNEL: CommunicationChannel + Debug, - VIEWSYNCCHANNEL: CommunicationChannel + Debug, - VIDCHANNEL: CommunicationChannel + Debug, + DACHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, + QUORUMCHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, NODE: NodeImplementation< TYPES, QuorumNetwork = QUORUMCHANNEL, CommitteeNetwork = DACHANNEL, Storage = MemoryStorage, >, - RUNDA: RunDA, + RUNDA: RunDA, >( args: ValidatorArgs, ) where diff --git a/crates/examples/libp2p/all.rs b/crates/examples/libp2p/all.rs index 5b67f667d7..ddc8b472b9 100644 --- a/crates/examples/libp2p/all.rs +++ b/crates/examples/libp2p/all.rs @@ -18,7 +18,7 @@ use tracing::instrument; use crate::{ infra::run_orchestrator, infra::{ConfigArgs, OrchestratorArgs}, - types::{DANetwork, NodeImpl, QuorumNetwork, VIDNetwork, ViewSyncNetwork}, + types::{DANetwork, NodeImpl, QuorumNetwork}, }; /// general infra used for this example @@ -44,8 +44,6 @@ async fn main() { TestTypes, DANetwork, QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, NodeImpl, >(OrchestratorArgs { url: orchestrator_url.clone(), @@ -61,19 +59,13 @@ async fn main() { for _ in 0..config.config.total_nodes.into() { let orchestrator_url = orchestrator_url.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs { - url: orchestrator_url, - public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - network_config_file: None, - }) + infra::main_entry_point::( + ValidatorArgs { + url: orchestrator_url, + public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + network_config_file: None, + }, + ) .await; }); nodes.push(node); diff --git a/crates/examples/libp2p/multi-validator.rs b/crates/examples/libp2p/multi-validator.rs index aec3325383..e085b498f6 100644 --- a/crates/examples/libp2p/multi-validator.rs +++ b/crates/examples/libp2p/multi-validator.rs @@ -7,9 +7,8 @@ use clap::Parser; use hotshot_example_types::state_types::TestTypes; use hotshot_orchestrator::client::{MultiValidatorArgs, ValidatorArgs}; use tracing::instrument; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; /// types used for this example pub mod types; @@ -34,15 +33,9 @@ async fn main() { let args = args.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs::from_multi_args(args, node_index)) + infra::main_entry_point::( + ValidatorArgs::from_multi_args(args, node_index), + ) .await; }); nodes.push(node); diff --git a/crates/examples/libp2p/orchestrator.rs b/crates/examples/libp2p/orchestrator.rs index c26fc73bba..42e23a39d6 100644 --- a/crates/examples/libp2p/orchestrator.rs +++ b/crates/examples/libp2p/orchestrator.rs @@ -10,7 +10,7 @@ use tracing::instrument; use crate::infra::run_orchestrator; use crate::infra::OrchestratorArgs; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, VIDNetwork, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork}; /// general infra used for this example #[path = "../infra/mod.rs"] @@ -27,8 +27,5 @@ async fn main() { setup_backtrace(); let args = OrchestratorArgs::parse(); - run_orchestrator::( - args, - ) - .await; + run_orchestrator::(args).await; } diff --git a/crates/examples/libp2p/types.rs b/crates/examples/libp2p/types.rs index af535db8d7..500581b9d6 100644 --- a/crates/examples/libp2p/types.rs +++ b/crates/examples/libp2p/types.rs @@ -1,7 +1,10 @@ use crate::infra::Libp2pDARun; -use hotshot::traits::implementations::{Libp2pCommChannel, MemoryStorage}; +use hotshot::traits::implementations::{Libp2pNetwork, MemoryStorage}; use hotshot_example_types::state_types::TestTypes; -use hotshot_types::traits::node_implementation::NodeImplementation; +use hotshot_types::{ + message::Message, + traits::node_implementation::{NodeImplementation, NodeType}, +}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -10,13 +13,9 @@ use std::fmt::Debug; pub struct NodeImpl {} /// convenience type alias -pub type DANetwork = Libp2pCommChannel; +pub type DANetwork = Libp2pNetwork, ::SignatureKey>; /// convenience type alias -pub type VIDNetwork = Libp2pCommChannel; -/// convenience type alias -pub type QuorumNetwork = Libp2pCommChannel; -/// convenience type alias -pub type ViewSyncNetwork = Libp2pCommChannel; +pub type QuorumNetwork = Libp2pNetwork, ::SignatureKey>; impl NodeImplementation for NodeImpl { type Storage = MemoryStorage; diff --git a/crates/examples/libp2p/validator.rs b/crates/examples/libp2p/validator.rs index 9873cac76e..cebcd44d04 100644 --- a/crates/examples/libp2p/validator.rs +++ b/crates/examples/libp2p/validator.rs @@ -3,9 +3,8 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::{info, instrument}; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; use hotshot_orchestrator::client::ValidatorArgs; @@ -27,14 +26,5 @@ async fn main() { setup_backtrace(); let args = ValidatorArgs::parse(); info!("connecting to orchestrator at {:?}", args.url); - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(args) - .await; + infra::main_entry_point::(args).await; } diff --git a/crates/examples/webserver/all.rs b/crates/examples/webserver/all.rs index d70de4bba5..b68b3ec46e 100644 --- a/crates/examples/webserver/all.rs +++ b/crates/examples/webserver/all.rs @@ -7,7 +7,7 @@ use crate::infra::{ConfigArgs, OrchestratorArgs}; use crate::types::ThisRun; use crate::{ infra::run_orchestrator, - types::{DANetwork, NodeImpl, QuorumNetwork, ViewSyncNetwork}, + types::{DANetwork, NodeImpl, QuorumNetwork}, }; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; @@ -24,7 +24,6 @@ use hotshot_orchestrator::config::NetworkConfig; use hotshot_types::traits::node_implementation::NodeType; use surf_disco::Url; use tracing::error; -use types::VIDNetwork; #[cfg_attr(async_executor_impl = "tokio", tokio::main)] #[cfg_attr(async_executor_impl = "async-std", async_std::main)] @@ -73,8 +72,6 @@ async fn main() { TestTypes, DANetwork, QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, NodeImpl, >(OrchestratorArgs { url: orchestrator_url.clone(), @@ -90,19 +87,13 @@ async fn main() { for _ in 0..(config.config.total_nodes.get()) { let orchestrator_url = orchestrator_url.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs { - url: orchestrator_url.clone(), - public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - network_config_file: None, - }) + infra::main_entry_point::( + ValidatorArgs { + url: orchestrator_url, + public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + network_config_file: None, + }, + ) .await; }); nodes.push(node); diff --git a/crates/examples/webserver/multi-validator.rs b/crates/examples/webserver/multi-validator.rs index b975f07b0f..61d81c79c5 100644 --- a/crates/examples/webserver/multi-validator.rs +++ b/crates/examples/webserver/multi-validator.rs @@ -7,9 +7,8 @@ use clap::Parser; use hotshot_example_types::state_types::TestTypes; use hotshot_orchestrator::client::{MultiValidatorArgs, ValidatorArgs}; use tracing::instrument; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; /// types used for this example pub mod types; @@ -34,15 +33,9 @@ async fn main() { let args = args.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs::from_multi_args(args, node_index)) + infra::main_entry_point::( + ValidatorArgs::from_multi_args(args, node_index), + ) .await; }); nodes.push(node); diff --git a/crates/examples/webserver/orchestrator.rs b/crates/examples/webserver/orchestrator.rs index 62f2006f2e..49080dae8a 100644 --- a/crates/examples/webserver/orchestrator.rs +++ b/crates/examples/webserver/orchestrator.rs @@ -7,11 +7,10 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::instrument; -use types::VIDNetwork; use crate::infra::run_orchestrator; use crate::infra::OrchestratorArgs; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork}; /// general infra used for this example #[path = "../infra/mod.rs"] @@ -28,8 +27,5 @@ async fn main() { setup_backtrace(); let args = OrchestratorArgs::parse(); - run_orchestrator::( - args, - ) - .await; + run_orchestrator::(args).await; } diff --git a/crates/examples/webserver/types.rs b/crates/examples/webserver/types.rs index 0e67f7a742..95abc27be1 100644 --- a/crates/examples/webserver/types.rs +++ b/crates/examples/webserver/types.rs @@ -1,5 +1,5 @@ use crate::infra::WebServerDARun; -use hotshot::traits::implementations::{MemoryStorage, WebCommChannel}; +use hotshot::traits::implementations::{MemoryStorage, WebServerNetwork}; use hotshot_example_types::state_types::TestTypes; use hotshot_types::traits::node_implementation::NodeImplementation; use serde::{Deserialize, Serialize}; @@ -10,13 +10,13 @@ use std::fmt::Debug; pub struct NodeImpl {} /// convenience type alias -pub type DANetwork = WebCommChannel; +pub type DANetwork = WebServerNetwork; /// convenience type alias -pub type VIDNetwork = WebCommChannel; +pub type VIDNetwork = WebServerNetwork; /// convenience type alias -pub type QuorumNetwork = WebCommChannel; +pub type QuorumNetwork = WebServerNetwork; /// convenience type alias -pub type ViewSyncNetwork = WebCommChannel; +pub type ViewSyncNetwork = WebServerNetwork; impl NodeImplementation for NodeImpl { type Storage = MemoryStorage; diff --git a/crates/examples/webserver/validator.rs b/crates/examples/webserver/validator.rs index 96bcde1807..e335cae2be 100644 --- a/crates/examples/webserver/validator.rs +++ b/crates/examples/webserver/validator.rs @@ -3,9 +3,8 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::{info, instrument}; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; use hotshot_orchestrator::client::ValidatorArgs; @@ -27,14 +26,5 @@ async fn main() { setup_backtrace(); let args = ValidatorArgs::parse(); info!("connecting to orchestrator at {:?}", args.url); - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(args) - .await; + infra::main_entry_point::(args).await; } diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index e948c47879..240c740f06 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -42,7 +42,8 @@ use hotshot_types::{ simple_certificate::QuorumCertificate, traits::{ consensus_api::ConsensusApi, - network::CommunicationChannel, + election::Membership, + network::ConnectedNetwork, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, states::ValidatedState, @@ -78,10 +79,10 @@ pub const H_256: usize = 32; /// Bundle of the networks used in consensus pub struct Networks> { /// Newtork for reaching all nodes - pub quorum_network: I::QuorumNetwork, + pub quorum_network: Arc, /// Network for reaching the DA committee - pub da_network: I::CommitteeNetwork, + pub da_network: Arc, /// Phantom for TYPES and I pub _pd: PhantomData<(TYPES, I)>, @@ -321,7 +322,7 @@ impl> SystemContext { sender: api.inner.public_key.clone(), kind: MessageKind::from(message), }, - da_membership, + da_membership.get_committee(TYPES::Time::new(0)), ), api .send_external_event(Event { diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 172f904b9f..14ef94c48a 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -15,18 +15,21 @@ use hotshot_task_impls::{ vid::VIDTaskState, view_sync::ViewSyncTaskState, }; -use hotshot_types::traits::election::Membership; use hotshot_types::{ event::Event, message::Messages, traits::{ block_contents::vid_commitment, consensus_api::ConsensusApi, - network::{CommunicationChannel, ConsensusIntentEvent, TransmitType}, + network::{ConsensusIntentEvent, TransmitType}, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, BlockPayload, }, }; +use hotshot_types::{ + message::Message, + traits::{election::Membership, network::ConnectedNetwork}, +}; use std::{ collections::{BTreeMap, HashMap, HashSet}, marker::PhantomData, @@ -45,10 +48,13 @@ pub enum GlobalEvent { } /// Add the network task to handle messages and publish events. -pub async fn add_network_message_task>( +pub async fn add_network_message_task< + TYPES: NodeType, + NET: ConnectedNetwork, TYPES::SignatureKey>, +>( task_reg: Arc, event_stream: Sender>, - channel: NET, + channel: Arc, ) { let net = channel.clone(); let network_state: NetworkMessageTaskState<_> = NetworkMessageTaskState { @@ -105,11 +111,14 @@ pub async fn add_network_message_task>( +pub async fn add_network_event_task< + TYPES: NodeType, + NET: ConnectedNetwork, TYPES::SignatureKey>, +>( task_reg: Arc, tx: Sender>, rx: Receiver>, - channel: NET, + channel: Arc, membership: TYPES::Membership, filter: fn(&HotShotEvent) -> bool, ) { @@ -168,8 +177,8 @@ pub async fn create_consensus_state>( consensus: handle.hotshot.get_consensus(), cur_view: TYPES::Time::new(0), vote_collector: None, - network: c_api.inner.networks.quorum_network.clone().into(), + network: c_api.inner.networks.quorum_network.clone(), membership: c_api.inner.memberships.vid_membership.clone().into(), public_key: c_api.public_key().clone(), private_key: c_api.private_key().clone(), @@ -262,7 +271,7 @@ pub async fn add_upgrade_task>( api: c_api.clone(), cur_view: TYPES::Time::new(0), quorum_membership: c_api.inner.memberships.quorum_membership.clone().into(), - quorum_network: c_api.inner.networks.quorum_network.clone().into(), + quorum_network: c_api.inner.networks.quorum_network.clone(), should_vote: |_upgrade_proposal| false, vote_collector: None.into(), public_key: c_api.public_key().clone(), @@ -288,7 +297,7 @@ pub async fn add_da_task>( api: c_api.clone(), consensus: handle.hotshot.get_consensus(), da_membership: c_api.inner.memberships.da_membership.clone().into(), - da_network: c_api.inner.networks.da_network.clone().into(), + da_network: c_api.inner.networks.da_network.clone(), quorum_membership: c_api.inner.memberships.quorum_membership.clone().into(), cur_view: TYPES::Time::new(0), vote_collector: None.into(), @@ -318,7 +327,7 @@ pub async fn add_transaction_task> transactions: Arc::default(), seen_transactions: HashSet::new(), cur_view: TYPES::Time::new(0), - network: c_api.inner.networks.quorum_network.clone().into(), + network: c_api.inner.networks.quorum_network.clone(), membership: c_api.inner.memberships.quorum_membership.clone().into(), public_key: c_api.public_key().clone(), private_key: c_api.private_key().clone(), @@ -342,7 +351,7 @@ pub async fn add_view_sync_task>( let view_sync_state = ViewSyncTaskState { current_view: TYPES::Time::new(0), next_view: TYPES::Time::new(0), - network: api.inner.networks.quorum_network.clone().into(), + network: api.inner.networks.quorum_network.clone(), membership: api.inner.memberships.view_sync_membership.clone().into(), public_key: api.public_key().clone(), private_key: api.private_key().clone(), diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index 3bd07c4c15..48a2669493 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -13,10 +13,12 @@ pub use storage::{Result as StorageResult, Storage}; pub mod implementations { pub use super::{ networking::{ - combined_network::{calculate_hash_of, Cache, CombinedCommChannel, CombinedNetworks}, - libp2p_network::{Libp2pCommChannel, Libp2pNetwork, PeerInfoVec}, - memory_network::{MasterMap, MemoryCommChannel, MemoryNetwork}, - web_server_network::{WebCommChannel, WebServerNetwork}, + combined_network::{ + calculate_hash_of, Cache, CombinedNetworks, UnderlyingCombinedNetworks, + }, + libp2p_network::{Libp2pNetwork, PeerInfoVec}, + memory_network::{MasterMap, MemoryNetwork}, + web_server_network::WebServerNetwork, NetworkingMetricsValue, }, storage::memory_storage::MemoryStorage, // atomic_storage::AtomicStorage, diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index ae093c5f72..d401ba15e8 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -8,7 +8,7 @@ use hotshot_constants::{ COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL, }; use std::{ - collections::HashSet, + collections::{BTreeSet, HashSet}, hash::Hasher, sync::atomic::{AtomicU64, Ordering}, }; @@ -26,11 +26,7 @@ use hotshot_types::{ data::ViewNumber, message::Message, traits::{ - election::Membership, - network::{ - CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, - TestableChannelImplementation, TransmitType, ViewMessage, - }, + network::{ConnectedNetwork, ConsensusIntentEvent, TransmitType}, node_implementation::NodeType, }, BoxSyncFuture, @@ -110,9 +106,9 @@ pub fn calculate_hash_of(t: &T) -> u64 { /// A communication channel with 2 networks, where we can fall back to the slower network if the /// primary fails #[derive(Clone, Debug)] -pub struct CombinedCommChannel { +pub struct CombinedNetworks { /// The two networks we'll use for send/recv - networks: Arc>, + networks: Arc>, /// Last n seen messages to prevent processing duplicates message_cache: Arc>, @@ -121,10 +117,10 @@ pub struct CombinedCommChannel { primary_down: Arc, } -impl CombinedCommChannel { +impl CombinedNetworks { /// Constructor #[must_use] - pub fn new(networks: Arc>) -> Self { + pub fn new(networks: Arc>) -> Self { Self { networks, message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), @@ -149,7 +145,7 @@ impl CombinedCommChannel { /// We need this so we can impl `TestableNetworkingImplementation` /// on the tuple #[derive(Debug, Clone)] -pub struct CombinedNetworks( +pub struct UnderlyingCombinedNetworks( pub WebServerNetwork, pub Libp2pNetwork, TYPES::SignatureKey>, ); @@ -163,7 +159,7 @@ impl TestableNetworkingImplementation for CombinedNetwor da_committee_size: usize, is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static> { + ) -> Box (Arc, Arc) + 'static> { let generators = ( TestableNetworkingImplementation for CombinedNetwor reliability_config, ) ); - Box::new(move |node_id| CombinedNetworks(generators.0(node_id), generators.1(node_id))) - } - - /// Get the number of messages in-flight. - /// - /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. - fn in_flight_message_count(&self) -> Option { - None - } -} - -#[cfg(feature = "hotshot-testing")] -impl TestableNetworkingImplementation for CombinedCommChannel { - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = as TestableNetworkingImplementation<_>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config, - ); - Box::new(move |node_id| Self { - networks: generator(node_id).into(), - message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), - primary_down: Arc::new(AtomicU64::new(0)), + Box::new(move |node_id| { + let (quorum_web, da_web) = generators.0(node_id); + let (quorum_p2p, da_p2p) = generators.1(node_id); + let da_networks = UnderlyingCombinedNetworks( + Arc::>::into_inner(da_web).unwrap(), + Arc::, TYPES::SignatureKey>>::unwrap_or_clone(da_p2p), + ); + let quorum_networks = UnderlyingCombinedNetworks( + Arc::>::into_inner(quorum_web).unwrap(), + Arc::, TYPES::SignatureKey>>::unwrap_or_clone( + quorum_p2p, + ), + ); + let quorum_net = Self { + networks: Arc::new(quorum_networks), + message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), + primary_down: Arc::new(AtomicU64::new(0)), + }; + let da_net = Self { + networks: Arc::new(da_networks), + message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), + primary_down: Arc::new(AtomicU64::new(0)), + }; + (quorum_net.into(), da_net.into()) }) } @@ -229,9 +216,9 @@ impl TestableNetworkingImplementation for CombinedCommCh } #[async_trait] -impl CommunicationChannel for CombinedCommChannel { - type NETWORK = CombinedNetworks; - +impl ConnectedNetwork, TYPES::SignatureKey> + for CombinedNetworks +{ fn pause(&self) { self.networks.0.pause(); } @@ -265,11 +252,8 @@ impl CommunicationChannel for CombinedCommChannel async fn broadcast_message( &self, message: Message, - election: &TYPES::Membership, + recipients: BTreeSet, ) -> Result<(), NetworkError> { - let recipients = - ::Membership::get_committee(election, message.get_view_number()); - // broadcast optimistically on both networks, but if the primary network is down, skip it let primary_down = self.primary_down.load(Ordering::Relaxed); if primary_down < COMBINED_NETWORK_MIN_PRIMARY_FAILURES @@ -384,12 +368,6 @@ impl CommunicationChannel for CombinedCommChannel } } -impl TestableChannelImplementation for CombinedCommChannel { - fn generate_network() -> Box) -> Self + 'static> { - Box::new(move |network| CombinedCommChannel::new(network)) - } -} - #[cfg(test)] mod test { use super::*; diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 9a8162c6ba..d7111c0cea 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -20,10 +20,9 @@ use hotshot_types::{ data::ViewNumber, message::{Message, MessageKind}, traits::{ - election::Membership, network::{ - CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, - NetworkError, NetworkMsg, TestableChannelImplementation, TransmitType, ViewMessage, + ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, NetworkError, + NetworkMsg, TransmitType, ViewMessage, }, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, @@ -51,7 +50,6 @@ use std::{collections::HashSet, num::NonZeroUsize, str::FromStr}; use std::{ collections::BTreeSet, fmt::Debug, - marker::PhantomData, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -166,7 +164,7 @@ where da_committee_size: usize, _is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static> { + ) -> Box (Arc, Arc) + 'static> { assert!( da_committee_size <= expected_node_count, "DA committee size must be less than or equal to total # nodes" @@ -252,7 +250,7 @@ where let keys = all_keys.clone(); let da = da_keys.clone(); let reliability_config_dup = reliability_config.clone(); - async_block_on(async move { + let net = Arc::new(async_block_on(async move { match Libp2pNetwork::new( NetworkingMetricsValue::default(), config, @@ -273,7 +271,8 @@ where panic!("Failed to create libp2p network: {err:?}"); } } - }) + })); + (net.clone(), net) } }) } @@ -629,6 +628,14 @@ impl ConnectedNetwork for Libp2p self.wait_for_ready().await; } + fn pause(&self) { + unimplemented!("Pausing not implemented for the Libp2p network"); + } + + fn resume(&self) { + unimplemented!("Resuming not implemented for the Libp2p network"); + } + #[instrument(name = "Libp2pNetwork::ready_nonblocking", skip_all)] async fn is_ready(&self) -> bool { self.inner.is_ready.load(Ordering::Relaxed) @@ -886,153 +893,3 @@ impl ConnectedNetwork for Libp2p } } } - -/// libp2p identity communication channel -#[derive(Clone, Debug)] -pub struct Libp2pCommChannel( - Arc, TYPES::SignatureKey>>, - PhantomData, -); - -impl Libp2pCommChannel { - /// create a new libp2p communication channel - #[must_use] - pub fn new(network: Arc, TYPES::SignatureKey>>) -> Self { - Self(network, PhantomData) - } -} - -#[cfg(feature = "hotshot-testing")] -impl TestableNetworkingImplementation for Libp2pCommChannel -where - MessageKind: ViewMessage, -{ - /// Returns a boxed function `f(node_id, public_key) -> Libp2pNetwork` - /// with the purpose of generating libp2p networks. - /// Generates `num_bootstrap` bootstrap nodes. The remainder of nodes are normal - /// nodes with sane defaults. - /// # Panics - /// Returned function may panic either: - /// - An invalid configuration - /// (probably an issue with the defaults of this function) - /// - An inability to spin up the replica's network - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = , - TYPES::SignatureKey, - > as TestableNetworkingImplementation<_>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config - ); - Box::new(move |node_id| Self(generator(node_id).into(), PhantomData)) - } - - fn in_flight_message_count(&self) -> Option { - None - } -} - -// FIXME maybe we should macro this...? It's repeated at verbatum EXCEPT for impl generics at the -// top -// we don't really want to make this the default implementation because that forces it to require ConnectedNetwork to be implemented. The struct we implement over might use multiple ConnectedNetworks -#[async_trait] -impl CommunicationChannel for Libp2pCommChannel -where - MessageKind: ViewMessage, -{ - type NETWORK = Libp2pNetwork, TYPES::SignatureKey>; - - fn pause(&self) { - unimplemented!("Pausing not implemented for the Libp2p network"); - } - - fn resume(&self) { - unimplemented!("Resuming not implemented for the Libp2p network"); - } - - async fn wait_for_ready(&self) { - self.0.wait_for_ready().await; - } - - async fn is_ready(&self) -> bool { - self.0.is_ready().await - } - - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - self.0.shut_down().await; - }; - boxed_sync(closure) - } - - async fn broadcast_message( - &self, - message: Message, - membership: &TYPES::Membership, - ) -> Result<(), NetworkError> { - let recipients = ::Membership::get_committee( - membership, - message.kind.get_view_number(), - ); - self.0.broadcast_message(message, recipients).await - } - - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { - self.0.direct_message(message, recipient).await - } - - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { self.0.recv_msgs(transmit_type).await }; - boxed_sync(closure) - } - - async fn queue_node_lookup( - &self, - view_number: ViewNumber, - pk: TYPES::SignatureKey, - ) -> Result<(), UnboundedSendError>> { - self.0.queue_node_lookup(view_number, pk).await - } - - async fn inject_consensus_info(&self, event: ConsensusIntentEvent) { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::inject_consensus_info(&self.0, event) - .await; - } -} - -impl TestableChannelImplementation for Libp2pCommChannel { - fn generate_network( - ) -> Box, TYPES::SignatureKey>>) -> Self + 'static> - { - Box::new(move |network| Libp2pCommChannel::new(network)) - } -} diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index ae9f1c53dc..10f8803f6a 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -15,13 +15,9 @@ use dashmap::DashMap; use futures::StreamExt; use hotshot_types::{ boxed_sync, - message::{Message, MessageKind}, + message::Message, traits::{ - election::Membership, - network::{ - CommunicationChannel, ConnectedNetwork, NetworkMsg, TestableChannelImplementation, - TestableNetworkingImplementation, TransmitType, ViewMessage, - }, + network::{ConnectedNetwork, NetworkMsg, TestableNetworkingImplementation, TransmitType}, node_implementation::NodeType, signature_key::SignatureKey, }, @@ -251,18 +247,19 @@ impl TestableNetworkingImplementation _da_committee_size: usize, _is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static> { + ) -> Box (Arc, Arc) + 'static> { let master: Arc<_> = MasterMap::new(); // We assign known_nodes' public key and stake value rather than read from config file since it's a test Box::new(move |node_id| { let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1; let pubkey = TYPES::SignatureKey::from_private(&privkey); - MemoryNetwork::new( + let net = MemoryNetwork::new( pubkey, NetworkingMetricsValue::default(), master.clone(), reliability_config.clone(), - ) + ); + (net.clone().into(), net.into()) }) } @@ -277,6 +274,14 @@ impl ConnectedNetwork for Memory #[instrument(name = "MemoryNetwork::ready_blocking")] async fn wait_for_ready(&self) {} + fn pause(&self) { + unimplemented!("Pausing not implemented for the Memory network"); + } + + fn resume(&self) { + unimplemented!("Resuming not implemented for the Memory network"); + } + #[instrument(name = "MemoryNetwork::ready_nonblocking")] async fn is_ready(&self) -> bool { true @@ -450,123 +455,3 @@ impl ConnectedNetwork for Memory boxed_sync(closure) } } - -/// memory identity communication channel -#[derive(Clone, Debug)] -pub struct MemoryCommChannel( - Arc, TYPES::SignatureKey>>, -); - -impl MemoryCommChannel { - /// create new communication channel - #[must_use] - pub fn new(network: Arc, TYPES::SignatureKey>>) -> Self { - Self(network) - } -} - -impl TestableNetworkingImplementation for MemoryCommChannel -where - MessageKind: ViewMessage, -{ - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = , - TYPES::SignatureKey, - > as TestableNetworkingImplementation<_>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config, - ); - Box::new(move |node_id| Self(generator(node_id).into())) - } - - fn in_flight_message_count(&self) -> Option { - Some(self.0.inner.in_flight_message_count.load(Ordering::Relaxed)) - } -} - -#[async_trait] -impl CommunicationChannel for MemoryCommChannel -where - MessageKind: ViewMessage, -{ - type NETWORK = MemoryNetwork, TYPES::SignatureKey>; - - fn pause(&self) { - unimplemented!("Pausing not implemented for the memory network"); - } - - fn resume(&self) { - unimplemented!("Resuming not implemented for the memory network"); - } - - async fn wait_for_ready(&self) { - self.0.wait_for_ready().await; - } - - async fn is_ready(&self) -> bool { - self.0.is_ready().await - } - - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - self.0.shut_down().await; - }; - boxed_sync(closure) - } - - async fn broadcast_message( - &self, - message: Message, - election: &TYPES::Membership, - ) -> Result<(), NetworkError> { - let recipients = ::Membership::get_committee( - election, - message.kind.get_view_number(), - ); - self.0.broadcast_message(message, recipients).await - } - - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { - self.0.direct_message(message, recipient).await - } - - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { self.0.recv_msgs(transmit_type).await }; - boxed_sync(closure) - } -} - -impl TestableChannelImplementation for MemoryCommChannel { - fn generate_network( - ) -> Box, TYPES::SignatureKey>>) -> Self + 'static> - { - Box::new(move |network| MemoryCommChannel::new(network)) - } -} diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index a5dcb90a30..861e55d3cd 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -18,9 +18,8 @@ use hotshot_types::{ message::{Message, MessagePurpose}, traits::{ network::{ - CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, NetworkError, NetworkMsg, - TestableChannelImplementation, TestableNetworkingImplementation, TransmitType, - WebServerNetworkError, + ConnectedNetwork, ConsensusIntentEvent, NetworkError, NetworkMsg, + TestableNetworkingImplementation, TransmitType, WebServerNetworkError, }, node_implementation::NodeType, signature_key::SignatureKey, @@ -52,18 +51,6 @@ use tracing::{debug, error, info, warn}; /// convenience alias alias for the result of getting transactions from the web server pub type TxnResult = Result>)>, ClientError>; -/// Represents the communication channel abstraction for the web server -#[derive(Clone, Debug)] -pub struct WebCommChannel(Arc>); - -impl WebCommChannel { - /// Create new communication channel - #[must_use] - pub fn new(network: Arc>) -> Self { - Self(network) - } -} - /// # Note /// /// This function uses `DefaultHasher` instead of cryptographic hash functions like SHA-256 because of an `AsRef` requirement. @@ -101,18 +88,6 @@ impl WebServerNetwork { source: WebServerNetworkError::ClientError, }) } - - /// Pauses the underlying network - pub fn pause(&self) { - error!("Pausing CDN network"); - self.inner.running.store(false, Ordering::Relaxed); - } - - /// Resumes the underlying network - pub fn resume(&self) { - error!("Resuming CDN network"); - self.inner.running.store(true, Ordering::Relaxed); - } } /// `TaskChannel` is a type alias for an unbounded sender channel that sends `ConsensusIntentEvent`s. @@ -721,104 +696,59 @@ impl WebServerNetwork { }; Ok(network_msg) } -} - -#[async_trait] -impl CommunicationChannel for WebCommChannel { - type NETWORK = WebServerNetwork; - /// Blocks until node is successfully initialized - /// into the network - async fn wait_for_ready(&self) { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::wait_for_ready(&self.0) - .await; - } - - fn pause(&self) { - self.0.pause(); - } - - fn resume(&self) { - self.0.resume(); - } - /// checks if the network is ready - /// nonblocking - async fn is_ready(&self) -> bool { - as ConnectedNetwork, TYPES::SignatureKey>>::is_ready( - &self.0, - ) - .await - } - - /// Shut down this network. Afterwards this network should no longer be used. - /// - /// This should also cause other functions to immediately return with a [`NetworkError`] - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::shut_down(&self.0) - .await; - }; - boxed_sync(closure) - } - - /// broadcast message to those listening on the communication channel - /// blocking - async fn broadcast_message( - &self, - message: Message, - _election: &TYPES::Membership, - ) -> Result<(), NetworkError> { - self.0.broadcast_message(message, BTreeSet::new()).await - } + /// Generates a single webserver network, for use in tests + fn single_generator( + expected_node_count: usize, + _num_bootstrap: usize, + _network_id: usize, + _da_committee_size: usize, + is_da: bool, + _reliability_config: &Option>, + ) -> Box Self + 'static> { + let (server_shutdown_sender, server_shutdown) = oneshot(); + let sender = Arc::new(server_shutdown_sender); - /// Sends a direct message to a specific node - /// blocking - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { - self.0.direct_message(message, recipient).await - } + // pick random, unused port + let port = portpicker::pick_unused_port().expect("Could not find an open port"); - /// Moves out the entire queue of received messages of 'transmit_type` - /// - /// Will unwrap the underlying `NetworkMessage` - /// blocking - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::recv_msgs(&self.0, transmit_type) + let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); + info!("Launching web server on port {port}"); + // Start web server + async_spawn(async { + match hotshot_web_server::run_web_server::( + Some(server_shutdown), + url, + ) .await - }; - boxed_sync(closure) - } + { + Ok(()) => error!("Web server future finished unexpectedly"), + Err(e) => error!("Web server task failed: {e}"), + } + }); - async fn inject_consensus_info(&self, event: ConsensusIntentEvent) { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::inject_consensus_info(&self.0, event) - .await; + // We assign known_nodes' public key and stake value rather than read from config file since it's a test + let known_nodes = (0..expected_node_count as u64) + .map(|id| { + TYPES::SignatureKey::from_private( + &TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], id).1, + ) + }) + .collect::>(); + + // Start each node's web server client + Box::new(move |id| { + let sender = Arc::clone(&sender); + let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); + let mut network = WebServerNetwork::create( + url, + Duration::from_millis(100), + known_nodes[usize::try_from(id).unwrap()].clone(), + is_da, + ); + network.server_shutdown_signal = Some(sender); + network + }) } } @@ -832,7 +762,15 @@ impl ConnectedNetwork, TYPES::Signatur async_sleep(Duration::from_secs(1)).await; } } + fn pause(&self) { + error!("Pausing CDN network"); + self.inner.running.store(false, Ordering::Relaxed); + } + fn resume(&self) { + error!("Resuming CDN network"); + self.inner.running.store(true, Ordering::Relaxed); + } /// checks if the network is ready /// nonblocking async fn is_ready(&self) -> bool { @@ -1339,93 +1277,35 @@ impl ConnectedNetwork, TYPES::Signatur } impl TestableNetworkingImplementation for WebServerNetwork { - fn generator( - expected_node_count: usize, - _num_bootstrap: usize, - _network_id: usize, - _da_committee_size: usize, - is_da: bool, - _reliability_config: Option>, - ) -> Box Self + 'static> { - let (server_shutdown_sender, server_shutdown) = oneshot(); - let sender = Arc::new(server_shutdown_sender); - - // pick random, unused port - let port = portpicker::pick_unused_port().expect("Could not find an open port"); - - let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); - info!("Launching web server on port {port}"); - // Start web server - async_spawn(async { - match hotshot_web_server::run_web_server::( - Some(server_shutdown), - url, - ) - .await - { - Ok(()) => error!("Web server future finished unexpectedly"), - Err(e) => error!("Web server task failed: {e}"), - } - }); - - // We assign known_nodes' public key and stake value rather than read from config file since it's a test - let known_nodes = (0..expected_node_count as u64) - .map(|id| { - TYPES::SignatureKey::from_private( - &TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], id).1, - ) - }) - .collect::>(); - - // Start each node's web server client - Box::new(move |id| { - let sender = Arc::clone(&sender); - let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); - let mut network = WebServerNetwork::create( - url, - Duration::from_millis(100), - known_nodes[usize::try_from(id).unwrap()].clone(), - is_da, - ); - network.server_shutdown_signal = Some(sender); - network - }) - } - - fn in_flight_message_count(&self) -> Option { - None - } -} - -impl TestableNetworkingImplementation for WebCommChannel { fn generator( expected_node_count: usize, num_bootstrap: usize, network_id: usize, da_committee_size: usize, - is_da: bool, - _reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = as TestableNetworkingImplementation<_>>::generator( + _is_da: bool, + reliability_config: Option>, + ) -> Box (Arc, Arc) + 'static> { + let da_gen = Self::single_generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + true, + &reliability_config, + ); + let quorum_gen = Self::single_generator( expected_node_count, num_bootstrap, network_id, da_committee_size, - is_da, - // network reliability is a testing feature - // not yet implemented for webcommchannel - None, + false, + &reliability_config, ); - Box::new(move |node_id| Self(generator(node_id).into())) + // Start each node's web server client + Box::new(move |id| (quorum_gen(id).into(), da_gen(id).into())) } fn in_flight_message_count(&self) -> Option { None } } - -impl TestableChannelImplementation for WebCommChannel { - fn generate_network() -> Box>) -> Self + 'static> { - Box::new(move |network| WebCommChannel::new(network)) - } -} diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 1cc0af9e75..af0ff75b08 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -25,7 +25,7 @@ use hotshot_types::{ block_contents::BlockHeader, consensus_api::ConsensusApi, election::Membership, - network::{CommunicationChannel, ConsensusIntentEvent}, + network::{ConnectedNetwork, ConsensusIntentEvent}, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, signature_key::SignatureKey, states::ValidatedState, diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 577afbaca6..8c574d7eab 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -18,7 +18,7 @@ use hotshot_types::{ block_contents::vid_commitment, consensus_api::ConsensusApi, election::Membership, - network::{CommunicationChannel, ConsensusIntentEvent}, + network::{ConnectedNetwork, ConsensusIntentEvent}, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, signature_key::SignatureKey, }, diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 92dd284aa2..17299d554a 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ events::{HotShotEvent, HotShotTaskCompleted}, helpers::broadcast_event, @@ -13,7 +15,7 @@ use hotshot_types::{ }, traits::{ election::Membership, - network::{CommunicationChannel, TransmitType}, + network::{ConnectedNetwork, TransmitType, ViewMessage}, node_implementation::NodeType, }, vote::{HasViewNumber, Vote}, @@ -181,9 +183,12 @@ impl NetworkMessageTaskState { } /// network event task state -pub struct NetworkEventTaskState> { +pub struct NetworkEventTaskState< + TYPES: NodeType, + COMMCHANNEL: ConnectedNetwork, TYPES::SignatureKey>, +> { /// comm channel - pub channel: COMMCHANNEL, + pub channel: Arc, /// view number pub view: TYPES::Time, /// membership for the channel @@ -193,7 +198,7 @@ pub struct NetworkEventTaskState) -> bool, } -impl> TaskState +impl, TYPES::SignatureKey>> TaskState for NetworkEventTaskState { type Event = HotShotEvent; @@ -221,7 +226,7 @@ impl> TaskState } } -impl> +impl, TYPES::SignatureKey>> NetworkEventTaskState { /// Handle the given event. @@ -364,13 +369,18 @@ impl> sender, kind: message_kind, }; + let view = message.kind.get_view_number(); let transmit_result = match transmit_type { TransmitType::Direct => { self.channel .direct_message(message, recipient.unwrap()) .await } - TransmitType::Broadcast => self.channel.broadcast_message(message, membership).await, + TransmitType::Broadcast => { + self.channel + .broadcast_message(message, membership.get_committee(view)) + .await + } }; match transmit_result { diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index d07aeb2c10..be40cb8e01 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -6,7 +6,7 @@ use async_lock::RwLock; use async_std::task::spawn_blocking; use hotshot_task::task::{Task, TaskState}; -use hotshot_types::traits::network::CommunicationChannel; +use hotshot_types::traits::network::ConnectedNetwork; use hotshot_types::{ consensus::Consensus, data::VidDisperse, diff --git a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs index 04d47cda08..03ade80ae5 100644 --- a/crates/task-impls/src/view_sync.rs +++ b/crates/task-impls/src/view_sync.rs @@ -31,7 +31,7 @@ use hotshot_types::{ traits::{ consensus_api::ConsensusApi, election::Membership, - network::CommunicationChannel, + network::ConnectedNetwork, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, }, }; diff --git a/crates/testing/src/spinning_task.rs b/crates/testing/src/spinning_task.rs index 017e1497a0..6235900810 100644 --- a/crates/testing/src/spinning_task.rs +++ b/crates/testing/src/spinning_task.rs @@ -6,7 +6,7 @@ use crate::test_runner::HotShotTaskCompleted; use crate::test_runner::LateStartNode; use crate::test_runner::Node; use hotshot_task::task::{Task, TaskState, TestTaskState}; -use hotshot_types::traits::network::CommunicationChannel; +use hotshot_types::traits::network::ConnectedNetwork; use hotshot_types::{event::Event, traits::node_implementation::NodeType}; use snafu::Snafu; use std::collections::BTreeMap; diff --git a/crates/testing/src/test_builder.rs b/crates/testing/src/test_builder.rs index b3eafdc757..567382aeba 100644 --- a/crates/testing/src/test_builder.rs +++ b/crates/testing/src/test_builder.rs @@ -284,7 +284,7 @@ impl TestMetadata { TestLauncher { resource_generator: ResourceGenerators { - channel_generator: >::gen_comm_channels( + channel_generator: >::gen_networks( total_nodes, num_bootstrap_nodes, da_committee_size, diff --git a/crates/testing/src/test_launcher.rs b/crates/testing/src/test_launcher.rs index df7d0a6a47..d6196b86d1 100644 --- a/crates/testing/src/test_launcher.rs +++ b/crates/testing/src/test_launcher.rs @@ -2,7 +2,8 @@ use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use hotshot::traits::{NodeImplementation, TestableNodeImplementation}; use hotshot_types::{ - traits::{network::CommunicationChannel, node_implementation::NodeType}, + message::Message, + traits::{network::ConnectedNetwork, node_implementation::NodeType}, HotShotConfig, }; @@ -10,19 +11,13 @@ use super::{test_builder::TestMetadata, test_runner::TestRunner}; /// convience type alias for the networks available pub type Networks = ( - >::QuorumNetwork, - >::QuorumNetwork, + Arc<>::QuorumNetwork>, + Arc<>::QuorumNetwork>, ); /// Wrapper for a function that takes a `node_id` and returns an instance of `T`. pub type Generator = Box T + 'static>; -/// Wrapper Type for committee function that takes a `ConnectedNetwork` and returns a `CommunicationChannel` -pub type CommitteeNetworkGenerator = Box) -> T + 'static>; - -/// Wrapper Type for view sync function that takes a `ConnectedNetwork` and returns a `CommunicationChannel` -pub type ViewSyncNetworkGenerator = Box) -> T + 'static>; - /// generators for resources used by each node pub struct ResourceGenerators> { /// generate channels @@ -44,7 +39,9 @@ pub struct TestLauncher> { impl> TestLauncher { /// launch the test #[must_use] - pub fn launch>(self) -> TestRunner { + pub fn launch, TYPES::SignatureKey>>( + self, + ) -> TestRunner { TestRunner:: { launcher: self, nodes: Vec::new(), diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 467b7b734d..cd35f8b222 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -20,9 +20,6 @@ use hotshot::{traits::TestableNodeImplementation, HotShotInitializer, SystemCont use hotshot_constants::EVENT_CHANNEL_SIZE; use hotshot_task::task::{Task, TaskRegistry, TestTask}; -use hotshot_types::traits::{ - network::CommunicationChannel, node_implementation::NodeImplementation, -}; use hotshot_types::{ consensus::ConsensusMetricsValue, traits::{ @@ -31,6 +28,10 @@ use hotshot_types::{ }, HotShotConfig, ValidatorConfig, }; +use hotshot_types::{ + message::Message, + traits::{network::ConnectedNetwork, node_implementation::NodeImplementation}, +}; use std::{ collections::{BTreeMap, HashMap, HashSet}, marker::PhantomData, @@ -65,7 +66,7 @@ pub struct LateStartNode> pub struct TestRunner< TYPES: NodeType, I: TestableNodeImplementation, - N: CommunicationChannel, + N: ConnectedNetwork, TYPES::SignatureKey>, > { /// test launcher, contains a bunch of useful metadata and closures pub(crate) launcher: TestLauncher, @@ -102,7 +103,7 @@ impl TaskErr for T {} impl< TYPES: NodeType, I: TestableNodeImplementation, - N: CommunicationChannel, + N: ConnectedNetwork, TYPES::SignatureKey>, > TestRunner where I: TestableNodeImplementation, diff --git a/crates/testing/tests/da_task.rs b/crates/testing/tests/da_task.rs index 33b40ff0c7..4510cd3797 100644 --- a/crates/testing/tests/da_task.rs +++ b/crates/testing/tests/da_task.rs @@ -98,7 +98,7 @@ async fn test_da_task() { api: api.clone(), consensus: handle.hotshot.get_consensus(), da_membership: api.inner.memberships.da_membership.clone().into(), - da_network: api.inner.networks.da_network.clone().into(), + da_network: api.inner.networks.da_network.clone(), quorum_membership: api.inner.memberships.quorum_membership.clone().into(), cur_view: ViewNumber::new(0), vote_collector: None.into(), diff --git a/crates/testing/tests/memory_network.rs b/crates/testing/tests/memory_network.rs index ffe5d9d0ee..49c9a4081e 100644 --- a/crates/testing/tests/memory_network.rs +++ b/crates/testing/tests/memory_network.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use async_compatibility_layer::logging::setup_logging; use hotshot::traits::election::static_committee::{GeneralStaticCommittee, StaticElectionConfig}; use hotshot::traits::implementations::{ - MasterMap, MemoryCommChannel, MemoryNetwork, MemoryStorage, NetworkingMetricsValue, + MasterMap, MemoryNetwork, MemoryStorage, NetworkingMetricsValue, }; use hotshot::traits::NodeImplementation; use hotshot::types::SignatureKey; @@ -61,10 +61,10 @@ impl NodeType for Test { pub struct TestImpl {} pub type ThisMembership = GeneralStaticCommittee::SignatureKey>; -pub type DANetwork = MemoryCommChannel; -pub type QuorumNetwork = MemoryCommChannel; -pub type ViewSyncNetwork = MemoryCommChannel; -pub type VIDNetwork = MemoryCommChannel; +pub type DANetwork = MemoryNetwork, ::SignatureKey>; +pub type QuorumNetwork = MemoryNetwork, ::SignatureKey>; +pub type ViewSyncNetwork = MemoryNetwork, ::SignatureKey>; +pub type VIDNetwork = MemoryNetwork, ::SignatureKey>; impl NodeImplementation for TestImpl { type Storage = MemoryStorage; diff --git a/crates/testing/tests/vid_task.rs b/crates/testing/tests/vid_task.rs index eadfaf4e84..6ca6366abe 100644 --- a/crates/testing/tests/vid_task.rs +++ b/crates/testing/tests/vid_task.rs @@ -105,7 +105,7 @@ async fn test_vid_task() { consensus: handle.hotshot.get_consensus(), cur_view: ViewNumber::new(0), vote_collector: None, - network: api.inner.networks.quorum_network.clone().into(), + network: api.inner.networks.quorum_network.clone(), membership: api.inner.memberships.vid_membership.clone().into(), public_key: *api.public_key(), private_key: api.private_key().clone(), diff --git a/crates/testing/tests/view_sync_task.rs b/crates/testing/tests/view_sync_task.rs index fd75e5944f..2fa93f6972 100644 --- a/crates/testing/tests/view_sync_task.rs +++ b/crates/testing/tests/view_sync_task.rs @@ -55,7 +55,7 @@ async fn test_view_sync_task() { let view_sync_state = ViewSyncTaskState { current_view: ViewNumber::new(0), next_view: ViewNumber::new(0), - network: api.inner.networks.quorum_network.clone().into(), + network: api.inner.networks.quorum_network.clone(), membership: api.inner.memberships.view_sync_membership.clone().into(), public_key: *api.public_key(), private_key: api.private_key().clone(), diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index a2c8357a29..d659fce2ba 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -12,11 +12,7 @@ use tokio::time::error::Elapsed as TimeoutError; #[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))] compile_error! {"Either config option \"async-std\" or \"tokio\" must be enabled for this crate."} use super::{node_implementation::NodeType, signature_key::SignatureKey}; -use crate::{ - data::ViewNumber, - message::{Message, MessagePurpose}, - BoxSyncFuture, -}; +use crate::{data::ViewNumber, message::MessagePurpose, BoxSyncFuture}; use async_compatibility_layer::channel::UnboundedSendError; use async_trait::async_trait; use rand::{ @@ -226,76 +222,6 @@ pub trait ViewMessage { fn purpose(&self) -> MessagePurpose; } -/// API for interacting directly with a consensus committee -/// intended to be implemented for both DA and for validating consensus committees -#[async_trait] -pub trait CommunicationChannel: Clone + Debug + Send + Sync + 'static { - /// Underlying Network implementation's type - type NETWORK; - /// Blocks until node is successfully initialized - /// into the network - async fn wait_for_ready(&self); - - /// Pauses the underlying network - fn pause(&self); - - /// Resumes the underlying network - fn resume(&self); - - /// checks if the network is ready - /// nonblocking - async fn is_ready(&self) -> bool; - - /// Shut down this network. Afterwards this network should no longer be used. - /// - /// This should also cause other functions to immediately return with a [`NetworkError`] - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b; - - /// broadcast message to those listening on the communication channel - /// blocking - async fn broadcast_message( - &self, - message: Message, - election: &TYPES::Membership, - ) -> Result<(), NetworkError>; - - /// Sends a direct message to a specific node - /// blocking - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError>; - - /// Moves out the entire queue of received messages of 'transmit_type` - /// - /// Will unwrap the underlying `NetworkMessage` - /// blocking - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b; - - /// queues looking up a node - async fn queue_node_lookup( - &self, - _view_number: ViewNumber, - _pk: TYPES::SignatureKey, - ) -> Result<(), UnboundedSendError>> { - Ok(()) - } - - /// Injects consensus data such as view number into the networking implementation - /// blocking - async fn inject_consensus_info(&self, _event: ConsensusIntentEvent) {} -} - /// represents a networking implmentration /// exposes low level API for interacting with a network /// intended to be implemented for libp2p, the centralized server, @@ -304,6 +230,12 @@ pub trait CommunicationChannel: Clone + Debug + Send + Sync + ' pub trait ConnectedNetwork: Clone + Send + Sync + 'static { + /// Pauses the underlying network + fn pause(&self); + + /// Resumes the underlying network + fn resume(&self); + /// Blocks until the network is successfully initialized async fn wait_for_ready(&self); @@ -358,8 +290,12 @@ pub trait ConnectedNetwork: } /// Describes additional functionality needed by the test network implementation -pub trait TestableNetworkingImplementation { +pub trait TestableNetworkingImplementation +where + Self: Sized, +{ /// generates a network given an expected node count + #[allow(clippy::type_complexity)] fn generator( expected_node_count: usize, num_bootstrap: usize, @@ -367,20 +303,13 @@ pub trait TestableNetworkingImplementation { da_committee_size: usize, is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static>; + ) -> Box (Arc, Arc) + 'static>; /// Get the number of messages in-flight. /// /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. fn in_flight_message_count(&self) -> Option; } -/// Describes additional functionality needed by the test communication channel -pub trait TestableChannelImplementation: CommunicationChannel { - /// generates the `CommunicationChannel` given it's associated network type - #[allow(clippy::type_complexity)] - fn generate_network( - ) -> Box>::NETWORK>) -> Self + 'static>; -} /// Changes that can occur in the network #[derive(Debug)] diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index 7cd2ef070e..f3199a2bd6 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -6,16 +6,17 @@ use super::{ block_contents::{BlockHeader, TestableBlock, Transaction}, election::ElectionConfig, - network::{CommunicationChannel, NetworkReliability, TestableNetworkingImplementation}, + network::{ConnectedNetwork, NetworkReliability, TestableNetworkingImplementation}, states::TestableState, storage::{StorageError, StorageState, TestableStorage}, ValidatedState, }; use crate::{ data::{Leaf, TestableLeaf}, + message::Message, traits::{ - election::Membership, network::TestableChannelImplementation, signature_key::SignatureKey, - states::InstanceState, storage::Storage, BlockPayload, + election::Membership, signature_key::SignatureKey, states::InstanceState, storage::Storage, + BlockPayload, }, }; use async_trait::async_trait; @@ -44,9 +45,9 @@ pub trait NodeImplementation: type Storage: Storage + Clone; /// Network for all nodes - type QuorumNetwork: CommunicationChannel; + type QuorumNetwork: ConnectedNetwork, TYPES::SignatureKey>; /// Network for those in the DA committee - type CommitteeNetwork: CommunicationChannel; + type CommitteeNetwork: ConnectedNetwork, TYPES::SignatureKey>; } /// extra functions required on a node implementation to be usable by hotshot-testing @@ -94,12 +95,12 @@ pub trait TestableNodeImplementation: NodeImplementation async fn get_full_state(storage: &Self::Storage) -> StorageState; /// Generate the communication channels for testing - fn gen_comm_channels( + fn gen_networks( expected_node_count: usize, num_bootstrap: usize, da_committee_size: usize, reliability_config: Option>, - ) -> Box (Self::QuorumNetwork, Self::QuorumNetwork)>; + ) -> Box (Arc, Arc)>; } #[async_trait] @@ -108,12 +109,8 @@ where TYPES::ValidatedState: TestableState, TYPES::BlockPayload: TestableBlock, I::Storage: TestableStorage, - I::QuorumNetwork: TestableChannelImplementation, - I::CommitteeNetwork: TestableChannelImplementation, - <>::QuorumNetwork as CommunicationChannel>::NETWORK: - TestableNetworkingImplementation, - <>::CommitteeNetwork as CommunicationChannel>::NETWORK: - TestableNetworkingImplementation, + I::QuorumNetwork: TestableNetworkingImplementation, + I::CommitteeNetwork: TestableNetworkingImplementation, { type CommitteeElectionConfig = TYPES::ElectionConfigType; @@ -153,38 +150,20 @@ where async fn get_full_state(storage: &Self::Storage) -> StorageState { >::get_full_state(storage).await } - fn gen_comm_channels( + fn gen_networks( expected_node_count: usize, num_bootstrap: usize, da_committee_size: usize, reliability_config: Option>, - ) -> Box (Self::QuorumNetwork, Self::QuorumNetwork)> { - let quorum_generator = <>::NETWORK as TestableNetworkingImplementation>::generator( - expected_node_count, - num_bootstrap, - 0, - da_committee_size, - false, - reliability_config.clone(), - ); - let da_generator = <>::NETWORK as TestableNetworkingImplementation>::generator( + ) -> Box (Arc, Arc)> { + >::generator( expected_node_count, num_bootstrap, - 1, + 0, da_committee_size, false, - reliability_config, - ); - - Box::new(move |id| { - let quorum = Arc::new(quorum_generator(id)); - let da = Arc::new(da_generator(id)); - let quorum_chan = - >::generate_network()(quorum); - let committee_chan = - >::generate_network()(da); - (quorum_chan, committee_chan) - }) + reliability_config.clone(), + ) } }