Skip to content

Commit

Permalink
feat: add resilience to initial wallet connect (#6527)
Browse files Browse the repository at this point in the history
Description
---
Added resilience to the initial wallet connection in cases where a
custom base node has not been set, where the wallet will try all base
nodes or all seeds it knows of and not only one it chose at random.

Motivation and Context
---
Wallets would try indefinitely to establish an initial connection to an
offline peer.

How Has This Been Tested?
---
- Added a new unit test.
- System-level testing
Start a wallet with multiple custom seed nodes where only one is online.
Shut down the wallet after the established connection, delete its peer
db, and redo. Do this multiple times. Confirm that the wallet
establishes a connection every time.

What process can a PR reviewer use to test or verify this change?
---
Review code changes

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal authored Sep 20, 2024
1 parent a954383 commit 6b5ef7d
Show file tree
Hide file tree
Showing 30 changed files with 375 additions and 140 deletions.
11 changes: 6 additions & 5 deletions applications/minotari_console_wallet/src/automation/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async fn set_base_node_peer(
println!("Setting base node peer...");
println!("{}::{}", public_key, address);
wallet
.set_base_node_peer(public_key.clone(), Some(address.clone()))
.set_base_node_peer(public_key.clone(), Some(address.clone()), None)
.await?;
Ok((public_key, address))
}
Expand Down Expand Up @@ -1885,17 +1885,18 @@ pub async fn command_runner(

let peer_config = PeerConfig::new(selected_base_node, base_node_peers, peer_seeds);

let base_node = peer_config
.get_base_node_peer()
let base_nodes = peer_config
.get_base_node_peers()
.map_err(|e| CommandError::General(e.to_string()))?;
new_wallet
.set_base_node_peer(
base_node.public_key.clone(),
base_nodes[0].public_key.clone(),
Some(
base_node
base_nodes[0]
.last_address_used()
.ok_or(CommandError::General("No address found".to_string()))?,
),
Some(base_nodes),
)
.await
.map_err(|e| CommandError::General(e.to_string()))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl wallet_server::Wallet for WalletGrpcServer {
println!("{}::{}", public_key, net_address);
let mut wallet = self.wallet.clone();
wallet
.set_base_node_peer(public_key.clone(), Some(net_address.clone()))
.set_base_node_peer(public_key.clone(), Some(net_address.clone()), None)
.await
.map_err(|e| Status::internal(format!("{:?}", e)))?;

Expand Down
19 changes: 16 additions & 3 deletions applications/minotari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use std::{fs, io, path::PathBuf, str::FromStr, sync::Arc, time::Instant};

use crossterm::terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled};
use digest::crypto_common::rand_core::OsRng;
use log::*;
use minotari_app_utilities::{consts, identity_management::setup_node_identity};
#[cfg(feature = "ledger")]
Expand All @@ -41,6 +42,7 @@ use minotari_wallet::{
WalletConfig,
WalletSqlite,
};
use rand::prelude::SliceRandom;
use rpassword::prompt_password_stdout;
use rustyline::Editor;
use tari_common::{
Expand Down Expand Up @@ -570,18 +572,29 @@ fn setup_identity_from_db<D: WalletBackend + 'static>(
/// Starts the wallet by setting the base node peer, and restarting the transaction and broadcast protocols.
pub async fn start_wallet(
wallet: &mut WalletSqlite,
base_node: &Peer,
base_nodes: &[Peer],
wallet_mode: &WalletMode,
) -> Result<(), ExitError> {
debug!(target: LOG_TARGET, "Setting base node peer");

let net_address = base_node
if base_nodes.is_empty() {
return Err(ExitError::new(
ExitCode::WalletError,
"No base nodes configured to connect to",
));
}
let selected_base_node = base_nodes.choose(&mut OsRng).expect("base_nodes is not empty");
let net_address = selected_base_node
.addresses
.best()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node has no address!"))?;

wallet
.set_base_node_peer(base_node.public_key.clone(), Some(net_address.address().clone()))
.set_base_node_peer(
selected_base_node.public_key.clone(),
Some(net_address.address().clone()),
Some(base_nodes.to_vec()),
)
.await
.map_err(|e| {
ExitError::new(
Expand Down
4 changes: 2 additions & 2 deletions applications/minotari_console_wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ pub fn run_wallet_with_cli(
&mut wallet,
cli.non_interactive_mode,
))?;
let base_node_selected = base_node_config.get_base_node_peer()?;
let base_nodes_peers = base_node_config.get_base_node_peers()?;

let wallet_mode = wallet_mode(&cli, boot_mode);

// start wallet
runtime.block_on(start_wallet(&mut wallet, &base_node_selected, &wallet_mode))?;
runtime.block_on(start_wallet(&mut wallet, &base_nodes_peers, &wallet_mode))?;

debug!(target: LOG_TARGET, "Starting app");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ impl AppStateInner {
.set_base_node_peer(
peer.public_key.clone(),
Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None,
)
.await?;

Expand All @@ -1061,6 +1062,7 @@ impl AppStateInner {
.set_base_node_peer(
peer.public_key.clone(),
Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None,
)
.await?;

Expand Down Expand Up @@ -1099,6 +1101,7 @@ impl AppStateInner {
.set_base_node_peer(
previous.public_key.clone(),
Some(previous.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl WalletEventMonitor {
_ = base_node_changed.changed() => {
let peer = base_node_changed.borrow().as_ref().cloned();
if let Some(peer) = peer {
self.trigger_base_node_peer_refresh(peer).await;
self.trigger_base_node_peer_refresh(peer.get_current_peer()).await;
self.trigger_balance_refresh();
}
}
Expand Down
21 changes: 6 additions & 15 deletions applications/minotari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,15 @@ impl PeerConfig {

/// Get the prioritised base node peer from the PeerConfig.
/// 1. Custom Base Node
/// 2. First configured Base Node Peer
/// 3. Random configured Peer Seed
pub fn get_base_node_peer(&self) -> Result<Peer, ExitError> {
/// 2. All configured Base Node Peers (a random node will be prioritised)
/// 3. All configured Peer Seeds (a random node will be prioritised)
pub fn get_base_node_peers(&self) -> Result<Vec<Peer>, ExitError> {
if let Some(base_node) = self.base_node_custom.clone() {
Ok(base_node)
Ok(vec![base_node])
} else if !self.base_node_peers.is_empty() {
Ok(self
.base_node_peers
.first()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node peer has no address!"))?
.clone())
Ok(self.base_node_peers.clone())
} else if !self.peer_seeds.is_empty() {
// pick a random peer seed
Ok(self
.peer_seeds
.choose(&mut OsRng)
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Peer seeds was empty."))?
.clone())
Ok(self.peer_seeds.clone())
} else {
Err(ExitError::new(
ExitCode::ConfigError,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
timer.elapsed().as_millis()
);

let base_node_id = match self.wallet_connectivity.get_current_base_node_id() {
let base_node_id = match self.wallet_connectivity.get_current_base_node_peer_node_id() {
Some(n) => n,
None => continue,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::fmt::Display;

use tari_comms::peer_manager::Peer;
use tari_utilities::hex::Hex;

use crate::connectivity_service::WalletConnectivityError;

/// The selected peer is a current base node and an optional list of backup peers.
#[derive(Clone)]
pub struct BaseNodePeerManager {
// The current base node that the wallet is connected to
current_peer_index: usize,
// The other base nodes that the wallet can connect to if the selected peer is not available
peer_list: Vec<Peer>,
}

impl BaseNodePeerManager {
/// Create a new BaseNodePeerManager, with the preferred peer index and a list of peers.
pub fn new(preferred_peer_index: usize, peer_list: Vec<Peer>) -> Result<Self, WalletConnectivityError> {
if preferred_peer_index >= peer_list.len() {
return Err(WalletConnectivityError::PeerIndexOutOfBounds(format!(
"Preferred index: {}, Max index: {}",
preferred_peer_index,
peer_list.len() - 1
)));
}
Ok(Self {
current_peer_index: preferred_peer_index,
peer_list,
})
}

/// Get the current peer
pub fn get_current_peer(&self) -> Peer {
self.peer_list
.get(self.current_peer_index)
.cloned()
.unwrap_or(self.peer_list[0].clone())
}

/// Get the next peer in the list
pub fn get_next_peer(&mut self) -> Peer {
self.current_peer_index = (self.current_peer_index + 1) % self.peer_list.len();
self.peer_list[self.current_peer_index].clone()
}
}

impl Display for BaseNodePeerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"BaseNodePeerManager {{ current_peer_index: {}, peer_list: {:?} }}",
self.current_peer_index,
self.peer_list
.iter()
.map(|p| (p.node_id.to_hex(), p.public_key.to_hex()))
.collect::<Vec<_>>()
)
}
}
2 changes: 2 additions & 0 deletions base_layer/wallet/src/connectivity_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum WalletConnectivityError {
ConnectivityError(#[from] ConnectivityError),
#[error("Service is terminated and can no longer response to requests")]
ServiceTerminated,
#[error("Preferred peer index is out of bounds: {0}")]
PeerIndexOutOfBounds(String),
}

impl From<mpsc::SendError> for WalletConnectivityError {
Expand Down
34 changes: 23 additions & 11 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSync
use tokio::sync::{mpsc, oneshot, watch};

use super::service::OnlineStatus;
use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch};
use crate::{
connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface},
util::watch::Watch,
};

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
Expand All @@ -39,14 +42,14 @@ pub enum WalletConnectivityRequest {
#[derive(Clone)]
pub struct WalletConnectivityHandle {
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch: Watch<Option<Peer>>,
base_node_watch: Watch<Option<BaseNodePeerManager>>,
online_status_rx: watch::Receiver<OnlineStatus>,
}

impl WalletConnectivityHandle {
pub(super) fn new(
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch: Watch<Option<Peer>>,
base_node_watch: Watch<Option<BaseNodePeerManager>>,
online_status_rx: watch::Receiver<OnlineStatus>,
) -> Self {
Self {
Expand All @@ -59,16 +62,16 @@ impl WalletConnectivityHandle {

#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
if let Some(peer) = self.base_node_watch.borrow().as_ref() {
if peer.public_key == base_node_peer.public_key {
fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager) {
if let Some(selected_peer) = self.base_node_watch.borrow().as_ref() {
if selected_peer.get_current_peer().public_key == base_node_peer.get_current_peer().public_key {
return;
}
}
self.base_node_watch.send(Some(base_node_peer));
}

fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>> {
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<BaseNodePeerManager>> {
self.base_node_watch.get_receiver()
}

Expand Down Expand Up @@ -120,15 +123,24 @@ impl WalletConnectivityInterface for WalletConnectivityHandle {
}

fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch.borrow().clone()
self.base_node_watch
.borrow()
.as_ref()
.map(|p| p.get_current_peer().clone())
}

fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey> {
self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone())
self.base_node_watch
.borrow()
.as_ref()
.map(|p| p.get_current_peer().public_key.clone())
}

fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
fn get_current_base_node_peer_node_id(&self) -> Option<NodeId> {
self.base_node_watch
.borrow()
.as_ref()
.map(|p| p.get_current_peer().node_id.clone())
}

fn is_base_node_set(&self) -> bool {
Expand Down
9 changes: 2 additions & 7 deletions base_layer/wallet/src/connectivity_service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,8 @@ impl ServiceInitializer for WalletConnectivityInitializer {

context.spawn_until_shutdown(move |handles| {
let connectivity = handles.expect_handle();
let service = WalletConnectivityService::new(
config,
receiver,
base_node_watch.get_receiver(),
online_status_watch,
connectivity,
);
let service =
WalletConnectivityService::new(config, receiver, base_node_watch, online_status_watch, connectivity);
service.start()
});

Expand Down
8 changes: 4 additions & 4 deletions base_layer/wallet/src/connectivity_service/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use tari_comms::{
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::watch;

use crate::connectivity_service::OnlineStatus;
use crate::connectivity_service::{BaseNodePeerManager, OnlineStatus};

#[async_trait::async_trait]
pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static {
fn set_base_node(&mut self, base_node_peer: Peer);
fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager);

fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>>;
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<BaseNodePeerManager>>;

/// Obtain a BaseNodeWalletRpcClient.
///
Expand Down Expand Up @@ -72,7 +72,7 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static {

fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey>;

fn get_current_base_node_id(&self) -> Option<NodeId>;
fn get_current_base_node_peer_node_id(&self) -> Option<NodeId>;

fn is_base_node_set(&self) -> bool;
}
Loading

0 comments on commit 6b5ef7d

Please sign in to comment.