Skip to content

Commit

Permalink
feat: improved base node monitoring (#5390)
Browse files Browse the repository at this point in the history
Add the ability to run tokio_console on the console_wallet. 

I found one case where the UI would use a lot of CPU, but I couldn't
find the cause.

Reduced the UI caching time to be more responsive.

Renamed base_node_monitor_refresh_interval to
base_node_monitor_max_refresh_interval and implemented a backoff
strategy.
If a new block is found when querying the base node, immediately query
it again, backing off each time it has the same block. This allows you
to update quickly if the node is syncing. Otherwise, backoff to the max
interval. The default is 90 seconds.
(was previously 3 seconds)

Changed output and transaction manager service to rely on the base node
service to send them an event when a new block is encountered. Both were
doing this check themselves in addition to the check the base node
service was doing.

The base node service also only checked on the height, now the hash is
also checked.

---------

Co-authored-by: Hansie Odendaal <[email protected]>
  • Loading branch information
stringhandler and hansieodendaal authored May 24, 2023
1 parent 8631bc2 commit c704890
Show file tree
Hide file tree
Showing 21 changed files with 115 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ tari_utilities = "0.4.10"
tari_wallet = { path = "../../base_layer/wallet", features = ["bundled_sqlite"] }

# Uncomment for tokio tracing via tokio-console (needs "tracing" featurs)
#console-subscriber = "0.1.3"
console-subscriber = "0.1.8"
#tokio = { version = "1.20", features = ["signal", "tracing"] }
# Uncomment for normal use (non tokio-console tracing)
tokio = { version = "1.23", default-features = false, features = ["signal", "sync"] }
tokio = { version = "1.23", features = ["signal"] }

bitflags = "1.2.1"
chrono = { version = "0.4.19", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_console_wallet/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct Cli {
pub grpc_address: Option<String>,
#[clap(subcommand)]
pub command2: Option<CliCommands>,
#[clap(long, alias = "profile")]
pub profile_with_tokio_console: bool,
}

impl ConfigOverrideProvider for Cli {
Expand Down
1 change: 1 addition & 0 deletions applications/tari_console_wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn run_wallet(shutdown: &mut Shutdown, runtime: Runtime, config: &mut Applic
grpc_enabled: true,
grpc_address: None,
command2: None,
profile_with_tokio_console: false,
};

run_wallet_with_cli(shutdown, runtime, config, cli)
Expand Down
16 changes: 13 additions & 3 deletions applications/tari_console_wallet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::process;
use std::{panic, process};

use clap::Parser;
use log::*;
Expand All @@ -47,8 +47,13 @@ mod utils;
mod wallet_modes;

fn main() {
// Uncomment to enable tokio tracing via tokio-console
// console_subscriber::init();
// Setup a panic hook which prints the default rust panic message but also exits the process. This makes a panic in
// any thread "crash" the system instead of silently continuing.
let default_hook = panic::take_hook();
panic::set_hook(Box::new(move |info| {
default_hook(info);
process::exit(1);
}));

match main_inner() {
Ok(_) => process::exit(0),
Expand Down Expand Up @@ -80,6 +85,11 @@ fn main_inner() -> Result<(), ExitError> {
include_str!("../log4rs_sample.yml"),
)?;

if cli.profile_with_tokio_console {
// Uncomment to enable tokio tracing via tokio-console
console_subscriber::init();
}

let mut config = ApplicationConfig::load_from(&cfg)?;

setup_grpc_config(&mut config);
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ struct AppStateConfig {
impl Default for AppStateConfig {
fn default() -> Self {
Self {
cache_update_cooldown: Duration::from_secs(2),
cache_update_cooldown: Duration::from_millis(100),
}
}
}
4 changes: 2 additions & 2 deletions base_layer/wallet/src/base_node_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_common::configuration::serializers;
pub struct BaseNodeServiceConfig {
/// The refresh interval
#[serde(with = "serializers::seconds")]
pub base_node_monitor_refresh_interval: Duration,
pub base_node_monitor_max_refresh_interval: Duration,
/// The RPC client pool size
pub base_node_rpc_pool_size: usize,
/// This is the size of the event channel used to communicate base node events to the wallet
Expand All @@ -40,7 +40,7 @@ pub struct BaseNodeServiceConfig {
impl Default for BaseNodeServiceConfig {
fn default() -> Self {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(3),
base_node_monitor_max_refresh_interval: Duration::from_secs(90),
base_node_rpc_pool_size: 10,
event_channel_size: 250,
}
Expand Down
9 changes: 5 additions & 4 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

use std::{fmt, fmt::Formatter, sync::Arc, time::Duration};

use tari_common_types::chain_metadata::ChainMetadata;
use tari_common_types::{chain_metadata::ChainMetadata, types::BlockHash};
use tari_service_framework::reply_channel::SenderService;
use tari_utilities::hex::Hex;
use tokio::sync::broadcast;
use tower::Service;

Expand All @@ -46,7 +47,7 @@ pub enum BaseNodeServiceResponse {
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum BaseNodeEvent {
BaseNodeStateChanged(BaseNodeState),
NewBlockDetected(u64),
NewBlockDetected(BlockHash, u64),
}

impl fmt::Display for BaseNodeEvent {
Expand All @@ -55,8 +56,8 @@ impl fmt::Display for BaseNodeEvent {
BaseNodeEvent::BaseNodeStateChanged(state) => {
write!(f, "BaseNodeStateChanged: Synced:{:?}", state.is_synced)
},
BaseNodeEvent::NewBlockDetected(s) => {
write!(f, "NewBlockDetected: {}", s)
BaseNodeEvent::NewBlockDetected(hash, height) => {
write!(f, "NewBlockDetected: {} ({})", height, hash.to_hex())
},
}
}
Expand Down
65 changes: 43 additions & 22 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
cmp,
convert::TryFrom,
future::Future,
sync::Arc,
Expand All @@ -30,8 +31,11 @@ use std::{
use chrono::Utc;
use futures::{future, future::Either};
use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::protocol::rpc::RpcError;
use tari_common_types::{chain_metadata::ChainMetadata, types::BlockHash as BlockHashType};
use tari_comms::{
backoff::{Backoff, ExponentialBackoff},
protocol::rpc::RpcError,
};
use tokio::{sync::RwLock, time};

use crate::{
Expand All @@ -47,7 +51,9 @@ use crate::{
const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";

pub struct BaseNodeMonitor<TBackend, TWalletConnectivity> {
interval: Duration,
max_interval: Duration,
backoff: ExponentialBackoff,
backoff_attempts: usize,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
Expand All @@ -60,14 +66,16 @@ where
TWalletConnectivity: WalletConnectivityInterface,
{
pub fn new(
interval: Duration,
max_interval: Duration,
state: Arc<RwLock<BaseNodeState>>,
db: WalletDatabase<TBackend>,
wallet_connectivity: TWalletConnectivity,
event_publisher: BaseNodeEventSender,
) -> Self {
Self {
interval,
max_interval,
backoff: ExponentialBackoff::default(),
backoff_attempts: 0,
state,
db,
wallet_connectivity,
Expand Down Expand Up @@ -169,14 +177,15 @@ where
let is_synced = tip_info.is_synced;
let height_of_longest_chain = chain_metadata.height_of_longest_chain();

self.update_state(BaseNodeState {
node_id: Some(base_node_id.clone()),
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency: Some(latency),
})
.await;
let new_block = self
.update_state(BaseNodeState {
node_id: Some(base_node_id.clone()),
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency: Some(latency),
})
.await;

debug!(
target: LOG_TARGET,
Expand All @@ -187,9 +196,18 @@ where
latency.as_millis()
);

let delay = time::sleep(self.interval.saturating_sub(latency));
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.update_state(Default::default()).await;
// If there's a new block, try again immediately,
if new_block {
self.backoff_attempts = 0;
} else {
self.backoff_attempts += 1;
let delay = time::sleep(
cmp::min(self.max_interval, self.backoff.calculate_backoff(self.backoff_attempts))
.saturating_sub(latency),
);
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.update_state(Default::default()).await;
}
}
}

Expand All @@ -198,24 +216,27 @@ where
Ok(())
}

async fn update_state(&self, new_state: BaseNodeState) {
// returns true if a new block, otherwise false
async fn update_state(&self, new_state: BaseNodeState) -> bool {
let mut lock = self.state.write().await;
let (new_block_detected, height) = match (new_state.chain_metadata.clone(), lock.chain_metadata.clone()) {
let (new_block_detected, height, hash) = match (new_state.chain_metadata.clone(), lock.chain_metadata.clone()) {
(Some(new_metadata), Some(old_metadata)) => (
new_metadata.height_of_longest_chain() != old_metadata.height_of_longest_chain(),
new_metadata.best_block() != old_metadata.best_block(),
new_metadata.height_of_longest_chain(),
*new_metadata.best_block(),
),
(Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain()),
(None, _) => (false, 0),
(Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain(), *new_metadata.best_block()),
(None, _) => (false, 0, BlockHashType::default()),
};

if new_block_detected {
self.publish_event(BaseNodeEvent::NewBlockDetected(height));
self.publish_event(BaseNodeEvent::NewBlockDetected(hash, height));
}

*lock = new_state.clone();

self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
new_block_detected
}

fn publish_event(&self, event: BaseNodeEvent) {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ where T: WalletBackend + 'static

fn spawn_monitor(&self) {
let monitor = BaseNodeMonitor::new(
self.config.base_node_monitor_refresh_interval,
self.config.base_node_monitor_max_refresh_interval,
self.state.clone(),
self.db.clone(),
self.wallet_connectivity.clone(),
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,18 @@ impl WalletConnectivityService {
debug!(
target: LOG_TARGET,
"Dial was cancelled. Retrying after {}s ...",
self.config.base_node_monitor_refresh_interval.as_secs()
self.config.base_node_monitor_max_refresh_interval.as_secs()
);
self.set_online_status(OnlineStatus::Offline);
time::sleep(self.config.base_node_monitor_refresh_interval).await;
time::sleep(self.config.base_node_monitor_max_refresh_interval).await;
continue;
},
Err(e) => {
warn!(target: LOG_TARGET, "{}", e);
if self.current_base_node().as_ref() == Some(&node_id) {
self.disconnect_base_node(node_id).await;
self.set_online_status(OnlineStatus::Offline);
time::sleep(self.config.base_node_monitor_refresh_interval).await;
time::sleep(self.config.base_node_monitor_max_refresh_interval).await;
}
continue;
},
Expand Down
26 changes: 12 additions & 14 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,21 +494,19 @@ where

fn handle_base_node_service_event(&mut self, event: Arc<BaseNodeEvent>) {
match (*event).clone() {
BaseNodeEvent::BaseNodeStateChanged(state) => {
let trigger_validation = match (self.last_seen_tip_height, state.chain_metadata.clone()) {
(Some(last_seen_tip_height), Some(cm)) => last_seen_tip_height != cm.height_of_longest_chain(),
(None, _) => true,
_ => false,
};
if trigger_validation {
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
BaseNodeEvent::BaseNodeStateChanged(_state) => {
trace!(
target: LOG_TARGET,
"Received Base Node State Change but no block changes"
);
},
BaseNodeEvent::NewBlockDetected(_hash, height) => {
self.last_seen_tip_height = Some(height);
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
29 changes: 12 additions & 17 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,25 +872,20 @@ where
>,
) {
match (*event).clone() {
BaseNodeEvent::BaseNodeStateChanged(state) => {
let trigger_validation = match (self.last_seen_tip_height, state.chain_metadata.clone()) {
(Some(last_seen_tip_height), Some(cm)) => last_seen_tip_height != cm.height_of_longest_chain(),
(None, _) => true,
_ => false,
};
BaseNodeEvent::BaseNodeStateChanged(_state) => {
trace!(target: LOG_TARGET, "Received BaseNodeStateChanged event, but igoring",);
},
BaseNodeEvent::NewBlockDetected(_hash, height) => {
let _operation_id = self
.start_transaction_validation_protocol(transaction_validation_join_handles)
.await
.map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});

if trigger_validation {
let _operation_id = self
.start_transaction_validation_protocol(transaction_validation_join_handles)
.await
.map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
self.last_seen_tip_height = Some(height);
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/utxo_scanner_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
event = base_node_service_event_stream.recv() => {
match event {
Ok(e) => {
if let BaseNodeEvent::NewBlockDetected(h) = (*e).clone() {
if let BaseNodeEvent::NewBlockDetected(_hash, h) = (*e).clone() {
debug!(target: LOG_TARGET, "New block event received: {}", h);
if local_shutdown.is_triggered() {
debug!(target: LOG_TARGET, "Starting new round of UTXO scanning");
Expand Down
Loading

0 comments on commit c704890

Please sign in to comment.