diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index 465c6eb7c0..216d247209 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -70,7 +70,7 @@ impl ChainMetadataService { /// Run the service pub async fn run(mut self) { let mut liveness_event_stream = self.liveness.get_event_stream_fused(); - let mut base_node_event_stream = self.base_node.get_block_event_stream_fused(); + let mut block_event_stream = self.base_node.get_block_event_stream_fused(); log_if_error!( target: LOG_TARGET, @@ -80,13 +80,15 @@ impl ChainMetadataService { loop { futures::select! { - event = base_node_event_stream.select_next_some() => { - log_if_error!( - level: debug, - target: LOG_TARGET, - "Failed to handle base node event because '{}'", - self.handle_block_event(&event).await - ); + block_event = block_event_stream.select_next_some() => { + if let Ok(block_event) = block_event { + log_if_error!( + level: debug, + target: LOG_TARGET, + "Failed to handle block event because '{}'", + self.handle_block_event(&block_event).await + ); + } }, liveness_event = liveness_event_stream.select_next_some() => { @@ -265,6 +267,7 @@ mod test { use tari_p2p::services::liveness::{mock::create_p2p_liveness_mock, LivenessRequest, PingPongEvent}; use tari_service_framework::reply_channel; use tari_test_utils::{runtime, unpack_enum}; + use tokio::sync::broadcast; fn create_base_node_nci() -> ( LocalNodeCommsInterface, @@ -272,8 +275,8 @@ mod test { ) { let (base_node_sender, base_node_receiver) = reply_channel::unbounded(); let (block_sender, _block_receiver) = reply_channel::unbounded(); - let (_base_node_publisher, subscriber) = broadcast_channel::bounded(1); - let base_node = LocalNodeCommsInterface::new(base_node_sender, block_sender, subscriber); + let (block_event_sender, _) = broadcast::channel(50); + let base_node = LocalNodeCommsInterface::new(base_node_sender, block_sender, block_event_sender); (base_node, base_node_receiver) } diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 7ffc67bf2f..39b9c89f6a 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -22,7 +22,12 @@ use crate::{ base_node::{ - comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse}, + comms_interface::{ + error::CommsInterfaceError, + local_interface::BlockEventSender, + NodeCommsRequest, + NodeCommsResponse, + }, OutboundNodeCommsInterface, }, blocks::{blockheader::BlockHeader, Block, NewBlockTemplate}, @@ -39,17 +44,11 @@ use crate::{ proof_of_work::{get_target_difficulty, Difficulty, PowAlgorithm}, transactions::transaction::{TransactionKernel, TransactionOutput}, }; -use futures::SinkExt; use log::*; -use std::{ - fmt::{Display, Error, Formatter}, - sync::Arc, -}; +use std::fmt::{Display, Error, Formatter}; use strum_macros::Display; -use tari_broadcast_channel::Publisher; use tari_comms::peer_manager::NodeId; use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex}; -use tokio::sync::RwLock; const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler"; const MAX_HEADERS_PER_RESPONSE: u32 = 100; @@ -89,7 +88,7 @@ impl From for Broadcast { pub struct InboundNodeCommsHandlers where T: BlockchainBackend + 'static { - event_publisher: Arc>>, + block_event_sender: BlockEventSender, blockchain_db: BlockchainDatabase, mempool: Mempool, consensus_manager: ConsensusManager, @@ -101,7 +100,7 @@ where T: BlockchainBackend + 'static { /// Construct a new InboundNodeCommsInterface. pub fn new( - event_publisher: Publisher, + block_event_sender: BlockEventSender, blockchain_db: BlockchainDatabase, mempool: Mempool, consensus_manager: ConsensusManager, @@ -109,7 +108,7 @@ where T: BlockchainBackend + 'static ) -> Self { Self { - event_publisher: Arc::new(RwLock::new(event_publisher)), + block_event_sender, blockchain_db, mempool, consensus_manager, @@ -305,12 +304,10 @@ where T: BlockchainBackend + 'static BlockEvent::Invalid((Box::new(block.clone()), e, *broadcast)) }, }; - self.event_publisher - .write() - .await + self.block_event_sender .send(block_event) - .await .map_err(|_| CommsInterfaceError::EventStreamError)?; + // Propagate verified block to remote nodes if let Ok(add_block_result) = add_block_result { let propagate = match add_block_result { @@ -365,7 +362,7 @@ where T: BlockchainBackend + 'static fn clone(&self) -> Self { // All members use Arc's internally so calling clone should be cheap. Self { - event_publisher: self.event_publisher.clone(), + block_event_sender: self.block_event_sender.clone(), blockchain_db: self.blockchain_db.clone(), mempool: self.mempool.clone(), consensus_manager: self.consensus_manager.clone(), diff --git a/base_layer/core/src/base_node/comms_interface/local_interface.rs b/base_layer/core/src/base_node/comms_interface/local_interface.rs index bf4d127fb2..28fd5bb7c2 100644 --- a/base_layer/core/src/base_node/comms_interface/local_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/local_interface.rs @@ -33,17 +33,20 @@ use crate::{ proof_of_work::{Difficulty, PowAlgorithm}, }; use futures::{stream::Fuse, StreamExt}; -use tari_broadcast_channel::Subscriber; use tari_service_framework::reply_channel::SenderService; +use tokio::sync::broadcast; use tower_service::Service; +pub type BlockEventSender = broadcast::Sender; +pub type BlockEventReceiver = broadcast::Receiver; + /// The InboundNodeCommsInterface provides an interface to request information from the current local node by other /// internal services. #[derive(Clone)] pub struct LocalNodeCommsInterface { request_sender: SenderService>, block_sender: SenderService<(Block, Broadcast), Result<(), CommsInterfaceError>>, - block_event_stream: Subscriber, + block_event_sender: BlockEventSender, } impl LocalNodeCommsInterface { @@ -51,21 +54,21 @@ impl LocalNodeCommsInterface { pub fn new( request_sender: SenderService>, block_sender: SenderService<(Block, Broadcast), Result<(), CommsInterfaceError>>, - block_event_stream: Subscriber, + block_event_sender: BlockEventSender, ) -> Self { Self { request_sender, block_sender, - block_event_stream, + block_event_sender, } } - pub fn get_block_event_stream(&self) -> Subscriber { - self.block_event_stream.clone() + pub fn get_block_event_stream(&self) -> BlockEventReceiver { + self.block_event_sender.subscribe() } - pub fn get_block_event_stream_fused(&self) -> Fuse> { + pub fn get_block_event_stream_fused(&self) -> Fuse { self.get_block_event_stream().fuse() } diff --git a/base_layer/core/src/base_node/comms_interface/mod.rs b/base_layer/core/src/base_node/comms_interface/mod.rs index 7daa66a8e0..e4743787fb 100644 --- a/base_layer/core/src/base_node/comms_interface/mod.rs +++ b/base_layer/core/src/base_node/comms_interface/mod.rs @@ -32,5 +32,5 @@ pub use comms_request::{MmrStateRequest, NodeCommsRequest}; pub use comms_response::NodeCommsResponse; pub use error::CommsInterfaceError; pub use inbound_handlers::{BlockEvent, Broadcast, InboundNodeCommsHandlers}; -pub use local_interface::LocalNodeCommsInterface; +pub use local_interface::{BlockEventReceiver, BlockEventSender, LocalNodeCommsInterface}; pub use outbound_interface::OutboundNodeCommsInterface; diff --git a/base_layer/core/src/base_node/service/initializer.rs b/base_layer/core/src/base_node/service/initializer.rs index d2aed71668..71f9447fe7 100644 --- a/base_layer/core/src/base_node/service/initializer.rs +++ b/base_layer/core/src/base_node/service/initializer.rs @@ -35,7 +35,6 @@ use crate::{ use futures::{channel::mpsc::unbounded as futures_mpsc_channel_unbounded, future, Future, Stream, StreamExt}; use log::*; use std::{convert::TryFrom, sync::Arc}; -use tari_broadcast_channel::bounded; use tari_comms_dht::outbound::OutboundMessageRequester; use tari_p2p::{ comms_connector::PeerMessage, @@ -51,7 +50,7 @@ use tari_service_framework::{ ServiceInitializer, }; use tari_shutdown::ShutdownSignal; -use tokio::runtime; +use tokio::{runtime, sync::broadcast}; const LOG_TARGET: &str = "c::bn::service::initializer"; @@ -166,14 +165,14 @@ where T: BlockchainBackend + 'static let (local_block_sender_service, local_block_stream) = reply_channel::unbounded(); let outbound_nci = OutboundNodeCommsInterface::new(outbound_request_sender_service, outbound_block_sender_service); - let (block_event_publisher, block_event_subscriber) = bounded(100); + let (block_event_sender, _) = broadcast::channel(50); let local_nci = LocalNodeCommsInterface::new( local_request_sender_service, local_block_sender_service, - block_event_subscriber, + block_event_sender.clone(), ); let inbound_nch = InboundNodeCommsHandlers::new( - block_event_publisher, + block_event_sender, self.blockchain_db.clone(), self.mempool.clone(), self.consensus_manager.clone(), diff --git a/base_layer/core/src/mempool/service/service.rs b/base_layer/core/src/mempool/service/service.rs index 615d263dba..be6a595f21 100644 --- a/base_layer/core/src/mempool/service/service.rs +++ b/base_layer/core/src/mempool/service/service.rs @@ -21,7 +21,12 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - base_node::{comms_interface::BlockEvent, generate_request_key, RequestKey, WaitingRequests}, + base_node::{ + comms_interface::{BlockEvent, BlockEventReceiver}, + generate_request_key, + RequestKey, + WaitingRequests, + }, chain_storage::BlockchainBackend, mempool::{ proto, @@ -47,8 +52,7 @@ use futures::{ }; use log::*; use rand::rngs::OsRng; -use std::{convert::TryInto, sync::Arc, time::Duration}; -use tari_broadcast_channel::Subscriber; +use std::{convert::TryInto, time::Duration}; use tari_comms::peer_manager::NodeId; use tari_comms_dht::{ domain_message::OutboundDomainMessage, @@ -70,7 +74,7 @@ pub struct MempoolStreams { inbound_response_stream: SInRes, inbound_transaction_stream: STxIn, local_request_stream: SLocalReq, - block_event_stream: Subscriber, + block_event_stream: BlockEventReceiver, } impl MempoolStreams @@ -88,7 +92,7 @@ where inbound_response_stream: SInRes, inbound_transaction_stream: STxIn, local_request_stream: SLocalReq, - block_event_stream: Subscriber, + block_event_stream: BlockEventReceiver, ) -> Self { Self { @@ -199,7 +203,9 @@ where B: BlockchainBackend + 'static // Block events from local Base Node. block_event = block_event_stream.select_next_some() => { - self.spawn_handle_block_event(block_event); + if let Ok(block_event) = block_event { + self.spawn_handle_block_event(block_event); + } }, // Timeout events for waiting requests @@ -322,7 +328,7 @@ where B: BlockchainBackend + 'static }); } - fn spawn_handle_block_event(&self, block_event: Arc) { + fn spawn_handle_block_event(&self, block_event: BlockEvent) { let inbound_handlers = self.inbound_handlers.clone(); task::spawn(async move { let _ = handle_block_event(inbound_handlers, &block_event).await.or_else(|err| { diff --git a/base_layer/core/tests/node_comms_interface.rs b/base_layer/core/tests/node_comms_interface.rs index 12a5d62259..182263d9b0 100644 --- a/base_layer/core/tests/node_comms_interface.rs +++ b/base_layer/core/tests/node_comms_interface.rs @@ -24,7 +24,6 @@ mod helpers; use futures::{channel::mpsc::unbounded as futures_mpsc_channel_unbounded, executor::block_on, StreamExt}; -use tari_broadcast_channel::bounded; use tari_comms::peer_manager::NodeId; use tari_core::{ base_node::{ @@ -46,6 +45,7 @@ use tari_core::{ use tari_crypto::tari_utilities::hash::Hashable; use tari_service_framework::{reply_channel, reply_channel::Receiver}; use tari_test_utils::runtime::test_async; +use tokio::sync::broadcast; async fn test_request_responder( receiver: &mut Receiver<(NodeCommsRequest, Option), Result>, @@ -91,12 +91,12 @@ fn inbound_get_metadata() { let network = Network::LocalNet; let consensus_manager = ConsensusManagerBuilder::new(network).build(); - let (block_event_publisher, _block_event_subscriber) = bounded(100); + let (block_event_sender, _) = broadcast::channel(50); let (request_sender, _) = reply_channel::unbounded(); let (block_sender, _) = futures_mpsc_channel_unbounded(); let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender.clone()); let inbound_nch = InboundNodeCommsHandlers::new( - block_event_publisher, + block_event_sender, store.clone(), mempool, consensus_manager, @@ -144,12 +144,12 @@ fn inbound_fetch_kernels() { let (mempool, store) = new_mempool(); let network = Network::LocalNet; let consensus_manager = ConsensusManagerBuilder::new(network).build(); - let (block_event_publisher, _block_event_subscriber) = bounded(100); + let (block_event_sender, _) = broadcast::channel(50); let (request_sender, _) = reply_channel::unbounded(); let (block_sender, _) = futures_mpsc_channel_unbounded(); let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); let inbound_nch = InboundNodeCommsHandlers::new( - block_event_publisher, + block_event_sender, store.clone(), mempool, consensus_manager, @@ -205,12 +205,12 @@ fn inbound_fetch_headers() { let consensus_manager = ConsensusManagerBuilder::new(network) .with_consensus_constants(consensus_constants) .build(); - let (block_event_publisher, _block_event_subscriber) = bounded(100); + let (block_event_sender, _) = broadcast::channel(50); let (request_sender, _) = reply_channel::unbounded(); let (block_sender, _) = futures_mpsc_channel_unbounded(); let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); let inbound_nch = InboundNodeCommsHandlers::new( - block_event_publisher, + block_event_sender, store.clone(), mempool, consensus_manager, @@ -258,7 +258,7 @@ fn outbound_fetch_utxos() { fn inbound_fetch_utxos() { let factories = CryptoFactories::default(); let (mempool, store) = new_mempool(); - let (block_event_publisher, _block_event_subscriber) = bounded(100); + let (block_event_sender, _) = broadcast::channel(50); let network = Network::LocalNet; let consensus_constants = network.create_consensus_constants(); let consensus_manager = ConsensusManagerBuilder::new(network) @@ -268,7 +268,7 @@ fn inbound_fetch_utxos() { let (block_sender, _) = futures_mpsc_channel_unbounded(); let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); let inbound_nch = InboundNodeCommsHandlers::new( - block_event_publisher, + block_event_sender, store.clone(), mempool, consensus_manager, @@ -320,7 +320,7 @@ fn outbound_fetch_blocks() { #[test] fn inbound_fetch_blocks() { let (mempool, store) = new_mempool(); - let (block_event_publisher, _block_event_subscriber) = bounded(100); + let (block_event_sender, _) = broadcast::channel(50); let network = Network::LocalNet; let consensus_constants = network.create_consensus_constants(); let consensus_manager = ConsensusManagerBuilder::new(network) @@ -330,7 +330,7 @@ fn inbound_fetch_blocks() { let (block_sender, _) = futures_mpsc_channel_unbounded(); let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); let inbound_nch = InboundNodeCommsHandlers::new( - block_event_publisher, + block_event_sender, store.clone(), mempool, consensus_manager, diff --git a/base_layer/core/tests/node_service.rs b/base_layer/core/tests/node_service.rs index dc1d3cf705..591c83d94a 100644 --- a/base_layer/core/tests/node_service.rs +++ b/base_layer/core/tests/node_service.rs @@ -459,17 +459,17 @@ fn propagate_and_forward_valid_block() { let (bob_block_event, carol_block_event, dan_block_event) = join!(bob_block_event_fut, carol_block_event_fut, dan_block_event_fut); - if let BlockEvent::Verified((received_block, _, _)) = &*bob_block_event.unwrap() { + if let Some(Ok(BlockEvent::Verified((received_block, _, _)))) = bob_block_event { assert_eq!(received_block.hash(), block1_hash); } else { panic!("Bob's node did not receive and validate the expected block"); } - if let BlockEvent::Verified((received_block, _block_add_result, _)) = &*carol_block_event.unwrap() { + if let Some(Ok(BlockEvent::Verified((received_block, _block_add_result, _)))) = carol_block_event { assert_eq!(received_block.hash(), block1_hash); } else { panic!("Carol's node did not receive and validate the expected block"); } - if let BlockEvent::Verified((received_block, _block_add_result, _)) = &*dan_block_event.unwrap() { + if let Some(Ok(BlockEvent::Verified((received_block, _block_add_result, _)))) = dan_block_event { assert_eq!(received_block.hash(), block1_hash); } else { panic!("Dan's node did not receive and validate the expected block"); @@ -569,12 +569,12 @@ fn propagate_and_forward_invalid_block() { let (bob_block_event, carol_block_event, dan_block_event) = join!(bob_block_event_fut, carol_block_event_fut, dan_block_event_fut); - if let BlockEvent::Invalid((received_block, _err, _)) = &*bob_block_event.unwrap() { + if let Some(Ok(BlockEvent::Invalid((received_block, _err, _)))) = bob_block_event { assert_eq!(received_block.hash(), block1_hash); } else { panic!("Bob's node should have detected an invalid block"); } - if let BlockEvent::Invalid((received_block, _err, _)) = &*carol_block_event.unwrap() { + if let Some(Ok(BlockEvent::Invalid((received_block, _err, _)))) = carol_block_event { assert_eq!(received_block.hash(), block1_hash); } else { panic!("Carol's node should have detected an invalid block"); @@ -739,6 +739,7 @@ fn local_submit_block() { BaseNodeBuilder::new(network).start(&mut runtime, temp_dir.path().to_str().unwrap()); let db = &node.blockchain_db; + let event_stream = node.local_nci.get_block_event_stream_fused(); let block0 = db.fetch_block(0).unwrap().block().clone(); let block1 = db .calculate_mmr_roots(chain_block(&block0, vec![], &consensus_manager.consensus_constants())) @@ -750,12 +751,10 @@ fn local_submit_block() { .await .is_ok()); - let event_stream = node.local_nci.get_block_event_stream_fused(); let event = event_stream_next(event_stream, Duration::from_millis(20000)).await; - - if let BlockEvent::Verified((received_block, result, _)) = &*event.unwrap() { + if let Some(Ok(BlockEvent::Verified((received_block, result, _)))) = event { assert_eq!(received_block.hash(), block1.hash()); - assert_eq!(*result, BlockAddResult::Ok); + assert_eq!(result, BlockAddResult::Ok); } else { panic!("Block validation failed"); }