Skip to content

Commit

Permalink
Added multiplexed substream counts (#1842)
Browse files Browse the repository at this point in the history
Merge pull request #1842

Added multiplexed substream counts

* pull/1842/head:
  Added multiplexed substream counts
  • Loading branch information
sdbondi committed May 13, 2020
2 parents 82255ec + d509f4a commit 324ab21
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 135 deletions.
11 changes: 6 additions & 5 deletions comms/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ use crate::{
},
message::InboundMessage,
multiaddr::Multiaddr,
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeIdentity, PeerManager},
protocol::{messaging, messaging::MessagingProtocol, ProtocolNotification, Protocols},
tor,
transports::{SocksTransport, TcpWithTorTransport, Transport},
types::{CommsDatabase, CommsSubstream},
types::CommsDatabase,
};
use futures::{channel::mpsc, AsyncRead, AsyncWrite};
use log::*;
Expand All @@ -73,7 +74,7 @@ pub struct CommsBuilder<TTransport> {
node_identity: Option<Arc<NodeIdentity>>,
transport: Option<TTransport>,
executor: Option<runtime::Handle>,
protocols: Option<Protocols<CommsSubstream>>,
protocols: Option<Protocols<Substream>>,
dial_backoff: Option<BoxedBackoff>,
hidden_service: Option<tor::HiddenService>,
connection_manager_config: ConnectionManagerConfig,
Expand Down Expand Up @@ -220,7 +221,7 @@ where
}
}

pub fn with_protocols(mut self, protocols: Protocols<yamux::Stream>) -> Self {
pub fn with_protocols(mut self, protocols: Protocols<Substream>) -> Self {
self.protocols = Some(protocols);
self
}
Expand All @@ -238,7 +239,7 @@ where
node_identity: Arc<NodeIdentity>,
) -> (
messaging::MessagingProtocol,
mpsc::Sender<ProtocolNotification<CommsSubstream>>,
mpsc::Sender<ProtocolNotification<Substream>>,
mpsc::Sender<messaging::MessagingRequest>,
mpsc::Receiver<InboundMessage>,
messaging::MessagingEventSender,
Expand Down Expand Up @@ -277,7 +278,7 @@ where
&mut self,
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
protocols: Protocols<CommsSubstream>,
protocols: Protocols<Substream>,
request_rx: mpsc::Receiver<ConnectionManagerRequest>,
connection_manager_events_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
) -> ConnectionManager<TTransport, BoxedBackoff>
Expand Down
4 changes: 2 additions & 2 deletions comms/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use crate::{
memsocket,
message::{InboundMessage, OutboundMessage},
multiaddr::{Multiaddr, Protocol},
multiplexing::Substream,
peer_manager::{Peer, PeerFeatures},
pipeline,
pipeline::SinkService,
protocol::{messaging::MessagingEvent, ProtocolEvent, Protocols},
runtime,
test_utils::node_identity::build_node_identity,
transports::MemoryTransport,
types::CommsSubstream,
CommsNode,
};
use bytes::Bytes;
Expand All @@ -44,7 +44,7 @@ use tari_storage::HashmapDatabase;
use tari_test_utils::{collect_stream, unpack_enum};

async fn spawn_node(
protocols: Protocols<CommsSubstream>,
protocols: Protocols<Substream>,
) -> (CommsNode, mpsc::Receiver<InboundMessage>, mpsc::Sender<OutboundMessage>) {
let addr = format!("/memory/{}", memsocket::acquire_next_memsocket_port())
.parse::<Multiaddr>()
Expand Down
7 changes: 4 additions & 3 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::{
};
use crate::{
backoff::Backoff,
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity},
protocol::{ProtocolEvent, ProtocolId, Protocols},
Expand Down Expand Up @@ -72,7 +73,7 @@ pub enum ConnectionManagerEvent {
ListenFailed(ConnectionManagerError),

// Substreams
NewInboundSubstream(Box<NodeId>, ProtocolId, yamux::Stream),
NewInboundSubstream(Box<NodeId>, ProtocolId, Substream),
}

impl fmt::Display for ConnectionManagerEvent {
Expand Down Expand Up @@ -157,7 +158,7 @@ pub struct ConnectionManager<TTransport, TBackoff> {
node_identity: Arc<NodeIdentity>,
active_connections: HashMap<NodeId, PeerConnection>,
shutdown_signal: Option<ShutdownSignal>,
protocols: Protocols<yamux::Stream>,
protocols: Protocols<Substream>,
listener_address: Option<Multiaddr>,
listening_notifiers: Vec<oneshot::Sender<Multiaddr>>,
connection_manager_events_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
Expand All @@ -179,7 +180,7 @@ where
request_rx: mpsc::Receiver<ConnectionManagerRequest>,
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
protocols: Protocols<yamux::Stream>,
protocols: Protocols<Substream>,
connection_manager_events_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
shutdown_signal: ShutdownSignal,
) -> Self
Expand Down
13 changes: 6 additions & 7 deletions comms/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ use super::{
types::ConnectionDirection,
};
use crate::{
multiplexing::{IncomingSubstreams, Yamux},
multiplexing::{Control, IncomingSubstreams, Substream, Yamux},
peer_manager::{NodeId, Peer, PeerFeatures},
protocol::{ProtocolId, ProtocolNegotiation},
runtime,
types::CommsSubstream,
};
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -92,7 +91,7 @@ pub enum PeerConnectionRequest {
/// Open a new substream and negotiate the given protocol
OpenSubstream(
ProtocolId,
oneshot::Sender<Result<NegotiatedSubstream<CommsSubstream>, PeerConnectionError>>,
oneshot::Sender<Result<NegotiatedSubstream<Substream>, PeerConnectionError>>,
),
/// Disconnect all substreams and close the transport connection
Disconnect(bool, oneshot::Sender<()>),
Expand Down Expand Up @@ -167,7 +166,7 @@ impl PeerConnection {
pub async fn open_substream(
&mut self,
protocol_id: &ProtocolId,
) -> Result<NegotiatedSubstream<CommsSubstream>, PeerConnectionError>
) -> Result<NegotiatedSubstream<Substream>, PeerConnectionError>
{
let (reply_tx, reply_rx) = oneshot::channel();
self.request_tx
Expand Down Expand Up @@ -220,7 +219,7 @@ pub struct PeerConnectionActor {
direction: ConnectionDirection,
incoming_substreams: Fuse<IncomingSubstreams>,
substream_shutdown: Option<Shutdown>,
control: yamux::Control,
control: Control,
event_notifier: mpsc::Sender<ConnectionManagerEvent>,
supported_protocols: Vec<ProtocolId>,
shutdown: bool,
Expand Down Expand Up @@ -309,7 +308,7 @@ impl PeerConnectionActor {
}
}

async fn handle_incoming_substream(&mut self, mut stream: yamux::Stream) -> Result<(), PeerConnectionError> {
async fn handle_incoming_substream(&mut self, mut stream: Substream) -> Result<(), PeerConnectionError> {
let selected_protocol = ProtocolNegotiation::new(&mut stream)
.negotiate_protocol_inbound(&self.supported_protocols)
.await?;
Expand All @@ -327,7 +326,7 @@ impl PeerConnectionActor {
async fn open_negotiated_protocol_stream(
&mut self,
protocol: ProtocolId,
) -> Result<NegotiatedSubstream<CommsSubstream>, PeerConnectionError>
) -> Result<NegotiatedSubstream<Substream>, PeerConnectionError>
{
debug!(
target: LOG_TARGET,
Expand Down
2 changes: 1 addition & 1 deletion comms/src/multiplexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod yamux;
pub use self::yamux::{Control, IncomingSubstreams, Yamux};
pub use self::yamux::{ConnectionError, Control, IncomingSubstreams, Substream, Yamux};
Loading

0 comments on commit 324ab21

Please sign in to comment.