From be18154fd224a076bf7fffa2f6daeead9f1d6dc7 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Thu, 21 Sep 2023 16:18:15 -0700 Subject: [PATCH 1/5] [Networking] remove TCP window overrides and configs --- config/src/config/network_config.rs | 12 --- .../src/check_endpoint.rs | 6 +- network/builder/src/builder.rs | 10 --- network/framework/src/peer_manager/builder.rs | 20 +---- network/framework/src/transport/mod.rs | 4 +- network/netcore/src/transport/tcp.rs | 78 +------------------ 6 files changed, 10 insertions(+), 120 deletions(-) diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index 9bd212636f4b7..a8ce3fcffcf4a 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -53,10 +53,6 @@ pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */ pub const CONNECTION_BACKOFF_BASE: u64 = 2; pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */; pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE; -pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency -pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon -pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency -pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -89,10 +85,6 @@ pub struct NetworkConfig { pub network_id: NetworkId, /// Number of threads to run for networking pub runtime_threads: Option, - pub inbound_rx_buffer_size_bytes: Option, - pub inbound_tx_buffer_size_bytes: Option, - pub outbound_rx_buffer_size_bytes: Option, - pub outbound_tx_buffer_size_bytes: Option, /// Addresses of initial peers to connect to. In a mutual_authentication network, /// we will extract the public keys from these addresses to set our initial /// trusted peers set. TODO: Replace usage in configs with `seeds` this is for backwards compatibility @@ -157,10 +149,6 @@ impl NetworkConfig { inbound_rate_limit_config: None, outbound_rate_limit_config: None, max_message_size: MAX_MESSAGE_SIZE, - inbound_rx_buffer_size_bytes: Some(INBOUND_TCP_RX_BUFFER_SIZE), - inbound_tx_buffer_size_bytes: Some(INBOUND_TCP_TX_BUFFER_SIZE), - outbound_rx_buffer_size_bytes: Some(OUTBOUND_TCP_RX_BUFFER_SIZE), - outbound_tx_buffer_size_bytes: Some(OUTBOUND_TCP_TX_BUFFER_SIZE), max_parallel_deserialization_tasks: None, }; diff --git a/crates/aptos-network-checker/src/check_endpoint.rs b/crates/aptos-network-checker/src/check_endpoint.rs index a470ea3c06182..ad5bb0be9607f 100644 --- a/crates/aptos-network-checker/src/check_endpoint.rs +++ b/crates/aptos-network-checker/src/check_endpoint.rs @@ -12,7 +12,7 @@ use aptos_network::{ noise::{HandshakeAuthMode, NoiseUpgrader}, protocols::wire::handshake::v1::ProtocolIdSet, transport::{ - resolve_and_connect, upgrade_outbound, TCPBufferCfg, TcpSocket, UpgradeContext, + resolve_and_connect, upgrade_outbound, TcpSocket, UpgradeContext, SUPPORTED_MESSAGING_PROTOCOL, }, }; @@ -87,7 +87,7 @@ async fn check_endpoint_with_handshake( ) -> Result { // Connect to the address, this should handle DNS resolution if necessary. let fut_socket = async { - resolve_and_connect(address.clone(), TCPBufferCfg::new()) + resolve_and_connect(address.clone()) .await .map(TcpSocket::new) }; @@ -118,7 +118,7 @@ async fn check_endpoint_with_handshake( const INVALID_NOISE_HEADER: &[u8; 152] = &[7; 152]; async fn check_endpoint_no_handshake(address: NetworkAddress) -> Result { - let mut socket = resolve_and_connect(address.clone(), TCPBufferCfg::new()) + let mut socket = resolve_and_connect(address.clone()) .await .map(TcpSocket::new) .map_err(|error| { diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 09b76c724acd9..116d48d045f8c 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -24,7 +24,6 @@ use aptos_event_notifications::{ DbBackedOnChainConfig, EventSubscriptionService, ReconfigNotificationListener, }; use aptos_logger::prelude::*; -use aptos_netcore::transport::tcp::TCPBufferCfg; use aptos_network::{ application::storage::PeersAndMetadata, connectivity_manager::{builder::ConnectivityManagerBuilder, ConnectivityRequest}, @@ -89,7 +88,6 @@ impl NetworkBuilder { network_channel_size: usize, max_concurrent_network_reqs: usize, inbound_connection_limit: usize, - tcp_buffer_cfg: TCPBufferCfg, ) -> Self { // A network cannot exist without a PeerManager // TODO: construct this in create and pass it to new() as a parameter. The complication is manual construction of NetworkBuilder in various tests. @@ -106,7 +104,6 @@ impl NetworkBuilder { max_message_size, enable_proxy_protocol, inbound_connection_limit, - tcp_buffer_cfg, ); NetworkBuilder { @@ -146,7 +143,6 @@ impl NetworkBuilder { NETWORK_CHANNEL_SIZE, MAX_CONCURRENT_NETWORK_REQS, MAX_INBOUND_CONNECTIONS, - TCPBufferCfg::default(), ); builder.add_connectivity_manager( @@ -197,12 +193,6 @@ impl NetworkBuilder { config.network_channel_size, config.max_concurrent_network_reqs, config.max_inbound_connections, - TCPBufferCfg::new_configs( - config.inbound_rx_buffer_size_bytes, - config.inbound_tx_buffer_size_bytes, - config.outbound_rx_buffer_size_bytes, - config.outbound_tx_buffer_size_bytes, - ), ); network_builder.add_connection_monitoring( diff --git a/network/framework/src/peer_manager/builder.rs b/network/framework/src/peer_manager/builder.rs index c952e7a0cb75f..f1fd0e0492072 100644 --- a/network/framework/src/peer_manager/builder.rs +++ b/network/framework/src/peer_manager/builder.rs @@ -24,7 +24,7 @@ use aptos_logger::prelude::*; #[cfg(any(test, feature = "testing", feature = "fuzzing"))] use aptos_netcore::transport::memory::MemoryTransport; use aptos_netcore::transport::{ - tcp::{TCPBufferCfg, TcpSocket, TcpTransport}, + tcp::{TcpSocket, TcpTransport}, Transport, }; use aptos_time_service::TimeService; @@ -78,7 +78,6 @@ struct PeerManagerContext { max_frame_size: usize, max_message_size: usize, inbound_connection_limit: usize, - tcp_buffer_cfg: TCPBufferCfg, } impl PeerManagerContext { @@ -101,7 +100,6 @@ impl PeerManagerContext { max_frame_size: usize, max_message_size: usize, inbound_connection_limit: usize, - tcp_buffer_cfg: TCPBufferCfg, ) -> Self { Self { pm_reqs_tx, @@ -118,7 +116,6 @@ impl PeerManagerContext { max_frame_size, max_message_size, inbound_connection_limit, - tcp_buffer_cfg, } } @@ -176,7 +173,6 @@ impl PeerManagerBuilder { max_message_size: usize, enable_proxy_protocol: bool, inbound_connection_limit: usize, - tcp_buffer_cfg: TCPBufferCfg, ) -> Self { // Setup channel to send requests to peer manager. let (pm_reqs_tx, pm_reqs_rx) = aptos_channel::new( @@ -211,7 +207,6 @@ impl PeerManagerBuilder { max_frame_size, max_message_size, inbound_connection_limit, - tcp_buffer_cfg, )), peer_manager: None, listen_address, @@ -267,15 +262,11 @@ impl PeerManagerBuilder { ), }; - let mut aptos_tcp_transport = APTOS_TCP_TRANSPORT.clone(); - let tcp_cfg = self.get_tcp_buffers_cfg(); - aptos_tcp_transport.set_tcp_buffers(&tcp_cfg); - self.peer_manager = match self.listen_address.as_slice() { [Ip4(_), Tcp(_)] | [Ip6(_), Tcp(_)] => { Some(TransportPeerManager::Tcp(self.build_with_transport( AptosNetTransport::new( - aptos_tcp_transport, + APTOS_TCP_TRANSPORT.clone(), self.network_context, self.time_service.clone(), key, @@ -386,13 +377,6 @@ impl PeerManagerBuilder { .add_connection_event_listener() } - pub fn get_tcp_buffers_cfg(&self) -> TCPBufferCfg { - self.peer_manager_context - .as_ref() - .expect("Cannot add an event listener if PeerManager has already been built.") - .tcp_buffer_cfg - } - /// Register a client that's interested in some set of protocols and return /// the outbound channels into network. pub fn add_client( diff --git a/network/framework/src/transport/mod.rs b/network/framework/src/transport/mod.rs index f3b0543fd730a..11cb8552fd854 100644 --- a/network/framework/src/transport/mod.rs +++ b/network/framework/src/transport/mod.rs @@ -18,7 +18,7 @@ use aptos_crypto::x25519; use aptos_id_generator::{IdGenerator, U32IdGenerator}; use aptos_logger::prelude::*; // Re-exposed for aptos-network-checker -pub use aptos_netcore::transport::tcp::{resolve_and_connect, TCPBufferCfg, TcpSocket}; +pub use aptos_netcore::transport::tcp::{resolve_and_connect, TcpSocket}; use aptos_netcore::transport::{proxy_protocol, tcp, ConnectionOrigin, Transport}; use aptos_short_hex_str::AsShortHexStr; use aptos_time_service::{timeout, TimeService, TimeServiceTrait}; @@ -54,8 +54,6 @@ pub const APTOS_TCP_TRANSPORT: tcp::TcpTransport = tcp::TcpTransport { ttl: None, // Use TCP_NODELAY for Aptos tcp connections. nodelay: Some(true), - // Use default TCP setting, overridden by Network config - tcp_buff_cfg: tcp::TCPBufferCfg::new(), }; /// A trait alias for "socket-like" things. diff --git a/network/netcore/src/transport/tcp.rs b/network/netcore/src/transport/tcp.rs index 196e40340dbcb..90b5ed647f5bb 100644 --- a/network/netcore/src/transport/tcp.rs +++ b/network/netcore/src/transport/tcp.rs @@ -29,39 +29,6 @@ use tokio::{ use tokio_util::compat::Compat; use url::Url; -#[derive(Debug, Clone, Copy, Default)] -pub struct TCPBufferCfg { - inbound_rx_buffer_bytes: Option, - inbound_tx_buffer_bytes: Option, - outbound_rx_buffer_bytes: Option, - outbound_tx_buffer_bytes: Option, -} - -impl TCPBufferCfg { - pub const fn new() -> Self { - Self { - inbound_rx_buffer_bytes: None, - inbound_tx_buffer_bytes: None, - outbound_rx_buffer_bytes: None, - outbound_tx_buffer_bytes: None, - } - } - - pub fn new_configs( - inbound_rx: Option, - inbound_tx: Option, - outbound_rx: Option, - outbound_tx: Option, - ) -> Self { - Self { - inbound_rx_buffer_bytes: inbound_rx, - inbound_tx_buffer_bytes: inbound_tx, - outbound_rx_buffer_bytes: outbound_rx, - outbound_tx_buffer_bytes: outbound_tx, - } - } -} - /// Transport to build TCP connections #[derive(Debug, Clone, Default)] pub struct TcpTransport { @@ -69,8 +36,6 @@ pub struct TcpTransport { pub ttl: Option, /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default. pub nodelay: Option, - - pub tcp_buff_cfg: TCPBufferCfg, } impl TcpTransport { @@ -85,10 +50,6 @@ impl TcpTransport { Ok(()) } - - pub fn set_tcp_buffers(&mut self, configs: &TCPBufferCfg) { - self.tcp_buff_cfg = *configs; - } } impl Transport for TcpTransport { @@ -116,12 +77,6 @@ impl Transport for TcpTransport { tokio::net::TcpSocket::new_v6()? }; - if let Some(rx_buf) = self.tcp_buff_cfg.inbound_rx_buffer_bytes { - socket.set_recv_buffer_size(rx_buf)?; - } - if let Some(tx_buf) = self.tcp_buff_cfg.inbound_tx_buffer_bytes { - socket.set_send_buffer_size(tx_buf)?; - } socket.set_reuseaddr(true)?; socket.bind(addr)?; @@ -176,7 +131,7 @@ impl Transport for TcpTransport { let f: Pin> + Send + 'static>> = Box::pin(match proxy_addr { Some(proxy_addr) => Either::Left(connect_via_proxy(proxy_addr, addr)), - None => Either::Right(resolve_and_connect(addr, self.tcp_buff_cfg)), + None => Either::Right(resolve_and_connect(addr)), }); Ok(TcpOutbound { @@ -197,40 +152,15 @@ async fn resolve_with_filter( .filter(move |socketaddr| ip_filter.matches(socketaddr.ip()))) } -pub async fn connect_with_config( - port: u16, - ipaddr: std::net::IpAddr, - tcp_buff_cfg: TCPBufferCfg, -) -> io::Result { - let addr = SocketAddr::new(ipaddr, port); - - let socket = if addr.is_ipv4() { - tokio::net::TcpSocket::new_v4()? - } else { - tokio::net::TcpSocket::new_v6()? - }; - - if let Some(rx_buf) = tcp_buff_cfg.outbound_rx_buffer_bytes { - socket.set_recv_buffer_size(rx_buf)?; - } - if let Some(tx_buf) = tcp_buff_cfg.outbound_tx_buffer_bytes { - socket.set_send_buffer_size(tx_buf)?; - } - socket.connect(addr).await -} - /// Note: we need to take ownership of this `NetworkAddress` (instead of just /// borrowing the `&[Protocol]` slice) so this future can be `Send + 'static`. -pub async fn resolve_and_connect( - addr: NetworkAddress, - tcp_buff_cfg: TCPBufferCfg, -) -> io::Result { +pub async fn resolve_and_connect(addr: NetworkAddress) -> io::Result { let protos = addr.as_slice(); if let Some(((ipaddr, port), _addr_suffix)) = parse_ip_tcp(protos) { // this is an /ip4 or /ip6 address, so we can just connect without any // extra resolving or filtering. - connect_with_config(port, ipaddr, tcp_buff_cfg).await + TcpStream::connect((ipaddr, port)).await } else if let Some(((ip_filter, dns_name, port), _addr_suffix)) = parse_dns_tcp(protos) { // resolve dns name and filter let socketaddr_iter = resolve_with_filter(ip_filter, dns_name.as_ref(), port).await?; @@ -238,7 +168,7 @@ pub async fn resolve_and_connect( // try to connect until the first succeeds for socketaddr in socketaddr_iter { - match connect_with_config(socketaddr.port(), socketaddr.ip(), tcp_buff_cfg).await { + match TcpStream::connect((socketaddr.ip(), socketaddr.port())).await { Ok(stream) => return Ok(stream), Err(err) => last_err = Some(err), } From 367954abf1bbb6b2806bb9c1815ad70de1f645ea Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Thu, 21 Sep 2023 16:23:03 -0700 Subject: [PATCH 2/5] update tests to reflect improvement --- testsuite/forge-cli/src/main.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index 06c2278450aa4..1aa8483780985 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -1367,12 +1367,12 @@ fn netbench_config_100_megabytes_per_sec(netbench_config: &mut NetbenchConfig) { netbench_config.direct_send_per_second = 1000; } -fn netbench_config_2_megabytes_per_sec(netbench_config: &mut NetbenchConfig) { +fn netbench_config_3_megabytes_per_sec(netbench_config: &mut NetbenchConfig) { netbench_config.enabled = true; netbench_config.max_network_channel_size = 1000; netbench_config.enable_direct_send_testing = true; netbench_config.direct_send_data_size = 100000; - netbench_config.direct_send_per_second = 20; + netbench_config.direct_send_per_second = 30; } fn net_bench() -> ForgeConfig { @@ -1393,7 +1393,7 @@ fn net_bench_two_region_env() -> ForgeConfig { .with_validator_override_node_config_fn(Arc::new(|config, _| { // Not using 100 MBps here, as it will lead to throughput collapse let mut netbench_config = NetbenchConfig::default(); - netbench_config_2_megabytes_per_sec(&mut netbench_config); + netbench_config_3_megabytes_per_sec(&mut netbench_config); config.netbench = Some(netbench_config); })) } @@ -2062,13 +2062,7 @@ fn pfn_performance( add_network_emulation: bool, ) -> ForgeConfig { // Determine the minimum expected TPS - let min_expected_tps = if add_cpu_chaos { - 3000 - } else if add_network_emulation { - 4000 - } else { - 4500 - }; + let min_expected_tps = 4500; // Create the forge config ForgeConfig::default() From 94d937c47be017982a3187f647d362651feb36d5 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Thu, 21 Sep 2023 21:48:48 -0700 Subject: [PATCH 3/5] 4 MB/s --- testsuite/forge-cli/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index 1aa8483780985..9f8621d50b8f4 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -1367,12 +1367,12 @@ fn netbench_config_100_megabytes_per_sec(netbench_config: &mut NetbenchConfig) { netbench_config.direct_send_per_second = 1000; } -fn netbench_config_3_megabytes_per_sec(netbench_config: &mut NetbenchConfig) { +fn netbench_config_4_megabytes_per_sec(netbench_config: &mut NetbenchConfig) { netbench_config.enabled = true; netbench_config.max_network_channel_size = 1000; netbench_config.enable_direct_send_testing = true; netbench_config.direct_send_data_size = 100000; - netbench_config.direct_send_per_second = 30; + netbench_config.direct_send_per_second = 40; } fn net_bench() -> ForgeConfig { @@ -1393,7 +1393,7 @@ fn net_bench_two_region_env() -> ForgeConfig { .with_validator_override_node_config_fn(Arc::new(|config, _| { // Not using 100 MBps here, as it will lead to throughput collapse let mut netbench_config = NetbenchConfig::default(); - netbench_config_3_megabytes_per_sec(&mut netbench_config); + netbench_config_4_megabytes_per_sec(&mut netbench_config); config.netbench = Some(netbench_config); })) } From 08d8263d943ef0aafe30a19560d7394340e40c89 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 22 Sep 2023 08:48:38 -0700 Subject: [PATCH 4/5] Revert "[Networking] remove TCP window overrides and configs" This reverts commit be18154fd224a076bf7fffa2f6daeead9f1d6dc7. --- config/src/config/network_config.rs | 12 +++ .../src/check_endpoint.rs | 6 +- network/builder/src/builder.rs | 10 +++ network/framework/src/peer_manager/builder.rs | 20 ++++- network/framework/src/transport/mod.rs | 4 +- network/netcore/src/transport/tcp.rs | 78 ++++++++++++++++++- 6 files changed, 120 insertions(+), 10 deletions(-) diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index a8ce3fcffcf4a..9bd212636f4b7 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -53,6 +53,10 @@ pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */ pub const CONNECTION_BACKOFF_BASE: u64 = 2; pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */; pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE; +pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency +pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon +pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency +pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -85,6 +89,10 @@ pub struct NetworkConfig { pub network_id: NetworkId, /// Number of threads to run for networking pub runtime_threads: Option, + pub inbound_rx_buffer_size_bytes: Option, + pub inbound_tx_buffer_size_bytes: Option, + pub outbound_rx_buffer_size_bytes: Option, + pub outbound_tx_buffer_size_bytes: Option, /// Addresses of initial peers to connect to. In a mutual_authentication network, /// we will extract the public keys from these addresses to set our initial /// trusted peers set. TODO: Replace usage in configs with `seeds` this is for backwards compatibility @@ -149,6 +157,10 @@ impl NetworkConfig { inbound_rate_limit_config: None, outbound_rate_limit_config: None, max_message_size: MAX_MESSAGE_SIZE, + inbound_rx_buffer_size_bytes: Some(INBOUND_TCP_RX_BUFFER_SIZE), + inbound_tx_buffer_size_bytes: Some(INBOUND_TCP_TX_BUFFER_SIZE), + outbound_rx_buffer_size_bytes: Some(OUTBOUND_TCP_RX_BUFFER_SIZE), + outbound_tx_buffer_size_bytes: Some(OUTBOUND_TCP_TX_BUFFER_SIZE), max_parallel_deserialization_tasks: None, }; diff --git a/crates/aptos-network-checker/src/check_endpoint.rs b/crates/aptos-network-checker/src/check_endpoint.rs index ad5bb0be9607f..a470ea3c06182 100644 --- a/crates/aptos-network-checker/src/check_endpoint.rs +++ b/crates/aptos-network-checker/src/check_endpoint.rs @@ -12,7 +12,7 @@ use aptos_network::{ noise::{HandshakeAuthMode, NoiseUpgrader}, protocols::wire::handshake::v1::ProtocolIdSet, transport::{ - resolve_and_connect, upgrade_outbound, TcpSocket, UpgradeContext, + resolve_and_connect, upgrade_outbound, TCPBufferCfg, TcpSocket, UpgradeContext, SUPPORTED_MESSAGING_PROTOCOL, }, }; @@ -87,7 +87,7 @@ async fn check_endpoint_with_handshake( ) -> Result { // Connect to the address, this should handle DNS resolution if necessary. let fut_socket = async { - resolve_and_connect(address.clone()) + resolve_and_connect(address.clone(), TCPBufferCfg::new()) .await .map(TcpSocket::new) }; @@ -118,7 +118,7 @@ async fn check_endpoint_with_handshake( const INVALID_NOISE_HEADER: &[u8; 152] = &[7; 152]; async fn check_endpoint_no_handshake(address: NetworkAddress) -> Result { - let mut socket = resolve_and_connect(address.clone()) + let mut socket = resolve_and_connect(address.clone(), TCPBufferCfg::new()) .await .map(TcpSocket::new) .map_err(|error| { diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 116d48d045f8c..09b76c724acd9 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -24,6 +24,7 @@ use aptos_event_notifications::{ DbBackedOnChainConfig, EventSubscriptionService, ReconfigNotificationListener, }; use aptos_logger::prelude::*; +use aptos_netcore::transport::tcp::TCPBufferCfg; use aptos_network::{ application::storage::PeersAndMetadata, connectivity_manager::{builder::ConnectivityManagerBuilder, ConnectivityRequest}, @@ -88,6 +89,7 @@ impl NetworkBuilder { network_channel_size: usize, max_concurrent_network_reqs: usize, inbound_connection_limit: usize, + tcp_buffer_cfg: TCPBufferCfg, ) -> Self { // A network cannot exist without a PeerManager // TODO: construct this in create and pass it to new() as a parameter. The complication is manual construction of NetworkBuilder in various tests. @@ -104,6 +106,7 @@ impl NetworkBuilder { max_message_size, enable_proxy_protocol, inbound_connection_limit, + tcp_buffer_cfg, ); NetworkBuilder { @@ -143,6 +146,7 @@ impl NetworkBuilder { NETWORK_CHANNEL_SIZE, MAX_CONCURRENT_NETWORK_REQS, MAX_INBOUND_CONNECTIONS, + TCPBufferCfg::default(), ); builder.add_connectivity_manager( @@ -193,6 +197,12 @@ impl NetworkBuilder { config.network_channel_size, config.max_concurrent_network_reqs, config.max_inbound_connections, + TCPBufferCfg::new_configs( + config.inbound_rx_buffer_size_bytes, + config.inbound_tx_buffer_size_bytes, + config.outbound_rx_buffer_size_bytes, + config.outbound_tx_buffer_size_bytes, + ), ); network_builder.add_connection_monitoring( diff --git a/network/framework/src/peer_manager/builder.rs b/network/framework/src/peer_manager/builder.rs index f1fd0e0492072..c952e7a0cb75f 100644 --- a/network/framework/src/peer_manager/builder.rs +++ b/network/framework/src/peer_manager/builder.rs @@ -24,7 +24,7 @@ use aptos_logger::prelude::*; #[cfg(any(test, feature = "testing", feature = "fuzzing"))] use aptos_netcore::transport::memory::MemoryTransport; use aptos_netcore::transport::{ - tcp::{TcpSocket, TcpTransport}, + tcp::{TCPBufferCfg, TcpSocket, TcpTransport}, Transport, }; use aptos_time_service::TimeService; @@ -78,6 +78,7 @@ struct PeerManagerContext { max_frame_size: usize, max_message_size: usize, inbound_connection_limit: usize, + tcp_buffer_cfg: TCPBufferCfg, } impl PeerManagerContext { @@ -100,6 +101,7 @@ impl PeerManagerContext { max_frame_size: usize, max_message_size: usize, inbound_connection_limit: usize, + tcp_buffer_cfg: TCPBufferCfg, ) -> Self { Self { pm_reqs_tx, @@ -116,6 +118,7 @@ impl PeerManagerContext { max_frame_size, max_message_size, inbound_connection_limit, + tcp_buffer_cfg, } } @@ -173,6 +176,7 @@ impl PeerManagerBuilder { max_message_size: usize, enable_proxy_protocol: bool, inbound_connection_limit: usize, + tcp_buffer_cfg: TCPBufferCfg, ) -> Self { // Setup channel to send requests to peer manager. let (pm_reqs_tx, pm_reqs_rx) = aptos_channel::new( @@ -207,6 +211,7 @@ impl PeerManagerBuilder { max_frame_size, max_message_size, inbound_connection_limit, + tcp_buffer_cfg, )), peer_manager: None, listen_address, @@ -262,11 +267,15 @@ impl PeerManagerBuilder { ), }; + let mut aptos_tcp_transport = APTOS_TCP_TRANSPORT.clone(); + let tcp_cfg = self.get_tcp_buffers_cfg(); + aptos_tcp_transport.set_tcp_buffers(&tcp_cfg); + self.peer_manager = match self.listen_address.as_slice() { [Ip4(_), Tcp(_)] | [Ip6(_), Tcp(_)] => { Some(TransportPeerManager::Tcp(self.build_with_transport( AptosNetTransport::new( - APTOS_TCP_TRANSPORT.clone(), + aptos_tcp_transport, self.network_context, self.time_service.clone(), key, @@ -377,6 +386,13 @@ impl PeerManagerBuilder { .add_connection_event_listener() } + pub fn get_tcp_buffers_cfg(&self) -> TCPBufferCfg { + self.peer_manager_context + .as_ref() + .expect("Cannot add an event listener if PeerManager has already been built.") + .tcp_buffer_cfg + } + /// Register a client that's interested in some set of protocols and return /// the outbound channels into network. pub fn add_client( diff --git a/network/framework/src/transport/mod.rs b/network/framework/src/transport/mod.rs index 11cb8552fd854..f3b0543fd730a 100644 --- a/network/framework/src/transport/mod.rs +++ b/network/framework/src/transport/mod.rs @@ -18,7 +18,7 @@ use aptos_crypto::x25519; use aptos_id_generator::{IdGenerator, U32IdGenerator}; use aptos_logger::prelude::*; // Re-exposed for aptos-network-checker -pub use aptos_netcore::transport::tcp::{resolve_and_connect, TcpSocket}; +pub use aptos_netcore::transport::tcp::{resolve_and_connect, TCPBufferCfg, TcpSocket}; use aptos_netcore::transport::{proxy_protocol, tcp, ConnectionOrigin, Transport}; use aptos_short_hex_str::AsShortHexStr; use aptos_time_service::{timeout, TimeService, TimeServiceTrait}; @@ -54,6 +54,8 @@ pub const APTOS_TCP_TRANSPORT: tcp::TcpTransport = tcp::TcpTransport { ttl: None, // Use TCP_NODELAY for Aptos tcp connections. nodelay: Some(true), + // Use default TCP setting, overridden by Network config + tcp_buff_cfg: tcp::TCPBufferCfg::new(), }; /// A trait alias for "socket-like" things. diff --git a/network/netcore/src/transport/tcp.rs b/network/netcore/src/transport/tcp.rs index 90b5ed647f5bb..196e40340dbcb 100644 --- a/network/netcore/src/transport/tcp.rs +++ b/network/netcore/src/transport/tcp.rs @@ -29,6 +29,39 @@ use tokio::{ use tokio_util::compat::Compat; use url::Url; +#[derive(Debug, Clone, Copy, Default)] +pub struct TCPBufferCfg { + inbound_rx_buffer_bytes: Option, + inbound_tx_buffer_bytes: Option, + outbound_rx_buffer_bytes: Option, + outbound_tx_buffer_bytes: Option, +} + +impl TCPBufferCfg { + pub const fn new() -> Self { + Self { + inbound_rx_buffer_bytes: None, + inbound_tx_buffer_bytes: None, + outbound_rx_buffer_bytes: None, + outbound_tx_buffer_bytes: None, + } + } + + pub fn new_configs( + inbound_rx: Option, + inbound_tx: Option, + outbound_rx: Option, + outbound_tx: Option, + ) -> Self { + Self { + inbound_rx_buffer_bytes: inbound_rx, + inbound_tx_buffer_bytes: inbound_tx, + outbound_rx_buffer_bytes: outbound_rx, + outbound_tx_buffer_bytes: outbound_tx, + } + } +} + /// Transport to build TCP connections #[derive(Debug, Clone, Default)] pub struct TcpTransport { @@ -36,6 +69,8 @@ pub struct TcpTransport { pub ttl: Option, /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default. pub nodelay: Option, + + pub tcp_buff_cfg: TCPBufferCfg, } impl TcpTransport { @@ -50,6 +85,10 @@ impl TcpTransport { Ok(()) } + + pub fn set_tcp_buffers(&mut self, configs: &TCPBufferCfg) { + self.tcp_buff_cfg = *configs; + } } impl Transport for TcpTransport { @@ -77,6 +116,12 @@ impl Transport for TcpTransport { tokio::net::TcpSocket::new_v6()? }; + if let Some(rx_buf) = self.tcp_buff_cfg.inbound_rx_buffer_bytes { + socket.set_recv_buffer_size(rx_buf)?; + } + if let Some(tx_buf) = self.tcp_buff_cfg.inbound_tx_buffer_bytes { + socket.set_send_buffer_size(tx_buf)?; + } socket.set_reuseaddr(true)?; socket.bind(addr)?; @@ -131,7 +176,7 @@ impl Transport for TcpTransport { let f: Pin> + Send + 'static>> = Box::pin(match proxy_addr { Some(proxy_addr) => Either::Left(connect_via_proxy(proxy_addr, addr)), - None => Either::Right(resolve_and_connect(addr)), + None => Either::Right(resolve_and_connect(addr, self.tcp_buff_cfg)), }); Ok(TcpOutbound { @@ -152,15 +197,40 @@ async fn resolve_with_filter( .filter(move |socketaddr| ip_filter.matches(socketaddr.ip()))) } +pub async fn connect_with_config( + port: u16, + ipaddr: std::net::IpAddr, + tcp_buff_cfg: TCPBufferCfg, +) -> io::Result { + let addr = SocketAddr::new(ipaddr, port); + + let socket = if addr.is_ipv4() { + tokio::net::TcpSocket::new_v4()? + } else { + tokio::net::TcpSocket::new_v6()? + }; + + if let Some(rx_buf) = tcp_buff_cfg.outbound_rx_buffer_bytes { + socket.set_recv_buffer_size(rx_buf)?; + } + if let Some(tx_buf) = tcp_buff_cfg.outbound_tx_buffer_bytes { + socket.set_send_buffer_size(tx_buf)?; + } + socket.connect(addr).await +} + /// Note: we need to take ownership of this `NetworkAddress` (instead of just /// borrowing the `&[Protocol]` slice) so this future can be `Send + 'static`. -pub async fn resolve_and_connect(addr: NetworkAddress) -> io::Result { +pub async fn resolve_and_connect( + addr: NetworkAddress, + tcp_buff_cfg: TCPBufferCfg, +) -> io::Result { let protos = addr.as_slice(); if let Some(((ipaddr, port), _addr_suffix)) = parse_ip_tcp(protos) { // this is an /ip4 or /ip6 address, so we can just connect without any // extra resolving or filtering. - TcpStream::connect((ipaddr, port)).await + connect_with_config(port, ipaddr, tcp_buff_cfg).await } else if let Some(((ip_filter, dns_name, port), _addr_suffix)) = parse_dns_tcp(protos) { // resolve dns name and filter let socketaddr_iter = resolve_with_filter(ip_filter, dns_name.as_ref(), port).await?; @@ -168,7 +238,7 @@ pub async fn resolve_and_connect(addr: NetworkAddress) -> io::Result // try to connect until the first succeeds for socketaddr in socketaddr_iter { - match TcpStream::connect((socketaddr.ip(), socketaddr.port())).await { + match connect_with_config(socketaddr.port(), socketaddr.ip(), tcp_buff_cfg).await { Ok(stream) => return Ok(stream), Err(err) => last_err = Some(err), } From 8ed6877357f5d59cf33eddaf46d1e9ad2cd1dcad Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 22 Sep 2023 08:58:15 -0700 Subject: [PATCH 5/5] Set the default values to None and comment to warn --- config/src/config/network_config.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index 9bd212636f4b7..4fb512e80f3e5 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -53,10 +53,6 @@ pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */ pub const CONNECTION_BACKOFF_BASE: u64 = 2; pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */; pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE; -pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency -pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon -pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency -pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -89,6 +85,14 @@ pub struct NetworkConfig { pub network_id: NetworkId, /// Number of threads to run for networking pub runtime_threads: Option, + /// Overrides for the size of the inbound and outbound buffers for each peer. + /// NOTE: The defaults are None, so socket options are not called. Change to Some values with + /// caution. Experiments have shown that relying on Linux's default tcp auto-tuning can perform + /// better than setting these. In particular, for larger values to take effect, the + /// `net.core.rmem_max` and `net.core.wmem_max` sysctl values may need to be increased. On a + /// vanilla GCP machine, these are set to 212992. Without increasing the sysctl values and + /// setting a value will constrain the buffer size to the sysctl value. (In contrast, default + /// auto-tuning can increase beyond these values.) pub inbound_rx_buffer_size_bytes: Option, pub inbound_tx_buffer_size_bytes: Option, pub outbound_rx_buffer_size_bytes: Option, @@ -157,10 +161,10 @@ impl NetworkConfig { inbound_rate_limit_config: None, outbound_rate_limit_config: None, max_message_size: MAX_MESSAGE_SIZE, - inbound_rx_buffer_size_bytes: Some(INBOUND_TCP_RX_BUFFER_SIZE), - inbound_tx_buffer_size_bytes: Some(INBOUND_TCP_TX_BUFFER_SIZE), - outbound_rx_buffer_size_bytes: Some(OUTBOUND_TCP_RX_BUFFER_SIZE), - outbound_tx_buffer_size_bytes: Some(OUTBOUND_TCP_TX_BUFFER_SIZE), + inbound_rx_buffer_size_bytes: None, + inbound_tx_buffer_size_bytes: None, + outbound_rx_buffer_size_bytes: None, + outbound_tx_buffer_size_bytes: None, max_parallel_deserialization_tasks: None, };