From 75ca612fe053c091631adba2cc4a3fd7e5db690d Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Wed, 31 May 2023 10:49:03 +0900 Subject: [PATCH] [Execution] Add TransactionDeduper trait and implementation using (hash, signature) (#8367) ### Description Adding TransactionDeduper trait and an implementation. The dedup is done within a block, just before transaction shuffle. It's guarded by an onchain config. The purpose is to not send duplicate transactions to execution. While execution correctness is not affected by duplicates (and other work removed false error logs that fired due to them), it's possible that duplicate transactions could hurt throughput of parallel execution. By deduping ahead of time, we don't have to worry about that. The implementation finds duplicates by matching (raw txn bcs hash, signature). Because calculating hash can be relatively expensive, it is only done when a shallow match is found of (account, seq_no), and it's done in parallel. Overhead (as seen on forge): * When there are no duplicates, the dedup is negligible -- it takes ~2ms per second. * When there are ~100 duplicates per block, the dedup takes ~10ms per second. ### Test Plan Added unit tests. --- Cargo.lock | 2 + aptos-move/aptos-vm/src/block_executor/mod.rs | 1 + consensus/Cargo.toml | 2 + consensus/src/counters.rs | 22 +- consensus/src/epoch_manager.rs | 4 + .../experimental/ordering_state_computer.rs | 2 + consensus/src/lib.rs | 2 + consensus/src/state_computer.rs | 26 +- consensus/src/state_replication.rs | 2 + .../src/test_utils/mock_state_computer.rs | 4 + consensus/src/transaction_deduper.rs | 33 ++ .../src/txn_hash_and_authenticator_deduper.rs | 362 ++++++++++++++++++ types/src/on_chain_config/execution_config.rs | 36 ++ types/src/on_chain_config/mod.rs | 3 +- types/src/transaction/mod.rs | 8 + 15 files changed, 502 insertions(+), 7 deletions(-) create mode 100644 consensus/src/transaction_deduper.rs create mode 100644 consensus/src/txn_hash_and_authenticator_deduper.rs diff --git a/Cargo.lock b/Cargo.lock index 0adcbf48d8d80d..ba6a4ab66ba74d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -570,6 +570,7 @@ dependencies = [ "anyhow", "aptos-bitvec", "aptos-bounded-executor", + "aptos-cached-packages", "aptos-channels", "aptos-config", "aptos-consensus-notifications", @@ -618,6 +619,7 @@ dependencies = [ "once_cell", "proptest", "rand 0.7.3", + "rayon", "serde 1.0.149", "serde_bytes", "serde_json", diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 358b58244fbf4b..da485cd6b1ff01 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -147,6 +147,7 @@ impl BlockAptosVM { // Verify the signatures of all the transactions in parallel. // This is time consuming so don't wait and do the checking // sequentially while executing the transactions. + // TODO: state sync runs this code but doesn't need to verify signatures let signature_verification_timer = BLOCK_EXECUTOR_SIGNATURE_VERIFICATION_SECONDS.start_timer(); let signature_verified_block: Vec = diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index ee58103d1dae19..6c14f6baa1e563 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -60,6 +60,7 @@ num-derive = { workspace = true } num-traits = { workspace = true } once_cell = { workspace = true } rand = { workspace = true } +rayon = { workspace = true } serde = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } @@ -68,6 +69,7 @@ tokio = { workspace = true } tokio-metrics = { workspace = true } [dev-dependencies] +aptos-cached-packages = { workspace = true } aptos-config = { workspace = true, features = ["fuzzing"] } aptos-consensus-types = { workspace = true, features = ["fuzzing"] } aptos-executor-test-helpers = { workspace = true } diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 0aa18617cf7e69..90a6cd0c687d52 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -122,12 +122,32 @@ pub static TXN_SHUFFLE_SECONDS: Lazy = Lazy::new(|| { // metric name "aptos_execution_transaction_shuffle_seconds", // metric description - "The time spent in seconds in initializing the VM in the block executor", + "The time spent in seconds in shuffle of transactions", exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(), ) .unwrap() }); +/// Transaction dedup call latency +pub static TXN_DEDUP_SECONDS: Lazy = Lazy::new(|| { + register_histogram!( + // metric name + "aptos_execution_transaction_dedup_seconds", + // metric description + "The time spent in seconds in dedup of transaction", + exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(), + ) + .unwrap() +}); + +/// Transaction dedup number of filtered +pub static TXN_DEDUP_FILTERED: Lazy = Lazy::new(|| { + register_avg_counter( + "aptos_execution_transaction_dedup_filtered", + "The number of duplicates filtered per block", + ) +}); + /// Number of rounds we were collecting votes for proposer /// (similar to PROPOSALS_COUNT, but can be larger, if we failed in creating/sending of the proposal) pub static PROPOSER_COLLECTED_ROUND_COUNT: Lazy = Lazy::new(|| { diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index c90a20aecf711a..e97b413283f571 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -46,6 +46,7 @@ use crate::{ recovery_manager::RecoveryManager, round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent}, state_replication::StateComputer, + transaction_deduper::create_transaction_deduper, transaction_shuffler::create_transaction_shuffler, util::time_service::TimeService, }; @@ -681,6 +682,8 @@ impl EpochManager { let transaction_shuffler = create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type()); let block_gas_limit = onchain_execution_config.block_gas_limit(); + let transaction_deduper = + create_transaction_deduper(onchain_execution_config.transaction_deduper_type()); self.quorum_store_msg_tx = quorum_store_msg_tx; let payload_client = QuorumStoreClient::new( @@ -694,6 +697,7 @@ impl EpochManager { payload_manager.clone(), transaction_shuffler, block_gas_limit, + transaction_deduper, ); let state_computer = if onchain_consensus_config.decoupled_execution() { Arc::new(self.spawn_decoupled_execution( diff --git a/consensus/src/experimental/ordering_state_computer.rs b/consensus/src/experimental/ordering_state_computer.rs index 7fb5f63e4c50ff..a71dbcf5917c80 100644 --- a/consensus/src/experimental/ordering_state_computer.rs +++ b/consensus/src/experimental/ordering_state_computer.rs @@ -10,6 +10,7 @@ use crate::{ }, payload_manager::PayloadManager, state_replication::{StateComputer, StateComputerCommitCallBackType}, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, }; use anyhow::Result; @@ -126,6 +127,7 @@ impl StateComputer for OrderingStateComputer { _payload_manager: Arc, _: Arc, _: Option, + _: Arc, ) { } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 455bb42c3e3fb1..88e735374cb097 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -49,7 +49,9 @@ pub mod counters; pub mod network_interface; mod payload_manager; mod sender_aware_shuffler; +mod transaction_deduper; mod transaction_shuffler; +mod txn_hash_and_authenticator_deduper; use aptos_metrics_core::IntGauge; pub use consensusdb::create_checkpoint; diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 1a8028651cf438..eab828da754d41 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -9,6 +9,7 @@ use crate::{ monitor, payload_manager::PayloadManager, state_replication::{StateComputer, StateComputerCommitCallBackType}, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, txn_notifier::TxnNotifier, }; @@ -57,6 +58,7 @@ pub struct ExecutionProxy { write_mutex: AsyncMutex, payload_manager: Mutex>>, transaction_shuffler: Mutex>>, + transaction_deduper: Mutex>>, } impl ExecutionProxy { @@ -90,6 +92,7 @@ impl ExecutionProxy { write_mutex: AsyncMutex::new(LogicalTime::new(0, 0)), payload_manager: Mutex::new(None), transaction_shuffler: Mutex::new(None), + transaction_deduper: Mutex::new(None), } } } @@ -117,10 +120,12 @@ impl StateComputer for ExecutionProxy { ); let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone(); + let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone(); let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone(); let txns = payload_manager.get_transactions(block).await?; - let shuffled_txns = txn_shuffler.shuffle(txns); + let deduped_txns = txn_deduper.dedup(txns); + let shuffled_txns = txn_shuffler.shuffle(deduped_txns); let block_gas_limit = self.executor.get_block_gas_limit(); @@ -177,6 +182,7 @@ impl StateComputer for ExecutionProxy { let block_timestamp = finality_proof.commit_info().timestamp_usecs(); let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone(); + let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone(); let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone(); let block_gas_limit = self.executor.get_block_gas_limit(); @@ -189,7 +195,8 @@ impl StateComputer for ExecutionProxy { } let signed_txns = payload_manager.get_transactions(block.block()).await?; - let shuffled_txns = txn_shuffler.shuffle(signed_txns); + let deduped_txns = txn_deduper.dedup(signed_txns); + let shuffled_txns = txn_shuffler.shuffle(deduped_txns); txns.extend(block.transactions_to_commit( &self.validators.lock(), @@ -289,6 +296,7 @@ impl StateComputer for ExecutionProxy { payload_manager: Arc, transaction_shuffler: Arc, _block_gas_limit: Option, + transaction_deduper: Arc, ) { *self.validators.lock() = epoch_state .verifier @@ -301,6 +309,7 @@ impl StateComputer for ExecutionProxy { // TODO: Temporarily disable initializing block gas limit and leave it as default None, // until there is a better way to handle the possible panic when executor is initialized. // self.executor.update_block_gas_limit(block_gas_limit); + self.transaction_deduper.lock().replace(transaction_deduper); } // Clears the epoch-specific state. Only a sync_to call is expected before calling new_epoch @@ -313,11 +322,17 @@ impl StateComputer for ExecutionProxy { #[tokio::test] async fn test_commit_sync_race() { - use crate::{error::MempoolError, transaction_shuffler::create_transaction_shuffler}; + use crate::{ + error::MempoolError, transaction_deduper::create_transaction_deduper, + transaction_shuffler::create_transaction_shuffler, + }; use aptos_consensus_notifications::Error; use aptos_types::{ - aggregate_signature::AggregateSignature, block_info::BlockInfo, ledger_info::LedgerInfo, - on_chain_config::TransactionShufflerType, transaction::SignedTransaction, + aggregate_signature::AggregateSignature, + block_info::BlockInfo, + ledger_info::LedgerInfo, + on_chain_config::{TransactionDeduperType, TransactionShufflerType}, + transaction::SignedTransaction, }; struct RecordedCommit { @@ -424,6 +439,7 @@ async fn test_commit_sync_race() { Arc::new(PayloadManager::DirectMempool), create_transaction_shuffler(TransactionShufflerType::NoShuffling), None, + create_transaction_deduper(TransactionDeduperType::NoDedup), ); executor .commit(&[], generate_li(1, 1), callback.clone()) diff --git a/consensus/src/state_replication.rs b/consensus/src/state_replication.rs index 8af7c35fed81db..55ea3400f85761 100644 --- a/consensus/src/state_replication.rs +++ b/consensus/src/state_replication.rs @@ -5,6 +5,7 @@ use crate::{ error::{QuorumStoreError, StateSyncError}, payload_manager::PayloadManager, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, }; use anyhow::Result; @@ -78,6 +79,7 @@ pub trait StateComputer: Send + Sync { payload_manager: Arc, transaction_shuffler: Arc, block_gas_limit: Option, + transaction_deduper: Arc, ); // Reconfigure to clear epoch state at end of epoch. diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs index eaac348fed730a..b03e90f60d7b75 100644 --- a/consensus/src/test_utils/mock_state_computer.rs +++ b/consensus/src/test_utils/mock_state_computer.rs @@ -8,6 +8,7 @@ use crate::{ payload_manager::PayloadManager, state_replication::{StateComputer, StateComputerCommitCallBackType}, test_utils::mock_storage::MockStorage, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, }; use anyhow::{format_err, Result}; @@ -139,6 +140,7 @@ impl StateComputer for MockStateComputer { _: Arc, _: Arc, _: Option, + _: Arc, ) { } @@ -176,6 +178,7 @@ impl StateComputer for EmptyStateComputer { _: Arc, _: Arc, _: Option, + _: Arc, ) { } @@ -237,6 +240,7 @@ impl StateComputer for RandomComputeResultStateComputer { _: Arc, _: Arc, _: Option, + _: Arc, ) { } diff --git a/consensus/src/transaction_deduper.rs b/consensus/src/transaction_deduper.rs new file mode 100644 index 00000000000000..24d76f0dd332ff --- /dev/null +++ b/consensus/src/transaction_deduper.rs @@ -0,0 +1,33 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::txn_hash_and_authenticator_deduper::TxnHashAndAuthenticatorDeduper; +use aptos_logger::info; +use aptos_types::{on_chain_config::TransactionDeduperType, transaction::SignedTransaction}; +use std::sync::Arc; + +/// Interface to dedup transactions. The dedup filters duplicate transactions within a block. +pub trait TransactionDeduper: Send + Sync { + fn dedup(&self, txns: Vec) -> Vec; +} + +/// No Op Deduper to maintain backward compatibility +struct NoOpDeduper {} + +impl TransactionDeduper for NoOpDeduper { + fn dedup(&self, txns: Vec) -> Vec { + txns + } +} + +pub fn create_transaction_deduper( + deduper_type: TransactionDeduperType, +) -> Arc { + match deduper_type { + TransactionDeduperType::NoDedup => Arc::new(NoOpDeduper {}), + TransactionDeduperType::TxnHashAndAuthenticatorV1 => { + info!("Using simple hash set transaction deduper"); + Arc::new(TxnHashAndAuthenticatorDeduper::new()) + }, + } +} diff --git a/consensus/src/txn_hash_and_authenticator_deduper.rs b/consensus/src/txn_hash_and_authenticator_deduper.rs new file mode 100644 index 00000000000000..38ec5aeef421b9 --- /dev/null +++ b/consensus/src/txn_hash_and_authenticator_deduper.rs @@ -0,0 +1,362 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + counters::{TXN_DEDUP_FILTERED, TXN_DEDUP_SECONDS}, + transaction_deduper::TransactionDeduper, +}; +use aptos_types::transaction::SignedTransaction; +use rayon::prelude::*; +use std::collections::{HashMap, HashSet}; + +/// An implementation of TransactionDeduper. Duplicate filtering is done using the pair +/// (raw_txn.hash(), authenticator). Both the hash and signature are required because dedup +/// happens before signatures are verified and transaction prologue is checked. (So, e.g., a bad +/// transaction could contain a txn and signature that are unrelated.) If the checks are done +/// beforehand only one of the txn hash or signature would be required. +/// +/// The implementation is written to avoid and/or parallelize the most expensive operations. Below +/// are the steps: +/// 1. Mark possible duplicates (sequential): Using a helper HashMap, mark transactions with 2+ +/// (sender, seq_no) pairs as possible duplicates. If no possible duplicates, return the original +/// transactions. +/// 2. Calculate txn hashes (parallel): For all possible duplicates, calculate the txn hash. This +/// is an expensive operation. +/// 3. Filter duplicates (sequential): Using a helper HashSet with the txn hashes calculated above +/// and signatures, filter actual duplicate transactions. +/// +/// Possible future optimizations: +/// a. Note the possible duplicates in Step 1 are independent of each other, so they could be +/// grouped independently and run in parallel in Step 3. +/// b. Txn hashes are calculated at many places within a validator. A per-txn hash cache could speed +/// up dedup or later operations. +/// c. If signature verification is moved to before dedup, then only the signature has to be matched +/// for duplicates and not the hash. +pub(crate) struct TxnHashAndAuthenticatorDeduper {} + +impl TransactionDeduper for TxnHashAndAuthenticatorDeduper { + fn dedup(&self, transactions: Vec) -> Vec { + let _timer = TXN_DEDUP_SECONDS.start_timer(); + let mut seen = HashMap::new(); + let mut is_possible_duplicate = false; + let mut possible_duplicates = vec![false; transactions.len()]; + for (i, txn) in transactions.iter().enumerate() { + match seen.get(&(txn.sender(), txn.sequence_number())) { + None => { + seen.insert((txn.sender(), txn.sequence_number()), i); + }, + Some(first_index) => { + is_possible_duplicate = true; + possible_duplicates[*first_index] = true; + possible_duplicates[i] = true; + }, + } + } + if !is_possible_duplicate { + TXN_DEDUP_FILTERED.observe(0 as f64); + return transactions; + } + + let hash_and_authenticators: Vec<_> = possible_duplicates + .into_par_iter() + .zip(&transactions) + .with_min_len(25) + .map(|(need_hash, txn)| match need_hash { + true => Some((txn.clone().committed_hash(), txn.authenticator())), + false => None, + }) + .collect(); + + // TODO: Possibly parallelize. See struct comment. + let mut seen_hashes = HashSet::new(); + let mut num_duplicates: usize = 0; + let filtered: Vec<_> = hash_and_authenticators + .into_iter() + .zip(transactions) + .filter_map(|(maybe_hash, txn)| match maybe_hash { + None => Some(txn), + Some(hash_and_authenticator) => { + if seen_hashes.insert(hash_and_authenticator) { + Some(txn) + } else { + num_duplicates += 1; + None + } + }, + }) + .collect(); + + TXN_DEDUP_FILTERED.observe(num_duplicates as f64); + filtered + } +} + +impl TxnHashAndAuthenticatorDeduper { + pub fn new() -> Self { + Self {} + } +} + +#[cfg(test)] +mod tests { + use crate::{ + transaction_deduper::TransactionDeduper, + txn_hash_and_authenticator_deduper::TxnHashAndAuthenticatorDeduper, + }; + use aptos_cached_packages::aptos_stdlib; + use aptos_crypto::ed25519::{Ed25519PrivateKey, Ed25519PublicKey}; + use aptos_keygen::KeyGen; + use aptos_types::{ + chain_id::ChainId, + transaction::{RawTransaction, Script, SignedTransaction, TransactionPayload}, + }; + use move_core_types::account_address::AccountAddress; + use std::time::Instant; + + struct Account { + addr: AccountAddress, + /// The current private key for this account. + pub privkey: Ed25519PrivateKey, + /// The current public key for this account. + pub pubkey: Ed25519PublicKey, + } + + impl Account { + pub fn new() -> Self { + let (privkey, pubkey) = KeyGen::from_os_rng().generate_ed25519_keypair(); + Self::with_keypair(privkey, pubkey) + } + + pub fn with_keypair(privkey: Ed25519PrivateKey, pubkey: Ed25519PublicKey) -> Self { + let addr = aptos_types::account_address::from_public_key(&pubkey); + Account { + addr, + privkey, + pubkey, + } + } + } + + fn raw_txn( + payload: TransactionPayload, + sender: AccountAddress, + seq_num: u64, + gas_unit_price: u64, + ) -> RawTransaction { + RawTransaction::new( + sender, + seq_num, + payload, + 500_000, + gas_unit_price, + 0, + ChainId::new(10), + ) + } + + fn empty_txn(sender: AccountAddress, seq_num: u64, gas_unit_price: u64) -> RawTransaction { + let payload = TransactionPayload::Script(Script::new(vec![], vec![], vec![])); + raw_txn(payload, sender, seq_num, gas_unit_price) + } + + fn peer_to_peer_txn( + sender: AccountAddress, + receiver: AccountAddress, + seq_num: u64, + gas_unit_price: u64, + ) -> RawTransaction { + let payload = aptos_stdlib::aptos_coin_transfer(receiver, 1); + raw_txn(payload, sender, seq_num, gas_unit_price) + } + + fn block(refs: Vec<&SignedTransaction>) -> Vec { + refs.into_iter().cloned().collect() + } + + #[test] + fn test_single_txn() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txn = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns = vec![txn]; + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + } + + #[test] + fn test_single_duplicate() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txn = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns = block(vec![&txn, &txn]); + let expected = block(vec![&txn]); + let deduped_txns = deduper.dedup(txns); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + } + + #[test] + fn test_repeated_sequence_number() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let receiver = Account::new(); + + let txn_0a = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner(); + // Different txn, same sender and sequence number. Should not be filtered. + let txn_0b = peer_to_peer_txn(sender.addr, receiver.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns = block(vec![&txn_0a, &txn_0b, &txn_0a]); + let expected = block(vec![&txn_0a, &txn_0b]); + let deduped_txns = deduper.dedup(txns); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + } + + #[test] + fn test_bad_signer() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let bad_signer = Account::new(); + + // Txn signed by a bad signer (not the sender) + let txn_0a = empty_txn(sender.addr, 0, 100) + .sign(&bad_signer.privkey, bad_signer.pubkey.clone()) + .unwrap() + .into_inner(); + // Same txn, but signed by the correct signer (sender). Should not be filtered. + let txn_0b = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner(); + let txns = block(vec![&txn_0a, &txn_0b]); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + } + + // The perf tests are simple micro-benchmarks and just output results without checking for regressions + static PERF_TXN_PER_BLOCK: usize = 10_000; + + fn measure_dedup_time( + deduper: TxnHashAndAuthenticatorDeduper, + txns: Vec, + ) -> f64 { + let start = Instant::now(); + let mut iterations = 0; + loop { + deduper.dedup(txns.clone()); + iterations += 1; + if iterations % 100 == 0 && start.elapsed().as_millis() > 2000 { + break; + } + } + let elapsed = start.elapsed(); + println!( + "elapsed: {}, iterations: {}, time per iteration: {}", + elapsed.as_secs_f64(), + iterations, + elapsed.as_secs_f64() / iterations as f64, + ); + elapsed.as_secs_f64() / iterations as f64 + } + + #[test] + fn test_performance_unique_empty_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txns: Vec<_> = (0..PERF_TXN_PER_BLOCK) + .into_iter() + .map(|i| { + empty_txn(sender.addr, i as u64, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner() + }) + .collect(); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + + measure_dedup_time(deduper, txns); + } + + #[test] + fn test_performance_duplicate_empty_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txn = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns: Vec<_> = std::iter::repeat(txn.clone()) + .take(PERF_TXN_PER_BLOCK) + .collect(); + let expected = block(vec![&txn]); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + + measure_dedup_time(deduper, txns); + } + + #[test] + fn test_performance_unique_p2p_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let receiver = Account::new(); + let txns: Vec<_> = (0..PERF_TXN_PER_BLOCK) + .into_iter() + .map(|i| { + peer_to_peer_txn(sender.addr, receiver.addr, i as u64, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner() + }) + .collect(); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + + measure_dedup_time(deduper, txns); + } + + #[test] + fn test_performance_duplicate_p2p_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let receiver = Account::new(); + let txn = peer_to_peer_txn(sender.addr, receiver.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns: Vec<_> = std::iter::repeat(txn.clone()) + .take(PERF_TXN_PER_BLOCK) + .collect(); + let expected = block(vec![&txn]); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + + measure_dedup_time(deduper, txns); + } +} diff --git a/types/src/on_chain_config/execution_config.rs b/types/src/on_chain_config/execution_config.rs index 3b031ee39f40d5..642bf469d35024 100644 --- a/types/src/on_chain_config/execution_config.rs +++ b/types/src/on_chain_config/execution_config.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; pub enum OnChainExecutionConfig { V1(ExecutionConfigV1), V2(ExecutionConfigV2), + V3(ExecutionConfigV3), } /// The public interface that exposes all values with safe fallback. @@ -19,6 +20,7 @@ impl OnChainExecutionConfig { match &self { OnChainExecutionConfig::V1(config) => config.transaction_shuffler_type.clone(), OnChainExecutionConfig::V2(config) => config.transaction_shuffler_type.clone(), + OnChainExecutionConfig::V3(config) => config.transaction_shuffler_type.clone(), } } @@ -27,6 +29,16 @@ impl OnChainExecutionConfig { match &self { OnChainExecutionConfig::V1(_config) => None, OnChainExecutionConfig::V2(config) => config.block_gas_limit, + OnChainExecutionConfig::V3(config) => config.block_gas_limit, + } + } + + /// The type of the transaction deduper being used. + pub fn transaction_deduper_type(&self) -> TransactionDeduperType { + match &self { + OnChainExecutionConfig::V1(_config) => TransactionDeduperType::NoDedup, + OnChainExecutionConfig::V2(_config) => TransactionDeduperType::NoDedup, + OnChainExecutionConfig::V3(config) => config.transaction_deduper_type.clone(), } } } @@ -84,6 +96,23 @@ impl Default for ExecutionConfigV2 { } } +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct ExecutionConfigV3 { + pub transaction_shuffler_type: TransactionShufflerType, + pub block_gas_limit: Option, + pub transaction_deduper_type: TransactionDeduperType, +} + +impl Default for ExecutionConfigV3 { + fn default() -> Self { + Self { + transaction_shuffler_type: TransactionShufflerType::NoShuffling, + block_gas_limit: None, + transaction_deduper_type: TransactionDeduperType::NoDedup, + } + } +} + #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it pub enum TransactionShufflerType { @@ -91,6 +120,13 @@ pub enum TransactionShufflerType { SenderAwareV1(u32), } +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it +pub enum TransactionDeduperType { + NoDedup, + TxnHashAndAuthenticatorV1, +} + #[cfg(test)] mod test { use super::*; diff --git a/types/src/on_chain_config/mod.rs b/types/src/on_chain_config/mod.rs index 10f39d1dbd5a9b..e319a42ba47626 100644 --- a/types/src/on_chain_config/mod.rs +++ b/types/src/on_chain_config/mod.rs @@ -40,7 +40,8 @@ pub use self::{ ProposerElectionType, }, execution_config::{ - ExecutionConfigV1, ExecutionConfigV2, OnChainExecutionConfig, TransactionShufflerType, + ExecutionConfigV1, ExecutionConfigV2, OnChainExecutionConfig, TransactionDeduperType, + TransactionShufflerType, }, gas_schedule::{GasSchedule, GasScheduleV2, StorageGasSchedule}, timed_features::{TimedFeatureFlag, TimedFeatureOverride, TimedFeatures}, diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index 1d14060e4bbbb8..67605578c14969 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -555,6 +555,10 @@ impl SignedTransaction { self.authenticator.clone() } + pub fn authenticator_ref(&self) -> &TransactionAuthenticator { + &self.authenticator + } + pub fn sender(&self) -> AccountAddress { self.raw_txn.sender } @@ -563,6 +567,10 @@ impl SignedTransaction { self.raw_txn } + pub fn raw_transaction_ref(&self) -> &RawTransaction { + &self.raw_txn + } + pub fn sequence_number(&self) -> u64 { self.raw_txn.sequence_number }