diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 18f74b21fd551a..076fdb6b755712 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -332,6 +332,51 @@ impl CrdsGossip { } } +// Returns active and valid cluster nodes to gossip with. +pub(crate) fn get_gossip_nodes( + rng: &mut R, + now: u64, + pubkey: &Pubkey, // This node. + // By default, should only push to or pull from gossip nodes with the same + // shred-version. Except for spy nodes (shred_version == 0u16) which can + // pull from any node. + verify_shred_version: impl Fn(/*shred_version:*/ u16) -> bool, + crds: &RwLock, + gossip_validators: Option<&HashSet>, + stakes: &HashMap, + socket_addr_space: &SocketAddrSpace, +) -> Vec { + // Exclude nodes which have not been active for this long. + const ACTIVE_TIMEOUT: Duration = Duration::from_secs(60); + let active_cutoff = now.saturating_sub(ACTIVE_TIMEOUT.as_millis() as u64); + let crds = crds.read().unwrap(); + crds.get_nodes() + .filter_map(|value| { + let node = value.value.contact_info().unwrap(); + // Exclude nodes which have not been active recently. + if value.local_timestamp < active_cutoff { + // In order to mitigate eclipse attack, for staked nodes + // continue retrying periodically. + let stake = stakes.get(&node.id).copied().unwrap_or_default(); + if stake == 0u64 || !rng.gen_ratio(1, 16) { + return None; + } + } + Some(node) + }) + .filter(|node| { + &node.id != pubkey + && verify_shred_version(node.shred_version) + && ContactInfo::is_valid_address(&node.gossip, socket_addr_space) + && match gossip_validators { + Some(nodes) => nodes.contains(&node.id), + None => true, + } + }) + .cloned() + .collect() +} + // Dedups gossip addresses, keeping only the one with the highest stake. pub(crate) fn dedup_gossip_addresses( nodes: impl IntoIterator, diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 7b2b49ea33c77c..634b94c2eb5fe3 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -54,8 +54,6 @@ pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000; // Retention period of hashes of received outdated values. const FAILED_INSERTS_RETENTION_MS: u64 = 20_000; -// Do not pull from peers which have not been updated for this long. -const PULL_ACTIVE_TIMEOUT_MS: u64 = 60_000; pub const FALSE_RATE: f64 = 0.1f64; pub const KEYS: f64 = 8f64; @@ -229,17 +227,20 @@ impl CrdsGossipPull { pings: &mut Vec<(SocketAddr, Ping)>, socket_addr_space: &SocketAddrSpace, ) -> Result>, CrdsGossipError> { + let mut rng = rand::thread_rng(); // Active and valid gossip nodes with matching shred-version. - let nodes = self.pull_options( - crds, - &self_keypair.pubkey(), - self_shred_version, + let nodes = crds_gossip::get_gossip_nodes( + &mut rng, now, + &self_keypair.pubkey(), + // Pull from nodes with the same shred version, unless this is a + // spy node which then can pull from any node. + |shred_version| self_shred_version == 0u16 || shred_version == self_shred_version, + crds, gossip_validators, stakes, socket_addr_space, ); - let mut rng = rand::thread_rng(); // Check for nodes which have responded to ping messages. let nodes = crds_gossip::maybe_ping_gossip_addresses( &mut rng, @@ -272,44 +273,6 @@ impl CrdsGossipPull { Ok(nodes.zip(filters).into_group_map()) } - fn pull_options( - &self, - crds: &RwLock, - self_id: &Pubkey, - self_shred_version: u16, - now: u64, - gossip_validators: Option<&HashSet>, - stakes: &HashMap, - socket_addr_space: &SocketAddrSpace, - ) -> Vec { - let mut rng = rand::thread_rng(); - let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS); - let crds = crds.read().unwrap(); - crds.get_nodes() - .filter_map(|value| { - let info = value.value.contact_info().unwrap(); - // Stop pulling from nodes which have not been active recently. - if value.local_timestamp < active_cutoff { - // In order to mitigate eclipse attack, for staked nodes - // continue retrying periodically. - let stake = stakes.get(&info.id).unwrap_or(&0); - if *stake == 0 || !rng.gen_ratio(1, 16) { - return None; - } - } - Some(info) - }) - .filter(|v| { - v.id != *self_id - && ContactInfo::is_valid_address(&v.gossip, socket_addr_space) - && (self_shred_version == 0 || self_shred_version == v.shred_version) - && gossip_validators - .map_or(true, |gossip_validators| gossip_validators.contains(&v.id)) - }) - .cloned() - .collect() - } - /// Process a pull request pub(crate) fn process_pull_requests(crds: &RwLock, callers: I, now: u64) where @@ -625,7 +588,6 @@ pub(crate) mod tests { crate::{ cluster_info::MAX_BLOOM_SIZE, crds_value::{CrdsData, Vote}, - socketaddr, }, itertools::Itertools, rand::{seq::SliceRandom, thread_rng, SeedableRng}, @@ -679,186 +641,6 @@ pub(crate) mod tests { } } - #[test] - fn test_new_pull_with_stakes() { - let mut crds = Crds::default(); - let mut stakes = HashMap::new(); - let node = CrdsGossipPull::default(); - let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost( - &solana_sdk::pubkey::new_rand(), - 0, - ))); - crds.insert(me.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - for i in 1..=30 { - let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo( - ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0), - )); - let id = entry.label().pubkey(); - crds.insert(entry.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - stakes.insert(id, i * 100); - } - let now = 1024; - let crds = RwLock::new(crds); - let options = node.pull_options( - &crds, - &me.label().pubkey(), - 0, - now, - None, - &stakes, - &SocketAddrSpace::Unspecified, - ); - assert!(!options.is_empty()); - } - - #[test] - fn test_no_pulls_from_different_shred_versions() { - let mut crds = Crds::default(); - let stakes = HashMap::new(); - let node = CrdsGossipPull::default(); - - let gossip = socketaddr!("127.0.0.1:1234"); - - let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 123, - gossip, - ..ContactInfo::default() - })); - let spy = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 0, - gossip, - ..ContactInfo::default() - })); - let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 123, - gossip, - ..ContactInfo::default() - })); - let node_456 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 456, - gossip, - ..ContactInfo::default() - })); - - crds.insert(me.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(spy.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(node_456.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - let crds = RwLock::new(crds); - - // shred version 123 should ignore nodes with versions 0 and 456 - let options = node - .pull_options( - &crds, - &me.label().pubkey(), - 123, - 0, - None, - &stakes, - &SocketAddrSpace::Unspecified, - ) - .iter() - .map(|peer| peer.id) - .collect::>(); - assert_eq!(options.len(), 1); - assert!(!options.contains(&spy.pubkey())); - assert!(options.contains(&node_123.pubkey())); - - // spy nodes will see all - let options = node - .pull_options( - &crds, - &spy.label().pubkey(), - 0, - 0, - None, - &stakes, - &SocketAddrSpace::Unspecified, - ) - .iter() - .map(|peer| peer.id) - .collect::>(); - assert_eq!(options.len(), 3); - assert!(options.contains(&me.pubkey())); - assert!(options.contains(&node_123.pubkey())); - assert!(options.contains(&node_456.pubkey())); - } - - #[test] - fn test_pulls_only_from_allowed() { - let mut crds = Crds::default(); - let stakes = HashMap::new(); - let node = CrdsGossipPull::default(); - let gossip = socketaddr!("127.0.0.1:1234"); - - let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - gossip, - ..ContactInfo::default() - })); - let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - gossip, - ..ContactInfo::default() - })); - - crds.insert(me.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - let crds = RwLock::new(crds); - - // Empty gossip_validators -- will pull from nobody - let mut gossip_validators = HashSet::new(); - let options = node.pull_options( - &crds, - &me.label().pubkey(), - 0, - 0, - Some(&gossip_validators), - &stakes, - &SocketAddrSpace::Unspecified, - ); - assert!(options.is_empty()); - - // Unknown pubkey in gossip_validators -- will pull from nobody - gossip_validators.insert(solana_sdk::pubkey::new_rand()); - let options = node.pull_options( - &crds, - &me.label().pubkey(), - 0, - 0, - Some(&gossip_validators), - &stakes, - &SocketAddrSpace::Unspecified, - ); - assert!(options.is_empty()); - - // node_123 pubkey in gossip_validators -- will pull from it - gossip_validators.insert(node_123.pubkey()); - let options = node.pull_options( - &crds, - &me.label().pubkey(), - 0, - 0, - Some(&gossip_validators), - &stakes, - &SocketAddrSpace::Unspecified, - ); - assert_eq!(options.len(), 1); - assert_eq!(options[0].id, node_123.pubkey()); - } - #[test] fn test_crds_filter_set_add() { let mut rng = thread_rng(); diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index c310b1e5244259..807cba5ef8717d 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -17,14 +17,12 @@ use { crds::{Crds, CrdsError, Cursor, GossipRoute}, crds_gossip, crds_value::CrdsValue, - legacy_contact_info::LegacyContactInfo as ContactInfo, ping_pong::PingCache, push_active_set::PushActiveSet, received_cache::ReceivedCache, }, bincode::serialized_size, itertools::Itertools, - rand::Rng, solana_sdk::{ packet::PACKET_DATA_SIZE, pubkey::Pubkey, @@ -52,8 +50,6 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3; -// Do not push to peers which have not been updated for this long. -const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000; pub struct CrdsGossipPush { /// Max bytes per message @@ -249,12 +245,15 @@ impl CrdsGossipPush { ) { let mut rng = rand::thread_rng(); // Active and valid gossip nodes with matching shred-version. - let nodes = self.push_options( - crds, + let nodes = crds_gossip::get_gossip_nodes( + &mut rng, + timestamp(), // now &self_keypair.pubkey(), - self_shred_version, - stakes, + // Only push to nodes with the same shred version. + |shred_version| shred_version == self_shred_version, + crds, gossip_validators, + stakes, socket_addr_space, ); // Check for nodes which have responded to ping messages. @@ -276,45 +275,6 @@ impl CrdsGossipPush { active_set.rotate(&mut rng, self.push_fanout * 3, network_size, &nodes, stakes) } - fn push_options( - &self, - crds: &RwLock, - self_id: &Pubkey, - self_shred_version: u16, - stakes: &HashMap, - gossip_validators: Option<&HashSet>, - socket_addr_space: &SocketAddrSpace, - ) -> Vec { - let now = timestamp(); - let mut rng = rand::thread_rng(); - let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS); - let crds = crds.read().unwrap(); - crds.get_nodes() - .filter_map(|value| { - let info = value.value.contact_info().unwrap(); - // Stop pushing to nodes which have not been active recently. - if value.local_timestamp < active_cutoff { - // In order to mitigate eclipse attack, for staked nodes - // continue retrying periodically. - let stake = stakes.get(&info.id).unwrap_or(&0); - if *stake == 0 || !rng.gen_ratio(1, 16) { - return None; - } - } - Some(info) - }) - .filter(|info| { - info.id != *self_id - && ContactInfo::is_valid_address(&info.gossip, socket_addr_space) - && self_shred_version == info.shred_version - && gossip_validators.map_or(true, |gossip_validators| { - gossip_validators.contains(&info.id) - }) - }) - .cloned() - .collect() - } - // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { let active_set = self.active_set.read().unwrap().mock_clone(); @@ -336,7 +296,7 @@ impl CrdsGossipPush { mod tests { use { super::*, - crate::{crds_value::CrdsData, socketaddr}, + crate::{crds_value::CrdsData, legacy_contact_info::LegacyContactInfo as ContactInfo}, std::time::{Duration, Instant}, }; @@ -434,172 +394,6 @@ mod tests { ); } - #[test] - fn test_active_set_refresh_with_bank() { - solana_logger::setup(); - let time = timestamp() - 1024; //make sure there's at least a 1 second delay - let mut crds = Crds::default(); - let push = CrdsGossipPush::default(); - let mut stakes = HashMap::new(); - for i in 1..=100 { - let peer = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo( - ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), time), - )); - let id = peer.label().pubkey(); - crds.insert(peer.clone(), time, GossipRoute::LocalMessage) - .unwrap(); - stakes.insert(id, i * 100); - } - let crds = RwLock::new(crds); - let options = push.push_options( - &crds, - &Pubkey::default(), - 0, - &stakes, - None, - &SocketAddrSpace::Unspecified, - ); - assert!(!options.is_empty()); - } - - #[test] - fn test_no_pushes_to_from_different_shred_versions() { - let now = timestamp(); - let mut crds = Crds::default(); - let stakes = HashMap::new(); - let node = CrdsGossipPush::default(); - - let gossip = socketaddr!("127.0.0.1:1234"); - - let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 123, - gossip, - ..ContactInfo::default() - })); - let spy = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 0, - gossip, - ..ContactInfo::default() - })); - let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 123, - gossip, - ..ContactInfo::default() - })); - let node_456 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - shred_version: 456, - gossip, - ..ContactInfo::default() - })); - - crds.insert(me.clone(), now, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(spy.clone(), now, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(node_123.clone(), now, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(node_456, now, GossipRoute::LocalMessage) - .unwrap(); - let crds = RwLock::new(crds); - - // shred version 123 should ignore nodes with versions 0 and 456 - let options = node - .push_options( - &crds, - &me.label().pubkey(), - 123, - &stakes, - None, - &SocketAddrSpace::Unspecified, - ) - .iter() - .map(|node| node.id) - .collect::>(); - assert_eq!(options.len(), 1); - assert!(!options.contains(&spy.pubkey())); - assert!(options.contains(&node_123.pubkey())); - - // spy nodes should not push to people on different shred versions - let options = node.push_options( - &crds, - &spy.label().pubkey(), - 0, - &stakes, - None, - &SocketAddrSpace::Unspecified, - ); - assert!(options.is_empty()); - } - - #[test] - fn test_pushes_only_to_allowed() { - let now = timestamp(); - let mut crds = Crds::default(); - let stakes = HashMap::new(); - let node = CrdsGossipPush::default(); - let gossip = socketaddr!("127.0.0.1:1234"); - - let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - gossip, - ..ContactInfo::default() - })); - let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo { - id: solana_sdk::pubkey::new_rand(), - gossip, - ..ContactInfo::default() - })); - - crds.insert(me.clone(), 0, GossipRoute::LocalMessage) - .unwrap(); - crds.insert(node_123.clone(), now, GossipRoute::LocalMessage) - .unwrap(); - let crds = RwLock::new(crds); - - // Unknown pubkey in gossip_validators -- will push to nobody - let mut gossip_validators = HashSet::new(); - let options = node.push_options( - &crds, - &me.label().pubkey(), - 0, - &stakes, - Some(&gossip_validators), - &SocketAddrSpace::Unspecified, - ); - - assert!(options.is_empty()); - - // Unknown pubkey in gossip_validators -- will push to nobody - gossip_validators.insert(solana_sdk::pubkey::new_rand()); - let options = node.push_options( - &crds, - &me.label().pubkey(), - 0, - &stakes, - Some(&gossip_validators), - &SocketAddrSpace::Unspecified, - ); - assert!(options.is_empty()); - - // node_123 pubkey in gossip_validators -- will push to it - gossip_validators.insert(node_123.pubkey()); - let options = node.push_options( - &crds, - &me.label().pubkey(), - 0, - &stakes, - Some(&gossip_validators), - &SocketAddrSpace::Unspecified, - ); - - assert_eq!(options.len(), 1); - assert_eq!(options[0].id, node_123.pubkey()); - } - #[test] fn test_new_push_messages() { let now = timestamp();