Skip to content

Commit

Permalink
Cleanup banking bench (#24851)
Browse files Browse the repository at this point in the history
* Cleanup banking bench

* Fully remove assert

(cherry picked from commit e83efe6)

# Conflicts:
#	banking-bench/src/main.rs
  • Loading branch information
carllin authored and mergify[bot] committed May 2, 2022
1 parent 504df47 commit 24b8ede
Showing 1 changed file with 156 additions and 36 deletions.
192 changes: 156 additions & 36 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use {
leader_schedule_cache::LeaderScheduleCache,
},
solana_measure::measure::Measure,
solana_perf::packet::to_packet_batches,
solana_perf::packet::{to_packet_batches, PacketBatch},
solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry},
solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
Expand Down Expand Up @@ -55,7 +55,6 @@ fn check_txs(
break;
}
if poh_recorder.lock().unwrap().bank().is_none() {
trace!("no bank");
no_bank = true;
break;
}
Expand Down Expand Up @@ -89,20 +88,52 @@ fn make_accounts_txs(
.collect()
}

struct Config {
struct PacketsPerIteration {
packet_batches: Vec<PacketBatch>,
transactions: Vec<Transaction>,
packets_per_batch: usize,
chunk_len: usize,
num_threads: usize,
}

<<<<<<< HEAD
impl Config {
fn get_transactions_index(&self, chunk_index: usize) -> usize {
chunk_index * (self.chunk_len / self.num_threads) * self.packets_per_batch
=======
impl PacketsPerIteration {
fn new(
packets_per_batch: usize,
batches_per_iteration: usize,
genesis_hash: Hash,
write_lock_contention: WriteLockContention,
) -> Self {
let total_num_transactions = packets_per_batch * batches_per_iteration;
let transactions = make_accounts_txs(
total_num_transactions,
packets_per_batch,
genesis_hash,
write_lock_contention,
);

let packet_batches: Vec<PacketBatch> = to_packet_batches(&transactions, packets_per_batch);
assert_eq!(packet_batches.len(), batches_per_iteration);
Self {
packet_batches,
transactions,
packets_per_batch,
}
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
}
}

fn bytes_as_usize(bytes: &[u8]) -> usize {
bytes[0] as usize | (bytes[1] as usize) << 8
fn refresh_blockhash(&mut self, new_blockhash: Hash) {
for tx in self.transactions.iter_mut() {
tx.message.recent_blockhash = new_blockhash;
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
tx.signatures[0] = Signature::new(&sig[0..64]);
}
self.packet_batches = to_packet_batches(&self.transactions, self.packets_per_batch);
}
}

#[allow(clippy::cognitive_complexity)]
Expand All @@ -113,7 +144,17 @@ fn main() {
.about(crate_description!())
.version(solana_version::version!())
.arg(
<<<<<<< HEAD
Arg::with_name("num_chunks")
=======
Arg::new("iterations")
.long("iterations")
.takes_value(true)
.help("Number of test iterations"),
)
.arg(
Arg::new("num_chunks")
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
.long("num-chunks")
.takes_value(true)
.value_name("SIZE")
Expand All @@ -139,6 +180,7 @@ fn main() {
.help("Use the same payer for transfers"),
)
.arg(
<<<<<<< HEAD
Arg::with_name("iterations")
.long("iterations")
.takes_value(true)
Expand All @@ -147,6 +189,16 @@ fn main() {
.arg(
Arg::with_name("num_threads")
.long("num-threads")
=======
Arg::new("batches_per_iteration")
.long("batches-per-iteration")
.takes_value(true)
.help("Number of batches to send in each iteration"),
)
.arg(
Arg::new("num_banking_threads")
.long("num-banking-threads")
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
.takes_value(true)
.help("Number of iterations"),
)
Expand All @@ -159,7 +211,10 @@ fn main() {
let packets_per_chunk = value_t!(matches, "packets_per_chunk", usize).unwrap_or(192);
let iterations = value_t!(matches, "iterations", usize).unwrap_or(1000);

<<<<<<< HEAD
let total_num_transactions = num_chunks * num_threads * packets_per_chunk;
=======
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
let mint_total = 1_000_000_000_000;
let GenesisConfigInfo {
genesis_config,
Expand All @@ -180,6 +235,7 @@ fn main() {
.unwrap()
.set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX);

<<<<<<< HEAD
info!("threads: {} txs: {}", num_threads, total_num_transactions);

let same_payer = matches.is_present("same_payer");
Expand All @@ -199,26 +255,86 @@ fn main() {
fund.signatures = vec![Signature::new(&sig[0..64])];
let x = bank.process_transaction(&fund);
x.unwrap();
=======
let mut all_packets: Vec<PacketsPerIteration> = std::iter::from_fn(|| {
Some(PacketsPerIteration::new(
packets_per_batch,
batches_per_iteration,
genesis_config.hash(),
write_lock_contention,
))
})
.take(num_chunks)
.collect();

// fund all the accounts
let total_num_transactions: u64 = all_packets
.iter()
.map(|packets_for_single_iteration| packets_for_single_iteration.transactions.len() as u64)
.sum();
info!(
"threads: {} txs: {}",
num_banking_threads, total_num_transactions
);

all_packets.iter().for_each(|packets_for_single_iteration| {
packets_for_single_iteration
.transactions
.iter()
.for_each(|tx| {
let mut fund = system_transaction::transfer(
&mint_keypair,
&tx.message.account_keys[0],
mint_total / total_num_transactions,
genesis_config.hash(),
);
// Ignore any pesky duplicate signature errors in the case we are using single-payer
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
fund.signatures = vec![Signature::new(&sig[0..64])];
bank.process_transaction(&fund).unwrap();
});
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
});

let skip_sanity = matches.is_present("skip_sanity");
if !skip_sanity {
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(tx);
assert!(res.is_ok(), "sanity test transactions error: {:?}", res);
all_packets.iter().for_each(|packets_for_single_iteration| {
//sanity check, make sure all the transactions can execute sequentially
packets_for_single_iteration
.transactions
.iter()
.for_each(|tx| {
let res = bank.process_transaction(tx);
assert!(res.is_ok(), "sanity test transactions error: {:?}", res);
});
});
bank.clear_signatures();
//sanity check, make sure all the transactions can execute in parallel

<<<<<<< HEAD
let res = bank.process_transactions(transactions.iter());
for r in res {
assert!(r.is_ok(), "sanity parallel execution error: {:?}", r);
=======
if write_lock_contention == WriteLockContention::None {
all_packets.iter().for_each(|packets_for_single_iteration| {
//sanity check, make sure all the transactions can execute in parallel
let res =
bank.process_transactions(packets_for_single_iteration.transactions.iter());
for r in res {
assert!(r.is_ok(), "sanity parallel execution error: {:?}", r);
}
bank.clear_signatures();
});
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
}
bank.clear_signatures();
}

<<<<<<< HEAD
let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_chunk);
=======
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
Expand Down Expand Up @@ -249,9 +365,12 @@ fn main() {
);
poh_recorder.lock().unwrap().set_bank(&bank);

<<<<<<< HEAD
let chunk_len = verified.len() / num_chunks;
let mut start = 0;

=======
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
Expand All @@ -262,16 +381,21 @@ fn main() {
let mut txs_processed = 0;
let mut root = 1;
let collector = solana_sdk::pubkey::new_rand();
<<<<<<< HEAD
let config = Config {
packets_per_batch: packets_per_chunk,
chunk_len,
num_threads,
};
=======
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
let mut total_sent = 0;
for _ in 0..iterations {
for current_iteration_index in 0..iterations {
trace!("RUNNING ITERATION {}", current_iteration_index);
let now = Instant::now();
let mut sent = 0;

<<<<<<< HEAD
for (i, v) in verified[start..start + chunk_len]
.chunks(chunk_len / num_threads)
.enumerate()
Expand All @@ -281,24 +405,22 @@ fn main() {
if index < transactions.len() {
byte = bytes_as_usize(transactions[index].signatures[0].as_ref());
}
=======
let packets_for_this_iteration = &all_packets[current_iteration_index % num_chunks];
for (packet_batch_index, packet_batch) in
packets_for_this_iteration.packet_batches.iter().enumerate()
{
sent += packet_batch.packets.len();
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
trace!(
"sending... {}..{} {} v.len: {} sig: {} transactions.len: {} index: {}",
start + i,
start + chunk_len,
"Sending PacketBatch index {}, {}",
packet_batch_index,
timestamp(),
v.len(),
byte,
transactions.len(),
index,
);
for xv in v {
sent += xv.packets.len();
}
verified_sender.send(v.to_vec()).unwrap();
verified_sender.send(vec![packet_batch.clone()]).unwrap();
}
let start_tx_index = config.get_transactions_index(start);
let end_tx_index = config.get_transactions_index(start + chunk_len);
for tx in &transactions[start_tx_index..end_tx_index] {

for tx in &packets_for_this_iteration.transactions {
loop {
if bank.get_signature_status(&tx.signatures[0]).is_some() {
break;
Expand All @@ -311,7 +433,7 @@ fn main() {
}
if check_txs(
&signal_receiver,
total_num_transactions / num_chunks,
packets_for_this_iteration.transactions.len(),
&poh_recorder,
) {
debug!(
Expand All @@ -320,7 +442,6 @@ fn main() {
bank.transaction_count(),
txs_processed
);
assert!(txs_processed < bank.transaction_count());
txs_processed = bank.transaction_count();
tx_total_us += duration_as_us(&now.elapsed());

Expand Down Expand Up @@ -372,22 +493,21 @@ fn main() {
debug!(
"time: {} us checked: {} sent: {}",
duration_as_us(&now.elapsed()),
total_num_transactions / num_chunks,
total_num_transactions / num_chunks as u64,
sent,
);
total_sent += sent;

if bank.slot() > 0 && bank.slot() % 16 == 0 {
for tx in transactions.iter_mut() {
tx.message.recent_blockhash = bank.last_blockhash();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
tx.signatures[0] = Signature::new(&sig[0..64]);
if current_iteration_index % 16 == 0 {
let last_blockhash = bank.last_blockhash();
for packets_for_single_iteration in all_packets.iter_mut() {
packets_for_single_iteration.refresh_blockhash(last_blockhash);
}
<<<<<<< HEAD
verified = to_packet_batches(&transactions.clone(), packets_per_chunk);
=======
>>>>>>> e83efe678 (Cleanup banking bench (#24851))
}

start += chunk_len;
start %= verified.len();
}
let txs_processed = bank_forks.working_bank().transaction_count();
debug!("processed: {} base: {}", txs_processed, base_tx_count);
Expand Down

0 comments on commit 24b8ede

Please sign in to comment.