diff --git a/dc/s2n-quic-dc/src/path/secret/map.rs b/dc/s2n-quic-dc/src/path/secret/map.rs index 02fdfb7fd1..11c4202e64 100644 --- a/dc/s2n-quic-dc/src/path/secret/map.rs +++ b/dc/s2n-quic-dc/src/path/secret/map.rs @@ -25,7 +25,7 @@ use std::{ atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, }, - time::Duration, + time::{Duration, Instant}, }; use zeroize::Zeroizing; @@ -71,6 +71,8 @@ pub(super) struct State { // This is in number of entries. max_capacity: usize, + rehandshake_period: Duration, + // peers is the most recent entry originating from a locally *or* remote initiated handshake. // // Handshakes use s2n-quic and the SocketAddr is the address of the handshake socket. Since @@ -83,13 +85,6 @@ pub(super) struct State { // needed. pub(super) peers: flurry::HashMap>, - // This is used for deduplicating outgoing handshakes. We manage this here as it's a - // property required for correctness (see comment on the struct). - // - // FIXME: make use of this. - #[allow(unused)] - pub(super) ongoing_handshakes: flurry::HashMap, - // Stores the set of SocketAddr for which we received a UnknownPathSecret packet. // When handshake_with is called we will allow a new handshake if this contains a socket, this // is a temporary solution until we implement proper background handshaking. @@ -174,6 +169,8 @@ impl Cleaner { fn clean(&self, state: &State, eviction_cycles: u64) { let current_epoch = self.epoch.fetch_add(1, Ordering::Relaxed); + let now = Instant::now(); + // FIXME: Rather than just tracking one minimum, we might want to try to do some counting // as we iterate to have a higher likelihood of identifying 1% of peers falling into the // epoch we pick. Exactly how to do that without collecting a ~full distribution by epoch @@ -192,6 +189,13 @@ impl Cleaner { // Find the minimum non-retired epoch currently in the set. minimum = std::cmp::min(entry.used_at.load(Ordering::Relaxed), minimum); + // For non-retired entries, if it's time for them to handshake again, request a + // handshake to happen. This handshake will happen on the next request for this + // particular peer. + if entry.rehandshake_time() <= now { + state.requested_handshakes.pin().insert(entry.peer); + } + // Not retired. continue; } @@ -243,8 +247,9 @@ impl Map { let state = State { // This is around 500MB with current entry size. max_capacity: 500_000, + // FIXME: Allow configuring the rehandshake_period. + rehandshake_period: Duration::from_secs(3600 * 24), peers: Default::default(), - ongoing_handshakes: Default::default(), requested_handshakes: Default::default(), ids: Default::default(), cleaner: Cleaner::new(), @@ -498,6 +503,7 @@ impl Map { sender, receiver_shared.clone().new_receiver(), dc::testing::TEST_APPLICATION_PARAMS, + dc::testing::TEST_REHANDSHAKE_PERIOD, ); let entry = Arc::new(entry); provider.insert(entry); @@ -525,6 +531,7 @@ impl Map { sender, receiver, dc::testing::TEST_APPLICATION_PARAMS, + dc::testing::TEST_REHANDSHAKE_PERIOD, ); self.insert(Arc::new(entry)); } @@ -584,6 +591,8 @@ impl receiver::Error { #[derive(Debug)] pub(super) struct Entry { + creation_time: Instant, + rehandshake_delta_secs: u32, peer: SocketAddr, secret: schedule::Secret, retired: IsRetired, @@ -622,8 +631,15 @@ impl Entry { sender: sender::State, receiver: receiver::State, parameters: ApplicationParams, + rehandshake_time: Duration, ) -> Self { + assert!(rehandshake_time.as_secs() <= u32::MAX as u64); Self { + creation_time: Instant::now(), + // Schedule another handshake sometime in [5 minutes, rehandshake_time] from now. + rehandshake_delta_secs: rand::thread_rng().gen_range( + std::cmp::min(rehandshake_time.as_secs(), 360)..rehandshake_time.as_secs(), + ) as u32, peer, secret, retired: Default::default(), @@ -682,6 +698,10 @@ impl Entry { (sealer, opener) } + + fn rehandshake_time(&self) -> Instant { + self.creation_time + Duration::from_secs(u64::from(self.rehandshake_delta_secs)) + } } pub struct Dedup { @@ -871,6 +891,7 @@ impl dc::Path for HandshakingPath { sender, receiver, self.parameters, + self.map.state.rehandshake_period, ); let entry = Arc::new(entry); self.map.insert(entry); diff --git a/dc/s2n-quic-dc/src/path/secret/map/test.rs b/dc/s2n-quic-dc/src/path/secret/map/test.rs index b0a2e7c1cd..0b0e1c0270 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/test.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/test.rs @@ -25,6 +25,7 @@ fn fake_entry(peer: u16) -> Arc { sender::State::new([0; 16]), receiver::State::without_shared(), dc::testing::TEST_APPLICATION_PARAMS, + dc::testing::TEST_REHANDSHAKE_PERIOD, )) } @@ -138,6 +139,7 @@ impl Model { sender::State::new(stateless_reset), state.state.receiver_shared.clone().new_receiver(), dc::testing::TEST_APPLICATION_PARAMS, + dc::testing::TEST_REHANDSHAKE_PERIOD, ))); self.invariants.insert(Invariant::ContainsIp(ip)); diff --git a/quic/s2n-quic-core/src/dc/testing.rs b/quic/s2n-quic-core/src/dc/testing.rs index bbbb8bf0d0..e1ccd1abde 100644 --- a/quic/s2n-quic-core/src/dc/testing.rs +++ b/quic/s2n-quic-core/src/dc/testing.rs @@ -76,3 +76,5 @@ pub const TEST_APPLICATION_PARAMS: ApplicationParams = ApplicationParams { max_idle_timeout: Some(Duration::from_secs(30)), max_ack_delay: Duration::from_millis(25), }; + +pub const TEST_REHANDSHAKE_PERIOD: Duration = Duration::from_secs(3600 * 12);