Skip to content

Commit

Permalink
Fixed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Apr 8, 2024
1 parent ae48289 commit 5a14ee9
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
35 changes: 18 additions & 17 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ type incomingGoAway struct {
func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConnErr error // if set, loopyWriter will exit with this error
code http2.ErrCode
debugData []byte
headsUp bool
closeConn error // if set, loopyWriter will exit with this error
}

func (*goAway) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -495,21 +495,22 @@ type loopyWriter struct {
ssGoAwayHandler func(*goAway) (bool, error)
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
ssGoAwayHandler: goAwayHandler,
}
return l
}
Expand Down
15 changes: 7 additions & 8 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go t.reader(readerErrCh)
defer func() {
if err != nil {
// Close writerDone in case of error occurs
close(t.writerDone)
t.Close(err)
}
Expand Down Expand Up @@ -456,12 +457,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
// Block until the server preface is received successfully or an error occurs.
if err = <-readerErrCh; err != nil {
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
Expand Down Expand Up @@ -519,13 +520,13 @@ func (t *http2Client) getPeer() *peer.Peer {
}
}

// OutgoingGoAwayHandler writes GOAWAY to the connection. Always returns (false, err) as we want the GoAway
// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway
// to be the last frame loopy writes to the transport.
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
return false, g.closeConnErr
return false, g.closeConn
}

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
Expand Down Expand Up @@ -1004,10 +1005,8 @@ func (t *http2Client) Close(err error) {
t.mu.Unlock()
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY.
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(fmt.Sprintf("client shutdown with: %v", err)), closeConnErr: err})
if t.writerDone != nil {
<-t.writerDone
}
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(fmt.Sprintf("client shutdown with: %v", err)), closeConn: err})
<-t.writerDone
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelz.ID)
Expand Down
13 changes: 6 additions & 7 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)

go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
err := t.loopy.run()
close(t.loopyWriterDone)
if !isIOError(err) {
Expand Down Expand Up @@ -673,9 +672,9 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
// Any error processing client headers, e.g. invalid stream ID,
// is considered a protocol violation.
t.controlBuf.put(&goAway{
code: http2.ErrCodeProtocol,
debugData: []byte(err.Error()),
closeConnErr: err,
code: http2.ErrCodeProtocol,
debugData: []byte(err.Error()),
closeConn: err,
})
continue
}
Expand Down Expand Up @@ -920,7 +919,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {

if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConnErr: errors.New("got too many pings from the client")})
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
}
}

Expand Down Expand Up @@ -1350,7 +1349,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
// Stop accepting more streams now.
t.state = draining
sid := t.maxStreamID
retErr := g.closeConnErr
retErr := g.closeConn
if len(t.activeStreams) == 0 {
retErr = errors.New("second GOAWAY written and no active streams left to process")
}
Expand Down
17 changes: 10 additions & 7 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2677,10 +2677,8 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
defer lis.Close()
// greetDone is used to notify when server is done greeting the client.
greetDone := make(chan struct{})
// successCh verifies that GOAWAY is received at server side
successCh := make(chan struct{})
// errorCh verifies that desired GOAWAY not received by server
errorCh := make(chan struct{})
errorCh := make(chan error)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Launch the server.
Expand Down Expand Up @@ -2723,14 +2721,16 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
goAwayFrame := fr
if goAwayFrame.ErrCode == http2.ErrCodeNo {
t.Logf("Received goAway frame from client")
close(successCh)
close(errorCh)
} else {
t.Logf("Received unexpected goAway frame from client")
errorCh <- errors.New("received unexpected goAway frame from client")
close(errorCh)
}
return
default:
t.Logf("The server received a frame other than GOAWAY")
errorCh <- errors.New("server received a frame other than GOAWAY")
close(errorCh)
return
}
Expand All @@ -2749,10 +2749,13 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
ct.Close(errors.New("manually closed by client"))
t.Logf("Closed the client connection")
select {
case <-successCh:
case <-errorCh:
t.Errorf("Received an unexpected frame")
case err = <-errorCh:
case <-ctx.Done():
t.Errorf("Timed out")
}
if err != nil {
t.Errorf("Error receiving the GOAWAY frame: %v", err)
} else {
t.Logf("Received a GOAWAY frame from client")
}
}

0 comments on commit 5a14ee9

Please sign in to comment.