From 4eb87e7410d81f7a390d734e60ee3d712c8b60bc Mon Sep 17 00:00:00 2001 From: Stanimal Date: Wed, 16 Sep 2020 11:05:45 +0200 Subject: [PATCH] Propagate block hashes to all connected nodes This PR changes the `Flood` broadcast stragey to "flood" a message to every _connected_ peer node. Since PR #1986, block hashes are propagated across the network and the full block only downloaded once per peer (push became push-pull gossip). This large drop in propagation overhead means that we can afford to spread the hash a little more widely and use the modified `Flood` stragegy. The main change is to change the flood strategy from sending to _all_ valid peers in the peer list, to only sending to currenty connected peers (inbound and outbound connections). As before, client nodes are exempt from flood propagation messages. --- .../core/src/base_node/service/service.rs | 2 +- .../core/src/proof_of_work/monero_rx.rs | 2 +- comms/dht/src/actor.rs | 10 +++++----- comms/dht/src/broadcast_strategy.rs | 19 +++++++++++-------- comms/dht/src/outbound/broadcast.rs | 4 ++-- comms/dht/src/outbound/message_params.rs | 6 +++--- comms/dht/src/outbound/requester.rs | 10 ++++------ 7 files changed, 27 insertions(+), 26 deletions(-) diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index e0daf31913..d8ba520353 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -630,7 +630,7 @@ async fn handle_outbound_block( ) -> Result<(), CommsInterfaceError> { outbound_message_service - .propagate( + .flood( NodeDestination::Unknown, OutboundEncryption::None, exclude_peers, diff --git a/base_layer/core/src/proof_of_work/monero_rx.rs b/base_layer/core/src/proof_of_work/monero_rx.rs index 58c669677b..d052293660 100644 --- a/base_layer/core/src/proof_of_work/monero_rx.rs +++ b/base_layer/core/src/proof_of_work/monero_rx.rs @@ -395,7 +395,7 @@ mod test { for item in block.clone().tx_hashes { hashes.push(item); } - let mut root = tree_hash(&hashes).unwrap(); // tree_hash.c used by monero + let root = tree_hash(&hashes).unwrap(); let mut encode2 = header; encode2.extend_from_slice(root.as_bytes()); encode2.append(&mut count); diff --git a/comms/dht/src/actor.rs b/comms/dht/src/actor.rs index a0728c2c88..a20ffef7f9 100644 --- a/comms/dht/src/actor.rs +++ b/comms/dht/src/actor.rs @@ -393,11 +393,11 @@ impl DhtActor { .map(|peer| peer.map(|p| vec![p.node_id]).unwrap_or_default()) .map_err(Into::into) }, - Flood => { - // Send to all known peers - // TODO: This should never be needed, remove - let peers = peer_manager.flood_peers().await?; - Ok(peers.into_iter().map(|p| p.node_id).collect()) + Flood(exclude) => { + let peers = connectivity + .select_connections(ConnectivitySelection::all_nodes(exclude)) + .await?; + Ok(peers.into_iter().map(|p| p.peer_node_id().clone()).collect()) }, Closest(closest_request) => { let candidates = if closest_request.connected_only { diff --git a/comms/dht/src/broadcast_strategy.rs b/comms/dht/src/broadcast_strategy.rs index 297acd4834..776e3cd356 100644 --- a/comms/dht/src/broadcast_strategy.rs +++ b/comms/dht/src/broadcast_strategy.rs @@ -38,8 +38,8 @@ pub enum BroadcastStrategy { DirectNodeId(Box), /// Send to a particular peer matching the given Public Key DirectPublicKey(Box), - /// Send to all known peers - Flood, + /// Send to all connected peers. If no peers are connected, no messages are sent. + Flood(Vec), /// Send to a random set of peers of size n that are Communication Nodes, excluding the given node IDs Random(usize, Vec), /// Send to all n nearest Communication Nodes according to the given BroadcastClosestRequest @@ -55,7 +55,7 @@ impl fmt::Display for BroadcastStrategy { match self { DirectPublicKey(pk) => write!(f, "DirectPublicKey({})", pk), DirectNodeId(node_id) => write!(f, "DirectNodeId({})", node_id), - Flood => write!(f, "Flood"), + Flood(excluded) => write!(f, "Flood({} excluded)", excluded.len()), Closest(request) => write!(f, "Closest({})", request.n), Random(n, excluded) => write!(f, "Random({}, {} excluded)", n, excluded.len()), Broadcast(excluded) => write!(f, "Broadcast({} excluded)", excluded.len()), @@ -65,10 +65,11 @@ impl fmt::Display for BroadcastStrategy { } impl BroadcastStrategy { - pub fn is_broadcast(&self) -> bool { + /// Returns true if this strategy can select multiple peers, otherwise false + pub fn is_multi_peer(&self) -> bool { use BroadcastStrategy::*; match self { - Closest(_) | Flood | Broadcast(_) | Random(_, _) | Propagate(_, _) => true, + Closest(_) | Flood(_) | Broadcast(_) | Random(_, _) | Propagate(_, _) => true, _ => false, } } @@ -119,7 +120,7 @@ mod test { BroadcastStrategy::Propagate(Default::default(), Default::default()).is_direct(), false ); - assert_eq!(BroadcastStrategy::Flood.is_direct(), false); + assert_eq!(BroadcastStrategy::Flood(Default::default()).is_direct(), false); assert_eq!( BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest { node_id: NodeId::default(), @@ -144,7 +145,9 @@ mod test { assert!(BroadcastStrategy::Broadcast(Default::default(),) .direct_public_key() .is_none()); - assert!(BroadcastStrategy::Flood.direct_public_key().is_none()); + assert!(BroadcastStrategy::Flood(Default::default()) + .direct_public_key() + .is_none()); assert!(BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest { node_id: NodeId::default(), n: 0, @@ -170,7 +173,7 @@ mod test { assert!(BroadcastStrategy::Broadcast(Default::default(),) .direct_node_id() .is_none()); - assert!(BroadcastStrategy::Flood.direct_node_id().is_none()); + assert!(BroadcastStrategy::Flood(Default::default()).direct_node_id().is_none()); assert!(BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest { node_id: NodeId::default(), n: 0, diff --git a/comms/dht/src/outbound/broadcast.rs b/comms/dht/src/outbound/broadcast.rs index 84686d483c..95fe944036 100644 --- a/comms/dht/src/outbound/broadcast.rs +++ b/comms/dht/src/outbound/broadcast.rs @@ -271,7 +271,7 @@ where S: Service is_discovery_enabled, ); - let is_broadcast = broadcast_strategy.is_broadcast(); + let is_broadcast = broadcast_strategy.is_multi_peer(); // Discovery is required if: // - Discovery is enabled for this request @@ -585,7 +585,7 @@ mod test { service .call(DhtOutboundRequest::SendMessage( - Box::new(SendMessageParams::new().flood().finish()), + Box::new(SendMessageParams::new().flood(vec![]).finish()), "custom_msg".as_bytes().into(), reply_tx, )) diff --git a/comms/dht/src/outbound/message_params.rs b/comms/dht/src/outbound/message_params.rs index b5bc6080ce..e7cdc683d5 100644 --- a/comms/dht/src/outbound/message_params.rs +++ b/comms/dht/src/outbound/message_params.rs @@ -71,7 +71,7 @@ pub struct FinalSendMessageParams { impl Default for FinalSendMessageParams { fn default() -> Self { Self { - broadcast_strategy: BroadcastStrategy::Flood, + broadcast_strategy: BroadcastStrategy::Flood(Default::default()), destination: Default::default(), encryption: Default::default(), dht_message_type: Default::default(), @@ -146,8 +146,8 @@ impl SendMessageParams { } /// Set broadcast_strategy to Flood - pub fn flood(&mut self) -> &mut Self { - self.params_mut().broadcast_strategy = BroadcastStrategy::Flood; + pub fn flood(&mut self, excluded: Vec) -> &mut Self { + self.params_mut().broadcast_strategy = BroadcastStrategy::Flood(excluded); self } diff --git a/comms/dht/src/outbound/requester.rs b/comms/dht/src/outbound/requester.rs index aa469ec6ec..519f898bee 100644 --- a/comms/dht/src/outbound/requester.rs +++ b/comms/dht/src/outbound/requester.rs @@ -150,14 +150,12 @@ impl OutboundMessageRequester { .map_err(Into::into) } - /// Send to _ALL_ known peers. - /// - /// This should be used with caution as, depending on the number of known peers, a lot of network - /// traffic could be generated from this node. - pub async fn send_flood( + /// Send to all _connected_ peers. + pub async fn flood( &mut self, destination: NodeDestination, encryption: OutboundEncryption, + exclude_peers: Vec, message: OutboundDomainMessage, ) -> Result where @@ -165,7 +163,7 @@ impl OutboundMessageRequester { { self.send_message( SendMessageParams::new() - .flood() + .flood(exclude_peers) .with_destination(destination) .with_encryption(encryption) .finish(),