Skip to content

Commit

Permalink
fix race condition on vote count (anza-xyz#1762)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored and neutrinoks committed Jul 17, 2024
1 parent d930e87 commit ee2c05f
Showing 1 changed file with 62 additions and 9 deletions.
71 changes: 62 additions & 9 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ impl LatestUnprocessedVotes {
let pubkey = vote.pubkey();
let slot = vote.slot();
let timestamp = vote.timestamp();
if let Some(latest_vote) = self.get_entry(pubkey) {

let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
vote: LatestValidatorVotePacket|
-> Option<LatestValidatorVotePacket> {
let (latest_slot, latest_timestamp) = latest_vote
.read()
.map(|vote| (vote.slot(), vote.timestamp()))
Expand All @@ -225,15 +228,24 @@ impl LatestUnprocessedVotes {
}
}
}
return Some(vote);
}
Some(vote)
};

// Should have low lock contention because this is only hit on the first few blocks of startup
// and when a new vote account starts voting.
let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap();
latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
if let Some(latest_vote) = self.get_entry(pubkey) {
with_latest_vote(&latest_vote, vote)
} else {
// Grab write-lock to insert new vote.
match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) {
std::collections::hash_map::Entry::Occupied(entry) => {
with_latest_vote(entry.get(), vote)
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -682,6 +694,47 @@ mod tests {
);
}

#[test]
fn test_update_latest_vote_race() {
// There was a race condition in updating the same pubkey in the hashmap
// when the entry does not initially exist.
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

const NUM_VOTES: usize = 100;
let keypairs = Arc::new(
(0..NUM_VOTES)
.map(|_| ValidatorVoteKeypairs::new_rand())
.collect_vec(),
);

// Insert votes in parallel
let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes,
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
i: usize| {
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
latest_unprocessed_votes.update_latest_vote(vote);
};

let hdl = Builder::new()
.spawn({
let latest_unprocessed_votes = latest_unprocessed_votes.clone();
let keypairs = keypairs.clone();
move || {
for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}
}
})
.unwrap();

for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}

hdl.join().unwrap();
assert_eq!(NUM_VOTES, latest_unprocessed_votes.len());
}

#[test]
fn test_simulate_threads() {
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
Expand Down

0 comments on commit ee2c05f

Please sign in to comment.