From 8568d5d09cc4a82573445a9d2f54e13cd5eb9d66 Mon Sep 17 00:00:00 2001 From: jonch <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 7 Jun 2021 13:32:56 +0800 Subject: [PATCH] accounts: parallel load across txs via core-pinned threadpool --- Cargo.lock | 6 ++- programs/bpf/Cargo.lock | 14 ++++++ runtime/Cargo.toml | 2 + runtime/src/accounts.rs | 83 +++++++++++++++++++++++++++++++ runtime/src/accounts_db.rs | 29 ++++++++++- runtime/src/bank.rs | 4 +- runtime/src/hashed_transaction.rs | 8 ++- runtime/src/lib.rs | 1 + runtime/src/pinned_threads.rs | 71 ++++++++++++++++++++++++++ 9 files changed, 212 insertions(+), 6 deletions(-) create mode 100644 runtime/src/pinned_threads.rs diff --git a/Cargo.lock b/Cargo.lock index b247b428ad0a7e..49cae123fce6c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3416,9 +3416,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" dependencies = [ "crossbeam-channel", "crossbeam-deque 0.8.0", @@ -5308,6 +5308,7 @@ dependencies = [ "bv", "byteorder", "bzip2", + "core_affinity", "crossbeam-channel", "dashmap", "dir-diff", @@ -5325,6 +5326,7 @@ dependencies = [ "ouroboros", "rand 0.7.3", "rayon", + "rayon-core", "regex", "rustc_version", "serde", diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index b37c9adfa6ef7c..7cbed871a55f26 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -491,6 +491,18 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" +[[package]] +name = "core_affinity" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" +dependencies = [ + "kernel32-sys", + "libc", + "num_cpus", + "winapi 0.2.8", +] + [[package]] name = "cpufeatures" version = "0.1.4" @@ -3449,6 +3461,7 @@ dependencies = [ "bv", "byteorder 1.3.4", "bzip2", + "core_affinity", "crossbeam-channel", "dashmap", "dir-diff", @@ -3466,6 +3479,7 @@ dependencies = [ "ouroboros", "rand 0.7.3", "rayon", + "rayon-core", "regex", "rustc_version", "serde", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 75cc9b4cbf9f6d..27499a643f8129 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -17,6 +17,7 @@ bv = { version = "0.11.1", features = ["serde"] } byteorder = "1.3.4" bzip2 = "0.3.3" dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] } +core_affinity = "0.5.10" crossbeam-channel = "0.5" dir-diff = "0.3.2" flate2 = "1.0.14" @@ -33,6 +34,7 @@ num_cpus = "1.13.0" ouroboros = "0.5.1" rand = "0.7.0" rayon = "1.5.0" +rayon-core = "1.9.0" regex = "1.3.9" serde = { version = "1.0.126", features = ["rc"] } serde_derive = "1.0.103" diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index b61633429db846..cb73588cdf725a 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -9,6 +9,7 @@ use crate::{ TransactionExecutionResult, }, blockhash_queue::BlockhashQueue, + hashed_transaction::HashedTransaction, rent_collector::RentCollector, system_instruction_processor::{get_system_account_kind, SystemAccountKind}, }; @@ -18,6 +19,7 @@ use dashmap::{ }; use log::*; use rand::{thread_rng, Rng}; +use rayon::prelude::*; use solana_sdk::{ account::{Account, AccountSharedData, ReadableAccount, WritableAccount}, account_utils::StateMut, @@ -467,6 +469,87 @@ impl Accounts { .collect() } + pub fn load_accounts_parallel<'a>( + &self, + ancestors: &Ancestors, + txs: &'a [HashedTransaction<'a>], + lock_results: Vec, + hash_queue: &BlockhashQueue, + error_counters: &mut ErrorCounters, + rent_collector: &RentCollector, + feature_set: &FeatureSet, + ) -> Vec { + let fee_config = FeeConfig { + secp256k1_program_enabled: feature_set + .is_active(&feature_set::secp256k1_program_enabled::id()), + }; + let mut error_counters_list = vec![ErrorCounters::default(); lock_results.len()]; + + let res = self.accounts_db.pinned_thread_pool.install(|| { + txs.par_iter() + .map(|h| { + let res: &'a Transaction = h.into(); + res + }) + .zip(lock_results) + .zip(error_counters_list.par_iter_mut()) + .map(|etx| match etx { + ((tx, (Ok(()), nonce_rollback)), error_counters) => { + let fee_calculator = nonce_rollback + .as_ref() + .map(|nonce_rollback| nonce_rollback.fee_calculator()) + .unwrap_or_else(|| { + hash_queue + .get_fee_calculator(&tx.message().recent_blockhash) + .cloned() + }); + let fee = if let Some(fee_calculator) = fee_calculator { + fee_calculator.calculate_fee_with_config(tx.message(), &fee_config) + } else { + return (Err(TransactionError::BlockhashNotFound), None); + }; + + let loaded_transaction = match self.load_transaction( + ancestors, + tx, + fee, + error_counters, + rent_collector, + feature_set, + ) { + Ok(loaded_transaction) => loaded_transaction, + Err(e) => return (Err(e), None), + }; + + // Update nonce_rollback with fee-subtracted accounts + let nonce_rollback = if let Some(nonce_rollback) = nonce_rollback { + match NonceRollbackFull::from_partial( + nonce_rollback, + tx.message(), + &loaded_transaction.accounts, + ) { + Ok(nonce_rollback) => Some(nonce_rollback), + Err(e) => return (Err(e), None), + } + } else { + None + }; + + (Ok(loaded_transaction), nonce_rollback) + } + ((_, (Err(e), _nonce_rollback)), _) => (Err(e), None), + }) + .collect() + }); + *error_counters += error_counters_list + .iter() + .fold(ErrorCounters::default(), |mut x, y| { + x += *y; + x + }); + res + } + fn filter_zero_lamport_account( account: AccountSharedData, slot: Slot, diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 11093705e65de7..bfc4ec58edcd04 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -29,6 +29,7 @@ use crate::{ ancestors::Ancestors, append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion}, contains::Contains, + pinned_threads::pinned_spawn_handler_frac, pubkey_bins::PubkeyBinCalculator16, read_only_accounts_cache::ReadOnlyAccountsCache, sorted_storages::SortedStorages, @@ -119,7 +120,7 @@ pub enum ScanStorageResult { Stored(B), } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, Copy)] pub struct ErrorCounters { pub total: usize, pub account_in_use: usize, @@ -137,6 +138,29 @@ pub struct ErrorCounters { pub not_allowed_during_cluster_maintenance: usize, } +impl std::ops::AddAssign for ErrorCounters { + fn add_assign(&mut self, other: Self) { + *self = ErrorCounters { + total: self.total + other.total, + account_in_use: self.account_in_use + other.account_in_use, + account_loaded_twice: self.account_loaded_twice + other.account_loaded_twice, + account_not_found: self.account_not_found + other.account_not_found, + blockhash_not_found: self.blockhash_not_found + other.blockhash_not_found, + blockhash_too_old: self.blockhash_too_old + other.blockhash_too_old, + call_chain_too_deep: self.call_chain_too_deep + other.call_chain_too_deep, + already_processed: self.already_processed + other.already_processed, + instruction_error: self.instruction_error + other.instruction_error, + insufficient_funds: self.insufficient_funds + other.insufficient_funds, + invalid_account_for_fee: self.invalid_account_for_fee + other.invalid_account_for_fee, + invalid_account_index: self.invalid_account_index + other.invalid_account_index, + invalid_program_for_execution: self.invalid_program_for_execution + + other.invalid_program_for_execution, + not_allowed_during_cluster_maintenance: self.not_allowed_during_cluster_maintenance + + other.not_allowed_during_cluster_maintenance, + } + } +} + #[derive(Default, Debug, PartialEq, Clone)] pub struct AccountInfo { /// index identifying the append storage @@ -810,6 +834,8 @@ pub struct AccountsDb { pub thread_pool_clean: ThreadPool, + pub pinned_thread_pool: ThreadPool, + /// Number of append vecs to create to maximize parallelism when scanning /// the accounts min_num_stores: usize, @@ -1275,6 +1301,7 @@ impl Default for AccountsDb { .build() .unwrap(), thread_pool_clean: make_min_priority_thread_pool(), + pinned_thread_pool: pinned_spawn_handler_frac(1, 4), min_num_stores: num_threads, bank_hashes: RwLock::new(bank_hashes), frozen_accounts: HashMap::new(), diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index e59d61926d3890..604acb34389941 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3074,9 +3074,9 @@ impl Bank { check_time.stop(); let mut load_time = Measure::start("accounts_load"); - let mut loaded_accounts = self.rc.accounts.load_accounts( + let mut loaded_accounts = self.rc.accounts.load_accounts_parallel( &self.ancestors, - hashed_txs.as_transactions_iter(), + hashed_txs, check_results, &self.blockhash_queue.read().unwrap(), &mut error_counters, diff --git a/runtime/src/hashed_transaction.rs b/runtime/src/hashed_transaction.rs index 3f3b35f918982f..3de90edca94959 100644 --- a/runtime/src/hashed_transaction.rs +++ b/runtime/src/hashed_transaction.rs @@ -39,12 +39,18 @@ impl<'a> From<&'a Transaction> for HashedTransaction<'a> { } } +impl<'a> Into<&'a Transaction> for &'a HashedTransaction<'a> { + fn into(self) -> &'a Transaction { + self.transaction.as_ref() + } +} + pub trait HashedTransactionSlice<'a> { fn as_transactions_iter(&'a self) -> Box + '_>; } impl<'a> HashedTransactionSlice<'a> for [HashedTransaction<'a>] { fn as_transactions_iter(&'a self) -> Box + '_> { - Box::new(self.iter().map(|h| h.transaction.as_ref())) + Box::new(self.iter().map(|h| h.into())) } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 4f61834e339e3d..08a6fb4ce3a02b 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -28,6 +28,7 @@ pub mod log_collector; pub mod message_processor; mod native_loader; pub mod non_circulating_supply; +mod pinned_threads; mod pubkey_bins; mod read_only_accounts_cache; pub mod rent_collector; diff --git a/runtime/src/pinned_threads.rs b/runtime/src/pinned_threads.rs new file mode 100644 index 00000000000000..db910be96209a9 --- /dev/null +++ b/runtime/src/pinned_threads.rs @@ -0,0 +1,71 @@ +use core_affinity; +use rayon::ThreadPool; +use rayon_core::{ThreadBuilder, ThreadPoolBuilder}; +use std::{io, thread}; + +const NUM_THREADS_PER_CORE: usize = 8; + +#[derive(Debug, Default)] +pub struct PinnedSpawn { + cores: Vec, + len: usize, + core_id_pointer: usize, +} + +impl PinnedSpawn { + // pub fn new(num_cores: usize) -> Self { + // let core_ids = core_affinity::get_core_ids().unwrap(); + // if num_cores > core_ids.len() { + // panic!("More cores requested than available"); + // } + // Self { + // cores: core_ids.into_iter().rev().take(num_cores).collect(), + // len: num_cores, + // core_id_pointer: 0, + // } + // } + + // Pins as many threads as the ceil of the fraction times the total number of cores + // This ensures that at least 1 core would be pinned + pub fn new_frac_of_cores(num: usize, denom: usize) -> Self { + if num > denom { + panic!("fraction must be <= 1"); + } + let core_ids = core_affinity::get_core_ids().unwrap(); + let num_cores = (num * core_ids.len() - 1) / denom + 1; + Self { + cores: core_ids.into_iter().rev().take(num_cores).collect(), + len: num_cores, + core_id_pointer: 0, + } + } + + // Spawn threads pinned to core in a round robin fashion + pub fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { + let mut b = thread::Builder::new(); + if let Some(name) = thread.name() { + b = b.name(name.to_owned()); + } + if let Some(stack_size) = thread.stack_size() { + b = b.stack_size(stack_size); + } + let id_for_spawn = self.cores[self.core_id_pointer]; + b.spawn(move || { + core_affinity::set_for_current(id_for_spawn); + thread.run() + })?; + self.core_id_pointer += 1; + self.core_id_pointer %= self.len; + Ok(()) + } +} + +pub fn pinned_spawn_handler_frac(num: usize, denom: usize) -> ThreadPool { + let mut spawner = PinnedSpawn::new_frac_of_cores(num, denom); + ThreadPoolBuilder::new() + .thread_name(|i| format!("pinned-thread-for-parallel-load-{}", i)) + .num_threads(spawner.len * NUM_THREADS_PER_CORE) + .spawn_handler(|thread| spawner.spawn(thread)) + .build() + .unwrap() +}