From 90f4f7411c1914c932d7a6d4321218c68f79a9c7 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Sun, 24 Oct 2021 18:34:42 +0400 Subject: [PATCH] fix: improve responsiveness of wallet base node switching --- .../wallet/src/base_node_service/config.rs | 2 +- .../wallet/src/base_node_service/mod.rs | 13 +++-- .../wallet/src/base_node_service/monitor.rs | 54 ++++++++++++----- .../wallet/src/base_node_service/service.rs | 58 +++++++------------ .../wallet/src/connectivity_service/handle.rs | 5 ++ 5 files changed, 74 insertions(+), 58 deletions(-) diff --git a/base_layer/wallet/src/base_node_service/config.rs b/base_layer/wallet/src/base_node_service/config.rs index e7468a809e..f6ee2637fb 100644 --- a/base_layer/wallet/src/base_node_service/config.rs +++ b/base_layer/wallet/src/base_node_service/config.rs @@ -36,7 +36,7 @@ pub struct BaseNodeServiceConfig { impl Default for BaseNodeServiceConfig { fn default() -> Self { Self { - base_node_monitor_refresh_interval: Duration::from_secs(5), + base_node_monitor_refresh_interval: Duration::from_secs(3), base_node_rpc_pool_size: 10, request_max_age: Duration::from_secs(60), event_channel_size: 250, diff --git a/base_layer/wallet/src/base_node_service/mod.rs b/base_layer/wallet/src/base_node_service/mod.rs index 94ff82aaf6..66d01d679a 100644 --- a/base_layer/wallet/src/base_node_service/mod.rs +++ b/base_layer/wallet/src/base_node_service/mod.rs @@ -82,7 +82,7 @@ where T: WalletBackend + 'static context.spawn_when_ready(move |handles| async move { let wallet_connectivity = handles.expect_handle::(); - let service = BaseNodeService::new( + let result = BaseNodeService::new( config, request_stream, wallet_connectivity, @@ -90,10 +90,13 @@ where T: WalletBackend + 'static handles.get_shutdown_signal(), db, ) - .start(); - futures::pin_mut!(service); - let _ = service.await; - info!(target: LOG_TARGET, "Wallet Base Node Service shutdown"); + .start() + .await; + + info!( + target: LOG_TARGET, + "Wallet Base Node Service shutdown with result {:?}", result + ); }); Ok(()) diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index ba46b481ef..49218f31af 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -30,14 +30,16 @@ use crate::{ storage::database::{WalletBackend, WalletDatabase}, }; use chrono::Utc; +use futures::future; use log::*; use std::{ convert::TryFrom, + future::Future, sync::Arc, time::{Duration, Instant}, }; use tari_common_types::chain_metadata::ChainMetadata; -use tari_comms::protocol::rpc::RpcError; +use tari_comms::protocol::rpc::{RpcError, __macro_reexports::future::Either}; use tokio::{sync::RwLock, time}; const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor"; @@ -109,17 +111,18 @@ where } async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> { + let mut base_node_watch = self.wallet_connectivity.get_current_base_node_watcher(); loop { - let start = Instant::now(); + let timer = Instant::now(); let mut client = self .wallet_connectivity .obtain_base_node_wallet_rpc_client() .await .ok_or(BaseNodeMonitorError::NodeShuttingDown)?; - trace!( + debug!( target: LOG_TARGET, "Obtain RPC client {} ms", - start.elapsed().as_millis() + timer.elapsed().as_millis() ); let base_node_id = match self.wallet_connectivity.get_current_base_node_id() { @@ -127,15 +130,26 @@ where None => continue, }; - let start = Instant::now(); - let tip_info = client.get_tip_info().await?; + let timer = Instant::now(); + let tip_info = match interrupt(base_node_watch.changed(), client.get_tip_info()).await { + Some(tip_info) => tip_info?, + None => { + self.map_state(move |_| Default::default()).await; + continue; + }, + }; let chain_metadata = tip_info .metadata .ok_or_else(|| BaseNodeMonitorError::InvalidBaseNodeResponse("Tip info no metadata".to_string())) .and_then(|metadata| { ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse) })?; - let latency = start.elapsed(); + debug!(target: LOG_TARGET, "get_tip_info took {:.2?}", timer.elapsed()); + + let latency = match client.get_last_request_latency().await? { + Some(latency) => latency, + None => continue, + }; let is_synced = tip_info.is_synced; debug!( @@ -147,15 +161,8 @@ where latency.as_millis() ); - let start = Instant::now(); self.db.set_chain_metadata(chain_metadata.clone()).await?; - trace!( - target: LOG_TARGET, - "Update metadata in db {} ms", - start.elapsed().as_millis() - ); - let start = Instant::now(); self.map_state(move |_| BaseNodeState { chain_metadata: Some(chain_metadata), is_synced: Some(is_synced), @@ -163,9 +170,11 @@ where latency: Some(latency), }) .await; - trace!(target: LOG_TARGET, "Publish event {} ms", start.elapsed().as_millis()); - time::sleep(self.interval).await + let delay = time::sleep(self.interval.saturating_sub(latency)); + if interrupt(base_node_watch.changed(), delay).await.is_none() { + self.map_state(move |_| Default::default()).await; + } } // loop only exits on shutdown/error @@ -200,3 +209,16 @@ enum BaseNodeMonitorError { #[error("Wallet storage error: {0}")] WalletStorageError(#[from] WalletStorageError), } + +async fn interrupt(interrupt: F1, fut: F2) -> Option +where + F1: Future, + F2: Future, +{ + tokio::pin!(interrupt); + tokio::pin!(fut); + match future::select(interrupt, fut).await { + Either::Left(_) => None, + Either::Right((v, _)) => Some(v), + } +} diff --git a/base_layer/wallet/src/base_node_service/service.rs b/base_layer/wallet/src/base_node_service/service.rs index 416ba1eab5..c00af91d47 100644 --- a/base_layer/wallet/src/base_node_service/service.rs +++ b/base_layer/wallet/src/base_node_service/service.rs @@ -42,7 +42,7 @@ use tokio::sync::RwLock; const LOG_TARGET: &str = "wallet::base_node_service::service"; /// State determined from Base Node Service Requests -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] pub struct BaseNodeState { pub chain_metadata: Option, pub is_synced: Option, @@ -50,17 +50,6 @@ pub struct BaseNodeState { pub latency: Option, } -impl Default for BaseNodeState { - fn default() -> Self { - Self { - chain_metadata: None, - is_synced: None, - updated: None, - latency: None, - } - } -} - /// The base node service is responsible for handling requests to be sent to the connected base node. pub struct BaseNodeService where T: WalletBackend + 'static @@ -69,7 +58,7 @@ where T: WalletBackend + 'static request_stream: Option>>, wallet_connectivity: WalletConnectivityHandle, event_publisher: BaseNodeEventSender, - shutdown_signal: Option, + shutdown_signal: ShutdownSignal, state: Arc>, db: WalletDatabase, } @@ -90,7 +79,7 @@ where T: WalletBackend + 'static request_stream: Some(request_stream), wallet_connectivity, event_publisher, - shutdown_signal: Some(shutdown_signal), + shutdown_signal, state: Default::default(), db, } @@ -103,33 +92,13 @@ where T: WalletBackend + 'static /// Starts the service. pub async fn start(mut self) -> Result<(), BaseNodeServiceError> { - let shutdown_signal = self - .shutdown_signal - .take() - .expect("Wallet Base Node Service initialized without shutdown signal"); - - let monitor = BaseNodeMonitor::new( - self.config.base_node_monitor_refresh_interval, - self.state.clone(), - self.db.clone(), - self.wallet_connectivity.clone(), - self.event_publisher.clone(), - ); - - tokio::spawn({ - let shutdown_signal = shutdown_signal.clone(); - async move { - let monitor_fut = monitor.run(); - futures::pin_mut!(monitor_fut); - future::select(shutdown_signal, monitor_fut).await; - } - }); + self.spawn_monitor(); let mut request_stream = self .request_stream .take() .expect("Wallet Base Node Service initialized without request_stream") - .take_until(shutdown_signal); + .take_until(self.shutdown_signal.clone()); info!(target: LOG_TARGET, "Wallet Base Node Service started"); while let Some(request_context) = request_stream.next().await { @@ -152,6 +121,23 @@ where T: WalletBackend + 'static Ok(()) } + fn spawn_monitor(&self) { + let monitor = BaseNodeMonitor::new( + self.config.base_node_monitor_refresh_interval, + self.state.clone(), + self.db.clone(), + self.wallet_connectivity.clone(), + self.event_publisher.clone(), + ); + + let shutdown_signal = self.shutdown_signal.clone(); + tokio::spawn(async move { + let monitor_fut = monitor.run(); + futures::pin_mut!(monitor_fut); + future::select(shutdown_signal, monitor_fut).await; + }); + } + /// This handler is called when requests arrive from the various streams async fn handle_request( &mut self, diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index 77ef2ddab3..25916e51d9 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -59,6 +59,11 @@ impl WalletConnectivityHandle { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityHandle { fn set_base_node(&mut self, base_node_peer: Peer) { + if let Some(peer) = self.base_node_watch.borrow().as_ref() { + if peer.public_key == base_node_peer.public_key { + return; + } + } self.base_node_watch.send(Some(base_node_peer)); }