From b6d94df162f5ee0c7bf5c8f19587d4c6771d661a Mon Sep 17 00:00:00 2001 From: Liz Fong-Jones Date: Wed, 5 Jan 2022 06:10:43 -0500 Subject: [PATCH] fix: pass bytes.Reader to http.Request, clean up pooledReader ourselves (#159) * pass bytes.Reader to http.Request, self-manage cleanup * fix tests * add comment explaining indirection --- transmission/transmission.go | 36 +++++++++++++------------------ transmission/transmission_test.go | 11 +++++++--- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/transmission/transmission.go b/transmission/transmission.go index e44fe74..33d0af3 100644 --- a/transmission/transmission.go +++ b/transmission/transmission.go @@ -387,8 +387,14 @@ func (b *batchAgg) fireBatch(events []*Event) { var req *http.Request reqBody, zipped := buildReqReader(encEvs, !b.disableCompression) - req, err = http.NewRequest("POST", url.String(), reqBody) - req.ContentLength = int64(reqBody.Len()) + if reader, ok := reqBody.(*pooledReader); ok { + // Pass the underlying bytes.Reader to http.Request so that + // GetBody and ContentLength fields are populated on Request. + // See https://cs.opensource.google/go/go/+/refs/tags/go1.17.5:src/net/http/request.go;l=898 + req, err = http.NewRequest("POST", url.String(), &reader.Reader) + } else { + req, err = http.NewRequest("POST", url.String(), reqBody) + } req.Header.Set("Content-Type", contentType) if zipped { req.Header.Set("Content-Encoding", "zstd") @@ -398,6 +404,9 @@ func (b *batchAgg) fireBatch(events []*Event) { req.Header.Add("X-Honeycomb-Team", writeKey) // send off batch! resp, err = b.httpClient.Do(req) + if reader, ok := reqBody.(*pooledReader); ok { + reader.Release() + } if httpErr, ok := err.(httpError); ok && httpErr.Timeout() { continue @@ -631,25 +640,12 @@ func (b *batchAgg) enqueueErrResponses(err error, events []*Event, duration time var zstdBufferPool sync.Pool -type ReqReader interface { - io.ReadCloser - Len() int -} - type pooledReader struct { bytes.Reader buf []byte } -type SimpleReader struct { - bytes.Reader -} - -func (r SimpleReader) Close() error { - return nil -} - -func (r *pooledReader) Close() error { +func (r *pooledReader) Release() error { // Ensure further attempts to read will return io.EOF r.Reset(nil) // Then reset and give up ownership of the buffer. @@ -677,9 +673,9 @@ func init() { } } -// buildReqReader returns an io.ReadCloser and a boolean, indicating whether or not +// buildReqReader returns an io.Reader and a boolean, indicating whether or not // the underlying bytes.Reader is compressed. -func buildReqReader(jsonEncoded []byte, compress bool) (ReqReader, bool) { +func buildReqReader(jsonEncoded []byte, compress bool) (io.Reader, bool) { if compress { var buf []byte if found, ok := zstdBufferPool.Get().([]byte); ok { @@ -693,9 +689,7 @@ func buildReqReader(jsonEncoded []byte, compress bool) (ReqReader, bool) { reader.Reset(reader.buf) return &reader, true } - var reader SimpleReader - reader.Reset(jsonEncoded) - return &reader, false + return bytes.NewReader(jsonEncoded), false } // nower to make testing easier diff --git a/transmission/transmission_test.go b/transmission/transmission_test.go index a8f9b31..59fd280 100644 --- a/transmission/transmission_test.go +++ b/transmission/transmission_test.go @@ -124,6 +124,9 @@ type FakeRoundTripper struct { func (f *FakeRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { f.req = r + if r.GetBody == nil { + panic("Retries must be possible. Set GetBody to fix this.") + } if r.ContentLength == 0 { panic("Expected a content length for all POST payloads.") } @@ -492,6 +495,9 @@ func (f *FancyFakeRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro headerKeys := strings.Split(reqHeader, ",") expectedURL, _ := url.Parse(fmt.Sprintf("%s/1/batch/%s", headerKeys[0], headerKeys[2])) if r.Header.Get("X-Honeycomb-Team") == headerKeys[1] && r.URL.String() == expectedURL.String() { + if r.GetBody == nil { + panic("Retries must be possible. Set GetBody to fix this.") + } if r.ContentLength == 0 { panic("Expected a content length for all POST payloads.") } @@ -1121,7 +1127,7 @@ func TestBuildReqReaderCompress(t *testing.T) { // attempting to Read() returns io.EOF but no crash. // Needed to support https://go-review.googlesource.com/c/net/+/355491 reader, _ = buildReqReader([]byte(`{"hello": "world"}`), true) - reader.Close() + reader.(*pooledReader).Release() _, err = reader.Read(nil) testEquals(t, err, io.EOF) } @@ -1235,7 +1241,6 @@ func BenchmarkCompression(b *testing.B) { for n := 0; n < b.N; n++ { reader, _ := buildReqReader(payload, false) reader.Read(buf) - reader.Close() } }) @@ -1243,7 +1248,7 @@ func BenchmarkCompression(b *testing.B) { for n := 0; n < b.N; n++ { reader, _ := buildReqReader(payload, true) reader.Read(buf) - reader.Close() + reader.(*pooledReader).Release() } })