diff --git a/connection-cache/src/client_connection.rs b/connection-cache/src/client_connection.rs index a58f33d7304d26..6579157b79df9c 100644 --- a/connection-cache/src/client_connection.rs +++ b/connection-cache/src/client_connection.rs @@ -19,6 +19,15 @@ pub struct ClientStats { pub acks: MovingStat, pub make_connection_ms: AtomicU64, pub send_timeout: AtomicU64, + /// The time spent sending packets when packets are successfully sent. This include both time + /// preparing for a connection (either obtaining from cache or create a new one in case of cache miss + /// or connection error) + pub send_packets_us: AtomicU64, + /// `prepare_connection_us` differs from `make_connection_ms` in that it accounts for the time spent + /// on obtaining a successful connection including time spent on retries when sending a packet. + pub prepare_connection_us: AtomicU64, + /// Count of packets successfully sent + pub successful_packets: AtomicU64, } pub trait ClientConnection: Sync + Send { diff --git a/connection-cache/src/connection_cache_stats.rs b/connection-cache/src/connection_cache_stats.rs index e6d7d4cabe69ee..1bf50983f7aea0 100644 --- a/connection-cache/src/connection_cache_stats.rs +++ b/connection-cache/src/connection_cache_stats.rs @@ -60,6 +60,18 @@ impl ConnectionCacheStats { client_stats.send_timeout.load(Ordering::Relaxed), Ordering::Relaxed, ); + self.total_client_stats.send_packets_us.fetch_add( + client_stats.send_packets_us.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + self.total_client_stats.successful_packets.fetch_add( + client_stats.successful_packets.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + self.total_client_stats.prepare_connection_us.fetch_add( + client_stats.prepare_connection_us.load(Ordering::Relaxed), + Ordering::Relaxed, + ); self.sent_packets .fetch_add(num_packets as u64, Ordering::Relaxed); self.total_batches.fetch_add(1, Ordering::Relaxed); @@ -71,6 +83,26 @@ impl ConnectionCacheStats { } pub(super) fn report(&self, name: &'static str) { + let successful_packets = self + .total_client_stats + .successful_packets + .swap(0, Ordering::Relaxed); + + let (average_send_packet_us, average_prepare_connection_us) = if successful_packets > 0 { + ( + self.total_client_stats + .send_packets_us + .swap(0, Ordering::Relaxed) + / successful_packets, + self.total_client_stats + .prepare_connection_us + .swap(0, Ordering::Relaxed) + / successful_packets, + ) + } else { + (0, 0) + }; + datapoint_info!( name, ( @@ -193,6 +225,13 @@ impl ConnectionCacheStats { .swap(0, Ordering::Relaxed), i64 ), + ("average_send_packet_us", average_send_packet_us, i64), + ("successful_packets", successful_packets, i64), + ( + "average_prepare_connection_us", + average_prepare_connection_us, + i64 + ), ); } } diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 53be5966817150..1fe951cc7f0d9b 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -288,10 +288,11 @@ impl QuicClient { stats: &ClientStats, connection_stats: Arc, ) -> Result, QuicError> { + let mut measure_send_packet = Measure::start("send_packet_us"); + let mut measure_prepare_connection = Measure::start("prepare_connection"); let mut connection_try_count = 0; let mut last_connection_id = 0; let mut last_error = None; - while connection_try_count < 2 { let connection = { let mut conn_guard = self.connection.lock().await; @@ -390,8 +391,28 @@ impl QuicClient { } last_connection_id = connection.stable_id(); + measure_prepare_connection.stop(); + match Self::_send_buffer_using_conn(data, &connection).await { Ok(()) => { + measure_send_packet.stop(); + stats.successful_packets.fetch_add(1, Ordering::Relaxed); + stats + .send_packets_us + .fetch_add(measure_send_packet.as_us(), Ordering::Relaxed); + stats + .prepare_connection_us + .fetch_add(measure_prepare_connection.as_us(), Ordering::Relaxed); + trace!( + "Succcessfully sent to {} with id {}, thread: {:?}, data len: {}, send_packet_us: {} prepare_connection_us: {}", + self.addr, + connection.stable_id(), + thread::current().id(), + data.len(), + measure_send_packet.as_us(), + measure_prepare_connection.as_us(), + ); + return Ok(connection); } Err(err) => match err {