diff --git a/src/net/http/export_test.go b/src/net/http/export_test.go index f3939db27d09e..514d02b2a3a4f 100644 --- a/src/net/http/export_test.go +++ b/src/net/http/export_test.go @@ -121,12 +121,12 @@ func (t *Transport) RequestIdleConnChForTesting() { func (t *Transport) PutIdleTestConn() bool { c, _ := net.Pipe() - return t.putIdleConn(&persistConn{ + return t.tryPutIdleConn(&persistConn{ t: t, conn: c, // dummy closech: make(chan struct{}), // so it can be closed cacheKey: connectMethodKey{"", "http", "example.com"}, - }) + }) == nil } // All test hooks must be non-nil so they can be called directly, diff --git a/src/net/http/transport.go b/src/net/http/transport.go index e610f9706be75..67b291504115b 100644 --- a/src/net/http/transport.go +++ b/src/net/http/transport.go @@ -365,7 +365,7 @@ func (t *Transport) CloseIdleConnections() { t.idleMu.Unlock() for _, conns := range m { for _, pconn := range conns { - pconn.close() + pconn.close(errCloseIdleConns) } } } @@ -450,17 +450,34 @@ func (cm *connectMethod) proxyAuth() string { return "" } -// putIdleConn adds pconn to the list of idle persistent connections awaiting +// error values for debugging and testing, not seen by users. +var ( + errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled") + errConnBroken = errors.New("http: putIdleConn: connection is in bad state") + errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called") + errTooManyIdle = errors.New("http: putIdleConn: too many idle connections") + errCloseIdleConns = errors.New("http: CloseIdleConnections called") + errReadLoopExiting = errors.New("http: persistConn.readLoop exiting") + errServerClosedIdle = errors.New("http: server closed idle conn") +) + +func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { + if err := t.tryPutIdleConn(pconn); err != nil { + pconn.close(err) + } +} + +// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting // a new request. -// If pconn is no longer needed or not in a good state, putIdleConn -// returns false. -func (t *Transport) putIdleConn(pconn *persistConn) bool { +// If pconn is no longer needed or not in a good state, tryPutIdleConn returns +// an error explaining why it wasn't registered. +// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that. +func (t *Transport) tryPutIdleConn(pconn *persistConn) error { if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { - pconn.close() - return false + return errKeepAlivesDisabled } if pconn.isBroken() { - return false + return errConnBroken } key := pconn.cacheKey max := t.MaxIdleConnsPerHost @@ -479,7 +496,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { // first). Chrome calls this socket late binding. See // https://insouciant.org/tech/connection-management-in-chromium/ t.idleMu.Unlock() - return true + return nil default: if waitingDialer != nil { // They had populated this, but their dial won @@ -489,16 +506,14 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { } if t.wantIdle { t.idleMu.Unlock() - pconn.close() - return false + return errWantIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } if len(t.idleConn[key]) >= max { t.idleMu.Unlock() - pconn.close() - return false + return errTooManyIdle } for _, exist := range t.idleConn[key] { if exist == pconn { @@ -507,7 +522,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { } t.idleConn[key] = append(t.idleConn[key], pconn) t.idleMu.Unlock() - return true + return nil } // getIdleConnCh returns a channel to receive and return idle @@ -626,7 +641,7 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error testHookPrePendingDial() go func() { if v := <-dialc; v.err == nil { - t.putIdleConn(v.pc) + t.putOrCloseIdleConn(v.pc) } testHookPostPendingDial() }() @@ -929,10 +944,10 @@ type persistConn struct { lk sync.Mutex // guards following fields numExpectedResponses int - closed bool // whether conn has been closed - broken bool // an error has happened on this connection; marked broken so it's not reused. - canceled bool // whether this conn was broken due a CancelRequest - reused bool // whether conn has had successful request/response and is being reused. + closed error // set non-nil when conn is closed, before closech is closed + broken bool // an error has happened on this connection; marked broken so it's not reused. + canceled bool // whether this conn was broken due a CancelRequest + reused bool // whether conn has had successful request/response and is being reused. // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) @@ -966,11 +981,20 @@ func (pc *persistConn) cancelRequest() { pc.lk.Lock() defer pc.lk.Unlock() pc.canceled = true - pc.closeLocked() + pc.closeLocked(errRequestCanceled) } func (pc *persistConn) readLoop() { - defer pc.close() + closeErr := errReadLoopExiting // default value, if not changed below + defer func() { pc.close(closeErr) }() + + tryPutIdleConn := func() bool { + if err := pc.t.tryPutIdleConn(pc); err != nil { + closeErr = err + return false + } + return true + } // eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection @@ -1016,7 +1040,11 @@ func (pc *persistConn) readLoop() { if checkTransportResend(err, rc.req, pc) != nil { pc.t.setReqCanceler(rc.req, nil) } - rc.ch <- responseAndError{err: err} + select { + case rc.ch <- responseAndError{err: err}: + case <-rc.callerGone: + return + } return } @@ -1035,8 +1063,6 @@ func (pc *persistConn) readLoop() { if !hasBody { pc.t.setReqCanceler(rc.req, nil) - resc := make(chan *Response) // unbuffered matters; see below - rc.ch <- responseAndError{ch: resc} // buffered send // Put the idle conn back into the pool before we send the response // so if they process it quickly and make another request, they'll @@ -1047,9 +1073,13 @@ func (pc *persistConn) readLoop() { alive = alive && !pc.sawEOF && pc.wroteRequest() && - pc.t.putIdleConn(pc) + tryPutIdleConn() - resc <- resp // unbuffered send + select { + case rc.ch <- responseAndError{res: resp}: + case <-rc.callerGone: + return + } // Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so @@ -1079,7 +1109,11 @@ func (pc *persistConn) readLoop() { return err } - rc.ch <- responseAndError{r: resp} + select { + case rc.ch <- responseAndError{res: resp}: + case <-rc.callerGone: + return + } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish @@ -1091,7 +1125,7 @@ func (pc *persistConn) readLoop() { bodyEOF && !pc.sawEOF && pc.wroteRequest() && - pc.t.putIdleConn(pc) + tryPutIdleConn() if bodyEOF { eofc <- struct{}{} } @@ -1116,14 +1150,19 @@ func maybeUngzipResponse(resp *Response) { } func (pc *persistConn) readLoopPeekFailLocked(peekErr error) { - if pc.closed { + if pc.closed != nil { return } if n := pc.br.Buffered(); n > 0 { buf, _ := pc.br.Peek(n) log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr) } - pc.closeLocked() + if peekErr == io.EOF { + // common case. + pc.closeLocked(errServerClosedIdle) + } else { + pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr)) + } } // readResponse reads an HTTP response (or two, in the case of "Expect: @@ -1227,25 +1266,13 @@ func (pc *persistConn) wroteRequest() bool { // responseAndError is how the goroutine reading from an HTTP/1 server // communicates with the goroutine doing the RoundTrip. type responseAndError struct { - ch chan *Response // if non-nil, res should be read from here - r *Response // else use this response (see res method) + res *Response // else use this response (see res method) err error } -func (re responseAndError) res() *Response { - switch { - case re.err != nil: - return nil - case re.ch != nil: - return <-re.ch - default: - return re.r - } -} - type requestAndChan struct { req *Request - ch chan responseAndError + ch chan responseAndError // unbuffered; always send in select on callerGone // did the Transport (as opposed to the client code) add an // Accept-Encoding gzip header? only if it we set it do @@ -1257,6 +1284,8 @@ type requestAndChan struct { // the server responds 100 Continue, readLoop send a value // to writeLoop via this chan. continueCh chan<- struct{} + + callerGone <-chan struct{} // closed when roundTrip caller has returned } // A writeRequest is sent by the readLoop's goroutine to the @@ -1283,7 +1312,7 @@ func (e *httpError) Timeout() bool { return e.timeout } func (e *httpError) Temporary() bool { return true } var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true} -var errClosed error = &httpError{err: "net/http: transport closed before response was received"} +var errClosed error = &httpError{err: "net/http: server closed connection before response was received"} var errRequestCanceled = errors.New("net/http: request canceled") func nop() {} @@ -1309,7 +1338,7 @@ type beforeRespHeaderError struct { func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { testHookEnterRoundTrip() if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) { - pc.t.putIdleConn(pc) + pc.t.putOrCloseIdleConn(pc) return nil, errRequestCanceled } pc.lk.Lock() @@ -1355,14 +1384,23 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err req.extraHeaders().Set("Connection", "close") } + gone := make(chan struct{}) + defer close(gone) + // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh, continueCh} - resc := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, resc, requestedGzip, continueCh} + resc := make(chan responseAndError) + pc.reqch <- requestAndChan{ + req: req.Request, + ch: resc, + addedGzip: requestedGzip, + continueCh: continueCh, + callerGone: gone, + } var re responseAndError var respHeaderTimer <-chan time.Time @@ -1372,22 +1410,9 @@ WaitResponse: testHookWaitResLoop() select { case err := <-writeErrCh: - if isNetWriteError(err) { - // Issue 11745. If we failed to write the request - // body, it's possible the server just heard enough - // and already wrote to us. Prioritize the server's - // response over returning a body write error. - select { - case re = <-resc: - pc.close() - break WaitResponse - case <-time.After(50 * time.Millisecond): - // Fall through. - } - } if err != nil { re = responseAndError{err: beforeRespHeaderError{err}} - pc.close() + pc.close(fmt.Errorf("write error: %v", err)) break WaitResponse } if d := pc.t.ResponseHeaderTimeout; d > 0 { @@ -1400,12 +1425,12 @@ WaitResponse: if pc.isCanceled() { err = errRequestCanceled } else { - err = beforeRespHeaderError{errClosed} + err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)} } re = responseAndError{err: err} break WaitResponse case <-respHeaderTimer: - pc.close() + pc.close(errTimeout) re = responseAndError{err: errTimeout} break WaitResponse case re = <-resc: @@ -1419,7 +1444,10 @@ WaitResponse: if re.err != nil { pc.t.setReqCanceler(req.Request, nil) } - return re.res(), re.err + if (re.res == nil) == (re.err == nil) { + panic("internal error: exactly one of res or err should be set") + } + return re.res, re.err } // markBroken marks a connection as broken (so it's not reused). @@ -1439,17 +1467,25 @@ func (pc *persistConn) markReused() { pc.lk.Unlock() } -func (pc *persistConn) close() { +// close closes the underlying TCP connection and closes +// the pc.closech channel. +// +// The provided err is only for testing and debugging; in normal +// circumstances it should never be seen by users. +func (pc *persistConn) close(err error) { pc.lk.Lock() defer pc.lk.Unlock() - pc.closeLocked() + pc.closeLocked(err) } -func (pc *persistConn) closeLocked() { +func (pc *persistConn) closeLocked(err error) { + if err == nil { + panic("nil error") + } pc.broken = true - if !pc.closed { + if pc.closed == nil { pc.conn.Close() - pc.closed = true + pc.closed = err close(pc.closech) } pc.mutateHeaderFunc = nil