diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index a243cac2f279ac..bcab738547db68 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -65,7 +65,7 @@ use { }; // it tracks the block cost available capacity - number of compute-units allowed -// by max block cost limit +// by max block cost limit. #[derive(Debug)] pub struct BlockCostCapacityMeter { pub capacity: u64, @@ -163,6 +163,7 @@ fn execute_batch( replay_vote_sender: Option<&ReplayVoteSender>, timings: &mut ExecuteTimings, cost_capacity_meter: Arc>, + tx_cost: u64, ) -> Result<()> { let record_token_balances = transaction_status_sender.is_some(); @@ -194,13 +195,14 @@ fn execute_batch( let remaining_block_cost_cap = cost_capacity_meter .write() .unwrap() - .accumulate(execution_cost_units); + .accumulate(execution_cost_units + tx_cost); debug!( - "bank {} executed a batch, number of transactions {}, total execute cu {}, remaining block cost cap {}", + "bank {} executed a batch, number of transactions {}, total execute cu {}, total additional cu {}, remaining block cost cap {}", bank.slot(), batch.sanitized_transactions().len(), execution_cost_units, + tx_cost, remaining_block_cost_cap, ); @@ -262,13 +264,15 @@ fn execute_batches_internal( replay_vote_sender: Option<&ReplayVoteSender>, timings: &mut ExecuteTimings, cost_capacity_meter: Arc>, + tx_costs: &[u64], ) -> Result<()> { inc_new_counter_debug!("bank-par_execute_entries-count", batches.len()); let (results, new_timings): (Vec>, Vec) = PAR_THREAD_POOL.install(|| { batches .into_par_iter() - .map(|batch| { + .enumerate() + .map(|(index, batch)| { let mut timings = ExecuteTimings::default(); let result = execute_batch( batch, @@ -277,6 +281,7 @@ fn execute_batches_internal( replay_vote_sender, &mut timings, cost_capacity_meter.clone(), + tx_costs[index], ); if let Some(entry_callback) = entry_callback { entry_callback(bank); @@ -317,6 +322,7 @@ fn execute_batches( replay_vote_sender: Option<&ReplayVoteSender>, timings: &mut ExecuteTimings, cost_capacity_meter: Arc>, + cost_model: &CostModel, ) -> Result<()> { let lock_results = batches .iter() @@ -327,42 +333,64 @@ fn execute_batches( .flat_map(|batch| batch.sanitized_transactions().to_vec()) .collect::>(); - let cost_model = CostModel::new(); let mut minimal_tx_cost = u64::MAX; let mut total_cost: u64 = 0; + let mut total_cost_without_bpf: u64 = 0; // Allowing collect here, since it also computes the minimal tx cost, and aggregate cost. // These two values are later used for checking if the tx_costs vector needs to be iterated over. + // The collection is a pair of (full cost, cost without estimated-bpf-code-costs). #[allow(clippy::needless_collect)] let tx_costs = sanitized_txs .iter() .map(|tx| { - let cost = cost_model.calculate_cost(tx).sum(); + let tx_cost = cost_model.calculate_cost(tx); + let cost = tx_cost.sum(); + let cost_without_bpf = tx_cost.sum_without_bpf(); minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost); total_cost = total_cost.saturating_add(cost); - cost + total_cost_without_bpf = total_cost_without_bpf.saturating_add(cost_without_bpf); + (cost, cost_without_bpf) }) .collect::>(); let target_batch_count = get_thread_count() as u64; let mut tx_batches: Vec = vec![]; + let mut tx_batch_costs: Vec = vec![]; let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) { let target_batch_cost = total_cost / target_batch_count; let mut batch_cost: u64 = 0; + let mut batch_cost_without_bpf: u64 = 0; let mut slice_start = 0; - tx_costs.into_iter().enumerate().for_each(|(index, cost)| { - let next_index = index + 1; - batch_cost = batch_cost.saturating_add(cost); - if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() { - let tx_batch = - rebatch_transactions(&lock_results, bank, &sanitized_txs, slice_start, index); - slice_start = next_index; - tx_batches.push(tx_batch); - batch_cost = 0; - } - }); + tx_costs + .into_iter() + .enumerate() + .for_each(|(index, cost_pair)| { + let next_index = index + 1; + batch_cost = batch_cost.saturating_add(cost_pair.0); + batch_cost_without_bpf = batch_cost_without_bpf.saturating_add(cost_pair.1); + if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() { + let tx_batch = rebatch_transactions( + &lock_results, + bank, + &sanitized_txs, + slice_start, + index, + ); + slice_start = next_index; + tx_batches.push(tx_batch); + tx_batch_costs.push(batch_cost_without_bpf); + batch_cost = 0; + batch_cost_without_bpf = 0; + } + }); &tx_batches[..] } else { + match batches.len() { + // Ensure that the total cost attributed to this batch is essentially correct + 0 => tx_batch_costs = Vec::new(), + n => tx_batch_costs = vec![total_cost_without_bpf / (n as u64); n], + } batches }; @@ -374,6 +402,7 @@ fn execute_batches( replay_vote_sender, timings, cost_capacity_meter, + &tx_batch_costs, ) } @@ -430,6 +459,7 @@ fn process_entries_with_callback( let mut batches = vec![]; let mut tick_hashes = vec![]; let mut rng = thread_rng(); + let cost_model = CostModel::new(); for entry in entries { match entry { @@ -447,6 +477,7 @@ fn process_entries_with_callback( replay_vote_sender, timings, cost_capacity_meter.clone(), + &cost_model, )?; batches.clear(); for hash in &tick_hashes { @@ -504,6 +535,7 @@ fn process_entries_with_callback( replay_vote_sender, timings, cost_capacity_meter.clone(), + &cost_model, )?; batches.clear(); } @@ -519,6 +551,7 @@ fn process_entries_with_callback( replay_vote_sender, timings, cost_capacity_meter, + &cost_model, )?; for hash in tick_hashes { bank.register_tick(hash); diff --git a/runtime/src/cost_model.rs b/runtime/src/cost_model.rs index e9ed3cb1a5a2b8..79a7bbabbefee0 100644 --- a/runtime/src/cost_model.rs +++ b/runtime/src/cost_model.rs @@ -62,11 +62,15 @@ impl TransactionCost { } pub fn sum(&self) -> u64 { + self.sum_without_bpf() + .saturating_add(self.bpf_execution_cost) + } + + pub fn sum_without_bpf(&self) -> u64 { self.signature_cost .saturating_add(self.write_lock_cost) .saturating_add(self.data_bytes_cost) .saturating_add(self.builtins_execution_cost) - .saturating_add(self.bpf_execution_cost) } }