From 93a3f2e8f46d8858c0a8d3938a8eb8f6cec0d754 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Thu, 18 Jul 2024 15:32:33 -0700 Subject: [PATCH] Fix the priority of broadcast batched label --- mempool/src/core_mempool/mempool.rs | 4 +- mempool/src/core_mempool/transaction_store.rs | 4 +- mempool/src/shared_mempool/network.rs | 1 + mempool/src/tests/core_mempool_test.rs | 184 +++++++++++++++--- mempool/src/tests/shared_mempool_test.rs | 15 +- 5 files changed, 174 insertions(+), 34 deletions(-) diff --git a/mempool/src/core_mempool/mempool.rs b/mempool/src/core_mempool/mempool.rs index e0da989c90fd1..3fb4931bfacf6 100644 --- a/mempool/src/core_mempool/mempool.rs +++ b/mempool/src/core_mempool/mempool.rs @@ -502,11 +502,13 @@ impl Mempool { timeline_id: &MultiBucketTimelineIndexIds, count: usize, before: Option, + priority_of_receiver: BroadcastPeerPriority, ) -> ( Vec<(SignedTransaction, SystemTime)>, MultiBucketTimelineIndexIds, ) { - self.transactions.read_timeline(timeline_id, count, before) + self.transactions + .read_timeline(timeline_id, count, before, priority_of_receiver) } /// Read transactions from timeline from `start_id` (exclusive) to `end_id` (inclusive), diff --git a/mempool/src/core_mempool/transaction_store.rs b/mempool/src/core_mempool/transaction_store.rs index 616cd211feac0..19de47373a2f0 100644 --- a/mempool/src/core_mempool/transaction_store.rs +++ b/mempool/src/core_mempool/transaction_store.rs @@ -571,6 +571,8 @@ impl TransactionStore { timeline_id: &MultiBucketTimelineIndexIds, count: usize, before: Option, + // The priority of the receipient of the transactions + priority_of_receiver: BroadcastPeerPriority, ) -> ( Vec<(SignedTransaction, SystemTime)>, MultiBucketTimelineIndexIds, @@ -613,7 +615,7 @@ impl TransactionStore { &txn.insertion_info, bucket, BROADCAST_BATCHED_LABEL, - txn.priority_of_sender.to_string().as_str(), + priority_of_receiver.to_string().as_str(), ); counters::core_mempool_txn_ranking_score( BROADCAST_BATCHED_LABEL, diff --git a/mempool/src/shared_mempool/network.rs b/mempool/src/shared_mempool/network.rs index d7cef6d28917c..0d3393333a924 100644 --- a/mempool/src/shared_mempool/network.rs +++ b/mempool/src/shared_mempool/network.rs @@ -452,6 +452,7 @@ impl> MempoolNetworkInterf &state.timeline_id, self.mempool_config.shared_mempool_batch_size, before, + peer_priority, ); ( MultiBatchId::from_timeline_ids(&state.timeline_id, &new_timeline_id), diff --git a/mempool/src/tests/core_mempool_test.rs b/mempool/src/tests/core_mempool_test.rs index 9881552a8d144..b624b9daf7561 100644 --- a/mempool/src/tests/core_mempool_test.rs +++ b/mempool/src/tests/core_mempool_test.rs @@ -371,25 +371,29 @@ fn test_timeline() { TestTransaction::new(1, 5, 1), ]); - let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None); + let (timeline, _) = + pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary); assert_eq!(view(timeline), vec![0, 1]); // Txns 3 and 5 should be in parking lot. assert_eq!(2, pool.get_parking_lot_size()); // Add txn 2 to unblock txn3. add_txns_to_mempool(&mut pool, vec![TestTransaction::new(1, 2, 1)]); - let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None); + let (timeline, _) = + pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary); assert_eq!(view(timeline), vec![0, 1, 2, 3]); // Txn 5 should be in parking lot. assert_eq!(1, pool.get_parking_lot_size()); // Try different start read position. - let (timeline, _) = pool.read_timeline(&vec![2].into(), 10, None); + let (timeline, _) = + pool.read_timeline(&vec![2].into(), 10, None, BroadcastPeerPriority::Primary); assert_eq!(view(timeline), vec![2, 3]); // Simulate callback from consensus to unblock txn 5. pool.commit_transaction(&TestTransaction::get_address(1), 4); - let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None); + let (timeline, _) = + pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary); assert_eq!(view(timeline), vec![5]); // check parking lot is empty assert_eq!(0, pool.get_parking_lot_size()); @@ -410,16 +414,23 @@ fn test_timeline_before() { &vec![0].into(), 10, Some(insertion_done_time - Duration::from_millis(200)), + BroadcastPeerPriority::Primary, ); assert!(timeline.is_empty()); - let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, Some(insertion_done_time)); + let (timeline, _) = pool.read_timeline( + &vec![0].into(), + 10, + Some(insertion_done_time), + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![0, 1]); let (timeline, _) = pool.read_timeline( &vec![0].into(), 10, Some(insertion_done_time + Duration::from_millis(200)), + BroadcastPeerPriority::Primary, ); assert_eq!(view(timeline), vec![0, 1]); } @@ -434,41 +445,96 @@ fn test_multi_bucket_timeline() { TestTransaction::new(1, 5, 300), // bucket 2 ]); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![0, 1]); // Txns 3 and 5 should be in parking lot. assert_eq!(2, pool.get_parking_lot_size()); // Add txn 2 to unblock txn3. add_txns_to_mempool(&mut pool, vec![TestTransaction::new(1, 2, 1)]); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![0, 1, 2, 3]); // Txn 5 should be in parking lot. assert_eq!(1, pool.get_parking_lot_size()); // Try different start read positions. Expected buckets: [[0, 1, 2], [3], []] - let (timeline, _) = pool.read_timeline(&vec![1, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![1, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![1, 2, 3]); - let (timeline, _) = pool.read_timeline(&vec![2, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![2, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![2, 3]); - let (timeline, _) = pool.read_timeline(&vec![0, 1, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 1, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![0, 1, 2]); - let (timeline, _) = pool.read_timeline(&vec![1, 1, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![1, 1, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![1, 2]); - let (timeline, _) = pool.read_timeline(&vec![2, 1, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![2, 1, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![2]); - let (timeline, _) = pool.read_timeline(&vec![3, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![3, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![3]); - let (timeline, _) = pool.read_timeline(&vec![3, 1, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![3, 1, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert!(view(timeline).is_empty()); // Ensure high gas is prioritized. - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 1, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 1, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![3]); // Simulate callback from consensus to unblock txn 5. pool.commit_transaction(&TestTransaction::get_address(1), 4); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![5]); // check parking lot is empty assert_eq!(0, pool.get_parking_lot_size()); @@ -485,26 +551,56 @@ fn test_multi_bucket_gas_ranking_update() { ]); // txn 2 and 3 are prioritized - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 2, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 2, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![2, 3]); // read only bucket 2 - let (timeline, _) = pool.read_timeline(&vec![10, 10, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![10, 10, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert!(view(timeline).is_empty()); // resubmit with higher gas: move txn 2 to bucket 2 add_txns_to_mempool(&mut pool, vec![TestTransaction::new(1, 2, 400)]); // txn 2 is now prioritized - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 1, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 1, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![2]); // then txn 3 is prioritized - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 2, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 2, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![2, 3]); // read only bucket 2 - let (timeline, _) = pool.read_timeline(&vec![10, 10, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![10, 10, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![2]); // read only bucket 1 - let (timeline, _) = pool.read_timeline(&vec![10, 0, 10].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![10, 0, 10].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![3]); } @@ -518,23 +614,48 @@ fn test_multi_bucket_removal() { TestTransaction::new(1, 3, 200), // bucket 1 ]); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![0, 1, 2, 3]); pool.commit_transaction(&TestTransaction::get_address(1), 0); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![1, 2, 3]); pool.commit_transaction(&TestTransaction::get_address(1), 1); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![2, 3]); pool.commit_transaction(&TestTransaction::get_address(1), 2); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(view(timeline), vec![3]); pool.commit_transaction(&TestTransaction::get_address(1), 3); - let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0, 0, 0].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert!(view(timeline).is_empty()); } @@ -720,7 +841,8 @@ fn test_gc_ready_transaction() { add_txn(&mut pool, TestTransaction::new(1, 3, 1)).unwrap(); // Check that all txns are ready. - let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None); + let (timeline, _) = + pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary); assert_eq!(timeline.len(), 4); // GC expired transaction. @@ -731,7 +853,8 @@ fn test_gc_ready_transaction() { assert_eq!(block.len(), 1); assert_eq!(block[0].sequence_number(), 0); - let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None); + let (timeline, _) = + pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary); assert_eq!(timeline.len(), 1); assert_eq!(timeline[0].0.sequence_number(), 0); @@ -739,7 +862,8 @@ fn test_gc_ready_transaction() { add_txn(&mut pool, TestTransaction::new(1, 1, 1)).unwrap(); // Make sure txns 2 and 3 can be broadcast after txn 1 is resubmitted - let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None); + let (timeline, _) = + pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary); assert_eq!(timeline.len(), 4); } diff --git a/mempool/src/tests/shared_mempool_test.rs b/mempool/src/tests/shared_mempool_test.rs index 701a5e53c6541..520fd3391555c 100644 --- a/mempool/src/tests/shared_mempool_test.rs +++ b/mempool/src/tests/shared_mempool_test.rs @@ -4,6 +4,7 @@ use crate::{ mocks::MockSharedMempool, + network::BroadcastPeerPriority, tests::common::{batch_add_signed_txn, TestTransaction}, QuorumStoreRequest, }; @@ -48,7 +49,12 @@ async fn test_consensus_events_rejected_txns() { let pool = smp.mempool.lock(); // TODO: make less brittle to broadcast buckets changes - let (timeline, _) = pool.read_timeline(&vec![0; 10].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0; 10].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); assert_eq!(timeline.len(), 2); assert_eq!(timeline.first().unwrap().0, kept_txn); } @@ -89,7 +95,12 @@ async fn test_mempool_notify_committed_txns() { let wait_for_commit = async { let pool = smp.mempool.lock(); // TODO: make less brittle to broadcast buckets changes - let (timeline, _) = pool.read_timeline(&vec![0; 10].into(), 10, None); + let (timeline, _) = pool.read_timeline( + &vec![0; 10].into(), + 10, + None, + BroadcastPeerPriority::Primary, + ); if timeline.len() == 10 && timeline.first().unwrap().0 == kept_txn { return; // Mempool handled the commit notification }