Skip to content

Commit

Permalink
Add disconnect/re-connect to base node monitor
Browse files Browse the repository at this point in the history
- This PR disconnects and re-sets a wallet's base node peer if a valid RPC
connection to that peer cannot be obtained, but only for the wallet's base
node monitor service. The spinoff is that all subsequent RPC connections to
the same peer/node will benefit from the comms connection service retrying
to re-establish the peer connection. As the wallet's base node monitor
service runs in a slow loop, it will assist other services that are
dependent on the same base node peer connection; if one service suffers
due to a bad/stale peer connection all services suffer.
- Also implementing sending transaction to all connected peers as per tari-project#3239
- Fixed the base_node_service_config not being initialized with values
from the config file.
  • Loading branch information
hansieodendaal committed Sep 1, 2021
1 parent cedb1d4 commit 7b9b3ae
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 3 deletions.
14 changes: 14 additions & 0 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
},
Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e);
// debug!(target: LOG_TARGET, "Reconnect current base node peer...");
// Disconnecting and re-setting the base node peer would revive a stale peer connection - most
// the client can do. As the wallet's base node monitor service runs in a slow loop, it will
// assist other services that are dependent on the same base node peer connection; if one service
// suffers due to a bad/stale peer connection all services suffer.
// self.reconnect_current_base_node_peer().await;
// time::sleep(self.interval).await;
continue;
},
Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) |
Expand All @@ -93,6 +100,13 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
);
}

// async fn reconnect_current_base_node_peer(&mut self) {
// if let Some(peer) = self.wallet_connectivity.get_current_base_node_peer() {
// self.wallet_connectivity.disconnect_base_node(peer.clone()).await;
// let _ = self.wallet_connectivity.set_base_node(peer).await;
// };
// }

async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> {
loop {
let mut client = self
Expand Down
8 changes: 8 additions & 0 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tokio::sync::{mpsc, oneshot, watch};
pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
DisconnectBaseNode(Box<Peer>),
}

#[derive(Clone)]
Expand Down Expand Up @@ -59,6 +60,13 @@ impl WalletConnectivityHandle {
Ok(())
}

pub async fn disconnect_base_node(&mut self, base_node_peer: Peer) {
let _ = self
.sender
.send(WalletConnectivityRequest::DisconnectBaseNode(Box::new(base_node_peer)))
.await;
}

/// Obtain a BaseNodeWalletRpcClient.
///
/// This can be relied on to obtain a pooled BaseNodeWalletRpcClient rpc session from a currently selected base
Expand Down
31 changes: 28 additions & 3 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ impl WalletConnectivityService {
ObtainBaseNodeSyncRpcClient(reply) => {
self.handle_pool_request(reply.into()).await;
},
DisconnectBaseNode(peer) => {
self.disconnect_base_node(peer.node_id.clone()).await;
},
}
}

Expand All @@ -154,6 +157,9 @@ impl WalletConnectivityService {
target: LOG_TARGET,
"Base node connection failed: {}. Reconnecting...", e
);
if let Some(node_id) = self.current_base_node() {
self.disconnect_base_node(node_id).await;
};
self.pending_requests.push(reply.into());
},
},
Expand Down Expand Up @@ -184,6 +190,9 @@ impl WalletConnectivityService {
target: LOG_TARGET,
"Base node connection failed: {}. Reconnecting...", e
);
if let Some(node_id) = self.current_base_node() {
self.disconnect_base_node(node_id).await;
};
self.pending_requests.push(reply.into());
},
},
Expand All @@ -204,6 +213,14 @@ impl WalletConnectivityService {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
}

async fn disconnect_base_node(&mut self, node_id: NodeId) {
if let Ok(Some(connection)) = self.connectivity.get_connection(node_id.clone()).await {
if connection.clone().disconnect().await.is_ok() {
debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id);
}
};
}

async fn setup_base_node_connection(&mut self) {
self.pools = None;
loop {
Expand All @@ -227,15 +244,18 @@ impl WalletConnectivityService {
},
Ok(false) => {
// Retry with updated peer
self.disconnect_base_node(node_id).await;
time::sleep(self.config.base_node_monitor_refresh_interval).await;
continue;
},
Err(e) => {
if self.current_base_node() != Some(node_id) {
if self.current_base_node() != Some(node_id.clone()) {
self.set_online_status(OnlineStatus::Connecting);
} else {
self.set_online_status(OnlineStatus::Offline);
}
warn!(target: LOG_TARGET, "{}", e);
self.disconnect_base_node(node_id).await;
time::sleep(self.config.base_node_monitor_refresh_interval).await;
continue;
},
Expand All @@ -249,9 +269,14 @@ impl WalletConnectivityService {

async fn try_setup_rpc_pool(&mut self, peer: NodeId) -> Result<bool, WalletConnectivityError> {
self.connectivity.add_managed_peers(vec![peer.clone()]).await?;
let conn = match self.try_dial_peer(peer).await? {
let conn = match self.try_dial_peer(peer.clone()).await? {
Some(peer) => peer,
None => return Ok(false),
None => {
return {
warn!(target: LOG_TARGET, "Could not dial base node peer {}", peer);
Ok(false)
}
},
};
debug!(
target: LOG_TARGET,
Expand Down

0 comments on commit 7b9b3ae

Please sign in to comment.