From c84ba19d2688b7026a23027553f684ad3b10552f Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Fri, 22 Nov 2024 15:34:05 -0700 Subject: [PATCH] feat(s2n-quic-dc): implement cache events (#2386) --- dc/s2n-quic-dc/events/map.rs | 26 ++ dc/s2n-quic-dc/src/event/generated.rs | 268 ++++++++++++++++++ .../src/event/generated/metrics/aggregate.rs | 98 ++++++- .../src/event/generated/metrics/probe.rs | 21 ++ dc/s2n-quic-dc/src/path/secret/map.rs | 31 +- .../src/path/secret/map/event_tests.rs | 4 +- dc/s2n-quic-dc/src/path/secret/map/peer.rs | 37 +++ dc/s2n-quic-dc/src/path/secret/map/state.rs | 45 ++- .../src/path/secret/map/state/tests.rs | 2 +- dc/s2n-quic-dc/src/path/secret/map/store.rs | 14 +- dc/s2n-quic-dc/src/stream/client/tokio.rs | 39 +-- dc/s2n-quic-dc/src/stream/endpoint.rs | 22 +- dc/s2n-quic-dc/src/stream/testing.rs | 46 ++- 13 files changed, 536 insertions(+), 117 deletions(-) create mode 100644 dc/s2n-quic-dc/src/path/secret/map/peer.rs diff --git a/dc/s2n-quic-dc/events/map.rs b/dc/s2n-quic-dc/events/map.rs index b3fe15c81..24c3ae227 100644 --- a/dc/s2n-quic-dc/events/map.rs +++ b/dc/s2n-quic-dc/events/map.rs @@ -281,3 +281,29 @@ struct StaleKeyPacketDropped<'a> { #[snapshot("[HIDDEN]")] credential_id: &'a [u8], } + +#[event("path_secret_map:address_cache_accessed")] +#[subject(endpoint)] +/// Emitted when the cache is accessed by peer address +/// +/// This can be used to track cache hit ratios +struct PathSecretMapAddressCacheAccessed<'a> { + #[nominal_counter("peer_address.protocol")] + peer_address: SocketAddress<'a>, + + #[bool_counter("hit")] + hit: bool, +} + +#[event("path_secret_map:id_cache_accessed")] +#[subject(endpoint)] +/// Emitted when the cache is accessed by path secret ID +/// +/// This can be used to track cache hit ratios +struct PathSecretMapIdCacheAccessed<'a> { + #[snapshot("[HIDDEN]")] + credential_id: &'a [u8], + + #[bool_counter("hit")] + hit: bool, +} diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index 920d9fe50..50527ccf0 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -1148,6 +1148,48 @@ pub mod api { impl<'a> Event for StaleKeyPacketDropped<'a> { const NAME: &'static str = "path_secret_map:stale_key_packet_dropped"; } + #[derive(Clone, Debug)] + #[non_exhaustive] + #[doc = " Emitted when the cache is accessed by peer address"] + #[doc = ""] + #[doc = " This can be used to track cache hit ratios"] + pub struct PathSecretMapAddressCacheAccessed<'a> { + pub peer_address: SocketAddress<'a>, + pub hit: bool, + } + #[cfg(any(test, feature = "testing"))] + impl<'a> crate::event::snapshot::Fmt for PathSecretMapAddressCacheAccessed<'a> { + fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { + let mut fmt = fmt.debug_struct("PathSecretMapAddressCacheAccessed"); + fmt.field("peer_address", &self.peer_address); + fmt.field("hit", &self.hit); + fmt.finish() + } + } + impl<'a> Event for PathSecretMapAddressCacheAccessed<'a> { + const NAME: &'static str = "path_secret_map:address_cache_accessed"; + } + #[derive(Clone, Debug)] + #[non_exhaustive] + #[doc = " Emitted when the cache is accessed by path secret ID"] + #[doc = ""] + #[doc = " This can be used to track cache hit ratios"] + pub struct PathSecretMapIdCacheAccessed<'a> { + pub credential_id: &'a [u8], + pub hit: bool, + } + #[cfg(any(test, feature = "testing"))] + impl<'a> crate::event::snapshot::Fmt for PathSecretMapIdCacheAccessed<'a> { + fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { + let mut fmt = fmt.debug_struct("PathSecretMapIdCacheAccessed"); + fmt.field("credential_id", &"[HIDDEN]"); + fmt.field("hit", &self.hit); + fmt.finish() + } + } + impl<'a> Event for PathSecretMapIdCacheAccessed<'a> { + const NAME: &'static str = "path_secret_map:id_cache_accessed"; + } impl IntoEvent for s2n_codec::DecoderError { fn into_event(self) -> builder::AcceptorPacketDropReason { use builder::AcceptorPacketDropReason as Reason; @@ -1787,6 +1829,26 @@ pub mod tracing { } = event; tracing :: event ! (target : "stale_key_packet_dropped" , parent : parent , tracing :: Level :: DEBUG , peer_address = tracing :: field :: debug (peer_address) , credential_id = tracing :: field :: debug (credential_id)); } + #[inline] + fn on_path_secret_map_address_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapAddressCacheAccessed, + ) { + let parent = self.parent(meta); + let api::PathSecretMapAddressCacheAccessed { peer_address, hit } = event; + tracing :: event ! (target : "path_secret_map_address_cache_accessed" , parent : parent , tracing :: Level :: DEBUG , peer_address = tracing :: field :: debug (peer_address) , hit = tracing :: field :: debug (hit)); + } + #[inline] + fn on_path_secret_map_id_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapIdCacheAccessed, + ) { + let parent = self.parent(meta); + let api::PathSecretMapIdCacheAccessed { credential_id, hit } = event; + tracing :: event ! (target : "path_secret_map_id_cache_accessed" , parent : parent , tracing :: Level :: DEBUG , credential_id = tracing :: field :: debug (credential_id) , hit = tracing :: field :: debug (hit)); + } } } pub mod builder { @@ -2895,6 +2957,44 @@ pub mod builder { } } } + #[derive(Clone, Debug)] + #[doc = " Emitted when the cache is accessed by peer address"] + #[doc = ""] + #[doc = " This can be used to track cache hit ratios"] + pub struct PathSecretMapAddressCacheAccessed<'a> { + pub peer_address: SocketAddress<'a>, + pub hit: bool, + } + impl<'a> IntoEvent> + for PathSecretMapAddressCacheAccessed<'a> + { + #[inline] + fn into_event(self) -> api::PathSecretMapAddressCacheAccessed<'a> { + let PathSecretMapAddressCacheAccessed { peer_address, hit } = self; + api::PathSecretMapAddressCacheAccessed { + peer_address: peer_address.into_event(), + hit: hit.into_event(), + } + } + } + #[derive(Clone, Debug)] + #[doc = " Emitted when the cache is accessed by path secret ID"] + #[doc = ""] + #[doc = " This can be used to track cache hit ratios"] + pub struct PathSecretMapIdCacheAccessed<'a> { + pub credential_id: &'a [u8], + pub hit: bool, + } + impl<'a> IntoEvent> for PathSecretMapIdCacheAccessed<'a> { + #[inline] + fn into_event(self) -> api::PathSecretMapIdCacheAccessed<'a> { + let PathSecretMapIdCacheAccessed { credential_id, hit } = self; + api::PathSecretMapIdCacheAccessed { + credential_id: credential_id.into_event(), + hit: hit.into_event(), + } + } + } } pub use traits::*; mod traits { @@ -3403,6 +3503,26 @@ mod traits { let _ = meta; let _ = event; } + #[doc = "Called when the `PathSecretMapAddressCacheAccessed` event is triggered"] + #[inline] + fn on_path_secret_map_address_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapAddressCacheAccessed, + ) { + let _ = meta; + let _ = event; + } + #[doc = "Called when the `PathSecretMapIdCacheAccessed` event is triggered"] + #[inline] + fn on_path_secret_map_id_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapIdCacheAccessed, + ) { + let _ = meta; + let _ = event; + } #[doc = r" Called for each event that relates to the endpoint and all connections"] #[inline] fn on_event(&self, meta: &M, event: &E) { @@ -3810,6 +3930,24 @@ mod traits { self.as_ref().on_stale_key_packet_dropped(meta, event); } #[inline] + fn on_path_secret_map_address_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapAddressCacheAccessed, + ) { + self.as_ref() + .on_path_secret_map_address_cache_accessed(meta, event); + } + #[inline] + fn on_path_secret_map_id_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapIdCacheAccessed, + ) { + self.as_ref() + .on_path_secret_map_id_cache_accessed(meta, event); + } + #[inline] fn on_event(&self, meta: &M, event: &E) { self.as_ref().on_event(meta, event); } @@ -4246,6 +4384,24 @@ mod traits { (self.1).on_stale_key_packet_dropped(meta, event); } #[inline] + fn on_path_secret_map_address_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapAddressCacheAccessed, + ) { + (self.0).on_path_secret_map_address_cache_accessed(meta, event); + (self.1).on_path_secret_map_address_cache_accessed(meta, event); + } + #[inline] + fn on_path_secret_map_id_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapIdCacheAccessed, + ) { + (self.0).on_path_secret_map_id_cache_accessed(meta, event); + (self.1).on_path_secret_map_id_cache_accessed(meta, event); + } + #[inline] fn on_event(&self, meta: &M, event: &E) { self.0.on_event(meta, event); self.1.on_event(meta, event); @@ -4379,6 +4535,16 @@ mod traits { fn on_stale_key_packet_rejected(&self, event: builder::StaleKeyPacketRejected); #[doc = "Publishes a `StaleKeyPacketDropped` event to the publisher's subscriber"] fn on_stale_key_packet_dropped(&self, event: builder::StaleKeyPacketDropped); + #[doc = "Publishes a `PathSecretMapAddressCacheAccessed` event to the publisher's subscriber"] + fn on_path_secret_map_address_cache_accessed( + &self, + event: builder::PathSecretMapAddressCacheAccessed, + ); + #[doc = "Publishes a `PathSecretMapIdCacheAccessed` event to the publisher's subscriber"] + fn on_path_secret_map_id_cache_accessed( + &self, + event: builder::PathSecretMapIdCacheAccessed, + ); #[doc = r" Returns the QUIC version, if any"] fn quic_version(&self) -> Option; } @@ -4726,6 +4892,26 @@ mod traits { self.subscriber.on_event(&self.meta, &event); } #[inline] + fn on_path_secret_map_address_cache_accessed( + &self, + event: builder::PathSecretMapAddressCacheAccessed, + ) { + let event = event.into_event(); + self.subscriber + .on_path_secret_map_address_cache_accessed(&self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] + fn on_path_secret_map_id_cache_accessed( + &self, + event: builder::PathSecretMapIdCacheAccessed, + ) { + let event = event.into_event(); + self.subscriber + .on_path_secret_map_id_cache_accessed(&self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] fn quic_version(&self) -> Option { self.quic_version } @@ -4853,6 +5039,8 @@ pub mod testing { pub stale_key_packet_accepted: AtomicU32, pub stale_key_packet_rejected: AtomicU32, pub stale_key_packet_dropped: AtomicU32, + pub path_secret_map_address_cache_accessed: AtomicU32, + pub path_secret_map_id_cache_accessed: AtomicU32, } impl Drop for Subscriber { fn drop(&mut self) { @@ -4927,6 +5115,8 @@ pub mod testing { stale_key_packet_accepted: AtomicU32::new(0), stale_key_packet_rejected: AtomicU32::new(0), stale_key_packet_dropped: AtomicU32::new(0), + path_secret_map_address_cache_accessed: AtomicU32::new(0), + path_secret_map_id_cache_accessed: AtomicU32::new(0), } } } @@ -5442,6 +5632,30 @@ pub mod testing { let out = format!("{meta:?} {event:?}"); self.output.lock().unwrap().push(out); } + fn on_path_secret_map_address_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapAddressCacheAccessed, + ) { + self.path_secret_map_address_cache_accessed + .fetch_add(1, Ordering::Relaxed); + let meta = crate::event::snapshot::Fmt::to_snapshot(meta); + let event = crate::event::snapshot::Fmt::to_snapshot(event); + let out = format!("{meta:?} {event:?}"); + self.output.lock().unwrap().push(out); + } + fn on_path_secret_map_id_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapIdCacheAccessed, + ) { + self.path_secret_map_id_cache_accessed + .fetch_add(1, Ordering::Relaxed); + let meta = crate::event::snapshot::Fmt::to_snapshot(meta); + let event = crate::event::snapshot::Fmt::to_snapshot(event); + let out = format!("{meta:?} {event:?}"); + self.output.lock().unwrap().push(out); + } } } #[derive(Debug)] @@ -5493,6 +5707,8 @@ pub mod testing { pub stale_key_packet_accepted: AtomicU32, pub stale_key_packet_rejected: AtomicU32, pub stale_key_packet_dropped: AtomicU32, + pub path_secret_map_address_cache_accessed: AtomicU32, + pub path_secret_map_id_cache_accessed: AtomicU32, } impl Drop for Subscriber { fn drop(&mut self) { @@ -5569,6 +5785,8 @@ pub mod testing { stale_key_packet_accepted: AtomicU32::new(0), stale_key_packet_rejected: AtomicU32::new(0), stale_key_packet_dropped: AtomicU32::new(0), + path_secret_map_address_cache_accessed: AtomicU32::new(0), + path_secret_map_id_cache_accessed: AtomicU32::new(0), } } } @@ -6112,6 +6330,30 @@ pub mod testing { let out = format!("{meta:?} {event:?}"); self.output.lock().unwrap().push(out); } + fn on_path_secret_map_address_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapAddressCacheAccessed, + ) { + self.path_secret_map_address_cache_accessed + .fetch_add(1, Ordering::Relaxed); + let meta = crate::event::snapshot::Fmt::to_snapshot(meta); + let event = crate::event::snapshot::Fmt::to_snapshot(event); + let out = format!("{meta:?} {event:?}"); + self.output.lock().unwrap().push(out); + } + fn on_path_secret_map_id_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapIdCacheAccessed, + ) { + self.path_secret_map_id_cache_accessed + .fetch_add(1, Ordering::Relaxed); + let meta = crate::event::snapshot::Fmt::to_snapshot(meta); + let event = crate::event::snapshot::Fmt::to_snapshot(event); + let out = format!("{meta:?} {event:?}"); + self.output.lock().unwrap().push(out); + } } #[derive(Debug)] pub struct Publisher { @@ -6162,6 +6404,8 @@ pub mod testing { pub stale_key_packet_accepted: AtomicU32, pub stale_key_packet_rejected: AtomicU32, pub stale_key_packet_dropped: AtomicU32, + pub path_secret_map_address_cache_accessed: AtomicU32, + pub path_secret_map_id_cache_accessed: AtomicU32, } impl Publisher { #[doc = r" Creates a publisher with snapshot assertions enabled"] @@ -6228,6 +6472,8 @@ pub mod testing { stale_key_packet_accepted: AtomicU32::new(0), stale_key_packet_rejected: AtomicU32::new(0), stale_key_packet_dropped: AtomicU32::new(0), + path_secret_map_address_cache_accessed: AtomicU32::new(0), + path_secret_map_id_cache_accessed: AtomicU32::new(0), } } } @@ -6589,6 +6835,28 @@ pub mod testing { let out = format!("{event:?}"); self.output.lock().unwrap().push(out); } + fn on_path_secret_map_address_cache_accessed( + &self, + event: builder::PathSecretMapAddressCacheAccessed, + ) { + self.path_secret_map_address_cache_accessed + .fetch_add(1, Ordering::Relaxed); + let event = event.into_event(); + let event = crate::event::snapshot::Fmt::to_snapshot(&event); + let out = format!("{event:?}"); + self.output.lock().unwrap().push(out); + } + fn on_path_secret_map_id_cache_accessed( + &self, + event: builder::PathSecretMapIdCacheAccessed, + ) { + self.path_secret_map_id_cache_accessed + .fetch_add(1, Ordering::Relaxed); + let event = event.into_event(); + let event = crate::event::snapshot::Fmt::to_snapshot(&event); + let out = format!("{event:?}"); + self.output.lock().unwrap().push(out); + } fn quic_version(&self) -> Option { Some(1) } diff --git a/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs b/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs index 6a1835414..d81ffdba8 100644 --- a/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs +++ b/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs @@ -12,7 +12,7 @@ use crate::event::{ AsVariant, BoolRecorder, Info, Metric, NominalRecorder, Recorder, Registry, Units, }, }; -static INFO: &[Info; 110usize] = &[ +static INFO: &[Info; 115usize] = &[ info::Builder { id: 0usize, name: Str::new("acceptor_tcp_started\0"), @@ -673,6 +673,36 @@ static INFO: &[Info; 110usize] = &[ units: Units::None, } .build(), + info::Builder { + id: 110usize, + name: Str::new("path_secret_map_address_cache_accessed\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 111usize, + name: Str::new("path_secret_map_address_cache_accessed.peer_address.protocol\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 112usize, + name: Str::new("path_secret_map_address_cache_accessed.hit\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 113usize, + name: Str::new("path_secret_map_id_cache_accessed\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 114usize, + name: Str::new("path_secret_map_id_cache_accessed.hit\0"), + units: Units::None, + } + .build(), ]; #[derive(Clone, Copy, Debug)] #[allow(dead_code)] @@ -681,13 +711,13 @@ pub struct ConnectionContext { } pub struct Subscriber { #[allow(dead_code)] - counters: Box<[R::Counter; 47usize]>, + counters: Box<[R::Counter; 49usize]>, #[allow(dead_code)] - bool_counters: Box<[R::BoolCounter; 8usize]>, + bool_counters: Box<[R::BoolCounter; 10usize]>, #[allow(dead_code)] nominal_counters: Box<[R::NominalCounter]>, #[allow(dead_code)] - nominal_counter_offsets: Box<[usize; 25usize]>, + nominal_counter_offsets: Box<[usize; 26usize]>, #[allow(dead_code)] measures: Box<[R::Measure; 23usize]>, #[allow(dead_code)] @@ -716,10 +746,10 @@ impl Subscriber { #[allow(unused_mut)] #[inline] pub fn new(registry: R) -> Self { - let mut counters = Vec::with_capacity(47usize); - let mut bool_counters = Vec::with_capacity(8usize); - let mut nominal_counters = Vec::with_capacity(25usize); - let mut nominal_counter_offsets = Vec::with_capacity(25usize); + let mut counters = Vec::with_capacity(49usize); + let mut bool_counters = Vec::with_capacity(10usize); + let mut nominal_counters = Vec::with_capacity(26usize); + let mut nominal_counter_offsets = Vec::with_capacity(26usize); let mut measures = Vec::with_capacity(23usize); let mut gauges = Vec::with_capacity(0usize); let mut timers = Vec::with_capacity(7usize); @@ -772,6 +802,8 @@ impl Subscriber { counters.push(registry.register_counter(&INFO[104usize])); counters.push(registry.register_counter(&INFO[106usize])); counters.push(registry.register_counter(&INFO[108usize])); + counters.push(registry.register_counter(&INFO[110usize])); + counters.push(registry.register_counter(&INFO[113usize])); bool_counters.push(registry.register_bool_counter(&INFO[19usize])); bool_counters.push(registry.register_bool_counter(&INFO[20usize])); bool_counters.push(registry.register_bool_counter(&INFO[34usize])); @@ -780,6 +812,8 @@ impl Subscriber { bool_counters.push(registry.register_bool_counter(&INFO[37usize])); bool_counters.push(registry.register_bool_counter(&INFO[58usize])); bool_counters.push(registry.register_bool_counter(&INFO[59usize])); + bool_counters.push(registry.register_bool_counter(&INFO[112usize])); + bool_counters.push(registry.register_bool_counter(&INFO[114usize])); { #[allow(unused_imports)] use api::*; @@ -1058,6 +1092,17 @@ impl Subscriber { debug_assert_ne!(count, 0, "field type needs at least one variant"); nominal_counter_offsets.push(offset); } + { + let offset = nominal_counters.len(); + let mut count = 0; + for variant in ::VARIANTS.iter() { + nominal_counters + .push(registry.register_nominal_counter(&INFO[111usize], variant)); + count += 1; + } + debug_assert_ne!(count, 0, "field type needs at least one variant"); + nominal_counter_offsets.push(offset); + } } measures.push(registry.register_measure(&INFO[2usize])); measures.push(registry.register_measure(&INFO[3usize])); @@ -1170,6 +1215,8 @@ impl Subscriber { 44usize => (&INFO[104usize], entry), 45usize => (&INFO[106usize], entry), 46usize => (&INFO[108usize], entry), + 47usize => (&INFO[110usize], entry), + 48usize => (&INFO[113usize], entry), _ => unsafe { core::hint::unreachable_unchecked() }, }) } @@ -1195,6 +1242,8 @@ impl Subscriber { 5usize => (&INFO[37usize], entry), 6usize => (&INFO[58usize], entry), 7usize => (&INFO[59usize], entry), + 8usize => (&INFO[112usize], entry), + 9usize => (&INFO[114usize], entry), _ => unsafe { core::hint::unreachable_unchecked() }, }) } @@ -1365,6 +1414,12 @@ impl Subscriber { let entries = &self.nominal_counters[offset..offset + variants.len()]; (&INFO[109usize], entries, variants) } + 25usize => { + let offset = *entry; + let variants = ::VARIANTS; + let entries = &self.nominal_counters[offset..offset + variants.len()]; + (&INFO[111usize], entries, variants) + } _ => unsafe { core::hint::unreachable_unchecked() }, }) } @@ -2059,4 +2114,31 @@ impl event::Subscriber for Subscriber { let _ = event; let _ = meta; } + #[inline] + fn on_path_secret_map_address_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapAddressCacheAccessed, + ) { + #[allow(unused_imports)] + use api::*; + self.count(110usize, 47usize, 1usize); + self.count_nominal(111usize, 25usize, &event.peer_address); + self.count_bool(112usize, 8usize, event.hit); + let _ = event; + let _ = meta; + } + #[inline] + fn on_path_secret_map_id_cache_accessed( + &self, + meta: &api::EndpointMeta, + event: &api::PathSecretMapIdCacheAccessed, + ) { + #[allow(unused_imports)] + use api::*; + self.count(113usize, 48usize, 1usize); + self.count_bool(114usize, 9usize, event.hit); + let _ = event; + let _ = meta; + } } diff --git a/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs b/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs index 07207c205..4a2ad236e 100644 --- a/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs +++ b/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs @@ -64,6 +64,8 @@ mod counter { 104usize => Self(stale_key_packet_accepted), 106usize => Self(stale_key_packet_rejected), 108usize => Self(stale_key_packet_dropped), + 110usize => Self(path_secret_map_address_cache_accessed), + 113usize => Self(path_secret_map_id_cache_accessed), _ => unreachable!("invalid info: {info:?}"), } } @@ -169,6 +171,10 @@ mod counter { fn stale_key_packet_rejected(value: u64); # [link_name = s2n_quic_dc__event__counter__stale_key_packet_dropped] fn stale_key_packet_dropped(value: u64); + # [link_name = s2n_quic_dc__event__counter__path_secret_map_address_cache_accessed] + fn path_secret_map_address_cache_accessed(value: u64); + # [link_name = s2n_quic_dc__event__counter__path_secret_map_id_cache_accessed] + fn path_secret_map_id_cache_accessed(value: u64); } ); pub mod bool { @@ -186,6 +192,8 @@ mod counter { 37usize => Self(acceptor_udp_packet_received__is_fin_known), 58usize => Self(endpoint_initialized__tcp), 59usize => Self(endpoint_initialized__udp), + 112usize => Self(path_secret_map_address_cache_accessed__hit), + 114usize => Self(path_secret_map_id_cache_accessed__hit), _ => unreachable!("invalid info: {info:?}"), } } @@ -213,6 +221,10 @@ mod counter { fn endpoint_initialized__tcp(value: bool); # [link_name = s2n_quic_dc__event__counter__bool__endpoint_initialized__udp] fn endpoint_initialized__udp(value: bool); + # [link_name = s2n_quic_dc__event__counter__bool__path_secret_map_address_cache_accessed__hit] + fn path_secret_map_address_cache_accessed__hit(value: bool); + # [link_name = s2n_quic_dc__event__counter__bool__path_secret_map_id_cache_accessed__hit] + fn path_secret_map_id_cache_accessed__hit(value: bool); } ); } @@ -251,6 +263,9 @@ mod counter { 105usize => Self(stale_key_packet_accepted__peer_address__protocol), 107usize => Self(stale_key_packet_rejected__peer_address__protocol), 109usize => Self(stale_key_packet_dropped__peer_address__protocol), + 111usize => { + Self(path_secret_map_address_cache_accessed__peer_address__protocol) + } _ => unreachable!("invalid info: {info:?}"), } } @@ -417,6 +432,12 @@ mod counter { variant: u64, variant_name: &info::Str, ); + # [link_name = s2n_quic_dc__event__counter__nominal__path_secret_map_address_cache_accessed__peer_address__protocol] + fn path_secret_map_address_cache_accessed__peer_address__protocol( + value: u64, + variant: u64, + variant_name: &info::Str, + ); } ); } diff --git a/dc/s2n-quic-dc/src/path/secret/map.rs b/dc/s2n-quic-dc/src/path/secret/map.rs index 19310e936..4ba7043e0 100644 --- a/dc/s2n-quic-dc/src/path/secret/map.rs +++ b/dc/s2n-quic-dc/src/path/secret/map.rs @@ -5,7 +5,7 @@ use crate::{ credentials::{Credentials, Id}, event, packet::{secret_control as control, Packet}, - path::secret::{open, seal, stateless_reset}, + path::secret::{open, seal, stateless_reset, HandshakeKind}, stream::TransportFeatures, }; use s2n_quic_core::{dc, time}; @@ -14,6 +14,7 @@ use std::{net::SocketAddr, sync::Arc}; mod cleaner; mod entry; mod handshake; +mod peer; mod size_of; mod state; mod status; @@ -29,6 +30,7 @@ use entry::Entry; use store::Store; pub use entry::{ApplicationPair, Bidirectional, ControlPair}; +pub use peer::Peer; pub(crate) use size_of::SizeOf; pub(crate) use status::Dedup; @@ -88,13 +90,13 @@ impl Map { self.store.contains(peer) } - pub fn seal_once( - &self, - peer: SocketAddr, - ) -> Option<(seal::Once, Credentials, dc::ApplicationParams)> { - let entry = self.store.get_by_addr(&peer)?; - let (sealer, credentials) = entry.uni_sealer(); - Some((sealer, credentials, entry.parameters())) + /// Gets the [`Peer`] entry for the given address + /// + /// NOTE: This function is used to track cache hit ratios so it + /// should only be used for connection attempts. + pub fn get_tracked(&self, peer: SocketAddr, handshake: HandshakeKind) -> Option { + let entry = self.store.get_by_addr_tracked(&peer, handshake)?; + Some(Peer::new(&entry, self)) } /// Retrieve a sealer by path secret ID. @@ -105,7 +107,7 @@ impl Map { /// Note that unlike by-IP lookup this should typically not be done significantly after the /// original secret was used for decryption. pub fn seal_once_id(&self, id: Id) -> Option<(seal::Once, Credentials, dc::ApplicationParams)> { - let entry = self.store.get_by_id(&id)?; + let entry = self.store.get_by_id_tracked(&id)?; let (sealer, credentials) = entry.uni_sealer(); Some((sealer, credentials, entry.parameters())) } @@ -120,17 +122,6 @@ impl Map { Some(opener) } - pub fn pair_for_peer( - &self, - peer: SocketAddr, - features: &TransportFeatures, - ) -> Option<(entry::Bidirectional, dc::ApplicationParams)> { - let entry = self.store.get_by_addr(&peer)?; - let keys = entry.bidi_local(features); - - Some((keys, entry.parameters())) - } - pub fn pair_for_credentials( &self, credentials: &Credentials, diff --git a/dc/s2n-quic-dc/src/path/secret/map/event_tests.rs b/dc/s2n-quic-dc/src/path/secret/map/event_tests.rs index 78031ca94..1f0f76f03 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/event_tests.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/event_tests.rs @@ -70,8 +70,8 @@ fn control_packets() { }; } - let server_entry = server.store.get_by_id(&id).unwrap().clone(); - let client_entry = client.store.get_by_id(&id).unwrap().clone(); + let server_entry = server.store.get_by_id_untracked(&id).unwrap().clone(); + let client_entry = client.store.get_by_id_untracked(&id).unwrap().clone(); let fake_secret = crate::path::secret::seal::control::Secret::new(&[0; 32], &aws_lc_rs::hmac::HMAC_SHA256); diff --git a/dc/s2n-quic-dc/src/path/secret/map/peer.rs b/dc/s2n-quic-dc/src/path/secret/map/peer.rs new file mode 100644 index 000000000..f961c4b53 --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/peer.rs @@ -0,0 +1,37 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{dc, seal, Bidirectional, Credentials, Entry, Map, TransportFeatures}; +use std::sync::Arc; + +pub struct Peer { + entry: Arc, + map: Map, +} + +impl Peer { + pub(super) fn new(entry: &Arc, map: &Map) -> Self { + Self { + entry: entry.clone(), + map: map.clone(), + } + } + + #[inline] + pub fn seal_once(&self) -> (seal::Once, Credentials, dc::ApplicationParams) { + let (sealer, credentials) = self.entry.uni_sealer(); + (sealer, credentials, self.entry.parameters()) + } + + #[inline] + pub fn pair(&self, features: &TransportFeatures) -> (Bidirectional, dc::ApplicationParams) { + let keys = self.entry.bidi_local(features); + + (keys, self.entry.parameters()) + } + + #[inline] + pub fn map(&self) -> &Map { + &self.map + } +} diff --git a/dc/s2n-quic-dc/src/path/secret/map/state.rs b/dc/s2n-quic-dc/src/path/secret/map/state.rs index 76070cc9c..92b30091a 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/state.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/state.rs @@ -8,7 +8,7 @@ use crate::{ event::{self, EndpointPublisher as _, IntoEvent as _}, fixed_map::{self, ReadGuard}, packet::{secret_control as control, Packet}, - path::secret::receiver, + path::secret::{receiver, HandshakeKind}, }; use s2n_quic_core::{ inet::SocketAddress, @@ -168,7 +168,8 @@ where }, ); - let Some(entry) = self.get_by_id(packet.credential_id()) else { + // don't track access patterns here since it's not initiated by the local application + let Some(entry) = self.get_by_id_untracked(packet.credential_id()) else { self.subscriber().on_unknown_path_secret_packet_dropped( event::builder::UnknownPathSecretPacketDropped { credential_id: packet.credential_id().into_event(), @@ -407,14 +408,48 @@ where }); } - fn get_by_addr(&self, peer: &SocketAddr) -> Option>> { - self.peers.get_by_key(peer) + fn get_by_addr_tracked( + &self, + peer: &SocketAddr, + handshake: HandshakeKind, + ) -> Option>> { + let result = self.peers.get_by_key(peer)?; + + // If this is trying to use a cached handshake but we've got a request to do a handshake, then + // force the application to do a new handshake. This is consistent with the `contains` method. + if matches!(handshake, HandshakeKind::Cached) + && self.requested_handshakes.pin().contains(peer) + { + return None; + } + + self.subscriber().on_path_secret_map_address_cache_accessed( + event::builder::PathSecretMapAddressCacheAccessed { + peer_address: SocketAddress::from(*peer).into_event(), + hit: matches!(handshake, HandshakeKind::Cached), + }, + ); + + Some(result) } - fn get_by_id(&self, id: &Id) -> Option>> { + fn get_by_id_untracked(&self, id: &Id) -> Option>> { self.ids.get_by_key(id) } + fn get_by_id_tracked(&self, id: &Id) -> Option>> { + let result = self.ids.get_by_key(id); + + self.subscriber().on_path_secret_map_id_cache_accessed( + event::builder::PathSecretMapIdCacheAccessed { + credential_id: id.into_event(), + hit: result.is_some(), + }, + ); + + result + } + fn handle_control_packet(&self, packet: &control::Packet, peer: &SocketAddr) { match packet { control::Packet::StaleKey(packet) => self.handle_stale_key(packet, &(*peer).into()), diff --git a/dc/s2n-quic-dc/src/path/secret/map/state/tests.rs b/dc/s2n-quic-dc/src/path/secret/map/state/tests.rs index 5bc4a63a0..adf36059c 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/state/tests.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/state/tests.rs @@ -149,7 +149,7 @@ impl Model { self.invariants.retain(|invariant| { if let Invariant::ContainsId(id) = invariant { if state - .get_by_id(id) + .get_by_id_untracked(id) .map_or(true, |v| v.retired_at().is_some()) { invalidated.push(*id); diff --git a/dc/s2n-quic-dc/src/path/secret/map/store.rs b/dc/s2n-quic-dc/src/path/secret/map/store.rs index 80ca4b579..d8ac133d6 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/store.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/store.rs @@ -6,7 +6,7 @@ use crate::{ credentials::{Credentials, Id}, fixed_map::ReadGuard, packet::{secret_control as control, Packet, WireVersion}, - path::secret::{receiver, stateless_reset}, + path::secret::{receiver, stateless_reset, HandshakeKind}, }; use core::time::Duration; use s2n_codec::EncoderBuffer; @@ -33,9 +33,15 @@ pub trait Store: 'static + Send + Sync { self.on_handshake_complete(entry); } - fn get_by_addr(&self, peer: &SocketAddr) -> Option>>; + fn get_by_addr_tracked( + &self, + peer: &SocketAddr, + handshake: HandshakeKind, + ) -> Option>>; + + fn get_by_id_untracked(&self, id: &Id) -> Option>>; - fn get_by_id(&self, id: &Id) -> Option>>; + fn get_by_id_tracked(&self, id: &Id) -> Option>>; fn handle_unexpected_packet(&self, packet: &Packet, peer: &SocketAddr); @@ -70,7 +76,7 @@ pub trait Store: 'static + Send + Sync { identity: &Credentials, control_out: &mut Vec, ) -> Option> { - let Some(state) = self.get_by_id(&identity.id) else { + let Some(state) = self.get_by_id_tracked(&identity.id) else { let packet = control::UnknownPathSecret { wire_version: WireVersion::ZERO, credential_id: identity.id, diff --git a/dc/s2n-quic-dc/src/stream/client/tokio.rs b/dc/s2n-quic-dc/src/stream/client/tokio.rs index a0cdc1a73..1333ed5cd 100644 --- a/dc/s2n-quic-dc/src/stream/client/tokio.rs +++ b/dc/s2n-quic-dc/src/stream/client/tokio.rs @@ -16,25 +16,17 @@ use tokio::net::TcpStream; /// Connects using the UDP transport layer #[inline] pub async fn connect_udp( - handshake_addr: SocketAddr, handshake: H, acceptor_addr: SocketAddr, env: &Environment, - map: &secret::Map, ) -> io::Result where - H: core::future::Future>, + H: core::future::Future>, { // ensure we have a secret for the peer - handshake.await?; + let peer = handshake.await?; - let stream = endpoint::open_stream( - env, - handshake_addr.into(), - env::UdpUnbound(acceptor_addr.into()), - map, - None, - )?; + let stream = endpoint::open_stream(env, peer, env::UdpUnbound(acceptor_addr.into()), None)?; // build the stream inside the application context let mut stream = stream.build_without_event()?; @@ -49,25 +41,17 @@ where /// Connects using the TCP transport layer #[inline] pub async fn connect_tcp( - handshake_addr: SocketAddr, handshake: H, acceptor_addr: SocketAddr, env: &Environment, - map: &secret::Map, ) -> io::Result where - H: core::future::Future>, + H: core::future::Future>, { // Race TCP handshake with the TLS handshake - let (socket, _) = tokio::try_join!(TcpStream::connect(acceptor_addr), handshake,)?; + let (socket, peer) = tokio::try_join!(TcpStream::connect(acceptor_addr), handshake,)?; - let stream = endpoint::open_stream( - env, - handshake_addr.into(), - env::TcpRegistered(socket), - map, - None, - )?; + let stream = endpoint::open_stream(env, peer, env::TcpRegistered(socket), None)?; // build the stream inside the application context let mut stream = stream.build_without_event()?; @@ -86,18 +70,11 @@ where /// The provided `map` must contain a shared secret for the `handshake_addr` #[inline] pub async fn connect_tcp_with( - handshake_addr: SocketAddr, + peer: secret::map::Peer, stream: TcpStream, env: &Environment, - map: &secret::Map, ) -> io::Result { - let stream = endpoint::open_stream( - env, - handshake_addr.into(), - env::TcpRegistered(stream), - map, - None, - )?; + let stream = endpoint::open_stream(env, peer, env::TcpRegistered(stream), None)?; // build the stream inside the application context let mut stream = stream.build_without_event()?; diff --git a/dc/s2n-quic-dc/src/stream/endpoint.rs b/dc/s2n-quic-dc/src/stream/endpoint.rs index cc6229aed..0933d0a54 100644 --- a/dc/s2n-quic-dc/src/stream/endpoint.rs +++ b/dc/s2n-quic-dc/src/stream/endpoint.rs @@ -3,7 +3,7 @@ use crate::{ msg, packet, - path::secret::{self, Map}, + path::secret::{self, map, Map}, random::Random, stream::{ application, @@ -15,10 +15,7 @@ use crate::{ }; use core::cell::UnsafeCell; use s2n_quic_core::{ - dc, endpoint, - inet::{ExplicitCongestionNotification, SocketAddress}, - time::Clock as _, - varint::VarInt, + dc, endpoint, inet::ExplicitCongestionNotification, time::Clock as _, varint::VarInt, }; use std::{io, sync::Arc}; use tracing::{debug_span, Instrument as _}; @@ -34,24 +31,15 @@ pub struct AcceptError { #[inline] pub fn open_stream( env: &Env, - handshake_addr: SocketAddress, + entry: map::Peer, peer: P, - map: &Map, parameter_override: Option<&dyn Fn(dc::ApplicationParams) -> dc::ApplicationParams>, ) -> Result where Env: Environment, P: Peer, { - // derive secrets for the new stream - let Some((crypto, mut parameters)) = map.pair_for_peer(handshake_addr.into(), &peer.features()) - else { - // the application didn't perform a handshake with the server before opening the stream - return Err(io::Error::new( - io::ErrorKind::NotFound, - format!("missing credentials for server: {handshake_addr}"), - )); - }; + let (crypto, mut parameters) = entry.pair(&peer.features()); if let Some(o) = parameter_override { parameters = o(parameters); @@ -70,7 +58,7 @@ where stream_id, None, crypto, - map, + entry.map(), parameters, None, None, diff --git a/dc/s2n-quic-dc/src/stream/testing.rs b/dc/s2n-quic-dc/src/stream/testing.rs index cf0f90355..c4791f5b9 100644 --- a/dc/s2n-quic-dc/src/stream/testing.rs +++ b/dc/s2n-quic-dc/src/stream/testing.rs @@ -4,7 +4,7 @@ use super::{server::tokio::stats, socket::Protocol}; use crate::{ event, - path::secret, + path::secret::{self, HandshakeKind}, stream::{ application::Stream, client::tokio as client, @@ -35,16 +35,22 @@ impl Client { pub fn handshake_with>( &self, server: &S, - ) -> io::Result { + ) -> io::Result { let server = server.as_ref(); - if self.map.contains(server.local_addr) { - Ok(secret::HandshakeKind::Cached) - } else { - let local_addr = "127.0.0.1:1337".parse().unwrap(); - self.map - .test_insert_pair(local_addr, &server.map, server.local_addr); - Ok(secret::HandshakeKind::Fresh) + let peer = server.local_addr; + if let Some(peer) = self.map.get_tracked(peer, HandshakeKind::Cached) { + return Ok(peer); } + + let local_addr = "127.0.0.1:1337".parse().unwrap(); + self.map + .test_insert_pair(local_addr, &server.map, server.local_addr); + + self.map + .get_tracked(peer, HandshakeKind::Fresh) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::AddrNotAvailable, "path secret not available") + }) } pub async fn connect_to>(&self, server: &S) -> io::Result { @@ -52,26 +58,8 @@ impl Client { let handshake = async { self.handshake_with(server) }; match server.protocol { - Protocol::Tcp => { - client::connect_tcp( - server.local_addr, - handshake, - server.local_addr, - &self.env, - &self.map, - ) - .await - } - Protocol::Udp => { - client::connect_udp( - server.local_addr, - handshake, - server.local_addr, - &self.env, - &self.map, - ) - .await - } + Protocol::Tcp => client::connect_tcp(handshake, server.local_addr, &self.env).await, + Protocol::Udp => client::connect_udp(handshake, server.local_addr, &self.env).await, Protocol::Other(name) => { todo!("protocol {name:?} not implemented") }