Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize some banking stage tests #6251

Merged
merged 8 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion banking_bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn main() {
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, None);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(
Expand Down
4 changes: 2 additions & 2 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, None);

let tx = test_tx();
let len = 4096;
Expand Down Expand Up @@ -198,7 +198,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, None);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(
Expand Down
40 changes: 26 additions & 14 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,14 +928,15 @@ impl Service for BankingStage {
pub fn create_test_recorder(
bank: &Arc<Bank>,
blocktree: &Arc<Blocktree>,
poh_config: Option<PohConfig>,
) -> (
Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
let exit = Arc::new(AtomicBool::new(false));
let poh_config = Arc::new(PohConfig::default());
let poh_config = Arc::new(poh_config.unwrap_or_default());
let (mut poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
Expand Down Expand Up @@ -986,7 +987,7 @@ mod tests {
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, _entry_receiever) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, None);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(
Expand All @@ -1011,6 +1012,7 @@ mod tests {
mut genesis_block, ..
} = create_genesis_block(2);
genesis_block.ticks_per_slot = 4;
let num_extra_ticks = 2;
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded();
Expand All @@ -1020,8 +1022,10 @@ mod tests {
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let mut poh_config = PohConfig::default();
poh_config.target_tick_count = Some(bank.max_tick_height() + num_extra_ticks);
let (exit, poh_recorder, poh_service, entry_receiver) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, Some(poh_config));
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(
Expand All @@ -1031,7 +1035,6 @@ mod tests {
vote_receiver,
);
trace!("sending bank");
sleep(Duration::from_millis(600));
Copy link
Member Author

@ryoqun ryoqun Oct 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because poh_config's life time is controlled by the target_tick_count, this is no longer needed.

drop(verified_sender);
drop(vote_sender);
exit.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -1069,8 +1072,11 @@ mod tests {
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let mut poh_config = PohConfig::default();
// limit tick count to avoid clearing working_bank at PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
poh_config.target_tick_count = Some(bank.max_tick_height() - 1);
let (exit, poh_recorder, poh_service, entry_receiver) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, Some(poh_config));
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(
Expand Down Expand Up @@ -1120,6 +1126,9 @@ mod tests {

drop(verified_sender);
drop(vote_sender);
// wait until banking_stage to finish up all packets
banking_stage.join().unwrap();

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);
Expand All @@ -1128,18 +1137,20 @@ mod tests {
let bank = Bank::new(&genesis_block);
bank.process_transaction(&fund_tx).unwrap();
//receive entries + ticks
for _ in 0..10 {
loop {
let entries: Vec<Entry> = entry_receiver
.iter()
.map(|(_bank, (entry, _tick_height))| entry)
.collect();

assert!(entries.verify(&blockhash));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assertion should be moved inside the newly-added empty guard if clause.

blockhash = entries.last().unwrap().hash;
for entry in entries {
bank.process_transactions(&entry.transactions)
.iter()
.for_each(|x| assert_eq!(*x, Ok(())));
if !entries.is_empty() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under high load, I observed empty entries are returned.

blockhash = entries.last().unwrap().hash;
for entry in entries {
bank.process_transactions(&entry.transactions)
.iter()
.for_each(|x| assert_eq!(*x, Ok(())));
}
}

if bank.get_balance(&to) == 1 {
Expand All @@ -1153,13 +1164,11 @@ mod tests {
assert_eq!(bank.get_balance(&to2), 0);

drop(entry_receiver);
banking_stage.join().unwrap();
}
Blocktree::destroy(&ledger_path).unwrap();
}

#[test]
#[ignore]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay!

fn test_banking_stage_entryfication() {
solana_logger::setup();
// In this attack we'll demonstrate that a verifier can interpret the ledger
Expand Down Expand Up @@ -1212,8 +1221,11 @@ mod tests {
Blocktree::open(&ledger_path)
.expect("Expected to be able to open database ledger"),
);
let mut poh_config = PohConfig::default();
// limit tick count to avoid clearing working_bank at PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
poh_config.target_tick_count = Some(bank.max_tick_height() - 1);
let (exit, poh_recorder, poh_service, entry_receiver) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, Some(poh_config));
let cluster_info =
ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
Expand Down
27 changes: 26 additions & 1 deletion core/src/poh_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ impl PohService {
.name("solana-poh-service-tick_producer".to_string())
.spawn(move || {
if poh_config.hashes_per_tick.is_none() {
Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_);
if poh_config.target_tick_count.is_none() {
Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_);
} else {
Self::short_lived_sleepy_tick_producer(
poh_recorder,
&poh_config,
&poh_exit_,
);
}
} else {
// PoH service runs in a tight loop, generating hashes as fast as possible.
// Let's dedicate one of the CPU cores to this thread so that it can gain
Expand Down Expand Up @@ -60,6 +68,22 @@ impl PohService {
}
}

fn short_lived_sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
) {
let mut warned = false;
for _ in 0..poh_config.target_tick_count.unwrap() {
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
if poh_exit.load(Ordering::Relaxed) && !warned {
warned = true;
warn!("exit signal is ignored because PohService is scheduled to exit soon");
}
}
}

fn tick_producer(poh_recorder: Arc<Mutex<PohRecorder>>, poh_exit: &AtomicBool) {
let poh = poh_recorder.lock().unwrap().poh.clone();
loop {
Expand Down Expand Up @@ -108,6 +132,7 @@ mod tests {
let poh_config = Arc::new(PohConfig {
hashes_per_tick: Some(2),
target_tick_duration: Duration::from_millis(42),
target_tick_count: None,
});
let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
Expand Down
2 changes: 1 addition & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ pub mod tests {
let blocktree = Arc::new(blocktree);
let bank = bank_forks.working_bank();
let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blocktree);
create_test_recorder(&bank, &blocktree, None);
let voting_keypair = Keypair::new();
let storage_keypair = Arc::new(Keypair::new());
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
Expand Down
1 change: 1 addition & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,6 +1675,7 @@ mod tests {
/ DEFAULT_TICKS_PER_SLOT,
),
hashes_per_tick: None,
target_tick_count: None,
},

..GenesisBlock::default()
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/poh_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ pub struct PohConfig {
/// The target tick rate of the cluster.
pub target_tick_duration: Duration,

/// The target total tick count to be produced; used for testing only
pub target_tick_count: Option<u64>,

/// How many hashes to roll before emitting the next tick entry.
/// None enables "Low power mode", which implies:
/// * sleep for `target_tick_duration` instead of hashing
Expand All @@ -18,6 +21,7 @@ impl PohConfig {
Self {
target_tick_duration,
hashes_per_tick: None,
target_tick_count: None,
}
}
}
Expand Down