Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Use parallel iterator for executing transactions in an entry #22717

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use {
genesis_config::{ClusterType, GenesisConfig},
hash::Hash,
pubkey::Pubkey,
saturating_add_assign,
timing::AtomicInterval,
},
solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
Expand Down Expand Up @@ -217,6 +218,39 @@ pub struct ErrorCounters {
pub invalid_rent_paying_account: usize,
}

impl ErrorCounters {
pub fn accumulate(&mut self, other: &ErrorCounters) {
saturating_add_assign!(self.total, other.total);
saturating_add_assign!(self.account_in_use, other.account_in_use);
saturating_add_assign!(self.account_loaded_twice, other.account_loaded_twice);
saturating_add_assign!(self.account_not_found, other.account_not_found);
saturating_add_assign!(self.blockhash_not_found, other.blockhash_not_found);
saturating_add_assign!(self.blockhash_too_old, other.blockhash_too_old);
saturating_add_assign!(self.call_chain_too_deep, other.call_chain_too_deep);
saturating_add_assign!(self.already_processed, other.already_processed);
saturating_add_assign!(self.instruction_error, other.instruction_error);
saturating_add_assign!(self.insufficient_funds, other.insufficient_funds);
saturating_add_assign!(self.invalid_account_for_fee, other.invalid_account_for_fee);
saturating_add_assign!(self.invalid_account_index, other.invalid_account_index);
saturating_add_assign!(
self.invalid_program_for_execution,
other.invalid_program_for_execution
);
saturating_add_assign!(
self.not_allowed_during_cluster_maintenance,
other.not_allowed_during_cluster_maintenance
);
saturating_add_assign!(
self.invalid_writable_account,
other.invalid_writable_account
);
saturating_add_assign!(
self.invalid_rent_paying_account,
other.invalid_rent_paying_account
);
}
}

#[derive(Debug, Default, Clone, Copy)]
pub struct IndexGenerationInfo {
pub accounts_data_len: u64,
Expand Down
184 changes: 133 additions & 51 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ use {
log::*,
rand::Rng,
rayon::{
iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
iter::{
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
IntoParallelRefMutIterator, ParallelIterator,
},
ThreadPool, ThreadPoolBuilder,
},
solana_measure::measure::Measure,
Expand All @@ -83,6 +86,7 @@ use {
sysvar_cache::SysvarCache,
timings::ExecuteTimings,
},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::{
account::{
create_account_shared_data_with_fields as create_account, from_account, Account,
Expand Down Expand Up @@ -160,10 +164,23 @@ use {
mod sysvar_cache;
mod transaction_account_state_info;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("bank_tx_{}", ix))
.build()
.unwrap()));

pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;

pub const MAX_LEADER_SCHEDULE_STAKES: Epoch = 5;

pub struct ExecutionResultWithTimingAndError {
result: TransactionExecutionResult,
timings: ExecuteTimings,
error_counters: ErrorCounters,
signature_count: u64,
}

#[derive(Clone, Debug, PartialEq)]
pub struct RentDebit {
rent_collected: u64,
Expand Down Expand Up @@ -3727,6 +3744,77 @@ impl Bank {
})
}

pub fn execute_transaction(
&self,
accs: &mut TransactionLoadResult,
tx: &SanitizedTransaction,
enable_cpi_recording: bool,
enable_log_recording: bool,
) -> ExecutionResultWithTimingAndError {
match accs {
(Err(e), _nonce) => ExecutionResultWithTimingAndError {
result: TransactionExecutionResult::NotExecuted(e.clone()),
timings: ExecuteTimings::default(),
error_counters: ErrorCounters::default(),
signature_count: 0,
},
(Ok(loaded_transaction), nonce) => {
let mut feature_set_clone_time = Measure::start("feature_set_clone");
let feature_set = self.feature_set.clone();
feature_set_clone_time.stop();
let mut timings = ExecuteTimings::default();
let mut error_counters = ErrorCounters::default();
saturating_add_assign!(
timings.execute_accessories.feature_set_clone_us,
feature_set_clone_time.as_us()
);

let signature_count = u64::from(tx.message().header().num_required_signatures);

let mut compute_budget = self.compute_budget.unwrap_or_else(ComputeBudget::new);
if feature_set.is_active(&tx_wide_compute_cap::id()) {
let mut compute_budget_process_transaction_time =
Measure::start("compute_budget_process_transaction_time");
let process_transaction_result =
compute_budget.process_transaction(tx, feature_set);
compute_budget_process_transaction_time.stop();
saturating_add_assign!(
timings
.execute_accessories
.compute_budget_process_transaction_us,
compute_budget_process_transaction_time.as_us()
);
if let Err(err) = process_transaction_result {
return ExecutionResultWithTimingAndError {
result: TransactionExecutionResult::NotExecuted(err),
timings: ExecuteTimings::default(),
error_counters: ErrorCounters::default(),
signature_count,
};
}
}

let durable_nonce_fee = nonce.as_ref().map(DurableNonceFee::from);

ExecutionResultWithTimingAndError {
result: self.execute_loaded_transaction(
tx,
loaded_transaction,
compute_budget,
durable_nonce_fee,
enable_cpi_recording,
enable_log_recording,
&mut timings,
&mut error_counters,
),
timings,
error_counters,
signature_count,
}
}
}
}

#[allow(clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
Expand Down Expand Up @@ -3787,60 +3875,54 @@ impl Bank {
load_time.stop();

let mut execution_time = Measure::start("execution_time");
let mut signature_count: u64 = 0;

let execution_results: Vec<TransactionExecutionResult> = loaded_txs
.iter_mut()
.zip(sanitized_txs.iter())
.map(|(accs, tx)| match accs {
(Err(e), _nonce) => TransactionExecutionResult::NotExecuted(e.clone()),
(Ok(loaded_transaction), nonce) => {
let mut feature_set_clone_time = Measure::start("feature_set_clone");
let feature_set = self.feature_set.clone();
feature_set_clone_time.stop();
saturating_add_assign!(
timings.execute_accessories.feature_set_clone_us,
feature_set_clone_time.as_us()
);

signature_count += u64::from(tx.message().header().num_required_signatures);

let mut compute_budget = self.compute_budget.unwrap_or_else(ComputeBudget::new);
if feature_set.is_active(&tx_wide_compute_cap::id()) {
let mut compute_budget_process_transaction_time =
Measure::start("compute_budget_process_transaction_time");
let process_transaction_result =
compute_budget.process_transaction(tx, feature_set);
compute_budget_process_transaction_time.stop();
saturating_add_assign!(
timings
.execute_accessories
.compute_budget_process_transaction_us,
compute_budget_process_transaction_time.as_us()
);
if let Err(err) = process_transaction_result {
return TransactionExecutionResult::NotExecuted(err);
}
}

let durable_nonce_fee = nonce.as_ref().map(DurableNonceFee::from);

self.execute_loaded_transaction(
tx,
loaded_transaction,
compute_budget,
durable_nonce_fee,
enable_cpi_recording,
enable_log_recording,
timings,
&mut error_counters,
)
}
})
.collect();
let execution_results_with_timing_and_error: Vec<ExecutionResultWithTimingAndError> =
if loaded_txs.len() >= get_thread_count() {
PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
loaded_txs
.par_iter_mut()
.zip(sanitized_txs.par_iter())
.map(|(accs, tx)| {
self.execute_transaction(
accs,
tx,
enable_cpi_recording,
enable_log_recording,
)
})
.collect()
})
})
} else {
loaded_txs
.iter_mut()
.zip(sanitized_txs.iter())
.map(|(accs, tx)| {
self.execute_transaction(
accs,
tx,
enable_cpi_recording,
enable_log_recording,
)
})
.collect()
};

execution_time.stop();

let mut signature_count: u64 = 0;
let execution_results: Vec<TransactionExecutionResult> =
execution_results_with_timing_and_error
.into_iter()
.map(|val| {
saturating_add_assign!(signature_count, val.signature_count);
timings.accumulate(&val.timings);
error_counters.accumulate(&val.error_counters);
val.result
})
.collect();

debug!(
"check: {}us load: {}us execute: {}us txs_len={}",
check_time.as_us(),
Expand Down