diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 84ffae41fb6bf1..9db03d8f0a8355 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -17,7 +17,7 @@ use { Sender as CrossbeamSender, }, itertools::Itertools, - solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError}, + solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}, solana_ledger::{blockstore::Blockstore, shred::Shred}, solana_measure::measure::Measure, solana_metrics::{inc_new_counter_error, inc_new_counter_info}, @@ -33,6 +33,7 @@ use { }, std::{ collections::HashMap, + iter::repeat, net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, @@ -412,8 +413,6 @@ pub fn broadcast_shreds( ) -> Result<()> { let mut result = Ok(()); let mut shred_select = Measure::start("shred_select"); - // Only the leader broadcasts shreds. - let leader = cluster_info.id(); let (root_bank, working_bank) = { let bank_forks = bank_forks.read().unwrap(); (bank_forks.root_bank(), bank_forks.working_bank()) @@ -427,12 +426,13 @@ pub fn broadcast_shreds( cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info); update_peer_stats(&cluster_nodes, last_datapoint_submit); let root_bank = root_bank.clone(); - shreds.filter_map(move |shred| { - let seed = shred.seed(leader, &root_bank); - let node = cluster_nodes.get_broadcast_peer(seed)?; - socket_addr_space - .check(&node.tvu) - .then(|| (&shred.payload, node.tvu)) + shreds.flat_map(move |shred| { + repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs( + shred, + &root_bank, + DATA_PLANE_FANOUT, + socket_addr_space, + )) }) }) .collect(); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index c7b51cf9c22be8..5964cdb606c895 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -3,6 +3,7 @@ use { crate::cluster_nodes::ClusterNodesCache, itertools::Itertools, solana_entry::entry::Entry, + solana_gossip::cluster_info::DATA_PLANE_FANOUT, solana_ledger::shred::Shredder, solana_sdk::{ hash::Hash, @@ -246,6 +247,11 @@ impl BroadcastRun for BroadcastDuplicatesRun { (bank_forks.root_bank(), bank_forks.working_bank()) }; let self_pubkey = cluster_info.id(); + let nodes: Vec<_> = cluster_info + .all_peers() + .into_iter() + .map(|(node, _)| node) + .collect(); // Creat cluster partition. let cluster_partition: HashSet = { @@ -273,8 +279,11 @@ impl BroadcastRun for BroadcastDuplicatesRun { let packets: Vec<_> = shreds .iter() .filter_map(|shred| { - let seed = shred.seed(self_pubkey, &root_bank); - let node = cluster_nodes.get_broadcast_peer(seed)?; + let addr = cluster_nodes + .get_broadcast_addrs(shred, &root_bank, DATA_PLANE_FANOUT, socket_addr_space) + .first() + .copied()?; + let node = nodes.iter().find(|node| node.tvu == addr)?; if !socket_addr_space.check(&node.tvu) { return None; } diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index ec982db51809cc..ff0ce1124cff19 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -2,22 +2,31 @@ use { crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage}, itertools::Itertools, lru::LruCache, + rand::{Rng, SeedableRng}, + rand_chacha::ChaChaRng, solana_gossip::{ cluster_info::{compute_retransmit_peers, ClusterInfo}, contact_info::ContactInfo, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, - weighted_shuffle::{weighted_best, weighted_shuffle}, + weighted_shuffle::{ + weighted_best, weighted_sample_single, weighted_shuffle, WeightedShuffle, + }, }, + solana_ledger::shred::Shred, solana_runtime::bank::Bank, solana_sdk::{ clock::{Epoch, Slot}, + feature_set, pubkey::Pubkey, + timing::timestamp, }, + solana_streamer::socket::SocketAddrSpace, std::{ any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData, + net::SocketAddr, ops::Deref, sync::{Arc, Mutex}, time::{Duration, Instant}, @@ -41,6 +50,9 @@ pub struct ClusterNodes { // All staked nodes + other known tvu-peers + the node itself; // sorted by (stake, pubkey) in descending order. nodes: Vec, + // Cumulative stakes (excluding the node itself), used for sampling + // broadcast peers. + cumulative_weights: Vec, // Weights and indices for sampling peers. weighted_{shuffle,best} expect // weights >= 1. For backward compatibility we use max(1, stake) for // weights and exclude nodes with no contact-info. @@ -102,9 +114,68 @@ impl ClusterNodes { new_cluster_nodes(cluster_info, stakes) } + pub(crate) fn get_broadcast_addrs( + &self, + shred: &Shred, + root_bank: &Bank, + fanout: usize, + socket_addr_space: &SocketAddrSpace, + ) -> Vec { + const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60); + let shred_seed = shred.seed(self.pubkey, root_bank); + if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) { + if let Some(node) = self.get_broadcast_peer(shred_seed) { + if socket_addr_space.check(&node.tvu) { + return vec![node.tvu]; + } + } + return Vec::default(); + } + let mut rng = ChaChaRng::from_seed(shred_seed); + let index = match weighted_sample_single(&mut rng, &self.cumulative_weights) { + None => return Vec::default(), + Some(index) => index, + }; + if let Some(node) = self.nodes[index].contact_info() { + let now = timestamp(); + let age = Duration::from_millis(now.saturating_sub(node.wallclock)); + if age < MAX_CONTACT_INFO_AGE + && ContactInfo::is_valid_address(&node.tvu, socket_addr_space) + { + return vec![node.tvu]; + } + } + let nodes: Vec<_> = self + .nodes + .iter() + .filter(|node| node.pubkey() != self.pubkey) + .collect(); + if nodes.is_empty() { + return Vec::default(); + } + let mut rng = ChaChaRng::from_seed(shred_seed); + let nodes = shuffle_nodes(&mut rng, &nodes); + let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes); + neighbors[..1] + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu)) + .chain( + neighbors[1..] + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu_forwards)), + ) + .chain( + children + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu)), + ) + .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) + .collect() + } + /// Returns the root of turbine broadcast tree, which the leader sends the /// shred to. - pub(crate) fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> { + fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> { if self.index.is_empty() { None } else { @@ -118,14 +189,82 @@ impl ClusterNodes { } impl ClusterNodes { - pub(crate) fn get_retransmit_peers( + pub(crate) fn get_retransmit_addrs( + &self, + slot_leader: Pubkey, + shred: &Shred, + root_bank: &Bank, + fanout: usize, + ) -> Vec { + let (neighbors, children) = + self.get_retransmit_peers(slot_leader, shred, root_bank, fanout); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), it should send the packet to tvu socket of its + // children and also tvu_forward socket of its neighbors. Otherwise it + // should only forward to tvu_forwards socket of its children. + if neighbors[0].pubkey() != self.pubkey { + return children + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu_forwards)) + .collect(); + } + // First neighbor is this node itself, so skip it. + neighbors[1..] + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu_forwards)) + .chain( + children + .iter() + .filter_map(|node| Some(node.contact_info()?.tvu)), + ) + .collect() + } + + fn get_retransmit_peers( + &self, + slot_leader: Pubkey, + shred: &Shred, + root_bank: &Bank, + fanout: usize, + ) -> ( + Vec<&Node>, // neighbors + Vec<&Node>, // children + ) { + let shred_seed = shred.seed(slot_leader, root_bank); + if !enable_turbine_peers_shuffle_patch(shred.slot(), root_bank) { + return self.get_retransmit_peers_compat(shred_seed, fanout, slot_leader); + } + // Exclude slot leader from list of nodes. + let nodes: Vec<_> = if slot_leader == self.pubkey { + error!("retransmit from slot leader: {}", slot_leader); + self.nodes.iter().collect() + } else { + self.nodes + .iter() + .filter(|node| node.pubkey() != slot_leader) + .collect() + }; + let mut rng = ChaChaRng::from_seed(shred_seed); + let nodes = shuffle_nodes(&mut rng, &nodes); + let self_index = nodes + .iter() + .position(|node| node.pubkey() == self.pubkey) + .unwrap(); + let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes); + // Assert that the node itself is included in the set of neighbors, at + // the right offset. + debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey); + (neighbors, children) + } + + fn get_retransmit_peers_compat( &self, shred_seed: [u8; 32], fanout: usize, slot_leader: Pubkey, ) -> ( - Vec<&ContactInfo>, // neighbors - Vec<&ContactInfo>, // children + Vec<&Node>, // neighbors + Vec<&Node>, // children ) { // Exclude leader from list of nodes. let (weights, index): (Vec, Vec) = if slot_leader == self.pubkey { @@ -153,14 +292,34 @@ impl ClusterNodes { self.nodes[neighbors[self_index % fanout]].pubkey(), self.pubkey ); - let get_contact_infos = |index: Vec| -> Vec<&ContactInfo> { - index - .into_iter() - .map(|i| self.nodes[i].contact_info().unwrap()) - .collect() - }; - (get_contact_infos(neighbors), get_contact_infos(children)) + let neighbors = neighbors.into_iter().map(|i| &self.nodes[i]).collect(); + let children = children.into_iter().map(|i| &self.nodes[i]).collect(); + (neighbors, children) + } +} + +fn build_cumulative_weights(self_pubkey: Pubkey, nodes: &[Node]) -> Vec { + let cumulative_stakes: Vec<_> = nodes + .iter() + .scan(0, |acc, node| { + if node.pubkey() != self_pubkey { + *acc += node.stake; + } + Some(*acc) + }) + .collect(); + if cumulative_stakes.last() != Some(&0) { + return cumulative_stakes; } + nodes + .iter() + .scan(0, |acc, node| { + if node.pubkey() != self_pubkey { + *acc += 1; + } + Some(*acc) + }) + .collect() } fn new_cluster_nodes( @@ -170,6 +329,11 @@ fn new_cluster_nodes( let self_pubkey = cluster_info.id(); let nodes = get_nodes(cluster_info, stakes); let broadcast = TypeId::of::() == TypeId::of::(); + let cumulative_weights = if broadcast { + build_cumulative_weights(self_pubkey, &nodes) + } else { + Vec::default() + }; // For backward compatibility: // * nodes which do not have contact-info are excluded. // * stakes are floored at 1. @@ -187,6 +351,7 @@ fn new_cluster_nodes( ClusterNodes { pubkey: self_pubkey, nodes, + cumulative_weights, index, _phantom: PhantomData::default(), } @@ -225,6 +390,44 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec bool { + let feature_slot = root_bank + .feature_set + .activated_slot(&feature_set::turbine_peers_shuffle::id()); + match feature_slot { + None => false, + Some(feature_slot) => { + let epoch_schedule = root_bank.epoch_schedule(); + let feature_epoch = epoch_schedule.get_epoch(feature_slot); + let shred_epoch = epoch_schedule.get_epoch(shred_slot); + feature_epoch < shred_epoch + } + } +} + +// Shuffles nodes w.r.t their stakes. +// Unstaked nodes will always appear at the very end. +fn shuffle_nodes<'a, R: Rng>(rng: &mut R, nodes: &[&'a Node]) -> Vec<&'a Node> { + // Nodes are sorted by (stake, pubkey) in descending order. + let stakes: Vec = nodes + .iter() + .map(|node| node.stake) + .take_while(|stake| *stake > 0) + .collect(); + let num_staked = stakes.len(); + let mut out: Vec<_> = WeightedShuffle::new(rng, &stakes) + .unwrap() + .map(|i| nodes[i]) + .collect(); + let weights = vec![1; nodes.len() - num_staked]; + out.extend( + WeightedShuffle::new(rng, &weights) + .unwrap() + .map(|i| nodes[i + num_staked]), + ); + out +} + impl ClusterNodesCache { pub fn new( // Capacity of underlying LRU-cache in terms of number of epochs. @@ -306,6 +509,7 @@ impl Default for ClusterNodes { Self { pubkey: Pubkey::default(), nodes: Vec::default(), + cumulative_weights: Vec::default(), index: Vec::default(), _phantom: PhantomData::default(), } @@ -458,15 +662,15 @@ mod tests { let (neighbors_indices, children_indices) = compute_retransmit_peers(fanout, self_index, &shuffled_index); let (neighbors, children) = - cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader); + cluster_nodes.get_retransmit_peers_compat(shred_seed, fanout, slot_leader); assert_eq!(children.len(), children_indices.len()); for (node, index) in children.into_iter().zip(children_indices) { - assert_eq!(*node, peers[index]); + assert_eq!(*node.contact_info().unwrap(), peers[index]); } assert_eq!(neighbors.len(), neighbors_indices.len()); - assert_eq!(neighbors[0].id, peers[neighbors_indices[0]].id); + assert_eq!(neighbors[0].pubkey(), peers[neighbors_indices[0]].id); for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) { - assert_eq!(*node, peers[index]); + assert_eq!(*node.contact_info().unwrap(), peers[index]); } } } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 3770f532d8fc6c..de0f714fc45ba8 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -17,7 +17,10 @@ use { lru::LruCache, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, solana_client::rpc_response::SlotUpdate, - solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, + solana_gossip::{ + cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, + contact_info::ContactInfo, + }, solana_ledger::{ shred::Shred, {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}, @@ -28,6 +31,7 @@ use { solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}, + solana_streamer::sendmmsg::{multi_target_send, SendPktsError}, std::{ collections::{BTreeSet, HashSet}, net::UdpSocket, @@ -215,7 +219,6 @@ fn retransmit( epoch_cache_update.stop(); stats.epoch_cache_update += epoch_cache_update.as_us(); - let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); let retransmit_shred = |shred: Shred, socket: &UdpSocket| { if should_skip_retransmit(&shred, shreds_received) { @@ -253,37 +256,29 @@ fn retransmit( }; let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); - let shred_seed = shred.seed(slot_leader, &root_bank); - let (neighbors, children) = - cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader); - let anchor_node = neighbors[0].id == my_id; + let addrs: Vec<_> = cluster_nodes + .get_retransmit_addrs(slot_leader, &shred, &root_bank, DATA_PLANE_FANOUT) + .into_iter() + .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) + .collect(); compute_turbine_peers.stop(); stats .compute_turbine_peers_total .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed); let mut retransmit_time = Measure::start("retransmit_to"); - // If the node is on the critical path (i.e. the first node in each - // neighborhood), it should send the packet to tvu socket of its - // children and also tvu_forward socket of its neighbors. Otherwise it - // should only forward to tvu_forward socket of its children. - if anchor_node { - // First neighbor is this node itself, so skip it. - ClusterInfo::retransmit_to( - &neighbors[1..], - &shred.payload, - socket, - true, // forward socket - socket_addr_space, + if let Err(SendPktsError::IoError(ioerr, num_failed)) = + multi_target_send(socket, &shred.payload, &addrs) + { + inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1); + inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); + error!( + "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", + ioerr, + num_failed, + addrs.len(), ); } - ClusterInfo::retransmit_to( - &children, - &shred.payload, - socket, - !anchor_node, // send to forward socket! - socket_addr_space, - ); retransmit_time.stop(); stats .retransmit_total diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index fb5c6665f03863..a972f14218f82c 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2634,27 +2634,27 @@ fn get_epoch_duration(bank_forks: Option<&RwLock>) -> Duration { /// 1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake -pub fn compute_retransmit_peers( +pub fn compute_retransmit_peers( fanout: usize, - node: usize, - index: &[usize], -) -> (Vec /*neighbors*/, Vec /*children*/) { + index: usize, // Local node's index withing the nodes slice. + nodes: &[T], +) -> (Vec /*neighbors*/, Vec /*children*/) { // 1st layer: fanout nodes starting at 0 // 2nd layer: fanout**2 nodes starting at fanout // 3rd layer: fanout**3 nodes starting at fanout + fanout**2 // ... // Each layer is divided into neighborhoods of fanout nodes each. - let offset = node % fanout; // Node's index within its neighborhood. - let anchor = node - offset; // First node in the neighborhood. + let offset = index % fanout; // Node's index within its neighborhood. + let anchor = index - offset; // First node in the neighborhood. let neighbors = (anchor..) .take(fanout) - .map(|i| index.get(i).copied()) + .map(|i| nodes.get(i).copied()) .while_some() .collect(); let children = ((anchor + 1) * fanout + offset..) .step_by(fanout) .take(fanout) - .map(|i| index.get(i).copied()) + .map(|i| nodes.get(i).copied()) .while_some() .collect(); (neighbors, children) diff --git a/gossip/src/weighted_shuffle.rs b/gossip/src/weighted_shuffle.rs index 25e0658bec6e45..0482396221ad47 100644 --- a/gossip/src/weighted_shuffle.rs +++ b/gossip/src/weighted_shuffle.rs @@ -135,6 +135,35 @@ where } } +// Equivalent to WeightedShuffle(rng, weights).unwrap().next(). +pub fn weighted_sample_single(rng: &mut R, cumulative_weights: &[T]) -> Option +where + T: Copy + Default + PartialOrd + SampleUniform, +{ + let zero = ::default(); + let high = cumulative_weights.last().copied().unwrap_or_default(); + #[allow(clippy::neg_cmp_op_on_partial_ord)] + if !(high > zero) { + return None; + } + let sample = ::Sampler::sample_single(zero, high, rng); + let mut lo = 0usize; + let mut hi = cumulative_weights.len() - 1; + while lo + 1 < hi { + let k = lo + (hi - lo) / 2; + if cumulative_weights[k] <= sample { + lo = k; + } else { + hi = k; + } + } + if cumulative_weights[lo] > sample { + Some(lo) + } else { + Some(hi) + } +} + /// Returns a list of indexes shuffled based on the input weights /// Note - The sum of all weights must not exceed `u64::MAX` pub fn weighted_shuffle(weights: F, seed: [u8; 32]) -> Vec @@ -288,20 +317,40 @@ mod tests { assert_eq!(best_index, 2); } + // Asserts that empty weights will return empty shuffle. + #[test] + fn test_weighted_shuffle_empty_weights() { + let weights = Vec::::new(); + let mut rng = rand::thread_rng(); + let shuffle = WeightedShuffle::new(&mut rng, &weights); + assert!(shuffle.unwrap().next().is_none()); + assert_eq!(weighted_sample_single(&mut rng, &weights), None); + } + + // Asserts that zero weights will return empty shuffle. + #[test] + fn test_weighted_shuffle_zero_weights() { + let weights = vec![0u64; 5]; + let mut rng = rand::thread_rng(); + let shuffle = WeightedShuffle::new(&mut rng, &weights); + assert!(shuffle.unwrap().next().is_none()); + assert_eq!(weighted_sample_single(&mut rng, &weights), None); + } + // Asserts that each index is selected proportional to its weight. #[test] fn test_weighted_shuffle_sanity() { let seed: Vec<_> = (1..).step_by(3).take(32).collect(); let seed: [u8; 32] = seed.try_into().unwrap(); let mut rng = ChaChaRng::from_seed(seed); - let weights = [1, 1000, 10, 100]; - let mut counts = [0; 4]; + let weights = [1, 0, 1000, 0, 0, 10, 100, 0]; + let mut counts = [0; 8]; for _ in 0..100000 { let mut shuffle = WeightedShuffle::new(&mut rng, &weights).unwrap(); counts[shuffle.next().unwrap()] += 1; let _ = shuffle.count(); // consume the rest. } - assert_eq!(counts, [101, 90113, 891, 8895]); + assert_eq!(counts, [101, 0, 90113, 0, 0, 891, 8895, 0]); } #[test] @@ -309,6 +358,13 @@ mod tests { let weights = [ 78, 70, 38, 27, 21, 0, 82, 42, 21, 77, 77, 17, 4, 50, 96, 83, 33, 16, 72, ]; + let cumulative_weights: Vec<_> = weights + .iter() + .scan(0, |acc, w| { + *acc += w; + Some(*acc) + }) + .collect(); let seed = [48u8; 32]; let mut rng = ChaChaRng::from_seed(seed); let shuffle: Vec<_> = WeightedShuffle::new(&mut rng, &weights).unwrap().collect(); @@ -316,6 +372,11 @@ mod tests { shuffle, [2, 11, 16, 0, 13, 14, 15, 10, 1, 9, 7, 6, 12, 18, 4, 17, 3, 8] ); + let mut rng = ChaChaRng::from_seed(seed); + assert_eq!( + weighted_sample_single(&mut rng, &cumulative_weights), + Some(2), + ); let seed = [37u8; 32]; let mut rng = ChaChaRng::from_seed(seed); let shuffle: Vec<_> = WeightedShuffle::new(&mut rng, &weights).unwrap().collect(); @@ -323,12 +384,24 @@ mod tests { shuffle, [17, 3, 14, 13, 6, 10, 15, 16, 9, 2, 4, 1, 0, 7, 8, 18, 11, 12] ); + let mut rng = ChaChaRng::from_seed(seed); + assert_eq!( + weighted_sample_single(&mut rng, &cumulative_weights), + Some(17), + ); } #[test] fn test_weighted_shuffle_match_slow() { let mut rng = rand::thread_rng(); let weights: Vec = repeat_with(|| rng.gen_range(0, 1000)).take(997).collect(); + let cumulative_weights: Vec<_> = weights + .iter() + .scan(0, |acc, w| { + *acc += w; + Some(*acc) + }) + .collect(); for _ in 0..10 { let mut seed = [0u8; 32]; rng.fill(&mut seed[..]); @@ -336,7 +409,12 @@ mod tests { let shuffle: Vec<_> = WeightedShuffle::new(&mut rng, &weights).unwrap().collect(); let mut rng = ChaChaRng::from_seed(seed); let shuffle_slow = weighted_shuffle_slow(&mut rng, weights.clone()); - assert_eq!(shuffle, shuffle_slow,); + assert_eq!(shuffle, shuffle_slow); + let mut rng = ChaChaRng::from_seed(seed); + assert_eq!( + weighted_sample_single(&mut rng, &cumulative_weights), + Some(shuffle[0]), + ); } } } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index fcf581660d3610..98204a92234284 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -241,6 +241,10 @@ pub mod send_to_tpu_vote_port { solana_sdk::declare_id!("C5fh68nJ7uyKAuYZg2x9sEQ5YrVf3dkW6oojNBSc3Jvo"); } +pub mod turbine_peers_shuffle { + solana_sdk::declare_id!("4VvpgRD6UsHvkXwpuQhtR5NG1G4esMaExeWuSEpsYRUa"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -296,6 +300,7 @@ lazy_static! { (optimize_epoch_boundary_updates::id(), "Optimize epoch boundary updates"), (remove_native_loader::id(), "Remove support for the native loader"), (send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"), + (turbine_peers_shuffle::id(), "turbine peers shuffle patch"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()