Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: Add FromFn ConnectionHandler #2852

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2c580d9
Add `ReadyUpgrade`
thomaseizinger Aug 24, 2022
b2d1c05
Add basic `FromFn` `ConnectionHandler`
thomaseizinger Aug 24, 2022
230fabb
Add `TState` abstraction
thomaseizinger Aug 28, 2022
58fe45b
Publicly expose types
thomaseizinger Aug 28, 2022
ff1db42
Fix docs
thomaseizinger Aug 28, 2022
1b175d2
Implement limit for max inbound streams and pending dials
thomaseizinger Aug 30, 2022
69fa94f
Implement std::error::Error for `OpenError`
thomaseizinger Aug 30, 2022
54e6437
Avoid dial terminology for streams
thomaseizinger Sep 5, 2022
635730a
Remove `idle_waker`
thomaseizinger Sep 5, 2022
b37fa5c
Make `TState` configurable
thomaseizinger Sep 5, 2022
d4079bf
Finish local work before producing new work
thomaseizinger Sep 5, 2022
bc361ec
Introduce `FromFnProto`
thomaseizinger Sep 5, 2022
712180b
Expose `remote_peer_id` and `connected_point` to closures
thomaseizinger Sep 5, 2022
f9f8e75
Merge branch 'master' into from-fn-connection-handler
thomaseizinger Nov 2, 2022
5d7f0bd
Implement `Shared` abstraction to automatically share state
thomaseizinger Nov 2, 2022
8ee59fd
Don't allow ConnectionHandlers to modify the state
thomaseizinger Nov 2, 2022
7fb4387
Implement test
thomaseizinger Nov 2, 2022
ac5c236
Remove unnecessary waker
thomaseizinger Nov 2, 2022
3e81e72
Separate clone-able registration data
thomaseizinger Nov 2, 2022
49c6eb1
WIP: Migrate `libp2p-rendezvous` to `from_fn`
thomaseizinger Nov 2, 2022
411c0d7
fixup! WIP: Migrate `libp2p-rendezvous` to `from_fn`
thomaseizinger Nov 11, 2022
5e185d8
fixup! WIP: Migrate `libp2p-rendezvous` to `from_fn`
thomaseizinger Nov 11, 2022
bdbeadc
Enforce the use of `Shared`
thomaseizinger Nov 11, 2022
660c0a3
Share state between connection handlers
thomaseizinger Nov 11, 2022
f5a3c50
Introduce builder pattern
thomaseizinger Nov 11, 2022
df74f14
Update docs
thomaseizinger Nov 11, 2022
f84388f
Make inbound streams impossible when no handler is configured
thomaseizinger Nov 11, 2022
6d1add5
Allow for streaming protocols
thomaseizinger Nov 11, 2022
913caf3
Pass `ConnectedPoint` by value
thomaseizinger Nov 11, 2022
e9cd5fc
Allow regular async functions to be used as handlers
thomaseizinger Nov 11, 2022
5801576
Flatten errors
thomaseizinger Nov 12, 2022
c15433f
Implement `libp2p-ping` with `from_fn` abstraction
thomaseizinger Nov 12, 2022
a418d2f
Merge branch 'master' into from-fn-connection-handler
thomaseizinger Nov 16, 2022
628e9ab
Fully migrate rendezvous
thomaseizinger Nov 16, 2022
7bdd953
Minimise diff
thomaseizinger Nov 16, 2022
4485c48
Merge branch 'master' into from-fn-connection-handler
thomaseizinger Nov 18, 2022
6f00703
Update to new behaviour interface
thomaseizinger Nov 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 0 additions & 400 deletions protocols/ping/src/handler.rs

This file was deleted.

335 changes: 289 additions & 46 deletions protocols/ping/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,50 +42,40 @@

#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod handler;
mod protocol;

use handler::Handler;
pub use handler::{Config, Failure, Success};
use libp2p_core::{connection::ConnectionId, PeerId};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use std::{
collections::VecDeque,
task::{Context, Poll},
use crate::protocol::{recv_ping, send_ping};
use futures::future::Either;
use futures_timer::Delay;
use libp2p_core::connection::ConnectionId;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::handler::from_fn;
use libp2p_swarm::{
CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use std::collections::{HashMap, VecDeque};
use std::error::Error;
use std::num::NonZeroU32;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, io};

#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Config instead.")]
pub type PingConfig = Config;
pub use crate::protocol::PROTOCOL_NAME;

#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Event instead.")]
pub type PingEvent = Event;

#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Success instead.")]
pub type PingSuccess = Success;

#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Failure instead.")]
pub type PingFailure = Failure;

#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Result instead.")]
pub type PingResult = Result;

#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Behaviour instead.")]
pub type Ping = Behaviour;

pub use self::protocol::PROTOCOL_NAME;

/// The result of an inbound or outbound ping.
pub type Result = std::result::Result<Success, Failure>;

type Handler = from_fn::FromFnProto<Result, Result, (), ()>;

/// A [`NetworkBehaviour`] that responds to inbound pings and
/// periodically sends outbound pings on every established connection.
///
/// See the crate root documentation for more information.
#[derive(Default)]
pub struct Behaviour {
/// Configuration for outbound pings.
config: Config,
/// Queue of events to yield to the swarm.
events: VecDeque<Event>,
actions: VecDeque<NetworkBehaviourAction<Event, Handler>>,
failures: HashMap<(PeerId, ConnectionId), (u32, VecDeque<Failure>)>,
}

/// Event generated by the `Ping` network behaviour.
Expand All @@ -102,14 +92,138 @@ impl Behaviour {
pub fn new(config: Config) -> Self {
Self {
config,
events: VecDeque::new(),
actions: Default::default(),
failures: Default::default(),
}
}

fn reset_num_failures(&mut self, peer: PeerId, connection_id: ConnectionId) {
self.failures.entry((peer, connection_id)).or_default().0 = 0;
}

fn record_failure(&mut self, peer: PeerId, connection_id: ConnectionId, e: Failure) {
self.failures
.entry((peer, connection_id))
.or_default()
.1
.push_back(e);
}
}

/// The configuration for outbound pings.
#[derive(Debug, Clone)]
pub struct Config {
/// The timeout of an outbound ping.
pub(crate) timeout: Duration,
/// The duration between the last successful outbound or inbound ping
/// and the next outbound ping.
pub(crate) interval: Duration,
/// The maximum number of failed outbound pings before the associated
/// connection is deemed unhealthy, indicating to the `Swarm` that it
/// should be closed.
pub(crate) max_failures: NonZeroU32,
}

impl Config {
/// Creates a new [`Config`] with the following default settings:
///
/// * [`Config::with_interval`] 15s
/// * [`Config::with_timeout`] 20s
/// * [`Config::with_max_failures`] 1
/// * [`Config::with_keep_alive`] false
///
/// These settings have the following effect:
///
/// * A ping is sent every 15 seconds on a healthy connection.
/// * Every ping sent must yield a response within 20 seconds in order to
/// be successful.
/// * A single ping failure is sufficient for the connection to be subject
/// to being closed.
/// * The connection may be closed at any time as far as the ping protocol
/// is concerned, i.e. the ping protocol itself does not keep the
/// connection alive.
pub fn new() -> Self {
Self {
timeout: Duration::from_secs(20),
interval: Duration::from_secs(15),
max_failures: NonZeroU32::new(1).expect("1 != 0"),
}
}

/// Sets the ping timeout.
pub fn with_timeout(mut self, d: Duration) -> Self {
self.timeout = d;
self
}

/// Sets the ping interval.
pub fn with_interval(mut self, d: Duration) -> Self {
self.interval = d;
self
}

/// Sets the maximum number of consecutive ping failures upon which the remote
/// peer is considered unreachable and the connection closed.
pub fn with_max_failures(mut self, n: NonZeroU32) -> Self {
self.max_failures = n;
self
}
}

impl Default for Behaviour {
impl Default for Config {
fn default() -> Self {
Self::new(Config::new())
Self::new()
}
}

/// The successful result of processing an inbound or outbound ping.
#[derive(Debug)]
pub enum Success {
/// Received a ping and sent back a pong.
Pong,
/// Sent a ping and received back a pong.
///
/// Includes the round-trip time.
Ping { rtt: Duration },
}

/// An outbound ping failure.
#[derive(Debug)]
pub enum Failure {
/// The ping timed out, i.e. no response was received within the
/// configured ping timeout.
Timeout,
/// The peer does not support the ping protocol.
Unsupported,
/// The ping failed for reasons other than a timeout.
Other {
error: Box<dyn Error + Send + 'static>,
},
}

impl From<io::Error> for Failure {
fn from(e: io::Error) -> Self {
Failure::Other { error: Box::new(e) }
}
}

impl fmt::Display for Failure {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Failure::Timeout => f.write_str("Ping timeout"),
Failure::Other { error } => write!(f, "Ping error: {}", error),
Failure::Unsupported => write!(f, "Ping protocol not supported"),
}
}
}

impl Error for Failure {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Failure::Timeout => None,
Failure::Other { error } => Some(&**error),
Failure::Unsupported => None,
}
}
}

Expand All @@ -118,30 +232,159 @@ impl NetworkBehaviour for Behaviour {
type OutEvent = Event;

fn new_handler(&mut self) -> Self::ConnectionHandler {
Handler::new(self.config.clone())
from_fn::from_fn(std::str::from_utf8(PROTOCOL_NAME).unwrap())
.without_state()
.with_streaming_inbound_handler(1, |stream, _, _, _| {
futures::stream::try_unfold(stream, |stream| async move {
let stream = recv_ping(stream).await?;

Ok(Some((Success::Pong, stream)))
})
})
.with_streaming_outbound_handler(1, {
let interval = self.config.interval;
let timeout = self.config.timeout;

move |stream, _, _, _, _| {
futures::stream::try_unfold(stream, move |stream| async move {
Delay::new(interval).await;

let ping = send_ping(stream);
futures::pin_mut!(ping);

match futures::future::select(Delay::new(timeout), ping).await {
Either::Left(((), _unfinished_ping)) => Err(Failure::Timeout),
Either::Right((Ok((stream, rtt)), _)) => {
Ok(Some((Success::Ping { rtt }, stream)))
}
Either::Right((Err(e), _)) => {
Err(Failure::Other { error: Box::new(e) })
}
}
})
}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this looks pretty neat!

}

fn inject_connection_established(
&mut self,
peer: &PeerId,
connection: &ConnectionId,
_: &ConnectedPoint,
_: Option<&Vec<Multiaddr>>,
_: usize,
) {
self.actions.push_back(start_ping_action(peer, connection));
}

fn inject_event(&mut self, peer: PeerId, _: ConnectionId, result: Result) {
self.events.push_front(Event { peer, result })
fn inject_event(
&mut self,
peer: PeerId,
connection: ConnectionId,
event: from_fn::OutEvent<Result, Result, ()>,
) {
match event {
from_fn::OutEvent::InboundEmitted(Ok(success)) => {
self.actions
.push_back(NetworkBehaviourAction::GenerateEvent(Event {
peer,
result: Ok(success),
}))
}
from_fn::OutEvent::OutboundEmitted(Ok(success)) => {
self.actions
.push_back(NetworkBehaviourAction::GenerateEvent(Event {
peer,
result: Ok(success),
}));
self.reset_num_failures(peer, connection);
}
from_fn::OutEvent::InboundEmitted(Err(e)) => {
log::debug!("Inbound ping error: {:?}", e);
}
from_fn::OutEvent::OutboundEmitted(Err(e)) => {
self.record_failure(peer, connection, e);
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Timeout(())) => {
self.record_failure(peer, connection, Failure::Timeout);
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Unsupported {
open_info: (),
..
}) => {
self.record_failure(peer, connection, Failure::Unsupported);
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::NegotiationFailed((), error)) => {
self.record_failure(
peer,
connection,
Failure::Other {
error: Box::new(error),
},
);
}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::LimitExceeded {
open_info: (),
..
}) => {
unreachable!("We only ever open a new stream if the old one is dead.")
}
}
}

fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(e) = self.events.pop_back() {
let Event { result, peer } = &e;
if let Some(action) = self.actions.pop_front() {
return Poll::Ready(action);
}

match result {
Ok(Success::Ping { .. }) => log::debug!("Ping sent to {:?}", peer),
Ok(Success::Pong) => log::debug!("Ping received from {:?}", peer),
_ => {}
}
for ((peer, connection), (failures, pending_errors)) in self.failures.iter_mut() {
// Check for outbound ping failures.
if let Some(error) = pending_errors.pop_back() {
log::debug!("Ping failure: {:?}", error);

Poll::Ready(NetworkBehaviourAction::GenerateEvent(e))
} else {
Poll::Pending
*failures += 1;

// Note: For backward-compatibility, with configured
// `max_failures == 1`, the first failure is always "free"
// and silent. This allows peers who still use a new substream
// for each ping to have successful ping exchanges with peers
// that use a single substream, since every successful ping
// resets `failures` to `0`, while at the same time emitting
// events only for `max_failures - 1` failures, as before.
if *failures > 1 || self.config.max_failures.get() > 1 {
if *failures >= self.config.max_failures.get() {
log::debug!("Too many failures ({}). Closing connection.", failures);
return Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id: *peer,
connection: CloseConnection::One(*connection),
});
}
}

self.actions.push_back(start_ping_action(peer, connection));

return Poll::Ready(NetworkBehaviourAction::GenerateEvent(Event {
peer: *peer,
result: Err(error),
}));
}
}

Poll::Pending
}
}

fn start_ping_action(
peer: &PeerId,
connection: &ConnectionId,
) -> NetworkBehaviourAction<Event, Handler> {
NetworkBehaviourAction::NotifyHandler {
peer_id: *peer,
handler: NotifyHandler::One(*connection),
event: from_fn::InEvent::NewOutbound(()),
}
}
Loading