Skip to content

Commit

Permalink
Revert "avoid TransactionToCommit (#14999)"
Browse files Browse the repository at this point in the history
This reverts commit 79d5257.
  • Loading branch information
msmouse committed Oct 18, 2024
1 parent f83c87e commit a48febf
Show file tree
Hide file tree
Showing 27 changed files with 566 additions and 538 deletions.
5 changes: 1 addition & 4 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,7 @@ impl ExecutionProxy {
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
let txns = state_compute_result
.ledger_update_output
.transactions
.clone();
let txns = state_compute_result.transactions_to_commit();
let subscribable_events =
state_compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
Expand Down
59 changes: 32 additions & 27 deletions execution/executor-types/src/ledger_update_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use aptos_types::{
proof::accumulator::InMemoryTransactionAccumulator,
state_store::ShardedStateUpdates,
transaction::{
block_epilogue::BlockEndInfo, Transaction, TransactionInfo, TransactionOutput,
TransactionStatus, Version,
block_epilogue::BlockEndInfo, TransactionInfo, TransactionStatus, TransactionToCommit,
Version,
},
};
use derive_more::Deref;
Expand All @@ -32,7 +32,7 @@ impl LedgerUpdateOutput {
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_input_txns(txns: Vec<Transaction>) -> Self {
pub fn new_dummy_with_input_txns(txns: Vec<aptos_types::transaction::Transaction>) -> Self {
Self::new_impl(Inner::new_dummy_with_input_txns(txns))
}

Expand All @@ -46,10 +46,7 @@ impl LedgerUpdateOutput {

pub fn new(
statuses_for_input_txns: Vec<TransactionStatus>,
transactions: Vec<Transaction>,
transaction_outputs: Vec<TransactionOutput>,
transaction_infos: Vec<TransactionInfo>,
per_version_state_updates: Vec<ShardedStateUpdates>,
to_commit: Vec<TransactionToCommit>,
subscribable_events: Vec<ContractEvent>,
transaction_info_hashes: Vec<HashValue>,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
Expand All @@ -60,10 +57,7 @@ impl LedgerUpdateOutput {
) -> Self {
Self::new_impl(Inner {
statuses_for_input_txns,
transactions,
transaction_outputs,
transaction_infos,
per_version_state_updates,
to_commit,
subscribable_events,
transaction_info_hashes,
state_updates_until_last_checkpoint,
Expand All @@ -84,10 +78,7 @@ impl LedgerUpdateOutput {
#[derive(Default, Debug)]
pub struct Inner {
pub statuses_for_input_txns: Vec<TransactionStatus>,
pub transactions: Vec<Transaction>,
pub transaction_outputs: Vec<TransactionOutput>,
pub transaction_infos: Vec<TransactionInfo>,
pub per_version_state_updates: Vec<ShardedStateUpdates>,
pub to_commit: Vec<TransactionToCommit>,
pub subscribable_events: Vec<ContractEvent>,
pub transaction_info_hashes: Vec<HashValue>,
pub state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
Expand All @@ -109,10 +100,17 @@ impl Inner {
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_input_txns(transactions: Vec<Transaction>) -> Self {
let num_txns = transactions.len();
pub fn new_dummy_with_input_txns(txns: Vec<aptos_types::transaction::Transaction>) -> Self {
let num_txns = txns.len();
let to_commit = txns
.into_iter()
.chain(std::iter::once(
aptos_types::transaction::Transaction::StateCheckpoint(HashValue::zero()),
))
.map(TransactionToCommit::dummy_with_transaction)
.collect();
Self {
transactions,
to_commit,
statuses_for_input_txns: vec![
TransactionStatus::Keep(
aptos_types::transaction::ExecutionStatus::Success
Expand All @@ -138,15 +136,19 @@ impl Inner {
&self.transaction_accumulator
}

pub fn transactions_to_commit(&self) -> &Vec<TransactionToCommit> {
&self.to_commit
}

/// Ensure that every block committed by consensus ends with a state checkpoint. That can be
/// one of the two cases: 1. a reconfiguration (txns in the proposed block after the txn caused
/// the reconfiguration will be retried) 2. a Transaction::StateCheckpoint at the end of the
/// block.
pub fn ensure_ends_with_state_checkpoint(&self) -> Result<()> {
ensure!(
self.transactions
self.to_commit
.last()
.map_or(true, |t| t.is_non_reconfig_block_ending()),
.map_or(true, |txn| txn.transaction().is_non_reconfig_block_ending()),
"Block not ending with a state checkpoint.",
);
Ok(())
Expand All @@ -156,17 +158,20 @@ impl Inner {
&self,
transaction_infos: &[TransactionInfo],
) -> Result<()> {
let first_version =
self.transaction_accumulator.version() + 1 - self.to_commit.len() as Version;
ensure!(
self.transaction_infos.len() == transaction_infos.len(),
self.transactions_to_commit().len() == transaction_infos.len(),
"Lengths don't match. {} vs {}",
self.transaction_infos.len(),
self.transactions_to_commit().len(),
transaction_infos.len(),
);

let mut version = self.first_version();
for (txn_info, expected_txn_info) in
zip_eq(self.transaction_infos.iter(), transaction_infos.iter())
let mut version = first_version;
for (txn_to_commit, expected_txn_info) in
zip_eq(self.to_commit.iter(), transaction_infos.iter())
{
let txn_info = txn_to_commit.transaction_info();
ensure!(
txn_info == expected_txn_info,
"Transaction infos don't match. version:{version}, txn_info:{txn_info}, expected_txn_info:{expected_txn_info}",
Expand All @@ -181,10 +186,10 @@ impl Inner {
}

pub fn first_version(&self) -> Version {
self.parent_accumulator.num_leaves
self.transaction_accumulator.num_leaves() - self.to_commit.len() as Version
}

pub fn num_txns(&self) -> usize {
self.transactions.len()
self.to_commit.len()
}
}
7 changes: 2 additions & 5 deletions execution/executor-types/src/parsed_transaction_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,8 @@ impl TransactionsWithParsedOutput {
self.transactions
}

pub fn into_inner(self) -> (Vec<Transaction>, Vec<TransactionOutput>) {
(
self.transactions,
self.parsed_output.into_iter().map(|t| t.output).collect(),
)
pub fn into_inner(self) -> (Vec<Transaction>, Vec<ParsedTransactionOutput>) {
(self.transactions, self.parsed_output)
}

pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &ParsedTransactionOutput)> {
Expand Down
20 changes: 13 additions & 7 deletions execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,17 @@ impl StateComputeResult {
}

pub fn transactions_to_commit_len(&self) -> usize {
self.ledger_update_output.transactions.len()
self.ledger_update_output.to_commit.len()
}

/// On top of input transactions (which contain BlockMetadata and Validator txns),
/// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed.
pub fn transactions_to_commit(&self) -> Vec<Transaction> {
self.ledger_update_output
.to_commit
.iter()
.map(|t| t.transaction.clone())
.collect()
}

pub fn epoch_state(&self) -> &Option<EpochState> {
Expand Down Expand Up @@ -145,26 +155,22 @@ impl StateComputeResult {
pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification {
ChunkCommitNotification {
subscribable_events: self.ledger_update_output.subscribable_events.clone(),
committed_transactions: self.ledger_update_output.transactions.clone(),
committed_transactions: self.transactions_to_commit(),
reconfiguration_occurred: self.has_reconfiguration(),
}
}

pub fn as_chunk_to_commit(&self) -> ChunkToCommit {
ChunkToCommit {
first_version: self.ledger_update_output.first_version(),
transactions: &self.ledger_update_output.transactions,
transaction_outputs: &self.ledger_update_output.transaction_outputs,
transaction_infos: &self.ledger_update_output.transaction_infos,
per_version_state_updates: &self.ledger_update_output.per_version_state_updates,
base_state_version: self.parent_state.base_version,
txns_to_commit: &self.ledger_update_output.to_commit,
latest_in_memory_state: &self.result_state,
state_updates_until_last_checkpoint: self
.ledger_update_output
.state_updates_until_last_checkpoint
.as_ref(),
sharded_state_cache: Some(&self.ledger_update_output.sharded_state_cache),
is_reconfig: self.ledger_update_output.block_end_info.is_some(),
}
}
}
7 changes: 5 additions & 2 deletions execution/executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use crate::{
components::{
block_tree::BlockTree, chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate,
apply_chunk_output::ApplyChunkOutput, block_tree::BlockTree, chunk_output::ChunkOutput,
partial_state_compute_result::PartialStateComputeResult,
},
logging::{LogEntry, LogSchema},
Expand Down Expand Up @@ -318,7 +318,10 @@ where
parent_output.reconfig_suffix()
} else {
let (output, _, _) = THREAD_MANAGER.get_non_exe_cpu_pool().install(|| {
DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)
})?;
output
};
Expand Down
15 changes: 10 additions & 5 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger},
chunk_output::ChunkOutput,
chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier},
do_ledger_update::DoLedgerUpdate,
executed_chunk::ExecutedChunk,
partial_state_compute_result::PartialStateComputeResult,
transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
Expand Down Expand Up @@ -350,7 +349,10 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let first_version = parent_accumulator.num_leaves();
let (ledger_update_output, to_discard, to_retry) = {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]);
DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())?
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)?
};

ensure!(to_discard.is_empty(), "Unexpected discard.");
Expand Down Expand Up @@ -492,16 +494,19 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let started = Instant::now();

let chunk = self.commit_chunk_impl()?;
let output = chunk.output.expect_complete_result();

let num_committed = output.transactions_to_commit_len();
let num_committed = chunk.transactions_to_commit().len();
info!(
num_committed = num_committed,
tps = num_committed as f64 / started.elapsed().as_secs_f64(),
"TransactionReplayer::commit() OK"
);

Ok(output.version())
Ok(chunk
.output
.result_state
.current_version
.expect("Version must exist after commit."))
}

/// Remove `end_version - begin_version` transactions from the mutable input arguments and replay.
Expand Down
Loading

0 comments on commit a48febf

Please sign in to comment.