From 875f4ef65f0b6f68add937576e9e0687bad2ddd2 Mon Sep 17 00:00:00 2001 From: Anthony Ramine Date: Mon, 30 Aug 2021 10:01:06 +0200 Subject: [PATCH] Refactor errors (fixes #530) h2::Error now knows whether protocol errors happened because the user sent them, because it was received from the remote peer, or because the library itself emitted an error because it detected a protocol violation. It also keeps track of whether it came from a RST_STREAM or GO_AWAY frame, and in the case of the latter, it includes the additional debug data if any. --- src/client.rs | 11 +-- src/codec/error.rs | 49 +--------- src/codec/framed_read.rs | 61 +++++------- src/codec/mod.rs | 5 +- src/error.rs | 70 ++++++++++---- src/frame/go_away.rs | 3 +- src/frame/reset.rs | 2 +- src/lib.rs | 8 +- src/proto/connection.rs | 109 +++++++++++----------- src/proto/error.rs | 90 ++++++++++++------ src/proto/go_away.rs | 16 +++- src/proto/mod.rs | 2 +- src/proto/peer.rs | 13 ++- src/proto/settings.rs | 8 +- src/proto/streams/prioritize.rs | 5 +- src/proto/streams/recv.rs | 99 ++++++++------------ src/proto/streams/send.rs | 37 +++++--- src/proto/streams/state.rs | 103 ++++++++++----------- src/proto/streams/streams.rs | 123 +++++++++++++------------ src/server.rs | 20 ++-- tests/h2-support/src/mock.rs | 5 +- tests/h2-tests/tests/client_request.rs | 8 +- tests/h2-tests/tests/codec_read.rs | 2 +- tests/h2-tests/tests/flow_control.rs | 8 +- tests/h2-tests/tests/push_promise.rs | 26 ++++-- tests/h2-tests/tests/stream_states.rs | 15 ++- 26 files changed, 465 insertions(+), 433 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5bbbaf499..9c211ab32 100644 --- a/src/client.rs +++ b/src/client.rs @@ -135,9 +135,9 @@ //! [`Builder`]: struct.Builder.html //! [`Error`]: ../struct.Error.html -use crate::codec::{Codec, RecvError, SendError, UserError}; +use crate::codec::{Codec, SendError, UserError}; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; -use crate::proto; +use crate::proto::{self, Error}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; use bytes::{Buf, Bytes}; @@ -1493,7 +1493,7 @@ impl proto::Peer for Peer { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result { + ) -> Result { let mut b = Response::builder(); b = b.version(Version::HTTP_2); @@ -1507,10 +1507,7 @@ impl proto::Peer for Peer { Err(_) => { // TODO: Should there be more specialized handling for different // kinds of errors - return Err(RecvError::Stream { - id: stream_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); } }; diff --git a/src/codec/error.rs b/src/codec/error.rs index f505eb0f5..0acb913e5 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -1,26 +1,12 @@ -use crate::frame::{Reason, StreamId}; +use crate::proto::Error; use std::{error, fmt, io}; -/// Errors that are received -#[derive(Debug)] -pub enum RecvError { - Connection(Reason), - Stream { id: StreamId, reason: Reason }, - Io(io::Error), -} - /// Errors caused by sending a message #[derive(Debug)] pub enum SendError { - /// User error + Connection(Error), User(UserError), - - /// Connection error prevents sending. - Connection(Reason), - - /// I/O error - Io(io::Error), } /// Errors caused by users of the library @@ -65,47 +51,22 @@ pub enum UserError { PeerDisabledServerPush, } -// ===== impl RecvError ===== - -impl From for RecvError { - fn from(src: io::Error) -> Self { - RecvError::Io(src) - } -} - -impl error::Error for RecvError {} - -impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::RecvError::*; - - match *self { - Connection(ref reason) => reason.fmt(fmt), - Stream { ref reason, .. } => reason.fmt(fmt), - Io(ref e) => e.fmt(fmt), - } - } -} - // ===== impl SendError ===== impl error::Error for SendError {} impl fmt::Display for SendError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::SendError::*; - match *self { - User(ref e) => e.fmt(fmt), - Connection(ref reason) => reason.fmt(fmt), - Io(ref e) => e.fmt(fmt), + Self::Connection(ref e) => e.fmt(fmt), + Self::User(ref e) => e.fmt(fmt), } } } impl From for SendError { fn from(src: io::Error) -> Self { - SendError::Io(src) + Self::Connection(src.into()) } } diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 9673c49a8..7c3bbb3ba 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -1,8 +1,8 @@ -use crate::codec::RecvError; use crate::frame::{self, Frame, Kind, Reason}; use crate::frame::{ DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE, }; +use crate::proto::Error; use crate::hpack; @@ -98,8 +98,7 @@ fn decode_frame( max_header_list_size: usize, partial_inout: &mut Option, mut bytes: BytesMut, -) -> Result, RecvError> { - use self::RecvError::*; +) -> Result, Error> { let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len()); let _e = span.enter(); @@ -110,7 +109,7 @@ fn decode_frame( if partial_inout.is_some() && head.kind() != Kind::Continuation { proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind()); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } let kind = head.kind(); @@ -131,14 +130,11 @@ fn decode_frame( // A stream cannot depend on itself. An endpoint MUST // treat this as a stream error (Section 5.4.2) of type // `PROTOCOL_ERROR`. - return Err(Stream { - id: $head.stream_id(), - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset($head.stream_id(), Reason::PROTOCOL_ERROR)); }, Err(e) => { proto_err!(conn: "failed to load frame; err={:?}", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } }; @@ -151,14 +147,11 @@ fn decode_frame( Err(frame::Error::MalformedMessage) => { let id = $head.stream_id(); proto_err!(stream: "malformed header block; stream={:?}", id); - return Err(Stream { - id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR)); }, Err(e) => { proto_err!(conn: "failed HPACK decoding; err={:?}", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } } @@ -183,7 +176,7 @@ fn decode_frame( res.map_err(|e| { proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -192,7 +185,7 @@ fn decode_frame( res.map_err(|e| { proto_err!(conn: "failed to load PING frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -201,7 +194,7 @@ fn decode_frame( res.map_err(|e| { proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -212,7 +205,7 @@ fn decode_frame( // TODO: Should this always be connection level? Probably not... res.map_err(|e| { proto_err!(conn: "failed to load DATA frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -221,7 +214,7 @@ fn decode_frame( let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load RESET frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -229,7 +222,7 @@ fn decode_frame( let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -238,7 +231,7 @@ fn decode_frame( if head.stream_id() == 0 { // Invalid stream identifier proto_err!(conn: "invalid stream ID 0"); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) { @@ -249,14 +242,11 @@ fn decode_frame( // `PROTOCOL_ERROR`. let id = head.stream_id(); proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id); - return Err(Stream { - id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR)); } Err(e) => { proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } } } @@ -267,14 +257,14 @@ fn decode_frame( Some(partial) => partial, None => { proto_err!(conn: "received unexpected CONTINUATION frame"); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } }; // The stream identifiers must match if partial.frame.stream_id() != head.stream_id() { proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID"); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } // Extend the buf @@ -297,7 +287,7 @@ fn decode_frame( // the attacker to go away. if partial.buf.len() + bytes.len() > max_header_list_size { proto_err!(conn: "CONTINUATION frame header block size over ignorable limit"); - return Err(Connection(Reason::COMPRESSION_ERROR)); + return Err(Error::library_go_away(Reason::COMPRESSION_ERROR).into()); } } partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]); @@ -312,14 +302,11 @@ fn decode_frame( Err(frame::Error::MalformedMessage) => { let id = head.stream_id(); proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id); - return Err(Stream { - id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR)); } Err(e) => { proto_err!(conn: "failed HPACK decoding; err={:?}", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } } @@ -343,7 +330,7 @@ impl Stream for FramedRead where T: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let span = tracing::trace_span!("FramedRead::poll_next"); @@ -371,11 +358,11 @@ where } } -fn map_err(err: io::Error) -> RecvError { +fn map_err(err: io::Error) -> Error { if let io::ErrorKind::InvalidData = err.kind() { if let Some(custom) = err.get_ref() { if custom.is::() { - return RecvError::Connection(Reason::FRAME_SIZE_ERROR); + return Error::library_go_away(Reason::FRAME_SIZE_ERROR); } } } diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 7d0ab73d8..359adf6e4 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -2,12 +2,13 @@ mod error; mod framed_read; mod framed_write; -pub use self::error::{RecvError, SendError, UserError}; +pub use self::error::{SendError, UserError}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use crate::frame::{self, Data, Frame}; +use crate::proto::Error; use bytes::Buf; use futures_core::Stream; @@ -155,7 +156,7 @@ impl Stream for Codec where T: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_next(cx) diff --git a/src/error.rs b/src/error.rs index 372bac2ee..0421f4030 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,8 @@ use crate::codec::{SendError, UserError}; -use crate::proto; +use crate::frame::StreamId; +use crate::proto::{self, Initiator}; +use bytes::Bytes; use std::{error, fmt, io}; pub use crate::frame::Reason; @@ -22,11 +24,14 @@ pub struct Error { #[derive(Debug)] enum Kind { - /// An error caused by an action taken by the remote peer. - /// - /// This is either an error received by the peer or caused by an invalid - /// action taken by the peer (i.e. a protocol error). - Proto(Reason), + /// A RST_STREAM frame was received or sent. + Reset(StreamId, Reason, Initiator), + + /// A GO_AWAY frame was received or sent. + GoAway(Bytes, Reason, Initiator), + + /// The user created an error from a bare Reason. + Reason(Reason), /// An error resulting from an invalid action taken by the user of this /// library. @@ -45,7 +50,7 @@ impl Error { /// action taken by the peer (i.e. a protocol error). pub fn reason(&self) -> Option { match self.kind { - Kind::Proto(reason) => Some(reason), + Kind::Reset(_, reason, _) | Kind::GoAway(_, reason, _) => Some(reason), _ => None, } } @@ -87,8 +92,13 @@ impl From for Error { Error { kind: match src { - Proto(reason) => Kind::Proto(reason), - Io(e) => Kind::Io(e), + Reset(stream_id, reason, initiator) => Kind::Reset(stream_id, reason, initiator), + GoAway(debug_data, reason, initiator) => { + Kind::GoAway(debug_data, reason, initiator) + } + Io(kind, inner) => { + Kind::Io(inner.map_or_else(|| kind.into(), |inner| io::Error::new(kind, inner))) + } }, } } @@ -97,7 +107,7 @@ impl From for Error { impl From for Error { fn from(src: Reason) -> Error { Error { - kind: Kind::Proto(src), + kind: Kind::Reason(src), } } } @@ -106,8 +116,7 @@ impl From for Error { fn from(src: SendError) -> Error { match src { SendError::User(e) => e.into(), - SendError::Connection(reason) => reason.into(), - SendError::Io(e) => Error::from_io(e), + SendError::Connection(e) => e.into(), } } } @@ -122,13 +131,38 @@ impl From for Error { impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::Kind::*; - - match self.kind { - Proto(ref reason) => write!(fmt, "protocol error: {}", reason), - User(ref e) => write!(fmt, "user error: {}", e), - Io(ref e) => fmt::Display::fmt(e, fmt), + let debug_data = match self.kind { + Kind::Reset(_, reason, Initiator::User) => { + return write!(fmt, "stream error sent by user: {}", reason) + } + Kind::Reset(_, reason, Initiator::Library) => { + return write!(fmt, "stream error detected: {}", reason) + } + Kind::Reset(_, reason, Initiator::Remote) => { + return write!(fmt, "stream error received: {}", reason) + } + Kind::GoAway(ref debug_data, reason, Initiator::User) => { + write!(fmt, "connection error sent by user: {}", reason)?; + debug_data + } + Kind::GoAway(ref debug_data, reason, Initiator::Library) => { + write!(fmt, "connection error detected: {}", reason)?; + debug_data + } + Kind::GoAway(ref debug_data, reason, Initiator::Remote) => { + write!(fmt, "connection error received: {}", reason)?; + debug_data + } + Kind::Reason(reason) => return write!(fmt, "protocol error: {}", reason), + Kind::User(ref e) => return write!(fmt, "user error: {}", e), + Kind::Io(ref e) => return e.fmt(fmt), + }; + + if !debug_data.is_empty() { + write!(fmt, " ({:?})", debug_data)?; } + + Ok(()) } } diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 52dd91d4c..91d9c4c6b 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -29,8 +29,7 @@ impl GoAway { self.error_code } - #[cfg(feature = "unstable")] - pub fn debug_data(&self) -> &[u8] { + pub fn debug_data(&self) -> &Bytes { &self.debug_data } diff --git a/src/frame/reset.rs b/src/frame/reset.rs index b2613028d..39f6ac202 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -2,7 +2,7 @@ use crate::frame::{self, Error, Head, Kind, Reason, StreamId}; use bytes::BufMut; -#[derive(Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Reset { stream_id: StreamId, error_code: Reason, diff --git a/src/lib.rs b/src/lib.rs index 951cd96b3..381a62a46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,8 +104,14 @@ macro_rules! ready { mod codec; mod error; mod hpack; + +#[cfg(not(feature = "unstable"))] mod proto; +#[cfg(feature = "unstable")] +#[allow(missing_docs)] +pub mod proto; + #[cfg(not(feature = "unstable"))] mod frame; @@ -125,7 +131,7 @@ pub use crate::error::{Error, Reason}; pub use crate::share::{FlowControl, Ping, PingPong, Pong, RecvStream, SendStream, StreamId}; #[cfg(feature = "unstable")] -pub use codec::{Codec, RecvError, SendError, UserError}; +pub use codec::{Codec, SendError, UserError}; use std::task::Poll; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b44fdcd5c..a75df4369 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,6 +1,6 @@ -use crate::codec::{RecvError, UserError}; +use crate::codec::UserError; use crate::frame::{Reason, StreamId}; -use crate::{client, frame, proto, server}; +use crate::{client, frame, server}; use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; use crate::proto::*; @@ -40,7 +40,7 @@ where /// /// This exists separately from State in order to support /// graceful shutdown. - error: Option, + error: Option, /// Pending GOAWAY frames to write. go_away: GoAway, @@ -68,7 +68,7 @@ struct DynConnection<'a, B: Buf = Bytes> { streams: DynStreams<'a, B>, - error: &'a mut Option, + error: &'a mut Option, ping_pong: &'a mut PingPong, } @@ -88,10 +88,10 @@ enum State { Open, /// The codec must be flushed - Closing(Reason), + Closing(Reason, Initiator), /// In a closed state - Closed(Reason), + Closed(Reason, Initiator), } impl Connection @@ -161,9 +161,9 @@ where /// Returns `Ready` when the connection is ready to receive a frame. /// - /// Returns `RecvError` as this may raise errors that are caused by delayed + /// Returns `Error` as this may raise errors that are caused by delayed /// processing of received frames. - fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { let _e = self.inner.span.enter(); let span = tracing::trace_span!("poll_ready"); let _e = span.enter(); @@ -191,26 +191,24 @@ where self.inner.as_dyn().go_away_from_user(e) } - fn take_error(&mut self, ours: Reason) -> Poll> { - let reason = if let Some(theirs) = self.inner.error.take() { - match (ours, theirs) { - // If either side reported an error, return that - // to the user. - (Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err, - // If both sides reported an error, give their - // error back to th user. We assume our error - // was a consequence of their error, and less - // important. - (_, theirs) => theirs, - } - } else { - ours - }; - - if reason == Reason::NO_ERROR { - Poll::Ready(Ok(())) - } else { - Poll::Ready(Err(proto::Error::Proto(reason))) + fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> { + let (debug_data, theirs) = self + .inner + .error + .take() + .as_ref() + .map_or((Bytes::new(), Reason::NO_ERROR), |frame| { + (frame.debug_data().clone(), frame.reason()) + }); + + match (ours, theirs) { + (Reason::NO_ERROR, Reason::NO_ERROR) => return Ok(()), + (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)), + // If both sides reported an error, give their + // error back to th user. We assume our error + // was a consequence of their error, and less + // important. + (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)), } } @@ -229,7 +227,7 @@ where } /// Advances the internal state of the connection. - pub fn poll(&mut self, cx: &mut Context) -> Poll> { + pub fn poll(&mut self, cx: &mut Context) -> Poll> { // XXX(eliza): cloning the span is unfortunately necessary here in // order to placate the borrow checker — `self` is mutably borrowed by // `poll2`, which means that we can't borrow `self.span` to enter it. @@ -268,20 +266,22 @@ where self.inner.as_dyn().handle_poll2_result(result)? } - State::Closing(reason) => { + State::Closing(reason, initiator) => { tracing::trace!("connection closing after flush"); // Flush/shutdown the codec ready!(self.codec.shutdown(cx))?; // Transition the state to error - self.inner.state = State::Closed(reason); + self.inner.state = State::Closed(reason, initiator); + } + State::Closed(reason, initiator) => { + return Poll::Ready(self.take_error(reason, initiator)); } - State::Closed(reason) => return self.take_error(reason), } } } - fn poll2(&mut self, cx: &mut Context) -> Poll> { + fn poll2(&mut self, cx: &mut Context) -> Poll> { // This happens outside of the loop to prevent needing to do a clock // check and then comparison of the queue possibly multiple times a // second (and thus, the clock wouldn't have changed enough to matter). @@ -300,7 +300,7 @@ where // the same error back to the user. return Poll::Ready(Ok(())); } else { - return Poll::Ready(Err(RecvError::Connection(reason))); + return Poll::Ready(Err(Error::library_go_away(reason))); } } // Only NO_ERROR should be waiting for idle @@ -384,42 +384,45 @@ where self.go_away.go_away_from_user(frame); // Notify all streams of reason we're abruptly closing. - self.streams.recv_err(&proto::Error::Proto(e)); + self.streams.handle_error(Error::user_go_away(e)); } - fn handle_poll2_result(&mut self, result: Result<(), RecvError>) -> Result<(), Error> { - use crate::codec::RecvError::*; + fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> { match result { // The connection has shutdown normally Ok(()) => { - *self.state = State::Closing(Reason::NO_ERROR); + *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library); Ok(()) } // Attempting to read a frame resulted in a connection level // error. This is handled by setting a GOAWAY frame followed by // terminating the connection. - Err(Connection(e)) => { + Err(Error::GoAway(debug_data, reason, initiator)) => { + let e = Error::GoAway(debug_data, reason, initiator); tracing::debug!(error = ?e, "Connection::poll; connection error"); // We may have already sent a GOAWAY for this error, // if so, don't send another, just flush and close up. - if let Some(reason) = self.go_away.going_away_reason() { - if reason == e { - tracing::trace!(" -> already going away"); - *self.state = State::Closing(e); - return Ok(()); - } + if self + .go_away + .going_away() + .map_or(false, |frame| frame.reason() == reason) + { + tracing::trace!(" -> already going away"); + *self.state = State::Closing(reason, initiator); + return Ok(()); } // Reset all active streams - self.streams.recv_err(&e.into()); - self.go_away_now(e); + self.streams.handle_error(e); + self.go_away_now(reason); Ok(()) } // Attempting to read a frame resulted in a stream level error. // This is handled by resetting the frame then trying to read // another frame. - Err(Stream { id, reason }) => { + Err(Error::Reset(id, reason, initiator)) => { + debug_assert_eq!(initiator, Initiator::Library); tracing::trace!(?id, ?reason, "stream error"); self.streams.send_reset(id, reason); Ok(()) @@ -428,12 +431,12 @@ where // active streams must be reset. // // TODO: Are I/O errors recoverable? - Err(Io(e)) => { + Err(Error::Io(e, inner)) => { tracing::debug!(error = ?e, "Connection::poll; IO error"); - let e = e.into(); + let e = Error::Io(e, inner); // Reset all active streams - self.streams.recv_err(&e); + self.streams.handle_error(e.clone()); // Return the error Err(e) @@ -441,7 +444,7 @@ where } } - fn recv_frame(&mut self, frame: Option) -> Result { + fn recv_frame(&mut self, frame: Option) -> Result { use crate::frame::Frame::*; match frame { Some(Headers(frame)) => { @@ -471,7 +474,7 @@ where // until they are all EOS. Once they are, State should // transition to GoAway. self.streams.recv_go_away(&frame)?; - *self.error = Some(frame.reason()); + *self.error = Some(frame); } Some(Ping(frame)) => { tracing::trace!(?frame, "recv PING"); diff --git a/src/proto/error.rs b/src/proto/error.rs index c3ee20d03..197237263 100644 --- a/src/proto/error.rs +++ b/src/proto/error.rs @@ -1,53 +1,87 @@ -use crate::codec::{RecvError, SendError}; -use crate::frame::Reason; +use crate::codec::SendError; +use crate::frame::{Reason, StreamId}; +use bytes::Bytes; +use std::fmt; use std::io; /// Either an H2 reason or an I/O error -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Error { - Proto(Reason), - Io(io::Error), + Reset(StreamId, Reason, Initiator), + GoAway(Bytes, Reason, Initiator), + Io(io::ErrorKind, Option), +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum Initiator { + User, + Library, + Remote, } impl Error { - /// Clone the error for internal purposes. - /// - /// `io::Error` is not `Clone`, so we only copy the `ErrorKind`. - pub(super) fn shallow_clone(&self) -> Error { + pub(crate) fn is_local(&self) -> bool { match *self { - Error::Proto(reason) => Error::Proto(reason), - Error::Io(ref io) => Error::Io(io::Error::from(io.kind())), + Self::Reset(_, _, initiator) | Self::GoAway(_, _, initiator) => initiator.is_local(), + Self::Io(..) => true, } } -} -impl From for Error { - fn from(src: Reason) -> Self { - Error::Proto(src) + pub(crate) fn user_go_away(reason: Reason) -> Self { + Self::GoAway(Bytes::new(), reason, Initiator::User) + } + + pub(crate) fn library_reset(stream_id: StreamId, reason: Reason) -> Self { + Self::Reset(stream_id, reason, Initiator::Library) + } + + pub(crate) fn library_go_away(reason: Reason) -> Self { + Self::GoAway(Bytes::new(), reason, Initiator::Library) + } + + pub(crate) fn remote_reset(stream_id: StreamId, reason: Reason) -> Self { + Self::Reset(stream_id, reason, Initiator::Remote) + } + + pub(crate) fn remote_go_away(debug_data: Bytes, reason: Reason) -> Self { + Self::GoAway(debug_data, reason, Initiator::Remote) } } -impl From for Error { - fn from(src: io::Error) -> Self { - Error::Io(src) +impl Initiator { + fn is_local(&self) -> bool { + match *self { + Self::User | Self::Library => true, + Self::Remote => false, + } } } -impl From for RecvError { - fn from(src: Error) -> RecvError { - match src { - Error::Proto(reason) => RecvError::Connection(reason), - Error::Io(e) => RecvError::Io(e), +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::Reset(_, reason, _) | Self::GoAway(_, reason, _) => reason.fmt(fmt), + Self::Io(_, Some(ref inner)) => inner.fmt(fmt), + Self::Io(kind, None) => io::Error::from(kind).fmt(fmt), } } } +impl From for Error { + fn from(src: io::ErrorKind) -> Self { + Error::Io(src.into(), None) + } +} + +impl From for Error { + fn from(src: io::Error) -> Self { + Error::Io(src.kind(), src.get_ref().map(|inner| inner.to_string())) + } +} + impl From for SendError { - fn from(src: Error) -> SendError { - match src { - Error::Proto(reason) => SendError::Connection(reason), - Error::Io(e) => SendError::Io(e), - } + fn from(src: Error) -> Self { + Self::Connection(src) } } diff --git a/src/proto/go_away.rs b/src/proto/go_away.rs index 91d37b642..759427878 100644 --- a/src/proto/go_away.rs +++ b/src/proto/go_away.rs @@ -31,7 +31,7 @@ pub(super) struct GoAway { /// well, and we wouldn't want to save that here to accidentally dump in logs, /// or waste struct space.) #[derive(Debug)] -struct GoingAway { +pub(crate) struct GoingAway { /// Stores the highest stream ID of a GOAWAY that has been sent. /// /// It's illegal to send a subsequent GOAWAY with a higher ID. @@ -98,9 +98,9 @@ impl GoAway { self.is_user_initiated } - /// Return the last Reason we've sent. - pub fn going_away_reason(&self) -> Option { - self.going_away.as_ref().map(|g| g.reason) + /// Returns the going away info, if any. + pub fn going_away(&self) -> Option<&GoingAway> { + self.going_away.as_ref() } /// Returns if the connection should close now, or wait until idle. @@ -141,7 +141,7 @@ impl GoAway { return Poll::Ready(Some(Ok(reason))); } else if self.should_close_now() { - return match self.going_away_reason() { + return match self.going_away().map(|going_away| going_away.reason) { Some(reason) => Poll::Ready(Some(Ok(reason))), None => Poll::Ready(None), }; @@ -150,3 +150,9 @@ impl GoAway { Poll::Ready(None) } } + +impl GoingAway { + pub(crate) fn reason(&self) -> Reason { + self.reason + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 84fd8542e..d505e77f3 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -7,7 +7,7 @@ mod settings; mod streams; pub(crate) use self::connection::{Config, Connection}; -pub(crate) use self::error::Error; +pub use self::error::{Error, Initiator}; pub(crate) use self::peer::{Dyn as DynPeer, Peer}; pub(crate) use self::ping_pong::UserPings; pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams}; diff --git a/src/proto/peer.rs b/src/proto/peer.rs index 3bcc77224..d62d9e24e 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -1,7 +1,6 @@ -use crate::codec::RecvError; use crate::error::Reason; use crate::frame::{Pseudo, StreamId}; -use crate::proto::Open; +use crate::proto::{Error, Open}; use http::{HeaderMap, Request, Response}; @@ -21,7 +20,7 @@ pub(crate) trait Peer { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result; + ) -> Result; fn is_local_init(id: StreamId) -> bool { assert!(!id.is_zero()); @@ -61,7 +60,7 @@ impl Dyn { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result { + ) -> Result { if self.is_server() { crate::server::Peer::convert_poll_message(pseudo, fields, stream_id) .map(PollMessage::Server) @@ -72,12 +71,12 @@ impl Dyn { } /// Returns true if the remote peer can initiate a stream with the given ID. - pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), RecvError> { + pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), Error> { if self.is_server() { // Ensure that the ID is a valid client initiated ID if mode.is_push_promise() || !id.is_client_initiated() { proto_err!(conn: "cannot open stream {:?} - not client initiated", id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } Ok(()) @@ -85,7 +84,7 @@ impl Dyn { // Ensure that the ID is a valid server initiated ID if !mode.is_push_promise() || !id.is_server_initiated() { proto_err!(conn: "cannot open stream {:?} - not server initiated", id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } Ok(()) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 453292324..44f4c2df4 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,4 +1,4 @@ -use crate::codec::{RecvError, UserError}; +use crate::codec::UserError; use crate::error::Reason; use crate::frame; use crate::proto::*; @@ -40,7 +40,7 @@ impl Settings { frame: frame::Settings, codec: &mut Codec, streams: &mut Streams, - ) -> Result<(), RecvError> + ) -> Result<(), Error> where T: AsyncWrite + Unpin, B: Buf, @@ -68,7 +68,7 @@ impl Settings { // We haven't sent any SETTINGS frames to be ACKed, so // this is very bizarre! Remote is either buggy or malicious. proto_err!(conn: "received unexpected settings ack"); - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } } else { @@ -97,7 +97,7 @@ impl Settings { cx: &mut Context, dst: &mut Codec, streams: &mut Streams, - ) -> Poll> + ) -> Poll> where T: AsyncWrite + Unpin, B: Buf, diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 77eb507db..9671d5898 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -791,7 +791,10 @@ impl Prioritize { }), None => { if let Some(reason) = stream.state.get_scheduled_reset() { - stream.state.set_reset(reason); + let stream_id = stream.id; + stream + .state + .set_reset(stream_id, reason, Initiator::Library); let frame = frame::Reset::new(stream.id, reason); Frame::Reset(frame) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 08a2fc336..be996b963 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,7 +1,7 @@ use super::*; -use crate::codec::{RecvError, UserError}; -use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; -use crate::{frame, proto}; +use crate::codec::UserError; +use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; +use crate::proto::{self, Error}; use std::task::Context; use http::{HeaderMap, Request, Response}; @@ -68,7 +68,7 @@ pub(super) enum Event { #[derive(Debug)] pub(super) enum RecvHeaderBlockError { Oversize(T), - State(RecvError), + State(Error), } #[derive(Debug)] @@ -124,7 +124,7 @@ impl Recv { id: StreamId, mode: Open, counts: &mut Counts, - ) -> Result, RecvError> { + ) -> Result, Error> { assert!(self.refused.is_none()); counts.peer().ensure_can_open(id, mode)?; @@ -132,7 +132,7 @@ impl Recv { let next_id = self.next_stream_id()?; if id < next_id { proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } self.next_stream_id = id.next_id(); @@ -176,11 +176,7 @@ impl Recv { Ok(v) => v, Err(()) => { proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - } - .into()); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); } }; @@ -312,16 +308,13 @@ impl Recv { &mut self, frame: frame::Headers, stream: &mut store::Ptr, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { // Transition the state stream.state.recv_close()?; if stream.ensure_content_length_zero().is_err() { proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } let trailers = frame.into_fields(); @@ -455,7 +448,7 @@ impl Recv { &mut self, settings: &frame::Settings, store: &mut Store, - ) -> Result<(), RecvError> { + ) -> Result<(), proto::Error> { let target = if let Some(val) = settings.initial_window_size() { val } else { @@ -502,7 +495,7 @@ impl Recv { stream .recv_flow .inc_window(inc) - .map_err(RecvError::Connection)?; + .map_err(proto::Error::library_go_away)?; stream.recv_flow.assign_capacity(inc); Ok(()) }) @@ -520,11 +513,7 @@ impl Recv { stream.pending_recv.is_empty() } - pub fn recv_data( - &mut self, - frame: frame::Data, - stream: &mut store::Ptr, - ) -> Result<(), RecvError> { + pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> { let sz = frame.payload().len(); // This should have been enforced at the codec::FramedRead layer, so @@ -542,7 +531,7 @@ impl Recv { // Receiving a DATA frame when not expecting one is a protocol // error. proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } tracing::trace!( @@ -557,7 +546,7 @@ impl Recv { "recv_data; frame ignored on locally reset {:?} for some time", stream.id, ); - return self.ignore_data(sz); + return Ok(self.ignore_data(sz)?); } // Ensure that there is enough capacity on the connection before acting @@ -573,10 +562,7 @@ impl Recv { // So, for violating the **stream** window, we can send either a // stream or connection error. We've opted to send a stream // error. - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::FLOW_CONTROL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR)); } if stream.dec_content_length(frame.payload().len()).is_err() { @@ -585,10 +571,7 @@ impl Recv { stream.id, frame.payload().len(), ); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } if frame.is_end_stream() { @@ -598,15 +581,12 @@ impl Recv { stream.id, frame.payload().len(), ); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } if stream.state.recv_close().is_err() { proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } } @@ -625,7 +605,7 @@ impl Recv { Ok(()) } - pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> { + pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> { // Ensure that there is enough capacity on the connection... self.consume_connection_window(sz)?; @@ -641,14 +621,14 @@ impl Recv { Ok(()) } - pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { + pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> { if self.flow.window_size() < sz { tracing::debug!( "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", self.flow.window_size(), sz, ); - return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR)); + return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR)); } // Update connection level flow control @@ -663,7 +643,7 @@ impl Recv { &mut self, frame: frame::PushPromise, stream: &mut store::Ptr, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { stream.state.reserve_remote()?; if frame.is_over_size() { // A frame is over size if the decoded header block was bigger than @@ -682,10 +662,10 @@ impl Recv { headers frame is over size; promised_id={:?};", frame.promised_id(), ); - return Err(RecvError::Stream { - id: frame.promised_id(), - reason: Reason::REFUSED_STREAM, - }); + return Err(Error::library_reset( + frame.promised_id(), + Reason::REFUSED_STREAM, + )); } let promised_id = frame.promised_id(); @@ -708,10 +688,7 @@ impl Recv { promised_id, ), } - return Err(RecvError::Stream { - id: promised_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR)); } use super::peer::PollMessage::*; @@ -741,18 +718,16 @@ impl Recv { /// Handle remote sending an explicit RST_STREAM. pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { // Notify the stream - stream - .state - .recv_reset(frame.reason(), stream.is_pending_send); + stream.state.recv_reset(frame, stream.is_pending_send); stream.notify_send(); stream.notify_recv(); } - /// Handle a received error - pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { + /// Handle a connection-level error + pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) { // Receive an error - stream.state.recv_err(err); + stream.state.handle_error(err); // If a receiver is waiting, notify it stream.notify_send(); @@ -783,11 +758,11 @@ impl Recv { self.max_stream_id } - pub fn next_stream_id(&self) -> Result { + pub fn next_stream_id(&self) -> Result { if let Ok(id) = self.next_stream_id { Ok(id) } else { - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } @@ -802,10 +777,10 @@ impl Recv { } /// Returns true if the remote peer can reserve a stream with the given ID. - pub fn ensure_can_reserve(&self) -> Result<(), RecvError> { + pub fn ensure_can_reserve(&self) -> Result<(), Error> { if !self.is_push_enabled { proto_err!(conn: "recv_push_promise: push is disabled"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } Ok(()) @@ -1092,8 +1067,8 @@ impl Open { // ===== impl RecvHeaderBlockError ===== -impl From for RecvHeaderBlockError { - fn from(err: RecvError) -> Self { +impl From for RecvHeaderBlockError { + fn from(err: Error) -> Self { RecvHeaderBlockError::State(err) } } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index e8804127b..d4d64cd80 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -2,8 +2,9 @@ use super::{ store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, StreamIdOverflow, WindowSize, }; -use crate::codec::{RecvError, UserError}; +use crate::codec::UserError; use crate::frame::{self, Reason}; +use crate::proto::{Error, Initiator}; use bytes::Buf; use http; @@ -161,6 +162,7 @@ impl Send { pub fn send_reset( &mut self, reason: Reason, + initiator: Initiator, buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, @@ -169,14 +171,16 @@ impl Send { let is_reset = stream.state.is_reset(); let is_closed = stream.state.is_closed(); let is_empty = stream.pending_send.is_empty(); + let stream_id = stream.id; tracing::trace!( - "send_reset(..., reason={:?}, stream={:?}, ..., \ + "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ state={:?} \ ", reason, - stream.id, + initiator, + stream_id, is_reset, is_closed, is_empty, @@ -187,13 +191,13 @@ impl Send { // Don't double reset tracing::trace!( " -> not sending RST_STREAM ({:?} is already reset)", - stream.id + stream_id ); return; } // Transition the state to reset no matter what. - stream.state.set_reset(reason); + stream.state.set_reset(stream_id, reason, initiator); // If closed AND the send queue is flushed, then the stream cannot be // reset explicitly, either. Implicit resets can still be queued. @@ -201,7 +205,7 @@ impl Send { tracing::trace!( " -> not sending explicit RST_STREAM ({:?} was closed \ and send queue was flushed)", - stream.id + stream_id ); return; } @@ -371,7 +375,14 @@ impl Send { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { tracing::debug!("recv_stream_window_update !!; err={:?}", e); - self.send_reset(Reason::FLOW_CONTROL_ERROR, buffer, stream, counts, task); + self.send_reset( + Reason::FLOW_CONTROL_ERROR, + Initiator::Library, + buffer, + stream, + counts, + task, + ); return Err(e); } @@ -379,7 +390,7 @@ impl Send { Ok(()) } - pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), RecvError> { + pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> { if last_stream_id > self.max_stream_id { // The remote endpoint sent a `GOAWAY` frame indicating a stream // that we never sent, or that we have already terminated on account @@ -392,14 +403,14 @@ impl Send { "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", last_stream_id, self.max_stream_id, ); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } self.max_stream_id = last_stream_id; Ok(()) } - pub fn recv_err( + pub fn handle_error( &mut self, buffer: &mut Buffer>, stream: &mut store::Ptr, @@ -417,7 +428,7 @@ impl Send { store: &mut Store, counts: &mut Counts, task: &mut Option, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { // Applies an update to the remote endpoint's initial window size. // // Per RFC 7540 §6.9.2: @@ -480,7 +491,7 @@ impl Send { // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. - Ok::<_, RecvError>(()) + Ok::<_, Error>(()) })?; self.prioritize @@ -490,7 +501,7 @@ impl Send { store.for_each(|mut stream| { self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) - .map_err(RecvError::Connection) + .map_err(Error::library_go_away) })?; } } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 3e739daf9..9931d41b1 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -1,9 +1,8 @@ use std::io; -use crate::codec::UserError::*; -use crate::codec::{RecvError, UserError}; -use crate::frame::{self, Reason}; -use crate::proto::{self, PollReset}; +use crate::codec::UserError; +use crate::frame::{self, Reason, StreamId}; +use crate::proto::{self, Error, Initiator, PollReset}; use self::Inner::*; use self::Peer::*; @@ -53,7 +52,7 @@ pub struct State { inner: Inner, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] enum Inner { Idle, // TODO: these states shouldn't count against concurrency limits: @@ -71,12 +70,10 @@ enum Peer { Streaming, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] enum Cause { EndStream, - Proto(Reason), - LocallyReset(Reason), - Io, + Error(Error), /// This indicates to the connection that a reset frame must be sent out /// once the send queue has been flushed. @@ -85,7 +82,7 @@ enum Cause { /// - User drops all references to a stream, so we want to CANCEL the it. /// - Header block size was too large, so we want to REFUSE, possibly /// after sending a 431 response frame. - Scheduled(Reason), + ScheduledLibraryReset(Reason), } impl State { @@ -123,7 +120,7 @@ impl State { } _ => { // All other transitions result in a protocol error - return Err(UnexpectedFrameType); + return Err(UserError::UnexpectedFrameType); } }; @@ -133,7 +130,7 @@ impl State { /// Opens the receive-half of the stream when a HEADERS frame is received. /// /// Returns true if this transitions the state to Open. - pub fn recv_open(&mut self, frame: &frame::Headers) -> Result { + pub fn recv_open(&mut self, frame: &frame::Headers) -> Result { let mut initial = false; let eos = frame.is_end_stream(); @@ -195,10 +192,10 @@ impl State { HalfClosedLocal(Streaming) } } - state => { + ref state => { // All other transitions result in a protocol error proto_err!(conn: "recv_open: in unexpected state {:?}", state); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } }; @@ -206,15 +203,15 @@ impl State { } /// Transition from Idle -> ReservedRemote - pub fn reserve_remote(&mut self) -> Result<(), RecvError> { + pub fn reserve_remote(&mut self) -> Result<(), Error> { match self.inner { Idle => { self.inner = ReservedRemote; Ok(()) } - state => { + ref state => { proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } } @@ -231,7 +228,7 @@ impl State { } /// Indicates that the remote side will not send more data to the local. - pub fn recv_close(&mut self) -> Result<(), RecvError> { + pub fn recv_close(&mut self) -> Result<(), Error> { match self.inner { Open { local, .. } => { // The remote side will continue to receive data. @@ -244,9 +241,9 @@ impl State { self.inner = Closed(Cause::EndStream); Ok(()) } - state => { + ref state => { proto_err!(conn: "recv_close: in unexpected state {:?}", state); - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } } @@ -254,9 +251,9 @@ impl State { /// The remote explicitly sent a RST_STREAM. /// /// # Arguments - /// - `reason`: the reason field of the received RST_STREAM frame. + /// - `frame`: the received RST_STREAM frame. /// - `queued`: true if this stream has frames in the pending send queue. - pub fn recv_reset(&mut self, reason: Reason, queued: bool) { + pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { match self.inner { // If the stream is already in a `Closed` state, do nothing, // provided that there are no frames still in the send queue. @@ -275,30 +272,28 @@ impl State { // In either of these cases, we want to overwrite the stream's // previous state with the received RST_STREAM, so that the queue // will be cleared by `Prioritize::pop_frame`. - state => { + ref state => { tracing::trace!( - "recv_reset; reason={:?}; state={:?}; queued={:?}", - reason, + "recv_reset; frame={:?}; state={:?}; queued={:?}", + frame, state, queued ); - self.inner = Closed(Cause::Proto(reason)); + self.inner = Closed(Cause::Error(Error::remote_reset( + frame.stream_id(), + frame.reason(), + ))); } } } - /// We noticed a protocol error. - pub fn recv_err(&mut self, err: &proto::Error) { - use crate::proto::Error::*; - + /// Handle a connection-level error. + pub fn handle_error(&mut self, err: &proto::Error) { match self.inner { Closed(..) => {} _ => { - tracing::trace!("recv_err; err={:?}", err); - self.inner = Closed(match *err { - Proto(reason) => Cause::LocallyReset(reason), - Io(..) => Cause::Io, - }); + tracing::trace!("handle_error; err={:?}", err); + self.inner = Closed(Cause::Error(err.clone())); } } } @@ -306,9 +301,9 @@ impl State { pub fn recv_eof(&mut self) { match self.inner { Closed(..) => {} - s => { - tracing::trace!("recv_eof; state={:?}", s); - self.inner = Closed(Cause::Io); + ref state => { + tracing::trace!("recv_eof; state={:?}", state); + self.inner = Closed(Cause::Error(io::ErrorKind::BrokenPipe.into())); } } } @@ -325,39 +320,39 @@ impl State { tracing::trace!("send_close: HalfClosedRemote => Closed"); self.inner = Closed(Cause::EndStream); } - state => panic!("send_close: unexpected state {:?}", state), + ref state => panic!("send_close: unexpected state {:?}", state), } } /// Set the stream state to reset locally. - pub fn set_reset(&mut self, reason: Reason) { - self.inner = Closed(Cause::LocallyReset(reason)); + pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { + self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); } /// Set the stream state to a scheduled reset. pub fn set_scheduled_reset(&mut self, reason: Reason) { debug_assert!(!self.is_closed()); - self.inner = Closed(Cause::Scheduled(reason)); + self.inner = Closed(Cause::ScheduledLibraryReset(reason)); } pub fn get_scheduled_reset(&self) -> Option { match self.inner { - Closed(Cause::Scheduled(reason)) => Some(reason), + Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), _ => None, } } pub fn is_scheduled_reset(&self) -> bool { match self.inner { - Closed(Cause::Scheduled(..)) => true, + Closed(Cause::ScheduledLibraryReset(..)) => true, _ => false, } } pub fn is_local_reset(&self) -> bool { match self.inner { - Closed(Cause::LocallyReset(_)) => true, - Closed(Cause::Scheduled(..)) => true, + Closed(Cause::Error(ref e)) => e.is_local(), + Closed(Cause::ScheduledLibraryReset(..)) => true, _ => false, } } @@ -436,10 +431,10 @@ impl State { pub fn ensure_recv_open(&self) -> Result { // TODO: Is this correct? match self.inner { - Closed(Cause::Proto(reason)) - | Closed(Cause::LocallyReset(reason)) - | Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)), - Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), + Closed(Cause::Error(ref e)) => Err(e.clone()), + Closed(Cause::ScheduledLibraryReset(reason)) => { + Err(proto::Error::library_go_away(reason)) + } Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), _ => Ok(true), } @@ -448,10 +443,10 @@ impl State { /// Returns a reason if the stream has been reset. pub(super) fn ensure_reason(&self, mode: PollReset) -> Result, crate::Error> { match self.inner { - Closed(Cause::Proto(reason)) - | Closed(Cause::LocallyReset(reason)) - | Closed(Cause::Scheduled(reason)) => Ok(Some(reason)), - Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()), + Closed(Cause::Error(Error::Reset(_, reason, _))) + | Closed(Cause::Error(Error::GoAway(_, reason, _))) + | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), + Closed(Cause::Error(ref e)) => Err(e.clone().into()), Open { local: Streaming, .. } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index e3e02c2fa..1281b11bd 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,9 +1,9 @@ use super::recv::RecvHeaderBlockError; use super::store::{self, Entry, Resolve, Store}; use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; -use crate::codec::{Codec, RecvError, SendError, UserError}; +use crate::codec::{Codec, SendError, UserError}; use crate::frame::{self, Frame, Reason}; -use crate::proto::{peer, Open, Peer, WindowSize}; +use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize}; use crate::{client, proto, server}; use bytes::{Buf, Bytes}; @@ -180,7 +180,7 @@ where me.poll_complete(&self.send_buffer, cx, dst) } - pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { + pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -198,7 +198,7 @@ where ) } - pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { + pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -297,30 +297,30 @@ where } impl DynStreams<'_, B> { - pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), RecvError> { + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_headers(self.peer, &self.send_buffer, frame) } - pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), RecvError> { + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_data(self.peer, &self.send_buffer, frame) } - pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), RecvError> { + pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_reset(&self.send_buffer, frame) } - /// Handle a received error and return the ID of the last processed stream. - pub fn recv_err(&mut self, err: &proto::Error) -> StreamId { + /// Notify all streams that a connection-level error happened. + pub fn handle_error(&mut self, err: proto::Error) -> StreamId { let mut me = self.inner.lock().unwrap(); - me.recv_err(&self.send_buffer, err) + me.handle_error(&self.send_buffer, err) } - pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), RecvError> { + pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_go_away(&self.send_buffer, frame) } @@ -329,12 +329,12 @@ impl DynStreams<'_, B> { self.inner.lock().unwrap().actions.recv.last_processed_id() } - pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> { + pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_window_update(&self.send_buffer, frame) } - pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), RecvError> { + pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_push_promise(&self.send_buffer, frame) } @@ -375,7 +375,7 @@ impl Inner { peer: peer::Dyn, send_buffer: &SendBuffer, frame: frame::Headers, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); // The GOAWAY process has begun. All streams with a greater ID than @@ -405,10 +405,7 @@ impl Inner { "recv_headers for old stream={:?}, sending STREAM_CLOSED", id, ); - return Err(RecvError::Stream { - id, - reason: Reason::STREAM_CLOSED, - }); + return Err(Error::library_reset(id, Reason::STREAM_CLOSED)); } } @@ -471,10 +468,7 @@ impl Inner { Ok(()) } else { - Err(RecvError::Stream { - id: stream.id, - reason: Reason::REFUSED_STREAM, - }) + Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM)) } }, Err(RecvHeaderBlockError::State(err)) => Err(err), @@ -484,10 +478,7 @@ impl Inner { // Receiving trailers that don't set EOS is a "malformed" // message. Malformed messages are a stream error. proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } actions.recv.recv_trailers(frame, stream) @@ -502,7 +493,7 @@ impl Inner { peer: peer::Dyn, send_buffer: &SendBuffer, frame: frame::Data, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); let stream = match self.store.find_mut(&id) { @@ -529,14 +520,11 @@ impl Inner { let sz = sz as WindowSize; self.actions.recv.ignore_data(sz)?; - return Err(RecvError::Stream { - id, - reason: Reason::STREAM_CLOSED, - }); + return Err(Error::library_reset(id, Reason::STREAM_CLOSED)); } proto_err!(conn: "recv_data: stream not found; id={:?}", id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } }; @@ -551,7 +539,7 @@ impl Inner { // Any stream error after receiving a DATA frame means // we won't give the data to the user, and so they can't // release the capacity. We do it automatically. - if let Err(RecvError::Stream { .. }) = res { + if let Err(Error::Reset(..)) = res { actions .recv .release_connection_capacity(sz as WindowSize, &mut None); @@ -564,12 +552,12 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: frame::Reset, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); if id.is_zero() { proto_err!(conn: "recv_reset: invalid stream ID 0"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } // The GOAWAY process has begun. All streams with a greater ID than @@ -589,7 +577,7 @@ impl Inner { // TODO: Are there other error cases? self.actions .ensure_not_idle(self.counts.peer(), id) - .map_err(RecvError::Connection)?; + .map_err(Error::library_go_away)?; return Ok(()); } @@ -602,7 +590,7 @@ impl Inner { self.counts.transition(stream, |counts, stream| { actions.recv.recv_reset(frame, stream); - actions.send.recv_err(send_buffer, stream, counts); + actions.send.handle_error(send_buffer, stream, counts); assert!(stream.state.is_closed()); Ok(()) }) @@ -612,7 +600,7 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: frame::WindowUpdate, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); let mut send_buffer = send_buffer.inner.lock().unwrap(); @@ -622,7 +610,7 @@ impl Inner { self.actions .send .recv_connection_window_update(frame, &mut self.store, &mut self.counts) - .map_err(RecvError::Connection)?; + .map_err(Error::library_go_away)?; } else { // The remote may send window updates for streams that the local now // considers closed. It's ok... @@ -640,14 +628,14 @@ impl Inner { } else { self.actions .ensure_not_idle(self.counts.peer(), id) - .map_err(RecvError::Connection)?; + .map_err(Error::library_go_away)?; } } Ok(()) } - fn recv_err(&mut self, send_buffer: &SendBuffer, err: &proto::Error) -> StreamId { + fn handle_error(&mut self, send_buffer: &SendBuffer, err: proto::Error) -> StreamId { let actions = &mut self.actions; let counts = &mut self.counts; let mut send_buffer = send_buffer.inner.lock().unwrap(); @@ -658,14 +646,14 @@ impl Inner { self.store .for_each(|stream| { counts.transition(stream, |counts, stream| { - actions.recv.recv_err(err, &mut *stream); - actions.send.recv_err(send_buffer, stream, counts); + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); Ok::<_, ()>(()) }) }) .unwrap(); - actions.conn_error = Some(err.shallow_clone()); + actions.conn_error = Some(err); last_processed_id } @@ -674,7 +662,7 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: &frame::GoAway, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let actions = &mut self.actions; let counts = &mut self.counts; let mut send_buffer = send_buffer.inner.lock().unwrap(); @@ -684,14 +672,14 @@ impl Inner { actions.send.recv_go_away(last_stream_id)?; - let err = frame.reason().into(); + let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason()); self.store .for_each(|stream| { if stream.id > last_stream_id { counts.transition(stream, |counts, stream| { - actions.recv.recv_err(&err, &mut *stream); - actions.send.recv_err(send_buffer, stream, counts); + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); Ok::<_, ()>(()) }) } else { @@ -709,7 +697,7 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: frame::PushPromise, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); let promised_id = frame.promised_id(); @@ -733,7 +721,7 @@ impl Inner { } None => { proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } }; @@ -826,7 +814,7 @@ impl Inner { // This handles resetting send state associated with the // stream - actions.send.recv_err(send_buffer, stream, counts); + actions.send.handle_error(send_buffer, stream, counts); Ok::<_, ()>(()) }) }) @@ -886,8 +874,13 @@ impl Inner { let stream = self.store.resolve(key); let mut send_buffer = send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - self.actions - .send_reset(stream, reason, &mut self.counts, send_buffer); + self.actions.send_reset( + stream, + reason, + Initiator::Library, + &mut self.counts, + send_buffer, + ); } } @@ -1060,7 +1053,7 @@ impl StreamRef { let send_buffer = &mut *send_buffer; me.actions - .send_reset(stream, reason, &mut me.counts, send_buffer); + .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer); } pub fn send_response( @@ -1468,12 +1461,19 @@ impl Actions { &mut self, stream: store::Ptr, reason: Reason, + initiator: Initiator, counts: &mut Counts, send_buffer: &mut Buffer>, ) { counts.transition(stream, |counts, stream| { - self.send - .send_reset(reason, send_buffer, stream, counts, &mut self.task); + self.send.send_reset( + reason, + initiator, + send_buffer, + stream, + counts, + &mut self.task, + ); self.recv.enqueue_reset_expiration(stream, counts); // if a RecvStream is parked, ensure it's notified stream.notify_recv(); @@ -1485,12 +1485,13 @@ impl Actions { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - res: Result<(), RecvError>, - ) -> Result<(), RecvError> { - if let Err(RecvError::Stream { reason, .. }) = res { + res: Result<(), Error>, + ) -> Result<(), Error> { + if let Err(Error::Reset(stream_id, reason, initiator)) = res { + debug_assert_eq!(stream_id, stream.id); // Reset the stream. self.send - .send_reset(reason, buffer, stream, counts, &mut self.task); + .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); Ok(()) } else { res @@ -1507,7 +1508,7 @@ impl Actions { fn ensure_no_conn_error(&self) -> Result<(), proto::Error> { if let Some(ref err) = self.conn_error { - Err(err.shallow_clone()) + Err(err.clone()) } else { Ok(()) } diff --git a/src/server.rs b/src/server.rs index f71315363..6662712db 100644 --- a/src/server.rs +++ b/src/server.rs @@ -115,9 +115,9 @@ //! [`SendStream`]: ../struct.SendStream.html //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html -use crate::codec::{Codec, RecvError, UserError}; +use crate::codec::{Codec, UserError}; use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId}; -use crate::proto::{self, Config, Prioritized}; +use crate::proto::{self, Config, Error, Prioritized}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; use bytes::{Buf, Bytes}; @@ -1202,7 +1202,7 @@ where if &PREFACE[self.pos..self.pos + n] != buf.filled() { proto_err!(conn: "read_preface: invalid preface"); // TODO: Should this just write the GO_AWAY frame directly? - return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into())); + return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into())); } self.pos += n; @@ -1388,7 +1388,7 @@ impl proto::Peer for Peer { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result { + ) -> Result { use http::{uri, Version}; let mut b = Request::builder(); @@ -1396,10 +1396,7 @@ impl proto::Peer for Peer { macro_rules! malformed { ($($arg:tt)*) => {{ tracing::debug!($($arg)*); - return Err(RecvError::Stream { - id: stream_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); }} } @@ -1416,7 +1413,7 @@ impl proto::Peer for Peer { // Specifying :status for a request is a protocol error if pseudo.status.is_some() { tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } // Convert the URI @@ -1483,10 +1480,7 @@ impl proto::Peer for Peer { // TODO: Should there be more specialized handling for different // kinds of errors proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id); - return Err(RecvError::Stream { - id: stream_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); } }; diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 4f81de239..b5df9ad9b 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -1,7 +1,8 @@ use crate::SendFrame; use h2::frame::{self, Frame}; -use h2::{self, RecvError, SendError}; +use h2::proto::Error; +use h2::{self, SendError}; use futures::future::poll_fn; use futures::{ready, Stream, StreamExt}; @@ -284,7 +285,7 @@ impl Handle { } impl Stream for Handle { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.codec).poll_next(cx) diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index b574df5aa..23ddc1f36 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -410,7 +410,11 @@ async fn send_reset_notifies_recv_stream() { }; let rx = async { let mut body = res.into_body(); - body.next().await.unwrap().expect_err("RecvBody"); + let err = body.next().await.unwrap().expect_err("RecvBody"); + assert_eq!( + err.to_string(), + "stream error sent by user: refused stream before processing any application logic" + ); }; // a FuturesUnordered is used on purpose! @@ -672,7 +676,7 @@ async fn sending_request_on_closed_connection() { }; let poll_err = poll_fn(|cx| client.poll_ready(cx)).await.unwrap_err(); - let msg = "protocol error: unspecific protocol error detected"; + let msg = "connection error detected: unspecific protocol error detected"; assert_eq!(poll_err.to_string(), msg); let request = Request::builder() diff --git a/tests/h2-tests/tests/codec_read.rs b/tests/h2-tests/tests/codec_read.rs index fe3cfea97..d955e186b 100644 --- a/tests/h2-tests/tests/codec_read.rs +++ b/tests/h2-tests/tests/codec_read.rs @@ -236,7 +236,7 @@ async fn read_goaway_with_debug_data() { let data = poll_frame!(GoAway, codec); assert_eq!(data.reason(), Reason::ENHANCE_YOUR_CALM); assert_eq!(data.last_stream_id(), 1); - assert_eq!(data.debug_data(), b"too_many_pings"); + assert_eq!(&**data.debug_data(), b"too_many_pings"); assert_closed!(codec); } diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 1b86cadb2..be04a61b7 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -217,7 +217,7 @@ async fn recv_data_overflows_connection_window() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: flow-control protocol violated" + "connection error detected: flow-control protocol violated" ); }; @@ -227,7 +227,7 @@ async fn recv_data_overflows_connection_window() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: flow-control protocol violated" + "connection error detected: flow-control protocol violated" ); }; join(conn, req).await; @@ -278,7 +278,7 @@ async fn recv_data_overflows_stream_window() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: flow-control protocol violated" + "stream error detected: flow-control protocol violated" ); }; @@ -358,7 +358,7 @@ async fn stream_error_release_connection_capacity() { .expect_err("body"); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "stream error detected: unspecific protocol error detected" ); cap.release_capacity(to_release).expect("release_capacity"); }; diff --git a/tests/h2-tests/tests/push_promise.rs b/tests/h2-tests/tests/push_promise.rs index a5a7dfe97..f52f781d5 100644 --- a/tests/h2-tests/tests/push_promise.rs +++ b/tests/h2-tests/tests/push_promise.rs @@ -164,7 +164,7 @@ async fn recv_push_when_push_disabled_is_conn_error() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; @@ -174,7 +174,7 @@ async fn recv_push_when_push_disabled_is_conn_error() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; @@ -380,8 +380,16 @@ async fn recv_push_promise_skipped_stream_id() { .unwrap(); let req = async move { - let res = client.send_request(request, true).unwrap().0.await; - assert!(res.is_err()); + let err = client + .send_request(request, true) + .unwrap() + .0 + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "connection error detected: unspecific protocol error detected" + ); }; // client should see a protocol error @@ -390,7 +398,7 @@ async fn recv_push_promise_skipped_stream_id() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; @@ -435,7 +443,11 @@ async fn recv_push_promise_dup_stream_id() { let req = async move { let res = client.send_request(request, true).unwrap().0.await; - assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "connection error detected: unspecific protocol error detected" + ); }; // client should see a protocol error @@ -444,7 +456,7 @@ async fn recv_push_promise_dup_stream_id() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index cd2644d06..14b70f6d8 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -207,13 +207,19 @@ async fn errors_if_recv_frame_exceeds_max_frame_size() { let body = resp.into_parts().1; let res = util::concat(body).await; let err = res.unwrap_err(); - assert_eq!(err.to_string(), "protocol error: frame with invalid size"); + assert_eq!( + err.to_string(), + "connection error detected: frame with invalid size" + ); }; // client should see a conn error let conn = async move { let err = h2.await.unwrap_err(); - assert_eq!(err.to_string(), "protocol error: frame with invalid size"); + assert_eq!( + err.to_string(), + "connection error detected: frame with invalid size" + ); }; join(conn, req).await; }; @@ -321,7 +327,10 @@ async fn recv_goaway_finishes_processed_streams() { // this request will trigger a goaway let req2 = async move { let err = client.get("https://example.com/").await.unwrap_err(); - assert_eq!(err.to_string(), "protocol error: not a result of an error"); + assert_eq!( + err.to_string(), + "go away from remote: not a result of an error" + ); }; join3(async move { h2.await.expect("client") }, req1, req2).await;