Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[identify] Implement /ipfs/id/push/1.0.0 alongside some refactoring. #1999

Merged
merged 9 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use libp2p::{
either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version,
},
gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity},
identify::{Identify, IdentifyEvent},
identify::{Identify, IdentifyConfig, IdentifyEvent},
identity,
multiaddr::Protocol,
noise,
Expand Down Expand Up @@ -245,11 +245,10 @@ fn main() -> Result<(), Box<dyn Error>> {
gossipsub_config,
)
.expect("Valid configuration"),
identify: Identify::new(
identify: Identify::new(IdentifyConfig::new(
"/ipfs/0.1.0".into(),
"rust-ipfs-example".into(),
local_key.public(),
),
)),
ping: Ping::new(PingConfig::new()),
};

Expand Down
1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ wasm-timer = "0.2"

[dev-dependencies]
async-std = "1.6.2"
env_logger = "0.8"
libp2p-mplex = { path = "../../muxers/mplex" }
libp2p-noise = { path = "../../transports/noise" }
libp2p-tcp = { path = "../../transports/tcp" }
Expand Down
128 changes: 89 additions & 39 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{RemoteInfo, IdentifyProtocolConfig, ReplySubstream};
use crate::protocol::{
IdentifyProtocol,
IdentifyPushProtocol,
IdentifyInfo,
InboundPush,
OutboundPush,
ReplySubstream
};
use futures::prelude::*;
use libp2p_core::either::{
EitherError,
EitherOutput,
};
use libp2p_core::upgrade::{
EitherUpgrade,
InboundUpgrade,
OutboundUpgrade,
ReadOneError
SelectUpgrade,
UpgradeError,
};
use libp2p_swarm::{
NegotiatedSubstream,
Expand All @@ -34,89 +47,119 @@ use libp2p_swarm::{
ProtocolsHandlerUpgrErr
};
use smallvec::SmallVec;
use std::{pin::Pin, task::Context, task::Poll, time::Duration};
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;

/// Delay between the moment we connect and the first time we identify.
const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500);
/// After an identification succeeded, wait this long before the next time.
const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60);
/// After we failed to identify the remote, try again after the given delay.
const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60);

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct IdentifyHandler {
/// Configuration for the protocol.
config: IdentifyProtocolConfig,

/// Pending events to yield.
events: SmallVec<[IdentifyHandlerEvent; 4]>,
events: SmallVec<[ProtocolsHandlerEvent<
EitherUpgrade<IdentifyProtocol, IdentifyPushProtocol<OutboundPush>>,
(),
IdentifyHandlerEvent,
io::Error,
>; 4]>,

/// Future that fires when we need to identify the node again.
next_id: Delay,

/// Whether the handler should keep the connection alive.
keep_alive: KeepAlive,

/// The interval of `next_id`, i.e. the recurrent delay.
interval: Duration,
}

/// Event produced by the `IdentifyHandler`.
#[derive(Debug)]
pub enum IdentifyHandlerEvent {
/// We obtained identification information from the remote
Identified(RemoteInfo),
/// We obtained identification information from the remote.
Identified(IdentifyInfo),
/// We received a request for identification.
Identify(ReplySubstream<NegotiatedSubstream>),
/// Failed to identify the remote.
IdentificationError(ProtocolsHandlerUpgrErr<ReadOneError>),
IdentificationError(ProtocolsHandlerUpgrErr<io::Error>),
}

/// Identifying information of the local node that is pushed to a remote.
#[derive(Debug)]
pub struct IdentifyPush(pub IdentifyInfo);

impl IdentifyHandler {
/// Creates a new `IdentifyHandler`.
pub fn new() -> Self {
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
IdentifyHandler {
config: IdentifyProtocolConfig,
events: SmallVec::new(),
next_id: Delay::new(DELAY_TO_FIRST_ID),
next_id: Delay::new(initial_delay),
keep_alive: KeepAlive::Yes,
interval,
}
}
}

impl ProtocolsHandler for IdentifyHandler {
type InEvent = ();
type InEvent = IdentifyPush;
type OutEvent = IdentifyHandlerEvent;
type Error = ReadOneError;
type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = IdentifyProtocolConfig;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<IdentifyProtocol, IdentifyPushProtocol<InboundPush>>;
type OutboundProtocol = EitherUpgrade<IdentifyProtocol, IdentifyPushProtocol<OutboundPush>>;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(self.config.clone(), ())
SubstreamProtocol::new(
SelectUpgrade::new(
IdentifyProtocol,
IdentifyPushProtocol::inbound(),
), ())
}

fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo
output: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo
) {
self.events.push(IdentifyHandlerEvent::Identify(protocol))
match output {
EitherOutput::First(substream) => {
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::Identify(substream)))
}
EitherOutput::Second(info) => {
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::Identified(info)))
}
}
}

fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::OutboundOpenInfo,
output: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo,
) {
self.events.push(IdentifyHandlerEvent::Identified(protocol));
self.keep_alive = KeepAlive::No;
match output {
EitherOutput::First(remote_info) => {
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::Identified(remote_info)));
self.keep_alive = KeepAlive::No;
}
EitherOutput::Second(()) => {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that came to my mind: How about emitting an event along the lines of IdentifyEvent::Pushed once a push succeeded? I would expect this to be useful for people tracking these events in a monitoring system like Prometheus. This event would serve a similar purpose to the RequestSent event in libp2p-request-response:

/// A response to an inbound request has been sent.
///
/// When this event is received, the response has been flushed on
/// the underlying transport connection.
ResponseSent {
/// The peer to whom the response was sent.
peer: PeerId,
/// The ID of the inbound request whose response was sent.
request_id: RequestId,
},

In case you think this is useful @romanb I can prepare a pull request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

}
}

fn inject_event(&mut self, _: Self::InEvent) {}
fn inject_event(&mut self, IdentifyPush(push): Self::InEvent) {
self.events.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
EitherUpgrade::B(
IdentifyPushProtocol::outbound(push)), ())
});
}

fn inject_dial_upgrade_error(
&mut self,
Expand All @@ -125,9 +168,16 @@ impl ProtocolsHandler for IdentifyHandler {
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error
>
) {
self.events.push(IdentifyHandlerEvent::IdentificationError(err));
let err = err.map_upgrade_err(|e| match e {
UpgradeError::Select(e) => UpgradeError::Select(e),
UpgradeError::Apply(EitherError::A(ioe)) => UpgradeError::Apply(ioe),
UpgradeError::Apply(EitherError::B(ioe)) => UpgradeError::Apply(ioe),
});
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::IdentificationError(err)));
self.keep_alive = KeepAlive::No;
self.next_id.reset(TRY_AGAIN_ON_ERR);
self.next_id.reset(self.interval);
}

fn connection_keep_alive(&self) -> KeepAlive {
Expand All @@ -143,18 +193,18 @@ impl ProtocolsHandler for IdentifyHandler {
>,
> {
if !self.events.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(
self.events.remove(0),
));
);
}

// Poll the future that fires when we need to identify the node again.
match Future::poll(Pin::new(&mut self.next_id), cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
self.next_id.reset(DELAY_TO_NEXT_ID);
self.next_id.reset(self.interval);
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.config.clone(), ())
protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ())
};
Poll::Ready(ev)
}
Expand Down
Loading