From 90c1c18905339b8ba3e3c92db6cb0b43bb401bd2 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Wed, 20 May 2020 10:36:53 +0200 Subject: [PATCH] Adds a channel to the base_node parser to listen for state machine feedback Changed listing state display to be more clear Co-authored-by: Stan Bondi --- applications/tari_base_node/src/builder.rs | 19 +++++++-- applications/tari_base_node/src/parser.rs | 41 +++++++++++++++---- .../core/src/base_node/state_machine.rs | 7 +++- .../core/src/base_node/states/block_sync.rs | 17 ++++---- .../core/src/base_node/states/listening.rs | 6 ++- 5 files changed, 69 insertions(+), 21 deletions(-) diff --git a/applications/tari_base_node/src/builder.rs b/applications/tari_base_node/src/builder.rs index c8dfaa18a8c..3f9aaa24574 100644 --- a/applications/tari_base_node/src/builder.rs +++ b/applications/tari_base_node/src/builder.rs @@ -33,6 +33,7 @@ use std::{ }, time::Duration, }; +use tari_broadcast_channel::Subscriber; use tari_common::{CommsTransport, DatabaseType, GlobalConfig, Network, SocksAuthentication, TorControlAuthentication}; use tari_comms::{ multiaddr::{Multiaddr, Protocol}, @@ -51,6 +52,7 @@ use tari_core::{ base_node::{ chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer}, service::{BaseNodeServiceConfig, BaseNodeServiceInitializer}, + states::StatusInfo, BaseNodeStateMachine, BaseNodeStateMachineConfig, LocalNodeCommsInterface, @@ -200,6 +202,10 @@ impl NodeContainer { using_backend!(self, ctx, ctx.wallet_transaction_service()) } + pub fn get_state_machine_info_channel(&self) -> Subscriber { + using_backend!(self, ctx, ctx.get_status_event_stream()) + } + async fn run_impl(mut ctx: BaseNodeContext, rt: runtime::Handle) { info!(target: LOG_TARGET, "Tari base node has STARTED"); let mut wallet_output_handle = ctx.output_manager(); @@ -219,7 +225,7 @@ impl NodeContainer { let mut oms_handle_clone = wallet_output_handle.clone(); tokio::spawn(async move { delay_for(Duration::from_secs(240)).await; - oms_handle_clone.sync_with_base_node().await; + let _ = oms_handle_clone.sync_with_base_node().await; }); }, Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e), @@ -261,7 +267,9 @@ pub struct BaseNodeContext { pub miner_hashrate: Arc, } -impl BaseNodeContext { +impl BaseNodeContext +where B: 'static +{ /// Returns a handle to the Output Manager pub fn output_manager(&self) -> OutputManagerHandle { self.wallet_handles @@ -283,12 +291,17 @@ impl BaseNodeContext { .expect("Could not get local mempool interface handle") } - /// Return the handle to the Transaciton Service + /// Return the handle to the Transaction Service pub fn wallet_transaction_service(&self) -> TransactionServiceHandle { self.wallet_handles .get_handle::() .expect("Could not get wallet transaction service handle") } + + // /// Return the state machine channel to provide info updates + pub fn get_status_event_stream(&self) -> Subscriber { + self.node.get_status_event_stream() + } } /// Tries to construct a node identity by loading the secret key and other metadata from disk and calculating the diff --git a/applications/tari_base_node/src/parser.rs b/applications/tari_base_node/src/parser.rs index 416619b86ca..a6044738b6b 100644 --- a/applications/tari_base_node/src/parser.rs +++ b/applications/tari_base_node/src/parser.rs @@ -29,6 +29,7 @@ use crate::{ }; use chrono::Utc; use chrono_english::{parse_date_string, Dialect}; +use futures::StreamExt; use log::*; use qrcode::{render::unicode, QrCode}; use regex::Regex; @@ -41,10 +42,9 @@ use rustyline::{ }; use rustyline_derive::{Helper, Highlighter, Validator}; use std::{ - error::Error, io::{self, Write}, str::FromStr, - string::{ParseError, ToString}, + string::ToString, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -53,6 +53,7 @@ use std::{ }; use strum::IntoEnumIterator; use strum_macros::{Display, EnumIter, EnumString}; +use tari_broadcast_channel::Subscriber; use tari_common::GlobalConfig; use tari_comms::{ connection_manager::ConnectionManagerRequester, @@ -62,7 +63,7 @@ use tari_comms::{ }; use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester}; use tari_core::{ - base_node::LocalNodeCommsInterface, + base_node::{states::StatusInfo, LocalNodeCommsInterface}, blocks::BlockHeader, mempool::service::LocalMempoolService, tari_utilities::{hex::Hex, Hashable}, @@ -109,6 +110,7 @@ pub enum BaseNodeCommand { GetMiningState, MakeItRain, CoinSplit, + GetStateInfo, Quit, Exit, } @@ -132,6 +134,7 @@ pub struct Parser { enable_miner: Arc, miner_hashrate: Arc, miner_thread_count: u64, + state_machine_info: Subscriber, } const MAKE_IT_RAIN_USAGE: &str = "\nmake-it-rain [Txs/s] [duration (s)] [start amount (uT)] [increment (uT)/Tx] \ @@ -187,6 +190,7 @@ impl Parser { enable_miner: ctx.miner_enabled(), miner_hashrate: ctx.miner_hashrate(), miner_thread_count: config.num_mining_threads as u64, + state_machine_info: ctx.get_state_machine_info_channel(), } } @@ -236,6 +240,9 @@ impl Parser { Help => { self.print_help(args); }, + GetStateInfo => { + self.process_state_info(); + }, GetBalance => { self.process_get_balance(); }, @@ -329,6 +336,9 @@ impl Parser { let joined = self.commands.join(", "); println!("{}", joined); }, + GetStateInfo => { + println!("Prints out the status of the base node state machine"); + }, GetBalance => { println!("Gets your balance"); }, @@ -433,6 +443,23 @@ impl Parser { }); } + /// Function to process the ge-state-info command + fn process_state_info(&mut self) { + let mut channel = self.state_machine_info.clone(); + self.executor.spawn(async move { + match channel.next().await { + None => { + info!( + target: LOG_TARGET, + "Error communicating with state machine, channel could have been closed" + ); + return; + }, + Some(data) => println!("Current state machine state:\n{}", data), + }; + }); + } + /// Function to process the list utxos command fn process_list_unspent_outputs(&mut self) { let mut handler1 = self.node_service.clone(); @@ -1055,7 +1082,7 @@ impl Parser { } /// Helper function to convert an array from command_arg to a Vec of header heights - async fn cmd_arg_to_header_heights(mut handler: LocalNodeCommsInterface, command_arg: Vec) -> Vec { + async fn cmd_arg_to_header_heights(handler: LocalNodeCommsInterface, command_arg: Vec) -> Vec { let height_ranges: Result, _> = command_arg.iter().map(|v| u64::from_str(v)).collect(); match height_ranges { Ok(height_ranges) => { @@ -1068,14 +1095,14 @@ impl Parser { Ok(heights) => heights, Err(_) => { println!("Error communicating with comm interface"); - return Vec::new(); + Vec::new() }, } } }, - Err(e) => { + Err(_e) => { println!("Invalid number provided"); - return Vec::new(); + Vec::new() }, } } diff --git a/base_layer/core/src/base_node/state_machine.rs b/base_layer/core/src/base_node/state_machine.rs index 8c33ab64551..d42f1207923 100644 --- a/base_layer/core/src/base_node/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine.rs @@ -67,7 +67,7 @@ pub struct BaseNodeStateMachine { pub(super) metadata_event_stream: Subscriber, pub(super) config: BaseNodeStateMachineConfig, pub(super) info: StatusInfo, - pub(super) status_event_publisher: Publisher, + status_event_publisher: Publisher, status_event_subscriber: Subscriber, event_sender: Publisher, event_receiver: Subscriber, @@ -144,6 +144,11 @@ impl BaseNodeStateMachine { self.status_event_subscriber.clone() } + /// This function will publish the current StatusInfo to the channel + pub async fn publish_event_info(&mut self) { + let _ = self.status_event_publisher.send(self.info.clone()).await; + } + /// Start the base node runtime. pub async fn run(mut self) { use crate::base_node::states::BaseNodeState::*; diff --git a/base_layer/core/src/base_node/states/block_sync.rs b/base_layer/core/src/base_node/states/block_sync.rs index 9942bbc6e48..a3671bd7aa6 100644 --- a/base_layer/core/src/base_node/states/block_sync.rs +++ b/base_layer/core/src/base_node/states/block_sync.rs @@ -107,7 +107,7 @@ impl Display for BlockSyncInfo { fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { let local_height = self.local_height.unwrap_or(0); let tip_height = self.tip_height.unwrap_or(0); - fmt.write_str("Syncing from: \n")?; + fmt.write_str("Syncing from the following peers: \n")?; for peer in &self.sync_peers { fmt.write_str(&format!("{}\n", peer))?; } @@ -161,8 +161,7 @@ impl BlockSyncStrategy { ) -> StateEvent { shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None)); - - let _ = shared.status_event_publisher.send(shared.info.clone()).await; + shared.publish_event_info().await; match self { BlockSyncStrategy::ViaBestChainMetadata(sync) => sync.next_event(shared, network_tip, sync_peers).await, BlockSyncStrategy::ViaRandomPeer(sync) => sync.next_event(shared).await, @@ -217,9 +216,11 @@ impl BestChainMetadataBlockSyncInfo { network_tip: &ChainMetadata, sync_peers: &mut Vec, ) -> StateEvent + where + B: 'static, { shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None)); - let _ = shared.status_event_publisher.send(shared.info.clone()).await; + shared.publish_event_info().await; info!(target: LOG_TARGET, "Synchronizing missing blocks."); match synchronize_blocks(shared, network_tip, sync_peers).await { Ok(()) => { @@ -278,7 +279,7 @@ async fn synchronize_blocks( info.sync_peers.clear(); info.sync_peers.append(&mut sync_peers.clone()); } - let _ = shared.status_event_publisher.send(shared.info.clone()).await; + shared.publish_event_info().await; let local_metadata = shared.db.get_metadata()?; if let Some(local_block_hash) = local_metadata.best_block.clone() { if let Some(network_block_hash) = network_metadata.best_block.clone() { @@ -320,7 +321,7 @@ async fn synchronize_blocks( info.local_height = Some(sync_height); } - let _ = shared.status_event_publisher.send(shared.info.clone()).await; + shared.publish_event_info().await; let max_height = min( sync_height + (shared.config.block_sync_config.block_request_size - 1) as u64, network_tip_height, @@ -422,14 +423,14 @@ async fn request_and_add_blocks( // assuming the numbers are ordred info.tip_height = Some(block_nums[block_nums.len() - 1]); } - let _ = shared.status_event_publisher.send(shared.info.clone()).await; + shared.publish_event_info().await; for block in blocks { let block_hash = block.hash(); if let StatusInfo::BlockSync(ref mut info) = shared.info { info.local_height = Some(block.header.height); } - let _ = shared.status_event_publisher.send(shared.info.clone()).await; + shared.publish_event_info().await; match shared .local_node_interface .submit_block(block.clone(), Broadcast::from(false)) diff --git a/base_layer/core/src/base_node/states/listening.rs b/base_layer/core/src/base_node/states/listening.rs index 7ea0be48e2b..a3867337838 100644 --- a/base_layer/core/src/base_node/states/listening.rs +++ b/base_layer/core/src/base_node/states/listening.rs @@ -29,7 +29,7 @@ use crate::{ chain_storage::{BlockchainBackend, ChainMetadata}, proof_of_work::Difficulty, }; -use futures::stream::StreamExt; +use futures::{stream::StreamExt, SinkExt}; use log::*; use std::fmt::{Display, Formatter}; use tari_comms::peer_manager::NodeId; @@ -61,9 +61,11 @@ impl ListeningInfo { pub struct ListeningData; impl ListeningData { - pub async fn next_event(&mut self, shared: &mut BaseNodeStateMachine) -> StateEvent { + pub async fn next_event(&mut self, shared: &mut BaseNodeStateMachine) -> StateEvent + where B: 'static { info!(target: LOG_TARGET, "Listening for chain metadata updates"); shared.info = StatusInfo::Listening(ListeningInfo::new()); + shared.publish_event_info().await; while let Some(metadata_event) = shared.metadata_event_stream.next().await { match &*metadata_event { ChainMetadataEvent::PeerChainMetadataReceived(ref peer_metadata_list) => {