Skip to content

Commit

Permalink
[Networking] Add a hook for application messages (#3759)
Browse files Browse the repository at this point in the history
* add a hook for application messages:wq

* revert logging change to memory network

* fix tokio

* missed one

* remove unused import

* Change recipients from `BTreeSet` to `Vec`
  • Loading branch information
bfish713 authored Oct 17, 2024
1 parent 5c9b79a commit 86df546
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 46 deletions.
1 change: 1 addition & 0 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub fn add_network_message_task<
let network_state: NetworkMessageTaskState<_> = NetworkMessageTaskState {
internal_event_stream: handle.internal_event_stream.0.clone(),
external_event_stream: handle.output_event_stream.0.clone(),
public_key: handle.public_key().clone(),
};

let upgrade_lock = handle.hotshot.upgrade_lock.clone();
Expand Down
4 changes: 2 additions & 2 deletions crates/hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! Networking Implementation that has a primary and a fallback network. If the primary
//! Errors we will use the backup to send or receive
use std::{
collections::{hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap},
collections::{hash_map::DefaultHasher, BTreeMap, HashMap},
future::Future,
hash::{Hash, Hasher},
num::NonZeroUsize,
Expand Down Expand Up @@ -391,7 +391,7 @@ impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks
async fn da_broadcast_message(
&self,
message: Vec<u8>,
recipients: BTreeSet<TYPES::SignatureKey>,
recipients: Vec<TYPES::SignatureKey>,
broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
let primary = self.primary().clone();
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Libp2pNetwork<K> {
async fn da_broadcast_message(
&self,
message: Vec<u8>,
recipients: BTreeSet<K>,
recipients: Vec<K>,
_broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
// If we're not ready, return an error
Expand Down
3 changes: 1 addition & 2 deletions crates/hotshot/src/traits/networking/memory_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use core::time::Duration;
use std::{
collections::BTreeSet,
fmt::Debug,
sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -305,7 +304,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for MemoryNetwork<K> {
async fn da_broadcast_message(
&self,
message: Vec<u8>,
recipients: BTreeSet<K>,
recipients: Vec<K>,
broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
// Iterate over all topics, compare to recipients, and get the `Topic`
Expand Down
4 changes: 2 additions & 2 deletions crates/hotshot/src/traits/networking/push_cdn_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#[cfg(feature = "hotshot-testing")]
use std::sync::atomic::{AtomicBool, Ordering};
use std::{collections::BTreeSet, marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc};
#[cfg(feature = "hotshot-testing")]
use std::{path::Path, time::Duration};

Expand Down Expand Up @@ -487,7 +487,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
async fn da_broadcast_message(
&self,
message: Vec<u8>,
_recipients: BTreeSet<K>,
_recipients: Vec<K>,
_broadcast_delay: BroadcastDelay,
) -> Result<(), NetworkError> {
self.broadcast_message(message, Topic::Da)
Expand Down
46 changes: 43 additions & 3 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use hotshot_types::{
consensus::Consensus,
data::{Leaf, QuorumProposal},
error::HotShotError,
message::Proposal,
message::{Message, MessageKind, Proposal, RecipientList},
request_response::ProposalRequestPayload,
traits::{
consensus_api::ConsensusApi, election::Membership, network::ConnectedNetwork,
node_implementation::NodeType, signature_key::SignatureKey,
consensus_api::ConsensusApi,
election::Membership,
network::{BroadcastDelay, ConnectedNetwork, Topic},
node_implementation::NodeType,
signature_key::SignatureKey,
},
vote::HasViewNumber,
};
Expand Down Expand Up @@ -88,6 +91,43 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
self.output_event_stream.1.activate_cloned()
}

/// Message other participents with a serialized message from the application
/// Receivers of this message will get an `Event::ExternalMessageReceived` via
/// the event stream.
///
/// # Errors
/// Errors if serializing the request fails, or the request fails to be sent
pub async fn send_external_message(
&self,
msg: Vec<u8>,
recipients: RecipientList<TYPES::SignatureKey>,
) -> Result<()> {
let message = Message {
sender: self.public_key().clone(),
kind: MessageKind::External(msg),
};
let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;

match recipients {
RecipientList::Broadcast => {
self.network
.broadcast_message(serialized_message, Topic::Global, BroadcastDelay::None)
.await?;
}
RecipientList::Direct(recipient) => {
self.network
.direct_message(serialized_message, recipient)
.await?;
}
RecipientList::Many(recipients) => {
self.network
.da_broadcast_message(serialized_message, recipients, BroadcastDelay::None)
.await?;
}
}
Ok(())
}

/// Request a proposal from the all other nodes. Will block until some node
/// returns a valid proposal with the requested commitment. If nobody has the
/// proposal this will block forever
Expand Down
28 changes: 13 additions & 15 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub struct NetworkMessageTaskState<TYPES: NodeType> {

/// Sender to send external events this task generates to the event stream
pub external_event_stream: Sender<Event<TYPES>>,

/// This nodes public key
pub public_key: TYPES::SignatureKey,
}

impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {
Expand Down Expand Up @@ -160,11 +163,14 @@ impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {

// Handle external messages
MessageKind::External(data) => {
if sender == self.public_key {
return;
}
// Send the external message to the external event stream so it can be processed
broadcast_event(
Event {
view_number: TYPES::Time::new(1),
event: EventType::ExternalMessageReceived(data),
event: EventType::ExternalMessageReceived { sender, data },
},
&self.external_event_stream,
)
Expand Down Expand Up @@ -571,20 +577,12 @@ impl<
.await
}
TransmitType::DaCommitteeBroadcast => {
net.da_broadcast_message(serialized_message, da_committee, broadcast_delay)
.await
}
TransmitType::DaCommitteeAndLeaderBroadcast(recipient) => {
if let Err(e) = net
.direct_message(serialized_message.clone(), recipient)
.await
{
warn!("Failed to send message: {e:?}");
}

// Otherwise, send the next message.
net.da_broadcast_message(serialized_message, da_committee, broadcast_delay)
.await
net.da_broadcast_message(
serialized_message,
da_committee.iter().cloned().collect(),
broadcast_delay,
)
.await
}
};

Expand Down
36 changes: 32 additions & 4 deletions crates/testing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ use hotshot_types::{
utils::{View, ViewInner},
vid::{vid_scheme, VidCommitment, VidProposal, VidSchemeType},
vote::{Certificate, HasViewNumber, Vote},
ValidatorConfig,
};
use jf_vid::VidScheme;
use serde::Serialize;

use crate::test_builder::TestDescription;

use crate::{test_builder::TestDescription, test_launcher::TestLauncher};
/// create the [`SystemContextHandle`] from a node id
/// # Panics
/// if cannot create a [`HotShotInitializer`]
Expand All @@ -67,18 +67,46 @@ pub async fn build_system_handle<
let builder: TestDescription<TYPES, I, V> = TestDescription::default_multiple_rounds();

let launcher = builder.gen_launcher(node_id);
build_system_handle_from_launcher(node_id, &launcher).await
}

/// create the [`SystemContextHandle`] from a node id and `TestLauncher`
/// # Panics
/// if cannot create a [`HotShotInitializer`]
pub async fn build_system_handle_from_launcher<
TYPES: NodeType<InstanceState = TestInstanceState>,
I: NodeImplementation<
TYPES,
Storage = TestStorage<TYPES>,
AuctionResultsProvider = TestAuctionResultsProvider<TYPES>,
> + TestableNodeImplementation<TYPES>,
V: Versions,
>(
node_id: u64,
launcher: &TestLauncher<TYPES, I, V>,
) -> (
SystemContextHandle<TYPES, I, V>,
Sender<Arc<HotShotEvent<TYPES>>>,
Receiver<Arc<HotShotEvent<TYPES>>>,
) {
let network = (launcher.resource_generator.channel_generator)(node_id).await;
let storage = (launcher.resource_generator.storage)(node_id);
let marketplace_config = (launcher.resource_generator.marketplace_config)(node_id);
let config = launcher.resource_generator.config.clone();
let mut config = launcher.resource_generator.config.clone();

let initializer = HotShotInitializer::<TYPES>::from_genesis::<V>(TestInstanceState::new(
launcher.metadata.async_delay_config,
launcher.metadata.async_delay_config.clone(),
))
.await
.unwrap();

// See whether or not we should be DA
let is_da = node_id < config.da_staked_committee_size as u64;

// We assign node's public key and stake value rather than read from config file since it's a test
let validator_config =
ValidatorConfig::generated_from_seed_indexed([0u8; 32], node_id, 1, is_da);
config.my_own_validator_config = validator_config;
let private_key = config.my_own_validator_config.private_key.clone();
let public_key = config.my_own_validator_config.public_key.clone();

Expand Down
2 changes: 2 additions & 0 deletions crates/testing/src/test_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,13 @@ pub async fn add_network_message_test_task<
external_event_stream: Sender<Event<TYPES>>,
upgrade_lock: UpgradeLock<TYPES, V>,
channel: Arc<NET>,
public_key: TYPES::SignatureKey,
) -> JoinHandle<()> {
let net = Arc::clone(&channel);
let network_state: NetworkMessageTaskState<_> = NetworkMessageTaskState {
internal_event_stream: internal_event_stream.clone(),
external_event_stream: external_event_stream.clone(),
public_key,
};

let network = Arc::clone(&net);
Expand Down
105 changes: 105 additions & 0 deletions crates/testing/tests/tests_1/network_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async fn test_network_task() {
out_tx_external.clone(),
upgrade_lock,
network.clone(),
public_key,
)
.await;

Expand All @@ -104,6 +105,109 @@ async fn test_network_task() {
));
}

#[cfg(test)]
#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn test_network_external_mnessages() {
use hotshot::types::EventType;
use hotshot_testing::helpers::build_system_handle_from_launcher;
use hotshot_types::message::RecipientList;

async_compatibility_layer::logging::setup_logging();
async_compatibility_layer::logging::setup_backtrace();

let builder: TestDescription<TestTypes, MemoryImpl, TestVersions> =
TestDescription::default_multiple_rounds();

let launcher = builder.gen_launcher(0);

let mut handles = vec![];
let mut event_streams = vec![];
for i in 0..launcher.metadata.num_nodes_with_stake {
let handle = build_system_handle_from_launcher::<TestTypes, MemoryImpl, TestVersions>(
i.try_into().unwrap(),
&launcher,
)
.await
.0;
event_streams.push(handle.event_stream_known_impl());
handles.push(handle);
}

// Send a message from 1 -> 2
handles[1]
.send_external_message(vec![1, 2], RecipientList::Direct(handles[2].public_key()))
.await
.unwrap();
let event = async_compatibility_layer::art::async_timeout(
Duration::from_millis(100),
event_streams[2].recv(),
)
.await
.unwrap()
.unwrap()
.event;

// check that 2 received the message
assert!(matches!(
event,
EventType::ExternalMessageReceived {
sender,
data,
} if sender == handles[1].public_key() && data == vec![1, 2]
));

// Send a message from 2 -> 1
handles[2]
.send_external_message(vec![2, 1], RecipientList::Direct(handles[1].public_key()))
.await
.unwrap();
let event = async_compatibility_layer::art::async_timeout(
Duration::from_millis(100),
event_streams[1].recv(),
)
.await
.unwrap()
.unwrap()
.event;

// check that 1 received the message
assert!(matches!(
event,
EventType::ExternalMessageReceived {
sender,
data,
} if sender == handles[2].public_key() && data == vec![2,1]
));

// Check broadcast works
handles[0]
.send_external_message(vec![0, 0, 0], RecipientList::Broadcast)
.await
.unwrap();
// All other nodes get the broadcast
for stream in event_streams.iter_mut().skip(1) {
let event = async_compatibility_layer::art::async_timeout(
Duration::from_millis(100),
stream.recv(),
)
.await
.unwrap()
.unwrap()
.event;
assert!(matches!(
event,
EventType::ExternalMessageReceived {
sender,
data,
} if sender == handles[0].public_key() && data == vec![0,0,0]
));
}
// No event on 0 even after short sleep
async_compatibility_layer::art::async_sleep(Duration::from_millis(2)).await;
assert!(event_streams[0].is_empty());
}

#[cfg(test)]
#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
Expand Down Expand Up @@ -161,6 +265,7 @@ async fn test_network_storage_fail() {
out_tx_external.clone(),
upgrade_lock,
network.clone(),
public_key,
)
.await;

Expand Down
Loading

0 comments on commit 86df546

Please sign in to comment.