diff --git a/applications/minotari_console_wallet/src/automation/commands.rs b/applications/minotari_console_wallet/src/automation/commands.rs index 4e27f6d711..960cbf5873 100644 --- a/applications/minotari_console_wallet/src/automation/commands.rs +++ b/applications/minotari_console_wallet/src/automation/commands.rs @@ -399,7 +399,7 @@ async fn set_base_node_peer( println!("Setting base node peer..."); println!("{}::{}", public_key, address); wallet - .set_base_node_peer(public_key.clone(), Some(address.clone())) + .set_base_node_peer(public_key.clone(), Some(address.clone()), None) .await?; Ok((public_key, address)) } @@ -1885,17 +1885,18 @@ pub async fn command_runner( let peer_config = PeerConfig::new(selected_base_node, base_node_peers, peer_seeds); - let base_node = peer_config - .get_base_node_peer() + let base_nodes = peer_config + .get_base_node_peers() .map_err(|e| CommandError::General(e.to_string()))?; new_wallet .set_base_node_peer( - base_node.public_key.clone(), + base_nodes[0].public_key.clone(), Some( - base_node + base_nodes[0] .last_address_used() .ok_or(CommandError::General("No address found".to_string()))?, ), + Some(base_nodes), ) .await .map_err(|e| CommandError::General(e.to_string()))?; diff --git a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs index 2b47fcaea7..59707a0c2f 100644 --- a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -263,7 +263,7 @@ impl wallet_server::Wallet for WalletGrpcServer { println!("{}::{}", public_key, net_address); let mut wallet = self.wallet.clone(); wallet - .set_base_node_peer(public_key.clone(), Some(net_address.clone())) + .set_base_node_peer(public_key.clone(), Some(net_address.clone()), None) .await .map_err(|e| Status::internal(format!("{:?}", e)))?; diff --git a/applications/minotari_console_wallet/src/init/mod.rs b/applications/minotari_console_wallet/src/init/mod.rs index f8c04058a9..7537e9c4fb 100644 --- a/applications/minotari_console_wallet/src/init/mod.rs +++ b/applications/minotari_console_wallet/src/init/mod.rs @@ -25,6 +25,7 @@ use std::{fs, io, path::PathBuf, str::FromStr, sync::Arc, time::Instant}; use crossterm::terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled}; +use digest::crypto_common::rand_core::OsRng; use log::*; use minotari_app_utilities::{consts, identity_management::setup_node_identity}; #[cfg(feature = "ledger")] @@ -41,6 +42,7 @@ use minotari_wallet::{ WalletConfig, WalletSqlite, }; +use rand::prelude::SliceRandom; use rpassword::prompt_password_stdout; use rustyline::Editor; use tari_common::{ @@ -570,18 +572,29 @@ fn setup_identity_from_db( /// Starts the wallet by setting the base node peer, and restarting the transaction and broadcast protocols. pub async fn start_wallet( wallet: &mut WalletSqlite, - base_node: &Peer, + base_nodes: &[Peer], wallet_mode: &WalletMode, ) -> Result<(), ExitError> { debug!(target: LOG_TARGET, "Setting base node peer"); - let net_address = base_node + if base_nodes.is_empty() { + return Err(ExitError::new( + ExitCode::WalletError, + "No base nodes configured to connect to", + )); + } + let selected_base_node = base_nodes.choose(&mut OsRng).expect("base_nodes is not empty"); + let net_address = selected_base_node .addresses .best() .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node has no address!"))?; wallet - .set_base_node_peer(base_node.public_key.clone(), Some(net_address.address().clone())) + .set_base_node_peer( + selected_base_node.public_key.clone(), + Some(net_address.address().clone()), + Some(base_nodes.to_vec()), + ) .await .map_err(|e| { ExitError::new( diff --git a/applications/minotari_console_wallet/src/lib.rs b/applications/minotari_console_wallet/src/lib.rs index 7c11e535b2..fab816ddcb 100644 --- a/applications/minotari_console_wallet/src/lib.rs +++ b/applications/minotari_console_wallet/src/lib.rs @@ -229,12 +229,12 @@ pub fn run_wallet_with_cli( &mut wallet, cli.non_interactive_mode, ))?; - let base_node_selected = base_node_config.get_base_node_peer()?; + let base_nodes_peers = base_node_config.get_base_node_peers()?; let wallet_mode = wallet_mode(&cli, boot_mode); // start wallet - runtime.block_on(start_wallet(&mut wallet, &base_node_selected, &wallet_mode))?; + runtime.block_on(start_wallet(&mut wallet, &base_nodes_peers, &wallet_mode))?; debug!(target: LOG_TARGET, "Starting app"); diff --git a/applications/minotari_console_wallet/src/ui/state/app_state.rs b/applications/minotari_console_wallet/src/ui/state/app_state.rs index 686648a0ed..9fe1cfb3d5 100644 --- a/applications/minotari_console_wallet/src/ui/state/app_state.rs +++ b/applications/minotari_console_wallet/src/ui/state/app_state.rs @@ -1036,6 +1036,7 @@ impl AppStateInner { .set_base_node_peer( peer.public_key.clone(), Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None, ) .await?; @@ -1061,6 +1062,7 @@ impl AppStateInner { .set_base_node_peer( peer.public_key.clone(), Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None, ) .await?; @@ -1099,6 +1101,7 @@ impl AppStateInner { .set_base_node_peer( previous.public_key.clone(), Some(previous.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None, ) .await?; diff --git a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs index 7069102107..3d08315bf0 100644 --- a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -244,7 +244,7 @@ impl WalletEventMonitor { _ = base_node_changed.changed() => { let peer = base_node_changed.borrow().as_ref().cloned(); if let Some(peer) = peer { - self.trigger_base_node_peer_refresh(peer).await; + self.trigger_base_node_peer_refresh(peer.get_current_peer()).await; self.trigger_balance_refresh(); } } diff --git a/applications/minotari_console_wallet/src/wallet_modes.rs b/applications/minotari_console_wallet/src/wallet_modes.rs index eb6281eba8..c28da1add4 100644 --- a/applications/minotari_console_wallet/src/wallet_modes.rs +++ b/applications/minotari_console_wallet/src/wallet_modes.rs @@ -89,24 +89,15 @@ impl PeerConfig { /// Get the prioritised base node peer from the PeerConfig. /// 1. Custom Base Node - /// 2. First configured Base Node Peer - /// 3. Random configured Peer Seed - pub fn get_base_node_peer(&self) -> Result { + /// 2. All configured Base Node Peers (a random node will be prioritised) + /// 3. All configured Peer Seeds (a random node will be prioritised) + pub fn get_base_node_peers(&self) -> Result, ExitError> { if let Some(base_node) = self.base_node_custom.clone() { - Ok(base_node) + Ok(vec![base_node]) } else if !self.base_node_peers.is_empty() { - Ok(self - .base_node_peers - .first() - .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node peer has no address!"))? - .clone()) + Ok(self.base_node_peers.clone()) } else if !self.peer_seeds.is_empty() { - // pick a random peer seed - Ok(self - .peer_seeds - .choose(&mut OsRng) - .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Peer seeds was empty."))? - .clone()) + Ok(self.peer_seeds.clone()) } else { Err(ExitError::new( ExitCode::ConfigError, diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index c7c38c1d87..71c2cbc421 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -136,7 +136,7 @@ where timer.elapsed().as_millis() ); - let base_node_id = match self.wallet_connectivity.get_current_base_node_id() { + let base_node_id = match self.wallet_connectivity.get_current_base_node_peer_node_id() { Some(n) => n, None => continue, }; diff --git a/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs new file mode 100644 index 0000000000..2a125f25a3 --- /dev/null +++ b/base_layer/wallet/src/connectivity_service/base_node_peer_manager.rs @@ -0,0 +1,82 @@ +// Copyright 2021, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::fmt::Display; + +use tari_comms::peer_manager::Peer; +use tari_utilities::hex::Hex; + +use crate::connectivity_service::WalletConnectivityError; + +/// The selected peer is a current base node and an optional list of backup peers. +#[derive(Clone)] +pub struct BaseNodePeerManager { + // The current base node that the wallet is connected to + current_peer_index: usize, + // The other base nodes that the wallet can connect to if the selected peer is not available + peer_list: Vec, +} + +impl BaseNodePeerManager { + /// Create a new BaseNodePeerManager, with the preferred peer index and a list of peers. + pub fn new(preferred_peer_index: usize, peer_list: Vec) -> Result { + if preferred_peer_index >= peer_list.len() { + return Err(WalletConnectivityError::PeerIndexOutOfBounds(format!( + "Preferred index: {}, Max index: {}", + preferred_peer_index, + peer_list.len() - 1 + ))); + } + Ok(Self { + current_peer_index: preferred_peer_index, + peer_list, + }) + } + + /// Get the current peer + pub fn get_current_peer(&self) -> Peer { + self.peer_list + .get(self.current_peer_index) + .cloned() + .unwrap_or(self.peer_list[0].clone()) + } + + /// Get the next peer in the list + pub fn get_next_peer(&mut self) -> Peer { + self.current_peer_index = (self.current_peer_index + 1) % self.peer_list.len(); + self.peer_list[self.current_peer_index].clone() + } +} + +impl Display for BaseNodePeerManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "BaseNodePeerManager {{ current_peer_index: {}, peer_list: {:?} }}", + self.current_peer_index, + self.peer_list + .iter() + .map(|p| (p.node_id.to_hex(), p.public_key.to_hex())) + .collect::>() + ) + } +} diff --git a/base_layer/wallet/src/connectivity_service/error.rs b/base_layer/wallet/src/connectivity_service/error.rs index a151003faf..33bc788070 100644 --- a/base_layer/wallet/src/connectivity_service/error.rs +++ b/base_layer/wallet/src/connectivity_service/error.rs @@ -31,6 +31,8 @@ pub enum WalletConnectivityError { ConnectivityError(#[from] ConnectivityError), #[error("Service is terminated and can no longer response to requests")] ServiceTerminated, + #[error("Preferred peer index is out of bounds: {0}")] + PeerIndexOutOfBounds(String), } impl From for WalletConnectivityError { diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index 94bdde2cb7..af9bb72fc4 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -29,7 +29,10 @@ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSync use tokio::sync::{mpsc, oneshot, watch}; use super::service::OnlineStatus; -use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch}; +use crate::{ + connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface}, + util::watch::Watch, +}; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), @@ -39,14 +42,14 @@ pub enum WalletConnectivityRequest { #[derive(Clone)] pub struct WalletConnectivityHandle { sender: mpsc::Sender, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, } impl WalletConnectivityHandle { pub(super) fn new( sender: mpsc::Sender, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, ) -> Self { Self { @@ -59,16 +62,16 @@ impl WalletConnectivityHandle { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityHandle { - fn set_base_node(&mut self, base_node_peer: Peer) { - if let Some(peer) = self.base_node_watch.borrow().as_ref() { - if peer.public_key == base_node_peer.public_key { + fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager) { + if let Some(selected_peer) = self.base_node_watch.borrow().as_ref() { + if selected_peer.get_current_peer().public_key == base_node_peer.get_current_peer().public_key { return; } } self.base_node_watch.send(Some(base_node_peer)); } - fn get_current_base_node_watcher(&self) -> watch::Receiver> { + fn get_current_base_node_watcher(&self) -> watch::Receiver> { self.base_node_watch.get_receiver() } @@ -120,15 +123,24 @@ impl WalletConnectivityInterface for WalletConnectivityHandle { } fn get_current_base_node_peer(&self) -> Option { - self.base_node_watch.borrow().clone() + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().clone()) } fn get_current_base_node_peer_public_key(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone()) + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().public_key.clone()) } - fn get_current_base_node_id(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + fn get_current_base_node_peer_node_id(&self) -> Option { + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().node_id.clone()) } fn is_base_node_set(&self) -> bool { diff --git a/base_layer/wallet/src/connectivity_service/initializer.rs b/base_layer/wallet/src/connectivity_service/initializer.rs index 3df03b5254..cd4747c33b 100644 --- a/base_layer/wallet/src/connectivity_service/initializer.rs +++ b/base_layer/wallet/src/connectivity_service/initializer.rs @@ -64,13 +64,8 @@ impl ServiceInitializer for WalletConnectivityInitializer { context.spawn_until_shutdown(move |handles| { let connectivity = handles.expect_handle(); - let service = WalletConnectivityService::new( - config, - receiver, - base_node_watch.get_receiver(), - online_status_watch, - connectivity, - ); + let service = + WalletConnectivityService::new(config, receiver, base_node_watch, online_status_watch, connectivity); service.start() }); diff --git a/base_layer/wallet/src/connectivity_service/interface.rs b/base_layer/wallet/src/connectivity_service/interface.rs index 8a5527ce7c..e0a3639b2f 100644 --- a/base_layer/wallet/src/connectivity_service/interface.rs +++ b/base_layer/wallet/src/connectivity_service/interface.rs @@ -30,13 +30,13 @@ use tari_comms::{ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; use tokio::sync::watch; -use crate::connectivity_service::OnlineStatus; +use crate::connectivity_service::{BaseNodePeerManager, OnlineStatus}; #[async_trait::async_trait] pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static { - fn set_base_node(&mut self, base_node_peer: Peer); + fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager); - fn get_current_base_node_watcher(&self) -> watch::Receiver>; + fn get_current_base_node_watcher(&self) -> watch::Receiver>; /// Obtain a BaseNodeWalletRpcClient. /// @@ -72,7 +72,7 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static { fn get_current_base_node_peer_public_key(&self) -> Option; - fn get_current_base_node_id(&self) -> Option; + fn get_current_base_node_peer_node_id(&self) -> Option; fn is_base_node_set(&self) -> bool; } diff --git a/base_layer/wallet/src/connectivity_service/mock.rs b/base_layer/wallet/src/connectivity_service/mock.rs index 11228b6661..f61625ec32 100644 --- a/base_layer/wallet/src/connectivity_service/mock.rs +++ b/base_layer/wallet/src/connectivity_service/mock.rs @@ -29,7 +29,7 @@ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSync use tokio::sync::watch::Receiver; use crate::{ - connectivity_service::{OnlineStatus, WalletConnectivityInterface}, + connectivity_service::{BaseNodePeerManager, OnlineStatus, WalletConnectivityInterface}, util::watch::Watch, }; @@ -40,7 +40,7 @@ pub fn create() -> WalletConnectivityMock { #[derive(Clone)] pub struct WalletConnectivityMock { online_status_watch: Watch, - base_node_watch: Watch>, + base_node_watch: Watch>, base_node_wallet_rpc_client: Watch>>, base_node_sync_rpc_client: Watch>>, } @@ -65,11 +65,11 @@ impl WalletConnectivityMock { self.base_node_sync_rpc_client.send(Some(RpcClientLease::new(client))); } - pub fn notify_base_node_set(&self, base_node_peer: Peer) { + pub fn notify_base_node_set(&self, base_node_peer: BaseNodePeerManager) { self.base_node_watch.send(Some(base_node_peer)); } - pub async fn base_node_changed(&mut self) -> Option { + pub async fn base_node_changed(&mut self) -> Option { self.base_node_watch.changed().await; self.base_node_watch.borrow().as_ref().cloned() } @@ -82,11 +82,11 @@ impl WalletConnectivityMock { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityMock { - fn set_base_node(&mut self, base_node_peer: Peer) { + fn set_base_node(&mut self, base_node_peer: BaseNodePeerManager) { self.notify_base_node_set(base_node_peer); } - fn get_current_base_node_watcher(&self) -> Receiver> { + fn get_current_base_node_watcher(&self) -> Receiver> { self.base_node_watch.get_receiver() } @@ -121,15 +121,24 @@ impl WalletConnectivityInterface for WalletConnectivityMock { } fn get_current_base_node_peer(&self) -> Option { - self.base_node_watch.borrow().as_ref().cloned() + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().clone()) } fn get_current_base_node_peer_public_key(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone()) + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().public_key.clone()) } - fn get_current_base_node_id(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + fn get_current_base_node_peer_node_id(&self) -> Option { + self.base_node_watch + .borrow() + .as_ref() + .map(|p| p.get_current_peer().node_id.clone()) } fn is_base_node_set(&self) -> bool { diff --git a/base_layer/wallet/src/connectivity_service/mod.rs b/base_layer/wallet/src/connectivity_service/mod.rs index 75b99ed330..c425953332 100644 --- a/base_layer/wallet/src/connectivity_service/mod.rs +++ b/base_layer/wallet/src/connectivity_service/mod.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. mod error; + pub use error::WalletConnectivityError; mod handle; @@ -40,3 +41,6 @@ pub use mock::{create as create_wallet_connectivity_mock, WalletConnectivityMock mod interface; pub use interface::WalletConnectivityInterface; + +mod base_node_peer_manager; +pub use base_node_peer_manager::BaseNodePeerManager; diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index f880739bfb..39e276ebe5 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -25,7 +25,7 @@ use std::{mem, time::Duration}; use log::*; use tari_comms::{ connectivity::{ConnectivityError, ConnectivityRequester}, - peer_manager::{NodeId, Peer}, + peer_manager::NodeId, protocol::rpc::{RpcClientLease, RpcClientPool}, Minimized, PeerConnection, @@ -39,12 +39,12 @@ use tokio::{ use crate::{ base_node_service::config::BaseNodeServiceConfig, - connectivity_service::{error::WalletConnectivityError, handle::WalletConnectivityRequest}, + connectivity_service::{error::WalletConnectivityError, handle::WalletConnectivityRequest, BaseNodePeerManager}, util::watch::Watch, }; const LOG_TARGET: &str = "wallet::connectivity"; -const CONNECTIVITY_WAIT: u64 = 5; +pub(crate) const CONNECTIVITY_WAIT: u64 = 5; /// Connection status of the Base Node #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -58,7 +58,8 @@ pub struct WalletConnectivityService { config: BaseNodeServiceConfig, request_receiver: mpsc::Receiver, connectivity: ConnectivityRequester, - base_node_watch: watch::Receiver>, + base_node_watch_receiver: watch::Receiver>, + base_node_watch: Watch>, pools: Option, online_status_watch: Watch, pending_requests: Vec, @@ -73,7 +74,7 @@ impl WalletConnectivityService { pub(super) fn new( config: BaseNodeServiceConfig, request_receiver: mpsc::Receiver, - base_node_watch: watch::Receiver>, + base_node_watch: Watch>, online_status_watch: Watch, connectivity: ConnectivityRequester, ) -> Self { @@ -81,6 +82,7 @@ impl WalletConnectivityService { config, request_receiver, connectivity, + base_node_watch_receiver: base_node_watch.get_receiver(), base_node_watch, pools: None, pending_requests: Vec::new(), @@ -99,8 +101,8 @@ impl WalletConnectivityService { // BIASED: select branches are in order of priority biased; - Ok(_) = self.base_node_watch.changed() => { - if self.base_node_watch.borrow().is_some() { + Ok(_) = self.base_node_watch_receiver.changed() => { + if self.base_node_watch_receiver.borrow().is_some() { // This will block the rest until the connection is established. This is what we want. self.setup_base_node_connection().await; } @@ -176,7 +178,7 @@ impl WalletConnectivityService { }, None => { self.pending_requests.push(reply.into()); - if self.base_node_watch.borrow().is_none() { + if self.base_node_watch_receiver.borrow().is_none() { warn!( target: LOG_TARGET, "{} requests are waiting for base node to be set", @@ -209,7 +211,7 @@ impl WalletConnectivityService { }, None => { self.pending_requests.push(reply.into()); - if self.base_node_watch.borrow().is_none() { + if self.base_node_watch_receiver.borrow().is_none() { warn!( target: LOG_TARGET, "{} requests are waiting for base node to be set", @@ -221,7 +223,14 @@ impl WalletConnectivityService { } fn current_base_node(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + self.base_node_watch_receiver + .borrow() + .as_ref() + .map(|p| p.get_current_peer().node_id.clone()) + } + + fn get_base_node_peer_manager(&self) -> Option { + self.base_node_watch_receiver.borrow().as_ref().map(|p| p.clone()) } async fn disconnect_base_node(&mut self, node_id: NodeId) { @@ -235,20 +244,28 @@ impl WalletConnectivityService { } async fn setup_base_node_connection(&mut self) { + let mut initial_connect = true; self.pools = None; + let mut peer_manager = if let Some(val) = self.get_base_node_peer_manager() { + val + } else { + self.set_online_status(OnlineStatus::Offline); + return; + }; + self.set_online_status(OnlineStatus::Connecting); + trace!(target: LOG_TARGET, "Setup base node connection to: {}", peer_manager); loop { - let node_id = match self.current_base_node() { - Some(n) => n, - None => { - self.set_online_status(OnlineStatus::Offline); - return; - }, + let node_id = if initial_connect { + initial_connect = false; + peer_manager.get_current_peer().node_id + } else { + peer_manager.get_next_peer().node_id }; + debug!( target: LOG_TARGET, "Attempting to connect to base node peer {}...", node_id ); - self.set_online_status(OnlineStatus::Connecting); match self.try_setup_rpc_pool(node_id.clone()).await { Ok(true) => { self.set_online_status(OnlineStatus::Online); @@ -271,7 +288,6 @@ impl WalletConnectivityService { "Dial was cancelled. Retrying after {}s ...", self.config.base_node_monitor_max_refresh_interval.as_secs() ); - self.set_online_status(OnlineStatus::Offline); time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await; continue; }, @@ -279,13 +295,20 @@ impl WalletConnectivityService { warn!(target: LOG_TARGET, "{}", e); if self.current_base_node().as_ref() == Some(&node_id) { self.disconnect_base_node(node_id).await; - self.set_online_status(OnlineStatus::Offline); time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await; } continue; }, } } + + if let Some(val) = self.get_base_node_peer_manager() { + if peer_manager.get_current_peer().public_key != val.get_current_peer().public_key { + self.base_node_watch.send(Some(peer_manager)); + } + } else { + self.base_node_watch.send(Some(peer_manager)); + } } fn set_online_status(&self, status: OnlineStatus) { @@ -319,7 +342,7 @@ impl WalletConnectivityService { tokio::select! { biased; - _ = self.base_node_watch.changed() => { + _ = self.base_node_watch_receiver.changed() => { Ok(None) } result = self.connectivity.dial_peer(peer) => { diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index a0da32e40d..7b8739d7e6 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -21,11 +21,11 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use core::convert; -use std::{iter, sync::Arc}; +use std::{iter, sync::Arc, time::Duration}; use futures::future; use tari_comms::{ - peer_manager::PeerFeatures, + peer_manager::{NodeId, PeerFeatures}, protocol::rpc::{ mock::{MockRpcImpl, MockRpcServer}, RpcPoolClient, @@ -41,11 +41,12 @@ use tari_test_utils::runtime::spawn_until_shutdown; use tokio::{ sync::{mpsc, Barrier}, task, + time::{sleep, timeout}, }; -use super::service::WalletConnectivityService; +use super::service::{WalletConnectivityService, CONNECTIVITY_WAIT}; use crate::{ - connectivity_service::{OnlineStatus, WalletConnectivityHandle, WalletConnectivityInterface}, + connectivity_service::{BaseNodePeerManager, OnlineStatus, WalletConnectivityHandle, WalletConnectivityInterface}, util::watch::Watch, }; @@ -65,7 +66,7 @@ async fn setup() -> ( let service = WalletConnectivityService::new( Default::default(), rx, - base_node_watch.get_receiver(), + base_node_watch, online_status_watch, connectivity, ); @@ -87,7 +88,7 @@ async fn it_dials_peer_when_base_node_is_set() { // Set the mock to defer returning a result for the peer connection mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); // Wait for connection request mock_state.await_call_count(1).await; @@ -110,7 +111,7 @@ async fn it_resolves_many_pending_rpc_session_requests() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); let pending_requests = iter::repeat_with(|| { let mut handle = handle.clone(); @@ -142,7 +143,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.add_active_connection(conn2).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer1.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer1.to_peer()]).unwrap()); mock_state.await_call_count(1).await; mock_state.expect_dial_peer(base_node_peer1.node_id()).await; @@ -153,7 +154,7 @@ async fn it_changes_to_a_new_base_node() { assert!(rpc_client.is_connected()); // Initiate a connection to the base node - handle.set_base_node(base_node_peer2.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer2.to_peer()]).unwrap()); mock_state.await_call_count(1).await; mock_state.expect_dial_peer(base_node_peer2.node_id()).await; @@ -162,6 +163,62 @@ async fn it_changes_to_a_new_base_node() { assert!(rpc_client.is_connected()); } +async fn wait_for_peers_to_be_dialed( + mock_state: &ConnectivityManagerMockState, + peers: &[&NodeId], + timeout_duration: Duration, +) { + let check_interval = Duration::from_millis(100); // Interval to check the condition + + let result = timeout(timeout_duration, async { + loop { + let all_dialed = futures::future::join_all(peers.iter().map(|peer| mock_state.is_peer_dialed(peer))).await; + if all_dialed.into_iter().all(|dialed| dialed) { + break; + } + sleep(check_interval).await; + } + }) + .await; + + if result.is_err() { + panic!("Timeout reached while waiting for all peers to be dialed"); + } +} + +#[tokio::test] +async fn it_changes_to_a_new_base_node_if_preferred_is_offline() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + let (mut handle, mock_server, mock_state, _shutdown) = setup().await; + let base_node_peer1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let base_node_peer2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let conn2 = mock_server.create_mockimpl_connection(base_node_peer2.to_peer()).await; + + mock_state.add_active_connection(conn2).await; + + // Initiate a connection to the base node + handle.set_base_node( + BaseNodePeerManager::new(0, vec![base_node_peer1.to_peer(), base_node_peer2.to_peer()]).unwrap(), + ); + + mock_state.await_call_count(2).await; + + wait_for_peers_to_be_dialed( + &mock_state, + &[base_node_peer1.node_id(), base_node_peer2.node_id()], + Duration::from_secs(2 * CONNECTIVITY_WAIT), + ) + .await; + + assert!(mock_state.count_calls_containing("DialPeer").await >= 2); + let _result = mock_state.take_calls().await; + + let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap(); + assert!(rpc_client.is_connected()); + + handle.get_current_base_node_peer().unwrap(); +} + #[tokio::test] async fn it_gracefully_handles_connect_fail_reconnect() { let (mut handle, mock_server, mock_state, _shutdown) = setup().await; @@ -172,7 +229,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; @@ -212,7 +269,7 @@ async fn it_gracefully_handles_multiple_connection_failures() { let conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(BaseNodePeerManager::new(0, vec![base_node_peer.to_peer()]).unwrap()); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; diff --git a/base_layer/wallet/src/error.rs b/base_layer/wallet/src/error.rs index 1ce1cbed13..ccd5759734 100644 --- a/base_layer/wallet/src/error.rs +++ b/base_layer/wallet/src/error.rs @@ -41,6 +41,7 @@ use thiserror::Error; use crate::{ base_node_service::error::BaseNodeServiceError, + connectivity_service::WalletConnectivityError, output_manager_service::error::OutputManagerError, storage::database::DbKey, transaction_service::error::TransactionServiceError, @@ -103,6 +104,8 @@ pub enum WalletError { UnexpectedApiResponse { method: String, api: String }, #[error("Public address not set for this wallet")] PublicAddressNotSet, + #[error("Wallet connectivity error: `{0}`")] + WalletConnectivityError(#[from] WalletConnectivityError), } pub const LOG_TARGET: &str = "minotari::application"; diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index ff7921a1dd..5f12775b3a 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -556,7 +556,7 @@ where let current_base_node = self .resources .connectivity - .get_current_base_node_id() + .get_current_base_node_peer_node_id() .ok_or(OutputManagerError::NoBaseNodeKeysProvided)?; let id = OsRng.next_u64(); let txo_validation = TxoValidationTask::new( @@ -634,7 +634,7 @@ where }, _ = base_node_watch.changed() => { if let Some(peer) = base_node_watch.borrow().as_ref() { - if peer.node_id != current_base_node { + if peer.get_current_peer().node_id != current_base_node { debug!( target: LOG_TARGET, "TXO Validation Protocol (Id: {}) cancelled because base node changed", id diff --git a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs index 4abd7d683e..e77114317e 100644 --- a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs +++ b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs @@ -28,7 +28,7 @@ use std::{ use chrono::{Duration, Utc}; use log::*; use tari_common_types::types::{BlockHash, FixedHash}; -use tari_comms::{peer_manager::Peer, protocol::rpc::RpcError::RequestFailed}; +use tari_comms::protocol::rpc::RpcError::RequestFailed; use tari_core::{ base_node::rpc::BaseNodeWalletRpcClient, blocks::BlockHeader, @@ -38,7 +38,7 @@ use tari_utilities::hex::Hex; use tokio::sync::watch; use crate::{ - connectivity_service::WalletConnectivityInterface, + connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface}, output_manager_service::{ config::OutputManagerServiceConfig, error::{OutputManagerError, OutputManagerProtocolError, OutputManagerProtocolErrorExt}, @@ -56,7 +56,7 @@ const LOG_TARGET: &str = "wallet::output_service::txo_validation_task"; pub struct TxoValidationTask { operation_id: u64, db: OutputManagerDatabase, - base_node_watch: watch::Receiver>, + base_node_watch: watch::Receiver>, connectivity: TWalletConnectivity, event_publisher: OutputManagerEventSender, config: OutputManagerServiceConfig, @@ -103,7 +103,7 @@ where .base_node_watch .borrow() .as_ref() - .map(|p| p.node_id.clone()) + .map(|p| p.get_current_peer().node_id.clone()) .ok_or_else(|| OutputManagerProtocolError::new(self.operation_id, OutputManagerError::BaseNodeChanged))?; debug!( target: LOG_TARGET, diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs index 4f4e32efb3..c0405c397f 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs @@ -136,10 +136,12 @@ where loop { tokio::select! { _ = current_base_node_watcher.changed() => { - if let Some(peer) = &*current_base_node_watcher.borrow() { + if let Some(selected_peer) = &*current_base_node_watcher.borrow() { info!( target: LOG_TARGET, - "Transaction Broadcast protocol (TxId: {}) Base Node Public key updated to {} (NodeID: {})", self.tx_id, peer.public_key, peer.node_id + "Transaction Broadcast protocol (TxId: {}) Base Node Public key updated to {} (NodeID: {})", + self.tx_id, selected_peer.get_current_peer().public_key, + selected_peer.get_current_peer().node_id, ); } self.last_rejection = None; @@ -154,7 +156,10 @@ where }, TxBroadcastMode::TransactionQuery => { if result? { - debug!(target: LOG_TARGET, "Transaction broadcast, transaction validation protocol will continue from here"); + debug!( + target: LOG_TARGET, + "Transaction broadcast, transaction validation protocol will continue from here" + ); return Ok(self.tx_id) } }, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 86582384a7..58c7bd1c1e 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -3241,7 +3241,7 @@ where let current_base_node = self .resources .connectivity - .get_current_base_node_id() + .get_current_base_node_peer_node_id() .ok_or(TransactionServiceError::NoBaseNodeKeysProvided)?; trace!(target: LOG_TARGET, "Starting transaction validation protocol"); @@ -3273,8 +3273,8 @@ where return result; }, _ = base_node_watch.changed() => { - if let Some(peer) = base_node_watch.borrow().as_ref() { - if peer.node_id != current_base_node { + if let Some(selected_peer) = base_node_watch.borrow().as_ref() { + if selected_peer.get_current_peer().node_id != current_base_node { debug!(target: LOG_TARGET, "Base node changed, exiting transaction validation protocol"); return Err(TransactionServiceProtocolError::new(id, TransactionServiceError::BaseNodeChanged { task_name: "transaction validation_protocol", diff --git a/base_layer/wallet/src/utxo_scanner_service/service.rs b/base_layer/wallet/src/utxo_scanner_service/service.rs index 4bfc99d0fe..f0f13e5134 100644 --- a/base_layer/wallet/src/utxo_scanner_service/service.rs +++ b/base_layer/wallet/src/utxo_scanner_service/service.rs @@ -24,7 +24,7 @@ use chrono::NaiveDateTime; use futures::FutureExt; use log::*; use tari_common_types::{tari_address::TariAddress, types::HashOutput}; -use tari_comms::{connectivity::ConnectivityRequester, peer_manager::Peer, types::CommsPublicKey}; +use tari_comms::{connectivity::ConnectivityRequester, types::CommsPublicKey}; use tari_core::transactions::{tari_amount::MicroMinotari, CryptoFactories}; use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::{ @@ -34,7 +34,7 @@ use tokio::{ use crate::{ base_node_service::handle::{BaseNodeEvent, BaseNodeServiceHandle}, - connectivity_service::WalletConnectivityInterface, + connectivity_service::{BaseNodePeerManager, WalletConnectivityInterface}, error::WalletError, output_manager_service::handle::OutputManagerHandle, storage::database::{WalletBackend, WalletDatabase}, @@ -161,9 +161,9 @@ where } _ = self.resources.current_base_node_watcher.changed() => { debug!(target: LOG_TARGET, "Base node change detected."); - let peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); - if let Some(peer) = peer { - self.peer_seeds = vec![peer.public_key]; + let selected_peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); + if let Some(peer) = selected_peer { + self.peer_seeds = vec![peer.get_current_peer().public_key]; } local_shutdown.trigger(); }, @@ -190,7 +190,7 @@ pub struct UtxoScannerResources { pub db: WalletDatabase, pub comms_connectivity: ConnectivityRequester, pub wallet_connectivity: TWalletConnectivity, - pub current_base_node_watcher: watch::Receiver>, + pub current_base_node_watcher: watch::Receiver>, pub output_manager_service: OutputManagerHandle, pub transaction_service: TransactionServiceHandle, pub one_sided_tari_address: TariAddress, diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index 92045db1f3..99def52a02 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -206,7 +206,7 @@ where async fn attempt_sync(&mut self, peer: NodeId) -> Result<(u64, u64, MicroMinotari, Duration), UtxoScannerError> { self.publish_event(UtxoScannerEvent::ConnectingToBaseNode(peer.clone())); - let selected_peer = self.resources.wallet_connectivity.get_current_base_node_id(); + let selected_peer = self.resources.wallet_connectivity.get_current_base_node_peer_node_id(); let mut client = if selected_peer.map(|p| p == peer).unwrap_or(false) { // Use the wallet connectivity service so that RPC pools are correctly managed diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 2022b97de9..644aea0cd8 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -85,7 +85,12 @@ use tari_utilities::{hex::Hex, ByteArray}; use crate::{ base_node_service::{handle::BaseNodeServiceHandle, BaseNodeServiceInitializer}, config::WalletConfig, - connectivity_service::{WalletConnectivityHandle, WalletConnectivityInitializer, WalletConnectivityInterface}, + connectivity_service::{ + BaseNodePeerManager, + WalletConnectivityHandle, + WalletConnectivityInitializer, + WalletConnectivityInterface, + }, consts, error::{WalletError, WalletStorageError}, output_manager_service::{ @@ -381,13 +386,14 @@ where &mut self, public_key: CommsPublicKey, address: Option, + backup_peers: Option>, ) -> Result<(), WalletError> { info!( "Wallet setting base node peer, public key: {}, net address: {:?}.", public_key, address ); - if let Some(current_node) = self.wallet_connectivity.get_current_base_node_id() { + if let Some(current_node) = self.wallet_connectivity.get_current_base_node_peer_node_id() { self.comms .connectivity() .remove_peer_from_allow_list(current_node) @@ -396,6 +402,7 @@ where let peer_manager = self.comms.peer_manager(); let mut connectivity = self.comms.connectivity(); + let mut backup_peers = backup_peers.unwrap_or_default(); if let Some(mut current_peer) = peer_manager.find_by_public_key(&public_key).await? { // Only invalidate the identity signature if addresses are different if address.is_some() { @@ -415,7 +422,13 @@ where connectivity .add_peer_to_allow_list(current_peer.node_id.clone()) .await?; - self.wallet_connectivity.set_base_node(current_peer); + let mut peer_list = vec![current_peer]; + if let Some(pos) = backup_peers.iter().position(|p| p.public_key == public_key) { + backup_peers.remove(pos); + } + peer_list.append(&mut backup_peers); + self.wallet_connectivity + .set_base_node(BaseNodePeerManager::new(0, peer_list)?); } else { let node_id = NodeId::from_key(&public_key); if address.is_none() { @@ -430,7 +443,7 @@ where }); } let peer = Peer::new( - public_key, + public_key.clone(), node_id, MultiaddressesWithStats::from_addresses_with_source(vec![address.unwrap()], &PeerAddressSource::Config), PeerFlags::empty(), @@ -440,7 +453,13 @@ where ); peer_manager.add_peer(peer.clone()).await?; connectivity.add_peer_to_allow_list(peer.node_id.clone()).await?; - self.wallet_connectivity.set_base_node(peer); + let mut peer_list = vec![peer]; + if let Some(pos) = backup_peers.iter().position(|p| p.public_key == public_key) { + backup_peers.remove(pos); + } + peer_list.append(&mut backup_peers); + self.wallet_connectivity + .set_base_node(BaseNodePeerManager::new(0, peer_list)?); } Ok(()) diff --git a/base_layer/wallet/tests/output_manager_service_tests/service.rs b/base_layer/wallet/tests/output_manager_service_tests/service.rs index 84c531bac6..e788e093d0 100644 --- a/base_layer/wallet/tests/output_manager_service_tests/service.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/service.rs @@ -24,7 +24,7 @@ use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use minotari_wallet::{ base_node_service::handle::{BaseNodeEvent, BaseNodeServiceHandle}, - connectivity_service::{create_wallet_connectivity_mock, WalletConnectivityMock}, + connectivity_service::{create_wallet_connectivity_mock, BaseNodePeerManager, WalletConnectivityMock}, output_manager_service::{ config::OutputManagerServiceConfig, error::{OutputManagerError, OutputManagerStorageError}, @@ -135,7 +135,8 @@ async fn setup_output_manager_service( let mut wallet_connectivity_mock = create_wallet_connectivity_mock(); let server_node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE); - wallet_connectivity_mock.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity_mock + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); wallet_connectivity_mock.base_node_changed().await; let service = BaseNodeWalletRpcMockService::new(); diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 0f860664f5..15fea49f58 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -41,6 +41,7 @@ use minotari_wallet::{ base_node_service::{config::BaseNodeServiceConfig, handle::BaseNodeServiceHandle, BaseNodeServiceInitializer}, connectivity_service::{ create_wallet_connectivity_mock, + BaseNodePeerManager, WalletConnectivityHandle, WalletConnectivityInitializer, WalletConnectivityInterface, @@ -361,7 +362,7 @@ async fn setup_transaction_service_no_comms( wallet_connectivity_service_mock .set_base_node_wallet_rpc_client(connect_rpc_client(&mut rpc_server_connection).await); - wallet_connectivity_service_mock.set_base_node(node_identity.to_peer()); + wallet_connectivity_service_mock.set_base_node(BaseNodePeerManager::new(0, vec![node_identity.to_peer()]).unwrap()); wallet_connectivity_service_mock.base_node_changed().await; let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap(); @@ -3072,11 +3073,13 @@ async fn test_power_mode_updates() { alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_ts_interface.base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_ts_interface.base_node_identity.to_peer()]).unwrap()); alice_ts_interface .wallet_connectivity_service_mock - .notify_base_node_set(alice_ts_interface.base_node_identity.to_peer()); + .notify_base_node_set( + BaseNodePeerManager::new(0, vec![alice_ts_interface.base_node_identity.to_peer()]).unwrap(), + ); alice_ts_interface .base_node_rpc_mock_state @@ -4293,7 +4296,7 @@ async fn test_restarting_transaction_protocols() { bob_ts_interface .wallet_connectivity_service_mock - .set_base_node(base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![base_node_identity.to_peer()]).unwrap()); assert!(bob_ts_interface .transaction_service_handle .restart_transaction_protocols() @@ -4333,7 +4336,7 @@ async fn test_restarting_transaction_protocols() { alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![base_node_identity.to_peer()]).unwrap()); assert!(alice_ts_interface .transaction_service_handle @@ -4691,7 +4694,7 @@ async fn test_resend_on_startup() { // Need to set something for alices base node, doesn't matter what alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(alice_ts_interface .transaction_service_handle @@ -4741,7 +4744,7 @@ async fn test_resend_on_startup() { // Need to set something for alices base node, doesn't matter what alice2_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(alice2_ts_interface .transaction_service_handle @@ -4825,7 +4828,7 @@ async fn test_resend_on_startup() { // Need to set something for bobs base node, doesn't matter what bob_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(bob_ts_interface .transaction_service_handle @@ -4872,7 +4875,7 @@ async fn test_resend_on_startup() { // Need to set something for bobs base node, doesn't matter what bob2_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_node_identity.to_peer()]).unwrap()); assert!(bob2_ts_interface .transaction_service_handle @@ -5224,7 +5227,7 @@ async fn test_transaction_timeout_cancellation() { // Need to set something for bobs base node, doesn't matter what bob_ts_interface .wallet_connectivity_service_mock - .set_base_node(bob_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![bob_node_identity.to_peer()]).unwrap()); assert!(bob_ts_interface .transaction_service_handle .restart_broadcast_protocols() @@ -5330,7 +5333,7 @@ async fn transaction_service_tx_broadcast() { alice_ts_interface .wallet_connectivity_service_mock - .set_base_node(alice_ts_interface.base_node_identity.to_peer()); + .set_base_node(BaseNodePeerManager::new(0, vec![alice_ts_interface.base_node_identity.to_peer()]).unwrap()); let connection2 = make_wallet_database_memory_connection(); let mut bob_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection2, None).await; diff --git a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs index 867eab6688..3a077193a7 100644 --- a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs @@ -26,7 +26,7 @@ use chacha20poly1305::{Key, KeyInit, XChaCha20Poly1305}; use chrono::Utc; use futures::StreamExt; use minotari_wallet::{ - connectivity_service::{create_wallet_connectivity_mock, WalletConnectivityMock}, + connectivity_service::{create_wallet_connectivity_mock, BaseNodePeerManager, WalletConnectivityMock}, output_manager_service::{ error::OutputManagerError, handle::{OutputManagerHandle, OutputManagerRequest, OutputManagerResponse}, @@ -277,7 +277,8 @@ async fn tx_broadcast_protocol_submit_success() { ) = setup().await; let mut event_stream = resources.event_publisher.subscribe(); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -363,7 +364,8 @@ async fn tx_broadcast_protocol_submit_rejection() { add_transaction_to_database(1u64.into(), 1 * T, None, resources.db.clone()).await; let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -445,7 +447,8 @@ async fn tx_broadcast_protocol_restart_protocol_as_query() { }); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server @@ -530,7 +533,8 @@ async fn tx_broadcast_protocol_submit_success_followed_by_rejection() { resources.config.broadcast_monitoring_timeout = Duration::from_secs(60); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server @@ -624,7 +628,8 @@ async fn tx_broadcast_protocol_submit_already_mined() { }); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -695,7 +700,8 @@ async fn tx_broadcast_protocol_submit_and_base_node_gets_changed() { }); let timeout_update_watch = Watch::new(Duration::from_secs(1)); - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![server_node_identity.to_peer()]).unwrap()); // Now we add the connection let mut connection = mock_rpc_server .create_connection(server_node_identity.to_peer(), "t/bnwallet/1".into()) @@ -739,7 +745,8 @@ async fn tx_broadcast_protocol_submit_and_base_node_gets_changed() { }); // Change Base Node - wallet_connectivity.notify_base_node_set(new_server_node_identity.to_peer()); + wallet_connectivity + .notify_base_node_set(BaseNodePeerManager::new(0, vec![new_server_node_identity.to_peer()]).unwrap()); // Wait for 1 query let _schnorr_signatures = new_rpc_service_state diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 0949e15b32..4bb759d024 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -6879,10 +6879,11 @@ pub unsafe extern "C" fn wallet_set_base_node_peer( } }; - if let Err(e) = (*wallet) - .runtime - .block_on((*wallet).wallet.set_base_node_peer((*public_key).clone(), parsed_addr)) - { + if let Err(e) = (*wallet).runtime.block_on((*wallet).wallet.set_base_node_peer( + (*public_key).clone(), + parsed_addr, + None, + )) { error = LibWalletError::from(e).code; ptr::swap(error_out, &mut error as *mut c_int); return false; diff --git a/comms/core/src/test_utils/mocks/connectivity_manager.rs b/comms/core/src/test_utils/mocks/connectivity_manager.rs index 52e88f60b9..b66cdaa523 100644 --- a/comms/core/src/test_utils/mocks/connectivity_manager.rs +++ b/comms/core/src/test_utils/mocks/connectivity_manager.rs @@ -148,6 +148,10 @@ impl ConnectivityManagerMockState { assert!(is_found, "expected call to dial peer {} but no dial was found", peer); } + pub async fn is_peer_dialed(&self, peer: &NodeId) -> bool { + self.with_state(|state| state.dialed_peers.contains(peer)).await + } + pub async fn await_call_count(&self, count: usize) { let mut attempts = 0; while self.call_count().await < count {