Skip to content

Commit

Permalink
Propagate block hashes instead of full blocks (network breaking)
Browse files Browse the repository at this point in the history
- Nodes propagate a block hash (`NewBlock` message = 33 bytes) instead
  of the full block.  This will significantly reduce network bandwidth
  usage for block propagation.
- On receipt of a block hash a node checks if it has the block.
  If so, it simply ignores the message. Otherwise, it requests the
  full block from the peer that sent the `NewBlock` message. If the
  block is valid and has been added to the node's blockchain db,
  the node propagates the block hash message to other peers.
- Change block propagate test to tests a few block hashes being propagated rather
  than a single block.
- Changed invalid block test to check that an invalid block is not
  proagated.
- New test to check that an invalid block hash is not propagated
- Cleaned up some unecessary generics on structs
  • Loading branch information
sdbondi committed Jun 12, 2020
1 parent a4fc030 commit de31f7e
Show file tree
Hide file tree
Showing 28 changed files with 476 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl ChainMetadataService {
BlockEvent::Verified((_, BlockAddResult::ChainReorg(_), _)) => {
self.update_liveness_chain_metadata().await?;
},
BlockEvent::Verified(_) | BlockEvent::Invalid(_) => {},
_ => {},
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/base_node/comms_interface/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ pub enum CommsInterfaceError {
/// Failure in broadcast DHT middleware
BroadcastFailed,
DifficultyAdjustmentManagerError(ConsensusManagerError),
#[error(msg_embedded, non_std, no_from)]
InvalidPeerResponse(String),
}
144 changes: 104 additions & 40 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
},
OutboundNodeCommsInterface,
},
blocks::{blockheader::BlockHeader, Block, NewBlockTemplate},
blocks::{blockheader::BlockHeader, Block, NewBlock, NewBlockTemplate},
chain_storage::{
async_db,
BlockAddResult,
Expand All @@ -53,6 +53,7 @@ use std::{
use strum_macros::Display;
use tari_comms::peer_manager::NodeId;
use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex};
use tokio::sync::Semaphore;

const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
const MAX_HEADERS_PER_RESPONSE: u32 = 100;
Expand All @@ -69,6 +70,13 @@ pub enum BlockEvent {
#[derive(Debug, Clone, Copy)]
pub struct Broadcast(bool);

impl Broadcast {
#[inline]
pub fn is_true(&self) -> bool {
self.0
}
}

#[allow(clippy::identity_op)]
impl Display for Broadcast {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
Expand All @@ -89,13 +97,12 @@ impl From<bool> for Broadcast {
}

/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
pub struct InboundNodeCommsHandlers<T>
where T: BlockchainBackend + 'static
{
pub struct InboundNodeCommsHandlers<T> {
block_event_sender: BlockEventSender,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
consensus_manager: ConsensusManager,
new_block_request_semaphore: Arc<Semaphore>,
outbound_nci: OutboundNodeCommsInterface,
}

Expand All @@ -116,13 +123,14 @@ where T: BlockchainBackend + 'static
blockchain_db,
mempool,
consensus_manager,
new_block_request_semaphore: Arc::new(Semaphore::new(1)),
outbound_nci,
}
}

/// Handle inbound node comms requests from remote nodes and local services.
pub async fn handle_request(&self, request: &NodeCommsRequest) -> Result<NodeCommsResponse, CommsInterfaceError> {
debug!(target: LOG_TARGET, "Handling remote request: {}", request);
debug!(target: LOG_TARGET, "Handling remote request {}", request);
match request {
NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata(
async_db::get_metadata(self.blockchain_db.clone()).await?,
Expand Down Expand Up @@ -308,19 +316,75 @@ where T: BlockchainBackend + 'static
}
}

/// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous
/// requests for the full block.
/// This may (asynchronously) block until the other request(s) complete or time out and so should typically be
/// executed in a dedicated task.
pub async fn handle_new_block_message(
&mut self,
new_block: NewBlock,
source_peer: NodeId,
) -> Result<(), CommsInterfaceError>
{
let NewBlock { block_hash } = new_block;

// Only a single block request can complete at a time.
// As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for
// the same full block. The first request that succeeds will stop the node from requesting the block from any
// other node (block_exists is true).
let _permit = self.new_block_request_semaphore.acquire().await;

if async_db::block_exists(self.blockchain_db.clone(), block_hash.clone()).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already stored",
block_hash.to_hex()
);
return Ok(());
}

debug!(
target: LOG_TARGET,
"Block with hash `{}` is unknown. Requesting it from peer `{}`.",
block_hash.to_hex(),
source_peer.short_str()
);
let mut block = self
.outbound_nci
.request_blocks_with_hashes_from_peer(vec![block_hash], Some(source_peer.clone()))
.await?;

match block.pop() {
Some(block) => self.handle_block(block.block, true.into(), Some(source_peer)).await,
None => {
// TODO: #banheuristic - peer propagated block hash for which it could not return the full block
debug!(
target: LOG_TARGET,
"Peer `{}` failed to return the block that was requested.",
source_peer.short_str()
);
Err(CommsInterfaceError::InvalidPeerResponse(format!(
"Invalid response from peer `{}`: Peer failed to provide the block that was propagated",
source_peer.short_str()
)))
},
}
}

/// Handle inbound blocks from remote nodes and local services.
pub async fn handle_block(
&mut self,
block_context: &(Block, Broadcast),
&self,
block: Block,
broadcast: Broadcast,
source_peer: Option<NodeId>,
) -> Result<(), CommsInterfaceError>
{
let (block, broadcast) = block_context;
let block_hash = block.hash();
debug!(
target: LOG_TARGET,
"Block #{} ({}) received from {}",
block.header.height,
block.hash().to_hex(),
block_hash.to_hex(),
source_peer
.as_ref()
.map(|p| format!("remote peer: {}", p))
Expand All @@ -329,41 +393,43 @@ where T: BlockchainBackend + 'static
trace!(target: LOG_TARGET, "Block: {}", block);
let add_block_result = async_db::add_block(self.blockchain_db.clone(), block.clone()).await;
// Create block event on block event stream
let mut result = Ok(());
let block_event = match add_block_result.clone() {
match add_block_result {
Ok(block_add_result) => {
trace!(target: LOG_TARGET, "Block event created: {}", block_add_result);
BlockEvent::Verified((Box::new(block.clone()), block_add_result, *broadcast))

let should_propagate = match &block_add_result {
BlockAddResult::Ok => true,
BlockAddResult::BlockExists => false,
BlockAddResult::OrphanBlock => false,
BlockAddResult::ChainReorg(_) => true,
};

self.publish_block_event(BlockEvent::Verified((Box::new(block), block_add_result, broadcast)));

if should_propagate && broadcast.is_true() {
info!(
target: LOG_TARGET,
"Propagate block ({}) to network.",
block_hash.to_hex()
);
let exclude_peers = source_peer.into_iter().collect();
let new_block = NewBlock::new(block_hash);
self.outbound_nci.propagate_block(new_block, exclude_peers).await?;
}
Ok(())
},
Err(e) => {
warn!(target: LOG_TARGET, "Block validation failed: {:?}", e);
result = Err(CommsInterfaceError::ChainStorageError(e.clone()));
BlockEvent::Invalid((Box::new(block.clone()), e, *broadcast))
self.publish_block_event(BlockEvent::Invalid((Box::new(block), e.clone(), broadcast)));
Err(CommsInterfaceError::ChainStorageError(e))
},
};
self.block_event_sender
.send(Arc::new(block_event))
.map_err(|_| CommsInterfaceError::EventStreamError)?;
}
}

// Propagate verified block to remote nodes
if let Ok(add_block_result) = add_block_result {
let propagate = match add_block_result {
BlockAddResult::Ok => true,
BlockAddResult::BlockExists => false,
BlockAddResult::OrphanBlock => false,
BlockAddResult::ChainReorg(_) => true,
};
if propagate && bool::from(*broadcast) {
info!(
target: LOG_TARGET,
"Propagate block ({}) to network.",
block.hash().to_hex()
);
let exclude_peers = source_peer.into_iter().collect();
self.outbound_nci.propagate_block(block.clone(), exclude_peers).await?;
}
fn publish_block_event(&self, event: BlockEvent) {
if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
}
result
}

async fn get_target_difficulty(&self, pow_algo: PowAlgorithm) -> Result<Difficulty, CommsInterfaceError> {
Expand Down Expand Up @@ -395,16 +461,14 @@ where T: BlockchainBackend + 'static
}
}

impl<T> Clone for InboundNodeCommsHandlers<T>
where T: BlockchainBackend + 'static
{
impl<T> Clone for InboundNodeCommsHandlers<T> {
fn clone(&self) -> Self {
// All members use Arc's internally so calling clone should be cheap.
Self {
block_event_sender: self.block_event_sender.clone(),
blockchain_db: self.blockchain_db.clone(),
mempool: self.mempool.clone(),
consensus_manager: self.consensus_manager.clone(),
new_block_request_semaphore: self.new_block_request_semaphore.clone(),
outbound_nci: self.outbound_nci.clone(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use crate::{
base_node::comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse},
blocks::{blockheader::BlockHeader, Block},
blocks::{blockheader::BlockHeader, NewBlock},
chain_storage::{ChainMetadata, HistoricalBlock, MmrTree},
transactions::{
transaction::{TransactionKernel, TransactionOutput},
Expand All @@ -41,7 +41,7 @@ pub const LOG_TARGET: &str = "c::bn::comms_interface::outbound_interface";
#[derive(Clone)]
pub struct OutboundNodeCommsInterface {
request_sender: SenderService<(NodeCommsRequest, Option<NodeId>), Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: UnboundedSender<(Block, Vec<NodeId>)>,
block_sender: UnboundedSender<(NewBlock, Vec<NodeId>)>,
}

impl OutboundNodeCommsInterface {
Expand All @@ -51,7 +51,7 @@ impl OutboundNodeCommsInterface {
(NodeCommsRequest, Option<NodeId>),
Result<NodeCommsResponse, CommsInterfaceError>,
>,
block_sender: UnboundedSender<(Block, Vec<NodeId>)>,
block_sender: UnboundedSender<(NewBlock, Vec<NodeId>)>,
) -> Self
{
Self {
Expand Down Expand Up @@ -268,13 +268,13 @@ impl OutboundNodeCommsInterface {

/// Transmit a block to remote base nodes, excluding the provided peers.
pub async fn propagate_block(
&mut self,
block: Block,
&self,
new_block: NewBlock,
exclude_peers: Vec<NodeId>,
) -> Result<(), CommsInterfaceError>
{
self.block_sender
.unbounded_send((block, exclude_peers))
.unbounded_send((new_block, exclude_peers))
.map_err(|_| CommsInterfaceError::BroadcastFailed)
}

Expand Down
18 changes: 8 additions & 10 deletions base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
proto,
service::service::{BaseNodeService, BaseNodeServiceConfig, BaseNodeStreams},
},
blocks::Block,
blocks::NewBlock,
chain_storage::{BlockchainBackend, BlockchainDatabase},
consensus::ConsensusManager,
mempool::Mempool,
Expand Down Expand Up @@ -55,9 +55,7 @@ const LOG_TARGET: &str = "c::bn::service::initializer";
const SUBSCRIPTION_LABEL: &str = "Base Node";

/// Initializer for the Base Node service handle and service future.
pub struct BaseNodeServiceInitializer<T>
where T: BlockchainBackend
{
pub struct BaseNodeServiceInitializer<T> {
inbound_message_subscription_factory: Arc<SubscriptionFactory>,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
Expand Down Expand Up @@ -103,15 +101,15 @@ where T: BlockchainBackend
}

/// Create a stream of 'New Block` messages
fn inbound_block_stream(&self) -> impl Stream<Item = DomainMessage<Block>> {
fn inbound_block_stream(&self) -> impl Stream<Item = DomainMessage<NewBlock>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::NewBlock, SUBSCRIPTION_LABEL)
.filter_map(extract_block)
}
}

async fn extract_block(msg: Arc<PeerMessage>) -> Option<DomainMessage<Block>> {
match msg.decode_message::<shared_protos::core::Block>() {
async fn extract_block(msg: Arc<PeerMessage>) -> Option<DomainMessage<NewBlock>> {
match msg.decode_message::<shared_protos::core::NewBlock>() {
Err(e) => {
warn!(
target: LOG_TARGET,
Expand All @@ -120,10 +118,10 @@ async fn extract_block(msg: Arc<PeerMessage>) -> Option<DomainMessage<Block>> {
);
None
},
Ok(block) => {
let block = match Block::try_from(block) {
Ok(new_block) => {
let block = match NewBlock::try_from(new_block) {
Err(e) => {
let origin = &msg.source_peer.public_key;
let origin = &msg.source_peer.node_id;
warn!(
target: LOG_TARGET,
"Inbound block message from {} was ill-formed. {}", origin, e
Expand Down
Loading

0 comments on commit de31f7e

Please sign in to comment.