diff --git a/Cargo.lock b/Cargo.lock index 552840681b..1cd3fe60dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,9 +137,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" [[package]] name = "ark-bls12-381" @@ -473,9 +473,9 @@ dependencies = [ [[package]] name = "async-task" -version = "4.1.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d306121baf53310a3fd342d88dc0824f6bbeace68347593658525565abee8" +checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" [[package]] name = "async-trait" @@ -1862,9 +1862,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.3.1" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +checksum = "35e70ee094dc02fd9c13fdad4940090f22dbd6ac7c9e7094a46cf0232a50bc7c" [[package]] name = "itertools" @@ -2480,9 +2480,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.3" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de5435b8549c16d423ed0c03dbaafe57cf6c3344744f1242520d59c9d8ecec66" +checksum = "6f35facd4a5673cb5a48822be2be1d4236c1c99cb4113cab7061ac720d5bf859" dependencies = [ "cc", "pkg-config", @@ -3440,9 +3440,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", diff --git a/libp2p-networking/README b/libp2p-networking/README.md similarity index 76% rename from libp2p-networking/README rename to libp2p-networking/README.md index f127b4639f..4c6f74f6e2 100644 --- a/libp2p-networking/README +++ b/libp2p-networking/README.md @@ -58,3 +58,18 @@ In the direct message case, the conductor will increment the state of a randomly In both cases, the test terminates as successful when the conductor receives the incremented state from all other nodes. Then, the conductor sends a special "kill" message to all known nodes and waits for them to disconnect. Metadata about the toplogy is currently read from an `identity_mapping.json` file that manually labels the type of node (bootstrap, regular, conductor). The conductor uses this to figure out information about all nodes in the network. The regular nodes use this to learn about their ip address and the addresses necessary to bootstrap onto the network. The boostrap nodes only use this to learn about their ip addresses. + +### Running counter multi-machine tests + +A sample invocation locally: + +```bash +# run each line in a separate terminal +nix develop -c cargo run --features webui --release --example counter -- --bound_addr 127.0.0.1:9000 --node_type Bootstrap --num_nodes 5 --bootstrap 127.0.0.1:9000 --webui 127.0.0.1:8000 +nix develop -c cargo run --features webui --release --example counter -- --bound_addr 127.0.0.1:9001 --node_type Regular --num_nodes 5 --bootstrap 127.0.0.1:9000 --webui 127.0.0.1:8001 +nix develop -c cargo run --features webui --release --example counter -- --bound_addr 127.0.0.1:9002 --node_type Regular --num_nodes 5 --bootstrap 127.0.0.1:9000 --webui 127.0.0.1:8002 +nix develop -c cargo run --features webui --release --example counter -- --bound_addr 127.0.0.1:9003 --node_type Regular --num_nodes 5 --bootstrap 127.0.0.1:9000 --webui 127.0.0.1:8003 +nix develop -c cargo run --features webui --release --example counter -- --bound_addr 127.0.0.1:9004 --node_type Conductor --num_nodes 5 --bootstrap 127.0.0.1:9000 --webui 127.0.0.1:8004 +``` + +To run on the AWS cluster, see [here](https://github.com/EspressoSystems/cloud-infrastructure/blob/c86873a5c647772836907fc206fce5702a5878bb/ansible/networking-demo/README.md). diff --git a/libp2p-networking/examples/common/mod.rs b/libp2p-networking/examples/common/mod.rs index 5db18a6953..e682b5bd8e 100644 --- a/libp2p-networking/examples/common/mod.rs +++ b/libp2p-networking/examples/common/mod.rs @@ -1,13 +1,16 @@ #[cfg(feature = "webui")] pub mod web; -use async_std::{ - fs::File, - task::{sleep, spawn}, +use async_std::task::{sleep, spawn}; + +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, + sync::{Arc, Once}, + time::Duration, }; -use futures::AsyncReadExt; -use libp2p::{gossipsub::Topic, request_response::ResponseChannel, PeerId}; -use networking_demo::parse_config::NodeDescription; + +use libp2p::{gossipsub::Topic, multiaddr, request_response::ResponseChannel, Multiaddr, PeerId}; use networking_demo::{ direct_message::DirectMessageResponse, network_node::{ @@ -24,11 +27,6 @@ use rand::{seq::IteratorRandom, thread_rng}; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use std::fmt::Debug; -use std::{ - collections::{HashMap, HashSet}, - sync::{Arc, Once}, - time::Duration, -}; use structopt::StructOpt; use tracing::instrument; @@ -82,10 +80,12 @@ pub enum CounterRequest { #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub enum Message { /// message to send from a peer to a peer - NormalMessage(CounterRequest), + Normal(CounterRequest), /// message a conductor sent to a node /// that the node must send to other node(s) - ConductorMessage(CounterRequest, ConductorMessageMethod), + Conductor(CounterRequest, ConductorMessageMethod), + // announce the conductor + ConductorIdIs(PeerId), } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] @@ -116,7 +116,7 @@ pub async fn handle_normal_msg( CounterRequest::AskForCounter => { if let Some(chan) = chan { let response = - Message::NormalMessage(CounterRequest::MyCounterIs(*handle.state.lock().await)); + Message::Normal(CounterRequest::MyCounterIs(*handle.state.lock().await)); let serialized_response = serialize_msg(&response).context(SerializationSnafu)?; println!("sending back reponse: {:?})", response); handle @@ -149,10 +149,23 @@ pub async fn regular_handle_network_event( GossipMsg(m) | DirectResponse(m, _) => { if let Ok(msg) = deserialize_msg::(&m) { match msg { - Message::NormalMessage(msg) => { + Message::ConductorIdIs(peerid) => { + handle + .send_network + .send_async(ClientRequest::IgnorePeers(vec![peerid])) + .await + .context(SendSnafu)?; + handle + .connection_state + .lock() + .await + .ignored_peers + .insert(peerid); + } + Message::Normal(msg) => { handle_normal_msg(handle.clone(), msg, None).await?; } - Message::ConductorMessage(..) => { + Message::Conductor(..) => { // do nothing. We only expect to be reached out to by the conductor via // direct message } @@ -162,14 +175,15 @@ pub async fn regular_handle_network_event( DirectRequest(msg, _peer_id, chan) => { if let Ok(msg) = deserialize_msg::(&msg) { match msg { - Message::NormalMessage(msg) => { + Message::ConductorIdIs(_) => {} + Message::Normal(msg) => { println!("recv-ed normal direct message {:?}", msg); handle_normal_msg(handle.clone(), msg, Some(chan)).await?; } - Message::ConductorMessage(msg, method) => { + Message::Conductor(msg, method) => { println!("recv-ed conductor message!"); - let serialized_msg = serialize_msg(&Message::NormalMessage(msg)) - .context(SerializationSnafu)?; + let serialized_msg = + serialize_msg(&Message::Normal(msg)).context(SerializationSnafu)?; match method { ConductorMessageMethod::Broadcast => { handle @@ -188,7 +202,7 @@ pub async fn regular_handle_network_event( .await .context(SendSnafu)?; let response = - serialize_msg(&Message::NormalMessage(CounterRequest::Recvd)) + serialize_msg(&Message::Normal(CounterRequest::Recvd)) .context(SerializationSnafu)?; handle .send_network @@ -213,107 +227,112 @@ pub async fn regular_handle_network_event( Ok(()) } +pub fn parse_node(s: &str) -> Result { + let mut i = s.split(':'); + let ip = i.next().ok_or(multiaddr::Error::InvalidMultiaddr)?; + let port = i.next().ok_or(multiaddr::Error::InvalidMultiaddr)?; + Multiaddr::from_str(&format!("/ip4/{}/tcp/{}", ip, port)) +} + #[derive(StructOpt)] pub struct CliOpt { - /// Which node to start. Will strong compare with the `multiaddr` value - #[structopt(long = "ip_addr", short = "ip")] - pub ip: Option, - /// Path to the node configuration file. Defaults to `./identity_mapping.json` - #[structopt(long = "toplogy_path", short = "path")] - pub path: Option, - + /// Path to the node configuration file + /// only should be provided for conductor node + // #[structopt(long = "inventory", short = "i")] + // pub inventory: Option, + #[structopt(long = "bootstrap")] + #[structopt(parse(try_from_str = parse_node))] + pub bootstrap_addrs: Vec, + #[structopt(long = "num_nodes")] + pub num_nodes: usize, + #[structopt(long = "node_type")] + pub node_type: NetworkNodeType, + #[structopt(long = "bound_addr")] + #[structopt(parse(try_from_str = parse_node))] + pub bound_addr: Multiaddr, #[cfg(feature = "webui")] /// If this value is set, a webserver will be spawned on this address with debug info #[structopt(long = "webui")] - pub webui: Option, -} - -pub async fn parse_config(path: Option) -> Result, CounterError> { - let mut f = File::open(&path.unwrap_or_else(|| "./identity_mapping.json".to_string())) - .await - .context(FileReadSnafu)?; - let mut s = String::new(); - f.read_to_string(&mut s).await.context(FileReadSnafu)?; - serde_json::from_str(&s).context(JsonParseSnafu) + pub webui_addr: Option, } -pub async fn start_main( - ip_addr: String, - path: Option, - #[cfg(feature = "webui")] webui_addr: Option, -) -> Result<(), CounterError> { +/// ['bootstrap_addrs`] list of bootstrap multiaddrs. Needed to bootstrap into network +/// [`num_nodes`] total number of nodes. Needed to create pruning rules +/// [`node_type`] the type of this node +/// ['bound_addr`] the address to bind to +pub async fn start_main(opts: CliOpt) -> Result<(), CounterError> { // FIXME can we pass in a function that returns an error type INIT.call_once(|| { color_eyre::install().unwrap(); tracing_setup::setup_tracing(); }); - let swarm_config = parse_config(path).await?; - - let ignored_peers = swarm_config + let bootstrap_nodes = opts + .bootstrap_addrs .iter() - .filter_map(|n| { - if n.node_type == NetworkNodeType::Conductor { - Some(n.identity.public().to_peer_id()) - } else { - None - } - }) - .collect::>(); - - let (idx, node_description) = swarm_config - .iter() - .enumerate() - .find(|(_, node)| node.multiaddr.clone().to_string().contains(&ip_addr)) - .unwrap(); - println!("found entry!: {idx}"); + .cloned() + .map(|a| (None, a)) + .collect::>(); - match node_description.node_type { + match opts.node_type { NetworkNodeType::Conductor => { let config = NetworkNodeConfigBuilder::default() - .bound_addr(node_description.bound_addr.clone()) - .min_num_peers(swarm_config.len() - 1) - .max_num_peers(swarm_config.len() - 1) + .bound_addr(opts.bound_addr) + .min_num_peers(opts.num_nodes - 1) + .max_num_peers(opts.num_nodes - 1) .node_type(NetworkNodeType::Conductor) - .identity(node_description.identity.clone()) - .ignored_peers(ignored_peers) + .ignored_peers(HashSet::new()) .build() .context(NodeConfigSnafu) .context(HandleSnafu)?; let handle = Arc::new( - NetworkNodeHandle::::new(config.clone(), idx) + NetworkNodeHandle::::new(config.clone(), 0) .await .context(HandleSnafu)?, ); #[cfg(feature = "webui")] - if let Some(addr) = webui_addr { + if let Some(addr) = opts.webui_addr { web::spawn_server(Arc::clone(&handle), addr); } - spin_up_swarm( - TIMEOUT, - swarm_config - .iter() - .map(|c| (Some(c.identity.public().to_peer_id()), c.multiaddr.clone())) - .collect::>(), - config, - idx, - &handle, - ) - .await - .context(HandleSnafu)?; + spin_up_swarm(TIMEOUT, bootstrap_nodes, config, 0, &handle) + .await + .context(HandleSnafu)?; spawn_handler(handle.clone(), conductor_handle_network_event).await; - // initialize the state of each node let mut state = handle.state.lock().await; - for (i, connection) in swarm_config.iter().enumerate() { - if i != idx { - state.insert(connection.identity.public().to_peer_id(), 0); + for a_peer in handle.connection_state.lock().await.known_peers.clone() { + if a_peer != handle.peer_id && state.get(&a_peer).is_none() { + state.insert(a_peer, 0); } } drop(state); handle.notify_webui().await; + let handle_dup = handle.clone(); + let conductor_peerid = handle.peer_id; + // the "conductor id" + // periodically say "ignore me!" + spawn(async move { + let msg = Message::ConductorIdIs(conductor_peerid); + let serialized_msg = serialize_msg(&msg) + .context(SerializationSnafu) + .context(HandleSnafu)?; + while !*handle_dup.killed.lock().await { + sleep(Duration::from_secs(1)).await; + handle_dup + .send_network + .send_async(ClientRequest::GossipMsg( + Topic::new("global"), + serialized_msg.clone(), + )) + .await + .context(SendSnafu) + .context(HandleSnafu)?; + } + Ok::<(), CounterError>(()) + }); + for i in 0..5 { conductor_broadcast(TIMEOUT, i, handle.clone()) .await @@ -326,7 +345,7 @@ pub async fn start_main( .context(HandleSnafu)? } - let kill_msg = Message::NormalMessage(CounterRequest::Kill); + let kill_msg = Message::Normal(CounterRequest::Kill); let serialized_kill_msg = serialize_msg(&kill_msg) .context(SerializationSnafu) .context(HandleSnafu)?; @@ -341,6 +360,7 @@ pub async fn start_main( .context(SendSnafu) .context(HandleSnafu)? } + while !handle .connection_state .lock() @@ -348,70 +368,56 @@ pub async fn start_main( .connected_peers .is_empty() {} - - // FIXME - // we need one other primitive here: - // - tell a node to tell all other nodes to increment state with direct message } - NetworkNodeType::Bootstrap | NetworkNodeType::Regular => { - let known_peers = swarm_config - .iter() - .filter_map(|x| { - // TODO this is gross. Make this a data structure - if x.node_type == NetworkNodeType::Bootstrap { - Some((Some(x.identity.public().to_peer_id()), x.bound_addr.clone())) - } else { - None - } - }) - .collect::>(); + // regular and bootstrap nodes + NetworkNodeType::Regular | NetworkNodeType::Bootstrap => { let config = NetworkNodeConfigBuilder::default() - .bound_addr(node_description.bound_addr.clone()) - .identity(node_description.identity.clone()) - .ignored_peers(ignored_peers) - .min_num_peers(swarm_config.len() / 4) - .max_num_peers(swarm_config.len() / 2) - .node_type(node_description.node_type) + .bound_addr(opts.bound_addr) + .ignored_peers(HashSet::new()) + .min_num_peers(opts.num_nodes / 4) + .max_num_peers(opts.num_nodes / 2) + .node_type(opts.node_type) .build() .context(NodeConfigSnafu) .context(HandleSnafu)?; let handle = Arc::new( - NetworkNodeHandle::::new(config.clone(), idx) + NetworkNodeHandle::::new(config.clone(), 0) .await .context(HandleSnafu)?, ); #[cfg(feature = "webui")] - if let Some(addr) = webui_addr { + if let Some(addr) = opts.webui_addr { web::spawn_server(Arc::clone(&handle), addr); } - spin_up_swarm(TIMEOUT, known_peers, config, idx, &handle) + spin_up_swarm(TIMEOUT, bootstrap_nodes, config, 0, &handle) .await .context(HandleSnafu)?; let handle_dup = handle.clone(); // periodically broadcast state back to conductor node spawn(async move { - // FIXME map option to error - let conductor_id = swarm_config - .iter() - .find(|c| c.node_type == NetworkNodeType::Conductor) - .unwrap() - .identity - .public() - .to_peer_id(); while !*handle_dup.killed.lock().await { - sleep(Duration::from_secs(1)).await; - let counter = *handle_dup.state.lock().await; - let msg = Message::NormalMessage(CounterRequest::MyCounterIs(counter)); - let serialized_msg = serialize_msg(&msg) - .context(SerializationSnafu) - .context(HandleSnafu)?; - handle_dup - .send_network - .send_async(ClientRequest::DirectRequest(conductor_id, serialized_msg)) + if let Some(conductor_id) = handle_dup + .connection_state + .lock() .await - .context(SendSnafu) - .context(HandleSnafu)?; + .ignored_peers + .iter() + .next() + { + let counter = *handle_dup.state.lock().await; + let msg = Message::Normal(CounterRequest::MyCounterIs(counter)); + let serialized_msg = serialize_msg(&msg) + .context(SerializationSnafu) + .context(HandleSnafu)?; + handle_dup + .send_network + .send_async(ClientRequest::DirectRequest(*conductor_id, serialized_msg)) + .await + .context(SendSnafu) + .context(HandleSnafu)?; + } + sleep(Duration::from_secs(1)).await; } Ok::<(), CounterError>(()) }); @@ -450,7 +456,7 @@ pub async fn conductor_direct_message( }); // dispatch message - let msg = Message::NormalMessage(CounterRequest::IncrementCounter { + let msg = Message::Normal(CounterRequest::IncrementCounter { from: state, to: new_state, }); @@ -483,7 +489,7 @@ pub async fn conductor_direct_message( remaining_nodes.remove(chosen_peer); for peer in &remaining_nodes { - let msg = Message::ConductorMessage( + let msg = Message::Conductor( CounterRequest::AskForCounter, ConductorMessageMethod::DirectMessage(*chosen_peer), ); @@ -522,7 +528,7 @@ pub async fn conductor_broadcast( }; println!("broadcasting message!"); // broadcast message - let msg = Message::ConductorMessage(request.clone(), ConductorMessageMethod::Broadcast); + let msg = Message::Conductor(request.clone(), ConductorMessageMethod::Broadcast); let serialized_msg = serialize_msg(&msg).context(SerializationSnafu)?; handle .send_network @@ -538,7 +544,7 @@ pub async fn conductor_broadcast( state.iter().all(|(_, &s)| s == new_state) }); - let msg_direct = Message::NormalMessage(request); + let msg_direct = Message::Normal(request); let serialized_msg_direct = serialize_msg(&msg_direct).context(SerializationSnafu)?; handle .send_network @@ -571,7 +577,7 @@ pub async fn conductor_handle_network_event( DirectRequest(m, peer_id, _chan) => { if let Ok(msg) = deserialize_msg::(&m) { match msg { - Message::NormalMessage(msg) => { + Message::Normal(msg) => { if let CounterRequest::MyCounterIs(state) = msg { let _old_state = (*handle.state.lock().await) .insert(peer_id, state) @@ -581,10 +587,11 @@ pub async fn conductor_handle_network_event( handle.notify_webui().await; } } - Message::ConductorMessage(..) => { + Message::Conductor(..) => { /* This should also never happen ... */ unreachable!() } + Message::ConductorIdIs(_) => {} } } } @@ -607,6 +614,5 @@ pub async fn conductor_handle_network_event( pub enum CounterError { Handle { source: NetworkNodeHandleError }, FileRead { source: std::io::Error }, - JsonParse { source: serde_json::Error }, MissingBootstrap, } diff --git a/libp2p-networking/examples/common/web.rs b/libp2p-networking/examples/common/web.rs index dfcbd097c0..9601f67461 100644 --- a/libp2p-networking/examples/common/web.rs +++ b/libp2p-networking/examples/common/web.rs @@ -17,9 +17,12 @@ where let mut tide = tide::with_state(state); // Unwrap this in the calling thread so that if it fails we fail completely // instead of not knowing why the web UI does not work - tide.at("/") - .serve_file("web/index.html") - .expect("Could not register web/index.html"); + tide.at("/").get(|_| async move { + Ok(tide::Response::builder(200) + .content_type(tide::http::mime::HTML) + .body(include_str!("../../web/index.html")) + .build()) + }); tide.at("/sse").get(tide::sse::endpoint( |req: tide::Request>>, sender| async move { let peer_addr = req.peer_addr(); diff --git a/libp2p-networking/examples/counter.rs b/libp2p-networking/examples/counter.rs index 4ae9456f81..900c9b1d77 100644 --- a/libp2p-networking/examples/counter.rs +++ b/libp2p-networking/examples/counter.rs @@ -9,13 +9,7 @@ pub mod common; #[instrument] async fn main() -> Result<()> { let args = CliOpt::from_args(); - start_main( - args.ip.unwrap(), - args.path, - #[cfg(feature = "webui")] - args.webui, - ) - .await?; + start_main(args).await?; // optional UI perhaps? for monitoring purposes Ok(()) diff --git a/libp2p-networking/examples/identities.rs b/libp2p-networking/examples/identities.rs deleted file mode 100644 index d68ef1bbb2..0000000000 --- a/libp2p-networking/examples/identities.rs +++ /dev/null @@ -1,27 +0,0 @@ -use async_std::fs::File; -use futures::AsyncReadExt; -use libp2p::identity::Keypair; -use networking_demo::parse_config::NodeDescription; -use tracing::instrument; - -#[async_std::main] -#[instrument] -async fn main() { - parse_ids().await; -} - -pub async fn parse_ids() { - let mut f = File::open(&"./identity_mapping.json").await.unwrap(); - let mut s = String::new(); - f.read_to_string(&mut s).await.unwrap(); - println!("s{}", s); - let _: Vec = serde_json::from_str(&s).unwrap(); -} - -pub async fn gen_ids() { - for _i in 0..10 { - let identity = Keypair::generate_ed25519(); - let pbuf_encoding = identity.to_protobuf_encoding().unwrap(); - println!("{:?}", pbuf_encoding); - } -} diff --git a/libp2p-networking/flake.nix b/libp2p-networking/flake.nix index c69a8c434e..c01e94183c 100644 --- a/libp2p-networking/flake.nix +++ b/libp2p-networking/flake.nix @@ -121,7 +121,7 @@ defaultPackage = self.packages.${system}.${crateName}; - devShell = pkgs.mkShell { + devShells.staticShell = pkgs.mkShell { shellHook = '' ulimit -n 1024 export RUSTFLAGS='-C target-feature=+crt-static' @@ -131,6 +131,11 @@ with pkgs; [ fenix.packages.${system}.rust-analyzer fenixStable ] ++ buildDeps; }; + devShell = pkgs.mkShell { + buildInputs = + with pkgs; [ fenix.packages.${system}.rust-analyzer fenixStable ] ++ buildDeps; + }; + devShells.perfShell = pkgs.mkShell { buildInputs = with pkgs; [ grcov recent_flamegraph fd fenixNightly fenix.packages.${system}.rust-analyzer ] ++ buildDeps; shellHook = '' diff --git a/libp2p-networking/src/network_node.rs b/libp2p-networking/src/network_node.rs index c46d846f88..429b5c33e4 100644 --- a/libp2p-networking/src/network_node.rs +++ b/libp2p-networking/src/network_node.rs @@ -7,6 +7,8 @@ use rand::{seq::IteratorRandom, thread_rng}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Debug; +use std::str::FromStr; + use std::{ collections::HashSet, io::Error, @@ -137,8 +139,10 @@ pub struct ConnectionData { pub connected_peers: HashSet, /// set of peers that were at one point connected pub connecting_peers: HashSet, - /// set of events to send to client + /// set of known peers pub known_peers: HashSet, + /// set of peers that are immune to pruning + pub ignored_peers: HashSet, } impl NetworkDef { @@ -286,6 +290,22 @@ pub enum NetworkNodeType { Conductor, } +impl FromStr for NetworkNodeType { + type Err = String; + + fn from_str(input: &str) -> Result { + match input { + "Conductor" => Ok(NetworkNodeType::Conductor), + "Regular" => Ok(NetworkNodeType::Regular), + "Bootstrap" => Ok(NetworkNodeType::Bootstrap), + _ => Err( + "Couldn't parse node type. Must be one of Conductor, Bootstrap, Regular" + .to_string(), + ), + } + } +} + /// serialize an arbitrary message /// # Errors /// when unable to serialize a message @@ -389,6 +409,9 @@ pub enum ClientRequest { Pruning(bool), /// add vec of known peers or addresses AddKnownPeers(Vec<(Option, Multiaddr)>), + /// Ignore peers. Only here for debugging purposes. + /// Allows us to have nodes that are never pruned + IgnorePeers(Vec), } /// events generated by the swarm that we wish @@ -733,6 +756,12 @@ impl NetworkNode { #[allow(clippy::enum_glob_use)] use ClientRequest::*; match msg { + IgnorePeers(peers) => { + self.swarm + .behaviour_mut() + .ignored_peers + .extend(peers.iter()); + } Shutdown => { warn!("Libp2p listener shutting down"); return Ok(true); diff --git a/libp2p-networking/tests/common/mod.rs b/libp2p-networking/tests/common/mod.rs index c5a446bada..0fa043ae76 100644 --- a/libp2p-networking/tests/common/mod.rs +++ b/libp2p-networking/tests/common/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, sync::{Arc, Once}, time::Duration, }; @@ -111,6 +111,7 @@ pub async fn print_connections(handles: &[Arc>]) { } } +#[allow(dead_code)] pub async fn check_connection_state(handles: &[Arc>]) { let mut err_msg = "".to_string(); for (i, handle) in handles.iter().enumerate() { diff --git a/libp2p-networking/tests/counter.rs b/libp2p-networking/tests/counter.rs index b146bf444e..0dd7577298 100644 --- a/libp2p-networking/tests/counter.rs +++ b/libp2p-networking/tests/counter.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; mod common; use async_std::future::timeout; -use common::{check_connection_state, test_bed, HandleSnafu, TestError}; +use common::{test_bed, HandleSnafu, TestError}; use bincode::Options;