From 4b5be3e0a7e3edb93c06f8967fa12585e1c9343b Mon Sep 17 00:00:00 2001
From: steviez
Date: Tue, 12 Mar 2024 17:33:30 -0500
Subject: [PATCH 1/5] Share the threadpool for tx execution and entry verification

Previously, entry verification had a dedicated threadpool that was used to
verify PoH hashes as well as to perform some basic transaction verification
via Bank::verify_transaction(). It should also be noted that the entry
verification code can offload work to a GPU if one is present.

Regardless of whether a GPU is present, some of the verification must be
done on a CPU. Moreover, CPU entry verification and transaction execution
are serial operations; entry verification finishes before transaction
execution begins. So, tx execution and entry verification are not competing
for CPU cycles at the same time and can share the same pool.

One exception to the above is the case where forks are replayed in parallel;
hypothetically, different forks could then compete for the same resources at
the same time. However, that was already true given that the old pools were
shared between replay of multiple forks. So, this change does not
meaningfully alter that case, but it does reduce overhead in the single-fork
case, which is the vast majority of the time.
---
 entry/src/entry.rs                 | 69 ++++++++++++++----------------
 ledger/src/blockstore_processor.rs | 40 ++++++++++-------
 2 files changed, 55 insertions(+), 54 deletions(-)

diff --git a/entry/src/entry.rs b/entry/src/entry.rs
index 46aad401dec9b0..13b106cfbc36c2 100644
--- a/entry/src/entry.rs
+++ b/entry/src/entry.rs
@@ -6,7 +6,6 @@ use {
     crate::poh::Poh,
     crossbeam_channel::{Receiver, Sender},
     dlopen2::symbor::{Container, SymBorApi, Symbol},
-    lazy_static::lazy_static,
     log::*,
     rand::{thread_rng, Rng},
     rayon::{prelude::*, ThreadPool},
@@ -21,7 +20,6 @@ use {
         recycler::Recycler,
         sigverify,
     },
-    solana_rayon_threadlimit::get_max_thread_count,
     solana_sdk::{
         hash::Hash,
         packet::Meta,
@@ -41,16 +39,6 @@ use {
     },
 };
 
-// get_max_thread_count to match number of threads in the old code.
-// see: https://github.com/solana-labs/solana/pull/24853
-lazy_static! {
-    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
-        .num_threads(get_max_thread_count())
-        .thread_name(|i| format!("solEntry{i:02}"))
-        .build()
-        .unwrap();
-}
-
 pub type EntrySender = Sender<Vec<Entry>>;
 pub type EntryReceiver = Receiver<Vec<Entry>>;
 
@@ -359,7 +347,7 @@ impl EntryVerificationState {
         self.poh_duration_us
     }
 
-    pub fn finish_verify(&mut self) -> bool {
+    pub fn finish_verify(&mut self, thread_pool: &ThreadPool) -> bool {
         match &mut self.device_verification_data {
             DeviceVerificationData::Gpu(verification_state) => {
                 let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap();
@@ -370,7 +358,7 @@ impl EntryVerificationState {
                     .expect("unwrap Arc")
                     .into_inner()
                     .expect("into_inner");
-                let res = PAR_THREAD_POOL.install(|| {
+                let res = thread_pool.install(|| {
                     hashes
                         .into_par_iter()
                         .cloned()
@@ -405,9 +393,10 @@ impl EntryVerificationState {
 
 pub fn verify_transactions(
     entries: Vec<Entry>,
+    thread_pool: &ThreadPool,
     verify: Arc<dyn Fn(VersionedTransaction) -> Result<SanitizedTransaction> + Send + Sync>,
 ) -> Result<Vec<EntryType>> {
-    PAR_THREAD_POOL.install(|| {
+    thread_pool.install(|| {
         entries
             .into_par_iter()
             .map(|entry| {
@@ -430,6 +419,7 @@ pub fn verify_transactions(
 pub fn start_verify_transactions(
     entries: Vec<Entry>,
     skip_verification: bool,
+    thread_pool: &ThreadPool,
     verify_recyclers: VerifyRecyclers,
     verify: Arc<
         dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
@@ -459,15 +449,16 @@ pub fn start_verify_transactions(
         .is_some();
 
     if use_cpu {
-        start_verify_transactions_cpu(entries, skip_verification, verify)
+        start_verify_transactions_cpu(entries, skip_verification, thread_pool, verify)
     } else {
-        start_verify_transactions_gpu(entries, verify_recyclers, verify)
+        start_verify_transactions_gpu(entries, verify_recyclers, thread_pool, verify)
     }
 }
 
 fn start_verify_transactions_cpu(
     entries: Vec<Entry>,
     skip_verification: bool,
+    thread_pool: &ThreadPool,
     verify: Arc<
         dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
             + Send
@@ -484,7 +475,7 @@ fn start_verify_transactions_cpu(
         move |versioned_tx| verify(versioned_tx, mode)
     };
 
-    let entries = verify_transactions(entries, Arc::new(verify_func))?;
+    let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?;
 
     Ok(EntrySigVerificationState {
         verification_status: EntryVerificationStatus::Success,
@@ -497,6 +488,7 @@ fn start_verify_transactions_cpu(
 fn start_verify_transactions_gpu(
     entries: Vec<Entry>,
     verify_recyclers: VerifyRecyclers,
+    thread_pool: &ThreadPool,
     verify: Arc<
         dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
             + Send
@@ -512,7 +504,7 @@ fn start_verify_transactions_gpu(
         }
     };
 
-    let entries = verify_transactions(entries, Arc::new(verify_func))?;
+    let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?;
 
     let entry_txs: Vec<&SanitizedTransaction> = entries
         .iter()
@@ -618,12 +610,12 @@ fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool {
 // an EntrySlice is a slice of Entries
 pub trait EntrySlice {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
- fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState; - fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; + fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; + fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState; + fn start_verify(&self, start_hash: &Hash, thread_pool: &ThreadPool, recyclers: VerifyRecyclers) -> EntryVerificationState; - fn verify(&self, start_hash: &Hash) -> bool; + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool; /// Checks that each entry tick has the correct number of hashes. Entry slices do not /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count /// for the next entry slice. @@ -633,12 +625,12 @@ pub trait EntrySlice { } impl EntrySlice for [Entry] { - fn verify(&self, start_hash: &Hash) -> bool { - self.start_verify(start_hash, VerifyRecyclers::default()) - .finish_verify() + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool { + self.start_verify(start_hash, thread_pool, VerifyRecyclers::default()) + .finish_verify(thread_pool) } - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState { let now = Instant::now(); let genesis = [Entry { num_hashes: 0, @@ -646,7 +638,7 @@ impl EntrySlice for [Entry] { transactions: vec![], }]; let entry_pairs = genesis.par_iter().chain(self).zip(self); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { entry_pairs.all(|(x0, x1)| { let r = x1.verify(&x0.hash); if !r { @@ -672,7 +664,7 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState { + fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState { use solana_sdk::hash::HASH_BYTES; let now = Instant::now(); let genesis = [Entry { @@ -703,7 +695,7 @@ impl EntrySlice for [Entry] { num_hashes.resize(aligned_len, 0); let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect(); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { hashes_chunked .par_iter_mut() .zip(num_hashes) @@ -753,7 +745,7 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] let (has_avx2, has_avx512) = ( is_x86_feature_detected!("avx2"), @@ -764,25 +756,26 @@ impl EntrySlice for [Entry] { if api().is_some() { if has_avx512 && self.len() >= 128 { - self.verify_cpu_x86_simd(start_hash, 16) + self.verify_cpu_x86_simd(start_hash, 16, thread_pool) } else if has_avx2 && self.len() >= 48 { - self.verify_cpu_x86_simd(start_hash, 8) + self.verify_cpu_x86_simd(start_hash, 8, thread_pool) } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } fn start_verify( &self, start_hash: &Hash, 
+ thread_pool: &ThreadPool, recyclers: VerifyRecyclers, ) -> EntryVerificationState { let start = Instant::now(); let Some(api) = perf_libs::api() else { - return self.verify_cpu(start_hash); + return self.verify_cpu(start_hash, thread_pool); }; inc_new_counter_info!("entry_verify-num_entries", self.len()); @@ -839,7 +832,7 @@ impl EntrySlice for [Entry] { }) .unwrap(); - let verifications = PAR_THREAD_POOL.install(|| { + let verifications = thread_pool.install(|| { self.into_par_iter() .map(|entry| { let answer = entry.hash; diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 9eace1e7c9cd34..7f419c46493251 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -519,20 +519,23 @@ pub fn process_entries_for_tests( let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap(); let mut batch_timing = BatchExecutionTiming::default(); - let mut replay_entries: Vec<_> = - entry::verify_transactions(entries, Arc::new(verify_transaction))? - .into_iter() - .map(|entry| { - let starting_index = entry_starting_index; - if let EntryType::Transactions(ref transactions) = entry { - entry_starting_index = entry_starting_index.saturating_add(transactions.len()); - } - ReplayEntry { - entry, - starting_index, - } - }) - .collect(); + let mut replay_entries: Vec<_> = entry::verify_transactions( + entries, + &replay_tx_thread_pool, + Arc::new(verify_transaction), + )? + .into_iter() + .map(|entry| { + let starting_index = entry_starting_index; + if let EntryType::Transactions(ref transactions) = entry { + entry_starting_index = entry_starting_index.saturating_add(transactions.len()); + } + ReplayEntry { + entry, + starting_index, + } + }) + .collect(); let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries( @@ -1292,7 +1295,11 @@ fn confirm_slot_entries( let last_entry_hash = entries.last().map(|e| e.hash); let verifier = if !skip_verification { datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64)); - let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone()); + let entry_state = entries.start_verify( + &progress.last_entry, + replay_tx_thread_pool, + recyclers.clone(), + ); if entry_state.status() == EntryVerificationStatus::Failure { warn!("Ledger proof of history failed at slot: {}", slot); return Err(BlockError::InvalidEntryHash.into()); @@ -1315,6 +1322,7 @@ fn confirm_slot_entries( let transaction_verification_result = entry::start_verify_transactions( entries, skip_verification, + replay_tx_thread_pool, recyclers.clone(), Arc::new(verify_transaction), ); @@ -1381,7 +1389,7 @@ fn confirm_slot_entries( } if let Some(mut verifier) = verifier { - let verified = verifier.finish_verify(); + let verified = verifier.finish_verify(replay_tx_thread_pool); *poh_verify_elapsed += verifier.poh_duration_us(); if !verified { warn!("Ledger proof of history failed at slot: {}", bank.slot()); From 698545511522f810671cbc4013724745e9455505 Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 25 Mar 2024 11:42:09 -0500 Subject: [PATCH 2/5] Update unit tests --- core/src/banking_stage.rs | 6 +- entry/src/entry.rs | 125 ++++++++++++++++++++++------- local-cluster/src/cluster_tests.rs | 14 ++-- 3 files changed, 107 insertions(+), 38 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 603ff55f0003b4..134ad09786d670 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -786,7 +786,7 @@ mod 
tests { crate::banking_trace::{BankingPacketBatch, BankingTracer}, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, - solana_entry::entry::{Entry, EntrySlice}, + solana_entry::entry::{entry_thread_pool_for_tests, Entry, EntrySlice}, solana_gossip::cluster_info::Node, solana_ledger::{ blockstore::Blockstore, @@ -941,7 +941,7 @@ mod tests { .collect(); trace!("done"); assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize); - assert!(entries.verify(&start_hash)); + assert!(entries.verify(&start_hash, &entry_thread_pool_for_tests())); assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); banking_stage.join().unwrap(); } @@ -1060,7 +1060,7 @@ mod tests { .map(|(_bank, (entry, _tick_height))| entry) .collect(); - assert!(entries.verify(&blockhash)); + assert!(entries.verify(&blockhash, &entry_thread_pool_for_tests())); if !entries.is_empty() { blockhash = entries.last().unwrap().hash; for entry in entries { diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 13b106cfbc36c2..6160f0042962ec 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -611,10 +611,23 @@ fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool { pub trait EntrySlice { /// Verifies the hashes and counts of a slice of transactions are all consistent. fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; - fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState; - fn start_verify(&self, start_hash: &Hash, thread_pool: &ThreadPool, recyclers: VerifyRecyclers) - -> EntryVerificationState; + fn verify_cpu_generic( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + ) -> EntryVerificationState; + fn verify_cpu_x86_simd( + &self, + start_hash: &Hash, + simd_len: usize, + thread_pool: &ThreadPool, + ) -> EntryVerificationState; + fn start_verify( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + recyclers: VerifyRecyclers, + ) -> EntryVerificationState; fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool; /// Checks that each entry tick has the correct number of hashes. 
Entry slices do not /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count @@ -630,7 +643,11 @@ impl EntrySlice for [Entry] { .finish_verify(thread_pool) } - fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState { + fn verify_cpu_generic( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + ) -> EntryVerificationState { let now = Instant::now(); let genesis = [Entry { num_hashes: 0, @@ -664,7 +681,12 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState { + fn verify_cpu_x86_simd( + &self, + start_hash: &Hash, + simd_len: usize, + thread_pool: &ThreadPool, + ) -> EntryVerificationState { use solana_sdk::hash::HASH_BYTES; let now = Instant::now(); let genesis = [Entry { @@ -931,6 +953,14 @@ pub fn next_versioned_entry( } } +pub fn entry_thread_pool_for_tests() -> ThreadPool { + rayon::ThreadPoolBuilder::new() + .num_threads(4) + .thread_name(|i| format!("solEntryTest{i:02}")) + .build() + .expect("new rayon threadpool") +} + #[cfg(test)] mod tests { use { @@ -961,6 +991,7 @@ mod tests { entries: Vec, skip_verification: bool, verify_recyclers: VerifyRecyclers, + thread_pool: &ThreadPool, verify: Arc< dyn Fn( VersionedTransaction, @@ -982,10 +1013,16 @@ mod tests { } }; - let cpu_verify_result = verify_transactions(entries.clone(), Arc::new(verify_func)); + let cpu_verify_result = + verify_transactions(entries.clone(), thread_pool, Arc::new(verify_func)); let mut gpu_verify_result: EntrySigVerificationState = { - let verify_result = - start_verify_transactions(entries, skip_verification, verify_recyclers, verify); + let verify_result = start_verify_transactions( + entries, + skip_verification, + thread_pool, + verify_recyclers, + verify, + ); match verify_result { Ok(res) => res, _ => EntrySigVerificationState { @@ -1015,6 +1052,8 @@ mod tests { #[test] fn test_entry_gpu_verify() { + let thread_pool = entry_thread_pool_for_tests(); + let verify_transaction = { move |versioned_tx: VersionedTransaction, verification_mode: TransactionVerificationMode| @@ -1060,12 +1099,14 @@ mod tests { entries_invalid, false, recycler.clone(), + &thread_pool, Arc::new(verify_transaction) )); assert!(test_verify_transactions( entries_valid, false, recycler, + &thread_pool, Arc::new(verify_transaction) )); } @@ -1089,6 +1130,8 @@ mod tests { #[test] fn test_transaction_signing() { + let thread_pool = entry_thread_pool_for_tests(); + use solana_sdk::signature::Signature; let zero = Hash::default(); @@ -1098,27 +1141,27 @@ mod tests { // Verify entry with 2 transactions let mut e0 = [Entry::new(&zero, 0, vec![tx0, tx1])]; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); // Clear signature of the first transaction, see that it does not verify let orig_sig = e0[0].transactions[0].signatures[0]; e0[0].transactions[0].signatures[0] = Signature::default(); - assert!(!e0.verify(&zero)); + assert!(!e0.verify(&zero, &thread_pool)); // restore original signature e0[0].transactions[0].signatures[0] = orig_sig; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); // Resize signatures and see verification fails. 
let len = e0[0].transactions[0].signatures.len(); e0[0].transactions[0] .signatures .resize(len - 1, Signature::default()); - assert!(!e0.verify(&zero)); + assert!(!e0.verify(&zero, &thread_pool)); // Pass an entry with no transactions let e0 = [Entry::new(&zero, 0, vec![])]; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); } #[test] @@ -1151,41 +1194,57 @@ mod tests { #[test] fn test_verify_slice1() { solana_logger::setup(); + let thread_pool = entry_thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); - assert!(vec![][..].verify(&zero)); // base case - assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1 - assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad - assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero)); // inductive step + // base case + assert!(vec![][..].verify(&zero, &thread_pool)); + // singleton case 1 + assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero, &thread_pool)); + // singleton case 2, bad + assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one, &thread_pool)); + // inductive step + assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero, &thread_pool)); let mut bad_ticks = vec![next_entry(&zero, 0, vec![]); 2]; bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&zero)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&zero, &thread_pool)); } #[test] fn test_verify_slice_with_hashes1() { solana_logger::setup(); + let thread_pool = entry_thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); let two = hash(one.as_ref()); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![Entry::new_tick(1, &two)][..].verify(&one)); // singleton case 1 - assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two)); // singleton case 2, bad + // base case + assert!(vec![][..].verify(&one, &thread_pool)); + // singleton case 1 + assert!(vec![Entry::new_tick(1, &two)][..].verify(&one, &thread_pool)); + // singleton case 2, bad + assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two, &thread_pool)); let mut ticks = vec![next_entry(&one, 1, vec![])]; ticks.push(next_entry(&ticks.last().unwrap().hash, 1, vec![])); - assert!(ticks.verify(&one)); // inductive step + // inductive step + assert!(ticks.verify(&one, &thread_pool)); let mut bad_ticks = vec![next_entry(&one, 1, vec![])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&one, &thread_pool)); } #[test] fn test_verify_slice_with_hashes_and_transactions() { solana_logger::setup(); + let thread_pool = entry_thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); let two = hash(one.as_ref()); @@ -1193,9 +1252,12 @@ mod tests { let bob_keypair = Keypair::new(); let tx0 = system_transaction::transfer(&alice_keypair, &bob_keypair.pubkey(), 1, one); let tx1 = system_transaction::transfer(&bob_keypair, &alice_keypair.pubkey(), 1, one); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one)); // singleton case 1 - assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two)); // singleton case 2, bad + // base case + assert!(vec![][..].verify(&one, &thread_pool)); + // singleton case 1 + assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one, 
&thread_pool)); + // singleton case 2, bad + assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two, &thread_pool)); let mut ticks = vec![next_entry(&one, 1, vec![tx0.clone()])]; ticks.push(next_entry( @@ -1203,12 +1265,15 @@ mod tests { 1, vec![tx1.clone()], )); - assert!(ticks.verify(&one)); // inductive step + + // inductive step + assert!(ticks.verify(&one, &thread_pool)); let mut bad_ticks = vec![next_entry(&one, 1, vec![tx0])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![tx1])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&one, &thread_pool)); } #[test] @@ -1347,7 +1412,7 @@ mod tests { info!("done.. {}", time); let mut time = Measure::start("poh"); - let res = entries.verify(&Hash::default()); + let res = entries.verify(&Hash::default(), &entry_thread_pool_for_tests()); assert_eq!(res, !modified); time.stop(); info!("{} {}", time, res); diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index dffe2a8713ab08..1bea43aca32f64 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -5,7 +5,7 @@ use log::*; use { rand::{thread_rng, Rng}, - rayon::prelude::*, + rayon::{prelude::*, ThreadPool}, solana_client::{ connection_cache::{ConnectionCache, Protocol}, thin_client::ThinClient, @@ -14,7 +14,7 @@ use { tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, VOTE_THRESHOLD_DEPTH, }, - solana_entry::entry::{Entry, EntrySlice}, + solana_entry::entry::{self, Entry, EntrySlice}, solana_gossip::{ cluster_info::{self, ClusterInfo}, contact_info::{ContactInfo, LegacyContactInfo}, @@ -180,6 +180,8 @@ pub fn send_many_transactions( pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) { let ledger = Blockstore::open(ledger_path).unwrap(); + let thread_pool = entry::entry_thread_pool_for_tests(); + let zeroth_slot = ledger.get_slot_entries(0, 0).unwrap(); let last_id = zeroth_slot.last().unwrap().hash; let next_slots = ledger.get_slots_since(&[0]).unwrap().remove(&0).unwrap(); @@ -201,7 +203,7 @@ pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) { None }; - let last_id = verify_slot_ticks(&ledger, slot, &last_id, should_verify_ticks); + let last_id = verify_slot_ticks(&ledger, &thread_pool, slot, &last_id, should_verify_ticks); pending_slots.extend( next_slots .into_iter() @@ -630,21 +632,23 @@ pub fn start_gossip_voter( fn get_and_verify_slot_entries( blockstore: &Blockstore, + thread_pool: &ThreadPool, slot: Slot, last_entry: &Hash, ) -> Vec { let entries = blockstore.get_slot_entries(slot, 0).unwrap(); - assert!(entries.verify(last_entry)); + assert!(entries.verify(last_entry, thread_pool)); entries } fn verify_slot_ticks( blockstore: &Blockstore, + thread_pool: &ThreadPool, slot: Slot, last_entry: &Hash, expected_num_ticks: Option, ) -> Hash { - let entries = get_and_verify_slot_entries(blockstore, slot, last_entry); + let entries = get_and_verify_slot_entries(blockstore, thread_pool, slot, last_entry); let num_ticks: usize = entries.iter().map(|entry| entry.is_tick() as usize).sum(); if let Some(expected_num_ticks) = expected_num_ticks { assert_eq!(num_ticks, expected_num_ticks); From 53cb3463e54d553f82da87103d05e80f708d7420 Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 25 Mar 2024 12:16:20 -0500 Subject: [PATCH 3/5] Fixup benches --- Cargo.lock | 1 + entry/benches/entry_sigverify.rs | 6 +++++- entry/src/entry.rs | 9 +++++++++ 
poh-bench/Cargo.toml | 1 + poh-bench/src/main.rs | 25 +++++++++++++++++-------- poh/benches/poh_verify.rs | 10 +++++++--- 6 files changed, 40 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8836adf757915..f1c06592e86c3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6613,6 +6613,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-perf", + "solana-rayon-threadlimit", "solana-sdk", "solana-version", ] diff --git a/entry/benches/entry_sigverify.rs b/entry/benches/entry_sigverify.rs index b3a1b7b5cdb3e6..09adeb6cfd831a 100644 --- a/entry/benches/entry_sigverify.rs +++ b/entry/benches/entry_sigverify.rs @@ -16,6 +16,7 @@ use { #[bench] fn bench_gpusigverify(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); let entries = (0..131072) .map(|_| { let transaction = test_tx(); @@ -53,6 +54,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) { let res = entry::start_verify_transactions( entries.clone(), false, + &thread_pool, recycler.clone(), Arc::new(verify_transaction), ); @@ -65,6 +67,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) { #[bench] fn bench_cpusigverify(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); let entries = (0..131072) .map(|_| { let transaction = test_tx(); @@ -89,6 +92,7 @@ fn bench_cpusigverify(bencher: &mut Bencher) { }; bencher.iter(|| { - let _ans = entry::verify_transactions(entries.clone(), Arc::new(verify_transaction)); + let _ans = + entry::verify_transactions(entries.clone(), &thread_pool, Arc::new(verify_transaction)); }) } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 6160f0042962ec..3883462de672ac 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -20,6 +20,7 @@ use { recycler::Recycler, sigverify, }, + solana_rayon_threadlimit::get_max_thread_count, solana_sdk::{ hash::Hash, packet::Meta, @@ -961,6 +962,14 @@ pub fn entry_thread_pool_for_tests() -> ThreadPool { .expect("new rayon threadpool") } +pub fn thread_pool_for_benches() -> ThreadPool { + rayon::ThreadPoolBuilder::new() + .num_threads(get_max_thread_count()) + .thread_name(|i| format!("solEntryBnch{i:02}")) + .build() + .expect("new rayon threadpool") +} + #[cfg(test)] mod tests { use { diff --git a/poh-bench/Cargo.toml b/poh-bench/Cargo.toml index fb44c0cb81d966..8cd3979b17c79b 100644 --- a/poh-bench/Cargo.toml +++ b/poh-bench/Cargo.toml @@ -17,6 +17,7 @@ solana-entry = { workspace = true } solana-logger = { workspace = true } solana-measure = { workspace = true } solana-perf = { workspace = true } +solana-rayon-threadlimit = { workspace = true } solana-sdk = { workspace = true } solana-version = { workspace = true } diff --git a/poh-bench/src/main.rs b/poh-bench/src/main.rs index d835bac05a3ff9..941d581a825b73 100644 --- a/poh-bench/src/main.rs +++ b/poh-bench/src/main.rs @@ -7,6 +7,7 @@ use { clap::{crate_description, crate_name, Arg, Command}, solana_measure::measure::Measure, solana_perf::perf_libs, + solana_rayon_threadlimit::get_max_thread_count, solana_sdk::hash::hash, }; @@ -73,6 +74,14 @@ fn main() { let start_hash = hash(&[1, 2, 3, 4]); let ticks = create_ticks(max_num_entries, hashes_per_tick, start_hash); let mut num_entries = start_num_entries as usize; + let num_threads = matches + .value_of_t("num_threads") + .unwrap_or(get_max_thread_count()); + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("solPohBench{i:02}")) + .build() + .expect("new rayon threadpool"); if matches.is_present("cuda") { perf_libs::init_cuda(); } 
@@ -81,8 +90,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_generic(&start_hash) - .finish_verify()); + .verify_cpu_generic(&start_hash, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -100,8 +109,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_x86_simd(&start_hash, 8) - .finish_verify()); + .verify_cpu_x86_simd(&start_hash, 8, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -115,8 +124,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_x86_simd(&start_hash, 16) - .finish_verify()); + .verify_cpu_x86_simd(&start_hash, 16, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -132,8 +141,8 @@ fn main() { let recyclers = VerifyRecyclers::default(); for _ in 0..iterations { assert!(ticks[..num_entries] - .start_verify(&start_hash, recyclers.clone()) - .finish_verify()); + .start_verify(&start_hash, &thread_pool, recyclers.clone()) + .finish_verify(&thread_pool)); } time.stop(); println!( diff --git a/poh/benches/poh_verify.rs b/poh/benches/poh_verify.rs index 47f31860c38d9c..cd33cdae43ef8d 100644 --- a/poh/benches/poh_verify.rs +++ b/poh/benches/poh_verify.rs @@ -2,7 +2,7 @@ extern crate test; use { - solana_entry::entry::{next_entry_mut, Entry, EntrySlice}, + solana_entry::entry::{self, next_entry_mut, Entry, EntrySlice}, solana_sdk::{ hash::{hash, Hash}, signature::{Keypair, Signer}, @@ -17,6 +17,8 @@ const NUM_ENTRIES: usize = 800; #[bench] fn bench_poh_verify_ticks(bencher: &mut Bencher) { solana_logger::setup(); + let thread_pool = entry::thread_pool_for_benches(); + let zero = Hash::default(); let start_hash = hash(zero.as_ref()); let mut cur_hash = start_hash; @@ -27,12 +29,14 @@ fn bench_poh_verify_ticks(bencher: &mut Bencher) { } bencher.iter(|| { - assert!(ticks.verify(&start_hash)); + assert!(ticks.verify(&start_hash, &thread_pool)); }) } #[bench] fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); + let zero = Hash::default(); let start_hash = hash(zero.as_ref()); let mut cur_hash = start_hash; @@ -47,6 +51,6 @@ fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) { } bencher.iter(|| { - assert!(ticks.verify(&start_hash)); + assert!(ticks.verify(&start_hash, &thread_pool)); }) } From 75d5c5d05c7c824d8cd349dbc6d7c3c0ff7d6427 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 26 Mar 2024 22:06:21 -0500 Subject: [PATCH 4/5] Rename entry_thread_pool_for_tests --> thread_pool_for_tests --- core/src/banking_stage.rs | 6 +++--- entry/src/entry.rs | 14 +++++++------- local-cluster/src/cluster_tests.rs | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 134ad09786d670..7be8af1373ccbe 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -786,7 +786,7 @@ mod tests { crate::banking_trace::{BankingPacketBatch, BankingTracer}, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, - solana_entry::entry::{entry_thread_pool_for_tests, Entry, EntrySlice}, + solana_entry::entry::{self, Entry, EntrySlice}, solana_gossip::cluster_info::Node, solana_ledger::{ blockstore::Blockstore, @@ -941,7 +941,7 @@ mod tests { .collect(); trace!("done"); assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize); - 
assert!(entries.verify(&start_hash, &entry_thread_pool_for_tests())); + assert!(entries.verify(&start_hash, &entry::thread_pool_for_tests())); assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); banking_stage.join().unwrap(); } @@ -1060,7 +1060,7 @@ mod tests { .map(|(_bank, (entry, _tick_height))| entry) .collect(); - assert!(entries.verify(&blockhash, &entry_thread_pool_for_tests())); + assert!(entries.verify(&blockhash, &entry::thread_pool_for_tests())); if !entries.is_empty() { blockhash = entries.last().unwrap().hash; for entry in entries { diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 3883462de672ac..d317dd354fc86e 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -954,7 +954,7 @@ pub fn next_versioned_entry( } } -pub fn entry_thread_pool_for_tests() -> ThreadPool { +pub fn thread_pool_for_tests() -> ThreadPool { rayon::ThreadPoolBuilder::new() .num_threads(4) .thread_name(|i| format!("solEntryTest{i:02}")) @@ -1061,7 +1061,7 @@ mod tests { #[test] fn test_entry_gpu_verify() { - let thread_pool = entry_thread_pool_for_tests(); + let thread_pool = thread_pool_for_tests(); let verify_transaction = { move |versioned_tx: VersionedTransaction, @@ -1139,7 +1139,7 @@ mod tests { #[test] fn test_transaction_signing() { - let thread_pool = entry_thread_pool_for_tests(); + let thread_pool = thread_pool_for_tests(); use solana_sdk::signature::Signature; let zero = Hash::default(); @@ -1203,7 +1203,7 @@ mod tests { #[test] fn test_verify_slice1() { solana_logger::setup(); - let thread_pool = entry_thread_pool_for_tests(); + let thread_pool = thread_pool_for_tests(); let zero = Hash::default(); let one = hash(zero.as_ref()); @@ -1225,7 +1225,7 @@ mod tests { #[test] fn test_verify_slice_with_hashes1() { solana_logger::setup(); - let thread_pool = entry_thread_pool_for_tests(); + let thread_pool = thread_pool_for_tests(); let zero = Hash::default(); let one = hash(zero.as_ref()); @@ -1252,7 +1252,7 @@ mod tests { #[test] fn test_verify_slice_with_hashes_and_transactions() { solana_logger::setup(); - let thread_pool = entry_thread_pool_for_tests(); + let thread_pool = thread_pool_for_tests(); let zero = Hash::default(); let one = hash(zero.as_ref()); @@ -1421,7 +1421,7 @@ mod tests { info!("done.. 
{}", time); let mut time = Measure::start("poh"); - let res = entries.verify(&Hash::default(), &entry_thread_pool_for_tests()); + let res = entries.verify(&Hash::default(), &thread_pool_for_tests()); assert_eq!(res, !modified); time.stop(); info!("{} {}", time, res); diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 1bea43aca32f64..aa318f9df16f34 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -180,7 +180,7 @@ pub fn send_many_transactions( pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) { let ledger = Blockstore::open(ledger_path).unwrap(); - let thread_pool = entry::entry_thread_pool_for_tests(); + let thread_pool = entry::thread_pool_for_tests(); let zeroth_slot = ledger.get_slot_entries(0, 0).unwrap(); let last_id = zeroth_slot.last().unwrap().hash; From d3df7da835250a4d14a7f2d57d7fe1a32a88e9b3 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 26 Mar 2024 22:15:02 -0500 Subject: [PATCH 5/5] Comment to explain thread count in test pool --- entry/src/entry.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/entry/src/entry.rs b/entry/src/entry.rs index d317dd354fc86e..7497f96d65980f 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -955,6 +955,10 @@ pub fn next_versioned_entry( } pub fn thread_pool_for_tests() -> ThreadPool { + // Allocate fewer threads for unit tests + // Unit tests typically aren't creating massive blocks to verify, and + // multiple tests could be running in parallel so any further parallelism + // will do more harm than good rayon::ThreadPoolBuilder::new() .num_threads(4) .thread_name(|i| format!("solEntryTest{i:02}"))