Skip to content

Commit

Permalink
Adds a channel to the base_node parser to listen for state machine fe…
Browse files Browse the repository at this point in the history
…edback

Changed listing state display to be more clear
Co-authored-by: Stan Bondi <[email protected]>
  • Loading branch information
SWvheerden committed May 20, 2020
1 parent 824dfe0 commit 90c1c18
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 21 deletions.
19 changes: 16 additions & 3 deletions applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::{
},
time::Duration,
};
use tari_broadcast_channel::Subscriber;
use tari_common::{CommsTransport, DatabaseType, GlobalConfig, Network, SocksAuthentication, TorControlAuthentication};
use tari_comms::{
multiaddr::{Multiaddr, Protocol},
Expand All @@ -51,6 +52,7 @@ use tari_core::{
base_node::{
chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer},
service::{BaseNodeServiceConfig, BaseNodeServiceInitializer},
states::StatusInfo,
BaseNodeStateMachine,
BaseNodeStateMachineConfig,
LocalNodeCommsInterface,
Expand Down Expand Up @@ -200,6 +202,10 @@ impl NodeContainer {
using_backend!(self, ctx, ctx.wallet_transaction_service())
}

pub fn get_state_machine_info_channel(&self) -> Subscriber<StatusInfo> {
using_backend!(self, ctx, ctx.get_status_event_stream())
}

async fn run_impl<B: BlockchainBackend + 'static>(mut ctx: BaseNodeContext<B>, rt: runtime::Handle) {
info!(target: LOG_TARGET, "Tari base node has STARTED");
let mut wallet_output_handle = ctx.output_manager();
Expand All @@ -219,7 +225,7 @@ impl NodeContainer {
let mut oms_handle_clone = wallet_output_handle.clone();
tokio::spawn(async move {
delay_for(Duration::from_secs(240)).await;
oms_handle_clone.sync_with_base_node().await;
let _ = oms_handle_clone.sync_with_base_node().await;
});
},
Err(e) => warn!(target: LOG_TARGET, "Error adding output: {}", e),
Expand Down Expand Up @@ -261,7 +267,9 @@ pub struct BaseNodeContext<B: BlockchainBackend> {
pub miner_hashrate: Arc<AtomicU64>,
}

impl<B: BlockchainBackend> BaseNodeContext<B> {
impl<B: BlockchainBackend> BaseNodeContext<B>
where B: 'static
{
/// Returns a handle to the Output Manager
pub fn output_manager(&self) -> OutputManagerHandle {
self.wallet_handles
Expand All @@ -283,12 +291,17 @@ impl<B: BlockchainBackend> BaseNodeContext<B> {
.expect("Could not get local mempool interface handle")
}

/// Return the handle to the Transaciton Service
/// Return the handle to the Transaction Service
pub fn wallet_transaction_service(&self) -> TransactionServiceHandle {
self.wallet_handles
.get_handle::<TransactionServiceHandle>()
.expect("Could not get wallet transaction service handle")
}

// /// Return the state machine channel to provide info updates
pub fn get_status_event_stream(&self) -> Subscriber<StatusInfo> {
self.node.get_status_event_stream()
}
}

/// Tries to construct a node identity by loading the secret key and other metadata from disk and calculating the
Expand Down
41 changes: 34 additions & 7 deletions applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
};
use chrono::Utc;
use chrono_english::{parse_date_string, Dialect};
use futures::StreamExt;
use log::*;
use qrcode::{render::unicode, QrCode};
use regex::Regex;
Expand All @@ -41,10 +42,9 @@ use rustyline::{
};
use rustyline_derive::{Helper, Highlighter, Validator};
use std::{
error::Error,
io::{self, Write},
str::FromStr,
string::{ParseError, ToString},
string::ToString,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
Expand All @@ -53,6 +53,7 @@ use std::{
};
use strum::IntoEnumIterator;
use strum_macros::{Display, EnumIter, EnumString};
use tari_broadcast_channel::Subscriber;
use tari_common::GlobalConfig;
use tari_comms::{
connection_manager::ConnectionManagerRequester,
Expand All @@ -62,7 +63,7 @@ use tari_comms::{
};
use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester};
use tari_core::{
base_node::LocalNodeCommsInterface,
base_node::{states::StatusInfo, LocalNodeCommsInterface},
blocks::BlockHeader,
mempool::service::LocalMempoolService,
tari_utilities::{hex::Hex, Hashable},
Expand Down Expand Up @@ -109,6 +110,7 @@ pub enum BaseNodeCommand {
GetMiningState,
MakeItRain,
CoinSplit,
GetStateInfo,
Quit,
Exit,
}
Expand All @@ -132,6 +134,7 @@ pub struct Parser {
enable_miner: Arc<AtomicBool>,
miner_hashrate: Arc<AtomicU64>,
miner_thread_count: u64,
state_machine_info: Subscriber<StatusInfo>,
}

const MAKE_IT_RAIN_USAGE: &str = "\nmake-it-rain [Txs/s] [duration (s)] [start amount (uT)] [increment (uT)/Tx] \
Expand Down Expand Up @@ -187,6 +190,7 @@ impl Parser {
enable_miner: ctx.miner_enabled(),
miner_hashrate: ctx.miner_hashrate(),
miner_thread_count: config.num_mining_threads as u64,
state_machine_info: ctx.get_state_machine_info_channel(),
}
}

Expand Down Expand Up @@ -236,6 +240,9 @@ impl Parser {
Help => {
self.print_help(args);
},
GetStateInfo => {
self.process_state_info();
},
GetBalance => {
self.process_get_balance();
},
Expand Down Expand Up @@ -329,6 +336,9 @@ impl Parser {
let joined = self.commands.join(", ");
println!("{}", joined);
},
GetStateInfo => {
println!("Prints out the status of the base node state machine");
},
GetBalance => {
println!("Gets your balance");
},
Expand Down Expand Up @@ -433,6 +443,23 @@ impl Parser {
});
}

/// Function to process the ge-state-info command
fn process_state_info(&mut self) {
let mut channel = self.state_machine_info.clone();
self.executor.spawn(async move {
match channel.next().await {
None => {
info!(
target: LOG_TARGET,
"Error communicating with state machine, channel could have been closed"
);
return;
},
Some(data) => println!("Current state machine state:\n{}", data),
};
});
}

/// Function to process the list utxos command
fn process_list_unspent_outputs(&mut self) {
let mut handler1 = self.node_service.clone();
Expand Down Expand Up @@ -1055,7 +1082,7 @@ impl Parser {
}

/// Helper function to convert an array from command_arg to a Vec<u64> of header heights
async fn cmd_arg_to_header_heights(mut handler: LocalNodeCommsInterface, command_arg: Vec<String>) -> Vec<u64> {
async fn cmd_arg_to_header_heights(handler: LocalNodeCommsInterface, command_arg: Vec<String>) -> Vec<u64> {
let height_ranges: Result<Vec<u64>, _> = command_arg.iter().map(|v| u64::from_str(v)).collect();
match height_ranges {
Ok(height_ranges) => {
Expand All @@ -1068,14 +1095,14 @@ impl Parser {
Ok(heights) => heights,
Err(_) => {
println!("Error communicating with comm interface");
return Vec::new();
Vec::new()
},
}
}
},
Err(e) => {
Err(_e) => {
println!("Invalid number provided");
return Vec::new();
Vec::new()
},
}
}
Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/base_node/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct BaseNodeStateMachine<B: BlockchainBackend> {
pub(super) metadata_event_stream: Subscriber<ChainMetadataEvent>,
pub(super) config: BaseNodeStateMachineConfig,
pub(super) info: StatusInfo,
pub(super) status_event_publisher: Publisher<StatusInfo>,
status_event_publisher: Publisher<StatusInfo>,
status_event_subscriber: Subscriber<StatusInfo>,
event_sender: Publisher<StateEvent>,
event_receiver: Subscriber<StateEvent>,
Expand Down Expand Up @@ -144,6 +144,11 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
self.status_event_subscriber.clone()
}

/// This function will publish the current StatusInfo to the channel
pub async fn publish_event_info(&mut self) {
let _ = self.status_event_publisher.send(self.info.clone()).await;
}

/// Start the base node runtime.
pub async fn run(mut self) {
use crate::base_node::states::BaseNodeState::*;
Expand Down
17 changes: 9 additions & 8 deletions base_layer/core/src/base_node/states/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Display for BlockSyncInfo {
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
let local_height = self.local_height.unwrap_or(0);
let tip_height = self.tip_height.unwrap_or(0);
fmt.write_str("Syncing from: \n")?;
fmt.write_str("Syncing from the following peers: \n")?;
for peer in &self.sync_peers {
fmt.write_str(&format!("{}\n", peer))?;
}
Expand Down Expand Up @@ -161,8 +161,7 @@ impl BlockSyncStrategy {
) -> StateEvent
{
shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None));

let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
match self {
BlockSyncStrategy::ViaBestChainMetadata(sync) => sync.next_event(shared, network_tip, sync_peers).await,
BlockSyncStrategy::ViaRandomPeer(sync) => sync.next_event(shared).await,
Expand Down Expand Up @@ -217,9 +216,11 @@ impl BestChainMetadataBlockSyncInfo {
network_tip: &ChainMetadata,
sync_peers: &mut Vec<NodeId>,
) -> StateEvent
where
B: 'static,
{
shared.info = StatusInfo::BlockSync(BlockSyncInfo::new(None, None, None));
let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
info!(target: LOG_TARGET, "Synchronizing missing blocks.");
match synchronize_blocks(shared, network_tip, sync_peers).await {
Ok(()) => {
Expand Down Expand Up @@ -278,7 +279,7 @@ async fn synchronize_blocks<B: BlockchainBackend + 'static>(
info.sync_peers.clear();
info.sync_peers.append(&mut sync_peers.clone());
}
let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
let local_metadata = shared.db.get_metadata()?;
if let Some(local_block_hash) = local_metadata.best_block.clone() {
if let Some(network_block_hash) = network_metadata.best_block.clone() {
Expand Down Expand Up @@ -320,7 +321,7 @@ async fn synchronize_blocks<B: BlockchainBackend + 'static>(
info.local_height = Some(sync_height);
}

let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
let max_height = min(
sync_height + (shared.config.block_sync_config.block_request_size - 1) as u64,
network_tip_height,
Expand Down Expand Up @@ -422,14 +423,14 @@ async fn request_and_add_blocks<B: BlockchainBackend + 'static>(
// assuming the numbers are ordred
info.tip_height = Some(block_nums[block_nums.len() - 1]);
}
let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
for block in blocks {
let block_hash = block.hash();
if let StatusInfo::BlockSync(ref mut info) = shared.info {
info.local_height = Some(block.header.height);
}

let _ = shared.status_event_publisher.send(shared.info.clone()).await;
shared.publish_event_info().await;
match shared
.local_node_interface
.submit_block(block.clone(), Broadcast::from(false))
Expand Down
6 changes: 4 additions & 2 deletions base_layer/core/src/base_node/states/listening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
chain_storage::{BlockchainBackend, ChainMetadata},
proof_of_work::Difficulty,
};
use futures::stream::StreamExt;
use futures::{stream::StreamExt, SinkExt};
use log::*;
use std::fmt::{Display, Formatter};
use tari_comms::peer_manager::NodeId;
Expand Down Expand Up @@ -61,9 +61,11 @@ impl ListeningInfo {
pub struct ListeningData;

impl ListeningData {
pub async fn next_event<B: BlockchainBackend>(&mut self, shared: &mut BaseNodeStateMachine<B>) -> StateEvent {
pub async fn next_event<B: BlockchainBackend>(&mut self, shared: &mut BaseNodeStateMachine<B>) -> StateEvent
where B: 'static {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.info = StatusInfo::Listening(ListeningInfo::new());
shared.publish_event_info().await;
while let Some(metadata_event) = shared.metadata_event_stream.next().await {
match &*metadata_event {
ChainMetadataEvent::PeerChainMetadataReceived(ref peer_metadata_list) => {
Expand Down

0 comments on commit 90c1c18

Please sign in to comment.