diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 8e0298ca27..da4e6af512 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -31,7 +31,11 @@ use crate::{ }; use chrono::Utc; use log::*; -use std::{convert::TryFrom, sync::Arc, time::Duration}; +use std::{ + convert::TryFrom, + sync::Arc, + time::{Duration, Instant}, +}; use tari_common_types::chain_metadata::ChainMetadata; use tari_comms::protocol::rpc::RpcError; use tokio::{sync::RwLock, time}; @@ -78,6 +82,13 @@ impl BaseNodeMonitor { }, Err(e @ BaseNodeMonitorError::RpcFailed(_)) => { warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e); + debug!(target: LOG_TARGET, "Reconnect current base node peer..."); + // Disconnecting and re-setting the base node peer would revive a stale peer connection - most + // the client can do. As the wallet's base node monitor service runs in a slow loop, it will + // assist other services that are dependent on the same base node peer connection; if one service + // suffers due to a bad/stale peer connection all services suffer. + self.reconnect_current_base_node_peer().await; + time::delay_for(self.interval).await; continue; }, Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) | @@ -93,6 +104,13 @@ impl BaseNodeMonitor { ); } + async fn reconnect_current_base_node_peer(&mut self) { + if let Some(peer) = self.wallet_connectivity.get_current_base_node_peer() { + let _ = self.wallet_connectivity.disconnect_base_node(peer.clone()).await; + let _ = self.wallet_connectivity.set_base_node(peer).await; + }; + } + async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> { loop { let mut client = self diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index 5a35696e14..6a310bb2dd 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -32,6 +32,7 @@ use tokio::sync::{mpsc, oneshot, watch}; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), ObtainBaseNodeSyncRpcClient(oneshot::Sender>), + DisconnectBaseNode(Box), } #[derive(Clone)] @@ -59,6 +60,13 @@ impl WalletConnectivityHandle { Ok(()) } + pub async fn disconnect_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> { + self.sender + .send(WalletConnectivityRequest::DisconnectBaseNode(Box::new(base_node_peer))) + .await?; + Ok(()) + } + /// Obtain a BaseNodeWalletRpcClient. /// /// This can be relied on to obtain a pooled BaseNodeWalletRpcClient rpc session from a currently selected base diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index 950b9a9a72..8cf881aff9 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -129,6 +129,10 @@ impl WalletConnectivityService { ObtainBaseNodeSyncRpcClient(reply) => { self.handle_pool_request(reply.into()).await; }, + + DisconnectBaseNode(peer) => { + self.disconnect_base_node(*peer).await; + }, } } @@ -204,6 +208,12 @@ impl WalletConnectivityService { self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) } + async fn disconnect_base_node(&mut self, peer: Peer) { + if let Ok(Some(connection)) = self.connectivity.get_connection(peer.node_id).await { + if connection.clone().disconnect().await.is_ok() {} + }; + } + async fn setup_base_node_connection(&mut self) { self.pools = None; loop {