Skip to content

Commit

Permalink
[Execution] Add TransactionDeduper trait and implementation using (ha…
Browse files Browse the repository at this point in the history
…sh, signature) (#8367)

### Description

Adding TransactionDeduper trait and an implementation. The dedup is done within a block, just before transaction shuffle. It's guarded by an onchain config.

The purpose is to not send duplicate transactions to execution. While execution correctness is not affected by duplicates (and other work removed false error logs that fired due to them), it's possible that duplicate transactions could hurt throughput of parallel execution. By deduping ahead of time, we don't have to worry about that.

The implementation finds duplicates by matching (raw txn bcs hash, signature). Because calculating hash can be relatively expensive, it is only done when a shallow match is found of (account, seq_no), and it's done in parallel.

Overhead (as seen on forge):
* When there are no duplicates, the dedup is negligible -- it takes ~2ms per second.
* When there are ~100 duplicates per block, the dedup takes ~10ms per second.

### Test Plan

Added unit tests.
  • Loading branch information
bchocho committed Jun 1, 2023
1 parent 1d5d5df commit 75ca612
Show file tree
Hide file tree
Showing 15 changed files with 502 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl BlockAptosVM {
// Verify the signatures of all the transactions in parallel.
// This is time consuming so don't wait and do the checking
// sequentially while executing the transactions.
// TODO: state sync runs this code but doesn't need to verify signatures
let signature_verification_timer =
BLOCK_EXECUTOR_SIGNATURE_VERIFICATION_SECONDS.start_timer();
let signature_verified_block: Vec<PreprocessedTransaction> =
Expand Down
2 changes: 2 additions & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ num-derive = { workspace = true }
num-traits = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -68,6 +69,7 @@ tokio = { workspace = true }
tokio-metrics = { workspace = true }

[dev-dependencies]
aptos-cached-packages = { workspace = true }
aptos-config = { workspace = true, features = ["fuzzing"] }
aptos-consensus-types = { workspace = true, features = ["fuzzing"] }
aptos-executor-test-helpers = { workspace = true }
Expand Down
22 changes: 21 additions & 1 deletion consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,32 @@ pub static TXN_SHUFFLE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
// metric name
"aptos_execution_transaction_shuffle_seconds",
// metric description
"The time spent in seconds in initializing the VM in the block executor",
"The time spent in seconds in shuffle of transactions",
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
)
.unwrap()
});

/// Transaction dedup call latency
pub static TXN_DEDUP_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
// metric name
"aptos_execution_transaction_dedup_seconds",
// metric description
"The time spent in seconds in dedup of transaction",
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
)
.unwrap()
});

/// Transaction dedup number of filtered
pub static TXN_DEDUP_FILTERED: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
"aptos_execution_transaction_dedup_filtered",
"The number of duplicates filtered per block",
)
});

/// Number of rounds we were collecting votes for proposer
/// (similar to PROPOSALS_COUNT, but can be larger, if we failed in creating/sending of the proposal)
pub static PROPOSER_COLLECTED_ROUND_COUNT: Lazy<IntCounter> = Lazy::new(|| {
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
recovery_manager::RecoveryManager,
round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent},
state_replication::StateComputer,
transaction_deduper::create_transaction_deduper,
transaction_shuffler::create_transaction_shuffler,
util::time_service::TimeService,
};
Expand Down Expand Up @@ -681,6 +682,8 @@ impl EpochManager {
let transaction_shuffler =
create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type());
let block_gas_limit = onchain_execution_config.block_gas_limit();
let transaction_deduper =
create_transaction_deduper(onchain_execution_config.transaction_deduper_type());
self.quorum_store_msg_tx = quorum_store_msg_tx;

let payload_client = QuorumStoreClient::new(
Expand All @@ -694,6 +697,7 @@ impl EpochManager {
payload_manager.clone(),
transaction_shuffler,
block_gas_limit,
transaction_deduper,
);
let state_computer = if onchain_consensus_config.decoupled_execution() {
Arc::new(self.spawn_decoupled_execution(
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/experimental/ordering_state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
},
payload_manager::PayloadManager,
state_replication::{StateComputer, StateComputerCommitCallBackType},
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
};
use anyhow::Result;
Expand Down Expand Up @@ -126,6 +127,7 @@ impl StateComputer for OrderingStateComputer {
_payload_manager: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down
2 changes: 2 additions & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ pub mod counters;
pub mod network_interface;
mod payload_manager;
mod sender_aware_shuffler;
mod transaction_deduper;
mod transaction_shuffler;
mod txn_hash_and_authenticator_deduper;

use aptos_metrics_core::IntGauge;
pub use consensusdb::create_checkpoint;
Expand Down
26 changes: 21 additions & 5 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
monitor,
payload_manager::PayloadManager,
state_replication::{StateComputer, StateComputerCommitCallBackType},
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
txn_notifier::TxnNotifier,
};
Expand Down Expand Up @@ -57,6 +58,7 @@ pub struct ExecutionProxy {
write_mutex: AsyncMutex<LogicalTime>,
payload_manager: Mutex<Option<Arc<PayloadManager>>>,
transaction_shuffler: Mutex<Option<Arc<dyn TransactionShuffler>>>,
transaction_deduper: Mutex<Option<Arc<dyn TransactionDeduper>>>,
}

impl ExecutionProxy {
Expand Down Expand Up @@ -90,6 +92,7 @@ impl ExecutionProxy {
write_mutex: AsyncMutex::new(LogicalTime::new(0, 0)),
payload_manager: Mutex::new(None),
transaction_shuffler: Mutex::new(None),
transaction_deduper: Mutex::new(None),
}
}
}
Expand Down Expand Up @@ -117,10 +120,12 @@ impl StateComputer for ExecutionProxy {
);

let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone();
let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone();
let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone();
let txns = payload_manager.get_transactions(block).await?;

let shuffled_txns = txn_shuffler.shuffle(txns);
let deduped_txns = txn_deduper.dedup(txns);
let shuffled_txns = txn_shuffler.shuffle(deduped_txns);

let block_gas_limit = self.executor.get_block_gas_limit();

Expand Down Expand Up @@ -177,6 +182,7 @@ impl StateComputer for ExecutionProxy {
let block_timestamp = finality_proof.commit_info().timestamp_usecs();

let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone();
let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone();
let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone();

let block_gas_limit = self.executor.get_block_gas_limit();
Expand All @@ -189,7 +195,8 @@ impl StateComputer for ExecutionProxy {
}

let signed_txns = payload_manager.get_transactions(block.block()).await?;
let shuffled_txns = txn_shuffler.shuffle(signed_txns);
let deduped_txns = txn_deduper.dedup(signed_txns);
let shuffled_txns = txn_shuffler.shuffle(deduped_txns);

txns.extend(block.transactions_to_commit(
&self.validators.lock(),
Expand Down Expand Up @@ -289,6 +296,7 @@ impl StateComputer for ExecutionProxy {
payload_manager: Arc<PayloadManager>,
transaction_shuffler: Arc<dyn TransactionShuffler>,
_block_gas_limit: Option<u64>,
transaction_deduper: Arc<dyn TransactionDeduper>,
) {
*self.validators.lock() = epoch_state
.verifier
Expand All @@ -301,6 +309,7 @@ impl StateComputer for ExecutionProxy {
// TODO: Temporarily disable initializing block gas limit and leave it as default None,
// until there is a better way to handle the possible panic when executor is initialized.
// self.executor.update_block_gas_limit(block_gas_limit);
self.transaction_deduper.lock().replace(transaction_deduper);
}

// Clears the epoch-specific state. Only a sync_to call is expected before calling new_epoch
Expand All @@ -313,11 +322,17 @@ impl StateComputer for ExecutionProxy {

#[tokio::test]
async fn test_commit_sync_race() {
use crate::{error::MempoolError, transaction_shuffler::create_transaction_shuffler};
use crate::{
error::MempoolError, transaction_deduper::create_transaction_deduper,
transaction_shuffler::create_transaction_shuffler,
};
use aptos_consensus_notifications::Error;
use aptos_types::{
aggregate_signature::AggregateSignature, block_info::BlockInfo, ledger_info::LedgerInfo,
on_chain_config::TransactionShufflerType, transaction::SignedTransaction,
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
ledger_info::LedgerInfo,
on_chain_config::{TransactionDeduperType, TransactionShufflerType},
transaction::SignedTransaction,
};

struct RecordedCommit {
Expand Down Expand Up @@ -424,6 +439,7 @@ async fn test_commit_sync_race() {
Arc::new(PayloadManager::DirectMempool),
create_transaction_shuffler(TransactionShufflerType::NoShuffling),
None,
create_transaction_deduper(TransactionDeduperType::NoDedup),
);
executor
.commit(&[], generate_li(1, 1), callback.clone())
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/state_replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::{
error::{QuorumStoreError, StateSyncError},
payload_manager::PayloadManager,
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
};
use anyhow::Result;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub trait StateComputer: Send + Sync {
payload_manager: Arc<PayloadManager>,
transaction_shuffler: Arc<dyn TransactionShuffler>,
block_gas_limit: Option<u64>,
transaction_deduper: Arc<dyn TransactionDeduper>,
);

// Reconfigure to clear epoch state at end of epoch.
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/test_utils/mock_state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
payload_manager::PayloadManager,
state_replication::{StateComputer, StateComputerCommitCallBackType},
test_utils::mock_storage::MockStorage,
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
};
use anyhow::{format_err, Result};
Expand Down Expand Up @@ -139,6 +140,7 @@ impl StateComputer for MockStateComputer {
_: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down Expand Up @@ -176,6 +178,7 @@ impl StateComputer for EmptyStateComputer {
_: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down Expand Up @@ -237,6 +240,7 @@ impl StateComputer for RandomComputeResultStateComputer {
_: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down
33 changes: 33 additions & 0 deletions consensus/src/transaction_deduper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::txn_hash_and_authenticator_deduper::TxnHashAndAuthenticatorDeduper;
use aptos_logger::info;
use aptos_types::{on_chain_config::TransactionDeduperType, transaction::SignedTransaction};
use std::sync::Arc;

/// Interface to dedup transactions. The dedup filters duplicate transactions within a block.
pub trait TransactionDeduper: Send + Sync {
fn dedup(&self, txns: Vec<SignedTransaction>) -> Vec<SignedTransaction>;
}

/// No Op Deduper to maintain backward compatibility
struct NoOpDeduper {}

impl TransactionDeduper for NoOpDeduper {
fn dedup(&self, txns: Vec<SignedTransaction>) -> Vec<SignedTransaction> {
txns
}
}

pub fn create_transaction_deduper(
deduper_type: TransactionDeduperType,
) -> Arc<dyn TransactionDeduper> {
match deduper_type {
TransactionDeduperType::NoDedup => Arc::new(NoOpDeduper {}),
TransactionDeduperType::TxnHashAndAuthenticatorV1 => {
info!("Using simple hash set transaction deduper");
Arc::new(TxnHashAndAuthenticatorDeduper::new())
},
}
}
Loading

0 comments on commit 75ca612

Please sign in to comment.