Skip to content

Commit

Permalink
Received direct and broadcast messages are handled in the same way (#…
Browse files Browse the repository at this point in the history
…2641)

* Poll direct messages and broadcast messages in parallel

* Merge broadcast and direct message channels

* Merge broadcast and direct message queues in web impl

* Merge identical match arms
  • Loading branch information
lukaszrzasik authored Feb 27, 2024
1 parent 5e6730a commit b74d5ef
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 304 deletions.
32 changes: 5 additions & 27 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use hotshot_types::{
use hotshot_types::{
message::Messages,
traits::{
network::{ConsensusIntentEvent, TransmitType},
network::ConsensusIntentEvent,
node_implementation::{ConsensusTime, NodeImplementation, NodeType},
},
};
Expand Down Expand Up @@ -63,12 +63,12 @@ pub async fn add_network_message_task<
// https://github.com/EspressoSystems/HotShot/issues/2377
let network = net.clone();
let mut state = network_state.clone();
let broadcast_handle = async_spawn(async move {
let handle = async_spawn(async move {
loop {
let msgs = match network.recv_msgs(TransmitType::Broadcast).await {
let msgs = match network.recv_msgs().await {
Ok(msgs) => Messages(msgs),
Err(err) => {
error!("failed to receive broadcast messages: {err}");
error!("failed to receive messages: {err}");

// return zero messages so we sleep and try again
Messages(vec![])
Expand All @@ -82,29 +82,7 @@ pub async fn add_network_message_task<
}
}
});
let network = net.clone();
let mut state = network_state.clone();
let direct_handle = async_spawn(async move {
loop {
let msgs = match network.recv_msgs(TransmitType::Direct).await {
Ok(msgs) => Messages(msgs),
Err(err) => {
error!("failed to receive direct messages: {err}");

// return zero messages so we sleep and try again
Messages(vec![])
}
};
if msgs.0.is_empty() {
// TODO: Stop sleeping here: https://github.com/EspressoSystems/HotShot/issues/2558
async_sleep(Duration::from_millis(100)).await;
} else {
state.handle_messages(msgs.0).await;
}
}
});
task_reg.register(direct_handle).await;
task_reg.register(broadcast_handle).await;
task_reg.register(handle).await;
}
/// Add the network task to handle events and send messages.
pub async fn add_network_event_task<
Expand Down
12 changes: 4 additions & 8 deletions crates/hotshot/src/traits/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ pub struct NetworkingMetricsValue {
#[allow(dead_code)]
/// A [`Gauge`] which tracks how many peers are connected
pub connected_peers: Box<dyn Gauge>,
/// A [`Counter`] which tracks how many messages have been received directly
pub incoming_direct_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been received by broadcast
pub incoming_broadcast_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been received
pub incoming_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been send directly
pub outgoing_direct_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been send by broadcast
Expand Down Expand Up @@ -163,10 +161,8 @@ impl NetworkingMetricsValue {
pub fn new(metrics: &dyn Metrics) -> Self {
Self {
connected_peers: metrics.create_gauge(String::from("connected_peers"), None),
incoming_direct_message_count: metrics
.create_counter(String::from("incoming_direct_message_count"), None),
incoming_broadcast_message_count: metrics
.create_counter(String::from("incoming_broadcast_message_count"), None),
incoming_message_count: metrics
.create_counter(String::from("incoming_message_count"), None),
outgoing_direct_message_count: metrics
.create_counter(String::from("outgoing_direct_message_count"), None),
outgoing_broadcast_message_count: metrics
Expand Down
7 changes: 3 additions & 4 deletions crates/hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use hotshot_types::{
data::ViewNumber,
message::Message,
traits::{
network::{ConnectedNetwork, ConsensusIntentEvent, TransmitType},
network::{ConnectedNetwork, ConsensusIntentEvent},
node_implementation::NodeType,
},
BoxSyncFuture,
Expand Down Expand Up @@ -319,7 +319,6 @@ impl<TYPES: NodeType> ConnectedNetwork<Message<TYPES>, TYPES::SignatureKey>

fn recv_msgs<'a, 'b>(
&'a self,
transmit_type: TransmitType,
) -> BoxSyncFuture<'b, Result<Vec<Message<TYPES>>, NetworkError>>
where
'a: 'b,
Expand All @@ -328,8 +327,8 @@ impl<TYPES: NodeType> ConnectedNetwork<Message<TYPES>, TYPES::SignatureKey>
// recv on both networks because nodes may be accessible only on either. discard duplicates
// TODO: improve this algorithm: https://github.com/EspressoSystems/HotShot/issues/2089
let closure = async move {
let mut primary_msgs = self.primary().recv_msgs(transmit_type).await?;
let mut secondary_msgs = self.secondary().recv_msgs(transmit_type).await?;
let mut primary_msgs = self.primary().recv_msgs().await?;
let mut secondary_msgs = self.secondary().recv_msgs().await?;

primary_msgs.append(secondary_msgs.as_mut());

Expand Down
83 changes: 25 additions & 58 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use hotshot_types::{
traits::{
network::{
ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, NetworkError,
NetworkMsg, TransmitType, ViewMessage,
NetworkMsg, ViewMessage,
},
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
Expand Down Expand Up @@ -97,13 +97,9 @@ struct Libp2pNetworkInner<M: NetworkMsg, K: SignatureKey + 'static> {
/// handle to control the network
handle: Arc<NetworkNodeHandle<()>>,
/// map of known replica peer ids to public keys
broadcast_recv: UnboundedReceiver<M>,
receiver: UnboundedReceiver<M>,
/// Sender for broadcast messages
broadcast_send: UnboundedSender<M>,
/// Sender for direct messages (only used for sending messages back to oneself)
direct_send: UnboundedSender<M>,
/// Receiver for direct messages
direct_recv: UnboundedReceiver<M>,
sender: UnboundedSender<M>,
/// Sender for node lookup (relevant view number, key of node) (None for shutdown)
node_lookup_send: UnboundedSender<Option<(ViewNumber, K)>>,
/// this is really cheating to enable local tests
Expand Down Expand Up @@ -351,18 +347,15 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {

// unbounded channels may not be the best choice (spammed?)
// if bounded figure out a way to log dropped msgs
let (direct_send, direct_recv) = unbounded();
let (broadcast_send, broadcast_recv) = unbounded();
let (sender, receiver) = unbounded();
let (node_lookup_send, node_lookup_recv) = unbounded();

let mut result = Libp2pNetwork {
inner: Arc::new(Libp2pNetworkInner {
handle: network_handle,
broadcast_recv,
direct_send: direct_send.clone(),
direct_recv,
receiver,
sender: sender.clone(),
pk,
broadcast_send: broadcast_send.clone(),
bootstrap_addrs_len,
bootstrap_addrs,
is_ready: Arc::new(AtomicBool::new(false)),
Expand All @@ -383,7 +376,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
}),
};

result.handle_event_generator(direct_send, broadcast_send);
result.handle_event_generator(sender);
result.spawn_node_lookup(node_lookup_recv);
result.spawn_connect(id);

Expand Down Expand Up @@ -514,14 +507,13 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
async fn handle_recvd_events_0_1(
&self,
msg: NetworkEvent,
direct_send: &UnboundedSender<M>,
broadcast_send: &UnboundedSender<M>,
sender: &UnboundedSender<M>,
) -> Result<(), NetworkError> {
match msg {
GossipMsg(msg, _) => {
let result: Result<M, _> = bincode_opts().deserialize(&msg);
if let Ok(result) = result {
broadcast_send
sender
.send(result)
.await
.map_err(|_| NetworkError::ChannelSend)?;
Expand All @@ -532,7 +524,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
.deserialize(&msg)
.context(FailedToSerializeSnafu);
if let Ok(result) = result {
direct_send
sender
.send(result)
.await
.map_err(|_| NetworkError::ChannelSend)?;
Expand Down Expand Up @@ -568,8 +560,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
/// terminates on shut down of network
fn handle_event_generator(
&self,
direct_send: UnboundedSender<M>,
broadcast_send: UnboundedSender<M>,
sender: UnboundedSender<M>,
) {
let handle = self.clone();
let is_bootstrapped = self.inner.is_bootstrapped.clone();
Expand All @@ -584,7 +575,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
match message_version {
Some(VERSION_0_1) => {
let _ = handle
.handle_recvd_events_0_1(message, &direct_send, &broadcast_send)
.handle_recvd_events_0_1(message, &sender)
.await;
}
Some(version) => {
Expand Down Expand Up @@ -675,7 +666,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Libp2p
if recipients.contains(&self.inner.pk) {
// send to self
self.inner
.broadcast_send
.sender
.send(message.clone())
.await
.map_err(|_| NetworkError::ShutDown)?;
Expand Down Expand Up @@ -763,7 +754,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Libp2p
if recipient == self.inner.pk {
// panic if we already shut down?
self.inner
.direct_send
.sender
.send(message)
.await
.map_err(|_x| NetworkError::ShutDown)?;
Expand Down Expand Up @@ -830,7 +821,6 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Libp2p
#[instrument(name = "Libp2pNetwork::recv_msgs", skip_all)]
fn recv_msgs<'a, 'b>(
&'a self,
transmit_type: TransmitType,
) -> BoxSyncFuture<'b, Result<Vec<M>, NetworkError>>
where
'a: 'b,
Expand All @@ -840,40 +830,17 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Libp2p
if self.inner.handle.is_killed() {
Err(NetworkError::ShutDown)
} else {
match transmit_type {
TransmitType::Direct => {
let result = self
.inner
.direct_recv
.drain_at_least_one()
.await
.map_err(|_x| NetworkError::ShutDown)?;
self.inner
.metrics
.incoming_direct_message_count
.add(result.len());
Ok(result)
}
TransmitType::Broadcast => {
let result = self
.inner
.broadcast_recv
.drain_at_least_one()
.await
.map_err(|_x| NetworkError::ShutDown)?;
self.inner
.metrics
.incoming_direct_message_count
.add(result.len());
Ok(result)
}
TransmitType::DACommitteeBroadcast => {
error!("Received DACommitteeBroadcast, it should have not happened.");
Err(NetworkError::Libp2p {
source: NetworkNodeHandleError::Killed,
})
}
}
let result = self
.inner
.receiver
.drain_at_least_one()
.await
.map_err(|_x| NetworkError::ShutDown)?;
self.inner
.metrics
.incoming_message_count
.add(result.len());
Ok(result)
}
};
boxed_sync(closure)
Expand Down
Loading

0 comments on commit b74d5ef

Please sign in to comment.