Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
wip
  • Loading branch information
hansieodendaal committed Sep 3, 2024
1 parent 78a4803 commit 9523a45
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ async fn set_base_node_peer(
println!("Setting base node peer...");
println!("{}::{}", public_key, address);
wallet
.set_base_node_peer(public_key.clone(), Some(address.clone()))
.set_base_node_peer(public_key.clone(), Some(address.clone()), None)
.await?;
Ok((public_key, address))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl wallet_server::Wallet for WalletGrpcServer {
println!("{}::{}", public_key, net_address);
let mut wallet = self.wallet.clone();
wallet
.set_base_node_peer(public_key.clone(), Some(net_address.clone()))
.set_base_node_peer(public_key.clone(), Some(net_address.clone()), None)
.await
.map_err(|e| Status::internal(format!("{:?}", e)))?;

Expand Down
13 changes: 10 additions & 3 deletions applications/minotari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
use std::{fs, io, path::PathBuf, str::FromStr, sync::Arc, time::Instant};

use crossterm::terminal::{disable_raw_mode, enable_raw_mode, is_raw_mode_enabled};
use digest::crypto_common::rand_core::OsRng;
use log::*;
use rand::prelude::SliceRandom;
use minotari_app_utilities::{consts, identity_management::setup_node_identity};
#[cfg(feature = "ledger")]
use minotari_ledger_wallet_comms::accessor_methods::{ledger_get_public_spend_key, ledger_get_view_key};
Expand Down Expand Up @@ -569,7 +571,7 @@ fn setup_identity_from_db<D: WalletBackend + 'static>(
/// Starts the wallet by setting the base node peer, and restarting the transaction and broadcast protocols.
pub async fn start_wallet(
wallet: &mut WalletSqlite,
base_node: &Peer,
base_nodes: Vec<Peer>,
wallet_mode: &WalletMode,
) -> Result<(), ExitError> {
// Verify ledger build if wallet type is Ledger
Expand Down Expand Up @@ -601,13 +603,18 @@ pub async fn start_wallet(

debug!(target: LOG_TARGET, "Setting base node peer");

let net_address = base_node
if base_nodes.is_empty() {
return Err(ExitError::new(ExitCode::WalletError, "No base nodes configured to connect to"));
}
let selected_base_node = base_nodes.choose(&mut OsRng).expect("base_nodes is not empty");
let net_address = selected_base_node
.addresses
.best()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node has no address!"))?;
let backup_peers = base_nodes.iter().filter(|&v|v != selected_base_node).collect::<Vec<_>>();

wallet
.set_base_node_peer(base_node.public_key.clone(), Some(net_address.address().clone()))
.set_base_node_peer(selected_base_node.public_key.clone(), Some(net_address.address().clone()), Some(base_nodes))
.await
.map_err(|e| {
ExitError::new(
Expand Down
4 changes: 2 additions & 2 deletions applications/minotari_console_wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ pub fn run_wallet_with_cli(
// get base node/s
let base_node_config =
runtime.block_on(get_base_node_peer_config(config, &mut wallet, cli.non_interactive_mode))?;
let base_node_selected = base_node_config.get_base_node_peer()?;
let base_nodes_selected = base_node_config.get_base_node_peers()?;

let wallet_mode = wallet_mode(&cli, boot_mode);

// start wallet
runtime.block_on(start_wallet(&mut wallet, &base_node_selected, &wallet_mode))?;
runtime.block_on(start_wallet(&mut wallet, &base_nodes_selected, &wallet_mode))?;

debug!(target: LOG_TARGET, "Starting app");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,7 @@ impl AppStateInner {
.set_base_node_peer(
peer.public_key.clone(),
Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None
)
.await?;

Expand All @@ -1060,6 +1061,7 @@ impl AppStateInner {
.set_base_node_peer(
peer.public_key.clone(),
Some(peer.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None
)
.await?;

Expand Down Expand Up @@ -1098,6 +1100,7 @@ impl AppStateInner {
.set_base_node_peer(
previous.public_key.clone(),
Some(previous.addresses.best().ok_or(UiError::NoAddress)?.address().clone()),
None
)
.await?;

Expand Down
16 changes: 6 additions & 10 deletions applications/minotari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,19 @@ impl PeerConfig {
/// Get the prioritised base node peer from the PeerConfig.
/// 1. Custom Base Node
/// 2. First configured Base Node Peer
/// 3. Random configured Peer Seed
pub fn get_base_node_peer(&self) -> Result<Peer, ExitError> {
/// 3. All configured Peer Seeds
pub fn get_base_node_peers(&self) -> Result<Vec<Peer>, ExitError> {
if let Some(base_node) = self.base_node_custom.clone() {
Ok(base_node)
Ok(vec![base_node])
} else if !self.base_node_peers.is_empty() {
Ok(self
Ok(vec![self
.base_node_peers
.first()
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Configured base node peer has no address!"))?
.clone())
.clone()])
} else if !self.peer_seeds.is_empty() {
// pick a random peer seed
Ok(self
.peer_seeds
.choose(&mut OsRng)
.ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Peer seeds was empty."))?
.clone())
.peer_seeds.clone())
} else {
Err(ExitError::new(
ExitCode::ConfigError,
Expand Down
21 changes: 13 additions & 8 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio::sync::{mpsc, oneshot, watch};

use super::service::OnlineStatus;
use crate::{connectivity_service::WalletConnectivityInterface, util::watch::Watch};
use crate::connectivity_service::interface::SelectedPeer;

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
Expand All @@ -39,14 +40,14 @@ pub enum WalletConnectivityRequest {
#[derive(Clone)]
pub struct WalletConnectivityHandle {
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch: Watch<Option<Peer>>,
base_node_watch: Watch<Option<SelectedPeer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
}

impl WalletConnectivityHandle {
pub(super) fn new(
sender: mpsc::Sender<WalletConnectivityRequest>,
base_node_watch: Watch<Option<Peer>>,
base_node_watch: Watch<Option<SelectedPeer>>,
online_status_rx: watch::Receiver<OnlineStatus>,
) -> Self {
Self {
Expand All @@ -59,16 +60,16 @@ impl WalletConnectivityHandle {

#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer: Peer) {
fn set_base_node(&mut self, base_node_peer: SelectedPeer) {
if let Some(peer) = self.base_node_watch.borrow().as_ref() {
if peer.public_key == base_node_peer.public_key {
if peer.current_peer.public_key == base_node_peer.current_peer.public_key {
return;
}
}
self.base_node_watch.send(Some(base_node_peer));
}

fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>> {
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<SelectedPeer>> {
self.base_node_watch.get_receiver()
}

Expand Down Expand Up @@ -120,15 +121,19 @@ impl WalletConnectivityInterface for WalletConnectivityHandle {
}

fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch.borrow().clone()
self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.clone())
}

fn get_backup_base_node_peers(&self) -> Option<Vec<Peer>> {
self.base_node_watch.borrow().as_ref().flat_map(|p| p.backup_peers.clone())
}

fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey> {
self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone())
self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.public_key.clone())
}

fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.node_id.clone())
}

fn is_base_node_set(&self) -> bool {
Expand Down
16 changes: 14 additions & 2 deletions base_layer/wallet/src/connectivity_service/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,21 @@ use tokio::sync::watch;

use crate::connectivity_service::OnlineStatus;


/// The selected peer is a current base node and an optional list of backup peers.
#[derive(Clone)]
pub struct SelectedPeer {
/// The current base node that the wallet is connected to
pub current_peer: Peer,
/// The other base nodes that the wallet can connect to if the selected peer is not available
pub backup_peers: Option<Vec<Peer>>,
}

#[async_trait::async_trait]
pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static {
fn set_base_node(&mut self, base_node_peer: Peer);
fn set_base_node(&mut self, base_node_peer: SelectedPeer);

fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<Peer>>;
fn get_current_base_node_watcher(&self) -> watch::Receiver<Option<SelectedPeer>>;

/// Obtain a BaseNodeWalletRpcClient.
///
Expand Down Expand Up @@ -70,6 +80,8 @@ pub trait WalletConnectivityInterface: Clone + Send + Sync + 'static {

fn get_current_base_node_peer(&self) -> Option<Peer>;

fn get_backup_base_node_peers(&self) -> Option<Vec<Peer>>;

fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey>;

fn get_current_base_node_id(&self) -> Option<NodeId>;
Expand Down
21 changes: 13 additions & 8 deletions base_layer/wallet/src/connectivity_service/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
connectivity_service::{OnlineStatus, WalletConnectivityInterface},
util::watch::Watch,
};
use crate::connectivity_service::SelectedPeer;

pub fn create() -> WalletConnectivityMock {
WalletConnectivityMock::new()
Expand All @@ -40,7 +41,7 @@ pub fn create() -> WalletConnectivityMock {
#[derive(Clone)]
pub struct WalletConnectivityMock {
online_status_watch: Watch<OnlineStatus>,
base_node_watch: Watch<Option<Peer>>,
base_node_watch: Watch<Option<SelectedPeer>>,
base_node_wallet_rpc_client: Watch<Option<RpcClientLease<BaseNodeWalletRpcClient>>>,
base_node_sync_rpc_client: Watch<Option<RpcClientLease<BaseNodeSyncRpcClient>>>,
}
Expand All @@ -65,11 +66,11 @@ impl WalletConnectivityMock {
self.base_node_sync_rpc_client.send(Some(RpcClientLease::new(client)));
}

pub fn notify_base_node_set(&self, base_node_peer: Peer) {
pub fn notify_base_node_set(&self, base_node_peer: SelectedPeer) {
self.base_node_watch.send(Some(base_node_peer));
}

pub async fn base_node_changed(&mut self) -> Option<Peer> {
pub async fn base_node_changed(&mut self) -> Option<SelectedPeer> {
self.base_node_watch.changed().await;
self.base_node_watch.borrow().as_ref().cloned()
}
Expand All @@ -82,11 +83,11 @@ impl WalletConnectivityMock {

#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityMock {
fn set_base_node(&mut self, base_node_peer: Peer) {
fn set_base_node(&mut self, base_node_peer: SelectedPeer) {
self.notify_base_node_set(base_node_peer);
}

fn get_current_base_node_watcher(&self) -> Receiver<Option<Peer>> {
fn get_current_base_node_watcher(&self) -> Receiver<Option<SelectedPeer>> {
self.base_node_watch.get_receiver()
}

Expand Down Expand Up @@ -121,15 +122,19 @@ impl WalletConnectivityInterface for WalletConnectivityMock {
}

fn get_current_base_node_peer(&self) -> Option<Peer> {
self.base_node_watch.borrow().as_ref().cloned()
self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.clone())
}

fn get_backup_base_node_peers(&self) -> Option<Vec<Peer>> {
self.base_node_watch.borrow().as_ref().flat_map(|p| p.backup_peers.clone())
}

fn get_current_base_node_peer_public_key(&self) -> Option<CommsPublicKey> {
self.base_node_watch.borrow().as_ref().map(|p| p.public_key.clone())
self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.public_key.clone())
}

fn get_current_base_node_id(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
self.base_node_watch.borrow().as_ref().map(|p| p.current_peer.node_id.clone())
}

fn is_base_node_set(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/connectivity_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ mod mock;
pub use mock::{create as create_wallet_connectivity_mock, WalletConnectivityMock};

mod interface;
pub use interface::WalletConnectivityInterface;
pub use interface::{WalletConnectivityInterface, SelectedPeer};
5 changes: 5 additions & 0 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,13 @@ impl WalletConnectivityService {
}

async fn setup_base_node_connection(&mut self) {
let mut count = 0;
self.pools = None;
loop {
if count >= 3 {
let base_node_selected = self.connectivity.;
}
count += 1;
let node_id = match self.current_base_node() {
Some(n) => n,
None => {
Expand Down
13 changes: 7 additions & 6 deletions base_layer/wallet/src/connectivity_service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::{
connectivity_service::{OnlineStatus, WalletConnectivityHandle, WalletConnectivityInterface},
util::watch::Watch,
};
use crate::connectivity_service::SelectedPeer;

async fn setup() -> (
WalletConnectivityHandle,
Expand Down Expand Up @@ -87,7 +88,7 @@ async fn it_dials_peer_when_base_node_is_set() {
// Set the mock to defer returning a result for the peer connection
mock_state.set_pending_connection(base_node_peer.node_id()).await;
// Initiate a connection to the base node
handle.set_base_node(base_node_peer.to_peer());
handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None });

// Wait for connection request
mock_state.await_call_count(1).await;
Expand All @@ -110,7 +111,7 @@ async fn it_resolves_many_pending_rpc_session_requests() {
mock_state.set_pending_connection(base_node_peer.node_id()).await;

// Initiate a connection to the base node
handle.set_base_node(base_node_peer.to_peer());
handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None });

let pending_requests = iter::repeat_with(|| {
let mut handle = handle.clone();
Expand Down Expand Up @@ -142,7 +143,7 @@ async fn it_changes_to_a_new_base_node() {
mock_state.add_active_connection(conn2).await;

// Initiate a connection to the base node
handle.set_base_node(base_node_peer1.to_peer());
handle.set_base_node(SelectedPeer { current_peer: base_node_peer1.to_peer(), backup_peers: None });

mock_state.await_call_count(1).await;
mock_state.expect_dial_peer(base_node_peer1.node_id()).await;
Expand All @@ -153,7 +154,7 @@ async fn it_changes_to_a_new_base_node() {
assert!(rpc_client.is_connected());

// Initiate a connection to the base node
handle.set_base_node(base_node_peer2.to_peer());
handle.set_base_node(SelectedPeer { current_peer: base_node_peer2.to_peer(), backup_peers: None });

mock_state.await_call_count(1).await;
mock_state.expect_dial_peer(base_node_peer2.node_id()).await;
Expand All @@ -172,7 +173,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() {
mock_state.set_pending_connection(base_node_peer.node_id()).await;

// Initiate a connection to the base node
handle.set_base_node(base_node_peer.to_peer());
handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None });

// Now a connection will given to the service
mock_state.add_active_connection(conn.clone()).await;
Expand Down Expand Up @@ -212,7 +213,7 @@ async fn it_gracefully_handles_multiple_connection_failures() {
let conn = mock_server.create_mockimpl_connection(base_node_peer.to_peer()).await;

// Initiate a connection to the base node
handle.set_base_node(base_node_peer.to_peer());
handle.set_base_node(SelectedPeer { current_peer: base_node_peer.to_peer(), backup_peers: None });

// Now a connection will given to the service
mock_state.add_active_connection(conn.clone()).await;
Expand Down
Loading

0 comments on commit 9523a45

Please sign in to comment.