Skip to content

Commit

Permalink
Avoid TransactionToCommit
Browse files Browse the repository at this point in the history
1. avoid unnecessary packing and unpacking, it's desirable because I'm
   gonna make the PartialStateComputeResult consist of multiple
   immutable stage outputs.
2. extract DoLedgerUpdate algorithm
  • Loading branch information
msmouse committed Oct 18, 2024
1 parent 71e0302 commit 20f2fde
Show file tree
Hide file tree
Showing 27 changed files with 538 additions and 566 deletions.
5 changes: 4 additions & 1 deletion consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ impl ExecutionProxy {
Box::pin(async move {
pre_commit_notifier
.send(Box::pin(async move {
let txns = state_compute_result.transactions_to_commit();
let txns = state_compute_result
.ledger_update_output
.transactions
.clone();
let subscribable_events =
state_compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
Expand Down
59 changes: 27 additions & 32 deletions execution/executor-types/src/ledger_update_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use aptos_types::{
proof::accumulator::InMemoryTransactionAccumulator,
state_store::ShardedStateUpdates,
transaction::{
block_epilogue::BlockEndInfo, TransactionInfo, TransactionStatus, TransactionToCommit,
Version,
block_epilogue::BlockEndInfo, Transaction, TransactionInfo, TransactionOutput,
TransactionStatus, Version,
},
};
use derive_more::Deref;
Expand All @@ -32,7 +32,7 @@ impl LedgerUpdateOutput {
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_input_txns(txns: Vec<aptos_types::transaction::Transaction>) -> Self {
pub fn new_dummy_with_input_txns(txns: Vec<Transaction>) -> Self {
Self::new_impl(Inner::new_dummy_with_input_txns(txns))
}

Expand All @@ -46,7 +46,10 @@ impl LedgerUpdateOutput {

pub fn new(
statuses_for_input_txns: Vec<TransactionStatus>,
to_commit: Vec<TransactionToCommit>,
transactions: Vec<Transaction>,
transaction_outputs: Vec<TransactionOutput>,
transaction_infos: Vec<TransactionInfo>,
per_version_state_updates: Vec<ShardedStateUpdates>,
subscribable_events: Vec<ContractEvent>,
transaction_info_hashes: Vec<HashValue>,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
Expand All @@ -57,7 +60,10 @@ impl LedgerUpdateOutput {
) -> Self {
Self::new_impl(Inner {
statuses_for_input_txns,
to_commit,
transactions,
transaction_outputs,
transaction_infos,
per_version_state_updates,
subscribable_events,
transaction_info_hashes,
state_updates_until_last_checkpoint,
Expand All @@ -78,7 +84,10 @@ impl LedgerUpdateOutput {
#[derive(Default, Debug)]
pub struct Inner {
pub statuses_for_input_txns: Vec<TransactionStatus>,
pub to_commit: Vec<TransactionToCommit>,
pub transactions: Vec<Transaction>,
pub transaction_outputs: Vec<TransactionOutput>,
pub transaction_infos: Vec<TransactionInfo>,
pub per_version_state_updates: Vec<ShardedStateUpdates>,
pub subscribable_events: Vec<ContractEvent>,
pub transaction_info_hashes: Vec<HashValue>,
pub state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
Expand All @@ -100,17 +109,10 @@ impl Inner {
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_input_txns(txns: Vec<aptos_types::transaction::Transaction>) -> Self {
let num_txns = txns.len();
let to_commit = txns
.into_iter()
.chain(std::iter::once(
aptos_types::transaction::Transaction::StateCheckpoint(HashValue::zero()),
))
.map(TransactionToCommit::dummy_with_transaction)
.collect();
pub fn new_dummy_with_input_txns(transactions: Vec<Transaction>) -> Self {
let num_txns = transactions.len();
Self {
to_commit,
transactions,
statuses_for_input_txns: vec![
TransactionStatus::Keep(
aptos_types::transaction::ExecutionStatus::Success
Expand All @@ -136,19 +138,15 @@ impl Inner {
&self.transaction_accumulator
}

pub fn transactions_to_commit(&self) -> &Vec<TransactionToCommit> {
&self.to_commit
}

/// Ensure that every block committed by consensus ends with a state checkpoint. That can be
/// one of the two cases: 1. a reconfiguration (txns in the proposed block after the txn caused
/// the reconfiguration will be retried) 2. a Transaction::StateCheckpoint at the end of the
/// block.
pub fn ensure_ends_with_state_checkpoint(&self) -> Result<()> {
ensure!(
self.to_commit
self.transactions
.last()
.map_or(true, |txn| txn.transaction().is_non_reconfig_block_ending()),
.map_or(true, |t| t.is_non_reconfig_block_ending()),
"Block not ending with a state checkpoint.",
);
Ok(())
Expand All @@ -158,20 +156,17 @@ impl Inner {
&self,
transaction_infos: &[TransactionInfo],
) -> Result<()> {
let first_version =
self.transaction_accumulator.version() + 1 - self.to_commit.len() as Version;
ensure!(
self.transactions_to_commit().len() == transaction_infos.len(),
self.transaction_infos.len() == transaction_infos.len(),
"Lengths don't match. {} vs {}",
self.transactions_to_commit().len(),
self.transaction_infos.len(),
transaction_infos.len(),
);

let mut version = first_version;
for (txn_to_commit, expected_txn_info) in
zip_eq(self.to_commit.iter(), transaction_infos.iter())
let mut version = self.first_version();
for (txn_info, expected_txn_info) in
zip_eq(self.transaction_infos.iter(), transaction_infos.iter())
{
let txn_info = txn_to_commit.transaction_info();
ensure!(
txn_info == expected_txn_info,
"Transaction infos don't match. version:{version}, txn_info:{txn_info}, expected_txn_info:{expected_txn_info}",
Expand All @@ -186,10 +181,10 @@ impl Inner {
}

pub fn first_version(&self) -> Version {
self.transaction_accumulator.num_leaves() - self.to_commit.len() as Version
self.parent_accumulator.num_leaves
}

pub fn num_txns(&self) -> usize {
self.to_commit.len()
self.transactions.len()
}
}
7 changes: 5 additions & 2 deletions execution/executor-types/src/parsed_transaction_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ impl TransactionsWithParsedOutput {
self.transactions
}

pub fn into_inner(self) -> (Vec<Transaction>, Vec<ParsedTransactionOutput>) {
(self.transactions, self.parsed_output)
pub fn into_inner(self) -> (Vec<Transaction>, Vec<TransactionOutput>) {
(
self.transactions,
self.parsed_output.into_iter().map(|t| t.output).collect(),
)
}

pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &ParsedTransactionOutput)> {
Expand Down
20 changes: 7 additions & 13 deletions execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,7 @@ impl StateComputeResult {
}

pub fn transactions_to_commit_len(&self) -> usize {
self.ledger_update_output.to_commit.len()
}

/// On top of input transactions (which contain BlockMetadata and Validator txns),
/// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed.
pub fn transactions_to_commit(&self) -> Vec<Transaction> {
self.ledger_update_output
.to_commit
.iter()
.map(|t| t.transaction.clone())
.collect()
self.ledger_update_output.transactions.len()
}

pub fn epoch_state(&self) -> &Option<EpochState> {
Expand Down Expand Up @@ -155,22 +145,26 @@ impl StateComputeResult {
pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification {
ChunkCommitNotification {
subscribable_events: self.ledger_update_output.subscribable_events.clone(),
committed_transactions: self.transactions_to_commit(),
committed_transactions: self.ledger_update_output.transactions.clone(),
reconfiguration_occurred: self.has_reconfiguration(),
}
}

pub fn as_chunk_to_commit(&self) -> ChunkToCommit {
ChunkToCommit {
first_version: self.ledger_update_output.first_version(),
transactions: &self.ledger_update_output.transactions,
transaction_outputs: &self.ledger_update_output.transaction_outputs,
transaction_infos: &self.ledger_update_output.transaction_infos,
per_version_state_updates: &self.ledger_update_output.per_version_state_updates,
base_state_version: self.parent_state.base_version,
txns_to_commit: &self.ledger_update_output.to_commit,
latest_in_memory_state: &self.result_state,
state_updates_until_last_checkpoint: self
.ledger_update_output
.state_updates_until_last_checkpoint
.as_ref(),
sharded_state_cache: Some(&self.ledger_update_output.sharded_state_cache),
is_reconfig: self.ledger_update_output.block_end_info.is_some(),
}
}
}
7 changes: 2 additions & 5 deletions execution/executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use crate::{
components::{
apply_chunk_output::ApplyChunkOutput, block_tree::BlockTree, chunk_output::ChunkOutput,
block_tree::BlockTree, chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate,
partial_state_compute_result::PartialStateComputeResult,
},
logging::{LogEntry, LogSchema},
Expand Down Expand Up @@ -318,10 +318,7 @@ where
parent_output.reconfig_suffix()
} else {
let (output, _, _) = THREAD_MANAGER.get_non_exe_cpu_pool().install(|| {
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)
DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())
})?;
output
};
Expand Down
15 changes: 5 additions & 10 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger},
chunk_output::ChunkOutput,
chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier},
do_ledger_update::DoLedgerUpdate,
executed_chunk::ExecutedChunk,
partial_state_compute_result::PartialStateComputeResult,
transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
Expand Down Expand Up @@ -349,10 +350,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let first_version = parent_accumulator.num_leaves();
let (ledger_update_output, to_discard, to_retry) = {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]);
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)?
DoLedgerUpdate::run(state_checkpoint_output, parent_accumulator.clone())?
};

ensure!(to_discard.is_empty(), "Unexpected discard.");
Expand Down Expand Up @@ -494,19 +492,16 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let started = Instant::now();

let chunk = self.commit_chunk_impl()?;
let output = chunk.output.expect_complete_result();

let num_committed = chunk.transactions_to_commit().len();
let num_committed = output.transactions_to_commit_len();
info!(
num_committed = num_committed,
tps = num_committed as f64 / started.elapsed().as_secs_f64(),
"TransactionReplayer::commit() OK"
);

Ok(chunk
.output
.result_state
.current_version
.expect("Version must exist after commit."))
Ok(output.version())
}

/// Remove `end_version - begin_version` transactions from the mutable input arguments and replay.
Expand Down
Loading

0 comments on commit 20f2fde

Please sign in to comment.