Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ProtocolsHandler methods in wrappers. #1710

Merged
merged 3 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions swarm/src/protocols_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ pub trait ProtocolsHandler: Send + 'static {
/// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
fn inject_listen_upgrade_error(
&mut self,
_: ProtocolsHandlerUpgrErr<
<Self::InboundProtocol as InboundUpgradeSend>::Error
>
_: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {}

/// Returns until when the connection should be kept alive.
Expand Down Expand Up @@ -189,7 +187,6 @@ pub trait ProtocolsHandler: Send + 'static {
>;

/// Adds a closure that turns the input event into something else.
#[inline]
fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
where
Self: Sized,
Expand All @@ -199,7 +196,6 @@ pub trait ProtocolsHandler: Send + 'static {
}

/// Adds a closure that turns the output event into something else.
#[inline]
fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
where
Self: Sized,
Expand All @@ -214,7 +210,6 @@ pub trait ProtocolsHandler: Send + 'static {
/// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence,
/// > i.e. is returned from [`ProtocolsHandler::connection_keep_alive`] by the returned
/// > handler.
#[inline]
fn select<TProto2>(self, other: TProto2) -> ProtocolsHandlerSelect<Self, TProto2>
where
Self: Sized,
Expand All @@ -226,7 +221,6 @@ pub trait ProtocolsHandler: Send + 'static {
/// exclusively.
///
/// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`.
#[inline]
fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder<Self>
where
Self: Sized,
Expand Down Expand Up @@ -331,7 +325,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
{
/// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
/// `TOutboundOpenInfo` to something else.
#[inline]
pub fn map_outbound_open_info<F, I>(
self,
map: F,
Expand All @@ -353,7 +346,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>

/// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
/// to something else.
#[inline]
pub fn map_protocol<F, I>(
self,
map: F,
Expand All @@ -374,7 +366,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
}

/// If this is a `Custom` event, maps the content to something else.
#[inline]
pub fn map_custom<F, I>(
self,
map: F,
Expand All @@ -392,7 +383,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
}

/// If this is a `Close` event, maps the content to something else.
#[inline]
pub fn map_close<F, I>(
self,
map: F,
Expand Down Expand Up @@ -421,6 +411,20 @@ pub enum ProtocolsHandlerUpgrErr<TUpgrErr> {
Upgrade(UpgradeError<TUpgrErr>),
}

impl<TUpgrErr> ProtocolsHandlerUpgrErr<TUpgrErr> {
/// Map the inner [`UpgradeError`] type.
pub fn map_upgrade_err<F, E>(self, f: F) -> ProtocolsHandlerUpgrErr<E>
where
F: FnOnce(UpgradeError<TUpgrErr>) -> UpgradeError<E>
{
match self {
ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout,
ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer,
ProtocolsHandlerUpgrErr::Upgrade(e) => ProtocolsHandlerUpgrErr::Upgrade(f(e))
}
}
}

impl<TUpgrErr> fmt::Display for ProtocolsHandlerUpgrErr<TUpgrErr>
where
TUpgrErr: fmt::Display,
Expand Down
6 changes: 5 additions & 1 deletion swarm/src/protocols_handler/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade};
use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}};
use std::task::{Context, Poll};
use void::Void;

Expand Down Expand Up @@ -71,8 +71,12 @@ impl ProtocolsHandler for DummyProtocolsHandler {

fn inject_event(&mut self, _: Self::InEvent) {}

fn inject_address_change(&mut self, _: &Multiaddr) {}

fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>) {}

fn inject_listen_upgrade_error(&mut self, _: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error>) {}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
Expand Down
21 changes: 12 additions & 9 deletions swarm/src/protocols_handler/map_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};

use libp2p_core::Multiaddr;
use std::{marker::PhantomData, task::Context, task::Poll};

/// Wrapper around a protocol handler that turns the input event into something else.
Expand All @@ -38,7 +38,6 @@ pub struct MapInEvent<TProtoHandler, TNewIn, TMap> {

impl<TProtoHandler, TMap, TNewIn> MapInEvent<TProtoHandler, TNewIn, TMap> {
/// Creates a `MapInEvent`.
#[inline]
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
MapInEvent {
inner,
Expand All @@ -62,20 +61,17 @@ where
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;

#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.inner.listen_protocol()
}

#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
) {
self.inner.inject_fully_negotiated_inbound(protocol)
}

#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
Expand All @@ -84,24 +80,31 @@ where
self.inner.inject_fully_negotiated_outbound(protocol, info)
}

#[inline]
fn inject_event(&mut self, event: TNewIn) {
if let Some(event) = (self.map)(event) {
self.inner.inject_event(event);
}
}

#[inline]
fn inject_address_change(&mut self, addr: &Multiaddr) {
self.inner.inject_address_change(addr)
}

fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
self.inner.inject_dial_upgrade_error(info, error)
}

#[inline]
fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
self.inner.inject_listen_upgrade_error(error)
}

fn connection_keep_alive(&self) -> KeepAlive {
self.inner.connection_keep_alive()
}

#[inline]
fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
21 changes: 12 additions & 9 deletions swarm/src/protocols_handler/map_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};

use libp2p_core::Multiaddr;
use std::task::{Context, Poll};

/// Wrapper around a protocol handler that turns the output event into something else.
Expand All @@ -37,7 +37,6 @@ pub struct MapOutEvent<TProtoHandler, TMap> {

impl<TProtoHandler, TMap> MapOutEvent<TProtoHandler, TMap> {
/// Creates a `MapOutEvent`.
#[inline]
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
MapOutEvent {
inner,
Expand All @@ -60,20 +59,17 @@ where
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;

#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.inner.listen_protocol()
}

#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
) {
self.inner.inject_fully_negotiated_inbound(protocol)
}

#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
Expand All @@ -82,22 +78,29 @@ where
self.inner.inject_fully_negotiated_outbound(protocol, info)
}

#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
self.inner.inject_event(event)
}

#[inline]
fn inject_address_change(&mut self, addr: &Multiaddr) {
self.inner.inject_address_change(addr)
}

fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
self.inner.inject_dial_upgrade_error(info, error)
}

#[inline]
fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
self.inner.inject_listen_upgrade_error(error)
}

fn connection_keep_alive(&self) -> KeepAlive {
self.inner.connection_keep_alive()
}

#[inline]
fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
56 changes: 55 additions & 1 deletion swarm/src/protocols_handler/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use crate::upgrade::{
UpgradeInfoSend
};
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{ConnectedPoint, PeerId, upgrade::ProtocolName};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError};
use rand::Rng;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -135,6 +136,12 @@ where
}
}

fn inject_address_change(&mut self, addr: &Multiaddr) {
for h in self.handlers.values_mut() {
h.inject_address_change(addr)
}
}

fn inject_dial_upgrade_error (
&mut self,
(key, arg): Self::OutboundOpenInfo,
Expand All @@ -147,6 +154,53 @@ where
}
}

fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
match error {
ProtocolsHandlerUpgrErr::Timer =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer)
}
ProtocolsHandlerUpgrErr::Timeout =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout)
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)))
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) =>
match e {
ProtocolError::IoError(e) =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into()));
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
ProtocolError::InvalidMessage =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
ProtocolError::InvalidProtocol =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
ProtocolError::TooManyProtocols =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) =>
if let Some(h) = self.handlers.get_mut(&k) {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
self.handlers.values()
.map(|h| h.connection_keep_alive())
Expand Down
Loading