Skip to content

Commit

Permalink
errors: Introduce immediate dial error and request-response rejection…
Browse files Browse the repository at this point in the history
… reasons (#227)

This PR makes several changes to the errors in general, with the main
goal of extracting the reject-reason from the request response
protocols:
- an `ImmediateDial` error is introduced for failing to dial peers due
to internal errors (no addresses available, tried to dial self, already
connected etc) to distinguish between network dial failures
- opening substreams now return a `SubstreamError` instead of using the
overarching litep2p::Error
- substreams are now implementing `Stream<Error = SubstreamError>` for
consistency with opening substreams
- Reject reasons include immediate dialing errors. Ideally, we could
expose the same level of information that is exposed via
`ListDialErrors` (provide a tuple of addresses with individual dial
errors), however that would require a bigger refactor to the code. For
now this information is enough for Substrate metrics to provide more
information and align with litep2p metrics.

This PR is part of a bigger effort to simply the overarching error enum:
- #204

Closes: #188

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Sep 5, 2024
1 parent 25ddf91 commit 69d7ad7
Show file tree
Hide file tree
Showing 27 changed files with 331 additions and 155 deletions.
67 changes: 62 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub enum Error {
ConnectionDoesntExist(ConnectionId),
#[error("Exceeded connection limits `{0:?}`")]
ConnectionLimit(ConnectionLimitsError),
#[error("Failed to dial peer immediately")]
ImmediateDialError(#[from] ImmediateDialError),
}

/// Error type for address parsing.
Expand All @@ -150,7 +152,7 @@ pub enum AddressError {
InvalidPeerId(Multihash),
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ParseError {
/// The provided probuf message cannot be decoded.
#[error("Failed to decode protobuf message: `{0:?}`")]
Expand Down Expand Up @@ -180,10 +182,16 @@ pub enum ParseError {
InvalidData,
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum SubstreamError {
#[error("Connection closed")]
ConnectionClosed,
#[error("Connection channel clogged")]
ChannelClogged,
#[error("Connection to peer does not exist: `{0}`")]
PeerDoesNotExist(PeerId),
#[error("I/O error: `{0}`")]
IoError(ErrorKind),
#[error("yamux error: `{0}`")]
YamuxError(crate::yamux::ConnectionError, Direction),
#[error("Failed to read from substream, substream id `{0:?}`")]
Expand Down Expand Up @@ -232,6 +240,25 @@ pub enum NegotiationError {
WebSocket(#[from] tokio_tungstenite::tungstenite::error::Error),
}

impl PartialEq for NegotiationError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::MultistreamSelectError(lhs), Self::MultistreamSelectError(rhs)) => lhs == rhs,
(Self::SnowError(lhs), Self::SnowError(rhs)) => lhs == rhs,
(Self::ParseError(lhs), Self::ParseError(rhs)) => lhs == rhs,
(Self::IoError(lhs), Self::IoError(rhs)) => lhs == rhs,
(Self::PeerIdMismatch(lhs, lhs_1), Self::PeerIdMismatch(rhs, rhs_1)) =>
lhs == rhs && lhs_1 == rhs_1,
#[cfg(feature = "quic")]
(Self::Quic(lhs), Self::Quic(rhs)) => lhs == rhs,
#[cfg(feature = "websocket")]
(Self::WebSocket(lhs), Self::WebSocket(rhs)) =>
core::mem::discriminant(lhs) == core::mem::discriminant(rhs),
_ => core::mem::discriminant(self) == core::mem::discriminant(other),
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum NotificationError {
#[error("Peer already exists")]
Expand All @@ -246,7 +273,8 @@ pub enum NotificationError {

/// The error type for dialing a peer.
///
/// This error is reported via the litep2p events.
/// This error is reported via the litep2p events after performing
/// a network dialing operation.
#[derive(Debug, thiserror::Error)]
pub enum DialError {
/// The dialing operation timed out.
Expand All @@ -269,9 +297,32 @@ pub enum DialError {
NegotiationError(#[from] NegotiationError),
}

/// Dialing resulted in an immediate error before performing any network operations.
#[derive(Debug, thiserror::Error, Copy, Clone, Eq, PartialEq)]
pub enum ImmediateDialError {
/// The provided address does not include a peer ID.
#[error("`PeerId` missing from the address")]
PeerIdMissing,
/// The peer ID provided in the address is the same as the local peer ID.
#[error("Tried to dial self")]
TriedToDialSelf,
/// Cannot dial an already connected peer.
#[error("Already connected to peer")]
AlreadyConnected,
/// Cannot dial a peer that does not have any address available.
#[error("No address available for peer")]
NoAddressAvailable,
/// The essential task was closed.
#[error("TaskClosed")]
TaskClosed,
/// The channel is clogged.
#[error("Connection channel clogged")]
ChannelClogged,
}

/// Error during the QUIC transport negotiation.
#[cfg(feature = "quic")]
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum QuicError {
/// The provided certificate is invalid.
#[error("Invalid certificate")]
Expand All @@ -285,7 +336,7 @@ pub enum QuicError {
}

/// Error during DNS resolution.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DnsError {
/// The DNS resolution failed to resolve the provided URL.
#[error("DNS failed to resolve url `{0}`")]
Expand All @@ -309,6 +360,12 @@ impl From<io::Error> for Error {
}
}

impl From<io::Error> for SubstreamError {
fn from(error: io::Error) -> SubstreamError {
SubstreamError::IoError(error.kind())
}
}

impl From<io::Error> for DialError {
fn from(error: io::Error) -> Self {
DialError::NegotiationError(NegotiationError::IoError(error.kind()))
Expand Down
50 changes: 32 additions & 18 deletions src/mock/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::error::Error;
use crate::error::SubstreamError;

use bytes::{Bytes, BytesMut};
use futures::{Sink, Stream};
Expand All @@ -31,15 +31,20 @@ use std::{

/// Trait which describes the behavior of a mock substream.
pub trait Substream:
Debug + Stream<Item = crate::Result<BytesMut>> + Sink<Bytes, Error = Error> + Send + Unpin + 'static
Debug
+ Stream<Item = Result<BytesMut, SubstreamError>>
+ Sink<Bytes, Error = SubstreamError>
+ Send
+ Unpin
+ 'static
{
}

/// Blanket implementation for [`Substream`].
impl<
T: Debug
+ Stream<Item = crate::Result<BytesMut>>
+ Sink<Bytes, Error = Error>
+ Stream<Item = Result<BytesMut, SubstreamError>>
+ Sink<Bytes, Error = SubstreamError>
+ Send
+ Unpin
+ 'static,
Expand All @@ -52,33 +57,33 @@ mockall::mock! {
pub Substream {}

impl Sink<bytes::Bytes> for Substream {
type Error = Error;
type Error = SubstreamError;

fn poll_ready<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;

fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), Error>;
fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), SubstreamError>;

fn poll_flush<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;

fn poll_close<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;
}

impl Stream for Substream {
type Item = crate::Result<BytesMut>;
type Item = Result<BytesMut, SubstreamError>;

fn poll_next<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Option<crate::Result<BytesMut>>>;
) -> Poll<Option<Result<BytesMut, SubstreamError>>>;
}
}

Expand All @@ -95,32 +100,41 @@ impl DummySubstream {
}

impl Sink<bytes::Bytes> for DummySubstream {
type Error = Error;
type Error = SubstreamError;

fn poll_ready<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_ready<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), Error> {
fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), SubstreamError> {
Ok(())
}

fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_flush<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_close<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Ready(Ok(()))
}
}

impl Stream for DummySubstream {
type Item = crate::Result<BytesMut>;
type Item = Result<BytesMut, SubstreamError>;

fn poll_next<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Option<crate::Result<BytesMut>>> {
) -> Poll<Option<Result<BytesMut, SubstreamError>>> {
Poll::Pending
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/multistream_select/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ where
}

/// Error that can happen when negotiating a protocol with the remote.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum NegotiationError {
/// A protocol error occurred during the negotiation.
#[error("A protocol error occurred during the negotiation: `{0:?}`")]
Expand Down
9 changes: 9 additions & 0 deletions src/multistream_select/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,15 @@ pub enum ProtocolError {
ProtocolNotSupported,
}

impl PartialEq for ProtocolError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ProtocolError::IoError(lhs), ProtocolError::IoError(rhs)) => lhs.kind() == rhs.kind(),
_ => std::mem::discriminant(self) == std::mem::discriminant(other),
}
}
}

impl From<ProtocolError> for io::Error {
fn from(err: ProtocolError) -> Self {
if let ProtocolError::IoError(e) = err {
Expand Down
12 changes: 6 additions & 6 deletions src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! Connection-related helper code.
use crate::{
error::Error,
error::{Error, SubstreamError},
protocol::protocol_set::ProtocolCommand,
types::{protocol::ProtocolName, ConnectionId, SubstreamId},
};
Expand Down Expand Up @@ -110,11 +110,11 @@ impl ConnectionHandle {
fallback_names: Vec<ProtocolName>,
substream_id: SubstreamId,
permit: Permit,
) -> crate::Result<()> {
) -> Result<(), SubstreamError> {
match &self.connection {
ConnectionType::Active(active) => active.clone(),
ConnectionType::Inactive(inactive) =>
inactive.upgrade().ok_or(Error::ConnectionClosed)?,
inactive.upgrade().ok_or(SubstreamError::ConnectionClosed)?,
}
.try_send(ProtocolCommand::OpenSubstream {
protocol: protocol.clone(),
Expand All @@ -123,8 +123,8 @@ impl ConnectionHandle {
permit,
})
.map_err(|error| match error {
TrySendError::Full(_) => Error::ChannelClogged,
TrySendError::Closed(_) => Error::ConnectionClosed,
TrySendError::Full(_) => SubstreamError::ChannelClogged,
TrySendError::Closed(_) => SubstreamError::ConnectionClosed,
})
}

Expand Down Expand Up @@ -236,7 +236,7 @@ mod tests {
SubstreamId::new(),
permit,
) {
Err(Error::ChannelClogged) => {}
Err(SubstreamError::ChannelClogged) => {}
error => panic!("invalid error: {error:?}"),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl Identify {
return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
substream_id,
)))),
Ok(Some(Err(error))) => return Err(error),
Ok(Some(Err(error))) => return Err(error.into()),
Ok(Some(Ok(payload))) => payload,
};

Expand Down
18 changes: 8 additions & 10 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,9 @@ mod tests {
let mut executor = QueryExecutor::new();
let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
substream.expect_poll_next().times(1).return_once(|_| {
Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed)))
});

executor.read_message(
peer,
Expand Down Expand Up @@ -294,10 +293,9 @@ mod tests {
substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(())));
substream.expect_start_send().times(1).return_once(|_| Ok(()));
substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(())));
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
substream.expect_poll_next().times(1).return_once(|_| {
Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed)))
});

executor.send_request_read_response(
peer,
Expand Down Expand Up @@ -330,7 +328,7 @@ mod tests {
substream
.expect_poll_ready()
.times(1)
.return_once(|_| Poll::Ready(Err(crate::Error::Unknown)));
.return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed)));
substream.expect_poll_close().times(1).return_once(|_| Poll::Ready(Ok(())));

executor.send_request_read_response(
Expand Down Expand Up @@ -393,7 +391,7 @@ mod tests {
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
.return_once(|_| Poll::Ready(Some(Err(crate::error::SubstreamError::ChannelClogged))));

executor.read_message(
peer,
Expand Down
Loading

0 comments on commit 69d7ad7

Please sign in to comment.