From 7d4510b54c36dc26bb885f59d32ecf6b1add58c0 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Thu, 26 Sep 2024 16:57:05 -0700 Subject: [PATCH 1/6] Adds metrics Subscriber --- dc/s2n-quic-dc/src/event/generated.rs | 72 ++- quic/s2n-quic-core/src/event.rs | 1 + quic/s2n-quic-core/src/event/generated.rs | 567 ++++++++++++++++++++++ quic/s2n-quic-core/src/event/metrics.rs | 11 + quic/s2n-quic-events/src/main.rs | 77 ++- quic/s2n-quic-events/src/parser.rs | 20 + 6 files changed, 734 insertions(+), 14 deletions(-) create mode 100644 quic/s2n-quic-core/src/event/metrics.rs diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index 7639587eea..a386a27dd7 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -376,16 +376,70 @@ mod traits { } } } +pub mod metrics { + use super::*; + use core::sync::atomic::{AtomicU32, Ordering}; + use s2n_quic_core::event::metrics::Recorder; + use std::sync::Mutex; + #[derive(Clone, Debug)] + pub struct Subscriber { + provider: P, + } + impl Subscriber

{ + pub fn new(provider: P) -> Self { + Self { provider } + } + } + pub struct Context { + recorder: R, + pub frame_sent: AtomicU32, + } + pub trait Provider: 'static + Send + Sync { + type Recorder: Recorder; + fn recorder( + &self, + meta: &api::ConnectionMeta, + info: &api::ConnectionInfo, + ) -> Self::Recorder; + } + impl super::Subscriber for Subscriber

{ + type ConnectionContext = Context; + fn create_connection_context( + &self, + meta: &api::ConnectionMeta, + info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + Context { + recorder: self.provider.recorder(meta, info), + frame_sent: AtomicU32::new(0), + } + } + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::FrameSent, + ) { + context.frame_sent.fetch_add(1, Ordering::Relaxed); + } + } + impl Drop for Context { + fn drop(&mut self) { + self.recorder + .increment_counter("frame_sent", self.frame_sent.load(Ordering::Relaxed) as _); + } + } +} #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; use core::sync::atomic::{AtomicU32, Ordering}; - use std::sync::{Arc, Mutex}; + use std::sync::Mutex; #[derive(Clone, Debug)] pub struct Subscriber { location: Option, - output: Arc>>, - pub frame_sent: Arc, + output: Mutex>, + pub frame_sent: AtomicU32, } impl Drop for Subscriber { fn drop(&mut self) { @@ -410,7 +464,7 @@ pub mod testing { Self { location: None, output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), + frame_sent: AtomicU32::new(0), } } } @@ -428,7 +482,7 @@ pub mod testing { meta: &api::ConnectionMeta, event: &api::FrameSent, ) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); + self.frame_sent.fetch_add(1, Ordering::Relaxed); if self.location.is_some() { self.output .lock() @@ -440,8 +494,8 @@ pub mod testing { #[derive(Clone, Debug)] pub struct Publisher { location: Option, - output: Arc>>, - pub frame_sent: Arc, + output: Mutex>, + pub frame_sent: AtomicU32, } impl Publisher { #[doc = r" Creates a publisher with snapshot assertions enabled"] @@ -456,7 +510,7 @@ pub mod testing { Self { location: None, output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), + frame_sent: AtomicU32::new(0), } } } @@ -467,7 +521,7 @@ pub mod testing { } impl super::ConnectionPublisher for Publisher { fn on_frame_sent(&self, event: builder::FrameSent) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); + self.frame_sent.fetch_add(1, Ordering::Relaxed); let event = event.into_event(); if self.location.is_some() { self.output.lock().unwrap().push(format!("{event:?}")); diff --git a/quic/s2n-quic-core/src/event.rs b/quic/s2n-quic-core/src/event.rs index 18feaf2110..0d193c4e5f 100644 --- a/quic/s2n-quic-core/src/event.rs +++ b/quic/s2n-quic-core/src/event.rs @@ -5,6 +5,7 @@ use crate::{connection, endpoint}; use core::{ops::RangeInclusive, time::Duration}; mod generated; +pub mod metrics; pub use generated::*; /// All event types which can be emitted from this library. diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index 4463609f3a..856db2f9bf 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -6834,6 +6834,573 @@ mod traits { } } } +pub mod metrics { + use super::*; + use crate::event::metrics::Recorder; + #[derive(Clone, Debug)] + pub struct Subscriber { + provider: P, + } + impl Subscriber

{ + pub fn new(provider: P) -> Self { + Self { provider } + } + } + pub struct Context { + recorder: R, + pub application_protocol_information: u32, + pub server_name_information: u32, + pub packet_skipped: u32, + pub packet_sent: u32, + pub packet_received: u32, + pub active_path_updated: u32, + pub path_created: u32, + pub frame_sent: u32, + pub frame_received: u32, + pub packet_lost: u32, + pub recovery_metrics: u32, + pub congestion: u32, + pub ack_processed: u32, + pub rx_ack_range_dropped: u32, + pub ack_range_received: u32, + pub ack_range_sent: u32, + pub packet_dropped: u32, + pub key_update: u32, + pub key_space_discarded: u32, + pub connection_started: u32, + pub connection_closed: u32, + pub duplicate_packet: u32, + pub transport_parameters_received: u32, + pub datagram_sent: u32, + pub datagram_received: u32, + pub datagram_dropped: u32, + pub connection_id_updated: u32, + pub ecn_state_changed: u32, + pub connection_migration_denied: u32, + pub handshake_status_updated: u32, + pub tls_exporter_ready: u32, + pub path_challenge_updated: u32, + pub tls_client_hello: u32, + pub tls_server_hello: u32, + pub rx_stream_progress: u32, + pub tx_stream_progress: u32, + pub keep_alive_timer_expired: u32, + pub mtu_updated: u32, + pub slow_start_exited: u32, + pub delivery_rate_sampled: u32, + pub pacing_rate_updated: u32, + pub bbr_state_changed: u32, + pub dc_state_changed: u32, + } + pub trait Provider: 'static + Send + Sync { + type Recorder: Recorder; + fn recorder( + &mut self, + meta: &api::ConnectionMeta, + info: &api::ConnectionInfo, + ) -> Self::Recorder; + } + impl super::Subscriber for Subscriber

{ + type ConnectionContext = Context; + fn create_connection_context( + &mut self, + meta: &api::ConnectionMeta, + info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + Context { + recorder: self.provider.recorder(meta, info), + application_protocol_information: 0, + server_name_information: 0, + packet_skipped: 0, + packet_sent: 0, + packet_received: 0, + active_path_updated: 0, + path_created: 0, + frame_sent: 0, + frame_received: 0, + packet_lost: 0, + recovery_metrics: 0, + congestion: 0, + ack_processed: 0, + rx_ack_range_dropped: 0, + ack_range_received: 0, + ack_range_sent: 0, + packet_dropped: 0, + key_update: 0, + key_space_discarded: 0, + connection_started: 0, + connection_closed: 0, + duplicate_packet: 0, + transport_parameters_received: 0, + datagram_sent: 0, + datagram_received: 0, + datagram_dropped: 0, + connection_id_updated: 0, + ecn_state_changed: 0, + connection_migration_denied: 0, + handshake_status_updated: 0, + tls_exporter_ready: 0, + path_challenge_updated: 0, + tls_client_hello: 0, + tls_server_hello: 0, + rx_stream_progress: 0, + tx_stream_progress: 0, + keep_alive_timer_expired: 0, + mtu_updated: 0, + slow_start_exited: 0, + delivery_rate_sampled: 0, + pacing_rate_updated: 0, + bbr_state_changed: 0, + dc_state_changed: 0, + } + } + fn on_application_protocol_information( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::ApplicationProtocolInformation, + ) { + context.application_protocol_information += 1; + } + fn on_server_name_information( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::ServerNameInformation, + ) { + context.server_name_information += 1; + } + fn on_packet_skipped( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PacketSkipped, + ) { + context.packet_skipped += 1; + } + fn on_packet_sent( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PacketSent, + ) { + context.packet_sent += 1; + } + fn on_packet_received( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PacketReceived, + ) { + context.packet_received += 1; + } + fn on_active_path_updated( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::ActivePathUpdated, + ) { + context.active_path_updated += 1; + } + fn on_path_created( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PathCreated, + ) { + context.path_created += 1; + } + fn on_frame_sent( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::FrameSent, + ) { + context.frame_sent += 1; + } + fn on_frame_received( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::FrameReceived, + ) { + context.frame_received += 1; + } + fn on_packet_lost( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PacketLost, + ) { + context.packet_lost += 1; + } + fn on_recovery_metrics( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::RecoveryMetrics, + ) { + context.recovery_metrics += 1; + } + fn on_congestion( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::Congestion, + ) { + context.congestion += 1; + } + #[allow(deprecated)] + fn on_ack_processed( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::AckProcessed, + ) { + context.ack_processed += 1; + } + fn on_rx_ack_range_dropped( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::RxAckRangeDropped, + ) { + context.rx_ack_range_dropped += 1; + } + fn on_ack_range_received( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::AckRangeReceived, + ) { + context.ack_range_received += 1; + } + fn on_ack_range_sent( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::AckRangeSent, + ) { + context.ack_range_sent += 1; + } + fn on_packet_dropped( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PacketDropped, + ) { + context.packet_dropped += 1; + } + fn on_key_update( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::KeyUpdate, + ) { + context.key_update += 1; + } + fn on_key_space_discarded( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::KeySpaceDiscarded, + ) { + context.key_space_discarded += 1; + } + fn on_connection_started( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::ConnectionStarted, + ) { + context.connection_started += 1; + } + fn on_connection_closed( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::ConnectionClosed, + ) { + context.connection_closed += 1; + } + fn on_duplicate_packet( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::DuplicatePacket, + ) { + context.duplicate_packet += 1; + } + fn on_transport_parameters_received( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::TransportParametersReceived, + ) { + context.transport_parameters_received += 1; + } + fn on_datagram_sent( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::DatagramSent, + ) { + context.datagram_sent += 1; + } + fn on_datagram_received( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::DatagramReceived, + ) { + context.datagram_received += 1; + } + fn on_datagram_dropped( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::DatagramDropped, + ) { + context.datagram_dropped += 1; + } + fn on_connection_id_updated( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::ConnectionIdUpdated, + ) { + context.connection_id_updated += 1; + } + fn on_ecn_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::EcnStateChanged, + ) { + context.ecn_state_changed += 1; + } + fn on_connection_migration_denied( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::ConnectionMigrationDenied, + ) { + context.connection_migration_denied += 1; + } + fn on_handshake_status_updated( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::HandshakeStatusUpdated, + ) { + context.handshake_status_updated += 1; + } + fn on_tls_exporter_ready( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::TlsExporterReady, + ) { + context.tls_exporter_ready += 1; + } + fn on_path_challenge_updated( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PathChallengeUpdated, + ) { + context.path_challenge_updated += 1; + } + fn on_tls_client_hello( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::TlsClientHello, + ) { + context.tls_client_hello += 1; + } + fn on_tls_server_hello( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::TlsServerHello, + ) { + context.tls_server_hello += 1; + } + fn on_rx_stream_progress( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::RxStreamProgress, + ) { + context.rx_stream_progress += 1; + } + fn on_tx_stream_progress( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::TxStreamProgress, + ) { + context.tx_stream_progress += 1; + } + fn on_keep_alive_timer_expired( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::KeepAliveTimerExpired, + ) { + context.keep_alive_timer_expired += 1; + } + fn on_mtu_updated( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::MtuUpdated, + ) { + context.mtu_updated += 1; + } + fn on_slow_start_exited( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::SlowStartExited, + ) { + context.slow_start_exited += 1; + } + fn on_delivery_rate_sampled( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::DeliveryRateSampled, + ) { + context.delivery_rate_sampled += 1; + } + fn on_pacing_rate_updated( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::PacingRateUpdated, + ) { + context.pacing_rate_updated += 1; + } + fn on_bbr_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::BbrStateChanged, + ) { + context.bbr_state_changed += 1; + } + fn on_dc_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + _event: &api::DcStateChanged, + ) { + context.dc_state_changed += 1; + } + } + impl Drop for Context { + fn drop(&mut self) { + self.recorder.increment_counter( + "application_protocol_information", + self.application_protocol_information as _, + ); + self.recorder + .increment_counter("server_name_information", self.server_name_information as _); + self.recorder + .increment_counter("packet_skipped", self.packet_skipped as _); + self.recorder + .increment_counter("packet_sent", self.packet_sent as _); + self.recorder + .increment_counter("packet_received", self.packet_received as _); + self.recorder + .increment_counter("active_path_updated", self.active_path_updated as _); + self.recorder + .increment_counter("path_created", self.path_created as _); + self.recorder + .increment_counter("frame_sent", self.frame_sent as _); + self.recorder + .increment_counter("frame_received", self.frame_received as _); + self.recorder + .increment_counter("packet_lost", self.packet_lost as _); + self.recorder + .increment_counter("recovery_metrics", self.recovery_metrics as _); + self.recorder + .increment_counter("congestion", self.congestion as _); + self.recorder + .increment_counter("ack_processed", self.ack_processed as _); + self.recorder + .increment_counter("rx_ack_range_dropped", self.rx_ack_range_dropped as _); + self.recorder + .increment_counter("ack_range_received", self.ack_range_received as _); + self.recorder + .increment_counter("ack_range_sent", self.ack_range_sent as _); + self.recorder + .increment_counter("packet_dropped", self.packet_dropped as _); + self.recorder + .increment_counter("key_update", self.key_update as _); + self.recorder + .increment_counter("key_space_discarded", self.key_space_discarded as _); + self.recorder + .increment_counter("connection_started", self.connection_started as _); + self.recorder + .increment_counter("connection_closed", self.connection_closed as _); + self.recorder + .increment_counter("duplicate_packet", self.duplicate_packet as _); + self.recorder.increment_counter( + "transport_parameters_received", + self.transport_parameters_received as _, + ); + self.recorder + .increment_counter("datagram_sent", self.datagram_sent as _); + self.recorder + .increment_counter("datagram_received", self.datagram_received as _); + self.recorder + .increment_counter("datagram_dropped", self.datagram_dropped as _); + self.recorder + .increment_counter("connection_id_updated", self.connection_id_updated as _); + self.recorder + .increment_counter("ecn_state_changed", self.ecn_state_changed as _); + self.recorder.increment_counter( + "connection_migration_denied", + self.connection_migration_denied as _, + ); + self.recorder.increment_counter( + "handshake_status_updated", + self.handshake_status_updated as _, + ); + self.recorder + .increment_counter("tls_exporter_ready", self.tls_exporter_ready as _); + self.recorder + .increment_counter("path_challenge_updated", self.path_challenge_updated as _); + self.recorder + .increment_counter("tls_client_hello", self.tls_client_hello as _); + self.recorder + .increment_counter("tls_server_hello", self.tls_server_hello as _); + self.recorder + .increment_counter("rx_stream_progress", self.rx_stream_progress as _); + self.recorder + .increment_counter("tx_stream_progress", self.tx_stream_progress as _); + self.recorder.increment_counter( + "keep_alive_timer_expired", + self.keep_alive_timer_expired as _, + ); + self.recorder + .increment_counter("mtu_updated", self.mtu_updated as _); + self.recorder + .increment_counter("slow_start_exited", self.slow_start_exited as _); + self.recorder + .increment_counter("delivery_rate_sampled", self.delivery_rate_sampled as _); + self.recorder + .increment_counter("pacing_rate_updated", self.pacing_rate_updated as _); + self.recorder + .increment_counter("bbr_state_changed", self.bbr_state_changed as _); + self.recorder + .increment_counter("dc_state_changed", self.dc_state_changed as _); + } + } +} #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; diff --git a/quic/s2n-quic-core/src/event/metrics.rs b/quic/s2n-quic-core/src/event/metrics.rs new file mode 100644 index 0000000000..6ca03016d3 --- /dev/null +++ b/quic/s2n-quic-core/src/event/metrics.rs @@ -0,0 +1,11 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +/// A Recorder should arrange to emit the properties and counters on Drop into some output. +pub trait Recorder: 'static + Send + Sync { + /// Registers a counter with the recorder instance + fn increment_counter(&self, name: &str, amount: usize); + + /// Associates a key/value pair with the recorder instance + fn set_value(&self, key: &str, value: V); +} diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 97387e2167..ef1c3d9f06 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -25,25 +25,32 @@ impl OutputMode { } fn counter_type(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(Arc), + OutputMode::Ref => quote!(AtomicU32), OutputMode::Mut => quote!(u32), } } fn counter_init(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(Arc::new(AtomicU32::new(0))), + OutputMode::Ref => quote!(AtomicU32::new(0)), OutputMode::Mut => quote!(0), } } fn counter_increment(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(.fetch_add(1, Ordering::SeqCst)), + OutputMode::Ref => quote!(.fetch_add(1, Ordering::Relaxed)), OutputMode::Mut => quote!(+= 1), } } + fn counter_load(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(.load(Ordering::Relaxed)), + OutputMode::Mut => quote!(), + } + } + fn lock(&self) -> TokenStream { match self { OutputMode::Ref => quote!(.lock().unwrap()), @@ -55,7 +62,7 @@ impl OutputMode { match self { OutputMode::Ref => quote!( use core::sync::atomic::{AtomicU32, Ordering}; - use std::sync::{Arc, Mutex}; + use std::sync::Mutex; ), OutputMode::Mut => quote!(), } @@ -63,7 +70,7 @@ impl OutputMode { fn testing_output_type(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(Arc>>), + OutputMode::Ref => quote!(Mutex>), OutputMode::Mut => quote!(Vec), } } @@ -298,8 +305,13 @@ struct Output { pub subscriber_testing: TokenStream, pub endpoint_publisher_testing: TokenStream, pub connection_publisher_testing: TokenStream, + pub metrics_fields: TokenStream, + pub metrics_fields_init: TokenStream, + pub metrics_record: TokenStream, + pub subscriber_metrics: TokenStream, pub extra: TokenStream, pub mode: OutputMode, + pub s2n_quic_core_path: TokenStream, } impl ToTokens for Output { @@ -319,8 +331,13 @@ impl ToTokens for Output { subscriber_testing, endpoint_publisher_testing, connection_publisher_testing, + metrics_fields, + metrics_fields_init, + metrics_record, + subscriber_metrics, extra, mode, + s2n_quic_core_path, } = self; let imports = self.mode.imports(); @@ -670,6 +687,52 @@ impl ToTokens for Output { } } + pub mod metrics { + use super::*; + #imports + use #s2n_quic_core_path::event::metrics::Recorder; + #[derive(Clone, Debug)] + pub struct Subscriber { + provider: P, + } + + impl Subscriber

{ + pub fn new(provider: P) -> Self { + Self { provider } + } + } + + pub struct Context { + recorder: R, + #metrics_fields + } + + pub trait Provider: 'static + Send + Sync { + type Recorder: Recorder; + + fn recorder(&#mode self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo) -> Self::Recorder; + } + + impl super::Subscriber for Subscriber

{ + type ConnectionContext = Context; + + fn create_connection_context(&#mode self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo) -> Self::ConnectionContext { + Context { + recorder: self.provider.recorder(meta, info), + #metrics_fields_init + } + } + + #subscriber_metrics + } + + impl Drop for Context { + fn drop(&mut self) { + #metrics_record + } + } + } + #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; @@ -844,6 +907,7 @@ struct EventInfo<'a> { input_path: &'a str, output_path: &'a str, output_mode: OutputMode, + s2n_quic_core_path: TokenStream, } fn main() -> Result<()> { @@ -858,6 +922,7 @@ fn main() -> Result<()> { "/../../dc/s2n-quic-dc/src/event/generated.rs" ), output_mode: OutputMode::Ref, + s2n_quic_core_path: quote!(s2n_quic_core), }, EventInfo { input_path: concat!(env!("CARGO_MANIFEST_DIR"), "/events/**/*.rs"), @@ -866,6 +931,7 @@ fn main() -> Result<()> { "/../s2n-quic-core/src/event/generated.rs" ), output_mode: OutputMode::Mut, + s2n_quic_core_path: quote!(crate), }, ]; @@ -880,6 +946,7 @@ fn main() -> Result<()> { let mut output = Output { mode: event_info.output_mode, + s2n_quic_core_path: event_info.s2n_quic_core_path, ..Default::default() }; diff --git a/quic/s2n-quic-events/src/parser.rs b/quic/s2n-quic-events/src/parser.rs index 948bee45b5..0088212d45 100644 --- a/quic/s2n-quic-events/src/parser.rs +++ b/quic/s2n-quic-events/src/parser.rs @@ -150,6 +150,7 @@ impl Struct { let counter_type = output.mode.counter_type(); let counter_init = output.mode.counter_init(); + let counter_load = output.mode.counter_load(); // add a counter for testing structs output.testing_fields.extend(quote!( @@ -282,6 +283,25 @@ impl Struct { } )); + // Metrics is only connection-level events + output.metrics_fields.extend(quote!( + pub #counter: #counter_type, + )); + output.metrics_fields_init.extend(quote!( + #counter: #counter_init, + )); + + output.metrics_record.extend(quote!( + self.recorder.increment_counter(#snake, self.#counter #counter_load as _); + )); + + output.subscriber_metrics.extend(quote!( + #allow_deprecated + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, _meta: &api::ConnectionMeta, _event: &api::#ident) { + context.#counter #counter_increment; + } + )); + output.subscriber_testing.extend(quote!( #allow_deprecated fn #function(&#receiver self, _context: &#receiver Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) { From 3a4fe2e793d32b8504715862f8f165e2054e5920 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Thu, 26 Sep 2024 17:06:50 -0700 Subject: [PATCH 2/6] fix dead code --- quic/s2n-quic-core/src/event/metrics.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/quic/s2n-quic-core/src/event/metrics.rs b/quic/s2n-quic-core/src/event/metrics.rs index 6ca03016d3..92547d0c4d 100644 --- a/quic/s2n-quic-core/src/event/metrics.rs +++ b/quic/s2n-quic-core/src/event/metrics.rs @@ -1,6 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +pub use super::generated::*; + /// A Recorder should arrange to emit the properties and counters on Drop into some output. pub trait Recorder: 'static + Send + Sync { /// Registers a counter with the recorder instance From 2ca10b5c8fc280bdddd627153daa99b891789c5d Mon Sep 17 00:00:00 2001 From: Appelmans Date: Thu, 26 Sep 2024 17:09:35 -0700 Subject: [PATCH 3/6] rustfmt --- quic/s2n-quic-events/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index ef1c3d9f06..58a6626706 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -717,7 +717,7 @@ impl ToTokens for Output { type ConnectionContext = Context; fn create_connection_context(&#mode self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo) -> Self::ConnectionContext { - Context { + Context { recorder: self.provider.recorder(meta, info), #metrics_fields_init } From b901fe5577c36bd8c580b151c4ea49789e563927 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Fri, 27 Sep 2024 16:53:28 -0700 Subject: [PATCH 4/6] PR advice --- dc/s2n-quic-dc/src/event/generated.rs | 43 ++- quic/s2n-quic-core/src/event/generated.rs | 424 ++++++++++++++-------- quic/s2n-quic-events/src/main.rs | 37 +- quic/s2n-quic-events/src/parser.rs | 6 +- 4 files changed, 326 insertions(+), 184 deletions(-) diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index a386a27dd7..f166a6c736 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -380,47 +380,50 @@ pub mod metrics { use super::*; use core::sync::atomic::{AtomicU32, Ordering}; use s2n_quic_core::event::metrics::Recorder; - use std::sync::Mutex; #[derive(Clone, Debug)] - pub struct Subscriber { - provider: P, + pub struct Subscriber + where + S::ConnectionContext: Recorder, + { + subscriber: S, } - impl Subscriber

{ - pub fn new(provider: P) -> Self { - Self { provider } + impl Subscriber + where + S::ConnectionContext: Recorder, + { + pub fn new(subscriber: S) -> Self { + Self { subscriber } } } pub struct Context { recorder: R, - pub frame_sent: AtomicU32, + frame_sent: AtomicU32, } - pub trait Provider: 'static + Send + Sync { - type Recorder: Recorder; - fn recorder( - &self, - meta: &api::ConnectionMeta, - info: &api::ConnectionInfo, - ) -> Self::Recorder; - } - impl super::Subscriber for Subscriber

{ - type ConnectionContext = Context; + impl super::Subscriber for Subscriber + where + S::ConnectionContext: Recorder, + { + type ConnectionContext = Context; fn create_connection_context( &self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo, ) -> Self::ConnectionContext { Context { - recorder: self.provider.recorder(meta, info), + recorder: self.subscriber.create_connection_context(meta, info), frame_sent: AtomicU32::new(0), } } + #[inline] fn on_frame_sent( &self, context: &Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::FrameSent, + meta: &api::ConnectionMeta, + event: &api::FrameSent, ) { context.frame_sent.fetch_add(1, Ordering::Relaxed); + self.subscriber + .on_frame_sent(&mut context.recorder, meta, event); } } impl Drop for Context { diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index 856db2f9bf..16eab29367 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -1290,7 +1290,7 @@ pub mod api { } } macro_rules! impl_conn_id { - ($name:ident) => { + ($ name : ident) => { impl<'a> IntoEvent> for &'a crate::connection::id::$name { #[inline] fn into_event(self) -> builder::ConnectionId<'a> { @@ -6838,77 +6838,78 @@ pub mod metrics { use super::*; use crate::event::metrics::Recorder; #[derive(Clone, Debug)] - pub struct Subscriber { - provider: P, + pub struct Subscriber + where + S::ConnectionContext: Recorder, + { + subscriber: S, } - impl Subscriber

{ - pub fn new(provider: P) -> Self { - Self { provider } + impl Subscriber + where + S::ConnectionContext: Recorder, + { + pub fn new(subscriber: S) -> Self { + Self { subscriber } } } pub struct Context { recorder: R, - pub application_protocol_information: u32, - pub server_name_information: u32, - pub packet_skipped: u32, - pub packet_sent: u32, - pub packet_received: u32, - pub active_path_updated: u32, - pub path_created: u32, - pub frame_sent: u32, - pub frame_received: u32, - pub packet_lost: u32, - pub recovery_metrics: u32, - pub congestion: u32, - pub ack_processed: u32, - pub rx_ack_range_dropped: u32, - pub ack_range_received: u32, - pub ack_range_sent: u32, - pub packet_dropped: u32, - pub key_update: u32, - pub key_space_discarded: u32, - pub connection_started: u32, - pub connection_closed: u32, - pub duplicate_packet: u32, - pub transport_parameters_received: u32, - pub datagram_sent: u32, - pub datagram_received: u32, - pub datagram_dropped: u32, - pub connection_id_updated: u32, - pub ecn_state_changed: u32, - pub connection_migration_denied: u32, - pub handshake_status_updated: u32, - pub tls_exporter_ready: u32, - pub path_challenge_updated: u32, - pub tls_client_hello: u32, - pub tls_server_hello: u32, - pub rx_stream_progress: u32, - pub tx_stream_progress: u32, - pub keep_alive_timer_expired: u32, - pub mtu_updated: u32, - pub slow_start_exited: u32, - pub delivery_rate_sampled: u32, - pub pacing_rate_updated: u32, - pub bbr_state_changed: u32, - pub dc_state_changed: u32, - } - pub trait Provider: 'static + Send + Sync { - type Recorder: Recorder; - fn recorder( - &mut self, - meta: &api::ConnectionMeta, - info: &api::ConnectionInfo, - ) -> Self::Recorder; - } - impl super::Subscriber for Subscriber

{ - type ConnectionContext = Context; + application_protocol_information: u32, + server_name_information: u32, + packet_skipped: u32, + packet_sent: u32, + packet_received: u32, + active_path_updated: u32, + path_created: u32, + frame_sent: u32, + frame_received: u32, + packet_lost: u32, + recovery_metrics: u32, + congestion: u32, + ack_processed: u32, + rx_ack_range_dropped: u32, + ack_range_received: u32, + ack_range_sent: u32, + packet_dropped: u32, + key_update: u32, + key_space_discarded: u32, + connection_started: u32, + connection_closed: u32, + duplicate_packet: u32, + transport_parameters_received: u32, + datagram_sent: u32, + datagram_received: u32, + datagram_dropped: u32, + connection_id_updated: u32, + ecn_state_changed: u32, + connection_migration_denied: u32, + handshake_status_updated: u32, + tls_exporter_ready: u32, + path_challenge_updated: u32, + tls_client_hello: u32, + tls_server_hello: u32, + rx_stream_progress: u32, + tx_stream_progress: u32, + keep_alive_timer_expired: u32, + mtu_updated: u32, + slow_start_exited: u32, + delivery_rate_sampled: u32, + pacing_rate_updated: u32, + bbr_state_changed: u32, + dc_state_changed: u32, + } + impl super::Subscriber for Subscriber + where + S::ConnectionContext: Recorder, + { + type ConnectionContext = Context; fn create_connection_context( &mut self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo, ) -> Self::ConnectionContext { Context { - recorder: self.provider.recorder(meta, info), + recorder: self.subscriber.create_connection_context(meta, info), application_protocol_information: 0, server_name_information: 0, packet_skipped: 0, @@ -6954,350 +6955,479 @@ pub mod metrics { dc_state_changed: 0, } } + #[inline] fn on_application_protocol_information( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::ApplicationProtocolInformation, + meta: &api::ConnectionMeta, + event: &api::ApplicationProtocolInformation, ) { context.application_protocol_information += 1; + self.subscriber + .on_application_protocol_information(&mut context.recorder, meta, event); } + #[inline] fn on_server_name_information( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::ServerNameInformation, + meta: &api::ConnectionMeta, + event: &api::ServerNameInformation, ) { context.server_name_information += 1; + self.subscriber + .on_server_name_information(&mut context.recorder, meta, event); } + #[inline] fn on_packet_skipped( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PacketSkipped, + meta: &api::ConnectionMeta, + event: &api::PacketSkipped, ) { context.packet_skipped += 1; + self.subscriber + .on_packet_skipped(&mut context.recorder, meta, event); } + #[inline] fn on_packet_sent( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PacketSent, + meta: &api::ConnectionMeta, + event: &api::PacketSent, ) { context.packet_sent += 1; + self.subscriber + .on_packet_sent(&mut context.recorder, meta, event); } + #[inline] fn on_packet_received( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PacketReceived, + meta: &api::ConnectionMeta, + event: &api::PacketReceived, ) { context.packet_received += 1; + self.subscriber + .on_packet_received(&mut context.recorder, meta, event); } + #[inline] fn on_active_path_updated( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::ActivePathUpdated, + meta: &api::ConnectionMeta, + event: &api::ActivePathUpdated, ) { context.active_path_updated += 1; + self.subscriber + .on_active_path_updated(&mut context.recorder, meta, event); } + #[inline] fn on_path_created( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PathCreated, + meta: &api::ConnectionMeta, + event: &api::PathCreated, ) { context.path_created += 1; + self.subscriber + .on_path_created(&mut context.recorder, meta, event); } + #[inline] fn on_frame_sent( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::FrameSent, + meta: &api::ConnectionMeta, + event: &api::FrameSent, ) { context.frame_sent += 1; + self.subscriber + .on_frame_sent(&mut context.recorder, meta, event); } + #[inline] fn on_frame_received( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::FrameReceived, + meta: &api::ConnectionMeta, + event: &api::FrameReceived, ) { context.frame_received += 1; + self.subscriber + .on_frame_received(&mut context.recorder, meta, event); } + #[inline] fn on_packet_lost( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PacketLost, + meta: &api::ConnectionMeta, + event: &api::PacketLost, ) { context.packet_lost += 1; + self.subscriber + .on_packet_lost(&mut context.recorder, meta, event); } + #[inline] fn on_recovery_metrics( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::RecoveryMetrics, + meta: &api::ConnectionMeta, + event: &api::RecoveryMetrics, ) { context.recovery_metrics += 1; + self.subscriber + .on_recovery_metrics(&mut context.recorder, meta, event); } + #[inline] fn on_congestion( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::Congestion, + meta: &api::ConnectionMeta, + event: &api::Congestion, ) { context.congestion += 1; + self.subscriber + .on_congestion(&mut context.recorder, meta, event); } + #[inline] #[allow(deprecated)] fn on_ack_processed( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::AckProcessed, + meta: &api::ConnectionMeta, + event: &api::AckProcessed, ) { context.ack_processed += 1; + self.subscriber + .on_ack_processed(&mut context.recorder, meta, event); } + #[inline] fn on_rx_ack_range_dropped( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::RxAckRangeDropped, + meta: &api::ConnectionMeta, + event: &api::RxAckRangeDropped, ) { context.rx_ack_range_dropped += 1; + self.subscriber + .on_rx_ack_range_dropped(&mut context.recorder, meta, event); } + #[inline] fn on_ack_range_received( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::AckRangeReceived, + meta: &api::ConnectionMeta, + event: &api::AckRangeReceived, ) { context.ack_range_received += 1; + self.subscriber + .on_ack_range_received(&mut context.recorder, meta, event); } + #[inline] fn on_ack_range_sent( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::AckRangeSent, + meta: &api::ConnectionMeta, + event: &api::AckRangeSent, ) { context.ack_range_sent += 1; + self.subscriber + .on_ack_range_sent(&mut context.recorder, meta, event); } + #[inline] fn on_packet_dropped( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PacketDropped, + meta: &api::ConnectionMeta, + event: &api::PacketDropped, ) { context.packet_dropped += 1; + self.subscriber + .on_packet_dropped(&mut context.recorder, meta, event); } + #[inline] fn on_key_update( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::KeyUpdate, + meta: &api::ConnectionMeta, + event: &api::KeyUpdate, ) { context.key_update += 1; + self.subscriber + .on_key_update(&mut context.recorder, meta, event); } + #[inline] fn on_key_space_discarded( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::KeySpaceDiscarded, + meta: &api::ConnectionMeta, + event: &api::KeySpaceDiscarded, ) { context.key_space_discarded += 1; + self.subscriber + .on_key_space_discarded(&mut context.recorder, meta, event); } + #[inline] fn on_connection_started( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::ConnectionStarted, + meta: &api::ConnectionMeta, + event: &api::ConnectionStarted, ) { context.connection_started += 1; + self.subscriber + .on_connection_started(&mut context.recorder, meta, event); } + #[inline] fn on_connection_closed( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::ConnectionClosed, + meta: &api::ConnectionMeta, + event: &api::ConnectionClosed, ) { context.connection_closed += 1; + self.subscriber + .on_connection_closed(&mut context.recorder, meta, event); } + #[inline] fn on_duplicate_packet( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::DuplicatePacket, + meta: &api::ConnectionMeta, + event: &api::DuplicatePacket, ) { context.duplicate_packet += 1; + self.subscriber + .on_duplicate_packet(&mut context.recorder, meta, event); } + #[inline] fn on_transport_parameters_received( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::TransportParametersReceived, + meta: &api::ConnectionMeta, + event: &api::TransportParametersReceived, ) { context.transport_parameters_received += 1; + self.subscriber + .on_transport_parameters_received(&mut context.recorder, meta, event); } + #[inline] fn on_datagram_sent( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::DatagramSent, + meta: &api::ConnectionMeta, + event: &api::DatagramSent, ) { context.datagram_sent += 1; + self.subscriber + .on_datagram_sent(&mut context.recorder, meta, event); } + #[inline] fn on_datagram_received( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::DatagramReceived, + meta: &api::ConnectionMeta, + event: &api::DatagramReceived, ) { context.datagram_received += 1; + self.subscriber + .on_datagram_received(&mut context.recorder, meta, event); } + #[inline] fn on_datagram_dropped( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::DatagramDropped, + meta: &api::ConnectionMeta, + event: &api::DatagramDropped, ) { context.datagram_dropped += 1; + self.subscriber + .on_datagram_dropped(&mut context.recorder, meta, event); } + #[inline] fn on_connection_id_updated( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::ConnectionIdUpdated, + meta: &api::ConnectionMeta, + event: &api::ConnectionIdUpdated, ) { context.connection_id_updated += 1; + self.subscriber + .on_connection_id_updated(&mut context.recorder, meta, event); } + #[inline] fn on_ecn_state_changed( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::EcnStateChanged, + meta: &api::ConnectionMeta, + event: &api::EcnStateChanged, ) { context.ecn_state_changed += 1; + self.subscriber + .on_ecn_state_changed(&mut context.recorder, meta, event); } + #[inline] fn on_connection_migration_denied( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::ConnectionMigrationDenied, + meta: &api::ConnectionMeta, + event: &api::ConnectionMigrationDenied, ) { context.connection_migration_denied += 1; + self.subscriber + .on_connection_migration_denied(&mut context.recorder, meta, event); } + #[inline] fn on_handshake_status_updated( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::HandshakeStatusUpdated, + meta: &api::ConnectionMeta, + event: &api::HandshakeStatusUpdated, ) { context.handshake_status_updated += 1; + self.subscriber + .on_handshake_status_updated(&mut context.recorder, meta, event); } + #[inline] fn on_tls_exporter_ready( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::TlsExporterReady, + meta: &api::ConnectionMeta, + event: &api::TlsExporterReady, ) { context.tls_exporter_ready += 1; + self.subscriber + .on_tls_exporter_ready(&mut context.recorder, meta, event); } + #[inline] fn on_path_challenge_updated( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PathChallengeUpdated, + meta: &api::ConnectionMeta, + event: &api::PathChallengeUpdated, ) { context.path_challenge_updated += 1; + self.subscriber + .on_path_challenge_updated(&mut context.recorder, meta, event); } + #[inline] fn on_tls_client_hello( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::TlsClientHello, + meta: &api::ConnectionMeta, + event: &api::TlsClientHello, ) { context.tls_client_hello += 1; + self.subscriber + .on_tls_client_hello(&mut context.recorder, meta, event); } + #[inline] fn on_tls_server_hello( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::TlsServerHello, + meta: &api::ConnectionMeta, + event: &api::TlsServerHello, ) { context.tls_server_hello += 1; + self.subscriber + .on_tls_server_hello(&mut context.recorder, meta, event); } + #[inline] fn on_rx_stream_progress( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::RxStreamProgress, + meta: &api::ConnectionMeta, + event: &api::RxStreamProgress, ) { context.rx_stream_progress += 1; + self.subscriber + .on_rx_stream_progress(&mut context.recorder, meta, event); } + #[inline] fn on_tx_stream_progress( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::TxStreamProgress, + meta: &api::ConnectionMeta, + event: &api::TxStreamProgress, ) { context.tx_stream_progress += 1; + self.subscriber + .on_tx_stream_progress(&mut context.recorder, meta, event); } + #[inline] fn on_keep_alive_timer_expired( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::KeepAliveTimerExpired, + meta: &api::ConnectionMeta, + event: &api::KeepAliveTimerExpired, ) { context.keep_alive_timer_expired += 1; + self.subscriber + .on_keep_alive_timer_expired(&mut context.recorder, meta, event); } + #[inline] fn on_mtu_updated( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::MtuUpdated, + meta: &api::ConnectionMeta, + event: &api::MtuUpdated, ) { context.mtu_updated += 1; + self.subscriber + .on_mtu_updated(&mut context.recorder, meta, event); } + #[inline] fn on_slow_start_exited( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::SlowStartExited, + meta: &api::ConnectionMeta, + event: &api::SlowStartExited, ) { context.slow_start_exited += 1; + self.subscriber + .on_slow_start_exited(&mut context.recorder, meta, event); } + #[inline] fn on_delivery_rate_sampled( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::DeliveryRateSampled, + meta: &api::ConnectionMeta, + event: &api::DeliveryRateSampled, ) { context.delivery_rate_sampled += 1; + self.subscriber + .on_delivery_rate_sampled(&mut context.recorder, meta, event); } + #[inline] fn on_pacing_rate_updated( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::PacingRateUpdated, + meta: &api::ConnectionMeta, + event: &api::PacingRateUpdated, ) { context.pacing_rate_updated += 1; + self.subscriber + .on_pacing_rate_updated(&mut context.recorder, meta, event); } + #[inline] fn on_bbr_state_changed( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::BbrStateChanged, + meta: &api::ConnectionMeta, + event: &api::BbrStateChanged, ) { context.bbr_state_changed += 1; + self.subscriber + .on_bbr_state_changed(&mut context.recorder, meta, event); } + #[inline] fn on_dc_state_changed( &mut self, context: &mut Self::ConnectionContext, - _meta: &api::ConnectionMeta, - _event: &api::DcStateChanged, + meta: &api::ConnectionMeta, + event: &api::DcStateChanged, ) { context.dc_state_changed += 1; + self.subscriber + .on_dc_state_changed(&mut context.recorder, meta, event); } } impl Drop for Context { diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 58a6626706..c4b5f85a89 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -62,12 +62,18 @@ impl OutputMode { match self { OutputMode::Ref => quote!( use core::sync::atomic::{AtomicU32, Ordering}; - use std::sync::Mutex; ), OutputMode::Mut => quote!(), } } + fn mutex(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(use std::sync::Mutex;), + OutputMode::Mut => quote!(), + } + } + fn testing_output_type(&self) -> TokenStream { match self { OutputMode::Ref => quote!(Mutex>), @@ -341,6 +347,7 @@ impl ToTokens for Output { } = self; let imports = self.mode.imports(); + let mutex = self.mode.mutex(); let testing_output_type = self.mode.testing_output_type(); let lock = self.mode.lock(); let target_crate = self.mode.target_crate(); @@ -691,14 +698,17 @@ impl ToTokens for Output { use super::*; #imports use #s2n_quic_core_path::event::metrics::Recorder; + #[derive(Clone, Debug)] - pub struct Subscriber { - provider: P, + pub struct Subscriber + where S::ConnectionContext: Recorder { + subscriber: S, } - impl Subscriber

{ - pub fn new(provider: P) -> Self { - Self { provider } + impl Subscriber + where S::ConnectionContext: Recorder { + pub fn new(subscriber: S) -> Self { + Self { subscriber } } } @@ -707,18 +717,13 @@ impl ToTokens for Output { #metrics_fields } - pub trait Provider: 'static + Send + Sync { - type Recorder: Recorder; - - fn recorder(&#mode self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo) -> Self::Recorder; - } - - impl super::Subscriber for Subscriber

{ - type ConnectionContext = Context; + impl super::Subscriber for Subscriber + where S::ConnectionContext: Recorder { + type ConnectionContext = Context; fn create_connection_context(&#mode self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo) -> Self::ConnectionContext { Context { - recorder: self.provider.recorder(meta, info), + recorder: self.subscriber.create_connection_context(meta, info), #metrics_fields_init } } @@ -737,6 +742,8 @@ impl ToTokens for Output { pub mod testing { use super::*; #imports + #mutex + #[derive(Clone, Debug)] pub struct Subscriber { location: Option, diff --git a/quic/s2n-quic-events/src/parser.rs b/quic/s2n-quic-events/src/parser.rs index 0088212d45..469c502671 100644 --- a/quic/s2n-quic-events/src/parser.rs +++ b/quic/s2n-quic-events/src/parser.rs @@ -285,7 +285,7 @@ impl Struct { // Metrics is only connection-level events output.metrics_fields.extend(quote!( - pub #counter: #counter_type, + #counter: #counter_type, )); output.metrics_fields_init.extend(quote!( #counter: #counter_init, @@ -296,9 +296,11 @@ impl Struct { )); output.subscriber_metrics.extend(quote!( + #[inline] #allow_deprecated - fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, _meta: &api::ConnectionMeta, _event: &api::#ident) { + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) { context.#counter #counter_increment; + self.subscriber.#function(&mut context.recorder, meta, event); } )); From 8d25a588f3ac677db12661454cc695a85af4cfa1 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Fri, 27 Sep 2024 16:55:05 -0700 Subject: [PATCH 5/6] cargo fmt --- quic/s2n-quic-events/src/main.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index c4b5f85a89..94d00b9955 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -69,7 +69,9 @@ impl OutputMode { fn mutex(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(use std::sync::Mutex;), + OutputMode::Ref => quote!( + use std::sync::Mutex; + ), OutputMode::Mut => quote!(), } } @@ -700,7 +702,7 @@ impl ToTokens for Output { use #s2n_quic_core_path::event::metrics::Recorder; #[derive(Clone, Debug)] - pub struct Subscriber + pub struct Subscriber where S::ConnectionContext: Recorder { subscriber: S, } From 26925c680571672aa2c60eabd6fc8b95fb128720 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Fri, 27 Sep 2024 16:58:36 -0700 Subject: [PATCH 6/6] cargo fmt --- quic/s2n-quic-core/src/event/generated.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index 16eab29367..6c3ebd8255 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -1290,7 +1290,7 @@ pub mod api { } } macro_rules! impl_conn_id { - ($ name : ident) => { + ($name:ident) => { impl<'a> IntoEvent> for &'a crate::connection::id::$name { #[inline] fn into_event(self) -> builder::ConnectionId<'a> {