Skip to content

Commit

Permalink
filter previous block's commits at start of execute
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Sep 10, 2024
1 parent 613a35c commit ee70734
Showing 1 changed file with 74 additions and 4 deletions.
78 changes: 74 additions & 4 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,33 @@ use crate::{
monitor,
state_computer::{PipelineExecutionResult, StateComputeResultFut},
};
use aptos_consensus_types::{block::Block, pipelined_block::OrderedBlockWindow};
use aptos_consensus_types::{block::Block, common::Round, pipelined_block::OrderedBlockWindow};
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, BlockExecutorTrait, ExecutorError,
ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_logger::{debug, error};
use aptos_logger::{debug, error, info};
use aptos_types::{
block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock},
block_executor::{
config::BlockExecutorConfigFromOnchain,
partitioner::{ExecutableBlock, ExecutableTransactions},
},
block_metadata_ext::BlockMetadataExt,
transaction::{
signature_verified_transaction::SignatureVerifiedTransaction, SignedTransaction,
signature_verified_transaction::{
SignatureVerifiedTransaction, SignatureVerifiedTransaction::Valid,
},
SignedTransaction,
Transaction::UserTransaction,
},
};
use fail::fail_point;
use once_cell::sync::Lazy;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -142,6 +150,8 @@ impl ExecutionPipeline {
.send(ExecuteBlockCommand {
input_txns,
block: (block.id(), sig_verified_txns).into(),
block_window,
round: block.round(),
parent_block_id,
block_executor_onchain_config,
result_tx,
Expand Down Expand Up @@ -173,12 +183,70 @@ impl ExecutionPipeline {
while let Some(ExecuteBlockCommand {
input_txns,
block,
block_window,
round,
parent_block_id,
block_executor_onchain_config,
result_tx,
}) = block_rx.recv().await
{
let block_id = block.block_id;

let now = Instant::now();
// Filter out transactions that were committed in the previous block.
let previous_block_transactions =
if let Some(latest_block) = block_window.pipelined_blocks().last() {
if latest_block.round() == round - 1 {
latest_block.wait_for_committed_transactions().to_vec()
} else {
vec![]
}
} else {
vec![]
};
let mut committed_transactions: HashSet<HashValue> = HashSet::new();
for hash in previous_block_transactions {
committed_transactions.insert(hash);
}

// TODO: if filter out causes issues, we can move the transactions to the back
// TODO: such a hacky way of filtering both views, need to combine into one if possible
let input_txns_len = input_txns.len();
let input_txns: Vec<_> = input_txns
.into_iter()
.filter(|txn| !committed_transactions.contains(&txn.committed_hash()))
.collect();
info!(
"(round: {}) Filtered out {} transactions from the previous block, in {} ms",
round,
input_txns_len - input_txns.len(),
now.elapsed().as_millis()
);

let transactions = match block.transactions {
ExecutableTransactions::Unsharded(txns) => {
let transactions: Vec<_> = txns
.into_iter()
.filter(|txn| {
if let Valid(txn) = txn {
if let UserTransaction(user_txn) = txn {
!committed_transactions.contains(&user_txn.committed_hash())
} else {
true
}
} else {
true
}
})
.collect();
ExecutableTransactions::Unsharded(transactions)
},
ExecutableTransactions::Sharded(_) => {
unimplemented!("Sharded transactions are not supported yet.")
},
};
let block = ExecutableBlock::new(block.block_id, transactions);

debug!("execute_stage received block {}.", block_id);
let executor = executor.clone();
let state_checkpoint_output = monitor!(
Expand Down Expand Up @@ -267,6 +335,8 @@ struct PrepareBlockCommand {
struct ExecuteBlockCommand {
input_txns: Vec<SignedTransaction>,
block: ExecutableBlock,
block_window: OrderedBlockWindow,
round: Round,
parent_block_id: HashValue,
block_executor_onchain_config: BlockExecutorConfigFromOnchain,
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
Expand Down

0 comments on commit ee70734

Please sign in to comment.