diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index 433d605ff1f831..59048013e82655 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -38,6 +38,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
 use solana_sdk::timing::{duration_as_ms, timestamp};
 use solana_sdk::transaction::Transaction;
 use std::cmp::min;
+use std::fmt;
 use std::io;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -66,7 +67,7 @@ pub enum ClusterInfoError {
     BadNodeInfo,
     BadGossipAddress,
 }
-
+#[derive(Clone)]
 pub struct ClusterInfo {
     /// The network
     pub gossip: CrdsGossip,
@@ -88,6 +89,16 @@ pub struct Locality {
     pub child_layer_peers: Vec<usize>,
 }
 
+impl fmt::Debug for Locality {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "Locality {{ neighbor_bounds: {:?}, layer_ix: {:?}, child_layer_bounds: {:?}, child_layer_peers: {:?} }}",
+            self.neighbor_bounds, self.layer_ix, self.child_layer_bounds, self.child_layer_peers
+        )
+    }
+}
+
 #[derive(Debug, Deserialize, Serialize)]
 pub struct PruneData {
     /// Pubkey of the node that sent this prune data
@@ -358,6 +369,7 @@ impl ClusterInfo {
             .map(|c| (bank.get_balance(&c.id), c.clone()))
             .collect();
         peers_with_stakes.sort_unstable();
+        peers_with_stakes.reverse();
         peers_with_stakes
     }
 
@@ -655,11 +667,14 @@ impl ClusterInfo {
             .collect()
     }
 
-    fn create_broadcast_orders<'a>(
+    pub fn create_broadcast_orders<'a, T>(
         contains_last_tick: bool,
-        blobs: &[SharedBlob],
+        blobs: &[T],
         broadcast_table: &'a [NodeInfo],
-    ) -> Vec<(SharedBlob, Vec<&'a NodeInfo>)> {
+    ) -> Vec<(T, Vec<&'a NodeInfo>)>
+    where
+        T: Clone,
+    {
         // enumerate all the blobs in the window, those are the indices
         // transmit them to nodes, starting from a different node.
         if blobs.is_empty() {
diff --git a/src/crds.rs b/src/crds.rs
index 76563ce60cb582..fbb896600df812 100644
--- a/src/crds.rs
+++ b/src/crds.rs
@@ -31,6 +31,7 @@ use solana_sdk::hash::{hash, Hash};
 use solana_sdk::pubkey::Pubkey;
 use std::cmp;
 
+#[derive(Clone)]
 pub struct Crds {
     /// Stores the map of labels and values
     pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
@@ -44,7 +45,7 @@ pub enum CrdsError {
 /// This structure stores some local metadata associated with the CrdsValue
 /// The implementation of PartialOrd ensures that the "highest" version is always picked to be
 /// stored in the Crds
-#[derive(PartialEq, Debug)]
+#[derive(PartialEq, Debug, Clone)]
 pub struct VersionedCrdsValue {
     pub value: CrdsValue,
     /// local time when inserted
diff --git a/src/crds_gossip.rs b/src/crds_gossip.rs
index 0c78b004bcd041..bbac4984171195 100644
--- a/src/crds_gossip.rs
+++ b/src/crds_gossip.rs
@@ -15,6 +15,7 @@ use solana_sdk::pubkey::Pubkey;
 ///The min size for bloom filters
 pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
 
+#[derive(Clone)]
 pub struct CrdsGossip {
     pub crds: Crds,
     pub id: Pubkey,
diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs
index 918cc81a119475..70b8a5eb70891b 100644
--- a/src/crds_gossip_pull.rs
+++ b/src/crds_gossip_pull.rs
@@ -26,6 +26,7 @@ use std::collections::VecDeque;
 
 pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
 
+#[derive(Clone)]
 pub struct CrdsGossipPull {
     /// timestamp of last request
     pub pull_request_time: HashMap<Pubkey, u64>,
diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs
index 60a5963fa97390..b8d7420389f6f7 100644
--- a/src/crds_gossip_push.rs
+++ b/src/crds_gossip_push.rs
@@ -29,6 +29,7 @@ pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
 pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
 pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
 
+#[derive(Clone)]
 pub struct CrdsGossipPush {
     /// max bytes per message
     pub max_bytes: usize,
diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs
index da8f4bb1a82b48..4ce1a8ad3faf14 100644
--- a/src/retransmit_stage.rs
+++ b/src/retransmit_stage.rs
@@ -2,13 +2,16 @@
 
 use crate::bank::Bank;
 use crate::blocktree::Blocktree;
-use crate::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE};
+use crate::cluster_info::{
+    ClusterInfo, NodeInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
+};
 use crate::counter::Counter;
 use crate::leader_scheduler::LeaderScheduler;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::streamer::BlobReceiver;
 use crate::window_service::WindowService;
+use core::cmp;
 use log::Level;
 use solana_metrics::{influxdb, submit};
 use std::net::UdpSocket;
@@ -19,44 +22,28 @@ use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Duration;
 
-fn retransmit(
+/// Avalanche logic
+/// 1 - For the current node, find out if it is in layer 1.
+///   1.1 - If yes, broadcast to all layer 1 nodes:
+///         using the layer 1 index, broadcast to all layer 2 nodes, assuming the neighborhood size is known.
+///   1.2 - If no, work out which layer the node is in and who its neighbors are, and only broadcast to them:
+///         also check whether there are nodes in lower layers and repeat the layer-1-to-layer-2 logic.
+fn compute_retransmit_peers(
     bank: &Arc<Bank>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
-    r: &BlobReceiver,
-    sock: &UdpSocket,
-) -> Result<()> {
-    let timer = Duration::new(1, 0);
-    let mut dq = r.recv_timeout(timer)?;
-    while let Ok(mut nq) = r.try_recv() {
-        dq.append(&mut nq);
-    }
-
-    submit(
-        influxdb::Point::new("retransmit-stage")
-            .add_field("count", influxdb::Value::Integer(dq.len() as i64))
-            .to_owned(),
-    );
-
-    // TODO layer 2 logic here
-    // 1 - find out if I am in layer 1 first
-    // 1.1 - If yes, then broadcast to all layer 1 nodes
-    // 1 - using my layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
-    // 1.2 - If no, then figure out what layer I am in and who my neighbors are and only broadcast to them
-    // 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
+    fanout: usize,
+    hood_size: usize,
+    grow: bool,
+) -> Vec<NodeInfo> {
     let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
     let my_id = cluster_info.read().unwrap().id();
     //calc num_layers and num_neighborhoods using the total number of nodes
-    let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(
-        peers.len(),
-        DATA_PLANE_FANOUT,
-        NEIGHBORHOOD_SIZE,
-        GROW_LAYER_CAPACITY,
-    );
+    let (num_layers, layer_indices) =
+        ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow);
+
     if num_layers <= 1 {
         /* single layer data plane */
-        for b in &mut dq {
-            ClusterInfo::retransmit(&cluster_info, b, sock)?;
-        }
+        peers
     } else {
         //find my index (my ix is the same as the first node with smaller stake)
         let my_index = peers
@@ -65,20 +52,48 @@ fn retransmit(
         //find my layer
         let locality = ClusterInfo::localize(
             &layer_indices,
-            NEIGHBORHOOD_SIZE,
+            hood_size,
             my_index.unwrap_or(peers.len() - 1),
         );
-        let mut retransmit_peers =
-            peers[locality.neighbor_bounds.0..locality.neighbor_bounds.1].to_vec();
-        locality.child_layer_peers.iter().for_each(|&ix| {
+        let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
+        let mut retransmit_peers = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
+        for ix in locality.child_layer_peers {
             if let Some(peer) = peers.get(ix) {
                 retransmit_peers.push(peer.clone());
+                continue;
             }
-        });
-
-        for b in &mut dq {
-            ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?;
+            break;
         }
+        retransmit_peers
+    }
+}
+
+fn retransmit(
+    bank: &Arc<Bank>,
+    cluster_info: &Arc<RwLock<ClusterInfo>>,
+    r: &BlobReceiver,
+    sock: &UdpSocket,
+) -> Result<()> {
+    let timer = Duration::new(1, 0);
+    let mut dq = r.recv_timeout(timer)?;
+    while let Ok(mut nq) = r.try_recv() {
+        dq.append(&mut nq);
+    }
+
+    submit(
+        influxdb::Point::new("retransmit-stage")
+            .add_field("count", influxdb::Value::Integer(dq.len() as i64))
+            .to_owned(),
+    );
+    let retransmit_peers = compute_retransmit_peers(
+        &bank,
+        cluster_info,
+        DATA_PLANE_FANOUT,
+        NEIGHBORHOOD_SIZE,
+        GROW_LAYER_CAPACITY,
+    );
+    for b in &mut dq {
+        ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?;
     }
     Ok(())
 }
@@ -171,3 +186,191 @@ impl Service for RetransmitStage {
         Ok(())
     }
 }
+
+// It is recommended not to run these tests in parallel (they are resource-heavy and want all the compute)
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::cluster_info::ClusterInfo;
+    use crate::contact_info::ContactInfo;
+    use crate::genesis_block::GenesisBlock;
+    use rayon::iter::{IntoParallelIterator, ParallelIterator};
+    use rayon::prelude::*;
+    use solana_sdk::pubkey::Pubkey;
+    use solana_sdk::signature::{Keypair, KeypairUtil};
+    use std::collections::{HashMap, HashSet};
+    use std::sync::mpsc::TryRecvError;
+    use std::sync::mpsc::{Receiver, Sender};
+    use std::sync::Mutex;
+    use std::time::Instant;
+
+    type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<i32>)>;
+
+    fn num_threads() -> usize {
+        sys_info::cpu_num().unwrap_or(10) as usize
+    }
+
+    /// Search the batches for the node with the given id and insert the blob into its set
+    fn find_insert_blob(id: &Pubkey, blob: i32, batches: &mut [Nodes]) {
+        batches.par_iter_mut().for_each(|batch| {
+            if batch.contains_key(id) {
+                let _ = batch.get_mut(id).unwrap().0.insert(blob);
+            }
+        });
+    }
+
+    fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
+        let num_threads = num_threads();
+        // set timeout to 5 minutes
+        let timeout = 60 * 5;
+
+        // total tokens to distribute: sum of 1..=num_nodes
+        let required_balance = num_nodes * (num_nodes + 1) / 2;
+
+        // create a genesis block
+        let (genesis_block, mint_keypair) = GenesisBlock::new(required_balance + 2);
+
+        // describe the leader
+        let leader_info = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
+        let mut cluster_info = ClusterInfo::new(leader_info.clone());
+        cluster_info.set_leader(leader_info.id);
+
+        // create a bank
+        let bank = Arc::new(Bank::new(&genesis_block));
+
+        // setup accounts for all nodes (leader has 0 bal)
+        let (s, r) = channel();
+        let senders: Arc<Mutex<HashMap<Pubkey, Sender<i32>>>> =
+            Arc::new(Mutex::new(HashMap::new()));
+        senders.lock().unwrap().insert(leader_info.id, s);
+        let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
+        (0..num_threads).for_each(|_| batches.push(HashMap::new()));
+        batches
+            .get_mut(0)
+            .unwrap()
+            .insert(leader_info.id, (HashSet::new(), r));
+        let range: Vec<_> = (1..=num_nodes).collect();
+        let chunk_size = (num_nodes as usize + num_threads - 1) / num_threads;
+        range.chunks(chunk_size).for_each(|chunk| {
+            chunk.into_iter().for_each(|i| {
+                //distribute neighbors across threads to maximize parallel compute
+                let batch_ix = *i as usize % batches.len();
+                let node = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
+                bank.transfer(*i, &mint_keypair, node.id, bank.last_id())
+                    .unwrap();
+                cluster_info.insert_info(node.clone());
+                let (s, r) = channel();
+                batches
+                    .get_mut(batch_ix)
+                    .unwrap()
+                    .insert(node.id, (HashSet::new(), r));
+                senders.lock().unwrap().insert(node.id, s);
+            })
+        });
+        let c_info = cluster_info.clone();
+
+        // check that all tokens have been exhausted
+        assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
+
+        // create some "blobs"
+        let blobs: Vec<_> = (0..100).into_par_iter().map(|i| i as i32).collect();
+
+        // pretend to broadcast from leader - cluster_info::create_broadcast_orders
+        let mut broadcast_table = cluster_info.sorted_tvu_peers(&bank);
+        broadcast_table.truncate(fanout);
+        let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
+
+        // send blobs to layer 1 nodes
+        orders.iter().for_each(|(b, vc)| {
+            vc.iter().for_each(|c| {
+                find_insert_blob(&c.id, *b, &mut batches);
+            })
+        });
+        assert!(!batches.is_empty());
+
+        // start avalanche simulation
+        let now = Instant::now();
+        batches.par_iter_mut().for_each(|batch| {
+            let mut cluster = c_info.clone();
+            let batch_size = batch.len();
+            let mut remaining = batch_size;
+            let senders: HashMap<_, _> = senders.lock().unwrap().clone();
+            let mut mapped_peers: HashMap<Pubkey, Vec<Sender<i32>>> = HashMap::new();
+            while remaining > 0 {
+                for (id, (recv, r)) in batch.iter_mut() {
+                    assert!(now.elapsed().as_secs() < timeout, "Timed out");
+                    cluster.gossip.set_self(*id);
+                    if !mapped_peers.contains_key(id) {
+                        let peers = compute_retransmit_peers(
+                            &bank,
+                            &Arc::new(RwLock::new(cluster.clone())),
+                            fanout,
+                            hood_size,
+                            GROW_LAYER_CAPACITY,
+                        );
+
+                        let vec_peers: Vec<_> = peers
+                            .iter()
+                            .map(|p| {
+                                let s = senders.get(&p.id).unwrap();
+                                recv.iter().for_each(|i| {
+                                    let _ = s.send(*i);
+                                });
+                                s.clone()
+                            })
+                            .collect();
+                        mapped_peers.insert(*id, vec_peers);
+                    }
+                    let vec_peers = mapped_peers.get(id).unwrap().to_vec();
+
+                    //send and recv
+                    if recv.len() < blobs.len() {
+                        loop {
+                            match r.try_recv() {
+                                Ok(i) => {
+                                    if recv.insert(i) {
+                                        vec_peers.iter().for_each(|s| {
+                                            let _ = s.send(i);
+                                        });
+                                        if recv.len() == blobs.len() {
+                                            remaining -= 1;
+                                            break;
+                                        }
+                                    }
+                                }
+                                Err(TryRecvError::Disconnected) => break,
+                                Err(TryRecvError::Empty) => break,
+                            };
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    // Run with a single layer
+    #[test]
+    fn test_retransmit_small() {
+        run_simulation(
+            DATA_PLANE_FANOUT as u64,
+            DATA_PLANE_FANOUT,
+            NEIGHBORHOOD_SIZE,
+        );
+    }
+
+    // Make sure at least 2 layers are used
+    #[test]
+    fn test_retransmit_medium() {
+        let num_nodes = DATA_PLANE_FANOUT as u64 * 10;
+        run_simulation(num_nodes, DATA_PLANE_FANOUT, NEIGHBORHOOD_SIZE);
+    }
+
+    // Scale down the fanout and neighborhood size to make sure at least 3 layers are used
+    #[test]
+    fn test_retransmit_large() {
+        let num_nodes = DATA_PLANE_FANOUT as u64 * 20;
+        run_simulation(num_nodes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
+    }
+
+    // TODO: add tests with network failures
+}
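For review context, here is a minimal standalone sketch (not part of the patch) of the peer-selection behavior the new `compute_retransmit_peers` implements for a multi-layer data plane: take the node's own neighborhood slice, clamped with `cmp::min` so a short peer list cannot panic, then append the referenced child-layer peers until the first out-of-range index. The `Locality` below only mirrors the two fields the function reads, and the bounds in `main` are made up for illustration; they are not values produced by `describe_data_plane` or `localize`.

```rust
use std::cmp;

// Mirrors only the two Locality fields read by compute_retransmit_peers.
struct Locality {
    neighbor_bounds: (usize, usize),
    child_layer_peers: Vec<usize>,
}

fn retransmit_set<T: Clone>(peers: &[T], locality: &Locality) -> Vec<T> {
    // Clamp the neighborhood upper bound so a short peer list never panics.
    let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
    let mut set = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
    // Append referenced child-layer peers; stop at the first out-of-range index,
    // mirroring the continue/break pattern in the patch.
    for &ix in &locality.child_layer_peers {
        match peers.get(ix) {
            Some(peer) => set.push(peer.clone()),
            None => break,
        }
    }
    set
}

fn main() {
    let peers: Vec<u32> = (0..20).collect();
    let locality = Locality {
        neighbor_bounds: (6, 12),            // made-up neighborhood bounds
        child_layer_peers: vec![12, 18, 24], // 24 is out of range, so iteration stops there
    };
    // Prints [6, 7, 8, 9, 10, 11, 12, 18]
    println!("{:?}", retransmit_set(&peers, &locality));
}
```

The same clamping and early-exit behavior is what the simulation above exercises end to end once blobs are seeded into the layer 1 nodes.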