Skip to content

Commit

Permalink
avoid TransactionToCommit (#14999)
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Oct 18, 2024
1 parent 1645c96 commit 79d5257
Show file tree
Hide file tree
Showing 27 changed files with 538 additions and 566 deletions.
5 changes: 4 additions & 1 deletion consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ impl ExecutionProxy {
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
let txns = state_compute_result.transactions_to_commit();
let txns = state_compute_result
.ledger_update_output
.transactions
.clone();
let subscribable_events =
state_compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
Expand Down
59 changes: 27 additions & 32 deletions execution/executor-types/src/ledger_update_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use aptos_types::{
proof::accumulator::InMemoryTransactionAccumulator,
state_store::ShardedStateUpdates,
transaction::{
block_epilogue::BlockEndInfo, TransactionInfo, TransactionStatus, TransactionToCommit,
Version,
block_epilogue::BlockEndInfo, Transaction, TransactionInfo, TransactionOutput,
TransactionStatus, Version,
},
};
use derive_more::Deref;
Expand All @@ -32,7 +32,7 @@ impl LedgerUpdateOutput {
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_input_txns(txns: Vec<aptos_types::transaction::Transaction>) -> Self {
pub fn new_dummy_with_input_txns(txns: Vec<Transaction>) -> Self {
Self::new_impl(Inner::new_dummy_with_input_txns(txns))
}

Expand All @@ -46,7 +46,10 @@ impl LedgerUpdateOutput {

pub fn new(
statuses_for_input_txns: Vec<TransactionStatus>,
to_commit: Vec<TransactionToCommit>,
transactions: Vec<Transaction>,
transaction_outputs: Vec<TransactionOutput>,
transaction_infos: Vec<TransactionInfo>,
per_version_state_updates: Vec<ShardedStateUpdates>,
subscribable_events: Vec<ContractEvent>,
transaction_info_hashes: Vec<HashValue>,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
Expand All @@ -57,7 +60,10 @@ impl LedgerUpdateOutput {
) -> Self {
Self::new_impl(Inner {
statuses_for_input_txns,
to_commit,
transactions,
transaction_outputs,
transaction_infos,
per_version_state_updates,
subscribable_events,
transaction_info_hashes,
state_updates_until_last_checkpoint,
Expand All @@ -78,7 +84,10 @@ impl LedgerUpdateOutput {
#[derive(Default, Debug)]
pub struct Inner {
pub statuses_for_input_txns: Vec<TransactionStatus>,
pub to_commit: Vec<TransactionToCommit>,
pub transactions: Vec<Transaction>,
pub transaction_outputs: Vec<TransactionOutput>,
pub transaction_infos: Vec<TransactionInfo>,
pub per_version_state_updates: Vec<ShardedStateUpdates>,
pub subscribable_events: Vec<ContractEvent>,
pub transaction_info_hashes: Vec<HashValue>,
pub state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
Expand All @@ -100,17 +109,10 @@ impl Inner {
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_input_txns(txns: Vec<aptos_types::transaction::Transaction>) -> Self {
let num_txns = txns.len();
let to_commit = txns
.into_iter()
.chain(std::iter::once(
aptos_types::transaction::Transaction::StateCheckpoint(HashValue::zero()),
))
.map(TransactionToCommit::dummy_with_transaction)
.collect();
pub fn new_dummy_with_input_txns(transactions: Vec<Transaction>) -> Self {
let num_txns = transactions.len();
Self {
to_commit,
transactions,
statuses_for_input_txns: vec![
TransactionStatus::Keep(
aptos_types::transaction::ExecutionStatus::Success
Expand All @@ -136,19 +138,15 @@ impl Inner {
&self.transaction_accumulator
}

pub fn transactions_to_commit(&self) -> &Vec<TransactionToCommit> {
&self.to_commit
}

/// Ensure that every block committed by consensus ends with a state checkpoint. That can be
/// one of the two cases: 1. a reconfiguration (txns in the proposed block after the txn caused
/// the reconfiguration will be retried) 2. a Transaction::StateCheckpoint at the end of the
/// block.
pub fn ensure_ends_with_state_checkpoint(&self) -> Result<()> {
ensure!(
self.to_commit
self.transactions
.last()
.map_or(true, |txn| txn.transaction().is_non_reconfig_block_ending()),
.map_or(true, |t| t.is_non_reconfig_block_ending()),
"Block not ending with a state checkpoint.",
);
Ok(())
Expand All @@ -158,20 +156,17 @@ impl Inner {
&self,
transaction_infos: &[TransactionInfo],
) -> Result<()> {
let first_version =
self.transaction_accumulator.version() + 1 - self.to_commit.len() as Version;
ensure!(
self.transactions_to_commit().len() == transaction_infos.len(),
self.transaction_infos.len() == transaction_infos.len(),
"Lengths don't match. {} vs {}",
self.transactions_to_commit().len(),
self.transaction_infos.len(),
transaction_infos.len(),
);

let mut version = first_version;
for (txn_to_commit, expected_txn_info) in
zip_eq(self.to_commit.iter(), transaction_infos.iter())
let mut version = self.first_version();
for (txn_info, expected_txn_info) in
zip_eq(self.transaction_infos.iter(), transaction_infos.iter())
{
let txn_info = txn_to_commit.transaction_info();
ensure!(
txn_info == expected_txn_info,
"Transaction infos don't match. version:{version}, txn_info:{txn_info}, expected_txn_info:{expected_txn_info}",
Expand All @@ -186,10 +181,10 @@ impl Inner {
}

pub fn first_version(&self) -> Version {
self.transaction_accumulator.num_leaves() - self.to_commit.len() as Version
self.parent_accumulator.num_leaves
}

pub fn num_txns(&self) -> usize {
self.to_commit.len()
self.transactions.len()
}
}
7 changes: 5 additions & 2 deletions execution/executor-types/src/parsed_transaction_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ impl TransactionsWithParsedOutput {
self.transactions
}

pub fn into_inner(self) -> (Vec<Transaction>, Vec<ParsedTransactionOutput>) {
(self.transactions, self.parsed_output)
pub fn into_inner(self) -> (Vec<Transaction>, Vec<TransactionOutput>) {
(
self.transactions,
self.parsed_output.into_iter().map(|t| t.output).collect(),
)
}

pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &ParsedTransactionOutput)> {
Expand Down
20 changes: 7 additions & 13 deletions execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,7 @@ impl StateComputeResult {
}

pub fn transactions_to_commit_len(&self) -> usize {
self.ledger_update_output.to_commit.len()
}

/// On top of input transactions (which contain BlockMetadata and Validator txns),
/// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed.
pub fn transactions_to_commit(&self) -> Vec<Transaction> {
self.ledger_update_output
.to_commit
.iter()
.map(|t| t.transaction.clone())
.collect()
self.ledger_update_output.transactions.len()
}

pub fn epoch_state(&self) -> &Option<EpochState> {
Expand Down Expand Up @@ -155,22 +145,26 @@ impl StateComputeResult {
pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification {
ChunkCommitNotification {
subscribable_events: self.ledger_update_output.subscribable_events.clone(),
committed_transactions: self.transactions_to_commit(),
committed_transactions: self.ledger_update_output.transactions.clone(),
reconfiguration_occurred: self.has_reconfiguration(),
}
}

pub fn as_chunk_to_commit(&self) -> ChunkToCommit {
ChunkToCommit {
first_version: self.ledger_update_output.first_version(),
transactions: &self.ledger_update_output.transactions,
transaction_outputs: &self.ledger_update_output.transaction_outputs,
transaction_infos: &self.ledger_update_output.transaction_infos,
per_version_state_updates: &self.ledger_update_output.per_version_state_updates,
base_state_version: self.parent_state.base_version,
txns_to_commit: &self.ledger_update_output.to_commit,
latest_in_memory_state: &self.result_state,
state_updates_until_last_checkpoint: self
.ledger_update_output
.state_updates_until_last_checkpoint
.as_ref(),
sharded_state_cache: Some(&self.ledger_update_output.sharded_state_cache),
is_reconfig: self.ledger_update_output.block_end_info.is_some(),
}
}
}
7 changes: 2 additions & 5 deletions execution/executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use crate::{
components::{
apply_chunk_output::ApplyChunkOutput, block_tree::BlockTree, chunk_output::ChunkOutput,
block_tree::BlockTree, chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate,
partial_state_compute_result::PartialStateComputeResult,
},
logging::{LogEntry, LogSchema},
Expand Down Expand Up @@ -318,10 +318,7 @@ where
parent_output.reconfig_suffix()
} else {
let (output, _, _) = THREAD_MANAGER.get_non_exe_cpu_pool().install(|| {
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)
DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())
})?;
output
};
Expand Down
15 changes: 5 additions & 10 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger},
chunk_output::ChunkOutput,
chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier},
do_ledger_update::DoLedgerUpdate,
executed_chunk::ExecutedChunk,
partial_state_compute_result::PartialStateComputeResult,
transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
Expand Down Expand Up @@ -349,10 +350,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let first_version = parent_accumulator.num_leaves();
let (ledger_update_output, to_discard, to_retry) = {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]);
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)?
DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())?
};

ensure!(to_discard.is_empty(), "Unexpected discard.");
Expand Down Expand Up @@ -494,19 +492,16 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let started = Instant::now();

let chunk = self.commit_chunk_impl()?;
let output = chunk.output.expect_complete_result();

let num_committed = chunk.transactions_to_commit().len();
let num_committed = output.transactions_to_commit_len();
info!(
num_committed = num_committed,
tps = num_committed as f64 / started.elapsed().as_secs_f64(),
"TransactionReplayer::commit() OK"
);

Ok(chunk
.output
.result_state
.current_version
.expect("Version must exist after commit."))
Ok(output.version())
}

/// Remove `end_version - begin_version` transactions from the mutable input arguments and replay.
Expand Down
Loading

0 comments on commit 79d5257

Please sign in to comment.