Skip to content

Commit

Permalink
feat(s2n-quic-dc): import 10/17/24 (#2351)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Oct 17, 2024
1 parent 4626ffe commit 6bda48e
Show file tree
Hide file tree
Showing 24 changed files with 1,885 additions and 48 deletions.
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ bytes = "1"
crossbeam-channel = "0.5"
crossbeam-epoch = "0.9"
crossbeam-queue = { version = "0.3" }
event-listener-strategy = "0.5"
flurry = "0.5"
libc = "0.2"
num-rational = { version = "0.4", default-features = false }
Expand Down
22 changes: 10 additions & 12 deletions dc/s2n-quic-dc/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,7 @@ pub use s2n_quic_core::varint::VarInt as KeyId;
pub mod testing;

#[derive(
Clone,
Copy,
Default,
PartialEq,
Eq,
Hash,
AsBytes,
FromBytes,
FromZeroes,
Unaligned,
PartialOrd,
Ord,
Clone, Copy, Default, PartialEq, Eq, AsBytes, FromBytes, FromZeroes, Unaligned, PartialOrd, Ord,
)]
#[cfg_attr(
any(test, feature = "testing"),
Expand All @@ -36,6 +25,15 @@ pub mod testing;
#[repr(C)]
pub struct Id([u8; 16]);

impl std::hash::Hash for Id {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// The ID has very high quality entropy already, so write just one half of it to keep hash
// costs as low as possible. For the main use of the Hash impl in the fixed-size ID map
// this translates to just directly using these bytes for the indexing.
state.write_u64(u64::from_ne_bytes(self.0[..8].try_into().unwrap()));
}
}

impl fmt::Debug for Id {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
format_args!("{:#01x}", u128::from_be_bytes(self.0)).fmt(f)
Expand Down
9 changes: 7 additions & 2 deletions dc/s2n-quic-dc/src/fixed_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! extent possible) reducing the likelihood.

use core::{
fmt::Debug,
hash::Hash,
sync::atomic::{AtomicU8, Ordering},
};
Expand All @@ -21,7 +22,7 @@ pub struct Map<K, V, S = RandomState> {

impl<K, V, S> Map<K, V, S>
where
K: Hash + Eq,
K: Hash + Eq + Debug,
S: BuildHasher,
{
pub fn with_capacity(entries: usize, hasher: S) -> Self {
Expand Down Expand Up @@ -108,7 +109,7 @@ struct Slot<K, V> {

impl<K, V> Slot<K, V>
where
K: Hash + Eq,
K: Hash + Eq + Debug,
{
fn new() -> Self {
Slot {
Expand Down Expand Up @@ -139,6 +140,10 @@ where
// If `new_key` isn't already in this slot, replace one of the existing entries with the
// new key. For now we rotate through based on `next_write`.
let replacement = self.next_write.fetch_add(1, Ordering::Relaxed) as usize % SLOT_CAPACITY;
tracing::trace!(
"evicting {:?} - bucket overflow",
values[replacement].as_mut().unwrap().0
);
values[replacement] = Some((new_key, new_value));
None
}
Expand Down
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod random;
pub mod recovery;
pub mod socket;
pub mod stream;
pub mod sync;
pub mod task;

#[cfg(any(test, feature = "testing"))]
Expand Down
10 changes: 10 additions & 0 deletions dc/s2n-quic-dc/src/path/secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,13 @@ pub mod stateless_reset;

pub use key::{open, seal};
pub use map::Map;

/// The handshake operation may return immediately if state for the target is already cached,
/// or perform an actual handshake if not.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum HandshakeKind {
/// Handshake was skipped because a secret was already present in the cache
Cached,
/// Handshake was performed to generate a new secret
Fresh,
}
44 changes: 38 additions & 6 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use s2n_quic_core::{
};
use std::{
fmt,
hash::{BuildHasherDefault, Hasher},
net::{Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Expand Down Expand Up @@ -51,6 +52,24 @@ pub struct Map {
pub(super) state: Arc<State>,
}

#[derive(Default)]
pub(super) struct NoopIdHasher(Option<u64>);

impl Hasher for NoopIdHasher {
fn finish(&self) -> u64 {
self.0.unwrap()
}

fn write(&mut self, _bytes: &[u8]) {
unimplemented!()
}

fn write_u64(&mut self, x: u64) {
debug_assert!(self.0.is_none());
self.0 = Some(x);
}
}

// # Managing memory consumption
//
// For regular rotation with live peers, we retain at most two secrets: one derived from the most
Expand Down Expand Up @@ -93,7 +112,7 @@ pub(super) struct State {
pub(super) requested_handshakes: flurry::HashSet<SocketAddr>,

// All known entries.
pub(super) ids: fixed_map::Map<Id, Arc<Entry>>,
pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,

pub(super) signer: stateless_reset::Signer,

Expand Down Expand Up @@ -232,7 +251,7 @@ impl State {
}

impl Map {
pub fn new(signer: stateless_reset::Signer) -> Self {
pub fn new(signer: stateless_reset::Signer, capacity: usize) -> Self {
// FIXME: Avoid unwrap and the whole socket.
//
// We only ever send on this socket - but we really should be sending on the same
Expand All @@ -244,11 +263,11 @@ impl Map {
control_socket.set_nonblocking(true).unwrap();
let state = State {
// This is around 500MB with current entry size.
max_capacity: 500_000,
max_capacity: capacity,
// FIXME: Allow configuring the rehandshake_period.
rehandshake_period: Duration::from_secs(3600 * 24),
peers: fixed_map::Map::with_capacity(500_000, Default::default()),
ids: fixed_map::Map::with_capacity(500_000, Default::default()),
peers: fixed_map::Map::with_capacity(capacity, Default::default()),
ids: fixed_map::Map::with_capacity(capacity, Default::default()),
requested_handshakes: Default::default(),
cleaner: Cleaner::new(),
signer,
Expand Down Expand Up @@ -301,6 +320,19 @@ impl Map {
Some((sealer, credentials, state.parameters.clone()))
}

/// Retrieve a sealer by path secret ID.
///
/// Generally callers should prefer to use one of the `pair` APIs; this is primarily useful for
/// "response" datagrams which want to be bound to the exact same shared secret.
///
/// Note that unlike by-IP lookup this should typically not be done significantly after the
/// original secret was used for decryption.
pub fn seal_once_id(&self, id: Id) -> Option<(seal::Once, Credentials, ApplicationParams)> {
let state = self.state.ids.get_by_key(&id)?;
let (sealer, credentials) = state.uni_sealer();
Some((sealer, credentials, state.parameters.clone()))
}

pub fn open_once(
&self,
credentials: &Credentials,
Expand Down Expand Up @@ -485,7 +517,7 @@ impl Map {
pub fn for_test_with_peers(
peers: Vec<(schedule::Ciphersuite, dc::Version, SocketAddr)>,
) -> (Self, Vec<Id>) {
let provider = Self::new(stateless_reset::Signer::random());
let provider = Self::new(stateless_reset::Signer::random(), peers.len() * 3);
let mut secret = [0; 32];
aws_lc_rs::rand::fill(&mut secret).unwrap();
let mut stateless_reset = [0; control::TAG_LEN];
Expand Down
19 changes: 12 additions & 7 deletions dc/s2n-quic-dc/src/path/secret/map/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn fake_entry(peer: u16) -> Arc<Entry> {
#[test]
fn cleans_after_delay() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 50);

// Stop background processing. We expect to manually invoke clean, and a background worker
// might interfere with our state.
Expand Down Expand Up @@ -60,7 +60,7 @@ fn cleans_after_delay() {
#[test]
fn thread_shutdown() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 10);
let state = Arc::downgrade(&map.state);
drop(map);

Expand Down Expand Up @@ -263,7 +263,7 @@ fn check_invariants() {

let mut model = Model::default();
let signer = stateless_reset::Signer::new(b"secret");
let mut map = Map::new(signer);
let mut map = Map::new(signer, 10_000);

// Avoid background work interfering with testing.
map.state.cleaner.stop();
Expand Down Expand Up @@ -293,7 +293,7 @@ fn check_invariants_no_overflow() {

let mut model = Model::default();
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 10_000);

// Avoid background work interfering with testing.
map.state.cleaner.stop();
Expand All @@ -316,7 +316,7 @@ fn check_invariants_no_overflow() {
#[ignore = "memory growth takes a long time to run"]
fn no_memory_growth() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 100_000);
map.state.cleaner.stop();
for idx in 0..500_000 {
// FIXME: this ends up 2**16 peers in the `peers` map
Expand All @@ -325,10 +325,15 @@ fn no_memory_growth() {
}

#[test]
#[cfg(all(target_pointer_width = "64", target_os = "linux"))]
fn entry_size() {
let mut should_check = true;

should_check &= cfg!(target_pointer_width = "64");
should_check &= cfg!(target_os = "linux");
should_check &= std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok();

// This gates to running only on specific GHA to reduce false positives.
if std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok() {
if should_check {
assert_eq!(fake_entry(0).size(), 238);
}
}
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub const DEFAULT_INFLIGHT_TIMEOUT: Duration = Duration::from_secs(5);
pub const MAX_DATAGRAM_SIZE: usize = 1 << 15; // 32k

pub mod application;
pub mod client;
pub mod crypto;
pub mod endpoint;
pub mod environment;
Expand Down
4 changes: 4 additions & 0 deletions dc/s2n-quic-dc/src/stream/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod tokio;
121 changes: 121 additions & 0 deletions dc/s2n-quic-dc/src/stream/client/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
path::secret,
stream::{
application::Stream,
endpoint,
environment::tokio::{self as env, Environment},
socket::Protocol,
},
};
use std::{io, net::SocketAddr};
use tokio::net::TcpStream;

/// Connects using the UDP transport layer
#[inline]
pub async fn connect_udp<H>(
handshake_addr: SocketAddr,
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream>
where
H: core::future::Future<Output = io::Result<secret::HandshakeKind>>,
{
// ensure we have a secret for the peer
handshake.await?;

let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::UdpUnbound(acceptor_addr.into()),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Udp);

write_prelude(&mut stream).await?;

Ok(stream)
}

/// Connects using the TCP transport layer
#[inline]
pub async fn connect_tcp<H>(
handshake_addr: SocketAddr,
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream>
where
H: core::future::Future<Output = io::Result<secret::HandshakeKind>>,
{
// Race TCP handshake with the TLS handshake
let (socket, _) = tokio::try_join!(TcpStream::connect(acceptor_addr), handshake,)?;

let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::TcpRegistered(socket),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

write_prelude(&mut stream).await?;

Ok(stream)
}

/// Connects with a pre-existing TCP stream
///
/// # Note
///
/// The provided `map` must contain a shared secret for the `handshake_addr`
#[inline]
pub async fn connect_tcp_with(
handshake_addr: SocketAddr,
stream: TcpStream,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream> {
let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::TcpRegistered(stream),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

write_prelude(&mut stream).await?;

Ok(stream)
}

#[inline]
async fn write_prelude(stream: &mut Stream) -> io::Result<()> {
// TODO should we actually write the prelude here or should we do late sealer binding on
// the first packet to reduce secret reordering on the peer

stream
.write_from(&mut s2n_quic_core::buffer::reader::storage::Empty)
.await
.map(|_| ())
}
Loading

0 comments on commit 6bda48e

Please sign in to comment.