diff --git a/Cargo.lock b/Cargo.lock index 59260b6b179ab..c6b3c015805df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -570,6 +570,7 @@ dependencies = [ "anyhow", "aptos-bitvec", "aptos-bounded-executor", + "aptos-cached-packages", "aptos-channels", "aptos-config", "aptos-consensus-notifications", @@ -618,6 +619,7 @@ dependencies = [ "once_cell", "proptest", "rand 0.7.3", + "rayon", "serde 1.0.149", "serde_bytes", "serde_json", diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 358b58244fbf4..da485cd6b1ff0 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -147,6 +147,7 @@ impl BlockAptosVM { // Verify the signatures of all the transactions in parallel. // This is time consuming so don't wait and do the checking // sequentially while executing the transactions. + // TODO: state sync runs this code but doesn't need to verify signatures let signature_verification_timer = BLOCK_EXECUTOR_SIGNATURE_VERIFICATION_SECONDS.start_timer(); let signature_verified_block: Vec = diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index ee58103d1dae1..6c14f6baa1e56 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -60,6 +60,7 @@ num-derive = { workspace = true } num-traits = { workspace = true } once_cell = { workspace = true } rand = { workspace = true } +rayon = { workspace = true } serde = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } @@ -68,6 +69,7 @@ tokio = { workspace = true } tokio-metrics = { workspace = true } [dev-dependencies] +aptos-cached-packages = { workspace = true } aptos-config = { workspace = true, features = ["fuzzing"] } aptos-consensus-types = { workspace = true, features = ["fuzzing"] } aptos-executor-test-helpers = { workspace = true } diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 0aa18617cf7e6..90a6cd0c687d5 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -122,12 +122,32 @@ pub static TXN_SHUFFLE_SECONDS: Lazy = Lazy::new(|| { // metric name "aptos_execution_transaction_shuffle_seconds", // metric description - "The time spent in seconds in initializing the VM in the block executor", + "The time spent in seconds in shuffle of transactions", exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(), ) .unwrap() }); +/// Transaction dedup call latency +pub static TXN_DEDUP_SECONDS: Lazy = Lazy::new(|| { + register_histogram!( + // metric name + "aptos_execution_transaction_dedup_seconds", + // metric description + "The time spent in seconds in dedup of transaction", + exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(), + ) + .unwrap() +}); + +/// Transaction dedup number of filtered +pub static TXN_DEDUP_FILTERED: Lazy = Lazy::new(|| { + register_avg_counter( + "aptos_execution_transaction_dedup_filtered", + "The number of duplicates filtered per block", + ) +}); + /// Number of rounds we were collecting votes for proposer /// (similar to PROPOSALS_COUNT, but can be larger, if we failed in creating/sending of the proposal) pub static PROPOSER_COLLECTED_ROUND_COUNT: Lazy = Lazy::new(|| { diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index c90a20aecf711..e97b413283f57 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -46,6 +46,7 @@ use crate::{ recovery_manager::RecoveryManager, round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent}, state_replication::StateComputer, + transaction_deduper::create_transaction_deduper, transaction_shuffler::create_transaction_shuffler, util::time_service::TimeService, }; @@ -681,6 +682,8 @@ impl EpochManager { let transaction_shuffler = create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type()); let block_gas_limit = onchain_execution_config.block_gas_limit(); + let transaction_deduper = + create_transaction_deduper(onchain_execution_config.transaction_deduper_type()); self.quorum_store_msg_tx = quorum_store_msg_tx; let payload_client = QuorumStoreClient::new( @@ -694,6 +697,7 @@ impl EpochManager { payload_manager.clone(), transaction_shuffler, block_gas_limit, + transaction_deduper, ); let state_computer = if onchain_consensus_config.decoupled_execution() { Arc::new(self.spawn_decoupled_execution( diff --git a/consensus/src/experimental/ordering_state_computer.rs b/consensus/src/experimental/ordering_state_computer.rs index 7fb5f63e4c50f..a71dbcf5917c8 100644 --- a/consensus/src/experimental/ordering_state_computer.rs +++ b/consensus/src/experimental/ordering_state_computer.rs @@ -10,6 +10,7 @@ use crate::{ }, payload_manager::PayloadManager, state_replication::{StateComputer, StateComputerCommitCallBackType}, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, }; use anyhow::Result; @@ -126,6 +127,7 @@ impl StateComputer for OrderingStateComputer { _payload_manager: Arc, _: Arc, _: Option, + _: Arc, ) { } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 455bb42c3e3fb..88e735374cb09 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -49,7 +49,9 @@ pub mod counters; pub mod network_interface; mod payload_manager; mod sender_aware_shuffler; +mod transaction_deduper; mod transaction_shuffler; +mod txn_hash_and_authenticator_deduper; use aptos_metrics_core::IntGauge; pub use consensusdb::create_checkpoint; diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 1a8028651cf43..eab828da754d4 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -9,6 +9,7 @@ use crate::{ monitor, payload_manager::PayloadManager, state_replication::{StateComputer, StateComputerCommitCallBackType}, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, txn_notifier::TxnNotifier, }; @@ -57,6 +58,7 @@ pub struct ExecutionProxy { write_mutex: AsyncMutex, payload_manager: Mutex>>, transaction_shuffler: Mutex>>, + transaction_deduper: Mutex>>, } impl ExecutionProxy { @@ -90,6 +92,7 @@ impl ExecutionProxy { write_mutex: AsyncMutex::new(LogicalTime::new(0, 0)), payload_manager: Mutex::new(None), transaction_shuffler: Mutex::new(None), + transaction_deduper: Mutex::new(None), } } } @@ -117,10 +120,12 @@ impl StateComputer for ExecutionProxy { ); let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone(); + let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone(); let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone(); let txns = payload_manager.get_transactions(block).await?; - let shuffled_txns = txn_shuffler.shuffle(txns); + let deduped_txns = txn_deduper.dedup(txns); + let shuffled_txns = txn_shuffler.shuffle(deduped_txns); let block_gas_limit = self.executor.get_block_gas_limit(); @@ -177,6 +182,7 @@ impl StateComputer for ExecutionProxy { let block_timestamp = finality_proof.commit_info().timestamp_usecs(); let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone(); + let txn_deduper = self.transaction_deduper.lock().as_ref().unwrap().clone(); let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone(); let block_gas_limit = self.executor.get_block_gas_limit(); @@ -189,7 +195,8 @@ impl StateComputer for ExecutionProxy { } let signed_txns = payload_manager.get_transactions(block.block()).await?; - let shuffled_txns = txn_shuffler.shuffle(signed_txns); + let deduped_txns = txn_deduper.dedup(signed_txns); + let shuffled_txns = txn_shuffler.shuffle(deduped_txns); txns.extend(block.transactions_to_commit( &self.validators.lock(), @@ -289,6 +296,7 @@ impl StateComputer for ExecutionProxy { payload_manager: Arc, transaction_shuffler: Arc, _block_gas_limit: Option, + transaction_deduper: Arc, ) { *self.validators.lock() = epoch_state .verifier @@ -301,6 +309,7 @@ impl StateComputer for ExecutionProxy { // TODO: Temporarily disable initializing block gas limit and leave it as default None, // until there is a better way to handle the possible panic when executor is initialized. // self.executor.update_block_gas_limit(block_gas_limit); + self.transaction_deduper.lock().replace(transaction_deduper); } // Clears the epoch-specific state. Only a sync_to call is expected before calling new_epoch @@ -313,11 +322,17 @@ impl StateComputer for ExecutionProxy { #[tokio::test] async fn test_commit_sync_race() { - use crate::{error::MempoolError, transaction_shuffler::create_transaction_shuffler}; + use crate::{ + error::MempoolError, transaction_deduper::create_transaction_deduper, + transaction_shuffler::create_transaction_shuffler, + }; use aptos_consensus_notifications::Error; use aptos_types::{ - aggregate_signature::AggregateSignature, block_info::BlockInfo, ledger_info::LedgerInfo, - on_chain_config::TransactionShufflerType, transaction::SignedTransaction, + aggregate_signature::AggregateSignature, + block_info::BlockInfo, + ledger_info::LedgerInfo, + on_chain_config::{TransactionDeduperType, TransactionShufflerType}, + transaction::SignedTransaction, }; struct RecordedCommit { @@ -424,6 +439,7 @@ async fn test_commit_sync_race() { Arc::new(PayloadManager::DirectMempool), create_transaction_shuffler(TransactionShufflerType::NoShuffling), None, + create_transaction_deduper(TransactionDeduperType::NoDedup), ); executor .commit(&[], generate_li(1, 1), callback.clone()) diff --git a/consensus/src/state_replication.rs b/consensus/src/state_replication.rs index 8af7c35fed81d..55ea3400f8576 100644 --- a/consensus/src/state_replication.rs +++ b/consensus/src/state_replication.rs @@ -5,6 +5,7 @@ use crate::{ error::{QuorumStoreError, StateSyncError}, payload_manager::PayloadManager, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, }; use anyhow::Result; @@ -78,6 +79,7 @@ pub trait StateComputer: Send + Sync { payload_manager: Arc, transaction_shuffler: Arc, block_gas_limit: Option, + transaction_deduper: Arc, ); // Reconfigure to clear epoch state at end of epoch. diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs index eaac348fed730..b03e90f60d7b7 100644 --- a/consensus/src/test_utils/mock_state_computer.rs +++ b/consensus/src/test_utils/mock_state_computer.rs @@ -8,6 +8,7 @@ use crate::{ payload_manager::PayloadManager, state_replication::{StateComputer, StateComputerCommitCallBackType}, test_utils::mock_storage::MockStorage, + transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, }; use anyhow::{format_err, Result}; @@ -139,6 +140,7 @@ impl StateComputer for MockStateComputer { _: Arc, _: Arc, _: Option, + _: Arc, ) { } @@ -176,6 +178,7 @@ impl StateComputer for EmptyStateComputer { _: Arc, _: Arc, _: Option, + _: Arc, ) { } @@ -237,6 +240,7 @@ impl StateComputer for RandomComputeResultStateComputer { _: Arc, _: Arc, _: Option, + _: Arc, ) { } diff --git a/consensus/src/transaction_deduper.rs b/consensus/src/transaction_deduper.rs new file mode 100644 index 0000000000000..24d76f0dd332f --- /dev/null +++ b/consensus/src/transaction_deduper.rs @@ -0,0 +1,33 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::txn_hash_and_authenticator_deduper::TxnHashAndAuthenticatorDeduper; +use aptos_logger::info; +use aptos_types::{on_chain_config::TransactionDeduperType, transaction::SignedTransaction}; +use std::sync::Arc; + +/// Interface to dedup transactions. The dedup filters duplicate transactions within a block. +pub trait TransactionDeduper: Send + Sync { + fn dedup(&self, txns: Vec) -> Vec; +} + +/// No Op Deduper to maintain backward compatibility +struct NoOpDeduper {} + +impl TransactionDeduper for NoOpDeduper { + fn dedup(&self, txns: Vec) -> Vec { + txns + } +} + +pub fn create_transaction_deduper( + deduper_type: TransactionDeduperType, +) -> Arc { + match deduper_type { + TransactionDeduperType::NoDedup => Arc::new(NoOpDeduper {}), + TransactionDeduperType::TxnHashAndAuthenticatorV1 => { + info!("Using simple hash set transaction deduper"); + Arc::new(TxnHashAndAuthenticatorDeduper::new()) + }, + } +} diff --git a/consensus/src/txn_hash_and_authenticator_deduper.rs b/consensus/src/txn_hash_and_authenticator_deduper.rs new file mode 100644 index 0000000000000..38ec5aeef421b --- /dev/null +++ b/consensus/src/txn_hash_and_authenticator_deduper.rs @@ -0,0 +1,362 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + counters::{TXN_DEDUP_FILTERED, TXN_DEDUP_SECONDS}, + transaction_deduper::TransactionDeduper, +}; +use aptos_types::transaction::SignedTransaction; +use rayon::prelude::*; +use std::collections::{HashMap, HashSet}; + +/// An implementation of TransactionDeduper. Duplicate filtering is done using the pair +/// (raw_txn.hash(), authenticator). Both the hash and signature are required because dedup +/// happens before signatures are verified and transaction prologue is checked. (So, e.g., a bad +/// transaction could contain a txn and signature that are unrelated.) If the checks are done +/// beforehand only one of the txn hash or signature would be required. +/// +/// The implementation is written to avoid and/or parallelize the most expensive operations. Below +/// are the steps: +/// 1. Mark possible duplicates (sequential): Using a helper HashMap, mark transactions with 2+ +/// (sender, seq_no) pairs as possible duplicates. If no possible duplicates, return the original +/// transactions. +/// 2. Calculate txn hashes (parallel): For all possible duplicates, calculate the txn hash. This +/// is an expensive operation. +/// 3. Filter duplicates (sequential): Using a helper HashSet with the txn hashes calculated above +/// and signatures, filter actual duplicate transactions. +/// +/// Possible future optimizations: +/// a. Note the possible duplicates in Step 1 are independent of each other, so they could be +/// grouped independently and run in parallel in Step 3. +/// b. Txn hashes are calculated at many places within a validator. A per-txn hash cache could speed +/// up dedup or later operations. +/// c. If signature verification is moved to before dedup, then only the signature has to be matched +/// for duplicates and not the hash. +pub(crate) struct TxnHashAndAuthenticatorDeduper {} + +impl TransactionDeduper for TxnHashAndAuthenticatorDeduper { + fn dedup(&self, transactions: Vec) -> Vec { + let _timer = TXN_DEDUP_SECONDS.start_timer(); + let mut seen = HashMap::new(); + let mut is_possible_duplicate = false; + let mut possible_duplicates = vec![false; transactions.len()]; + for (i, txn) in transactions.iter().enumerate() { + match seen.get(&(txn.sender(), txn.sequence_number())) { + None => { + seen.insert((txn.sender(), txn.sequence_number()), i); + }, + Some(first_index) => { + is_possible_duplicate = true; + possible_duplicates[*first_index] = true; + possible_duplicates[i] = true; + }, + } + } + if !is_possible_duplicate { + TXN_DEDUP_FILTERED.observe(0 as f64); + return transactions; + } + + let hash_and_authenticators: Vec<_> = possible_duplicates + .into_par_iter() + .zip(&transactions) + .with_min_len(25) + .map(|(need_hash, txn)| match need_hash { + true => Some((txn.clone().committed_hash(), txn.authenticator())), + false => None, + }) + .collect(); + + // TODO: Possibly parallelize. See struct comment. + let mut seen_hashes = HashSet::new(); + let mut num_duplicates: usize = 0; + let filtered: Vec<_> = hash_and_authenticators + .into_iter() + .zip(transactions) + .filter_map(|(maybe_hash, txn)| match maybe_hash { + None => Some(txn), + Some(hash_and_authenticator) => { + if seen_hashes.insert(hash_and_authenticator) { + Some(txn) + } else { + num_duplicates += 1; + None + } + }, + }) + .collect(); + + TXN_DEDUP_FILTERED.observe(num_duplicates as f64); + filtered + } +} + +impl TxnHashAndAuthenticatorDeduper { + pub fn new() -> Self { + Self {} + } +} + +#[cfg(test)] +mod tests { + use crate::{ + transaction_deduper::TransactionDeduper, + txn_hash_and_authenticator_deduper::TxnHashAndAuthenticatorDeduper, + }; + use aptos_cached_packages::aptos_stdlib; + use aptos_crypto::ed25519::{Ed25519PrivateKey, Ed25519PublicKey}; + use aptos_keygen::KeyGen; + use aptos_types::{ + chain_id::ChainId, + transaction::{RawTransaction, Script, SignedTransaction, TransactionPayload}, + }; + use move_core_types::account_address::AccountAddress; + use std::time::Instant; + + struct Account { + addr: AccountAddress, + /// The current private key for this account. + pub privkey: Ed25519PrivateKey, + /// The current public key for this account. + pub pubkey: Ed25519PublicKey, + } + + impl Account { + pub fn new() -> Self { + let (privkey, pubkey) = KeyGen::from_os_rng().generate_ed25519_keypair(); + Self::with_keypair(privkey, pubkey) + } + + pub fn with_keypair(privkey: Ed25519PrivateKey, pubkey: Ed25519PublicKey) -> Self { + let addr = aptos_types::account_address::from_public_key(&pubkey); + Account { + addr, + privkey, + pubkey, + } + } + } + + fn raw_txn( + payload: TransactionPayload, + sender: AccountAddress, + seq_num: u64, + gas_unit_price: u64, + ) -> RawTransaction { + RawTransaction::new( + sender, + seq_num, + payload, + 500_000, + gas_unit_price, + 0, + ChainId::new(10), + ) + } + + fn empty_txn(sender: AccountAddress, seq_num: u64, gas_unit_price: u64) -> RawTransaction { + let payload = TransactionPayload::Script(Script::new(vec![], vec![], vec![])); + raw_txn(payload, sender, seq_num, gas_unit_price) + } + + fn peer_to_peer_txn( + sender: AccountAddress, + receiver: AccountAddress, + seq_num: u64, + gas_unit_price: u64, + ) -> RawTransaction { + let payload = aptos_stdlib::aptos_coin_transfer(receiver, 1); + raw_txn(payload, sender, seq_num, gas_unit_price) + } + + fn block(refs: Vec<&SignedTransaction>) -> Vec { + refs.into_iter().cloned().collect() + } + + #[test] + fn test_single_txn() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txn = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns = vec![txn]; + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + } + + #[test] + fn test_single_duplicate() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txn = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns = block(vec![&txn, &txn]); + let expected = block(vec![&txn]); + let deduped_txns = deduper.dedup(txns); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + } + + #[test] + fn test_repeated_sequence_number() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let receiver = Account::new(); + + let txn_0a = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner(); + // Different txn, same sender and sequence number. Should not be filtered. + let txn_0b = peer_to_peer_txn(sender.addr, receiver.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns = block(vec![&txn_0a, &txn_0b, &txn_0a]); + let expected = block(vec![&txn_0a, &txn_0b]); + let deduped_txns = deduper.dedup(txns); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + } + + #[test] + fn test_bad_signer() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let bad_signer = Account::new(); + + // Txn signed by a bad signer (not the sender) + let txn_0a = empty_txn(sender.addr, 0, 100) + .sign(&bad_signer.privkey, bad_signer.pubkey.clone()) + .unwrap() + .into_inner(); + // Same txn, but signed by the correct signer (sender). Should not be filtered. + let txn_0b = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner(); + let txns = block(vec![&txn_0a, &txn_0b]); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + } + + // The perf tests are simple micro-benchmarks and just output results without checking for regressions + static PERF_TXN_PER_BLOCK: usize = 10_000; + + fn measure_dedup_time( + deduper: TxnHashAndAuthenticatorDeduper, + txns: Vec, + ) -> f64 { + let start = Instant::now(); + let mut iterations = 0; + loop { + deduper.dedup(txns.clone()); + iterations += 1; + if iterations % 100 == 0 && start.elapsed().as_millis() > 2000 { + break; + } + } + let elapsed = start.elapsed(); + println!( + "elapsed: {}, iterations: {}, time per iteration: {}", + elapsed.as_secs_f64(), + iterations, + elapsed.as_secs_f64() / iterations as f64, + ); + elapsed.as_secs_f64() / iterations as f64 + } + + #[test] + fn test_performance_unique_empty_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txns: Vec<_> = (0..PERF_TXN_PER_BLOCK) + .into_iter() + .map(|i| { + empty_txn(sender.addr, i as u64, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner() + }) + .collect(); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + + measure_dedup_time(deduper, txns); + } + + #[test] + fn test_performance_duplicate_empty_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let txn = empty_txn(sender.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns: Vec<_> = std::iter::repeat(txn.clone()) + .take(PERF_TXN_PER_BLOCK) + .collect(); + let expected = block(vec![&txn]); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + + measure_dedup_time(deduper, txns); + } + + #[test] + fn test_performance_unique_p2p_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let receiver = Account::new(); + let txns: Vec<_> = (0..PERF_TXN_PER_BLOCK) + .into_iter() + .map(|i| { + peer_to_peer_txn(sender.addr, receiver.addr, i as u64, 100) + .sign(&sender.privkey, sender.pubkey.clone()) + .unwrap() + .into_inner() + }) + .collect(); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(txns.len(), deduped_txns.len()); + assert_eq!(txns, deduped_txns); + + measure_dedup_time(deduper, txns); + } + + #[test] + fn test_performance_duplicate_p2p_txns() { + let deduper = TxnHashAndAuthenticatorDeduper::new(); + + let sender = Account::new(); + let receiver = Account::new(); + let txn = peer_to_peer_txn(sender.addr, receiver.addr, 0, 100) + .sign(&sender.privkey, sender.pubkey) + .unwrap() + .into_inner(); + let txns: Vec<_> = std::iter::repeat(txn.clone()) + .take(PERF_TXN_PER_BLOCK) + .collect(); + let expected = block(vec![&txn]); + let deduped_txns = deduper.dedup(txns.clone()); + assert_eq!(expected.len(), deduped_txns.len()); + assert_eq!(expected, deduped_txns); + + measure_dedup_time(deduper, txns); + } +} diff --git a/types/src/on_chain_config/execution_config.rs b/types/src/on_chain_config/execution_config.rs index 3b031ee39f40d..642bf469d3502 100644 --- a/types/src/on_chain_config/execution_config.rs +++ b/types/src/on_chain_config/execution_config.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; pub enum OnChainExecutionConfig { V1(ExecutionConfigV1), V2(ExecutionConfigV2), + V3(ExecutionConfigV3), } /// The public interface that exposes all values with safe fallback. @@ -19,6 +20,7 @@ impl OnChainExecutionConfig { match &self { OnChainExecutionConfig::V1(config) => config.transaction_shuffler_type.clone(), OnChainExecutionConfig::V2(config) => config.transaction_shuffler_type.clone(), + OnChainExecutionConfig::V3(config) => config.transaction_shuffler_type.clone(), } } @@ -27,6 +29,16 @@ impl OnChainExecutionConfig { match &self { OnChainExecutionConfig::V1(_config) => None, OnChainExecutionConfig::V2(config) => config.block_gas_limit, + OnChainExecutionConfig::V3(config) => config.block_gas_limit, + } + } + + /// The type of the transaction deduper being used. + pub fn transaction_deduper_type(&self) -> TransactionDeduperType { + match &self { + OnChainExecutionConfig::V1(_config) => TransactionDeduperType::NoDedup, + OnChainExecutionConfig::V2(_config) => TransactionDeduperType::NoDedup, + OnChainExecutionConfig::V3(config) => config.transaction_deduper_type.clone(), } } } @@ -84,6 +96,23 @@ impl Default for ExecutionConfigV2 { } } +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct ExecutionConfigV3 { + pub transaction_shuffler_type: TransactionShufflerType, + pub block_gas_limit: Option, + pub transaction_deduper_type: TransactionDeduperType, +} + +impl Default for ExecutionConfigV3 { + fn default() -> Self { + Self { + transaction_shuffler_type: TransactionShufflerType::NoShuffling, + block_gas_limit: None, + transaction_deduper_type: TransactionDeduperType::NoDedup, + } + } +} + #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it pub enum TransactionShufflerType { @@ -91,6 +120,13 @@ pub enum TransactionShufflerType { SenderAwareV1(u32), } +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it +pub enum TransactionDeduperType { + NoDedup, + TxnHashAndAuthenticatorV1, +} + #[cfg(test)] mod test { use super::*; diff --git a/types/src/on_chain_config/mod.rs b/types/src/on_chain_config/mod.rs index 10f39d1dbd5a9..e319a42ba4762 100644 --- a/types/src/on_chain_config/mod.rs +++ b/types/src/on_chain_config/mod.rs @@ -40,7 +40,8 @@ pub use self::{ ProposerElectionType, }, execution_config::{ - ExecutionConfigV1, ExecutionConfigV2, OnChainExecutionConfig, TransactionShufflerType, + ExecutionConfigV1, ExecutionConfigV2, OnChainExecutionConfig, TransactionDeduperType, + TransactionShufflerType, }, gas_schedule::{GasSchedule, GasScheduleV2, StorageGasSchedule}, timed_features::{TimedFeatureFlag, TimedFeatureOverride, TimedFeatures}, diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index 1d14060e4bbbb..67605578c1496 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -555,6 +555,10 @@ impl SignedTransaction { self.authenticator.clone() } + pub fn authenticator_ref(&self) -> &TransactionAuthenticator { + &self.authenticator + } + pub fn sender(&self) -> AccountAddress { self.raw_txn.sender } @@ -563,6 +567,10 @@ impl SignedTransaction { self.raw_txn } + pub fn raw_transaction_ref(&self) -> &RawTransaction { + &self.raw_txn + } + pub fn sequence_number(&self) -> u64 { self.raw_txn.sequence_number }