From c25db9b3f47267105b984cbed2bc86c7a6a4f0bc Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 7 Mar 2024 11:21:05 +0530 Subject: [PATCH] Added end2end test --- internal/transport/controlbuf.go | 6 -- internal/transport/http2_client.go | 37 ++----------- internal/transport/transport_test.go | 82 ---------------------------- test/goaway_test.go | 47 ++++++++++++++++ 4 files changed, 51 insertions(+), 121 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index cc55dae96a14..83c3829826ae 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -453,12 +453,6 @@ func (c *controlBuffer) finish() { c.mu.Unlock() } -func (c *controlBuffer) setError(err error) { - c.mu.Lock() - c.err = err - c.mu.Unlock() -} - type side int const ( diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 5cf837aac5c9..224feaf76a30 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -117,10 +117,6 @@ type http2Client struct { nextID uint32 registeredCompressors string - // goAwaySent is initialised with http2Client and fired when client - // transport is shutdown - goAwaySent *grpcsync.Event - // Do not access controlBuf with mu held. mu sync.Mutex // guard the following variables state transportState @@ -331,7 +327,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts registeredCompressors: grpcutil.RegisteredCompressors(), address: addr, conn: conn, - goAwaySent: grpcsync.NewEvent(), remoteAddr: conn.RemoteAddr(), localAddr: conn.LocalAddr(), authInfo: authInfo, @@ -456,12 +451,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } go func() { t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger) - t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler if err := t.loopy.run(); !isIOError(err) { // Immediately close the connection, as the loopy writer returns // when there are no more active streams and we were draining (the // server sent a GOAWAY). For I/O errors, the reader will hit it // after draining any remaining incoming data. + err = t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte("GOAWAY from client")) + if err != nil { + t.logger.Infof("Failed writing goaway %v", err) + } t.conn.Close() } close(t.writerDone) @@ -514,20 +512,6 @@ func (t *http2Client) getPeer() *peer.Peer { } } -// Handles outgoing GoAway and returns true if loopy needs to put itself -// in draining mode. -func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) { - // Send out a GOAWAY frame so server is aware of client transport shutdown - if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil { - return false, err - } - t.goAwaySent.Fire() - if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { - return false, err - } - return false, nil -} - func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { aud := t.createAudience(callHdr) ri := credentials.RequestInfo{ @@ -1003,19 +987,6 @@ func (t *http2Client) Close(err error) { } t.mu.Unlock() t.controlBuf.finish() - t.controlBuf.setError(nil) - err1 := t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("GOAWAY from client"), headsUp: true}) - if err1 != nil { - t.logger.Infof("Failed to put goaway to server: %v\n", err1) - } - timer := time.NewTimer(2 * time.Second) - defer timer.Stop() - select { - case <-t.goAwaySent.Done(): - case <-timer.C: - case <-t.ctx.Done(): - } - t.controlBuf.setError(ErrConnClosing) t.cancel() t.conn.Close() channelz.RemoveEntry(t.channelzID) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 768a91e0dc56..ff27678294f1 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -1188,88 +1188,6 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) { } -// Test that a transport level client shutdown successfully sends a GOAWAY frame -// to underlying connection. -func (s) TestClientSendsAGoAwayFrame(t *testing.T) { - // Create a server. - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening: %v", err) - } - defer lis.Close() - // The success channel verifies that the server's reader goroutine received - // a GOAWAY frame from the client. - success := testutils.NewChannel() - // Launch the server. - go func() { - sconn, err := lis.Accept() - if err != nil { - t.Errorf("Error while accepting: %v", err) - } - defer sconn.Close() - if _, err := io.ReadFull(sconn, make([]byte, len(clientPreface))); err != nil { - t.Errorf("Error while writing settings ack: %v", err) - return - } - sfr := http2.NewFramer(sconn, sconn) - if err := sfr.WriteSettings(); err != nil { - t.Errorf("Error while writing settings %v", err) - return - } - if err := sfr.WriteSettingsAck(); err != nil { - t.Errorf("Error while writing settings ack %v", err) - return - } - - // Read frames off the wire. Should expect to see a GOAWAY frame after - // the client closes. - for { - frame, err := sfr.ReadFrame() - if err != nil { - return - } - switch frame.(type) { - case *http2.SettingsFrame: - // Do nothing. A settings frame is expected from client preface. - case *http2.GoAwayFrame: - // Records that the server successfully received a GOAWAY frame. - success.Send(nil) - return - default: - // The client should have sent nothing but settings and GOAWAY frame. - success.Send(errors.New("the server received a frame other than settings or GOAWAY")) - return - } - } - }() - connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) - defer cancel() - - copts := ConnectOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)} - ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}) - if err != nil { - t.Fatalf("Error while creating client transport: %v", err) - } - callHdr := &CallHdr{ - Host: "localhost", - Method: "foo.Small", - } - s1, err1 := ct.NewStream(connectCtx, callHdr) - if err1 != nil { - t.Fatalf("failed to open stream: %v", err1) - } - if s1.id != 1 { - t.Fatalf("wrong stream id: %d", s1.id) - } - ct.Close(errors.New("manually closed by client")) - t.Logf("Closed the client connection") - if e, err := success.Receive(connectCtx); e != nil || err != nil { - t.Fatalf("Error in frame received: %v. Error receiving from channel: %v", e, err) - } else { - t.Logf("Server received the GOAWAY from client") - } -} - func (s) TestServerWithMisbehavedClient(t *testing.T) { server := setUpServerOnly(t, 0, &ServerConfig{}, suspended) defer server.stop() diff --git a/test/goaway_test.go b/test/goaway_test.go index 2a8ff0bfcc04..f18f5196862d 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -761,3 +761,50 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) { t.Fatalf("Error waiting for graceful shutdown of the server: %v", err) } } + +func (s) TestClientSendsAGoAwayFrame(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + defer lis.Close() + s := grpc.NewServer() + defer s.Stop() + go s.Serve(lis) + + conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + + st := newServerTesterFromConn(t, conn) + st.greet() + goAwayFrameReceived := testutils.NewChannel() + go func() { + for { + f, err := st.readFrame() + if err != nil { + return + } + switch f.(type) { + case *http2.GoAwayFrame: + t.Logf("Received GoAway Frame") + goAwayFrameReceived.Send(nil) + default: + t.Logf("server tester received unexpected frame type %T", f) + } + } + }() + go func() { + s.GracefulStop() + }() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := goAwayFrameReceived.Receive(ctx); err != nil { + t.Fatalf("Error sending GOAWAY from client: %v", err) + } + // Write Ping + st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9}) + // Close the conn to finish up the Shutdown process. + conn.Close() +}