Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.16: Ensure that uncommitted transactions are always removed from QoS (backport of #32285) #32320

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use {
},
solana_runtime::bank::Bank,
solana_sdk::{
account::Account, signature::Keypair, signer::Signer, stake_history::Epoch, system_program,
system_transaction, transaction::SanitizedTransaction,
account::Account, feature_set::apply_cost_tracker_during_replay, signature::Keypair,
signer::Signer, stake_history::Epoch, system_program, system_transaction,
transaction::SanitizedTransaction,
},
std::sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -32,6 +33,7 @@ use {
tempfile::TempDir,
test::Bencher,
};

extern crate test;

fn create_accounts(num: usize) -> Vec<Keypair> {
Expand Down Expand Up @@ -91,7 +93,7 @@ struct BenchFrame {
signal_receiver: Receiver<(Arc<Bank>, (Entry, u64))>,
}

fn setup() -> BenchFrame {
fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame {
let mint_total = u64::MAX;
let GenesisConfigInfo {
mut genesis_config, ..
Expand All @@ -102,6 +104,11 @@ fn setup() -> BenchFrame {
genesis_config.ticks_per_slot = 10_000;

let mut bank = Bank::new_for_benches(&genesis_config);

if !apply_cost_tracker_during_replay {
bank.deactivate_feature(&apply_cost_tracker_during_replay::id());
}

// Allow arbitrary transaction processing time for the purposes of this bench
bank.ns_per_slot = u128::MAX;

Expand All @@ -128,15 +135,19 @@ fn setup() -> BenchFrame {
}
}

fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usize) {
fn bench_process_and_record_transactions(
bencher: &mut Bencher,
batch_size: usize,
apply_cost_tracker_during_replay: bool,
) {
let BenchFrame {
bank,
ledger_path: _ledger_path,
exit,
poh_recorder,
poh_service,
signal_receiver: _signal_receiver,
} = setup();
} = setup(apply_cost_tracker_during_replay);
let consumer = create_consumer(&poh_recorder);
let transactions = create_transactions(&bank, 2_usize.pow(20));
let mut transaction_iter = transactions.chunks(batch_size);
Expand All @@ -156,15 +167,30 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz

#[bench]
fn bench_process_and_record_transactions_unbatched(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1);
bench_process_and_record_transactions(bencher, 1, true);
}

#[bench]
fn bench_process_and_record_transactions_half_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32);
bench_process_and_record_transactions(bencher, 32, true);
}

#[bench]
fn bench_process_and_record_transactions_full_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64);
bench_process_and_record_transactions(bencher, 64, true);
}

#[bench]
fn bench_process_and_record_transactions_unbatched_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1, false);
}

#[bench]
fn bench_process_and_record_transactions_half_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32, false);
}

#[bench]
fn bench_process_and_record_transactions_full_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64, false);
}
71 changes: 49 additions & 22 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,14 +474,23 @@ impl Consumer {
..
} = execute_and_commit_transactions_output;

// Costs of all transactions are added to the cost_tracker before processing.
// To ensure accurate tracking of compute units, transactions that ultimately
// were not included in the block should have their cost removed.
QosService::remove_costs(
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
);

// once feature `apply_cost_tracker_during_replay` is activated, leader shall no longer
// adjust block with executed cost (a behavior more inline with bankless leader), it
// should use requested, or default `compute_unit_limit` as transaction's execution cost.
if !bank
.feature_set
.is_active(&feature_set::apply_cost_tracker_during_replay::id())
{
QosService::update_or_remove_transaction_costs(
QosService::update_costs(
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
Expand Down Expand Up @@ -1197,36 +1206,25 @@ mod tests {
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());

let single_transfer_cost = get_block_cost();
assert_ne!(single_transfer_cost, 0);
let block_cost = get_block_cost();
assert_ne!(block_cost, 0);
assert_eq!(get_tx_count(), 1);

//
// TEST: When a tx in a batch can't be executed (here because of account
// locks), then its cost does not affect the cost tracker only if qos
// adjusts it with actual execution cost (when apply_cost_tracker_during_replay
// is not enabled).
//

// TEST: it's expected that the allocation will execute but the transfer will not
// because of a shared write-lock between mint_keypair. Ensure only the first transaction
// takes compute units in the block AND the apply_cost_tracker_during_replay_enabled feature
// is applied correctly
let allocate_keypair = Keypair::new();

let transactions = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
// intentionally use a tx that has a different cost
system_transaction::allocate(
&mint_keypair,
&allocate_keypair,
genesis_config.hash(),
1,
100,
),
// this one won't execute in process_and_record_transactions from shared account lock overlap
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
]);
let mut expected_block_cost = 2 * single_transfer_cost;
let mut expected_tracked_tx_count = 2;
if apply_cost_tracker_during_replay_enabled {
expected_block_cost +=
CostModel::calculate_cost(&transactions[1], &bank.feature_set).sum();
expected_tracked_tx_count += 1;
}

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand All @@ -1239,10 +1237,39 @@ mod tests {
} = process_transactions_batch_output.execute_and_commit_transactions_output;
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());

// first one should have been committed, second one not committed due to AccountInUse error during
// account locking
let commit_transactions_result = commit_transactions_result.unwrap();
assert_eq!(commit_transactions_result.len(), 2);
assert!(matches!(
commit_transactions_result.get(0).unwrap(),
CommitTransactionDetails::Committed { .. }
));
assert!(matches!(
commit_transactions_result.get(1).unwrap(),
CommitTransactionDetails::NotCommitted
));
assert_eq!(retryable_transaction_indexes, vec![1]);

let expected_block_cost = if !apply_cost_tracker_during_replay_enabled {
let actual_bpf_execution_cost = match commit_transactions_result.get(0).unwrap() {
CommitTransactionDetails::Committed { compute_units } => *compute_units,
CommitTransactionDetails::NotCommitted => {
unreachable!()
}
};

let mut cost = CostModel::calculate_cost(&transactions[0], &bank.feature_set);
cost.bpf_execution_cost = actual_bpf_execution_cost;

block_cost + cost.sum()
} else {
block_cost + CostModel::calculate_cost(&transactions[0], &bank.feature_set).sum()
};

assert_eq!(get_block_cost(), expected_block_cost);
assert_eq!(get_tx_count(), expected_tracked_tx_count);
assert_eq!(get_tx_count(), 2);

poh_recorder
.read()
Expand Down
99 changes: 74 additions & 25 deletions core/src/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,30 @@ impl QosService {
(select_results, num_included)
}

/// Update the transaction cost in the cost_tracker with the real cost for
/// transactions that were executed successfully;
/// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker
/// being inflated with unsuccessfully executed transactions.
pub fn update_or_remove_transaction_costs<'a>(
/// Updates the transaction costs for committed transactions. Does not handle removing costs
/// for transactions that didn't get recorded or committed
pub fn update_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
if let Some(transaction_committed_status) = transaction_committed_status {
Self::update_committed_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
)
}
}

/// Removes transaction costs from the cost tracker if not committed or recorded
pub fn remove_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
match transaction_committed_status {
Some(transaction_committed_status) => Self::update_transaction_costs(
Some(transaction_committed_status) => Self::remove_uncommitted_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
Expand All @@ -199,7 +212,7 @@ impl QosService {
}
}

fn update_transaction_costs<'a>(
fn remove_uncommitted_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
Expand All @@ -211,11 +224,29 @@ impl QosService {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
match transaction_committed_details {
CommitTransactionDetails::Committed { compute_units } => {
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost),
if *transaction_committed_details == CommitTransactionDetails::NotCommitted {
cost_tracker.remove(tx_cost)
}
}
});
}

fn update_committed_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_cost_results
.zip(transaction_committed_status)
.for_each(|(tx_cost, transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
if let CommitTransactionDetails::Committed { compute_units } =
transaction_committed_details
{
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
}
});
Expand Down Expand Up @@ -733,7 +764,7 @@ mod tests {
}

#[test]
fn test_update_or_remove_transaction_costs_commited() {
fn test_update_and_remove_transaction_costs_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -777,11 +808,19 @@ mod tests {
})
.collect();
let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count;
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,

// All transactions are committed, no costs should be removed
QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);

QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
final_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
Expand All @@ -794,7 +833,7 @@ mod tests {
}

#[test]
fn test_update_or_remove_transaction_costs_not_commited() {
fn test_update_and_remove_transaction_costs_not_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -828,14 +867,26 @@ mod tests {
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
QosService::update_or_remove_transaction_costs(qos_cost_results.iter(), None, &bank);

// update costs doesn't impact non-committed
QosService::update_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);

QosService::remove_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost());
assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count());
}
}

#[test]
fn test_update_or_remove_transaction_costs_mixed_execution() {
fn test_update_and_remove_transaction_costs_mixed_execution() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
Expand Down Expand Up @@ -885,11 +936,9 @@ mod tests {
}
})
.collect();
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,
);

QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);

// assert the final block cost
let mut expected_final_txs_count = 0u64;
Expand Down