Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding configurabele TCP buffers and minor hafixes #4649

Merged
merged 5 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config/src/config/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */
pub const CONNECTION_BACKOFF_BASE: u64 = 2;
pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */;
pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE;
pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency
pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a technical term, or just informal wording? use a bigger spoon 😄 Not sure how to respond based on the comment lol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Informal but descriptive.

pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency
pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
Expand Down Expand Up @@ -79,6 +83,10 @@ pub struct NetworkConfig {
pub mutual_authentication: bool,
pub network_id: NetworkId,
pub runtime_threads: Option<usize>,
pub inbound_rx_buffer_size_bytes: Option<u32>,
pub inbound_tx_buffer_size_bytes: Option<u32>,
pub outbound_rx_buffer_size_bytes: Option<u32>,
pub outbound_tx_buffer_size_bytes: Option<u32>,
Comment on lines +86 to +89
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I heard Options are not friendly to terraform templating, etc, shall we just default to the default numbers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it should be fine, so long as this lands: #4847

// Addresses of initial peers to connect to. In a mutual_authentication network,
// we will extract the public keys from these addresses to set our initial
// trusted peers set. TODO: Replace usage in configs with `seeds` this is for backwards compatibility
Expand Down Expand Up @@ -141,6 +149,10 @@ impl NetworkConfig {
inbound_rate_limit_config: None,
outbound_rate_limit_config: None,
max_message_size: MAX_MESSAGE_SIZE,
inbound_rx_buffer_size_bytes: Some(INBOUND_TCP_RX_BUFFER_SIZE),
inbound_tx_buffer_size_bytes: Some(INBOUND_TCP_TX_BUFFER_SIZE),
outbound_rx_buffer_size_bytes: Some(OUTBOUND_TCP_RX_BUFFER_SIZE),
outbound_tx_buffer_size_bytes: Some(OUTBOUND_TCP_TX_BUFFER_SIZE),
};
config.prepare_identity();
config
Expand Down
8 changes: 4 additions & 4 deletions crates/aptos-network-checker/src/check_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::args::CheckEndpointArgs;
use anyhow::{bail, Context, Result};
use aptos_config::{
config::{RoleType, HANDSHAKE_VERSION},
Expand All @@ -9,6 +10,7 @@ use aptos_config::{
use aptos_crypto::x25519::{self, PRIVATE_KEY_SIZE};
use aptos_types::{account_address, chain_id::ChainId, network_address::NetworkAddress, PeerId};
use futures::{AsyncReadExt, AsyncWriteExt};
use network::transport::TCPBufferCfg;
use network::{
noise::{HandshakeAuthMode, NoiseUpgrader},
protocols::wire::handshake::v1::ProtocolIdSet,
Expand All @@ -18,8 +20,6 @@ use network::{
use std::{collections::BTreeMap, sync::Arc};
use tokio::time::Duration;

use crate::args::CheckEndpointArgs;

// This function must take the private key in as an owned value vs as part of
// the args struct because private key needs to be owned, and cannot be cloned.
pub async fn check_endpoint(
Expand Down Expand Up @@ -86,7 +86,7 @@ async fn check_endpoint_with_handshake(
) -> Result<String> {
// Connect to the address, this should handle DNS resolution if necessary.
let fut_socket = async {
resolve_and_connect(address.clone())
resolve_and_connect(address.clone(), TCPBufferCfg::new())
.await
.map(TcpSocket::new)
};
Expand All @@ -112,7 +112,7 @@ async fn check_endpoint_with_handshake(
const INVALID_NOISE_HEADER: &[u8; 152] = &[7; 152];

async fn check_endpoint_no_handshake(address: NetworkAddress) -> Result<String> {
let mut socket = resolve_and_connect(address.clone())
let mut socket = resolve_and_connect(address.clone(), TCPBufferCfg::new())
.await
.map(TcpSocket::new)
.with_context(|| format!("Failed to connect to {}", address))?;
Expand Down
11 changes: 11 additions & 0 deletions network/builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use network::{
network::{AppConfig, NewNetworkEvents, NewNetworkSender},
},
};

use netcore::transport::tcp::TCPBufferCfg;
use network_discovery::DiscoveryChangeListener;
use std::{
clone::Clone,
Expand Down Expand Up @@ -90,6 +92,7 @@ impl NetworkBuilder {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
) -> Self {
// A network cannot exist without a PeerManager
// TODO: construct this in create and pass it to new() as a parameter. The complication is manual construction of NetworkBuilder in various tests.
Expand All @@ -109,6 +112,7 @@ impl NetworkBuilder {
inbound_connection_limit,
inbound_rate_limit_config,
outbound_rate_limit_config,
tcp_buffer_cfg,
);

NetworkBuilder {
Expand Down Expand Up @@ -152,6 +156,7 @@ impl NetworkBuilder {
MAX_INBOUND_CONNECTIONS,
None,
None,
TCPBufferCfg::default(),
);

builder.add_connectivity_manager(
Expand Down Expand Up @@ -207,6 +212,12 @@ impl NetworkBuilder {
config.max_inbound_connections,
config.inbound_rate_limit_config,
config.outbound_rate_limit_config,
TCPBufferCfg::new_configs(
config.inbound_rx_buffer_size_bytes,
config.inbound_tx_buffer_size_bytes,
config.outbound_rx_buffer_size_bytes,
config.outbound_tx_buffer_size_bytes,
),
);

network_builder.add_connection_monitoring(
Expand Down
93 changes: 84 additions & 9 deletions network/netcore/src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use futures::{
};
use proxy::Proxy;
use std::{
convert::TryFrom,
fmt::Debug,
io,
net::SocketAddr,
Expand All @@ -29,13 +28,47 @@ use tokio::{
use tokio_util::compat::Compat;
use url::Url;

#[derive(Debug, Clone, Copy, Default)]
pub struct TCPBufferCfg {
inbound_rx_buffer_bytes: Option<u32>,
inbound_tx_buffer_bytes: Option<u32>,
outbound_rx_buffer_bytes: Option<u32>,
outbound_tx_buffer_bytes: Option<u32>,
}

impl TCPBufferCfg {
pub const fn new() -> Self {
Self {
inbound_rx_buffer_bytes: None,
inbound_tx_buffer_bytes: None,
outbound_rx_buffer_bytes: None,
outbound_tx_buffer_bytes: None,
}
}
pub fn new_configs(
inbound_rx: Option<u32>,
inbound_tx: Option<u32>,
outbound_rx: Option<u32>,
outbound_tx: Option<u32>,
) -> Self {
Self {
inbound_rx_buffer_bytes: inbound_rx,
inbound_tx_buffer_bytes: inbound_tx,
outbound_rx_buffer_bytes: outbound_rx,
outbound_tx_buffer_bytes: outbound_tx,
}
}
}

/// Transport to build TCP connections
#[derive(Debug, Clone, Default)]
pub struct TcpTransport {
/// TTL to set for opened sockets, or `None` to keep default.
pub ttl: Option<u32>,
/// `TCP_NODELAY` to set for opened sockets, or `None` to keep default.
pub nodelay: Option<bool>,

pub tcp_buff_cfg: TCPBufferCfg,
}

impl TcpTransport {
Expand All @@ -50,6 +83,10 @@ impl TcpTransport {

Ok(())
}

pub fn set_tcp_buffers(&mut self, configs: &TCPBufferCfg) {
self.tcp_buff_cfg = *configs;
}
}

impl Transport for TcpTransport {
Expand All @@ -69,9 +106,23 @@ impl Transport for TcpTransport {
return Err(invalid_addr_error(&addr));
}

let listener = ::std::net::TcpListener::bind((ipaddr, port))?;
listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(listener)?;
let addr = SocketAddr::new(ipaddr, port);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we change this to not set_nonblocking?

let socket = if ipaddr.is_ipv4() {
tokio::net::TcpSocket::new_v4()?
} else {
tokio::net::TcpSocket::new_v6()?
};

if let Some(rx_buf) = self.tcp_buff_cfg.inbound_rx_buffer_bytes {
socket.set_recv_buffer_size(rx_buf)?;
}
if let Some(tx_buf) = self.tcp_buff_cfg.inbound_tx_buffer_bytes {
socket.set_send_buffer_size(tx_buf)?;
}
socket.bind(addr)?;

let listener = socket.listen(256)?;
let listen_addr = NetworkAddress::from(listener.local_addr()?);

Ok((
Expand All @@ -88,7 +139,6 @@ impl Transport for TcpTransport {

// ensure addr is well formed to save some work before potentially
// spawning a dial task that will fail anyway.
// TODO(philiphayes): base tcp transport should not allow trailing protocols
parse_ip_tcp(protos)
.map(|_| ())
.or_else(|| parse_dns_tcp(protos).map(|_| ()))
Expand Down Expand Up @@ -123,7 +173,7 @@ impl Transport for TcpTransport {
let f: Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send + 'static>> =
Box::pin(match proxy_addr {
Some(proxy_addr) => Either::Left(connect_via_proxy(proxy_addr, addr)),
None => Either::Right(resolve_and_connect(addr)),
None => Either::Right(resolve_and_connect(addr, self.tcp_buff_cfg)),
});

Ok(TcpOutbound {
Expand All @@ -144,23 +194,48 @@ async fn resolve_with_filter(
.filter(move |socketaddr| ip_filter.matches(socketaddr.ip())))
}

pub async fn connect_with_config(
port: u16,
ipaddr: std::net::IpAddr,
tcp_buff_cfg: TCPBufferCfg,
) -> io::Result<TcpStream> {
let addr = SocketAddr::new(ipaddr, port);

let socket = if addr.is_ipv4() {
tokio::net::TcpSocket::new_v4()?
} else {
tokio::net::TcpSocket::new_v6()?
};

if let Some(rx_buf) = tcp_buff_cfg.outbound_rx_buffer_bytes {
socket.set_recv_buffer_size(rx_buf)?;
}
if let Some(tx_buf) = tcp_buff_cfg.outbound_tx_buffer_bytes {
socket.set_send_buffer_size(tx_buf)?;
}
socket.connect(addr).await
}

/// Note: we need to take ownership of this `NetworkAddress` (instead of just
/// borrowing the `&[Protocol]` slice) so this future can be `Send + 'static`.
pub async fn resolve_and_connect(addr: NetworkAddress) -> io::Result<TcpStream> {
pub async fn resolve_and_connect(
addr: NetworkAddress,
tcp_buff_cfg: TCPBufferCfg,
) -> io::Result<TcpStream> {
let protos = addr.as_slice();

if let Some(((ipaddr, port), _addr_suffix)) = parse_ip_tcp(protos) {
// this is an /ip4 or /ip6 address, so we can just connect without any
// extra resolving or filtering.
TcpStream::connect((ipaddr, port)).await
connect_with_config(port, ipaddr, tcp_buff_cfg).await
} else if let Some(((ip_filter, dns_name, port), _addr_suffix)) = parse_dns_tcp(protos) {
// resolve dns name and filter
let socketaddr_iter = resolve_with_filter(ip_filter, dns_name.as_ref(), port).await?;
let mut last_err = None;

// try to connect until the first succeeds
for socketaddr in socketaddr_iter {
match TcpStream::connect(socketaddr).await {
match connect_with_config(socketaddr.port(), socketaddr.ip(), tcp_buff_cfg).await {
Ok(stream) => return Ok(stream),
Err(err) => last_err = Some(err),
}
Expand Down
20 changes: 18 additions & 2 deletions network/src/peer_manager/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use channel::{self, aptos_channel, message_queues::QueueStyle};
#[cfg(any(test, feature = "testing", feature = "fuzzing"))]
use netcore::transport::memory::MemoryTransport;
use netcore::transport::{
tcp::{TcpSocket, TcpTransport},
tcp::{TCPBufferCfg, TcpSocket, TcpTransport},
Transport,
};
use std::{clone::Clone, collections::HashMap, fmt::Debug, net::IpAddr, sync::Arc};
Expand Down Expand Up @@ -82,6 +82,7 @@ struct PeerManagerContext {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
}

impl PeerManagerContext {
Expand All @@ -107,6 +108,7 @@ impl PeerManagerContext {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
) -> Self {
Self {
pm_reqs_tx,
Expand All @@ -126,6 +128,7 @@ impl PeerManagerContext {
inbound_connection_limit,
inbound_rate_limit_config,
outbound_rate_limit_config,
tcp_buffer_cfg,
}
}

Expand Down Expand Up @@ -186,6 +189,7 @@ impl PeerManagerBuilder {
inbound_connection_limit: usize,
inbound_rate_limit_config: Option<RateLimitConfig>,
outbound_rate_limit_config: Option<RateLimitConfig>,
tcp_buffer_cfg: TCPBufferCfg,
) -> Self {
// Setup channel to send requests to peer manager.
let (pm_reqs_tx, pm_reqs_rx) = aptos_channel::new(
Expand Down Expand Up @@ -223,6 +227,7 @@ impl PeerManagerBuilder {
inbound_connection_limit,
inbound_rate_limit_config,
outbound_rate_limit_config,
tcp_buffer_cfg,
)),
peer_manager: None,
listen_address,
Expand Down Expand Up @@ -278,11 +283,15 @@ impl PeerManagerBuilder {
),
};

let mut aptos_tcp_transport = APTOS_TCP_TRANSPORT.clone();
let tcp_cfg = self.get_tcp_buffers_cfg();
aptos_tcp_transport.set_tcp_buffers(&tcp_cfg);

self.peer_manager = match self.listen_address.as_slice() {
[Ip4(_), Tcp(_)] | [Ip6(_), Tcp(_)] => {
Some(TransportPeerManager::Tcp(self.build_with_transport(
AptosNetTransport::new(
APTOS_TCP_TRANSPORT.clone(),
aptos_tcp_transport,
self.network_context,
self.time_service.clone(),
key,
Expand Down Expand Up @@ -406,6 +415,13 @@ impl PeerManagerBuilder {
.add_connection_event_listener()
}

pub fn get_tcp_buffers_cfg(&self) -> TCPBufferCfg {
self.peer_manager_context
.as_ref()
.expect("Cannot add an event listener if PeerManager has already been built.")
.tcp_buffer_cfg
}

/// Register a peer-to-peer service (i.e., both client and service) for given
/// protocols.
pub fn add_p2p_service(
Expand Down
4 changes: 3 additions & 1 deletion network/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use short_hex_str::AsShortHexStr;
use std::{collections::BTreeMap, convert::TryFrom, fmt, io, pin::Pin, sync::Arc, time::Duration};

// Re-exposed for aptos-network-checker
pub use netcore::transport::tcp::{resolve_and_connect, TcpSocket};
pub use netcore::transport::tcp::{resolve_and_connect, TCPBufferCfg, TcpSocket};

#[cfg(test)]
mod test;
Expand All @@ -54,6 +54,8 @@ pub const APTOS_TCP_TRANSPORT: tcp::TcpTransport = tcp::TcpTransport {
ttl: None,
// Use TCP_NODELAY for Aptos tcp connections.
nodelay: Some(true),
// Use default TCP setting, overridden by Network config
tcp_buff_cfg: tcp::TCPBufferCfg::new(),
};

/// A trait alias for "socket-like" things.
Expand Down