From 1a055764a1f8092bb2db57b491cf674c44672129 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Mon, 8 Apr 2024 15:39:23 +0530 Subject: [PATCH] Fixed comments --- internal/transport/controlbuf.go | 35 ++++++++++++++-------------- internal/transport/http2_client.go | 17 +++++++------- internal/transport/http2_server.go | 13 +++++------ internal/transport/transport_test.go | 15 ++++++------ 4 files changed, 40 insertions(+), 40 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 047cacb99ae8..ce8fb90655e8 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -190,10 +190,10 @@ type incomingGoAway struct { func (*incomingGoAway) isTransportResponseFrame() bool { return false } type goAway struct { - code http2.ErrCode - debugData []byte - headsUp bool - closeConnErr error // if set, loopyWriter will exit with this error + code http2.ErrCode + debugData []byte + headsUp bool + closeConn error // if set, loopyWriter will exit with this error } func (*goAway) isTransportResponseFrame() bool { return false } @@ -495,21 +495,22 @@ type loopyWriter struct { ssGoAwayHandler func(*goAway) (bool, error) } -func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter { +func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter { var buf bytes.Buffer l := &loopyWriter{ - side: s, - cbuf: cbuf, - sendQuota: defaultWindowSize, - oiws: defaultWindowSize, - estdStreams: make(map[uint32]*outStream), - activeStreams: newOutStreamList(), - framer: fr, - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - bdpEst: bdpEst, - conn: conn, - logger: logger, + side: s, + cbuf: cbuf, + sendQuota: defaultWindowSize, + oiws: defaultWindowSize, + estdStreams: make(map[uint32]*outStream), + activeStreams: newOutStreamList(), + framer: fr, + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + bdpEst: bdpEst, + conn: conn, + logger: logger, + ssGoAwayHandler: goAwayHandler, } return l } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index f05ab7f1b297..1df8808cfd35 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -409,6 +409,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts go t.reader(readerErrCh) defer func() { if err != nil { + // Close writerDone in case of error occurs close(t.writerDone) t.Close(err) } @@ -456,12 +457,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts if err := t.framer.writer.Flush(); err != nil { return nil, err } + // Block until the server preface is received successfully or an error occurs. if err = <-readerErrCh; err != nil { return nil, err } go func() { - t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger) - t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler + t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler) if err := t.loopy.run(); !isIOError(err) { // Immediately close the connection, as the loopy writer returns // when there are no more active streams and we were draining (the @@ -519,13 +520,13 @@ func (t *http2Client) getPeer() *peer.Peer { } } -// OutgoingGoAwayHandler writes GOAWAY to the connection. Always returns (false, err) as we want the GoAway +// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway // to be the last frame loopy writes to the transport. func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) { - if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil { + if err := t.framer.fr.WriteGoAway(MaxStreamID, http2.ErrCodeNo, g.debugData); err != nil { return false, err } - return false, g.closeConnErr + return false, g.closeConn } func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { @@ -1004,10 +1005,8 @@ func (t *http2Client) Close(err error) { t.mu.Unlock() // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. - t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(fmt.Sprintf("client shutdown with: %v", err)), closeConnErr: err}) - if t.writerDone != nil { - <-t.writerDone - } + t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(fmt.Sprintf("client shutdown with: %v", err)), closeConn: err}) + <-t.writerDone t.cancel() t.conn.Close() channelz.RemoveEntry(t.channelz.ID) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 19832a40a653..8a554ce408bf 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -330,8 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, t.handleSettings(sf) go func() { - t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger) - t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler + t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler) err := t.loopy.run() close(t.loopyWriterDone) if !isIOError(err) { @@ -673,9 +672,9 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) { // Any error processing client headers, e.g. invalid stream ID, // is considered a protocol violation. t.controlBuf.put(&goAway{ - code: http2.ErrCodeProtocol, - debugData: []byte(err.Error()), - closeConnErr: err, + code: http2.ErrCodeProtocol, + debugData: []byte(err.Error()), + closeConn: err, }) continue } @@ -920,7 +919,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. - t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConnErr: errors.New("got too many pings from the client")}) + t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")}) } } @@ -1350,7 +1349,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { // Stop accepting more streams now. t.state = draining sid := t.maxStreamID - retErr := g.closeConnErr + retErr := g.closeConn if len(t.activeStreams) == 0 { retErr = errors.New("second GOAWAY written and no active streams left to process") } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 1eb96eea8613..f367dfc23b78 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2677,10 +2677,8 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { defer lis.Close() // greetDone is used to notify when server is done greeting the client. greetDone := make(chan struct{}) - // successCh verifies that GOAWAY is received at server side - successCh := make(chan struct{}) // errorCh verifies that desired GOAWAY not received by server - errorCh := make(chan struct{}) + errorCh := make(chan error) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Launch the server. @@ -2723,14 +2721,16 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { goAwayFrame := fr if goAwayFrame.ErrCode == http2.ErrCodeNo { t.Logf("Received goAway frame from client") - close(successCh) + close(errorCh) } else { t.Logf("Received unexpected goAway frame from client") + errorCh <- errors.New("received unexpected goAway frame from client") close(errorCh) } return default: t.Logf("The server received a frame other than GOAWAY") + errorCh <- errors.New("server received a frame other than GOAWAY") close(errorCh) return } @@ -2749,10 +2749,11 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { ct.Close(errors.New("manually closed by client")) t.Logf("Closed the client connection") select { - case <-successCh: - case <-errorCh: - t.Errorf("Received an unexpected frame") + case err = <-errorCh: + t.Errorf("Error receiving the GOAWAY frame: %v", err) case <-ctx.Done(): t.Errorf("Timed out") + default: + t.Logf("Received a GOAWAY frame from client") } }