feat: add sync rpc client pool to wallet connectivity #3199

Merged
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

31 changes: 19 additions & 12 deletions base_layer/core/src/base_node/rpc/mod.rs
@@ -24,18 +24,6 @@
mod service;
#[cfg(feature = "base_node")]
use crate::base_node::StateMachineHandle;
use crate::proto::{
base_node::{
FetchMatchingUtxos,
FetchUtxosResponse,
Signatures,
TipInfoResponse,
TxQueryBatchResponses,
TxQueryResponse,
TxSubmissionResponse,
},
types::{Signature, Transaction},
};
#[cfg(feature = "base_node")]
use crate::{
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
@@ -44,6 +32,22 @@ use crate::{
#[cfg(feature = "base_node")]
pub use service::BaseNodeWalletRpcService;

use crate::{
proto,
proto::{
base_node::{
FetchMatchingUtxos,
FetchUtxosResponse,
Signatures,
TipInfoResponse,
TxQueryBatchResponses,
TxQueryResponse,
TxSubmissionResponse,
},
types::{Signature, Transaction},
},
};

use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
use tari_comms_rpc_macros::tari_rpc;

@@ -72,6 +76,9 @@ pub trait BaseNodeWalletService: Send + Sync + 'static {

#[rpc(method = 5)]
async fn get_tip_info(&self, request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus>;

#[rpc(method = 6)]
async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus>;
}

#[cfg(feature = "base_node")]
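For orientation, the new RPC method can be exercised from the wallet side roughly as sketched below. The generated BaseNodeWalletRpcClient::get_header signature (taking the height directly and returning the decoded response or an RpcError) is an assumption about what the #[tari_rpc] macro produces, not something shown in this diff.

use tari_comms::protocol::rpc::RpcError;
use tari_core::{base_node::rpc::BaseNodeWalletRpcClient, proto};

// Fetch the header at `height` over an already-established wallet RPC session.
// A missing header is returned by the server as RpcStatus::not_found and surfaces
// here as an Err(RpcError).
async fn fetch_header_at(
    client: &mut BaseNodeWalletRpcClient,
    height: u64,
) -> Result<proto::core::BlockHeader, RpcError> {
    client.get_header(height).await
}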
13 changes: 13 additions & 0 deletions base_layer/core/src/base_node/rpc/service.rs
@@ -24,6 +24,7 @@ use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput},
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
proto::{
base_node::{
FetchMatchingUtxos,
@@ -327,4 +328,16 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
is_synced,
}))
}

async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
let height = request.into_message();
let header = self
.db()
.fetch_header(height)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found(format!("Header not found at height {}", height)))?;

Ok(Response::new(header.into()))
}
}
4 changes: 2 additions & 2 deletions base_layer/wallet/src/base_node_service/monitor.rs
@@ -122,11 +122,11 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
let peer_node_id = self.update_connectivity_status().await;
let mut client = self
.wallet_connectivity
.obtain_base_node_rpc_client()
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
let latency = client.get_last_request_latency().await?;
trace!(
debug!(
target: LOG_TARGET,
"Base node {} latency: {} ms",
peer_node_id,
23 changes: 20 additions & 3 deletions base_layer/wallet/src/connectivity_service/handle.rs
@@ -27,11 +27,12 @@ use futures::{
SinkExt,
};
use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease};
use tari_core::base_node::rpc;
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::watch;

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<rpc::BaseNodeWalletRpcClient>>),
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
SetBaseNode(NodeId),
}

@@ -68,7 +69,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
Review comment (Contributor):
Suggested change
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is

/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeWalletRpcClient RPC session.
pub async fn obtain_base_node_rpc_client(&mut self) -> Option<RpcClientLease<rpc::BaseNodeWalletRpcClient>> {
pub async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
// Under what conditions do the (1) mpsc channel and (2) oneshot channel error?
// (1) when the receiver has been dropped
@@ -85,6 +86,22 @@ impl WalletConnectivityHandle {
reply_rx.await.ok()
}

/// Obtain a BaseNodeSyncRpcClient.
///
/// This can be relied on to obtain a pooled BaseNodeSyncRpcClient rpc session from a currently selected base
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
Review comment (Contributor):
Suggested change
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
/// node/nodes. It will block until this happens. The ONLY other time it will return is if the node is

/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeSyncRpcClient RPC session.
pub async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
Review comment (Contributor): Where will this be used in the wallet? Is it for get_chain_metadata?

let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(WalletConnectivityRequest::ObtainBaseNodeSyncRpcClient(reply_tx))
.await
.ok()?;

reply_rx.await.ok()
}

pub async fn get_connectivity_status(&mut self) -> OnlineStatus {
self.online_status_rx.recv().await.unwrap_or(OnlineStatus::Offline)
}
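As a usage sketch for the new handle method (and touching on the reviewer's question above), a wallet task could take a pooled sync session as below. The import path and the commented get_chain_metadata call are assumptions; the diff itself does not establish which sync RPC methods the wallet will call.

use crate::connectivity_service::WalletConnectivityHandle;

// Block until a pooled sync RPC session is available from the selected base node.
// `obtain_base_node_sync_rpc_client` yields None only when the wallet is shutting down.
async fn with_sync_session(connectivity: &mut WalletConnectivityHandle) {
    if let Some(_sync_client) = connectivity.obtain_base_node_sync_rpc_client().await {
        // Use the leased session here, e.g. (per the reviewer's question):
        // let metadata = _sync_client.get_chain_metadata().await;
    }
}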
111 changes: 93 additions & 18 deletions base_layer/wallet/src/connectivity_service/service.rs
@@ -36,7 +36,7 @@ use tari_comms::{
peer_manager::NodeId,
protocol::rpc::{RpcClientLease, RpcClientPool},
};
use tari_core::base_node::rpc;
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::time;

const LOG_TARGET: &str = "wallet::connectivity";
@@ -54,9 +54,14 @@ pub struct WalletConnectivityService {
request_stream: Fuse<mpsc::Receiver<WalletConnectivityRequest>>,
connectivity: ConnectivityRequester,
base_node_watch: Watch<Option<NodeId>>,
base_node_wallet_rpc_client_pool: Option<RpcClientPool<rpc::BaseNodeWalletRpcClient>>,
pools: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_base_node_rpc_requests: Vec<oneshot::Sender<RpcClientLease<rpc::BaseNodeWalletRpcClient>>>,
pending_requests: Vec<ReplyOneshot>,
}

struct ClientPoolContainer {
pub base_node_wallet_rpc_client: RpcClientPool<BaseNodeWalletRpcClient>,
pub base_node_sync_rpc_client: RpcClientPool<BaseNodeSyncRpcClient>,
}

impl WalletConnectivityService {
@@ -72,8 +77,8 @@ impl WalletConnectivityService {
request_stream: request_stream.fuse(),
connectivity,
base_node_watch,
base_node_wallet_rpc_client_pool: None,
pending_base_node_rpc_requests: Vec::new(),
pools: None,
pending_requests: Vec::new(),
online_status_watch,
}
}
@@ -100,7 +105,10 @@ impl WalletConnectivityService {
use WalletConnectivityRequest::*;
match request {
ObtainBaseNodeWalletRpcClient(reply) => {
self.handle_get_base_node_wallet_rpc_client(reply).await;
self.handle_pool_request(reply.into()).await;
},
ObtainBaseNodeSyncRpcClient(reply) => {
self.handle_pool_request(reply.into()).await;
},

SetBaseNode(peer) => {
@@ -109,12 +117,20 @@
}
}

async fn handle_pool_request(&mut self, reply: ReplyOneshot) {
use ReplyOneshot::*;
match reply {
WalletRpc(tx) => self.handle_get_base_node_wallet_rpc_client(tx).await,
SyncRpc(tx) => self.handle_get_base_node_sync_rpc_client(tx).await,
}
}

async fn handle_get_base_node_wallet_rpc_client(
&mut self,
reply: oneshot::Sender<RpcClientLease<rpc::BaseNodeWalletRpcClient>>,
reply: oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>,
) {
match self.base_node_wallet_rpc_client_pool {
Some(ref pool) => match pool.get().await {
match self.pools {
Some(ref pools) => match pools.base_node_wallet_rpc_client.get().await {
Ok(client) => {
let _ = reply.send(client);
},
@@ -124,16 +140,47 @@
"Base node connection failed: {}. Reconnecting...", e
);
self.trigger_reconnect();
self.pending_base_node_rpc_requests.push(reply);
self.pending_requests.push(reply.into());
},
},
None => {
self.pending_base_node_rpc_requests.push(reply);
self.pending_requests.push(reply.into());
if self.base_node_watch.borrow().is_none() {
warn!(
target: LOG_TARGET,
"{} requests are waiting for base node to be set",
self.pending_base_node_rpc_requests.len()
self.pending_requests.len()
);
}
},
}
}

async fn handle_get_base_node_sync_rpc_client(
&mut self,
reply: oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>,
) {
match self.pools {
Some(ref pools) => match pools.base_node_sync_rpc_client.get().await {
Ok(client) => {
let _ = reply.send(client);
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Base node connection failed: {}. Reconnecting...", e
);
self.trigger_reconnect();
self.pending_requests.push(reply.into());
},
},
None => {
self.pending_requests.push(reply.into());
if self.base_node_watch.borrow().is_none() {
warn!(
target: LOG_TARGET,
"{} requests are waiting for base node to be set",
self.pending_requests.len()
);
}
},
@@ -151,12 +198,12 @@
}

fn set_base_node_peer(&mut self, peer: NodeId) {
self.base_node_wallet_rpc_client_pool = None;
self.pools = None;
self.base_node_watch.broadcast(Some(peer));
}

async fn setup_base_node_connection(&mut self, peer: NodeId) {
self.base_node_wallet_rpc_client_pool = None;
self.pools = None;
loop {
debug!(
target: LOG_TARGET,
@@ -194,8 +241,10 @@
"Successfully established peer connection to base node {}",
conn.peer_node_id()
);
let pool = conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size);
self.base_node_wallet_rpc_client_pool = Some(pool);
self.pools = Some(ClientPoolContainer {
base_node_sync_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size),
base_node_wallet_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size),
Review comment (@hansieodendaal, Contributor, Aug 17, 2021): The base_node_wallet_rpc_client should have its own pool-size configuration setting and not reuse the one from base_node_sync_rpc_client. Both pool-size values should be present in the config.toml configuration file for case-by-case tuning. A busy console wallet needs to carry out many more than base_node_rpc_pool_size: 10 conversations in parallel, specifically where the base_node_wallet_rpc_client is concerned, if it is going to be used for the transaction monitoring protocols.

});
self.notify_pending_requests().await?;
debug!(
target: LOG_TARGET,
@@ -206,14 +255,40 @@
}

async fn notify_pending_requests(&mut self) -> Result<(), WalletConnectivityError> {
let current_pending = mem::take(&mut self.pending_base_node_rpc_requests);
let current_pending = mem::take(&mut self.pending_requests);
for reply in current_pending {
if reply.is_canceled() {
continue;
}

self.handle_get_base_node_wallet_rpc_client(reply).await;
self.handle_pool_request(reply).await;
}
Ok(())
}
}

enum ReplyOneshot {
WalletRpc(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
SyncRpc(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
}

impl ReplyOneshot {
pub fn is_canceled(&self) -> bool {
use ReplyOneshot::*;
match self {
WalletRpc(tx) => tx.is_canceled(),
SyncRpc(tx) => tx.is_canceled(),
}
}
}

impl From<oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>> for ReplyOneshot {
fn from(tx: oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>) -> Self {
ReplyOneshot::WalletRpc(tx)
}
}
impl From<oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>> for ReplyOneshot {
fn from(tx: oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>) -> Self {
ReplyOneshot::SyncRpc(tx)
}
}
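One way to act on the pool-size review comment above is sketched here. The config struct and field names are illustrative only; this PR reuses the single base_node_rpc_pool_size setting for both pools.

// Hypothetical split of the pool-size setting, one per client pool, both
// surfaced in config.toml so they can be tuned case by case.
pub struct WalletConnectivityConfig {
    pub base_node_wallet_rpc_pool_size: usize, // sized for parallel transaction monitoring
    pub base_node_sync_rpc_pool_size: usize,   // typically far smaller
}

// setup_base_node_connection would then build each pool from its own setting:
// base_node_wallet_rpc_client: conn.create_rpc_client_pool(self.config.base_node_wallet_rpc_pool_size),
// base_node_sync_rpc_client: conn.create_rpc_client_pool(self.config.base_node_sync_rpc_pool_size),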
12 changes: 6 additions & 6 deletions base_layer/wallet/src/connectivity_service/test.rs
@@ -87,7 +87,7 @@ async fn it_dials_peer_when_base_node_is_set() {
// Now a connection will given to the service
mock_state.add_active_connection(conn).await;

let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}

@@ -106,7 +106,7 @@ async fn it_resolves_many_pending_rpc_session_requests() {
let pending_requests = iter::repeat_with(|| {
let mut handle = handle.clone();
task::spawn(async move {
let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
rpc_client.is_connected()
})
})
@@ -140,7 +140,7 @@ async fn it_changes_to_a_new_base_node() {
assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1);
let _ = mock_state.take_calls().await;

let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());

// Initiate a connection to the base node
@@ -150,7 +150,7 @@
mock_state.expect_dial_peer(base_node_peer2.node_id()).await;
assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1);

let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}

@@ -178,7 +178,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() {
let barrier = barrier.clone();
async move {
barrier.wait().await;
let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}
});
@@ -215,7 +215,7 @@ async fn it_gracefully_handles_multiple_connection_failures() {
let barrier = barrier.clone();
async move {
barrier.wait().await;
let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}
});
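A companion test for the new sync pool could mirror it_dials_peer_when_base_node_is_set above. The test attribute, the setup helper, and the is_connected() call on the sync lease are assumptions modelled on the existing tests in this file, not code from the PR.

#[tokio::test] // stand-in for whatever async test attribute this file already uses
async fn it_provides_a_sync_rpc_client_when_base_node_is_set() {
    // Hypothetical helper standing in for this file's existing test setup.
    let (mut handle, mock_state, conn) = setup_connectivity_with_base_node().await;
    // Give the service an active connection, as the tests above do.
    mock_state.add_active_connection(conn).await;
    // The sync pool should now hand out a connected, leased session.
    let sync_client = handle.obtain_base_node_sync_rpc_client().await.unwrap();
    assert!(sync_client.is_connected());
}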