From 41fc4449e43ec83844a305ae44c12d1e62c92562 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Fri, 30 Aug 2024 16:43:28 +0200 Subject: [PATCH] Add resiliance to initial wallet connect Added resiliance to initial wallet connect, where the wallet will try all base nodes or all seeds it knows of and not only one it chose at random. --- .../src/automation/commands.rs | 11 +-- .../src/grpc/wallet_grpc_server.rs | 2 +- .../minotari_console_wallet/src/init/mod.rs | 24 +++++- .../minotari_console_wallet/src/lib.rs | 4 +- .../src/ui/state/app_state.rs | 3 + .../src/ui/state/wallet_event_monitor.rs | 2 +- .../src/wallet_modes.rs | 21 ++--- .../wallet/src/base_node_service/monitor.rs | 2 +- .../base_node_peer_manager.rs | 82 +++++++++++++++++++ .../wallet/src/connectivity_service/error.rs | 2 + .../wallet/src/connectivity_service/handle.rs | 34 +++++--- .../src/connectivity_service/interface.rs | 8 +- .../wallet/src/connectivity_service/mock.rs | 29 ++++--- .../wallet/src/connectivity_service/mod.rs | 4 + .../src/connectivity_service/service.rs | 48 ++++++++--- .../wallet/src/connectivity_service/test.rs | 77 ++++++++++++++--- base_layer/wallet/src/error.rs | 3 + .../src/output_manager_service/service.rs | 4 +- .../tasks/txo_validation_task.rs | 8 +- .../transaction_broadcast_protocol.rs | 11 ++- .../wallet/src/transaction_service/service.rs | 6 +- .../src/utxo_scanner_service/service.rs | 12 +-- .../utxo_scanner_service/utxo_scanner_task.rs | 2 +- base_layer/wallet/src/wallet.rs | 29 +++++-- .../output_manager_service_tests/service.rs | 5 +- .../transaction_service_tests/service.rs | 25 +++--- .../transaction_protocols.rs | 23 ++++-- base_layer/wallet_ffi/src/lib.rs | 9 +- .../test_utils/mocks/connectivity_manager.rs | 4 + 29 files changed, 370 insertions(+), 124 deletions(-) create mode 100644 base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs diff --git a/applications/minotari_console_wallet/src/automation/commands.rs b/applications/minotari_console_wallet/src/automation/commands.rs index 4e27f6d711..960cbf5873 100644 --- a/applications/minotari_console_wallet/src/automation/commands.rs +++ b/applications/minotari_console_wallet/src/automation/commands.rs @@ -399,7 +399,7 @@ async fn set_base_node_peer( println!("Setting base node peer..."); println!("{}::{}", public_key, address); wallet - .set_base_node_peer(public_key.clone(), Some(address.clone())) + .set_base_node_peer(public_key.clone(), Some(address.clone()), None) .await?; Ok((public_key, address)) } @@ -1885,17 +1885,18 @@ pub async fn command_runner( let peer_config = PeerConfig::new(selected_base_node, base_node_peers, peer_seeds); - let base_node = peer_config - .get_base_node_peer() + let base_nodes = peer_config + .get_base_node_peers() .map_err(|e| CommandError::General(e.to_string()))?; new_wallet .set_base_node_peer( - base_node.public_key.clone(), + base_nodes[0].public_key.clone(), Some( - base_node + base_nodes[0] .last_address_used() .ok_or(CommandError::General("No address found".to_string()))?, ), + Some(base_nodes), ) .await .map_err(|e| CommandError::General(e.to_string()))?; diff --git a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs index 7c4909b74f..178e2476c2 100644 --- a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -257,7 +257,7 @@ impl wallet_server::Wallet for WalletGrpcServer { println!("{}::{}", public_key, net_address); let mut wallet = self.wallet.clone(); wallet - .set_base_node_peer(public_key.clone(), Some(net_address.clone())) + .set_base_node_peer(public_key.clone(), Some(net_address.clone()), None) .await .map_err(|e| Status::internal(format!("{:?}", e)))?; diff --git a/applications/minotari_console_wallet/src/init/mod.rs b/applications/minotari_console_wallet/src/init/mod.rs index f8c04058a9..bb396e6ec7 100644 --- a/applications/minotari_console_wallet/src/init/mod.rs +++ b/applications/minotari_console_wallet/src/init/mod.rs @@ -25,6 +25,7 @@ use std::{fs, io, path::PathBuf, str::FromStr, sync::Arc, time::Instant}; use crossterm::terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled}; +use digest::crypto_common::rand_core::OsRng; use log::*; use minotari_app_utilities::{consts, identity_management::setup_node_identity}; #[cfg(feature = "ledger")] @@ -41,6 +42,7 @@ use minotari_wallet::{ WalletConfig, WalletSqlite, }; +use rand::prelude::SliceRandom; use rpassword::prompt_password_stdout; use rustyline::Editor; use tari_common::{ @@ -570,18 +572,34 @@ fn setup_identity_from_db( /// Starts the wallet by setting the base node peer, and restarting the transaction and broadcast protocols. pub async fn start_wallet( wallet: &mut WalletSqlite, - base_node: &Peer, + base_nodes: &[Peer], wallet_mode: &WalletMode, ) -> Result<(), ExitError> { debug!(target: LOG_TARGET, "Setting base node peer"); - let net_address = base_node + if base_nodes.is_empty() { + return Err(ExitError::new( + ExitCode::WalletError, + "No base nodes configured to connect to", + )); + } + let selected_base_node = base_nodes.choose(&mut OsRng).expect("base_nodes is not empty"); + let net_address = selected_base_node .addresses .best() .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node has no address!"))?; + let backup_peers = base_nodes + .iter() + .filter(|&v| v != selected_base_node) + .cloned() + .collect::>(); wallet - .set_base_node_peer(base_node.public_key.clone(), Some(net_address.address().clone())) + .set_base_node_peer( + selected_base_node.public_key.clone(), + Some(net_address.address().clone()), + Some(backup_peers), + ) .await .map_err(|e| { ExitError::new( diff --git a/applications/minotari_console_wallet/src/lib.rs b/applications/minotari_console_wallet/src/lib.rs index 7c11e535b2..fab816ddcb 100644 --- a/applications/minotari_console_wallet/src/lib.rs +++ b/applications/minotari_console_wallet/src/lib.rs @@ -229,12 +229,12 @@ pub fn run_wallet_with_cli( &mut wallet, cli.non_interactive_mode, ))?; - let base_node_selected = base_node_config.get_base_node_peer()?; + let base_nodes_peers = base_node_config.get_base_node_peers()?; let wallet_mode = wallet_mode(&cli, boot_mode); // start wallet - runtime.block_on(start_wallet(&mut wallet, &base_node_selected, &wallet_mode))?; + runtime.block_on(start_wallet(&mut wallet, &base_nodes_peers, &wallet_mode))?; debug!(target: LOG_TARGET, "Starting app"); diff --git a/applications/minotari_console_wallet/src/ui/state/app_state.rs b/applications/minotari_console_wallet/src/ui/state/app_state.rs index 686648a0ed..9fe1cfb3d5 100644 --- a/applications/minotari_console_wallet/src/ui/state/app_state.rs +++ b/applications/minotari_console_wallet/src/ui/state/app_state.rs @@ -1036,6 +1036,7 @@ impl AppStateInner { .set_base_node_peer( peer.public_key.clone(), Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None, ) .await?; @@ -1061,6 +1062,7 @@ impl AppStateInner { .set_base_node_peer( peer.public_key.clone(), Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None, ) .await?; @@ -1099,6 +1101,7 @@ impl AppStateInner { .set_base_node_peer( previous.public_key.clone(), Some(previous.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None, ) .await?; diff --git a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs index 7069102107..3d08315bf0 100644 --- a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -244,7 +244,7 @@ impl WalletEventMonitor { _ = base_node_changed.changed() => { let peer = base_node_changed.borrow().as_ref().cloned(); if let Some(peer) = peer { - self.trigger_base_node_peer_refresh(peer).await; + self.trigger_base_node_peer_refresh(peer.get_current_peer()).await; self.trigger_balance_refresh(); } } diff --git a/applications/minotari_console_wallet/src/wallet_modes.rs b/applications/minotari_console_wallet/src/wallet_modes.rs index eb6281eba8..c28da1add4 100644 --- a/applications/minotari_console_wallet/src/wallet_modes.rs +++ b/applications/minotari_console_wallet/src/wallet_modes.rs @@ -89,24 +89,15 @@ impl PeerConfig { /// Get the prioritised base node peer from the PeerConfig. /// 1. Custom Base Node - /// 2. First configured Base Node Peer - /// 3. Random configured Peer Seed - pub fn get_base_node_peer(&self) -> Result { + /// 2. All configured Base Node Peers (a random node will be prioritised) + /// 3. All configured Peer Seeds (a random node will be prioritised) + pub fn get_base_node_peers(&self) -> Result, ExitError> { if let Some(base_node) = self.base_node_custom.clone() { - Ok(base_node) + Ok(vec![base_node]) } else if !self.base_node_peers.is_empty() { - Ok(self - .base_node_peers - .first() - .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node peer has no address!"))? - .clone()) + Ok(self.base_node_peers.clone()) } else if !self.peer_seeds.is_empty() { - // pick a random peer seed - Ok(self - .peer_seeds - .choose(&mut OsRng) - .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Peer seeds was empty."))? - .clone()) + Ok(self.peer_seeds.clone()) } else { Err(ExitError::new( ExitCode::ConfigError, diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 0acccd01e7..c75c744d54 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -136,7 +136,7 @@ where timer.elapsed().as_millis() ); - let base_node_id = match self.wallet_connectivity.get_current_base_node_id() { + let base_node_id = match self.wallet_connectivity.get_current_base_node_peer_node_id() { Some(n) => n, None => continue, }; diff --git a/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs new file mode 100644 index 0000000000..2a125f25a3 --- /dev/null +++ b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs @@ -0,0 +1,82 @@ +// Copyright 2021, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::fmt::Display; + +use tari_comms::peer_manager::Peer; +use tari_utilities::hex::Hex; + +use crate::connectivity_service::WalletConnectivityError; + +/// The selected peer is a current base node and an optional list of backup peers. +#[derive(Clone)] +pub struct BaseNodePeerManager { + // The current base node that the wallet is connected to + current_peer_index: usize, + // The other base nodes that the wallet can connect to if the selected peer is not available + peer_list: Vec, +} + +impl BaseNodePeerManager { + /// Create a new BaseNodePeerManager, with the preferred peer index and a list of peers. + pub fn new(preferred_peer_index: usize, peer_list: Vec) -> Result { + if preferred_peer_index >= peer_list.len() { + return Err(WalletConnectivityError::PeerIndexOutOfBounds(format!( + "Preferred index: {}, Max index: {}", + preferred_peer_index, + peer_list.len() - 1 + ))); + } + Ok(Self { + current_peer_index: preferred_peer_index, + peer_list, + }) + } + + /// Get the current peer + pub fn get_current_peer(&self) -> Peer { + self.peer_list + .get(self.current_peer_index) + .cloned() + .unwrap_or(self.peer_list[0].clone()) + } + + /// Get the next peer in the list + pub fn get_next_peer(&mut self) -> Peer { + self.current_peer_index = (self.current_peer_index + 1) % self.peer_list.len(); + self.peer_list[self.current_peer_index].clone() + } +} + +impl Display for BaseNodePeerManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "BaseNodePeerManager {{ current_peer_index: {}, peer_list: {:?} }}", + self.current_peer_index, + self.peer_list + .iter() + .map(|p| (p.node_id.to_hex(), p.public_key.to_hex())) + .collect::>() + ) + } +} diff --git a/base_layer/wallet/src/connectivity_service/error.rs b/base_layer/wallet/src/connectivity_service/error.rs index a151003faf..33bc788070 100644 --- a/base_layer/wallet/src/connectivity_service/error.rs +++ b/base_layer/wallet/src/connectivity_service/error.rs @@ -31,6 +31,8 @@ pub enum WalletConnectivityError { ConnectivityError(#[from] ConnectivityError), #[error("Service is terminated and can no longer response to requests")] ServiceTerminated, + #[error("Preferred peer index is out of bounds: {0}")] + PeerIndexOutOfBounds(String), } impl From for WalletConnectivityError { diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index 94bdde2cb7..af9bb72fc4 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -29,7 +29,10 @@ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSync use tokio::sync::{mpsc, oneshot, watch}; use super::service::OnlineStatus; -use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch}; +use crate::{ + connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface}, + util::watch::Watch, +}; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), @@ -39,14 +42,14 @@ pub enum WalletConnectivityRequest { #[derive(Clone)] pub struct WalletConnectivityHandle { sender: mpsc::Sender, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, } impl WalletConnectivityHandle { pub(super) fn new( sender: mpsc::Sender, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, ) -> Self { Self { @@ -59,16 +62,16 @@ impl WalletConnectivityHandle { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityHandle { - fn set_base_node(&mut self, base_node_peer: Peer) { - if let Some(peer) = self.base_node_watch.borrow().as_ref() { - if peer.public_key == base_node_peer.public_key { + fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager) { + if let Some(selected_peer) = self.base_node_watch.borrow().as_ref() { + if selected_peer.get_current_peer().public_key == base_node_peer.get_current_peer().public_key { return; } } self.base_node_watch.send(Some(base_node_peer)); } - fn get_current_base_node_watcher(&self) -> watch::Receiver> { + fn get_current_base_node_watcher(&self) -> watch::Receiver> { self.base_node_watch.get_receiver() } @@ -120,15 +123,24 @@ impl WalletConnectivityInterface for WalletConnectivityHandle { } fn get_current_base_node_peer(&self) -> Option { - self.base_node_watch.borrow().clone() + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().clone()) } fn get_current_base_node_peer_public_key(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone()) + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().public_key.clone()) } - fn get_current_base_node_id(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + fn get_current_base_node_peer_node_id(&self) -> Option { + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().node_id.clone()) } fn is_base_node_set(&self) -> bool { diff --git a/base_layer/wallet/src/connectivity_service/interface.rs b/base_layer/wallet/src/connectivity_service/interface.rs index 8a5527ce7c..e0a3639b2f 100644 --- a/base_layer/wallet/src/connectivity_service/interface.rs +++ b/base_layer/wallet/src/connectivity_service/interface.rs @@ -30,13 +30,13 @@ use tari_comms::{ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; use tokio::sync::watch; -use crate::connectivity_service::OnlineStatus; +use crate::connectivity_service::{BaseNodePeerManager, OnlineStatus}; #[async_trait::async_trait] pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static { - fn set_base_node(&mut self, base_node_peer: Peer); + fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager); - fn get_current_base_node_watcher(&self) -> watch::Receiver>; + fn get_current_base_node_watcher(&self) -> watch::Receiver>; /// Obtain a BaseNodeWalletRpcClient. /// @@ -72,7 +72,7 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static { fn get_current_base_node_peer_public_key(&self) -> Option; - fn get_current_base_node_id(&self) -> Option; + fn get_current_base_node_peer_node_id(&self) -> Option; fn is_base_node_set(&self) -> bool; } diff --git a/base_layer/wallet/src/connectivity_service/mock.rs b/base_layer/wallet/src/connectivity_service/mock.rs index 11228b6661..f61625ec32 100644 --- a/base_layer/wallet/src/connectivity_service/mock.rs +++ b/base_layer/wallet/src/connectivity_service/mock.rs @@ -29,7 +29,7 @@ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSync use tokio::sync::watch::Receiver; use crate::{ - connectivity_service::{OnlineStatus, WalletConnectivityInterface}, + connectivity_service::{BaseNodePeerManager, OnlineStatus, WalletConnectivityInterface}, util::watch::Watch, }; @@ -40,7 +40,7 @@ pub fn create() -> WalletConnectivityMock { #[derive(Clone)] pub struct WalletConnectivityMock { online_status_watch: Watch, - base_node_watch: Watch>, + base_node_watch: Watch>, base_node_wallet_rpc_client: Watch>>, base_node_sync_rpc_client: Watch>>, } @@ -65,11 +65,11 @@ impl WalletConnectivityMock { self.base_node_sync_rpc_client.send(Some(RpcClientLease::new(client))); } - pub fn notify_base_node_set(&self, base_node_peer: Peer) { + pub fn notify_base_node_set(&self, base_node_peer: BaseNodePeerManager) { self.base_node_watch.send(Some(base_node_peer)); } - pub async fn base_node_changed(&mut self) -> Option { + pub async fn base_node_changed(&mut self) -> Option { self.base_node_watch.changed().await; self.base_node_watch.borrow().as_ref().cloned() } @@ -82,11 +82,11 @@ impl WalletConnectivityMock { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityMock { - fn set_base_node(&mut self, base_node_peer: Peer) { + fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager) { self.notify_base_node_set(base_node_peer); } - fn get_current_base_node_watcher(&self) -> Receiver> { + fn get_current_base_node_watcher(&self) -> Receiver> { self.base_node_watch.get_receiver() } @@ -121,15 +121,24 @@ impl WalletConnectivityInterface for WalletConnectivityMock { } fn get_current_base_node_peer(&self) -> Option { - self.base_node_watch.borrow().as_ref().cloned() + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().clone()) } fn get_current_base_node_peer_public_key(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone()) + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().public_key.clone()) } - fn get_current_base_node_id(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + fn get_current_base_node_peer_node_id(&self) -> Option { + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().node_id.clone()) } fn is_base_node_set(&self) -> bool { diff --git a/base_layer/wallet/src/connectivity_service/mod.rs b/base_layer/wallet/src/connectivity_service/mod.rs index 75b99ed330..c425953332 100644 --- a/base_layer/wallet/src/connectivity_service/mod.rs +++ b/base_layer/wallet/src/connectivity_service/mod.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. mod error; + pub use error::WalletConnectivityError; mod handle; @@ -40,3 +41,6 @@ pub use mock::{create as create_wallet_connectivity_mock, WalletConnectivityMock mod interface; pub use interface::WalletConnectivityInterface; + +mod base_node_peer_manager; +pub use base_node_peer_manager::BaseNodePeerManager; diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index f880739bfb..57e712de24 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -25,7 +25,7 @@ use std::{mem, time::Duration}; use log::*; use tari_comms::{ connectivity::{ConnectivityError, ConnectivityRequester}, - peer_manager::{NodeId, Peer}, + peer_manager::NodeId, protocol::rpc::{RpcClientLease, RpcClientPool}, Minimized, PeerConnection, @@ -39,12 +39,13 @@ use tokio::{ use crate::{ base_node_service::config::BaseNodeServiceConfig, - connectivity_service::{error::WalletConnectivityError, handle::WalletConnectivityRequest}, + connectivity_service::{error::WalletConnectivityError, handle::WalletConnectivityRequest, BaseNodePeerManager}, util::watch::Watch, }; const LOG_TARGET: &str = "wallet::connectivity"; -const CONNECTIVITY_WAIT: u64 = 5; +pub(crate) const CONNECTIVITY_WAIT: u64 = 5; +pub(crate) const CONNECTIVITY_RETRIES: usize = 3; /// Connection status of the Base Node #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -58,7 +59,7 @@ pub struct WalletConnectivityService { config: BaseNodeServiceConfig, request_receiver: mpsc::Receiver, connectivity: ConnectivityRequester, - base_node_watch: watch::Receiver>, + base_node_watch: watch::Receiver>, pools: Option, online_status_watch: Watch, pending_requests: Vec, @@ -73,7 +74,7 @@ impl WalletConnectivityService { pub(super) fn new( config: BaseNodeServiceConfig, request_receiver: mpsc::Receiver, - base_node_watch: watch::Receiver>, + base_node_watch: watch::Receiver>, online_status_watch: Watch, connectivity: ConnectivityRequester, ) -> Self { @@ -221,7 +222,14 @@ impl WalletConnectivityService { } fn current_base_node(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().node_id.clone()) + } + + fn get_base_node_peer_manager(&self) -> Option { + self.base_node_watch.borrow().as_ref().map(|p| p.clone()) } async fn disconnect_base_node(&mut self, node_id: NodeId) { @@ -235,15 +243,31 @@ impl WalletConnectivityService { } async fn setup_base_node_connection(&mut self) { + let mut count = 0; self.pools = None; + let mut peer_manager = if let Some(val) = self.get_base_node_peer_manager() { + val + } else { + self.set_online_status(OnlineStatus::Offline); + return; + }; + trace!(target: LOG_TARGET, "Setup base node connection to: {}", peer_manager); loop { - let node_id = match self.current_base_node() { - Some(n) => n, - None => { - self.set_online_status(OnlineStatus::Offline); - return; - }, + let node_id; + if count < CONNECTIVITY_RETRIES { + match self.current_base_node() { + Some(n) => node_id = n, + None => { + self.set_online_status(OnlineStatus::Offline); + return; + }, + }; + } else { + node_id = peer_manager.get_next_peer().node_id; + count = 0; }; + count += 1; + debug!( target: LOG_TARGET, "Attempting to connect to base node peer {}...", node_id diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index a0da32e40d..2b4a7e7d47 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -21,11 +21,11 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use core::convert; -use std::{iter, sync::Arc}; +use std::{iter, sync::Arc, time::Duration}; use futures::future; use tari_comms::{ - peer_manager::PeerFeatures, + peer_manager::{NodeId, PeerFeatures}, protocol::rpc::{ mock::{MockRpcImpl, MockRpcServer}, RpcPoolClient, @@ -41,11 +41,12 @@ use tari_test_utils::runtime::spawn_until_shutdown; use tokio::{ sync::{mpsc, Barrier}, task, + time::{sleep, timeout}, }; -use super::service::WalletConnectivityService; +use super::service::{WalletConnectivityService, CONNECTIVITY_RETRIES, CONNECTIVITY_WAIT}; use crate::{ - connectivity_service::{OnlineStatus, WalletConnectivityHandle, WalletConnectivityInterface}, + connectivity_service::{BaseNodePeerManager, OnlineStatus, WalletConnectivityHandle, WalletConnectivityInterface}, util::watch::Watch, }; @@ -87,7 +88,7 @@ async fn it_dials_peer_when_base_node_is_set() { // Set the mock to defer returning a result for the peer connection mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); // Wait for connection request mock_state.await_call_count(1).await; @@ -110,7 +111,7 @@ async fn it_resolves_many_pending_rpc_session_requests() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); let pending_requests = iter::repeat_with(|| { let mut handle = handle.clone(); @@ -142,7 +143,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.add_active_connection(conn2).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer1.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer1.to_peer()]).unwrap()); mock_state.await_call_count(1).await; mock_state.expect_dial_peer(base_node_peer1.node_id()).await; @@ -153,7 +154,7 @@ async fn it_changes_to_a_new_base_node() { assert!(rpc_client.is_connected()); // Initiate a connection to the base node - handle.set_base_node(base_node_peer2.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer2.to_peer()]).unwrap()); mock_state.await_call_count(1).await; mock_state.expect_dial_peer(base_node_peer2.node_id()).await; @@ -162,6 +163,62 @@ async fn it_changes_to_a_new_base_node() { assert!(rpc_client.is_connected()); } +async fn wait_for_peers_to_be_dialed( + mock_state: &ConnectivityManagerMockState, + peers: &[&NodeId], + timeout_duration: Duration, +) { + let check_interval = Duration::from_millis(100); // Interval to check the condition + + let result = timeout(timeout_duration, async { + loop { + let all_dialed = futures::future::join_all(peers.iter().map(|peer| mock_state.is_peer_dialed(peer))).await; + if all_dialed.into_iter().all(|dialed| dialed) { + break; + } + sleep(check_interval).await; + } + }) + .await; + + if result.is_err() { + panic!("Timeout reached while waiting for all peers to be dialed"); + } +} + +#[tokio::test] +async fn it_changes_to_a_new_base_node_if_preferred_is_offline() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + let (mut handle, mock_server, mock_state, _shutdown) = setup().await; + let base_node_peer1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let base_node_peer2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let conn2 = mock_server.create_mockimpl_connection(base_node_peer2.to_peer()).await; + + mock_state.add_active_connection(conn2).await; + + // Initiate a connection to the base node + handle.set_base_node( + BaseNodePeerManager::new(0, vec![base_node_peer1.to_peer(), base_node_peer2.to_peer()]).unwrap(), + ); + + mock_state.await_call_count(2).await; + + wait_for_peers_to_be_dialed( + &mock_state, + &[base_node_peer1.node_id(), base_node_peer2.node_id()], + Duration::from_secs(2 * CONNECTIVITY_WAIT * CONNECTIVITY_RETRIES as u64), + ) + .await; + + assert!(mock_state.count_calls_containing("DialPeer").await >= 2); + let _result = mock_state.take_calls().await; + + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); + assert!(rpc_client.is_connected()); + + handle.get_current_base_node_peer().unwrap(); +} + #[tokio::test] async fn it_gracefully_handles_connect_fail_reconnect() { let (mut handle, mock_server, mock_state, _shutdown) = setup().await; @@ -172,7 +229,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; @@ -212,7 +269,7 @@ async fn it_gracefully_handles_multiple_connection_failures() { let conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; diff --git a/base_layer/wallet/src/error.rs b/base_layer/wallet/src/error.rs index 1ce1cbed13..ccd5759734 100644 --- a/base_layer/wallet/src/error.rs +++ b/base_layer/wallet/src/error.rs @@ -41,6 +41,7 @@ use thiserror::Error; use crate::{ base_node_service::error::BaseNodeServiceError, + connectivity_service::WalletConnectivityError, output_manager_service::error::OutputManagerError, storage::database::DbKey, transaction_service::error::TransactionServiceError, @@ -103,6 +104,8 @@ pub enum WalletError { UnexpectedApiResponse { method: String, api: String }, #[error("Public address not set for this wallet")] PublicAddressNotSet, + #[error("Wallet connectivity error: `{0}`")] + WalletConnectivityError(#[from] WalletConnectivityError), } pub const LOG_TARGET: &str = "minotari::application"; diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 91d8901f94..7fe03bcbef 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -556,7 +556,7 @@ where let current_base_node = self .resources .connectivity - .get_current_base_node_id() + .get_current_base_node_peer_node_id() .ok_or(OutputManagerError::NoBaseNodeKeysProvided)?; let id = OsRng.next_u64(); let txo_validation = TxoValidationTask::new( @@ -634,7 +634,7 @@ where }, _ = base_node_watch.changed() => { if let Some(peer) = base_node_watch.borrow().as_ref() { - if peer.node_id != current_base_node { + if peer.get_current_peer().node_id != current_base_node { debug!( target: LOG_TARGET, "TXO Validation Protocol (Id: {}) cancelled because base node changed", id diff --git a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs index 4abd7d683e..e77114317e 100644 --- a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs +++ b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs @@ -28,7 +28,7 @@ use std::{ use chrono::{Duration, Utc}; use log::*; use tari_common_types::types::{BlockHash, FixedHash}; -use tari_comms::{peer_manager::Peer, protocol::rpc::RpcError::RequestFailed}; +use tari_comms::protocol::rpc::RpcError::RequestFailed; use tari_core::{ base_node::rpc::BaseNodeWalletRpcClient, blocks::BlockHeader, @@ -38,7 +38,7 @@ use tari_utilities::hex::Hex; use tokio::sync::watch; use crate::{ - connectivity_service::WalletConnectivityInterface, + connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface}, output_manager_service::{ config::OutputManagerServiceConfig, error::{OutputManagerError, OutputManagerProtocolError, OutputManagerProtocolErrorExt}, @@ -56,7 +56,7 @@ const LOG_TARGET: &str = "wallet::output_service::txo_validation_task"; pub struct TxoValidationTask { operation_id: u64, db: OutputManagerDatabase, - base_node_watch: watch::Receiver>, + base_node_watch: watch::Receiver>, connectivity: TWalletConnectivity, event_publisher: OutputManagerEventSender, config: OutputManagerServiceConfig, @@ -103,7 +103,7 @@ where .base_node_watch .borrow() .as_ref() - .map(|p| p.node_id.clone()) + .map(|p| p.get_current_peer().node_id.clone()) .ok_or_else(|| OutputManagerProtocolError::new(self.operation_id, OutputManagerError::BaseNodeChanged))?; debug!( target: LOG_TARGET, diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs index 4f4e32efb3..c0405c397f 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs @@ -136,10 +136,12 @@ where loop { tokio::select! { _ = current_base_node_watcher.changed() => { - if let Some(peer) = &*current_base_node_watcher.borrow() { + if let Some(selected_peer) = &*current_base_node_watcher.borrow() { info!( target: LOG_TARGET, - "Transaction Broadcast protocol (TxId: {}) Base Node Public key updated to {} (NodeID: {})", self.tx_id, peer.public_key, peer.node_id + "Transaction Broadcast protocol (TxId: {}) Base Node Public key updated to {} (NodeID: {})", + self.tx_id, selected_peer.get_current_peer().public_key, + selected_peer.get_current_peer().node_id, ); } self.last_rejection = None; @@ -154,7 +156,10 @@ where }, TxBroadcastMode::TransactionQuery => { if result? { - debug!(target: LOG_TARGET, "Transaction broadcast, transaction validation protocol will continue from here"); + debug!( + target: LOG_TARGET, + "Transaction broadcast, transaction validation protocol will continue from here" + ); return Ok(self.tx_id) } }, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 9cf8c18c85..390fb57376 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -3168,7 +3168,7 @@ where let current_base_node = self .resources .connectivity - .get_current_base_node_id() + .get_current_base_node_peer_node_id() .ok_or(TransactionServiceError::NoBaseNodeKeysProvided)?; trace!(target: LOG_TARGET, "Starting transaction validation protocol"); @@ -3200,8 +3200,8 @@ where return result; }, _ = base_node_watch.changed() => { - if let Some(peer) = base_node_watch.borrow().as_ref() { - if peer.node_id != current_base_node { + if let Some(selected_peer) = base_node_watch.borrow().as_ref() { + if selected_peer.get_current_peer().node_id != current_base_node { debug!(target: LOG_TARGET, "Base node changed, exiting transaction validation protocol"); return Err(TransactionServiceProtocolError::new(id, TransactionServiceError::BaseNodeChanged { task_name: "transaction validation_protocol", diff --git a/base_layer/wallet/src/utxo_scanner_service/service.rs b/base_layer/wallet/src/utxo_scanner_service/service.rs index 4bfc99d0fe..f0f13e5134 100644 --- a/base_layer/wallet/src/utxo_scanner_service/service.rs +++ b/base_layer/wallet/src/utxo_scanner_service/service.rs @@ -24,7 +24,7 @@ use chrono::NaiveDateTime; use futures::FutureExt; use log::*; use tari_common_types::{tari_address::TariAddress, types::HashOutput}; -use tari_comms::{connectivity::ConnectivityRequester, peer_manager::Peer, types::CommsPublicKey}; +use tari_comms::{connectivity::ConnectivityRequester, types::CommsPublicKey}; use tari_core::transactions::{tari_amount::MicroMinotari, CryptoFactories}; use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::{ @@ -34,7 +34,7 @@ use tokio::{ use crate::{ base_node_service::handle::{BaseNodeEvent, BaseNodeServiceHandle}, - connectivity_service::WalletConnectivityInterface, + connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface}, error::WalletError, output_manager_service::handle::OutputManagerHandle, storage::database::{WalletBackend, WalletDatabase}, @@ -161,9 +161,9 @@ where } _ = self.resources.current_base_node_watcher.changed() => { debug!(target: LOG_TARGET, "Base node change detected."); - let peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); - if let Some(peer) = peer { - self.peer_seeds = vec![peer.public_key]; + let selected_peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); + if let Some(peer) = selected_peer { + self.peer_seeds = vec![peer.get_current_peer().public_key]; } local_shutdown.trigger(); }, @@ -190,7 +190,7 @@ pub struct UtxoScannerResources { pub db: WalletDatabase, pub comms_connectivity: ConnectivityRequester, pub wallet_connectivity: TWalletConnectivity, - pub current_base_node_watcher: watch::Receiver>, + pub current_base_node_watcher: watch::Receiver>, pub output_manager_service: OutputManagerHandle, pub transaction_service: TransactionServiceHandle, pub one_sided_tari_address: TariAddress, diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index 19187c61b2..a533e9f369 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -206,7 +206,7 @@ where async fn attempt_sync(&mut self, peer: NodeId) -> Result<(u64, u64, MicroMinotari, Duration), UtxoScannerError> { self.publish_event(UtxoScannerEvent::ConnectingToBaseNode(peer.clone())); - let selected_peer = self.resources.wallet_connectivity.get_current_base_node_id(); + let selected_peer = self.resources.wallet_connectivity.get_current_base_node_peer_node_id(); let mut client = if selected_peer.map(|p| p == peer).unwrap_or(false) { // Use the wallet connectivity service so that RPC pools are correctly managed diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 2022b97de9..644aea0cd8 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -85,7 +85,12 @@ use tari_utilities::{hex::Hex, ByteArray}; use crate::{ base_node_service::{handle::BaseNodeServiceHandle, BaseNodeServiceInitializer}, config::WalletConfig, - connectivity_service::{WalletConnectivityHandle, WalletConnectivityInitializer, WalletConnectivityInterface}, + connectivity_service::{ + BaseNodePeerManager, + WalletConnectivityHandle, + WalletConnectivityInitializer, + WalletConnectivityInterface, + }, consts, error::{WalletError, WalletStorageError}, output_manager_service::{ @@ -381,13 +386,14 @@ where &mut self, public_key: CommsPublicKey, address: Option, + backup_peers: Option>, ) -> Result<(), WalletError> { info!( "Wallet setting base node peer, public key: {}, net address: {:?}.", public_key, address ); - if let Some(current_node) = self.wallet_connectivity.get_current_base_node_id() { + if let Some(current_node) = self.wallet_connectivity.get_current_base_node_peer_node_id() { self.comms .connectivity() .remove_peer_from_allow_list(current_node) @@ -396,6 +402,7 @@ where let peer_manager = self.comms.peer_manager(); let mut connectivity = self.comms.connectivity(); + let mut backup_peers = backup_peers.unwrap_or_default(); if let Some(mut current_peer) = peer_manager.find_by_public_key(&public_key).await? { // Only invalidate the identity signature if addresses are different if address.is_some() { @@ -415,7 +422,13 @@ where connectivity .add_peer_to_allow_list(current_peer.node_id.clone()) .await?; - self.wallet_connectivity.set_base_node(current_peer); + let mut peer_list = vec![current_peer]; + if let Some(pos) = backup_peers.iter().position(|p| p.public_key == public_key) { + backup_peers.remove(pos); + } + peer_list.append(&mut backup_peers); + self.wallet_connectivity + .set_base_node(BaseNodePeerManager::new(0, peer_list)?); } else { let node_id = NodeId::from_key(&public_key); if address.is_none() { @@ -430,7 +443,7 @@ where }); } let peer = Peer::new( - public_key, + public_key.clone(), node_id, MultiaddressesWithStats::from_addresses_with_source(vec![address.unwrap()], &PeerAddressSource::Config), PeerFlags::empty(), @@ -440,7 +453,13 @@ where ); peer_manager.add_peer(peer.clone()).await?; connectivity.add_peer_to_allow_list(peer.node_id.clone()).await?; - self.wallet_connectivity.set_base_node(peer); + let mut peer_list = vec![peer]; + if let Some(pos) = backup_peers.iter().position(|p| p.public_key == public_key) { + backup_peers.remove(pos); + } + peer_list.append(&mut backup_peers); + self.wallet_connectivity + .set_base_node(BaseNodePeerManager::new(0, peer_list)?); } Ok(()) diff --git a/base_layer/wallet/tests/output_manager_service_tests/service.rs b/base_layer/wallet/tests/output_manager_service_tests/service.rs index a11bf6b820..25ce1ea3dc 100644 --- a/base_layer/wallet/tests/output_manager_service_tests/service.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/service.rs @@ -24,7 +24,7 @@ use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use minotari_wallet::{ base_node_service::handle::{BaseNodeEvent, BaseNodeServiceHandle}, - connectivity_service::{create_wallet_connectivity_mock, WalletConnectivityMock}, + connectivity_service::{create_wallet_connectivity_mock, BaseNodePeerManager, WalletConnectivityMock}, output_manager_service::{ config::OutputManagerServiceConfig, error::{OutputManagerError, OutputManagerStorageError}, @@ -135,7 +135,8 @@ async fn setup_output_manager_service( let mut wallet_connectivity_mock = create_wallet_connectivity_mock(); let server_node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE); - wallet_connectivity_mock.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity_mock + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); wallet_connectivity_mock.base_node_changed().await; let service = BaseNodeWalletRpcMockService::new(); diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 486d714c72..f83241725e 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -41,6 +41,7 @@ use minotari_wallet::{ base_node_service::{config::BaseNodeServiceConfig, handle::BaseNodeServiceHandle, BaseNodeServiceInitializer}, connectivity_service::{ create_wallet_connectivity_mock, + BaseNodePeerManager, WalletConnectivityHandle, WalletConnectivityInitializer, WalletConnectivityInterface, @@ -361,7 +362,7 @@ async fn setup_transaction_service_no_comms( wallet_connectivity_service_mock .set_base_node_wallet_rpc_client(connect_rpc_client(&mut rpc_server_connection).await); - wallet_connectivity_service_mock.set_base_node(node_identity.to_peer()); + wallet_connectivity_service_mock.set_base_node(BaseNodePeerManager::new(0, vec![node_identity.to_peer()]).unwrap()); wallet_connectivity_service_mock.base_node_changed().await; let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap(); @@ -3066,11 +3067,13 @@ async fn test_power_mode_updates() { alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_ts_interface.base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_ts_interface.base_node_identity.to_peer()]).unwrap()); alice_ts_interface .wallet_connectivity_service_mock - .notify_base_node_set(alice_ts_interface.base_node_identity.to_peer()); + .notify_base_node_set( + BaseNodePeerManager::new(0, vec![alice_ts_interface.base_node_identity.to_peer()]).unwrap(), + ); alice_ts_interface .base_node_rpc_mock_state @@ -4287,7 +4290,7 @@ async fn test_restarting_transaction_protocols() { bob_ts_interface .wallet_connectivity_service_mock - .set_base_node(base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![base_node_identity.to_peer()]).unwrap()); assert!(bob_ts_interface .transaction_service_handle .restart_transaction_protocols() @@ -4327,7 +4330,7 @@ async fn test_restarting_transaction_protocols() { alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![base_node_identity.to_peer()]).unwrap()); assert!(alice_ts_interface .transaction_service_handle @@ -4685,7 +4688,7 @@ async fn test_resend_on_startup() { // Need to set something for alices base node, doesn't matter what alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(alice_ts_interface .transaction_service_handle @@ -4735,7 +4738,7 @@ async fn test_resend_on_startup() { // Need to set something for alices base node, doesn't matter what alice2_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(alice2_ts_interface .transaction_service_handle @@ -4819,7 +4822,7 @@ async fn test_resend_on_startup() { // Need to set something for bobs base node, doesn't matter what bob_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(bob_ts_interface .transaction_service_handle @@ -4866,7 +4869,7 @@ async fn test_resend_on_startup() { // Need to set something for bobs base node, doesn't matter what bob2_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(bob2_ts_interface .transaction_service_handle @@ -5218,7 +5221,7 @@ async fn test_transaction_timeout_cancellation() { // Need to set something for bobs base node, doesn't matter what bob_ts_interface .wallet_connectivity_service_mock - .set_base_node(bob_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![bob_node_identity.to_peer()]).unwrap()); assert!(bob_ts_interface .transaction_service_handle .restart_broadcast_protocols() @@ -5324,7 +5327,7 @@ async fn transaction_service_tx_broadcast() { alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_ts_interface.base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_ts_interface.base_node_identity.to_peer()]).unwrap()); let connection2 = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection2, None).await; diff --git a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs index 867eab6688..3a077193a7 100644 --- a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs @@ -26,7 +26,7 @@ use chacha20poly1305::{Key, KeyInit, XChaCha20Poly1305}; use chrono::Utc; use futures::StreamExt; use minotari_wallet::{ - connectivity_service::{create_wallet_connectivity_mock, WalletConnectivityMock}, + connectivity_service::{create_wallet_connectivity_mock, BaseNodePeerManager, WalletConnectivityMock}, output_manager_service::{ error::OutputManagerError, handle::{OutputManagerHandle, OutputManagerRequest, OutputManagerResponse}, @@ -277,7 +277,8 @@ async fn tx_broadcast_protocol_submit_success() { ) = setup().await; let mut event_stream = resources.event_publisher.subscribe(); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -363,7 +364,8 @@ async fn tx_broadcast_protocol_submit_rejection() { add_transaction_to_database(1u64.into(), 1 * T, None, resources.db.clone()).await; let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -445,7 +447,8 @@ async fn tx_broadcast_protocol_restart_protocol_as_query() { }); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server @@ -530,7 +533,8 @@ async fn tx_broadcast_protocol_submit_success_followed_by_rejection() { resources.config.broadcast_monitoring_timeout = Duration::from_secs(60); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server @@ -624,7 +628,8 @@ async fn tx_broadcast_protocol_submit_already_mined() { }); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -695,7 +700,8 @@ async fn tx_broadcast_protocol_submit_and_base_node_gets_changed() { }); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -739,7 +745,8 @@ async fn tx_broadcast_protocol_submit_and_base_node_gets_changed() { }); // Change Base Node - wallet_connectivity.notify_base_node_set(new_server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![new_server_node_identity.to_peer()]).unwrap()); // Wait for 1 query let _schnorr_signatures = new_rpc_service_state diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 421d47486c..9f459f5647 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -6862,10 +6862,11 @@ pub unsafe extern "C" fn wallet_set_base_node_peer( } }; - if let Err(e) = (*wallet) - .runtime - .block_on((*wallet).wallet.set_base_node_peer((*public_key).clone(), parsed_addr)) - { + if let Err(e) = (*wallet).runtime.block_on((*wallet).wallet.set_base_node_peer( + (*public_key).clone(), + parsed_addr, + None, + )) { error = LibWalletError::from(e).code; ptr::swap(error_out, &mut error as *mut c_int); return false; diff --git a/comms/core/src/test_utils/mocks/connectivity_manager.rs b/comms/core/src/test_utils/mocks/connectivity_manager.rs index 52e88f60b9..b66cdaa523 100644 --- a/comms/core/src/test_utils/mocks/connectivity_manager.rs +++ b/comms/core/src/test_utils/mocks/connectivity_manager.rs @@ -148,6 +148,10 @@ impl ConnectivityManagerMockState { assert!(is_found, "expected call to dial peer {} but no dial was found", peer); } + pub async fn is_peer_dialed(&self, peer: &NodeId) -> bool { + self.with_state(|state| state.dialed_peers.contains(peer)).await + } + pub async fn await_call_count(&self, count: usize) { let mut attempts = 0; while self.call_count().await < count {