Skip to content

Commit

Permalink
feat: track ping failures and disconnect (#3597)
Browse files Browse the repository at this point in the history
Description
---
Keep track of failed pings in liveness and disconnect peer's that do not respond.

Motivation and Context
---
This allows dead connections to be detected and disconnected. This is mainly advantageous in tor connections
which can silently disconnect.

How Has This Been Tested?
---
Unit test
Manually - though no occurrence of this case
  • Loading branch information
sdbondi authored Nov 21, 2021
1 parent 519290c commit 91fe921
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 32 deletions.
3 changes: 3 additions & 0 deletions base_layer/p2p/src/services/liveness/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct LivenessConfig {
pub num_peers_per_round: usize,
/// Peers to include in every auto ping round (Default: <empty>)
pub monitored_peers: Vec<NodeId>,
/// Number of ping failures to tolerate before disconnecting the peer. A value of zero disables this feature.
pub max_allowed_ping_failures: usize,
}

impl Default for LivenessConfig {
Expand All @@ -46,6 +48,7 @@ impl Default for LivenessConfig {
refresh_random_pool_interval: Duration::from_secs(2 * 60 * 60),
num_peers_per_round: 8,
monitored_peers: Default::default(),
max_allowed_ping_failures: 2,
}
}
}
4 changes: 3 additions & 1 deletion base_layer/p2p/src/services/liveness/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_comms::{connectivity::ConnectivityError, message::MessageError};
use tari_comms::{connectivity::ConnectivityError, message::MessageError, PeerConnectionError};
use tari_comms_dht::{outbound::DhtOutboundError, DhtActorError};
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;
Expand All @@ -31,6 +31,8 @@ pub enum LivenessError {
DhtOutboundError(#[from] DhtOutboundError),
#[error("Connectivity error: `{0}`")]
ConnectivityError(#[from] ConnectivityError),
#[error("Peer connection error: `{0}`")]
PeerConnectionError(#[from] PeerConnectionError),
#[error("DHT actor error: `{0}`")]
DhtActorError(#[from] DhtActorError),
#[error("Failed to send a pong message")]
Expand Down
27 changes: 26 additions & 1 deletion base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ where
if let Err(err) = self.start_ping_round().await {
warn!(target: LOG_TARGET, "Error when pinging peers: {}", err);
}
if self.config.max_allowed_ping_failures > 0 {
if let Err(err) = self.disconnect_failed_peers().await {
error!(target: LOG_TARGET, "Error occurred while disconnecting failed peers: {}", err);
}
}
},

// Incoming messages from the Comms layer
Expand Down Expand Up @@ -179,7 +184,7 @@ where
return Ok(());
}

let maybe_latency = self.state.record_pong(ping_pong_msg.nonce);
let maybe_latency = self.state.record_pong(ping_pong_msg.nonce, &node_id);
debug!(
target: LOG_TARGET,
"Received pong from peer '{}' with useragent '{}'. {} (Trace: {})",
Expand Down Expand Up @@ -285,6 +290,26 @@ where
Ok(())
}

async fn disconnect_failed_peers(&mut self) -> Result<(), LivenessError> {
let max_allowed_ping_failures = self.config.max_allowed_ping_failures;
for node_id in self
.state
.failed_pings_iter()
.filter(|(_, n)| **n > max_allowed_ping_failures)
.map(|(node_id, _)| node_id)
{
if let Some(mut conn) = self.connectivity.get_connection(node_id.clone()).await? {
debug!(
target: LOG_TARGET,
"Disconnecting peer {} that failed {} rounds of pings", node_id, max_allowed_ping_failures
);
conn.disconnect().await?;
}
}
self.state.clear_failed_pings();
Ok(())
}

fn publish_event(&mut self, event: LivenessEvent) {
let _ = self.event_publisher.send(Arc::new(event)).map_err(|_| {
trace!(
Expand Down
111 changes: 81 additions & 30 deletions base_layer/p2p/src/services/liveness/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use super::LOG_TARGET;
use crate::proto::liveness::MetadataKey;
use chrono::{NaiveDateTime, Utc};
use log::*;
use std::{
collections::{hash_map::RandomState, HashMap},
collections::HashMap,
convert::TryInto,
time::Duration,
time::{Duration, Instant},
};
use tari_comms::peer_manager::NodeId;

const LATENCY_SAMPLE_WINDOW_SIZE: usize = 25;
const MAX_INFLIGHT_TTL: Duration = Duration::from_secs(20);
const MAX_INFLIGHT_TTL: Duration = Duration::from_secs(40);

/// Represents metadata in a ping/pong message.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
Expand Down Expand Up @@ -62,7 +63,7 @@ impl From<HashMap<i32, Vec<u8>>> for Metadata {
}
}

impl From<Metadata> for HashMap<i32, Vec<u8>, RandomState> {
impl From<Metadata> for HashMap<i32, Vec<u8>> {
fn from(metadata: Metadata) -> Self {
metadata.inner
}
Expand All @@ -71,8 +72,9 @@ impl From<Metadata> for HashMap<i32, Vec<u8>, RandomState> {
/// State for the LivenessService.
#[derive(Default, Debug)]
pub struct LivenessState {
inflight_pings: HashMap<u64, (NodeId, NaiveDateTime)>,
inflight_pings: HashMap<u64, (NodeId, Instant)>,
peer_latency: HashMap<NodeId, AverageLatency>,
failed_pings: HashMap<NodeId, usize>,

pings_received: usize,
pongs_received: usize,
Expand Down Expand Up @@ -133,18 +135,27 @@ impl LivenessState {

/// Adds a ping to the inflight ping list, while noting the current time that a ping was sent.
pub fn add_inflight_ping(&mut self, nonce: u64, node_id: NodeId) {
let now = Utc::now().naive_utc();
self.inflight_pings.insert(nonce, (node_id, now));
self.inflight_pings.insert(nonce, (node_id, Instant::now()));
self.clear_stale_inflight_pings();
}

/// Clears inflight ping requests which have not responded
/// Clears inflight ping requests which have not responded and adds them to failed_ping counter
fn clear_stale_inflight_pings(&mut self) {
self.inflight_pings = self
let (inflight, expired) = self
.inflight_pings
.drain()
.filter(|(_, (_, time))| convert_to_std_duration(Utc::now().naive_utc() - *time) <= MAX_INFLIGHT_TTL)
.collect();
.partition(|(_, (_, time))| time.elapsed() <= MAX_INFLIGHT_TTL);

self.inflight_pings = inflight;

for (_, (node_id, _)) in expired {
self.failed_pings
.entry(node_id)
.and_modify(|v| {
*v = *v + 1;
})
.or_insert(1);
}
}

/// Returns true if the nonce is inflight, otherwise false
Expand All @@ -153,19 +164,25 @@ impl LivenessState {
}

/// Records a pong. Specifically, the pong counter is incremented and
/// a latency sample is added and calculated.
pub fn record_pong(&mut self, nonce: u64) -> Option<u32> {
/// a latency sample is added and calculated. The given `peer` must match the recorded peer
pub fn record_pong(&mut self, nonce: u64, sent_by: &NodeId) -> Option<u32> {
self.inc_pongs_received();

match self.inflight_pings.remove_entry(&nonce) {
Some((_, (node_id, sent_time))) => {
let now = Utc::now().naive_utc();
let latency = self
.add_latency_sample(node_id, convert_to_std_duration(now - sent_time))
.calc_average();
Some(latency)
},
None => None,
self.failed_pings.remove_entry(&sent_by);

let (node_id, _) = self.inflight_pings.get(&nonce)?;
if node_id == sent_by {
self.inflight_pings
.remove(&nonce)
.map(|(node_id, sent_time)| self.add_latency_sample(node_id, sent_time.elapsed()).calc_average())
} else {
warn!(
target: LOG_TARGET,
"Peer {} sent an nonce for another peer {}. This could indicate malicious behaviour or a bug. \
Ignoring.",
sent_by,
node_id
);
None
}
}

Expand Down Expand Up @@ -195,11 +212,14 @@ impl LivenessState {
// num_peers in map will always be > 0
.map(|latency| latency / num_peers as u32)
}
}

/// Convert `chrono::Duration` to `std::time::Duration`
pub(super) fn convert_to_std_duration(old_duration: chrono::Duration) -> Duration {
Duration::from_millis(old_duration.num_milliseconds() as u64)
pub fn failed_pings_iter(&self) -> impl Iterator<Item = (&NodeId, &usize)> {
self.failed_pings.iter()
}

pub fn clear_failed_pings(&mut self) {
self.failed_pings.clear();
}
}

/// A very simple implementation for calculating average latency. Samples are added in milliseconds and the mean average
Expand Down Expand Up @@ -299,9 +319,9 @@ mod test {
let mut state = LivenessState::new();

let node_id = NodeId::default();
state.add_inflight_ping(123, node_id);
state.add_inflight_ping(123, node_id.clone());

let latency = state.record_pong(123).unwrap();
let latency = state.record_pong(123, &node_id).unwrap();
assert!(latency < 50);
}

Expand All @@ -311,4 +331,35 @@ mod test {
state.set_metadata_entry(MetadataKey::ChainMetadata, b"dummy-data".to_vec());
assert_eq!(state.metadata().get(MetadataKey::ChainMetadata).unwrap(), b"dummy-data");
}

#[test]
fn clear_stale_inflight_pings() {
let mut state = LivenessState::new();

let peer1 = NodeId::default();
state.add_inflight_ping(1, peer1.clone());
let peer2 = NodeId::from_public_key(&Default::default());
state.add_inflight_ping(2, peer2.clone());
state.add_inflight_ping(3, peer2.clone());

assert!(state.failed_pings.get(&peer1).is_none());
assert!(state.failed_pings.get(&peer2).is_none());

// MAX_INFLIGHT_TTL passes
for n in [1, 2, 3] {
let (_, time) = state.inflight_pings.get_mut(&n).unwrap();
*time = Instant::now() - (MAX_INFLIGHT_TTL + Duration::from_secs(1));
}

state.clear_stale_inflight_pings();
let n = state.failed_pings.get(&peer1).unwrap();
assert_eq!(*n, 1);
let n = state.failed_pings.get(&peer2).unwrap();
assert_eq!(*n, 2);

assert!(state.record_pong(2, &peer2).is_none());
let n = state.failed_pings.get(&peer1).unwrap();
assert_eq!(*n, 1);
assert!(state.failed_pings.get(&peer2).is_none());
}
}

0 comments on commit 91fe921

Please sign in to comment.