Skip to content

Commit

Permalink
Merge #2194: Add is_synced flag to BaseNodeServiceResponse
Browse files Browse the repository at this point in the history
This PR adds a Boolean flag to all BaseNodeServiceResponse’s so that the node
querying can know if the queried node is synced to the chain tip. The wallet is
then updated to abort a UTXO validation operation if this flag is false. This
is because if a node is not synced it could incorrectly declare UTXO invalid
that it is not yet aware of.

To do this the Base Node Service needed to know whether the Base Node State
machine had synced at least once and was currently in the Listening state. To
allow the Base Node Service to speak to the Base Node State Machine the state
machine had to be refactored into a Service with a Handle that could be
registered in the Service Stack.

Additional the base node was updated to not process new blocks or transactions
until it had synced at least once. This determination was done using the flag
made available via the new BaseNodeStateMachineHandle.
  • Loading branch information
stringhandler committed Sep 9, 2020
2 parents 38221e2 + 9bfe2d7 commit 463f0e3
Show file tree
Hide file tree
Showing 53 changed files with 1,154 additions and 485 deletions.
306 changes: 121 additions & 185 deletions applications/tari_base_node/src/builder.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion applications/tari_base_node/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

use tari_broadcast_channel::Subscriber;
use tari_core::{
base_node::{states::StateEvent, LocalNodeCommsInterface},
base_node::{state_machine_service::states::StateEvent, LocalNodeCommsInterface},
consensus::ConsensusManager,
mempool::MempoolStateEvent,
mining::Miner,
Expand Down
22 changes: 11 additions & 11 deletions applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

use super::LOG_TARGET;
use crate::{
builder::NodeContainer,
builder::BaseNodeContext,
table::Table,
utils,
utils::{format_duration_basic, format_naive_datetime},
};
use chrono::Utc;
use chrono_english::{parse_date_string, Dialect};
use futures::{future::Either, StreamExt};
use futures::future::Either;
use log::*;
use qrcode::{render::unicode, QrCode};
use regex::Regex;
Expand All @@ -53,7 +53,6 @@ use std::{
};
use strum::IntoEnumIterator;
use strum_macros::{Display, EnumIter, EnumString};
use tari_broadcast_channel::Subscriber;
use tari_common::GlobalConfig;
use tari_comms::{
connection_manager::ConnectionManagerRequester,
Expand All @@ -64,7 +63,7 @@ use tari_comms::{
};
use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester};
use tari_core::{
base_node::{states::StatusInfo, LocalNodeCommsInterface},
base_node::{state_machine_service::states::StatusInfo, LocalNodeCommsInterface},
blocks::BlockHeader,
mempool::service::LocalMempoolService,
mining::MinerInstruction,
Expand All @@ -82,7 +81,11 @@ use tari_wallet::{
transaction_service::{error::TransactionServiceError, handle::TransactionServiceHandle},
util::emoji::EmojiId,
};
use tokio::{runtime, sync::broadcast::Sender as syncSender, time};
use tokio::{
runtime,
sync::{broadcast::Sender as syncSender, watch},
time,
};

/// Enum representing commands used by the basenode
#[derive(Clone, PartialEq, Debug, Display, EnumIter, EnumString)]
Expand Down Expand Up @@ -145,7 +148,7 @@ pub struct Parser {
miner_hashrate: Arc<AtomicU64>,
miner_instructions: syncSender<MinerInstruction>,
miner_thread_count: u64,
state_machine_info: Subscriber<StatusInfo>,
state_machine_info: watch::Receiver<StatusInfo>,
}

// Import the auto-generated const values from the Manifest and Git
Expand Down Expand Up @@ -184,7 +187,7 @@ impl Hinter for Parser {

impl Parser {
/// creates a new parser struct
pub fn new(executor: runtime::Handle, ctx: &NodeContainer, config: &GlobalConfig) -> Self {
pub fn new(executor: runtime::Handle, ctx: &BaseNodeContext, config: &GlobalConfig) -> Self {
Parser {
executor,
wallet_node_identity: ctx.wallet_node_identity(),
Expand Down Expand Up @@ -511,12 +514,9 @@ impl Parser {

/// Function to process the get-state-info command
fn process_state_info(&mut self) {
// the channel only holds events of 1 as the channel is created bounded(1)
let mut channel = self.state_machine_info.clone();
// We clone the channel so that allows as to always start to read from the beginning. Hence the channel never
// empties.
self.executor.spawn(async move {
match channel.next().await {
match channel.recv().await {
None => {
info!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::base_node::comms_interface::CommsInterfaceError;
use prost::DecodeError;
use tari_comms::message::MessageError;
use tari_p2p::services::liveness::error::LivenessError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;

#[derive(Debug, Error)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use crate::{
},
chain_storage::{BlockAddResult, ChainMetadata},
};
use chrono::{NaiveDateTime, Utc};
use futures::{stream::StreamExt, SinkExt};
use log::*;
use prost::Message;
use std::time::Instant;
use tari_broadcast_channel::Publisher;
use tari_common::log_if_error;
use tari_comms::{message::MessageExt, peer_manager::NodeId};
Expand All @@ -42,7 +42,7 @@ pub(super) struct ChainMetadataService {
liveness: LivenessHandle,
base_node: LocalNodeCommsInterface,
peer_chain_metadata: Vec<PeerChainMetadata>,
last_chainstate_flushed_at: NaiveDateTime,
last_chainstate_flushed_at: Option<Instant>,
event_publisher: Publisher<ChainMetadataEvent>,
}

Expand All @@ -62,7 +62,7 @@ impl ChainMetadataService {
liveness,
base_node,
peer_chain_metadata: Vec::new(),
last_chainstate_flushed_at: Utc::now().naive_utc(),
last_chainstate_flushed_at: None,
event_publisher,
}
}
Expand Down Expand Up @@ -186,7 +186,7 @@ impl ChainMetadataService {
.await
.map_err(|_| ChainMetadataSyncError::EventPublishFailed)?;

self.last_chainstate_flushed_at = Utc::now().naive_utc();
self.last_chainstate_flushed_at = Some(Instant::now());

Ok(())
}
Expand Down
6 changes: 2 additions & 4 deletions base_layer/core/src/base_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ pub mod consts;
#[cfg(feature = "base_node")]
pub mod service;
#[cfg(feature = "base_node")]
mod state_machine;
#[cfg(feature = "base_node")]
pub mod states;
pub mod state_machine_service;
#[cfg(feature = "base_node")]
mod validators;
#[cfg(feature = "base_node")]
Expand All @@ -53,7 +51,7 @@ pub use validators::{ChainBalanceValidator, HeaderValidator, SyncValidators};
#[cfg(feature = "base_node")]
pub use comms_interface::{LocalNodeCommsInterface, OutboundNodeCommsInterface};
#[cfg(feature = "base_node")]
pub use state_machine::{BaseNodeStateMachine, BaseNodeStateMachineConfig};
pub use state_machine_service::{BaseNodeStateMachine, BaseNodeStateMachineConfig, StateMachineHandle};

#[cfg(any(feature = "base_node", feature = "base_node_proto"))]
pub mod proto;
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/src/base_node/proto/response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ message BaseNodeServiceResponse {
// Indicates a MmrNodes response
MmrNodes MmrNodes = 12;
}
bool is_synced = 13;
}

message BlockHeaders {
Expand Down
8 changes: 7 additions & 1 deletion base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
comms_interface::{InboundNodeCommsHandlers, LocalNodeCommsInterface, OutboundNodeCommsInterface},
proto,
service::service::{BaseNodeService, BaseNodeServiceConfig, BaseNodeStreams},
StateMachineHandle,
},
blocks::NewBlock,
chain_storage::{BlockchainBackend, BlockchainDatabase},
Expand Down Expand Up @@ -189,6 +190,10 @@ where T: BlockchainBackend + 'static
.get_handle::<OutboundMessageRequester>()
.expect("OutboundMessageRequester handle required for BaseNodeService");

let state_machine = handles
.get_handle::<StateMachineHandle>()
.expect("StateMachineHandle required to initialize MempoolService");

let streams = BaseNodeStreams::new(
outbound_request_stream,
outbound_block_stream,
Expand All @@ -198,7 +203,8 @@ where T: BlockchainBackend + 'static
local_request_stream,
local_block_stream,
);
let service = BaseNodeService::new(outbound_message_service, inbound_nch, config).start(streams);
let service =
BaseNodeService::new(outbound_message_service, inbound_nch, config, state_machine).start(streams);
futures::pin_mut!(service);
future::select(service, shutdown).await;
info!(target: LOG_TARGET, "Base Node Service shutdown");
Expand Down
56 changes: 50 additions & 6 deletions base_layer/core/src/base_node/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use crate::{
generate_request_key,
proto,
service::error::BaseNodeServiceError,
state_machine_service::states::StatusInfo,
RequestKey,
StateMachineHandle,
WaitingRequests,
},
blocks::{Block, NewBlock},
Expand Down Expand Up @@ -148,6 +150,7 @@ pub struct BaseNodeService<B> {
timeout_sender: Sender<RequestKey>,
timeout_receiver_stream: Option<Receiver<RequestKey>>,
config: BaseNodeServiceConfig,
state_machine_handle: StateMachineHandle,
}

impl<B> BaseNodeService<B>
Expand All @@ -157,6 +160,7 @@ where B: BlockchainBackend + 'static
outbound_message_service: OutboundMessageRequester,
inbound_nch: InboundNodeCommsHandlers<B>,
config: BaseNodeServiceConfig,
state_machine_handle: StateMachineHandle,
) -> Self
{
let (timeout_sender, timeout_receiver) = channel(100);
Expand All @@ -167,6 +171,7 @@ where B: BlockchainBackend + 'static
timeout_sender,
timeout_receiver_stream: Some(timeout_receiver),
config,
state_machine_handle,
}
}

Expand Down Expand Up @@ -233,7 +238,7 @@ where B: BlockchainBackend + 'static

// Incoming block messages from the Comms layer
block_msg = inbound_block_stream.select_next_some() => {
self.spawn_handle_incoming_block(block_msg);
self.spawn_handle_incoming_block(block_msg).await;
}

// Incoming local request messages from the LocalNodeCommsInterface and other local services
Expand Down Expand Up @@ -304,8 +309,9 @@ where B: BlockchainBackend + 'static
fn spawn_handle_incoming_request(&self, domain_msg: DomainMessage<proto::base_node::BaseNodeServiceRequest>) {
let inbound_nch = self.inbound_nch.clone();
let outbound_message_service = self.outbound_message_service.clone();
let state_machine_handle = self.state_machine_handle.clone();
task::spawn(async move {
let _ = handle_incoming_request(inbound_nch, outbound_message_service, domain_msg)
let _ = handle_incoming_request(inbound_nch, outbound_message_service, state_machine_handle, domain_msg)
.await
.or_else(|err| {
error!(
Expand Down Expand Up @@ -344,7 +350,27 @@ where B: BlockchainBackend + 'static
});
}

fn spawn_handle_incoming_block(&self, new_block: DomainMessage<NewBlock>) {
async fn spawn_handle_incoming_block(&self, new_block: DomainMessage<NewBlock>) {
// Determine if we are bootstrapped
let mut status_watch = self.state_machine_handle.get_status_info_watch();

let bootstrapped = match status_watch.recv().await {
None => false,
Some(s) => match s {
StatusInfo::Listening(li) => li.is_bootstrapped(),
_ => false,
},
};

if !bootstrapped {
debug!(
target: LOG_TARGET,
"Propagated block `{}` from peer `{}` not processed while busy with initial sync.",
new_block.inner.block_hash.to_hex(),
new_block.source_peer.node_id.short_str(),
);
return;
}
let inbound_nch = self.inbound_nch.clone();
task::spawn(async move {
let _ = handle_incoming_block(inbound_nch, new_block).await.or_else(|err| {
Expand Down Expand Up @@ -398,6 +424,7 @@ where B: BlockchainBackend + 'static
async fn handle_incoming_request<B: BlockchainBackend + 'static>(
inbound_nch: InboundNodeCommsHandlers<B>,
mut outbound_message_service: OutboundMessageRequester,
state_machine_handle: StateMachineHandle,
domain_request_msg: DomainMessage<proto::BaseNodeServiceRequest>,
) -> Result<(), BaseNodeServiceError>
{
Expand All @@ -412,16 +439,28 @@ async fn handle_incoming_request<B: BlockchainBackend + 'static>(
.handle_request(&request.try_into().map_err(BaseNodeServiceError::InvalidRequest)?)
.await?;

// Determine if we are synced
let mut status_watch = state_machine_handle.get_status_info_watch();
let is_synced = match status_watch.recv().await {
None => false,
Some(s) => match s {
StatusInfo::Listening(li) => li.is_synced(),
_ => false,
},
};

let message = proto::BaseNodeServiceResponse {
request_key: inner_msg.request_key,
response: Some(response.into()),
is_synced,
};

trace!(
target: LOG_TARGET,
"Attempting outbound message in response to inbound request ({})",
inner_msg.request_key
);

let send_message_response = outbound_message_service
.send_direct(
origin_public_key,
Expand Down Expand Up @@ -472,18 +511,23 @@ async fn handle_incoming_response(
incoming_response: proto::BaseNodeServiceResponse,
) -> Result<(), BaseNodeServiceError>
{
let proto::BaseNodeServiceResponse { request_key, response } = incoming_response;
let proto::BaseNodeServiceResponse {
request_key,
response,
is_synced,
} = incoming_response;
let response: NodeCommsResponse = response
.and_then(|r| r.try_into().ok())
.ok_or_else(|| BaseNodeServiceError::InvalidResponse("Received an invalid base node response".to_string()))?;

if let Some((reply_tx, started)) = waiting_requests.remove(request_key).await {
trace!(
target: LOG_TARGET,
"Response for {} (request key: {}) received after {}ms",
"Response for {} (request key: {}) received after {}ms and is_synced: {}",
response,
&request_key,
started.elapsed().as_millis()
started.elapsed().as_millis(),
is_synced
);
let _ = reply_tx.send(Ok(response).or_else(|resp| {
warn!(
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/src/base_node/service/service_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ use serde::{Deserialize, Serialize};
pub struct BaseNodeServiceResponse {
pub request_key: RequestKey,
pub response: NodeCommsResponse,
pub is_synced: bool,
}
Loading

0 comments on commit 463f0e3

Please sign in to comment.