Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate block hashes instead of full blocks (network breaking) #1986

Merged
merged 1 commit into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl ChainMetadataService {
BlockEvent::Verified((_, BlockAddResult::ChainReorg(_), _)) => {
self.update_liveness_chain_metadata().await?;
},
BlockEvent::Verified(_) | BlockEvent::Invalid(_) => {},
_ => {},
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/base_node/comms_interface/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ pub enum CommsInterfaceError {
/// Failure in broadcast DHT middleware
BroadcastFailed,
DifficultyAdjustmentManagerError(ConsensusManagerError),
#[error(msg_embedded, non_std, no_from)]
InvalidPeerResponse(String),
}
141 changes: 102 additions & 39 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
},
OutboundNodeCommsInterface,
},
blocks::{blockheader::BlockHeader, Block, NewBlockTemplate},
blocks::{blockheader::BlockHeader, Block, NewBlock, NewBlockTemplate},
chain_storage::{
async_db,
BlockAddResult,
Expand All @@ -53,6 +53,7 @@ use std::{
use strum_macros::Display;
use tari_comms::peer_manager::NodeId;
use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex};
use tokio::sync::Semaphore;

const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
const MAX_HEADERS_PER_RESPONSE: u32 = 100;
Expand All @@ -69,6 +70,13 @@ pub enum BlockEvent {
#[derive(Debug, Clone, Copy)]
pub struct Broadcast(bool);

impl Broadcast {
#[inline]
pub fn is_true(&self) -> bool {
self.0
}
}

#[allow(clippy::identity_op)]
impl Display for Broadcast {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
Expand All @@ -89,13 +97,12 @@ impl From<bool> for Broadcast {
}

/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
pub struct InboundNodeCommsHandlers<T>
where T: BlockchainBackend + 'static
{
pub struct InboundNodeCommsHandlers<T> {
block_event_sender: BlockEventSender,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
consensus_manager: ConsensusManager,
new_block_request_semaphore: Arc<Semaphore>,
outbound_nci: OutboundNodeCommsInterface,
}

Expand All @@ -116,13 +123,14 @@ where T: BlockchainBackend + 'static
blockchain_db,
mempool,
consensus_manager,
new_block_request_semaphore: Arc::new(Semaphore::new(1)),
outbound_nci,
}
}

/// Handle inbound node comms requests from remote nodes and local services.
pub async fn handle_request(&self, request: &NodeCommsRequest) -> Result<NodeCommsResponse, CommsInterfaceError> {
debug!(target: LOG_TARGET, "Handling remote request: {}", request);
debug!(target: LOG_TARGET, "Handling remote request {}", request);
match request {
NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata(
async_db::get_metadata(self.blockchain_db.clone()).await?,
Expand Down Expand Up @@ -308,14 +316,69 @@ where T: BlockchainBackend + 'static
}
}

/// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous
/// requests for the full block.
/// This may (asynchronously) block until the other request(s) complete or time out and so should typically be
/// executed in a dedicated task.
pub async fn handle_new_block_message(
&mut self,
new_block: NewBlock,
source_peer: NodeId,
) -> Result<(), CommsInterfaceError>
{
let NewBlock { block_hash } = new_block;

// Only a single block request can complete at a time.
// As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for
// the same full block. The first request that succeeds will stop the node from requesting the block from any
// other node (block_exists is true).
let _permit = self.new_block_request_semaphore.acquire().await;

if async_db::block_exists(self.blockchain_db.clone(), block_hash.clone()).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already stored",
block_hash.to_hex()
);
return Ok(());
}

debug!(
target: LOG_TARGET,
"Block with hash `{}` is unknown. Requesting it from peer `{}`.",
block_hash.to_hex(),
source_peer.short_str()
);
let mut block = self
.outbound_nci
.request_blocks_with_hashes_from_peer(vec![block_hash], Some(source_peer.clone()))
.await?;

match block.pop() {
Some(block) => self.handle_block(block.block, true.into(), Some(source_peer)).await,
None => {
// TODO: #banheuristic - peer propagated block hash for which it could not return the full block
debug!(
target: LOG_TARGET,
"Peer `{}` failed to return the block that was requested.",
source_peer.short_str()
);
Err(CommsInterfaceError::InvalidPeerResponse(format!(
"Invalid response from peer `{}`: Peer failed to provide the block that was propagated",
source_peer.short_str()
)))
},
}
}

/// Handle inbound blocks from remote nodes and local services.
pub async fn handle_block(
&mut self,
block_context: &(Block, Broadcast),
&self,
block: Block,
broadcast: Broadcast,
source_peer: Option<NodeId>,
) -> Result<(), CommsInterfaceError>
{
let (block, broadcast) = block_context;
let block_hash = block.hash();
debug!(
target: LOG_TARGET,
Expand All @@ -330,11 +393,30 @@ where T: BlockchainBackend + 'static
trace!(target: LOG_TARGET, "Block: {}", block);
let add_block_result = async_db::add_block(self.blockchain_db.clone(), block.clone()).await;
// Create block event on block event stream
let mut result = Ok(());
let block_event = match add_block_result.clone() {
match add_block_result {
Ok(block_add_result) => {
trace!(target: LOG_TARGET, "Block event created: {}", block_add_result);
BlockEvent::Verified((Box::new(block.clone()), block_add_result, *broadcast))

let should_propagate = match &block_add_result {
BlockAddResult::Ok => true,
BlockAddResult::BlockExists => false,
BlockAddResult::OrphanBlock => false,
BlockAddResult::ChainReorg(_) => true,
};

self.publish_block_event(BlockEvent::Verified((Box::new(block), block_add_result, broadcast)));

if should_propagate && broadcast.is_true() {
info!(
target: LOG_TARGET,
"Propagate block ({}) to network.",
block_hash.to_hex()
);
let exclude_peers = source_peer.into_iter().collect();
let new_block = NewBlock::new(block_hash);
self.outbound_nci.propagate_block(new_block, exclude_peers).await?;
}
Ok(())
},
Err(e) => {
warn!(
Expand All @@ -344,33 +426,16 @@ where T: BlockchainBackend + 'static
block_hash.to_hex(),
e
);
result = Err(CommsInterfaceError::ChainStorageError(e.clone()));
BlockEvent::Invalid((Box::new(block.clone()), e, *broadcast))
self.publish_block_event(BlockEvent::Invalid((Box::new(block), e.clone(), broadcast)));
Err(CommsInterfaceError::ChainStorageError(e))
},
};
self.block_event_sender
.send(Arc::new(block_event))
.map_err(|_| CommsInterfaceError::EventStreamError)?;
}
}

// Propagate verified block to remote nodes
if let Ok(add_block_result) = add_block_result {
let propagate = match add_block_result {
BlockAddResult::Ok => true,
BlockAddResult::BlockExists => false,
BlockAddResult::OrphanBlock => false,
BlockAddResult::ChainReorg(_) => true,
};
if propagate && bool::from(*broadcast) {
info!(
target: LOG_TARGET,
"Propagate block ({}) to network.",
block.hash().to_hex()
);
let exclude_peers = source_peer.into_iter().collect();
self.outbound_nci.propagate_block(block.clone(), exclude_peers).await?;
}
fn publish_block_event(&self, event: BlockEvent) {
if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
}
result
}

async fn get_target_difficulty(&self, pow_algo: PowAlgorithm) -> Result<Difficulty, CommsInterfaceError> {
Expand Down Expand Up @@ -402,16 +467,14 @@ where T: BlockchainBackend + 'static
}
}

impl<T> Clone for InboundNodeCommsHandlers<T>
where T: BlockchainBackend + 'static
{
impl<T> Clone for InboundNodeCommsHandlers<T> {
fn clone(&self) -> Self {
// All members use Arc's internally so calling clone should be cheap.
Self {
block_event_sender: self.block_event_sender.clone(),
blockchain_db: self.blockchain_db.clone(),
mempool: self.mempool.clone(),
consensus_manager: self.consensus_manager.clone(),
new_block_request_semaphore: self.new_block_request_semaphore.clone(),
outbound_nci: self.outbound_nci.clone(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use crate::{
base_node::comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse},
blocks::{blockheader::BlockHeader, Block},
blocks::{blockheader::BlockHeader, NewBlock},
chain_storage::{ChainMetadata, HistoricalBlock, MmrTree},
transactions::{
transaction::{TransactionKernel, TransactionOutput},
Expand All @@ -41,7 +41,7 @@ pub const LOG_TARGET: &str = "c::bn::comms_interface::outbound_interface";
#[derive(Clone)]
pub struct OutboundNodeCommsInterface {
request_sender: SenderService<(NodeCommsRequest, Option<NodeId>), Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: UnboundedSender<(Block, Vec<NodeId>)>,
block_sender: UnboundedSender<(NewBlock, Vec<NodeId>)>,
}

impl OutboundNodeCommsInterface {
Expand All @@ -51,7 +51,7 @@ impl OutboundNodeCommsInterface {
(NodeCommsRequest, Option<NodeId>),
Result<NodeCommsResponse, CommsInterfaceError>,
>,
block_sender: UnboundedSender<(Block, Vec<NodeId>)>,
block_sender: UnboundedSender<(NewBlock, Vec<NodeId>)>,
) -> Self
{
Self {
Expand Down Expand Up @@ -268,13 +268,13 @@ impl OutboundNodeCommsInterface {

/// Transmit a block to remote base nodes, excluding the provided peers.
pub async fn propagate_block(
&mut self,
block: Block,
&self,
new_block: NewBlock,
exclude_peers: Vec<NodeId>,
) -> Result<(), CommsInterfaceError>
{
self.block_sender
.unbounded_send((block, exclude_peers))
.unbounded_send((new_block, exclude_peers))
.map_err(|_| CommsInterfaceError::BroadcastFailed)
}

Expand Down
18 changes: 8 additions & 10 deletions base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
proto,
service::service::{BaseNodeService, BaseNodeServiceConfig, BaseNodeStreams},
},
blocks::Block,
blocks::NewBlock,
chain_storage::{BlockchainBackend, BlockchainDatabase},
consensus::ConsensusManager,
mempool::Mempool,
Expand Down Expand Up @@ -55,9 +55,7 @@ const LOG_TARGET: &str = "c::bn::service::initializer";
const SUBSCRIPTION_LABEL: &str = "Base Node";

/// Initializer for the Base Node service handle and service future.
pub struct BaseNodeServiceInitializer<T>
where T: BlockchainBackend
{
pub struct BaseNodeServiceInitializer<T> {
inbound_message_subscription_factory: Arc<SubscriptionFactory>,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
Expand Down Expand Up @@ -103,15 +101,15 @@ where T: BlockchainBackend
}

/// Create a stream of 'New Block` messages
fn inbound_block_stream(&self) -> impl Stream<Item = DomainMessage<Block>> {
fn inbound_block_stream(&self) -> impl Stream<Item = DomainMessage<NewBlock>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::NewBlock, SUBSCRIPTION_LABEL)
.filter_map(extract_block)
}
}

async fn extract_block(msg: Arc<PeerMessage>) -> Option<DomainMessage<Block>> {
match msg.decode_message::<shared_protos::core::Block>() {
async fn extract_block(msg: Arc<PeerMessage>) -> Option<DomainMessage<NewBlock>> {
match msg.decode_message::<shared_protos::core::NewBlock>() {
Err(e) => {
warn!(
target: LOG_TARGET,
Expand All @@ -120,10 +118,10 @@ async fn extract_block(msg: Arc<PeerMessage>) -> Option<DomainMessage<Block>> {
);
None
},
Ok(block) => {
let block = match Block::try_from(block) {
Ok(new_block) => {
let block = match NewBlock::try_from(new_block) {
Err(e) => {
let origin = &msg.source_peer.public_key;
let origin = &msg.source_peer.node_id;
warn!(
target: LOG_TARGET,
"Inbound block message from {} was ill-formed. {}", origin, e
Expand Down
Loading