Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add banking metrics for buffered and dropped packets (backport #19902) #19926

Merged
merged 1 commit into from
Sep 16, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 60 additions & 29 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ pub struct BankingStageStats {
id: u32,
process_packets_count: AtomicUsize,
new_tx_count: AtomicUsize,
dropped_batches_count: AtomicUsize,
dropped_packet_batches_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize,
current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,

Expand Down Expand Up @@ -143,15 +145,26 @@ impl BankingStageStats {
i64
),
(
"dropped_batches_count",
self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64,
"dropped_packet_batches_count",
self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"dropped_packets_count",
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"newly_buffered_packets_count",
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"current_buffered_packet_batches_count",
self.current_buffered_packet_batches_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"current_buffered_packets_count",
self.current_buffered_packets_count
Expand Down Expand Up @@ -1225,7 +1238,8 @@ impl BankingStage {
let mut new_tx_count = 0;

let mut mms_iter = mms.into_iter();
let mut dropped_batches_count = 0;
let mut dropped_packets_count = 0;
let mut dropped_packet_batches_count = 0;
let mut newly_buffered_packets_count = 0;
while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
Expand All @@ -1235,7 +1249,8 @@ impl BankingStage {
buffered_packets,
msgs,
packet_indexes,
&mut dropped_batches_count,
&mut dropped_packets_count,
&mut dropped_packet_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
Expand Down Expand Up @@ -1264,7 +1279,8 @@ impl BankingStage {
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
Expand All @@ -1290,7 +1306,8 @@ impl BankingStage {
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
Expand Down Expand Up @@ -1325,14 +1342,18 @@ impl BankingStage {
.new_tx_count
.fetch_add(new_tx_count, Ordering::Relaxed);
banking_stage_stats
.dropped_batches_count
.fetch_add(dropped_batches_count, Ordering::Relaxed);
.dropped_packet_batches_count
.fetch_add(dropped_packet_batches_count, Ordering::Relaxed);
banking_stage_stats
.newly_buffered_packets_count
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
banking_stage_stats
.current_buffered_packets_count
.current_buffered_packet_batches_count
.swap(buffered_packets.len(), Ordering::Relaxed);
banking_stage_stats.current_buffered_packets_count.swap(
buffered_packets.iter().map(|packets| packets.1.len()).sum(),
Ordering::Relaxed,
);
*recv_start = Instant::now();
Ok(())
}
Expand All @@ -1341,7 +1362,8 @@ impl BankingStage {
unprocessed_packets: &mut UnprocessedPackets,
packets: Packets,
mut packet_indexes: Vec<usize>,
dropped_batches_count: &mut usize,
dropped_packet_batches_count: &mut usize,
dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
Expand All @@ -1368,8 +1390,10 @@ impl BankingStage {
}
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packets.len() >= batch_limit {
*dropped_batches_count += 1;
unprocessed_packets.pop_front();
*dropped_packet_batches_count += 1;
if let Some(dropped_batch) = unprocessed_packets.pop_front() {
*dropped_packets_count += dropped_batch.1.len();
}
}
*newly_buffered_packets_count += packet_indexes.len();
unprocessed_packets.push_back((packets, packet_indexes, false));
Expand Down Expand Up @@ -2772,23 +2796,22 @@ mod tests {
#[test]
fn test_push_unprocessed_batch_limit() {
solana_logger::setup();
// Create `Packets` with 1 unprocessed element
let single_element_packets = Packets::new(vec![Packet::default()]);
// Create `Packets` with 2 unprocessed elements
let new_packets = Packets::new(vec![Packet::default(); 2]);
let mut unprocessed_packets: UnprocessedPackets =
vec![(single_element_packets.clone(), vec![0], false)]
.into_iter()
.collect();
vec![(new_packets, vec![0, 1], false)].into_iter().collect();
// Set the limit to 2
let batch_limit = 2;
// Create some new unprocessed packets
let new_packets = single_element_packets;
let new_packets = Packets::new(vec![Packet::default()]);
let packet_indexes = vec![];

let duplicates = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let mut dropped_batches_count = 0;
let mut dropped_packet_batches_count = 0;
let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0;
let banking_stage_stats = BankingStageStats::default();
// Because the set of unprocessed `packet_indexes` is empty, the
Expand All @@ -2797,14 +2820,16 @@ mod tests {
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_batches_count, 0);
assert_eq!(dropped_packet_batches_count, 0);
assert_eq!(dropped_packets_count, 0);
assert_eq!(newly_buffered_packets_count, 0);

// Because the set of unprocessed `packet_indexes` is non-empty, the
Expand All @@ -2814,14 +2839,16 @@ mod tests {
&mut unprocessed_packets,
new_packets,
packet_indexes.clone(),
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_batches_count, 0);
assert_eq!(dropped_packet_batches_count, 0);
assert_eq!(dropped_packets_count, 0);
assert_eq!(newly_buffered_packets_count, 1);

// Because we've reached the batch limit, old unprocessed packets are
Expand All @@ -2836,31 +2863,35 @@ mod tests {
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes.clone(),
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1);
assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(dropped_packets_count, 2);
assert_eq!(newly_buffered_packets_count, 2);

// Check duplicates are dropped
// Check duplicates are dropped (newly buffered shouldn't change)
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
&mut dropped_batches_count,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
3,
&duplicates,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1);
assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(dropped_packets_count, 2);
assert_eq!(newly_buffered_packets_count, 2);
}

Expand Down