diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index 44ed870e3fb86b..d7455038d6410e 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -203,7 +203,10 @@ impl LatestUnprocessedVotes { let pubkey = vote.pubkey(); let slot = vote.slot(); let timestamp = vote.timestamp(); - if let Some(latest_vote) = self.get_entry(pubkey) { + + let with_latest_vote = |latest_vote: &RwLock, + vote: LatestValidatorVotePacket| + -> Option { let (latest_slot, latest_timestamp) = latest_vote .read() .map(|vote| (vote.slot(), vote.timestamp())) @@ -225,15 +228,24 @@ impl LatestUnprocessedVotes { } } } - return Some(vote); - } + Some(vote) + }; - // Should have low lock contention because this is only hit on the first few blocks of startup - // and when a new vote account starts voting. - let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap(); - latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote))); - self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed); - None + if let Some(latest_vote) = self.get_entry(pubkey) { + with_latest_vote(&latest_vote, vote) + } else { + // Grab write-lock to insert new vote. + match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) { + std::collections::hash_map::Entry::Occupied(entry) => { + with_latest_vote(entry.get(), vote) + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(Arc::new(RwLock::new(vote))); + self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed); + None + } + } + } } #[cfg(test)] @@ -682,6 +694,47 @@ mod tests { ); } + #[test] + fn test_update_latest_vote_race() { + // There was a race condition in updating the same pubkey in the hashmap + // when the entry does not initially exist. + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + + const NUM_VOTES: usize = 100; + let keypairs = Arc::new( + (0..NUM_VOTES) + .map(|_| ValidatorVoteKeypairs::new_rand()) + .collect_vec(), + ); + + // Insert votes in parallel + let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, + keypairs: &Arc>, + i: usize| { + let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None); + latest_unprocessed_votes.update_latest_vote(vote); + }; + + let hdl = Builder::new() + .spawn({ + let latest_unprocessed_votes = latest_unprocessed_votes.clone(); + let keypairs = keypairs.clone(); + move || { + for i in 0..NUM_VOTES { + insert_vote(&latest_unprocessed_votes, &keypairs, i); + } + } + }) + .unwrap(); + + for i in 0..NUM_VOTES { + insert_vote(&latest_unprocessed_votes, &keypairs, i); + } + + hdl.join().unwrap(); + assert_eq!(NUM_VOTES, latest_unprocessed_votes.len()); + } + #[test] fn test_simulate_threads() { let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());