diff --git a/src/aggregator/client/conn_options.go b/src/aggregator/client/conn_options.go index 7a85b67581..71b5a89886 100644 --- a/src/aggregator/client/conn_options.go +++ b/src/aggregator/client/conn_options.go @@ -117,15 +117,15 @@ type ConnectionOptions interface { type connectionOptions struct { clockOpts clock.Options instrumentOpts instrument.Options + writeRetryOpts retry.Options + rwOpts xio.Options connTimeout time.Duration - connKeepAlive bool writeTimeout time.Duration + maxDuration time.Duration initThreshold int maxThreshold int multiplier int - maxDuration time.Duration - writeRetryOpts retry.Options - rwOpts xio.Options + connKeepAlive bool } // NewConnectionOptions create a new set of connection options. diff --git a/src/aggregator/client/payload.go b/src/aggregator/client/payload.go index ffc2722450..a9c6b6f5a9 100644 --- a/src/aggregator/client/payload.go +++ b/src/aggregator/client/payload.go @@ -55,8 +55,8 @@ type timedPayload struct { } type timedWithStagedMetadatas struct { - metric aggregated.Metric metadatas metadata.StagedMetadatas + metric aggregated.Metric } type passthroughPayload struct { @@ -65,10 +65,10 @@ type passthroughPayload struct { } type payloadUnion struct { - payloadType payloadType - untimed untimedPayload forwarded forwardedPayload + untimed untimedPayload timed timedPayload timedWithStagedMetadatas timedWithStagedMetadatas passthrough passthroughPayload + payloadType payloadType } diff --git a/src/aggregator/client/ref_count.go b/src/aggregator/client/ref_count.go index fd3f5a588b..fa67c8bbea 100644 --- a/src/aggregator/client/ref_count.go +++ b/src/aggregator/client/ref_count.go @@ -25,8 +25,8 @@ import "sync/atomic" type destructorFn func() type refCount struct { - n int32 destructorFn destructorFn + n int32 } func (rc *refCount) SetRefCount(n int) { atomic.StoreInt32(&rc.n, int32(n)) } diff --git a/src/aggregator/client/writer.go b/src/aggregator/client/writer.go index 4db58ae8ea..dfc3c628a3 100644 --- a/src/aggregator/client/writer.go +++ b/src/aggregator/client/writer.go @@ -65,14 +65,14 @@ type writer struct { log *zap.Logger metrics writerMetrics - flushSize int - maxTimerBatchSize int encoderOpts protobuf.UnaggregatedOptions queue instanceQueue + flushSize int + maxTimerBatchSize int - closed bool encodersByShard map[uint32]*lockedEncoder newLockedEncoderFn newLockedEncoderFn + closed bool } func newInstanceWriter(instance placement.Instance, opts Options) instanceWriter { @@ -534,8 +534,8 @@ func newWriterMetrics(s tally.Scope) writerMetrics { } type lockedEncoder struct { - sync.Mutex protobuf.UnaggregatedEncoder + sync.Mutex } func newLockedEncoder(encoderOpts protobuf.UnaggregatedOptions) *lockedEncoder { @@ -544,8 +544,8 @@ func newLockedEncoder(encoderOpts protobuf.UnaggregatedOptions) *lockedEncoder { } type refCountedWriter struct { - refCount instanceWriter + refCount } func newRefCountedWriter(instance placement.Instance, opts Options) *refCountedWriter { diff --git a/src/metrics/encoding/protobuf/aggregated_encoder.go b/src/metrics/encoding/protobuf/aggregated_encoder.go index c27c60226b..842e68664d 100644 --- a/src/metrics/encoding/protobuf/aggregated_encoder.go +++ b/src/metrics/encoding/protobuf/aggregated_encoder.go @@ -68,5 +68,9 @@ func (enc *aggregatedEncoder) Encode( } func (enc *aggregatedEncoder) Buffer() Buffer { - return NewBuffer(enc.buf, enc.pool) + var fn PoolReleaseFn + if enc.pool != nil { + fn = enc.pool.Put + } + return NewBuffer(enc.buf, fn) } diff --git a/src/metrics/encoding/protobuf/buffer.go b/src/metrics/encoding/protobuf/buffer.go index f6e08eb3fc..94b1f624a8 100644 --- a/src/metrics/encoding/protobuf/buffer.go +++ b/src/metrics/encoding/protobuf/buffer.go @@ -26,16 +26,18 @@ import ( "github.com/m3db/m3/src/x/pool" ) +// PoolReleaseFn is a function used to release underlying slice back to bytes pool. +type PoolReleaseFn func([]byte) + // Buffer contains a byte slice backed by an optional bytes pool. type Buffer struct { - buf []byte - pool pool.BytesPool - closed bool + buf []byte + finalizer PoolReleaseFn } // NewBuffer create a new buffer. -func NewBuffer(buf []byte, p pool.BytesPool) Buffer { - return Buffer{buf: buf, pool: p} +func NewBuffer(buf []byte, p PoolReleaseFn) Buffer { + return Buffer{buf: buf, finalizer: p} } // Bytes returns the raw byte slice. @@ -46,15 +48,11 @@ func (b *Buffer) Truncate(n int) { b.buf = b.buf[:n] } // Close closes the buffer. func (b *Buffer) Close() { - if b.closed { - return - } - b.closed = true - if b.pool != nil && b.buf != nil { - b.pool.Put(b.buf) + if b.finalizer != nil && b.buf != nil { + b.finalizer(b.buf) } - b.pool = nil b.buf = nil + b.finalizer = nil } type copyDataMode int diff --git a/src/metrics/encoding/protobuf/buffer_test.go b/src/metrics/encoding/protobuf/buffer_test.go index 628a6e604f..df6552ef20 100644 --- a/src/metrics/encoding/protobuf/buffer_test.go +++ b/src/metrics/encoding/protobuf/buffer_test.go @@ -37,13 +37,11 @@ func TestBufferWithPool(t *testing.T) { data := p.Get(16)[:16] data[0] = 0xff - buf := NewBuffer(data, p) - require.Equal(t, data, buf.Bytes()) - require.False(t, buf.closed) + buf := NewBuffer(data, p.Put) + require.NotNil(t, buf.buf) buf.Close() - require.True(t, buf.closed) - require.Nil(t, buf.pool) + require.Nil(t, buf.finalizer) require.Nil(t, buf.buf) // Verify that closing the buffer returns the buffer to pool. @@ -60,11 +58,10 @@ func TestBufferNilPool(t *testing.T) { data := make([]byte, 16) buf := NewBuffer(data, nil) require.Equal(t, data, buf.Bytes()) - require.False(t, buf.closed) + require.NotNil(t, buf.buf) buf.Close() - require.True(t, buf.closed) - require.Nil(t, buf.pool) + require.Nil(t, buf.finalizer) require.Nil(t, buf.buf) } diff --git a/src/metrics/encoding/protobuf/unaggregated_encoder.go b/src/metrics/encoding/protobuf/unaggregated_encoder.go index cee6855a4b..d470468360 100644 --- a/src/metrics/encoding/protobuf/unaggregated_encoder.go +++ b/src/metrics/encoding/protobuf/unaggregated_encoder.go @@ -111,7 +111,7 @@ func (enc *unaggregatedEncoder) Truncate(n int) error { } func (enc *unaggregatedEncoder) Relinquish() Buffer { - res := NewBuffer(enc.buf[:enc.used], enc.pool) + res := NewBuffer(enc.buf[:enc.used], enc.pool.Put) enc.buf = nil enc.used = 0 return res