Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Execution] Add TransactionDeduper trait and implementation using (hash, signature) #8367

Merged
merged 11 commits into from
May 31, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ num-derive = { workspace = true }
num-traits = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -68,6 +69,7 @@ tokio = { workspace = true }
tokio-metrics = { workspace = true }

[dev-dependencies]
aptos-cached-packages = { workspace = true }
aptos-config = { workspace = true, features = ["fuzzing"] }
aptos-consensus-types = { workspace = true, features = ["fuzzing"] }
aptos-executor-test-helpers = { workspace = true }
Expand Down
22 changes: 21 additions & 1 deletion consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,32 @@ pub static TXN_SHUFFLE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
// metric name
"aptos_execution_transaction_shuffle_seconds",
// metric description
"The time spent in seconds in initializing the VM in the block executor",
"The time spent in seconds in shuffle of transactions",
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
)
.unwrap()
});

/// Transaction dedup call latency
pub static TXN_DEDUP_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
// metric name
"aptos_execution_transaction_dedup_seconds",
// metric description
"The time spent in seconds in dedup of transaction",
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
)
.unwrap()
});

/// Transaction dedup number of filtered
pub static TXN_DEDUP_FILTERED: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
"aptos_execution_transaction_dedup_filtered",
"The number of duplicates filtered per block",
)
});

/// Number of rounds we were collecting votes for proposer
/// (similar to PROPOSALS_COUNT, but can be larger, if we failed in creating/sending of the proposal)
pub static PROPOSER_COLLECTED_ROUND_COUNT: Lazy<IntCounter> = Lazy::new(|| {
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
recovery_manager::RecoveryManager,
round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent},
state_replication::StateComputer,
transaction_deduper::create_transaction_deduper,
transaction_shuffler::create_transaction_shuffler,
util::time_service::TimeService,
};
Expand Down Expand Up @@ -681,6 +682,8 @@ impl EpochManager {
let transaction_shuffler =
create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type());
let block_gas_limit = onchain_execution_config.block_gas_limit();
let transaction_deduper =
create_transaction_deduper(onchain_execution_config.transaction_deduper_type());
self.quorum_store_msg_tx = quorum_store_msg_tx;

let payload_client = QuorumStoreClient::new(
Expand All @@ -694,6 +697,7 @@ impl EpochManager {
payload_manager.clone(),
transaction_shuffler,
block_gas_limit,
transaction_deduper,
);
let state_computer = if onchain_consensus_config.decoupled_execution() {
Arc::new(self.spawn_decoupled_execution(
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/experimental/ordering_state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
},
payload_manager::PayloadManager,
state_replication::{StateComputer, StateComputerCommitCallBackType},
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
};
use anyhow::Result;
Expand Down Expand Up @@ -126,6 +127,7 @@ impl StateComputer for OrderingStateComputer {
_payload_manager: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down
2 changes: 2 additions & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ pub mod counters;
pub mod network_interface;
mod payload_manager;
mod sender_aware_shuffler;
mod transaction_deduper;
mod transaction_shuffler;
mod txn_hash_and_authenticator_deduper;

use aptos_metrics_core::IntGauge;
pub use consensusdb::create_checkpoint;
Expand Down
26 changes: 21 additions & 5 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
monitor,
payload_manager::PayloadManager,
state_replication::{StateComputer, StateComputerCommitCallBackType},
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
txn_notifier::TxnNotifier,
};
Expand Down Expand Up @@ -57,6 +58,7 @@ pub struct ExecutionProxy {
write_mutex: AsyncMutex<LogicalTime>,
payload_manager: Mutex<Option<Arc<PayloadManager>>>,
transaction_shuffler: Mutex<Option<Arc<dyn TransactionShuffler>>>,
transaction_deduper: Mutex<Option<Arc<dyn TransactionDeduper>>>,
}

impl ExecutionProxy {
Expand Down Expand Up @@ -90,6 +92,7 @@ impl ExecutionProxy {
write_mutex: AsyncMutex::new(LogicalTime::new(0, 0)),
payload_manager: Mutex::new(None),
transaction_shuffler: Mutex::new(None),
transaction_deduper: Mutex::new(None),
}
}
}
Expand Down Expand Up @@ -117,10 +120,12 @@ impl StateComputer for ExecutionProxy {
);

let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone();
let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone();
let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone();
let txns = payload_manager.get_transactions(block).await?;

let shuffled_txns = txn_shuffler.shuffle(txns);
let deduped_txns = txn_deduper.dedup(txns);
let shuffled_txns = txn_shuffler.shuffle(deduped_txns);

let block_gas_limit = self.executor.get_block_gas_limit();

Expand Down Expand Up @@ -177,6 +182,7 @@ impl StateComputer for ExecutionProxy {
let block_timestamp = finality_proof.commit_info().timestamp_usecs();

let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone();
let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone();
let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone();

let block_gas_limit = self.executor.get_block_gas_limit();
Expand All @@ -189,7 +195,8 @@ impl StateComputer for ExecutionProxy {
}

let signed_txns = payload_manager.get_transactions(block.block()).await?;
let shuffled_txns = txn_shuffler.shuffle(signed_txns);
let deduped_txns = txn_deduper.dedup(signed_txns);
let shuffled_txns = txn_shuffler.shuffle(deduped_txns);

txns.extend(block.transactions_to_commit(
&self.validators.lock(),
Expand Down Expand Up @@ -289,6 +296,7 @@ impl StateComputer for ExecutionProxy {
payload_manager: Arc<PayloadManager>,
transaction_shuffler: Arc<dyn TransactionShuffler>,
_block_gas_limit: Option<u64>,
transaction_deduper: Arc<dyn TransactionDeduper>,
) {
*self.validators.lock() = epoch_state
.verifier
Expand All @@ -301,6 +309,7 @@ impl StateComputer for ExecutionProxy {
// TODO: Temporarily disable initializing block gas limit and leave it as default None,
// until there is a better way to handle the possible panic when executor is initialized.
// self.executor.update_block_gas_limit(block_gas_limit);
self.transaction_deduper.lock().replace(transaction_deduper);
}

// Clears the epoch-specific state. Only a sync_to call is expected before calling new_epoch
Expand All @@ -313,11 +322,17 @@ impl StateComputer for ExecutionProxy {

#[tokio::test]
async fn test_commit_sync_race() {
use crate::{error::MempoolError, transaction_shuffler::create_transaction_shuffler};
use crate::{
error::MempoolError, transaction_deduper::create_transaction_deduper,
transaction_shuffler::create_transaction_shuffler,
};
use aptos_consensus_notifications::Error;
use aptos_types::{
aggregate_signature::AggregateSignature, block_info::BlockInfo, ledger_info::LedgerInfo,
on_chain_config::TransactionShufflerType, transaction::SignedTransaction,
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
ledger_info::LedgerInfo,
on_chain_config::{TransactionDeduperType, TransactionShufflerType},
transaction::SignedTransaction,
};

struct RecordedCommit {
Expand Down Expand Up @@ -424,6 +439,7 @@ async fn test_commit_sync_race() {
Arc::new(PayloadManager::DirectMempool),
create_transaction_shuffler(TransactionShufflerType::NoShuffling),
None,
create_transaction_deduper(TransactionDeduperType::NoDedup),
);
executor
.commit(&[], generate_li(1, 1), callback.clone())
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/state_replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::{
error::{QuorumStoreError, StateSyncError},
payload_manager::PayloadManager,
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
};
use anyhow::Result;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub trait StateComputer: Send + Sync {
payload_manager: Arc<PayloadManager>,
transaction_shuffler: Arc<dyn TransactionShuffler>,
block_gas_limit: Option<u64>,
transaction_deduper: Arc<dyn TransactionDeduper>,
);

// Reconfigure to clear epoch state at end of epoch.
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/test_utils/mock_state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
payload_manager::PayloadManager,
state_replication::{StateComputer, StateComputerCommitCallBackType},
test_utils::mock_storage::MockStorage,
transaction_deduper::TransactionDeduper,
transaction_shuffler::TransactionShuffler,
};
use anyhow::{format_err, Result};
Expand Down Expand Up @@ -139,6 +140,7 @@ impl StateComputer for MockStateComputer {
_: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down Expand Up @@ -176,6 +178,7 @@ impl StateComputer for EmptyStateComputer {
_: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down Expand Up @@ -237,6 +240,7 @@ impl StateComputer for RandomComputeResultStateComputer {
_: Arc<PayloadManager>,
_: Arc<dyn TransactionShuffler>,
_: Option<u64>,
_: Arc<dyn TransactionDeduper>,
) {
}

Expand Down
33 changes: 33 additions & 0 deletions consensus/src/transaction_deduper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::txn_hash_and_authenticator_deduper::TxnHashAndAuthenticatorDeduper;
use aptos_logger::info;
use aptos_types::{on_chain_config::TransactionDeduperType, transaction::SignedTransaction};
use std::sync::Arc;

/// Interface to dedup transactions
pub trait TransactionDeduper: Send + Sync {
fn dedup(&self, txns: Vec<SignedTransaction>) -> Vec<SignedTransaction>;
}

/// No Op Deduper to maintain backward compatibility
pub struct NoOpDeduper {}

impl TransactionDeduper for NoOpDeduper {
fn dedup(&self, txns: Vec<SignedTransaction>) -> Vec<SignedTransaction> {
txns
}
}

pub fn create_transaction_deduper(
deduper_type: TransactionDeduperType,
) -> Arc<dyn TransactionDeduper> {
match deduper_type {
TransactionDeduperType::NoDedup => Arc::new(NoOpDeduper {}),
TransactionDeduperType::TxnHashAndAuthenticatorV1 => {
info!("Using simple hash set transaction deduper");
Arc::new(TxnHashAndAuthenticatorDeduper::new())
},
}
}
Loading