From 4d0186f4d5a40d39dc2230069ca0eb4bb6932511 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Mon, 6 Apr 2020 14:09:56 +0200 Subject: [PATCH] Calcuate region threshold depending on PeerFeatures When a node is determining if a requesting node is in it's region, it takes the closest `n` peers that it is aware of and if the node ID is within that threshold it considers it a neighbouring node. Because there are nodes on the network that do not perform some network functions (such as message propagation and store and forward i.e. wallets), there is a chance that a majority of neighbours are these non-propagating nodes and 'take up space' where propagation nodes could 'fit'. When calculating the threshold for neighbouring nodes it must take the features of the nodes it knows about into account. This PR changes the peer manager to consider `COMMUNICATION_NODE`s and `COMMUNICATION_CLIENT`s independently to ensure a minimum number of propagation nodes are considered neighbouring. --- comms/dht/examples/memorynet.rs | 2 +- comms/src/connection_manager/tests/manager.rs | 2 +- comms/src/peer_manager/manager.rs | 168 +++++++++++++----- comms/src/peer_manager/node_id.rs | 8 +- comms/src/peer_manager/peer_storage.rs | 57 ++++-- comms/src/test_utils/node_identity.rs | 6 +- 6 files changed, 171 insertions(+), 72 deletions(-) diff --git a/comms/dht/examples/memorynet.rs b/comms/dht/examples/memorynet.rs index b84fbe2e08..66d5cc6d82 100644 --- a/comms/dht/examples/memorynet.rs +++ b/comms/dht/examples/memorynet.rs @@ -339,7 +339,7 @@ async fn peer_list_summary<'a, I: IntoIterator, T: AsRef>(ne .as_ref() .comms .peer_manager() - .closest_peers(node_identity.node_id(), 10, &[]) + .closest_peers(node_identity.node_id(), 10, &[], None) .await .unwrap(); let mut table = Table::new(); diff --git a/comms/src/connection_manager/tests/manager.rs b/comms/src/connection_manager/tests/manager.rs index 0fabbf3e0d..a71cbb9c24 100644 --- a/comms/src/connection_manager/tests/manager.rs +++ b/comms/src/connection_manager/tests/manager.rs @@ -234,7 +234,7 @@ async fn dial_offline_peer() { async fn simultaneous_dial_events() { let mut shutdown = Shutdown::new(); - let node_identities = ordered_node_identities(2); + let node_identities = ordered_node_identities(2, Default::default()); // Setup connection manager 1 let peer_manager1 = build_peer_manager(); diff --git a/comms/src/peer_manager/manager.rs b/comms/src/peer_manager/manager.rs index b19b5fdde6..de90ee5510 100644 --- a/comms/src/peer_manager/manager.rs +++ b/comms/src/peer_manager/manager.rs @@ -23,7 +23,7 @@ use crate::{ peer_manager::{ connection_stats::PeerConnectionStats, - node_id::NodeId, + node_id::{NodeDistance, NodeId}, peer::{Peer, PeerFlags}, peer_id::PeerId, peer_storage::PeerStorage, @@ -186,15 +186,20 @@ impl PeerManager { self.peer_storage.read().await.for_each(f) } - /// Fetch n nearest neighbour Communication Nodes + /// Fetch n nearest neighbours. If features are supplied, the function will return the closest peers matching that + /// feature pub async fn closest_peers( &self, node_id: &NodeId, n: usize, excluded_peers: &[CommsPublicKey], + features: Option, ) -> Result, PeerManagerError> { - self.peer_storage.read().await.closest_peers(node_id, n, excluded_peers) + self.peer_storage + .read() + .await + .closest_peers(node_id, n, excluded_peers, features) } /// Fetch n random peers @@ -218,6 +223,19 @@ impl PeerManager { .in_network_region(node_id, region_node_id, n) } + pub async fn calc_region_threshold( + &self, + region_node_id: &NodeId, + n: usize, + features: PeerFeatures, + ) -> Result + { + self.peer_storage + .read() + .await + .calc_region_threshold(region_node_id, n, features) + } + /// Changes the ban flag bit of the peer pub async fn set_banned(&self, public_key: &CommsPublicKey, ban_flag: bool) -> Result { self.peer_storage.write().await.set_banned(public_key, ban_flag) @@ -249,18 +267,11 @@ mod test { use tari_crypto::{keys::PublicKey, ristretto::RistrettoPublicKey}; use tari_storage::HashmapDatabase; - fn create_test_peer(ban_flag: bool) -> Peer { + fn create_test_peer(ban_flag: bool, features: PeerFeatures) -> Peer { let (_sk, pk) = RistrettoPublicKey::random_keypair(&mut OsRng); let node_id = NodeId::from_key(&pk).unwrap(); let net_addresses = MultiaddressesWithStats::from("/ip4/1.2.3.4/tcp/8000".parse::().unwrap()); - let mut peer = Peer::new( - pk, - node_id, - net_addresses, - PeerFlags::default(), - PeerFeatures::MESSAGE_PROPAGATION, - &[], - ); + let mut peer = Peer::new(pk, node_id, net_addresses, PeerFlags::default(), features, &[]); peer.set_banned(ban_flag); peer } @@ -271,19 +282,19 @@ mod test { let peer_manager = PeerManager::new(HashmapDatabase::new()).unwrap(); let mut test_peers = Vec::new(); // Create 20 peers were the 1st and last one is bad - test_peers.push(create_test_peer(true)); + test_peers.push(create_test_peer(true, PeerFeatures::COMMUNICATION_NODE)); assert!(peer_manager .add_peer(test_peers[test_peers.len() - 1].clone()) .await .is_ok()); for _i in 0..18 { - test_peers.push(create_test_peer(false)); + test_peers.push(create_test_peer(false, PeerFeatures::COMMUNICATION_NODE)); assert!(peer_manager .add_peer(test_peers[test_peers.len() - 1].clone()) .await .is_ok()); } - test_peers.push(create_test_peer(true)); + test_peers.push(create_test_peer(true, PeerFeatures::COMMUNICATION_NODE)); assert!(peer_manager .add_peer(test_peers[test_peers.len() - 1].clone()) .await @@ -298,7 +309,7 @@ mod test { assert_eq!(selected_peers.node_id, test_peers[2].node_id); assert_eq!(selected_peers.public_key, test_peers[2].public_key); // Test Invalid Direct - let unmanaged_peer = create_test_peer(false); + let unmanaged_peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE); assert!(peer_manager .direct_identity_node_id(&unmanaged_peer.node_id) .await @@ -321,7 +332,7 @@ mod test { // Test Closest - No exclusions let selected_peers = peer_manager - .closest_peers(&unmanaged_peer.node_id, 3, &Vec::new()) + .closest_peers(&unmanaged_peer.node_id, 3, &[], None) .await .unwrap(); assert_eq!(selected_peers.len(), 3); @@ -349,7 +360,7 @@ mod test { selected_peers[0].public_key.clone(), // ,selected_peers[1].public_key.clone() ]; let selected_peers = peer_manager - .closest_peers(&unmanaged_peer.node_id, 3, &excluded_peers) + .closest_peers(&unmanaged_peer.node_id, 3, &excluded_peers, None) .await .unwrap(); assert_eq!(selected_peers.len(), 3); @@ -379,45 +390,104 @@ mod test { } #[tokio_macros::test_basic] - async fn test_in_network_region() { - let _rng = rand::rngs::OsRng; + async fn calc_region_threshold() { + let n = 5; // Create peer manager with random peers let peer_manager = PeerManager::new(HashmapDatabase::new()).unwrap(); - let network_region_node_id = create_test_peer(false).node_id; - // Create peers - let mut test_peers: Vec = Vec::new(); - for _ in 0..10 { - test_peers.push(create_test_peer(false)); - assert!(peer_manager - .add_peer(test_peers[test_peers.len() - 1].clone()) - .await - .is_ok()); + let network_region_node_id = create_test_peer(false, Default::default()).node_id; + let mut test_peers = (0..10) + .map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_NODE)) + .chain((0..10).map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_CLIENT))) + .collect::>(); + + for p in &test_peers { + peer_manager.add_peer(p.clone()).await.unwrap(); } - test_peers[0].set_banned(true); - test_peers[1].set_banned(true); - // Get nearest neighbours - let n = 5; - let nearest_identities = peer_manager - .closest_peers(&network_region_node_id, n, &Vec::new()) + test_peers.sort_by(|a, b| { + let a_dist = network_region_node_id.distance(&a.node_id); + let b_dist = network_region_node_id.distance(&b.node_id); + a_dist.partial_cmp(&b_dist).unwrap() + }); + + let node_region_threshold = peer_manager + .calc_region_threshold(&network_region_node_id, n, PeerFeatures::COMMUNICATION_NODE) .await .unwrap(); - for peer in &test_peers { - if nearest_identities + // First 5 base nodes should be within the region + for peer in test_peers + .iter() + .filter(|p| p.features == PeerFeatures::COMMUNICATION_NODE) + .take(n) + { + assert!(peer.node_id.distance(&network_region_node_id) <= node_region_threshold); + } + + // Next 5 should not be in the region + for peer in test_peers + .iter() + .filter(|p| p.features == PeerFeatures::COMMUNICATION_NODE) + .skip(n) + { + assert!(peer.node_id.distance(&network_region_node_id) > node_region_threshold); + } + + let node_region_threshold = peer_manager + .calc_region_threshold(&network_region_node_id, n, PeerFeatures::COMMUNICATION_CLIENT) + .await + .unwrap(); + + // First 5 clients should be in region + for peer in test_peers + .iter() + .filter(|p| p.features == PeerFeatures::COMMUNICATION_CLIENT) + .take(5) + { + assert!(peer.node_id.distance(&network_region_node_id) <= node_region_threshold); + } + + // Next 5 should not be in the region + for peer in test_peers + .iter() + .filter(|p| p.features == PeerFeatures::COMMUNICATION_CLIENT) + .skip(5) + { + assert!(peer.node_id.distance(&network_region_node_id) > node_region_threshold); + } + } + + #[tokio_macros::test_basic] + async fn closest_peers() { + let n = 5; + // Create peer manager with random peers + let peer_manager = PeerManager::new(HashmapDatabase::new()).unwrap(); + let network_region_node_id = create_test_peer(false, Default::default()).node_id; + let test_peers = (0..10) + .map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_NODE)) + .chain((0..10).map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_CLIENT))) + .collect::>(); + + for p in &test_peers { + peer_manager.add_peer(p.clone()).await.unwrap(); + } + + for features in &[PeerFeatures::COMMUNICATION_NODE, PeerFeatures::COMMUNICATION_CLIENT] { + let node_threshold = peer_manager + .peer_storage + .read() + .await + .calc_region_threshold(&network_region_node_id, n, *features) + .unwrap(); + + let closest = peer_manager + .closest_peers(&network_region_node_id, n, &[], Some(*features)) + .await + .unwrap(); + + assert!(closest .iter() - .any(|peer_identity| peer.node_id == peer_identity.node_id) - { - assert!(peer_manager - .in_network_region(&peer.node_id, &network_region_node_id, n) - .await - .unwrap()); - } else { - assert!(!peer_manager - .in_network_region(&peer.node_id, &network_region_node_id, n) - .await - .unwrap()); - } + .all(|p| network_region_node_id.distance(&p.node_id) <= node_threshold)); } } } diff --git a/comms/src/peer_manager/node_id.rs b/comms/src/peer_manager/node_id.rs index c7b1552cfc..b1466803b0 100644 --- a/comms/src/peer_manager/node_id.rs +++ b/comms/src/peer_manager/node_id.rs @@ -68,7 +68,7 @@ impl NodeDistance { nd } - pub fn max_distance() -> NodeDistance { + pub const fn max_distance() -> NodeDistance { NodeDistance([255; NODE_ID_ARRAY_SIZE]) } } @@ -355,6 +355,12 @@ mod test { assert!(n1_to_n2_dist < n1_to_n3_dist); assert_eq!(n1_to_n2_dist, desired_n1_to_n2_dist); assert_eq!(n1_to_n3_dist, desired_n1_to_n3_dist); + + // Commutative + let n1_to_n2_dist = node_id1.distance(&node_id2); + let n2_to_n1_dist = node_id2.distance(&node_id1); + + assert_eq!(n1_to_n2_dist, n2_to_n1_dist); } #[test] diff --git a/comms/src/peer_manager/peer_storage.rs b/comms/src/peer_manager/peer_storage.rs index 7c488d8d0d..7c572a16f3 100644 --- a/comms/src/peer_manager/peer_storage.rs +++ b/comms/src/peer_manager/peer_storage.rs @@ -301,13 +301,18 @@ where DS: KeyValueStore node_id: &NodeId, n: usize, excluded_peers: &[CommsPublicKey], + features: Option, ) -> Result, PeerManagerError> { let mut peer_keys = Vec::new(); let mut dists = Vec::new(); self.peer_db .for_each_ok(|(peer_key, peer)| { - if !peer.is_banned() && !excluded_peers.contains(&peer.public_key) { + if features.map(|f| peer.features == f).unwrap_or(true) && + !peer.is_banned() && + !peer.is_offline() && + !excluded_peers.contains(&peer.public_key) + { peer_keys.push(peer_key); dists.push(node_id.distance(&peer.node_id)); } @@ -345,7 +350,7 @@ where DS: KeyValueStore // TODO: Send to a random set of Communication Nodes let mut peer_keys = self .peer_db - .filter(|(_, peer)| !peer.is_banned()) + .filter(|(_, peer)| !peer.is_banned() && peer.features == PeerFeatures::COMMUNICATION_NODE) .map(|pairs| pairs.into_iter().map(|(k, _)| k).collect::>()) .map_err(PeerManagerError::DatabaseError)?; @@ -383,23 +388,43 @@ where DS: KeyValueStore n: usize, ) -> Result { - let region2node_dist = region_node_id.distance(node_id); + let region_node_distance = region_node_id.distance(node_id); + let node_threshold = self.calc_region_threshold(region_node_id, n, PeerFeatures::COMMUNICATION_NODE)?; + // Is node ID in the base node threshold? + if region_node_distance <= node_threshold { + return Ok(true); + } + let client_threshold = self.calc_region_threshold(region_node_id, n, PeerFeatures::COMMUNICATION_CLIENT)?; + // Is node ID in the base client threshold? + Ok(region_node_distance <= client_threshold) + } + + pub fn calc_region_threshold( + &self, + region_node_id: &NodeId, + n: usize, + features: PeerFeatures, + ) -> Result + { let mut dists = vec![NodeDistance::max_distance(); n]; - let last_index = dists.len() - 1; + let last_index = n - 1; + self.peer_db .for_each_ok(|(_, peer)| { - if !peer.is_banned() { - let curr_dist = region_node_id.distance(&peer.node_id); - for i in 0..dists.len() { - if dists[i] > curr_dist { - dists.insert(i, curr_dist); - dists.pop(); - break; - } - } + if peer.features != features { + return IterationResult::Continue; + } + + if peer.is_banned() || peer.is_offline() { + return IterationResult::Continue; + } - if region2node_dist > dists[last_index] { - return IterationResult::Break; + let curr_dist = region_node_id.distance(&peer.node_id); + for i in 0..dists.len() { + if dists[i] > curr_dist { + dists.insert(i, curr_dist); + dists.pop(); + break; } } @@ -407,7 +432,7 @@ where DS: KeyValueStore }) .map_err(PeerManagerError::DatabaseError)?; - Ok(region2node_dist <= dists[last_index]) + Ok(dists.remove(last_index)) } /// Changes the ban flag bit of the peer diff --git a/comms/src/test_utils/node_identity.rs b/comms/src/test_utils/node_identity.rs index fd06a40dc2..86df4b4c8b 100644 --- a/comms/src/test_utils/node_identity.rs +++ b/comms/src/test_utils/node_identity.rs @@ -34,10 +34,8 @@ pub fn build_node_identity(features: PeerFeatures) -> Arc { Arc::new(NodeIdentity::random(&mut OsRng, public_addr, features).unwrap()) } -pub fn ordered_node_identities(n: usize) -> Vec> { - let mut ids = (0..n) - .map(|_| build_node_identity(PeerFeatures::default())) - .collect::>(); +pub fn ordered_node_identities(n: usize, features: PeerFeatures) -> Vec> { + let mut ids = (0..n).map(|_| build_node_identity(features)).collect::>(); ids.sort_unstable_by(|a, b| a.node_id().cmp(b.node_id())); ids }