Skip to content

Commit

Permalink
[Heartbeat] Read entire body before closing connection (#8660)
Browse files Browse the repository at this point in the history
This fixes an issue where the connection would be closed after only a partial body read. The RoundTripper would close the conn before returning the response which included a partially buffered body. This would actually work for short responses, since the backing bufio would do a partial read, but would fail on all but the shortest responses.

Normally connection lifecycle is handled outside the realm of the `RoundTripper`, but for our purposes we don't want to re-use connections. Since it is a requirement that all response bodies be closed, we can piggy-back on top of that to ensure the connection is closed.
  • Loading branch information
andrewvc authored Oct 23, 2018
1 parent 972c6a9 commit 1e03ff4
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 1 deletion.
18 changes: 18 additions & 0 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/url"
"os"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -51,6 +52,23 @@ func HelloWorldHandler(status int) http.HandlerFunc {
)
}

// SizedResponseHandler responds with 200 to any request with a body
// exactly the size of the `bytes` argument, where each byte is the
// character 'x'
func SizedResponseHandler(bytes int) http.HandlerFunc {
var body strings.Builder
for i := 0; i < bytes; i++ {
body.WriteString("x")
}

return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
io.WriteString(w, body.String())
},
)
}

// ServerPort takes an httptest.Server and returns its port as a uint16.
func ServerPort(server *httptest.Server) (uint16, error) {
u, err := url.Parse(server.URL)
Expand Down
35 changes: 35 additions & 0 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,41 @@ func TestDownStatuses(t *testing.T) {
}
}

func TestLargeResponse(t *testing.T) {
server := httptest.NewServer(hbtest.SizedResponseHandler(1024 * 1024))
defer server.Close()

configSrc := map[string]interface{}{
"urls": server.URL,
"timeout": "1s",
"check.response.body": "x",
}

config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, err := create("largeresp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

port, err := hbtest.ServerPort(server)
require.NoError(t, err)

mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, 200),
)),
event.Fields,
)
}

func TestHTTPSServer(t *testing.T) {
server := httptest.NewTLSServer(hbtest.HelloWorldHandler(http.StatusOK))
port, err := hbtest.ServerPort(server)
Expand Down
33 changes: 32 additions & 1 deletion heartbeat/monitors/active/http/simple_transp.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (t *SimpleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if err != nil {
return nil, err
}
defer conn.Close()

requestedGzip := false
if t.DisableCompression &&
Expand Down Expand Up @@ -128,6 +127,21 @@ func (t *SimpleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
case ret = <-readerDone:
break
case <-done:
// We need to free resources from the main reader
// We start by closing the conn, which will most likely cause an error
// in the read goroutine (unless we are right on the boundary between timeout and success)
// and will free up both the connection and cause that go routine to terminate.
conn.Close()
// Now we block waiting for that goroutine to finish. We do this synchronously
// because with a closed connection it should return immediately.
// We can ignore the ret.err value because the error is most likely due to us
// prematurely closing the conn
ret := <-readerDone
// If the body has been allocated we need to close it
if ret.resp != nil {
ret.resp.Body.Close()
}
// finally, return the real error. No need to return a response here
return nil, errors.New("http: request timed out while waiting for response")
}
close(readerDone)
Expand All @@ -147,6 +161,22 @@ func (t *SimpleTransport) writeRequest(conn net.Conn, req *http.Request) error {
return err
}

// comboConnReadCloser wraps a ReadCloser that is backed by
// on a net.Conn. It will close the net.Conn when the ReadCloser is closed.
type comboConnReadCloser struct {
conn net.Conn
rc io.ReadCloser
}

func (c comboConnReadCloser) Read(p []byte) (n int, err error) {
return c.rc.Read(p)
}

func (c comboConnReadCloser) Close() error {
defer c.conn.Close()
return c.rc.Close()
}

func (t *SimpleTransport) readResponse(
conn net.Conn,
req *http.Request,
Expand All @@ -157,6 +187,7 @@ func (t *SimpleTransport) readResponse(
if err != nil {
return nil, err
}
resp.Body = comboConnReadCloser{conn, resp.Body}

t.sigStartRead()

Expand Down

0 comments on commit 1e03ff4

Please sign in to comment.