Skip to content

Commit

Permalink
transport: Make TCP_NODELAY configurable (#146)
Browse files Browse the repository at this point in the history
Allow configuring `TCP_NODELAY` through TCP/WebSocket configurations.
Polkadot configures it to `true` btw:
https://github.com/paritytech/polkadot-sdk/blob/b65313e81465dd730e48d4ce00deb76922618375/substrate/client/network/src/transport.rs#L58
  • Loading branch information
altonen authored Jun 12, 2024
1 parent 70f94b1 commit ec48b04
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 11 deletions.
6 changes: 6 additions & 0 deletions src/transport/tcp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ pub struct Config {
/// Defaults to `true`.
pub reuse_port: bool,

/// Enable `TCP_NODELAY`.
///
/// Defaults to `false`.
pub nodelay: bool,

/// Yamux configuration.
pub yamux_config: crate::yamux::Config,

Expand Down Expand Up @@ -85,6 +90,7 @@ impl Default for Config {
"/ip6/::/tcp/0".parse().expect("valid address"),
],
reuse_port: true,
nodelay: false,
yamux_config: Default::default(),
noise_read_ahead_frame_count: MAX_READ_AHEAD_FACTOR,
noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE,
Expand Down
6 changes: 6 additions & 0 deletions src/transport/tcp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ mod tests {
.with(Protocol::Tcp(address.port())),
Default::default(),
Duration::from_secs(10),
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -775,6 +776,7 @@ mod tests {
.with(Protocol::Tcp(address.port())),
Default::default(),
Duration::from_secs(10),
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -916,6 +918,7 @@ mod tests {
.with(Protocol::Tcp(address.port())),
Default::default(),
Duration::from_secs(10),
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -961,6 +964,7 @@ mod tests {
.with(Protocol::Tcp(address.port())),
Default::default(),
Duration::from_secs(10),
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1115,6 +1119,7 @@ mod tests {
.with(Protocol::Tcp(address.port())),
Default::default(),
Duration::from_secs(10),
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1224,6 +1229,7 @@ mod tests {
.with(Protocol::Tcp(address.port())),
Default::default(),
Duration::from_secs(10),
false,
)
.await
.unwrap();
Expand Down
12 changes: 8 additions & 4 deletions src/transport/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl TcpListener {
pub fn new(
addresses: Vec<Multiaddr>,
reuse_port: bool,
nodelay: bool,
) -> (Self, Vec<Multiaddr>, DialAddresses) {
let (listeners, listen_addresses): (_, Vec<Vec<_>>) = addresses
.into_iter()
Expand All @@ -131,6 +132,7 @@ impl TcpListener {
},
};

socket.set_nodelay(nodelay).ok()?;
socket.set_nonblocking(true).ok()?;
socket.set_reuse_address(true).ok()?;
#[cfg(unix)]
Expand Down Expand Up @@ -339,7 +341,7 @@ mod tests {

#[tokio::test]
async fn no_listeners() {
let (mut listener, _, _) = TcpListener::new(Vec::new(), true);
let (mut listener, _, _) = TcpListener::new(Vec::new(), true, false);

futures::future::poll_fn(|cx| match listener.poll_next_unpin(cx) {
Poll::Pending => Poll::Ready(()),
Expand All @@ -351,7 +353,8 @@ mod tests {
#[tokio::test]
async fn one_listener() {
let address: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
let (mut listener, listen_addresses, _) = TcpListener::new(vec![address.clone()], true);
let (mut listener, listen_addresses, _) =
TcpListener::new(vec![address.clone()], true, false);
let Some(Protocol::Tcp(port)) =
listen_addresses.iter().next().unwrap().clone().iter().skip(1).next()
else {
Expand All @@ -368,7 +371,8 @@ mod tests {
async fn two_listeners() {
let address1: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
let address2: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
let (mut listener, listen_addresses, _) = TcpListener::new(vec![address1, address2], true);
let (mut listener, listen_addresses, _) =
TcpListener::new(vec![address1, address2], true, false);
let Some(Protocol::Tcp(port1)) =
listen_addresses.iter().next().unwrap().clone().iter().skip(1).next()
else {
Expand Down Expand Up @@ -421,7 +425,7 @@ mod tests {
async fn show_all_addresses() {
let address1: Multiaddr = "/ip6/::/tcp/0".parse().unwrap();
let address2: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
let (_, listen_addresses, _) = TcpListener::new(vec![address1, address2], true);
let (_, listen_addresses, _) = TcpListener::new(vec![address1, address2], true, false);

println!("{listen_addresses:#?}");
}
Expand Down
15 changes: 13 additions & 2 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl TcpTransport {
address: Multiaddr,
dial_addresses: DialAddresses,
connection_open_timeout: Duration,
nodelay: bool,
) -> crate::Result<(Multiaddr, TcpStream)> {
let (socket_address, _) = TcpListener::get_socket_address(&address)?;
let remote_address = match socket_address {
Expand Down Expand Up @@ -198,6 +199,7 @@ impl TcpTransport {
socket.set_only_v6(true)?;
}
socket.set_nonblocking(true)?;
socket.set_nodelay(nodelay)?;

match dial_addresses.local_dial_address(&remote_address.ip()) {
Ok(Some(dial_address)) => {
Expand Down Expand Up @@ -261,6 +263,7 @@ impl TransportBuilder for TcpTransport {
let (listener, listen_addresses, dial_addresses) = TcpListener::new(
std::mem::take(&mut config.listen_addresses),
config.reuse_port,
config.nodelay,
);

Ok((
Expand Down Expand Up @@ -293,11 +296,12 @@ impl Transport for TcpTransport {
let substream_open_timeout = self.config.substream_open_timeout;
let dial_addresses = self.dial_addresses.clone();
let keypair = self.context.keypair.clone();
let nodelay = self.config.nodelay;

self.pending_dials.insert(connection_id, address.clone());
self.pending_connections.push(Box::pin(async move {
let (_, stream) =
TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout)
TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout, nodelay)
.await
.map_err(|error| (connection_id, error))?;

Expand Down Expand Up @@ -370,9 +374,16 @@ impl Transport for TcpTransport {
.map(|address| {
let dial_addresses = self.dial_addresses.clone();
let connection_open_timeout = self.config.connection_open_timeout;
let nodelay = self.config.nodelay;

async move {
TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout).await
TcpTransport::dial_peer(
address,
dial_addresses,
connection_open_timeout,
nodelay,
)
.await
}
})
.collect();
Expand Down
6 changes: 6 additions & 0 deletions src/transport/websocket/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ pub struct Config {
/// Defaults to `true`.
pub reuse_port: bool,

/// Enable `TCP_NODELAY`.
///
/// Defaults to `false`.
pub nodelay: bool,

/// Yamux configuration.
pub yamux_config: crate::yamux::Config,

Expand Down Expand Up @@ -85,6 +90,7 @@ impl Default for Config {
"/ip6/::/tcp/0/ws".parse().expect("valid address"),
],
reuse_port: true,
nodelay: false,
yamux_config: Default::default(),
noise_read_ahead_frame_count: MAX_READ_AHEAD_FACTOR,
noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE,
Expand Down
8 changes: 5 additions & 3 deletions src/transport/websocket/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl WebSocketListener {
pub fn new(
addresses: Vec<Multiaddr>,
reuse_port: bool,
nodelay: bool,
) -> (Self, Vec<Multiaddr>, DialAddresses) {
let (listeners, listen_addresses): (_, Vec<Vec<_>>) = addresses
.into_iter()
Expand Down Expand Up @@ -135,6 +136,7 @@ impl WebSocketListener {
.ok()?,
};

socket.set_nodelay(nodelay).ok()?;
socket.set_nonblocking(true).ok()?;
socket.set_reuse_address(true).ok()?;
#[cfg(unix)]
Expand Down Expand Up @@ -398,7 +400,7 @@ mod tests {

#[tokio::test]
async fn no_listeners() {
let (mut listener, _, _) = WebSocketListener::new(Vec::new(), true);
let (mut listener, _, _) = WebSocketListener::new(Vec::new(), true, false);

futures::future::poll_fn(|cx| match listener.poll_next_unpin(cx) {
Poll::Pending => Poll::Ready(()),
Expand All @@ -411,7 +413,7 @@ mod tests {
async fn one_listener() {
let address: Multiaddr = "/ip6/::1/tcp/0/ws".parse().unwrap();
let (mut listener, listen_addresses, _) =
WebSocketListener::new(vec![address.clone()], true);
WebSocketListener::new(vec![address.clone()], true, false);
let Some(Protocol::Tcp(port)) =
listen_addresses.iter().next().unwrap().clone().iter().skip(1).next()
else {
Expand All @@ -429,7 +431,7 @@ mod tests {
let address1: Multiaddr = "/ip6/::1/tcp/0/ws".parse().unwrap();
let address2: Multiaddr = "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap();
let (mut listener, listen_addresses, _) =
WebSocketListener::new(vec![address1, address2], true);
WebSocketListener::new(vec![address1, address2], true, false);

let Some(Protocol::Tcp(port1)) =
listen_addresses.iter().next().unwrap().clone().iter().skip(1).next()
Expand Down
16 changes: 14 additions & 2 deletions src/transport/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ impl WebSocketTransport {
address: Multiaddr,
dial_addresses: DialAddresses,
connection_open_timeout: Duration,
nodelay: bool,
) -> crate::Result<(Multiaddr, WebSocketStream<MaybeTlsStream<TcpStream>>)> {
let (url, _) = Self::multiaddr_into_url(address.clone())?;
let (socket_address, _) = WebSocketListener::get_socket_address(&address)?;
Expand Down Expand Up @@ -245,6 +246,7 @@ impl WebSocketTransport {
socket.set_only_v6(true)?;
}
socket.set_nonblocking(true)?;
socket.set_nodelay(nodelay)?;

match dial_addresses.local_dial_address(&remote_address.ip()) {
Ok(Some(dial_address)) => {
Expand Down Expand Up @@ -315,6 +317,7 @@ impl TransportBuilder for WebSocketTransport {
let (listener, listen_addresses, dial_addresses) = WebSocketListener::new(
std::mem::take(&mut config.listen_addresses),
config.reuse_port,
config.nodelay,
);

Ok((
Expand Down Expand Up @@ -344,6 +347,8 @@ impl Transport for WebSocketTransport {
let max_read_ahead_factor = self.config.noise_read_ahead_frame_count;
let max_write_buffer_size = self.config.noise_write_buffer_size;
let dial_addresses = self.dial_addresses.clone();
let nodelay = self.config.nodelay;

self.pending_dials.insert(connection_id, address.clone());

tracing::debug!(target: LOG_TARGET, ?connection_id, ?address, "open connection");
Expand All @@ -353,6 +358,7 @@ impl Transport for WebSocketTransport {
address.clone(),
dial_addresses,
connection_open_timeout,
nodelay,
)
.await
.map_err(|error| WebSocketError::new(error, Some(connection_id)))?;
Expand Down Expand Up @@ -437,10 +443,16 @@ impl Transport for WebSocketTransport {
.map(|address| {
let connection_open_timeout = self.config.connection_open_timeout;
let dial_addresses = self.dial_addresses.clone();
let nodelay = self.config.nodelay;

async move {
WebSocketTransport::dial_peer(address, dial_addresses, connection_open_timeout)
.await
WebSocketTransport::dial_peer(
address,
dial_addresses,
connection_open_timeout,
nodelay,
)
.await
}
})
.collect();
Expand Down

0 comments on commit ec48b04

Please sign in to comment.