Skip to content

Commit

Permalink
Reinitialize pools if node transitions to offline (#1941)
Browse files Browse the repository at this point in the history
Merge pull request #1941

Fetch a fresh neighbour pool and random pool if node transitions to
offline.

* pull/1941/head:
  Reinitialize pools if node transitions to offline
  • Loading branch information
sdbondi committed Jun 3, 2020
2 parents ca24b0a + 8deb9c6 commit 997b14d
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 62 deletions.
3 changes: 1 addition & 2 deletions base_layer/core/src/base_node/states/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ async fn exclude_sync_peer(sync_peers: &mut Vec<NodeId>, sync_peer: NodeId) -> R
Ok(())
}

// Ban and disconnect the provided sync peer.
// Ban and disconnect the provided sync peer if this node is online
async fn ban_sync_peer_if_online<B: BlockchainBackend + 'static>(
shared: &mut BaseNodeStateMachine<B>,
sync_peers: &mut Vec<NodeId>,
Expand All @@ -746,7 +746,6 @@ async fn ban_sync_peer<B: BlockchainBackend + 'static>(
) -> Result<(), BlockSyncError>
{
info!(target: LOG_TARGET, "Banning peer {} from local node.", sync_peer);
sync_peers.retain(|p| *p != sync_peer);
shared.connectivity.ban_peer(sync_peer.clone(), ban_duration).await?;
exclude_sync_peer(sync_peers, sync_peer).await
}
Expand Down
102 changes: 61 additions & 41 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ impl DhtConnectivity {
Ok(())
}

/// Throw away both managed peer pools and rebuild them from scratch.
///
/// Every current neighbour is removed from connectivity management, after
/// which the neighbour pool is re-initialized and the random pool refreshed.
async fn reinitialize_pools(&mut self) -> Result<(), DhtConnectivityError> {
    info!(
        target: LOG_TARGET,
        "Reinitializing neighbour pool. Draining neighbour list (len={})",
        self.neighbours.len(),
    );

    // Take ownership of the current neighbour list up front, then remove
    // each peer from connectivity management.
    let stale_neighbours: Vec<_> = self.neighbours.drain(..).collect();
    for peer in stale_neighbours {
        self.connectivity.remove_peer(peer).await?;
    }

    self.initialize_neighbours().await?;
    self.refresh_random_pool().await?;
    Ok(())
}

async fn handle_connectivity_event(&mut self, event: &ConnectivityEvent) -> Result<(), DhtConnectivityError> {
use ConnectivityEvent::*;
match event {
Expand All @@ -201,59 +217,63 @@ impl DhtConnectivity {
}
}
},
ConnectivityStateOffline => {
self.reinitialize_pools().await?;
},
_ => {},
}

Ok(())
}

async fn refresh_random_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
if self.should_refresh_random_pool() {
let mut random_peers = self
.fetch_random_peers(self.config.num_random_nodes, &self.neighbours)
.await?;
if random_peers.is_empty() {
warn!(
target: LOG_TARGET,
"Unable to refresh random peer pool because there are insufficient known peers",
);
} else {
let (keep, to_remove) = self
.random_pool
.iter()
.partition::<Vec<_>, _>(|n| random_peers.contains(n));
// Remove the peers that we want to keep from the `random_peers` to be added
random_peers.retain(|n| !keep.contains(&n));
debug!(
target: LOG_TARGET,
"Adding new peers to random peer pool (#new = {}, #keeping = {}, #removing = {})",
random_peers.len(),
keep.len(),
to_remove.len()
);
trace!(
target: LOG_TARGET,
"Random peers: Adding = {:?}, Removing = {:?}",
random_peers,
to_remove
);
self.connectivity.add_managed_peers(random_peers).await?;
for n in to_remove {
self.connectivity.remove_peer(n.clone()).await?;
}
}
self.random_pool_last_refresh = Some(Instant::now());
let should_refresh = self.config.num_random_nodes > 0 &&
self.random_pool_last_refresh
.map(|instant| instant.elapsed() >= self.config.connectivity_random_pool_refresh)
.unwrap_or(true);
if should_refresh {
self.refresh_random_pool().await?;
}

Ok(())
}

#[inline]
fn should_refresh_random_pool(&self) -> bool {
self.config.num_random_nodes > 0 &&
self.random_pool_last_refresh
.map(|instant| instant.elapsed() >= self.config.connectivity_random_pool_refresh)
.unwrap_or(true)
/// Replace the random peer pool with a freshly fetched random selection.
///
/// Peers that are re-selected stay managed; peers no longer selected are
/// removed from connectivity management; newly selected peers are added.
/// Records the refresh time on completion regardless of whether any peers
/// were found.
async fn refresh_random_pool(&mut self) -> Result<(), DhtConnectivityError> {
    // Fetch candidates, excluding current neighbours.
    let mut random_peers = self
        .fetch_random_peers(self.config.num_random_nodes, &self.neighbours)
        .await?;
    if random_peers.is_empty() {
        warn!(
            target: LOG_TARGET,
            "Unable to refresh random peer pool because there are insufficient known peers",
        );
    } else {
        // Partition the existing pool: `keep` = re-selected peers,
        // `to_remove` = peers not in the fresh selection.
        let (keep, to_remove) = self
            .random_pool
            .iter()
            .partition::<Vec<_>, _>(|n| random_peers.contains(n));
        // Remove the peers that we want to keep from the `random_peers` to be added
        random_peers.retain(|n| !keep.contains(&n));
        debug!(
            target: LOG_TARGET,
            "Adding new peers to random peer pool (#new = {}, #keeping = {}, #removing = {})",
            random_peers.len(),
            keep.len(),
            to_remove.len()
        );
        trace!(
            target: LOG_TARGET,
            "Random peers: Adding = {:?}, Removing = {:?}",
            random_peers,
            to_remove
        );
        self.connectivity.add_managed_peers(random_peers).await?;
        for n in to_remove {
            self.connectivity.remove_peer(n.clone()).await?;
        }
    }
    // Timestamp the refresh even on the "insufficient peers" path so we
    // don't retry on every call.
    self.random_pool_last_refresh = Some(Instant::now());
    Ok(())
}

async fn handle_new_peer_connected(&mut self, conn: &PeerConnection) -> Result<(), DhtConnectivityError> {
Expand Down
47 changes: 44 additions & 3 deletions comms/dht/src/connectivity/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn initialize() {

// Wait for calls to add peers
async_assert!(
connectivity.call_count() >= 2,
connectivity.call_count().await >= 2,
max_attempts = 20,
interval = Duration::from_millis(10),
);
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn added_neighbours() {

// Wait for calls to add peers
async_assert!(
connectivity.call_count() >= 1,
connectivity.call_count().await >= 1,
max_attempts = 20,
interval = Duration::from_millis(10),
);
Expand All @@ -158,7 +158,7 @@ async fn added_neighbours() {
connectivity.publish_event(ConnectivityEvent::PeerConnected(conn));

async_assert!(
connectivity.call_count() >= 2,
connectivity.call_count().await >= 2,
max_attempts = 20,
interval = Duration::from_millis(10),
);
Expand All @@ -173,6 +173,47 @@ async fn added_neighbours() {
assert!(managed.contains(closer_peer.node_id()));
}

#[tokio_macros::test_basic]
async fn reinitialize_pools_when_offline() {
    let node_identity = make_node_identity();
    let node_identities = repeat_with(|| make_node_identity()).take(5).collect::<Vec<_>>();
    // Closest to this node
    let peers = node_identities.iter().map(|ni| ni.to_peer()).collect::<Vec<_>>();

    let config = DhtConfig {
        num_neighbouring_nodes: 5,
        num_random_nodes: 0,
        ..Default::default()
    };
    let (dht_connectivity, _, connectivity, _, _, _shutdown) = setup(config, node_identity, peers).await;
    dht_connectivity.spawn();

    // Wait for the initial AddManagedPeers call
    async_assert!(
        connectivity.call_count().await >= 1,
        max_attempts = 20,
        interval = Duration::from_millis(10),
    );

    let calls = connectivity.take_calls().await;
    assert_eq!(count_string_occurrences(&calls, &["AddManagedPeers"]), 1);

    connectivity.publish_event(ConnectivityEvent::ConnectivityStateOffline);

    // Reinitialization removes all 5 neighbours and re-adds them with a single
    // AddManagedPeers call. Wait for all 6 calls before draining, otherwise
    // take_calls can race the mock and the counts below flake.
    async_assert!(
        connectivity.call_count().await >= 6,
        max_attempts = 20,
        interval = Duration::from_millis(10),
    );
    let calls = connectivity.take_calls().await;
    assert_eq!(count_string_occurrences(&calls, &["RemovePeer"]), 5);
    assert_eq!(count_string_occurrences(&calls, &["AddManagedPeers"]), 1);

    // All 5 peers should be managed again after the pools were reinitialized
    let managed = connectivity.get_managed_peers().await;
    assert_eq!(managed.len(), 5);
}

#[tokio_macros::test_basic]
async fn insert_neighbour() {
let node_identity = make_node_identity();
Expand Down
19 changes: 3 additions & 16 deletions comms/src/test_utils/mocks/connectivity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ use crate::{
peer_manager::NodeId,
};
use futures::{channel::mpsc, lock::Mutex, stream::Fuse, StreamExt};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use std::{collections::HashMap, sync::Arc};
use tokio::{sync::broadcast, task};

pub fn create_connectivity_mock() -> (ConnectivityRequester, ConnectivityManagerMock) {
Expand All @@ -46,7 +40,6 @@ pub fn create_connectivity_mock() -> (ConnectivityRequester, ConnectivityManager

#[derive(Debug, Clone)]
pub struct ConnectivityManagerMockState {
call_count: Arc<AtomicUsize>,
calls: Arc<Mutex<Vec<String>>>,
active_conns: Arc<Mutex<HashMap<NodeId, PeerConnection>>>,
selected_connections: Arc<Mutex<Vec<PeerConnection>>>,
Expand All @@ -57,7 +50,6 @@ pub struct ConnectivityManagerMockState {
impl ConnectivityManagerMockState {
pub fn new(event_tx: broadcast::Sender<Arc<ConnectivityEvent>>) -> Self {
Self {
call_count: Arc::new(AtomicUsize::new(0)),
calls: Arc::new(Mutex::new(Vec::new())),
event_tx,
selected_connections: Arc::new(Mutex::new(Vec::new())),
Expand All @@ -66,10 +58,6 @@ impl ConnectivityManagerMockState {
}
}

// Atomically bump the running count of handled requests.
// NOTE(review): this counter and method are removed by this commit in
// favour of deriving the count from the recorded `calls` list.
fn inc_call_count(&self) {
self.call_count.fetch_add(1, Ordering::SeqCst);
}

/// Record a handled request (as its Debug string) for later inspection.
async fn add_call(&self, call_str: String) {
    let mut calls = self.calls.lock().await;
    calls.push(call_str);
}
Expand All @@ -91,8 +79,8 @@ impl ConnectivityManagerMockState {
}

#[allow(dead_code)]
pub fn call_count(&self) -> usize {
self.call_count.load(Ordering::SeqCst)
pub async fn call_count(&self) -> usize {
self.calls.lock().await.len()
}

#[allow(dead_code)]
Expand Down Expand Up @@ -139,7 +127,6 @@ impl ConnectivityManagerMock {

async fn handle_request(&self, req: ConnectivityRequest) {
use ConnectivityRequest::*;
self.state.inc_call_count();
self.state.add_call(format!("{:?}", req)).await;
match req {
DialPeer(node_id, reply_tx) => {
Expand Down

0 comments on commit 997b14d

Please sign in to comment.