Skip to content

Commit

Permalink
Merge pull request #1929 from tari-project/sb-dht-saf-spread
Browse files Browse the repository at this point in the history
Increased redundancy for SAF messages
  • Loading branch information
philipr-za authored Jun 1, 2020
2 parents ce1b404 + d86401b commit 8ee27eb
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 82 deletions.
27 changes: 15 additions & 12 deletions comms/dht/examples/memorynet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,19 @@ async fn main() {
take_a_break().await;
total_messages += drain_messaging_events(&mut messaging_events_rx, false).await;

let random_wallet = wallets.remove(OsRng.gen_range(0, wallets.len() - 1));
let (num_msgs, random_wallet) = do_store_and_forward_message_propagation(
random_wallet,
&wallets,
messaging_events_tx,
&mut messaging_events_rx,
)
.await;
total_messages += num_msgs;
// Put the wallet back
wallets.push(random_wallet);
for _ in 0..5 {
let random_wallet = wallets.remove(OsRng.gen_range(0, wallets.len() - 1));
let (num_msgs, random_wallet) = do_store_and_forward_message_propagation(
random_wallet,
&wallets,
messaging_events_tx.clone(),
&mut messaging_events_rx,
)
.await;
total_messages += num_msgs;
// Put the wallet back
wallets.push(random_wallet);
}

do_network_wide_propagation(&mut nodes).await;

Expand Down Expand Up @@ -896,7 +898,8 @@ async fn setup_comms_dht(
.local_test()
.enable_auto_join()
.with_discovery_timeout(Duration::from_secs(15))
.with_num_neighbouring_nodes(8)
.with_num_neighbouring_nodes(10)
.with_num_random_nodes(5)
.with_propagation_factor(4)
.finish()
.await
Expand Down
43 changes: 24 additions & 19 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,7 @@ impl DhtActor {
outbound_requester
.send_message_no_header(
SendMessageParams::new()
.closest(
node_identity.node_id().clone(),
config.num_neighbouring_nodes,
vec![],
PeerFeatures::MESSAGE_PROPAGATION,
)
.closest(node_identity.node_id().clone(), config.num_neighbouring_nodes, vec![])
.with_dht_message_type(DhtMessageType::Join)
.force_origin()
.finish(),
Expand Down Expand Up @@ -392,14 +387,27 @@ impl DhtActor {
Ok(peers.into_iter().map(|p| p.node_id).collect())
},
Closest(closest_request) => {
Self::select_closest_peers_for_propagation(
&peer_manager,
&closest_request.node_id,
closest_request.n,
&closest_request.excluded_peers,
closest_request.peer_features,
)
.await
let candidates = if closest_request.connected_only {
let connections = connectivity
.select_connections(ConnectivitySelection::closest_to(
closest_request.node_id,
closest_request.n,
closest_request.excluded_peers,
))
.await?;

connections.iter().map(|conn| conn.peer_node_id()).cloned().collect()
} else {
Self::select_closest_peers_for_propagation(
&peer_manager,
&closest_request.node_id,
closest_request.n,
&closest_request.excluded_peers,
PeerFeatures::MESSAGE_PROPAGATION,
)
.await?
};
Ok(candidates)
},
Random(n, excluded) => {
// Send to a random set of peers of size n that are Communication Nodes
Expand Down Expand Up @@ -619,10 +627,7 @@ mod test {
test_utils::{make_client_identity, make_node_identity, make_peer_manager},
};
use chrono::{DateTime, Utc};
use tari_comms::{
peer_manager::PeerFeatures,
test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair},
};
use tari_comms::test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair};
use tari_shutdown::Shutdown;
use tari_test_utils::random;

Expand Down Expand Up @@ -751,8 +756,8 @@ mod test {
let send_request = Box::new(BroadcastClosestRequest {
n: 10,
node_id: node_identity.node_id().clone(),
peer_features: PeerFeatures::DHT_STORE_FORWARD,
excluded_peers: vec![],
connected_only: false,
});
let peers = requester
.select_peers(BroadcastStrategy::Closest(send_request))
Expand Down
13 changes: 5 additions & 8 deletions comms/dht/src/broadcast_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@

use crate::envelope::NodeDestination;
use std::{fmt, fmt::Formatter};
use tari_comms::{
peer_manager::{node_id::NodeId, PeerFeatures},
types::CommsPublicKey,
};
use tari_comms::{peer_manager::node_id::NodeId, types::CommsPublicKey};

#[derive(Debug, Clone)]
pub struct BroadcastClosestRequest {
pub n: usize,
pub node_id: NodeId,
pub peer_features: PeerFeatures,
pub excluded_peers: Vec<NodeId>,
pub connected_only: bool,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -128,7 +125,7 @@ mod test {
node_id: NodeId::default(),
n: 0,
excluded_peers: Default::default(),
peer_features: Default::default()
connected_only: false
}))
.is_direct(),
false
Expand All @@ -152,7 +149,7 @@ mod test {
node_id: NodeId::default(),
n: 0,
excluded_peers: Default::default(),
peer_features: Default::default()
connected_only: false
}))
.direct_public_key()
.is_none(),);
Expand All @@ -178,7 +175,7 @@ mod test {
node_id: NodeId::default(),
n: 0,
excluded_peers: Default::default(),
peer_features: Default::default(),
connected_only: false
}))
.direct_node_id()
.is_none(),);
Expand Down
9 changes: 7 additions & 2 deletions comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,13 @@ impl DhtBuilder {
self
}

pub fn with_num_neighbouring_nodes(mut self, num_neighbours: usize) -> Self {
self.config.num_neighbouring_nodes = num_neighbours;
pub fn with_num_random_nodes(mut self, n: usize) -> Self {
self.config.num_random_nodes = n;
self
}

pub fn with_num_neighbouring_nodes(mut self, n: usize) -> Self {
self.config.num_neighbouring_nodes = n;
self
}

Expand Down
7 changes: 5 additions & 2 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,17 @@ impl DhtConnectivity {
PeerConnectFailed(node_id) | PeerOffline(node_id) | PeerBanned(node_id) => {
self.replace_managed_peer(node_id).await?;
},
ConnectivityStateDegraded(_) | ConnectivityStateOnline(_) => {
ConnectivityStateDegraded(n) | ConnectivityStateOnline(n) => {
if self.config.auto_join && self.can_send_join() {
info!(target: LOG_TARGET, "Joining the network automatically");
self.dht_requester
.send_join()
.await
.map_err(DhtConnectivityError::SendJoinFailed)?;
self.stats.mark_join_sent();
// If join is only being sent to a single peer, allow it to be resent
if *n > 1 {
self.stats.mark_join_sent();
}
}
},
_ => {},
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl Dht {
)))
.layer(inbound::DecryptionLayer::new(Arc::clone(&self.node_identity)))
.layer(store_forward::ForwardLayer::new(
self.config.clone(),
self.outbound_requester(),
self.node_identity.features().contains(PeerFeatures::DHT_STORE_FORWARD),
))
Expand Down
7 changes: 7 additions & 0 deletions comms/dht/src/inbound/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct DecryptedDhtMessage {
pub authenticated_origin: Option<CommsPublicKey>,
pub dht_header: DhtMessageHeader,
pub is_saf_message: bool,
pub is_saf_stored: Option<bool>,
pub decryption_result: Result<EnvelopeBody, Vec<u8>>,
}

Expand All @@ -99,6 +100,7 @@ impl DecryptedDhtMessage {
authenticated_origin,
dht_header: message.dht_header,
is_saf_message: message.is_saf_message,
is_saf_stored: None,
decryption_result: Ok(message_body),
}
}
Expand All @@ -111,6 +113,7 @@ impl DecryptedDhtMessage {
authenticated_origin: None,
dht_header: message.dht_header,
is_saf_message: message.is_saf_message,
is_saf_stored: None,
decryption_result: Err(message.body),
}
}
Expand Down Expand Up @@ -158,4 +161,8 @@ impl DecryptedDhtMessage {
Err(b) => b.len(),
}
}

pub fn set_saf_stored(&mut self, is_stored: bool) {
self.is_saf_stored = Some(is_stored);
}
}
28 changes: 15 additions & 13 deletions comms/dht/src/outbound/message_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use crate::{
proto::envelope::DhtMessageType,
};
use std::{fmt, fmt::Display};
use tari_comms::{
peer_manager::{NodeId, PeerFeatures},
types::CommsPublicKey,
};
use tari_comms::{peer_manager::NodeId, types::CommsPublicKey};

/// Configuration for outbound messages.
///
Expand Down Expand Up @@ -115,19 +112,24 @@ impl SendMessageParams {

/// Set broadcast_strategy to Closest.`excluded_peers` are excluded. Only Peers which have all `features` are
/// included.
pub fn closest(
&mut self,
node_id: NodeId,
n: usize,
excluded_peers: Vec<NodeId>,
peer_features: PeerFeatures,
) -> &mut Self
{
pub fn closest(&mut self, node_id: NodeId, n: usize, excluded_peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
excluded_peers,
node_id,
n,
connected_only: false,
}));
self
}

/// Set broadcast_strategy to Closest.`excluded_peers` are excluded. Only Peers which have all `features` are
/// included.
pub fn closest_connected(&mut self, node_id: NodeId, n: usize, excluded_peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
excluded_peers,
node_id,
peer_features,
n,
connected_only: true,
}));
self
}
Expand Down
Loading

0 comments on commit 8ee27eb

Please sign in to comment.