simplify TransactionReplayer::replay implementation (#14920)
msmouse authored Oct 11, 2024
1 parent a6ad9d3 commit 5a90c6a
Showing 8 changed files with 172 additions and 197 deletions.
31 changes: 1 addition & 30 deletions execution/executor-types/src/ledger_update_output.rs
@@ -11,7 +11,7 @@ use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
proof::accumulator::InMemoryTransactionAccumulator,
state_store::{combine_or_add_sharded_state_updates, ShardedStateUpdates},
state_store::ShardedStateUpdates,
transaction::{
block_epilogue::BlockEndInfo, TransactionInfo, TransactionStatus, TransactionToCommit,
Version,
@@ -119,35 +119,6 @@ impl LedgerUpdateOutput {
)
}

pub fn combine(&mut self, rhs: Self) {
assert!(self.block_end_info.is_none());
assert!(rhs.block_end_info.is_none());
let Self {
statuses_for_input_txns,
to_commit,
subscribable_events,
transaction_info_hashes,
state_updates_until_last_checkpoint: state_updates_before_last_checkpoint,
sharded_state_cache,
transaction_accumulator,
block_end_info: _block_end_info,
} = rhs;

if let Some(updates) = state_updates_before_last_checkpoint {
combine_or_add_sharded_state_updates(
&mut self.state_updates_until_last_checkpoint,
updates,
);
}

self.statuses_for_input_txns.extend(statuses_for_input_txns);
self.to_commit.extend(to_commit);
self.subscribable_events.extend(subscribable_events);
self.transaction_info_hashes.extend(transaction_info_hashes);
self.sharded_state_cache.combine(sharded_state_cache);
self.transaction_accumulator = transaction_accumulator;
}

pub fn next_version(&self) -> Version {
self.transaction_accumulator.num_leaves() as Version
}
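Note on the deletion above: LedgerUpdateOutput::combine() existed so the replayer could fold each newly executed batch into one accumulated output, extending the per-transaction vectors and merging the sharded state updates via combine_or_add_sharded_state_updates. With replayed chunks now enqueued and committed individually, that merge path has no callers. A minimal sketch of the merge semantics it relied on, using stand-in types rather than the real aptos-types definitions:

use std::collections::HashMap;

// Stand-ins for illustration only; the real ShardedStateUpdates is, roughly,
// a fixed-size array of per-shard update maps keyed by state key.
type StateKey = Vec<u8>;
type StateValueOpt = Option<Vec<u8>>; // None models a deletion
type ShardUpdates = HashMap<StateKey, StateValueOpt>;
const NUM_SHARDS: usize = 16; // assumed shard count, for the sketch only

// Rough semantics of combine_or_add_sharded_state_updates as used by the
// removed combine(): if the left side has no updates yet, take the right
// side's wholesale; otherwise merge shard by shard, letting later writes
// to the same key win.
fn combine_or_add(
    lhs: &mut Option<[ShardUpdates; NUM_SHARDS]>,
    rhs: [ShardUpdates; NUM_SHARDS],
) {
    match lhs {
        Some(lhs_shards) => {
            for (lhs_shard, rhs_shard) in lhs_shards.iter_mut().zip(rhs) {
                lhs_shard.extend(rhs_shard);
            }
        },
        None => *lhs = Some(rhs),
    }
}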
4 changes: 2 additions & 2 deletions execution/executor-types/src/lib.rs
@@ -259,14 +259,14 @@ impl VerifyExecutionMode {
}

pub trait TransactionReplayer: Send {
fn replay(
fn enqueue_chunks(
&self,
transactions: Vec<Transaction>,
transaction_infos: Vec<TransactionInfo>,
write_sets: Vec<WriteSet>,
event_vecs: Vec<Vec<ContractEvent>>,
verify_execution_mode: &VerifyExecutionMode,
) -> Result<()>;
) -> Result<usize>;

fn commit(&self) -> Result<Version>;
}
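The trait change above is the core of the commit: enqueue_chunks() now reports how many chunks it queued instead of committing one combined chunk internally, leaving commit() entirely to the caller. A hypothetical driver loop, assuming commit() persists one enqueued chunk per call and that the crate paths match the files shown in this diff:

use anyhow::Result;
use aptos_executor_types::{TransactionReplayer, VerifyExecutionMode};
use aptos_types::{
    contract_event::ContractEvent,
    transaction::{Transaction, TransactionInfo, Version},
    write_set::WriteSet,
};

// Feed one batch of transactions with their recorded outputs, then drain the
// queue chunk by chunk. Returns the last committed version, if anything was
// enqueued at all.
fn replay_batch<R: TransactionReplayer>(
    replayer: &R,
    transactions: Vec<Transaction>,
    transaction_infos: Vec<TransactionInfo>,
    write_sets: Vec<WriteSet>,
    event_vecs: Vec<Vec<ContractEvent>>,
    verify_mode: &VerifyExecutionMode,
) -> Result<Option<Version>> {
    let chunks_enqueued = replayer.enqueue_chunks(
        transactions,
        transaction_infos,
        write_sets,
        event_vecs,
        verify_mode,
    )?;

    let mut committed = None;
    for _ in 0..chunks_enqueued {
        committed = Some(replayer.commit()?);
    }
    Ok(committed)
}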
112 changes: 49 additions & 63 deletions execution/executor/src/chunk_executor.rs
@@ -6,10 +6,10 @@

use crate::{
components::{
apply_chunk_output::{ensure_no_discard, ensure_no_retry, ApplyChunkOutput},
apply_chunk_output::ApplyChunkOutput,
chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger},
chunk_output::ChunkOutput,
chunk_result_verifier::{ChunkResultVerifier, StateSyncChunkVerifier},
chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier},
executed_chunk::ExecutedChunk,
transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
},
@@ -28,7 +28,7 @@ use aptos_logger::prelude::*;
use aptos_metrics_core::{IntGaugeHelper, TimerHelper};
use aptos_storage_interface::{
async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView,
state_delta::StateDelta, DbReaderWriter, ExecutedTrees,
state_delta::StateDelta, DbReaderWriter,
};
use aptos_types::{
block_executor::config::BlockExecutorConfigFromOnchain,
@@ -93,6 +93,10 @@ impl<V: VMExecutor> ChunkExecutor<V> {
error
})
}

pub fn is_empty(&self) -> bool {
self.with_inner(|inner| Ok(inner.is_empty())).unwrap()
}
}

impl<V: VMExecutor> ChunkExecutorTrait for ChunkExecutor<V> {
@@ -279,6 +283,10 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
Ok(chunk)
}

fn is_empty(&self) -> bool {
self.commit_queue.lock().is_empty()
}

// ************************* Chunk Executor Implementation *************************
fn enqueue_chunk<Chunk: TransactionChunk + Sync>(
&self,
@@ -401,24 +409,28 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
}

impl<V: VMExecutor> TransactionReplayer for ChunkExecutor<V> {
fn replay(
fn enqueue_chunks(
&self,
transactions: Vec<Transaction>,
transaction_infos: Vec<TransactionInfo>,
write_sets: Vec<WriteSet>,
event_vecs: Vec<Vec<ContractEvent>>,
verify_execution_mode: &VerifyExecutionMode,
) -> Result<()> {
) -> Result<usize> {
let _guard = CONCURRENCY_GAUGE.concurrency_with(&["replayer", "replay"]);

self.maybe_initialize()?;
self.inner.read().as_ref().expect("not reset").replay(
transactions,
transaction_infos,
write_sets,
event_vecs,
verify_execution_mode,
)
self.inner
.read()
.as_ref()
.expect("not reset")
.enqueue_chunks(
transactions,
transaction_infos,
write_sets,
event_vecs,
verify_execution_mode,
)
}

fn commit(&self) -> Result<Version> {
@@ -428,19 +440,18 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutor<V> {
}
}

impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
fn replay(
impl<V: VMExecutor> ChunkExecutorInner<V> {
fn enqueue_chunks(
&self,
mut transactions: Vec<Transaction>,
mut transaction_infos: Vec<TransactionInfo>,
mut write_sets: Vec<WriteSet>,
mut event_vecs: Vec<Vec<ContractEvent>>,
verify_execution_mode: &VerifyExecutionMode,
) -> Result<()> {
) -> Result<usize> {
let started = Instant::now();
let num_txns = transactions.len();
let mut latest_view = self.commit_queue.lock().expect_latest_view()?;
let chunk_begin = latest_view.num_transactions() as Version;
let chunk_begin = self.commit_queue.lock().expecting_version();
let chunk_end = chunk_begin + num_txns as Version; // right-exclusive

// Find epoch boundaries.
@@ -459,12 +470,10 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
epochs.push((epoch_begin, chunk_end));
}

let mut executed_chunk = None;
let mut chunks_enqueued = 0;
// Replay epoch by epoch.
for (begin, end) in epochs {
self.remove_and_replay_epoch(
&mut executed_chunk,
&mut latest_view,
chunks_enqueued += self.remove_and_replay_epoch(
&mut transactions,
&mut transaction_infos,
&mut write_sets,
@@ -475,16 +484,13 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
)?;
}

self.commit_queue
.lock()
.enqueue_chunk_to_commit_directly(executed_chunk.expect("Nothing to commit."))?;
info!(
num_txns = num_txns,
tps = (num_txns as f64 / started.elapsed().as_secs_f64()),
"TransactionReplayer::replay() OK"
);

Ok(())
Ok(chunks_enqueued)
}

fn commit(&self) -> Result<Version> {
@@ -504,45 +510,42 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
.current_version
.expect("Version must exist after commit."))
}
}

impl<V: VMExecutor> ChunkExecutorInner<V> {
/// Remove `end_version - begin_version` transactions from the mutable input arguments and replay.
/// The input range indicated by `[begin_version, end_version]` is guaranteed not to cross epoch boundaries.
/// Notice there can be known broken versions inside the range.
fn remove_and_replay_epoch(
&self,
executed_chunk: &mut Option<ExecutedChunk>,
latest_view: &mut ExecutedTrees,
transactions: &mut Vec<Transaction>,
transaction_infos: &mut Vec<TransactionInfo>,
write_sets: &mut Vec<WriteSet>,
event_vecs: &mut Vec<Vec<ContractEvent>>,
begin_version: Version,
end_version: Version,
verify_execution_mode: &VerifyExecutionMode,
) -> Result<()> {
) -> Result<usize> {
// we try to apply the txns in sub-batches split by known txns to skip and the end of the batch
let txns_to_skip = verify_execution_mode.txns_to_skip();
let mut batch_ends = txns_to_skip
.range(begin_version..end_version)
.chain(once(&end_version));

let mut chunks_enqueued = 0;

let mut batch_begin = begin_version;
let mut batch_end = *batch_ends.next().unwrap();
while batch_begin < end_version {
if batch_begin == batch_end {
// batch_end is a known broken version that won't pass execution verification
self.remove_and_apply(
executed_chunk,
latest_view,
transactions,
transaction_infos,
write_sets,
event_vecs,
batch_begin,
batch_begin + 1,
)?;
chunks_enqueued += 1;
info!(
version_skipped = batch_begin,
"Skipped known broken transaction, applied transaction output directly."
@@ -555,7 +558,6 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
// Try to run the transactions with the VM
let next_begin = if verify_execution_mode.should_verify() {
self.verify_execution(
latest_view,
transactions,
transaction_infos,
write_sets,
@@ -568,24 +570,22 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
batch_end
};
self.remove_and_apply(
executed_chunk,
latest_view,
transactions,
transaction_infos,
write_sets,
event_vecs,
batch_begin,
next_begin,
)?;
chunks_enqueued += 1;
batch_begin = next_begin;
}

Ok(())
Ok(chunks_enqueued)
}
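The sub-batching above splits the epoch's version range at every known-broken version: a broken transaction is applied on its own straight from its recorded output, while the stretches in between can be re-executed and verified. Setting aside the refinement where verify_execution() may shorten a batch, the split amounts to something like this illustrative helper (not code from this file):

use std::collections::BTreeSet;
use std::iter::once;

// Split [begin, end) at each known-broken version. Returns (range, apply_directly)
// pairs in order: `true` marks a single broken version to apply from recorded
// outputs, `false` marks a healthy stretch that can be re-executed and verified.
fn sub_batches(begin: u64, end: u64, txns_to_skip: &BTreeSet<u64>) -> Vec<((u64, u64), bool)> {
    let mut batches = Vec::new();
    let mut batch_begin = begin;
    for &boundary in txns_to_skip.range(begin..end).chain(once(&end)) {
        if batch_begin < boundary {
            batches.push(((batch_begin, boundary), false));
        }
        if boundary < end {
            batches.push(((boundary, boundary + 1), true));
        }
        batch_begin = boundary + 1;
    }
    batches
}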

fn verify_execution(
&self,
latest_view: &mut ExecutedTrees,
transactions: &[Transaction],
transaction_infos: &[TransactionInfo],
write_sets: &[WriteSet],
Expand All @@ -595,7 +595,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
verify_execution_mode: &VerifyExecutionMode,
) -> Result<Version> {
// Execute transactions.
let state_view = self.latest_state_view(latest_view.state())?;
let state_view = self.latest_state_view(&self.commit_queue.lock().latest_state())?;
let txns = transactions
.iter()
.take((end_version - begin_version) as usize)
@@ -639,8 +639,6 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
/// It's guaranteed that there's no known broken versions or epoch endings in the range.
fn remove_and_apply(
&self,
executed_chunk: &mut Option<ExecutedChunk>,
latest_view: &mut ExecutedTrees,
transactions: &mut Vec<Transaction>,
transaction_infos: &mut Vec<TransactionInfo>,
write_sets: &mut Vec<WriteSet>,
Expand All @@ -650,7 +648,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
) -> Result<()> {
let num_txns = (end_version - begin_version) as usize;
let txn_infos: Vec<_> = transaction_infos.drain(..num_txns).collect();
let (txns, txn_outs) = multizip((
let (transactions, transaction_outputs) = multizip((
transactions.drain(..num_txns),
txn_infos.iter(),
write_sets.drain(..num_txns),
Expand All @@ -670,28 +668,16 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
})
.unzip();

let state_view = self.latest_state_view(latest_view.state())?;
let chunk_output = ChunkOutput::by_transaction_output(txns, txn_outs, state_view)?;
let (executed_batch, to_discard, to_retry) = chunk_output.apply_to_ledger(
latest_view,
Some(
txn_infos
.iter()
.map(|txn_info| txn_info.state_checkpoint_hash())
.collect(),
),
)?;
ensure_no_discard(to_discard)?;
ensure_no_retry(to_retry)?;
executed_batch
.ledger_update_output
.ensure_transaction_infos_match(&txn_infos)?;

match executed_chunk {
Some(chunk) => chunk.combine(executed_batch),
None => *executed_chunk = Some(executed_batch),
}
*latest_view = executed_chunk.as_ref().unwrap().result_view();
let chunk = ChunkToApply {
transactions,
transaction_outputs,
first_version: begin_version,
};
let chunk_verifier = Arc::new(ReplayChunkVerifier {
transaction_infos: txn_infos,
});
self.enqueue_chunk(chunk, chunk_verifier, "replay")?;

Ok(())
}
}
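Net effect of the rewrite above: remove_and_apply() no longer applies outputs to the ledger inline and no longer merges ExecutedChunks; it packages the drained transactions and their recorded outputs into a ChunkToApply, pairs it with a ReplayChunkVerifier carrying the expected TransactionInfos, and pushes both through the same enqueue_chunk() path state sync uses. The verifier trait's exact interface lives in components/chunk_result_verifier.rs and is not shown in this diff; conceptually, a replay verifier only has to confirm that the replayed results reproduce the recorded infos, along these lines:

use anyhow::{ensure, Result};
use aptos_types::transaction::TransactionInfo;

// Sketch of the check a replay verifier must provide; the real
// ReplayChunkVerifier implements the ChunkResultVerifier trait instead of
// exposing a bare method like this.
struct ReplayVerifierSketch {
    transaction_infos: Vec<TransactionInfo>,
}

impl ReplayVerifierSketch {
    fn verify(&self, replayed_infos: &[TransactionInfo]) -> Result<()> {
        ensure!(
            replayed_infos.len() == self.transaction_infos.len(),
            "Replayed {} transactions, expected {}.",
            replayed_infos.len(),
            self.transaction_infos.len(),
        );
        for (replayed, expected) in replayed_infos.iter().zip(&self.transaction_infos) {
            ensure!(
                replayed == expected,
                "TransactionInfo mismatch: replayed {:?}, expected {:?}.",
                replayed,
                expected,
            );
        }
        Ok(())
    }
}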
28 changes: 5 additions & 23 deletions execution/executor/src/components/chunk_commit_queue.rs
@@ -66,18 +66,7 @@ impl ChunkCommitQueue {
}

pub(crate) fn expecting_version(&self) -> Version {
self.latest_txn_accumulator.num_leaves()
}

pub(crate) fn expect_latest_view(&self) -> Result<ExecutedTrees> {
ensure!(
self.to_update_ledger.is_empty(),
"Pending chunk to update_ledger, can't construct latest ExecutedTrees."
);
Ok(ExecutedTrees::new(
self.latest_state.clone(),
self.latest_txn_accumulator.clone(),
))
self.latest_state.next_version()
}

pub(crate) fn enqueue_for_ledger_update(
Expand Down Expand Up @@ -130,17 +119,6 @@ impl ChunkCommitQueue {
Ok((self.persisted_state.clone(), chunk))
}

pub(crate) fn enqueue_chunk_to_commit_directly(&mut self, chunk: ExecutedChunk) -> Result<()> {
ensure!(
self.to_update_ledger.is_empty(),
"Mixed usage of different modes."
);
self.latest_state = chunk.result_state.clone();
self.latest_txn_accumulator = chunk.ledger_update_output.transaction_accumulator.clone();
self.to_commit.push_back(Some(chunk));
Ok(())
}

pub(crate) fn dequeue_committed(&mut self, latest_state: StateDelta) -> Result<()> {
ensure!(!self.to_commit.is_empty(), "to_commit is empty.");
ensure!(
Expand All @@ -154,4 +132,8 @@ impl ChunkCommitQueue {
.log_generation("commit_queue_base");
Ok(())
}

pub(crate) fn is_empty(&self) -> bool {
self.to_commit.is_empty() && self.to_update_ledger.is_empty()
}
}
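With expect_latest_view() and enqueue_chunk_to_commit_directly() gone, ChunkCommitQueue is back to a single mode of operation: every chunk flows through to_update_ledger and then to_commit, expecting_version() is derived from the latest speculative state instead of a separately reconstructed accumulator view, and the new is_empty() tells the executor when both stages are drained. A condensed sketch of those invariants with stand-in types (the real struct also tracks the persisted state and the transaction accumulator):

use std::collections::VecDeque;

// Stand-in types, for illustration only.
struct StateDelta {
    next_version: u64,
}
struct ChunkToUpdateLedger;
struct ExecutedChunk;

struct CommitQueueSketch {
    latest_state: StateDelta,
    to_update_ledger: VecDeque<ChunkToUpdateLedger>,
    to_commit: VecDeque<Option<ExecutedChunk>>,
}

impl CommitQueueSketch {
    // The next version to enqueue comes straight from the latest state.
    fn expecting_version(&self) -> u64 {
        self.latest_state.next_version
    }

    // The executor is idle only when both pipeline stages are drained.
    fn is_empty(&self) -> bool {
        self.to_update_ledger.is_empty() && self.to_commit.is_empty()
    }
}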
(The remaining 4 changed files are not shown here.)
