diff --git a/applications/tari_base_node/src/command_handler.rs b/applications/tari_base_node/src/command_handler.rs index 806b449a96..a807e02133 100644 --- a/applications/tari_base_node/src/command_handler.rs +++ b/applications/tari_base_node/src/command_handler.rs @@ -59,8 +59,15 @@ use tari_core::{ tari_utilities::{hex::Hex, message_format::MessageFormat}, }; use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::Hashable}; -use tari_p2p::auto_update::SoftwareUpdaterHandle; -use tokio::{runtime, sync::watch}; +use tari_p2p::{ + auto_update::SoftwareUpdaterHandle, + services::liveness::{LivenessEvent, LivenessHandle}, +}; +use tokio::{ + runtime, + sync::{broadcast, watch}, + time, +}; pub enum StatusOutput { Log, @@ -77,6 +84,7 @@ pub struct CommandHandler { base_node_identity: Arc, peer_manager: Arc, connectivity: ConnectivityRequester, + liveness: LivenessHandle, node_service: LocalNodeCommsInterface, mempool_service: LocalMempoolService, state_machine_info: watch::Receiver, @@ -85,7 +93,7 @@ pub struct CommandHandler { impl CommandHandler { pub fn new(executor: runtime::Handle, ctx: &BaseNodeContext) -> Self { - CommandHandler { + Self { executor, config: ctx.config(), blockchain_db: ctx.blockchain_db().into(), @@ -95,6 +103,7 @@ impl CommandHandler { base_node_identity: ctx.base_node_identity(), peer_manager: ctx.base_node_comms().peer_manager(), connectivity: ctx.base_node_comms().connectivity(), + liveness: ctx.liveness(), node_service: ctx.local_node(), mempool_service: ctx.local_mempool(), state_machine_info: ctx.get_state_machine_info_channel(), @@ -535,6 +544,42 @@ impl CommandHandler { }); } + pub fn ping_peer(&self, dest_node_id: NodeId) { + let mut liveness = self.liveness.clone(); + + self.executor.spawn(time::timeout(Duration::from_secs(30), async move { + println!("🏓 Pinging peer..."); + let mut liveness_events = liveness.get_event_stream(); + + match liveness.send_ping(dest_node_id.clone()).await { + Ok(_) => loop { + match liveness_events.recv().await { + Ok(event) => + { + #[allow(clippy::single_match)] + match &*event { + LivenessEvent::ReceivedPong(pong) => { + if pong.node_id == dest_node_id { + println!("🏓️ Pong received, latency in is {}ms!", pong.latency.unwrap_or(0)); + break; + } + }, + _ => {}, + } + }, + Err(broadcast::error::RecvError::Closed) => { + break; + }, + _ => {}, + } + }, + Err(err) => { + println!("📞 Could not send ping: {}", err); + }, + } + })); + } + pub fn ban_peer(&self, node_id: NodeId, duration: Duration, must_ban: bool) { if self.base_node_identity.node_id() == &node_id { println!("Cannot ban our own node"); diff --git a/applications/tari_base_node/src/parser.rs b/applications/tari_base_node/src/parser.rs index 9296496eea..899338e488 100644 --- a/applications/tari_base_node/src/parser.rs +++ b/applications/tari_base_node/src/parser.rs @@ -57,6 +57,7 @@ pub enum BaseNodeCommand { GetPeer, ListPeers, DialPeer, + PingPeer, ResetOfflinePeers, RewindBlockchain, BanPeer, @@ -191,6 +192,9 @@ impl Parser { DialPeer => { self.process_dial_peer(args); }, + PingPeer => { + self.process_ping_peer(args); + }, DiscoverPeer => { self.process_discover_peer(args); }, @@ -295,6 +299,9 @@ impl Parser { DialPeer => { println!("Attempt to connect to a known peer"); }, + PingPeer => { + println!("Send a ping to a known peer and wait for a pong reply"); + }, DiscoverPeer => { println!("Attempt to discover a peer on the Tari network"); }, @@ -541,7 +548,7 @@ impl Parser { Some(n) => n, None => { println!("Please enter a valid destination public key or emoji id"); - println!("discover-peer [hex public key or emoji id]"); + println!("dial-peer [hex public key or emoji id]"); return; }, }; @@ -549,6 +556,24 @@ impl Parser { self.command_handler.dial_peer(dest_node_id) } + /// Function to process the dial-peer command + fn process_ping_peer<'a, I: Iterator>(&mut self, mut args: I) { + let dest_node_id = match args + .next() + .and_then(parse_emoji_id_or_public_key_or_node_id) + .map(either_to_node_id) + { + Some(n) => n, + None => { + println!("Please enter a valid destination public key or emoji id"); + println!("ping-peer [hex public key or emoji id]"); + return; + }, + }; + + self.command_handler.ping_peer(dest_node_id) + } + /// Function to process the ban-peer command fn process_ban_peer<'a, I: Iterator>(&mut self, mut args: I, must_ban: bool) { let node_id = match args diff --git a/base_layer/p2p/src/services/liveness/handle.rs b/base_layer/p2p/src/services/liveness/handle.rs index eb1331505c..0392368430 100644 --- a/base_layer/p2p/src/services/liveness/handle.rs +++ b/base_layer/p2p/src/services/liveness/handle.rs @@ -110,7 +110,7 @@ impl LivenessHandle { } } - /// Returns a fused event stream for the liveness service + /// Returns an event stream for the liveness service pub fn get_event_stream(&self) -> LivenessEventReceiver { self.event_stream_sender.subscribe() }