diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a2a79bed79e523..ec51e90163d9a1 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -82,9 +82,11 @@ pub struct BankingStageStats { id: u32, process_packets_count: AtomicUsize, new_tx_count: AtomicUsize, - dropped_batches_count: AtomicUsize, + dropped_packet_batches_count: AtomicUsize, + dropped_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize, + current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, reset_cost_tracker_count: AtomicUsize, @@ -129,8 +131,13 @@ impl BankingStageStats { i64 ), ( - "dropped_batches_count", - self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64, + "dropped_packet_batches_count", + self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "dropped_packets_count", + self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -138,6 +145,12 @@ impl BankingStageStats { self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "current_buffered_packet_batches_count", + self.current_buffered_packet_batches_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "current_buffered_packets_count", self.current_buffered_packets_count @@ -1345,7 +1358,8 @@ impl BankingStage { let mut new_tx_count = 0; let mut mms_iter = mms.into_iter(); - let mut dropped_batches_count = 0; + let mut dropped_packets_count = 0; + let mut dropped_packet_batches_count = 0; let mut newly_buffered_packets_count = 0; while let Some(msgs) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(&msgs.packets); @@ -1355,7 +1369,8 @@ impl BankingStage { buffered_packets, msgs, packet_indexes, - &mut dropped_batches_count, + &mut dropped_packets_count, + &mut dropped_packet_batches_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1386,7 +1401,8 @@ impl BankingStage { buffered_packets, msgs, unprocessed_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1414,7 +1430,8 @@ impl BankingStage { buffered_packets, msgs, unprocessed_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, duplicates, @@ -1449,14 +1466,18 @@ impl BankingStage { .new_tx_count .fetch_add(new_tx_count, Ordering::Relaxed); banking_stage_stats - .dropped_batches_count - .fetch_add(dropped_batches_count, Ordering::Relaxed); + .dropped_packet_batches_count + .fetch_add(dropped_packet_batches_count, Ordering::Relaxed); banking_stage_stats .newly_buffered_packets_count .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); banking_stage_stats - .current_buffered_packets_count + .current_buffered_packet_batches_count .swap(buffered_packets.len(), Ordering::Relaxed); + banking_stage_stats.current_buffered_packets_count.swap( + buffered_packets.iter().map(|packets| packets.1.len()).sum(), + Ordering::Relaxed, + ); *recv_start = Instant::now(); Ok(()) } @@ -1465,7 +1486,8 @@ impl BankingStage { unprocessed_packets: &mut UnprocessedPackets, packets: Packets, mut packet_indexes: Vec, - dropped_batches_count: &mut usize, + dropped_packet_batches_count: &mut usize, + dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, batch_limit: usize, duplicates: &Arc, PacketHasher)>>, @@ -1492,8 +1514,10 @@ impl BankingStage { } if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packets.len() >= batch_limit { - *dropped_batches_count += 1; - unprocessed_packets.pop_front(); + *dropped_packet_batches_count += 1; + if let Some(dropped_batch) = unprocessed_packets.pop_front() { + *dropped_packets_count += dropped_batch.1.len(); + } } *newly_buffered_packets_count += packet_indexes.len(); unprocessed_packets.push_back((packets, packet_indexes, false)); @@ -2823,22 +2847,23 @@ mod tests { fn test_push_unprocessed_batch_limit() { solana_logger::setup(); // Create `Packets` with 1 unprocessed element - let single_element_packets = Packets::new(vec![Packet::default()]); + let new_packets = Packets::new(vec![Packet::default(); 2]); let mut unprocessed_packets: UnprocessedPackets = - vec![(single_element_packets.clone(), vec![0], false)] + vec![(new_packets.clone(), vec![0, 1], false)] .into_iter() .collect(); // Set the limit to 2 let batch_limit = 2; // Create some new unprocessed packets - let new_packets = single_element_packets; + let new_packets = Packets::new(vec![Packet::default()]); let packet_indexes = vec![]; let duplicates = Arc::new(Mutex::new(( LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default(), ))); - let mut dropped_batches_count = 0; + let mut dropped_packet_batches_count = 0; + let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; let banking_stage_stats = BankingStageStats::default(); // Because the set of unprocessed `packet_indexes` is empty, the @@ -2847,14 +2872,16 @@ mod tests { &mut unprocessed_packets, new_packets.clone(), packet_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); - assert_eq!(dropped_batches_count, 0); + assert_eq!(dropped_packet_batches_count, 0); + assert_eq!(dropped_packets_count, 0); assert_eq!(newly_buffered_packets_count, 0); // Because the set of unprocessed `packet_indexes` is non-empty, the @@ -2864,14 +2891,16 @@ mod tests { &mut unprocessed_packets, new_packets, packet_indexes.clone(), - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); - assert_eq!(dropped_batches_count, 0); + assert_eq!(dropped_packet_batches_count, 0); + assert_eq!(dropped_packets_count, 0); assert_eq!(newly_buffered_packets_count, 1); // Because we've reached the batch limit, old unprocessed packets are @@ -2886,7 +2915,8 @@ mod tests { &mut unprocessed_packets, new_packets.clone(), packet_indexes.clone(), - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, &duplicates, @@ -2894,15 +2924,17 @@ mod tests { ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); - assert_eq!(dropped_batches_count, 1); + assert_eq!(dropped_packet_batches_count, 1); + assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); - // Check duplicates are dropped + // Check duplicates are dropped (newly buffered shouldn't change) BankingStage::push_unprocessed( &mut unprocessed_packets, new_packets.clone(), packet_indexes, - &mut dropped_batches_count, + &mut dropped_packet_batches_count, + &mut dropped_packets_count, &mut newly_buffered_packets_count, 3, &duplicates, @@ -2910,7 +2942,8 @@ mod tests { ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); - assert_eq!(dropped_batches_count, 1); + assert_eq!(dropped_packet_batches_count, 1); + assert_eq!(dropped_packets_count, 2); assert_eq!(newly_buffered_packets_count, 2); }