Skip to content

Commit

Permalink
Different staked vs unstaked chunks_received (#27033)
Browse files Browse the repository at this point in the history
* Different staked vs unstaked chunks_received

* Suppress a clippy warning
  • Loading branch information
lijunwangs authored Aug 10, 2022
1 parent 52d8a20 commit ddd660e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
18 changes: 18 additions & 0 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ fn handle_and_cache_new_connection(
timing::timestamp(),
params.max_connections_per_peer,
) {
let peer_type = connection_table_l.peer_type;
drop(connection_table_l);
tokio::spawn(handle_connection(
uni_streams,
Expand All @@ -286,6 +287,7 @@ fn handle_and_cache_new_connection(
stream_exit,
params.stats.clone(),
params.stake,
peer_type,
));
Ok(())
} else {
Expand Down Expand Up @@ -478,6 +480,7 @@ async fn setup_connection(
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_connection(
mut uni_streams: IncomingUniStreams,
packet_sender: Sender<PacketBatch>,
Expand All @@ -488,6 +491,7 @@ async fn handle_connection(
stream_exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
stake: u64,
peer_type: ConnectionPeerType,
) {
debug!(
"quic new connection {} streams: {} connections: {}",
Expand Down Expand Up @@ -527,6 +531,7 @@ async fn handle_connection(
&packet_sender,
stats.clone(),
stake,
peer_type,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
Expand Down Expand Up @@ -574,6 +579,7 @@ fn handle_chunk(
packet_sender: &Sender<PacketBatch>,
stats: Arc<StreamStats>,
stake: u64,
peer_type: ConnectionPeerType,
) -> bool {
match chunk {
Ok(maybe_chunk) => {
Expand Down Expand Up @@ -620,6 +626,18 @@ fn handle_chunk(
.copy_from_slice(&chunk.bytes);
batch[0].meta.size = std::cmp::max(batch[0].meta.size, end_of_chunk);
stats.total_chunks_received.fetch_add(1, Ordering::Relaxed);
match peer_type {
ConnectionPeerType::Staked => {
stats
.total_staked_chunks_received
.fetch_add(1, Ordering::Relaxed);
}
ConnectionPeerType::Unstaked => {
stats
.total_unstaked_chunks_received
.fetch_add(1, Ordering::Relaxed);
}
}
}
} else {
trace!("chunk is none");
Expand Down
13 changes: 13 additions & 0 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ pub struct StreamStats {
pub(crate) total_invalid_chunk_size: AtomicUsize,
pub(crate) total_packets_allocated: AtomicUsize,
pub(crate) total_chunks_received: AtomicUsize,
pub(crate) total_staked_chunks_received: AtomicUsize,
pub(crate) total_unstaked_chunks_received: AtomicUsize,
pub(crate) total_packet_batch_send_err: AtomicUsize,
pub(crate) total_packet_batches_sent: AtomicUsize,
pub(crate) total_packet_batches_none: AtomicUsize,
Expand Down Expand Up @@ -252,6 +254,17 @@ impl StreamStats {
self.total_chunks_received.swap(0, Ordering::Relaxed),
i64
),
(
"staked_chunks_received",
self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
i64
),
(
"unstaked_chunks_received",
self.total_unstaked_chunks_received
.swap(0, Ordering::Relaxed),
i64
),
(
"packet_batch_send_error",
self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
Expand Down

0 comments on commit ddd660e

Please sign in to comment.