Skip to content

Commit

Permalink
Switched the BlockEvent stream to Tokio broadcast #1889
Browse files Browse the repository at this point in the history
Merge pull request #1889

Reduce the memory footprint for block event broadcast channel by using
a tokio broadcast channel.

- The tari_broadcast_channel is memory intensive for sharing BlockEvents as it keeps the full set of events up to the specified bound limit even when the receivers have already processed the events. Using a tokio broadcast channel will result in less memory usage as all fully received events will be discarded without reaching the bound limit.
- The BlockEvent stream currently uses the tari_broadcast_channel, this channel is bounded and does not clear when the receivers have read the events. As newly added and reorged blocks are distributed between services using this event stream, the channel can be memory intensive.
- These changes replaced the tari_broadcast_channel used for the BlockEvent stream with a tokio broadcast channel. The Tokio broadcast channel has the benefit of removing the events from the channel as soon as all the receivers have read the events. This should minimise the amount of memory required.

* pull/1889/head:
  - The tari_broadcast_channel is memory intensive for sharing BlockEvents as it keeps the full set of events up to the specified bound limit even when the receivers have already processed the events. Using a tokio broadcast channel will result in less memory usage as all fully received events will be discarded without reaching the bound limit. - The BlockEvent stream currently uses the tari_broadcast_channel, this channel is bounded and does not clear when the receivers have read the events. As newly added and reorged blocks are distributed between services using this event stream, the channel can be memory intensive. - These changes replaced the tari_broadcast_channel used for the BlockEvent stream with a tokio broadcast channel. The Tokio broadcast channel has the benefit of removing the events from the channel as soon as all the receivers have read the events. This should minimise the amount of memory required.
  • Loading branch information
sdbondi committed May 20, 2020
2 parents e262005 + b4d571a commit 702b51c
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 60 deletions.
23 changes: 13 additions & 10 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ChainMetadataService {
/// Run the service
pub async fn run(mut self) {
let mut liveness_event_stream = self.liveness.get_event_stream_fused();
let mut base_node_event_stream = self.base_node.get_block_event_stream_fused();
let mut block_event_stream = self.base_node.get_block_event_stream_fused();

log_if_error!(
target: LOG_TARGET,
Expand All @@ -80,13 +80,15 @@ impl ChainMetadataService {

loop {
futures::select! {
event = base_node_event_stream.select_next_some() => {
log_if_error!(
level: debug,
target: LOG_TARGET,
"Failed to handle base node event because '{}'",
self.handle_block_event(&event).await
);
block_event = block_event_stream.select_next_some() => {
if let Ok(block_event) = block_event {
log_if_error!(
level: debug,
target: LOG_TARGET,
"Failed to handle block event because '{}'",
self.handle_block_event(&block_event).await
);
}
},

liveness_event = liveness_event_stream.select_next_some() => {
Expand Down Expand Up @@ -265,15 +267,16 @@ mod test {
use tari_p2p::services::liveness::{mock::create_p2p_liveness_mock, LivenessRequest, PingPongEvent};
use tari_service_framework::reply_channel;
use tari_test_utils::{runtime, unpack_enum};
use tokio::sync::broadcast;

fn create_base_node_nci() -> (
LocalNodeCommsInterface,
reply_channel::Receiver<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
) {
let (base_node_sender, base_node_receiver) = reply_channel::unbounded();
let (block_sender, _block_receiver) = reply_channel::unbounded();
let (_base_node_publisher, subscriber) = broadcast_channel::bounded(1);
let base_node = LocalNodeCommsInterface::new(base_node_sender, block_sender, subscriber);
let (block_event_sender, _) = broadcast::channel(50);
let base_node = LocalNodeCommsInterface::new(base_node_sender, block_sender, block_event_sender);

(base_node, base_node_receiver)
}
Expand Down
26 changes: 13 additions & 13 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@

use crate::{
base_node::{
comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse},
comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
NodeCommsRequest,
NodeCommsResponse,
},
OutboundNodeCommsInterface,
},
blocks::{blockheader::BlockHeader, Block, NewBlockTemplate},
Expand All @@ -39,17 +44,14 @@ use crate::{
proof_of_work::{get_target_difficulty, Difficulty, PowAlgorithm},
transactions::transaction::{TransactionKernel, TransactionOutput},
};
use futures::SinkExt;
use log::*;
use std::{
fmt::{Display, Error, Formatter},
sync::Arc,
};
use strum_macros::Display;
use tari_broadcast_channel::Publisher;
use tari_comms::peer_manager::NodeId;
use tari_crypto::tari_utilities::{hash::Hashable, hex::Hex};
use tokio::sync::RwLock;

const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
const MAX_HEADERS_PER_RESPONSE: u32 = 100;
Expand Down Expand Up @@ -89,7 +91,7 @@ impl From<bool> for Broadcast {
pub struct InboundNodeCommsHandlers<T>
where T: BlockchainBackend + 'static
{
event_publisher: Arc<RwLock<Publisher<BlockEvent>>>,
block_event_sender: BlockEventSender,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
consensus_manager: ConsensusManager,
Expand All @@ -101,15 +103,15 @@ where T: BlockchainBackend + 'static
{
/// Construct a new InboundNodeCommsInterface.
pub fn new(
event_publisher: Publisher<BlockEvent>,
block_event_sender: BlockEventSender,
blockchain_db: BlockchainDatabase<T>,
mempool: Mempool<T>,
consensus_manager: ConsensusManager,
outbound_nci: OutboundNodeCommsInterface,
) -> Self
{
Self {
event_publisher: Arc::new(RwLock::new(event_publisher)),
block_event_sender,
blockchain_db,
mempool,
consensus_manager,
Expand Down Expand Up @@ -305,12 +307,10 @@ where T: BlockchainBackend + 'static
BlockEvent::Invalid((Box::new(block.clone()), e, *broadcast))
},
};
self.event_publisher
.write()
.await
.send(block_event)
.await
self.block_event_sender
.send(Arc::new(block_event))
.map_err(|_| CommsInterfaceError::EventStreamError)?;

// Propagate verified block to remote nodes
if let Ok(add_block_result) = add_block_result {
let propagate = match add_block_result {
Expand Down Expand Up @@ -365,7 +365,7 @@ where T: BlockchainBackend + 'static
fn clone(&self) -> Self {
// All members use Arc's internally so calling clone should be cheap.
Self {
event_publisher: self.event_publisher.clone(),
block_event_sender: self.block_event_sender.clone(),
blockchain_db: self.blockchain_db.clone(),
mempool: self.mempool.clone(),
consensus_manager: self.consensus_manager.clone(),
Expand Down
18 changes: 11 additions & 7 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,43 @@ use crate::{
proof_of_work::{Difficulty, PowAlgorithm},
};
use futures::{stream::Fuse, StreamExt};
use tari_broadcast_channel::Subscriber;
use std::sync::Arc;
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower_service::Service;

pub type BlockEventSender = broadcast::Sender<Arc<BlockEvent>>;
pub type BlockEventReceiver = broadcast::Receiver<Arc<BlockEvent>>;

/// The InboundNodeCommsInterface provides an interface to request information from the current local node by other
/// internal services.
#[derive(Clone)]
pub struct LocalNodeCommsInterface {
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<(), CommsInterfaceError>>,
block_event_stream: Subscriber<BlockEvent>,
block_event_sender: BlockEventSender,
}

impl LocalNodeCommsInterface {
/// Construct a new LocalNodeCommsInterface with the specified SenderService.
pub fn new(
request_sender: SenderService<NodeCommsRequest, Result<NodeCommsResponse, CommsInterfaceError>>,
block_sender: SenderService<(Block, Broadcast), Result<(), CommsInterfaceError>>,
block_event_stream: Subscriber<BlockEvent>,
block_event_sender: BlockEventSender,
) -> Self
{
Self {
request_sender,
block_sender,
block_event_stream,
block_event_sender,
}
}

pub fn get_block_event_stream(&self) -> Subscriber<BlockEvent> {
self.block_event_stream.clone()
pub fn get_block_event_stream(&self) -> BlockEventReceiver {
self.block_event_sender.subscribe()
}

pub fn get_block_event_stream_fused(&self) -> Fuse<Subscriber<BlockEvent>> {
pub fn get_block_event_stream_fused(&self) -> Fuse<BlockEventReceiver> {
self.get_block_event_stream().fuse()
}

Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/comms_interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ pub use comms_request::{MmrStateRequest, NodeCommsRequest};
pub use comms_response::NodeCommsResponse;
pub use error::CommsInterfaceError;
pub use inbound_handlers::{BlockEvent, Broadcast, InboundNodeCommsHandlers};
pub use local_interface::LocalNodeCommsInterface;
pub use local_interface::{BlockEventReceiver, BlockEventSender, LocalNodeCommsInterface};
pub use outbound_interface::OutboundNodeCommsInterface;
9 changes: 4 additions & 5 deletions base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::{
use futures::{channel::mpsc::unbounded as futures_mpsc_channel_unbounded, future, Future, Stream, StreamExt};
use log::*;
use std::{convert::TryFrom, sync::Arc};
use tari_broadcast_channel::bounded;
use tari_comms_dht::outbound::OutboundMessageRequester;
use tari_p2p::{
comms_connector::PeerMessage,
Expand All @@ -51,7 +50,7 @@ use tari_service_framework::{
ServiceInitializer,
};
use tari_shutdown::ShutdownSignal;
use tokio::runtime;
use tokio::{runtime, sync::broadcast};

const LOG_TARGET: &str = "c::bn::service::initializer";

Expand Down Expand Up @@ -166,14 +165,14 @@ where T: BlockchainBackend + 'static
let (local_block_sender_service, local_block_stream) = reply_channel::unbounded();
let outbound_nci =
OutboundNodeCommsInterface::new(outbound_request_sender_service, outbound_block_sender_service);
let (block_event_publisher, block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let local_nci = LocalNodeCommsInterface::new(
local_request_sender_service,
local_block_sender_service,
block_event_subscriber,
block_event_sender.clone(),
);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
self.blockchain_db.clone(),
self.mempool.clone(),
self.consensus_manager.clone(),
Expand Down
16 changes: 11 additions & 5 deletions base_layer/core/src/mempool/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
base_node::{comms_interface::BlockEvent, generate_request_key, RequestKey, WaitingRequests},
base_node::{
comms_interface::{BlockEvent, BlockEventReceiver},
generate_request_key,
RequestKey,
WaitingRequests,
},
chain_storage::BlockchainBackend,
mempool::{
proto,
Expand All @@ -48,7 +53,6 @@ use futures::{
use log::*;
use rand::rngs::OsRng;
use std::{convert::TryInto, sync::Arc, time::Duration};
use tari_broadcast_channel::Subscriber;
use tari_comms::peer_manager::NodeId;
use tari_comms_dht::{
domain_message::OutboundDomainMessage,
Expand All @@ -70,7 +74,7 @@ pub struct MempoolStreams<SOutReq, SInReq, SInRes, STxIn, SLocalReq> {
inbound_response_stream: SInRes,
inbound_transaction_stream: STxIn,
local_request_stream: SLocalReq,
block_event_stream: Subscriber<BlockEvent>,
block_event_stream: BlockEventReceiver,
}

impl<SOutReq, SInReq, SInRes, STxIn, SLocalReq> MempoolStreams<SOutReq, SInReq, SInRes, STxIn, SLocalReq>
Expand All @@ -88,7 +92,7 @@ where
inbound_response_stream: SInRes,
inbound_transaction_stream: STxIn,
local_request_stream: SLocalReq,
block_event_stream: Subscriber<BlockEvent>,
block_event_stream: BlockEventReceiver,
) -> Self
{
Self {
Expand Down Expand Up @@ -199,7 +203,9 @@ where B: BlockchainBackend + 'static

// Block events from local Base Node.
block_event = block_event_stream.select_next_some() => {
self.spawn_handle_block_event(block_event);
if let Ok(block_event) = block_event {
self.spawn_handle_block_event(block_event);
}
},

// Timeout events for waiting requests
Expand Down
22 changes: 11 additions & 11 deletions base_layer/core/tests/node_comms_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
mod helpers;

use futures::{channel::mpsc::unbounded as futures_mpsc_channel_unbounded, executor::block_on, StreamExt};
use tari_broadcast_channel::bounded;
use tari_comms::peer_manager::NodeId;
use tari_core::{
base_node::{
Expand All @@ -46,6 +45,7 @@ use tari_core::{
use tari_crypto::tari_utilities::hash::Hashable;
use tari_service_framework::{reply_channel, reply_channel::Receiver};
use tari_test_utils::runtime::test_async;
use tokio::sync::broadcast;

async fn test_request_responder(
receiver: &mut Receiver<(NodeCommsRequest, Option<NodeId>), Result<NodeCommsResponse, CommsInterfaceError>>,
Expand Down Expand Up @@ -91,12 +91,12 @@ fn inbound_get_metadata() {

let network = Network::LocalNet;
let consensus_manager = ConsensusManagerBuilder::new(network).build();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let (request_sender, _) = reply_channel::unbounded();
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender.clone());
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -144,12 +144,12 @@ fn inbound_fetch_kernels() {
let (mempool, store) = new_mempool();
let network = Network::LocalNet;
let consensus_manager = ConsensusManagerBuilder::new(network).build();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let (request_sender, _) = reply_channel::unbounded();
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -205,12 +205,12 @@ fn inbound_fetch_headers() {
let consensus_manager = ConsensusManagerBuilder::new(network)
.with_consensus_constants(consensus_constants)
.build();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let (request_sender, _) = reply_channel::unbounded();
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -258,7 +258,7 @@ fn outbound_fetch_utxos() {
fn inbound_fetch_utxos() {
let factories = CryptoFactories::default();
let (mempool, store) = new_mempool();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let network = Network::LocalNet;
let consensus_constants = network.create_consensus_constants();
let consensus_manager = ConsensusManagerBuilder::new(network)
Expand All @@ -268,7 +268,7 @@ fn inbound_fetch_utxos() {
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down Expand Up @@ -320,7 +320,7 @@ fn outbound_fetch_blocks() {
#[test]
fn inbound_fetch_blocks() {
let (mempool, store) = new_mempool();
let (block_event_publisher, _block_event_subscriber) = bounded(100);
let (block_event_sender, _) = broadcast::channel(50);
let network = Network::LocalNet;
let consensus_constants = network.create_consensus_constants();
let consensus_manager = ConsensusManagerBuilder::new(network)
Expand All @@ -330,7 +330,7 @@ fn inbound_fetch_blocks() {
let (block_sender, _) = futures_mpsc_channel_unbounded();
let outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_publisher,
block_event_sender,
store.clone(),
mempool,
consensus_manager,
Expand Down
Loading

0 comments on commit 702b51c

Please sign in to comment.