From 7a50e3e195c1f79565a4fe6ade183dc9ec2195a6 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 11 Sep 2024 15:28:06 -0400 Subject: [PATCH 1/3] banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888) * banking_stage: do not insert legacy vote ixs, refactor & unstaked * pr feedback: use matches instead of separate fn (cherry picked from commit 1334fb5248390dfdd193feeec5fa8f86763668fc) # Conflicts: # core/src/banking_stage/latest_unprocessed_votes.rs --- core/src/banking_stage.rs | 10 +- core/src/banking_stage/forwarder.rs | 3 + .../banking_stage/latest_unprocessed_votes.rs | 332 +++++++++++++++++- .../unprocessed_transaction_storage.rs | 46 ++- 4 files changed, 367 insertions(+), 24 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0cf3cace2f82c0..9263163ee3ced5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -430,7 +430,10 @@ impl BankingStage { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); // Keeps track of extraneous vote transactions for the vote threads - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = { + let bank = bank_forks.read().unwrap().working_bank(); + Arc::new(LatestUnprocessedVotes::new(&bank)) + }; let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let committer = Committer::new( @@ -512,7 +515,10 @@ impl BankingStage { // Once an entry has been recorded, its blockhash is registered with the bank. let data_budget = Arc::new(DataBudget::default()); // Keeps track of extraneous vote transactions for the vote threads - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = { + let bank = bank_forks.read().unwrap().working_bank(); + Arc::new(LatestUnprocessedVotes::new(&bank)) + }; let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let committer = Committer::new( diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index acb34b8b4dc1e9..7a8dd4afd1db3b 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -103,6 +103,9 @@ impl Forwarder { // load all accounts from address loader; let current_bank = self.bank_forks.read().unwrap().working_bank(); + // if we have crossed an epoch boundary, recache any state + unprocessed_transaction_storage.cache_epoch_boundary_info(¤t_bank); + // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something // already processed), then add to forwarding buffer. let filter_forwarding_result = unprocessed_transaction_storage diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index 6e9f20441d9efb..29a42198da69cc 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -9,6 +9,11 @@ use { solana_runtime::bank::Bank, solana_sdk::{ clock::{Slot, UnixTimestamp}, +<<<<<<< HEAD +======= + feature_set::{self}, + hash::Hash, +>>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) program_utils::limited_deserialize, pubkey::Pubkey, }, @@ -18,7 +23,7 @@ use { collections::HashMap, ops::DerefMut, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, }, @@ -42,18 +47,23 @@ pub struct LatestValidatorVotePacket { } impl LatestValidatorVotePacket { - pub fn new(packet: Packet, vote_source: VoteSource) -> Result { + pub fn new( + packet: Packet, + vote_source: VoteSource, + deprecate_legacy_vote_ixs: bool, + ) -> Result { if !packet.meta().is_simple_vote_tx() { return Err(DeserializedPacketError::VoteTransactionError); } let vote = Arc::new(ImmutableDeserializedPacket::new(packet)?); - Self::new_from_immutable(vote, vote_source) + Self::new_from_immutable(vote, vote_source, deprecate_legacy_vote_ixs) } pub fn new_from_immutable( vote: Arc, vote_source: VoteSource, + deprecate_legacy_vote_ixs: bool, ) -> Result { let message = vote.transaction().get_message(); let (_, instruction) = message @@ -61,9 +71,20 @@ impl LatestValidatorVotePacket { .next() .ok_or(DeserializedPacketError::VoteTransactionError)?; + let instruction_filter = |ix: &VoteInstruction| { + if deprecate_legacy_vote_ixs { + matches!( + ix, + VoteInstruction::TowerSync(_) | VoteInstruction::TowerSyncSwitch(_, _), + ) + } else { + ix.is_single_vote_state_update() + } + }; + match limited_deserialize::(&instruction.data) { Ok(vote_state_update_instruction) - if vote_state_update_instruction.is_single_vote_state_update() => + if instruction_filter(&vote_state_update_instruction) => { let &pubkey = message .message @@ -116,6 +137,7 @@ impl LatestValidatorVotePacket { } } +<<<<<<< HEAD // TODO: replace this with rand::seq::index::sample_weighted once we can update rand to 0.8+ // This requires updating dependencies of ed25519-dalek as rand_core is not compatible cross // version https://github.com/dalek-cryptography/ed25519-dalek/pull/214 @@ -139,6 +161,8 @@ pub(crate) fn weighted_random_order_by_stake<'a>( pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) } +======= +>>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) #[derive(Default, Debug)] pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_gossip: usize, @@ -149,11 +173,23 @@ pub(crate) struct VoteBatchInsertionMetrics { pub struct LatestUnprocessedVotes { latest_votes_per_pubkey: RwLock>>>, num_unprocessed_votes: AtomicUsize, + // These are only ever written to by the tpu vote thread + cached_staked_nodes: RwLock>>, + deprecate_legacy_vote_ixs: AtomicBool, + current_epoch: AtomicU64, } impl LatestUnprocessedVotes { - pub fn new() -> Self { - Self::default() + pub fn new(bank: &Bank) -> Self { + let deprecate_legacy_vote_ixs = bank + .feature_set + .is_active(&feature_set::deprecate_legacy_vote_ixs::id()); + Self { + cached_staked_nodes: RwLock::new(bank.current_epoch_staked_nodes().clone()), + current_epoch: AtomicU64::new(bank.epoch()), + deprecate_legacy_vote_ixs: AtomicBool::new(deprecate_legacy_vote_ixs), + ..Self::default() + } } pub fn len(&self) -> usize { @@ -164,6 +200,17 @@ impl LatestUnprocessedVotes { self.len() == 0 } + fn filter_unstaked_votes<'a>( + &'a self, + votes: impl Iterator + 'a, + ) -> impl Iterator + 'a { + let staked_nodes = self.cached_staked_nodes.read().unwrap(); + votes.filter(move |vote| { + let stake = staked_nodes.get(&vote.pubkey()).copied().unwrap_or(0); + stake > 0 + }) + } + pub(crate) fn insert_batch( &self, votes: impl Iterator, @@ -172,7 +219,7 @@ impl LatestUnprocessedVotes { let mut num_dropped_gossip = 0; let mut num_dropped_tpu = 0; - for vote in votes { + for vote in self.filter_unstaked_votes(votes) { if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) { match vote.vote_source { VoteSource::Gossip => num_dropped_gossip += 1, @@ -283,6 +330,48 @@ impl LatestUnprocessedVotes { .and_then(|l| l.read().unwrap().timestamp()) } + #[cfg(test)] + pub(crate) fn set_staked_nodes(&self, staked_nodes: &[Pubkey]) { + let staked_nodes: HashMap = + staked_nodes.iter().map(|pk| (*pk, 1u64)).collect(); + *self.cached_staked_nodes.write().unwrap() = Arc::new(staked_nodes); + } + + fn weighted_random_order_by_stake(&self) -> impl Iterator { + // Efraimidis and Spirakis algo for weighted random sample without replacement + let staked_nodes = self.cached_staked_nodes.read().unwrap(); + let latest_votes_per_pubkey = self.latest_votes_per_pubkey.read().unwrap(); + let mut pubkey_with_weight: Vec<(f64, Pubkey)> = latest_votes_per_pubkey + .keys() + .filter_map(|&pubkey| { + let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); + if stake == 0 { + None // Ignore votes from unstaked validators + } else { + Some((thread_rng().gen::().powf(1.0 / (stake as f64)), pubkey)) + } + }) + .collect::>(); + pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap()); + pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) + } + + /// Recache the staked nodes based on a bank from the new epoch. + /// This should only be run by the TPU vote thread + pub(super) fn cache_epoch_boundary_info(&self, bank: &Bank) { + if bank.epoch() <= self.current_epoch.load(Ordering::Relaxed) { + return; + } + let mut staked_nodes = self.cached_staked_nodes.write().unwrap(); + *staked_nodes = bank.current_epoch_staked_nodes().clone(); + self.current_epoch.store(bank.epoch(), Ordering::Relaxed); + self.deprecate_legacy_vote_ixs.store( + bank.feature_set + .is_active(&feature_set::deprecate_legacy_vote_ixs::id()), + Ordering::Relaxed, + ); + } + /// Returns how many packets were forwardable /// Performs a weighted random order based on stake and stops forwarding at the first error /// Votes from validators with 0 stakes are ignored @@ -291,6 +380,7 @@ impl LatestUnprocessedVotes { bank: Arc, forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> usize { +<<<<<<< HEAD let mut continue_forwarding = true; let pubkeys_by_stake = weighted_random_order_by_stake( &bank, @@ -334,10 +424,54 @@ impl LatestUnprocessedVotes { false }) .count() +======= + let pubkeys_by_stake = self.weighted_random_order_by_stake(); + let mut forwarded_count: usize = 0; + for pubkey in pubkeys_by_stake { + let Some(vote) = self.get_entry(pubkey) else { + continue; + }; + + let mut vote = vote.write().unwrap(); + if vote.is_vote_taken() || vote.is_forwarded() { + continue; + } + + let deserialized_vote_packet = vote.vote.as_ref().unwrap().clone(); + let Some(sanitized_vote_transaction) = deserialized_vote_packet + .build_sanitized_transaction( + bank.vote_only_bank(), + bank.as_ref(), + bank.get_reserved_account_keys(), + ) + else { + continue; + }; + + let forwarding_successful = forward_packet_batches_by_accounts.try_add_packet( + &sanitized_vote_transaction, + deserialized_vote_packet, + &bank.feature_set, + ); + + if !forwarding_successful { + // To match behavior of regular transactions we stop forwarding votes as soon as one + // fails. We are assuming that failure (try_add_packet) means no more space + // available. + break; + } + + vote.forwarded = true; + forwarded_count += 1; + } + + forwarded_count +>>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) } /// Drains all votes yet to be processed sorted by a weighted random ordering by stake pub fn drain_unprocessed(&self, bank: Arc) -> Vec> { +<<<<<<< HEAD let pubkeys_by_stake = weighted_random_order_by_stake( &bank, self.latest_votes_per_pubkey.read().unwrap().keys(), @@ -345,6 +479,20 @@ impl LatestUnprocessedVotes { .collect_vec(); pubkeys_by_stake .into_iter() +======= + let slot_hashes = bank + .get_account(&sysvar::slot_hashes::id()) + .and_then(|account| from_account::(&account)); + if slot_hashes.is_none() { + error!( + "Slot hashes sysvar doesn't exist on bank {}. Including all votes without \ + filtering", + bank.slot() + ); + } + + self.weighted_random_order_by_stake() +>>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) .filter_map(|pubkey| { self.get_entry(pubkey).and_then(|lock| { let mut latest_vote = lock.write().unwrap(); @@ -372,6 +520,10 @@ impl LatestUnprocessedVotes { } }); } + + pub(super) fn should_deprecate_legacy_vote_ixs(&self) -> bool { + self.deprecate_legacy_vote_ixs.load(Ordering::Relaxed) + } } #[cfg(test)] @@ -385,7 +537,10 @@ mod tests { bank::Bank, genesis_utils::{self, ValidatorVoteKeypairs}, }, - solana_sdk::{hash::Hash, signature::Signer, system_transaction::transfer}, + solana_sdk::{ + epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::GenesisConfig, hash::Hash, + signature::Signer, system_transaction::transfer, + }, solana_vote_program::{ vote_state::TowerSync, vote_transaction::{new_tower_sync_transaction, new_vote_transaction}, @@ -414,7 +569,7 @@ mod tests { .meta_mut() .flags .set(PacketFlags::SIMPLE_VOTE_TX, true); - LatestValidatorVotePacket::new(packet, vote_source).unwrap() + LatestValidatorVotePacket::new(packet, vote_source, true).unwrap() } fn deserialize_packets<'a>( @@ -423,7 +578,8 @@ mod tests { vote_source: VoteSource, ) -> impl Iterator + 'a { packet_indexes.iter().filter_map(move |packet_index| { - LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok() + LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source, true) + .ok() }) } @@ -540,9 +696,13 @@ mod tests { #[test] fn test_update_latest_vote() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); + latest_unprocessed_votes.set_staked_nodes(&[ + keypair_a.node_keypair.pubkey(), + keypair_b.node_keypair.pubkey(), + ]); let vote_a = from_slots(vec![(0, 2), (1, 1)], VoteSource::Gossip, &keypair_a, None); let vote_b = from_slots( @@ -743,7 +903,7 @@ mod tests { fn test_update_latest_vote_race() { // There was a race condition in updating the same pubkey in the hashmap // when the entry does not initially exist. - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); const NUM_VOTES: usize = 100; let keypairs = Arc::new( @@ -751,6 +911,11 @@ mod tests { .map(|_| ValidatorVoteKeypairs::new_rand()) .collect_vec(), ); + let staked_nodes = keypairs + .iter() + .map(|kp| kp.node_keypair.pubkey()) + .collect_vec(); + latest_unprocessed_votes.set_staked_nodes(&staked_nodes); // Insert votes in parallel let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, @@ -782,7 +947,7 @@ mod tests { #[test] fn test_simulate_threads() { - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); let keypairs = Arc::new( (0..10) @@ -790,6 +955,11 @@ mod tests { .collect_vec(), ); let keypairs_tpu = keypairs.clone(); + let staked_nodes = keypairs + .iter() + .map(|kp| kp.node_keypair.pubkey()) + .collect_vec(); + latest_unprocessed_votes.set_staked_nodes(&staked_nodes); let vote_limit = 1000; let gossip = Builder::new() @@ -845,8 +1015,24 @@ mod tests { #[test] fn test_forwardable_packets() { +<<<<<<< HEAD let latest_unprocessed_votes = LatestUnprocessedVotes::new(); let bank = Arc::new(Bank::default_for_tests()); +======= + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); + let mut bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 1); + bank.set_epoch_stakes_for_test( + bank.epoch().saturating_add(2), + EpochStakes::new_for_tests(HashMap::new(), bank.epoch().saturating_add(2)), + ); + let bank = Arc::new(bank); +>>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); @@ -858,7 +1044,8 @@ mod tests { latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); - // Don't forward 0 stake accounts + // Recache on epoch boundary and don't forward 0 stake accounts + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes .get_and_insert_forwardable_packets(bank, &mut forward_packet_batches_by_accounts); assert_eq!(0, forwarded); @@ -876,11 +1063,17 @@ mod tests { 200, ) .genesis_config; - let bank = Bank::new_for_tests(&config); + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 2 * MINIMUM_SLOTS_PER_EPOCH, + ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); // Don't forward votes from gossip + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets( Arc::new(bank), &mut forward_packet_batches_by_accounts, @@ -901,11 +1094,17 @@ mod tests { 200, ) .genesis_config; - let bank = Arc::new(Bank::new_for_tests(&config)); + let bank_0 = Bank::new_for_tests(&config); + let bank = Arc::new(Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 3 * MINIMUM_SLOTS_PER_EPOCH, + )); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); // Forward from TPU + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets( bank.clone(), &mut forward_packet_batches_by_accounts, @@ -938,11 +1137,17 @@ mod tests { #[test] fn test_clear_forwarded_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); let keypair_c = ValidatorVoteKeypairs::new_rand(); let keypair_d = ValidatorVoteKeypairs::new_rand(); + latest_unprocessed_votes.set_staked_nodes(&[ + keypair_a.node_keypair.pubkey(), + keypair_b.node_keypair.pubkey(), + keypair_c.node_keypair.pubkey(), + keypair_d.node_keypair.pubkey(), + ]); let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); let mut vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); @@ -976,4 +1181,97 @@ mod tests { latest_unprocessed_votes.get_latest_vote_slot(keypair_d.node_keypair.pubkey()) ); } + + #[test] + fn test_insert_batch_unstaked() { + let keypair_a = ValidatorVoteKeypairs::new_rand(); + let keypair_b = ValidatorVoteKeypairs::new_rand(); + let keypair_c = ValidatorVoteKeypairs::new_rand(); + let keypair_d = ValidatorVoteKeypairs::new_rand(); + + let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); + let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); + let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None); + let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None); + let votes = [ + vote_a.clone(), + vote_b.clone(), + vote_c.clone(), + vote_d.clone(), + ] + .into_iter(); + + let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); + let latest_unprocessed_votes = LatestUnprocessedVotes::new(&bank_0); + + // Insert batch should filter out all votes as they are unstaked + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert!(latest_unprocessed_votes.is_empty()); + + // Bank in same epoch should not update stakes + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_a.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH - 1, + ); + assert_eq!(bank.epoch(), 0); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert!(latest_unprocessed_votes.is_empty()); + + // Bank in next epoch should update stakes + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_b.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 1); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert_eq!(latest_unprocessed_votes.len(), 1); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + Some(vote_b.slot()) + ); + + // Previously unstaked votes are not (yet) removed + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_c.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 3 * MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 2); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert_eq!(latest_unprocessed_votes.len(), 2); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + Some(vote_b.slot()) + ); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()), + Some(vote_c.slot()) + ); + } } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 7a529bd457856c..7c019993aed012 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -412,6 +412,13 @@ impl UnprocessedTransactionStorage { ), } } + + pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + match self { + Self::LocalTransactionStorage(_) => (), + Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank), + } + } } impl VoteStorage { @@ -449,6 +456,8 @@ impl VoteStorage { LatestValidatorVotePacket::new_from_immutable( Arc::new(deserialized_packet), self.vote_source, + self.latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(), ) .ok() }), @@ -512,6 +521,10 @@ impl VoteStorage { should_process_packet, ); + let deprecate_legacy_vote_ixs = self + .latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(); + while let Some((packets, payload)) = scanner.iterate() { let vote_packets = packets.iter().map(|p| (*p).clone()).collect_vec(); @@ -521,6 +534,7 @@ impl VoteStorage { LatestValidatorVotePacket::new_from_immutable( vote_packets[*i].clone(), self.vote_source, + deprecate_legacy_vote_ixs, ) .ok() }), @@ -529,7 +543,12 @@ impl VoteStorage { } else { self.latest_unprocessed_votes.insert_batch( vote_packets.into_iter().filter_map(|packet| { - LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok() + LatestValidatorVotePacket::new_from_immutable( + packet, + self.vote_source, + deprecate_legacy_vote_ixs, + ) + .ok() }), true, // should_replenish_taken_votes ); @@ -538,6 +557,14 @@ impl VoteStorage { scanner.finalize().payload.reached_end_of_slot } + + fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + if matches!(self.vote_source, VoteSource::Gossip) { + panic!("Gossip vote thread should not be checking epoch boundary"); + } + self.latest_unprocessed_votes + .cache_epoch_boundary_info(bank); + } } impl ThreadLocalUnprocessedPackets { @@ -1255,9 +1282,16 @@ mod tests { assert!(deserialized_packets.contains(&big_transfer)); } - for vote_source in [VoteSource::Gossip, VoteSource::Tpu] { + for (vote_source, staked) in [VoteSource::Gossip, VoteSource::Tpu] + .into_iter() + .flat_map(|vs| [(vs, true), (vs, false)]) + { + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + if staked { + latest_unprocessed_votes.set_staked_nodes(&[keypair.pubkey()]); + } let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(LatestUnprocessedVotes::new()), + Arc::new(latest_unprocessed_votes), vote_source, ); transaction_storage.insert_batch(vec![ @@ -1265,7 +1299,7 @@ mod tests { ImmutableDeserializedPacket::new(vote.clone())?, ImmutableDeserializedPacket::new(big_transfer.clone())?, ]); - assert_eq!(1, transaction_storage.len()); + assert_eq!(if staked { 1 } else { 0 }, transaction_storage.len()); } Ok(()) } @@ -1291,8 +1325,10 @@ mod tests { )?; vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + latest_unprocessed_votes.set_staked_nodes(&[node_keypair.pubkey()]); let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(LatestUnprocessedVotes::new()), + Arc::new(latest_unprocessed_votes), VoteSource::Tpu, ); From 179c9463d3dc39039b2c97fcb1320a61697891c4 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 11 Sep 2024 19:56:16 +0000 Subject: [PATCH 2/3] fix conflicts --- .../banking_stage/latest_unprocessed_votes.rs | 114 +----------------- runtime/src/bank.rs | 14 +++ sdk/src/feature_set.rs | 5 + 3 files changed, 22 insertions(+), 111 deletions(-) diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index 29a42198da69cc..fc6036bee67701 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -9,11 +9,7 @@ use { solana_runtime::bank::Bank, solana_sdk::{ clock::{Slot, UnixTimestamp}, -<<<<<<< HEAD -======= feature_set::{self}, - hash::Hash, ->>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) program_utils::limited_deserialize, pubkey::Pubkey, }, @@ -137,32 +133,6 @@ impl LatestValidatorVotePacket { } } -<<<<<<< HEAD -// TODO: replace this with rand::seq::index::sample_weighted once we can update rand to 0.8+ -// This requires updating dependencies of ed25519-dalek as rand_core is not compatible cross -// version https://github.com/dalek-cryptography/ed25519-dalek/pull/214 -pub(crate) fn weighted_random_order_by_stake<'a>( - bank: &Bank, - pubkeys: impl Iterator, -) -> impl Iterator { - // Efraimidis and Spirakis algo for weighted random sample without replacement - let staked_nodes = bank.staked_nodes(); - let mut pubkey_with_weight: Vec<(f64, Pubkey)> = pubkeys - .filter_map(|&pubkey| { - let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); - if stake == 0 { - None // Ignore votes from unstaked validators - } else { - Some((thread_rng().gen::().powf(1.0 / (stake as f64)), pubkey)) - } - }) - .collect::>(); - pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap()); - pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) -} - -======= ->>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) #[derive(Default, Debug)] pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_gossip: usize, @@ -380,13 +350,8 @@ impl LatestUnprocessedVotes { bank: Arc, forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> usize { -<<<<<<< HEAD let mut continue_forwarding = true; - let pubkeys_by_stake = weighted_random_order_by_stake( - &bank, - self.latest_votes_per_pubkey.read().unwrap().keys(), - ) - .collect_vec(); + let pubkeys_by_stake = self.weighted_random_order_by_stake(); pubkeys_by_stake .into_iter() .filter(|&pubkey| { @@ -424,75 +389,11 @@ impl LatestUnprocessedVotes { false }) .count() -======= - let pubkeys_by_stake = self.weighted_random_order_by_stake(); - let mut forwarded_count: usize = 0; - for pubkey in pubkeys_by_stake { - let Some(vote) = self.get_entry(pubkey) else { - continue; - }; - - let mut vote = vote.write().unwrap(); - if vote.is_vote_taken() || vote.is_forwarded() { - continue; - } - - let deserialized_vote_packet = vote.vote.as_ref().unwrap().clone(); - let Some(sanitized_vote_transaction) = deserialized_vote_packet - .build_sanitized_transaction( - bank.vote_only_bank(), - bank.as_ref(), - bank.get_reserved_account_keys(), - ) - else { - continue; - }; - - let forwarding_successful = forward_packet_batches_by_accounts.try_add_packet( - &sanitized_vote_transaction, - deserialized_vote_packet, - &bank.feature_set, - ); - - if !forwarding_successful { - // To match behavior of regular transactions we stop forwarding votes as soon as one - // fails. We are assuming that failure (try_add_packet) means no more space - // available. - break; - } - - vote.forwarded = true; - forwarded_count += 1; - } - - forwarded_count ->>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) } /// Drains all votes yet to be processed sorted by a weighted random ordering by stake - pub fn drain_unprocessed(&self, bank: Arc) -> Vec> { -<<<<<<< HEAD - let pubkeys_by_stake = weighted_random_order_by_stake( - &bank, - self.latest_votes_per_pubkey.read().unwrap().keys(), - ) - .collect_vec(); - pubkeys_by_stake - .into_iter() -======= - let slot_hashes = bank - .get_account(&sysvar::slot_hashes::id()) - .and_then(|account| from_account::(&account)); - if slot_hashes.is_none() { - error!( - "Slot hashes sysvar doesn't exist on bank {}. Including all votes without \ - filtering", - bank.slot() - ); - } - + pub fn drain_unprocessed(&self, _bank: Arc) -> Vec> { self.weighted_random_order_by_stake() ->>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) .filter_map(|pubkey| { self.get_entry(pubkey).and_then(|lock| { let mut latest_vote = lock.write().unwrap(); @@ -1015,24 +916,15 @@ mod tests { #[test] fn test_forwardable_packets() { -<<<<<<< HEAD - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); - let bank = Arc::new(Bank::default_for_tests()); -======= let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); - let mut bank = Bank::new_from_parent( + let bank = Bank::new_from_parent( Arc::new(bank_0), &Pubkey::new_unique(), MINIMUM_SLOTS_PER_EPOCH, ); assert_eq!(bank.epoch(), 1); - bank.set_epoch_stakes_for_test( - bank.epoch().saturating_add(2), - EpochStakes::new_for_tests(HashMap::new(), bank.epoch().saturating_add(2)), - ); let bank = Arc::new(bank); ->>>>>>> 1334fb5248 (banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)) let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 4efac319401a1c..e00bf939a30022 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -6108,6 +6108,15 @@ impl Bank { Some(vote_account.clone()) } + /// Get the EpochStakes for the current Bank::epoch + pub fn current_epoch_stakes(&self) -> &EpochStakes { + // The stakes for a given epoch (E) in self.epoch_stakes are keyed by leader schedule epoch + // (E + 1) so the stakes for the current epoch are stored at self.epoch_stakes[E + 1] + self.epoch_stakes + .get(&self.epoch.saturating_add(1)) + .expect("Current epoch stakes must exist") + } + /// Get the EpochStakes for a given epoch pub fn epoch_stakes(&self, epoch: Epoch) -> Option<&EpochStakes> { self.epoch_stakes.get(&epoch) @@ -6117,6 +6126,11 @@ impl Bank { &self.epoch_stakes } + /// Get the staked nodes map for the current Bank::epoch + pub fn current_epoch_staked_nodes(&self) -> Arc> { + self.current_epoch_stakes().stakes().staked_nodes() + } + pub fn epoch_staked_nodes(&self, epoch: Epoch) -> Option>> { Some(self.epoch_stakes.get(&epoch)?.stakes().staked_nodes()) } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index ffcaae65eb5857..b934c3df04e357 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -853,6 +853,10 @@ pub mod enable_turbine_extended_fanout_experiments { solana_sdk::declare_id!("BZn14Liea52wtBwrXUxTv6vojuTTmfc7XGEDTXrvMD7b"); } +pub mod deprecate_legacy_vote_ixs { + solana_program::declare_id!("depVvnQ2UysGrhwdiwU42tCadZL8GcBb1i2GYhMopQv"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -1061,6 +1065,7 @@ lazy_static! { (vote_only_retransmitter_signed_fec_sets::id(), "vote only on retransmitter signed fec sets"), (partitioned_epoch_rewards_superfeature::id(), "replaces enable_partitioned_epoch_reward to enable partitioned rewards at epoch boundary SIMD-0118"), (enable_turbine_extended_fanout_experiments::id(), "enable turbine extended fanout experiments #2373"), + (deprecate_legacy_vote_ixs::id(), "Deprecate legacy vote instructions"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter() From d6c3c5328609a5a6aebfb693d47002fac6b459b3 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 12 Sep 2024 16:58:51 +0000 Subject: [PATCH 3/3] rekey feature to indicate it must not be activated --- sdk/src/feature_set.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index b934c3df04e357..6c906a9f7120e9 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -854,7 +854,7 @@ pub mod enable_turbine_extended_fanout_experiments { } pub mod deprecate_legacy_vote_ixs { - solana_program::declare_id!("depVvnQ2UysGrhwdiwU42tCadZL8GcBb1i2GYhMopQv"); + solana_program::declare_id!("mustrekeyysGrhwdiwU42tCadZL8GcBb1i2GYhMopQv"); } lazy_static! {