From 7fa984674971e801c549e4e2f1715f6f39c962e5 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Tue, 5 Jan 2016 19:40:25 +0000
Subject: [PATCH] net/http: tighten protocol between Transport.roundTrip and
 persistConn.readLoop

In debugging the flaky test in #13825, I discovered that my previous
change to tighten and simplify the communication protocol between
Transport.roundTrip and persistConn.readLoop in
https://golang.org/cl/17890 wasn't complete.

This change simplifies it further: the buffered-vs-unbuffered
complexity goes away, and we no longer need to re-try channel reads in
the select case. The old code was trying to prioritize channels in the
case that two were readable in the select. (It was only failing on the
race builder because race builds randomize select scheduling.)

The problem was that in the bodyless response case we had to return
the idle connection before replying to roundTrip. But putIdleConn
previously both added it to the free list (which we wanted) and closed
the connection, which made the caller goroutine (Transport.roundTrip)
have two readable cases: pc.closech, and the response. We guarded
against similar conditions in the caller's select for two readable
channels, but such a fix wasn't possible here and would have been
overly complicated.

Instead, switch to unbuffered channels. The old 1-buffered channels
were only there to prevent goroutine leaks, so address that
differently: add a "callerGone" channel closed by the caller on exit,
and select on that during any unbuffered sends.

As part of the fix, split putIdleConn into two halves: a part that
just returns the connection to the freelist, and a part that also
closes it. Update the four callers to whichever variant each wanted.

Incidentally, the connections were closing on return to the pool due
to MaxIdleConnsPerHost (somewhat related: #13801), but this bug
could've manifested for plenty of other reasons.

Fixes #13825

Change-Id: I6fa7136e2c52909d57a22ea4b74d0155fdf0e6fa
Reviewed-on: https://go-review.googlesource.com/18282
Run-TryBot: Brad Fitzpatrick
TryBot-Result: Gobot Gobot
Reviewed-by: Andrew Gerrand
---
 src/net/http/export_test.go |   4 +-
 src/net/http/transport.go   | 176 ++++++++++++++++++++++--------------
 2 files changed, 108 insertions(+), 72 deletions(-)

diff --git a/src/net/http/export_test.go b/src/net/http/export_test.go
index f3939db27d09e..514d02b2a3a4f 100644
--- a/src/net/http/export_test.go
+++ b/src/net/http/export_test.go
@@ -121,12 +121,12 @@ func (t *Transport) RequestIdleConnChForTesting() {
 
 func (t *Transport) PutIdleTestConn() bool {
 	c, _ := net.Pipe()
-	return t.putIdleConn(&persistConn{
+	return t.tryPutIdleConn(&persistConn{
 		t:        t,
 		conn:     c,                   // dummy
 		closech:  make(chan struct{}), // so it can be closed
 		cacheKey: connectMethodKey{"", "http", "example.com"},
-	})
+	}) == nil
 }
 
 // All test hooks must be non-nil so they can be called directly,
diff --git a/src/net/http/transport.go b/src/net/http/transport.go
index e610f9706be75..67b291504115b 100644
--- a/src/net/http/transport.go
+++ b/src/net/http/transport.go
@@ -365,7 +365,7 @@ func (t *Transport) CloseIdleConnections() {
 	t.idleMu.Unlock()
 	for _, conns := range m {
 		for _, pconn := range conns {
-			pconn.close()
+			pconn.close(errCloseIdleConns)
 		}
 	}
 }
@@ -450,17 +450,34 @@ func (cm *connectMethod) proxyAuth() string {
 	return ""
 }
 
-// putIdleConn adds pconn to the list of idle persistent connections awaiting
+// error values for debugging and testing, not seen by users.
+var (
+	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
+	errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")
+	errWantIdle           = errors.New("http: putIdleConn: CloseIdleConnections was called")
+	errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")
+	errCloseIdleConns     = errors.New("http: CloseIdleConnections called")
+	errReadLoopExiting    = errors.New("http: persistConn.readLoop exiting")
+	errServerClosedIdle   = errors.New("http: server closed idle conn")
+)
+
+func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
+	if err := t.tryPutIdleConn(pconn); err != nil {
+		pconn.close(err)
+	}
+}
+
+// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
 // a new request.
-// If pconn is no longer needed or not in a good state, putIdleConn
-// returns false.
-func (t *Transport) putIdleConn(pconn *persistConn) bool {
+// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
+// an error explaining why it wasn't registered.
+// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
+func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
 	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
-		pconn.close()
-		return false
+		return errKeepAlivesDisabled
 	}
 	if pconn.isBroken() {
-		return false
+		return errConnBroken
 	}
 	key := pconn.cacheKey
 	max := t.MaxIdleConnsPerHost
@@ -479,7 +496,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
 		// first). Chrome calls this socket late binding. See
 		// https://insouciant.org/tech/connection-management-in-chromium/
 		t.idleMu.Unlock()
-		return true
+		return nil
 	default:
 		if waitingDialer != nil {
 			// They had populated this, but their dial won
@@ -489,16 +506,14 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
 	}
 	if t.wantIdle {
 		t.idleMu.Unlock()
-		pconn.close()
-		return false
+		return errWantIdle
 	}
 	if t.idleConn == nil {
 		t.idleConn = make(map[connectMethodKey][]*persistConn)
 	}
 	if len(t.idleConn[key]) >= max {
 		t.idleMu.Unlock()
-		pconn.close()
-		return false
+		return errTooManyIdle
 	}
 	for _, exist := range t.idleConn[key] {
 		if exist == pconn {
@@ -507,7 +522,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
 	}
 	t.idleConn[key] = append(t.idleConn[key], pconn)
 	t.idleMu.Unlock()
-	return true
+	return nil
 }
 
 // getIdleConnCh returns a channel to receive and return idle
@@ -626,7 +641,7 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error
 		testHookPrePendingDial()
 		go func() {
 			if v := <-dialc; v.err == nil {
-				t.putIdleConn(v.pc)
+				t.putOrCloseIdleConn(v.pc)
 			}
 			testHookPostPendingDial()
 		}()
@@ -929,10 +944,10 @@ type persistConn struct {
 	lk                   sync.Mutex // guards following fields
 	numExpectedResponses int
-	closed               bool // whether conn has been closed
-	broken               bool // an error has happened on this connection; marked broken so it's not reused.
-	canceled             bool // whether this conn was broken due a CancelRequest
-	reused               bool // whether conn has had successful request/response and is being reused.
+	closed               error // set non-nil when conn is closed, before closech is closed
+	broken               bool  // an error has happened on this connection; marked broken so it's not reused.
+	canceled             bool  // whether this conn was broken due a CancelRequest
+	reused               bool  // whether conn has had successful request/response and is being reused.
 
 	// mutateHeaderFunc is an optional func to modify extra
 	// headers on each outbound request before it's written. (the
 	// original Request given to RoundTrip is not modified)
@@ -966,11 +981,20 @@ func (pc *persistConn) cancelRequest() {
 	pc.lk.Lock()
 	defer pc.lk.Unlock()
 	pc.canceled = true
-	pc.closeLocked()
+	pc.closeLocked(errRequestCanceled)
 }
 
 func (pc *persistConn) readLoop() {
-	defer pc.close()
+	closeErr := errReadLoopExiting // default value, if not changed below
+	defer func() { pc.close(closeErr) }()
+
+	tryPutIdleConn := func() bool {
+		if err := pc.t.tryPutIdleConn(pc); err != nil {
+			closeErr = err
+			return false
+		}
+		return true
+	}
 
 	// eofc is used to block caller goroutines reading from Response.Body
 	// at EOF until this goroutines has (potentially) added the connection
@@ -1016,7 +1040,11 @@ func (pc *persistConn) readLoop() {
 			if checkTransportResend(err, rc.req, pc) != nil {
 				pc.t.setReqCanceler(rc.req, nil)
 			}
-			rc.ch <- responseAndError{err: err}
+			select {
+			case rc.ch <- responseAndError{err: err}:
+			case <-rc.callerGone:
+				return
+			}
 			return
 		}
@@ -1035,8 +1063,6 @@ func (pc *persistConn) readLoop() {
 
 		if !hasBody {
 			pc.t.setReqCanceler(rc.req, nil)
-			resc := make(chan *Response) // unbuffered matters; see below
-			rc.ch <- responseAndError{ch: resc} // buffered send
 
 			// Put the idle conn back into the pool before we send the response
 			// so if they process it quickly and make another request, they'll
@@ -1047,9 +1073,13 @@ func (pc *persistConn) readLoop() {
 			alive = alive &&
 				!pc.sawEOF &&
 				pc.wroteRequest() &&
-				pc.t.putIdleConn(pc)
+				tryPutIdleConn()
 
-			resc <- resp // unbuffered send
+			select {
+			case rc.ch <- responseAndError{res: resp}:
+			case <-rc.callerGone:
+				return
+			}
 
 			// Now that they've read from the unbuffered channel, they're safely
 			// out of the select that also waits on this goroutine to die, so
@@ -1079,7 +1109,11 @@ func (pc *persistConn) readLoop() {
 				return err
 			}
 
-			rc.ch <- responseAndError{r: resp}
+			select {
+			case rc.ch <- responseAndError{res: resp}:
+			case <-rc.callerGone:
+				return
+			}
 
 			// Before looping back to the top of this function and peeking on
 			// the bufio.Reader, wait for the caller goroutine to finish
@@ -1091,7 +1125,7 @@ func (pc *persistConn) readLoop() {
 			bodyEOF &&
 			!pc.sawEOF &&
 			pc.wroteRequest() &&
-			pc.t.putIdleConn(pc)
+			tryPutIdleConn()
 		if bodyEOF {
 			eofc <- struct{}{}
 		}
@@ -1116,14 +1150,19 @@ func maybeUngzipResponse(resp *Response) {
 }
 
 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
-	if pc.closed {
+	if pc.closed != nil {
 		return
 	}
 	if n := pc.br.Buffered(); n > 0 {
 		buf, _ := pc.br.Peek(n)
 		log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
 	}
-	pc.closeLocked()
+	if peekErr == io.EOF {
+		// common case.
+		pc.closeLocked(errServerClosedIdle)
+	} else {
+		pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
+	}
 }
 
 // readResponse reads an HTTP response (or two, in the case of "Expect:
@@ -1227,25 +1266,13 @@ func (pc *persistConn) wroteRequest() bool {
 
 // responseAndError is how the goroutine reading from an HTTP/1 server
 // communicates with the goroutine doing the RoundTrip.
 type responseAndError struct {
-	ch  chan *Response // if non-nil, res should be read from here
-	r   *Response      // else use this response (see res method)
+	res *Response // else use this response (see res method)
 	err error
 }
 
-func (re responseAndError) res() *Response {
-	switch {
-	case re.err != nil:
-		return nil
-	case re.ch != nil:
-		return <-re.ch
-	default:
-		return re.r
-	}
-}
-
 type requestAndChan struct {
 	req *Request
-	ch  chan responseAndError
+	ch  chan responseAndError // unbuffered; always send in select on callerGone
 
 	// did the Transport (as opposed to the client code) add an
 	// Accept-Encoding gzip header? only if it we set it do
@@ -1257,6 +1284,8 @@ type requestAndChan struct {
 	// the server responds 100 Continue, readLoop send a value
 	// to writeLoop via this chan.
 	continueCh chan<- struct{}
+
+	callerGone <-chan struct{} // closed when roundTrip caller has returned
 }
 
 // A writeRequest is sent by the readLoop's goroutine to the
@@ -1283,7 +1312,7 @@ func (e *httpError) Timeout() bool { return e.timeout }
 func (e *httpError) Temporary() bool { return true }
 
 var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
-var errClosed error = &httpError{err: "net/http: transport closed before response was received"}
+var errClosed error = &httpError{err: "net/http: server closed connection before response was received"}
 var errRequestCanceled = errors.New("net/http: request canceled")
 
 func nop() {}
@@ -1309,7 +1338,7 @@ type beforeRespHeaderError struct {
 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
 	testHookEnterRoundTrip()
 	if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
-		pc.t.putIdleConn(pc)
+		pc.t.putOrCloseIdleConn(pc)
 		return nil, errRequestCanceled
 	}
 	pc.lk.Lock()
@@ -1355,14 +1384,23 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
 		req.extraHeaders().Set("Connection", "close")
 	}
 
+	gone := make(chan struct{})
+	defer close(gone)
+
 	// Write the request concurrently with waiting for a response,
 	// in case the server decides to reply before reading our full
 	// request body.
 	writeErrCh := make(chan error, 1)
 	pc.writech <- writeRequest{req, writeErrCh, continueCh}
 
-	resc := make(chan responseAndError, 1)
-	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip, continueCh}
+	resc := make(chan responseAndError)
+	pc.reqch <- requestAndChan{
+		req:        req.Request,
+		ch:         resc,
+		addedGzip:  requestedGzip,
+		continueCh: continueCh,
+		callerGone: gone,
+	}
 
 	var re responseAndError
 	var respHeaderTimer <-chan time.Time
@@ -1372,22 +1410,9 @@ WaitResponse:
 		testHookWaitResLoop()
 		select {
 		case err := <-writeErrCh:
-			if isNetWriteError(err) {
-				// Issue 11745. If we failed to write the request
-				// body, it's possible the server just heard enough
-				// and already wrote to us. Prioritize the server's
-				// response over returning a body write error.
-				select {
-				case re = <-resc:
-					pc.close()
-					break WaitResponse
-				case <-time.After(50 * time.Millisecond):
-					// Fall through.
-				}
-			}
 			if err != nil {
 				re = responseAndError{err: beforeRespHeaderError{err}}
-				pc.close()
+				pc.close(fmt.Errorf("write error: %v", err))
 				break WaitResponse
 			}
 			if d := pc.t.ResponseHeaderTimeout; d > 0 {
@@ -1400,12 +1425,12 @@ WaitResponse:
 			if pc.isCanceled() {
 				err = errRequestCanceled
 			} else {
-				err = beforeRespHeaderError{errClosed}
+				err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)}
 			}
 			re = responseAndError{err: err}
 			break WaitResponse
 		case <-respHeaderTimer:
-			pc.close()
+			pc.close(errTimeout)
 			re = responseAndError{err: errTimeout}
 			break WaitResponse
 		case re = <-resc:
@@ -1419,7 +1444,10 @@ WaitResponse:
 	if re.err != nil {
 		pc.t.setReqCanceler(req.Request, nil)
 	}
-	return re.res(), re.err
+	if (re.res == nil) == (re.err == nil) {
+		panic("internal error: exactly one of res or err should be set")
+	}
+	return re.res, re.err
 }
 
 // markBroken marks a connection as broken (so it's not reused).
@@ -1439,17 +1467,25 @@ func (pc *persistConn) markReused() {
 	pc.lk.Unlock()
 }
 
-func (pc *persistConn) close() {
+// close closes the underlying TCP connection and closes
+// the pc.closech channel.
+//
+// The provided err is only for testing and debugging; in normal
+// circumstances it should never be seen by users.
+func (pc *persistConn) close(err error) {
 	pc.lk.Lock()
 	defer pc.lk.Unlock()
-	pc.closeLocked()
+	pc.closeLocked(err)
 }
 
-func (pc *persistConn) closeLocked() {
+func (pc *persistConn) closeLocked(err error) {
+	if err == nil {
+		panic("nil error")
+	}
 	pc.broken = true
-	if !pc.closed {
+	if pc.closed == nil {
 		pc.conn.Close()
-		pc.closed = true
+		pc.closed = err
 		close(pc.closech)
 	}
 	pc.mutateHeaderFunc = nil
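
The heart of the change is the handoff protocol above: an unbuffered result
channel paired with a "callerGone" channel that the caller closes on exit.
Below is a minimal, self-contained sketch of that pattern, not part of the
patch; worker, call, request, and result are illustrative stand-ins for
readLoop, roundTrip, requestAndChan, and responseAndError.

package main

import (
	"errors"
	"fmt"
	"time"
)

// result and request are illustrative stand-ins for net/http's
// responseAndError and requestAndChan.
type result struct {
	val string
	err error
}

type request struct {
	ch         chan result     // unbuffered; send only inside a select on callerGone
	callerGone <-chan struct{} // closed when the caller has returned
}

// worker plays the role of persistConn.readLoop: it must neither
// block forever on a send nor leak if the caller has given up.
func worker(reqs <-chan request) {
	for rc := range reqs {
		res := result{val: "ok"} // stand-in for reading a response

		select {
		case rc.ch <- res: // caller is still in its select: hand off
		case <-rc.callerGone: // caller returned early: drop the result
			return
		}
	}
}

// call plays the role of persistConn.roundTrip.
func call(reqs chan<- request, timeout time.Duration) (string, error) {
	gone := make(chan struct{})
	defer close(gone) // guarantees the worker's send cannot block forever

	resc := make(chan result) // unbuffered: exactly one synchronous handoff
	reqs <- request{ch: resc, callerGone: gone}

	select {
	case res := <-resc:
		return res.val, res.err
	case <-time.After(timeout):
		// Returning closes gone, which releases the worker.
		return "", errors.New("timeout awaiting result")
	}
}

func main() {
	reqs := make(chan request)
	go worker(reqs)
	fmt.Println(call(reqs, time.Second))
}

With a 1-buffered channel the producer's send always succeeds, so a caller
that has also seen the connection's close channel become readable can find
two cases ready in its select at once. The unbuffered send inside a select
on callerGone hands the result off exactly once, and the producer can
neither block forever nor leak after the caller has returned.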
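The putIdleConn split is the other half of the fix: the "try" half only
registers the connection and reports why it could not, and a separate
wrapper pairs a failed put with a close. A minimal sketch under assumed
names (pool, tryPut, and putOrClose are invented here; the real code keys
its idle list by connectMethodKey under idleMu):

package main

import (
	"errors"
	"fmt"
)

var errPoolFull = errors.New("pool full")

// conn is an illustrative stand-in for persistConn.
type conn struct{ closedErr error }

func (c *conn) close(err error) { c.closedErr = err }

// pool is an illustrative stand-in for the Transport's idle list.
type pool struct {
	conns []*conn
	max   int
}

// tryPut mirrors tryPutIdleConn: it only registers the conn, and on
// failure returns the reason without closing anything.
func (p *pool) tryPut(c *conn) error {
	if len(p.conns) >= p.max {
		return errPoolFull
	}
	p.conns = append(p.conns, c)
	return nil
}

// putOrClose mirrors putOrCloseIdleConn: the one place that pairs a
// failed put with a close, recording why the conn was closed.
func (p *pool) putOrClose(c *conn) {
	if err := p.tryPut(c); err != nil {
		c.close(err)
	}
}

func main() {
	p := &pool{max: 1}
	a, b := &conn{}, &conn{}
	p.putOrClose(a)                       // pooled
	p.putOrClose(b)                       // pool full: closed, reason recorded
	fmt.Println(a.closedErr, b.closedErr) // <nil> pool full
}

Returning an error instead of a bool is what lets the single closing call
site record why the connection was closed; in the patch, that reason is
what the new closed error field and pc.close(err) plumb through to the
debugging messages above.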