Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
accounts: parallel load across txs via core-pinned threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-chuang committed Jun 7, 2021
1 parent 18ec675 commit 8568d5d
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 6 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions programs/bpf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bv = { version = "0.11.1", features = ["serde"] }
byteorder = "1.3.4"
bzip2 = "0.3.3"
dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] }
core_affinity = "0.5.10"
crossbeam-channel = "0.5"
dir-diff = "0.3.2"
flate2 = "1.0.14"
Expand All @@ -33,6 +34,7 @@ num_cpus = "1.13.0"
ouroboros = "0.5.1"
rand = "0.7.0"
rayon = "1.5.0"
rayon-core = "1.9.0"
regex = "1.3.9"
serde = { version = "1.0.126", features = ["rc"] }
serde_derive = "1.0.103"
Expand Down
83 changes: 83 additions & 0 deletions runtime/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
TransactionExecutionResult,
},
blockhash_queue::BlockhashQueue,
hashed_transaction::HashedTransaction,
rent_collector::RentCollector,
system_instruction_processor::{get_system_account_kind, SystemAccountKind},
};
Expand All @@ -18,6 +19,7 @@ use dashmap::{
};
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount, WritableAccount},
account_utils::StateMut,
Expand Down Expand Up @@ -467,6 +469,87 @@ impl Accounts {
.collect()
}

pub fn load_accounts_parallel<'a>(
&self,
ancestors: &Ancestors,
txs: &'a [HashedTransaction<'a>],
lock_results: Vec<TransactionCheckResult>,
hash_queue: &BlockhashQueue,
error_counters: &mut ErrorCounters,
rent_collector: &RentCollector,
feature_set: &FeatureSet,
) -> Vec<TransactionLoadResult> {
let fee_config = FeeConfig {
secp256k1_program_enabled: feature_set
.is_active(&feature_set::secp256k1_program_enabled::id()),
};
let mut error_counters_list = vec![ErrorCounters::default(); lock_results.len()];

let res = self.accounts_db.pinned_thread_pool.install(|| {
txs.par_iter()
.map(|h| {
let res: &'a Transaction = h.into();
res
})
.zip(lock_results)
.zip(error_counters_list.par_iter_mut())
.map(|etx| match etx {
((tx, (Ok(()), nonce_rollback)), error_counters) => {
let fee_calculator = nonce_rollback
.as_ref()
.map(|nonce_rollback| nonce_rollback.fee_calculator())
.unwrap_or_else(|| {
hash_queue
.get_fee_calculator(&tx.message().recent_blockhash)
.cloned()
});
let fee = if let Some(fee_calculator) = fee_calculator {
fee_calculator.calculate_fee_with_config(tx.message(), &fee_config)
} else {
return (Err(TransactionError::BlockhashNotFound), None);
};

let loaded_transaction = match self.load_transaction(
ancestors,
tx,
fee,
error_counters,
rent_collector,
feature_set,
) {
Ok(loaded_transaction) => loaded_transaction,
Err(e) => return (Err(e), None),
};

// Update nonce_rollback with fee-subtracted accounts
let nonce_rollback = if let Some(nonce_rollback) = nonce_rollback {
match NonceRollbackFull::from_partial(
nonce_rollback,
tx.message(),
&loaded_transaction.accounts,
) {
Ok(nonce_rollback) => Some(nonce_rollback),
Err(e) => return (Err(e), None),
}
} else {
None
};

(Ok(loaded_transaction), nonce_rollback)
}
((_, (Err(e), _nonce_rollback)), _) => (Err(e), None),
})
.collect()
});
*error_counters += error_counters_list
.iter()
.fold(ErrorCounters::default(), |mut x, y| {
x += *y;
x
});
res
}

fn filter_zero_lamport_account(
account: AccountSharedData,
slot: Slot,
Expand Down
29 changes: 28 additions & 1 deletion runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
ancestors::Ancestors,
append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
contains::Contains,
pinned_threads::pinned_spawn_handler_frac,
pubkey_bins::PubkeyBinCalculator16,
read_only_accounts_cache::ReadOnlyAccountsCache,
sorted_storages::SortedStorages,
Expand Down Expand Up @@ -119,7 +120,7 @@ pub enum ScanStorageResult<R, B> {
Stored(B),
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ErrorCounters {
pub total: usize,
pub account_in_use: usize,
Expand All @@ -137,6 +138,29 @@ pub struct ErrorCounters {
pub not_allowed_during_cluster_maintenance: usize,
}

impl std::ops::AddAssign for ErrorCounters {
fn add_assign(&mut self, other: Self) {
*self = ErrorCounters {
total: self.total + other.total,
account_in_use: self.account_in_use + other.account_in_use,
account_loaded_twice: self.account_loaded_twice + other.account_loaded_twice,
account_not_found: self.account_not_found + other.account_not_found,
blockhash_not_found: self.blockhash_not_found + other.blockhash_not_found,
blockhash_too_old: self.blockhash_too_old + other.blockhash_too_old,
call_chain_too_deep: self.call_chain_too_deep + other.call_chain_too_deep,
already_processed: self.already_processed + other.already_processed,
instruction_error: self.instruction_error + other.instruction_error,
insufficient_funds: self.insufficient_funds + other.insufficient_funds,
invalid_account_for_fee: self.invalid_account_for_fee + other.invalid_account_for_fee,
invalid_account_index: self.invalid_account_index + other.invalid_account_index,
invalid_program_for_execution: self.invalid_program_for_execution
+ other.invalid_program_for_execution,
not_allowed_during_cluster_maintenance: self.not_allowed_during_cluster_maintenance
+ other.not_allowed_during_cluster_maintenance,
}
}
}

#[derive(Default, Debug, PartialEq, Clone)]
pub struct AccountInfo {
/// index identifying the append storage
Expand Down Expand Up @@ -810,6 +834,8 @@ pub struct AccountsDb {

pub thread_pool_clean: ThreadPool,

pub pinned_thread_pool: ThreadPool,

/// Number of append vecs to create to maximize parallelism when scanning
/// the accounts
min_num_stores: usize,
Expand Down Expand Up @@ -1275,6 +1301,7 @@ impl Default for AccountsDb {
.build()
.unwrap(),
thread_pool_clean: make_min_priority_thread_pool(),
pinned_thread_pool: pinned_spawn_handler_frac(1, 4),
min_num_stores: num_threads,
bank_hashes: RwLock::new(bank_hashes),
frozen_accounts: HashMap::new(),
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3074,9 +3074,9 @@ impl Bank {
check_time.stop();

let mut load_time = Measure::start("accounts_load");
let mut loaded_accounts = self.rc.accounts.load_accounts(
let mut loaded_accounts = self.rc.accounts.load_accounts_parallel(
&self.ancestors,
hashed_txs.as_transactions_iter(),
hashed_txs,
check_results,
&self.blockhash_queue.read().unwrap(),
&mut error_counters,
Expand Down
8 changes: 7 additions & 1 deletion runtime/src/hashed_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ impl<'a> From<&'a Transaction> for HashedTransaction<'a> {
}
}

impl<'a> Into<&'a Transaction> for &'a HashedTransaction<'a> {
fn into(self) -> &'a Transaction {
self.transaction.as_ref()
}
}

pub trait HashedTransactionSlice<'a> {
fn as_transactions_iter(&'a self) -> Box<dyn Iterator<Item = &'a Transaction> + '_>;
}

impl<'a> HashedTransactionSlice<'a> for [HashedTransaction<'a>] {
fn as_transactions_iter(&'a self) -> Box<dyn Iterator<Item = &'a Transaction> + '_> {
Box::new(self.iter().map(|h| h.transaction.as_ref()))
Box::new(self.iter().map(|h| h.into()))
}
}
1 change: 1 addition & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod log_collector;
pub mod message_processor;
mod native_loader;
pub mod non_circulating_supply;
mod pinned_threads;
mod pubkey_bins;
mod read_only_accounts_cache;
pub mod rent_collector;
Expand Down
71 changes: 71 additions & 0 deletions runtime/src/pinned_threads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use core_affinity;
use rayon::ThreadPool;
use rayon_core::{ThreadBuilder, ThreadPoolBuilder};
use std::{io, thread};

const NUM_THREADS_PER_CORE: usize = 8;

#[derive(Debug, Default)]
pub struct PinnedSpawn {
cores: Vec<core_affinity::CoreId>,
len: usize,
core_id_pointer: usize,
}

impl PinnedSpawn {
// pub fn new(num_cores: usize) -> Self {
// let core_ids = core_affinity::get_core_ids().unwrap();
// if num_cores > core_ids.len() {
// panic!("More cores requested than available");
// }
// Self {
// cores: core_ids.into_iter().rev().take(num_cores).collect(),
// len: num_cores,
// core_id_pointer: 0,
// }
// }

// Pins as many threads as the ceil of the fraction times the total number of cores
// This ensures that at least 1 core would be pinned
pub fn new_frac_of_cores(num: usize, denom: usize) -> Self {
if num > denom {
panic!("fraction must be <= 1");
}
let core_ids = core_affinity::get_core_ids().unwrap();
let num_cores = (num * core_ids.len() - 1) / denom + 1;
Self {
cores: core_ids.into_iter().rev().take(num_cores).collect(),
len: num_cores,
core_id_pointer: 0,
}
}

// Spawn threads pinned to core in a round robin fashion
pub fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
let mut b = thread::Builder::new();
if let Some(name) = thread.name() {
b = b.name(name.to_owned());
}
if let Some(stack_size) = thread.stack_size() {
b = b.stack_size(stack_size);
}
let id_for_spawn = self.cores[self.core_id_pointer];
b.spawn(move || {
core_affinity::set_for_current(id_for_spawn);
thread.run()
})?;
self.core_id_pointer += 1;
self.core_id_pointer %= self.len;
Ok(())
}
}

pub fn pinned_spawn_handler_frac(num: usize, denom: usize) -> ThreadPool {
let mut spawner = PinnedSpawn::new_frac_of_cores(num, denom);
ThreadPoolBuilder::new()
.thread_name(|i| format!("pinned-thread-for-parallel-load-{}", i))
.num_threads(spawner.len * NUM_THREADS_PER_CORE)
.spawn_handler(|thread| spawner.spawn(thread))
.build()
.unwrap()
}

0 comments on commit 8568d5d

Please sign in to comment.