Skip to content

Commit

Permalink
fix(comms): only reap when number of connections exceeds threshold (#…
Browse files Browse the repository at this point in the history
…4607)

Description
---
- only release connection handles of non-neighbouring peers after successful connect
- adds min threshold for connection reaping with default 50
- only reap connections that have less than 3 substreams
- only reap "excess" (num_connections - 50) connections
- adds RpcServer query that returns number of sessions for a peer
- updates list-connections command to display number of peer connection handles and rpc sessions 
- updates list-connections to display two tables, one with wallets and one with base nodes

Motivation and Context
---
Previously, connection handles would be dropped (making them reapable) when refreshing the neighbour peer pool. The neighbour pool only starts the attempt to connect, but the non-neighbouring connection handles should only be dropped if a replacement neighbour was actually able to connect.
Reaping should only apply when the number of connections exceeds the configured threshold; below that threshold, keeping the existing connections open is acceptable.

Fixes #4608 

How Has This Been Tested?
---
Manually, checking that connections to other base nodes/wallets running this PR stay connected
  • Loading branch information
sdbondi authored Sep 2, 2022
1 parent 7c9e22c commit 415f339
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 71 deletions.
143 changes: 84 additions & 59 deletions applications/tari_base_node/src/commands/command/list_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use anyhow::Error;
use async_trait::async_trait;
use clap::Parser;
use tari_comms::PeerConnection;
use tari_core::base_node::state_machine_service::states::PeerMetadata;

use super::{CommandContext, HandleCommand};
Expand All @@ -40,70 +41,94 @@ impl HandleCommand<Args> for CommandContext {
}

impl CommandContext {
/// Function to process the list-connections command
pub async fn list_connections(&mut self) -> Result<(), Error> {
let conns = self.connectivity.get_active_connections().await?;
if conns.is_empty() {
println!("No active peer connections.");
} else {
println!();
let num_connections = conns.len();
let mut table = Table::new();
table.set_titles(vec![
"NodeId",
"Public Key",
"Address",
"Direction",
"Age",
"Role",
"User Agent",
"Info",
async fn list_connections_print_table(&mut self, conns: &[PeerConnection]) {
let num_connections = conns.len();
let mut table = Table::new();
table.set_titles(vec![
"NodeId",
"Public Key",
"Address",
"Direction",
"Age",
"User Agent",
"Info",
]);
for conn in conns {
let peer = self
.peer_manager
.find_by_node_id(conn.peer_node_id())
.await
.expect("Unexpected peer database error")
.expect("Peer not found");

let chain_height = peer
.get_metadata(1)
.and_then(|v| bincode::deserialize::<PeerMetadata>(v).ok())
.map(|metadata| format!("height: {}", metadata.metadata.height_of_longest_chain()));

let ua = peer.user_agent;
let rpc_sessions = self
.rpc_server
.get_num_active_sessions_for(peer.node_id.clone())
.await
.unwrap_or(0);
table.add_row(row![
peer.node_id,
peer.public_key,
conn.address(),
conn.direction(),
format_duration_basic(conn.age()),
{
if ua.is_empty() {
"<unknown>"
} else {
ua.as_ref()
}
},
format!(
"{}hnd: {}, ss: {}, rpc: {}",
chain_height.map(|s| format!("{}, ", s)).unwrap_or_default(),
// Exclude the handle held by list-connections
conn.handle_count().saturating_sub(1),
conn.substream_count(),
rpc_sessions
),
]);
for conn in conns {
let peer = self
.peer_manager
.find_by_node_id(conn.peer_node_id())
.await
.expect("Unexpected peer database error")
.expect("Peer not found");
}

let chain_height = peer
.get_metadata(1)
.and_then(|v| bincode::deserialize::<PeerMetadata>(v).ok())
.map(|metadata| format!("height: {}", metadata.metadata.height_of_longest_chain()));
table.print_stdout();

let ua = peer.user_agent;
table.add_row(row![
peer.node_id,
peer.public_key,
conn.address(),
conn.direction(),
format_duration_basic(conn.age()),
{
if peer.features.is_client() {
"Wallet"
} else {
"Base node"
}
},
{
if ua.is_empty() {
"<unknown>"
} else {
ua.as_ref()
}
},
format!(
"substreams: {}{}",
conn.substream_count(),
chain_height.map(|s| format!(", {}", s)).unwrap_or_default()
),
]);
}
println!("{} active connection(s)", num_connections);
}
}

table.print_stdout();
impl CommandContext {
    /// Handles the `list-connections` command: fetches all active connections,
    /// splits them into base nodes and wallets (by peer features), sorts each
    /// group by node id, and prints one table per group.
    pub async fn list_connections(&mut self) -> Result<(), Error> {
        let conns = self.connectivity.get_active_connections().await?;
        // Nodes advertise PeerFeatures::COMMUNICATION_NODE; everything else is
        // treated as a wallet/client.
        let (mut nodes, mut clients) = conns
            .into_iter()
            .partition::<Vec<_>, _>(|a| a.peer_features().is_node());
        nodes.sort_by(|a, b| a.peer_node_id().cmp(b.peer_node_id()));
        clients.sort_by(|a, b| a.peer_node_id().cmp(b.peer_node_id()));

        println!();
        println!("Base Nodes");
        println!("----------");
        if nodes.is_empty() {
            println!("No active node connections.");
        } else {
            println!();
            self.list_connections_print_table(&nodes).await;
        }
        println!();
        println!("Wallets");
        println!("-------");
        // BUG FIX: this previously tested `nodes.is_empty()`, so the wallet
        // table was shown or suppressed based on the *node* list.
        if clients.is_empty() {
            println!("No active wallet connections.");
        } else {
            println!();
            self.list_connections_print_table(&clients).await;
        }
        Ok(())
    }
Expand Down
4 changes: 4 additions & 0 deletions comms/core/src/connectivity/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub struct ConnectivityConfig {
pub connection_pool_refresh_interval: Duration,
/// True if connection reaping is enabled, otherwise false (default: true)
pub is_connection_reaping_enabled: bool,
/// The minimum number of connections that must exist before any connections may be reaped
/// Default: 50
pub reaper_min_connection_threshold: usize,
/// The minimum age of the connection before it can be reaped. This prevents a connection that has just been
/// established from being reaped due to inactivity. Default: 20 minutes
pub reaper_min_inactive_age: Duration,
Expand All @@ -54,6 +57,7 @@ impl Default for ConnectivityConfig {
min_connectivity: 1,
connection_pool_refresh_interval: Duration::from_secs(60),
reaper_min_inactive_age: Duration::from_secs(20 * 60),
reaper_min_connection_threshold: 50,
is_connection_reaping_enabled: true,
max_failures_mark_offline: 1,
connection_tie_break_linger: Duration::from_secs(2),
Expand Down
6 changes: 4 additions & 2 deletions comms/core/src/connectivity/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ impl ConnectionPool {
.unwrap_or(ConnectionStatus::NotConnected)
}

pub fn get_inactive_connections_mut(&mut self, min_age: Duration) -> Vec<&mut PeerConnection> {
self.filter_connections_mut(|conn| conn.age() > min_age && conn.handle_count() <= 1)
/// Returns mutable references to connections that are eligible for reaping:
/// older than `min_age`, with no external handle holders (`handle_count() <= 1`,
/// the pool itself holds one handle), and fewer than 3 open substreams.
///
/// NOTE(review): despite the name, no direction filter is applied here —
/// confirm whether inbound connections should be excluded.
pub fn get_inactive_outbound_connections_mut(&mut self, min_age: Duration) -> Vec<&mut PeerConnection> {
    self.filter_connections_mut(|conn| {
        // BUG FIX: was `conn.substream_count() > 2`, which selected the
        // *busiest* connections for reaping — the inverse of the stated
        // intent ("only reap connections that have less than 3 substreams").
        conn.age() > min_age && conn.handle_count() <= 1 && conn.substream_count() < 3
    })
}

pub(in crate::connectivity) fn filter_drain<P>(&mut self, mut predicate: P) -> Vec<PeerConnectionState>
Expand Down
18 changes: 14 additions & 4 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,28 @@ impl ConnectivityManagerActor {
}

async fn reap_inactive_connections(&mut self) {
let connections = self
let excess_connections = self
.pool
.get_inactive_connections_mut(self.config.reaper_min_inactive_age);
.count_connected()
.saturating_sub(self.config.reaper_min_connection_threshold);
if excess_connections == 0 {
return;
}

let mut connections = self
.pool
.get_inactive_outbound_connections_mut(self.config.reaper_min_inactive_age);
connections.truncate(excess_connections as usize);
for conn in connections {
if !conn.is_connected() {
continue;
}

debug!(
target: LOG_TARGET,
"Disconnecting '{}' because connection was inactive",
conn.peer_node_id().short_str()
"Disconnecting '{}' because connection was inactive ({} handles)",
conn.peer_node_id().short_str(),
conn.handle_count()
);
if let Err(err) = conn.disconnect().await {
// Already disconnected
Expand Down
11 changes: 11 additions & 0 deletions comms/core/src/protocol/rpc/server/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
use tokio::sync::{mpsc, oneshot};

use super::RpcServerError;
use crate::peer_manager::NodeId;

/// Requests that can be sent to the running RPC server actor; each variant
/// carries a oneshot sender on which the reply is delivered.
#[derive(Debug)]
pub enum RpcServerRequest {
/// Reply with the total number of active RPC sessions on the server.
GetNumActiveSessions(oneshot::Sender<usize>),
/// Reply with the number of active RPC sessions for the given peer.
GetNumActiveSessionsForPeer(NodeId, oneshot::Sender<usize>),
}

#[derive(Debug, Clone)]
Expand All @@ -47,4 +49,13 @@ impl RpcServerHandle {
.map_err(|_| RpcServerError::RequestCanceled)?;
resp.await.map_err(Into::into)
}

/// Queries the RPC server for the number of active sessions associated with `peer`.
///
/// Sends a [`RpcServerRequest::GetNumActiveSessionsForPeer`] over the internal
/// channel and awaits the reply.
///
/// # Errors
/// Returns `RpcServerError::RequestCanceled` if the request could not be sent
/// (server shut down), or a converted error if the reply channel is dropped
/// before a response arrives.
pub async fn get_num_active_sessions_for(&mut self, peer: NodeId) -> Result<usize, RpcServerError> {
// `req` is the reply sender handed to the server; `resp` receives the answer.
let (req, resp) = oneshot::channel();
self.sender
.send(RpcServerRequest::GetNumActiveSessionsForPeer(peer, req))
.await
.map_err(|_| RpcServerError::RequestCanceled)?;
resp.await.map_err(Into::into)
}
}
18 changes: 17 additions & 1 deletion comms/core/src/protocol/rpc/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ where
}

async fn handle_request(&self, req: RpcServerRequest) {
use RpcServerRequest::GetNumActiveSessions;
#[allow(clippy::enum_glob_use)]
use RpcServerRequest::*;
match req {
GetNumActiveSessions(reply) => {
let max_sessions = self
Expand All @@ -321,6 +322,21 @@ where
let num_active = max_sessions.saturating_sub(self.executor.num_available());
let _ = reply.send(num_active);
},
GetNumActiveSessionsForPeer(node_id, reply) => {
    // `sessions` maps a peer's NodeId to its current active session count.
    // BUG FIX: the previous code replied with
    // `max_sessions.saturating_sub(*num_sessions)` — the *remaining*
    // session capacity — but this request asks for the number of *active*
    // sessions. Report the tracked count directly; peers with no entry
    // have 0 active sessions.
    let num_active = self.sessions.get(&node_id).copied().unwrap_or(0);
    let _ = reply.send(num_active);
},
}
}

Expand Down
5 changes: 0 additions & 5 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,6 @@ impl DhtConnectivity {
self.insert_neighbour(peer);
});

// Drop any connection handles that removed from the neighbour pool
difference.iter().for_each(|peer| {
self.remove_connection_handle(peer);
});

if !new_neighbours.is_empty() {
self.connectivity.request_many_dials(new_neighbours).await?;
}
Expand Down

0 comments on commit 415f339

Please sign in to comment.