From 12fe7d39e89dd9a0377adbdded5f36c0e5d9a3ac Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 14 May 2024 03:59:27 +0000 Subject: [PATCH] v1.18: reverts back in SocketAddr dedup in retransmit stage (backport of #1106) (#1225) reverts back in SocketAddr dedup in retransmit stage (#1106) This was erronously deemed as unnecessary and removed in: https://github.com/anza-xyz/agave/pull/864 The commit partially reverts #864 and adds back socket-addr dedup. (cherry picked from commit fbe1dbc75895ff7e1f5fb8abcc6cd2f2a473c309) Co-authored-by: behzad nouri --- turbine/src/cluster_nodes.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 76bae78bc1e6c9..bebbda5af8b1e0 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -85,6 +85,9 @@ pub struct ClusterNodesCache { pub struct RetransmitPeers<'a> { root_distance: usize, // distance from the root node children: Vec<&'a Node>, + // Maps tvu addresses to the first node + // in the shuffle with the same address. + addrs: HashMap, // tvu addresses } impl Node { @@ -173,13 +176,16 @@ impl ClusterNodes { let RetransmitPeers { root_distance, children, + addrs, } = self.get_retransmit_peers(slot_leader, shred, fanout)?; let protocol = get_broadcast_protocol(shred); - let peers = children - .into_iter() - .filter_map(|node| node.contact_info()?.tvu(protocol).ok()) - .collect(); - Ok((root_distance, peers)) + let peers = children.into_iter().filter_map(|node| { + node.contact_info()? + .tvu(protocol) + .ok() + .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) + }); + Ok((root_distance, peers.collect())) } pub fn get_retransmit_peers( @@ -199,10 +205,19 @@ impl ClusterNodes { if let Some(index) = self.index.get(slot_leader) { weighted_shuffle.remove_index(*index); } + let mut addrs = HashMap::::with_capacity(self.nodes.len()); let mut rng = get_seeded_rng(slot_leader, shred); + let protocol = get_broadcast_protocol(shred); let nodes: Vec<_> = weighted_shuffle .shuffle(&mut rng) .map(|index| &self.nodes[index]) + .inspect(|node| { + if let Some(node) = node.contact_info() { + if let Ok(addr) = node.tvu(protocol) { + addrs.entry(addr).or_insert(*node.pubkey()); + } + } + }) .collect(); let self_index = nodes .iter() @@ -221,6 +236,7 @@ impl ClusterNodes { Ok(RetransmitPeers { root_distance, children: peers.collect(), + addrs, }) }