Skip to content

Commit

Permalink
Evict oldest vote on vote refresh after restart (solana-labs#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin authored Mar 21, 2024
1 parent 5f16932 commit e963f87
Showing 1 changed file with 124 additions and 40 deletions.
164 changes: 124 additions & 40 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,31 @@ impl ClusterInfo {
}
}

fn find_vote_index_to_evict(&self, should_evict_vote: impl Fn(&Vote) -> bool) -> u8 {
let self_pubkey = self.id();
let mut num_crds_votes = 0;
let vote_index = {
let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8)
.filter_map(|ix| {
let vote = CrdsValueLabel::Vote(ix, self_pubkey);
let vote: &CrdsData = gossip_crds.get(&vote)?;
num_crds_votes += 1;
match &vote {
CrdsData::Vote(_, vote) if should_evict_vote(vote) => {
Some((vote.wallclock, ix))
}
CrdsData::Vote(_, _) => None,
_ => panic!("this should not happen!"),
}
})
.min() // Boot the oldest evicted vote by wallclock.
.map(|(_ /*wallclock*/, ix)| ix)
};
vote_index.unwrap_or(num_crds_votes)
}

pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
debug_assert!(tower.iter().tuple_windows().all(|(a, b)| a < b));
// Find a crds vote which is evicted from the tower, and recycle its
Expand All @@ -1057,8 +1082,7 @@ impl ClusterInfo {
// gossip.
// TODO: When there are more than one vote evicted from the tower, only
// one crds vote is overwritten here. Decide what to do with the rest.
let mut num_crds_votes = 0;
let self_pubkey = self.id();

// Returns true if the tower does not contain the vote.slot.
let should_evict_vote = |vote: &Vote| -> bool {
match vote.slot() {
Expand All @@ -1069,26 +1093,7 @@ impl ClusterInfo {
}
}
};
let vote_index = {
let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8)
.filter_map(|ix| {
let vote = CrdsValueLabel::Vote(ix, self_pubkey);
let vote: &CrdsData = gossip_crds.get(&vote)?;
num_crds_votes += 1;
match &vote {
CrdsData::Vote(_, vote) if should_evict_vote(vote) => {
Some((vote.wallclock, ix))
}
CrdsData::Vote(_, _) => None,
_ => panic!("this should not happen!"),
}
})
.min() // Boot the oldest evicted vote by wallclock.
.map(|(_ /*wallclock*/, ix)| ix)
};
let vote_index = vote_index.unwrap_or(num_crds_votes);
let vote_index = self.find_vote_index_to_evict(should_evict_vote);
if (vote_index as usize) >= MAX_LOCKOUT_HISTORY {
let (_, vote, hash, _) = vote_parser::parse_vote_transaction(&vote).unwrap();
panic!(
Expand All @@ -1102,7 +1107,7 @@ impl ClusterInfo {
self.push_vote_at_index(vote, vote_index);
}

pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) {
pub fn refresh_vote(&self, refresh_vote: Transaction, refresh_vote_slot: Slot) {
let vote_index = {
let self_pubkey = self.id();
let gossip_crds =
Expand All @@ -1116,7 +1121,7 @@ impl ClusterInfo {
panic!("this should not happen!");
};
match prev_vote.slot() {
Some(prev_vote_slot) => prev_vote_slot == vote_slot,
Some(prev_vote_slot) => prev_vote_slot == refresh_vote_slot,
None => {
error!("crds vote with no slots!");
false
Expand All @@ -1125,13 +1130,27 @@ impl ClusterInfo {
})
};

// If you don't see a vote with the same slot yet, this means you probably
// restarted, and need to wait for your oldest vote to propagate back to you.
//
// We don't write to an arbitrary index, because it may replace one of this validator's
// existing votes on the network.
if let Some(vote_index) = vote_index {
self.push_vote_at_index(vote, vote_index);
self.push_vote_at_index(refresh_vote, vote_index);
} else {
// If you don't see a vote with the same slot yet, this means you probably
// restarted, and need to repush and evict the oldest vote
let should_evict_vote = |vote: &Vote| -> bool {
vote.slot()
.map(|slot| refresh_vote_slot > slot)
.unwrap_or(true)
};
let vote_index = self.find_vote_index_to_evict(should_evict_vote);
if (vote_index as usize) >= MAX_LOCKOUT_HISTORY {
warn!(
"trying to refresh slot {} but all votes in gossip table are for newer slots",
refresh_vote_slot,
);
return;
}
self.push_vote_at_index(refresh_vote, vote_index);
}
}

Expand Down Expand Up @@ -3673,6 +3692,77 @@ mod tests {
.unwrap();
}

#[test]
fn test_refresh_vote_eviction() {
let keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);

// Push MAX_LOCKOUT_HISTORY votes into gossip, one for each slot between
// [lowest_vote_slot, lowest_vote_slot + MAX_LOCKOUT_HISTORY)
let lowest_vote_slot = 1;
let max_vote_slot = lowest_vote_slot + MAX_LOCKOUT_HISTORY as Slot;
let mut first_vote = None;
let mut prev_votes = vec![];
for slot in 1..max_vote_slot {
prev_votes.push(slot);
let unrefresh_vote = Vote::new(vec![slot], Hash::new_unique());
let vote_ix = vote_instruction::vote(
&Pubkey::new_unique(), // vote_pubkey
&Pubkey::new_unique(), // authorized_voter_pubkey
unrefresh_vote,
);
let vote_tx = Transaction::new_with_payer(
&[vote_ix], // instructions
None, // payer
);
if first_vote.is_none() {
first_vote = Some(vote_tx.clone());
}
cluster_info.push_vote(&prev_votes, vote_tx);
}

let initial_votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(initial_votes.len(), MAX_LOCKOUT_HISTORY);

// Trying to refresh a vote less than all votes in gossip should do nothing
let refresh_slot = lowest_vote_slot - 1;
let refresh_vote = Vote::new(vec![refresh_slot], Hash::new_unique());
let refresh_ix = vote_instruction::vote(
&Pubkey::new_unique(), // vote_pubkey
&Pubkey::new_unique(), // authorized_voter_pubkey
refresh_vote.clone(),
);
let refresh_tx = Transaction::new_with_payer(
&[refresh_ix], // instructions
None, // payer
);
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
let current_votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(initial_votes, current_votes);
assert!(!current_votes.contains(&refresh_tx));

// Trying to refresh a vote should evict the first slot less than the refreshed vote slot
let refresh_slot = max_vote_slot + 1;
let refresh_vote = Vote::new(vec![refresh_slot], Hash::new_unique());
let refresh_ix = vote_instruction::vote(
&Pubkey::new_unique(), // vote_pubkey
&Pubkey::new_unique(), // authorized_voter_pubkey
refresh_vote.clone(),
);
let refresh_tx = Transaction::new_with_payer(
&[refresh_ix], // instructions
None, // payer
);
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);

// This should evict the latest vote since it's for a slot less than refresh_slot
let votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), MAX_LOCKOUT_HISTORY);
assert!(votes.contains(&refresh_tx));
assert!(!votes.contains(&first_vote.unwrap()));
}

#[test]
fn test_refresh_vote() {
let keypair = Arc::new(Keypair::new());
Expand All @@ -3697,8 +3787,9 @@ mod tests {
let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![unrefresh_tx.clone()]);

// Now construct vote for the slot to be refreshed later
let refresh_slot = 7;
// Now construct vote for the slot to be refreshed later. Has to be less than the `unrefresh_slot`,
// otherwise it will evict that slot
let refresh_slot = unrefresh_slot - 1;
let refresh_tower = vec![1, 3, unrefresh_slot, refresh_slot];
let refresh_vote = Vote::new(refresh_tower.clone(), Hash::new_unique());
let refresh_ix = vote_instruction::vote(
Expand All @@ -3712,19 +3803,12 @@ mod tests {
);

// Trying to refresh vote when it doesn't yet exist in gossip
// shouldn't add the vote
// should add the vote without eviction if there is room in the gossip table.
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
let votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 1);
assert!(votes.contains(&unrefresh_tx));

// Push the new vote for `refresh_slot`
cluster_info.push_vote(&refresh_tower, refresh_tx.clone());

// Should be two votes in gossip
let votes = cluster_info.get_votes(&mut Cursor::default());
cursor = Cursor::default();
let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&refresh_tx));
Expand Down

0 comments on commit e963f87

Please sign in to comment.