diff --git a/src/p2p/addressbook.rs b/src/p2p/addressbook.rs index 7db89e878..5416c22cc 100644 --- a/src/p2p/addressbook.rs +++ b/src/p2p/addressbook.rs @@ -22,6 +22,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, task::{Context, Poll}, }; +use std::task::Waker; #[derive(Default, Debug, Copy, Clone)] pub struct Config { @@ -37,10 +38,11 @@ pub struct Behaviour { connections: HashMap>, peer_addresses: HashMap>, peer_keepalive: HashSet, - can_reconnect: HashMap, + can_reconnect: HashMap, peer_reconnect_attempts: HashMap, reconnect_peers: FutureMap, config: Config, + waker: Option, } impl Debug for Behaviour { @@ -80,10 +82,14 @@ impl Behaviour { self.keep_peer_alive(peer_id); } - if let Some(opt) = opt.reconnect_opt() { - self.can_reconnect.insert(*peer_id, opt); + if let Some((duration, attempts)) = opt.reconnect_opt() { + self.can_reconnect.insert(*peer_id, (duration, attempts, false)); } - + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + true } @@ -100,6 +106,9 @@ impl Behaviour { self.dont_keep_peer_alive(peer_id); } } + if let Some(waker) = self.waker.take() { + waker.wake(); + } true } @@ -107,6 +116,9 @@ impl Behaviour { let removed = self.peer_addresses.remove(peer_id).is_some(); if removed { self.dont_keep_peer_alive(peer_id); + if let Some(waker) = self.waker.take() { + waker.wake(); + } } removed } @@ -176,7 +188,13 @@ impl Behaviour { .insert(connection_id); self.reconnect_peers.remove(&peer_id); - + self.peer_reconnect_attempts.remove(&peer_id); + + if let Entry::Occupied(mut e) = self.can_reconnect.entry(peer_id) { + let (_, _, backoff) = e.get_mut(); + *backoff = false; + } + if self.config.keep_connection_alive && !self.peer_keepalive.contains(&peer_id) { self.keep_peer_alive(&peer_id); } @@ -208,12 +226,15 @@ impl Behaviour { list.remove(&connection_id); if list.is_empty() && remaining_established == 0 { entry.remove(); - if let Some((duration, attempts)) = self.can_reconnect.get(&peer_id) { - if *attempts == 0 { + if let Some((duration, attempts, backoff)) = self.can_reconnect.get(&peer_id) { + if *attempts == 0 || *backoff { return; } self.reconnect_peers.insert(peer_id, Delay::new(*duration)); self.peer_reconnect_attempts.insert(peer_id, 0); + if let Some(waker) = self.waker.take() { + waker.wake(); + } } } } @@ -285,16 +306,28 @@ impl Behaviour { DialError::Transport(_) => {} } - if let Some((duration, attempts)) = self.can_reconnect.get(&peer_id) { + if let Some((duration, attempts, backoff)) = self.can_reconnect.get_mut(&peer_id) { let current_attempts = self.peer_reconnect_attempts.entry(peer_id).or_insert(0); if *current_attempts >= *attempts { + let current_attempts = *current_attempts; self.peer_reconnect_attempts.remove(&peer_id); self.reconnect_peers.remove(&peer_id); + *backoff = true; + tracing::debug!(%peer_id, current_attempts, max_attempts = attempts, "unable to reconnect. backing off on attempts at reconnection"); return; } - + + if *backoff { + return; + } + *current_attempts += 1; + tracing::info!(%peer_id, next_attempt = *current_attempts, max_attempts = attempts, "attempting reconnection to peer"); + + // We perform this check because entry may have been ejected from `FutureMap` at some point after its task was completed. + // In which case, we would check to determine if the entry exist and if so, reset the delay, otherwise insert a new one + if !self.reconnect_peers.contains_key(&peer_id) { self.reconnect_peers.insert(peer_id, Delay::new(*duration)); } else { @@ -304,6 +337,10 @@ impl Behaviour { .expect("timer available"); timer.reset(*duration); } + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } }