Skip to content

Commit

Permalink
Made messaging pipeline optional in CommsBuilder (#2001)
Browse files Browse the repository at this point in the history
Merge pull request #2001

Made messaging pipeline optional in CommsBuilder
  • Loading branch information
CjS77 committed Jun 18, 2020
2 parents 8fac55d + 3663513 commit b02295f
Show file tree
Hide file tree
Showing 18 changed files with 152 additions and 162 deletions.
4 changes: 2 additions & 2 deletions comms/src/bounded_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::runtime::current_executor;
use crate::runtime::current;
use std::{future::Future, sync::Arc};
use tokio::{runtime, sync::Semaphore, task::JoinHandle};

Expand All @@ -42,7 +42,7 @@ impl BoundedExecutor {
}

pub fn from_current(num_permits: usize) -> Self {
Self::new(current_executor(), num_permits)
Self::new(current(), num_permits)
}

/// Spawn a future onto the Tokio runtime asynchronously blocking if there are too many
Expand Down
129 changes: 79 additions & 50 deletions comms/src/builder/comms_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ use super::{placeholder::PlaceholderService, CommsBuilderError, CommsShutdown};
use crate::{
backoff::BoxedBackoff,
bounded_executor::BoundedExecutor,
builder::consts,
connection_manager::{ConnectionManager, ConnectionManagerEvent, ConnectionManagerRequester},
connectivity::{ConnectivityManager, ConnectivityRequester},
message::InboundMessage,
multiaddr::Multiaddr,
multiplexing::Substream,
peer_manager::{NodeIdentity, PeerManager},
pipeline,
protocol::{messaging, messaging::MessagingProtocol},
protocol::{messaging, messaging::MessagingProtocol, ProtocolNotifier, Protocols},
runtime,
runtime::task,
tor,
transports::Transport,
};
Expand All @@ -58,13 +61,10 @@ pub struct BuiltCommsNode<
pub connectivity_requester: ConnectivityRequester,
pub messaging_pipeline: Option<pipeline::Config<TInPipe, TOutPipe, TOutReq>>,
pub node_identity: Arc<NodeIdentity>,
pub messaging: MessagingProtocol,
pub messaging_event_tx: messaging::MessagingEventSender,
pub inbound_message_rx: mpsc::Receiver<InboundMessage>,
pub hidden_service: Option<tor::HiddenService>,
pub messaging_request_tx: mpsc::Sender<messaging::MessagingRequest>,
pub shutdown: Shutdown,
pub peer_manager: Arc<PeerManager>,
pub protocols: Protocols<Substream>,
pub shutdown: Shutdown,
}

impl<TTransport, TInPipe, TOutPipe, TOutReq> BuiltCommsNode<TTransport, TInPipe, TOutPipe, TOutReq>
Expand Down Expand Up @@ -100,11 +100,8 @@ where
connectivity_manager: self.connectivity_manager,
connectivity_requester: self.connectivity_requester,
node_identity: self.node_identity,
messaging: self.messaging,
messaging_event_tx: self.messaging_event_tx,
inbound_message_rx: self.inbound_message_rx,
shutdown: self.shutdown,
messaging_request_tx: self.messaging_request_tx,
protocols: self.protocols,
hidden_service: self.hidden_service,
peer_manager: self.peer_manager,
}
Expand All @@ -131,19 +128,16 @@ where

pub async fn spawn(self) -> Result<CommsNode, CommsBuilderError> {
let BuiltCommsNode {
connection_manager,
mut connection_manager,
connection_manager_requester,
connection_manager_event_tx,
connectivity_manager,
connectivity_requester,
messaging_pipeline,
messaging_request_tx,
inbound_message_rx,
node_identity,
shutdown,
peer_manager,
messaging,
messaging_event_tx,
mut protocols,
hidden_service,
} = self;

Expand All @@ -163,34 +157,48 @@ where
"Your node's public address is '{}'",
node_identity.public_address()
);
let messaging_pipeline = messaging_pipeline.ok_or(CommsBuilderError::MessagingPiplineNotProvided)?;

let mut complete_signals = Vec::new();
let events_stream = connection_manager_event_tx.subscribe();
let conn_man_shutdown_signal = connection_manager.complete_signal();

let executor = runtime::current_executor();
complete_signals.push(connection_manager.complete_signal());

// Connectivity manager
executor.spawn(connectivity_manager.create().run());
executor.spawn(connection_manager.run());

// Spawn messaging protocol
let messaging_signal = messaging.complete_signal();
executor.spawn(messaging.run());

// Spawn inbound pipeline
let bounded_executor = BoundedExecutor::new(executor.clone(), messaging_pipeline.max_concurrent_inbound_tasks);
let inbound = pipeline::Inbound::new(
bounded_executor,
inbound_message_rx,
messaging_pipeline.inbound,
shutdown.to_signal(),
);
executor.spawn(inbound.run());
task::spawn(connectivity_manager.create().run());

let mut messaging_event_tx = None;
if let Some(messaging_pipeline) = messaging_pipeline {
let (messaging, notifier, messaging_request_tx, inbound_message_rx, messaging_event_sender) =
initialize_messaging(
node_identity.clone(),
peer_manager.clone(),
connection_manager_requester.clone(),
shutdown.to_signal(),
);
messaging_event_tx = Some(messaging_event_sender);
protocols.add(&[messaging::MESSAGING_PROTOCOL.clone()], notifier);
// Spawn messaging protocol
complete_signals.push(messaging.complete_signal());
task::spawn(messaging.run());

// Spawn inbound pipeline
let bounded_executor =
BoundedExecutor::new(runtime::current(), messaging_pipeline.max_concurrent_inbound_tasks);
let inbound = pipeline::Inbound::new(
bounded_executor,
inbound_message_rx,
messaging_pipeline.inbound,
shutdown.to_signal(),
);
task::spawn(inbound.run());

// Spawn outbound pipeline
let outbound =
pipeline::Outbound::new(runtime::current(), messaging_pipeline.outbound, messaging_request_tx);
task::spawn(outbound.run());
}

// Spawn outbound pipeline
let outbound = pipeline::Outbound::new(executor.clone(), messaging_pipeline.outbound, messaging_request_tx);
executor.spawn(outbound.run());
connection_manager.set_protocols(protocols);
task::spawn(connection_manager.run());

let listening_addr = Self::wait_listening(events_stream).await?;

Expand All @@ -202,9 +210,9 @@ where
listening_addr,
node_identity,
peer_manager,
messaging_event_tx,
messaging_event_tx: messaging_event_tx.unwrap_or_else(|| broadcast::channel(1).0),
hidden_service,
complete_signals: vec![conn_man_shutdown_signal, messaging_signal],
complete_signals,
})
}

Expand All @@ -218,11 +226,6 @@ where
Arc::clone(&self.node_identity)
}

/// Return a subscription to OMS events. This will emit events sent _after_ this subscription was created.
pub fn subscribe_messaging_events(&self) -> messaging::MessagingEventReceiver {
self.messaging_event_tx.subscribe()
}

/// Return an owned copy of a ConnectionManagerRequester. Used to initiate connections to peers.
pub fn connection_manager_requester(&self) -> ConnectionManagerRequester {
self.connection_manager_requester.clone()
Expand Down Expand Up @@ -298,11 +301,6 @@ impl CommsNode {
self.messaging_event_tx.subscribe()
}

/// Return a clone of the of the messaging event Sender to allow for other services to create subscriptions
pub fn message_event_sender(&self) -> messaging::MessagingEventSender {
self.messaging_event_tx.clone()
}

/// Return an owned copy of a ConnectionManagerRequester. Used to initiate connections to peers.
pub fn connection_manager(&self) -> ConnectionManagerRequester {
self.connection_manager_requester.clone()
Expand All @@ -325,3 +323,34 @@ impl CommsNode {
CommsShutdown::new(self.complete_signals)
}
}

fn initialize_messaging(
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
connection_manager_requester: ConnectionManagerRequester,
shutdown_signal: ShutdownSignal,
) -> (
messaging::MessagingProtocol,
ProtocolNotifier<Substream>,
mpsc::Sender<messaging::MessagingRequest>,
mpsc::Receiver<InboundMessage>,
messaging::MessagingEventSender,
)
{
let (proto_tx, proto_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
let (messaging_request_tx, messaging_request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE);
let (event_tx, _) = broadcast::channel(consts::MESSAGING_EVENTS_BUFFER_SIZE);
let messaging = MessagingProtocol::new(
connection_manager_requester,
peer_manager,
node_identity,
proto_rx,
messaging_request_rx,
event_tx.clone(),
inbound_message_tx,
shutdown_signal,
);

(messaging, proto_tx, messaging_request_tx, inbound_message_rx, event_tx)
}
5 changes: 0 additions & 5 deletions comms/src/builder/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ pub enum CommsBuilderError {
NodeIdentityNotSet,
#[error("The PeerStorage was not provided to the CommsBuilder. Use `with_peer_storage` to set it.")]
PeerStorageNotProvided,
#[error(
"The messaging pipeline was not provided to the CommsBuilder. Use `with_messaging_pipeline` to set it's \
pipeline."
)]
MessagingPiplineNotProvided,
#[error("Unable to receive a ConnectionManagerEvent within timeout")]
ConnectionManagerEventStreamTimeout,
#[error("ConnectionManagerEvent stream unexpectedly closed")]
Expand Down
56 changes: 3 additions & 53 deletions comms/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ use crate::{
ConnectivityRequest,
ConnectivityRequester,
},
message::InboundMessage,
multiaddr::Multiaddr,
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeIdentity, PeerManager},
protocol::{messaging, messaging::MessagingProtocol, ProtocolNotification, Protocols},
protocol::Protocols,
tor,
transports::{SocksTransport, TcpTransport, Transport},
types::CommsDatabase,
Expand Down Expand Up @@ -249,37 +248,6 @@ where
self
}

fn make_messaging(
&self,
conn_man_requester: ConnectionManagerRequester,
peer_manager: Arc<PeerManager>,
node_identity: Arc<NodeIdentity>,
) -> (
messaging::MessagingProtocol,
mpsc::Sender<ProtocolNotification<Substream>>,
mpsc::Sender<messaging::MessagingRequest>,
mpsc::Receiver<InboundMessage>,
messaging::MessagingEventSender,
)
{
let (proto_tx, proto_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
let (messaging_request_tx, messaging_request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE);
let (inbound_message_tx, inbound_message_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE);
let (event_tx, _) = broadcast::channel(consts::MESSAGING_EVENTS_BUFFER_SIZE);
let messaging = MessagingProtocol::new(
conn_man_requester,
peer_manager,
node_identity,
proto_rx,
messaging_request_rx,
event_tx.clone(),
inbound_message_tx,
self.shutdown.to_signal(),
);

(messaging, proto_tx, messaging_request_tx, inbound_message_rx, event_tx)
}

fn make_peer_manager(&mut self) -> Result<Arc<PeerManager>, CommsBuilderError> {
match self.peer_storage.take() {
Some(storage) => {
Expand All @@ -294,7 +262,6 @@ where
&mut self,
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
protocols: Protocols<Substream>,
request_rx: mpsc::Receiver<ConnectionManagerRequest>,
connection_manager_events_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
) -> ConnectionManager<TTransport, BoxedBackoff>
Expand All @@ -311,7 +278,6 @@ where
request_rx,
node_identity,
peer_manager,
protocols,
connection_manager_events_tx,
self.shutdown.to_signal(),
)
Expand Down Expand Up @@ -352,20 +318,8 @@ where
let connection_manager_requester =
ConnectionManagerRequester::new(conn_man_tx, connection_manager_event_tx.clone());

let (messaging, messaging_proto_tx, messaging_request_tx, inbound_message_rx, messaging_event_tx) = self
.make_messaging(
connection_manager_requester.clone(),
peer_manager.clone(),
node_identity.clone(),
);

//---------------------------------- Protocols --------------------------------------------//
let protocols = self
.protocols
.take()
.or_else(|| Some(Protocols::new()))
.map(move |protocols| protocols.add(&[messaging::MESSAGING_PROTOCOL.clone()], messaging_proto_tx))
.expect("cannot fail");
let protocols = self.protocols.take().unwrap_or_default();

//---------------------------------- ConnectivityManager --------------------------------------------//

Expand All @@ -383,7 +337,6 @@ where
let connection_manager = self.make_connection_manager(
node_identity.clone(),
peer_manager.clone(),
protocols,
conn_man_rx,
connection_manager_event_tx.clone(),
);
Expand All @@ -394,13 +347,10 @@ where
connection_manager_event_tx,
connectivity_manager,
connectivity_requester,
messaging_request_tx,
messaging_pipeline: None,
messaging,
messaging_event_tx,
inbound_message_rx,
node_identity,
peer_manager,
protocols,
hidden_service: self.hidden_service,
shutdown: self.shutdown,
})
Expand Down
10 changes: 6 additions & 4 deletions comms/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn spawn_node(
let comms_node = CommsBuilder::new()
// These calls are just to get rid of unused function warnings.
// <IrrelevantCalls>
.with_executor(runtime::current_executor())
.with_executor(runtime::current())
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.on_shutdown(|| {})
// </IrrelevantCalls>
Expand Down Expand Up @@ -103,12 +103,14 @@ async fn peer_to_peer_custom_protocols() {
// Setup test protocols
let (test_sender, _test_protocol_rx1) = mpsc::channel(10);
let (another_test_sender, mut another_test_protocol_rx1) = mpsc::channel(10);
let protocols1 = Protocols::new()
let mut protocols1 = Protocols::new();
protocols1
.add(&[TEST_PROTOCOL], test_sender)
.add(&[ANOTHER_TEST_PROTOCOL], another_test_sender);
let (test_sender, mut test_protocol_rx2) = mpsc::channel(10);
let (another_test_sender, _another_test_protocol_rx2) = mpsc::channel(10);
let protocols2 = Protocols::new()
let mut protocols2 = Protocols::new();
protocols2
.add(&[TEST_PROTOCOL], test_sender)
.add(&[ANOTHER_TEST_PROTOCOL], another_test_sender);

Expand Down Expand Up @@ -288,7 +290,7 @@ async fn peer_to_peer_messaging_simultaneous() {
.unwrap();

// Simultaneously send messages between the two nodes
let rt_handle = runtime::current_executor();
let rt_handle = runtime::current();
let handle1 = rt_handle.spawn(async move {
for i in 0..NUM_MSGS {
let outbound_msg = OutboundMessage::new(
Expand Down
Loading

0 comments on commit b02295f

Please sign in to comment.