From def3bc4c4f551db73095e49faf44433507bc296b Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 20 Dec 2023 10:20:30 -0500 Subject: [PATCH] Add feature flag for LastIndex and Erasure duplicate proofs (#34360) * Add feature flag for LastIndex and Erasure duplicate proofs * pr feedback: use root bank instead of 2 params * pr feedback: & instead of &Arc * pr feedback: reuse fn, remove redundant clones * rebase: fix feature set conflict --- core/src/window_service.rs | 43 +++++++++++++++++++++++++++++++++--- sdk/src/feature_set.rs | 7 +++++- turbine/src/cluster_nodes.rs | 2 +- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index a68a20e2078471..a36692cc7ef3ab 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -1,6 +1,7 @@ //! `window_service` handles the data plane incoming shreds, storing them in //! blockstore and retransmitting where required //! + use { crate::{ cluster_info_vote_listener::VerifiedVoteReceiver, @@ -28,7 +29,12 @@ use { solana_metrics::inc_new_counter_error, solana_perf::packet::{Packet, PacketBatch}, solana_rayon_threadlimit::get_thread_count, - solana_sdk::clock::Slot, + solana_runtime::bank_forks::BankForks, + solana_sdk::{ + clock::{Slot, DEFAULT_MS_PER_SLOT}, + feature_set, + }, + solana_turbine::cluster_nodes, std::{ cmp::Reverse, collections::{HashMap, HashSet}, @@ -142,12 +148,31 @@ fn run_check_duplicate( blockstore: &Blockstore, shred_receiver: &Receiver, duplicate_slots_sender: &DuplicateSlotSender, + bank_forks: &RwLock, ) -> Result<()> { + let mut root_bank = bank_forks.read().unwrap().root_bank(); + let mut last_updated = Instant::now(); let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> { + if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT { + // Grabs bank forks lock once a slot + last_updated = Instant::now(); + root_bank = bank_forks.read().unwrap().root_bank(); + } let shred_slot = shred.slot(); + let send_index_and_erasure_conflicts = cluster_nodes::check_feature_activation( + &feature_set::index_erasure_conflict_duplicate_proofs::id(), + shred_slot, + &root_bank, + ); let (shred1, shred2) = match shred { - PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict), - PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict), + PossibleDuplicateShred::LastIndexConflict(shred, conflict) + | PossibleDuplicateShred::ErasureConflict(shred, conflict) => { + if send_index_and_erasure_conflicts { + (shred, conflict) + } else { + return Ok(()); + } + } PossibleDuplicateShred::Exists(shred) => { // Unlike the other cases we have to wait until here to decide to handle the duplicate and store // in blockstore. This is because the duplicate could have been part of the same insert batch, @@ -342,6 +367,7 @@ impl WindowService { let outstanding_requests = Arc::>::default(); let cluster_info = repair_info.cluster_info.clone(); + let bank_forks = repair_info.bank_forks.clone(); let repair_service = RepairService::new( blockstore.clone(), @@ -366,6 +392,7 @@ impl WindowService { blockstore.clone(), duplicate_receiver, duplicate_slots_sender, + bank_forks, ); let t_insert = Self::start_window_insert_thread( @@ -392,6 +419,7 @@ impl WindowService { blockstore: Arc, duplicate_receiver: Receiver, duplicate_slots_sender: DuplicateSlotSender, + bank_forks: Arc>, ) -> JoinHandle<()> { let handle_error = || { inc_new_counter_error!("solana-check-duplicate-error", 1, 1); @@ -405,6 +433,7 @@ impl WindowService { &blockstore, &duplicate_receiver, &duplicate_slots_sender, + &bank_forks, ) { if Self::should_exit_on_error(e, &handle_error) { break; @@ -507,9 +536,11 @@ mod test { solana_gossip::contact_info::ContactInfo, solana_ledger::{ blockstore::{make_many_slot_entries, Blockstore}, + genesis_utils::create_genesis_config, get_tmp_ledger_path_auto_delete, shred::{ProcessShredsStats, Shredder}, }, + solana_runtime::bank::Bank, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, @@ -556,6 +587,8 @@ mod test { #[test] fn test_run_check_duplicate() { let ledger_path = get_tmp_ledger_path_auto_delete!(); + let genesis_config = create_genesis_config(10_000).genesis_config; + let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config)); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); let (sender, receiver) = unbounded(); let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded(); @@ -587,6 +620,7 @@ mod test { &blockstore, &receiver, &duplicate_slot_sender, + &bank_forks, ) .unwrap(); @@ -616,6 +650,8 @@ mod test { Arc::new(keypair), SocketAddrSpace::Unspecified, )); + let genesis_config = create_genesis_config(10_000).genesis_config; + let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config)); // Start duplicate thread receiving and inserting duplicates let t_check_duplicate = WindowService::start_check_duplicate_thread( @@ -624,6 +660,7 @@ mod test { blockstore.clone(), duplicate_shred_receiver, duplicate_slot_sender, + bank_forks, ); let handle_duplicate = |shred| { diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 637391cfa415cc..fc886c7fefd582 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -744,6 +744,10 @@ pub mod consume_blockstore_duplicate_proofs { solana_sdk::declare_id!("6YsBCejwK96GZCkJ6mkZ4b68oP63z2PLoQmWjC7ggTqZ"); } +pub mod index_erasure_conflict_duplicate_proofs { + solana_sdk::declare_id!("dupPajaLy2SSn8ko42aZz4mHANDNrLe8Nw8VQgFecLa"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -924,7 +928,8 @@ lazy_static! { (enable_zk_transfer_with_fee::id(), "enable Zk Token proof program transfer with fee"), (drop_legacy_shreds::id(), "drops legacy shreds #34328"), (allow_commission_decrease_at_any_time::id(), "Allow commission decrease at any time in epoch #33843"), - (consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372") + (consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372"), + (index_erasure_conflict_duplicate_proofs::id(), "generate duplicate proofs for index and erasure conflicts #34360"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter() diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 57676a34b75eff..124d6d8c99cca1 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -513,7 +513,7 @@ fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool // Returns true if the feature is effective for the shred slot. #[must_use] -fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool { +pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool { match root_bank.feature_set.activated_slot(feature) { None => false, Some(feature_slot) => {