diff --git a/src/client.rs b/src/client.rs index 5bbbaf49..9c211ab3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -135,9 +135,9 @@ //! [`Builder`]: struct.Builder.html //! [`Error`]: ../struct.Error.html -use crate::codec::{Codec, RecvError, SendError, UserError}; +use crate::codec::{Codec, SendError, UserError}; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; -use crate::proto; +use crate::proto::{self, Error}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; use bytes::{Buf, Bytes}; @@ -1493,7 +1493,7 @@ impl proto::Peer for Peer { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result { + ) -> Result { let mut b = Response::builder(); b = b.version(Version::HTTP_2); @@ -1507,10 +1507,7 @@ impl proto::Peer for Peer { Err(_) => { // TODO: Should there be more specialized handling for different // kinds of errors - return Err(RecvError::Stream { - id: stream_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); } }; diff --git a/src/codec/error.rs b/src/codec/error.rs index f505eb0f..0acb913e 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -1,26 +1,12 @@ -use crate::frame::{Reason, StreamId}; +use crate::proto::Error; use std::{error, fmt, io}; -/// Errors that are received -#[derive(Debug)] -pub enum RecvError { - Connection(Reason), - Stream { id: StreamId, reason: Reason }, - Io(io::Error), -} - /// Errors caused by sending a message #[derive(Debug)] pub enum SendError { - /// User error + Connection(Error), User(UserError), - - /// Connection error prevents sending. - Connection(Reason), - - /// I/O error - Io(io::Error), } /// Errors caused by users of the library @@ -65,47 +51,22 @@ pub enum UserError { PeerDisabledServerPush, } -// ===== impl RecvError ===== - -impl From for RecvError { - fn from(src: io::Error) -> Self { - RecvError::Io(src) - } -} - -impl error::Error for RecvError {} - -impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::RecvError::*; - - match *self { - Connection(ref reason) => reason.fmt(fmt), - Stream { ref reason, .. } => reason.fmt(fmt), - Io(ref e) => e.fmt(fmt), - } - } -} - // ===== impl SendError ===== impl error::Error for SendError {} impl fmt::Display for SendError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::SendError::*; - match *self { - User(ref e) => e.fmt(fmt), - Connection(ref reason) => reason.fmt(fmt), - Io(ref e) => e.fmt(fmt), + Self::Connection(ref e) => e.fmt(fmt), + Self::User(ref e) => e.fmt(fmt), } } } impl From for SendError { fn from(src: io::Error) -> Self { - SendError::Io(src) + Self::Connection(src.into()) } } diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 9673c49a..7c3bbb3b 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -1,8 +1,8 @@ -use crate::codec::RecvError; use crate::frame::{self, Frame, Kind, Reason}; use crate::frame::{ DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE, }; +use crate::proto::Error; use crate::hpack; @@ -98,8 +98,7 @@ fn decode_frame( max_header_list_size: usize, partial_inout: &mut Option, mut bytes: BytesMut, -) -> Result, RecvError> { - use self::RecvError::*; +) -> Result, Error> { let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len()); let _e = span.enter(); @@ -110,7 +109,7 @@ fn decode_frame( if partial_inout.is_some() && head.kind() != Kind::Continuation { proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind()); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } let kind = head.kind(); @@ -131,14 +130,11 @@ fn decode_frame( // A stream cannot depend on itself. An endpoint MUST // treat this as a stream error (Section 5.4.2) of type // `PROTOCOL_ERROR`. - return Err(Stream { - id: $head.stream_id(), - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset($head.stream_id(), Reason::PROTOCOL_ERROR)); }, Err(e) => { proto_err!(conn: "failed to load frame; err={:?}", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } }; @@ -151,14 +147,11 @@ fn decode_frame( Err(frame::Error::MalformedMessage) => { let id = $head.stream_id(); proto_err!(stream: "malformed header block; stream={:?}", id); - return Err(Stream { - id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR)); }, Err(e) => { proto_err!(conn: "failed HPACK decoding; err={:?}", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } } @@ -183,7 +176,7 @@ fn decode_frame( res.map_err(|e| { proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -192,7 +185,7 @@ fn decode_frame( res.map_err(|e| { proto_err!(conn: "failed to load PING frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -201,7 +194,7 @@ fn decode_frame( res.map_err(|e| { proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -212,7 +205,7 @@ fn decode_frame( // TODO: Should this always be connection level? Probably not... res.map_err(|e| { proto_err!(conn: "failed to load DATA frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -221,7 +214,7 @@ fn decode_frame( let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load RESET frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -229,7 +222,7 @@ fn decode_frame( let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e); - Connection(Reason::PROTOCOL_ERROR) + Error::library_go_away(Reason::PROTOCOL_ERROR) })? .into() } @@ -238,7 +231,7 @@ fn decode_frame( if head.stream_id() == 0 { // Invalid stream identifier proto_err!(conn: "invalid stream ID 0"); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) { @@ -249,14 +242,11 @@ fn decode_frame( // `PROTOCOL_ERROR`. let id = head.stream_id(); proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id); - return Err(Stream { - id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR)); } Err(e) => { proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } } } @@ -267,14 +257,14 @@ fn decode_frame( Some(partial) => partial, None => { proto_err!(conn: "received unexpected CONTINUATION frame"); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } }; // The stream identifiers must match if partial.frame.stream_id() != head.stream_id() { proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID"); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } // Extend the buf @@ -297,7 +287,7 @@ fn decode_frame( // the attacker to go away. if partial.buf.len() + bytes.len() > max_header_list_size { proto_err!(conn: "CONTINUATION frame header block size over ignorable limit"); - return Err(Connection(Reason::COMPRESSION_ERROR)); + return Err(Error::library_go_away(Reason::COMPRESSION_ERROR).into()); } } partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]); @@ -312,14 +302,11 @@ fn decode_frame( Err(frame::Error::MalformedMessage) => { let id = head.stream_id(); proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id); - return Err(Stream { - id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR)); } Err(e) => { proto_err!(conn: "failed HPACK decoding; err={:?}", e); - return Err(Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } } @@ -343,7 +330,7 @@ impl Stream for FramedRead where T: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let span = tracing::trace_span!("FramedRead::poll_next"); @@ -371,11 +358,11 @@ where } } -fn map_err(err: io::Error) -> RecvError { +fn map_err(err: io::Error) -> Error { if let io::ErrorKind::InvalidData = err.kind() { if let Some(custom) = err.get_ref() { if custom.is::() { - return RecvError::Connection(Reason::FRAME_SIZE_ERROR); + return Error::library_go_away(Reason::FRAME_SIZE_ERROR); } } } diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 7d0ab73d..359adf6e 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -2,12 +2,13 @@ mod error; mod framed_read; mod framed_write; -pub use self::error::{RecvError, SendError, UserError}; +pub use self::error::{SendError, UserError}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use crate::frame::{self, Data, Frame}; +use crate::proto::Error; use bytes::Buf; use futures_core::Stream; @@ -155,7 +156,7 @@ impl Stream for Codec where T: AsyncRead + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_next(cx) diff --git a/src/error.rs b/src/error.rs index 372bac2e..0421f403 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,8 @@ use crate::codec::{SendError, UserError}; -use crate::proto; +use crate::frame::StreamId; +use crate::proto::{self, Initiator}; +use bytes::Bytes; use std::{error, fmt, io}; pub use crate::frame::Reason; @@ -22,11 +24,14 @@ pub struct Error { #[derive(Debug)] enum Kind { - /// An error caused by an action taken by the remote peer. - /// - /// This is either an error received by the peer or caused by an invalid - /// action taken by the peer (i.e. a protocol error). - Proto(Reason), + /// A RST_STREAM frame was received or sent. + Reset(StreamId, Reason, Initiator), + + /// A GO_AWAY frame was received or sent. + GoAway(Bytes, Reason, Initiator), + + /// The user created an error from a bare Reason. + Reason(Reason), /// An error resulting from an invalid action taken by the user of this /// library. @@ -45,7 +50,7 @@ impl Error { /// action taken by the peer (i.e. a protocol error). pub fn reason(&self) -> Option { match self.kind { - Kind::Proto(reason) => Some(reason), + Kind::Reset(_, reason, _) | Kind::GoAway(_, reason, _) => Some(reason), _ => None, } } @@ -87,8 +92,13 @@ impl From for Error { Error { kind: match src { - Proto(reason) => Kind::Proto(reason), - Io(e) => Kind::Io(e), + Reset(stream_id, reason, initiator) => Kind::Reset(stream_id, reason, initiator), + GoAway(debug_data, reason, initiator) => { + Kind::GoAway(debug_data, reason, initiator) + } + Io(kind, inner) => { + Kind::Io(inner.map_or_else(|| kind.into(), |inner| io::Error::new(kind, inner))) + } }, } } @@ -97,7 +107,7 @@ impl From for Error { impl From for Error { fn from(src: Reason) -> Error { Error { - kind: Kind::Proto(src), + kind: Kind::Reason(src), } } } @@ -106,8 +116,7 @@ impl From for Error { fn from(src: SendError) -> Error { match src { SendError::User(e) => e.into(), - SendError::Connection(reason) => reason.into(), - SendError::Io(e) => Error::from_io(e), + SendError::Connection(e) => e.into(), } } } @@ -122,13 +131,38 @@ impl From for Error { impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::Kind::*; - - match self.kind { - Proto(ref reason) => write!(fmt, "protocol error: {}", reason), - User(ref e) => write!(fmt, "user error: {}", e), - Io(ref e) => fmt::Display::fmt(e, fmt), + let debug_data = match self.kind { + Kind::Reset(_, reason, Initiator::User) => { + return write!(fmt, "stream error sent by user: {}", reason) + } + Kind::Reset(_, reason, Initiator::Library) => { + return write!(fmt, "stream error detected: {}", reason) + } + Kind::Reset(_, reason, Initiator::Remote) => { + return write!(fmt, "stream error received: {}", reason) + } + Kind::GoAway(ref debug_data, reason, Initiator::User) => { + write!(fmt, "connection error sent by user: {}", reason)?; + debug_data + } + Kind::GoAway(ref debug_data, reason, Initiator::Library) => { + write!(fmt, "connection error detected: {}", reason)?; + debug_data + } + Kind::GoAway(ref debug_data, reason, Initiator::Remote) => { + write!(fmt, "connection error received: {}", reason)?; + debug_data + } + Kind::Reason(reason) => return write!(fmt, "protocol error: {}", reason), + Kind::User(ref e) => return write!(fmt, "user error: {}", e), + Kind::Io(ref e) => return e.fmt(fmt), + }; + + if !debug_data.is_empty() { + write!(fmt, " ({:?})", debug_data)?; } + + Ok(()) } } diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 52dd91d4..91d9c4c6 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -29,8 +29,7 @@ impl GoAway { self.error_code } - #[cfg(feature = "unstable")] - pub fn debug_data(&self) -> &[u8] { + pub fn debug_data(&self) -> &Bytes { &self.debug_data } diff --git a/src/frame/reset.rs b/src/frame/reset.rs index b2613028..39f6ac20 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -2,7 +2,7 @@ use crate::frame::{self, Error, Head, Kind, Reason, StreamId}; use bytes::BufMut; -#[derive(Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Reset { stream_id: StreamId, error_code: Reason, diff --git a/src/lib.rs b/src/lib.rs index 951cd96b..381a62a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,8 +104,14 @@ macro_rules! ready { mod codec; mod error; mod hpack; + +#[cfg(not(feature = "unstable"))] mod proto; +#[cfg(feature = "unstable")] +#[allow(missing_docs)] +pub mod proto; + #[cfg(not(feature = "unstable"))] mod frame; @@ -125,7 +131,7 @@ pub use crate::error::{Error, Reason}; pub use crate::share::{FlowControl, Ping, PingPong, Pong, RecvStream, SendStream, StreamId}; #[cfg(feature = "unstable")] -pub use codec::{Codec, RecvError, SendError, UserError}; +pub use codec::{Codec, SendError, UserError}; use std::task::Poll; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b44fdcd5..a75df436 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,6 +1,6 @@ -use crate::codec::{RecvError, UserError}; +use crate::codec::UserError; use crate::frame::{Reason, StreamId}; -use crate::{client, frame, proto, server}; +use crate::{client, frame, server}; use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; use crate::proto::*; @@ -40,7 +40,7 @@ where /// /// This exists separately from State in order to support /// graceful shutdown. - error: Option, + error: Option, /// Pending GOAWAY frames to write. go_away: GoAway, @@ -68,7 +68,7 @@ struct DynConnection<'a, B: Buf = Bytes> { streams: DynStreams<'a, B>, - error: &'a mut Option, + error: &'a mut Option, ping_pong: &'a mut PingPong, } @@ -88,10 +88,10 @@ enum State { Open, /// The codec must be flushed - Closing(Reason), + Closing(Reason, Initiator), /// In a closed state - Closed(Reason), + Closed(Reason, Initiator), } impl Connection @@ -161,9 +161,9 @@ where /// Returns `Ready` when the connection is ready to receive a frame. /// - /// Returns `RecvError` as this may raise errors that are caused by delayed + /// Returns `Error` as this may raise errors that are caused by delayed /// processing of received frames. - fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { let _e = self.inner.span.enter(); let span = tracing::trace_span!("poll_ready"); let _e = span.enter(); @@ -191,26 +191,24 @@ where self.inner.as_dyn().go_away_from_user(e) } - fn take_error(&mut self, ours: Reason) -> Poll> { - let reason = if let Some(theirs) = self.inner.error.take() { - match (ours, theirs) { - // If either side reported an error, return that - // to the user. - (Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err, - // If both sides reported an error, give their - // error back to th user. We assume our error - // was a consequence of their error, and less - // important. - (_, theirs) => theirs, - } - } else { - ours - }; - - if reason == Reason::NO_ERROR { - Poll::Ready(Ok(())) - } else { - Poll::Ready(Err(proto::Error::Proto(reason))) + fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> { + let (debug_data, theirs) = self + .inner + .error + .take() + .as_ref() + .map_or((Bytes::new(), Reason::NO_ERROR), |frame| { + (frame.debug_data().clone(), frame.reason()) + }); + + match (ours, theirs) { + (Reason::NO_ERROR, Reason::NO_ERROR) => return Ok(()), + (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)), + // If both sides reported an error, give their + // error back to th user. We assume our error + // was a consequence of their error, and less + // important. + (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)), } } @@ -229,7 +227,7 @@ where } /// Advances the internal state of the connection. - pub fn poll(&mut self, cx: &mut Context) -> Poll> { + pub fn poll(&mut self, cx: &mut Context) -> Poll> { // XXX(eliza): cloning the span is unfortunately necessary here in // order to placate the borrow checker — `self` is mutably borrowed by // `poll2`, which means that we can't borrow `self.span` to enter it. @@ -268,20 +266,22 @@ where self.inner.as_dyn().handle_poll2_result(result)? } - State::Closing(reason) => { + State::Closing(reason, initiator) => { tracing::trace!("connection closing after flush"); // Flush/shutdown the codec ready!(self.codec.shutdown(cx))?; // Transition the state to error - self.inner.state = State::Closed(reason); + self.inner.state = State::Closed(reason, initiator); + } + State::Closed(reason, initiator) => { + return Poll::Ready(self.take_error(reason, initiator)); } - State::Closed(reason) => return self.take_error(reason), } } } - fn poll2(&mut self, cx: &mut Context) -> Poll> { + fn poll2(&mut self, cx: &mut Context) -> Poll> { // This happens outside of the loop to prevent needing to do a clock // check and then comparison of the queue possibly multiple times a // second (and thus, the clock wouldn't have changed enough to matter). @@ -300,7 +300,7 @@ where // the same error back to the user. return Poll::Ready(Ok(())); } else { - return Poll::Ready(Err(RecvError::Connection(reason))); + return Poll::Ready(Err(Error::library_go_away(reason))); } } // Only NO_ERROR should be waiting for idle @@ -384,42 +384,45 @@ where self.go_away.go_away_from_user(frame); // Notify all streams of reason we're abruptly closing. - self.streams.recv_err(&proto::Error::Proto(e)); + self.streams.handle_error(Error::user_go_away(e)); } - fn handle_poll2_result(&mut self, result: Result<(), RecvError>) -> Result<(), Error> { - use crate::codec::RecvError::*; + fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> { match result { // The connection has shutdown normally Ok(()) => { - *self.state = State::Closing(Reason::NO_ERROR); + *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library); Ok(()) } // Attempting to read a frame resulted in a connection level // error. This is handled by setting a GOAWAY frame followed by // terminating the connection. - Err(Connection(e)) => { + Err(Error::GoAway(debug_data, reason, initiator)) => { + let e = Error::GoAway(debug_data, reason, initiator); tracing::debug!(error = ?e, "Connection::poll; connection error"); // We may have already sent a GOAWAY for this error, // if so, don't send another, just flush and close up. - if let Some(reason) = self.go_away.going_away_reason() { - if reason == e { - tracing::trace!(" -> already going away"); - *self.state = State::Closing(e); - return Ok(()); - } + if self + .go_away + .going_away() + .map_or(false, |frame| frame.reason() == reason) + { + tracing::trace!(" -> already going away"); + *self.state = State::Closing(reason, initiator); + return Ok(()); } // Reset all active streams - self.streams.recv_err(&e.into()); - self.go_away_now(e); + self.streams.handle_error(e); + self.go_away_now(reason); Ok(()) } // Attempting to read a frame resulted in a stream level error. // This is handled by resetting the frame then trying to read // another frame. - Err(Stream { id, reason }) => { + Err(Error::Reset(id, reason, initiator)) => { + debug_assert_eq!(initiator, Initiator::Library); tracing::trace!(?id, ?reason, "stream error"); self.streams.send_reset(id, reason); Ok(()) @@ -428,12 +431,12 @@ where // active streams must be reset. // // TODO: Are I/O errors recoverable? - Err(Io(e)) => { + Err(Error::Io(e, inner)) => { tracing::debug!(error = ?e, "Connection::poll; IO error"); - let e = e.into(); + let e = Error::Io(e, inner); // Reset all active streams - self.streams.recv_err(&e); + self.streams.handle_error(e.clone()); // Return the error Err(e) @@ -441,7 +444,7 @@ where } } - fn recv_frame(&mut self, frame: Option) -> Result { + fn recv_frame(&mut self, frame: Option) -> Result { use crate::frame::Frame::*; match frame { Some(Headers(frame)) => { @@ -471,7 +474,7 @@ where // until they are all EOS. Once they are, State should // transition to GoAway. self.streams.recv_go_away(&frame)?; - *self.error = Some(frame.reason()); + *self.error = Some(frame); } Some(Ping(frame)) => { tracing::trace!(?frame, "recv PING"); diff --git a/src/proto/error.rs b/src/proto/error.rs index c3ee20d0..19723726 100644 --- a/src/proto/error.rs +++ b/src/proto/error.rs @@ -1,53 +1,87 @@ -use crate::codec::{RecvError, SendError}; -use crate::frame::Reason; +use crate::codec::SendError; +use crate::frame::{Reason, StreamId}; +use bytes::Bytes; +use std::fmt; use std::io; /// Either an H2 reason or an I/O error -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Error { - Proto(Reason), - Io(io::Error), + Reset(StreamId, Reason, Initiator), + GoAway(Bytes, Reason, Initiator), + Io(io::ErrorKind, Option), +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum Initiator { + User, + Library, + Remote, } impl Error { - /// Clone the error for internal purposes. - /// - /// `io::Error` is not `Clone`, so we only copy the `ErrorKind`. - pub(super) fn shallow_clone(&self) -> Error { + pub(crate) fn is_local(&self) -> bool { match *self { - Error::Proto(reason) => Error::Proto(reason), - Error::Io(ref io) => Error::Io(io::Error::from(io.kind())), + Self::Reset(_, _, initiator) | Self::GoAway(_, _, initiator) => initiator.is_local(), + Self::Io(..) => true, } } -} -impl From for Error { - fn from(src: Reason) -> Self { - Error::Proto(src) + pub(crate) fn user_go_away(reason: Reason) -> Self { + Self::GoAway(Bytes::new(), reason, Initiator::User) + } + + pub(crate) fn library_reset(stream_id: StreamId, reason: Reason) -> Self { + Self::Reset(stream_id, reason, Initiator::Library) + } + + pub(crate) fn library_go_away(reason: Reason) -> Self { + Self::GoAway(Bytes::new(), reason, Initiator::Library) + } + + pub(crate) fn remote_reset(stream_id: StreamId, reason: Reason) -> Self { + Self::Reset(stream_id, reason, Initiator::Remote) + } + + pub(crate) fn remote_go_away(debug_data: Bytes, reason: Reason) -> Self { + Self::GoAway(debug_data, reason, Initiator::Remote) } } -impl From for Error { - fn from(src: io::Error) -> Self { - Error::Io(src) +impl Initiator { + fn is_local(&self) -> bool { + match *self { + Self::User | Self::Library => true, + Self::Remote => false, + } } } -impl From for RecvError { - fn from(src: Error) -> RecvError { - match src { - Error::Proto(reason) => RecvError::Connection(reason), - Error::Io(e) => RecvError::Io(e), +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::Reset(_, reason, _) | Self::GoAway(_, reason, _) => reason.fmt(fmt), + Self::Io(_, Some(ref inner)) => inner.fmt(fmt), + Self::Io(kind, None) => io::Error::from(kind).fmt(fmt), } } } +impl From for Error { + fn from(src: io::ErrorKind) -> Self { + Error::Io(src.into(), None) + } +} + +impl From for Error { + fn from(src: io::Error) -> Self { + Error::Io(src.kind(), src.get_ref().map(|inner| inner.to_string())) + } +} + impl From for SendError { - fn from(src: Error) -> SendError { - match src { - Error::Proto(reason) => SendError::Connection(reason), - Error::Io(e) => SendError::Io(e), - } + fn from(src: Error) -> Self { + Self::Connection(src) } } diff --git a/src/proto/go_away.rs b/src/proto/go_away.rs index 91d37b64..75942787 100644 --- a/src/proto/go_away.rs +++ b/src/proto/go_away.rs @@ -31,7 +31,7 @@ pub(super) struct GoAway { /// well, and we wouldn't want to save that here to accidentally dump in logs, /// or waste struct space.) #[derive(Debug)] -struct GoingAway { +pub(crate) struct GoingAway { /// Stores the highest stream ID of a GOAWAY that has been sent. /// /// It's illegal to send a subsequent GOAWAY with a higher ID. @@ -98,9 +98,9 @@ impl GoAway { self.is_user_initiated } - /// Return the last Reason we've sent. - pub fn going_away_reason(&self) -> Option { - self.going_away.as_ref().map(|g| g.reason) + /// Returns the going away info, if any. + pub fn going_away(&self) -> Option<&GoingAway> { + self.going_away.as_ref() } /// Returns if the connection should close now, or wait until idle. @@ -141,7 +141,7 @@ impl GoAway { return Poll::Ready(Some(Ok(reason))); } else if self.should_close_now() { - return match self.going_away_reason() { + return match self.going_away().map(|going_away| going_away.reason) { Some(reason) => Poll::Ready(Some(Ok(reason))), None => Poll::Ready(None), }; @@ -150,3 +150,9 @@ impl GoAway { Poll::Ready(None) } } + +impl GoingAway { + pub(crate) fn reason(&self) -> Reason { + self.reason + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 84fd8542..d505e77f 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -7,7 +7,7 @@ mod settings; mod streams; pub(crate) use self::connection::{Config, Connection}; -pub(crate) use self::error::Error; +pub use self::error::{Error, Initiator}; pub(crate) use self::peer::{Dyn as DynPeer, Peer}; pub(crate) use self::ping_pong::UserPings; pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams}; diff --git a/src/proto/peer.rs b/src/proto/peer.rs index 3bcc7722..d62d9e24 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -1,7 +1,6 @@ -use crate::codec::RecvError; use crate::error::Reason; use crate::frame::{Pseudo, StreamId}; -use crate::proto::Open; +use crate::proto::{Error, Open}; use http::{HeaderMap, Request, Response}; @@ -21,7 +20,7 @@ pub(crate) trait Peer { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result; + ) -> Result; fn is_local_init(id: StreamId) -> bool { assert!(!id.is_zero()); @@ -61,7 +60,7 @@ impl Dyn { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result { + ) -> Result { if self.is_server() { crate::server::Peer::convert_poll_message(pseudo, fields, stream_id) .map(PollMessage::Server) @@ -72,12 +71,12 @@ impl Dyn { } /// Returns true if the remote peer can initiate a stream with the given ID. - pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), RecvError> { + pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), Error> { if self.is_server() { // Ensure that the ID is a valid client initiated ID if mode.is_push_promise() || !id.is_client_initiated() { proto_err!(conn: "cannot open stream {:?} - not client initiated", id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } Ok(()) @@ -85,7 +84,7 @@ impl Dyn { // Ensure that the ID is a valid server initiated ID if !mode.is_push_promise() || !id.is_server_initiated() { proto_err!(conn: "cannot open stream {:?} - not server initiated", id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } Ok(()) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 45329232..44f4c2df 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,4 +1,4 @@ -use crate::codec::{RecvError, UserError}; +use crate::codec::UserError; use crate::error::Reason; use crate::frame; use crate::proto::*; @@ -40,7 +40,7 @@ impl Settings { frame: frame::Settings, codec: &mut Codec, streams: &mut Streams, - ) -> Result<(), RecvError> + ) -> Result<(), Error> where T: AsyncWrite + Unpin, B: Buf, @@ -68,7 +68,7 @@ impl Settings { // We haven't sent any SETTINGS frames to be ACKed, so // this is very bizarre! Remote is either buggy or malicious. proto_err!(conn: "received unexpected settings ack"); - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } } else { @@ -97,7 +97,7 @@ impl Settings { cx: &mut Context, dst: &mut Codec, streams: &mut Streams, - ) -> Poll> + ) -> Poll> where T: AsyncWrite + Unpin, B: Buf, diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 77eb507d..9671d589 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -791,7 +791,10 @@ impl Prioritize { }), None => { if let Some(reason) = stream.state.get_scheduled_reset() { - stream.state.set_reset(reason); + let stream_id = stream.id; + stream + .state + .set_reset(stream_id, reason, Initiator::Library); let frame = frame::Reset::new(stream.id, reason); Frame::Reset(frame) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 1f30450f..1df0921c 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,7 +1,7 @@ use super::*; -use crate::codec::{RecvError, UserError}; -use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; -use crate::{frame, proto}; +use crate::codec::UserError; +use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; +use crate::proto::{self, Error}; use std::task::Context; use http::{HeaderMap, Request, Response}; @@ -68,7 +68,7 @@ pub(super) enum Event { #[derive(Debug)] pub(super) enum RecvHeaderBlockError { Oversize(T), - State(RecvError), + State(Error), } #[derive(Debug)] @@ -130,7 +130,7 @@ impl Recv { id: StreamId, mode: Open, counts: &mut Counts, - ) -> Result, RecvError> { + ) -> Result, Error> { assert!(self.refused.is_none()); counts.peer().ensure_can_open(id, mode)?; @@ -138,7 +138,7 @@ impl Recv { let next_id = self.next_stream_id()?; if id < next_id { proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } self.next_stream_id = id.next_id(); @@ -182,11 +182,7 @@ impl Recv { Ok(v) => v, Err(()) => { proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - } - .into()); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); } }; @@ -318,16 +314,13 @@ impl Recv { &mut self, frame: frame::Headers, stream: &mut store::Ptr, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { // Transition the state stream.state.recv_close()?; if stream.ensure_content_length_zero().is_err() { proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } let trailers = frame.into_fields(); @@ -461,7 +454,7 @@ impl Recv { &mut self, settings: &frame::Settings, store: &mut Store, - ) -> Result<(), RecvError> { + ) -> Result<(), proto::Error> { let target = if let Some(val) = settings.initial_window_size() { val } else { @@ -508,7 +501,7 @@ impl Recv { stream .recv_flow .inc_window(inc) - .map_err(RecvError::Connection)?; + .map_err(proto::Error::library_go_away)?; stream.recv_flow.assign_capacity(inc); Ok(()) }) @@ -526,11 +519,7 @@ impl Recv { stream.pending_recv.is_empty() } - pub fn recv_data( - &mut self, - frame: frame::Data, - stream: &mut store::Ptr, - ) -> Result<(), RecvError> { + pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> { let sz = frame.payload().len(); // This should have been enforced at the codec::FramedRead layer, so @@ -548,7 +537,7 @@ impl Recv { // Receiving a DATA frame when not expecting one is a protocol // error. proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } tracing::trace!( @@ -563,7 +552,7 @@ impl Recv { "recv_data; frame ignored on locally reset {:?} for some time", stream.id, ); - return self.ignore_data(sz); + return Ok(self.ignore_data(sz)?); } // Ensure that there is enough capacity on the connection before acting @@ -579,10 +568,7 @@ impl Recv { // So, for violating the **stream** window, we can send either a // stream or connection error. We've opted to send a stream // error. - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::FLOW_CONTROL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR)); } if stream.dec_content_length(frame.payload().len()).is_err() { @@ -591,10 +577,7 @@ impl Recv { stream.id, frame.payload().len(), ); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } if frame.is_end_stream() { @@ -604,15 +587,12 @@ impl Recv { stream.id, frame.payload().len(), ); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } if stream.state.recv_close().is_err() { proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } } @@ -631,7 +611,7 @@ impl Recv { Ok(()) } - pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> { + pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> { // Ensure that there is enough capacity on the connection... self.consume_connection_window(sz)?; @@ -647,14 +627,14 @@ impl Recv { Ok(()) } - pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { + pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> { if self.flow.window_size() < sz { tracing::debug!( "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", self.flow.window_size(), sz, ); - return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR)); + return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR)); } // Update connection level flow control @@ -669,7 +649,7 @@ impl Recv { &mut self, frame: frame::PushPromise, stream: &mut store::Ptr, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { stream.state.reserve_remote()?; if frame.is_over_size() { // A frame is over size if the decoded header block was bigger than @@ -688,10 +668,10 @@ impl Recv { headers frame is over size; promised_id={:?};", frame.promised_id(), ); - return Err(RecvError::Stream { - id: frame.promised_id(), - reason: Reason::REFUSED_STREAM, - }); + return Err(Error::library_reset( + frame.promised_id(), + Reason::REFUSED_STREAM, + )); } let promised_id = frame.promised_id(); @@ -714,10 +694,7 @@ impl Recv { promised_id, ), } - return Err(RecvError::Stream { - id: promised_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR)); } use super::peer::PollMessage::*; @@ -747,18 +724,16 @@ impl Recv { /// Handle remote sending an explicit RST_STREAM. pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { // Notify the stream - stream - .state - .recv_reset(frame.reason(), stream.is_pending_send); + stream.state.recv_reset(frame, stream.is_pending_send); stream.notify_send(); stream.notify_recv(); } - /// Handle a received error - pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { + /// Handle a connection-level error + pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) { // Receive an error - stream.state.recv_err(err); + stream.state.handle_error(err); // If a receiver is waiting, notify it stream.notify_send(); @@ -789,11 +764,11 @@ impl Recv { self.max_stream_id } - pub fn next_stream_id(&self) -> Result { + pub fn next_stream_id(&self) -> Result { if let Ok(id) = self.next_stream_id { Ok(id) } else { - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } @@ -808,10 +783,10 @@ impl Recv { } /// Returns true if the remote peer can reserve a stream with the given ID. - pub fn ensure_can_reserve(&self) -> Result<(), RecvError> { + pub fn ensure_can_reserve(&self) -> Result<(), Error> { if !self.is_push_enabled { proto_err!(conn: "recv_push_promise: push is disabled"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } Ok(()) @@ -1098,8 +1073,8 @@ impl Open { // ===== impl RecvHeaderBlockError ===== -impl From for RecvHeaderBlockError { - fn from(err: RecvError) -> Self { +impl From for RecvHeaderBlockError { + fn from(err: Error) -> Self { RecvHeaderBlockError::State(err) } } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index e8804127..d4d64cd8 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -2,8 +2,9 @@ use super::{ store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, StreamIdOverflow, WindowSize, }; -use crate::codec::{RecvError, UserError}; +use crate::codec::UserError; use crate::frame::{self, Reason}; +use crate::proto::{Error, Initiator}; use bytes::Buf; use http; @@ -161,6 +162,7 @@ impl Send { pub fn send_reset( &mut self, reason: Reason, + initiator: Initiator, buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, @@ -169,14 +171,16 @@ impl Send { let is_reset = stream.state.is_reset(); let is_closed = stream.state.is_closed(); let is_empty = stream.pending_send.is_empty(); + let stream_id = stream.id; tracing::trace!( - "send_reset(..., reason={:?}, stream={:?}, ..., \ + "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ state={:?} \ ", reason, - stream.id, + initiator, + stream_id, is_reset, is_closed, is_empty, @@ -187,13 +191,13 @@ impl Send { // Don't double reset tracing::trace!( " -> not sending RST_STREAM ({:?} is already reset)", - stream.id + stream_id ); return; } // Transition the state to reset no matter what. - stream.state.set_reset(reason); + stream.state.set_reset(stream_id, reason, initiator); // If closed AND the send queue is flushed, then the stream cannot be // reset explicitly, either. Implicit resets can still be queued. @@ -201,7 +205,7 @@ impl Send { tracing::trace!( " -> not sending explicit RST_STREAM ({:?} was closed \ and send queue was flushed)", - stream.id + stream_id ); return; } @@ -371,7 +375,14 @@ impl Send { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { tracing::debug!("recv_stream_window_update !!; err={:?}", e); - self.send_reset(Reason::FLOW_CONTROL_ERROR, buffer, stream, counts, task); + self.send_reset( + Reason::FLOW_CONTROL_ERROR, + Initiator::Library, + buffer, + stream, + counts, + task, + ); return Err(e); } @@ -379,7 +390,7 @@ impl Send { Ok(()) } - pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), RecvError> { + pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> { if last_stream_id > self.max_stream_id { // The remote endpoint sent a `GOAWAY` frame indicating a stream // that we never sent, or that we have already terminated on account @@ -392,14 +403,14 @@ impl Send { "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", last_stream_id, self.max_stream_id, ); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } self.max_stream_id = last_stream_id; Ok(()) } - pub fn recv_err( + pub fn handle_error( &mut self, buffer: &mut Buffer>, stream: &mut store::Ptr, @@ -417,7 +428,7 @@ impl Send { store: &mut Store, counts: &mut Counts, task: &mut Option, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { // Applies an update to the remote endpoint's initial window size. // // Per RFC 7540 §6.9.2: @@ -480,7 +491,7 @@ impl Send { // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. - Ok::<_, RecvError>(()) + Ok::<_, Error>(()) })?; self.prioritize @@ -490,7 +501,7 @@ impl Send { store.for_each(|mut stream| { self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) - .map_err(RecvError::Connection) + .map_err(Error::library_go_away) })?; } } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 3e739daf..9931d41b 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -1,9 +1,8 @@ use std::io; -use crate::codec::UserError::*; -use crate::codec::{RecvError, UserError}; -use crate::frame::{self, Reason}; -use crate::proto::{self, PollReset}; +use crate::codec::UserError; +use crate::frame::{self, Reason, StreamId}; +use crate::proto::{self, Error, Initiator, PollReset}; use self::Inner::*; use self::Peer::*; @@ -53,7 +52,7 @@ pub struct State { inner: Inner, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] enum Inner { Idle, // TODO: these states shouldn't count against concurrency limits: @@ -71,12 +70,10 @@ enum Peer { Streaming, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] enum Cause { EndStream, - Proto(Reason), - LocallyReset(Reason), - Io, + Error(Error), /// This indicates to the connection that a reset frame must be sent out /// once the send queue has been flushed. @@ -85,7 +82,7 @@ enum Cause { /// - User drops all references to a stream, so we want to CANCEL the it. /// - Header block size was too large, so we want to REFUSE, possibly /// after sending a 431 response frame. - Scheduled(Reason), + ScheduledLibraryReset(Reason), } impl State { @@ -123,7 +120,7 @@ impl State { } _ => { // All other transitions result in a protocol error - return Err(UnexpectedFrameType); + return Err(UserError::UnexpectedFrameType); } }; @@ -133,7 +130,7 @@ impl State { /// Opens the receive-half of the stream when a HEADERS frame is received. /// /// Returns true if this transitions the state to Open. - pub fn recv_open(&mut self, frame: &frame::Headers) -> Result { + pub fn recv_open(&mut self, frame: &frame::Headers) -> Result { let mut initial = false; let eos = frame.is_end_stream(); @@ -195,10 +192,10 @@ impl State { HalfClosedLocal(Streaming) } } - state => { + ref state => { // All other transitions result in a protocol error proto_err!(conn: "recv_open: in unexpected state {:?}", state); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } }; @@ -206,15 +203,15 @@ impl State { } /// Transition from Idle -> ReservedRemote - pub fn reserve_remote(&mut self) -> Result<(), RecvError> { + pub fn reserve_remote(&mut self) -> Result<(), Error> { match self.inner { Idle => { self.inner = ReservedRemote; Ok(()) } - state => { + ref state => { proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } } @@ -231,7 +228,7 @@ impl State { } /// Indicates that the remote side will not send more data to the local. - pub fn recv_close(&mut self) -> Result<(), RecvError> { + pub fn recv_close(&mut self) -> Result<(), Error> { match self.inner { Open { local, .. } => { // The remote side will continue to receive data. @@ -244,9 +241,9 @@ impl State { self.inner = Closed(Cause::EndStream); Ok(()) } - state => { + ref state => { proto_err!(conn: "recv_close: in unexpected state {:?}", state); - Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) } } } @@ -254,9 +251,9 @@ impl State { /// The remote explicitly sent a RST_STREAM. /// /// # Arguments - /// - `reason`: the reason field of the received RST_STREAM frame. + /// - `frame`: the received RST_STREAM frame. /// - `queued`: true if this stream has frames in the pending send queue. - pub fn recv_reset(&mut self, reason: Reason, queued: bool) { + pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { match self.inner { // If the stream is already in a `Closed` state, do nothing, // provided that there are no frames still in the send queue. @@ -275,30 +272,28 @@ impl State { // In either of these cases, we want to overwrite the stream's // previous state with the received RST_STREAM, so that the queue // will be cleared by `Prioritize::pop_frame`. - state => { + ref state => { tracing::trace!( - "recv_reset; reason={:?}; state={:?}; queued={:?}", - reason, + "recv_reset; frame={:?}; state={:?}; queued={:?}", + frame, state, queued ); - self.inner = Closed(Cause::Proto(reason)); + self.inner = Closed(Cause::Error(Error::remote_reset( + frame.stream_id(), + frame.reason(), + ))); } } } - /// We noticed a protocol error. - pub fn recv_err(&mut self, err: &proto::Error) { - use crate::proto::Error::*; - + /// Handle a connection-level error. + pub fn handle_error(&mut self, err: &proto::Error) { match self.inner { Closed(..) => {} _ => { - tracing::trace!("recv_err; err={:?}", err); - self.inner = Closed(match *err { - Proto(reason) => Cause::LocallyReset(reason), - Io(..) => Cause::Io, - }); + tracing::trace!("handle_error; err={:?}", err); + self.inner = Closed(Cause::Error(err.clone())); } } } @@ -306,9 +301,9 @@ impl State { pub fn recv_eof(&mut self) { match self.inner { Closed(..) => {} - s => { - tracing::trace!("recv_eof; state={:?}", s); - self.inner = Closed(Cause::Io); + ref state => { + tracing::trace!("recv_eof; state={:?}", state); + self.inner = Closed(Cause::Error(io::ErrorKind::BrokenPipe.into())); } } } @@ -325,39 +320,39 @@ impl State { tracing::trace!("send_close: HalfClosedRemote => Closed"); self.inner = Closed(Cause::EndStream); } - state => panic!("send_close: unexpected state {:?}", state), + ref state => panic!("send_close: unexpected state {:?}", state), } } /// Set the stream state to reset locally. - pub fn set_reset(&mut self, reason: Reason) { - self.inner = Closed(Cause::LocallyReset(reason)); + pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { + self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); } /// Set the stream state to a scheduled reset. pub fn set_scheduled_reset(&mut self, reason: Reason) { debug_assert!(!self.is_closed()); - self.inner = Closed(Cause::Scheduled(reason)); + self.inner = Closed(Cause::ScheduledLibraryReset(reason)); } pub fn get_scheduled_reset(&self) -> Option { match self.inner { - Closed(Cause::Scheduled(reason)) => Some(reason), + Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), _ => None, } } pub fn is_scheduled_reset(&self) -> bool { match self.inner { - Closed(Cause::Scheduled(..)) => true, + Closed(Cause::ScheduledLibraryReset(..)) => true, _ => false, } } pub fn is_local_reset(&self) -> bool { match self.inner { - Closed(Cause::LocallyReset(_)) => true, - Closed(Cause::Scheduled(..)) => true, + Closed(Cause::Error(ref e)) => e.is_local(), + Closed(Cause::ScheduledLibraryReset(..)) => true, _ => false, } } @@ -436,10 +431,10 @@ impl State { pub fn ensure_recv_open(&self) -> Result { // TODO: Is this correct? match self.inner { - Closed(Cause::Proto(reason)) - | Closed(Cause::LocallyReset(reason)) - | Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)), - Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), + Closed(Cause::Error(ref e)) => Err(e.clone()), + Closed(Cause::ScheduledLibraryReset(reason)) => { + Err(proto::Error::library_go_away(reason)) + } Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), _ => Ok(true), } @@ -448,10 +443,10 @@ impl State { /// Returns a reason if the stream has been reset. pub(super) fn ensure_reason(&self, mode: PollReset) -> Result, crate::Error> { match self.inner { - Closed(Cause::Proto(reason)) - | Closed(Cause::LocallyReset(reason)) - | Closed(Cause::Scheduled(reason)) => Ok(Some(reason)), - Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()), + Closed(Cause::Error(Error::Reset(_, reason, _))) + | Closed(Cause::Error(Error::GoAway(_, reason, _))) + | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), + Closed(Cause::Error(ref e)) => Err(e.clone().into()), Open { local: Streaming, .. } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index e3e02c2f..1281b11b 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,9 +1,9 @@ use super::recv::RecvHeaderBlockError; use super::store::{self, Entry, Resolve, Store}; use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; -use crate::codec::{Codec, RecvError, SendError, UserError}; +use crate::codec::{Codec, SendError, UserError}; use crate::frame::{self, Frame, Reason}; -use crate::proto::{peer, Open, Peer, WindowSize}; +use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize}; use crate::{client, proto, server}; use bytes::{Buf, Bytes}; @@ -180,7 +180,7 @@ where me.poll_complete(&self.send_buffer, cx, dst) } - pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { + pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -198,7 +198,7 @@ where ) } - pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { + pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -297,30 +297,30 @@ where } impl DynStreams<'_, B> { - pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), RecvError> { + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_headers(self.peer, &self.send_buffer, frame) } - pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), RecvError> { + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_data(self.peer, &self.send_buffer, frame) } - pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), RecvError> { + pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_reset(&self.send_buffer, frame) } - /// Handle a received error and return the ID of the last processed stream. - pub fn recv_err(&mut self, err: &proto::Error) -> StreamId { + /// Notify all streams that a connection-level error happened. + pub fn handle_error(&mut self, err: proto::Error) -> StreamId { let mut me = self.inner.lock().unwrap(); - me.recv_err(&self.send_buffer, err) + me.handle_error(&self.send_buffer, err) } - pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), RecvError> { + pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_go_away(&self.send_buffer, frame) } @@ -329,12 +329,12 @@ impl DynStreams<'_, B> { self.inner.lock().unwrap().actions.recv.last_processed_id() } - pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> { + pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_window_update(&self.send_buffer, frame) } - pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), RecvError> { + pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); me.recv_push_promise(&self.send_buffer, frame) } @@ -375,7 +375,7 @@ impl Inner { peer: peer::Dyn, send_buffer: &SendBuffer, frame: frame::Headers, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); // The GOAWAY process has begun. All streams with a greater ID than @@ -405,10 +405,7 @@ impl Inner { "recv_headers for old stream={:?}, sending STREAM_CLOSED", id, ); - return Err(RecvError::Stream { - id, - reason: Reason::STREAM_CLOSED, - }); + return Err(Error::library_reset(id, Reason::STREAM_CLOSED)); } } @@ -471,10 +468,7 @@ impl Inner { Ok(()) } else { - Err(RecvError::Stream { - id: stream.id, - reason: Reason::REFUSED_STREAM, - }) + Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM)) } }, Err(RecvHeaderBlockError::State(err)) => Err(err), @@ -484,10 +478,7 @@ impl Inner { // Receiving trailers that don't set EOS is a "malformed" // message. Malformed messages are a stream error. proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id); - return Err(RecvError::Stream { - id: stream.id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); } actions.recv.recv_trailers(frame, stream) @@ -502,7 +493,7 @@ impl Inner { peer: peer::Dyn, send_buffer: &SendBuffer, frame: frame::Data, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); let stream = match self.store.find_mut(&id) { @@ -529,14 +520,11 @@ impl Inner { let sz = sz as WindowSize; self.actions.recv.ignore_data(sz)?; - return Err(RecvError::Stream { - id, - reason: Reason::STREAM_CLOSED, - }); + return Err(Error::library_reset(id, Reason::STREAM_CLOSED)); } proto_err!(conn: "recv_data: stream not found; id={:?}", id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } }; @@ -551,7 +539,7 @@ impl Inner { // Any stream error after receiving a DATA frame means // we won't give the data to the user, and so they can't // release the capacity. We do it automatically. - if let Err(RecvError::Stream { .. }) = res { + if let Err(Error::Reset(..)) = res { actions .recv .release_connection_capacity(sz as WindowSize, &mut None); @@ -564,12 +552,12 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: frame::Reset, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); if id.is_zero() { proto_err!(conn: "recv_reset: invalid stream ID 0"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } // The GOAWAY process has begun. All streams with a greater ID than @@ -589,7 +577,7 @@ impl Inner { // TODO: Are there other error cases? self.actions .ensure_not_idle(self.counts.peer(), id) - .map_err(RecvError::Connection)?; + .map_err(Error::library_go_away)?; return Ok(()); } @@ -602,7 +590,7 @@ impl Inner { self.counts.transition(stream, |counts, stream| { actions.recv.recv_reset(frame, stream); - actions.send.recv_err(send_buffer, stream, counts); + actions.send.handle_error(send_buffer, stream, counts); assert!(stream.state.is_closed()); Ok(()) }) @@ -612,7 +600,7 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: frame::WindowUpdate, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); let mut send_buffer = send_buffer.inner.lock().unwrap(); @@ -622,7 +610,7 @@ impl Inner { self.actions .send .recv_connection_window_update(frame, &mut self.store, &mut self.counts) - .map_err(RecvError::Connection)?; + .map_err(Error::library_go_away)?; } else { // The remote may send window updates for streams that the local now // considers closed. It's ok... @@ -640,14 +628,14 @@ impl Inner { } else { self.actions .ensure_not_idle(self.counts.peer(), id) - .map_err(RecvError::Connection)?; + .map_err(Error::library_go_away)?; } } Ok(()) } - fn recv_err(&mut self, send_buffer: &SendBuffer, err: &proto::Error) -> StreamId { + fn handle_error(&mut self, send_buffer: &SendBuffer, err: proto::Error) -> StreamId { let actions = &mut self.actions; let counts = &mut self.counts; let mut send_buffer = send_buffer.inner.lock().unwrap(); @@ -658,14 +646,14 @@ impl Inner { self.store .for_each(|stream| { counts.transition(stream, |counts, stream| { - actions.recv.recv_err(err, &mut *stream); - actions.send.recv_err(send_buffer, stream, counts); + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); Ok::<_, ()>(()) }) }) .unwrap(); - actions.conn_error = Some(err.shallow_clone()); + actions.conn_error = Some(err); last_processed_id } @@ -674,7 +662,7 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: &frame::GoAway, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let actions = &mut self.actions; let counts = &mut self.counts; let mut send_buffer = send_buffer.inner.lock().unwrap(); @@ -684,14 +672,14 @@ impl Inner { actions.send.recv_go_away(last_stream_id)?; - let err = frame.reason().into(); + let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason()); self.store .for_each(|stream| { if stream.id > last_stream_id { counts.transition(stream, |counts, stream| { - actions.recv.recv_err(&err, &mut *stream); - actions.send.recv_err(send_buffer, stream, counts); + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); Ok::<_, ()>(()) }) } else { @@ -709,7 +697,7 @@ impl Inner { &mut self, send_buffer: &SendBuffer, frame: frame::PushPromise, - ) -> Result<(), RecvError> { + ) -> Result<(), Error> { let id = frame.stream_id(); let promised_id = frame.promised_id(); @@ -733,7 +721,7 @@ impl Inner { } None => { proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); } }; @@ -826,7 +814,7 @@ impl Inner { // This handles resetting send state associated with the // stream - actions.send.recv_err(send_buffer, stream, counts); + actions.send.handle_error(send_buffer, stream, counts); Ok::<_, ()>(()) }) }) @@ -886,8 +874,13 @@ impl Inner { let stream = self.store.resolve(key); let mut send_buffer = send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - self.actions - .send_reset(stream, reason, &mut self.counts, send_buffer); + self.actions.send_reset( + stream, + reason, + Initiator::Library, + &mut self.counts, + send_buffer, + ); } } @@ -1060,7 +1053,7 @@ impl StreamRef { let send_buffer = &mut *send_buffer; me.actions - .send_reset(stream, reason, &mut me.counts, send_buffer); + .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer); } pub fn send_response( @@ -1468,12 +1461,19 @@ impl Actions { &mut self, stream: store::Ptr, reason: Reason, + initiator: Initiator, counts: &mut Counts, send_buffer: &mut Buffer>, ) { counts.transition(stream, |counts, stream| { - self.send - .send_reset(reason, send_buffer, stream, counts, &mut self.task); + self.send.send_reset( + reason, + initiator, + send_buffer, + stream, + counts, + &mut self.task, + ); self.recv.enqueue_reset_expiration(stream, counts); // if a RecvStream is parked, ensure it's notified stream.notify_recv(); @@ -1485,12 +1485,13 @@ impl Actions { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - res: Result<(), RecvError>, - ) -> Result<(), RecvError> { - if let Err(RecvError::Stream { reason, .. }) = res { + res: Result<(), Error>, + ) -> Result<(), Error> { + if let Err(Error::Reset(stream_id, reason, initiator)) = res { + debug_assert_eq!(stream_id, stream.id); // Reset the stream. self.send - .send_reset(reason, buffer, stream, counts, &mut self.task); + .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); Ok(()) } else { res @@ -1507,7 +1508,7 @@ impl Actions { fn ensure_no_conn_error(&self) -> Result<(), proto::Error> { if let Some(ref err) = self.conn_error { - Err(err.shallow_clone()) + Err(err.clone()) } else { Ok(()) } diff --git a/src/server.rs b/src/server.rs index f7131536..6662712d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -115,9 +115,9 @@ //! [`SendStream`]: ../struct.SendStream.html //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html -use crate::codec::{Codec, RecvError, UserError}; +use crate::codec::{Codec, UserError}; use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId}; -use crate::proto::{self, Config, Prioritized}; +use crate::proto::{self, Config, Error, Prioritized}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; use bytes::{Buf, Bytes}; @@ -1202,7 +1202,7 @@ where if &PREFACE[self.pos..self.pos + n] != buf.filled() { proto_err!(conn: "read_preface: invalid preface"); // TODO: Should this just write the GO_AWAY frame directly? - return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into())); + return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into())); } self.pos += n; @@ -1388,7 +1388,7 @@ impl proto::Peer for Peer { pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, - ) -> Result { + ) -> Result { use http::{uri, Version}; let mut b = Request::builder(); @@ -1396,10 +1396,7 @@ impl proto::Peer for Peer { macro_rules! malformed { ($($arg:tt)*) => {{ tracing::debug!($($arg)*); - return Err(RecvError::Stream { - id: stream_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); }} } @@ -1416,7 +1413,7 @@ impl proto::Peer for Peer { // Specifying :status for a request is a protocol error if pseudo.status.is_some() { tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } // Convert the URI @@ -1483,10 +1480,7 @@ impl proto::Peer for Peer { // TODO: Should there be more specialized handling for different // kinds of errors proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id); - return Err(RecvError::Stream { - id: stream_id, - reason: Reason::PROTOCOL_ERROR, - }); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); } }; diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 4f81de23..b5df9ad9 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -1,7 +1,8 @@ use crate::SendFrame; use h2::frame::{self, Frame}; -use h2::{self, RecvError, SendError}; +use h2::proto::Error; +use h2::{self, SendError}; use futures::future::poll_fn; use futures::{ready, Stream, StreamExt}; @@ -284,7 +285,7 @@ impl Handle { } impl Stream for Handle { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.codec).poll_next(cx) diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index b574df5a..23ddc1f3 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -410,7 +410,11 @@ async fn send_reset_notifies_recv_stream() { }; let rx = async { let mut body = res.into_body(); - body.next().await.unwrap().expect_err("RecvBody"); + let err = body.next().await.unwrap().expect_err("RecvBody"); + assert_eq!( + err.to_string(), + "stream error sent by user: refused stream before processing any application logic" + ); }; // a FuturesUnordered is used on purpose! @@ -672,7 +676,7 @@ async fn sending_request_on_closed_connection() { }; let poll_err = poll_fn(|cx| client.poll_ready(cx)).await.unwrap_err(); - let msg = "protocol error: unspecific protocol error detected"; + let msg = "connection error detected: unspecific protocol error detected"; assert_eq!(poll_err.to_string(), msg); let request = Request::builder() diff --git a/tests/h2-tests/tests/codec_read.rs b/tests/h2-tests/tests/codec_read.rs index fe3cfea9..d955e186 100644 --- a/tests/h2-tests/tests/codec_read.rs +++ b/tests/h2-tests/tests/codec_read.rs @@ -236,7 +236,7 @@ async fn read_goaway_with_debug_data() { let data = poll_frame!(GoAway, codec); assert_eq!(data.reason(), Reason::ENHANCE_YOUR_CALM); assert_eq!(data.last_stream_id(), 1); - assert_eq!(data.debug_data(), b"too_many_pings"); + assert_eq!(&**data.debug_data(), b"too_many_pings"); assert_closed!(codec); } diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 1b86cadb..be04a61b 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -217,7 +217,7 @@ async fn recv_data_overflows_connection_window() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: flow-control protocol violated" + "connection error detected: flow-control protocol violated" ); }; @@ -227,7 +227,7 @@ async fn recv_data_overflows_connection_window() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: flow-control protocol violated" + "connection error detected: flow-control protocol violated" ); }; join(conn, req).await; @@ -278,7 +278,7 @@ async fn recv_data_overflows_stream_window() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: flow-control protocol violated" + "stream error detected: flow-control protocol violated" ); }; @@ -358,7 +358,7 @@ async fn stream_error_release_connection_capacity() { .expect_err("body"); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "stream error detected: unspecific protocol error detected" ); cap.release_capacity(to_release).expect("release_capacity"); }; diff --git a/tests/h2-tests/tests/push_promise.rs b/tests/h2-tests/tests/push_promise.rs index a5a7dfe9..f52f781d 100644 --- a/tests/h2-tests/tests/push_promise.rs +++ b/tests/h2-tests/tests/push_promise.rs @@ -164,7 +164,7 @@ async fn recv_push_when_push_disabled_is_conn_error() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; @@ -174,7 +174,7 @@ async fn recv_push_when_push_disabled_is_conn_error() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; @@ -380,8 +380,16 @@ async fn recv_push_promise_skipped_stream_id() { .unwrap(); let req = async move { - let res = client.send_request(request, true).unwrap().0.await; - assert!(res.is_err()); + let err = client + .send_request(request, true) + .unwrap() + .0 + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "connection error detected: unspecific protocol error detected" + ); }; // client should see a protocol error @@ -390,7 +398,7 @@ async fn recv_push_promise_skipped_stream_id() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; @@ -435,7 +443,11 @@ async fn recv_push_promise_dup_stream_id() { let req = async move { let res = client.send_request(request, true).unwrap().0.await; - assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "connection error detected: unspecific protocol error detected" + ); }; // client should see a protocol error @@ -444,7 +456,7 @@ async fn recv_push_promise_dup_stream_id() { let err = res.unwrap_err(); assert_eq!( err.to_string(), - "protocol error: unspecific protocol error detected" + "connection error detected: unspecific protocol error detected" ); }; diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index cd2644d0..14b70f6d 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -207,13 +207,19 @@ async fn errors_if_recv_frame_exceeds_max_frame_size() { let body = resp.into_parts().1; let res = util::concat(body).await; let err = res.unwrap_err(); - assert_eq!(err.to_string(), "protocol error: frame with invalid size"); + assert_eq!( + err.to_string(), + "connection error detected: frame with invalid size" + ); }; // client should see a conn error let conn = async move { let err = h2.await.unwrap_err(); - assert_eq!(err.to_string(), "protocol error: frame with invalid size"); + assert_eq!( + err.to_string(), + "connection error detected: frame with invalid size" + ); }; join(conn, req).await; }; @@ -321,7 +327,10 @@ async fn recv_goaway_finishes_processed_streams() { // this request will trigger a goaway let req2 = async move { let err = client.get("https://example.com/").await.unwrap_err(); - assert_eq!(err.to_string(), "protocol error: not a result of an error"); + assert_eq!( + err.to_string(), + "go away from remote: not a result of an error" + ); }; join3(async move { h2.await.expect("client") }, req1, req2).await;