Skip to content

Commit

Permalink
feat(comms): adds periodic socket-level liveness checks (#4819)
Browse files Browse the repository at this point in the history
Description
---
- adds socket-level liveness checks
- adds configuration to enable liveness checks (currently enabled by default in base node, disabled in wallet)
- update status line to display liveness status

Motivation and Context
---
Allows us to gain visibility on the base latency of the transport without including overhead of the noise socket and yamux

How Has This Been Tested?
---
Manually
  • Loading branch information
sdbondi authored Oct 19, 2022
1 parent c004e30 commit 2bea05f
Show file tree
Hide file tree
Showing 39 changed files with 361 additions and 119 deletions.
8 changes: 7 additions & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp, str::FromStr, sync::Arc};
use std::{cmp, str::FromStr, sync::Arc, time::Duration};

use log::*;
use tari_app_utilities::{consts, identity_management, identity_management::load_from_json};
Expand Down Expand Up @@ -106,6 +106,12 @@ where B: BlockchainBackend + 'static
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
p2p_config.transport.tor.identity = tor_identity;

// TODO: This should probably be disabled in future and have it optionally set/unset in the config - this check
// does allow MITM/ISP/tor router to connect this node's IP to a destination IP/onion address.
// Specifically, "pingpong" text is periodically sent on an unencrypted socket allowing anyone observing
// the traffic to recognise the sending IP address as almost certainly a tari node.
p2p_config.listener_liveness_check_interval = Some(Duration::from_secs(15));

let mut handles = StackBuilder::new(self.interrupt_signal)
.add_initializer(P2pInitializer::new(
p2p_config.clone(),
Expand Down
5 changes: 3 additions & 2 deletions applications/tari_base_node/src/commands/command/add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub struct ArgsAddPeer {
impl HandleCommand<ArgsAddPeer> for CommandContext {
async fn handle_command(&mut self, args: ArgsAddPeer) -> Result<(), Error> {
let public_key = args.public_key.into();
if self.peer_manager.exists(&public_key).await {
let peer_manager = self.comms.peer_manager();
if peer_manager.exists(&public_key).await {
return Err(anyhow!("Peer with public key '{}' already exists", public_key));
}
let node_id = NodeId::from_public_key(&public_key);
Expand All @@ -57,7 +58,7 @@ impl HandleCommand<ArgsAddPeer> for CommandContext {
vec![],
String::new(),
);
self.peer_manager.add_peer(peer).await?;
peer_manager.add_peer(peer).await?;
println!("Peer with node id '{}'was added to the base node.", node_id);
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions applications/tari_base_node/src/commands/command/ban_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ impl CommandContext {
if self.base_node_identity.node_id() == &node_id {
Err(ArgsError::BanSelf.into())
} else if must_ban {
self.connectivity
self.comms
.connectivity()
.ban_peer_until(node_id.clone(), duration, "UI manual ban".to_string())
.await?;
println!("Peer was banned in base node.");
Ok(())
} else {
self.peer_manager.unban_peer(&node_id).await?;
self.comms.peer_manager().unban_peer(&node_id).await?;
println!("Peer ban was removed from base node.");
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl HandleCommand<Args> for CommandContext {
impl CommandContext {
/// Function to process the dial-peer command
pub async fn dial_peer(&self, dest_node_id: NodeId) -> Result<(), Error> {
let connectivity = self.connectivity.clone();
let connectivity = self.comms.connectivity();
task::spawn(async move {
let start = Instant::now();
println!("☎️ Dialing peer...");
Expand Down
6 changes: 3 additions & 3 deletions applications/tari_base_node/src/commands/command/get_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ enum ArgsError {

impl CommandContext {
pub async fn get_peer(&self, partial: Vec<u8>, original_str: String) -> Result<(), Error> {
let peers = self.peer_manager.find_all_starts_with(&partial).await?;
let peer_manager = self.comms.peer_manager();
let peers = peer_manager.find_all_starts_with(&partial).await?;
let peer = {
if let Some(peer) = peers.into_iter().next() {
peer
} else {
let pk = parse_emoji_id_or_public_key(&original_str).ok_or_else(|| ArgsError::NoPeerMatching {
original_str: original_str.clone(),
})?;
let peer = self
.peer_manager
let peer = peer_manager
.find_by_public_key(&pk)
.await?
.ok_or(ArgsError::NoPeerMatching { original_str })?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl CommandContext {
"User Agent",
"Info",
]);
let peer_manager = self.comms.peer_manager();
for conn in conns {
let peer = self
.peer_manager
let peer = peer_manager
.find_by_node_id(conn.peer_node_id())
.await
.expect("Unexpected peer database error")
Expand Down Expand Up @@ -105,7 +105,7 @@ impl CommandContext {
impl CommandContext {
/// Function to process the list-connections command
pub async fn list_connections(&mut self) -> Result<(), Error> {
let conns = self.connectivity.get_active_connections().await?;
let conns = self.comms.connectivity().get_active_connections().await?;
let (mut nodes, mut clients) = conns
.into_iter()
.partition::<Vec<_>, _>(|a| a.peer_features().is_node());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl CommandContext {
_ => false,
})
}
let peers = self.peer_manager.perform_query(query).await?;
let peers = self.comms.peer_manager().perform_query(query).await?;
let num_peers = peers.len();
println!();
let mut table = Table::new();
Expand Down
12 changes: 5 additions & 7 deletions applications/tari_base_node/src/commands/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ use async_trait::async_trait;
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
use strum::{EnumVariantNames, VariantNames};
use tari_comms::{
connectivity::ConnectivityRequester,
peer_manager::{Peer, PeerManager, PeerManagerError, PeerQuery},
peer_manager::{Peer, PeerManagerError, PeerQuery},
protocol::rpc::RpcServerHandle,
CommsNode,
NodeIdentity,
};
use tari_comms_dht::{DhtDiscoveryRequester, MetricsCollectorHandle};
Expand Down Expand Up @@ -155,8 +155,7 @@ pub struct CommandContext {
dht_metrics_collector: MetricsCollectorHandle,
rpc_server: RpcServerHandle,
base_node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
connectivity: ConnectivityRequester,
comms: CommsNode,
liveness: LivenessHandle,
node_service: LocalNodeCommsInterface,
mempool_service: LocalMempoolService,
Expand All @@ -176,8 +175,7 @@ impl CommandContext {
dht_metrics_collector: ctx.base_node_dht().metrics_collector(),
rpc_server: ctx.rpc_server(),
base_node_identity: ctx.base_node_identity(),
peer_manager: ctx.base_node_comms().peer_manager(),
connectivity: ctx.base_node_comms().connectivity(),
comms: ctx.base_node_comms().clone(),
liveness: ctx.liveness(),
node_service: ctx.local_node(),
mempool_service: ctx.local_mempool(),
Expand Down Expand Up @@ -297,7 +295,7 @@ impl HandleCommand<Command> for CommandContext {

impl CommandContext {
async fn fetch_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
let pm = &self.peer_manager;
let pm = self.comms.peer_manager();
let query = PeerQuery::new().select_where(|p| p.is_banned());
pm.perform_query(query).await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ impl HandleCommand<Args> for CommandContext {
impl CommandContext {
pub async fn reset_offline_peers(&self) -> Result<(), Error> {
let num_updated = self
.peer_manager
.comms
.peer_manager()
.update_each(|mut peer| {
if peer.is_offline() {
peer.set_offline(false);
Expand Down
17 changes: 16 additions & 1 deletion applications/tari_base_node/src/commands/command/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use clap::Parser;
use tari_app_utilities::consts;
use tari_comms::connection_manager::LivenessStatus;
use tokio::time;

use super::{CommandContext, HandleCommand};
Expand All @@ -47,6 +48,7 @@ impl HandleCommand<Args> for CommandContext {
}

impl CommandContext {
#[allow(clippy::too_many_lines)]
pub async fn status(&mut self, output: StatusLineOutput) -> Result<(), Error> {
let mut full_log = false;
if self.last_time_full.elapsed() > Duration::from_secs(120) {
Expand Down Expand Up @@ -102,7 +104,7 @@ impl CommandContext {
status_line.add_field("Mempool", "query timed out");
};

let conns = self.connectivity.get_active_connections().await?;
let conns = self.comms.connectivity().get_active_connections().await?;
let (num_nodes, num_clients) = conns.iter().fold((0usize, 0usize), |(nodes, clients), conn| {
if conn.peer_features().is_node() {
(nodes + 1, clients)
Expand Down Expand Up @@ -139,6 +141,19 @@ impl CommandContext {
);
}

match self.comms.listening_info().liveness_status() {
LivenessStatus::Disabled => {},
LivenessStatus::Checking => {
status_line.add("⏳️️");
},
LivenessStatus::Unreachable => {
status_line.add("‼️");
},
LivenessStatus::Live(latency) => {
status_line.add(format!("⚡️ {:.2?}", latency));
},
}

let target = "base_node::app::status";
match output {
StatusLineOutput::StdOutAndLog => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ impl HandleCommand<Args> for CommandContext {
impl CommandContext {
pub async fn unban_all_peers(&self) -> Result<(), Error> {
let query = PeerQuery::new().select_where(|p| p.is_banned());
let peers = self.peer_manager.perform_query(query).await?;
let peer_manager = self.comms.peer_manager();
let peers = peer_manager.perform_query(query).await?;
let num_peers = peers.len();
for peer in peers {
if let Err(err) = self.peer_manager.unban_peer(&peer.node_id).await {
if let Err(err) = peer_manager.unban_peer(&peer.node_id).await {
println!("Failed to unban peer: {}", err);
}
}
Expand Down
6 changes: 5 additions & 1 deletion applications/tari_base_node/src/commands/status_line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl StatusLine {
Default::default()
}

pub fn add<T: ToString>(&mut self, value: T) -> &mut Self {
self.add_field("", value)
}

pub fn add_field<T: ToString>(&mut self, name: &'static str, value: T) -> &mut Self {
self.fields.push((name, value.to_string()));
self
Expand All @@ -54,7 +58,7 @@ impl Display for StatusLine {
write!(f, "{} ", Local::now().format("%H:%M"))?;
let s = self.fields.iter().map(|(k, v)| format(k, v)).collect::<Vec<_>>();

write!(f, "{}", s.join(", "))
write!(f, "{}", s.join(" "))
}
}

Expand Down
10 changes: 9 additions & 1 deletion base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::path::{Path, PathBuf};
use std::{
path::{Path, PathBuf},
time::Duration,
};

use serde::{Deserialize, Serialize};
use tari_common::{
configuration::{
serializers,
utils::{deserialize_string_or_struct, serialize_string},
StringList,
},
Expand Down Expand Up @@ -105,6 +109,9 @@ pub struct P2pConfig {
/// Liveness sessions can be used by third party tooling to determine node liveness.
/// A value of 0 will disallow any liveness sessions.
pub listener_liveness_max_sessions: usize,
/// If Some, enables periodic socket-level liveness checks
#[serde(with = "serializers::optional_seconds")]
pub listener_liveness_check_interval: Option<Duration>,
/// CIDR for addresses allowed to enter into liveness check mode on the listener.
pub listener_liveness_allowlist_cidrs: StringList,
/// User agent string for this node
Expand Down Expand Up @@ -137,6 +144,7 @@ impl Default for P2pConfig {
},
allow_test_addresses: false,
listener_liveness_max_sessions: 0,
listener_liveness_check_interval: None,
listener_liveness_allowlist_cidrs: StringList::default(),
user_agent: String::new(),
auxiliary_tcp_listener_address: None,
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ impl ServiceInitializer for P2pInitializer {
minor_version: MINOR_NETWORK_VERSION,
network_byte: self.network.as_byte(),
user_agent: config.user_agent.clone(),
});
})
.set_liveness_check(config.listener_liveness_check_interval);

if config.allow_test_addresses || config.dht.allow_test_addresses {
// The default is false, so ensure that both settings are true in this case
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl Default for WalletConfig {
fn default() -> Self {
let p2p = P2pConfig {
datastore_path: PathBuf::from("peer_db/wallet"),
listener_liveness_check_interval: None,
..Default::default()
};
Self {
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
user_agent: "tari/test-wallet".to_string(),
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};
let peer_message_subscription_factory = Arc::new(subscription_factory);
let shutdown = Shutdown::new();
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/tests/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async fn create_wallet(
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};

let sql_database_path = comms_config
Expand Down Expand Up @@ -679,6 +680,7 @@ async fn test_import_utxo() {
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};
let config = WalletConfig {
p2p: comms_config,
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3919,6 +3919,7 @@ pub unsafe extern "C" fn comms_config_create(
user_agent: format!("tari/mobile_wallet/{}", env!("CARGO_PKG_VERSION")),
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};

Box::into_raw(Box::new(config))
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ track_reorgs = true

# CIDR for addresses allowed to enter into liveness check mode on the listener.
#listener_liveness_allowlist_cidrs = []
# Enables periodic socket-level liveness checks. Default: Disabled
listener_liveness_check_interval = 15

# User agent string for this node
#user_agent = ""
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ event_channel_size = 3500

# CIDR for addresses allowed to enter into liveness check mode on the listener.
#listener_liveness_allowlist_cidrs = []
# Enables periodic socket-level liveness checks. Default: Disabled
# listener_liveness_check_interval = 15

# User agent string for this node
#user_agent = ""
Expand Down
13 changes: 12 additions & 1 deletion comms/core/src/builder/comms_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{iter, sync::Arc};
use std::{iter, sync::Arc, time::Duration};

use log::*;
use tari_shutdown::ShutdownSignal;
Expand Down Expand Up @@ -125,6 +125,12 @@ impl UnspawnedCommsNode {
self
}

/// Set to true to enable self liveness checking for the configured public address
pub fn set_liveness_check(mut self, interval: Option<Duration>) -> Self {
self.builder = self.builder.set_liveness_check(interval);
self
}

/// Spawn a new node using the specified [Transport](crate::transports::Transport).
pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
where
Expand Down Expand Up @@ -317,6 +323,11 @@ impl CommsNode {
self.listening_info.bind_address()
}

/// Return [ListenerInfo]
pub fn listening_info(&self) -> &ListenerInfo {
&self.listening_info
}

/// Return the Ip/Tcp address that this node is listening on
pub fn hidden_service(&self) -> Option<&tor::HiddenService> {
self.hidden_service.as_ref()
Expand Down
6 changes: 6 additions & 0 deletions comms/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ impl CommsBuilder {
self
}

/// Enable and set interval for self-liveness checks, or None to disable it (default)
pub fn set_liveness_check(mut self, check_interval: Option<Duration>) -> Self {
self.connection_manager_config.liveness_self_check_interval = check_interval;
self
}

fn make_peer_manager(&mut self) -> Result<Arc<PeerManager>, CommsBuilderError> {
let file_lock = self.peer_storage_file_lock.take();

Expand Down
Loading

0 comments on commit 2bea05f

Please sign in to comment.