Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement idle connection timeout #98

Merged
merged 7 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# 0.4.2 [unreleased]
- feat: Implement idle connections timeout [PR 98]
- chore: Add UninitializedIpfs::set_listening_addrs and minor changes
- chore: Add peer to dht when discovered over mdns

[PR 98]: https://github.com/dariusc93/rust-ipfs/pull/98

# 0.4.1
- fix: Dont close connections on ping error [PR 95]

Expand Down
24 changes: 17 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ pub struct IpfsOptions {
/// Enables ipv6 for mdns
pub mdns_ipv6: bool,

/// Keep connection alive
pub keep_alive: bool,

/// Enables dcutr
pub dcutr: bool,

Expand Down Expand Up @@ -227,6 +224,9 @@ pub struct IpfsOptions {

pub keystore: Keystore,

/// Connection idle
pub connection_idle: Duration,

/// Repo Provider option
pub provider: RepoProvider,
/// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`.
Expand Down Expand Up @@ -265,7 +265,6 @@ impl Default for IpfsOptions {
disable_kad: Default::default(),
disable_bitswap: Default::default(),
bitswap_config: Default::default(),
keep_alive: Default::default(),
relay_server: Default::default(),
relay_server_config: Default::default(),
kad_configuration: Default::default(),
Expand All @@ -275,6 +274,7 @@ impl Default for IpfsOptions {
addr_config: Default::default(),
provider: Default::default(),
keystore: Keystore::in_memory(),
connection_idle: Duration::from_secs(30),
listening_addrs: vec![],
port_mapping: false,
transport_configuration: None,
Expand Down Expand Up @@ -598,6 +598,16 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
self
}

/// Set timeout for idle connections
pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
let duration = match duration > 0 {
true => duration,
false => 30
};
self.options.connection_idle = Duration::from_secs(duration);
self
}

/// Set swarm configuration
pub fn set_swarm_configuration(mut self, config: crate::p2p::SwarmConfig) -> Self {
self.options.swarm_configuration = Some(config);
Expand Down Expand Up @@ -668,9 +678,9 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
}

/// Enable keep alive
pub fn enable_keepalive(mut self) -> Self {
self.options.keep_alive = true;
self
#[deprecated(note = "use UninitializedIpfs::set_idle_connection(u64::MAX)")]
pub fn enable_keepalive(self) -> Self {
self.set_idle_connection_timeout(u64::MAX)
}

/// Disables kademlia
Expand Down
10 changes: 4 additions & 6 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::gossipsub::GossipsubStream;
use super::{addressbook, protocol};
use super::{addressbook, connection_idle, protocol};
use bytes::Bytes;
use libp2p_allow_block_list::BlockedPeers;

Expand Down Expand Up @@ -29,7 +29,6 @@ use libp2p::relay::client::Behaviour as RelayClient;
use libp2p::relay::client::{self, Transport as ClientTransport};
use libp2p::relay::Behaviour as Relay;
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::keep_alive::Behaviour as KeepAliveBehaviour;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{autonat, StreamProtocol};
use std::borrow::Cow;
Expand All @@ -49,7 +48,6 @@ where
pub kademlia: Toggle<Kademlia<MemoryStore>>,
pub ping: Ping,
pub identify: Identify,
pub keepalive: Toggle<KeepAliveBehaviour>,
pub pubsub: GossipsubStream,
pub autonat: autonat::Behaviour,
pub upnp: Toggle<libp2p_nat::Behaviour>,
Expand All @@ -58,6 +56,7 @@ where
pub relay_client: Toggle<RelayClient>,
pub dcutr: Toggle<Dcutr>,
pub addressbook: addressbook::Behaviour,
pub connection_idle: connection_idle::Behaviour,
pub peerbook: peerbook::Behaviour,
pub protocol: protocol::Behaviour,
pub custom: Toggle<C>,
Expand Down Expand Up @@ -388,8 +387,6 @@ where
.then_some(Bitswap::new(peer_id, repo, Default::default()).await)
.into();

let keepalive = options.keep_alive.then(KeepAliveBehaviour::default).into();

let ping = Ping::new(options.ping_config.unwrap_or_default());

let identify = Identify::new(
Expand Down Expand Up @@ -461,14 +458,14 @@ where

let block_list = libp2p_allow_block_list::Behaviour::default();
let protocol = protocol::Behaviour::default();
let connection_idle = connection_idle::Behaviour::new(options.connection_idle);
let custom = Toggle::from(custom);

Ok((
Behaviour {
mdns,
kademlia,
bitswap,
keepalive,
ping,
identify,
autonat,
Expand All @@ -482,6 +479,7 @@ where
addressbook,
protocol,
custom,
connection_idle
},
transport,
))
Expand Down
87 changes: 87 additions & 0 deletions src/p2p/connection_idle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
mod handler;

use std::{
task::{Context, Poll},
time::Duration,
};

use libp2p::{
core::Endpoint,
swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};

pub struct Behaviour {
idle: Duration,
}

impl Behaviour {
pub fn new(idle: Duration) -> Behaviour {
Behaviour { idle }
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = handler::Handler;
type ToSwarm = void::Void;

fn handle_pending_inbound_connection(
&mut self,
_: ConnectionId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<(), ConnectionDenied> {
Ok(())
}

fn handle_pending_outbound_connection(
&mut self,
_: ConnectionId,
_: Option<PeerId>,
_: &[Multiaddr],
_: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
Ok(vec![])
}

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(handler::Handler::new(self.idle))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(handler::Handler::new(self.idle))
}

fn on_connection_handler_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: THandlerOutEvent<Self>,
) {
}

fn on_swarm_event(&mut self, _: FromSwarm<Self::ConnectionHandler>) {}

fn poll(
&mut self,
_: &mut Context,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
Poll::Pending
}
}
70 changes: 70 additions & 0 deletions src/p2p/connection_idle/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::{
task::{Context, Poll},
time::{Duration, Instant},
};

use libp2p::{
core::upgrade::DeniedUpgrade,
swarm::{handler::ConnectionEvent, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol},
};
use void::Void;

#[derive(Clone, Debug)]
pub struct Handler {
keep_alive: KeepAlive,
}

impl Handler {
pub fn new(idle: Duration) -> Self {
Self {
keep_alive: KeepAlive::Until(Instant::now() + idle),
}
}
}

impl libp2p::swarm::ConnectionHandler for Handler {
type FromBehaviour = Void;
type ToBehaviour = Void;
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type InboundOpenInfo = ();
type OutboundOpenInfo = Void;

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(DeniedUpgrade, ())
}

fn on_behaviour_event(&mut self, v: Self::FromBehaviour) {
void::unreachable(v)
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}

fn poll(
&mut self,
_: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
Poll::Pending
}

fn on_connection_event(
&mut self,
_: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
}
}
11 changes: 7 additions & 4 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! P2P handling for IPFS nodes.
use std::convert::TryInto;
use std::num::{NonZeroU8, NonZeroUsize};
use std::time::Duration;

use crate::error::Error;
use crate::repo::Repo;
Expand All @@ -19,6 +20,7 @@ use tracing::Span;

pub(crate) mod addr;
pub(crate) mod addressbook;
pub(crate) mod connection_idle;
pub(crate) mod peerbook;
pub mod protocol;

Expand Down Expand Up @@ -139,12 +141,12 @@ pub struct SwarmOptions {
pub addrbook_config: Option<AddressBookConfig>,
/// UPnP/PortMapping
pub portmapping: bool,
/// Keep alive
pub keep_alive: bool,
/// Relay client
pub relay: bool,
/// Enables dcutr
pub dcutr: bool,
/// Connection idle
pub connection_idle: Duration,
}

impl From<&IpfsOptions> for SwarmOptions {
Expand All @@ -163,12 +165,13 @@ impl From<&IpfsOptions> for SwarmOptions {
let disable_bitswap = options.disable_bitswap;
let bitswap_config = options.bitswap_config.clone();

let keep_alive = options.keep_alive;
let identify_config = options.identify_configuration.clone();
let portmapping = options.port_mapping;
let pubsub_config = options.pubsub_config.clone();
let addrbook_config = options.addr_config;

let connection_idle = options.connection_idle;

SwarmOptions {
bootstrap,
mdns,
Expand All @@ -183,11 +186,11 @@ impl From<&IpfsOptions> for SwarmOptions {
kad_config,
kad_store_config,
ping_config,
keep_alive,
identify_config,
portmapping,
addrbook_config,
pubsub_config,
connection_idle,
}
}
}
Expand Down