From 93cd0e662cf42e7ec3ebd621a6877b75eb5c28cf Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Fri, 1 Jul 2022 12:08:37 -0400
Subject: [PATCH] reverts wide fanout in broadcast when the root node is down

A change included in https://github.com/solana-labs/solana/pull/20480
was that when the root node in the turbine broadcast tree is down, the
leader instead broadcasts the shred to all nodes in the first layer.
The intention was to mitigate the impact of dead nodes on shred
propagation, because if the root node is down the entire cluster will
miss the shred.

On the other hand, if x% of stake is offline, this fallback causes a
200*x% + 1 packets/shreds ratio at the broadcast stage, which can
contribute to line-rate saturation and packet drops.

To avoid this bandwidth saturation issue, this commit reverts that
logic and always broadcasts shreds from the leader only to the root
node. As before, we rely on erasure codes to recover shreds lost due
to staked nodes being offline.
---
 core/src/broadcast_stage.rs                   | 20 +++----
 .../broadcast_duplicates_run.rs               | 21 +------
 core/src/cluster_nodes.rs                     | 59 ++-----------------
 3 files changed, 16 insertions(+), 84 deletions(-)
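The "200*x% + 1" figure above is the expected number of packets the
leader sends per shred under the old fallback. A minimal sketch of
that arithmetic (illustrative only, not part of the patch; it assumes
DATA_PLANE_FANOUT is 200, its value at the time, and that the chance
the stake-weighted root node is offline equals the offline stake
fraction x):

    /// Expected packets sent per shred under the pre-revert fallback.
    fn expected_packets_per_shred(x: f64) -> f64 {
        const DATA_PLANE_FANOUT: f64 = 200.0;
        // Root up (probability 1 - x): one packet, to the root node only.
        // Root down (probability x): the fallback sends to the whole
        // first layer, i.e. DATA_PLANE_FANOUT packets.
        (1.0 - x) * 1.0 + x * DATA_PLANE_FANOUT
    }

    fn main() {
        for x in [0.01f64, 0.05, 0.10] {
            // 1 + 199*x, i.e. roughly the "200*x% + 1" ratio quoted above.
            println!(
                "{:.0}% offline -> {:.2} packets/shred",
                x * 100.0,
                expected_packets_per_shred(x),
            );
        }
    }

For example, with 5% of stake offline the old fallback averages about
11 packets per shred; after this change it is always exactly 1, with
erasure coding absorbing the loss when the root is down.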
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 18ab25a0b914c9..ba4c33fa38cc46 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -14,7 +14,10 @@ use {
     },
     crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
     itertools::Itertools,
-    solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT},
+    solana_gossip::{
+        cluster_info::{ClusterInfo, ClusterInfoError},
+        contact_info::ContactInfo,
+    },
     solana_ledger::{blockstore::Blockstore, shred::Shred},
     solana_measure::measure::Measure,
     solana_metrics::{inc_new_counter_error, inc_new_counter_info},
@@ -32,7 +35,6 @@ use {
     },
     std::{
         collections::{HashMap, HashSet},
-        iter::repeat,
         net::UdpSocket,
         sync::{
             atomic::{AtomicBool, Ordering},
@@ -390,8 +392,8 @@ fn update_peer_stats(
     }
 }
 
-/// broadcast messages from the leader to layer 1 nodes
-/// # Remarks
+/// Broadcasts shreds from the leader (i.e. this node) to the root of the
+/// turbine retransmit tree for each shred.
 pub fn broadcast_shreds(
     s: &UdpSocket,
     shreds: &[Shred],
@@ -416,14 +418,10 @@ pub fn broadcast_shreds(
             let cluster_nodes =
                 cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
             update_peer_stats(&cluster_nodes, last_datapoint_submit);
-            let root_bank = root_bank.clone();
             shreds.flat_map(move |shred| {
-                repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
-                    &shred.id(),
-                    &root_bank,
-                    DATA_PLANE_FANOUT,
-                    socket_addr_space,
-                ))
+                let node = cluster_nodes.get_broadcast_peer(&shred.id())?;
+                ContactInfo::is_valid_address(&node.tvu, socket_addr_space)
+                    .then(|| (shred.payload(), node.tvu))
             })
         })
         .collect();
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 741be826c44982..9e60d6c8196cfe 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -3,7 +3,7 @@ use {
     crate::cluster_nodes::ClusterNodesCache,
     itertools::Itertools,
     solana_entry::entry::Entry,
-    solana_gossip::cluster_info::DATA_PLANE_FANOUT,
+    solana_gossip::contact_info::ContactInfo,
     solana_ledger::shred::{ProcessShredsStats, Shredder},
     solana_sdk::{
         hash::Hash,
@@ -270,12 +270,6 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             (bank_forks.root_bank(), bank_forks.working_bank())
         };
         let self_pubkey = cluster_info.id();
-        let nodes: Vec<_> = cluster_info
-            .all_peers()
-            .into_iter()
-            .map(|(node, _)| node)
-            .collect();
-
         // Create cluster partition.
         let cluster_partition: HashSet<Pubkey> = {
             let mut cumilative_stake = 0;
@@ -302,17 +296,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         let packets: Vec<_> = shreds
             .iter()
             .filter_map(|shred| {
-                let addr = cluster_nodes
-                    .get_broadcast_addrs(
-                        &shred.id(),
-                        &root_bank,
-                        DATA_PLANE_FANOUT,
-                        socket_addr_space,
-                    )
-                    .first()
-                    .copied()?;
-                let node = nodes.iter().find(|node| node.tvu == addr)?;
-                if !socket_addr_space.check(&node.tvu) {
+                let node = cluster_nodes.get_broadcast_peer(&shred.id())?;
+                if !ContactInfo::is_valid_address(&node.tvu, socket_addr_space) {
                     return None;
                 }
                 if self
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index f83175a9946f8d..22fcc882c07186 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -26,7 +26,7 @@ use {
         any::TypeId,
         cmp::Reverse,
         collections::HashMap,
-        iter::{once, repeat_with},
+        iter::repeat_with,
         marker::PhantomData,
         net::SocketAddr,
         ops::Deref,
@@ -114,62 +114,11 @@ impl ClusterNodes<BroadcastStage> {
         new_cluster_nodes(cluster_info, stakes)
     }
 
-    pub(crate) fn get_broadcast_addrs(
-        &self,
-        shred: &ShredId,
-        root_bank: &Bank,
-        fanout: usize,
-        socket_addr_space: &SocketAddrSpace,
-    ) -> Vec<SocketAddr> {
-        const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
+    pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
         let shred_seed = shred.seed(&self.pubkey);
         let mut rng = ChaChaRng::from_seed(shred_seed);
-        let index = match self.weighted_shuffle.first(&mut rng) {
-            None => return Vec::default(),
-            Some(index) => index,
-        };
-        if let Some(node) = self.nodes[index].contact_info() {
-            let now = timestamp();
-            let age = Duration::from_millis(now.saturating_sub(node.wallclock));
-            if age < MAX_CONTACT_INFO_AGE
-                && ContactInfo::is_valid_address(&node.tvu, socket_addr_space)
-            {
-                return vec![node.tvu];
-            }
-        }
-        let mut rng = ChaChaRng::from_seed(shred_seed);
-        let nodes: Vec<&Node> = self
-            .weighted_shuffle
-            .clone()
-            .shuffle(&mut rng)
-            .map(|index| &self.nodes[index])
-            .collect();
-        if nodes.is_empty() {
-            return Vec::default();
-        }
-        if drop_redundant_turbine_path(shred.slot(), root_bank) {
-            let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes));
-            let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu);
-            return addrs
-                .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
-                .collect();
-        }
-        let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes);
-        neighbors[..1]
-            .iter()
-            .filter_map(|node| Some(node.contact_info()?.tvu))
-            .chain(
-                neighbors[1..]
-                    .iter()
-                    .filter_map(|node| Some(node.contact_info()?.tvu_forwards)),
-            )
-            .chain(
-                children
-                    .iter()
-                    .filter_map(|node| Some(node.contact_info()?.tvu)),
-            )
-            .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
-            .collect()
+        let index = self.weighted_shuffle.first(&mut rng)?;
+        self.nodes[index].contact_info()
     }
 }
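Note that the new get_broadcast_peer is deterministic: the RNG is
seeded from the shred id and the leader's pubkey, so a given shred
always maps to the same weighted-shuffle winner. A usage sketch
(hypothetical, not part of the patch; assumes cluster_nodes and shred
are in scope as in broadcast_shreds above):

    // Same seed -> same weighted-shuffle winner: repeated lookups for
    // the same shred id pick the same root node.
    let first = cluster_nodes.get_broadcast_peer(&shred.id()).map(|node| node.id);
    let again = cluster_nodes.get_broadcast_peer(&shred.id()).map(|node| node.id);
    assert_eq!(first, again);

This determinism is what lets the leader send each shred to a single
root node while downstream nodes independently derive the same
retransmit tree; as the commit message notes, shreds lost when that
root is offline are recovered via erasure codes.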