diff --git a/base_layer/core/src/mempool/service/service.rs b/base_layer/core/src/mempool/service/service.rs index b8ee487b9c..e7c2918fb3 100644 --- a/base_layer/core/src/mempool/service/service.rs +++ b/base_layer/core/src/mempool/service/service.rs @@ -490,7 +490,7 @@ async fn handle_outbound_tx( exclude_peers: Vec, ) -> Result<(), MempoolServiceError> { let result = outbound_message_service - .propagate( + .flood( NodeDestination::Unknown, OutboundEncryption::ClearText, exclude_peers, diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 5a2c3a7e76..90fdf62772 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -31,7 +31,11 @@ use crate::{ }; use chrono::Utc; use log::*; -use std::{convert::TryFrom, sync::Arc, time::Duration}; +use std::{ + convert::TryFrom, + sync::Arc, + time::{Duration, Instant}, +}; use tari_common_types::chain_metadata::ChainMetadata; use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcError}; use tokio::{sync::RwLock, time}; @@ -79,8 +83,13 @@ impl BaseNodeMonitor { Err(e @ BaseNodeMonitorError::RpcFailed(_)) => { warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e); debug!(target: LOG_TARGET, "Setting as OFFLINE and retrying...",); - self.set_offline().await; + // Disconnecting and re-setting the base node peer would revive a stale peer connection - most + // the client can do. As the wallet's base node monitor service runs in a slow loop, it will + // assist other services that are dependent on the same base node peer connection; if one service + // suffers due to a bad/stale peer connection all services suffer. + self.reconnect_current_base_node_peer().await; + time::delay_for(self.interval).await; continue; }, Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) | @@ -96,8 +105,9 @@ impl BaseNodeMonitor { ); } - async fn update_connectivity_status(&self) -> NodeId { + async fn update_connectivity_status(&mut self) -> NodeId { let mut watcher = self.wallet_connectivity.get_connectivity_status_watch(); + let mut start = Instant::now(); loop { use OnlineStatus::*; match watcher.recv().await.unwrap_or(Offline) { @@ -112,9 +122,27 @@ impl BaseNodeMonitor { self.set_offline().await; }, } + if Instant::now().duration_since(start) > self.interval { + warn!( + target: LOG_TARGET, + "Could not connect to base node in {}s", + Instant::now().duration_since(start).as_secs() + ); + // Disconnecting and re-setting the base node peer would revive a stale peer connection; this breaks + // the unending loop as watcher events are not guaranteed. + self.reconnect_current_base_node_peer().await; + start = Instant::now(); + } } } + async fn reconnect_current_base_node_peer(&mut self) { + if let Some(peer) = self.wallet_connectivity.get_current_base_node_peer() { + let _ = self.wallet_connectivity.disconnect_base_node(peer.clone()).await; + let _ = self.wallet_connectivity.set_base_node(peer).await; + }; + } + async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> { loop { let peer_node_id = self.update_connectivity_status().await; diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index ac218edc5e..ae6d2594be 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -36,6 +36,7 @@ use tokio::sync::watch; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), ObtainBaseNodeSyncRpcClient(oneshot::Sender>), + DisconnectBaseNode(Box), } #[derive(Clone)] @@ -63,6 +64,13 @@ impl WalletConnectivityHandle { Ok(()) } + pub async fn disconnect_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> { + self.sender + .send(WalletConnectivityRequest::DisconnectBaseNode(Box::new(base_node_peer))) + .await?; + Ok(()) + } + /// Obtain a BaseNodeWalletRpcClient. /// /// This can be relied on to obtain a pooled BaseNodeWalletRpcClient rpc session from a currently selected base diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index c0cf474b96..93c9482076 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -113,6 +113,10 @@ impl WalletConnectivityService { ObtainBaseNodeSyncRpcClient(reply) => { self.handle_pool_request(reply.into()).await; }, + + DisconnectBaseNode(peer) => { + self.disconnect_base_node(*peer).await; + }, } } @@ -205,6 +209,12 @@ impl WalletConnectivityService { self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) } + async fn disconnect_base_node(&mut self, peer: Peer) { + if let Ok(Some(connection)) = self.connectivity.get_connection(peer.node_id).await { + if connection.clone().disconnect().await.is_ok() {} + }; + } + async fn setup_base_node_connection(&mut self) { self.pools = None; loop {