From dada8d98f0e38c778f718514261f04e344620faa Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Fri, 20 Aug 2021 14:42:18 +0200 Subject: [PATCH] Improve comms limits to combat network timeouts - Improved various comms configuration settings to combat RPC connection and response timeouts - Improved wire mode logging --- .../tests/output_manager_service/service.rs | 8 +++- comms/src/connection_manager/dialer.rs | 2 +- comms/src/connection_manager/listener.rs | 40 +++++++++++++++++-- comms/src/connection_manager/manager.rs | 8 ++-- comms/src/protocol/rpc/client.rs | 6 +-- 5 files changed, 51 insertions(+), 13 deletions(-) diff --git a/base_layer/wallet/tests/output_manager_service/service.rs b/base_layer/wallet/tests/output_manager_service/service.rs index 415b5d8261..c6c23da53e 100644 --- a/base_layer/wallet/tests/output_manager_service/service.rs +++ b/base_layer/wallet/tests/output_manager_service/service.rs @@ -84,6 +84,7 @@ use tari_wallet::{ types::ValidationRetryStrategy, }; +use tari_comms::protocol::rpc::RpcClientConfig; use tari_wallet::output_manager_service::storage::models::OutputStatus; use tokio::{ runtime::Runtime, @@ -1816,7 +1817,12 @@ fn test_txo_validation_rpc_timeout() { .unwrap(); runtime.block_on(async { - let mut delay = delay_for(Duration::from_secs(100)).fuse(); + let mut delay = delay_for( + RpcClientConfig::default().deadline.unwrap() + + RpcClientConfig::default().deadline_grace_period + + Duration::from_secs(30), + ) + .fuse(); let mut failed = 0; loop { futures::select! { diff --git a/comms/src/connection_manager/dialer.rs b/comms/src/connection_manager/dialer.rs index 26371c009b..b8fffb3029 100644 --- a/comms/src/connection_manager/dialer.rs +++ b/comms/src/connection_manager/dialer.rs @@ -514,7 +514,7 @@ where .map_err(|_| ConnectionManagerError::WireFormatSendFailed)?; let noise_socket = time::timeout( - Duration::from_secs(30), + Duration::from_secs(40), noise_config.upgrade_socket(socket, ConnectionDirection::Outbound), ) .await diff --git a/comms/src/connection_manager/listener.rs b/comms/src/connection_manager/listener.rs index bd9a65eca2..08bbe808b6 100644 --- a/comms/src/connection_manager/listener.rs +++ b/comms/src/connection_manager/listener.rs @@ -42,6 +42,7 @@ use crate::{ protocol::ProtocolId, runtime, transports::Transport, + types::CommsPublicKey, utils::multiaddr::multiaddr_to_socketaddr, PeerManager, }; @@ -67,6 +68,7 @@ use std::{ }, time::Duration, }; +use tari_crypto::tari_utilities::hex::Hex; use tari_shutdown::ShutdownSignal; use tokio::time; @@ -249,7 +251,7 @@ where let result = Self::perform_socket_upgrade_procedure( node_identity, peer_manager, - noise_config, + noise_config.clone(), conn_man_notifier.clone(), socket, peer_addr, @@ -286,10 +288,12 @@ where } }, Ok(WireMode::Comms(byte)) => { + let public_key = Self::remote_public_key_from_socket(socket, noise_config).await; warn!( target: LOG_TARGET, - "Peer at address '{}' sent invalid wire format byte. Expected {:x?} got: {:x?} ", + "Peer at address '{}' ({}) sent invalid wire format byte. Expected {:x?} got: {:x?} ", peer_addr, + public_key, config.network_info.network_byte, byte, ); @@ -313,11 +317,13 @@ where } }, Err(err) => { + let public_key = Self::remote_public_key_from_socket(socket, noise_config).await; warn!( target: LOG_TARGET, - "Peer at address '{}' failed to send wire format. Expected network byte {:x?} or liveness \ - byte {:x?} not received. Error: {}", + "Peer at address '{}' ({}) failed to send its wire format . Expected network byte {:x?} or \ + liveness byte {:x?} not received. Error: {}", peer_addr, + public_key, config.network_info.network_byte, LIVENESS_WIRE_MODE, err @@ -331,6 +337,32 @@ where self.bounded_executor.spawn(inbound_fut).await; } + async fn remote_public_key_from_socket(socket: TTransport::Output, noise_config: NoiseConfig) -> String { + let public_key: Option = match time::timeout( + Duration::from_secs(30), + noise_config.upgrade_socket(socket, ConnectionDirection::Inbound), + ) + .await + .map_err(|_| ConnectionManagerError::NoiseProtocolTimeout) + { + Ok(Ok(noise_socket)) => { + match noise_socket + .get_remote_public_key() + .ok_or(ConnectionManagerError::InvalidStaticPublicKey) + { + Ok(pk) => Some(pk), + _ => None, + } + }, + _ => None, + }; + + match public_key { + None => "public key not known".to_string(), + Some(pk) => pk.to_hex(), + } + } + #[allow(clippy::too_many_arguments)] async fn perform_socket_upgrade_procedure( node_identity: Arc, diff --git a/comms/src/connection_manager/manager.rs b/comms/src/connection_manager/manager.rs index d339af71ea..d1019f33d4 100644 --- a/comms/src/connection_manager/manager.rs +++ b/comms/src/connection_manager/manager.rs @@ -94,14 +94,14 @@ pub struct ConnectionManagerConfig { /// The number of dial attempts to make before giving up. Default: 3 pub max_dial_attempts: usize, /// The maximum number of connection tasks that will be spawned at the same time. Once this limit is reached, peers - /// attempting to connect will have to wait for another connection attempt to complete. Default: 20 + /// attempting to connect will have to wait for another connection attempt to complete. Default: 100 pub max_simultaneous_inbound_connects: usize, /// Set to true to allow peers to send loopback, local-link and other addresses normally not considered valid for /// peer-to-peer comms. Default: false pub allow_test_addresses: bool, /// Version information for this node pub network_info: NodeNetworkInfo, - /// The maximum time to wait for the first byte before closing the connection. Default: 7s + /// The maximum time to wait for the first byte before closing the connection. Default: 45s pub time_to_first_byte: Duration, /// The number of liveness check sessions to allow. Default: 0 pub liveness_max_sessions: usize, @@ -122,7 +122,7 @@ impl Default for ConnectionManagerConfig { #[cfg(test)] listener_address: "/memory/0".parse().unwrap(), max_dial_attempts: 3, - max_simultaneous_inbound_connects: 20, + max_simultaneous_inbound_connects: 100, network_info: Default::default(), #[cfg(not(test))] allow_test_addresses: false, @@ -130,7 +130,7 @@ impl Default for ConnectionManagerConfig { #[cfg(test)] allow_test_addresses: true, liveness_max_sessions: 0, - time_to_first_byte: Duration::from_secs(7), + time_to_first_byte: Duration::from_secs(45), liveness_cidr_allowlist: vec![cidr::AnyIpCidr::V4("127.0.0.1/32".parse().unwrap())], auxilary_tcp_listener_address: None, } diff --git a/comms/src/protocol/rpc/client.rs b/comms/src/protocol/rpc/client.rs index 737255bbd5..4d86c24ea3 100644 --- a/comms/src/protocol/rpc/client.rs +++ b/comms/src/protocol/rpc/client.rs @@ -245,9 +245,9 @@ impl RpcClientConfig { impl Default for RpcClientConfig { fn default() -> Self { Self { - deadline: Some(Duration::from_secs(30)), - deadline_grace_period: Duration::from_secs(30), - handshake_timeout: Duration::from_secs(30), + deadline: Some(Duration::from_secs(120)), + deadline_grace_period: Duration::from_secs(60), + handshake_timeout: Duration::from_secs(90), } } }