Skip to content

Commit

Permalink
fix: improve responsiveness of wallet base node switching
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Oct 24, 2021
1 parent 9bd4760 commit 90f4f74
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 58 deletions.
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct BaseNodeServiceConfig {
impl Default for BaseNodeServiceConfig {
fn default() -> Self {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(5),
base_node_monitor_refresh_interval: Duration::from_secs(3),
base_node_rpc_pool_size: 10,
request_max_age: Duration::from_secs(60),
event_channel_size: 250,
Expand Down
13 changes: 8 additions & 5 deletions base_layer/wallet/src/base_node_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,21 @@ where T: WalletBackend + 'static
context.spawn_when_ready(move |handles| async move {
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();

let service = BaseNodeService::new(
let result = BaseNodeService::new(
config,
request_stream,
wallet_connectivity,
event_publisher,
handles.get_shutdown_signal(),
db,
)
.start();
futures::pin_mut!(service);
let _ = service.await;
info!(target: LOG_TARGET, "Wallet Base Node Service shutdown");
.start()
.await;

info!(
target: LOG_TARGET,
"Wallet Base Node Service shutdown with result {:?}", result
);
});

Ok(())
Expand Down
54 changes: 38 additions & 16 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ use crate::{
storage::database::{WalletBackend, WalletDatabase},
};
use chrono::Utc;
use futures::future;
use log::*;
use std::{
convert::TryFrom,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::protocol::rpc::RpcError;
use tari_comms::protocol::rpc::{RpcError, __macro_reexports::future::Either};
use tokio::{sync::RwLock, time};

const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";
Expand Down Expand Up @@ -109,33 +111,45 @@ where
}

async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> {
let mut base_node_watch = self.wallet_connectivity.get_current_base_node_watcher();
loop {
let start = Instant::now();
let timer = Instant::now();
let mut client = self
.wallet_connectivity
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
trace!(
debug!(
target: LOG_TARGET,
"Obtain RPC client {} ms",
start.elapsed().as_millis()
timer.elapsed().as_millis()
);

let base_node_id = match self.wallet_connectivity.get_current_base_node_id() {
Some(n) => n,
None => continue,
};

let start = Instant::now();
let tip_info = client.get_tip_info().await?;
let timer = Instant::now();
let tip_info = match interrupt(base_node_watch.changed(), client.get_tip_info()).await {
Some(tip_info) => tip_info?,
None => {
self.map_state(move |_| Default::default()).await;
continue;
},
};
let chain_metadata = tip_info
.metadata
.ok_or_else(|| BaseNodeMonitorError::InvalidBaseNodeResponse("Tip info no metadata".to_string()))
.and_then(|metadata| {
ChainMetadata::try_from(metadata).map_err(BaseNodeMonitorError::InvalidBaseNodeResponse)
})?;
let latency = start.elapsed();
debug!(target: LOG_TARGET, "get_tip_info took {:.2?}", timer.elapsed());

let latency = match client.get_last_request_latency().await? {
Some(latency) => latency,
None => continue,
};

let is_synced = tip_info.is_synced;
debug!(
Expand All @@ -147,25 +161,20 @@ where
latency.as_millis()
);

let start = Instant::now();
self.db.set_chain_metadata(chain_metadata.clone()).await?;
trace!(
target: LOG_TARGET,
"Update metadata in db {} ms",
start.elapsed().as_millis()
);

let start = Instant::now();
self.map_state(move |_| BaseNodeState {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency: Some(latency),
})
.await;
trace!(target: LOG_TARGET, "Publish event {} ms", start.elapsed().as_millis());

time::sleep(self.interval).await
let delay = time::sleep(self.interval.saturating_sub(latency));
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.map_state(move |_| Default::default()).await;
}
}

// loop only exits on shutdown/error
Expand Down Expand Up @@ -200,3 +209,16 @@ enum BaseNodeMonitorError {
#[error("Wallet storage error: {0}")]
WalletStorageError(#[from] WalletStorageError),
}

async fn interrupt<F1, F2>(interrupt: F1, fut: F2) -> Option<F2::Output>
where
F1: Future,
F2: Future,
{
tokio::pin!(interrupt);
tokio::pin!(fut);
match future::select(interrupt, fut).await {
Either::Left(_) => None,
Either::Right((v, _)) => Some(v),
}
}
58 changes: 22 additions & 36 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,14 @@ use tokio::sync::RwLock;
const LOG_TARGET: &str = "wallet::base_node_service::service";

/// State determined from Base Node Service Requests
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct BaseNodeState {
pub chain_metadata: Option<ChainMetadata>,
pub is_synced: Option<bool>,
pub updated: Option<NaiveDateTime>,
pub latency: Option<Duration>,
}

impl Default for BaseNodeState {
fn default() -> Self {
Self {
chain_metadata: None,
is_synced: None,
updated: None,
latency: None,
}
}
}

/// The base node service is responsible for handling requests to be sent to the connected base node.
pub struct BaseNodeService<T>
where T: WalletBackend + 'static
Expand All @@ -69,7 +58,7 @@ where T: WalletBackend + 'static
request_stream: Option<Receiver<BaseNodeServiceRequest, Result<BaseNodeServiceResponse, BaseNodeServiceError>>>,
wallet_connectivity: WalletConnectivityHandle,
event_publisher: BaseNodeEventSender,
shutdown_signal: Option<ShutdownSignal>,
shutdown_signal: ShutdownSignal,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<T>,
}
Expand All @@ -90,7 +79,7 @@ where T: WalletBackend + 'static
request_stream: Some(request_stream),
wallet_connectivity,
event_publisher,
shutdown_signal: Some(shutdown_signal),
shutdown_signal,
state: Default::default(),
db,
}
Expand All @@ -103,33 +92,13 @@ where T: WalletBackend + 'static

/// Starts the service.
pub async fn start(mut self) -> Result<(), BaseNodeServiceError> {
let shutdown_signal = self
.shutdown_signal
.take()
.expect("Wallet Base Node Service initialized without shutdown signal");

let monitor = BaseNodeMonitor::new(
self.config.base_node_monitor_refresh_interval,
self.state.clone(),
self.db.clone(),
self.wallet_connectivity.clone(),
self.event_publisher.clone(),
);

tokio::spawn({
let shutdown_signal = shutdown_signal.clone();
async move {
let monitor_fut = monitor.run();
futures::pin_mut!(monitor_fut);
future::select(shutdown_signal, monitor_fut).await;
}
});
self.spawn_monitor();

let mut request_stream = self
.request_stream
.take()
.expect("Wallet Base Node Service initialized without request_stream")
.take_until(shutdown_signal);
.take_until(self.shutdown_signal.clone());

info!(target: LOG_TARGET, "Wallet Base Node Service started");
while let Some(request_context) = request_stream.next().await {
Expand All @@ -152,6 +121,23 @@ where T: WalletBackend + 'static
Ok(())
}

fn spawn_monitor(&self) {
let monitor = BaseNodeMonitor::new(
self.config.base_node_monitor_refresh_interval,
self.state.clone(),
self.db.clone(),
self.wallet_connectivity.clone(),
self.event_publisher.clone(),
);

let shutdown_signal = self.shutdown_signal.clone();
tokio::spawn(async move {
let monitor_fut = monitor.run();
futures::pin_mut!(monitor_fut);
future::select(shutdown_signal, monitor_fut).await;
});
}

/// This handler is called when requests arrive from the various streams
async fn handle_request(
&mut self,
Expand Down
5 changes: 5 additions & 0 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl WalletConnectivityHandle {
#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
if let Some(peer) = self.base_node_watch.borrow().as_ref() {
if peer.public_key == base_node_peer.public_key {
return;
}
}
self.base_node_watch.send(Some(base_node_peer));
}

Expand Down

0 comments on commit 90f4f74

Please sign in to comment.