Skip to content

Commit

Permalink
[TECH DEBT] Improve ConnectedNetwork - recv_message (#3671)
Browse files Browse the repository at this point in the history
* improve `ConnectedNetwork`

* remove accidental `run_config` change

* fix `memory network` test
  • Loading branch information
rob-maron authored Sep 12, 2024
1 parent 7879bf0 commit 2bd0696
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 230 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ blake3.workspace = true
sha2 = { workspace = true }
url = { workspace = true }
num_enum = "0.7"
parking_lot = "0.12"

[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies]
tokio = { workspace = true }
Expand Down
70 changes: 27 additions & 43 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
/// Provides trait to create task states from a `SystemContextHandle`
pub mod task_state;
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{fmt::Debug, sync::Arc};

use async_broadcast::broadcast;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLock;
use async_trait::async_trait;
use futures::{
Expand All @@ -35,7 +35,7 @@ use hotshot_task_impls::{
use hotshot_types::{
consensus::Consensus,
constants::EVENT_CHANNEL_SIZE,
message::{Messages, UpgradeLock},
message::{Message, UpgradeLock},
request_response::RequestReceiver,
traits::{
network::ConnectedNetwork,
Expand Down Expand Up @@ -119,53 +119,37 @@ pub fn add_network_message_task<
let task_handle = async_spawn(async move {
futures::pin_mut!(shutdown_signal);

let recv_stream = stream::unfold((), |()| async {
let msgs = match network.recv_msgs().await {
Ok(msgs) => {
let mut deserialized_messages = Vec::new();
for msg in msgs {
let deserialized_message = match upgrade_lock.deserialize(&msg).await {
Ok(deserialized) => deserialized,
Err(e) => {
tracing::error!("Failed to deserialize message: {}", e);
continue;
}
};
deserialized_messages.push(deserialized_message);
}
Messages(deserialized_messages)
}
Err(err) => {
tracing::error!("failed to receive messages: {err}");
Messages(vec![])
}
};
Some((msgs, ()))
});

let fused_recv_stream = recv_stream.boxed().fuse();
futures::pin_mut!(fused_recv_stream);

loop {
// Wait for one of the following to resolve:
futures::select! {
// Wait for a shutdown signal
() = shutdown_signal => {
tracing::error!("Shutting down network message task");
return;
}
msgs_option = fused_recv_stream.next() => {
if let Some(msgs) = msgs_option {
if msgs.0.is_empty() {
// TODO: Stop sleeping here: https://github.com/EspressoSystems/HotShot/issues/2558
async_sleep(Duration::from_millis(100)).await;
} else {
state.handle_messages(msgs.0).await;

// Wait for a message from the network
message = network.recv_message().fuse() => {
// Make sure the message did not fail
let message = match message {
Ok(message) => message,
Err(e) => {
tracing::error!("Failed to receive message: {:?}", e);
continue;
}
} else {
// Stream has ended, which shouldn't happen in this case.
// You might want to handle this situation, perhaps by breaking the loop or logging an error.
tracing::error!("Network message stream unexpectedly ended");
return;
}
};

// Deserialize the message
let deserialized_message: Message<TYPES> = match upgrade_lock.deserialize(&message).await {
Ok(message) => message,
Err(e) => {
tracing::error!("Failed to deserialize message: {:?}", e);
continue;
}
};

// Handle the message
state.handle_message(deserialized_message).await;
}
}
}
Expand Down
48 changes: 22 additions & 26 deletions crates/hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::{
time::Duration,
};

use parking_lot::RwLock as PlRwLock;

use async_broadcast::{broadcast, InactiveReceiver, Sender};
use async_compatibility_layer::{
art::{async_sleep, async_spawn},
Expand Down Expand Up @@ -68,7 +70,7 @@ pub struct CombinedNetworks<TYPES: NodeType> {
networks: Arc<UnderlyingCombinedNetworks<TYPES>>,

/// Last n seen messages to prevent processing duplicates
message_cache: Arc<RwLock<LruCache<u64, ()>>>,
message_cache: Arc<PlRwLock<LruCache<u64, ()>>>,

/// How many times primary failed to deliver
primary_fail_counter: Arc<AtomicU64>,
Expand Down Expand Up @@ -106,7 +108,7 @@ impl<TYPES: NodeType> CombinedNetworks<TYPES> {

Self {
networks,
message_cache: Arc::new(RwLock::new(LruCache::new(
message_cache: Arc::new(PlRwLock::new(LruCache::new(
NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
))),
primary_fail_counter: Arc::new(AtomicU64::new(0)),
Expand Down Expand Up @@ -304,7 +306,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
);

// We want to use the same message cache between the two networks
let message_cache = Arc::new(RwLock::new(LruCache::new(
let message_cache = Arc::new(PlRwLock::new(LruCache::new(
NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
)));

Expand Down Expand Up @@ -466,35 +468,29 @@ impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks
///
/// # Errors
/// If receiving from either the primary or the secondary network fails
async fn recv_msgs(&self) -> Result<Vec<Vec<u8>>, NetworkError> {
// recv on both networks because nodes may be accessible only on either. discard duplicates
// TODO: improve this algorithm: https://github.com/EspressoSystems/HotShot/issues/2089
let mut primary_fut = self.primary().recv_msgs().fuse();
let mut secondary_fut = self.secondary().recv_msgs().fuse();

let msgs = select! {
p = primary_fut => p?,
s = secondary_fut => s?,
};
async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
loop {
// Receive from both networks
let mut primary_fut = self.primary().recv_message().fuse();
let mut secondary_fut = self.secondary().recv_message().fuse();

// Wait for one to return a message
let message = select! {
p = primary_fut => p?,
s = secondary_fut => s?,
};

let mut filtered_msgs = Vec::with_capacity(msgs.len());

// For each message,
for msg in msgs {
// Calculate hash of the message
let message_hash = calculate_hash_of(&msg);
let message_hash = calculate_hash_of(&message);

// Add the hash to the cache
if !self.message_cache.read().await.contains(&message_hash) {
// If the message is not in the cache, process it
filtered_msgs.push(msg.clone());
// Check if the hash is in the cache
if !self.message_cache.read().contains(&message_hash) {
// Add the hash to the cache
self.message_cache.write().put(message_hash, ());

// Add it to the cache
self.message_cache.write().await.put(message_hash, ());
break Ok(message);
}
}

Ok(filtered_msgs)
}

fn queue_node_lookup(
Expand Down
6 changes: 3 additions & 3 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,12 +1055,12 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Libp2pNetwork<K> {
///
/// # Errors
/// If there is a network-related failure.
#[instrument(name = "Libp2pNetwork::recv_msgs", skip_all)]
async fn recv_msgs(&self) -> Result<Vec<Vec<u8>>, NetworkError> {
#[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
let result = self
.inner
.receiver
.drain_at_least_one()
.recv()
.await
.map_err(|_x| NetworkError::ShutDown)?;

Expand Down
8 changes: 4 additions & 4 deletions crates/hotshot/src/traits/networking/memory_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,19 +371,19 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for MemoryNetwork<K> {
///
/// # Errors
/// If the other side of the channel is closed
#[instrument(name = "MemoryNetwork::recv_msgs", skip_all)]
async fn recv_msgs(&self) -> Result<Vec<Vec<u8>>, NetworkError> {
#[instrument(name = "MemoryNetwork::recv_messages", skip_all)]
async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
let ret = self
.inner
.output
.lock()
.await
.drain_at_least_one()
.recv()
.await
.map_err(|_x| NetworkError::ShutDown)?;
self.inner
.in_flight_message_count
.fetch_sub(ret.len(), Ordering::Relaxed);
.fetch_sub(1, Ordering::Relaxed);
Ok(ret)
}
}
4 changes: 2 additions & 2 deletions crates/hotshot/src/traits/networking/push_cdn_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
///
/// # Errors
/// - If we fail to receive messages. Will trigger a retry automatically.
async fn recv_msgs(&self) -> Result<Vec<Vec<u8>>, NetworkError> {
async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
// Receive a message
let message = self.client.receive_message().await;

Expand Down Expand Up @@ -577,7 +577,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
return Ok(vec![]);
};

Ok(vec![message])
Ok(message)
}

/// Do nothing here, as we don't need to look up nodes.
Expand Down
Loading

0 comments on commit 2bd0696

Please sign in to comment.