diff --git a/CHANGELOG.md b/CHANGELOG.md index 6751b54849b..059826ac46b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,10 @@ # `libp2p` facade crate +## Version 0.37.0 [unreleased] + +- Update `libp2p-identify`. + ## Version 0.36.0 [2021-03-17] - Consolidate top-level utility functions for constructing development diff --git a/Cargo.toml b/Cargo.toml index 656bbebdf04..e783796fd3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p" edition = "2018" description = "Peer-to-peer networking library" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -67,7 +67,7 @@ lazy_static = "1.2" libp2p-core = { version = "0.28.0", path = "core", default-features = false } libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true } libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true } -libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true } +libp2p-identify = { version = "0.29.0", path = "protocols/identify", optional = true } libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true } libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true } libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true } diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index f53419d267e..01e350bc160 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -38,7 +38,7 @@ use libp2p::{ either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version, }, gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity}, - identify::{Identify, IdentifyEvent}, + identify::{Identify, IdentifyConfig, IdentifyEvent}, identity, multiaddr::Protocol, noise, @@ -245,11 +245,10 @@ fn main() -> Result<(), Box> { gossipsub_config, ) .expect("Valid configuration"), - identify: Identify::new( + identify: Identify::new(IdentifyConfig::new( "/ipfs/0.1.0".into(), - "rust-ipfs-example".into(), local_key.public(), - ), + )), ping: Ping::new(PingConfig::new()), }; diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index f7d9f08f53e..a3eee56830f 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.29.0 [unreleased] + +- Implement the `/ipfs/id/push/1.0.0` protocol. + cf. https://github.com/libp2p/specs/tree/master/identify#identifypush + [PR 1999](https://github.com/libp2p/rust-libp2p/pull/1999) + # 0.28.0 [2021-03-17] - Update `libp2p-swarm`. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 79c466e9632..8893c964485 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-identify" edition = "2018" description = "Nodes identifcation protocol for libp2p" -version = "0.28.0" +version = "0.29.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -20,6 +20,7 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" +env_logger = "0.8" libp2p-mplex = { path = "../../muxers/mplex" } libp2p-noise = { path = "../../transports/noise" } libp2p-tcp = { path = "../../transports/tcp" } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 0dde71ff99b..9fd397c0fbc 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,12 +18,25 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{RemoteInfo, IdentifyProtocolConfig, ReplySubstream}; +use crate::protocol::{ + IdentifyProtocol, + IdentifyPushProtocol, + IdentifyInfo, + InboundPush, + OutboundPush, + ReplySubstream +}; use futures::prelude::*; +use libp2p_core::either::{ + EitherError, + EitherOutput, +}; use libp2p_core::upgrade::{ + EitherUpgrade, InboundUpgrade, OutboundUpgrade, - ReadOneError + SelectUpgrade, + UpgradeError, }; use libp2p_swarm::{ NegotiatedSubstream, @@ -34,89 +47,119 @@ use libp2p_swarm::{ ProtocolsHandlerUpgrErr }; use smallvec::SmallVec; -use std::{pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Delay; -/// Delay between the moment we connect and the first time we identify. -const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500); -/// After an identification succeeded, wait this long before the next time. -const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60); -/// After we failed to identify the remote, try again after the given delay. -const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60); - /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects /// at least one identification request to be answered by the remote before /// permitting the underlying connection to be closed. pub struct IdentifyHandler { - /// Configuration for the protocol. - config: IdentifyProtocolConfig, - /// Pending events to yield. - events: SmallVec<[IdentifyHandlerEvent; 4]>, + events: SmallVec<[ProtocolsHandlerEvent< + EitherUpgrade>, + (), + IdentifyHandlerEvent, + io::Error, + >; 4]>, /// Future that fires when we need to identify the node again. next_id: Delay, /// Whether the handler should keep the connection alive. keep_alive: KeepAlive, + + /// The interval of `next_id`, i.e. the recurrent delay. + interval: Duration, } /// Event produced by the `IdentifyHandler`. #[derive(Debug)] pub enum IdentifyHandlerEvent { - /// We obtained identification information from the remote - Identified(RemoteInfo), + /// We obtained identification information from the remote. + Identified(IdentifyInfo), /// We received a request for identification. Identify(ReplySubstream), /// Failed to identify the remote. - IdentificationError(ProtocolsHandlerUpgrErr), + IdentificationError(ProtocolsHandlerUpgrErr), } +/// Identifying information of the local node that is pushed to a remote. +#[derive(Debug)] +pub struct IdentifyPush(pub IdentifyInfo); + impl IdentifyHandler { /// Creates a new `IdentifyHandler`. - pub fn new() -> Self { + pub fn new(initial_delay: Duration, interval: Duration) -> Self { IdentifyHandler { - config: IdentifyProtocolConfig, events: SmallVec::new(), - next_id: Delay::new(DELAY_TO_FIRST_ID), + next_id: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, + interval, } } } impl ProtocolsHandler for IdentifyHandler { - type InEvent = (); + type InEvent = IdentifyPush; type OutEvent = IdentifyHandlerEvent; - type Error = ReadOneError; - type InboundProtocol = IdentifyProtocolConfig; - type OutboundProtocol = IdentifyProtocolConfig; + type Error = io::Error; + type InboundProtocol = SelectUpgrade>; + type OutboundProtocol = EitherUpgrade>; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(self.config.clone(), ()) + SubstreamProtocol::new( + SelectUpgrade::new( + IdentifyProtocol, + IdentifyPushProtocol::inbound(), + ), ()) } fn inject_fully_negotiated_inbound( &mut self, - protocol: >::Output, - _info: Self::InboundOpenInfo + output: >::Output, + _: Self::InboundOpenInfo ) { - self.events.push(IdentifyHandlerEvent::Identify(protocol)) + match output { + EitherOutput::First(substream) => { + self.events.push( + ProtocolsHandlerEvent::Custom( + IdentifyHandlerEvent::Identify(substream))) + } + EitherOutput::Second(info) => { + self.events.push( + ProtocolsHandlerEvent::Custom( + IdentifyHandlerEvent::Identified(info))) + } + } } fn inject_fully_negotiated_outbound( &mut self, - protocol: >::Output, - _info: Self::OutboundOpenInfo, + output: >::Output, + _: Self::OutboundOpenInfo, ) { - self.events.push(IdentifyHandlerEvent::Identified(protocol)); - self.keep_alive = KeepAlive::No; + match output { + EitherOutput::First(remote_info) => { + self.events.push( + ProtocolsHandlerEvent::Custom( + IdentifyHandlerEvent::Identified(remote_info))); + self.keep_alive = KeepAlive::No; + } + EitherOutput::Second(()) => {} + } } - fn inject_event(&mut self, _: Self::InEvent) {} + fn inject_event(&mut self, IdentifyPush(push): Self::InEvent) { + self.events.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + EitherUpgrade::B( + IdentifyPushProtocol::outbound(push)), ()) + }); + } fn inject_dial_upgrade_error( &mut self, @@ -125,9 +168,16 @@ impl ProtocolsHandler for IdentifyHandler { >::Error > ) { - self.events.push(IdentifyHandlerEvent::IdentificationError(err)); + let err = err.map_upgrade_err(|e| match e { + UpgradeError::Select(e) => UpgradeError::Select(e), + UpgradeError::Apply(EitherError::A(ioe)) => UpgradeError::Apply(ioe), + UpgradeError::Apply(EitherError::B(ioe)) => UpgradeError::Apply(ioe), + }); + self.events.push( + ProtocolsHandlerEvent::Custom( + IdentifyHandlerEvent::IdentificationError(err))); self.keep_alive = KeepAlive::No; - self.next_id.reset(TRY_AGAIN_ON_ERR); + self.next_id.reset(self.interval); } fn connection_keep_alive(&self) -> KeepAlive { @@ -143,18 +193,18 @@ impl ProtocolsHandler for IdentifyHandler { >, > { if !self.events.is_empty() { - return Poll::Ready(ProtocolsHandlerEvent::Custom( + return Poll::Ready( self.events.remove(0), - )); + ); } // Poll the future that fires when we need to identify the node again. match Future::poll(Pin::new(&mut self.next_id), cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(())) => { - self.next_id.reset(DELAY_TO_NEXT_ID); + self.next_id.reset(self.interval); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.config.clone(), ()) + protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()) }; Poll::Ready(ev) } diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 81c114f12ea..111bd03ff70 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{IdentifyHandler, IdentifyHandlerEvent}; +use crate::handler::{IdentifyHandler, IdentifyHandlerEvent, IdentifyPush}; use crate::protocol::{IdentifyInfo, ReplySubstream}; use futures::prelude::*; use libp2p_core::{ @@ -27,23 +27,26 @@ use libp2p_core::{ PeerId, PublicKey, connection::ConnectionId, - upgrade::{ReadOneError, UpgradeError} + upgrade::UpgradeError }; use libp2p_swarm::{ AddressScore, + DialPeerCondition, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, ProtocolsHandler, ProtocolsHandlerUpgrErr }; use std::{ - collections::{HashMap, VecDeque}, + collections::{HashSet, HashMap, VecDeque}, io, pin::Pin, task::Context, - task::Poll + task::Poll, + time::Duration, }; /// Network behaviour that automatically identifies nodes periodically, returns information @@ -53,18 +56,16 @@ use std::{ /// are reported via [`NetworkBehaviourAction::ReportObservedAddr`] with a /// [score](AddressScore) of `1`. pub struct Identify { - /// Protocol version to send back to remotes. - protocol_version: String, - /// Agent version to send back to remotes. - agent_version: String, - /// The public key of the local node. To report on the wire. - local_public_key: PublicKey, + config: IdentifyConfig, /// For each peer we're connected to, the observed address to send back to it. - observed_addresses: HashMap>, + connected: HashMap>, /// Pending replies to send. pending_replies: VecDeque, /// Pending events to be emitted when polled. - events: VecDeque>, + events: VecDeque>, + /// Peers to which an active push with current information about + /// the local peer should be sent. + pending_push: HashSet, } /// A pending reply to an inbound identification request. @@ -82,16 +83,92 @@ enum Reply { } } +/// Configuration for the [`Identify`] [`NetworkBehaviour`]. +#[non_exhaustive] +pub struct IdentifyConfig { + /// Application-specific version of the protocol family used by the peer, + /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`. + pub protocol_version: String, + /// The public key of the local node. To report on the wire. + pub local_public_key: PublicKey, + /// Name and version of the local peer implementation, similar to the + /// `User-Agent` header in the HTTP protocol. + /// + /// Defaults to `rust-libp2p/`. + pub agent_version: String, + /// The initial delay before the first identification request + /// is sent to a remote on a newly established connection. + /// + /// Defaults to 500ms. + pub initial_delay: Duration, + /// The interval at which identification requests are sent to + /// the remote on established connections after the first request, + /// i.e. the delay between identification requests. + /// + /// Defaults to 5 minutes. + pub interval: Duration, +} + +impl IdentifyConfig { + /// Creates a new configuration for the `Identify` behaviour that + /// advertises the given protocol version and public key. + pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self { + IdentifyConfig { + protocol_version, + agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")), + local_public_key, + initial_delay: Duration::from_millis(500), + interval: Duration::from_secs(5 * 60), + } + } + + /// Configures the agent version sent to peers. + pub fn with_agent_version(mut self, v: String) -> Self { + self.agent_version = v; + self + } + + /// Configures the initial delay before the first identification + /// request is sent on a newly established connection to a peer. + pub fn with_initial_delay(mut self, d: Duration) -> Self { + self.initial_delay = d; + self + } + + /// Configures the interval at which identification requests are + /// sent to peers after the initial request. + pub fn with_interval(mut self, d: Duration) -> Self { + self.interval = d; + self + } +} + impl Identify { /// Creates a new `Identify` network behaviour. - pub fn new(protocol_version: String, agent_version: String, local_public_key: PublicKey) -> Self { + pub fn new(config: IdentifyConfig) -> Self { Identify { - protocol_version, - agent_version, - local_public_key, - observed_addresses: HashMap::new(), + config, + connected: HashMap::new(), pending_replies: VecDeque::new(), events: VecDeque::new(), + pending_push: HashSet::new(), + } + } + + /// Initiates an active push of the local peer information to the given peers. + pub fn push(&mut self, peers: I) + where + I: IntoIterator + { + for p in peers { + if self.pending_push.insert(p) { + if !self.connected.contains_key(&p) { + self.events.push_back(NetworkBehaviourAction::DialPeer { + peer_id: p, + condition: DialPeerCondition::Disconnected + }); + } + } } } } @@ -101,7 +178,7 @@ impl NetworkBehaviour for Identify { type OutEvent = IdentifyEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - IdentifyHandler::new() + IdentifyHandler::new(self.config.initial_delay, self.config.interval) } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { @@ -117,17 +194,24 @@ impl NetworkBehaviour for Identify { ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), }; - self.observed_addresses.entry(*peer_id).or_default().insert(*conn, addr); + self.connected.entry(*peer_id).or_default().insert(*conn, addr.clone()); } fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) { - if let Some(addrs) = self.observed_addresses.get_mut(peer_id) { + if let Some(addrs) = self.connected.get_mut(peer_id) { addrs.remove(conn); } } + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + if !self.connected.contains_key(peer_id) { + self.pending_push.remove(peer_id); + } + } + fn inject_disconnected(&mut self, peer_id: &PeerId) { - self.observed_addresses.remove(peer_id); + self.connected.remove(peer_id); + self.pending_push.remove(peer_id); } fn inject_event( @@ -137,22 +221,22 @@ impl NetworkBehaviour for Identify { event: ::OutEvent, ) { match event { - IdentifyHandlerEvent::Identified(remote) => { + IdentifyHandlerEvent::Identified(info) => { + let observed = info.observed_addr.clone(); self.events.push_back( NetworkBehaviourAction::GenerateEvent( IdentifyEvent::Received { peer_id, - info: remote.info, - observed_addr: remote.observed_addr.clone(), + info, })); self.events.push_back( NetworkBehaviourAction::ReportObservedAddr { - address: remote.observed_addr, + address: observed, score: AddressScore::Finite(1), }); } IdentifyHandlerEvent::Identify(sender) => { - let observed = self.observed_addresses.get(&peer_id) + let observed = self.connected.get(&peer_id) .and_then(|addrs| addrs.get(&connection)) .expect("`inject_event` is only called with an established connection \ and `inject_connection_established` ensures there is an entry; qed"); @@ -185,17 +269,42 @@ impl NetworkBehaviour for Identify { return Poll::Ready(event); } - if let Some(r) = self.pending_replies.pop_front() { - // The protocol names can be bytes, but the identify protocol except UTF-8 strings. - // There's not much we can do to solve this conflict except strip non-UTF-8 characters. - let protocols: Vec<_> = params - .supported_protocols() - .map(|p| String::from_utf8_lossy(&p).to_string()) - .collect(); + // Check for a pending active push to perform. + let peer_push = self.pending_push.iter().find_map(|peer| { + self.connected.get(peer).map(|conns| { + let observed_addr = conns + .values() + .next() + .expect("connected peer has a connection") + .clone(); + + let listen_addrs = listen_addrs(params); + let protocols = supported_protocols(params); + + let info = IdentifyInfo { + public_key: self.config.local_public_key.clone(), + protocol_version: self.config.protocol_version.clone(), + agent_version: self.config.agent_version.clone(), + listen_addrs, + protocols, + observed_addr, + }; - let mut listen_addrs: Vec<_> = params.external_addresses().map(|r| r.addr).collect(); - listen_addrs.extend(params.listened_addresses()); + (*peer, IdentifyPush(info)) + }) + }); + + if let Some((peer_id, push)) = peer_push { + self.pending_push.remove(&peer_id); + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id, + event: push, + handler: NotifyHandler::Any, + }) + } + // Check for pending replies to send. + if let Some(r) = self.pending_replies.pop_front() { let mut sending = 0; let to_send = self.pending_replies.len() + 1; let mut reply = Some(r); @@ -203,13 +312,14 @@ impl NetworkBehaviour for Identify { match reply { Some(Reply::Queued { peer, io, observed }) => { let info = IdentifyInfo { - public_key: self.local_public_key.clone(), - protocol_version: self.protocol_version.clone(), - agent_version: self.agent_version.clone(), - listen_addrs: listen_addrs.clone(), - protocols: protocols.clone(), + listen_addrs: listen_addrs(params), + protocols: supported_protocols(params), + public_key: self.config.local_public_key.clone(), + protocol_version: self.config.protocol_version.clone(), + agent_version: self.config.agent_version.clone(), + observed_addr: observed, }; - let io = Box::pin(io.send(info, &observed)); + let io = Box::pin(io.send(info)); reply = Some(Reply::Sending { peer, io }); } Some(Reply::Sending { peer, mut io }) => { @@ -255,8 +365,6 @@ pub enum IdentifyEvent { peer_id: PeerId, /// The information provided by the peer. info: IdentifyInfo, - /// The address observed by the peer for the local node. - observed_addr: Multiaddr, }, /// Identifying information of the local node has been sent to a peer. Sent { @@ -268,14 +376,29 @@ pub enum IdentifyEvent { /// The peer with whom the error originated. peer_id: PeerId, /// The error that occurred. - error: ProtocolsHandlerUpgrErr, + error: ProtocolsHandlerUpgrErr, }, } +fn supported_protocols(params: &impl PollParameters) -> Vec { + // The protocol names can be bytes, but the identify protocol except UTF-8 strings. + // There's not much we can do to solve this conflict except strip non-UTF-8 characters. + params + .supported_protocols() + .map(|p| String::from_utf8_lossy(&p).to_string()) + .collect() +} + +fn listen_addrs(params: &impl PollParameters) -> Vec { + let mut listen_addrs: Vec<_> = params.external_addresses().map(|r| r.addr).collect(); + listen_addrs.extend(params.listened_addresses()); + listen_addrs +} + #[cfg(test)] mod tests { - use crate::{Identify, IdentifyEvent}; - use futures::{prelude::*, pin_mut}; + use super::*; + use futures::pin_mut; use libp2p_core::{ identity, PeerId, @@ -303,17 +426,21 @@ mod tests { } #[test] - fn periodic_id_works() { + fn periodic_identify() { let (mut swarm1, pubkey1) = { let (pubkey, transport) = transport(); - let protocol = Identify::new("a".to_string(), "b".to_string(), pubkey.clone()); + let protocol = Identify::new( + IdentifyConfig::new("a".to_string(), pubkey.clone()) + .with_agent_version("b".to_string())); let swarm = Swarm::new(transport, protocol, pubkey.clone().into_peer_id()); (swarm, pubkey) }; let (mut swarm2, pubkey2) = { let (pubkey, transport) = transport(); - let protocol = Identify::new("c".to_string(), "d".to_string(), pubkey.clone()); + let protocol = Identify::new( + IdentifyConfig::new("c".to_string(), pubkey.clone()) + .with_agent_version("d".to_string())); let swarm = Swarm::new(transport, protocol, pubkey.clone().into_peer_id()); (swarm, pubkey) }; @@ -365,4 +492,76 @@ mod tests { } }) } + + #[test] + fn identify_push() { + let _ = env_logger::try_init(); + + let (mut swarm1, pubkey1) = { + let (pubkey, transport) = transport(); + let protocol = Identify::new( + IdentifyConfig::new("a".to_string(), pubkey.clone()) + // Delay identification requests so we can test the push protocol. + .with_initial_delay(Duration::from_secs(u32::MAX as u64))); + let swarm = Swarm::new(transport, protocol, pubkey.clone().into_peer_id()); + (swarm, pubkey) + }; + + let (mut swarm2, pubkey2) = { + let (pubkey, transport) = transport(); + let protocol = Identify::new( + IdentifyConfig::new("a".to_string(), pubkey.clone()) + .with_agent_version("b".to_string()) + // Delay identification requests so we can test the push protocol. + .with_initial_delay(Duration::from_secs(u32::MAX as u64))); + let swarm = Swarm::new(transport, protocol, pubkey.clone().into_peer_id()); + (swarm, pubkey) + }; + + Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); + + let listen_addr = async_std::task::block_on(async { + loop { + let swarm1_fut = swarm1.next_event(); + pin_mut!(swarm1_fut); + match swarm1_fut.await { + SwarmEvent::NewListenAddr(addr) => return addr, + _ => {} + } + } + }); + + Swarm::dial_addr(&mut swarm2, listen_addr).unwrap(); + + async_std::task::block_on(async move { + loop { + let swarm1_fut = swarm1.next_event(); + let swarm2_fut = swarm2.next_event(); + + { + pin_mut!(swarm1_fut); + pin_mut!(swarm2_fut); + match future::select(swarm1_fut, swarm2_fut).await.factor_second().0 { + future::Either::Left(SwarmEvent::Behaviour( + IdentifyEvent::Received { info, .. } + )) => { + assert_eq!(info.public_key, pubkey2); + assert_eq!(info.protocol_version, "a"); + assert_eq!(info.agent_version, "b"); + assert!(!info.protocols.is_empty()); + assert!(info.listen_addrs.is_empty()); + return; + } + future::Either::Right(SwarmEvent::ConnectionEstablished { .. }) => { + // Once a connection is established, we can initiate an + // active push below. + } + _ => { continue } + } + } + + swarm2.push(std::iter::once(pubkey1.clone().into_peer_id())); + } + }) + } } diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index 3ed9013ade9..48c0c651428 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -35,9 +35,9 @@ //! [Identify]: https://github.com/libp2p/specs/tree/master/identify //! [`Identify`]: self::Identify //! [`IdentifyEvent`]: self::IdentifyEvent -//! [`IdentifyInfo`]: self::IdentifyEvent +//! [`IdentifyInfo`]: self::IdentifyInfo -pub use self::identify::{Identify, IdentifyEvent}; +pub use self::identify::{Identify, IdentifyConfig, IdentifyEvent}; pub use self::protocol::IdentifyInfo; mod handler; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index dc4a7a6e26f..2c73a1d627f 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -30,16 +30,44 @@ use prost::Message; use std::convert::TryFrom; use std::{fmt, io, iter, pin::Pin}; -/// Configuration for an upgrade to the `Identify` protocol. +/// Substream upgrade protocol for `/ipfs/id/1.0.0`. #[derive(Debug, Clone)] -pub struct IdentifyProtocolConfig; +pub struct IdentifyProtocol; +/// Substream upgrade protocol for `/ipfs/id/push/1.0.0`. #[derive(Debug, Clone)] -#[non_exhaustive] -pub struct RemoteInfo { - /// Information about the remote. - pub info: IdentifyInfo, - /// Address the remote sees for us. +pub struct IdentifyPushProtocol(T); +pub struct InboundPush(); +pub struct OutboundPush(IdentifyInfo); + +impl IdentifyPushProtocol { + pub fn inbound() -> Self { + IdentifyPushProtocol(InboundPush()) + } +} + +impl IdentifyPushProtocol { + pub fn outbound(info: IdentifyInfo) -> Self { + IdentifyPushProtocol(OutboundPush(info)) + } +} + +/// Information of a peer sent in protocol messages. +#[derive(Debug, Clone)] +pub struct IdentifyInfo { + /// The public key of the local peer. + pub public_key: PublicKey, + /// Application-specific version of the protocol family used by the peer, + /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`. + pub protocol_version: String, + /// Name and version of the peer, similar to the `User-Agent` header in + /// the HTTP protocol. + pub agent_version: String, + /// The addresses that the peer is listening on. + pub listen_addrs: Vec, + /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. + pub protocols: Vec, + /// Address observed by or for the remote. pub observed_addr: Multiaddr, } @@ -60,56 +88,14 @@ where { /// Sends back the requested information on the substream. /// - /// Consumes the substream, returning a `ReplyFuture` that resolves + /// Consumes the substream, returning a future that resolves /// when the reply has been sent on the underlying connection. - pub fn send(mut self, info: IdentifyInfo, observed_addr: &Multiaddr) - -> impl Future> - { - debug!("Sending identify info to client"); - trace!("Sending: {:?}", info); - - let listen_addrs = info.listen_addrs - .into_iter() - .map(|addr| addr.to_vec()) - .collect(); - - let pubkey_bytes = info.public_key.into_protobuf_encoding(); - - let message = structs_proto::Identify { - agent_version: Some(info.agent_version), - protocol_version: Some(info.protocol_version), - public_key: Some(pubkey_bytes), - listen_addrs, - observed_addr: Some(observed_addr.to_vec()), - protocols: info.protocols - }; - - async move { - let mut bytes = Vec::with_capacity(message.encoded_len()); - message.encode(&mut bytes).expect("Vec provides capacity as needed"); - upgrade::write_one(&mut self.inner, &bytes).await - } + pub async fn send(self, info: IdentifyInfo) -> io::Result<()> { + send(self.inner, info).await } } -/// Information of a peer sent in `Identify` protocol responses. -#[derive(Debug, Clone)] -pub struct IdentifyInfo { - /// The public key underlying the peer's `PeerId`. - pub public_key: PublicKey, - /// Version of the protocol family used by the peer, e.g. `ipfs/1.0.0` - /// or `polkadot/1.0.0`. - pub protocol_version: String, - /// Name and version of the peer, similar to the `User-Agent` header in - /// the HTTP protocol. - pub agent_version: String, - /// The addresses that the peer is listening on. - pub listen_addrs: Vec, - /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. - pub protocols: Vec, -} - -impl UpgradeInfo for IdentifyProtocolConfig { +impl UpgradeInfo for IdentifyProtocol { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -118,59 +104,119 @@ impl UpgradeInfo for IdentifyProtocolConfig { } } -impl InboundUpgrade for IdentifyProtocolConfig -where - C: AsyncRead + AsyncWrite + Unpin, -{ +impl InboundUpgrade for IdentifyProtocol { type Output = ReplySubstream; type Error = io::Error; type Future = future::Ready>; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - trace!("Upgrading inbound connection"); future::ok(ReplySubstream { inner: socket }) } } -impl OutboundUpgrade for IdentifyProtocolConfig +impl OutboundUpgrade for IdentifyProtocol where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = RemoteInfo; - type Error = upgrade::ReadOneError; + type Output = IdentifyInfo; + type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future { - Box::pin(async move { - socket.close().await?; - let msg = upgrade::read_one(&mut socket, 4096).await?; - let (info, observed_addr) = match parse_proto_msg(msg) { - Ok(v) => v, - Err(err) => { - debug!("Failed to parse protobuf message; error = {:?}", err); - return Err(err.into()) - } - }; + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + recv(socket).boxed() + } +} + +impl UpgradeInfo for IdentifyPushProtocol { + type Info = &'static [u8]; + type InfoIter = iter::Once; - trace!("Remote observes us as {:?}", observed_addr); - trace!("Information received: {:?}", info); + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/ipfs/id/push/1.0.0") + } +} - Ok(RemoteInfo { - info, - observed_addr, - }) - }) +impl InboundUpgrade for IdentifyPushProtocol +where + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = IdentifyInfo; + type Error = io::Error; + type Future = Pin> + Send>>; + + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + recv(socket).boxed() + } +} + +impl OutboundUpgrade for IdentifyPushProtocol +where + C: AsyncWrite + Unpin + Send + 'static, +{ + type Output = (); + type Error = io::Error; + type Future = Pin> + Send>>; + + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + send(socket, self.0.0).boxed() } } -// Turns a protobuf message into an `IdentifyInfo` and an observed address. If something bad -// happens, turn it into an `io::Error`. -fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), io::Error> { +async fn send(mut io: T, info: IdentifyInfo) -> io::Result<()> +where + T: AsyncWrite + Unpin +{ + trace!("Sending: {:?}", info); + + let listen_addrs = info.listen_addrs + .into_iter() + .map(|addr| addr.to_vec()) + .collect(); + + let pubkey_bytes = info.public_key.into_protobuf_encoding(); + + let message = structs_proto::Identify { + agent_version: Some(info.agent_version), + protocol_version: Some(info.protocol_version), + public_key: Some(pubkey_bytes), + listen_addrs, + observed_addr: Some(info.observed_addr.to_vec()), + protocols: info.protocols + }; + + let mut bytes = Vec::with_capacity(message.encoded_len()); + message.encode(&mut bytes).expect("Vec provides capacity as needed"); + upgrade::write_one(&mut io, &bytes).await +} + +async fn recv(mut socket: T) -> io::Result +where + T: AsyncRead + AsyncWrite + Unpin +{ + socket.close().await?; + + let msg = upgrade::read_one(&mut socket, 4096) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + .await?; + + let info = match parse_proto_msg(msg) { + Ok(v) => v, + Err(err) => { + debug!("Invalid message: {:?}", err); + return Err(err.into()) + } + }; + + trace!("Received: {:?}", info); + + Ok(info) +} + +/// Turns a protobuf message into an `IdentifyInfo`. +fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result { match structs_proto::Identify::decode(msg.as_ref()) { Ok(msg) => { - // Turn a `Vec` into a `Multiaddr`. If something bad happens, turn it into - // an `io::Error`. - fn bytes_to_multiaddr(bytes: Vec) -> Result { + fn parse_multiaddr(bytes: Vec) -> Result { Multiaddr::try_from(bytes) .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err)) } @@ -178,7 +224,7 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i let listen_addrs = { let mut addrs = Vec::new(); for addr in msg.listen_addrs.into_iter() { - addrs.push(bytes_to_multiaddr(addr)?); + addrs.push(parse_multiaddr(addr)?); } addrs }; @@ -186,16 +232,17 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i let public_key = PublicKey::from_protobuf_encoding(&msg.public_key.unwrap_or_default()) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let observed_addr = bytes_to_multiaddr(msg.observed_addr.unwrap_or_default())?; + let observed_addr = parse_multiaddr(msg.observed_addr.unwrap_or_default())?; let info = IdentifyInfo { public_key, protocol_version: msg.protocol_version.unwrap_or_default(), agent_version: msg.agent_version.unwrap_or_default(), listen_addrs, - protocols: msg.protocols + protocols: msg.protocols, + observed_addr, }; - Ok((info, observed_addr)) + Ok(info) } Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)), @@ -204,7 +251,6 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i #[cfg(test)] mod tests { - use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig}; use libp2p_tcp::TcpConfig; use futures::{prelude::*, channel::oneshot}; use libp2p_core::{ @@ -212,6 +258,7 @@ mod tests { Transport, upgrade::{self, apply_outbound, apply_inbound} }; + use super::*; #[test] fn correct_transfer() { @@ -236,8 +283,13 @@ mod tests { .expect("listen address"); tx.send(addr).unwrap(); - let socket = listener.next().await.unwrap().unwrap().into_upgrade().unwrap().0.await.unwrap(); - let sender = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap(); + let socket = listener + .next().await.unwrap().unwrap() + .into_upgrade().unwrap() + .0.await.unwrap(); + + let sender = apply_inbound(socket, IdentifyProtocol).await.unwrap(); + sender.send( IdentifyInfo { public_key: send_pubkey, @@ -248,8 +300,8 @@ mod tests { "/ip6/::1/udp/1000".parse().unwrap(), ], protocols: vec!["proto1".to_string(), "proto2".to_string()], + observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(), }, - &"/ip4/100.101.102.103/tcp/5000".parse().unwrap(), ).await.unwrap(); }); @@ -257,9 +309,12 @@ mod tests { let transport = TcpConfig::new(); let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let RemoteInfo { info, observed_addr, .. } = - apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap(); - assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap()); + let info = apply_outbound( + socket, + IdentifyProtocol, + upgrade::Version::V1 + ).await.unwrap(); + assert_eq!(info.observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap()); assert_eq!(info.public_key, recv_pubkey); assert_eq!(info.protocol_version, "proto_version"); assert_eq!(info.agent_version, "agent_version"); diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index c8c1692ccf0..496bede1770 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -30,7 +30,7 @@ use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::transport::{MemoryTransport, Transport, TransportError}; use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; use libp2p_core::{identity, upgrade, PeerId}; -use libp2p_identify::{Identify, IdentifyEvent, IdentifyInfo}; +use libp2p_identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo}; use libp2p_kad::{GetClosestPeersOk, Kademlia, KademliaEvent, QueryResult}; use libp2p_ping::{Ping, PingConfig, PingEvent}; use libp2p_plaintext::PlainText2Config; @@ -1238,11 +1238,10 @@ fn build_swarm(reachability: Reachability, relay_mode: RelayMode) -> Swarm