Increased redundancy for SAF messages #1929

Merged · 1 commit · Jun 1, 2020
27 changes: 15 additions & 12 deletions comms/dht/examples/memorynet.rs
@@ -234,17 +234,19 @@ async fn main() {
     take_a_break().await;
     total_messages += drain_messaging_events(&mut messaging_events_rx, false).await;

-    let random_wallet = wallets.remove(OsRng.gen_range(0, wallets.len() - 1));
-    let (num_msgs, random_wallet) = do_store_and_forward_message_propagation(
-        random_wallet,
-        &wallets,
-        messaging_events_tx,
-        &mut messaging_events_rx,
-    )
-    .await;
-    total_messages += num_msgs;
-    // Put the wallet back
-    wallets.push(random_wallet);
+    for _ in 0..5 {
+        let random_wallet = wallets.remove(OsRng.gen_range(0, wallets.len() - 1));
+        let (num_msgs, random_wallet) = do_store_and_forward_message_propagation(
+            random_wallet,
+            &wallets,
+            messaging_events_tx.clone(),
+            &mut messaging_events_rx,
+        )
+        .await;
+        total_messages += num_msgs;
+        // Put the wallet back
+        wallets.push(random_wallet);
+    }

     do_network_wide_propagation(&mut nodes).await;

@@ -896,7 +898,8 @@ async fn setup_comms_dht(
         .local_test()
         .enable_auto_join()
         .with_discovery_timeout(Duration::from_secs(15))
-        .with_num_neighbouring_nodes(8)
+        .with_num_neighbouring_nodes(10)
+        .with_num_random_nodes(5)
         .with_propagation_factor(4)
         .finish()
         .await
43 changes: 24 additions & 19 deletions comms/dht/src/actor.rs
@@ -342,12 +342,7 @@ impl DhtActor {
         outbound_requester
             .send_message_no_header(
                 SendMessageParams::new()
-                    .closest(
-                        node_identity.node_id().clone(),
-                        config.num_neighbouring_nodes,
-                        vec![],
-                        PeerFeatures::MESSAGE_PROPAGATION,
-                    )
+                    .closest(node_identity.node_id().clone(), config.num_neighbouring_nodes, vec![])
                     .with_dht_message_type(DhtMessageType::Join)
                     .force_origin()
                     .finish(),
@@ -392,14 +387,27 @@ impl DhtActor {
                 Ok(peers.into_iter().map(|p| p.node_id).collect())
             },
             Closest(closest_request) => {
-                Self::select_closest_peers_for_propagation(
-                    &peer_manager,
-                    &closest_request.node_id,
-                    closest_request.n,
-                    &closest_request.excluded_peers,
-                    closest_request.peer_features,
-                )
-                .await
+                let candidates = if closest_request.connected_only {
+                    let connections = connectivity
+                        .select_connections(ConnectivitySelection::closest_to(
+                            closest_request.node_id,
+                            closest_request.n,
+                            closest_request.excluded_peers,
+                        ))
+                        .await?;
+
+                    connections.iter().map(|conn| conn.peer_node_id()).cloned().collect()
+                } else {
+                    Self::select_closest_peers_for_propagation(
+                        &peer_manager,
+                        &closest_request.node_id,
+                        closest_request.n,
+                        &closest_request.excluded_peers,
+                        PeerFeatures::MESSAGE_PROPAGATION,
+                    )
+                    .await?
+                };
+                Ok(candidates)
             },
             Random(n, excluded) => {
                 // Send to a random set of peers of size n that are Communication Nodes
@@ -619,10 +627,7 @@ mod test {
         test_utils::{make_client_identity, make_node_identity, make_peer_manager},
     };
     use chrono::{DateTime, Utc};
-    use tari_comms::{
-        peer_manager::PeerFeatures,
-        test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair},
-    };
+    use tari_comms::test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair};
     use tari_shutdown::Shutdown;
     use tari_test_utils::random;

@@ -751,8 +756,8 @@ mod test {
         let send_request = Box::new(BroadcastClosestRequest {
             n: 10,
             node_id: node_identity.node_id().clone(),
-            peer_features: PeerFeatures::DHT_STORE_FORWARD,
             excluded_peers: vec![],
+            connected_only: false,
         });
         let peers = requester
             .select_peers(BroadcastStrategy::Closest(send_request))
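
Note (not part of the diff): with peer_features replaced by a connected_only flag, a caller that wants only currently connected peers builds the same request as the test above but with the flag set. A minimal sketch, assuming a DhtRequester handle named requester and a node_identity as in the tests; the count of 10 is illustrative:

    // Ask the DHT actor for up to 10 of the closest peers, restricted to active connections
    let send_request = Box::new(BroadcastClosestRequest {
        n: 10,
        node_id: node_identity.node_id().clone(),
        excluded_peers: vec![],
        connected_only: true,
    });
    let peers = requester
        .select_peers(BroadcastStrategy::Closest(send_request))
        .await?;
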
13 changes: 5 additions & 8 deletions comms/dht/src/broadcast_strategy.rs
@@ -22,17 +22,14 @@

 use crate::envelope::NodeDestination;
 use std::{fmt, fmt::Formatter};
-use tari_comms::{
-    peer_manager::{node_id::NodeId, PeerFeatures},
-    types::CommsPublicKey,
-};
+use tari_comms::{peer_manager::node_id::NodeId, types::CommsPublicKey};

 #[derive(Debug, Clone)]
 pub struct BroadcastClosestRequest {
     pub n: usize,
     pub node_id: NodeId,
-    pub peer_features: PeerFeatures,
     pub excluded_peers: Vec<NodeId>,
+    pub connected_only: bool,
 }

 #[derive(Debug, Clone)]
@@ -128,7 +125,7 @@ mod test {
                 node_id: NodeId::default(),
                 n: 0,
                 excluded_peers: Default::default(),
-                peer_features: Default::default()
+                connected_only: false
             }))
             .is_direct(),
             false
@@ -152,7 +149,7 @@ mod test {
                 node_id: NodeId::default(),
                 n: 0,
                 excluded_peers: Default::default(),
-                peer_features: Default::default()
+                connected_only: false
             }))
             .direct_public_key()
             .is_none(),);
@@ -178,7 +175,7 @@ mod test {
                 node_id: NodeId::default(),
                 n: 0,
                 excluded_peers: Default::default(),
-                peer_features: Default::default(),
+                connected_only: false
             }))
             .direct_node_id()
             .is_none(),);
9 changes: 7 additions & 2 deletions comms/dht/src/builder.rs
@@ -100,8 +100,13 @@ impl DhtBuilder {
         self
     }

-    pub fn with_num_neighbouring_nodes(mut self, num_neighbours: usize) -> Self {
-        self.config.num_neighbouring_nodes = num_neighbours;
+    pub fn with_num_random_nodes(mut self, n: usize) -> Self {
+        self.config.num_random_nodes = n;
+        self
+    }
+
+    pub fn with_num_neighbouring_nodes(mut self, n: usize) -> Self {
+        self.config.num_neighbouring_nodes = n;
         self
     }

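
Note (not part of the diff): the new builder method chains alongside the existing ones, as the memorynet.rs example above now does. A minimal sketch, assuming a DhtBuilder already obtained from the normal construction path and that finish() returns a Result; the values mirror the example rather than recommended defaults:

    let dht = dht_builder
        .with_num_neighbouring_nodes(10) // peers kept in the neighbourhood region
        .with_num_random_nodes(5)        // additional random peers, for extra redundancy
        .with_propagation_factor(4)
        .finish()
        .await?;
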
7 changes: 5 additions & 2 deletions comms/dht/src/connectivity/mod.rs
@@ -175,14 +175,17 @@ impl DhtConnectivity {
             PeerConnectFailed(node_id) | PeerOffline(node_id) | PeerBanned(node_id) => {
                 self.replace_managed_peer(node_id).await?;
             },
-            ConnectivityStateDegraded(_) | ConnectivityStateOnline(_) => {
+            ConnectivityStateDegraded(n) | ConnectivityStateOnline(n) => {
                 if self.config.auto_join && self.can_send_join() {
                     info!(target: LOG_TARGET, "Joining the network automatically");
                     self.dht_requester
                         .send_join()
                         .await
                         .map_err(DhtConnectivityError::SendJoinFailed)?;
-                    self.stats.mark_join_sent();
+                    // If join is only being sent to a single peer, allow it to be resent
+                    if *n > 1 {
+                        self.stats.mark_join_sent();
+                    }
                 }
             },
             _ => {},
1 change: 1 addition & 0 deletions comms/dht/src/dht.rs
@@ -246,6 +246,7 @@ impl Dht {
             )))
             .layer(inbound::DecryptionLayer::new(Arc::clone(&self.node_identity)))
             .layer(store_forward::ForwardLayer::new(
+                self.config.clone(),
                 self.outbound_requester(),
                 self.node_identity.features().contains(PeerFeatures::DHT_STORE_FORWARD),
             ))
7 changes: 7 additions & 0 deletions comms/dht/src/inbound/message.rs
@@ -82,6 +82,7 @@ pub struct DecryptedDhtMessage {
     pub authenticated_origin: Option<CommsPublicKey>,
     pub dht_header: DhtMessageHeader,
     pub is_saf_message: bool,
+    pub is_saf_stored: Option<bool>,
     pub decryption_result: Result<EnvelopeBody, Vec<u8>>,
 }

@@ -99,6 +100,7 @@ impl DecryptedDhtMessage {
             authenticated_origin,
             dht_header: message.dht_header,
             is_saf_message: message.is_saf_message,
+            is_saf_stored: None,
             decryption_result: Ok(message_body),
         }
     }
@@ -111,6 +113,7 @@ impl DecryptedDhtMessage {
             authenticated_origin: None,
             dht_header: message.dht_header,
             is_saf_message: message.is_saf_message,
+            is_saf_stored: None,
             decryption_result: Err(message.body),
         }
     }
@@ -158,4 +161,8 @@ impl DecryptedDhtMessage {
             Err(b) => b.len(),
         }
     }
+
+    pub fn set_saf_stored(&mut self, is_stored: bool) {
+        self.is_saf_stored = Some(is_stored);
+    }
 }
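
Note (not part of the diff): is_saf_stored starts as None and is meant to be set via set_saf_stored once a storage decision has been made; the middleware that calls it is outside this hunk. A hypothetical reader of the flag, for illustration only, might look like:

    fn describe_saf_status(msg: &DecryptedDhtMessage) -> &'static str {
        match msg.is_saf_stored {
            None => "storage not yet attempted",
            Some(true) => "stored for later delivery",
            Some(false) => "not stored",
        }
    }
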
28 changes: 15 additions & 13 deletions comms/dht/src/outbound/message_params.rs
@@ -27,10 +27,7 @@ use crate::{
     proto::envelope::DhtMessageType,
 };
 use std::{fmt, fmt::Display};
-use tari_comms::{
-    peer_manager::{NodeId, PeerFeatures},
-    types::CommsPublicKey,
-};
+use tari_comms::{peer_manager::NodeId, types::CommsPublicKey};

 /// Configuration for outbound messages.
 ///
@@ -115,19 +112,24 @@ impl SendMessageParams {

     /// Set broadcast_strategy to Closest.`excluded_peers` are excluded. Only Peers which have all `features` are
     /// included.
-    pub fn closest(
-        &mut self,
-        node_id: NodeId,
-        n: usize,
-        excluded_peers: Vec<NodeId>,
-        peer_features: PeerFeatures,
-    ) -> &mut Self
-    {
+    pub fn closest(&mut self, node_id: NodeId, n: usize, excluded_peers: Vec<NodeId>) -> &mut Self {
+        self.params_mut().broadcast_strategy = BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
+            excluded_peers,
+            node_id,
+            n,
+            connected_only: false,
+        }));
+        self
+    }
+
+    /// Set broadcast_strategy to Closest.`excluded_peers` are excluded. Only Peers which have all `features` are
+    /// included.
+    pub fn closest_connected(&mut self, node_id: NodeId, n: usize, excluded_peers: Vec<NodeId>) -> &mut Self {
         self.params_mut().broadcast_strategy = BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
             excluded_peers,
             node_id,
-            peer_features,
             n,
+            connected_only: true,
         }));
         self
     }
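
Note (not part of the diff): callers now choose between the two variants rather than passing PeerFeatures. A minimal usage sketch; dest_node_id and the peer count are illustrative:

    // Closest peers known to the peer manager, connected or not
    let params = SendMessageParams::new()
        .closest(dest_node_id.clone(), 8, vec![])
        .finish();

    // Only the closest peers with an active connection
    let params = SendMessageParams::new()
        .closest_connected(dest_node_id, 8, vec![])
        .finish();
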