Skip to content

Commit

Permalink
Weight push peers by how long we haven't pushed to them (#12620)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored Oct 2, 2020
1 parent adeb06e commit 71c469c
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions core/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;

// 10 minutes
const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;

#[derive(Clone)]
pub struct CrdsGossipPush {
/// max bytes per message
Expand All @@ -50,6 +53,8 @@ pub struct CrdsGossipPush {
/// This cache represents a lagging view of which validators
/// currently have this node in their `active_set`
received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
last_pushed_to: HashMap<Pubkey, u64>,
last_pushed_to_cleanup_ts: u64,
pub num_active: usize,
pub push_fanout: usize,
pub msg_timeout: u64,
Expand All @@ -67,6 +72,8 @@ impl Default for CrdsGossipPush {
active_set: IndexMap::new(),
push_messages: HashMap::new(),
received_cache: HashMap::new(),
last_pushed_to: HashMap::new(),
last_pushed_to_cleanup_ts: 0,
num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
Expand Down Expand Up @@ -254,6 +261,15 @@ impl CrdsGossipPush {
self.push_messages.remove(&v.label());
}
}

for target_pubkey in push_messages.keys() {
*self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now;
}
if now - self.last_pushed_to_cleanup_ts > MAX_PUSHED_TO_TIMEOUT_MS {
self.last_pushed_to
.retain(|_id, timestamp| now - *timestamp > MAX_PUSHED_TO_TIMEOUT_MS);
self.last_pushed_to_cleanup_ts = now;
}
push_messages
}

Expand Down Expand Up @@ -357,10 +373,10 @@ impl CrdsGossipPush {
gossip_validators.contains(&info.id)
})
})
.map(|(info, value)| {
.map(|(info, _value)| {
let max_weight = f32::from(u16::max_value()) - 1.0;
let last_updated: u64 = value.local_timestamp;
let since = ((timestamp() - last_updated) / 1024) as u32;
let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0);
let since = ((timestamp() - last_pushed_to) / 1024) as u32;
let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, info)
Expand Down Expand Up @@ -576,9 +592,10 @@ mod test {
}
#[test]
fn test_active_set_refresh_with_bank() {
solana_logger::setup();
let time = timestamp() - 1024; //make sure there's at least a 1 second delay
let mut crds = Crds::default();
let push = CrdsGossipPush::default();
let mut push = CrdsGossipPush::default();
let mut stakes = HashMap::new();
for i in 1..=100 {
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
Expand All @@ -588,6 +605,7 @@ mod test {
let id = peer.label().pubkey();
crds.insert(peer.clone(), time).unwrap();
stakes.insert(id, i * 100);
push.last_pushed_to.insert(id, time);
}
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
assert!(!options.is_empty());
Expand Down

0 comments on commit 71c469c

Please sign in to comment.