Skip to content

Commit

Permalink
Made messaging pipeline optional in CommsBuilder
Browse files Browse the repository at this point in the history
Messaging pipeline is optional. If a MessageingPipeline is not specified
in the builder, the messaging protocol, associated channels and tokio tasks
will never be initialized/spawned. This allows a more light-weight comms to be
initialized in tests that do not require the messaging pipeline (such as
integration tests for components using e.g. RPC only) without the added
cruft of setting up a _no-op_ messaging pipeline.
  • Loading branch information
sdbondi committed Jun 17, 2020
1 parent aced32c commit 3663513
Show file tree
Hide file tree
Showing 18 changed files with 152 additions and 162 deletions.
4 changes: 2 additions & 2 deletions comms/src/bounded_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::runtime::current_executor;
use crate::runtime::current;
use std::{future::Future, sync::Arc};
use tokio::{runtime, sync::Semaphore, task::JoinHandle};

Expand All @@ -42,7 +42,7 @@ impl BoundedExecutor {
}

pub fn from_current(num_permits: usize) -> Self {
Self::new(current_executor(), num_permits)
Self::new(current(), num_permits)
}

/// Spawn a future onto the Tokio runtime asynchronously blocking if there are too many
Expand Down
129 changes: 79 additions & 50 deletions comms/src/builder/comms_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ use super::{placeholder::PlaceholderService, CommsBuilderError, CommsShutdown};
use crate::{
backoff::BoxedBackoff,
bounded_executor::BoundedExecutor,
builder::consts,
connection_manager::{ConnectionManager, ConnectionManagerEvent, ConnectionManagerRequester},
connectivity::{ConnectivityManager, ConnectivityRequester},
message::InboundMessage,
multiaddr::Multiaddr,
multiplexing::Substream,
peer_manager::{NodeIdentity, PeerManager},
pipeline,
protocol::{messaging, messaging::MessagingProtocol},
protocol::{messaging, messaging::MessagingProtocol, ProtocolNotifier, Protocols},
runtime,
runtime::task,
tor,
transports::Transport,
};
Expand All @@ -58,13 +61,10 @@ pub struct BuiltCommsNode<
pub connectivity_requester: ConnectivityRequester,
pub messaging_pipeline: Option<pipeline::Config<TInPipe, TOutPipe, TOutReq>>,
pub node_identity: Arc<NodeIdentity>,
pub messaging: MessagingProtocol,
pub messaging_event_tx: messaging::MessagingEventSender,
pub inbound_message_rx: mpsc::Receiver<InboundMessage>,
pub hidden_service: Option<tor::HiddenService>,
pub messaging_request_tx: mpsc::Sender<messaging::MessagingRequest>,
pub shutdown: Shutdown,
pub peer_manager: Arc<PeerManager>,
pub protocols: Protocols<Substream>,
pub shutdown: Shutdown,
}

impl<TTransport, TInPipe, TOutPipe, TOutReq> BuiltCommsNode<TTransport, TInPipe, TOutPipe, TOutReq>
Expand Down Expand Up @@ -100,11 +100,8 @@ where
connectivity_manager: self.connectivity_manager,
connectivity_requester: self.connectivity_requester,
node_identity: self.node_identity,
messaging: self.messaging,
messaging_event_tx: self.messaging_event_tx,
inbound_message_rx: self.inbound_message_rx,
shutdown: self.shutdown,
messaging_request_tx: self.messaging_request_tx,
protocols: self.protocols,
hidden_service: self.hidden_service,
peer_manager: self.peer_manager,
}
Expand All @@ -131,19 +128,16 @@ where

pub async fn spawn(self) -> Result<CommsNode, CommsBuilderError> {
let BuiltCommsNode {
connection_manager,
mut connection_manager,
connection_manager_requester,
connection_manager_event_tx,
connectivity_manager,
connectivity_requester,
messaging_pipeline,
messaging_request_tx,
inbound_message_rx,
node_identity,
shutdown,
peer_manager,
messaging,
messaging_event_tx,
mut protocols,
hidden_service,
} = self;

Expand All @@ -163,34 +157,48 @@ where
"Your node's public address is '{}'",
node_identity.public_address()
);
let messaging_pipeline = messaging_pipeline.ok_or(CommsBuilderError::MessagingPiplineNotProvided)?;

let mut complete_signals = Vec::new();
let events_stream = connection_manager_event_tx.subscribe();
let conn_man_shutdown_signal = connection_manager.complete_signal();

let executor = runtime::current_executor();
complete_signals.push(connection_manager.complete_signal());

// Connectivity manager
executor.spawn(connectivity_manager.create().run());
executor.spawn(connection_manager.run());

// Spawn messaging protocol
let messaging_signal = messaging.complete_signal();
executor.spawn(messaging.run());

// Spawn inbound pipeline
let bounded_executor = BoundedExecutor::new(executor.clone(), messaging_pipeline.max_concurrent_inbound_tasks);
let inbound = pipeline::Inbound::new(
bounded_executor,
inbound_message_rx,
messaging_pipeline.inbound,
shutdown.to_signal(),
);
executor.spawn(inbound.run());
task::spawn(connectivity_manager.create().run());

let mut messaging_event_tx = None;
if let Some(messaging_pipeline) = messaging_pipeline {
let (messaging, notifier, messaging_request_tx, inbound_message_rx, messaging_event_sender) =
initialize_messaging(
node_identity.clone(),
peer_manager.clone(),
connection_manager_requester.clone(),
shutdown.to_signal(),
);
messaging_event_tx = Some(messaging_event_sender);
protocols.add(&[messaging::MESSAGING_PROTOCOL.clone()], notifier);
// Spawn messaging protocol
complete_signals.push(messaging.complete_signal());
task::spawn(messaging.run());

// Spawn inbound pipeline
let bounded_executor =
BoundedExecutor::new(runtime::current(), messaging_pipeline.max_concurrent_inbound_tasks);
let inbound = pipeline::Inbound::new(
bounded_executor,
inbound_message_rx,
messaging_pipeline.inbound,
shutdown.to_signal(),
);
task::spawn(inbound.run());

// Spawn outbound pipeline
let outbound =
pipeline::Outbound::new(runtime::current(), messaging_pipeline.outbound, messaging_request_tx);
task::spawn(outbound.run());
}

// Spawn outbound pipeline
let outbound = pipeline::Outbound::new(executor.clone(), messaging_pipeline.outbound, messaging_request_tx);
executor.spawn(outbound.run());
connection_manager.set_protocols(protocols);
task::spawn(connection_manager.run());

let listening_addr = Self::wait_listening(events_stream).await?;

Expand All @@ -202,9 +210,9 @@ where
listening_addr,
node_identity,
peer_manager,
messaging_event_tx,
messaging_event_tx: messaging_event_tx.unwrap_or_else(|| broadcast::channel(1).0),
hidden_service,
complete_signals: vec![conn_man_shutdown_signal, messaging_signal],
complete_signals,
})
}

Expand All @@ -218,11 +226,6 @@ where
Arc::clone(&self.node_identity)
}

/// Return a subscription to OMS events. This will emit events sent _after_ this subscription was created.
pub fn subscribe_messaging_events(&self) -> messaging::MessagingEventReceiver {
self.messaging_event_tx.subscribe()
}

/// Return an owned copy of a ConnectionManagerRequester. Used to initiate connections to peers.
pub fn connection_manager_requester(&self) -> ConnectionManagerRequester {
self.connection_manager_requester.clone()
Expand Down Expand Up @@ -298,11 +301,6 @@ impl CommsNode {
self.messaging_event_tx.subscribe()
}

/// Return a clone of the of the messaging event Sender to allow for other services to create subscriptions
pub fn message_event_sender(&self) -> messaging::MessagingEventSender {
self.messaging_event_tx.clone()
}

/// Return an owned copy of a ConnectionManagerRequester. Used to initiate connections to peers.
pub fn connection_manager(&self) -> ConnectionManagerRequester {
self.connection_manager_requester.clone()
Expand All @@ -325,3 +323,34 @@ impl CommsNode {
CommsShutdown::new(self.complete_signals)
}
}

fn initialize_messaging(
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
connection_manager_requester: ConnectionManagerRequester,
shutdown_signal: ShutdownSignal,
) -> (
messaging::MessagingProtocol,
ProtocolNotifier<Substream>,
mpsc::Sender<messaging::MessagingRequest>,
mpsc::Receiver<InboundMessage>,
messaging::MessagingEventSender,
)
{
let (proto_tx, proto_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
let (messaging_request_tx, messaging_request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE);
let (event_tx, _) = broadcast::channel(consts::MESSAGING_EVENTS_BUFFER_SIZE);
let messaging = MessagingProtocol::new(
connection_manager_requester,
peer_manager,
node_identity,
proto_rx,
messaging_request_rx,
event_tx.clone(),
inbound_message_tx,
shutdown_signal,
);

(messaging, proto_tx, messaging_request_tx, inbound_message_rx, event_tx)
}
5 changes: 0 additions & 5 deletions comms/src/builder/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ pub enum CommsBuilderError {
NodeIdentityNotSet,
#[error("The PeerStorage was not provided to the CommsBuilder. Use `with_peer_storage` to set it.")]
PeerStorageNotProvided,
#[error(
"The messaging pipeline was not provided to the CommsBuilder. Use `with_messaging_pipeline` to set it's \
pipeline."
)]
MessagingPiplineNotProvided,
#[error("Unable to receive a ConnectionManagerEvent within timeout")]
ConnectionManagerEventStreamTimeout,
#[error("ConnectionManagerEvent stream unexpectedly closed")]
Expand Down
56 changes: 3 additions & 53 deletions comms/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ use crate::{
ConnectivityRequest,
ConnectivityRequester,
},
message::InboundMessage,
multiaddr::Multiaddr,
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeIdentity, PeerManager},
protocol::{messaging, messaging::MessagingProtocol, ProtocolNotification, Protocols},
protocol::Protocols,
tor,
transports::{SocksTransport, TcpTransport, Transport},
types::CommsDatabase,
Expand Down Expand Up @@ -249,37 +248,6 @@ where
self
}

fn make_messaging(
&self,
conn_man_requester: ConnectionManagerRequester,
peer_manager: Arc<PeerManager>,
node_identity: Arc<NodeIdentity>,
) -> (
messaging::MessagingProtocol,
mpsc::Sender<ProtocolNotification<Substream>>,
mpsc::Sender<messaging::MessagingRequest>,
mpsc::Receiver<InboundMessage>,
messaging::MessagingEventSender,
)
{
let (proto_tx, proto_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
let (messaging_request_tx, messaging_request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE);
let (event_tx, _) = broadcast::channel(consts::MESSAGING_EVENTS_BUFFER_SIZE);
let messaging = MessagingProtocol::new(
conn_man_requester,
peer_manager,
node_identity,
proto_rx,
messaging_request_rx,
event_tx.clone(),
inbound_message_tx,
self.shutdown.to_signal(),
);

(messaging, proto_tx, messaging_request_tx, inbound_message_rx, event_tx)
}

fn make_peer_manager(&mut self) -> Result<Arc<PeerManager>, CommsBuilderError> {
match self.peer_storage.take() {
Some(storage) => {
Expand All @@ -294,7 +262,6 @@ where
&mut self,
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
protocols: Protocols<Substream>,
request_rx: mpsc::Receiver<ConnectionManagerRequest>,
connection_manager_events_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
) -> ConnectionManager<TTransport, BoxedBackoff>
Expand All @@ -311,7 +278,6 @@ where
request_rx,
node_identity,
peer_manager,
protocols,
connection_manager_events_tx,
self.shutdown.to_signal(),
)
Expand Down Expand Up @@ -352,20 +318,8 @@ where
let connection_manager_requester =
ConnectionManagerRequester::new(conn_man_tx, connection_manager_event_tx.clone());

let (messaging, messaging_proto_tx, messaging_request_tx, inbound_message_rx, messaging_event_tx) = self
.make_messaging(
connection_manager_requester.clone(),
peer_manager.clone(),
node_identity.clone(),
);

//---------------------------------- Protocols --------------------------------------------//
let protocols = self
.protocols
.take()
.or_else(|| Some(Protocols::new()))
.map(move |protocols| protocols.add(&[messaging::MESSAGING_PROTOCOL.clone()], messaging_proto_tx))
.expect("cannot fail");
let protocols = self.protocols.take().unwrap_or_default();

//---------------------------------- ConnectivityManager --------------------------------------------//

Expand All @@ -383,7 +337,6 @@ where
let connection_manager = self.make_connection_manager(
node_identity.clone(),
peer_manager.clone(),
protocols,
conn_man_rx,
connection_manager_event_tx.clone(),
);
Expand All @@ -394,13 +347,10 @@ where
connection_manager_event_tx,
connectivity_manager,
connectivity_requester,
messaging_request_tx,
messaging_pipeline: None,
messaging,
messaging_event_tx,
inbound_message_rx,
node_identity,
peer_manager,
protocols,
hidden_service: self.hidden_service,
shutdown: self.shutdown,
})
Expand Down
10 changes: 6 additions & 4 deletions comms/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn spawn_node(
let comms_node = CommsBuilder::new()
// These calls are just to get rid of unused function warnings.
// <IrrelevantCalls>
.with_executor(runtime::current_executor())
.with_executor(runtime::current())
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.on_shutdown(|| {})
// </IrrelevantCalls>
Expand Down Expand Up @@ -103,12 +103,14 @@ async fn peer_to_peer_custom_protocols() {
// Setup test protocols
let (test_sender, _test_protocol_rx1) = mpsc::channel(10);
let (another_test_sender, mut another_test_protocol_rx1) = mpsc::channel(10);
let protocols1 = Protocols::new()
let mut protocols1 = Protocols::new();
protocols1
.add(&[TEST_PROTOCOL], test_sender)
.add(&[ANOTHER_TEST_PROTOCOL], another_test_sender);
let (test_sender, mut test_protocol_rx2) = mpsc::channel(10);
let (another_test_sender, _another_test_protocol_rx2) = mpsc::channel(10);
let protocols2 = Protocols::new()
let mut protocols2 = Protocols::new();
protocols2
.add(&[TEST_PROTOCOL], test_sender)
.add(&[ANOTHER_TEST_PROTOCOL], another_test_sender);

Expand Down Expand Up @@ -288,7 +290,7 @@ async fn peer_to_peer_messaging_simultaneous() {
.unwrap();

// Simultaneously send messages between the two nodes
let rt_handle = runtime::current_executor();
let rt_handle = runtime::current();
let handle1 = rt_handle.spawn(async move {
for i in 0..NUM_MSGS {
let outbound_msg = OutboundMessage::new(
Expand Down
Loading

0 comments on commit 3663513

Please sign in to comment.