diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index cefe1ecd4a..5ef1926478 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -28,7 +28,7 @@ use hotshot_types::{ use hotshot_types::{ message::Messages, traits::{ - network::{ConsensusIntentEvent, TransmitType}, + network::ConsensusIntentEvent, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, }, }; @@ -63,12 +63,12 @@ pub async fn add_network_message_task< // https://github.com/EspressoSystems/HotShot/issues/2377 let network = net.clone(); let mut state = network_state.clone(); - let broadcast_handle = async_spawn(async move { + let handle = async_spawn(async move { loop { - let msgs = match network.recv_msgs(TransmitType::Broadcast).await { + let msgs = match network.recv_msgs().await { Ok(msgs) => Messages(msgs), Err(err) => { - error!("failed to receive broadcast messages: {err}"); + error!("failed to receive messages: {err}"); // return zero messages so we sleep and try again Messages(vec![]) @@ -82,29 +82,7 @@ pub async fn add_network_message_task< } } }); - let network = net.clone(); - let mut state = network_state.clone(); - let direct_handle = async_spawn(async move { - loop { - let msgs = match network.recv_msgs(TransmitType::Direct).await { - Ok(msgs) => Messages(msgs), - Err(err) => { - error!("failed to receive direct messages: {err}"); - - // return zero messages so we sleep and try again - Messages(vec![]) - } - }; - if msgs.0.is_empty() { - // TODO: Stop sleeping here: https://github.com/EspressoSystems/HotShot/issues/2558 - async_sleep(Duration::from_millis(100)).await; - } else { - state.handle_messages(msgs.0).await; - } - } - }); - task_reg.register(direct_handle).await; - task_reg.register(broadcast_handle).await; + task_reg.register(handle).await; } /// Add the network task to handle events and send messages. pub async fn add_network_event_task< diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index f372f1c111..61482f2bd9 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -26,10 +26,8 @@ pub struct NetworkingMetricsValue { #[allow(dead_code)] /// A [`Gauge`] which tracks how many peers are connected pub connected_peers: Box, - /// A [`Counter`] which tracks how many messages have been received directly - pub incoming_direct_message_count: Box, - /// A [`Counter`] which tracks how many messages have been received by broadcast - pub incoming_broadcast_message_count: Box, + /// A [`Counter`] which tracks how many messages have been received + pub incoming_message_count: Box, /// A [`Counter`] which tracks how many messages have been send directly pub outgoing_direct_message_count: Box, /// A [`Counter`] which tracks how many messages have been send by broadcast @@ -163,10 +161,8 @@ impl NetworkingMetricsValue { pub fn new(metrics: &dyn Metrics) -> Self { Self { connected_peers: metrics.create_gauge(String::from("connected_peers"), None), - incoming_direct_message_count: metrics - .create_counter(String::from("incoming_direct_message_count"), None), - incoming_broadcast_message_count: metrics - .create_counter(String::from("incoming_broadcast_message_count"), None), + incoming_message_count: metrics + .create_counter(String::from("incoming_message_count"), None), outgoing_direct_message_count: metrics .create_counter(String::from("outgoing_direct_message_count"), None), outgoing_broadcast_message_count: metrics diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index bb064e6b34..c064c635ab 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -26,7 +26,7 @@ use hotshot_types::{ data::ViewNumber, message::Message, traits::{ - network::{ConnectedNetwork, ConsensusIntentEvent, TransmitType}, + network::{ConnectedNetwork, ConsensusIntentEvent}, node_implementation::NodeType, }, BoxSyncFuture, @@ -319,7 +319,6 @@ impl ConnectedNetwork, TYPES::SignatureKey> fn recv_msgs<'a, 'b>( &'a self, - transmit_type: TransmitType, ) -> BoxSyncFuture<'b, Result>, NetworkError>> where 'a: 'b, @@ -328,8 +327,8 @@ impl ConnectedNetwork, TYPES::SignatureKey> // recv on both networks because nodes may be accessible only on either. discard duplicates // TODO: improve this algorithm: https://github.com/EspressoSystems/HotShot/issues/2089 let closure = async move { - let mut primary_msgs = self.primary().recv_msgs(transmit_type).await?; - let mut secondary_msgs = self.secondary().recv_msgs(transmit_type).await?; + let mut primary_msgs = self.primary().recv_msgs().await?; + let mut secondary_msgs = self.secondary().recv_msgs().await?; primary_msgs.append(secondary_msgs.as_mut()); diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 03c570de70..aaac1fb152 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -22,7 +22,7 @@ use hotshot_types::{ traits::{ network::{ ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, NetworkError, - NetworkMsg, TransmitType, ViewMessage, + NetworkMsg, ViewMessage, }, node_implementation::{ConsensusTime, NodeType}, signature_key::SignatureKey, @@ -97,13 +97,9 @@ struct Libp2pNetworkInner { /// handle to control the network handle: Arc>, /// map of known replica peer ids to public keys - broadcast_recv: UnboundedReceiver, + receiver: UnboundedReceiver, /// Sender for broadcast messages - broadcast_send: UnboundedSender, - /// Sender for direct messages (only used for sending messages back to oneself) - direct_send: UnboundedSender, - /// Receiver for direct messages - direct_recv: UnboundedReceiver, + sender: UnboundedSender, /// Sender for node lookup (relevant view number, key of node) (None for shutdown) node_lookup_send: UnboundedSender>, /// this is really cheating to enable local tests @@ -351,18 +347,15 @@ impl Libp2pNetwork { // unbounded channels may not be the best choice (spammed?) // if bounded figure out a way to log dropped msgs - let (direct_send, direct_recv) = unbounded(); - let (broadcast_send, broadcast_recv) = unbounded(); + let (sender, receiver) = unbounded(); let (node_lookup_send, node_lookup_recv) = unbounded(); let mut result = Libp2pNetwork { inner: Arc::new(Libp2pNetworkInner { handle: network_handle, - broadcast_recv, - direct_send: direct_send.clone(), - direct_recv, + receiver, + sender: sender.clone(), pk, - broadcast_send: broadcast_send.clone(), bootstrap_addrs_len, bootstrap_addrs, is_ready: Arc::new(AtomicBool::new(false)), @@ -383,7 +376,7 @@ impl Libp2pNetwork { }), }; - result.handle_event_generator(direct_send, broadcast_send); + result.handle_event_generator(sender); result.spawn_node_lookup(node_lookup_recv); result.spawn_connect(id); @@ -514,14 +507,13 @@ impl Libp2pNetwork { async fn handle_recvd_events_0_1( &self, msg: NetworkEvent, - direct_send: &UnboundedSender, - broadcast_send: &UnboundedSender, + sender: &UnboundedSender, ) -> Result<(), NetworkError> { match msg { GossipMsg(msg, _) => { let result: Result = bincode_opts().deserialize(&msg); if let Ok(result) = result { - broadcast_send + sender .send(result) .await .map_err(|_| NetworkError::ChannelSend)?; @@ -532,7 +524,7 @@ impl Libp2pNetwork { .deserialize(&msg) .context(FailedToSerializeSnafu); if let Ok(result) = result { - direct_send + sender .send(result) .await .map_err(|_| NetworkError::ChannelSend)?; @@ -568,8 +560,7 @@ impl Libp2pNetwork { /// terminates on shut down of network fn handle_event_generator( &self, - direct_send: UnboundedSender, - broadcast_send: UnboundedSender, + sender: UnboundedSender, ) { let handle = self.clone(); let is_bootstrapped = self.inner.is_bootstrapped.clone(); @@ -584,7 +575,7 @@ impl Libp2pNetwork { match message_version { Some(VERSION_0_1) => { let _ = handle - .handle_recvd_events_0_1(message, &direct_send, &broadcast_send) + .handle_recvd_events_0_1(message, &sender) .await; } Some(version) => { @@ -675,7 +666,7 @@ impl ConnectedNetwork for Libp2p if recipients.contains(&self.inner.pk) { // send to self self.inner - .broadcast_send + .sender .send(message.clone()) .await .map_err(|_| NetworkError::ShutDown)?; @@ -763,7 +754,7 @@ impl ConnectedNetwork for Libp2p if recipient == self.inner.pk { // panic if we already shut down? self.inner - .direct_send + .sender .send(message) .await .map_err(|_x| NetworkError::ShutDown)?; @@ -830,7 +821,6 @@ impl ConnectedNetwork for Libp2p #[instrument(name = "Libp2pNetwork::recv_msgs", skip_all)] fn recv_msgs<'a, 'b>( &'a self, - transmit_type: TransmitType, ) -> BoxSyncFuture<'b, Result, NetworkError>> where 'a: 'b, @@ -840,40 +830,17 @@ impl ConnectedNetwork for Libp2p if self.inner.handle.is_killed() { Err(NetworkError::ShutDown) } else { - match transmit_type { - TransmitType::Direct => { - let result = self - .inner - .direct_recv - .drain_at_least_one() - .await - .map_err(|_x| NetworkError::ShutDown)?; - self.inner - .metrics - .incoming_direct_message_count - .add(result.len()); - Ok(result) - } - TransmitType::Broadcast => { - let result = self - .inner - .broadcast_recv - .drain_at_least_one() - .await - .map_err(|_x| NetworkError::ShutDown)?; - self.inner - .metrics - .incoming_direct_message_count - .add(result.len()); - Ok(result) - } - TransmitType::DACommitteeBroadcast => { - error!("Received DACommitteeBroadcast, it should have not happened."); - Err(NetworkError::Libp2p { - source: NetworkNodeHandleError::Killed, - }) - } - } + let result = self + .inner + .receiver + .drain_at_least_one() + .await + .map_err(|_x| NetworkError::ShutDown)?; + self.inner + .metrics + .incoming_message_count + .add(result.len()); + Ok(result) } }; boxed_sync(closure) diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 85371b584d..f7aaf04392 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -6,19 +6,18 @@ use super::{FailedToSerializeSnafu, NetworkError, NetworkReliability, NetworkingMetricsValue}; use async_compatibility_layer::{ art::async_spawn, - channel::{bounded, Receiver, SendError, Sender}, + channel::{bounded, Receiver, SendError, Sender, BoundedStream}, }; use async_lock::{Mutex, RwLock}; use async_trait::async_trait; use bincode::Options; use dashmap::DashMap; -use futures::StreamExt; -use hotshot_types::traits::network::MemoryNetworkError; +use futures::{StreamExt}; use hotshot_types::{ boxed_sync, message::Message, traits::{ - network::{ConnectedNetwork, NetworkMsg, TestableNetworkingImplementation, TransmitType}, + network::{ConnectedNetwork, NetworkMsg, TestableNetworkingImplementation}, node_implementation::NodeType, signature_key::SignatureKey, }, @@ -61,25 +60,13 @@ impl MasterMap { } } -/// Internal enum for combining streams -enum Combo { - /// Direct message - Direct(T), - /// Broadcast message - Broadcast(T), -} - /// Internal state for a `MemoryNetwork` instance #[derive(Debug)] struct MemoryNetworkInner { - /// Input for broadcast messages - broadcast_input: RwLock>>>, - /// Input for direct messages - direct_input: RwLock>>>, - /// Output for broadcast messages - broadcast_output: Mutex>, - /// Output for direct messages - direct_output: Mutex>, + /// Input for messages + input: RwLock>>>, + /// Output for messages + output: Mutex>, /// The master map master_map: Arc>, @@ -124,68 +111,36 @@ impl MemoryNetwork { reliability_config: Option>, ) -> MemoryNetwork { info!("Attaching new MemoryNetwork"); - let (broadcast_input, broadcast_task_recv) = bounded(128); - let (direct_input, direct_task_recv) = bounded(128); - let (broadcast_task_send, broadcast_output) = bounded(128); - let (direct_task_send, direct_output) = bounded(128); + let (input, task_recv) = bounded(128); + let (task_send, output) = bounded(128); let in_flight_message_count = AtomicUsize::new(0); trace!("Channels open, spawning background task"); async_spawn( async move { debug!("Starting background task"); - // direct input is right stream - let direct = direct_task_recv.into_stream().map(Combo::>::Direct); - // broadcast input is left stream - let broadcast = broadcast_task_recv - .into_stream() - .map(Combo::>::Broadcast); - // Combine the streams - let mut combined = futures::stream::select(direct, broadcast); + let mut task_stream: BoundedStream> = task_recv.into_stream(); trace!("Entering processing loop"); - while let Some(message) = combined.next().await { - match message { - Combo::Direct(vec) => { - trace!(?vec, "Incoming direct message"); - // Attempt to decode message - let x = bincode_opts().deserialize(&vec); - match x { - Ok(x) => { - let dts = direct_task_send.clone(); - let res = dts.send(x).await; - if res.is_ok() { - trace!("Passed message to output queue"); - } else { - error!("Output queue receivers are shutdown"); - } - } - Err(e) => { - warn!(?e, "Failed to decode incoming message, skipping"); - } + while let Some(vec) = task_stream.next().await { + trace!(?vec, "Incoming message"); + // Attempt to decode message + let x = bincode_opts().deserialize(&vec); + match x { + Ok(x) => { + let ts = task_send.clone(); + let res = ts.send(x).await; + if res.is_ok() { + trace!("Passed message to output queue"); + } else { + error!("Output queue receivers are shutdown"); } } - Combo::Broadcast(vec) => { - trace!(?vec, "Incoming broadcast message"); - // Attempt to decode message - let x = bincode_opts().deserialize(&vec); - match x { - Ok(x) => { - let bts = broadcast_task_send.clone(); - let res = bts.send(x).await; - if res.is_ok() { - trace!("Passed message to output queue"); - } else { - warn!("dropping packet!"); - } - } - Err(e) => { - warn!(?e, "Failed to decode incoming message, skipping"); - } - } + Err(e) => { + warn!(?e, "Failed to decode incoming message, skipping"); } } + warn!("Stream shutdown"); } - warn!("Stream shutdown"); } .instrument(info_span!("MemoryNetwork Background task", map = ?master_map)), ); @@ -193,10 +148,8 @@ impl MemoryNetwork { trace!("Task spawned, creating MemoryNetwork"); let mn = MemoryNetwork { inner: Arc::new(MemoryNetworkInner { - broadcast_input: RwLock::new(Some(broadcast_input)), - direct_input: RwLock::new(Some(direct_input)), - broadcast_output: Mutex::new(broadcast_output), - direct_output: Mutex::new(direct_output), + input: RwLock::new(Some(input)), + output: Mutex::new(output), master_map: master_map.clone(), in_flight_message_count, metrics, @@ -209,26 +162,12 @@ impl MemoryNetwork { mn } - /// Send a [`Vec`] message to the inner `broadcast_input` - async fn broadcast_input(&self, message: Vec) -> Result<(), SendError>> { + /// Send a [`Vec`] message to the inner `input` + async fn input(&self, message: Vec) -> Result<(), SendError>> { self.inner .in_flight_message_count .fetch_add(1, Ordering::Relaxed); - let input = self.inner.broadcast_input.read().await; - if let Some(input) = &*input { - self.inner.metrics.outgoing_broadcast_message_count.add(1); - input.send(message).await - } else { - Err(SendError(message)) - } - } - - /// Send a [`Vec`] message to the inner `direct_input` - async fn direct_input(&self, message: Vec) -> Result<(), SendError>> { - self.inner - .in_flight_message_count - .fetch_add(1, Ordering::Relaxed); - let input = self.inner.direct_input.read().await; + let input = self.inner.input.read().await; if let Some(input) = &*input { self.inner.metrics.outgoing_direct_message_count.add(1); input.send(message).await @@ -295,8 +234,7 @@ impl ConnectedNetwork for Memory Self: 'b, { let closure = async move { - *self.inner.broadcast_input.write().await = None; - *self.inner.direct_input.write().await = None; + *self.inner.input.write().await = None; }; boxed_sync(closure) } @@ -328,7 +266,7 @@ impl ConnectedNetwork for Memory Arc::new(move |msg: Vec| { let node3 = (node2).clone(); boxed_sync(async move { - let _res = node3.broadcast_input(msg).await; + let _res = node3.input(msg).await; // NOTE we're dropping metrics here but this is only for testing // purposes. I think that should be okay }) @@ -337,7 +275,7 @@ impl ConnectedNetwork for Memory async_spawn(fut); } } else { - let res = node.broadcast_input(vec.clone()).await; + let res = node.input(vec.clone()).await; match res { Ok(()) => { self.inner.metrics.outgoing_broadcast_message_count.add(1); @@ -379,7 +317,7 @@ impl ConnectedNetwork for Memory Arc::new(move |msg: Vec| { let node2 = node.clone(); boxed_sync(async move { - let _res = node2.direct_input(msg).await; + let _res = node2.input(msg).await; // NOTE we're dropping metrics here but this is only for testing // purposes. I think that should be okay }) @@ -389,7 +327,7 @@ impl ConnectedNetwork for Memory } Ok(()) } else { - let res = node.direct_input(vec).await; + let res = node.input(vec).await; match res { Ok(()) => { self.inner.metrics.outgoing_direct_message_count.add(1); @@ -416,57 +354,28 @@ impl ConnectedNetwork for Memory #[instrument(name = "MemoryNetwork::recv_msgs", skip_all)] fn recv_msgs<'a, 'b>( &'a self, - transmit_type: TransmitType, ) -> BoxSyncFuture<'b, Result, NetworkError>> where 'a: 'b, Self: 'b, { let closure = async move { - match transmit_type { - TransmitType::Direct => { - let ret = self - .inner - .direct_output - .lock() - .await - .drain_at_least_one() - .await - .map_err(|_x| NetworkError::ShutDown)?; - self.inner - .in_flight_message_count - .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner - .metrics - .incoming_direct_message_count - .add(ret.len()); - Ok(ret) - } - TransmitType::Broadcast => { - let ret = self - .inner - .broadcast_output - .lock() - .await - .drain_at_least_one() - .await - .map_err(|_x| NetworkError::ShutDown)?; - self.inner - .in_flight_message_count - .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner - .metrics - .incoming_broadcast_message_count - .add(ret.len()); - Ok(ret) - } - TransmitType::DACommitteeBroadcast => { - error!("Received DACommitteeBroadcast, it should have not happened."); - Err(NetworkError::MemoryNetwork { - source: MemoryNetworkError::Stub, - }) - } - } + let ret = self + .inner + .output + .lock() + .await + .drain_at_least_one() + .await + .map_err(|_x| NetworkError::ShutDown)?; + self.inner + .in_flight_message_count + .fetch_sub(ret.len(), Ordering::Relaxed); + self.inner + .metrics + .incoming_message_count + .add(ret.len()); + Ok(ret) }; boxed_sync(closure) } diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 505c93bac9..93f48e0656 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -19,7 +19,7 @@ use hotshot_types::{ traits::{ network::{ ConnectedNetwork, ConsensusIntentEvent, NetworkError, NetworkMsg, - TestableNetworkingImplementation, TransmitType, WebServerNetworkError, + TestableNetworkingImplementation, WebServerNetworkError, }, node_implementation::NodeType, signature_key::SignatureKey, @@ -169,10 +169,8 @@ impl TaskMap { struct Inner { /// Our own key _own_key: TYPES::SignatureKey, - /// Queue for broadcasted messages - broadcast_poll_queue_0_1: Arc>>>>, - /// Queue for direct messages - direct_poll_queue_0_1: Arc>>>>, + /// Queue for messages + poll_queue_0_1: Arc>>>>, /// Client is running running: AtomicBool, /// The web server connection is ready @@ -215,7 +213,7 @@ impl Inner { /// * `first_tx_index` - the index of the first transaction received from the server in the latest batch. /// * `tx_index` - the last transaction index we saw from the web server. async fn handle_tx_0_1(&self, tx: Vec, first_tx_index: u64, tx_index: &mut u64) { - let broadcast_poll_queue = &self.broadcast_poll_queue_0_1; + let poll_queue = &self.poll_queue_0_1; if first_tx_index > *tx_index + 1 { debug!( "missed txns from {} to {}", @@ -231,7 +229,7 @@ impl Inner { let deserialized_message = RecvMsg { message: Some(deserialized_message_inner), }; - broadcast_poll_queue + poll_queue .write() .await .push(deserialized_message.clone()); @@ -257,8 +255,7 @@ impl Inner { seen_proposals: &mut LruCache, seen_view_sync_certificates: &mut LruCache, ) -> bool { - let broadcast_poll_queue = &self.broadcast_poll_queue_0_1; - let direct_poll_queue = &self.direct_poll_queue_0_1; + let poll_queue = &self.poll_queue_0_1; if let Ok(deserialized_message_inner) = bincode::deserialize::>(&message) { let deserialized_message = RecvMsg { message: Some(deserialized_message_inner), @@ -269,7 +266,7 @@ impl Inner { } MessagePurpose::Proposal => { let proposal = deserialized_message.clone(); - broadcast_poll_queue.write().await.push(proposal); + poll_queue.write().await.push(proposal); // Only pushing the first proposal since we will soon only be allowing 1 proposal per view return true; @@ -279,7 +276,7 @@ impl Inner { let hash = hash(&proposal); // Only allow unseen proposals to be pushed to the queue if seen_proposals.put(hash, ()).is_none() { - broadcast_poll_queue.write().await.push(proposal); + poll_queue.write().await.push(proposal); } // Only pushing the first proposal since we will soon only be allowing 1 proposal per view @@ -289,14 +286,14 @@ impl Inner { let cert = deserialized_message.clone(); let hash = hash(&cert); if seen_view_sync_certificates.put(hash, ()).is_none() { - broadcast_poll_queue.write().await.push(cert); + poll_queue.write().await.push(cert); } return false; } - MessagePurpose::Vote | MessagePurpose::ViewSyncVote => { + MessagePurpose::Vote | MessagePurpose::ViewSyncVote | MessagePurpose::ViewSyncCertificate => { let vote = deserialized_message.clone(); *vote_index += 1; - direct_poll_queue.write().await.push(vote); + poll_queue.write().await.push(vote); return false; } @@ -305,7 +302,7 @@ impl Inner { "Received DAC from web server for view {} {}", view_number, self.is_da ); - broadcast_poll_queue + poll_queue .write() .await .push(deserialized_message.clone()); @@ -318,7 +315,7 @@ impl Inner { MessagePurpose::VidDisperse => { // TODO copy-pasted from `MessagePurpose::Proposal` https://github.com/EspressoSystems/HotShot/issues/1690 - self.broadcast_poll_queue_0_1 + self.poll_queue_0_1 .write() .await .push(deserialized_message.clone()); @@ -326,15 +323,6 @@ impl Inner { // Only pushing the first proposal since we will soon only be allowing 1 proposal per view return true; } - MessagePurpose::ViewSyncCertificate => { - // TODO ED Special case this for view sync - // TODO ED Need to add vote indexing to web server for view sync certs - let cert = deserialized_message.clone(); - *vote_index += 1; - broadcast_poll_queue.write().await.push(cert); - - return false; - } MessagePurpose::Internal => { error!("Received internal message in web server network"); @@ -343,7 +331,7 @@ impl Inner { } MessagePurpose::Upgrade => { - broadcast_poll_queue + poll_queue .write() .await .push(deserialized_message.clone()); @@ -636,8 +624,7 @@ impl WebServerNetwork { let client = surf_disco::Client::::new(url); let inner = Arc::new(Inner { - broadcast_poll_queue_0_1: Arc::default(), - direct_poll_queue_0_1: Arc::default(), + poll_queue_0_1: Arc::default(), running: AtomicBool::new(true), connected: AtomicBool::new(false), client, @@ -860,45 +847,25 @@ impl ConnectedNetwork, TYPES::Signatur } } - /// Moves out the entire queue of received messages of 'transmit_type` + /// Moves out the entire queue of received messages /// /// Will unwrap the underlying `NetworkMessage` /// blocking fn recv_msgs<'a, 'b>( &'a self, - transmit_type: TransmitType, ) -> BoxSyncFuture<'b, Result>, NetworkError>> where 'a: 'b, Self: 'b, { let closure = async move { - match transmit_type { - TransmitType::Direct => { - let mut queue = self.inner.direct_poll_queue_0_1.write().await; - Ok(queue - .drain(..) - .collect::>() - .iter() - .map(|x| x.get_message().unwrap()) - .collect()) - } - TransmitType::Broadcast => { - let mut queue = self.inner.broadcast_poll_queue_0_1.write().await; - Ok(queue - .drain(..) - .collect::>() - .iter() - .map(|x| x.get_message().unwrap()) - .collect()) - } - TransmitType::DACommitteeBroadcast => { - error!("Received DACommitteeBroadcast, it should have not happened."); - Err(NetworkError::WebServer { - source: WebServerNetworkError::ClientDisconnected, - }) - } - } + let mut queue = self.inner.poll_queue_0_1.write().await; + Ok(queue + .drain(..) + .collect::>() + .iter() + .map(|x| x.get_message().unwrap()) + .collect()) }; boxed_sync(closure) } diff --git a/crates/testing/tests/memory_network.rs b/crates/testing/tests/memory_network.rs index 49c9a4081e..f2cf270ed7 100644 --- a/crates/testing/tests/memory_network.rs +++ b/crates/testing/tests/memory_network.rs @@ -18,7 +18,7 @@ use hotshot_example_types::{ use hotshot_types::message::Message; use hotshot_types::signature_key::BLSPubKey; use hotshot_types::traits::network::TestableNetworkingImplementation; -use hotshot_types::traits::network::{ConnectedNetwork, TransmitType}; +use hotshot_types::traits::network::ConnectedNetwork; use hotshot_types::traits::node_implementation::{ConsensusTime, NodeType}; use hotshot_types::{ data::ViewNumber, @@ -188,7 +188,7 @@ async fn memory_network_direct_queue() { .await .expect("Failed to message node"); let mut recv_messages = network2 - .recv_msgs(TransmitType::Direct) + .recv_msgs() .await .expect("Failed to receive message"); let recv_message = recv_messages.pop().unwrap(); @@ -206,7 +206,7 @@ async fn memory_network_direct_queue() { .await .expect("Failed to message node"); let mut recv_messages = network1 - .recv_msgs(TransmitType::Direct) + .recv_msgs() .await .expect("Failed to receive message"); let recv_message = recv_messages.pop().unwrap(); @@ -255,7 +255,7 @@ async fn memory_network_broadcast_queue() { .await .expect("Failed to message node"); let mut recv_messages = network2 - .recv_msgs(TransmitType::Broadcast) + .recv_msgs() .await .expect("Failed to receive message"); let recv_message = recv_messages.pop().unwrap(); @@ -276,7 +276,7 @@ async fn memory_network_broadcast_queue() { .await .expect("Failed to message node"); let mut recv_messages = network1 - .recv_msgs(TransmitType::Broadcast) + .recv_msgs() .await .expect("Failed to receive message"); let recv_message = recv_messages.pop().unwrap(); @@ -339,15 +339,15 @@ async fn memory_network_test_in_flight_message_count() { } while network1.in_flight_message_count().unwrap() > 0 { - network1.recv_msgs(TransmitType::Broadcast).await.unwrap(); + network1.recv_msgs().await.unwrap(); } while network2.in_flight_message_count().unwrap() > messages.len() { - network2.recv_msgs(TransmitType::Direct).await.unwrap(); + network2.recv_msgs().await.unwrap(); } while network2.in_flight_message_count().unwrap() > 0 { - network2.recv_msgs(TransmitType::Broadcast).await.unwrap(); + network2.recv_msgs().await.unwrap(); } assert_eq!(network1.in_flight_message_count(), Some(0)); diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index d1fed267a1..cd87548391 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -283,7 +283,6 @@ pub trait ConnectedNetwork: /// blocking fn recv_msgs<'a, 'b>( &'a self, - transmit_type: TransmitType, ) -> BoxSyncFuture<'b, Result, NetworkError>> where 'a: 'b,