Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

additional costs in block capacity calc #25059

Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 51 additions & 18 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use {
};

// it tracks the block cost available capacity - number of compute-units allowed
// by max block cost limit
// by max block cost limit.
#[derive(Debug)]
pub struct BlockCostCapacityMeter {
pub capacity: u64,
Expand Down Expand Up @@ -163,6 +163,7 @@ fn execute_batch(
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
tx_cost: u64,
) -> Result<()> {
let record_token_balances = transaction_status_sender.is_some();

Expand Down Expand Up @@ -194,13 +195,14 @@ fn execute_batch(
let remaining_block_cost_cap = cost_capacity_meter
.write()
.unwrap()
.accumulate(execution_cost_units);
.accumulate(execution_cost_units + tx_cost);
tao-stones marked this conversation as resolved.
Show resolved Hide resolved

debug!(
"bank {} executed a batch, number of transactions {}, total execute cu {}, remaining block cost cap {}",
"bank {} executed a batch, number of transactions {}, total execute cu {}, total additional cu {}, remaining block cost cap {}",
bank.slot(),
batch.sanitized_transactions().len(),
execution_cost_units,
tx_cost,
remaining_block_cost_cap,
);

Expand Down Expand Up @@ -262,13 +264,15 @@ fn execute_batches_internal(
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
tx_costs: &[u64],
) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
let (results, new_timings): (Vec<Result<()>>, Vec<ExecuteTimings>) =
PAR_THREAD_POOL.install(|| {
batches
.into_par_iter()
.map(|batch| {
.enumerate()
.map(|(index, batch)| {
let mut timings = ExecuteTimings::default();
let result = execute_batch(
batch,
Expand All @@ -277,6 +281,7 @@ fn execute_batches_internal(
replay_vote_sender,
&mut timings,
cost_capacity_meter.clone(),
tx_costs[index],
);
if let Some(entry_callback) = entry_callback {
entry_callback(bank);
Expand Down Expand Up @@ -317,6 +322,7 @@ fn execute_batches(
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
cost_model: &CostModel,
) -> Result<()> {
let lock_results = batches
.iter()
Expand All @@ -327,42 +333,64 @@ fn execute_batches(
.flat_map(|batch| batch.sanitized_transactions().to_vec())
.collect::<Vec<_>>();

let cost_model = CostModel::new();
let mut minimal_tx_cost = u64::MAX;
let mut total_cost: u64 = 0;
let mut total_cost_without_bpf: u64 = 0;
// Allowing collect here, since it also computes the minimal tx cost, and aggregate cost.
// These two values are later used for checking if the tx_costs vector needs to be iterated over.
// The collection is a pair of (full cost, cost without estimated-bpf-code-costs).
#[allow(clippy::needless_collect)]
let tx_costs = sanitized_txs
.iter()
.map(|tx| {
let cost = cost_model.calculate_cost(tx).sum();
let tx_cost = cost_model.calculate_cost(tx);
let cost = tx_cost.sum();
let cost_without_bpf = cost.saturating_sub(tx_cost.bpf_execution_cost);
jdavis103 marked this conversation as resolved.
Show resolved Hide resolved
minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
total_cost = total_cost.saturating_add(cost);
cost
total_cost_without_bpf = total_cost_without_bpf.saturating_add(cost_without_bpf);
(cost, cost_without_bpf)
})
.collect::<Vec<_>>();

let target_batch_count = get_thread_count() as u64;

let mut tx_batches: Vec<TransactionBatch> = vec![];
let mut tx_batch_costs: Vec<u64> = vec![];
let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
let target_batch_cost = total_cost / target_batch_count;
let mut batch_cost: u64 = 0;
let mut batch_cost_without_bpf: u64 = 0;
let mut slice_start = 0;
tx_costs.into_iter().enumerate().for_each(|(index, cost)| {
let next_index = index + 1;
batch_cost = batch_cost.saturating_add(cost);
if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() {
let tx_batch =
rebatch_transactions(&lock_results, bank, &sanitized_txs, slice_start, index);
slice_start = next_index;
tx_batches.push(tx_batch);
batch_cost = 0;
}
});
tx_costs
.into_iter()
.enumerate()
.for_each(|(index, cost_pair)| {
let next_index = index + 1;
batch_cost = batch_cost.saturating_add(cost_pair.0);
batch_cost_without_bpf = batch_cost_without_bpf.saturating_add(cost_pair.1);
if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() {
let tx_batch = rebatch_transactions(
&lock_results,
bank,
&sanitized_txs,
slice_start,
index,
);
slice_start = next_index;
tx_batches.push(tx_batch);
tx_batch_costs.push(batch_cost_without_bpf);
batch_cost = 0;
batch_cost_without_bpf = 0;
}
});
&tx_batches[..]
} else {
match batches.len() {
// Ensure that the total cost attributed to this batch is essentially correct
0 => tx_batch_costs = Vec::new(),
n => tx_batch_costs = vec![total_cost_without_bpf / (n as u64); n],
}
batches
};

Expand All @@ -374,6 +402,7 @@ fn execute_batches(
replay_vote_sender,
timings,
cost_capacity_meter,
&tx_batch_costs,
)
}

Expand Down Expand Up @@ -430,6 +459,7 @@ fn process_entries_with_callback(
let mut batches = vec![];
let mut tick_hashes = vec![];
let mut rng = thread_rng();
let cost_model = CostModel::new();
tao-stones marked this conversation as resolved.
Show resolved Hide resolved

for entry in entries {
match entry {
Expand All @@ -447,6 +477,7 @@ fn process_entries_with_callback(
replay_vote_sender,
timings,
cost_capacity_meter.clone(),
&cost_model,
)?;
batches.clear();
for hash in &tick_hashes {
Expand Down Expand Up @@ -504,6 +535,7 @@ fn process_entries_with_callback(
replay_vote_sender,
timings,
cost_capacity_meter.clone(),
&cost_model,
)?;
batches.clear();
}
Expand All @@ -519,6 +551,7 @@ fn process_entries_with_callback(
replay_vote_sender,
timings,
cost_capacity_meter,
&cost_model,
)?;
for hash in tick_hashes {
bank.register_tick(hash);
Expand Down