From 2262ee1c4a03529b5312ad53a2283361ce144f6d Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 7 Oct 2019 21:24:45 +0900 Subject: [PATCH] Stabilize some banking stage tests Fixes #5660 --- core/src/banking_stage.rs | 43 ++++++++++++++++++++++++++------------- core/src/poh_service.rs | 23 ++++++++++++++++++++- core/src/tvu.rs | 2 +- sdk/src/poh_config.rs | 4 ++++ 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0c62b2e7c05130..963bdcbe8fa67c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -928,6 +928,7 @@ impl Service for BankingStage { pub fn create_test_recorder( bank: &Arc, blocktree: &Arc, + poh_config: Option, ) -> ( Arc, Arc>, @@ -935,7 +936,11 @@ pub fn create_test_recorder( Receiver, ) { let exit = Arc::new(AtomicBool::new(false)); - let poh_config = Arc::new(PohConfig::default()); + let poh_config = if poh_config.is_none() { + Arc::new(PohConfig::default()) + } else { + Arc::new(poh_config.unwrap()) + }; let (mut poh_recorder, entry_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), @@ -986,7 +991,7 @@ mod tests { Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), ); let (exit, poh_recorder, poh_service, _entry_receiever) = - create_test_recorder(&bank, &blocktree); + create_test_recorder(&bank, &blocktree, None); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); let banking_stage = BankingStage::new( @@ -1020,8 +1025,10 @@ mod tests { let blocktree = Arc::new( Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), ); + let mut poh_config = PohConfig::default(); + poh_config.target_tick_count = Some(6); let (exit, poh_recorder, poh_service, entry_receiver) = - create_test_recorder(&bank, &blocktree); + create_test_recorder(&bank, &blocktree, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); let banking_stage = BankingStage::new( @@ -1031,7 +1038,6 @@ mod tests { vote_receiver, ); trace!("sending bank"); - sleep(Duration::from_millis(600)); drop(verified_sender); drop(vote_sender); exit.store(true, Ordering::Relaxed); @@ -1069,8 +1075,11 @@ mod tests { let blocktree = Arc::new( Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), ); + let mut poh_config = PohConfig::default(); + // limit the tick to 1 to prevent clearing working_bank at PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + poh_config.target_tick_count = Some(1); let (exit, poh_recorder, poh_service, entry_receiver) = - create_test_recorder(&bank, &blocktree); + create_test_recorder(&bank, &blocktree, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); let banking_stage = BankingStage::new( @@ -1120,6 +1129,9 @@ mod tests { drop(verified_sender); drop(vote_sender); + // wait until banking_stage to finish up all packets + banking_stage.join().unwrap(); + exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); drop(poh_recorder); @@ -1128,18 +1140,20 @@ mod tests { let bank = Bank::new(&genesis_block); bank.process_transaction(&fund_tx).unwrap(); //receive entries + ticks - for _ in 0..10 { + loop { let entries: Vec = entry_receiver .iter() .map(|(_bank, (entry, _tick_height))| entry) .collect(); assert!(entries.verify(&blockhash)); - blockhash = entries.last().unwrap().hash; - for entry in entries { - bank.process_transactions(&entry.transactions) - .iter() - .for_each(|x| assert_eq!(*x, Ok(()))); + if !entries.is_empty() { + blockhash = entries.last().unwrap().hash; + for entry in entries { + bank.process_transactions(&entry.transactions) + .iter() + .for_each(|x| assert_eq!(*x, Ok(()))); + } } if bank.get_balance(&to) == 1 { @@ -1153,13 +1167,11 @@ mod tests { assert_eq!(bank.get_balance(&to2), 0); drop(entry_receiver); - banking_stage.join().unwrap(); } Blocktree::destroy(&ledger_path).unwrap(); } #[test] - #[ignore] fn test_banking_stage_entryfication() { solana_logger::setup(); // In this attack we'll demonstrate that a verifier can interpret the ledger @@ -1212,8 +1224,11 @@ mod tests { Blocktree::open(&ledger_path) .expect("Expected to be able to open database ledger"), ); + let mut poh_config = PohConfig::default(); + // limit the tick to 1 to prevent clearing working_bank at PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + poh_config.target_tick_count = Some(1); let (exit, poh_recorder, poh_service, entry_receiver) = - create_test_recorder(&bank, &blocktree); + create_test_recorder(&bank, &blocktree, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 798b37ab6a38ca..c5d338cd77c15d 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -32,7 +32,11 @@ impl PohService { .name("solana-poh-service-tick_producer".to_string()) .spawn(move || { if poh_config.hashes_per_tick.is_none() { - Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_); + if poh_config.target_tick_count.is_none() { + Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_); + } else { + Self::short_lived_tick_producer(poh_recorder, &poh_config, &poh_exit_); + } } else { // PoH service runs in a tight loop, generating hashes as fast as possible. // Let's dedicate one of the CPU cores to this thread so that it can gain @@ -60,6 +64,22 @@ impl PohService { } } + fn short_lived_tick_producer( + poh_recorder: Arc>, + poh_config: &PohConfig, + poh_exit: &AtomicBool, + ) { + let mut warned = false; + for _ in 0..poh_config.target_tick_count.unwrap() { + sleep(poh_config.target_tick_duration); + poh_recorder.lock().unwrap().tick(); + if poh_exit.load(Ordering::Relaxed) && !warned { + warned = true; + warn!("exit signal is ignored because PohService is scheduled to exit soon"); + } + } + } + fn tick_producer(poh_recorder: Arc>, poh_exit: &AtomicBool) { let poh = poh_recorder.lock().unwrap().poh.clone(); loop { @@ -108,6 +128,7 @@ mod tests { let poh_config = Arc::new(PohConfig { hashes_per_tick: Some(2), target_tick_duration: Duration::from_millis(42), + target_tick_count: None, }); let (poh_recorder, entry_receiver) = PohRecorder::new( bank.tick_height(), diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 394cf873c1528c..9dbeb28efb9ef7 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -257,7 +257,7 @@ pub mod tests { let blocktree = Arc::new(blocktree); let bank = bank_forks.working_bank(); let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blocktree); + create_test_recorder(&bank, &blocktree, None); let voting_keypair = Keypair::new(); let storage_keypair = Arc::new(Keypair::new()); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); diff --git a/sdk/src/poh_config.rs b/sdk/src/poh_config.rs index 5c39035365260d..e9e45b420ce044 100644 --- a/sdk/src/poh_config.rs +++ b/sdk/src/poh_config.rs @@ -6,6 +6,9 @@ pub struct PohConfig { /// The target tick rate of the cluster. pub target_tick_duration: Duration, + /// The target total tick count to be produced; used for testing only + pub target_tick_count: Option, + /// How many hashes to roll before emitting the next tick entry. /// None enables "Low power mode", which implies: /// * sleep for `target_tick_duration` instead of hashing @@ -18,6 +21,7 @@ impl PohConfig { Self { target_tick_duration, hashes_per_tick: None, + target_tick_count: None, } } }