From b5d1e79341725ffa40787ddcb310d9d7acf57237 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Oct 2018 08:57:07 -0500 Subject: [PATCH 1/3] [Heartbeat] Read entire body before closing connection This fixes an issue where the connection would be closed after only a partial body read. The RoundTripper would close the conn before returning the response which included a partially buffered body. This would actually work for short responses, since the backing bufio would do a partial read, but would fail on all but the shortest responses. Normally connection lifecycle is handled outside the realm of the `RoundTripper`, but for our purposes we don't want to re-use connections. Since it is a requirement that all response bodies be closed, we can piggy-back on top of that to ensure the connection is closed. Fixes #8588 --- heartbeat/hbtest/hbtestutil.go | 15 ++++++++ heartbeat/monitors/active/http/http_test.go | 35 +++++++++++++++++++ .../monitors/active/http/simple_transp.go | 18 +++++++++- 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index ac6e5b0f3ee..9144ed92b93 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -26,6 +26,7 @@ import ( "net/url" "os" "strconv" + "strings" "testing" "github.com/stretchr/testify/require" @@ -51,6 +52,20 @@ func HelloWorldHandler(status int) http.HandlerFunc { ) } +func LargeResponseHandler(bytes int) http.HandlerFunc { + var body strings.Builder + for i := 0; i < bytes; i++ { + body.WriteString("x") + } + + return http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + io.WriteString(w, body.String()) + }, + ) +} + // ServerPort takes an httptest.Server and returns its port as a uint16. func ServerPort(server *httptest.Server) (uint16, error) { u, err := url.Parse(server.URL) diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 8899cdf0fc5..37d11f483bb 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -210,6 +210,41 @@ func TestDownStatuses(t *testing.T) { } } +func TestLargeResponse(t *testing.T) { + server := httptest.NewServer(hbtest.LargeResponseHandler(1024 * 1024)) + defer server.Close() + + configSrc := map[string]interface{}{ + "urls": server.URL, + "timeout": "1s", + "check.response.body": "x", + } + + config, err := common.NewConfigFrom(configSrc) + require.NoError(t, err) + + jobs, err := create("largeresp", config) + require.NoError(t, err) + + job := jobs[0] + + event, _, err := job.Run() + require.NoError(t, err) + + port, err := hbtest.ServerPort(server) + require.NoError(t, err) + + mapvaltest.Test( + t, + mapval.Strict(mapval.Compose( + hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"), + hbtest.RespondingTCPChecks(port), + respondingHTTPChecks(server.URL, 200), + )), + event.Fields, + ) +} + func TestHTTPSServer(t *testing.T) { server := httptest.NewTLSServer(hbtest.HelloWorldHandler(http.StatusOK)) port, err := hbtest.ServerPort(server) diff --git a/heartbeat/monitors/active/http/simple_transp.go b/heartbeat/monitors/active/http/simple_transp.go index 9f43c4fec4c..81f37d48e00 100644 --- a/heartbeat/monitors/active/http/simple_transp.go +++ b/heartbeat/monitors/active/http/simple_transp.go @@ -85,7 +85,6 @@ func (t *SimpleTransport) RoundTrip(req *http.Request) (*http.Response, error) { if err != nil { return nil, err } - defer conn.Close() requestedGzip := false if t.DisableCompression && @@ -147,6 +146,22 @@ func (t *SimpleTransport) writeRequest(conn net.Conn, req *http.Request) error { return err } +// comboConnReadCloser wraps a ReadCloser that is backed by +// on a net.Conn. It will close the net.Conn when the ReadCloser is closed. +type comboConnReadCloser struct { + conn net.Conn + rc io.ReadCloser +} + +func (c comboConnReadCloser) Read(p []byte) (n int, err error) { + return c.rc.Read(p) +} + +func (c comboConnReadCloser) Close() error { + defer c.conn.Close() + return c.rc.Close() +} + func (t *SimpleTransport) readResponse( conn net.Conn, req *http.Request, @@ -154,6 +169,7 @@ func (t *SimpleTransport) readResponse( ) (*http.Response, error) { reader := bufio.NewReader(conn) resp, err := http.ReadResponse(reader, req) + resp.Body = comboConnReadCloser{conn, resp.Body} if err != nil { return nil, err } From 7b5d9085bff1ba6da8e20540676b97eb38771388 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Oct 2018 14:19:04 -0500 Subject: [PATCH 2/3] Document/Rename SizedResponseHandler --- heartbeat/hbtest/hbtestutil.go | 5 ++++- heartbeat/monitors/active/http/http_test.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index 9144ed92b93..1c38a223dfb 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -52,7 +52,10 @@ func HelloWorldHandler(status int) http.HandlerFunc { ) } -func LargeResponseHandler(bytes int) http.HandlerFunc { +// SizedResponseHandler responds with 200 to any request with a body +// exactly the size of the `bytes` argument, where each byte is the +// character 'x' +func SizedResponseHandler(bytes int) http.HandlerFunc { var body strings.Builder for i := 0; i < bytes; i++ { body.WriteString("x") diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 37d11f483bb..91585b0c329 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -211,7 +211,7 @@ func TestDownStatuses(t *testing.T) { } func TestLargeResponse(t *testing.T) { - server := httptest.NewServer(hbtest.LargeResponseHandler(1024 * 1024)) + server := httptest.NewServer(hbtest.SizedResponseHandler(1024 * 1024)) defer server.Close() configSrc := map[string]interface{}{ From 8a2ff56b3a0659e12998c91fb73a6a0de20b69bf Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 22 Oct 2018 17:11:05 -0500 Subject: [PATCH 3/3] Properly cleanup conns in error states in heartbeat --- heartbeat/monitors/active/http/simple_transp.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/heartbeat/monitors/active/http/simple_transp.go b/heartbeat/monitors/active/http/simple_transp.go index 81f37d48e00..889baa8e5a1 100644 --- a/heartbeat/monitors/active/http/simple_transp.go +++ b/heartbeat/monitors/active/http/simple_transp.go @@ -127,6 +127,21 @@ func (t *SimpleTransport) RoundTrip(req *http.Request) (*http.Response, error) { case ret = <-readerDone: break case <-done: + // We need to free resources from the main reader + // We start by closing the conn, which will most likely cause an error + // in the read goroutine (unless we are right on the boundary between timeout and success) + // and will free up both the connection and cause that go routine to terminate. + conn.Close() + // Now we block waiting for that goroutine to finish. We do this synchronously + // because with a closed connection it should return immediately. + // We can ignore the ret.err value because the error is most likely due to us + // prematurely closing the conn + ret := <-readerDone + // If the body has been allocated we need to close it + if ret.resp != nil { + ret.resp.Body.Close() + } + // finally, return the real error. No need to return a response here return nil, errors.New("http: request timed out while waiting for response") } close(readerDone) @@ -169,10 +184,10 @@ func (t *SimpleTransport) readResponse( ) (*http.Response, error) { reader := bufio.NewReader(conn) resp, err := http.ReadResponse(reader, req) - resp.Body = comboConnReadCloser{conn, resp.Body} if err != nil { return nil, err } + resp.Body = comboConnReadCloser{conn, resp.Body} t.sigStartRead()