Skip to content

Commit

Permalink
rename parallel to block
Browse files Browse the repository at this point in the history
  • Loading branch information
gelash committed Nov 23, 2022
1 parent ca6a504 commit d22cc15
Show file tree
Hide file tree
Showing 30 changed files with 107 additions and 112 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/aptos-move/framework/aptos-token @areshand
/aptos-move/framework/src/natives/cryptography/ @alinush
/aptos-move/framework/src/natives/aggregator_natives/ @georgemitenkov @gelash @zekun000
/aptos-move/parallel-executor/ @gelash @zekun000
/aptos-move/block-executor/ @gelash @zekun000 @sasha8
/aptos-move/vm-genesis/ @davidiw @movekevin

# Owner for logger config
Expand Down
58 changes: 29 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"aptos-move/aptos-transactional-test-harness",
"aptos-move/aptos-validator-interface",
"aptos-move/aptos-vm",
"aptos-move/block-executor",
"aptos-move/e2e-move-tests",
"aptos-move/e2e-tests",
"aptos-move/e2e-testsuite",
Expand All @@ -26,7 +27,6 @@ members = [
"aptos-move/move-examples",
"aptos-move/mvhashmap",
"aptos-move/package-builder",
"aptos-move/parallel-executor",
"aptos-move/vm-genesis",
"aptos-move/writeset-transaction-generator",
"aptos-node",
Expand Down Expand Up @@ -173,6 +173,7 @@ aptos-aggregator = { path = "aptos-move/aptos-aggregator" }
aptos-api = { path = "api" }
aptos-api-test-context = { path = "api/test-context" }
aptos-api-types = { path = "api/types" }
aptos-block-executor = { path = "aptos-move/block-executor" }
aptos-bitvec = { path = "crates/aptos-bitvec" }
aptos-build-info = { path = "crates/aptos-build-info" }
aptos-compression = { path = "crates/aptos-compression" }
Expand Down Expand Up @@ -203,7 +204,6 @@ aptos-network-checker = { path = "crates/aptos-network-checker" }
aptos-node = { path = "aptos-node" }
aptos-node-checker = { path = "ecosystem/node-checker" }
aptos-openapi = { path = "crates/aptos-openapi" }
aptos-parallel-executor = { path = "aptos-move/parallel-executor" }
aptos-proptest-helpers = { path = "crates/aptos-proptest-helpers" }
aptos-protos = { path = "crates/aptos-protos" }
aptos-push-metrics = { path = "crates/aptos-push-metrics" }
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-aggregator/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl TransactionOutputExt {
&self.output
}

// TODO: rename to unpack() and consider other into()'s in the crate.
pub fn into(self) -> (DeltaChangeSet, TransactionOutput) {
(self.delta_change_set, self.output)
}
Expand Down
8 changes: 3 additions & 5 deletions aptos-move/aptos-transaction-benchmarks/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use aptos_types::{
on_chain_config::{OnChainConfig, ValidatorSet},
transaction::Transaction,
};
use aptos_vm::{
data_cache::AsMoveResolver, parallel_executor::ParallelAptosVM, AptosVM, VMExecutor,
};
use aptos_vm::{block_executor::BlockAptosVM, data_cache::AsMoveResolver};
use criterion::{measurement::Measurement, BatchSize, Bencher};
use language_e2e_tests::{
account_universe::{log_balance_strategy, AUTransactionGen, AccountUniverseGen},
Expand Down Expand Up @@ -188,15 +186,15 @@ impl TransactionBenchState {
fn execute(self) {
// The output is ignored here since we're just testing transaction performance, not trying
// to assert correctness.
AptosVM::execute_block(self.transactions, self.executor.get_state_view())
BlockAptosVM::execute_block(self.transactions, self.executor.get_state_view(), 1)
.expect("VM should not fail to start");
}

/// Executes this state in a single block via parallel execution.
fn execute_parallel(self) {
// The output is ignored here since we're just testing transaction performance, not trying
// to assert correctness.
ParallelAptosVM::execute_block(
BlockAptosVM::execute_block(
self.transactions,
self.executor.get_state_view(),
num_cpus::get(),
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-aggregator = { workspace = true }
aptos-block-executor = { workspace = true }
aptos-crypto = { workspace = true }
aptos-crypto-derive = { workspace = true }
aptos-gas = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-module-verifier = { workspace = true }
aptos-parallel-executor = { workspace = true }
aptos-state-view = { workspace = true }
aptos-types = { workspace = true }
bcs = { workspace = true }
Expand Down
19 changes: 15 additions & 4 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use crate::{
validate_signed_transaction, PreprocessedTransaction, VMAdapter,
},
aptos_vm_impl::{get_transaction_output, AptosVMImpl, AptosVMInternals},
block_executor::BlockAptosVM,
counters::*,
data_cache::{AsMoveResolver, IntoMoveResolver},
delta_state_view::DeltaStateView,
errors::expect_only_successful_execution,
logging::AdapterLogSchema,
move_vm_ext::{MoveResolverExt, SessionExt, SessionId},
parallel_executor::ParallelAptosVM,
system_module_names::*,
transaction_arg_validation,
transaction_metadata::TransactionMetadata,
Expand Down Expand Up @@ -968,10 +968,21 @@ impl VMExecutor for AptosVM {
))
});

// Record the histogram count for transactions per block.
BLOCK_TRANSACTION_COUNT.observe(transactions.len() as f64);
let log_context = AdapterLogSchema::new(state_view.id(), 0);
info!(
log_context,
"Executing block, transaction count: {}",
transactions.len()
);

ParallelAptosVM::execute_block(transactions, state_view, Self::get_concurrency_level())
let count = transactions.len();
let ret =
BlockAptosVM::execute_block(transactions, state_view, Self::get_concurrency_level());
if ret.is_ok() {
// Record the histogram count for transactions per block.
BLOCK_TRANSACTION_COUNT.observe(count as f64);
}
ret
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,35 @@
// SPDX-License-Identifier: Apache-2.0

mod storage_wrapper;
mod vm_wrapper;
pub(crate) mod vm_wrapper;

use crate::{
adapter_common::{preprocess_transaction, PreprocessedTransaction},
aptos_vm::AptosVM,
logging::AdapterLogSchema,
parallel_executor::vm_wrapper::AptosVMWrapper,
block_executor::vm_wrapper::AptosVMWrapper,
AptosVM,
};
use aptos_aggregator::{delta_change_set::DeltaOp, transaction::TransactionOutputExt};
use aptos_logger::{debug, info};
use aptos_parallel_executor::{
use aptos_block_executor::{
errors::Error,
executor::{ParallelTransactionExecutor, RAYON_EXEC_POOL},
executor::{BlockExecutor, RAYON_EXEC_POOL},
output_delta_resolver::{OutputDeltaResolver, ResolvedData},
task::{Transaction as PTransaction, TransactionOutput as PTransactionOutput},
task::{
Transaction as BlockExecutorTransaction,
TransactionOutput as BlockExecutorTransactionOutput,
},
};
use aptos_logger::debug;
use aptos_state_view::StateView;
use aptos_types::{
state_store::state_key::StateKey,
transaction::{Transaction, TransactionOutput, TransactionStatus},
write_set::{WriteOp, WriteSet, WriteSetMut},
};
use move_core_types::vm_status::{StatusCode, VMStatus};
use move_core_types::vm_status::VMStatus;
use rayon::prelude::*;
use std::collections::HashMap;

impl PTransaction for PreprocessedTransaction {
impl BlockExecutorTransaction for PreprocessedTransaction {
type Key = StateKey;
type Value = WriteOp;
}
Expand All @@ -50,7 +52,7 @@ impl AptosTransactionOutput {
}
}

impl PTransactionOutput for AptosTransactionOutput {
impl BlockExecutorTransactionOutput for AptosTransactionOutput {
type T = PreprocessedTransaction;

fn get_writes(&self) -> Vec<(StateKey, WriteOp)> {
Expand Down Expand Up @@ -81,34 +83,34 @@ impl PTransactionOutput for AptosTransactionOutput {
}
}

pub struct ParallelAptosVM();
pub struct BlockAptosVM();

impl ParallelAptosVM {
impl BlockAptosVM {
fn process_parallel_block_output<S: StateView>(
results: Vec<AptosTransactionOutput>,
delta_resolver: OutputDeltaResolver<StateKey, WriteOp>,
state_view: &S,
) -> Vec<TransactionOutput> {
// TODO: MVHashmap, and then delta resolver should track aggregator keys.
let mut aggregator_keys: HashMap<StateKey, anyhow::Result<ResolvedData>> = HashMap::new();
// TODO: MVHashmap, and then delta resolver should track aggregator base values.
let mut aggregator_base_values: HashMap<StateKey, anyhow::Result<ResolvedData>> =
HashMap::new();
for res in results.iter() {
let output_ext = AptosTransactionOutput::as_ref(res);
for (key, _) in output_ext.delta_change_set().iter() {
if !aggregator_keys.contains_key(key) {
aggregator_keys.insert(key.clone(), state_view.get_state_value(key));
for (key, _) in res.as_ref().delta_change_set().iter() {
if !aggregator_base_values.contains_key(key) {
aggregator_base_values.insert(key.clone(), state_view.get_state_value(key));
}
}
}

let materialized_deltas =
delta_resolver.resolve(aggregator_keys.into_iter().collect(), results.len());
delta_resolver.resolve(aggregator_base_values.into_iter().collect(), results.len());

results
.into_iter()
.zip(materialized_deltas.into_iter())
.map(|(res, delta_writes)| {
let output_ext = AptosTransactionOutput::into(res);
output_ext.output_with_delta_writes(WriteSetMut::new(delta_writes))
res.into()
.output_with_delta_writes(WriteSetMut::new(delta_writes))
})
.collect()
}
Expand All @@ -119,8 +121,7 @@ impl ParallelAptosVM {
results
.into_iter()
.map(|res| {
let output_ext = AptosTransactionOutput::into(res);
let (deltas, output) = output_ext.into();
let (deltas, output) = res.into().into();
debug_assert!(deltas.is_empty(), "[Execution] Deltas must be materialized");
output
})
Expand All @@ -138,22 +139,13 @@ impl ParallelAptosVM {
let signature_verified_block: Vec<PreprocessedTransaction> =
RAYON_EXEC_POOL.install(|| {
transactions
.par_iter()
.map(|txn| preprocess_transaction::<AptosVM>(txn.clone()))
.into_par_iter()
.map(preprocess_transaction::<AptosVM>)
.collect()
});

let log_context = AdapterLogSchema::new(state_view.id(), 0);
info!(
log_context,
"Executing block, transaction count: {}",
transactions.len()
);

let executor =
ParallelTransactionExecutor::<PreprocessedTransaction, AptosVMWrapper<S>>::new(
concurrency_level,
);
BlockExecutor::<PreprocessedTransaction, AptosVMWrapper<S>>::new(concurrency_level);

let mut ret = if concurrency_level > 1 {
executor
Expand All @@ -175,8 +167,11 @@ impl ParallelAptosVM {
.map(Self::process_sequential_block_output);
}

// Explicit async drop. Happens here because we can't currently move to
// BlockExecutor due to the Module publishing fallback. TODO: fix after
// module publishing fallback is removed.
RAYON_EXEC_POOL.spawn(move || {
// Explicit async drop.
// Explicit async drops.
drop(signature_verified_block);
});

Expand All @@ -185,9 +180,6 @@ impl ParallelAptosVM {
Err(Error::ModulePathReadWrite) => {
unreachable!("[Execution]: Must be handled by sequential fallback")
}
Err(Error::InvariantViolation) => Err(VMStatus::Error(
StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR,
)),
Err(Error::UserError(err)) => Err(err),
}
}
Expand Down
Loading

0 comments on commit d22cc15

Please sign in to comment.