diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 7380b67db0..6d3e279054 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -60,15 +60,15 @@ jobs: with: name: binaries-amd64-${{ matrix.just_variants }} path: | - target/debug/examples/counter - target/debug/examples/multi-validator-libp2p - target/debug/examples/orchestrator-libp2p - target/debug/examples/validator-libp2p - target/debug/examples/multi-validator-webserver - target/debug/examples/multi-webserver - target/debug/examples/webserver - target/debug/examples/orchestrator-webserver - target/debug/examples/validator-webserver + target/release/examples/counter + target/release/examples/multi-validator-libp2p + target/release/examples/orchestrator-libp2p + target/release/examples/validator-libp2p + target/release/examples/multi-validator-webserver + target/release/examples/multi-webserver + target/release/examples/webserver + target/release/examples/orchestrator-webserver + target/release/examples/validator-webserver build-arm: strategy: @@ -98,15 +98,15 @@ jobs: with: name: binaries-aarch64-${{ matrix.just_variants }} path: | - target/debug/examples/counter - target/debug/examples/multi-validator-libp2p - target/debug/examples/orchestrator-libp2p - target/debug/examples/validator-libp2p - target/debug/examples/multi-validator-webserver - target/debug/examples/multi-webserver - target/debug/examples/webserver - target/debug/examples/orchestrator-webserver - target/debug/examples/validator-webserver + target/release/examples/counter + target/release/examples/multi-validator-libp2p + target/release/examples/orchestrator-libp2p + target/release/examples/validator-libp2p + target/release/examples/multi-validator-webserver + target/release/examples/multi-webserver + target/release/examples/webserver + target/release/examples/orchestrator-webserver + target/release/examples/validator-webserver build-dockers: strategy: diff --git a/crates/hotshot/examples/infra/mod.rs b/crates/hotshot/examples/infra/mod.rs index 287769a0a9..c709eaff8b 100644 --- a/crates/hotshot/examples/infra/mod.rs +++ b/crates/hotshot/examples/infra/mod.rs @@ -239,22 +239,22 @@ async fn libp2p_network_from_config( config_builder.to_connect_addrs(to_connect_addrs); let mesh_params = -// NOTE I'm arbitrarily choosing these. -match node_type { - NetworkNodeType::Bootstrap => MeshParams { - mesh_n_high: libp2p_config.bootstrap_mesh_n_high, - mesh_n_low: libp2p_config.bootstrap_mesh_n_low, - mesh_outbound_min: libp2p_config.bootstrap_mesh_outbound_min, - mesh_n: libp2p_config.bootstrap_mesh_n, - }, - NetworkNodeType::Regular => MeshParams { - mesh_n_high: libp2p_config.mesh_n_high, - mesh_n_low: libp2p_config.mesh_n_low, - mesh_outbound_min: libp2p_config.mesh_outbound_min, - mesh_n: libp2p_config.mesh_n, - }, - NetworkNodeType::Conductor => unreachable!(), -}; + // NOTE I'm arbitrarily choosing these. + match node_type { + NetworkNodeType::Bootstrap => MeshParams { + mesh_n_high: libp2p_config.bootstrap_mesh_n_high, + mesh_n_low: libp2p_config.bootstrap_mesh_n_low, + mesh_outbound_min: libp2p_config.bootstrap_mesh_outbound_min, + mesh_n: libp2p_config.bootstrap_mesh_n, + }, + NetworkNodeType::Regular => MeshParams { + mesh_n_high: libp2p_config.mesh_n_high, + mesh_n_low: libp2p_config.mesh_n_low, + mesh_outbound_min: libp2p_config.mesh_outbound_min, + mesh_n: libp2p_config.mesh_n, + }, + NetworkNodeType::Conductor => unreachable!(), + }; config_builder.mesh_params(Some(mesh_params)); let mut all_keys = BTreeSet::new(); @@ -285,6 +285,7 @@ match node_type { // NOTE: this introduces an invariant that the keys are assigned using this indexed // function all_keys, + None, da_keys.clone(), da_keys.contains(&pub_key), ) diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index 69a10e582a..edbd21e51b 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -38,6 +38,9 @@ use hotshot_task::{ }; use hotshot_task_impls::{events::HotShotEvent, network::NetworkTaskKind}; +#[cfg(feature = "hotshot-testing")] +use hotshot_types::traits::node_implementation::ChannelMaps; + use hotshot_types::{ consensus::{Consensus, ConsensusMetricsValue, PayloadStore, View, ViewInner, ViewQueue}, data::Leaf, @@ -69,9 +72,6 @@ use std::{ use tasks::add_vid_task; use tracing::{debug, error, info, instrument, trace, warn}; -#[cfg(feature = "hotshot-testing")] -use hotshot_types::traits::node_implementation::ChannelMaps; - // -- Rexports // External /// Reexport rand crate diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index d5a0c0ae73..81471dfe1f 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -20,6 +20,8 @@ use futures::join; use async_compatibility_layer::channel::UnboundedSendError; use hotshot_task::{boxed_sync, BoxSyncFuture}; +#[cfg(feature = "hotshot-testing")] +use hotshot_types::traits::network::{NetworkReliability, TestableNetworkingImplementation}; use hotshot_types::{ data::ViewNumber, message::Message, @@ -27,8 +29,7 @@ use hotshot_types::{ election::Membership, network::{ CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, - TestableChannelImplementation, TestableNetworkingImplementation, TransmitType, - ViewMessage, + TestableChannelImplementation, TransmitType, ViewMessage, }, node_implementation::NodeType, }, @@ -144,6 +145,7 @@ pub struct CombinedNetworks( pub Libp2pNetwork, TYPES::SignatureKey>, ); +#[cfg(feature = "hotshot-testing")] impl TestableNetworkingImplementation for CombinedNetworks { fn generator( expected_node_count: usize, @@ -151,6 +153,7 @@ impl TestableNetworkingImplementation for CombinedNetwor network_id: usize, da_committee_size: usize, is_da: bool, + reliability_config: Option>, ) -> Box Self + 'static> { let generators = ( TestableNetworkingImplementation for CombinedNetwor num_bootstrap, network_id, da_committee_size, - is_da + is_da, + None, ), , TYPES::SignatureKey> as TestableNetworkingImplementation<_>>::generator( expected_node_count, num_bootstrap, network_id, da_committee_size, - is_da + is_da, + reliability_config, ) ); Box::new(move |node_id| CombinedNetworks(generators.0(node_id), generators.1(node_id))) @@ -181,6 +186,7 @@ impl TestableNetworkingImplementation for CombinedNetwor } } +#[cfg(feature = "hotshot-testing")] impl TestableNetworkingImplementation for CombinedCommChannel { fn generator( expected_node_count: usize, @@ -188,6 +194,7 @@ impl TestableNetworkingImplementation for CombinedCommCh network_id: usize, da_committee_size: usize, is_da: bool, + reliability_config: Option>, ) -> Box Self + 'static> { let generator = as TestableNetworkingImplementation<_>>::generator( expected_node_count, @@ -195,6 +202,7 @@ impl TestableNetworkingImplementation for CombinedCommCh network_id, da_committee_size, is_da, + reliability_config, ); Box::new(move |node_id| Self { networks: generator(node_id).into(), diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index ea5dd2aa2d..8b633139af 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -2,8 +2,10 @@ //! This module provides a libp2p based networking implementation where each node in the //! network forms a tcp or udp connection to a subset of other nodes in the network use super::NetworkingMetricsValue; +#[cfg(feature = "hotshot-testing")] +use async_compatibility_layer::art::async_block_on; use async_compatibility_layer::{ - art::{async_block_on, async_sleep, async_spawn}, + art::{async_sleep, async_spawn}, channel::{unbounded, UnboundedReceiver, UnboundedSendError, UnboundedSender}, }; use async_lock::RwLock; @@ -12,6 +14,8 @@ use bimap::BiHashMap; use bincode::Options; use hotshot_constants::LOOK_AHEAD; use hotshot_task::{boxed_sync, BoxSyncFuture}; +#[cfg(feature = "hotshot-testing")] +use hotshot_types::traits::network::{NetworkReliability, TestableNetworkingImplementation}; use hotshot_types::{ data::ViewNumber, message::{Message, MessageKind}, @@ -19,34 +23,36 @@ use hotshot_types::{ election::Membership, network::{ CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, - NetworkError, NetworkMsg, TestableChannelImplementation, - TestableNetworkingImplementation, TransmitType, ViewMessage, + NetworkError, NetworkMsg, TestableChannelImplementation, TransmitType, ViewMessage, }, node_implementation::NodeType, signature_key::SignatureKey, state::ConsensusTime, }, }; + use hotshot_utils::bincode::bincode_opts; use libp2p_identity::PeerId; +#[cfg(feature = "hotshot-testing")] +use libp2p_networking::network::{MeshParams, NetworkNodeConfigBuilder}; + use libp2p_networking::{ network::{ - MeshParams, NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg}, - NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeHandleError, - NetworkNodeType, + NetworkNodeConfig, NetworkNodeHandle, NetworkNodeHandleError, NetworkNodeType, }, reexport::Multiaddr, }; use serde::Serialize; use snafu::ResultExt; +#[cfg(feature = "hotshot-testing")] +use std::{collections::HashSet, num::NonZeroUsize, str::FromStr}; + use std::{ - collections::{BTreeSet, HashSet}, + collections::BTreeSet, fmt::Debug, marker::PhantomData, - num::NonZeroUsize, - str::FromStr, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -116,6 +122,9 @@ struct Libp2pNetworkInner { /// NOTE: supposed to represent a ViewNumber but we /// haven't made that atomic yet and we prefer lock-free latest_seen_view: Arc, + #[cfg(feature = "hotshot-testing")] + /// reliability_config + reliability_config: Option>, /// if we're a member of the DA committee or not is_da: bool, } @@ -128,6 +137,7 @@ pub struct Libp2pNetwork { inner: Arc>, } +#[cfg(feature = "hotshot-testing")] impl TestableNetworkingImplementation for Libp2pNetwork, TYPES::SignatureKey> where @@ -149,6 +159,7 @@ where _network_id: usize, da_committee_size: usize, _is_da: bool, + reliability_config: Option>, ) -> Box Self + 'static> { assert!( da_committee_size <= expected_node_count, @@ -234,6 +245,7 @@ where let bootstrap_addrs_ref = bootstrap_addrs.clone(); let keys = all_keys.clone(); let da = da_keys.clone(); + let reliability_config_dup = reliability_config.clone(); async_block_on(async move { match Libp2pNetwork::new( NetworkingMetricsValue::default(), @@ -243,6 +255,8 @@ where num_bootstrap, usize::try_from(node_id).unwrap(), keys, + #[cfg(feature = "hotshot-testing")] + reliability_config_dup, da.clone(), da.contains(&pubkey), ) @@ -297,6 +311,7 @@ impl Libp2pNetwork { id: usize, // HACK committee_pks: BTreeSet, + #[cfg(feature = "hotshot-testing")] reliability_config: Option>, da_pks: BTreeSet, is_da: bool, ) -> Result, NetworkError> { @@ -356,6 +371,8 @@ impl Libp2pNetwork { // proposals on". We need this because to have consensus info injected we need a working // network already. In the worst case, we send a few lookups we don't need. latest_seen_view: Arc::new(AtomicU64::new(0)), + #[cfg(feature = "hotshot-testing")] + reliability_config, is_da, }), }; @@ -623,6 +640,40 @@ impl ConnectedNetwork for Libp2p .map_err(|_| NetworkError::ShutDown)?; } + // NOTE: metrics is threadsafe, so clone is fine (and lightweight) + #[cfg(feature = "hotshot-testing")] + { + let metrics = self.inner.metrics.clone(); + if let Some(ref config) = &self.inner.reliability_config { + let handle = self.inner.handle.clone(); + + let serialized_msg = bincode_opts() + .serialize(&message) + .context(FailedToSerializeSnafu)?; + let fut = config.clone().chaos_send_msg( + serialized_msg, + Arc::new(move |msg: Vec| { + let topic_2 = topic.clone(); + let handle_2 = handle.clone(); + let metrics_2 = metrics.clone(); + boxed_sync(async move { + match handle_2.gossip_no_serialize(topic_2, msg).await { + Err(e) => { + metrics_2.message_failed_to_send.add(1); + warn!("Failed to broadcast to libp2p: {:?}", e); + } + Ok(()) => { + metrics_2.outgoing_direct_message_count.add(1); + } + } + }) + }), + ); + async_spawn(fut); + return Ok(()); + } + } + match self.inner.handle.gossip(topic, &message).await { Ok(()) => { self.inner.metrics.outgoing_broadcast_message_count.add(1); @@ -671,16 +722,42 @@ impl ConnectedNetwork for Libp2p } }; - match self.inner.handle.direct_request(pid, &message).await { - Ok(()) => { - self.inner.metrics.outgoing_direct_message_count.add(1); - Ok(()) - } - Err(e) => { - self.inner.metrics.message_failed_to_send.add(1); - Err(e.into()) + #[cfg(feature = "hotshot-testing")] + { + let metrics = self.inner.metrics.clone(); + if let Some(ref config) = &self.inner.reliability_config { + let handle = self.inner.handle.clone(); + + let serialized_msg = bincode_opts() + .serialize(&message) + .context(FailedToSerializeSnafu)?; + let fut = config.clone().chaos_send_msg( + serialized_msg, + Arc::new(move |msg: Vec| { + let handle_2 = handle.clone(); + let metrics_2 = metrics.clone(); + boxed_sync(async move { + match handle_2.direct_request_no_serialize(pid, msg).await { + Err(e) => { + metrics_2.message_failed_to_send.add(1); + warn!("Failed to broadcast to libp2p: {:?}", e); + } + Ok(()) => { + metrics_2.outgoing_direct_message_count.add(1); + } + } + }) + }), + ); + async_spawn(fut); + return Ok(()); } } + + match self.inner.handle.direct_request(pid, &message).await { + Ok(()) => Ok(()), + Err(e) => Err(e.into()), + } } #[instrument(name = "Libp2pNetwork::recv_msgs", skip_all)] @@ -778,6 +855,7 @@ impl Libp2pCommChannel { } } +#[cfg(feature = "hotshot-testing")] impl TestableNetworkingImplementation for Libp2pCommChannel where MessageKind: ViewMessage, @@ -797,6 +875,7 @@ where network_id: usize, da_committee_size: usize, is_da: bool, + reliability_config: Option>, ) -> Box Self + 'static> { let generator = , @@ -806,7 +885,8 @@ where num_bootstrap, network_id, da_committee_size, - is_da + is_da, + reliability_config ); Box::new(move |node_id| Self(generator(node_id).into(), PhantomData)) } diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 48ebfe1719..3e28a5871e 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -92,7 +92,7 @@ struct MemoryNetworkInner { metrics: NetworkingMetricsValue, /// config to introduce unreliability to the network - reliability_config: Option>>, + reliability_config: Option>, } /// In memory only network simulator. @@ -123,7 +123,7 @@ impl MemoryNetwork { pub_key: K, metrics: NetworkingMetricsValue, master_map: Arc>, - reliability_config: Option>>, + reliability_config: Option>, ) -> MemoryNetwork { info!("Attaching new MemoryNetwork"); let (broadcast_input, broadcast_task_recv) = bounded(128); @@ -249,6 +249,7 @@ impl TestableNetworkingImplementation _network_id: usize, _da_committee_size: usize, _is_da: bool, + reliability_config: Option>, ) -> Box Self + 'static> { let master: Arc<_> = MasterMap::new(); // We assign known_nodes' public key and stake value rather than read from config file since it's a test @@ -259,7 +260,7 @@ impl TestableNetworkingImplementation pubkey, NetworkingMetricsValue::default(), master.clone(), - None, + reliability_config.clone(), ) }) } @@ -312,8 +313,7 @@ impl ConnectedNetwork for Memory continue; } trace!(?key, "Sending message to node"); - if let Some(r) = &self.inner.reliability_config { - let config = r.read().await; + if let Some(ref config) = &self.inner.reliability_config { { let node2 = node.clone(); let fut = config.chaos_send_msg( @@ -356,15 +356,14 @@ impl ConnectedNetwork for Memory trace!("Message bincoded, finding recipient"); if let Some(node) = self.inner.master_map.map.get(&recipient) { let node = node.value().clone(); - if let Some(r) = &self.inner.reliability_config { - let config = r.read().await; + if let Some(ref config) = &self.inner.reliability_config { { let fut = config.chaos_send_msg( vec.clone(), Arc::new(move |msg: Vec| { let node2 = node.clone(); boxed_sync(async move { - let _res = node2.broadcast_input(msg).await; + let _res = node2.direct_input(msg).await; // NOTE we're dropping metrics here but this is only for testing // purposes. I think that should be okay }) @@ -475,6 +474,7 @@ where network_id: usize, da_committee_size: usize, is_da: bool, + reliability_config: Option>, ) -> Box Self + 'static> { let generator = , @@ -484,7 +484,8 @@ where num_bootstrap, network_id, da_committee_size, - is_da + is_da, + reliability_config, ); Box::new(move |node_id| Self(generator(node_id).into())) } diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 975db7666d..0ea946cf98 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -33,7 +33,7 @@ use std::hash::{Hash, Hasher}; use std::num::NonZeroUsize; use surf_disco::Url; -use hotshot_types::traits::network::ViewMessage; +use hotshot_types::traits::network::{NetworkReliability, ViewMessage}; use std::collections::BTreeMap; use std::{ collections::{btree_map::Entry, BTreeSet}, @@ -1259,6 +1259,7 @@ impl TestableNetworkingImplementation for WebServerNetwo _network_id: usize, _da_committee_size: usize, is_da: bool, + _reliability_config: Option>, ) -> Box Self + 'static> { let (server_shutdown_sender, server_shutdown) = oneshot(); let sender = Arc::new(server_shutdown_sender); @@ -1317,6 +1318,7 @@ impl TestableNetworkingImplementation for WebCommChannel network_id: usize, da_committee_size: usize, is_da: bool, + _reliability_config: Option>, ) -> Box Self + 'static> { let generator = as TestableNetworkingImplementation<_>>::generator( expected_node_count, @@ -1324,6 +1326,9 @@ impl TestableNetworkingImplementation for WebCommChannel network_id, da_committee_size, is_da, + // network reliability is a testing feature + // not yet implemented for webcommchannel + None, ); Box::new(move |node_id| Self(generator(node_id).into())) } diff --git a/crates/hotshot/src/types/handle.rs b/crates/hotshot/src/types/handle.rs index 52f3946ecb..6f7d919eeb 100644 --- a/crates/hotshot/src/types/handle.rs +++ b/crates/hotshot/src/types/handle.rs @@ -13,8 +13,13 @@ use hotshot_task::{ BoxSyncFuture, }; use hotshot_task_impls::events::HotShotEvent; +#[cfg(feature = "hotshot-testing")] +use hotshot_types::{ + message::{MessageKind, SequencingMessage}, + traits::election::Membership, +}; + use hotshot_types::simple_vote::QuorumData; -use hotshot_types::traits::election::Membership; use hotshot_types::{ consensus::Consensus, data::Leaf, @@ -26,9 +31,6 @@ use hotshot_types::{ use std::sync::Arc; use tracing::error; -#[cfg(feature = "hotshot-testing")] -use hotshot_types::message::{MessageKind, SequencingMessage}; - /// Event streaming handle for a [`SystemContext`] instance running in the background /// /// This type provides the means to message and interact with a background [`SystemContext`] instance, diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index 5dce442809..8a24c00d26 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -458,9 +458,21 @@ impl NetworkNodeHandle { msg: &impl Serialize, ) -> Result<(), NetworkNodeHandleError> { let serialized_msg = bincode_opts().serialize(msg).context(SerializationSnafu)?; + self.direct_request_no_serialize(pid, serialized_msg).await + } + + /// Make a direct request to `peer_id` containing `msg` without serializing + /// # Errors + /// - Will return [`NetworkNodeHandleError::SendError`] when underlying `NetworkNode` has been killed + /// - Will return [`NetworkNodeHandleError::SerializationError`] when unable to serialize `msg` + pub async fn direct_request_no_serialize( + &self, + pid: PeerId, + contents: Vec, + ) -> Result<(), NetworkNodeHandleError> { let req = ClientRequest::DirectRequest { pid, - contents: serialized_msg, + contents, retry_count: 1, }; self.send_request(req).await @@ -480,7 +492,7 @@ impl NetworkNodeHandle { self.send_request(req).await } - /// Forcefully disconnet from a peer + /// Forcefully disconnect from a peer /// # Errors /// If the channel is closed somehow /// Shouldnt' happen. @@ -502,7 +514,19 @@ impl NetworkNodeHandle { msg: &impl Serialize, ) -> Result<(), NetworkNodeHandleError> { let serialized_msg = bincode_opts().serialize(msg).context(SerializationSnafu)?; - let req = ClientRequest::GossipMsg(topic, serialized_msg); + self.gossip_no_serialize(topic, serialized_msg).await + } + + /// Gossip a message to peers without serializing + /// # Errors + /// - Will return [`NetworkNodeHandleError::SendError`] when underlying `NetworkNode` has been killed + /// - Will return [`NetworkNodeHandleError::SerializationError`] when unable to serialize `msg` + pub async fn gossip_no_serialize( + &self, + topic: String, + msg: Vec, + ) -> Result<(), NetworkNodeHandleError> { + let req = ClientRequest::GossipMsg(topic, msg); self.send_request(req).await } diff --git a/crates/testing/src/overall_safety_task.rs b/crates/testing/src/overall_safety_task.rs index 81e36b5c92..d6d56d5f63 100644 --- a/crates/testing/src/overall_safety_task.rs +++ b/crates/testing/src/overall_safety_task.rs @@ -22,6 +22,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, sync::Arc, }; +use tracing::error; use crate::{test_launcher::TaskGenerator, test_runner::Node}; /// convenience type alias for state and block @@ -266,6 +267,7 @@ impl RoundResult { let remaining_nodes = total_num_nodes - (num_decided + num_failed); if check_leaf && self.leaf_map.len() != 1 { + error!("LEAF MAP (that is mismatched) IS: {:?}", self.leaf_map); self.status = ViewStatus::Err(OverallSafetyTaskErr::MismatchedLeaf); return; } diff --git a/crates/testing/src/test_builder.rs b/crates/testing/src/test_builder.rs index ff0ff11413..6046253e26 100644 --- a/crates/testing/src/test_builder.rs +++ b/crates/testing/src/test_builder.rs @@ -1,4 +1,4 @@ -use hotshot::types::SignatureKey; +use hotshot::{traits::NetworkReliability, types::SignatureKey}; use hotshot_orchestrator::config::ValidatorConfigFile; use hotshot_types::traits::election::Membership; use std::{num::NonZeroUsize, sync::Arc, time::Duration}; @@ -59,6 +59,8 @@ pub struct TestMetadata { pub min_transactions: usize, /// timing data pub timing_data: TimingData, + /// unrelabile networking metadata + pub unreliable_network: Option>, /// view sync check task pub view_sync_properties: ViewSyncTaskDescription, } @@ -188,6 +190,7 @@ impl Default for TestMetadata { duration: Duration::from_millis(10000), }, ), + unreliable_network: None, view_sync_properties: ViewSyncTaskDescription::Threshold(0, num_nodes), } } @@ -216,6 +219,7 @@ impl TestMetadata { completion_task_description, overall_safety_properties, spinning_properties, + unreliable_network, view_sync_properties, .. } = self.clone(); @@ -292,6 +296,7 @@ impl TestMetadata { total_nodes, num_bootstrap_nodes, da_committee_size, + unreliable_network, ), storage: Box::new(|_| I::construct_tmp_storage().unwrap()), config, diff --git a/crates/testing/tests/unreliable_network.rs b/crates/testing/tests/unreliable_network.rs new file mode 100644 index 0000000000..c2e4a56249 --- /dev/null +++ b/crates/testing/tests/unreliable_network.rs @@ -0,0 +1,343 @@ +use hotshot_testing::test_builder::TimingData; +use hotshot_types::traits::network::AsynchronousNetwork; +use hotshot_types::traits::network::ChaosNetwork; +use hotshot_types::traits::network::PartiallySynchronousNetwork; +use hotshot_types::traits::network::SynchronousNetwork; +use std::time::Duration; +use std::time::Instant; + +use hotshot_testing::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + node_types::{Libp2pImpl, TestTypes}, + overall_safety_task::OverallSafetyPropertiesDescription, + test_builder::TestMetadata, +}; +use tracing::instrument; + +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[instrument] +async fn libp2p_network_sync() { + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + overall_safety_properties: OverallSafetyPropertiesDescription { + check_leaf: true, + ..Default::default() + }, + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::new(360, 0), + }, + ), + unreliable_network: Some(Box::new(SynchronousNetwork { + delay_high_ms: 30, + delay_low_ms: 4, + })), + ..TestMetadata::default_multiple_rounds() + }; + + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await +} + +#[cfg(test)] +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_memory_network_sync() { + use hotshot_testing::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + node_types::{MemoryImpl, TestTypes}, + test_builder::TestMetadata, + }; + use std::time::Duration; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + // allow more time to pass in CI + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(240), + }, + ), + unreliable_network: Some(Box::new(SynchronousNetwork { + delay_high_ms: 30, + delay_low_ms: 4, + })), + ..TestMetadata::default() + }; + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await; +} + +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[ignore] +#[instrument] +async fn libp2p_network_async() { + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + overall_safety_properties: OverallSafetyPropertiesDescription { + check_leaf: true, + num_failed_views: 50, + ..Default::default() + }, + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::new(240, 0), + }, + ), + timing_data: TimingData { + timeout_ratio: (1, 1), + next_view_timeout: 1000, + ..TestMetadata::default_multiple_rounds().timing_data + }, + unreliable_network: Some(Box::new(AsynchronousNetwork { + keep_numerator: 9, + keep_denominator: 10, + delay_low_ms: 4, + delay_high_ms: 30, + })), + ..TestMetadata::default_multiple_rounds() + }; + + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await +} + +#[cfg(test)] +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[ignore] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_memory_network_async() { + use hotshot_testing::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + node_types::{MemoryImpl, TestTypes}, + test_builder::TestMetadata, + }; + use std::time::Duration; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + overall_safety_properties: OverallSafetyPropertiesDescription { + check_leaf: true, + num_failed_views: 5000, + ..Default::default() + }, + // allow more time to pass in CI + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(240), + }, + ), + timing_data: TimingData { + timeout_ratio: (1, 1), + next_view_timeout: 1000, + ..TestMetadata::default_multiple_rounds().timing_data + }, + unreliable_network: Some(Box::new(AsynchronousNetwork { + keep_numerator: 95, + keep_denominator: 100, + delay_low_ms: 4, + delay_high_ms: 30, + })), + ..TestMetadata::default() + }; + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await; +} + +#[cfg(test)] +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_memory_network_partially_sync() { + use hotshot_testing::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + node_types::{MemoryImpl, TestTypes}, + test_builder::TestMetadata, + }; + use std::time::Duration; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + // allow more time to pass in CI + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(240), + }, + ), + unreliable_network: Some(Box::new(PartiallySynchronousNetwork { + asynchronous: AsynchronousNetwork { + keep_numerator: 8, + keep_denominator: 10, + delay_low_ms: 4, + delay_high_ms: 30, + }, + synchronous: SynchronousNetwork { + delay_high_ms: 30, + delay_low_ms: 4, + }, + gst: std::time::Duration::from_millis(1000), + start: Instant::now(), + })), + ..TestMetadata::default() + }; + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await; +} + +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[instrument] +async fn libp2p_network_partially_sync() { + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + overall_safety_properties: OverallSafetyPropertiesDescription { + check_leaf: true, + ..Default::default() + }, + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::new(240, 0), + }, + ), + unreliable_network: Some(Box::new(PartiallySynchronousNetwork { + asynchronous: AsynchronousNetwork { + keep_numerator: 8, + keep_denominator: 10, + delay_low_ms: 4, + delay_high_ms: 30, + }, + synchronous: SynchronousNetwork { + delay_high_ms: 30, + delay_low_ms: 4, + }, + gst: std::time::Duration::from_millis(1000), + start: Instant::now(), + })), + ..TestMetadata::default_multiple_rounds() + }; + + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await +} + +#[cfg(test)] +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[ignore] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_memory_network_chaos() { + use hotshot_testing::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + node_types::{MemoryImpl, TestTypes}, + test_builder::TestMetadata, + }; + use std::time::Duration; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + // allow more time to pass in CI + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(240), + }, + ), + unreliable_network: Some(Box::new(ChaosNetwork { + keep_numerator: 8, + keep_denominator: 10, + delay_low_ms: 4, + delay_high_ms: 30, + repeat_low: 1, + repeat_high: 5, + })), + ..TestMetadata::default() + }; + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await; +} + +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[ignore] +#[instrument] +async fn libp2p_network_chaos() { + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let metadata = TestMetadata { + overall_safety_properties: OverallSafetyPropertiesDescription { + check_leaf: true, + ..Default::default() + }, + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::new(240, 0), + }, + ), + unreliable_network: Some(Box::new(ChaosNetwork { + keep_numerator: 8, + keep_denominator: 10, + delay_low_ms: 4, + delay_high_ms: 30, + repeat_low: 1, + repeat_high: 5, + })), + ..TestMetadata::default_multiple_rounds() + }; + + metadata + .gen_launcher::(0) + .launch() + .run_test() + .await +} diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index df1bea4bbd..3ef3edc00f 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -5,6 +5,7 @@ use async_compatibility_layer::art::async_sleep; #[cfg(async_executor_impl = "async-std")] use async_std::future::TimeoutError; +use dyn_clone::DynClone; use hotshot_task::{boxed_sync, BoxSyncFuture}; use libp2p_networking::network::NetworkNodeHandleError; #[cfg(async_executor_impl = "tokio")] @@ -360,6 +361,7 @@ pub trait TestableNetworkingImplementation { network_id: usize, da_committee_size: usize, is_da: bool, + reliability_config: Option>, ) -> Box Self + 'static>; /// Get the number of messages in-flight. @@ -387,7 +389,7 @@ pub enum NetworkChange { /// interface describing how reliable the network is #[async_trait] -pub trait NetworkReliability: Debug + Sync + std::marker::Send { +pub trait NetworkReliability: Debug + Sync + std::marker::Send + DynClone + 'static { /// Sample from bernoulli distribution to decide whether /// or not to keep a packet /// # Panics @@ -420,6 +422,9 @@ pub trait NetworkReliability: Debug + Sync + std::marker::Send { /// whether or not to send duplicates /// and whether or not to include noise with the message /// then send the message + /// note: usually self is stored in a rwlock + /// so instead of doing the sending part, we just fiddle with the message + /// then return a future that does the sending and delaying fn chaos_send_msg( &self, msg: Vec, @@ -445,6 +450,9 @@ pub trait NetworkReliability: Debug + Sync + std::marker::Send { } } +// hack to get clone +dyn_clone::clone_trait_object!(NetworkReliability); + /// ideal network #[derive(Clone, Copy, Debug, Default)] pub struct PerfectNetwork {} @@ -455,10 +463,10 @@ impl NetworkReliability for PerfectNetwork {} /// to arrive within `timeout` ns #[derive(Clone, Copy, Debug, Default)] pub struct SynchronousNetwork { - /// Max delay of packet before arrival - timeout_ms: u64, + /// Max value in milliseconds that a packet may be delayed + pub delay_high_ms: u64, /// Lowest value in milliseconds that a packet may be delayed - delay_low_ms: u64, + pub delay_low_ms: u64, } impl NetworkReliability for SynchronousNetwork { @@ -468,7 +476,7 @@ impl NetworkReliability for SynchronousNetwork { } fn sample_delay(&self) -> Duration { Duration::from_millis( - Uniform::new_inclusive(self.delay_low_ms, self.timeout_ms) + Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms) .sample(&mut rand::thread_rng()), ) } @@ -482,13 +490,13 @@ impl NetworkReliability for SynchronousNetwork { #[derive(Debug, Clone, Copy)] pub struct AsynchronousNetwork { /// numerator for probability of keeping packets - keep_numerator: u32, + pub keep_numerator: u32, /// denominator for probability of keeping packets - keep_denominator: u32, + pub keep_denominator: u32, /// lowest value in milliseconds that a packet may be delayed - delay_low_ms: u64, + pub delay_low_ms: u64, /// highest value in milliseconds that a packet may be delayed - delay_high_ms: u64, + pub delay_high_ms: u64, } impl NetworkReliability for AsynchronousNetwork { @@ -512,13 +520,13 @@ impl NetworkReliability for AsynchronousNetwork { #[derive(Debug, Clone, Copy)] pub struct PartiallySynchronousNetwork { /// asynchronous portion of network - asynchronous: AsynchronousNetwork, + pub asynchronous: AsynchronousNetwork, /// synchronous portion of network - synchronous: SynchronousNetwork, + pub synchronous: SynchronousNetwork, /// time when GST occurs - gst: std::time::Duration, + pub gst: std::time::Duration, /// when the network was started - start: std::time::Instant, + pub start: std::time::Instant, } impl NetworkReliability for PartiallySynchronousNetwork { @@ -570,7 +578,7 @@ impl SynchronousNetwork { #[must_use] pub fn new(timeout: u64, delay_low_ms: u64) -> Self { SynchronousNetwork { - timeout_ms: timeout, + delay_high_ms: timeout, delay_low_ms, } } @@ -613,6 +621,37 @@ impl PartiallySynchronousNetwork { } /// A chaotic network using all the networking calls +#[derive(Debug, Clone)] pub struct ChaosNetwork { - // TODO + /// numerator for probability of keeping packets + pub keep_numerator: u32, + /// denominator for probability of keeping packets + pub keep_denominator: u32, + /// lowest value in milliseconds that a packet may be delayed + pub delay_low_ms: u64, + /// highest value in milliseconds that a packet may be delayed + pub delay_high_ms: u64, + /// lowest value of repeats for a message + pub repeat_low: usize, + /// highest value of repeats for a message + pub repeat_high: usize, +} + +impl NetworkReliability for ChaosNetwork { + fn sample_keep(&self) -> bool { + Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator) + .unwrap() + .sample(&mut rand::thread_rng()) + } + + fn sample_delay(&self) -> Duration { + Duration::from_millis( + Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms) + .sample(&mut rand::thread_rng()), + ) + } + + fn sample_repeat(&self) -> usize { + Uniform::new_inclusive(self.repeat_low, self.repeat_high).sample(&mut rand::thread_rng()) + } } diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index 7948d75bf1..82a7aa2afc 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -6,7 +6,7 @@ use super::{ block_contents::{BlockHeader, Transaction}, election::ElectionConfig, - network::{CommunicationChannel, TestableNetworkingImplementation}, + network::{CommunicationChannel, NetworkReliability, TestableNetworkingImplementation}, state::{ConsensusTime, TestableBlock, TestableState}, storage::{StorageError, StorageState, TestableStorage}, State, @@ -172,6 +172,7 @@ pub trait TestableNodeImplementation: NodeImplementation expected_node_count: usize, num_bootstrap: usize, da_committee_size: usize, + reliability_config: Option>, ) -> Box (Self::QuorumNetwork, Self::CommitteeNetwork)>; } @@ -230,6 +231,7 @@ where expected_node_count: usize, num_bootstrap: usize, da_committee_size: usize, + reliability_config: Option>, ) -> Box (Self::QuorumNetwork, Self::CommitteeNetwork)> { let network_generator = <>::NETWORK as TestableNetworkingImplementation>::generator( expected_node_count, @@ -237,6 +239,7 @@ where 0, da_committee_size, false, + reliability_config.clone(), ); let da_generator = <>::NETWORK as TestableNetworkingImplementation>::generator( expected_node_count, @@ -244,6 +247,7 @@ where 1, da_committee_size, true, + reliability_config ); Box::new(move |id| { diff --git a/justfile b/justfile index 26d18eb88b..b8f04cf576 100644 --- a/justfile +++ b/justfile @@ -27,14 +27,17 @@ async := "async_std" export RUST_MIN_STACK=4194304 RUSTDOCFLAGS='-D warnings --cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std" {{original_rustdocflags}}' RUSTFLAGS='--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std" {{original_rustflags}}' && just {{target}} {{ARGS}} build: - cargo build --workspace --examples --bins --tests --lib --benches + cargo build --workspace --examples --bins --tests --lib --benches --release + +build_release: + cargo build --package hotshot --no-default-features --features="docs, doc-images" example *ARGS: cargo run --profile=release-lto --example {{ARGS}} -test: - echo Testing - cargo test --verbose --lib --bins --tests --benches --workspace --no-fail-fast -- --test-threads=1 --nocapture --skip crypto_test +test *ARGS: + echo Testing {{ARGS}} + cargo test --verbose --lib --bins --tests --benches --workspace --no-fail-fast {{ARGS}} -- --test-threads=1 --nocapture --skip crypto_test test_basic: test_success test_with_failures test_network_task test_consensus_task test_da_task test_vid_task test_view_sync_task @@ -113,7 +116,11 @@ check: lint: fmt echo linting - cargo clippy --workspace --bins --tests --examples -- -D warnings + cargo clippy --workspace --examples --bins --tests -- -D warnings + +lint_release: fmt + echo linting + cargo clippy --package hotshot --no-default-features --features="docs, doc-images" -- -D warnings fmt: echo Running cargo fmt