Skip to content

Commit

Permalink
Add banking metrics for buffered and dropped packets
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry committed Sep 15, 2021
1 parent 691bea8 commit cd26fd1
Showing 1 changed file with 51 additions and 22 deletions.
73 changes: 51 additions & 22 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ pub struct BankingStageStats {
id: u32,
process_packets_count: AtomicUsize,
new_tx_count: AtomicUsize,
dropped_batches_count: AtomicUsize,
dropped_packet_batches_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize,
current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,
reset_cost_tracker_count: AtomicUsize,
Expand Down Expand Up @@ -129,15 +131,26 @@ impl BankingStageStats {
i64
),
(
"dropped_batches_count",
self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64,
"dropped_packet_batches_count",
self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"dropped_packets_count",
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"newly_buffered_packets_count",
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"current_buffered_packet_batches_count",
self.current_buffered_packet_batches_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"current_buffered_packets_count",
self.current_buffered_packets_count
Expand Down Expand Up @@ -1345,7 +1358,8 @@ impl BankingStage {
let mut new_tx_count = 0;

let mut mms_iter = mms.into_iter();
let mut dropped_batches_count = 0;
let mut dropped_packets_count = 0;
let mut dropped_packet_batches_count = 0;
let mut newly_buffered_packets_count = 0;
while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
Expand All @@ -1355,7 +1369,8 @@ impl BankingStage {
buffered_packets,
msgs,
packet_indexes,
&mut dropped_batches_count,
&mut dropped_packets_count,
&mut dropped_packet_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
Expand Down Expand Up @@ -1386,7 +1401,8 @@ impl BankingStage {
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
Expand Down Expand Up @@ -1414,7 +1430,8 @@ impl BankingStage {
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
Expand Down Expand Up @@ -1449,14 +1466,18 @@ impl BankingStage {
.new_tx_count
.fetch_add(new_tx_count, Ordering::Relaxed);
banking_stage_stats
.dropped_batches_count
.fetch_add(dropped_batches_count, Ordering::Relaxed);
.dropped_packet_batches_count
.fetch_add(dropped_packet_batches_count, Ordering::Relaxed);
banking_stage_stats
.newly_buffered_packets_count
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
banking_stage_stats
.current_buffered_packets_count
.current_buffered_packet_batches_count
.swap(buffered_packets.len(), Ordering::Relaxed);
banking_stage_stats.current_buffered_packets_count.swap(
buffered_packets.iter().map(|packets| packets.1).sum(),
Ordering::Relaxed,
);
*recv_start = Instant::now();
Ok(())
}
Expand All @@ -1465,7 +1486,8 @@ impl BankingStage {
unprocessed_packets: &mut UnprocessedPackets,
packets: Packets,
mut packet_indexes: Vec<usize>,
dropped_batches_count: &mut usize,
dropped_packet_batches_count: &mut usize,
dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
Expand All @@ -1492,8 +1514,10 @@ impl BankingStage {
}
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packets.len() >= batch_limit {
*dropped_batches_count += 1;
unprocessed_packets.pop_front();
*dropped_packet_batches_count += 1;
if let Some(dropped_batch) = unprocessed_packets.pop_front() {
*dropped_packets_count += dropped_batch.1.len();
}
}
*newly_buffered_packets_count += packet_indexes.len();
unprocessed_packets.push_back((packets, packet_indexes, false));
Expand Down Expand Up @@ -2838,7 +2862,8 @@ mod tests {
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let mut dropped_batches_count = 0;
let mut dropped_packet_batches_count = 0;
let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0;
let banking_stage_stats = BankingStageStats::default();
// Because the set of unprocessed `packet_indexes` is empty, the
Expand All @@ -2847,14 +2872,15 @@ mod tests {
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_batches_count, 0);
assert_eq!(dropped_packet_batches_count, 0);
assert_eq!(newly_buffered_packets_count, 0);

// Because the set of unprocessed `packet_indexes` is non-empty, the
Expand All @@ -2864,14 +2890,15 @@ mod tests {
&mut unprocessed_packets,
new_packets,
packet_indexes.clone(),
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_batches_count, 0);
assert_eq!(dropped_packet_batches_count, 0);
assert_eq!(newly_buffered_packets_count, 1);

// Because we've reached the batch limit, old unprocessed packets are
Expand All @@ -2886,31 +2913,33 @@ mod tests {
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes.clone(),
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1);
assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(newly_buffered_packets_count, 2);

// Check duplicates are dropped
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
3,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1);
assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(newly_buffered_packets_count, 2);
}

Expand Down

0 comments on commit cd26fd1

Please sign in to comment.