diff --git a/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs b/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs index 9a408882901998..64627c634797d8 100644 --- a/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs +++ b/aptos-move/aptos-vm-types/src/module_and_script_storage/state_view_adapter.rs @@ -5,7 +5,7 @@ use crate::module_and_script_storage::module_storage::AptosModuleStorage; use ambassador::Delegate; use aptos_types::{ error::PanicError, - state_store::{state_key::StateKey, state_value::StateValueMetadata, StateView}, + state_store::{state_key::StateKey, state_value::StateValueMetadata, StateView, TStateView}, vm::modules::AptosModuleExtension, }; use bytes::Bytes; @@ -28,7 +28,7 @@ use move_vm_types::{ code::{ModuleBytesStorage, ModuleCode}, module_storage_error, }; -use std::sync::Arc; +use std::{ops::Deref, sync::Arc}; /// Avoids orphan rule to implement [ModuleBytesStorage] for [StateView]. struct StateViewAdapter<'s, S> { @@ -48,6 +48,14 @@ impl<'s, S: StateView> ModuleBytesStorage for StateViewAdapter<'s, S> { } } +impl<'s, S: StateView> Deref for StateViewAdapter<'s, S> { + type Target = S; + + fn deref(&self) -> &Self::Target { + &self.state_view + } +} + /// A (not thread-safe) implementation of code storage on top of a state view. It is never built /// directly by clients - only via [AsAptosCodeStorage] trait. Can be used to resolve both modules /// and cached scripts. @@ -84,7 +92,7 @@ impl<'s, S: StateView, E: WithRuntimeEnvironment> AptosCodeStorageAdapter<'s, S, } /// Drains cached verified modules from the code storage, transforming them into format used by - /// global caches. Should only be called when the code storage borrows [StateView]. + /// global caches. pub fn into_verified_module_code_iter( self, ) -> Result< @@ -96,47 +104,42 @@ impl<'s, S: StateView, E: WithRuntimeEnvironment> AptosCodeStorageAdapter<'s, S, >, PanicError, > { - let state_view = match self.storage.module_storage().byte_storage().state_view { - BorrowedOrOwned::Borrowed(state_view) => state_view, - BorrowedOrOwned::Owned(_) => { - return Err(PanicError::CodeInvariantError( - "Verified modules should only be extracted from borrowed state".to_string(), - )) - }, - }; - - let mut modules_to_add = vec![]; - for (key, verified_code) in self + let (state_view, verified_modules_iter) = self .storage .into_module_storage() - .into_verified_modules_iter() - { - // We have cached the module previously, so we must be able to find it in storage. - let extension = state_view - .get_state_value(&StateKey::module_id(&key)) - .map_err(|err| { - let msg = format!( - "Failed to retrieve module {}::{} from storage {:?}", - key.address(), - key.name(), - err - ); - PanicError::CodeInvariantError(msg) - })? - .map(AptosModuleExtension::new) - .ok_or_else(|| { - let msg = format!( - "Module {}::{} should exist, but it does not anymore", - key.address(), - key.name() - ); - PanicError::CodeInvariantError(msg) - })?; - - let module = ModuleCode::from_verified_ref(verified_code, Arc::new(extension)); - modules_to_add.push((key, Arc::new(module))) - } - Ok(modules_to_add.into_iter()) + .unpack_into_verified_modules_iter(); + + Ok(verified_modules_iter + .map(|(key, verified_code)| { + // We have cached the module previously, so we must be able to find it in storage. + let extension = state_view + .get_state_value(&StateKey::module_id(&key)) + .map_err(|err| { + let msg = format!( + "Failed to retrieve module {}::{} from storage {:?}", + key.address(), + key.name(), + err + ); + PanicError::CodeInvariantError(msg) + })? + .map_or_else( + || { + let msg = format!( + "Module {}::{} should exist, but it does not anymore", + key.address(), + key.name() + ); + Err(PanicError::CodeInvariantError(msg)) + }, + |state_value| Ok(AptosModuleExtension::new(state_value)), + )?; + + let module = ModuleCode::from_arced_verified(verified_code, Arc::new(extension)); + Ok((key, Arc::new(module))) + }) + .collect::, PanicError>>()? + .into_iter()) } } diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 9f2f7a546f1dc5..68877952e80eec 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -39,8 +39,11 @@ use aptos_vm_logging::{ alert, flush_speculative_logs, init_speculative_logs, prelude::CRITICAL_ERRORS, }; use aptos_vm_types::{ - abstract_write_op::AbstractResourceWriteOp, module_and_script_storage::AsAptosCodeStorage, - module_write_set::ModuleWrite, output::VMOutput, resolver::ResourceGroupSize, + abstract_write_op::AbstractResourceWriteOp, + module_and_script_storage::{AptosCodeStorageAdapter, AsAptosCodeStorage}, + module_write_set::ModuleWrite, + output::VMOutput, + resolver::ResourceGroupSize, }; use move_binary_format::{ errors::{Location, VMError}, @@ -454,26 +457,23 @@ impl BlockAptosVM { let struct_name_index_map_size = runtime_environment .struct_name_index_map_size() .map_err(|err| err.finish(Location::Undefined).into_vm_status())?; - if struct_name_index_map_size > module_cache_config.max_struct_name_index_map_size { - module_cache.flush_unchecked(); + if struct_name_index_map_size > module_cache_config.max_struct_name_index_map_num_entries { + module_cache.flush_unsync(); runtime_environment.flush_struct_name_and_info_caches(); } // Check 2: If the module cache is too big, flush it. - if module_cache - .size_in_bytes_is_greater_than(module_cache_config.max_module_cache_size_in_bytes) - { - module_cache.flush_unchecked(); + if module_cache.size_in_bytes() > module_cache_config.max_module_cache_size_in_bytes { + module_cache.flush_unsync(); } // Finally, to avoid cold starts, fetch the framework code prior to block execution. if module_cache.num_modules() == 0 && module_cache_config.prefetch_framework_code { - prefetch_aptos_framework(environment.clone(), state_view, &module_cache).map_err( - |err| { - alert!("Failed to load Aptos framework to module cache: {:?}", err); - VMError::from(err).into_vm_status() - }, - )?; + let code_storage = state_view.as_aptos_code_storage(environment.clone()); + prefetch_aptos_framework(code_storage, &module_cache).map_err(|err| { + alert!("Failed to load Aptos framework to module cache: {:?}", err); + VMError::from(err).into_vm_status() + })?; } let executor = BlockExecutor::< @@ -491,8 +491,6 @@ impl BlockAptosVM { let ret = executor.execute_block(environment, signature_verified_block, state_view); if !module_cache_manager.mark_done() { - // Something is wrong as we were not able to mark execution as done. Return an - // error. return Err(VMStatus::error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, Some("Unable to mark block execution as done".to_string()), @@ -560,13 +558,10 @@ impl BlockAptosVM { /// If Aptos framework exists, loads "transaction_validation.move" and all its transitive /// dependencies from storage into provided module cache. If loading fails for any reason, a panic /// error is returned. -fn prefetch_aptos_framework( - environment: AptosEnvironment, - state_view: &impl StateView, +fn prefetch_aptos_framework( + code_storage: AptosCodeStorageAdapter, module_cache: &GlobalModuleCache, ) -> Result<(), PanicError> { - let code_storage = state_view.as_aptos_code_storage(environment); - // If framework code exists in storage, the transitive closure will be verified and cached. let maybe_loaded = code_storage .fetch_verified_module(&AccountAddress::ONE, ident_str!("transaction_validation")) @@ -576,13 +571,11 @@ fn prefetch_aptos_framework( PanicError::CodeInvariantError(format!("Unable to fetch Aptos framework: {:?}", err)) })?; - if let Some(module) = maybe_loaded { - drop(module); - + if maybe_loaded.is_some() { // Framework must have been loaded. Drain verified modules from local cache into // global cache. let verified_module_code_iter = code_storage.into_verified_module_code_iter()?; - module_cache.insert_verified_unchecked(verified_module_code_iter)?; + module_cache.insert_verified_unsync(verified_module_code_iter)?; } Ok(()) } @@ -598,10 +591,12 @@ mod test { let state_view = executor.get_state_view(); let environment = AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view); + let code_storage = state_view.as_aptos_code_storage(environment); + let module_cache = GlobalModuleCache::empty(); assert_eq!(module_cache.num_modules(), 0); - let result = prefetch_aptos_framework(environment, state_view, &module_cache); + let result = prefetch_aptos_framework(code_storage, &module_cache); assert!(result.is_ok()); assert!(module_cache.num_modules() > 0); } @@ -612,10 +607,12 @@ mod test { let environment = AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view); + let code_storage = state_view.as_aptos_code_storage(environment); + let module_cache = GlobalModuleCache::empty(); assert_eq!(module_cache.num_modules(), 0); - let result = prefetch_aptos_framework(environment, &state_view, &module_cache); + let result = prefetch_aptos_framework(code_storage, &module_cache); assert!(result.is_ok()); assert_eq!(module_cache.num_modules(), 0); } diff --git a/aptos-move/aptos-vm/tests/sharded_block_executor.rs b/aptos-move/aptos-vm/tests/sharded_block_executor.rs index da872b40493675..1dcfbea2dd0f93 100644 --- a/aptos-move/aptos-vm/tests/sharded_block_executor.rs +++ b/aptos-move/aptos-vm/tests/sharded_block_executor.rs @@ -308,11 +308,11 @@ mod test_utils { .into_iter() .map(|t| t.into_txn()) .collect(); - let executor = AptosVMBlockExecutor::new(); - if let Some(module_cache_manager) = executor.module_cache_manager() { + let block_executor = AptosVMBlockExecutor::new(); + if let Some(module_cache_manager) = block_executor.module_cache_manager() { module_cache_manager.mark_ready(None, None); } - let unsharded_txn_output = executor + let unsharded_txn_output = block_executor .execute_block_no_limit(&ordered_txns, executor.data_store()) .unwrap(); compare_txn_outputs(unsharded_txn_output, sharded_txn_output); @@ -362,11 +362,11 @@ mod test_utils { ) .unwrap(); - let executor = AptosVMBlockExecutor::new(); - if let Some(module_cache_manager) = executor.module_cache_manager() { + let block_executor = AptosVMBlockExecutor::new(); + if let Some(module_cache_manager) = block_executor.module_cache_manager() { module_cache_manager.mark_ready(None, None); } - let unsharded_txn_output = executor + let unsharded_txn_output = block_executor .execute_block_no_limit(&execution_ordered_txns, executor.data_store()) .unwrap(); compare_txn_outputs(unsharded_txn_output, sharded_txn_output); @@ -420,11 +420,11 @@ mod test_utils { ) .unwrap(); - let executor = AptosVMBlockExecutor::new(); - if let Some(module_cache_manager) = executor.module_cache_manager() { + let block_executor = AptosVMBlockExecutor::new(); + if let Some(module_cache_manager) = block_executor.module_cache_manager() { module_cache_manager.mark_ready(None, None); } - let unsharded_txn_output = executor + let unsharded_txn_output = block_executor .execute_block_no_limit(&execution_ordered_txns, executor.data_store()) .unwrap(); compare_txn_outputs(unsharded_txn_output, sharded_txn_output); diff --git a/aptos-move/block-executor/src/captured_reads.rs b/aptos-move/block-executor/src/captured_reads.rs index c1214baa633db8..834becdcd4c948 100644 --- a/aptos-move/block-executor/src/captured_reads.rs +++ b/aptos-move/block-executor/src/captured_reads.rs @@ -295,11 +295,11 @@ impl DelayedFieldRead { /// Represents a module read, either from immutable cross-block cache, or from code [SyncCodeCache] /// used by block executor (per-block cache). This way, when transaction needs to read a module /// from [SyncCodeCache] it can first check the read-set here. -enum ModuleRead { +enum ModuleRead { /// Read from the cross-block module cache. GlobalCache, /// Read from per-block cache ([SyncCodeCache]) used by parallel execution. - PerBlockCache(Option<(Arc>, V)>), + PerBlockCache(Option<(Arc>, Option)>), } /// Represents a result of a read from [CapturedReads] when they are used as the transaction-level @@ -326,7 +326,7 @@ pub(crate) struct CapturedReads { #[deprecated] pub(crate) deprecated_module_reads: Vec, - module_reads: hashbrown::HashMap>>, + module_reads: hashbrown::HashMap>, /// If there is a speculative failure (e.g. delta application failure, or an observed /// inconsistency), the transaction output is irrelevant (must be discarded and transaction @@ -1578,12 +1578,12 @@ mod test { assert!(!valid); // Without invalid module (and if it is not captured), validation should pass. - global_module_cache.remove(&1); + assert!(global_module_cache.remove(&1)); captured_reads.module_reads.remove(&1); assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache)); // Validation fails if we captured a cross-block module which does not exist anymore. - global_module_cache.remove(&0); + assert!(global_module_cache.remove(&0)); let valid = captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache); assert!(!valid); diff --git a/aptos-move/block-executor/src/code_cache_global.rs b/aptos-move/block-executor/src/code_cache_global.rs index d8fa927a600084..fef08f5f5c9406 100644 --- a/aptos-move/block-executor/src/code_cache_global.rs +++ b/aptos-move/block-executor/src/code_cache_global.rs @@ -3,7 +3,6 @@ use crate::explicit_sync_wrapper::ExplicitSyncWrapper; use aptos_types::error::PanicError; -use crossbeam::utils::CachePadded; use hashbrown::HashMap; use move_vm_types::code::{ModuleCode, WithSize}; use std::{ @@ -16,22 +15,22 @@ use std::{ }; /// Entry stored in [GlobalModuleCache]. -struct Entry { +struct Entry { /// True if this code is "valid" within the block execution context (i.e., there has been no /// republishing of this module so far). If false, executor needs to read the module from the /// sync/unsync module caches. - valid: CachePadded, + valid: AtomicBool, /// Cached verified module. Must always be verified. - module: CachePadded>>, + module: Arc>, } -impl Entry +impl Entry where - VC: Deref>, - E: WithSize, + Verified: Deref>, + Extension: WithSize, { /// Returns a new valid module. Returns a (panic) error if the module is not verified. - fn new(module: Arc>) -> Result { + fn new(module: Arc>) -> Result { if !module.code().is_verified() { return Err(PanicError::CodeInvariantError( "Module code is not verified".to_string(), @@ -39,8 +38,8 @@ where } Ok(Self { - valid: CachePadded::new(AtomicBool::new(true)), - module: CachePadded::new(module), + valid: AtomicBool::new(true), + module, }) } @@ -55,24 +54,24 @@ where } /// Returns the module code stored is this [Entry]. - fn module_code(&self) -> &Arc> { - self.module.deref() + fn module_code(&self) -> &Arc> { + &self.module } } -/// A read-only module cache for verified code, that can be accessed concurrently within the block. -/// Can only be modified safely at block boundaries. -pub struct GlobalModuleCache { +/// A global module cache for verified code that is read-only and concurrently accessed during the +/// block execution. Modified safely only at block boundaries. +pub struct GlobalModuleCache { /// Module cache containing the verified code. - module_cache: ExplicitSyncWrapper>>, + module_cache: ExplicitSyncWrapper>>, /// Sum of serialized sizes (in bytes) of all cached modules. size: AtomicUsize, } -impl GlobalModuleCache +impl GlobalModuleCache where K: Hash + Eq + Clone, - VC: Deref>, + V: Deref>, E: WithSize, { /// Returns new empty module cache. @@ -92,27 +91,21 @@ where } /// Marks the cached module (if it exists) as invalid. As a result, all subsequent calls to the - /// cache for the associated key will result in a cache miss. If an entry does not to exist, is - /// a no-op. + /// cache for the associated key will result in a cache miss. If an entry does not to exist, it + /// is a no-op. pub fn mark_invalid_if_contains(&self, key: &K) { if let Some(entry) = self.module_cache.acquire().get(key) { entry.mark_invalid(); } } - /// Returns the module stored in cache. If the module has not been cached, or it exists but it - /// is not valid, [None] is returned. - pub fn get(&self, key: &K) -> Option>> { + /// Returns the module stored in cache. If the module has not been cached, or it exists but is + /// not valid, [None] is returned. + pub fn get(&self, key: &K) -> Option>> { self.module_cache .acquire() .get(key) - .and_then(|entry| entry.is_valid().then(|| entry.module_code().clone())) - } - - /// Flushes the cache. Should never be called throughout block-execution. Use with caution. - pub fn flush_unchecked(&self) { - self.module_cache.acquire().clear(); - self.size.store(0, Ordering::Relaxed); + .and_then(|entry| entry.is_valid().then(|| Arc::clone(entry.module_code()))) } /// Returns the number of entries in the cache. @@ -125,22 +118,24 @@ where self.size.load(Ordering::Relaxed) } - /// Returns true if the sum of serialized sizes of modules stored in cache is greater than the - /// specified value. - pub fn size_in_bytes_is_greater_than(&self, size: usize) -> bool { - self.size_in_bytes() > size + /// **Use with caution: should never be called during block execution.** + /// + /// Flushes the module cache. + pub fn flush_unsync(&self) { + self.module_cache.acquire().clear(); + self.size.store(0, Ordering::Relaxed); } - /// Inserts modules into the cache. Should never be called throughout block-execution. Use with - /// caution. + /// **Use with caution: should never be called during block execution.** /// + /// Inserts modules into the cache. /// Notes: /// 1. Only verified modules are inserted. /// 2. Valid modules should not be removed, and new modules should have unique ownership. If /// these constraints are violated, a panic error is returned. - pub fn insert_verified_unchecked( + pub fn insert_verified_unsync( &self, - modules: impl Iterator>)>, + modules: impl Iterator>)>, ) -> Result<(), PanicError> { use hashbrown::hash_map::Entry::*; @@ -177,7 +172,7 @@ where /// Insert the module to cache. Used for tests only. #[cfg(any(test, feature = "testing"))] - pub fn insert(&self, key: K, module: Arc>) { + pub fn insert(&self, key: K, module: Arc>) { self.size .fetch_add(module.extension().size_in_bytes(), Ordering::Relaxed); self.module_cache.acquire().insert( @@ -186,14 +181,18 @@ where ); } - /// Removes the module from cache. Used for tests only. + /// Removes the module from cache and returns true. If the module does not exist for the + /// associated key, returns false. Used for tests only. #[cfg(any(test, feature = "testing"))] - pub fn remove(&self, key: &K) { + pub fn remove(&self, key: &K) -> bool { if let Some(entry) = self.module_cache.acquire().remove(key) { self.size.fetch_sub( entry.module_code().extension().size_in_bytes(), Ordering::Relaxed, ); + true + } else { + false } } } @@ -251,15 +250,11 @@ mod test { assert_eq!(cache.num_modules(), 3); assert_eq!(cache.size_in_bytes(), 32); - cache.remove(&2); + assert!(cache.remove(&2)); assert_eq!(cache.num_modules(), 2); assert_eq!(cache.size_in_bytes(), 24); - assert!(cache.size_in_bytes_is_greater_than(23)); - assert!(!cache.size_in_bytes_is_greater_than(24)); - assert!(!cache.size_in_bytes_is_greater_than(25)); - - cache.flush_unchecked(); + cache.flush_unsync(); assert_eq!(cache.num_modules(), 0); assert_eq!(cache.size_in_bytes(), 0); } @@ -273,7 +268,7 @@ mod test { new_modules.push((i, mock_verified_code(i, MockExtension::new(8)))); } assert!(cache - .insert_verified_unchecked(new_modules.into_iter()) + .insert_verified_unsync(new_modules.into_iter()) .is_ok()); assert_eq!(cache.num_modules(), 10); @@ -285,7 +280,7 @@ mod test { let cache = GlobalModuleCache::empty(); let deserialized_modules = vec![(0, mock_deserialized_code(0, MockExtension::new(8)))]; - assert_ok!(cache.insert_verified_unchecked(deserialized_modules.into_iter())); + assert_ok!(cache.insert_verified_unsync(deserialized_modules.into_iter())); assert_eq!(cache.num_modules(), 0); assert_eq!(cache.size_in_bytes(), 0); @@ -300,7 +295,7 @@ mod test { assert_eq!(cache.size_in_bytes(), 8); let new_modules = vec![(0, mock_verified_code(100, MockExtension::new(32)))]; - assert_err!(cache.insert_verified_unchecked(new_modules.into_iter())); + assert_err!(cache.insert_verified_unsync(new_modules.into_iter())); } #[test] @@ -313,7 +308,7 @@ mod test { assert_eq!(cache.size_in_bytes(), 8); let new_modules = vec![(0, mock_verified_code(100, MockExtension::new(32)))]; - assert_ok!(cache.insert_verified_unchecked(new_modules.into_iter())); + assert_ok!(cache.insert_verified_unsync(new_modules.into_iter())); assert_eq!(cache.num_modules(), 1); assert_eq!(cache.size_in_bytes(), 32); diff --git a/aptos-move/block-executor/src/code_cache_global_manager.rs b/aptos-move/block-executor/src/code_cache_global_manager.rs index 7fabddeb4242e3..5793feab4c8b0b 100644 --- a/aptos-move/block-executor/src/code_cache_global_manager.rs +++ b/aptos-move/block-executor/src/code_cache_global_manager.rs @@ -10,6 +10,7 @@ use parking_lot::Mutex; use std::{ fmt::Debug, hash::Hash, + mem, ops::{Deref, DerefMut}, sync::Arc, }; @@ -32,6 +33,13 @@ macro_rules! alert_or_println { /// 2. [State::Ready] --> [State::Executing]. /// 3. [State::Executing] --> [State::Done]. /// 4. [State::Done] --> [State::Ready]. +/// The optional value stored in variants is propagated during state transitions. When a full cycle +/// is reached (just before [State::Done] to [State::Ready] transition), the user can check if the +/// value is expected and continue with a new one. For instance: +/// ```text +/// Ready(Some(0)) --> Executing(Some(0)) --> Done(Some(0)) --> Ready(Some(1)) is allowed. +/// Ready(Some(0)) --> Executing(Some(0)) --> Done(Some(0)) --> Ready(Some(2)) is not allowed. +/// ``` #[derive(Clone, Debug, Eq, PartialEq)] enum State { Ready(Option), @@ -39,47 +47,6 @@ enum State { Done(Option), } -impl State { - /// If the state is [State::Ready], returns its value. Otherwise, returns [None]. - fn value_from_ready(&self) -> Option> { - match self { - State::Ready(v) => Some(v.clone()), - _ => None, - } - } - - /// If the state is [State::Executing], returns its value. Otherwise, returns [None]. - fn value_from_executing(&self) -> Option> { - match self { - State::Executing(v) => Some(v.clone()), - _ => None, - } - } - - /// If the state is [State::Done], returns its value. Otherwise, returns [None]. - fn value_from_done(&self) -> Option> { - match self { - State::Done(v) => Some(v.clone()), - _ => None, - } - } - - /// Sets the current state to [State::Ready]. - fn set_ready(&mut self, value: Option) { - *self = Self::Ready(value); - } - - /// Sets the current state to [State::Executing]. - fn set_executing(&mut self, value: Option) { - *self = Self::Executing(value); - } - - /// Sets the current state to [State::Done]. - fn set_done(&mut self, value: Option) { - *self = Self::Done(value); - } -} - /// Manages module caches and the execution environment, possible across multiple blocks. pub struct ModuleCacheManager { /// The state of global caches. @@ -96,7 +63,7 @@ pub struct ModuleCacheManager { impl ModuleCacheManager where - T: Clone + Debug + Eq, + T: Debug + Eq, K: Hash + Eq + Clone, VC: Deref>, E: WithSize, @@ -118,37 +85,35 @@ where pub fn mark_ready(&self, previous: Option<&T>, current: Option) -> bool { let mut state = self.state.lock(); - let recorded_previous = state.value_from_done(); - match (recorded_previous, previous) { - (None, _) => { - // We are not in the done state, this is an error. - alert_or_println!( - "Unable to mark ready, state: {:?}, previous: {:?}, current: {:?}", - state, - previous, - current - ); - false - }, - (Some(Some(recorded_previous)), Some(previous)) if recorded_previous.eq(previous) => { - // We are in done state with matching values. Can mark ready. - state.set_ready(current); - true - }, - _ => { - // If the state is done, but the values do not exist or do not match, we still set - // the state as ready, but also flush global caches because they execute on top of - // unknown state (or on top of some different to previous state). - self.module_cache.flush_unchecked(); + if let State::Done(recorded_previous) = state.deref() { + // If the state is done, but the values do not exist or do not match, we flush global + // caches because they execute on top of unknown state (or on top of some different to + // the previous state). + if !recorded_previous + .as_ref() + .is_some_and(|r| previous.is_some_and(|p| r == p)) + { if let Some(environment) = self.environment.acquire().as_ref() { environment .runtime_environment() .flush_struct_name_and_info_caches(); + self.module_cache.flush_unsync(); + } else { + debug_assert!(self.module_cache.num_modules() == 0); } + } - state.set_ready(current); - true - }, + *state = State::Ready(current); + true + } else { + // We are not in the done state, this is an error. + alert_or_println!( + "Unable to mark ready, state: {:?}, previous: {:?}, current: {:?}", + state, + previous, + current + ); + false } } @@ -157,13 +122,13 @@ where /// alert. pub fn mark_executing(&self) -> bool { let mut state = self.state.lock(); - if let Some(value) = state.value_from_ready() { - state.set_executing(value); - return true; + if let State::Ready(v) = state.deref_mut() { + *state = State::Executing(mem::take(v)); + true + } else { + alert_or_println!("Unable to mark executing, state: {:?}", state); + false } - - alert_or_println!("Unable to mark executing, state: {:?}", state); - false } /// If state is [State::Executing], changes it to [State::Done] with the same value, returning @@ -171,13 +136,13 @@ where /// alert. pub fn mark_done(&self) -> bool { let mut state = self.state.lock(); - if let Some(value) = state.value_from_executing() { - state.set_done(value); - return true; + if let State::Executing(v) = state.deref_mut() { + *state = State::Done(mem::take(v)); + true + } else { + alert_or_println!("Unable to mark done, state: {:?}", state); + false } - - alert_or_println!("Unable to mark done, state: {:?}", state); - false } /// Returns the cached global environment if it already exists, and matches the one in storage. @@ -192,28 +157,20 @@ where let mut guard = self.environment.acquire(); let existing_environment = guard.deref_mut(); - let (environment, is_new) = match existing_environment.as_ref() { - None => { - *existing_environment = Some(new_environment.clone()); - (new_environment, true) - }, - Some(environment) => { - if environment == &new_environment { - (environment.clone(), false) - } else { - *existing_environment = Some(new_environment.clone()); - (new_environment, true) - } - }, - }; + let environment_requires_update = existing_environment + .as_ref() + .map_or(true, |environment| environment == &new_environment); + if environment_requires_update { + *existing_environment = Some(new_environment); - // If this environment has been (re-)initialized, we need to flush the module cache because - // it can contain now out-dated code. - if is_new { - self.module_cache.flush_unchecked(); + // If this environment has been (re-)initialized, we need to flush the module cache + // because it can contain now out-dated code. + self.module_cache.flush_unsync(); } - environment + existing_environment + .clone() + .expect("Environment must be set") } /// Returns the global module cache. @@ -229,60 +186,12 @@ mod test { on_chain_config::{FeatureFlag, Features, OnChainConfig}, state_store::{state_key::StateKey, state_value::StateValue, MockStateView}, }; - use claims::assert_matches; use move_vm_types::code::{ mock_verified_code, MockDeserializedCode, MockExtension, MockVerifiedCode, }; use std::{collections::HashMap, thread, thread::JoinHandle}; use test_case::test_case; - #[test_case(None)] - #[test_case(Some(0))] - fn test_ready_state(value: Option) { - let state = State::Ready(value); - - assert_eq!(state.value_from_ready(), Some(value)); - assert!(state.value_from_executing().is_none()); - assert!(state.value_from_done().is_none()); - } - - #[test_case(None)] - #[test_case(Some(0))] - fn test_executing_state(value: Option) { - let state = State::Executing(value); - - assert!(state.value_from_ready().is_none()); - assert_eq!(state.value_from_executing(), Some(value)); - assert!(state.value_from_done().is_none()); - } - - #[test_case(None)] - #[test_case(Some(0))] - fn test_done_state(value: Option) { - let state = State::Done(value); - - assert!(state.value_from_ready().is_none()); - assert!(state.value_from_executing().is_none()); - assert_eq!(state.value_from_done(), Some(value)); - } - - #[test] - fn test_set_state() { - let mut state = State::Done(None); - - state.set_ready(Some(0)); - assert_matches!(state, State::Ready(Some(0))); - - state.set_executing(Some(10)); - assert_matches!(state, State::Executing(Some(10))); - - state.set_done(Some(100)); - assert_matches!(state, State::Done(Some(100))); - - state.set_ready(Some(1000)); - assert_matches!(state, State::Ready(Some(1000))); - } - #[test_case(None, None)] #[test_case(None, Some(1))] #[test_case(Some(0), None)] @@ -290,10 +199,7 @@ mod test { #[test_case(Some(0), Some(0))] fn test_mark_ready(recorded_previous: Option, previous: Option) { let module_cache_manager = ModuleCacheManager::new(); - module_cache_manager - .state - .lock() - .set_done(recorded_previous); + *module_cache_manager.state.lock() = State::Done(recorded_previous); // Pre-populate module cache to test flushing. module_cache_manager @@ -326,7 +232,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - module_cache_manager.state.lock().set_ready(Some(100)); + *module_cache_manager.state.lock() = State::Ready(Some(100)); assert!(!module_cache_manager.mark_ready(Some(&76), Some(77))); assert!(!module_cache_manager.mark_done()); @@ -346,7 +252,7 @@ mod test { MockVerifiedCode, MockExtension, >::new(); - module_cache_manager.state.lock().set_executing(Some(100)); + *module_cache_manager.state.lock() = State::Executing(Some(100)); assert!(!module_cache_manager.mark_ready(Some(&76), Some(77))); assert!(!module_cache_manager.mark_executing()); diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 151dc89522013f..603bce0fbe1abc 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -618,9 +618,9 @@ where // Publish modules before we decrease validation index so that validations observe // the new module writes as well. if runtime_environment.vm_config().use_loader_v2 { - executed_at_commit = true; let module_write_set = last_input_output.module_write_set(txn_idx); if !module_write_set.is_empty() { + executed_at_commit = true; Self::publish_module_writes( txn_idx, module_write_set, @@ -1169,7 +1169,7 @@ where counters::update_state_counters(versioned_cache.stats(), true); self.global_module_cache - .insert_verified_unchecked(versioned_cache.take_modules_iter()) + .insert_verified_unsync(versioned_cache.take_modules_iter()) .map_err(|err| { alert!("[BlockSTM] Encountered panic error: {:?}", err); })?; @@ -1647,7 +1647,7 @@ where counters::update_state_counters(unsync_map.stats(), false); self.global_module_cache - .insert_verified_unchecked(unsync_map.into_modules_iter())?; + .insert_verified_unsync(unsync_map.into_modules_iter())?; let block_end_info = if self .config @@ -1709,7 +1709,7 @@ where // Flush the cache and the environment to re-run from the "clean" state. env.runtime_environment() .flush_struct_name_and_info_caches(); - self.global_module_cache.flush_unchecked(); + self.global_module_cache.flush_unsync(); info!("parallel execution requiring fallback"); } diff --git a/execution/executor-service/src/test_utils.rs b/execution/executor-service/src/test_utils.rs index 150d13b5b5f4c7..bfd2c36cd17761 100644 --- a/execution/executor-service/src/test_utils.rs +++ b/execution/executor-service/src/test_utils.rs @@ -137,11 +137,11 @@ pub fn test_sharded_block_executor_no_conflict> .into_iter() .map(|t| t.into_txn()) .collect(); - let executor = AptosVMBlockExecutor::new(); - if let Some(module_cache_manager) = executor.module_cache_manager() { + let block_executor = AptosVMBlockExecutor::new(); + if let Some(module_cache_manager) = block_executor.module_cache_manager() { module_cache_manager.mark_ready(None, None); } - let unsharded_txn_output = executor + let unsharded_txn_output = block_executor .execute_block_no_limit(&txns, executor.data_store()) .unwrap(); compare_txn_outputs(unsharded_txn_output, sharded_txn_output); @@ -196,11 +196,11 @@ pub fn sharded_block_executor_with_conflict>( ) .unwrap(); - let executor = AptosVMBlockExecutor::new(); - if let Some(module_cache_manager) = executor.module_cache_manager() { + let block_executor = AptosVMBlockExecutor::new(); + if let Some(module_cache_manager) = block_executor.module_cache_manager() { module_cache_manager.mark_ready(None, None); } - let unsharded_txn_output = executor + let unsharded_txn_output = block_executor .execute_block_no_limit(&execution_ordered_txns, executor.data_store()) .unwrap(); compare_txn_outputs(unsharded_txn_output, sharded_txn_output); diff --git a/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs b/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs index 8138bac66e02cb..186b0b86f875ff 100644 --- a/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs +++ b/third_party/move/move-vm/runtime/src/storage/implementations/unsync_module_storage.rs @@ -165,17 +165,24 @@ impl<'s, S: ModuleBytesStorage, E: WithRuntimeEnvironment> UnsyncModuleStorage<' } /// Returns an iterator of all modules that have been cached and verified. - pub fn into_verified_modules_iter(self) -> impl Iterator)> { - self.0 - .module_cache - .into_modules_iter() - .flat_map(|(key, module)| { - module.code().is_verified().then(|| { - // TODO(loader_v2): - // We should be able to take ownership here, instead of clones. - (key, module.code().verified().clone()) - }) - }) + pub fn unpack_into_verified_modules_iter( + self, + ) -> ( + BorrowedOrOwned<'s, S>, + impl Iterator)>, + ) { + let verified_modules_iter = + self.0 + .module_cache + .into_modules_iter() + .flat_map(|(key, module)| { + module.code().is_verified().then(|| { + // TODO(loader_v2): + // We should be able to take ownership here, instead of clones. + (key, module.code().verified().clone()) + }) + }); + (self.0.base_storage, verified_modules_iter) } /// Test-only method that checks the state of the module cache. @@ -185,15 +192,17 @@ impl<'s, S: ModuleBytesStorage, E: WithRuntimeEnvironment> UnsyncModuleStorage<' deserialized: Vec<&'b ModuleId>, verified: Vec<&'b ModuleId>, ) { + use claims::*; + assert_eq!(self.0.num_modules(), deserialized.len() + verified.len()); for id in deserialized { let result = self.0.get_module_or_build_with(id, &self.0); - let module = claims::assert_some!(claims::assert_ok!(result)).0; + let module = assert_some!(assert_ok!(result)).0; assert!(!module.code().is_verified()) } for id in verified { let result = self.0.get_module_or_build_with(id, &self.0); - let module = claims::assert_some!(claims::assert_ok!(result)).0; + let module = assert_some!(assert_ok!(result)).0; assert!(module.code().is_verified()) } } diff --git a/third_party/move/move-vm/types/src/code/cache/module_cache.rs b/third_party/move/move-vm/types/src/code/cache/module_cache.rs index e3663c4f388dac..7697c20e3a8c6e 100644 --- a/third_party/move/move-vm/types/src/code/cache/module_cache.rs +++ b/third_party/move/move-vm/types/src/code/cache/module_cache.rs @@ -32,13 +32,13 @@ where /// Creates new [ModuleCode] from verified code. pub fn from_verified(verified_code: VC, extension: Arc) -> Self { - Self::from_verified_ref(Arc::new(verified_code), extension) + Self::from_arced_verified(Arc::new(verified_code), extension) } /// Creates new [ModuleCode] from [Arc]ed verified code. - pub fn from_verified_ref(verified_code: Arc, extension: Arc) -> Self { + pub fn from_arced_verified(verified_code: Arc, extension: Arc) -> Self { Self { - code: Code::from_verified_ref(verified_code), + code: Code::from_arced_verified(verified_code), extension, } } diff --git a/third_party/move/move-vm/types/src/code/cache/test_types.rs b/third_party/move/move-vm/types/src/code/cache/test_types.rs index 793d5f11951ef6..1ad64e4bb15901 100644 --- a/third_party/move/move-vm/types/src/code/cache/test_types.rs +++ b/third_party/move/move-vm/types/src/code/cache/test_types.rs @@ -56,21 +56,21 @@ pub fn mock_verified_code( #[derive(Clone, Debug)] pub struct MockExtension { - size: usize, + mock_size: usize, } impl MockExtension { - pub fn new(size: usize) -> Self { - Self { size } + pub fn new(mock_size: usize) -> Self { + Self { mock_size } } } impl WithSize for MockExtension { fn size_in_bytes(&self) -> usize { - self.size + self.mock_size } } -pub fn mock_extension(size: usize) -> Arc { - Arc::new(MockExtension::new(size)) +pub fn mock_extension(mock_size: usize) -> Arc { + Arc::new(MockExtension::new(mock_size)) } diff --git a/third_party/move/move-vm/types/src/code/cache/types.rs b/third_party/move/move-vm/types/src/code/cache/types.rs index 09f51096b9cb99..12cd078c89d43f 100644 --- a/third_party/move/move-vm/types/src/code/cache/types.rs +++ b/third_party/move/move-vm/types/src/code/cache/types.rs @@ -68,7 +68,7 @@ where } /// Returns new verified code from [Arc]ed instance. - pub fn from_verified_ref(verified_code: Arc) -> Self { + pub fn from_arced_verified(verified_code: Arc) -> Self { Self::Verified(verified_code) } diff --git a/types/src/block_executor/config.rs b/types/src/block_executor/config.rs index 95e66d1609850f..1342eba0de53f0 100644 --- a/types/src/block_executor/config.rs +++ b/types/src/block_executor/config.rs @@ -15,7 +15,7 @@ pub struct BlockExecutorModuleCacheLocalConfig { pub max_module_cache_size_in_bytes: usize, /// The maximum size (in terms of entries) of struct name re-indexing map stored in the runtime /// environment. - pub max_struct_name_index_map_size: usize, + pub max_struct_name_index_map_num_entries: usize, } impl Default for BlockExecutorModuleCacheLocalConfig { @@ -24,7 +24,7 @@ impl Default for BlockExecutorModuleCacheLocalConfig { prefetch_framework_code: true, // Use 50 Mb for now, should be large enough to cache many modules. max_module_cache_size_in_bytes: 50 * 1024 * 1024, - max_struct_name_index_map_size: 100_000, + max_struct_name_index_map_num_entries: 100_000, } } } @@ -39,8 +39,6 @@ pub struct BlockExecutorLocalConfig { // If true, we will discard the failed blocks and continue with the next block. // (allow_fallback needs to be set) pub discard_failed_blocks: bool, - - /// Various cache configurations, see [BlockExecutorModuleCacheLocalConfig] for more details. pub module_cache_config: BlockExecutorModuleCacheLocalConfig, }