From ddd660e2d39440094ade3396289c598c94bfcdd6 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Tue, 9 Aug 2022 17:39:14 -0700 Subject: [PATCH] Different staked vs unstaked chunks_received (#27033) * Different staked vs unstaked chunks_received * Suppress a clippy warning --- streamer/src/nonblocking/quic.rs | 18 ++++++++++++++++++ streamer/src/quic.rs | 13 +++++++++++++ 2 files changed, 31 insertions(+) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index b1124ab3e3a041..05faa3f5075ca2 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -275,6 +275,7 @@ fn handle_and_cache_new_connection( timing::timestamp(), params.max_connections_per_peer, ) { + let peer_type = connection_table_l.peer_type; drop(connection_table_l); tokio::spawn(handle_connection( uni_streams, @@ -286,6 +287,7 @@ fn handle_and_cache_new_connection( stream_exit, params.stats.clone(), params.stake, + peer_type, )); Ok(()) } else { @@ -478,6 +480,7 @@ async fn setup_connection( } } +#[allow(clippy::too_many_arguments)] async fn handle_connection( mut uni_streams: IncomingUniStreams, packet_sender: Sender, @@ -488,6 +491,7 @@ async fn handle_connection( stream_exit: Arc, stats: Arc, stake: u64, + peer_type: ConnectionPeerType, ) { debug!( "quic new connection {} streams: {} connections: {}", @@ -527,6 +531,7 @@ async fn handle_connection( &packet_sender, stats.clone(), stake, + peer_type, ) { last_update.store(timing::timestamp(), Ordering::Relaxed); break; @@ -574,6 +579,7 @@ fn handle_chunk( packet_sender: &Sender, stats: Arc, stake: u64, + peer_type: ConnectionPeerType, ) -> bool { match chunk { Ok(maybe_chunk) => { @@ -620,6 +626,18 @@ fn handle_chunk( .copy_from_slice(&chunk.bytes); batch[0].meta.size = std::cmp::max(batch[0].meta.size, end_of_chunk); stats.total_chunks_received.fetch_add(1, Ordering::Relaxed); + match peer_type { + ConnectionPeerType::Staked => { + stats + .total_staked_chunks_received + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Unstaked => { + stats + .total_unstaked_chunks_received + .fetch_add(1, Ordering::Relaxed); + } + } } } else { trace!("chunk is none"); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index c350fc2430e01b..41379fa22330f3 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -123,6 +123,8 @@ pub struct StreamStats { pub(crate) total_invalid_chunk_size: AtomicUsize, pub(crate) total_packets_allocated: AtomicUsize, pub(crate) total_chunks_received: AtomicUsize, + pub(crate) total_staked_chunks_received: AtomicUsize, + pub(crate) total_unstaked_chunks_received: AtomicUsize, pub(crate) total_packet_batch_send_err: AtomicUsize, pub(crate) total_packet_batches_sent: AtomicUsize, pub(crate) total_packet_batches_none: AtomicUsize, @@ -252,6 +254,17 @@ impl StreamStats { self.total_chunks_received.swap(0, Ordering::Relaxed), i64 ), + ( + "staked_chunks_received", + self.total_staked_chunks_received.swap(0, Ordering::Relaxed), + i64 + ), + ( + "unstaked_chunks_received", + self.total_unstaked_chunks_received + .swap(0, Ordering::Relaxed), + i64 + ), ( "packet_batch_send_error", self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),