Skip to content

Commit

Permalink
fix: pass bytes.Reader to http.Request, clean up pooledReader ourselv…
Browse files Browse the repository at this point in the history
…es (#159)

* pass bytes.Reader to http.Request, self-manage cleanup

* fix tests

* add comment explaining indirection
  • Loading branch information
lizthegrey authored Jan 5, 2022
1 parent 3ac22d9 commit b6d94df
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
36 changes: 15 additions & 21 deletions transmission/transmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,14 @@ func (b *batchAgg) fireBatch(events []*Event) {

var req *http.Request
reqBody, zipped := buildReqReader(encEvs, !b.disableCompression)
req, err = http.NewRequest("POST", url.String(), reqBody)
req.ContentLength = int64(reqBody.Len())
if reader, ok := reqBody.(*pooledReader); ok {
// Pass the underlying bytes.Reader to http.Request so that
// GetBody and ContentLength fields are populated on Request.
// See https://cs.opensource.google/go/go/+/refs/tags/go1.17.5:src/net/http/request.go;l=898
req, err = http.NewRequest("POST", url.String(), &reader.Reader)
} else {
req, err = http.NewRequest("POST", url.String(), reqBody)
}
req.Header.Set("Content-Type", contentType)
if zipped {
req.Header.Set("Content-Encoding", "zstd")
Expand All @@ -398,6 +404,9 @@ func (b *batchAgg) fireBatch(events []*Event) {
req.Header.Add("X-Honeycomb-Team", writeKey)
// send off batch!
resp, err = b.httpClient.Do(req)
if reader, ok := reqBody.(*pooledReader); ok {
reader.Release()
}

if httpErr, ok := err.(httpError); ok && httpErr.Timeout() {
continue
Expand Down Expand Up @@ -631,25 +640,12 @@ func (b *batchAgg) enqueueErrResponses(err error, events []*Event, duration time

var zstdBufferPool sync.Pool

type ReqReader interface {
io.ReadCloser
Len() int
}

type pooledReader struct {
bytes.Reader
buf []byte
}

type SimpleReader struct {
bytes.Reader
}

func (r SimpleReader) Close() error {
return nil
}

func (r *pooledReader) Close() error {
func (r *pooledReader) Release() error {
// Ensure further attempts to read will return io.EOF
r.Reset(nil)
// Then reset and give up ownership of the buffer.
Expand Down Expand Up @@ -677,9 +673,9 @@ func init() {
}
}

// buildReqReader returns an io.ReadCloser and a boolean, indicating whether or not
// buildReqReader returns an io.Reader and a boolean, indicating whether or not
// the underlying bytes.Reader is compressed.
func buildReqReader(jsonEncoded []byte, compress bool) (ReqReader, bool) {
func buildReqReader(jsonEncoded []byte, compress bool) (io.Reader, bool) {
if compress {
var buf []byte
if found, ok := zstdBufferPool.Get().([]byte); ok {
Expand All @@ -693,9 +689,7 @@ func buildReqReader(jsonEncoded []byte, compress bool) (ReqReader, bool) {
reader.Reset(reader.buf)
return &reader, true
}
var reader SimpleReader
reader.Reset(jsonEncoded)
return &reader, false
return bytes.NewReader(jsonEncoded), false
}

// nower to make testing easier
Expand Down
11 changes: 8 additions & 3 deletions transmission/transmission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type FakeRoundTripper struct {

func (f *FakeRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
f.req = r
if r.GetBody == nil {
panic("Retries must be possible. Set GetBody to fix this.")
}
if r.ContentLength == 0 {
panic("Expected a content length for all POST payloads.")
}
Expand Down Expand Up @@ -492,6 +495,9 @@ func (f *FancyFakeRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro
headerKeys := strings.Split(reqHeader, ",")
expectedURL, _ := url.Parse(fmt.Sprintf("%s/1/batch/%s", headerKeys[0], headerKeys[2]))
if r.Header.Get("X-Honeycomb-Team") == headerKeys[1] && r.URL.String() == expectedURL.String() {
if r.GetBody == nil {
panic("Retries must be possible. Set GetBody to fix this.")
}
if r.ContentLength == 0 {
panic("Expected a content length for all POST payloads.")
}
Expand Down Expand Up @@ -1121,7 +1127,7 @@ func TestBuildReqReaderCompress(t *testing.T) {
// attempting to Read() returns io.EOF but no crash.
// Needed to support https://go-review.googlesource.com/c/net/+/355491
reader, _ = buildReqReader([]byte(`{"hello": "world"}`), true)
reader.Close()
reader.(*pooledReader).Release()
_, err = reader.Read(nil)
testEquals(t, err, io.EOF)
}
Expand Down Expand Up @@ -1235,15 +1241,14 @@ func BenchmarkCompression(b *testing.B) {
for n := 0; n < b.N; n++ {
reader, _ := buildReqReader(payload, false)
reader.Read(buf)
reader.Close()
}
})

b.Run("zstd", func(b *testing.B) {
for n := 0; n < b.N; n++ {
reader, _ := buildReqReader(payload, true)
reader.Read(buf)
reader.Close()
reader.(*pooledReader).Release()
}
})

Expand Down

0 comments on commit b6d94df

Please sign in to comment.