Skip to content

Commit

Permalink
feat(base-node): split list connections into wallet and node lists
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 2, 2022
1 parent 8560c44 commit bf9d2e8
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 59 deletions.
140 changes: 83 additions & 57 deletions applications/tari_base_node/src/commands/command/list_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use anyhow::Error;
use async_trait::async_trait;
use clap::Parser;
use tari_comms::PeerConnection;
use tari_core::base_node::state_machine_service::states::PeerMetadata;

use super::{CommandContext, HandleCommand};
Expand All @@ -40,68 +41,93 @@ impl HandleCommand<Args> for CommandContext {
}

impl CommandContext {
/// Function to process the list-connections command
pub async fn list_connections(&mut self) -> Result<(), Error> {
let conns = self.connectivity.get_active_connections().await?;
if conns.is_empty() {
println!("No active peer connections.");
} else {
println!();
let num_connections = conns.len();
let mut table = Table::new();
table.set_titles(vec![
"NodeId",
"Public Key",
"Address",
"Direction",
"Age",
"User Agent",
"Info",
async fn list_connections_print_table(&mut self, conns: &[PeerConnection]) {
let num_connections = conns.len();
let mut table = Table::new();
table.set_titles(vec![
"NodeId",
"Public Key",
"Address",
"Direction",
"Age",
"User Agent",
"Info",
]);
for conn in conns {
let peer = self
.peer_manager
.find_by_node_id(conn.peer_node_id())
.await
.expect("Unexpected peer database error")
.expect("Peer not found");

let chain_height = peer
.get_metadata(1)
.and_then(|v| bincode::deserialize::<PeerMetadata>(v).ok())
.map(|metadata| format!("height: {}", metadata.metadata.height_of_longest_chain()));

let ua = peer.user_agent;
let rpc_sessions = self
.rpc_server
.get_num_active_sessions_for(peer.node_id.clone())
.await
.unwrap_or(0);
table.add_row(row![
peer.node_id,
peer.public_key,
conn.address(),
conn.direction(),
format_duration_basic(conn.age()),
{
if ua.is_empty() {
"<unknown>"
} else {
ua.as_ref()
}
},
format!(
"{}hnd: {}, ss: {}, rpc: {}",
chain_height.map(|s| format!("{}, ", s)).unwrap_or_default(),
conn.handle_count(),
conn.substream_count(),
rpc_sessions
),
]);
for conn in conns {
let peer = self
.peer_manager
.find_by_node_id(conn.peer_node_id())
.await
.expect("Unexpected peer database error")
.expect("Peer not found");
}

let chain_height = peer
.get_metadata(1)
.and_then(|v| bincode::deserialize::<PeerMetadata>(v).ok())
.map(|metadata| format!("height: {}", metadata.metadata.height_of_longest_chain()));
table.print_stdout();

let ua = peer.user_agent;
let rpc_sessions = self
.rpc_server
.get_num_active_sessions_for(peer.node_id.clone())
.await?;
table.add_row(row![
peer.node_id,
peer.public_key,
conn.address(),
conn.direction(),
format_duration_basic(conn.age()),
{
if ua.is_empty() {
"<unknown>"
} else {
ua.as_ref()
}
},
format!(
"{}hnd: {}, ss: {}, rpc: {}",
chain_height.map(|s| format!("{}, ", s)).unwrap_or_default(),
conn.handle_count(),
conn.substream_count(),
rpc_sessions
),
]);
}
println!("{} active connection(s)", num_connections);
}
}

table.print_stdout();
impl CommandContext {
/// Function to process the list-connections command
pub async fn list_connections(&mut self) -> Result<(), Error> {
let conns = self.connectivity.get_active_connections().await?;
let (mut nodes, mut clients) = conns
.into_iter()
.partition::<Vec<_>, _>(|a| a.peer_features().is_node());
nodes.sort_by(|a, b| a.peer_node_id().cmp(b.peer_node_id()));
clients.sort_by(|a, b| a.peer_node_id().cmp(b.peer_node_id()));

println!("{} active connection(s)", num_connections);
println!();
println!("Base Nodes");
println!("----------");
if nodes.is_empty() {
println!("No active node connections.");
} else {
println!();
self.list_connections_print_table(&nodes).await;
}
println!();
println!("Wallets");
println!("-------");
if nodes.is_empty() {
println!("No active wallet connections.");
} else {
println!();
self.list_connections_print_table(&clients).await;
}
Ok(())
}
Expand Down
9 changes: 7 additions & 2 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,18 @@ impl ConnectivityManagerActor {
}

async fn reap_inactive_connections(&mut self) {
if self.pool.count_connected() < self.config.reaper_min_connection_threshold {
let excess_connections = self
.pool
.count_connected()
.saturating_sub(self.config.reaper_min_connection_threshold);
if excess_connections == 0 {
return;
}

let connections = self
let mut connections = self
.pool
.get_inactive_outbound_connections_mut(self.config.reaper_min_inactive_age);
connections.truncate(excess_connections as usize);
for conn in connections {
if !conn.is_connected() {
continue;
Expand Down

0 comments on commit bf9d2e8

Please sign in to comment.