Skip to content

Commit

Permalink
bug: cancelling a stream blocks when http2 is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverpool committed Dec 4, 2024
1 parent 7dc3e6d commit 7bba305
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
81 changes: 81 additions & 0 deletions connect_ext_close_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package connect_test

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

connect "connectrpc.com/connect"
"connectrpc.com/connect/internal/assert"
pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

func TestClientStream_CancelContext(t *testing.T) {
t.Run("HTTP2 disabled", func(t *testing.T) {
testClientStream_CancelContext(t, false)
})
t.Run("HTTP2 enabled", func(t *testing.T) {
testClientStream_CancelContext(t, true)
})
}

func testClientStream_CancelContext(t *testing.T, enableHTTP2 bool) {
mux := http.NewServeMux()
mux.Handle(pingv1connect.NewPingServiceHandler(pingServer{
delayCountUp: 3 * time.Second,
}))

s := httptest.NewUnstartedServer(mux)
s.EnableHTTP2 = enableHTTP2
s.StartTLS()

client := pingv1connect.NewPingServiceClient(
s.Client(),
s.URL,
)

ctx, cancel := context.WithCancel(context.Background())
stream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{
Number: 100,
}))

assert.Nil(t, err)

msg := make(chan int64)
go func() {
for stream.Receive() {
select {
case msg <- stream.Msg().Number:
default:
}
}
close(msg)
}()

assert.Equal(t, <-msg, 1)

closed := make(chan struct{})
go func() {
t.Log("will close stream")
t.Log("stream closed:", stream.Close())
close(closed)
}()

time.Sleep(10 * time.Millisecond) // delay to ensure that stream.Close has already been called
cancel()

select {
case <-closed:
case <-time.After(time.Second):
t.Error("stream was not closed within 1s")
}
select {
case _, ok := <-msg:
assert.False(t, ok)
case <-time.After(time.Second):
t.Error("stream was not done receiving within 1s")
}
}
2 changes: 2 additions & 0 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2813,6 +2813,7 @@ type pingServer struct {

checkMetadata bool
includeErrorDetails bool
delayCountUp time.Duration
}

func (p pingServer) Ping(ctx context.Context, request *connect.Request[pingv1.PingRequest]) (*connect.Response[pingv1.PingResponse], error) {
Expand Down Expand Up @@ -2913,6 +2914,7 @@ func (p pingServer) CountUp(
if err := stream.Send(&pingv1.CountUpResponse{Number: i}); err != nil {
return err
}
time.Sleep(p.delayCountUp)
}
return nil
}
Expand Down

0 comments on commit 7bba305

Please sign in to comment.