Skip to content

Commit

Permalink
Make stream client closers non-blocking (#791)
Browse files Browse the repository at this point in the history
This updates the behavior of the streaming client methods
`BidiStreamForClient.CloseResponse` and `ServerStreamForClient.Close` to
be non-blocking, aligning it with the standard behavior of `net/http`'s
`Request.Body` closure.

Previously, the implementation used a graceful, blocking closure that
fully read from the stream before closing. This allows for reuse of the
underlying TCP connection. However, this behavior could lead to
unexpected client hangs, as users may not anticipate blocking on close.

To address this, the closers no longer drain the stream. Documentation
has been updated to clarify the behavior and provide users a workaround
to keep the optimization by calling receive until the stream is drained.
This avoids unexpected blocking behavior in client applications.

Fixes #789

Signed-off-by: Edward McFarlane <[email protected]>
  • Loading branch information
emcfarlane authored Dec 5, 2024
1 parent 7dc3e6d commit d55ebd8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
9 changes: 9 additions & 0 deletions client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header {
}

// Close the receive side of the stream.
//
// Close is non-blocking. To gracefully close the stream and allow for
// connection resuse ensure all messages have been received before calling
// Close. All messages are received when Receive returns false.
func (s *ServerStreamForClient[Res]) Close() error {
if s.constructErr != nil {
return s.constructErr
Expand Down Expand Up @@ -251,6 +255,11 @@ func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error) {
}

// CloseResponse closes the receive side of the stream.
//
// CloseResponse is non-blocking. To gracefully close the stream and allow for
// connection resuse ensure all messages have been received before calling
// CloseResponse. All messages are received when Receive returns an error
// wrapping [io.EOF].
func (b *BidiStreamForClient[Req, Res]) CloseResponse() error {
if b.err != nil {
return b.err
Expand Down
8 changes: 1 addition & 7 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,7 @@ func (d *duplexHTTPCall) CloseRead() error {
if d.response == nil {
return nil
}
_, err := discard(d.response.Body)
closeErr := d.response.Body.Close()
if err == nil ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
err = closeErr
}
err := d.response.Body.Close()
err = wrapIfContextDone(d.ctx, err)
return wrapIfRSTError(err)
}
Expand Down

0 comments on commit d55ebd8

Please sign in to comment.