Skip to content

Commit

Permalink
additional costs in block capacity calc (#25059)
Browse files Browse the repository at this point in the history
* Added additional costs to block capacity computation, and pushed alloc of CostModel all the way to the top of the call chain, instead of reallocing

* Fix two compiler errors

* Update block processing to propagate computed costs, rather than re-computing deeper in the call stack

* Clippy fix

* Reformatting fix after merge

* Add CostModel::sum_without_bpf
  • Loading branch information
jdavis103 authored May 12, 2022
1 parent 3367e44 commit 08da486
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 19 deletions.
69 changes: 51 additions & 18 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use {
};

// it tracks the block cost available capacity - number of compute-units allowed
// by max block cost limit
// by max block cost limit.
#[derive(Debug)]
pub struct BlockCostCapacityMeter {
pub capacity: u64,
Expand Down Expand Up @@ -163,6 +163,7 @@ fn execute_batch(
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
tx_cost: u64,
) -> Result<()> {
let record_token_balances = transaction_status_sender.is_some();

Expand Down Expand Up @@ -194,13 +195,14 @@ fn execute_batch(
let remaining_block_cost_cap = cost_capacity_meter
.write()
.unwrap()
.accumulate(execution_cost_units);
.accumulate(execution_cost_units + tx_cost);

debug!(
"bank {} executed a batch, number of transactions {}, total execute cu {}, remaining block cost cap {}",
"bank {} executed a batch, number of transactions {}, total execute cu {}, total additional cu {}, remaining block cost cap {}",
bank.slot(),
batch.sanitized_transactions().len(),
execution_cost_units,
tx_cost,
remaining_block_cost_cap,
);

Expand Down Expand Up @@ -262,13 +264,15 @@ fn execute_batches_internal(
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
tx_costs: &[u64],
) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
let (results, new_timings): (Vec<Result<()>>, Vec<ExecuteTimings>) =
PAR_THREAD_POOL.install(|| {
batches
.into_par_iter()
.map(|batch| {
.enumerate()
.map(|(index, batch)| {
let mut timings = ExecuteTimings::default();
let result = execute_batch(
batch,
Expand All @@ -277,6 +281,7 @@ fn execute_batches_internal(
replay_vote_sender,
&mut timings,
cost_capacity_meter.clone(),
tx_costs[index],
);
if let Some(entry_callback) = entry_callback {
entry_callback(bank);
Expand Down Expand Up @@ -317,6 +322,7 @@ fn execute_batches(
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
cost_model: &CostModel,
) -> Result<()> {
let lock_results = batches
.iter()
Expand All @@ -327,42 +333,64 @@ fn execute_batches(
.flat_map(|batch| batch.sanitized_transactions().to_vec())
.collect::<Vec<_>>();

let cost_model = CostModel::new();
let mut minimal_tx_cost = u64::MAX;
let mut total_cost: u64 = 0;
let mut total_cost_without_bpf: u64 = 0;
// Allowing collect here, since it also computes the minimal tx cost, and aggregate cost.
// These two values are later used for checking if the tx_costs vector needs to be iterated over.
// The collection is a pair of (full cost, cost without estimated-bpf-code-costs).
#[allow(clippy::needless_collect)]
let tx_costs = sanitized_txs
.iter()
.map(|tx| {
let cost = cost_model.calculate_cost(tx).sum();
let tx_cost = cost_model.calculate_cost(tx);
let cost = tx_cost.sum();
let cost_without_bpf = tx_cost.sum_without_bpf();
minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
total_cost = total_cost.saturating_add(cost);
cost
total_cost_without_bpf = total_cost_without_bpf.saturating_add(cost_without_bpf);
(cost, cost_without_bpf)
})
.collect::<Vec<_>>();

let target_batch_count = get_thread_count() as u64;

let mut tx_batches: Vec<TransactionBatch> = vec![];
let mut tx_batch_costs: Vec<u64> = vec![];
let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
let target_batch_cost = total_cost / target_batch_count;
let mut batch_cost: u64 = 0;
let mut batch_cost_without_bpf: u64 = 0;
let mut slice_start = 0;
tx_costs.into_iter().enumerate().for_each(|(index, cost)| {
let next_index = index + 1;
batch_cost = batch_cost.saturating_add(cost);
if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() {
let tx_batch =
rebatch_transactions(&lock_results, bank, &sanitized_txs, slice_start, index);
slice_start = next_index;
tx_batches.push(tx_batch);
batch_cost = 0;
}
});
tx_costs
.into_iter()
.enumerate()
.for_each(|(index, cost_pair)| {
let next_index = index + 1;
batch_cost = batch_cost.saturating_add(cost_pair.0);
batch_cost_without_bpf = batch_cost_without_bpf.saturating_add(cost_pair.1);
if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() {
let tx_batch = rebatch_transactions(
&lock_results,
bank,
&sanitized_txs,
slice_start,
index,
);
slice_start = next_index;
tx_batches.push(tx_batch);
tx_batch_costs.push(batch_cost_without_bpf);
batch_cost = 0;
batch_cost_without_bpf = 0;
}
});
&tx_batches[..]
} else {
match batches.len() {
// Ensure that the total cost attributed to this batch is essentially correct
0 => tx_batch_costs = Vec::new(),
n => tx_batch_costs = vec![total_cost_without_bpf / (n as u64); n],
}
batches
};

Expand All @@ -374,6 +402,7 @@ fn execute_batches(
replay_vote_sender,
timings,
cost_capacity_meter,
&tx_batch_costs,
)
}

Expand Down Expand Up @@ -430,6 +459,7 @@ fn process_entries_with_callback(
let mut batches = vec![];
let mut tick_hashes = vec![];
let mut rng = thread_rng();
let cost_model = CostModel::new();

for entry in entries {
match entry {
Expand All @@ -447,6 +477,7 @@ fn process_entries_with_callback(
replay_vote_sender,
timings,
cost_capacity_meter.clone(),
&cost_model,
)?;
batches.clear();
for hash in &tick_hashes {
Expand Down Expand Up @@ -504,6 +535,7 @@ fn process_entries_with_callback(
replay_vote_sender,
timings,
cost_capacity_meter.clone(),
&cost_model,
)?;
batches.clear();
}
Expand All @@ -519,6 +551,7 @@ fn process_entries_with_callback(
replay_vote_sender,
timings,
cost_capacity_meter,
&cost_model,
)?;
for hash in tick_hashes {
bank.register_tick(hash);
Expand Down
6 changes: 5 additions & 1 deletion runtime/src/cost_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ impl TransactionCost {
}

pub fn sum(&self) -> u64 {
self.sum_without_bpf()
.saturating_add(self.bpf_execution_cost)
}

pub fn sum_without_bpf(&self) -> u64 {
self.signature_cost
.saturating_add(self.write_lock_cost)
.saturating_add(self.data_bytes_cost)
.saturating_add(self.builtins_execution_cost)
.saturating_add(self.bpf_execution_cost)
}
}

Expand Down

0 comments on commit 08da486

Please sign in to comment.