Skip to content

Commit

Permalink
fix: add backoff to reconnect attempts and add waker
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 20, 2024
1 parent 2016513 commit 0518b79
Showing 1 changed file with 46 additions and 9 deletions.
55 changes: 46 additions & 9 deletions src/p2p/addressbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
task::{Context, Poll},
};
use std::task::Waker;

#[derive(Default, Debug, Copy, Clone)]
pub struct Config {
Expand All @@ -37,10 +38,11 @@ pub struct Behaviour {
connections: HashMap<PeerId, HashSet<ConnectionId>>,
peer_addresses: HashMap<PeerId, HashSet<Multiaddr>>,
peer_keepalive: HashSet<PeerId>,
can_reconnect: HashMap<PeerId, (Duration, u8)>,
can_reconnect: HashMap<PeerId, (Duration, u8, bool)>,
peer_reconnect_attempts: HashMap<PeerId, u8>,
reconnect_peers: FutureMap<PeerId, Delay>,
config: Config,
waker: Option<Waker>,
}

impl Debug for Behaviour {
Expand Down Expand Up @@ -80,10 +82,14 @@ impl Behaviour {
self.keep_peer_alive(peer_id);
}

if let Some(opt) = opt.reconnect_opt() {
self.can_reconnect.insert(*peer_id, opt);
if let Some((duration, attempts)) = opt.reconnect_opt() {
self.can_reconnect.insert(*peer_id, (duration, attempts, false));
}


if let Some(waker) = self.waker.take() {
waker.wake();
}

true
}

Expand All @@ -100,13 +106,19 @@ impl Behaviour {
self.dont_keep_peer_alive(peer_id);
}
}
if let Some(waker) = self.waker.take() {
waker.wake();
}
true
}

pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
let removed = self.peer_addresses.remove(peer_id).is_some();
if removed {
self.dont_keep_peer_alive(peer_id);
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
removed
}
Expand Down Expand Up @@ -176,7 +188,13 @@ impl Behaviour {
.insert(connection_id);

self.reconnect_peers.remove(&peer_id);

self.peer_reconnect_attempts.remove(&peer_id);

if let Entry::Occupied(mut e) = self.can_reconnect.entry(peer_id) {
let (_, _, backoff) = e.get_mut();
*backoff = false;
}

if self.config.keep_connection_alive && !self.peer_keepalive.contains(&peer_id) {
self.keep_peer_alive(&peer_id);
}
Expand Down Expand Up @@ -208,12 +226,15 @@ impl Behaviour {
list.remove(&connection_id);
if list.is_empty() && remaining_established == 0 {
entry.remove();
if let Some((duration, attempts)) = self.can_reconnect.get(&peer_id) {
if *attempts == 0 {
if let Some((duration, attempts, backoff)) = self.can_reconnect.get(&peer_id) {
if *attempts == 0 || *backoff {
return;
}
self.reconnect_peers.insert(peer_id, Delay::new(*duration));
self.peer_reconnect_attempts.insert(peer_id, 0);
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}
}
Expand Down Expand Up @@ -285,16 +306,28 @@ impl Behaviour {
DialError::Transport(_) => {}
}

if let Some((duration, attempts)) = self.can_reconnect.get(&peer_id) {
if let Some((duration, attempts, backoff)) = self.can_reconnect.get_mut(&peer_id) {
let current_attempts = self.peer_reconnect_attempts.entry(peer_id).or_insert(0);
if *current_attempts >= *attempts {
let current_attempts = *current_attempts;
self.peer_reconnect_attempts.remove(&peer_id);
self.reconnect_peers.remove(&peer_id);
*backoff = true;
tracing::debug!(%peer_id, current_attempts, max_attempts = attempts, "unable to reconnect. backing off on attempts at reconnection");
return;
}


if *backoff {
return;
}

*current_attempts += 1;

tracing::info!(%peer_id, next_attempt = *current_attempts, max_attempts = attempts, "attempting reconnection to peer");

// We perform this check because entry may have been ejected from `FutureMap` at some point after its task was completed.
// In which case, we would check to determine if the entry exist and if so, reset the delay, otherwise insert a new one

if !self.reconnect_peers.contains_key(&peer_id) {
self.reconnect_peers.insert(peer_id, Delay::new(*duration));
} else {
Expand All @@ -304,6 +337,10 @@ impl Behaviour {
.expect("timer available");
timer.reset(*duration);
}

if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}

Expand Down

0 comments on commit 0518b79

Please sign in to comment.