Skip to content

Commit

Permalink
factors out common gossip {push,pull}_options code (#29737)
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored Jan 18, 2023
1 parent aef8692 commit 9f2910e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 440 deletions.
45 changes: 45 additions & 0 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,51 @@ impl CrdsGossip {
}
}

/// Returns active and valid cluster nodes to gossip with.
///
/// A node from the crds table is included only if all of the following hold:
/// * its `local_timestamp` is not older than `now` minus `ACTIVE_TIMEOUT`;
///   a stale node is nevertheless retained with probability 1/16 when it has
///   non-zero stake,
/// * its id is different from `pubkey`,
/// * `verify_shred_version` accepts its shred-version,
/// * its gossip address passes `ContactInfo::is_valid_address`,
/// * it is a member of `gossip_validators`, when that set is provided.
pub(crate) fn get_gossip_nodes<R: Rng>(
    rng: &mut R,
    now: u64,
    pubkey: &Pubkey, // This node.
    // By default, should only push to or pull from gossip nodes with the same
    // shred-version. Except for spy nodes (shred_version == 0u16) which can
    // pull from any node.
    verify_shred_version: impl Fn(/*shred_version:*/ u16) -> bool,
    crds: &RwLock<Crds>,
    gossip_validators: Option<&HashSet<Pubkey>>,
    stakes: &HashMap<Pubkey, u64>,
    socket_addr_space: &SocketAddrSpace,
) -> Vec<ContactInfo> {
    // Nodes not updated within this window are considered inactive.
    const ACTIVE_TIMEOUT: Duration = Duration::from_secs(60);
    let active_cutoff = now.saturating_sub(ACTIVE_TIMEOUT.as_millis() as u64);
    let table = crds.read().unwrap();
    let mut peers = Vec::new();
    for entry in table.get_nodes() {
        let peer = entry.value.contact_info().unwrap();
        // The staleness check runs first so that the random draw happens for
        // every stale staked node, regardless of the checks further below.
        if entry.local_timestamp < active_cutoff {
            // In order to mitigate eclipse attack, staked nodes are still
            // retried periodically (one time out of 16 on average).
            let stake = stakes.get(&peer.id).copied().unwrap_or(0u64);
            if stake == 0u64 || !rng.gen_ratio(1, 16) {
                continue;
            }
        }
        // Short-circuit order: self, shred-version, address, allow-list.
        if &peer.id == pubkey
            || !verify_shred_version(peer.shred_version)
            || !ContactInfo::is_valid_address(&peer.gossip, socket_addr_space)
            || !gossip_validators.map_or(true, |allowed| allowed.contains(&peer.id))
        {
            continue;
        }
        peers.push(peer.clone());
    }
    peers
}

// Dedups gossip addresses, keeping only the one with the highest stake.
pub(crate) fn dedup_gossip_addresses(
nodes: impl IntoIterator<Item = ContactInfo>,
Expand Down
234 changes: 8 additions & 226 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
// Retention period of hashes of received outdated values.
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
// Do not pull from peers which have not been updated for this long.
const PULL_ACTIVE_TIMEOUT_MS: u64 = 60_000;
pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64;

Expand Down Expand Up @@ -229,17 +227,20 @@ impl CrdsGossipPull {
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) -> Result<HashMap<ContactInfo, Vec<CrdsFilter>>, CrdsGossipError> {
let mut rng = rand::thread_rng();
// Active and valid gossip nodes with matching shred-version.
let nodes = self.pull_options(
crds,
&self_keypair.pubkey(),
self_shred_version,
let nodes = crds_gossip::get_gossip_nodes(
&mut rng,
now,
&self_keypair.pubkey(),
// Pull from nodes with the same shred version, unless this is a
// spy node which then can pull from any node.
|shred_version| self_shred_version == 0u16 || shred_version == self_shred_version,
crds,
gossip_validators,
stakes,
socket_addr_space,
);
let mut rng = rand::thread_rng();
// Check for nodes which have responded to ping messages.
let nodes = crds_gossip::maybe_ping_gossip_addresses(
&mut rng,
Expand Down Expand Up @@ -272,44 +273,6 @@ impl CrdsGossipPull {
Ok(nodes.zip(filters).into_group_map())
}

/// Lists the nodes in the crds table which this node may pull from.
///
/// A node is included only if all of the following hold:
/// * its `local_timestamp` is not older than `now` minus
///   `PULL_ACTIVE_TIMEOUT_MS`; a stale node is nevertheless retained with
///   probability 1/16 when it has non-zero stake,
/// * its id is different from `self_id`,
/// * its gossip address passes `ContactInfo::is_valid_address`,
/// * `self_shred_version` is zero or equals the node's shred-version,
/// * it is a member of `gossip_validators`, when that set is provided.
fn pull_options(
    &self,
    crds: &RwLock<Crds>,
    self_id: &Pubkey,
    self_shred_version: u16,
    now: u64,
    gossip_validators: Option<&HashSet<Pubkey>>,
    stakes: &HashMap<Pubkey, u64>,
    socket_addr_space: &SocketAddrSpace,
) -> Vec<ContactInfo> {
    let mut rng = rand::thread_rng();
    let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS);
    let table = crds.read().unwrap();
    let mut peers = Vec::new();
    for entry in table.get_nodes() {
        let peer = entry.value.contact_info().unwrap();
        // Stop pulling from nodes which have not been active recently.
        if entry.local_timestamp < active_cutoff {
            // In order to mitigate eclipse attack, staked nodes are still
            // retried periodically (one time out of 16 on average).
            let stake = stakes.get(&peer.id).copied().unwrap_or(0);
            if stake == 0 || !rng.gen_ratio(1, 16) {
                continue;
            }
        }
        let shred_version_matches =
            self_shred_version == 0 || self_shred_version == peer.shred_version;
        if peer.id == *self_id
            || !ContactInfo::is_valid_address(&peer.gossip, socket_addr_space)
            || !shred_version_matches
        {
            continue;
        }
        // When an allow-list is configured, only its members are eligible.
        if let Some(allowed) = gossip_validators {
            if !allowed.contains(&peer.id) {
                continue;
            }
        }
        peers.push(peer.clone());
    }
    peers
}

/// Process a pull request
pub(crate) fn process_pull_requests<I>(crds: &RwLock<Crds>, callers: I, now: u64)
where
Expand Down Expand Up @@ -625,7 +588,6 @@ pub(crate) mod tests {
crate::{
cluster_info::MAX_BLOOM_SIZE,
crds_value::{CrdsData, Vote},
socketaddr,
},
itertools::Itertools,
rand::{seq::SliceRandom, thread_rng, SeedableRng},
Expand Down Expand Up @@ -679,186 +641,6 @@ pub(crate) mod tests {
}
}

#[test]
fn test_new_pull_with_stakes() {
    // Populates the table with this node plus 30 staked peers, all inserted
    // at timestamp zero, and checks that some pull candidates are returned.
    let new_localhost_node = || {
        CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
            &solana_sdk::pubkey::new_rand(),
            0,
        )))
    };
    let me = new_localhost_node();
    let self_id = me.label().pubkey();
    let mut crds = Crds::default();
    crds.insert(me, 0, GossipRoute::LocalMessage).unwrap();
    let mut stakes = HashMap::new();
    for i in 1..=30 {
        let entry = new_localhost_node();
        stakes.insert(entry.label().pubkey(), i * 100);
        crds.insert(entry, 0, GossipRoute::LocalMessage).unwrap();
    }
    let crds = RwLock::new(crds);
    let node = CrdsGossipPull::default();
    let now = 1024;
    let options = node.pull_options(
        &crds,
        &self_id,
        0, // self_shred_version
        now,
        None, // gossip_validators
        &stakes,
        &SocketAddrSpace::Unspecified,
    );
    assert!(!options.is_empty());
}

#[test]
fn test_no_pulls_from_different_shred_versions() {
    let gossip = socketaddr!("127.0.0.1:1234");
    // All nodes share the same gossip address; they differ only in their
    // random identity and in the given shred-version.
    let new_node = |shred_version: u16| {
        CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
            id: solana_sdk::pubkey::new_rand(),
            shred_version,
            gossip,
            ..ContactInfo::default()
        }))
    };
    let me = new_node(123);
    let spy = new_node(0);
    let node_123 = new_node(123);
    let node_456 = new_node(456);

    let mut crds = Crds::default();
    for value in [&me, &spy, &node_123, &node_456] {
        crds.insert(value.clone(), 0, GossipRoute::LocalMessage)
            .unwrap();
    }
    let crds = RwLock::new(crds);
    let stakes = HashMap::new();
    let node = CrdsGossipPull::default();
    // Ids of the pull candidates as seen by the node with the given identity
    // and shred-version.
    let pull_ids = |self_id: solana_sdk::pubkey::Pubkey, self_shred_version: u16| {
        node.pull_options(
            &crds,
            &self_id,
            self_shred_version,
            0,    // now
            None, // gossip_validators
            &stakes,
            &SocketAddrSpace::Unspecified,
        )
        .iter()
        .map(|peer| peer.id)
        .collect::<Vec<_>>()
    };

    // shred version 123 should ignore nodes with versions 0 and 456
    let options = pull_ids(me.label().pubkey(), 123);
    assert_eq!(options.len(), 1);
    assert!(!options.contains(&spy.pubkey()));
    assert!(options.contains(&node_123.pubkey()));

    // spy nodes will see all
    let options = pull_ids(spy.label().pubkey(), 0);
    assert_eq!(options.len(), 3);
    assert!(options.contains(&me.pubkey()));
    assert!(options.contains(&node_123.pubkey()));
    assert!(options.contains(&node_456.pubkey()));
}

#[test]
fn test_pulls_only_from_allowed() {
    let gossip = socketaddr!("127.0.0.1:1234");
    // Both nodes share the same gossip address and differ only in identity.
    let new_node = || {
        CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
            id: solana_sdk::pubkey::new_rand(),
            gossip,
            ..ContactInfo::default()
        }))
    };
    let me = new_node();
    let node_123 = new_node();
    let self_id = me.label().pubkey();

    let mut crds = Crds::default();
    for value in [&me, &node_123] {
        crds.insert(value.clone(), 0, GossipRoute::LocalMessage)
            .unwrap();
    }
    let crds = RwLock::new(crds);
    let stakes = HashMap::new();
    let node = CrdsGossipPull::default();
    // Pull candidates of this node when restricted to the given allow-list.
    let pull_from = |allowed: &HashSet<_>| {
        node.pull_options(
            &crds,
            &self_id,
            0, // self_shred_version
            0, // now
            Some(allowed),
            &stakes,
            &SocketAddrSpace::Unspecified,
        )
    };

    // Empty gossip_validators -- will pull from nobody
    let mut gossip_validators = HashSet::new();
    assert!(pull_from(&gossip_validators).is_empty());

    // Unknown pubkey in gossip_validators -- will pull from nobody
    gossip_validators.insert(solana_sdk::pubkey::new_rand());
    assert!(pull_from(&gossip_validators).is_empty());

    // node_123 pubkey in gossip_validators -- will pull from it
    gossip_validators.insert(node_123.pubkey());
    let options = pull_from(&gossip_validators);
    assert_eq!(options.len(), 1);
    assert_eq!(options[0].id, node_123.pubkey());
}

#[test]
fn test_crds_filter_set_add() {
let mut rng = thread_rng();
Expand Down
Loading

0 comments on commit 9f2910e

Please sign in to comment.