From 780a2760a515fd4fa4bde39fb24c2d7b314897df Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Thu, 29 Sep 2022 17:00:15 +0300 Subject: [PATCH] [Network] set socket RX/TX Buffers --- config/src/config/network_config.rs | 12 +++ .../src/check_endpoint.rs | 8 +- network/builder/src/builder.rs | 11 +++ network/netcore/src/transport/tcp.rs | 93 +++++++++++++++++-- network/src/peer_manager/builder.rs | 20 +++- network/src/transport/mod.rs | 4 +- 6 files changed, 132 insertions(+), 16 deletions(-) diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index 5deb10bce5e74..1a598643e569a 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -51,6 +51,10 @@ pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */ pub const CONNECTION_BACKOFF_BASE: u64 = 2; pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */; pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE; +pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency +pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon +pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency +pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -79,6 +83,10 @@ pub struct NetworkConfig { pub mutual_authentication: bool, pub network_id: NetworkId, pub runtime_threads: Option, + pub inbound_rx_buffer_size_bytes: Option, + pub inbound_tx_buffer_size_bytes: Option, + pub outbound_rx_buffer_size_bytes: Option, + pub outbound_tx_buffer_size_bytes: Option, // Addresses of initial peers to connect to. In a mutual_authentication network, // we will extract the public keys from these addresses to set our initial // trusted peers set. TODO: Replace usage in configs with `seeds` this is for backwards compatibility @@ -141,6 +149,10 @@ impl NetworkConfig { inbound_rate_limit_config: None, outbound_rate_limit_config: None, max_message_size: MAX_MESSAGE_SIZE, + inbound_rx_buffer_size_bytes: Some(INBOUND_TCP_RX_BUFFER_SIZE), + inbound_tx_buffer_size_bytes: Some(INBOUND_TCP_TX_BUFFER_SIZE), + outbound_rx_buffer_size_bytes: Some(OUTBOUND_TCP_RX_BUFFER_SIZE), + outbound_tx_buffer_size_bytes: Some(OUTBOUND_TCP_TX_BUFFER_SIZE), }; config.prepare_identity(); config diff --git a/crates/aptos-network-checker/src/check_endpoint.rs b/crates/aptos-network-checker/src/check_endpoint.rs index f1ea6c6e48ae4..74c181ce3ff24 100644 --- a/crates/aptos-network-checker/src/check_endpoint.rs +++ b/crates/aptos-network-checker/src/check_endpoint.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::args::CheckEndpointArgs; use anyhow::{bail, Context, Result}; use aptos_config::{ config::{RoleType, HANDSHAKE_VERSION}, @@ -9,6 +10,7 @@ use aptos_config::{ use aptos_crypto::x25519::{self, PRIVATE_KEY_SIZE}; use aptos_types::{account_address, chain_id::ChainId, network_address::NetworkAddress, PeerId}; use futures::{AsyncReadExt, AsyncWriteExt}; +use network::transport::TCPBufferCfg; use network::{ noise::{HandshakeAuthMode, NoiseUpgrader}, protocols::wire::handshake::v1::ProtocolIdSet, @@ -18,8 +20,6 @@ use network::{ use std::{collections::BTreeMap, sync::Arc}; use tokio::time::Duration; -use crate::args::CheckEndpointArgs; - // This function must take the private key in as an owned value vs as part of // the args struct because private key needs to be owned, and cannot be cloned. pub async fn check_endpoint( @@ -86,7 +86,7 @@ async fn check_endpoint_with_handshake( ) -> Result { // Connect to the address, this should handle DNS resolution if necessary. let fut_socket = async { - resolve_and_connect(address.clone()) + resolve_and_connect(address.clone(), TCPBufferCfg::new()) .await .map(TcpSocket::new) }; @@ -112,7 +112,7 @@ async fn check_endpoint_with_handshake( const INVALID_NOISE_HEADER: &[u8; 152] = &[7; 152]; async fn check_endpoint_no_handshake(address: NetworkAddress) -> Result { - let mut socket = resolve_and_connect(address.clone()) + let mut socket = resolve_and_connect(address.clone(), TCPBufferCfg::new()) .await .map(TcpSocket::new) .with_context(|| format!("Failed to connect to {}", address))?; diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index f5d5596ab9f37..d70e40bb79e3f 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -38,6 +38,8 @@ use network::{ network::{AppConfig, NewNetworkEvents, NewNetworkSender}, }, }; + +use netcore::transport::tcp::TCPBufferCfg; use network_discovery::DiscoveryChangeListener; use std::{ clone::Clone, @@ -90,6 +92,7 @@ impl NetworkBuilder { inbound_connection_limit: usize, inbound_rate_limit_config: Option, outbound_rate_limit_config: Option, + tcp_buffer_cfg: TCPBufferCfg, ) -> Self { // A network cannot exist without a PeerManager // TODO: construct this in create and pass it to new() as a parameter. The complication is manual construction of NetworkBuilder in various tests. @@ -109,6 +112,7 @@ impl NetworkBuilder { inbound_connection_limit, inbound_rate_limit_config, outbound_rate_limit_config, + tcp_buffer_cfg, ); NetworkBuilder { @@ -152,6 +156,7 @@ impl NetworkBuilder { MAX_INBOUND_CONNECTIONS, None, None, + TCPBufferCfg::default(), ); builder.add_connectivity_manager( @@ -207,6 +212,12 @@ impl NetworkBuilder { config.max_inbound_connections, config.inbound_rate_limit_config, config.outbound_rate_limit_config, + TCPBufferCfg::new_configs( + config.inbound_rx_buffer_size_bytes, + config.inbound_tx_buffer_size_bytes, + config.outbound_rx_buffer_size_bytes, + config.outbound_tx_buffer_size_bytes, + ), ); network_builder.add_connection_monitoring( diff --git a/network/netcore/src/transport/tcp.rs b/network/netcore/src/transport/tcp.rs index 1398cd149fb5a..cfb5943d9790a 100644 --- a/network/netcore/src/transport/tcp.rs +++ b/network/netcore/src/transport/tcp.rs @@ -15,7 +15,6 @@ use futures::{ }; use proxy::Proxy; use std::{ - convert::TryFrom, fmt::Debug, io, net::SocketAddr, @@ -29,6 +28,38 @@ use tokio::{ use tokio_util::compat::Compat; use url::Url; +#[derive(Debug, Clone, Copy, Default)] +pub struct TCPBufferCfg { + inbound_rx_buffer_bytes: Option, + inbound_tx_buffer_bytes: Option, + outbound_rx_buffer_bytes: Option, + outbound_tx_buffer_bytes: Option, +} + +impl TCPBufferCfg { + pub const fn new() -> Self { + Self { + inbound_rx_buffer_bytes: None, + inbound_tx_buffer_bytes: None, + outbound_rx_buffer_bytes: None, + outbound_tx_buffer_bytes: None, + } + } + pub fn new_configs( + inbound_rx: Option, + inbound_tx: Option, + outbound_rx: Option, + outbound_tx: Option, + ) -> Self { + Self { + inbound_rx_buffer_bytes: inbound_rx, + inbound_tx_buffer_bytes: inbound_tx, + outbound_rx_buffer_bytes: outbound_rx, + outbound_tx_buffer_bytes: outbound_tx, + } + } +} + /// Transport to build TCP connections #[derive(Debug, Clone, Default)] pub struct TcpTransport { @@ -36,6 +67,8 @@ pub struct TcpTransport { pub ttl: Option, /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default. pub nodelay: Option, + + pub tcp_buff_cfg: TCPBufferCfg, } impl TcpTransport { @@ -50,6 +83,10 @@ impl TcpTransport { Ok(()) } + + pub fn set_tcp_buffers(&mut self, configs: &TCPBufferCfg) { + self.tcp_buff_cfg = *configs; + } } impl Transport for TcpTransport { @@ -69,9 +106,23 @@ impl Transport for TcpTransport { return Err(invalid_addr_error(&addr)); } - let listener = ::std::net::TcpListener::bind((ipaddr, port))?; - listener.set_nonblocking(true)?; - let listener = TcpListener::try_from(listener)?; + let addr = SocketAddr::new(ipaddr, port); + + let socket = if ipaddr.is_ipv4() { + tokio::net::TcpSocket::new_v4()? + } else { + tokio::net::TcpSocket::new_v6()? + }; + + if let Some(rx_buf) = self.tcp_buff_cfg.inbound_rx_buffer_bytes { + socket.set_recv_buffer_size(rx_buf)?; + } + if let Some(tx_buf) = self.tcp_buff_cfg.inbound_tx_buffer_bytes { + socket.set_send_buffer_size(tx_buf)?; + } + socket.bind(addr)?; + + let listener = socket.listen(256)?; let listen_addr = NetworkAddress::from(listener.local_addr()?); Ok(( @@ -88,7 +139,6 @@ impl Transport for TcpTransport { // ensure addr is well formed to save some work before potentially // spawning a dial task that will fail anyway. - // TODO(philiphayes): base tcp transport should not allow trailing protocols parse_ip_tcp(protos) .map(|_| ()) .or_else(|| parse_dns_tcp(protos).map(|_| ())) @@ -123,7 +173,7 @@ impl Transport for TcpTransport { let f: Pin> + Send + 'static>> = Box::pin(match proxy_addr { Some(proxy_addr) => Either::Left(connect_via_proxy(proxy_addr, addr)), - None => Either::Right(resolve_and_connect(addr)), + None => Either::Right(resolve_and_connect(addr, self.tcp_buff_cfg)), }); Ok(TcpOutbound { @@ -144,15 +194,40 @@ async fn resolve_with_filter( .filter(move |socketaddr| ip_filter.matches(socketaddr.ip()))) } +pub async fn connect_with_config( + port: u16, + ipaddr: std::net::IpAddr, + tcp_buff_cfg: TCPBufferCfg, +) -> io::Result { + let addr = SocketAddr::new(ipaddr, port); + + let socket = if addr.is_ipv4() { + tokio::net::TcpSocket::new_v4()? + } else { + tokio::net::TcpSocket::new_v6()? + }; + + if let Some(rx_buf) = tcp_buff_cfg.outbound_rx_buffer_bytes { + socket.set_recv_buffer_size(rx_buf)?; + } + if let Some(tx_buf) = tcp_buff_cfg.outbound_tx_buffer_bytes { + socket.set_send_buffer_size(tx_buf)?; + } + socket.connect(addr).await +} + /// Note: we need to take ownership of this `NetworkAddress` (instead of just /// borrowing the `&[Protocol]` slice) so this future can be `Send + 'static`. -pub async fn resolve_and_connect(addr: NetworkAddress) -> io::Result { +pub async fn resolve_and_connect( + addr: NetworkAddress, + tcp_buff_cfg: TCPBufferCfg, +) -> io::Result { let protos = addr.as_slice(); if let Some(((ipaddr, port), _addr_suffix)) = parse_ip_tcp(protos) { // this is an /ip4 or /ip6 address, so we can just connect without any // extra resolving or filtering. - TcpStream::connect((ipaddr, port)).await + connect_with_config(port, ipaddr, tcp_buff_cfg).await } else if let Some(((ip_filter, dns_name, port), _addr_suffix)) = parse_dns_tcp(protos) { // resolve dns name and filter let socketaddr_iter = resolve_with_filter(ip_filter, dns_name.as_ref(), port).await?; @@ -160,7 +235,7 @@ pub async fn resolve_and_connect(addr: NetworkAddress) -> io::Result // try to connect until the first succeeds for socketaddr in socketaddr_iter { - match TcpStream::connect(socketaddr).await { + match connect_with_config(socketaddr.port(), socketaddr.ip(), tcp_buff_cfg).await { Ok(stream) => return Ok(stream), Err(err) => last_err = Some(err), } diff --git a/network/src/peer_manager/builder.rs b/network/src/peer_manager/builder.rs index 3b2d559cfda9b..f44c33f9a55df 100644 --- a/network/src/peer_manager/builder.rs +++ b/network/src/peer_manager/builder.rs @@ -28,7 +28,7 @@ use channel::{self, aptos_channel, message_queues::QueueStyle}; #[cfg(any(test, feature = "testing", feature = "fuzzing"))] use netcore::transport::memory::MemoryTransport; use netcore::transport::{ - tcp::{TcpSocket, TcpTransport}, + tcp::{TCPBufferCfg, TcpSocket, TcpTransport}, Transport, }; use std::{clone::Clone, collections::HashMap, fmt::Debug, net::IpAddr, sync::Arc}; @@ -82,6 +82,7 @@ struct PeerManagerContext { inbound_connection_limit: usize, inbound_rate_limit_config: Option, outbound_rate_limit_config: Option, + tcp_buffer_cfg: TCPBufferCfg, } impl PeerManagerContext { @@ -107,6 +108,7 @@ impl PeerManagerContext { inbound_connection_limit: usize, inbound_rate_limit_config: Option, outbound_rate_limit_config: Option, + tcp_buffer_cfg: TCPBufferCfg, ) -> Self { Self { pm_reqs_tx, @@ -126,6 +128,7 @@ impl PeerManagerContext { inbound_connection_limit, inbound_rate_limit_config, outbound_rate_limit_config, + tcp_buffer_cfg, } } @@ -186,6 +189,7 @@ impl PeerManagerBuilder { inbound_connection_limit: usize, inbound_rate_limit_config: Option, outbound_rate_limit_config: Option, + tcp_buffer_cfg: TCPBufferCfg, ) -> Self { // Setup channel to send requests to peer manager. let (pm_reqs_tx, pm_reqs_rx) = aptos_channel::new( @@ -223,6 +227,7 @@ impl PeerManagerBuilder { inbound_connection_limit, inbound_rate_limit_config, outbound_rate_limit_config, + tcp_buffer_cfg, )), peer_manager: None, listen_address, @@ -278,11 +283,15 @@ impl PeerManagerBuilder { ), }; + let mut aptos_tcp_transport = APTOS_TCP_TRANSPORT.clone(); + let tcp_cfg = self.get_tcp_buffers_cfg(); + aptos_tcp_transport.set_tcp_buffers(&tcp_cfg); + self.peer_manager = match self.listen_address.as_slice() { [Ip4(_), Tcp(_)] | [Ip6(_), Tcp(_)] => { Some(TransportPeerManager::Tcp(self.build_with_transport( AptosNetTransport::new( - APTOS_TCP_TRANSPORT.clone(), + aptos_tcp_transport, self.network_context, self.time_service.clone(), key, @@ -406,6 +415,13 @@ impl PeerManagerBuilder { .add_connection_event_listener() } + pub fn get_tcp_buffers_cfg(&self) -> TCPBufferCfg { + self.peer_manager_context + .as_ref() + .expect("Cannot add an event listener if PeerManager has already been built.") + .tcp_buffer_cfg + } + /// Register a peer-to-peer service (i.e., both client and service) for given /// protocols. pub fn add_p2p_service( diff --git a/network/src/transport/mod.rs b/network/src/transport/mod.rs index e06c9e06763be..bf4f2810efe61 100644 --- a/network/src/transport/mod.rs +++ b/network/src/transport/mod.rs @@ -33,7 +33,7 @@ use short_hex_str::AsShortHexStr; use std::{collections::BTreeMap, convert::TryFrom, fmt, io, pin::Pin, sync::Arc, time::Duration}; // Re-exposed for aptos-network-checker -pub use netcore::transport::tcp::{resolve_and_connect, TcpSocket}; +pub use netcore::transport::tcp::{resolve_and_connect, TCPBufferCfg, TcpSocket}; #[cfg(test)] mod test; @@ -54,6 +54,8 @@ pub const APTOS_TCP_TRANSPORT: tcp::TcpTransport = tcp::TcpTransport { ttl: None, // Use TCP_NODELAY for Aptos tcp connections. nodelay: Some(true), + // Use default TCP setting, overridden by Network config + tcp_buff_cfg: tcp::TCPBufferCfg::new(), }; /// A trait alias for "socket-like" things.