diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a2a79bed79e523..c62041b6a2198a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -82,9 +82,11 @@ pub struct BankingStageStats { id: u32, process_packets_count: AtomicUsize, new_tx_count: AtomicUsize, - dropped_batches_count: AtomicUsize, + dropped_packet_batches_count: AtomicUsize, + dropped_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize, + current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, reset_cost_tracker_count: AtomicUsize, @@ -129,8 +131,13 @@ impl BankingStageStats { i64 ), ( - "dropped_batches_count", - self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64, + "dropped_packet_batches_count", + self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "dropped_packets_count", + self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -138,6 +145,12 @@ impl BankingStageStats { self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "current_buffered_packet_batches_count", + self.current_buffered_packet_batches_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "current_buffered_packets_count", self.current_buffered_packets_count @@ -1345,7 +1358,8 @@ impl BankingStage { let mut new_tx_count = 0; let mut mms_iter = mms.into_iter(); - let mut dropped_batches_count = 0; + let mut dropped_packets_count = 0; + let mut dropped_packet_batches_count = 0; let mut newly_buffered_packets_count = 0; while let Some(msgs) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(&msgs.packets); @@ -1355,7 +1369,8 @@ impl BankingStage { buffered_packets, msgs, packet_indexes, - &mut dropped_batches_count, + &mut dropped_packets_count, + &mut dropped_packet_batches_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1386,7 +1401,8 @@ impl BankingStage { buffered_packets, msgs, unprocessed_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1414,7 +1430,8 @@ impl BankingStage { buffered_packets, msgs, unprocessed_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1449,14 +1466,18 @@ impl BankingStage { .new_tx_count .fetch_add(new_tx_count, Ordering::Relaxed); banking_stage_stats - .dropped_batches_count - .fetch_add(dropped_batches_count, Ordering::Relaxed); + .dropped_packet_batches_count + .fetch_add(dropped_packet_batches_count, Ordering::Relaxed); banking_stage_stats .newly_buffered_packets_count .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); banking_stage_stats - .current_buffered_packets_count + .current_buffered_packet_batches_count .swap(buffered_packets.len(), Ordering::Relaxed); + banking_stage_stats.current_buffered_packets_count.swap( + buffered_packets.map(|packets| packets.1).sum(), + Ordering::Relaxed, + ); *recv_start = Instant::now(); Ok(()) } @@ -1465,7 +1486,8 @@ impl BankingStage { unprocessed_packets: &mut UnprocessedPackets, packets: Packets, mut packet_indexes: Vec, - dropped_batches_count: &mut usize, + dropped_packet_batches_count: &mut usize, + dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, batch_limit: usize, duplicates: &Arc, PacketHasher)>>, @@ -1492,8 +1514,10 @@ impl BankingStage { } if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packets.len() >= batch_limit { - *dropped_batches_count += 1; - unprocessed_packets.pop_front(); + *dropped_packet_batches_count += 1; + if let Some(dropped_batch) = unprocessed_packets.pop_front() { + *dropped_packets_count += dropped_batch.1.len(); + } } *newly_buffered_packets_count += packet_indexes.len(); unprocessed_packets.push_back((packets, packet_indexes, false)); @@ -2838,7 +2862,8 @@ mod tests { LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default(), ))); - let mut dropped_batches_count = 0; + let mut dropped_packet_batches_count = 0; + let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; let banking_stage_stats = BankingStageStats::default(); // Because the set of unprocessed `packet_indexes` is empty, the @@ -2847,14 +2872,15 @@ mod tests { &mut unprocessed_packets, new_packets.clone(), packet_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); - assert_eq!(dropped_batches_count, 0); + assert_eq!(dropped_packet_batches_count, 0); assert_eq!(newly_buffered_packets_count, 0); // Because the set of unprocessed `packet_indexes` is non-empty, the @@ -2864,14 +2890,15 @@ mod tests { &mut unprocessed_packets, new_packets, packet_indexes.clone(), - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); - assert_eq!(dropped_batches_count, 0); + assert_eq!(dropped_packet_batches_count, 0); assert_eq!(newly_buffered_packets_count, 1); // Because we've reached the batch limit, old unprocessed packets are @@ -2886,7 +2913,8 @@ mod tests { &mut unprocessed_packets, new_packets.clone(), packet_indexes.clone(), - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, @@ -2894,7 +2922,7 @@ mod tests { ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); - assert_eq!(dropped_batches_count, 1); + assert_eq!(dropped_packet_batches_count, 1); assert_eq!(newly_buffered_packets_count, 2); // Check duplicates are dropped @@ -2902,7 +2930,8 @@ mod tests { &mut unprocessed_packets, new_packets.clone(), packet_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, 3, &duplicates, @@ -2910,7 +2939,7 @@ mod tests { ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); - assert_eq!(dropped_batches_count, 1); + assert_eq!(dropped_packet_batches_count, 1); assert_eq!(newly_buffered_packets_count, 2); }