Skip to content

Commit

Permalink
Merge sequential and parallel flows
Browse files Browse the repository at this point in the history
  • Loading branch information
gelash committed Nov 23, 2022
1 parent 720c6f2 commit ca6a504
Show file tree
Hide file tree
Showing 17 changed files with 299 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use aptos_types::{
};
use aptos_vm::{
data_cache::{AsMoveResolver, IntoMoveResolver, StorageAdapterOwned},
AptosVM,
AptosVM, VMExecutor,
};
use clap::StructOpt;
use language_e2e_tests::data_store::{FakeDataStore, GENESIS_CHANGE_SET_HEAD};
Expand Down Expand Up @@ -472,23 +472,26 @@ impl<'a> AptosTestAdapter<'a> {
/// Should error if the transaction ends up being discarded, or having a status other than
/// EXECUTED.
fn run_transaction(&mut self, txn: Transaction) -> Result<TransactionOutput> {
let mut outputs = AptosVM::execute_block_and_keep_vm_status(vec![txn], &self.storage)?;
let mut outputs = AptosVM::execute_block(vec![txn], &self.storage)?;

assert_eq!(outputs.len(), 1);

let (status, output) = outputs.pop().unwrap();
let output = outputs.pop().unwrap();
match output.status() {
TransactionStatus::Keep(kept_vm_status) => {
self.storage.add_write_set(output.write_set());
match kept_vm_status {
ExecutionStatus::Success => Ok(output),
_ => {
bail!("Failed to execute transaction. ExecutionStatus: {}", status)
bail!(
"Failed to execute transaction. ExecutionStatus: {:?}",
kept_vm_status
)
}
}
}
TransactionStatus::Discard(_) => {
bail!("Transaction discarded. VMStatus: {}", status)
TransactionStatus::Discard(status_code) => {
bail!("Transaction discarded. VM status code: {:?}", status_code)
}
TransactionStatus::Retry => panic!(),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
processed 4 tasks

task 1 'publish'. lines 4-30:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS

task 2 'run'. lines 33-33:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS

task 3 'view'. lines 35-35:
[No Resource Exists]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
processed 3 tasks

task 1 'run'. lines 5-13:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS

task 2 'view'. lines 15-15:
key 0x1::coin::CoinStore<0x1::aptos_coin::AptosCoin> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
processed 2 tasks

task 1 'publish'. lines 4-9:
Error: Transaction discarded. VMStatus: status MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS of type Validation
Error: Transaction discarded. VM status code: MAX_GAS_UNITS_BELOW_MIN_TRANSACTION_GAS_UNITS
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
processed 3 tasks

task 2 'run'. lines 29-29:
Error: Failed to execute transaction. ExecutionStatus: status STORAGE_WRITE_LIMIT_REACHED of type Execution
Error: Failed to execute transaction. ExecutionStatus: ExecutionFailure { location: Script, function: 0, code_offset: 0 }
72 changes: 0 additions & 72 deletions aptos-move/aptos-vm/src/adapter_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ use crate::{
logging::AdapterLogSchema,
move_vm_ext::{MoveResolverExt, SessionExt, SessionId},
};
use aptos_logger::prelude::*;
use aptos_types::{
block_metadata::BlockMetadata,
transaction::{Transaction, TransactionOutput, TransactionStatus, WriteSetPayload},
write_set::WriteSet,
};
use rayon::prelude::*;

/// This trait describes the VM adapter's interface.
/// TODO: bring more of the execution logic in aptos_vm into this file.
Expand Down Expand Up @@ -133,76 +131,6 @@ pub(crate) fn validate_signature_checked_transaction<S: MoveResolverExt, A: VMAd
}
}

pub(crate) fn execute_block_impl<A: VMAdapter, S: StateView>(
adapter: &A,
transactions: Vec<Transaction>,
data_cache: &mut StateViewCache<S>,
) -> Result<Vec<(VMStatus, TransactionOutput)>, VMStatus> {
let mut result = vec![];
let mut should_restart = false;

info!(
AdapterLogSchema::new(data_cache.id(), 0),
"Executing block, transaction count: {}",
transactions.len()
);

let signature_verified_block: Vec<PreprocessedTransaction>;
{
// Verify the signatures of all the transactions in parallel.
// This is time consuming so don't wait and do the checking
// sequentially while executing the transactions.
signature_verified_block = transactions
.into_par_iter()
.map(preprocess_transaction::<A>)
.collect();
}

for (idx, txn) in signature_verified_block.into_iter().enumerate() {
let log_context = AdapterLogSchema::new(data_cache.id(), idx);
if should_restart {
let txn_output =
TransactionOutput::new(WriteSet::default(), vec![], 0, TransactionStatus::Retry);
result.push((VMStatus::Error(StatusCode::UNKNOWN_STATUS), txn_output));
debug!(log_context, "Retry after reconfiguration");
continue;
};
let (vm_status, output_ext, sender) = adapter.execute_single_transaction(
&txn,
&data_cache.as_move_resolver(),
&log_context,
)?;

// Apply deltas.
let output = output_ext.into_transaction_output(&data_cache);

if !output.status().is_discarded() {
data_cache.push_write_set(output.write_set());
} else {
match sender {
Some(s) => trace!(
log_context,
"Transaction discarded, sender: {}, error: {:?}",
s,
vm_status,
),
None => trace!(log_context, "Transaction malformed, error: {:?}", vm_status,),
}
}

if A::should_restart_execution(&output) {
info!(
AdapterLogSchema::new(data_cache.id(), 0),
"Reconfiguration occurred: restart required",
);
should_restart = true;
}

result.push((vm_status, output))
}
Ok(result)
}

/// Transactions after signature checking:
/// Waypoints and BlockPrologues are not signed and are unaffected by signature checking,
/// but a user transaction or writeset transaction is transformed to a SignatureCheckedTransaction.
Expand Down
39 changes: 6 additions & 33 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
adapter_common,
adapter_common::{
discard_error_output, discard_error_vm_status, validate_signature_checked_transaction,
validate_signed_transaction, PreprocessedTransaction, VMAdapter,
},
aptos_vm_impl::{get_transaction_output, AptosVMImpl, AptosVMInternals},
counters::*,
data_cache::{AsMoveResolver, IntoMoveResolver, StateViewCache},
data_cache::{AsMoveResolver, IntoMoveResolver},
delta_state_view::DeltaStateView,
errors::expect_only_successful_execution,
logging::AdapterLogSchema,
move_vm_ext::{MoveResolverExt, SessionExt, SessionId},
parallel_executor::ParallelAptosVM,
system_module_names::*,
transaction_arg_validation,
transaction_metadata::TransactionMetadata,
Expand Down Expand Up @@ -912,21 +912,6 @@ impl AptosVM {
Ok((VMStatus::Executed, output))
}

/// Alternate form of 'execute_block' that keeps the vm_status before it goes into the
/// `TransactionOutput`
pub fn execute_block_and_keep_vm_status(
transactions: Vec<Transaction>,
state_view: &impl StateView,
) -> Result<Vec<(VMStatus, TransactionOutput)>, VMStatus> {
let mut state_view_cache = StateViewCache::new(state_view);
let count = transactions.len();
let vm = AptosVM::new(&state_view_cache);
let res = adapter_common::execute_block_impl(&vm, transactions, &mut state_view_cache)?;
// Record the histogram count for transactions per block.
BLOCK_TRANSACTION_COUNT.observe(count as f64);
Ok(res)
}

pub fn simulate_signed_transaction(
txn: &SignedTransaction,
state_view: &impl StateView,
Expand Down Expand Up @@ -983,22 +968,10 @@ impl VMExecutor for AptosVM {
))
});

let concurrency_level = Self::get_concurrency_level();
if concurrency_level > 1 {
let (result, err) = crate::parallel_executor::ParallelAptosVM::execute_block(
transactions,
state_view,
concurrency_level,
)?;
debug!("Parallel execution error {:?}", err);
Ok(result)
} else {
let output = Self::execute_block_and_keep_vm_status(transactions, state_view)?;
Ok(output
.into_iter()
.map(|(_vm_status, txn_output)| txn_output)
.collect())
}
// Record the histogram count for transactions per block.
BLOCK_TRANSACTION_COUNT.observe(transactions.len() as f64);

ParallelAptosVM::execute_block(transactions, state_view, Self::get_concurrency_level())
}
}

Expand Down
41 changes: 16 additions & 25 deletions aptos-move/aptos-vm/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ use aptos_logger::prelude::*;
use aptos_state_view::{StateView, StateViewId};
use aptos_types::state_store::state_storage_usage::StateStorageUsage;
use aptos_types::{
access_path::AccessPath,
on_chain_config::ConfigStorage,
state_store::state_key::StateKey,
vm_status::StatusCode,
write_set::{WriteOp, WriteSet},
access_path::AccessPath, on_chain_config::ConfigStorage, state_store::state_key::StateKey,
vm_status::StatusCode, write_set::WriteOp,
};
use fail::fail_point;
use framework::natives::state_storage::StateStorageUsageResolver;
Expand All @@ -26,6 +23,7 @@ use move_core_types::{
};
use move_table_extension::{TableHandle, TableResolver};
use std::{
borrow::Cow,
collections::btree_map::BTreeMap,
ops::{Deref, DerefMut},
};
Expand All @@ -45,33 +43,23 @@ use std::{
/// track of incremental changes is vital to the consistency of the data store and the system.
pub struct StateViewCache<'a, S> {
data_view: &'a S,
data_map: BTreeMap<StateKey, Option<Vec<u8>>>,
data_map: Cow<'a, BTreeMap<StateKey, WriteOp>>,
}

impl<'a, S: StateView> StateViewCache<'a, S> {
pub fn from_map_ref(data_view: &'a S, data_map_ref: &'a BTreeMap<StateKey, WriteOp>) -> Self {
Self {
data_view,
data_map: Cow::Borrowed(data_map_ref),
}
}

/// Create a `StateViewCache` give a `StateView`. Hold updates to the data store and
/// forward data request to the `StateView` if not in the local cache.
pub fn new(data_view: &'a S) -> Self {
StateViewCache {
data_view,
data_map: BTreeMap::new(),
}
}

// Publishes a `WriteSet` computed at the end of a transaction.
// The effect is to build a layer in front of the `StateView` which keeps
// track of the data as if the changes were applied immediately.
pub(crate) fn push_write_set(&mut self, write_set: &WriteSet) {
for (ap, write_op) in write_set.iter() {
match write_op {
WriteOp::Modification(blob) | WriteOp::Creation(blob) => {
self.data_map.insert(ap.clone(), Some(blob.clone()));
}
WriteOp::Deletion => {
self.data_map.remove(ap);
self.data_map.insert(ap.clone(), None);
}
}
data_map: Cow::Owned(BTreeMap::new()),
}
}
}
Expand All @@ -84,7 +72,10 @@ impl<'block, S: StateView> StateView for StateViewCache<'block, S> {
)));

match self.data_map.get(state_key) {
Some(opt_data) => Ok(opt_data.clone()),
Some(write_op) => Ok(match write_op {
WriteOp::Modification(blob) | WriteOp::Creation(blob) => Some(blob.clone()),
WriteOp::Deletion => None,
}),
None => match self.data_view.get_state_value(state_key) {
Ok(remote_data) => Ok(remote_data),
// TODO: should we forward some error info?
Expand Down
Loading

0 comments on commit ca6a504

Please sign in to comment.