diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 2f8011afb08b35..7af959d1d288da 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -65,6 +65,7 @@ struct RetransmitStats { epoch_cache_update: AtomicU64, repair_total: AtomicU64, discard_total: AtomicU64, + duplicate_retransmit: AtomicU64, retransmit_total: AtomicU64, last_ts: AtomicInterval, compute_turbine_peers_total: AtomicU64, @@ -81,6 +82,7 @@ fn update_retransmit_stats( retransmit_total: u64, discard_total: u64, repair_total: u64, + duplicate_retransmit: u64, compute_turbine_peers_total: u64, peers_len: usize, packets_by_slot: HashMap, @@ -102,6 +104,9 @@ fn update_retransmit_stats( stats .discard_total .fetch_add(discard_total, Ordering::Relaxed); + stats + .duplicate_retransmit + .fetch_add(duplicate_retransmit, Ordering::Relaxed); stats .compute_turbine_peers_total .fetch_add(compute_turbine_peers_total, Ordering::Relaxed); @@ -180,6 +185,11 @@ fn update_retransmit_stats( stats.discard_total.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "duplicate_retransmit", + stats.duplicate_retransmit.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); let mut packets_by_slot = stats.packets_by_slot.lock().unwrap(); let old_packets_by_slot = std::mem::take(&mut *packets_by_slot); @@ -334,6 +344,7 @@ fn retransmit( let socket_addr_space = cluster_info.socket_addr_space(); let mut discard_total = 0; let mut repair_total = 0; + let mut duplicate_retransmit = 0; let mut retransmit_total = 0; let mut compute_turbine_peers_total = 0; let mut retransmit_tree_mismatch = 0; @@ -354,7 +365,11 @@ fn retransmit( } let shred_slot = match check_if_already_received(packet, shreds_received) { Some(slot) => slot, - None => continue, + None => { + total_packets -= 1; + duplicate_retransmit += 1; + continue; + } }; max_slot = max_slot.max(shred_slot); @@ -429,6 +444,7 @@ fn retransmit( retransmit_total, discard_total, repair_total, + duplicate_retransmit, compute_turbine_peers_total, peers_len, packets_by_slot,