From 69d7ad797f73e58374f73895f6949dbe30777461 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 5 Sep 2024 12:34:29 +0300 Subject: [PATCH] errors: Introduce immediate dial error and request-response rejection reasons (#227) This PR makes several changes to the errors in general, with the main goal of extracting the reject-reason from the request response protocols: - an `ImmediateDial` error is introduced for failing to dial peers due to internal errors (no addresses available, tried to dial self, already connected etc) to distinguish between network dial failures - opening substreams now return a `SubstreamError` instead of using the overarching litep2p::Error - substreams are now implementing `Stream` for consistency with opening substreams - Reject reasons include immediate dialing errors. Ideally, we could expose the same level of information that is exposed via `ListDialErrors` (provide a tuple of addresses with individual dial errors), however that would require a bigger refactor to the code. For now this information is enough for Substrate metrics to provide more information and align with litep2p metrics. This PR is part of a bigger effort to simply the overarching error enum: - https://github.com/paritytech/litep2p/issues/204 Closes: https://github.com/paritytech/litep2p/issues/188 --------- Signed-off-by: Alexandru Vasile --- src/error.rs | 67 +++++++++++++++++-- src/mock/substream.rs | 50 +++++++++----- src/multistream_select/negotiated.rs | 2 +- src/multistream_select/protocol.rs | 9 +++ src/protocol/connection.rs | 12 ++-- src/protocol/libp2p/identify.rs | 2 +- src/protocol/libp2p/kademlia/executor.rs | 18 +++-- src/protocol/libp2p/kademlia/mod.rs | 10 +-- src/protocol/libp2p/kademlia/store.rs | 4 +- src/protocol/mod.rs | 2 +- src/protocol/notification/mod.rs | 6 +- src/protocol/notification/negotiation.rs | 8 ++- .../notification/tests/notification.rs | 3 +- .../tests/substream_validation.rs | 2 +- src/protocol/protocol_set.rs | 2 +- src/protocol/request_response/handle.rs | 58 ++++++++++++++-- src/protocol/request_response/mod.rs | 52 ++++++++++---- src/protocol/request_response/tests.rs | 4 +- src/protocol/transport_service.rs | 17 +++-- src/substream/mod.rs | 53 +++++++-------- src/transport/manager/handle.rs | 41 +++++------- src/transport/quic/substream.rs | 9 +-- src/yamux/error.rs | 13 ++++ src/yamux/frame/header.rs | 2 +- src/yamux/frame/io.rs | 11 +++ tests/protocol/request_response.rs | 18 ++--- tests/substream.rs | 11 ++- 27 files changed, 331 insertions(+), 155 deletions(-) diff --git a/src/error.rs b/src/error.rs index f05a83c3..0a5ddc93 100644 --- a/src/error.rs +++ b/src/error.rs @@ -125,6 +125,8 @@ pub enum Error { ConnectionDoesntExist(ConnectionId), #[error("Exceeded connection limits `{0:?}`")] ConnectionLimit(ConnectionLimitsError), + #[error("Failed to dial peer immediately")] + ImmediateDialError(#[from] ImmediateDialError), } /// Error type for address parsing. @@ -150,7 +152,7 @@ pub enum AddressError { InvalidPeerId(Multihash), } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, PartialEq)] pub enum ParseError { /// The provided probuf message cannot be decoded. #[error("Failed to decode protobuf message: `{0:?}`")] @@ -180,10 +182,16 @@ pub enum ParseError { InvalidData, } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, PartialEq)] pub enum SubstreamError { #[error("Connection closed")] ConnectionClosed, + #[error("Connection channel clogged")] + ChannelClogged, + #[error("Connection to peer does not exist: `{0}`")] + PeerDoesNotExist(PeerId), + #[error("I/O error: `{0}`")] + IoError(ErrorKind), #[error("yamux error: `{0}`")] YamuxError(crate::yamux::ConnectionError, Direction), #[error("Failed to read from substream, substream id `{0:?}`")] @@ -232,6 +240,25 @@ pub enum NegotiationError { WebSocket(#[from] tokio_tungstenite::tungstenite::error::Error), } +impl PartialEq for NegotiationError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::MultistreamSelectError(lhs), Self::MultistreamSelectError(rhs)) => lhs == rhs, + (Self::SnowError(lhs), Self::SnowError(rhs)) => lhs == rhs, + (Self::ParseError(lhs), Self::ParseError(rhs)) => lhs == rhs, + (Self::IoError(lhs), Self::IoError(rhs)) => lhs == rhs, + (Self::PeerIdMismatch(lhs, lhs_1), Self::PeerIdMismatch(rhs, rhs_1)) => + lhs == rhs && lhs_1 == rhs_1, + #[cfg(feature = "quic")] + (Self::Quic(lhs), Self::Quic(rhs)) => lhs == rhs, + #[cfg(feature = "websocket")] + (Self::WebSocket(lhs), Self::WebSocket(rhs)) => + core::mem::discriminant(lhs) == core::mem::discriminant(rhs), + _ => core::mem::discriminant(self) == core::mem::discriminant(other), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum NotificationError { #[error("Peer already exists")] @@ -246,7 +273,8 @@ pub enum NotificationError { /// The error type for dialing a peer. /// -/// This error is reported via the litep2p events. +/// This error is reported via the litep2p events after performing +/// a network dialing operation. #[derive(Debug, thiserror::Error)] pub enum DialError { /// The dialing operation timed out. @@ -269,9 +297,32 @@ pub enum DialError { NegotiationError(#[from] NegotiationError), } +/// Dialing resulted in an immediate error before performing any network operations. +#[derive(Debug, thiserror::Error, Copy, Clone, Eq, PartialEq)] +pub enum ImmediateDialError { + /// The provided address does not include a peer ID. + #[error("`PeerId` missing from the address")] + PeerIdMissing, + /// The peer ID provided in the address is the same as the local peer ID. + #[error("Tried to dial self")] + TriedToDialSelf, + /// Cannot dial an already connected peer. + #[error("Already connected to peer")] + AlreadyConnected, + /// Cannot dial a peer that does not have any address available. + #[error("No address available for peer")] + NoAddressAvailable, + /// The essential task was closed. + #[error("TaskClosed")] + TaskClosed, + /// The channel is clogged. + #[error("Connection channel clogged")] + ChannelClogged, +} + /// Error during the QUIC transport negotiation. #[cfg(feature = "quic")] -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, PartialEq)] pub enum QuicError { /// The provided certificate is invalid. #[error("Invalid certificate")] @@ -285,7 +336,7 @@ pub enum QuicError { } /// Error during DNS resolution. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, PartialEq)] pub enum DnsError { /// The DNS resolution failed to resolve the provided URL. #[error("DNS failed to resolve url `{0}`")] @@ -309,6 +360,12 @@ impl From for Error { } } +impl From for SubstreamError { + fn from(error: io::Error) -> SubstreamError { + SubstreamError::IoError(error.kind()) + } +} + impl From for DialError { fn from(error: io::Error) -> Self { DialError::NegotiationError(NegotiationError::IoError(error.kind())) diff --git a/src/mock/substream.rs b/src/mock/substream.rs index d206c434..4c52394e 100644 --- a/src/mock/substream.rs +++ b/src/mock/substream.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::error::Error; +use crate::error::SubstreamError; use bytes::{Bytes, BytesMut}; use futures::{Sink, Stream}; @@ -31,15 +31,20 @@ use std::{ /// Trait which describes the behavior of a mock substream. pub trait Substream: - Debug + Stream> + Sink + Send + Unpin + 'static + Debug + + Stream> + + Sink + + Send + + Unpin + + 'static { } /// Blanket implementation for [`Substream`]. impl< T: Debug - + Stream> - + Sink + + Stream> + + Sink + Send + Unpin + 'static, @@ -52,33 +57,33 @@ mockall::mock! { pub Substream {} impl Sink for Substream { - type Error = Error; + type Error = SubstreamError; fn poll_ready<'a>( self: Pin<&mut Self>, cx: &mut Context<'a> - ) -> Poll>; + ) -> Poll>; - fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), Error>; + fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), SubstreamError>; fn poll_flush<'a>( self: Pin<&mut Self>, cx: &mut Context<'a> - ) -> Poll>; + ) -> Poll>; fn poll_close<'a>( self: Pin<&mut Self>, cx: &mut Context<'a> - ) -> Poll>; + ) -> Poll>; } impl Stream for Substream { - type Item = crate::Result; + type Item = Result; fn poll_next<'a>( self: Pin<&mut Self>, cx: &mut Context<'a> - ) -> Poll>>; + ) -> Poll>>; } } @@ -95,32 +100,41 @@ impl DummySubstream { } impl Sink for DummySubstream { - type Error = Error; + type Error = SubstreamError; - fn poll_ready<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll> { + fn poll_ready<'a>( + self: Pin<&mut Self>, + _cx: &mut Context<'a>, + ) -> Poll> { Poll::Pending } - fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), Error> { + fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), SubstreamError> { Ok(()) } - fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll> { + fn poll_flush<'a>( + self: Pin<&mut Self>, + _cx: &mut Context<'a>, + ) -> Poll> { Poll::Pending } - fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll> { + fn poll_close<'a>( + self: Pin<&mut Self>, + _cx: &mut Context<'a>, + ) -> Poll> { Poll::Ready(Ok(())) } } impl Stream for DummySubstream { - type Item = crate::Result; + type Item = Result; fn poll_next<'a>( self: Pin<&mut Self>, _cx: &mut Context<'a>, - ) -> Poll>> { + ) -> Poll>> { Poll::Pending } } diff --git a/src/multistream_select/negotiated.rs b/src/multistream_select/negotiated.rs index 846399aa..2a08f29c 100644 --- a/src/multistream_select/negotiated.rs +++ b/src/multistream_select/negotiated.rs @@ -350,7 +350,7 @@ where } /// Error that can happen when negotiating a protocol with the remote. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, PartialEq)] pub enum NegotiationError { /// A protocol error occurred during the negotiation. #[error("A protocol error occurred during the negotiation: `{0:?}`")] diff --git a/src/multistream_select/protocol.rs b/src/multistream_select/protocol.rs index 3c02f794..694745a6 100644 --- a/src/multistream_select/protocol.rs +++ b/src/multistream_select/protocol.rs @@ -445,6 +445,15 @@ pub enum ProtocolError { ProtocolNotSupported, } +impl PartialEq for ProtocolError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (ProtocolError::IoError(lhs), ProtocolError::IoError(rhs)) => lhs.kind() == rhs.kind(), + _ => std::mem::discriminant(self) == std::mem::discriminant(other), + } + } +} + impl From for io::Error { fn from(err: ProtocolError) -> Self { if let ProtocolError::IoError(e) = err { diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs index 107e2df2..00277b8a 100644 --- a/src/protocol/connection.rs +++ b/src/protocol/connection.rs @@ -21,7 +21,7 @@ //! Connection-related helper code. use crate::{ - error::Error, + error::{Error, SubstreamError}, protocol::protocol_set::ProtocolCommand, types::{protocol::ProtocolName, ConnectionId, SubstreamId}, }; @@ -110,11 +110,11 @@ impl ConnectionHandle { fallback_names: Vec, substream_id: SubstreamId, permit: Permit, - ) -> crate::Result<()> { + ) -> Result<(), SubstreamError> { match &self.connection { ConnectionType::Active(active) => active.clone(), ConnectionType::Inactive(inactive) => - inactive.upgrade().ok_or(Error::ConnectionClosed)?, + inactive.upgrade().ok_or(SubstreamError::ConnectionClosed)?, } .try_send(ProtocolCommand::OpenSubstream { protocol: protocol.clone(), @@ -123,8 +123,8 @@ impl ConnectionHandle { permit, }) .map_err(|error| match error { - TrySendError::Full(_) => Error::ChannelClogged, - TrySendError::Closed(_) => Error::ConnectionClosed, + TrySendError::Full(_) => SubstreamError::ChannelClogged, + TrySendError::Closed(_) => SubstreamError::ConnectionClosed, }) } @@ -236,7 +236,7 @@ mod tests { SubstreamId::new(), permit, ) { - Err(Error::ChannelClogged) => {} + Err(SubstreamError::ChannelClogged) => {} error => panic!("invalid error: {error:?}"), } } diff --git a/src/protocol/libp2p/identify.rs b/src/protocol/libp2p/identify.rs index 184f7fb1..1c0a5985 100644 --- a/src/protocol/libp2p/identify.rs +++ b/src/protocol/libp2p/identify.rs @@ -325,7 +325,7 @@ impl Identify { return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some( substream_id, )))), - Ok(Some(Err(error))) => return Err(error), + Ok(Some(Err(error))) => return Err(error.into()), Ok(Some(Ok(payload))) => payload, }; diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index 9b8bd8ce..c2a04bf9 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -259,10 +259,9 @@ mod tests { let mut executor = QueryExecutor::new(); let peer = PeerId::random(); let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown)))); + substream.expect_poll_next().times(1).return_once(|_| { + Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed))) + }); executor.read_message( peer, @@ -294,10 +293,9 @@ mod tests { substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(()))); substream.expect_start_send().times(1).return_once(|_| Ok(())); substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown)))); + substream.expect_poll_next().times(1).return_once(|_| { + Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed))) + }); executor.send_request_read_response( peer, @@ -330,7 +328,7 @@ mod tests { substream .expect_poll_ready() .times(1) - .return_once(|_| Poll::Ready(Err(crate::Error::Unknown))); + .return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed))); substream.expect_poll_close().times(1).return_once(|_| Poll::Ready(Ok(()))); executor.send_request_read_response( @@ -393,7 +391,7 @@ mod tests { substream .expect_poll_next() .times(1) - .return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown)))); + .return_once(|_| Poll::Ready(Some(Err(crate::error::SubstreamError::ChannelClogged)))); executor.read_message( peer, diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index ea157ea1..82e9da24 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -21,7 +21,7 @@ //! [`/ipfs/kad/1.0.0`](https://github.com/libp2p/specs/blob/master/kad-dht/README.md) implementation. use crate::{ - error::{Error, SubstreamError}, + error::{Error, ImmediateDialError, SubstreamError}, protocol::{ libp2p::kademlia::{ bucket::KBucketEntry, @@ -666,7 +666,7 @@ impl Kademlia { } // Already connected is a recoverable error. - Err(Error::AlreadyConnected) => { + Err(ImmediateDialError::AlreadyConnected) => { // Dial returned `Error::AlreadyConnected`, retry opening the substream. match self.service.open_substream(peer) { Ok(substream_id) => { @@ -680,14 +680,14 @@ impl Kademlia { } Err(err) => { tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time"); - Err(err) + Err(err.into()) } } } Err(error) => { tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "Failed to dial peer"); - Err(error) + Err(error.into()) } } } @@ -819,7 +819,7 @@ impl Kademlia { Some(TransportEvent::SubstreamOpenFailure { substream, error }) => { self.on_substream_open_failure(substream, error).await; } - Some(TransportEvent::DialFailure { peer, address }) => self.on_dial_failure(peer, address), + Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address), None => return Err(Error::EssentialTaskClosed), }, context = self.executor.next() => { diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index fced9372..6222a6fd 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -138,7 +138,7 @@ impl MemoryStore { Vec::default() } else { - self.provider_keys.get(key).cloned().unwrap_or_else(|| Vec::default()) + self.provider_keys.get(key).cloned().unwrap_or_else(Vec::default) } } @@ -202,7 +202,7 @@ impl MemoryStore { false } else { - if providers.len() == usize::from(self.config.max_providers_per_key) { + if providers.len() == self.config.max_providers_per_key { providers.pop(); } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 87b92f84..9fcf5432 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -48,7 +48,7 @@ mod protocol_set; mod transport_service; /// Substream direction. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub enum Direction { /// Substream was opened by the remote peer. Inbound, diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index 984771b3..42063081 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -951,7 +951,7 @@ impl NotificationProtocol { ) .await; - return Err(error); + return Err(error.into()); } Ok(()) => { tracing::trace!( @@ -1175,7 +1175,7 @@ impl NotificationProtocol { ) .await; - Err(error) + Err(error.into()) } }, // here the state is one of `OutboundState::{OutboundInitiated, Negotiating, @@ -1767,7 +1767,7 @@ impl NotificationProtocol { Some(TransportEvent::SubstreamOpenFailure { substream, error }) => { self.on_substream_open_failure(substream, error).await; } - Some(TransportEvent::DialFailure { peer, address }) => self.on_dial_failure(peer, address).await, + Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address).await, None => (), }, result = self.pending_validations.select_next_some(), if !self.pending_validations.is_empty() => { diff --git a/src/protocol/notification/negotiation.rs b/src/protocol/notification/negotiation.rs index 8d238dfe..9c53c760 100644 --- a/src/protocol/notification/negotiation.rs +++ b/src/protocol/notification/negotiation.rs @@ -328,7 +328,6 @@ mod tests { use crate::{ mock::substream::{DummySubstream, MockSubstream}, types::SubstreamId, - Error, }; use futures::StreamExt; @@ -344,7 +343,10 @@ mod tests { let mut substream = MockSubstream::new(); substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream.expect_start_send().times(1).return_once(|_| Err(Error::Unknown)); + substream + .expect_start_send() + .times(1) + .return_once(|_| Err(crate::error::SubstreamError::ConnectionClosed)); let peer = PeerId::random(); let substream = Substream::new_mock(peer, SubstreamId::from(0usize), Box::new(substream)); @@ -382,7 +384,7 @@ mod tests { substream .expect_poll_flush() .times(1) - .return_once(|_| Poll::Ready(Err(Error::Unknown))); + .return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed))); let peer = PeerId::random(); let substream = Substream::new_mock(peer, SubstreamId::from(0usize), Box::new(substream)); diff --git a/src/protocol/notification/tests/notification.rs b/src/protocol/notification/tests/notification.rs index f766fc44..fc2c1ee7 100644 --- a/src/protocol/notification/tests/notification.rs +++ b/src/protocol/notification/tests/notification.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - error::Error, mock::substream::{DummySubstream, MockSubstream}, protocol::{ self, @@ -1029,7 +1028,7 @@ async fn second_inbound_substream_opened_while_outbound_substream_was_opening() substream1 .expect_poll_ready() .times(1) - .return_once(|_| Poll::Ready(Err(Error::Unknown))); + .return_once(|_| Poll::Ready(Err(SubstreamError::ConnectionClosed))); notif.peers.insert( peer, diff --git a/src/protocol/notification/tests/substream_validation.rs b/src/protocol/notification/tests/substream_validation.rs index cf2d6bb8..b2ac5054 100644 --- a/src/protocol/notification/tests/substream_validation.rs +++ b/src/protocol/notification/tests/substream_validation.rs @@ -256,7 +256,7 @@ async fn accept_fails_due_to_closed_substream() { substream .expect_poll_ready() .times(1) - .return_once(|_| Poll::Ready(Err(Error::SubstreamError(SubstreamError::ConnectionClosed)))); + .return_once(|_| Poll::Ready(Err(SubstreamError::ConnectionClosed))); let (proto_tx, _proto_rx) = channel(256); tx.send(InnerTransportEvent::ConnectionEstablished { diff --git a/src/protocol/protocol_set.rs b/src/protocol/protocol_set.rs index 736cb7f7..a03c9e82 100644 --- a/src/protocol/protocol_set.rs +++ b/src/protocol/protocol_set.rs @@ -173,7 +173,7 @@ pub enum ProtocolCommand { /// Fallback names. /// - /// If the protocol has changed its name but wishes to suppor the old name(s), it must + /// If the protocol has changed its name but wishes to support the old name(s), it must /// provide the old protocol names in `fallback_names`. These are fed into /// `multistream-select` which them attempts to negotiate a protocol for the substream /// using one of the provided names and if the substream is negotiated successfully, will diff --git a/src/protocol/request_response/handle.rs b/src/protocol/request_response/handle.rs index 3d1cfe9b..96ad334f 100644 --- a/src/protocol/request_response/handle.rs +++ b/src/protocol/request_response/handle.rs @@ -19,6 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + error::{ImmediateDialError, SubstreamError}, + multistream_select::ProtocolError, types::{protocol::ProtocolName, RequestId}, Error, PeerId, }; @@ -31,6 +33,7 @@ use tokio::sync::{ use std::{ collections::HashMap, + io::ErrorKind, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, @@ -43,10 +46,10 @@ use std::{ const LOG_TARGET: &str = "litep2p::request-response::handle"; /// Request-response error. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, PartialEq)] pub enum RequestResponseError { /// Request was rejected. - Rejected, + Rejected(RejectReason), /// Request was canceled by the local node. Canceled, @@ -54,7 +57,7 @@ pub enum RequestResponseError { /// Request timed out. Timeout, - /// Litep2p isn't connected to the peer. + /// The peer is not connected and the dialing option was [`DialOptions::Reject`]. NotConnected, /// Too large payload. @@ -64,6 +67,53 @@ pub enum RequestResponseError { UnsupportedProtocol, } +/// The reason why a request was rejected. +#[derive(Debug, PartialEq)] +pub enum RejectReason { + /// Substream error. + SubstreamOpenError(SubstreamError), + + /// The peer disconnected before the request was processed. + ConnectionClosed, + + /// The substream was closed before the request was processed. + SubstreamClosed, + + /// The dial failed. + /// + /// If the dial failure is immediate, the error is included. + /// + /// If the dialing process is happening in parallel on multiple + /// addresses (potentially with multiple protocols), the dialing + /// process is not considered immediate and the given errors are not + /// propagated for simplicity. + DialFailed(Option), +} + +impl From for RejectReason { + fn from(error: SubstreamError) -> Self { + // Convert `ErrorKind::NotConnected` to `RejectReason::ConnectionClosed`. + match error { + SubstreamError::IoError(error) if error == ErrorKind::NotConnected => + RejectReason::ConnectionClosed, + SubstreamError::YamuxError(crate::yamux::ConnectionError::Io(error), _) + if error.kind() == ErrorKind::NotConnected => + RejectReason::ConnectionClosed, + SubstreamError::NegotiationError(crate::error::NegotiationError::IoError(error)) + if error == ErrorKind::NotConnected => + RejectReason::ConnectionClosed, + SubstreamError::NegotiationError( + crate::error::NegotiationError::MultistreamSelectError( + crate::multistream_select::NegotiationError::ProtocolError( + ProtocolError::IoError(error), + ), + ), + ) if error.kind() == ErrorKind::NotConnected => RejectReason::ConnectionClosed, + error => RejectReason::SubstreamOpenError(error), + } + } +} + /// Request-response events. pub(super) enum InnerRequestResponseEvent { /// Request received from remote @@ -141,7 +191,7 @@ impl From for RequestResponseEvent { } /// Request-response events. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, PartialEq)] pub enum RequestResponseEvent { /// Request received from remote RequestReceived { diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index 3125f71c..276b0713 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -53,7 +53,9 @@ use std::{ }; pub use config::{Config, ConfigBuilder}; -pub use handle::{DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle}; +pub use handle::{ + DialOptions, RejectReason, RequestResponseError, RequestResponseEvent, RequestResponseHandle, +}; mod config; mod handle; @@ -270,7 +272,7 @@ impl RequestResponseProtocol { .report_request_failure( peer, context.request_id, - RequestResponseError::Rejected, + RequestResponseError::Rejected(error.into()), ) .await; } @@ -301,7 +303,7 @@ impl RequestResponseProtocol { .send(InnerRequestResponseEvent::RequestFailed { peer, request_id, - error: RequestResponseError::Rejected, + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed), }) .await; } @@ -369,7 +371,7 @@ impl RequestResponseProtocol { fallback_protocol, Err(RequestResponseError::Timeout), ), - Ok(Err(Error::IoError(ErrorKind::PermissionDenied))) => { + Ok(Err(SubstreamError::IoError(ErrorKind::PermissionDenied))) => { tracing::warn!( target: LOG_TARGET, ?peer, @@ -384,11 +386,11 @@ impl RequestResponseProtocol { Err(RequestResponseError::TooLargePayload), ) } - Ok(Err(_error)) => ( + Ok(Err(error)) => ( peer, request_id, fallback_protocol, - Err(RequestResponseError::NotConnected), + Err(RequestResponseError::Rejected(error.into())), ), Ok(Ok(_)) => { tokio::select! { @@ -423,8 +425,20 @@ impl RequestResponseProtocol { event = substream.next() => match event { Some(Ok(response)) => { (peer, request_id, fallback_protocol, Ok(response.freeze().into())) + }, + Some(Err(error)) => { + (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(error.into()))) + }, + None => { + tracing::info!( + target: LOG_TARGET, + ?peer, + %protocol, + ?request_id, + "substream closed", + ); + (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(RejectReason::SubstreamClosed))) } - _ => (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected)), } } } @@ -439,7 +453,7 @@ impl RequestResponseProtocol { &mut self, peer: PeerId, request_id: RequestId, - request: crate::Result, + request: Result, ) -> crate::Result<()> { let fallback = self .peers @@ -614,7 +628,11 @@ impl RequestResponseProtocol { .get_mut(&peer) .map(|peer_context| peer_context.active.remove(&context.request_id)); let _ = self - .report_request_failure(peer, context.request_id, RequestResponseError::Rejected) + .report_request_failure( + peer, + context.request_id, + RequestResponseError::Rejected(RejectReason::DialFailed(None)), + ) .await; } } @@ -663,7 +681,7 @@ impl RequestResponseProtocol { SubstreamError::NegotiationError(NegotiationError::MultistreamSelectError( MultistreamFailed, )) => RequestResponseError::UnsupportedProtocol, - _ => RequestResponseError::Rejected, + _ => RequestResponseError::Rejected(error.into()), }, }) .await @@ -754,7 +772,9 @@ impl RequestResponseProtocol { .report_request_failure( peer, request_id, - RequestResponseError::Rejected, + RequestResponseError::Rejected(RejectReason::DialFailed(Some( + error, + ))), ) .await; } @@ -786,8 +806,12 @@ impl RequestResponseProtocol { "failed to open substream", ); - self.report_request_failure(peer, request_id, RequestResponseError::Rejected) - .await + self.report_request_failure( + peer, + request_id, + RequestResponseError::Rejected(error.into()), + ) + .await } } } @@ -914,7 +938,7 @@ impl RequestResponseProtocol { ); } } - Some(TransportEvent::DialFailure { peer, .. }) => self.on_dial_failure(peer).await, + Some(TransportEvent::DialFailure { peer, .. }) => self.on_dial_failure(peer).await, None => return, }, event = self.pending_inbound.select_next_some(), if !self.pending_inbound.is_empty() => { diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 107b87ca..73d8ce86 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -189,7 +189,7 @@ async fn inbound_substream_error() { substream .expect_poll_next() .times(1) - .return_once(|_| Poll::Ready(Some(Err(Error::Unknown)))); + .return_once(|_| Poll::Ready(Some(Err(SubstreamError::ConnectionClosed)))); // register inbound substream from peer protocol @@ -297,7 +297,7 @@ async fn request_failure_reported_once() { }) => { assert_eq!(request_peer, peer); assert_eq!(request_id, RequestId::from(1337usize)); - assert_eq!(error, RequestResponseError::Rejected); + assert!(matches!(error, RequestResponseError::Rejected(_))); } event => panic!("unexpected event: {event:?}"), } diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 1525cb53..68c3bce3 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -20,7 +20,7 @@ use crate::{ addresses::PublicAddresses, - error::Error, + error::{Error, ImmediateDialError, SubstreamError}, protocol::{connection::ConnectionHandle, InnerTransportEvent, TransportEvent}, transport::{manager::TransportManagerHandle, Endpoint}, types::{protocol::ProtocolName, ConnectionId, SubstreamId}, @@ -290,7 +290,7 @@ impl TransportService { /// Dial `peer` using `PeerId`. /// /// Call fails if `Litep2p` doesn't have a known address for the peer. - pub fn dial(&mut self, peer: &PeerId) -> crate::Result<()> { + pub fn dial(&mut self, peer: &PeerId) -> Result<(), ImmediateDialError> { self.transport_handle.dial(peer) } @@ -302,7 +302,7 @@ impl TransportService { /// Calling this function is only necessary for those addresses that are discovered out-of-band /// since `Litep2p` internally keeps track of all peer addresses it has learned through user /// calling this function, Kademlia peer discoveries and `Identify` responses. - pub fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> { + pub fn dial_address(&mut self, address: Multiaddr) -> Result<(), ImmediateDialError> { self.transport_handle.dial_address(address) } @@ -327,12 +327,15 @@ impl TransportService { /// /// Call fails if there is no connection open to `peer` or the channel towards /// the connection is clogged. - pub fn open_substream(&mut self, peer: PeerId) -> crate::Result { + pub fn open_substream(&mut self, peer: PeerId) -> Result { // always prefer the primary connection - let connection = - &mut self.connections.get_mut(&peer).ok_or(Error::PeerDoesntExist(peer))?.primary; + let connection = &mut self + .connections + .get_mut(&peer) + .ok_or(SubstreamError::PeerDoesNotExist(peer))? + .primary; - let permit = connection.try_get_permit().ok_or(Error::ConnectionClosed)?; + let permit = connection.try_get_permit().ok_or(SubstreamError::ConnectionClosed)?; let substream_id = SubstreamId::from(self.next_substream_id.fetch_add(1usize, Ordering::Relaxed)); diff --git a/src/substream/mod.rs b/src/substream/mod.rs index 37ba98c0..83a3b136 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -22,11 +22,7 @@ //! Substream-related helper code. use crate::{ - codec::ProtocolCodec, - error::{Error, SubstreamError}, - transport::tcp, - types::SubstreamId, - PeerId, + codec::ProtocolCodec, error::SubstreamError, transport::tcp, types::SubstreamId, PeerId, }; #[cfg(feature = "quic")] @@ -160,7 +156,7 @@ macro_rules! check_size { ($max_size:expr, $size:expr) => {{ if let Some(max_size) = $max_size { if $size > max_size { - return Err(Error::IoError(ErrorKind::PermissionDenied)); + return Err(SubstreamError::IoError(ErrorKind::PermissionDenied).into()); } } }}; @@ -362,14 +358,12 @@ impl Substream { io: &mut T, payload_size: usize, payload: Bytes, - ) -> crate::Result<()> { + ) -> Result<(), SubstreamError> { if payload.len() != payload_size { - return Err(Error::IoError(ErrorKind::PermissionDenied)); + return Err(SubstreamError::IoError(ErrorKind::PermissionDenied)); } - io.write_all(&payload) - .await - .map_err(|_| Error::SubstreamError(SubstreamError::ConnectionClosed))?; + io.write_all(&payload).await.map_err(|_| SubstreamError::ConnectionClosed)?; // Flush the stream. io.flush().await.map_err(From::from) @@ -380,10 +374,10 @@ impl Substream { io: &mut T, bytes: Bytes, max_size: Option, - ) -> crate::Result<()> { + ) -> Result<(), SubstreamError> { if let Some(max_size) = max_size { if bytes.len() > max_size { - return Err(Error::IoError(ErrorKind::PermissionDenied)); + return Err(SubstreamError::IoError(ErrorKind::PermissionDenied)); } } @@ -413,7 +407,7 @@ impl Substream { /// # Panics /// /// Panics if no codec is provided. - pub async fn send_framed(&mut self, bytes: Bytes) -> crate::Result<()> { + pub async fn send_framed(&mut self, bytes: Bytes) -> Result<(), SubstreamError> { tracing::trace!( target: LOG_TARGET, peer = ?self.peer, @@ -425,7 +419,7 @@ impl Substream { match &mut self.substream { #[cfg(test)] SubstreamType::Mock(ref mut substream) => - futures::SinkExt::send(substream, bytes).await, + futures::SinkExt::send(substream, bytes).await.map_err(Into::into), SubstreamType::Tcp(ref mut substream) => match self.codec { ProtocolCodec::Unspecified => panic!("codec is unspecified"), ProtocolCodec::Identity(payload_size) => @@ -528,7 +522,7 @@ fn read_payload_size(buffer: &[u8]) -> Result<(usize, usize), ReadError> { } impl Stream for Substream { - type Item = crate::Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = Pin::into_inner(self); @@ -632,14 +626,20 @@ impl Stream for Substream { match read_payload_size(&this.size_vec[..this.offset]) { Err(ReadError::NotEnoughBytes) => continue, Err(_) => - return Poll::Ready(Some(Err(Error::InvalidData))), + return Poll::Ready(Some(Err( + SubstreamError::ReadFailure(Some( + this.substream_id, + )), + ))), Ok((size, num_bytes)) => { debug_assert_eq!(num_bytes, this.offset); if let Some(max_size) = max_size { if size > max_size { return Poll::Ready(Some(Err( - Error::InvalidData, + SubstreamError::ReadFailure(Some( + this.substream_id, + )), ))); } } @@ -663,7 +663,7 @@ impl Stream for Substream { // TODO: this code can definitely be optimized impl Sink for Substream { - type Error = Error; + type Error = SubstreamError; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // `MockSubstream` implements `Sink` so calls to `poll_ready()` must be delegated @@ -691,7 +691,7 @@ impl Sink for Substream { match self.codec { ProtocolCodec::Identity(payload_size) => { if item.len() != payload_size { - return Err(Error::IoError(ErrorKind::PermissionDenied)); + return Err(SubstreamError::IoError(ErrorKind::PermissionDenied)); } self.pending_out_bytes += item.len(); @@ -763,7 +763,7 @@ impl SubstreamSetKey for K pub struct SubstreamSet where K: SubstreamSetKey, - S: Stream> + Unpin, + S: Stream> + Unpin, { substreams: HashMap, } @@ -771,7 +771,7 @@ where impl SubstreamSet where K: SubstreamSetKey, - S: Stream> + Unpin, + S: Stream> + Unpin, { /// Create new [`SubstreamSet`]. pub fn new() -> Self { @@ -818,7 +818,7 @@ where impl Stream for SubstreamSet where K: SubstreamSetKey, - S: Stream> + Unpin, + S: Stream> + Unpin, { type Item = (K, ::Item); @@ -831,10 +831,7 @@ where Poll::Pending => continue, Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))), Poll::Ready(None) => - return Poll::Ready(Some(( - *key, - Err(Error::SubstreamError(SubstreamError::ConnectionClosed)), - ))), + return Poll::Ready(Some((*key, Err(SubstreamError::ConnectionClosed)))), } } @@ -939,7 +936,7 @@ mod tests { assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..])); match set.next().await { - Some((exited_peer, Err(Error::SubstreamError(SubstreamError::ConnectionClosed)))) => { + Some((exited_peer, Err(SubstreamError::ConnectionClosed))) => { assert_eq!(peer, exited_peer); } _ => panic!("inavlid event received"), diff --git a/src/transport/manager/handle.rs b/src/transport/manager/handle.rs index 90fe7efc..0ded6406 100644 --- a/src/transport/manager/handle.rs +++ b/src/transport/manager/handle.rs @@ -21,7 +21,7 @@ use crate::{ addresses::PublicAddresses, crypto::ed25519::Keypair, - error::{AddressError, Error}, + error::ImmediateDialError, executor::Executor, protocol::ProtocolSet, transport::manager::{ @@ -232,9 +232,9 @@ impl TransportManagerHandle { /// Dial peer using `PeerId`. /// /// Returns an error if the peer is unknown or the peer is already connected. - pub fn dial(&self, peer: &PeerId) -> crate::Result<()> { + pub fn dial(&self, peer: &PeerId) -> Result<(), ImmediateDialError> { if peer == &self.local_peer_id { - return Err(Error::TriedToDialSelf); + return Err(ImmediateDialError::TriedToDialSelf); } { @@ -242,14 +242,14 @@ impl TransportManagerHandle { Some(PeerContext { state: PeerState::Connected { .. }, .. - }) => return Err(Error::AlreadyConnected), + }) => return Err(ImmediateDialError::AlreadyConnected), Some(PeerContext { state: PeerState::Disconnected { dial_record }, addresses, .. }) => { if addresses.is_empty() { - return Err(Error::NoAddressAvailable(*peer)); + return Err(ImmediateDialError::NoAddressAvailable); } // peer is already being dialed, don't dial again until the first dial concluded @@ -267,31 +267,31 @@ impl TransportManagerHandle { state: PeerState::Dialing { .. } | PeerState::Opening { .. }, .. }) => return Ok(()), - None => return Err(Error::PeerDoesntExist(*peer)), + None => return Err(ImmediateDialError::NoAddressAvailable), } } self.cmd_tx .try_send(InnerTransportManagerCommand::DialPeer { peer: *peer }) .map_err(|error| match error { - TrySendError::Full(_) => Error::ChannelClogged, - TrySendError::Closed(_) => Error::EssentialTaskClosed, + TrySendError::Full(_) => ImmediateDialError::ChannelClogged, + TrySendError::Closed(_) => ImmediateDialError::TaskClosed, }) } /// Dial peer using `Multiaddr`. /// /// Returns an error if address it not valid. - pub fn dial_address(&self, address: Multiaddr) -> crate::Result<()> { + pub fn dial_address(&self, address: Multiaddr) -> Result<(), ImmediateDialError> { if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) { - return Err(Error::AddressError(AddressError::PeerIdMissing)); + return Err(ImmediateDialError::PeerIdMissing); } self.cmd_tx .try_send(InnerTransportManagerCommand::DialAddress { address }) .map_err(|error| match error { - TrySendError::Full(_) => Error::ChannelClogged, - TrySendError::Closed(_) => Error::EssentialTaskClosed, + TrySendError::Full(_) => ImmediateDialError::ChannelClogged, + TrySendError::Closed(_) => ImmediateDialError::TaskClosed, }) } } @@ -468,7 +468,7 @@ mod tests { }; match handle.dial(&peer) { - Err(Error::AlreadyConnected) => {} + Err(ImmediateDialError::AlreadyConnected) => {} _ => panic!("invalid return value"), } } @@ -537,12 +537,8 @@ mod tests { peer }; - match handle.dial(&peer) { - Err(Error::NoAddressAvailable(failed_peer)) => { - assert_eq!(failed_peer, peer); - } - _ => panic!("invalid return value"), - } + let err = handle.dial(&peer).unwrap_err(); + assert!(matches!(err, ImmediateDialError::NoAddressAvailable)); } #[tokio::test] @@ -595,10 +591,9 @@ mod tests { let (mut handle, mut rx) = make_transport_manager_handle(); handle.supported_transport.insert(SupportedTransport::Tcp); - match handle.dial(&handle.local_peer_id) { - Err(Error::TriedToDialSelf) => {} - _ => panic!("invalid return value"), - } + let err = handle.dial(&handle.local_peer_id).unwrap_err(); + assert_eq!(err, ImmediateDialError::TriedToDialSelf); + assert!(rx.try_recv().is_err()); } diff --git a/src/transport/quic/substream.rs b/src/transport/quic/substream.rs index 19da25c0..201b6993 100644 --- a/src/transport/quic/substream.rs +++ b/src/transport/quic/substream.rs @@ -18,10 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ - error::{Error, SubstreamError}, - BandwidthSink, -}; +use crate::{error::SubstreamError, BandwidthSink}; use bytes::Bytes; use futures::{AsyncRead, AsyncWrite}; @@ -63,14 +60,14 @@ impl Substream { } /// Write `buffers` to the underlying socket. - pub async fn write_all_chunks(&mut self, buffers: &mut [Bytes]) -> crate::Result<()> { + pub async fn write_all_chunks(&mut self, buffers: &mut [Bytes]) -> Result<(), SubstreamError> { let nwritten = buffers.iter().fold(0usize, |acc, buffer| acc + buffer.len()); match self .send_stream .write_all_chunks(buffers) .await - .map_err(|_| Error::SubstreamError(SubstreamError::ConnectionClosed)) + .map_err(|_| SubstreamError::ConnectionClosed) { Ok(()) => { self.bandwidth_sink.increase_outbound(nwritten); diff --git a/src/yamux/error.rs b/src/yamux/error.rs index fb729457..672e4ad9 100644 --- a/src/yamux/error.rs +++ b/src/yamux/error.rs @@ -26,6 +26,19 @@ pub enum ConnectionError { TooManyStreams, } +impl PartialEq for ConnectionError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (ConnectionError::Io(e1), ConnectionError::Io(e2)) => e1.kind() == e2.kind(), + (ConnectionError::Decode(e1), ConnectionError::Decode(e2)) => e1 == e2, + (ConnectionError::NoMoreStreamIds, ConnectionError::NoMoreStreamIds) + | (ConnectionError::Closed, ConnectionError::Closed) + | (ConnectionError::TooManyStreams, ConnectionError::TooManyStreams) => true, + _ => false, + } + } +} + impl std::fmt::Display for ConnectionError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { diff --git a/src/yamux/frame/header.rs b/src/yamux/frame/header.rs index aaf825a7..cad9ed64 100644 --- a/src/yamux/frame/header.rs +++ b/src/yamux/frame/header.rs @@ -388,7 +388,7 @@ pub fn decode(buf: &[u8; HEADER_SIZE]) -> Result, HeaderDecodeError> /// Possible errors while decoding a message frame header. #[non_exhaustive] -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum HeaderDecodeError { /// Unknown version. Version(u8), diff --git a/src/yamux/frame/io.rs b/src/yamux/frame/io.rs index a0c67445..ba799016 100644 --- a/src/yamux/frame/io.rs +++ b/src/yamux/frame/io.rs @@ -294,6 +294,17 @@ pub enum FrameDecodeError { FrameTooLarge(usize), } +impl PartialEq for FrameDecodeError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (FrameDecodeError::Io(e1), FrameDecodeError::Io(e2)) => e1.kind() == e2.kind(), + (FrameDecodeError::Header(e1), FrameDecodeError::Header(e2)) => e1 == e2, + (FrameDecodeError::FrameTooLarge(n1), FrameDecodeError::FrameTooLarge(n2)) => n1 == n2, + _ => false, + } + } +} + impl std::fmt::Display for FrameDecodeError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { diff --git a/tests/protocol/request_response.rs b/tests/protocol/request_response.rs index 39d7e046..2acb5b42 100644 --- a/tests/protocol/request_response.rs +++ b/tests/protocol/request_response.rs @@ -22,8 +22,8 @@ use litep2p::{ config::ConfigBuilder as Litep2pConfigBuilder, crypto::ed25519::Keypair, protocol::request_response::{ - Config as RequestResponseConfig, ConfigBuilder, DialOptions, RequestResponseError, - RequestResponseEvent, + Config as RequestResponseConfig, ConfigBuilder, DialOptions, RejectReason, + RequestResponseError, RequestResponseEvent, }, transport::tcp::config::Config as TcpConfig, types::{protocol::ProtocolName, RequestId}, @@ -314,7 +314,7 @@ async fn reject_request(transport1: Transport, transport2: Transport) { RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected + error: RequestResponseError::Rejected(RejectReason::SubstreamClosed) } ); } @@ -789,7 +789,7 @@ async fn connection_close_while_request_is_pending(transport1: Transport, transp RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected, + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed), } ); } @@ -1005,7 +1005,7 @@ async fn response_too_big(transport1: Transport, transport2: Transport) { RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected, + error: RequestResponseError::Rejected(RejectReason::SubstreamClosed), } ); } @@ -1569,7 +1569,9 @@ async fn dial_peer_but_no_known_address(transport1: Transport, transport2: Trans RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected, + error: RequestResponseError::Rejected(RejectReason::DialFailed(Some( + litep2p::error::ImmediateDialError::NoAddressAvailable + ))), } ); } @@ -1916,7 +1918,7 @@ async fn excess_inbound_request_rejected(transport1: Transport, transport2: Tran RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected + error: RequestResponseError::Rejected(RejectReason::SubstreamClosed) } ); } @@ -2350,7 +2352,7 @@ async fn dial_failure(transport: Transport) { RequestResponseEvent::RequestFailed { peer, request_id, - error: RequestResponseError::Rejected + error: RequestResponseError::Rejected(RejectReason::DialFailed(None)) } ); } diff --git a/tests/substream.rs b/tests/substream.rs index efcf5011..01a0fb79 100644 --- a/tests/substream.rs +++ b/tests/substream.rs @@ -21,6 +21,7 @@ use litep2p::{ codec::ProtocolCodec, config::ConfigBuilder, + error::SubstreamError, protocol::{Direction, TransportEvent, TransportService, UserProtocol}, substream::{Substream, SubstreamSet}, transport::tcp::config::Config as TcpConfig, @@ -156,7 +157,7 @@ impl UserProtocol for CustomProtocol { } Some(mut substream) => { let payload = Bytes::from(payload); - let res = substream.send_framed(payload).await; + let res = substream.send_framed(payload).await.map_err(Into::into); tx.send(res).unwrap(); let _ = substream.close().await; } @@ -169,7 +170,7 @@ impl UserProtocol for CustomProtocol { } Some(mut substream) => { let payload = Bytes::from(payload); - let res = substream.send(payload).await; + let res = substream.send(payload).await.map_err(Into::into); tx.send(res).unwrap(); let _ = substream.close().await; } @@ -325,6 +326,7 @@ async fn too_big_identity_payload_framed(transport1: Transport, transport2: Tran match rx.await { Ok(Err(Error::IoError(ErrorKind::PermissionDenied))) => {} + Ok(Err(Error::SubstreamError(SubstreamError::IoError(ErrorKind::PermissionDenied)))) => {} event => panic!("invalid event received: {event:?}"), } } @@ -411,12 +413,15 @@ async fn too_big_identity_payload_sink(transport1: Transport, transport2: Transp panic!("failed to open substream"); }; - // send too large paylod to peer + // send too large payload to peer let (tx, rx) = oneshot::channel(); tx1.send(Command::SendPayloadSink(peer2, vec![0u8; 16], tx)).await.unwrap(); match rx.await { Ok(Err(Error::IoError(ErrorKind::PermissionDenied))) => {} + Ok(Err(Error::SubstreamError(SubstreamError::IoError( + ErrorKind::PermissionDenied, + )))) => {} event => panic!("invalid event received: {event:?}"), } }