http2: block RoundTrip when the Transport hits MaxConcurrentStreams
Currently if the http2.Transport hits SettingsMaxConcurrentStreams for a
server, it just makes a new TCP connection and creates the stream on the
new connection. This CL updates that behavior to instead block RoundTrip
until a new stream is available.
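
For illustration only (not part of this CL): a client that issues more concurrent requests than the server's SETTINGS_MAX_CONCURRENT_STREAMS now sees the excess RoundTrip calls block until a stream slot frees up, instead of spilling onto extra TCP connections. The URL and the assumed server limit of 10 are hypothetical.

package main

import (
	"fmt"
	"net/http"
	"sync"

	"golang.org/x/net/http2"
)

func main() {
	// One http2.Transport, hence (normally) one TCP connection per host.
	client := &http.Client{Transport: &http2.Transport{}}

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ { // more requests than the server's assumed limit of 10
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			resp, err := client.Get("https://example.com/") // hypothetical endpoint
			if err != nil {
				fmt.Println(i, err)
				return
			}
			resp.Body.Close()
		}(i)
	}
	wg.Wait()
}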

I also fixed a second bug, which was necessary to make some tests pass:
Previously, a stream was removed from cc.streams only if either (a) we
received END_STREAM from the server, or (b) we received RST_STREAM from
the server. This CL removes a stream from cc.streams if the request was
canceled (via ctx.Done, req.Cancel, or resp.Body.Close) before
receiving END_STREAM or RST_STREAM from the server.
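
A hedged sketch of that second fix from the caller's point of view: canceling a request before the server ends the stream (here by letting its context time out) now also removes the stream from cc.streams, so another RoundTrip blocked on the concurrency limit can proceed. The URL is illustrative.

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"golang.org/x/net/http2"
)

func main() {
	client := &http.Client{Transport: &http2.Transport{}}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	req, err := http.NewRequest("GET", "https://example.com/slow", nil) // hypothetical URL
	if err != nil {
		log.Fatal(err)
	}

	resp, err := client.Do(req.WithContext(ctx))
	if err != nil {
		// If the context expires first, the Transport resets the stream and,
		// with this CL, also forgets it, freeing a concurrency slot.
		log.Println("request canceled:", err)
		return
	}
	defer resp.Body.Close()
}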

Updates golang/go#13774
Updates golang/go#20985
Updates golang/go#21229

Change-Id: I660ffd724c4c513e0f1cc587b404bedb8aff80be
Reviewed-on: https://go-review.googlesource.com/53250
Run-TryBot: Tom Bergan <[email protected]>
TryBot-Result: Gobot Gobot <[email protected]>
Reviewed-by: Brad Fitzpatrick <[email protected]>
tombergan committed Aug 9, 2017
1 parent 167db31 commit 1c05540
Showing 2 changed files with 249 additions and 26 deletions.
102 changes: 78 additions & 24 deletions http2/transport.go
@@ -165,6 +165,7 @@ type ClientConn struct {
 	goAwayDebug     string                   // goAway frame's debug data, retained as a string
 	streams         map[uint32]*clientStream // client-initiated
 	nextStreamID    uint32
+	pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
 	pings           map[[8]byte]chan struct{} // in flight ping data to notification channel
 	bw              *bufio.Writer
 	br              *bufio.Reader
@@ -217,35 +218,45 @@ type clientStream struct {
 	resTrailer *http.Header // client's Response.Trailer
 }
 
-// awaitRequestCancel runs in its own goroutine and waits for the user
-// to cancel a RoundTrip request, its context to expire, or for the
-// request to be done (any way it might be removed from the cc.streams
-// map: peer reset, successful completion, TCP connection breakage,
-// etc)
-func (cs *clientStream) awaitRequestCancel(req *http.Request) {
+// awaitRequestCancel waits for the user to cancel a request or for the done
+// channel to be signaled. A non-nil error is returned only if the request was
+// canceled.
+func awaitRequestCancel(req *http.Request, done <-chan struct{}) error {
 	ctx := reqContext(req)
 	if req.Cancel == nil && ctx.Done() == nil {
-		return
+		return nil
 	}
 	select {
 	case <-req.Cancel:
-		cs.cancelStream()
-		cs.bufPipe.CloseWithError(errRequestCanceled)
+		return errRequestCanceled
 	case <-ctx.Done():
+		return ctx.Err()
+	case <-done:
+		return nil
+	}
+}
+
+// awaitRequestCancel waits for the user to cancel a request, its context to
+// expire, or for the request to be done (any way it might be removed from the
+// cc.streams map: peer reset, successful completion, TCP connection breakage,
+// etc). If the request is canceled, then cs will be canceled and closed.
+func (cs *clientStream) awaitRequestCancel(req *http.Request) {
+	if err := awaitRequestCancel(req, cs.done); err != nil {
 		cs.cancelStream()
-		cs.bufPipe.CloseWithError(ctx.Err())
-	case <-cs.done:
+		cs.bufPipe.CloseWithError(err)
 	}
 }
 
 func (cs *clientStream) cancelStream() {
-	cs.cc.mu.Lock()
+	cc := cs.cc
+	cc.mu.Lock()
 	didReset := cs.didReset
 	cs.didReset = true
-	cs.cc.mu.Unlock()
+	cc.mu.Unlock()
 
 	if !didReset {
-		cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+		cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+		cc.forgetStreamID(cs.ID)
 	}
 }
 
@@ -594,6 +605,8 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
 	}
 }
 
+// CanTakeNewRequest reports whether the connection can take a new request,
+// meaning it has not been closed or received or sent a GOAWAY.
 func (cc *ClientConn) CanTakeNewRequest() bool {
 	cc.mu.Lock()
 	defer cc.mu.Unlock()
@@ -605,8 +618,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
 		return false
 	}
 	return cc.goAway == nil && !cc.closed &&
-		int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
-		cc.nextStreamID < math.MaxInt32
+		int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
 }
 
 // onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -752,10 +764,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 	hasTrailers := trailers != ""
 
 	cc.mu.Lock()
-	cc.lastActive = time.Now()
-	if cc.closed || !cc.canTakeNewRequestLocked() {
+	if err := cc.awaitOpenSlotForRequest(req); err != nil {
 		cc.mu.Unlock()
-		return nil, errClientConnUnusable
+		return nil, err
 	}
 
 	body := req.Body
@@ -869,31 +880,31 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 		case re := <-readLoopResCh:
 			return handleReadLoopResponse(re)
 		case <-respHeaderTimer:
+			cc.forgetStreamID(cs.ID)
 			if !hasBody || bodyWritten {
 				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
 			} else {
 				bodyWriter.cancel()
 				cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
 			}
-			cc.forgetStreamID(cs.ID)
 			return nil, errTimeout
 		case <-ctx.Done():
+			cc.forgetStreamID(cs.ID)
 			if !hasBody || bodyWritten {
 				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
 			} else {
 				bodyWriter.cancel()
 				cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
 			}
-			cc.forgetStreamID(cs.ID)
 			return nil, ctx.Err()
 		case <-req.Cancel:
+			cc.forgetStreamID(cs.ID)
 			if !hasBody || bodyWritten {
 				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
 			} else {
 				bodyWriter.cancel()
 				cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
 			}
-			cc.forgetStreamID(cs.ID)
 			return nil, errRequestCanceled
 		case <-cs.peerReset:
 			// processResetStream already removed the
@@ -920,6 +931,45 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 	}
 }
 
+// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
+// Must hold cc.mu.
+func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
+	var waitingForConn chan struct{}
+	var waitingForConnErr error // guarded by cc.mu
+	for {
+		cc.lastActive = time.Now()
+		if cc.closed || !cc.canTakeNewRequestLocked() {
+			return errClientConnUnusable
+		}
+		if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
+			if waitingForConn != nil {
+				close(waitingForConn)
+			}
+			return nil
+		}
+		// Unfortunately, we cannot wait on a condition variable and channel at
+		// the same time, so instead, we spin up a goroutine to check if the
+		// request is canceled while we wait for a slot to open in the connection.
+		if waitingForConn == nil {
+			waitingForConn = make(chan struct{})
+			go func() {
+				if err := awaitRequestCancel(req, waitingForConn); err != nil {
+					cc.mu.Lock()
+					waitingForConnErr = err
+					cc.cond.Broadcast()
+					cc.mu.Unlock()
+				}
+			}()
+		}
+		cc.pendingRequests++
+		cc.cond.Wait()
+		cc.pendingRequests--
+		if waitingForConnErr != nil {
+			return waitingForConnErr
+		}
+	}
+}
+
 // requires cc.wmu be held
 func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
 	first := true // first frame written (HEADERS is first, then CONTINUATION)
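
The comment inside awaitOpenSlotForRequest above points at a general Go limitation: a sync.Cond cannot be selected on together with a channel, so a helper goroutine converts the cancellation channel into a Broadcast. A standalone sketch of that bridging pattern follows; the names (slotPool, acquire, release) are illustrative and not from this CL.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// slotPool is an illustrative stand-in for ClientConn's stream bookkeeping.
type slotPool struct {
	mu       sync.Mutex
	cond     *sync.Cond
	inUse    int
	max      int
	canceled bool
}

func newSlotPool(max int) *slotPool {
	p := &slotPool{max: max}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// acquire blocks until a slot is free or cancel is closed. A helper goroutine
// bridges the channel into the condition variable, mirroring how the CL pairs
// awaitRequestCancel with cc.cond.Broadcast.
func (p *slotPool) acquire(cancel <-chan struct{}) error {
	go func() {
		<-cancel // a real implementation would also stop waiting once acquire returns
		p.mu.Lock()
		p.canceled = true
		p.cond.Broadcast()
		p.mu.Unlock()
	}()

	p.mu.Lock()
	defer p.mu.Unlock()
	for p.inUse >= p.max {
		if p.canceled {
			return errors.New("canceled while waiting for a slot")
		}
		p.cond.Wait()
	}
	p.inUse++
	return nil
}

func (p *slotPool) release() {
	p.mu.Lock()
	p.inUse--
	p.cond.Broadcast()
	p.mu.Unlock()
}

func main() {
	p := newSlotPool(1)
	_ = p.acquire(make(chan struct{})) // takes the only slot

	cancel := make(chan struct{})
	time.AfterFunc(100*time.Millisecond, func() { close(cancel) })
	fmt.Println(p.acquire(cancel)) // blocks, then reports the cancellation error
}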
@@ -1279,7 +1329,9 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
 			cc.idleTimer.Reset(cc.idleTimeout)
 		}
 		close(cs.done)
-		cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
+		// Wake up checkResetOrDone via clientStream.awaitFlowControl and
+		// wake up RoundTrip if there is a pending request.
+		cc.cond.Broadcast()
 	}
 	return cs
 }
@@ -1378,8 +1430,9 @@ func (rl *clientConnReadLoop) run() error {
 			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
 		}
 		if se, ok := err.(StreamError); ok {
-			if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
+			if cs := cc.streamByID(se.StreamID, false); cs != nil {
 				cs.cc.writeStreamReset(cs.ID, se.Code, err)
+				cs.cc.forgetStreamID(cs.ID)
 				if se.Cause == nil {
 					se.Cause = cc.fr.errDetail
 				}
@@ -1701,6 +1754,7 @@ func (b transportResponseBody) Close() error {
 	}
 
 	cs.bufPipe.BreakWithError(errClosedResponseBody)
+	cc.forgetStreamID(cs.ID)
 	return nil
 }
 