From c50ad880bcb604d7cf4aa392f18a79948d3a1b0b Mon Sep 17 00:00:00 2001 From: stringhandler Date: Tue, 19 Dec 2023 11:14:38 +0200 Subject: [PATCH 1/7] fix: make tor startup async --- .../minotari_console_wallet/src/init/mod.rs | 21 ++-- applications/minotari_node/src/bootstrap.rs | 9 +- .../src/chat_client/src/networking.rs | 9 +- base_layer/p2p/src/initialization.rs | 14 ++- base_layer/wallet_ffi/src/lib.rs | 28 ++--- comms/core/src/builder/comms_node.rs | 69 ++++++------ .../core/src/tor/hidden_service/controller.rs | 3 + .../transports/hidden_service_transport.rs | 100 ++++++++++++++++++ comms/core/src/transports/mod.rs | 3 + 9 files changed, 184 insertions(+), 72 deletions(-) create mode 100644 comms/core/src/transports/hidden_service_transport.rs diff --git a/applications/minotari_console_wallet/src/init/mod.rs b/applications/minotari_console_wallet/src/init/mod.rs index d5faff3b2a..636faa2567 100644 --- a/applications/minotari_console_wallet/src/init/mod.rs +++ b/applications/minotari_console_wallet/src/init/mod.rs @@ -23,6 +23,7 @@ #![allow(dead_code, unused)] use std::{fs, path::PathBuf, str::FromStr, sync::Arc}; +use std::time::Instant; use log::*; use minotari_app_utilities::identity_management::setup_node_identity; @@ -442,6 +443,8 @@ pub async fn init_wallet( .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Error consensus manager. {}", e)))?; let factories = CryptoFactories::default(); + let now = Instant::now(); + let mut wallet = Wallet::start( wallet_config, config.peer_seeds.clone(), @@ -463,12 +466,18 @@ pub async fn init_wallet( WalletError::CommsInitializationError(cie) => cie.to_exit_error(), e => ExitError::new(ExitCode::WalletError, format!("Error creating Wallet Container: {}", e)), })?; - if let Some(hs) = wallet.comms.hidden_service() { - wallet - .db - .set_tor_identity(hs.tor_identity().clone()) - .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Problem writing tor identity. {}", e)))?; - } + // TODO: fix this + // if let Some(hs) = wallet.comms.hidden_service() { + // wallet + // .db + // .set_tor_identity(hs.tor_identity().clone()) + // .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Problem writing tor identity. {}", e)))?; + // } + + error!( + target: LOG_TARGET, + "Wallet started in {}ms", now.elapsed().as_millis() + ); if let Some(file_name) = seed_words_file_name { let seed_words = wallet.get_seed_words(&MnemonicLanguage::English)?.join(" "); diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index c2d0ce7753..dce1a5b28c 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -177,10 +177,11 @@ where B: BlockchainBackend + 'static .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; }, }; - if let Some(hs) = comms.hidden_service() { - identity_management::save_as_json(&base_node_config.tor_identity_file, hs.tor_identity()) - .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; - } + todo!("Fix this"); + // if let Some(hs) = comms.hidden_service() { + // identity_management::save_as_json(&base_node_config.tor_identity_file, hs.tor_identity()) + // .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; + // } handles.register(comms); diff --git a/base_layer/contacts/src/chat_client/src/networking.rs b/base_layer/contacts/src/chat_client/src/networking.rs index fa84a20e9f..b42d9a41be 100644 --- a/base_layer/contacts/src/chat_client/src/networking.rs +++ b/base_layer/contacts/src/chat_client/src/networking.rs @@ -121,10 +121,11 @@ pub async fn start( trace!(target: LOG_TARGET, "save chat identity file"); }, }; - if let Some(hs) = comms.hidden_service() { - identity_management::save_as_json(&config.chat_client.tor_identity_file, hs.tor_identity())?; - trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", hs.tor_identity()); - } + todo!("Fix this"); + // if let Some(hs) = comms.hidden_service() { + // identity_management::save_as_json(&config.chat_client.tor_identity_file, hs.tor_identity())?; + // trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", hs.tor_identity()); + // } handles.register(comms); let comms = handles.expect_handle::(); diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index ac9ab9b653..5514e963c6 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -70,6 +70,7 @@ use tari_storage::{ use thiserror::Error; use tokio::sync::{broadcast, mpsc}; use tower::ServiceBuilder; +use tari_comms::transports::HiddenServiceTransport; use crate::{ comms_connector::{InboundDomainConnector, PubsubDomainConnector}, @@ -251,20 +252,17 @@ pub async fn spawn_comms_using_transport( let listener_address_override = tor_config.listener_address_override.clone(); let mut hidden_service_ctl = initialize_hidden_service(tor_config)?; // Set the listener address to be the address (usually local) to which tor will forward all traffic - let transport = hidden_service_ctl.initialize_transport().await?; + let instant = Instant::now(); + let transport = HiddenServiceTransport::new(hidden_service_ctl); + error!(target: LOG_TARGET, "TOR transport initialized in {:.0?}", instant.elapsed()); + - info!( - target: LOG_TARGET, - "Tor hidden service initialized. proxied_address = '{:?}', listener_override_address = {:?}", - hidden_service_ctl.proxied_address(), - listener_address_override, - ); comms .with_listener_address( listener_address_override.unwrap_or_else(|| multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]), ) - .with_hidden_service_controller(hidden_service_ctl) + // .with_hidden_service_controller(hidden_service_ctl) .spawn_with_transport(transport) .await? }, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 807c68ecf9..0dd463c0a0 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -5472,11 +5472,11 @@ pub unsafe extern "C" fn wallet_create( match w { Ok(w) => { // lets ensure the wallet tor_id is saved, this could have been changed during wallet startup - if let Some(hs) = w.comms.hidden_service() { - if let Err(e) = w.db.set_tor_identity(hs.tor_identity().clone()) { - warn!(target: LOG_TARGET, "Could not save tor identity to db: {:?}", e); - } - } + // if let Some(hs) = w.comms.hidden_service() { + // if let Err(e) = w.db.set_tor_identity(hs.tor_identity().clone()) { + // warn!(target: LOG_TARGET, "Could not save tor identity to db: {:?}", e); + // } + // } let wallet_address = TariAddress::new(w.comms.node_identity().public_key().clone(), w.network.as_network()); // Start Callback Handler @@ -5512,15 +5512,15 @@ pub unsafe extern "C" fn wallet_create( runtime.spawn(callback_handler.start()); - let mut ts = w.transaction_service.clone(); - runtime.spawn(async move { - if let Err(e) = ts.restart_transaction_protocols().await { - warn!( - target: LOG_TARGET, - "Could not restart transaction negotiation protocols: {:?}", e - ); - } - }); + // let mut ts = w.transaction_service.clone(); + // runtime.spawn(async move { + // if let Err(e) = ts.restart_transaction_protocols().await { + // warn!( + // target: LOG_TARGET, + // "Could not restart transaction negotiation protocols: {:?}", e + // ); + // } + // }); let tari_wallet = TariWallet { wallet: w, diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index 649497c2c7..3b2d8e4c6c 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{iter, sync::Arc, time::Duration}; +use std::time::Instant; use log::*; use multiaddr::{multiaddr, Protocol}; @@ -218,28 +219,30 @@ impl UnspawnedCommsNode { node_identity.node_id() ); - let listening_info = connection_manager_requester.wait_until_listening().await?; - + // let instant = Instant::now(); + // + // let listening_info = connection_manager_requester.wait_until_listening().await?; + // error!(target: LOG_TARGET, "Waited for {} to connect", instant.elapsed().as_millis()); // Final setup of the hidden service. - let mut hidden_service = None; - if let Some(mut ctl) = hidden_service_ctl { - // Only set the address to the bind address it is set to TCP port 0 - let mut proxied_addr = ctl.proxied_address(); - if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) { - // Remove the TCP port 0 address and replace it with the actual listener port - if let Some(Protocol::Tcp(port)) = listening_info.bind_address().iter().last() { - proxied_addr.pop(); - proxied_addr.push(Protocol::Tcp(port)); - ctl.set_proxied_addr(&proxied_addr); - } - } - let hs = ctl.create_hidden_service().await?; - let onion_addr = hs.get_onion_address(); - if !node_identity.public_addresses().contains(&onion_addr) { - node_identity.add_public_address(onion_addr); - } - hidden_service = Some(hs); - } + // let mut hidden_service = None; + // if let Some(mut ctl) = hidden_service_ctl { + // // Only set the address to the bind address it is set to TCP port 0 + // let mut proxied_addr = ctl.proxied_address(); + // if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) { + // // Remove the TCP port 0 address and replace it with the actual listener port + // if let Some(Protocol::Tcp(port)) = listening_info.bind_address().iter().last() { + // proxied_addr.pop(); + // proxied_addr.push(Protocol::Tcp(port)); + // ctl.set_proxied_addr(&proxied_addr); + // } + // } + // let hs = ctl.create_hidden_service().await?; + // let onion_addr = hs.get_onion_address(); + // if !node_identity.public_addresses().contains(&onion_addr) { + // node_identity.add_public_address(onion_addr); + // } + // hidden_service = Some(hs); + // } info!( target: LOG_TARGET, "Your node's public addresses are '{}'", @@ -266,11 +269,10 @@ impl UnspawnedCommsNode { shutdown_signal, connection_manager_requester, connectivity_requester, - listening_info, node_identity, peer_manager, liveness_watch, - hidden_service, + // hidden_service, complete_signals: ext_context.drain_complete_signals(), }) } @@ -313,11 +315,11 @@ pub struct CommsNode { /// Shared PeerManager instance peer_manager: Arc, /// The bind addresses of the listener(s) - listening_info: ListenerInfo, + // listening_info: ListenerInfo, /// Current liveness status liveness_watch: watch::Receiver, /// `Some` if the comms node is configured to run via a hidden service, otherwise `None` - hidden_service: Option, + //hidden_service: Option, /// The 'reciprocal' shutdown signals for each comms service complete_signals: Vec, } @@ -349,25 +351,20 @@ impl CommsNode { } /// Return the Ip/Tcp address that this node is listening on - pub fn listening_address(&self) -> &Multiaddr { - self.listening_info.bind_address() - } + // pub fn listening_address(&self) -> &Multiaddr { + // self.listening_info.bind_address() + // } /// Return [ListenerInfo] - pub fn listening_info(&self) -> &ListenerInfo { - &self.listening_info - } + // pub fn listening_info(&self) -> &ListenerInfo { + // &self.listening_info + // } /// Returns the current liveness status pub fn liveness_status(&self) -> LivenessStatus { *self.liveness_watch.borrow() } - /// Return the Ip/Tcp address that this node is listening on - pub fn hidden_service(&self) -> Option<&tor::HiddenService> { - self.hidden_service.as_ref() - } - /// Return a handle that is used to call the connectivity service. pub fn connectivity(&self) -> ConnectivityRequester { self.connectivity_requester.clone() diff --git a/comms/core/src/tor/hidden_service/controller.rs b/comms/core/src/tor/hidden_service/controller.rs index a706da54df..d5a60089ca 100644 --- a/comms/core/src/tor/hidden_service/controller.rs +++ b/comms/core/src/tor/hidden_service/controller.rs @@ -124,7 +124,9 @@ impl HiddenServiceController { } pub async fn initialize_transport(&mut self) -> Result { + dbg!("here3"); self.connect_and_auth().await?; + dbg!("here4"); let socks_addr = self.get_socks_address().await?; Ok(SocksTransport::new(SocksConfig { proxy_address: socks_addr, @@ -235,6 +237,7 @@ impl HiddenServiceController { } fn client_mut(&mut self) -> Result<&mut TorControlPortClient, HiddenServiceControllerError> { + dbg!("here5"); self.client .as_mut() .filter(|c| c.is_connected()) diff --git a/comms/core/src/transports/hidden_service_transport.rs b/comms/core/src/transports/hidden_service_transport.rs new file mode 100644 index 0000000000..d5f94400da --- /dev/null +++ b/comms/core/src/transports/hidden_service_transport.rs @@ -0,0 +1,100 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::io; +use std::sync::Arc; +use log::info; +use multiaddr::Multiaddr; +use tokio::sync::RwLock; +use crate::tor::HiddenServiceController; +use crate::transports::{SocksTransport, TcpTransport, Transport}; +use crate::transports::tcp::TcpInbound; + +const LOG_TARGET: &str = "comms::transports::hidden_service_transport"; + +#[derive(thiserror::Error, Debug)] +pub enum HiddenServiceTransportError { + #[error("Tor hidden service transport error: `{0}`")] + HiddenServiceControllerError(#[from] crate::tor::HiddenServiceControllerError), + #[error("Tor hidden service socks error: `{0}`")] + SocksTransportError(#[from] io::Error), + +} + +struct HiddenServiceTransportInner { + socks_transport: Option, + hidden_service_ctl: HiddenServiceController + +} + +#[derive(Clone)] +pub struct HiddenServiceTransport { + inner: Arc> +} + +impl HiddenServiceTransport { + pub fn new(hidden_service_ctl: HiddenServiceController) -> Self { + Self { + inner : Arc::new(RwLock::new(HiddenServiceTransportInner { + socks_transport: None, + hidden_service_ctl + })) + } + } + + async fn ensure_initialized(&self) -> Result<(), io::Error> { + let inner = self.inner.read().await; + if inner.socks_transport.is_none() { + drop(inner); + let mut mut_inner = self.inner.write().await; + if mut_inner.socks_transport.is_none() { + let transport = mut_inner.hidden_service_ctl.initialize_transport().await.expect("TODO NEED TO MAP THESE ERRORS SOMEHOW"); + mut_inner.socks_transport = Some(transport); + } + } + Ok(()) + } +} +#[crate::async_trait] +impl Transport for HiddenServiceTransport { + type Output = ::Output; + type Error = ::Error; + type Listener = ::Listener; + + async fn listen(&self, addr: &Multiaddr) -> Result<(Self::Listener, Multiaddr), Self::Error> { + self.ensure_initialized().await?; + let inner = self.inner.read().await; + + // info!( + // target: LOG_TARGET, + // "Tor hidden service initialized. proxied_address = '{:?}'", + // inner.proxied_address(), + // ); + Ok(inner.socks_transport.as_ref().unwrap().listen(addr).await?) + } + + async fn dial(&self, addr: &Multiaddr) -> Result { + self.ensure_initialized().await?; + let inner = self.inner.read().await; + Ok(inner.socks_transport.as_ref().unwrap().dial(addr).await?) + } +} diff --git a/comms/core/src/transports/mod.rs b/comms/core/src/transports/mod.rs index 45050f540d..6419985302 100644 --- a/comms/core/src/transports/mod.rs +++ b/comms/core/src/transports/mod.rs @@ -48,6 +48,9 @@ mod tcp; pub use tcp::TcpTransport; mod tcp_with_tor; +mod hidden_service_transport; +pub use hidden_service_transport::HiddenServiceTransport; + pub use tcp_with_tor::TcpWithTorTransport; /// Defines an abstraction for implementations that can dial and listen for connections over a provided address. From f40ea907f77e97359844d3fefb60b36428169c2d Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Mon, 22 Jan 2024 17:40:45 +0200 Subject: [PATCH 2/7] first compiling draft --- Cargo.lock | 107 ++++++++++++++---- .../minotari_console_wallet/src/init/mod.rs | 10 +- applications/minotari_node/src/bootstrap.rs | 21 ++-- .../src/chat_client/src/networking.rs | 23 ++-- base_layer/p2p/src/initialization.rs | 24 ++-- base_layer/wallet/src/wallet.rs | 13 ++- comms/core/src/builder/comms_node.rs | 46 +------- .../core/src/tor/hidden_service/controller.rs | 5 +- .../transports/hidden_service_transport.rs | 59 ++++++---- comms/core/src/transports/mod.rs | 3 +- 10 files changed, 177 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6995a920c4..0bf04043d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,9 +95,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.4" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" dependencies = [ "anstyle", "anstyle-parse", @@ -115,30 +115,30 @@ checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2138,15 +2138,15 @@ dependencies = [ [[package]] name = "globset" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759c97c1e17c55525b57192c06a267cda0ac5210b222d6b82189a2338fa1c13d" +checksum = "57da3b9b5b85bd66f31093f8c408b90a74431672542466497dcbdfdc02034be1" dependencies = [ "aho-corasick", "bstr", - "fnv", "log", - "regex", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -2482,17 +2482,16 @@ dependencies = [ [[package]] name = "ignore" -version = "0.4.20" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbe7873dab538a9a44ad79ede1faf5f30d49f9a5c883ddbab48bce81b64b7492" +checksum = "b46810df39e66e925525d6e38ce1e7f6e1d208f72dc39757880fcb66e2c58af1" dependencies = [ + "crossbeam-deque", "globset", - "lazy_static", "log", "memchr", - "regex", + "regex-automata 0.4.3", "same-file", - "thread_local", "walkdir", "winapi-util", ] @@ -2592,9 +2591,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "inventory" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0508c56cfe9bfd5dfeb0c22ab9a6abfda2f27bdca422132e494266351ed8d83c" +checksum = "c8573b2b1fb643a372c73b23f4da5f888677feef3305146d68a539250a9bccc7" [[package]] name = "ipnet" @@ -7155,6 +7154,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -7185,6 +7193,21 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -7197,6 +7220,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -7209,6 +7238,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -7221,6 +7256,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -7233,6 +7274,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -7245,6 +7292,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -7257,6 +7310,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -7269,6 +7328,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.18" diff --git a/applications/minotari_console_wallet/src/init/mod.rs b/applications/minotari_console_wallet/src/init/mod.rs index 636faa2567..d37acb7763 100644 --- a/applications/minotari_console_wallet/src/init/mod.rs +++ b/applications/minotari_console_wallet/src/init/mod.rs @@ -22,8 +22,7 @@ #![allow(dead_code, unused)] -use std::{fs, path::PathBuf, str::FromStr, sync::Arc}; -use std::time::Instant; +use std::{fs, path::PathBuf, str::FromStr, sync::Arc, time::Instant}; use log::*; use minotari_app_utilities::identity_management::setup_node_identity; @@ -466,13 +465,6 @@ pub async fn init_wallet( WalletError::CommsInitializationError(cie) => cie.to_exit_error(), e => ExitError::new(ExitCode::WalletError, format!("Error creating Wallet Container: {}", e)), })?; - // TODO: fix this - // if let Some(hs) = wallet.comms.hidden_service() { - // wallet - // .db - // .set_tor_identity(hs.tor_identity().clone()) - // .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Problem writing tor identity. {}", e)))?; - // } error!( target: LOG_TARGET, diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index dce1a5b28c..fdc1d3d6ff 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -164,10 +164,20 @@ where B: BlockchainBackend + 'static let comms = comms.add_protocol_extension(mempool_protocol); let comms = Self::setup_rpc_services(comms, &handles, self.db.into(), &p2p_config); - let comms = initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone()) - .await - .map_err(|e| e.to_exit_error())?; + let comms = if p2p_config.transport.transport_type == TransportType::Tor { + let path = base_node_config.tor_identity_file.clone(); + let after_comms = move |identity| { + let _result = identity_management::save_as_json(&path, &identity); + trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", identity); + }; + initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await + } else { + let after_comms = |_identity| {}; + initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await + }; + + let comms = comms.map_err(|e| e.to_exit_error())?; // Save final node identity after comms has initialized. This is required because the public_address can be // changed by comms during initialization when using tor. match p2p_config.transport.transport_type { @@ -177,11 +187,6 @@ where B: BlockchainBackend + 'static .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; }, }; - todo!("Fix this"); - // if let Some(hs) = comms.hidden_service() { - // identity_management::save_as_json(&base_node_config.tor_identity_file, hs.tor_identity()) - // .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; - // } handles.register(comms); diff --git a/base_layer/contacts/src/chat_client/src/networking.rs b/base_layer/contacts/src/chat_client/src/networking.rs index b42d9a41be..1193fdee00 100644 --- a/base_layer/contacts/src/chat_client/src/networking.rs +++ b/base_layer/contacts/src/chat_client/src/networking.rs @@ -29,7 +29,7 @@ pub use tari_comms::{ multiaddr::Multiaddr, peer_manager::{NodeIdentity, PeerFeatures}, }; -use tari_comms::{peer_manager::Peer, CommsNode, UnspawnedCommsNode}; +use tari_comms::{peer_manager::Peer, CommsNode, UnspawnedCommsNode}; use tari_contacts::contacts_service::{handle::ContactsServiceHandle, ContactsServiceInitializer}; use tari_p2p::{ comms_connector::pubsub_connector, @@ -109,10 +109,17 @@ pub async fn start( for peer in seed_peers { peer_manager.add_peer(peer).await?; } - - let comms = spawn_comms_using_transport(comms, p2p_config.transport.clone()).await?; - - // Save final node identity after comms has initialized. This is required because the public_address can be + let comms = if p2p_config.transport.transport_type == TransportType::Tor { + let path = config.chat_client.tor_identity_file.clone(); + let after_comms = move |identity| { + let _result = identity_management::save_as_json(&path, &identity); + trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", identity); + }; + spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await? + } else { + let after_comms = |_identity| {}; + spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await? + }; // changed by comms during initialization when using tor. match p2p_config.transport.transport_type { TransportType::Tcp => {}, // Do not overwrite TCP public_address in the base_node_id! @@ -121,11 +128,7 @@ pub async fn start( trace!(target: LOG_TARGET, "save chat identity file"); }, }; - todo!("Fix this"); - // if let Some(hs) = comms.hidden_service() { - // identity_management::save_as_json(&config.chat_client.tor_identity_file, hs.tor_identity())?; - // trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", hs.tor_identity()); - // } + handles.register(comms); let comms = handles.expect_handle::(); diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index 5514e963c6..d4cf62ca97 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -51,8 +51,15 @@ use tari_comms::{ ProtocolId, }, tor, - tor::HiddenServiceControllerError, - transports::{predicate::FalsePredicate, MemoryTransport, SocksConfig, SocksTransport, TcpWithTorTransport}, + tor::{HiddenServiceControllerError, TorIdentity}, + transports::{ + predicate::FalsePredicate, + HiddenServiceTransport, + MemoryTransport, + SocksConfig, + SocksTransport, + TcpWithTorTransport, + }, utils::cidr::parse_cidrs, CommsBuilder, CommsBuilderError, @@ -70,7 +77,6 @@ use tari_storage::{ use thiserror::Error; use tokio::sync::{broadcast, mpsc}; use tower::ServiceBuilder; -use tari_comms::transports::HiddenServiceTransport; use crate::{ comms_connector::{InboundDomainConnector, PubsubDomainConnector}, @@ -210,9 +216,10 @@ pub async fn initialize_local_test_comms>( Ok((comms, dht, event_sender)) } -pub async fn spawn_comms_using_transport( +pub async fn spawn_comms_using_transport( comms: UnspawnedCommsNode, transport_config: TransportConfig, + after_comms: F, ) -> Result { let comms = match transport_config.transport_type { TransportType::Memory => { @@ -250,19 +257,16 @@ pub async fn spawn_comms_using_transport( let tor_config = transport_config.tor; debug!(target: LOG_TARGET, "Building TOR comms stack ({:?})", tor_config); let listener_address_override = tor_config.listener_address_override.clone(); - let mut hidden_service_ctl = initialize_hidden_service(tor_config)?; + let hidden_service_ctl = initialize_hidden_service(tor_config)?; // Set the listener address to be the address (usually local) to which tor will forward all traffic let instant = Instant::now(); - let transport = HiddenServiceTransport::new(hidden_service_ctl); - error!(target: LOG_TARGET, "TOR transport initialized in {:.0?}", instant.elapsed()); - - + let transport = HiddenServiceTransport::new(hidden_service_ctl, after_comms); + debug!(target: LOG_TARGET, "TOR transport initialized in {:.0?}", instant.elapsed()); comms .with_listener_address( listener_address_override.unwrap_or_else(|| multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]), ) - // .with_hidden_service_controller(hidden_service_ctl) .spawn_with_transport(transport) .await? }, diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index ce2d79f9e1..947937f151 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -36,6 +36,7 @@ use tari_comms::{ multiaddr::Multiaddr, net_address::{MultiaddressesWithStats, PeerAddressSource}, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, + tor::TorIdentity, types::{CommsPublicKey, CommsSecretKey}, CommsNode, NodeIdentity, @@ -72,6 +73,7 @@ use tari_p2p::{ initialization::P2pInitializer, services::liveness::{config::LivenessConfig, LivenessInitializer}, PeerSeedsConfig, + TransportType, }; use tari_script::{one_sided_payment_script, ExecutionStack, TariScript}; use tari_service_framework::StackBuilder; @@ -255,7 +257,16 @@ where let comms = handles .take_handle::() .expect("P2pInitializer was not added to the stack"); - let comms = initialization::spawn_comms_using_transport(comms, config.p2p.transport).await?; + let comms = if config.p2p.transport.transport_type == TransportType::Tor { + let wallet_db = wallet_database.clone(); + let after_comms = move |identity: TorIdentity| { + let _result = wallet_db.set_tor_identity(identity.clone()); + }; + initialization::spawn_comms_using_transport(comms, config.p2p.transport, after_comms).await? + } else { + let after_comms = |_identity| {}; + initialization::spawn_comms_using_transport(comms, config.p2p.transport, after_comms).await? + }; let mut output_manager_handle = handles.expect_handle::(); let key_manager_handle = handles.expect_handle::(); diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index 3b2d8e4c6c..92824ee1b8 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -21,10 +21,8 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{iter, sync::Arc, time::Duration}; -use std::time::Instant; use log::*; -use multiaddr::{multiaddr, Protocol}; use tari_shutdown::ShutdownSignal; use tokio::{ io::{AsyncRead, AsyncWrite}, @@ -38,7 +36,6 @@ use crate::{ ConnectionManagerEvent, ConnectionManagerRequest, ConnectionManagerRequester, - ListenerInfo, LivenessCheck, LivenessStatus, }, @@ -144,7 +141,7 @@ impl UnspawnedCommsNode { let UnspawnedCommsNode { builder, connection_manager_request_rx, - mut connection_manager_requester, + connection_manager_requester, connectivity_requester, connectivity_rx, node_identity, @@ -156,7 +153,6 @@ impl UnspawnedCommsNode { let CommsBuilder { dial_backoff, - hidden_service_ctl, connection_manager_config, connectivity_config, .. @@ -218,31 +214,6 @@ impl UnspawnedCommsNode { "Your node's network ID is '{}'", node_identity.node_id() ); - - // let instant = Instant::now(); - // - // let listening_info = connection_manager_requester.wait_until_listening().await?; - // error!(target: LOG_TARGET, "Waited for {} to connect", instant.elapsed().as_millis()); - // Final setup of the hidden service. - // let mut hidden_service = None; - // if let Some(mut ctl) = hidden_service_ctl { - // // Only set the address to the bind address it is set to TCP port 0 - // let mut proxied_addr = ctl.proxied_address(); - // if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) { - // // Remove the TCP port 0 address and replace it with the actual listener port - // if let Some(Protocol::Tcp(port)) = listening_info.bind_address().iter().last() { - // proxied_addr.pop(); - // proxied_addr.push(Protocol::Tcp(port)); - // ctl.set_proxied_addr(&proxied_addr); - // } - // } - // let hs = ctl.create_hidden_service().await?; - // let onion_addr = hs.get_onion_address(); - // if !node_identity.public_addresses().contains(&onion_addr) { - // node_identity.add_public_address(onion_addr); - // } - // hidden_service = Some(hs); - // } info!( target: LOG_TARGET, "Your node's public addresses are '{}'", @@ -272,7 +243,6 @@ impl UnspawnedCommsNode { node_identity, peer_manager, liveness_watch, - // hidden_service, complete_signals: ext_context.drain_complete_signals(), }) } @@ -314,12 +284,8 @@ pub struct CommsNode { node_identity: Arc, /// Shared PeerManager instance peer_manager: Arc, - /// The bind addresses of the listener(s) - // listening_info: ListenerInfo, /// Current liveness status liveness_watch: watch::Receiver, - /// `Some` if the comms node is configured to run via a hidden service, otherwise `None` - //hidden_service: Option, /// The 'reciprocal' shutdown signals for each comms service complete_signals: Vec, } @@ -350,16 +316,6 @@ impl CommsNode { &self.node_identity } - /// Return the Ip/Tcp address that this node is listening on - // pub fn listening_address(&self) -> &Multiaddr { - // self.listening_info.bind_address() - // } - - /// Return [ListenerInfo] - // pub fn listening_info(&self) -> &ListenerInfo { - // &self.listening_info - // } - /// Returns the current liveness status pub fn liveness_status(&self) -> LivenessStatus { *self.liveness_watch.borrow() diff --git a/comms/core/src/tor/hidden_service/controller.rs b/comms/core/src/tor/hidden_service/controller.rs index d5a60089ca..87da0b61df 100644 --- a/comms/core/src/tor/hidden_service/controller.rs +++ b/comms/core/src/tor/hidden_service/controller.rs @@ -83,7 +83,7 @@ pub struct HiddenServiceController { proxied_port_mapping: PortMapping, socks_address_override: Option, socks_auth: socks::Authentication, - identity: Option, + pub identity: Option, hs_flags: HsFlags, is_authenticated: bool, proxy_opts: TorProxyOpts, @@ -124,9 +124,7 @@ impl HiddenServiceController { } pub async fn initialize_transport(&mut self) -> Result { - dbg!("here3"); self.connect_and_auth().await?; - dbg!("here4"); let socks_addr = self.get_socks_address().await?; Ok(SocksTransport::new(SocksConfig { proxy_address: socks_addr, @@ -237,7 +235,6 @@ impl HiddenServiceController { } fn client_mut(&mut self) -> Result<&mut TorControlPortClient, HiddenServiceControllerError> { - dbg!("here5"); self.client .as_mut() .filter(|c| c.is_connected()) diff --git a/comms/core/src/transports/hidden_service_transport.rs b/comms/core/src/transports/hidden_service_transport.rs index d5f94400da..f8736cf591 100644 --- a/comms/core/src/transports/hidden_service_transport.rs +++ b/comms/core/src/transports/hidden_service_transport.rs @@ -20,14 +20,16 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::io; -use std::sync::Arc; -use log::info; +use std::{io, io::ErrorKind, sync::Arc}; + +use log::*; use multiaddr::Multiaddr; use tokio::sync::RwLock; -use crate::tor::HiddenServiceController; -use crate::transports::{SocksTransport, TcpTransport, Transport}; -use crate::transports::tcp::TcpInbound; + +use crate::{ + tor::{HiddenServiceController, TorIdentity}, + transports::{SocksTransport, Transport}, +}; const LOG_TARGET: &str = "comms::transports::hidden_service_transport"; @@ -37,27 +39,27 @@ pub enum HiddenServiceTransportError { HiddenServiceControllerError(#[from] crate::tor::HiddenServiceControllerError), #[error("Tor hidden service socks error: `{0}`")] SocksTransportError(#[from] io::Error), - } struct HiddenServiceTransportInner { socks_transport: Option, - hidden_service_ctl: HiddenServiceController - + hidden_service_ctl: HiddenServiceController, } #[derive(Clone)] -pub struct HiddenServiceTransport { - inner: Arc> +pub struct HiddenServiceTransport { + inner: Arc>, + after_init: F, } -impl HiddenServiceTransport { - pub fn new(hidden_service_ctl: HiddenServiceController) -> Self { +impl HiddenServiceTransport { + pub fn new(hidden_service_ctl: HiddenServiceController, after_init: F) -> Self { Self { - inner : Arc::new(RwLock::new(HiddenServiceTransportInner { + inner: Arc::new(RwLock::new(HiddenServiceTransportInner { socks_transport: None, - hidden_service_ctl - })) + hidden_service_ctl, + })), + after_init, } } @@ -67,7 +69,21 @@ impl HiddenServiceTransport { drop(inner); let mut mut_inner = self.inner.write().await; if mut_inner.socks_transport.is_none() { - let transport = mut_inner.hidden_service_ctl.initialize_transport().await.expect("TODO NEED TO MAP THESE ERRORS SOMEHOW"); + let transport = mut_inner.hidden_service_ctl.initialize_transport().await.map_err(|e| { + error!( + target: LOG_TARGET, + "Error initializing hidden transport service stack{}", + e + ); + io::Error::new(ErrorKind::Other, e.to_string()) + })?; + (self.after_init)( + mut_inner + .hidden_service_ctl + .identity + .clone() + .ok_or(io::Error::new(ErrorKind::Other, "Missing tor identity".to_string()))?, + ); mut_inner.socks_transport = Some(transport); } } @@ -75,20 +91,15 @@ impl HiddenServiceTransport { } } #[crate::async_trait] -impl Transport for HiddenServiceTransport { - type Output = ::Output; +impl Transport for HiddenServiceTransport { type Error = ::Error; type Listener = ::Listener; + type Output = ::Output; async fn listen(&self, addr: &Multiaddr) -> Result<(Self::Listener, Multiaddr), Self::Error> { self.ensure_initialized().await?; let inner = self.inner.read().await; - // info!( - // target: LOG_TARGET, - // "Tor hidden service initialized. proxied_address = '{:?}'", - // inner.proxied_address(), - // ); Ok(inner.socks_transport.as_ref().unwrap().listen(addr).await?) } diff --git a/comms/core/src/transports/mod.rs b/comms/core/src/transports/mod.rs index 6419985302..1c4d40dd1b 100644 --- a/comms/core/src/transports/mod.rs +++ b/comms/core/src/transports/mod.rs @@ -47,10 +47,9 @@ pub use socks::{SocksConfig, SocksTransport}; mod tcp; pub use tcp::TcpTransport; -mod tcp_with_tor; mod hidden_service_transport; +mod tcp_with_tor; pub use hidden_service_transport::HiddenServiceTransport; - pub use tcp_with_tor::TcpWithTorTransport; /// Defines an abstraction for implementations that can dial and listen for connections over a provided address. From 4ce680458b6f122e1a82bb79fa778c4ae08166f2 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Wed, 24 Jan 2024 14:32:23 +0200 Subject: [PATCH 3/7] wip --- .../src/ui/state/app_state.rs | 3 ++- base_layer/wallet/src/wallet.rs | 20 ++++++++++--------- .../core/src/tor/hidden_service/controller.rs | 19 +++++++++++++++++- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/applications/minotari_console_wallet/src/ui/state/app_state.rs b/applications/minotari_console_wallet/src/ui/state/app_state.rs index cc2a3b5196..0dd07f1765 100644 --- a/applications/minotari_console_wallet/src/ui/state/app_state.rs +++ b/applications/minotari_console_wallet/src/ui/state/app_state.rs @@ -208,7 +208,8 @@ impl AppState { let current = self.get_selected_base_node(); let list = self.get_base_node_list().clone(); let mut index: usize = list.iter().position(|(_, p)| p == current).unwrap_or_default(); - if !list.is_empty() { + if !list.is_empty() + { if index == list.len() - 1 { index = 0; } else { diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 947937f151..34c30f3ae8 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -259,8 +259,17 @@ where .expect("P2pInitializer was not added to the stack"); let comms = if config.p2p.transport.transport_type == TransportType::Tor { let wallet_db = wallet_database.clone(); + let after_comms = move |identity: TorIdentity| { - let _result = wallet_db.set_tor_identity(identity.clone()); + dbg!(&identity); + let add: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) + .parse() + .expect("Should be able to create memory address"); + let _result = wallet_db.set_tor_identity(identity); + // Persist the comms node address and features after it has been spawned to capture any modifications made + // during comms startup. In the case of a Tor Transport the public address could have been generated + let _result = wallet_db.set_node_address(add + ); }; initialization::spawn_comms_using_transport(comms, config.p2p.transport, after_comms).await? } else { @@ -291,14 +300,7 @@ where e })?; - // Persist the comms node address and features after it has been spawned to capture any modifications made - // during comms startup. In the case of a Tor Transport the public address could have been generated - wallet_database.set_node_address( - comms - .node_identity() - .first_public_address() - .ok_or(WalletError::PublicAddressNotSet)?, - )?; + wallet_database.set_node_features(comms.node_identity().features())?; let identity_sig = comms.node_identity().identity_signature_read().as_ref().cloned(); if let Some(identity_sig) = identity_sig { diff --git a/comms/core/src/tor/hidden_service/controller.rs b/comms/core/src/tor/hidden_service/controller.rs index 87da0b61df..fb974dfc9e 100644 --- a/comms/core/src/tor/hidden_service/controller.rs +++ b/comms/core/src/tor/hidden_service/controller.rs @@ -24,6 +24,7 @@ use std::{fs, io, net::SocketAddr, sync::Arc, time::Duration}; use futures::{future, future::Either, pin_mut, StreamExt}; use log::*; +use multiaddr::{multiaddr, Protocol}; use tari_shutdown::OptionalShutdownSignal; use tari_utilities::hex::Hex; use thiserror::Error; @@ -124,7 +125,23 @@ impl HiddenServiceController { } pub async fn initialize_transport(&mut self) -> Result { - self.connect_and_auth().await?; + dbg!("init"); + let con = self.connect_and_auth().await; + dbg!(con.is_ok()); + con?; + + let socks_addr = self.get_socks_address().await?; + let mut proxied_addr = self.proxied_address(); + if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) { + if let Some(Protocol::Tcp(port)) = socks_addr.iter().last(){ + proxied_addr.pop(); + proxied_addr.push(Protocol::Tcp(port)); + } + self.set_proxied_addr(&proxied_addr); + } + dbg!("init_done"); + let result = self.create_hidden_service_from_identity().await; + dbg!(result.is_ok()); let socks_addr = self.get_socks_address().await?; Ok(SocksTransport::new(SocksConfig { proxy_address: socks_addr, From 4be3c1b3f3620d0645d088036d5776c5f74ac43a Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Thu, 25 Jan 2024 12:04:39 +0200 Subject: [PATCH 4/7] complete --- .../src/ui/state/app_state.rs | 41 +++++++++++++++++-- .../src/ui/state/wallet_event_monitor.rs | 7 ---- applications/minotari_node/src/bootstrap.rs | 20 +++++++-- .../src/chat_client/src/networking.rs | 11 ++++- base_layer/wallet/src/wallet.rs | 19 +++++---- .../core/src/tor/hidden_service/controller.rs | 11 ++--- 6 files changed, 77 insertions(+), 32 deletions(-) diff --git a/applications/minotari_console_wallet/src/ui/state/app_state.rs b/applications/minotari_console_wallet/src/ui/state/app_state.rs index 0dd07f1765..2c1fd54b4a 100644 --- a/applications/minotari_console_wallet/src/ui/state/app_state.rs +++ b/applications/minotari_console_wallet/src/ui/state/app_state.rs @@ -208,8 +208,7 @@ impl AppState { let current = self.get_selected_base_node(); let list = self.get_base_node_list().clone(); let mut index: usize = list.iter().position(|(_, p)| p == current).unwrap_or_default(); - if !list.is_empty() - { + if !list.is_empty() { if index == list.len() - 1 { index = 0; } else { @@ -897,12 +896,12 @@ impl AppStateInner { }); self.data.contacts = ui_contacts; + self.refresh_network_id().await?; self.updated = true; Ok(()) } pub async fn refresh_burnt_proofs_state(&mut self) -> Result<(), UiError> { - // let db_burnt_proofs = self.wallet.db.get_burnt_proofs()?; let db_burnt_proofs = self.wallet.db.fetch_burnt_proofs()?; let mut ui_proofs: Vec = vec![]; @@ -922,7 +921,43 @@ impl AppStateInner { Ok(()) } + pub async fn refresh_network_id(&mut self) -> Result<(), UiError> { + let wallet_id = WalletIdentity::new(self.wallet.comms.node_identity(), self.wallet.network.as_network()); + let eid = wallet_id.address.to_emoji_string(); + let qr_link = format!( + "tari://{}/transactions/send?tariAddress={}", + wallet_id.network, + wallet_id.address.to_hex() + ); + let code = QrCode::new(qr_link).unwrap(); + let image = code + .render::() + .dark_color(unicode::Dense1x2::Dark) + .light_color(unicode::Dense1x2::Light) + .build() + .lines() + .skip(1) + .fold("".to_string(), |acc, l| format!("{}{}\n", acc, l)); + let identity = MyIdentity { + tari_address: wallet_id.address.to_hex(), + network_address: wallet_id + .node_identity + .public_addresses() + .iter() + .map(|a| a.to_string()) + .collect::>() + .join(", "), + emoji_id: eid, + qr_code: image, + node_id: wallet_id.node_identity.node_id().to_string(), + }; + self.data.my_identity = identity; + self.updated = true; + Ok(()) + } + pub async fn refresh_connected_peers_state(&mut self) -> Result<(), UiError> { + self.refresh_network_id().await?; let connections = self.wallet.comms.connectivity().get_active_connections().await?; let peer_manager = self.wallet.comms.peer_manager(); let mut peers = Vec::with_capacity(connections.len()); diff --git a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs index da5a085e23..768d9c3b17 100644 --- a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -74,13 +74,6 @@ impl WalletEventMonitor { let mut base_node_changed = wallet_connectivity.get_current_base_node_watcher(); let mut base_node_events = self.app_state_inner.read().await.get_base_node_event_stream(); - // let mut software_update_notif = self - // .app_state_inner - // .read() - // .await - // .get_software_updater() - // .new_update_notifier() - // .clone(); let mut contacts_liveness_events = self.app_state_inner.read().await.get_contacts_liveness_event_stream(); diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index fdc1d3d6ff..ccd17d44a7 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -28,7 +28,14 @@ use tari_common::{ configuration::bootstrap::ApplicationType, exit_codes::{ExitCode, ExitError}, }; -use tari_comms::{peer_manager::Peer, protocol::rpc::RpcServer, NodeIdentity, UnspawnedCommsNode}; +use tari_comms::{ + multiaddr::Multiaddr, + peer_manager::Peer, + protocol::rpc::RpcServer, + tor::TorIdentity, + NodeIdentity, + UnspawnedCommsNode, +}; use tari_comms_dht::Dht; use tari_core::{ base_node, @@ -167,9 +174,16 @@ where B: BlockchainBackend + 'static let comms = if p2p_config.transport.transport_type == TransportType::Tor { let path = base_node_config.tor_identity_file.clone(); - let after_comms = move |identity| { + let node_id = comms.node_identity().clone(); + let after_comms = move |identity: TorIdentity| { let _result = identity_management::save_as_json(&path, &identity); - trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", identity); + trace!(target: LOG_TARGET, "resave the tor identity {:?}", identity); + let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) + .parse() + .expect("Should be able to create address"); + if !node_id.public_addresses().contains(&address) { + node_id.add_public_address(address.clone()); + } }; initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await } else { diff --git a/base_layer/contacts/src/chat_client/src/networking.rs b/base_layer/contacts/src/chat_client/src/networking.rs index 1193fdee00..23e56076f0 100644 --- a/base_layer/contacts/src/chat_client/src/networking.rs +++ b/base_layer/contacts/src/chat_client/src/networking.rs @@ -29,7 +29,7 @@ pub use tari_comms::{ multiaddr::Multiaddr, peer_manager::{NodeIdentity, PeerFeatures}, }; -use tari_comms::{peer_manager::Peer, CommsNode, UnspawnedCommsNode}; +use tari_comms::{peer_manager::Peer, tor::TorIdentity, CommsNode, UnspawnedCommsNode}; use tari_contacts::contacts_service::{handle::ContactsServiceHandle, ContactsServiceInitializer}; use tari_p2p::{ comms_connector::pubsub_connector, @@ -111,9 +111,16 @@ pub async fn start( } let comms = if p2p_config.transport.transport_type == TransportType::Tor { let path = config.chat_client.tor_identity_file.clone(); - let after_comms = move |identity| { + let node_id = comms.node_identity().clone(); + let after_comms = move |identity: TorIdentity| { let _result = identity_management::save_as_json(&path, &identity); + let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) + .parse() + .expect("Should be able to create address"); trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", identity); + if !node_id.public_addresses().contains(&address) { + node_id.add_public_address(address.clone()); + } }; spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await? } else { diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 34c30f3ae8..4b006313da 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -259,17 +259,19 @@ where .expect("P2pInitializer was not added to the stack"); let comms = if config.p2p.transport.transport_type == TransportType::Tor { let wallet_db = wallet_database.clone(); - + let node_id = comms.node_identity().clone(); let after_comms = move |identity: TorIdentity| { - dbg!(&identity); - let add: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) + let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) .parse() - .expect("Should be able to create memory address"); + .expect("Should be able to create address"); let _result = wallet_db.set_tor_identity(identity); - // Persist the comms node address and features after it has been spawned to capture any modifications made - // during comms startup. In the case of a Tor Transport the public address could have been generated - let _result = wallet_db.set_node_address(add - ); + if !node_id.public_addresses().contains(&address) { + node_id.add_public_address(address.clone()); + } + // Persist the comms node address and features after it has been spawned to capture any modifications + // made during comms startup. In the case of a Tor Transport the public address could + // have been generated + let _result = wallet_db.set_node_address(address); }; initialization::spawn_comms_using_transport(comms, config.p2p.transport, after_comms).await? } else { @@ -300,7 +302,6 @@ where e })?; - wallet_database.set_node_features(comms.node_identity().features())?; let identity_sig = comms.node_identity().identity_signature_read().as_ref().cloned(); if let Some(identity_sig) = identity_sig { diff --git a/comms/core/src/tor/hidden_service/controller.rs b/comms/core/src/tor/hidden_service/controller.rs index fb974dfc9e..54dd521b5c 100644 --- a/comms/core/src/tor/hidden_service/controller.rs +++ b/comms/core/src/tor/hidden_service/controller.rs @@ -125,23 +125,18 @@ impl HiddenServiceController { } pub async fn initialize_transport(&mut self) -> Result { - dbg!("init"); - let con = self.connect_and_auth().await; - dbg!(con.is_ok()); - con?; + self.connect_and_auth().await?; let socks_addr = self.get_socks_address().await?; let mut proxied_addr = self.proxied_address(); if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) { - if let Some(Protocol::Tcp(port)) = socks_addr.iter().last(){ + if let Some(Protocol::Tcp(port)) = socks_addr.iter().last() { proxied_addr.pop(); proxied_addr.push(Protocol::Tcp(port)); } self.set_proxied_addr(&proxied_addr); } - dbg!("init_done"); - let result = self.create_hidden_service_from_identity().await; - dbg!(result.is_ok()); + self.create_hidden_service_from_identity().await?; let socks_addr = self.get_socks_address().await?; Ok(SocksTransport::new(SocksConfig { proxy_address: socks_addr, From 6feca2e7d91329b278b5873d00dec79735badc49 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Thu, 25 Jan 2024 14:23:36 +0200 Subject: [PATCH 5/7] fix tests --- base_layer/core/tests/helpers/nodes.rs | 5 +++-- base_layer/p2p/tests/support/comms_and_services.rs | 5 +++-- base_layer/wallet/tests/support/comms_and_services.rs | 5 +++-- comms/core/examples/stress/service.rs | 3 +-- comms/core/examples/stress_test.rs | 4 ++-- comms/core/examples/tor.rs | 6 ++---- comms/core/src/builder/comms_node.rs | 4 ++++ comms/core/src/builder/tests.rs | 6 +++--- comms/core/tests/tests/rpc.rs | 5 +++-- comms/core/tests/tests/rpc_stress.rs | 5 +++-- comms/core/tests/tests/substream_stress.rs | 6 ++++-- 11 files changed, 31 insertions(+), 23 deletions(-) diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 98702db9d8..5302c51953 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -445,15 +445,16 @@ async fn setup_base_node_services( blockchain_db.clone().into(), base_node_service, )); - let comms = comms + let mut comms = comms .add_protocol_extension(rpc_server) .spawn_with_transport(MemoryTransport) .await .unwrap(); // Set the public address for tests + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .add_public_address(comms.listening_address().clone()); + .add_public_address(address.bind_address().clone()); let outbound_nci = handles.expect_handle::(); let local_nci = handles.expect_handle::(); diff --git a/base_layer/p2p/tests/support/comms_and_services.rs b/base_layer/p2p/tests/support/comms_and_services.rs index 4bd2dca73f..168fa09842 100644 --- a/base_layer/p2p/tests/support/comms_and_services.rs +++ b/base_layer/p2p/tests/support/comms_and_services.rs @@ -51,11 +51,12 @@ pub async fn setup_comms_services( .await .unwrap(); - let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); // Set the public address for tests comms .node_identity() - .add_public_address(comms.listening_address().clone()); + .add_public_address(address.bind_address().clone()); (comms, dht, messaging_events) } diff --git a/base_layer/wallet/tests/support/comms_and_services.rs b/base_layer/wallet/tests/support/comms_and_services.rs index b6c7344f0e..6d9f92d3fc 100644 --- a/base_layer/wallet/tests/support/comms_and_services.rs +++ b/base_layer/wallet/tests/support/comms_and_services.rs @@ -58,11 +58,12 @@ pub async fn setup_comms_services( .await .unwrap(); - let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); // Set the public address for tests comms .node_identity() - .add_public_address(comms.listening_address().clone()); + .add_public_address(address.bind_address().clone()); (comms, dht) } diff --git a/comms/core/examples/stress/service.rs b/comms/core/examples/stress/service.rs index 7880c07519..2199638f4b 100644 --- a/comms/core/examples/stress/service.rs +++ b/comms/core/examples/stress/service.rs @@ -63,10 +63,9 @@ pub fn start_service( let (request_tx, request_rx) = mpsc::channel(1); println!( - "Node credentials are {}::{:?} (local_listening_addr='{}')", + "Node credentials are {}::{:?})", node_identity.public_key().to_hex(), node_identity.public_addresses(), - comms_node.listening_address(), ); let service = StressTestService::new( diff --git a/comms/core/examples/stress_test.rs b/comms/core/examples/stress_test.rs index a101198b9e..b39cc07d1a 100644 --- a/comms/core/examples/stress_test.rs +++ b/comms/core/examples/stress_test.rs @@ -95,7 +95,7 @@ async fn run() -> Result<(), Error> { temp_dir.as_ref(), public_ip, port, - tor_identity, + tor_identity.clone(), is_tcp, shutdown.to_signal(), ) @@ -105,7 +105,7 @@ async fn run() -> Result<(), Error> { } if !is_tcp { if let Some(tor_identity_path) = tor_identity_path.as_ref() { - save_json(comms_node.hidden_service().unwrap().tor_identity(), tor_identity_path)?; + save_json(&tor_identity.unwrap(), tor_identity_path)?; } } diff --git a/comms/core/examples/tor.rs b/comms/core/examples/tor.rs index ac33ee50c7..cf3b6ef1d9 100644 --- a/comms/core/examples/tor.rs +++ b/comms/core/examples/tor.rs @@ -87,16 +87,14 @@ async fn run() -> Result<(), Error> { println!("Comms nodes started!"); println!( - "Node 1 is '{}' with address '{:?}' (local_listening_addr='{}')", + "Node 1 is '{}' with address '{:?}')", node_identity1.node_id().short_str(), node_identity1.public_addresses(), - comms_node1.listening_address(), ); println!( - "Node 2 is '{}' with address '{:?}' (local_listening_addr='{}')", + "Node 2 is '{}' with address '{:?}')", node_identity2.node_id().short_str(), node_identity2.public_addresses(), - comms_node2.listening_address(), ); // Let's add node 2 as a peer to node 1 diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index 92824ee1b8..8885055e33 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -296,6 +296,10 @@ impl CommsNode { self.connection_manager_requester.get_event_subscription() } + pub fn connection_manager_requester(&mut self)-> &mut ConnectionManagerRequester{ + &mut self.connection_manager_requester + } + /// Get a subscription to `ConnectivityEvent`s pub fn subscribe_connectivity_events(&self) -> ConnectivityEventRx { self.connectivity_requester.get_event_subscription() diff --git a/comms/core/src/builder/tests.rs b/comms/core/src/builder/tests.rs index a4d8a0ae9c..3808031dbc 100644 --- a/comms/core/src/builder/tests.rs +++ b/comms/core/src/builder/tests.rs @@ -88,7 +88,7 @@ async fn spawn_node( .unwrap(); let (messaging_events_sender, _) = broadcast::channel(100); - let comms_node = comms_node + let mut comms_node = comms_node .add_protocol_extensions(protocols.into()) .add_protocol_extension( MessagingProtocolExtension::new( @@ -107,8 +107,8 @@ async fn spawn_node( .spawn_with_transport(MemoryTransport) .await .unwrap(); - - unpack_enum!(Protocol::Memory(_port) = comms_node.listening_address().iter().next().unwrap()); + let address = comms_node.connection_manager_requester().wait_until_listening().await.unwrap(); + unpack_enum!(Protocol::Memory(_port) = address.bind_address().iter().next().unwrap()); (comms_node, inbound_rx, outbound_tx, messaging_events_sender) } diff --git a/comms/core/tests/tests/rpc.rs b/comms/core/tests/tests/rpc.rs index d97a0596d4..24d08b11d8 100644 --- a/comms/core/tests/tests/rpc.rs +++ b/comms/core/tests/tests/rpc.rs @@ -44,15 +44,16 @@ async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, RpcServerHandle) { .add_service(GreetingServer::new(GreetingService::default())); let rpc_server_hnd = rpc_server.get_handle(); - let comms = create_comms(signal) + let mut comms = create_comms(signal) .add_rpc_server(rpc_server) .spawn_with_transport(TcpTransport::new()) .await .unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .set_public_addresses(vec![comms.listening_address().clone()]); + .set_public_addresses(vec![address.bind_address().clone()]); (comms, rpc_server_hnd) } diff --git a/comms/core/tests/tests/rpc_stress.rs b/comms/core/tests/tests/rpc_stress.rs index 0e27fa38f9..42fe62cd66 100644 --- a/comms/core/tests/tests/rpc_stress.rs +++ b/comms/core/tests/tests/rpc_stress.rs @@ -46,15 +46,16 @@ async fn spawn_node(signal: ShutdownSignal) -> CommsNode { .finish() .add_service(GreetingServer::new(GreetingService::default())); - let comms = create_comms(signal) + let mut comms = create_comms(signal) .add_rpc_server(rpc_server) .spawn_with_transport(TcpTransport::new()) .await .unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .set_public_addresses(vec![comms.listening_address().clone()]); + .set_public_addresses(vec![address.bind_address().clone()]); comms } diff --git a/comms/core/tests/tests/substream_stress.rs b/comms/core/tests/tests/substream_stress.rs index d36a26d673..cdeba323d6 100644 --- a/comms/core/tests/tests/substream_stress.rs +++ b/comms/core/tests/tests/substream_stress.rs @@ -41,15 +41,17 @@ const PROTOCOL_NAME: &[u8] = b"test/dummy/protocol"; pub async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, ProtocolNotificationRx) { let (notif_tx, notif_rx) = mpsc::channel(1); - let comms = create_comms(signal) + let mut comms = create_comms(signal) .add_protocol(&[ProtocolId::from_static(PROTOCOL_NAME)], ¬if_tx) .spawn_with_transport(TcpTransport::new()) .await .unwrap(); + + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .set_public_addresses(vec![comms.listening_address().clone()]); + .set_public_addresses(vec![address.bind_address().clone()]); (comms, notif_rx) } From 17e754b4e360f9ea7a645ab6e1b799878cf07062 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Thu, 25 Jan 2024 16:08:10 +0200 Subject: [PATCH 6/7] clippy --- applications/minotari_node/src/bootstrap.rs | 5 +++-- base_layer/contacts/src/chat_client/src/networking.rs | 4 ++-- base_layer/core/tests/helpers/nodes.rs | 10 ++++++---- base_layer/p2p/tests/support/comms_and_services.rs | 10 ++++++---- base_layer/wallet/src/wallet.rs | 2 +- base_layer/wallet/tests/support/comms_and_services.rs | 10 ++++++---- comms/core/src/builder/comms_node.rs | 2 +- comms/core/src/builder/tests.rs | 6 +++++- comms/core/tests/tests/rpc.rs | 6 +++++- comms/core/tests/tests/rpc_stress.rs | 6 +++++- comms/core/tests/tests/substream_stress.rs | 7 +++++-- 11 files changed, 45 insertions(+), 23 deletions(-) diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index ccd17d44a7..8d8692332d 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -86,6 +86,7 @@ pub struct BaseNodeBootstrapper<'a, B> { impl BaseNodeBootstrapper<'_, B> where B: BlockchainBackend + 'static { + #[allow(clippy::too_many_lines)] pub async fn bootstrap(self) -> Result { let mut base_node_config = self.app_config.base_node.clone(); let mut p2p_config = self.app_config.base_node.p2p.clone(); @@ -174,7 +175,7 @@ where B: BlockchainBackend + 'static let comms = if p2p_config.transport.transport_type == TransportType::Tor { let path = base_node_config.tor_identity_file.clone(); - let node_id = comms.node_identity().clone(); + let node_id = comms.node_identity(); let after_comms = move |identity: TorIdentity| { let _result = identity_management::save_as_json(&path, &identity); trace!(target: LOG_TARGET, "resave the tor identity {:?}", identity); @@ -182,7 +183,7 @@ where B: BlockchainBackend + 'static .parse() .expect("Should be able to create address"); if !node_id.public_addresses().contains(&address) { - node_id.add_public_address(address.clone()); + node_id.add_public_address(address); } }; initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await diff --git a/base_layer/contacts/src/chat_client/src/networking.rs b/base_layer/contacts/src/chat_client/src/networking.rs index 23e56076f0..cf4d5b9483 100644 --- a/base_layer/contacts/src/chat_client/src/networking.rs +++ b/base_layer/contacts/src/chat_client/src/networking.rs @@ -111,7 +111,7 @@ pub async fn start( } let comms = if p2p_config.transport.transport_type == TransportType::Tor { let path = config.chat_client.tor_identity_file.clone(); - let node_id = comms.node_identity().clone(); + let node_id = comms.node_identity(); let after_comms = move |identity: TorIdentity| { let _result = identity_management::save_as_json(&path, &identity); let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) @@ -119,7 +119,7 @@ pub async fn start( .expect("Should be able to create address"); trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", identity); if !node_id.public_addresses().contains(&address) { - node_id.add_public_address(address.clone()); + node_id.add_public_address(address); } }; spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await? diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 5302c51953..2417789169 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -451,10 +451,12 @@ async fn setup_base_node_services( .await .unwrap(); // Set the public address for tests - let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); - comms - .node_identity() - .add_public_address(address.bind_address().clone()); + let address = comms + .connection_manager_requester() + .wait_until_listening() + .await + .unwrap(); + comms.node_identity().add_public_address(address.bind_address().clone()); let outbound_nci = handles.expect_handle::(); let local_nci = handles.expect_handle::(); diff --git a/base_layer/p2p/tests/support/comms_and_services.rs b/base_layer/p2p/tests/support/comms_and_services.rs index 168fa09842..a653cb4f7a 100644 --- a/base_layer/p2p/tests/support/comms_and_services.rs +++ b/base_layer/p2p/tests/support/comms_and_services.rs @@ -52,11 +52,13 @@ pub async fn setup_comms_services( .unwrap(); let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); - let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); + let address = comms + .connection_manager_requester() + .wait_until_listening() + .await + .unwrap(); // Set the public address for tests - comms - .node_identity() - .add_public_address(address.bind_address().clone()); + comms.node_identity().add_public_address(address.bind_address().clone()); (comms, dht, messaging_events) } diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 4b006313da..cf96c8a760 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -259,7 +259,7 @@ where .expect("P2pInitializer was not added to the stack"); let comms = if config.p2p.transport.transport_type == TransportType::Tor { let wallet_db = wallet_database.clone(); - let node_id = comms.node_identity().clone(); + let node_id = comms.node_identity(); let after_comms = move |identity: TorIdentity| { let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) .parse() diff --git a/base_layer/wallet/tests/support/comms_and_services.rs b/base_layer/wallet/tests/support/comms_and_services.rs index 6d9f92d3fc..cf53a469a3 100644 --- a/base_layer/wallet/tests/support/comms_and_services.rs +++ b/base_layer/wallet/tests/support/comms_and_services.rs @@ -59,11 +59,13 @@ pub async fn setup_comms_services( .unwrap(); let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); - let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); + let address = comms + .connection_manager_requester() + .wait_until_listening() + .await + .unwrap(); // Set the public address for tests - comms - .node_identity() - .add_public_address(address.bind_address().clone()); + comms.node_identity().add_public_address(address.bind_address().clone()); (comms, dht) } diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index 8885055e33..b9bd002a98 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -296,7 +296,7 @@ impl CommsNode { self.connection_manager_requester.get_event_subscription() } - pub fn connection_manager_requester(&mut self)-> &mut ConnectionManagerRequester{ + pub fn connection_manager_requester(&mut self) -> &mut ConnectionManagerRequester { &mut self.connection_manager_requester } diff --git a/comms/core/src/builder/tests.rs b/comms/core/src/builder/tests.rs index 3808031dbc..02626c75e7 100644 --- a/comms/core/src/builder/tests.rs +++ b/comms/core/src/builder/tests.rs @@ -107,7 +107,11 @@ async fn spawn_node( .spawn_with_transport(MemoryTransport) .await .unwrap(); - let address = comms_node.connection_manager_requester().wait_until_listening().await.unwrap(); + let address = comms_node + .connection_manager_requester() + .wait_until_listening() + .await + .unwrap(); unpack_enum!(Protocol::Memory(_port) = address.bind_address().iter().next().unwrap()); (comms_node, inbound_rx, outbound_tx, messaging_events_sender) diff --git a/comms/core/tests/tests/rpc.rs b/comms/core/tests/tests/rpc.rs index 24d08b11d8..d4845d226f 100644 --- a/comms/core/tests/tests/rpc.rs +++ b/comms/core/tests/tests/rpc.rs @@ -50,7 +50,11 @@ async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, RpcServerHandle) { .await .unwrap(); - let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); + let address = comms + .connection_manager_requester() + .wait_until_listening() + .await + .unwrap(); comms .node_identity() .set_public_addresses(vec![address.bind_address().clone()]); diff --git a/comms/core/tests/tests/rpc_stress.rs b/comms/core/tests/tests/rpc_stress.rs index 42fe62cd66..9a445e8f14 100644 --- a/comms/core/tests/tests/rpc_stress.rs +++ b/comms/core/tests/tests/rpc_stress.rs @@ -52,7 +52,11 @@ async fn spawn_node(signal: ShutdownSignal) -> CommsNode { .await .unwrap(); - let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); + let address = comms + .connection_manager_requester() + .wait_until_listening() + .await + .unwrap(); comms .node_identity() .set_public_addresses(vec![address.bind_address().clone()]); diff --git a/comms/core/tests/tests/substream_stress.rs b/comms/core/tests/tests/substream_stress.rs index cdeba323d6..488ec9064c 100644 --- a/comms/core/tests/tests/substream_stress.rs +++ b/comms/core/tests/tests/substream_stress.rs @@ -47,8 +47,11 @@ pub async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, ProtocolNotificat .await .unwrap(); - - let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); + let address = comms + .connection_manager_requester() + .wait_until_listening() + .await + .unwrap(); comms .node_identity() .set_public_addresses(vec![address.bind_address().clone()]); From 04a5ed60e35586fe6f2de9b8ea811466d773fbfb Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Mon, 29 Jan 2024 07:59:30 +0200 Subject: [PATCH 7/7] review --- applications/minotari_node/src/bootstrap.rs | 16 +++++++++++----- .../contacts/src/chat_client/src/networking.rs | 18 ++++++++++++------ base_layer/wallet/src/wallet.rs | 16 +++++++++++----- base_layer/wallet_ffi/src/lib.rs | 16 ---------------- 4 files changed, 34 insertions(+), 32 deletions(-) diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index 8d8692332d..45fa169623 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -29,7 +29,7 @@ use tari_common::{ exit_codes::{ExitCode, ExitError}, }; use tari_comms::{ - multiaddr::Multiaddr, + multiaddr::{Error as MultiaddrError, Multiaddr}, peer_manager::Peer, protocol::rpc::RpcServer, tor::TorIdentity, @@ -177,11 +177,17 @@ where B: BlockchainBackend + 'static let path = base_node_config.tor_identity_file.clone(); let node_id = comms.node_identity(); let after_comms = move |identity: TorIdentity| { - let _result = identity_management::save_as_json(&path, &identity); + let address_string = format!("/onion3/{}:{}", identity.service_id, identity.onion_port); + if let Err(e) = identity_management::save_as_json(&path, &identity) { + error!(target: LOG_TARGET, "Failed to save tor identity{:?}", e); + } trace!(target: LOG_TARGET, "resave the tor identity {:?}", identity); - let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) - .parse() - .expect("Should be able to create address"); + let result: Result = address_string.parse(); + if result.is_err() { + error!(target: LOG_TARGET, "Failed to parse tor identity as multiaddr{:?}", result); + return; + } + let address = result.unwrap(); if !node_id.public_addresses().contains(&address) { node_id.add_public_address(address); } diff --git a/base_layer/contacts/src/chat_client/src/networking.rs b/base_layer/contacts/src/chat_client/src/networking.rs index cf4d5b9483..0dc3a0f124 100644 --- a/base_layer/contacts/src/chat_client/src/networking.rs +++ b/base_layer/contacts/src/chat_client/src/networking.rs @@ -22,11 +22,11 @@ use std::{str::FromStr, sync::Arc, time::Duration}; -use log::trace; +use log::{error, trace}; use minotari_app_utilities::{identity_management, identity_management::load_from_json}; // Re-exports pub use tari_comms::{ - multiaddr::Multiaddr, + multiaddr::{Error as MultiaddrError, Multiaddr}, peer_manager::{NodeIdentity, PeerFeatures}, }; use tari_comms::{peer_manager::Peer, tor::TorIdentity, CommsNode, UnspawnedCommsNode}; @@ -113,10 +113,16 @@ pub async fn start( let path = config.chat_client.tor_identity_file.clone(); let node_id = comms.node_identity(); let after_comms = move |identity: TorIdentity| { - let _result = identity_management::save_as_json(&path, &identity); - let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) - .parse() - .expect("Should be able to create address"); + let address_string = format!("/onion3/{}:{}", identity.service_id, identity.onion_port); + if let Err(e) = identity_management::save_as_json(&path, &identity) { + error!(target: LOG_TARGET, "Failed to save tor identity{:?}", e); + } + let result: Result = address_string.parse(); + if result.is_err() { + error!(target: LOG_TARGET, "Failed to parse tor identity as multiaddr{:?}", result); + return; + } + let address = result.unwrap(); trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", identity); if !node_id.public_addresses().contains(&address) { node_id.add_public_address(address); diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index cf96c8a760..fbac21e767 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -33,7 +33,7 @@ use tari_common_types::{ types::{ComAndPubSignature, Commitment, PrivateKey, PublicKey, SignatureWithDomain}, }; use tari_comms::{ - multiaddr::Multiaddr, + multiaddr::{Error as MultiaddrError, Multiaddr}, net_address::{MultiaddressesWithStats, PeerAddressSource}, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, tor::TorIdentity, @@ -261,10 +261,16 @@ where let wallet_db = wallet_database.clone(); let node_id = comms.node_identity(); let after_comms = move |identity: TorIdentity| { - let address: Multiaddr = format!("/onion3/{}:{}", identity.service_id, identity.onion_port) - .parse() - .expect("Should be able to create address"); - let _result = wallet_db.set_tor_identity(identity); + let address_string = format!("/onion3/{}:{}", identity.service_id, identity.onion_port); + if let Err(e) = wallet_db.set_tor_identity(identity) { + error!(target: LOG_TARGET, "Failed to set wallet db tor identity{:?}", e); + } + let result: Result = address_string.parse(); + if result.is_err() { + error!(target: LOG_TARGET, "Failed to parse tor identity as multiaddr{:?}", result); + return; + } + let address = result.unwrap(); if !node_id.public_addresses().contains(&address) { node_id.add_public_address(address.clone()); } diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 0dd463c0a0..47f0b01845 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -5471,12 +5471,6 @@ pub unsafe extern "C" fn wallet_create( match w { Ok(w) => { - // lets ensure the wallet tor_id is saved, this could have been changed during wallet startup - // if let Some(hs) = w.comms.hidden_service() { - // if let Err(e) = w.db.set_tor_identity(hs.tor_identity().clone()) { - // warn!(target: LOG_TARGET, "Could not save tor identity to db: {:?}", e); - // } - // } let wallet_address = TariAddress::new(w.comms.node_identity().public_key().clone(), w.network.as_network()); // Start Callback Handler @@ -5512,16 +5506,6 @@ pub unsafe extern "C" fn wallet_create( runtime.spawn(callback_handler.start()); - // let mut ts = w.transaction_service.clone(); - // runtime.spawn(async move { - // if let Err(e) = ts.restart_transaction_protocols().await { - // warn!( - // target: LOG_TARGET, - // "Could not restart transaction negotiation protocols: {:?}", e - // ); - // } - // }); - let tari_wallet = TariWallet { wallet: w, runtime,