Skip to content

Commit

Permalink
Add resiliance to initial wallet connect
Browse files Browse the repository at this point in the history
Added resiliance to initial wallet connect, where the wallet will try all base
nodes or all seeds it knows of and not only one it chose at random.
  • Loading branch information
hansieodendaal committed Sep 18, 2024
1 parent 4763579 commit a5d8073
Show file tree
Hide file tree
Showing 29 changed files with 370 additions and 124 deletions.
11 changes: 6 additions & 5 deletions applications/minotari_console_wallet/src/automation/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async fn set_base_node_peer(
println!("Setting base node peer...");
println!("{}::{}", public_key, address);
wallet
.set_base_node_peer(public_key.clone(), Some(address.clone()))
.set_base_node_peer(public_key.clone(), Some(address.clone()), None)
.await?;
Ok((public_key, address))
}
Expand Down Expand Up @@ -1885,17 +1885,18 @@ pub async fn command_runner(

let peer_config = PeerConfig::new(selected_base_node, base_node_peers, peer_seeds);

let base_node = peer_config
.get_base_node_peer()
let base_nodes = peer_config
.get_base_node_peers()
.map_err(|e| CommandError::General(e.to_string()))?;
new_wallet
.set_base_node_peer(
base_node.public_key.clone(),
base_nodes[0].public_key.clone(),
Some(
base_node
base_nodes[0]
.last_address_used()
.ok_or(CommandError::General("No address found".to_string()))?,
),
Some(base_nodes),
)
.await
.map_err(|e| CommandError::General(e.to_string()))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl wallet_server::Wallet for WalletGrpcServer {
println!("{}::{}", public_key, net_address);
let mut wallet = self.wallet.clone();
wallet
.set_base_node_peer(public_key.clone(), Some(net_address.clone()))
.set_base_node_peer(public_key.clone(), Some(net_address.clone()), None)
.await
.map_err(|e| Status::internal(format!("{:?}", e)))?;

Expand Down
24 changes: 21 additions & 3 deletions applications/minotari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use std::{fs, io, path::PathBuf, str::FromStr, sync::Arc, time::Instant};

use crossterm::terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled};
use digest::crypto_common::rand_core::OsRng;
use log::*;
use minotari_app_utilities::{consts, identity_management::setup_node_identity};
#[cfg(feature = "ledger")]
Expand All @@ -41,6 +42,7 @@ use minotari_wallet::{
WalletConfig,
WalletSqlite,
};
use rand::prelude::SliceRandom;
use rpassword::prompt_password_stdout;
use rustyline::Editor;
use tari_common::{
Expand Down Expand Up @@ -570,18 +572,34 @@ fn setup_identity_from_db<D: WalletBackend + 'static>(
/// Starts the wallet by setting the base node peer, and restarting the transaction and broadcast protocols.
pub async fn start_wallet(
wallet: &mut WalletSqlite,
base_node: &Peer,
base_nodes: &[Peer],
wallet_mode: &WalletMode,
) -> Result<(), ExitError> {
debug!(target: LOG_TARGET, "Setting base node peer");

let net_address = base_node
if base_nodes.is_empty() {
return Err(ExitError::new(
ExitCode::WalletError,
"No base nodes configured to connect to",
));
}
let selected_base_node = base_nodes.choose(&mut OsRng).expect("base_nodes is not empty");
let net_address = selected_base_node
.addresses
.best()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node has no address!"))?;
let backup_peers = base_nodes
.iter()
.filter(|&v| v != selected_base_node)
.cloned()
.collect::<Vec<_>>();

wallet
.set_base_node_peer(base_node.public_key.clone(), Some(net_address.address().clone()))
.set_base_node_peer(
selected_base_node.public_key.clone(),
Some(net_address.address().clone()),
Some(backup_peers),
)
.await
.map_err(|e| {
ExitError::new(
Expand Down
4 changes: 2 additions & 2 deletions applications/minotari_console_wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ pub fn run_wallet_with_cli(
&mut wallet,
cli.non_interactive_mode,
))?;
let base_node_selected = base_node_config.get_base_node_peer()?;
let base_nodes_peers = base_node_config.get_base_node_peers()?;

let wallet_mode = wallet_mode(&cli, boot_mode);

// start wallet
runtime.block_on(start_wallet(&mut wallet, &base_node_selected, &wallet_mode))?;
runtime.block_on(start_wallet(&mut wallet, &base_nodes_peers, &wallet_mode))?;

debug!(target: LOG_TARGET, "Starting app");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ impl AppStateInner {
.set_base_node_peer(
peer.public_key.clone(),
Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None,
)
.await?;

Expand All @@ -1061,6 +1062,7 @@ impl AppStateInner {
.set_base_node_peer(
peer.public_key.clone(),
Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None,
)
.await?;

Expand Down Expand Up @@ -1099,6 +1101,7 @@ impl AppStateInner {
.set_base_node_peer(
previous.public_key.clone(),
Some(previous.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl WalletEventMonitor {
_ = base_node_changed.changed() => {
let peer = base_node_changed.borrow().as_ref().cloned();
if let Some(peer) = peer {
self.trigger_base_node_peer_refresh(peer).await;
self.trigger_base_node_peer_refresh(peer.get_current_peer()).await;
self.trigger_balance_refresh();
}
}
Expand Down
21 changes: 6 additions & 15 deletions applications/minotari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,15 @@ impl PeerConfig {

/// Get the prioritised base node peer from the PeerConfig.
/// 1. Custom Base Node
/// 2. First configured Base Node Peer
/// 3. Random configured Peer Seed
pub fn get_base_node_peer(&self) -> Result<Peer, ExitError> {
/// 2. All configured Base Node Peers (a random node will be prioritised)
/// 3. All configured Peer Seeds (a random node will be prioritised)
pub fn get_base_node_peers(&self) -> Result<Vec<Peer>, ExitError> {
if let Some(base_node) = self.base_node_custom.clone() {
Ok(base_node)
Ok(vec![base_node])
} else if !self.base_node_peers.is_empty() {
Ok(self
.base_node_peers
.first()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node peer has no address!"))?
.clone())
Ok(self.base_node_peers.clone())
} else if !self.peer_seeds.is_empty() {
// pick a random peer seed
Ok(self
.peer_seeds
.choose(&mut OsRng)
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Peer seeds was empty."))?
.clone())
Ok(self.peer_seeds.clone())
} else {
Err(ExitError::new(
ExitCode::ConfigError,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
timer.elapsed().as_millis()
);

let base_node_id = match self.wallet_connectivity.get_current_base_node_id() {
let base_node_id = match self.wallet_connectivity.get_current_base_node_peer_node_id() {
Some(n) => n,
None => continue,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::fmt::Display;

use tari_comms::peer_manager::Peer;
use tari_utilities::hex::Hex;

use crate::connectivity_service::WalletConnectivityError;

/// The selected peer is a current base node and an optional list of backup peers.
#[derive(Clone)]
pub struct BaseNodePeerManager {
// The current base node that the wallet is connected to
current_peer_index: usize,
// The other base nodes that the wallet can connect to if the selected peer is not available
peer_list: Vec<Peer>,
}

impl BaseNodePeerManager {
/// Create a new BaseNodePeerManager, with the preferred peer index and a list of peers.
pub fn new(preferred_peer_index: usize, peer_list: Vec<Peer>) -> Result<Self, WalletConnectivityError> {
if preferred_peer_index >= peer_list.len() {
return Err(WalletConnectivityError::PeerIndexOutOfBounds(format!(
"Preferred index: {}, Max index: {}",
preferred_peer_index,
peer_list.len() - 1
)));
}
Ok(Self {
current_peer_index: preferred_peer_index,
peer_list,
})
}

/// Get the current peer
pub fn get_current_peer(&self) -> Peer {
self.peer_list
.get(self.current_peer_index)
.cloned()
.unwrap_or(self.peer_list[0].clone())
}

/// Get the next peer in the list
pub fn get_next_peer(&mut self) -> Peer {
self.current_peer_index = (self.current_peer_index + 1) % self.peer_list.len();
self.peer_list[self.current_peer_index].clone()
}
}

impl Display for BaseNodePeerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"BaseNodePeerManager {{ current_peer_index: {}, peer_list: {:?} }}",
self.current_peer_index,
self.peer_list
.iter()
.map(|p| (p.node_id.to_hex(), p.public_key.to_hex()))
.collect::<Vec<_>>()
)
}
}
2 changes: 2 additions & 0 deletions base_layer/wallet/src/connectivity_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum WalletConnectivityError {
ConnectivityError(#[from] ConnectivityError),
#[error("Service is terminated and can no longer response to requests")]
ServiceTerminated,
#[error("Preferred peer index is out of bounds: {0}")]
PeerIndexOutOfBounds(String),
}

impl From<mpsc::SendError> for WalletConnectivityError {
Expand Down
34 changes: 23 additions & 11 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSync
use tokio::sync::{mpsc, oneshot, watch};

use super::service::OnlineStatus;
use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch};
use crate::{
connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface},
util::watch::Watch,
};

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
Expand All @@ -39,14 +42,14 @@ pub enum WalletConnectivityRequest {
#[derive(Clone)]
pub struct WalletConnectivityHandle {
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch: Watch<Option<Peer>>,
base_node_watch: Watch<Option<BaseNodePeerManager>>,
online_status_rx: watch::Receiver<OnlineStatus>,
}

impl WalletConnectivityHandle {
pub(super) fn new(
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch: Watch<Option<Peer>>,
base_node_watch: Watch<Option<BaseNodePeerManager>>,
online_status_rx: watch::Receiver<OnlineStatus>,
) -> Self {
Self {
Expand All @@ -59,16 +62,16 @@ impl WalletConnectivityHandle {

#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
if let Some(peer) = self.base_node_watch.borrow().as_ref() {
if peer.public_key == base_node_peer.public_key {
fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager) {
if let Some(selected_peer) = self.base_node_watch.borrow().as_ref() {
if selected_peer.get_current_peer().public_key == base_node_peer.get_current_peer().public_key {
return;
}
}
self.base_node_watch.send(Some(base_node_peer));
}

fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>> {
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<BaseNodePeerManager>> {
self.base_node_watch.get_receiver()
}

Expand Down Expand Up @@ -120,15 +123,24 @@ impl WalletConnectivityInterface for WalletConnectivityHandle {
}

fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch.borrow().clone()
self.base_node_watch
.borrow()
.as_ref()
.map(|p| p.get_current_peer().clone())
}

fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey> {
self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone())
self.base_node_watch
.borrow()
.as_ref()
.map(|p| p.get_current_peer().public_key.clone())
}

fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
fn get_current_base_node_peer_node_id(&self) -> Option<NodeId> {
self.base_node_watch
.borrow()
.as_ref()
.map(|p| p.get_current_peer().node_id.clone())
}

fn is_base_node_set(&self) -> bool {
Expand Down
8 changes: 4 additions & 4 deletions base_layer/wallet/src/connectivity_service/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use tari_comms::{
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::watch;

use crate::connectivity_service::OnlineStatus;
use crate::connectivity_service::{BaseNodePeerManager, OnlineStatus};

#[async_trait::async_trait]
pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static {
fn set_base_node(&mut self, base_node_peer: Peer);
fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager);

fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>>;
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<BaseNodePeerManager>>;

/// Obtain a BaseNodeWalletRpcClient.
///
Expand Down Expand Up @@ -72,7 +72,7 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static {

fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey>;

fn get_current_base_node_id(&self) -> Option<NodeId>;
fn get_current_base_node_peer_node_id(&self) -> Option<NodeId>;

fn is_base_node_set(&self) -> bool;
}
Loading

0 comments on commit a5d8073

Please sign in to comment.