From 5d8e933f4627fabe223c7f2d8ea3334698cc10f6 Mon Sep 17 00:00:00 2001 From: George Mitenkov Date: Tue, 26 Jul 2022 17:20:38 +0200 Subject: [PATCH] [fix] Draft for using delta-writes in TransactionOutput and ChangeSet --- aptos-move/aptos-vm/src/adapter_common.rs | 20 ++- aptos-move/aptos-vm/src/aptos_vm.rs | 139 ++++++++++++------ aptos-move/aptos-vm/src/aptos_vm_impl.rs | 41 ++++-- .../aptos-vm/src/move_vm_ext/session.rs | 10 +- .../src/parallel_executor/vm_wrapper.rs | 7 +- types/src/transaction/change_set.rs | 15 ++ types/src/transaction/mod.rs | 41 +++++- 7 files changed, 215 insertions(+), 58 deletions(-) diff --git a/aptos-move/aptos-vm/src/adapter_common.rs b/aptos-move/aptos-vm/src/adapter_common.rs index d3d728cf3b97e..c966f8df608cf 100644 --- a/aptos-move/aptos-vm/src/adapter_common.rs +++ b/aptos-move/aptos-vm/src/adapter_common.rs @@ -5,7 +5,10 @@ use crate::{counters::*, data_cache::StateViewCache}; use anyhow::Result; use aptos_state_view::StateView; use aptos_types::{ - transaction::{SignatureCheckedTransaction, SignedTransaction, VMValidatorResult}, + transaction::{ + SignatureCheckedTransaction, SignedTransaction, TransactionOutputWithDeltas, + VMValidatorResult, + }, vm_status::{StatusCode, VMStatus}, }; @@ -64,7 +67,7 @@ pub trait VMAdapter { txn: &PreprocessedTransaction, data_cache: &S, log_context: &AdapterLogSchema, - ) -> Result<(VMStatus, TransactionOutput, Option), VMStatus>; + ) -> Result<(VMStatus, TransactionOutputWithDeltas, Option), VMStatus>; } /// Validate a signed transaction by performing the following: @@ -199,13 +202,17 @@ pub(crate) fn execute_block_impl( debug!(log_context, "Retry after reconfiguration"); continue; }; - let (vm_status, output, sender) = adapter.execute_single_transaction( + let (vm_status, output_with_deltas, sender) = adapter.execute_single_transaction( &txn, &data_cache.as_move_resolver(), &log_context, )?; + // TODO: here we extract deltas so that we can use them later + let output = output_with_deltas.into(); if !output.status().is_discarded() { data_cache.push_write_set(output.write_set()); + // TODO: also apply deltas. + // data_cache.try_apply_deltas(deltas); (consumes deltas, error on overflow) } else { match sender { Some(s) => trace!( @@ -273,6 +280,13 @@ pub(crate) fn preprocess_transaction(txn: Transaction) -> Preproce } } +pub(crate) fn discard_error_vm_status_with_deltas( + err: VMStatus, +) -> (VMStatus, TransactionOutputWithDeltas) { + let (vm_status, empty_output) = discard_error_vm_status(err); + (vm_status, TransactionOutputWithDeltas::from(empty_output)) +} + pub(crate) fn discard_error_vm_status(err: VMStatus) -> (VMStatus, TransactionOutput) { let vm_status = err.clone(); let error_code = match err.keep_or_discard() { diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 68cb46e2b5958..4f8dec41a33ba 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -4,11 +4,13 @@ use crate::{ adapter_common, adapter_common::{ - discard_error_output, discard_error_vm_status, validate_signature_checked_transaction, - validate_signed_transaction, PreprocessedTransaction, VMAdapter, + discard_error_output, discard_error_vm_status, discard_error_vm_status_with_deltas, + validate_signature_checked_transaction, validate_signed_transaction, + PreprocessedTransaction, VMAdapter, }, aptos_vm_impl::{ - charge_global_write_gas_usage, get_transaction_output, AptosVMImpl, AptosVMInternals, + charge_global_write_gas_usage, get_transaction_output, get_transaction_output_with_deltas, + AptosVMImpl, AptosVMInternals, }, counters::*, data_cache::{AsMoveResolver, StateViewCache}, @@ -31,8 +33,8 @@ use aptos_types::{ on_chain_config::{new_epoch_event_key, VMConfig, VMPublishingOption, Version}, transaction::{ ChangeSet, ExecutionStatus, ModuleBundle, SignatureCheckedTransaction, SignedTransaction, - Transaction, TransactionOutput, TransactionPayload, TransactionStatus, VMValidatorResult, - WriteSetPayload, + Transaction, TransactionOutput, TransactionOutputWithDeltas, TransactionPayload, + TransactionStatus, VMValidatorResult, WriteSetPayload, }, vm_status::{StatusCode, VMStatus}, write_set::{WriteSet, WriteSetMut}, @@ -152,7 +154,7 @@ impl AptosVM { txn_data: &TransactionMetadata, storage: &S, log_context: &AdapterLogSchema, - ) -> TransactionOutput { + ) -> TransactionOutputWithDeltas { self.failed_transaction_cleanup_and_keep_vm_status( error_code, gas_status, @@ -170,7 +172,7 @@ impl AptosVM { txn_data: &TransactionMetadata, storage: &S, log_context: &AdapterLogSchema, - ) -> (VMStatus, TransactionOutput) { + ) -> (VMStatus, TransactionOutputWithDeltas) { gas_status.set_metering(false); let mut session = self.0.new_session(storage, SessionId::txn_meta(txn_data)); match TransactionStatus::from(error_code.clone()) { @@ -185,21 +187,22 @@ impl AptosVM { self.0 .run_failure_epilogue(&mut session, gas_status, txn_data, log_context) { - return discard_error_vm_status(e); + return discard_error_vm_status_with_deltas(e); } - let txn_output = get_transaction_output( + let txn_output = get_transaction_output_with_deltas( &mut (), session, gas_status.remaining_gas(), txn_data, status, ) - .unwrap_or_else(|e| discard_error_vm_status(e).1); + .unwrap_or_else(|e| discard_error_vm_status_with_deltas(e).1); (error_code, txn_output) } - TransactionStatus::Discard(status) => { - (VMStatus::Error(status), discard_error_output(status)) - } + TransactionStatus::Discard(status) => ( + VMStatus::Error(status), + TransactionOutputWithDeltas::from(discard_error_output(status)), + ), TransactionStatus::Retry => unreachable!(), } } @@ -210,14 +213,14 @@ impl AptosVM { gas_status: &mut GasStatus, txn_data: &TransactionMetadata, log_context: &AdapterLogSchema, - ) -> Result<(VMStatus, TransactionOutput), VMStatus> { + ) -> Result<(VMStatus, TransactionOutputWithDeltas), VMStatus> { gas_status.set_metering(false); self.0 .run_success_epilogue(&mut session, gas_status, txn_data, log_context)?; Ok(( VMStatus::Executed, - get_transaction_output( + get_transaction_output_with_deltas( &mut (), session, gas_status.remaining_gas(), @@ -227,14 +230,14 @@ impl AptosVM { )) } - fn execute_script_or_script_function( + fn execute_script_or_script_function_with_deltas( &self, mut session: SessionExt, gas_status: &mut GasStatus, txn_data: &TransactionMetadata, payload: &TransactionPayload, log_context: &AdapterLogSchema, - ) -> Result<(VMStatus, TransactionOutput), VMStatus> { + ) -> Result<(VMStatus, TransactionOutputWithDeltas), VMStatus> { fail_point!("move_adapter::execute_script_or_script_function", |_| { Err(VMStatus::Error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, @@ -302,6 +305,24 @@ impl AptosVM { } } + fn execute_script_or_script_function( + &self, + mut session: SessionExt, + gas_status: &mut GasStatus, + txn_data: &TransactionMetadata, + payload: &TransactionPayload, + log_context: &AdapterLogSchema, + ) -> Result<(VMStatus, TransactionOutput), VMStatus> { + self.execute_script_or_script_function_with_deltas( + session, + gas_status, + txn_data, + payload, + log_context, + ) + .map(|(vm_status, output)| (vm_status, output.into())) + } + fn verify_module_bundle( session: &mut SessionExt, module_bundle: &ModuleBundle, @@ -372,14 +393,14 @@ impl AptosVM { Ok(()) } - fn execute_modules( + fn execute_modules_with_deltas( &self, mut session: SessionExt, gas_status: &mut GasStatus, txn_data: &TransactionMetadata, modules: &ModuleBundle, log_context: &AdapterLogSchema, - ) -> Result<(VMStatus, TransactionOutput), VMStatus> { + ) -> Result<(VMStatus, TransactionOutputWithDeltas), VMStatus> { fail_point!("move_adapter::execute_module", |_| { Err(VMStatus::Error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, @@ -415,17 +436,29 @@ impl AptosVM { self.success_transaction_cleanup(session, gas_status, txn_data, log_context) } + fn execute_modules( + &self, + mut session: SessionExt, + gas_status: &mut GasStatus, + txn_data: &TransactionMetadata, + modules: &ModuleBundle, + log_context: &AdapterLogSchema, + ) -> Result<(VMStatus, TransactionOutput), VMStatus> { + self.execute_modules_with_deltas(session, gas_status, txn_data, modules, log_context) + .map(|(vm_status, output)| (vm_status, output.into())) + } + pub(crate) fn execute_user_transaction( &self, storage: &S, txn: &SignatureCheckedTransaction, log_context: &AdapterLogSchema, - ) -> (VMStatus, TransactionOutput) { + ) -> (VMStatus, TransactionOutputWithDeltas) { macro_rules! unwrap_or_discard { ($res: expr) => { match $res { Ok(s) => s, - Err(e) => return discard_error_vm_status(e), + Err(e) => return discard_error_vm_status_with_deltas(e), } }; } @@ -439,7 +472,7 @@ impl AptosVM { false, log_context, ) { - return discard_error_vm_status(err); + return discard_error_vm_status_with_deltas(err); }; let gas_schedule = unwrap_or_discard!(self.0.get_gas_schedule(log_context)); @@ -449,18 +482,24 @@ impl AptosVM { let result = match txn.payload() { payload @ TransactionPayload::Script(_) | payload @ TransactionPayload::ScriptFunction(_) => self - .execute_script_or_script_function( + .execute_script_or_script_function_with_deltas( session, &mut gas_status, &txn_data, payload, log_context, ), - TransactionPayload::ModuleBundle(m) => { - self.execute_modules(session, &mut gas_status, &txn_data, m, log_context) - } + TransactionPayload::ModuleBundle(m) => self.execute_modules_with_deltas( + session, + &mut gas_status, + &txn_data, + m, + log_context, + ), TransactionPayload::WriteSet(_) => { - return discard_error_vm_status(VMStatus::Error(StatusCode::UNREACHABLE)); + return discard_error_vm_status_with_deltas(VMStatus::Error( + StatusCode::UNREACHABLE, + )); } }; @@ -475,7 +514,7 @@ impl AptosVM { Err(err) => { let txn_status = TransactionStatus::from(err.clone()); if txn_status.is_discarded() { - discard_error_vm_status(err) + discard_error_vm_status_with_deltas(err) } else { self.failed_transaction_cleanup_and_keep_vm_status( err, @@ -973,12 +1012,16 @@ impl VMAdapter for AptosVM { txn: &PreprocessedTransaction, data_cache: &S, log_context: &AdapterLogSchema, - ) -> Result<(VMStatus, TransactionOutput, Option), VMStatus> { + ) -> Result<(VMStatus, TransactionOutputWithDeltas, Option), VMStatus> { Ok(match txn { PreprocessedTransaction::BlockMetadata(block_metadata) => { let (vm_status, output) = self.process_block_prologue(data_cache, block_metadata.clone(), log_context)?; - (vm_status, output, Some("block_prologue".to_string())) + ( + vm_status, + TransactionOutputWithDeltas::from(output), + Some("block_prologue".to_string()), + ) } PreprocessedTransaction::WaypointWriteSet(write_set_payload) => { let (vm_status, output) = self.process_waypoint_change_set( @@ -986,7 +1029,11 @@ impl VMAdapter for AptosVM { write_set_payload.clone(), log_context, )?; - (vm_status, output, Some("waypoint_write_set".to_string())) + ( + vm_status, + TransactionOutputWithDeltas::from(output), + Some("waypoint_write_set".to_string()), + ) } PreprocessedTransaction::UserTransaction(txn) => { let sender = txn.sender().to_string(); @@ -1008,12 +1055,16 @@ impl VMAdapter for AptosVM { PreprocessedTransaction::WriteSet(txn) => { let (vm_status, output) = self.process_writeset_transaction(data_cache, txn, log_context)?; - (vm_status, output, Some("write_set".to_string())) + ( + vm_status, + TransactionOutputWithDeltas::from(output), + Some("write_set".to_string()), + ) } PreprocessedTransaction::InvalidSignature => { let (vm_status, output) = discard_error_vm_status(VMStatus::Error(StatusCode::INVALID_SIGNATURE)); - (vm_status, output, None) + (vm_status, TransactionOutputWithDeltas::from(output), None) } PreprocessedTransaction::StateCheckpoint => { let output = TransactionOutput::new( @@ -1022,7 +1073,11 @@ impl VMAdapter for AptosVM { 0, TransactionStatus::Keep(ExecutionStatus::Success), ); - (VMStatus::Executed, output, Some("state_checkpoint".into())) + ( + VMStatus::Executed, + TransactionOutputWithDeltas::from(output), + Some("state_checkpoint".into()), + ) } }) } @@ -1110,13 +1165,15 @@ impl AptosSimulationVM { if txn_status.is_discarded() { discard_error_vm_status(err) } else { - self.0.failed_transaction_cleanup_and_keep_vm_status( - err, - &mut gas_status, - &txn_data, - storage, - log_context, - ) + let (vm_status, empty_output) = + self.0.failed_transaction_cleanup_and_keep_vm_status( + err, + &mut gas_status, + &txn_data, + storage, + log_context, + ); + (vm_status, empty_output.into()) } } } diff --git a/aptos-move/aptos-vm/src/aptos_vm_impl.rs b/aptos-move/aptos-vm/src/aptos_vm_impl.rs index 335abf3d9bca4..0d6c21cc76dec 100644 --- a/aptos-move/aptos-vm/src/aptos_vm_impl.rs +++ b/aptos-move/aptos-vm/src/aptos_vm_impl.rs @@ -18,7 +18,9 @@ use aptos_types::{ on_chain_config::{ ConfigStorage, OnChainConfig, VMConfig, VMPublishingOption, Version, APTOS_VERSION_3, }, - transaction::{ExecutionStatus, TransactionOutput, TransactionStatus}, + transaction::{ + ExecutionStatus, TransactionOutput, TransactionOutputWithDeltas, TransactionStatus, + }, vm_status::{StatusCode, VMStatus}, }; use fail::fail_point; @@ -563,24 +565,41 @@ pub(crate) fn charge_global_write_gas_usage( .map_err(|p_err| p_err.finish(Location::Undefined).into_vm_status()) } -pub(crate) fn get_transaction_output( +pub(crate) fn get_transaction_output_with_deltas( ap_cache: &mut A, session: SessionExt, gas_left: GasUnits, txn_data: &TransactionMetadata, status: ExecutionStatus, -) -> Result { +) -> Result { let gas_used: u64 = txn_data.max_gas_amount().sub(gas_left).get(); let session_out = session.finish().map_err(|e| e.into_vm_status())?; - let (write_set, events) = session_out.into_change_set(ap_cache)?.into_inner(); - - Ok(TransactionOutput::new( - write_set, - events, - gas_used, - TransactionStatus::Keep(status), - )) + + // TODO: this will also extract deltas, which can be preocessed separately. + // + // let (deltas, change_set) = ... + // + let change_set = session_out + .into_change_set_with_deltas(ap_cache)? + .into_inner(); + let (write_set, events) = change_set.into_inner(); + + let txn_output = + TransactionOutput::new(write_set, events, gas_used, TransactionStatus::Keep(status)); + + Ok(TransactionOutputWithDeltas::new(txn_output)) +} + +pub(crate) fn get_transaction_output( + ap_cache: &mut A, + session: SessionExt, + gas_left: GasUnits, + txn_data: &TransactionMetadata, + status: ExecutionStatus, +) -> Result { + get_transaction_output_with_deltas(ap_cache, session, gas_left, txn_data, status) + .map(TransactionOutputWithDeltas::into) } #[test] diff --git a/aptos-move/aptos-vm/src/move_vm_ext/session.rs b/aptos-move/aptos-vm/src/move_vm_ext/session.rs index ead428ca050c0..0ff8e11fe16e2 100644 --- a/aptos-move/aptos-vm/src/move_vm_ext/session.rs +++ b/aptos-move/aptos-vm/src/move_vm_ext/session.rs @@ -11,7 +11,7 @@ use aptos_types::{ block_metadata::BlockMetadata, contract_event::ContractEvent, state_store::state_key::StateKey, - transaction::{ChangeSet, SignatureCheckedTransaction}, + transaction::{ChangeSet, ChangeSetWithDeltas, SignatureCheckedTransaction}, write_set::{WriteOp, WriteSetMut}, }; use move_deps::{ @@ -194,6 +194,14 @@ impl SessionOutput { Ok(ChangeSet::new(write_set, events)) } + pub fn into_change_set_with_deltas( + self, + ap_cache: &mut C, + ) -> Result { + // TODO: create DeltaSet here and pass them on to the result. + self.into_change_set(ap_cache).map(ChangeSetWithDeltas::new) + } + pub fn squash(&mut self, other: Self) -> Result<(), VMStatus> { self.change_set .squash(other.change_set) diff --git a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs index 84abb998ce582..339897faacc72 100644 --- a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs +++ b/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs @@ -66,7 +66,10 @@ impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> { .vm .execute_single_transaction(txn, &versioned_view, &log_context) { - Ok((vm_status, output, sender)) => { + Ok((vm_status, output_with_deltas, sender)) => { + // TODO: here also extract deltas. + let output = output_with_deltas.into(); + if output.status().is_discarded() { match sender { Some(s) => trace!( @@ -80,6 +83,8 @@ impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> { } }; } + + // TODO: here also pass deltas to `AptosTransactionOutput::new`. if AptosVM::should_restart_execution(&output) { ExecutionStatus::SkipRest(AptosTransactionOutput::new(output)) } else { diff --git a/types/src/transaction/change_set.rs b/types/src/transaction/change_set.rs index 5fa602d92668d..c3717dafe6fa1 100644 --- a/types/src/transaction/change_set.rs +++ b/types/src/transaction/change_set.rs @@ -27,3 +27,18 @@ impl ChangeSet { &self.events } } + +pub struct ChangeSetWithDeltas { + // TODO: add deltas here. + change_set: ChangeSet, +} + +impl ChangeSetWithDeltas { + pub fn new(change_set: ChangeSet) -> Self { + ChangeSetWithDeltas { change_set } + } + + pub fn into_inner(self) -> ChangeSet { + self.change_set + } +} diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index f23cd6a2139a6..f2886cf9a31fa 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -40,7 +40,7 @@ mod module; mod script; mod transaction_argument; -pub use change_set::ChangeSet; +pub use change_set::{ChangeSet, ChangeSetWithDeltas}; pub use module::{Module, ModuleBundle}; pub use script::{ ArgumentABI, Script, ScriptABI, ScriptFunction, ScriptFunctionABI, TransactionScriptABI, @@ -951,6 +951,45 @@ impl TransactionOutput { } } +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TransactionOutputWithDeltas { + // TODO: add deltas here (same as in ChangeSetWithDeltas). + output: TransactionOutput, +} + +impl TransactionOutputWithDeltas { + pub fn new(output: TransactionOutput) -> Self { + TransactionOutputWithDeltas { output } + } + + pub fn write_set(&self) -> &WriteSet { + &self.output.write_set + } + + pub fn events(&self) -> &[ContractEvent] { + &self.output.events + } + + pub fn gas_used(&self) -> u64 { + self.output.gas_used + } + + pub fn status(&self) -> &TransactionStatus { + &self.output.status + } + + pub fn into(self) -> TransactionOutput { + self.output + } +} + +impl From for TransactionOutputWithDeltas { + fn from(output: TransactionOutput) -> Self { + // Create empty deltas. + TransactionOutputWithDeltas { output } + } +} + /// `TransactionInfo` is the object we store in the transaction accumulator. It consists of the /// transaction as well as the execution result of this transaction. #[derive(Clone, CryptoHasher, BCSCryptoHash, Debug, Eq, PartialEq, Serialize, Deserialize)]