diff --git a/Cargo.lock b/Cargo.lock index 04d9254b9e5..553dd42fc91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2536,6 +2536,7 @@ dependencies = [ "libp2p-mdns", "libp2p-memory-connection-limits", "libp2p-metrics", + "libp2p-mplex", "libp2p-noise", "libp2p-ping", "libp2p-plaintext", diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index df9f7f64a7b..b29d3797b9d 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -144,6 +144,7 @@ env_logger = "0.10.0" clap = { version = "4.1.6", features = ["derive"] } tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] } +libp2p-mplex = { workspace = true } libp2p-noise = { workspace = true } libp2p-tcp = { workspace = true, features = ["tokio"] } diff --git a/libp2p/src/builder.rs b/libp2p/src/builder.rs index d376c45cc27..344f5d1833e 100644 --- a/libp2p/src/builder.rs +++ b/libp2p/src/builder.rs @@ -1,3 +1,4 @@ +use libp2p_core::upgrade::SelectUpgrade; use libp2p_core::{muxing::StreamMuxerBox, Transport}; use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, StreamMuxer, UpgradeInfo}; use libp2p_identity::{Keypair, PeerId}; @@ -129,47 +130,38 @@ impl SwarmBuilder { self.with_tcp_config(Default::default()) } - pub fn with_tcp_2( + pub fn with_tcp_2( self, - security_upgrade: SecU, - multiplexer_upgrade: MuxU, + security_upgrade: SecUpgrade, + multiplexer_upgrade: MuxUpgrade, ) -> Result< SwarmBuilder>, AuthenticationError, > where - D: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, - SecU: IntoSecurityUpgrade, - SecU::Upgrade: InboundUpgrade< - Negotiated, - Output = (PeerId, D), - Error = E, - >, - >>::Future: Send, - SecU::Upgrade: OutboundUpgrade< - Negotiated, - Output = (PeerId, D), - Error = E, - > + Clone - + Send + 'static, - - >>::Future: Send, - E: std::error::Error + Send + Sync + 'static, - <<::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, - <::Upgrade as UpgradeInfo>::Info: Send, - - MuxM: StreamMuxer + Send + 'static, - MuxM::Substream: Send + 'static, - MuxM::Error: Send + Sync + 'static, - - MuxU: IntoMultiplexerUpgrade, - MuxU::Upgrade: InboundUpgrade, Output = MuxM, Error = MuxE>, - >>::Future: Send, - MuxU::Upgrade: OutboundUpgrade, Output = MuxM, Error = MuxE> + Clone + Send + 'static, - >>::Future: Send, - MuxE: std::error::Error + Send + Sync + 'static, - <<::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, - <::Upgrade as UpgradeInfo>::Info: Send, + SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, + SecError: std::error::Error + Send + Sync + 'static, + SecUpgrade: IntoSecurityUpgrade, + SecUpgrade::Upgrade: Clone + Send + 'static, + SecUpgrade::Upgrade: InboundUpgrade, Output = (PeerId, SecStream), Error = SecError>, + >>::Future: Send, + SecUpgrade::Upgrade: OutboundUpgrade, Output = (PeerId, SecStream), Error = SecError>, + >>::Future: Send, + <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, + <>::Upgrade as UpgradeInfo>::Info: Send, + + MuxStream: StreamMuxer + Send + 'static, + MuxStream::Substream: Send + 'static, + MuxStream::Error: Send + Sync + 'static, + MuxUpgrade: IntoMultiplexerUpgrade, + MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError>, + >>::Future: Send, + MuxUpgrade::Upgrade: OutboundUpgrade, Output = MuxStream, Error = MuxError>, + MuxUpgrade::Upgrade: Clone + Send + 'static, + >>::Future: Send, + MuxError: std::error::Error + Send + Sync + 'static, + <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, + <>::Upgrade as UpgradeInfo>::Info: Send, { Ok(SwarmBuilder { phase: QuicPhase { @@ -196,34 +188,59 @@ impl SwarmBuilder { } } -pub trait IntoSecurityUpgrade { +pub trait IntoSecurityUpgrade { type Upgrade; fn into_security_upgrade(self, keypair: &Keypair) -> Self::Upgrade; } -impl IntoSecurityUpgrade for F +impl IntoSecurityUpgrade for (F,) where F: for<'a> FnOnce(&'a Keypair) -> T, { type Upgrade = T; fn into_security_upgrade(self, keypair: &Keypair) -> Self::Upgrade { - self(keypair) + self.0(keypair) } } -pub trait IntoMultiplexerUpgrade { +impl IntoSecurityUpgrade for (F1, F2) +where + F1: for<'a> FnOnce(&'a Keypair) -> T1, + F2: for<'a> FnOnce(&'a Keypair) -> T2, + T1: InboundUpgrade, Output = (PeerId, O1)>, + T2: InboundUpgrade, Output = (PeerId, O2)>, +{ + type Upgrade = map::Upgrade, fn(futures::future::Either<>>::Output, >>::Output>) -> (PeerId, futures::future::Either)>; + + fn into_security_upgrade(self, keypair: &Keypair) -> Self::Upgrade { + map::Upgrade::new( + SelectUpgrade::new(self.0(keypair), self.1(keypair)), + |either |futures::future::Either::factor_first(either), + ) + } +} + +pub trait IntoMultiplexerUpgrade { type Upgrade; fn into_multiplexer_upgrade(self) -> Self::Upgrade; } -impl IntoMultiplexerUpgrade for U { - type Upgrade = Self; +impl IntoMultiplexerUpgrade for (U,) { + type Upgrade = U; + + fn into_multiplexer_upgrade(self) -> Self::Upgrade { + self.0 + } +} + +impl IntoMultiplexerUpgrade for (U1, U2) { + type Upgrade = SelectUpgrade; fn into_multiplexer_upgrade(self) -> Self::Upgrade { - self + SelectUpgrade::new(self.0, self.1) } } @@ -1710,8 +1727,50 @@ mod tests { .with_tokio() .with_tcp_2( // TODO: Handle unwrap - |keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(), - libp2p_yamux::Config::default(), + (|keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(),), + // TODO: The single tuple is not intuitive. + (libp2p_yamux::Config::default(),), + ) + .unwrap() + .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) + .unwrap() + .build(); + } + + #[test] + #[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "noise"))] + fn tcp_yamux_mplex() { + let _ = SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp_2( + // TODO: Handle unwrap + (|keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(),), + ( + libp2p_yamux::Config::default(), + libp2p_mplex::MplexConfig::default(), + ), + ) + .unwrap() + .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) + .unwrap() + .build(); + } + + #[test] + #[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "noise"))] + fn tcp_tls_noise() { + let _ = SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp_2( + // TODO: Handle unwrap + ( + |keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(), + |keypair: &Keypair| libp2p_noise::Config::new(keypair).unwrap(), + ), + ( + libp2p_yamux::Config::default(), + libp2p_mplex::MplexConfig::default(), + ), ) .unwrap() .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) diff --git a/libp2p/src/builder/map.rs b/libp2p/src/builder/map.rs index d3909c61b7b..36cb1194269 100644 --- a/libp2p/src/builder/map.rs +++ b/libp2p/src/builder/map.rs @@ -23,7 +23,7 @@ use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; /// Upgrade applying a function to an inner upgrade. #[derive(Debug, Clone)] -pub(crate) struct Upgrade { +pub struct Upgrade { upgrade: U, fun: F, }