diff --git a/aptos-move/aptos-transaction-benchmarks/src/main.rs b/aptos-move/aptos-transaction-benchmarks/src/main.rs index c8f13ffaff0ed..e710dd0c604d5 100755 --- a/aptos-move/aptos-transaction-benchmarks/src/main.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/main.rs @@ -47,6 +47,9 @@ struct ParamSweepOpt { #[clap(long, default_value = "10")] pub num_runs: usize, + + #[clap(long)] + pub maybe_gas_limit: Option, } #[derive(Debug, Parser)] @@ -71,6 +74,9 @@ struct ExecuteOpt { #[clap(long, default_value = "true")] pub no_conflict_txns: bool, + + #[clap(long)] + pub maybe_gas_limit: Option, } fn param_sweep(opt: ParamSweepOpt) { @@ -85,6 +91,8 @@ fn param_sweep(opt: ParamSweepOpt) { let run_parallel = !opt.skip_parallel; let run_sequential = !opt.skip_sequential; + let maybe_gas_limit = opt.maybe_gas_limit; + assert!( run_sequential || run_parallel, "Must run at least one of parallel or sequential" @@ -102,6 +110,7 @@ fn param_sweep(opt: ParamSweepOpt) { 1, concurrency_level, false, + maybe_gas_limit, ); par_tps.sort(); seq_tps.sort(); @@ -162,6 +171,7 @@ fn execute(opt: ExecuteOpt) { opt.num_executor_shards, opt.concurrency_level_per_shard, opt.no_conflict_txns, + opt.maybe_gas_limit, ); let sum: usize = par_tps.iter().sum(); diff --git a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs index c74d13e99b3a0..fb6d65617cf72 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs @@ -73,6 +73,7 @@ where self.num_transactions, 1, AccountPickStyle::Unlimited, + None, ) }, |state| state.execute_sequential(), @@ -91,6 +92,7 @@ where self.num_transactions, 1, AccountPickStyle::Unlimited, + None, ) }, |state| state.execute_parallel(), @@ -111,6 +113,7 @@ where num_executor_shards: usize, concurrency_level_per_shard: usize, no_conflict_txn: bool, + maybe_gas_limit: Option, ) -> (Vec, Vec) { let mut par_tps = 
Vec::new(); let mut seq_tps = Vec::new(); @@ -135,6 +138,7 @@ where num_txn, num_executor_shards, account_pick_style, + maybe_gas_limit, ); for i in 0..total_runs { @@ -184,12 +188,14 @@ where num_transactions: usize, num_executor_shards: usize, account_pick_style: AccountPickStyle, + maybe_gas_limit: Option, ) -> Self { Self::with_universe( strategy, universe_strategy(num_accounts, num_transactions, account_pick_style), num_transactions, num_executor_shards, + maybe_gas_limit, ) } @@ -200,6 +206,7 @@ where universe_strategy: impl Strategy, num_transactions: usize, num_executor_shards: usize, + maybe_gas_limit: Option, ) -> Self { let mut runner = TestRunner::default(); let universe_gen = universe_strategy @@ -214,9 +221,13 @@ where let universe = universe_gen.setup_gas_cost_stability(&mut executor); let state_view = Arc::new(executor.get_state_view().clone()); - let parallel_block_executor = - Arc::new(ShardedBlockExecutor::new(num_executor_shards, None)); - let sequential_block_executor = Arc::new(ShardedBlockExecutor::new(1, Some(1))); + let parallel_block_executor = Arc::new(ShardedBlockExecutor::new( + num_executor_shards, + None, + maybe_gas_limit, + )); + let sequential_block_executor = + Arc::new(ShardedBlockExecutor::new(1, Some(1), maybe_gas_limit)); let validator_set = ValidatorSet::fetch_config( &FakeExecutor::from_head_genesis() diff --git a/aptos-move/aptos-vm-logging/src/lib.rs b/aptos-move/aptos-vm-logging/src/lib.rs index 5cb3c3263bc75..be2361bc85d81 100644 --- a/aptos-move/aptos-vm-logging/src/lib.rs +++ b/aptos-move/aptos-vm-logging/src/lib.rs @@ -76,12 +76,12 @@ pub fn speculative_log(level: Level, context: &AdapterLogSchema, message: String }; } -/// Flushes the currently stored logs, and swaps the speculative log / event storage with None. +/// Flushes the first num_to_flush logs in the currently stored logs, and swaps the speculative log / event storage with None. 
/// Must be called after block execution is complete (removes the storage from Arc). -pub fn flush_speculative_logs() { +pub fn flush_speculative_logs(num_to_flush: usize) { if let Some(log_events_ptr) = BUFFERED_LOG_EVENTS.swap(None) { match Arc::try_unwrap(log_events_ptr) { - Ok(log_events) => log_events.flush(), + Ok(log_events) => log_events.flush(num_to_flush), Err(_) => { alert!("Speculative log storage must be uniquely owned to flush"); }, diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 69feda6203bd4..0e53faa29262f 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -1483,6 +1483,41 @@ impl VMExecutor for AptosVM { transactions, state_view, Self::get_concurrency_level(), + None, + ); + if ret.is_ok() { + // Record the histogram count for transactions per block. + BLOCK_TRANSACTION_COUNT.observe(count as f64); + } + ret + } + + fn execute_block_with_gas_limit( + transactions: Vec, + state_view: &(impl StateView + Sync), + maybe_gas_limit: Option, + ) -> std::result::Result, VMStatus> { + fail_point!("move_adapter::execute_block", |_| { + Err(VMStatus::Error( + StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, + None, + )) + }); + + let log_context = AdapterLogSchema::new(state_view.id(), 0); + info!( + log_context, + "Executing block, transaction count: {}", + transactions.len() + ); + + let count = transactions.len(); + let ret = BlockAptosVM::execute_block( + Arc::clone(&RAYON_EXEC_POOL), + transactions, + state_view, + Self::get_concurrency_level(), + maybe_gas_limit, ); if ret.is_ok() { // Record the histogram count for transactions per block. 
diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 03e59ba116473..358b58244fbf4 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -124,6 +124,13 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { "Could not combine TransactionOutputExt with deltas" ); } + + /// Return the amount of gas consumed by the transaction. + fn gas_used(&self) -> u64 { + self.committed_output + .get() + .map_or(0, |output| output.gas_used()) + } } pub struct BlockAptosVM(); @@ -134,6 +141,7 @@ impl BlockAptosVM { transactions: Vec, state_view: &S, concurrency_level: usize, + maybe_gas_limit: Option, ) -> Result, VMStatus> { let _timer = BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS.start_timer(); // Verify the signatures of all the transactions in parallel. @@ -151,23 +159,32 @@ impl BlockAptosVM { }); drop(signature_verification_timer); - init_speculative_logs(signature_verified_block.len()); + let num_txns = signature_verified_block.len(); + init_speculative_logs(num_txns); BLOCK_EXECUTOR_CONCURRENCY.set(concurrency_level as i64); let executor = BlockExecutor::, S>::new( concurrency_level, executor_thread_pool, + maybe_gas_limit, ); let ret = executor.execute_block(state_view, signature_verified_block, state_view); - flush_speculative_logs(); - match ret { - Ok(outputs) => Ok(outputs - .into_iter() - .map(|output| output.take_output()) - .collect()), + Ok(outputs) => { + let output_vec: Vec = outputs + .into_iter() + .map(|output| output.take_output()) + .collect(); + + // Flush the speculative logs of the committed transactions. 
+ let pos = output_vec.partition_point(|o| !o.status().is_retry()); + + flush_speculative_logs(pos); + + Ok(output_vec) + }, Err(Error::ModulePathReadWrite) => { unreachable!("[Execution]: Must be handled by sequential fallback") }, diff --git a/aptos-move/aptos-vm/src/lib.rs b/aptos-move/aptos-vm/src/lib.rs index c96723f2b5324..b598a1411bf1e 100644 --- a/aptos-move/aptos-vm/src/lib.rs +++ b/aptos-move/aptos-vm/src/lib.rs @@ -155,6 +155,14 @@ pub trait VMExecutor: Send + Sync { state_view: &(impl StateView + Sync), ) -> Result, VMStatus>; + /// Executes a block of transactions with per_block_gas_limit + /// and returns output for each one of them. + fn execute_block_with_gas_limit( + transactions: Vec, + state_view: &(impl StateView + Sync), + maybe_gas_limit: Option, + ) -> Result, VMStatus>; + /// Executes a block of transactions using a sharded block executor and returns the results. fn execute_block_sharded( sharded_block_executor: &ShardedBlockExecutor, diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/executor_shard.rs b/aptos-move/aptos-vm/src/sharded_block_executor/executor_shard.rs index 3c480bb5d77f6..eb9ae70387b72 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/executor_shard.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/executor_shard.rs @@ -18,6 +18,7 @@ pub struct ExecutorShard { executor_thread_pool: Arc, command_rx: Receiver>, result_tx: Sender, VMStatus>>, + maybe_gas_limit: Option, } impl ExecutorShard { @@ -26,6 +27,7 @@ impl ExecutorShard { num_executor_threads: usize, command_rx: Receiver>, result_tx: Sender, VMStatus>>, + maybe_gas_limit: Option, ) -> Self { let executor_thread_pool = Arc::new( rayon::ThreadPoolBuilder::new() @@ -38,6 +40,7 @@ impl ExecutorShard { executor_thread_pool, command_rx, result_tx, + maybe_gas_limit, } } @@ -60,6 +63,7 @@ impl ExecutorShard { transactions, state_view.as_ref(), concurrency_level_per_shard, + self.maybe_gas_limit, ); drop(state_view); self.result_tx.send(ret).unwrap(); 
diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs b/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs index 5b30703aa101a..5fd8d4a86a655 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs @@ -38,7 +38,11 @@ pub enum ExecutorShardCommand { } impl ShardedBlockExecutor { - pub fn new(num_executor_shards: usize, executor_threads_per_shard: Option) -> Self { + pub fn new( + num_executor_shards: usize, + executor_threads_per_shard: Option, + maybe_gas_limit: Option, + ) -> Self { assert!(num_executor_shards > 0, "num_executor_shards must be > 0"); let executor_threads_per_shard = executor_threads_per_shard.unwrap_or_else(|| { (num_cpus::get() as f64 / num_executor_shards as f64).ceil() as usize @@ -56,6 +60,7 @@ impl ShardedBlockExecutor { executor_threads_per_shard, transactions_rx, result_tx, + maybe_gas_limit, )); } info!( @@ -128,13 +133,19 @@ fn spawn_executor_shard( concurrency_level: usize, command_rx: Receiver>, result_tx: Sender, VMStatus>>, + maybe_gas_limit: Option, ) -> thread::JoinHandle<()> { // create and start a new executor shard in a separate thread thread::Builder::new() .name(format!("executor-shard-{}", shard_id)) .spawn(move || { - let executor_shard = - ExecutorShard::new(shard_id, concurrency_level, command_rx, result_tx); + let executor_shard = ExecutorShard::new( + shard_id, + concurrency_level, + command_rx, + result_tx, + maybe_gas_limit, + ); executor_shard.start(); }) .unwrap() diff --git a/aptos-move/block-executor/src/counters.rs b/aptos-move/block-executor/src/counters.rs index 397c89e047a44..1903c971920c5 100644 --- a/aptos-move/block-executor/src/counters.rs +++ b/aptos-move/block-executor/src/counters.rs @@ -24,6 +24,24 @@ pub static SPECULATIVE_ABORT_COUNT: Lazy = Lazy::new(|| { .unwrap() }); +/// Count of times the BlockSTM is early halted due to exceeding the per-block gas limit. 
+pub static PARALLEL_EXCEED_PER_BLOCK_GAS_LIMIT_COUNT: Lazy = Lazy::new(|| { + register_int_counter!( + "aptos_execution_par_gas_limit_count", + "Count of times the BlockSTM is early halted due to exceeding the per-block gas limit" + ) + .unwrap() +}); + +/// Count of times the sequential execution is early halted due to exceeding the per-block gas limit. +pub static SEQUENTIAL_EXCEED_PER_BLOCK_GAS_LIMIT_COUNT: Lazy = Lazy::new(|| { + register_int_counter!( + "aptos_execution_seq_gas_limit_count", + "Count of times the sequential execution is early halted due to exceeding the per-block gas limit" + ) + .unwrap() +}); + pub static PARALLEL_EXECUTION_SECONDS: Lazy = Lazy::new(|| { register_histogram!( // metric name @@ -109,3 +127,39 @@ pub static DEPENDENCY_WAIT_SECONDS: Lazy = Lazy::new(|| { ) .unwrap() }); + +pub static PARALLEL_PER_BLOCK_GAS: Lazy = Lazy::new(|| { + register_histogram!( + "aptos_execution_par_per_block_gas", + "The per-block consumed gas in parallel execution (Block STM)", + exponential_buckets(/*start=*/ 1.0, /*factor=*/ 2.0, /*count=*/ 30).unwrap(), + ) + .unwrap() +}); + +pub static SEQUENTIAL_PER_BLOCK_GAS: Lazy = Lazy::new(|| { + register_histogram!( + "aptos_execution_seq_per_block_gas", + "The per-block consumed gas in sequential execution", + exponential_buckets(/*start=*/ 1.0, /*factor=*/ 2.0, /*count=*/ 30).unwrap(), + ) + .unwrap() +}); + +pub static PARALLEL_PER_TXN_GAS: Lazy = Lazy::new(|| { + register_histogram!( + "aptos_execution_par_per_txn_gas", + "The per-txn consumed gas in parallel execution (Block STM)", + exponential_buckets(/*start=*/ 1.0, /*factor=*/ 1.5, /*count=*/ 30).unwrap(), + ) + .unwrap() +}); + +pub static SEQUENTIAL_PER_TXN_GAS: Lazy = Lazy::new(|| { + register_histogram!( + "aptos_execution_seq_per_txn_gas", + "The per-txn consumed gas in sequential execution", + exponential_buckets(/*start=*/ 1.0, /*factor=*/ 1.5, /*count=*/ 30).unwrap(), + ) + .unwrap() +}); diff --git 
a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 25e76d89e3404..8970d0fccb52d 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -9,7 +9,7 @@ use crate::{ TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, WORK_WITH_TASK_SECONDS, }, errors::*, - scheduler::{Scheduler, SchedulerTask, Wave}, + scheduler::{DependencyStatus, Scheduler, SchedulerTask, Wave}, task::{ExecutionStatus, ExecutorTask, Transaction, TransactionOutput}, txn_last_input_output::TxnLastInputOutput, view::{LatestView, MVHashMapView}, @@ -40,7 +40,7 @@ use std::{ #[derive(Debug)] enum CommitRole { - Coordinator(Vec>, usize), + Coordinator(Vec>), Worker(Receiver), } @@ -49,6 +49,7 @@ pub struct BlockExecutor { // threads that may be concurrently participating in parallel execution. concurrency_level: usize, executor_thread_pool: Arc, + maybe_gas_limit: Option, phantom: PhantomData<(T, E, S)>, } @@ -60,7 +61,11 @@ where { /// The caller needs to ensure that concurrency_level > 1 (0 is illegal and 1 should /// be handled by sequential execution) and that concurrency_level <= num_cpus. - pub fn new(concurrency_level: usize, executor_thread_pool: Arc) -> Self { + pub fn new( + concurrency_level: usize, + executor_thread_pool: Arc, + maybe_gas_limit: Option, + ) -> Self { assert!( concurrency_level > 0 && concurrency_level <= num_cpus::get(), "Parallel execution concurrency level {} should be between 1 and number of CPUs", @@ -69,6 +74,7 @@ where Self { concurrency_level, executor_thread_pool, + maybe_gas_limit, phantom: PhantomData, } } @@ -145,7 +151,15 @@ where versioned_cache.delete(&k, idx_to_execute); } - last_input_output.record(idx_to_execute, speculative_view.take_reads(), result); + if last_input_output + .record(idx_to_execute, speculative_view.take_reads(), result) + .is_err() + { + // When there is module publishing r/w intersection, can early halt BlockSTM to + // fallback to sequential execution. 
+ scheduler.halt(); + return SchedulerTask::NoTask; + } scheduler.finish_execution(idx_to_execute, incarnation, updates_outside) } @@ -164,7 +178,7 @@ where let (idx_to_validate, incarnation) = version_to_validate; let read_set = last_input_output .read_set(idx_to_validate) - .expect("Prior read-set must be recorded"); + .expect("[BlockSTM]: Prior read-set must be recorded"); let valid = read_set.iter().all(|r| { match versioned_cache.fetch_data(r.path(), idx_to_validate) { @@ -205,7 +219,72 @@ where } } - fn commit_hook( + fn coordinator_commit_hook( + &self, + maybe_gas_limit: Option, + scheduler: &Scheduler, + post_commit_txs: &Vec>, + worker_idx: &mut usize, + accumulated_gas: &mut u64, + scheduler_task: &mut SchedulerTask, + last_input_output: &TxnLastInputOutput, + ) { + while let Some(txn_idx) = scheduler.try_commit() { + post_commit_txs[*worker_idx] + .send(txn_idx) + .expect("Worker must be available"); + // Iterate round robin over workers to do commit_hook. + *worker_idx = (*worker_idx + 1) % post_commit_txs.len(); + + // Committed the last transaction, BlockSTM finishes execution. + if txn_idx as usize + 1 == scheduler.num_txns() as usize { + *scheduler_task = SchedulerTask::Done; + + counters::PARALLEL_PER_BLOCK_GAS.observe(*accumulated_gas as f64); + break; + } + + // For committed txns with Success status, calculate the accumulated gas. + // For committed txns with Abort or SkipRest status, early halt BlockSTM. + match last_input_output.gas_used(txn_idx) { + Some(gas) => { + *accumulated_gas += gas; + counters::PARALLEL_PER_TXN_GAS.observe(gas as f64); + }, + None => { + scheduler.halt(); + + counters::PARALLEL_PER_BLOCK_GAS.observe(*accumulated_gas as f64); + debug!("[BlockSTM]: Early halted due to Abort or SkipRest txn."); + break; + }, + }; + + if let Some(per_block_gas_limit) = maybe_gas_limit { + // When the accumulated gas of the committed txns exceeds PER_BLOCK_GAS_LIMIT, early halt BlockSTM. 
+ if *accumulated_gas >= per_block_gas_limit { + // Set the execution output status to be SkipRest, to skip the rest of the txns. + last_input_output.update_to_skip_rest(txn_idx); + scheduler.halt(); + + counters::PARALLEL_PER_BLOCK_GAS.observe(*accumulated_gas as f64); + counters::PARALLEL_EXCEED_PER_BLOCK_GAS_LIMIT_COUNT.inc(); + debug!("[BlockSTM]: Early halted due to accumulated_gas {} >= PER_BLOCK_GAS_LIMIT {}.", *accumulated_gas, per_block_gas_limit); + break; + } + } + + // Remark: When early halting the BlockSTM, we have to make sure the current / new tasks + // will be properly handled by the threads. For instance, it is possible that the committing + // thread holds an execution task from the last iteration, and then early halts the BlockSTM + // due to a txn execution abort. In this case, we cannot reset the scheduler_task of the + // committing thread (to be Done), otherwise some other pending thread waiting for the execution + // will be pending on read forever (since the halt logic let the execution task to wake up such + // pending task). + } + } + + fn worker_commit_hook( &self, txn_idx: TxnIndex, versioned_cache: &MVHashMap, @@ -263,31 +342,34 @@ where let executor = E::init(*executor_arguments); drop(init_timer); - let committing = matches!(role, CommitRole::Coordinator(_, _)); + let committing = matches!(role, CommitRole::Coordinator(_)); let _timer = WORK_WITH_TASK_SECONDS.start_timer(); let mut scheduler_task = SchedulerTask::NoTask; + let mut accumulated_gas = 0; + let mut worker_idx = 0; loop { // Only one thread does try_commit to avoid contention. match &role { - CommitRole::Coordinator(post_commit_txs, mut idx) => { - while let Some(txn_idx) = scheduler.try_commit() { - post_commit_txs[idx] - .send(txn_idx) - .expect("Worker must be available"); - // Iterate round robin over workers to do commit_hook. - idx = (idx + 1) % post_commit_txs.len(); - - if txn_idx as usize + 1 == block.len() { - // Committed the last transaction / everything. 
- scheduler_task = SchedulerTask::Done; - break; - } - } + CommitRole::Coordinator(post_commit_txs) => { + self.coordinator_commit_hook( + self.maybe_gas_limit, + scheduler, + post_commit_txs, + &mut worker_idx, + &mut accumulated_gas, + &mut scheduler_task, + last_input_output, + ); }, CommitRole::Worker(rx) => { while let Ok(txn_idx) = rx.try_recv() { - self.commit_hook(txn_idx, versioned_cache, last_input_output, base_view); + self.worker_commit_hook( + txn_idx, + versioned_cache, + last_input_output, + base_view, + ); } }, } @@ -312,7 +394,7 @@ where SchedulerTask::ExecutionTask(_, Some(condvar)) => { let (lock, cvar) = &*condvar; // Mark dependency resolved. - *lock.lock() = true; + *lock.lock() = DependencyStatus::Resolved; // Wake up the process waiting for dependency. cvar.notify_one(); @@ -324,7 +406,7 @@ where if let CommitRole::Worker(rx) = &role { // Until the sender drops the tx, an index for commit_hook might be sent. while let Ok(txn_idx) = rx.recv() { - self.commit_hook( + self.worker_commit_hook( txn_idx, versioned_cache, last_input_output, @@ -362,7 +444,7 @@ where let scheduler = Scheduler::new(num_txns); let mut roles: Vec = vec![]; - let mut senders = Vec::with_capacity(self.concurrency_level - 1); + let mut senders: Vec> = Vec::with_capacity(self.concurrency_level - 1); for _ in 0..(self.concurrency_level - 1) { let (tx, rx) = mpsc::channel(); roles.push(CommitRole::Worker(rx)); @@ -373,7 +455,7 @@ where // Note: It is important that the Coordinator is the first thread that // picks up a role will be a coordinator. Hence, if multiple parallel // executors are running concurrently, they will all have active coordinator. 
- roles.push(CommitRole::Coordinator(senders, 0)); + roles.push(CommitRole::Coordinator(senders)); let timer = RAYON_EXECUTION_SECONDS.start_timer(); self.executor_thread_pool.scope(|s| { @@ -447,6 +529,7 @@ where let mut data_map = BTreeMap::new(); let mut ret = Vec::with_capacity(num_txns); + let mut accumulated_gas = 0; for (idx, txn) in signature_verified_block.iter().enumerate() { let res = executor.execute_transaction( &LatestView::::new_btree_view(base_view, &data_map, idx as TxnIndex), @@ -456,7 +539,6 @@ where ); let must_skip = matches!(res, ExecutionStatus::SkipRest(_)); - match res { ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => { assert_eq!( @@ -468,6 +550,10 @@ where for (ap, write_op) in output.get_writes().into_iter() { data_map.insert(ap, write_op); } + // Calculating the accumulated gas of the committed txns. + let txn_gas = output.gas_used(); + accumulated_gas += txn_gas; + counters::SEQUENTIAL_PER_TXN_GAS.observe(txn_gas as f64); ret.push(output); }, ExecutionStatus::Abort(err) => { @@ -476,11 +562,24 @@ where }, } + // When the txn is a SkipRest txn, halt sequential execution. if must_skip { + debug!("[Execution]: Sequential execution early halted due to SkipRest txn."); break; } + + if let Some(per_block_gas_limit) = self.maybe_gas_limit { + // When the accumulated gas of the committed txns + // exceeds per_block_gas_limit, halt sequential execution. 
+ if accumulated_gas >= per_block_gas_limit { + counters::SEQUENTIAL_EXCEED_PER_BLOCK_GAS_LIMIT_COUNT.inc(); + debug!("[Execution]: Sequential execution early halted due to accumulated_gas {} >= PER_BLOCK_GAS_LIMIT {}.", accumulated_gas, per_block_gas_limit); + break; + } + } } + counters::SEQUENTIAL_PER_BLOCK_GAS.observe(accumulated_gas as f64); ret.resize_with(num_txns, E::Output::skip_output); Ok(ret) } diff --git a/aptos-move/block-executor/src/proptest_types/bencher.rs b/aptos-move/block-executor/src/proptest_types/bencher.rs index f0a32724b2546..abe5c9d236019 100644 --- a/aptos-move/block-executor/src/proptest_types/bencher.rs +++ b/aptos-move/block-executor/src/proptest_types/bencher.rs @@ -100,7 +100,7 @@ where .map(|txn_gen| txn_gen.materialize(&key_universe, (false, false))) .collect(); - let expected_output = ExpectedOutput::generate_baseline(&transactions, None); + let expected_output = ExpectedOutput::generate_baseline(&transactions, None, None); Self { transactions, @@ -124,7 +124,7 @@ where Transaction, ValueType>, Task, ValueType>, EmptyDataView, ValueType>, - >::new(num_cpus::get(), executor_thread_pool) + >::new(num_cpus::get(), executor_thread_pool, None) .execute_transactions_parallel((), &self.transactions, &data_view); self.expected_output.assert_output(&output); diff --git a/aptos-move/block-executor/src/proptest_types/tests.rs b/aptos-move/block-executor/src/proptest_types/tests.rs index d5194d81c3ea8..93d6cda5fc9af 100644 --- a/aptos-move/block-executor/src/proptest_types/tests.rs +++ b/aptos-move/block-executor/src/proptest_types/tests.rs @@ -19,7 +19,8 @@ use proptest::{ strategy::{Strategy, ValueTree}, test_runner::TestRunner, }; -use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; +use rand::Rng; +use std::{cmp::max, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; fn run_transactions( key_universe: &[K], @@ -28,6 +29,7 @@ fn run_transactions( skip_rest_transactions: Vec, num_repeat: usize, module_access: (bool, 
bool), + maybe_gas_limit: Option, ) where K: Hash + Clone + Debug + Eq + Send + Sync + PartialOrd + Ord + 'static, V: Clone + Eq + Send + Sync + Arbitrary + 'static, @@ -62,7 +64,11 @@ fn run_transactions( Transaction, ValueType>, Task, ValueType>, EmptyDataView, ValueType>, - >::new(num_cpus::get(), executor_thread_pool.clone()) + >::new( + num_cpus::get(), + executor_thread_pool.clone(), + maybe_gas_limit, + ) .execute_transactions_parallel((), &transactions, &data_view); if module_access.0 && module_access.1 { @@ -70,7 +76,7 @@ fn run_transactions( continue; } - let baseline = ExpectedOutput::generate_baseline(&transactions, None); + let baseline = ExpectedOutput::generate_baseline(&transactions, None, maybe_gas_limit); baseline.assert_output(&output); } } @@ -84,7 +90,7 @@ proptest! { abort_transactions in vec(any::(), 0), skip_rest_transactions in vec(any::(), 0), ) { - run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false)); + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), None); } #[test] @@ -94,7 +100,7 @@ proptest! { abort_transactions in vec(any::(), 5), skip_rest_transactions in vec(any::(), 0), ) { - run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false)); + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), None); } #[test] @@ -104,7 +110,7 @@ proptest! { abort_transactions in vec(any::(), 0), skip_rest_transactions in vec(any::(), 5), ) { - run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false)); + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), None); } #[test] @@ -114,7 +120,7 @@ proptest! 
{ abort_transactions in vec(any::(), 5), skip_rest_transactions in vec(any::(), 5), ) { - run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false)); + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), None); } #[test] @@ -124,12 +130,11 @@ proptest! { abort_transactions in vec(any::(), 3), skip_rest_transactions in vec(any::(), 3), ) { - run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false)); + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), None); } } -#[test] -fn dynamic_read_writes() { +fn dynamic_read_writes_with_gas_limit(num_txns: usize, maybe_gas_limit: Option) { let mut runner = TestRunner::default(); let universe = vec(any::<[u8; 32]>(), 100) @@ -138,7 +143,7 @@ fn dynamic_read_writes() { .current(); let transaction_gen = vec( any_with::>(TransactionGenParams::new_dynamic()), - 3000, + num_txns, ) .new_tree(&mut runner) .expect("creating a new value should succeed") @@ -151,13 +156,12 @@ fn dynamic_read_writes() { vec![], 100, (false, false), + maybe_gas_limit, ); } -#[test] -fn deltas_writes_mixed() { +fn deltas_writes_mixed_with_gas_limit(num_txns: usize, maybe_gas_limit: Option) { let mut runner = TestRunner::default(); - let num_txns = 1000; let universe = vec(any::<[u8; 32]>(), 50) .new_tree(&mut runner) @@ -193,18 +197,20 @@ fn deltas_writes_mixed() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, DeltaDataView, ValueType<[u8; 32]>>, - >::new(num_cpus::get(), executor_thread_pool.clone()) + >::new( + num_cpus::get(), + executor_thread_pool.clone(), + maybe_gas_limit, + ) .execute_transactions_parallel((), &transactions, &data_view); - let baseline = ExpectedOutput::generate_baseline(&transactions, None); + let baseline = ExpectedOutput::generate_baseline(&transactions, None, maybe_gas_limit); 
baseline.assert_output(&output); } } -#[test] -fn deltas_resolver() { +fn deltas_resolver_with_gas_limit(num_txns: usize, maybe_gas_limit: Option) { let mut runner = TestRunner::default(); - let num_txns = 1000; let universe = vec(any::<[u8; 32]>(), 50) .new_tree(&mut runner) @@ -240,7 +246,11 @@ fn deltas_resolver() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, DeltaDataView, ValueType<[u8; 32]>>, - >::new(num_cpus::get(), executor_thread_pool.clone()) + >::new( + num_cpus::get(), + executor_thread_pool.clone(), + maybe_gas_limit, + ) .execute_transactions_parallel((), &transactions, &data_view); let delta_writes = output @@ -250,13 +260,13 @@ fn deltas_resolver() { .map(|out| out.delta_writes()) .collect(); - let baseline = ExpectedOutput::generate_baseline(&transactions, Some(delta_writes)); + let baseline = + ExpectedOutput::generate_baseline(&transactions, Some(delta_writes), maybe_gas_limit); baseline.assert_output(&output); } } -#[test] -fn dynamic_read_writes_contended() { +fn dynamic_read_writes_contended_with_gas_limit(num_txns: usize, maybe_gas_limit: Option) { let mut runner = TestRunner::default(); let universe = vec(any::<[u8; 32]>(), 10) @@ -266,7 +276,7 @@ fn dynamic_read_writes_contended() { let transaction_gen = vec( any_with::>(TransactionGenParams::new_dynamic()), - 1000, + num_txns, ) .new_tree(&mut runner) .expect("creating a new value should succeed") @@ -279,11 +289,11 @@ fn dynamic_read_writes_contended() { vec![], 100, (false, false), + maybe_gas_limit, ); } -#[test] -fn module_publishing_fallback() { +fn module_publishing_fallback_with_gas_limit(num_txns: usize, maybe_gas_limit: Option) { let mut runner = TestRunner::default(); let universe = vec(any::<[u8; 32]>(), 100) @@ -292,7 +302,7 @@ fn module_publishing_fallback() { .current(); let transaction_gen = vec( any_with::>(TransactionGenParams::new_dynamic()), - 3000, + num_txns, ) .new_tree(&mut runner) .expect("creating a new value should succeed") @@ -305,6 +315,7 @@ 
fn module_publishing_fallback() { vec![], 2, (false, true), + maybe_gas_limit, ); run_transactions( &universe, @@ -313,13 +324,21 @@ fn module_publishing_fallback() { vec![], 2, (false, true), + maybe_gas_limit, + ); + run_transactions( + &universe, + transaction_gen, + vec![], + vec![], + 2, + (true, true), + maybe_gas_limit, ); - run_transactions(&universe, transaction_gen, vec![], vec![], 2, (true, true)); } -fn publishing_fixed_params() { +fn publishing_fixed_params_with_gas_limit(num_txns: usize, maybe_gas_limit: Option) { let mut runner = TestRunner::default(); - let num_txns = 300; let universe = vec(any::<[u8; 32]>(), 50) .new_tree(&mut runner) @@ -388,7 +407,7 @@ fn publishing_fixed_params() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, DeltaDataView, ValueType<[u8; 32]>>, - >::new(num_cpus::get(), executor_thread_pool) + >::new(num_cpus::get(), executor_thread_pool, maybe_gas_limit) .execute_transactions_parallel((), &transactions, &data_view); assert_ok!(output); @@ -431,18 +450,149 @@ fn publishing_fixed_params() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, DeltaDataView, ValueType<[u8; 32]>>, - >::new(num_cpus::get(), executor_thread_pool.clone()) + >::new( + num_cpus::get(), + executor_thread_pool.clone(), + Some(max(w_index, r_index) as u64 + 1), + ) // Ensure enough gas limit to commit the module txns .execute_transactions_parallel((), &transactions, &data_view); assert_eq!(output.unwrap_err(), Error::ModulePathReadWrite); } } +#[test] +fn dynamic_read_writes() { + dynamic_read_writes_with_gas_limit(3000, None); +} + +#[test] +fn deltas_writes_mixed() { + deltas_writes_mixed_with_gas_limit(1000, None); +} + +#[test] +fn deltas_resolver() { + deltas_resolver_with_gas_limit(1000, None); +} + +#[test] +fn dynamic_read_writes_contended() { + dynamic_read_writes_contended_with_gas_limit(1000, None); +} + +#[test] +fn module_publishing_fallback() { + module_publishing_fallback_with_gas_limit(3000, None); +} + #[test] 
// Test a single transaction intersection interleaves with a lot of dependencies and // not overlapping module r/w keys. fn module_publishing_races() { for _ in 0..5 { - publishing_fixed_params(); + publishing_fixed_params_with_gas_limit(300, None); + } +} + +// The following set of tests are the same tests as above with per-block gas limit. +proptest! { + #![proptest_config(ProptestConfig::with_cases(32))] + #[test] + fn no_early_termination_with_block_gas_limit( + universe in vec(any::<[u8; 32]>(), 100), + transaction_gen in vec(any::>(), 5000).no_shrink(), + abort_transactions in vec(any::(), 0), + skip_rest_transactions in vec(any::(), 0), + ) { + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), Some(rand::thread_rng().gen_range(0, 5000) as u64)); + } + + #[test] + fn abort_only_with_block_gas_limit( + universe in vec(any::<[u8; 32]>(), 100), + transaction_gen in vec(any::>(), 10).no_shrink(), + abort_transactions in vec(any::(), 5), + skip_rest_transactions in vec(any::(), 0), + ) { + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), Some(rand::thread_rng().gen_range(0, 10) as u64)); + } + + #[test] + fn skip_rest_only_with_block_gas_limit( + universe in vec(any::<[u8; 32]>(), 100), + transaction_gen in vec(any::>(), 5000).no_shrink(), + abort_transactions in vec(any::(), 0), + skip_rest_transactions in vec(any::(), 5), + ) { + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), Some(rand::thread_rng().gen_range(0, 5000) as u64)); + } + + #[test] + fn mixed_transactions_with_block_gas_limit( + universe in vec(any::<[u8; 32]>(), 100), + transaction_gen in vec(any::>(), 5000).no_shrink(), + abort_transactions in vec(any::(), 5), + skip_rest_transactions in vec(any::(), 5), + ) { + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), 
Some(rand::thread_rng().gen_range(0, 5000) as u64)); + } + + #[test] + fn dynamic_read_writes_mixed_with_block_gas_limit( + universe in vec(any::<[u8; 32]>(), 100), + transaction_gen in vec(any_with::>(TransactionGenParams::new_dynamic()), 5000).no_shrink(), + abort_transactions in vec(any::(), 3), + skip_rest_transactions in vec(any::(), 3), + ) { + run_transactions(&universe, transaction_gen, abort_transactions, skip_rest_transactions, 1, (false, false), Some(rand::thread_rng().gen_range(0, 5000) as u64)); + } +} + +#[test] +fn dynamic_read_writes_with_block_gas_limit() { + dynamic_read_writes_with_gas_limit(3000, Some(rand::thread_rng().gen_range(0, 3000) as u64)); + dynamic_read_writes_with_gas_limit(3000, Some(0)); +} + +#[test] +fn deltas_writes_mixed_with_block_gas_limit() { + deltas_writes_mixed_with_gas_limit(1000, Some(rand::thread_rng().gen_range(0, 1000) as u64)); + deltas_writes_mixed_with_gas_limit(1000, Some(0)); +} + +#[test] +fn deltas_resolver_with_block_gas_limit() { + deltas_resolver_with_gas_limit(1000, Some(rand::thread_rng().gen_range(0, 1000) as u64)); + deltas_resolver_with_gas_limit(1000, Some(0)); +} + +#[test] +fn dynamic_read_writes_contended_with_block_gas_limit() { + dynamic_read_writes_contended_with_gas_limit( + 1000, + Some(rand::thread_rng().gen_range(0, 1000) as u64), + ); + dynamic_read_writes_contended_with_gas_limit(1000, Some(0)); +} + +#[test] +fn module_publishing_fallback_with_block_gas_limit() { + module_publishing_fallback_with_gas_limit( + 3000, + // Need to execute at least 2 txns to trigger module publishing fallback + Some(rand::thread_rng().gen_range(1, 3000) as u64), + ); +} + +#[test] +// Test a single transaction intersection interleaves with a lot of dependencies and +// not overlapping module r/w keys. 
+fn module_publishing_races_with_block_gas_limit() { + for _ in 0..5 { + publishing_fixed_params_with_gas_limit( + 300, + Some(rand::thread_rng().gen_range(0, 300) as u64), + ); } } diff --git a/aptos-move/block-executor/src/proptest_types/types.rs b/aptos-move/block-executor/src/proptest_types/types.rs index 8d711751d52b2..c0c8aca22ac88 100644 --- a/aptos-move/block-executor/src/proptest_types/types.rs +++ b/aptos-move/block-executor/src/proptest_types/types.rs @@ -450,7 +450,10 @@ where let mut reads_result = vec![]; for k in reads[read_idx].iter() { // TODO: later test errors as well? (by fixing state_view behavior). - reads_result.push(view.get_state_value_bytes(k).unwrap()); + match view.get_state_value_bytes(k) { + Ok(v) => reads_result.push(v), + Err(_) => reads_result.push(None), + } } ExecutionStatus::Success(Output( writes_and_deltas[write_idx].0.clone(), @@ -479,7 +482,11 @@ where V: Send + Sync + Debug + Clone + TransactionWrite + 'static, { pub(crate) fn delta_writes(&self) -> Vec<(K, WriteOp)> { - self.3.get().cloned().expect("Delta writes must be set") + if self.3.get().is_some() { + self.3.get().cloned().expect("Delta writes must be set") + } else { + Vec::new() + } } } @@ -505,6 +512,10 @@ where fn incorporate_delta_writes(&self, delta_writes: Vec<(K, WriteOp)>) { assert_ok!(self.3.set(delta_writes)); } + + fn gas_used(&self) -> u64 { + 1 + } } /////////////////////////////////////////////////////////////////////////// @@ -517,6 +528,7 @@ pub enum ExpectedOutput { SkipRest(usize, Vec, Option)>>), Success(Vec, Option)>>), DeltaFailure(usize, Vec, Option)>>), + ExceedBlockGasLimit(usize, Vec, Option)>>), } impl ExpectedOutput { @@ -524,12 +536,14 @@ impl ExpectedOutput { pub fn generate_baseline( txns: &[Transaction], resolved_deltas: Option>>, + maybe_gas_limit: Option, ) -> Self { let mut current_world = HashMap::new(); // Delta world stores the latest u128 value of delta aggregator. 
When empty, the // value is derived based on deserializing current_world, or falling back to // STORAGE_AGGREGATOR_VAL. let mut delta_world = HashMap::new(); + let mut accumulated_gas = 0; let mut result_vec = vec![]; for (idx, txn) in txns.iter().enumerate() { @@ -622,7 +636,15 @@ impl ExpectedOutput { } } - result_vec.push(result) + result_vec.push(result); + + // In unit tests, the gas_used of any txn is set to be 1. + accumulated_gas += 1; + if let Some(block_gas_limit) = maybe_gas_limit { + if accumulated_gas >= block_gas_limit { + return Self::ExceedBlockGasLimit(idx, result_vec); + } + } }, Transaction::SkipRest => return Self::SkipRest(idx, result_vec), } @@ -678,6 +700,21 @@ impl ExpectedOutput { .skip(*skip_at) .for_each(|Output(_, _, result, _)| assert!(result.is_empty())) }, + (Self::ExceedBlockGasLimit(last_committed, expected_results), Ok(results)) => { + // Check_result asserts internally, so no need to return a bool. + results + .iter() + .take(*last_committed + 1) + .zip(expected_results.iter()) + .for_each(|(Output(_, _, result, _), expected_results)| { + Self::check_result(expected_results, result) + }); + + results + .iter() + .skip(*last_committed + 1) + .for_each(|Output(_, _, result, _)| assert!(result.is_empty())) + }, (Self::DeltaFailure(fail_idx, expected_results), Ok(results)) => { // Check_result asserts internally, so no need to return a bool. results diff --git a/aptos-move/block-executor/src/scheduler.rs b/aptos-move/block-executor/src/scheduler.rs index 8d1c63e1e2d99..4a0c12312c3a8 100644 --- a/aptos-move/block-executor/src/scheduler.rs +++ b/aptos-move/block-executor/src/scheduler.rs @@ -21,7 +21,23 @@ const TXN_IDX_MASK: u64 = (1 << 32) - 1; pub type Wave = u32; -type DependencyCondvar = Arc<(Mutex, Condvar)>; +#[derive(Debug)] +pub enum DependencyStatus { + // The dependency is not resolved yet. + Unresolved, + // The dependency is resolved. + Resolved, + // The parallel execution is halted. 
+ ExecutionHalted, +} +type DependencyCondvar = Arc<(Mutex, Condvar)>; + +// Return value of the function wait_for_dependency +pub enum DependencyResult { + Dependency(DependencyCondvar), + Resolved, + ExecutionHalted, +} /// A holder for potential task returned from the Scheduler. ExecutionTask and ValidationTask /// each contain a version of transaction that must be executed or validated, respectively. @@ -57,20 +73,25 @@ pub enum SchedulerTask { /// to 'ReadyToExecute(incarnation + 1)', allowing the scheduler to create an execution /// task for the next incarnation of the transaction. /// +/// 'ExecutionHalted' is a transaction status marking that parallel execution is halted, due to +/// reasons such as module r/w intersection or exceeding per-block gas limit. It is safe to ignore +/// this status during the transaction invariant checks, e.g., suspend(), resume(), set_executed_status(). +/// When 'resolve_condvar' is called, all txns' statuses become ExecutionHalted. +/// /// Status transition diagram: -/// Ready(i) -/// | try_incarnate (incarnate successfully) -/// | -/// ↓ suspend (waiting on dependency) resume -/// Executing(i) -----------------------------> Suspended(i) ------------> Ready(i) -/// | -/// | finish_execution -/// ↓ -/// Executed(i) (pending for (re)validations) ---------------------------> Committed(i) -/// | -/// | try_abort (abort successfully) -/// ↓ finish_abort -/// Aborting(i) ---------------------------------------------------------> Ready(i+1) +/// Ready(i) --- +/// | try_incarnate (incarnate successfully) | +/// | | +/// ↓ suspend (waiting on dependency) resume | +/// Executing(i) -----------------------------> Suspended(i) ------------> Ready(i) | +/// | | resolve_condvar +/// | finish_execution |-----------------> ExecutionHalted +/// ↓ | +/// Executed(i) (pending for (re)validations) ---------------------------> Committed(i) | +/// | | +/// | try_abort (abort successfully) | +/// ↓ finish_abort | +/// Aborting(i) 
---------------------------------------------------------> Ready(i+1) --- /// #[derive(Debug)] enum ExecutionStatus { @@ -80,6 +101,7 @@ enum ExecutionStatus { Executed(Incarnation), Committed(Incarnation), Aborting(Incarnation), + ExecutionHalted, } impl PartialEq for ExecutionStatus { @@ -232,6 +254,10 @@ impl Scheduler { } } + pub fn num_txns(&self) -> TxnIndex { + self.num_txns + } + /// If successful, returns Some(TxnIndex), the index of committed transaction. /// The current implementation has one dedicated thread to try_commit. /// Should not be called after the last transaction is committed. @@ -360,12 +386,12 @@ impl Scheduler { &self, txn_idx: TxnIndex, dep_txn_idx: TxnIndex, - ) -> Option { + ) -> DependencyResult { // Note: Could pre-check that txn dep_txn_idx isn't in an executed state, but the caller // usually has just observed the read dependency. // Create a condition variable associated with the dependency. - let dep_condvar = Arc::new((Mutex::new(false), Condvar::new())); + let dep_condvar = Arc::new((Mutex::new(DependencyStatus::Unresolved), Condvar::new())); let mut stored_deps = self.txn_dependency[dep_txn_idx as usize].lock(); @@ -378,9 +404,21 @@ impl Scheduler { // To avoid zombie dependency (and losing liveness), must return here and // not add a (stale) dependency. - return None; + // Note: acquires (a different, status) mutex, while holding (dependency) mutex. + // Only place in scheduler where a thread may hold >1 mutexes, hence, such + // acquisitions always happens in the same order (this function), may not deadlock. + return DependencyResult::Resolved; + } + + // If the execution is already halted, suspend will return false. + // The synchronization is guaranteed by the Mutex around txn_status. + // If the execution is halted, the first finishing thread will first set the status of each txn + // to be ExecutionHalted, then notify the conditional variable. 
So if a thread sees ExecutionHalted, + // it knows the execution is halted and it can return; otherwise, the finishing thread will notify + // the conditional variable later and awake the pending thread. + if !self.suspend(txn_idx, dep_condvar.clone()) { + return DependencyResult::ExecutionHalted; } - self.suspend(txn_idx, dep_condvar.clone()); // Safe to add dependency here (still holding the lock) - finish_execution of txn // dep_txn_idx is guaranteed to acquire the same lock later and clear the dependency. @@ -388,7 +426,7 @@ impl Scheduler { // Stored deps gets unlocked here. - Some(dep_condvar) + DependencyResult::Dependency(dep_condvar) } pub fn finish_validation(&self, txn_idx: TxnIndex, wave: Wave) { @@ -505,9 +543,56 @@ impl Scheduler { SchedulerTask::NoTask } + + /// This function can halt the BlockSTM early, even if there are unfinished tasks. + /// It will set the done_marker to be true, resolve all pending dependencies. + /// + /// Currently there are 4 scenarios to early halt the BlockSTM execution. + /// 1. There is a module publishing txn that has read/write intersection with any txns even during speculative execution. + /// 2. There is a txn with VM execution status Abort. + /// 3. There is a txn with VM execution status SkipRest. + /// 4. The committed txns have exceeded the PER_BLOCK_GAS_LIMIT. + /// + /// For scenarios 1 and 2, only the error will be returned as the output of the block execution. + /// For scenarios 3 and 4, the execution outputs of the committed txn prefix will be returned. + pub fn halt(&self) { + // The first thread that sets done_marker to be true will be responsible for + // resolving the conditional variables, to help other threads that may be pending + // on the read dependency. See the comment of the function resolve_condvar(). 
+ if !self.done_marker.swap(true, Ordering::SeqCst) { + for txn_idx in 0..self.num_txns { + self.resolve_condvar(txn_idx); + } + } + } + + /// When early halt the BlockSTM, some of the threads + /// may still be working on execution, and waiting for dependency (indicated by the condition variable `condvar`). + /// Therefore the commit thread needs to wake up all such pending threads, by sending notification to the condition + /// variable and setting the lock variables properly. + pub fn resolve_condvar(&self, txn_idx: TxnIndex) { + let mut status = self.txn_status[txn_idx as usize].0.write(); + { + // Only transactions with status Suspended or ReadyToExecute may have the condition variable of pending threads. + match &*status { + ExecutionStatus::Suspended(_, condvar) + | ExecutionStatus::ReadyToExecute(_, Some(condvar)) => { + let (lock, cvar) = &*(condvar.clone()); + // Mark parallel execution halted due to reasons like module r/w intersection. + *lock.lock() = DependencyStatus::ExecutionHalted; + // Wake up the process waiting for dependency. + cvar.notify_one(); + }, + _ => (), + } + // Set the all transactions' status to be ExecutionHalted. + // Then any dependency read (wait_for_dependency) will immediately return and abort the VM execution. + *status = ExecutionStatus::ExecutionHalted; + } + } } -/// Public functions of the Scheduler +/// Private functions of the Scheduler impl Scheduler { fn unpack_validation_idx(validation_idx: u64) -> (TxnIndex, Wave) { ( @@ -678,13 +763,18 @@ impl Scheduler { /// Put a transaction in a suspended state, with a condition variable that can be /// used to wake it up after the dependency is resolved. - fn suspend(&self, txn_idx: TxnIndex, dep_condvar: DependencyCondvar) { + /// Return true when the txn is successfully suspended. + /// Return false when the execution is halted. 
+ fn suspend(&self, txn_idx: TxnIndex, dep_condvar: DependencyCondvar) -> bool { let mut status = self.txn_status[txn_idx as usize].0.write(); - if let ExecutionStatus::Executing(incarnation) = *status { - *status = ExecutionStatus::Suspended(incarnation, dep_condvar); - } else { - unreachable!(); + match *status { + ExecutionStatus::Executing(incarnation) => { + *status = ExecutionStatus::Suspended(incarnation, dep_condvar); + true + }, + ExecutionStatus::ExecutionHalted => false, + _ => unreachable!(), } } @@ -694,6 +784,10 @@ impl Scheduler { fn resume(&self, txn_idx: TxnIndex) { let mut status = self.txn_status[txn_idx as usize].0.write(); + if matches!(*status, ExecutionStatus::ExecutionHalted) { + return; + } + if let ExecutionStatus::Suspended(incarnation, dep_condvar) = &*status { *status = ExecutionStatus::ReadyToExecute(*incarnation, Some(dep_condvar.clone())); } else { @@ -704,10 +798,13 @@ impl Scheduler { /// Set status of the transaction to Executed(incarnation). fn set_executed_status(&self, txn_idx: TxnIndex, incarnation: Incarnation) { let mut status = self.txn_status[txn_idx as usize].0.write(); + // The execution is already halted. + if matches!(*status, ExecutionStatus::ExecutionHalted) { + return; + } // Only makes sense when the current status is 'Executing'. debug_assert!(*status == ExecutionStatus::Executing(incarnation)); - *status = ExecutionStatus::Executed(incarnation); } @@ -715,10 +812,13 @@ impl Scheduler { /// an incremented incarnation number. fn set_aborted_status(&self, txn_idx: TxnIndex, incarnation: Incarnation) { let mut status = self.txn_status[txn_idx as usize].0.write(); + // The execution is already halted. + if matches!(*status, ExecutionStatus::ExecutionHalted) { + return; + } // Only makes sense when the current status is 'Aborting'. 
debug_assert!(*status == ExecutionStatus::Aborting(incarnation)); - *status = ExecutionStatus::ReadyToExecute(incarnation + 1, None); } diff --git a/aptos-move/block-executor/src/task.rs b/aptos-move/block-executor/src/task.rs index f7416582b2513..7589bef53ded1 100644 --- a/aptos-move/block-executor/src/task.rs +++ b/aptos-move/block-executor/src/task.rs @@ -92,4 +92,7 @@ pub trait TransactionOutput: Send + Sync + Debug { &self, delta_writes: Vec<(::Key, WriteOp)>, ); + + /// Return the amount of gas consumed by the transaction. + fn gas_used(&self) -> u64; } diff --git a/aptos-move/block-executor/src/txn_last_input_output.rs b/aptos-move/block-executor/src/txn_last_input_output.rs index 4223655469100..30fc77f57fb50 100644 --- a/aptos-move/block-executor/src/txn_last_input_output.rs +++ b/aptos-move/block-executor/src/txn_last_input_output.rs @@ -5,6 +5,8 @@ use crate::{ errors::Error, task::{ExecutionStatus, Transaction, TransactionOutput}, }; +use anyhow::anyhow; +use aptos_infallible::Mutex; use aptos_mvhashmap::types::{Incarnation, TxnIndex, Version}; use aptos_types::{access_path::AccessPath, executable::ModulePath, write_set::WriteOp}; use arc_swap::ArcSwapOption; @@ -128,6 +130,8 @@ pub struct TxnLastInputOutput { module_reads: DashSet, module_read_write_intersection: AtomicBool, + + commit_locks: Vec>, // Shared locks to prevent race during commit } impl TxnLastInputOutput { @@ -142,6 +146,7 @@ impl TxnLastInputO module_writes: DashSet::new(), module_reads: DashSet::new(), module_read_write_intersection: AtomicBool::new(false), + commit_locks: (0..num_txns).map(|_| Mutex::new(())).collect(), } } @@ -178,7 +183,7 @@ impl TxnLastInputO txn_idx: TxnIndex, input: Vec>, output: ExecutionStatus>, - ) { + ) -> anyhow::Result<()> { let read_modules: Vec = input.iter().filter_map(|desc| desc.module_path()).collect(); let written_modules: Vec = match &output { @@ -197,11 +202,16 @@ impl TxnLastInputO { self.module_read_write_intersection .store(true, 
Ordering::Release); + return Err(anyhow!( + "[BlockSTM]: Detect module r/w intersection, will fallback to sequential execution" + )); } } self.inputs[txn_idx as usize].store(Some(Arc::new(input))); self.outputs[txn_idx as usize].store(Some(Arc::new(TxnOutput::from_output_status(output)))); + + Ok(()) } pub(crate) fn module_publishing_may_race(&self) -> bool { @@ -212,6 +222,28 @@ impl TxnLastInputO self.inputs[txn_idx as usize].load_full() } + pub fn gas_used(&self, txn_idx: TxnIndex) -> Option { + match &self.outputs[txn_idx as usize] + .load_full() + .expect("[BlockSTM]: Execution output must be recorded after execution") + .output_status + { + ExecutionStatus::Success(output) => Some(output.gas_used()), + _ => None, + } + } + + pub fn update_to_skip_rest(&self, txn_idx: TxnIndex) { + let _lock = self.commit_locks[txn_idx as usize].lock(); + if let ExecutionStatus::Success(output) = self.take_output(txn_idx) { + self.outputs[txn_idx as usize].store(Some(Arc::new(TxnOutput { + output_status: ExecutionStatus::SkipRest(output), + }))); + } else { + unreachable!(); + } + } + // Extracts a set of paths written or updated during execution from transaction // output: (modified by writes, modified by deltas). 
pub(crate) fn modified_keys(&self, txn_idx: TxnIndex) -> KeySet { @@ -236,7 +268,11 @@ impl TxnLastInputO usize, Box::Txn as Transaction>::Key>>, ) { - self.outputs[txn_idx as usize].load().as_ref().map_or( + let _lock = self.commit_locks[txn_idx as usize].lock(); + let ret: ( + usize, + Box::Txn as Transaction>::Key>>, + ) = self.outputs[txn_idx as usize].load().as_ref().map_or( ( 0, Box::new(empty::<<::Txn as Transaction>::Key>()), @@ -251,7 +287,8 @@ impl TxnLastInputO Box::new(empty::<<::Txn as Transaction>::Key>()), ), }, - ) + ); + ret } // Called when a transaction is committed to record WriteOps for materialized aggregator values @@ -261,6 +298,7 @@ impl TxnLastInputO txn_idx: TxnIndex, delta_writes: Vec<(<::Txn as Transaction>::Key, WriteOp)>, ) { + let _lock = self.commit_locks[txn_idx as usize].lock(); match &self.outputs[txn_idx as usize] .load_full() .expect("Output must exist") @@ -278,10 +316,10 @@ impl TxnLastInputO pub(crate) fn take_output(&self, txn_idx: TxnIndex) -> ExecutionStatus> { let owning_ptr = self.outputs[txn_idx as usize] .swap(None) - .expect("Output must be recorded after execution"); + .expect("[BlockSTM]: Output must be recorded after execution"); Arc::try_unwrap(owning_ptr) .map(|output| output.output_status) - .expect("Output should be uniquely owned after execution") + .expect("[BlockSTM]: Output should be uniquely owned after execution") } } diff --git a/aptos-move/block-executor/src/unit_tests/mod.rs b/aptos-move/block-executor/src/unit_tests/mod.rs index b1688c14de26e..c31b6acafc1b2 100644 --- a/aptos-move/block-executor/src/unit_tests/mod.rs +++ b/aptos-move/block-executor/src/unit_tests/mod.rs @@ -5,7 +5,7 @@ use crate::{ executor::BlockExecutor, proptest_types::types::{DeltaDataView, ExpectedOutput, KeyType, Task, Transaction, ValueType}, - scheduler::{Scheduler, SchedulerTask}, + scheduler::{DependencyResult, Scheduler, SchedulerTask}, }; use aptos_aggregator::delta_change_set::{delta_add, delta_sub, DeltaOp, 
DeltaUpdate}; use aptos_mvhashmap::types::TxnIndex; @@ -40,10 +40,11 @@ where let output = BlockExecutor::, Task, DeltaDataView>::new( num_cpus::get(), executor_thread_pool, + None, ) .execute_transactions_parallel((), &transactions, &data_view); - let baseline = ExpectedOutput::generate_baseline(&transactions, None); + let baseline = ExpectedOutput::generate_baseline(&transactions, None, None); baseline.assert_output(&output); } @@ -425,11 +426,16 @@ fn scheduler_dependency() { s.next_task(false), SchedulerTask::ValidationTask((0, 0), 0) )); - // Current status of 0 is executed - hence, no dependency added. - assert!(s.wait_for_dependency(3, 0).is_none()); + assert!(matches!( + s.wait_for_dependency(3, 0), + DependencyResult::Resolved + )); // Dependency added for transaction 4 on transaction 2. - assert!(s.wait_for_dependency(4, 2).is_some()); + assert!(matches!( + s.wait_for_dependency(4, 2), + DependencyResult::Dependency(_) + )); assert!(matches!( s.finish_execution(2, 0, false), @@ -468,7 +474,6 @@ fn incarnation_one_scheduler(num_txns: TxnIndex) -> Scheduler { SchedulerTask::ExecutionTask((j, 1), None) if i == j )); } - s } @@ -477,8 +482,14 @@ fn scheduler_incarnation() { let s = incarnation_one_scheduler(5); // execution/validation index = 5, wave = 0. 
- assert!(s.wait_for_dependency(1, 0).is_some()); - assert!(s.wait_for_dependency(3, 0).is_some()); + assert!(matches!( + s.wait_for_dependency(1, 0), + DependencyResult::Dependency(_) + )); + assert!(matches!( + s.wait_for_dependency(3, 0), + DependencyResult::Dependency(_) + )); // Because validation index is higher, return validation task to caller (even with // revalidate_suffix = true) - because now we always decrease validation idx to txn_idx + 1 diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index ab5767594db1e..fb5e7e998362d 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - counters, scheduler::Scheduler, task::Transaction, txn_last_input_output::ReadDescriptor, + counters, + scheduler::{DependencyResult, DependencyStatus, Scheduler}, + task::Transaction, + txn_last_input_output::ReadDescriptor, }; use anyhow::Result; use aptos_aggregator::delta_change_set::{deserialize, serialize}; @@ -44,6 +47,8 @@ pub(crate) enum ReadResult { U128(u128), // Read could not resolve the delta (no base value). Unresolved, + // Parallel execution halts. + ExecutionHalted, // Read did not return anything. None, } @@ -120,7 +125,7 @@ impl< Err(Dependency(dep_idx)) => { // `self.txn_idx` estimated to depend on a write from `dep_idx`. match self.scheduler.wait_for_dependency(txn_idx, dep_idx) { - Some(dep_condition) => { + DependencyResult::Dependency(dep_condition) => { let _timer = counters::DEPENDENCY_WAIT_SECONDS.start_timer(); // Wait on a condition variable corresponding to the encountered // read dependency. Once the dep_idx finishes re-execution, scheduler @@ -138,11 +143,17 @@ impl< // eventually finish and lead to unblocking txn_idx, contradiction. 
let (lock, cvar) = &*dep_condition; let mut dep_resolved = lock.lock(); - while !*dep_resolved { + while let DependencyStatus::Unresolved = *dep_resolved { dep_resolved = cvar.wait(dep_resolved).unwrap(); } + if let DependencyStatus::ExecutionHalted = *dep_resolved { + return ReadResult::ExecutionHalted; + } + }, + DependencyResult::ExecutionHalted => { + return ReadResult::ExecutionHalted; }, - None => continue, + DependencyResult::Resolved => continue, } }, Err(DeltaApplicationFailure) => { @@ -253,6 +264,15 @@ impl<'a, T: Transaction, S: TStateView> TStateView for LatestView< match mv_value { ReadResult::Value(v) => Ok(v.as_state_value()), ReadResult::U128(v) => Ok(Some(StateValue::new_legacy(serialize(&v)))), + // ExecutionHalted indicates that the parallel execution is halted. + // The read should return immediately and log the error. + // For now we use STORAGE_ERROR as the VM will not log the speculative error, + // so no actual error will be logged once the execution is halted and + // the speculative logging is flushed. 
+ ReadResult::ExecutionHalted => Err(anyhow::Error::new(VMStatus::Error( + StatusCode::STORAGE_ERROR, + Some("Speculative error to halt BlockSTM early.".to_string()), + ))), ReadResult::None => self.get_base_value(state_key), ReadResult::Unresolved => unreachable!( "Must be resolved as base value is recorded in the MV data structure" diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index a18d956d57d6f..ad8547d4d8055 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -408,6 +408,7 @@ impl FakeExecutor { txn_block, &self.data_store, usize::min(4, num_cpus::get()), + None, ) } diff --git a/consensus/consensus-types/src/block.rs b/consensus/consensus-types/src/block.rs index 3f17e72581475..90b8f9148270b 100644 --- a/consensus/consensus-types/src/block.rs +++ b/consensus/consensus-types/src/block.rs @@ -346,13 +346,26 @@ impl Block { &self, validators: &[AccountAddress], txns: Vec, + block_gas_limit: Option, ) -> Vec { - once(Transaction::BlockMetadata( - self.new_block_metadata(validators), - )) - .chain(txns.into_iter().map(Transaction::UserTransaction)) - .chain(once(Transaction::StateCheckpoint(self.id))) - .collect() + if block_gas_limit.is_some() { + // After the per-block gas limit change, StateCheckpoint txn + // is inserted after block execution + once(Transaction::BlockMetadata( + self.new_block_metadata(validators), + )) + .chain(txns.into_iter().map(Transaction::UserTransaction)) + .collect() + } else { + // Before the per-block gas limit change, StateCheckpoint txn + // is inserted here for compatibility. 
+ once(Transaction::BlockMetadata( + self.new_block_metadata(validators), + )) + .chain(txns.into_iter().map(Transaction::UserTransaction)) + .chain(once(Transaction::StateCheckpoint(self.id))) + .collect() + } } fn new_block_metadata(&self, validators: &[AccountAddress]) -> BlockMetadata { diff --git a/consensus/consensus-types/src/executed_block.rs b/consensus/consensus-types/src/executed_block.rs index 334f74eff6793..6a1b45656bd57 100644 --- a/consensus/consensus-types/src/executed_block.rs +++ b/consensus/consensus-types/src/executed_block.rs @@ -108,14 +108,36 @@ impl ExecutedBlock { &self, validators: &[AccountAddress], txns: Vec, + block_gas_limit: Option, ) -> Vec { // reconfiguration suffix don't execute if self.is_reconfiguration_suffix() { return vec![]; } + + let mut txns_with_state_checkpoint = + self.block + .transactions_to_execute(validators, txns, block_gas_limit); + if block_gas_limit.is_some() && !self.state_compute_result.has_reconfiguration() { + // After the per-block gas limit change, + // insert state checkpoint at the position + // 1) after last txn if there is no Retry + // 2) before the first Retry + if let Some(pos) = self + .state_compute_result + .compute_status() + .iter() + .position(|s| s.is_retry()) + { + txns_with_state_checkpoint.insert(pos, Transaction::StateCheckpoint(self.id())); + } else { + txns_with_state_checkpoint.push(Transaction::StateCheckpoint(self.id())); + } + } + itertools::zip_eq( - self.block.transactions_to_execute(validators, txns), + txns_with_state_checkpoint, self.state_compute_result.compute_status(), ) .filter_map(|(txn, status)| match status { diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 5bbf4cb59b737..c90a20aecf711 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -680,6 +680,7 @@ impl EpochManager { let (payload_manager, quorum_store_msg_tx) = quorum_store_builder.init_payload_manager(); let transaction_shuffler = 
create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type()); + let block_gas_limit = onchain_execution_config.block_gas_limit(); self.quorum_store_msg_tx = quorum_store_msg_tx; let payload_client = QuorumStoreClient::new( @@ -692,6 +693,7 @@ impl EpochManager { &epoch_state, payload_manager.clone(), transaction_shuffler, + block_gas_limit, ); let state_computer = if onchain_consensus_config.decoupled_execution() { Arc::new(self.spawn_decoupled_execution( diff --git a/consensus/src/experimental/ordering_state_computer.rs b/consensus/src/experimental/ordering_state_computer.rs index 30f80aaab3b47..7fb5f63e4c50f 100644 --- a/consensus/src/experimental/ordering_state_computer.rs +++ b/consensus/src/experimental/ordering_state_computer.rs @@ -125,6 +125,7 @@ impl StateComputer for OrderingStateComputer { _: &EpochState, _payload_manager: Arc, _: Arc, + _: Option, ) { } diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 05f3ed25b450c..3f108ffbf253b 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -122,11 +122,16 @@ impl StateComputer for ExecutionProxy { let shuffled_txns = txn_shuffler.shuffle(txns); + let block_gas_limit = self.executor.get_block_gas_limit(); + // TODO: figure out error handling for the prologue txn let executor = self.executor.clone(); - let transactions_to_execute = - block.transactions_to_execute(&self.validators.lock(), shuffled_txns.clone()); + let transactions_to_execute = block.transactions_to_execute( + &self.validators.lock(), + shuffled_txns.clone(), + block_gas_limit, + ); let compute_result = monitor!( "execute_block", @@ -174,6 +179,8 @@ impl StateComputer for ExecutionProxy { let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone(); let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone(); + let block_gas_limit = self.executor.get_block_gas_limit(); + for block in blocks { block_ids.push(block.id()); @@ 
-184,7 +191,11 @@ impl StateComputer for ExecutionProxy { let signed_txns = payload_manager.get_transactions(block.block()).await?; let shuffled_txns = txn_shuffler.shuffle(signed_txns); - txns.extend(block.transactions_to_commit(&self.validators.lock(), shuffled_txns)); + txns.extend(block.transactions_to_commit( + &self.validators.lock(), + shuffled_txns, + block_gas_limit, + )); reconfig_events.extend(block.reconfig_event()); } @@ -277,6 +288,7 @@ impl StateComputer for ExecutionProxy { epoch_state: &EpochState, payload_manager: Arc, transaction_shuffler: Arc, + block_gas_limit: Option, ) { *self.validators.lock() = epoch_state .verifier @@ -286,6 +298,7 @@ impl StateComputer for ExecutionProxy { self.transaction_shuffler .lock() .replace(transaction_shuffler); + self.executor.update_block_gas_limit(block_gas_limit); } // Clears the epoch-specific state. Only a sync_to call is expected before calling new_epoch @@ -340,6 +353,12 @@ async fn test_commit_sync_race() { } fn finish(&self) {} + + fn get_block_gas_limit(&self) -> Option { + None + } + + fn update_block_gas_limit(&self, _block_gas_limit: Option) {} } #[async_trait::async_trait] @@ -402,6 +421,7 @@ async fn test_commit_sync_race() { &EpochState::empty(), Arc::new(PayloadManager::DirectMempool), create_transaction_shuffler(TransactionShufflerType::NoShuffling), + None, ); executor .commit(&[], generate_li(1, 1), callback.clone()) diff --git a/consensus/src/state_replication.rs b/consensus/src/state_replication.rs index ec4a3af5760b7..8af7c35fed81d 100644 --- a/consensus/src/state_replication.rs +++ b/consensus/src/state_replication.rs @@ -77,6 +77,7 @@ pub trait StateComputer: Send + Sync { epoch_state: &EpochState, payload_manager: Arc, transaction_shuffler: Arc, + block_gas_limit: Option, ); // Reconfigure to clear epoch state at end of epoch. 
diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs index cd33877825a53..eaac348fed730 100644 --- a/consensus/src/test_utils/mock_state_computer.rs +++ b/consensus/src/test_utils/mock_state_computer.rs @@ -133,7 +133,14 @@ impl StateComputer for MockStateComputer { Ok(()) } - fn new_epoch(&self, _: &EpochState, _: Arc, _: Arc) {} + fn new_epoch( + &self, + _: &EpochState, + _: Arc, + _: Arc, + _: Option, + ) { + } fn end_epoch(&self) {} } @@ -163,7 +170,14 @@ impl StateComputer for EmptyStateComputer { Ok(()) } - fn new_epoch(&self, _: &EpochState, _: Arc, _: Arc) {} + fn new_epoch( + &self, + _: &EpochState, + _: Arc, + _: Arc, + _: Option, + ) { + } fn end_epoch(&self) {} } @@ -217,7 +231,14 @@ impl StateComputer for RandomComputeResultStateComputer { Ok(()) } - fn new_epoch(&self, _: &EpochState, _: Arc, _: Arc) {} + fn new_epoch( + &self, + _: &EpochState, + _: Arc, + _: Arc, + _: Option, + ) { + } fn end_epoch(&self) {} } diff --git a/crates/aptos-speculative-state-helper/src/lib.rs b/crates/aptos-speculative-state-helper/src/lib.rs index 04c55cab679dc..a18563c97926e 100644 --- a/crates/aptos-speculative-state-helper/src/lib.rs +++ b/crates/aptos-speculative-state-helper/src/lib.rs @@ -101,10 +101,11 @@ impl SpeculativeEvents { } } - /// Flush the stored events asynchronously by spawning global rayon threads. - pub fn flush(mut self) { + /// Flush the first num_to_flush stored events asynchronously by spawning global rayon threads. 
+ pub fn flush(mut self, num_to_flush: usize) { + let to_flush = self.events.drain(..num_to_flush).collect::>(); rayon::spawn(move || { - self.events + to_flush .into_par_iter() .with_min_len(EVENT_DISPATCH_BATCH_SIZE) .for_each(|m| { diff --git a/crates/aptos-speculative-state-helper/src/tests/logging.rs b/crates/aptos-speculative-state-helper/src/tests/logging.rs index bee069ce65289..b09aac239fd5b 100644 --- a/crates/aptos-speculative-state-helper/src/tests/logging.rs +++ b/crates/aptos-speculative-state-helper/src/tests/logging.rs @@ -82,7 +82,7 @@ fn test_speculative_logging() { )); // Expected assert_err!(receiver.try_recv()); - speculative_logs.flush(); + speculative_logs.flush(2); // We expect 3 messages. let expected = vec![ diff --git a/crates/aptos-speculative-state-helper/src/tests/proptests.rs b/crates/aptos-speculative-state-helper/src/tests/proptests.rs index b431618f312f0..a07c79dde1bc3 100644 --- a/crates/aptos-speculative-state-helper/src/tests/proptests.rs +++ b/crates/aptos-speculative-state-helper/src/tests/proptests.rs @@ -254,7 +254,7 @@ fn test_events(counter_ops: Vec<(usize, usize)>) { while worker_done_cnt.load(Ordering::Relaxed) != 4 {} - spec_events.flush(); + spec_events.flush(num_counters); // Need the number of recorded events after last clear. 
let expected_dispatched_events = recorded_final_event_cnt.load(Ordering::Relaxed); while arc_event_done_cnt.load(Ordering::Relaxed) != expected_dispatched_events {} diff --git a/execution/executor-benchmark/src/benchmark_transaction.rs b/execution/executor-benchmark/src/benchmark_transaction.rs index 83a04ebfb6b57..fa803e6e16d53 100644 --- a/execution/executor-benchmark/src/benchmark_transaction.rs +++ b/execution/executor-benchmark/src/benchmark_transaction.rs @@ -82,4 +82,19 @@ impl TransactionBlockExecutor for AptosVM { state_view, ) } + + fn execute_transaction_block_with_gas_limit( + transactions: Vec, + state_view: CachedStateView, + maybe_gas_limit: Option, + ) -> Result { + AptosVM::execute_transaction_block_with_gas_limit( + transactions + .into_iter() + .map(|txn| txn.transaction) + .collect(), + state_view, + maybe_gas_limit, + ) + } } diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_executor.rs index 8848c249ba843..391e0d993bb90 100644 --- a/execution/executor-benchmark/src/native_executor.rs +++ b/execution/executor-benchmark/src/native_executor.rs @@ -439,4 +439,17 @@ impl TransactionBlockExecutor for NativeExecutor { state_cache: state_view.into_state_cache(), }) } + + // Dummy function that is not supposed to be used + fn execute_transaction_block_with_gas_limit( + _transactions: Vec, + state_view: CachedStateView, + _maybe_gas_limit: Option, + ) -> Result { + Ok(ChunkOutput { + transactions: vec![], + transaction_outputs: vec![], + state_cache: state_view.into_state_cache(), + }) + } } diff --git a/execution/executor-test-helpers/src/integration_test_impl.rs b/execution/executor-test-helpers/src/integration_test_impl.rs index b46e64ccc8acc..6fa95eb084217 100644 --- a/execution/executor-test-helpers/src/integration_test_impl.rs +++ b/execution/executor-test-helpers/src/integration_test_impl.rs @@ -160,7 +160,7 @@ pub fn test_execution_with_storage_impl() -> Arc { 
txn_factory.transfer(account3.address(), 10 * B), ))); } - let block3 = block(block3); // append state checkpoint txn + let block3 = block(block3, executor.get_block_gas_limit()); // append state checkpoint txn let output1 = executor .execute_block((block1_id, block1.clone()), parent_block_id) @@ -429,9 +429,12 @@ pub fn test_execution_with_storage_impl() -> Arc { }) .unwrap(); + // With block gas limit, StateCheckpoint txn is inserted to block after execution. + let diff = executor.get_block_gas_limit().map(|_| 0).unwrap_or(1); + let transaction_list_with_proof = db .reader - .get_transactions(14, 16, current_version, false) + .get_transactions(14, 15 + diff, current_version, false) .unwrap(); verify_transactions(&transaction_list_with_proof, &block3).unwrap(); diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index 68c8a3dc68947..c6efafc177198 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -124,6 +124,10 @@ pub trait BlockExecutorTrait: Send + Sync { /// Finishes the block executor by releasing memory held by inner data structures(SMT). 
fn finish(&self); + + fn get_block_gas_limit(&self) -> Option; + + fn update_block_gas_limit(&self, block_gas_limit: Option); } #[derive(Clone)] diff --git a/execution/executor/src/block_executor.rs b/execution/executor/src/block_executor.rs index 21dc7269225de..f595e49242604 100644 --- a/execution/executor/src/block_executor.rs +++ b/execution/executor/src/block_executor.rs @@ -16,7 +16,7 @@ use crate::{ use anyhow::Result; use aptos_crypto::HashValue; use aptos_executor_types::{BlockExecutorTrait, Error, StateComputeResult}; -use aptos_infallible::RwLock; +use aptos_infallible::{Mutex, RwLock}; use aptos_logger::prelude::*; use aptos_scratchpad::SparseMerkleTree; use aptos_state_view::StateViewId; @@ -36,6 +36,12 @@ pub trait TransactionBlockExecutor: Send + Sync { transactions: Vec, state_view: CachedStateView, ) -> Result; + + fn execute_transaction_block_with_gas_limit( + transactions: Vec, + state_view: CachedStateView, + maybe_gas_limit: Option, + ) -> Result; } impl TransactionBlockExecutor for AptosVM { @@ -45,6 +51,18 @@ impl TransactionBlockExecutor for AptosVM { ) -> Result { ChunkOutput::by_transaction_execution::(transactions, state_view) } + + fn execute_transaction_block_with_gas_limit( + transactions: Vec, + state_view: CachedStateView, + maybe_gas_limit: Option, + ) -> Result { + ChunkOutput::by_transaction_execution_with_gas_limit::( + transactions, + state_view, + maybe_gas_limit, + ) + } } pub struct BlockExecutor { @@ -85,6 +103,24 @@ where V: TransactionBlockExecutor, T: Send + Sync, { + fn get_block_gas_limit(&self) -> Option { + self.maybe_initialize().expect("Failed to initialize."); + self.inner + .read() + .as_ref() + .expect("BlockExecutor is not reset") + .get_block_gas_limit() + } + + fn update_block_gas_limit(&self, block_gas_limit: Option) { + self.maybe_initialize().expect("Failed to initialize."); + self.inner + .write() + .as_ref() + .expect("BlockExecutor is not reset") + .update_block_gas_limit(block_gas_limit); + } + fn 
committed_block_id(&self) -> HashValue { self.maybe_initialize().expect("Failed to initialize."); self.inner @@ -134,6 +170,7 @@ struct BlockExecutorInner { db: DbReaderWriter, block_tree: BlockTree, phantom: PhantomData<(V, T)>, + block_gas_limit: Mutex>, } impl BlockExecutorInner @@ -147,6 +184,7 @@ where db, block_tree, phantom: PhantomData, + block_gas_limit: Mutex::new(None), }) } @@ -166,6 +204,15 @@ where V: TransactionBlockExecutor, T: Send + Sync, { + fn get_block_gas_limit(&self) -> Option { + self.block_gas_limit.lock().as_ref().copied() + } + + fn update_block_gas_limit(&self, block_gas_limit: Option) { + let mut gas_limit = self.block_gas_limit.lock(); + *gas_limit = block_gas_limit; + } + fn committed_block_id(&self) -> HashValue { self.block_tree.root_block().id } @@ -218,6 +265,8 @@ where )? }; + let maybe_gas_limit = self.get_block_gas_limit(); + let chunk_output = { let _timer = APTOS_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.start_timer(); fail_point!("executor::vm_execute_block", |_| { @@ -225,14 +274,25 @@ where "Injected error in vm_execute_block" ))) }); - V::execute_transaction_block(transactions, state_view)? + if maybe_gas_limit.is_some() { + V::execute_transaction_block_with_gas_limit( + transactions, + state_view, + maybe_gas_limit, + )? + } else { + V::execute_transaction_block(transactions, state_view)? 
+ } }; chunk_output.trace_log_transaction_status(); let _timer = APTOS_EXECUTOR_OTHER_TIMERS_SECONDS .with_label_values(&["apply_to_ledger"]) .start_timer(); - let (output, _, _) = chunk_output.apply_to_ledger_for_block(parent_view)?; + + let (output, _, _) = chunk_output + .apply_to_ledger_for_block(parent_view, maybe_gas_limit.map(|_| block_id))?; + output }; output.ensure_ends_with_state_checkpoint()?; diff --git a/execution/executor/src/chunk_executor.rs b/execution/executor/src/chunk_executor.rs index 0a71db54b29cd..373e7b79fcb88 100644 --- a/execution/executor/src/chunk_executor.rs +++ b/execution/executor/src/chunk_executor.rs @@ -143,7 +143,7 @@ impl ChunkExecutorInner { transaction_infos: &[TransactionInfo], ) -> Result { let (mut executed_chunk, to_discard, to_retry) = - chunk_output.apply_to_ledger(latest_view)?; + chunk_output.apply_to_ledger(latest_view, None)?; ensure_no_discard(to_discard)?; ensure_no_retry(to_retry)?; executed_chunk.ledger_info = executed_chunk @@ -587,7 +587,8 @@ impl ChunkExecutorInner { let state_view = self.state_view(latest_view)?; let chunk_output = ChunkOutput::by_transaction_output(txns_and_outputs, state_view)?; - let (executed_batch, to_discard, to_retry) = chunk_output.apply_to_ledger(latest_view)?; + let (executed_batch, to_discard, to_retry) = + chunk_output.apply_to_ledger(latest_view, None)?; ensure_no_discard(to_discard)?; ensure_no_retry(to_retry)?; executed_batch.ensure_transaction_infos_match(&txn_infos)?; diff --git a/execution/executor/src/components/apply_chunk_output.rs b/execution/executor/src/components/apply_chunk_output.rs index edea740a5d331..e4808d78f30f9 100644 --- a/execution/executor/src/components/apply_chunk_output.rs +++ b/execution/executor/src/components/apply_chunk_output.rs @@ -26,11 +26,17 @@ use aptos_types::{ proof::accumulator::InMemoryAccumulator, state_store::{state_key::StateKey, state_value::StateValue, ShardedStateUpdates}, transaction::{ - Transaction, TransactionInfo, 
TransactionOutput, TransactionStatus, TransactionToCommit, + ExecutionStatus, Transaction, TransactionInfo, TransactionOutput, TransactionStatus, + TransactionToCommit, }, + write_set::WriteSet, }; use rayon::prelude::*; -use std::{collections::HashMap, iter::repeat, sync::Arc}; +use std::{ + collections::HashMap, + iter::{once, repeat}, + sync::Arc, +}; pub struct ApplyChunkOutput; @@ -38,6 +44,7 @@ impl ApplyChunkOutput { pub fn apply_block( chunk_output: ChunkOutput, base_view: &ExecutedTrees, + append_state_checkpoint_to_block: Option, ) -> Result<(ExecutedBlock, Vec, Vec)> { let ChunkOutput { state_cache, @@ -48,8 +55,15 @@ impl ApplyChunkOutput { let _timer = APTOS_EXECUTOR_OTHER_TIMERS_SECONDS .with_label_values(&["sort_transactions"]) .start_timer(); - // Separate transactions with different VM statuses. - Self::sort_transactions(transactions, transaction_outputs)? + // Separate transactions with different VM statuses, i.e., Keep, Discard and Retry. + // Will return transactions with Retry txns sorted after Keep/Discard txns. + // If the transactions contain no reconfiguration txn, will insert the StateCheckpoint txn + // at the boundary of Keep/Discard txns and Retry txns. + Self::sort_transactions_with_state_checkpoint( + transactions, + transaction_outputs, + append_state_checkpoint_to_block, + )? }; // Apply the write set, get the latest state. @@ -106,6 +120,7 @@ impl ApplyChunkOutput { pub fn apply_chunk( chunk_output: ChunkOutput, base_view: &ExecutedTrees, + append_state_checkpoint_to_block: Option, ) -> Result<(ExecutedChunk, Vec, Vec)> { let ChunkOutput { state_cache, @@ -116,8 +131,15 @@ impl ApplyChunkOutput { let _timer = APTOS_EXECUTOR_OTHER_TIMERS_SECONDS .with_label_values(&["sort_transactions"]) .start_timer(); - // Separate transactions with different VM statuses. - Self::sort_transactions(transactions, transaction_outputs)? + // Separate transactions with different VM statuses, i.e., Keep, Discard and Retry. 
+ // Will return transactions with Retry txns sorted after Keep/Discard txns. + // If the transactions contain no reconfiguration txn, will insert the StateCheckpoint txn + // at the boundary of Keep/Discard txns and Retry txns. + Self::sort_transactions_with_state_checkpoint( + transactions, + transaction_outputs, + append_state_checkpoint_to_block, + )? }; // Apply the write set, get the latest state. @@ -156,9 +178,10 @@ impl ApplyChunkOutput { )) } - fn sort_transactions( + fn sort_transactions_with_state_checkpoint( mut transactions: Vec, transaction_outputs: Vec, + append_state_checkpoint_to_block: Option, ) -> Result<( bool, Vec, @@ -166,7 +189,6 @@ impl ApplyChunkOutput { Vec, Vec, )> { - let num_txns = transactions.len(); let mut transaction_outputs: Vec = transaction_outputs.into_iter().map(Into::into).collect(); // N.B. off-by-1 intentionally, for exclusive index @@ -175,30 +197,57 @@ impl ApplyChunkOutput { .position(|o| o.is_reconfig()) .map(|idx| idx + 1); - // Transactions after the epoch ending are all to be retried. + let block_gas_limit_marker = transaction_outputs + .iter() + .position(|o| matches!(o.status(), TransactionStatus::Retry)); + + // Transactions after the epoch ending txn are all to be retried. + // Transactions after the txn that exceeded per-block gas limit are also to be retried. let to_retry = if let Some(pos) = new_epoch_marker { transaction_outputs.drain(pos..); transactions.drain(pos..).collect() + } else if let Some(pos) = block_gas_limit_marker { + transaction_outputs.drain(pos..); + transactions.drain(pos..).collect() } else { vec![] }; - // N.B. Transaction status after the epoch marker are ignored and set to Retry forcibly. 
- let status = transaction_outputs - .iter() - .map(|t| t.status()) - .cloned() - .chain(repeat(TransactionStatus::Retry)) - .take(num_txns) - .collect(); + let state_checkpoint_to_add = + new_epoch_marker.map_or_else(|| append_state_checkpoint_to_block, |_| None); + + let keeps_and_discards = transaction_outputs.iter().map(|t| t.status()).cloned(); + let retries = repeat(TransactionStatus::Retry).take(to_retry.len()); + + let status = if state_checkpoint_to_add.is_some() { + keeps_and_discards + .chain(once(TransactionStatus::Keep(ExecutionStatus::Success))) + .chain(retries) + .collect() + } else { + keeps_and_discards.chain(retries).collect() + }; // Separate transactions with the Keep status out. - let (to_keep, to_discard) = + let (mut to_keep, to_discard) = itertools::zip_eq(transactions.into_iter(), transaction_outputs.into_iter()) .partition::, _>(|(_, o)| { matches!(o.status(), TransactionStatus::Keep(_)) }); + // Append the StateCheckpoint transaction to the end of to_keep + if let Some(block_id) = state_checkpoint_to_add { + let state_checkpoint_txn = Transaction::StateCheckpoint(block_id); + let state_checkpoint_txn_output: ParsedTransactionOutput = + Into::into(TransactionOutput::new( + WriteSet::default(), + Vec::new(), + 0, + TransactionStatus::Keep(ExecutionStatus::Success), + )); + to_keep.push((state_checkpoint_txn, state_checkpoint_txn_output)); + } + // Sanity check transactions with the Discard status: let to_discard = to_discard .into_iter() diff --git a/execution/executor/src/components/chunk_output.rs b/execution/executor/src/components/chunk_output.rs index f3c9e72d3c3fd..fd6be269ec04a 100644 --- a/execution/executor/src/components/chunk_output.rs +++ b/execution/executor/src/components/chunk_output.rs @@ -6,6 +6,7 @@ use crate::{components::apply_chunk_output::ApplyChunkOutput, metrics}; use anyhow::Result; +use aptos_crypto::HashValue; use aptos_executor_types::{ExecutedBlock, ExecutedChunk}; use aptos_infallible::Mutex; use 
aptos_logger::{sample, sample::SampleRate, trace, warn}; @@ -28,6 +29,7 @@ pub static SHARDED_BLOCK_EXECUTOR: Lazy( + transactions: Vec, + state_view: CachedStateView, + maybe_gas_limit: Option, + ) -> Result { + let transaction_outputs = Self::execute_block_with_gas_limit::( + transactions.clone(), + &state_view, + maybe_gas_limit, + )?; + + // to print txn output for debugging, uncomment: + // println!("{:?}", transaction_outputs.iter().map(|t| t.status() ).collect::>()); + + update_counters_for_processed_chunk(&transactions, &transaction_outputs, "executed"); + + Ok(Self { + transactions, + transaction_outputs, + state_cache: state_view.into_state_cache(), + }) + } + pub fn by_transaction_execution_sharded( transactions: Vec, state_view: CachedStateView, @@ -110,23 +135,25 @@ impl ChunkOutput { pub fn apply_to_ledger( self, base_view: &ExecutedTrees, + append_state_checkpoint_to_block: Option, ) -> Result<(ExecutedChunk, Vec, Vec)> { fail_point!("executor::apply_to_ledger", |_| { Err(anyhow::anyhow!("Injected error in apply_to_ledger.")) }); - ApplyChunkOutput::apply_chunk(self, base_view) + ApplyChunkOutput::apply_chunk(self, base_view, append_state_checkpoint_to_block) } pub fn apply_to_ledger_for_block( self, base_view: &ExecutedTrees, + append_state_checkpoint_to_block: Option, ) -> Result<(ExecutedBlock, Vec, Vec)> { fail_point!("executor::apply_to_ledger_for_block", |_| { Err(anyhow::anyhow!( "Injected error in apply_to_ledger_for_block." )) }); - ApplyChunkOutput::apply_block(self, base_view) + ApplyChunkOutput::apply_block(self, base_view, append_state_checkpoint_to_block) } pub fn trace_log_transaction_status(&self) { @@ -163,6 +190,21 @@ impl ChunkOutput { Ok(V::execute_block(transactions, &state_view)?) } + /// Executes the block of [Transaction]s using the [VMExecutor] and returns + /// a vector of [TransactionOutput]s. 
+ #[cfg(not(feature = "consensus-only-perf-test"))] + fn execute_block_with_gas_limit( + transactions: Vec, + state_view: &CachedStateView, + maybe_gas_limit: Option, + ) -> Result> { + Ok(V::execute_block_with_gas_limit( + transactions, + &state_view, + maybe_gas_limit, + )?) + } + /// In consensus-only mode, executes the block of [Transaction]s using the /// [VMExecutor] only if its a genesis block. In all other cases, this /// method returns an [TransactionOutput] with an empty [WriteSet], constant diff --git a/execution/executor/src/db_bootstrapper.rs b/execution/executor/src/db_bootstrapper.rs index 53cf8bbbe1b99..2f8b7ef9123ec 100644 --- a/execution/executor/src/db_bootstrapper.rs +++ b/execution/executor/src/db_bootstrapper.rs @@ -138,7 +138,7 @@ pub fn calculate_genesis( let (mut output, _, _) = ChunkOutput::by_transaction_execution::(vec![genesis_txn.clone()], base_state_view)? - .apply_to_ledger(&executed_trees)?; + .apply_to_ledger(&executed_trees, None)?; ensure!( !output.to_commit.is_empty(), "Genesis txn execution failed." 
diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index 4bc08598be817..ed2f8cd55d5cc 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -55,6 +55,18 @@ impl TransactionBlockExecutor for FakeVM { ) -> Result { ChunkOutput::by_transaction_execution::(transactions, state_view) } + + fn execute_transaction_block_with_gas_limit( + transactions: Vec, + state_view: CachedStateView, + maybe_gas_limit: Option, + ) -> Result { + ChunkOutput::by_transaction_execution_with_gas_limit::( + transactions, + state_view, + maybe_gas_limit, + ) + } } impl VMExecutor for FakeVM { @@ -72,6 +84,14 @@ impl VMExecutor for FakeVM { ) -> Result, VMStatus> { Ok(Vec::new()) } + + fn execute_block_with_gas_limit( + _transactions: Vec, + _state_view: &impl StateView, + _maybe_gas_limit: Option, + ) -> Result, VMStatus> { + Ok(Vec::new()) + } } /// A fake database implementing DbReader and DbWriter diff --git a/execution/executor/src/mock_vm/mod.rs b/execution/executor/src/mock_vm/mod.rs index 4e6002aab0d52..8c541c930b089 100644 --- a/execution/executor/src/mock_vm/mod.rs +++ b/execution/executor/src/mock_vm/mod.rs @@ -64,6 +64,18 @@ impl TransactionBlockExecutor for MockVM { ) -> Result { ChunkOutput::by_transaction_execution::(transactions, state_view) } + + fn execute_transaction_block_with_gas_limit( + transactions: Vec, + state_view: CachedStateView, + maybe_gas_limit: Option, + ) -> Result { + ChunkOutput::by_transaction_execution_with_gas_limit::( + transactions, + state_view, + maybe_gas_limit, + ) + } } impl VMExecutor for MockVM { @@ -200,6 +212,14 @@ impl VMExecutor for MockVM { Ok(outputs) } + fn execute_block_with_gas_limit( + transactions: Vec, + state_view: &(impl StateView + Sync), + _maybe_gas_limit: Option, + ) -> Result, VMStatus> { + MockVM::execute_block(transactions, state_view) + } + fn execute_block_sharded( _sharded_block_executor: &ShardedBlockExecutor, _transactions: Vec, diff --git 
a/execution/executor/src/tests/chunk_executor_tests.rs b/execution/executor/src/tests/chunk_executor_tests.rs index 2875e60b8d9f9..b032dda0f21cb 100644 --- a/execution/executor/src/tests/chunk_executor_tests.rs +++ b/execution/executor/src/tests/chunk_executor_tests.rs @@ -273,9 +273,16 @@ fn test_executor_execute_and_commit_chunk_local_result_mismatch() { .map(|_| encode_mint_transaction(tests::gen_address(rng.gen::()), 100)) .collect::>(); let output = executor - .execute_block((block_id, block(txns)), parent_block_id) + .execute_block( + (block_id, block(txns, executor.get_block_gas_limit())), + parent_block_id, + ) .unwrap(); - let ledger_info = tests::gen_ledger_info(6, output.root_hash(), block_id, 1); + // With no block gas limit, StateCheckpoint txn is inserted to block before execution. + // So the ledger_info version needs to + 1 with no block gas limit. + let maybe_gas_limit = executor.get_block_gas_limit(); + let diff = maybe_gas_limit.map(|_| 0).unwrap_or(1); + let ledger_info = tests::gen_ledger_info(5 + diff, output.root_hash(), block_id, 1); executor.commit_blocks(vec![block_id], ledger_info).unwrap(); } diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index 31697183b271f..441bca23eda26 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -52,7 +52,10 @@ fn execute_and_commit_block( let id = gen_block_id(txn_index + 1); let output = executor - .execute_block((id, block(vec![txn])), parent_block_id) + .execute_block( + (id, block(vec![txn], executor.get_block_gas_limit())), + parent_block_id, + ) .unwrap(); let version = 2 * (txn_index + 1); assert_eq!(output.version(), version); @@ -77,6 +80,7 @@ impl TestExecutor { let waypoint = generate_waypoint::(&db, &genesis).unwrap(); maybe_bootstrap::(&db, &genesis, waypoint).unwrap(); let executor = BlockExecutor::new(db.clone()); + executor.update_block_gas_limit(Some(1000)); // Can comment out this line to test without gas 
limit TestExecutor { _path: path, @@ -147,7 +151,13 @@ fn test_executor_status() { let txn2 = encode_transfer_transaction(gen_address(0), gen_address(1), 500); let output = executor - .execute_block((block_id, block(vec![txn0, txn1, txn2])), parent_block_id) + .execute_block( + ( + block_id, + block(vec![txn0, txn1, txn2], executor.get_block_gas_limit()), + ), + parent_block_id, + ) .unwrap(); assert_eq!( @@ -173,7 +183,13 @@ fn test_executor_status_consensus_only() { let txn2 = encode_transfer_transaction(gen_address(0), gen_address(1), 500); let output = executor - .execute_block((block_id, block(vec![txn0, txn1, txn2])), parent_block_id) + .execute_block( + ( + block_id, + block(vec![txn0, txn1, txn2], executor.get_block_gas_limit()), + ), + parent_block_id, + ) .unwrap(); // We should not discard any transactions because we don't actually execute them. @@ -199,7 +215,10 @@ fn test_executor_one_block() { .map(|i| encode_mint_transaction(gen_address(i), 100)) .collect::>(); let output = executor - .execute_block((block_id, block(txns)), parent_block_id) + .execute_block( + (block_id, block(txns, executor.get_block_gas_limit())), + parent_block_id, + ) .unwrap(); let version = num_user_txns + 1; assert_eq!(output.version(), version); @@ -241,10 +260,22 @@ fn test_executor_two_blocks_with_failed_txns() { }) .collect::>(); let _output1 = executor - .execute_block((block1_id, block(block1_txns)), parent_block_id) + .execute_block( + ( + block1_id, + block(block1_txns, executor.get_block_gas_limit()), + ), + parent_block_id, + ) .unwrap(); let output2 = executor - .execute_block((block2_id, block(block2_txns)), block1_id) + .execute_block( + ( + block2_id, + block(block2_txns, executor.get_block_gas_limit()), + ), + block1_id, + ) .unwrap(); let ledger_info = gen_ledger_info(77, output2.root_hash(), block2_id, 1); @@ -262,7 +293,13 @@ fn test_executor_commit_twice() { .collect::>(); let block1_id = gen_block_id(1); let output1 = executor - .execute_block((block1_id, 
block(block1_txns)), parent_block_id) + .execute_block( + ( + block1_id, + block(block1_txns, executor.get_block_gas_limit()), + ), + parent_block_id, + ) .unwrap(); let ledger_info = gen_ledger_info(6, output1.root_hash(), block1_id, 1); executor @@ -288,7 +325,13 @@ fn test_executor_execute_same_block_multiple_times() { let mut responses = vec![]; for _i in 0..100 { let output = executor - .execute_block((block_id, block(txns.clone())), parent_block_id) + .execute_block( + ( + block_id, + block(txns.clone(), executor.get_block_gas_limit()), + ), + parent_block_id, + ) .unwrap(); responses.push(output); } @@ -296,6 +339,12 @@ fn test_executor_execute_same_block_multiple_times() { assert_eq!(responses.len(), 1); } +fn ledger_version_from_block_size(block_size: usize, maybe_gas_limit: Option) -> usize { + // With block gas limit, StateCheckpoint txn is inserted to block after execution. + // So the ledger_info version needs to block_size + 1 with block gas limit. + block_size + maybe_gas_limit.map(|_| 1).unwrap_or(0) +} + /// Generates a list of `TransactionListWithProof`s according to the given ranges. 
fn create_transaction_chunks( chunk_ranges: Vec>, @@ -319,13 +368,17 @@ fn create_transaction_chunks( let txn = encode_mint_transaction(gen_address(i), 100); txns.push(txn); } - txns.push(Transaction::StateCheckpoint(HashValue::random())); + if executor.get_block_gas_limit().is_none() { + txns.push(Transaction::StateCheckpoint(HashValue::random())); + } let id = gen_block_id(1); let output = executor .execute_block((id, txns.clone()), executor.committed_block_id()) .unwrap(); - let ledger_version = txns.len() as u64; + + let ledger_version = + ledger_version_from_block_size(txns.len(), executor.get_block_gas_limit()) as u64; let ledger_info = gen_ledger_info(ledger_version, output.root_hash(), id, 1); executor .commit_blocks(vec![id], ledger_info.clone()) @@ -361,7 +414,7 @@ fn test_noop_block_after_reconfiguration() { .execute_block((first_block_id, vec![first_txn]), parent_block_id) .unwrap(); parent_block_id = first_block_id; - let second_block = TestBlock::new(10, 10, gen_block_id(2)); + let second_block = TestBlock::new(10, 10, gen_block_id(2), executor.get_block_gas_limit()); let output2 = executor .execute_block((second_block.id, second_block.txns), parent_block_id) .unwrap(); @@ -432,7 +485,7 @@ fn apply_transaction_by_writeset( let chunk_output = ChunkOutput::by_transaction_output(transactions_and_outputs, state_view).unwrap(); - let (executed, _, _) = chunk_output.apply_to_ledger(&ledger_view).unwrap(); + let (executed, _, _) = chunk_output.apply_to_ledger(&ledger_view, None).unwrap(); db.writer .save_transactions( @@ -536,13 +589,12 @@ fn test_reconfig_suffix_empty_blocks() { db: _, executor, } = TestExecutor::new(); - let block_a = TestBlock::new(10000, 1, gen_block_id(1)); - let mut block_b = TestBlock::new(10000, 1, gen_block_id(2)); - let block_c = TestBlock::new(1, 1, gen_block_id(3)); - let block_d = TestBlock::new(1, 1, gen_block_id(4)); - let checkpoint_txn = block_b.txns.pop().unwrap(); + // add gas limit to be consistent with block executor 
that will add state checkpoint txn + let block_a = TestBlock::new(10000, 1, gen_block_id(1), Some(0)); + let mut block_b = TestBlock::new(10000, 1, gen_block_id(2), Some(0)); + let block_c = TestBlock::new(1, 1, gen_block_id(3), Some(0)); + let block_d = TestBlock::new(1, 1, gen_block_id(4), Some(0)); block_b.txns.push(encode_reconfiguration_transaction()); - block_b.txns.push(checkpoint_txn); let parent_block_id = executor.committed_block_id(); executor .execute_block((block_a.id, block_a.txns), parent_block_id) @@ -572,7 +624,7 @@ struct TestBlock { } impl TestBlock { - fn new(num_user_txns: u64, amount: u32, id: HashValue) -> Self { + fn new(num_user_txns: u64, amount: u32, id: HashValue, maybe_gas_limit: Option) -> Self { let txns = if num_user_txns == 0 { Vec::new() } else { @@ -580,6 +632,7 @@ impl TestBlock { (0..num_user_txns) .map(|index| encode_mint_transaction(gen_address(index), u64::from(amount))) .collect(), + maybe_gas_limit, ) }; TestBlock { txns, id } @@ -605,7 +658,7 @@ fn run_transactions_naive(transactions: Vec) -> HashValue { .unwrap(), ) .unwrap(); - let (executed, _, _) = out.apply_to_ledger(&ledger_view).unwrap(); + let (executed, _, _) = out.apply_to_ledger(&ledger_view, None).unwrap(); db.writer .save_transactions( &executed.transactions_to_commit().unwrap(), @@ -633,11 +686,12 @@ proptest! { 0..num_user_txns - 1 // avoid state checkpoint right after reconfig ) }).no_shrink()) { + let executor = TestExecutor::new(); + let block_id = gen_block_id(1); - let mut block = TestBlock::new(num_user_txns, 10, block_id); + let mut block = TestBlock::new(num_user_txns, 10, block_id, executor.get_block_gas_limit()); let num_txns = block.txns.len() as LeafCount; block.txns[reconfig_txn_index as usize] = encode_reconfiguration_transaction(); - let executor = TestExecutor::new(); let parent_block_id = executor.committed_block_id(); let output = executor.execute_block( @@ -664,14 +718,16 @@ proptest! 
{ ).unwrap(); prop_assert!(retry_output.compute_status().iter().all(|s| matches!(*s, TransactionStatus::Keep(_)))); + let ledger_version = ledger_version_from_block_size(num_txns as usize, executor.get_block_gas_limit()) as u64; + // commit - let ledger_info = gen_ledger_info(num_txns as Version, retry_output.root_hash(), retry_block_id, 12345 /* timestamp */); + let ledger_info = gen_ledger_info(ledger_version, retry_output.root_hash(), retry_block_id, 12345 /* timestamp */); executor.commit_blocks(vec![retry_block_id], ledger_info).unwrap(); // get txn_infos from db let db = executor.db.reader.clone(); - prop_assert_eq!(db.get_latest_version().unwrap(), num_txns as Version); - let txn_list = db.get_transactions(1 /* start version */, num_txns, num_txns as Version /* ledger version */, false /* fetch events */).unwrap(); + prop_assert_eq!(db.get_latest_version().unwrap(), ledger_version); + let txn_list = db.get_transactions(1 /* start version */, num_txns, ledger_version /* ledger version */, false /* fetch events */).unwrap(); prop_assert_eq!(&block.txns, &txn_list.transactions); let txn_infos = txn_list.proof.transaction_infos; let write_sets = db.get_write_set_iterator(1, num_txns).unwrap().collect::>().unwrap(); @@ -692,13 +748,16 @@ proptest! { #[test] #[cfg_attr(feature = "consensus-only-perf-test", ignore)] fn test_executor_restart(a_size in 1..30u64, b_size in 1..30u64, amount in any::()) { - let block_a = TestBlock::new(a_size, amount, gen_block_id(1)); - let block_b = TestBlock::new(b_size, amount, gen_block_id(2)); - let TestExecutor { _path, db, executor } = TestExecutor::new(); + + let block_a = TestBlock::new(a_size, amount, gen_block_id(1), executor.get_block_gas_limit()); + let block_b = TestBlock::new(b_size, amount, gen_block_id(2), executor.get_block_gas_limit()); + let mut parent_block_id; let mut root_hash; + let maybe_gas_limit = executor.get_block_gas_limit(); + // First execute and commit one block, then destroy executor. 
{ parent_block_id = executor.committed_block_id(); @@ -706,7 +765,7 @@ proptest! { (block_a.id, block_a.txns.clone()), parent_block_id ).unwrap(); root_hash = output_a.root_hash(); - let ledger_info = gen_ledger_info(block_a.txns.len() as u64, root_hash, block_a.id, 1); + let ledger_info = gen_ledger_info(ledger_version_from_block_size(block_a.txns.len(), maybe_gas_limit) as u64, root_hash, block_a.id, 1); executor.commit_blocks(vec![block_a.id], ledger_info).unwrap(); parent_block_id = block_a.id; } @@ -714,10 +773,11 @@ proptest! { // Now we construct a new executor and run one more block. { let executor = BlockExecutor::::new(db); + executor.update_block_gas_limit(maybe_gas_limit); let output_b = executor.execute_block((block_b.id, block_b.txns.clone()), parent_block_id).unwrap(); root_hash = output_b.root_hash(); let ledger_info = gen_ledger_info( - (block_a.txns.len() + block_b.txns.len()) as u64, + (ledger_version_from_block_size(block_a.txns.len(), maybe_gas_limit) + ledger_version_from_block_size(block_b.txns.len(), maybe_gas_limit)) as u64, root_hash, block_b.id, 2, @@ -728,7 +788,13 @@ proptest! { let expected_root_hash = run_transactions_naive({ let mut txns = vec![]; txns.extend(block_a.txns.iter().cloned()); + if executor.get_block_gas_limit().is_some() { + txns.push(Transaction::StateCheckpoint(block_a.id)); + } txns.extend(block_b.txns.iter().cloned()); + if executor.get_block_gas_limit().is_some() { + txns.push(Transaction::StateCheckpoint(block_b.id)); + } txns }); prop_assert_eq!(root_hash, expected_root_hash); @@ -771,7 +837,7 @@ proptest! 
{ let second_block_id = gen_block_id(2); let output2 = executor.execute_block( - (second_block_id, block(second_block_txns)), + (second_block_id, block(second_block_txns, executor.get_block_gas_limit())), first_block_id, ).unwrap(); diff --git a/execution/executor/tests/db_bootstrapper_test.rs b/execution/executor/tests/db_bootstrapper_test.rs index 4d1c8e21057c0..f1c0cf93a5542 100644 --- a/execution/executor/tests/db_bootstrapper_test.rs +++ b/execution/executor/tests/db_bootstrapper_test.rs @@ -84,10 +84,13 @@ fn execute_and_commit(txns: Vec, db: &DbReaderWriter, signer: &Vali let li = db.reader.get_latest_ledger_info().unwrap(); let version = li.ledger_info().version(); let epoch = li.ledger_info().next_block_epoch(); - let target_version = version + txns.len() as u64; + let target_version = version + txns.len() as u64 + 1; // Due to StateCheckpoint txn let executor = BlockExecutor::::new(db.clone()); let output = executor - .execute_block((block_id, txns), executor.committed_block_id()) + .execute_block( + (block_id, block(txns, executor.get_block_gas_limit())), + executor.committed_block_id(), + ) .unwrap(); assert_eq!(output.num_leaves(), target_version + 1); let ledger_info_with_sigs = @@ -207,7 +210,7 @@ fn test_new_genesis() { let txn2 = get_account_transaction(genesis_key, 1, &account2, &account2_key); let txn3 = get_aptos_coin_mint_transaction(genesis_key, 2, &account1, 200_000_000); let txn4 = get_aptos_coin_mint_transaction(genesis_key, 3, &account2, 200_000_000); - execute_and_commit(block(vec![txn1, txn2, txn3, txn4]), &db, &signer); + execute_and_commit(vec![txn1, txn2, txn3, txn4], &db, &signer); assert_eq!(get_balance(&account1, &db), 200_000_000); assert_eq!(get_balance(&account2, &db), 200_000_000); @@ -297,7 +300,7 @@ fn test_new_genesis() { println!("FINAL TRANSFER"); // Transfer some money. 
let txn = get_aptos_coin_transfer_transaction(account1, 0, &account1_key, account2, 50_000_000); - execute_and_commit(block(vec![txn]), &db, &signer); + execute_and_commit(vec![txn], &db, &signer); // And verify. assert_eq!(get_balance(&account2, &db), 250_000_000); diff --git a/types/src/on_chain_config/execution_config.rs b/types/src/on_chain_config/execution_config.rs index 13a9d923e81c9..3b031ee39f40d 100644 --- a/types/src/on_chain_config/execution_config.rs +++ b/types/src/on_chain_config/execution_config.rs @@ -9,14 +9,24 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] pub enum OnChainExecutionConfig { V1(ExecutionConfigV1), + V2(ExecutionConfigV2), } /// The public interface that exposes all values with safe fallback. impl OnChainExecutionConfig { - /// The number of recent rounds that don't count into reputations. + /// The type of the transaction shuffler being used. pub fn transaction_shuffler_type(&self) -> TransactionShufflerType { match &self { OnChainExecutionConfig::V1(config) => config.transaction_shuffler_type.clone(), + OnChainExecutionConfig::V2(config) => config.transaction_shuffler_type.clone(), + } + } + + /// The per-block gas limit being used. 
+ pub fn block_gas_limit(&self) -> Option { + match &self { + OnChainExecutionConfig::V1(_config) => None, + OnChainExecutionConfig::V2(config) => config.block_gas_limit, } } } @@ -59,6 +69,21 @@ impl Default for ExecutionConfigV1 { } } +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct ExecutionConfigV2 { + pub transaction_shuffler_type: TransactionShufflerType, + pub block_gas_limit: Option, +} + +impl Default for ExecutionConfigV2 { + fn default() -> Self { + Self { + transaction_shuffler_type: TransactionShufflerType::NoShuffling, + block_gas_limit: None, + } + } +} + #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it pub enum TransactionShufflerType { @@ -70,6 +95,7 @@ pub enum TransactionShufflerType { mod test { use super::*; use crate::on_chain_config::OnChainConfigPayload; + use rand::Rng; use std::{collections::HashMap, sync::Arc}; #[test] @@ -100,6 +126,35 @@ mod test { result.transaction_shuffler_type(), TransactionShufflerType::SenderAwareV1(32) )); + + // V2 test with random per-block gas limit + let rand_gas_limit = rand::thread_rng().gen_range(0, 1000000) as u64; + let config = OnChainExecutionConfig::V2(ExecutionConfigV2 { + transaction_shuffler_type: TransactionShufflerType::SenderAwareV1(32), + block_gas_limit: Some(rand_gas_limit), + }); + + let s = serde_yaml::to_string(&config).unwrap(); + let result = serde_yaml::from_str::(&s).unwrap(); + assert!(matches!( + result.transaction_shuffler_type(), + TransactionShufflerType::SenderAwareV1(32) + )); + assert!(result.block_gas_limit() == Some(rand_gas_limit)); + + // V2 test with no per-block gas limit + let config = OnChainExecutionConfig::V2(ExecutionConfigV2 { + transaction_shuffler_type: TransactionShufflerType::SenderAwareV1(32), + block_gas_limit: None, + }); + + let s = serde_yaml::to_string(&config).unwrap(); + let result = 
serde_yaml::from_str::(&s).unwrap(); + assert!(matches!( + result.transaction_shuffler_type(), + TransactionShufflerType::SenderAwareV1(32) + )); + assert!(matches!(result.block_gas_limit(), None)); } #[test] @@ -122,5 +177,50 @@ mod test { result.transaction_shuffler_type(), TransactionShufflerType::SenderAwareV1(32) )); + + // V2 test with random per-block gas limit + let rand_gas_limit = rand::thread_rng().gen_range(0, 1000000) as u64; + let execution_config = OnChainExecutionConfig::V2(ExecutionConfigV2 { + transaction_shuffler_type: TransactionShufflerType::SenderAwareV1(32), + block_gas_limit: Some(rand_gas_limit), + }); + + let mut configs = HashMap::new(); + configs.insert( + OnChainExecutionConfig::CONFIG_ID, + // Requires double serialization, check deserialize_into_config for more details + bcs::to_bytes(&bcs::to_bytes(&execution_config).unwrap()).unwrap(), + ); + + let payload = OnChainConfigPayload::new(1, Arc::new(configs)); + + let result: OnChainExecutionConfig = payload.get().unwrap(); + assert!(matches!( + result.transaction_shuffler_type(), + TransactionShufflerType::SenderAwareV1(32) + )); + assert!(result.block_gas_limit() == Some(rand_gas_limit)); + + // V2 test with no per-block gas limit + let execution_config = OnChainExecutionConfig::V2(ExecutionConfigV2 { + transaction_shuffler_type: TransactionShufflerType::SenderAwareV1(32), + block_gas_limit: None, + }); + + let mut configs = HashMap::new(); + configs.insert( + OnChainExecutionConfig::CONFIG_ID, + // Requires double serialization, check deserialize_into_config for more details + bcs::to_bytes(&bcs::to_bytes(&execution_config).unwrap()).unwrap(), + ); + + let payload = OnChainConfigPayload::new(1, Arc::new(configs)); + + let result: OnChainExecutionConfig = payload.get().unwrap(); + assert!(matches!( + result.transaction_shuffler_type(), + TransactionShufflerType::SenderAwareV1(32) + )); + assert!(matches!(result.block_gas_limit(), None)); } } diff --git 
a/types/src/test_helpers/transaction_test_helpers.rs b/types/src/test_helpers/transaction_test_helpers.rs index 04ef543c27133..9d4d1de16bc56 100644 --- a/types/src/test_helpers/transaction_test_helpers.rs +++ b/types/src/test_helpers/transaction_test_helpers.rs @@ -239,7 +239,9 @@ pub fn get_test_txn_with_chain_id( SignedTransaction::new(raw_txn, public_key, signature) } -pub fn block(mut user_txns: Vec) -> Vec { - user_txns.push(Transaction::StateCheckpoint(HashValue::random())); +pub fn block(mut user_txns: Vec, maybe_gas_limit: Option) -> Vec { + if maybe_gas_limit.is_none() { + user_txns.push(Transaction::StateCheckpoint(HashValue::random())); + } user_txns } diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index 6e617a5499540..2d3563cae2fda 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -818,6 +818,14 @@ impl TransactionStatus { } } + pub fn is_retry(&self) -> bool { + match self { + TransactionStatus::Discard(_) => false, + TransactionStatus::Keep(_) => false, + TransactionStatus::Retry => true, + } + } + pub fn as_kept_status(&self) -> Result { match self { TransactionStatus::Keep(s) => Ok(s.clone()),