Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that uncommitted transactions are always removed from QoS #32285

Merged
42 changes: 34 additions & 8 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use {
},
solana_runtime::bank::Bank,
solana_sdk::{
account::Account, signature::Keypair, signer::Signer, stake_history::Epoch, system_program,
system_transaction, transaction::SanitizedTransaction,
account::Account, feature_set::apply_cost_tracker_during_replay, signature::Keypair,
signer::Signer, stake_history::Epoch, system_program, system_transaction,
transaction::SanitizedTransaction,
},
std::sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -31,6 +32,7 @@ use {
tempfile::TempDir,
test::Bencher,
};

extern crate test;

fn create_accounts(num: usize) -> Vec<Keypair> {
Expand Down Expand Up @@ -90,7 +92,7 @@ struct BenchFrame {
signal_receiver: Receiver<(Arc<Bank>, (Entry, u64))>,
}

fn setup() -> BenchFrame {
fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame {
let mint_total = u64::MAX;
let GenesisConfigInfo {
mut genesis_config, ..
Expand All @@ -101,6 +103,11 @@ fn setup() -> BenchFrame {
genesis_config.ticks_per_slot = 10_000;

let mut bank = Bank::new_for_benches(&genesis_config);

if !apply_cost_tracker_during_replay {
bank.deactivate_feature(&apply_cost_tracker_during_replay::id());
}

// Allow arbitrary transaction processing time for the purposes of this bench
bank.ns_per_slot = u128::MAX;

Expand All @@ -127,15 +134,19 @@ fn setup() -> BenchFrame {
}
}

fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usize) {
fn bench_process_and_record_transactions(
bencher: &mut Bencher,
batch_size: usize,
apply_cost_tracker_during_replay: bool,
) {
let BenchFrame {
bank,
ledger_path: _ledger_path,
exit,
poh_recorder,
poh_service,
signal_receiver: _signal_receiver,
} = setup();
} = setup(apply_cost_tracker_during_replay);
let consumer = create_consumer(&poh_recorder);
let transactions = create_transactions(&bank, 2_usize.pow(20));
let mut transaction_iter = transactions.chunks(batch_size);
Expand All @@ -155,15 +166,30 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz

#[bench]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes me wish there was a test_case-like crate for benches 😢
I guess these will only live until the feature is activated everywhere

fn bench_process_and_record_transactions_unbatched(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1);
bench_process_and_record_transactions(bencher, 1, true);
}

#[bench]
fn bench_process_and_record_transactions_half_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32);
bench_process_and_record_transactions(bencher, 32, true);
}

#[bench]
fn bench_process_and_record_transactions_full_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64);
bench_process_and_record_transactions(bencher, 64, true);
}

#[bench]
fn bench_process_and_record_transactions_unbatched_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1, false);
}

#[bench]
fn bench_process_and_record_transactions_half_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32, false);
}

#[bench]
fn bench_process_and_record_transactions_full_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64, false);
}
77 changes: 55 additions & 22 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,23 @@ impl Consumer {
..
} = execute_and_commit_transactions_output;

// Costs of all transactions are added to the cost_tracker before processing.
// To ensure accurate tracking of compute units, transactions that ultimately
// were not included in the block should have their cost removed.
QosService::remove_costs(
tao-stones marked this conversation as resolved.
Show resolved Hide resolved
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
);

// once feature `apply_cost_tracker_during_replay` is activated, leader shall no longer
// adjust block with executed cost (a behavior more inline with bankless leader), it
// should use requested, or default `compute_unit_limit` as transaction's execution cost.
if !bank
.feature_set
.is_active(&feature_set::apply_cost_tracker_during_replay::id())
{
QosService::update_or_remove_transaction_costs(
QosService::update_costs(
tao-stones marked this conversation as resolved.
Show resolved Hide resolved
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
Expand Down Expand Up @@ -1193,36 +1202,25 @@ mod tests {
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());

let single_transfer_cost = get_block_cost();
assert_ne!(single_transfer_cost, 0);
let block_cost = get_block_cost();
assert_ne!(block_cost, 0);
assert_eq!(get_tx_count(), 1);

//
// TEST: When a tx in a batch can't be executed (here because of account
// locks), then its cost does not affect the cost tracker only if qos
// adjusts it with actual execution cost (when apply_cost_tracker_during_replay
// is not enabled).
//

// TEST: it's expected that the allocation will execute but the transfer will not
// because of a shared write-lock between mint_keypair. Ensure only the first transaction
// takes compute units in the block AND the apply_cost_tracker_during_replay_enabled feature
// is applied correctly
let allocate_keypair = Keypair::new();

let transactions = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
// intentionally use a tx that has a different cost
system_transaction::allocate(
&mint_keypair,
&allocate_keypair,
genesis_config.hash(),
1,
100,
),
// this one won't execute in process_and_record_transactions from shared account lock overlap
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
]);
let mut expected_block_cost = 2 * single_transfer_cost;
let mut expected_tracked_tx_count = 2;
if apply_cost_tracker_during_replay_enabled {
expected_block_cost +=
CostModel::calculate_cost(&transactions[1], &bank.feature_set).sum();
expected_tracked_tx_count += 1;
}

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand All @@ -1235,10 +1233,45 @@ mod tests {
} = process_transactions_batch_output.execute_and_commit_transactions_output;
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());

// first one should have been committed, second one not committed due to write-lock error
let commit_transactions_result = commit_transactions_result.unwrap();
assert_eq!(commit_transactions_result.len(), 2);
assert!(matches!(
commit_transactions_result.get(0).unwrap(),
CommitTransactionDetails::Committed { .. }
));
assert!(matches!(
commit_transactions_result.get(1).unwrap(),
CommitTransactionDetails::NotCommitted
));
assert_eq!(retryable_transaction_indexes, vec![1]);

println!(
buffalu marked this conversation as resolved.
Show resolved Hide resolved
"block cost: {} cost model cost: {} executed cost: {:?}, apply_cost_tracker_during_replay_enabled: {}",
block_cost,
CostModel::calculate_cost(&transactions[0], &bank.feature_set).sum(),
commit_transactions_result.get(0).unwrap(),
apply_cost_tracker_during_replay_enabled
);

// if apply_cost_tracker_during_replay_enabled is NOT live, then the expected block cost
// needs to be adjusted by the actual cost executed. The adjusted should just be the final
// cost
let expected_block_cost = if !apply_cost_tracker_during_replay_enabled {
block_cost
+ match commit_transactions_result.get(0).unwrap() {
CommitTransactionDetails::Committed { compute_units } => compute_units,
CommitTransactionDetails::NotCommitted => {
unreachable!()
}
}
} else {
block_cost + CostModel::calculate_cost(&transactions[0], &bank.feature_set).sum()
};

assert_eq!(get_block_cost(), expected_block_cost);
assert_eq!(get_tx_count(), expected_tracked_tx_count);
assert_eq!(get_tx_count(), 2);

poh_recorder
.read()
Expand Down
99 changes: 74 additions & 25 deletions core/src/banking_stage/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,30 @@ impl QosService {
(select_results, num_included)
}

/// Update the transaction cost in the cost_tracker with the real cost for
/// transactions that were executed successfully;
/// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker
/// being inflated with unsuccessfully executed transactions.
pub fn update_or_remove_transaction_costs<'a>(
/// Updates the transaction costs for committed transactions. Does not handle removing costs
/// for transactions that didn't get recorded or committed
pub fn update_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
if let Some(transaction_committed_status) = transaction_committed_status {
Self::update_committed_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
)
}
}

/// Removes transaction costs from the cost tracker if not committed or recorded
pub fn remove_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
match transaction_committed_status {
Some(transaction_committed_status) => Self::update_transaction_costs(
Some(transaction_committed_status) => Self::remove_uncommitted_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
Expand All @@ -196,7 +209,7 @@ impl QosService {
}
}

fn update_transaction_costs<'a>(
fn remove_uncommitted_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
Expand All @@ -208,11 +221,29 @@ impl QosService {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
match transaction_committed_details {
CommitTransactionDetails::Committed { compute_units } => {
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost),
if *transaction_committed_details == CommitTransactionDetails::NotCommitted {
cost_tracker.remove(tx_cost)
}
}
});
}

fn update_committed_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
tao-stones marked this conversation as resolved.
Show resolved Hide resolved
transaction_cost_results
.zip(transaction_committed_status)
.for_each(|(tx_cost, transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
if let CommitTransactionDetails::Committed { compute_units } =
transaction_committed_details
{
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
}
});
Expand Down Expand Up @@ -730,7 +761,7 @@ mod tests {
}

#[test]
fn test_update_or_remove_transaction_costs_commited() {
fn test_update_and_remove_transaction_costs_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -774,11 +805,19 @@ mod tests {
})
.collect();
let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count;
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,

// All transactions are committed, no costs should be modified
buffalu marked this conversation as resolved.
Show resolved Hide resolved
QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);

QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
final_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
Expand All @@ -791,7 +830,7 @@ mod tests {
}

#[test]
fn test_update_or_remove_transaction_costs_not_commited() {
fn test_update_and_remove_transaction_costs_not_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -825,14 +864,26 @@ mod tests {
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
QosService::update_or_remove_transaction_costs(qos_cost_results.iter(), None, &bank);

// update costs doesn't impact non-committed
QosService::update_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);

QosService::remove_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost());
assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count());
}
}

#[test]
fn test_update_or_remove_transaction_costs_mixed_execution() {
fn test_update_and_remove_transaction_costs_mixed_execution() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -882,11 +933,9 @@ mod tests {
}
})
.collect();
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,
);

QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);

// assert the final block cost
let mut expected_final_txs_count = 0u64;
Expand Down