Skip to content

Commit

Permalink
feat: ping-peer command (#3295)
Browse files Browse the repository at this point in the history
Description
---
`ping-peer [emoji id | pk | nodeid ]`

Motivation and Context
---
Allows latency tests pinging of peers using the liveness service

How Has This Been Tested?
---
Manually by running the command on a base node
  • Loading branch information
sdbondi authored Sep 6, 2021
1 parent bc95c18 commit a04a2a6
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
51 changes: 48 additions & 3 deletions applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,15 @@ use tari_core::{
tari_utilities::{hex::Hex, message_format::MessageFormat},
};
use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::Hashable};
use tari_p2p::auto_update::SoftwareUpdaterHandle;
use tokio::{runtime, sync::watch};
use tari_p2p::{
auto_update::SoftwareUpdaterHandle,
services::liveness::{LivenessEvent, LivenessHandle},
};
use tokio::{
runtime,
sync::{broadcast, watch},
time,
};

pub enum StatusOutput {
Log,
Expand All @@ -77,6 +84,7 @@ pub struct CommandHandler {
base_node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
connectivity: ConnectivityRequester,
liveness: LivenessHandle,
node_service: LocalNodeCommsInterface,
mempool_service: LocalMempoolService,
state_machine_info: watch::Receiver<StatusInfo>,
Expand All @@ -85,7 +93,7 @@ pub struct CommandHandler {

impl CommandHandler {
pub fn new(executor: runtime::Handle, ctx: &BaseNodeContext) -> Self {
CommandHandler {
Self {
executor,
config: ctx.config(),
blockchain_db: ctx.blockchain_db().into(),
Expand All @@ -95,6 +103,7 @@ impl CommandHandler {
base_node_identity: ctx.base_node_identity(),
peer_manager: ctx.base_node_comms().peer_manager(),
connectivity: ctx.base_node_comms().connectivity(),
liveness: ctx.liveness(),
node_service: ctx.local_node(),
mempool_service: ctx.local_mempool(),
state_machine_info: ctx.get_state_machine_info_channel(),
Expand Down Expand Up @@ -535,6 +544,42 @@ impl CommandHandler {
});
}

pub fn ping_peer(&self, dest_node_id: NodeId) {
let mut liveness = self.liveness.clone();

self.executor.spawn(time::timeout(Duration::from_secs(30), async move {
println!("🏓 Pinging peer...");
let mut liveness_events = liveness.get_event_stream();

match liveness.send_ping(dest_node_id.clone()).await {
Ok(_) => loop {
match liveness_events.recv().await {
Ok(event) =>
{
#[allow(clippy::single_match)]
match &*event {
LivenessEvent::ReceivedPong(pong) => {
if pong.node_id == dest_node_id {
println!("🏓️ Pong received, latency in is {}ms!", pong.latency.unwrap_or(0));
break;
}
},
_ => {},
}
},
Err(broadcast::error::RecvError::Closed) => {
break;
},
_ => {},
}
},
Err(err) => {
println!("📞 Could not send ping: {}", err);
},
}
}));
}

pub fn ban_peer(&self, node_id: NodeId, duration: Duration, must_ban: bool) {
if self.base_node_identity.node_id() == &node_id {
println!("Cannot ban our own node");
Expand Down
27 changes: 26 additions & 1 deletion applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub enum BaseNodeCommand {
GetPeer,
ListPeers,
DialPeer,
PingPeer,
ResetOfflinePeers,
RewindBlockchain,
BanPeer,
Expand Down Expand Up @@ -191,6 +192,9 @@ impl Parser {
DialPeer => {
self.process_dial_peer(args);
},
PingPeer => {
self.process_ping_peer(args);
},
DiscoverPeer => {
self.process_discover_peer(args);
},
Expand Down Expand Up @@ -295,6 +299,9 @@ impl Parser {
DialPeer => {
println!("Attempt to connect to a known peer");
},
PingPeer => {
println!("Send a ping to a known peer and wait for a pong reply");
},
DiscoverPeer => {
println!("Attempt to discover a peer on the Tari network");
},
Expand Down Expand Up @@ -541,14 +548,32 @@ impl Parser {
Some(n) => n,
None => {
println!("Please enter a valid destination public key or emoji id");
println!("discover-peer [hex public key or emoji id]");
println!("dial-peer [hex public key or emoji id]");
return;
},
};

self.command_handler.dial_peer(dest_node_id)
}

/// Function to process the dial-peer command
fn process_ping_peer<'a, I: Iterator<Item = &'a str>>(&mut self, mut args: I) {
let dest_node_id = match args
.next()
.and_then(parse_emoji_id_or_public_key_or_node_id)
.map(either_to_node_id)
{
Some(n) => n,
None => {
println!("Please enter a valid destination public key or emoji id");
println!("ping-peer [hex public key or emoji id]");
return;
},
};

self.command_handler.ping_peer(dest_node_id)
}

/// Function to process the ban-peer command
fn process_ban_peer<'a, I: Iterator<Item = &'a str>>(&mut self, mut args: I, must_ban: bool) {
let node_id = match args
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/services/liveness/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl LivenessHandle {
}
}

/// Returns a fused event stream for the liveness service
/// Returns an event stream for the liveness service
pub fn get_event_stream(&self) -> LivenessEventReceiver {
self.event_stream_sender.subscribe()
}
Expand Down

0 comments on commit a04a2a6

Please sign in to comment.