Skip to content

Commit

Permalink
net/http: fix spurious logging in Transport when server closes idle conn
Browse files Browse the repository at this point in the history
In https://golang.org/3210, Transport errors occurring before
receiving response headers were wrapped in another error type to
indicate to the retry logic elsewhere that the request might be
re-tryable. But a check for err == io.EOF was missed, which then became
false once io.EOF was wrapped in the beforeRespHeaderError type.

The beforeRespHeaderError was too fragile. Remove it. I tried to fix
it in an earlier version of this CL and just broke different things
instead.

Also remove the "markBroken" method. It's redundant and confusing.

Also, rename the checkTransportResend method to shouldRetryRequest and
make it return a bool instead of an error. This also helps readability.

Now the code recognizes the two main reasons we'd want to retry a
request: because we never wrote the request in the first place (so:
count the number of bytes we've written), or because the server hung
up on us before we received response headers for an idempotent request.

As an added bonus, this could make POST requests safely re-tryable
since we know we haven't written anything yet. But it's too late in Go
1.7 to enable that, so we'll do that later (filed golang#15723).

This also adds a new internal (package http) test, since testing this
blackbox at higher levels in transport_test wasn't possible.

Fixes golang#15446

Change-Id: I2c1dc03b1f1ebdf3f04eba81792bd5c4fb6b6b66
Reviewed-on: https://go-review.googlesource.com/23160
Reviewed-by: Andrew Gerrand <[email protected]>
Run-TryBot: Brad Fitzpatrick <[email protected]>
TryBot-Result: Gobot Gobot <[email protected]>
  • Loading branch information
bradfitz committed May 18, 2016
1 parent 9d73e14 commit 4d2ac54
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 77 deletions.
213 changes: 136 additions & 77 deletions src/net/http/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,47 +387,47 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) {
if err == nil {
return resp, nil
}
if err := checkTransportResend(err, req, pconn); err != nil {
if !pconn.shouldRetryRequest(req, err) {
return nil, err
}
testHookRoundTripRetried()
}
}

// checkTransportResend checks whether a failed HTTP request can be
// resent on a new connection. The non-nil input error is the error from
// roundTrip, which might be wrapped in a beforeRespHeaderError error.
//
// The return value is either nil to retry the request, the provided
// err unmodified, or the unwrapped error inside a
// beforeRespHeaderError.
func checkTransportResend(err error, req *Request, pconn *persistConn) error {
brhErr, ok := err.(beforeRespHeaderError)
if !ok {
return err
// shouldRetryRequest reports whether we should retry sending a failed
// HTTP request on a new connection. The non-nil input error is the
// error from roundTrip.
func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
if err == errMissingHost {
// User error.
return false
}
err = brhErr.error // unwrap the custom error in case we return it
if err != errMissingHost && pconn.isReused() && req.isReplayable() {
// If we try to reuse a connection that the server is in the process of
// closing, we may end up successfully writing out our request (or a
// portion of our request) only to find a connection error when we try to
// read from (or finish writing to) the socket.

// There can be a race between the socket pool checking whether a socket
// is still connected, receiving the FIN, and sending/reading data on a
// reused socket. If we receive the FIN between the connectedness check
// and writing/reading from the socket, we may first learn the socket is
// disconnected when we get a ERR_SOCKET_NOT_CONNECTED. This will most
// likely happen when trying to retrieve its IP address. See
// http://crbug.com/105824 for more details.

// We resend a request only if we reused a keep-alive connection and did
// not yet receive any header data. This automatically prevents an
// infinite resend loop because we'll run out of the cached keep-alive
// connections eventually.
return nil
if !pc.isReused() {
// This was a fresh connection. There's no reason the server
// should've hung up on us.
//
// Also, if we retried now, we could loop forever
// creating new connections and retrying if the server
// is just hanging up on us because it doesn't like
// our request (as opposed to sending an error).
return false
}
return err
if !req.isReplayable() {
// Don't retry non-idempotent requests.

// TODO: swap the nothingWrittenError and isReplayable checks,
// putting the "if nothingWrittenError => return true" case
// first, per golang.org/issue/15723
return false
}
if _, ok := err.(nothingWrittenError); ok {
// We never wrote anything, so it's safe to retry.
return true
}
if err == errServerClosedIdle || err == errServerClosedConn {
return true
}
return false // conservatively
}

// ErrSkipAltProtocol is a sentinel error value defined by Transport.RegisterProtocol.
Expand Down Expand Up @@ -570,7 +570,8 @@ var (
errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
errCloseIdleConns = errors.New("http: CloseIdleConnections called")
errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
errServerClosedIdle = errors.New("http: server closed idle conn")
errServerClosedIdle = errors.New("http: server closed idle connection")
errServerClosedConn = errors.New("http: server closed connection")
errIdleConnTimeout = errors.New("http: idle connection timeout")
)

Expand Down Expand Up @@ -881,12 +882,13 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
pconn := &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
tlsDial := t.DialTLS != nil && cm.targetScheme == "https" && cm.proxyURL == nil
if tlsDial {
Expand Down Expand Up @@ -1003,12 +1005,28 @@ func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistCon
}

pconn.br = bufio.NewReader(pconn)
pconn.bw = bufio.NewWriter(pconn.conn)
pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}

// persistConnWriter is the io.Writer written to by pc.bw.
// It accumulates the number of bytes written to the underlying conn,
// so the retry logic can determine whether any bytes made it across
// the wire.
// This is exactly 1 pointer field wide so it can go into an interface
// without allocation.
type persistConnWriter struct {
pc *persistConn
}

func (w persistConnWriter) Write(p []byte) (n int, err error) {
n, err = w.pc.conn.Write(p)
w.pc.nwrite += int64(n)
return
}

// useProxy reports whether requests to addr should use a proxy,
// according to the NO_PROXY or no_proxy environment variable.
// addr is always a canonicalAddr with a host and port.
Expand Down Expand Up @@ -1142,6 +1160,7 @@ type persistConn struct {
tlsState *tls.ConnectionState
br *bufio.Reader // from conn
bw *bufio.Writer // to conn
nwrite int64 // bytes written
reqch chan requestAndChan // written by roundTrip; read by readLoop
writech chan writeRequest // written by roundTrip; read by writeLoop
closech chan struct{} // closed when conn closed
Expand All @@ -1154,6 +1173,8 @@ type persistConn struct {
// whether or not a connection can be reused. Issue 7569.
writeErrCh chan error

writeLoopDone chan struct{} // closed when write loop ends

// Both guarded by Transport.idleMu:
idleAt time.Time // time it last become idle
idleTimer *time.Timer // holding an AfterFunc to close it
Expand Down Expand Up @@ -1195,7 +1216,7 @@ func (pc *persistConn) Read(p []byte) (n int, err error) {
// isBroken reports whether this connection is in a known broken state.
func (pc *persistConn) isBroken() bool {
pc.mu.Lock()
b := pc.broken
b := pc.closed != nil
pc.mu.Unlock()
return b
}
Expand Down Expand Up @@ -1247,6 +1268,56 @@ func (pc *persistConn) closeConnIfStillIdle() {
pc.close(errIdleConnTimeout)
}

// mapRoundTripErrorFromReadLoop maps the provided readLoop error into
// the error value that should be returned from persistConn.roundTrip.
//
// The startBytesWritten value should be the value of pc.nwrite before the roundTrip
// started writing the request.
func (pc *persistConn) mapRoundTripErrorFromReadLoop(startBytesWritten int64, err error) (out error) {
if err == nil {
return nil
}
if pc.isCanceled() {
return errRequestCanceled
}
if err == errServerClosedIdle || err == errServerClosedConn {
return err
}
if pc.isBroken() {
<-pc.writeLoopDone
if pc.nwrite == startBytesWritten {
return nothingWrittenError{err}
}
}
return err
}

// mapRoundTripErrorAfterClosed returns the error value to be propagated
// up to Transport.RoundTrip method when persistConn.roundTrip sees
// its pc.closech channel close, indicating the persistConn is dead.
// (after closech is closed, pc.closed is valid).
func (pc *persistConn) mapRoundTripErrorAfterClosed(startBytesWritten int64) error {
if pc.isCanceled() {
return errRequestCanceled
}
err := pc.closed
if err == errServerClosedIdle || err == errServerClosedConn {
// Don't decorate
return err
}

// Wait for the writeLoop goroutine to terminated, and then
// see if we actually managed to write anything. If not, we
// can retry the request.
<-pc.writeLoopDone
if pc.nwrite == startBytesWritten {
return nothingWrittenError{err}
}

return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err)

}

func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
Expand Down Expand Up @@ -1283,9 +1354,6 @@ func (pc *persistConn) readLoop() {
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)
if err != nil {
err = beforeRespHeaderError{err}
}

pc.mu.Lock()
if pc.numExpectedResponses == 0 {
Expand All @@ -1301,20 +1369,24 @@ func (pc *persistConn) readLoop() {
var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace)
} else {
err = errServerClosedConn
closeErr = err
}

if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}

// If we won't be able to retry this request later (from the
// roundTrip goroutine), mark it as done now.
// BEFORE the send on rc.ch, as the client might re-use the
// same *Request pointer, and we don't want to set call
// t.setReqCanceler from this persistConn while the Transport
// potentially spins up a different persistConn for the
// caller's subsequent request.
if checkTransportResend(err, rc.req, pc) != nil {
if !pc.shouldRetryRequest(rc.req, err) {
pc.t.setReqCanceler(rc.req, nil)
}
select {
Expand Down Expand Up @@ -1501,24 +1573,33 @@ func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
}
}

// nothingWrittenError wraps a write errors which ended up writing zero bytes.
type nothingWrittenError struct {
error
}

func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech:
if pc.isBroken() {
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
startBytesWritten := pc.nwrite
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
if err == nil {
err = pc.bw.Flush()
}
if err != nil {
pc.markBroken()
wr.req.Request.closeBody()
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
pc.writeErrCh <- err // to the body reader, which might recycle us
wr.ch <- err // to the roundTrip function
if err != nil {
pc.close(err)
return
}
case <-pc.closech:
return
}
Expand Down Expand Up @@ -1619,12 +1700,6 @@ var (
testHookReadLoopBeforeNextRead = nop
)

// beforeRespHeaderError is used to indicate when an IO error has occurred before
// any header data was received.
type beforeRespHeaderError struct {
error
}

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
Expand Down Expand Up @@ -1680,6 +1755,7 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
startBytesWritten := pc.nwrite
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}

Expand All @@ -1704,7 +1780,7 @@ WaitResponse:
if pc.isCanceled() {
err = errRequestCanceled
}
re = responseAndError{err: beforeRespHeaderError{err}}
re = responseAndError{err: err}
pc.close(fmt.Errorf("write error: %v", err))
break WaitResponse
}
Expand All @@ -1714,22 +1790,14 @@ WaitResponse:
respHeaderTimer = timer.C
}
case <-pc.closech:
var err error
if pc.isCanceled() {
err = errRequestCanceled
} else {
err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)}
}
re = responseAndError{err: err}
re = responseAndError{err: pc.mapRoundTripErrorAfterClosed(startBytesWritten)}
break WaitResponse
case <-respHeaderTimer:
pc.close(errTimeout)
re = responseAndError{err: errTimeout}
break WaitResponse
case re = <-resc:
if re.err != nil && pc.isCanceled() {
re.err = errRequestCanceled
}
re.err = pc.mapRoundTripErrorFromReadLoop(startBytesWritten, re.err)
break WaitResponse
case <-cancelChan:
pc.t.CancelRequest(req.Request)
Expand All @@ -1749,15 +1817,6 @@ WaitResponse:
return re.res, re.err
}

// markBroken marks a connection as broken (so it's not reused).
// It differs from close in that it doesn't close the underlying
// connection for use when it's still being read.
func (pc *persistConn) markBroken() {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.broken = true
}

// markReused marks this connection as having been successfully used for a
// request and response.
func (pc *persistConn) markReused() {
Expand Down
Loading

0 comments on commit 4d2ac54

Please sign in to comment.