From 59de1b3b629dbc3d02bb3166cb0d20ea70bc5bb2 Mon Sep 17 00:00:00 2001 From: carllin Date: Mon, 11 May 2020 22:20:11 -0700 Subject: [PATCH] Compute Switch Threshold (#9218) * Add switching threshold check Co-authored-by: Carl --- core/src/consensus.rs | 412 ++++++++++++++++++++++++++++++---- core/src/lib.rs | 1 + core/src/progress_map.rs | 67 +++--- core/src/pubkey_references.rs | 21 ++ core/src/replay_stage.rs | 66 +++--- 5 files changed, 468 insertions(+), 99 deletions(-) create mode 100644 core/src/pubkey_references.rs diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 055b02c394477e..20018879e67834 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,4 +1,7 @@ -use crate::progress_map::ProgressMap; +use crate::{ + progress_map::{LockoutIntervals, ProgressMap}, + pubkey_references::PubkeyReferences, +}; use chrono::prelude::*; use solana_ledger::bank_forks::BankForks; use solana_runtime::bank::Bank; @@ -12,12 +15,14 @@ use solana_vote_program::vote_state::{ BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL, }; use std::{ - collections::{HashMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, + ops::Bound::{Included, Unbounded}, sync::Arc, }; pub const VOTE_THRESHOLD_DEPTH: usize = 8; pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; +pub const SWITCH_FORK_THRESHOLD: f64 = 0.38; #[derive(Default, Debug, Clone)] pub struct StakeLockout { @@ -89,13 +94,17 @@ impl Tower { bank_slot: u64, vote_accounts: F, ancestors: &HashMap>, - ) -> (HashMap, u64, u128) + all_pubkeys: &mut PubkeyReferences, + ) -> (HashMap, u64, u128, LockoutIntervals) where F: Iterator, { let mut stake_lockouts = HashMap::new(); let mut total_stake = 0; let mut total_weight = 0; + // Tree of intervals of lockouts of the form [slot, slot + slot.lockout], + // keyed by end of the range + let mut lockout_intervals = BTreeMap::new(); for (key, (lamports, account)) in vote_accounts { if lamports == 0 { continue; @@ -115,6 +124,14 @@ impl Tower { } let mut vote_state = vote_state.unwrap(); + for vote in &vote_state.votes { + let key = all_pubkeys.get_or_insert(&key); + lockout_intervals + .entry(vote.expiration_slot()) + .or_insert_with(|| vec![]) + .push((vote.slot, key)); + } + if key == self.node_pubkey || vote_state.node_pubkey == self.node_pubkey { debug!("vote state {:?}", vote_state); debug!( @@ -180,7 +197,7 @@ impl Tower { } total_stake += lamports; } - (stake_lockouts, total_stake, total_weight) + (stake_lockouts, total_stake, total_weight, lockout_intervals) } pub fn is_slot_confirmed( @@ -319,6 +336,98 @@ impl Tower { false } + + pub(crate) fn check_switch_threshold( + &self, + switch_slot: u64, + ancestors: &HashMap>, + descendants: &HashMap>, + progress: &ProgressMap, + total_stake: u64, + epoch_vote_accounts: &HashMap, + ) -> bool { + self.last_vote() + .slots + .last() + .map(|last_vote| { + let last_vote_ancestors = ancestors.get(&last_vote).unwrap(); + let switch_slot_ancestors = ancestors.get(&switch_slot).unwrap(); + + if switch_slot == *last_vote || switch_slot_ancestors.contains(last_vote) { + // If the `switch_slot is a descendant of the last vote, + // no switching proof is neceessary + return true; + } + + // Should never consider switching to an ancestor + // of your last vote + assert!(!last_vote_ancestors.contains(&switch_slot)); + + let mut locked_out_stake = 0; + let mut locked_out_vote_accounts = HashSet::new(); + for (candidate_slot, descendants) in descendants.iter() { + // 1) Only consider lockouts a tips of forks as that + // includes all ancestors of that fork. + // 2) Don't consider lockouts on the `last_vote` itself + // 3) Don't consider lockouts on any descendants of + // `last_vote` + if !descendants.is_empty() + || candidate_slot == last_vote + || ancestors + .get(&candidate_slot) + .expect( + "empty descendants implies this is a child, not parent of root, so must + exist in the ancestors map", + ) + .contains(last_vote) + { + continue; + } + + // By the time we reach here, any ancestors of the `last_vote`, + // should have been filtered out, as they all have a descendant, + // namely the `last_vote` itself. + assert!(!last_vote_ancestors.contains(candidate_slot)); + + // Evaluate which vote accounts in the bank are locked out + // in the interval candidate_slot..last_vote, which means + // finding any lockout intervals in the `lockout_intervals` tree + // for this bank that contain `last_vote`. + let lockout_intervals = &progress + .get(&candidate_slot) + .unwrap() + .fork_stats + .lockout_intervals; + // Find any locked out intervals in this bank with endpoint >= last_vote, + // implies they are locked out at last_vote + for (_, value) in lockout_intervals.range((Included(last_vote), Unbounded)) { + for (lockout_interval_start, vote_account_pubkey) in value { + // Only count lockouts on slots that are: + // 1) Not ancestors of `last_vote` + // 2) Not from before the current root as we can't determine if + // anything before the root was an ancestor of `last_vote` or not + if !last_vote_ancestors.contains(lockout_interval_start) + // The check if the key exists in the ancestors map + // is equivalent to checking if the key is above the + // current root. + && ancestors.contains_key(lockout_interval_start) + && !locked_out_vote_accounts.contains(vote_account_pubkey) + { + let stake = epoch_vote_accounts + .get(vote_account_pubkey) + .map(|(stake, _)| *stake) + .unwrap_or(0); + locked_out_stake += stake; + locked_out_vote_accounts.insert(vote_account_pubkey); + } + } + } + } + (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD + }) + .unwrap_or(true) + } + pub fn check_vote_stake_threshold( &self, slot: Slot, @@ -353,18 +462,6 @@ impl Tower { } } - pub(crate) fn check_switch_threshold( - &self, - _slot: Slot, - _ancestors: &HashMap>, - _descendants: &HashMap>, - _progress: &ProgressMap, - _total_epoch_stake: u64, - _epoch_vote_accounts: &HashMap, - ) -> bool { - true - } - /// Update lockouts for all the ancestors fn update_ancestor_lockouts( stake_lockouts: &mut HashMap, @@ -410,8 +507,12 @@ impl Tower { } fn bank_weight(&self, bank: &Bank, ancestors: &HashMap>) -> u128 { - let (_, _, bank_weight) = - self.collect_vote_lockouts(bank.slot(), bank.vote_accounts().into_iter(), ancestors); + let (_, _, bank_weight, _) = self.collect_vote_lockouts( + bank.slot(), + bank.vote_accounts().into_iter(), + ancestors, + &mut PubkeyReferences::default(), + ); bank_weight } @@ -501,9 +602,12 @@ pub mod test { vote_state::{Vote, VoteStateVersions}, vote_transaction, }; - use std::collections::HashMap; - use std::sync::RwLock; - use std::{thread::sleep, time::Duration}; + use std::{ + collections::HashMap, + rc::Rc, + sync::RwLock, + {thread::sleep, time::Duration}, + }; use trees::{tr, Tree, TreeWalk}; pub(crate) struct VoteSimulator { @@ -593,7 +697,7 @@ pub mod test { .cloned() .collect(); - ReplayStage::compute_bank_stats( + let _ = ReplayStage::compute_bank_stats( &my_pubkey, &ancestors, &mut frozen_banks, @@ -602,7 +706,7 @@ pub mod test { &VoteTracker::default(), &ClusterSlots::default(), &self.bank_forks, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); let vote_bank = self @@ -631,19 +735,62 @@ pub mod test { } let vote = tower.new_vote_from_bank(&vote_bank, &my_vote_pubkey).0; if let Some(new_root) = tower.record_bank_vote(vote) { - ReplayStage::handle_new_root( - new_root, - &self.bank_forks, - &mut self.progress, - &None, - &mut HashSet::new(), - None, - ); + self.set_root(new_root); } vec![] } + pub fn set_root(&mut self, new_root: Slot) { + ReplayStage::handle_new_root( + new_root, + &self.bank_forks, + &mut self.progress, + &None, + &mut PubkeyReferences::default(), + None, + ) + } + + fn create_and_vote_new_branch( + &mut self, + start_slot: Slot, + end_slot: Slot, + cluster_votes: &HashMap>, + votes_to_simulate: &HashSet, + my_pubkey: &Pubkey, + tower: &mut Tower, + ) -> HashMap> { + (start_slot + 1..=end_slot) + .filter_map(|slot| { + let mut fork_tip_parent = tr(slot - 1); + fork_tip_parent.push_front(tr(slot)); + self.fill_bank_forks(fork_tip_parent, &cluster_votes); + if votes_to_simulate.contains(&slot) { + Some((slot, self.simulate_vote(slot, &my_pubkey, tower))) + } else { + None + } + }) + .collect() + } + + fn simulate_lockout_interval( + &mut self, + slot: Slot, + lockout_interval: (u64, u64), + vote_account_pubkey: &Pubkey, + ) { + self.progress + .entry(slot) + .or_insert_with(|| ForkProgress::new(Hash::default(), None, None, 0, 0)) + .fork_stats + .lockout_intervals + .entry(lockout_interval.1) + .or_default() + .push((lockout_interval.0, Rc::new(*vote_account_pubkey))); + } + fn can_progress_on_fork( &mut self, my_pubkey: &Pubkey, @@ -790,6 +937,182 @@ pub mod test { } } + #[test] + fn test_switch_threshold() { + // Init state + let mut vote_simulator = VoteSimulator::new(2); + let my_pubkey = vote_simulator.node_pubkeys[0]; + let other_vote_account = vote_simulator.vote_pubkeys[1]; + let bank0 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(0) + .unwrap() + .clone(); + let total_stake = bank0.total_epoch_stake(); + assert_eq!( + total_stake, + vote_simulator.validator_keypairs.len() as u64 * 10_000 + ); + + // Create the tree of banks + let forks = tr(0) + / (tr(1) + / (tr(2) + // Minor fork 1 + / (tr(10) / (tr(11) / (tr(12) / (tr(13) / (tr(14)))))) + / (tr(43) + / (tr(44) + // Minor fork 2 + / (tr(45) / (tr(46) / (tr(47) / (tr(48) / (tr(49) / (tr(50))))))) + / (tr(110)))))); + + // Fill the BankForks according to the above fork structure + vote_simulator.fill_bank_forks(forks, &HashMap::new()); + let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); + let descendants = vote_simulator.bank_forks.read().unwrap().descendants(); + let mut tower = Tower::new_with_key(&my_pubkey); + + // Last vote is 47 + tower.record_vote(47, Hash::default()); + + // Trying to switch to a descendant of last vote should always work + assert!(tower.check_switch_threshold( + 48, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + )); + + // Trying to switch to another fork at 110 should fail + assert!(!tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + )); + + // Adding another validator lockout on a descendant of last vote should + // not count toward the switch threshold + vote_simulator.simulate_lockout_interval(50, (49, 100), &other_vote_account); + assert!(!tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + )); + + // Adding another validator lockout on an ancestor of last vote should + // not count toward the switch threshold + vote_simulator.simulate_lockout_interval(50, (45, 100), &other_vote_account); + assert!(!tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + )); + + // Adding another validator lockout on a different fork, but the lockout + // doesn't cover the last vote, should not satisfy the switch threshold + vote_simulator.simulate_lockout_interval(14, (12, 46), &other_vote_account); + assert!(!tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + )); + + // Adding another validator lockout on a different fork, and the lockout + // covers the last vote, should satisfy the switch threshold + vote_simulator.simulate_lockout_interval(14, (12, 47), &other_vote_account); + assert!(tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + )); + + // If we set a root, then any lockout intervals below the root shouldn't + // count toward the switch threshold. This means the other validator's + // vote lockout no longer counts + vote_simulator.set_root(43); + assert!(!tower.check_switch_threshold( + 110, + &vote_simulator.bank_forks.read().unwrap().ancestors(), + &vote_simulator.bank_forks.read().unwrap().descendants(), + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + )); + } + + #[test] + fn test_switch_threshold_votes() { + // Init state + let mut vote_simulator = VoteSimulator::new(4); + let my_pubkey = vote_simulator.node_pubkeys[0]; + let mut tower = Tower::new_with_key(&my_pubkey); + let forks = tr(0) + / (tr(1) + / (tr(2) + // Minor fork 1 + / (tr(10) / (tr(11) / (tr(12) / (tr(13) / (tr(14)))))) + / (tr(43) + / (tr(44) + // Minor fork 2 + / (tr(45) / (tr(46)))) + / (tr(110))))); + + // Have two validators, each representing 20% of the stake vote on + // minor fork 2 at slots 46 + 47 + let mut cluster_votes: HashMap> = HashMap::new(); + cluster_votes.insert(vote_simulator.node_pubkeys[1], vec![46]); + cluster_votes.insert(vote_simulator.node_pubkeys[2], vec![47]); + vote_simulator.fill_bank_forks(forks, &cluster_votes); + + // Vote on the first minor fork at slot 14, should succeed + assert!(vote_simulator + .simulate_vote(14, &my_pubkey, &mut tower,) + .is_empty()); + + // The other two validators voted at slots 46, 47, which + // will only both show up in slot 48, at which point + // 2/5 > SWITCH_FORK_THRESHOLD of the stake has voted + // on another fork, so switching should suceed + let votes_to_simulate = (46..=48).into_iter().collect(); + let results = vote_simulator.create_and_vote_new_branch( + 45, + 48, + &cluster_votes, + &votes_to_simulate, + &my_pubkey, + &mut tower, + ); + for slot in 46..=48 { + if slot == 48 { + assert!(results.get(&slot).unwrap().is_empty()); + } else { + assert_eq!( + *results.get(&slot).unwrap(), + vec![HeaviestForkFailures::FailedSwitchThreshold(slot)] + ); + } + } + } + #[test] fn test_double_partition() { // Init state @@ -878,8 +1201,12 @@ pub mod test { let ancestors = vec![(1, vec![0].into_iter().collect()), (0, HashSet::new())] .into_iter() .collect(); - let (staked_lockouts, total_staked, bank_weight) = - tower.collect_vote_lockouts(1, accounts.into_iter(), &ancestors); + let (staked_lockouts, total_staked, bank_weight, _) = tower.collect_vote_lockouts( + 1, + accounts.into_iter(), + &ancestors, + &mut PubkeyReferences::default(), + ); assert_eq!(staked_lockouts[&0].stake, 2); assert_eq!(staked_lockouts[&0].lockout, 2 + 2 + 4 + 4); assert_eq!(total_staked, 2); @@ -915,10 +1242,11 @@ pub mod test { + root_weight; let expected_bank_weight = 2 * vote_account_expected_weight; assert_eq!(tower.lockouts.root_slot, Some(0)); - let (staked_lockouts, _total_staked, bank_weight) = tower.collect_vote_lockouts( + let (staked_lockouts, _total_staked, bank_weight, _) = tower.collect_vote_lockouts( MAX_LOCKOUT_HISTORY as u64, accounts.into_iter(), &ancestors, + &mut PubkeyReferences::default(), ); for i in 0..MAX_LOCKOUT_HISTORY { assert_eq!(staked_lockouts[&(i as u64)].stake, 2); @@ -1323,16 +1651,24 @@ pub mod test { for vote in &tower_votes { tower.record_vote(*vote, Hash::default()); } - let (staked_lockouts, total_staked, _) = - tower.collect_vote_lockouts(vote_to_evaluate, accounts.clone().into_iter(), &ancestors); + let (staked_lockouts, total_staked, _, _) = tower.collect_vote_lockouts( + vote_to_evaluate, + accounts.clone().into_iter(), + &ancestors, + &mut PubkeyReferences::default(), + ); assert!(tower.check_vote_stake_threshold(vote_to_evaluate, &staked_lockouts, total_staked)); // CASE 2: Now we want to evaluate a vote for slot VOTE_THRESHOLD_DEPTH + 1. This slot // will expire the vote in one of the vote accounts, so we should have insufficient // stake to pass the threshold let vote_to_evaluate = VOTE_THRESHOLD_DEPTH as u64 + 1; - let (staked_lockouts, total_staked, _) = - tower.collect_vote_lockouts(vote_to_evaluate, accounts.into_iter(), &ancestors); + let (staked_lockouts, total_staked, _, _) = tower.collect_vote_lockouts( + vote_to_evaluate, + accounts.into_iter(), + &ancestors, + &mut PubkeyReferences::default(), + ); assert!(!tower.check_vote_stake_threshold( vote_to_evaluate, &staked_lockouts, diff --git a/core/src/lib.rs b/core/src/lib.rs index 1e1114ee3c245b..87bbad1b6be9bc 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -34,6 +34,7 @@ pub mod non_circulating_supply; pub mod poh_recorder; pub mod poh_service; pub mod progress_map; +pub mod pubkey_references; pub mod repair_service; pub mod replay_stage; mod result; diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs index bf0d8c2ab93e31..3f511d5eb1261f 100644 --- a/core/src/progress_map.rs +++ b/core/src/progress_map.rs @@ -1,6 +1,7 @@ use crate::{ cluster_info_vote_listener::SlotVoteTracker, cluster_slots::SlotPubkeys, - consensus::StakeLockout, replay_stage::SUPERMINORITY_THRESHOLD, + consensus::StakeLockout, pubkey_references::PubkeyReferences, + replay_stage::SUPERMINORITY_THRESHOLD, }; use solana_ledger::{ bank_forks::BankForks, @@ -9,11 +10,13 @@ use solana_ledger::{ use solana_runtime::bank::Bank; use solana_sdk::{account::Account, clock::Slot, hash::Hash, pubkey::Pubkey}; use std::{ - collections::{HashMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, rc::Rc, sync::{Arc, RwLock}, }; +pub(crate) type LockoutIntervals = BTreeMap)>>; + #[derive(Default)] pub(crate) struct ReplaySlotStats(ConfirmationTiming); impl std::ops::Deref for ReplaySlotStats { @@ -191,6 +194,7 @@ pub(crate) struct ForkStats { pub(crate) stake_lockouts: HashMap, pub(crate) confirmation_reported: bool, pub(crate) computed: bool, + pub(crate) lockout_intervals: LockoutIntervals, } #[derive(Clone, Default)] @@ -210,18 +214,12 @@ impl PropagatedStats { pub fn add_vote_pubkey( &mut self, vote_pubkey: &Pubkey, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, stake: u64, ) { if !self.propagated_validators.contains(vote_pubkey) { - let mut cached_pubkey: Option> = all_pubkeys.get(vote_pubkey).cloned(); - if cached_pubkey.is_none() { - let new_pubkey = Rc::new(*vote_pubkey); - all_pubkeys.insert(new_pubkey.clone()); - cached_pubkey = Some(new_pubkey); - } - let vote_pubkey = cached_pubkey.unwrap(); - self.propagated_validators.insert(vote_pubkey); + let cached_pubkey = all_pubkeys.get_or_insert(vote_pubkey); + self.propagated_validators.insert(cached_pubkey); self.propagated_validators_stake += stake; } } @@ -229,7 +227,7 @@ impl PropagatedStats { pub fn add_node_pubkey( &mut self, node_pubkey: &Pubkey, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, bank: &Bank, ) { if !self.propagated_node_ids.contains(node_pubkey) { @@ -252,18 +250,12 @@ impl PropagatedStats { fn add_node_pubkey_internal( &mut self, node_pubkey: &Pubkey, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, vote_account_pubkeys: &[Pubkey], epoch_vote_accounts: &HashMap, ) { - let mut cached_pubkey: Option> = all_pubkeys.get(node_pubkey).cloned(); - if cached_pubkey.is_none() { - let new_pubkey = Rc::new(*node_pubkey); - all_pubkeys.insert(new_pubkey.clone()); - cached_pubkey = Some(new_pubkey); - } - let node_pubkey = cached_pubkey.unwrap(); - self.propagated_node_ids.insert(node_pubkey); + let cached_pubkey = all_pubkeys.get_or_insert(node_pubkey); + self.propagated_node_ids.insert(cached_pubkey); for vote_account_pubkey in vote_account_pubkeys.iter() { let stake = epoch_vote_accounts .get(vote_account_pubkey) @@ -398,16 +390,19 @@ mod test { #[test] fn test_add_vote_pubkey() { let mut stats = PropagatedStats::default(); - let mut all_pubkeys = HashSet::new(); + let mut all_pubkeys = PubkeyReferences::default(); let mut vote_pubkey = Pubkey::new_rand(); - all_pubkeys.insert(Rc::new(vote_pubkey.clone())); + all_pubkeys.get_or_insert(&vote_pubkey); // Add a vote pubkey, the number of references in all_pubkeys // should be 2 stats.add_vote_pubkey(&vote_pubkey, &mut all_pubkeys, 1); assert!(stats.propagated_validators.contains(&vote_pubkey)); assert_eq!(stats.propagated_validators_stake, 1); - assert_eq!(Rc::strong_count(all_pubkeys.get(&vote_pubkey).unwrap()), 2); + assert_eq!( + Rc::strong_count(&all_pubkeys.get_or_insert(&vote_pubkey)), + 3 + ); // Adding it again should change no state since the key already existed stats.add_vote_pubkey(&vote_pubkey, &mut all_pubkeys, 1); @@ -419,7 +414,10 @@ mod test { stats.add_vote_pubkey(&vote_pubkey, &mut all_pubkeys, 2); assert!(stats.propagated_validators.contains(&vote_pubkey)); assert_eq!(stats.propagated_validators_stake, 3); - assert_eq!(Rc::strong_count(all_pubkeys.get(&vote_pubkey).unwrap()), 2); + assert_eq!( + Rc::strong_count(&all_pubkeys.get_or_insert(&vote_pubkey)), + 3 + ); } #[test] @@ -436,9 +434,9 @@ mod test { .collect(); let mut stats = PropagatedStats::default(); - let mut all_pubkeys = HashSet::new(); + let mut all_pubkeys = PubkeyReferences::default(); let mut node_pubkey = Pubkey::new_rand(); - all_pubkeys.insert(Rc::new(node_pubkey.clone())); + all_pubkeys.get_or_insert(&node_pubkey); // Add a vote pubkey, the number of references in all_pubkeys // should be 2 @@ -453,7 +451,10 @@ mod test { stats.propagated_validators_stake, staked_vote_accounts as u64 ); - assert_eq!(Rc::strong_count(all_pubkeys.get(&node_pubkey).unwrap()), 2); + assert_eq!( + Rc::strong_count(&all_pubkeys.get_or_insert(&node_pubkey)), + 3 + ); // Adding it again should not change any state stats.add_node_pubkey_internal( @@ -482,7 +483,10 @@ mod test { stats.propagated_validators_stake, staked_vote_accounts as u64 ); - assert_eq!(Rc::strong_count(all_pubkeys.get(&node_pubkey).unwrap()), 2); + assert_eq!( + Rc::strong_count(&all_pubkeys.get_or_insert(&node_pubkey)), + 3 + ); // Addding another pubkey with different vote accounts should succeed // and increase stake @@ -506,7 +510,10 @@ mod test { stats.propagated_validators_stake, 2 * staked_vote_accounts as u64 ); - assert_eq!(Rc::strong_count(all_pubkeys.get(&node_pubkey).unwrap()), 2); + assert_eq!( + Rc::strong_count(&all_pubkeys.get_or_insert(&node_pubkey)), + 3 + ); } #[test] diff --git a/core/src/pubkey_references.rs b/core/src/pubkey_references.rs new file mode 100644 index 00000000000000..f4569153896ba3 --- /dev/null +++ b/core/src/pubkey_references.rs @@ -0,0 +1,21 @@ +use solana_sdk::pubkey::Pubkey; +use std::{collections::HashSet, rc::Rc}; + +#[derive(Default)] +pub struct PubkeyReferences(HashSet>); + +impl PubkeyReferences { + pub fn get_or_insert(&mut self, pubkey: &Pubkey) -> Rc { + let mut cached_pubkey: Option> = self.0.get(pubkey).cloned(); + if cached_pubkey.is_none() { + let new_pubkey = Rc::new(*pubkey); + self.0.insert(new_pubkey.clone()); + cached_pubkey = Some(new_pubkey); + } + cached_pubkey.unwrap() + } + + pub fn purge(&mut self) { + self.0.retain(|x| Rc::strong_count(x) > 1); + } +} diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 38e1f1a29b7011..83aa055a7bd32f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -9,6 +9,7 @@ use crate::{ consensus::{StakeLockout, Tower}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, progress_map::{ForkProgress, ForkStats, ProgressMap, PropagatedStats}, + pubkey_references::PubkeyReferences, repair_service::DuplicateSlotsResetReceiver, result::Result, rewards_recorder_service::RewardsRecorderSender, @@ -41,7 +42,6 @@ use solana_vote_program::{ use std::{ collections::{HashMap, HashSet}, ops::Deref, - rc::Rc, result, sync::{ atomic::{AtomicBool, Ordering}, @@ -184,7 +184,7 @@ impl ReplayStage { let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { - let mut all_pubkeys: HashSet> = HashSet::new(); + let mut all_pubkeys = PubkeyReferences::default(); let verify_recyclers = VerifyRecyclers::default(); let _exit = Finalizer::new(exit.clone()); let mut progress = ProgressMap::default(); @@ -856,7 +856,7 @@ impl ReplayStage { lockouts_sender: &Sender, accounts_hash_sender: &Option, latest_root_senders: &[Sender], - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, subscriptions: &Arc, block_commitment_cache: &Arc>, ) -> Result<()> { @@ -1144,7 +1144,7 @@ impl ReplayStage { vote_tracker: &VoteTracker, cluster_slots: &ClusterSlots, bank_forks: &RwLock, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, ) -> Vec { frozen_banks.sort_by_key(|bank| bank.slot()); let mut new_stats = vec![]; @@ -1165,11 +1165,13 @@ impl ReplayStage { .expect("All frozen banks must exist in the Progress map"); if !stats.computed { - let (stake_lockouts, total_staked, bank_weight) = tower.collect_vote_lockouts( - bank_slot, - bank.vote_accounts().into_iter(), - &ancestors, - ); + let (stake_lockouts, total_staked, bank_weight, lockout_intervals) = tower + .collect_vote_lockouts( + bank_slot, + bank.vote_accounts().into_iter(), + &ancestors, + all_pubkeys, + ); stats.total_staked = total_staked; stats.weight = bank_weight; stats.fork_weight = stats.weight + parent_weight; @@ -1189,6 +1191,7 @@ impl ReplayStage { bank.parent().map(|b| b.slot()).unwrap_or(0) ); stats.stake_lockouts = stake_lockouts; + stats.lockout_intervals = lockout_intervals; stats.block_height = bank.block_height(); stats.computed = true; new_stats.push(bank_slot); @@ -1223,7 +1226,7 @@ impl ReplayStage { fn update_propagation_status( progress: &mut ProgressMap, slot: Slot, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, bank_forks: &RwLock, vote_tracker: &VoteTracker, cluster_slots: &ClusterSlots, @@ -1492,7 +1495,7 @@ impl ReplayStage { mut cluster_slot_pubkeys: Vec>, fork_tip: Slot, bank_forks: &RwLock, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, ) { let mut current_leader_slot = progress.get_latest_leader_slot(fork_tip); let mut did_newly_reach_threshold = false; @@ -1552,7 +1555,7 @@ impl ReplayStage { cluster_slot_pubkeys: &mut Vec>, leader_bank: &Bank, leader_propagated_stats: &mut PropagatedStats, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, did_child_reach_threshold: bool, ) -> bool { // Track whether this slot newly confirm propagation @@ -1655,7 +1658,7 @@ impl ReplayStage { bank_forks: &RwLock, progress: &mut ProgressMap, accounts_hash_sender: &Option, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, largest_confirmed_root: Option, ) { let old_epoch = bank_forks.read().unwrap().root_bank().epoch(); @@ -1667,7 +1670,7 @@ impl ReplayStage { let r_bank_forks = bank_forks.read().unwrap(); let new_epoch = bank_forks.read().unwrap().root_bank().epoch(); if old_epoch != new_epoch { - all_pubkeys.retain(|x| Rc::strong_count(x) > 1); + all_pubkeys.purge(); } progress.handle_new_root(&r_bank_forks); } @@ -1679,7 +1682,7 @@ impl ReplayStage { subscriptions: &Arc, rewards_recorder_sender: Option, progress: &mut ProgressMap, - all_pubkeys: &mut HashSet>, + all_pubkeys: &mut PubkeyReferences, ) { // Find the next slot that chains to the old slot let forks = bank_forks.read().unwrap(); @@ -1823,6 +1826,7 @@ pub(crate) mod tests { use std::{ fs::remove_dir_all, iter, + rc::Rc, sync::{Arc, RwLock}, }; use trees::tr; @@ -2012,7 +2016,7 @@ pub(crate) mod tests { let bank_fork_ancestors = bank_forks.ancestors(); let wrapped_bank_fork = Arc::new(RwLock::new(bank_forks)); - let mut all_pubkeys = HashSet::new(); + let mut all_pubkeys = PubkeyReferences::default(); (0..validators.len()) .map(|i| { let mut frozen_banks: Vec<_> = wrapped_bank_fork @@ -2200,7 +2204,7 @@ pub(crate) mod tests { &subscriptions, None, &mut progress, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); assert!(bank_forks .read() @@ -2224,7 +2228,7 @@ pub(crate) mod tests { &subscriptions, None, &mut progress, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); assert!(bank_forks .read() @@ -2278,7 +2282,7 @@ pub(crate) mod tests { &bank_forks, &mut progress, &None, - &mut HashSet::new(), + &mut PubkeyReferences::default(), None, ); assert_eq!(bank_forks.read().unwrap().root(), root); @@ -2321,7 +2325,7 @@ pub(crate) mod tests { &bank_forks, &mut progress, &None, - &mut HashSet::new(), + &mut PubkeyReferences::default(), Some(confirmed_root), ); assert_eq!(bank_forks.read().unwrap().root(), root); @@ -2875,7 +2879,7 @@ pub(crate) mod tests { &VoteTracker::default(), &ClusterSlots::default(), &bank_forks, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); assert_eq!(newly_computed, vec![0]); // The only vote is in bank 1, and bank_forks does not currently contain @@ -2916,7 +2920,7 @@ pub(crate) mod tests { &VoteTracker::default(), &ClusterSlots::default(), &bank_forks, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); assert_eq!(newly_computed, vec![1]); @@ -2949,7 +2953,7 @@ pub(crate) mod tests { &VoteTracker::default(), &ClusterSlots::default(), &bank_forks, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); // No new stats should have been computed assert!(newly_computed.is_empty()); @@ -2984,7 +2988,7 @@ pub(crate) mod tests { &VoteTracker::default(), &ClusterSlots::default(), &vote_simulator.bank_forks, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); assert_eq!( @@ -3045,7 +3049,7 @@ pub(crate) mod tests { &VoteTracker::default(), &ClusterSlots::default(), &vote_simulator.bank_forks, - &mut HashSet::new(), + &mut PubkeyReferences::default(), ); frozen_banks.sort_by_key(|bank| bank.slot()); @@ -3166,7 +3170,7 @@ pub(crate) mod tests { ..PropagatedStats::default() }; - let mut all_pubkeys = HashSet::new(); + let mut all_pubkeys = PubkeyReferences::default(); let child_reached_threshold = false; for i in 0..std::cmp::max(new_vote_pubkeys.len(), new_node_pubkeys.len()) { propagated_stats.is_propagated = false; @@ -3239,7 +3243,7 @@ pub(crate) mod tests { ..PropagatedStats::default() }; propagated_stats.total_epoch_stake = stake * 10; - let mut all_pubkeys = HashSet::new(); + let mut all_pubkeys = PubkeyReferences::default(); let child_reached_threshold = true; let mut newly_voted_pubkeys: Vec> = vec![]; @@ -3259,7 +3263,7 @@ pub(crate) mod tests { ..PropagatedStats::default() }; propagated_stats.is_propagated = true; - all_pubkeys = HashSet::new(); + all_pubkeys = PubkeyReferences::default(); newly_voted_pubkeys = vec![]; assert!(!ReplayStage::update_slot_propagated_threshold_from_votes( &mut newly_voted_pubkeys, @@ -3343,7 +3347,7 @@ pub(crate) mod tests { ReplayStage::update_propagation_status( &mut progress_map, 10, - &mut HashSet::new(), + &mut PubkeyReferences::default(), &RwLock::new(bank_forks), &vote_tracker, &ClusterSlots::default(), @@ -3436,7 +3440,7 @@ pub(crate) mod tests { ReplayStage::update_propagation_status( &mut progress_map, 10, - &mut HashSet::new(), + &mut PubkeyReferences::default(), &RwLock::new(bank_forks), &vote_tracker, &ClusterSlots::default(), @@ -3526,7 +3530,7 @@ pub(crate) mod tests { ReplayStage::update_propagation_status( &mut progress_map, 10, - &mut HashSet::new(), + &mut PubkeyReferences::default(), &RwLock::new(bank_forks), &vote_tracker, &ClusterSlots::default(),