Skip to content

Commit

Permalink
[Network] set socket RX/TX Buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
Markuze authored and perryjrandall committed Oct 11, 2022
1 parent 858bdbc commit 9c8d6c7
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 16 deletions.
12 changes: 12 additions & 0 deletions config/src/config/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */
pub const CONNECTION_BACKOFF_BASE: u64 = 2;
pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */;
pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE;
pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency
pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon
pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency
pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
Expand Down Expand Up @@ -79,6 +83,10 @@ pub struct NetworkConfig {
pub mutual_authentication: bool,
pub network_id: NetworkId,
pub runtime_threads: Option<usize>,
pub inbound_rx_buffer_size_bytes: Option<u32>,
pub inbound_tx_buffer_size_bytes: Option<u32>,
pub outbound_rx_buffer_size_bytes: Option<u32>,
pub outbound_tx_buffer_size_bytes: Option<u32>,
// Addresses of initial peers to connect to. In a mutual_authentication network,
// we will extract the public keys from these addresses to set our initial
// trusted peers set. TODO: Replace usage in configs with `seeds` this is for backwards compatibility
Expand Down Expand Up @@ -141,6 +149,10 @@ impl NetworkConfig {
inbound_rate_limit_config: None,
outbound_rate_limit_config: None,
max_message_size: MAX_MESSAGE_SIZE,
inbound_rx_buffer_size_bytes: Some(INBOUND_TCP_RX_BUFFER_SIZE),
inbound_tx_buffer_size_bytes: Some(INBOUND_TCP_TX_BUFFER_SIZE),
outbound_rx_buffer_size_bytes: Some(OUTBOUND_TCP_RX_BUFFER_SIZE),
outbound_tx_buffer_size_bytes: Some(OUTBOUND_TCP_TX_BUFFER_SIZE),
};
config.prepare_identity();
config
Expand Down
8 changes: 4 additions & 4 deletions crates/aptos-network-checker/src/check_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::args::CheckEndpointArgs;
use anyhow::{bail, Context, Result};
use aptos_config::{
config::{RoleType, HANDSHAKE_VERSION},
Expand All @@ -9,6 +10,7 @@ use aptos_config::{
use aptos_crypto::x25519::{self, PRIVATE_KEY_SIZE};
use aptos_types::{account_address, chain_id::ChainId, network_address::NetworkAddress, PeerId};
use futures::{AsyncReadExt, AsyncWriteExt};
use network::transport::TCPBufferCfg;
use network::{
noise::{HandshakeAuthMode, NoiseUpgrader},
protocols::wire::handshake::v1::ProtocolIdSet,
Expand All @@ -18,8 +20,6 @@ use network::{
use std::{collections::BTreeMap, sync::Arc};
use tokio::time::Duration;

use crate::args::CheckEndpointArgs;

// This function must take the private key in as an owned value vs as part of
// the args struct because private key needs to be owned, and cannot be cloned.
pub async fn check_endpoint(
Expand Down Expand Up @@ -86,7 +86,7 @@ async fn check_endpoint_with_handshake(
) -> Result<String> {
// Connect to the address, this should handle DNS resolution if necessary.
let fut_socket = async {
resolve_and_connect(address.clone())
resolve_and_connect(address.clone(), TCPBufferCfg::new())
.await
.map(TcpSocket::new)
};
Expand All @@ -112,7 +112,7 @@ async fn check_endpoint_with_handshake(
const INVALID_NOISE_HEADER: &[u8; 152] = &[7; 152];

async fn check_endpoint_no_handshake(address: NetworkAddress) -> Result<String> {
let mut socket = resolve_and_connect(address.clone())
let mut socket = resolve_and_connect(address.clone(), TCPBufferCfg::new())
.await
.map(TcpSocket::new)
.with_context(|| format!("Failed to connect to {}", address))?;
Expand Down
11 changes: 11 additions & 0 deletions network/builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use network::{
network::{AppConfig, NewNetworkEvents, NewNetworkSender},
},
};

use netcore::transport::tcp::TCPBufferCfg;
use network_discovery::DiscoveryChangeListener;
use std::{
clone::Clone,
Expand Down Expand Up @@ -90,6 +92,7 @@ impl NetworkBuilder {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
) -> Self {
// A network cannot exist without a PeerManager
// TODO: construct this in create and pass it to new() as a parameter. The complication is manual construction of NetworkBuilder in various tests.
Expand All @@ -109,6 +112,7 @@ impl NetworkBuilder {
inbound_connection_limit,
inbound_rate_limit_config,
outbound_rate_limit_config,
tcp_buffer_cfg,
);

NetworkBuilder {
Expand Down Expand Up @@ -152,6 +156,7 @@ impl NetworkBuilder {
MAX_INBOUND_CONNECTIONS,
None,
None,
TCPBufferCfg::default(),
);

builder.add_connectivity_manager(
Expand Down Expand Up @@ -207,6 +212,12 @@ impl NetworkBuilder {
config.max_inbound_connections,
config.inbound_rate_limit_config,
config.outbound_rate_limit_config,
TCPBufferCfg::new_configs(
config.inbound_rx_buffer_size_bytes,
config.inbound_tx_buffer_size_bytes,
config.outbound_rx_buffer_size_bytes,
config.outbound_tx_buffer_size_bytes,
),
);

network_builder.add_connection_monitoring(
Expand Down
93 changes: 84 additions & 9 deletions network/netcore/src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use futures::{
};
use proxy::Proxy;
use std::{
convert::TryFrom,
fmt::Debug,
io,
net::SocketAddr,
Expand All @@ -29,13 +28,47 @@ use tokio::{
use tokio_util::compat::Compat;
use url::Url;

#[derive(Debug, Clone, Copy, Default)]
pub struct TCPBufferCfg {
inbound_rx_buffer_bytes: Option<u32>,
inbound_tx_buffer_bytes: Option<u32>,
outbound_rx_buffer_bytes: Option<u32>,
outbound_tx_buffer_bytes: Option<u32>,
}

impl TCPBufferCfg {
pub const fn new() -> Self {
Self {
inbound_rx_buffer_bytes: None,
inbound_tx_buffer_bytes: None,
outbound_rx_buffer_bytes: None,
outbound_tx_buffer_bytes: None,
}
}
pub fn new_configs(
inbound_rx: Option<u32>,
inbound_tx: Option<u32>,
outbound_rx: Option<u32>,
outbound_tx: Option<u32>,
) -> Self {
Self {
inbound_rx_buffer_bytes: inbound_rx,
inbound_tx_buffer_bytes: inbound_tx,
outbound_rx_buffer_bytes: outbound_rx,
outbound_tx_buffer_bytes: outbound_tx,
}
}
}

/// Transport to build TCP connections
#[derive(Debug, Clone, Default)]
pub struct TcpTransport {
/// TTL to set for opened sockets, or `None` to keep default.
pub ttl: Option<u32>,
/// `TCP_NODELAY` to set for opened sockets, or `None` to keep default.
pub nodelay: Option<bool>,

pub tcp_buff_cfg: TCPBufferCfg,
}

impl TcpTransport {
Expand All @@ -50,6 +83,10 @@ impl TcpTransport {

Ok(())
}

pub fn set_tcp_buffers(&mut self, configs: &TCPBufferCfg) {
self.tcp_buff_cfg = *configs;
}
}

impl Transport for TcpTransport {
Expand All @@ -69,9 +106,23 @@ impl Transport for TcpTransport {
return Err(invalid_addr_error(&addr));
}

let listener = ::std::net::TcpListener::bind((ipaddr, port))?;
listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(listener)?;
let addr = SocketAddr::new(ipaddr, port);

let socket = if ipaddr.is_ipv4() {
tokio::net::TcpSocket::new_v4()?
} else {
tokio::net::TcpSocket::new_v6()?
};

if let Some(rx_buf) = self.tcp_buff_cfg.inbound_rx_buffer_bytes {
socket.set_recv_buffer_size(rx_buf)?;
}
if let Some(tx_buf) = self.tcp_buff_cfg.inbound_tx_buffer_bytes {
socket.set_send_buffer_size(tx_buf)?;
}
socket.bind(addr)?;

let listener = socket.listen(256)?;
let listen_addr = NetworkAddress::from(listener.local_addr()?);

Ok((
Expand All @@ -88,7 +139,6 @@ impl Transport for TcpTransport {

// ensure addr is well formed to save some work before potentially
// spawning a dial task that will fail anyway.
// TODO(philiphayes): base tcp transport should not allow trailing protocols
parse_ip_tcp(protos)
.map(|_| ())
.or_else(|| parse_dns_tcp(protos).map(|_| ()))
Expand Down Expand Up @@ -123,7 +173,7 @@ impl Transport for TcpTransport {
let f: Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send + 'static>> =
Box::pin(match proxy_addr {
Some(proxy_addr) => Either::Left(connect_via_proxy(proxy_addr, addr)),
None => Either::Right(resolve_and_connect(addr)),
None => Either::Right(resolve_and_connect(addr, self.tcp_buff_cfg)),
});

Ok(TcpOutbound {
Expand All @@ -144,23 +194,48 @@ async fn resolve_with_filter(
.filter(move |socketaddr| ip_filter.matches(socketaddr.ip())))
}

pub async fn connect_with_config(
port: u16,
ipaddr: std::net::IpAddr,
tcp_buff_cfg: TCPBufferCfg,
) -> io::Result<TcpStream> {
let addr = SocketAddr::new(ipaddr, port);

let socket = if addr.is_ipv4() {
tokio::net::TcpSocket::new_v4()?
} else {
tokio::net::TcpSocket::new_v6()?
};

if let Some(rx_buf) = tcp_buff_cfg.outbound_rx_buffer_bytes {
socket.set_recv_buffer_size(rx_buf)?;
}
if let Some(tx_buf) = tcp_buff_cfg.outbound_tx_buffer_bytes {
socket.set_send_buffer_size(tx_buf)?;
}
socket.connect(addr).await
}

/// Note: we need to take ownership of this `NetworkAddress` (instead of just
/// borrowing the `&[Protocol]` slice) so this future can be `Send + 'static`.
pub async fn resolve_and_connect(addr: NetworkAddress) -> io::Result<TcpStream> {
pub async fn resolve_and_connect(
addr: NetworkAddress,
tcp_buff_cfg: TCPBufferCfg,
) -> io::Result<TcpStream> {
let protos = addr.as_slice();

if let Some(((ipaddr, port), _addr_suffix)) = parse_ip_tcp(protos) {
// this is an /ip4 or /ip6 address, so we can just connect without any
// extra resolving or filtering.
TcpStream::connect((ipaddr, port)).await
connect_with_config(port, ipaddr, tcp_buff_cfg).await
} else if let Some(((ip_filter, dns_name, port), _addr_suffix)) = parse_dns_tcp(protos) {
// resolve dns name and filter
let socketaddr_iter = resolve_with_filter(ip_filter, dns_name.as_ref(), port).await?;
let mut last_err = None;

// try to connect until the first succeeds
for socketaddr in socketaddr_iter {
match TcpStream::connect(socketaddr).await {
match connect_with_config(socketaddr.port(), socketaddr.ip(), tcp_buff_cfg).await {
Ok(stream) => return Ok(stream),
Err(err) => last_err = Some(err),
}
Expand Down
20 changes: 18 additions & 2 deletions network/src/peer_manager/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use channel::{self, aptos_channel, message_queues::QueueStyle};
#[cfg(any(test, feature = "testing", feature = "fuzzing"))]
use netcore::transport::memory::MemoryTransport;
use netcore::transport::{
tcp::{TcpSocket, TcpTransport},
tcp::{TCPBufferCfg, TcpSocket, TcpTransport},
Transport,
};
use std::{clone::Clone, collections::HashMap, fmt::Debug, net::IpAddr, sync::Arc};
Expand Down Expand Up @@ -82,6 +82,7 @@ struct PeerManagerContext {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
}

impl PeerManagerContext {
Expand All @@ -107,6 +108,7 @@ impl PeerManagerContext {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
) -> Self {
Self {
pm_reqs_tx,
Expand All @@ -126,6 +128,7 @@ impl PeerManagerContext {
inbound_connection_limit,
inbound_rate_limit_config,
outbound_rate_limit_config,
tcp_buffer_cfg,
}
}

Expand Down Expand Up @@ -186,6 +189,7 @@ impl PeerManagerBuilder {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
) -> Self {
// Setup channel to send requests to peer manager.
let (pm_reqs_tx, pm_reqs_rx) = aptos_channel::new(
Expand Down Expand Up @@ -223,6 +227,7 @@ impl PeerManagerBuilder {
inbound_connection_limit,
inbound_rate_limit_config,
outbound_rate_limit_config,
tcp_buffer_cfg,
)),
peer_manager: None,
listen_address,
Expand Down Expand Up @@ -278,11 +283,15 @@ impl PeerManagerBuilder {
),
};

let mut aptos_tcp_transport = APTOS_TCP_TRANSPORT.clone();
let tcp_cfg = self.get_tcp_buffers_cfg();
aptos_tcp_transport.set_tcp_buffers(&tcp_cfg);

self.peer_manager = match self.listen_address.as_slice() {
[Ip4(_), Tcp(_)] | [Ip6(_), Tcp(_)] => {
Some(TransportPeerManager::Tcp(self.build_with_transport(
AptosNetTransport::new(
APTOS_TCP_TRANSPORT.clone(),
aptos_tcp_transport,
self.network_context,
self.time_service.clone(),
key,
Expand Down Expand Up @@ -406,6 +415,13 @@ impl PeerManagerBuilder {
.add_connection_event_listener()
}

pub fn get_tcp_buffers_cfg(&self) -> TCPBufferCfg {
self.peer_manager_context
.as_ref()
.expect("Cannot add an event listener if PeerManager has already been built.")
.tcp_buffer_cfg
}

/// Register a peer-to-peer service (i.e., both client and service) for given
/// protocols.
pub fn add_p2p_service(
Expand Down
4 changes: 3 additions & 1 deletion network/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use short_hex_str::AsShortHexStr;
use std::{collections::BTreeMap, convert::TryFrom, fmt, io, pin::Pin, sync::Arc, time::Duration};

// Re-exposed for aptos-network-checker
pub use netcore::transport::tcp::{resolve_and_connect, TcpSocket};
pub use netcore::transport::tcp::{resolve_and_connect, TCPBufferCfg, TcpSocket};

#[cfg(test)]
mod test;
Expand All @@ -54,6 +54,8 @@ pub const APTOS_TCP_TRANSPORT: tcp::TcpTransport = tcp::TcpTransport {
ttl: None,
// Use TCP_NODELAY for Aptos tcp connections.
nodelay: Some(true),
// Use default TCP setting, overridden by Network config
tcp_buff_cfg: tcp::TCPBufferCfg::new(),
};

/// A trait alias for "socket-like" things.
Expand Down

0 comments on commit 9c8d6c7

Please sign in to comment.