Skip to content

Commit

Permalink
v1.18: adds api to obtain the parent node in the turbine retransmit t…
Browse files Browse the repository at this point in the history
…ree (backport of #115) (#135)

adds api to obtain the parent node in the turbine retransmit tree (#115)

Following commits will use this api to check retransmitter's signature
on incoming shreds.

(cherry picked from commit 42e8309)

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
mergify[bot] and behzadnouri authored Mar 8, 2024
1 parent 50469ae commit d4678fc
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tokio = { workspace = true }
assert_matches = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
test-case = { workspace = true }

[[bench]]
name = "cluster_info"
Expand Down
199 changes: 177 additions & 22 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ impl ClusterNodes<BroadcastStage> {
}

pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
let shred_seed = shred.seed(&self.pubkey);
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred);
let index = self.weighted_shuffle.first(&mut rng)?;
self.nodes[index].contact_info()
}
Expand Down Expand Up @@ -187,7 +186,6 @@ impl ClusterNodes<RetransmitStage> {
shred: &ShredId,
fanout: usize,
) -> Result<RetransmitPeers, Error> {
let shred_seed = shred.seed(slot_leader);
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
if slot_leader == &self.pubkey {
Expand All @@ -200,7 +198,7 @@ impl ClusterNodes<RetransmitStage> {
weighted_shuffle.remove_index(*index);
}
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(slot_leader, shred);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
Expand Down Expand Up @@ -233,6 +231,43 @@ impl ClusterNodes<RetransmitStage> {
addrs,
})
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or if it is not staked.
#[allow(unused)]
fn get_retransmit_parent(
&self,
leader: &Pubkey,
shred: &ShredId,
fanout: usize,
) -> Result<Option<Pubkey>, Error> {
// Exclude slot leader from list of nodes.
if leader == &self.pubkey {
return Err(Error::Loopback {
leader: *leader,
shred: *shred,
});
}
// Unstaked nodes' position in the turbine tree is not deterministic
// and depends on gossip propagation of contact-infos. Therefore, if
// this node is not staked return None.
if self.nodes[self.index[&self.pubkey]].stake == 0 {
return Ok(None);
}
let mut weighted_shuffle = self.weighted_shuffle.clone();
if let Some(index) = self.index.get(leader).copied() {
weighted_shuffle.remove_index(index);
}
let mut rng = get_seeded_rng(leader, shred);
// Only need shuffled nodes until this node itself.
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.take_while(|node| node.pubkey() != self.pubkey)
.collect();
let parent = get_retransmit_parent(fanout, nodes.len(), &nodes);
Ok(parent.map(Node::pubkey))
}
}

pub fn new_cluster_nodes<T: 'static>(
Expand Down Expand Up @@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect()
}

fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
let seed = shred.seed(leader);
ChaChaRng::from_seed(seed)
}

// root : [0]
// 1st layer: [1, 2, ..., fanout]
// 2nd layer: [[fanout + 1, ..., fanout * 2],
Expand Down Expand Up @@ -327,6 +367,21 @@ fn get_retransmit_peers<T: Copy>(
.copied()
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree.
fn get_retransmit_parent<T: Copy>(
fanout: usize,
index: usize, // Local node's index within the nodes slice.
nodes: &[T],
) -> Option<T> {
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
let index = index.checked_sub(1)? / fanout;
let index = index - index.saturating_sub(1) % fanout;
let index = if index == 0 { index } else { index + offset };
nodes.get(index).copied()
}

impl<T> ClusterNodesCache<T> {
pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
Expand Down Expand Up @@ -527,7 +582,11 @@ pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &

#[cfg(test)]
mod tests {
use super::*;
use {
super::*,
std::{fmt::Debug, hash::Hash},
test_case::test_case,
};

#[test]
fn test_cluster_nodes_retransmit() {
Expand Down Expand Up @@ -600,10 +659,42 @@ mod tests {
}
}

// Checks (1) computed retransmit children against expected children and
// (2) computed parent of each child against the expected parent.
fn check_retransmit_nodes<T>(fanout: usize, nodes: &[T], peers: Vec<Vec<T>>)
where
T: Copy + Eq + PartialEq + Debug + Hash,
{
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
.map(|(k, node)| (node, k))
.collect();
let offset = peers.len();
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None);
for (k, peers) in peers.into_iter().enumerate() {
assert_eq!(
get_retransmit_peers(fanout, k, nodes).collect::<Vec<_>>(),
peers
);
let parent = Some(nodes[k]);
for peer in peers {
assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent);
}
}
// Remaining nodes have no children.
for k in offset..=nodes.len() {
assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None);
}
}

#[test]
fn test_get_retransmit_peers() {
fn test_get_retransmit_nodes() {
// fanout 2
let index = vec![
let nodes = [
7, // root
6, 10, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -631,16 +722,9 @@ mod tests {
vec![16, 9],
vec![8],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 10..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers);
// fanout 3
let index = vec![
let nodes = [
19, // root
14, 15, 28, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -672,13 +756,84 @@ mod tests {
vec![24, 32],
vec![34],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
let nodes = [
5, // root
34, 52, 8, // 1st layer
// 2nd layar
44, 18, 2, // 1st neigborhood
42, 47, 46, // 2nd
11, 26, 28, // 3rd
// 3rd layer
53, 23, 37, // 1st neighborhood
40, 13, 7, // 2nd
50, 35, 22, // 3rd
3, 27, 31, // 4th
10, 48, 15, // 5th
19, 6, 30, // 6th
36, 45, 1, // 7th
38, 12, 17, // 8th
4, 32, 16, // 9th
// 4th layer
41, 49, 24, // 1st neighborhood
14, 9, 0, // 2nd
29, 21, 39, // 3rd
43, 51, 33, // 4th
25, 20, // 5th
];
let peers = vec![
vec![34, 52, 8],
vec![44, 42, 11],
vec![18, 47, 26],
vec![2, 46, 28],
vec![53, 40, 50],
vec![23, 13, 35],
vec![37, 7, 22],
vec![3, 10, 19],
vec![27, 48, 6],
vec![31, 15, 30],
vec![36, 38, 4],
vec![45, 12, 32],
vec![1, 17, 16],
vec![41, 14, 29],
vec![49, 9, 21],
vec![24, 0, 39],
vec![43, 25],
vec![51, 20],
vec![33],
];
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
}

#[test_case(2, 1_347)]
#[test_case(3, 1_359)]
#[test_case(4, 4_296)]
#[test_case(5, 3_925)]
#[test_case(6, 8_778)]
#[test_case(7, 9_879)]
fn test_get_retransmit_nodes_round_trip(fanout: usize, size: usize) {
let mut rng = rand::thread_rng();
let mut nodes: Vec<_> = (0..size).collect();
nodes.shuffle(&mut rng);
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
.map(|(k, node)| (node, k))
.collect();
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None);
for k in 1..size {
let parent = get_retransmit_parent(fanout, k, &nodes).unwrap();
let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes);
assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k]));
}
for k in 13..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.next(), None);
for k in 0..size {
let parent = Some(nodes[k]);
for peer in get_retransmit_peers(fanout, k, &nodes) {
assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent);
}
}
}
}

0 comments on commit d4678fc

Please sign in to comment.