net/http: tighten protocol between Transport.roundTrip and persistConn.readLoop

In debugging the flaky test in #13825, I discovered that my previous
change to tighten and simplify the communication protocol between
Transport.roundTrip and persistConn.readLoop in
https://golang.org/cl/17890 wasn't complete.

This change simplifies it further: the buffered-vs-unbuffered complexity
goes away, and we no longer need to re-try channel reads in the select.
That re-try existed to prioritize one channel over another when both were
readable in the select. (It only failed on the race builder because
race-enabled builds randomize select scheduling.)
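
For illustration only (this snippet is not part of the change): the language
spec lets select pick any ready case pseudo-randomly, so ordering or
re-trying cases is a fragile way to prioritize one channel over another.

    package main

    import "fmt"

    func main() {
        a := make(chan string, 1)
        b := make(chan string, 1)
        counts := map[string]int{}
        for i := 0; i < 1000; i++ {
            a <- "a"
            b <- "b"
            // Both cases are ready; which one runs is pseudo-random.
            select {
            case v := <-a:
                counts[v]++
                <-b // drain the other channel before the next iteration
            case v := <-b:
                counts[v]++
                <-a
            }
        }
        fmt.Println(counts) // roughly 50/50, regardless of case order
    }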

The problem was that in the bodyless response case we had to return the
idle connection before replying to roundTrip. But putIdleConn previously
not only added the connection to the free list (which we wanted) but could
also close it, which left the caller goroutine (Transport.roundTrip) with
two readable cases: pc.closech, and the response. We guarded against
similar two-readable-channel conditions in the caller's select, but such a
fix wasn't possible here and would have been overly complicated.
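
As a standalone sketch of that failure mode (simplified, and not the actual
transport code): once both the close channel and the response channel are
readable, the caller's select may legitimately take either case.

    package main

    import (
        "fmt"
        "net/http"
    )

    func main() {
        closech := make(chan struct{})       // stands in for pc.closech
        resc := make(chan *http.Response, 1) // stands in for the response channel

        // Simulate the old putIdleConn behavior: a response is delivered
        // *and* the connection gets closed, so both cases become readable.
        resc <- &http.Response{StatusCode: 200}
        close(closech)

        select {
        case <-closech:
            fmt.Println("observed the close first: caller reports a broken connection")
        case res := <-resc:
            fmt.Println("observed the response first:", res.StatusCode)
        }
    }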

Instead, switch to unbuffered channels. The buffering was only there to
prevent goroutine leaks, so address that differently: add a "callerGone"
channel that the caller closes on exit, and select on it during any
unbuffered send.
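
A minimal standalone sketch of that pattern (identifiers like worker and
result are illustrative, not the transport's own): every unbuffered send
also selects on a channel the caller closes when it returns, so the sending
goroutine cannot leak if the caller is already gone.

    package main

    import "fmt"

    type result struct {
        val string
        err error
    }

    // worker delivers its result on an unbuffered channel, but gives up
    // if the caller has already returned.
    func worker(resc chan<- result, callerGone <-chan struct{}) {
        select {
        case resc <- result{val: "hello"}:
        case <-callerGone:
            // The caller exited; drop the result instead of blocking forever.
        }
    }

    func main() {
        gone := make(chan struct{})
        defer close(gone) // the caller signals its exit exactly once

        resc := make(chan result) // unbuffered on purpose
        go worker(resc, gone)

        fmt.Println(<-resc)
    }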

As part of the fix, split putIdleConn into two halves: one that just
returns the connection to the freelist, and one that also closes it.
Update the four callers to use whichever variant each wanted.
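
Distilled into a standalone toy (a made-up pool type, not the real
Transport code): one half only tries to register the connection and reports
why it could not, and a wrapper closes the connection when that fails.

    package main

    import (
        "errors"
        "fmt"
    )

    var errPoolFull = errors.New("pool: too many idle connections")

    type conn struct{ id int }

    func (c *conn) close(reason error) { fmt.Println("closing conn", c.id, "because:", reason) }

    type pool struct {
        max  int
        idle []*conn
    }

    // tryPut only adds to the freelist; it never closes the connection.
    func (p *pool) tryPut(c *conn) error {
        if len(p.idle) >= p.max {
            return errPoolFull
        }
        p.idle = append(p.idle, c)
        return nil
    }

    // putOrClose is the closing half: keep the connection if possible, close it otherwise.
    func (p *pool) putOrClose(c *conn) {
        if err := p.tryPut(c); err != nil {
            c.close(err)
        }
    }

    func main() {
        p := &pool{max: 1}
        p.putOrClose(&conn{id: 1}) // kept in the freelist
        p.putOrClose(&conn{id: 2}) // closed: the pool is already full
    }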

Incidentally, the connections were closing on return to the pool due
to MaxIdleConnsPerHost (somewhat related: #13801), but this bug
could've manifested for plenty of other reasons.

Fixes #13825

Change-Id: I6fa7136e2c52909d57a22ea4b74d0155fdf0e6fa
Reviewed-on: https://go-review.googlesource.com/18282
Run-TryBot: Brad Fitzpatrick <[email protected]>
TryBot-Result: Gobot Gobot <[email protected]>
Reviewed-by: Andrew Gerrand <[email protected]>
bradfitz committed Jan 5, 2016
1 parent 59ca878 commit 7fa9846
Showing 2 changed files with 108 additions and 72 deletions.
4 changes: 2 additions & 2 deletions src/net/http/export_test.go
@@ -121,12 +121,12 @@ func (t *Transport) RequestIdleConnChForTesting() {

func (t *Transport) PutIdleTestConn() bool {
c, _ := net.Pipe()
return t.putIdleConn(&persistConn{
return t.tryPutIdleConn(&persistConn{
t: t,
conn: c, // dummy
closech: make(chan struct{}), // so it can be closed
cacheKey: connectMethodKey{"", "http", "example.com"},
})
}) == nil
}

// All test hooks must be non-nil so they can be called directly,
176 changes: 106 additions & 70 deletions src/net/http/transport.go
@@ -365,7 +365,7 @@ func (t *Transport) CloseIdleConnections() {
t.idleMu.Unlock()
for _, conns := range m {
for _, pconn := range conns {
pconn.close()
pconn.close(errCloseIdleConns)
}
}
}
@@ -450,17 +450,34 @@ func (cm *connectMethod) proxyAuth() string {
return ""
}

// putIdleConn adds pconn to the list of idle persistent connections awaiting
// error values for debugging and testing, not seen by users.
var (
errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
errCloseIdleConns = errors.New("http: CloseIdleConnections called")
errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
errServerClosedIdle = errors.New("http: server closed idle conn")
)

func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
if err := t.tryPutIdleConn(pconn); err != nil {
pconn.close(err)
}
}

// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, putIdleConn
// returns false.
func (t *Transport) putIdleConn(pconn *persistConn) bool {
// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
// an error explaining why it wasn't registered.
// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
pconn.close()
return false
return errKeepAlivesDisabled
}
if pconn.isBroken() {
return false
return errConnBroken
}
key := pconn.cacheKey
max := t.MaxIdleConnsPerHost
@@ -479,7 +496,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
t.idleMu.Unlock()
return true
return nil
default:
if waitingDialer != nil {
// They had populated this, but their dial won
@@ -489,16 +506,14 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
}
if t.wantIdle {
t.idleMu.Unlock()
pconn.close()
return false
return errWantIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
if len(t.idleConn[key]) >= max {
t.idleMu.Unlock()
pconn.close()
return false
return errTooManyIdle
}
for _, exist := range t.idleConn[key] {
if exist == pconn {
@@ -507,7 +522,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
}
t.idleConn[key] = append(t.idleConn[key], pconn)
t.idleMu.Unlock()
return true
return nil
}

// getIdleConnCh returns a channel to receive and return idle
@@ -626,7 +641,7 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
t.putIdleConn(v.pc)
t.putOrCloseIdleConn(v.pc)
}
testHookPostPendingDial()
}()
@@ -929,10 +944,10 @@ type persistConn struct {

lk sync.Mutex // guards following fields
numExpectedResponses int
closed bool // whether conn has been closed
broken bool // an error has happened on this connection; marked broken so it's not reused.
canceled bool // whether this conn was broken due a CancelRequest
reused bool // whether conn has had successful request/response and is being reused.
closed error // set non-nil when conn is closed, before closech is closed
broken bool // an error has happened on this connection; marked broken so it's not reused.
canceled bool // whether this conn was broken due a CancelRequest
reused bool // whether conn has had successful request/response and is being reused.
// mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified)
@@ -966,11 +981,20 @@ func (pc *persistConn) cancelRequest() {
pc.lk.Lock()
defer pc.lk.Unlock()
pc.canceled = true
pc.closeLocked()
pc.closeLocked(errRequestCanceled)
}

func (pc *persistConn) readLoop() {
defer pc.close()
closeErr := errReadLoopExiting // default value, if not changed below
defer func() { pc.close(closeErr) }()

tryPutIdleConn := func() bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
return false
}
return true
}

// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
@@ -1016,7 +1040,11 @@ func (pc *persistConn) readLoop() {
if checkTransportResend(err, rc.req, pc) != nil {
pc.t.setReqCanceler(rc.req, nil)
}
rc.ch <- responseAndError{err: err}
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}

@@ -1035,8 +1063,6 @@ func (pc *persistConn) readLoop() {

if !hasBody {
pc.t.setReqCanceler(rc.req, nil)
resc := make(chan *Response) // unbuffered matters; see below
rc.ch <- responseAndError{ch: resc} // buffered send

// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
@@ -1047,9 +1073,13 @@ func (pc *persistConn) readLoop() {
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
pc.t.putIdleConn(pc)
tryPutIdleConn()

resc <- resp // unbuffered send
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}

// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
@@ -1079,7 +1109,11 @@ func (pc *persistConn) readLoop() {
return err
}

rc.ch <- responseAndError{r: resp}
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}

// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
@@ -1091,7 +1125,7 @@ func (pc *persistConn) readLoop() {
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
pc.t.putIdleConn(pc)
tryPutIdleConn()
if bodyEOF {
eofc <- struct{}{}
}
@@ -1116,14 +1150,19 @@ func maybeUngzipResponse(resp *Response) {
}

func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
if pc.closed {
if pc.closed != nil {
return
}
if n := pc.br.Buffered(); n > 0 {
buf, _ := pc.br.Peek(n)
log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
}
pc.closeLocked()
if peekErr == io.EOF {
// common case.
pc.closeLocked(errServerClosedIdle)
} else {
pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
}
}

// readResponse reads an HTTP response (or two, in the case of "Expect:
@@ -1227,25 +1266,13 @@ func (pc *persistConn) wroteRequest() bool {
// responseAndError is how the goroutine reading from an HTTP/1 server
// communicates with the goroutine doing the RoundTrip.
type responseAndError struct {
ch chan *Response // if non-nil, res should be read from here
r *Response // else use this response (see res method)
res *Response // else use this response (see res method)
err error
}

func (re responseAndError) res() *Response {
switch {
case re.err != nil:
return nil
case re.ch != nil:
return <-re.ch
default:
return re.r
}
}

type requestAndChan struct {
req *Request
ch chan responseAndError
ch chan responseAndError // unbuffered; always send in select on callerGone

// did the Transport (as opposed to the client code) add an
// Accept-Encoding gzip header? only if it we set it do
@@ -1257,6 +1284,8 @@ type requestAndChan struct {
// the server responds 100 Continue, readLoop send a value
// to writeLoop via this chan.
continueCh chan<- struct{}

callerGone <-chan struct{} // closed when roundTrip caller has returned
}

// A writeRequest is sent by the readLoop's goroutine to the
@@ -1283,7 +1312,7 @@ func (e *httpError) Timeout() bool { return e.timeout }
func (e *httpError) Temporary() bool { return true }

var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
var errClosed error = &httpError{err: "net/http: transport closed before response was received"}
var errClosed error = &httpError{err: "net/http: server closed connection before response was received"}
var errRequestCanceled = errors.New("net/http: request canceled")

func nop() {}
@@ -1309,7 +1338,7 @@ type beforeRespHeaderError struct {
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
pc.t.putIdleConn(pc)
pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.lk.Lock()
@@ -1355,14 +1384,23 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
req.extraHeaders().Set("Connection", "close")
}

gone := make(chan struct{})
defer close(gone)

// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}

resc := make(chan responseAndError, 1)
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip, continueCh}
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}

var re responseAndError
var respHeaderTimer <-chan time.Time
@@ -1372,22 +1410,9 @@
WaitResponse:
testHookWaitResLoop()
select {
case err := <-writeErrCh:
if isNetWriteError(err) {
// Issue 11745. If we failed to write the request
// body, it's possible the server just heard enough
// and already wrote to us. Prioritize the server's
// response over returning a body write error.
select {
case re = <-resc:
pc.close()
break WaitResponse
case <-time.After(50 * time.Millisecond):
// Fall through.
}
}
if err != nil {
re = responseAndError{err: beforeRespHeaderError{err}}
pc.close()
pc.close(fmt.Errorf("write error: %v", err))
break WaitResponse
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
Expand All @@ -1400,12 +1425,12 @@ WaitResponse:
if pc.isCanceled() {
err = errRequestCanceled
} else {
err = beforeRespHeaderError{errClosed}
err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)}
}
re = responseAndError{err: err}
break WaitResponse
case <-respHeaderTimer:
pc.close()
pc.close(errTimeout)
re = responseAndError{err: errTimeout}
break WaitResponse
case re = <-resc:
@@ -1419,7 +1444,10 @@
if re.err != nil {
pc.t.setReqCanceler(req.Request, nil)
}
return re.res(), re.err
if (re.res == nil) == (re.err == nil) {
panic("internal error: exactly one of res or err should be set")
}
return re.res, re.err
}

// markBroken marks a connection as broken (so it's not reused).
@@ -1439,17 +1467,25 @@ func (pc *persistConn) markReused() {
pc.lk.Unlock()
}

func (pc *persistConn) close() {
// close closes the underlying TCP connection and closes
// the pc.closech channel.
//
// The provided err is only for testing and debugging; in normal
// circumstances it should never be seen by users.
func (pc *persistConn) close(err error) {
pc.lk.Lock()
defer pc.lk.Unlock()
pc.closeLocked()
pc.closeLocked(err)
}

func (pc *persistConn) closeLocked() {
func (pc *persistConn) closeLocked(err error) {
if err == nil {
panic("nil error")
}
pc.broken = true
if !pc.closed {
if pc.closed == nil {
pc.conn.Close()
pc.closed = true
pc.closed = err
close(pc.closech)
}
pc.mutateHeaderFunc = nil
