diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 18ab25a0b914c9..ba4c33fa38cc46 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -14,7 +14,10 @@ use { }, crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender}, itertools::Itertools, - solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}, + solana_gossip::{ + cluster_info::{ClusterInfo, ClusterInfoError}, + contact_info::ContactInfo, + }, solana_ledger::{blockstore::Blockstore, shred::Shred}, solana_measure::measure::Measure, solana_metrics::{inc_new_counter_error, inc_new_counter_info}, @@ -32,7 +35,6 @@ use { }, std::{ collections::{HashMap, HashSet}, - iter::repeat, net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, @@ -390,8 +392,8 @@ fn update_peer_stats( } } -/// broadcast messages from the leader to layer 1 nodes -/// # Remarks +/// Broadcasts shreds from the leader (i.e. this node) to the root of the +/// turbine retransmit tree for each shred. pub fn broadcast_shreds( s: &UdpSocket, shreds: &[Shred], @@ -416,14 +418,10 @@ pub fn broadcast_shreds( let cluster_nodes = cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info); update_peer_stats(&cluster_nodes, last_datapoint_submit); - let root_bank = root_bank.clone(); shreds.flat_map(move |shred| { - repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs( - &shred.id(), - &root_bank, - DATA_PLANE_FANOUT, - socket_addr_space, - )) + let node = cluster_nodes.get_broadcast_peer(&shred.id())?; + ContactInfo::is_valid_address(&node.tvu, socket_addr_space) + .then(|| (shred.payload(), node.tvu)) }) }) .collect(); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 741be826c44982..9e60d6c8196cfe 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -3,7 +3,7 @@ use { crate::cluster_nodes::ClusterNodesCache, itertools::Itertools, solana_entry::entry::Entry, - solana_gossip::cluster_info::DATA_PLANE_FANOUT, + solana_gossip::contact_info::ContactInfo, solana_ledger::shred::{ProcessShredsStats, Shredder}, solana_sdk::{ hash::Hash, @@ -270,12 +270,6 @@ impl BroadcastRun for BroadcastDuplicatesRun { (bank_forks.root_bank(), bank_forks.working_bank()) }; let self_pubkey = cluster_info.id(); - let nodes: Vec<_> = cluster_info - .all_peers() - .into_iter() - .map(|(node, _)| node) - .collect(); - // Create cluster partition. let cluster_partition: HashSet = { let mut cumilative_stake = 0; @@ -302,17 +296,8 @@ impl BroadcastRun for BroadcastDuplicatesRun { let packets: Vec<_> = shreds .iter() .filter_map(|shred| { - let addr = cluster_nodes - .get_broadcast_addrs( - &shred.id(), - &root_bank, - DATA_PLANE_FANOUT, - socket_addr_space, - ) - .first() - .copied()?; - let node = nodes.iter().find(|node| node.tvu == addr)?; - if !socket_addr_space.check(&node.tvu) { + let node = cluster_nodes.get_broadcast_peer(&shred.id())?; + if ContactInfo::is_valid_address(&node.tvu, socket_addr_space) { return None; } if self diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index f83175a9946f8d..22fcc882c07186 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -26,7 +26,7 @@ use { any::TypeId, cmp::Reverse, collections::HashMap, - iter::{once, repeat_with}, + iter::repeat_with, marker::PhantomData, net::SocketAddr, ops::Deref, @@ -114,62 +114,11 @@ impl ClusterNodes { new_cluster_nodes(cluster_info, stakes) } - pub(crate) fn get_broadcast_addrs( - &self, - shred: &ShredId, - root_bank: &Bank, - fanout: usize, - socket_addr_space: &SocketAddrSpace, - ) -> Vec { - const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60); + pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> { let shred_seed = shred.seed(&self.pubkey); let mut rng = ChaChaRng::from_seed(shred_seed); - let index = match self.weighted_shuffle.first(&mut rng) { - None => return Vec::default(), - Some(index) => index, - }; - if let Some(node) = self.nodes[index].contact_info() { - let now = timestamp(); - let age = Duration::from_millis(now.saturating_sub(node.wallclock)); - if age < MAX_CONTACT_INFO_AGE - && ContactInfo::is_valid_address(&node.tvu, socket_addr_space) - { - return vec![node.tvu]; - } - } - let mut rng = ChaChaRng::from_seed(shred_seed); - let nodes: Vec<&Node> = self - .weighted_shuffle - .clone() - .shuffle(&mut rng) - .map(|index| &self.nodes[index]) - .collect(); - if nodes.is_empty() { - return Vec::default(); - } - if drop_redundant_turbine_path(shred.slot(), root_bank) { - let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes)); - let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu); - return addrs - .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) - .collect(); - } - let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes); - neighbors[..1] - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu)) - .chain( - neighbors[1..] - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu_forwards)), - ) - .chain( - children - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu)), - ) - .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) - .collect() + let index = self.weighted_shuffle.first(&mut rng)?; + self.nodes[index].contact_info() } }