Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix race condition on vote count #1762

Merged
merged 2 commits into from
Jun 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 62 additions & 9 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ impl LatestUnprocessedVotes {
let pubkey = vote.pubkey();
let slot = vote.slot();
let timestamp = vote.timestamp();
if let Some(latest_vote) = self.get_entry(pubkey) {

let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
vote: LatestValidatorVotePacket|
-> Option<LatestValidatorVotePacket> {
let (latest_slot, latest_timestamp) = latest_vote
.read()
.map(|vote| (vote.slot(), vote.timestamp()))
Expand All @@ -225,15 +228,24 @@ impl LatestUnprocessedVotes {
}
}
}
return Some(vote);
}
Some(vote)
};

// Should have low lock contention because this is only hit on the first few blocks of startup
// and when a new vote account starts voting.
let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap();
latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
if let Some(latest_vote) = self.get_entry(pubkey) {
with_latest_vote(&latest_vote, vote)
} else {
// Grab write-lock to insert new vote.
match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could get away with just this match, right? But this would mean taking the write lock every time...

Are we only expected to take this path during startup?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah afaict we do not remove keys from the map, so we only write-lock when we receive a vote from a node we hadn't seen before - though @AshwinSekar would be more familiar with when/if we clear entries out of the map.

We don't wanna write-lock everytime, so we have the initial get_entry branch.

Copy link

@AshwinSekar AshwinSekar Jun 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not remove keys from the map, and we only write-lock here either on startup or when a validator submits a vote for the first time. the match avoids taking the write lock on the entire map for every new vote insertion

I like the approach you have here, thanks for noticing this.

std::collections::hash_map::Entry::Occupied(entry) => {
with_latest_vote(entry.get(), vote)
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -682,6 +694,47 @@ mod tests {
);
}

#[test]
fn test_update_latest_vote_race() {
// There was a race condition in updating the same pubkey in the hashmap
// when the entry does not initially exist.
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

const NUM_VOTES: usize = 100;
let keypairs = Arc::new(
(0..NUM_VOTES)
.map(|_| ValidatorVoteKeypairs::new_rand())
.collect_vec(),
);

// Insert votes in parallel
let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes,
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
i: usize| {
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
latest_unprocessed_votes.update_latest_vote(vote);
};

let hdl = Builder::new()
.spawn({
let latest_unprocessed_votes = latest_unprocessed_votes.clone();
let keypairs = keypairs.clone();
move || {
for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}
}
})
.unwrap();

for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}

hdl.join().unwrap();
assert_eq!(NUM_VOTES, latest_unprocessed_votes.len());
}

#[test]
fn test_simulate_threads() {
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
Expand Down