From feec33fe9436a219346b5b03fbe5d32fc6200ef8 Mon Sep 17 00:00:00 2001 From: Rati Gelashvili Date: Tue, 29 Nov 2022 11:57:15 -0800 Subject: [PATCH] [Executor] Merge sequential & parallel execution flow (#4683) * Merge sequential and parallel flows * rename parallel to block --- CODEOWNERS | 2 +- Cargo.lock | 58 +++--- Cargo.toml | 4 +- .../aptos-aggregator/src/transaction.rs | 1 + .../src/transactions.rs | 8 +- .../src/aptos_test_harness.rs | 15 +- .../aptos_test_harness/call_function.exp | 4 +- .../aptos_test_harness/execute_script.exp | 2 +- .../aptos_test_harness/publish_module.exp | 2 +- .../write_set_too_large.exp | 2 +- aptos-move/aptos-vm/Cargo.toml | 2 +- aptos-move/aptos-vm/src/adapter_common.rs | 72 ------- aptos-move/aptos-vm/src/aptos_vm.rs | 48 ++--- aptos-move/aptos-vm/src/block_executor/mod.rs | 186 ++++++++++++++++++ .../storage_wrapper.rs | 2 +- .../aptos-vm/src/block_executor/vm_wrapper.rs | 133 +++++++++++++ aptos-move/aptos-vm/src/data_cache.rs | 45 ++--- aptos-move/aptos-vm/src/lib.rs | 2 +- .../aptos-vm/src/parallel_executor/mod.rs | 147 -------------- .../src/parallel_executor/vm_wrapper.rs | 92 --------- .../Cargo.toml | 4 +- .../benches/scheduler_benches.rs | 0 .../src/counters.rs | 0 .../src/errors.rs | 3 - .../src/executor.rs | 71 +++++-- .../src/lib.rs | 0 .../src/output_delta_resolver.rs | 0 .../src/proptest_types/bencher.rs | 12 +- .../src/proptest_types/mod.rs | 0 .../src/proptest_types/tests.rs | 22 +-- .../src/proptest_types/types.rs | 26 ++- .../src/scheduler.rs | 0 .../src/task.rs | 14 +- .../src/txn_last_input_output.rs | 0 .../src/unit_tests/mod.rs | 8 +- aptos-move/e2e-move-tests/Cargo.toml | 2 +- aptos-move/e2e-tests/src/executor.rs | 25 +-- aptos-move/e2e-testsuite/Cargo.toml | 2 +- 38 files changed, 519 insertions(+), 497 deletions(-) create mode 100644 aptos-move/aptos-vm/src/block_executor/mod.rs rename aptos-move/aptos-vm/src/{parallel_executor => block_executor}/storage_wrapper.rs (97%) create mode 100644 aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs delete mode 100644 aptos-move/aptos-vm/src/parallel_executor/mod.rs delete mode 100644 aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs rename aptos-move/{parallel-executor => block-executor}/Cargo.toml (91%) rename aptos-move/{parallel-executor => block-executor}/benches/scheduler_benches.rs (100%) rename aptos-move/{parallel-executor => block-executor}/src/counters.rs (100%) rename aptos-move/{parallel-executor => block-executor}/src/errors.rs (83%) rename aptos-move/{parallel-executor => block-executor}/src/executor.rs (87%) rename aptos-move/{parallel-executor => block-executor}/src/lib.rs (100%) rename aptos-move/{parallel-executor => block-executor}/src/output_delta_resolver.rs (100%) rename aptos-move/{parallel-executor => block-executor}/src/proptest_types/bencher.rs (92%) rename aptos-move/{parallel-executor => block-executor}/src/proptest_types/mod.rs (100%) rename aptos-move/{parallel-executor => block-executor}/src/proptest_types/tests.rs (95%) rename aptos-move/{parallel-executor => block-executor}/src/proptest_types/types.rs (96%) rename aptos-move/{parallel-executor => block-executor}/src/scheduler.rs (100%) rename aptos-move/{parallel-executor => block-executor}/src/task.rs (85%) rename aptos-move/{parallel-executor => block-executor}/src/txn_last_input_output.rs (100%) rename aptos-move/{parallel-executor => block-executor}/src/unit_tests/mod.rs (98%) diff --git a/CODEOWNERS b/CODEOWNERS index de7025f1a8d9a..72c2c6aae0b59 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -21,7 +21,7 @@ /aptos-move/framework/aptos-token @areshand /aptos-move/framework/src/natives/cryptography/ @alinush /aptos-move/framework/src/natives/aggregator_natives/ @georgemitenkov @gelash @zekun000 -/aptos-move/parallel-executor/ @gelash @zekun000 +/aptos-move/block-executor/ @gelash @zekun000 @sasha8 /aptos-move/vm-genesis/ @davidiw @movekevin # Owner for logger config diff --git a/Cargo.lock b/Cargo.lock index cf0e95d57834f..3268c71844fd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -337,6 +337,32 @@ dependencies = [ "serde_json", ] +[[package]] +name = "aptos-block-executor" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-aggregator", + "aptos-infallible", + "aptos-metrics-core", + "aptos-state-view", + "aptos-types", + "arc-swap", + "bcs 0.1.3 (git+https://github.com/aptos-labs/bcs?rev=2cde3e8446c460cb17b0c1d6bac7e27e964ac169)", + "claims", + "criterion", + "crossbeam", + "crossbeam-queue", + "dashmap", + "mvhashmap", + "num_cpus", + "once_cell", + "proptest", + "proptest-derive", + "rand 0.7.3", + "rayon", +] + [[package]] name = "aptos-build-info" version = "0.1.0" @@ -1007,32 +1033,6 @@ dependencies = [ "storage-interface", ] -[[package]] -name = "aptos-parallel-executor" -version = "0.1.0" -dependencies = [ - "anyhow", - "aptos-aggregator", - "aptos-infallible", - "aptos-metrics-core", - "aptos-state-view", - "aptos-types", - "arc-swap", - "bcs 0.1.3 (git+https://github.com/aptos-labs/bcs?rev=2cde3e8446c460cb17b0c1d6bac7e27e964ac169)", - "claims", - "criterion", - "crossbeam", - "crossbeam-queue", - "dashmap", - "mvhashmap", - "num_cpus", - "once_cell", - "proptest", - "proptest-derive", - "rand 0.7.3", - "rayon", -] - [[package]] name = "aptos-proptest-helpers" version = "0.1.0" @@ -1502,13 +1502,13 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos-aggregator", + "aptos-block-executor", "aptos-crypto", "aptos-crypto-derive", "aptos-gas", "aptos-logger", "aptos-metrics-core", "aptos-module-verifier", - "aptos-parallel-executor", "aptos-state-view", "aptos-types", "bcs 0.1.3 (git+https://github.com/aptos-labs/bcs?rev=2cde3e8446c460cb17b0c1d6bac7e27e964ac169)", @@ -3470,11 +3470,11 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos", + "aptos-block-executor", "aptos-crypto", "aptos-gas", "aptos-keygen", "aptos-logger", - "aptos-parallel-executor", "aptos-state-view", "aptos-types", "aptos-vm", @@ -5156,11 +5156,11 @@ dependencies = [ name = "language-e2e-testsuite" version = "0.1.0" dependencies = [ + "aptos-block-executor", "aptos-crypto", "aptos-gas", "aptos-keygen", "aptos-logger", - "aptos-parallel-executor", "aptos-state-view", "aptos-types", "aptos-vm", diff --git a/Cargo.toml b/Cargo.toml index 233ffc9bf51be..5cb16f6623140 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "aptos-move/aptos-transactional-test-harness", "aptos-move/aptos-validator-interface", "aptos-move/aptos-vm", + "aptos-move/block-executor", "aptos-move/e2e-move-tests", "aptos-move/e2e-tests", "aptos-move/e2e-testsuite", @@ -26,7 +27,6 @@ members = [ "aptos-move/move-examples", "aptos-move/mvhashmap", "aptos-move/package-builder", - "aptos-move/parallel-executor", "aptos-move/vm-genesis", "aptos-move/writeset-transaction-generator", "aptos-node", @@ -173,6 +173,7 @@ aptos-aggregator = { path = "aptos-move/aptos-aggregator" } aptos-api = { path = "api" } aptos-api-test-context = { path = "api/test-context" } aptos-api-types = { path = "api/types" } +aptos-block-executor = { path = "aptos-move/block-executor" } aptos-bitvec = { path = "crates/aptos-bitvec" } aptos-build-info = { path = "crates/aptos-build-info" } aptos-compression = { path = "crates/aptos-compression" } @@ -203,7 +204,6 @@ aptos-network-checker = { path = "crates/aptos-network-checker" } aptos-node = { path = "aptos-node" } aptos-node-checker = { path = "ecosystem/node-checker" } aptos-openapi = { path = "crates/aptos-openapi" } -aptos-parallel-executor = { path = "aptos-move/parallel-executor" } aptos-proptest-helpers = { path = "crates/aptos-proptest-helpers" } aptos-protos = { path = "crates/aptos-protos" } aptos-push-metrics = { path = "crates/aptos-push-metrics" } diff --git a/aptos-move/aptos-aggregator/src/transaction.rs b/aptos-move/aptos-aggregator/src/transaction.rs index 9b30f81680bdc..03112418e45ac 100644 --- a/aptos-move/aptos-aggregator/src/transaction.rs +++ b/aptos-move/aptos-aggregator/src/transaction.rs @@ -184,6 +184,7 @@ impl TransactionOutputExt { &self.output } + // TODO: rename to unpack() and consider other into()'s in the crate. pub fn into(self) -> (DeltaChangeSet, TransactionOutput) { (self.delta_change_set, self.output) } diff --git a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs index 059211a87f228..ec8baba4c9b0a 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs @@ -8,9 +8,7 @@ use aptos_types::{ on_chain_config::{OnChainConfig, ValidatorSet}, transaction::Transaction, }; -use aptos_vm::{ - data_cache::AsMoveResolver, parallel_executor::ParallelAptosVM, AptosVM, VMExecutor, -}; +use aptos_vm::{block_executor::BlockAptosVM, data_cache::AsMoveResolver}; use criterion::{measurement::Measurement, BatchSize, Bencher}; use language_e2e_tests::{ account_universe::{log_balance_strategy, AUTransactionGen, AccountUniverseGen}, @@ -188,7 +186,7 @@ impl TransactionBenchState { fn execute(self) { // The output is ignored here since we're just testing transaction performance, not trying // to assert correctness. - AptosVM::execute_block(self.transactions, self.executor.get_state_view()) + BlockAptosVM::execute_block(self.transactions, self.executor.get_state_view(), 1) .expect("VM should not fail to start"); } @@ -196,7 +194,7 @@ impl TransactionBenchState { fn execute_parallel(self) { // The output is ignored here since we're just testing transaction performance, not trying // to assert correctness. - ParallelAptosVM::execute_block( + BlockAptosVM::execute_block( self.transactions, self.executor.get_state_view(), num_cpus::get(), diff --git a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs index 0dd044487fb94..bb7dbac8f79df 100644 --- a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs +++ b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs @@ -25,7 +25,7 @@ use aptos_types::{ }; use aptos_vm::{ data_cache::{AsMoveResolver, IntoMoveResolver, StorageAdapterOwned}, - AptosVM, + AptosVM, VMExecutor, }; use clap::StructOpt; use language_e2e_tests::data_store::{FakeDataStore, GENESIS_CHANGE_SET_HEAD}; @@ -472,23 +472,26 @@ impl<'a> AptosTestAdapter<'a> { /// Should error if the transaction ends up being discarded, or having a status other than /// EXECUTED. fn run_transaction(&mut self, txn: Transaction) -> Result { - let mut outputs = AptosVM::execute_block_and_keep_vm_status(vec![txn], &self.storage)?; + let mut outputs = AptosVM::execute_block(vec![txn], &self.storage)?; assert_eq!(outputs.len(), 1); - let (status, output) = outputs.pop().unwrap(); + let output = outputs.pop().unwrap(); match output.status() { TransactionStatus::Keep(kept_vm_status) => { self.storage.add_write_set(output.write_set()); match kept_vm_status { ExecutionStatus::Success => Ok(output), _ => { - bail!("Failed to execute transaction. ExecutionStatus: {}", status) + bail!( + "Failed to execute transaction. ExecutionStatus: {:?}", + kept_vm_status + ) } } } - TransactionStatus::Discard(_) => { - bail!("Transaction discarded. VMStatus: {}", status) + TransactionStatus::Discard(status_code) => { + bail!("Transaction discarded. VM status code: {:?}", status_code) } TransactionStatus::Retry => panic!(), } diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp index 9d293cca30eb5..a06f83af2ac9b 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/call_function.exp @@ -1,10 +1,10 @@ processed 4 tasks task 1 'publish'. lines 4-30: -Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS task 2 'run'. lines 33-33: -Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS task 3 'view'. lines 35-35: [No Resource Exists] diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp index 919e07af2bf8b..b246fa070b510 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/execute_script.exp @@ -1,7 +1,7 @@ processed 3 tasks task 1 'run'. lines 5-13: -Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS task 2 'view'. lines 15-15: key 0x1::coin::CoinStore<0x1::aptos_coin::AptosCoin> { diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp index 216a415f529ca..5a8a9d468c39d 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/publish_module.exp @@ -1,4 +1,4 @@ processed 2 tasks task 1 'publish'. lines 4-9: -Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation +Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS diff --git a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp index 8bf428496daaa..82b73cc2a8810 100644 --- a/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp +++ b/aptos-move/aptos-transactional-test-harness/tests/aptos_test_harness/write_set_too_large.exp @@ -1,4 +1,4 @@ processed 3 tasks task 2 'run'. lines 29-29: -Error: Failed to execute transaction. ExecutionStatus: status STORAGE_WRITE_LIMIT_REACHED of type Execution +Error: Failed to execute transaction. ExecutionStatus: ExecutionFailure { location: Script, function: 0, code_offset: 0 } diff --git a/aptos-move/aptos-vm/Cargo.toml b/aptos-move/aptos-vm/Cargo.toml index 0f03eafc268be..760111e37fe42 100644 --- a/aptos-move/aptos-vm/Cargo.toml +++ b/aptos-move/aptos-vm/Cargo.toml @@ -15,13 +15,13 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos-aggregator = { workspace = true } +aptos-block-executor = { workspace = true } aptos-crypto = { workspace = true } aptos-crypto-derive = { workspace = true } aptos-gas = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } aptos-module-verifier = { workspace = true } -aptos-parallel-executor = { workspace = true } aptos-state-view = { workspace = true } aptos-types = { workspace = true } bcs = { workspace = true } diff --git a/aptos-move/aptos-vm/src/adapter_common.rs b/aptos-move/aptos-vm/src/adapter_common.rs index ed5338d116b29..35c6d2b94bd9d 100644 --- a/aptos-move/aptos-vm/src/adapter_common.rs +++ b/aptos-move/aptos-vm/src/adapter_common.rs @@ -15,13 +15,11 @@ use crate::{ logging::AdapterLogSchema, move_vm_ext::{MoveResolverExt, SessionExt, SessionId}, }; -use aptos_logger::prelude::*; use aptos_types::{ block_metadata::BlockMetadata, transaction::{Transaction, TransactionOutput, TransactionStatus, WriteSetPayload}, write_set::WriteSet, }; -use rayon::prelude::*; /// This trait describes the VM adapter's interface. /// TODO: bring more of the execution logic in aptos_vm into this file. @@ -133,76 +131,6 @@ pub(crate) fn validate_signature_checked_transaction( - adapter: &A, - transactions: Vec, - data_cache: &mut StateViewCache, -) -> Result, VMStatus> { - let mut result = vec![]; - let mut should_restart = false; - - info!( - AdapterLogSchema::new(data_cache.id(), 0), - "Executing block, transaction count: {}", - transactions.len() - ); - - let signature_verified_block: Vec; - { - // Verify the signatures of all the transactions in parallel. - // This is time consuming so don't wait and do the checking - // sequentially while executing the transactions. - signature_verified_block = transactions - .into_par_iter() - .map(preprocess_transaction::) - .collect(); - } - - for (idx, txn) in signature_verified_block.into_iter().enumerate() { - let log_context = AdapterLogSchema::new(data_cache.id(), idx); - if should_restart { - let txn_output = - TransactionOutput::new(WriteSet::default(), vec![], 0, TransactionStatus::Retry); - result.push((VMStatus::Error(StatusCode::UNKNOWN_STATUS), txn_output)); - debug!(log_context, "Retry after reconfiguration"); - continue; - }; - let (vm_status, output_ext, sender) = adapter.execute_single_transaction( - &txn, - &data_cache.as_move_resolver(), - &log_context, - )?; - - // Apply deltas. - let output = output_ext.into_transaction_output(&data_cache); - - if !output.status().is_discarded() { - data_cache.push_write_set(output.write_set()); - } else { - match sender { - Some(s) => trace!( - log_context, - "Transaction discarded, sender: {}, error: {:?}", - s, - vm_status, - ), - None => trace!(log_context, "Transaction malformed, error: {:?}", vm_status,), - } - } - - if A::should_restart_execution(&output) { - info!( - AdapterLogSchema::new(data_cache.id(), 0), - "Reconfiguration occurred: restart required", - ); - should_restart = true; - } - - result.push((vm_status, output)) - } - Ok(result) -} - /// Transactions after signature checking: /// Waypoints and BlockPrologues are not signed and are unaffected by signature checking, /// but a user transaction or writeset transaction is transformed to a SignatureCheckedTransaction. diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 5a13de33e880d..2e0ab80a2fd8f 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -2,14 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - adapter_common, adapter_common::{ discard_error_output, discard_error_vm_status, validate_signature_checked_transaction, validate_signed_transaction, PreprocessedTransaction, VMAdapter, }, aptos_vm_impl::{get_transaction_output, AptosVMImpl, AptosVMInternals}, + block_executor::BlockAptosVM, counters::*, - data_cache::{AsMoveResolver, IntoMoveResolver, StateViewCache}, + data_cache::{AsMoveResolver, IntoMoveResolver}, delta_state_view::DeltaStateView, errors::expect_only_successful_execution, logging::AdapterLogSchema, @@ -912,21 +912,6 @@ impl AptosVM { Ok((VMStatus::Executed, output)) } - /// Alternate form of 'execute_block' that keeps the vm_status before it goes into the - /// `TransactionOutput` - pub fn execute_block_and_keep_vm_status( - transactions: Vec, - state_view: &impl StateView, - ) -> Result, VMStatus> { - let mut state_view_cache = StateViewCache::new(state_view); - let count = transactions.len(); - let vm = AptosVM::new(&state_view_cache); - let res = adapter_common::execute_block_impl(&vm, transactions, &mut state_view_cache)?; - // Record the histogram count for transactions per block. - BLOCK_TRANSACTION_COUNT.observe(count as f64); - Ok(res) - } - pub fn simulate_signed_transaction( txn: &SignedTransaction, state_view: &impl StateView, @@ -983,22 +968,21 @@ impl VMExecutor for AptosVM { )) }); - let concurrency_level = Self::get_concurrency_level(); - if concurrency_level > 1 { - let (result, err) = crate::parallel_executor::ParallelAptosVM::execute_block( - transactions, - state_view, - concurrency_level, - )?; - debug!("Parallel execution error {:?}", err); - Ok(result) - } else { - let output = Self::execute_block_and_keep_vm_status(transactions, state_view)?; - Ok(output - .into_iter() - .map(|(_vm_status, txn_output)| txn_output) - .collect()) + let log_context = AdapterLogSchema::new(state_view.id(), 0); + info!( + log_context, + "Executing block, transaction count: {}", + transactions.len() + ); + + let count = transactions.len(); + let ret = + BlockAptosVM::execute_block(transactions, state_view, Self::get_concurrency_level()); + if ret.is_ok() { + // Record the histogram count for transactions per block. + BLOCK_TRANSACTION_COUNT.observe(count as f64); } + ret } } diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs new file mode 100644 index 0000000000000..2d1fca8c98f24 --- /dev/null +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -0,0 +1,186 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +mod storage_wrapper; +pub(crate) mod vm_wrapper; + +use crate::{ + adapter_common::{preprocess_transaction, PreprocessedTransaction}, + block_executor::vm_wrapper::AptosExecutorTask, + AptosVM, +}; +use aptos_aggregator::{delta_change_set::DeltaOp, transaction::TransactionOutputExt}; +use aptos_block_executor::{ + errors::Error, + executor::{BlockExecutor, RAYON_EXEC_POOL}, + output_delta_resolver::{OutputDeltaResolver, ResolvedData}, + task::{ + Transaction as BlockExecutorTransaction, + TransactionOutput as BlockExecutorTransactionOutput, + }, +}; +use aptos_logger::debug; +use aptos_state_view::StateView; +use aptos_types::{ + state_store::state_key::StateKey, + transaction::{Transaction, TransactionOutput, TransactionStatus}, + write_set::{WriteOp, WriteSet, WriteSetMut}, +}; +use move_core_types::vm_status::VMStatus; +use rayon::prelude::*; +use std::collections::HashMap; + +impl BlockExecutorTransaction for PreprocessedTransaction { + type Key = StateKey; + type Value = WriteOp; +} + +// Wrapper to avoid orphan rule +pub(crate) struct AptosTransactionOutput(TransactionOutputExt); + +impl AptosTransactionOutput { + pub fn new(output: TransactionOutputExt) -> Self { + Self(output) + } + + pub fn into(self) -> TransactionOutputExt { + self.0 + } + + pub fn as_ref(&self) -> &TransactionOutputExt { + &self.0 + } +} + +impl BlockExecutorTransactionOutput for AptosTransactionOutput { + type T = PreprocessedTransaction; + + fn get_writes(&self) -> Vec<(StateKey, WriteOp)> { + self.0 + .txn_output() + .write_set() + .iter() + .map(|(key, op)| (key.clone(), op.clone())) + .collect() + } + + fn get_deltas(&self) -> Vec<(StateKey, DeltaOp)> { + self.0 + .delta_change_set() + .iter() + .map(|(key, op)| (key.clone(), *op)) + .collect() + } + + /// Execution output for transactions that comes after SkipRest signal. + fn skip_output() -> Self { + Self(TransactionOutputExt::from(TransactionOutput::new( + WriteSet::default(), + vec![], + 0, + TransactionStatus::Retry, + ))) + } +} + +pub struct BlockAptosVM(); + +impl BlockAptosVM { + fn process_parallel_block_output( + results: Vec, + delta_resolver: OutputDeltaResolver, + state_view: &S, + ) -> Vec { + // TODO: MVHashmap, and then delta resolver should track aggregator base values. + let mut aggregator_base_values: HashMap> = + HashMap::new(); + for res in results.iter() { + for (key, _) in res.as_ref().delta_change_set().iter() { + if !aggregator_base_values.contains_key(key) { + aggregator_base_values.insert(key.clone(), state_view.get_state_value(key)); + } + } + } + + let materialized_deltas = + delta_resolver.resolve(aggregator_base_values.into_iter().collect(), results.len()); + + results + .into_iter() + .zip(materialized_deltas.into_iter()) + .map(|(res, delta_writes)| { + res.into() + .output_with_delta_writes(WriteSetMut::new(delta_writes)) + }) + .collect() + } + + fn process_sequential_block_output( + results: Vec, + ) -> Vec { + results + .into_iter() + .map(|res| { + let (deltas, output) = res.into().into(); + debug_assert!(deltas.is_empty(), "[Execution] Deltas must be materialized"); + output + }) + .collect() + } + + pub fn execute_block( + transactions: Vec, + state_view: &S, + concurrency_level: usize, + ) -> Result, VMStatus> { + // Verify the signatures of all the transactions in parallel. + // This is time consuming so don't wait and do the checking + // sequentially while executing the transactions. + let signature_verified_block: Vec = + RAYON_EXEC_POOL.install(|| { + transactions + .into_par_iter() + .map(preprocess_transaction::) + .collect() + }); + + let executor = + BlockExecutor::>::new(concurrency_level); + + let mut ret = if concurrency_level > 1 { + executor + .execute_transactions_parallel(state_view, &signature_verified_block) + .map(|(results, delta_resolver)| { + Self::process_parallel_block_output(results, delta_resolver, state_view) + }) + } else { + executor + .execute_transactions_sequential(state_view, &signature_verified_block) + .map(Self::process_sequential_block_output) + }; + + if ret == Err(Error::ModulePathReadWrite) { + debug!("[Execution]: Module read & written, sequential fallback"); + + ret = executor + .execute_transactions_sequential(state_view, &signature_verified_block) + .map(Self::process_sequential_block_output); + } + + // Explicit async drop. Happens here because we can't currently move to + // BlockExecutor due to the Module publishing fallback. TODO: fix after + // module publishing fallback is removed. + RAYON_EXEC_POOL.spawn(move || { + // Explicit async drops. + drop(signature_verified_block); + }); + + match ret { + Ok(outputs) => Ok(outputs), + Err(Error::ModulePathReadWrite) => { + unreachable!("[Execution]: Must be handled by sequential fallback") + } + Err(Error::UserError(err)) => Err(err), + } + } +} diff --git a/aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs b/aptos-move/aptos-vm/src/block_executor/storage_wrapper.rs similarity index 97% rename from aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs rename to aptos-move/aptos-vm/src/block_executor/storage_wrapper.rs index c0592f9256369..7547dfe187b98 100644 --- a/aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs +++ b/aptos-move/aptos-vm/src/block_executor/storage_wrapper.rs @@ -3,7 +3,7 @@ use crate::data_cache::{IntoMoveResolver, StorageAdapterOwned}; use aptos_aggregator::delta_change_set::{deserialize, serialize}; -use aptos_parallel_executor::executor::{MVHashMapView, ReadResult}; +use aptos_block_executor::executor::{MVHashMapView, ReadResult}; use aptos_state_view::{StateView, StateViewId}; use aptos_types::state_store::state_storage_usage::StateStorageUsage; use aptos_types::{ diff --git a/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs new file mode 100644 index 0000000000000..4c5f407048b0a --- /dev/null +++ b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs @@ -0,0 +1,133 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + adapter_common::{PreprocessedTransaction, VMAdapter}, + aptos_vm::AptosVM, + block_executor::{storage_wrapper::VersionedView, AptosTransactionOutput}, + data_cache::{AsMoveResolver, StateViewCache, StorageAdapter}, + logging::AdapterLogSchema, + move_vm_ext::MoveResolverExt, +}; +use aptos_aggregator::{delta_change_set::DeltaChangeSet, transaction::TransactionOutputExt}; +use aptos_block_executor::{ + executor::MVHashMapView, + task::{ExecutionStatus, ExecutorTask}, +}; +use aptos_logger::prelude::*; +use aptos_state_view::StateView; +use aptos_types::{state_store::state_key::StateKey, write_set::WriteOp}; +use move_core_types::{ + ident_str, + language_storage::{ModuleId, CORE_CODE_ADDRESS}, + vm_status::VMStatus, +}; +use std::collections::btree_map::BTreeMap; + +pub(crate) struct AptosExecutorTask<'a, S> { + vm: AptosVM, + base_view: &'a S, +} + +// This function is called by the BlockExecutor for each transaction is intends +// to execute (via the ExecutorTask trait). It can be as a part of sequential +// execution, or speculatively as a part of a parallel execution. +fn execute_transaction( + vm: &AptosVM, + txn: &PreprocessedTransaction, + view: S, + log_context: AdapterLogSchema, + materialize_deltas: bool, +) -> ExecutionStatus { + match vm.execute_single_transaction(txn, &view, &log_context) { + Ok((vm_status, mut output_ext, sender)) => { + if materialize_deltas { + // Keep TransactionOutputExt type for wrapper. + output_ext = TransactionOutputExt::new( + DeltaChangeSet::empty(), // Cleared deltas. + output_ext.into_transaction_output(&view), // Materialize. + ); + } + + if output_ext.txn_output().status().is_discarded() { + match sender { + Some(s) => trace!( + log_context, + "Transaction discarded, sender: {}, error: {:?}", + s, + vm_status, + ), + None => { + trace!(log_context, "Transaction malformed, error: {:?}", vm_status,) + } + }; + } + if AptosVM::should_restart_execution(output_ext.txn_output()) { + info!(log_context, "Reconfiguration occurred: restart required",); + ExecutionStatus::SkipRest(AptosTransactionOutput::new(output_ext)) + } else { + ExecutionStatus::Success(AptosTransactionOutput::new(output_ext)) + } + } + Err(err) => ExecutionStatus::Abort(err), + } +} + +impl<'a, S: 'a + StateView> ExecutorTask for AptosExecutorTask<'a, S> { + type T = PreprocessedTransaction; + type Output = AptosTransactionOutput; + type Error = VMStatus; + type Argument = &'a S; + + fn init(argument: &'a S) -> Self { + let vm = AptosVM::new(argument); + + // Loading `0x1::account` and its transitive dependency into the code cache. + // + // This should give us a warm VM to avoid the overhead of VM cold start. + // Result of this load could be omitted as this is a best effort approach and won't hurt if that fails. + // + // Loading up `0x1::account` should be sufficient as this is the most common module + // used for prologue, epilogue and transfer functionality. + + let _ = vm.load_module( + &ModuleId::new(CORE_CODE_ADDRESS, ident_str!("account").to_owned()), + &StorageAdapter::new(argument), + ); + + Self { + vm, + base_view: argument, + } + } + + fn execute_transaction_btree_view( + &self, + view: &BTreeMap, + txn: &PreprocessedTransaction, + txn_idx: usize, + ) -> ExecutionStatus { + let state_cache_view = StateViewCache::from_map_ref(self.base_view, view); + execute_transaction( + &self.vm, + txn, + state_cache_view.as_move_resolver(), + AdapterLogSchema::new(self.base_view.id(), txn_idx), + true, + ) + } + + fn execute_transaction_mvhashmap_view( + &self, + view: &MVHashMapView, + txn: &PreprocessedTransaction, + ) -> ExecutionStatus { + execute_transaction( + &self.vm, + txn, + VersionedView::new_view(self.base_view, view), + AdapterLogSchema::new(self.base_view.id(), view.txn_idx()), + false, + ) + } +} diff --git a/aptos-move/aptos-vm/src/data_cache.rs b/aptos-move/aptos-vm/src/data_cache.rs index 3f57f1e247dbd..25bc00b3ea43a 100644 --- a/aptos-move/aptos-vm/src/data_cache.rs +++ b/aptos-move/aptos-vm/src/data_cache.rs @@ -10,11 +10,8 @@ use aptos_logger::prelude::*; use aptos_state_view::{StateView, StateViewId}; use aptos_types::state_store::state_storage_usage::StateStorageUsage; use aptos_types::{ - access_path::AccessPath, - on_chain_config::ConfigStorage, - state_store::state_key::StateKey, - vm_status::StatusCode, - write_set::{WriteOp, WriteSet}, + access_path::AccessPath, on_chain_config::ConfigStorage, state_store::state_key::StateKey, + vm_status::StatusCode, write_set::WriteOp, }; use fail::fail_point; use framework::natives::state_storage::StateStorageUsageResolver; @@ -26,6 +23,7 @@ use move_core_types::{ }; use move_table_extension::{TableHandle, TableResolver}; use std::{ + borrow::Cow, collections::btree_map::BTreeMap, ops::{Deref, DerefMut}, }; @@ -34,10 +32,6 @@ use std::{ /// but can be used as a one shot cache for systems that need a simple `RemoteCache` /// implementation (e.g. tests or benchmarks). /// -/// The cache is responsible to track all changes to the `StateView` that are the result -/// of transaction execution. Those side effects are published at the end of a transaction -/// execution via `StateViewCache::push_write_set`. -/// /// `StateViewCache` is responsible to give an up to date view over the data store, /// so that changes executed but not yet committed are visible to subsequent transactions. /// @@ -45,33 +39,23 @@ use std::{ /// track of incremental changes is vital to the consistency of the data store and the system. pub struct StateViewCache<'a, S> { data_view: &'a S, - data_map: BTreeMap>>, + data_map: Cow<'a, BTreeMap>, } impl<'a, S: StateView> StateViewCache<'a, S> { + pub fn from_map_ref(data_view: &'a S, data_map_ref: &'a BTreeMap) -> Self { + Self { + data_view, + data_map: Cow::Borrowed(data_map_ref), + } + } + /// Create a `StateViewCache` give a `StateView`. Hold updates to the data store and /// forward data request to the `StateView` if not in the local cache. pub fn new(data_view: &'a S) -> Self { StateViewCache { data_view, - data_map: BTreeMap::new(), - } - } - - // Publishes a `WriteSet` computed at the end of a transaction. - // The effect is to build a layer in front of the `StateView` which keeps - // track of the data as if the changes were applied immediately. - pub(crate) fn push_write_set(&mut self, write_set: &WriteSet) { - for (ap, write_op) in write_set.iter() { - match write_op { - WriteOp::Modification(blob) | WriteOp::Creation(blob) => { - self.data_map.insert(ap.clone(), Some(blob.clone())); - } - WriteOp::Deletion => { - self.data_map.remove(ap); - self.data_map.insert(ap.clone(), None); - } - } + data_map: Cow::Owned(BTreeMap::new()), } } } @@ -84,7 +68,10 @@ impl<'block, S: StateView> StateView for StateViewCache<'block, S> { ))); match self.data_map.get(state_key) { - Some(opt_data) => Ok(opt_data.clone()), + Some(write_op) => Ok(match write_op { + WriteOp::Modification(blob) | WriteOp::Creation(blob) => Some(blob.clone()), + WriteOp::Deletion => None, + }), None => match self.data_view.get_state_value(state_key) { Ok(remote_data) => Ok(remote_data), // TODO: should we forward some error info? diff --git a/aptos-move/aptos-vm/src/lib.rs b/aptos-move/aptos-vm/src/lib.rs index 7b5ff0c85464b..dfe6e2c03fedd 100644 --- a/aptos-move/aptos-vm/src/lib.rs +++ b/aptos-move/aptos-vm/src/lib.rs @@ -111,12 +111,12 @@ pub mod foreign_contracts; mod adapter_common; pub mod aptos_vm; mod aptos_vm_impl; +pub mod block_executor; mod delta_state_view; mod errors; pub mod logging; pub mod move_vm_ext; pub mod natives; -pub mod parallel_executor; pub mod read_write_set_analysis; pub mod system_module_names; mod transaction_arg_validation; diff --git a/aptos-move/aptos-vm/src/parallel_executor/mod.rs b/aptos-move/aptos-vm/src/parallel_executor/mod.rs deleted file mode 100644 index a5357719170ed..0000000000000 --- a/aptos-move/aptos-vm/src/parallel_executor/mod.rs +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (c) Aptos -// SPDX-License-Identifier: Apache-2.0 - -mod storage_wrapper; -mod vm_wrapper; - -use crate::{ - adapter_common::{preprocess_transaction, PreprocessedTransaction}, - aptos_vm::AptosVM, - parallel_executor::vm_wrapper::AptosVMWrapper, -}; -use aptos_aggregator::{delta_change_set::DeltaOp, transaction::TransactionOutputExt}; -use aptos_parallel_executor::{ - errors::Error, - executor::ParallelTransactionExecutor, - output_delta_resolver::ResolvedData, - task::{Transaction as PTransaction, TransactionOutput as PTransactionOutput}, -}; -use aptos_state_view::StateView; -use aptos_types::{ - state_store::state_key::StateKey, - transaction::{Transaction, TransactionOutput, TransactionStatus}, - write_set::{WriteOp, WriteSet, WriteSetMut}, -}; -use move_core_types::vm_status::{StatusCode, VMStatus}; -use rayon::prelude::*; -use std::collections::HashMap; - -impl PTransaction for PreprocessedTransaction { - type Key = StateKey; - type Value = WriteOp; -} - -// Wrapper to avoid orphan rule -pub(crate) struct AptosTransactionOutput(TransactionOutputExt); - -impl AptosTransactionOutput { - pub fn new(output: TransactionOutputExt) -> Self { - Self(output) - } - - pub fn into(self) -> TransactionOutputExt { - self.0 - } - - pub fn as_ref(&self) -> &TransactionOutputExt { - &self.0 - } -} - -impl PTransactionOutput for AptosTransactionOutput { - type T = PreprocessedTransaction; - - fn get_writes(&self) -> Vec<(StateKey, WriteOp)> { - self.0 - .txn_output() - .write_set() - .iter() - .map(|(key, op)| (key.clone(), op.clone())) - .collect() - } - - fn get_deltas(&self) -> Vec<(StateKey, DeltaOp)> { - self.0 - .delta_change_set() - .iter() - .map(|(key, op)| (key.clone(), *op)) - .collect() - } - - /// Execution output for transactions that comes after SkipRest signal. - fn skip_output() -> Self { - Self(TransactionOutputExt::from(TransactionOutput::new( - WriteSet::default(), - vec![], - 0, - TransactionStatus::Retry, - ))) - } -} - -pub struct ParallelAptosVM(); - -impl ParallelAptosVM { - pub fn execute_block( - transactions: Vec, - state_view: &S, - concurrency_level: usize, - ) -> Result<(Vec, Option>), VMStatus> { - // Verify the signatures of all the transactions in parallel. - // This is time consuming so don't wait and do the checking - // sequentially while executing the transactions. - let signature_verified_block: Vec = transactions - .par_iter() - .map(|txn| preprocess_transaction::(txn.clone())) - .collect(); - - match ParallelTransactionExecutor::>::new( - concurrency_level, - ) - .execute_transactions_parallel(state_view, signature_verified_block) - { - Ok((results, delta_resolver)) => { - // TODO: with more deltas, collect keys in parallel (in parallel executor). - let mut aggregator_keys: HashMap> = - HashMap::new(); - - for res in results.iter() { - let output_ext = AptosTransactionOutput::as_ref(res); - for (key, _) in output_ext.delta_change_set().iter() { - if !aggregator_keys.contains_key(key) { - aggregator_keys.insert(key.clone(), state_view.get_state_value(key)); - } - } - } - - let materialized_deltas = - delta_resolver.resolve(aggregator_keys.into_iter().collect(), results.len()); - Ok(( - results - .into_iter() - .zip(materialized_deltas.into_iter()) - .map(|(res, delta_writes)| { - let output_ext = AptosTransactionOutput::into(res); - output_ext.output_with_delta_writes(WriteSetMut::new(delta_writes)) - }) - .collect(), - None, - )) - } - Err(err @ Error::ModulePathReadWrite) => { - let output = AptosVM::execute_block_and_keep_vm_status(transactions, state_view)?; - Ok(( - output - .into_iter() - .map(|(_vm_status, txn_output)| txn_output) - .collect(), - Some(err), - )) - } - Err(Error::InvariantViolation) => Err(VMStatus::Error( - StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, - )), - Err(Error::UserError(err)) => Err(err), - } - } -} diff --git a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs deleted file mode 100644 index afabc379a2e3c..0000000000000 --- a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright (c) Aptos -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - adapter_common::{PreprocessedTransaction, VMAdapter}, - aptos_vm::AptosVM, - data_cache::StorageAdapter, - logging::AdapterLogSchema, - parallel_executor::{storage_wrapper::VersionedView, AptosTransactionOutput}, -}; -use aptos_logger::prelude::*; -use aptos_parallel_executor::{ - executor::MVHashMapView, - task::{ExecutionStatus, ExecutorTask}, -}; -use aptos_state_view::StateView; -use aptos_types::{state_store::state_key::StateKey, write_set::WriteOp}; -use move_core_types::{ - ident_str, - language_storage::{ModuleId, CORE_CODE_ADDRESS}, - vm_status::VMStatus, -}; - -pub(crate) struct AptosVMWrapper<'a, S> { - vm: AptosVM, - base_view: &'a S, -} - -impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> { - type T = PreprocessedTransaction; - type Output = AptosTransactionOutput; - type Error = VMStatus; - type Argument = &'a S; - - fn init(argument: &'a S) -> Self { - let vm = AptosVM::new(argument); - - // Loading `0x1::account` and its transitive dependency into the code cache. - // - // This should give us a warm VM to avoid the overhead of VM cold start. - // Result of this load could be omitted as this is a best effort approach and won't hurt if that fails. - // - // Loading up `0x1::account` should be sufficient as this is the most common module - // used for prologue, epilogue and transfer functionality. - - let _ = vm.load_module( - &ModuleId::new(CORE_CODE_ADDRESS, ident_str!("account").to_owned()), - &StorageAdapter::new(argument), - ); - - Self { - vm, - base_view: argument, - } - } - - fn execute_transaction( - &self, - view: &MVHashMapView, - txn: &PreprocessedTransaction, - ) -> ExecutionStatus { - let log_context = AdapterLogSchema::new(self.base_view.id(), view.txn_idx()); - let versioned_view = VersionedView::new_view(self.base_view, view); - - match self - .vm - .execute_single_transaction(txn, &versioned_view, &log_context) - { - Ok((vm_status, output_ext, sender)) => { - if output_ext.txn_output().status().is_discarded() { - match sender { - Some(s) => trace!( - log_context, - "Transaction discarded, sender: {}, error: {:?}", - s, - vm_status, - ), - None => { - trace!(log_context, "Transaction malformed, error: {:?}", vm_status,) - } - }; - } - if AptosVM::should_restart_execution(output_ext.txn_output()) { - ExecutionStatus::SkipRest(AptosTransactionOutput::new(output_ext)) - } else { - ExecutionStatus::Success(AptosTransactionOutput::new(output_ext)) - } - } - Err(err) => ExecutionStatus::Abort(err), - } - } -} diff --git a/aptos-move/parallel-executor/Cargo.toml b/aptos-move/block-executor/Cargo.toml similarity index 91% rename from aptos-move/parallel-executor/Cargo.toml rename to aptos-move/block-executor/Cargo.toml index 0b639cb1f6ac3..4fc182e134f9b 100644 --- a/aptos-move/parallel-executor/Cargo.toml +++ b/aptos-move/block-executor/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "aptos-parallel-executor" -description = "Aptos parallel transaction executor library" +name = "aptos-block-executor" +description = "Aptos block transaction executor library, parallel execution via Block-STM" version = "0.1.0" # Workspace inherited keys diff --git a/aptos-move/parallel-executor/benches/scheduler_benches.rs b/aptos-move/block-executor/benches/scheduler_benches.rs similarity index 100% rename from aptos-move/parallel-executor/benches/scheduler_benches.rs rename to aptos-move/block-executor/benches/scheduler_benches.rs diff --git a/aptos-move/parallel-executor/src/counters.rs b/aptos-move/block-executor/src/counters.rs similarity index 100% rename from aptos-move/parallel-executor/src/counters.rs rename to aptos-move/block-executor/src/counters.rs diff --git a/aptos-move/parallel-executor/src/errors.rs b/aptos-move/block-executor/src/errors.rs similarity index 83% rename from aptos-move/parallel-executor/src/errors.rs rename to aptos-move/block-executor/src/errors.rs index 905ebc5344358..d08373d8fc7db 100644 --- a/aptos-move/parallel-executor/src/errors.rs +++ b/aptos-move/block-executor/src/errors.rs @@ -3,9 +3,6 @@ #[derive(Debug, PartialEq, Eq)] pub enum Error { - /// Invariant violation that happens internally inside of scheduler, usually an indication of - /// implementation error. - InvariantViolation, /// The same module access path for module was both read & written during speculative executions. /// This may trigger a race due to the Move-VM loader cache implementation, and mitigation requires /// aborting the parallel execution pipeline and falling back to the sequential execution. diff --git a/aptos-move/parallel-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs similarity index 87% rename from aptos-move/parallel-executor/src/executor.rs rename to aptos-move/block-executor/src/executor.rs index 0db577570bedf..14e2510a0aff7 100644 --- a/aptos-move/parallel-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -15,9 +15,9 @@ use aptos_types::write_set::TransactionWrite; use mvhashmap::{MVHashMap, MVHashMapError, MVHashMapOutput}; use num_cpus; use once_cell::sync::Lazy; -use std::{hash::Hash, marker::PhantomData, sync::Arc, thread::spawn}; +use std::{collections::btree_map::BTreeMap, hash::Hash, marker::PhantomData, sync::Arc}; -static RAYON_EXEC_POOL: Lazy = Lazy::new(|| { +pub static RAYON_EXEC_POOL: Lazy = Lazy::new(|| { rayon::ThreadPoolBuilder::new() .num_threads(num_cpus::get()) .thread_name(|index| format!("par_exec_{}", index)) @@ -55,7 +55,7 @@ pub enum ReadResult { impl< 'a, - K: ModulePath + PartialOrd + Send + Clone + Hash + Eq, + K: ModulePath + PartialOrd + Ord + Send + Clone + Hash + Eq, V: TransactionWrite + Send + Sync, > MVHashMapView<'a, K, V> { @@ -105,7 +105,7 @@ impl< // `self.txn_idx` estimated to depend on a write from `dep_idx`. match self.scheduler.wait_for_dependency(self.txn_idx, dep_idx) { Some(dep_condition) => { - // Wait on a condition variable correpsonding to the encountered + // Wait on a condition variable corresponding to the encountered // read dependency. Once the dep_idx finishes re-execution, scheduler // will mark the dependency as resolved, and then the txn_idx will be // scheduled for re-execution, which will re-awaken cvar here. @@ -147,14 +147,14 @@ impl< } } -pub struct ParallelTransactionExecutor { +pub struct BlockExecutor { // number of active concurrent tasks, corresponding to the maximum number of rayon // threads that may be concurrently participating in parallel execution. concurrency_level: usize, phantom: PhantomData<(T, E)>, } -impl ParallelTransactionExecutor +impl BlockExecutor where T: Transaction, E: ExecutorTask, @@ -163,8 +163,8 @@ where /// be handled by sequential execution) and that concurrency_level <= num_cpus. pub fn new(concurrency_level: usize) -> Self { assert!( - concurrency_level > 1 && concurrency_level <= num_cpus::get(), - "Parallel execution concurrency level {} should be between 2 and number of CPUs", + concurrency_level > 0 && concurrency_level <= num_cpus::get(), + "Parallel execution concurrency level {} should be between 1 and number of CPUs", concurrency_level ); Self { @@ -198,7 +198,7 @@ where }; // VM execution. - let execute_result = executor.execute_transaction(&state_view, txn); + let execute_result = executor.execute_transaction_mvhashmap_view(&state_view, txn); let mut prev_modified_keys = last_input_output.modified_keys(idx_to_execute); // For tracking whether the recent execution wrote outside of the previous write/delta set. @@ -359,7 +359,7 @@ where pub fn execute_transactions_parallel( &self, executor_initial_arguments: E::Argument, - signature_verified_block: Vec, + signature_verified_block: &[T], ) -> Result< ( Vec, @@ -367,6 +367,8 @@ where ), E::Error, > { + assert!(self.concurrency_level > 1, "Must use sequential execution"); + let versioned_data_cache = MVHashMap::new(); if signature_verified_block.is_empty() { @@ -382,7 +384,7 @@ where s.spawn(|_| { self.work_task_with_scope( &executor_initial_arguments, - &signature_verified_block, + signature_verified_block, &last_input_output, &versioned_data_cache, &scheduler, @@ -416,10 +418,9 @@ where ret }; - spawn(move || { + RAYON_EXEC_POOL.spawn(move || { // Explicit async drops. drop(last_input_output); - drop(signature_verified_block); drop(scheduler); }); @@ -434,4 +435,48 @@ where } } } + + pub fn execute_transactions_sequential( + &self, + executor_arguments: E::Argument, + signature_verified_block: &[T], + ) -> Result, E::Error> { + let num_txns = signature_verified_block.len(); + let executor = E::init(executor_arguments); + let mut data_map = BTreeMap::new(); + + let mut ret = Vec::with_capacity(num_txns); + for (idx, txn) in signature_verified_block.iter().enumerate() { + // this call internally materializes deltas. + let res = executor.execute_transaction_btree_view(&data_map, txn, idx); + + let must_skip = matches!(res, ExecutionStatus::SkipRest(_)); + + match res { + ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => { + assert_eq!( + output.get_deltas().len(), + 0, + "Sequential execution must materialize deltas" + ); + // Apply the writes. + for (ap, write_op) in output.get_writes().into_iter() { + data_map.insert(ap, write_op); + } + ret.push(output); + } + ExecutionStatus::Abort(err) => { + // Record the status indicating abort. + return Err(Error::UserError(err)); + } + } + + if must_skip { + break; + } + } + + ret.resize_with(num_txns, E::Output::skip_output); + Ok(ret) + } } diff --git a/aptos-move/parallel-executor/src/lib.rs b/aptos-move/block-executor/src/lib.rs similarity index 100% rename from aptos-move/parallel-executor/src/lib.rs rename to aptos-move/block-executor/src/lib.rs diff --git a/aptos-move/parallel-executor/src/output_delta_resolver.rs b/aptos-move/block-executor/src/output_delta_resolver.rs similarity index 100% rename from aptos-move/parallel-executor/src/output_delta_resolver.rs rename to aptos-move/block-executor/src/output_delta_resolver.rs diff --git a/aptos-move/parallel-executor/src/proptest_types/bencher.rs b/aptos-move/block-executor/src/proptest_types/bencher.rs similarity index 92% rename from aptos-move/parallel-executor/src/proptest_types/bencher.rs rename to aptos-move/block-executor/src/proptest_types/bencher.rs index 3a608bca340f6..20a45da767af7 100644 --- a/aptos-move/parallel-executor/src/proptest_types/bencher.rs +++ b/aptos-move/block-executor/src/proptest_types/bencher.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - executor::ParallelTransactionExecutor, + executor::BlockExecutor, proptest_types::types::{ ExpectedOutput, KeyType, Task, Transaction, TransactionGen, TransactionGenParams, ValueType, }, @@ -27,8 +27,10 @@ pub struct Bencher { phantom_value: PhantomData, } -pub(crate) struct BencherState -where +pub(crate) struct BencherState< + K: Hash + Clone + Debug + Eq + PartialOrd + Ord, + V: Clone + Eq + Arbitrary, +> where Vec: From, { transactions: Vec, ValueType>>, @@ -108,11 +110,11 @@ where } pub(crate) fn run(self) { - let output = ParallelTransactionExecutor::< + let output = BlockExecutor::< Transaction, ValueType>, Task, ValueType>, >::new(num_cpus::get()) - .execute_transactions_parallel((), self.transactions.clone()) + .execute_transactions_parallel((), &self.transactions) .map(|(res, _)| res); self.expected_output.assert_output(&output, None); diff --git a/aptos-move/parallel-executor/src/proptest_types/mod.rs b/aptos-move/block-executor/src/proptest_types/mod.rs similarity index 100% rename from aptos-move/parallel-executor/src/proptest_types/mod.rs rename to aptos-move/block-executor/src/proptest_types/mod.rs diff --git a/aptos-move/parallel-executor/src/proptest_types/tests.rs b/aptos-move/block-executor/src/proptest_types/tests.rs similarity index 95% rename from aptos-move/parallel-executor/src/proptest_types/tests.rs rename to aptos-move/block-executor/src/proptest_types/tests.rs index d164fc287d2ed..814f4cf6c2467 100644 --- a/aptos-move/parallel-executor/src/proptest_types/tests.rs +++ b/aptos-move/block-executor/src/proptest_types/tests.rs @@ -3,7 +3,7 @@ use crate::{ errors::Error, - executor::ParallelTransactionExecutor, + executor::BlockExecutor, proptest_types::types::{ ExpectedOutput, KeyType, Task, Transaction, TransactionGen, TransactionGenParams, ValueType, }, @@ -46,11 +46,11 @@ fn run_transactions( } for _ in 0..num_repeat { - let output = ParallelTransactionExecutor::< + let output = BlockExecutor::< Transaction, ValueType>, Task, ValueType>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); if module_access.0 && module_access.1 { @@ -166,11 +166,11 @@ fn deltas_writes_mixed() { .collect(); for _ in 0..20 { - let output = ParallelTransactionExecutor::< + let output = BlockExecutor::< Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); let baseline = ExpectedOutput::generate_baseline(&transactions, None); @@ -202,11 +202,11 @@ fn deltas_resolver() { .collect(); for _ in 0..20 { - let output = ParallelTransactionExecutor::< + let output = BlockExecutor::< Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()); + .execute_transactions_parallel((), &transactions); let (output, delta_resolver) = output.unwrap(); // Should not be possible to overflow or underflow, as each delta is at @@ -347,11 +347,11 @@ fn publishing_fixed_params() { }; // Confirm still no intersection - let output = ParallelTransactionExecutor::< + let output = BlockExecutor::< Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()); + .execute_transactions_parallel((), &transactions); assert_ok!(output); // Adjust the reads of txn indices[2] to contain module read to key 42. @@ -382,11 +382,11 @@ fn publishing_fixed_params() { }; for _ in 0..200 { - let output = ParallelTransactionExecutor::< + let output = BlockExecutor::< Transaction, ValueType<[u8; 32]>>, Task, ValueType<[u8; 32]>>, >::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); assert_eq!(output.unwrap_err(), Error::ModulePathReadWrite); diff --git a/aptos-move/parallel-executor/src/proptest_types/types.rs b/aptos-move/block-executor/src/proptest_types/types.rs similarity index 96% rename from aptos-move/parallel-executor/src/proptest_types/types.rs rename to aptos-move/block-executor/src/proptest_types/types.rs index 6b964475fa5b6..c5cdd8ba526e5 100644 --- a/aptos-move/parallel-executor/src/proptest_types/types.rs +++ b/aptos-move/block-executor/src/proptest_types/types.rs @@ -22,7 +22,7 @@ use proptest::{arbitrary::Arbitrary, collection::vec, prelude::*, proptest, samp use proptest_derive::Arbitrary; use std::collections::hash_map::DefaultHasher; use std::{ - collections::{BTreeSet, HashMap}, + collections::{btree_map::BTreeMap, BTreeSet, HashMap}, convert::TryInto, fmt::Debug, hash::{Hash, Hasher}, @@ -40,8 +40,8 @@ const STORAGE_DELTA_VAL: u128 = 100; // Generation of transactions /////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Copy, Hash, Debug, PartialEq, PartialOrd, Eq)] -pub struct KeyType( +#[derive(Clone, Copy, Hash, Debug, PartialEq, PartialOrd, Ord, Eq)] +pub struct KeyType( /// Wrapping the types used for testing to add ModulePath trait implementation (below). pub K, /// The bool field determines for testing purposes, whether the key will be interpreted @@ -51,7 +51,7 @@ pub struct KeyType( pub bool, ); -impl ModulePath for KeyType { +impl ModulePath for KeyType { fn module_path(&self) -> Option { // Since K is generic, use its hash to assign addresses. let mut hasher = DefaultHasher::new(); @@ -320,7 +320,7 @@ impl> + Arbitrary + Clone + Debug + Eq + Sync + Send> Transactio impl TransactionType for Transaction where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + TransactionWrite + 'static, { type Key = K; @@ -341,7 +341,7 @@ impl Task { impl ExecutorTask for Task where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + TransactionWrite + 'static, { type T = Transaction; @@ -353,7 +353,17 @@ where Self::new() } - fn execute_transaction( + fn execute_transaction_btree_view( + &self, + _view: &BTreeMap, + _txn: &Self::T, + _txn_idx: usize, + ) -> ExecutionStatus { + // Separate PR to proptest sequential execution flow. + unreachable!(); + } + + fn execute_transaction_mvhashmap_view( &self, view: &MVHashMapView, txn: &Self::T, @@ -394,7 +404,7 @@ pub struct Output(Vec<(K, V)>, Vec<(K, DeltaOp)>, Vec>); impl TransactionOutput for Output where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + TransactionWrite + 'static, { type T = Transaction; diff --git a/aptos-move/parallel-executor/src/scheduler.rs b/aptos-move/block-executor/src/scheduler.rs similarity index 100% rename from aptos-move/parallel-executor/src/scheduler.rs rename to aptos-move/block-executor/src/scheduler.rs diff --git a/aptos-move/parallel-executor/src/task.rs b/aptos-move/block-executor/src/task.rs similarity index 85% rename from aptos-move/parallel-executor/src/task.rs rename to aptos-move/block-executor/src/task.rs index fc7df3143b092..d75e2e8b1a107 100644 --- a/aptos-move/parallel-executor/src/task.rs +++ b/aptos-move/block-executor/src/task.rs @@ -6,7 +6,7 @@ use aptos_aggregator::delta_change_set::DeltaOp; use aptos_types::{ access_path::AccessPath, state_store::state_key::StateKey, write_set::TransactionWrite, }; -use std::{fmt::Debug, hash::Hash}; +use std::{collections::btree_map::BTreeMap, fmt::Debug, hash::Hash}; /// The execution result of a transaction #[derive(Debug)] @@ -39,7 +39,7 @@ impl ModulePath for StateKey { /// Trait that defines a transaction that could be parallel executed by the scheduler. Each /// transaction will write to a key value storage as their side effect. pub trait Transaction: Sync + Send + 'static { - type Key: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath; + type Key: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath; type Value: Send + Sync + TransactionWrite; } @@ -68,8 +68,16 @@ pub trait ExecutorTask: Sync { /// Create an instance of the transaction executor. fn init(args: Self::Argument) -> Self; + /// Execute one single transaction given the view of the current state as a BTreeMap, + fn execute_transaction_btree_view( + &self, + view: &BTreeMap<::Key, ::Value>, + txn: &Self::T, + txn_idx: usize, + ) -> ExecutionStatus; + /// Execute one single transaction given the view of the current state. - fn execute_transaction( + fn execute_transaction_mvhashmap_view( &self, view: &MVHashMapView<::Key, ::Value>, txn: &Self::T, diff --git a/aptos-move/parallel-executor/src/txn_last_input_output.rs b/aptos-move/block-executor/src/txn_last_input_output.rs similarity index 100% rename from aptos-move/parallel-executor/src/txn_last_input_output.rs rename to aptos-move/block-executor/src/txn_last_input_output.rs diff --git a/aptos-move/parallel-executor/src/unit_tests/mod.rs b/aptos-move/block-executor/src/unit_tests/mod.rs similarity index 98% rename from aptos-move/parallel-executor/src/unit_tests/mod.rs rename to aptos-move/block-executor/src/unit_tests/mod.rs index 2b282cf38f7b8..a77366f457752 100644 --- a/aptos-move/parallel-executor/src/unit_tests/mod.rs +++ b/aptos-move/block-executor/src/unit_tests/mod.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - executor::ParallelTransactionExecutor, + executor::BlockExecutor, proptest_types::types::{ExpectedOutput, KeyType, Task, Transaction, ValueType}, scheduler::{Scheduler, SchedulerTask, TaskGuard}, task::ModulePath, @@ -18,11 +18,11 @@ use std::{ fn run_and_assert(transactions: Vec>) where - K: PartialOrd + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, + K: PartialOrd + Ord + Send + Sync + Clone + Hash + Eq + ModulePath + 'static, V: Send + Sync + Debug + Clone + Eq + TransactionWrite + 'static, { - let output = ParallelTransactionExecutor::, Task>::new(num_cpus::get()) - .execute_transactions_parallel((), transactions.clone()) + let output = BlockExecutor::, Task>::new(num_cpus::get()) + .execute_transactions_parallel((), &transactions) .map(|(res, _)| res); let baseline = ExpectedOutput::generate_baseline(&transactions, None); diff --git a/aptos-move/e2e-move-tests/Cargo.toml b/aptos-move/e2e-move-tests/Cargo.toml index 81a4d87965efa..dd69eb25e13be 100644 --- a/aptos-move/e2e-move-tests/Cargo.toml +++ b/aptos-move/e2e-move-tests/Cargo.toml @@ -15,11 +15,11 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos = { workspace = true } +aptos-block-executor = { workspace = true } aptos-crypto = { workspace = true } aptos-gas = { workspace = true, features = ["testing"] } aptos-keygen = { workspace = true } aptos-logger = { workspace = true } -aptos-parallel-executor = { workspace = true } aptos-state-view = { workspace = true } aptos-types = { workspace = true } aptos-vm = { workspace = true } diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index e08d4077112e5..0a7343ffa91ba 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -43,9 +43,9 @@ use aptos_types::{ write_set::WriteSet, }; use aptos_vm::{ + block_executor::BlockAptosVM, data_cache::{AsMoveResolver, StorageAdapter}, move_vm_ext::{MoveVmExt, SessionId}, - parallel_executor::ParallelAptosVM, AptosVM, VMExecutor, VMValidator, }; use framework::ReleaseBundle; @@ -354,21 +354,6 @@ impl FakeExecutor { ) } - /// Alternate form of 'execute_block' that keeps the vm_status before it goes into the - /// `TransactionOutput` - pub fn execute_block_and_keep_vm_status( - &self, - txn_block: Vec, - ) -> Result, VMStatus> { - AptosVM::execute_block_and_keep_vm_status( - txn_block - .into_iter() - .map(Transaction::UserTransaction) - .collect(), - &self.data_store, - ) - } - /// Executes the transaction as a singleton block and applies the resulting write set to the /// data store. Panics if execution fails pub fn execute_and_apply(&mut self, transaction: SignedTransaction) -> TransactionOutput { @@ -395,13 +380,7 @@ impl FakeExecutor { &self, txn_block: Vec, ) -> Result, VMStatus> { - let (result, _) = ParallelAptosVM::execute_block( - txn_block, - &self.data_store, - usize::min(4, num_cpus::get()), - )?; - - Ok(result) + BlockAptosVM::execute_block(txn_block, &self.data_store, usize::min(4, num_cpus::get())) } pub fn execute_transaction_block( diff --git a/aptos-move/e2e-testsuite/Cargo.toml b/aptos-move/e2e-testsuite/Cargo.toml index 26b2adb43b96a..190251477101e 100644 --- a/aptos-move/e2e-testsuite/Cargo.toml +++ b/aptos-move/e2e-testsuite/Cargo.toml @@ -13,11 +13,11 @@ repository = { workspace = true } rust-version = { workspace = true } [dependencies] +aptos-block-executor = { workspace = true } aptos-crypto = { workspace = true } aptos-gas = { workspace = true, features = ["testing"] } aptos-keygen = { workspace = true } aptos-logger = { workspace = true } -aptos-parallel-executor = { workspace = true } aptos-state-view = { workspace = true } aptos-types = { workspace = true } aptos-vm = { workspace = true }