From 516f32e5271d2f8060cd971c5987e3548b3417c0 Mon Sep 17 00:00:00 2001
From: igor-aptos <110557261+igor-aptos@users.noreply.github.com>
Date: Fri, 22 Nov 2024 18:56:20 -0800
Subject: [PATCH] Various variants of "simplified/dummy" executors (#15152)

## Description

Adding all these executor configurations, for understanding upper bounds on different approaches/optimizations:

```
/// Transaction execution: AptosVM
/// Executing conflicts: in the input order, via BlockSTM,
/// State: BlockSTM-provided MVHashMap-based view with caching
AptosVMWithBlockSTM,
/// Transaction execution: NativeVM - a simplified Rust implementation to create a VMChangeSet,
/// Executing conflicts: in the input order, via BlockSTM
/// State: BlockSTM-provided MVHashMap-based view with caching
NativeVMWithBlockSTM,
/// Transaction execution: AptosVM
/// Executing conflicts: All transactions execute on the state at the beginning of the block
/// State: Raw CachedStateView
AptosVMParallelUncoordinated,
/// Transaction execution: Native Rust code producing a WriteSet
/// Executing conflicts: All transactions execute on the state at the beginning of the block
/// State: Raw CachedStateView
NativeParallelUncoordinated,
/// Transaction execution: Native Rust code updating in-memory state, no WriteSet output
/// Executing conflicts: All transactions execute on the state on a first-come, first-served basis
/// State: In-memory DashMap with Rust values of the state (i.e. StateKey -> Resource (either Account or FungibleStore)),
/// cached across blocks, filled upon first request
NativeValueCacheParallelUncoordinated,
/// Transaction execution: Native Rust code updating in-memory state, no WriteSet output
/// Executing conflicts: All transactions execute on the state on a first-come, first-served basis
/// State: In-memory DashMap mapping AccountAddress to seq_num and balance (ignoring all other fields),
/// kept across blocks, randomly initialized on first access; storage is ignored.
NativeNoStorageParallelUncoordinated,
```

---
 .../workflow-run-execution-performance.yaml | 13 +-
 Cargo.lock | 7 +
 aptos-move/aptos-aggregator/src/resolver.rs | 1 -
 .../aptos-debugger/src/aptos_debugger.rs | 4 +-
 .../src/transaction_bench_state.rs | 6 +-
 aptos-move/aptos-vm/src/aptos_vm.rs | 10 +-
 aptos-move/aptos-vm/src/block_executor/mod.rs | 55 +-
 .../aptos-vm/src/block_executor/vm_wrapper.rs | 2 +-
 aptos-move/aptos-vm/src/lib.rs | 12 +-
 .../sharded_executor_service.rs | 4 +-
 aptos-move/e2e-tests/src/executor.rs | 4 +-
 execution/executor-benchmark/Cargo.toml | 7 +
 execution/executor-benchmark/src/db_access.rs | 186 ++-
 .../src/db_reliable_submitter.rs | 17 +-
 execution/executor-benchmark/src/lib.rs | 360 +++--
 execution/executor-benchmark/src/main.rs | 54 +-
 .../src/native/aptos_vm_uncoordinated.rs | 73 +
 .../executor-benchmark/src/native/mod.rs | 4 +
 .../src/native/native_transaction.rs | 148 ++
 .../src/native/native_vm.rs | 939 +++++++++++++
 .../parallel_uncoordinated_block_executor.rs | 1241 +++++++++++++++++
 .../executor-benchmark/src/native_executor.rs | 457 ------
 execution/executor-benchmark/src/pipeline.rs | 19 +-
 testsuite/single_node_performance.py | 113 +-
 .../account_config/resources/coin_store.rs | 4 +
 .../account_config/resources/core_account.rs | 2 +-
 26 files changed, 3046 insertions(+), 696 deletions(-)
 create mode 100644 execution/executor-benchmark/src/native/aptos_vm_uncoordinated.rs
 create mode 100644 execution/executor-benchmark/src/native/native_transaction.rs
 create mode 100644 execution/executor-benchmark/src/native/native_vm.rs
 create mode 100644 execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs
 delete mode 100644 execution/executor-benchmark/src/native_executor.rs

diff --git a/.github/workflows/workflow-run-execution-performance.yaml b/.github/workflows/workflow-run-execution-performance.yaml
index 349832f84ad75..5fadd5757199c 100644
--- a/.github/workflows/workflow-run-execution-performance.yaml
+++ b/.github/workflows/workflow-run-execution-performance.yaml
@@ -27,6 +27,11 @@ on: default: false type: boolean description: Whether to run or skip move-only e2e tests at the beginning. + USE_COIN_APT: + required: false + default: false + type: boolean + description: By default, FA APT is exclusively used. If set, Coin APT is used instead. SOURCE: required: false default: CI @@ -74,6 +79,7 @@ on: - CONTINUOUS - MAINNET - MAINNET_LARGE_DB + - EXECUTORS type: choice description: Which set of tests to run. MAINNET/MAINNET_LARGE_DB are for performance validation of mainnet nodes. SKIP_MOVE_E2E: required: false default: false type: boolean description: Whether to skip move-only e2e tests at the beginning. + USE_COIN_APT: + required: false + default: false + type: boolean + description: By default, FA APT is exclusively used. If set, Coin APT is used instead.
IGNORE_TARGET_DETERMINATION: required: false default: true @@ -123,7 +134,7 @@ jobs: - name: Run single node execution benchmark in performance build mode shell: bash - run: TABULATE_INSTALL=lib-only pip install tabulate && FLOW="${{ inputs.FLOW }}" SOURCE="${{ inputs.SOURCE }}" RUNNER_NAME="${{ inputs.RUNNER_NAME }}" SKIP_MOVE_E2E="${{ inputs.SKIP_MOVE_E2E && '1' || '' }}" NUMBER_OF_EXECUTION_THREADS="${{ inputs.NUMBER_OF_EXECUTION_THREADS }}" testsuite/single_node_performance.py + run: TABULATE_INSTALL=lib-only pip install tabulate && FLOW="${{ inputs.FLOW }}" SOURCE="${{ inputs.SOURCE }}" RUNNER_NAME="${{ inputs.RUNNER_NAME }}" SKIP_MOVE_E2E="${{ inputs.SKIP_MOVE_E2E && '1' || '' }}" DISABLE_FA_APT="${{ inputs.USE_COIN_APT && '1' || '' }}" NUMBER_OF_EXECUTION_THREADS="${{ inputs.NUMBER_OF_EXECUTION_THREADS }}" testsuite/single_node_performance.py if: ${{ (inputs.IGNORE_TARGET_DETERMINATION || needs.test-target-determinator.outputs.run_execution_performance_test == 'true') }} - run: echo "Skipping single node execution performance! Unrelated changes detected." diff --git a/Cargo.lock b/Cargo.lock index 509cf4daf2c5a..41f1d2707b060 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1498,6 +1498,7 @@ name = "aptos-executor-benchmark" version = "0.1.0" dependencies = [ "anyhow", + "aptos-aggregator", "aptos-block-executor", "aptos-block-partitioner", "aptos-config", @@ -1512,6 +1513,7 @@ dependencies = [ "aptos-jellyfish-merkle", "aptos-logger", "aptos-metrics-core", + "aptos-mvhashmap", "aptos-node-resource-metrics", "aptos-profiler 0.1.0", "aptos-push-metrics", @@ -1522,15 +1524,20 @@ dependencies = [ "aptos-types", "aptos-vm", "aptos-vm-environment", + "aptos-vm-logging", + "aptos-vm-types", "async-trait", "bcs 0.1.4", + "bytes", "chrono", "clap 4.5.21", + "dashmap", "derivative", "indicatif 0.15.0", "itertools 0.13.0", "jemallocator", "move-core-types", + "move-vm-types", "num_cpus", "once_cell", "rand 0.7.3", diff --git a/aptos-move/aptos-aggregator/src/resolver.rs b/aptos-move/aptos-aggregator/src/resolver.rs index 0f522753392a1..200633129734c 100644 --- a/aptos-move/aptos-aggregator/src/resolver.rs +++ b/aptos-move/aptos-aggregator/src/resolver.rs @@ -86,7 +86,6 @@ pub trait TAggregatorV1View { PartialVMError::new(StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR) .with_message("Cannot convert delta for deleted aggregator".to_string()) })?; - delta_op .apply_to(base) .map_err(|e| match &e { diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 2f6c08107f9ab..2dc191812cc1a 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -28,7 +28,7 @@ use aptos_validator_interface::{ AptosValidatorInterface, DBDebuggerInterface, DebuggerStateView, RestDebuggerInterface, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, data_cache::AsMoveResolver, AptosVM, }; @@ -434,7 +434,7 @@ fn execute_block_no_limit( state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { - BlockAptosVM::execute_block::< + AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index 39446d7943fe2..f250de5c60758 100644 --- 
a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -35,7 +35,7 @@ use aptos_types::{ vm_status::VMStatus, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, data_cache::AsMoveResolver, sharded_block_executor::{ local_executor_shard::{LocalExecutorClient, LocalExecutorService}, @@ -217,7 +217,7 @@ where ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = BlockAptosVM::execute_block::< + let output = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, @@ -268,7 +268,7 @@ where ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = BlockAptosVM::execute_block::< + let output = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 16ff4a35bcdc6..f2afca7412a88 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, counters::*, data_cache::{AsMoveResolver, StorageAdapter}, errors::{discarded_output, expect_only_successful_execution}, @@ -22,8 +22,8 @@ use crate::{ sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, system_module_names::*, transaction_metadata::TransactionMetadata, - transaction_validation, verifier, - verifier::randomness::get_randomness_annotation, + transaction_validation, + verifier::{self, randomness::get_randomness_annotation}, VMBlockExecutor, VMValidator, }; use anyhow::anyhow; @@ -2781,7 +2781,7 @@ impl AptosVM { // TODO - move out from this file? -/// Production implementation of TransactionBlockExecutor. +/// Production implementation of VMBlockExecutor. 
/// /// Transaction execution: AptosVM /// Executing conflicts: in the input order, via BlockSTM, @@ -2820,7 +2820,7 @@ impl VMBlockExecutor for AptosVMBlockExecutor { ); let count = txn_provider.num_txns(); - let ret = BlockAptosVM::execute_block::< + let ret = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index f136e461a8c60..d5c55eeb957c4 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -4,17 +4,18 @@ pub(crate) mod vm_wrapper; -use crate::{ - block_executor::vm_wrapper::AptosExecutorTask, - counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS}, -}; +use crate::counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS}; use aptos_aggregator::{ delayed_change::DelayedChange, delta_change_set::DeltaOp, resolver::TAggregatorV1View, }; use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, errors::BlockExecutionError, - executor::BlockExecutor, task::TransactionOutput as BlockExecutorTransactionOutput, - txn_commit_hook::TransactionCommitHook, txn_provider::TxnProvider, types::InputOutputKey, + code_cache_global_manager::AptosModuleCacheManager, + errors::BlockExecutionError, + executor::BlockExecutor, + task::{ExecutorTask, TransactionOutput as BlockExecutorTransactionOutput}, + txn_commit_hook::TransactionCommitHook, + txn_provider::TxnProvider, + types::InputOutputKey, }; use aptos_infallible::Mutex; use aptos_types::{ @@ -46,8 +47,10 @@ use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID; use once_cell::sync::{Lazy, OnceCell}; use std::{ collections::{BTreeMap, HashSet}, + marker::PhantomData, sync::Arc, }; +use vm_wrapper::AptosExecutorTask; static RAYON_EXEC_POOL: Lazy> = Lazy::new(|| { Arc::new( @@ -69,7 +72,7 @@ pub struct AptosTransactionOutput { } impl AptosTransactionOutput { - pub(crate) fn new(output: VMOutput) -> Self { + pub fn new(output: VMOutput) -> Self { Self { vm_output: Mutex::new(Some(output)), committed_output: OnceCell::new(), @@ -387,9 +390,24 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { } } -pub struct BlockAptosVM; +pub struct AptosBlockExecutorWrapper< + E: ExecutorTask< + Txn = SignatureVerifiedTransaction, + Error = VMStatus, + Output = AptosTransactionOutput, + >, +> { + _phantom: PhantomData, +} -impl BlockAptosVM { +impl< + E: ExecutorTask< + Txn = SignatureVerifiedTransaction, + Error = VMStatus, + Output = AptosTransactionOutput, + >, + > AptosBlockExecutorWrapper +{ pub fn execute_block_on_thread_pool< S: StateView + Sync, L: TransactionCommitHook, @@ -420,14 +438,12 @@ impl BlockAptosVM { transaction_slice_metadata, )?; - let executor = BlockExecutor::< - SignatureVerifiedTransaction, - AptosExecutorTask, - S, - L, - ExecutableTestType, - TP, - >::new(config, executor_thread_pool, transaction_commit_listener); + let executor = + BlockExecutor::::new( + config, + executor_thread_pool, + transaction_commit_listener, + ); let ret = executor.execute_block( signature_verified_block, @@ -488,3 +504,6 @@ impl BlockAptosVM { ) } } + +// Same as AptosBlockExecutorWrapper with AptosExecutorTask +pub type AptosVMBlockExecutorWrapper = AptosBlockExecutorWrapper; diff --git a/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs index bbd37221be735..5c3b81dc667a3 100644 --- 
a/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs +++ b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs @@ -21,7 +21,7 @@ use aptos_vm_types::{ use fail::fail_point; use move_core_types::vm_status::{StatusCode, VMStatus}; -pub(crate) struct AptosExecutorTask { +pub struct AptosExecutorTask { vm: AptosVM, id: StateViewId, } diff --git a/aptos-move/aptos-vm/src/lib.rs b/aptos-move/aptos-vm/src/lib.rs index d18fde5c39b5c..8aa474f762699 100644 --- a/aptos-move/aptos-vm/src/lib.rs +++ b/aptos-move/aptos-vm/src/lib.rs @@ -194,9 +194,11 @@ pub trait VMBlockExecutor: Send + Sync { /// Executes a block of transactions using a sharded block executor and returns the results. fn execute_block_sharded>( - sharded_block_executor: &ShardedBlockExecutor, - transactions: PartitionedTransactions, - state_view: Arc, - onchain_config: BlockExecutorConfigFromOnchain, - ) -> Result, VMStatus>; + _sharded_block_executor: &ShardedBlockExecutor, + _transactions: PartitionedTransactions, + _state_view: Arc, + _onchain_config: BlockExecutorConfigFromOnchain, + ) -> Result, VMStatus> { + unimplemented!("sharded not supported") + } } diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index 5d68d52a14e1b..16833d1e48555 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - block_executor::BlockAptosVM, + block_executor::AptosVMBlockExecutorWrapper, sharded_block_executor::{ aggr_overridden_state_view::{AggregatorOverriddenStateView, TOTAL_SUPPLY_AGGR_BASE_VAL}, coordinator_client::CoordinatorClient, @@ -140,7 +140,7 @@ impl ShardedExecutorService { }); s.spawn(move |_| { let txn_provider = DefaultTxnProvider::new(signature_verified_transactions); - let ret = BlockAptosVM::execute_block_on_thread_pool( + let ret = AptosVMBlockExecutorWrapper::execute_block_on_thread_pool( executor_thread_pool, &txn_provider, aggr_overridden_state_view.as_ref(), diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index cc145f190928d..c510477791f99 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -54,7 +54,7 @@ use aptos_types::{ AptosCoinType, CoinType, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, data_cache::AsMoveResolver, gas::make_prod_gas_meter, move_vm_ext::{MoveVmExt, SessionExt, SessionId}, @@ -678,7 +678,7 @@ impl FakeExecutor { onchain: onchain_config, }; let txn_provider = DefaultTxnProvider::new(txn_block); - BlockAptosVM::execute_block_on_thread_pool::< + AptosVMBlockExecutorWrapper::execute_block_on_thread_pool::< _, NoOpTransactionCommitHook, _, diff --git a/execution/executor-benchmark/Cargo.toml b/execution/executor-benchmark/Cargo.toml index 4f99fe0a77268..11083a7c5430b 100644 --- a/execution/executor-benchmark/Cargo.toml +++ b/execution/executor-benchmark/Cargo.toml @@ -14,6 +14,7 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } +aptos-aggregator = { workspace = true } aptos-block-executor = { workspace = true } aptos-block-partitioner = { workspace = true } aptos-config = { workspace = true } @@ -28,6 +29,7 @@ aptos-genesis = { workspace = true, features = ["testing"] } aptos-jellyfish-merkle = { 
workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } +aptos-mvhashmap = { workspace = true } aptos-node-resource-metrics = { workspace = true } aptos-push-metrics = { workspace = true } aptos-sdk = { workspace = true } @@ -36,14 +38,19 @@ aptos-transaction-generator-lib = { workspace = true } aptos-types = { workspace = true } aptos-vm = { workspace = true } aptos-vm-environment = { workspace = true } +aptos-vm-logging = { workspace = true } +aptos-vm-types = { workspace = true } async-trait = { workspace = true } bcs = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true } clap = { workspace = true } +dashmap = { workspace = true } derivative = { workspace = true } indicatif = { workspace = true } itertools = { workspace = true } move-core-types = { workspace = true } +move-vm-types = { workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } rand = { workspace = true } diff --git a/execution/executor-benchmark/src/db_access.rs b/execution/executor-benchmark/src/db_access.rs index 98eebae63b4ca..ec645db4fab90 100644 --- a/execution/executor-benchmark/src/db_access.rs +++ b/execution/executor-benchmark/src/db_access.rs @@ -2,60 +2,81 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use aptos_storage_interface::state_view::DbStateView; use aptos_types::{ account_address::AccountAddress, + account_config::{ + AccountResource, CoinInfoResource, CoinStoreResource, ConcurrentSupplyResource, + FungibleStoreResource, ObjectCoreResource, ObjectGroupResource, TypeInfoResource, + }, + event::{EventHandle, EventKey}, state_store::{state_key::StateKey, StateView}, write_set::TOTAL_SUPPLY_STATE_KEY, + AptosCoinType, CoinType, }; +use itertools::Itertools; use move_core_types::{ identifier::Identifier, language_storage::{StructTag, TypeTag}, + move_resource::MoveStructType, }; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::str::FromStr; - -#[derive(Debug, Default, Deserialize, Serialize)] -pub struct CoinStore { - pub coin: u64, - pub _frozen: bool, - pub _deposit_events: EventHandle, - pub _withdraw_events: EventHandle, -} +use serde::de::DeserializeOwned; +use std::{collections::BTreeMap, str::FromStr}; -#[derive(Debug, Default, Deserialize, Serialize)] -pub struct EventHandle { - _counter: u64, - _guid: GUID, -} +pub struct CommonStructTags { + pub account: StructTag, + pub apt_coin_store: StructTag, + pub object_group: StructTag, + pub object_core: StructTag, + pub fungible_store: StructTag, + pub concurrent_supply: StructTag, + + pub apt_coin_type_name: String, -#[derive(Debug, Default, Deserialize, Serialize)] -pub struct GUID { - _creation_num: u64, - _address: Address, + pub apt_coin_info_resource: StateKey, } -#[derive(Debug, Default, Deserialize, Serialize)] -pub struct Account { - pub authentication_key: Vec, - pub sequence_number: u64, - pub _guid_creation_num: u64, - pub _coin_register_events: EventHandle, - pub _key_rotation_events: EventHandle, - pub _rotation_capability_offer: CapabilityOffer, - pub _signer_capability_offer: CapabilityOffer, +impl CommonStructTags { + pub fn new() -> Self { + Self { + account: AccountResource::struct_tag(), + apt_coin_store: CoinStoreResource::::struct_tag(), + object_group: ObjectGroupResource::struct_tag(), + object_core: ObjectCoreResource::struct_tag(), + fungible_store: FungibleStoreResource::struct_tag(), + concurrent_supply: ConcurrentSupplyResource::struct_tag(), + + apt_coin_type_name: 
"0x1::aptos_coin::AptosCoin".to_string(), + apt_coin_info_resource: StateKey::resource_typed::>( + &AptosCoinType::coin_info_address(), + ) + .unwrap(), + } + } } -#[derive(Debug, Default, Deserialize, Serialize)] -pub struct CapabilityOffer { - _for_address: Option
, +impl Default for CommonStructTags { + fn default() -> Self { + Self::new() + } } -pub type Address = [u8; 32]; +pub struct DbAccessUtil { + pub common: CommonStructTags, +} -pub struct DbAccessUtil; +impl Default for DbAccessUtil { + fn default() -> Self { + Self::new() + } +} impl DbAccessUtil { + pub fn new() -> Self { + Self { + common: CommonStructTags::new(), + } + } + pub fn new_struct_tag( address: AccountAddress, module: &str, @@ -70,46 +91,36 @@ impl DbAccessUtil { } } - pub fn new_state_key( - address: AccountAddress, - resource_address: AccountAddress, - module: &str, - name: &str, - type_args: Vec, - ) -> StateKey { - StateKey::resource( - &address, - &Self::new_struct_tag(resource_address, module, name, type_args), - ) - .unwrap() + pub fn new_state_key_account(&self, address: &AccountAddress) -> StateKey { + StateKey::resource(address, &self.common.account).unwrap() } - pub fn new_state_key_account(address: AccountAddress) -> StateKey { - Self::new_state_key(address, AccountAddress::ONE, "account", "Account", vec![]) + pub fn new_state_key_aptos_coin(&self, address: &AccountAddress) -> StateKey { + StateKey::resource(address, &self.common.apt_coin_store).unwrap() } - pub fn new_state_key_aptos_coin(address: AccountAddress) -> StateKey { - Self::new_state_key(address, AccountAddress::ONE, "coin", "CoinStore", vec![ - TypeTag::Struct(Box::new(Self::new_struct_tag( - AccountAddress::ONE, - "aptos_coin", - "AptosCoin", - vec![], - ))), - ]) + pub fn new_state_key_object_resource_group(&self, address: &AccountAddress) -> StateKey { + StateKey::resource_group(address, &self.common.object_group) } pub fn get_account( account_key: &StateKey, state_view: &impl StateView, - ) -> Result> { + ) -> Result> { Self::get_value(account_key, state_view) } - pub fn get_coin_store( + pub fn get_fa_store( + store_key: &StateKey, + state_view: &impl StateView, + ) -> Result> { + Self::get_value(store_key, state_view) + } + + pub fn get_apt_coin_store( coin_store_key: &StateKey, state_view: &impl StateView, - ) -> Result> { + ) -> Result>> { Self::get_value(coin_store_key, state_view) } @@ -123,14 +134,63 @@ impl DbAccessUtil { value.transpose().map_err(anyhow::Error::msg) } - pub fn get_db_value( + pub fn get_resource_group( state_key: &StateKey, - state_view: &DbStateView, - ) -> Result> { + state_view: &impl StateView, + ) -> Result>>> { Self::get_value(state_key, state_view) } pub fn get_total_supply(state_view: &impl StateView) -> Result> { Self::get_value(&TOTAL_SUPPLY_STATE_KEY, state_view) } + + pub fn new_account_resource(address: AccountAddress) -> AccountResource { + AccountResource::new( + 0, + address.to_vec(), + EventHandle::new(EventKey::new(1, address), 0), + EventHandle::new(EventKey::new(2, address), 0), + ) + } + + pub fn new_apt_coin_store( + balance: u64, + address: AccountAddress, + ) -> CoinStoreResource { + CoinStoreResource::::new( + balance, + false, + EventHandle::new(EventKey::new(1, address), 0), + EventHandle::new(EventKey::new(2, address), 0), + ) + } + + pub fn new_object_core(address: AccountAddress, owner: AccountAddress) -> ObjectCoreResource { + ObjectCoreResource::new(owner, false, EventHandle::new(EventKey::new(1, address), 0)) + } + + pub fn new_type_info_resource() -> anyhow::Result { + let struct_tag = T::struct_tag(); + Ok(TypeInfoResource { + account_address: struct_tag.address, + module_name: bcs::to_bytes(&struct_tag.module.to_string())?, + struct_name: bcs::to_bytes( + &if struct_tag.type_args.is_empty() { + struct_tag.name.to_string() + } else { + 
format!( + "{}<{}>", + struct_tag.name, + struct_tag + .type_args + .iter() + .map(|v| v.to_string()) + .join(", ") + ) + .to_string() + }, + )?, + }) + } } diff --git a/execution/executor-benchmark/src/db_reliable_submitter.rs b/execution/executor-benchmark/src/db_reliable_submitter.rs index d95b8f37e70fa..8ed1ece5e1fb2 100644 --- a/execution/executor-benchmark/src/db_reliable_submitter.rs +++ b/execution/executor-benchmark/src/db_reliable_submitter.rs @@ -2,15 +2,16 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::db_access::{CoinStore, DbAccessUtil}; +use crate::db_access::DbAccessUtil; use anyhow::{Context, Result}; use aptos_storage_interface::{state_view::LatestDbStateCheckpointView, DbReaderWriter}; use aptos_transaction_generator_lib::{CounterState, ReliableTransactionSubmitter}; use aptos_types::{ account_address::AccountAddress, - account_config::AccountResource, + account_config::{AccountResource, CoinStoreResource}, state_store::MoveResourceExt, transaction::{SignedTransaction, Transaction}, + AptosCoinType, }; use async_trait::async_trait; use std::{ @@ -28,12 +29,14 @@ pub struct DbReliableTransactionSubmitter { impl ReliableTransactionSubmitter for DbReliableTransactionSubmitter { async fn get_account_balance(&self, account_address: AccountAddress) -> Result { let db_state_view = self.db.reader.latest_state_checkpoint_view().unwrap(); - let sender_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(account_address); - let sender_coin_store = - DbAccessUtil::get_db_value::(&sender_coin_store_key, &db_state_view)? - .unwrap(); + let sender_coin_store_key = DbAccessUtil::new().new_state_key_aptos_coin(&account_address); + let sender_coin_store = DbAccessUtil::get_value::>( + &sender_coin_store_key, + &db_state_view, + )? 
+ .unwrap(); - Ok(sender_coin_store.coin) + Ok(sender_coin_store.coin()) } async fn query_sequence_number(&self, address: AccountAddress) -> Result { diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index ef9b88e284c16..68285d5348396 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -10,7 +10,6 @@ mod db_reliable_submitter; mod ledger_update_stage; mod metrics; pub mod native; -pub mod native_executor; pub mod pipeline; pub mod transaction_committer; pub mod transaction_executor; @@ -44,7 +43,7 @@ use aptos_transaction_generator_lib::{ TransactionType::{self, CoinTransfer}, }; use aptos_types::on_chain_config::{FeatureFlag, Features}; -use aptos_vm::VMBlockExecutor; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; use db_reliable_submitter::DbReliableTransactionSubmitter; use metrics::TIMER; use pipeline::PipelineConfig; @@ -63,11 +62,8 @@ pub fn default_benchmark_features() -> Features { init_features } -pub fn init_db_and_executor(config: &NodeConfig) -> (DbReaderWriter, BlockExecutor) -where - V: VMBlockExecutor, -{ - let db = DbReaderWriter::new( +pub fn init_db(config: &NodeConfig) -> DbReaderWriter { + DbReaderWriter::new( AptosDB::open( config.storage.get_dir_paths(), false, /* readonly */ @@ -79,11 +75,7 @@ where None, ) .expect("DB should open."), - ); - - let executor = BlockExecutor::new(db.clone()); - - (db, executor) + ) } fn create_checkpoint( @@ -140,7 +132,7 @@ pub fn run_benchmark( config.storage.dir = checkpoint_dir.as_ref().to_path_buf(); config.storage.storage_pruner_config = pruner_config; config.storage.rocksdb_configs.enable_storage_sharding = enable_storage_sharding; - let (db, executor) = init_db_and_executor::(&config); + let db = init_db(&config); let root_account = TransactionGenerator::read_root_account(genesis_key, &db); let root_account = Arc::new(root_account); @@ -176,7 +168,7 @@ pub fn run_benchmark( let (main_signer_accounts, burner_accounts) = accounts_cache.split(num_main_signer_accounts); - let (transaction_generator_creator, phase) = init_workload::( + let (transaction_generator_creator, phase) = init_workload::( transaction_mix.clone(), root_account.clone(), main_signer_accounts, @@ -198,6 +190,7 @@ pub fn run_benchmark( }; let start_version = db.reader.expect_synced_version(); + let executor = BlockExecutor::::new(db.clone()); let (pipeline, block_sender) = Pipeline::new(executor, start_version, &pipeline_config, Some(num_blocks)); @@ -268,15 +261,19 @@ pub fn run_benchmark( info!("Done creating workload"); pipeline.start_pipeline_processing(); info!("Waiting for pipeline to finish"); - pipeline.join(); + let num_pipeline_txns = pipeline.join(); info!("Executed workload {}", workload_name); - if !pipeline_config.skip_commit { - let num_txns = - db.reader.expect_synced_version() - start_version - num_blocks_created as u64; - overall_measuring.print_end("Overall", num_txns); + let num_txns = if !pipeline_config.skip_commit { + db.reader.expect_synced_version() - start_version - num_blocks_created as u64 + } else { + num_pipeline_txns.unwrap_or_default() + }; + overall_measuring.print_end("Overall", num_txns); + + if !pipeline_config.skip_commit { if verify_sequence_numbers { generator.verify_sequence_numbers(db.reader.clone()); } @@ -395,7 +392,8 @@ fn add_accounts_impl( config.storage.dir = output_dir.as_ref().to_path_buf(); config.storage.storage_pruner_config = pruner_config; config.storage.rocksdb_configs.enable_storage_sharding = 
enable_storage_sharding; - let (db, executor) = init_db_and_executor::(&config); + let db = init_db(&config); + let executor = BlockExecutor::::new(db.clone()); let start_version = db.reader.get_latest_ledger_info_version().unwrap(); @@ -784,108 +782,250 @@ fn log_total_supply(db_reader: &Arc) { #[cfg(test)] mod tests { use crate::{ - db_generator::bootstrap_with_genesis, default_benchmark_features, init_db_and_executor, - native::native_config::NativeConfig, pipeline::PipelineConfig, + db_generator::bootstrap_with_genesis, + default_benchmark_features, init_db, + native::{ + aptos_vm_uncoordinated::AptosVMParallelUncoordinatedBlockExecutor, + native_config::NativeConfig, + native_vm::NativeVMBlockExecutor, + parallel_uncoordinated_block_executor::{ + NativeNoStorageRawTransactionExecutor, NativeParallelUncoordinatedBlockExecutor, + NativeRawTransactionExecutor, NativeValueCacheRawTransactionExecutor, + }, + }, + pipeline::PipelineConfig, transaction_executor::BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, - transaction_generator::TransactionGenerator, BenchmarkWorkload, + transaction_generator::TransactionGenerator, + BenchmarkWorkload, }; use aptos_config::config::NO_OP_STORAGE_PRUNER_CONFIG; use aptos_crypto::HashValue; + use aptos_executor::block_executor::BlockExecutor; use aptos_executor_types::BlockExecutorTrait; use aptos_sdk::{transaction_builder::aptos_stdlib, types::LocalAccount}; use aptos_temppath::TempPath; use aptos_transaction_generator_lib::{args::TransactionTypeArg, WorkflowProgress}; - use aptos_types::{on_chain_config::FeatureFlag, transaction::Transaction}; + use aptos_types::{ + access_path::Path, + account_address::AccountAddress, + on_chain_config::{FeatureFlag, Features}, + state_store::state_key::inner::StateKeyInner, + transaction::{Transaction, TransactionPayload}, + }; use aptos_vm::{aptos_vm::AptosVMBlockExecutor, AptosVM, VMBlockExecutor}; + use itertools::Itertools; + use move_core_types::language_storage::StructTag; use rand::thread_rng; - use std::fs; + use std::{ + collections::{BTreeMap, HashMap}, + fs, + }; + + #[test] + fn test_compare_vm_and_vm_uncoordinated() { + test_compare_prod_and_another_all_types::(true); + } #[test] fn test_compare_vm_and_native() { + test_compare_prod_and_another_all_types::(false); + } + + #[test] + fn test_compare_vm_and_native_parallel_uncoordinated() { + test_compare_prod_and_another_all_types::< + NativeParallelUncoordinatedBlockExecutor, + >(false); + } + + fn test_compare_prod_and_another_all_types(values_match: bool) { + let mut non_fa_features = default_benchmark_features(); + non_fa_features.disable(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); + non_fa_features.disable(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); + // non_fa_features.disable(FeatureFlag::MODULE_EVENT_MIGRATION); + // non_fa_features.disable(FeatureFlag::COIN_TO_FUNGIBLE_ASSET_MIGRATION); + + test_compare_prod_and_another::(values_match, non_fa_features.clone(), |address| { + aptos_stdlib::aptos_account_transfer(address, 1000) + }); + + test_compare_prod_and_another::( + values_match, + non_fa_features, + aptos_stdlib::aptos_account_create_account, + ); + + let mut fa_features = default_benchmark_features(); + fa_features.enable(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); + fa_features.enable(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); + fa_features.disable(FeatureFlag::CONCURRENT_FUNGIBLE_BALANCE); + + test_compare_prod_and_another::(values_match, fa_features.clone(), |address| { + 
aptos_stdlib::aptos_account_fungible_transfer_only(address, 1000) + }); + + test_compare_prod_and_another::(values_match, fa_features.clone(), |address| { + aptos_stdlib::aptos_account_transfer(address, 1000) + }); + + test_compare_prod_and_another::( + values_match, + fa_features, + aptos_stdlib::aptos_account_create_account, + ); + } + + fn test_compare_prod_and_another( + values_match: bool, + features: Features, + txn_payload_f: impl Fn(AccountAddress) -> TransactionPayload, + ) { aptos_logger::Logger::new().init(); let db_dir = TempPath::new(); fs::create_dir_all(db_dir.as_ref()).unwrap(); - let mut init_features = default_benchmark_features(); - init_features.enable(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); - init_features.enable(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); - - bootstrap_with_genesis(&db_dir, false, init_features.clone()); + bootstrap_with_genesis(&db_dir, false, features.clone()); let (mut config, genesis_key) = - aptos_genesis::test_utils::test_config_with_custom_features(init_features); + aptos_genesis::test_utils::test_config_with_custom_features(features); config.storage.dir = db_dir.as_ref().to_path_buf(); config.storage.storage_pruner_config = NO_OP_STORAGE_PRUNER_CONFIG; config.storage.rocksdb_configs.enable_storage_sharding = false; - let _txn = { - let (vm_db, vm_executor) = init_db_and_executor::(&config); + let (txn, vm_result) = { + let vm_db = init_db(&config); + let vm_executor = BlockExecutor::::new(vm_db.clone()); + let root_account = TransactionGenerator::read_root_account(genesis_key, &vm_db); let dst = LocalAccount::generate(&mut thread_rng()); - let num_coins = 1000; let txn_factory = TransactionGenerator::create_transaction_factory(); - let txn = Transaction::UserTransaction(root_account.sign_with_transaction_builder( - txn_factory.payload(aptos_stdlib::aptos_account_fungible_transfer_only( - dst.address(), - num_coins, - )), - )); + let txn = + Transaction::UserTransaction(root_account.sign_with_transaction_builder( + txn_factory.payload(txn_payload_f(dst.address())), + )); + let parent_block_id = vm_executor.committed_block_id(); + let block_id = HashValue::random(); vm_executor .execute_and_state_checkpoint( - (HashValue::zero(), vec![txn.clone()]).into(), - vm_executor.committed_block_id(), + (block_id, vec![txn.clone()]).into(), + parent_block_id, BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, ) .unwrap(); - txn + let result = vm_executor + .ledger_update(block_id, parent_block_id) + .unwrap() + .execution_output; + result.check_aborts_discards_retries(false, false, false); + (txn, result) }; - // let (_native_db, native_executor) = init_db_and_executor::(&config); - // native_executor - // .execute_and_state_checkpoint( - // (HashValue::zero(), vec![txn]).into(), - // native_executor.committed_block_id(), - // BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, - // ) - // .unwrap(); - - // let ( - // vm_txns, - // _vm_state_updates_vec, - // _vm_state_checkpoint_hashes, - // _vm_state_updates_before_last_checkpoint, - // _vm_sharded_state_cache, - // _vm_block_end_info, - // ) = vm_result.into_inner(); - // let (vm_statuses_for_input_txns, - // vm_to_commit, - // vm_to_discard, - // vm_to_retry, - // ) = vm_txns.into_inner(); - - // let ( - // native_txns, - // _native_state_updates_vec, - // _native_state_checkpoint_hashes, - // _native_state_updates_before_last_checkpoint, - // _native_sharded_state_cache, - // _native_block_end_info, - // ) = native_result.into_inner(); - // let (native_statuses_for_input_txns, - // native_to_commit, - 
// native_to_discard, - // native_to_retry, - // ) = native_txns.into_inner(); - - // println!("{:?}", vm_to_commit.parsed_outputs()); - // assert_eq!(vm_statuses_for_input_txns, native_statuses_for_input_txns); - // assert_eq!(vm_to_commit, native_to_commit); - // assert_eq!(vm_to_discard, native_to_discard); - // assert_eq!(vm_to_retry, native_to_retry); + let other_db = init_db(&config); + let other_executor = BlockExecutor::::new(other_db.clone()); + + let parent_block_id = other_executor.committed_block_id(); + let block_id = HashValue::random(); + other_executor + .execute_and_state_checkpoint( + (block_id, vec![txn]).into(), + parent_block_id, + BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + ) + .unwrap(); + let other_result = other_executor + .ledger_update(block_id, parent_block_id) + .unwrap() + .execution_output; + other_result.check_aborts_discards_retries(false, false, false); + + let vm_to_commit = &vm_result.to_commit; + let other_to_commit = &other_result.to_commit; + + assert_eq!(2, vm_to_commit.transaction_outputs.len()); + let vm_txn_output = &vm_to_commit.transaction_outputs[0]; + let vm_cp_txn_output = &vm_to_commit.transaction_outputs[1]; + + assert_eq!(2, other_to_commit.transaction_outputs.len()); + let other_txn_output = &other_to_commit.transaction_outputs[0]; + let other_cp_txn_output = &other_to_commit.transaction_outputs[1]; + + assert_eq!(vm_cp_txn_output, other_cp_txn_output); + + let vm_event_types = vm_txn_output + .events() + .iter() + .map(|event| event.type_tag().clone()) + .sorted() + .collect::>(); + let other_event_types = other_txn_output + .events() + .iter() + .map(|event| event.type_tag().clone()) + .sorted() + .collect::>(); + assert_eq!(vm_event_types, other_event_types); + + if values_match { + for (event1, event2) in vm_txn_output + .events() + .iter() + .zip_eq(other_txn_output.events().iter()) + { + assert_eq!(event1, event2); + } + } + + let vm_writes = vm_txn_output + .write_set() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>(); + let other_writes = other_txn_output + .write_set() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>(); + for (key, value) in vm_writes.iter() { + if let StateKeyInner::AccessPath(apath) = key.inner() { + if let Path::ResourceGroup(_) = apath.get_path() { + let vm_resources = + bcs::from_bytes::>>(value.bytes().unwrap()) + .unwrap(); + let other_resources = + other_writes + .get(key) + .map_or_else(BTreeMap::new, |other_value| { + bcs::from_bytes::>>( + other_value.bytes().unwrap(), + ) + .unwrap() + }); + + assert_eq!( + vm_resources.keys().collect::>(), + other_resources.keys().collect::>() + ); + if values_match { + assert_eq!(vm_resources, other_resources); + } + } + } + + assert!(other_writes.contains_key(key), "missing: {:?}", key); + if values_match { + let other_value = other_writes.get(key).unwrap(); + assert_eq!(value, other_value, "different value for key: {:?}", key); + } + } + assert_eq!(vm_writes.len(), other_writes.len()); + + if values_match { + assert_eq!(vm_txn_output, other_txn_output); + } } fn test_generic_benchmark( @@ -961,29 +1101,53 @@ mod tests { AptosVM::set_num_shards_once(1); AptosVM::set_concurrency_level_once(4); AptosVM::set_processed_transactions_detailed_counters(); - for _ in 0..10 { - test_generic_benchmark::( - Some(TransactionTypeArg::RepublishAndCall), - true, - ); - } + test_generic_benchmark::( + Some(TransactionTypeArg::RepublishAndCall), + true, + ); } #[test] fn test_benchmark_transaction() { - AptosVM::set_num_shards_once(1); + 
AptosVM::set_num_shards_once(4); AptosVM::set_concurrency_level_once(4); AptosVM::set_processed_transactions_detailed_counters(); - NativeConfig::set_concurrency_level_once(1); + NativeConfig::set_concurrency_level_once(4); test_generic_benchmark::( Some(TransactionTypeArg::ModifyGlobalMilestoneAggV2), true, ); } - // #[test] - // fn test_native_benchmark() { - // // correct execution not yet implemented, so cannot be checked for validity - // test_generic_benchmark::(None, false); - // } + #[test] + fn test_native_vm_benchmark_transaction() { + test_generic_benchmark::( + Some(TransactionTypeArg::AptFaTransfer), + true, + ); + } + + #[test] + fn test_native_loose_block_executor_benchmark() { + // correct execution not yet implemented, so cannot be checked for validity + test_generic_benchmark::< + NativeParallelUncoordinatedBlockExecutor, + >(Some(TransactionTypeArg::NoOp), false); + } + + #[test] + fn test_native_value_cache_loose_block_executor_benchmark() { + // correct execution not yet implemented, so cannot be checked for validity + test_generic_benchmark::< + NativeParallelUncoordinatedBlockExecutor, + >(Some(TransactionTypeArg::NoOp), false); + } + + #[test] + fn test_native_direct_raw_loose_block_executor_benchmark() { + // correct execution not yet implemented, so cannot be checked for validity + test_generic_benchmark::< + NativeParallelUncoordinatedBlockExecutor, + >(Some(TransactionTypeArg::NoOp), false); + } } diff --git a/execution/executor-benchmark/src/main.rs b/execution/executor-benchmark/src/main.rs index 90e479caa932b..3f24f0129bf67 100644 --- a/execution/executor-benchmark/src/main.rs +++ b/execution/executor-benchmark/src/main.rs @@ -14,8 +14,18 @@ use aptos_config::config::{ EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateMerklePrunerConfig, }; use aptos_executor_benchmark::{ - default_benchmark_features, native::native_config::NativeConfig, - native_executor::NativeExecutor, pipeline::PipelineConfig, BenchmarkWorkload, + default_benchmark_features, + native::{ + aptos_vm_uncoordinated::AptosVMParallelUncoordinatedBlockExecutor, + native_config::NativeConfig, + native_vm::NativeVMBlockExecutor, + parallel_uncoordinated_block_executor::{ + NativeNoStorageRawTransactionExecutor, NativeParallelUncoordinatedBlockExecutor, + NativeRawTransactionExecutor, NativeValueCacheRawTransactionExecutor, + }, + }, + pipeline::PipelineConfig, + BenchmarkWorkload, }; use aptos_executor_service::remote_executor_client; use aptos_experimental_ptx_executor::PtxBlockExecutor; @@ -234,10 +244,28 @@ enum BlockExecutorTypeOpt { /// State: BlockSTM-provided MVHashMap-based view with caching #[default] AptosVMWithBlockSTM, + /// Transaction execution: NativeVM - a simplified rust implemtation to create VMChangeSet, + /// Executing conflicts: in the input order, via BlockSTM + /// State: BlockSTM-provided MVHashMap-based view with caching + NativeVMWithBlockSTM, + /// Transaction execution: AptosVM + /// Executing conflicts: All transactions execute on the state at the beginning of the block + /// State: Raw CachedStateView + AptosVMParallelUncoordinated, /// Transaction execution: Native rust code producing WriteSet /// Executing conflicts: All transactions execute on the state at the beginning of the block /// State: Raw CachedStateView - NativeLooseSpeculative, + NativeParallelUncoordinated, + /// Transaction execution: Native rust code updating in-memory state, no WriteSet output + /// Executing conflicts: All transactions execute on the state in the first come - first 
serve basis + /// State: In-memory DashMap with rust values of state (i.e. StateKey -> Resource (either Account or FungibleStore)), + /// cached across blocks, filled upon first request + NativeValueCacheParallelUncoordinated, + /// Transaction execution: Native rust code updating in-memory state, no WriteSet output + /// Executing conflicts: All transactions execute on the state in the first come - first serve basis + /// State: In-memory DashMap with AccountAddress to seq_num and balance (ignoring all other fields). + /// kept across blocks, randomly initialized on first access, storage ignored. + NativeNoStorageParallelUncoordinated, PtxExecutor, } @@ -619,8 +647,24 @@ fn main() { BlockExecutorTypeOpt::AptosVMWithBlockSTM => { run::(opt); }, - BlockExecutorTypeOpt::NativeLooseSpeculative => { - run::(opt); + BlockExecutorTypeOpt::NativeVMWithBlockSTM => { + run::(opt); + }, + BlockExecutorTypeOpt::AptosVMParallelUncoordinated => { + run::(opt); + }, + BlockExecutorTypeOpt::NativeParallelUncoordinated => { + run::>(opt); + }, + BlockExecutorTypeOpt::NativeValueCacheParallelUncoordinated => { + run::>( + opt, + ); + }, + BlockExecutorTypeOpt::NativeNoStorageParallelUncoordinated => { + run::>( + opt, + ); }, BlockExecutorTypeOpt::PtxExecutor => { #[cfg(target_os = "linux")] diff --git a/execution/executor-benchmark/src/native/aptos_vm_uncoordinated.rs b/execution/executor-benchmark/src/native/aptos_vm_uncoordinated.rs new file mode 100644 index 0000000000000..6496db5da509e --- /dev/null +++ b/execution/executor-benchmark/src/native/aptos_vm_uncoordinated.rs @@ -0,0 +1,73 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::native_config::NATIVE_EXECUTOR_POOL; +use aptos_block_executor::{ + counters::BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK, txn_provider::default::DefaultTxnProvider, +}; +use aptos_types::{ + block_executor::{ + config::BlockExecutorConfigFromOnchain, + transaction_slice_metadata::TransactionSliceMetadata, + }, + state_store::StateView, + transaction::{ + signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, + TransactionOutput, + }, + vm_status::VMStatus, +}; +use aptos_vm::{AptosVM, VMBlockExecutor}; +use aptos_vm_environment::environment::AptosEnvironment; +use aptos_vm_logging::log_schema::AdapterLogSchema; +use aptos_vm_types::module_and_script_storage::AsAptosCodeStorage; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; + +pub struct AptosVMParallelUncoordinatedBlockExecutor; + +impl VMBlockExecutor for AptosVMParallelUncoordinatedBlockExecutor { + fn new() -> Self { + Self + } + + fn execute_block( + &self, + txn_provider: &DefaultTxnProvider, + state_view: &(impl StateView + Sync), + _onchain_config: BlockExecutorConfigFromOnchain, + _transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + let _timer = BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK.start_timer(); + + // let features = Features::fetch_config(&state_view).unwrap_or_default(); + + let env = AptosEnvironment::new(state_view); + let vm = AptosVM::new(env.clone(), state_view); + + let transaction_outputs = NATIVE_EXECUTOR_POOL.install(|| { + txn_provider + .get_txns() + .par_iter() + .enumerate() + .map(|(txn_idx, txn)| { + let log_context = AdapterLogSchema::new(state_view.id(), txn_idx); + let code_storage = state_view.as_aptos_code_storage(env.clone()); + + vm.execute_single_transaction( + txn, + &vm.as_move_resolver(state_view), + &code_storage, + &log_context, + ) + .map(|(_vm_status, 
vm_output)| { + vm_output + .try_materialize_into_transaction_output(state_view) + .unwrap() + }) + }) + .collect::, _>>() + })?; + + Ok(BlockOutput::new(transaction_outputs, None)) + } +} diff --git a/execution/executor-benchmark/src/native/mod.rs b/execution/executor-benchmark/src/native/mod.rs index affa99e856bfb..5ab07fbb4b1a3 100644 --- a/execution/executor-benchmark/src/native/mod.rs +++ b/execution/executor-benchmark/src/native/mod.rs @@ -1,4 +1,8 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +pub mod aptos_vm_uncoordinated; pub mod native_config; +pub mod native_transaction; +pub mod native_vm; +pub mod parallel_uncoordinated_block_executor; diff --git a/execution/executor-benchmark/src/native/native_transaction.rs b/execution/executor-benchmark/src/native/native_transaction.rs new file mode 100644 index 0000000000000..8c1ccbd9b89ed --- /dev/null +++ b/execution/executor-benchmark/src/native/native_transaction.rs @@ -0,0 +1,148 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::{ + account_address::AccountAddress, + transaction::signature_verified_transaction::SignatureVerifiedTransaction, +}; +use std::collections::HashMap; + +#[derive(Debug)] +pub enum NativeTransaction { + Nop { + sender: AccountAddress, + sequence_number: u64, + }, + FaTransfer { + sender: AccountAddress, + sequence_number: u64, + recipient: AccountAddress, + amount: u64, + }, + Transfer { + sender: AccountAddress, + sequence_number: u64, + recipient: AccountAddress, + amount: u64, + fail_on_recipient_account_existing: bool, + fail_on_recipient_account_missing: bool, + }, + BatchTransfer { + sender: AccountAddress, + sequence_number: u64, + recipients: Vec, + amounts: Vec, + fail_on_recipient_account_existing: bool, + fail_on_recipient_account_missing: bool, + }, +} + +impl NativeTransaction { + pub fn parse(txn: &SignatureVerifiedTransaction) -> Self { + match &txn.expect_valid() { + aptos_types::transaction::Transaction::UserTransaction(user_txn) => { + match user_txn.payload() { + aptos_types::transaction::TransactionPayload::EntryFunction(f) => { + match ( + *f.module().address(), + f.module().name().as_str(), + f.function().as_str(), + ) { + (AccountAddress::ONE, "aptos_account", "fungible_transfer_only") => { + Self::FaTransfer { + sender: user_txn.sender(), + sequence_number: user_txn.sequence_number(), + recipient: bcs::from_bytes(&f.args()[0]).unwrap(), + amount: bcs::from_bytes(&f.args()[1]).unwrap(), + } + }, + (AccountAddress::ONE, "coin", "transfer") => Self::Transfer { + sender: user_txn.sender(), + sequence_number: user_txn.sequence_number(), + recipient: bcs::from_bytes(&f.args()[0]).unwrap(), + amount: bcs::from_bytes(&f.args()[1]).unwrap(), + fail_on_recipient_account_existing: false, + fail_on_recipient_account_missing: true, + }, + (AccountAddress::ONE, "aptos_account", "transfer") => Self::Transfer { + sender: user_txn.sender(), + sequence_number: user_txn.sequence_number(), + recipient: bcs::from_bytes(&f.args()[0]).unwrap(), + amount: bcs::from_bytes(&f.args()[1]).unwrap(), + fail_on_recipient_account_existing: false, + fail_on_recipient_account_missing: false, + }, + (AccountAddress::ONE, "aptos_account", "create_account") => { + Self::Transfer { + sender: user_txn.sender(), + sequence_number: user_txn.sequence_number(), + recipient: bcs::from_bytes(&f.args()[0]).unwrap(), + amount: 0, + fail_on_recipient_account_existing: true, + fail_on_recipient_account_missing: false, + } + }, + (AccountAddress::ONE, 
"aptos_account", "batch_transfer") => { + Self::BatchTransfer { + sender: user_txn.sender(), + sequence_number: user_txn.sequence_number(), + recipients: bcs::from_bytes(&f.args()[0]).unwrap(), + amounts: bcs::from_bytes(&f.args()[1]).unwrap(), + fail_on_recipient_account_existing: false, + fail_on_recipient_account_missing: true, + } + }, + (_, "simple", "nop") => Self::Nop { + sender: user_txn.sender(), + sequence_number: user_txn.sequence_number(), + }, + (AccountAddress::ONE, "code", "publish_package_txn") => { + // Publishing doesn't do anything, either we know how to deal + // with later transactions or not. + Self::Nop { + sender: user_txn.sender(), + sequence_number: user_txn.sequence_number(), + } + }, + _ => unimplemented!( + "{} {}::{}", + *f.module().address(), + f.module().name().as_str(), + f.function().as_str() + ), + } + }, + _ => unimplemented!(), + } + }, + _ => unimplemented!(), + } + } +} + +pub fn compute_deltas_for_batch( + recipient_addresses: Vec, + transfer_amounts: Vec, + sender_address: AccountAddress, +) -> (HashMap, u64) { + let mut deltas = HashMap::new(); + for (recipient, amount) in recipient_addresses + .into_iter() + .zip(transfer_amounts.into_iter()) + { + let amount = amount as i64; + deltas + .entry(recipient) + .and_modify(|counter| *counter += amount) + .or_insert(amount); + deltas + .entry(sender_address) + .and_modify(|counter| *counter -= amount) + .or_insert(-amount); + } + + let amount_from_sender = -deltas.remove(&sender_address).unwrap_or(0); + assert!(amount_from_sender >= 0); + + (deltas, amount_from_sender as u64) +} diff --git a/execution/executor-benchmark/src/native/native_vm.rs b/execution/executor-benchmark/src/native/native_vm.rs new file mode 100644 index 0000000000000..0621aebfc8648 --- /dev/null +++ b/execution/executor-benchmark/src/native/native_vm.rs @@ -0,0 +1,939 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + db_access::DbAccessUtil, + native::{ + native_config::{NativeConfig, NATIVE_EXECUTOR_POOL}, + native_transaction::{compute_deltas_for_batch, NativeTransaction}, + }, +}; +use aptos_aggregator::{ + bounded_math::SignedU128, + delayed_change::{DelayedApplyChange, DelayedChange}, + delta_change_set::{DeltaOp, DeltaWithMax}, + delta_math::DeltaHistory, +}; +use aptos_block_executor::{ + code_cache_global_manager::AptosModuleCacheManager, + task::{ExecutionStatus, ExecutorTask}, + txn_commit_hook::NoOpTransactionCommitHook, + txn_provider::default::DefaultTxnProvider, +}; +use aptos_logger::error; +use aptos_mvhashmap::types::TxnIndex; +use aptos_types::{ + account_address::AccountAddress, + account_config::{ + primary_apt_store, AccountResource, CoinDeposit, CoinInfoResource, CoinRegister, + CoinStoreResource, CoinWithdraw, ConcurrentSupplyResource, DepositFAEvent, + FungibleStoreResource, WithdrawFAEvent, + }, + block_executor::{ + config::{BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig}, + transaction_slice_metadata::TransactionSliceMetadata, + }, + contract_event::ContractEvent, + fee_statement::FeeStatement, + move_utils::move_event_v2::MoveEventV2Type, + on_chain_config::FeatureFlag, + state_store::{state_key::StateKey, state_value::StateValueMetadata, StateView}, + transaction::{ + signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, Transaction, + TransactionAuxiliaryData, TransactionOutput, TransactionStatus, WriteSetPayload, + }, + write_set::WriteOp, + AptosCoinType, +}; +use aptos_vm::{ + block_executor::{AptosBlockExecutorWrapper, AptosTransactionOutput}, + VMBlockExecutor, +}; +use aptos_vm_environment::environment::AptosEnvironment; +use aptos_vm_types::{ + abstract_write_op::{ + AbstractResourceWriteOp, GroupWrite, ResourceGroupInPlaceDelayedFieldChangeOp, + }, + change_set::VMChangeSet, + module_write_set::ModuleWriteSet, + output::VMOutput, + resolver::{ExecutorView, ResourceGroupView}, + resource_group_adapter::group_size_as_sum, +}; +use bytes::Bytes; +use move_core_types::{ + language_storage::StructTag, + value::{IdentifierMappingKind, MoveStructLayout, MoveTypeLayout}, + vm_status::VMStatus, +}; +use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID; +use serde::{de::DeserializeOwned, Serialize}; +use std::{collections::BTreeMap, fmt::Debug, sync::Arc, u128}; + +pub struct NativeVMBlockExecutor; + +// Executor external API +impl VMBlockExecutor for NativeVMBlockExecutor { + fn new() -> Self { + Self + } + + /// Execute a block of `transactions`. The output vector will have the exact same length as the + /// input vector. The discarded transactions will be marked as `TransactionStatus::Discard` and + /// have an empty `WriteSet`. Also `state_view` is immutable, and does not have interior + /// mutability. Writes to be applied to the data view are encoded in the write set part of a + /// transaction output. 
+ fn execute_block( + &self, + txn_provider: &DefaultTxnProvider, + state_view: &(impl StateView + Sync), + onchain_config: BlockExecutorConfigFromOnchain, + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + AptosBlockExecutorWrapper::::execute_block_on_thread_pool::< + _, + NoOpTransactionCommitHook, + _, + >( + Arc::clone(&NATIVE_EXECUTOR_POOL), + txn_provider, + state_view, + &AptosModuleCacheManager::new(), + BlockExecutorConfig { + local: BlockExecutorLocalConfig::default_with_concurrency_level( + NativeConfig::get_concurrency_level(), + ), + onchain: onchain_config, + }, + transaction_slice_metadata, + None, + ) + } +} + +pub(crate) struct NativeVMExecutorTask { + fa_migration_complete: bool, + db_util: DbAccessUtil, +} + +impl ExecutorTask for NativeVMExecutorTask { + type Error = VMStatus; + type Output = AptosTransactionOutput; + type Txn = SignatureVerifiedTransaction; + + fn init(env: AptosEnvironment, _state_view: &impl StateView) -> Self { + let fa_migration_complete = env + .features() + .is_enabled(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); + let new_accounts_default_to_fa = env + .features() + .is_enabled(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); + assert_eq!( + fa_migration_complete, new_accounts_default_to_fa, + "native code only works with both flags either enabled or disabled" + ); + + Self { + fa_migration_complete, + db_util: DbAccessUtil::new(), + } + } + + // This function is called by the BlockExecutor for each transaction it intends + // to execute (via the ExecutorTask trait). It can be as a part of sequential + // execution, or speculatively as a part of a parallel execution. + fn execute_transaction( + &self, + executor_with_group_view: &(impl ExecutorView + ResourceGroupView), + txn: &SignatureVerifiedTransaction, + _txn_idx: TxnIndex, + ) -> ExecutionStatus { + let gas_units = 4; + + match self.execute_transaction_impl( + executor_with_group_view, + txn, + gas_units, + self.fa_migration_complete, + ) { + Ok(change_set) => ExecutionStatus::Success(AptosTransactionOutput::new(VMOutput::new( + change_set, + ModuleWriteSet::empty(), + FeeStatement::new(gas_units, gas_units, 0, 0, 0), + TransactionStatus::Keep(aptos_types::transaction::ExecutionStatus::Success), + TransactionAuxiliaryData::default(), + ))), + Err(_) => ExecutionStatus::SpeculativeExecutionAbortError("something".to_string()), + } + } + + fn is_transaction_dynamic_change_set_capable(txn: &Self::Txn) -> bool { + if txn.is_valid() { + if let Transaction::GenesisTransaction(WriteSetPayload::Direct(_)) = txn.expect_valid() + { + // WriteSetPayload::Direct cannot be handled in mode where delayed_field_optimization or + // resource_groups_split_in_change_set is enabled. 
+ return false; + } + } + true + } +} + +impl NativeVMExecutorTask { + fn execute_transaction_impl( + &self, + view: &(impl ExecutorView + ResourceGroupView), + txn: &SignatureVerifiedTransaction, + gas_units: u64, + fa_migration_complete: bool, + ) -> Result { + let gas = gas_units * 100; + + let mut resource_write_set = BTreeMap::new(); + let mut events = Vec::new(); + let mut delayed_field_change_set = BTreeMap::new(); + let aggregator_v1_write_set = BTreeMap::new(); + let mut aggregator_v1_delta_set = BTreeMap::new(); + + self.reduce_apt_supply( + fa_migration_complete, + gas, + view, + &mut resource_write_set, + &mut delayed_field_change_set, + &mut aggregator_v1_delta_set, + ) + .unwrap(); + + match NativeTransaction::parse(txn) { + NativeTransaction::Nop { + sender, + sequence_number, + } => { + self.check_and_set_sequence_number( + sender, + sequence_number, + view, + &mut resource_write_set, + )?; + self.withdraw_apt( + fa_migration_complete, + sender, + 0, + view, + gas, + &mut resource_write_set, + &mut events, + )?; + }, + NativeTransaction::FaTransfer { + sender, + sequence_number, + recipient, + amount, + } => { + self.check_and_set_sequence_number( + sender, + sequence_number, + view, + &mut resource_write_set, + )?; + self.withdraw_fa_apt_from_signer( + sender, + amount, + view, + gas, + &mut resource_write_set, + &mut events, + )?; + if amount > 0 { + self.deposit_fa_apt( + recipient, + amount, + view, + &mut resource_write_set, + &mut events, + )?; + } + }, + NativeTransaction::Transfer { + sender, + sequence_number, + recipient, + amount, + fail_on_recipient_account_existing: fail_on_account_existing, + fail_on_recipient_account_missing: fail_on_account_missing, + } => { + self.check_and_set_sequence_number( + sender, + sequence_number, + view, + &mut resource_write_set, + )?; + + self.withdraw_apt( + fa_migration_complete, + sender, + amount, + view, + gas, + &mut resource_write_set, + &mut events, + )?; + + let exists = self.deposit_apt( + fa_migration_complete, + recipient, + amount, + view, + &mut resource_write_set, + &mut events, + )?; + + if !exists || fail_on_account_existing { + self.check_or_create_account( + recipient, + fail_on_account_existing, + fail_on_account_missing, + view, + &mut resource_write_set, + )?; + } + }, + NativeTransaction::BatchTransfer { + sender, + sequence_number, + recipients, + amounts, + fail_on_recipient_account_existing, + fail_on_recipient_account_missing, + } => { + self.check_and_set_sequence_number( + sender, + sequence_number, + view, + &mut resource_write_set, + )?; + + let (deltas, amount_to_sender) = + compute_deltas_for_batch(recipients, amounts, sender); + + self.withdraw_apt( + fa_migration_complete, + sender, + amount_to_sender, + view, + gas, + &mut resource_write_set, + &mut events, + )?; + + for (recipient_address, transfer_amount) in deltas.into_iter() { + let existed = self.deposit_apt( + fa_migration_complete, + recipient_address, + transfer_amount as u64, + view, + &mut resource_write_set, + &mut events, + )?; + + if !existed || fail_on_recipient_account_existing { + self.check_or_create_account( + recipient_address, + fail_on_recipient_account_existing, + fail_on_recipient_account_missing, + view, + &mut resource_write_set, + )?; + } + } + }, + }; + + events.push(( + FeeStatement::new(gas_units, gas_units, 0, 0, 0).create_event_v2(), + None, + )); + + Ok(VMChangeSet::new( + resource_write_set, + events, + delayed_field_change_set, + aggregator_v1_write_set, + aggregator_v1_delta_set, + )) + } + + pub fn 
get_value( + state_key: &StateKey, + view: &(impl ExecutorView + ResourceGroupView), + ) -> Result, ()> { + view.get_resource_state_value(state_key, None) + .map_err(hide_error)? + .map(|value| { + bcs::from_bytes::(value.bytes()).map(|bytes| (bytes, value.into_metadata())) + }) + .transpose() + .map_err(hide_error) + } + + pub fn get_value_from_group( + group_key: &StateKey, + resource_tag: &StructTag, + view: &(impl ExecutorView + ResourceGroupView), + ) -> Result, ()> { + Self::get_value_from_group_with_layout(group_key, resource_tag, view, None) + } + + pub fn get_value_from_group_with_layout( + group_key: &StateKey, + resource_tag: &StructTag, + view: &(impl ExecutorView + ResourceGroupView), + maybe_layout: Option<&MoveTypeLayout>, + ) -> Result, ()> { + view.get_resource_from_group(group_key, resource_tag, maybe_layout) + .map_err(hide_error)? + .map(|value| bcs::from_bytes::(&value)) + .transpose() + .map_err(hide_error) + } + + fn check_and_set_sequence_number( + &self, + sender_address: AccountAddress, + sequence_number: u64, + view: &(impl ExecutorView + ResourceGroupView), + resource_write_set: &mut BTreeMap, + ) -> Result<(), ()> { + let sender_account_key = self.db_util.new_state_key_account(&sender_address); + + let value = Self::get_value::(&sender_account_key, view)?; + + match value { + Some((mut account, metadata)) => { + if sequence_number == account.sequence_number { + account.sequence_number += 1; + resource_write_set.insert( + sender_account_key, + AbstractResourceWriteOp::Write(WriteOp::modification( + Bytes::from(bcs::to_bytes(&account).map_err(hide_error)?), + metadata, + )), + ); + Ok(()) + } else { + error!( + "Invalid sequence number: txn: {} vs account: {}", + sequence_number, account.sequence_number + ); + Err(()) + } + }, + None => { + error!("Account doesn't exist"); + Err(()) + }, + } + } + + fn check_or_create_account( + &self, + address: AccountAddress, + fail_on_account_existing: bool, + fail_on_account_missing: bool, + view: &(impl ExecutorView + ResourceGroupView), + resource_write_set: &mut BTreeMap, + ) -> Result<(), ()> { + let account_key = self.db_util.new_state_key_account(&address); + + let value = Self::get_value::(&account_key, view)?; + match value { + Some(_) => { + if fail_on_account_existing { + return Err(()); + } + }, + None => { + if fail_on_account_missing { + return Err(()); + } else { + let account = DbAccessUtil::new_account_resource(address); + + resource_write_set.insert( + account_key, + AbstractResourceWriteOp::Write(WriteOp::legacy_creation(Bytes::from( + bcs::to_bytes(&account).map_err(hide_error)?, + ))), + ); + } + }, + } + + Ok(()) + } + + fn reduce_apt_supply( + &self, + fa_migration_complete: bool, + gas: u64, + view: &(impl ExecutorView + ResourceGroupView), + resource_write_set: &mut BTreeMap, + delayed_field_change_set: &mut BTreeMap>, + aggregator_v1_delta_set: &mut BTreeMap, + ) -> Result<(), ()> { + if fa_migration_complete { + self.reduce_fa_apt_supply(gas, view, resource_write_set, delayed_field_change_set) + } else { + self.reduce_coin_apt_supply(gas, view, aggregator_v1_delta_set) + } + } + + fn reduce_fa_apt_supply( + &self, + gas: u64, + view: &(impl ExecutorView + ResourceGroupView), + resource_write_set: &mut BTreeMap, + delayed_field_change_set: &mut BTreeMap>, + ) -> Result<(), ()> { + let apt_metadata_object_state_key = self + .db_util + .new_state_key_object_resource_group(&AccountAddress::TEN); + + let concurrent_supply_rg_tag = &self.db_util.common.concurrent_supply; + + let concurrent_supply_layout = 
MoveTypeLayout::Struct(MoveStructLayout::new(vec![ + MoveTypeLayout::Native( + IdentifierMappingKind::Aggregator, + Box::new(MoveTypeLayout::U128), + ), + MoveTypeLayout::U128, + ])); + + let supply = Self::get_value_from_group_with_layout::( + &apt_metadata_object_state_key, + concurrent_supply_rg_tag, + view, + Some(&concurrent_supply_layout), + )? + .unwrap(); + + let delayed_id = DelayedFieldID::from(*supply.current.get() as u64); + view.validate_delayed_field_id(&delayed_id).unwrap(); + delayed_field_change_set.insert( + delayed_id, + DelayedChange::Apply(DelayedApplyChange::AggregatorDelta { + delta: DeltaWithMax::new(SignedU128::Negative(gas as u128), u128::MAX), + }), + ); + let materialized_size = view + .get_resource_state_value_size(&apt_metadata_object_state_key) + .map_err(hide_error)? + .unwrap(); + let metadata = view + .get_resource_state_value_metadata(&apt_metadata_object_state_key) + .map_err(hide_error)? + .unwrap(); + resource_write_set.insert( + apt_metadata_object_state_key, + AbstractResourceWriteOp::ResourceGroupInPlaceDelayedFieldChange( + ResourceGroupInPlaceDelayedFieldChangeOp { + materialized_size, + metadata, + }, + ), + ); + Ok(()) + } + + fn reduce_coin_apt_supply( + &self, + gas: u64, + view: &(impl ExecutorView + ResourceGroupView), + aggregator_v1_delta_set: &mut BTreeMap, + ) -> Result<(), ()> { + let (sender_coin_store, _metadata) = Self::get_value::>( + &self.db_util.common.apt_coin_info_resource, + view, + )? + .ok_or(())?; + + let delta_op = DeltaOp::new(SignedU128::Negative(gas as u128), u128::MAX, DeltaHistory { + max_achieved_positive_delta: 0, + min_achieved_negative_delta: gas as u128, + min_overflow_positive_delta: None, + max_underflow_negative_delta: None, + }); + aggregator_v1_delta_set.insert(sender_coin_store.supply_aggregator_state_key(), delta_op); + Ok(()) + } + + fn withdraw_apt( + &self, + fa_migration_complete: bool, + sender: AccountAddress, + amount: u64, + view: &(impl ExecutorView + ResourceGroupView), + gas: u64, + resource_write_set: &mut BTreeMap, + events: &mut Vec<(ContractEvent, Option)>, + ) -> Result<(), ()> { + if fa_migration_complete { + self.withdraw_fa_apt_from_signer( + sender, + amount, + view, + gas, + resource_write_set, + events, + )?; + } else { + self.withdraw_coin_apt_from_signer( + sender, + amount, + view, + gas, + resource_write_set, + events, + )?; + } + Ok(()) + } + + fn withdraw_fa_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: u64, + view: &(impl ExecutorView + ResourceGroupView), + gas: u64, + resource_write_set: &mut BTreeMap, + events: &mut Vec<(ContractEvent, Option)>, + ) -> Result<(), ()> { + let sender_store_address = primary_apt_store(sender_address); + let sender_fa_store_object_key = self + .db_util + .new_state_key_object_resource_group(&sender_store_address); + let fungible_store_rg_tag = &self.db_util.common.fungible_store; + + match Self::get_value_from_group::( + &sender_fa_store_object_key, + fungible_store_rg_tag, + view, + )? 
{ + Some(mut fa_store) => { + if fa_store.balance >= transfer_amount + gas { + fa_store.balance -= transfer_amount + gas; + let fa_store_write = Self::create_single_resource_in_group_modification( + &fa_store, + &sender_fa_store_object_key, + fungible_store_rg_tag.clone(), + view, + )?; + resource_write_set.insert(sender_fa_store_object_key, fa_store_write); + + if transfer_amount > 0 { + events.push(( + WithdrawFAEvent { + store: sender_store_address, + amount: transfer_amount, + } + .create_event_v2(), + None, + )); + } + events.push(( + WithdrawFAEvent { + store: sender_store_address, + amount: gas, + } + .create_event_v2(), + None, + )); + Ok(()) + } else { + Err(()) + } + }, + None => Err(()), + } + } + + fn withdraw_coin_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: u64, + view: &(impl ExecutorView + ResourceGroupView), + gas: u64, + resource_write_set: &mut BTreeMap, + events: &mut Vec<(ContractEvent, Option)>, + ) -> Result<(), ()> { + let sender_coin_store_key = self.db_util.new_state_key_aptos_coin(&sender_address); + + let sender_coin_store_opt = + Self::get_value::>(&sender_coin_store_key, view)?; + + let (mut sender_coin_store, metadata) = match sender_coin_store_opt { + None => { + return self.withdraw_fa_apt_from_signer( + sender_address, + transfer_amount, + view, + gas, + resource_write_set, + events, + ) + }, + Some((sender_coin_store, metadata)) => (sender_coin_store, metadata), + }; + + sender_coin_store.set_coin(sender_coin_store.coin() - transfer_amount - gas); + + // first need to create events, to update the handle, and then serialize sender_coin_store + if transfer_amount > 0 { + events.push(( + CoinWithdraw { + coin_type: self.db_util.common.apt_coin_type_name.clone(), + account: sender_address, + amount: transfer_amount, + } + .create_event_v2(), + None, + )); + } + // coin doesn't emit WithdrawEvent for gas. + + resource_write_set.insert( + sender_coin_store_key, + AbstractResourceWriteOp::Write(WriteOp::modification( + Bytes::from(bcs::to_bytes(&sender_coin_store).map_err(hide_error)?), + metadata, + )), + ); + + Ok(()) + } + + /// Returns bool whether FungibleStore existed. + fn deposit_apt( + &self, + fa_migration_complete: bool, + recipient_address: AccountAddress, + transfer_amount: u64, + view: &(impl ExecutorView + ResourceGroupView), + resource_write_set: &mut BTreeMap, + events: &mut Vec<(ContractEvent, Option)>, + ) -> Result { + if fa_migration_complete { + self.deposit_fa_apt( + recipient_address, + transfer_amount, + view, + resource_write_set, + events, + ) + } else { + self.deposit_coin_apt( + recipient_address, + transfer_amount, + view, + resource_write_set, + events, + ) + } + } + + /// Returns bool whether FungibleStore existed. + fn deposit_fa_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + view: &(impl ExecutorView + ResourceGroupView), + resource_write_set: &mut BTreeMap, + events: &mut Vec<(ContractEvent, Option)>, + ) -> Result { + let recipient_store_address = primary_apt_store(recipient_address); + let recipient_fa_store_object_key = self + .db_util + .new_state_key_object_resource_group(&recipient_store_address); + let fungible_store_rg_tag = &self.db_util.common.fungible_store; + + let (mut fa_store, rest_to_create, existed) = + match Self::get_value_from_group::( + &recipient_fa_store_object_key, + fungible_store_rg_tag, + view, + )? 
{ + Some(fa_store) => (fa_store, None, true), + None => ( + FungibleStoreResource::new(AccountAddress::TEN, 0, false), + Some(BTreeMap::from([( + self.db_util.common.object_core.clone(), + bcs::to_bytes(&DbAccessUtil::new_object_core( + recipient_store_address, + recipient_address, + )) + .map_err(hide_error)?, + )])), + false, + ), + }; + + fa_store.balance += transfer_amount; + + let fa_store_write = if existed { + Self::create_single_resource_in_group_modification( + &fa_store, + &recipient_fa_store_object_key, + fungible_store_rg_tag.clone(), + view, + )? + } else { + let mut rg = rest_to_create.unwrap(); + rg.insert( + fungible_store_rg_tag.clone(), + bcs::to_bytes(&fa_store).map_err(hide_error)?, + ); + Self::create_resource_in_group_creation(&recipient_fa_store_object_key, rg, view)? + }; + resource_write_set.insert(recipient_fa_store_object_key, fa_store_write); + + if transfer_amount > 0 { + let event = DepositFAEvent { + store: recipient_store_address, + amount: transfer_amount, + }; + events.push((event.create_event_v2(), None)); + } + Ok(existed) + } + + fn deposit_coin_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + view: &(impl ExecutorView + ResourceGroupView), + resource_write_set: &mut BTreeMap, + events: &mut Vec<(ContractEvent, Option)>, + ) -> Result { + let recipient_coin_store_key = self.db_util.new_state_key_aptos_coin(&recipient_address); + let (mut recipient_coin_store, recipient_coin_store_metadata, existed) = + match Self::get_value::>( + &recipient_coin_store_key, + view, + )? { + Some((recipient_coin_store, metadata)) => { + (recipient_coin_store, Some(metadata), true) + }, + None => { + events.push(( + CoinRegister { + account: AccountAddress::ONE, + type_info: DbAccessUtil::new_type_info_resource::() + .map_err(hide_error)?, + } + .create_event_v2(), + None, + )); + ( + DbAccessUtil::new_apt_coin_store(0, recipient_address), + None, + false, + ) + }, + }; + + recipient_coin_store.set_coin(recipient_coin_store.coin() + transfer_amount); + + // first need to create events, to update the handle, and then serialize sender_coin_store + if transfer_amount > 0 { + events.push(( + CoinDeposit { + coin_type: self.db_util.common.apt_coin_type_name.clone(), + account: recipient_address, + amount: transfer_amount, + } + .create_event_v2(), + None, + )) + } + let write_op = if existed { + WriteOp::modification( + Bytes::from(bcs::to_bytes(&recipient_coin_store).map_err(hide_error)?), + recipient_coin_store_metadata.unwrap(), + ) + } else { + WriteOp::legacy_creation(Bytes::from( + bcs::to_bytes(&recipient_coin_store).map_err(hide_error)?, + )) + }; + resource_write_set.insert( + recipient_coin_store_key, + AbstractResourceWriteOp::Write(write_op), + ); + + Ok(existed) + } + + fn create_single_resource_in_group_modification( + value: &T, + group_key: &StateKey, + resource_tag: StructTag, + view: &(impl ExecutorView + ResourceGroupView), + ) -> Result { + let metadata = view + .get_resource_state_value_metadata(group_key) + .map_err(hide_error)? 
+ .unwrap(); + let size = view.resource_group_size(group_key).map_err(hide_error)?; + let value_bytes = Bytes::from(bcs::to_bytes(value).map_err(hide_error)?); + let group_write = AbstractResourceWriteOp::WriteResourceGroup(GroupWrite::new( + WriteOp::modification(Bytes::new(), metadata), + BTreeMap::from([( + resource_tag, + (WriteOp::legacy_modification(value_bytes), None), + )]), + size, + size.get(), + )); + Ok(group_write) + } + + fn create_resource_in_group_creation( + group_key: &StateKey, + resources: BTreeMap>, + view: &(impl ExecutorView + ResourceGroupView), + ) -> Result { + let size = view.resource_group_size(group_key).map_err(hide_error)?; + assert_eq!(size.get(), 0); + let inner_ops = resources + .into_iter() + .map(|(resource_tag, value)| -> Result<_, ()> { + Ok(( + resource_tag, + (WriteOp::legacy_creation(Bytes::from(value)), None), + )) + }) + .collect::, ()>>()?; + + let new_size = group_size_as_sum( + inner_ops + .iter() + .map(|(resource_tag, (value, _layout))| (resource_tag, value.bytes_size())), + ) + .map_err(hide_error)?; + + let group_write = AbstractResourceWriteOp::WriteResourceGroup(GroupWrite::new( + WriteOp::legacy_creation(Bytes::new()), + inner_ops, + new_size, + size.get(), + )); + Ok(group_write) + } +} + +fn hide_error(e: E) { + error!("encountered error {:?}, hiding", e); +} diff --git a/execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs b/execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs new file mode 100644 index 0000000000000..3b364c645424a --- /dev/null +++ b/execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs @@ -0,0 +1,1241 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::native_transaction::compute_deltas_for_batch; +use crate::{ + db_access::DbAccessUtil, + metrics::TIMER, + native::{native_config::NATIVE_EXECUTOR_POOL, native_transaction::NativeTransaction}, +}; +use anyhow::{bail, Result}; +use aptos_block_executor::{ + counters::BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK, txn_provider::default::DefaultTxnProvider, +}; +use aptos_types::{ + account_address::AccountAddress, + account_config::{ + primary_apt_store, AccountResource, CoinDeposit, CoinInfoResource, CoinRegister, + CoinStoreResource, CoinWithdraw, ConcurrentSupplyResource, DepositFAEvent, + FungibleStoreResource, WithdrawFAEvent, + }, + block_executor::{ + config::BlockExecutorConfigFromOnchain, + transaction_slice_metadata::TransactionSliceMetadata, + }, + contract_event::ContractEvent, + fee_statement::FeeStatement, + move_utils::move_event_v2::MoveEventV2Type, + on_chain_config::{FeatureFlag, Features, OnChainConfig}, + state_store::{state_key::StateKey, StateView}, + transaction::{ + signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, ExecutionStatus, + TransactionAuxiliaryData, TransactionOutput, TransactionStatus, + }, + vm_status::{StatusCode, VMStatus}, + write_set::{WriteOp, WriteSetMut}, + AptosCoinType, +}; +use aptos_vm::VMBlockExecutor; +use dashmap::{ + mapref::one::{Ref, RefMut}, + DashMap, +}; +use once_cell::sync::OnceCell; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use std::{ + cell::Cell, + collections::BTreeMap, + hash::RandomState, + sync::atomic::{AtomicU64, Ordering}, + u64, +}; +use thread_local::ThreadLocal; + +/// Executes transactions fully, and produces TransactionOutput (with final WriteSet) +/// (unlike execution within BlockSTM that produces non-materialized 
VMChangeSet) +pub trait RawTransactionExecutor: Sync { + type BlockState: Sync; + + fn new() -> Self; + + fn init_block_state(&self, state_view: &(impl StateView + Sync)) -> Self::BlockState; + + fn execute_transaction( + &self, + txn: NativeTransaction, + state_view: &(impl StateView + Sync), + block_state: &Self::BlockState, + ) -> Result; +} + +pub struct NativeParallelUncoordinatedBlockExecutor { + executor: E, +} + +impl VMBlockExecutor + for NativeParallelUncoordinatedBlockExecutor +{ + fn new() -> Self { + Self { executor: E::new() } + } + + fn execute_block( + &self, + txn_provider: &DefaultTxnProvider, + state_view: &(impl StateView + Sync), + _onchain_config: BlockExecutorConfigFromOnchain, + _transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + let native_transactions = NATIVE_EXECUTOR_POOL.install(|| { + txn_provider + .get_txns() + .par_iter() + .map(NativeTransaction::parse) + .collect::>() + }); + + let _timer = BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK.start_timer(); + + let state = self.executor.init_block_state(state_view); + + let transaction_outputs = NATIVE_EXECUTOR_POOL + .install(|| { + native_transactions + .into_par_iter() + .map(|txn| self.executor.execute_transaction(txn, state_view, &state)) + .collect::>>() + }) + .map_err(|e| { + VMStatus::error( + StatusCode::DELAYED_FIELD_OR_BLOCKSTM_CODE_INVARIANT_ERROR, + Some(format!("{:?}", e).to_string()), + ) + })?; + + Ok(BlockOutput::new(transaction_outputs, None)) + } +} + +pub struct IncrementalOutput { + write_set: Vec<(StateKey, WriteOp)>, + events: Vec, +} + +impl IncrementalOutput { + fn new() -> Self { + IncrementalOutput { + write_set: vec![], + events: vec![], + } + } + + fn into_success_output(mut self, gas: u64) -> Result { + self.events + .push(FeeStatement::new(gas, gas, 0, 0, 0).create_event_v2()); + + Ok(TransactionOutput::new( + WriteSetMut::new(self.write_set).freeze()?, + self.events, + /*gas_used=*/ gas, + TransactionStatus::Keep(ExecutionStatus::Success), + TransactionAuxiliaryData::default(), + )) + } +} + +pub trait CommonNativeRawTransactionExecutor: Sync + Send { + fn new_impl() -> Self; + + fn update_sequence_number( + &self, + sender_address: AccountAddress, + sequence_number: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()>; + + fn reduce_apt_supply( + &self, + fa_migration_complete: bool, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + if fa_migration_complete { + self.reduce_fa_apt_supply(gas, state_view, output) + } else { + self.reduce_coin_apt_supply(gas, state_view, output) + } + } + + fn reduce_fa_apt_supply( + &self, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()>; + + fn reduce_coin_apt_supply( + &self, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()>; + + fn withdraw_apt_from_signer( + &self, + fa_migration_complete: bool, + sender_address: AccountAddress, + transfer_amount: u64, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + if fa_migration_complete { + self.withdraw_fa_apt_from_signer( + sender_address, + transfer_amount, + gas, + state_view, + output, + ) + } else { + self.withdraw_coin_apt_from_signer( + sender_address, + transfer_amount, + gas, + state_view, + output, + ) + } + } + + fn withdraw_fa_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: 
u64, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()>; + + fn withdraw_coin_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: u64, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()>; + + fn deposit_apt( + &self, + fa_migration_complete: bool, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result { + if fa_migration_complete { + self.deposit_fa_apt(recipient_address, transfer_amount, state_view, output) + } else { + self.deposit_coin_apt(recipient_address, transfer_amount, state_view, output) + } + } + + fn deposit_fa_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result; + + fn deposit_coin_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result; + + fn check_or_create_account( + &self, + address: AccountAddress, + fail_on_account_existing: bool, + fail_on_account_missing: bool, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()>; +} + +impl RawTransactionExecutor for T { + type BlockState = bool; + + fn new() -> Self { + Self::new_impl() + } + + fn init_block_state(&self, state_view: &(impl StateView + Sync)) -> bool { + let features = Features::fetch_config(&state_view).unwrap_or_default(); + let fa_migration_complete = + features.is_enabled(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); + let new_accounts_default_to_fa = + features.is_enabled(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); + assert_eq!( + fa_migration_complete, new_accounts_default_to_fa, + "native code only works with both flags either enabled or disabled" + ); + + fa_migration_complete + } + + fn execute_transaction( + &self, + txn: NativeTransaction, + state_view: &(impl StateView + Sync), + block_state: &bool, + ) -> Result { + let fa_migration_complete = *block_state; + + let gas_unit = 4; // hardcode gas consumed. 
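+        // A flat fee of gas_unit * 100 is withdrawn from the sender along with the transfer
+        // amount, and the same amount is deducted from the total APT supply at the end.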
+ let gas = gas_unit * 100; + + let mut output = IncrementalOutput::new(); + + match txn { + NativeTransaction::Nop { + sender, + sequence_number, + } => { + self.update_sequence_number(sender, sequence_number, state_view, &mut output)?; + + self.withdraw_apt_from_signer( + fa_migration_complete, + sender, + 0, + gas, + state_view, + &mut output, + )?; + }, + NativeTransaction::FaTransfer { + sender, + sequence_number, + recipient, + amount, + } => { + self.update_sequence_number(sender, sequence_number, state_view, &mut output)?; + + self.withdraw_fa_apt_from_signer(sender, amount, gas, state_view, &mut output)?; + + let _existed = self.deposit_fa_apt(recipient, amount, state_view, &mut output)?; + }, + NativeTransaction::Transfer { + sender, + sequence_number, + recipient, + amount, + fail_on_recipient_account_existing, + fail_on_recipient_account_missing, + } => { + self.update_sequence_number(sender, sequence_number, state_view, &mut output)?; + self.withdraw_apt_from_signer( + fa_migration_complete, + sender, + amount, + gas, + state_view, + &mut output, + )?; + + let existed = self.deposit_apt( + fa_migration_complete, + recipient, + amount, + state_view, + &mut output, + )?; + + if !existed || fail_on_recipient_account_existing { + self.check_or_create_account( + recipient, + fail_on_recipient_account_existing, + fail_on_recipient_account_missing, + state_view, + &mut output, + )?; + } + }, + NativeTransaction::BatchTransfer { + sender, + sequence_number, + recipients, + amounts, + fail_on_recipient_account_existing, + fail_on_recipient_account_missing, + } => { + self.update_sequence_number(sender, sequence_number, state_view, &mut output)?; + + let (deltas, amount_to_sender) = + compute_deltas_for_batch(recipients, amounts, sender); + + self.withdraw_apt_from_signer( + fa_migration_complete, + sender, + amount_to_sender, + gas, + state_view, + &mut output, + )?; + + for (recipient_address, transfer_amount) in deltas.into_iter() { + let existed = self.deposit_apt( + fa_migration_complete, + recipient_address, + transfer_amount as u64, + state_view, + &mut output, + )?; + + if !existed || fail_on_recipient_account_existing { + self.check_or_create_account( + recipient_address, + fail_on_recipient_account_existing, + fail_on_recipient_account_missing, + state_view, + &mut output, + )?; + } + } + }, + }; + + self.reduce_apt_supply(fa_migration_complete, gas, state_view, &mut output)?; + + output.into_success_output(gas) + } +} + +pub struct NativeRawTransactionExecutor { + db_util: DbAccessUtil, +} + +impl CommonNativeRawTransactionExecutor for NativeRawTransactionExecutor { + fn new_impl() -> Self { + Self { + db_util: DbAccessUtil::new(), + } + } + + fn update_sequence_number( + &self, + sender_address: AccountAddress, + sequence_number: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + let sender_account_key = self.db_util.new_state_key_account(&sender_address); + let mut sender_account = + DbAccessUtil::get_account(&sender_account_key, state_view)?.unwrap(); + + sender_account.sequence_number = sequence_number + 1; + + output.write_set.push(( + sender_account_key, + WriteOp::legacy_modification(bcs::to_bytes(&sender_account)?.into()), + )); + Ok(()) + } + + fn reduce_fa_apt_supply( + &self, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + let apt_metadata_object_state_key = self + .db_util + .new_state_key_object_resource_group(&AccountAddress::TEN); + + let 
concurrent_supply_rg_tag = &self.db_util.common.concurrent_supply; + + let mut apt_metadata_object = + DbAccessUtil::get_resource_group(&apt_metadata_object_state_key, state_view)?.unwrap(); + let mut supply = bcs::from_bytes::( + &apt_metadata_object + .remove(concurrent_supply_rg_tag) + .unwrap(), + )?; + + supply.current.set(supply.current.get() - gas as u128); + + apt_metadata_object.insert(concurrent_supply_rg_tag.clone(), bcs::to_bytes(&supply)?); + + output.write_set.push(( + apt_metadata_object_state_key, + WriteOp::legacy_modification(bcs::to_bytes(&apt_metadata_object)?.into()), + )); + + Ok(()) + } + + fn reduce_coin_apt_supply( + &self, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + let coin_info = DbAccessUtil::get_value::>( + &self.db_util.common.apt_coin_info_resource, + state_view, + )? + .ok_or_else(|| anyhow::anyhow!("no coin info"))?; + + let total_supply_state_key = coin_info.supply_aggregator_state_key(); + let total_supply = DbAccessUtil::get_value::(&total_supply_state_key, state_view)? + .ok_or_else(|| anyhow::anyhow!("no total supply"))?; + + output.write_set.push(( + total_supply_state_key, + WriteOp::legacy_modification(bcs::to_bytes(&(total_supply - gas as u128))?.into()), + )); + + Ok(()) + } + + fn withdraw_fa_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: u64, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + let sender_store_address = primary_apt_store(sender_address); + + let sender_fa_store_object_key = self + .db_util + .new_state_key_object_resource_group(&sender_store_address); + let mut sender_fa_store_object = { + let _timer = TIMER + .with_label_values(&["read_sender_fa_store"]) + .start_timer(); + match DbAccessUtil::get_resource_group(&sender_fa_store_object_key, state_view)? { + Some(sender_fa_store_object) => sender_fa_store_object, + None => bail!("sender fa store missing"), + } + }; + + let fungible_store_rg_tag = &self.db_util.common.fungible_store; + let mut sender_fa_store = bcs::from_bytes::( + &sender_fa_store_object + .remove(fungible_store_rg_tag) + .unwrap(), + )?; + + sender_fa_store.balance -= transfer_amount + gas; + + sender_fa_store_object.insert( + fungible_store_rg_tag.clone(), + bcs::to_bytes(&sender_fa_store)?, + ); + + output.write_set.push(( + sender_fa_store_object_key, + WriteOp::legacy_modification(bcs::to_bytes(&sender_fa_store_object)?.into()), + )); + + if transfer_amount > 0 { + output.events.push( + WithdrawFAEvent { + store: sender_store_address, + amount: transfer_amount, + } + .create_event_v2(), + ); + } + + output.events.push( + WithdrawFAEvent { + store: sender_store_address, + amount: gas, + } + .create_event_v2(), + ); + Ok(()) + } + + fn withdraw_coin_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: u64, + gas: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + let sender_coin_store_key = self.db_util.new_state_key_aptos_coin(&sender_address); + let sender_coin_store_opt = { + let _timer = TIMER + .with_label_values(&["read_sender_coin_store"]) + .start_timer(); + DbAccessUtil::get_apt_coin_store(&sender_coin_store_key, state_view)? 
+ }; + let mut sender_coin_store = match sender_coin_store_opt { + None => { + return self.withdraw_fa_apt_from_signer( + sender_address, + transfer_amount, + gas, + state_view, + output, + ) + }, + Some(sender_coin_store) => sender_coin_store, + }; + + sender_coin_store.set_coin(sender_coin_store.coin() - transfer_amount - gas); + + if transfer_amount != 0 { + output.events.push( + CoinWithdraw { + coin_type: self.db_util.common.apt_coin_type_name.clone(), + account: sender_address, + amount: transfer_amount, + } + .create_event_v2(), + ); + // Coin doesn't emit Withdraw event for gas + } + + output.write_set.push(( + sender_coin_store_key, + WriteOp::legacy_modification(bcs::to_bytes(&sender_coin_store)?.into()), + )); + + Ok(()) + } + + fn deposit_fa_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result { + let recipient_store_address = primary_apt_store(recipient_address); + let recipient_fa_store_object_key = self + .db_util + .new_state_key_object_resource_group(&recipient_store_address); + let fungible_store_rg_tag = &self.db_util.common.fungible_store; + + let (mut recipient_fa_store, mut recipient_fa_store_object, recipient_fa_store_existed) = + match DbAccessUtil::get_resource_group(&recipient_fa_store_object_key, state_view)? { + Some(mut recipient_fa_store_object) => { + let recipient_fa_store = bcs::from_bytes::( + &recipient_fa_store_object + .remove(fungible_store_rg_tag) + .unwrap(), + )?; + (recipient_fa_store, recipient_fa_store_object, true) + }, + None => { + let receipeint_fa_store = + FungibleStoreResource::new(AccountAddress::TEN, 0, false); + let receipeint_fa_store_object = BTreeMap::from([( + self.db_util.common.object_core.clone(), + bcs::to_bytes(&DbAccessUtil::new_object_core( + recipient_store_address, + recipient_address, + ))?, + )]); + (receipeint_fa_store, receipeint_fa_store_object, false) + }, + }; + + recipient_fa_store.balance += transfer_amount; + + recipient_fa_store_object.insert( + fungible_store_rg_tag.clone(), + bcs::to_bytes(&recipient_fa_store)?, + ); + + output.write_set.push(( + recipient_fa_store_object_key, + if recipient_fa_store_existed { + WriteOp::legacy_modification(bcs::to_bytes(&recipient_fa_store_object)?.into()) + } else { + WriteOp::legacy_creation(bcs::to_bytes(&recipient_fa_store_object)?.into()) + }, + )); + + if transfer_amount != 0 { + output.events.push( + DepositFAEvent { + store: recipient_store_address, + amount: transfer_amount, + } + .create_event_v2(), + ) + } + + Ok(recipient_fa_store_existed) + } + + fn deposit_coin_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result { + let recipient_coin_store_key = self.db_util.new_state_key_aptos_coin(&recipient_address); + + let (mut recipient_coin_store, recipient_coin_store_existed) = + match DbAccessUtil::get_apt_coin_store(&recipient_coin_store_key, state_view)? 
{ + Some(recipient_coin_store) => (recipient_coin_store, true), + None => { + output.events.push( + CoinRegister { + account: AccountAddress::ONE, + type_info: DbAccessUtil::new_type_info_resource::()?, + } + .create_event_v2(), + ); + ( + DbAccessUtil::new_apt_coin_store(0, recipient_address), + false, + ) + }, + }; + + recipient_coin_store.set_coin(recipient_coin_store.coin() + transfer_amount); + + // first need to create events, to update the handle, and then serialize sender_coin_store + if transfer_amount != 0 { + output.events.push( + CoinDeposit { + coin_type: self.db_util.common.apt_coin_type_name.clone(), + account: recipient_address, + amount: transfer_amount, + } + .create_event_v2(), + ); + } + + output.write_set.push(( + recipient_coin_store_key, + if recipient_coin_store_existed { + WriteOp::legacy_modification(bcs::to_bytes(&recipient_coin_store)?.into()) + } else { + WriteOp::legacy_creation(bcs::to_bytes(&recipient_coin_store)?.into()) + }, + )); + + Ok(recipient_coin_store_existed) + } + + fn check_or_create_account( + &self, + address: AccountAddress, + fail_on_account_existing: bool, + fail_on_account_missing: bool, + state_view: &(impl StateView + Sync), + output: &mut IncrementalOutput, + ) -> Result<()> { + let account_key = self.db_util.new_state_key_account(&address); + match DbAccessUtil::get_account(&account_key, state_view)? { + Some(_) => { + if fail_on_account_existing { + bail!("account exists"); + } + }, + None => { + if fail_on_account_missing { + bail!("account missing") + } else { + let account = DbAccessUtil::new_account_resource(address); + output.write_set.push(( + account_key, + WriteOp::legacy_creation(bcs::to_bytes(&account)?.into()), + )); + } + }, + } + + Ok(()) + } +} + +const USE_THREAD_LOCAL_SUPPLY: bool = true; +struct CoinSupply { + pub total_supply: u128, +} + +struct SupplyWithDecrement { + #[allow(dead_code)] + pub base: u128, + pub decrement: ThreadLocal>, +} + +enum CachedResource { + Account(AccountResource), + FungibleStore(FungibleStoreResource), + FungibleSupply(ConcurrentSupplyResource), + AptCoinStore(CoinStoreResource), + AptCoinInfo(CoinInfoResource), + AptCoinSupply(CoinSupply), + SupplyDecrement(SupplyWithDecrement), +} + +pub struct NativeValueCacheRawTransactionExecutor { + db_util: DbAccessUtil, + cache: DashMap, + coin_supply_state_key: OnceCell, +} + +impl CommonNativeRawTransactionExecutor for NativeValueCacheRawTransactionExecutor { + fn new_impl() -> Self { + Self { + db_util: DbAccessUtil::new(), + cache: DashMap::new(), + coin_supply_state_key: OnceCell::new(), + } + } + + fn update_sequence_number( + &self, + sender_address: AccountAddress, + sequence_number: u64, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result<()> { + let sender_account_key = self.db_util.new_state_key_account(&sender_address); + + match self + .cache_get_mut_or_init(&sender_account_key, |key| { + CachedResource::Account( + DbAccessUtil::get_account(key, state_view).unwrap().unwrap(), + ) + }) + .value_mut() + { + CachedResource::Account(account) => { + account.sequence_number = sequence_number + 1; + }, + _ => { + panic!("wrong type") + }, + }; + Ok(()) + } + + fn check_or_create_account( + &self, + address: AccountAddress, + fail_on_account_existing: bool, + fail_on_account_missing: bool, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result<()> { + let account_key = self.db_util.new_state_key_account(&address); + let mut missing = false; + 
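+        // Warm the cache entry from storage (creating a fresh AccountResource if absent) and
+        // record whether the account was missing, so the existence expectations are asserted below.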
self.cache_get_mut_or_init(&account_key, |key| { + CachedResource::Account(match DbAccessUtil::get_account(key, state_view).unwrap() { + Some(account) => account, + None => { + missing = true; + assert!(!fail_on_account_missing); + DbAccessUtil::new_account_resource(address) + }, + }) + }); + if fail_on_account_existing { + assert!(missing); + } + Ok(()) + } + + fn reduce_fa_apt_supply( + &self, + gas: u64, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result<()> { + let cache_key = + StateKey::resource(&AccountAddress::TEN, &self.db_util.common.concurrent_supply) + .unwrap(); + + if USE_THREAD_LOCAL_SUPPLY { + let entry = self.cache_get_or_init(&cache_key, |_key| { + let concurrent_supply = self.fetch_concurrent_supply(state_view); + CachedResource::SupplyDecrement(SupplyWithDecrement { + base: *concurrent_supply.current.get(), + decrement: ThreadLocal::new(), + }) + }); + match entry.value() { + CachedResource::SupplyDecrement(SupplyWithDecrement { decrement, .. }) => { + let decrement_cell = decrement.get_or_default(); + decrement_cell.set(decrement_cell.get() + gas as u128); + }, + _ => panic!("wrong type"), + } + } else { + let mut entry = self.cache_get_mut_or_init(&cache_key, |_key| { + let concurrent_supply = self.fetch_concurrent_supply(state_view); + CachedResource::FungibleSupply(concurrent_supply) + }); + match entry.value_mut() { + CachedResource::FungibleSupply(fungible_supply) => { + fungible_supply + .current + .set(fungible_supply.current.get() - gas as u128); + }, + _ => panic!("wrong type"), + }; + } + + Ok(()) + } + + fn reduce_coin_apt_supply( + &self, + gas: u64, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result<()> { + let total_supply_state_key = self.coin_supply_state_key.get_or_init(|| { + let entry = + self.cache_get_mut_or_init(&self.db_util.common.apt_coin_info_resource, |key| { + CachedResource::AptCoinInfo( + DbAccessUtil::get_value::>(key, state_view) + .unwrap() + .unwrap(), + ) + }); + + let total_supply_state_key = match entry.value() { + CachedResource::AptCoinInfo(coin_info) => coin_info.supply_aggregator_state_key(), + _ => panic!("wrong type"), + }; + total_supply_state_key + }); + + if USE_THREAD_LOCAL_SUPPLY { + let total_supply_entry = self.cache_get_or_init(total_supply_state_key, |key| { + CachedResource::SupplyDecrement(SupplyWithDecrement { + base: DbAccessUtil::get_value::(key, state_view) + .unwrap() + .unwrap(), + decrement: ThreadLocal::new(), + }) + }); + match total_supply_entry.value() { + CachedResource::SupplyDecrement(SupplyWithDecrement { decrement, .. 
}) => { + let decrement_cell = decrement.get_or_default(); + decrement_cell.set(decrement_cell.get() + gas as u128); + }, + _ => panic!("wrong type"), + } + } else { + let mut total_supply_entry = + self.cache_get_mut_or_init(total_supply_state_key, |key| { + CachedResource::AptCoinSupply(CoinSupply { + total_supply: DbAccessUtil::get_value::(key, state_view) + .unwrap() + .unwrap(), + }) + }); + + match total_supply_entry.value_mut() { + CachedResource::AptCoinSupply(coin_supply) => { + coin_supply.total_supply -= gas as u128; + }, + _ => panic!("wrong type"), + }; + } + + Ok(()) + } + + fn withdraw_fa_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: u64, + gas: u64, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result<()> { + let _existed = + self.update_fa_balance(sender_address, state_view, 0, transfer_amount + gas, true); + Ok(()) + } + + fn withdraw_coin_apt_from_signer( + &self, + sender_address: AccountAddress, + transfer_amount: u64, + gas: u64, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result<()> { + let _existed = + self.update_coin_balance(sender_address, state_view, 0, transfer_amount + gas, true); + Ok(()) + } + + fn deposit_fa_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result { + let existed = + self.update_fa_balance(recipient_address, state_view, transfer_amount, 0, false); + Ok(existed) + } + + fn deposit_coin_apt( + &self, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &(impl StateView + Sync), + _output: &mut IncrementalOutput, + ) -> Result { + let existed = + self.update_coin_balance(recipient_address, state_view, transfer_amount, 0, false); + Ok(existed) + } +} + +impl NativeValueCacheRawTransactionExecutor { + fn cache_get_or_init<'a>( + &'a self, + key: &StateKey, + init_value: impl FnOnce(&StateKey) -> CachedResource, + ) -> Ref<'a, StateKey, CachedResource, RandomState> { + // Data in cache is going to be the hot path, so short-circuit here to avoid cloning the key. + if let Some(ref_mut) = self.cache.get(key) { + return ref_mut; + } + + self.cache + .entry(key.clone()) + .or_insert(init_value(key)) + .downgrade() + } + + fn cache_get_mut_or_init<'a>( + &'a self, + key: &StateKey, + init_value: impl FnOnce(&StateKey) -> CachedResource, + ) -> RefMut<'a, StateKey, CachedResource, RandomState> { + // Data in cache is going to be the hot path, so short-circuit here to avoid cloning the key. 
+ if let Some(ref_mut) = self.cache.get_mut(key) { + return ref_mut; + } + + self.cache.entry(key.clone()).or_insert(init_value(key)) + } + + fn fetch_concurrent_supply( + &self, + state_view: &(impl StateView + Sync), + ) -> ConcurrentSupplyResource { + let concurrent_supply_rg_tag = &self.db_util.common.concurrent_supply; + + let apt_metadata_object_state_key = self + .db_util + .new_state_key_object_resource_group(&AccountAddress::TEN); + + let mut apt_metadata_object = + DbAccessUtil::get_resource_group(&apt_metadata_object_state_key, state_view) + .unwrap() + .unwrap(); + + bcs::from_bytes::( + &apt_metadata_object + .remove(concurrent_supply_rg_tag) + .unwrap(), + ) + .unwrap() + } + + fn update_fa_balance( + &self, + account: AccountAddress, + state_view: &(impl StateView + Sync), + increment: u64, + decrement: u64, + fail_on_missing: bool, + ) -> bool { + let store_address = primary_apt_store(account); + let fungible_store_rg_tag = &self.db_util.common.fungible_store; + let cache_key = StateKey::resource(&store_address, fungible_store_rg_tag).unwrap(); + + let mut exists = true; + let mut entry = self.cache.entry(cache_key).or_insert_with(|| { + let fa_store_object_key = self + .db_util + .new_state_key_object_resource_group(&store_address); + let rg_opt = + DbAccessUtil::get_resource_group(&fa_store_object_key, state_view).unwrap(); + CachedResource::FungibleStore(match rg_opt { + Some(mut rg) => { + bcs::from_bytes(&rg.remove(fungible_store_rg_tag).unwrap()).unwrap() + }, + None => { + exists = false; + assert!(!fail_on_missing); + FungibleStoreResource::new(AccountAddress::TEN, 0, false) + }, + }) + }); + match entry.value_mut() { + CachedResource::FungibleStore(fungible_store_resource) => { + fungible_store_resource.balance += increment; + fungible_store_resource.balance -= decrement; + }, + _ => panic!("wrong type"), + }; + exists + } + + fn update_coin_balance( + &self, + account: AccountAddress, + state_view: &(impl StateView + Sync), + increment: u64, + decrement: u64, + fail_on_missing: bool, + ) -> bool { + let coin_store_key = self.db_util.new_state_key_aptos_coin(&account); + let mut exists = true; + + let mut entry = self.cache_get_mut_or_init(&coin_store_key, |key| { + CachedResource::AptCoinStore( + match DbAccessUtil::get_apt_coin_store(key, state_view).unwrap() { + Some(store) => store, + None => { + exists = false; + assert!(!fail_on_missing); + DbAccessUtil::new_apt_coin_store(0, account) + }, + }, + ) + }); + + match entry.value_mut() { + CachedResource::AptCoinStore(coin_store) => { + coin_store.set_coin(coin_store.coin() + increment - decrement); + }, + _ => panic!("wrong type"), + }; + + exists + } +} + +pub struct NativeNoStorageRawTransactionExecutor { + seq_nums: DashMap, + balances: DashMap, + total_supply_decrement: ThreadLocal>, + total_supply: AtomicU64, +} + +impl RawTransactionExecutor for NativeNoStorageRawTransactionExecutor { + type BlockState = (); + + fn new() -> Self { + Self { + seq_nums: DashMap::new(), + balances: DashMap::new(), + total_supply_decrement: ThreadLocal::new(), + total_supply: AtomicU64::new(u64::MAX), + } + } + + fn init_block_state(&self, _state_view: &(impl StateView + Sync)) {} + + fn execute_transaction( + &self, + txn: NativeTransaction, + _state_view: &(impl StateView + Sync), + _block_state: &(), + ) -> Result { + let gas_units = 4; + let gas = gas_units * 100; + + if USE_THREAD_LOCAL_SUPPLY { + let decrement_cell = self.total_supply_decrement.get_or_default(); + decrement_cell.set(decrement_cell.get() + gas as u128); + 
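+            // The burned gas is accumulated in a per-thread cell rather than a single shared
+            // counter, avoiding cross-thread contention during parallel execution.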
} else { + self.total_supply.fetch_sub(gas, Ordering::Relaxed); + } + + let output = IncrementalOutput::new(); + let (sender, sequence_number) = match txn { + NativeTransaction::Nop { + sender, + sequence_number, + } => { + *self + .balances + .entry(sender) + .or_insert(100_000_000_000_000_000) -= gas; + (sender, sequence_number) + }, + NativeTransaction::FaTransfer { + sender, + sequence_number, + recipient, + amount, + } + | NativeTransaction::Transfer { + sender, + sequence_number, + recipient, + amount, + .. + } => { + *self + .balances + .entry(sender) + .or_insert(100_000_000_000_000_000) -= amount + gas; + *self + .balances + .entry(recipient) + .or_insert(100_000_000_000_000_000) += amount; + (sender, sequence_number) + }, + NativeTransaction::BatchTransfer { + sender, + sequence_number, + recipients, + amounts, + .. + } => { + let (deltas, amount_from_sender) = + compute_deltas_for_batch(recipients, amounts, sender); + + *self + .balances + .entry(sender) + .or_insert(100_000_000_000_000_000) -= amount_from_sender; + + for (recipient, amount) in deltas.into_iter() { + *self + .balances + .entry(recipient) + .or_insert(100_000_000_000_000_000) += amount as u64; + } + (sender, sequence_number) + }, + }; + + self.seq_nums.insert(sender, sequence_number); + output.into_success_output(gas) + } +} diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_executor.rs deleted file mode 100644 index 69d4eba89ddca..0000000000000 --- a/execution/executor-benchmark/src/native_executor.rs +++ /dev/null @@ -1,457 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - db_access::{Account, CoinStore, DbAccessUtil}, - metrics::TIMER, -}; -use anyhow::Result; -use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; -use aptos_types::{ - account_address::AccountAddress, - account_config::{DepositEvent, WithdrawEvent}, - block_executor::{ - config::BlockExecutorConfigFromOnchain, partitioner::PartitionedTransactions, - transaction_slice_metadata::TransactionSliceMetadata, - }, - contract_event::ContractEvent, - event::EventKey, - state_store::{state_key::StateKey, StateView}, - transaction::{ - signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, ExecutionStatus, - Transaction, TransactionAuxiliaryData, TransactionOutput, TransactionStatus, - }, - vm_status::AbortLocation, - write_set::{WriteOp, WriteSet, WriteSetMut}, -}; -use aptos_vm::{ - sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, - VMBlockExecutor, -}; -use move_core_types::{ - ident_str, - language_storage::{ModuleId, TypeTag}, - move_resource::MoveStructType, - vm_status::{StatusCode, VMStatus}, -}; -use once_cell::sync::{Lazy, OnceCell}; -use rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}; -use std::{collections::HashMap, sync::Arc}; - -struct IncrementalOutput { - write_set: Vec<(StateKey, WriteOp)>, - events: Vec, -} - -impl IncrementalOutput { - fn into_success_output(self) -> Result { - Ok(TransactionOutput::new( - WriteSetMut::new(self.write_set).freeze()?, - self.events, - /*gas_used=*/ 1, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )) - } - - fn append(&mut self, mut other: IncrementalOutput) { - self.write_set.append(&mut other.write_set); - self.events.append(&mut other.events); - } - - fn to_abort(status: TransactionStatus) -> TransactionOutput { - TransactionOutput::new( - Default::default(), - 
vec![], - 0, - status, - TransactionAuxiliaryData::default(), - ) - } -} - -pub struct NativeExecutor; - -static NATIVE_EXECUTOR_CONCURRENCY_LEVEL: OnceCell = OnceCell::new(); -static NATIVE_EXECUTOR_POOL: Lazy = Lazy::new(|| { - ThreadPoolBuilder::new() - .num_threads(NativeExecutor::get_concurrency_level()) - .thread_name(|index| format!("native_exe_{}", index)) - .build() - .unwrap() -}); - -impl NativeExecutor { - pub fn set_concurrency_level_once(concurrency_level: usize) { - NATIVE_EXECUTOR_CONCURRENCY_LEVEL - .set(concurrency_level) - .ok(); - } - - pub fn get_concurrency_level() -> usize { - match NATIVE_EXECUTOR_CONCURRENCY_LEVEL.get() { - Some(concurrency_level) => *concurrency_level, - None => 32, - } - } - - fn withdraw_from_signer( - sender_address: AccountAddress, - transfer_amount: u64, - state_view: &impl StateView, - ) -> Result> { - let sender_account_key = DbAccessUtil::new_state_key_account(sender_address); - let mut sender_account = { - let _timer = TIMER - .with_label_values(&["read_sender_account"]) - .start_timer(); - DbAccessUtil::get_account(&sender_account_key, state_view)?.unwrap() - }; - let sender_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(sender_address); - let mut sender_coin_store = { - let _timer = TIMER - .with_label_values(&["read_sender_coin_store"]) - .start_timer(); - DbAccessUtil::get_coin_store(&sender_coin_store_key, state_view)?.unwrap() - }; - - // Note: numbers below may not be real. When runninng in parallel there might be conflicts. - sender_coin_store.coin -= transfer_amount; - - let gas = 1; - sender_coin_store.coin -= gas; - - sender_account.sequence_number += 1; - - // add total supply via aggregators? - // let mut total_supply: u128 = - // DbAccessUtil::get_value(&TOTAL_SUPPLY_STATE_KEY, state_view)?.unwrap(); - // total_supply -= gas as u128; - - // TODO(grao): Add other reads to match the read set of the real transaction. - let write_set = vec![ - ( - sender_account_key, - WriteOp::legacy_modification(bcs::to_bytes(&sender_account)?.into()), - ), - ( - sender_coin_store_key, - WriteOp::legacy_modification(bcs::to_bytes(&sender_coin_store)?.into()), - ), - // ( - // TOTAL_SUPPLY_STATE_KEY.clone(), - // WriteOp::legacy_modification(bcs::to_bytes(&total_supply)?), - // ), - ]; - - // TODO(grao): Some values are fake, because I'm lazy. - let events = vec![ContractEvent::new_v1( - EventKey::new(0, sender_address), - 0, - TypeTag::Struct(Box::new(WithdrawEvent::struct_tag())), - sender_address.to_vec(), - )]; - Ok(Ok(IncrementalOutput { write_set, events })) - } - - fn deposit( - recipient_address: AccountAddress, - transfer_amount: u64, - state_view: &impl StateView, - fail_on_existing: bool, - fail_on_missing: bool, - ) -> Result> { - let recipient_account_key = DbAccessUtil::new_state_key_account(recipient_address); - let recipient_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(recipient_address); - - let recipient_account = { - let _timer = TIMER.with_label_values(&["read_new_account"]).start_timer(); - DbAccessUtil::get_account(&recipient_account_key, state_view)? 
- }; - - let mut write_set = Vec::new(); - if recipient_account.is_some() { - if fail_on_existing { - return Ok(Err(TransactionStatus::Keep(ExecutionStatus::MoveAbort { - location: AbortLocation::Module(ModuleId::new( - AccountAddress::ONE, - ident_str!("account").into(), - )), - code: 7, - info: None, - }))); - } - - let mut recipient_coin_store = { - let _timer = TIMER - .with_label_values(&["read_new_coin_store"]) - .start_timer(); - DbAccessUtil::get_coin_store(&recipient_coin_store_key, state_view)?.unwrap() - }; - - if transfer_amount != 0 { - recipient_coin_store.coin += transfer_amount; - - write_set.push(( - recipient_coin_store_key, - WriteOp::legacy_modification(bcs::to_bytes(&recipient_coin_store)?.into()), - )); - } - } else { - if fail_on_missing { - return Ok(Err(TransactionStatus::Keep(ExecutionStatus::MoveAbort { - location: AbortLocation::Module(ModuleId::new( - AccountAddress::ONE, - ident_str!("account").into(), - )), - code: 8, - info: None, - }))); - } - - { - let _timer = TIMER - .with_label_values(&["read_new_coin_store"]) - .start_timer(); - assert!( - DbAccessUtil::get_coin_store(&recipient_coin_store_key, state_view)?.is_none() - ); - } - - let recipient_account = Account { - authentication_key: recipient_address.to_vec(), - ..Default::default() - }; - - let recipient_coin_store = CoinStore { - coin: transfer_amount, - ..Default::default() - }; - - write_set.push(( - recipient_account_key, - WriteOp::legacy_creation(bcs::to_bytes(&recipient_account)?.into()), - )); - write_set.push(( - recipient_coin_store_key, - WriteOp::legacy_creation(bcs::to_bytes(&recipient_coin_store)?.into()), - )); - } - - let events = vec![ - ContractEvent::new_v1( - EventKey::new(0, recipient_address), - 0, - TypeTag::Struct(Box::new(DepositEvent::struct_tag())), - recipient_address.to_vec(), - ), // TODO(grao): CoinRegisterEvent - ]; - Ok(Ok(IncrementalOutput { write_set, events })) - } - - fn handle_account_creation_and_transfer( - sender_address: AccountAddress, - recipient_address: AccountAddress, - transfer_amount: u64, - state_view: &impl StateView, - fail_on_existing: bool, - fail_on_missing: bool, - ) -> Result { - let _timer = TIMER.with_label_values(&["account_creation"]).start_timer(); - - let mut output = { - let output = Self::withdraw_from_signer(sender_address, transfer_amount, state_view)?; - match output { - Ok(output) => output, - Err(status) => return Ok(IncrementalOutput::to_abort(status)), - } - }; - - let deposit_output = Self::deposit( - recipient_address, - transfer_amount, - state_view, - fail_on_existing, - fail_on_missing, - )?; - - match deposit_output { - Ok(deposit_output) => { - output.append(deposit_output); - output.into_success_output() - }, - Err(status) => Ok(IncrementalOutput::to_abort(status)), - } - } - - fn handle_batch_account_creation_and_transfer( - sender_address: AccountAddress, - recipient_addresses: Vec, - transfer_amounts: Vec, - state_view: &impl StateView, - fail_on_existing: bool, - fail_on_missing: bool, - ) -> Result { - let mut deltas = HashMap::new(); - for (recipient, amount) in recipient_addresses - .into_iter() - .zip(transfer_amounts.into_iter()) - { - let amount = amount as i64; - deltas - .entry(recipient) - .and_modify(|counter| *counter += amount) - .or_insert(amount); - deltas - .entry(sender_address) - .and_modify(|counter| *counter -= amount) - .or_insert(-amount); - } - - let amount_to_sender = -deltas.remove(&sender_address).unwrap_or(0); - - assert!(amount_to_sender >= 0); - let mut output = { - let output = - 
Self::withdraw_from_signer(sender_address, amount_to_sender as u64, state_view)?; - match output { - Ok(output) => output, - Err(status) => return Ok(IncrementalOutput::to_abort(status)), - } - }; - - for (recipient_address, transfer_amount) in deltas.into_iter() { - output.append({ - let deposit_output = Self::deposit( - recipient_address, - transfer_amount as u64, - state_view, - fail_on_existing, - fail_on_missing, - )?; - - match deposit_output { - Ok(deposit_output) => deposit_output, - Err(status) => return Ok(IncrementalOutput::to_abort(status)), - } - }); - } - - output.into_success_output() - } - - fn handle_state_checkpoint() -> Result { - Ok(TransactionOutput::new( - WriteSet::default(), - vec![], - /*gas_used=*/ 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - )) - } -} - -impl VMBlockExecutor for NativeExecutor { - fn new() -> Self { - Self - } - - fn execute_block( - &self, - txn_provider: &DefaultTxnProvider, - state_view: &(impl StateView + Sync), - _onchain_config: BlockExecutorConfigFromOnchain, - _transaction_slice_metadata: TransactionSliceMetadata, - ) -> Result, VMStatus> { - let num_txns = txn_provider.num_txns(); - let transaction_outputs = NATIVE_EXECUTOR_POOL - .install(|| { - (0..num_txns) - .into_par_iter() - .map(|idx| { - let txn = txn_provider.get_txn(idx as u32); - match &txn.expect_valid() { - Transaction::StateCheckpoint(_) => Self::handle_state_checkpoint(), - Transaction::UserTransaction(user_txn) => match user_txn.payload() { - aptos_types::transaction::TransactionPayload::EntryFunction(f) => { - match ( - *f.module().address(), - f.module().name().as_str(), - f.function().as_str(), - ) { - (AccountAddress::ONE, "coin", "transfer") => { - Self::handle_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - bcs::from_bytes(&f.args()[1]).unwrap(), - &state_view, - false, - true, - ) - }, - (AccountAddress::ONE, "aptos_account", "transfer") => { - Self::handle_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - bcs::from_bytes(&f.args()[1]).unwrap(), - &state_view, - false, - false, - ) - }, - ( - AccountAddress::ONE, - "aptos_account", - "create_account", - ) => Self::handle_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - 0, - &state_view, - true, - false, - ), - ( - AccountAddress::ONE, - "aptos_account", - "batch_transfer", - ) => Self::handle_batch_account_creation_and_transfer( - user_txn.sender(), - bcs::from_bytes(&f.args()[0]).unwrap(), - bcs::from_bytes(&f.args()[1]).unwrap(), - &state_view, - false, - true, - ), - _ => unimplemented!( - "{} {}::{}", - *f.module().address(), - f.module().name().as_str(), - f.function().as_str() - ), - } - }, - _ => unimplemented!(), - }, - _ => unimplemented!(), - } - }) - .collect::>>() - }) - .map_err(|err| VMStatus::Error { - status_code: StatusCode::ABORTED, - sub_status: None, - message: Some(err.to_string()), - })?; - Ok(BlockOutput::new(transaction_outputs, None)) - } - - fn execute_block_sharded>( - _sharded_block_executor: &ShardedBlockExecutor, - _transactions: PartitionedTransactions, - _state_view: Arc, - _onchain_config: BlockExecutorConfigFromOnchain, - ) -> std::result::Result, VMStatus> { - unimplemented!() - } -} diff --git a/execution/executor-benchmark/src/pipeline.rs b/execution/executor-benchmark/src/pipeline.rs index 5a80a1e85e475..2252d6bbeeac8 100644 --- a/execution/executor-benchmark/src/pipeline.rs +++ 
b/execution/executor-benchmark/src/pipeline.rs @@ -46,7 +46,7 @@ pub struct PipelineConfig { } pub struct Pipeline<V> { - join_handles: Vec<JoinHandle<()>>, + join_handles: Vec<JoinHandle<u64>>, phantom: PhantomData<V>, start_pipeline_tx: Option<SyncSender<()>>, } @@ -137,12 +137,15 @@ where .name("block_preparation".to_string()) .spawn(move || { start_pipeline_rx.map(|rx| rx.recv()); + let mut processed = 0; while let Ok(txns) = raw_block_receiver.recv() { + processed += txns.len() as u64; let exe_block_msg = preparation_stage.process(txns); executable_block_sender.send(exe_block_msg).unwrap(); } info!("Done preparation"); start_execution_tx.map(|tx| tx.send(())); + processed }) .expect("Failed to spawn block partitioner thread."); join_handles.push(preparation_thread); @@ -202,6 +205,7 @@ where overall_measuring.print_end("Overall execution", executed); } start_ledger_update_tx.map(|tx| tx.send(())); + executed }) .expect("Failed to spawn transaction executor thread."); join_handles.push(exe_thread); @@ -218,6 +222,8 @@ where ledger_update_stage.ledger_update(ledger_update_msg); } start_commit_tx.map(|tx| tx.send(())); + + 0 }) .expect("Failed to spawn ledger update thread."); join_handles.push(ledger_update_thread); @@ -231,6 +237,8 @@ where let mut committer = TransactionCommitter::new(executor_3, start_version, commit_receiver); committer.run(); + + 0 }) .expect("Failed to spawn transaction committer thread."); join_handles.push(commit_thread); @@ -250,10 +258,15 @@ where self.start_pipeline_tx.as_ref().map(|tx| tx.send(())); } - pub fn join(self) { + pub fn join(self) -> Option<u64> { + let mut counts = vec![]; for handle in self.join_handles { - handle.join().unwrap() + let count = handle.join().unwrap(); + if count > 0 { + counts.push(count); + } } + counts.into_iter().min() } } diff --git a/testsuite/single_node_performance.py b/testsuite/single_node_performance.py index 7cfdc79eb327e..8d78dc89d8b9d 100755 --- a/testsuite/single_node_performance.py +++ b/testsuite/single_node_performance.py @@ -31,6 +31,8 @@ class Flow(Flag): AGG_V2 = auto() # Test resource groups RESOURCE_GROUPS = auto() + # Test different executor types + EXECUTORS = auto() # Tests that are run on LAND_BLOCKING and continuously on main @@ -101,8 +103,10 @@ class Flow(Flag): if os.environ.get("DISABLE_FA_APT"): FEATURE_FLAGS = "" + FA_MIGRATION_COMPLETE = False else: FEATURE_FLAGS = "--enable-feature NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE --enable-feature OPERATIONS_DEFAULT_TO_FA_APT_STORE" + FA_MIGRATION_COMPLETE = True if os.environ.get("ENABLE_PRUNER"): DB_PRUNER_FLAGS = "--enable-state-pruner --enable-ledger-pruner --enable-epoch-snapshot-pruner --ledger-pruning-batch-size 10000 --state-prune-window 3000000 --epoch-snapshot-prune-window 3000000 --ledger-prune-window 3000000" @@ -128,7 +132,9 @@ class RunGroupKeyExtra: sig_verify_num_threads_override: Optional[int] = field(default=None) execution_num_threads_override: Optional[int] = field(default=None) split_stages_override: bool = field(default=False) + skip_commit_override: bool = field(default=False) single_block_dst_working_set: bool = field(default=False) + execution_sharding: bool = field(default=False) @dataclass @@ -164,13 +170,16 @@ class RunGroupConfig: no-op 1 VM 6 0.938 1.019 38925.3 no-op 1000 VM 6 0.943 1.019 36444.6 apt-fa-transfer 1 VM 6 0.927 1.018 26954.7 +apt-fa-transfer 1 NativeVM 6 0.927 1.018 35259.7 account-generation 1 VM 6 0.96 1.02 20606.2 +account-generation 1 NativeVM 6 0.96 1.02 28216.2 account-resource32-b 1 VM 6 0.94 1.026 34260.4 modify-global-resource 1 VM 6 0.993 1.021 2260.5
modify-global-resource 100 VM 6 0.982 1.02 33129.7 publish-package 1 VM 6 0.983 1.012 1672.6 mix_publish_transfer 1 VM 6 0.972 1.044 20832.8 batch100-transfer 1 VM 6 0.953 1.024 645.1 +batch100-transfer 1 NativeVM 6 0.953 1.024 1437.0 vector-picture30k 1 VM 6 0.992 1.039 103.6 vector-picture30k 100 VM 6 0.913 1.015 1831.5 smart-table-picture30-k-with200-change 1 VM 6 0.976 1.034 16.1 @@ -213,10 +222,9 @@ class RunGroupConfig: RunGroupConfig(key=RunGroupKey("no-op"), included_in=LAND_BLOCKING_AND_C), RunGroupConfig(key=RunGroupKey("no-op", module_working_set_size=1000), included_in=LAND_BLOCKING_AND_C), RunGroupConfig(key=RunGroupKey("apt-fa-transfer"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), - # RunGroupConfig(key=RunGroupKey("apt-fa-transfer", executor_type="NativeSpeculative"), included_in=Flow.CONTINUOUS), - + RunGroupConfig(key=RunGroupKey("apt-fa-transfer", executor_type="NativeVM"), included_in=Flow.CONTINUOUS), RunGroupConfig(key=RunGroupKey("account-generation"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), - # RunGroupConfig(key=RunGroupKey("account-generation", executor_type="NativeSpeculative"), included_in=Flow.CONTINUOUS), + RunGroupConfig(key=RunGroupKey("account-generation", executor_type="NativeVM"), included_in=Flow.CONTINUOUS), RunGroupConfig(key=RunGroupKey("account-resource32-b"), included_in=Flow.CONTINUOUS), RunGroupConfig(key=RunGroupKey("modify-global-resource"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE), RunGroupConfig(key=RunGroupKey("modify-global-resource", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), @@ -226,7 +234,7 @@ class RunGroupConfig: transaction_weights_override="1 100", ), included_in=LAND_BLOCKING_AND_C, waived=True), RunGroupConfig(key=RunGroupKey("batch100-transfer"), included_in=LAND_BLOCKING_AND_C), - # RunGroupConfig(key=RunGroupKey("batch100-transfer", executor_type="NativeSpeculative"), included_in=Flow.CONTINUOUS), + RunGroupConfig(key=RunGroupKey("batch100-transfer", executor_type="NativeVM"), included_in=Flow.CONTINUOUS), RunGroupConfig(expected_tps=100, key=RunGroupKey("vector-picture40"), included_in=Flow(0), waived=True), RunGroupConfig(expected_tps=1000, key=RunGroupKey("vector-picture40", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow(0), waived=True), @@ -298,8 +306,62 @@ class RunGroupConfig: RunGroupConfig(expected_tps=6800, key=RunGroupKey("token-v2-ambassador-mint"), included_in=Flow.MAINNET_LARGE_DB), # RunGroupConfig(expected_tps=17000 if NUM_ACCOUNTS < 5000000 else 28000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), # RunGroupConfig(expected_tps=27000 if NUM_ACCOUNTS < 5000000 else 23000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), - +] + [ + # no-commit throughput of different executor, used on continuous flow + RunGroupConfig( + expected_tps=40000, + key=RunGroupKey( + "no_commit_{}{}".format( + transaction_type:="apt-fa-transfer" if FA_MIGRATION_COMPLETE else "coin-transfer", + "_sharding" if executor_sharding else "", + ), + executor_type=executor_type + ), + key_extra=RunGroupKeyExtra( + 
transaction_type_override=transaction_type, + sig_verify_num_threads_override=16, + skip_commit_override=True, + execution_sharding=executor_sharding, + ), + included_in=Flow.CONTINUOUS, + waived=True, + ) + for executor_sharding, executor_types in [ + (False, ["VM", "NativeVM", "AptosVMSpeculative", "NativeSpeculative"]), + # executor sharding doesn't support FA for now. + (True, [] if FA_MIGRATION_COMPLETE else ["VM", "NativeVM"]) + ] + for executor_type in executor_types +] + [ + # sweep of all executors for the extensive EXECUTORS flow + RunGroupConfig( + expected_tps=10000 if sequential else 30000, + key=RunGroupKey( + "{}_{}_by_stages".format( + transaction_type:="apt-fa-transfer" if FA_MIGRATION_COMPLETE else "coin-transfer", + "sequential" if sequential else "parallel" + ), + executor_type=executor_type + ), + key_extra=RunGroupKeyExtra( + transaction_type_override=transaction_type, + sig_verify_num_threads_override=1 if sequential else NUMBER_OF_EXECUTION_THREADS, + execution_num_threads_override=1 if sequential else None, + split_stages_override=True, + single_block_dst_working_set=True, + ), + included_in=Flow.EXECUTORS, + waived=True, + ) + for sequential in [True, False] + for executor_sharding, executor_types in [ + (False, ["VM", "NativeVM", "AptosVMSpeculative", "NativeSpeculative", "NativeValueCacheSpeculative", "NativeNoStorageSpeculative"]), + # executor sharding doesn't support FA for now. + (True, [] if FA_MIGRATION_COMPLETE else ["VM", "NativeVM"]) + ] + for executor_type in executor_types ] + # fmt: on # Run the single node with performance optimizations enabled @@ -721,30 +783,37 @@ def print_table( if test.key_extra.split_stages_override: pipeline_extra_args.append("--split-stages") + if test.key_extra.skip_commit_override: + pipeline_extra_args.append("--skip-commit") - sharding_traffic_flags = test.key_extra.sharding_traffic_flags or "" + pipeline_extra_args.append( + test.key_extra.sharding_traffic_flags or "--transactions-per-sender 1" + ) if test.key.executor_type == "VM": - executor_type_str = "--block-executor-type aptos-vm-with-block-stm --transactions-per-sender 1" - # elif test.key.executor_type == "NativeVM": - # executor_type_str = ( - # "--block-executor-type native-vm-with-block-stm --transactions-per-sender 1" - # ) + executor_type_str = "--block-executor-type aptos-vm-with-block-stm" + elif test.key.executor_type == "NativeVM": + executor_type_str = "--block-executor-type native-vm-with-block-stm" + elif test.key.executor_type == "AptosVMSpeculative": + executor_type_str = "--block-executor-type aptos-vm-parallel-uncoordinated" elif test.key.executor_type == "NativeSpeculative": - executor_type_str = "--block-executor-type native-loose-speculative --transactions-per-sender 1" - # elif test.key.executor_type == "NativeValueCacheSpeculative": - # executor_type_str = ( - # "--block-executor-type native-value-cache-loose-speculative --transactions-per-sender 1" - # ) - # elif test.key.executor_type == "NativeNoStorageSpeculative": - # executor_type_str = ( - # "--block-executor-type native-no-storage-loose-speculative --transactions-per-sender 1" - # ) - elif test.key.executor_type == "sharded": - executor_type_str = f"--num-executor-shards {number_of_execution_threads} {sharding_traffic_flags}" + executor_type_str = "--block-executor-type native-parallel-uncoordinated" + elif test.key.executor_type == "NativeValueCacheSpeculative": + executor_type_str = ( + "--block-executor-type native-value-cache-parallel-uncoordinated" + ) + elif 
test.key.executor_type == "NativeNoStorageSpeculative": + executor_type_str = ( + "--block-executor-type native-no-storage-parallel-uncoordinated" + ) else: raise Exception(f"executor type not supported {test.key.executor_type}") + if test.key_extra.execution_sharding: + pipeline_extra_args.append( + f"--num-executor-shards {number_of_execution_threads}" + ) + if NUM_BLOCKS < 200: pipeline_extra_args.append("--generate-then-execute") diff --git a/types/src/account_config/resources/coin_store.rs b/types/src/account_config/resources/coin_store.rs index 472d9234c7112..aeb81816f0b48 100644 --- a/types/src/account_config/resources/coin_store.rs +++ b/types/src/account_config/resources/coin_store.rs @@ -73,6 +73,10 @@ impl CoinStoreResource { self.coin } + pub fn set_coin(&mut self, coin: u64) { + self.coin = coin; + } + pub fn frozen(&self) -> bool { self.frozen } diff --git a/types/src/account_config/resources/core_account.rs b/types/src/account_config/resources/core_account.rs index 2334276811a9d..8a0672fe744c0 100644 --- a/types/src/account_config/resources/core_account.rs +++ b/types/src/account_config/resources/core_account.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize}; #[cfg_attr(any(test, feature = "fuzzing"), derive(Arbitrary))] pub struct AccountResource { authentication_key: Vec<u8>, - sequence_number: u64, + pub sequence_number: u64, guid_creation_num: u64, coin_register_events: EventHandle, key_rotation_events: EventHandle,
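The last two hunks expose `set_coin()` and a public `sequence_number` so the benchmark's native executors can mutate deserialized resources in place instead of going through Move. A minimal sketch of that usage, assuming the `aptos_types::account_config` import paths and a non-generic `CoinStoreResource` (the real type may carry a coin-type parameter); gas charging and error handling are omitted, and `apply_transfer_in_place` is an illustrative name, not part of the patch:

```rust
use aptos_types::account_config::{AccountResource, CoinStoreResource};

/// Illustrative sketch only: apply a transfer against sender resources that were already
/// deserialized from the state view, the way the simplified executors update state.
fn apply_transfer_in_place(
    sender_account: &mut AccountResource,
    sender_store: &mut CoinStoreResource,
    amount: u64,
) {
    // `sequence_number` is a public field after this patch, so it can be bumped directly.
    sender_account.sequence_number += 1;
    // `coin()` is the existing getter; `set_coin()` is the setter added above.
    let balance = sender_store.coin();
    sender_store.set_coin(balance.saturating_sub(amount));
}
```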