diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 71c2cbc421..30164ebf00 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -130,11 +130,7 @@ where .obtain_base_node_wallet_rpc_client() .await .ok_or(BaseNodeMonitorError::NodeShuttingDown)?; - trace!( - target: LOG_TARGET, - "Obtain RPC client {} ms", - timer.elapsed().as_millis() - ); + trace!(target: LOG_TARGET, "Obtain RPC client {} ms", timer.elapsed().as_millis()); let base_node_id = match self.wallet_connectivity.get_current_base_node_peer_node_id() { Some(n) => n, @@ -155,22 +151,14 @@ where .and_then(|metadata| { ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse) })?; - trace!( - target: LOG_TARGET, - "Obtain tip info in {} ms", - timer.elapsed().as_millis() - ); + trace!(target: LOG_TARGET, "Obtain tip info in {} ms", timer.elapsed().as_millis()); let timer = Instant::now(); let latency = match client.get_last_request_latency() { Some(latency) => latency, None => continue, }; - trace!( - target: LOG_TARGET, - "Obtain latency info in {} ms", - timer.elapsed().as_millis() - ); + trace!(target: LOG_TARGET, "Obtain latency info in {} ms", timer.elapsed().as_millis()); self.db.set_chain_metadata(chain_metadata.clone())?; diff --git a/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs index 808bcd9fd3..405e53f586 100644 --- a/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs +++ b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs @@ -91,7 +91,7 @@ impl BaseNodePeerManager { } /// Get the last connection attempt stats - pub fn get_last_connection_attempt(&self) -> Option { + pub fn time_since_last_connection_attempt(&self) -> Option { if let Some(stats) = self.last_connection_attempt.clone() { if 
stats.peer_index == self.current_peer_index { Some(stats.attempt_time.elapsed()) @@ -106,7 +106,7 @@ impl BaseNodePeerManager { impl Display for BaseNodePeerManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let last_connection_attempt = match self.get_last_connection_attempt() { + let last_connection_attempt = match self.time_since_last_connection_attempt() { Some(stats) => format!("{:?}", stats.as_secs()), None => "Never".to_string(), }; diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index ea09debde0..ffbef54b7c 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{mem, time::Duration}; +use std::{collections::HashMap, mem, time::Duration}; use log::*; use tari_comms::{ @@ -61,7 +61,7 @@ pub struct WalletConnectivityService { connectivity: ConnectivityRequester, base_node_watch_receiver: watch::Receiver>, base_node_watch: Watch>, - pools: Option, + pools: HashMap, online_status_watch: Watch, pending_requests: Vec, } @@ -85,7 +85,7 @@ impl WalletConnectivityService { connectivity, base_node_watch_receiver: base_node_watch.get_receiver(), base_node_watch, - pools: None, + pools: HashMap::new(), pending_requests: Vec::new(), online_status_watch, } @@ -105,7 +105,8 @@ impl WalletConnectivityService { Ok(_) = self.base_node_watch_receiver.changed() => { if self.base_node_watch_receiver.borrow().is_some() { // This will block the rest until the connection is established. This is what we want. 
- self.setup_base_node_connection().await; + trace!(target: LOG_TARGET, "start: base_node_watch_receiver.changed"); + self.check_connection().await; } }, @@ -114,6 +115,7 @@ impl WalletConnectivityService { }, _ = check_connection.tick() => { + trace!(target: LOG_TARGET, "start: check_connection.tick"); self.check_connection().await; } } @@ -121,39 +123,40 @@ impl WalletConnectivityService { } async fn check_connection(&mut self) { - match self.pools.as_ref() { - Some(pool) => { - if !pool.base_node_wallet_rpc_client.is_connected().await { - if let Some(current_base_node) = self.current_base_node() { - if let Ok(Some(connection)) = self.connectivity.get_connection(current_base_node.clone()).await - { - if connection.is_connected() { - trace!( - target: LOG_TARGET, - "Current base node peer '{}' is connected, but pool says otherwise", - current_base_node - ); - } + if let Some(peer_manager) = self.get_base_node_peer_manager() { + let current_base_node = peer_manager.get_current_peer().node_id.clone(); + trace!(target: LOG_TARGET, "check_connection: has current_base_node"); + if let Ok(Some(connection)) = self.connectivity.get_connection(current_base_node.clone()).await { + trace!(target: LOG_TARGET, "check_connection: has connection"); + if connection.is_connected() { + trace!(target: LOG_TARGET, "check_connection: is connected"); + if let Some(pool) = self.pools.get(&current_base_node) { + trace!(target: LOG_TARGET, "check_connection: has rpc pool"); + if pool.base_node_wallet_rpc_client.is_connected().await { + trace!(target: LOG_TARGET, "check_connection: rpc pool is already connected"); + self.set_online_status(OnlineStatus::Online); + return; } + debug!( + target: LOG_TARGET, + "Peer RPC connection '{:?}' lost. Attempting to reconnect...", + self.current_base_node() + ); } - - debug!( - target: LOG_TARGET, - "Peer connection '{:?}' lost. 
Attempting to reconnect...", - self.current_base_node() - ); - self.setup_base_node_connection().await; + trace!(target: LOG_TARGET, "check_connection: no rpc pool for connection"); } - }, - None => { - debug!( - target: LOG_TARGET, - "No connection to '{:?}'. Attempting to connect...", - self.current_base_node() - ); - self.set_online_status(OnlineStatus::Offline); - self.setup_base_node_connection().await; - }, + trace!(target: LOG_TARGET, "check_connection: current base node has connection but not connected"); + } + trace!( + target: LOG_TARGET, + "check_connection: current base node has no connection, setup connection to: '{}'", + peer_manager + ); + self.set_online_status(OnlineStatus::Connecting); + self.setup_base_node_connection().await; + } else { + self.set_online_status(OnlineStatus::Offline); + debug!(target: LOG_TARGET, "Base node peer manger has not been set, cannot connect"); } } @@ -181,15 +184,25 @@ impl WalletConnectivityService { &mut self, reply: oneshot::Sender>, ) { - match self.pools { - Some(ref pools) => match pools.base_node_wallet_rpc_client.get().await { + let node_id = if let Some(val) = self.current_base_node() { + val + } else { + self.pending_requests.push(reply.into()); + warn!(target: LOG_TARGET, "{} wallet requests waiting for connection", self.pending_requests.len()); + return; + }; + + match self.pools.get(&node_id) { + Some(pools) => match pools.base_node_wallet_rpc_client.get().await { Ok(client) => { let _result = reply.send(client); }, Err(e) => { warn!( target: LOG_TARGET, - "Base node connection failed: {}. Reconnecting...", e + "Base node '{}' wallet RPC pool connection failed ({}). 
Reconnecting...", + node_id, + e ); if let Some(node_id) = self.current_base_node() { self.disconnect_base_node(node_id).await; @@ -199,13 +212,12 @@ impl WalletConnectivityService { }, None => { self.pending_requests.push(reply.into()); - if self.base_node_watch_receiver.borrow().is_none() { - warn!( - target: LOG_TARGET, - "{} requests are waiting for base node to be set", - self.pending_requests.len() - ); - } + warn!( + target: LOG_TARGET, + "Wallet RPC pool for base node `{}` not found, {} requests waiting", + node_id, + self.pending_requests.len() + ); }, } } @@ -214,15 +226,25 @@ impl WalletConnectivityService { &mut self, reply: oneshot::Sender>, ) { - match self.pools { - Some(ref pools) => match pools.base_node_sync_rpc_client.get().await { + let node_id = if let Some(val) = self.current_base_node() { + val + } else { + self.pending_requests.push(reply.into()); + warn!(target: LOG_TARGET, "{} sync requests waiting for connection", self.pending_requests.len()); + return; + }; + + match self.pools.get(&node_id) { + Some(pools) => match pools.base_node_sync_rpc_client.get().await { Ok(client) => { let _result = reply.send(client); }, Err(e) => { warn!( target: LOG_TARGET, - "Base node connection failed: {}. Reconnecting...", e + "Base node '{}' sync RPC pool connection failed ({}). 
Reconnecting...", + node_id, + e ); if let Some(node_id) = self.current_base_node() { self.disconnect_base_node(node_id).await; @@ -232,13 +254,12 @@ impl WalletConnectivityService { }, None => { self.pending_requests.push(reply.into()); - if self.base_node_watch_receiver.borrow().is_none() { - warn!( - target: LOG_TARGET, - "{} requests are waiting for base node to be set", - self.pending_requests.len() - ); - } + warn!( + target: LOG_TARGET, + "Sync RPC pool for base node `{}` not found, {} requests waiting", + node_id, + self.pending_requests.len() + ); }, } } @@ -260,36 +281,24 @@ impl WalletConnectivityService { Ok(_) => debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id), Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e), } - self.pools = None; + self.pools.remove(&node_id); }; } async fn setup_base_node_connection(&mut self) { - // Check current connection status - if let Some(current_base_node) = self.current_base_node() { - if let Ok(Some(connection)) = self.connectivity.get_connection(current_base_node.clone()).await { - if connection.is_connected() { - trace!(target: LOG_TARGET, "Already connected to base node peer: '{}'", current_base_node); - self.set_online_status(OnlineStatus::Online); - return; - } - } - } let mut peer_manager = if let Some(val) = self.get_base_node_peer_manager() { val } else { - self.set_online_status(OnlineStatus::Offline); return; }; - - // We are not connected or not connected to one of the required peers; establish connection - self.pools = None; - self.set_online_status(OnlineStatus::Connecting); - trace!(target: LOG_TARGET, "Setup base node connection to: '{}'", peer_manager); loop { - let node_id = if let Some(time) = peer_manager.get_last_connection_attempt() { + let node_id = if let Some(time) = peer_manager.time_since_last_connection_attempt() { if time < Duration::from_secs(COOL_OFF_PERIOD) { - peer_manager.get_next_peer().node_id + if peer_manager.get_current_peer().node_id 
== peer_manager.get_next_peer().node_id { + // If we only have one peer in the list, wait a bit before retrying + time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await; + } + peer_manager.get_current_peer().node_id } else { peer_manager.get_current_peer().node_id } @@ -302,10 +311,23 @@ impl WalletConnectivityService { target: LOG_TARGET, "Attempting base node peer '{}'... (last attempt {:?})", node_id, - peer_manager.get_last_connection_attempt() + peer_manager.time_since_last_connection_attempt() ); + self.pools.remove(&node_id); match self.try_setup_rpc_pool(node_id.clone()).await { Ok(true) => { + if self.peer_list_change_detected(&peer_manager) { + debug!( + target: LOG_TARGET, + "The peer list has changed while connecting, aborting connection attempt." + ); + self.set_online_status(OnlineStatus::Offline); + break; + } + self.base_node_watch.send(Some(peer_manager.clone())); + if let Err(e) = self.notify_pending_requests().await { + warn!(target: LOG_TARGET, "Error notifying pending RPC requests: {}", e); + } self.set_online_status(OnlineStatus::Online); debug!( target: LOG_TARGET, @@ -318,7 +340,6 @@ impl WalletConnectivityService { target: LOG_TARGET, "The peer has changed while connecting. Attempting to connect to new base node." ); - continue; }, Err(WalletConnectivityError::ConnectivityError(ConnectivityError::DialCancelled)) => { debug!( @@ -327,7 +348,6 @@ impl WalletConnectivityService { Duration::from_secs(CONNECTIVITY_WAIT).as_secs() ); time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await; - continue; }, Err(e) => { warn!(target: LOG_TARGET, "{}", e); @@ -335,23 +355,47 @@ impl WalletConnectivityService { self.disconnect_base_node(node_id).await; time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await; } - continue; }, } + if self.peer_list_change_detected(&peer_manager) { + debug!( + target: LOG_TARGET, + "The peer list has changed while connecting, aborting connection attempt." 
+ ); + self.set_online_status(OnlineStatus::Offline); + break; + } } + } - self.base_node_watch.send(Some(peer_manager)); + fn peer_list_change_detected(&self, peer_manager: &BaseNodePeerManager) -> bool { + if let Some(current) = self.get_base_node_peer_manager() { + current + .get_state() + .1 + .iter() + .map(|p| p.node_id.clone()) + .collect::>() != + peer_manager + .get_state() + .1 + .iter() + .map(|p| p.node_id.clone()) + .collect::>() + } else { + true + } } fn set_online_status(&self, status: OnlineStatus) { self.online_status_watch.send(status); } - async fn try_setup_rpc_pool(&mut self, peer: NodeId) -> Result { - let conn = match self.try_dial_peer(peer.clone()).await? { + async fn try_setup_rpc_pool(&mut self, peer_node_id: NodeId) -> Result { + let conn = match self.try_dial_peer(peer_node_id.clone()).await? { Some(c) => c, None => { - warn!(target: LOG_TARGET, "Could not dial base node peer '{}'", peer); + warn!(target: LOG_TARGET, "Could not dial base node peer '{}'", peer_node_id); return Ok(false); }, }; @@ -360,13 +404,12 @@ impl WalletConnectivityService { "Successfully established peer connection to base node '{}'", conn.peer_node_id() ); - self.pools = Some(ClientPoolContainer { + self.pools.insert(peer_node_id.clone(), ClientPoolContainer { base_node_sync_rpc_client: conn.create_rpc_client_pool(1, Default::default()), base_node_wallet_rpc_client: conn .create_rpc_client_pool(self.config.base_node_rpc_pool_size, Default::default()), }); - self.notify_pending_requests().await?; - debug!(target: LOG_TARGET, "Successfully established RPC connection to '{}'", peer); + debug!(target: LOG_TARGET, "Successfully established RPC connection to base node '{}'", peer_node_id); Ok(true) } diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index 7b8739d7e6..78ec9ea055 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ 
-103,6 +103,7 @@ async fn it_dials_peer_when_base_node_is_set() { #[tokio::test] async fn it_resolves_many_pending_rpc_session_requests() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` let (mut handle, mock_server, mock_state, _shutdown) = setup().await; let base_node_peer = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await; @@ -132,7 +133,8 @@ async fn it_resolves_many_pending_rpc_session_requests() { } #[tokio::test] -async fn it_changes_to_a_new_base_node() { +async fn it_changes_to_a_new_base_node_if_online() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` let (mut handle, mock_server, mock_state, _shutdown) = setup().await; let base_node_peer1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let conn1 = mock_server.create_mockimpl_connection(base_node_peer1.to_peer()).await; @@ -221,6 +223,7 @@ async fn it_changes_to_a_new_base_node_if_preferred_is_offline() { #[tokio::test] async fn it_gracefully_handles_connect_fail_reconnect() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` let (mut handle, mock_server, mock_state, _shutdown) = setup().await; let base_node_peer = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let mut conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await;