diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index af06c3eb4f..7eeeb90821 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -116,7 +116,7 @@ impl ChainMetadataService { BlockEvent::Verified((_, BlockAddResult::ChainReorg(_), _)) => { self.update_liveness_chain_metadata().await?; }, - BlockEvent::Verified(_) | BlockEvent::Invalid(_) => {}, + _ => {}, } Ok(()) diff --git a/base_layer/core/src/base_node/comms_interface/error.rs b/base_layer/core/src/base_node/comms_interface/error.rs index f02be25bba..0aea8087c7 100644 --- a/base_layer/core/src/base_node/comms_interface/error.rs +++ b/base_layer/core/src/base_node/comms_interface/error.rs @@ -40,4 +40,6 @@ pub enum CommsInterfaceError { /// Failure in broadcast DHT middleware BroadcastFailed, DifficultyAdjustmentManagerError(ConsensusManagerError), + #[error(msg_embedded, non_std, no_from)] + InvalidPeerResponse(String), } diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 581f1750b0..51a8f3520f 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -30,7 +30,7 @@ use crate::{ }, OutboundNodeCommsInterface, }, - blocks::{blockheader::BlockHeader, Block, NewBlockTemplate}, + blocks::{blockheader::BlockHeader, Block, NewBlock, NewBlockTemplate}, chain_storage::{ async_db, BlockAddResult, @@ -53,6 +53,7 @@ use std::{ use strum_macros::Display; use tari_comms::peer_manager::NodeId; use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex}; +use tokio::sync::Semaphore; const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler"; const MAX_HEADERS_PER_RESPONSE: u32 = 100; @@ -69,6 +70,13 @@ pub enum BlockEvent { #[derive(Debug, Clone, Copy)] pub struct Broadcast(bool); +impl Broadcast { + #[inline] + pub fn is_true(&self) -> bool { + self.0 + } +} + #[allow(clippy::identity_op)] impl Display for Broadcast { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { @@ -89,13 +97,12 @@ impl From for Broadcast { } /// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes. -pub struct InboundNodeCommsHandlers -where T: BlockchainBackend + 'static -{ +pub struct InboundNodeCommsHandlers { block_event_sender: BlockEventSender, blockchain_db: BlockchainDatabase, mempool: Mempool, consensus_manager: ConsensusManager, + new_block_request_semaphore: Arc, outbound_nci: OutboundNodeCommsInterface, } @@ -116,13 +123,14 @@ where T: BlockchainBackend + 'static blockchain_db, mempool, consensus_manager, + new_block_request_semaphore: Arc::new(Semaphore::new(1)), outbound_nci, } } /// Handle inbound node comms requests from remote nodes and local services. pub async fn handle_request(&self, request: &NodeCommsRequest) -> Result { - debug!(target: LOG_TARGET, "Handling remote request: {}", request); + debug!(target: LOG_TARGET, "Handling remote request {}", request); match request { NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata( async_db::get_metadata(self.blockchain_db.clone()).await?, @@ -308,10 +316,66 @@ where T: BlockchainBackend + 'static } } + /// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous + /// requests for the full block. + /// This may (asynchronously) block until the other request(s) complete or time out and so should typically be + /// executed in a dedicated task. + pub async fn handle_new_block_message( + &mut self, + new_block: NewBlock, + source_peer: NodeId, + ) -> Result<(), CommsInterfaceError> + { + let NewBlock { block_hash } = new_block; + + // Only a single block request can complete at a time. + // As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for + // the same full block. The first request that succeeds will stop the node from requesting the block from any + // other node (block_exists is true). + let _permit = self.new_block_request_semaphore.acquire().await; + + if async_db::block_exists(self.blockchain_db.clone(), block_hash.clone()).await? { + debug!( + target: LOG_TARGET, + "Block with hash `{}` already stored", + block_hash.to_hex() + ); + return Ok(()); + } + + debug!( + target: LOG_TARGET, + "Block with hash `{}` is unknown. Requesting it from peer `{}`.", + block_hash.to_hex(), + source_peer.short_str() + ); + let mut block = self + .outbound_nci + .request_blocks_with_hashes_from_peer(vec![block_hash], Some(source_peer.clone())) + .await?; + + match block.pop() { + Some(block) => self.handle_block(block.block, true.into(), Some(source_peer)).await, + None => { + // TODO: #banheuristic - peer propagated block hash for which it could not return the full block + debug!( + target: LOG_TARGET, + "Peer `{}` failed to return the block that was requested.", + source_peer.short_str() + ); + Err(CommsInterfaceError::InvalidPeerResponse(format!( + "Invalid response from peer `{}`: Peer failed to provide the block that was propagated", + source_peer.short_str() + ))) + }, + } + } + /// Handle inbound blocks from remote nodes and local services. pub async fn handle_block( - &mut self, - block_context: &(Block, Broadcast), + &self, + block: Block, + broadcast: Broadcast, source_peer: Option, ) -> Result<(), CommsInterfaceError> { @@ -330,11 +394,30 @@ where T: BlockchainBackend + 'static trace!(target: LOG_TARGET, "Block: {}", block); let add_block_result = async_db::add_block(self.blockchain_db.clone(), block.clone()).await; // Create block event on block event stream - let mut result = Ok(()); - let block_event = match add_block_result.clone() { + match add_block_result { Ok(block_add_result) => { trace!(target: LOG_TARGET, "Block event created: {}", block_add_result); - BlockEvent::Verified((Box::new(block.clone()), block_add_result, *broadcast)) + + let should_propagate = match &block_add_result { + BlockAddResult::Ok => true, + BlockAddResult::BlockExists => false, + BlockAddResult::OrphanBlock => false, + BlockAddResult::ChainReorg(_) => true, + }; + + self.publish_block_event(BlockEvent::Verified((Box::new(block), block_add_result, broadcast))); + + if should_propagate && broadcast.is_true() { + info!( + target: LOG_TARGET, + "Propagate block ({}) to network.", + block_hash.to_hex() + ); + let exclude_peers = source_peer.into_iter().collect(); + let new_block = NewBlock::new(block_hash); + self.outbound_nci.propagate_block(new_block, exclude_peers).await?; + } + Ok(()) }, Err(e) => { warn!( @@ -344,33 +427,16 @@ where T: BlockchainBackend + 'static block_hash.to_hex(), e ); - result = Err(CommsInterfaceError::ChainStorageError(e.clone())); - BlockEvent::Invalid((Box::new(block.clone()), e, *broadcast)) + self.publish_block_event(BlockEvent::Invalid((Box::new(block), e.clone(), broadcast))); + Err(CommsInterfaceError::ChainStorageError(e)) }, - }; - self.block_event_sender - .send(Arc::new(block_event)) - .map_err(|_| CommsInterfaceError::EventStreamError)?; + } + } - // Propagate verified block to remote nodes - if let Ok(add_block_result) = add_block_result { - let propagate = match add_block_result { - BlockAddResult::Ok => true, - BlockAddResult::BlockExists => false, - BlockAddResult::OrphanBlock => false, - BlockAddResult::ChainReorg(_) => true, - }; - if propagate && bool::from(*broadcast) { - info!( - target: LOG_TARGET, - "Propagate block ({}) to network.", - block.hash().to_hex() - ); - let exclude_peers = source_peer.into_iter().collect(); - self.outbound_nci.propagate_block(block.clone(), exclude_peers).await?; - } + fn publish_block_event(&self, event: BlockEvent) { + if let Err(event) = self.block_event_sender.send(Arc::new(event)) { + debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0) } - result } async fn get_target_difficulty(&self, pow_algo: PowAlgorithm) -> Result { @@ -402,16 +468,14 @@ where T: BlockchainBackend + 'static } } -impl Clone for InboundNodeCommsHandlers -where T: BlockchainBackend + 'static -{ +impl Clone for InboundNodeCommsHandlers { fn clone(&self) -> Self { - // All members use Arc's internally so calling clone should be cheap. Self { block_event_sender: self.block_event_sender.clone(), blockchain_db: self.blockchain_db.clone(), mempool: self.mempool.clone(), consensus_manager: self.consensus_manager.clone(), + new_block_request_semaphore: self.new_block_request_semaphore.clone(), outbound_nci: self.outbound_nci.clone(), } } diff --git a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs index 8a0a5d56fa..25d9eb0264 100644 --- a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs @@ -22,7 +22,7 @@ use crate::{ base_node::comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse}, - blocks::{blockheader::BlockHeader, Block}, + blocks::{blockheader::BlockHeader, NewBlock}, chain_storage::{ChainMetadata, HistoricalBlock, MmrTree}, transactions::{ transaction::{TransactionKernel, TransactionOutput}, @@ -41,7 +41,7 @@ pub const LOG_TARGET: &str = "c::bn::comms_interface::outbound_interface"; #[derive(Clone)] pub struct OutboundNodeCommsInterface { request_sender: SenderService<(NodeCommsRequest, Option), Result>, - block_sender: UnboundedSender<(Block, Vec)>, + block_sender: UnboundedSender<(NewBlock, Vec)>, } impl OutboundNodeCommsInterface { @@ -51,7 +51,7 @@ impl OutboundNodeCommsInterface { (NodeCommsRequest, Option), Result, >, - block_sender: UnboundedSender<(Block, Vec)>, + block_sender: UnboundedSender<(NewBlock, Vec)>, ) -> Self { Self { @@ -268,13 +268,13 @@ impl OutboundNodeCommsInterface { /// Transmit a block to remote base nodes, excluding the provided peers. pub async fn propagate_block( - &mut self, - block: Block, + &self, + new_block: NewBlock, exclude_peers: Vec, ) -> Result<(), CommsInterfaceError> { self.block_sender - .unbounded_send((block, exclude_peers)) + .unbounded_send((new_block, exclude_peers)) .map_err(|_| CommsInterfaceError::BroadcastFailed) } diff --git a/base_layer/core/src/base_node/service/initializer.rs b/base_layer/core/src/base_node/service/initializer.rs index 173f80f6b9..b62f425c45 100644 --- a/base_layer/core/src/base_node/service/initializer.rs +++ b/base_layer/core/src/base_node/service/initializer.rs @@ -26,7 +26,7 @@ use crate::{ proto, service::service::{BaseNodeService, BaseNodeServiceConfig, BaseNodeStreams}, }, - blocks::Block, + blocks::NewBlock, chain_storage::{BlockchainBackend, BlockchainDatabase}, consensus::ConsensusManager, mempool::Mempool, @@ -55,9 +55,7 @@ const LOG_TARGET: &str = "c::bn::service::initializer"; const SUBSCRIPTION_LABEL: &str = "Base Node"; /// Initializer for the Base Node service handle and service future. -pub struct BaseNodeServiceInitializer -where T: BlockchainBackend -{ +pub struct BaseNodeServiceInitializer { inbound_message_subscription_factory: Arc, blockchain_db: BlockchainDatabase, mempool: Mempool, @@ -103,15 +101,15 @@ where T: BlockchainBackend } /// Create a stream of 'New Block` messages - fn inbound_block_stream(&self) -> impl Stream> { + fn inbound_block_stream(&self) -> impl Stream> { self.inbound_message_subscription_factory .get_subscription(TariMessageType::NewBlock, SUBSCRIPTION_LABEL) .filter_map(extract_block) } } -async fn extract_block(msg: Arc) -> Option> { - match msg.decode_message::() { +async fn extract_block(msg: Arc) -> Option> { + match msg.decode_message::() { Err(e) => { warn!( target: LOG_TARGET, @@ -120,10 +118,10 @@ async fn extract_block(msg: Arc) -> Option> { ); None }, - Ok(block) => { - let block = match Block::try_from(block) { + Ok(new_block) => { + let block = match NewBlock::try_from(new_block) { Err(e) => { - let origin = &msg.source_peer.public_key; + let origin = &msg.source_peer.node_id; warn!( target: LOG_TARGET, "Inbound block message from {} was ill-formed. {}", origin, e diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index f43287b8f4..767d0b15c6 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -36,9 +36,9 @@ use crate::{ RequestKey, WaitingRequests, }, - blocks::Block, + blocks::{Block, NewBlock}, chain_storage::BlockchainBackend, - proto::core::Block as ProtoBlock, + proto as shared_protos, }; use futures::{ channel::{ @@ -59,7 +59,7 @@ use tari_comms_dht::{ envelope::NodeDestination, outbound::{OutboundEncryption, OutboundMessageRequester, SendMessageParams}, }; -use tari_crypto::tari_utilities::{hex::Hex, Hashable}; +use tari_crypto::tari_utilities::hex::Hex; use tari_p2p::{domain_message::DomainMessage, tari_message::TariMessageType}; use tari_service_framework::RequestContext; use tokio::task; @@ -86,12 +86,22 @@ impl Default for BaseNodeServiceConfig { /// A convenience struct to hold all the BaseNode streams pub struct BaseNodeStreams { + /// `NodeCommsRequest` messages to send to a remote peer. If a specific peer is not provided, a random peer is + /// chosen. outbound_request_stream: SOutReq, - outbound_block_stream: UnboundedReceiver<(Block, Vec)>, + /// Blocks to be propagated out to the network. The second element of the tuple is a list of peers to exclude from + /// this round of propagation + outbound_block_stream: UnboundedReceiver<(NewBlock, Vec)>, + /// `BaseNodeRequest` messages received from external peers inbound_request_stream: SInReq, + /// `BaseNodeResponse` messages received from external peers inbound_response_stream: SInRes, + /// `NewBlock` messages received from external peers inbound_block_stream: SBlockIn, + /// Incoming local request messages from the LocalNodeCommsInterface and other local services local_request_stream: SLocalReq, + /// The stream of blocks sent from local services `LocalCommsNodeInterface::submit_block` e.g. block sync and + /// miner local_block_stream: SLocalBlock, } @@ -103,13 +113,13 @@ where >, SInReq: Stream>, SInRes: Stream>, - SBlockIn: Stream>, + SBlockIn: Stream>, SLocalReq: Stream>>, SLocalBlock: Stream>>, { pub fn new( outbound_request_stream: SOutReq, - outbound_block_stream: UnboundedReceiver<(Block, Vec)>, + outbound_block_stream: UnboundedReceiver<(NewBlock, Vec)>, inbound_request_stream: SInReq, inbound_response_stream: SInRes, inbound_block_stream: SBlockIn, @@ -131,7 +141,7 @@ where /// The Base Node Service is responsible for handling inbound requests and responses and for sending new requests to /// remote Base Node Services. -pub struct BaseNodeService { +pub struct BaseNodeService { outbound_message_service: OutboundMessageRequester, inbound_nch: InboundNodeCommsHandlers, waiting_requests: WaitingRequests>, @@ -170,7 +180,7 @@ where B: BlockchainBackend + 'static >, SInReq: Stream>, SInRes: Stream>, - SBlockIn: Stream>, + SBlockIn: Stream>, SLocalReq: Stream>>, SLocalBlock: Stream>>, { @@ -231,7 +241,7 @@ where B: BlockchainBackend + 'static self.spawn_handle_local_request(local_request_context); }, - // Incoming local block messages from the LocalNodeCommsInterface and other local services + // Incoming local block messages from the LocalNodeCommsInterface e.g. miner and block sync local_block_context = local_block_stream.select_next_some() => { self.spawn_handle_local_block(local_block_context); }, @@ -279,10 +289,10 @@ where B: BlockchainBackend + 'static }); } - fn spawn_handle_outbound_block(&self, block: Block, excluded_peers: Vec) { + fn spawn_handle_outbound_block(&self, new_block: NewBlock, excluded_peers: Vec) { let outbound_message_service = self.outbound_message_service.clone(); task::spawn(async move { - let _ = handle_outbound_block(outbound_message_service, block, excluded_peers) + let _ = handle_outbound_block(outbound_message_service, new_block, excluded_peers) .await .or_else(|err| { error!(target: LOG_TARGET, "Failed to handle outbound block message {:?}", err); @@ -334,10 +344,10 @@ where B: BlockchainBackend + 'static }); } - fn spawn_handle_incoming_block(&self, block_msg: DomainMessage) { + fn spawn_handle_incoming_block(&self, new_block: DomainMessage) { let inbound_nch = self.inbound_nch.clone(); task::spawn(async move { - let _ = handle_incoming_block(inbound_nch, block_msg).await.or_else(|err| { + let _ = handle_incoming_block(inbound_nch, new_block).await.or_else(|err| { error!(target: LOG_TARGET, "Failed to handle incoming block message: {:?}", err); Err(err) }); @@ -369,11 +379,11 @@ where B: BlockchainBackend + 'static block_context: RequestContext<(Block, Broadcast), Result<(), CommsInterfaceError>>, ) { - let mut inbound_nch = self.inbound_nch.clone(); + let inbound_nch = self.inbound_nch.clone(); task::spawn(async move { - let (block, reply_tx) = block_context.split(); + let ((block, broadcast), reply_tx) = block_context.split(); let _ = reply_tx - .send(inbound_nch.handle_block(&block, None).await) + .send(inbound_nch.handle_block(block, broadcast, None).await) .or_else(|err| { error!( target: LOG_TARGET, @@ -419,41 +429,40 @@ async fn handle_incoming_request( ) .await?; + // Wait for the response to be sent and log the result let request_key = inner_msg.request_key; - tokio::spawn(async move { - match send_message_response.resolve().await { - Err(err) => { - error!( + match send_message_response.resolve().await { + Err(err) => { + error!( + target: LOG_TARGET, + "Incoming request ({}) response failed to send: {}", request_key, err + ); + }, + Ok(send_states) => { + let msg_tag = send_states[0].tag; + trace!( + target: LOG_TARGET, + "Incoming request ({}) response queued with {}", + request_key, + &msg_tag, + ); + if send_states.wait_single().await { + trace!( target: LOG_TARGET, - "Incoming request ({}) response failed to send: {}", request_key, err + "Incoming request ({}) response Direct Send was successful {}", + request_key, + msg_tag ); - }, - Ok(send_states) => { - let msg_tag = send_states[0].tag; - trace!( + } else { + error!( target: LOG_TARGET, - "Incoming request ({}) response queued with {}", + "Incoming request ({}) response Direct Send was unsuccessful and no message was sent {}", request_key, - &msg_tag, + msg_tag ); - if send_states.wait_single().await { - trace!( - target: LOG_TARGET, - "Incoming request ({}) response Direct Send was successful {}", - request_key, - msg_tag - ); - } else { - error!( - target: LOG_TARGET, - "Incoming request ({}) response Direct Send was unsuccessful and no message was sent {}", - request_key, - msg_tag - ); - } - }, - }; - }); + } + }, + }; Ok(()) } @@ -526,7 +535,7 @@ async fn handle_outbound_request( }, Ok(send_states) => { // Wait for matching responses to arrive - waiting_requests.insert(request_key, Some(reply_tx)).await; + waiting_requests.insert(request_key, reply_tx).await; // Spawn timeout for waiting_request spawn_request_timeout(timeout_sender, request_key, config.request_timeout); // Log messages @@ -535,20 +544,18 @@ async fn handle_outbound_request( target: LOG_TARGET, "Outbound request ({}) response queued with {}", request_key, &msg_tag, ); - tokio::spawn(async move { - if send_states.wait_single().await { - debug!( - target: LOG_TARGET, - "Outbound request ({}) response Direct Send was successful {}", request_key, msg_tag - ); - } else { - error!( - target: LOG_TARGET, - "Outbound request ({}) response Direct Send was unsuccessful and no message was sent", - request_key - ); - }; - }); + + if send_states.wait_single().await { + debug!( + target: LOG_TARGET, + "Outbound request ({}) response Direct Send was successful {}", request_key, msg_tag + ); + } else { + error!( + target: LOG_TARGET, + "Outbound request ({}) response Direct Send was unsuccessful and no message was sent", request_key + ); + }; }, Err(err) => { debug!(target: LOG_TARGET, "Failed to send outbound request: {}", err); @@ -568,7 +575,7 @@ async fn handle_outbound_request( async fn handle_outbound_block( mut outbound_message_service: OutboundMessageRequester, - block: Block, + new_block: NewBlock, exclude_peers: Vec, ) -> Result<(), CommsInterfaceError> { @@ -577,7 +584,10 @@ async fn handle_outbound_block( NodeDestination::Unknown, OutboundEncryption::None, exclude_peers, - OutboundDomainMessage::new(TariMessageType::NewBlock, ProtoBlock::from(block)), + OutboundDomainMessage::new( + TariMessageType::NewBlock, + shared_protos::core::NewBlock::from(new_block), + ), ) .await .map_err(|e| { @@ -614,25 +624,24 @@ fn spawn_request_timeout(mut timeout_sender: Sender, request_key: Re async fn handle_incoming_block( mut inbound_nch: InboundNodeCommsHandlers, - domain_block_msg: DomainMessage, + domain_block_msg: DomainMessage, ) -> Result<(), BaseNodeServiceError> { - let DomainMessage::<_> { source_peer, inner, .. } = domain_block_msg; + let DomainMessage::<_> { + source_peer, + inner: new_block, + .. + } = domain_block_msg; debug!( - "New candidate block #{} (accum_diff: {}, hash: ({})) received.", - inner.header.height, - inner.header.total_accumulated_difficulty_inclusive(), - inner.header.hash().to_hex(), - ); - trace!( target: LOG_TARGET, - "New block: {}, from: {}", - inner, - source_peer.public_key + "New candidate block with hash `{}` received from `{}`.", + new_block.block_hash.to_hex(), + source_peer.node_id.short_str() ); + inbound_nch - .handle_block(&(inner, true.into()), Some(source_peer.node_id)) + .handle_new_block_message(new_block, source_peer.node_id) .await?; // TODO - retain peer info for stats and potential banning for sending invalid blocks diff --git a/base_layer/core/src/base_node/waiting_requests.rs b/base_layer/core/src/base_node/waiting_requests.rs index 8b0bfa03b2..f6f9c685a1 100644 --- a/base_layer/core/src/base_node/waiting_requests.rs +++ b/base_layer/core/src/base_node/waiting_requests.rs @@ -47,8 +47,8 @@ impl WaitingRequests { } /// Insert a new waiting request. - pub async fn insert(&self, key: RequestKey, reply_tx: Option>) { - self.requests.write().await.insert(key, reply_tx); + pub async fn insert(&self, key: RequestKey, reply_tx: OneshotSender) { + self.requests.write().await.insert(key, Some(reply_tx)); } /// Remove the waiting request corresponding to the provided key. diff --git a/base_layer/core/src/blocks/block.rs b/base_layer/core/src/blocks/block.rs index a754b691ea..bfed1ee02c 100644 --- a/base_layer/core/src/blocks/block.rs +++ b/base_layer/core/src/blocks/block.rs @@ -23,7 +23,7 @@ // Portions of this file were originally copyrighted (c) 2018 The Grin Developers, issued under the Apache License, // Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0. use crate::{ - blocks::BlockHeader, + blocks::{BlockHash, BlockHeader}, consensus::ConsensusConstants, proof_of_work::ProofOfWork, transactions::{ @@ -264,4 +264,23 @@ impl Hashable for Block { } } +//---------------------------------- NewBlock --------------------------------------------// +pub struct NewBlock { + pub block_hash: BlockHash, +} + +impl NewBlock { + pub fn new(block_hash: BlockHash) -> Self { + Self { block_hash } + } +} + +impl From<&Block> for NewBlock { + fn from(block: &Block) -> Self { + Self { + block_hash: block.hash(), + } + } +} + //---------------------------------------- Tests ----------------------------------------------------// diff --git a/base_layer/core/src/blocks/blockheader.rs b/base_layer/core/src/blocks/blockheader.rs index b138b5a70b..e72035b11c 100644 --- a/base_layer/core/src/blocks/blockheader.rs +++ b/base_layer/core/src/blocks/blockheader.rs @@ -59,6 +59,7 @@ use std::{ }; use tari_crypto::tari_utilities::{epoch_time::EpochTime, hex::Hex, ByteArray, Hashable}; +pub const BLOCK_HASH_LENGTH: usize = 32; pub type BlockHash = Vec; #[derive(Clone, Debug, PartialEq, Error)] @@ -116,11 +117,11 @@ impl BlockHeader { BlockHeader { version: blockchain_version, height: 0, - prev_hash: vec![0; 32], + prev_hash: vec![0; BLOCK_HASH_LENGTH], timestamp: EpochTime::now(), - output_mr: vec![0; 32], - range_proof_mr: vec![0; 32], - kernel_mr: vec![0; 32], + output_mr: vec![0; BLOCK_HASH_LENGTH], + range_proof_mr: vec![0; BLOCK_HASH_LENGTH], + kernel_mr: vec![0; BLOCK_HASH_LENGTH], total_kernel_offset: BlindingFactor::default(), nonce: 0, pow: ProofOfWork::default(), @@ -140,9 +141,9 @@ impl BlockHeader { height: prev.height + 1, prev_hash, timestamp: EpochTime::now(), - output_mr: vec![0; 32], - range_proof_mr: vec![0; 32], - kernel_mr: vec![0; 32], + output_mr: vec![0; BLOCK_HASH_LENGTH], + range_proof_mr: vec![0; BLOCK_HASH_LENGTH], + kernel_mr: vec![0; BLOCK_HASH_LENGTH], total_kernel_offset: BlindingFactor::default(), nonce: 0, pow, diff --git a/base_layer/core/src/blocks/mod.rs b/base_layer/core/src/blocks/mod.rs index 1fc82f74b2..78ae318147 100644 --- a/base_layer/core/src/blocks/mod.rs +++ b/base_layer/core/src/blocks/mod.rs @@ -27,7 +27,7 @@ mod new_blockheader_template; pub mod genesis_block; -pub use block::{Block, BlockBuilder, BlockValidationError}; +pub use block::{Block, BlockBuilder, BlockValidationError, NewBlock}; pub use blockheader::{BlockHash, BlockHeader, BlockHeaderValidationError}; pub use new_block_template::NewBlockTemplate; pub use new_blockheader_template::NewBlockHeaderTemplate; diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index e2fe1da1c3..0f4b37fca7 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - blocks::{Block, BlockHeader, NewBlockTemplate}, + blocks::{Block, BlockHash, BlockHeader, NewBlockTemplate}, chain_storage::{ blockchain_database::BlockAddResult, metadata::ChainMetadata, @@ -109,5 +109,6 @@ make_async!(calculate_mmr_roots(template: NewBlockTemplate) -> Block, "calculate make_async!(fetch_block(height: u64) -> HistoricalBlock, "fetch_block"); make_async!(fetch_block_with_hash(hash: HashOutput) -> Option, "fetch_block_with_hash"); +make_async!(block_exists(block_hash: BlockHash) -> bool, "block_exists"); make_async!(rewind_to_height(height: u64) -> Vec, "rewind_to_height"); make_async!(fetch_mmr_proof(tree: MmrTree, pos: usize) -> MerkleProof, "fetch_mmr_proof"); diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index b43a501f6d..d9cc60badc 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -93,7 +93,7 @@ pub struct MutableMmrState { /// for example. /// The `GenesisBlockValidator` is used to check that the chain builds on the correct genesis block. /// The `ChainTipValidator` is used to check that the accounting balance and MMR states of the chain state is valid. -pub struct Validators { +pub struct Validators { block: Arc>, orphan: Arc>, accum_difficulty: Arc>, @@ -114,7 +114,7 @@ impl Validators { } } -impl Clone for Validators { +impl Clone for Validators { fn clone(&self) -> Self { Validators { block: Arc::clone(&self.block), @@ -246,9 +246,7 @@ macro_rules! fetch { /// let db = BlockchainDatabase::new(db_backend, &rules, validators, BlockchainDatabaseConfig::default()).unwrap(); /// // Do stuff with db /// ``` -pub struct BlockchainDatabase -where T: BlockchainBackend -{ +pub struct BlockchainDatabase { db: Arc>, validators: Validators, config: BlockchainDatabaseConfig, @@ -556,11 +554,17 @@ where T: BlockchainBackend /// Attempt to fetch the block corresponding to the provided hash from the main chain, if it cannot be found then /// the block will be searched in the orphan block pool. - pub fn fetch_block_with_hash(&self, hash: HashOutput) -> Result, ChainStorageError> { + pub fn fetch_block_with_hash(&self, hash: BlockHash) -> Result, ChainStorageError> { let db = self.db_read_access()?; fetch_block_with_hash(&*db, hash) } + /// Returns true if this block exists in the chain, or is orphaned. + pub fn block_exists(&self, hash: BlockHash) -> Result { + let db = self.db_read_access()?; + block_exists(&*db, hash) + } + /// Atomically commit the provided transaction to the database backend. This function does not update the metadata. pub fn commit(&self, txn: DbTransaction) -> Result<(), ChainStorageError> { let mut db = self.db_write_access()?; @@ -606,7 +610,7 @@ pub fn fetch_headers( fn fetch_header_with_block_hash( db: &T, - hash: HashOutput, + hash: BlockHash, ) -> Result { fetch!(db, hash, BlockHash) @@ -629,7 +633,7 @@ fn fetch_stxo(db: &T, hash: HashOutput) -> Result(db: &T, hash: HashOutput) -> Result { +fn fetch_orphan(db: &T, hash: BlockHash) -> Result { fetch!(db, hash, OrphanBlock) } @@ -773,7 +777,7 @@ fn fetch_block(db: &T, height: u64) -> Result( db: &T, - hash: HashOutput, + hash: BlockHash, ) -> Result, ChainStorageError> { if let Ok(header) = fetch_header_with_block_hash(db, hash.clone()) { @@ -785,6 +789,11 @@ fn fetch_block_with_hash( Ok(None) } +fn block_exists(db: &T, hash: BlockHash) -> Result { + let exists = db.contains(&DbKey::BlockHash(hash.clone()))? || db.contains(&DbKey::OrphanBlock(hash))?; + Ok(exists) +} + fn check_for_valid_height(db: &T, height: u64) -> Result { let metadata = db.fetch_metadata()?; let db_height = metadata.height_of_longest_chain.unwrap_or(0); @@ -1409,9 +1418,7 @@ fn log_error(req: DbKey, err: ChainStorageError) -> Result Clone for BlockchainDatabase -where T: BlockchainBackend -{ +impl Clone for BlockchainDatabase { fn clone(&self) -> Self { BlockchainDatabase { db: self.db.clone(), diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs index 35cce190fa..484bc65e56 100644 --- a/base_layer/core/src/mempool/mempool.rs +++ b/base_layer/core/src/mempool/mempool.rs @@ -62,9 +62,7 @@ impl MempoolValidators { /// The Mempool consists of an Unconfirmed Transaction Pool, Pending Pool, Orphan Pool and Reorg Pool and is responsible /// for managing and maintaining all unconfirmed transactions have not yet been included in a block, and transactions /// that have recently been included in a block. -pub struct Mempool -where T: BlockchainBackend -{ +pub struct Mempool { pool_storage: Arc>>, } @@ -147,9 +145,7 @@ where T: BlockchainBackend } } -impl Clone for Mempool -where T: BlockchainBackend -{ +impl Clone for Mempool { fn clone(&self) -> Self { Mempool { pool_storage: self.pool_storage.clone(), diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index 4bd206033b..75fe8e95ba 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -47,9 +47,7 @@ pub const LOG_TARGET: &str = "c::mp::mempool"; /// The Mempool consists of an Unconfirmed Transaction Pool, Pending Pool, Orphan Pool and Reorg Pool and is responsible /// for managing and maintaining all unconfirmed transactions have not yet been included in a block, and transactions /// that have recently been included in a block. -pub struct MempoolStorage -where T: BlockchainBackend -{ +pub struct MempoolStorage { blockchain_db: BlockchainDatabase, unconfirmed_pool: UnconfirmedPool, orphan_pool: OrphanPool, diff --git a/base_layer/core/src/mempool/orphan_pool/orphan_pool.rs b/base_layer/core/src/mempool/orphan_pool/orphan_pool.rs index 908307161b..700b9aaac2 100644 --- a/base_layer/core/src/mempool/orphan_pool/orphan_pool.rs +++ b/base_layer/core/src/mempool/orphan_pool/orphan_pool.rs @@ -58,9 +58,7 @@ impl Default for OrphanPoolConfig { /// The Orphan Pool contains all the received transactions that attempt to spend UTXOs that don't exist. These UTXOs /// might exist in the future if these transactions are from a series or set of transactions that need to be processed /// in a specific order. Some of these transactions might still be constrained by pending time-locks. -pub struct OrphanPool -where T: BlockchainBackend -{ +pub struct OrphanPool { pool_storage: Arc>>, } diff --git a/base_layer/core/src/mempool/orphan_pool/orphan_pool_storage.rs b/base_layer/core/src/mempool/orphan_pool/orphan_pool_storage.rs index 03112ccb0d..79dff41b3a 100644 --- a/base_layer/core/src/mempool/orphan_pool/orphan_pool_storage.rs +++ b/base_layer/core/src/mempool/orphan_pool/orphan_pool_storage.rs @@ -37,9 +37,7 @@ pub const LOG_TARGET: &str = "c::mp::orphan_pool::orphan_pool_storage"; /// The Orphan Pool contains all the received transactions that attempt to spend UTXOs that don't exist. These UTXOs /// might exist in the future if these transactions are from a series or set of transactions that need to be processed /// in a specific order. Some of these transactions might still be constrained by pending time-locks. -pub struct OrphanPoolStorage -where T: BlockchainBackend -{ +pub struct OrphanPoolStorage { config: OrphanPoolConfig, txs_by_signature: TtlCache>, validator: Validator, diff --git a/base_layer/core/src/mempool/service/initializer.rs b/base_layer/core/src/mempool/service/initializer.rs index 2a3b2003b1..dbddd1cc67 100644 --- a/base_layer/core/src/mempool/service/initializer.rs +++ b/base_layer/core/src/mempool/service/initializer.rs @@ -60,9 +60,7 @@ const LOG_TARGET: &str = "c::bn::mempool_service::initializer"; const SUBSCRIPTION_LABEL: &str = "Mempool"; /// Initializer for the Mempool service and service future. -pub struct MempoolServiceInitializer -where T: BlockchainBackend -{ +pub struct MempoolServiceInitializer { inbound_message_subscription_factory: Arc, mempool: Mempool, config: MempoolServiceConfig, diff --git a/base_layer/core/src/mempool/service/service.rs b/base_layer/core/src/mempool/service/service.rs index cc7e723d0e..552b9cc8c6 100644 --- a/base_layer/core/src/mempool/service/service.rs +++ b/base_layer/core/src/mempool/service/service.rs @@ -433,7 +433,7 @@ async fn handle_outbound_request( match send_result { Ok(_) => { // Spawn timeout and wait for matching response to arrive - waiting_requests.insert(request_key, Some(reply_tx)).await; + waiting_requests.insert(request_key, reply_tx).await; // Spawn timeout for waiting_request spawn_request_timeout(timeout_sender, request_key, config.request_timeout); Ok(()) diff --git a/base_layer/core/src/proto/block.proto b/base_layer/core/src/proto/block.proto index 969deb9865..0ff1cea98c 100644 --- a/base_layer/core/src/proto/block.proto +++ b/base_layer/core/src/proto/block.proto @@ -51,6 +51,12 @@ message Block { tari.types.AggregateBody body = 2; } +// A new block message. This is the message that is propagated around the network. It contains the +// minimal information required to identify and optionally request the full block. +message NewBlock { + bytes block_hash = 1; +} + // The representation of a historical block in the blockchain. It is essentially identical to a protocol-defined // block but contains some extra metadata that clients such as Block Explorers will find interesting. message HistoricalBlock { diff --git a/base_layer/core/src/proto/block.rs b/base_layer/core/src/proto/block.rs index 436f9a2069..87cad69366 100644 --- a/base_layer/core/src/proto/block.rs +++ b/base_layer/core/src/proto/block.rs @@ -22,7 +22,7 @@ use super::core as proto; use crate::{ - blocks::{Block, BlockHeader, NewBlockHeaderTemplate, NewBlockTemplate}, + blocks::{blockheader::BLOCK_HASH_LENGTH, Block, BlockHeader, NewBlock, NewBlockHeaderTemplate, NewBlockTemplate}, chain_storage::HistoricalBlock, proof_of_work::{Difficulty, PowAlgorithm, ProofOfWork}, proto::utils::try_convert_all, @@ -246,3 +246,30 @@ impl From for proto::NewBlockHeaderTemplate { } } } + +//---------------------------------- NewBlock --------------------------------------------// + +impl TryFrom for NewBlock { + type Error = String; + + fn try_from(new_block: proto::NewBlock) -> Result { + let block_hash = new_block.block_hash; + if block_hash.len() != BLOCK_HASH_LENGTH { + return Err(format!( + "Block hash has an incorrect length. (len={}, expected={})", + block_hash.len(), + BLOCK_HASH_LENGTH + )); + } + + Ok(Self { block_hash }) + } +} + +impl From for proto::NewBlock { + fn from(new_block: NewBlock) -> Self { + Self { + block_hash: new_block.block_hash, + } + } +} diff --git a/base_layer/core/src/validation/traits.rs b/base_layer/core/src/validation/traits.rs index 16c7c243c9..5840e9dd84 100644 --- a/base_layer/core/src/validation/traits.rs +++ b/base_layer/core/src/validation/traits.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::{chain_storage::BlockchainBackend, validation::error::ValidationError}; +use crate::validation::error::ValidationError; pub type Validator = Box>; pub type StatelessValidator = Box>; @@ -28,9 +28,7 @@ pub type StatelessValidator = Box>; /// The core validation trait. Multiple `Validation` implementors can be chained together in a [ValidatorPipeline] to /// provide consensus validation for blocks, transactions, or DAN instructions. Implementors only need to implement /// the methods that are relevant for the pipeline, since the default implementation always passes. -pub trait Validation: Send + Sync -where B: BlockchainBackend -{ +pub trait Validation: Send + Sync { /// General validation code that can run independent of external state fn validate(&self, item: &T, db: &B) -> Result<(), ValidationError>; } diff --git a/base_layer/core/tests/helpers/block_builders.rs b/base_layer/core/tests/helpers/block_builders.rs index 541c996447..be2b0dceb6 100644 --- a/base_layer/core/tests/helpers/block_builders.rs +++ b/base_layer/core/tests/helpers/block_builders.rs @@ -22,6 +22,7 @@ use croaring::Bitmap; use rand::{rngs::OsRng, RngCore}; +use std::iter::repeat_with; use tari_core::{ blocks::{Block, BlockHeader, NewBlockTemplate}, chain_storage::{BlockAddResult, BlockchainBackend, BlockchainDatabase, ChainStorageError}, @@ -374,3 +375,21 @@ pub fn generate_block_with_coinbase( } result } + +pub fn construct_chained_blocks( + db: &BlockchainDatabase, + block0: Block, + consensus_constants: &ConsensusConstants, + n: usize, +) -> Vec +{ + let mut prev_block = block0; + + repeat_with(|| { + let block = append_block(db, &prev_block, vec![], consensus_constants, 1.into()).unwrap(); + prev_block = block.clone(); + block + }) + .take(n) + .collect() +} diff --git a/base_layer/core/tests/helpers/event_stream.rs b/base_layer/core/tests/helpers/event_stream.rs index 596d2b4266..010529e445 100644 --- a/base_layer/core/tests/helpers/event_stream.rs +++ b/base_layer/core/tests/helpers/event_stream.rs @@ -23,7 +23,7 @@ use futures::{future, future::Either, stream::FusedStream, FutureExt, Stream, StreamExt}; use std::time::Duration; -pub async fn event_stream_next(mut stream: TStream, timeout: Duration) -> Option +pub async fn event_stream_next(stream: &mut TStream, timeout: Duration) -> Option where TStream: Stream + FusedStream + Unpin { let either = future::select(stream.select_next_some(), tokio::time::delay_for(timeout).fuse()).await; diff --git a/base_layer/core/tests/mining.rs b/base_layer/core/tests/mining.rs index 0f8494e1e1..c96a7b8007 100644 --- a/base_layer/core/tests/mining.rs +++ b/base_layer/core/tests/mining.rs @@ -104,14 +104,14 @@ fn mining() { let (mut state_event_sender, state_event_receiver): (Publisher<_>, Subscriber<_>) = bounded(1, 112); miner.subscribe_to_node_state_events(state_event_receiver); miner.subscribe_to_mempool_state_events(alice_node.local_mp_interface.get_mempool_state_event_stream()); - let miner_utxo_stream = miner.get_utxo_receiver_channel().fuse(); + let mut miner_utxo_stream = miner.get_utxo_receiver_channel().fuse(); runtime.spawn(miner.mine()); runtime.block_on(async { // Simulate the BlockSync event assert!(state_event_sender.send(StateEvent::BlocksSynchronized).await.is_ok()); // Wait for miner to finish mining block 1 - assert!(event_stream_next(miner_utxo_stream, Duration::from_secs(20)) + assert!(event_stream_next(&mut miner_utxo_stream, Duration::from_secs(20)) .await .is_some()); // Check that the mined block was submitted to the base node service and the block was added to the blockchain @@ -128,7 +128,7 @@ fn mining() { } } } - assert!(found_tx_outputs == tx1.body.outputs().len()); + assert_eq!(found_tx_outputs, tx1.body.outputs().len()); async_assert_eventually!( alice_node .mempool diff --git a/base_layer/core/tests/node_service.rs b/base_layer/core/tests/node_service.rs index fc5d6881c3..b462e0121e 100644 --- a/base_layer/core/tests/node_service.rs +++ b/base_layer/core/tests/node_service.rs @@ -22,8 +22,9 @@ #[allow(dead_code)] mod helpers; +use crate::helpers::block_builders::construct_chained_blocks; use croaring::Bitmap; -use futures::join; +use futures::{join, StreamExt}; use helpers::{ block_builders::{ append_block, @@ -44,13 +45,14 @@ use helpers::{ }, }; use std::time::Duration; +use tari_comms::protocol::messaging::MessagingEvent; use tari_core::{ base_node::{ comms_interface::{BlockEvent, Broadcast, CommsInterfaceError}, consts::BASE_NODE_SERVICE_DESIRED_RESPONSE_FRACTION, service::BaseNodeServiceConfig, }, - blocks::BlockHeader, + blocks::{BlockHeader, NewBlock}, chain_storage::{BlockAddResult, DbTransaction, MmrTree}, consensus::{ConsensusConstantsBuilder, ConsensusManagerBuilder, Network}, mempool::MempoolServiceConfig, @@ -70,7 +72,7 @@ use tari_core::{ use tari_crypto::tari_utilities::hash::Hashable; use tari_mmr::MmrCacheConfig; use tari_p2p::services::liveness::LivenessConfig; -use tari_test_utils::random; +use tari_test_utils::{random, unpack_enum}; use tempdir::TempDir; use tokio::runtime::Runtime; @@ -381,13 +383,14 @@ fn request_and_response_fetch_blocks_with_hashes() { } #[test] -fn propagate_and_forward_valid_block() { +fn propagate_and_forward_many_valid_blocks() { let mut runtime = Runtime::new().unwrap(); let temp_dir = TempDir::new(random::string(8).as_str()).unwrap(); let factories = CryptoFactories::default(); - // Alice will propagate block to bob, bob will receive it, verify it and then propagate it to carol and dan. Dan and - // Carol will also try to propagate the block to each other, as they dont know that bob sent it to the other node. - // These duplicate blocks will be discarded and wont be propagated again. + // Alice will propagate a number of block hashes to bob, bob will receive it, request the full block, verify and + // then propagate the hash to carol and dan. Dan and Carol will also try to propagate the block hashes to each + // other, but the block should not be re-requested. These duplicate blocks will be discarded and wont be + // propagated again. // /-> carol <-\ // / | // alice -> bob | @@ -406,7 +409,7 @@ fn propagate_and_forward_valid_block() { .with_consensus_constants(consensus_constants) .with_block(block0.clone()) .build(); - let (mut alice_node, rules) = BaseNodeBuilder::new(network) + let (alice_node, rules) = BaseNodeBuilder::new(network) .with_node_identity(alice_node_identity.clone()) .with_consensus_manager(rules) .start(&mut runtime, temp_dir.path().join("alice").to_str().unwrap()); @@ -428,11 +431,92 @@ fn propagate_and_forward_valid_block() { wait_until_online(&mut runtime, &[&alice_node, &bob_node, &carol_node, &dan_node]); - let bob_block_event_stream = bob_node.local_nci.get_block_event_stream_fused(); - let carol_block_event_stream = carol_node.local_nci.get_block_event_stream_fused(); - let dan_block_event_stream = dan_node.local_nci.get_block_event_stream_fused(); + let mut bob_block_event_stream = bob_node.local_nci.get_block_event_stream_fused(); + let mut carol_block_event_stream = carol_node.local_nci.get_block_event_stream_fused(); + let mut dan_block_event_stream = dan_node.local_nci.get_block_event_stream_fused(); - let block1 = append_block( + let blocks = construct_chained_blocks(&alice_node.blockchain_db, block0, &rules.consensus_constants(), 5); + + runtime.block_on(async { + for block in &blocks { + alice_node + .outbound_nci + .propagate_block(NewBlock::from(block), vec![]) + .await + .unwrap(); + + let bob_block_event_fut = event_stream_next(&mut bob_block_event_stream, Duration::from_millis(20000)); + let carol_block_event_fut = event_stream_next(&mut carol_block_event_stream, Duration::from_millis(20000)); + let dan_block_event_fut = event_stream_next(&mut dan_block_event_stream, Duration::from_millis(20000)); + let (bob_block_event, carol_block_event, dan_block_event) = + join!(bob_block_event_fut, carol_block_event_fut, dan_block_event_fut); + let block_hash = block.hash(); + + if let BlockEvent::Verified((received_block, _, _)) = &*bob_block_event.unwrap().unwrap() { + assert_eq!(received_block.hash(), block_hash); + } else { + panic!("Bob's node did not receive and validate the expected block"); + } + if let BlockEvent::Verified((received_block, _block_add_result, _)) = &*carol_block_event.unwrap().unwrap() + { + assert_eq!(received_block.hash(), block_hash); + } else { + panic!("Carol's node did not receive and validate the expected block"); + } + if let BlockEvent::Verified((received_block, _block_add_result, _)) = &*dan_block_event.unwrap().unwrap() { + assert_eq!(received_block.hash(), block_hash); + } else { + panic!("Dan's node did not receive and validate the expected block"); + } + } + + alice_node.comms.shutdown().await; + bob_node.comms.shutdown().await; + carol_node.comms.shutdown().await; + dan_node.comms.shutdown().await; + }); +} + +#[test] +fn propagate_and_forward_invalid_block_hash() { + // Alice will propagate a "made up" block hash to Bob, Bob will request the block from Alice. Alice will not be able + // to provide the block and so Bob will not propagate the hash further to Carol. + // alice -> bob -> carol + + let mut runtime = Runtime::new().unwrap(); + let temp_dir = TempDir::new(random::string(8).as_str()).unwrap(); + let factories = CryptoFactories::default(); + + let alice_node_identity = random_node_identity(); + let bob_node_identity = random_node_identity(); + let carol_node_identity = random_node_identity(); + let network = Network::LocalNet; + let consensus_constants = ConsensusConstantsBuilder::new(network) + .with_emission_amounts(100_000_000.into(), 0.999, 100.into()) + .build(); + let (block0, _) = create_genesis_block(&factories, &consensus_constants); + let rules = ConsensusManagerBuilder::new(network) + .with_consensus_constants(consensus_constants) + .with_block(block0.clone()) + .build(); + let (alice_node, rules) = BaseNodeBuilder::new(network) + .with_node_identity(alice_node_identity.clone()) + .with_consensus_manager(rules) + .start(&mut runtime, temp_dir.path().join("alice").to_str().unwrap()); + let (bob_node, rules) = BaseNodeBuilder::new(network) + .with_node_identity(bob_node_identity.clone()) + .with_peers(vec![alice_node_identity]) + .with_consensus_manager(rules) + .start(&mut runtime, temp_dir.path().join("bob").to_str().unwrap()); + let (carol_node, rules) = BaseNodeBuilder::new(network) + .with_node_identity(carol_node_identity.clone()) + .with_peers(vec![bob_node_identity.clone()]) + .with_consensus_manager(rules) + .start(&mut runtime, temp_dir.path().join("carol").to_str().unwrap()); + + wait_until_online(&mut runtime, &[&alice_node, &bob_node, &carol_node]); + + let mut block1 = append_block( &alice_node.blockchain_db, &block0, vec![], @@ -440,43 +524,53 @@ fn propagate_and_forward_valid_block() { 1.into(), ) .unwrap(); - let block1_hash = block1.hash(); + // Create unknown block hash + block1.header.height = 0; + + let mut alice_message_events = alice_node.comms.subscribe_messaging_events().fuse(); + let mut bob_message_events = bob_node.comms.subscribe_messaging_events().fuse(); + let mut carol_message_events = carol_node.comms.subscribe_messaging_events().fuse(); runtime.block_on(async { - // Alice will start the propagation. Bob, Carol and Dan will propagate based on the logic in their inbound - // handle_block handlers - assert!(alice_node + alice_node .outbound_nci - .propagate_block(block1.clone(), vec![]) + .propagate_block(NewBlock::from(&block1), vec![]) .await - .is_ok()); - - let bob_block_event_fut = event_stream_next(bob_block_event_stream, Duration::from_millis(20000)); - let carol_block_event_fut = event_stream_next(carol_block_event_stream, Duration::from_millis(20000)); - let dan_block_event_fut = event_stream_next(dan_block_event_stream, Duration::from_millis(20000)); - let (bob_block_event, carol_block_event, dan_block_event) = - join!(bob_block_event_fut, carol_block_event_fut, dan_block_event_fut); + .unwrap(); - if let BlockEvent::Verified((received_block, _, _)) = &*bob_block_event.unwrap().unwrap() { - assert_eq!(received_block.hash(), block1_hash); - } else { - panic!("Bob's node did not receive and validate the expected block"); - } - if let BlockEvent::Verified((received_block, _block_add_result, _)) = &*carol_block_event.unwrap().unwrap() { - assert_eq!(received_block.hash(), block1_hash); - } else { - panic!("Carol's node did not receive and validate the expected block"); - } - if let BlockEvent::Verified((received_block, _block_add_result, _)) = &*dan_block_event.unwrap().unwrap() { - assert_eq!(received_block.hash(), block1_hash); - } else { - panic!("Dan's node did not receive and validate the expected block"); - } + // Alice propagated to Bob + let msg_event = event_stream_next(&mut alice_message_events, Duration::from_secs(10)) + .await + .unwrap() + .unwrap(); + unpack_enum!(MessagingEvent::MessageSent(_a) = &*msg_event); + // Bob received the invalid hash + let msg_event = event_stream_next(&mut bob_message_events, Duration::from_secs(10)) + .await + .unwrap() + .unwrap(); + unpack_enum!(MessagingEvent::MessageReceived(_a, _b) = &*msg_event); + // Sent the request for the block to Alice + let msg_event = event_stream_next(&mut bob_message_events, Duration::from_secs(10)) + .await + .unwrap() + .unwrap(); + unpack_enum!(MessagingEvent::MessageSent(_a) = &*msg_event); + // Bob received a response from Alice + let msg_event = event_stream_next(&mut bob_message_events, Duration::from_secs(10)) + .await + .unwrap() + .unwrap(); + unpack_enum!(MessagingEvent::MessageReceived(node_id, _a) = &*msg_event); + assert_eq!(&**node_id, alice_node.node_identity.node_id()); + // Checking a negative: Bob should not have propagated this hash to Carol. If Bob does, this assertion will be + // flaky. + let msg_event = event_stream_next(&mut carol_message_events, Duration::from_millis(500)).await; + assert!(msg_event.is_none()); alice_node.comms.shutdown().await; bob_node.comms.shutdown().await; carol_node.comms.shutdown().await; - dan_node.comms.shutdown().await; }); } @@ -506,9 +600,9 @@ fn propagate_and_forward_invalid_block() { .with_block(block0.clone()) .build(); let stateless_block_validator = StatelessBlockValidator::new(rules.clone(), factories.clone()); - let mock_validator = MockValidator::new(true); let mock_accum_difficulty_validator = MockAccumDifficultyValidator {}; + let mock_validator = MockValidator::new(false); let (dan_node, rules) = BaseNodeBuilder::new(network) .with_node_identity(dan_node_identity.clone()) .with_consensus_manager(rules) @@ -533,7 +627,8 @@ fn propagate_and_forward_invalid_block() { mock_accum_difficulty_validator.clone(), ) .start(&mut runtime, temp_dir.path().join("bob").to_str().unwrap()); - let (mut alice_node, rules) = BaseNodeBuilder::new(network) + let mock_validator = MockValidator::new(true); + let (alice_node, rules) = BaseNodeBuilder::new(network) .with_node_identity(alice_node_identity) .with_peers(vec![bob_node_identity.clone(), carol_node_identity.clone()]) .with_consensus_manager(rules) @@ -541,8 +636,9 @@ fn propagate_and_forward_invalid_block() { wait_until_online(&mut runtime, &[&alice_node, &bob_node, &carol_node, &dan_node]); - // Make block 1 invalid - let mut block1 = append_block( + // This is a valid block, however Bob, Carol and Dan's block validator is set to always reject the block + // after fetching it. + let block1 = append_block( &alice_node.blockchain_db, &block0, vec![], @@ -550,22 +646,22 @@ fn propagate_and_forward_invalid_block() { 1.into(), ) .unwrap(); - block1.header.height = 0; let block1_hash = block1.hash(); + runtime.block_on(async { - let bob_block_event_stream = bob_node.local_nci.get_block_event_stream_fused(); - let carol_block_event_stream = carol_node.local_nci.get_block_event_stream_fused(); - let dan_block_event_stream = dan_node.local_nci.get_block_event_stream_fused(); + let mut bob_block_event_stream = bob_node.local_nci.get_block_event_stream_fused(); + let mut carol_block_event_stream = carol_node.local_nci.get_block_event_stream_fused(); + let mut dan_block_event_stream = dan_node.local_nci.get_block_event_stream_fused(); assert!(alice_node .outbound_nci - .propagate_block(block1.clone(), vec![]) + .propagate_block(NewBlock::from(&block1), vec![]) .await .is_ok()); - let bob_block_event_fut = event_stream_next(bob_block_event_stream, Duration::from_millis(20000)); - let carol_block_event_fut = event_stream_next(carol_block_event_stream, Duration::from_millis(20000)); - let dan_block_event_fut = event_stream_next(dan_block_event_stream, Duration::from_millis(5000)); + let bob_block_event_fut = event_stream_next(&mut bob_block_event_stream, Duration::from_millis(20000)); + let carol_block_event_fut = event_stream_next(&mut carol_block_event_stream, Duration::from_millis(20000)); + let dan_block_event_fut = event_stream_next(&mut dan_block_event_stream, Duration::from_millis(5000)); let (bob_block_event, carol_block_event, dan_block_event) = join!(bob_block_event_fut, carol_block_event_fut, dan_block_event_fut); @@ -739,7 +835,7 @@ fn local_submit_block() { BaseNodeBuilder::new(network).start(&mut runtime, temp_dir.path().to_str().unwrap()); let db = &node.blockchain_db; - let event_stream = node.local_nci.get_block_event_stream_fused(); + let mut event_stream = node.local_nci.get_block_event_stream_fused(); let block0 = db.fetch_block(0).unwrap().block().clone(); let block1 = db .calculate_mmr_roots(chain_block(&block0, vec![], &consensus_manager.consensus_constants())) @@ -751,7 +847,7 @@ fn local_submit_block() { .await .is_ok()); - let event = event_stream_next(event_stream, Duration::from_millis(20000)).await; + let event = event_stream_next(&mut event_stream, Duration::from_millis(20000)).await; if let BlockEvent::Verified((received_block, result, _)) = &*event.unwrap().unwrap() { assert_eq!(received_block.hash(), block1.hash()); assert_eq!(*result, BlockAddResult::Ok); diff --git a/base_layer/core/tests/wallet.rs b/base_layer/core/tests/wallet.rs index 347d4978b0..b4d82e23b2 100644 --- a/base_layer/core/tests/wallet.rs +++ b/base_layer/core/tests/wallet.rs @@ -307,7 +307,7 @@ fn wallet_base_node_integration_test() { let (mut state_event_sender, state_event_receiver): (Publisher<_>, Subscriber<_>) = bounded(1, 113); miner.subscribe_to_node_state_events(state_event_receiver); miner.subscribe_to_mempool_state_events(base_node.local_mp_interface.get_mempool_state_event_stream()); - let miner_utxo_stream = miner.get_utxo_receiver_channel().fuse(); + let mut miner_utxo_stream = miner.get_utxo_receiver_channel().fuse(); runtime.spawn(async move { miner.mine().await; }); @@ -316,7 +316,7 @@ fn wallet_base_node_integration_test() { // Simulate block sync assert!(state_event_sender.send(StateEvent::BlocksSynchronized).await.is_ok()); // Wait for miner to finish mining block 1 - assert!(event_stream_next(miner_utxo_stream, Duration::from_secs(20)) + assert!(event_stream_next(&mut miner_utxo_stream, Duration::from_secs(20)) .await .is_some()); // Check that the mined block was submitted to the base node service and the block was added to the blockchain @@ -333,7 +333,7 @@ fn wallet_base_node_integration_test() { } } } - assert!(found_tx_outputs == transaction.body.outputs().len()); + assert_eq!(found_tx_outputs, transaction.body.outputs().len()); }); runtime.block_on(async { diff --git a/comms/src/common/rate_limit.rs b/comms/src/common/rate_limit.rs index f908690b59..ab4926763a 100644 --- a/comms/src/common/rate_limit.rs +++ b/comms/src/common/rate_limit.rs @@ -167,6 +167,6 @@ mod test { } } // Could allow a few more than 50. In slower test runs (e.g. on CI) this can be quite a bit over. - assert!((50..100).contains(&count)); + assert!((50..200).contains(&count)); } }