From e963f87da96fc07f05c8d36bf48be0ec0cd10e2c Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 21 Mar 2024 17:54:17 -0400 Subject: [PATCH] Evict oldest vote on vote refresh after restart (#327) --- gossip/src/cluster_info.rs | 164 ++++++++++++++++++++++++++++--------- 1 file changed, 124 insertions(+), 40 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 471d768a101051..783f8a067d7614 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1045,6 +1045,31 @@ impl ClusterInfo { } } + fn find_vote_index_to_evict(&self, should_evict_vote: impl Fn(&Vote) -> bool) -> u8 { + let self_pubkey = self.id(); + let mut num_crds_votes = 0; + let vote_index = { + let gossip_crds = + self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); + (0..MAX_LOCKOUT_HISTORY as u8) + .filter_map(|ix| { + let vote = CrdsValueLabel::Vote(ix, self_pubkey); + let vote: &CrdsData = gossip_crds.get(&vote)?; + num_crds_votes += 1; + match &vote { + CrdsData::Vote(_, vote) if should_evict_vote(vote) => { + Some((vote.wallclock, ix)) + } + CrdsData::Vote(_, _) => None, + _ => panic!("this should not happen!"), + } + }) + .min() // Boot the oldest evicted vote by wallclock. + .map(|(_ /*wallclock*/, ix)| ix) + }; + vote_index.unwrap_or(num_crds_votes) + } + pub fn push_vote(&self, tower: &[Slot], vote: Transaction) { debug_assert!(tower.iter().tuple_windows().all(|(a, b)| a < b)); // Find a crds vote which is evicted from the tower, and recycle its @@ -1057,8 +1082,7 @@ impl ClusterInfo { // gossip. // TODO: When there are more than one vote evicted from the tower, only // one crds vote is overwritten here. Decide what to do with the rest. - let mut num_crds_votes = 0; - let self_pubkey = self.id(); + // Returns true if the tower does not contain the vote.slot. let should_evict_vote = |vote: &Vote| -> bool { match vote.slot() { @@ -1069,26 +1093,7 @@ impl ClusterInfo { } } }; - let vote_index = { - let gossip_crds = - self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); - (0..MAX_LOCKOUT_HISTORY as u8) - .filter_map(|ix| { - let vote = CrdsValueLabel::Vote(ix, self_pubkey); - let vote: &CrdsData = gossip_crds.get(&vote)?; - num_crds_votes += 1; - match &vote { - CrdsData::Vote(_, vote) if should_evict_vote(vote) => { - Some((vote.wallclock, ix)) - } - CrdsData::Vote(_, _) => None, - _ => panic!("this should not happen!"), - } - }) - .min() // Boot the oldest evicted vote by wallclock. - .map(|(_ /*wallclock*/, ix)| ix) - }; - let vote_index = vote_index.unwrap_or(num_crds_votes); + let vote_index = self.find_vote_index_to_evict(should_evict_vote); if (vote_index as usize) >= MAX_LOCKOUT_HISTORY { let (_, vote, hash, _) = vote_parser::parse_vote_transaction(&vote).unwrap(); panic!( @@ -1102,7 +1107,7 @@ impl ClusterInfo { self.push_vote_at_index(vote, vote_index); } - pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) { + pub fn refresh_vote(&self, refresh_vote: Transaction, refresh_vote_slot: Slot) { let vote_index = { let self_pubkey = self.id(); let gossip_crds = @@ -1116,7 +1121,7 @@ impl ClusterInfo { panic!("this should not happen!"); }; match prev_vote.slot() { - Some(prev_vote_slot) => prev_vote_slot == vote_slot, + Some(prev_vote_slot) => prev_vote_slot == refresh_vote_slot, None => { error!("crds vote with no slots!"); false @@ -1125,13 +1130,27 @@ impl ClusterInfo { }) }; - // If you don't see a vote with the same slot yet, this means you probably - // restarted, and need to wait for your oldest vote to propagate back to you. - // // We don't write to an arbitrary index, because it may replace one of this validator's // existing votes on the network. if let Some(vote_index) = vote_index { - self.push_vote_at_index(vote, vote_index); + self.push_vote_at_index(refresh_vote, vote_index); + } else { + // If you don't see a vote with the same slot yet, this means you probably + // restarted, and need to repush and evict the oldest vote + let should_evict_vote = |vote: &Vote| -> bool { + vote.slot() + .map(|slot| refresh_vote_slot > slot) + .unwrap_or(true) + }; + let vote_index = self.find_vote_index_to_evict(should_evict_vote); + if (vote_index as usize) >= MAX_LOCKOUT_HISTORY { + warn!( + "trying to refresh slot {} but all votes in gossip table are for newer slots", + refresh_vote_slot, + ); + return; + } + self.push_vote_at_index(refresh_vote, vote_index); } } @@ -3673,6 +3692,77 @@ mod tests { .unwrap(); } + #[test] + fn test_refresh_vote_eviction() { + let keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); + let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified); + + // Push MAX_LOCKOUT_HISTORY votes into gossip, one for each slot between + // [lowest_vote_slot, lowest_vote_slot + MAX_LOCKOUT_HISTORY) + let lowest_vote_slot = 1; + let max_vote_slot = lowest_vote_slot + MAX_LOCKOUT_HISTORY as Slot; + let mut first_vote = None; + let mut prev_votes = vec![]; + for slot in 1..max_vote_slot { + prev_votes.push(slot); + let unrefresh_vote = Vote::new(vec![slot], Hash::new_unique()); + let vote_ix = vote_instruction::vote( + &Pubkey::new_unique(), // vote_pubkey + &Pubkey::new_unique(), // authorized_voter_pubkey + unrefresh_vote, + ); + let vote_tx = Transaction::new_with_payer( + &[vote_ix], // instructions + None, // payer + ); + if first_vote.is_none() { + first_vote = Some(vote_tx.clone()); + } + cluster_info.push_vote(&prev_votes, vote_tx); + } + + let initial_votes = cluster_info.get_votes(&mut Cursor::default()); + assert_eq!(initial_votes.len(), MAX_LOCKOUT_HISTORY); + + // Trying to refresh a vote less than all votes in gossip should do nothing + let refresh_slot = lowest_vote_slot - 1; + let refresh_vote = Vote::new(vec![refresh_slot], Hash::new_unique()); + let refresh_ix = vote_instruction::vote( + &Pubkey::new_unique(), // vote_pubkey + &Pubkey::new_unique(), // authorized_voter_pubkey + refresh_vote.clone(), + ); + let refresh_tx = Transaction::new_with_payer( + &[refresh_ix], // instructions + None, // payer + ); + cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot); + let current_votes = cluster_info.get_votes(&mut Cursor::default()); + assert_eq!(initial_votes, current_votes); + assert!(!current_votes.contains(&refresh_tx)); + + // Trying to refresh a vote should evict the first slot less than the refreshed vote slot + let refresh_slot = max_vote_slot + 1; + let refresh_vote = Vote::new(vec![refresh_slot], Hash::new_unique()); + let refresh_ix = vote_instruction::vote( + &Pubkey::new_unique(), // vote_pubkey + &Pubkey::new_unique(), // authorized_voter_pubkey + refresh_vote.clone(), + ); + let refresh_tx = Transaction::new_with_payer( + &[refresh_ix], // instructions + None, // payer + ); + cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot); + + // This should evict the latest vote since it's for a slot less than refresh_slot + let votes = cluster_info.get_votes(&mut Cursor::default()); + assert_eq!(votes.len(), MAX_LOCKOUT_HISTORY); + assert!(votes.contains(&refresh_tx)); + assert!(!votes.contains(&first_vote.unwrap())); + } + #[test] fn test_refresh_vote() { let keypair = Arc::new(Keypair::new()); @@ -3697,8 +3787,9 @@ mod tests { let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![unrefresh_tx.clone()]); - // Now construct vote for the slot to be refreshed later - let refresh_slot = 7; + // Now construct vote for the slot to be refreshed later. Has to be less than the `unrefresh_slot`, + // otherwise it will evict that slot + let refresh_slot = unrefresh_slot - 1; let refresh_tower = vec![1, 3, unrefresh_slot, refresh_slot]; let refresh_vote = Vote::new(refresh_tower.clone(), Hash::new_unique()); let refresh_ix = vote_instruction::vote( @@ -3712,19 +3803,12 @@ mod tests { ); // Trying to refresh vote when it doesn't yet exist in gossip - // shouldn't add the vote + // should add the vote without eviction if there is room in the gossip table. cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot); - let votes = cluster_info.get_votes(&mut cursor); - assert_eq!(votes, vec![]); - let votes = cluster_info.get_votes(&mut Cursor::default()); - assert_eq!(votes.len(), 1); - assert!(votes.contains(&unrefresh_tx)); - - // Push the new vote for `refresh_slot` - cluster_info.push_vote(&refresh_tower, refresh_tx.clone()); // Should be two votes in gossip - let votes = cluster_info.get_votes(&mut Cursor::default()); + cursor = Cursor::default(); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 2); assert!(votes.contains(&unrefresh_tx)); assert!(votes.contains(&refresh_tx));