Skip to content

Commit

Permalink
v1.16: Ensure that uncommitted transactions are always removed from Q…
Browse files Browse the repository at this point in the history
…oS (backport of #32285) (#32320)

Ensure that uncommitted transactions are always removed from QoS (#32285)

Co-authored-by: Tao Zhu <[email protected]>
(cherry picked from commit 5dee2e4)

Co-authored-by: buffalu <[email protected]>
  • Loading branch information
mergify[bot] and buffalu authored Jun 28, 2023
1 parent a7496b2 commit a54fe57
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 55 deletions.
42 changes: 34 additions & 8 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use {
},
solana_runtime::bank::Bank,
solana_sdk::{
account::Account, signature::Keypair, signer::Signer, stake_history::Epoch, system_program,
system_transaction, transaction::SanitizedTransaction,
account::Account, feature_set::apply_cost_tracker_during_replay, signature::Keypair,
signer::Signer, stake_history::Epoch, system_program, system_transaction,
transaction::SanitizedTransaction,
},
std::sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -32,6 +33,7 @@ use {
tempfile::TempDir,
test::Bencher,
};

extern crate test;

fn create_accounts(num: usize) -> Vec<Keypair> {
Expand Down Expand Up @@ -91,7 +93,7 @@ struct BenchFrame {
signal_receiver: Receiver<(Arc<Bank>, (Entry, u64))>,
}

fn setup() -> BenchFrame {
fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame {
let mint_total = u64::MAX;
let GenesisConfigInfo {
mut genesis_config, ..
Expand All @@ -102,6 +104,11 @@ fn setup() -> BenchFrame {
genesis_config.ticks_per_slot = 10_000;

let mut bank = Bank::new_for_benches(&genesis_config);

if !apply_cost_tracker_during_replay {
bank.deactivate_feature(&apply_cost_tracker_during_replay::id());
}

// Allow arbitrary transaction processing time for the purposes of this bench
bank.ns_per_slot = u128::MAX;

Expand All @@ -128,15 +135,19 @@ fn setup() -> BenchFrame {
}
}

fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usize) {
fn bench_process_and_record_transactions(
bencher: &mut Bencher,
batch_size: usize,
apply_cost_tracker_during_replay: bool,
) {
let BenchFrame {
bank,
ledger_path: _ledger_path,
exit,
poh_recorder,
poh_service,
signal_receiver: _signal_receiver,
} = setup();
} = setup(apply_cost_tracker_during_replay);
let consumer = create_consumer(&poh_recorder);
let transactions = create_transactions(&bank, 2_usize.pow(20));
let mut transaction_iter = transactions.chunks(batch_size);
Expand All @@ -156,15 +167,30 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz

#[bench]
fn bench_process_and_record_transactions_unbatched(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1);
bench_process_and_record_transactions(bencher, 1, true);
}

#[bench]
fn bench_process_and_record_transactions_half_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32);
bench_process_and_record_transactions(bencher, 32, true);
}

#[bench]
fn bench_process_and_record_transactions_full_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64);
bench_process_and_record_transactions(bencher, 64, true);
}

#[bench]
fn bench_process_and_record_transactions_unbatched_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1, false);
}

#[bench]
fn bench_process_and_record_transactions_half_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32, false);
}

#[bench]
fn bench_process_and_record_transactions_full_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64, false);
}
71 changes: 49 additions & 22 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,14 +474,23 @@ impl Consumer {
..
} = execute_and_commit_transactions_output;

// Costs of all transactions are added to the cost_tracker before processing.
// To ensure accurate tracking of compute units, transactions that ultimately
// were not included in the block should have their cost removed.
QosService::remove_costs(
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
);

// once feature `apply_cost_tracker_during_replay` is activated, leader shall no longer
// adjust block with executed cost (a behavior more inline with bankless leader), it
// should use requested, or default `compute_unit_limit` as transaction's execution cost.
if !bank
.feature_set
.is_active(&feature_set::apply_cost_tracker_during_replay::id())
{
QosService::update_or_remove_transaction_costs(
QosService::update_costs(
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
Expand Down Expand Up @@ -1197,36 +1206,25 @@ mod tests {
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());

let single_transfer_cost = get_block_cost();
assert_ne!(single_transfer_cost, 0);
let block_cost = get_block_cost();
assert_ne!(block_cost, 0);
assert_eq!(get_tx_count(), 1);

//
// TEST: When a tx in a batch can't be executed (here because of account
// locks), then its cost does not affect the cost tracker only if qos
// adjusts it with actual execution cost (when apply_cost_tracker_during_replay
// is not enabled).
//

// TEST: it's expected that the allocation will execute but the transfer will not
// because of a shared write-lock between mint_keypair. Ensure only the first transaction
// takes compute units in the block AND the apply_cost_tracker_during_replay_enabled feature
// is applied correctly
let allocate_keypair = Keypair::new();

let transactions = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
// intentionally use a tx that has a different cost
system_transaction::allocate(
&mint_keypair,
&allocate_keypair,
genesis_config.hash(),
1,
100,
),
// this one won't execute in process_and_record_transactions from shared account lock overlap
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
]);
let mut expected_block_cost = 2 * single_transfer_cost;
let mut expected_tracked_tx_count = 2;
if apply_cost_tracker_during_replay_enabled {
expected_block_cost +=
CostModel::calculate_cost(&transactions[1], &bank.feature_set).sum();
expected_tracked_tx_count += 1;
}

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand All @@ -1239,10 +1237,39 @@ mod tests {
} = process_transactions_batch_output.execute_and_commit_transactions_output;
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());

// first one should have been committed, second one not committed due to AccountInUse error during
// account locking
let commit_transactions_result = commit_transactions_result.unwrap();
assert_eq!(commit_transactions_result.len(), 2);
assert!(matches!(
commit_transactions_result.get(0).unwrap(),
CommitTransactionDetails::Committed { .. }
));
assert!(matches!(
commit_transactions_result.get(1).unwrap(),
CommitTransactionDetails::NotCommitted
));
assert_eq!(retryable_transaction_indexes, vec![1]);

let expected_block_cost = if !apply_cost_tracker_during_replay_enabled {
let actual_bpf_execution_cost = match commit_transactions_result.get(0).unwrap() {
CommitTransactionDetails::Committed { compute_units } => *compute_units,
CommitTransactionDetails::NotCommitted => {
unreachable!()
}
};

let mut cost = CostModel::calculate_cost(&transactions[0], &bank.feature_set);
cost.bpf_execution_cost = actual_bpf_execution_cost;

block_cost + cost.sum()
} else {
block_cost + CostModel::calculate_cost(&transactions[0], &bank.feature_set).sum()
};

assert_eq!(get_block_cost(), expected_block_cost);
assert_eq!(get_tx_count(), expected_tracked_tx_count);
assert_eq!(get_tx_count(), 2);

poh_recorder
.read()
Expand Down
99 changes: 74 additions & 25 deletions core/src/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,30 @@ impl QosService {
(select_results, num_included)
}

/// Update the transaction cost in the cost_tracker with the real cost for
/// transactions that were executed successfully;
/// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker
/// being inflated with unsuccessfully executed transactions.
pub fn update_or_remove_transaction_costs<'a>(
/// Updates the transaction costs for committed transactions. Does not handle removing costs
/// for transactions that didn't get recorded or committed
pub fn update_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
if let Some(transaction_committed_status) = transaction_committed_status {
Self::update_committed_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
)
}
}

/// Removes transaction costs from the cost tracker if not committed or recorded
pub fn remove_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
match transaction_committed_status {
Some(transaction_committed_status) => Self::update_transaction_costs(
Some(transaction_committed_status) => Self::remove_uncommitted_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
Expand All @@ -199,7 +212,7 @@ impl QosService {
}
}

fn update_transaction_costs<'a>(
fn remove_uncommitted_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
Expand All @@ -211,11 +224,29 @@ impl QosService {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
match transaction_committed_details {
CommitTransactionDetails::Committed { compute_units } => {
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost),
if *transaction_committed_details == CommitTransactionDetails::NotCommitted {
cost_tracker.remove(tx_cost)
}
}
});
}

fn update_committed_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_cost_results
.zip(transaction_committed_status)
.for_each(|(tx_cost, transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
if let CommitTransactionDetails::Committed { compute_units } =
transaction_committed_details
{
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
}
});
Expand Down Expand Up @@ -733,7 +764,7 @@ mod tests {
}

#[test]
fn test_update_or_remove_transaction_costs_commited() {
fn test_update_and_remove_transaction_costs_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -777,11 +808,19 @@ mod tests {
})
.collect();
let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count;
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,

// All transactions are committed, no costs should be removed
QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);

QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
final_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
Expand All @@ -794,7 +833,7 @@ mod tests {
}

#[test]
fn test_update_or_remove_transaction_costs_not_commited() {
fn test_update_and_remove_transaction_costs_not_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -828,14 +867,26 @@ mod tests {
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
QosService::update_or_remove_transaction_costs(qos_cost_results.iter(), None, &bank);

// update costs doesn't impact non-committed
QosService::update_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);

QosService::remove_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost());
assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count());
}
}

#[test]
fn test_update_or_remove_transaction_costs_mixed_execution() {
fn test_update_and_remove_transaction_costs_mixed_execution() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -885,11 +936,9 @@ mod tests {
}
})
.collect();
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,
);

QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);

// assert the final block cost
let mut expected_final_txs_count = 0u64;
Expand Down

0 comments on commit a54fe57

Please sign in to comment.