diff --git a/Cargo.lock b/Cargo.lock index 68e3f0df4b..74d0ece736 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5504,6 +5504,7 @@ dependencies = [ "chrono", "clap 3.2.25", "config", + "console-subscriber", "crossterm 0.25.0", "digest 0.9.0", "futures 0.3.28", diff --git a/applications/tari_console_wallet/Cargo.toml b/applications/tari_console_wallet/Cargo.toml index d1fc2640bb..93550b8545 100644 --- a/applications/tari_console_wallet/Cargo.toml +++ b/applications/tari_console_wallet/Cargo.toml @@ -23,10 +23,10 @@ tari_utilities = "0.4.10" tari_wallet = { path = "../../base_layer/wallet", features = ["bundled_sqlite"] } # Uncomment for tokio tracing via tokio-console (needs "tracing" featurs) -#console-subscriber = "0.1.3" +console-subscriber = "0.1.8" #tokio = { version = "1.20", features = ["signal", "tracing"] } # Uncomment for normal use (non tokio-console tracing) -tokio = { version = "1.23", default-features = false, features = ["signal", "sync"] } +tokio = { version = "1.23", features = ["signal"] } bitflags = "1.2.1" chrono = { version = "0.4.19", default-features = false } diff --git a/applications/tari_console_wallet/src/cli.rs b/applications/tari_console_wallet/src/cli.rs index 775b843112..944b14abbf 100644 --- a/applications/tari_console_wallet/src/cli.rs +++ b/applications/tari_console_wallet/src/cli.rs @@ -85,6 +85,8 @@ pub struct Cli { pub grpc_address: Option, #[clap(subcommand)] pub command2: Option, + #[clap(long, alias = "profile")] + pub profile_with_tokio_console: bool, } impl ConfigOverrideProvider for Cli { diff --git a/applications/tari_console_wallet/src/lib.rs b/applications/tari_console_wallet/src/lib.rs index 912b37c974..abd0814505 100644 --- a/applications/tari_console_wallet/src/lib.rs +++ b/applications/tari_console_wallet/src/lib.rs @@ -96,6 +96,7 @@ pub fn run_wallet(shutdown: &mut Shutdown, runtime: Runtime, config: &mut Applic grpc_enabled: true, grpc_address: None, command2: None, + profile_with_tokio_console: false, }; run_wallet_with_cli(shutdown, runtime, config, cli) diff --git a/applications/tari_console_wallet/src/main.rs b/applications/tari_console_wallet/src/main.rs index 4526bf8e9d..0cce3b9d2b 100644 --- a/applications/tari_console_wallet/src/main.rs +++ b/applications/tari_console_wallet/src/main.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::process; +use std::{panic, process}; use clap::Parser; use log::*; @@ -47,8 +47,13 @@ mod utils; mod wallet_modes; fn main() { - // Uncomment to enable tokio tracing via tokio-console - // console_subscriber::init(); + // Setup a panic hook which prints the default rust panic message but also exits the process. This makes a panic in + // any thread "crash" the system instead of silently continuing. + let default_hook = panic::take_hook(); + panic::set_hook(Box::new(move |info| { + default_hook(info); + process::exit(1); + })); match main_inner() { Ok(_) => process::exit(0), @@ -80,6 +85,11 @@ fn main_inner() -> Result<(), ExitError> { include_str!("../log4rs_sample.yml"), )?; + if cli.profile_with_tokio_console { + // Uncomment to enable tokio tracing via tokio-console + console_subscriber::init(); + } + let mut config = ApplicationConfig::load_from(&cfg)?; setup_grpc_config(&mut config); diff --git a/applications/tari_console_wallet/src/ui/state/app_state.rs b/applications/tari_console_wallet/src/ui/state/app_state.rs index bbeed83cc2..2728659f0d 100644 --- a/applications/tari_console_wallet/src/ui/state/app_state.rs +++ b/applications/tari_console_wallet/src/ui/state/app_state.rs @@ -1313,7 +1313,7 @@ struct AppStateConfig { impl Default for AppStateConfig { fn default() -> Self { Self { - cache_update_cooldown: Duration::from_secs(2), + cache_update_cooldown: Duration::from_millis(100), } } } diff --git a/base_layer/wallet/src/base_node_service/config.rs b/base_layer/wallet/src/base_node_service/config.rs index 2785826065..fb86a26a1a 100644 --- a/base_layer/wallet/src/base_node_service/config.rs +++ b/base_layer/wallet/src/base_node_service/config.rs @@ -30,7 +30,7 @@ use tari_common::configuration::serializers; pub struct BaseNodeServiceConfig { /// The refresh interval #[serde(with = "serializers::seconds")] - pub base_node_monitor_refresh_interval: Duration, + pub base_node_monitor_max_refresh_interval: Duration, /// The RPC client pool size pub base_node_rpc_pool_size: usize, /// This is the size of the event channel used to communicate base node events to the wallet @@ -40,7 +40,7 @@ pub struct BaseNodeServiceConfig { impl Default for BaseNodeServiceConfig { fn default() -> Self { Self { - base_node_monitor_refresh_interval: Duration::from_secs(3), + base_node_monitor_max_refresh_interval: Duration::from_secs(90), base_node_rpc_pool_size: 10, event_channel_size: 250, } diff --git a/base_layer/wallet/src/base_node_service/handle.rs b/base_layer/wallet/src/base_node_service/handle.rs index ecf2e13a2b..21a1481ad0 100644 --- a/base_layer/wallet/src/base_node_service/handle.rs +++ b/base_layer/wallet/src/base_node_service/handle.rs @@ -22,8 +22,9 @@ use std::{fmt, fmt::Formatter, sync::Arc, time::Duration}; -use tari_common_types::chain_metadata::ChainMetadata; +use tari_common_types::{chain_metadata::ChainMetadata, types::BlockHash}; use tari_service_framework::reply_channel::SenderService; +use tari_utilities::hex::Hex; use tokio::sync::broadcast; use tower::Service; @@ -46,7 +47,7 @@ pub enum BaseNodeServiceResponse { #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum BaseNodeEvent { BaseNodeStateChanged(BaseNodeState), - NewBlockDetected(u64), + NewBlockDetected(BlockHash, u64), } impl fmt::Display for BaseNodeEvent { @@ -55,8 +56,8 @@ impl fmt::Display for BaseNodeEvent { BaseNodeEvent::BaseNodeStateChanged(state) => { write!(f, "BaseNodeStateChanged: Synced:{:?}", state.is_synced) }, - BaseNodeEvent::NewBlockDetected(s) => { - write!(f, "NewBlockDetected: {}", s) + BaseNodeEvent::NewBlockDetected(hash, height) => { + write!(f, "NewBlockDetected: {} ({})", height, hash.to_hex()) }, } } diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 9b36b555af..6581ab4bf3 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ + cmp, convert::TryFrom, future::Future, sync::Arc, @@ -30,8 +31,11 @@ use std::{ use chrono::Utc; use futures::{future, future::Either}; use log::*; -use tari_common_types::chain_metadata::ChainMetadata; -use tari_comms::protocol::rpc::RpcError; +use tari_common_types::{chain_metadata::ChainMetadata, types::BlockHash as BlockHashType}; +use tari_comms::{ + backoff::{Backoff, ExponentialBackoff}, + protocol::rpc::RpcError, +}; use tokio::{sync::RwLock, time}; use crate::{ @@ -47,7 +51,9 @@ use crate::{ const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor"; pub struct BaseNodeMonitor { - interval: Duration, + max_interval: Duration, + backoff: ExponentialBackoff, + backoff_attempts: usize, state: Arc>, db: WalletDatabase, wallet_connectivity: TWalletConnectivity, @@ -60,14 +66,16 @@ where TWalletConnectivity: WalletConnectivityInterface, { pub fn new( - interval: Duration, + max_interval: Duration, state: Arc>, db: WalletDatabase, wallet_connectivity: TWalletConnectivity, event_publisher: BaseNodeEventSender, ) -> Self { Self { - interval, + max_interval, + backoff: ExponentialBackoff::default(), + backoff_attempts: 0, state, db, wallet_connectivity, @@ -169,14 +177,15 @@ where let is_synced = tip_info.is_synced; let height_of_longest_chain = chain_metadata.height_of_longest_chain(); - self.update_state(BaseNodeState { - node_id: Some(base_node_id.clone()), - chain_metadata: Some(chain_metadata), - is_synced: Some(is_synced), - updated: Some(Utc::now().naive_utc()), - latency: Some(latency), - }) - .await; + let new_block = self + .update_state(BaseNodeState { + node_id: Some(base_node_id.clone()), + chain_metadata: Some(chain_metadata), + is_synced: Some(is_synced), + updated: Some(Utc::now().naive_utc()), + latency: Some(latency), + }) + .await; debug!( target: LOG_TARGET, @@ -187,9 +196,18 @@ where latency.as_millis() ); - let delay = time::sleep(self.interval.saturating_sub(latency)); - if interrupt(base_node_watch.changed(), delay).await.is_none() { - self.update_state(Default::default()).await; + // If there's a new block, try again immediately, + if new_block { + self.backoff_attempts = 0; + } else { + self.backoff_attempts += 1; + let delay = time::sleep( + cmp::min(self.max_interval, self.backoff.calculate_backoff(self.backoff_attempts)) + .saturating_sub(latency), + ); + if interrupt(base_node_watch.changed(), delay).await.is_none() { + self.update_state(Default::default()).await; + } } } @@ -198,24 +216,27 @@ where Ok(()) } - async fn update_state(&self, new_state: BaseNodeState) { + // returns true if a new block, otherwise false + async fn update_state(&self, new_state: BaseNodeState) -> bool { let mut lock = self.state.write().await; - let (new_block_detected, height) = match (new_state.chain_metadata.clone(), lock.chain_metadata.clone()) { + let (new_block_detected, height, hash) = match (new_state.chain_metadata.clone(), lock.chain_metadata.clone()) { (Some(new_metadata), Some(old_metadata)) => ( - new_metadata.height_of_longest_chain() != old_metadata.height_of_longest_chain(), + new_metadata.best_block() != old_metadata.best_block(), new_metadata.height_of_longest_chain(), + *new_metadata.best_block(), ), - (Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain()), - (None, _) => (false, 0), + (Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain(), *new_metadata.best_block()), + (None, _) => (false, 0, BlockHashType::default()), }; if new_block_detected { - self.publish_event(BaseNodeEvent::NewBlockDetected(height)); + self.publish_event(BaseNodeEvent::NewBlockDetected(hash, height)); } *lock = new_state.clone(); self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state)); + new_block_detected } fn publish_event(&self, event: BaseNodeEvent) { diff --git a/base_layer/wallet/src/base_node_service/service.rs b/base_layer/wallet/src/base_node_service/service.rs index 67c2aec2f2..beb52c877c 100644 --- a/base_layer/wallet/src/base_node_service/service.rs +++ b/base_layer/wallet/src/base_node_service/service.rs @@ -127,7 +127,7 @@ where T: WalletBackend + 'static fn spawn_monitor(&self) { let monitor = BaseNodeMonitor::new( - self.config.base_node_monitor_refresh_interval, + self.config.base_node_monitor_max_refresh_interval, self.state.clone(), self.db.clone(), self.wallet_connectivity.clone(), diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index e486b28d60..f7e380113f 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -267,10 +267,10 @@ impl WalletConnectivityService { debug!( target: LOG_TARGET, "Dial was cancelled. Retrying after {}s ...", - self.config.base_node_monitor_refresh_interval.as_secs() + self.config.base_node_monitor_max_refresh_interval.as_secs() ); self.set_online_status(OnlineStatus::Offline); - time::sleep(self.config.base_node_monitor_refresh_interval).await; + time::sleep(self.config.base_node_monitor_max_refresh_interval).await; continue; }, Err(e) => { @@ -278,7 +278,7 @@ impl WalletConnectivityService { if self.current_base_node().as_ref() == Some(&node_id) { self.disconnect_base_node(node_id).await; self.set_online_status(OnlineStatus::Offline); - time::sleep(self.config.base_node_monitor_refresh_interval).await; + time::sleep(self.config.base_node_monitor_max_refresh_interval).await; } continue; }, diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 8abcfcc0bc..14352edd7b 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -494,21 +494,19 @@ where fn handle_base_node_service_event(&mut self, event: Arc) { match (*event).clone() { - BaseNodeEvent::BaseNodeStateChanged(state) => { - let trigger_validation = match (self.last_seen_tip_height, state.chain_metadata.clone()) { - (Some(last_seen_tip_height), Some(cm)) => last_seen_tip_height != cm.height_of_longest_chain(), - (None, _) => true, - _ => false, - }; - if trigger_validation { - let _id = self.validate_outputs().map_err(|e| { - warn!(target: LOG_TARGET, "Error validating txos: {:?}", e); - e - }); - } - self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain()); + BaseNodeEvent::BaseNodeStateChanged(_state) => { + trace!( + target: LOG_TARGET, + "Received Base Node State Change but no block changes" + ); + }, + BaseNodeEvent::NewBlockDetected(_hash, height) => { + self.last_seen_tip_height = Some(height); + let _id = self.validate_outputs().map_err(|e| { + warn!(target: LOG_TARGET, "Error validating txos: {:?}", e); + e + }); }, - BaseNodeEvent::NewBlockDetected(_) => {}, } } diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 6b55796121..0abc99e924 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -872,25 +872,20 @@ where >, ) { match (*event).clone() { - BaseNodeEvent::BaseNodeStateChanged(state) => { - let trigger_validation = match (self.last_seen_tip_height, state.chain_metadata.clone()) { - (Some(last_seen_tip_height), Some(cm)) => last_seen_tip_height != cm.height_of_longest_chain(), - (None, _) => true, - _ => false, - }; + BaseNodeEvent::BaseNodeStateChanged(_state) => { + trace!(target: LOG_TARGET, "Received BaseNodeStateChanged event, but igoring",); + }, + BaseNodeEvent::NewBlockDetected(_hash, height) => { + let _operation_id = self + .start_transaction_validation_protocol(transaction_validation_join_handles) + .await + .map_err(|e| { + warn!(target: LOG_TARGET, "Error validating txos: {:?}", e); + e + }); - if trigger_validation { - let _operation_id = self - .start_transaction_validation_protocol(transaction_validation_join_handles) - .await - .map_err(|e| { - warn!(target: LOG_TARGET, "Error validating txos: {:?}", e); - e - }); - } - self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain()); + self.last_seen_tip_height = Some(height); }, - BaseNodeEvent::NewBlockDetected(_) => {}, } } diff --git a/base_layer/wallet/src/utxo_scanner_service/service.rs b/base_layer/wallet/src/utxo_scanner_service/service.rs index a119855e74..65be7678f1 100644 --- a/base_layer/wallet/src/utxo_scanner_service/service.rs +++ b/base_layer/wallet/src/utxo_scanner_service/service.rs @@ -148,7 +148,7 @@ where event = base_node_service_event_stream.recv() => { match event { Ok(e) => { - if let BaseNodeEvent::NewBlockDetected(h) = (*e).clone() { + if let BaseNodeEvent::NewBlockDetected(_hash, h) = (*e).clone() { debug!(target: LOG_TARGET, "New block event received: {}", h); if local_shutdown.is_triggered() { debug!(target: LOG_TARGET, "Starting new round of UTXO scanning"); diff --git a/base_layer/wallet/tests/output_manager_service_tests/service.rs b/base_layer/wallet/tests/output_manager_service_tests/service.rs index 72b52c01ee..a678fbd25f 100644 --- a/base_layer/wallet/tests/output_manager_service_tests/service.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/service.rs @@ -73,10 +73,7 @@ use tari_service_framework::reply_channel; use tari_shutdown::Shutdown; use tari_utilities::Hidden; use tari_wallet::{ - base_node_service::{ - handle::{BaseNodeEvent, BaseNodeServiceHandle}, - service::BaseNodeState, - }, + base_node_service::handle::{BaseNodeEvent, BaseNodeServiceHandle}, connectivity_service::{create_wallet_connectivity_mock, WalletConnectivityMock}, output_manager_service::{ config::OutputManagerServiceConfig, @@ -1787,7 +1784,10 @@ async fn test_txo_validation() { // Trigger validation through a base_node_service event oms.node_event - .send(Arc::new(BaseNodeEvent::BaseNodeStateChanged(BaseNodeState::default()))) + .send(Arc::new(BaseNodeEvent::NewBlockDetected( + (*block5_header_reorg.hash()).into(), + 5, + ))) .unwrap(); let _result = oms diff --git a/base_layer/wallet/tests/utxo_scanner/mod.rs b/base_layer/wallet/tests/utxo_scanner/mod.rs index ddb8dab377..910274c2ee 100644 --- a/base_layer/wallet/tests/utxo_scanner/mod.rs +++ b/base_layer/wallet/tests/utxo_scanner/mod.rs @@ -971,14 +971,17 @@ async fn test_utxo_scanner_one_sided_payments() { }; test_interface.rpc_service_state.set_tip_info_response(TipInfoResponse { - metadata: Some(chain_metadata), + metadata: Some(chain_metadata.clone()), is_synced: true, }); time::sleep(Duration::from_secs(5)).await; test_interface .base_node_service_event_publisher - .send(Arc::new(BaseNodeEvent::NewBlockDetected(11))) + .send(Arc::new(BaseNodeEvent::NewBlockDetected( + chain_metadata.best_block.as_ref().cloned().unwrap().try_into().unwrap(), + 11, + ))) .unwrap(); let delay = time::sleep(Duration::from_secs(60)); diff --git a/base_layer/wallet_ffi/src/callback_handler.rs b/base_layer/wallet_ffi/src/callback_handler.rs index ac1eb1b9a4..166b965193 100644 --- a/base_layer/wallet_ffi/src/callback_handler.rs +++ b/base_layer/wallet_ffi/src/callback_handler.rs @@ -362,7 +362,7 @@ where TBackend: TransactionBackend + 'static self.base_node_state_changed(state); }, - BaseNodeEvent::NewBlockDetected(_new_block_number) => { + BaseNodeEvent::NewBlockDetected(_hash, _new_block_number) => { // }, } diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 97268a8ceb..de7e0f1c7f 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -136,6 +136,7 @@ use tari_utilities::{ SafePassword, }; use tari_wallet::{ + base_node_service::config::BaseNodeServiceConfig, connectivity_service::{WalletConnectivityHandle, WalletConnectivityInterface}, error::{WalletError, WalletStorageError}, output_manager_service::{ @@ -5394,6 +5395,7 @@ pub unsafe extern "C" fn wallet_create( direct_send_timeout: (*config).dht.discovery_request_timeout, ..Default::default() }, + base_node_service_config: BaseNodeServiceConfig { ..Default::default() }, network, ..Default::default() }; diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index c6d58187b7..4d7bc999ad 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -154,7 +154,7 @@ event_channel_size = 3500 [wallet.base_node] # Configuration for the wallet's base node service # The refresh interval (default = 3 s) -#base_node_monitor_refresh_interval = 3 +#base_node_monitor_max_refresh_interval = 3 # The RPC client pool size (default = 5) #base_node_rpc_pool_size = 5 # This is the size of the event channel used to communicate base node events to the wallet. (default = 250). diff --git a/integration_tests/src/ffi/wallet.rs b/integration_tests/src/ffi/wallet.rs index 413a478324..eb0066280b 100644 --- a/integration_tests/src/ffi/wallet.rs +++ b/integration_tests/src/ffi/wallet.rs @@ -139,7 +139,6 @@ extern "C" fn callback_connectivity_status(status: u64) { extern "C" fn callback_base_node_state(state: *mut TariBaseNodeState) { let callbacks = Callbacks::instance(); callbacks.on_basenode_state_update(state); - // println!("callback_base_node_state"); } #[derive(Default, Debug)] diff --git a/integration_tests/src/wallet_process.rs b/integration_tests/src/wallet_process.rs index 04843c0576..6920c3e341 100644 --- a/integration_tests/src/wallet_process.rs +++ b/integration_tests/src/wallet_process.rs @@ -131,7 +131,7 @@ pub async fn spawn_wallet( wallet_app_config .wallet .base_node_service_config - .base_node_monitor_refresh_interval = Duration::from_secs(15); + .base_node_monitor_max_refresh_interval = Duration::from_secs(15); wallet_app_config.wallet.p2p.transport.transport_type = TransportType::Tcp; wallet_app_config.wallet.p2p.transport.tcp.listener_address = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{}", port)).unwrap(); @@ -222,6 +222,7 @@ pub fn get_default_cli() -> Cli { grpc_enabled: true, grpc_address: None, command2: None, + profile_with_tokio_console: false, } }