Skip to content

Commit

Permalink
Undo refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Jul 10, 2024
1 parent 32a78bb commit 89c7299
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 40 deletions.
22 changes: 11 additions & 11 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import (
// atomically.
var clientConnectionCounter uint64

var GoAwayLoopyWriterTimeout = time.Second
var goAwayLoopyWriterTimeout = 5 * time.Second

var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))

Expand Down Expand Up @@ -982,7 +982,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
}

// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// only once on transport. Once it is called, the transport should not be
// accessed anymore.
func (t *http2Client) Close(err error) {
t.mu.Lock()
Expand All @@ -1009,24 +1009,25 @@ func (t *http2Client) Close(err error) {
}
t.mu.Unlock()
var st *status.Status
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the connection.
// See https://httpwg.org/specs/rfc7540.html#GOAWAY. It also waits for loopyWriter to
// be closed with a timer to avoid the indefinite blocking.
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It
// also waits for loopyWriter to be closed with a timer to avoid the
// long blocking in case the connection is half closed.
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
timer := time.NewTimer(GoAwayLoopyWriterTimeout)
select {
case <-t.writerDone:
// Append info about previous goaway's if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
// Append info about previous goaway's if there were any, since this
// may be important for understanding the root cause for this
// connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
if len(goAwayDebugMessage) > 0 {
st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
err = st.Err()
} else {
st = status.New(codes.Unavailable, err.Error())
}
case <-timer.C:
t.logger.Warningf("timeout waiting for the loopy writer to be closed.")
case <-time.After(goAwayLoopyWriterTimeout):
t.logger.Warningf("Failed to write a GOAWAY frame as part of connection close after %v. Giving up and closing the transport.", goAwayLoopyWriterTimeout)
}
t.cancel()
t.conn.Close()
Expand All @@ -1042,7 +1043,6 @@ func (t *http2Client) Close(err error) {
}
sh.HandleConn(t.ctx, connEnd)
}
t.logger.Infof("Closed the client connection")
}

// GracefulClose sets the state to draining, which prevents new streams from
Expand Down
137 changes: 108 additions & 29 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
TransportCredentials: creds,
ChannelzParent: channelzSubChannel(t),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
tr, err := NewClientTransport(ctx, ctx, addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down Expand Up @@ -2465,7 +2465,7 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {
Dialer: dialer,
ChannelzParent: channelzSubChannel(t),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
tr, err := NewClientTransport(ctx, ctx, addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down Expand Up @@ -2656,33 +2656,10 @@ func TestConnectionError_Unwrap(t *testing.T) {
}
}

// TestClientSendsAGoAwayFrame verifies that in the event of a graceful client transport shutdown, i.e.,
// Test that in the event of a graceful client transport shutdown, i.e.,
// clientTransport.Close(), client sends a goaway to the server with the correct
// error code and debug data.
func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
errorCh := createClientServerLoggingGoAway(t)
err := <-errorCh
if err != nil {
t.Errorf("Error receiving the GOAWAY frame: %v", err)
}
}

// TestClientCloseTimeoutOnHang verifies that in the event of a graceful
// client transport shutdown, i.e., clientTransport.Close(), if the conn hung
// for LoopyWriterTimeout, client should still be close itself and should
// not wait for long.
func (s) TestClientCloseTimeoutOnHang(t *testing.T) {
origGoAwayLoopyTimeout := GoAwayLoopyWriterTimeout
GoAwayLoopyWriterTimeout = 0
defer func() {
GoAwayLoopyWriterTimeout = origGoAwayLoopyTimeout
}()
createClientServerLoggingGoAway(t)
}

// createClientServerLoggingGoAway sets up a server(that expects a GOAWAY frame
// from the client.), and creates a ClientTransport .
func createClientServerLoggingGoAway(t *testing.T) chan error {
// Create a server.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
Expand All @@ -2697,7 +2674,6 @@ func createClientServerLoggingGoAway(t *testing.T) chan error {
defer cancel()
// Launch the server.
go func() {
defer close(errorCh)
sconn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting: %v", err)
Expand Down Expand Up @@ -2736,17 +2712,20 @@ func createClientServerLoggingGoAway(t *testing.T) chan error {
goAwayFrame := fr
if goAwayFrame.ErrCode == http2.ErrCodeNo {
t.Logf("Received goAway frame from client")
close(errorCh)
} else {
errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err)
close(errorCh)
}
return
default:
errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err)
close(errorCh)
return
}
}()

ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {})
ct, err := NewClientTransport(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
Expand All @@ -2757,10 +2736,110 @@ func createClientServerLoggingGoAway(t *testing.T) chan error {
// Wait until server receives the headers and settings frame as part of greet.
<-greetDone
ct.Close(errors.New("manually closed by client"))
t.Logf("Closed the client connection")
select {
case err := <-errorCh:
if err != nil {
t.Errorf("Error receiving the GOAWAY frame: %v", err)
}
case <-ctx.Done():
t.Errorf("Context timed out")
}
}

// Test that in the event of a graceful client transport shutdown
// , i.e., clientTransport.Close(), if the conn hung for
// LoopyWriterTimeout, client should still be close itself and should
// not wait for long.
func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) {
// Override goAwayLoopyWriterTimeout to 0 so that we always
// time out while writing GOAWAY on client.Close(). This is
// equivalent to network hang scenario when client is
// failing to write GOAWAY frame.
origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout
goAwayLoopyWriterTimeout = 0
defer func() {
goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout
}()

// Create a server.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening: %v", err)
}
defer lis.Close()
// serverGreetingDone is used to notify when server is done greeting the client.
serverGreetingDone := make(chan struct{})
// errorCh verifies that desired GOAWAY not received by server
errorCh := make(chan error, 1)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Launch the server.
go func() {
defer close(errorCh)
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting: %v", err)
}
defer conn.Close()
if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil {
t.Errorf("Error while reading client preface: %v", err)
return
}
framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(); err != nil {
t.Errorf("Error while writing settings %v", err)
return
}
fr, _ := framer.ReadFrame()
if _, ok := fr.(*http2.SettingsFrame); !ok {
t.Errorf("Expected settings frame, got %T", fr)
}
fr, _ = framer.ReadFrame()
if fr, ok := fr.(*http2.SettingsFrame); !ok || !fr.IsAck() {
t.Errorf("Expected settings ACK frame, got %T", fr)
}
fr, _ = framer.ReadFrame()
if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndHeaders) {
t.Errorf("Expected Headers frame with END_HEADERS frame, got %T", fr)
}
close(serverGreetingDone)

frame, err := framer.ReadFrame()
if err != nil {
return
}
switch fr := frame.(type) {
case *http2.GoAwayFrame:
if fr.ErrCode == http2.ErrCodeNo {
t.Logf("Received goAway frame from client")
return
}
errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err)
default:
errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err)
return
}
}()

ct, err := NewClientTransport(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
_, err = ct.NewStream(ctx, &CallHdr{})
if err != nil {
t.Fatalf("failed to open stream: %v", err)
}
// Wait until server receives the headers and settings frame as part of greet.
<-serverGreetingDone
// ct.Close will try to send the GOAWAY to server and will fail writing
// GOAWAY and will eventually close the loopyWriter as
// goAwayLoopyWriterTimeout (time out for writing GOAWAY) is set to zero, which is
// equivalent to network hang scenario.
ct.Close(errors.New("manually closed by client"))
select {
case <-errorCh:
case <-ctx.Done():
t.Errorf("Context timed out")
}
return errorCh
}

0 comments on commit 89c7299

Please sign in to comment.