From 2fc1f93433d72d6cd38a70cd53a710e66a628878 Mon Sep 17 00:00:00 2001 From: buffalu <85544055+buffalu@users.noreply.github.com> Date: Wed, 28 Jun 2023 15:44:58 -0500 Subject: [PATCH] Ensure that uncommitted transactions are always removed from QoS (#32285) Co-authored-by: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> (cherry picked from commit 5dee2e4d0cac7ecb81bc134a69ae632452d4b9ba) --- core/benches/consumer.rs | 42 ++++++++++--- core/src/banking_stage/consumer.rs | 71 ++++++++++++++------- core/src/qos_service.rs | 99 ++++++++++++++++++++++-------- 3 files changed, 157 insertions(+), 55 deletions(-) diff --git a/core/benches/consumer.rs b/core/benches/consumer.rs index 9fbbe0d960091e..d6f908c5770d5f 100644 --- a/core/benches/consumer.rs +++ b/core/benches/consumer.rs @@ -22,8 +22,9 @@ use { }, solana_runtime::bank::Bank, solana_sdk::{ - account::Account, signature::Keypair, signer::Signer, stake_history::Epoch, system_program, - system_transaction, transaction::SanitizedTransaction, + account::Account, feature_set::apply_cost_tracker_during_replay, signature::Keypair, + signer::Signer, stake_history::Epoch, system_program, system_transaction, + transaction::SanitizedTransaction, }, std::sync::{ atomic::{AtomicBool, Ordering}, @@ -32,6 +33,7 @@ use { tempfile::TempDir, test::Bencher, }; + extern crate test; fn create_accounts(num: usize) -> Vec { @@ -91,7 +93,7 @@ struct BenchFrame { signal_receiver: Receiver<(Arc, (Entry, u64))>, } -fn setup() -> BenchFrame { +fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame { let mint_total = u64::MAX; let GenesisConfigInfo { mut genesis_config, .. @@ -102,6 +104,11 @@ fn setup() -> BenchFrame { genesis_config.ticks_per_slot = 10_000; let mut bank = Bank::new_for_benches(&genesis_config); + + if !apply_cost_tracker_during_replay { + bank.deactivate_feature(&apply_cost_tracker_during_replay::id()); + } + // Allow arbitrary transaction processing time for the purposes of this bench bank.ns_per_slot = u128::MAX; @@ -128,7 +135,11 @@ fn setup() -> BenchFrame { } } -fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usize) { +fn bench_process_and_record_transactions( + bencher: &mut Bencher, + batch_size: usize, + apply_cost_tracker_during_replay: bool, +) { let BenchFrame { bank, ledger_path: _ledger_path, @@ -136,7 +147,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz poh_recorder, poh_service, signal_receiver: _signal_receiver, - } = setup(); + } = setup(apply_cost_tracker_during_replay); let consumer = create_consumer(&poh_recorder); let transactions = create_transactions(&bank, 2_usize.pow(20)); let mut transaction_iter = transactions.chunks(batch_size); @@ -156,15 +167,30 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz #[bench] fn bench_process_and_record_transactions_unbatched(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 1); + bench_process_and_record_transactions(bencher, 1, true); } #[bench] fn bench_process_and_record_transactions_half_batch(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 32); + bench_process_and_record_transactions(bencher, 32, true); } #[bench] fn bench_process_and_record_transactions_full_batch(bencher: &mut Bencher) { - bench_process_and_record_transactions(bencher, 64); + bench_process_and_record_transactions(bencher, 64, true); +} + +#[bench] +fn bench_process_and_record_transactions_unbatched_disable_tx_cost_update(bencher: &mut Bencher) { + bench_process_and_record_transactions(bencher, 1, false); +} + +#[bench] +fn bench_process_and_record_transactions_half_batch_disable_tx_cost_update(bencher: &mut Bencher) { + bench_process_and_record_transactions(bencher, 32, false); +} + +#[bench] +fn bench_process_and_record_transactions_full_batch_disable_tx_cost_update(bencher: &mut Bencher) { + bench_process_and_record_transactions(bencher, 64, false); } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index ba58bd48d07124..9ae10096723f88 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -474,6 +474,15 @@ impl Consumer { .. } = execute_and_commit_transactions_output; + // Costs of all transactions are added to the cost_tracker before processing. + // To ensure accurate tracking of compute units, transactions that ultimately + // were not included in the block should have their cost removed. + QosService::remove_costs( + transaction_qos_cost_results.iter(), + commit_transactions_result.as_ref().ok(), + bank, + ); + // once feature `apply_cost_tracker_during_replay` is activated, leader shall no longer // adjust block with executed cost (a behavior more inline with bankless leader), it // should use requested, or default `compute_unit_limit` as transaction's execution cost. @@ -481,7 +490,7 @@ impl Consumer { .feature_set .is_active(&feature_set::apply_cost_tracker_during_replay::id()) { - QosService::update_or_remove_transaction_costs( + QosService::update_costs( transaction_qos_cost_results.iter(), commit_transactions_result.as_ref().ok(), bank, @@ -1197,36 +1206,25 @@ mod tests { assert_eq!(executed_with_successful_result_count, 1); assert!(commit_transactions_result.is_ok()); - let single_transfer_cost = get_block_cost(); - assert_ne!(single_transfer_cost, 0); + let block_cost = get_block_cost(); + assert_ne!(block_cost, 0); assert_eq!(get_tx_count(), 1); - // - // TEST: When a tx in a batch can't be executed (here because of account - // locks), then its cost does not affect the cost tracker only if qos - // adjusts it with actual execution cost (when apply_cost_tracker_during_replay - // is not enabled). - // - + // TEST: it's expected that the allocation will execute but the transfer will not + // because of a shared write-lock between mint_keypair. Ensure only the first transaction + // takes compute units in the block AND the apply_cost_tracker_during_replay_enabled feature + // is applied correctly let allocate_keypair = Keypair::new(); - let transactions = sanitize_transactions(vec![ - system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()), - // intentionally use a tx that has a different cost system_transaction::allocate( &mint_keypair, &allocate_keypair, genesis_config.hash(), - 1, + 100, ), + // this one won't execute in process_and_record_transactions from shared account lock overlap + system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()), ]); - let mut expected_block_cost = 2 * single_transfer_cost; - let mut expected_tracked_tx_count = 2; - if apply_cost_tracker_during_replay_enabled { - expected_block_cost += - CostModel::calculate_cost(&transactions[1], &bank.feature_set).sum(); - expected_tracked_tx_count += 1; - } let process_transactions_batch_output = consumer.process_and_record_transactions(&bank, &transactions, 0); @@ -1239,10 +1237,39 @@ mod tests { } = process_transactions_batch_output.execute_and_commit_transactions_output; assert_eq!(executed_with_successful_result_count, 1); assert!(commit_transactions_result.is_ok()); + + // first one should have been committed, second one not committed due to AccountInUse error during + // account locking + let commit_transactions_result = commit_transactions_result.unwrap(); + assert_eq!(commit_transactions_result.len(), 2); + assert!(matches!( + commit_transactions_result.get(0).unwrap(), + CommitTransactionDetails::Committed { .. } + )); + assert!(matches!( + commit_transactions_result.get(1).unwrap(), + CommitTransactionDetails::NotCommitted + )); assert_eq!(retryable_transaction_indexes, vec![1]); + let expected_block_cost = if !apply_cost_tracker_during_replay_enabled { + let actual_bpf_execution_cost = match commit_transactions_result.get(0).unwrap() { + CommitTransactionDetails::Committed { compute_units } => *compute_units, + CommitTransactionDetails::NotCommitted => { + unreachable!() + } + }; + + let mut cost = CostModel::calculate_cost(&transactions[0], &bank.feature_set); + cost.bpf_execution_cost = actual_bpf_execution_cost; + + block_cost + cost.sum() + } else { + block_cost + CostModel::calculate_cost(&transactions[0], &bank.feature_set).sum() + }; + assert_eq!(get_block_cost(), expected_block_cost); - assert_eq!(get_tx_count(), expected_tracked_tx_count); + assert_eq!(get_tx_count(), 2); poh_recorder .read() diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index 5df430574f6fa1..a27974a2b95f87 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -180,17 +180,30 @@ impl QosService { (select_results, num_included) } - /// Update the transaction cost in the cost_tracker with the real cost for - /// transactions that were executed successfully; - /// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker - /// being inflated with unsuccessfully executed transactions. - pub fn update_or_remove_transaction_costs<'a>( + /// Updates the transaction costs for committed transactions. Does not handle removing costs + /// for transactions that didn't get recorded or committed + pub fn update_costs<'a>( + transaction_cost_results: impl Iterator>, + transaction_committed_status: Option<&Vec>, + bank: &Arc, + ) { + if let Some(transaction_committed_status) = transaction_committed_status { + Self::update_committed_transaction_costs( + transaction_cost_results, + transaction_committed_status, + bank, + ) + } + } + + /// Removes transaction costs from the cost tracker if not committed or recorded + pub fn remove_costs<'a>( transaction_cost_results: impl Iterator>, transaction_committed_status: Option<&Vec>, bank: &Arc, ) { match transaction_committed_status { - Some(transaction_committed_status) => Self::update_transaction_costs( + Some(transaction_committed_status) => Self::remove_uncommitted_transaction_costs( transaction_cost_results, transaction_committed_status, bank, @@ -199,7 +212,7 @@ impl QosService { } } - fn update_transaction_costs<'a>( + fn remove_uncommitted_transaction_costs<'a>( transaction_cost_results: impl Iterator>, transaction_committed_status: &Vec, bank: &Arc, @@ -211,11 +224,29 @@ impl QosService { // Only transactions that the qos service included have to be // checked for update if let Ok(tx_cost) = tx_cost { - match transaction_committed_details { - CommitTransactionDetails::Committed { compute_units } => { - cost_tracker.update_execution_cost(tx_cost, *compute_units) - } - CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost), + if *transaction_committed_details == CommitTransactionDetails::NotCommitted { + cost_tracker.remove(tx_cost) + } + } + }); + } + + fn update_committed_transaction_costs<'a>( + transaction_cost_results: impl Iterator>, + transaction_committed_status: &Vec, + bank: &Arc, + ) { + let mut cost_tracker = bank.write_cost_tracker().unwrap(); + transaction_cost_results + .zip(transaction_committed_status) + .for_each(|(tx_cost, transaction_committed_details)| { + // Only transactions that the qos service included have to be + // checked for update + if let Ok(tx_cost) = tx_cost { + if let CommitTransactionDetails::Committed { compute_units } = + transaction_committed_details + { + cost_tracker.update_execution_cost(tx_cost, *compute_units) } } }); @@ -733,7 +764,7 @@ mod tests { } #[test] - fn test_update_or_remove_transaction_costs_commited() { + fn test_update_and_remove_transaction_costs_committed() { solana_logger::setup(); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); @@ -777,11 +808,19 @@ mod tests { }) .collect(); let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count; - QosService::update_or_remove_transaction_costs( - qos_cost_results.iter(), - Some(&commited_status), - &bank, + + // All transactions are committed, no costs should be removed + QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank); + assert_eq!( + total_txs_cost, + bank.read_cost_tracker().unwrap().block_cost() + ); + assert_eq!( + transaction_count, + bank.read_cost_tracker().unwrap().transaction_count() ); + + QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank); assert_eq!( final_txs_cost, bank.read_cost_tracker().unwrap().block_cost() @@ -794,7 +833,7 @@ mod tests { } #[test] - fn test_update_or_remove_transaction_costs_not_commited() { + fn test_update_and_remove_transaction_costs_not_committed() { solana_logger::setup(); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); @@ -828,14 +867,26 @@ mod tests { total_txs_cost, bank.read_cost_tracker().unwrap().block_cost() ); - QosService::update_or_remove_transaction_costs(qos_cost_results.iter(), None, &bank); + + // update costs doesn't impact non-committed + QosService::update_costs(qos_cost_results.iter(), None, &bank); + assert_eq!( + total_txs_cost, + bank.read_cost_tracker().unwrap().block_cost() + ); + assert_eq!( + transaction_count, + bank.read_cost_tracker().unwrap().transaction_count() + ); + + QosService::remove_costs(qos_cost_results.iter(), None, &bank); assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost()); assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count()); } } #[test] - fn test_update_or_remove_transaction_costs_mixed_execution() { + fn test_update_and_remove_transaction_costs_mixed_execution() { solana_logger::setup(); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); @@ -885,11 +936,9 @@ mod tests { } }) .collect(); - QosService::update_or_remove_transaction_costs( - qos_cost_results.iter(), - Some(&commited_status), - &bank, - ); + + QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank); + QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank); // assert the final block cost let mut expected_final_txs_count = 0u64;