Skip to content

Commit

Permalink
fix(codec): Cancelled client streaming handling (#1315)
Browse files Browse the repository at this point in the history
This PR fixes how client side streaming is handled on the server side
and improves overall source error matching.

Fixes:

- Correctly, detect h2 codes when its wrapped in a hyper error.
- Cancelled requests from the client side during client streaming
  requests correctly return EOF (`None` from `Streaming::message()`)

Closes #848
  • Loading branch information
LucioFranco authored Mar 14, 2023
1 parent dcb0026 commit c8027a1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 7 deletions.
6 changes: 5 additions & 1 deletion tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ enum State {
Error,
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
enum Direction {
Request,
Response(StatusCode),
Expand Down Expand Up @@ -232,6 +232,10 @@ impl StreamingInner {
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
if self.direction == Direction::Request && e.code() == Code::Cancelled {
return Poll::Ready(Ok(None));
}

let _ = std::mem::replace(&mut self.state, State::Error);
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
Expand Down
25 changes: 19 additions & 6 deletions tonic/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,17 @@ impl Status {
// FIXME: bubble this into `transport` and expose generic http2 reasons.
#[cfg(feature = "transport")]
fn from_h2_error(err: Box<h2::Error>) -> Status {
let code = Self::code_from_h2(&err);

let mut status = Self::new(code, format!("h2 protocol error: {}", err));
status.source = Some(Arc::new(*err));
status
}

#[cfg(feature = "transport")]
fn code_from_h2(err: &h2::Error) -> Code {
// See https://github.com/grpc/grpc/blob/3977c30/doc/PROTOCOL-HTTP2.md#errors
let code = match err.reason() {
match err.reason() {
Some(h2::Reason::NO_ERROR)
| Some(h2::Reason::PROTOCOL_ERROR)
| Some(h2::Reason::INTERNAL_ERROR)
Expand All @@ -376,11 +385,7 @@ impl Status {
Some(h2::Reason::INADEQUATE_SECURITY) => Code::PermissionDenied,

_ => Code::Unknown,
};

let mut status = Self::new(code, format!("h2 protocol error: {}", err));
status.source = Some(Arc::new(*err));
status
}
}

#[cfg(feature = "transport")]
Expand Down Expand Up @@ -416,6 +421,14 @@ impl Status {
if err.is_timeout() || err.is_connect() {
return Some(Status::unavailable(err.to_string()));
}

if let Some(h2_err) = err.source().and_then(|e| e.downcast_ref::<h2::Error>()) {
let code = Status::code_from_h2(&h2_err);
let status = Self::new(code, format!("h2 protocol error: {}", err));

return Some(status);
}

None
}

Expand Down

0 comments on commit c8027a1

Please sign in to comment.