diff --git a/Cargo.lock b/Cargo.lock index fb51fb9c3bf92f..00e67a078dc2e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -678,6 +678,7 @@ dependencies = [ "aptos-aggregator", "aptos-drop-helper", "aptos-infallible", + "aptos-language-e2e-tests", "aptos-logger", "aptos-metrics-core", "aptos-mvhashmap", @@ -1646,7 +1647,6 @@ dependencies = [ name = "aptos-experimental-ptx-executor" version = "0.1.0" dependencies = [ - "aptos-crypto", "aptos-experimental-runtimes", "aptos-infallible", "aptos-logger", diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 9e9f93c62adf1d..636ca6bf104168 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -3,14 +3,15 @@ use anyhow::{bail, format_err, Result}; use aptos_block_executor::{ - code_cache_global_manager::ModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, + code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, }; use aptos_gas_profiling::{GasProfiler, TransactionGasLog}; use aptos_rest_client::Client; use aptos_types::{ account_address::AccountAddress, - block_executor::config::{ - BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + block_executor::{ + config::{BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig}, + execution_state::TransactionSliceMetadata, }, contract_event::ContractEvent, state_store::TStateView, @@ -433,13 +434,12 @@ fn execute_block_no_limit( BlockAptosVM::execute_block::<_, NoOpTransactionCommitHook>( sig_verified_txns, state_view, - &ModuleCacheManager::new(), + &AptosModuleCacheManager::new(), BlockExecutorConfig { local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), }, - None, - None, + TransactionSliceMetadata::unknown(), None, ) .map(BlockOutput::into_transaction_outputs_forced) diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index 03355bab525f6b..6c95e1a213b3c3 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -4,7 +4,7 @@ use crate::transactions; use aptos_bitvec::BitVec; use aptos_block_executor::{ - code_cache_global_manager::ModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, + code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, }; use aptos_block_partitioner::{ v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig, @@ -18,6 +18,7 @@ use aptos_language_e2e_tests::{ use aptos_types::{ block_executor::{ config::{BlockExecutorConfig, BlockExecutorConfigFromOnchain}, + execution_state::TransactionSliceMetadata, partitioner::PartitionedTransactions, }, block_metadata::BlockMetadata, @@ -220,10 +221,9 @@ where >( transactions, self.state_view.as_ref(), - &ModuleCacheManager::new(), + &AptosModuleCacheManager::new(), BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), - None, - None, + TransactionSliceMetadata::unknown(), None, ) .expect("VM should not fail to start") @@ -271,13 +271,12 @@ where >( transactions, self.state_view.as_ref(), - &ModuleCacheManager::new(), + &AptosModuleCacheManager::new(), BlockExecutorConfig::new_maybe_block_limit( concurrency_level_per_shard, maybe_block_gas_limit, ), - None, - None, + TransactionSliceMetadata::unknown(), None, ) .expect("VM should not fail to start") diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index d50b5c53710470..9a671addab1a00 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -28,7 +28,7 @@ use crate::{ }; use anyhow::anyhow; use aptos_block_executor::{ - code_cache_global_manager::ModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, + code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, }; use aptos_crypto::HashValue; use aptos_framework::{ @@ -49,6 +49,7 @@ use aptos_types::{ BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, BlockExecutorModuleCacheLocalConfig, }, + execution_state::TransactionSliceMetadata, partitioner::PartitionedTransactions, }, block_metadata::BlockMetadata, @@ -70,7 +71,6 @@ use aptos_types::{ TransactionAuxiliaryData, TransactionOutput, TransactionPayload, TransactionStatus, VMValidatorResult, ViewFunctionOutput, WriteSetPayload, }, - vm::modules::AptosModuleExtension, vm_status::{AbortLocation, StatusCode, VMStatus}, }; use aptos_utils::aptos_try; @@ -117,7 +117,7 @@ use move_vm_metrics::{Timer, VM_TIMER}; use move_vm_runtime::{ logging::expect_no_verification_errors, module_traversal::{TraversalContext, TraversalStorage}, - Module, RuntimeEnvironment, WithRuntimeEnvironment, + RuntimeEnvironment, WithRuntimeEnvironment, }; use move_vm_types::gas::{GasMeter, UnmeteredGasMeter}; use num_cpus; @@ -2782,14 +2782,13 @@ impl AptosVM { pub struct AptosVMBlockExecutor { /// Manages module cache and execution environment of this block executor. Users of executor /// must use manager's API to ensure the correct state of caches. - module_cache_manager: - ModuleCacheManager, + module_cache_manager: AptosModuleCacheManager, } impl VMBlockExecutor for AptosVMBlockExecutor { fn new() -> Self { Self { - module_cache_manager: ModuleCacheManager::new(), + module_cache_manager: AptosModuleCacheManager::new(), } } @@ -2798,8 +2797,7 @@ impl VMBlockExecutor for AptosVMBlockExecutor { transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), onchain_config: BlockExecutorConfigFromOnchain, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { fail_point!("move_adapter::execute_block", |_| { Err(VMStatus::error( @@ -2831,8 +2829,7 @@ impl VMBlockExecutor for AptosVMBlockExecutor { }, onchain: onchain_config, }, - parent_block, - current_block, + transaction_slice_metadata, None, ); if ret.is_ok() { diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index c71c363e936f1b..5387978387a486 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -12,16 +12,13 @@ use aptos_aggregator::{ delayed_change::DelayedChange, delta_change_set::DeltaOp, resolver::TAggregatorV1View, }; use aptos_block_executor::{ - code_cache_global::GlobalModuleCache, code_cache_global_manager::ModuleCacheManager, - errors::BlockExecutionError, executor::BlockExecutor, - task::TransactionOutput as BlockExecutorTransactionOutput, + code_cache_global_manager::AptosModuleCacheManager, errors::BlockExecutionError, + executor::BlockExecutor, task::TransactionOutput as BlockExecutorTransactionOutput, txn_commit_hook::TransactionCommitHook, types::InputOutputKey, }; -use aptos_crypto::HashValue; use aptos_infallible::Mutex; -use aptos_logger::error; use aptos_types::{ - block_executor::config::BlockExecutorConfig, + block_executor::{config::BlockExecutorConfig, execution_state::TransactionSliceMetadata}, contract_event::ContractEvent, error::PanicError, executable::ExecutableTestType, @@ -31,29 +28,18 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, TransactionOutput, TransactionStatus, }, - vm::modules::AptosModuleExtension, write_set::WriteOp, }; -use aptos_vm_environment::environment::AptosEnvironment; -use aptos_vm_logging::{ - alert, flush_speculative_logs, init_speculative_logs, prelude::CRITICAL_ERRORS, -}; +use aptos_vm_logging::{flush_speculative_logs, init_speculative_logs}; use aptos_vm_types::{ - abstract_write_op::AbstractResourceWriteOp, - module_and_script_storage::{AptosCodeStorageAdapter, AsAptosCodeStorage}, - module_write_set::ModuleWrite, - output::VMOutput, + abstract_write_op::AbstractResourceWriteOp, module_write_set::ModuleWrite, output::VMOutput, resolver::ResourceGroupSize, }; -use move_binary_format::{errors::VMError, CompiledModule}; use move_core_types::{ - account_address::AccountAddress, - ident_str, - language_storage::{ModuleId, StructTag}, + language_storage::StructTag, value::MoveTypeLayout, vm_status::{StatusCode, VMStatus}, }; -use move_vm_runtime::{Module, ModuleStorage}; use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID; use once_cell::sync::{Lazy, OnceCell}; use std::{ @@ -409,16 +395,9 @@ impl BlockAptosVM { executor_thread_pool: Arc, signature_verified_block: &[SignatureVerifiedTransaction], state_view: &S, - module_cache_manager: &ModuleCacheManager< - HashValue, - ModuleId, - CompiledModule, - Module, - AptosModuleExtension, - >, + module_cache_manager: &AptosModuleCacheManager, config: BlockExecutorConfig, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, transaction_commit_listener: Option, ) -> Result, VMStatus> { let _timer = BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS.start_timer(); @@ -432,36 +411,11 @@ impl BlockAptosVM { BLOCK_EXECUTOR_CONCURRENCY.set(config.local.concurrency_level as i64); - let environment = - AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view); - let is_loader_v2_enabled = environment.features().is_loader_v2_enabled(); - - let (environment, module_cache) = if is_loader_v2_enabled { - if !module_cache_manager.mark_ready(parent_block, current_block) { - return Err(VMStatus::error( - StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, - Some("Unable to mark module caches for block execution as ready".to_string()), - )); - } - module_cache_manager - .check_ready_and_get_caches(environment, &config.local.module_cache_config)? - } else { - (environment, Arc::new(GlobalModuleCache::empty())) - }; - - // Finally, to avoid cold starts, fetch the framework code prior to block execution. This - // ensures the state with 0 modules cached is not possible for block execution (as long as - // the config enables the framework prefetch). - if is_loader_v2_enabled - && module_cache.num_modules() == 0 - && config.local.module_cache_config.prefetch_framework_code - { - let code_storage = state_view.as_aptos_code_storage(environment.clone()); - prefetch_aptos_framework(code_storage, &module_cache).map_err(|err| { - alert!("Failed to load Aptos framework to module cache: {:?}", err); - VMError::from(err).into_vm_status() - })?; - } + let mut module_cache_manager_guard = module_cache_manager.try_lock( + &state_view, + &config.local.module_cache_config, + transaction_slice_metadata, + )?; let executor = BlockExecutor::< SignatureVerifiedTransaction, @@ -469,27 +423,13 @@ impl BlockAptosVM { S, L, ExecutableTestType, - >::new( - config, - executor_thread_pool, - module_cache, - transaction_commit_listener, - ); - - if is_loader_v2_enabled && !module_cache_manager.mark_executing() { - return Err(VMStatus::error( - StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, - Some("Unable to mark block execution start".to_string()), - )); - } - let ret = executor.execute_block(environment, signature_verified_block, state_view); - if is_loader_v2_enabled && !module_cache_manager.mark_done() { - return Err(VMStatus::error( - StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, - Some("Unable to mark block execution as done".to_string()), - )); - } + >::new(config, executor_thread_pool, transaction_commit_listener); + let ret = executor.execute_block( + signature_verified_block, + state_view, + &mut module_cache_manager_guard, + ); match ret { Ok(block_output) => { let (transaction_outputs, block_end_info) = block_output.into_inner(); @@ -527,16 +467,9 @@ impl BlockAptosVM { >( signature_verified_block: &[SignatureVerifiedTransaction], state_view: &S, - module_cache_manager: &ModuleCacheManager< - HashValue, - ModuleId, - CompiledModule, - Module, - AptosModuleExtension, - >, + module_cache_manager: &AptosModuleCacheManager, config: BlockExecutorConfig, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, transaction_commit_listener: Option, ) -> Result, VMStatus> { Self::execute_block_on_thread_pool::( @@ -545,72 +478,8 @@ impl BlockAptosVM { state_view, module_cache_manager, config, - parent_block, - current_block, + transaction_slice_metadata, transaction_commit_listener, ) } } - -/// If Aptos framework exists, loads "transaction_validation.move" and all its transitive -/// dependencies from storage into provided module cache. If loading fails for any reason, a panic -/// error is returned. -fn prefetch_aptos_framework( - code_storage: AptosCodeStorageAdapter, - module_cache: &GlobalModuleCache, -) -> Result<(), PanicError> { - // If framework code exists in storage, the transitive closure will be verified and cached. - let maybe_loaded = code_storage - .fetch_verified_module(&AccountAddress::ONE, ident_str!("transaction_validation")) - .map_err(|err| { - // There should be no errors when pre-fetching the framework, if there are, we - // better return an error here. - PanicError::CodeInvariantError(format!("Unable to fetch Aptos framework: {:?}", err)) - })?; - - if maybe_loaded.is_some() { - // Framework must have been loaded. Drain verified modules from local cache into - // global cache. - let verified_module_code_iter = code_storage.into_verified_module_code_iter()?; - module_cache.insert_verified_unsync(verified_module_code_iter)?; - } - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - use aptos_language_e2e_tests::{data_store::FakeDataStore, executor::FakeExecutor}; - - #[test] - fn test_prefetch_existing_aptos_framework() { - let executor = FakeExecutor::from_head_genesis(); - let state_view = executor.get_state_view(); - - let environment = AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view); - let code_storage = state_view.as_aptos_code_storage(environment); - - let module_cache = GlobalModuleCache::empty(); - assert_eq!(module_cache.num_modules(), 0); - - let result = prefetch_aptos_framework(code_storage, &module_cache); - assert!(result.is_ok()); - assert!(module_cache.num_modules() > 0); - } - - #[test] - fn test_prefetch_non_existing_aptos_framework() { - let state_view = FakeDataStore::default(); - - let environment = - AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view); - let code_storage = state_view.as_aptos_code_storage(environment); - - let module_cache = GlobalModuleCache::empty(); - assert_eq!(module_cache.num_modules(), 0); - - let result = prefetch_aptos_framework(code_storage, &module_cache); - assert!(result.is_ok()); - assert_eq!(module_cache.num_modules(), 0); - } -} diff --git a/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs index 306e847eae6c7f..bbd37221be7355 100644 --- a/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs +++ b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs @@ -27,13 +27,12 @@ pub(crate) struct AptosExecutorTask { } impl ExecutorTask for AptosExecutorTask { - type Environment = AptosEnvironment; type Error = VMStatus; type Output = AptosTransactionOutput; type Txn = SignatureVerifiedTransaction; - fn init(env: Self::Environment, state_view: &impl StateView) -> Self { - let vm = AptosVM::new(env, state_view); + fn init(environment: AptosEnvironment, state_view: &impl StateView) -> Self { + let vm = AptosVM::new(environment, state_view); let id = state_view.id(); Self { vm, id } } diff --git a/aptos-move/aptos-vm/src/lib.rs b/aptos-move/aptos-vm/src/lib.rs index 3f21e37f8e2b17..5fa59716739584 100644 --- a/aptos-move/aptos-vm/src/lib.rs +++ b/aptos-move/aptos-vm/src/lib.rs @@ -126,10 +126,10 @@ pub mod verifier; pub use crate::aptos_vm::{AptosSimulationVM, AptosVM}; use crate::sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}; -use aptos_crypto::HashValue; use aptos_types::{ block_executor::{ - config::BlockExecutorConfigFromOnchain, partitioner::PartitionedTransactions, + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + partitioner::PartitionedTransactions, }, state_store::StateView, transaction::{ @@ -171,8 +171,7 @@ pub trait VMBlockExecutor: Send + Sync { transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), onchain_config: BlockExecutorConfigFromOnchain, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus>; /// Executes a block of transactions and returns output for each one of them, without applying @@ -186,9 +185,8 @@ pub trait VMBlockExecutor: Send + Sync { transactions, state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), - // For all use cases, we run on an unknown state. Hence, defaulting to None here. - None, - None, + // For all use cases, we run on an unknown state. + TransactionSliceMetadata::unknown(), ) .map(BlockOutput::into_transaction_outputs_forced) } diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index 4c52ba947339fc..7ec39f1d4bf61f 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -16,11 +16,12 @@ use crate::{ ExecutorShardCommand, }, }; -use aptos_block_executor::code_cache_global_manager::ModuleCacheManager; +use aptos_block_executor::code_cache_global_manager::AptosModuleCacheManager; use aptos_logger::{info, trace}; use aptos_types::{ block_executor::{ config::{BlockExecutorConfig, BlockExecutorLocalConfig}, + execution_state::TransactionSliceMetadata, partitioner::{ShardId, SubBlock, SubBlocksForShard, TransactionWithDependencies}, }, state_store::StateView, @@ -142,10 +143,9 @@ impl ShardedExecutorService { aggr_overridden_state_view.as_ref(), // Since we execute blocks in parallel, we cannot share module caches, so each // thread has its own caches. - &ModuleCacheManager::new(), + &AptosModuleCacheManager::new(), config, - None, - None, + TransactionSliceMetadata::unknown(), cross_shard_commit_sender, ) .map(BlockOutput::into_transaction_outputs_forced); diff --git a/aptos-move/block-executor/Cargo.toml b/aptos-move/block-executor/Cargo.toml index a1aefe7441e450..61ebd93b952ab3 100644 --- a/aptos-move/block-executor/Cargo.toml +++ b/aptos-move/block-executor/Cargo.toml @@ -51,6 +51,7 @@ scopeguard = { workspace = true } [dev-dependencies] aptos-aggregator = { workspace = true, features = ["testing"] } +aptos-language-e2e-tests = { workspace = true } aptos-types = { workspace = true, features = ["testing"] } criterion = { workspace = true } fail = { workspace = true, features = ["failpoints"] } diff --git a/aptos-move/block-executor/src/captured_reads.rs b/aptos-move/block-executor/src/captured_reads.rs index 834becdcd4c948..516b0855c39c42 100644 --- a/aptos-move/block-executor/src/captured_reads.rs +++ b/aptos-move/block-executor/src/captured_reads.rs @@ -1560,7 +1560,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - let global_module_cache = GlobalModuleCache::empty(); + let mut global_module_cache = GlobalModuleCache::empty(); let per_block_module_cache = SyncModuleCache::empty(); global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8))); @@ -1697,7 +1697,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - let global_module_cache = GlobalModuleCache::empty(); + let mut global_module_cache = GlobalModuleCache::empty(); let per_block_module_cache = SyncModuleCache::empty(); // Module exists in global cache. diff --git a/aptos-move/block-executor/src/code_cache.rs b/aptos-move/block-executor/src/code_cache.rs index 8d875dca1ce2d8..d9b236346dfe28 100644 --- a/aptos-move/block-executor/src/code_cache.rs +++ b/aptos-move/block-executor/src/code_cache.rs @@ -140,7 +140,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> ModuleCache > { // First, look up the module in the cross-block global module cache. Record the read for // later validation in case the read module is republished. - if let Some(module) = self.global_module_cache.get(key) { + if let Some(module) = self.global_module_cache.get_valid(key) { match &self.latest_view { ViewState::Sync(state) => state .captured_reads diff --git a/aptos-move/block-executor/src/code_cache_global.rs b/aptos-move/block-executor/src/code_cache_global.rs index 076dca358be7f3..a74e96170c95c4 100644 --- a/aptos-move/block-executor/src/code_cache_global.rs +++ b/aptos-move/block-executor/src/code_cache_global.rs @@ -1,7 +1,6 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::explicit_sync_wrapper::ExplicitSyncWrapper; use aptos_types::error::PanicError; use hashbrown::HashMap; use move_vm_types::code::{ModuleCode, WithSize}; @@ -9,7 +8,7 @@ use std::{ hash::Hash, ops::Deref, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, }; @@ -63,9 +62,9 @@ where /// block execution. Modified safely only at block boundaries. pub struct GlobalModuleCache { /// Module cache containing the verified code. - module_cache: ExplicitSyncWrapper>>, + module_cache: HashMap>, /// Sum of serialized sizes (in bytes) of all cached modules. - size: AtomicUsize, + size: usize, } impl GlobalModuleCache @@ -77,15 +76,14 @@ where /// Returns new empty module cache. pub fn empty() -> Self { Self { - module_cache: ExplicitSyncWrapper::new(HashMap::new()), - size: AtomicUsize::new(0), + module_cache: HashMap::new(), + size: 0, } } /// Returns true if the key exists in cache and the corresponding module is valid. pub fn contains_valid(&self, key: &K) -> bool { self.module_cache - .acquire() .get(key) .is_some_and(|entry| entry.is_valid()) } @@ -94,57 +92,48 @@ where /// cache for the associated key will result in a cache miss. If an entry does not to exist, it /// is a no-op. pub fn mark_invalid_if_contains(&self, key: &K) { - if let Some(entry) = self.module_cache.acquire().get(key) { + if let Some(entry) = self.module_cache.get(key) { entry.mark_invalid(); } } /// Returns the module stored in cache. If the module has not been cached, or it exists but is /// not valid, [None] is returned. - pub fn get(&self, key: &K) -> Option>> { + pub fn get_valid(&self, key: &K) -> Option>> { self.module_cache - .acquire() .get(key) .and_then(|entry| entry.is_valid().then(|| Arc::clone(entry.module_code()))) } /// Returns the number of entries in the cache. pub fn num_modules(&self) -> usize { - self.module_cache.acquire().len() + self.module_cache.len() } /// Returns the sum of serialized sizes of modules stored in cache. pub fn size_in_bytes(&self) -> usize { - self.size.load(Ordering::Relaxed) + self.size } - /// **Use with caution: should never be called during block execution.** - /// /// Flushes the module cache. - pub fn flush_unsync(&self) { - self.module_cache.acquire().clear(); - self.size.store(0, Ordering::Relaxed); + pub fn flush(&mut self) { + self.module_cache.clear(); + self.size = 0; } - /// **Use with caution: should never be called during block execution.** - /// /// Inserts modules into the cache. /// Notes: /// 1. Only verified modules are inserted. /// 2. Valid modules should not be removed, and new modules should have unique ownership. If /// these constraints are violated, a panic error is returned. - // TODO(loader_v2): Use a trait for sync methods, and a concrete implementation for unsync. pub fn insert_verified_unsync( - &self, + &mut self, modules: impl Iterator>)>, ) -> Result<(), PanicError> { use hashbrown::hash_map::Entry::*; - let mut guard = self.module_cache.acquire(); - let module_cache = guard.dereference_mut(); - for (key, module) in modules { - if let Occupied(entry) = module_cache.entry(key.clone()) { + if let Occupied(entry) = self.module_cache.entry(key.clone()) { if entry.get().is_valid() { return Err(PanicError::CodeInvariantError( "Should never overwrite a valid module".to_string(), @@ -152,17 +141,16 @@ where } else { // Otherwise, remove the invalid entry. let size = entry.get().module_code().extension().size_in_bytes(); - self.size.fetch_sub(size, Ordering::Relaxed); + self.size -= size; entry.remove(); } } if module.code().is_verified() { - self.size - .fetch_add(module.extension().size_in_bytes(), Ordering::Relaxed); + self.size += module.extension().size_in_bytes(); let entry = Entry::new(module).expect("Module has been checked and must be verified"); - let prev = module_cache.insert(key.clone(), entry); + let prev = self.module_cache.insert(key.clone(), entry); // At this point, we must have removed the entry, or returned a panic error. assert!(prev.is_none()) @@ -173,10 +161,9 @@ where /// Insert the module to cache. Used for tests only. #[cfg(any(test, feature = "testing"))] - pub fn insert(&self, key: K, module: Arc>) { - self.size - .fetch_add(module.extension().size_in_bytes(), Ordering::Relaxed); - self.module_cache.acquire().insert( + pub fn insert(&mut self, key: K, module: Arc>) { + self.size += module.extension().size_in_bytes(); + self.module_cache.insert( key, Entry::new(module).expect("Module code should be verified"), ); @@ -185,12 +172,9 @@ where /// Removes the module from cache and returns true. If the module does not exist for the /// associated key, returns false. Used for tests only. #[cfg(any(test, feature = "testing"))] - pub fn remove(&self, key: &K) -> bool { - if let Some(entry) = self.module_cache.acquire().remove(key) { - self.size.fetch_sub( - entry.module_code().extension().size_in_bytes(), - Ordering::Relaxed, - ); + pub fn remove(&mut self, key: &K) -> bool { + if let Some(entry) = self.module_cache.remove(key) { + self.size -= entry.module_code().extension().size_in_bytes(); true } else { false @@ -221,7 +205,7 @@ mod test { #[test] fn test_cache_contains_valid_and_get() { - let cache = GlobalModuleCache::empty(); + let mut cache = GlobalModuleCache::empty(); // Set the state. cache.insert(0, mock_verified_code(0, MockExtension::new(8))); @@ -234,14 +218,14 @@ mod test { assert!(!cache.contains_valid(&1)); assert!(!cache.contains_valid(&3)); - assert!(cache.get(&0).is_some()); - assert!(cache.get(&1).is_none()); - assert!(cache.get(&3).is_none()); + assert!(cache.get_valid(&0).is_some()); + assert!(cache.get_valid(&1).is_none()); + assert!(cache.get_valid(&3).is_none()); } #[test] fn test_cache_sizes_and_flush_unchecked() { - let cache = GlobalModuleCache::empty(); + let mut cache = GlobalModuleCache::empty(); assert_eq!(cache.num_modules(), 0); assert_eq!(cache.size_in_bytes(), 0); @@ -255,14 +239,14 @@ mod test { assert_eq!(cache.num_modules(), 2); assert_eq!(cache.size_in_bytes(), 24); - cache.flush_unsync(); + cache.flush(); assert_eq!(cache.num_modules(), 0); assert_eq!(cache.size_in_bytes(), 0); } #[test] fn test_cache_insert_verified_unchecked() { - let cache = GlobalModuleCache::empty(); + let mut cache = GlobalModuleCache::empty(); let mut new_modules = vec![]; for i in 0..10 { @@ -278,7 +262,7 @@ mod test { #[test] fn test_cache_insert_verified_unchecked_does_not_add_deserialized_code() { - let cache = GlobalModuleCache::empty(); + let mut cache = GlobalModuleCache::empty(); let deserialized_modules = vec![(0, mock_deserialized_code(0, MockExtension::new(8)))]; assert_ok!(cache.insert_verified_unsync(deserialized_modules.into_iter())); @@ -289,7 +273,7 @@ mod test { #[test] fn test_cache_insert_verified_unchecked_does_not_override_valid_modules() { - let cache = GlobalModuleCache::empty(); + let mut cache = GlobalModuleCache::empty(); cache.insert(0, mock_verified_code(0, MockExtension::new(8))); assert_eq!(cache.num_modules(), 1); @@ -301,7 +285,7 @@ mod test { #[test] fn test_cache_insert_verified_unchecked_overrides_invalid_modules() { - let cache = GlobalModuleCache::empty(); + let mut cache = GlobalModuleCache::empty(); cache.insert(0, mock_verified_code(0, MockExtension::new(8))); cache.mark_invalid_if_contains(&0); diff --git a/aptos-move/block-executor/src/code_cache_global_manager.rs b/aptos-move/block-executor/src/code_cache_global_manager.rs index f616a85703e768..084e7134c8c2d9 100644 --- a/aptos-move/block-executor/src/code_cache_global_manager.rs +++ b/aptos-move/block-executor/src/code_cache_global_manager.rs @@ -7,22 +7,29 @@ use crate::{ GLOBAL_MODULE_CACHE_NUM_MODULES, GLOBAL_MODULE_CACHE_SIZE_IN_BYTES, STRUCT_NAME_INDEX_MAP_NUM_ENTRIES, }, - explicit_sync_wrapper::ExplicitSyncWrapper, }; -use aptos_types::block_executor::config::BlockExecutorModuleCacheLocalConfig; +use aptos_types::{ + block_executor::{ + config::BlockExecutorModuleCacheLocalConfig, execution_state::TransactionSliceMetadata, + }, + error::PanicError, + state_store::StateView, + vm::modules::AptosModuleExtension, +}; use aptos_vm_environment::environment::AptosEnvironment; -use move_binary_format::errors::Location; -use move_core_types::vm_status::{StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, VMStatus}; -use move_vm_runtime::WithRuntimeEnvironment; -use move_vm_types::code::WithSize; -use parking_lot::Mutex; -use std::{ - fmt::Debug, - hash::Hash, - mem, - ops::{Deref, DerefMut}, - sync::Arc, +use aptos_vm_logging::alert; +use aptos_vm_types::module_and_script_storage::{AptosCodeStorageAdapter, AsAptosCodeStorage}; +use move_binary_format::{ + errors::{Location, VMError}, + CompiledModule, +}; +use move_core_types::{ + account_address::AccountAddress, ident_str, language_storage::ModuleId, vm_status::VMStatus, }; +use move_vm_runtime::{Module, ModuleStorage, WithRuntimeEnvironment}; +use move_vm_types::code::WithSize; +use parking_lot::{Mutex, MutexGuard}; +use std::{hash::Hash, ops::Deref, sync::Arc}; /// Raises an alert with the specified message. In case we run in testing mode, instead prints the /// message to standard output. @@ -31,6 +38,7 @@ macro_rules! alert_or_println { if cfg!(any(test, feature = "testing")) { println!($($arg)*) } else { + use aptos_vm_logging::{alert, prelude::CRITICAL_ERRORS}; use aptos_logger::error; alert!($($arg)*); @@ -38,466 +46,307 @@ macro_rules! alert_or_println { }; } -/// Represents the state of [GlobalModuleCache]. The following transitions are allowed: -/// 2. [State::Ready] --> [State::Executing]. -/// 3. [State::Executing] --> [State::Done]. -/// 4. [State::Done] --> [State::Ready]. -/// The optional value stored in variants is propagated during state transitions. When a full cycle -/// is reached (just before [State::Done] to [State::Ready] transition), the user can check if the -/// value is expected and continue with a new one. For instance: -/// ```text -/// Ready(Some(0)) --> Executing(Some(0)) --> Done(Some(0)) --> Ready(Some(1)) is allowed. -/// Ready(Some(0)) --> Executing(Some(0)) --> Done(Some(0)) --> Ready(Some(2)) is not allowed. -/// ``` -#[derive(Clone, Debug, Eq, PartialEq)] -enum State { - Ready(Option), - Executing(Option), - Done(Option), -} - -/// Manages module caches and the execution environment, possible across multiple blocks. -pub struct ModuleCacheManager { - /// The state of global caches. - state: Mutex>, +/// Manages module caches and the execution environment, possibly across multiple blocks. +pub struct ModuleCacheManager { + /// Records the last observed metadata associated with a batch of executed transactions. When a + /// new batch of transactions is about to be executed, the associated metadata can be checked + /// to ensure that the execution history is linear. + transaction_slice_metadata: TransactionSliceMetadata, - /// During concurrent executions, this module cache is read-only. However, it can be mutated - /// when it is known that there are no concurrent accesses. [ModuleCacheManager] must ensure - /// the safety. - module_cache: Arc>, /// The execution environment, initially set to [None]. The environment, as long as it does not /// change, can be kept for multiple block executions. - environment: ExplicitSyncWrapper>, + environment: Option, + /// Module cache, initially empty, that can be used for parallel block execution. It is the + /// responsibility of [ModuleCacheManager] to ensure it stays in sync with the environment and + /// the state. + module_cache: GlobalModuleCache, } -impl ModuleCacheManager +impl ModuleCacheManager where - T: Debug + Eq, K: Hash + Eq + Clone, - VC: Deref>, + V: Deref>, E: WithSize, { - /// Returns a new instance of [ModuleCacheManager] in a [State::Done] state with uninitialized - /// current value. + /// Returns a new instance of [ModuleCacheManager]. #[allow(clippy::new_without_default)] pub fn new() -> Self { Self { - state: Mutex::new(State::Done(None)), - module_cache: Arc::new(GlobalModuleCache::empty()), - environment: ExplicitSyncWrapper::new(None), - } - } - - /// If state is [State::Done], sets the state to [State::Ready] with the current value and - /// returns true. Otherwise, raises an alert and returns false. Additionally, synchronizes - /// module and environment caches based on the provided previous value. - pub fn mark_ready(&self, previous: Option<&T>, current: Option) -> bool { - let mut state = self.state.lock(); - - if let State::Done(recorded_previous) = state.deref() { - // If the state is done, but the values do not exist or do not match, we flush global - // caches because they execute on top of unknown state (or on top of some different to - // the previous state). - if !recorded_previous - .as_ref() - .is_some_and(|r| previous.is_some_and(|p| r == p)) - { - if let Some(environment) = self.environment.acquire().as_ref() { - environment - .runtime_environment() - .flush_struct_name_and_info_caches(); - } - self.module_cache.flush_unsync(); - } - - *state = State::Ready(current); - true - } else { - // We are not in the done state, this is an error. - alert_or_println!( - "Unable to mark ready, state: {:?}, previous: {:?}, current: {:?}", - state, - previous, - current - ); - false + transaction_slice_metadata: TransactionSliceMetadata::unknown(), + environment: None, + module_cache: GlobalModuleCache::empty(), } } - /// When in [State::Ready], runs different checks on cached modules and environment: - /// 1. If the environment is not initialized, or is different from the one in storage, it is - /// re-initialized, and module caches are flushed. - /// 2. If too many struct names have been cached in re-indexing map in runtime environment, - /// struct type caches and module caches are flushed. - /// 3. If module cache size is too large (in bytes), it is flushed. - /// The final environment and module caches are returned. - pub fn check_ready_and_get_caches( - &self, + /// Checks if the manager is ready for execution. That is: + /// 1. If previously recorded transaction metadata is not immediately before, flushes module + /// and environment. + /// 2. Sets the metadata to the new one. + /// 3. Checks if environment is set and is the same. If not, resets it. Module caches are + /// flushed in case of resets. + /// 4. Checks sizes of type and module caches. If they are too large, caches are flushed. + fn check_ready( + &mut self, storage_environment: AptosEnvironment, config: &BlockExecutorModuleCacheLocalConfig, - ) -> Result<(AptosEnvironment, Arc>), VMStatus> { - let state = self.state.lock(); - if !matches!(state.deref(), State::Ready(_)) { - let msg = format!( - "Expected ready state to check caches, got {:?}", - state.deref() - ); - return Err(VMStatus::error( - UNKNOWN_INVARIANT_VIOLATION_ERROR, - Some(msg), - )); + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result<(), VMStatus> { + // If we execute non-consecutive sequence of transactions, we need to flush everything. + if !transaction_slice_metadata.is_immediately_after(&self.transaction_slice_metadata) { + self.module_cache.flush(); + self.environment = None; } + // Record the new metadata for this slice of transactions. + self.transaction_slice_metadata = transaction_slice_metadata; - let environment = self.get_or_initialize_environment(storage_environment); - let module_cache = self.module_cache.clone(); + // Next, check the environment. If the current environment has not been set, or is + // different, we reset it to the new one, and flush the module cache. + let environment_requires_update = self + .environment + .as_ref() + .map_or(true, |environment| environment != &storage_environment); + if environment_requires_update { + self.environment = Some(storage_environment); + self.module_cache.flush(); + } - // Check 1: struct re-indexing map is not too large. If it is, we flush the cache. Also, we - // need to flush modules because they store indices into re-indexing map. + let environment = self.environment.as_ref().expect("Environment must be set"); let runtime_environment = environment.runtime_environment(); + let struct_name_index_map_size = runtime_environment .struct_name_index_map_size() .map_err(|err| err.finish(Location::Undefined).into_vm_status())?; STRUCT_NAME_INDEX_MAP_NUM_ENTRIES.set(struct_name_index_map_size as i64); + // If the environment caches too many struct names, flush type caches. Also flush module + // caches because they contain indices for struct names. if struct_name_index_map_size > config.max_struct_name_index_map_num_entries { - module_cache.flush_unsync(); runtime_environment.flush_struct_name_and_info_caches(); + self.module_cache.flush(); } - // Check 2: If the module cache is too big, flush it. - let module_cache_size_in_bytes = module_cache.size_in_bytes(); + let module_cache_size_in_bytes = self.module_cache.size_in_bytes(); GLOBAL_MODULE_CACHE_SIZE_IN_BYTES.set(module_cache_size_in_bytes as i64); - GLOBAL_MODULE_CACHE_NUM_MODULES.set(module_cache.num_modules() as i64); + GLOBAL_MODULE_CACHE_NUM_MODULES.set(self.module_cache.num_modules() as i64); + // If module cache stores too many modules, flush it as well. if module_cache_size_in_bytes > config.max_module_cache_size_in_bytes { - module_cache.flush_unsync(); + self.module_cache.flush(); } - Ok((environment, module_cache)) + Ok(()) } +} - /// If state is [State::Ready], changes it to [State::Executing] with the same value, returning - /// true. Otherwise, returns false indicating that state transition failed, also raising an - /// alert. - pub fn mark_executing(&self) -> bool { - let mut state = self.state.lock(); - if let State::Ready(v) = state.deref_mut() { - *state = State::Executing(mem::take(v)); - true - } else { - alert_or_println!("Unable to mark executing, state: {:?}", state); - false +/// Module cache manager used by Aptos block executor. Ensures that only one thread has exclusive +/// access to it at a time. +pub struct AptosModuleCacheManager { + inner: Mutex>, +} + +impl AptosModuleCacheManager { + /// Returns a new manager in its default (empty) state. + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + inner: Mutex::new(ModuleCacheManager::new()), } } - /// If state is [State::Executing], changes it to [State::Done] with the same value, returning - /// true. Otherwise, returns false indicating that state transition failed, also raising an - /// alert. - pub fn mark_done(&self) -> bool { - let mut state = self.state.lock(); - if let State::Executing(v) = state.deref_mut() { - *state = State::Done(mem::take(v)); - true - } else { - alert_or_println!("Unable to mark done, state: {:?}", state); - false + /// Tries to lock the manager. If succeeds, checks if the manager (caches, environment, etc.) + /// is ready for execution and updates states. If fails, [AptosModuleCacheManagerGuard::None] + /// is returned with an empty module cache. + fn try_lock_inner( + &self, + state_view: &impl StateView, + config: &BlockExecutorModuleCacheLocalConfig, + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + // Get the current environment from storage. + let storage_environment = + AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view); + + Ok(match self.inner.try_lock() { + Some(mut guard) => { + guard.check_ready(storage_environment, config, transaction_slice_metadata)?; + AptosModuleCacheManagerGuard::Guard { guard } + }, + None => { + // TODO(loader_v2): Should we return an error here instead? + alert_or_println!("Locking module cache manager failed, fallback to empty caches"); + + // If this is true, we failed to acquire a lock, and so default storage environment + // and empty (thread-local) module caches will be used. + AptosModuleCacheManagerGuard::None { + environment: storage_environment, + module_cache: GlobalModuleCache::empty(), + } + }, + }) + } + + /// Tries to lock the manager using [AptosModuleCacheManager::try_lock_inner]. Additionally, if + /// the module cache is empty, can prefetch Aptos framework into it. + pub fn try_lock( + &self, + state_view: &impl StateView, + config: &BlockExecutorModuleCacheLocalConfig, + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + let mut guard = self.try_lock_inner(state_view, config, transaction_slice_metadata)?; + + // To avoid cold starts, fetch the framework code. This ensures the state with 0 modules + // cached is not possible for block execution (as long as the config enables the framework + // prefetch). + let environment = guard.environment(); + if environment.features().is_loader_v2_enabled() + && guard.module_cache().num_modules() == 0 + && config.prefetch_framework_code + { + let code_storage = state_view.as_aptos_code_storage(environment.clone()); + prefetch_aptos_framework(code_storage, guard.module_cache_mut()).map_err(|err| { + alert_or_println!("Failed to load Aptos framework to module cache: {:?}", err); + VMError::from(err).into_vm_status() + })?; } + + Ok(guard) } +} + +/// A guard that can be acquired from [AptosModuleCacheManager]. Variants represent successful and +/// no-successful lock acquisition. +pub enum AptosModuleCacheManagerGuard<'a> { + /// Holds the guard to the [AptosModuleCacheManager], and has exclusive access to it. + Guard { + guard: MutexGuard< + 'a, + ModuleCacheManager, + >, + }, + /// Either there is no [AptosModuleCacheManager], or acquiring the lock for it failed. + None { + environment: AptosEnvironment, + module_cache: GlobalModuleCache, + }, +} - /// Returns the cached global environment if it already exists, and matches the one in storage. - /// If it does not exist, or does not match, the new environment is initialized from the given - /// state, cached, and returned. Should be called when in [State::Ready] state, under lock. - fn get_or_initialize_environment( +impl<'a> AptosModuleCacheManagerGuard<'a> { + /// Returns the references to the environment. If environment is not set, panics. + pub fn environment(&self) -> &AptosEnvironment { + use AptosModuleCacheManagerGuard::*; + match self { + Guard { guard } => guard + .environment + .as_ref() + .expect("Guard always has environment set"), + None { environment, .. } => environment, + } + } + + /// Returns the references to the module cache. + pub fn module_cache( &self, - storage_environment: AptosEnvironment, - ) -> AptosEnvironment { - let mut guard = self.environment.acquire(); - let existing_environment = guard.deref_mut(); + ) -> &GlobalModuleCache { + use AptosModuleCacheManagerGuard::*; + match self { + Guard { guard } => &guard.module_cache, + None { module_cache, .. } => module_cache, + } + } - let environment_requires_update = existing_environment - .as_ref() - .map_or(true, |environment| environment != &storage_environment); - if environment_requires_update { - *existing_environment = Some(storage_environment); + /// Returns the mutable references to the module cache. + pub fn module_cache_mut( + &mut self, + ) -> &mut GlobalModuleCache { + use AptosModuleCacheManagerGuard::*; + match self { + Guard { guard } => &mut guard.module_cache, + None { module_cache, .. } => module_cache, + } + } - // If this environment has been (re-)initialized, we need to flush the module cache - // because it can contain now out-dated code. - self.module_cache.flush_unsync(); + /// A guard in [AptosModuleCacheManagerGuard::None] state with empty module cache and default + /// environment. Use for testing only. + #[cfg(test)] + pub(crate) fn none() -> Self { + use aptos_types::state_store::MockStateView; + AptosModuleCacheManagerGuard::None { + environment: AptosEnvironment::new(&MockStateView::empty()), + module_cache: GlobalModuleCache::empty(), } + } +} - existing_environment - .clone() - .expect("Environment must be set") +/// If Aptos framework exists, loads "transaction_validation.move" and all its transitive +/// dependencies from storage into provided module cache. If loading fails for any reason, a panic +/// error is returned. +fn prefetch_aptos_framework( + code_storage: AptosCodeStorageAdapter, + module_cache: &mut GlobalModuleCache, +) -> Result<(), PanicError> { + // If framework code exists in storage, the transitive closure will be verified and cached. + let maybe_loaded = code_storage + .fetch_verified_module(&AccountAddress::ONE, ident_str!("transaction_validation")) + .map_err(|err| { + // There should be no errors when pre-fetching the framework, if there are, we + // better return an error here. + PanicError::CodeInvariantError(format!("Unable to fetch Aptos framework: {:?}", err)) + })?; + + if maybe_loaded.is_some() { + // Framework must have been loaded. Drain verified modules from local cache into + // global cache. + let verified_module_code_iter = code_storage.into_verified_module_code_iter()?; + module_cache.insert_verified_unsync(verified_module_code_iter)?; } + Ok(()) } #[cfg(test)] mod test { use super::*; + use aptos_language_e2e_tests::{data_store::FakeDataStore, executor::FakeExecutor}; use aptos_types::{ on_chain_config::{FeatureFlag, Features, OnChainConfig}, state_store::{state_key::StateKey, state_value::StateValue, MockStateView}, }; use claims::assert_ok; - use move_core_types::{ - account_address::AccountAddress, identifier::Identifier, language_storage::ModuleId, - }; - use move_vm_types::{ - code::{mock_verified_code, MockDeserializedCode, MockExtension, MockVerifiedCode}, - loaded_data::runtime_types::StructIdentifier, - }; - use std::{collections::HashMap, thread, thread::JoinHandle}; - use test_case::test_case; - - #[test_case(None, None)] - #[test_case(None, Some(1))] - #[test_case(Some(0), None)] - #[test_case(Some(0), Some(1))] - #[test_case(Some(0), Some(0))] - fn test_mark_ready(recorded_previous: Option, previous: Option) { - let module_cache_manager = ModuleCacheManager::new(); - *module_cache_manager.state.lock() = State::Done(recorded_previous); - - // Pre-populate module cache to test flushing. - module_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(8))); - assert_eq!(module_cache_manager.module_cache.num_modules(), 1); - - assert!(!module_cache_manager.mark_executing()); - assert!(!module_cache_manager.mark_done()); - assert!(module_cache_manager.mark_ready(previous.as_ref(), Some(77))); - - // Only in matching case the module cache is not flushed. - if recorded_previous.is_some() && recorded_previous == previous { - assert_eq!(module_cache_manager.module_cache.num_modules(), 1); - } else { - assert_eq!(module_cache_manager.module_cache.num_modules(), 0); - } - - let state = module_cache_manager.state.lock().clone(); - assert_eq!(state, State::Ready(Some(77))) - } + use std::collections::HashMap; #[test] - fn test_check_ready() { - let state_view = MockStateView::empty(); - let config = BlockExecutorModuleCacheLocalConfig { - prefetch_framework_code: false, - max_module_cache_size_in_bytes: 8, - max_struct_name_index_map_num_entries: 2, - }; - - let module_cache_manager = ModuleCacheManager::< - i32, - i32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new(); - - // Set up the state and the environment. - *module_cache_manager.state.lock() = State::Ready(None); - let environment = module_cache_manager.get_or_initialize_environment( - AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view), - ); - - module_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(16))); - assert_eq!(module_cache_manager.module_cache.num_modules(), 1); - - let runtime_environment = environment.runtime_environment(); - let dummy_struct_name = StructIdentifier { - module: ModuleId::new(AccountAddress::ONE, Identifier::new("foo").unwrap()), - name: Identifier::new("Bar").unwrap(), - }; - assert!(runtime_environment - .struct_name_to_idx_for_test(dummy_struct_name) - .is_ok()); - assert_eq!( - assert_ok!(runtime_environment.struct_name_index_map_size()), - 1 - ); - - // Module cache size in bytes is too large, should be flushed (but not struct types). - assert!(module_cache_manager - .check_ready_and_get_caches(environment.clone(), &config) - .is_ok()); - assert_eq!(module_cache_manager.module_cache.num_modules(), 0); - assert_eq!( - assert_ok!(runtime_environment.struct_name_index_map_size()), - 1 - ); + fn test_prefetch_existing_aptos_framework() { + let executor = FakeExecutor::from_head_genesis(); + let state_view = executor.get_state_view(); - module_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(4))); + let environment = AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view); + let code_storage = state_view.as_aptos_code_storage(environment); - // This time size is less than the one specified in config. No flushing. - assert!(module_cache_manager - .check_ready_and_get_caches(environment.clone(), &config) - .is_ok()); - assert_eq!(module_cache_manager.module_cache.num_modules(), 1); - assert_eq!( - assert_ok!(runtime_environment.struct_name_index_map_size()), - 1 - ); - - let dummy_struct_names = [ - StructIdentifier { - module: ModuleId::new(AccountAddress::ONE, Identifier::new("foo").unwrap()), - name: Identifier::new("Foo").unwrap(), - }, - StructIdentifier { - module: ModuleId::new(AccountAddress::ONE, Identifier::new("foo").unwrap()), - name: Identifier::new("Baz").unwrap(), - }, - ]; - for dummy_struct_name in dummy_struct_names { - assert!(runtime_environment - .struct_name_to_idx_for_test(dummy_struct_name) - .is_ok()); - } - assert_eq!( - assert_ok!(runtime_environment.struct_name_index_map_size()), - 3 - ); + let mut module_cache = GlobalModuleCache::empty(); + assert_eq!(module_cache.num_modules(), 0); - // Too many struct names cached. - assert!(module_cache_manager - .check_ready_and_get_caches(environment.clone(), &config) - .is_ok()); - assert_eq!(module_cache_manager.module_cache.num_modules(), 0); - assert_eq!( - assert_ok!(runtime_environment.struct_name_index_map_size()), - 0 - ); + let result = prefetch_aptos_framework(code_storage, &mut module_cache); + assert!(result.is_ok()); + assert!(module_cache.num_modules() > 0); } #[test] - fn test_mark_executing() { - let module_cache_manager = ModuleCacheManager::< - i32, - i32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new(); - *module_cache_manager.state.lock() = State::Ready(Some(100)); - - assert!(!module_cache_manager.mark_ready(Some(&76), Some(77))); - assert!(!module_cache_manager.mark_done()); - - assert!(module_cache_manager.mark_executing()); - - let state = module_cache_manager.state.lock().clone(); - assert_eq!(state, State::Executing(Some(100))) - } + fn test_prefetch_non_existing_aptos_framework() { + let state_view = FakeDataStore::default(); - #[test] - fn test_mark_done() { - let module_cache_manager = ModuleCacheManager::< - i32, - i32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new(); - *module_cache_manager.state.lock() = State::Executing(Some(100)); - - assert!(!module_cache_manager.mark_ready(Some(&76), Some(77))); - assert!(!module_cache_manager.mark_executing()); - - assert!(module_cache_manager.mark_done()); - - let state = module_cache_manager.state.lock().clone(); - assert_eq!(state, State::Done(Some(100))) - } - - /// Joins threads. Succeeds only if a single handle evaluates to [Ok] and the rest are [Err]s. - fn join_and_assert_single_true(handles: Vec>) { - let mut num_true = 0; - let mut num_false = 0; - - let num_handles = handles.len(); - for handle in handles { - if handle.join().unwrap() { - num_true += 1; - } else { - num_false += 1; - } - } - assert_eq!(num_true, 1); - assert_eq!(num_false, num_handles - 1); - } - - #[test] - fn test_mark_ready_concurrent() { - let global_cache_manager = Arc::new(ModuleCacheManager::< - i32, - i32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new()); - - let mut handles = vec![]; - for _ in 0..32 { - let handle = thread::spawn({ - let global_cache_manager = global_cache_manager.clone(); - move || global_cache_manager.mark_ready(Some(&1), Some(2)) - }); - handles.push(handle); - } - join_and_assert_single_true(handles); - } + let environment = + AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view); + let code_storage = state_view.as_aptos_code_storage(environment); - #[test] - fn test_mark_executing_concurrent() { - let global_cache_manager = Arc::new(ModuleCacheManager::< - i32, - i32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new()); - assert!(global_cache_manager.mark_ready(Some(&0), Some(1))); - - let mut handles = vec![]; - for _ in 0..32 { - let handle = thread::spawn({ - let global_cache_manager = global_cache_manager.clone(); - move || global_cache_manager.mark_executing() - }); - handles.push(handle); - } - join_and_assert_single_true(handles); - } + let mut module_cache = GlobalModuleCache::empty(); + assert_eq!(module_cache.num_modules(), 0); - #[test] - fn test_mark_done_concurrent() { - let global_cache_manager = Arc::new(ModuleCacheManager::< - i32, - i32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new()); - assert!(global_cache_manager.mark_ready(Some(&0), Some(1))); - assert!(global_cache_manager.mark_executing()); - - let mut handles = vec![]; - for _ in 0..32 { - let handle = thread::spawn({ - let global_cache_manager = global_cache_manager.clone(); - move || global_cache_manager.mark_done() - }); - handles.push(handle); - } - join_and_assert_single_true(handles); + let result = prefetch_aptos_framework(code_storage, &mut module_cache); + assert!(result.is_ok()); + assert_eq!(module_cache.num_modules(), 0); } + #[allow(dead_code)] fn state_view_with_changed_feature_flag( feature_flag: Option, ) -> MockStateView { @@ -519,67 +368,28 @@ mod test { } #[test] - fn test_get_or_initialize_environment() { - let module_cache_manager = ModuleCacheManager::< - i32, - i32, - MockDeserializedCode, - MockVerifiedCode, - MockExtension, - >::new(); - *module_cache_manager.state.lock() = State::Ready(None); - - module_cache_manager - .module_cache - .insert(0, mock_verified_code(0, MockExtension::new(8))); - module_cache_manager - .module_cache - .insert(1, mock_verified_code(1, MockExtension::new(8))); - assert_eq!(module_cache_manager.module_cache.num_modules(), 2); - assert!(module_cache_manager.environment.acquire().is_none()); - - // Environment has to be set to the same value, cache flushed. - let state_view = state_view_with_changed_feature_flag(None); - let environment = module_cache_manager.get_or_initialize_environment( - AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view), - ); - assert_eq!(module_cache_manager.module_cache.num_modules(), 0); - assert!(module_cache_manager - .environment - .acquire() - .as_ref() - .is_some_and(|cached_environment| cached_environment == &environment)); - - module_cache_manager - .module_cache - .insert(2, mock_verified_code(2, MockExtension::new(8))); - assert_eq!(module_cache_manager.module_cache.num_modules(), 1); - assert!(module_cache_manager.environment.acquire().is_some()); - - // Environment has to be re-set to the new value, cache flushed. - let state_view = - state_view_with_changed_feature_flag(Some(FeatureFlag::CODE_DEPENDENCY_CHECK)); - let environment = module_cache_manager.get_or_initialize_environment( - AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view), + fn test_check_ready_sets_transaction_slice_metadata() { + let state_view = MockStateView::empty(); + let config = BlockExecutorModuleCacheLocalConfig { + prefetch_framework_code: false, + max_module_cache_size_in_bytes: 8, + max_struct_name_index_map_num_entries: 2, + }; + + let manager = AptosModuleCacheManager::new(); + assert_eq!( + manager.inner.lock().transaction_slice_metadata, + TransactionSliceMetadata::Unknown ); - assert_eq!(module_cache_manager.module_cache.num_modules(), 0); - assert!(module_cache_manager - .environment - .acquire() - .as_ref() - .is_some_and(|cached_environment| cached_environment == &environment)); - module_cache_manager - .module_cache - .insert(3, mock_verified_code(3, MockExtension::new(8))); - assert_eq!(module_cache_manager.module_cache.num_modules(), 1); - assert!(module_cache_manager.environment.acquire().is_some()); + let metadata_1 = TransactionSliceMetadata::block_from_u64(0, 1); + assert_ok!(manager.try_lock(&state_view, &config, metadata_1)); + assert_eq!(manager.inner.lock().transaction_slice_metadata, metadata_1); - // Environment is kept, and module caches are not flushed. - let new_environment = module_cache_manager.get_or_initialize_environment( - AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view), - ); - assert_eq!(module_cache_manager.module_cache.num_modules(), 1); - assert!(environment == new_environment); + let metadata_2 = TransactionSliceMetadata::block_from_u64(1, 2); + assert_ok!(manager.try_lock(&state_view, &config, metadata_2)); + assert_eq!(manager.inner.lock().transaction_slice_metadata, metadata_2); } + + // TODO(loader_v2): Add more unit tests like with previous commits. } diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 603bce0fbe1abc..2332e9ca721cdb 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -4,6 +4,7 @@ use crate::{ code_cache_global::GlobalModuleCache, + code_cache_global_manager::AptosModuleCacheManagerGuard, counters::{ self, BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK, PARALLEL_EXECUTION_SECONDS, RAYON_EXECUTION_SECONDS, TASK_EXECUTE_SECONDS, TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, @@ -44,6 +45,7 @@ use aptos_types::{ vm::modules::AptosModuleExtension, write_set::{TransactionWrite, WriteOp}, }; +use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_logging::{alert, clear_speculative_txn_logs, init_speculative_logs, prelude::*}; use aptos_vm_types::{ change_set::randomly_check_layout_matches, module_write_set::ModuleWrite, @@ -74,8 +76,6 @@ pub struct BlockExecutor { // threads that may be concurrently participating in parallel execution. config: BlockExecutorConfig, executor_thread_pool: Arc, - global_module_cache: - Arc>, transaction_commit_hook: Option, phantom: PhantomData<(T, E, S, L, X)>, } @@ -93,9 +93,6 @@ where pub fn new( config: BlockExecutorConfig, executor_thread_pool: Arc, - global_module_cache: Arc< - GlobalModuleCache, - >, transaction_commit_hook: Option, ) -> Self { assert!( @@ -106,7 +103,6 @@ where Self { config, executor_thread_pool, - global_module_cache, transaction_commit_hook, phantom: PhantomData, } @@ -560,6 +556,7 @@ where /// in outputs, which is heavier (due to serialization / deserialization, copies, etc). Moreover, /// since prepare_and_queue_commit_ready_txns takes care of synchronization in the flag-combining /// way, the materialization can be almost embarrassingly parallelizable. + #[allow(clippy::too_many_arguments)] fn prepare_and_queue_commit_ready_txns( &self, block_gas_limit_type: &BlockGasLimitType, @@ -569,6 +566,12 @@ where last_input_output: &TxnLastInputOutput, shared_commit_state: &ExplicitSyncWrapper>, base_view: &S, + global_module_cache: &GlobalModuleCache< + ModuleId, + CompiledModule, + Module, + AptosModuleExtension, + >, runtime_environment: &RuntimeEnvironment, start_shared_counter: u32, shared_counter: &AtomicU32, @@ -605,7 +608,7 @@ where versioned_cache, executor, base_view, - self.global_module_cache.as_ref(), + global_module_cache, runtime_environment, ParallelState::new( versioned_cache, @@ -624,7 +627,7 @@ where Self::publish_module_writes( txn_idx, module_write_set, - self.global_module_cache.as_ref(), + global_module_cache, versioned_cache, scheduler, runtime_environment, @@ -637,7 +640,7 @@ where let validation_result = Self::validate( txn_idx, last_input_output, - self.global_module_cache.as_ref(), + global_module_cache, versioned_cache, scheduler, ); @@ -666,7 +669,7 @@ where Self::publish_module_writes( txn_idx, module_write_set, - self.global_module_cache.as_ref(), + global_module_cache, versioned_cache, scheduler, runtime_environment, @@ -841,6 +844,12 @@ where shared_counter: &AtomicU32, last_input_output: &TxnLastInputOutput, base_view: &S, + global_module_cache: &GlobalModuleCache< + ModuleId, + CompiledModule, + Module, + AptosModuleExtension, + >, runtime_environment: &RuntimeEnvironment, final_results: &ExplicitSyncWrapper>, ) -> Result<(), PanicError> { @@ -852,7 +861,7 @@ where ); let latest_view = LatestView::new( base_view, - self.global_module_cache.as_ref(), + global_module_cache, runtime_environment, ViewState::Sync(parallel_state), txn_idx, @@ -939,13 +948,19 @@ where fn worker_loop( &self, - env: &E::Environment, + environment: &AptosEnvironment, block: &[T], last_input_output: &TxnLastInputOutput, versioned_cache: &MVHashMap, scheduler: &Scheduler, // TODO: should not need to pass base view. base_view: &S, + global_module_cache: &GlobalModuleCache< + ModuleId, + CompiledModule, + Module, + AptosModuleExtension, + >, start_shared_counter: u32, shared_counter: &AtomicU32, shared_commit_state: &ExplicitSyncWrapper>, @@ -955,11 +970,11 @@ where // Make executor for each task. TODO: fast concurrent executor. let num_txns = block.len(); let init_timer = VM_INIT_SECONDS.start_timer(); - let executor = E::init(env.clone(), base_view); + let executor = E::init(environment.clone(), base_view); drop(init_timer); // Shared environment used by each executor. - let runtime_environment = env.runtime_environment(); + let runtime_environment = environment.runtime_environment(); let _timer = WORK_WITH_TASK_SECONDS.start_timer(); let mut scheduler_task = SchedulerTask::Retry; @@ -974,6 +989,7 @@ where shared_counter, last_input_output, base_view, + global_module_cache, runtime_environment, final_results, )?; @@ -1001,6 +1017,7 @@ where last_input_output, shared_commit_state, base_view, + global_module_cache, runtime_environment, start_shared_counter, shared_counter, @@ -1018,7 +1035,7 @@ where let valid = Self::validate( txn_idx, last_input_output, - self.global_module_cache.as_ref(), + global_module_cache, versioned_cache, scheduler, ); @@ -1046,7 +1063,7 @@ where versioned_cache, &executor, base_view, - self.global_module_cache.as_ref(), + global_module_cache, runtime_environment, ParallelState::new( versioned_cache, @@ -1081,9 +1098,9 @@ where pub(crate) fn execute_transactions_parallel( &self, - env: &E::Environment, signature_verified_block: &[T], base_view: &S, + module_cache_manager_guard: &mut AptosModuleCacheManagerGuard, ) -> Result, ()> { let _timer = PARALLEL_EXECUTION_SECONDS.start_timer(); // Using parallel execution with 1 thread currently will not work as it @@ -1130,12 +1147,13 @@ where for _ in 0..num_workers { s.spawn(|_| { if let Err(err) = self.worker_loop( - env, + module_cache_manager_guard.environment(), signature_verified_block, &last_input_output, &versioned_cache, &scheduler, base_view, + module_cache_manager_guard.module_cache(), start_shared_counter, &shared_counter, &shared_commit_state, @@ -1168,7 +1186,8 @@ where } counters::update_state_counters(versioned_cache.stats(), true); - self.global_module_cache + module_cache_manager_guard + .module_cache_mut() .insert_verified_unsync(versioned_cache.take_modules_iter()) .map_err(|err| { alert!("[BlockSTM] Encountered panic error: {:?}", err); @@ -1339,18 +1358,19 @@ where pub(crate) fn execute_transactions_sequential( &self, - env: &E::Environment, signature_verified_block: &[T], base_view: &S, + module_cache_manager_guard: &mut AptosModuleCacheManagerGuard, resource_group_bcs_fallback: bool, ) -> Result, SequentialBlockExecutionError> { let num_txns = signature_verified_block.len(); + let init_timer = VM_INIT_SECONDS.start_timer(); - let executor = E::init(env.clone(), base_view); + let environment = module_cache_manager_guard.environment(); + let executor = E::init(environment.clone(), base_view); drop(init_timer); - let runtime_environment = env.runtime_environment(); - + let runtime_environment = environment.runtime_environment(); let start_counter = gen_id_start_value(true); let counter = RefCell::new(start_counter); let unsync_map = UnsyncMap::new(); @@ -1366,7 +1386,7 @@ where for (idx, txn) in signature_verified_block.iter().enumerate() { let latest_view = LatestView::::new( base_view, - self.global_module_cache.as_ref(), + module_cache_manager_guard.module_cache(), runtime_environment, ViewState::Unsync(SequentialState::new(&unsync_map, start_counter, &counter)), idx as TxnIndex, @@ -1558,7 +1578,7 @@ where Self::apply_output_sequential( idx as TxnIndex, runtime_environment, - self.global_module_cache.as_ref(), + module_cache_manager_guard.module_cache(), &unsync_map, &output, resource_write_set.clone(), @@ -1646,7 +1666,8 @@ where ret.resize_with(num_txns, E::Output::skip_output); counters::update_state_counters(unsync_map.stats(), false); - self.global_module_cache + module_cache_manager_guard + .module_cache_mut() .insert_verified_unsync(unsync_map.into_modules_iter())?; let block_end_info = if self @@ -1683,15 +1704,18 @@ where pub fn execute_block( &self, - env: E::Environment, signature_verified_block: &[T], base_view: &S, + module_cache_manager_guard: &mut AptosModuleCacheManagerGuard, ) -> BlockExecutionResult, E::Error> { let _timer = BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK.start_timer(); if self.config.local.concurrency_level > 1 { - let parallel_result = - self.execute_transactions_parallel(&env, signature_verified_block, base_view); + let parallel_result = self.execute_transactions_parallel( + signature_verified_block, + base_view, + module_cache_manager_guard, + ); // If parallel gave us result, return it if let Ok(output) = parallel_result { @@ -1706,17 +1730,23 @@ where // Clear by re-initializing the speculative logs. init_speculative_logs(signature_verified_block.len()); - // Flush the cache and the environment to re-run from the "clean" state. - env.runtime_environment() + // Flush all caches to re-run from the "clean" state. + module_cache_manager_guard + .environment() + .runtime_environment() .flush_struct_name_and_info_caches(); - self.global_module_cache.flush_unsync(); + module_cache_manager_guard.module_cache_mut().flush(); info!("parallel execution requiring fallback"); } // If we didn't run parallel, or it didn't finish successfully - run sequential - let sequential_result = - self.execute_transactions_sequential(&env, signature_verified_block, base_view, false); + let sequential_result = self.execute_transactions_sequential( + signature_verified_block, + base_view, + module_cache_manager_guard, + false, + ); // If sequential gave us result, return it let sequential_error = match sequential_result { @@ -1735,9 +1765,9 @@ where init_speculative_logs(signature_verified_block.len()); let sequential_result = self.execute_transactions_sequential( - &env, signature_verified_block, base_view, + module_cache_manager_guard, true, ); diff --git a/aptos-move/block-executor/src/proptest_types/bencher.rs b/aptos-move/block-executor/src/proptest_types/bencher.rs index 527c926f1b636a..8450dadcc233d7 100644 --- a/aptos-move/block-executor/src/proptest_types/bencher.rs +++ b/aptos-move/block-executor/src/proptest_types/bencher.rs @@ -3,13 +3,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - code_cache_global::GlobalModuleCache, + code_cache_global_manager::AptosModuleCacheManagerGuard, executor::BlockExecutor, proptest_types::{ baseline::BaselineOutput, types::{ - KeyType, MockEnvironment, MockOutput, MockTask, MockTransaction, TransactionGen, - TransactionGenParams, + KeyType, MockOutput, MockTask, MockTransaction, TransactionGen, TransactionGenParams, }, }, txn_commit_hook::NoOpTransactionCommitHook, @@ -126,18 +125,18 @@ where .build() .unwrap(), ); - let global_module_cache = Arc::new(GlobalModuleCache::empty()); let config = BlockExecutorConfig::new_no_block_limit(num_cpus::get()); - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = BlockExecutor::< MockTransaction, E>, MockTask, E>, MockStateView>, NoOpTransactionCommitHook, E>, usize>, ExecutableTestType, - >::new(config, executor_thread_pool, global_module_cache, None) - .execute_transactions_parallel(&env, &self.transactions, &state_view); + >::new(config, executor_thread_pool, None) + .execute_transactions_parallel(&self.transactions, &state_view, &mut guard); self.baseline_output.assert_parallel_output(&output); } diff --git a/aptos-move/block-executor/src/proptest_types/tests.rs b/aptos-move/block-executor/src/proptest_types/tests.rs index 5d83c2fe50578a..e7450bf9bb42b5 100644 --- a/aptos-move/block-executor/src/proptest_types/tests.rs +++ b/aptos-move/block-executor/src/proptest_types/tests.rs @@ -3,15 +3,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - code_cache_global::GlobalModuleCache, + code_cache_global_manager::AptosModuleCacheManagerGuard, errors::SequentialBlockExecutionError, executor::BlockExecutor, proptest_types::{ baseline::BaselineOutput, types::{ - DeltaDataView, KeyType, MockEnvironment, MockEvent, MockOutput, MockTask, - MockTransaction, NonEmptyGroupDataView, TransactionGen, TransactionGenParams, - MAX_GAS_PER_TXN, + DeltaDataView, KeyType, MockEvent, MockOutput, MockTask, MockTransaction, + NonEmptyGroupDataView, TransactionGen, TransactionGenParams, MAX_GAS_PER_TXN, }, }, txn_commit_hook::NoOpTransactionCommitHook, @@ -70,7 +69,8 @@ fn run_transactions( ); for _ in 0..num_repeat { - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = BlockExecutor::< MockTransaction, E>, MockTask, E>, @@ -80,10 +80,9 @@ fn run_transactions( >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool.clone(), - Arc::new(GlobalModuleCache::empty()), None, ) - .execute_transactions_parallel(&env, &transactions, &state_view); + .execute_transactions_parallel(&transactions, &state_view, &mut guard); if module_access.0 && module_access.1 { assert_matches!(output, Err(())); @@ -207,7 +206,8 @@ fn deltas_writes_mixed_with_block_gas_limit(num_txns: usize, maybe_block_gas_lim ); for _ in 0..20 { - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = BlockExecutor::< MockTransaction, MockEvent>, MockTask, MockEvent>, @@ -217,10 +217,9 @@ fn deltas_writes_mixed_with_block_gas_limit(num_txns: usize, maybe_block_gas_lim >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool.clone(), - Arc::new(GlobalModuleCache::empty()), None, ) - .execute_transactions_parallel(&env, &transactions, &data_view); + .execute_transactions_parallel(&transactions, &data_view, &mut guard); BaselineOutput::generate(&transactions, maybe_block_gas_limit) .assert_parallel_output(&output); @@ -260,7 +259,8 @@ fn deltas_resolver_with_block_gas_limit(num_txns: usize, maybe_block_gas_limit: ); for _ in 0..20 { - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = BlockExecutor::< MockTransaction, MockEvent>, MockTask, MockEvent>, @@ -270,10 +270,9 @@ fn deltas_resolver_with_block_gas_limit(num_txns: usize, maybe_block_gas_limit: >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool.clone(), - Arc::new(GlobalModuleCache::empty()), None, ) - .execute_transactions_parallel(&env, &transactions, &data_view); + .execute_transactions_parallel(&transactions, &data_view, &mut guard); BaselineOutput::generate(&transactions, maybe_block_gas_limit) .assert_parallel_output(&output); @@ -418,7 +417,7 @@ fn publishing_fixed_params_with_block_gas_limit( ); // Confirm still no intersection - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); let output = BlockExecutor::< MockTransaction, MockEvent>, MockTask, MockEvent>, @@ -428,10 +427,9 @@ fn publishing_fixed_params_with_block_gas_limit( >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), maybe_block_gas_limit), executor_thread_pool, - Arc::new(GlobalModuleCache::empty()), None, ) - .execute_transactions_parallel(&env, &transactions, &data_view); + .execute_transactions_parallel(&transactions, &data_view, &mut guard); assert_ok!(output); // Adjust the reads of txn indices[2] to contain module read to key 42. @@ -462,7 +460,8 @@ fn publishing_fixed_params_with_block_gas_limit( ); for _ in 0..200 { - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = BlockExecutor::< MockTransaction, MockEvent>, MockTask, MockEvent>, @@ -475,10 +474,9 @@ fn publishing_fixed_params_with_block_gas_limit( Some(max(w_index, r_index) as u64 * MAX_GAS_PER_TXN + 1), ), executor_thread_pool.clone(), - Arc::new(GlobalModuleCache::empty()), None, ) // Ensure enough gas limit to commit the module txns (4 is maximum gas per txn) - .execute_transactions_parallel(&env, &transactions, &data_view); + .execute_transactions_parallel(&transactions, &data_view, &mut guard); assert_matches!(output, Err(())); } @@ -546,7 +544,8 @@ fn non_empty_group( ); for _ in 0..num_repeat_parallel { - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = BlockExecutor::< MockTransaction, MockEvent>, MockTask, MockEvent>, @@ -556,16 +555,16 @@ fn non_empty_group( >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool.clone(), - Arc::new(GlobalModuleCache::empty()), None, ) - .execute_transactions_parallel(&env, &transactions, &data_view); + .execute_transactions_parallel(&transactions, &data_view, &mut guard); BaselineOutput::generate(&transactions, None).assert_parallel_output(&output); } for _ in 0..num_repeat_sequential { - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = BlockExecutor::< MockTransaction, MockEvent>, MockTask, MockEvent>, @@ -575,10 +574,9 @@ fn non_empty_group( >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool.clone(), - Arc::new(GlobalModuleCache::empty()), None, ) - .execute_transactions_sequential(&env, &transactions, &data_view, false); + .execute_transactions_sequential(&transactions, &data_view, &mut guard, false); // TODO: test dynamic disabled as well. BaselineOutput::generate(&transactions, None).assert_output(&output.map_err(|e| match e { diff --git a/aptos-move/block-executor/src/proptest_types/types.rs b/aptos-move/block-executor/src/proptest_types/types.rs index 27e6bdfb53653c..ec416785fd80aa 100644 --- a/aptos-move/block-executor/src/proptest_types/types.rs +++ b/aptos-move/block-executor/src/proptest_types/types.rs @@ -25,6 +25,7 @@ use aptos_types::{ transaction::BlockExecutableTransaction as Transaction, write_set::{TransactionWrite, WriteOp, WriteOpKind}, }; +use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_types::{ module_and_script_storage::code_storage::AptosCodeStorage, module_write_set::ModuleWrite, @@ -38,7 +39,6 @@ use claims::{assert_ge, assert_le, assert_ok}; use move_core_types::{ ident_str, identifier::IdentStr, language_storage::ModuleId, value::MoveTypeLayout, }; -use move_vm_runtime::{RuntimeEnvironment, WithRuntimeEnvironment}; use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID; use once_cell::sync::OnceCell; use proptest::{arbitrary::Arbitrary, collection::vec, prelude::*, proptest, sample::Index}; @@ -833,36 +833,16 @@ impl MockTask { } } -#[derive(Clone)] -pub(crate) struct MockEnvironment { - runtime_environment: RuntimeEnvironment, -} - -impl MockEnvironment { - pub(crate) fn new() -> Self { - Self { - runtime_environment: RuntimeEnvironment::new(vec![]), - } - } -} - -impl WithRuntimeEnvironment for MockEnvironment { - fn runtime_environment(&self) -> &RuntimeEnvironment { - &self.runtime_environment - } -} - impl ExecutorTask for MockTask where K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + Debug + 'static, E: Send + Sync + Debug + Clone + TransactionEvent + 'static, { - type Environment = MockEnvironment; type Error = usize; type Output = MockOutput; type Txn = MockTransaction; - fn init(_env: Self::Environment, _state_view: &impl TStateView) -> Self { + fn init(_environment: AptosEnvironment, _state_view: &impl TStateView) -> Self { Self::new() } diff --git a/aptos-move/block-executor/src/task.rs b/aptos-move/block-executor/src/task.rs index 9767921be647a8..3599420e28c3fd 100644 --- a/aptos-move/block-executor/src/task.rs +++ b/aptos-move/block-executor/src/task.rs @@ -14,13 +14,13 @@ use aptos_types::{ transaction::BlockExecutableTransaction as Transaction, write_set::WriteOp, }; +use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_types::{ module_and_script_storage::code_storage::AptosCodeStorage, module_write_set::ModuleWrite, resolver::{ResourceGroupSize, TExecutorView, TResourceGroupView}, }; use move_core_types::{value::MoveTypeLayout, vm_status::StatusCode}; -use move_vm_runtime::WithRuntimeEnvironment; use std::{ collections::{BTreeMap, HashSet}, fmt::Debug, @@ -64,13 +64,9 @@ pub trait ExecutorTask: Sync { /// Type of error when the executor failed to process a transaction and needs to abort. type Error: Debug + Clone + Send + Sync + Eq + 'static; - /// Type to initialize the single thread transaction executor. Clone and Sync are required because - /// we will create an instance of executor on each individual thread. - type Environment: Sync + Clone + WithRuntimeEnvironment; - /// Create an instance of the transaction executor. fn init( - env: Self::Environment, + environment: AptosEnvironment, state_view: &impl TStateView::Key>, ) -> Self; diff --git a/aptos-move/block-executor/src/unit_tests/mod.rs b/aptos-move/block-executor/src/unit_tests/mod.rs index 0cb4a946dbc272..31a4622a6e1f52 100644 --- a/aptos-move/block-executor/src/unit_tests/mod.rs +++ b/aptos-move/block-executor/src/unit_tests/mod.rs @@ -5,14 +5,14 @@ mod code_cache_tests; use crate::{ - code_cache_global::GlobalModuleCache, + code_cache_global_manager::AptosModuleCacheManagerGuard, errors::SequentialBlockExecutionError, executor::BlockExecutor, proptest_types::{ baseline::BaselineOutput, types::{ - DeltaDataView, KeyType, MockEnvironment, MockEvent, MockIncarnation, MockOutput, - MockTask, MockTransaction, NonEmptyGroupDataView, ValueType, + DeltaDataView, KeyType, MockEvent, MockIncarnation, MockOutput, MockTask, + MockTransaction, NonEmptyGroupDataView, ValueType, }, }, scheduler::{ @@ -87,18 +87,19 @@ fn test_resource_group_deletion() { >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(GlobalModuleCache::empty()), None, ); - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); assert_ok!(block_executor.execute_transactions_sequential( - &env, &transactions, &data_view, + &mut guard, false )); - assert_ok!(block_executor.execute_transactions_parallel(&env, &transactions, &data_view)); + + let mut guard = AptosModuleCacheManagerGuard::none(); + assert_ok!(block_executor.execute_transactions_parallel(&transactions, &data_view, &mut guard)); } #[test] @@ -154,13 +155,13 @@ fn resource_group_bcs_fallback() { >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(GlobalModuleCache::empty()), None, ); // Execute the block normally. - let env = MockEnvironment::new(); - let output = block_executor.execute_transactions_parallel(&env, &transactions, &data_view); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = + block_executor.execute_transactions_parallel(&transactions, &data_view, &mut guard); match output { Ok(block_output) => { let txn_outputs = block_output.into_transaction_outputs_forced(); @@ -178,22 +179,27 @@ fn resource_group_bcs_fallback() { fail::cfg("fail-point-resource-group-serialization", "return()").unwrap(); assert!(!fail::list().is_empty()); - let env = MockEnvironment::new(); - let par_output = block_executor.execute_transactions_parallel(&env, &transactions, &data_view); + let mut guard = AptosModuleCacheManagerGuard::none(); + let par_output = + block_executor.execute_transactions_parallel(&transactions, &data_view, &mut guard); assert_matches!(par_output, Err(())); - let env = MockEnvironment::new(); - let seq_output = - block_executor.execute_transactions_sequential(&env, &transactions, &data_view, false); + let mut guard = AptosModuleCacheManagerGuard::none(); + let seq_output = block_executor.execute_transactions_sequential( + &transactions, + &data_view, + &mut guard, + false, + ); assert_matches!( seq_output, Err(SequentialBlockExecutionError::ResourceGroupSerializationError) ); // Now execute with fallback handling for resource group serialization error: - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); let fallback_output = block_executor - .execute_transactions_sequential(&env, &transactions, &data_view, true) + .execute_transactions_sequential(&transactions, &data_view, &mut guard, true) .map_err(|e| match e { SequentialBlockExecutionError::ResourceGroupSerializationError => { panic!("Unexpected error") @@ -201,8 +207,8 @@ fn resource_group_bcs_fallback() { SequentialBlockExecutionError::ErrorToReturn(err) => err, }); - let env = MockEnvironment::new(); - let fallback_output_block = block_executor.execute_block(env, &transactions, &data_view); + let mut guard = AptosModuleCacheManagerGuard::none(); + let fallback_output_block = block_executor.execute_block(&transactions, &data_view, &mut guard); for output in [fallback_output, fallback_output_block] { match output { Ok(block_output) => { @@ -254,7 +260,6 @@ fn block_output_err_precedence() { >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(GlobalModuleCache::empty()), None, ); @@ -264,8 +269,9 @@ fn block_output_err_precedence() { assert!(!fail::list().is_empty()); // Pause the thread that processes the aborting txn1, so txn2 can halt the scheduler first. // Confirm that the fatal VM error is still detected and sequential fallback triggered. - let env = MockEnvironment::new(); - let output = block_executor.execute_transactions_parallel(&env, &transactions, &data_view); + let mut guard = AptosModuleCacheManagerGuard::none(); + let output = + block_executor.execute_transactions_parallel(&transactions, &data_view, &mut guard); assert_matches!(output, Err(())); scenario.teardown(); } @@ -294,13 +300,12 @@ fn skip_rest_gas_limit() { >::new( BlockExecutorConfig::new_maybe_block_limit(num_cpus::get(), Some(5)), executor_thread_pool, - Arc::new(GlobalModuleCache::empty()), None, ); // Should hit block limit on the skip transaction. - let env = MockEnvironment::new(); - let _ = block_executor.execute_transactions_parallel(&env, &transactions, &data_view); + let mut guard = AptosModuleCacheManagerGuard::none(); + let _ = block_executor.execute_transactions_parallel(&transactions, &data_view, &mut guard); } // TODO: add unit test for block gas limit! @@ -320,7 +325,7 @@ where .unwrap(), ); - let env = MockEnvironment::new(); + let mut guard = AptosModuleCacheManagerGuard::none(); let output = BlockExecutor::< MockTransaction, MockTask, @@ -330,10 +335,9 @@ where >::new( BlockExecutorConfig::new_no_block_limit(num_cpus::get()), executor_thread_pool, - Arc::new(GlobalModuleCache::empty()), None, ) - .execute_transactions_parallel(&env, &transactions, &data_view); + .execute_transactions_parallel(&transactions, &data_view, &mut guard); let baseline = BaselineOutput::generate(&transactions, None); baseline.assert_parallel_output(&output); diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index da4598abf98c54..a6245c55085003 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -15,7 +15,7 @@ use crate::{ use aptos_abstract_gas_usage::CalibrationAlgebra; use aptos_bitvec::BitVec; use aptos_block_executor::{ - code_cache_global_manager::ModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, + code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, }; use aptos_crypto::HashValue; use aptos_framework::ReleaseBundle; @@ -28,9 +28,12 @@ use aptos_types::{ new_block_event_key, AccountResource, CoinInfoResource, CoinStoreResource, ConcurrentSupplyResource, NewBlockEvent, ObjectGroupResource, CORE_CODE_ADDRESS, }, - block_executor::config::{ - BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, - BlockExecutorModuleCacheLocalConfig, + block_executor::{ + config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, + execution_state::TransactionSliceMetadata, }, block_metadata::BlockMetadata, chain_id::ChainId, @@ -648,10 +651,9 @@ impl FakeExecutor { txn_block, &state_view, // Do not use shared module caches in tests. - &ModuleCacheManager::new(), + &AptosModuleCacheManager::new(), config, - None, - None, + TransactionSliceMetadata::unknown(), None, ) .map(BlockOutput::into_transaction_outputs_forced) diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_executor.rs index 38e1b08b4c393e..d2d3f12b2f6870 100644 --- a/execution/executor-benchmark/src/native_executor.rs +++ b/execution/executor-benchmark/src/native_executor.rs @@ -6,12 +6,12 @@ use crate::{ metrics::TIMER, }; use anyhow::Result; -use aptos_crypto::HashValue; use aptos_types::{ account_address::AccountAddress, account_config::{DepositEvent, WithdrawEvent}, block_executor::{ - config::BlockExecutorConfigFromOnchain, partitioner::PartitionedTransactions, + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + partitioner::PartitionedTransactions, }, contract_event::ContractEvent, event::EventKey, @@ -360,8 +360,7 @@ impl VMBlockExecutor for NativeExecutor { transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), _onchain_config: BlockExecutorConfigFromOnchain, - _parent_block: Option<&HashValue>, - _current_block: Option, + _transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { let transaction_outputs = NATIVE_EXECUTOR_POOL .install(|| { diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 45738dd6ca6a53..352f42d93e6433 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -30,7 +30,10 @@ use aptos_storage_interface::{ async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView, DbReaderWriter, }; use aptos_types::{ - block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock}, + block_executor::{ + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + partitioner::ExecutableBlock, + }, ledger_info::LedgerInfoWithSignatures, state_store::StateViewId, }; @@ -237,8 +240,7 @@ where transactions, state_view, onchain_config.clone(), - Some(&parent_block_id), - Some(block_id), + TransactionSliceMetadata::block(parent_block_id, block_id), )? }; diff --git a/execution/executor/src/chunk_executor/mod.rs b/execution/executor/src/chunk_executor/mod.rs index 55c8e005ede28f..b094eb7e5b5c8b 100644 --- a/execution/executor/src/chunk_executor/mod.rs +++ b/execution/executor/src/chunk_executor/mod.rs @@ -28,7 +28,9 @@ use aptos_storage_interface::{ state_delta::StateDelta, DbReaderWriter, }; use aptos_types::{ - block_executor::config::BlockExecutorConfigFromOnchain, + block_executor::{ + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + }, contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures, state_store::StateViewId, @@ -603,8 +605,7 @@ impl ChunkExecutorInner { txns.into(), state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), - None, - None, + TransactionSliceMetadata::chunk(begin_version, end_version), )?; // not `zip_eq`, deliberately for (version, txn_out, txn_info, write_set, events) in multizip(( diff --git a/execution/executor/src/chunk_executor/transaction_chunk.rs b/execution/executor/src/chunk_executor/transaction_chunk.rs index 46d84d4fc1ae19..974203e2a7f0e3 100644 --- a/execution/executor/src/chunk_executor/transaction_chunk.rs +++ b/execution/executor/src/chunk_executor/transaction_chunk.rs @@ -11,7 +11,9 @@ use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_metrics_core::TimerHelper; use aptos_storage_interface::cached_state_view::CachedStateView; use aptos_types::{ - block_executor::config::BlockExecutorConfigFromOnchain, + block_executor::{ + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + }, transaction::{Transaction, TransactionOutput, Version}, }; use aptos_vm::VMBlockExecutor; @@ -88,8 +90,7 @@ impl TransactionChunk for ChunkToExecute { sig_verified_txns.into(), state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), - None, - None, + TransactionSliceMetadata::unknown(), ) } } diff --git a/execution/executor/src/db_bootstrapper/mod.rs b/execution/executor/src/db_bootstrapper/mod.rs index ccb6bc61950143..28118993efcad6 100644 --- a/execution/executor/src/db_bootstrapper/mod.rs +++ b/execution/executor/src/db_bootstrapper/mod.rs @@ -18,7 +18,9 @@ use aptos_storage_interface::{ use aptos_types::{ account_config::CORE_CODE_ADDRESS, aggregate_signature::AggregateSignature, - block_executor::config::BlockExecutorConfigFromOnchain, + block_executor::{ + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + }, block_info::{BlockInfo, GENESIS_EPOCH, GENESIS_ROUND, GENESIS_TIMESTAMP_USECS}, ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, on_chain_config::ConfigurationResource, @@ -136,8 +138,7 @@ pub fn calculate_genesis( vec![genesis_txn.clone().into()].into(), base_state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), - None, - None, + TransactionSliceMetadata::unknown(), )?; ensure!( execution_output.num_transactions_to_commit() != 0, diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index d73e272557dabb..c9877f7dd04cba 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -9,7 +9,8 @@ use aptos_executor_types::BlockExecutorTrait; use aptos_storage_interface::{chunk_to_commit::ChunkToCommit, DbReader, DbReaderWriter, DbWriter}; use aptos_types::{ block_executor::{ - config::BlockExecutorConfigFromOnchain, partitioner::PartitionedTransactions, + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + partitioner::PartitionedTransactions, }, ledger_info::LedgerInfoWithSignatures, state_store::StateView, @@ -79,8 +80,7 @@ impl VMBlockExecutor for FakeVM { _transactions: &[SignatureVerifiedTransaction], _state_view: &impl StateView, _onchain_config: BlockExecutorConfigFromOnchain, - _parent_block: Option<&HashValue>, - _current_block: Option, + _transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { Ok(BlockOutput::new(vec![], None)) } diff --git a/execution/executor/src/tests/mock_vm/mod.rs b/execution/executor/src/tests/mock_vm/mod.rs index 701c2e1d332fcc..d7281498bba626 100644 --- a/execution/executor/src/tests/mock_vm/mod.rs +++ b/execution/executor/src/tests/mock_vm/mod.rs @@ -6,12 +6,13 @@ mod mock_vm_test; use anyhow::Result; -use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue, PrivateKey, Uniform}; +use aptos_crypto::{ed25519::Ed25519PrivateKey, PrivateKey, Uniform}; use aptos_types::{ account_address::AccountAddress, account_config::NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG, block_executor::{ - config::BlockExecutorConfigFromOnchain, partitioner::PartitionedTransactions, + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + partitioner::PartitionedTransactions, }, bytes::NumToBytes, chain_id::ChainId, @@ -67,8 +68,7 @@ impl VMBlockExecutor for MockVM { transactions: &[SignatureVerifiedTransaction], state_view: &impl StateView, _onchain_config: BlockExecutorConfigFromOnchain, - _parent_block: Option<&HashValue>, - _current_block: Option, + _transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { // output_cache is used to store the output of transactions so they are visible to later // transactions. diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index 366c891ff17605..17b953f6d459ae 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -18,7 +18,9 @@ use aptos_storage_interface::{ use aptos_types::{ account_address::AccountAddress, aggregate_signature::AggregateSignature, - block_executor::config::BlockExecutorConfigFromOnchain, + block_executor::{ + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + }, block_info::BlockInfo, bytes::NumToBytes, chain_id::ChainId, @@ -689,8 +691,7 @@ fn run_transactions_naive( ) .unwrap(), block_executor_onchain_config.clone(), - None, - None, + TransactionSliceMetadata::unknown(), ) .unwrap(); let output = ApplyExecutionOutput::run(out, &ledger_view).unwrap(); diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 5d2e962dff9890..f002b6311d1288 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -22,6 +22,7 @@ use aptos_storage_interface::cached_state_view::{CachedStateView, StateCache}; use aptos_types::{ block_executor::{ config::BlockExecutorConfigFromOnchain, + execution_state::TransactionSliceMetadata, partitioner::{ExecutableTransactions, PartitionedTransactions}, }, contract_event::ContractEvent, @@ -44,14 +45,12 @@ use std::{iter, sync::Arc}; pub struct DoGetExecutionOutput; impl DoGetExecutionOutput { - // Note: state checkpoint will be appended in when the current block is Some(..). pub fn by_transaction_execution( executor: &V, transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, ) -> Result { let out = match transactions { ExecutableTransactions::Unsharded(txns) => { @@ -60,15 +59,14 @@ impl DoGetExecutionOutput { txns, state_view, onchain_config, - parent_block, - current_block, + transaction_slice_metadata, )? }, ExecutableTransactions::Sharded(txns) => Self::by_transaction_execution_sharded::( txns, state_view, onchain_config, - current_block, + transaction_slice_metadata.append_state_checkpoint_to_block(), )?, }; @@ -92,16 +90,16 @@ impl DoGetExecutionOutput { transactions: Vec, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, ) -> Result { + let append_state_checkpoint_to_block = + transaction_slice_metadata.append_state_checkpoint_to_block(); let block_output = Self::execute_block::( executor, &transactions, &state_view, onchain_config, - parent_block, - current_block, + transaction_slice_metadata, )?; let (transaction_outputs, block_end_info) = block_output.into_inner(); @@ -111,7 +109,7 @@ impl DoGetExecutionOutput { transaction_outputs, state_view.into_state_cache(), block_end_info, - current_block, + append_state_checkpoint_to_block, ) } @@ -212,16 +210,14 @@ impl DoGetExecutionOutput { transactions: &[SignatureVerifiedTransaction], state_view: &CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, ) -> Result> { let _timer = OTHER_TIMERS.timer_with(&["vm_execute_block"]); Ok(executor.execute_block( transactions, state_view, onchain_config, - parent_block, - current_block, + transaction_slice_metadata, )?) } @@ -235,8 +231,7 @@ impl DoGetExecutionOutput { transactions: &[SignatureVerifiedTransaction], state_view: &CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, - parent_block: Option<&HashValue>, - current_block: Option, + transaction_slice_metadata: TransactionSliceMetadata, ) -> Result> { use aptos_types::{ state_store::{StateViewId, TStateView}, @@ -250,8 +245,7 @@ impl DoGetExecutionOutput { transactions, state_view, onchain_config, - parent_block, - current_block, + transaction_slice_metadata, )?, _ => BlockOutput::new( transactions diff --git a/experimental/execution/ptx-executor/Cargo.toml b/experimental/execution/ptx-executor/Cargo.toml index 4ed757a8bba816..0da896d31500a3 100644 --- a/experimental/execution/ptx-executor/Cargo.toml +++ b/experimental/execution/ptx-executor/Cargo.toml @@ -13,7 +13,6 @@ repository = { workspace = true } rust-version = { workspace = true } [dependencies] -aptos-crypto = { workspace = true } aptos-experimental-runtimes = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } diff --git a/experimental/execution/ptx-executor/src/lib.rs b/experimental/execution/ptx-executor/src/lib.rs index 649c0651af63ac..8c0d24104748f5 100644 --- a/experimental/execution/ptx-executor/src/lib.rs +++ b/experimental/execution/ptx-executor/src/lib.rs @@ -21,13 +21,13 @@ use crate::{ analyzer::PtxAnalyzer, finalizer::PtxFinalizer, metrics::TIMER, runner::PtxRunner, scheduler::PtxScheduler, sorter::PtxSorter, state_reader::PtxStateReader, }; -use aptos_crypto::HashValue; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_infallible::Mutex; use aptos_metrics_core::TimerHelper; use aptos_types::{ block_executor::{ - config::BlockExecutorConfigFromOnchain, partitioner::PartitionedTransactions, + config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, + partitioner::PartitionedTransactions, }, state_store::StateView, transaction::{ @@ -54,8 +54,7 @@ impl VMBlockExecutor for PtxBlockExecutor { transactions: &[SignatureVerifiedTransaction], state_view: &(impl StateView + Sync), _onchain_config: BlockExecutorConfigFromOnchain, - _parent_block: Option<&HashValue>, - _current_block: Option, + _transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { let _timer = TIMER.timer_with(&["block_total"]); diff --git a/third_party/move/move-vm/runtime/src/storage/environment.rs b/third_party/move/move-vm/runtime/src/storage/environment.rs index bc152dd1cc06b7..a8bd47ca7d56b4 100644 --- a/third_party/move/move-vm/runtime/src/storage/environment.rs +++ b/third_party/move/move-vm/runtime/src/storage/environment.rs @@ -25,10 +25,9 @@ use move_core_types::{ identifier::{IdentStr, Identifier}, vm_status::{sub_status::unknown_invariant_violation::EPARANOID_FAILURE, StatusCode}, }; -use move_vm_types::{ - loaded_data::runtime_types::{StructIdentifier, StructNameIndex}, - sha3_256, -}; +#[cfg(any(test, feature = "testing"))] +use move_vm_types::loaded_data::runtime_types::{StructIdentifier, StructNameIndex}; +use move_vm_types::sha3_256; use std::sync::Arc; /// [MoveVM] runtime environment encapsulating different configurations. Shared between the VM and diff --git a/types/src/block_executor/execution_state.rs b/types/src/block_executor/execution_state.rs new file mode 100644 index 00000000000000..a06aa8391cab24 --- /dev/null +++ b/types/src/block_executor/execution_state.rs @@ -0,0 +1,107 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction::Version; +use aptos_crypto::HashValue; + +/// Specifies the kind of transactions for the block executor. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum TransactionSliceMetadata { + /// Block execution. Specifies the parent (executed) block, and the child (to be executed) + /// block. + Block { parent: HashValue, child: HashValue }, + /// Chunk execution, e.g., state sync or replay. Specifies the start and the end versions of + /// transaction slice. + Chunk { begin: Version, end: Version }, + /// The origin of transactions is not known, e.g., running a test. + Unknown, +} + +impl TransactionSliceMetadata { + pub fn unknown() -> Self { + Self::Unknown + } + + pub fn block(parent: HashValue, child: HashValue) -> Self { + Self::Block { parent, child } + } + + #[cfg(any(test, feature = "testing"))] + pub fn block_from_u64(parent: u64, child: u64) -> Self { + Self::Block { + parent: HashValue::from_u64(parent), + child: HashValue::from_u64(child), + } + } + + pub fn chunk(begin: Version, end: Version) -> Self { + Self::Chunk { begin, end } + } + + /// Returns the hash of the block where to append the state checkpoint (i.e., the current hash + /// of [TransactionSliceMetadata::Block]). For other variants, returns [None]. + pub fn append_state_checkpoint_to_block(&self) -> Option { + use TransactionSliceMetadata::*; + + match self { + Unknown => None, + Block { child, .. } => Some(*child), + Chunk { .. } => None, + } + } + + /// Returns true if transaction slice immediately follows the previous one. That is, if: + /// 1. Both are [TransactionSliceMetadata::Block] and the previous child is equal to the + /// current parent. + /// 2. Both are [TransactionSliceMetadata::Chunk] and the previous end version is equal to + /// the current start version. + pub fn is_immediately_after(&self, previous: &TransactionSliceMetadata) -> bool { + use TransactionSliceMetadata::*; + + match (previous, self) { + (Unknown, Unknown) + | (Unknown, Block { .. }) + | (Unknown, Chunk { .. }) + | (Block { .. }, Unknown) + | (Block { .. }, Chunk { .. }) + | (Chunk { .. }, Unknown) + | (Chunk { .. }, Block { .. }) => false, + (Block { child, .. }, Block { parent, .. }) => parent == child, + (Chunk { end, .. }, Chunk { begin, .. }) => begin == end, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_append_state_checkpoint_to_block() { + assert!(TransactionSliceMetadata::unknown() + .append_state_checkpoint_to_block() + .is_none()); + assert!(TransactionSliceMetadata::chunk(1, 2) + .append_state_checkpoint_to_block() + .is_none()); + + let parent = HashValue::from_u64(2); + let child = HashValue::from_u64(3); + let execution_state = TransactionSliceMetadata::block(parent, child); + assert_eq!( + execution_state.append_state_checkpoint_to_block(), + Some(child) + ); + } + + #[test] + fn test_is_immediately_after() { + let fst = TransactionSliceMetadata::block(HashValue::from_u64(2), HashValue::from_u64(3)); + let snd = TransactionSliceMetadata::block(HashValue::from_u64(3), HashValue::from_u64(4)); + assert!(snd.is_immediately_after(&fst)); + + let fst = TransactionSliceMetadata::block(HashValue::from_u64(2), HashValue::from_u64(3)); + let snd = TransactionSliceMetadata::block(HashValue::from_u64(4), HashValue::from_u64(5)); + assert!(!snd.is_immediately_after(&fst)); + } +} diff --git a/types/src/block_executor/mod.rs b/types/src/block_executor/mod.rs index ada88a8d0192ce..fb0081dacd7ca9 100644 --- a/types/src/block_executor/mod.rs +++ b/types/src/block_executor/mod.rs @@ -3,4 +3,5 @@ // SPDX-License-Identifier: Apache-2.0 pub mod config; +pub mod execution_state; pub mod partitioner;