diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs
index bce352b1621dc..a1dbe0033089d 100644
--- a/consensus/consensus-types/src/common.rs
+++ b/consensus/consensus-types/src/common.rs
@@ -51,6 +51,36 @@ impl fmt::Display for TransactionSummary {
     }
 }
 
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize, Hash, Ord, PartialOrd)]
+pub struct TxnSummaryWithExpiration {
+    pub sender: AccountAddress,
+    pub sequence_number: u64,
+    pub expiration_timestamp_secs: u64,
+    pub hash: HashValue,
+}
+
+impl TxnSummaryWithExpiration {
+    pub fn new(
+        sender: AccountAddress,
+        sequence_number: u64,
+        expiration_timestamp_secs: u64,
+        hash: HashValue,
+    ) -> Self {
+        Self {
+            sender,
+            sequence_number,
+            expiration_timestamp_secs,
+            hash,
+        }
+    }
+}
+
+impl fmt::Display for TxnSummaryWithExpiration {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.sender, self.sequence_number,)
+    }
+}
+
 #[derive(Clone)]
 pub struct TransactionInProgress {
     pub gas_unit_price: u64,
diff --git a/consensus/consensus-types/src/request_response.rs b/consensus/consensus-types/src/request_response.rs
index 6011a3f8cc88e..ef9e9485a6a6b 100644
--- a/consensus/consensus-types/src/request_response.rs
+++ b/consensus/consensus-types/src/request_response.rs
@@ -4,7 +4,7 @@
 use crate::common::{Payload, PayloadFilter};
 use anyhow::Result;
 use futures::channel::oneshot;
-use std::{fmt, fmt::Formatter};
+use std::{fmt, fmt::Formatter, time::Duration};
 
 pub enum GetPayloadCommand {
     /// Request to pull block to submit to consensus.
@@ -25,6 +25,8 @@ pub enum GetPayloadCommand {
         PayloadFilter,
         // callback to respond to
         oneshot::Sender<Result<GetPayloadResponse, QuorumStoreError>>,
+        // block timestamp
+        Duration,
     ),
 }
 
@@ -40,11 +42,12 @@ impl fmt::Display for GetPayloadCommand {
                 return_non_full,
                 excluded,
                 _,
+                block_timestamp,
             ) => {
                 write!(
                     f,
-                    "GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {}, max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}]",
-                    max_txns, max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded
+                    "GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {}, max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]",
+                    max_txns, max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp
                 )
             },
         }
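Note (illustrative sketch, not part of the patch): the `GetPayloadRequest` variant above now carries the proposed block's timestamp as its last positional field. The snippet below shows roughly how a caller assembles such a request; it assumes it is compiled inside the consensus crate so `GetPayloadCommand`, `GetPayloadResponse`, `QuorumStoreError`, and `PayloadFilter` are in scope, and every numeric limit is a placeholder rather than a value taken from the codebase (compare `QuorumStoreClient::pull_internal` further down in this diff).

    use std::time::Duration;

    use futures::channel::oneshot;

    // Hypothetical helper, mirroring the construction done in quorum_store_client.rs below.
    fn build_get_payload_request(
        excluded: PayloadFilter,
        block_timestamp: Duration,
    ) -> GetPayloadCommand {
        let (callback, _callback_rcv) = oneshot::channel();
        GetPayloadCommand::GetPayloadRequest(
            1_000,           // max_txns (placeholder)
            1_000,           // max_txns_after_filtering (placeholder)
            1_000_000,       // max_bytes (placeholder)
            100,             // max_inline_txns (placeholder)
            200_000,         // max_inline_bytes (placeholder)
            true,            // return_non_full
            excluded,        // payload filter of already-used batches/txns
            callback,        // oneshot sender for the GetPayloadResponse
            block_timestamp, // new: lets payload pullers skip already-expired transactions
        )
    }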
diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs
index f2fe23ea42524..2a81472d7cf14 100644
--- a/consensus/src/dag/dag_driver.rs
+++ b/consensus/src/dag/dag_driver.rs
@@ -268,6 +268,7 @@ impl DagDriver {
                 false,
                 0,
                 0.0,
+                self.time_service.now_unix_time(),
             )
             .await
         {
diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs
index b2a95e88a547a..c6a5545b9ac0c 100644
--- a/consensus/src/liveness/proposal_generator.rs
+++ b/consensus/src/liveness/proposal_generator.rs
@@ -376,6 +376,7 @@ impl ProposalGenerator {
                 pending_ordering,
                 pending_blocks.len(),
                 max_fill_fraction,
+                timestamp,
             )
             .await
             .context("Fail to retrieve payload")?;
diff --git a/consensus/src/payload_client/mixed.rs b/consensus/src/payload_client/mixed.rs
index 72a458c5481d2..3b7ede4d813f0 100644
--- a/consensus/src/payload_client/mixed.rs
+++ b/consensus/src/payload_client/mixed.rs
@@ -78,6 +78,7 @@ impl PayloadClient for MixedPayloadClient {
         pending_ordering: bool,
         pending_uncommitted_blocks: usize,
         recent_max_fill_fraction: f32,
+        block_timestamp: Duration,
     ) -> anyhow::Result<(Vec<ValidatorTransaction>, Payload), QuorumStoreError> {
         // Pull validator txns first.
         let validator_txn_pull_timer = Instant::now();
@@ -124,6 +125,7 @@ impl PayloadClient for MixedPayloadClient {
                 pending_ordering,
                 pending_uncommitted_blocks,
                 recent_max_fill_fraction,
+                block_timestamp,
             )
             .await?;
 
@@ -165,6 +167,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
             false,
             0,
             0.,
+            aptos_infallible::duration_since_epoch(),
         )
         .await
         .unwrap()
@@ -189,6 +192,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
             false,
             0,
             0.,
+            aptos_infallible::duration_since_epoch(),
         )
         .await
         .unwrap()
@@ -213,6 +217,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
             false,
             0,
             0.,
+            aptos_infallible::duration_since_epoch(),
         )
         .await
         .unwrap()
@@ -237,6 +242,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
             false,
             0,
             0.,
+            aptos_infallible::duration_since_epoch(),
         )
         .await
         .unwrap()
@@ -279,6 +285,7 @@ async fn mixed_payload_client_should_respect_validator_txn_feature_flag() {
             false,
             0,
             0.,
+            aptos_infallible::duration_since_epoch(),
         )
         .await
         .unwrap()
diff --git a/consensus/src/payload_client/mod.rs b/consensus/src/payload_client/mod.rs
index 61cc98a5ba036..cef8293d7500f 100644
--- a/consensus/src/payload_client/mod.rs
+++ b/consensus/src/payload_client/mod.rs
@@ -28,6 +28,7 @@ pub trait PayloadClient: Send + Sync {
         pending_ordering: bool,
         pending_uncommitted_blocks: usize,
         recent_max_fill_fraction: f32,
+        block_timestamp: Duration,
     ) -> anyhow::Result<(Vec<ValidatorTransaction>, Payload), QuorumStoreError>;
 
     fn trace_payloads(&self) {}
diff --git a/consensus/src/payload_client/user/mod.rs b/consensus/src/payload_client/user/mod.rs
index a19f77e3dfc11..3f5c4ffde1a47 100644
--- a/consensus/src/payload_client/user/mod.rs
+++ b/consensus/src/payload_client/user/mod.rs
@@ -27,6 +27,7 @@ pub trait UserPayloadClient: Send + Sync {
         pending_ordering: bool,
         pending_uncommitted_blocks: usize,
         recent_max_fill_fraction: f32,
+        block_timestamp: Duration,
     ) -> anyhow::Result<Payload, QuorumStoreError>;
 }
 
@@ -59,6 +60,7 @@ impl UserPayloadClient for DummyClient {
         _pending_ordering: bool,
         _pending_uncommitted_blocks: usize,
         _recent_max_fill_fraction: f32,
+        _block_timestamp: Duration,
     ) -> anyhow::Result<Payload, QuorumStoreError> {
         let timer = Instant::now();
         let mut nxt_txn_idx = 0;
diff --git a/consensus/src/payload_client/user/quorum_store_client.rs b/consensus/src/payload_client/user/quorum_store_client.rs
index 1cd3caefd50aa..576dfea4c7259 100644
--- a/consensus/src/payload_client/user/quorum_store_client.rs
+++ b/consensus/src/payload_client/user/quorum_store_client.rs
@@ -52,6 +52,7 @@ impl QuorumStoreClient {
         max_inline_bytes: u64,
         return_non_full: bool,
         exclude_payloads: PayloadFilter,
+        block_timestamp: Duration,
     ) -> anyhow::Result<Payload, QuorumStoreError> {
         let (callback, callback_rcv) = oneshot::channel();
         let req = GetPayloadCommand::GetPayloadRequest(
@@ -63,6 +64,7 @@ impl QuorumStoreClient {
             return_non_full,
             exclude_payloads.clone(),
             callback,
+            block_timestamp,
         );
         // send to shared mempool
         self.consensus_to_quorum_store_sender
@@ -99,6 +101,7 @@ impl UserPayloadClient for QuorumStoreClient {
         pending_ordering: bool,
         pending_uncommitted_blocks: usize,
         recent_max_fill_fraction: f32,
+        block_timestamp: Duration,
     ) -> anyhow::Result<Payload, QuorumStoreError> {
         let return_non_full = recent_max_fill_fraction
             < self.wait_for_full_blocks_above_recent_fill_threshold
@@ -126,6 +129,7 @@ impl UserPayloadClient for QuorumStoreClient {
                     max_inline_bytes,
                     return_non_full || return_empty || done,
                     exclude.clone(),
+                    block_timestamp,
                 )
                 .await?;
             if payload.is_empty() && !return_empty && !done {
diff --git a/consensus/src/quorum_store/direct_mempool_quorum_store.rs b/consensus/src/quorum_store/direct_mempool_quorum_store.rs
index bdabd98fed099..3c7cd599da73a 100644
--- a/consensus/src/quorum_store/direct_mempool_quorum_store.rs
+++ b/consensus/src/quorum_store/direct_mempool_quorum_store.rs
@@ -146,6 +146,7 @@ impl DirectMempoolQuorumStore {
                 return_non_full,
                 payload_filter,
                 callback,
+                _block_timestamp,
             ) => {
                 self.handle_block_request(
                     max_txns_after_filtering,
diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs
index 6def979c567ee..8a17ef4f9e132 100644
--- a/consensus/src/quorum_store/proof_manager.rs
+++ b/consensus/src/quorum_store/proof_manager.rs
@@ -11,7 +11,7 @@ use crate::{
     },
 };
 use aptos_consensus_types::{
-    common::{Payload, PayloadFilter, ProofWithData, TransactionSummary},
+    common::{Payload, PayloadFilter, ProofWithData, TxnSummaryWithExpiration},
     proof_of_store::{BatchInfo, ProofOfStore, ProofOfStoreMsg},
     request_response::{GetPayloadCommand, GetPayloadResponse},
 };
@@ -29,7 +29,7 @@ use std::{
 #[derive(Debug)]
 pub enum ProofManagerCommand {
     ReceiveProofs(ProofOfStoreMsg),
-    ReceiveBatches(Vec<(BatchInfo, Vec<TransactionSummary>)>),
+    ReceiveBatches(Vec<(BatchInfo, Vec<TxnSummaryWithExpiration>)>),
     CommitNotification(u64, Vec<BatchInfo>),
     Shutdown(tokio::sync::oneshot::Sender<()>),
 }
@@ -168,7 +168,7 @@ impl ProofManager {
 
     pub(crate) fn receive_batches(
         &mut self,
-        batch_summaries: Vec<(BatchInfo, Vec<TransactionSummary>)>,
+        batch_summaries: Vec<(BatchInfo, Vec<TxnSummaryWithExpiration>)>,
     ) {
         if self.allow_batches_without_pos_in_proposal {
             let batches = batch_summaries
@@ -212,6 +212,7 @@ impl ProofManager {
                 return_non_full,
                 filter,
                 callback,
+                block_timestamp,
             ) => {
                 let excluded_batches: HashSet<_> = match filter {
                     PayloadFilter::Empty => HashSet::new(),
@@ -228,6 +229,7 @@ impl ProofManager {
                     max_txns_after_filtering,
                     max_bytes,
                     return_non_full,
+                    block_timestamp,
                 );
 
                 counters::NUM_BATCHES_WITHOUT_PROOF_OF_STORE.observe(self.batch_queue.len() as f64);
diff --git a/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs b/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs
index 35fdc3c7ed97a..03d9aa45cae37 100644
--- a/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs
+++ b/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs
@@ -38,6 +38,7 @@ async fn test_block_request_no_txns() {
             true,
             PayloadFilter::DirectMempool(vec![]),
             consensus_callback,
+            aptos_infallible::duration_since_epoch(),
         ))
         .unwrap();
 
diff --git a/consensus/src/quorum_store/tests/proof_manager_test.rs b/consensus/src/quorum_store/tests/proof_manager_test.rs
index 8741dceb03d08..3424bccb45441 100644
--- a/consensus/src/quorum_store/tests/proof_manager_test.rs
+++ b/consensus/src/quorum_store/tests/proof_manager_test.rs
@@ -62,6 +62,7 @@ async fn get_proposal(
         true,
         PayloadFilter::InQuorumStore(filter_set),
         callback_tx,
+        aptos_infallible::duration_since_epoch(),
     );
     proof_manager.handle_proposal_request(req);
     let GetPayloadResponse::GetPayloadResponse(payload) = callback_rx.await.unwrap().unwrap();
diff --git a/consensus/src/quorum_store/tests/utils.rs b/consensus/src/quorum_store/tests/utils.rs
index ea4b5a1ab1cf4..6214bb7db6603 100644
--- a/consensus/src/quorum_store/tests/utils.rs
+++ b/consensus/src/quorum_store/tests/utils.rs
@@ -3,13 +3,13 @@
 use crate::quorum_store::utils::ProofQueue;
 use aptos_consensus_types::{
-    common::TransactionSummary,
+    common::TxnSummaryWithExpiration,
     proof_of_store::{BatchId, BatchInfo,
ProofOfStore}, }; use aptos_crypto::HashValue; use aptos_types::{aggregate_signature::AggregateSignature, PeerId}; use maplit::hashset; -use std::collections::HashSet; +use std::{collections::HashSet, time::Duration}; /// Return a ProofOfStore with minimal fields used by ProofQueue tests. fn proof_of_store( @@ -61,7 +61,14 @@ fn test_proof_queue_sorting() { } // Expect: [600, 300] - let (pulled, num_unique_txns, _) = proof_queue.pull_proofs(&hashset![], 4, 2, 2, true); + let (pulled, num_unique_txns, _) = proof_queue.pull_proofs( + &hashset![], + 4, + 2, + 2, + true, + aptos_infallible::duration_since_epoch(), + ); let mut count_author_0 = 0; let mut count_author_1 = 0; let mut prev: Option<&ProofOfStore> = None; @@ -83,7 +90,14 @@ fn test_proof_queue_sorting() { assert_eq!(num_unique_txns, 2); // Expect: [600, 500, 300, 100] - let (pulled, num_unique_txns, _) = proof_queue.pull_proofs(&hashset![], 6, 4, 4, true); + let (pulled, num_unique_txns, _) = proof_queue.pull_proofs( + &hashset![], + 6, + 4, + 4, + true, + aptos_infallible::duration_since_epoch(), + ); let mut count_author_0 = 0; let mut count_author_1 = 0; let mut prev: Option<&ProofOfStore> = None; @@ -109,28 +123,59 @@ fn test_proof_queue_sorting() { fn test_proof_calculate_remaining_txns_and_proofs() { let my_peer_id = PeerId::random(); let mut proof_queue = ProofQueue::new(my_peer_id); - let now = aptos_infallible::duration_since_epoch().as_micros() as u64; + let now_in_secs = aptos_infallible::duration_since_epoch().as_secs() as u64; + let now_in_usecs = aptos_infallible::duration_since_epoch().as_micros() as u64; let author_0 = PeerId::random(); let author_1 = PeerId::random(); let txns = vec![ - TransactionSummary::new(PeerId::ONE, 0, HashValue::zero()), - TransactionSummary::new(PeerId::ONE, 1, HashValue::zero()), - TransactionSummary::new(PeerId::ONE, 2, HashValue::zero()), - TransactionSummary::new(PeerId::ONE, 3, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 0, now_in_secs + 1, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 1, now_in_secs + 1, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 2, now_in_secs + 1, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 3, now_in_secs + 1, HashValue::zero()), ]; let author_0_batches = vec![ - proof_of_store(author_0, BatchId::new_for_test(0), 100, now + 50000), - proof_of_store(author_0, BatchId::new_for_test(1), 200, now + 70000), - proof_of_store(author_0, BatchId::new_for_test(2), 50, now + 20000), - proof_of_store(author_0, BatchId::new_for_test(3), 300, now + 10000), + proof_of_store( + author_0, + BatchId::new_for_test(0), + 100, + now_in_usecs + 50000, + ), + proof_of_store( + author_0, + BatchId::new_for_test(1), + 200, + now_in_usecs + 70000, + ), + proof_of_store(author_0, BatchId::new_for_test(2), 50, now_in_usecs + 20000), + proof_of_store( + author_0, + BatchId::new_for_test(3), + 300, + now_in_usecs + 10000, + ), ]; let author_1_batches = vec![ - proof_of_store(author_1, BatchId::new_for_test(4), 500, now + 20000), - proof_of_store(author_1, BatchId::new_for_test(5), 400, now + 30000), - proof_of_store(author_1, BatchId::new_for_test(6), 600, now + 50000), - proof_of_store(author_1, BatchId::new_for_test(7), 50, now + 60000), + proof_of_store( + author_1, + BatchId::new_for_test(4), + 500, + now_in_usecs + 20000, + ), + proof_of_store( + author_1, + BatchId::new_for_test(5), + 400, + now_in_usecs + 30000, + ), + proof_of_store( + author_1, + BatchId::new_for_test(6), + 600, + now_in_usecs + 50000, + 
), + proof_of_store(author_1, BatchId::new_for_test(7), 50, now_in_usecs + 60000), ]; let info_1 = author_0_batches[0].info().clone(); @@ -145,30 +190,35 @@ fn test_proof_calculate_remaining_txns_and_proofs() { proof_queue.add_batch_summaries(vec![(info_1.clone(), vec![txns[0]])]); // batch_summaries: [1 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (0, 0)); + assert_eq!(proof_queue.batch_summaries_len(), 1); proof_queue.push(author_0_batches[0].clone()); // txns: [txn_0] // proofs: [1] // batch_summaries: [1 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (1, 1)); + assert_eq!(proof_queue.batch_summaries_len(), 1); proof_queue.push(author_0_batches[1].clone()); // txns: [txn_0] + txns(proof_2) // proofs: [1, 2] // batch_summaries: [1 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 1); proof_queue.add_batch_summaries(vec![(info_2, vec![txns[1]])]); // txns: [txn_0, txn_1] // proofs: [1, 2] // batch_summaries: [1 -> txn_0, 2 -> txn_1] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 2); proof_queue.add_batch_summaries(vec![(info_3.clone(), vec![txns[0]])]); // txns: [txn_0, txn_1] // proofs: [1, 2] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 3); // Adding the batch again shouldn't have an effect proof_queue.add_batch_summaries(vec![(info_3.clone(), vec![txns[0]])]); @@ -176,12 +226,14 @@ fn test_proof_calculate_remaining_txns_and_proofs() { // proofs: [1, 2] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 3); proof_queue.push(author_0_batches[2].clone()); // txns: [txn_0, txn_1] // proofs: [1, 2, 3] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 3)); + assert_eq!(proof_queue.batch_summaries_len(), 3); // Adding the batch again shouldn't have an effect proof_queue.add_batch_summaries(vec![(info_3.clone(), vec![txns[0]])]); @@ -189,49 +241,57 @@ fn test_proof_calculate_remaining_txns_and_proofs() { // proofs: [1, 2, 3] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 3)); + assert_eq!(proof_queue.batch_summaries_len(), 3); proof_queue.push(author_1_batches[0].clone()); // txns: [txn_0, txn_1] + txns(proof_5) // proofs: [1, 2, 3, 5] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (3, 4)); + assert_eq!(proof_queue.batch_summaries_len(), 3); proof_queue.add_batch_summaries(vec![(info_5, vec![txns[1]])]); // txns: [txn_0, txn_1] // proofs: [1, 2, 3, 5] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0, 5 -> txn_1] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 4)); + assert_eq!(proof_queue.batch_summaries_len(), 4); proof_queue.add_batch_summaries(vec![(info_4, vec![txns[2]])]); // txns: [txn_0, txn_1] // proofs: [1, 2, 3, 5] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0, 4 -> txn_2, 5 -> txn_1] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 4)); + assert_eq!(proof_queue.batch_summaries_len(), 5); proof_queue.push(author_0_batches[3].clone()); // txns: [txn_0, txn_1, txn_2] // proofs: [1, 2, 3, 4, 5] // batch_summaries: [1 -> txn_0, 2 -> txn_1, 3 -> txn_0, 4 -> txn_2, 5 -> txn_1] 
assert_eq!(proof_queue.remaining_txns_and_proofs(), (3, 5)); + assert_eq!(proof_queue.batch_summaries_len(), 5); proof_queue.mark_committed(vec![info_1.clone()]); // txns: [txn_0, txn_1, txn_2] // proofs: [2, 3, 4, 5] // batch_summaries: [2 -> txn_1, 3 -> txn_0, 4 -> txn_2, 5 -> txn_1] assert_eq!(proof_queue.remaining_txns_and_proofs(), (3, 4)); + assert_eq!(proof_queue.batch_summaries_len(), 4); proof_queue.push(author_1_batches[1].clone()); // txns: [txn_0, txn_1, txn_2] + txns(proof_6) // proofs: [2, 3, 4, 5, 6] // batch_summaries: [2 -> txn_1, 3 -> txn_0, 4 -> txn_2, 5 -> txn_1] assert_eq!(proof_queue.remaining_txns_and_proofs(), (4, 5)); + assert_eq!(proof_queue.batch_summaries_len(), 4); - proof_queue.handle_updated_block_timestamp(now + 20000); + proof_queue.handle_updated_block_timestamp(now_in_usecs + 20000); // Expires info_3, info_4, info_5 // txns: [txn_1] + txns(proof_6) // proofs: [2, 6] // batch_summaries: [2 -> txn_1] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 1); // Adding an expired batch again proof_queue.add_batch_summaries(vec![(info_3, vec![txns[0]])]); @@ -239,6 +299,7 @@ fn test_proof_calculate_remaining_txns_and_proofs() { // proofs: [2, 6] // batch_summaries: [2 -> txn_1, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 2); // Adding an expired proof again. Should have no effect proof_queue.push(author_0_batches[2].clone()); @@ -246,90 +307,150 @@ fn test_proof_calculate_remaining_txns_and_proofs() { // proofs: [2, 6] // batch_summaries: [2 -> txn_1, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 2); proof_queue.add_batch_summaries(vec![(info_7, vec![txns[3]])]); // txns: [txn_1] + txns(proof_6) // proofs: [2, 6] // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 3); - proof_queue.handle_updated_block_timestamp(now + 30000); + proof_queue.handle_updated_block_timestamp(now_in_usecs + 30000); // Expires info_6 // txns: [txn_1] // proofs: [2] // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (1, 1)); + assert_eq!(proof_queue.batch_summaries_len(), 3); proof_queue.add_batch_summaries(vec![(info_6, vec![txns[0]])]); // Expired batch not added to batch summaries // txns: [txn_1] // proofs: [2] - // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] + // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0, 6 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (1, 1)); + assert_eq!(proof_queue.batch_summaries_len(), 4); proof_queue.push(author_1_batches[2].clone()); // txns: [txn_1, txn_3] // proofs: [2, 7] - // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] + // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0, 6 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 4); proof_queue.push(author_1_batches[3].clone()); // txns: [txn_1, txn_3] + txns(proof_8) // proofs: [2, 7, 8] - // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] + // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0, 6 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (3, 3)); + assert_eq!(proof_queue.batch_summaries_len(), 4); proof_queue.mark_committed(vec![info_8.clone()]); // txns: [txn_1, txn_3] // proofs: [2, 
7] - // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] + // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0, 6 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 4); proof_queue.add_batch_summaries(vec![(info_8, vec![txns[0]])]); // Committed batch not added to batch summaries // txns: [txn_1, txn_3] // proofs: [2, 7] - // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] + // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0, 6 -> txn_0, 8 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 5); proof_queue.push(author_1_batches[3].clone()); // Committed proof added again. Should have no effect // txns: [txn_1, txn_3] // proofs: [2, 7, 8] - // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0] + // batch_summaries: [2 -> txn_1, 7 -> txn_3, 3 -> txn_0, 6 -> txn_0, 8 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (2, 2)); + assert_eq!(proof_queue.batch_summaries_len(), 5); - proof_queue.handle_updated_block_timestamp(now + 70000); + proof_queue.handle_updated_block_timestamp(now_in_usecs + 70000); // Expires info_2, info_7 // txns: [] // proofs: [] - // batch_summaries: [3 -> txn_0] + // batch_summaries: [3 -> txn_0, 6 -> txn_0, 8 -> txn_0] assert_eq!(proof_queue.remaining_txns_and_proofs(), (0, 0)); + assert_eq!(proof_queue.batch_summaries_len(), 3); } #[test] fn test_proof_pull_proofs_with_duplicates() { let my_peer_id = PeerId::random(); let mut proof_queue = ProofQueue::new(my_peer_id); - let now = aptos_infallible::duration_since_epoch().as_micros() as u64; - + let now_in_secs = aptos_infallible::duration_since_epoch().as_secs() as u64; + let now_in_usecs = now_in_secs * 1_000_000; let txns = vec![ - TransactionSummary::new(PeerId::ONE, 0, HashValue::zero()), - TransactionSummary::new(PeerId::ONE, 1, HashValue::zero()), - TransactionSummary::new(PeerId::ONE, 2, HashValue::zero()), - TransactionSummary::new(PeerId::ONE, 3, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 0, now_in_secs + 2, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 1, now_in_secs + 1, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 2, now_in_secs + 3, HashValue::zero()), + TxnSummaryWithExpiration::new(PeerId::ONE, 3, now_in_secs + 4, HashValue::zero()), ]; let author_0 = PeerId::random(); let author_1 = PeerId::random(); let author_0_batches = vec![ - proof_of_store(author_0, BatchId::new_for_test(0), 100, now + 5000), - proof_of_store(author_0, BatchId::new_for_test(1), 200, now + 5000), - proof_of_store(author_0, BatchId::new_for_test(2), 50, now + 5000), - proof_of_store(author_0, BatchId::new_for_test(3), 300, now + 5000), + proof_of_store( + author_0, + BatchId::new_for_test(0), + 100, + now_in_usecs + 1_100_000, + ), + proof_of_store( + author_0, + BatchId::new_for_test(1), + 200, + now_in_usecs + 3_000_000, + ), + proof_of_store( + author_0, + BatchId::new_for_test(2), + 50, + now_in_usecs + 5_000_000, + ), + proof_of_store( + author_0, + BatchId::new_for_test(3), + 300, + now_in_usecs + 4_000_000, + ), + ]; + + let author_1_batches = vec![ + proof_of_store( + author_1, + BatchId::new_for_test(4), + 500, + now_in_usecs + 4_000_000, + ), + proof_of_store( + author_1, + BatchId::new_for_test(5), + 400, + now_in_usecs + 2_500_000, + ), + proof_of_store( + author_1, + BatchId::new_for_test(6), + 600, + now_in_usecs + 3_500_000, + ), + proof_of_store( + author_1, + BatchId::new_for_test(7), + 
50, + now_in_usecs + 4_500_000, + ), ]; + let info_0 = author_0_batches[0].info().clone(); + let info_7 = author_1_batches[2].info().clone(); + proof_queue.add_batch_summaries(vec![(author_0_batches[0].info().clone(), vec![txns[0]])]); proof_queue.add_batch_summaries(vec![(author_0_batches[1].info().clone(), vec![txns[1]])]); proof_queue.add_batch_summaries(vec![(author_0_batches[2].info().clone(), vec![txns[2]])]); @@ -339,12 +460,6 @@ fn test_proof_pull_proofs_with_duplicates() { proof_queue.push(batch); } - let author_1_batches = vec![ - proof_of_store(author_1, BatchId::new_for_test(4), 500, now + 5000), - proof_of_store(author_1, BatchId::new_for_test(5), 400, now + 5000), - proof_of_store(author_1, BatchId::new_for_test(6), 600, now + 5000), - proof_of_store(author_1, BatchId::new_for_test(7), 50, now + 5000), - ]; proof_queue.add_batch_summaries(vec![(author_1_batches[0].info().clone(), vec![txns[1]])]); proof_queue.add_batch_summaries(vec![(author_1_batches[1].info().clone(), vec![txns[2]])]); proof_queue.add_batch_summaries(vec![(author_1_batches[2].info().clone(), vec![txns[3]])]); @@ -355,9 +470,16 @@ fn test_proof_pull_proofs_with_duplicates() { } assert_eq!(proof_queue.remaining_txns_and_proofs(), (4, 8)); - let result = proof_queue.pull_proofs(&hashset![], 8, 4, 3000, true); - assert!(result.0.len() >= 4); - assert!(result.0.len() <= 8); + let result = proof_queue.pull_proofs( + &hashset![], + 8, + 4, + 400, + true, + Duration::from_micros(now_in_usecs), + ); + assert_eq!(result.1, 4); + let mut pulled_txns = HashSet::new(); for proof in result.0 { match proof.batch_id() { @@ -372,13 +494,133 @@ fn test_proof_pull_proofs_with_duplicates() { _ => panic!("Unexpected batch id"), }; } - assert!(pulled_txns.len() == 4); - assert!(result.1 == 4); - assert!( - proof_queue - .pull_proofs(&hashset![info_0], 8, 4, 400, true) - .0 - .len() - == 7 + assert_eq!(pulled_txns.len(), 4); + + let result = proof_queue.pull_proofs( + &hashset![info_0.clone()], + 8, + 4, + 400, + true, + Duration::from_micros(now_in_usecs), + ); + assert_eq!(result.0.len(), 7); + // filtered_txns: txn_0 (included in excluded batches) + assert_eq!(result.1, 3); + + proof_queue.handle_updated_block_timestamp(now_in_usecs + 500_000); + // Nothing changes + let result = proof_queue.pull_proofs( + &hashset![], + 8, + 5, + 400, + true, + Duration::from_micros(now_in_usecs + 500_100), + ); + assert_eq!(result.1, 4); + + proof_queue.handle_updated_block_timestamp(now_in_usecs + 1_000_000); + // txn_1 expired + let result = proof_queue.pull_proofs( + &hashset![], + 8, + 5, + 400, + true, + Duration::from_micros(now_in_usecs + 1_000_100), + ); + assert_eq!(result.0.len(), 8); + assert_eq!(result.1, 3); + + proof_queue.handle_updated_block_timestamp(now_in_usecs + 1_200_000); + // author_0_batches[0] is removed. txn_1 expired. + let result = proof_queue.pull_proofs( + &hashset![], + 8, + 4, + 400, + true, + Duration::from_micros(now_in_usecs + 1_200_100), + ); + assert_eq!(result.0.len(), 7); + assert_eq!(result.1, 3); + + proof_queue.handle_updated_block_timestamp(now_in_usecs + 2_000_000); + // author_0_batches[0] is removed. txn_0, txn_1 are expired. + let result = proof_queue.pull_proofs( + &hashset![], + 8, + 4, + 400, + true, + Duration::from_micros(now_in_usecs + 2_000_100), + ); + assert_eq!(result.0.len(), 7); + assert_eq!(result.1, 2); + + proof_queue.handle_updated_block_timestamp(now_in_usecs + 2_500_000); + // author_0_batches[0], author_1_batches[1] is removed. txn_0, txn_1 is expired. 
+    let result = proof_queue.pull_proofs(
+        &hashset![],
+        8,
+        4,
+        400,
+        true,
+        Duration::from_micros(now_in_usecs + 2_500_100),
+    );
+    assert_eq!(result.0.len(), 6);
+    assert_eq!(result.1, 2);
+
+    let result = proof_queue.pull_proofs(
+        &hashset![info_7],
+        8,
+        4,
+        400,
+        true,
+        Duration::from_micros(now_in_usecs + 2_500_100),
+    );
+    // author_0_batches[0], author_1_batches[1] is removed. author_1_batches[2] is excluded. txn_0, txn_1 are expired.
+    assert_eq!(result.0.len(), 5);
+    assert_eq!(result.1, 1);
+
+    proof_queue.handle_updated_block_timestamp(now_in_usecs + 3_000_000);
+    let result = proof_queue.pull_proofs(
+        &hashset![],
+        8,
+        4,
+        400,
+        true,
+        Duration::from_micros(now_in_usecs + 3_000_100),
+    );
+    // author_0_batches[0], author_0_batches[1], author_1_batches[1] are removed. txn_0, txn_1, txn_2 are expired.
+    assert_eq!(result.0.len(), 5);
+    assert_eq!(result.1, 1);
+
+    proof_queue.handle_updated_block_timestamp(now_in_usecs + 3_500_000);
+    let result = proof_queue.pull_proofs(
+        &hashset![],
+        8,
+        4,
+        400,
+        true,
+        Duration::from_micros(now_in_usecs + 3_500_100),
+    );
+    // author_0_batches[0], author_0_batches[1], author_1_batches[1], author_1_batches[2] are removed. txn_0, txn_1, txn_0 are expired.
+    assert_eq!(result.0.len(), 4);
+    assert_eq!(result.1, 0);
+
+    proof_queue.handle_updated_block_timestamp(now_in_usecs + 4_000_000);
+    let result = proof_queue.pull_proofs(
+        &hashset![],
+        8,
+        4,
+        400,
+        true,
+        Duration::from_micros(now_in_usecs + 4_000_100),
     );
+    // author_0_batches[0], author_0_batches[1], author_0_batches[3], author_1_batches[0], author_1_batches[1], author_1_batches[2] are removed.
+    // txn_0, txn_1, txn_2 are expired.
+    assert_eq!(result.0.len(), 2);
+    assert_eq!(result.1, 0);
 }
diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs
index 65c01b839e424..91d07487ba404 100644
--- a/consensus/src/quorum_store/types.rs
+++ b/consensus/src/quorum_store/types.rs
@@ -3,7 +3,7 @@
 use anyhow::ensure;
 use aptos_consensus_types::{
-    common::{BatchPayload, TransactionSummary},
+    common::{BatchPayload, TxnSummaryWithExpiration},
     proof_of_store::{BatchId, BatchInfo},
 };
 use aptos_crypto::{hash::CryptoHash, HashValue};
@@ -58,14 +58,15 @@ impl PersistedValue {
         &self.maybe_payload
     }
 
-    pub fn summary(&self) -> Vec<TransactionSummary> {
+    pub fn summary(&self) -> Vec<TxnSummaryWithExpiration> {
         if let Some(payload) = &self.maybe_payload {
             return payload
                 .iter()
                 .map(|txn| {
-                    TransactionSummary::new(
+                    TxnSummaryWithExpiration::new(
                         txn.sender(),
                         txn.sequence_number(),
+                        txn.expiration_timestamp_secs(),
                         txn.committed_hash(),
                     )
                 })
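Note (illustrative sketch, not part of the patch): the utils.rs changes that follow only count a batch's transaction toward the block's unique-transaction budget when the proposed block's timestamp is still strictly below that transaction's expiration second. A self-contained rendering of that check, with a stripped-down stand-in for `TxnSummaryWithExpiration` and made-up sample values:

    use std::time::Duration;

    // Stand-in for aptos_consensus_types::common::TxnSummaryWithExpiration,
    // reduced to the single field the expiration filter reads.
    struct TxnSummaryWithExpiration {
        expiration_timestamp_secs: u64,
    }

    // Mirrors the filter condition added to ProofQueue::pull_proofs: a summary
    // still counts only if the block is timestamped before its expiration.
    fn is_still_valid(summary: &TxnSummaryWithExpiration, block_timestamp: Duration) -> bool {
        block_timestamp.as_secs() < summary.expiration_timestamp_secs
    }

    fn main() {
        let summaries = [
            TxnSummaryWithExpiration { expiration_timestamp_secs: 100 },
            TxnSummaryWithExpiration { expiration_timestamp_secs: 90 },
        ];
        // With a block timestamped at 95s, only the first summary is still countable.
        let unique = summaries
            .iter()
            .filter(|s| is_still_valid(s, Duration::from_secs(95)))
            .count();
        assert_eq!(unique, 1);
    }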
diff --git a/consensus/src/quorum_store/utils.rs b/consensus/src/quorum_store/utils.rs
index aaa9727c1ca51..458edcde7b745 100644
--- a/consensus/src/quorum_store/utils.rs
+++ b/consensus/src/quorum_store/utils.rs
@@ -3,7 +3,7 @@
 use crate::{monitor, quorum_store::counters};
 use aptos_consensus_types::{
-    common::{TransactionInProgress, TransactionSummary},
+    common::{TransactionInProgress, TransactionSummary, TxnSummaryWithExpiration},
     proof_of_store::{BatchId, BatchInfo, ProofOfStore},
 };
 use aptos_logger::prelude::*;
@@ -198,14 +198,14 @@ pub struct ProofQueue {
     author_to_batches: HashMap<PeerId, BTreeMap<BatchSortKey, BatchInfo>>,
     // ProofOfStore and insertion_time. None if committed
     batch_to_proof: HashMap<BatchKey, Option<(ProofOfStore, Instant)>>,
-    // Number of unexpired and uncommitted proofs in which the txn_summary = (sender, sequence number, hash)
+    // Number of unexpired and uncommitted proofs in which the txn_summary = (sender, sequence number, hash, expiration)
     // has been included. We only count those batches that are in both batch_to_proof and author_to_batches.
-    txn_summary_num_occurrences: HashMap<TransactionSummary, u64>,
+    txn_summary_num_occurrences: HashMap<TxnSummaryWithExpiration, u64>,
     // List of transaction summaries for each batch, along with expiration time and a boolean stating whether a proof for the
     // batch has been received. If the boolean is true, then all the transaction summaries in the batch have
     // been added to txn_summary_num_occurrences. The boolean has been added to avoid double counting when a
     // batch summary is received multiple times.
-    batch_summaries: HashMap<BatchKey, (Vec<TransactionSummary>, u64, bool)>,
+    batch_summaries: HashMap<BatchKey, (Vec<TxnSummaryWithExpiration>, u64, bool)>,
     // Expiration index
     expirations: TimeExpirations<BatchSortKey>,
     latest_block_timestamp: u64,
@@ -252,6 +252,10 @@ impl ProofQueue {
         }
     }
 
+    pub(crate) fn batch_summaries_len(&self) -> usize {
+        self.batch_summaries.len()
+    }
+
     fn remaining_txns_without_duplicates(&self) -> u64 {
         // txn_summary_num_occurrences counts all the unexpired and uncommitted proofs that have txn summaries
         // in batch_summaries.
@@ -334,7 +338,7 @@ impl ProofQueue {
 
     pub(crate) fn add_batch_summaries(
         &mut self,
-        batch_summaries: Vec<(BatchInfo, Vec<TransactionSummary>)>,
+        batch_summaries: Vec<(BatchInfo, Vec<TxnSummaryWithExpiration>)>,
     ) {
         let start = Instant::now();
         for (batch_info, txn_summaries) in batch_summaries {
@@ -455,6 +459,7 @@ impl ProofQueue {
         max_txns_after_filtering: u64,
         max_bytes: u64,
         return_non_full: bool,
+        block_timestamp: Duration,
     ) -> (Vec<ProofOfStore>, u64, bool) {
         let mut ret = vec![];
         let mut cur_bytes = 0;
@@ -494,7 +499,11 @@ impl ProofQueue {
                             cur_unique_txns
                                 + txn_summaries
                                     .iter()
-                                    .filter(|txn_summary| !filtered_txns.contains(txn_summary))
+                                    .filter(|txn_summary| {
+                                        !filtered_txns.contains(txn_summary)
+                                            && block_timestamp.as_secs()
+                                                < txn_summary.expiration_timestamp_secs
+                                    })
                                     .count() as u64
                         } else {
                             cur_unique_txns + batch.num_txns()
@@ -516,7 +525,11 @@ impl ProofQueue {
                         |(summaries, _, _)| {
                             summaries
                                 .iter()
-                                .filter(|summary| filtered_txns.insert(**summary))
+                                .filter(|summary| {
+                                    filtered_txns.insert(**summary)
+                                        && block_timestamp.as_secs()
+                                            < summary.expiration_timestamp_secs
+                                })
                                 .count() as u64
                         },
                     );
diff --git a/consensus/src/test_utils/mock_payload_manager.rs b/consensus/src/test_utils/mock_payload_manager.rs
index e855347ec4ebb..fec52c63599b2 100644
--- a/consensus/src/test_utils/mock_payload_manager.rs
+++ b/consensus/src/test_utils/mock_payload_manager.rs
@@ -68,6 +68,7 @@ impl PayloadClient for MockPayloadManager {
         _pending_ordering: bool,
         _pending_uncommitted_blocks: usize,
         _recent_fill_fraction: f32,
+        _block_timestamp: Duration,
     ) -> Result<(Vec<ValidatorTransaction>, Payload), QuorumStoreError> {
         // generate 1k txn is too slow with coverage instrumentation
         Ok((