Stabilize some banking stage tests
ryoqun committed Oct 7, 2019
1 parent c34cc49 commit 2262ee1
Showing 4 changed files with 56 additions and 16 deletions.
43 changes: 29 additions & 14 deletions core/src/banking_stage.rs
@@ -928,14 +928,19 @@ impl Service for BankingStage
pub fn create_test_recorder(
bank: &Arc<Bank>,
blocktree: &Arc<Blocktree>,
+ poh_config: Option<PohConfig>,
) -> (
Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
let exit = Arc::new(AtomicBool::new(false));
- let poh_config = Arc::new(PohConfig::default());
+ let poh_config = if poh_config.is_none() {
+     Arc::new(PohConfig::default())
+ } else {
+     Arc::new(poh_config.unwrap())
+ };
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
@@ -986,7 +991,7 @@ mod tests {
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, _entry_receiever) =
- create_test_recorder(&bank, &blocktree);
+ create_test_recorder(&bank, &blocktree, None);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(
@@ -1020,8 +1025,10 @@ mod tests {
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
+ let mut poh_config = PohConfig::default();
+ poh_config.target_tick_count = Some(6);
let (exit, poh_recorder, poh_service, entry_receiver) =
- create_test_recorder(&bank, &blocktree);
+ create_test_recorder(&bank, &blocktree, Some(poh_config));
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(
@@ -1031,7 +1038,6 @@
vote_receiver,
);
trace!("sending bank");
- sleep(Duration::from_millis(600));
drop(verified_sender);
drop(vote_sender);
exit.store(true, Ordering::Relaxed);
@@ -1069,8 +1075,11 @@ mod tests {
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
+ let mut poh_config = PohConfig::default();
+ // limit the tick count to 1 to prevent PohRecorder from clearing the working_bank, which would make BankingStage hit PohRecorderError(MaxHeightReached)
+ poh_config.target_tick_count = Some(1);
let (exit, poh_recorder, poh_service, entry_receiver) =
- create_test_recorder(&bank, &blocktree);
+ create_test_recorder(&bank, &blocktree, Some(poh_config));
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(
@@ -1120,6 +1129,9 @@ mod tests {

drop(verified_sender);
drop(vote_sender);
+ // wait for banking_stage to finish processing all packets
+ banking_stage.join().unwrap();
+
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);
@@ -1128,18 +1140,20 @@ mod tests {
let bank = Bank::new(&genesis_block);
bank.process_transaction(&fund_tx).unwrap();
//receive entries + ticks
- for _ in 0..10 {
+ loop {
let entries: Vec<Entry> = entry_receiver
.iter()
.map(|(_bank, (entry, _tick_height))| entry)
.collect();

assert!(entries.verify(&blockhash));
-     blockhash = entries.last().unwrap().hash;
-     for entry in entries {
-         bank.process_transactions(&entry.transactions)
-             .iter()
-             .for_each(|x| assert_eq!(*x, Ok(())));
+     if !entries.is_empty() {
+         blockhash = entries.last().unwrap().hash;
+         for entry in entries {
+             bank.process_transactions(&entry.transactions)
+                 .iter()
+                 .for_each(|x| assert_eq!(*x, Ok(())));
+         }
}

if bank.get_balance(&to) == 1 {
@@ -1153,13 +1167,11 @@ mod tests {
assert_eq!(bank.get_balance(&to2), 0);

drop(entry_receiver);
- banking_stage.join().unwrap();
}
Blocktree::destroy(&ledger_path).unwrap();
}

#[test]
- #[ignore]
fn test_banking_stage_entryfication() {
solana_logger::setup();
// In this attack we'll demonstrate that a verifier can interpret the ledger
@@ -1212,8 +1224,11 @@ mod tests {
Blocktree::open(&ledger_path)
.expect("Expected to be able to open database ledger"),
);
+ let mut poh_config = PohConfig::default();
+ // limit the tick count to 1 to prevent PohRecorder from clearing the working_bank, which would make BankingStage hit PohRecorderError(MaxHeightReached)
+ poh_config.target_tick_count = Some(1);
let (exit, poh_recorder, poh_service, entry_receiver) =
- create_test_recorder(&bank, &blocktree);
+ create_test_recorder(&bank, &blocktree, Some(poh_config));
let cluster_info =
ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
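
Aside, not part of the commit: since PohConfig implements Default (PohConfig::default() is used above), the Option handling added to create_test_recorder could be written more compactly; a minimal equivalent sketch:

    let poh_config = Arc::new(poh_config.unwrap_or_default());
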
23 changes: 22 additions & 1 deletion core/src/poh_service.rs
@@ -32,7 +32,11 @@ impl PohService {
.name("solana-poh-service-tick_producer".to_string())
.spawn(move || {
if poh_config.hashes_per_tick.is_none() {
- Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_);
+ if poh_config.target_tick_count.is_none() {
+     Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_);
+ } else {
+     Self::short_lived_tick_producer(poh_recorder, &poh_config, &poh_exit_);
+ }
} else {
// PoH service runs in a tight loop, generating hashes as fast as possible.
// Let's dedicate one of the CPU cores to this thread so that it can gain
@@ -60,6 +64,22 @@ impl PohService {
}
}

+ fn short_lived_tick_producer(
+     poh_recorder: Arc<Mutex<PohRecorder>>,
+     poh_config: &PohConfig,
+     poh_exit: &AtomicBool,
+ ) {
+     let mut warned = false;
+     for _ in 0..poh_config.target_tick_count.unwrap() {
+         sleep(poh_config.target_tick_duration);
+         poh_recorder.lock().unwrap().tick();
+         if poh_exit.load(Ordering::Relaxed) && !warned {
+             warned = true;
+             warn!("exit signal is ignored because PohService is scheduled to exit soon");
+         }
+     }
+ }
+
fn tick_producer(poh_recorder: Arc<Mutex<PohRecorder>>, poh_exit: &AtomicBool) {
let poh = poh_recorder.lock().unwrap().poh.clone();
loop {
@@ -108,6 +128,7 @@ mod tests {
let poh_config = Arc::new(PohConfig {
hashes_per_tick: Some(2),
target_tick_duration: Duration::from_millis(42),
+ target_tick_count: None,
});
let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
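
A note on the new producer, based on a reading of the code above rather than the commit text: short_lived_tick_producer sleeps for target_tick_duration before each tick and always produces exactly target_tick_count ticks; setting the exit flag does not stop it early, it only triggers a single warning. The tests therefore keep the usual teardown order, sketched here with the names used in the tests above:

    exit.store(true, Ordering::Relaxed);
    poh_service.join().unwrap(); // returns only after the scheduled ticks have been produced
    drop(poh_recorder);
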
2 changes: 1 addition & 1 deletion core/src/tvu.rs
@@ -257,7 +257,7 @@ pub mod tests {
let blocktree = Arc::new(blocktree);
let bank = bank_forks.working_bank();
let (exit, poh_recorder, poh_service, _entry_receiver) =
- create_test_recorder(&bank, &blocktree);
+ create_test_recorder(&bank, &blocktree, None);
let voting_keypair = Keypair::new();
let storage_keypair = Arc::new(Keypair::new());
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
4 changes: 4 additions & 0 deletions sdk/src/poh_config.rs
@@ -6,6 +6,9 @@ pub struct PohConfig {
/// The target tick rate of the cluster.
pub target_tick_duration: Duration,

+ /// The target total tick count to be produced; used for testing only
+ pub target_tick_count: Option<u64>,
+
/// How many hashes to roll before emitting the next tick entry.
/// None enables "Low power mode", which implies:
/// * sleep for `target_tick_duration` instead of hashing
@@ -18,6 +21,7 @@ impl PohConfig {
Self {
target_tick_duration,
hashes_per_tick: None,
+ target_tick_count: None,
}
}
}
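
With the new field in place, a test can also build its config with struct update syntax instead of mutating a default value; a minimal sketch, not taken from the commit:

    let poh_config = PohConfig {
        target_tick_count: Some(6),
        ..PohConfig::default()
    };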
