diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 3bee58b19f..8433ed6121 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -107,6 +107,7 @@ pub fn add_network_message_task< let network_state: NetworkMessageTaskState<_> = NetworkMessageTaskState { internal_event_stream: handle.internal_event_stream.0.clone(), external_event_stream: handle.output_event_stream.0.clone(), + public_key: handle.public_key().clone(), }; let upgrade_lock = handle.hotshot.upgrade_lock.clone(); diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 297970966f..2bb7eba49c 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -7,7 +7,7 @@ //! Networking Implementation that has a primary and a fallback network. If the primary //! Errors we will use the backup to send or receive use std::{ - collections::{hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap}, + collections::{hash_map::DefaultHasher, BTreeMap, HashMap}, future::Future, hash::{Hash, Hasher}, num::NonZeroUsize, @@ -391,7 +391,7 @@ impl ConnectedNetwork for CombinedNetworks async fn da_broadcast_message( &self, message: Vec, - recipients: BTreeSet, + recipients: Vec, broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { let primary = self.primary().clone(); diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 936f905318..32551fba72 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -885,7 +885,7 @@ impl ConnectedNetwork for Libp2pNetwork { async fn da_broadcast_message( &self, message: Vec, - recipients: BTreeSet, + recipients: Vec, _broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { // If we're not ready, return an error diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index e3993073c6..54fcdd9d1b 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -11,7 +11,6 @@ use core::time::Duration; use std::{ - collections::BTreeSet, fmt::Debug, sync::{ atomic::{AtomicUsize, Ordering}, @@ -305,7 +304,7 @@ impl ConnectedNetwork for MemoryNetwork { async fn da_broadcast_message( &self, message: Vec, - recipients: BTreeSet, + recipients: Vec, broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { // Iterate over all topics, compare to recipients, and get the `Topic` diff --git a/crates/hotshot/src/traits/networking/push_cdn_network.rs b/crates/hotshot/src/traits/networking/push_cdn_network.rs index 94b3f8d30c..dc38f979cf 100644 --- a/crates/hotshot/src/traits/networking/push_cdn_network.rs +++ b/crates/hotshot/src/traits/networking/push_cdn_network.rs @@ -6,7 +6,7 @@ #[cfg(feature = "hotshot-testing")] use std::sync::atomic::{AtomicBool, Ordering}; -use std::{collections::BTreeSet, marker::PhantomData, sync::Arc}; +use std::{marker::PhantomData, sync::Arc}; #[cfg(feature = "hotshot-testing")] use std::{path::Path, time::Duration}; @@ -487,7 +487,7 @@ impl ConnectedNetwork for PushCdnNetwork { async fn da_broadcast_message( &self, message: Vec, - _recipients: BTreeSet, + _recipients: Vec, _broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError> { self.broadcast_message(message, Topic::Da) diff --git a/crates/hotshot/src/types/handle.rs b/crates/hotshot/src/types/handle.rs index 8f5a79824a..5945e31ceb 100644 --- a/crates/hotshot/src/types/handle.rs +++ b/crates/hotshot/src/types/handle.rs @@ -22,11 +22,14 @@ use hotshot_types::{ consensus::Consensus, data::{Leaf, QuorumProposal}, error::HotShotError, - message::Proposal, + message::{Message, MessageKind, Proposal, RecipientList}, request_response::ProposalRequestPayload, traits::{ - consensus_api::ConsensusApi, election::Membership, network::ConnectedNetwork, - node_implementation::NodeType, signature_key::SignatureKey, + consensus_api::ConsensusApi, + election::Membership, + network::{BroadcastDelay, ConnectedNetwork, Topic}, + node_implementation::NodeType, + signature_key::SignatureKey, }, vote::HasViewNumber, }; @@ -88,6 +91,43 @@ impl + 'static, V: Versions> self.output_event_stream.1.activate_cloned() } + /// Message other participents with a serialized message from the application + /// Receivers of this message will get an `Event::ExternalMessageReceived` via + /// the event stream. + /// + /// # Errors + /// Errors if serializing the request fails, or the request fails to be sent + pub async fn send_external_message( + &self, + msg: Vec, + recipients: RecipientList, + ) -> Result<()> { + let message = Message { + sender: self.public_key().clone(), + kind: MessageKind::External(msg), + }; + let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?; + + match recipients { + RecipientList::Broadcast => { + self.network + .broadcast_message(serialized_message, Topic::Global, BroadcastDelay::None) + .await?; + } + RecipientList::Direct(recipient) => { + self.network + .direct_message(serialized_message, recipient) + .await?; + } + RecipientList::Many(recipients) => { + self.network + .da_broadcast_message(serialized_message, recipients, BroadcastDelay::None) + .await?; + } + } + Ok(()) + } + /// Request a proposal from the all other nodes. Will block until some node /// returns a valid proposal with the requested commitment. If nobody has the /// proposal this will block forever diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index b841d6a15c..ca5f6c7aeb 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -46,6 +46,9 @@ pub struct NetworkMessageTaskState { /// Sender to send external events this task generates to the event stream pub external_event_stream: Sender>, + + /// This nodes public key + pub public_key: TYPES::SignatureKey, } impl NetworkMessageTaskState { @@ -160,11 +163,14 @@ impl NetworkMessageTaskState { // Handle external messages MessageKind::External(data) => { + if sender == self.public_key { + return; + } // Send the external message to the external event stream so it can be processed broadcast_event( Event { view_number: TYPES::Time::new(1), - event: EventType::ExternalMessageReceived(data), + event: EventType::ExternalMessageReceived { sender, data }, }, &self.external_event_stream, ) @@ -571,20 +577,12 @@ impl< .await } TransmitType::DaCommitteeBroadcast => { - net.da_broadcast_message(serialized_message, da_committee, broadcast_delay) - .await - } - TransmitType::DaCommitteeAndLeaderBroadcast(recipient) => { - if let Err(e) = net - .direct_message(serialized_message.clone(), recipient) - .await - { - warn!("Failed to send message: {e:?}"); - } - - // Otherwise, send the next message. - net.da_broadcast_message(serialized_message, da_committee, broadcast_delay) - .await + net.da_broadcast_message( + serialized_message, + da_committee.iter().cloned().collect(), + broadcast_delay, + ) + .await } }; diff --git a/crates/testing/src/helpers.rs b/crates/testing/src/helpers.rs index b370f8d279..54eab9d4fa 100644 --- a/crates/testing/src/helpers.rs +++ b/crates/testing/src/helpers.rs @@ -40,12 +40,12 @@ use hotshot_types::{ utils::{View, ViewInner}, vid::{vid_scheme, VidCommitment, VidProposal, VidSchemeType}, vote::{Certificate, HasViewNumber, Vote}, + ValidatorConfig, }; use jf_vid::VidScheme; use serde::Serialize; -use crate::test_builder::TestDescription; - +use crate::{test_builder::TestDescription, test_launcher::TestLauncher}; /// create the [`SystemContextHandle`] from a node id /// # Panics /// if cannot create a [`HotShotInitializer`] @@ -67,18 +67,46 @@ pub async fn build_system_handle< let builder: TestDescription = TestDescription::default_multiple_rounds(); let launcher = builder.gen_launcher(node_id); + build_system_handle_from_launcher(node_id, &launcher).await +} +/// create the [`SystemContextHandle`] from a node id and `TestLauncher` +/// # Panics +/// if cannot create a [`HotShotInitializer`] +pub async fn build_system_handle_from_launcher< + TYPES: NodeType, + I: NodeImplementation< + TYPES, + Storage = TestStorage, + AuctionResultsProvider = TestAuctionResultsProvider, + > + TestableNodeImplementation, + V: Versions, +>( + node_id: u64, + launcher: &TestLauncher, +) -> ( + SystemContextHandle, + Sender>>, + Receiver>>, +) { let network = (launcher.resource_generator.channel_generator)(node_id).await; let storage = (launcher.resource_generator.storage)(node_id); let marketplace_config = (launcher.resource_generator.marketplace_config)(node_id); - let config = launcher.resource_generator.config.clone(); + let mut config = launcher.resource_generator.config.clone(); let initializer = HotShotInitializer::::from_genesis::(TestInstanceState::new( - launcher.metadata.async_delay_config, + launcher.metadata.async_delay_config.clone(), )) .await .unwrap(); + // See whether or not we should be DA + let is_da = node_id < config.da_staked_committee_size as u64; + + // We assign node's public key and stake value rather than read from config file since it's a test + let validator_config = + ValidatorConfig::generated_from_seed_indexed([0u8; 32], node_id, 1, is_da); + config.my_own_validator_config = validator_config; let private_key = config.my_own_validator_config.private_key.clone(); let public_key = config.my_own_validator_config.public_key.clone(); diff --git a/crates/testing/src/test_task.rs b/crates/testing/src/test_task.rs index 9df6a23d0d..8d685914d3 100644 --- a/crates/testing/src/test_task.rs +++ b/crates/testing/src/test_task.rs @@ -161,11 +161,13 @@ pub async fn add_network_message_test_task< external_event_stream: Sender>, upgrade_lock: UpgradeLock, channel: Arc, + public_key: TYPES::SignatureKey, ) -> JoinHandle<()> { let net = Arc::clone(&channel); let network_state: NetworkMessageTaskState<_> = NetworkMessageTaskState { internal_event_stream: internal_event_stream.clone(), external_event_stream: external_event_stream.clone(), + public_key, }; let network = Arc::clone(&net); diff --git a/crates/testing/tests/tests_1/network_task.rs b/crates/testing/tests/tests_1/network_task.rs index 03a855a05f..272d132ae3 100644 --- a/crates/testing/tests/tests_1/network_task.rs +++ b/crates/testing/tests/tests_1/network_task.rs @@ -84,6 +84,7 @@ async fn test_network_task() { out_tx_external.clone(), upgrade_lock, network.clone(), + public_key, ) .await; @@ -104,6 +105,109 @@ async fn test_network_task() { )); } +#[cfg(test)] +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_network_external_mnessages() { + use hotshot::types::EventType; + use hotshot_testing::helpers::build_system_handle_from_launcher; + use hotshot_types::message::RecipientList; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + let builder: TestDescription = + TestDescription::default_multiple_rounds(); + + let launcher = builder.gen_launcher(0); + + let mut handles = vec![]; + let mut event_streams = vec![]; + for i in 0..launcher.metadata.num_nodes_with_stake { + let handle = build_system_handle_from_launcher::( + i.try_into().unwrap(), + &launcher, + ) + .await + .0; + event_streams.push(handle.event_stream_known_impl()); + handles.push(handle); + } + + // Send a message from 1 -> 2 + handles[1] + .send_external_message(vec![1, 2], RecipientList::Direct(handles[2].public_key())) + .await + .unwrap(); + let event = async_compatibility_layer::art::async_timeout( + Duration::from_millis(100), + event_streams[2].recv(), + ) + .await + .unwrap() + .unwrap() + .event; + + // check that 2 received the message + assert!(matches!( + event, + EventType::ExternalMessageReceived { + sender, + data, + } if sender == handles[1].public_key() && data == vec![1, 2] + )); + + // Send a message from 2 -> 1 + handles[2] + .send_external_message(vec![2, 1], RecipientList::Direct(handles[1].public_key())) + .await + .unwrap(); + let event = async_compatibility_layer::art::async_timeout( + Duration::from_millis(100), + event_streams[1].recv(), + ) + .await + .unwrap() + .unwrap() + .event; + + // check that 1 received the message + assert!(matches!( + event, + EventType::ExternalMessageReceived { + sender, + data, + } if sender == handles[2].public_key() && data == vec![2,1] + )); + + // Check broadcast works + handles[0] + .send_external_message(vec![0, 0, 0], RecipientList::Broadcast) + .await + .unwrap(); + // All other nodes get the broadcast + for stream in event_streams.iter_mut().skip(1) { + let event = async_compatibility_layer::art::async_timeout( + Duration::from_millis(100), + stream.recv(), + ) + .await + .unwrap() + .unwrap() + .event; + assert!(matches!( + event, + EventType::ExternalMessageReceived { + sender, + data, + } if sender == handles[0].public_key() && data == vec![0,0,0] + )); + } + // No event on 0 even after short sleep + async_compatibility_layer::art::async_sleep(Duration::from_millis(2)).await; + assert!(event_streams[0].is_empty()); +} + #[cfg(test)] #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] @@ -161,6 +265,7 @@ async fn test_network_storage_fail() { out_tx_external.clone(), upgrade_lock, network.clone(), + public_key, ) .await; diff --git a/crates/types/src/event.rs b/crates/types/src/event.rs index 84f052d651..e71d0c8196 100644 --- a/crates/types/src/event.rs +++ b/crates/types/src/event.rs @@ -92,6 +92,7 @@ pub mod error_adaptor { Ok(Arc::new(HotShotError::FailedToDeserialize(str))) } } + /// The type and contents of a status event emitted by a `HotShot` instance /// /// This enum does not include metadata shared among all variants, such as the stage and view @@ -171,7 +172,12 @@ pub enum EventType { }, /// A message destined for external listeners was received - ExternalMessageReceived(Vec), + ExternalMessageReceived { + /// Public Key of the message sender + sender: TYPES::SignatureKey, + /// Serialized data of the message + data: Vec, + }, } #[derive(Debug, Serialize, Deserialize, Clone, Copy)] /// A list of actions that we track for nodes diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index 407845b001..ab72986652 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -9,7 +9,11 @@ //! This module contains types used to represent the various types of messages that //! `HotShot` nodes can send among themselves. -use std::{fmt, fmt::Debug, marker::PhantomData, sync::Arc}; +use std::{ + fmt::{self, Debug}, + marker::PhantomData, + sync::Arc, +}; use anyhow::{bail, ensure, Context, Result}; use async_lock::RwLock; @@ -117,6 +121,16 @@ pub enum MessageKind { External(Vec), } +/// List of keys to send a message to, or broadcast to all known keys +pub enum RecipientList { + /// Broadcast to all + Broadcast, + /// Send a message directly to a key + Direct(K), + /// Send a message directly to many keys + Many(Vec), +} + impl MessageKind { // Can't implement `From` directly due to potential conflict with // `From`. diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index 40a55423cc..97851e6963 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -17,7 +17,7 @@ use thiserror::Error; #[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))] compile_error! {"Either config option \"async-std\" or \"tokio\" must be enabled for this crate."} use std::{ - collections::{BTreeSet, HashMap}, + collections::HashMap, fmt::{Debug, Display}, hash::Hash, pin::Pin, @@ -54,8 +54,6 @@ pub enum TransmitType { Broadcast, /// broadcast to DA committee DaCommitteeBroadcast, - /// broadcast to the leader and the DA - DaCommitteeAndLeaderBroadcast(TYPES::SignatureKey), } /// Errors that can occur in the network @@ -122,17 +120,6 @@ pub enum NetworkError { LookupError(String), } -/// common traits we would like our network messages to implement -pub trait NetworkMsg: - Serialize + for<'a> Deserialize<'a> + Clone + Sync + Send + Debug + 'static -{ -} - -impl NetworkMsg for T where - T: Serialize + for<'a> Deserialize<'a> + Clone + Sync + Send + Debug + 'static -{ -} - /// Trait that bundles what we need from a request ID pub trait Id: Eq + PartialEq + Hash {} @@ -230,7 +217,7 @@ pub trait ConnectedNetwork: Clone + Send + Sync + 'st async fn da_broadcast_message( &self, message: Vec, - recipients: BTreeSet, + recipients: Vec, broadcast_delay: BroadcastDelay, ) -> Result<(), NetworkError>;