Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate region threshold depending on PeerFeatures #1677

Merged
merged 1 commit into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion comms/dht/examples/memorynet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ async fn peer_list_summary<'a, I: IntoIterator<Item = T>, T: AsRef<TestNode>>(ne
.as_ref()
.comms
.peer_manager()
.closest_peers(node_identity.node_id(), 10, &[])
.closest_peers(node_identity.node_id(), 10, &[], None)
.await
.unwrap();
let mut table = Table::new();
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connection_manager/tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ async fn dial_offline_peer() {
async fn simultaneous_dial_events() {
let mut shutdown = Shutdown::new();

let node_identities = ordered_node_identities(2);
let node_identities = ordered_node_identities(2, Default::default());

// Setup connection manager 1
let peer_manager1 = build_peer_manager();
Expand Down
168 changes: 119 additions & 49 deletions comms/src/peer_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::{
peer_manager::{
connection_stats::PeerConnectionStats,
node_id::NodeId,
node_id::{NodeDistance, NodeId},
peer::{Peer, PeerFlags},
peer_id::PeerId,
peer_storage::PeerStorage,
Expand Down Expand Up @@ -186,15 +186,20 @@ impl PeerManager {
self.peer_storage.read().await.for_each(f)
}

/// Fetch n nearest neighbour Communication Nodes
/// Fetch n nearest neighbours. If features are supplied, the function will return the closest peers matching that
/// feature
pub async fn closest_peers(
&self,
node_id: &NodeId,
n: usize,
excluded_peers: &[CommsPublicKey],
features: Option<PeerFeatures>,
) -> Result<Vec<Peer>, PeerManagerError>
{
self.peer_storage.read().await.closest_peers(node_id, n, excluded_peers)
self.peer_storage
.read()
.await
.closest_peers(node_id, n, excluded_peers, features)
}

/// Fetch n random peers
Expand All @@ -218,6 +223,19 @@ impl PeerManager {
.in_network_region(node_id, region_node_id, n)
}

pub async fn calc_region_threshold(
&self,
region_node_id: &NodeId,
n: usize,
features: PeerFeatures,
) -> Result<NodeDistance, PeerManagerError>
{
self.peer_storage
.read()
.await
.calc_region_threshold(region_node_id, n, features)
}

/// Changes the ban flag bit of the peer
pub async fn set_banned(&self, public_key: &CommsPublicKey, ban_flag: bool) -> Result<NodeId, PeerManagerError> {
self.peer_storage.write().await.set_banned(public_key, ban_flag)
Expand Down Expand Up @@ -249,18 +267,11 @@ mod test {
use tari_crypto::{keys::PublicKey, ristretto::RistrettoPublicKey};
use tari_storage::HashmapDatabase;

fn create_test_peer(ban_flag: bool) -> Peer {
fn create_test_peer(ban_flag: bool, features: PeerFeatures) -> Peer {
let (_sk, pk) = RistrettoPublicKey::random_keypair(&mut OsRng);
let node_id = NodeId::from_key(&pk).unwrap();
let net_addresses = MultiaddressesWithStats::from("/ip4/1.2.3.4/tcp/8000".parse::<Multiaddr>().unwrap());
let mut peer = Peer::new(
pk,
node_id,
net_addresses,
PeerFlags::default(),
PeerFeatures::MESSAGE_PROPAGATION,
&[],
);
let mut peer = Peer::new(pk, node_id, net_addresses, PeerFlags::default(), features, &[]);
peer.set_banned(ban_flag);
peer
}
Expand All @@ -271,19 +282,19 @@ mod test {
let peer_manager = PeerManager::new(HashmapDatabase::new()).unwrap();
let mut test_peers = Vec::new();
// Create 20 peers were the 1st and last one is bad
test_peers.push(create_test_peer(true));
test_peers.push(create_test_peer(true, PeerFeatures::COMMUNICATION_NODE));
assert!(peer_manager
.add_peer(test_peers[test_peers.len() - 1].clone())
.await
.is_ok());
for _i in 0..18 {
test_peers.push(create_test_peer(false));
test_peers.push(create_test_peer(false, PeerFeatures::COMMUNICATION_NODE));
assert!(peer_manager
.add_peer(test_peers[test_peers.len() - 1].clone())
.await
.is_ok());
}
test_peers.push(create_test_peer(true));
test_peers.push(create_test_peer(true, PeerFeatures::COMMUNICATION_NODE));
assert!(peer_manager
.add_peer(test_peers[test_peers.len() - 1].clone())
.await
Expand All @@ -298,7 +309,7 @@ mod test {
assert_eq!(selected_peers.node_id, test_peers[2].node_id);
assert_eq!(selected_peers.public_key, test_peers[2].public_key);
// Test Invalid Direct
let unmanaged_peer = create_test_peer(false);
let unmanaged_peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
assert!(peer_manager
.direct_identity_node_id(&unmanaged_peer.node_id)
.await
Expand All @@ -321,7 +332,7 @@ mod test {

// Test Closest - No exclusions
let selected_peers = peer_manager
.closest_peers(&unmanaged_peer.node_id, 3, &Vec::new())
.closest_peers(&unmanaged_peer.node_id, 3, &[], None)
.await
.unwrap();
assert_eq!(selected_peers.len(), 3);
Expand Down Expand Up @@ -349,7 +360,7 @@ mod test {
selected_peers[0].public_key.clone(), // ,selected_peers[1].public_key.clone()
];
let selected_peers = peer_manager
.closest_peers(&unmanaged_peer.node_id, 3, &excluded_peers)
.closest_peers(&unmanaged_peer.node_id, 3, &excluded_peers, None)
.await
.unwrap();
assert_eq!(selected_peers.len(), 3);
Expand Down Expand Up @@ -379,45 +390,104 @@ mod test {
}

#[tokio_macros::test_basic]
async fn test_in_network_region() {
let _rng = rand::rngs::OsRng;
async fn calc_region_threshold() {
let n = 5;
// Create peer manager with random peers
let peer_manager = PeerManager::new(HashmapDatabase::new()).unwrap();
let network_region_node_id = create_test_peer(false).node_id;
// Create peers
let mut test_peers: Vec<Peer> = Vec::new();
for _ in 0..10 {
test_peers.push(create_test_peer(false));
assert!(peer_manager
.add_peer(test_peers[test_peers.len() - 1].clone())
.await
.is_ok());
let network_region_node_id = create_test_peer(false, Default::default()).node_id;
let mut test_peers = (0..10)
.map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_NODE))
.chain((0..10).map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_CLIENT)))
.collect::<Vec<_>>();

for p in &test_peers {
peer_manager.add_peer(p.clone()).await.unwrap();
}
test_peers[0].set_banned(true);
test_peers[1].set_banned(true);

// Get nearest neighbours
let n = 5;
let nearest_identities = peer_manager
.closest_peers(&network_region_node_id, n, &Vec::new())
test_peers.sort_by(|a, b| {
let a_dist = network_region_node_id.distance(&a.node_id);
let b_dist = network_region_node_id.distance(&b.node_id);
a_dist.partial_cmp(&b_dist).unwrap()
});

let node_region_threshold = peer_manager
.calc_region_threshold(&network_region_node_id, n, PeerFeatures::COMMUNICATION_NODE)
.await
.unwrap();

for peer in &test_peers {
if nearest_identities
// First 5 base nodes should be within the region
for peer in test_peers
.iter()
.filter(|p| p.features == PeerFeatures::COMMUNICATION_NODE)
.take(n)
{
assert!(peer.node_id.distance(&network_region_node_id) <= node_region_threshold);
}

// Next 5 should not be in the region
for peer in test_peers
.iter()
.filter(|p| p.features == PeerFeatures::COMMUNICATION_NODE)
.skip(n)
{
assert!(peer.node_id.distance(&network_region_node_id) > node_region_threshold);
}

let node_region_threshold = peer_manager
.calc_region_threshold(&network_region_node_id, n, PeerFeatures::COMMUNICATION_CLIENT)
.await
.unwrap();

// First 5 clients should be in region
for peer in test_peers
.iter()
.filter(|p| p.features == PeerFeatures::COMMUNICATION_CLIENT)
.take(5)
{
assert!(peer.node_id.distance(&network_region_node_id) <= node_region_threshold);
}

// Next 5 should not be in the region
for peer in test_peers
.iter()
.filter(|p| p.features == PeerFeatures::COMMUNICATION_CLIENT)
.skip(5)
{
assert!(peer.node_id.distance(&network_region_node_id) > node_region_threshold);
}
}

#[tokio_macros::test_basic]
async fn closest_peers() {
let n = 5;
// Create peer manager with random peers
let peer_manager = PeerManager::new(HashmapDatabase::new()).unwrap();
let network_region_node_id = create_test_peer(false, Default::default()).node_id;
let test_peers = (0..10)
.map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_NODE))
.chain((0..10).map(|_| create_test_peer(false, PeerFeatures::COMMUNICATION_CLIENT)))
.collect::<Vec<_>>();

for p in &test_peers {
peer_manager.add_peer(p.clone()).await.unwrap();
}

for features in &[PeerFeatures::COMMUNICATION_NODE, PeerFeatures::COMMUNICATION_CLIENT] {
let node_threshold = peer_manager
.peer_storage
.read()
.await
.calc_region_threshold(&network_region_node_id, n, *features)
.unwrap();

let closest = peer_manager
.closest_peers(&network_region_node_id, n, &[], Some(*features))
.await
.unwrap();

assert!(closest
.iter()
.any(|peer_identity| peer.node_id == peer_identity.node_id)
{
assert!(peer_manager
.in_network_region(&peer.node_id, &network_region_node_id, n)
.await
.unwrap());
} else {
assert!(!peer_manager
.in_network_region(&peer.node_id, &network_region_node_id, n)
.await
.unwrap());
}
.all(|p| network_region_node_id.distance(&p.node_id) <= node_threshold));
}
}
}
8 changes: 7 additions & 1 deletion comms/src/peer_manager/node_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl NodeDistance {
nd
}

pub fn max_distance() -> NodeDistance {
pub const fn max_distance() -> NodeDistance {
NodeDistance([255; NODE_ID_ARRAY_SIZE])
}
}
Expand Down Expand Up @@ -355,6 +355,12 @@ mod test {
assert!(n1_to_n2_dist < n1_to_n3_dist);
assert_eq!(n1_to_n2_dist, desired_n1_to_n2_dist);
assert_eq!(n1_to_n3_dist, desired_n1_to_n3_dist);

// Commutative
let n1_to_n2_dist = node_id1.distance(&node_id2);
let n2_to_n1_dist = node_id2.distance(&node_id1);

assert_eq!(n1_to_n2_dist, n2_to_n1_dist);
}

#[test]
Expand Down
57 changes: 41 additions & 16 deletions comms/src/peer_manager/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,18 @@ where DS: KeyValueStore<PeerId, Peer>
node_id: &NodeId,
n: usize,
excluded_peers: &[CommsPublicKey],
features: Option<PeerFeatures>,
) -> Result<Vec<Peer>, PeerManagerError>
{
let mut peer_keys = Vec::new();
let mut dists = Vec::new();
self.peer_db
.for_each_ok(|(peer_key, peer)| {
if !peer.is_banned() && !excluded_peers.contains(&peer.public_key) {
if features.map(|f| peer.features == f).unwrap_or(true) &&
!peer.is_banned() &&
!peer.is_offline() &&
!excluded_peers.contains(&peer.public_key)
{
peer_keys.push(peer_key);
dists.push(node_id.distance(&peer.node_id));
}
Expand Down Expand Up @@ -345,7 +350,7 @@ where DS: KeyValueStore<PeerId, Peer>
// TODO: Send to a random set of Communication Nodes
let mut peer_keys = self
.peer_db
.filter(|(_, peer)| !peer.is_banned())
.filter(|(_, peer)| !peer.is_banned() && peer.features == PeerFeatures::COMMUNICATION_NODE)
.map(|pairs| pairs.into_iter().map(|(k, _)| k).collect::<Vec<_>>())
.map_err(PeerManagerError::DatabaseError)?;

Expand Down Expand Up @@ -383,31 +388,51 @@ where DS: KeyValueStore<PeerId, Peer>
n: usize,
) -> Result<bool, PeerManagerError>
{
let region2node_dist = region_node_id.distance(node_id);
let region_node_distance = region_node_id.distance(node_id);
let node_threshold = self.calc_region_threshold(region_node_id, n, PeerFeatures::COMMUNICATION_NODE)?;
// Is node ID in the base node threshold?
if region_node_distance <= node_threshold {
return Ok(true);
}
let client_threshold = self.calc_region_threshold(region_node_id, n, PeerFeatures::COMMUNICATION_CLIENT)?;
// Is node ID in the base client threshold?
Ok(region_node_distance <= client_threshold)
}

pub fn calc_region_threshold(
&self,
region_node_id: &NodeId,
n: usize,
features: PeerFeatures,
) -> Result<NodeDistance, PeerManagerError>
{
let mut dists = vec![NodeDistance::max_distance(); n];
let last_index = dists.len() - 1;
let last_index = n - 1;

self.peer_db
.for_each_ok(|(_, peer)| {
if !peer.is_banned() {
let curr_dist = region_node_id.distance(&peer.node_id);
for i in 0..dists.len() {
if dists[i] > curr_dist {
dists.insert(i, curr_dist);
dists.pop();
break;
}
}
if peer.features != features {
return IterationResult::Continue;
}

if peer.is_banned() || peer.is_offline() {
return IterationResult::Continue;
}

if region2node_dist > dists[last_index] {
return IterationResult::Break;
let curr_dist = region_node_id.distance(&peer.node_id);
for i in 0..dists.len() {
if dists[i] > curr_dist {
dists.insert(i, curr_dist);
dists.pop();
break;
}
}

IterationResult::Continue
})
.map_err(PeerManagerError::DatabaseError)?;

Ok(region2node_dist <= dists[last_index])
Ok(dists.remove(last_index))
}

/// Changes the ban flag bit of the peer
Expand Down
Loading