limits number of nodes per IP address in Turbine (solana-labs#864)
behzadnouri authored Apr 22, 2024
1 parent c47a680 commit c87b830
Showing 2 changed files with 64 additions and 30 deletions.
turbine/benches/cluster_nodes.rs (3 additions & 2 deletions)
@@ -6,7 +6,7 @@ use {
     rand::{seq::SliceRandom, Rng},
     solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo,
     solana_ledger::shred::{Shred, ShredFlags},
-    solana_sdk::{clock::Slot, pubkey::Pubkey},
+    solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey},
     solana_turbine::{
         cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
         retransmit_stage::RetransmitStage,
@@ -21,7 +21,8 @@ fn make_cluster_nodes<R: Rng>(
     unstaked_ratio: Option<(u32, u32)>,
 ) -> (Vec<ContactInfo>, ClusterNodes<RetransmitStage>) {
     let (nodes, stakes, cluster_info) = make_test_cluster(rng, 5_000, unstaked_ratio);
-    let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
+    let cluster_nodes =
+        new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
     (nodes, cluster_nodes)
 }

turbine/src/cluster_nodes.rs (61 additions & 28 deletions)
@@ -17,6 +17,7 @@ use {
     solana_sdk::{
         clock::{Epoch, Slot},
         feature_set,
+        genesis_config::ClusterType,
         native_token::LAMPORTS_PER_SOL,
         pubkey::Pubkey,
         signature::{Keypair, Signer},
@@ -29,7 +30,7 @@ use {
         collections::HashMap,
         iter::repeat_with,
         marker::PhantomData,
-        net::SocketAddr,
+        net::{IpAddr, SocketAddr},
         sync::{Arc, Mutex, RwLock},
         time::{Duration, Instant},
     },
@@ -39,6 +40,9 @@ use {
 const DATA_PLANE_FANOUT: usize = 200;
 pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;
 
+// Limit number of nodes per IP address.
+const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;
+
 #[derive(Debug, Error)]
 pub enum Error {
     #[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
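
The enforcement of this cap lands further down in get_nodes. As a minimal standalone sketch of the counting idiom used there (none of this is the crate's code), a HashMap<IpAddr, usize> tallies how many nodes have been seen per IP, and the running count is compared against the constant as each node is visited:

use std::{collections::HashMap, net::{IpAddr, Ipv4Addr}};

const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

fn main() {
    let mut counts = HashMap::<IpAddr, usize>::new();
    let ip = IpAddr::V4(Ipv4Addr::new(192, 0, 2, 7));
    for i in 1..=12 {
        // Bump (or initialize) the per-IP count and read the new value.
        let count = *counts.entry(ip).and_modify(|c| *c += 1).or_insert(1);
        let within_cap = count <= MAX_NUM_NODES_PER_IP_ADDRESS;
        // Only the first ten nodes at this IP stay within the cap.
        assert_eq!(within_cap, i <= 10);
    }
}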
@@ -81,9 +85,6 @@ pub struct ClusterNodesCache<T> {
 pub struct RetransmitPeers<'a> {
     root_distance: usize, // distance from the root node
     children: Vec<&'a Node>,
-    // Maps tvu addresses to the first node
-    // in the shuffle with the same address.
-    addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
 }
 
 impl Node {
@@ -147,8 +148,12 @@ impl<T> ClusterNodes<T> {
 }
 
 impl ClusterNodes<BroadcastStage> {
-    pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
-        new_cluster_nodes(cluster_info, stakes)
+    pub fn new(
+        cluster_info: &ClusterInfo,
+        cluster_type: ClusterType,
+        stakes: &HashMap<Pubkey, u64>,
+    ) -> Self {
+        new_cluster_nodes(cluster_info, cluster_type, stakes)
     }
 
     pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
@@ -168,16 +173,13 @@ impl ClusterNodes<RetransmitStage> {
         let RetransmitPeers {
             root_distance,
             children,
-            addrs,
         } = self.get_retransmit_peers(slot_leader, shred, fanout)?;
         let protocol = get_broadcast_protocol(shred);
-        let peers = children.into_iter().filter_map(|node| {
-            node.contact_info()?
-                .tvu(protocol)
-                .ok()
-                .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
-        });
-        Ok((root_distance, peers.collect()))
+        let peers = children
+            .into_iter()
+            .filter_map(|node| node.contact_info()?.tvu(protocol).ok())
+            .collect();
+        Ok((root_distance, peers))
     }
 
     pub fn get_retransmit_peers(
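
For contrast, here is a self-contained toy sketch (pubkeys as plain u64, not the crate's types) of the policy the removed filter implemented: the addrs map remembered the first node in shuffled order to claim each TVU address, and only that claimant passed the retransmit-time filter. This commit drops that per-address check here and instead caps nodes per IP address once, when the cluster nodes are built in get_nodes.

use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

fn main() {
    // Three shuffled (pubkey, tvu-address) pairs, all sharing one address.
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 8001);
    let shuffled = vec![(1u64, addr), (2u64, addr), (3u64, addr)];
    // Old policy: the first node in shuffled order claims the address ...
    let mut addrs = HashMap::<SocketAddr, u64>::new();
    for &(pubkey, addr) in &shuffled {
        addrs.entry(addr).or_insert(pubkey);
    }
    // ... and only the claimant passes the retransmit-time filter.
    let receivers: Vec<u64> = shuffled
        .iter()
        .filter(|(pubkey, addr)| addrs.get(addr) == Some(pubkey))
        .map(|&(pubkey, _)| pubkey)
        .collect();
    assert_eq!(receivers, vec![1u64]);
}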
@@ -197,19 +199,10 @@ impl ClusterNodes<RetransmitStage> {
         if let Some(index) = self.index.get(slot_leader) {
             weighted_shuffle.remove_index(*index);
         }
-        let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
         let mut rng = get_seeded_rng(slot_leader, shred);
-        let protocol = get_broadcast_protocol(shred);
         let nodes: Vec<_> = weighted_shuffle
             .shuffle(&mut rng)
             .map(|index| &self.nodes[index])
-            .inspect(|node| {
-                if let Some(node) = node.contact_info() {
-                    if let Ok(addr) = node.tvu(protocol) {
-                        addrs.entry(addr).or_insert(*node.pubkey());
-                    }
-                }
-            })
             .collect();
         let self_index = nodes
             .iter()
@@ -228,7 +221,6 @@ impl ClusterNodes<RetransmitStage> {
         Ok(RetransmitPeers {
             root_distance,
             children: peers.collect(),
-            addrs,
         })
     }

@@ -272,10 +264,11 @@ impl ClusterNodes<RetransmitStage> {
 
 pub fn new_cluster_nodes<T: 'static>(
     cluster_info: &ClusterInfo,
+    cluster_type: ClusterType,
     stakes: &HashMap<Pubkey, u64>,
 ) -> ClusterNodes<T> {
     let self_pubkey = cluster_info.id();
-    let nodes = get_nodes(cluster_info, stakes);
+    let nodes = get_nodes(cluster_info, cluster_type, stakes);
     let index: HashMap<_, _> = nodes
         .iter()
         .enumerate()
@@ -298,8 +291,21 @@ pub fn new_cluster_nodes<T: 'static>(
 
 // All staked nodes + other known tvu-peers + the node itself;
 // sorted by (stake, pubkey) in descending order.
-fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
+fn get_nodes(
+    cluster_info: &ClusterInfo,
+    cluster_type: ClusterType,
+    stakes: &HashMap<Pubkey, u64>,
+) -> Vec<Node> {
     let self_pubkey = cluster_info.id();
+    let should_dedup_addrs = match cluster_type {
+        ClusterType::Development => false,
+        ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true,
+    };
+    // Maps IP addresses to number of nodes at that IP address.
+    let mut counts = {
+        let capacity = if should_dedup_addrs { stakes.len() } else { 0 };
+        HashMap::<IpAddr, usize>::with_capacity(capacity)
+    };
     // The local node itself.
     std::iter::once({
         let stake = stakes.get(&self_pubkey).copied().unwrap_or_default();
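
A small sketch of the gating added above, with a local stand-in enum for solana_sdk::genesis_config::ClusterType: deduplication is keyed off the cluster type, presumably so single-machine Development clusters, where many validators share one IP, are unaffected, and the counts map gets a non-zero capacity only when it will actually be used.

use std::{collections::HashMap, net::IpAddr};

// Local stand-in for solana_sdk::genesis_config::ClusterType.
#[derive(Clone, Copy)]
enum ClusterType { Development, Devnet, Testnet, MainnetBeta }

fn should_dedup_addrs(cluster_type: ClusterType) -> bool {
    match cluster_type {
        ClusterType::Development => false,
        ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true,
    }
}

fn main() {
    let cluster_type = ClusterType::Development;
    let num_staked_nodes = 2_000;
    // Avoid allocating the counts map at all when dedup is disabled.
    let capacity = if should_dedup_addrs(cluster_type) { num_staked_nodes } else { 0 };
    let counts = HashMap::<IpAddr, usize>::with_capacity(capacity);
    assert!(counts.is_empty());
}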
Expand Down Expand Up @@ -328,6 +334,30 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
// Since sorted_by_key is stable, in case of duplicates, this
// will keep nodes with contact-info.
.dedup_by(|a, b| a.pubkey() == b.pubkey())
.filter_map(|node| {
if !should_dedup_addrs
|| node
.contact_info()
.and_then(|node| node.tvu(Protocol::UDP).ok())
.map(|addr| {
*counts
.entry(addr.ip())
.and_modify(|count| *count += 1)
.or_insert(1)
})
<= Some(MAX_NUM_NODES_PER_IP_ADDRESS)
{
Some(node)
} else {
// If the node is not staked, drop it entirely. Otherwise, keep the
// pubkey for deterministic shuffle, but strip the contact-info so
// that no more packets are sent to this node.
(node.stake > 0u64).then(|| Node {
node: NodeId::from(node.pubkey()),
stake: node.stake,
})
}
})
.collect()
}
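
Putting the pieces together, a self-contained sketch (simplified stand-in types, not the crate's Node, NodeId, or ContactInfo) of the behavior this filter_map produces: the first MAX_NUM_NODES_PER_IP_ADDRESS nodes seen at an IP keep their contact-info, later staked nodes are kept pubkey-only so the weighted shuffle stays deterministic while no packets are addressed to them, and later unstaked nodes are dropped entirely.

use std::{
    collections::HashMap,
    net::{IpAddr, Ipv4Addr},
};

const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

#[derive(Debug)]
struct Node {
    pubkey: u64,            // stand-in for Pubkey
    stake: u64,
    tvu_ip: Option<IpAddr>, // stand-in for the contact-info TVU address
}

fn cap_nodes_per_ip(nodes: Vec<Node>) -> Vec<Node> {
    let mut counts = HashMap::<IpAddr, usize>::new();
    nodes
        .into_iter()
        .filter_map(|node| {
            // Nodes without an address are never counted (mirrors the real code,
            // where the count is only taken when a TVU address is present).
            let over_cap = node.tvu_ip.map_or(false, |ip| {
                let count = counts.entry(ip).and_modify(|c| *c += 1).or_insert(1);
                *count > MAX_NUM_NODES_PER_IP_ADDRESS
            });
            if !over_cap {
                Some(node)
            } else if node.stake > 0 {
                // Over the cap but staked: keep pubkey and stake, strip the address.
                Some(Node { tvu_ip: None, ..node })
            } else {
                // Over the cap and unstaked: drop entirely.
                None
            }
        })
        .collect()
}

fn main() {
    let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
    // Twelve nodes behind one IP; even pubkeys are staked, odd ones are not.
    let nodes: Vec<Node> = (0..12u64)
        .map(|i| Node { pubkey: i, stake: (i + 1) % 2, tvu_ip: Some(ip) })
        .collect();
    let kept = cap_nodes_per_ip(nodes);
    // The first ten keep their address; node 10 (staked) survives address-less;
    // node 11 (unstaked) is dropped.
    assert_eq!(kept.iter().filter(|node| node.tvu_ip.is_some()).count(), 10);
    assert_eq!(kept.len(), 11);
    assert_eq!(kept.last().map(|node| node.pubkey), Some(10));
}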

@@ -446,6 +476,7 @@ impl<T: 'static> ClusterNodesCache<T> {
         }
         let nodes = Arc::new(new_cluster_nodes::<T>(
             cluster_info,
+            root_bank.cluster_type(),
             &epoch_staked_nodes.unwrap_or_default(),
         ));
         *entry = Some((Instant::now(), Arc::clone(&nodes)));
@@ -583,7 +614,8 @@ mod tests {
         let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
         // ClusterInfo::tvu_peers excludes the node itself.
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-        let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
+        let cluster_nodes =
+            new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
         // All nodes with contact-info should be in the index.
         // Staked nodes with no contact-info should be included.
         assert!(cluster_nodes.nodes.len() > nodes.len());
@@ -618,7 +650,8 @@ mod tests {
         let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
         // ClusterInfo::tvu_peers excludes the node itself.
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(&cluster_info, ClusterType::Development, &stakes);
         // All nodes with contact-info should be in the index.
         // Excluding this node itself.
         // Staked nodes with no contact-info should be included.
