diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0cf3cace2f82c0..9263163ee3ced5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -430,7 +430,10 @@ impl BankingStage { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); // Keeps track of extraneous vote transactions for the vote threads - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = { + let bank = bank_forks.read().unwrap().working_bank(); + Arc::new(LatestUnprocessedVotes::new(&bank)) + }; let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let committer = Committer::new( @@ -512,7 +515,10 @@ impl BankingStage { // Once an entry has been recorded, its blockhash is registered with the bank. let data_budget = Arc::new(DataBudget::default()); // Keeps track of extraneous vote transactions for the vote threads - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = { + let bank = bank_forks.read().unwrap().working_bank(); + Arc::new(LatestUnprocessedVotes::new(&bank)) + }; let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let committer = Committer::new( diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index acb34b8b4dc1e9..7a8dd4afd1db3b 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -103,6 +103,9 @@ impl Forwarder { // load all accounts from address loader; let current_bank = self.bank_forks.read().unwrap().working_bank(); + // if we have crossed an epoch boundary, recache any state + unprocessed_transaction_storage.cache_epoch_boundary_info(¤t_bank); + // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something // already processed), then add to forwarding buffer. let filter_forwarding_result = unprocessed_transaction_storage diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index 6e9f20441d9efb..fc6036bee67701 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -9,6 +9,7 @@ use { solana_runtime::bank::Bank, solana_sdk::{ clock::{Slot, UnixTimestamp}, + feature_set::{self}, program_utils::limited_deserialize, pubkey::Pubkey, }, @@ -18,7 +19,7 @@ use { collections::HashMap, ops::DerefMut, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, }, @@ -42,18 +43,23 @@ pub struct LatestValidatorVotePacket { } impl LatestValidatorVotePacket { - pub fn new(packet: Packet, vote_source: VoteSource) -> Result { + pub fn new( + packet: Packet, + vote_source: VoteSource, + deprecate_legacy_vote_ixs: bool, + ) -> Result { if !packet.meta().is_simple_vote_tx() { return Err(DeserializedPacketError::VoteTransactionError); } let vote = Arc::new(ImmutableDeserializedPacket::new(packet)?); - Self::new_from_immutable(vote, vote_source) + Self::new_from_immutable(vote, vote_source, deprecate_legacy_vote_ixs) } pub fn new_from_immutable( vote: Arc, vote_source: VoteSource, + deprecate_legacy_vote_ixs: bool, ) -> Result { let message = vote.transaction().get_message(); let (_, instruction) = message @@ -61,9 +67,20 @@ impl LatestValidatorVotePacket { .next() .ok_or(DeserializedPacketError::VoteTransactionError)?; + let instruction_filter = |ix: &VoteInstruction| { + if deprecate_legacy_vote_ixs { + matches!( + ix, + VoteInstruction::TowerSync(_) | VoteInstruction::TowerSyncSwitch(_, _), + ) + } else { + ix.is_single_vote_state_update() + } + }; + match limited_deserialize::(&instruction.data) { Ok(vote_state_update_instruction) - if vote_state_update_instruction.is_single_vote_state_update() => + if instruction_filter(&vote_state_update_instruction) => { let &pubkey = message .message @@ -116,29 +133,6 @@ impl LatestValidatorVotePacket { } } -// TODO: replace this with rand::seq::index::sample_weighted once we can update rand to 0.8+ -// This requires updating dependencies of ed25519-dalek as rand_core is not compatible cross -// version https://github.com/dalek-cryptography/ed25519-dalek/pull/214 -pub(crate) fn weighted_random_order_by_stake<'a>( - bank: &Bank, - pubkeys: impl Iterator, -) -> impl Iterator { - // Efraimidis and Spirakis algo for weighted random sample without replacement - let staked_nodes = bank.staked_nodes(); - let mut pubkey_with_weight: Vec<(f64, Pubkey)> = pubkeys - .filter_map(|&pubkey| { - let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); - if stake == 0 { - None // Ignore votes from unstaked validators - } else { - Some((thread_rng().gen::().powf(1.0 / (stake as f64)), pubkey)) - } - }) - .collect::>(); - pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap()); - pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) -} - #[derive(Default, Debug)] pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_gossip: usize, @@ -149,11 +143,23 @@ pub(crate) struct VoteBatchInsertionMetrics { pub struct LatestUnprocessedVotes { latest_votes_per_pubkey: RwLock>>>, num_unprocessed_votes: AtomicUsize, + // These are only ever written to by the tpu vote thread + cached_staked_nodes: RwLock>>, + deprecate_legacy_vote_ixs: AtomicBool, + current_epoch: AtomicU64, } impl LatestUnprocessedVotes { - pub fn new() -> Self { - Self::default() + pub fn new(bank: &Bank) -> Self { + let deprecate_legacy_vote_ixs = bank + .feature_set + .is_active(&feature_set::deprecate_legacy_vote_ixs::id()); + Self { + cached_staked_nodes: RwLock::new(bank.current_epoch_staked_nodes().clone()), + current_epoch: AtomicU64::new(bank.epoch()), + deprecate_legacy_vote_ixs: AtomicBool::new(deprecate_legacy_vote_ixs), + ..Self::default() + } } pub fn len(&self) -> usize { @@ -164,6 +170,17 @@ impl LatestUnprocessedVotes { self.len() == 0 } + fn filter_unstaked_votes<'a>( + &'a self, + votes: impl Iterator + 'a, + ) -> impl Iterator + 'a { + let staked_nodes = self.cached_staked_nodes.read().unwrap(); + votes.filter(move |vote| { + let stake = staked_nodes.get(&vote.pubkey()).copied().unwrap_or(0); + stake > 0 + }) + } + pub(crate) fn insert_batch( &self, votes: impl Iterator, @@ -172,7 +189,7 @@ impl LatestUnprocessedVotes { let mut num_dropped_gossip = 0; let mut num_dropped_tpu = 0; - for vote in votes { + for vote in self.filter_unstaked_votes(votes) { if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) { match vote.vote_source { VoteSource::Gossip => num_dropped_gossip += 1, @@ -283,6 +300,48 @@ impl LatestUnprocessedVotes { .and_then(|l| l.read().unwrap().timestamp()) } + #[cfg(test)] + pub(crate) fn set_staked_nodes(&self, staked_nodes: &[Pubkey]) { + let staked_nodes: HashMap = + staked_nodes.iter().map(|pk| (*pk, 1u64)).collect(); + *self.cached_staked_nodes.write().unwrap() = Arc::new(staked_nodes); + } + + fn weighted_random_order_by_stake(&self) -> impl Iterator { + // Efraimidis and Spirakis algo for weighted random sample without replacement + let staked_nodes = self.cached_staked_nodes.read().unwrap(); + let latest_votes_per_pubkey = self.latest_votes_per_pubkey.read().unwrap(); + let mut pubkey_with_weight: Vec<(f64, Pubkey)> = latest_votes_per_pubkey + .keys() + .filter_map(|&pubkey| { + let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); + if stake == 0 { + None // Ignore votes from unstaked validators + } else { + Some((thread_rng().gen::().powf(1.0 / (stake as f64)), pubkey)) + } + }) + .collect::>(); + pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap()); + pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) + } + + /// Recache the staked nodes based on a bank from the new epoch. + /// This should only be run by the TPU vote thread + pub(super) fn cache_epoch_boundary_info(&self, bank: &Bank) { + if bank.epoch() <= self.current_epoch.load(Ordering::Relaxed) { + return; + } + let mut staked_nodes = self.cached_staked_nodes.write().unwrap(); + *staked_nodes = bank.current_epoch_staked_nodes().clone(); + self.current_epoch.store(bank.epoch(), Ordering::Relaxed); + self.deprecate_legacy_vote_ixs.store( + bank.feature_set + .is_active(&feature_set::deprecate_legacy_vote_ixs::id()), + Ordering::Relaxed, + ); + } + /// Returns how many packets were forwardable /// Performs a weighted random order based on stake and stops forwarding at the first error /// Votes from validators with 0 stakes are ignored @@ -292,11 +351,7 @@ impl LatestUnprocessedVotes { forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> usize { let mut continue_forwarding = true; - let pubkeys_by_stake = weighted_random_order_by_stake( - &bank, - self.latest_votes_per_pubkey.read().unwrap().keys(), - ) - .collect_vec(); + let pubkeys_by_stake = self.weighted_random_order_by_stake(); pubkeys_by_stake .into_iter() .filter(|&pubkey| { @@ -337,14 +392,8 @@ impl LatestUnprocessedVotes { } /// Drains all votes yet to be processed sorted by a weighted random ordering by stake - pub fn drain_unprocessed(&self, bank: Arc) -> Vec> { - let pubkeys_by_stake = weighted_random_order_by_stake( - &bank, - self.latest_votes_per_pubkey.read().unwrap().keys(), - ) - .collect_vec(); - pubkeys_by_stake - .into_iter() + pub fn drain_unprocessed(&self, _bank: Arc) -> Vec> { + self.weighted_random_order_by_stake() .filter_map(|pubkey| { self.get_entry(pubkey).and_then(|lock| { let mut latest_vote = lock.write().unwrap(); @@ -372,6 +421,10 @@ impl LatestUnprocessedVotes { } }); } + + pub(super) fn should_deprecate_legacy_vote_ixs(&self) -> bool { + self.deprecate_legacy_vote_ixs.load(Ordering::Relaxed) + } } #[cfg(test)] @@ -385,7 +438,10 @@ mod tests { bank::Bank, genesis_utils::{self, ValidatorVoteKeypairs}, }, - solana_sdk::{hash::Hash, signature::Signer, system_transaction::transfer}, + solana_sdk::{ + epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::GenesisConfig, hash::Hash, + signature::Signer, system_transaction::transfer, + }, solana_vote_program::{ vote_state::TowerSync, vote_transaction::{new_tower_sync_transaction, new_vote_transaction}, @@ -414,7 +470,7 @@ mod tests { .meta_mut() .flags .set(PacketFlags::SIMPLE_VOTE_TX, true); - LatestValidatorVotePacket::new(packet, vote_source).unwrap() + LatestValidatorVotePacket::new(packet, vote_source, true).unwrap() } fn deserialize_packets<'a>( @@ -423,7 +479,8 @@ mod tests { vote_source: VoteSource, ) -> impl Iterator + 'a { packet_indexes.iter().filter_map(move |packet_index| { - LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok() + LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source, true) + .ok() }) } @@ -540,9 +597,13 @@ mod tests { #[test] fn test_update_latest_vote() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); + latest_unprocessed_votes.set_staked_nodes(&[ + keypair_a.node_keypair.pubkey(), + keypair_b.node_keypair.pubkey(), + ]); let vote_a = from_slots(vec![(0, 2), (1, 1)], VoteSource::Gossip, &keypair_a, None); let vote_b = from_slots( @@ -743,7 +804,7 @@ mod tests { fn test_update_latest_vote_race() { // There was a race condition in updating the same pubkey in the hashmap // when the entry does not initially exist. - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); const NUM_VOTES: usize = 100; let keypairs = Arc::new( @@ -751,6 +812,11 @@ mod tests { .map(|_| ValidatorVoteKeypairs::new_rand()) .collect_vec(), ); + let staked_nodes = keypairs + .iter() + .map(|kp| kp.node_keypair.pubkey()) + .collect_vec(); + latest_unprocessed_votes.set_staked_nodes(&staked_nodes); // Insert votes in parallel let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, @@ -782,7 +848,7 @@ mod tests { #[test] fn test_simulate_threads() { - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); let keypairs = Arc::new( (0..10) @@ -790,6 +856,11 @@ mod tests { .collect_vec(), ); let keypairs_tpu = keypairs.clone(); + let staked_nodes = keypairs + .iter() + .map(|kp| kp.node_keypair.pubkey()) + .collect_vec(); + latest_unprocessed_votes.set_staked_nodes(&staked_nodes); let vote_limit = 1000; let gossip = Builder::new() @@ -845,8 +916,15 @@ mod tests { #[test] fn test_forwardable_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); - let bank = Arc::new(Bank::default_for_tests()); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 1); + let bank = Arc::new(bank); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); @@ -858,7 +936,8 @@ mod tests { latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); - // Don't forward 0 stake accounts + // Recache on epoch boundary and don't forward 0 stake accounts + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes .get_and_insert_forwardable_packets(bank, &mut forward_packet_batches_by_accounts); assert_eq!(0, forwarded); @@ -876,11 +955,17 @@ mod tests { 200, ) .genesis_config; - let bank = Bank::new_for_tests(&config); + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 2 * MINIMUM_SLOTS_PER_EPOCH, + ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); // Don't forward votes from gossip + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets( Arc::new(bank), &mut forward_packet_batches_by_accounts, @@ -901,11 +986,17 @@ mod tests { 200, ) .genesis_config; - let bank = Arc::new(Bank::new_for_tests(&config)); + let bank_0 = Bank::new_for_tests(&config); + let bank = Arc::new(Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 3 * MINIMUM_SLOTS_PER_EPOCH, + )); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); // Forward from TPU + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets( bank.clone(), &mut forward_packet_batches_by_accounts, @@ -938,11 +1029,17 @@ mod tests { #[test] fn test_clear_forwarded_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); let keypair_c = ValidatorVoteKeypairs::new_rand(); let keypair_d = ValidatorVoteKeypairs::new_rand(); + latest_unprocessed_votes.set_staked_nodes(&[ + keypair_a.node_keypair.pubkey(), + keypair_b.node_keypair.pubkey(), + keypair_c.node_keypair.pubkey(), + keypair_d.node_keypair.pubkey(), + ]); let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); let mut vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); @@ -976,4 +1073,97 @@ mod tests { latest_unprocessed_votes.get_latest_vote_slot(keypair_d.node_keypair.pubkey()) ); } + + #[test] + fn test_insert_batch_unstaked() { + let keypair_a = ValidatorVoteKeypairs::new_rand(); + let keypair_b = ValidatorVoteKeypairs::new_rand(); + let keypair_c = ValidatorVoteKeypairs::new_rand(); + let keypair_d = ValidatorVoteKeypairs::new_rand(); + + let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); + let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); + let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None); + let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None); + let votes = [ + vote_a.clone(), + vote_b.clone(), + vote_c.clone(), + vote_d.clone(), + ] + .into_iter(); + + let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); + let latest_unprocessed_votes = LatestUnprocessedVotes::new(&bank_0); + + // Insert batch should filter out all votes as they are unstaked + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert!(latest_unprocessed_votes.is_empty()); + + // Bank in same epoch should not update stakes + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_a.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH - 1, + ); + assert_eq!(bank.epoch(), 0); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert!(latest_unprocessed_votes.is_empty()); + + // Bank in next epoch should update stakes + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_b.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 1); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert_eq!(latest_unprocessed_votes.len(), 1); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + Some(vote_b.slot()) + ); + + // Previously unstaked votes are not (yet) removed + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_c.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 3 * MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 2); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert_eq!(latest_unprocessed_votes.len(), 2); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + Some(vote_b.slot()) + ); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()), + Some(vote_c.slot()) + ); + } } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 7a529bd457856c..7c019993aed012 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -412,6 +412,13 @@ impl UnprocessedTransactionStorage { ), } } + + pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + match self { + Self::LocalTransactionStorage(_) => (), + Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank), + } + } } impl VoteStorage { @@ -449,6 +456,8 @@ impl VoteStorage { LatestValidatorVotePacket::new_from_immutable( Arc::new(deserialized_packet), self.vote_source, + self.latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(), ) .ok() }), @@ -512,6 +521,10 @@ impl VoteStorage { should_process_packet, ); + let deprecate_legacy_vote_ixs = self + .latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(); + while let Some((packets, payload)) = scanner.iterate() { let vote_packets = packets.iter().map(|p| (*p).clone()).collect_vec(); @@ -521,6 +534,7 @@ impl VoteStorage { LatestValidatorVotePacket::new_from_immutable( vote_packets[*i].clone(), self.vote_source, + deprecate_legacy_vote_ixs, ) .ok() }), @@ -529,7 +543,12 @@ impl VoteStorage { } else { self.latest_unprocessed_votes.insert_batch( vote_packets.into_iter().filter_map(|packet| { - LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok() + LatestValidatorVotePacket::new_from_immutable( + packet, + self.vote_source, + deprecate_legacy_vote_ixs, + ) + .ok() }), true, // should_replenish_taken_votes ); @@ -538,6 +557,14 @@ impl VoteStorage { scanner.finalize().payload.reached_end_of_slot } + + fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + if matches!(self.vote_source, VoteSource::Gossip) { + panic!("Gossip vote thread should not be checking epoch boundary"); + } + self.latest_unprocessed_votes + .cache_epoch_boundary_info(bank); + } } impl ThreadLocalUnprocessedPackets { @@ -1255,9 +1282,16 @@ mod tests { assert!(deserialized_packets.contains(&big_transfer)); } - for vote_source in [VoteSource::Gossip, VoteSource::Tpu] { + for (vote_source, staked) in [VoteSource::Gossip, VoteSource::Tpu] + .into_iter() + .flat_map(|vs| [(vs, true), (vs, false)]) + { + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + if staked { + latest_unprocessed_votes.set_staked_nodes(&[keypair.pubkey()]); + } let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(LatestUnprocessedVotes::new()), + Arc::new(latest_unprocessed_votes), vote_source, ); transaction_storage.insert_batch(vec![ @@ -1265,7 +1299,7 @@ mod tests { ImmutableDeserializedPacket::new(vote.clone())?, ImmutableDeserializedPacket::new(big_transfer.clone())?, ]); - assert_eq!(1, transaction_storage.len()); + assert_eq!(if staked { 1 } else { 0 }, transaction_storage.len()); } Ok(()) } @@ -1291,8 +1325,10 @@ mod tests { )?; vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + latest_unprocessed_votes.set_staked_nodes(&[node_keypair.pubkey()]); let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(LatestUnprocessedVotes::new()), + Arc::new(latest_unprocessed_votes), VoteSource::Tpu, ); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 4efac319401a1c..e00bf939a30022 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -6108,6 +6108,15 @@ impl Bank { Some(vote_account.clone()) } + /// Get the EpochStakes for the current Bank::epoch + pub fn current_epoch_stakes(&self) -> &EpochStakes { + // The stakes for a given epoch (E) in self.epoch_stakes are keyed by leader schedule epoch + // (E + 1) so the stakes for the current epoch are stored at self.epoch_stakes[E + 1] + self.epoch_stakes + .get(&self.epoch.saturating_add(1)) + .expect("Current epoch stakes must exist") + } + /// Get the EpochStakes for a given epoch pub fn epoch_stakes(&self, epoch: Epoch) -> Option<&EpochStakes> { self.epoch_stakes.get(&epoch) @@ -6117,6 +6126,11 @@ impl Bank { &self.epoch_stakes } + /// Get the staked nodes map for the current Bank::epoch + pub fn current_epoch_staked_nodes(&self) -> Arc> { + self.current_epoch_stakes().stakes().staked_nodes() + } + pub fn epoch_staked_nodes(&self, epoch: Epoch) -> Option>> { Some(self.epoch_stakes.get(&epoch)?.stakes().staked_nodes()) } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index ffcaae65eb5857..6c906a9f7120e9 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -853,6 +853,10 @@ pub mod enable_turbine_extended_fanout_experiments { solana_sdk::declare_id!("BZn14Liea52wtBwrXUxTv6vojuTTmfc7XGEDTXrvMD7b"); } +pub mod deprecate_legacy_vote_ixs { + solana_program::declare_id!("mustrekeyysGrhwdiwU42tCadZL8GcBb1i2GYhMopQv"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -1061,6 +1065,7 @@ lazy_static! { (vote_only_retransmitter_signed_fec_sets::id(), "vote only on retransmitter signed fec sets"), (partitioned_epoch_rewards_superfeature::id(), "replaces enable_partitioned_epoch_reward to enable partitioned rewards at epoch boundary SIMD-0118"), (enable_turbine_extended_fanout_experiments::id(), "enable turbine extended fanout experiments #2373"), + (deprecate_legacy_vote_ixs::id(), "Deprecate legacy vote instructions"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()