diff --git a/core/benches/cluster_nodes.rs b/core/benches/cluster_nodes.rs new file mode 100644 index 00000000000000..0f3b8364442f28 --- /dev/null +++ b/core/benches/cluster_nodes.rs @@ -0,0 +1,112 @@ +#![feature(test)] + +extern crate test; + +use { + rand::{seq::SliceRandom, Rng}, + solana_core::{ + cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes}, + retransmit_stage::RetransmitStage, + }, + solana_gossip::contact_info::ContactInfo, + solana_sdk::{clock::Slot, hash::hashv, pubkey::Pubkey, signature::Signature}, + test::Bencher, +}; + +const NUM_SIMULATED_SHREDS: usize = 4; + +fn make_cluster_nodes( + rng: &mut R, + unstaked_ratio: Option<(u32, u32)>, +) -> (Vec, ClusterNodes) { + let (nodes, stakes, cluster_info) = make_test_cluster(rng, 5_000, unstaked_ratio); + let cluster_nodes = new_cluster_nodes::(&cluster_info, &stakes); + (nodes, cluster_nodes) +} + +fn get_retransmit_peers_deterministic( + cluster_nodes: &ClusterNodes, + slot: &Slot, + slot_leader: &Pubkey, + num_simulated_shreds: usize, +) { + for i in 0..num_simulated_shreds { + // see Shred::seed + let shred_seed = hashv(&[ + &slot.to_le_bytes(), + &(i as u32).to_le_bytes(), + &slot_leader.to_bytes(), + ]) + .to_bytes(); + + let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_deterministic( + shred_seed, + solana_gossip::cluster_info::DATA_PLANE_FANOUT, + *slot_leader, + ); + } +} + +fn get_retransmit_peers_compat( + cluster_nodes: &ClusterNodes, + slot_leader: &Pubkey, + signatures: &[Signature], +) { + for signature in signatures.iter() { + // see Shred::seed + let signature = signature.as_ref(); + let offset = signature.len().checked_sub(32).unwrap(); + let shred_seed = signature[offset..].try_into().unwrap(); + + let (_neighbors, _children) = cluster_nodes.get_retransmit_peers_compat( + shred_seed, + solana_gossip::cluster_info::DATA_PLANE_FANOUT, + *slot_leader, + ); + } +} + +fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) { + let mut rng = rand::thread_rng(); + let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio); + let slot_leader = nodes[1..].choose(&mut rng).unwrap().id; + let slot = rand::random::(); + b.iter(|| { + get_retransmit_peers_deterministic( + &cluster_nodes, + &slot, + &slot_leader, + NUM_SIMULATED_SHREDS, + ) + }); +} + +fn get_retransmit_peers_compat_wrapper(b: &mut Bencher, unstaked_ratio: Option<(u32, u32)>) { + let mut rng = rand::thread_rng(); + let (nodes, cluster_nodes) = make_cluster_nodes(&mut rng, unstaked_ratio); + let slot_leader = nodes[1..].choose(&mut rng).unwrap().id; + let signatures: Vec<_> = std::iter::repeat_with(Signature::new_unique) + .take(NUM_SIMULATED_SHREDS) + .collect(); + b.iter(|| get_retransmit_peers_compat(&cluster_nodes, &slot_leader, &signatures)); +} + +#[bench] +fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_2(b: &mut Bencher) { + get_retransmit_peers_deterministic_wrapper(b, Some((1, 2))); +} + +#[bench] +fn bench_get_retransmit_peers_compat_unstaked_ratio_1_2(b: &mut Bencher) { + get_retransmit_peers_compat_wrapper(b, Some((1, 2))); +} + +#[bench] +fn bench_get_retransmit_peers_deterministic_unstaked_ratio_1_32(b: &mut Bencher) { + get_retransmit_peers_deterministic_wrapper(b, Some((1, 32))); +} + +#[bench] +fn bench_get_retransmit_peers_compat_unstaked_ratio_1_32(b: &mut Bencher) { + get_retransmit_peers_compat_wrapper(b, Some((1, 32))); +} diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 9618c18cdf1a7b..1df141607a7eeb 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -2,12 +2,14 @@ use { crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage}, itertools::Itertools, lru::LruCache, - rand::SeedableRng, + rand::{seq::SliceRandom, Rng, SeedableRng}, rand_chacha::ChaChaRng, solana_gossip::{ cluster_info::{compute_retransmit_peers, ClusterInfo}, contact_info::ContactInfo, + crds::GossipRoute, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + crds_value::{CrdsData, CrdsValue}, weighted_shuffle::{weighted_best, weighted_shuffle, WeightedShuffle}, }, solana_ledger::shred::Shred, @@ -16,6 +18,7 @@ use { clock::{Epoch, Slot}, feature_set, pubkey::Pubkey, + signature::Keypair, timing::timestamp, }, solana_streamer::socket::SocketAddrSpace, @@ -23,6 +26,7 @@ use { any::TypeId, cmp::Reverse, collections::HashMap, + iter::repeat_with, marker::PhantomData, net::SocketAddr, ops::Deref, @@ -39,7 +43,7 @@ enum NodeId { Pubkey(Pubkey), } -struct Node { +pub struct Node { node: NodeId, stake: u64, } @@ -233,6 +237,18 @@ impl ClusterNodes { if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) { return self.get_retransmit_peers_compat(shred_seed, fanout, slot_leader); } + self.get_retransmit_peers_deterministic(shred_seed, fanout, slot_leader) + } + + pub fn get_retransmit_peers_deterministic( + &self, + shred_seed: [u8; 32], + fanout: usize, + slot_leader: Pubkey, + ) -> ( + Vec<&Node>, // neighbors + Vec<&Node>, // children + ) { let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. if slot_leader == self.pubkey { @@ -256,7 +272,7 @@ impl ClusterNodes { (neighbors, children) } - fn get_retransmit_peers_compat( + pub fn get_retransmit_peers_compat( &self, shred_seed: [u8; 32], fanout: usize, @@ -297,7 +313,7 @@ impl ClusterNodes { } } -fn new_cluster_nodes( +pub fn new_cluster_nodes( cluster_info: &ClusterInfo, stakes: &HashMap, ) -> ClusterNodes { @@ -462,22 +478,61 @@ impl From for NodeId { } } +pub fn make_test_cluster( + rng: &mut R, + num_nodes: usize, + unstaked_ratio: Option<(u32, u32)>, +) -> ( + Vec, + HashMap, // stakes + ClusterInfo, +) { + let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7)); + let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None)) + .take(num_nodes) + .collect(); + nodes.shuffle(rng); + let this_node = nodes[0].clone(); + let mut stakes: HashMap = nodes + .iter() + .filter_map(|node| { + if rng.gen_ratio(unstaked_numerator, unstaked_denominator) { + None // No stake for some of the nodes. + } else { + Some((node.id, rng.gen_range(0, 20))) + } + }) + .collect(); + // Add some staked nodes with no contact-info. + stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100)); + let cluster_info = ClusterInfo::new( + this_node, + Arc::new(Keypair::new()), + SocketAddrSpace::Unspecified, + ); + { + let now = timestamp(); + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); + // First node is pushed to crds table by ClusterInfo constructor. + for node in nodes.iter().skip(1) { + let node = CrdsData::ContactInfo(node.clone()); + let node = CrdsValue::new_unsigned(node); + assert_eq!( + gossip_crds.insert(node, now, GossipRoute::LocalMessage), + Ok(()) + ); + } + } + (nodes, stakes, cluster_info) +} + #[cfg(test)] mod tests { use { super::*, - rand::{seq::SliceRandom, Rng}, - solana_gossip::{ - crds::GossipRoute, - crds_value::{CrdsData, CrdsValue}, - deprecated::{ - shuffle_peers_and_index, sorted_retransmit_peers_and_stakes, - sorted_stakes_with_index, - }, + solana_gossip::deprecated::{ + shuffle_peers_and_index, sorted_retransmit_peers_and_stakes, sorted_stakes_with_index, }, - solana_sdk::{signature::Keypair, timing::timestamp}, - solana_streamer::socket::SocketAddrSpace, - std::{iter::repeat_with, sync::Arc}, }; // Legacy methods copied for testing backward compatibility. @@ -499,55 +554,10 @@ mod tests { sorted_stakes_with_index(peers, stakes) } - fn make_cluster( - rng: &mut R, - ) -> ( - Vec, - HashMap, // stakes - ClusterInfo, - ) { - let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None)) - .take(1000) - .collect(); - nodes.shuffle(rng); - let this_node = nodes[0].clone(); - let mut stakes: HashMap = nodes - .iter() - .filter_map(|node| { - if rng.gen_ratio(1, 7) { - None // No stake for some of the nodes. - } else { - Some((node.id, rng.gen_range(0, 20))) - } - }) - .collect(); - // Add some staked nodes with no contact-info. - stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100)); - let cluster_info = ClusterInfo::new( - this_node, - Arc::new(Keypair::new()), - SocketAddrSpace::Unspecified, - ); - { - let now = timestamp(); - let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); - // First node is pushed to crds table by ClusterInfo constructor. - for node in nodes.iter().skip(1) { - let node = CrdsData::ContactInfo(node.clone()); - let node = CrdsValue::new_unsigned(node); - assert_eq!( - gossip_crds.insert(node, now, GossipRoute::LocalMessage), - Ok(()) - ); - } - } - (nodes, stakes, cluster_info) - } - #[test] fn test_cluster_nodes_retransmit() { let mut rng = rand::thread_rng(); - let (nodes, stakes, cluster_info) = make_cluster(&mut rng); + let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None); let this_node = cluster_info.my_contact_info(); // ClusterInfo::tvu_peers excludes the node itself. assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1); @@ -628,7 +638,7 @@ mod tests { #[test] fn test_cluster_nodes_broadcast() { let mut rng = rand::thread_rng(); - let (nodes, stakes, cluster_info) = make_cluster(&mut rng); + let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None); // ClusterInfo::tvu_peers excludes the node itself. assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1); let cluster_nodes = ClusterNodes::::new(&cluster_info, &stakes); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 383e4908ecb4b5..60e7d88ab57d1b 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -416,7 +416,7 @@ pub fn retransmitter( .unwrap() } -pub(crate) struct RetransmitStage { +pub struct RetransmitStage { retransmit_thread_handle: JoinHandle<()>, window_service: WindowService, cluster_slots_service: ClusterSlotsService,