From e654f4b799e330691cf4b4d7f0666e78b053d98d Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Mon, 1 Mar 2021 14:10:05 -0800 Subject: [PATCH] Skip leader slots until a vote lands --- core/src/consensus.rs | 2 + core/src/replay_stage.rs | 44 ++++++++++++++++++++ core/src/tvu.rs | 2 + core/src/validator.rs | 87 +++++++++++++++++++++++----------------- 4 files changed, 99 insertions(+), 36 deletions(-) diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 3642ceebd6ef9b..63602ff4a11949 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1423,6 +1423,8 @@ pub mod test { &AbsRequestSender::default(), None, &mut self.heaviest_subtree_fork_choice, + &mut true, + &[], ) } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index fc7a349d420615..c5ed24dc95de16 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -36,6 +36,7 @@ use solana_sdk::{ genesis_config::ClusterType, hash::Hash, pubkey::Pubkey, + signature::Signature, signature::{Keypair, Signer}, timing::timestamp, transaction::Transaction, @@ -104,6 +105,7 @@ pub struct ReplayStageConfig { pub rewards_recorder_sender: Option, pub cache_block_time_sender: Option, pub bank_notification_sender: Option, + pub wait_for_vote_to_start_leader: bool, } #[derive(Default)] @@ -264,6 +266,7 @@ impl ReplayStage { rewards_recorder_sender, cache_block_time_sender, bank_notification_sender, + wait_for_vote_to_start_leader, } = config; trace!("replay stage"); @@ -293,6 +296,8 @@ impl ReplayStage { let mut partition_exists = false; let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut replay_timing = ReplayTiming::default(); + let mut voted_signatures = Vec::new(); + let mut has_voted = !wait_for_vote_to_start_leader; loop { let allocated = thread_mem_usage::Allocatedp::default(); @@ -479,6 +484,8 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, &cache_block_time_sender, &bank_notification_sender, + &mut voted_signatures, + &mut has_voted, ); }; voting_time.stop(); @@ -570,6 +577,7 @@ impl ReplayStage { &progress, &retransmit_slots_sender, &mut skipped_slots_info, + has_voted, ); let poh_bank = poh_recorder.lock().unwrap().bank(); @@ -885,7 +893,12 @@ impl ReplayStage { progress_map: &ProgressMap, retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, + has_voted: bool, ) { + if !has_voted { + info!("Haven't landed a vote, so skipping my leader slot"); + return; + } // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -1085,6 +1098,8 @@ impl ReplayStage { heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, cache_block_time_sender: &Option, bank_notification_sender: &Option, + vote_signatures: &mut Vec, + has_voted: &mut bool, ) { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1137,6 +1152,8 @@ impl ReplayStage { accounts_background_request_sender, highest_confirmed_root, heaviest_subtree_fork_choice, + has_voted, + vote_signatures, ); subscriptions.notify_roots(rooted_slots); if let Some(sender) = bank_notification_sender { @@ -1166,6 +1183,8 @@ impl ReplayStage { last_vote, &tower_slots, switch_fork_decision, + vote_signatures, + *has_voted, ); } @@ -1177,6 +1196,8 @@ impl ReplayStage { vote: Vote, tower: &[Slot], switch_fork_decision: &SwitchForkDecision, + vote_signatures: &mut Vec, + has_voted: bool, ) { if authorized_voter_keypairs.is_empty() { return; @@ -1246,6 +1267,14 @@ impl ReplayStage { let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey())); + if !has_voted { + vote_signatures.push(vote_tx.signatures[0]); + if vote_signatures.len() > 200 { + vote_signatures.remove(0); + } + } else { + vote_signatures.clear(); + } let blockhash = bank.last_blockhash(); vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash); @@ -1846,6 +1875,8 @@ impl ReplayStage { accounts_background_request_sender: &AbsRequestSender, highest_confirmed_root: Option, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + has_voted: &mut bool, + voted_signatures: &[Signature], ) { bank_forks.write().unwrap().set_root( new_root, @@ -1853,6 +1884,15 @@ impl ReplayStage { highest_confirmed_root, ); let r_bank_forks = bank_forks.read().unwrap(); + let new_root_bank = &r_bank_forks[new_root]; + if !*has_voted { + for signature in voted_signatures { + if new_root_bank.get_signature_status(signature).is_some() { + *has_voted = true; + break; + } + } + } progress.handle_new_root(&r_bank_forks); heaviest_subtree_fork_choice.set_root(new_root); } @@ -2260,6 +2300,8 @@ pub(crate) mod tests { &AbsRequestSender::default(), None, &mut heaviest_subtree_fork_choice, + &mut true, + &[], ); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); @@ -2304,6 +2346,8 @@ pub(crate) mod tests { &AbsRequestSender::default(), Some(confirmed_root), &mut heaviest_subtree_fork_choice, + &mut true, + &[], ); assert_eq!(bank_forks.read().unwrap().root(), root); assert!(bank_forks.read().unwrap().get(confirmed_root).is_some()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c83614dbd2452e..22c4237e7bdb28 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -84,6 +84,7 @@ pub struct TvuConfig { pub use_index_hash_calculation: bool, pub rocksdb_compaction_interval: Option, pub rocksdb_max_compaction_jitter: Option, + pub wait_for_vote_to_start_leader: bool, } impl Tvu { @@ -254,6 +255,7 @@ impl Tvu { rewards_recorder_sender, cache_block_time_sender, bank_notification_sender, + wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, }; let replay_stage = ReplayStage::new( diff --git a/core/src/validator.rs b/core/src/validator.rs index 1f9197705ae0e5..170035994acd7e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -627,15 +627,17 @@ impl Validator { check_poh_speed(&genesis_config, None); } - if wait_for_supermajority( + let (failed, did_wait) = wait_for_supermajority( config, &bank, &cluster_info, rpc_override_health_check, &start_progress, - ) { + ); + if failed { abort(); } + let wait_for_vote_to_start_leader = !did_wait; let poh_service = PohService::new( poh_recorder.clone(), @@ -720,6 +722,7 @@ impl Validator { use_index_hash_calculation: config.accounts_db_use_index_hash_calculation, rocksdb_compaction_interval: config.rocksdb_compaction_interval, rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval, + wait_for_vote_to_start_leader, }, &max_slots, ); @@ -1293,10 +1296,10 @@ fn wait_for_supermajority( cluster_info: &ClusterInfo, rpc_override_health_check: Arc, start_progress: &Arc>, -) -> bool { +) -> (bool, bool) { if let Some(wait_for_supermajority) = config.wait_for_supermajority { match wait_for_supermajority.cmp(&bank.slot()) { - std::cmp::Ordering::Less => return false, + std::cmp::Ordering::Less => return (false, false), std::cmp::Ordering::Greater => { error!( "Ledger does not have enough data to wait for supermajority, \ @@ -1304,12 +1307,12 @@ fn wait_for_supermajority( bank.slot(), wait_for_supermajority ); - return true; + return (true, false); } _ => {} } } else { - return false; + return (false, false); } if let Some(expected_bank_hash) = config.expected_bank_hash { @@ -1319,7 +1322,7 @@ fn wait_for_supermajority( bank.hash(), expected_bank_hash ); - return true; + return (true, false); } } @@ -1344,7 +1347,7 @@ fn wait_for_supermajority( sleep(Duration::new(1, 0)); } rpc_override_health_check.store(false, Ordering::Relaxed); - false + (false, true) } fn report_target_features() { @@ -1629,45 +1632,57 @@ mod tests { let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default())); - assert!(!wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check.clone(), - &start_progress, - )); + assert!( + !wait_for_supermajority( + &config, + &bank, + &cluster_info, + rpc_override_health_check.clone(), + &start_progress, + ) + .0 + ); // bank=0, wait=1, should fail config.wait_for_supermajority = Some(1); - assert!(wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check.clone(), - &start_progress, - )); + assert!( + wait_for_supermajority( + &config, + &bank, + &cluster_info, + rpc_override_health_check.clone(), + &start_progress, + ) + .0 + ); // bank=1, wait=0, should pass, bank is past the wait slot let bank = Bank::new_from_parent(&bank, &Pubkey::default(), 1); config.wait_for_supermajority = Some(0); - assert!(!wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check.clone(), - &start_progress, - )); + assert!( + !wait_for_supermajority( + &config, + &bank, + &cluster_info, + rpc_override_health_check.clone(), + &start_progress, + ) + .0 + ); // bank=1, wait=1, equal, but bad hash provided config.wait_for_supermajority = Some(1); config.expected_bank_hash = Some(hash(&[1])); - assert!(wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check, - &start_progress, - )); + assert!( + wait_for_supermajority( + &config, + &bank, + &cluster_info, + rpc_override_health_check, + &start_progress, + ) + .0 + ); } #[test]