From 0b05b2109358d064b2aab286cb7170e763793e38 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Sun, 23 Aug 2020 16:57:20 +0200 Subject: [PATCH] Add `ProtocolsHandler::InboundOpenInfo`. (#1714) * Add `ProtocolsHandler::InboundOpenInfo`. * Update swarm/src/protocols_handler/multi.rs Co-authored-by: Roman Borschel * Update CHANGELOGs and versions. Co-authored-by: Roman Borschel --- CHANGELOG.md | 7 ++ Cargo.toml | 18 ++-- protocols/floodsub/CHANGELOG.md | 4 + protocols/floodsub/Cargo.toml | 4 +- protocols/gossipsub/CHANGELOG.md | 4 + protocols/gossipsub/Cargo.toml | 4 +- protocols/gossipsub/src/handler.rs | 11 +- protocols/identify/CHANGELOG.md | 4 + protocols/identify/Cargo.toml | 4 +- protocols/identify/src/handler.rs | 11 +- protocols/kad/CHANGELOG.md | 4 + protocols/kad/Cargo.toml | 4 +- protocols/kad/src/handler.rs | 15 ++- protocols/mdns/CHANGELOG.md | 4 + protocols/mdns/Cargo.toml | 4 +- protocols/ping/CHANGELOG.md | 4 + protocols/ping/Cargo.toml | 4 +- protocols/ping/src/handler.rs | 12 +-- protocols/request-response/CHANGELOG.md | 4 + protocols/request-response/Cargo.toml | 4 +- protocols/request-response/src/handler.rs | 12 ++- swarm/CHANGELOG.md | 11 ++ swarm/Cargo.toml | 3 +- swarm/src/protocols_handler.rs | 69 ++++++++----- swarm/src/protocols_handler/dummy.rs | 10 +- swarm/src/protocols_handler/map_in.rs | 15 ++- swarm/src/protocols_handler/map_out.rs | 19 ++-- swarm/src/protocols_handler/multi.rs | 106 +++++++++++++++----- swarm/src/protocols_handler/node_handler.rs | 31 +++--- swarm/src/protocols_handler/one_shot.rs | 26 ++--- swarm/src/protocols_handler/select.rs | 64 ++++++------ swarm/src/toggle.rs | 45 ++++++--- 32 files changed, 337 insertions(+), 204 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf6f25df197..be0f1f99c60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,13 @@ - [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md) - [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md) +# Version 0.25.0 [unreleased] + +- The `ProtocolsHandler` in `libp2p-swarm` has a new associated type + `InboundOpenInfo` ([PR 1714]). + +[PR 1714]: https://github.com/libp2p/rust-libp2p/pull/1714 + # Version 0.24.0 [2020-08-18] - Update `libp2p-core`, `libp2p-gossipsub`, `libp2p-kad`, `libp2p-mdns`, diff --git a/Cargo.toml b/Cargo.toml index 4c9564eaf4a..49c4697ce1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p" edition = "2018" description = "Peer-to-peer networking library" -version = "0.24.0" +version = "0.25.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -65,18 +65,18 @@ futures = "0.3.1" lazy_static = "1.2" libp2p-core = { version = "0.21.0", path = "core" } libp2p-core-derive = { version = "0.20.2", path = "misc/core-derive" } -libp2p-floodsub = { version = "0.21.0", path = "protocols/floodsub", optional = true } -libp2p-gossipsub = { version = "0.21.0", path = "./protocols/gossipsub", optional = true } -libp2p-identify = { version = "0.21.0", path = "protocols/identify", optional = true } -libp2p-kad = { version = "0.22.0", path = "protocols/kad", optional = true } +libp2p-floodsub = { version = "0.22.0", path = "protocols/floodsub", optional = true } +libp2p-gossipsub = { version = "0.22.0", path = "./protocols/gossipsub", optional = true } +libp2p-identify = { version = "0.22.0", path = "protocols/identify", optional = true } +libp2p-kad = { version = "0.23.0", path = "protocols/kad", optional = true } libp2p-mplex = { version = "0.21.0", path = "muxers/mplex", optional = true } libp2p-noise = { version = "0.23.0", path = "protocols/noise", optional = true } -libp2p-ping = { version = "0.21.0", path = "protocols/ping", optional = true } +libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.21.0", path = "protocols/plaintext", optional = true } libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true } -libp2p-request-response = { version = "0.2.0", path = "protocols/request-response", optional = true } +libp2p-request-response = { version = "0.3.0", path = "protocols/request-response", optional = true } libp2p-secio = { version = "0.21.0", path = "protocols/secio", default-features = false, optional = true } -libp2p-swarm = { version = "0.21.0", path = "swarm" } +libp2p-swarm = { version = "0.22.0", path = "swarm" } libp2p-uds = { version = "0.21.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.21.0", path = "transports/wasm-ext", optional = true } libp2p-yamux = { version = "0.21.0", path = "muxers/yamux", optional = true } @@ -90,7 +90,7 @@ wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] libp2p-deflate = { version = "0.21.0", path = "protocols/deflate", optional = true } libp2p-dns = { version = "0.21.0", path = "transports/dns", optional = true } -libp2p-mdns = { version = "0.21.0", path = "protocols/mdns", optional = true } +libp2p-mdns = { version = "0.22.0", path = "protocols/mdns", optional = true } libp2p-tcp = { version = "0.21.0", path = "transports/tcp", optional = true } libp2p-websocket = { version = "0.22.0", path = "transports/websocket", optional = true } diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 0d857fb7ab8..ccf096b19c1 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.22.0 [unreleased] + +- Update `libp2p-swarm`. + # 0.21.0 [2020-08-18] - Bump `libp2p-core` and `libp2p-swarm` dependency. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 974539c5d26..7e637a767df 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-floodsub" edition = "2018" description = "Floodsub protocol for libp2p" -version = "0.21.0" +version = "0.22.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ cuckoofilter = "0.3.2" fnv = "1.0" futures = "0.3.1" libp2p-core = { version = "0.21.0", path = "../../core" } -libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-swarm = { version = "0.22.0", path = "../../swarm" } prost = "0.6.1" rand = "0.7" smallvec = "1.0" diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index c6936c699aa..8b356a188bc 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.22.0 [unreleased] + +- Update `libp2p-swarm`. + # 0.21.0 [2020-08-18] - Add public API to list topics and peers. [PR 1677](https://github.com/libp2p/rust-libp2p/pull/1677). diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index fe6395bbb17..4b080f11883 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-gossipsub" edition = "2018" description = "Gossipsub protocol for libp2p" -version = "0.21.0" +version = "0.22.0" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-swarm = { version = "0.22.0", path = "../../swarm" } libp2p-core = { version = "0.21.0", path = "../../core" } bytes = "0.5.4" byteorder = "1.3.2" diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 6daca114545..125b65a8412 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -40,7 +40,7 @@ use std::{ /// Protocol Handler that manages a single long-lived substream with a peer. pub struct GossipsubHandler { /// Upgrade configuration for the gossipsub protocol. - listen_protocol: SubstreamProtocol, + listen_protocol: SubstreamProtocol, /// The single long-lived outbound substream. outbound_substream: Option, @@ -95,7 +95,7 @@ impl GossipsubHandler { protocol_id, max_transmit_size, validation_mode, - )), + ), ()), inbound_substream: None, outbound_substream: None, outbound_substream_establishing: false, @@ -112,14 +112,16 @@ impl ProtocolsHandler for GossipsubHandler { type InboundProtocol = ProtocolConfig; type OutboundProtocol = ProtocolConfig; type OutboundOpenInfo = GossipsubRpc; + type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() } fn inject_fully_negotiated_inbound( &mut self, substream: >::Output, + _info: Self::InboundOpenInfo ) { // new inbound substream. Replace the current one, if it exists. trace!("New inbound substream request"); @@ -184,8 +186,7 @@ impl ProtocolsHandler for GossipsubHandler { self.send_queue.shrink_to_fit(); self.outbound_substream_establishing = true; return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: self.listen_protocol.clone(), - info: message, + protocol: self.listen_protocol.clone().map_info(|()| message) }); } diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 9c87a614c7c..9c38322fcb6 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.22.0 [unreleased] + +- Update `libp2p-swarm`. + # 0.21.0 [2020-08-18] - Bump `libp2p-core` and `libp2p-swarm` dependencies. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index d6b7975140c..187f42b45ff 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-identify" edition = "2018" description = "Nodes identifcation protocol for libp2p" -version = "0.21.0" +version = "0.22.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" libp2p-core = { version = "0.21.0", path = "../../core" } -libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-swarm = { version = "0.22.0", path = "../../swarm" } log = "0.4.1" prost = "0.6.1" smallvec = "1.0" diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 0a9b341d139..0dde71ff99b 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -93,14 +93,16 @@ impl ProtocolsHandler for IdentifyHandler { type InboundProtocol = IdentifyProtocolConfig; type OutboundProtocol = IdentifyProtocolConfig; type OutboundOpenInfo = (); + type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(self.config.clone()) + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(self.config.clone(), ()) } fn inject_fully_negotiated_inbound( &mut self, - protocol: >::Output + protocol: >::Output, + _info: Self::InboundOpenInfo ) { self.events.push(IdentifyHandlerEvent::Identify(protocol)) } @@ -152,8 +154,7 @@ impl ProtocolsHandler for IdentifyHandler { Poll::Ready(Ok(())) => { self.next_id.reset(DELAY_TO_NEXT_ID); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.config.clone()), - info: (), + protocol: SubstreamProtocol::new(self.config.clone(), ()) }; Poll::Ready(ev) } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 9504673b972..61dad8fcc4f 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.23.0 [unreleased] + +- Update `libp2p-swarm`. + # 0.22.1 [2020-08-19] - Explicitly convert from u8 to usize in `BucketIndex::range` to prevent type diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index eb60d59089f..f91e39dc65d 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-kad" edition = "2018" description = "Kademlia protocol for libp2p" -version = "0.22.1" +version = "0.23.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -18,7 +18,7 @@ futures_codec = "0.4" futures = "0.3.1" log = "0.4" libp2p-core = { version = "0.21.0", path = "../../core" } -libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-swarm = { version = "0.22.0", path = "../../swarm" } multihash = "0.11.0" prost = "0.6.1" rand = "0.7.2" diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 3d4e6cefb03..e30f1c43b94 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -261,7 +261,6 @@ impl error::Error for KademliaHandlerQueryErr { } impl From> for KademliaHandlerQueryErr { - #[inline] fn from(err: ProtocolsHandlerUpgrErr) -> Self { KademliaHandlerQueryErr::Upgrade(err) } @@ -409,13 +408,13 @@ where type OutboundProtocol = KademliaProtocolConfig; // Message of the request to send to the remote, and user data if we expect an answer. type OutboundOpenInfo = (KadRequestMsg, Option); + type InboundOpenInfo = (); - #[inline] - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { if self.config.allow_listening { - SubstreamProtocol::new(self.config.protocol_config.clone()).map_upgrade(upgrade::EitherUpgrade::A) + SubstreamProtocol::new(self.config.protocol_config.clone(), ()).map_upgrade(upgrade::EitherUpgrade::A) } else { - SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)) + SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ()) } } @@ -431,6 +430,7 @@ where fn inject_fully_negotiated_inbound( &mut self, protocol: >::Output, + (): Self::InboundOpenInfo ) { // If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol` // is a `Void`. @@ -591,7 +591,6 @@ where } } - #[inline] fn inject_dial_upgrade_error( &mut self, (_, user_data): Self::OutboundOpenInfo, @@ -605,7 +604,6 @@ where } } - #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -696,8 +694,7 @@ fn advance_substream( match state { SubstreamState::OutPendingOpen(msg, user_data) => { let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(upgrade), - info: (msg, user_data), + protocol: SubstreamProtocol::new(upgrade, (msg, user_data)) }; (None, Some(ev), false) } diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 071d80a0610..e1c0524271a 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.22.0 [unreleased] + +- Update `libp2p-swarm`. + # 0.21.0 [2020-08-18] - Bump `libp2p-core` and `libp2p-swarm` dependencies. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index f888567343d..24d582d8f54 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-mdns" edition = "2018" -version = "0.21.0" +version = "0.22.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" @@ -17,7 +17,7 @@ either = "1.5.3" futures = "0.3.1" lazy_static = "1.2" libp2p-core = { version = "0.21.0", path = "../../core" } -libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-swarm = { version = "0.22.0", path = "../../swarm" } log = "0.4" net2 = "0.2" rand = "0.7" diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index 85922bc1eb7..0496ceca656 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.22.0 [unreleased] + +- Update `libp2p-swarm`. + # 0.21.0 [2020-08-18] - Refactor the ping protocol for conformity by (re)using diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 669267af39e..4420e401694 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-ping" edition = "2018" description = "Ping protocol for libp2p" -version = "0.21.0" +version = "0.22.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" libp2p-core = { version = "0.21.0", path = "../../core" } -libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-swarm = { version = "0.22.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" void = "1.0" diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index db6eba0ffa1..13d03640c14 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -207,12 +207,13 @@ impl ProtocolsHandler for PingHandler { type InboundProtocol = protocol::Ping; type OutboundProtocol = protocol::Ping; type OutboundOpenInfo = (); + type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(protocol::Ping) + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(protocol::Ping, ()) } - fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream) { + fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream, (): ()) { self.inbound = Some(protocol::recv_ping(stream).boxed()); } @@ -329,11 +330,10 @@ impl ProtocolsHandler for PingHandler { } None => { self.outbound = Some(PingState::OpenStream); - let protocol = SubstreamProtocol::new(protocol::Ping) + let protocol = SubstreamProtocol::new(protocol::Ping, ()) .with_timeout(self.config.timeout); return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol, - info: (), + protocol }) } } diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 5bf3fa10571..08da7e43929 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.3.0 [unreleased] + +- Update `libp2p-swarm`. + # 0.2.0 [2020-08-18] - Fixed connection keep-alive, permitting connections to close due diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 77b12a524ab..79d88314397 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-request-response" edition = "2018" description = "Generic Request/Response Protocols" -version = "0.2.0" +version = "0.3.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] async-trait = "0.1" futures = "0.3.1" libp2p-core = { version = "0.21.0", path = "../../core" } -libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-swarm = { version = "0.22.0", path = "../../swarm" } log = "0.4.11" lru = "0.6" rand = "0.7" diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 57dcef8184c..392988d322c 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -145,8 +145,9 @@ where type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; type OutboundOpenInfo = RequestId; + type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { // A channel for notifying the handler when the inbound // upgrade received the request. let (rq_send, rq_recv) = oneshot::channel(); @@ -173,12 +174,13 @@ where // `ResponseChannel`. self.inbound.push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); - SubstreamProtocol::new(proto).with_timeout(self.substream_timeout) + SubstreamProtocol::new(proto, ()).with_timeout(self.substream_timeout) } fn inject_fully_negotiated_inbound( &mut self, (): (), + (): () ) { // Nothing to do, as the response has already been sent // as part of the upgrade. @@ -229,6 +231,7 @@ where fn inject_listen_upgrade_error( &mut self, + (): Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr ) { match error { @@ -300,9 +303,8 @@ where let info = request.request_id; return Poll::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(request) - .with_timeout(self.substream_timeout), - info, + protocol: SubstreamProtocol::new(request, info) + .with_timeout(self.substream_timeout) }, ) } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f1bf0889012..6cd730abb09 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,14 @@ +# 0.22.0 [unreleased] + +- Adds `ProtocolsHandler::InboundOpenInfo` type which mirrors the existing + `OutboundOpenInfo` type. A value of this type is passed as an extra argument + to `ProtocolsHandler::inject_fully_negotiated_inbound` and + `ProtocolsHandler::inject_listen_upgrade_error`. +- `SubstreamProtocol` now has a second type parameter corresponding to + inbound or outbound information, a value of which is part of `SubstreamProtocol` + now. Consequently `ProtocolsHandlerEvent::OutboundSubstreamRequest` no longer + has a separate `info` field. + # 0.21.0 [2020-08-18] - Add missing delegation calls in some `ProtocolsHandler` wrappers. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 72d43c384f0..1f4f3e602a2 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-swarm" edition = "2018" description = "The libp2p swarm" -version = "0.21.0" +version = "0.22.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +either = "1.6.0" futures = "0.3.1" libp2p-core = { version = "0.21.0", path = "../core" } log = "0.4" diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 6c638636fe5..7d2f4d0755f 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -110,6 +110,8 @@ pub trait ProtocolsHandler: Send + 'static { type InboundProtocol: InboundUpgradeSend + Send + 'static; /// The outbound upgrade for the protocol(s) used by the handler. type OutboundProtocol: OutboundUpgradeSend; + /// The type of additional information returned from `listen_protocol`. + type InboundOpenInfo: Send + 'static; /// The type of additional information passed to an `OutboundSubstreamRequest`. type OutboundOpenInfo: Send + 'static; @@ -120,12 +122,13 @@ pub trait ProtocolsHandler: Send + 'static { /// > supported protocols, even if in a specific context a particular one is /// > not supported, (eg. when only allowing one substream at a time for a protocol). /// > This allows a remote to put the list of supported protocols in a cache. - fn listen_protocol(&self) -> SubstreamProtocol; + fn listen_protocol(&self) -> SubstreamProtocol; /// Injects the output of a successful upgrade on a new inbound substream. fn inject_fully_negotiated_inbound( &mut self, - protocol: ::Output + protocol: ::Output, + info: Self::InboundOpenInfo ); /// Injects the output of a successful upgrade on a new outbound substream. @@ -156,6 +159,7 @@ pub trait ProtocolsHandler: Send + 'static { /// Indicates to the handler that upgrading an inbound substream to the given protocol has failed. fn inject_listen_upgrade_error( &mut self, + _: Self::InboundOpenInfo, _: ProtocolsHandlerUpgrErr<::Error> ) {} @@ -235,20 +239,22 @@ pub trait ProtocolsHandler: Send + 'static { /// The inbound substream protocol(s) are defined by [`ProtocolsHandler::listen_protocol`] /// and the outbound substream protocol(s) by [`ProtocolsHandlerEvent::OutboundSubstreamRequest`]. #[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct SubstreamProtocol { +pub struct SubstreamProtocol { upgrade: TUpgrade, + info: TInfo, upgrade_protocol: upgrade::Version, timeout: Duration, } -impl SubstreamProtocol { +impl SubstreamProtocol { /// Create a new `SubstreamProtocol` from the given upgrade. /// /// The default timeout for applying the given upgrade on a substream is /// 10 seconds. - pub fn new(upgrade: TUpgrade) -> SubstreamProtocol { + pub fn new(upgrade: TUpgrade, info: TInfo) -> Self { SubstreamProtocol { upgrade, + info, upgrade_protocol: upgrade::Version::V1, timeout: Duration::from_secs(10), } @@ -262,12 +268,26 @@ impl SubstreamProtocol { } /// Maps a function over the protocol upgrade. - pub fn map_upgrade(self, f: F) -> SubstreamProtocol + pub fn map_upgrade(self, f: F) -> SubstreamProtocol where F: FnOnce(TUpgrade) -> U, { SubstreamProtocol { upgrade: f(self.upgrade), + info: self.info, + upgrade_protocol: self.upgrade_protocol, + timeout: self.timeout, + } + } + + /// Maps a function over the protocol info. + pub fn map_info(self, f: F) -> SubstreamProtocol + where + F: FnOnce(TInfo) -> U, + { + SubstreamProtocol { + upgrade: self.upgrade, + info: f(self.info), upgrade_protocol: self.upgrade_protocol, timeout: self.timeout, } @@ -284,20 +304,19 @@ impl SubstreamProtocol { &self.upgrade } + /// Borrows the contained protocol info. + pub fn info(&self) -> &TInfo { + &self.info + } + /// Borrows the timeout for the protocol upgrade. pub fn timeout(&self) -> &Duration { &self.timeout } /// Converts the substream protocol configuration into the contained upgrade. - pub fn into_upgrade(self) -> (upgrade::Version, TUpgrade) { - (self.upgrade_protocol, self.upgrade) - } -} - -impl From for SubstreamProtocol { - fn from(upgrade: TUpgrade) -> SubstreamProtocol { - SubstreamProtocol::new(upgrade) + pub fn into_upgrade(self) -> (upgrade::Version, TUpgrade, TInfo) { + (self.upgrade_protocol, self.upgrade, self.info) } } @@ -307,9 +326,7 @@ pub enum ProtocolsHandlerEvent, - /// User-defined information, passed back when the substream is open. - info: TOutboundOpenInfo, + protocol: SubstreamProtocol }, /// Close the connection for the given reason. @@ -333,10 +350,9 @@ impl F: FnOnce(TOutboundOpenInfo) -> I, { match self { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol, - info: map(info), + protocol: protocol.map_info(map) } } ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), @@ -354,10 +370,9 @@ impl F: FnOnce(TConnectionUpgrade) -> I, { match self { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol.map_upgrade(map), - info, + protocol: protocol.map_upgrade(map) } } ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), @@ -374,8 +389,8 @@ impl F: FnOnce(TCustom) -> I, { match self { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } } ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(map(val)), ProtocolsHandlerEvent::Close(val) => ProtocolsHandlerEvent::Close(val), @@ -391,8 +406,8 @@ impl F: FnOnce(TErr) -> I, { match self { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } } ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), ProtocolsHandlerEvent::Close(val) => ProtocolsHandlerEvent::Close(map(val)), diff --git a/swarm/src/protocols_handler/dummy.rs b/swarm/src/protocols_handler/dummy.rs index 70820c69870..764f95fe2cf 100644 --- a/swarm/src/protocols_handler/dummy.rs +++ b/swarm/src/protocols_handler/dummy.rs @@ -51,14 +51,16 @@ impl ProtocolsHandler for DummyProtocolsHandler { type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; + type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade) + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) } fn inject_fully_negotiated_inbound( &mut self, - _: >::Output + _: >::Output, + _: Self::InboundOpenInfo ) { } @@ -75,7 +77,7 @@ impl ProtocolsHandler for DummyProtocolsHandler { fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} - fn inject_listen_upgrade_error(&mut self, _: ProtocolsHandlerUpgrErr<>::Error>) {} + fn inject_listen_upgrade_error(&mut self, _: Self::InboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive diff --git a/swarm/src/protocols_handler/map_in.rs b/swarm/src/protocols_handler/map_in.rs index c66187cd4a1..7c007db6686 100644 --- a/swarm/src/protocols_handler/map_in.rs +++ b/swarm/src/protocols_handler/map_in.rs @@ -59,17 +59,19 @@ where type Error = TProtoHandler::Error; type InboundProtocol = TProtoHandler::InboundProtocol; type OutboundProtocol = TProtoHandler::OutboundProtocol; + type InboundOpenInfo = TProtoHandler::InboundOpenInfo; type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol() } fn inject_fully_negotiated_inbound( &mut self, - protocol: ::Output + protocol: ::Output, + info: Self::InboundOpenInfo ) { - self.inner.inject_fully_negotiated_inbound(protocol) + self.inner.inject_fully_negotiated_inbound(protocol, info) } fn inject_fully_negotiated_outbound( @@ -94,11 +96,8 @@ where self.inner.inject_dial_upgrade_error(info, error) } - fn inject_listen_upgrade_error( - &mut self, - error: ProtocolsHandlerUpgrErr<::Error> - ) { - self.inner.inject_listen_upgrade_error(error) + fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + self.inner.inject_listen_upgrade_error(info, error) } fn connection_keep_alive(&self) -> KeepAlive { diff --git a/swarm/src/protocols_handler/map_out.rs b/swarm/src/protocols_handler/map_out.rs index 557625669e5..292f0223bab 100644 --- a/swarm/src/protocols_handler/map_out.rs +++ b/swarm/src/protocols_handler/map_out.rs @@ -57,17 +57,19 @@ where type Error = TProtoHandler::Error; type InboundProtocol = TProtoHandler::InboundProtocol; type OutboundProtocol = TProtoHandler::OutboundProtocol; + type InboundOpenInfo = TProtoHandler::InboundOpenInfo; type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol() } fn inject_fully_negotiated_inbound( &mut self, - protocol: ::Output + protocol: ::Output, + info: Self::InboundOpenInfo ) { - self.inner.inject_fully_negotiated_inbound(protocol) + self.inner.inject_fully_negotiated_inbound(protocol, info) } fn inject_fully_negotiated_outbound( @@ -90,11 +92,8 @@ where self.inner.inject_dial_upgrade_error(info, error) } - fn inject_listen_upgrade_error( - &mut self, - error: ProtocolsHandlerUpgrErr<::Error> - ) { - self.inner.inject_listen_upgrade_error(error) + fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + self.inner.inject_listen_upgrade_error(info, error) } fn connection_keep_alive(&self) -> KeepAlive { @@ -111,8 +110,8 @@ where match ev { ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)), ProtocolsHandlerEvent::Close(err) => ProtocolsHandlerEvent::Close(err), - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } } } }) diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index 6177ed2fb96..9d4a4fd6ba8 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -96,13 +96,21 @@ where type Error = ::Error; type InboundProtocol = Upgrade::InboundProtocol>; type OutboundProtocol = ::OutboundProtocol; + type InboundOpenInfo = Info::InboundOpenInfo>; type OutboundOpenInfo = (K, ::OutboundOpenInfo); - fn listen_protocol(&self) -> SubstreamProtocol { - let upgrades = self.handlers.iter() - .map(|(k, h)| (k.clone(), h.listen_protocol().into_upgrade().1)) - .collect(); - SubstreamProtocol::new(Upgrade { upgrades }) + fn listen_protocol(&self) -> SubstreamProtocol { + let (upgrade, info) = self.handlers.iter() + .map(|(k, h)| { + let (_, u, i) = h.listen_protocol().into_upgrade(); + (k.clone(), (u, i)) + }) + .fold((Upgrade::new(), Info::new()), |(mut upg, mut inf), (k, (u, i))| { + upg.upgrades.push((k.clone(), u)); + inf.infos.push((k, i)); + (upg, inf) + }); + SubstreamProtocol::new(upgrade, info) } fn inject_fully_negotiated_outbound ( @@ -119,10 +127,13 @@ where fn inject_fully_negotiated_inbound ( &mut self, - (key, arg): ::Output + (key, arg): ::Output, + mut info: Self::InboundOpenInfo ) { if let Some(h) = self.handlers.get_mut(&key) { - h.inject_fully_negotiated_inbound(arg) + if let Some(i) = info.take(&key) { + h.inject_fully_negotiated_inbound(arg, i) + } } else { log::error!("inject_fully_negotiated_inbound: no handler for key") } @@ -156,47 +167,64 @@ where fn inject_listen_upgrade_error( &mut self, + mut info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error> ) { match error { ProtocolsHandlerUpgrErr::Timer => - for h in self.handlers.values_mut() { - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer) + for (k, h) in &mut self.handlers { + if let Some(i) = info.take(k) { + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Timer) + } } ProtocolsHandlerUpgrErr::Timeout => - for h in self.handlers.values_mut() { - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout) + for (k, h) in &mut self.handlers { + if let Some(i) = info.take(k) { + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Timeout) + } } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => - for h in self.handlers.values_mut() { - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))) + for (k, h) in &mut self.handlers { + if let Some(i) = info.take(k) { + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))) + } } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) => match e { ProtocolError::IoError(e) => - for h in self.handlers.values_mut() { - let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into())); - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + for (k, h) in &mut self.handlers { + if let Some(i) = info.take(k) { + let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into())); + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } } ProtocolError::InvalidMessage => - for h in self.handlers.values_mut() { - let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + for (k, h) in &mut self.handlers { + if let Some(i) = info.take(k) { + let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage); + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } } ProtocolError::InvalidProtocol => - for h in self.handlers.values_mut() { - let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + for (k, h) in &mut self.handlers { + if let Some(i) = info.take(k) { + let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol); + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } } ProtocolError::TooManyProtocols => - for h in self.handlers.values_mut() { - let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + for (k, h) in &mut self.handlers { + if let Some(i) = info.take(k) { + let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols); + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e))) + } } } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => if let Some(h) = self.handlers.get_mut(&k) { - h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) + if let Some(i) = info.take(&k) { + h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) + } } } } @@ -309,12 +337,37 @@ impl ProtocolName for IndexedProtoName { } } +/// The aggregated `InboundOpenInfo`s of supported inbound substream protocols. +#[derive(Clone)] +pub struct Info { + infos: Vec<(K, I)> +} + +impl Info { + fn new() -> Self { + Info { infos: Vec::new() } + } + + pub fn take(&mut self, k: &K) -> Option { + if let Some(p) = self.infos.iter().position(|(key, _)| key == k) { + return Some(self.infos.remove(p).1) + } + None + } +} + /// Inbound and outbound upgrade for all `ProtocolsHandler`s. #[derive(Clone)] pub struct Upgrade { upgrades: Vec<(K, H)> } +impl Upgrade { + fn new() -> Self { + Upgrade { upgrades: Vec::new() } + } +} + impl fmt::Debug for Upgrade where K: fmt::Debug + Eq + Hash, @@ -433,4 +486,3 @@ impl fmt::Display for DuplicateProtonameError { } impl error::Error for DuplicateProtonameError {} - diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index c11aa83a493..5bf0a581ab1 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -95,8 +95,11 @@ where /// The underlying handler. handler: TProtoHandler, /// Futures that upgrade incoming substreams. - negotiating_in: - Vec<(InboundUpgradeApply, SendWrapper>, Delay)>, + negotiating_in: Vec<( + TProtoHandler::InboundOpenInfo, + InboundUpgradeApply, SendWrapper>, + Delay + )>, /// Futures that upgrade outgoing substreams. The first element of the tuple is the userdata /// to pass back once successfully opened. negotiating_out: Vec<( @@ -192,9 +195,10 @@ where SubstreamEndpoint::Listener => { let protocol = self.handler.listen_protocol(); let timeout = protocol.timeout().clone(); - let upgrade = upgrade::apply_inbound(substream, SendWrapper(protocol.into_upgrade().1)); + let (_, upgrade, info) = protocol.into_upgrade(); + let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); let timeout = Delay::new(timeout); - self.negotiating_in.push((upgrade, timeout)); + self.negotiating_in.push((info, upgrade, timeout)); } SubstreamEndpoint::Dialer((upgrade_id, user_data, timeout)) => { let pos = match self @@ -231,27 +235,27 @@ where // Continue negotiation of newly-opened substreams on the listening side. // We remove each element from `negotiating_in` one by one and add them back if not ready. for n in (0..self.negotiating_in.len()).rev() { - let (mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n); + let (info, mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n); match Future::poll(Pin::new(&mut timeout), cx) { Poll::Ready(Ok(_)) => { let err = ProtocolsHandlerUpgrErr::Timeout; - self.handler.inject_listen_upgrade_error(err); + self.handler.inject_listen_upgrade_error(info, err); continue } Poll::Ready(Err(_)) => { let err = ProtocolsHandlerUpgrErr::Timer; - self.handler.inject_listen_upgrade_error(err); + self.handler.inject_listen_upgrade_error(info, err); continue; } Poll::Pending => {}, } match Future::poll(Pin::new(&mut in_progress), cx) { Poll::Ready(Ok(upgrade)) => - self.handler.inject_fully_negotiated_inbound(upgrade), - Poll::Pending => self.negotiating_in.push((in_progress, timeout)), + self.handler.inject_fully_negotiated_inbound(upgrade, info), + Poll::Pending => self.negotiating_in.push((info, in_progress, timeout)), Poll::Ready(Err(err)) => { let err = ProtocolsHandlerUpgrErr::Upgrade(err); - self.handler.inject_listen_upgrade_error(err); + self.handler.inject_listen_upgrade_error(info, err); } } } @@ -308,14 +312,11 @@ where Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { return Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))); } - Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol, - info, - }) => { + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { let id = self.unique_dial_upgrade_id; let timeout = protocol.timeout().clone(); self.unique_dial_upgrade_id += 1; - let (version, upgrade) = protocol.into_upgrade(); + let (version, upgrade, info) = protocol.into_upgrade(); self.queued_dial_upgrades.push((id, (version, SendWrapper(upgrade)))); return Poll::Ready(Ok( ConnectionHandlerEvent::OutboundSubstreamRequest((id, info, timeout)), diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 4f5a0288eca..c83d822782c 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -38,7 +38,7 @@ where TOutbound: OutboundUpgradeSend, { /// The upgrade for inbound substreams. - listen_protocol: SubstreamProtocol, + listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. pending_error: Option::Error>>, /// Queue of events to produce in `poll()`. @@ -62,7 +62,7 @@ where { /// Creates a `OneShotHandler`. pub fn new( - listen_protocol: SubstreamProtocol, + listen_protocol: SubstreamProtocol, config: OneShotHandlerConfig, ) -> Self { OneShotHandler { @@ -86,7 +86,7 @@ where /// /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. - pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { + pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { &self.listen_protocol } @@ -94,7 +94,7 @@ where /// /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. - pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { + pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { &mut self.listen_protocol } @@ -113,21 +113,20 @@ where { fn default() -> Self { OneShotHandler::new( - SubstreamProtocol::new(Default::default()), + SubstreamProtocol::new(Default::default(), ()), OneShotHandlerConfig::default() ) } } -impl ProtocolsHandler - for OneShotHandler +impl ProtocolsHandler for OneShotHandler where TInbound: InboundUpgradeSend + Send + 'static, TOutbound: OutboundUpgradeSend, TInbound::Output: Into, TOutbound::Output: Into, TOutbound::Error: error::Error + Send + 'static, - SubstreamProtocol: Clone, + SubstreamProtocol: Clone, TEvent: Send + 'static, { type InEvent = TOutbound; @@ -138,14 +137,16 @@ where type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; type OutboundOpenInfo = (); + type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() } fn inject_fully_negotiated_inbound( &mut self, out: ::Output, + (): Self::InboundOpenInfo ) { // If we're shutting down the connection for inactivity, reset the timeout. if !self.keep_alive.is_yes() { @@ -208,9 +209,8 @@ where let upgrade = self.dial_queue.remove(0); return Poll::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(upgrade) - .with_timeout(self.config.outbound_substream_timeout), - info: (), + protocol: SubstreamProtocol::new(upgrade, ()) + .with_timeout(self.config.outbound_substream_timeout) }, ); } @@ -256,7 +256,7 @@ mod tests { #[test] fn do_not_keep_idle_connection_alive() { let mut handler: OneShotHandler<_, DeniedUpgrade, Void> = OneShotHandler::new( - SubstreamProtocol::new(DeniedUpgrade{}), + SubstreamProtocol::new(DeniedUpgrade{}, ()), Default::default(), ); diff --git a/swarm/src/protocols_handler/select.rs b/swarm/src/protocols_handler/select.rs index 7561e0349d4..42bc310e3fd 100644 --- a/swarm/src/protocols_handler/select.rs +++ b/swarm/src/protocols_handler/select.rs @@ -105,13 +105,16 @@ where type InboundProtocol = SelectUpgrade::InboundProtocol>, SendWrapper<::InboundProtocol>>; type OutboundProtocol = EitherUpgrade, SendWrapper>; type OutboundOpenInfo = EitherOutput; + type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo); - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { let proto1 = self.proto1.listen_protocol(); let proto2 = self.proto2.listen_protocol(); let timeout = std::cmp::max(proto1.timeout(), proto2.timeout()).clone(); - let choice = SelectUpgrade::new(SendWrapper(proto1.into_upgrade().1), SendWrapper(proto2.into_upgrade().1)); - SubstreamProtocol::new(choice).with_timeout(timeout) + let (_, u1, i1) = proto1.into_upgrade(); + let (_, u2, i2) = proto2.into_upgrade(); + let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2)); + SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout) } fn inject_fully_negotiated_outbound(&mut self, protocol: ::Output, endpoint: Self::OutboundOpenInfo) { @@ -127,12 +130,12 @@ where } } - fn inject_fully_negotiated_inbound(&mut self, protocol: ::Output) { + fn inject_fully_negotiated_inbound(&mut self, protocol: ::Output, (i1, i2): Self::InboundOpenInfo) { match protocol { EitherOutput::First(protocol) => - self.proto1.inject_fully_negotiated_inbound(protocol), + self.proto1.inject_fully_negotiated_inbound(protocol, i1), EitherOutput::Second(protocol) => - self.proto2.inject_fully_negotiated_inbound(protocol) + self.proto2.inject_fully_negotiated_inbound(protocol, i2) } } @@ -143,9 +146,9 @@ where } } - fn inject_address_change(&mut self, addr: &Multiaddr) { - self.proto1.inject_address_change(addr); - self.proto2.inject_address_change(addr) + fn inject_address_change(&mut self, new_address: &Multiaddr) { + self.proto1.inject_address_change(new_address); + self.proto2.inject_address_change(new_address) } fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { @@ -183,19 +186,19 @@ where } } - fn inject_listen_upgrade_error(&mut self, error: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_listen_upgrade_error(&mut self, (i1, i2): Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { match error { ProtocolsHandlerUpgrErr::Timer => { - self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer); - self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer); + self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Timer); + self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Timer) } ProtocolsHandlerUpgrErr::Timeout => { - self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout); - self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout); + self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Timeout); + self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Timeout) } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))); - self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))); + self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))); + self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed))); } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) => { let (e1, e2); @@ -217,14 +220,14 @@ where e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols) } } - self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e1))); - self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e2))) + self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e1))); + self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e2))) } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { - self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) + self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => { - self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) + self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e))) } } } @@ -234,7 +237,6 @@ where } fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.proto1.poll(cx) { Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { return Poll::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))); @@ -242,13 +244,11 @@ where Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::A(event))); }, - Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol, - info, - }) => { + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol.map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))), - info: EitherOutput::First(info), + protocol: protocol + .map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))) + .map_info(EitherOutput::First) }); }, Poll::Pending => () @@ -261,13 +261,11 @@ where Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(event))); }, - Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol, - info, - }) => { + Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol.map_upgrade(|u| EitherUpgrade::B(SendWrapper(u))), - info: EitherOutput::Second(info), + protocol: protocol + .map_upgrade(|u| EitherUpgrade::B(SendWrapper(u))) + .map_info(EitherOutput::Second) }); }, Poll::Pending => () diff --git a/swarm/src/toggle.rs b/swarm/src/toggle.rs index c534cab532b..bb9011261ad 100644 --- a/swarm/src/toggle.rs +++ b/swarm/src/toggle.rs @@ -28,6 +28,7 @@ use crate::protocols_handler::{ ProtocolsHandlerUpgrErr, IntoProtocolsHandler }; +use either::Either; use libp2p_core::{ ConnectedPoint, PeerId, @@ -213,26 +214,35 @@ where type InboundProtocol = EitherUpgrade, SendWrapper>; type OutboundProtocol = TInner::OutboundProtocol; type OutboundOpenInfo = TInner::OutboundOpenInfo; + type InboundOpenInfo = Either; - fn listen_protocol(&self) -> SubstreamProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { if let Some(inner) = self.inner.as_ref() { - inner.listen_protocol().map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))) + inner.listen_protocol() + .map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))) + .map_info(Either::Left) } else { - SubstreamProtocol::new(EitherUpgrade::B(SendWrapper(DeniedUpgrade))) + SubstreamProtocol::new(EitherUpgrade::B(SendWrapper(DeniedUpgrade)), Either::Right(())) } } fn inject_fully_negotiated_inbound( &mut self, - out: ::Output + out: ::Output, + info: Self::InboundOpenInfo ) { let out = match out { EitherOutput::First(out) => out, EitherOutput::Second(v) => void::unreachable(v), }; - self.inner.as_mut().expect("Can't receive an inbound substream if disabled; QED") - .inject_fully_negotiated_inbound(out) + if let Either::Left(info) = info { + self.inner.as_mut() + .expect("Can't receive an inbound substream if disabled; QED") + .inject_fully_negotiated_inbound(out, info) + } else { + panic!("Unpexpected Either::Right in enabled `inject_fully_negotiated_inbound`.") + } } fn inject_fully_negotiated_outbound( @@ -260,13 +270,22 @@ where .inject_dial_upgrade_error(info, err) } - fn inject_listen_upgrade_error(&mut self, err: ProtocolsHandlerUpgrErr<::Error>) { - if let Some(inner) = self.inner.as_mut() { - let err = err.map_upgrade_err(|e| e.map_err(|e| match e { - EitherError::A(e) => e, - EitherError::B(v) => void::unreachable(v) - })); - inner.inject_listen_upgrade_error(err) + fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, err: ProtocolsHandlerUpgrErr<::Error>) { + let err = match err { + ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout, + ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer, + ProtocolsHandlerUpgrErr::Upgrade(err) => + ProtocolsHandlerUpgrErr::Upgrade(err.map_err(|err| match err { + EitherError::A(e) => e, + EitherError::B(v) => void::unreachable(v) + })) + }; + if let Either::Left(info) = info { + self.inner.as_mut() + .expect("Can't receive an inbound substream if disabled; QED") + .inject_listen_upgrade_error(info, err) + } else { + panic!("Unexpected Either::Right on enabled `inject_listen_upgrade_error`.") } }