Skip to content

Commit

Permalink
Revert "remove dated cost checking feature (backport solana-labs#29598) (solana-labs#29656)"
Browse files Browse the repository at this point in the history

This reverts commit e7a57ca.
  • Loading branch information
t-nelson committed Mar 14, 2023
1 parent d373e87 commit b5fe585
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 1 deletion.
81 changes: 80 additions & 1 deletion ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,41 @@ use {
collections::{HashMap, HashSet},
path::PathBuf,
result,
sync::Arc,
sync::{Arc, RwLock},
time::{Duration, Instant},
},
thiserror::Error,
};

/// Tracks the block's remaining cost capacity: the number of compute-units
/// still allowed under the max block cost limit.
#[derive(Debug)]
pub struct BlockCostCapacityMeter {
    /// Total compute-unit capacity for the block (MAX_BLOCK_UNITS by default).
    pub capacity: u64,
    /// Compute-units accumulated (consumed) so far.
    pub accumulated_cost: u64,
}

impl Default for BlockCostCapacityMeter {
    /// A meter sized to the protocol's maximum block compute-unit limit.
    fn default() -> Self {
        Self::new(MAX_BLOCK_UNITS)
    }
}

impl BlockCostCapacityMeter {
    /// Creates a meter with the given compute-unit capacity and no cost
    /// accumulated yet.
    pub fn new(capacity_limit: u64) -> Self {
        Self {
            capacity: capacity_limit,
            accumulated_cost: 0_u64,
        }
    }

    /// Adds `cost` to the accumulated total and returns the remaining
    /// capacity (zero once the limit has been reached or exceeded).
    ///
    /// Uses saturating addition so the running total cannot overflow: the
    /// original unchecked `+=` would panic in debug builds or silently wrap
    /// in release builds, which could reset the meter and defeat the block
    /// cost check. The subtraction was already saturating.
    pub fn accumulate(&mut self, cost: u64) -> u64 {
        self.accumulated_cost = self.accumulated_cost.saturating_add(cost);
        self.capacity.saturating_sub(self.accumulated_cost)
    }
}

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_processor_{}", ix))
Expand Down Expand Up @@ -113,12 +142,26 @@ fn get_first_error(
first_err
}

/// Sums, across every program with at least one recorded execution, the
/// average compute-units per execution (accumulated units divided by the
/// execution count), saturating rather than overflowing.
fn aggregate_total_execution_units(execute_timings: &ExecuteTimings) -> u64 {
    execute_timings
        .details
        .per_program_timings
        .iter()
        .filter(|(_, timing)| timing.count >= 1)
        .map(|(program_id, timing)| {
            trace!("aggregated execution cost of {:?} {:?}", program_id, timing);
            timing.accumulated_units / timing.count as u64
        })
        .fold(0_u64, |total, per_execution| {
            total.saturating_add(per_execution)
        })
}

fn execute_batch(
batch: &TransactionBatch,
bank: &Arc<Bank>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
) -> Result<()> {
let record_token_balances = transaction_status_sender.is_some();

Expand All @@ -130,6 +173,8 @@ fn execute_batch(
vec![]
};

let pre_process_units: u64 = aggregate_total_execution_units(timings);

let (tx_results, balances) = batch.bank().load_execute_and_commit_transactions(
batch,
MAX_PROCESSING_AGE,
Expand All @@ -139,6 +184,29 @@ fn execute_batch(
timings,
);

if bank
.feature_set
.is_active(&feature_set::gate_large_block::id())
{
let execution_cost_units = aggregate_total_execution_units(timings) - pre_process_units;
let remaining_block_cost_cap = cost_capacity_meter
.write()
.unwrap()
.accumulate(execution_cost_units);

debug!(
"bank {} executed a batch, number of transactions {}, total execute cu {}, remaining block cost cap {}",
bank.slot(),
batch.sanitized_transactions().len(),
execution_cost_units,
remaining_block_cost_cap,
);

if remaining_block_cost_cap == 0_u64 {
return Err(TransactionError::WouldExceedMaxBlockCostLimit);
}
}

bank_utils::find_and_send_votes(
batch.sanitized_transactions(),
&tx_results,
Expand Down Expand Up @@ -186,6 +254,7 @@ fn execute_batches_internal(
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
let (results, new_timings): (Vec<Result<()>>, Vec<ExecuteTimings>) =
Expand All @@ -201,6 +270,7 @@ fn execute_batches_internal(
transaction_status_sender,
replay_vote_sender,
&mut timings,
cost_capacity_meter.clone(),
);
if let Some(entry_callback) = entry_callback {
entry_callback(bank);
Expand Down Expand Up @@ -242,6 +312,7 @@ fn execute_batches(
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
) -> Result<()> {
let (lock_results, sanitized_txs): (Vec<_>, Vec<_>) = batches
.iter()
Expand Down Expand Up @@ -300,6 +371,7 @@ fn execute_batches(
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
)
}

Expand Down Expand Up @@ -333,6 +405,7 @@ pub fn process_entries_for_tests(
replay_vote_sender,
None,
&mut timings,
Arc::new(RwLock::new(BlockCostCapacityMeter::default())),
);

debug!("process_entries: {:?}", timings);
Expand All @@ -349,6 +422,7 @@ fn process_entries_with_callback(
replay_vote_sender: Option<&ReplayVoteSender>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
) -> Result<()> {
// accumulator for entries that can be processed in parallel
let mut batches = vec![];
Expand All @@ -370,6 +444,7 @@ fn process_entries_with_callback(
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter.clone(),
)?;
batches.clear();
for hash in &tick_hashes {
Expand Down Expand Up @@ -426,6 +501,7 @@ fn process_entries_with_callback(
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter.clone(),
)?;
batches.clear();
}
Expand All @@ -440,6 +516,7 @@ fn process_entries_with_callback(
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
)?;
for hash in tick_hashes {
bank.register_tick(hash);
Expand Down Expand Up @@ -905,6 +982,7 @@ pub fn confirm_slot(

let mut replay_elapsed = Measure::start("replay_elapsed");
let mut execute_timings = ExecuteTimings::default();
let cost_capacity_meter = Arc::new(RwLock::new(BlockCostCapacityMeter::default()));
// Note: This will shuffle entries' transactions in-place.
let process_result = process_entries_with_callback(
bank,
Expand All @@ -915,6 +993,7 @@ pub fn confirm_slot(
replay_vote_sender,
transaction_cost_metrics_sender,
&mut execute_timings,
cost_capacity_meter,
)
.map_err(BlockstoreProcessorError::from);
replay_elapsed.stop();
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/feature_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ pub mod stake_merge_with_unmatched_credits_observed {
solana_sdk::declare_id!("meRgp4ArRPhD3KtCY9c5yAf2med7mBLsjKTPeVUHqBL");
}

// Feature gate: validator checks accumulated block cost against the max
// block cost limit in realtime and rejects the block if it exceeds the limit.
pub mod gate_large_block {
    solana_sdk::declare_id!("2ry7ygxiYURULZCrypHhveanvP5tzZ4toRwVp89oCNSj");
}

// Feature gate: enables the Zk Token proof program and its syscalls.
pub mod zk_token_sdk_enabled {
    solana_sdk::declare_id!("zk1snxsc6Fh3wsGNbbHAJNHiJoYgF29mMnTSusGx5EJ");
}
Expand Down Expand Up @@ -449,6 +453,7 @@ lazy_static! {
(merge_nonce_error_into_system_error::id(), "merge NonceError into SystemError"),
(disable_fees_sysvar::id(), "disable fees sysvar"),
(stake_merge_with_unmatched_credits_observed::id(), "allow merging active stakes with unmatched credits_observed #18985"),
(gate_large_block::id(), "validator checks block cost against max limit in realtime, reject if exceeds."),
(zk_token_sdk_enabled::id(), "enable Zk Token proof program and syscalls"),
(curve25519_syscall_enabled::id(), "enable curve25519 syscalls"),
(versioned_tx_message_enabled::id(), "enable versioned transaction message processing"),
Expand Down

0 comments on commit b5fe585

Please sign in to comment.