From 85e3025d4dfc382aeac29f9396aad451440442d4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 15 May 2023 04:40:55 +0200 Subject: [PATCH] feat(request-response): don't close connection on stream errors Related: #3591. Pull-Request: #3913. --- Cargo.lock | 3 ++ examples/file-sharing/Cargo.toml | 3 +- protocols/request-response/Cargo.toml | 2 + .../request-response/src/handler_priv.rs | 43 ++++--------------- 4 files changed, 15 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 890eb5583ac..f352aad1b45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1533,6 +1533,7 @@ dependencies = [ "futures", "libp2p", "multiaddr", + "void", ] [[package]] @@ -2892,8 +2893,10 @@ dependencies = [ "libp2p-swarm-test", "libp2p-tcp", "libp2p-yamux", + "log", "rand 0.8.5", "smallvec", + "void", ] [[package]] diff --git a/examples/file-sharing/Cargo.toml b/examples/file-sharing/Cargo.toml index 83d19448d0b..0fa7883ac23 100644 --- a/examples/file-sharing/Cargo.toml +++ b/examples/file-sharing/Cargo.toml @@ -13,4 +13,5 @@ either = "1.8" env_logger = "0.10" futures = "0.3.28" libp2p = { path = "../../libp2p", features = ["async-std", "dns", "kad", "noise", "macros", "request-response", "tcp", "websocket", "yamux"] } -multiaddr = { version = "0.17.1" } \ No newline at end of file +multiaddr = { version = "0.17.1" } +void = "1.0.2" diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 75cbdc369fa..b2bb23875ce 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -19,6 +19,8 @@ libp2p-swarm = { version = "0.42.1", path = "../../swarm" } libp2p-identity = { version = "0.1.0", path = "../../identity" } rand = "0.8" smallvec = "1.6.1" +void = "1.0.2" +log = "0.4.17" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/protocols/request-response/src/handler_priv.rs b/protocols/request-response/src/handler_priv.rs index 849c9e577fa..23a6f24d160 100644 --- a/protocols/request-response/src/handler_priv.rs +++ b/protocols/request-response/src/handler_priv.rs @@ -39,7 +39,7 @@ use libp2p_swarm::{ use smallvec::SmallVec; use std::{ collections::VecDeque, - fmt, io, + fmt, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -71,8 +71,6 @@ where substream_timeout: Duration, /// The current connection keep-alive. keep_alive: KeepAlive, - /// A pending fatal error that results in the connection being closed. - pending_error: Option>, /// Queue of events to emit in `poll()`. pending_events: VecDeque>, /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. @@ -113,7 +111,6 @@ where outbound: VecDeque::new(), inbound: FuturesUnordered::new(), pending_events: VecDeque::new(), - pending_error: None, inbound_request_id, } } @@ -156,40 +153,22 @@ where // the remote peer does not support the requested protocol(s). self.pending_events .push_back(Event::OutboundUnsupportedProtocols(info)); + log::debug!("outbound stream {info} failed: Failed negotiation"); } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { + log::debug!("outbound stream {info} failed: {e}"); } + _ => {} } } fn on_listen_upgrade_error( &mut self, - ListenUpgradeError { info, error }: ListenUpgradeError< + ListenUpgradeError { error, info }: ListenUpgradeError< ::InboundOpenInfo, ::InboundProtocol, >, ) { - match error { - ConnectionHandlerUpgrErr::Timeout => { - self.pending_events.push_back(Event::InboundTimeout(info)) - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - // The local peer merely doesn't support the protocol(s) requested. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - // An event is reported to permit user code to react to the fact that - // the local peer does not support the requested protocol(s). - self.pending_events - .push_back(Event::InboundUnsupportedProtocols(info)); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); - } - } + log::debug!("inbound stream {info} failed: {error}"); } } @@ -284,7 +263,7 @@ where { type InEvent = RequestProtocol; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; + type Error = void::Void; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; type OutboundOpenInfo = RequestId; @@ -338,12 +317,6 @@ where cx: &mut Context<'_>, ) -> Poll, RequestId, Self::OutEvent, Self::Error>> { - // Check for a pending (fatal) error. - if let Some(err) = self.pending_error.take() { - // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - // Drain pending events. if let Some(event) = self.pending_events.pop_front() { return Poll::Ready(ConnectionHandlerEvent::Custom(event));