Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-dc): reduce socket addr calls even more #2406

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions dc/s2n-quic-dc/src/stream/client/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,18 @@ where
// Make sure TCP_NODELAY is set
let _ = socket.set_nodelay(true);

let stream = endpoint::open_stream(env, peer, env::TcpRegistered(socket), subscriber, None)?;
let local_port = socket.local_addr()?.port();
let stream = endpoint::open_stream(
env,
peer,
env::TcpRegistered {
socket,
peer_addr: acceptor_addr.into(),
local_port,
},
subscriber,
None,
)?;

// build the stream inside the application context
let mut stream = stream.connect()?;
Expand All @@ -85,14 +96,26 @@ where
#[inline]
pub async fn connect_tcp_with<Sub>(
peer: secret::map::Peer,
stream: TcpStream,
socket: TcpStream,
env: &Environment<Sub>,
subscriber: Sub,
) -> io::Result<Stream<Sub>>
where
Sub: event::Subscriber,
{
let stream = endpoint::open_stream(env, peer, env::TcpRegistered(stream), subscriber, None)?;
let local_port = socket.local_addr()?.port();
let peer_addr = socket.peer_addr()?.into();
let stream = endpoint::open_stream(
env,
peer,
env::TcpRegistered {
socket,
peer_addr,
local_port,
},
subscriber,
None,
)?;

// build the stream inside the application context
let mut stream = stream.connect()?;
Expand Down
24 changes: 16 additions & 8 deletions dc/s2n-quic-dc/src/stream/environment/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,11 @@ where
}

/// A socket that is already registered with the application runtime
pub struct TcpRegistered(pub TcpStream);
pub struct TcpRegistered {
pub socket: TcpStream,
pub peer_addr: SocketAddress,
pub local_port: u16,
}

impl<Sub> super::Peer<Environment<Sub>> for TcpRegistered
where
Expand All @@ -274,9 +278,9 @@ where

#[inline]
fn setup(self, _env: &Environment<Sub>) -> super::Result<super::SocketSet<Self::WorkerSocket>> {
let remote_addr = self.0.peer_addr()?.into();
let source_control_port = self.0.local_addr()?.port();
let application = Box::new(self.0);
let remote_addr = self.peer_addr;
let source_control_port = self.local_port;
let application = Box::new(self.socket);
Ok(super::SocketSet {
application,
read_worker: None,
Expand All @@ -289,7 +293,11 @@ where
}

/// A socket that should be reregistered with the application runtime
pub struct TcpReregistered(pub TcpStream, pub SocketAddress);
pub struct TcpReregistered {
pub socket: TcpStream,
pub peer_addr: SocketAddress,
pub local_port: u16,
}

impl<Sub> super::Peer<Environment<Sub>> for TcpReregistered
where
Expand All @@ -308,9 +316,9 @@ where

#[inline]
fn setup(self, _env: &Environment<Sub>) -> super::Result<super::SocketSet<Self::WorkerSocket>> {
let remote_addr = self.1;
let source_control_port = self.0.local_addr()?.port();
let application = Box::new(self.0.into_std()?);
let source_control_port = self.local_port;
let remote_addr = self.peer_addr;
let application = Box::new(self.socket.into_std()?);
Ok(super::SocketSet {
application,
read_worker: None,
Expand Down
12 changes: 10 additions & 2 deletions dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ impl FreshQueue {
remaining -= 1;

if remaining == 0 {
// if we're yielding then we need to wake ourselves up again
cx.waker().wake_by_ref();
break;
}
}
Expand Down Expand Up @@ -444,6 +446,7 @@ where
secrets: secret::Map,
accept_flavor: accept::Flavor,
subscriber: Sub,
local_port: u16,
}

impl<Sub> WorkerContext<Sub>
Expand All @@ -458,6 +461,7 @@ where
secrets: acceptor.secrets.clone(),
accept_flavor: acceptor.accept_flavor,
subscriber: acceptor.subscriber.clone(),
local_port: acceptor.socket.local_addr().unwrap().port(),
}
}
}
Expand Down Expand Up @@ -691,7 +695,11 @@ impl WorkerState {
let stream_builder = match endpoint::accept_stream(
now,
&context.env,
env::TcpReregistered(socket, remote_address),
env::TcpReregistered {
socket,
peer_addr: remote_address,
local_port: context.local_port,
},
&initial_packet,
None,
Some(recv_buffer),
Expand All @@ -702,7 +710,7 @@ impl WorkerState {
) {
Ok(stream) => stream,
Err(error) => {
if let Some(env::TcpReregistered(socket, remote_address)) = error.peer {
if let Some(env::TcpReregistered { socket, .. }) = error.peer {
if !error.secret_control.is_empty() {
// if we need to send an error then update the state and loop back
// around
Expand Down
Loading