Skip to content

Commit

Permalink
[small][agg_v2] Fix agg v2 error handling - Success can also have fai…
Browse files Browse the repository at this point in the history
…lure error codes (#10428)
  • Loading branch information
igor-aptos authored Oct 18, 2023
1 parent ec8382c commit 7a58f36
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 95 deletions.
31 changes: 27 additions & 4 deletions aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_state_view::StateView;
use aptos_types::transaction::signature_verified_transaction::SignatureVerifiedTransaction;
use aptos_vm_logging::{log_schema::AdapterLogSchema, prelude::*};
use aptos_vm_types::resolver::{ExecutorView, ResourceGroupView};
use move_core_types::vm_status::VMStatus;
use move_core_types::vm_status::{StatusCode, VMStatus};

pub(crate) struct AptosExecutorTask<'a, S> {
vm: AptosVM,
Expand Down Expand Up @@ -52,8 +52,8 @@ impl<'a, S: 'a + StateView + Sync> ExecutorTask for AptosExecutorTask<'a, S> {
.execute_single_transaction(txn, &resolver, &log_context)
{
Ok((vm_status, mut vm_output, sender)) => {
// TODO: move materialize deltas outside, into sequential execution.
if materialize_deltas {
// TODO: Integrate aggregator v2.
vm_output = vm_output
.try_materialize(&resolver)
.expect("Delta materialization failed");
Expand All @@ -76,7 +76,16 @@ impl<'a, S: 'a + StateView + Sync> ExecutorTask for AptosExecutorTask<'a, S> {
},
};
}
if AptosVM::should_restart_execution(&vm_output) {
if vm_status.status_code() == StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR {
ExecutionStatus::SpeculativeExecutionAbortError(
vm_status.message().cloned().unwrap_or_default(),
)
} else if vm_status.status_code() == StatusCode::DELAYED_FIELDS_CODE_INVARIANT_ERROR
{
ExecutionStatus::DelayedFieldsCodeInvariantError(
vm_status.message().cloned().unwrap_or_default(),
)
} else if AptosVM::should_restart_execution(&vm_output) {
speculative_info!(
&log_context,
"Reconfiguration occurred: restart required".into()
Expand All @@ -86,7 +95,21 @@ impl<'a, S: 'a + StateView + Sync> ExecutorTask for AptosExecutorTask<'a, S> {
ExecutionStatus::Success(AptosTransactionOutput::new(vm_output))
}
},
Err(err) => ExecutionStatus::Abort(err),
// execute_single_transaction only returns an error when transactions that should never fail
// (BlockMetadataTransaction and GenesisTransaction) return an error themselves.
Err(err) => {
if err.status_code() == StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR {
ExecutionStatus::SpeculativeExecutionAbortError(
err.message().cloned().unwrap_or_default(),
)
} else if err.status_code() == StatusCode::DELAYED_FIELDS_CODE_INVARIANT_ERROR {
ExecutionStatus::DelayedFieldsCodeInvariantError(
err.message().cloned().unwrap_or_default(),
)
} else {
ExecutionStatus::Abort(err)
}
},
}
}
}
69 changes: 34 additions & 35 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
errors::*,
explicit_sync_wrapper::ExplicitSyncWrapper,
scheduler::{DependencyStatus, ExecutionTaskType, Scheduler, SchedulerTask, Wave},
task::{CategorizeError, ErrorCategory, ExecutionStatus, ExecutorTask, TransactionOutput},
task::{ExecutionStatus, ExecutorTask, TransactionOutput},
txn_commit_hook::TransactionCommitHook,
txn_last_input_output::TxnLastInputOutput,
view::{LatestView, ParallelState, SequentialState, ViewState},
Expand Down Expand Up @@ -70,7 +70,6 @@ impl<T, E, S, L, X> BlockExecutor<T, E, S, L, X>
where
T: Transaction,
E: ExecutorTask<Txn = T>,
E::Error: CategorizeError,
S: TStateView<Key = T::Key> + Sync,
L: TransactionCommitHook<Output = E::Output>,
X: Executable + 'static,
Expand All @@ -97,6 +96,10 @@ where
}
}

fn fallback_to_sequential() {
unimplemented!();
}

fn execute(
idx_to_execute: TxnIndex,
incarnation: Incarnation,
Expand Down Expand Up @@ -190,20 +193,17 @@ where
ExecutionStatus::SkipRest(output)
},
ExecutionStatus::Abort(err) => {
match err.categorize() {
ErrorCategory::CodeInvariantError => {
// TODO fallback to speculative execution
panic!("");
},
ErrorCategory::SpeculativeExecutionError => {
speculative_inconsistent = true;
},
_ => (),
};

// Record the status indicating abort.
ExecutionStatus::Abort(Error::UserError(err))
},
ExecutionStatus::SpeculativeExecutionAbortError(msg) => {
speculative_inconsistent = true;
ExecutionStatus::SpeculativeExecutionAbortError(msg)
},
ExecutionStatus::DelayedFieldsCodeInvariantError(msg) => {
Self::fallback_to_sequential();
ExecutionStatus::DelayedFieldsCodeInvariantError(msg)
},
};

// Remove entries from previous write/delta set that were not overwritten.
Expand Down Expand Up @@ -244,8 +244,7 @@ where
.expect("[BlockSTM]: Prior read-set must be recorded");

if read_set.validate_incorrect_use() {
// TODO fallback to speculative
panic!("Incorrect use !");
Self::fallback_to_sequential();
}

// Note: we validate delayed field reads only at try_commit.
Expand Down Expand Up @@ -324,17 +323,6 @@ where
let mut execution_still_valid =
read_set.validate_delayed_field_reads(versioned_cache.delayed_fields(), txn_idx);

match last_input_output.output_category(txn_idx) {
Some(ErrorCategory::SpeculativeExecutionError) => {
assert!(!execution_still_valid);
},
Some(ErrorCategory::CodeInvariantError) => {
// TODO: fallback to sequential execution
panic!();
},
_ => (),
};

if execution_still_valid {
if let Some(delayed_field_ids) = last_input_output.delayed_field_keys(txn_idx) {
if let Err(e) = versioned_cache
Expand Down Expand Up @@ -748,6 +736,10 @@ where
ExecutionStatus::Abort(_) => {
txn_commit_listener.on_execution_aborted(txn_idx);
},
ExecutionStatus::SpeculativeExecutionAbortError(msg)
| ExecutionStatus::DelayedFieldsCodeInvariantError(msg) => {
panic!("Cannot be materializing with {}", msg);
},
}
}

Expand All @@ -757,6 +749,10 @@ where
final_results[txn_idx as usize] = t;
},
ExecutionStatus::Abort(_) => (),
ExecutionStatus::SpeculativeExecutionAbortError(msg)
| ExecutionStatus::DelayedFieldsCodeInvariantError(msg) => {
panic!("Cannot be materializing with {}", msg);
},
};
}

Expand Down Expand Up @@ -1104,21 +1100,24 @@ where
ret.push(output);
},
ExecutionStatus::Abort(err) => {
match err.categorize() {
ErrorCategory::CodeInvariantError
| ErrorCategory::SpeculativeExecutionError => panic!(
"Sequential execution must not have delayed fields errors: {:?}",
err
),
_ => (),
};

if let Some(commit_hook) = &self.transaction_commit_hook {
commit_hook.on_execution_aborted(idx as TxnIndex);
}
// Record the status indicating abort.
return Err(Error::UserError(err));
},
ExecutionStatus::SpeculativeExecutionAbortError(msg) => {
panic!(
"Sequential execution must not have SpeculativeExecutionAbortError: {:?}",
msg
);
},
ExecutionStatus::DelayedFieldsCodeInvariantError(msg) => {
panic!(
"Sequential execution must not have DelayedFieldsCodeInvariantError: {:?}",
msg
);
},
}
// When the txn is a SkipRest txn, halt sequential execution.
if must_skip {
Expand Down
41 changes: 8 additions & 33 deletions aptos-move/block-executor/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use aptos_types::{
write_set::WriteOp,
};
use aptos_vm_types::resolver::{TExecutorView, TResourceGroupView};
use move_core_types::{
value::MoveTypeLayout,
vm_status::{StatusCode, VMStatus},
};
use move_core_types::value::MoveTypeLayout;
use std::{collections::HashMap, fmt::Debug, sync::Arc};

/// The execution result of a transaction
Expand All @@ -26,6 +23,12 @@ pub enum ExecutionStatus<O, E> {
/// Transaction was executed successfully, but will skip the execution of the trailing
/// transactions in the list
SkipRest(O),
/// During transaction execution, it detected that it is in inconsistent state
/// due to speculative reads it did, and needs to be re-executed
SpeculativeExecutionAbortError(String),
/// During transaction execution, it detected code invariant error
/// Which can only be caused by the bug in the code.
DelayedFieldsCodeInvariantError(String),
}

/// Inference result of a transaction.
Expand All @@ -34,34 +37,6 @@ pub struct Accesses<K> {
pub keys_written: Vec<K>,
}

pub enum ErrorCategory {
CodeInvariantError,
SpeculativeExecutionError,
ValidError,
}

pub trait CategorizeError {
fn categorize(&self) -> ErrorCategory;
}

impl CategorizeError for usize {
fn categorize(&self) -> ErrorCategory {
ErrorCategory::ValidError
}
}

impl CategorizeError for VMStatus {
fn categorize(&self) -> ErrorCategory {
match self.status_code() {
StatusCode::DELAYED_FIELDS_CODE_INVARIANT_ERROR => ErrorCategory::CodeInvariantError,
StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR => {
ErrorCategory::SpeculativeExecutionError
},
_ => ErrorCategory::ValidError,
}
}
}

/// Trait for single threaded transaction executor.
// TODO: Sync should not be required. Sync is only introduced because this trait occurs as a phantom type of executor struct.
pub trait ExecutorTask: Sync {
Expand All @@ -72,7 +47,7 @@ pub trait ExecutorTask: Sync {
type Output: TransactionOutput<Txn = Self::Txn> + 'static;

/// Type of error when the executor failed to process a transaction and needs to abort.
type Error: Debug + Clone + Send + Sync + Eq + CategorizeError + 'static;
type Error: Debug + Clone + Send + Sync + Eq + 'static;

/// Type to initialize the single thread transaction executor. Copy and Sync are required because
/// we will create an instance of executor on each individual thread.
Expand Down
46 changes: 23 additions & 23 deletions aptos-move/block-executor/src/txn_last_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
captured_reads::CapturedReads,
errors::Error,
task::{CategorizeError, ErrorCategory, ExecutionStatus, TransactionOutput},
task::{ExecutionStatus, TransactionOutput},
};
use aptos_mvhashmap::types::TxnIndex;
use aptos_types::{
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct TxnLastInputOutput<T: Transaction, O: TransactionOutput<Txn = T>, E:
module_read_write_intersection: AtomicBool,
}

impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + CategorizeError>
impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>
TxnLastInputOutput<T, O, E>
{
pub fn new(num_txns: TxnIndex) -> Self {
Expand Down Expand Up @@ -113,7 +113,9 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + Ca
ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => {
output.module_write_set()
},
ExecutionStatus::Abort(_) => HashMap::new(),
ExecutionStatus::Abort(_)
| ExecutionStatus::SpeculativeExecutionAbortError(_)
| ExecutionStatus::DelayedFieldsCodeInvariantError(_) => HashMap::new(),
};

if !self.module_read_write_intersection.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -214,7 +216,9 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + Ca
.map(|k| (k, false))
.chain(t.module_write_set().into_keys().map(|k| (k, true))),
),
ExecutionStatus::Abort(_) => None,
ExecutionStatus::Abort(_)
| ExecutionStatus::SpeculativeExecutionAbortError(_)
| ExecutionStatus::DelayedFieldsCodeInvariantError(_) => None,
})
}

Expand All @@ -228,7 +232,9 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + Ca
ExecutionStatus::Success(t) | ExecutionStatus::SkipRest(t) => {
Some(t.resource_write_set())
},
ExecutionStatus::Abort(_) => None,
ExecutionStatus::Abort(_)
| ExecutionStatus::SpeculativeExecutionAbortError(_)
| ExecutionStatus::DelayedFieldsCodeInvariantError(_) => None,
})
}

Expand All @@ -243,7 +249,9 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + Ca
ExecutionStatus::Success(t) | ExecutionStatus::SkipRest(t) => {
Some(t.delayed_field_change_set().into_keys())
},
ExecutionStatus::Abort(_) => None,
ExecutionStatus::Abort(_)
| ExecutionStatus::SpeculativeExecutionAbortError(_)
| ExecutionStatus::DelayedFieldsCodeInvariantError(_) => None,
})
}

Expand All @@ -254,7 +262,9 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + Ca
ExecutionStatus::Success(t) | ExecutionStatus::SkipRest(t) => {
t.aggregator_v1_delta_set().into_keys().collect()
},
ExecutionStatus::Abort(_) => vec![],
ExecutionStatus::Abort(_)
| ExecutionStatus::SpeculativeExecutionAbortError(_)
| ExecutionStatus::DelayedFieldsCodeInvariantError(_) => vec![],
},
)
}
Expand All @@ -270,27 +280,15 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + Ca
let events = t.get_events();
Box::new(events.into_iter())
},
ExecutionStatus::Abort(_) => {
ExecutionStatus::Abort(_)
| ExecutionStatus::SpeculativeExecutionAbortError(_)
| ExecutionStatus::DelayedFieldsCodeInvariantError(_) => {
Box::new(empty::<(T::Event, Option<MoveTypeLayout>)>())
},
},
)
}

pub(crate) fn output_category(&self, txn_idx: TxnIndex) -> Option<ErrorCategory> {
self.outputs[txn_idx as usize]
.load()
.as_ref()
.and_then(|txn_output| match txn_output.output_status() {
ExecutionStatus::Success(_) => None,
ExecutionStatus::SkipRest(_) => None,
ExecutionStatus::Abort(Error::UserError(err)) => Some(err.categorize()),
ExecutionStatus::Abort(Error::ModulePathReadWrite) => {
Some(ErrorCategory::ValidError)
},
})
}

// Called when a transaction is committed to record WriteOps for materialized aggregator values
// corresponding to the (deltas) in the recorded final output of the transaction.
pub(crate) fn record_materialized_txn_output(
Expand All @@ -312,7 +310,9 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone + Ca
patched_events,
);
},
ExecutionStatus::Abort(_) => {},
ExecutionStatus::Abort(_)
| ExecutionStatus::SpeculativeExecutionAbortError(_)
| ExecutionStatus::DelayedFieldsCodeInvariantError(_) => {},
};
}

Expand Down

0 comments on commit 7a58f36

Please sign in to comment.