From 5cd180eea99094ef926a6381fb3f2f05868b0492 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Wed, 15 Sep 2021 15:53:55 -0500 Subject: [PATCH] Add banking metrics for buffered and dropped packets (#19902) (cherry picked from commit ca3f1476703292a5fec1739643f580417b4c3e4a) --- core/src/banking_stage.rs | 89 ++++++++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 29 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 3201ae2dcc801e..bb323d7df3a3df 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -91,9 +91,11 @@ pub struct BankingStageStats { id: u32, process_packets_count: AtomicUsize, new_tx_count: AtomicUsize, - dropped_batches_count: AtomicUsize, + dropped_packet_batches_count: AtomicUsize, + dropped_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize, + current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, @@ -143,8 +145,13 @@ impl BankingStageStats { i64 ), ( - "dropped_batches_count", - self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64, + "dropped_packet_batches_count", + self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "dropped_packets_count", + self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -152,6 +159,12 @@ impl BankingStageStats { self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "current_buffered_packet_batches_count", + self.current_buffered_packet_batches_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "current_buffered_packets_count", self.current_buffered_packets_count @@ -1225,7 +1238,8 @@ impl BankingStage { let mut new_tx_count = 0; let mut mms_iter = mms.into_iter(); - let mut dropped_batches_count = 0; + let mut dropped_packets_count = 0; + let mut dropped_packet_batches_count = 0; let mut newly_buffered_packets_count = 0; while let Some(msgs) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(&msgs.packets); @@ -1235,7 +1249,8 @@ impl BankingStage { buffered_packets, msgs, packet_indexes, - &mut dropped_batches_count, + &mut dropped_packets_count, + &mut dropped_packet_batches_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1264,7 +1279,8 @@ impl BankingStage { buffered_packets, msgs, unprocessed_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1290,7 +1306,8 @@ impl BankingStage { buffered_packets, msgs, unprocessed_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1325,14 +1342,18 @@ impl BankingStage { .new_tx_count .fetch_add(new_tx_count, Ordering::Relaxed); banking_stage_stats - .dropped_batches_count - .fetch_add(dropped_batches_count, Ordering::Relaxed); + .dropped_packet_batches_count + .fetch_add(dropped_packet_batches_count, Ordering::Relaxed); banking_stage_stats .newly_buffered_packets_count .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); banking_stage_stats - .current_buffered_packets_count + .current_buffered_packet_batches_count .swap(buffered_packets.len(), Ordering::Relaxed); + banking_stage_stats.current_buffered_packets_count.swap( + buffered_packets.iter().map(|packets| packets.1.len()).sum(), + Ordering::Relaxed, + ); *recv_start = Instant::now(); Ok(()) } @@ -1341,7 +1362,8 @@ impl BankingStage { unprocessed_packets: &mut UnprocessedPackets, packets: Packets, mut packet_indexes: Vec, - dropped_batches_count: &mut usize, + dropped_packet_batches_count: &mut usize, + dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, batch_limit: usize, duplicates: &Arc, PacketHasher)>>, @@ -1368,8 +1390,10 @@ impl BankingStage { } if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packets.len() >= batch_limit { - *dropped_batches_count += 1; - unprocessed_packets.pop_front(); + *dropped_packet_batches_count += 1; + if let Some(dropped_batch) = unprocessed_packets.pop_front() { + *dropped_packets_count += dropped_batch.1.len(); + } } *newly_buffered_packets_count += packet_indexes.len(); unprocessed_packets.push_back((packets, packet_indexes, false)); @@ -2772,23 +2796,22 @@ mod tests { #[test] fn test_push_unprocessed_batch_limit() { solana_logger::setup(); - // Create `Packets` with 1 unprocessed element - let single_element_packets = Packets::new(vec![Packet::default()]); + // Create `Packets` with 2 unprocessed elements + let new_packets = Packets::new(vec![Packet::default(); 2]); let mut unprocessed_packets: UnprocessedPackets = - vec![(single_element_packets.clone(), vec![0], false)] - .into_iter() - .collect(); + vec![(new_packets, vec![0, 1], false)].into_iter().collect(); // Set the limit to 2 let batch_limit = 2; // Create some new unprocessed packets - let new_packets = single_element_packets; + let new_packets = Packets::new(vec![Packet::default()]); let packet_indexes = vec![]; let duplicates = Arc::new(Mutex::new(( LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default(), ))); - let mut dropped_batches_count = 0; + let mut dropped_packet_batches_count = 0; + let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; let banking_stage_stats = BankingStageStats::default(); // Because the set of unprocessed `packet_indexes` is empty, the @@ -2797,14 +2820,16 @@ mod tests { &mut unprocessed_packets, new_packets.clone(), packet_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); - assert_eq!(dropped_batches_count, 0); + assert_eq!(dropped_packet_batches_count, 0); + assert_eq!(dropped_packets_count, 0); assert_eq!(newly_buffered_packets_count, 0); // Because the set of unprocessed `packet_indexes` is non-empty, the @@ -2814,14 +2839,16 @@ mod tests { &mut unprocessed_packets, new_packets, packet_indexes.clone(), - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); - assert_eq!(dropped_batches_count, 0); + assert_eq!(dropped_packet_batches_count, 0); + assert_eq!(dropped_packets_count, 0); assert_eq!(newly_buffered_packets_count, 1); // Because we've reached the batch limit, old unprocessed packets are @@ -2836,7 +2863,8 @@ mod tests { &mut unprocessed_packets, new_packets.clone(), packet_indexes.clone(), - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, @@ -2844,15 +2872,17 @@ mod tests { ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); - assert_eq!(dropped_batches_count, 1); + assert_eq!(dropped_packet_batches_count, 1); + assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); - // Check duplicates are dropped + // Check duplicates are dropped (newly buffered shouldn't change) BankingStage::push_unprocessed( &mut unprocessed_packets, new_packets.clone(), packet_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, 3, &duplicates, @@ -2860,7 +2890,8 @@ mod tests { ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); - assert_eq!(dropped_batches_count, 1); + assert_eq!(dropped_packet_batches_count, 1); + assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); }