reverts wide fanout in broadcast when the root node is down (solana-labs#26359)

A change included in solana-labs#20480 was that when the root node in the
turbine broadcast tree is down, the leader broadcasts the shred to all nodes
in the first layer. The intention was to mitigate the impact of dead nodes on
shred propagation, because if the root node is down, the entire cluster misses
that shred. On the other hand, if x% of stake is down, this produces a
packets-to-shreds ratio of roughly 200 * x% + 1 at the broadcast stage, which
can contribute to line-rate saturation and packet drops. To avoid this
bandwidth-saturation issue, this commit reverts that logic and always
broadcasts each shred from the leader only to its root node. As before, we
rely on erasure codes to recover shreds lost due to staked nodes being
offline.
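
A rough, back-of-the-envelope sketch (not code from this commit) of the ratio described above, assuming the first-layer fanout of 200 implied by the 200 * x% + 1 figure and treating the offline-stake fraction as the probability that a shred's root node is down:

```rust
fn expected_packets_per_shred(offline_stake_fraction: f64, fanout: usize) -> f64 {
    // 1 packet when the root node is up, `fanout` packets when it is down
    // (the pre-revert behavior); with this commit the ratio is always 1.
    (1.0 - offline_stake_fraction) + offline_stake_fraction * fanout as f64
}

fn main() {
    let fanout = 200; // assumed first-layer fanout (DATA_PLANE_FANOUT)
    for x in [0.01, 0.05, 0.10] {
        println!(
            "{:>4.1}% of stake offline -> ~{:.0} packets per shred under the old logic",
            x * 100.0,
            expected_packets_per_shred(x, fanout)
        );
    }
}
```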
behzadnouri authored and xiangzhu70 committed Aug 17, 2022
1 parent 4f40b00 commit 39cf443
Showing 3 changed files with 16 additions and 84 deletions.
20 changes: 9 additions & 11 deletions core/src/broadcast_stage.rs
@@ -14,7 +14,10 @@ use {
},
crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
itertools::Itertools,
solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT},
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
},
solana_ledger::{blockstore::Blockstore, shred::Shred},
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
@@ -32,7 +35,6 @@ use {
},
std::{
collections::{HashMap, HashSet},
iter::repeat,
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
@@ -390,8 +392,8 @@ fn update_peer_stats(
}
}

/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// Broadcasts shreds from the leader (i.e. this node) to the root of the
/// turbine retransmit tree for each shred.
pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
@@ -416,14 +418,10 @@ pub fn broadcast_shreds(
let cluster_nodes =
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
update_peer_stats(&cluster_nodes, last_datapoint_submit);
let root_bank = root_bank.clone();
shreds.flat_map(move |shred| {
repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
&shred.id(),
&root_bank,
DATA_PLANE_FANOUT,
socket_addr_space,
))
let node = cluster_nodes.get_broadcast_peer(&shred.id())?;
ContactInfo::is_valid_address(&node.tvu, socket_addr_space)
.then(|| (shred.payload(), node.tvu))
})
})
.collect();
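
The new broadcast_shreds body above maps each shred to at most one destination: get_broadcast_peer returns an Option, `?` drops shreds with no peer, and `bool::then` keeps the (payload, address) pair only when the peer's TVU address passes validation. Below is a minimal, self-contained sketch of that filter_map pattern; `Shred`, `Peer`, `get_broadcast_peer`, and `is_valid_address` are simplified stand-ins, not the real solana types.

```rust
use std::net::{Ipv4Addr, SocketAddr};

struct Shred {
    id: u64,
    payload: Vec<u8>,
}

struct Peer {
    tvu: SocketAddr,
}

// Stand-in for ClusterNodes::get_broadcast_peer: at most one peer per shred.
fn get_broadcast_peer(peers: &[Peer], shred_id: u64) -> Option<&Peer> {
    if peers.is_empty() {
        return None;
    }
    peers.get(shred_id as usize % peers.len())
}

// Stand-in for ContactInfo::is_valid_address(addr, socket_addr_space).
fn is_valid_address(addr: &SocketAddr) -> bool {
    !addr.ip().is_unspecified()
}

fn main() {
    let peers = vec![
        Peer { tvu: SocketAddr::from((Ipv4Addr::new(10, 0, 0, 1), 8001)) },
        Peer { tvu: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)) },
    ];
    let shreds = vec![
        Shred { id: 0, payload: vec![1, 2, 3] },
        Shred { id: 1, payload: vec![4, 5, 6] },
    ];
    // Mirrors the new broadcast_shreds body: `?` drops shreds with no peer,
    // `bool::then` drops peers whose TVU address is not valid.
    let packets: Vec<(&[u8], SocketAddr)> = shreds
        .iter()
        .filter_map(|shred| {
            let node = get_broadcast_peer(&peers, shred.id)?;
            is_valid_address(&node.tvu).then(|| (shred.payload.as_slice(), node.tvu))
        })
        .collect();
    // Only the shred routed to the valid peer survives.
    assert_eq!(packets.len(), 1);
    println!("{} packet(s) to send", packets.len());
}
```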
21 changes: 3 additions & 18 deletions core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -3,7 +3,7 @@ use {
crate::cluster_nodes::ClusterNodesCache,
itertools::Itertools,
solana_entry::entry::Entry,
solana_gossip::cluster_info::DATA_PLANE_FANOUT,
solana_gossip::contact_info::ContactInfo,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_sdk::{
hash::Hash,
@@ -270,12 +270,6 @@ impl BroadcastRun for BroadcastDuplicatesRun {
(bank_forks.root_bank(), bank_forks.working_bank())
};
let self_pubkey = cluster_info.id();
let nodes: Vec<_> = cluster_info
.all_peers()
.into_iter()
.map(|(node, _)| node)
.collect();

// Create cluster partition.
let cluster_partition: HashSet<Pubkey> = {
let mut cumilative_stake = 0;
@@ -302,17 +296,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let packets: Vec<_> = shreds
.iter()
.filter_map(|shred| {
let addr = cluster_nodes
.get_broadcast_addrs(
&shred.id(),
&root_bank,
DATA_PLANE_FANOUT,
socket_addr_space,
)
.first()
.copied()?;
let node = nodes.iter().find(|node| node.tvu == addr)?;
if !socket_addr_space.check(&node.tvu) {
let node = cluster_nodes.get_broadcast_peer(&shred.id())?;
if !ContactInfo::is_valid_address(&node.tvu, socket_addr_space) {
return None;
}
if self
59 changes: 4 additions & 55 deletions core/src/cluster_nodes.rs
@@ -26,7 +26,7 @@ use {
any::TypeId,
cmp::Reverse,
collections::HashMap,
iter::{once, repeat_with},
iter::repeat_with,
marker::PhantomData,
net::SocketAddr,
ops::Deref,
@@ -114,62 +114,11 @@ impl ClusterNodes<BroadcastStage> {
new_cluster_nodes(cluster_info, stakes)
}

pub(crate) fn get_broadcast_addrs(
&self,
shred: &ShredId,
root_bank: &Bank,
fanout: usize,
socket_addr_space: &SocketAddrSpace,
) -> Vec<SocketAddr> {
const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
let shred_seed = shred.seed(&self.pubkey);
let mut rng = ChaChaRng::from_seed(shred_seed);
let index = match self.weighted_shuffle.first(&mut rng) {
None => return Vec::default(),
Some(index) => index,
};
if let Some(node) = self.nodes[index].contact_info() {
let now = timestamp();
let age = Duration::from_millis(now.saturating_sub(node.wallclock));
if age < MAX_CONTACT_INFO_AGE
&& ContactInfo::is_valid_address(&node.tvu, socket_addr_space)
{
return vec![node.tvu];
}
}
let mut rng = ChaChaRng::from_seed(shred_seed);
let nodes: Vec<&Node> = self
.weighted_shuffle
.clone()
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.collect();
if nodes.is_empty() {
return Vec::default();
}
if drop_redundant_turbine_path(shred.slot(), root_bank) {
let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes));
let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu);
return addrs
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect();
}
let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes);
neighbors[..1]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu))
.chain(
neighbors[1..]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards)),
)
.chain(
children
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu)),
)
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect()
let index = self.weighted_shuffle.first(&mut rng)?;
self.nodes[index].contact_info()
}
}

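The new get_broadcast_peer above seeds a ChaCha RNG from the shred and takes the first index of the stake-weighted shuffle, so peer selection is deterministic per shred. The sketch below illustrates the same idea with the rand and rand_chacha crates; `choose_weighted` is a simplified stand-in for WeightedShuffle::first, and the stake weights and seed are made-up values, not solana's data structures.

```rust
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;

fn broadcast_peer_index(shred_seed: [u8; 32], stakes: &[u64]) -> Option<usize> {
    // Seeding the RNG from the shred makes the pick reproducible per shred.
    let mut rng = ChaChaRng::from_seed(shred_seed);
    let indices: Vec<usize> = (0..stakes.len()).collect();
    // Pick one index with probability proportional to stake -- a stand-in for
    // taking the first element of the stake-weighted shuffle.
    indices
        .choose_weighted(&mut rng, |&i| stakes[i])
        .ok()
        .copied()
}

fn main() {
    let stakes = [100u64, 50, 25, 0]; // made-up stake weights
    let seed = [42u8; 32]; // stand-in for shred.seed(&leader_pubkey)
    let first = broadcast_peer_index(seed, &stakes);
    let second = broadcast_peer_index(seed, &stakes);
    // Same shred seed and stakes -> same broadcast peer.
    assert_eq!(first, second);
    println!("selected peer index: {:?}", first);
}
```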
