[aggregator] Pack some tcp client structs for lower memory utilization #3037

Merged: 2 commits, Dec 22, 2020
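The idea behind the reorderings in conn_options.go, payload.go, ref_count.go, and writer.go below is that Go lays out struct fields in declaration order, so a bool or other small field declared between 8-byte fields drags in alignment padding on every instance. The following is a minimal, self-contained sketch of that effect, not code from this PR; the field names only loosely echo the structs touched here, and the two types are invented for illustration.

package main

import (
	"fmt"
	"time"
	"unsafe"
)

// loose mimics a "natural" declaration order where bool flags sit between
// 8-byte fields, so each flag drags in 7 bytes of alignment padding.
type loose struct {
	connKeepAlive bool          // 1 byte + 7 bytes padding
	writeTimeout  time.Duration // 8 bytes
	closed        bool          // 1 byte + 7 bytes padding
	maxThreshold  int           // 8 bytes
}

// packed keeps the 8-byte fields first and groups the bools at the end,
// leaving only a single run of tail padding.
type packed struct {
	writeTimeout  time.Duration
	maxThreshold  int
	connKeepAlive bool
	closed        bool
}

func main() {
	fmt.Println(unsafe.Sizeof(loose{}))  // 32 on 64-bit platforms
	fmt.Println(unsafe.Sizeof(packed{})) // 24 on 64-bit platforms
}

Tools such as the fieldalignment analyzer shipped with golang.org/x/tools can suggest orderings like this automatically, though the exact layout chosen in this PR is the author's.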
8 changes: 4 additions & 4 deletions src/aggregator/client/conn_options.go
@@ -117,15 +117,15 @@ type ConnectionOptions interface {
type connectionOptions struct {
clockOpts clock.Options
instrumentOpts instrument.Options
writeRetryOpts retry.Options
rwOpts xio.Options
connTimeout time.Duration
connKeepAlive bool
writeTimeout time.Duration
maxDuration time.Duration
initThreshold int
maxThreshold int
multiplier int
maxDuration time.Duration
writeRetryOpts retry.Options
rwOpts xio.Options
connKeepAlive bool
}

// NewConnectionOptions create a new set of connection options.
6 changes: 3 additions & 3 deletions src/aggregator/client/payload.go
@@ -55,8 +55,8 @@ type timedPayload struct {
}

type timedWithStagedMetadatas struct {
metric aggregated.Metric
metadatas metadata.StagedMetadatas
metric aggregated.Metric
}

type passthroughPayload struct {
@@ -65,10 +65,10 @@ type passthroughPayload struct {
}

type payloadUnion struct {
payloadType payloadType
untimed untimedPayload
forwarded forwardedPayload
untimed untimedPayload
timed timedPayload
timedWithStagedMetadatas timedWithStagedMetadatas
passthrough passthroughPayload
payloadType payloadType
}
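Because payloadUnion carries one field per payload variant plus a small tag, every queued payload pays for the full layout, so a single misplaced small field is multiplied across the whole queue. One way to keep a layout like this from silently regressing (not something this PR adds) is a size-assertion test; the package, type, and expected size below are invented purely for illustration.

package layout

import (
	"testing"
	"unsafe"
)

// exampleUnion stands in for a payloadUnion-style struct: one field per
// payload variant plus a small tag kept last so it only adds tail padding.
type exampleUnion struct {
	variantA [3]uint64
	variantB [2]uint64
	tag      uint8
}

func TestExampleUnionSize(t *testing.T) {
	// Pin the size observed after packing (48 bytes on 64-bit here); any
	// shuffle or new field that grows the struct fails the test and forces
	// a conscious update.
	const want = 48
	if got := unsafe.Sizeof(exampleUnion{}); got != want {
		t.Fatalf("exampleUnion size = %d bytes, want %d", got, want)
	}
}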
2 changes: 1 addition & 1 deletion src/aggregator/client/ref_count.go
@@ -25,8 +25,8 @@ import "sync/atomic"
type destructorFn func()

type refCount struct {
n int32
destructorFn destructorFn
n int32
}

func (rc *refCount) SetRefCount(n int) { atomic.StoreInt32(&rc.n, int32(n)) }
10 changes: 5 additions & 5 deletions src/aggregator/client/writer.go
@@ -65,14 +65,14 @@ type writer struct {

log *zap.Logger
metrics writerMetrics
flushSize int
maxTimerBatchSize int
encoderOpts protobuf.UnaggregatedOptions
queue instanceQueue
flushSize int
maxTimerBatchSize int

closed bool
encodersByShard map[uint32]*lockedEncoder
newLockedEncoderFn newLockedEncoderFn
closed bool
}

func newInstanceWriter(instance placement.Instance, opts Options) instanceWriter {
@@ -534,8 +534,8 @@ func newWriterMetrics(s tally.Scope) writerMetrics {
}

type lockedEncoder struct {
sync.Mutex
protobuf.UnaggregatedEncoder
sync.Mutex
}

func newLockedEncoder(encoderOpts protobuf.UnaggregatedOptions) *lockedEncoder {
@@ -544,8 +544,8 @@ func newLockedEncoder(encoderOpts protobuf.UnaggregatedOptions) *lockedEncoder {
}

type refCountedWriter struct {
refCount
instanceWriter
refCount
}

func newRefCountedWriter(instance placement.Instance, opts Options) *refCountedWriter {
6 changes: 5 additions & 1 deletion src/metrics/encoding/protobuf/aggregated_encoder.go
@@ -68,5 +68,9 @@ func (enc *aggregatedEncoder) Encode(
}

func (enc *aggregatedEncoder) Buffer() Buffer {
return NewBuffer(enc.buf, enc.pool)
var fn PoolReleaseFn
if enc.pool != nil {
fn = enc.pool.Put
}
return NewBuffer(enc.buf, fn)
}
22 changes: 10 additions & 12 deletions src/metrics/encoding/protobuf/buffer.go
@@ -26,16 +26,18 @@ import (
"github.com/m3db/m3/src/x/pool"
)

// PoolReleaseFn is a function used to release underlying slice back to bytes pool.
type PoolReleaseFn func([]byte)

// Buffer contains a byte slice backed by an optional bytes pool.
type Buffer struct {
buf []byte
pool pool.BytesPool
closed bool
buf []byte
finalizer PoolReleaseFn
}

// NewBuffer create a new buffer.
func NewBuffer(buf []byte, p pool.BytesPool) Buffer {
return Buffer{buf: buf, pool: p}
func NewBuffer(buf []byte, p PoolReleaseFn) Buffer {
return Buffer{buf: buf, finalizer: p}
}

// Bytes returns the raw byte slice.
@@ -46,15 +48,11 @@ func (b *Buffer) Truncate(n int) { b.buf = b.buf[:n] }

// Close closes the buffer.
func (b *Buffer) Close() {
if b.closed {
return
}
b.closed = true
if b.pool != nil && b.buf != nil {
b.pool.Put(b.buf)
if b.finalizer != nil && b.buf != nil {
b.finalizer(b.buf)
}
b.pool = nil
b.buf = nil
b.finalizer = nil
}

type copyDataMode int
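In buffer.go the pool.BytesPool field and the closed flag are replaced by a single PoolReleaseFn; clearing the finalizer in Close is what now makes Close idempotent, and it shrinks Buffer from roughly 48 to 32 bytes on 64-bit (slice header plus one function pointer instead of slice header, interface, and a padded bool). Below is a minimal standalone sketch of the same pattern; the names are invented and the type is not the m3 Buffer.

package main

import "fmt"

// ReleaseFn returns a byte slice to whatever owns it, e.g. a bytes pool.
type ReleaseFn func([]byte)

// buffer owns a byte slice plus an optional release callback. A nil
// finalizer doubles as the "closed / nothing left to release" state, so no
// separate bool flag (and its padding) is needed.
type buffer struct {
	buf       []byte
	finalizer ReleaseFn
}

// Close releases the slice exactly once; further calls are no-ops.
func (b *buffer) Close() {
	if b.finalizer != nil && b.buf != nil {
		b.finalizer(b.buf)
	}
	b.buf = nil
	b.finalizer = nil
}

func main() {
	released := 0
	b := buffer{
		buf:       make([]byte, 16),
		finalizer: func([]byte) { released++ },
	}
	b.Close()
	b.Close() // second Close does nothing
	fmt.Println(released) // 1
}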
13 changes: 5 additions & 8 deletions src/metrics/encoding/protobuf/buffer_test.go
@@ -37,13 +37,11 @@ func TestBufferWithPool(t *testing.T) {
data := p.Get(16)[:16]
data[0] = 0xff

buf := NewBuffer(data, p)
require.Equal(t, data, buf.Bytes())
require.False(t, buf.closed)
buf := NewBuffer(data, p.Put)
require.NotNil(t, buf.buf)

buf.Close()
require.True(t, buf.closed)
require.Nil(t, buf.pool)
require.Nil(t, buf.finalizer)
require.Nil(t, buf.buf)

// Verify that closing the buffer returns the buffer to pool.
@@ -60,11 +58,10 @@ func TestBufferNilPool(t *testing.T) {
data := make([]byte, 16)
buf := NewBuffer(data, nil)
require.Equal(t, data, buf.Bytes())
require.False(t, buf.closed)
require.NotNil(t, buf.buf)

buf.Close()
require.True(t, buf.closed)
require.Nil(t, buf.pool)
require.Nil(t, buf.finalizer)
require.Nil(t, buf.buf)
}

2 changes: 1 addition & 1 deletion src/metrics/encoding/protobuf/unaggregated_encoder.go
@@ -111,7 +111,7 @@ func (enc *unaggregatedEncoder) Truncate(n int) error {
}

func (enc *unaggregatedEncoder) Relinquish() Buffer {
res := NewBuffer(enc.buf[:enc.used], enc.pool)
res := NewBuffer(enc.buf[:enc.used], enc.pool.Put)
enc.buf = nil
enc.used = 0
return res
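Relinquish now hands the caller a Buffer whose finalizer is the bound method value enc.pool.Put, so this path assumes the unaggregated encoder is always constructed with a bytes pool (the aggregated encoder above guards against a nil pool instead). A hedged caller-side sketch follows; writeToConn is a hypothetical sink for the encoded bytes, not part of this PR.

package example

import "github.com/m3db/m3/src/metrics/encoding/protobuf"

// flushEncoded is a usage sketch, not code from this PR: writeToConn
// stands in for whatever actually ships the encoded bytes.
func flushEncoded(enc protobuf.UnaggregatedEncoder, writeToConn func([]byte) error) error {
	buf := enc.Relinquish() // encoder gives up ownership of its backing slice
	defer buf.Close()       // slice goes back to the bytes pool via the PoolReleaseFn
	return writeToConn(buf.Bytes())
}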