From 9523a457345a661e2d94911f5bfec8834a9c9f2b Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Fri, 30 Aug 2024 16:43:28 +0200 Subject: [PATCH] wip wip --- .../src/automation/commands.rs | 2 +- .../src/grpc/wallet_grpc_server.rs | 2 +- .../minotari_console_wallet/src/init/mod.rs | 13 +++++++++--- .../minotari_console_wallet/src/lib.rs | 4 ++-- .../src/ui/state/app_state.rs | 3 +++ .../src/wallet_modes.rs | 16 ++++++-------- .../wallet/src/connectivity_service/handle.rs | 21 ++++++++++++------- .../src/connectivity_service/interface.rs | 16 ++++++++++++-- .../wallet/src/connectivity_service/mock.rs | 21 ++++++++++++------- .../wallet/src/connectivity_service/mod.rs | 2 +- .../src/connectivity_service/service.rs | 5 +++++ .../wallet/src/connectivity_service/test.rs | 13 ++++++------ base_layer/wallet/src/wallet.rs | 6 ++++-- .../transaction_service_tests/service.rs | 4 ++-- base_layer/wallet_ffi/src/lib.rs | 2 +- 15 files changed, 83 insertions(+), 47 deletions(-) diff --git a/applications/minotari_console_wallet/src/automation/commands.rs b/applications/minotari_console_wallet/src/automation/commands.rs index 60a552e953..efe1ea385a 100644 --- a/applications/minotari_console_wallet/src/automation/commands.rs +++ b/applications/minotari_console_wallet/src/automation/commands.rs @@ -389,7 +389,7 @@ async fn set_base_node_peer( println!("Setting base node peer..."); println!("{}::{}", public_key, address); wallet - .set_base_node_peer(public_key.clone(), Some(address.clone())) + .set_base_node_peer(public_key.clone(), Some(address.clone()), None) .await?; Ok((public_key, address)) } diff --git a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs index 7c4909b74f..178e2476c2 100644 --- a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -257,7 +257,7 @@ impl wallet_server::Wallet for WalletGrpcServer { println!("{}::{}", public_key, net_address); let mut wallet = self.wallet.clone(); wallet - .set_base_node_peer(public_key.clone(), Some(net_address.clone())) + .set_base_node_peer(public_key.clone(), Some(net_address.clone()), None) .await .map_err(|e| Status::internal(format!("{:?}", e)))?; diff --git a/applications/minotari_console_wallet/src/init/mod.rs b/applications/minotari_console_wallet/src/init/mod.rs index 8001616f97..31d84f44ba 100644 --- a/applications/minotari_console_wallet/src/init/mod.rs +++ b/applications/minotari_console_wallet/src/init/mod.rs @@ -25,7 +25,9 @@ use std::{fs, io, path::PathBuf, str::FromStr, sync::Arc, time::Instant}; use crossterm::terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled}; +use digest::crypto_common::rand_core::OsRng; use log::*; +use rand::prelude::SliceRandom; use minotari_app_utilities::{consts, identity_management::setup_node_identity}; #[cfg(feature = "ledger")] use minotari_ledger_wallet_comms::accessor_methods::{ledger_get_public_spend_key, ledger_get_view_key}; @@ -569,7 +571,7 @@ fn setup_identity_from_db( /// Starts the wallet by setting the base node peer, and restarting the transaction and broadcast protocols. pub async fn start_wallet( wallet: &mut WalletSqlite, - base_node: &Peer, + base_nodes: Vec, wallet_mode: &WalletMode, ) -> Result<(), ExitError> { // Verify ledger build if wallet type is Ledger @@ -601,13 +603,18 @@ pub async fn start_wallet( debug!(target: LOG_TARGET, "Setting base node peer"); - let net_address = base_node + if base_nodes.is_empty() { + return Err(ExitError::new(ExitCode::WalletError, "No base nodes configured to connect to")); + } + let selected_base_node = base_nodes.choose(&mut OsRng).expect("base_nodes is not empty"); + let net_address = selected_base_node .addresses .best() .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node has no address!"))?; + let backup_peers = base_nodes.iter().filter(|&v|v != selected_base_node).collect::>(); wallet - .set_base_node_peer(base_node.public_key.clone(), Some(net_address.address().clone())) + .set_base_node_peer(selected_base_node.public_key.clone(), Some(net_address.address().clone()), Some(base_nodes)) .await .map_err(|e| { ExitError::new( diff --git a/applications/minotari_console_wallet/src/lib.rs b/applications/minotari_console_wallet/src/lib.rs index 5efbcb4207..abea46859e 100644 --- a/applications/minotari_console_wallet/src/lib.rs +++ b/applications/minotari_console_wallet/src/lib.rs @@ -217,12 +217,12 @@ pub fn run_wallet_with_cli( // get base node/s let base_node_config = runtime.block_on(get_base_node_peer_config(config, &mut wallet, cli.non_interactive_mode))?; - let base_node_selected = base_node_config.get_base_node_peer()?; + let base_nodes_selected = base_node_config.get_base_node_peers()?; let wallet_mode = wallet_mode(&cli, boot_mode); // start wallet - runtime.block_on(start_wallet(&mut wallet, &base_node_selected, &wallet_mode))?; + runtime.block_on(start_wallet(&mut wallet, &base_nodes_selected, &wallet_mode))?; debug!(target: LOG_TARGET, "Starting app"); diff --git a/applications/minotari_console_wallet/src/ui/state/app_state.rs b/applications/minotari_console_wallet/src/ui/state/app_state.rs index 513884a3d6..61c06f2bdc 100644 --- a/applications/minotari_console_wallet/src/ui/state/app_state.rs +++ b/applications/minotari_console_wallet/src/ui/state/app_state.rs @@ -1035,6 +1035,7 @@ impl AppStateInner { .set_base_node_peer( peer.public_key.clone(), Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None ) .await?; @@ -1060,6 +1061,7 @@ impl AppStateInner { .set_base_node_peer( peer.public_key.clone(), Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None ) .await?; @@ -1098,6 +1100,7 @@ impl AppStateInner { .set_base_node_peer( previous.public_key.clone(), Some(previous.addresses.best().ok_or(UiError::NoAddress)?.address().clone()), + None ) .await?; diff --git a/applications/minotari_console_wallet/src/wallet_modes.rs b/applications/minotari_console_wallet/src/wallet_modes.rs index edffc487a2..caa0341400 100644 --- a/applications/minotari_console_wallet/src/wallet_modes.rs +++ b/applications/minotari_console_wallet/src/wallet_modes.rs @@ -90,23 +90,19 @@ impl PeerConfig { /// Get the prioritised base node peer from the PeerConfig. /// 1. Custom Base Node /// 2. First configured Base Node Peer - /// 3. Random configured Peer Seed - pub fn get_base_node_peer(&self) -> Result { + /// 3. All configured Peer Seeds + pub fn get_base_node_peers(&self) -> Result, ExitError> { if let Some(base_node) = self.base_node_custom.clone() { - Ok(base_node) + Ok(vec![base_node]) } else if !self.base_node_peers.is_empty() { - Ok(self + Ok(vec![self .base_node_peers .first() .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node peer has no address!"))? - .clone()) + .clone()]) } else if !self.peer_seeds.is_empty() { - // pick a random peer seed Ok(self - .peer_seeds - .choose(&mut OsRng) - .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Peer seeds was empty."))? - .clone()) + .peer_seeds.clone()) } else { Err(ExitError::new( ExitCode::ConfigError, diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index 94bdde2cb7..7dfdec6b88 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -30,6 +30,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use super::service::OnlineStatus; use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch}; +use crate::connectivity_service::interface::SelectedPeer; pub enum WalletConnectivityRequest { ObtainBaseNodeWalletRpcClient(oneshot::Sender>), @@ -39,14 +40,14 @@ pub enum WalletConnectivityRequest { #[derive(Clone)] pub struct WalletConnectivityHandle { sender: mpsc::Sender, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, } impl WalletConnectivityHandle { pub(super) fn new( sender: mpsc::Sender, - base_node_watch: Watch>, + base_node_watch: Watch>, online_status_rx: watch::Receiver, ) -> Self { Self { @@ -59,16 +60,16 @@ impl WalletConnectivityHandle { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityHandle { - fn set_base_node(&mut self, base_node_peer: Peer) { + fn set_base_node(&mut self, base_node_peer: SelectedPeer) { if let Some(peer) = self.base_node_watch.borrow().as_ref() { - if peer.public_key == base_node_peer.public_key { + if peer.current_peer.public_key == base_node_peer.current_peer.public_key { return; } } self.base_node_watch.send(Some(base_node_peer)); } - fn get_current_base_node_watcher(&self) -> watch::Receiver> { + fn get_current_base_node_watcher(&self) -> watch::Receiver> { self.base_node_watch.get_receiver() } @@ -120,15 +121,19 @@ impl WalletConnectivityInterface for WalletConnectivityHandle { } fn get_current_base_node_peer(&self) -> Option { - self.base_node_watch.borrow().clone() + self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.clone()) + } + + fn get_backup_base_node_peers(&self) -> Option> { + self.base_node_watch.borrow().as_ref().flat_map(|p| p.backup_peers.clone()) } fn get_current_base_node_peer_public_key(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone()) + self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.public_key.clone()) } fn get_current_base_node_id(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.node_id.clone()) } fn is_base_node_set(&self) -> bool { diff --git a/base_layer/wallet/src/connectivity_service/interface.rs b/base_layer/wallet/src/connectivity_service/interface.rs index 8a5527ce7c..b4d5b8a62c 100644 --- a/base_layer/wallet/src/connectivity_service/interface.rs +++ b/base_layer/wallet/src/connectivity_service/interface.rs @@ -32,11 +32,21 @@ use tokio::sync::watch; use crate::connectivity_service::OnlineStatus; + +/// The selected peer is a current base node and an optional list of backup peers. +#[derive(Clone)] +pub struct SelectedPeer { + /// The current base node that the wallet is connected to + pub current_peer: Peer, + /// The other base nodes that the wallet can connect to if the selected peer is not available + pub backup_peers: Option>, +} + #[async_trait::async_trait] pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static { - fn set_base_node(&mut self, base_node_peer: Peer); + fn set_base_node(&mut self, base_node_peer: SelectedPeer); - fn get_current_base_node_watcher(&self) -> watch::Receiver>; + fn get_current_base_node_watcher(&self) -> watch::Receiver>; /// Obtain a BaseNodeWalletRpcClient. /// @@ -70,6 +80,8 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static { fn get_current_base_node_peer(&self) -> Option; + fn get_backup_base_node_peers(&self) -> Option>; + fn get_current_base_node_peer_public_key(&self) -> Option; fn get_current_base_node_id(&self) -> Option; diff --git a/base_layer/wallet/src/connectivity_service/mock.rs b/base_layer/wallet/src/connectivity_service/mock.rs index 11228b6661..2d6c9525d3 100644 --- a/base_layer/wallet/src/connectivity_service/mock.rs +++ b/base_layer/wallet/src/connectivity_service/mock.rs @@ -32,6 +32,7 @@ use crate::{ connectivity_service::{OnlineStatus, WalletConnectivityInterface}, util::watch::Watch, }; +use crate::connectivity_service::SelectedPeer; pub fn create() -> WalletConnectivityMock { WalletConnectivityMock::new() @@ -40,7 +41,7 @@ pub fn create() -> WalletConnectivityMock { #[derive(Clone)] pub struct WalletConnectivityMock { online_status_watch: Watch, - base_node_watch: Watch>, + base_node_watch: Watch>, base_node_wallet_rpc_client: Watch>>, base_node_sync_rpc_client: Watch>>, } @@ -65,11 +66,11 @@ impl WalletConnectivityMock { self.base_node_sync_rpc_client.send(Some(RpcClientLease::new(client))); } - pub fn notify_base_node_set(&self, base_node_peer: Peer) { + pub fn notify_base_node_set(&self, base_node_peer: SelectedPeer) { self.base_node_watch.send(Some(base_node_peer)); } - pub async fn base_node_changed(&mut self) -> Option { + pub async fn base_node_changed(&mut self) -> Option { self.base_node_watch.changed().await; self.base_node_watch.borrow().as_ref().cloned() } @@ -82,11 +83,11 @@ impl WalletConnectivityMock { #[async_trait::async_trait] impl WalletConnectivityInterface for WalletConnectivityMock { - fn set_base_node(&mut self, base_node_peer: Peer) { + fn set_base_node(&mut self, base_node_peer: SelectedPeer) { self.notify_base_node_set(base_node_peer); } - fn get_current_base_node_watcher(&self) -> Receiver> { + fn get_current_base_node_watcher(&self) -> Receiver> { self.base_node_watch.get_receiver() } @@ -121,15 +122,19 @@ impl WalletConnectivityInterface for WalletConnectivityMock { } fn get_current_base_node_peer(&self) -> Option { - self.base_node_watch.borrow().as_ref().cloned() + self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.clone()) + } + + fn get_backup_base_node_peers(&self) -> Option> { + self.base_node_watch.borrow().as_ref().flat_map(|p| p.backup_peers.clone()) } fn get_current_base_node_peer_public_key(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone()) + self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.public_key.clone()) } fn get_current_base_node_id(&self) -> Option { - self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) + self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.node_id.clone()) } fn is_base_node_set(&self) -> bool { diff --git a/base_layer/wallet/src/connectivity_service/mod.rs b/base_layer/wallet/src/connectivity_service/mod.rs index 75b99ed330..88c4b23923 100644 --- a/base_layer/wallet/src/connectivity_service/mod.rs +++ b/base_layer/wallet/src/connectivity_service/mod.rs @@ -39,4 +39,4 @@ mod mock; pub use mock::{create as create_wallet_connectivity_mock, WalletConnectivityMock}; mod interface; -pub use interface::WalletConnectivityInterface; +pub use interface::{WalletConnectivityInterface, SelectedPeer}; diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index f880739bfb..fed4a42f93 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -235,8 +235,13 @@ impl WalletConnectivityService { } async fn setup_base_node_connection(&mut self) { + let mut count = 0; self.pools = None; loop { + if count >= 3 { + let base_node_selected = self.connectivity.; + } + count += 1; let node_id = match self.current_base_node() { Some(n) => n, None => { diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index a0da32e40d..992e5b0f6a 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -48,6 +48,7 @@ use crate::{ connectivity_service::{OnlineStatus, WalletConnectivityHandle, WalletConnectivityInterface}, util::watch::Watch, }; +use crate::connectivity_service::SelectedPeer; async fn setup() -> ( WalletConnectivityHandle, @@ -87,7 +88,7 @@ async fn it_dials_peer_when_base_node_is_set() { // Set the mock to defer returning a result for the peer connection mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None }); // Wait for connection request mock_state.await_call_count(1).await; @@ -110,7 +111,7 @@ async fn it_resolves_many_pending_rpc_session_requests() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None }); let pending_requests = iter::repeat_with(|| { let mut handle = handle.clone(); @@ -142,7 +143,7 @@ async fn it_changes_to_a_new_base_node() { mock_state.add_active_connection(conn2).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer1.to_peer()); + handle.set_base_node(SelectedPeer { current_peer: base_node_peer1.to_peer(), backup_peers: None }); mock_state.await_call_count(1).await; mock_state.expect_dial_peer(base_node_peer1.node_id()).await; @@ -153,7 +154,7 @@ async fn it_changes_to_a_new_base_node() { assert!(rpc_client.is_connected()); // Initiate a connection to the base node - handle.set_base_node(base_node_peer2.to_peer()); + handle.set_base_node(SelectedPeer { current_peer: base_node_peer2.to_peer(), backup_peers: None }); mock_state.await_call_count(1).await; mock_state.expect_dial_peer(base_node_peer2.node_id()).await; @@ -172,7 +173,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { mock_state.set_pending_connection(base_node_peer.node_id()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None }); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; @@ -212,7 +213,7 @@ async fn it_gracefully_handles_multiple_connection_failures() { let conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await; // Initiate a connection to the base node - handle.set_base_node(base_node_peer.to_peer()); + handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None }); // Now a connection will given to the service mock_state.add_active_connection(conn.clone()).await; diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 2022b97de9..0be77d32fa 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -106,6 +106,7 @@ use crate::{ util::wallet_identity::WalletIdentity, utxo_scanner_service::{handle::UtxoScannerHandle, initializer::UtxoScannerServiceInitializer, RECOVERY_KEY}, }; +use crate::connectivity_service::SelectedPeer; const LOG_TARGET: &str = "wallet"; /// The minimum buffer size for the wallet pubsub_connector channel @@ -381,6 +382,7 @@ where &mut self, public_key: CommsPublicKey, address: Option, + backup_peers: Option>, ) -> Result<(), WalletError> { info!( "Wallet setting base node peer, public key: {}, net address: {:?}.", @@ -415,7 +417,7 @@ where connectivity .add_peer_to_allow_list(current_peer.node_id.clone()) .await?; - self.wallet_connectivity.set_base_node(current_peer); + self.wallet_connectivity.set_base_node(SelectedPeer { current_peer, backup_peers }); } else { let node_id = NodeId::from_key(&public_key); if address.is_none() { @@ -440,7 +442,7 @@ where ); peer_manager.add_peer(peer.clone()).await?; connectivity.add_peer_to_allow_list(peer.node_id.clone()).await?; - self.wallet_connectivity.set_base_node(peer); + self.wallet_connectivity.set_base_node(SelectedPeer { current_peer: peer, backup_peers }); } Ok(()) diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 486d714c72..9c26a0a6d4 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -169,7 +169,7 @@ use tokio::{ task, time::sleep, }; - +use minotari_wallet::connectivity_service::SelectedPeer; use crate::support::{ base_node_service_mock::MockBaseNodeService, comms_and_services::{create_dummy_message, setup_comms_services}, @@ -361,7 +361,7 @@ async fn setup_transaction_service_no_comms( wallet_connectivity_service_mock .set_base_node_wallet_rpc_client(connect_rpc_client(&mut rpc_server_connection).await); - wallet_connectivity_service_mock.set_base_node(node_identity.to_peer()); + wallet_connectivity_service_mock.set_base_node(SelectedPeer { current_peer: node_identity.to_peer(), backup_peers: None }); wallet_connectivity_service_mock.base_node_changed().await; let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap(); diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 355571efe2..7b53ee0723 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -6864,7 +6864,7 @@ pub unsafe extern "C" fn wallet_set_base_node_peer( if let Err(e) = (*wallet) .runtime - .block_on((*wallet).wallet.set_base_node_peer((*public_key).clone(), parsed_addr)) + .block_on((*wallet).wallet.set_base_node_peer((*public_key).clone(), parsed_addr, None)) { error = LibWalletError::from(e).code; ptr::swap(error_out, &mut error as *mut c_int);