Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UI feedback of state machine #1880

Merged
merged 1 commit into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::{
},
time::Duration,
};
use tari_broadcast_channel::Subscriber;
use tari_common::{CommsTransport, DatabaseType, GlobalConfig, Network, SocksAuthentication, TorControlAuthentication};
use tari_comms::{
multiaddr::{Multiaddr, Protocol},
Expand All @@ -51,6 +52,7 @@ use tari_core::{
base_node::{
chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer},
service::{BaseNodeServiceConfig, BaseNodeServiceInitializer},
states::StatusInfo,
BaseNodeStateMachine,
BaseNodeStateMachineConfig,
LocalNodeCommsInterface,
Expand Down Expand Up @@ -200,6 +202,10 @@ impl NodeContainer {
using_backend!(self, ctx, ctx.wallet_transaction_service())
}

pub fn get_state_machine_info_channel(&self) -> Subscriber<StatusInfo> {
using_backend!(self, ctx, ctx.get_status_event_stream())
}

async fn run_impl<B: BlockchainBackend + 'static>(mut ctx: BaseNodeContext<B>, rt: runtime::Handle) {
info!(target: LOG_TARGET, "Tari base node has STARTED");
let mut wallet_output_handle = ctx.output_manager();
Expand All @@ -219,7 +225,7 @@ impl NodeContainer {
let mut oms_handle_clone = wallet_output_handle.clone();
tokio::spawn(async move {
delay_for(Duration::from_secs(240)).await;
oms_handle_clone.sync_with_base_node().await;
let _ = oms_handle_clone.sync_with_base_node().await;
});
},
Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e),
Expand Down Expand Up @@ -261,7 +267,9 @@ pub struct BaseNodeContext<B: BlockchainBackend> {
pub miner_hashrate: Arc<AtomicU64>,
}

impl<B: BlockchainBackend> BaseNodeContext<B> {
impl<B: BlockchainBackend> BaseNodeContext<B>
where B: 'static
{
/// Returns a handle to the Output Manager
pub fn output_manager(&self) -> OutputManagerHandle {
self.wallet_handles
Expand All @@ -283,12 +291,17 @@ impl<B: BlockchainBackend> BaseNodeContext<B> {
.expect("Could not get local mempool interface handle")
}

/// Return the handle to the Transaciton Service
/// Return the handle to the Transaction Service
pub fn wallet_transaction_service(&self) -> TransactionServiceHandle {
self.wallet_handles
.get_handle::<TransactionServiceHandle>()
.expect("Could not get wallet transaction service handle")
}

// /// Return the state machine channel to provide info updates
pub fn get_status_event_stream(&self) -> Subscriber<StatusInfo> {
self.node.get_status_event_stream()
}
}

/// Tries to construct a node identity by loading the secret key and other metadata from disk and calculating the
Expand Down
44 changes: 37 additions & 7 deletions applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
};
use chrono::Utc;
use chrono_english::{parse_date_string, Dialect};
use futures::StreamExt;
use log::*;
use qrcode::{render::unicode, QrCode};
use regex::Regex;
Expand All @@ -41,10 +42,9 @@ use rustyline::{
};
use rustyline_derive::{Helper, Highlighter, Validator};
use std::{
error::Error,
io::{self, Write},
str::FromStr,
string::{ParseError, ToString},
string::ToString,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
Expand All @@ -53,6 +53,7 @@ use std::{
};
use strum::IntoEnumIterator;
use strum_macros::{Display, EnumIter, EnumString};
use tari_broadcast_channel::Subscriber;
use tari_common::GlobalConfig;
use tari_comms::{
connection_manager::ConnectionManagerRequester,
Expand All @@ -62,7 +63,7 @@ use tari_comms::{
};
use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester};
use tari_core::{
base_node::LocalNodeCommsInterface,
base_node::{states::StatusInfo, LocalNodeCommsInterface},
blocks::BlockHeader,
mempool::service::LocalMempoolService,
tari_utilities::{hex::Hex, Hashable},
Expand Down Expand Up @@ -112,6 +113,7 @@ pub enum BaseNodeCommand {
GetMiningState,
MakeItRain,
CoinSplit,
GetStateInfo,
Quit,
Exit,
}
Expand All @@ -135,6 +137,7 @@ pub struct Parser {
enable_miner: Arc<AtomicBool>,
miner_hashrate: Arc<AtomicU64>,
miner_thread_count: u64,
state_machine_info: Subscriber<StatusInfo>,
}

const MAKE_IT_RAIN_USAGE: &str = "\nmake-it-rain [Txs/s] [duration (s)] [start amount (uT)] [increment (uT)/Tx] \
Expand Down Expand Up @@ -190,6 +193,7 @@ impl Parser {
enable_miner: ctx.miner_enabled(),
miner_hashrate: ctx.miner_hashrate(),
miner_thread_count: config.num_mining_threads as u64,
state_machine_info: ctx.get_state_machine_info_channel(),
}
}

Expand Down Expand Up @@ -239,6 +243,9 @@ impl Parser {
Help => {
self.print_help(args);
},
GetStateInfo => {
self.process_state_info();
},
Version => {
self.print_version();
},
Expand Down Expand Up @@ -335,6 +342,9 @@ impl Parser {
let joined = self.commands.join(", ");
println!("{}", joined);
},
GetStateInfo => {
println!("Prints out the status of the base node state machine");
},
Version => {
println!("Gets the current application version");
},
Expand Down Expand Up @@ -447,6 +457,26 @@ impl Parser {
println!("Version: {}", VERSION);
}

/// Function to process the get-state-info command
fn process_state_info(&mut self) {
// the channel only holds events of 1 as the channel is created bounded(1)
let mut channel = self.state_machine_info.clone();
// We clone the channel so that allows as to always start to read from the beginning. Hence the channel never
// empties.
self.executor.spawn(async move {
match channel.next().await {
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
None => {
info!(
target: LOG_TARGET,
"Error communicating with state machine, channel could have been closed"
);
return;
},
Some(data) => println!("Current state machine state:\n{}", data),
};
});
}

/// Function to process the list utxos command
fn process_list_unspent_outputs(&mut self) {
let mut handler1 = self.node_service.clone();
Expand Down Expand Up @@ -1069,7 +1099,7 @@ impl Parser {
}

/// Helper function to convert an array from command_arg to a Vec<u64> of header heights
async fn cmd_arg_to_header_heights(mut handler: LocalNodeCommsInterface, command_arg: Vec<String>) -> Vec<u64> {
async fn cmd_arg_to_header_heights(handler: LocalNodeCommsInterface, command_arg: Vec<String>) -> Vec<u64> {
let height_ranges: Result<Vec<u64>, _> = command_arg.iter().map(|v| u64::from_str(v)).collect();
match height_ranges {
Ok(height_ranges) => {
Expand All @@ -1082,14 +1112,14 @@ impl Parser {
Ok(heights) => heights,
Err(_) => {
println!("Error communicating with comm interface");
return Vec::new();
Vec::new()
},
}
}
},
Err(e) => {
Err(_e) => {
println!("Invalid number provided");
return Vec::new();
Vec::new()
},
}
}
Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/base_node/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct BaseNodeStateMachine<B: BlockchainBackend> {
pub(super) metadata_event_stream: Subscriber<ChainMetadataEvent>,
pub(super) config: BaseNodeStateMachineConfig,
pub(super) info: StatusInfo,
pub(super) status_event_publisher: Publisher<StatusInfo>,
status_event_publisher: Publisher<StatusInfo>,
status_event_subscriber: Subscriber<StatusInfo>,
event_sender: Publisher<StateEvent>,
event_receiver: Subscriber<StateEvent>,
Expand Down Expand Up @@ -144,6 +144,11 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
self.status_event_subscriber.clone()
}

/// This function will publish the current StatusInfo to the channel
pub async fn publish_event_info(&mut self) {
let _ = self.status_event_publisher.send(self.info.clone()).await;
}

/// Start the base node runtime.
pub async fn run(mut self) {
use crate::base_node::states::BaseNodeState::*;
Expand Down
17 changes: 9 additions & 8 deletions base_layer/core/src/base_node/states/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Display for BlockSyncInfo {
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
let local_height = self.local_height.unwrap_or(0);
let tip_height = self.tip_height.unwrap_or(0);
fmt.write_str("Syncing from: \n")?;
fmt.write_str("Syncing from the following peers: \n")?;
for peer in &self.sync_peers {
fmt.write_str(&format!("{}\n", peer))?;
}
Expand Down Expand Up @@ -161,8 +161,7 @@ impl BlockSyncStrategy {
) -> StateEvent
{
shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None));

let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
match self {
BlockSyncStrategy::ViaBestChainMetadata(sync) => sync.next_event(shared, network_tip, sync_peers).await,
BlockSyncStrategy::ViaRandomPeer(sync) => sync.next_event(shared).await,
Expand Down Expand Up @@ -217,9 +216,11 @@ impl BestChainMetadataBlockSyncInfo {
network_tip: &ChainMetadata,
sync_peers: &mut Vec<NodeId>,
) -> StateEvent
where
B: 'static,
{
shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None));
let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
info!(target: LOG_TARGET, "Synchronizing missing blocks.");
match synchronize_blocks(shared, network_tip, sync_peers).await {
Ok(()) => {
Expand Down Expand Up @@ -278,7 +279,7 @@ async fn synchronize_blocks<B: BlockchainBackend + 'static>(
info.sync_peers.clear();
info.sync_peers.append(&mut sync_peers.clone());
}
let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
let local_metadata = shared.db.get_metadata()?;
if let Some(local_block_hash) = local_metadata.best_block.clone() {
if let Some(network_block_hash) = network_metadata.best_block.clone() {
Expand Down Expand Up @@ -320,7 +321,7 @@ async fn synchronize_blocks<B: BlockchainBackend + 'static>(
info.local_height = Some(sync_height);
}

let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
let max_height = min(
sync_height + (shared.config.block_sync_config.block_request_size - 1) as u64,
network_tip_height,
Expand Down Expand Up @@ -422,14 +423,14 @@ async fn request_and_add_blocks<B: BlockchainBackend + 'static>(
// assuming the numbers are ordred
info.tip_height = Some(block_nums[block_nums.len() - 1]);
}
let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
for block in blocks {
let block_hash = block.hash();
if let StatusInfo::BlockSync(ref mut info) = shared.info {
info.local_height = Some(block.header.height);
}

let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
match shared
.local_node_interface
.submit_block(block.clone(), Broadcast::from(false))
Expand Down
6 changes: 4 additions & 2 deletions base_layer/core/src/base_node/states/listening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
chain_storage::{BlockchainBackend, ChainMetadata},
proof_of_work::Difficulty,
};
use futures::stream::StreamExt;
use futures::{stream::StreamExt, SinkExt};
use log::*;
use std::fmt::{Display, Formatter};
use tari_comms::peer_manager::NodeId;
Expand Down Expand Up @@ -61,9 +61,11 @@ impl ListeningInfo {
pub struct ListeningData;

impl ListeningData {
pub async fn next_event<B: BlockchainBackend>(&mut self, shared: &mut BaseNodeStateMachine<B>) -> StateEvent {
pub async fn next_event<B: BlockchainBackend>(&mut self, shared: &mut BaseNodeStateMachine<B>) -> StateEvent
where B: 'static {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.info = StatusInfo::Listening(ListeningInfo::new());
shared.publish_event_info().await;
while let Some(metadata_event) = shared.metadata_event_stream.next().await {
match &*metadata_event {
ChainMetadataEvent::PeerChainMetadataReceived(ref peer_metadata_list) => {
Expand Down