Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-dc): import 10/17/24 #2351

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ bytes = "1"
crossbeam-channel = "0.5"
crossbeam-epoch = "0.9"
crossbeam-queue = { version = "0.3" }
event-listener-strategy = "0.5"
flurry = "0.5"
libc = "0.2"
num-rational = { version = "0.4", default-features = false }
Expand Down
22 changes: 10 additions & 12 deletions dc/s2n-quic-dc/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,7 @@ pub use s2n_quic_core::varint::VarInt as KeyId;
pub mod testing;

#[derive(
Clone,
Copy,
Default,
PartialEq,
Eq,
Hash,
AsBytes,
FromBytes,
FromZeroes,
Unaligned,
PartialOrd,
Ord,
Clone, Copy, Default, PartialEq, Eq, AsBytes, FromBytes, FromZeroes, Unaligned, PartialOrd, Ord,
)]
#[cfg_attr(
any(test, feature = "testing"),
Expand All @@ -36,6 +25,15 @@ pub mod testing;
#[repr(C)]
pub struct Id([u8; 16]);

impl std::hash::Hash for Id {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// The ID has very high quality entropy already, so write just one half of it to keep hash
// costs as low as possible. For the main use of the Hash impl in the fixed-size ID map
// this translates to just directly using these bytes for the indexing.
state.write_u64(u64::from_ne_bytes(self.0[..8].try_into().unwrap()));
}
}

impl fmt::Debug for Id {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
format_args!("{:#01x}", u128::from_be_bytes(self.0)).fmt(f)
Expand Down
9 changes: 7 additions & 2 deletions dc/s2n-quic-dc/src/fixed_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! extent possible) reducing the likelihood.

use core::{
fmt::Debug,
hash::Hash,
sync::atomic::{AtomicU8, Ordering},
};
Expand All @@ -21,7 +22,7 @@ pub struct Map<K, V, S = RandomState> {

impl<K, V, S> Map<K, V, S>
where
K: Hash + Eq,
K: Hash + Eq + Debug,
S: BuildHasher,
{
pub fn with_capacity(entries: usize, hasher: S) -> Self {
Expand Down Expand Up @@ -108,7 +109,7 @@ struct Slot<K, V> {

impl<K, V> Slot<K, V>
where
K: Hash + Eq,
K: Hash + Eq + Debug,
{
fn new() -> Self {
Slot {
Expand Down Expand Up @@ -139,6 +140,10 @@ where
// If `new_key` isn't already in this slot, replace one of the existing entries with the
// new key. For now we rotate through based on `next_write`.
let replacement = self.next_write.fetch_add(1, Ordering::Relaxed) as usize % SLOT_CAPACITY;
tracing::trace!(
"evicting {:?} - bucket overflow",
values[replacement].as_mut().unwrap().0
);
values[replacement] = Some((new_key, new_value));
None
}
Expand Down
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod random;
pub mod recovery;
pub mod socket;
pub mod stream;
pub mod sync;
pub mod task;

#[cfg(any(test, feature = "testing"))]
Expand Down
10 changes: 10 additions & 0 deletions dc/s2n-quic-dc/src/path/secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,13 @@ pub mod stateless_reset;

pub use key::{open, seal};
pub use map::Map;

/// The handshake operation may return immediately if state for the target is already cached,
/// or perform an actual handshake if not.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum HandshakeKind {
/// Handshake was skipped because a secret was already present in the cache
Cached,
/// Handshake was performed to generate a new secret
Fresh,
}
44 changes: 38 additions & 6 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use s2n_quic_core::{
};
use std::{
fmt,
hash::{BuildHasherDefault, Hasher},
net::{Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Expand Down Expand Up @@ -51,6 +52,24 @@ pub struct Map {
pub(super) state: Arc<State>,
}

#[derive(Default)]
pub(super) struct NoopIdHasher(Option<u64>);

impl Hasher for NoopIdHasher {
fn finish(&self) -> u64 {
self.0.unwrap()
}

fn write(&mut self, _bytes: &[u8]) {
unimplemented!()
}

fn write_u64(&mut self, x: u64) {
debug_assert!(self.0.is_none());
self.0 = Some(x);
}
}

// # Managing memory consumption
//
// For regular rotation with live peers, we retain at most two secrets: one derived from the most
Expand Down Expand Up @@ -93,7 +112,7 @@ pub(super) struct State {
pub(super) requested_handshakes: flurry::HashSet<SocketAddr>,

// All known entries.
pub(super) ids: fixed_map::Map<Id, Arc<Entry>>,
pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,

pub(super) signer: stateless_reset::Signer,

Expand Down Expand Up @@ -232,7 +251,7 @@ impl State {
}

impl Map {
pub fn new(signer: stateless_reset::Signer) -> Self {
pub fn new(signer: stateless_reset::Signer, capacity: usize) -> Self {
// FIXME: Avoid unwrap and the whole socket.
//
// We only ever send on this socket - but we really should be sending on the same
Expand All @@ -244,11 +263,11 @@ impl Map {
control_socket.set_nonblocking(true).unwrap();
let state = State {
// This is around 500MB with current entry size.
max_capacity: 500_000,
max_capacity: capacity,
// FIXME: Allow configuring the rehandshake_period.
rehandshake_period: Duration::from_secs(3600 * 24),
peers: fixed_map::Map::with_capacity(500_000, Default::default()),
ids: fixed_map::Map::with_capacity(500_000, Default::default()),
peers: fixed_map::Map::with_capacity(capacity, Default::default()),
ids: fixed_map::Map::with_capacity(capacity, Default::default()),
requested_handshakes: Default::default(),
cleaner: Cleaner::new(),
signer,
Expand Down Expand Up @@ -301,6 +320,19 @@ impl Map {
Some((sealer, credentials, state.parameters.clone()))
}

/// Retrieve a sealer by path secret ID.
///
/// Generally callers should prefer to use one of the `pair` APIs; this is primarily useful for
/// "response" datagrams which want to be bound to the exact same shared secret.
///
/// Note that unlike by-IP lookup this should typically not be done significantly after the
/// original secret was used for decryption.
pub fn seal_once_id(&self, id: Id) -> Option<(seal::Once, Credentials, ApplicationParams)> {
let state = self.state.ids.get_by_key(&id)?;
let (sealer, credentials) = state.uni_sealer();
Some((sealer, credentials, state.parameters.clone()))
}

pub fn open_once(
&self,
credentials: &Credentials,
Expand Down Expand Up @@ -485,7 +517,7 @@ impl Map {
pub fn for_test_with_peers(
peers: Vec<(schedule::Ciphersuite, dc::Version, SocketAddr)>,
) -> (Self, Vec<Id>) {
let provider = Self::new(stateless_reset::Signer::random());
let provider = Self::new(stateless_reset::Signer::random(), peers.len() * 3);
let mut secret = [0; 32];
aws_lc_rs::rand::fill(&mut secret).unwrap();
let mut stateless_reset = [0; control::TAG_LEN];
Expand Down
19 changes: 12 additions & 7 deletions dc/s2n-quic-dc/src/path/secret/map/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn fake_entry(peer: u16) -> Arc<Entry> {
#[test]
fn cleans_after_delay() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 50);

// Stop background processing. We expect to manually invoke clean, and a background worker
// might interfere with our state.
Expand Down Expand Up @@ -60,7 +60,7 @@ fn cleans_after_delay() {
#[test]
fn thread_shutdown() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 10);
let state = Arc::downgrade(&map.state);
drop(map);

Expand Down Expand Up @@ -263,7 +263,7 @@ fn check_invariants() {

let mut model = Model::default();
let signer = stateless_reset::Signer::new(b"secret");
let mut map = Map::new(signer);
let mut map = Map::new(signer, 10_000);

// Avoid background work interfering with testing.
map.state.cleaner.stop();
Expand Down Expand Up @@ -293,7 +293,7 @@ fn check_invariants_no_overflow() {

let mut model = Model::default();
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 10_000);

// Avoid background work interfering with testing.
map.state.cleaner.stop();
Expand All @@ -316,7 +316,7 @@ fn check_invariants_no_overflow() {
#[ignore = "memory growth takes a long time to run"]
fn no_memory_growth() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 100_000);
map.state.cleaner.stop();
for idx in 0..500_000 {
// FIXME: this ends up 2**16 peers in the `peers` map
Expand All @@ -325,10 +325,15 @@ fn no_memory_growth() {
}

#[test]
#[cfg(all(target_pointer_width = "64", target_os = "linux"))]
fn entry_size() {
let mut should_check = true;

should_check &= cfg!(target_pointer_width = "64");
should_check &= cfg!(target_os = "linux");
should_check &= std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok();

// This gates to running only on specific GHA to reduce false positives.
if std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok() {
if should_check {
assert_eq!(fake_entry(0).size(), 238);
}
}
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub const DEFAULT_INFLIGHT_TIMEOUT: Duration = Duration::from_secs(5);
pub const MAX_DATAGRAM_SIZE: usize = 1 << 15; // 32k

pub mod application;
pub mod client;
pub mod crypto;
pub mod endpoint;
pub mod environment;
Expand Down
4 changes: 4 additions & 0 deletions dc/s2n-quic-dc/src/stream/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod tokio;
121 changes: 121 additions & 0 deletions dc/s2n-quic-dc/src/stream/client/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
path::secret,
stream::{
application::Stream,
endpoint,
environment::tokio::{self as env, Environment},
socket::Protocol,
},
};
use std::{io, net::SocketAddr};
use tokio::net::TcpStream;

/// Connects using the UDP transport layer
#[inline]
pub async fn connect_udp<H>(
handshake_addr: SocketAddr,
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream>
where
H: core::future::Future<Output = io::Result<secret::HandshakeKind>>,
{
// ensure we have a secret for the peer
handshake.await?;

let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::UdpUnbound(acceptor_addr.into()),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Udp);

write_prelude(&mut stream).await?;

Ok(stream)
}

/// Connects using the TCP transport layer
#[inline]
pub async fn connect_tcp<H>(
handshake_addr: SocketAddr,
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream>
where
H: core::future::Future<Output = io::Result<secret::HandshakeKind>>,
{
// Race TCP handshake with the TLS handshake
let (socket, _) = tokio::try_join!(TcpStream::connect(acceptor_addr), handshake,)?;

let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::TcpRegistered(socket),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

write_prelude(&mut stream).await?;

Ok(stream)
}

/// Connects with a pre-existing TCP stream
///
/// # Note
///
/// The provided `map` must contain a shared secret for the `handshake_addr`
#[inline]
pub async fn connect_tcp_with(
handshake_addr: SocketAddr,
stream: TcpStream,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream> {
let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::TcpRegistered(stream),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

write_prelude(&mut stream).await?;

Ok(stream)
}

#[inline]
async fn write_prelude(stream: &mut Stream) -> io::Result<()> {
// TODO should we actually write the prelude here or should we do late sealer binding on
// the first packet to reduce secret reordering on the peer

stream
.write_from(&mut s2n_quic_core::buffer::reader::storage::Empty)
.await
.map(|_| ())
}
Loading
Loading