Skip to content

Commit

Permalink
feat(s2n-quic-transport): Support de-duplicating concurrent connectio…
Browse files Browse the repository at this point in the history
…ns to the same peer (#2272)

* Track multiple open interests for the same connection

* Implement de-duplicating client connection open requests

Open connections can now be found when calling connect(), including
in-flight connections. Currently this property is applied to *all*
connections, the next commit will add support for configuring whether
the application is interested in this support.

* Add note to optimistic ack test

(No longer actually changed by previous commits, but seems worth the
comment regardless to save future time).
  • Loading branch information
Mark-Simulacrum authored Jul 17, 2024
1 parent b3eb094 commit d55d258
Show file tree
Hide file tree
Showing 14 changed files with 373 additions and 15 deletions.
2 changes: 1 addition & 1 deletion quic/s2n-quic-core/src/application/server_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use bytes::Bytes;
/// - It can be converted into [`Bytes`] which supports zero-copy slicing and
/// reference counting.
/// - It can be accessed as `&str` so that applications can reason about the string value.
#[derive(Clone, PartialEq)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ServerName(Bytes);

/// A static value for localhost
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ exclude = ["corpus.tar.gz"]
default = ["std"]
std = ["futures-channel/std"]
unstable_resumption = []
unstable-provider-dc = []

[dependencies]
bytes = { version = "1", default-features = false }
Expand Down
79 changes: 69 additions & 10 deletions quic/s2n-quic-transport/src/connection/connection_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use s2n_quic_core::{
time::Timestamp,
transport,
};
use smallvec::SmallVec;

// Intrusive list adapter for managing the list of `done` connections
intrusive_adapter!(DoneConnectionsAdapter<C, L> = Arc<ConnectionNode<C, L>>: ConnectionNode<C, L> {
Expand Down Expand Up @@ -367,7 +368,11 @@ struct InterestLists<C: connection::Trait, L: connection::Lock<C>> {
waiting_for_connection_id: LinkedList<WaitingForConnectionIdAdapter<C, L>>,
/// Connections which are waiting for a timeout to occur
waiting_for_timeout: RBTree<WaitingForTimeoutAdapter<C, L>>,
waiting_for_open: BTreeMap<InternalConnectionId, ConnectionSender>,
/// Connections which are waiting for a handshake to complete.
///
/// The senders are a vector to allow multiple tasks to register interest in the same
/// connection being opened.
waiting_for_open: BTreeMap<InternalConnectionId, SmallVec<[ConnectionSender; 1]>>,
/// Inflight handshake count
handshake_connections: usize,
/// Total connection count
Expand Down Expand Up @@ -509,10 +514,24 @@ impl<C: connection::Trait, L: connection::Lock<C>> InterestLists<C, L> {
}
}
endpoint::Type::Client => {
if let Some(sender) = self.waiting_for_open.remove(&id) {
if let Err(Ok(handle)) = sender.send(Ok(handle)) {
// close the connection if the application is no longer waiting for the handshake
handle.api.close_connection(None);
if let Some(mut senders) = self.waiting_for_open.remove(&id) {
let mut any_interest = false;
let last = senders.pop();
for sender in senders {
if let Err(Ok(_handle)) = sender.send(Ok(handle.clone())) {
// This particular handle is not interested anymore, but maybe one
// of the others will be.
} else {
any_interest = true;
}
}
if let Some(sender) = last {
if let Err(Ok(handle)) = sender.send(Ok(handle)) {
if !any_interest {
// close the connection if the application is no longer waiting for the handshake
handle.api.close_connection(None);
}
}
}
} else {
debug_assert!(false, "client connection tried to open more than once");
Expand All @@ -524,7 +543,7 @@ impl<C: connection::Trait, L: connection::Lock<C>> InterestLists<C, L> {
if interests.finalization != node.done_connections_link.is_linked() {
if interests.finalization {
if <C::Config as endpoint::Config>::ENDPOINT_TYPE.is_client() {
if let Some(sender) = self.waiting_for_open.remove(&id) {
if let Some(senders) = self.waiting_for_open.remove(&id) {
let err = node.inner.read(|conn| conn.error());
let err = match err {
Ok(Some(err)) => {
Expand All @@ -542,7 +561,9 @@ impl<C: connection::Trait, L: connection::Lock<C>> InterestLists<C, L> {
.into()
}
};
let _ = sender.send(Err(err));
for sender in senders {
let _ = sender.send(Err(err));
}
}
}

Expand Down Expand Up @@ -770,13 +791,34 @@ impl<C: connection::Trait, L: connection::Lock<C>> ConnectionContainer<C, L> {
) {
debug_assert!(<C::Config as endpoint::Config>::ENDPOINT_TYPE.is_client());

self.interest_lists
.waiting_for_open
.insert(internal_connection_id, connection_sender);
self.interest_lists.waiting_for_open.insert(
internal_connection_id,
smallvec::smallvec![connection_sender],
);

self.insert_connection(connection, internal_connection_id)
}

/// Potentially register a sender with an existing client Connection
pub fn register_sender_for_client_connection(
&mut self,
internal_connection_id: &InternalConnectionId,
connection_sender: ConnectionSender,
) -> Result<(), ConnectionSender> {
debug_assert!(<C::Config as endpoint::Config>::ENDPOINT_TYPE.is_client());

if let Some(list) = self
.interest_lists
.waiting_for_open
.get_mut(internal_connection_id)
{
list.push(connection_sender);
Ok(())
} else {
Err(connection_sender)
}
}

pub(crate) fn poll_connection_request(
&mut self,
cx: &mut Context,
Expand Down Expand Up @@ -822,6 +864,23 @@ impl<C: connection::Trait, L: connection::Lock<C>> ConnectionContainer<C, L> {
self.interest_lists.connection_count
}

pub fn get_connection_handle(
&mut self,
id: &InternalConnectionId,
) -> Option<crate::connection::api::Connection> {
let cursor = self.connection_map.find(id);
let node = cursor.get()?;
let handle = unsafe {
// We have to obtain an `Arc<ConnectionNode>` in order to be able to
// perform interest updates later on. However the intrusive tree
// API only provides us a raw reference.
// Safety: We know that all of our ConnectionNode's are stored in
// reference counted pointers.
node.arc_from_ref()
};
Some(crate::connection::api::Connection::new(handle))
}

/// Looks up the `Connection` with the given ID and executes the provided function
/// on it.
///
Expand Down
65 changes: 65 additions & 0 deletions quic/s2n-quic-transport/src/connection/connection_id_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,23 @@ impl InitialIdMap {
}
}

/// Bidirectional map for mapping from initial ID to internal connection ID and vice-versa
#[derive(Debug)]
pub(crate) struct OpenRequestMap {
/// Maps from initial id to internal connection ID
// No need for custom hashing since keys are locally controlled, not by remote.
open_request_map: HashMap<crate::endpoint::connect::Connect, InternalConnectionId>,
}

impl OpenRequestMap {
/// Constructs a new `InitialIdMap`
fn new() -> Self {
Self {
open_request_map: HashMap::new(),
}
}
}

#[derive(Debug)]
pub(crate) struct ConnectionIdMapperState {
/// Maps from external to internal connection IDs
Expand All @@ -194,6 +211,10 @@ pub(crate) struct ConnectionIdMapperState {
pub(crate) stateless_reset_map: StatelessResetMap,
/// Maps from initial id to internal connection IDs
pub(crate) initial_id_map: InitialIdMap,
/// Maps from connection open request to internal connection IDs
/// This is used for looking up a connection handle if one is already open,
/// rather than opening a new one each time.
pub(crate) open_request_map: OpenRequestMap,
}

impl ConnectionIdMapperState {
Expand All @@ -205,6 +226,7 @@ impl ConnectionIdMapperState {
HashState::new(random_generator),
HashState::new(random_generator),
),
open_request_map: OpenRequestMap::new(),
}
}
}
Expand Down Expand Up @@ -368,6 +390,49 @@ impl ConnectionIdMapper {
rotate_handshake_connection_id,
)
}

/// Returns the internal connection ID corresponding to the connect request, if there is a
/// pending or already open connection for that ID.
///
/// If no such connection exists, associates the connect request with the provided internal ID,
/// which is returned in future requests.
pub(crate) fn lazy_open(
&self,
new_connection_internal_id: InternalConnectionId,
connect: crate::endpoint::connect::Connect,
) -> Result<InternalConnectionId, OpenRegistry> {
let mut guard = self.state.lock().unwrap();
match guard.open_request_map.open_request_map.entry(connect) {
Entry::Occupied(e) => Ok(*e.get()),
Entry::Vacant(e) => {
let connect = e.key().clone();
e.insert(new_connection_internal_id);
Err(OpenRegistry {
state: self.state.clone(),
connect,
})
}
}
}
}

#[derive(Debug)]
pub struct OpenRegistry {
/// The shared state between mapper and registration
state: Arc<Mutex<ConnectionIdMapperState>>,
connect: crate::endpoint::connect::Connect,
}

impl Drop for OpenRegistry {
fn drop(&mut self) {
if let Ok(mut guard) = self.state.lock() {
// Stop tracking this open connection.
guard
.open_request_map
.open_request_map
.remove(&self.connect);
}
}
}

#[cfg(test)]
Expand Down
10 changes: 10 additions & 0 deletions quic/s2n-quic-transport/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ impl From<connection::Error> for ConnectionState {
pub struct ConnectionImpl<Config: endpoint::Config> {
/// The local ID registry which should be utilized by the connection
local_id_registry: connection::LocalIdRegistry,
/// The open registry (only used for its Drop impl)
open_registry: Option<connection::OpenRegistry>,
/// The timers which are used within the connection
timers: ConnectionTimers,
/// Describes whether the connection is known to be accepted by the application
Expand Down Expand Up @@ -555,6 +557,7 @@ impl<Config: endpoint::Config> ConnectionImpl<Config> {
};

if is_finished {
self.open_registry = None;
self.error = Err(transport::Error::NO_ERROR.into());
return Poll::Ready(());
}
Expand Down Expand Up @@ -629,6 +632,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
let waker = Waker::from(wakeup_handle.clone());
let mut connection = Self {
local_id_registry: parameters.local_id_registry,
open_registry: parameters.open_registry,
timers: Default::default(),
accept_state: AcceptState::Handshaking,
state: ConnectionState::Handshaking,
Expand Down Expand Up @@ -728,6 +732,9 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
self.state = error.into();
self.error = Err(error);

// Disable access to the connection from concurrent open attempts.
self.open_registry = None;

//= https://www.rfc-editor.org/rfc/rfc9000#section-10.3
//# An endpoint that wishes to communicate a fatal
//# connection error MUST use a CONNECTION_CLOSE frame if it is able.
Expand Down Expand Up @@ -1907,6 +1914,9 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
return;
}

// Disable access to the connection from concurrent open attempts.
self.open_registry = None;

if let Some(error) = error {
self.error = Err(connection::Error::application(error));
} else {
Expand Down
5 changes: 4 additions & 1 deletion quic/s2n-quic-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) mod transmission;

pub(crate) use api_provider::{ConnectionApi, ConnectionApiProvider};
pub(crate) use connection_container::{ConnectionContainer, ConnectionContainerIterationResult};
pub(crate) use connection_id_mapper::ConnectionIdMapper;
pub(crate) use connection_id_mapper::{ConnectionIdMapper, OpenRegistry};
pub(crate) use connection_interests::ConnectionInterests;
pub(crate) use connection_timers::ConnectionTimers;
pub(crate) use connection_trait::ConnectionTrait as Trait;
Expand All @@ -52,6 +52,9 @@ pub struct Parameters<'a, Cfg: endpoint::Config> {
pub local_id_registry: LocalIdRegistry,
/// The peer ID registry which should be utilized by the connection
pub peer_id_registry: PeerIdRegistry,
/// The open connections registry which should be utilized by the connection
/// None for accepted/inbound connections.
pub open_registry: Option<OpenRegistry>,
/// The last utilized remote Connection ID
pub peer_connection_id: PeerId,
/// The last utilized local Connection ID
Expand Down
20 changes: 19 additions & 1 deletion quic/s2n-quic-transport/src/endpoint/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ pub(crate) type ConnectionReceiver = oneshot::Receiver<Result<Connection, connec
/// its been created.
pub(crate) type ConnectionSender = oneshot::Sender<Result<Connection, connection::Error>>;

#[derive(Clone, Debug)]
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Connect {
pub(crate) remote_address: RemoteAddress,
pub(crate) server_name: Option<ServerName>,
pub(crate) deduplicate: bool,
}

impl fmt::Display for Connect {
Expand All @@ -49,6 +50,7 @@ impl Connect {
Self {
remote_address: addr.into().into(),
server_name: None,
deduplicate: false,
}
}

Expand All @@ -60,6 +62,22 @@ impl Connect {
..self
}
}

/// Specifies whether to deduplicate this connect request with other concurrent connect
/// requests and with any existing open connections.
///
/// Only a connection opened with `deduplicate: true` can be later found by a subsequent
/// request.
///
/// Note that this is only supported with the `dc` provider enabled on the s2n-quic endpoint.
#[must_use]
#[cfg(feature = "unstable-provider-dc")]
pub fn with_deduplicate(self, deduplicate: bool) -> Self {
Self {
deduplicate,
..self
}
}
}

/// Make it easy for applications to create a connection attempt without importing the `Connect` struct
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-transport/src/endpoint/initial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ impl<Config: endpoint::Config> endpoint::Endpoint<Config> {
event_subscriber: endpoint_context.event_subscriber,
datagram_endpoint: endpoint_context.datagram,
dc_endpoint: endpoint_context.dc,
open_registry: None,
};

let mut connection = <Config as endpoint::Config>::Connection::new(connection_parameters)?;
Expand Down
Loading

0 comments on commit d55d258

Please sign in to comment.