Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switched the BlockEvent stream to Tokio broadcast #1889

Merged
merged 1 commit into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ChainMetadataService {
/// Run the service
pub async fn run(mut self) {
let mut liveness_event_stream = self.liveness.get_event_stream_fused();
let mut base_node_event_stream = self.base_node.get_block_event_stream_fused();
let mut block_event_stream = self.base_node.get_block_event_stream_fused();

log_if_error!(
target: LOG_TARGET,
Expand All @@ -80,13 +80,15 @@ impl ChainMetadataService {

loop {
futures::select! {
event = base_node_event_stream.select_next_some() => {
log_if_error!(
level: debug,
target: LOG_TARGET,
"Failed to handle base node event because '{}'",
self.handle_block_event(&event).await
);
block_event = block_event_stream.select_next_some() => {
if let Ok(block_event) = block_event {
log_if_error!(
level: debug,
target: LOG_TARGET,
"Failed to handle block event because '{}'",
self.handle_block_event(&block_event).await
);
}
},

liveness_event = liveness_event_stream.select_next_some() => {
Expand Down Expand Up @@ -265,15 +267,16 @@ mod test {
use tari_p2p::services::liveness::{mock::create_p2p_liveness_mock, LivenessRequest, PingPongEvent};
use tari_service_framework::reply_channel;
use tari_test_utils::{runtime, unpack_enum};
use tokio::sync::broadcast;

fn create_base_node_nci() -> (
LocalNodeCommsInterface,
reply_channel::Receiver<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
) {
let (base_node_sender, base_node_receiver) = reply_channel::unbounded();
let (block_sender, _block_receiver) = reply_channel::unbounded();
let (_base_node_publisher, subscriber) = broadcast_channel::bounded(1);
let base_node = LocalNodeCommsInterface::new(base_node_sender, block_sender, subscriber);
let (block_event_sender, _) = broadcast::channel(50);
let base_node = LocalNodeCommsInterface::new(base_node_sender, block_sender, block_event_sender);

(base_node, base_node_receiver)
}
Expand Down
26 changes: 13 additions & 13 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@

use crate::{
base_node::{
comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse},
comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
NodeCommsRequest,
NodeCommsResponse,
},
OutboundNodeCommsInterface,
},
blocks::{blockheader::BlockHeader, Block, NewBlockTemplate},
Expand All @@ -39,17 +44,14 @@ use crate::{
proof_of_work::{get_target_difficulty, Difficulty, PowAlgorithm},
transactions::transaction::{TransactionKernel, TransactionOutput},
};
use futures::SinkExt;
use log::*;
use std::{
fmt::{Display, Error, Formatter},
sync::Arc,
};
use strum_macros::Display;
use tari_broadcast_channel::Publisher;
use tari_comms::peer_manager::NodeId;
use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex};
use tokio::sync::RwLock;

const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
const MAX_HEADERS_PER_RESPONSE: u32 = 100;
Expand Down Expand Up @@ -89,7 +91,7 @@ impl From<bool> for Broadcast {
pub struct InboundNodeCommsHandlers<T>
where T: BlockchainBackend + 'static
{
event_publisher: Arc<RwLock<Publisher<BlockEvent>>>,
block_event_sender: BlockEventSender,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
consensus_manager: ConsensusManager,
Expand All @@ -101,15 +103,15 @@ where T: BlockchainBackend + 'static
{
/// Construct a new InboundNodeCommsInterface.
pub fn new(
event_publisher: Publisher<BlockEvent>,
block_event_sender: BlockEventSender,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
consensus_manager: ConsensusManager,
outbound_nci: OutboundNodeCommsInterface,
) -> Self
{
Self {
event_publisher: Arc::new(RwLock::new(event_publisher)),
block_event_sender,
blockchain_db,
mempool,
consensus_manager,
Expand Down Expand Up @@ -305,12 +307,10 @@ where T: BlockchainBackend + 'static
BlockEvent::Invalid((Box::new(block.clone()), e, *broadcast))
},
};
self.event_publisher
.write()
.await
.send(block_event)
.await
self.block_event_sender
.send(Arc::new(block_event))
.map_err(|_| CommsInterfaceError::EventStreamError)?;

// Propagate verified block to remote nodes
if let Ok(add_block_result) = add_block_result {
let propagate = match add_block_result {
Expand Down Expand Up @@ -365,7 +365,7 @@ where T: BlockchainBackend + 'static
fn clone(&self) -> Self {
// All members use Arc's internally so calling clone should be cheap.
Self {
event_publisher: self.event_publisher.clone(),
block_event_sender: self.block_event_sender.clone(),
blockchain_db: self.blockchain_db.clone(),
mempool: self.mempool.clone(),
consensus_manager: self.consensus_manager.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,43 @@ use crate::{
proof_of_work::{Difficulty, PowAlgorithm},
};
use futures::{stream::Fuse, StreamExt};
use tari_broadcast_channel::Subscriber;
use std::sync::Arc;
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower_service::Service;

pub type BlockEventSender = broadcast::Sender<Arc<BlockEvent>>;
pub type BlockEventReceiver = broadcast::Receiver<Arc<BlockEvent>>;

/// The InboundNodeCommsInterface provides an interface to request information from the current local node by other
/// internal services.
#[derive(Clone)]
pub struct LocalNodeCommsInterface {
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<(), CommsInterfaceError>>,
block_event_stream: Subscriber<BlockEvent>,
block_event_sender: BlockEventSender,
}

impl LocalNodeCommsInterface {
/// Construct a new LocalNodeCommsInterface with the specified SenderService.
pub fn new(
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<(), CommsInterfaceError>>,
block_event_stream: Subscriber<BlockEvent>,
block_event_sender: BlockEventSender,
) -> Self
{
Self {
request_sender,
block_sender,
block_event_stream,
block_event_sender,
}
}

pub fn get_block_event_stream(&self) -> Subscriber<BlockEvent> {
self.block_event_stream.clone()
pub fn get_block_event_stream(&self) -> BlockEventReceiver {
self.block_event_sender.subscribe()
}

pub fn get_block_event_stream_fused(&self) -> Fuse<Subscriber<BlockEvent>> {
pub fn get_block_event_stream_fused(&self) -> Fuse<BlockEventReceiver> {
self.get_block_event_stream().fuse()
}

Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/comms_interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ pub use comms_request::{MmrStateRequest, NodeCommsRequest};
pub use comms_response::NodeCommsResponse;
pub use error::CommsInterfaceError;
pub use inbound_handlers::{BlockEvent, Broadcast, InboundNodeCommsHandlers};
pub use local_interface::LocalNodeCommsInterface;
pub use local_interface::{BlockEventReceiver, BlockEventSender, LocalNodeCommsInterface};
pub use outbound_interface::OutboundNodeCommsInterface;
9 changes: 4 additions & 5 deletions base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::{
use futures::{channel::mpsc::unbounded as futures_mpsc_channel_unbounded, future, Future, Stream, StreamExt};
use log::*;
use std::{convert::TryFrom, sync::Arc};
use tari_broadcast_channel::bounded;
use tari_comms_dht::outbound::OutboundMessageRequester;
use tari_p2p::{
comms_connector::PeerMessage,
Expand All @@ -51,7 +50,7 @@ use tari_service_framework::{
ServiceInitializer,
};
use tari_shutdown::ShutdownSignal;
use tokio::runtime;
use tokio::{runtime, sync::broadcast};

const LOG_TARGET: &str = "c::bn::service::initializer";

Expand Down Expand Up @@ -166,14 +165,14 @@ where T: BlockchainBackend + 'static
let (local_block_sender_service, local_block_stream) = reply_channel::unbounded();
let outbound_nci =
OutboundNodeCommsInterface::new(outbound_request_sender_service, outbound_block_sender_service);
let (block_event_publisher, block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let local_nci = LocalNodeCommsInterface::new(
local_request_sender_service,
local_block_sender_service,
block_event_subscriber,
block_event_sender.clone(),
);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
self.blockchain_db.clone(),
self.mempool.clone(),
self.consensus_manager.clone(),
Expand Down
16 changes: 11 additions & 5 deletions base_layer/core/src/mempool/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
base_node::{comms_interface::BlockEvent, generate_request_key, RequestKey, WaitingRequests},
base_node::{
comms_interface::{BlockEvent, BlockEventReceiver},
generate_request_key,
RequestKey,
WaitingRequests,
},
chain_storage::BlockchainBackend,
mempool::{
proto,
Expand All @@ -48,7 +53,6 @@ use futures::{
use log::*;
use rand::rngs::OsRng;
use std::{convert::TryInto, sync::Arc, time::Duration};
use tari_broadcast_channel::Subscriber;
use tari_comms::peer_manager::NodeId;
use tari_comms_dht::{
domain_message::OutboundDomainMessage,
Expand All @@ -70,7 +74,7 @@ pub struct MempoolStreams<SOutReq, SInReq, SInRes, STxIn, SLocalReq> {
inbound_response_stream: SInRes,
inbound_transaction_stream: STxIn,
local_request_stream: SLocalReq,
block_event_stream: Subscriber<BlockEvent>,
block_event_stream: BlockEventReceiver,
}

impl<SOutReq, SInReq, SInRes, STxIn, SLocalReq> MempoolStreams<SOutReq, SInReq, SInRes, STxIn, SLocalReq>
Expand All @@ -88,7 +92,7 @@ where
inbound_response_stream: SInRes,
inbound_transaction_stream: STxIn,
local_request_stream: SLocalReq,
block_event_stream: Subscriber<BlockEvent>,
block_event_stream: BlockEventReceiver,
) -> Self
{
Self {
Expand Down Expand Up @@ -199,7 +203,9 @@ where B: BlockchainBackend + 'static

// Block events from local Base Node.
block_event = block_event_stream.select_next_some() => {
self.spawn_handle_block_event(block_event);
if let Ok(block_event) = block_event {
self.spawn_handle_block_event(block_event);
}
},

// Timeout events for waiting requests
Expand Down
22 changes: 11 additions & 11 deletions base_layer/core/tests/node_comms_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
mod helpers;

use futures::{channel::mpsc::unbounded as futures_mpsc_channel_unbounded, executor::block_on, StreamExt};
use tari_broadcast_channel::bounded;
use tari_comms::peer_manager::NodeId;
use tari_core::{
base_node::{
Expand All @@ -46,6 +45,7 @@ use tari_core::{
use tari_crypto::tari_utilities::hash::Hashable;
use tari_service_framework::{reply_channel, reply_channel::Receiver};
use tari_test_utils::runtime::test_async;
use tokio::sync::broadcast;

async fn test_request_responder(
receiver: &mut Receiver<(NodeCommsRequest, Option<NodeId>), Result<NodeCommsResponse, CommsInterfaceError>>,
Expand Down Expand Up @@ -91,12 +91,12 @@ fn inbound_get_metadata() {

let network = Network::LocalNet;
let consensus_manager = ConsensusManagerBuilder::new(network).build();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let (request_sender, _) = reply_channel::unbounded();
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender.clone());
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -144,12 +144,12 @@ fn inbound_fetch_kernels() {
let (mempool, store) = new_mempool();
let network = Network::LocalNet;
let consensus_manager = ConsensusManagerBuilder::new(network).build();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let (request_sender, _) = reply_channel::unbounded();
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -205,12 +205,12 @@ fn inbound_fetch_headers() {
let consensus_manager = ConsensusManagerBuilder::new(network)
.with_consensus_constants(consensus_constants)
.build();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let (request_sender, _) = reply_channel::unbounded();
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -258,7 +258,7 @@ fn outbound_fetch_utxos() {
fn inbound_fetch_utxos() {
let factories = CryptoFactories::default();
let (mempool, store) = new_mempool();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let network = Network::LocalNet;
let consensus_constants = network.create_consensus_constants();
let consensus_manager = ConsensusManagerBuilder::new(network)
Expand All @@ -268,7 +268,7 @@ fn inbound_fetch_utxos() {
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -320,7 +320,7 @@ fn outbound_fetch_blocks() {
#[test]
fn inbound_fetch_blocks() {
let (mempool, store) = new_mempool();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let network = Network::LocalNet;
let consensus_constants = network.create_consensus_constants();
let consensus_manager = ConsensusManagerBuilder::new(network)
Expand All @@ -330,7 +330,7 @@ fn inbound_fetch_blocks() {
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down
Loading