From d5b84a012dbda4332cd601abf08630cf1a9ee8d3 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Tue, 19 May 2020 11:01:06 +0200 Subject: [PATCH] Adds a channel to the base_node parser to listen for state machine feedback Changed listing state display to be more clear --- applications/tari_base_node/src/builder.rs | 19 +++++++-- applications/tari_base_node/src/parser.rs | 39 +++++++++++++++---- .../core/src/base_node/states/block_sync.rs | 3 +- .../core/src/base_node/states/listening.rs | 3 +- 4 files changed, 52 insertions(+), 12 deletions(-) diff --git a/applications/tari_base_node/src/builder.rs b/applications/tari_base_node/src/builder.rs index c8dfaa18a8..3f9aaa2457 100644 --- a/applications/tari_base_node/src/builder.rs +++ b/applications/tari_base_node/src/builder.rs @@ -33,6 +33,7 @@ use std::{ }, time::Duration, }; +use tari_broadcast_channel::Subscriber; use tari_common::{CommsTransport, DatabaseType, GlobalConfig, Network, SocksAuthentication, TorControlAuthentication}; use tari_comms::{ multiaddr::{Multiaddr, Protocol}, @@ -51,6 +52,7 @@ use tari_core::{ base_node::{ chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer}, service::{BaseNodeServiceConfig, BaseNodeServiceInitializer}, + states::StatusInfo, BaseNodeStateMachine, BaseNodeStateMachineConfig, LocalNodeCommsInterface, @@ -200,6 +202,10 @@ impl NodeContainer { using_backend!(self, ctx, ctx.wallet_transaction_service()) } + pub fn get_state_machine_info_channel(&self) -> Subscriber { + using_backend!(self, ctx, ctx.get_status_event_stream()) + } + async fn run_impl(mut ctx: BaseNodeContext, rt: runtime::Handle) { info!(target: LOG_TARGET, "Tari base node has STARTED"); let mut wallet_output_handle = ctx.output_manager(); @@ -219,7 +225,7 @@ impl NodeContainer { let mut oms_handle_clone = wallet_output_handle.clone(); tokio::spawn(async move { delay_for(Duration::from_secs(240)).await; - oms_handle_clone.sync_with_base_node().await; + let _ = oms_handle_clone.sync_with_base_node().await; }); }, Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e), @@ -261,7 +267,9 @@ pub struct BaseNodeContext { pub miner_hashrate: Arc, } -impl BaseNodeContext { +impl BaseNodeContext +where B: 'static +{ /// Returns a handle to the Output Manager pub fn output_manager(&self) -> OutputManagerHandle { self.wallet_handles @@ -283,12 +291,17 @@ impl BaseNodeContext { .expect("Could not get local mempool interface handle") } - /// Return the handle to the Transaciton Service + /// Return the handle to the Transaction Service pub fn wallet_transaction_service(&self) -> TransactionServiceHandle { self.wallet_handles .get_handle::() .expect("Could not get wallet transaction service handle") } + + // /// Return the state machine channel to provide info updates + pub fn get_status_event_stream(&self) -> Subscriber { + self.node.get_status_event_stream() + } } /// Tries to construct a node identity by loading the secret key and other metadata from disk and calculating the diff --git a/applications/tari_base_node/src/parser.rs b/applications/tari_base_node/src/parser.rs index 416619b86c..5839a5e03e 100644 --- a/applications/tari_base_node/src/parser.rs +++ b/applications/tari_base_node/src/parser.rs @@ -29,6 +29,7 @@ use crate::{ }; use chrono::Utc; use chrono_english::{parse_date_string, Dialect}; +use futures::StreamExt; use log::*; use qrcode::{render::unicode, QrCode}; use regex::Regex; @@ -41,10 +42,9 @@ use rustyline::{ }; use rustyline_derive::{Helper, Highlighter, Validator}; use std::{ - error::Error, io::{self, Write}, str::FromStr, - string::{ParseError, ToString}, + string::ToString, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -53,6 +53,7 @@ use std::{ }; use strum::IntoEnumIterator; use strum_macros::{Display, EnumIter, EnumString}; +use tari_broadcast_channel::Subscriber; use tari_common::GlobalConfig; use tari_comms::{ connection_manager::ConnectionManagerRequester, @@ -62,7 +63,7 @@ use tari_comms::{ }; use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester}; use tari_core::{ - base_node::LocalNodeCommsInterface, + base_node::{states::StatusInfo, LocalNodeCommsInterface}, blocks::BlockHeader, mempool::service::LocalMempoolService, tari_utilities::{hex::Hex, Hashable}, @@ -109,6 +110,7 @@ pub enum BaseNodeCommand { GetMiningState, MakeItRain, CoinSplit, + GetStateInfo, Quit, Exit, } @@ -132,6 +134,7 @@ pub struct Parser { enable_miner: Arc, miner_hashrate: Arc, miner_thread_count: u64, + state_machine_info: Subscriber, } const MAKE_IT_RAIN_USAGE: &str = "\nmake-it-rain [Txs/s] [duration (s)] [start amount (uT)] [increment (uT)/Tx] \ @@ -187,6 +190,7 @@ impl Parser { enable_miner: ctx.miner_enabled(), miner_hashrate: ctx.miner_hashrate(), miner_thread_count: config.num_mining_threads as u64, + state_machine_info: ctx.get_state_machine_info_channel(), } } @@ -236,6 +240,9 @@ impl Parser { Help => { self.print_help(args); }, + GetStateInfo => { + self.process_state_info(); + }, GetBalance => { self.process_get_balance(); }, @@ -329,6 +336,9 @@ impl Parser { let joined = self.commands.join(", "); println!("{}", joined); }, + GetStateInfo => { + println!("Prints out the status of the base_node state machine"); + }, GetBalance => { println!("Gets your balance"); }, @@ -433,6 +443,21 @@ impl Parser { }); } + /// Function to process the ge-state-info command + fn process_state_info(&mut self) { + let mut channel = self.state_machine_info.clone(); + self.executor.spawn(async move { + match channel.next().await { + None => { + println!("Something went wrong"); + warn!(target: LOG_TARGET, "Error communicating with state machine"); + return; + }, + Some(data) => println!("Current state machine state:\n{}", data), + }; + }); + } + /// Function to process the list utxos command fn process_list_unspent_outputs(&mut self) { let mut handler1 = self.node_service.clone(); @@ -1055,7 +1080,7 @@ impl Parser { } /// Helper function to convert an array from command_arg to a Vec of header heights - async fn cmd_arg_to_header_heights(mut handler: LocalNodeCommsInterface, command_arg: Vec) -> Vec { + async fn cmd_arg_to_header_heights(handler: LocalNodeCommsInterface, command_arg: Vec) -> Vec { let height_ranges: Result, _> = command_arg.iter().map(|v| u64::from_str(v)).collect(); match height_ranges { Ok(height_ranges) => { @@ -1068,14 +1093,14 @@ impl Parser { Ok(heights) => heights, Err(_) => { println!("Error communicating with comm interface"); - return Vec::new(); + Vec::new() }, } } }, - Err(e) => { + Err(_e) => { println!("Invalid number provided"); - return Vec::new(); + Vec::new() }, } } diff --git a/base_layer/core/src/base_node/states/block_sync.rs b/base_layer/core/src/base_node/states/block_sync.rs index 9942bbc6e4..b8ad9bca1a 100644 --- a/base_layer/core/src/base_node/states/block_sync.rs +++ b/base_layer/core/src/base_node/states/block_sync.rs @@ -107,7 +107,7 @@ impl Display for BlockSyncInfo { fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { let local_height = self.local_height.unwrap_or(0); let tip_height = self.tip_height.unwrap_or(0); - fmt.write_str("Syncing from: \n")?; + fmt.write_str("Syncing from the following peers: \n")?; for peer in &self.sync_peers { fmt.write_str(&format!("{}\n", peer))?; } @@ -161,6 +161,7 @@ impl BlockSyncStrategy { ) -> StateEvent { shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None)); + let _ = shared.status_event_publisher.send(shared.info.clone()).await; let _ = shared.status_event_publisher.send(shared.info.clone()).await; match self { diff --git a/base_layer/core/src/base_node/states/listening.rs b/base_layer/core/src/base_node/states/listening.rs index 7ea0be48e2..ad4174493d 100644 --- a/base_layer/core/src/base_node/states/listening.rs +++ b/base_layer/core/src/base_node/states/listening.rs @@ -29,7 +29,7 @@ use crate::{ chain_storage::{BlockchainBackend, ChainMetadata}, proof_of_work::Difficulty, }; -use futures::stream::StreamExt; +use futures::{stream::StreamExt, SinkExt}; use log::*; use std::fmt::{Display, Formatter}; use tari_comms::peer_manager::NodeId; @@ -64,6 +64,7 @@ impl ListeningData { pub async fn next_event(&mut self, shared: &mut BaseNodeStateMachine) -> StateEvent { info!(target: LOG_TARGET, "Listening for chain metadata updates"); shared.info = StatusInfo::Listening(ListeningInfo::new()); + let _ = shared.status_event_publisher.send(shared.info.clone()).await; while let Some(metadata_event) = shared.metadata_event_stream.next().await { match &*metadata_event { ChainMetadataEvent::PeerChainMetadataReceived(ref peer_metadata_list) => {