Skip to content

Commit

Permalink
Implement ProtocolsHandler methods in wrappers. (#1710)
Browse files Browse the repository at this point in the history
* Implement ProtocolsHandler methods in wrappers.

This PR forwards calls to some ProtocolsHandler methods that were
previously not implemented in wrappers such as `MapInEvent`.

It is unclear though how this can be implemented in some handlers
such as `MultiHandler` as the information at hand does not enable
it to decide which handler to forward the call to.

* Add `MultiHandler::inject_listen_ugrade_error`.
  • Loading branch information
twittner authored Aug 18, 2020
1 parent cbdbf65 commit 21f9447
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 40 deletions.
26 changes: 15 additions & 11 deletions swarm/src/protocols_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ pub trait ProtocolsHandler: Send + 'static {
/// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
fn inject_listen_upgrade_error(
&mut self,
_: ProtocolsHandlerUpgrErr<
<Self::InboundProtocol as InboundUpgradeSend>::Error
>
_: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {}

/// Returns until when the connection should be kept alive.
Expand Down Expand Up @@ -189,7 +187,6 @@ pub trait ProtocolsHandler: Send + 'static {
>;

/// Adds a closure that turns the input event into something else.
#[inline]
fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
where
Self: Sized,
Expand All @@ -199,7 +196,6 @@ pub trait ProtocolsHandler: Send + 'static {
}

/// Adds a closure that turns the output event into something else.
#[inline]
fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
where
Self: Sized,
Expand All @@ -214,7 +210,6 @@ pub trait ProtocolsHandler: Send + 'static {
/// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence,
/// > i.e. is returned from [`ProtocolsHandler::connection_keep_alive`] by the returned
/// > handler.
#[inline]
fn select<TProto2>(self, other: TProto2) -> ProtocolsHandlerSelect<Self, TProto2>
where
Self: Sized,
Expand All @@ -226,7 +221,6 @@ pub trait ProtocolsHandler: Send + 'static {
/// exclusively.
///
/// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`.
#[inline]
fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder<Self>
where
Self: Sized,
Expand Down Expand Up @@ -331,7 +325,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
{
/// If this is an `OutboundSubstreamRequest`, maps the `info` member from a
/// `TOutboundOpenInfo` to something else.
#[inline]
pub fn map_outbound_open_info<F, I>(
self,
map: F,
Expand All @@ -353,7 +346,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>

/// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`)
/// to something else.
#[inline]
pub fn map_protocol<F, I>(
self,
map: F,
Expand All @@ -374,7 +366,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
}

/// If this is a `Custom` event, maps the content to something else.
#[inline]
pub fn map_custom<F, I>(
self,
map: F,
Expand All @@ -392,7 +383,6 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
}

/// If this is a `Close` event, maps the content to something else.
#[inline]
pub fn map_close<F, I>(
self,
map: F,
Expand Down Expand Up @@ -421,6 +411,20 @@ pub enum ProtocolsHandlerUpgrErr<TUpgrErr> {
Upgrade(UpgradeError<TUpgrErr>),
}

impl<TUpgrErr> ProtocolsHandlerUpgrErr<TUpgrErr> {
/// Map the inner [`UpgradeError`] type.
pub fn map_upgrade_err<F, E>(self, f: F) -> ProtocolsHandlerUpgrErr<E>
where
F: FnOnce(UpgradeError<TUpgrErr>) -> UpgradeError<E>
{
match self {
ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout,
ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer,
ProtocolsHandlerUpgrErr::Upgrade(e) => ProtocolsHandlerUpgrErr::Upgrade(f(e))
}
}
}

impl<TUpgrErr> fmt::Display for ProtocolsHandlerUpgrErr<TUpgrErr>
where
TUpgrErr: fmt::Display,
Expand Down
6 changes: 5 additions & 1 deletion swarm/src/protocols_handler/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade};
use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}};
use std::task::{Context, Poll};
use void::Void;

Expand Down Expand Up @@ -71,8 +71,12 @@ impl ProtocolsHandler for DummyProtocolsHandler {

fn inject_event(&mut self, _: Self::InEvent) {}

fn inject_address_change(&mut self, _: &Multiaddr) {}

fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>) {}

fn inject_listen_upgrade_error(&mut self, _: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error>) {}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
Expand Down
21 changes: 12 additions & 9 deletions swarm/src/protocols_handler/map_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};

use libp2p_core::Multiaddr;
use std::{marker::PhantomData, task::Context, task::Poll};

/// Wrapper around a protocol handler that turns the input event into something else.
Expand All @@ -38,7 +38,6 @@ pub struct MapInEvent<TProtoHandler, TNewIn, TMap> {

impl<TProtoHandler, TMap, TNewIn> MapInEvent<TProtoHandler, TNewIn, TMap> {
/// Creates a `MapInEvent`.
#[inline]
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
MapInEvent {
inner,
Expand All @@ -62,20 +61,17 @@ where
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;

#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.inner.listen_protocol()
}

#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
) {
self.inner.inject_fully_negotiated_inbound(protocol)
}

#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
Expand All @@ -84,24 +80,31 @@ where
self.inner.inject_fully_negotiated_outbound(protocol, info)
}

#[inline]
fn inject_event(&mut self, event: TNewIn) {
if let Some(event) = (self.map)(event) {
self.inner.inject_event(event);
}
}

#[inline]
fn inject_address_change(&mut self, addr: &Multiaddr) {
self.inner.inject_address_change(addr)
}

fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
self.inner.inject_dial_upgrade_error(info, error)
}

#[inline]
fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
self.inner.inject_listen_upgrade_error(error)
}

fn connection_keep_alive(&self) -> KeepAlive {
self.inner.connection_keep_alive()
}

#[inline]
fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
21 changes: 12 additions & 9 deletions swarm/src/protocols_handler/map_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};

use libp2p_core::Multiaddr;
use std::task::{Context, Poll};

/// Wrapper around a protocol handler that turns the output event into something else.
Expand All @@ -37,7 +37,6 @@ pub struct MapOutEvent<TProtoHandler, TMap> {

impl<TProtoHandler, TMap> MapOutEvent<TProtoHandler, TMap> {
/// Creates a `MapOutEvent`.
#[inline]
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
MapOutEvent {
inner,
Expand All @@ -60,20 +59,17 @@ where
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;

#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.inner.listen_protocol()
}

#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
) {
self.inner.inject_fully_negotiated_inbound(protocol)
}

#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
Expand All @@ -82,22 +78,29 @@ where
self.inner.inject_fully_negotiated_outbound(protocol, info)
}

#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
self.inner.inject_event(event)
}

#[inline]
fn inject_address_change(&mut self, addr: &Multiaddr) {
self.inner.inject_address_change(addr)
}

fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
self.inner.inject_dial_upgrade_error(info, error)
}

#[inline]
fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
self.inner.inject_listen_upgrade_error(error)
}

fn connection_keep_alive(&self) -> KeepAlive {
self.inner.connection_keep_alive()
}

#[inline]
fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
56 changes: 55 additions & 1 deletion swarm/src/protocols_handler/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use crate::upgrade::{
UpgradeInfoSend
};
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{ConnectedPoint, PeerId, upgrade::ProtocolName};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError};
use rand::Rng;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -135,6 +136,12 @@ where
}
}

fn inject_address_change(&mut self, addr: &Multiaddr) {
for h in self.handlers.values_mut() {
h.inject_address_change(addr)
}
}

fn inject_dial_upgrade_error (
&mut self,
(key, arg): Self::OutboundOpenInfo,
Expand All @@ -147,6 +154,53 @@ where
}
}

fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
match error {
ProtocolsHandlerUpgrErr::Timer =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer)
}
ProtocolsHandlerUpgrErr::Timeout =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout)
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)))
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) =>
match e {
ProtocolError::IoError(e) =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into()));
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
ProtocolError::InvalidMessage =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
ProtocolError::InvalidProtocol =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
ProtocolError::TooManyProtocols =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) =>
if let Some(h) = self.handlers.get_mut(&k) {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
self.handlers.values()
.map(|h| h.connection_keep_alive())
Expand Down
Loading

0 comments on commit 21f9447

Please sign in to comment.