Skip to content

Commit

Permalink
Propagate block hashes to all connected nodes
Browse files Browse the repository at this point in the history
This PR changes the `Flood` broadcast stragey to "flood" a message to
every _connected_ peer node. Since PR tari-project#1986, block hashes
are propagated across the network and the full block only downloaded
once per peer (push became push-pull gossip). This large drop in propagation
overhead means that we can afford to spread the hash a little more
widely and use the modified `Flood` stragegy.

The main change is to change the flood strategy from sending to _all_
valid peers in the peer list, to only sending to currenty connected
peers (inbound and outbound connections). As before, client nodes are
exempt from flood propagation messages.
  • Loading branch information
sdbondi committed Sep 16, 2020
1 parent e8e2087 commit 4eb87e7
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 26 deletions.
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ async fn handle_outbound_block(
) -> Result<(), CommsInterfaceError>
{
outbound_message_service
.propagate(
.flood(
NodeDestination::Unknown,
OutboundEncryption::None,
exclude_peers,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/proof_of_work/monero_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ mod test {
for item in block.clone().tx_hashes {
hashes.push(item);
}
let mut root = tree_hash(&hashes).unwrap(); // tree_hash.c used by monero
let root = tree_hash(&hashes).unwrap();
let mut encode2 = header;
encode2.extend_from_slice(root.as_bytes());
encode2.append(&mut count);
Expand Down
10 changes: 5 additions & 5 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,11 @@ impl DhtActor {
.map(|peer| peer.map(|p| vec![p.node_id]).unwrap_or_default())
.map_err(Into::into)
},
Flood => {
// Send to all known peers
// TODO: This should never be needed, remove
let peers = peer_manager.flood_peers().await?;
Ok(peers.into_iter().map(|p| p.node_id).collect())
Flood(exclude) => {
let peers = connectivity
.select_connections(ConnectivitySelection::all_nodes(exclude))
.await?;
Ok(peers.into_iter().map(|p| p.peer_node_id().clone()).collect())
},
Closest(closest_request) => {
let candidates = if closest_request.connected_only {
Expand Down
19 changes: 11 additions & 8 deletions comms/dht/src/broadcast_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ pub enum BroadcastStrategy {
DirectNodeId(Box<NodeId>),
/// Send to a particular peer matching the given Public Key
DirectPublicKey(Box<CommsPublicKey>),
/// Send to all known peers
Flood,
/// Send to all connected peers. If no peers are connected, no messages are sent.
Flood(Vec<NodeId>),
/// Send to a random set of peers of size n that are Communication Nodes, excluding the given node IDs
Random(usize, Vec<NodeId>),
/// Send to all n nearest Communication Nodes according to the given BroadcastClosestRequest
Expand All @@ -55,7 +55,7 @@ impl fmt::Display for BroadcastStrategy {
match self {
DirectPublicKey(pk) => write!(f, "DirectPublicKey({})", pk),
DirectNodeId(node_id) => write!(f, "DirectNodeId({})", node_id),
Flood => write!(f, "Flood"),
Flood(excluded) => write!(f, "Flood({} excluded)", excluded.len()),
Closest(request) => write!(f, "Closest({})", request.n),
Random(n, excluded) => write!(f, "Random({}, {} excluded)", n, excluded.len()),
Broadcast(excluded) => write!(f, "Broadcast({} excluded)", excluded.len()),
Expand All @@ -65,10 +65,11 @@ impl fmt::Display for BroadcastStrategy {
}

impl BroadcastStrategy {
pub fn is_broadcast(&self) -> bool {
/// Returns true if this strategy can select multiple peers, otherwise false
pub fn is_multi_peer(&self) -> bool {
use BroadcastStrategy::*;
match self {
Closest(_) | Flood | Broadcast(_) | Random(_, _) | Propagate(_, _) => true,
Closest(_) | Flood(_) | Broadcast(_) | Random(_, _) | Propagate(_, _) => true,
_ => false,
}
}
Expand Down Expand Up @@ -119,7 +120,7 @@ mod test {
BroadcastStrategy::Propagate(Default::default(), Default::default()).is_direct(),
false
);
assert_eq!(BroadcastStrategy::Flood.is_direct(), false);
assert_eq!(BroadcastStrategy::Flood(Default::default()).is_direct(), false);
assert_eq!(
BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
node_id: NodeId::default(),
Expand All @@ -144,7 +145,9 @@ mod test {
assert!(BroadcastStrategy::Broadcast(Default::default(),)
.direct_public_key()
.is_none());
assert!(BroadcastStrategy::Flood.direct_public_key().is_none());
assert!(BroadcastStrategy::Flood(Default::default())
.direct_public_key()
.is_none());
assert!(BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
node_id: NodeId::default(),
n: 0,
Expand All @@ -170,7 +173,7 @@ mod test {
assert!(BroadcastStrategy::Broadcast(Default::default(),)
.direct_node_id()
.is_none());
assert!(BroadcastStrategy::Flood.direct_node_id().is_none());
assert!(BroadcastStrategy::Flood(Default::default()).direct_node_id().is_none());
assert!(BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
node_id: NodeId::default(),
n: 0,
Expand Down
4 changes: 2 additions & 2 deletions comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ where S: Service<DhtOutboundMessage, Response = (), Error = PipelineError>
is_discovery_enabled,
);

let is_broadcast = broadcast_strategy.is_broadcast();
let is_broadcast = broadcast_strategy.is_multi_peer();

// Discovery is required if:
// - Discovery is enabled for this request
Expand Down Expand Up @@ -585,7 +585,7 @@ mod test {

service
.call(DhtOutboundRequest::SendMessage(
Box::new(SendMessageParams::new().flood().finish()),
Box::new(SendMessageParams::new().flood(vec![]).finish()),
"custom_msg".as_bytes().into(),
reply_tx,
))
Expand Down
6 changes: 3 additions & 3 deletions comms/dht/src/outbound/message_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct FinalSendMessageParams {
impl Default for FinalSendMessageParams {
fn default() -> Self {
Self {
broadcast_strategy: BroadcastStrategy::Flood,
broadcast_strategy: BroadcastStrategy::Flood(Default::default()),
destination: Default::default(),
encryption: Default::default(),
dht_message_type: Default::default(),
Expand Down Expand Up @@ -146,8 +146,8 @@ impl SendMessageParams {
}

/// Set broadcast_strategy to Flood
pub fn flood(&mut self) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::Flood;
pub fn flood(&mut self, excluded: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::Flood(excluded);
self
}

Expand Down
10 changes: 4 additions & 6 deletions comms/dht/src/outbound/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,20 @@ impl OutboundMessageRequester {
.map_err(Into::into)
}

/// Send to _ALL_ known peers.
///
/// This should be used with caution as, depending on the number of known peers, a lot of network
/// traffic could be generated from this node.
pub async fn send_flood<T>(
/// Send to all _connected_ peers.
pub async fn flood<T>(
&mut self,
destination: NodeDestination,
encryption: OutboundEncryption,
exclude_peers: Vec<NodeId>,
message: OutboundDomainMessage<T>,
) -> Result<MessageSendStates, DhtOutboundError>
where
T: prost::Message,
{
self.send_message(
SendMessageParams::new()
.flood()
.flood(exclude_peers)
.with_destination(destination)
.with_encryption(encryption)
.finish(),
Expand Down

0 comments on commit 4eb87e7

Please sign in to comment.