From f8630a352256e4d31b5b624cb40565f5cfe2e13b Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+tao-stones@users.noreply.github.com> Date: Tue, 2 Jul 2024 16:49:22 -0400 Subject: [PATCH] Refactor cost tracking (#1954) * Refactor and additional metrics for cost tracking (#1888) * Refactor and add metrics: - Combine remove_* and update_* functions to reduce locking on cost-tracker and iteration. - Add method to calculate executed transaction cost by directly using actual execution cost and loaded accounts size; - Wireup histogram to report loaded accounts size; - Report time of block limits checking; - Move account counters from ExecuteDetailsTimings to ExecuteAccountsDetails; * Move committed transactions adjustment into its own function * remove histogram for loaded accounts size due to performance impact --- core/benches/consumer.rs | 36 +---- core/src/banking_stage/consumer.rs | 39 +----- core/src/banking_stage/qos_service.rs | 126 ++++++------------ cost-model/src/cost_model.rs | 45 +++++++ ledger/src/blockstore_processor.rs | 185 +++++++++++++++++++++----- program-runtime/src/timings.rs | 6 + 6 files changed, 256 insertions(+), 181 deletions(-) diff --git a/core/benches/consumer.rs b/core/benches/consumer.rs index 14010e8d91a875..6dd9eb5b8bf0fa 100644 --- a/core/benches/consumer.rs +++ b/core/benches/consumer.rs @@ -22,7 +22,6 @@ use { solana_runtime::bank::Bank, solana_sdk::{ account::{Account, ReadableAccount}, - feature_set::apply_cost_tracker_during_replay, signature::Keypair, signer::Signer, stake_history::Epoch, @@ -97,7 +96,7 @@ struct BenchFrame { signal_receiver: Receiver<(Arc, (Entry, u64))>, } -fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame { +fn setup() -> BenchFrame { let mint_total = u64::MAX; let GenesisConfigInfo { mut genesis_config, .. @@ -109,10 +108,6 @@ fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame { let mut bank = Bank::new_for_benches(&genesis_config); - if !apply_cost_tracker_during_replay { - bank.deactivate_feature(&apply_cost_tracker_during_replay::id()); - } - // Allow arbitrary transaction processing time for the purposes of this bench bank.ns_per_slot = u128::MAX; @@ -139,11 +134,7 @@ fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame { } } -fn bench_process_and_record_transactions( - bencher: &mut Bencher, - batch_size: usize, - apply_cost_tracker_during_replay: bool, -) { +fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usize) { const TRANSACTIONS_PER_ITERATION: usize = 64; assert_eq!( TRANSACTIONS_PER_ITERATION % batch_size, @@ -161,7 +152,7 @@ fn bench_process_and_record_transactions( poh_recorder, poh_service, signal_receiver: _signal_receiver, - } = setup(apply_cost_tracker_during_replay); + } = setup(); let consumer = create_consumer(&poh_recorder); let transactions = create_transactions(&bank, 2_usize.pow(20)); let mut transaction_iter = transactions.chunks(batch_size); @@ -186,30 +177,15 @@ fn bench_process_and_record_transactions( #[bench] fn bench_process_and_record_transactions_unbatched(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 1, true); + bench_process_and_record_transactions(bencher, 1); } #[bench] fn bench_process_and_record_transactions_half_batch(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 32, true); + bench_process_and_record_transactions(bencher, 32); } #[bench] fn bench_process_and_record_transactions_full_batch(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 64, true); -} - -#[bench] -fn bench_process_and_record_transactions_unbatched_disable_tx_cost_update(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 1, false); -} - -#[bench] -fn bench_process_and_record_transactions_half_batch_disable_tx_cost_update(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 32, false); -} - -#[bench] -fn bench_process_and_record_transactions_full_batch_disable_tx_cost_update(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 64, false); + bench_process_and_record_transactions(bencher, 64); } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 377741e415e8ef..6ae0881da45d8a 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -518,27 +518,14 @@ impl Consumer { // Costs of all transactions are added to the cost_tracker before processing. // To ensure accurate tracking of compute units, transactions that ultimately - // were not included in the block should have their cost removed. - QosService::remove_costs( + // were not included in the block should have their cost removed, the rest + // should update with their actually consumed units. + QosService::remove_or_update_costs( transaction_qos_cost_results.iter(), commit_transactions_result.as_ref().ok(), bank, ); - // once feature `apply_cost_tracker_during_replay` is activated, leader shall no longer - // adjust block with executed cost (a behavior more inline with bankless leader), it - // should use requested, or default `compute_unit_limit` as transaction's execution cost. - if !bank - .feature_set - .is_active(&feature_set::apply_cost_tracker_during_replay::id()) - { - QosService::update_costs( - transaction_qos_cost_results.iter(), - commit_transactions_result.as_ref().ok(), - bank, - ); - } - retryable_transaction_indexes .iter_mut() .for_each(|x| *x += chunk_offset); @@ -1432,16 +1419,6 @@ mod tests { #[test] fn test_bank_process_and_record_transactions_cost_tracker() { - for apply_cost_tracker_during_replay_enabled in [true, false] { - bank_process_and_record_transactions_cost_tracker( - apply_cost_tracker_during_replay_enabled, - ); - } - } - - fn bank_process_and_record_transactions_cost_tracker( - apply_cost_tracker_during_replay_enabled: bool, - ) { solana_logger::setup(); let GenesisConfigInfo { genesis_config, @@ -1450,9 +1427,6 @@ mod tests { } = create_slow_genesis_config(10_000); let mut bank = Bank::new_for_tests(&genesis_config); bank.ns_per_slot = u128::MAX; - if !apply_cost_tracker_during_replay_enabled { - bank.deactivate_feature(&feature_set::apply_cost_tracker_during_replay::id()); - } let bank = bank.wrap_with_bank_forks_for_tests().0; let pubkey = solana_sdk::pubkey::new_rand(); @@ -1521,8 +1495,7 @@ mod tests { // TEST: it's expected that the allocation will execute but the transfer will not // because of a shared write-lock between mint_keypair. Ensure only the first transaction - // takes compute units in the block AND the apply_cost_tracker_during_replay_enabled feature - // is applied correctly + // takes compute units in the block let allocate_keypair = Keypair::new(); let transactions = sanitize_transactions(vec![ system_transaction::allocate( @@ -1561,7 +1534,7 @@ mod tests { ); assert_eq!(retryable_transaction_indexes, vec![1]); - let expected_block_cost = if !apply_cost_tracker_during_replay_enabled { + let expected_block_cost = { let (actual_programs_execution_cost, actual_loaded_accounts_data_size_cost) = match commit_transactions_result.first().unwrap() { CommitTransactionDetails::Committed { @@ -1587,8 +1560,6 @@ mod tests { } block_cost + cost.sum() - } else { - block_cost + CostModel::calculate_cost(&transactions[0], &bank.feature_set).sum() }; assert_eq!(get_block_cost(), expected_block_cost); diff --git a/core/src/banking_stage/qos_service.rs b/core/src/banking_stage/qos_service.rs index 23d4ebd97619bd..bf8b7df963e392 100644 --- a/core/src/banking_stage/qos_service.rs +++ b/core/src/banking_stage/qos_service.rs @@ -132,39 +132,28 @@ impl QosService { (select_results, num_included) } - /// Updates the transaction costs for committed transactions. Does not handle removing costs - /// for transactions that didn't get recorded or committed - pub fn update_costs<'a>( - transaction_cost_results: impl Iterator>, - transaction_committed_status: Option<&Vec>, - bank: &Bank, - ) { - if let Some(transaction_committed_status) = transaction_committed_status { - Self::update_committed_transaction_costs( - transaction_cost_results, - transaction_committed_status, - bank, - ) - } - } - - /// Removes transaction costs from the cost tracker if not committed or recorded - pub fn remove_costs<'a>( + /// Removes transaction costs from the cost tracker if not committed or recorded, or + /// updates the transaction costs for committed transactions. + pub fn remove_or_update_costs<'a>( transaction_cost_results: impl Iterator>, transaction_committed_status: Option<&Vec>, bank: &Bank, ) { match transaction_committed_status { - Some(transaction_committed_status) => Self::remove_uncommitted_transaction_costs( - transaction_cost_results, - transaction_committed_status, - bank, - ), - None => Self::remove_transaction_costs(transaction_cost_results, bank), + Some(transaction_committed_status) => { + Self::remove_or_update_recorded_transaction_costs( + transaction_cost_results, + transaction_committed_status, + bank, + ) + } + None => Self::remove_unrecorded_transaction_costs(transaction_cost_results, bank), } } - fn remove_uncommitted_transaction_costs<'a>( + /// For recorded transactions, remove units reserved by uncommitted transaction, or update + /// units for committed transactions. + fn remove_or_update_recorded_transaction_costs<'a>( transaction_cost_results: impl Iterator>, transaction_committed_status: &Vec, bank: &Bank, @@ -178,45 +167,31 @@ impl QosService { // checked for update if let Ok(tx_cost) = tx_cost { num_included += 1; - if *transaction_committed_details == CommitTransactionDetails::NotCommitted { - cost_tracker.remove(tx_cost) + match transaction_committed_details { + CommitTransactionDetails::Committed { + compute_units, + loaded_accounts_data_size, + } => { + cost_tracker.update_execution_cost( + tx_cost, + *compute_units, + CostModel::calculate_loaded_accounts_data_size_cost( + *loaded_accounts_data_size, + &bank.feature_set, + ), + ); + } + CommitTransactionDetails::NotCommitted => { + cost_tracker.remove(tx_cost); + } } } }); cost_tracker.sub_transactions_in_flight(num_included); } - fn update_committed_transaction_costs<'a>( - transaction_cost_results: impl Iterator>, - transaction_committed_status: &Vec, - bank: &Bank, - ) { - let mut cost_tracker = bank.write_cost_tracker().unwrap(); - transaction_cost_results - .zip(transaction_committed_status) - .for_each(|(estimated_tx_cost, transaction_committed_details)| { - // Only transactions that the qos service included have to be - // checked for update - if let Ok(estimated_tx_cost) = estimated_tx_cost { - if let CommitTransactionDetails::Committed { - compute_units, - loaded_accounts_data_size, - } = transaction_committed_details - { - cost_tracker.update_execution_cost( - estimated_tx_cost, - *compute_units, - CostModel::calculate_loaded_accounts_data_size_cost( - *loaded_accounts_data_size, - &bank.feature_set, - ), - ) - } - } - }); - } - - fn remove_transaction_costs<'a>( + /// Remove reserved units for transaction batch that unsuccessfully recorded. + fn remove_unrecorded_transaction_costs<'a>( transaction_cost_results: impl Iterator>, bank: &Bank, ) { @@ -784,18 +759,11 @@ mod tests { + (execute_units_adjustment + loaded_accounts_data_size_cost_adjustment) * transaction_count; - // All transactions are committed, no costs should be removed - QosService::remove_costs(qos_cost_results.iter(), Some(&committed_status), &bank); - assert_eq!( - total_txs_cost, - bank.read_cost_tracker().unwrap().block_cost() - ); - assert_eq!( - transaction_count, - bank.read_cost_tracker().unwrap().transaction_count() + QosService::remove_or_update_costs( + qos_cost_results.iter(), + Some(&committed_status), + &bank, ); - - QosService::update_costs(qos_cost_results.iter(), Some(&committed_status), &bank); assert_eq!( final_txs_cost, bank.read_cost_tracker().unwrap().block_cost() @@ -843,18 +811,7 @@ mod tests { bank.read_cost_tracker().unwrap().block_cost() ); - // update costs doesn't impact non-committed - QosService::update_costs(qos_cost_results.iter(), None, &bank); - assert_eq!( - total_txs_cost, - bank.read_cost_tracker().unwrap().block_cost() - ); - assert_eq!( - transaction_count, - bank.read_cost_tracker().unwrap().transaction_count() - ); - - QosService::remove_costs(qos_cost_results.iter(), None, &bank); + QosService::remove_or_update_costs(qos_cost_results.iter(), None, &bank); assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost()); assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count()); } @@ -926,8 +883,11 @@ mod tests { }) .collect(); - QosService::remove_costs(qos_cost_results.iter(), Some(&committed_status), &bank); - QosService::update_costs(qos_cost_results.iter(), Some(&committed_status), &bank); + QosService::remove_or_update_costs( + qos_cost_results.iter(), + Some(&committed_status), + &bank, + ); // assert the final block cost let mut expected_final_txs_count = 0u64; diff --git a/cost-model/src/cost_model.rs b/cost-model/src/cost_model.rs index a6ba581f05b78b..c444ad0885566f 100644 --- a/cost-model/src/cost_model.rs +++ b/cost-model/src/cost_model.rs @@ -51,6 +51,37 @@ impl CostModel { } } + // Calculate executed transaction CU cost, with actual execution and loaded accounts size + // costs. + pub fn calculate_cost_for_executed_transaction( + transaction: &SanitizedTransaction, + actual_programs_execution_cost: u64, + actual_loaded_accounts_data_size_bytes: usize, + feature_set: &FeatureSet, + ) -> TransactionCost { + if transaction.is_simple_vote_transaction() { + TransactionCost::SimpleVote { + writable_accounts: Self::get_writable_accounts(transaction), + } + } else { + let mut tx_cost = UsageCostDetails::new_with_default_capacity(); + + Self::get_signature_cost(&mut tx_cost, transaction); + Self::get_write_lock_cost(&mut tx_cost, transaction, feature_set); + Self::get_instructions_data_cost(&mut tx_cost, transaction); + tx_cost.allocated_accounts_data_size = + Self::calculate_allocated_accounts_data_size(transaction); + + tx_cost.programs_execution_cost = actual_programs_execution_cost; + tx_cost.loaded_accounts_data_size_cost = Self::calculate_loaded_accounts_data_size_cost( + actual_loaded_accounts_data_size_bytes, + feature_set, + ); + + TransactionCost::Transaction(tx_cost) + } + } + fn get_signature_cost(tx_cost: &mut UsageCostDetails, transaction: &SanitizedTransaction) { let signatures_count_detail = transaction.message().get_signature_details(); tx_cost.num_transaction_signatures = signatures_count_detail.num_transaction_signatures(); @@ -169,6 +200,20 @@ impl CostModel { tx_cost.data_bytes_cost = data_bytes_len_total / INSTRUCTION_DATA_BYTES_COST; } + fn get_instructions_data_cost( + tx_cost: &mut UsageCostDetails, + transaction: &SanitizedTransaction, + ) { + let ix_data_bytes_len_total: u64 = transaction + .message() + .instructions() + .iter() + .map(|instruction| instruction.data.len() as u64) + .sum(); + + tx_cost.data_bytes_cost = ix_data_bytes_len_total / INSTRUCTION_DATA_BYTES_COST; + } + pub fn calculate_loaded_accounts_data_size_cost( loaded_accounts_data_size: usize, _feature_set: &FeatureSet, diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 2b95d40f9a8c78..c21ebda1cd55c1 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -62,7 +62,8 @@ use { solana_svm::{ transaction_processor::ExecutionRecordingConfig, transaction_results::{ - TransactionExecutionDetails, TransactionExecutionResult, TransactionResults, + TransactionExecutionDetails, TransactionExecutionResult, + TransactionLoadedAccountsStats, TransactionResults, }, }, solana_transaction_status::token_balances::TransactionTokenBalancesSet, @@ -181,11 +182,33 @@ pub fn execute_batch( let TransactionResults { fee_collection_results, + loaded_accounts_stats, execution_results, rent_debits, .. } = tx_results; + let (check_block_cost_limits_result, check_block_cost_limits_time): (Result<()>, Measure) = + measure!(if bank + .feature_set + .is_active(&feature_set::apply_cost_tracker_during_replay::id()) + { + check_block_cost_limits( + bank, + &loaded_accounts_stats, + &execution_results, + batch.sanitized_transactions(), + ) + } else { + Ok(()) + }); + + timings.saturating_add_in_place( + ExecuteTimingType::CheckBlockLimitsUs, + check_block_cost_limits_time.as_us(), + ); + check_block_cost_limits_result?; + let executed_transactions = execution_results .iter() .zip(batch.sanitized_transactions()) @@ -220,6 +243,49 @@ pub fn execute_batch( first_err.map(|(result, _)| result).unwrap_or(Ok(())) } +// collect transactions actual execution costs, subject to block limits; +// block will be marked as dead if exceeds cost limits, details will be +// reported to metric `replay-stage-mark_dead_slot` +fn check_block_cost_limits( + bank: &Bank, + loaded_accounts_stats: &[Result], + execution_results: &[TransactionExecutionResult], + sanitized_transactions: &[SanitizedTransaction], +) -> Result<()> { + assert_eq!(loaded_accounts_stats.len(), execution_results.len()); + + let tx_costs_with_actual_execution_units: Vec<_> = execution_results + .iter() + .zip(loaded_accounts_stats) + .zip(sanitized_transactions) + .filter_map(|((execution_result, loaded_accounts_stats), tx)| { + if let Some(details) = execution_result.details() { + let tx_cost = CostModel::calculate_cost_for_executed_transaction( + tx, + details.executed_units, + loaded_accounts_stats + .as_ref() + .map_or(0, |stats| stats.loaded_accounts_data_size), + &bank.feature_set, + ); + Some(tx_cost) + } else { + None + } + }) + .collect(); + + { + let mut cost_tracker = bank.write_cost_tracker().unwrap(); + for tx_cost in &tx_costs_with_actual_execution_units { + cost_tracker + .try_add(tx_cost) + .map_err(TransactionError::from)?; + } + } + Ok(()) +} + #[derive(Default)] pub struct ExecuteBatchesInternalMetrics { execution_timings_per_thread: HashMap, @@ -456,22 +522,10 @@ fn rebatch_and_execute_batches( let cost = tx_cost.sum(); minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost); total_cost = total_cost.saturating_add(cost); - tx_cost + cost }) .collect::>(); - if bank - .feature_set - .is_active(&feature_set::apply_cost_tracker_during_replay::id()) - { - let mut cost_tracker = bank.write_cost_tracker().unwrap(); - for tx_cost in &tx_costs { - cost_tracker - .try_add(tx_cost) - .map_err(TransactionError::from)?; - } - } - let target_batch_count = get_thread_count() as u64; let mut tx_batches: Vec = vec![]; @@ -479,26 +533,23 @@ fn rebatch_and_execute_batches( let target_batch_cost = total_cost / target_batch_count; let mut batch_cost: u64 = 0; let mut slice_start = 0; - tx_costs - .into_iter() - .enumerate() - .for_each(|(index, tx_cost)| { - let next_index = index + 1; - batch_cost = batch_cost.saturating_add(tx_cost.sum()); - if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() { - let tx_batch = rebatch_transactions( - &lock_results, - bank, - &sanitized_txs, - slice_start, - index, - &transaction_indexes, - ); - slice_start = next_index; - tx_batches.push(tx_batch); - batch_cost = 0; - } - }); + tx_costs.into_iter().enumerate().for_each(|(index, cost)| { + let next_index = index + 1; + batch_cost = batch_cost.saturating_add(cost); + if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() { + let tx_batch = rebatch_transactions( + &lock_results, + bank, + &sanitized_txs, + slice_start, + index, + &transaction_indexes, + ); + slice_start = next_index; + tx_batches.push(tx_batch); + batch_cost = 0; + } + }); &tx_batches[..] } else { batches @@ -2199,6 +2250,7 @@ pub mod tests { }, assert_matches::assert_matches, rand::{thread_rng, Rng}, + solana_cost_model::transaction_cost::TransactionCost, solana_entry::entry::{create_ticks, next_entry, next_entry_mut}, solana_program_runtime::declare_process_instruction, solana_runtime::{ @@ -5023,4 +5075,69 @@ pub mod tests { } } } + + #[test] + fn test_check_block_cost_limit() { + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Bank::new_for_tests(&genesis_config); + + let tx = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + 1, + genesis_config.hash(), + )); + let mut tx_cost = CostModel::calculate_cost(&tx, &bank.feature_set); + let actual_execution_cu = 1; + let actual_loaded_accounts_data_size = 64 * 1024; + let TransactionCost::Transaction(ref mut usage_cost_details) = tx_cost else { + unreachable!("test tx is non-vote tx"); + }; + usage_cost_details.programs_execution_cost = actual_execution_cu; + usage_cost_details.loaded_accounts_data_size_cost = + CostModel::calculate_loaded_accounts_data_size_cost( + actual_loaded_accounts_data_size, + &bank.feature_set, + ); + // set block-limit to be able to just have one transaction + let block_limit = tx_cost.sum(); + + bank.write_cost_tracker() + .unwrap() + .set_limits(u64::MAX, block_limit, u64::MAX); + let txs = vec![tx.clone(), tx]; + let results = vec![ + TransactionExecutionResult::Executed { + details: TransactionExecutionDetails { + status: Ok(()), + log_messages: None, + inner_instructions: None, + fee_details: solana_sdk::fee::FeeDetails::default(), + return_data: None, + executed_units: actual_execution_cu, + accounts_data_len_delta: 0, + }, + programs_modified_by_tx: HashMap::new(), + }, + TransactionExecutionResult::NotExecuted(TransactionError::AccountNotFound), + ]; + let loaded_accounts_stats = vec![ + Ok(TransactionLoadedAccountsStats { + loaded_accounts_data_size: actual_loaded_accounts_data_size, + loaded_accounts_count: 2 + }); + 2 + ]; + + assert!(check_block_cost_limits(&bank, &loaded_accounts_stats, &results, &txs).is_ok()); + assert_eq!( + Err(TransactionError::WouldExceedMaxBlockCostLimit), + check_block_cost_limits(&bank, &loaded_accounts_stats, &results, &txs) + ); + } } diff --git a/program-runtime/src/timings.rs b/program-runtime/src/timings.rs index 5255c24094dbbe..9ffc4702178676 100644 --- a/program-runtime/src/timings.rs +++ b/program-runtime/src/timings.rs @@ -53,6 +53,7 @@ pub enum ExecuteTimingType { TotalBatchesLen, UpdateTransactionStatuses, ProgramCacheUs, + CheckBlockLimitsUs, } pub struct Metrics([u64; ExecuteTimingType::CARDINALITY]); @@ -177,6 +178,11 @@ eager_macro_rules! { $eager_1 .index(ExecuteTimingType::UpdateTransactionStatuses), i64 ), + ( + "check_block_limits_us", + *$self.metrics.index(ExecuteTimingType::CheckBlockLimitsUs), + i64 + ), ( "execute_details_serialize_us", $self.details.serialize_us,