From 8deb9c6b4ab6651f525d28a7e4f0b335cc1d6da1 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Tue, 2 Jun 2020 10:53:07 +0200 Subject: [PATCH] Reinitialize pools if node transitions to offline Fetch a fresh neighbour pool and random pool if node transitions to offline. --- .../core/src/base_node/states/block_sync.rs | 3 +- comms/dht/src/connectivity/mod.rs | 102 +++++++++++------- comms/dht/src/connectivity/test.rs | 47 +++++++- .../test_utils/mocks/connectivity_manager.rs | 19 +--- 4 files changed, 109 insertions(+), 62 deletions(-) diff --git a/base_layer/core/src/base_node/states/block_sync.rs b/base_layer/core/src/base_node/states/block_sync.rs index b9c403302a..fb3e680bcb 100644 --- a/base_layer/core/src/base_node/states/block_sync.rs +++ b/base_layer/core/src/base_node/states/block_sync.rs @@ -719,7 +719,7 @@ async fn exclude_sync_peer(sync_peers: &mut Vec, sync_peer: NodeId) -> R Ok(()) } -// Ban and disconnect the provided sync peer. +// Ban and disconnect the provided sync peer if this node is online async fn ban_sync_peer_if_online( shared: &mut BaseNodeStateMachine, sync_peers: &mut Vec, @@ -746,7 +746,6 @@ async fn ban_sync_peer( ) -> Result<(), BlockSyncError> { info!(target: LOG_TARGET, "Banning peer {} from local node.", sync_peer); - sync_peers.retain(|p| *p != sync_peer); shared.connectivity.ban_peer(sync_peer.clone(), ban_duration).await?; exclude_sync_peer(sync_peers, sync_peer).await } diff --git a/comms/dht/src/connectivity/mod.rs b/comms/dht/src/connectivity/mod.rs index 7737d42a41..9a828b8b94 100644 --- a/comms/dht/src/connectivity/mod.rs +++ b/comms/dht/src/connectivity/mod.rs @@ -176,6 +176,22 @@ impl DhtConnectivity { Ok(()) } + async fn reinitialize_pools(&mut self) -> Result<(), DhtConnectivityError> { + info!( + target: LOG_TARGET, + "Reinitializing neighbour pool. Draining neighbour list (len={})", + self.neighbours.len(), + ); + for neighbour in self.neighbours.drain(..) { + self.connectivity.remove_peer(neighbour).await?; + } + + self.initialize_neighbours().await?; + self.refresh_random_pool().await?; + + Ok(()) + } + async fn handle_connectivity_event(&mut self, event: &ConnectivityEvent) -> Result<(), DhtConnectivityError> { use ConnectivityEvent::*; match event { @@ -201,6 +217,9 @@ impl DhtConnectivity { } } }, + ConnectivityStateOffline => { + self.reinitialize_pools().await?; + }, _ => {}, } @@ -208,52 +227,53 @@ impl DhtConnectivity { } async fn refresh_random_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> { - if self.should_refresh_random_pool() { - let mut random_peers = self - .fetch_random_peers(self.config.num_random_nodes, &self.neighbours) - .await?; - if random_peers.is_empty() { - warn!( - target: LOG_TARGET, - "Unable to refresh random peer pool because there are insufficient known peers", - ); - } else { - let (keep, to_remove) = self - .random_pool - .iter() - .partition::, _>(|n| random_peers.contains(n)); - // Remove the peers that we want to keep from the `random_peers` to be added - random_peers.retain(|n| !keep.contains(&n)); - debug!( - target: LOG_TARGET, - "Adding new peers to random peer pool (#new = {}, #keeping = {}, #removing = {})", - random_peers.len(), - keep.len(), - to_remove.len() - ); - trace!( - target: LOG_TARGET, - "Random peers: Adding = {:?}, Removing = {:?}", - random_peers, - to_remove - ); - self.connectivity.add_managed_peers(random_peers).await?; - for n in to_remove { - self.connectivity.remove_peer(n.clone()).await?; - } - } - self.random_pool_last_refresh = Some(Instant::now()); + let should_refresh = self.config.num_random_nodes > 0 && + self.random_pool_last_refresh + .map(|instant| instant.elapsed() >= self.config.connectivity_random_pool_refresh) + .unwrap_or(true); + if should_refresh { + self.refresh_random_pool().await?; } Ok(()) } - #[inline] - fn should_refresh_random_pool(&self) -> bool { - self.config.num_random_nodes > 0 && - self.random_pool_last_refresh - .map(|instant| instant.elapsed() >= self.config.connectivity_random_pool_refresh) - .unwrap_or(true) + async fn refresh_random_pool(&mut self) -> Result<(), DhtConnectivityError> { + let mut random_peers = self + .fetch_random_peers(self.config.num_random_nodes, &self.neighbours) + .await?; + if random_peers.is_empty() { + warn!( + target: LOG_TARGET, + "Unable to refresh random peer pool because there are insufficient known peers", + ); + } else { + let (keep, to_remove) = self + .random_pool + .iter() + .partition::, _>(|n| random_peers.contains(n)); + // Remove the peers that we want to keep from the `random_peers` to be added + random_peers.retain(|n| !keep.contains(&n)); + debug!( + target: LOG_TARGET, + "Adding new peers to random peer pool (#new = {}, #keeping = {}, #removing = {})", + random_peers.len(), + keep.len(), + to_remove.len() + ); + trace!( + target: LOG_TARGET, + "Random peers: Adding = {:?}, Removing = {:?}", + random_peers, + to_remove + ); + self.connectivity.add_managed_peers(random_peers).await?; + for n in to_remove { + self.connectivity.remove_peer(n.clone()).await?; + } + } + self.random_pool_last_refresh = Some(Instant::now()); + Ok(()) } async fn handle_new_peer_connected(&mut self, conn: &PeerConnection) -> Result<(), DhtConnectivityError> { diff --git a/comms/dht/src/connectivity/test.rs b/comms/dht/src/connectivity/test.rs index c4e5b98adf..a73d6d7145 100644 --- a/comms/dht/src/connectivity/test.rs +++ b/comms/dht/src/connectivity/test.rs @@ -107,7 +107,7 @@ async fn initialize() { // Wait for calls to add peers async_assert!( - connectivity.call_count() >= 2, + connectivity.call_count().await >= 2, max_attempts = 20, interval = Duration::from_millis(10), ); @@ -146,7 +146,7 @@ async fn added_neighbours() { // Wait for calls to add peers async_assert!( - connectivity.call_count() >= 1, + connectivity.call_count().await >= 1, max_attempts = 20, interval = Duration::from_millis(10), ); @@ -158,7 +158,7 @@ async fn added_neighbours() { connectivity.publish_event(ConnectivityEvent::PeerConnected(conn)); async_assert!( - connectivity.call_count() >= 2, + connectivity.call_count().await >= 2, max_attempts = 20, interval = Duration::from_millis(10), ); @@ -173,6 +173,47 @@ async fn added_neighbours() { assert!(managed.contains(closer_peer.node_id())); } +#[tokio_macros::test_basic] +async fn reinitialize_pools_when_offline() { + let node_identity = make_node_identity(); + let node_identities = repeat_with(|| make_node_identity()).take(5).collect::>(); + // Closest to this node + let peers = node_identities.iter().map(|ni| ni.to_peer()).collect::>(); + + let config = DhtConfig { + num_neighbouring_nodes: 5, + num_random_nodes: 0, + ..Default::default() + }; + let (dht_connectivity, _, connectivity, _, _, _shutdown) = setup(config, node_identity, peers).await; + dht_connectivity.spawn(); + + // Wait for calls to add peers + async_assert!( + connectivity.call_count().await >= 1, + max_attempts = 20, + interval = Duration::from_millis(10), + ); + + let calls = connectivity.take_calls().await; + assert_eq!(count_string_occurrences(&calls, &["AddManagedPeers"]), 1); + + connectivity.publish_event(ConnectivityEvent::ConnectivityStateOffline); + + async_assert!( + connectivity.call_count().await >= 1, + max_attempts = 20, + interval = Duration::from_millis(10), + ); + let calls = connectivity.take_calls().await; + assert_eq!(count_string_occurrences(&calls, &["RemovePeer"]), 5); + assert_eq!(count_string_occurrences(&calls, &["AddManagedPeers"]), 1); + + // Check that the closer neighbour was added to managed peers + let managed = connectivity.get_managed_peers().await; + assert_eq!(managed.len(), 5); +} + #[tokio_macros::test_basic] async fn insert_neighbour() { let node_identity = make_node_identity(); diff --git a/comms/src/test_utils/mocks/connectivity_manager.rs b/comms/src/test_utils/mocks/connectivity_manager.rs index 454612e736..07a85927be 100644 --- a/comms/src/test_utils/mocks/connectivity_manager.rs +++ b/comms/src/test_utils/mocks/connectivity_manager.rs @@ -26,13 +26,7 @@ use crate::{ peer_manager::NodeId, }; use futures::{channel::mpsc, lock::Mutex, stream::Fuse, StreamExt}; -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; +use std::{collections::HashMap, sync::Arc}; use tokio::{sync::broadcast, task}; pub fn create_connectivity_mock() -> (ConnectivityRequester, ConnectivityManagerMock) { @@ -46,7 +40,6 @@ pub fn create_connectivity_mock() -> (ConnectivityRequester, ConnectivityManager #[derive(Debug, Clone)] pub struct ConnectivityManagerMockState { - call_count: Arc, calls: Arc>>, active_conns: Arc>>, selected_connections: Arc>>, @@ -57,7 +50,6 @@ pub struct ConnectivityManagerMockState { impl ConnectivityManagerMockState { pub fn new(event_tx: broadcast::Sender>) -> Self { Self { - call_count: Arc::new(AtomicUsize::new(0)), calls: Arc::new(Mutex::new(Vec::new())), event_tx, selected_connections: Arc::new(Mutex::new(Vec::new())), @@ -66,10 +58,6 @@ impl ConnectivityManagerMockState { } } - fn inc_call_count(&self) { - self.call_count.fetch_add(1, Ordering::SeqCst); - } - async fn add_call(&self, call_str: String) { self.calls.lock().await.push(call_str); } @@ -91,8 +79,8 @@ impl ConnectivityManagerMockState { } #[allow(dead_code)] - pub fn call_count(&self) -> usize { - self.call_count.load(Ordering::SeqCst) + pub async fn call_count(&self) -> usize { + self.calls.lock().await.len() } #[allow(dead_code)] @@ -139,7 +127,6 @@ impl ConnectivityManagerMock { async fn handle_request(&self, req: ConnectivityRequest) { use ConnectivityRequest::*; - self.state.inc_call_count(); self.state.add_call(format!("{:?}", req)).await; match req { DialPeer(node_id, reply_tx) => {