Skip to content

Commit

Permalink
[Executor] Merge sequential & parallel execution flow (#4683)
Browse files Browse the repository at this point in the history
* Merge sequential and parallel flows

* rename parallel to block
  • Loading branch information
gelash authored Nov 29, 2022
1 parent a92815f commit feec33f
Show file tree
Hide file tree
Showing 38 changed files with 519 additions and 497 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/aptos-move/framework/aptos-token @areshand
/aptos-move/framework/src/natives/cryptography/ @alinush
/aptos-move/framework/src/natives/aggregator_natives/ @georgemitenkov @gelash @zekun000
/aptos-move/parallel-executor/ @gelash @zekun000
/aptos-move/block-executor/ @gelash @zekun000 @sasha8
/aptos-move/vm-genesis/ @davidiw @movekevin

# Owner for logger config
Expand Down
58 changes: 29 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"aptos-move/aptos-transactional-test-harness",
"aptos-move/aptos-validator-interface",
"aptos-move/aptos-vm",
"aptos-move/block-executor",
"aptos-move/e2e-move-tests",
"aptos-move/e2e-tests",
"aptos-move/e2e-testsuite",
Expand All @@ -26,7 +27,6 @@ members = [
"aptos-move/move-examples",
"aptos-move/mvhashmap",
"aptos-move/package-builder",
"aptos-move/parallel-executor",
"aptos-move/vm-genesis",
"aptos-move/writeset-transaction-generator",
"aptos-node",
Expand Down Expand Up @@ -173,6 +173,7 @@ aptos-aggregator = { path = "aptos-move/aptos-aggregator" }
aptos-api = { path = "api" }
aptos-api-test-context = { path = "api/test-context" }
aptos-api-types = { path = "api/types" }
aptos-block-executor = { path = "aptos-move/block-executor" }
aptos-bitvec = { path = "crates/aptos-bitvec" }
aptos-build-info = { path = "crates/aptos-build-info" }
aptos-compression = { path = "crates/aptos-compression" }
Expand Down Expand Up @@ -203,7 +204,6 @@ aptos-network-checker = { path = "crates/aptos-network-checker" }
aptos-node = { path = "aptos-node" }
aptos-node-checker = { path = "ecosystem/node-checker" }
aptos-openapi = { path = "crates/aptos-openapi" }
aptos-parallel-executor = { path = "aptos-move/parallel-executor" }
aptos-proptest-helpers = { path = "crates/aptos-proptest-helpers" }
aptos-protos = { path = "crates/aptos-protos" }
aptos-push-metrics = { path = "crates/aptos-push-metrics" }
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-aggregator/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl TransactionOutputExt {
&self.output
}

// TODO: rename to unpack() and consider other into()'s in the crate.
pub fn into(self) -> (DeltaChangeSet, TransactionOutput) {
(self.delta_change_set, self.output)
}
Expand Down
8 changes: 3 additions & 5 deletions aptos-move/aptos-transaction-benchmarks/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use aptos_types::{
on_chain_config::{OnChainConfig, ValidatorSet},
transaction::Transaction,
};
use aptos_vm::{
data_cache::AsMoveResolver, parallel_executor::ParallelAptosVM, AptosVM, VMExecutor,
};
use aptos_vm::{block_executor::BlockAptosVM, data_cache::AsMoveResolver};
use criterion::{measurement::Measurement, BatchSize, Bencher};
use language_e2e_tests::{
account_universe::{log_balance_strategy, AUTransactionGen, AccountUniverseGen},
Expand Down Expand Up @@ -188,15 +186,15 @@ impl TransactionBenchState {
fn execute(self) {
// The output is ignored here since we're just testing transaction performance, not trying
// to assert correctness.
AptosVM::execute_block(self.transactions, self.executor.get_state_view())
BlockAptosVM::execute_block(self.transactions, self.executor.get_state_view(), 1)
.expect("VM should not fail to start");
}

/// Executes this state in a single block via parallel execution.
fn execute_parallel(self) {
// The output is ignored here since we're just testing transaction performance, not trying
// to assert correctness.
ParallelAptosVM::execute_block(
BlockAptosVM::execute_block(
self.transactions,
self.executor.get_state_view(),
num_cpus::get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use aptos_types::{
};
use aptos_vm::{
data_cache::{AsMoveResolver, IntoMoveResolver, StorageAdapterOwned},
AptosVM,
AptosVM, VMExecutor,
};
use clap::StructOpt;
use language_e2e_tests::data_store::{FakeDataStore, GENESIS_CHANGE_SET_HEAD};
Expand Down Expand Up @@ -472,23 +472,26 @@ impl<'a> AptosTestAdapter<'a> {
/// Should error if the transaction ends up being discarded, or having a status other than
/// EXECUTED.
fn run_transaction(&mut self, txn: Transaction) -> Result<TransactionOutput> {
let mut outputs = AptosVM::execute_block_and_keep_vm_status(vec![txn], &self.storage)?;
let mut outputs = AptosVM::execute_block(vec![txn], &self.storage)?;

assert_eq!(outputs.len(), 1);

let (status, output) = outputs.pop().unwrap();
let output = outputs.pop().unwrap();
match output.status() {
TransactionStatus::Keep(kept_vm_status) => {
self.storage.add_write_set(output.write_set());
match kept_vm_status {
ExecutionStatus::Success => Ok(output),
_ => {
bail!("Failed to execute transaction. ExecutionStatus: {}", status)
bail!(
"Failed to execute transaction. ExecutionStatus: {:?}",
kept_vm_status
)
}
}
}
TransactionStatus::Discard(_) => {
bail!("Transaction discarded. VMStatus: {}", status)
TransactionStatus::Discard(status_code) => {
bail!("Transaction discarded. VM status code: {:?}", status_code)
}
TransactionStatus::Retry => panic!(),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
processed 4 tasks

task 1 'publish'. lines 4-30:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS

task 2 'run'. lines 33-33:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS

task 3 'view'. lines 35-35:
[No Resource Exists]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
processed 3 tasks

task 1 'run'. lines 5-13:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS

task 2 'view'. lines 15-15:
key 0x1::coin::CoinStore<0x1::aptos_coin::AptosCoin> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
processed 2 tasks

task 1 'publish'. lines 4-9:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
processed 3 tasks

task 2 'run'. lines 29-29:
Error: Failed to execute transaction. ExecutionStatus: status STORAGE_WRITE_LIMIT_REACHED of type Execution
Error: Failed to execute transaction. ExecutionStatus: ExecutionFailure { location: Script, function: 0, code_offset: 0 }
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-aggregator = { workspace = true }
aptos-block-executor = { workspace = true }
aptos-crypto = { workspace = true }
aptos-crypto-derive = { workspace = true }
aptos-gas = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-module-verifier = { workspace = true }
aptos-parallel-executor = { workspace = true }
aptos-state-view = { workspace = true }
aptos-types = { workspace = true }
bcs = { workspace = true }
Expand Down
72 changes: 0 additions & 72 deletions aptos-move/aptos-vm/src/adapter_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ use crate::{
logging::AdapterLogSchema,
move_vm_ext::{MoveResolverExt, SessionExt, SessionId},
};
use aptos_logger::prelude::*;
use aptos_types::{
block_metadata::BlockMetadata,
transaction::{Transaction, TransactionOutput, TransactionStatus, WriteSetPayload},
write_set::WriteSet,
};
use rayon::prelude::*;

/// This trait describes the VM adapter's interface.
/// TODO: bring more of the execution logic in aptos_vm into this file.
Expand Down Expand Up @@ -133,76 +131,6 @@ pub(crate) fn validate_signature_checked_transaction<S: MoveResolverExt, A: VMAd
}
}

pub(crate) fn execute_block_impl<A: VMAdapter, S: StateView>(
adapter: &A,
transactions: Vec<Transaction>,
data_cache: &mut StateViewCache<S>,
) -> Result<Vec<(VMStatus, TransactionOutput)>, VMStatus> {
let mut result = vec![];
let mut should_restart = false;

info!(
AdapterLogSchema::new(data_cache.id(), 0),
"Executing block, transaction count: {}",
transactions.len()
);

let signature_verified_block: Vec<PreprocessedTransaction>;
{
// Verify the signatures of all the transactions in parallel.
// This is time consuming so don't wait and do the checking
// sequentially while executing the transactions.
signature_verified_block = transactions
.into_par_iter()
.map(preprocess_transaction::<A>)
.collect();
}

for (idx, txn) in signature_verified_block.into_iter().enumerate() {
let log_context = AdapterLogSchema::new(data_cache.id(), idx);
if should_restart {
let txn_output =
TransactionOutput::new(WriteSet::default(), vec![], 0, TransactionStatus::Retry);
result.push((VMStatus::Error(StatusCode::UNKNOWN_STATUS), txn_output));
debug!(log_context, "Retry after reconfiguration");
continue;
};
let (vm_status, output_ext, sender) = adapter.execute_single_transaction(
&txn,
&data_cache.as_move_resolver(),
&log_context,
)?;

// Apply deltas.
let output = output_ext.into_transaction_output(&data_cache);

if !output.status().is_discarded() {
data_cache.push_write_set(output.write_set());
} else {
match sender {
Some(s) => trace!(
log_context,
"Transaction discarded, sender: {}, error: {:?}",
s,
vm_status,
),
None => trace!(log_context, "Transaction malformed, error: {:?}", vm_status,),
}
}

if A::should_restart_execution(&output) {
info!(
AdapterLogSchema::new(data_cache.id(), 0),
"Reconfiguration occurred: restart required",
);
should_restart = true;
}

result.push((vm_status, output))
}
Ok(result)
}

/// Transactions after signature checking:
/// Waypoints and BlockPrologues are not signed and are unaffected by signature checking,
/// but a user transaction or writeset transaction is transformed to a SignatureCheckedTransaction.
Expand Down
Loading

0 comments on commit feec33f

Please sign in to comment.