Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds a channel to the base_node parser to listen for state machine fe…
Browse files Browse the repository at this point in the history
…edback

Changed listing state display to be more clear
SWvheerden committed May 19, 2020
1 parent 824dfe0 commit d5b84a0
Showing 4 changed files with 52 additions and 12 deletions.
19 changes: 16 additions & 3 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ use std::{
},
time::Duration,
};
use tari_broadcast_channel::Subscriber;
use tari_common::{CommsTransport, DatabaseType, GlobalConfig, Network, SocksAuthentication, TorControlAuthentication};
use tari_comms::{
multiaddr::{Multiaddr, Protocol},
@@ -51,6 +52,7 @@ use tari_core::{
base_node::{
chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer},
service::{BaseNodeServiceConfig, BaseNodeServiceInitializer},
states::StatusInfo,
BaseNodeStateMachine,
BaseNodeStateMachineConfig,
LocalNodeCommsInterface,
@@ -200,6 +202,10 @@ impl NodeContainer {
using_backend!(self, ctx, ctx.wallet_transaction_service())
}

pub fn get_state_machine_info_channel(&self) -> Subscriber<StatusInfo> {
using_backend!(self, ctx, ctx.get_status_event_stream())
}

async fn run_impl<B: BlockchainBackend + 'static>(mut ctx: BaseNodeContext<B>, rt: runtime::Handle) {
info!(target: LOG_TARGET, "Tari base node has STARTED");
let mut wallet_output_handle = ctx.output_manager();
@@ -219,7 +225,7 @@ impl NodeContainer {
let mut oms_handle_clone = wallet_output_handle.clone();
tokio::spawn(async move {
delay_for(Duration::from_secs(240)).await;
oms_handle_clone.sync_with_base_node().await;
let _ = oms_handle_clone.sync_with_base_node().await;
});
},
Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e),
@@ -261,7 +267,9 @@ pub struct BaseNodeContext<B: BlockchainBackend> {
pub miner_hashrate: Arc<AtomicU64>,
}

impl<B: BlockchainBackend> BaseNodeContext<B> {
impl<B: BlockchainBackend> BaseNodeContext<B>
where B: 'static
{
/// Returns a handle to the Output Manager
pub fn output_manager(&self) -> OutputManagerHandle {
self.wallet_handles
@@ -283,12 +291,17 @@ impl<B: BlockchainBackend> BaseNodeContext<B> {
.expect("Could not get local mempool interface handle")
}

/// Return the handle to the Transaciton Service
/// Return the handle to the Transaction Service
pub fn wallet_transaction_service(&self) -> TransactionServiceHandle {
self.wallet_handles
.get_handle::<TransactionServiceHandle>()
.expect("Could not get wallet transaction service handle")
}

// /// Return the state machine channel to provide info updates
pub fn get_status_event_stream(&self) -> Subscriber<StatusInfo> {
self.node.get_status_event_stream()
}
}

/// Tries to construct a node identity by loading the secret key and other metadata from disk and calculating the
39 changes: 32 additions & 7 deletions applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ use crate::{
};
use chrono::Utc;
use chrono_english::{parse_date_string, Dialect};
use futures::StreamExt;
use log::*;
use qrcode::{render::unicode, QrCode};
use regex::Regex;
@@ -41,10 +42,9 @@ use rustyline::{
};
use rustyline_derive::{Helper, Highlighter, Validator};
use std::{
error::Error,
io::{self, Write},
str::FromStr,
string::{ParseError, ToString},
string::ToString,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
@@ -53,6 +53,7 @@ use std::{
};
use strum::IntoEnumIterator;
use strum_macros::{Display, EnumIter, EnumString};
use tari_broadcast_channel::Subscriber;
use tari_common::GlobalConfig;
use tari_comms::{
connection_manager::ConnectionManagerRequester,
@@ -62,7 +63,7 @@ use tari_comms::{
};
use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester};
use tari_core::{
base_node::LocalNodeCommsInterface,
base_node::{states::StatusInfo, LocalNodeCommsInterface},
blocks::BlockHeader,
mempool::service::LocalMempoolService,
tari_utilities::{hex::Hex, Hashable},
@@ -109,6 +110,7 @@ pub enum BaseNodeCommand {
GetMiningState,
MakeItRain,
CoinSplit,
GetStateInfo,
Quit,
Exit,
}
@@ -132,6 +134,7 @@ pub struct Parser {
enable_miner: Arc<AtomicBool>,
miner_hashrate: Arc<AtomicU64>,
miner_thread_count: u64,
state_machine_info: Subscriber<StatusInfo>,
}

const MAKE_IT_RAIN_USAGE: &str = "\nmake-it-rain [Txs/s] [duration (s)] [start amount (uT)] [increment (uT)/Tx] \
@@ -187,6 +190,7 @@ impl Parser {
enable_miner: ctx.miner_enabled(),
miner_hashrate: ctx.miner_hashrate(),
miner_thread_count: config.num_mining_threads as u64,
state_machine_info: ctx.get_state_machine_info_channel(),
}
}

@@ -236,6 +240,9 @@ impl Parser {
Help => {
self.print_help(args);
},
GetStateInfo => {
self.process_state_info();
},
GetBalance => {
self.process_get_balance();
},
@@ -329,6 +336,9 @@ impl Parser {
let joined = self.commands.join(", ");
println!("{}", joined);
},
GetStateInfo => {
println!("Prints out the status of the base_node state machine");
},
GetBalance => {
println!("Gets your balance");
},
@@ -433,6 +443,21 @@ impl Parser {
});
}

/// Function to process the ge-state-info command
fn process_state_info(&mut self) {
let mut channel = self.state_machine_info.clone();
self.executor.spawn(async move {
match channel.next().await {
None => {
println!("Something went wrong");
warn!(target: LOG_TARGET, "Error communicating with state machine");
return;
},
Some(data) => println!("Current state machine state:\n{}", data),
};
});
}

/// Function to process the list utxos command
fn process_list_unspent_outputs(&mut self) {
let mut handler1 = self.node_service.clone();
@@ -1055,7 +1080,7 @@ impl Parser {
}

/// Helper function to convert an array from command_arg to a Vec<u64> of header heights
async fn cmd_arg_to_header_heights(mut handler: LocalNodeCommsInterface, command_arg: Vec<String>) -> Vec<u64> {
async fn cmd_arg_to_header_heights(handler: LocalNodeCommsInterface, command_arg: Vec<String>) -> Vec<u64> {
let height_ranges: Result<Vec<u64>, _> = command_arg.iter().map(|v| u64::from_str(v)).collect();
match height_ranges {
Ok(height_ranges) => {
@@ -1068,14 +1093,14 @@ impl Parser {
Ok(heights) => heights,
Err(_) => {
println!("Error communicating with comm interface");
return Vec::new();
Vec::new()
},
}
}
},
Err(e) => {
Err(_e) => {
println!("Invalid number provided");
return Vec::new();
Vec::new()
},
}
}
3 changes: 2 additions & 1 deletion base_layer/core/src/base_node/states/block_sync.rs
Original file line number Diff line number Diff line change
@@ -107,7 +107,7 @@ impl Display for BlockSyncInfo {
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
let local_height = self.local_height.unwrap_or(0);
let tip_height = self.tip_height.unwrap_or(0);
fmt.write_str("Syncing from: \n")?;
fmt.write_str("Syncing from the following peers: \n")?;
for peer in &self.sync_peers {
fmt.write_str(&format!("{}\n", peer))?;
}
@@ -161,6 +161,7 @@ impl BlockSyncStrategy {
) -> StateEvent
{
shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None));
let _ = shared.status_event_publisher.send(shared.info.clone()).await;

let _ = shared.status_event_publisher.send(shared.info.clone()).await;
match self {
3 changes: 2 additions & 1 deletion base_layer/core/src/base_node/states/listening.rs
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ use crate::{
chain_storage::{BlockchainBackend, ChainMetadata},
proof_of_work::Difficulty,
};
use futures::stream::StreamExt;
use futures::{stream::StreamExt, SinkExt};
use log::*;
use std::fmt::{Display, Formatter};
use tari_comms::peer_manager::NodeId;
@@ -64,6 +64,7 @@ impl ListeningData {
pub async fn next_event<B: BlockchainBackend>(&mut self, shared: &mut BaseNodeStateMachine<B>) -> StateEvent {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.info = StatusInfo::Listening(ListeningInfo::new());
let _ = shared.status_event_publisher.send(shared.info.clone()).await;
while let Some(metadata_event) = shared.metadata_event_stream.next().await {
match &*metadata_event {
ChainMetadataEvent::PeerChainMetadataReceived(ref peer_metadata_list) => {

0 comments on commit d5b84a0

Please sign in to comment.