From 0f802721fc1ccb6ae801eab5298930ec89e429a9 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Tue, 12 May 2020 15:23:26 +0200 Subject: [PATCH] Reduce number of messages sent when propagating This is a stop gap fix - the code will be simplified by ConnectivityManager. - Added `propagation_factor` (default: 0.5) to indicate what percentage of neighbours should be propagated to - If the destination is given, send to at most `num_neighbours * propagation_factor` nodes unless nodes are further away from the destination - If destination is Unknown, send to a random set of peers (either connect to a random set of peers or if we have enough peers, select randomly from peers that are already connected) - Join messages are propagated with a reduced number of messages - Updated memorynet to test network wide message propagation --- comms/dht/examples/memorynet.rs | 202 ++++++++++++++++------ comms/dht/src/actor.rs | 174 ++++++++++++------- comms/dht/src/config.rs | 22 ++- comms/dht/src/inbound/dht_handler/task.rs | 10 +- 4 files changed, 281 insertions(+), 127 deletions(-) diff --git a/comms/dht/examples/memorynet.rs b/comms/dht/examples/memorynet.rs index 10f2254ce4..c171f581c1 100644 --- a/comms/dht/examples/memorynet.rs +++ b/comms/dht/examples/memorynet.rs @@ -36,9 +36,9 @@ //! `RUST_BACKTRACE=1 RUST_LOG=trace cargo run --example memorynet 2> /tmp/debug.log` // Size of network -const NUM_NODES: usize = 39; +const NUM_NODES: usize = 40; // Must be at least 2 -const NUM_WALLETS: usize = 8; +const NUM_WALLETS: usize = 6; const QUIET_MODE: bool = true; mod memory_net; @@ -71,6 +71,7 @@ use tari_comms::{ }; use tari_comms_dht::{ domain_message::OutboundDomainMessage, + envelope::NodeDestination, inbound::DecryptedDhtMessage, outbound::OutboundEncryption, Dht, @@ -78,7 +79,7 @@ use tari_comms_dht::{ }; use tari_storage::{lmdb_store::LMDBBuilder, LMDBWrapper}; use tari_test_utils::{paths::create_temporary_data_path, random}; -use tokio::{runtime, time}; +use tokio::{runtime, task, time}; use tower::ServiceBuilder; type MessagingEventRx = mpsc::UnboundedReceiver<(NodeId, NodeId)>; @@ -242,6 +243,10 @@ async fn main() { // Put the wallet back wallets.push(random_wallet); + do_network_wide_propagation(&mut nodes).await; + + total_messages += drain_messaging_events(&mut messaging_events_rx, false).await; + println!("{} messages sent in total across the network", total_messages); network_peer_list_stats(&nodes, &wallets).await; @@ -419,6 +424,91 @@ async fn network_connectivity_stats(nodes: &[TestNode], wallets: &[TestNode]) { ); } +async fn do_network_wide_propagation(nodes: &mut [TestNode]) { + let random_node = &nodes[OsRng.gen_range(0, nodes.len() - 1)]; + let random_node_id = random_node.comms.node_identity().node_id().clone(); + const PUBLIC_MESSAGE: &str = "This is something you're all interested in!"; + + banner!("🌎 {} is going to broadcast a message to the network", random_node); + random_node + .dht + .outbound_requester() + .broadcast( + NodeDestination::Unknown, + OutboundEncryption::None, + vec![], + OutboundDomainMessage::new(0i32, PUBLIC_MESSAGE.to_string()), + ) + .await + .unwrap(); + + // Spawn task for each peer that will read the message and propagate it on + let tasks = nodes + .into_iter() + .filter(|n| n.comms.node_identity().node_id() != &random_node_id) + .enumerate() + .map(|(idx, node)| { + let mut outbound_req = node.dht.outbound_requester(); + let mut ims_rx = node.ims_rx.take().unwrap(); + let start = Instant::now(); + let node_name = node.name.clone(); + + task::spawn(async move 
{ + let result = time::timeout(Duration::from_secs(5), ims_rx.next()).await; + let mut is_success = false; + match result { + Ok(Some(msg)) => { + let public_msg = msg + .decryption_result + .unwrap() + .decode_part::(1) + .unwrap() + .unwrap(); + println!("📬 {} got public message '{}'", node_name, public_msg); + is_success = true; + let sent_state = outbound_req + .propagate( + NodeDestination::Unknown, + OutboundEncryption::None, + vec![msg.source_peer.node_id.clone()], + OutboundDomainMessage::new(0i32, public_msg), + ) + .await + .unwrap(); + let states = sent_state.resolve_ok().await.unwrap(); + println!("🦠 {} propagated to {} peer(s)", node_name, states.len()); + }, + Err(_) | Ok(None) => { + banner!( + "💩 {} failed to receive network message after {}ms", + node_name, + start.elapsed().as_millis(), + ); + }, + } + + (idx, ims_rx, is_success) + }) + }); + + // Put the ims_rxs back + let ims_rxs = future::join_all(tasks).await; + let mut num_successes = 0; + for ims in ims_rxs { + let (idx, ims_rx, is_success) = ims.unwrap(); + nodes[idx].ims_rx = Some(ims_rx); + if is_success { + num_successes += 1; + } + } + + banner!( + "🙌 Finished propagation test. {} out of {} nodes received the message", + num_successes, + nodes.len() - 1 + ); +} + async fn do_store_and_forward_message_propagation( wallet: TestNode, wallets: &[TestNode], @@ -426,9 +516,9 @@ async fn do_store_and_forward_message_propagation( messaging_rx: &mut MessagingEventRx, ) -> (usize, TestNode) { - println!( - "{} chosen at random to be receive a message from {} using store and forward", - wallet, wallets[0] + banner!( + "{} chosen at random to be receive messages from other nodes using store and forward", + wallet, ); let wallets_peers = wallet.comms.peer_manager().all().await.unwrap(); let node_identity = wallet.comms.node_identity().clone(); @@ -437,28 +527,28 @@ async fn do_store_and_forward_message_propagation( wallet.comms.shutdown().await; banner!( - "🌎 {} ({}) is going to attempt to discover {} ({})", - wallets[0], - wallets[0].comms.node_identity().public_key(), + "🎤 All other wallets are going to attempt to broadcast messages to {} ({})", get_name(node_identity.node_id()), node_identity.public_key(), ); - let secret_message = format!("My name is wiki wiki {}", wallets[0]); let start = Instant::now(); - wallets[0] - .dht - .outbound_requester() - .broadcast( - node_identity.node_id().clone().into(), - OutboundEncryption::EncryptFor(Box::new(node_identity.public_key().clone())), - vec![], - OutboundDomainMessage::new(123i32, secret_message.clone()), - ) - .await - .unwrap(); + for wallet in wallets { + let secret_message = format!("My name is wiki wiki {}", wallet); + wallet + .dht + .outbound_requester() + .broadcast( + node_identity.node_id().clone().into(), + OutboundEncryption::EncryptFor(Box::new(node_identity.public_key().clone())), + vec![], + OutboundDomainMessage::new(123i32, secret_message.clone()), + ) + .await + .unwrap(); + } - println!("Waiting a few seconds for message to propagate around the network..."); + banner!("⏰ Waiting a few seconds for messages to propagate around the network..."); time::delay_for(Duration::from_secs(5)).await; let mut total_messages = drain_messaging_events(messaging_rx, false).await; @@ -475,33 +565,41 @@ async fn do_store_and_forward_message_propagation( .await .unwrap(); - let result = time::timeout(Duration::from_secs(20), wallet.ims_rx.next()).await; - total_messages += match result { - Ok(msg) => { - let msg = msg.unwrap(); - let secret_msg = msg - .decryption_result - 
.unwrap() - .decode_part::(1) - .unwrap() - .unwrap(); - banner!( - "🎉 Wallet {} received propagated message '{}' from store and forward in {}ms", - wallet, - secret_msg, - start.elapsed().as_millis() - ); - drain_messaging_events(messaging_rx, false).await - }, - Err(err) => { - banner!( - "💩 Failed to receive message after {}ms using store and forward '{:?}'", - start.elapsed().as_millis(), - err - ); - drain_messaging_events(messaging_rx, true).await - }, - }; + let mut num_msgs = 0; + loop { + let result = time::timeout(Duration::from_secs(20), wallet.ims_rx.as_mut().unwrap().next()).await; + num_msgs += 1; + match result { + Ok(msg) => { + let msg = msg.unwrap(); + let secret_msg = msg + .decryption_result + .unwrap() + .decode_part::(1) + .unwrap() + .unwrap(); + banner!( + "🎉 Wallet {} received propagated message '{}' from store and forward in {}ms", + wallet, + secret_msg, + start.elapsed().as_millis() + ); + }, + Err(err) => { + banner!( + "💩 Failed to receive message after {}ms using store and forward '{:?}'", + start.elapsed().as_millis(), + err + ); + }, + }; + + if num_msgs == wallets.len() { + break; + } + } + + total_messages += drain_messaging_events(messaging_rx, false).await; (total_messages, wallet) } @@ -609,7 +707,7 @@ struct TestNode { seed_peer: Option, dht: Dht, conn_man_events_rx: mpsc::Receiver>, - ims_rx: mpsc::Receiver, + ims_rx: Option>, } impl TestNode { @@ -632,7 +730,7 @@ impl TestNode { seed_peer, comms, dht, - ims_rx, + ims_rx: Some(ims_rx), conn_man_events_rx: events_rx, } } @@ -817,5 +915,5 @@ async fn setup_comms_dht( async fn take_a_break() { banner!("Taking a break for a few seconds to let things settle..."); - time::delay_for(Duration::from_millis(NUM_NODES as u64 * 300)).await; + time::delay_for(Duration::from_millis(NUM_NODES as u64 * 100)).await; } diff --git a/comms/dht/src/actor.rs b/comms/dht/src/actor.rs index 9d93a74777..20e1416308 100644 --- a/comms/dht/src/actor.rs +++ b/comms/dht/src/actor.rs @@ -46,6 +46,7 @@ use futures::{ StreamExt, }; use log::*; +use rand::{rngs::OsRng, seq::SliceRandom}; use std::{fmt, fmt::Display, sync::Arc}; use tari_comms::{ connection_manager::{ConnectionManagerError, ConnectionManagerRequester}, @@ -433,54 +434,121 @@ impl<'a> DhtActor<'a> { .collect()) }, Neighbours(exclude) => { - let candidates = Self::get_propagate_candidates( + let active_connections = connection_manager.get_active_connections().await?; + let (connected_nodes, connected_clients) = active_connections + .into_iter() + .map(|conn| conn.peer()) + .partition::, _>(|peer| peer.features.contains(PeerFeatures::COMMUNICATION_NODE)); + let mut candidates = Self::get_propagate_candidates( &config, &peer_manager, - &mut connection_manager, + &connected_clients, + &connected_nodes, &node_identity, node_identity.node_id().clone(), &exclude, ) .await?; + candidates.truncate(config.num_neighbouring_nodes); + + info!( + target: LOG_TARGET, + "{} candidate(s) selected for broadcast", + candidates.len() + ); + Ok(candidates) }, Propagate(destination, exclude) => { - let dest_node_id = destination - .node_id() - .map(Clone::clone) - .or_else(|| destination.public_key().and_then(|pk| NodeId::from_key(pk).ok())); - - let mut candidates = Self::get_propagate_candidates( - &config, - &peer_manager, - &mut connection_manager, - &node_identity, - dest_node_id.clone().unwrap_or_else(|| node_identity.node_id().clone()), - &exclude, - ) - .await?; - - // Exclude candidates that are further away from the destination than this node - // unless this node has not selected 
a big enough sample i.e. this node is not well connected - if candidates.len() >= config.num_neighbouring_nodes { - if let Some(node_id) = dest_node_id { - let dist_from_dest = node_identity.node_id().distance(&node_id); - let before_len = candidates.len(); - candidates = candidates - .into_iter() - .filter(|p| p.node_id.distance(&node_id) < dist_from_dest) + let active_connections = connection_manager.get_active_connections().await?; + let (connected_nodes, connected_clients) = active_connections + .into_iter() + .map(|conn| conn.peer()) + .partition::, _>(|peer| peer.features.contains(PeerFeatures::COMMUNICATION_NODE)); + + debug!( + target: LOG_TARGET, + "{} connected node(s), {} connected client(s)", + connected_nodes.len(), + connected_clients.len() + ); + + if destination.is_unknown() { + // If the message has an unknown destination, propagate to random peers + if connected_nodes.len() >= config.num_neighbouring_nodes { + let candidates = connected_nodes + .choose_multiple(&mut OsRng, config.num_propagation_nodes()) + .cloned() .collect(); - debug!( target: LOG_TARGET, - "Filtered out {} node(s) that are further away than this node.", - before_len - candidates.len() + "Selected {} candidates for propagation to undefined destination from a pool of {} active \ + connections", + config.num_neighbouring_nodes, + connected_nodes.len() ); + Ok(candidates) + } else { + let random_peers = peer_manager + .random_peers(config.num_propagation_nodes(), &exclude) + .await? + .into_iter() + .map(Arc::new) + .collect::>(); + debug!( + target: LOG_TARGET, + "Selected {} random candidates for propagation to undefined destination", + random_peers.len(), + ); + Ok(random_peers) + } + } else { + let dest_node_id = destination + .node_id() + .map(Clone::clone) + .or_else(|| destination.public_key().and_then(|pk| NodeId::from_key(pk).ok())); + + let mut candidates = Self::get_propagate_candidates( + &config, + &peer_manager, + &connected_clients, + &connected_nodes, + &node_identity, + dest_node_id.clone().unwrap_or_else(|| node_identity.node_id().clone()), + &exclude, + ) + .await?; + + // Exclude candidates that are further away from the destination than this node + // unless this node has not selected a big enough sample i.e. 
this node is not well connected + if candidates.len() >= config.num_neighbouring_nodes { + if let Some(node_id) = dest_node_id { + let dist_from_dest = node_identity.node_id().distance(&node_id); + let before_len = candidates.len(); + candidates = candidates + .into_iter() + .filter(|p| p.node_id.distance(&node_id) < dist_from_dest) + .collect(); + + debug!( + target: LOG_TARGET, + "Filtered out {} node(s) that are further away than this node.", + before_len - candidates.len() + ); + } } - } - Ok(candidates) + candidates.truncate(config.num_propagation_nodes()); + info!( + target: LOG_TARGET, + "{} candidate(s) selected for propagation to {}", + candidates.len(), + destination + ); + + Ok(candidates) + } }, } } @@ -488,33 +556,25 @@ impl<'a> DhtActor<'a> { async fn get_propagate_candidates( config: &DhtConfig, peer_manager: &PeerManager, - connection_manager: &mut ConnectionManagerRequester, + connected_clients: &[Arc], + connected_nodes: &[Arc], node_identity: &NodeIdentity, dest_node_id: NodeId, exclude: &[NodeId], ) -> Result>, DhtActorError> { - let active_connections = connection_manager.get_active_connections().await?; - let (connected_nodes, connected_clients) = active_connections - .into_iter() - .map(|conn| conn.peer()) - .partition::, _>(|peer| peer.features.contains(PeerFeatures::COMMUNICATION_NODE)); - - debug!( - target: LOG_TARGET, - "{} nodes and {} clients are connected", - connected_nodes.len(), - connected_clients.len() - ); - // If a connected wallet matches the destination, just send it to them if let Some(client) = connected_clients.iter().find(|peer| peer.node_id == dest_node_id) { - debug!( - target: LOG_TARGET, - "Message destination is for the connected client '{}'. Sending to connected client only.", - client.node_id - ); - return Ok(vec![client.clone()]); + // If we're excluding the client for this propagation (as is the case for join messages) + // do a normal propagation + if !exclude.contains(&client.node_id) { + debug!( + target: LOG_TARGET, + "Message destination is for the connected client '{}'. Sending to connected client only.", + client.node_id + ); + return Ok(vec![client.clone()]); + } } let mut candidates = Self::select_closest_peers_for_propagation( @@ -530,8 +590,9 @@ impl<'a> DhtActor<'a> { // Add any other communication nodes that are connected. 
let connected_nodes = connected_nodes - .into_iter() + .iter() .filter(|peer| !candidates.contains(&peer)) + .cloned() .collect::>(); candidates.extend(connected_nodes); @@ -547,13 +608,6 @@ impl<'a> DhtActor<'a> { let node_b_dist = b.node_id.distance(&dest_node_id); node_a_dist.cmp(&node_b_dist) }); - candidates.truncate(config.num_neighbouring_nodes); - - info!( - target: LOG_TARGET, - "{} candidate(s) selected for propagation", - candidates.len() - ); Ok(candidates) } @@ -834,7 +888,7 @@ mod test { actor.spawn().await.unwrap(); assert!(requester - .get_metadata::>(DhtMetadataKey::OfflineTimestamp,) + .get_metadata::>(DhtMetadataKey::OfflineTimestamp) .await .unwrap() .is_none()); diff --git a/comms/dht/src/config.rs b/comms/dht/src/config.rs index 91cef39912..638d17f88e 100644 --- a/comms/dht/src/config.rs +++ b/comms/dht/src/config.rs @@ -30,9 +30,7 @@ pub const SAF_LOW_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(6 * 6 /// The default time-to-live duration used for storage of high priority messages by the Store-and-forward middleware pub const SAF_HIGH_PRIORITY_MSG_STORAGE_TTL: Duration = Duration::from_secs(3 * 24 * 60 * 60); // 3 days /// The default number of peer nodes that a message has to be closer to, to be considered a neighbour -pub const DEFAULT_NUM_NEIGHBOURING_NODES: usize = 10; -/// The default number of randomly-selected peer nodes to be included in propagation messages -pub const DEFAULT_NUM_RANDOM_PROPAGATION_NODES: usize = 2; +pub const DEFAULT_NUM_NEIGHBOURING_NODES: usize = 8; #[derive(Debug, Clone)] pub struct DhtConfig { @@ -42,12 +40,12 @@ pub struct DhtConfig { /// Default: 20 pub outbound_buffer_size: usize, /// The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour - /// Default: 10 + /// Default: 8 pub num_neighbouring_nodes: usize, - /// The maximum number of randomly-selected peer nodes that will be included in propagation - /// messages. Only applies to `BroadcastStrategy::Propagate`. - /// Default: 2 - pub num_random_propagation_nodes: usize, + /// A number from 0 to 1 that determines the number of peers to propagate to as a factor of + /// `num_neighbouring_nodes`. + /// Default: 0.5 + pub propagation_factor: f32, /// A request to retrieve stored messages will be ignored if the requesting node is /// not within one of this nodes _n_ closest nodes. 
/// Default 8 @@ -109,13 +107,19 @@ impl DhtConfig { ..Default::default() } } + + #[inline] + pub fn num_propagation_nodes(&self) -> usize { + let n = self.num_neighbouring_nodes as f32 * self.propagation_factor; + n.round() as usize + } } impl Default for DhtConfig { fn default() -> Self { Self { num_neighbouring_nodes: DEFAULT_NUM_NEIGHBOURING_NODES, - num_random_propagation_nodes: DEFAULT_NUM_RANDOM_PROPAGATION_NODES, + propagation_factor: 0.5, saf_num_closest_nodes: 10, saf_max_returned_messages: 50, outbound_buffer_size: 20, diff --git a/comms/dht/src/inbound/dht_handler/task.rs b/comms/dht/src/inbound/dht_handler/task.rs index b089e8512b..71c05ed0ab 100644 --- a/comms/dht/src/inbound/dht_handler/task.rs +++ b/comms/dht/src/inbound/dht_handler/task.rs @@ -237,12 +237,10 @@ where S: Service self.outbound_service .send_raw( SendMessageParams::new() - .closest( - origin_node_id.clone(), - self.config.num_neighbouring_nodes, - vec![origin_node_id, source_peer.node_id.clone()], - PeerFeatures::MESSAGE_PROPAGATION, - ) + .propagate(origin_node_id.clone().into(), vec![ + origin_node_id, + source_peer.node_id.clone(), + ]) .with_dht_header(dht_header) .finish(), body.to_encoded_bytes(),
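
The sketches below are illustrative only and are not part of the diff; they use simplified stand-in types and only reference names that appear in the patch above.

In memorynet.rs, `ims_rx` becomes an `Option` so that `do_network_wide_propagation` can `take()` each node's inbound message receiver, move it into a spawned task that reads and re-propagates the broadcast, and put it back once `join_all` resolves. A minimal sketch of that take-and-return pattern, using only the `futures` crate (`block_on` in place of the tokio runtime, and a plain `String` payload):

    use futures::{channel::mpsc, executor::block_on, future, SinkExt, StreamExt};

    struct Node {
        ims_rx: Option<mpsc::Receiver<String>>,
    }

    fn main() {
        block_on(async {
            let (mut tx, rx) = mpsc::channel(1);
            tx.send("a public message".to_string()).await.unwrap();
            let mut nodes = vec![Node { ims_rx: Some(rx) }];

            // Move each receiver out of its node and into its own future.
            let reads = nodes.iter_mut().enumerate().map(|(idx, node)| {
                let mut rx = node.ims_rx.take().unwrap();
                async move {
                    let msg = rx.next().await;
                    println!("node {} got {:?}", idx, msg);
                    (idx, rx)
                }
            });

            // Wait for all reads, then hand each receiver back to its node.
            for (idx, rx) in future::join_all(reads).await {
                nodes[idx].ims_rx = Some(rx);
            }
        });
    }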
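
In actor.rs, a `Propagate` request with an unknown destination now selects peers at random: if at least `num_neighbouring_nodes` communication nodes are already connected, it samples `num_propagation_nodes()` of them, otherwise it falls back to `peer_manager.random_peers(...)`. A sketch of that rule, with plain strings standing in for connected peers and the fallback reduced to "use what we have":

    use rand::{rngs::OsRng, seq::SliceRandom};

    fn select_for_unknown_destination(
        connected_nodes: &[String],
        num_neighbouring_nodes: usize,
        num_propagation_nodes: usize,
    ) -> Vec<String> {
        if connected_nodes.len() >= num_neighbouring_nodes {
            // Enough active connections: sample a random subset of them.
            connected_nodes
                .choose_multiple(&mut OsRng, num_propagation_nodes)
                .cloned()
                .collect()
        } else {
            // The real code falls back to `peer_manager.random_peers(...)` here.
            connected_nodes.to_vec()
        }
    }

    fn main() {
        let peers: Vec<String> = (0..10).map(|i| format!("node-{}", i)).collect();
        let picked = select_for_unknown_destination(&peers, 8, 4);
        println!("selected {} of {} connected peers", picked.len(), peers.len());
    }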
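
When the destination is known, candidates that are further from the destination than this node (by XOR distance between node IDs) are dropped, but only when the candidate set is at least `num_neighbouring_nodes` in size, i.e. this node is well connected; the survivors are then truncated to `num_propagation_nodes()`. A minimal sketch, with `u64`s and XOR standing in for `NodeId` and `NodeId::distance`:

    fn filter_closer_candidates(
        own_id: u64,
        dest_id: u64,
        mut candidates: Vec<u64>,
        num_neighbouring_nodes: usize,
        num_propagation_nodes: usize,
    ) -> Vec<u64> {
        // Only drop further-away peers when the sample is big enough, i.e. this node
        // is well connected; otherwise keep everything we managed to select.
        if candidates.len() >= num_neighbouring_nodes {
            let own_dist = own_id ^ dest_id;
            candidates.retain(|id| (*id ^ dest_id) < own_dist);
        }
        candidates.truncate(num_propagation_nodes);
        candidates
    }

    fn main() {
        // Own node is at distance 15 from the destination; peers at distance 1, 7
        // and 14 survive the filter, the peer at distance 17 does not.
        let picked = filter_closer_candidates(0b01111, 0b00000, vec![1, 7, 14, 17], 2, 4);
        println!("{:?}", picked);
    }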
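
`get_propagate_candidates` still delivers directly to a connected client when that client is the destination, but now only if the client is not in the exclude list, so a join message propagated on behalf of a connected wallet is not simply bounced back to it. A rough sketch of the rule, with `u64`s standing in for node IDs and `None` meaning "fall through to normal candidate selection":

    fn direct_client_delivery(connected_clients: &[u64], dest: u64, exclude: &[u64]) -> Option<u64> {
        connected_clients
            .iter()
            .find(|&&id| id == dest && !exclude.contains(&id))
            .copied()
    }

    fn main() {
        let clients = [7, 9];
        assert_eq!(direct_client_delivery(&clients, 9, &[]), Some(9));
        // A join message excludes its origin, so it is propagated normally instead.
        assert_eq!(direct_client_delivery(&clients, 9, &[9]), None);
    }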
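
In config.rs, `propagation_factor` replaces `num_random_propagation_nodes`: the propagation fan-out is now derived from the neighbour count instead of being configured separately. A standalone sketch of `num_propagation_nodes()` with the new defaults (8 neighbours, factor 0.5), using a struct trimmed to the two fields involved:

    struct DhtConfigSketch {
        num_neighbouring_nodes: usize, // default 8 after this patch
        propagation_factor: f32,       // default 0.5
    }

    impl DhtConfigSketch {
        fn num_propagation_nodes(&self) -> usize {
            (self.num_neighbouring_nodes as f32 * self.propagation_factor).round() as usize
        }
    }

    fn main() {
        let config = DhtConfigSketch {
            num_neighbouring_nodes: 8,
            propagation_factor: 0.5,
        };
        // 8 * 0.5 = 4 peers per propagation hop
        assert_eq!(config.num_propagation_nodes(), 4);
    }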
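
In dht_handler/task.rs, forwarded join messages switch from the `closest` strategy (up to `num_neighbouring_nodes` peers with `MESSAGE_PROPAGATION`) to `propagate`, so they also respect the reduced fan-out. A back-of-envelope comparison of per-hop sends using the old and new defaults (illustrative only, not measured):

    fn main() {
        let old_fanout = 10_usize; // old DEFAULT_NUM_NEIGHBOURING_NODES used by `closest`
        let new_fanout = (8.0_f32 * 0.5).round() as usize; // num_propagation_nodes() with new defaults
        println!("join forwarding per hop: {} -> {} messages", old_fanout, new_fanout);
    }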