From fef62174df6c5d51b30051377fe4a38b7bad186b Mon Sep 17 00:00:00 2001 From: = Date: Mon, 12 Feb 2024 11:32:47 -0500 Subject: [PATCH 01/11] Remove custom DM codec --- Cargo.lock | 11 + Cargo.toml | 1 + .../libp2p-networking/examples/common/mod.rs | 4 +- .../src/network/behaviours/direct_message.rs | 37 ++-- .../behaviours/direct_message_codec.rs | 200 ------------------ crates/libp2p-networking/src/network/def.rs | 7 +- crates/libp2p-networking/src/network/mod.rs | 9 +- crates/libp2p-networking/src/network/node.rs | 13 +- .../src/network/node/handle.rs | 7 +- 9 files changed, 44 insertions(+), 245 deletions(-) delete mode 100644 crates/libp2p-networking/src/network/behaviours/direct_message_codec.rs diff --git a/Cargo.lock b/Cargo.lock index abd6ea0b39..ad5d3854ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1176,6 +1176,15 @@ dependencies = [ "serde", ] +[[package]] +name = "cbor4ii" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b4c883b9cc4757b061600d39001d4d0232bece4a3174696cf8f58a14db107d" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.0.83" @@ -4146,6 +4155,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12823250fe0c45bdddea6eefa2be9a609aff1283ff4e1d8a294fdbb89572f6f" dependencies = [ "async-trait", + "cbor4ii", "futures", "futures-bounded", "futures-timer", @@ -4154,6 +4164,7 @@ dependencies = [ "libp2p-identity", "libp2p-swarm", "rand 0.8.5", + "serde", "smallvec", "tracing", "void", diff --git a/Cargo.toml b/Cargo.toml index 8f39d5bcb2..91983c3062 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ typenum = "1.17.0" libp2p = { package = "libp2p", version = "0.53.2", features = [ "macros", "autonat", + "cbor", "dns", "floodsub", "gossipsub", diff --git a/crates/libp2p-networking/examples/common/mod.rs b/crates/libp2p-networking/examples/common/mod.rs index 8be084db29..648b4a55ec 100644 --- a/crates/libp2p-networking/examples/common/mod.rs +++ b/crates/libp2p-networking/examples/common/mod.rs @@ -20,7 +20,7 @@ use clap::{Args, Parser}; use libp2p::{multiaddr, request_response::ResponseChannel, Multiaddr}; use libp2p_identity::PeerId; use libp2p_networking::network::{ - behaviours::direct_message_codec::DirectMessageResponse, deserialize_msg, + deserialize_msg, network_node_handle_error::NodeConfigSnafu, spin_up_swarm, NetworkEvent, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeHandleError, NetworkNodeType, }; @@ -279,7 +279,7 @@ pub async fn handle_normal_msg( handle: Arc)>>, msg: NormalMessage, // in case we need to reply to direct message - chan: Option>, + chan: Option>>, ) -> Result<(), NetworkNodeHandleError> { debug!("node={} handling normal msg {:?}", handle.id(), msg); // send reply logic diff --git a/crates/libp2p-networking/src/network/behaviours/direct_message.rs b/crates/libp2p-networking/src/network/behaviours/direct_message.rs index 81e5d39692..0bc684583d 100644 --- a/crates/libp2p-networking/src/network/behaviours/direct_message.rs +++ b/crates/libp2p-networking/src/network/behaviours/direct_message.rs @@ -3,19 +3,18 @@ use std::{ task::Poll, }; +use libp2p::request_response::cbor::Behaviour; use libp2p::{ - request_response::{Behaviour, Event, Message, OutboundRequestId, ResponseChannel}, + request_response::{Event, Message, OutboundRequestId, ResponseChannel}, swarm::{NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm}, Multiaddr, }; use libp2p_identity::PeerId; -// use libp2p_request_response::Behaviour; use tracing::{error, info}; -use super::{ - direct_message_codec::{DirectMessageCodec, DirectMessageRequest, DirectMessageResponse}, - exponential_backoff::ExponentialBackoff, -}; +use super::exponential_backoff::ExponentialBackoff; +/// Maximum size of a direct message +pub const MAX_MSG_SIZE_DM: usize = 100_000_000; /// Request to direct message a peert pub struct DMRequest { @@ -33,7 +32,7 @@ pub struct DMRequest { /// usage: direct message peer pub struct DMBehaviour { /// The wrapped behaviour - request_response: Behaviour, + request_response: Behaviour, Vec>, /// In progress queries in_progress_rr: HashMap, /// Failed queries to be retried @@ -46,14 +45,14 @@ pub struct DMBehaviour { #[derive(Debug)] pub enum DMEvent { /// We received as Direct Request - DirectRequest(Vec, PeerId, ResponseChannel), + DirectRequest(Vec, PeerId, ResponseChannel>), /// We received a Direct Response DirectResponse(Vec, PeerId), } impl DMBehaviour { /// handle a direct message event - fn handle_dm_event(&mut self, event: Event) { + fn handle_dm_event(&mut self, event: Event, Vec>) { match event { Event::InboundFailure { peer, @@ -83,7 +82,7 @@ impl DMBehaviour { } Event::Message { message, peer, .. } => match message { Message::Request { - request: DirectMessageRequest(msg), + request: msg, channel, .. } => { @@ -95,7 +94,7 @@ impl DMBehaviour { } Message::Response { request_id, - response: DirectMessageResponse(msg), + response: msg, } => { // success, finished. if let Some(req) = self.in_progress_rr.remove(&request_id) { @@ -115,7 +114,7 @@ impl DMBehaviour { } impl NetworkBehaviour for DMBehaviour { - type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + type ConnectionHandler = , Vec> as NetworkBehaviour>::ConnectionHandler; type ToSwarm = DMEvent; @@ -257,7 +256,7 @@ impl NetworkBehaviour for DMBehaviour { impl DMBehaviour { /// Create new behaviour based on request response #[must_use] - pub fn new(request_response: Behaviour) -> Self { + pub fn new(request_response: Behaviour, Vec>) -> Self { Self { request_response, in_progress_rr: HashMap::default(), @@ -286,21 +285,15 @@ impl DMBehaviour { let request_id = self .request_response - .send_request(&req.peer_id, DirectMessageRequest(req.data.clone())); + .send_request(&req.peer_id, req.data.clone()); info!("direct message request with id {:?}", request_id); self.in_progress_rr.insert(request_id, req); } /// Add a direct response for a channel - pub fn add_direct_response( - &mut self, - chan: ResponseChannel, - msg: Vec, - ) { - let res = self - .request_response - .send_response(chan, DirectMessageResponse(msg)); + pub fn add_direct_response(&mut self, chan: ResponseChannel>, msg: Vec) { + let res = self.request_response.send_response(chan, msg); if let Err(e) = res { error!("Error replying to direct message. {:?}", e); } diff --git a/crates/libp2p-networking/src/network/behaviours/direct_message_codec.rs b/crates/libp2p-networking/src/network/behaviours/direct_message_codec.rs deleted file mode 100644 index 18584ec510..0000000000 --- a/crates/libp2p-networking/src/network/behaviours/direct_message_codec.rs +++ /dev/null @@ -1,200 +0,0 @@ -use async_trait::async_trait; -use futures::prelude::*; -use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; -use libp2p::request_response::Codec; -use serde::{Deserialize, Serialize}; -use std::io; - -/// Protocol for direct messages -#[derive(Debug, Clone)] -pub struct DirectMessageProtocol(); -/// Codec for direct messages -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -pub struct DirectMessageCodec(); -/// Wrapper type describing a serialized direct message -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DirectMessageRequest(pub Vec); -/// wrapper type describing the response to direct message -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DirectMessageResponse(pub Vec); - -/// Maximum size of a direct message -pub const MAX_MSG_SIZE_DM: usize = 100_000_000; - -// NOTE: yoinked from libp2p -// -/// Writes a message to the given socket with a length prefix appended to it. Also flushes the socket. -/// -/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is -/// > compatible with what [`read_length_prefixed`] expects. -/// # Errors -/// On weird input from socket -pub async fn write_length_prefixed( - socket: &mut (impl AsyncWrite + Unpin), - data: impl AsRef<[u8]>, -) -> Result<(), io::Error> { - write_varint(socket, data.as_ref().len()).await?; - socket.write_all(data.as_ref()).await?; - socket.flush().await?; - - Ok(()) -} - -/// Writes a variable-length integer to the `socket`. -/// -/// > **Note**: Does **NOT** flush the socket. -/// # Errors -/// On weird input from socket -pub async fn write_varint( - socket: &mut (impl AsyncWrite + Unpin), - len: usize, -) -> Result<(), io::Error> { - let mut len_data = unsigned_varint::encode::usize_buffer(); - let encoded_len = unsigned_varint::encode::usize(len, &mut len_data).len(); - socket.write_all(&len_data[..encoded_len]).await?; - - Ok(()) -} - -/// Reads a variable-length integer from the `socket`. -/// -/// As a special exception, if the `socket` is empty and EOFs right at the beginning, then we -/// return `Ok(0)`. -/// -/// > **Note**: This function reads bytes one by one from the `socket`. It is therefore encouraged -/// > to use some sort of buffering mechanism. -/// # Errors -/// On weird input from socket -pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result { - let mut buffer = unsigned_varint::encode::usize_buffer(); - let mut buffer_len = 0; - - loop { - match socket.read(&mut buffer[buffer_len..=buffer_len]).await? { - 0 => { - // Reaching EOF before finishing to read the length is an error, unless the EOF is - // at the very beginning of the substream, in which case we assume that the data is - // empty. - if buffer_len == 0 { - return Ok(0); - } - return Err(io::ErrorKind::UnexpectedEof.into()); - } - n => debug_assert_eq!(n, 1), - } - - buffer_len += 1; - - match unsigned_varint::decode::usize(&buffer[..buffer_len]) { - Ok((len, _)) => return Ok(len), - Err(unsigned_varint::decode::Error::Overflow) => { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "overflow in variable-length integer", - )); - } - // TODO: why do we have a `__Nonexhaustive` variant in the error? I don't know how to process it - // Err(unsigned_varint::decode::Error::Insufficient) => {} - Err(_) => {} - } - } -} - -/// Reads a length-prefixed message from the given socket. -/// -/// The `max_size` parameter is the maximum size in bytes of the message that we accept. This is -/// necessary in order to avoid `DoS` attacks where the remote sends us a message of several -/// gigabytes. -/// -/// > **Note**: Assumes that a variable-length prefix indicates the length of the message. This is -/// > compatible with what [`write_length_prefixed`] does. -/// # Errors -/// On weird input from socket -pub async fn read_length_prefixed( - socket: &mut (impl AsyncRead + Unpin), - max_size: usize, -) -> io::Result> { - let len = read_varint(socket).await?; - if len > max_size { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("Received data size ({len} bytes) exceeds maximum ({max_size} bytes)"), - )); - } - - let mut buf = vec![0; len]; - socket.read_exact(&mut buf).await?; - - Ok(buf) -} - -impl AsRef for DirectMessageProtocol { - fn as_ref(&self) -> &str { - "/HotShot/request_response/1.0" - } -} - -#[async_trait] -impl Codec for DirectMessageCodec { - type Protocol = DirectMessageProtocol; - - type Request = DirectMessageRequest; - - type Response = DirectMessageResponse; - - async fn read_request( - &mut self, - _: &DirectMessageProtocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let msg = read_length_prefixed(io, MAX_MSG_SIZE_DM).await?; - - // NOTE we don't error here unless message is too big. - // We'll wrap this in a networkbehaviour and get parsing messages there - Ok(DirectMessageRequest(msg)) - } - - async fn read_response( - &mut self, - _: &DirectMessageProtocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let msg = read_length_prefixed(io, MAX_MSG_SIZE_DM).await?; - Ok(DirectMessageResponse(msg)) - } - - async fn write_request( - &mut self, - _: &DirectMessageProtocol, - io: &mut T, - DirectMessageRequest(msg): DirectMessageRequest, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - write_length_prefixed(io, msg).await?; - io.close().await?; - - Ok(()) - } - - async fn write_response( - &mut self, - _: &DirectMessageProtocol, - io: &mut T, - DirectMessageResponse(msg): DirectMessageResponse, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - write_length_prefixed(io, msg).await?; - io.close().await?; - Ok(()) - } -} diff --git a/crates/libp2p-networking/src/network/def.rs b/crates/libp2p-networking/src/network/def.rs index c412e6fd11..bfbf197b40 100644 --- a/crates/libp2p-networking/src/network/def.rs +++ b/crates/libp2p-networking/src/network/def.rs @@ -13,7 +13,6 @@ use super::{ behaviours::{ dht::{DHTBehaviour, DHTEvent, KadPutQuery}, direct_message::{DMBehaviour, DMEvent, DMRequest}, - direct_message_codec::DirectMessageResponse, exponential_backoff::ExponentialBackoff, gossip::{GossipBehaviour, GossipEvent}, }, @@ -145,11 +144,7 @@ impl NetworkDef { } /// Add a direct response for a channel - pub fn add_direct_response( - &mut self, - chan: ResponseChannel, - msg: Vec, - ) { + pub fn add_direct_response(&mut self, chan: ResponseChannel>, msg: Vec) { self.request_response.add_direct_response(chan, msg); } } diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 9359b1f8eb..523b5eef34 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -17,10 +17,7 @@ pub use self::{ }, }; -use self::behaviours::{ - dht::DHTEvent, direct_message::DMEvent, direct_message_codec::DirectMessageResponse, - gossip::GossipEvent, -}; +use self::behaviours::{dht::DHTEvent, direct_message::DMEvent, gossip::GossipEvent}; use bincode::Options; use futures::channel::oneshot::Sender; use hotshot_utils::bincode::bincode_opts; @@ -125,7 +122,7 @@ pub enum ClientRequest { retry_count: u8, }, /// client request to send a direct reply to a message - DirectResponse(ResponseChannel, Vec), + DirectResponse(ResponseChannel>, Vec), /// prune a peer Prune(PeerId), /// add vec of known peers or addresses @@ -169,7 +166,7 @@ pub enum NetworkEvent { /// Recv-ed a broadcast GossipMsg(Vec, TopicHash), /// Recv-ed a direct message from a node - DirectRequest(Vec, PeerId, ResponseChannel), + DirectRequest(Vec, PeerId, ResponseChannel>), /// Recv-ed a direct response from a node (that hopefully was initiated by this node) DirectResponse(Vec, PeerId), /// Report that kademlia has successfully bootstrapped into the network diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index af5f682c3b..24e9c93d56 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -21,8 +21,7 @@ use super::{ use crate::network::behaviours::{ dht::{DHTBehaviour, DHTEvent, DHTProgress, KadPutQuery, NUM_REPLICATED_TO_TRUST}, - direct_message::{DMBehaviour, DMEvent}, - direct_message_codec::{DirectMessageCodec, DirectMessageProtocol, MAX_MSG_SIZE_DM}, + direct_message::{DMBehaviour, DMEvent, MAX_MSG_SIZE_DM}, exponential_backoff::ExponentialBackoff, gossip::GossipEvent, }; @@ -32,7 +31,7 @@ use async_compatibility_layer::{ }; use futures::{select, FutureExt, StreamExt}; use hotshot_constants::KAD_DEFAULT_REPUB_INTERVAL_SEC; -use libp2p::core::transport::ListenerId; +use libp2p::{core::transport::ListenerId, StreamProtocol}; use libp2p::{ gossipsub::{ Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, @@ -266,9 +265,13 @@ impl NetworkNode { let rrconfig = RequestResponseConfig::default(); - let request_response: libp2p::request_response::Behaviour = + let request_response: libp2p::request_response::cbor::Behaviour, Vec> = RequestResponse::new( - [(DirectMessageProtocol(), ProtocolSupport::Full)].into_iter(), + [( + StreamProtocol::new("/HotShot/request_response/1.0"), + ProtocolSupport::Full, + )] + .into_iter(), rrconfig, ); diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index 885f66b097..69e17f3678 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -1,7 +1,6 @@ use crate::network::{ - behaviours::direct_message_codec::DirectMessageResponse, error::DHTError, gen_multiaddr, - ClientRequest, NetworkError, NetworkEvent, NetworkNode, NetworkNodeConfig, - NetworkNodeConfigBuilderError, + error::DHTError, gen_multiaddr, ClientRequest, NetworkError, NetworkEvent, NetworkNode, + NetworkNodeConfig, NetworkNodeConfigBuilderError, }; use async_compatibility_layer::{ art::{async_sleep, async_spawn, async_timeout, future::to, stream}, @@ -484,7 +483,7 @@ impl NetworkNodeHandle { /// - Will return [`NetworkNodeHandleError::SerializationError`] when unable to serialize `msg` pub async fn direct_response( &self, - chan: ResponseChannel, + chan: ResponseChannel>, msg: &impl Serialize, ) -> Result<(), NetworkNodeHandleError> { let serialized_msg = bincode_opts().serialize(msg).context(SerializationSnafu)?; From dc8c348aaea6e96e74a03c5d9f2b951d613d1ea5 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 12 Feb 2024 11:38:06 -0500 Subject: [PATCH 02/11] cleanup --- crates/libp2p-networking/src/network/behaviours/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/libp2p-networking/src/network/behaviours/mod.rs b/crates/libp2p-networking/src/network/behaviours/mod.rs index 7d0feeeb25..869b6324c3 100644 --- a/crates/libp2p-networking/src/network/behaviours/mod.rs +++ b/crates/libp2p-networking/src/network/behaviours/mod.rs @@ -7,9 +7,5 @@ pub mod direct_message; /// exponential backoff type pub mod exponential_backoff; -/// Implementation of a codec for sending messages -/// for `RequestResponse` -pub mod direct_message_codec; - /// Wrapper around Kademlia pub mod dht; From aad398e5bdb65f3d9417a5064e8d9bc0be3b2dcf Mon Sep 17 00:00:00 2001 From: = Date: Mon, 12 Feb 2024 13:06:14 -0500 Subject: [PATCH 03/11] add a little timeout --- crates/testing/src/test_builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/testing/src/test_builder.rs b/crates/testing/src/test_builder.rs index 4cba204166..b3eafdc757 100644 --- a/crates/testing/src/test_builder.rs +++ b/crates/testing/src/test_builder.rs @@ -68,7 +68,7 @@ pub struct TestMetadata { impl Default for TimingData { fn default() -> Self { Self { - next_view_timeout: 2500, + next_view_timeout: 4000, timeout_ratio: (11, 10), round_start_delay: 100, start_delay: 100, From 1891e10efec678b741fcefd28edcb78ec77eacaa Mon Sep 17 00:00:00 2001 From: = Date: Mon, 12 Feb 2024 13:14:26 -0500 Subject: [PATCH 04/11] rename and move constant --- .../src/network/behaviours/direct_message.rs | 2 -- crates/libp2p-networking/src/network/node.rs | 7 +++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/libp2p-networking/src/network/behaviours/direct_message.rs b/crates/libp2p-networking/src/network/behaviours/direct_message.rs index 0bc684583d..fd543f4348 100644 --- a/crates/libp2p-networking/src/network/behaviours/direct_message.rs +++ b/crates/libp2p-networking/src/network/behaviours/direct_message.rs @@ -13,8 +13,6 @@ use libp2p_identity::PeerId; use tracing::{error, info}; use super::exponential_backoff::ExponentialBackoff; -/// Maximum size of a direct message -pub const MAX_MSG_SIZE_DM: usize = 100_000_000; /// Request to direct message a peert pub struct DMRequest { diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index 24e9c93d56..de28d88f81 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -21,7 +21,7 @@ use super::{ use crate::network::behaviours::{ dht::{DHTBehaviour, DHTEvent, DHTProgress, KadPutQuery, NUM_REPLICATED_TO_TRUST}, - direct_message::{DMBehaviour, DMEvent, MAX_MSG_SIZE_DM}, + direct_message::{DMBehaviour, DMEvent}, exponential_backoff::ExponentialBackoff, gossip::GossipEvent, }; @@ -60,6 +60,9 @@ use std::{ }; use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; +/// Maximum size of a message +pub const MAX_GOSSIP_MSG_SIZE: usize = 200_000_000; + /// Wrapped num of connections pub const ESTABLISHED_LIMIT: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(ESTABLISHED_LIMIT_UNWR) }; @@ -210,7 +213,7 @@ impl NetworkNode { .mesh_outbound_min(params.mesh_outbound_min) .mesh_n(params.mesh_n) .history_length(500) - .max_transmit_size(2 * MAX_MSG_SIZE_DM) + .max_transmit_size(MAX_GOSSIP_MSG_SIZE) // Use the (blake3) hash of a message as its ID .message_id_fn(message_id_fn) .build() From 2bff9ac3b48dc77e9ced86bfe9c737601ea27a98 Mon Sep 17 00:00:00 2001 From: = Date: Tue, 13 Feb 2024 17:42:41 -0500 Subject: [PATCH 05/11] Comm channels gone from non tests --- crates/examples/infra/mod.rs | 187 +++++------------- crates/hotshot/src/lib.rs | 5 +- crates/hotshot/src/tasks/mod.rs | 17 +- crates/hotshot/src/traits.rs | 4 +- .../src/traits/networking/combined_network.rs | 75 ++----- .../src/traits/networking/libp2p_network.rs | 164 +-------------- .../src/traits/networking/memory_network.rs | 122 +----------- .../traits/networking/web_server_network.rs | 157 +-------------- crates/task-impls/src/consensus.rs | 2 +- crates/task-impls/src/da.rs | 2 +- crates/task-impls/src/network.rs | 18 +- crates/task-impls/src/vid.rs | 2 +- crates/task-impls/src/view_sync.rs | 2 +- crates/testing/src/spinning_task.rs | 2 +- crates/testing/src/test_builder.rs | 2 +- crates/testing/src/test_launcher.rs | 12 +- crates/testing/src/test_runner.rs | 8 +- crates/types/src/traits/network.rs | 6 + .../types/src/traits/node_implementation.rs | 38 ++-- 19 files changed, 158 insertions(+), 667 deletions(-) diff --git a/crates/examples/infra/mod.rs b/crates/examples/infra/mod.rs index e056c6af78..e949cecf40 100644 --- a/crates/examples/infra/mod.rs +++ b/crates/examples/infra/mod.rs @@ -5,12 +5,12 @@ use async_lock::RwLock; use async_trait::async_trait; use clap::Parser; use futures::StreamExt; -use hotshot::traits::implementations::{CombinedCommChannel, CombinedNetworks}; +use hotshot::traits::implementations::{CombinedNetworks, CombinedNetworks}; use hotshot::{ traits::{ implementations::{ - Libp2pCommChannel, Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, - WebCommChannel, WebServerNetwork, + Libp2pNetwork, Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, WebServerNetwork, + WebServerNetwork, }, NodeImplementation, }, @@ -37,7 +37,6 @@ use hotshot_types::{ traits::{ block_contents::TestableBlock, election::Membership, - network::CommunicationChannel, node_implementation::{ConsensusTime, NodeType}, states::TestableState, }, @@ -125,10 +124,8 @@ pub fn load_config_from_file( /// Runs the orchestrator pub async fn run_orchestrator< TYPES: NodeType, - DACHANNEL: CommunicationChannel + Debug, - QUORUMCHANNEL: CommunicationChannel + Debug, - VIEWSYNCCHANNEL: CommunicationChannel + Debug, - VIDCHANNEL: CommunicationChannel + Debug, + DACHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, + QUORUMCHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, NODE: NodeImplementation>, >( OrchestratorArgs { url, config_file }: OrchestratorArgs, @@ -300,14 +297,12 @@ async fn libp2p_network_from_config( #[async_trait] pub trait RunDA< TYPES: NodeType, - DACHANNEL: CommunicationChannel + Debug, - QUORUMCHANNEL: CommunicationChannel + Debug, - VIEWSYNCCHANNEL: CommunicationChannel + Debug, - VIDCHANNEL: CommunicationChannel + Debug, + DANET: ConnectedNetwork, TYPES::SignatureKey> + Debug, + QUORUMNET: ConnectedNetwork, TYPES::SignatureKey> + Debug, NODE: NodeImplementation< TYPES, - QuorumNetwork = QUORUMCHANNEL, - CommitteeNetwork = DACHANNEL, + QuorumNetwork = QUORUMNET, + CommitteeNetwork = DANET, Storage = MemoryStorage, >, > where @@ -484,16 +479,10 @@ pub trait RunDA< } /// Returns the da network for this run - fn get_da_channel(&self) -> DACHANNEL; + fn get_da_channel(&self) -> DANET; /// Returns the quorum network for this run - fn get_quorum_channel(&self) -> QUORUMCHANNEL; - - ///Returns view sync network for this run - fn get_view_sync_channel(&self) -> VIEWSYNCCHANNEL; - - ///Returns VID network for this run - fn get_vid_channel(&self) -> VIDCHANNEL; + fn get_quorum_channel(&self) -> QUORUMNET; /// Returns the config for this run fn get_config(&self) -> NetworkConfig; @@ -506,13 +495,9 @@ pub struct WebServerDARun { /// the network configuration config: NetworkConfig, /// quorum channel - quorum_channel: WebCommChannel, + quorum_channel: WebServerNetwork, /// data availability channel - da_channel: WebCommChannel, - /// view sync channel - view_sync_channel: WebCommChannel, - /// vid channel - vid_channel: WebCommChannel, + da_channel: WebServerNetwork, } #[async_trait] @@ -525,19 +510,11 @@ impl< >, NODE: NodeImplementation< TYPES, - QuorumNetwork = WebCommChannel, - CommitteeNetwork = WebCommChannel, + QuorumNetwork = WebServerNetwork, + CommitteeNetwork = WebServerNetwork, Storage = MemoryStorage, >, - > - RunDA< - TYPES, - WebCommChannel, - WebCommChannel, - WebCommChannel, - WebCommChannel, - NODE, - > for WebServerDARun + > RunDA, WebServerNetwork, NODE> for WebServerDARun where ::ValidatedState: TestableState, ::BlockPayload: TestableBlock, @@ -563,45 +540,28 @@ where underlying_quorum_network.wait_for_ready().await; // create communication channels - let quorum_channel: WebCommChannel = - WebCommChannel::new(underlying_quorum_network.clone().into()); - - let view_sync_channel: WebCommChannel = - WebCommChannel::new(underlying_quorum_network.into()); + let quorum_channel: WebServerNetwork = + WebServerNetwork::new(underlying_quorum_network.clone().into()); - let da_channel: WebCommChannel = WebCommChannel::new( + let da_channel: WebServerNetwork = WebServerNetwork::new( WebServerNetwork::create(url.clone(), wait_between_polls, pub_key.clone(), true).into(), ); - let vid_channel: WebCommChannel = WebCommChannel::new( - WebServerNetwork::create(url, wait_between_polls, pub_key, true).into(), - ); - WebServerDARun { config, quorum_channel, da_channel, - view_sync_channel, - vid_channel, } } - fn get_da_channel(&self) -> WebCommChannel { + fn get_da_channel(&self) -> WebServerNetwork { self.da_channel.clone() } - fn get_quorum_channel(&self) -> WebCommChannel { + fn get_quorum_channel(&self) -> WebServerNetwork { self.quorum_channel.clone() } - fn get_view_sync_channel(&self) -> WebCommChannel { - self.view_sync_channel.clone() - } - - fn get_vid_channel(&self) -> WebCommChannel { - self.vid_channel.clone() - } - fn get_config(&self) -> NetworkConfig { self.config.clone() } @@ -614,13 +574,9 @@ pub struct Libp2pDARun { /// the network configuration config: NetworkConfig, /// quorum channel - quorum_channel: Libp2pCommChannel, + quorum_channel: Libp2pNetwork, /// data availability channel - da_channel: Libp2pCommChannel, - /// view sync channel - view_sync_channel: Libp2pCommChannel, - /// vid channel - vid_channel: Libp2pCommChannel, + da_channel: Libp2pNetwork, } #[async_trait] @@ -633,19 +589,11 @@ impl< >, NODE: NodeImplementation< TYPES, - QuorumNetwork = Libp2pCommChannel, - CommitteeNetwork = Libp2pCommChannel, + QuorumNetwork = Libp2pNetwork, + CommitteeNetwork = Libp2pNetwork, Storage = MemoryStorage, >, - > - RunDA< - TYPES, - Libp2pCommChannel, - Libp2pCommChannel, - Libp2pCommChannel, - Libp2pCommChannel, - NODE, - > for Libp2pDARun + > RunDA, Libp2pNetwork, NODE> for Libp2pDARun where ::ValidatedState: TestableState, ::BlockPayload: TestableBlock, @@ -658,49 +606,26 @@ where let pub_key = config.config.my_own_validator_config.public_key.clone(); // create and wait for underlying network - let underlying_quorum_network = - libp2p_network_from_config::(config.clone(), pub_key).await; + let quorum_channel = libp2p_network_from_config::(config.clone(), pub_key).await; + let da_channel = underlying_quorum_network.clone().into(); underlying_quorum_network.wait_for_ready().await; - // create communication channels - let quorum_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); - - let view_sync_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); - - let da_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); - - let vid_channel: Libp2pCommChannel = - Libp2pCommChannel::new(underlying_quorum_network.clone().into()); - Libp2pDARun { config, quorum_channel, da_channel, - view_sync_channel, - vid_channel, } } - fn get_da_channel(&self) -> Libp2pCommChannel { + fn get_da_channel(&self) -> Libp2pNetwork { self.da_channel.clone() } - fn get_quorum_channel(&self) -> Libp2pCommChannel { + fn get_quorum_channel(&self) -> Libp2pNetwork { self.quorum_channel.clone() } - fn get_view_sync_channel(&self) -> Libp2pCommChannel { - self.view_sync_channel.clone() - } - - fn get_vid_channel(&self) -> Libp2pCommChannel { - self.vid_channel.clone() - } - fn get_config(&self) -> NetworkConfig { self.config.clone() } @@ -713,13 +638,9 @@ pub struct CombinedDARun { /// the network configuration config: NetworkConfig, /// quorum channel - quorum_channel: CombinedCommChannel, + quorum_channel: CombinedNetworks, /// data availability channel - da_channel: CombinedCommChannel, - /// view sync channel - view_sync_channel: CombinedCommChannel, - /// vid channel - vid_channel: CombinedCommChannel, + da_channel: CombinedNetworks, } #[async_trait] @@ -733,18 +654,10 @@ impl< NODE: NodeImplementation< TYPES, Storage = MemoryStorage, - QuorumNetwork = CombinedCommChannel, - CommitteeNetwork = CombinedCommChannel, + QuorumNetwork = CombinedNetworks, + CommitteeNetwork = CombinedNetworks, >, - > - RunDA< - TYPES, - CombinedCommChannel, - CombinedCommChannel, - CombinedCommChannel, - CombinedCommChannel, - NODE, - > for CombinedDARun + > RunDA, CombinedNetworks, NODE> for CombinedDARun where ::ValidatedState: TestableState, ::BlockPayload: TestableBlock, @@ -783,23 +696,23 @@ where webserver_underlying_quorum_network.wait_for_ready().await; // combine the two communication channels - let quorum_channel = CombinedCommChannel::new(Arc::new(CombinedNetworks( + let quorum_channel = CombinedNetworks::new(Arc::new(CombinedNetworks( webserver_underlying_quorum_network.clone(), libp2p_underlying_quorum_network.clone(), ))); - let view_sync_channel = CombinedCommChannel::new(Arc::new(CombinedNetworks( + let view_sync_channel = CombinedNetworks::new(Arc::new(CombinedNetworks( webserver_underlying_quorum_network.clone(), libp2p_underlying_quorum_network.clone(), ))); - let da_channel: CombinedCommChannel = - CombinedCommChannel::new(Arc::new(CombinedNetworks( + let da_channel: CombinedNetworks = + CombinedNetworks::new(Arc::new(CombinedNetworks( webserver_underlying_da_network, libp2p_underlying_quorum_network.clone(), ))); - let vid_channel = CombinedCommChannel::new(Arc::new(CombinedNetworks( + let vid_channel = CombinedNetworks::new(Arc::new(CombinedNetworks( webserver_underlying_quorum_network, libp2p_underlying_quorum_network, ))); @@ -808,27 +721,17 @@ where config, quorum_channel, da_channel, - view_sync_channel, - vid_channel, } } - fn get_da_channel(&self) -> CombinedCommChannel { + fn get_da_channel(&self) -> CombinedNetworks { self.da_channel.clone() } - fn get_quorum_channel(&self) -> CombinedCommChannel { + fn get_quorum_channel(&self) -> CombinedNetworks { self.quorum_channel.clone() } - fn get_view_sync_channel(&self) -> CombinedCommChannel { - self.view_sync_channel.clone() - } - - fn get_vid_channel(&self) -> CombinedCommChannel { - self.vid_channel.clone() - } - fn get_config(&self) -> NetworkConfig { self.config.clone() } @@ -844,10 +747,8 @@ pub async fn main_entry_point< BlockHeader = TestBlockHeader, InstanceState = TestInstanceState, >, - DACHANNEL: CommunicationChannel + Debug, - QUORUMCHANNEL: CommunicationChannel + Debug, - VIEWSYNCCHANNEL: CommunicationChannel + Debug, - VIDCHANNEL: CommunicationChannel + Debug, + DACHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, + QUORUMCHANNEL: ConnectedNetwork, TYPES::SignatureKey> + Debug, NODE: NodeImplementation< TYPES, QuorumNetwork = QUORUMCHANNEL, diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index e948c47879..b706e89df2 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -42,7 +42,8 @@ use hotshot_types::{ simple_certificate::QuorumCertificate, traits::{ consensus_api::ConsensusApi, - network::CommunicationChannel, + election::Membership, + network::ConnectedNetwork, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, states::ValidatedState, @@ -321,7 +322,7 @@ impl> SystemContext { sender: api.inner.public_key.clone(), kind: MessageKind::from(message), }, - da_membership, + da_membership.get_committee(TYPES::Time::new(0)), ), api .send_external_event(Event { diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 172f904b9f..40e50ea466 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -15,18 +15,21 @@ use hotshot_task_impls::{ vid::VIDTaskState, view_sync::ViewSyncTaskState, }; -use hotshot_types::traits::election::Membership; use hotshot_types::{ event::Event, message::Messages, traits::{ block_contents::vid_commitment, consensus_api::ConsensusApi, - network::{CommunicationChannel, ConsensusIntentEvent, TransmitType}, + network::{ConsensusIntentEvent, TransmitType}, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, BlockPayload, }, }; +use hotshot_types::{ + message::Message, + traits::{election::Membership, network::ConnectedNetwork}, +}; use std::{ collections::{BTreeMap, HashMap, HashSet}, marker::PhantomData, @@ -45,7 +48,10 @@ pub enum GlobalEvent { } /// Add the network task to handle messages and publish events. -pub async fn add_network_message_task>( +pub async fn add_network_message_task< + TYPES: NodeType, + NET: ConnectedNetwork, TYPES::SignatureKey>, +>( task_reg: Arc, event_stream: Sender>, channel: NET, @@ -105,7 +111,10 @@ pub async fn add_network_message_task>( +pub async fn add_network_event_task< + TYPES: NodeType, + NET: ConnectedNetwork, TYPES::SignatureKey>, +>( task_reg: Arc, tx: Sender>, rx: Receiver>, diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index 3bd07c4c15..1671c5c145 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -13,8 +13,8 @@ pub use storage::{Result as StorageResult, Storage}; pub mod implementations { pub use super::{ networking::{ - combined_network::{calculate_hash_of, Cache, CombinedCommChannel, CombinedNetworks}, - libp2p_network::{Libp2pCommChannel, Libp2pNetwork, PeerInfoVec}, + combined_network::{calculate_hash_of, Cache, CombinedNetworks}, + libp2p_network::{Libp2pNetwork, PeerInfoVec}, memory_network::{MasterMap, MemoryCommChannel, MemoryNetwork}, web_server_network::{WebCommChannel, WebServerNetwork}, NetworkingMetricsValue, diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index ae093c5f72..82e00e4776 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -8,7 +8,7 @@ use hotshot_constants::{ COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL, }; use std::{ - collections::HashSet, + collections::{BTreeSet, HashSet}, hash::Hasher, sync::atomic::{AtomicU64, Ordering}, }; @@ -26,11 +26,7 @@ use hotshot_types::{ data::ViewNumber, message::Message, traits::{ - election::Membership, - network::{ - CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, - TestableChannelImplementation, TransmitType, ViewMessage, - }, + network::{ConnectedNetwork, ConsensusIntentEvent, TransmitType}, node_implementation::NodeType, }, BoxSyncFuture, @@ -110,9 +106,9 @@ pub fn calculate_hash_of(t: &T) -> u64 { /// A communication channel with 2 networks, where we can fall back to the slower network if the /// primary fails #[derive(Clone, Debug)] -pub struct CombinedCommChannel { +pub struct CombinedNetworks { /// The two networks we'll use for send/recv - networks: Arc>, + networks: Arc>, /// Last n seen messages to prevent processing duplicates message_cache: Arc>, @@ -121,10 +117,10 @@ pub struct CombinedCommChannel { primary_down: Arc, } -impl CombinedCommChannel { +impl CombinedNetworks { /// Constructor #[must_use] - pub fn new(networks: Arc>) -> Self { + pub fn new(networks: Arc>) -> Self { Self { networks, message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), @@ -149,7 +145,7 @@ impl CombinedCommChannel { /// We need this so we can impl `TestableNetworkingImplementation` /// on the tuple #[derive(Debug, Clone)] -pub struct CombinedNetworks( +pub struct UnderlyingCombinedNetworks( pub WebServerNetwork, pub Libp2pNetwork, TYPES::SignatureKey>, ); @@ -184,39 +180,13 @@ impl TestableNetworkingImplementation for CombinedNetwor reliability_config, ) ); - Box::new(move |node_id| CombinedNetworks(generators.0(node_id), generators.1(node_id))) - } - - /// Get the number of messages in-flight. - /// - /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. - fn in_flight_message_count(&self) -> Option { - None - } -} - -#[cfg(feature = "hotshot-testing")] -impl TestableNetworkingImplementation for CombinedCommChannel { - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = as TestableNetworkingImplementation<_>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config, - ); - Box::new(move |node_id| Self { - networks: generator(node_id).into(), - message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), - primary_down: Arc::new(AtomicU64::new(0)), + Box::new(move |node_id| { + let networks = UnderlyingCombinedNetworks(generators.0(node_id), generators.1(node_id)); + Self { + networks: Arc::new(networks), + message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), + primary_down: Arc::new(AtomicU64::new(0)), + } }) } @@ -229,9 +199,9 @@ impl TestableNetworkingImplementation for CombinedCommCh } #[async_trait] -impl CommunicationChannel for CombinedCommChannel { - type NETWORK = CombinedNetworks; - +impl ConnectedNetwork, TYPES::SignatureKey> + for CombinedNetworks +{ fn pause(&self) { self.networks.0.pause(); } @@ -265,11 +235,8 @@ impl CommunicationChannel for CombinedCommChannel async fn broadcast_message( &self, message: Message, - election: &TYPES::Membership, + recipients: BTreeSet, ) -> Result<(), NetworkError> { - let recipients = - ::Membership::get_committee(election, message.get_view_number()); - // broadcast optimistically on both networks, but if the primary network is down, skip it let primary_down = self.primary_down.load(Ordering::Relaxed); if primary_down < COMBINED_NETWORK_MIN_PRIMARY_FAILURES @@ -384,12 +351,6 @@ impl CommunicationChannel for CombinedCommChannel } } -impl TestableChannelImplementation for CombinedCommChannel { - fn generate_network() -> Box) -> Self + 'static> { - Box::new(move |network| CombinedCommChannel::new(network)) - } -} - #[cfg(test)] mod test { use super::*; diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 9a8162c6ba..d2bd9e45b8 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -20,10 +20,9 @@ use hotshot_types::{ data::ViewNumber, message::{Message, MessageKind}, traits::{ - election::Membership, network::{ - CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, - NetworkError, NetworkMsg, TestableChannelImplementation, TransmitType, ViewMessage, + ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, NetworkError, + NetworkMsg, TransmitType, ViewMessage, }, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, @@ -51,7 +50,6 @@ use std::{collections::HashSet, num::NonZeroUsize, str::FromStr}; use std::{ collections::BTreeSet, fmt::Debug, - marker::PhantomData, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -629,6 +627,14 @@ impl ConnectedNetwork for Libp2p self.wait_for_ready().await; } + fn pause(&self) { + unimplemented!("Pausing not implemented for the Libp2p network"); + } + + fn resume(&self) { + unimplemented!("Resuming not implemented for the Libp2p network"); + } + #[instrument(name = "Libp2pNetwork::ready_nonblocking", skip_all)] async fn is_ready(&self) -> bool { self.inner.is_ready.load(Ordering::Relaxed) @@ -886,153 +892,3 @@ impl ConnectedNetwork for Libp2p } } } - -/// libp2p identity communication channel -#[derive(Clone, Debug)] -pub struct Libp2pCommChannel( - Arc, TYPES::SignatureKey>>, - PhantomData, -); - -impl Libp2pCommChannel { - /// create a new libp2p communication channel - #[must_use] - pub fn new(network: Arc, TYPES::SignatureKey>>) -> Self { - Self(network, PhantomData) - } -} - -#[cfg(feature = "hotshot-testing")] -impl TestableNetworkingImplementation for Libp2pCommChannel -where - MessageKind: ViewMessage, -{ - /// Returns a boxed function `f(node_id, public_key) -> Libp2pNetwork` - /// with the purpose of generating libp2p networks. - /// Generates `num_bootstrap` bootstrap nodes. The remainder of nodes are normal - /// nodes with sane defaults. - /// # Panics - /// Returned function may panic either: - /// - An invalid configuration - /// (probably an issue with the defaults of this function) - /// - An inability to spin up the replica's network - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = , - TYPES::SignatureKey, - > as TestableNetworkingImplementation<_>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config - ); - Box::new(move |node_id| Self(generator(node_id).into(), PhantomData)) - } - - fn in_flight_message_count(&self) -> Option { - None - } -} - -// FIXME maybe we should macro this...? It's repeated at verbatum EXCEPT for impl generics at the -// top -// we don't really want to make this the default implementation because that forces it to require ConnectedNetwork to be implemented. The struct we implement over might use multiple ConnectedNetworks -#[async_trait] -impl CommunicationChannel for Libp2pCommChannel -where - MessageKind: ViewMessage, -{ - type NETWORK = Libp2pNetwork, TYPES::SignatureKey>; - - fn pause(&self) { - unimplemented!("Pausing not implemented for the Libp2p network"); - } - - fn resume(&self) { - unimplemented!("Resuming not implemented for the Libp2p network"); - } - - async fn wait_for_ready(&self) { - self.0.wait_for_ready().await; - } - - async fn is_ready(&self) -> bool { - self.0.is_ready().await - } - - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - self.0.shut_down().await; - }; - boxed_sync(closure) - } - - async fn broadcast_message( - &self, - message: Message, - membership: &TYPES::Membership, - ) -> Result<(), NetworkError> { - let recipients = ::Membership::get_committee( - membership, - message.kind.get_view_number(), - ); - self.0.broadcast_message(message, recipients).await - } - - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { - self.0.direct_message(message, recipient).await - } - - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { self.0.recv_msgs(transmit_type).await }; - boxed_sync(closure) - } - - async fn queue_node_lookup( - &self, - view_number: ViewNumber, - pk: TYPES::SignatureKey, - ) -> Result<(), UnboundedSendError>> { - self.0.queue_node_lookup(view_number, pk).await - } - - async fn inject_consensus_info(&self, event: ConsensusIntentEvent) { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::inject_consensus_info(&self.0, event) - .await; - } -} - -impl TestableChannelImplementation for Libp2pCommChannel { - fn generate_network( - ) -> Box, TYPES::SignatureKey>>) -> Self + 'static> - { - Box::new(move |network| Libp2pCommChannel::new(network)) - } -} diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index ae9f1c53dc..df70f77be9 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -15,13 +15,9 @@ use dashmap::DashMap; use futures::StreamExt; use hotshot_types::{ boxed_sync, - message::{Message, MessageKind}, + message::Message, traits::{ - election::Membership, - network::{ - CommunicationChannel, ConnectedNetwork, NetworkMsg, TestableChannelImplementation, - TestableNetworkingImplementation, TransmitType, ViewMessage, - }, + network::{ConnectedNetwork, NetworkMsg, TestableNetworkingImplementation, TransmitType}, node_implementation::NodeType, signature_key::SignatureKey, }, @@ -277,6 +273,14 @@ impl ConnectedNetwork for Memory #[instrument(name = "MemoryNetwork::ready_blocking")] async fn wait_for_ready(&self) {} + fn pause(&self) { + unimplemented!("Pausing not implemented for the Memory network"); + } + + fn resume(&self) { + unimplemented!("Resuming not implemented for the Memory network"); + } + #[instrument(name = "MemoryNetwork::ready_nonblocking")] async fn is_ready(&self) -> bool { true @@ -464,109 +468,3 @@ impl MemoryCommChannel { Self(network) } } - -impl TestableNetworkingImplementation for MemoryCommChannel -where - MessageKind: ViewMessage, -{ - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = , - TYPES::SignatureKey, - > as TestableNetworkingImplementation<_>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config, - ); - Box::new(move |node_id| Self(generator(node_id).into())) - } - - fn in_flight_message_count(&self) -> Option { - Some(self.0.inner.in_flight_message_count.load(Ordering::Relaxed)) - } -} - -#[async_trait] -impl CommunicationChannel for MemoryCommChannel -where - MessageKind: ViewMessage, -{ - type NETWORK = MemoryNetwork, TYPES::SignatureKey>; - - fn pause(&self) { - unimplemented!("Pausing not implemented for the memory network"); - } - - fn resume(&self) { - unimplemented!("Resuming not implemented for the memory network"); - } - - async fn wait_for_ready(&self) { - self.0.wait_for_ready().await; - } - - async fn is_ready(&self) -> bool { - self.0.is_ready().await - } - - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - self.0.shut_down().await; - }; - boxed_sync(closure) - } - - async fn broadcast_message( - &self, - message: Message, - election: &TYPES::Membership, - ) -> Result<(), NetworkError> { - let recipients = ::Membership::get_committee( - election, - message.kind.get_view_number(), - ); - self.0.broadcast_message(message, recipients).await - } - - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { - self.0.direct_message(message, recipient).await - } - - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { self.0.recv_msgs(transmit_type).await }; - boxed_sync(closure) - } -} - -impl TestableChannelImplementation for MemoryCommChannel { - fn generate_network( - ) -> Box, TYPES::SignatureKey>>) -> Self + 'static> - { - Box::new(move |network| MemoryCommChannel::new(network)) - } -} diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index a5dcb90a30..c3efb8f9ac 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -18,9 +18,8 @@ use hotshot_types::{ message::{Message, MessagePurpose}, traits::{ network::{ - CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, NetworkError, NetworkMsg, - TestableChannelImplementation, TestableNetworkingImplementation, TransmitType, - WebServerNetworkError, + ConnectedNetwork, ConsensusIntentEvent, NetworkError, NetworkMsg, + TestableNetworkingImplementation, TransmitType, WebServerNetworkError, }, node_implementation::NodeType, signature_key::SignatureKey, @@ -101,18 +100,6 @@ impl WebServerNetwork { source: WebServerNetworkError::ClientError, }) } - - /// Pauses the underlying network - pub fn pause(&self) { - error!("Pausing CDN network"); - self.inner.running.store(false, Ordering::Relaxed); - } - - /// Resumes the underlying network - pub fn resume(&self) { - error!("Resuming CDN network"); - self.inner.running.store(true, Ordering::Relaxed); - } } /// `TaskChannel` is a type alias for an unbounded sender channel that sends `ConsensusIntentEvent`s. @@ -723,105 +710,6 @@ impl WebServerNetwork { } } -#[async_trait] -impl CommunicationChannel for WebCommChannel { - type NETWORK = WebServerNetwork; - /// Blocks until node is successfully initialized - /// into the network - async fn wait_for_ready(&self) { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::wait_for_ready(&self.0) - .await; - } - - fn pause(&self) { - self.0.pause(); - } - - fn resume(&self) { - self.0.resume(); - } - - /// checks if the network is ready - /// nonblocking - async fn is_ready(&self) -> bool { - as ConnectedNetwork, TYPES::SignatureKey>>::is_ready( - &self.0, - ) - .await - } - - /// Shut down this network. Afterwards this network should no longer be used. - /// - /// This should also cause other functions to immediately return with a [`NetworkError`] - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::shut_down(&self.0) - .await; - }; - boxed_sync(closure) - } - - /// broadcast message to those listening on the communication channel - /// blocking - async fn broadcast_message( - &self, - message: Message, - _election: &TYPES::Membership, - ) -> Result<(), NetworkError> { - self.0.broadcast_message(message, BTreeSet::new()).await - } - - /// Sends a direct message to a specific node - /// blocking - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError> { - self.0.direct_message(message, recipient).await - } - - /// Moves out the entire queue of received messages of 'transmit_type` - /// - /// Will unwrap the underlying `NetworkMessage` - /// blocking - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b, - { - let closure = async move { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::recv_msgs(&self.0, transmit_type) - .await - }; - boxed_sync(closure) - } - - async fn inject_consensus_info(&self, event: ConsensusIntentEvent) { - as ConnectedNetwork< - Message, - TYPES::SignatureKey, - >>::inject_consensus_info(&self.0, event) - .await; - } -} - #[async_trait] impl ConnectedNetwork, TYPES::SignatureKey> for WebServerNetwork @@ -832,7 +720,15 @@ impl ConnectedNetwork, TYPES::Signatur async_sleep(Duration::from_secs(1)).await; } } + fn pause(&self) { + error!("Pausing CDN network"); + self.inner.running.store(false, Ordering::Relaxed); + } + fn resume(&self) { + error!("Resuming CDN network"); + self.inner.running.store(true, Ordering::Relaxed); + } /// checks if the network is ready /// nonblocking async fn is_ready(&self) -> bool { @@ -1396,36 +1292,3 @@ impl TestableNetworkingImplementation for WebServerNetwo None } } - -impl TestableNetworkingImplementation for WebCommChannel { - fn generator( - expected_node_count: usize, - num_bootstrap: usize, - network_id: usize, - da_committee_size: usize, - is_da: bool, - _reliability_config: Option>, - ) -> Box Self + 'static> { - let generator = as TestableNetworkingImplementation<_>>::generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - // network reliability is a testing feature - // not yet implemented for webcommchannel - None, - ); - Box::new(move |node_id| Self(generator(node_id).into())) - } - - fn in_flight_message_count(&self) -> Option { - None - } -} - -impl TestableChannelImplementation for WebCommChannel { - fn generate_network() -> Box>) -> Self + 'static> { - Box::new(move |network| WebCommChannel::new(network)) - } -} diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 1cc0af9e75..af0ff75b08 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -25,7 +25,7 @@ use hotshot_types::{ block_contents::BlockHeader, consensus_api::ConsensusApi, election::Membership, - network::{CommunicationChannel, ConsensusIntentEvent}, + network::{ConnectedNetwork, ConsensusIntentEvent}, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, signature_key::SignatureKey, states::ValidatedState, diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 577afbaca6..8c574d7eab 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -18,7 +18,7 @@ use hotshot_types::{ block_contents::vid_commitment, consensus_api::ConsensusApi, election::Membership, - network::{CommunicationChannel, ConsensusIntentEvent}, + network::{ConnectedNetwork, ConsensusIntentEvent}, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, signature_key::SignatureKey, }, diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 92dd284aa2..b38839085b 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -13,7 +13,7 @@ use hotshot_types::{ }, traits::{ election::Membership, - network::{CommunicationChannel, TransmitType}, + network::{ConnectedNetwork, TransmitType, ViewMessage}, node_implementation::NodeType, }, vote::{HasViewNumber, Vote}, @@ -181,7 +181,10 @@ impl NetworkMessageTaskState { } /// network event task state -pub struct NetworkEventTaskState> { +pub struct NetworkEventTaskState< + TYPES: NodeType, + COMMCHANNEL: ConnectedNetwork, TYPES::SignatureKey>, +> { /// comm channel pub channel: COMMCHANNEL, /// view number @@ -193,7 +196,7 @@ pub struct NetworkEventTaskState) -> bool, } -impl> TaskState +impl, TYPES::SignatureKey>> TaskState for NetworkEventTaskState { type Event = HotShotEvent; @@ -221,7 +224,7 @@ impl> TaskState } } -impl> +impl, TYPES::SignatureKey>> NetworkEventTaskState { /// Handle the given event. @@ -364,13 +367,18 @@ impl> sender, kind: message_kind, }; + let view = message.kind.get_view_number(); let transmit_result = match transmit_type { TransmitType::Direct => { self.channel .direct_message(message, recipient.unwrap()) .await } - TransmitType::Broadcast => self.channel.broadcast_message(message, membership).await, + TransmitType::Broadcast => { + self.channel + .broadcast_message(message, membership.get_committee(view)) + .await + } }; match transmit_result { diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index d07aeb2c10..be40cb8e01 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -6,7 +6,7 @@ use async_lock::RwLock; use async_std::task::spawn_blocking; use hotshot_task::task::{Task, TaskState}; -use hotshot_types::traits::network::CommunicationChannel; +use hotshot_types::traits::network::ConnectedNetwork; use hotshot_types::{ consensus::Consensus, data::VidDisperse, diff --git a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs index 04d47cda08..03ade80ae5 100644 --- a/crates/task-impls/src/view_sync.rs +++ b/crates/task-impls/src/view_sync.rs @@ -31,7 +31,7 @@ use hotshot_types::{ traits::{ consensus_api::ConsensusApi, election::Membership, - network::CommunicationChannel, + network::ConnectedNetwork, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, }, }; diff --git a/crates/testing/src/spinning_task.rs b/crates/testing/src/spinning_task.rs index 017e1497a0..6235900810 100644 --- a/crates/testing/src/spinning_task.rs +++ b/crates/testing/src/spinning_task.rs @@ -6,7 +6,7 @@ use crate::test_runner::HotShotTaskCompleted; use crate::test_runner::LateStartNode; use crate::test_runner::Node; use hotshot_task::task::{Task, TaskState, TestTaskState}; -use hotshot_types::traits::network::CommunicationChannel; +use hotshot_types::traits::network::ConnectedNetwork; use hotshot_types::{event::Event, traits::node_implementation::NodeType}; use snafu::Snafu; use std::collections::BTreeMap; diff --git a/crates/testing/src/test_builder.rs b/crates/testing/src/test_builder.rs index b3eafdc757..567382aeba 100644 --- a/crates/testing/src/test_builder.rs +++ b/crates/testing/src/test_builder.rs @@ -284,7 +284,7 @@ impl TestMetadata { TestLauncher { resource_generator: ResourceGenerators { - channel_generator: >::gen_comm_channels( + channel_generator: >::gen_networks( total_nodes, num_bootstrap_nodes, da_committee_size, diff --git a/crates/testing/src/test_launcher.rs b/crates/testing/src/test_launcher.rs index df7d0a6a47..d0616d8e2d 100644 --- a/crates/testing/src/test_launcher.rs +++ b/crates/testing/src/test_launcher.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use hotshot::traits::{NodeImplementation, TestableNodeImplementation}; use hotshot_types::{ - traits::{network::CommunicationChannel, node_implementation::NodeType}, + traits::{network::ConnectedNetwork, node_implementation::NodeType}, HotShotConfig, }; @@ -17,12 +17,6 @@ pub type Networks = ( /// Wrapper for a function that takes a `node_id` and returns an instance of `T`. pub type Generator = Box T + 'static>; -/// Wrapper Type for committee function that takes a `ConnectedNetwork` and returns a `CommunicationChannel` -pub type CommitteeNetworkGenerator = Box) -> T + 'static>; - -/// Wrapper Type for view sync function that takes a `ConnectedNetwork` and returns a `CommunicationChannel` -pub type ViewSyncNetworkGenerator = Box) -> T + 'static>; - /// generators for resources used by each node pub struct ResourceGenerators> { /// generate channels @@ -44,7 +38,9 @@ pub struct TestLauncher> { impl> TestLauncher { /// launch the test #[must_use] - pub fn launch>(self) -> TestRunner { + pub fn launch, TYPES::SignatureKey>>( + self, + ) -> TestRunner { TestRunner:: { launcher: self, nodes: Vec::new(), diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 467b7b734d..19015f0129 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -20,9 +20,7 @@ use hotshot::{traits::TestableNodeImplementation, HotShotInitializer, SystemCont use hotshot_constants::EVENT_CHANNEL_SIZE; use hotshot_task::task::{Task, TaskRegistry, TestTask}; -use hotshot_types::traits::{ - network::CommunicationChannel, node_implementation::NodeImplementation, -}; +use hotshot_types::traits::{network::ConnectedNetwork, node_implementation::NodeImplementation}; use hotshot_types::{ consensus::ConsensusMetricsValue, traits::{ @@ -65,7 +63,7 @@ pub struct LateStartNode> pub struct TestRunner< TYPES: NodeType, I: TestableNodeImplementation, - N: CommunicationChannel, + N: ConnectedNetwork, TYPES::SignatureKey>, > { /// test launcher, contains a bunch of useful metadata and closures pub(crate) launcher: TestLauncher, @@ -102,7 +100,7 @@ impl TaskErr for T {} impl< TYPES: NodeType, I: TestableNodeImplementation, - N: CommunicationChannel, + N: ConnectedNetwork, TYPES::SignatureKey>, > TestRunner where I: TestableNodeImplementation, diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index a2c8357a29..2421e1e329 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -304,6 +304,12 @@ pub trait CommunicationChannel: Clone + Debug + Send + Sync + ' pub trait ConnectedNetwork: Clone + Send + Sync + 'static { + /// Pauses the underlying network + fn pause(&self); + + /// Resumes the underlying network + fn resume(&self); + /// Blocks until the network is successfully initialized async fn wait_for_ready(&self); diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index 7cd2ef070e..02ddf7f5fe 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -6,16 +6,17 @@ use super::{ block_contents::{BlockHeader, TestableBlock, Transaction}, election::ElectionConfig, - network::{CommunicationChannel, NetworkReliability, TestableNetworkingImplementation}, + network::{ConnectedNetwork, NetworkReliability, TestableNetworkingImplementation}, states::TestableState, storage::{StorageError, StorageState, TestableStorage}, ValidatedState, }; use crate::{ data::{Leaf, TestableLeaf}, + message::Message, traits::{ - election::Membership, network::TestableChannelImplementation, signature_key::SignatureKey, - states::InstanceState, storage::Storage, BlockPayload, + election::Membership, signature_key::SignatureKey, states::InstanceState, storage::Storage, + BlockPayload, }, }; use async_trait::async_trait; @@ -44,9 +45,9 @@ pub trait NodeImplementation: type Storage: Storage + Clone; /// Network for all nodes - type QuorumNetwork: CommunicationChannel; + type QuorumNetwork: ConnectedNetwork, TYPES::SignatureKey>; /// Network for those in the DA committee - type CommitteeNetwork: CommunicationChannel; + type CommitteeNetwork: ConnectedNetwork, TYPES::SignatureKey>; } /// extra functions required on a node implementation to be usable by hotshot-testing @@ -94,12 +95,12 @@ pub trait TestableNodeImplementation: NodeImplementation async fn get_full_state(storage: &Self::Storage) -> StorageState; /// Generate the communication channels for testing - fn gen_comm_channels( + fn gen_networks( expected_node_count: usize, num_bootstrap: usize, da_committee_size: usize, reliability_config: Option>, - ) -> Box (Self::QuorumNetwork, Self::QuorumNetwork)>; + ) -> Box (Arc, Arc)>; } #[async_trait] @@ -108,12 +109,8 @@ where TYPES::ValidatedState: TestableState, TYPES::BlockPayload: TestableBlock, I::Storage: TestableStorage, - I::QuorumNetwork: TestableChannelImplementation, - I::CommitteeNetwork: TestableChannelImplementation, - <>::QuorumNetwork as CommunicationChannel>::NETWORK: - TestableNetworkingImplementation, - <>::CommitteeNetwork as CommunicationChannel>::NETWORK: - TestableNetworkingImplementation, + I::QuorumNetwork: TestableNetworkingImplementation, + I::CommitteeNetwork: TestableNetworkingImplementation, { type CommitteeElectionConfig = TYPES::ElectionConfigType; @@ -153,13 +150,14 @@ where async fn get_full_state(storage: &Self::Storage) -> StorageState { >::get_full_state(storage).await } - fn gen_comm_channels( + fn gen_networks( expected_node_count: usize, num_bootstrap: usize, da_committee_size: usize, reliability_config: Option>, - ) -> Box (Self::QuorumNetwork, Self::QuorumNetwork)> { - let quorum_generator = <>::NETWORK as TestableNetworkingImplementation>::generator( + ) -> Box (Arc, Arc)> { + let quorum_generator = + >::generator( expected_node_count, num_bootstrap, 0, @@ -167,7 +165,7 @@ where false, reliability_config.clone(), ); - let da_generator = <>::NETWORK as TestableNetworkingImplementation>::generator( + let da_generator = >::generator( expected_node_count, num_bootstrap, 1, @@ -179,11 +177,7 @@ where Box::new(move |id| { let quorum = Arc::new(quorum_generator(id)); let da = Arc::new(da_generator(id)); - let quorum_chan = - >::generate_network()(quorum); - let committee_chan = - >::generate_network()(da); - (quorum_chan, committee_chan) + (quorum, da) }) } } From bbd7f2b9497a078217e38858db6c2cd6426b39c0 Mon Sep 17 00:00:00 2001 From: = Date: Tue, 13 Feb 2024 18:53:04 -0500 Subject: [PATCH 06/11] Completed the removal --- crates/example-types/src/node_types.rs | 30 ++++--- crates/examples/combined/all.rs | 24 ++---- crates/examples/combined/multi-validator.rs | 15 +--- crates/examples/combined/orchestrator.rs | 8 +- crates/examples/combined/types.rs | 10 +-- crates/examples/combined/validator.rs | 14 +--- crates/examples/infra/mod.rs | 78 ++++++++---------- crates/examples/libp2p/all.rs | 24 ++---- crates/examples/libp2p/multi-validator.rs | 15 +--- crates/examples/libp2p/orchestrator.rs | 7 +- crates/examples/libp2p/types.rs | 15 ++-- crates/examples/libp2p/validator.rs | 14 +--- crates/examples/webserver/all.rs | 25 ++---- crates/examples/webserver/multi-validator.rs | 15 +--- crates/examples/webserver/orchestrator.rs | 8 +- crates/examples/webserver/types.rs | 10 +-- crates/examples/webserver/validator.rs | 14 +--- crates/hotshot/src/lib.rs | 4 +- crates/hotshot/src/tasks/mod.rs | 18 ++--- crates/hotshot/src/traits.rs | 8 +- .../src/traits/networking/memory_network.rs | 14 ---- .../traits/networking/web_server_network.rs | 12 --- crates/task-impls/src/network.rs | 4 +- crates/testing/src/test_launcher.rs | 5 +- crates/testing/src/test_runner.rs | 5 +- crates/testing/tests/da_task.rs | 2 +- crates/testing/tests/memory_network.rs | 10 +-- crates/testing/tests/vid_task.rs | 2 +- crates/testing/tests/view_sync_task.rs | 2 +- crates/types/src/traits/network.rs | 79 +------------------ 30 files changed, 146 insertions(+), 345 deletions(-) diff --git a/crates/example-types/src/node_types.rs b/crates/example-types/src/node_types.rs index 18de95cc22..b84c2da482 100644 --- a/crates/example-types/src/node_types.rs +++ b/crates/example-types/src/node_types.rs @@ -8,12 +8,13 @@ use crate::{ use hotshot::traits::{ election::static_committee::{StaticCommittee, StaticElectionConfig}, implementations::{ - CombinedCommChannel, Libp2pCommChannel, MemoryCommChannel, MemoryStorage, WebCommChannel, + CombinedNetworks, Libp2pNetwork, MemoryNetwork, MemoryStorage, WebServerNetwork, }, NodeImplementation, }; use hotshot_types::{ - data::ViewNumber, signature_key::BLSPubKey, traits::node_implementation::NodeType, + data::ViewNumber, message::Message, signature_key::BLSPubKey, + traits::node_implementation::NodeType, }; use serde::{Deserialize, Serialize}; @@ -65,34 +66,31 @@ pub struct CombinedImpl; pub type StaticMembership = StaticCommittee; /// memory network -pub type StaticMemoryDAComm = MemoryCommChannel; +pub type StaticMemoryDAComm = + MemoryNetwork, ::SignatureKey>; /// libp2p network -type StaticLibp2pDAComm = Libp2pCommChannel; +type StaticLibp2pDAComm = Libp2pNetwork, ::SignatureKey>; /// web server network communication channel -type StaticWebDAComm = WebCommChannel; +type StaticWebDAComm = WebServerNetwork; /// combined network -type StaticCombinedDAComm = CombinedCommChannel; +type StaticCombinedDAComm = CombinedNetworks; /// memory comm channel -pub type StaticMemoryQuorumComm = MemoryCommChannel; +pub type StaticMemoryQuorumComm = + MemoryNetwork, ::SignatureKey>; /// libp2p comm channel -type StaticLibp2pQuorumComm = Libp2pCommChannel; +type StaticLibp2pQuorumComm = + Libp2pNetwork, ::SignatureKey>; /// web server comm channel -type StaticWebQuorumComm = WebCommChannel; +type StaticWebQuorumComm = WebServerNetwork; /// combined network (libp2p + web server) -type StaticCombinedQuorumComm = CombinedCommChannel; - -/// memory network -pub type StaticMemoryViewSyncComm = MemoryCommChannel; - -/// memory network -pub type StaticMemoryVIDComm = MemoryCommChannel; +type StaticCombinedQuorumComm = CombinedNetworks; impl NodeImplementation for Libp2pImpl { type Storage = MemoryStorage; diff --git a/crates/examples/combined/all.rs b/crates/examples/combined/all.rs index 1d6f1c0a89..6249abb909 100644 --- a/crates/examples/combined/all.rs +++ b/crates/examples/combined/all.rs @@ -20,7 +20,7 @@ use tracing::{error, instrument}; use crate::{ infra::run_orchestrator, infra::{ConfigArgs, OrchestratorArgs}, - types::{DANetwork, NodeImpl, QuorumNetwork, VIDNetwork, ViewSyncNetwork}, + types::{DANetwork, NodeImpl, QuorumNetwork}, }; /// general infra used for this example @@ -78,8 +78,6 @@ async fn main() { TestTypes, DANetwork, QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, NodeImpl, >(OrchestratorArgs { url: orchestrator_url.clone(), @@ -96,19 +94,13 @@ async fn main() { for _ in 0..config.config.total_nodes.into() { let orchestrator_url = orchestrator_url.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs { - url: orchestrator_url, - public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - network_config_file: None, - }) + infra::main_entry_point::( + ValidatorArgs { + url: orchestrator_url, + public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + network_config_file: None, + }, + ) .await; }); nodes.push(node); diff --git a/crates/examples/combined/multi-validator.rs b/crates/examples/combined/multi-validator.rs index d97a7f7bc3..f903f1e57c 100644 --- a/crates/examples/combined/multi-validator.rs +++ b/crates/examples/combined/multi-validator.rs @@ -7,9 +7,8 @@ use clap::Parser; use hotshot_example_types::state_types::TestTypes; use hotshot_orchestrator::client::{MultiValidatorArgs, ValidatorArgs}; use tracing::instrument; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; /// types used for this example pub mod types; @@ -34,15 +33,9 @@ async fn main() { let args = args.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs::from_multi_args(args, node_index)) + infra::main_entry_point::( + ValidatorArgs::from_multi_args(args, node_index), + ) .await; }); nodes.push(node); diff --git a/crates/examples/combined/orchestrator.rs b/crates/examples/combined/orchestrator.rs index d4ced1536c..0069093505 100644 --- a/crates/examples/combined/orchestrator.rs +++ b/crates/examples/combined/orchestrator.rs @@ -6,11 +6,10 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::instrument; -use types::VIDNetwork; use crate::infra::run_orchestrator; use crate::infra::OrchestratorArgs; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork}; /// general infra used for this example #[path = "../infra/mod.rs"] @@ -27,8 +26,5 @@ async fn main() { setup_backtrace(); let args = OrchestratorArgs::parse(); - run_orchestrator::( - args, - ) - .await; + run_orchestrator::(args).await; } diff --git a/crates/examples/combined/types.rs b/crates/examples/combined/types.rs index 94c316f7ab..9a904b592f 100644 --- a/crates/examples/combined/types.rs +++ b/crates/examples/combined/types.rs @@ -1,5 +1,5 @@ use crate::infra::CombinedDARun; -use hotshot::traits::implementations::{CombinedCommChannel, MemoryStorage}; +use hotshot::traits::implementations::{CombinedNetworks, MemoryStorage}; use hotshot_example_types::state_types::TestTypes; use hotshot_types::traits::node_implementation::NodeImplementation; use serde::{Deserialize, Serialize}; @@ -10,13 +10,13 @@ use std::fmt::Debug; pub struct NodeImpl {} /// convenience type alias -pub type DANetwork = CombinedCommChannel; +pub type DANetwork = CombinedNetworks; /// convenience type alias -pub type VIDNetwork = CombinedCommChannel; +pub type VIDNetwork = CombinedNetworks; /// convenience type alias -pub type QuorumNetwork = CombinedCommChannel; +pub type QuorumNetwork = CombinedNetworks; /// convenience type alias -pub type ViewSyncNetwork = CombinedCommChannel; +pub type ViewSyncNetwork = CombinedNetworks; impl NodeImplementation for NodeImpl { type Storage = MemoryStorage; diff --git a/crates/examples/combined/validator.rs b/crates/examples/combined/validator.rs index d0493134d6..38c8dbe0b8 100644 --- a/crates/examples/combined/validator.rs +++ b/crates/examples/combined/validator.rs @@ -3,9 +3,8 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::{info, instrument}; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; use hotshot_orchestrator::client::ValidatorArgs; @@ -27,14 +26,5 @@ async fn main() { setup_backtrace(); let args = ValidatorArgs::parse(); info!("connecting to orchestrator at {:?}", args.url); - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(args) - .await; + infra::main_entry_point::(args).await; } diff --git a/crates/examples/infra/mod.rs b/crates/examples/infra/mod.rs index e949cecf40..1bbf1e786c 100644 --- a/crates/examples/infra/mod.rs +++ b/crates/examples/infra/mod.rs @@ -5,13 +5,10 @@ use async_lock::RwLock; use async_trait::async_trait; use clap::Parser; use futures::StreamExt; -use hotshot::traits::implementations::{CombinedNetworks, CombinedNetworks}; +use hotshot::traits::implementations::{CombinedNetworks, UnderlyingCombinedNetworks}; use hotshot::{ traits::{ - implementations::{ - Libp2pNetwork, Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, WebServerNetwork, - WebServerNetwork, - }, + implementations::{Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, WebServerNetwork}, NodeImplementation, }, types::{SignatureKey, SystemContextHandle}, @@ -344,8 +341,8 @@ pub trait RunDA< config.config.da_committee_size.try_into().unwrap(), ); let networks_bundle = Networks { - quorum_network: quorum_network.clone(), - da_network: da_network.clone(), + quorum_network: quorum_network.clone().into(), + da_network: da_network.clone().into(), _pd: PhantomData, }; @@ -539,17 +536,12 @@ where underlying_quorum_network.wait_for_ready().await; - // create communication channels - let quorum_channel: WebServerNetwork = - WebServerNetwork::new(underlying_quorum_network.clone().into()); - - let da_channel: WebServerNetwork = WebServerNetwork::new( - WebServerNetwork::create(url.clone(), wait_between_polls, pub_key.clone(), true).into(), - ); + let da_channel: WebServerNetwork = + WebServerNetwork::create(url.clone(), wait_between_polls, pub_key.clone(), true); WebServerDARun { config, - quorum_channel, + quorum_channel: underlying_quorum_network, da_channel, } } @@ -574,9 +566,9 @@ pub struct Libp2pDARun { /// the network configuration config: NetworkConfig, /// quorum channel - quorum_channel: Libp2pNetwork, + quorum_channel: Libp2pNetwork, TYPES::SignatureKey>, /// data availability channel - da_channel: Libp2pNetwork, + da_channel: Libp2pNetwork, TYPES::SignatureKey>, } #[async_trait] @@ -589,11 +581,17 @@ impl< >, NODE: NodeImplementation< TYPES, - QuorumNetwork = Libp2pNetwork, - CommitteeNetwork = Libp2pNetwork, + QuorumNetwork = Libp2pNetwork, TYPES::SignatureKey>, + CommitteeNetwork = Libp2pNetwork, TYPES::SignatureKey>, Storage = MemoryStorage, >, - > RunDA, Libp2pNetwork, NODE> for Libp2pDARun + > + RunDA< + TYPES, + Libp2pNetwork, TYPES::SignatureKey>, + Libp2pNetwork, TYPES::SignatureKey>, + NODE, + > for Libp2pDARun where ::ValidatedState: TestableState, ::BlockPayload: TestableBlock, @@ -608,8 +606,8 @@ where // create and wait for underlying network let quorum_channel = libp2p_network_from_config::(config.clone(), pub_key).await; - let da_channel = underlying_quorum_network.clone().into(); - underlying_quorum_network.wait_for_ready().await; + let da_channel = quorum_channel.clone(); + quorum_channel.wait_for_ready().await; Libp2pDARun { config, @@ -618,11 +616,11 @@ where } } - fn get_da_channel(&self) -> Libp2pNetwork { + fn get_da_channel(&self) -> Libp2pNetwork, TYPES::SignatureKey> { self.da_channel.clone() } - fn get_quorum_channel(&self) -> Libp2pNetwork { + fn get_quorum_channel(&self) -> Libp2pNetwork, TYPES::SignatureKey> { self.quorum_channel.clone() } @@ -687,34 +685,22 @@ where }: WebServerConfig = config.clone().da_web_server_config.unwrap(); // create and wait for underlying webserver network - let webserver_underlying_quorum_network = + let web_quorum_network = webserver_network_from_config::(config.clone(), pub_key.clone()); - let webserver_underlying_da_network = - WebServerNetwork::create(url, wait_between_polls, pub_key, true); + let web_da_network = WebServerNetwork::create(url, wait_between_polls, pub_key, true); - webserver_underlying_quorum_network.wait_for_ready().await; + web_quorum_network.wait_for_ready().await; - // combine the two communication channels - let quorum_channel = CombinedNetworks::new(Arc::new(CombinedNetworks( - webserver_underlying_quorum_network.clone(), - libp2p_underlying_quorum_network.clone(), - ))); + // combine the two communication channel - let view_sync_channel = CombinedNetworks::new(Arc::new(CombinedNetworks( - webserver_underlying_quorum_network.clone(), + let da_channel = CombinedNetworks::new(Arc::new(UnderlyingCombinedNetworks( + web_da_network.clone(), libp2p_underlying_quorum_network.clone(), ))); - - let da_channel: CombinedNetworks = - CombinedNetworks::new(Arc::new(CombinedNetworks( - webserver_underlying_da_network, - libp2p_underlying_quorum_network.clone(), - ))); - - let vid_channel = CombinedNetworks::new(Arc::new(CombinedNetworks( - webserver_underlying_quorum_network, - libp2p_underlying_quorum_network, + let quorum_channel = CombinedNetworks::new(Arc::new(UnderlyingCombinedNetworks( + web_quorum_network.clone(), + libp2p_underlying_quorum_network.clone(), ))); CombinedDARun { @@ -755,7 +741,7 @@ pub async fn main_entry_point< CommitteeNetwork = DACHANNEL, Storage = MemoryStorage, >, - RUNDA: RunDA, + RUNDA: RunDA, >( args: ValidatorArgs, ) where diff --git a/crates/examples/libp2p/all.rs b/crates/examples/libp2p/all.rs index 5b67f667d7..ddc8b472b9 100644 --- a/crates/examples/libp2p/all.rs +++ b/crates/examples/libp2p/all.rs @@ -18,7 +18,7 @@ use tracing::instrument; use crate::{ infra::run_orchestrator, infra::{ConfigArgs, OrchestratorArgs}, - types::{DANetwork, NodeImpl, QuorumNetwork, VIDNetwork, ViewSyncNetwork}, + types::{DANetwork, NodeImpl, QuorumNetwork}, }; /// general infra used for this example @@ -44,8 +44,6 @@ async fn main() { TestTypes, DANetwork, QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, NodeImpl, >(OrchestratorArgs { url: orchestrator_url.clone(), @@ -61,19 +59,13 @@ async fn main() { for _ in 0..config.config.total_nodes.into() { let orchestrator_url = orchestrator_url.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs { - url: orchestrator_url, - public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - network_config_file: None, - }) + infra::main_entry_point::( + ValidatorArgs { + url: orchestrator_url, + public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + network_config_file: None, + }, + ) .await; }); nodes.push(node); diff --git a/crates/examples/libp2p/multi-validator.rs b/crates/examples/libp2p/multi-validator.rs index aec3325383..e085b498f6 100644 --- a/crates/examples/libp2p/multi-validator.rs +++ b/crates/examples/libp2p/multi-validator.rs @@ -7,9 +7,8 @@ use clap::Parser; use hotshot_example_types::state_types::TestTypes; use hotshot_orchestrator::client::{MultiValidatorArgs, ValidatorArgs}; use tracing::instrument; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; /// types used for this example pub mod types; @@ -34,15 +33,9 @@ async fn main() { let args = args.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs::from_multi_args(args, node_index)) + infra::main_entry_point::( + ValidatorArgs::from_multi_args(args, node_index), + ) .await; }); nodes.push(node); diff --git a/crates/examples/libp2p/orchestrator.rs b/crates/examples/libp2p/orchestrator.rs index c26fc73bba..42e23a39d6 100644 --- a/crates/examples/libp2p/orchestrator.rs +++ b/crates/examples/libp2p/orchestrator.rs @@ -10,7 +10,7 @@ use tracing::instrument; use crate::infra::run_orchestrator; use crate::infra::OrchestratorArgs; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, VIDNetwork, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork}; /// general infra used for this example #[path = "../infra/mod.rs"] @@ -27,8 +27,5 @@ async fn main() { setup_backtrace(); let args = OrchestratorArgs::parse(); - run_orchestrator::( - args, - ) - .await; + run_orchestrator::(args).await; } diff --git a/crates/examples/libp2p/types.rs b/crates/examples/libp2p/types.rs index af535db8d7..500581b9d6 100644 --- a/crates/examples/libp2p/types.rs +++ b/crates/examples/libp2p/types.rs @@ -1,7 +1,10 @@ use crate::infra::Libp2pDARun; -use hotshot::traits::implementations::{Libp2pCommChannel, MemoryStorage}; +use hotshot::traits::implementations::{Libp2pNetwork, MemoryStorage}; use hotshot_example_types::state_types::TestTypes; -use hotshot_types::traits::node_implementation::NodeImplementation; +use hotshot_types::{ + message::Message, + traits::node_implementation::{NodeImplementation, NodeType}, +}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -10,13 +13,9 @@ use std::fmt::Debug; pub struct NodeImpl {} /// convenience type alias -pub type DANetwork = Libp2pCommChannel; +pub type DANetwork = Libp2pNetwork, ::SignatureKey>; /// convenience type alias -pub type VIDNetwork = Libp2pCommChannel; -/// convenience type alias -pub type QuorumNetwork = Libp2pCommChannel; -/// convenience type alias -pub type ViewSyncNetwork = Libp2pCommChannel; +pub type QuorumNetwork = Libp2pNetwork, ::SignatureKey>; impl NodeImplementation for NodeImpl { type Storage = MemoryStorage; diff --git a/crates/examples/libp2p/validator.rs b/crates/examples/libp2p/validator.rs index 9873cac76e..cebcd44d04 100644 --- a/crates/examples/libp2p/validator.rs +++ b/crates/examples/libp2p/validator.rs @@ -3,9 +3,8 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::{info, instrument}; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; use hotshot_orchestrator::client::ValidatorArgs; @@ -27,14 +26,5 @@ async fn main() { setup_backtrace(); let args = ValidatorArgs::parse(); info!("connecting to orchestrator at {:?}", args.url); - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(args) - .await; + infra::main_entry_point::(args).await; } diff --git a/crates/examples/webserver/all.rs b/crates/examples/webserver/all.rs index 99de0d9f93..d05ec4492d 100644 --- a/crates/examples/webserver/all.rs +++ b/crates/examples/webserver/all.rs @@ -7,7 +7,7 @@ use crate::infra::{ConfigArgs, OrchestratorArgs}; use crate::types::ThisRun; use crate::{ infra::run_orchestrator, - types::{DANetwork, NodeImpl, QuorumNetwork, ViewSyncNetwork}, + types::{DANetwork, NodeImpl, QuorumNetwork}, }; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; @@ -24,7 +24,6 @@ use hotshot_orchestrator::config::NetworkConfig; use hotshot_types::traits::node_implementation::NodeType; use surf_disco::Url; use tracing::error; -use types::VIDNetwork; #[cfg_attr(async_executor_impl = "tokio", tokio::main)] #[cfg_attr(async_executor_impl = "async-std", async_std::main)] @@ -70,8 +69,6 @@ async fn main() { TestTypes, DANetwork, QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, NodeImpl, >(OrchestratorArgs { url: orchestrator_url.clone(), @@ -87,19 +84,13 @@ async fn main() { for _ in 0..(config.config.total_nodes.get()) { let orchestrator_url = orchestrator_url.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs { - url: orchestrator_url, - public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - network_config_file: None, - }) + infra::main_entry_point::( + ValidatorArgs { + url: orchestrator_url, + public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + network_config_file: None, + }, + ) .await; }); nodes.push(node); diff --git a/crates/examples/webserver/multi-validator.rs b/crates/examples/webserver/multi-validator.rs index b975f07b0f..61d81c79c5 100644 --- a/crates/examples/webserver/multi-validator.rs +++ b/crates/examples/webserver/multi-validator.rs @@ -7,9 +7,8 @@ use clap::Parser; use hotshot_example_types::state_types::TestTypes; use hotshot_orchestrator::client::{MultiValidatorArgs, ValidatorArgs}; use tracing::instrument; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; /// types used for this example pub mod types; @@ -34,15 +33,9 @@ async fn main() { let args = args.clone(); let node = async_spawn(async move { - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(ValidatorArgs::from_multi_args(args, node_index)) + infra::main_entry_point::( + ValidatorArgs::from_multi_args(args, node_index), + ) .await; }); nodes.push(node); diff --git a/crates/examples/webserver/orchestrator.rs b/crates/examples/webserver/orchestrator.rs index 62f2006f2e..49080dae8a 100644 --- a/crates/examples/webserver/orchestrator.rs +++ b/crates/examples/webserver/orchestrator.rs @@ -7,11 +7,10 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::instrument; -use types::VIDNetwork; use crate::infra::run_orchestrator; use crate::infra::OrchestratorArgs; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork}; /// general infra used for this example #[path = "../infra/mod.rs"] @@ -28,8 +27,5 @@ async fn main() { setup_backtrace(); let args = OrchestratorArgs::parse(); - run_orchestrator::( - args, - ) - .await; + run_orchestrator::(args).await; } diff --git a/crates/examples/webserver/types.rs b/crates/examples/webserver/types.rs index 0e67f7a742..95abc27be1 100644 --- a/crates/examples/webserver/types.rs +++ b/crates/examples/webserver/types.rs @@ -1,5 +1,5 @@ use crate::infra::WebServerDARun; -use hotshot::traits::implementations::{MemoryStorage, WebCommChannel}; +use hotshot::traits::implementations::{MemoryStorage, WebServerNetwork}; use hotshot_example_types::state_types::TestTypes; use hotshot_types::traits::node_implementation::NodeImplementation; use serde::{Deserialize, Serialize}; @@ -10,13 +10,13 @@ use std::fmt::Debug; pub struct NodeImpl {} /// convenience type alias -pub type DANetwork = WebCommChannel; +pub type DANetwork = WebServerNetwork; /// convenience type alias -pub type VIDNetwork = WebCommChannel; +pub type VIDNetwork = WebServerNetwork; /// convenience type alias -pub type QuorumNetwork = WebCommChannel; +pub type QuorumNetwork = WebServerNetwork; /// convenience type alias -pub type ViewSyncNetwork = WebCommChannel; +pub type ViewSyncNetwork = WebServerNetwork; impl NodeImplementation for NodeImpl { type Storage = MemoryStorage; diff --git a/crates/examples/webserver/validator.rs b/crates/examples/webserver/validator.rs index 96bcde1807..e335cae2be 100644 --- a/crates/examples/webserver/validator.rs +++ b/crates/examples/webserver/validator.rs @@ -3,9 +3,8 @@ use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use clap::Parser; use hotshot_example_types::state_types::TestTypes; use tracing::{info, instrument}; -use types::VIDNetwork; -use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun, ViewSyncNetwork}; +use crate::types::{DANetwork, NodeImpl, QuorumNetwork, ThisRun}; use hotshot_orchestrator::client::ValidatorArgs; @@ -27,14 +26,5 @@ async fn main() { setup_backtrace(); let args = ValidatorArgs::parse(); info!("connecting to orchestrator at {:?}", args.url); - infra::main_entry_point::< - TestTypes, - DANetwork, - QuorumNetwork, - ViewSyncNetwork, - VIDNetwork, - NodeImpl, - ThisRun, - >(args) - .await; + infra::main_entry_point::(args).await; } diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index b706e89df2..240c740f06 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -79,10 +79,10 @@ pub const H_256: usize = 32; /// Bundle of the networks used in consensus pub struct Networks> { /// Newtork for reaching all nodes - pub quorum_network: I::QuorumNetwork, + pub quorum_network: Arc, /// Network for reaching the DA committee - pub da_network: I::CommitteeNetwork, + pub da_network: Arc, /// Phantom for TYPES and I pub _pd: PhantomData<(TYPES, I)>, diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 40e50ea466..14ef94c48a 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -54,7 +54,7 @@ pub async fn add_network_message_task< >( task_reg: Arc, event_stream: Sender>, - channel: NET, + channel: Arc, ) { let net = channel.clone(); let network_state: NetworkMessageTaskState<_> = NetworkMessageTaskState { @@ -118,7 +118,7 @@ pub async fn add_network_event_task< task_reg: Arc, tx: Sender>, rx: Receiver>, - channel: NET, + channel: Arc, membership: TYPES::Membership, filter: fn(&HotShotEvent) -> bool, ) { @@ -177,8 +177,8 @@ pub async fn create_consensus_state>( consensus: handle.hotshot.get_consensus(), cur_view: TYPES::Time::new(0), vote_collector: None, - network: c_api.inner.networks.quorum_network.clone().into(), + network: c_api.inner.networks.quorum_network.clone(), membership: c_api.inner.memberships.vid_membership.clone().into(), public_key: c_api.public_key().clone(), private_key: c_api.private_key().clone(), @@ -271,7 +271,7 @@ pub async fn add_upgrade_task>( api: c_api.clone(), cur_view: TYPES::Time::new(0), quorum_membership: c_api.inner.memberships.quorum_membership.clone().into(), - quorum_network: c_api.inner.networks.quorum_network.clone().into(), + quorum_network: c_api.inner.networks.quorum_network.clone(), should_vote: |_upgrade_proposal| false, vote_collector: None.into(), public_key: c_api.public_key().clone(), @@ -297,7 +297,7 @@ pub async fn add_da_task>( api: c_api.clone(), consensus: handle.hotshot.get_consensus(), da_membership: c_api.inner.memberships.da_membership.clone().into(), - da_network: c_api.inner.networks.da_network.clone().into(), + da_network: c_api.inner.networks.da_network.clone(), quorum_membership: c_api.inner.memberships.quorum_membership.clone().into(), cur_view: TYPES::Time::new(0), vote_collector: None.into(), @@ -327,7 +327,7 @@ pub async fn add_transaction_task> transactions: Arc::default(), seen_transactions: HashSet::new(), cur_view: TYPES::Time::new(0), - network: c_api.inner.networks.quorum_network.clone().into(), + network: c_api.inner.networks.quorum_network.clone(), membership: c_api.inner.memberships.quorum_membership.clone().into(), public_key: c_api.public_key().clone(), private_key: c_api.private_key().clone(), @@ -351,7 +351,7 @@ pub async fn add_view_sync_task>( let view_sync_state = ViewSyncTaskState { current_view: TYPES::Time::new(0), next_view: TYPES::Time::new(0), - network: api.inner.networks.quorum_network.clone().into(), + network: api.inner.networks.quorum_network.clone(), membership: api.inner.memberships.view_sync_membership.clone().into(), public_key: api.public_key().clone(), private_key: api.private_key().clone(), diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index 1671c5c145..48a2669493 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -13,10 +13,12 @@ pub use storage::{Result as StorageResult, Storage}; pub mod implementations { pub use super::{ networking::{ - combined_network::{calculate_hash_of, Cache, CombinedNetworks}, + combined_network::{ + calculate_hash_of, Cache, CombinedNetworks, UnderlyingCombinedNetworks, + }, libp2p_network::{Libp2pNetwork, PeerInfoVec}, - memory_network::{MasterMap, MemoryCommChannel, MemoryNetwork}, - web_server_network::{WebCommChannel, WebServerNetwork}, + memory_network::{MasterMap, MemoryNetwork}, + web_server_network::WebServerNetwork, NetworkingMetricsValue, }, storage::memory_storage::MemoryStorage, // atomic_storage::AtomicStorage, diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index df70f77be9..7f1f84ee18 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -454,17 +454,3 @@ impl ConnectedNetwork for Memory boxed_sync(closure) } } - -/// memory identity communication channel -#[derive(Clone, Debug)] -pub struct MemoryCommChannel( - Arc, TYPES::SignatureKey>>, -); - -impl MemoryCommChannel { - /// create new communication channel - #[must_use] - pub fn new(network: Arc, TYPES::SignatureKey>>) -> Self { - Self(network) - } -} diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index c3efb8f9ac..0ddb46c2e3 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -51,18 +51,6 @@ use tracing::{debug, error, info, warn}; /// convenience alias alias for the result of getting transactions from the web server pub type TxnResult = Result>)>, ClientError>; -/// Represents the communication channel abstraction for the web server -#[derive(Clone, Debug)] -pub struct WebCommChannel(Arc>); - -impl WebCommChannel { - /// Create new communication channel - #[must_use] - pub fn new(network: Arc>) -> Self { - Self(network) - } -} - /// # Note /// /// This function uses `DefaultHasher` instead of cryptographic hash functions like SHA-256 because of an `AsRef` requirement. diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index b38839085b..17299d554a 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ events::{HotShotEvent, HotShotTaskCompleted}, helpers::broadcast_event, @@ -186,7 +188,7 @@ pub struct NetworkEventTaskState< COMMCHANNEL: ConnectedNetwork, TYPES::SignatureKey>, > { /// comm channel - pub channel: COMMCHANNEL, + pub channel: Arc, /// view number pub view: TYPES::Time, /// membership for the channel diff --git a/crates/testing/src/test_launcher.rs b/crates/testing/src/test_launcher.rs index d0616d8e2d..d6196b86d1 100644 --- a/crates/testing/src/test_launcher.rs +++ b/crates/testing/src/test_launcher.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use hotshot::traits::{NodeImplementation, TestableNodeImplementation}; use hotshot_types::{ + message::Message, traits::{network::ConnectedNetwork, node_implementation::NodeType}, HotShotConfig, }; @@ -10,8 +11,8 @@ use super::{test_builder::TestMetadata, test_runner::TestRunner}; /// convience type alias for the networks available pub type Networks = ( - >::QuorumNetwork, - >::QuorumNetwork, + Arc<>::QuorumNetwork>, + Arc<>::QuorumNetwork>, ); /// Wrapper for a function that takes a `node_id` and returns an instance of `T`. diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 19015f0129..cd35f8b222 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -20,7 +20,6 @@ use hotshot::{traits::TestableNodeImplementation, HotShotInitializer, SystemCont use hotshot_constants::EVENT_CHANNEL_SIZE; use hotshot_task::task::{Task, TaskRegistry, TestTask}; -use hotshot_types::traits::{network::ConnectedNetwork, node_implementation::NodeImplementation}; use hotshot_types::{ consensus::ConsensusMetricsValue, traits::{ @@ -29,6 +28,10 @@ use hotshot_types::{ }, HotShotConfig, ValidatorConfig, }; +use hotshot_types::{ + message::Message, + traits::{network::ConnectedNetwork, node_implementation::NodeImplementation}, +}; use std::{ collections::{BTreeMap, HashMap, HashSet}, marker::PhantomData, diff --git a/crates/testing/tests/da_task.rs b/crates/testing/tests/da_task.rs index 33b40ff0c7..4510cd3797 100644 --- a/crates/testing/tests/da_task.rs +++ b/crates/testing/tests/da_task.rs @@ -98,7 +98,7 @@ async fn test_da_task() { api: api.clone(), consensus: handle.hotshot.get_consensus(), da_membership: api.inner.memberships.da_membership.clone().into(), - da_network: api.inner.networks.da_network.clone().into(), + da_network: api.inner.networks.da_network.clone(), quorum_membership: api.inner.memberships.quorum_membership.clone().into(), cur_view: ViewNumber::new(0), vote_collector: None.into(), diff --git a/crates/testing/tests/memory_network.rs b/crates/testing/tests/memory_network.rs index ffe5d9d0ee..49c9a4081e 100644 --- a/crates/testing/tests/memory_network.rs +++ b/crates/testing/tests/memory_network.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use async_compatibility_layer::logging::setup_logging; use hotshot::traits::election::static_committee::{GeneralStaticCommittee, StaticElectionConfig}; use hotshot::traits::implementations::{ - MasterMap, MemoryCommChannel, MemoryNetwork, MemoryStorage, NetworkingMetricsValue, + MasterMap, MemoryNetwork, MemoryStorage, NetworkingMetricsValue, }; use hotshot::traits::NodeImplementation; use hotshot::types::SignatureKey; @@ -61,10 +61,10 @@ impl NodeType for Test { pub struct TestImpl {} pub type ThisMembership = GeneralStaticCommittee::SignatureKey>; -pub type DANetwork = MemoryCommChannel; -pub type QuorumNetwork = MemoryCommChannel; -pub type ViewSyncNetwork = MemoryCommChannel; -pub type VIDNetwork = MemoryCommChannel; +pub type DANetwork = MemoryNetwork, ::SignatureKey>; +pub type QuorumNetwork = MemoryNetwork, ::SignatureKey>; +pub type ViewSyncNetwork = MemoryNetwork, ::SignatureKey>; +pub type VIDNetwork = MemoryNetwork, ::SignatureKey>; impl NodeImplementation for TestImpl { type Storage = MemoryStorage; diff --git a/crates/testing/tests/vid_task.rs b/crates/testing/tests/vid_task.rs index eadfaf4e84..6ca6366abe 100644 --- a/crates/testing/tests/vid_task.rs +++ b/crates/testing/tests/vid_task.rs @@ -105,7 +105,7 @@ async fn test_vid_task() { consensus: handle.hotshot.get_consensus(), cur_view: ViewNumber::new(0), vote_collector: None, - network: api.inner.networks.quorum_network.clone().into(), + network: api.inner.networks.quorum_network.clone(), membership: api.inner.memberships.vid_membership.clone().into(), public_key: *api.public_key(), private_key: api.private_key().clone(), diff --git a/crates/testing/tests/view_sync_task.rs b/crates/testing/tests/view_sync_task.rs index fd75e5944f..2fa93f6972 100644 --- a/crates/testing/tests/view_sync_task.rs +++ b/crates/testing/tests/view_sync_task.rs @@ -55,7 +55,7 @@ async fn test_view_sync_task() { let view_sync_state = ViewSyncTaskState { current_view: ViewNumber::new(0), next_view: ViewNumber::new(0), - network: api.inner.networks.quorum_network.clone().into(), + network: api.inner.networks.quorum_network.clone(), membership: api.inner.memberships.view_sync_membership.clone().into(), public_key: *api.public_key(), private_key: api.private_key().clone(), diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index 2421e1e329..d3e84da94d 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -14,7 +14,7 @@ compile_error! {"Either config option \"async-std\" or \"tokio\" must be enabled use super::{node_implementation::NodeType, signature_key::SignatureKey}; use crate::{ data::ViewNumber, - message::{Message, MessagePurpose}, + message::MessagePurpose, BoxSyncFuture, }; use async_compatibility_layer::channel::UnboundedSendError; @@ -226,76 +226,6 @@ pub trait ViewMessage { fn purpose(&self) -> MessagePurpose; } -/// API for interacting directly with a consensus committee -/// intended to be implemented for both DA and for validating consensus committees -#[async_trait] -pub trait CommunicationChannel: Clone + Debug + Send + Sync + 'static { - /// Underlying Network implementation's type - type NETWORK; - /// Blocks until node is successfully initialized - /// into the network - async fn wait_for_ready(&self); - - /// Pauses the underlying network - fn pause(&self); - - /// Resumes the underlying network - fn resume(&self); - - /// checks if the network is ready - /// nonblocking - async fn is_ready(&self) -> bool; - - /// Shut down this network. Afterwards this network should no longer be used. - /// - /// This should also cause other functions to immediately return with a [`NetworkError`] - fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()> - where - 'a: 'b, - Self: 'b; - - /// broadcast message to those listening on the communication channel - /// blocking - async fn broadcast_message( - &self, - message: Message, - election: &TYPES::Membership, - ) -> Result<(), NetworkError>; - - /// Sends a direct message to a specific node - /// blocking - async fn direct_message( - &self, - message: Message, - recipient: TYPES::SignatureKey, - ) -> Result<(), NetworkError>; - - /// Moves out the entire queue of received messages of 'transmit_type` - /// - /// Will unwrap the underlying `NetworkMessage` - /// blocking - fn recv_msgs<'a, 'b>( - &'a self, - transmit_type: TransmitType, - ) -> BoxSyncFuture<'b, Result>, NetworkError>> - where - 'a: 'b, - Self: 'b; - - /// queues looking up a node - async fn queue_node_lookup( - &self, - _view_number: ViewNumber, - _pk: TYPES::SignatureKey, - ) -> Result<(), UnboundedSendError>> { - Ok(()) - } - - /// Injects consensus data such as view number into the networking implementation - /// blocking - async fn inject_consensus_info(&self, _event: ConsensusIntentEvent) {} -} - /// represents a networking implmentration /// exposes low level API for interacting with a network /// intended to be implemented for libp2p, the centralized server, @@ -380,13 +310,6 @@ pub trait TestableNetworkingImplementation { /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`. fn in_flight_message_count(&self) -> Option; } -/// Describes additional functionality needed by the test communication channel -pub trait TestableChannelImplementation: CommunicationChannel { - /// generates the `CommunicationChannel` given it's associated network type - #[allow(clippy::type_complexity)] - fn generate_network( - ) -> Box>::NETWORK>) -> Self + 'static>; -} /// Changes that can occur in the network #[derive(Debug)] From f0051f0011d840bc316b6f45be757f8b86db2427 Mon Sep 17 00:00:00 2001 From: = Date: Tue, 13 Feb 2024 19:09:09 -0500 Subject: [PATCH 07/11] wip gen both networks at once --- .../src/traits/networking/combined_network.rs | 7 +- .../src/traits/networking/libp2p_network.rs | 7 +- .../src/traits/networking/memory_network.rs | 7 +- .../traits/networking/web_server_network.rs | 120 +++++++++++------- crates/types/src/traits/network.rs | 14 +- .../types/src/traits/node_implementation.rs | 23 +--- 6 files changed, 98 insertions(+), 80 deletions(-) diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 82e00e4776..ab86560181 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -159,7 +159,7 @@ impl TestableNetworkingImplementation for CombinedNetwor da_committee_size: usize, is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static> { + ) -> Box (Arc, Arc) + 'static> { let generators = ( TestableNetworkingImplementation for CombinedNetwor ); Box::new(move |node_id| { let networks = UnderlyingCombinedNetworks(generators.0(node_id), generators.1(node_id)); - Self { + let net = Self { networks: Arc::new(networks), message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), primary_down: Arc::new(AtomicU64::new(0)), - } + }; + (net.clone().into(), net.into()) }) } diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index d2bd9e45b8..d7111c0cea 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -164,7 +164,7 @@ where da_committee_size: usize, _is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static> { + ) -> Box (Arc, Arc) + 'static> { assert!( da_committee_size <= expected_node_count, "DA committee size must be less than or equal to total # nodes" @@ -250,7 +250,7 @@ where let keys = all_keys.clone(); let da = da_keys.clone(); let reliability_config_dup = reliability_config.clone(); - async_block_on(async move { + let net = Arc::new(async_block_on(async move { match Libp2pNetwork::new( NetworkingMetricsValue::default(), config, @@ -271,7 +271,8 @@ where panic!("Failed to create libp2p network: {err:?}"); } } - }) + })); + (net.clone(), net) } }) } diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 7f1f84ee18..10f8803f6a 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -247,18 +247,19 @@ impl TestableNetworkingImplementation _da_committee_size: usize, _is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static> { + ) -> Box (Arc, Arc) + 'static> { let master: Arc<_> = MasterMap::new(); // We assign known_nodes' public key and stake value rather than read from config file since it's a test Box::new(move |node_id| { let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1; let pubkey = TYPES::SignatureKey::from_private(&privkey); - MemoryNetwork::new( + let net = MemoryNetwork::new( pubkey, NetworkingMetricsValue::default(), master.clone(), reliability_config.clone(), - ) + ); + (net.clone().into(), net.into()) }) } diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 0ddb46c2e3..8088c2c533 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -696,6 +696,58 @@ impl WebServerNetwork { }; Ok(network_msg) } + fn single_generator( + expected_node_count: usize, + _num_bootstrap: usize, + _network_id: usize, + _da_committee_size: usize, + is_da: bool, + _reliability_config: Option>, + ) -> Box Self + 'static> { + let (server_shutdown_sender, server_shutdown) = oneshot(); + let sender = Arc::new(server_shutdown_sender); + + // pick random, unused port + let port = portpicker::pick_unused_port().expect("Could not find an open port"); + + let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); + info!("Launching web server on port {port}"); + // Start web server + async_spawn(async { + match hotshot_web_server::run_web_server::( + Some(server_shutdown), + url, + ) + .await + { + Ok(()) => error!("Web server future finished unexpectedly"), + Err(e) => error!("Web server task failed: {e}"), + } + }); + + // We assign known_nodes' public key and stake value rather than read from config file since it's a test + let known_nodes = (0..expected_node_count as u64) + .map(|id| { + TYPES::SignatureKey::from_private( + &TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], id).1, + ) + }) + .collect::>(); + + // Start each node's web server client + Box::new(move |id| { + let sender = Arc::clone(&sender); + let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); + let mut network = WebServerNetwork::create( + url, + Duration::from_millis(100), + known_nodes[usize::try_from(id).unwrap()].clone(), + is_da, + ); + network.server_shutdown_signal = Some(sender); + network + }) + } } #[async_trait] @@ -1225,54 +1277,32 @@ impl ConnectedNetwork, TYPES::Signatur impl TestableNetworkingImplementation for WebServerNetwork { fn generator( expected_node_count: usize, - _num_bootstrap: usize, - _network_id: usize, - _da_committee_size: usize, + num_bootstrap: usize, + network_id: usize, + da_committee_size: usize, is_da: bool, - _reliability_config: Option>, - ) -> Box Self + 'static> { - let (server_shutdown_sender, server_shutdown) = oneshot(); - let sender = Arc::new(server_shutdown_sender); - - // pick random, unused port - let port = portpicker::pick_unused_port().expect("Could not find an open port"); - - let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); - info!("Launching web server on port {port}"); - // Start web server - async_spawn(async { - match hotshot_web_server::run_web_server::( - Some(server_shutdown), - url, - ) - .await - { - Ok(()) => error!("Web server future finished unexpectedly"), - Err(e) => error!("Web server task failed: {e}"), - } - }); - - // We assign known_nodes' public key and stake value rather than read from config file since it's a test - let known_nodes = (0..expected_node_count as u64) - .map(|id| { - TYPES::SignatureKey::from_private( - &TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], id).1, - ) - }) - .collect::>(); - + reliability_config: Option>, + ) -> Box (Arc, Arc) + 'static> { // Start each node's web server client Box::new(move |id| { - let sender = Arc::clone(&sender); - let url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap(); - let mut network = WebServerNetwork::create( - url, - Duration::from_millis(100), - known_nodes[usize::try_from(id).unwrap()].clone(), - is_da, - ); - network.server_shutdown_signal = Some(sender); - network + ( + Self::single_generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + is_da, + reliability_config, + ), + Self::single_generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + is_da, + reliability_config, + ), + ) }) } diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index d3e84da94d..d659fce2ba 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -12,11 +12,7 @@ use tokio::time::error::Elapsed as TimeoutError; #[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))] compile_error! {"Either config option \"async-std\" or \"tokio\" must be enabled for this crate."} use super::{node_implementation::NodeType, signature_key::SignatureKey}; -use crate::{ - data::ViewNumber, - message::MessagePurpose, - BoxSyncFuture, -}; +use crate::{data::ViewNumber, message::MessagePurpose, BoxSyncFuture}; use async_compatibility_layer::channel::UnboundedSendError; use async_trait::async_trait; use rand::{ @@ -294,8 +290,12 @@ pub trait ConnectedNetwork: } /// Describes additional functionality needed by the test network implementation -pub trait TestableNetworkingImplementation { +pub trait TestableNetworkingImplementation +where + Self: Sized, +{ /// generates a network given an expected node count + #[allow(clippy::type_complexity)] fn generator( expected_node_count: usize, num_bootstrap: usize, @@ -303,7 +303,7 @@ pub trait TestableNetworkingImplementation { da_committee_size: usize, is_da: bool, reliability_config: Option>, - ) -> Box Self + 'static>; + ) -> Box (Arc, Arc) + 'static>; /// Get the number of messages in-flight. /// diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index 02ddf7f5fe..f3199a2bd6 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -156,29 +156,14 @@ where da_committee_size: usize, reliability_config: Option>, ) -> Box (Arc, Arc)> { - let quorum_generator = - >::generator( - expected_node_count, - num_bootstrap, - 0, - da_committee_size, - false, - reliability_config.clone(), - ); - let da_generator = >::generator( + >::generator( expected_node_count, num_bootstrap, - 1, + 0, da_committee_size, false, - reliability_config, - ); - - Box::new(move |id| { - let quorum = Arc::new(quorum_generator(id)); - let da = Arc::new(da_generator(id)); - (quorum, da) - }) + reliability_config.clone(), + ) } } From 89dde5d717810e73d7d3e1a53b7bc09bc67a0414 Mon Sep 17 00:00:00 2001 From: = Date: Wed, 14 Feb 2024 03:20:11 -0500 Subject: [PATCH 08/11] gen both networks at once --- .../src/traits/networking/combined_network.rs | 22 ++++++++-- .../traits/networking/web_server_network.rs | 43 +++++++++++-------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index ab86560181..2eb47a7b2f 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -181,13 +181,27 @@ impl TestableNetworkingImplementation for CombinedNetwor ) ); Box::new(move |node_id| { - let networks = UnderlyingCombinedNetworks(generators.0(node_id), generators.1(node_id)); - let net = Self { - networks: Arc::new(networks), + let (quorum_web, da_web) = generators.0(node_id); + let (quorum_p2p, da_p2p) = generators.1(node_id); + let da_networks = UnderlyingCombinedNetworks( + Arc::into_inner(da_web).unwrap(), + Arc::into_inner(da_p2p).unwrap(), + ); + let quorum_networks = UnderlyingCombinedNetworks( + Arc::into_inner(quorum_web).unwrap(), + Arc::into_inner(quorum_p2p).unwrap(), + ); + let quorum_net = Self { + networks: Arc::new(quorum_networks), message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), primary_down: Arc::new(AtomicU64::new(0)), }; - (net.clone().into(), net.into()) + let da_net = Self { + networks: Arc::new(da_networks), + message_cache: Arc::new(RwLock::new(Cache::new(COMBINED_NETWORK_CACHE_SIZE))), + primary_down: Arc::new(AtomicU64::new(0)), + }; + (quorum_net.into(), da_net.into()) }) } diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 8088c2c533..13de1972a1 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -696,13 +696,16 @@ impl WebServerNetwork { }; Ok(network_msg) } + + /// Generates a single webserver network, for use in tests + #[cfg(feature = "hotshot-testing")] fn single_generator( expected_node_count: usize, _num_bootstrap: usize, _network_id: usize, _da_committee_size: usize, is_da: bool, - _reliability_config: Option>, + _reliability_config: &Option>, ) -> Box Self + 'static> { let (server_shutdown_sender, server_shutdown) = oneshot(); let sender = Arc::new(server_shutdown_sender); @@ -1280,28 +1283,32 @@ impl TestableNetworkingImplementation for WebServerNetwo num_bootstrap: usize, network_id: usize, da_committee_size: usize, - is_da: bool, + _is_da: bool, reliability_config: Option>, ) -> Box (Arc, Arc) + 'static> { + let da_gen = Self::single_generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + true, + &reliability_config, + ); + let quorum_gen = Self::single_generator( + expected_node_count, + num_bootstrap, + network_id, + da_committee_size, + false, + &reliability_config, + ); // Start each node's web server client Box::new(move |id| { ( - Self::single_generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config, - ), - Self::single_generator( - expected_node_count, - num_bootstrap, - network_id, - da_committee_size, - is_da, - reliability_config, - ), + quorum_gen(id) + .into(), + da_gen(id) + .into(), ) }) } From b4a3e0d0f27c792c127b4cad48242cce3b668f52 Mon Sep 17 00:00:00 2001 From: = Date: Wed, 14 Feb 2024 03:52:06 -0500 Subject: [PATCH 09/11] fix lint error --- crates/hotshot/src/traits/networking/web_server_network.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 48cac7d31d..861e55d3cd 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -698,7 +698,6 @@ impl WebServerNetwork { } /// Generates a single webserver network, for use in tests - #[cfg(feature = "hotshot-testing")] fn single_generator( expected_node_count: usize, _num_bootstrap: usize, From f6aeed6e5f6b51b81ef4a0450a11849e8455febb Mon Sep 17 00:00:00 2001 From: = Date: Wed, 14 Feb 2024 06:10:59 -0500 Subject: [PATCH 10/11] fix combined test --- crates/hotshot/src/traits/networking/combined_network.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 2eb47a7b2f..3ecb6b99f5 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -184,12 +184,12 @@ impl TestableNetworkingImplementation for CombinedNetwor let (quorum_web, da_web) = generators.0(node_id); let (quorum_p2p, da_p2p) = generators.1(node_id); let da_networks = UnderlyingCombinedNetworks( - Arc::into_inner(da_web).unwrap(), - Arc::into_inner(da_p2p).unwrap(), + Arc::>::into_inner(da_web).unwrap(), + Arc::, TYPES::SignatureKey>>::unwrap_or_clone(da_p2p), ); let quorum_networks = UnderlyingCombinedNetworks( - Arc::into_inner(quorum_web).unwrap(), - Arc::into_inner(quorum_p2p).unwrap(), + Arc::>::into_inner(quorum_web).unwrap(), + Arc::, TYPES::SignatureKey>>::unwrap_or_clone(quorum_p2p), ); let quorum_net = Self { networks: Arc::new(quorum_networks), From 7ba6b49ad32f5b9c0d7b3e8d623d2e2a70bbbf1b Mon Sep 17 00:00:00 2001 From: = Date: Wed, 14 Feb 2024 06:13:18 -0500 Subject: [PATCH 11/11] lint --- crates/hotshot/src/traits/networking/combined_network.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 3ecb6b99f5..d401ba15e8 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -189,7 +189,9 @@ impl TestableNetworkingImplementation for CombinedNetwor ); let quorum_networks = UnderlyingCombinedNetworks( Arc::>::into_inner(quorum_web).unwrap(), - Arc::, TYPES::SignatureKey>>::unwrap_or_clone(quorum_p2p), + Arc::, TYPES::SignatureKey>>::unwrap_or_clone( + quorum_p2p, + ), ); let quorum_net = Self { networks: Arc::new(quorum_networks),