Skip to content

Commit

Permalink
Improve comms limits to combat network timeouts
Browse files Browse the repository at this point in the history
- Improved various comms configuration settings to combat RPC connection and response timeouts
- Improved wire mode logging
  • Loading branch information
hansieodendaal committed Aug 23, 2021
1 parent f577df8 commit dada8d9
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 13 deletions.
8 changes: 7 additions & 1 deletion base_layer/wallet/tests/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use tari_wallet::{
types::ValidationRetryStrategy,
};

use tari_comms::protocol::rpc::RpcClientConfig;
use tari_wallet::output_manager_service::storage::models::OutputStatus;
use tokio::{
runtime::Runtime,
Expand Down Expand Up @@ -1816,7 +1817,12 @@ fn test_txo_validation_rpc_timeout() {
.unwrap();

runtime.block_on(async {
let mut delay = delay_for(Duration::from_secs(100)).fuse();
let mut delay = delay_for(
RpcClientConfig::default().deadline.unwrap() +
RpcClientConfig::default().deadline_grace_period +
Duration::from_secs(30),
)
.fuse();
let mut failed = 0;
loop {
futures::select! {
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ where
.map_err(|_| ConnectionManagerError::WireFormatSendFailed)?;

let noise_socket = time::timeout(
Duration::from_secs(30),
Duration::from_secs(40),
noise_config.upgrade_socket(socket, ConnectionDirection::Outbound),
)
.await
Expand Down
40 changes: 36 additions & 4 deletions comms/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
protocol::ProtocolId,
runtime,
transports::Transport,
types::CommsPublicKey,
utils::multiaddr::multiaddr_to_socketaddr,
PeerManager,
};
Expand All @@ -67,6 +68,7 @@ use std::{
},
time::Duration,
};
use tari_crypto::tari_utilities::hex::Hex;
use tari_shutdown::ShutdownSignal;
use tokio::time;

Expand Down Expand Up @@ -249,7 +251,7 @@ where
let result = Self::perform_socket_upgrade_procedure(
node_identity,
peer_manager,
noise_config,
noise_config.clone(),
conn_man_notifier.clone(),
socket,
peer_addr,
Expand Down Expand Up @@ -286,10 +288,12 @@ where
}
},
Ok(WireMode::Comms(byte)) => {
let public_key = Self::remote_public_key_from_socket(socket, noise_config).await;
warn!(
target: LOG_TARGET,
"Peer at address '{}' sent invalid wire format byte. Expected {:x?} got: {:x?} ",
"Peer at address '{}' ({}) sent invalid wire format byte. Expected {:x?} got: {:x?} ",
peer_addr,
public_key,
config.network_info.network_byte,
byte,
);
Expand All @@ -313,11 +317,13 @@ where
}
},
Err(err) => {
let public_key = Self::remote_public_key_from_socket(socket, noise_config).await;
warn!(
target: LOG_TARGET,
"Peer at address '{}' failed to send wire format. Expected network byte {:x?} or liveness \
byte {:x?} not received. Error: {}",
"Peer at address '{}' ({}) failed to send its wire format . Expected network byte {:x?} or \
liveness byte {:x?} not received. Error: {}",
peer_addr,
public_key,
config.network_info.network_byte,
LIVENESS_WIRE_MODE,
err
Expand All @@ -331,6 +337,32 @@ where
self.bounded_executor.spawn(inbound_fut).await;
}

/// Best-effort lookup of the remote peer's public key, for use in log messages.
///
/// Attempts an inbound noise upgrade on `socket`, bounded by a 30 second timeout. If the upgrade
/// succeeds and the resulting noise socket reports a remote public key, that key is returned as a
/// hex string. If the upgrade times out, fails, or no key is available, the placeholder
/// `"public key not known"` is returned instead. The reason for the failure is deliberately not
/// surfaced, so no error values are constructed here.
///
/// NOTE(review): the visible callers invoke this after the peer has already failed the wire format
/// check, so the calling task may wait up to a further 30 seconds on the same socket - confirm this
/// is acceptable for the inbound connection limit.
async fn remote_public_key_from_socket(socket: TTransport::Output, noise_config: NoiseConfig) -> String {
    let public_key: Option<CommsPublicKey> = match time::timeout(
        Duration::from_secs(30),
        noise_config.upgrade_socket(socket, ConnectionDirection::Inbound),
    )
    .await
    {
        // Upgrade completed in time: the key may still be absent, which stays `None`.
        Ok(Ok(noise_socket)) => noise_socket.get_remote_public_key(),
        // Timeout elapsed or the upgrade itself failed.
        _ => None,
    };

    public_key.map_or_else(|| "public key not known".to_string(), |pk| pk.to_hex())
}

#[allow(clippy::too_many_arguments)]
async fn perform_socket_upgrade_procedure(
node_identity: Arc<NodeIdentity>,
Expand Down
8 changes: 4 additions & 4 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ pub struct ConnectionManagerConfig {
/// The number of dial attempts to make before giving up. Default: 3
pub max_dial_attempts: usize,
/// The maximum number of connection tasks that will be spawned at the same time. Once this limit is reached, peers
/// attempting to connect will have to wait for another connection attempt to complete. Default: 20
/// attempting to connect will have to wait for another connection attempt to complete. Default: 100
pub max_simultaneous_inbound_connects: usize,
/// Set to true to allow peers to send loopback, local-link and other addresses normally not considered valid for
/// peer-to-peer comms. Default: false
pub allow_test_addresses: bool,
/// Version information for this node
pub network_info: NodeNetworkInfo,
/// The maximum time to wait for the first byte before closing the connection. Default: 7s
/// The maximum time to wait for the first byte before closing the connection. Default: 45s
pub time_to_first_byte: Duration,
/// The number of liveness check sessions to allow. Default: 0
pub liveness_max_sessions: usize,
Expand All @@ -122,15 +122,15 @@ impl Default for ConnectionManagerConfig {
#[cfg(test)]
listener_address: "/memory/0".parse().unwrap(),
max_dial_attempts: 3,
max_simultaneous_inbound_connects: 20,
max_simultaneous_inbound_connects: 100,
network_info: Default::default(),
#[cfg(not(test))]
allow_test_addresses: false,
// This must always be true for internal crate tests
#[cfg(test)]
allow_test_addresses: true,
liveness_max_sessions: 0,
time_to_first_byte: Duration::from_secs(7),
time_to_first_byte: Duration::from_secs(45),
liveness_cidr_allowlist: vec![cidr::AnyIpCidr::V4("127.0.0.1/32".parse().unwrap())],
auxilary_tcp_listener_address: None,
}
Expand Down
6 changes: 3 additions & 3 deletions comms/src/protocol/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ impl RpcClientConfig {
impl Default for RpcClientConfig {
fn default() -> Self {
Self {
deadline: Some(Duration::from_secs(30)),
deadline_grace_period: Duration::from_secs(30),
handshake_timeout: Duration::from_secs(30),
deadline: Some(Duration::from_secs(120)),
deadline_grace_period: Duration::from_secs(60),
handshake_timeout: Duration::from_secs(90),
}
}
}
Expand Down

0 comments on commit dada8d9

Please sign in to comment.