Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-events): Adds metrics subscriber #2335

Merged
merged 6 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 66 additions & 9 deletions dc/s2n-quic-dc/src/event/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,16 +376,73 @@ mod traits {
}
}
}
pub mod metrics {
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
use s2n_quic_core::event::metrics::Recorder;
#[derive(Clone, Debug)]
pub struct Subscriber<S: super::Subscriber>
where
S::ConnectionContext: Recorder,
{
subscriber: S,
}
impl<S: super::Subscriber> Subscriber<S>
where
S::ConnectionContext: Recorder,
{
pub fn new(subscriber: S) -> Self {
Self { subscriber }
}
}
pub struct Context<R: Recorder> {
recorder: R,
frame_sent: AtomicU32,
}
impl<S: super::Subscriber> super::Subscriber for Subscriber<S>
where
S::ConnectionContext: Recorder,
{
type ConnectionContext = Context<S::ConnectionContext>;
fn create_connection_context(
&self,
meta: &api::ConnectionMeta,
info: &api::ConnectionInfo,
) -> Self::ConnectionContext {
Context {
recorder: self.subscriber.create_connection_context(meta, info),
frame_sent: AtomicU32::new(0),
}
}
#[inline]
fn on_frame_sent(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::FrameSent,
) {
context.frame_sent.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_frame_sent(&mut context.recorder, meta, event);
}
}
impl<R: Recorder> Drop for Context<R> {
fn drop(&mut self) {
self.recorder
.increment_counter("frame_sent", self.frame_sent.load(Ordering::Relaxed) as _);
}
}
}
#[cfg(any(test, feature = "testing"))]
pub mod testing {
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
#[derive(Clone, Debug)]
pub struct Subscriber {
location: Option<Location>,
output: Arc<Mutex<Vec<String>>>,
pub frame_sent: Arc<AtomicU32>,
output: Mutex<Vec<String>>,
pub frame_sent: AtomicU32,
}
impl Drop for Subscriber {
fn drop(&mut self) {
Expand All @@ -410,7 +467,7 @@ pub mod testing {
Self {
location: None,
output: Default::default(),
frame_sent: Arc::new(AtomicU32::new(0)),
frame_sent: AtomicU32::new(0),
}
}
}
Expand All @@ -428,7 +485,7 @@ pub mod testing {
meta: &api::ConnectionMeta,
event: &api::FrameSent,
) {
self.frame_sent.fetch_add(1, Ordering::SeqCst);
self.frame_sent.fetch_add(1, Ordering::Relaxed);
if self.location.is_some() {
self.output
.lock()
Expand All @@ -440,8 +497,8 @@ pub mod testing {
#[derive(Clone, Debug)]
pub struct Publisher {
location: Option<Location>,
output: Arc<Mutex<Vec<String>>>,
pub frame_sent: Arc<AtomicU32>,
output: Mutex<Vec<String>>,
pub frame_sent: AtomicU32,
}
impl Publisher {
#[doc = r" Creates a publisher with snapshot assertions enabled"]
Expand All @@ -456,7 +513,7 @@ pub mod testing {
Self {
location: None,
output: Default::default(),
frame_sent: Arc::new(AtomicU32::new(0)),
frame_sent: AtomicU32::new(0),
}
}
}
Expand All @@ -467,7 +524,7 @@ pub mod testing {
}
impl super::ConnectionPublisher for Publisher {
fn on_frame_sent(&self, event: builder::FrameSent) {
self.frame_sent.fetch_add(1, Ordering::SeqCst);
self.frame_sent.fetch_add(1, Ordering::Relaxed);
let event = event.into_event();
if self.location.is_some() {
self.output.lock().unwrap().push(format!("{event:?}"));
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{connection, endpoint};
use core::{ops::RangeInclusive, time::Duration};

mod generated;
pub mod metrics;
pub use generated::*;

/// All event types which can be emitted from this library.
Expand Down
Loading
Loading