From ca6a5048c40f28e68f276f044123834cc3d0a0ee Mon Sep 17 00:00:00 2001 From: Rati Gelashvili Date: Sat, 19 Nov 2022 11:28:56 -0800 Subject: [PATCH] Merge sequential and parallel flows --- .../src/aptos_test_harness.rs | 15 +- .../aptos_test_harness/call_function.exp | 4 +- .../aptos_test_harness/execute_script.exp | 2 +- .../aptos_test_harness/publish_module.exp | 2 +- .../write_set_too_large.exp | 2 +- aptos-move/aptos-vm/src/adapter_common.rs | 72 --------- aptos-move/aptos-vm/src/aptos_vm.rs | 39 +---- aptos-move/aptos-vm/src/data_cache.rs | 41 ++--- .../aptos-vm/src/parallel_executor/mod.rs | 145 ++++++++++++------ .../src/parallel_executor/vm_wrapper.rs | 100 ++++++++---- aptos-move/e2e-tests/src/executor.rs | 23 +-- aptos-move/parallel-executor/src/executor.rs | 67 ++++++-- .../src/proptest_types/bencher.rs | 8 +- .../src/proptest_types/tests.rs | 10 +- .../src/proptest_types/types.rs | 26 +++- aptos-move/parallel-executor/src/task.rs | 14 +- .../parallel-executor/src/unit_tests/mod.rs | 4 +- 17 files changed, 299 insertions(+), 275 deletions(-) diff --git a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs index 0dd044487fb94..bb7dbac8f79df 100644 --- a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs +++ b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs @@ -25,7 +25,7 @@ use aptos_types::{ }; use aptos_vm::{ data_cache::{AsMoveResolver, IntoMoveResolver, StorageAdapterOwned}, - AptosVM, + AptosVM, VMExecutor, }; use clap::StructOpt; use language_e2e_tests::data_store::{FakeDataStore, GENESIS_CHANGE_SET_HEAD}; @@ -472,23 +472,26 @@ impl<'a> AptosTestAdapter<'a> { /// Should error if the transaction ends up being discarded, or having a status other than /// EXECUTED. fn run_transaction(&mut self, txn: Transaction) -> Result { - let mut outputs = AptosVM::execute_block_and_keep_vm_status(vec![txn], &self.storage)?; + let mut outputs = AptosVM::execute_block(vec![txn], &self.storage)?; assert_eq!(outputs.len(), 1); - let (status, output) = outputs.pop().unwrap(); + let output = outputs.pop().unwrap(); match output.status() { TransactionStatus::Keep(kept_vm_status) => { self.storage.add_write_set(output.write_set()); match kept_vm_status { ExecutionStatus::Success => Ok(output), _ => { - bail!("Failed to execute transaction. ExecutionStatus: {}", status) + bail!( + "Failed to execute transaction. ExecutionStatus: {:?}", + kept_vm_status + ) } } } - TransactionStatus::Discard(_) => { - bail!("Transaction discarded. VMStatus: {}", status) + TransactionStatus::Discard(status_code) => { + bail!("Transaction discarded. VM status code: {:?}", status_code) } TransactionStatus::Retry => panic!(), } diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp index 9d293cca30eb5..a06f83af2ac9b 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp @@ -1,10 +1,10 @@ processed 4 tasks task 1 'publish'. lines 4-30: -Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS task 2 'run'. lines 33-33: -Error: Transaction discarded. 
VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS task 3 'view'. lines 35-35: [No Resource Exists] diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp index 919e07af2bf8b..b246fa070b510 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp @@ -1,7 +1,7 @@ processed 3 tasks task 1 'run'. lines 5-13: -Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS task 2 'view'. lines 15-15: key 0x1::coin::CoinStore<0x1::aptos_coin::AptosCoin> { diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp index 216a415f529ca..5a8a9d468c39d 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp @@ -1,4 +1,4 @@ processed 2 tasks task 1 'publish'. lines 4-9: -Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp index 8bf428496daaa..82b73cc2a8810 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp @@ -1,4 +1,4 @@ processed 3 tasks task 2 'run'. lines 29-29: -Error: Failed to execute transaction. ExecutionStatus: status STORAGE_WRITE_LIMIT_REACHED of type Execution +Error: Failed to execute transaction. ExecutionStatus: ExecutionFailure { location: Script, function: 0, code_offset: 0 } diff --git a/aptos-move/aptos-vm/src/adapter_common.rs b/aptos-move/aptos-vm/src/adapter_common.rs index ed5338d116b29..35c6d2b94bd9d 100644 --- a/aptos-move/aptos-vm/src/adapter_common.rs +++ b/aptos-move/aptos-vm/src/adapter_common.rs @@ -15,13 +15,11 @@ use crate::{ logging::AdapterLogSchema, move_vm_ext::{MoveResolverExt, SessionExt, SessionId}, }; -use aptos_logger::prelude::*; use aptos_types::{ block_metadata::BlockMetadata, transaction::{Transaction, TransactionOutput, TransactionStatus, WriteSetPayload}, write_set::WriteSet, }; -use rayon::prelude::*; /// This trait describes the VM adapter's interface. /// TODO: bring more of the execution logic in aptos_vm into this file. 
@@ -133,76 +131,6 @@ pub(crate) fn validate_signature_checked_transaction( - adapter: &A, - transactions: Vec, - data_cache: &mut StateViewCache, -) -> Result, VMStatus> { - let mut result = vec![]; - let mut should_restart = false; - - info!( - AdapterLogSchema::new(data_cache.id(), 0), - "Executing block, transaction count: {}", - transactions.len() - ); - - let signature_verified_block: Vec; - { - // Verify the signatures of all the transactions in parallel. - // This is time consuming so don't wait and do the checking - // sequentially while executing the transactions. - signature_verified_block = transactions - .into_par_iter() - .map(preprocess_transaction::) - .collect(); - } - - for (idx, txn) in signature_verified_block.into_iter().enumerate() { - let log_context = AdapterLogSchema::new(data_cache.id(), idx); - if should_restart { - let txn_output = - TransactionOutput::new(WriteSet::default(), vec![], 0, TransactionStatus::Retry); - result.push((VMStatus::Error(StatusCode::UNKNOWN_STATUS), txn_output)); - debug!(log_context, "Retry after reconfiguration"); - continue; - }; - let (vm_status, output_ext, sender) = adapter.execute_single_transaction( - &txn, - &data_cache.as_move_resolver(), - &log_context, - )?; - - // Apply deltas. - let output = output_ext.into_transaction_output(&data_cache); - - if !output.status().is_discarded() { - data_cache.push_write_set(output.write_set()); - } else { - match sender { - Some(s) => trace!( - log_context, - "Transaction discarded, sender: {}, error: {:?}", - s, - vm_status, - ), - None => trace!(log_context, "Transaction malformed, error: {:?}", vm_status,), - } - } - - if A::should_restart_execution(&output) { - info!( - AdapterLogSchema::new(data_cache.id(), 0), - "Reconfiguration occurred: restart required", - ); - should_restart = true; - } - - result.push((vm_status, output)) - } - Ok(result) -} - /// Transactions after signature checking: /// Waypoints and BlockPrologues are not signed and are unaffected by signature checking, /// but a user transaction or writeset transaction is transformed to a SignatureCheckedTransaction. 
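With the sequential adapter loop above removed (and execute_block_and_keep_vm_status removed in the next file), callers no longer receive a (VMStatus, TransactionOutput) pair per transaction; the kept or discarded status is read back from the TransactionOutput itself, as the test-harness hunk at the top of this patch does. A minimal caller-side sketch of that pattern, not code from this patch (run_one is an illustrative name):

use anyhow::{bail, Result};
use aptos_state_view::StateView;
use aptos_types::transaction::{Transaction, TransactionOutput, TransactionStatus};
use aptos_vm::{AptosVM, VMExecutor};

fn run_one(txn: Transaction, state_view: &impl StateView) -> Result<TransactionOutput> {
    // VMExecutor::execute_block now covers both the sequential and the parallel flow.
    let mut outputs = AptosVM::execute_block(vec![txn], state_view)?;
    assert_eq!(outputs.len(), 1);
    let output = outputs.pop().unwrap();
    match output.status() {
        // Kept vs. discarded is recovered from the output instead of a separate VMStatus.
        TransactionStatus::Keep(_) => Ok(output),
        TransactionStatus::Discard(status_code) => {
            bail!("Transaction discarded. VM status code: {:?}", status_code)
        }
        TransactionStatus::Retry => bail!("unexpected Retry output"),
    }
}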
diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 5a13de33e880d..60e909a5e970d 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -2,18 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - adapter_common, adapter_common::{ discard_error_output, discard_error_vm_status, validate_signature_checked_transaction, validate_signed_transaction, PreprocessedTransaction, VMAdapter, }, aptos_vm_impl::{get_transaction_output, AptosVMImpl, AptosVMInternals}, counters::*, - data_cache::{AsMoveResolver, IntoMoveResolver, StateViewCache}, + data_cache::{AsMoveResolver, IntoMoveResolver}, delta_state_view::DeltaStateView, errors::expect_only_successful_execution, logging::AdapterLogSchema, move_vm_ext::{MoveResolverExt, SessionExt, SessionId}, + parallel_executor::ParallelAptosVM, system_module_names::*, transaction_arg_validation, transaction_metadata::TransactionMetadata, @@ -912,21 +912,6 @@ impl AptosVM { Ok((VMStatus::Executed, output)) } - /// Alternate form of 'execute_block' that keeps the vm_status before it goes into the - /// `TransactionOutput` - pub fn execute_block_and_keep_vm_status( - transactions: Vec, - state_view: &impl StateView, - ) -> Result, VMStatus> { - let mut state_view_cache = StateViewCache::new(state_view); - let count = transactions.len(); - let vm = AptosVM::new(&state_view_cache); - let res = adapter_common::execute_block_impl(&vm, transactions, &mut state_view_cache)?; - // Record the histogram count for transactions per block. - BLOCK_TRANSACTION_COUNT.observe(count as f64); - Ok(res) - } - pub fn simulate_signed_transaction( txn: &SignedTransaction, state_view: &impl StateView, @@ -983,22 +968,10 @@ impl VMExecutor for AptosVM { )) }); - let concurrency_level = Self::get_concurrency_level(); - if concurrency_level > 1 { - let (result, err) = crate::parallel_executor::ParallelAptosVM::execute_block( - transactions, - state_view, - concurrency_level, - )?; - debug!("Parallel execution error {:?}", err); - Ok(result) - } else { - let output = Self::execute_block_and_keep_vm_status(transactions, state_view)?; - Ok(output - .into_iter() - .map(|(_vm_status, txn_output)| txn_output) - .collect()) - } + // Record the histogram count for transactions per block. 
+ BLOCK_TRANSACTION_COUNT.observe(transactions.len() as f64); + + ParallelAptosVM::execute_block(transactions, state_view, Self::get_concurrency_level()) } } diff --git a/aptos-move/aptos-vm/src/data_cache.rs b/aptos-move/aptos-vm/src/data_cache.rs index 3f57f1e247dbd..fd1ecd72be515 100644 --- a/aptos-move/aptos-vm/src/data_cache.rs +++ b/aptos-move/aptos-vm/src/data_cache.rs @@ -10,11 +10,8 @@ use aptos_logger::prelude::*; use aptos_state_view::{StateView, StateViewId}; use aptos_types::state_store::state_storage_usage::StateStorageUsage; use aptos_types::{ - access_path::AccessPath, - on_chain_config::ConfigStorage, - state_store::state_key::StateKey, - vm_status::StatusCode, - write_set::{WriteOp, WriteSet}, + access_path::AccessPath, on_chain_config::ConfigStorage, state_store::state_key::StateKey, + vm_status::StatusCode, write_set::WriteOp, }; use fail::fail_point; use framework::natives::state_storage::StateStorageUsageResolver; @@ -26,6 +23,7 @@ use move_core_types::{ }; use move_table_extension::{TableHandle, TableResolver}; use std::{ + borrow::Cow, collections::btree_map::BTreeMap, ops::{Deref, DerefMut}, }; @@ -45,33 +43,23 @@ use std::{ /// track of incremental changes is vital to the consistency of the data store and the system. pub struct StateViewCache<'a, S> { data_view: &'a S, - data_map: BTreeMap>>, + data_map: Cow<'a, BTreeMap>, } impl<'a, S: StateView> StateViewCache<'a, S> { + pub fn from_map_ref(data_view: &'a S, data_map_ref: &'a BTreeMap) -> Self { + Self { + data_view, + data_map: Cow::Borrowed(data_map_ref), + } + } + /// Create a `StateViewCache` give a `StateView`. Hold updates to the data store and /// forward data request to the `StateView` if not in the local cache. pub fn new(data_view: &'a S) -> Self { StateViewCache { data_view, - data_map: BTreeMap::new(), - } - } - - // Publishes a `WriteSet` computed at the end of a transaction. - // The effect is to build a layer in front of the `StateView` which keeps - // track of the data as if the changes were applied immediately. - pub(crate) fn push_write_set(&mut self, write_set: &WriteSet) { - for (ap, write_op) in write_set.iter() { - match write_op { - WriteOp::Modification(blob) | WriteOp::Creation(blob) => { - self.data_map.insert(ap.clone(), Some(blob.clone())); - } - WriteOp::Deletion => { - self.data_map.remove(ap); - self.data_map.insert(ap.clone(), None); - } - } + data_map: Cow::Owned(BTreeMap::new()), } } } @@ -84,7 +72,10 @@ impl<'block, S: StateView> StateView for StateViewCache<'block, S> { ))); match self.data_map.get(state_key) { - Some(opt_data) => Ok(opt_data.clone()), + Some(write_op) => Ok(match write_op { + WriteOp::Modification(blob) | WriteOp::Creation(blob) => Some(blob.clone()), + WriteOp::Deletion => None, + }), None => match self.data_view.get_state_value(state_key) { Ok(remote_data) => Ok(remote_data), // TODO: should we forward some error info? 
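The Cow change above lets a StateViewCache either own its write map (as before) or borrow one that already exists; the sequential flow in vm_wrapper.rs below uses the borrowed form so each transaction sees the writes applied by earlier transactions in the block. A minimal sketch of the two constructors, assuming StateViewCache and AsMoveResolver are reachable at these paths (resolve_over, base, and writes are illustrative names):

use std::collections::btree_map::BTreeMap;

use aptos_state_view::StateView;
use aptos_types::{state_store::state_key::StateKey, write_set::WriteOp};
use aptos_vm::data_cache::{AsMoveResolver, StateViewCache};

fn resolve_over<S: StateView>(base: &S, writes: &BTreeMap<StateKey, WriteOp>) {
    // Owned, initially empty overlay: the original behaviour of `new`.
    let _owned = StateViewCache::new(base);

    // Borrowed overlay over writes accumulated so far, without copying the map.
    let borrowed = StateViewCache::from_map_ref(base, writes);
    let _resolver = borrowed.as_move_resolver();
}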
diff --git a/aptos-move/aptos-vm/src/parallel_executor/mod.rs b/aptos-move/aptos-vm/src/parallel_executor/mod.rs index a5357719170ed..0953bdeca8efb 100644 --- a/aptos-move/aptos-vm/src/parallel_executor/mod.rs +++ b/aptos-move/aptos-vm/src/parallel_executor/mod.rs @@ -7,13 +7,15 @@ mod vm_wrapper; use crate::{ adapter_common::{preprocess_transaction, PreprocessedTransaction}, aptos_vm::AptosVM, + logging::AdapterLogSchema, parallel_executor::vm_wrapper::AptosVMWrapper, }; use aptos_aggregator::{delta_change_set::DeltaOp, transaction::TransactionOutputExt}; +use aptos_logger::{debug, info}; use aptos_parallel_executor::{ errors::Error, - executor::ParallelTransactionExecutor, - output_delta_resolver::ResolvedData, + executor::{ParallelTransactionExecutor, RAYON_EXEC_POOL}, + output_delta_resolver::{OutputDeltaResolver, ResolvedData}, task::{Transaction as PTransaction, TransactionOutput as PTransactionOutput}, }; use aptos_state_view::StateView; @@ -82,61 +84,106 @@ impl PTransactionOutput for AptosTransactionOutput { pub struct ParallelAptosVM(); impl ParallelAptosVM { + fn process_parallel_block_output( + results: Vec, + delta_resolver: OutputDeltaResolver, + state_view: &S, + ) -> Vec { + // TODO: MVHashmap, and then delta resolver should track aggregator keys. + let mut aggregator_keys: HashMap> = HashMap::new(); + for res in results.iter() { + let output_ext = AptosTransactionOutput::as_ref(res); + for (key, _) in output_ext.delta_change_set().iter() { + if !aggregator_keys.contains_key(key) { + aggregator_keys.insert(key.clone(), state_view.get_state_value(key)); + } + } + } + + let materialized_deltas = + delta_resolver.resolve(aggregator_keys.into_iter().collect(), results.len()); + + results + .into_iter() + .zip(materialized_deltas.into_iter()) + .map(|(res, delta_writes)| { + let output_ext = AptosTransactionOutput::into(res); + output_ext.output_with_delta_writes(WriteSetMut::new(delta_writes)) + }) + .collect() + } + + fn process_sequential_block_output( + results: Vec, + ) -> Vec { + results + .into_iter() + .map(|res| { + let output_ext = AptosTransactionOutput::into(res); + let (deltas, output) = output_ext.into(); + debug_assert!(deltas.is_empty(), "[Execution] Deltas must be materialized"); + output + }) + .collect() + } + pub fn execute_block( transactions: Vec, state_view: &S, concurrency_level: usize, - ) -> Result<(Vec, Option>), VMStatus> { + ) -> Result, VMStatus> { // Verify the signatures of all the transactions in parallel. // This is time consuming so don't wait and do the checking // sequentially while executing the transactions. - let signature_verified_block: Vec = transactions - .par_iter() - .map(|txn| preprocess_transaction::(txn.clone())) - .collect(); - - match ParallelTransactionExecutor::>::new( - concurrency_level, - ) - .execute_transactions_parallel(state_view, signature_verified_block) - { - Ok((results, delta_resolver)) => { - // TODO: with more deltas, collect keys in parallel (in parallel executor). 
- let mut aggregator_keys: HashMap> = - HashMap::new(); - - for res in results.iter() { - let output_ext = AptosTransactionOutput::as_ref(res); - for (key, _) in output_ext.delta_change_set().iter() { - if !aggregator_keys.contains_key(key) { - aggregator_keys.insert(key.clone(), state_view.get_state_value(key)); - } - } - } + let signature_verified_block: Vec = + RAYON_EXEC_POOL.install(|| { + transactions + .par_iter() + .map(|txn| preprocess_transaction::(txn.clone())) + .collect() + }); + + let log_context = AdapterLogSchema::new(state_view.id(), 0); + info!( + log_context, + "Executing block, transaction count: {}", + transactions.len() + ); + + let executor = + ParallelTransactionExecutor::>::new( + concurrency_level, + ); + + let mut ret = if concurrency_level > 1 { + executor + .execute_transactions_parallel(state_view, &signature_verified_block) + .map(|(results, delta_resolver)| { + Self::process_parallel_block_output(results, delta_resolver, state_view) + }) + } else { + executor + .execute_transactions_sequential(state_view, &signature_verified_block) + .map(Self::process_sequential_block_output) + }; + + if ret == Err(Error::ModulePathReadWrite) { + debug!("[Execution]: Module read & written, sequential fallback"); + + ret = executor + .execute_transactions_sequential(state_view, &signature_verified_block) + .map(Self::process_sequential_block_output); + } - let materialized_deltas = - delta_resolver.resolve(aggregator_keys.into_iter().collect(), results.len()); - Ok(( - results - .into_iter() - .zip(materialized_deltas.into_iter()) - .map(|(res, delta_writes)| { - let output_ext = AptosTransactionOutput::into(res); - output_ext.output_with_delta_writes(WriteSetMut::new(delta_writes)) - }) - .collect(), - None, - )) - } - Err(err @ Error::ModulePathReadWrite) => { - let output = AptosVM::execute_block_and_keep_vm_status(transactions, state_view)?; - Ok(( - output - .into_iter() - .map(|(_vm_status, txn_output)| txn_output) - .collect(), - Some(err), - )) + RAYON_EXEC_POOL.spawn(move || { + // Explicit async drop. 
+ drop(signature_verified_block); + }); + + match ret { + Ok(outputs) => Ok(outputs), + Err(Error::ModulePathReadWrite) => { + unreachable!("[Execution]: Must be handled by sequential fallback") } Err(Error::InvariantViolation) => Err(VMStatus::Error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, diff --git a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs index afabc379a2e3c..662ed6c59d4f8 100644 --- a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs +++ b/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs @@ -4,10 +4,12 @@ use crate::{ adapter_common::{PreprocessedTransaction, VMAdapter}, aptos_vm::AptosVM, - data_cache::StorageAdapter, + data_cache::{AsMoveResolver, StateViewCache, StorageAdapter}, logging::AdapterLogSchema, + move_vm_ext::MoveResolverExt, parallel_executor::{storage_wrapper::VersionedView, AptosTransactionOutput}, }; +use aptos_aggregator::{delta_change_set::DeltaChangeSet, transaction::TransactionOutputExt}; use aptos_logger::prelude::*; use aptos_parallel_executor::{ executor::MVHashMapView, @@ -20,12 +22,54 @@ use move_core_types::{ language_storage::{ModuleId, CORE_CODE_ADDRESS}, vm_status::VMStatus, }; +use std::collections::btree_map::BTreeMap; pub(crate) struct AptosVMWrapper<'a, S> { vm: AptosVM, base_view: &'a S, } +fn execute_transaction( + vm: &AptosVM, + txn: &PreprocessedTransaction, + view: S, + log_context: AdapterLogSchema, + materialize_deltas: bool, +) -> ExecutionStatus { + match vm.execute_single_transaction(txn, &view, &log_context) { + Ok((vm_status, mut output_ext, sender)) => { + if materialize_deltas { + // Keep TransactionOutputExt type for wrapper. + output_ext = TransactionOutputExt::new( + DeltaChangeSet::empty(), // Cleared deltas. + output_ext.into_transaction_output(&view), // Materialize. 
+ ); + } + + if output_ext.txn_output().status().is_discarded() { + match sender { + Some(s) => trace!( + log_context, + "Transaction discarded, sender: {}, error: {:?}", + s, + vm_status, + ), + None => { + trace!(log_context, "Transaction malformed, error: {:?}", vm_status,) + } + }; + } + if AptosVM::should_restart_execution(output_ext.txn_output()) { + info!(log_context, "Reconfiguration occurred: restart required",); + ExecutionStatus::SkipRest(AptosTransactionOutput::new(output_ext)) + } else { + ExecutionStatus::Success(AptosTransactionOutput::new(output_ext)) + } + } + Err(err) => ExecutionStatus::Abort(err), + } +} + impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> { type T = PreprocessedTransaction; type Output = AptosTransactionOutput; @@ -54,39 +98,33 @@ impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> { } } - fn execute_transaction( + fn execute_transaction_btree_view( &self, - view: &MVHashMapView, + view: &BTreeMap, txn: &PreprocessedTransaction, + txn_idx: usize, ) -> ExecutionStatus { - let log_context = AdapterLogSchema::new(self.base_view.id(), view.txn_idx()); - let versioned_view = VersionedView::new_view(self.base_view, view); + let state_cache_view = StateViewCache::from_map_ref(self.base_view, view); + execute_transaction( + &self.vm, + txn, + state_cache_view.as_move_resolver(), + AdapterLogSchema::new(self.base_view.id(), txn_idx), + true, + ) + } - match self - .vm - .execute_single_transaction(txn, &versioned_view, &log_context) - { - Ok((vm_status, output_ext, sender)) => { - if output_ext.txn_output().status().is_discarded() { - match sender { - Some(s) => trace!( - log_context, - "Transaction discarded, sender: {}, error: {:?}", - s, - vm_status, - ), - None => { - trace!(log_context, "Transaction malformed, error: {:?}", vm_status,) - } - }; - } - if AptosVM::should_restart_execution(output_ext.txn_output()) { - ExecutionStatus::SkipRest(AptosTransactionOutput::new(output_ext)) - } else { - ExecutionStatus::Success(AptosTransactionOutput::new(output_ext)) - } - } - Err(err) => ExecutionStatus::Abort(err), - } + fn execute_transaction_mvhashmap_view( + &self, + view: &MVHashMapView, + txn: &PreprocessedTransaction, + ) -> ExecutionStatus { + execute_transaction( + &self.vm, + txn, + VersionedView::new_view(self.base_view, view), + AdapterLogSchema::new(self.base_view.id(), view.txn_idx()), + false, + ) } } diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index e08d4077112e5..4eaffe69085d5 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -354,21 +354,6 @@ impl FakeExecutor { ) } - /// Alternate form of 'execute_block' that keeps the vm_status before it goes into the - /// `TransactionOutput` - pub fn execute_block_and_keep_vm_status( - &self, - txn_block: Vec, - ) -> Result, VMStatus> { - AptosVM::execute_block_and_keep_vm_status( - txn_block - .into_iter() - .map(Transaction::UserTransaction) - .collect(), - &self.data_store, - ) - } - /// Executes the transaction as a singleton block and applies the resulting write set to the /// data store. 
Panics if execution fails pub fn execute_and_apply(&mut self, transaction: SignedTransaction) -> TransactionOutput { @@ -395,13 +380,7 @@ impl FakeExecutor { &self, txn_block: Vec, ) -> Result, VMStatus> { - let (result, _) = ParallelAptosVM::execute_block( - txn_block, - &self.data_store, - usize::min(4, num_cpus::get()), - )?; - - Ok(result) + ParallelAptosVM::execute_block(txn_block, &self.data_store, usize::min(4, num_cpus::get())) } pub fn execute_transaction_block( diff --git a/aptos-move/parallel-executor/src/executor.rs b/aptos-move/parallel-executor/src/executor.rs index 0db577570bedf..25bb72dd4ce18 100644 --- a/aptos-move/parallel-executor/src/executor.rs +++ b/aptos-move/parallel-executor/src/executor.rs @@ -15,9 +15,9 @@ use aptos_types::write_set::TransactionWrite; use mvhashmap::{MVHashMap, MVHashMapError, MVHashMapOutput}; use num_cpus; use once_cell::sync::Lazy; -use std::{hash::Hash, marker::PhantomData, sync::Arc, thread::spawn}; +use std::{collections::btree_map::BTreeMap, hash::Hash, marker::PhantomData, sync::Arc}; -static RAYON_EXEC_POOL: Lazy = Lazy::new(|| { +pub static RAYON_EXEC_POOL: Lazy = Lazy::new(|| { rayon::ThreadPoolBuilder::new() .num_threads(num_cpus::get()) .thread_name(|index| format!("par_exec_{}", index)) @@ -55,7 +55,7 @@ pub enum ReadResult { impl< 'a, - K: ModulePath + PartialOrd + Send + Clone + Hash + Eq, + K: ModulePath + PartialOrd + Ord + Send + Clone + Hash + Eq, V: TransactionWrite + Send + Sync, > MVHashMapView<'a, K, V> { @@ -105,7 +105,7 @@ impl< // `self.txn_idx` estimated to depend on a write from `dep_idx`. match self.scheduler.wait_for_dependency(self.txn_idx, dep_idx) { Some(dep_condition) => { - // Wait on a condition variable correpsonding to the encountered + // Wait on a condition variable corresponding to the encountered // read dependency. Once the dep_idx finishes re-execution, scheduler // will mark the dependency as resolved, and then the txn_idx will be // scheduled for re-execution, which will re-awaken cvar here. @@ -163,8 +163,8 @@ where /// be handled by sequential execution) and that concurrency_level <= num_cpus. pub fn new(concurrency_level: usize) -> Self { assert!( - concurrency_level > 1 && concurrency_level <= num_cpus::get(), - "Parallel execution concurrency level {} should be between 2 and number of CPUs", + concurrency_level > 0 && concurrency_level <= num_cpus::get(), + "Parallel execution concurrency level {} should be between 1 and number of CPUs", concurrency_level ); Self { @@ -198,7 +198,7 @@ where }; // VM execution. - let execute_result = executor.execute_transaction(&state_view, txn); + let execute_result = executor.execute_transaction_mvhashmap_view(&state_view, txn); let mut prev_modified_keys = last_input_output.modified_keys(idx_to_execute); // For tracking whether the recent execution wrote outside of the previous write/delta set. 
@@ -359,7 +359,7 @@ where pub fn execute_transactions_parallel( &self, executor_initial_arguments: E::Argument, - signature_verified_block: Vec, + signature_verified_block: &Vec, ) -> Result< ( Vec, @@ -367,6 +367,8 @@ where ), E::Error, > { + assert!(self.concurrency_level > 1, "Must use sequential execution"); + let versioned_data_cache = MVHashMap::new(); if signature_verified_block.is_empty() { @@ -382,7 +384,7 @@ where s.spawn(|_| { self.work_task_with_scope( &executor_initial_arguments, - &signature_verified_block, + signature_verified_block, &last_input_output, &versioned_data_cache, &scheduler, @@ -416,10 +418,9 @@ where ret }; - spawn(move || { + RAYON_EXEC_POOL.spawn(move || { // Explicit async drops. drop(last_input_output); - drop(signature_verified_block); drop(scheduler); }); @@ -434,4 +435,48 @@ where } } } + + pub fn execute_transactions_sequential( + &self, + executor_arguments: E::Argument, + signature_verified_block: &Vec, + ) -> Result, E::Error> { + let num_txns = signature_verified_block.len(); + let executor = E::init(executor_arguments); + let mut data_map = BTreeMap::new(); + + let mut ret = Vec::with_capacity(num_txns); + for (idx, txn) in signature_verified_block.iter().enumerate() { + // this call internally materializes deltas. + let res = executor.execute_transaction_btree_view(&data_map, txn, idx); + + let must_skip = matches!(res, ExecutionStatus::SkipRest(_)); + + match res { + ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => { + assert_eq!( + output.get_deltas().len(), + 0, + "Sequential execution must materialize deltas" + ); + // Apply the writes. + for (ap, write_op) in output.get_writes().into_iter() { + data_map.insert(ap, write_op); + } + ret.push(output); + } + ExecutionStatus::Abort(err) => { + // Record the status indicating abort. 
+ return Err(Error::UserError(err)); + } + } + + if must_skip { + break; + } + } + + ret.resize_with(num_txns, E::Output::skip_output); + Ok(ret) + } } diff --git a/aptos-move/parallel-executor/src/proptest_types/bencher.rs b/aptos-move/parallel-executor/src/proptest_types/bencher.rs index 3a608bca340f6..4793401566665 100644 --- a/aptos-move/parallel-executor/src/proptest_types/bencher.rs +++ b/aptos-move/parallel-executor/src/proptest_types/bencher.rs @@ -27,8 +27,10 @@ pub struct Bencher { phantom_value: PhantomData, } -pub(crate) struct BencherState -where +pub(crate) struct BencherState< + K: Hash + Clone + Debug + Eq + PartialOrd + Ord, + V: Clone + Eq + Arbitrary, +> where Vec: From, { transactions: Vec, ValueType>>, @@ -112,7 +114,7 @@ where Transaction, ValueType>, Task, ValueType>, >::new(num_cpus::get()) - .execute_transactions_parallel((), self.transactions.clone()) + .execute_transactions_parallel((), &self.transactions) .map(|(res, _)| res); self.expected_output.assert_output(&output, None); diff --git a/aptos-move/parallel-executor/src/proptest_types/tests.rs b/aptos-move/parallel-executor/src/proptest_types/tests.rs index d164fc287d2ed..83dca8df4292a 100644 --- a/aptos-move/parallel-executor/src/proptest_types/tests.rs +++ b/aptos-move/parallel-executor/src/proptest_types/tests.rs @@ -50,7 +50,7 @@ fn run_transactions( Transaction, ValueType>, Task, ValueType>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); if module_access.0 && module_access.1 { @@ -170,7 +170,7 @@ fn deltas_writes_mixed() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); let baseline = ExpectedOutput::generate_baseline(&transactions, None); @@ -206,7 +206,7 @@ fn deltas_resolver() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()); + .execute_transactions_parallel((), &transactions); let (output, delta_resolver) = output.unwrap(); // Should not be possible to overflow or underflow, as each delta is at @@ -351,7 +351,7 @@ fn publishing_fixed_params() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()); + .execute_transactions_parallel((), &transactions); assert_ok!(output); // Adjust the reads of txn indices[2] to contain module read to key 42. 
@@ -386,7 +386,7 @@ fn publishing_fixed_params() { Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); assert_eq!(output.unwrap_err(), Error::ModulePathReadWrite); diff --git a/aptos-move/parallel-executor/src/proptest_types/types.rs b/aptos-move/parallel-executor/src/proptest_types/types.rs index 6b964475fa5b6..c5cdd8ba526e5 100644 --- a/aptos-move/parallel-executor/src/proptest_types/types.rs +++ b/aptos-move/parallel-executor/src/proptest_types/types.rs @@ -22,7 +22,7 @@ use proptest::{arbitrary::Arbitrary, collection::vec, prelude::*, proptest, samp use proptest_derive::Arbitrary; use std::collections::hash_map::DefaultHasher; use std::{ - collections::{BTreeSet, HashMap}, + collections::{btree_map::BTreeMap, BTreeSet, HashMap}, convert::TryInto, fmt::Debug, hash::{Hash, Hasher}, @@ -40,8 +40,8 @@ const STORAGE_DELTA_VAL: u128 = 100; // Generation of transactions /////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Copy, Hash, Debug, PartialEq, PartialOrd, Eq)] -pub struct KeyType( +#[derive(Clone, Copy, Hash, Debug, PartialEq, PartialOrd, Ord, Eq)] +pub struct KeyType( /// Wrapping the types used for testing to add ModulePath trait implementation (below). pub K, /// The bool field determines for testing purposes, whether the key will be interpreted @@ -51,7 +51,7 @@ pub struct KeyType( pub bool, ); -impl ModulePath for KeyType { +impl ModulePath for KeyType { fn module_path(&self) -> Option { // Since K is generic, use its hash to assign addresses. let mut hasher = DefaultHasher::new(); @@ -320,7 +320,7 @@ impl> + Arbitrary + Clone + Debug + Eq + Sync + Send> Transactio impl TransactionType for Transaction where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + TransactionWrite + 'static, { type Key = K; @@ -341,7 +341,7 @@ impl Task { impl ExecutorTask for Task where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + TransactionWrite + 'static, { type T = Transaction; @@ -353,7 +353,17 @@ where Self::new() } - fn execute_transaction( + fn execute_transaction_btree_view( + &self, + _view: &BTreeMap, + _txn: &Self::T, + _txn_idx: usize, + ) -> ExecutionStatus { + // Separate PR to proptest sequential execution flow. 
+ unreachable!(); + } + + fn execute_transaction_mvhashmap_view( &self, view: &MVHashMapView, txn: &Self::T, @@ -394,7 +404,7 @@ pub struct Output(Vec<(K, V)>, Vec<(K, DeltaOp)>, Vec>); impl TransactionOutput for Output where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + TransactionWrite + 'static, { type T = Transaction; diff --git a/aptos-move/parallel-executor/src/task.rs b/aptos-move/parallel-executor/src/task.rs index fc7df3143b092..d75e2e8b1a107 100644 --- a/aptos-move/parallel-executor/src/task.rs +++ b/aptos-move/parallel-executor/src/task.rs @@ -6,7 +6,7 @@ use aptos_aggregator::delta_change_set::DeltaOp; use aptos_types::{ access_path::AccessPath, state_store::state_key::StateKey, write_set::TransactionWrite, }; -use std::{fmt::Debug, hash::Hash}; +use std::{collections::btree_map::BTreeMap, fmt::Debug, hash::Hash}; /// The execution result of a transaction #[derive(Debug)] @@ -39,7 +39,7 @@ impl ModulePath for StateKey { /// Trait that defines a transaction that could be parallel executed by the scheduler. Each /// transaction will write to a key value storage as their side effect. pub trait Transaction: Sync + Send + 'static { - type Key: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath; + type Key: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath; type Value: Send + Sync + TransactionWrite; } @@ -68,8 +68,16 @@ pub trait ExecutorTask: Sync { /// Create an instance of the transaction executor. fn init(args: Self::Argument) -> Self; + /// Execute one single transaction given the view of the current state as a BTreeMap, + fn execute_transaction_btree_view( + &self, + view: &BTreeMap<::Key, ::Value>, + txn: &Self::T, + txn_idx: usize, + ) -> ExecutionStatus; + /// Execute one single transaction given the view of the current state. - fn execute_transaction( + fn execute_transaction_mvhashmap_view( &self, view: &MVHashMapView<::Key, ::Value>, txn: &Self::T, diff --git a/aptos-move/parallel-executor/src/unit_tests/mod.rs b/aptos-move/parallel-executor/src/unit_tests/mod.rs index 2b282cf38f7b8..97d1b41951614 100644 --- a/aptos-move/parallel-executor/src/unit_tests/mod.rs +++ b/aptos-move/parallel-executor/src/unit_tests/mod.rs @@ -18,11 +18,11 @@ use std::{ fn run_and_assert(transactions: Vec>) where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + Eq + TransactionWrite + 'static, { let output = ParallelTransactionExecutor::, Task>::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); let baseline = ExpectedOutput::generate_baseline(&transactions, None);
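Taken together, the executor changes reduce every caller to one pattern: pick the parallel or the sequential entry point from the concurrency level, and fall back to sequential if the parallel run reports Error::ModulePathReadWrite. A condensed, generic sketch of that dispatch (the real logic lives in ParallelAptosVM::execute_block above; execute_block_with_fallback is an illustrative name, and the E::Argument: Copy bound is an assumption made so the argument can be reused for the fallback run):

use aptos_parallel_executor::{
    errors::Error,
    executor::ParallelTransactionExecutor,
    task::{ExecutorTask, Transaction},
};

fn execute_block_with_fallback<T, E>(
    concurrency_level: usize,
    args: E::Argument,
    block: &Vec<T>,
) -> Result<Vec<E::Output>, Error<E::Error>>
where
    T: Transaction,
    E: ExecutorTask<T = T>,
    E::Argument: Copy,
{
    let executor = ParallelTransactionExecutor::<T, E>::new(concurrency_level);

    let ret = if concurrency_level > 1 {
        executor
            .execute_transactions_parallel(args, block)
            // Delta resolution elided here; ParallelAptosVM materializes deltas
            // through the returned resolver at this point.
            .map(|(outputs, _delta_resolver)| outputs)
    } else {
        executor.execute_transactions_sequential(args, block)
    };

    // A module path that was both read and written invalidates the parallel run;
    // redo the whole block sequentially, as ParallelAptosVM does.
    if let Err(Error::ModulePathReadWrite) = &ret {
        return executor.execute_transactions_sequential(args, block);
    }
    ret
}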