diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 9efb151a6078ed..9234994104707b 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1419,6 +1419,8 @@ pub mod test { &AbsRequestSender::default(), None, &mut self.heaviest_subtree_fork_choice, + &mut true, + &[], ) } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 04f42e6674e176..b1bb613e95659f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -36,6 +36,7 @@ use solana_sdk::{ genesis_config::ClusterType, hash::Hash, pubkey::Pubkey, + signature::Signature, signature::{Keypair, Signer}, timing::timestamp, transaction::Transaction, @@ -104,6 +105,7 @@ pub struct ReplayStageConfig { pub rewards_recorder_sender: Option, pub cache_block_time_sender: Option, pub bank_notification_sender: Option, + pub wait_for_vote_to_start_leader: bool, } #[derive(Default)] @@ -264,6 +266,7 @@ impl ReplayStage { rewards_recorder_sender, cache_block_time_sender, bank_notification_sender, + wait_for_vote_to_start_leader, } = config; trace!("replay stage"); @@ -293,6 +296,8 @@ impl ReplayStage { let mut partition_exists = false; let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut replay_timing = ReplayTiming::default(); + let mut voted_signatures = Vec::new(); + let mut has_voted = !wait_for_vote_to_start_leader; loop { let allocated = thread_mem_usage::Allocatedp::default(); @@ -479,6 +484,8 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, &cache_block_time_sender, &bank_notification_sender, + &mut voted_signatures, + &mut has_voted, ); }; voting_time.stop(); @@ -570,6 +577,7 @@ impl ReplayStage { &progress, &retransmit_slots_sender, &mut skipped_slots_info, + has_voted, ); let poh_bank = poh_recorder.lock().unwrap().bank(); @@ -885,7 +893,12 @@ impl ReplayStage { progress_map: &ProgressMap, retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, + has_voted: bool, ) { + if !has_voted { + info!("Haven't landed a vote, so skipping my leader slot"); + return; + } // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -1077,6 +1090,8 @@ impl ReplayStage { heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, cache_block_time_sender: &Option, bank_notification_sender: &Option, + vote_signatures: &mut Vec, + has_voted: &mut bool, ) { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1129,6 +1144,8 @@ impl ReplayStage { accounts_background_request_sender, highest_confirmed_root, heaviest_subtree_fork_choice, + has_voted, + vote_signatures, ); subscriptions.notify_roots(rooted_slots); if let Some(sender) = bank_notification_sender { @@ -1158,6 +1175,8 @@ impl ReplayStage { last_vote, &tower_slots, switch_fork_decision, + vote_signatures, + *has_voted, ); } @@ -1169,6 +1188,8 @@ impl ReplayStage { vote: Vote, tower: &[Slot], switch_fork_decision: &SwitchForkDecision, + vote_signatures: &mut Vec, + has_voted: bool, ) { if authorized_voter_keypairs.is_empty() { return; @@ -1238,6 +1259,14 @@ impl ReplayStage { let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey())); + if !has_voted { + vote_signatures.push(vote_tx.signatures[0]); + if vote_signatures.len() > 200 { + vote_signatures.remove(0); + } + } else { + vote_signatures.clear(); + } let blockhash = bank.last_blockhash(); vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash); @@ -1838,6 +1867,8 @@ impl ReplayStage { accounts_background_request_sender: &AbsRequestSender, highest_confirmed_root: Option, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + has_voted: &mut bool, + voted_signatures: &[Signature], ) { bank_forks.write().unwrap().set_root( new_root, @@ -1845,6 +1876,15 @@ impl ReplayStage { highest_confirmed_root, ); let r_bank_forks = bank_forks.read().unwrap(); + let new_root_bank = &r_bank_forks[new_root]; + if !*has_voted { + for signature in voted_signatures { + if new_root_bank.get_signature_status(signature).is_some() { + *has_voted = true; + break; + } + } + } progress.handle_new_root(&r_bank_forks); heaviest_subtree_fork_choice.set_root(new_root); } @@ -2252,6 +2292,8 @@ pub(crate) mod tests { &AbsRequestSender::default(), None, &mut heaviest_subtree_fork_choice, + &mut true, + &[], ); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); @@ -2296,6 +2338,8 @@ pub(crate) mod tests { &AbsRequestSender::default(), Some(confirmed_root), &mut heaviest_subtree_fork_choice, + &mut true, + &[], ); assert_eq!(bank_forks.read().unwrap().root(), root); assert!(bank_forks.read().unwrap().get(confirmed_root).is_some()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c83614dbd2452e..22c4237e7bdb28 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -84,6 +84,7 @@ pub struct TvuConfig { pub use_index_hash_calculation: bool, pub rocksdb_compaction_interval: Option, pub rocksdb_max_compaction_jitter: Option, + pub wait_for_vote_to_start_leader: bool, } impl Tvu { @@ -254,6 +255,7 @@ impl Tvu { rewards_recorder_sender, cache_block_time_sender, bank_notification_sender, + wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, }; let replay_stage = ReplayStage::new( diff --git a/core/src/validator.rs b/core/src/validator.rs index b38428de003d65..9c5340e6547ee6 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -573,9 +573,12 @@ impl Validator { check_poh_speed(&genesis_config, None); } - if wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check) { + let (failed, did_wait) = + wait_for_supermajority(config, &bank, &cluster_info, rpc_override_health_check); + if failed { abort(); } + let wait_for_vote_to_start_leader = !did_wait; let poh_service = PohService::new( poh_recorder.clone(), @@ -659,6 +662,7 @@ impl Validator { use_index_hash_calculation: config.accounts_db_use_index_hash_calculation, rocksdb_compaction_interval: config.rocksdb_compaction_interval, rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval, + wait_for_vote_to_start_leader, }, &max_slots, ); @@ -1224,18 +1228,18 @@ fn wait_for_supermajority( bank: &Bank, cluster_info: &ClusterInfo, rpc_override_health_check: Arc, -) -> bool { +) -> (bool, bool) { if let Some(wait_for_supermajority) = config.wait_for_supermajority { match wait_for_supermajority.cmp(&bank.slot()) { - std::cmp::Ordering::Less => return false, + std::cmp::Ordering::Less => return (false, false), std::cmp::Ordering::Greater => { error!("Ledger does not have enough data to wait for supermajority, please enable snapshot fetch. Has {} needs {}", bank.slot(), wait_for_supermajority); - return true; + return (true, false); } _ => {} } } else { - return false; + return (false, false); } if let Some(expected_bank_hash) = config.expected_bank_hash { @@ -1245,7 +1249,7 @@ fn wait_for_supermajority( bank.hash(), expected_bank_hash ); - return true; + return (true, false); } } @@ -1269,7 +1273,7 @@ fn wait_for_supermajority( sleep(Duration::new(1, 0)); } rpc_override_health_check.store(false, Ordering::Relaxed); - false + (false, true) } fn report_target_features() { @@ -1545,41 +1549,45 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_config)); let mut config = ValidatorConfig::default(); let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - assert!(!wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check.clone() - )); + assert!( + !wait_for_supermajority( + &config, + &bank, + &cluster_info, + rpc_override_health_check.clone() + ) + .0 + ); // bank=0, wait=1, should fail config.wait_for_supermajority = Some(1); - assert!(wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check.clone() - )); + assert!( + wait_for_supermajority( + &config, + &bank, + &cluster_info, + rpc_override_health_check.clone() + ) + .0 + ); // bank=1, wait=0, should pass, bank is past the wait slot let bank = Bank::new_from_parent(&bank, &Pubkey::default(), 1); config.wait_for_supermajority = Some(0); - assert!(!wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check.clone() - )); + assert!( + !wait_for_supermajority( + &config, + &bank, + &cluster_info, + rpc_override_health_check.clone() + ) + .0 + ); // bank=1, wait=1, equal, but bad hash provided config.wait_for_supermajority = Some(1); config.expected_bank_hash = Some(hash(&[1])); - assert!(wait_for_supermajority( - &config, - &bank, - &cluster_info, - rpc_override_health_check - )); + assert!(wait_for_supermajority(&config, &bank, &cluster_info, rpc_override_health_check).0); } #[test]