diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index f442d68bdb2..09c06900d34 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -3,6 +3,9 @@ - Make RSA keypair support optional. To enable RSA support, `rsa` feature should be enabled. See [PR 2860]. +- Add `ReadyUpgrade`. See [PR 2855]. + +[PR 2855]: https://github.com/libp2p/rust-libp2p/pull/2855 [PR 2860]: https://github.com/libp2p/rust-libp2p/pull/2860/ # 0.35.1 diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 34a27cdf77a..de9ef765e16 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -65,6 +65,7 @@ mod from_fn; mod map; mod optional; mod pending; +mod ready; mod select; mod transfer; @@ -79,6 +80,7 @@ pub use self::{ map::{MapInboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgrade, MapOutboundUpgradeErr}, optional::OptionalUpgrade, pending::PendingUpgrade, + ready::ReadyUpgrade, select::SelectUpgrade, transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint}, }; diff --git a/core/src/upgrade/ready.rs b/core/src/upgrade/ready.rs new file mode 100644 index 00000000000..16a9b2867f4 --- /dev/null +++ b/core/src/upgrade/ready.rs @@ -0,0 +1,75 @@ +// Copyright 2022 Protocol Labs. +// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use futures::future; +use std::iter; +use void::Void; + +/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that directly yields the substream. +#[derive(Debug, Copy, Clone)] +pub struct ReadyUpgrade

{ + protocol_name: P, +} + +impl

ReadyUpgrade

{ + pub fn new(protocol_name: P) -> Self { + Self { protocol_name } + } +} + +impl

UpgradeInfo for ReadyUpgrade

+where + P: ProtocolName + Clone, +{ + type Info = P; + type InfoIter = iter::Once

; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol_name.clone()) + } +} + +impl InboundUpgrade for ReadyUpgrade

+where + P: ProtocolName + Clone, +{ + type Output = C; + type Error = Void; + type Future = future::Ready>; + + fn upgrade_inbound(self, stream: C, _: Self::Info) -> Self::Future { + future::ready(Ok(stream)) + } +} + +impl OutboundUpgrade for ReadyUpgrade

+where + P: ProtocolName + Clone, +{ + type Output = C; + type Error = Void; + type Future = future::Ready>; + + fn upgrade_outbound(self, stream: C, _: Self::Info) -> Self::Future { + future::ready(Ok(stream)) + } +} diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 850f4ebc05f..f0e71fb070e 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -18,10 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol; +use crate::{protocol, PROTOCOL_NAME}; use futures::future::BoxFuture; use futures::prelude::*; use futures_timer::Delay; +use libp2p_core::upgrade::ReadyUpgrade; use libp2p_core::{upgrade::NegotiationError, UpgradeError}; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, @@ -225,13 +226,13 @@ impl ConnectionHandler for Handler { type InEvent = Void; type OutEvent = crate::Result; type Error = Failure; - type InboundProtocol = protocol::Ping; - type OutboundProtocol = protocol::Ping; + type InboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundProtocol = ReadyUpgrade<&'static [u8]>; type OutboundOpenInfo = (); type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(protocol::Ping, ()) + fn listen_protocol(&self) -> SubstreamProtocol, ()> { + SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) } fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream, (): ()) { @@ -274,7 +275,8 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll, (), crate::Result, Self::Error>> + { match self.state { State::Inactive { reported: true } => { return Poll::Pending; // nothing to do on this connection @@ -366,7 +368,7 @@ impl ConnectionHandler for Handler { } None => { self.outbound = Some(PingState::OpenStream); - let protocol = SubstreamProtocol::new(protocol::Ping, ()) + let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) .with_timeout(self.config.timeout); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol, diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 659040e2d7f..3c44adcd0b4 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -20,13 +20,10 @@ use futures::prelude::*; use instant::Instant; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p_swarm::NegotiatedSubstream; use rand::{distributions, prelude::*}; -use std::{io, iter, time::Duration}; -use void::Void; +use std::{io, time::Duration}; -pub const PROTOCOL_NAME: &[u8; 16] = b"/ipfs/ping/1.0.0"; +pub const PROTOCOL_NAME: &[u8] = b"/ipfs/ping/1.0.0"; /// The `Ping` protocol upgrade. /// @@ -52,35 +49,6 @@ pub struct Ping; const PING_SIZE: usize = 32; -impl UpgradeInfo for Ping { - type Info = &'static [u8]; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(PROTOCOL_NAME) - } -} - -impl InboundUpgrade for Ping { - type Output = NegotiatedSubstream; - type Error = Void; - type Future = future::Ready>; - - fn upgrade_inbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - future::ok(stream) - } -} - -impl OutboundUpgrade for Ping { - type Output = NegotiatedSubstream; - type Error = Void; - type Future = future::Ready>; - - fn upgrade_outbound(self, stream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - future::ok(stream) - } -} - /// Sends a ping and waits for the pong. pub async fn send_ping(mut stream: S) -> io::Result<(S, Duration)> where