Skip to content

Commit

Permalink
Fix the priority of broadcast batched label
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Jul 18, 2024
1 parent b2f2291 commit 93a3f2e
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 34 deletions.
4 changes: 3 additions & 1 deletion mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,13 @@ impl Mempool {
timeline_id: &MultiBucketTimelineIndexIds,
count: usize,
before: Option<Instant>,
priority_of_receiver: BroadcastPeerPriority,
) -> (
Vec<(SignedTransaction, SystemTime)>,
MultiBucketTimelineIndexIds,
) {
self.transactions.read_timeline(timeline_id, count, before)
self.transactions
.read_timeline(timeline_id, count, before, priority_of_receiver)
}

/// Read transactions from timeline from `start_id` (exclusive) to `end_id` (inclusive),
Expand Down
4 changes: 3 additions & 1 deletion mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ impl TransactionStore {
timeline_id: &MultiBucketTimelineIndexIds,
count: usize,
before: Option<Instant>,
// The priority of the receipient of the transactions
priority_of_receiver: BroadcastPeerPriority,
) -> (
Vec<(SignedTransaction, SystemTime)>,
MultiBucketTimelineIndexIds,
Expand Down Expand Up @@ -613,7 +615,7 @@ impl TransactionStore {
&txn.insertion_info,
bucket,
BROADCAST_BATCHED_LABEL,
txn.priority_of_sender.to_string().as_str(),
priority_of_receiver.to_string().as_str(),
);
counters::core_mempool_txn_ranking_score(
BROADCAST_BATCHED_LABEL,
Expand Down
1 change: 1 addition & 0 deletions mempool/src/shared_mempool/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf
&state.timeline_id,
self.mempool_config.shared_mempool_batch_size,
before,
peer_priority,
);
(
MultiBatchId::from_timeline_ids(&state.timeline_id, &new_timeline_id),
Expand Down
184 changes: 154 additions & 30 deletions mempool/src/tests/core_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,25 +371,29 @@ fn test_timeline() {
TestTransaction::new(1, 5, 1),
]);

let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None);
let (timeline, _) =
pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary);
assert_eq!(view(timeline), vec![0, 1]);
// Txns 3 and 5 should be in parking lot.
assert_eq!(2, pool.get_parking_lot_size());

// Add txn 2 to unblock txn3.
add_txns_to_mempool(&mut pool, vec![TestTransaction::new(1, 2, 1)]);
let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None);
let (timeline, _) =
pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary);
assert_eq!(view(timeline), vec![0, 1, 2, 3]);
// Txn 5 should be in parking lot.
assert_eq!(1, pool.get_parking_lot_size());

// Try different start read position.
let (timeline, _) = pool.read_timeline(&vec![2].into(), 10, None);
let (timeline, _) =
pool.read_timeline(&vec![2].into(), 10, None, BroadcastPeerPriority::Primary);
assert_eq!(view(timeline), vec![2, 3]);

// Simulate callback from consensus to unblock txn 5.
pool.commit_transaction(&TestTransaction::get_address(1), 4);
let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None);
let (timeline, _) =
pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary);
assert_eq!(view(timeline), vec![5]);
// check parking lot is empty
assert_eq!(0, pool.get_parking_lot_size());
Expand All @@ -410,16 +414,23 @@ fn test_timeline_before() {
&vec![0].into(),
10,
Some(insertion_done_time - Duration::from_millis(200)),
BroadcastPeerPriority::Primary,
);
assert!(timeline.is_empty());

let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, Some(insertion_done_time));
let (timeline, _) = pool.read_timeline(
&vec![0].into(),
10,
Some(insertion_done_time),
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![0, 1]);

let (timeline, _) = pool.read_timeline(
&vec![0].into(),
10,
Some(insertion_done_time + Duration::from_millis(200)),
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![0, 1]);
}
Expand All @@ -434,41 +445,96 @@ fn test_multi_bucket_timeline() {
TestTransaction::new(1, 5, 300), // bucket 2
]);

let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![0, 1]);
// Txns 3 and 5 should be in parking lot.
assert_eq!(2, pool.get_parking_lot_size());

// Add txn 2 to unblock txn3.
add_txns_to_mempool(&mut pool, vec![TestTransaction::new(1, 2, 1)]);
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![0, 1, 2, 3]);
// Txn 5 should be in parking lot.
assert_eq!(1, pool.get_parking_lot_size());

// Try different start read positions. Expected buckets: [[0, 1, 2], [3], []]
let (timeline, _) = pool.read_timeline(&vec![1, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![1, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![1, 2, 3]);
let (timeline, _) = pool.read_timeline(&vec![2, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![2, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![2, 3]);
let (timeline, _) = pool.read_timeline(&vec![0, 1, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 1, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![0, 1, 2]);
let (timeline, _) = pool.read_timeline(&vec![1, 1, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![1, 1, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![1, 2]);
let (timeline, _) = pool.read_timeline(&vec![2, 1, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![2, 1, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![2]);
let (timeline, _) = pool.read_timeline(&vec![3, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![3, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![3]);
let (timeline, _) = pool.read_timeline(&vec![3, 1, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![3, 1, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert!(view(timeline).is_empty());

// Ensure high gas is prioritized.
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 1, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
1,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![3]);

// Simulate callback from consensus to unblock txn 5.
pool.commit_transaction(&TestTransaction::get_address(1), 4);
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![5]);
// check parking lot is empty
assert_eq!(0, pool.get_parking_lot_size());
Expand All @@ -485,26 +551,56 @@ fn test_multi_bucket_gas_ranking_update() {
]);

// txn 2 and 3 are prioritized
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 2, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
2,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![2, 3]);
// read only bucket 2
let (timeline, _) = pool.read_timeline(&vec![10, 10, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![10, 10, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert!(view(timeline).is_empty());

// resubmit with higher gas: move txn 2 to bucket 2
add_txns_to_mempool(&mut pool, vec![TestTransaction::new(1, 2, 400)]);

// txn 2 is now prioritized
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 1, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
1,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![2]);
// then txn 3 is prioritized
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 2, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
2,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![2, 3]);
// read only bucket 2
let (timeline, _) = pool.read_timeline(&vec![10, 10, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![10, 10, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![2]);
// read only bucket 1
let (timeline, _) = pool.read_timeline(&vec![10, 0, 10].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![10, 0, 10].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![3]);
}

Expand All @@ -518,23 +614,48 @@ fn test_multi_bucket_removal() {
TestTransaction::new(1, 3, 200), // bucket 1
]);

let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![0, 1, 2, 3]);

pool.commit_transaction(&TestTransaction::get_address(1), 0);
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![1, 2, 3]);

pool.commit_transaction(&TestTransaction::get_address(1), 1);
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![2, 3]);

pool.commit_transaction(&TestTransaction::get_address(1), 2);
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(view(timeline), vec![3]);

pool.commit_transaction(&TestTransaction::get_address(1), 3);
let (timeline, _) = pool.read_timeline(&vec![0, 0, 0].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0, 0, 0].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert!(view(timeline).is_empty());
}

Expand Down Expand Up @@ -720,7 +841,8 @@ fn test_gc_ready_transaction() {
add_txn(&mut pool, TestTransaction::new(1, 3, 1)).unwrap();

// Check that all txns are ready.
let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None);
let (timeline, _) =
pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary);
assert_eq!(timeline.len(), 4);

// GC expired transaction.
Expand All @@ -731,15 +853,17 @@ fn test_gc_ready_transaction() {
assert_eq!(block.len(), 1);
assert_eq!(block[0].sequence_number(), 0);

let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None);
let (timeline, _) =
pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary);
assert_eq!(timeline.len(), 1);
assert_eq!(timeline[0].0.sequence_number(), 0);

// Resubmit txn 1
add_txn(&mut pool, TestTransaction::new(1, 1, 1)).unwrap();

// Make sure txns 2 and 3 can be broadcast after txn 1 is resubmitted
let (timeline, _) = pool.read_timeline(&vec![0].into(), 10, None);
let (timeline, _) =
pool.read_timeline(&vec![0].into(), 10, None, BroadcastPeerPriority::Primary);
assert_eq!(timeline.len(), 4);
}

Expand Down
15 changes: 13 additions & 2 deletions mempool/src/tests/shared_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use crate::{
mocks::MockSharedMempool,
network::BroadcastPeerPriority,
tests::common::{batch_add_signed_txn, TestTransaction},
QuorumStoreRequest,
};
Expand Down Expand Up @@ -48,7 +49,12 @@ async fn test_consensus_events_rejected_txns() {

let pool = smp.mempool.lock();
// TODO: make less brittle to broadcast buckets changes
let (timeline, _) = pool.read_timeline(&vec![0; 10].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0; 10].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
assert_eq!(timeline.len(), 2);
assert_eq!(timeline.first().unwrap().0, kept_txn);
}
Expand Down Expand Up @@ -89,7 +95,12 @@ async fn test_mempool_notify_committed_txns() {
let wait_for_commit = async {
let pool = smp.mempool.lock();
// TODO: make less brittle to broadcast buckets changes
let (timeline, _) = pool.read_timeline(&vec![0; 10].into(), 10, None);
let (timeline, _) = pool.read_timeline(
&vec![0; 10].into(),
10,
None,
BroadcastPeerPriority::Primary,
);
if timeline.len() == 10 && timeline.first().unwrap().0 == kept_txn {
return; // Mempool handled the commit notification
}
Expand Down

0 comments on commit 93a3f2e

Please sign in to comment.