Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] - additional buffer pool #2829

Merged
merged 14 commits into from
May 16, 2024
90 changes: 7 additions & 83 deletions pkg/buffers/buffer.go → pkg/buffers/buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -1,98 +1,20 @@
// Package buffer provides a custom buffer type that includes metrics for tracking buffer usage.
// It also provides a pool for managing buffer reusability.
package buffers
package buffer

import (
"bytes"
"io"
"sync"
"time"
)

type poolMetrics struct{}

func (poolMetrics) recordShrink(amount int) {
shrinkCount.Inc()
shrinkAmount.Add(float64(amount))
}

func (poolMetrics) recordBufferRetrival() {
activeBufferCount.Inc()
checkoutCount.Inc()
bufferCount.Inc()
}

func (poolMetrics) recordBufferReturn(buf *Buffer) {
activeBufferCount.Dec()
totalBufferSize.Add(float64(buf.Cap()))
totalBufferLength.Add(float64(buf.Len()))
buf.recordMetric()
}

// PoolOpts is a function that configures a BufferPool.
type PoolOpts func(pool *Pool)

// Pool of buffers.
type Pool struct {
*sync.Pool
bufferSize uint32

metrics poolMetrics
}

const defaultBufferSize = 1 << 12 // 4KB
// NewBufferPool creates a new instance of BufferPool.
func NewBufferPool(opts ...PoolOpts) *Pool {
pool := &Pool{bufferSize: defaultBufferSize}

for _, opt := range opts {
opt(pool)
}
pool.Pool = &sync.Pool{
New: func() any {
return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))}
},
}

return pool
}

// Get returns a Buffer from the pool.
func (p *Pool) Get() *Buffer {
buf, ok := p.Pool.Get().(*Buffer)
if !ok {
buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
}
p.metrics.recordBufferRetrival()
buf.resetMetric()

return buf
}

// Put returns a Buffer to the pool.
func (p *Pool) Put(buf *Buffer) {
p.metrics.recordBufferReturn(buf)

// If the Buffer is more than twice the default size, replace it with a new Buffer.
// This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity {
p.metrics.recordShrink(buf.Cap() - defaultBufferSize)
buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
} else {
// Reset the Buffer to clear any existing data.
buf.Reset()
}

p.Pool.Put(buf)
}

// Buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking Buffer checkout duration.
type Buffer struct {
*bytes.Buffer
checkedOutAt time.Time
}

const defaultBufferSize = 1 << 12 // 4KB
// NewBuffer creates a new instance of Buffer.
func NewBuffer() *Buffer { return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, defaultBufferSize))} }

Expand All @@ -101,12 +23,14 @@ func (b *Buffer) Grow(size int) {
b.recordGrowth(size)
}

func (b *Buffer) resetMetric() { b.checkedOutAt = time.Now() }
func (b *Buffer) ResetMetric() { b.checkedOutAt = time.Now() }

func (b *Buffer) recordMetric() {
func (b *Buffer) RecordMetric() {
dur := time.Since(b.checkedOutAt)
checkoutDuration.Observe(float64(dur.Microseconds()))
checkoutDurationTotal.Add(float64(dur.Microseconds()))
totalBufferSize.Add(float64(b.Cap()))
ahrav marked this conversation as resolved.
Show resolved Hide resolved
totalBufferLength.Add(float64(b.Len()))
}

func (b *Buffer) recordGrowth(size int) {
Expand All @@ -119,7 +43,7 @@ func (b *Buffer) Write(data []byte) (int, error) {
if b.Buffer == nil {
// This case should ideally never occur if buffers are properly managed.
b.Buffer = bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
b.resetMetric()
b.ResetMetric()
}

size := len(data)
Expand Down
75 changes: 1 addition & 74 deletions pkg/buffers/buffer_test.go → pkg/buffers/buffer/buffer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package buffers
package buffer

import (
"bytes"
Expand All @@ -7,79 +7,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestNewBufferPool(t *testing.T) {
t.Parallel()
tests := []struct {
name string
opts []PoolOpts
expectedBuffSize uint32
}{
{name: "Default pool size", expectedBuffSize: defaultBufferSize},
{
name: "Custom pool size",
opts: []PoolOpts{func(p *Pool) { p.bufferSize = 8 * 1024 }}, // 8KB
expectedBuffSize: 8 * 1024,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
pool := NewBufferPool(tc.opts...)
assert.Equal(t, tc.expectedBuffSize, pool.bufferSize)
})
}
}

func TestBufferPoolGetPut(t *testing.T) {
t.Parallel()
tests := []struct {
name string
preparePool func(p *Pool) *Buffer // Prepare the pool and return an initial buffer to put if needed
expectedCapBefore int // Expected capacity before putting it back
expectedCapAfter int // Expected capacity after retrieving it again
}{
{
name: "Get new buffer and put back without modification",
preparePool: func(_ *Pool) *Buffer {
return nil // No initial buffer to put
},
expectedCapBefore: int(defaultBufferSize),
expectedCapAfter: int(defaultBufferSize),
},
{
name: "Put oversized buffer, expect shrink",
preparePool: func(p *Pool) *Buffer {
buf := &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, 3*defaultBufferSize))}
return buf
},
expectedCapBefore: int(defaultBufferSize),
expectedCapAfter: int(defaultBufferSize), // Should shrink back to default
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
pool := NewBufferPool()
initialBuf := tc.preparePool(pool)
if initialBuf != nil {
pool.Put(initialBuf)
}

buf := pool.Get()
assert.Equal(t, tc.expectedCapBefore, buf.Cap())

pool.Put(buf)

bufAfter := pool.Get()
assert.Equal(t, tc.expectedCapAfter, bufAfter.Cap())
})
}
}

func TestBufferWrite(t *testing.T) {
t.Parallel()
tests := []struct {
Expand Down
57 changes: 11 additions & 46 deletions pkg/buffers/metrics.go → pkg/buffers/buffer/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package buffers
package buffer

import (
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -8,34 +8,6 @@ import (
)

var (
activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "active_buffer_count",
Help: "Current number of active buffers.",
})

bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buffer_count",
Help: "Total number of buffers managed by the pool.",
})

totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_length",
Help: "Total length of all buffers combined.",
})

totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_size",
Help: "Total size of all buffers combined.",
})

growCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Expand All @@ -50,20 +22,6 @@ var (
Help: "Total amount of bytes buffers in the pool have grown by.",
})

shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_count",
Help: "Total number of times buffers in the pool have shrunk.",
})

shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_amount",
Help: "Total amount of bytes buffers in the pool have shrunk by.",
})

checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Expand All @@ -79,10 +37,17 @@ var (
Buckets: prometheus.ExponentialBuckets(10, 10, 7),
})

checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_length",
Help: "Total length of all buffers combined.",
})

totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_count",
Help: "Total number of Buffer checkouts.",
Name: "total_buffer_size",
Help: "Total size of all buffers combined.",
})
)
45 changes: 45 additions & 0 deletions pkg/buffers/pool/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pool

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)

var (
activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "active_buffer_count",
Help: "Current number of active buffers.",
})

bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buffer_count",
Help: "Total number of buffers managed by the pool.",
})

shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_count",
Help: "Total number of times buffers in the pool have shrunk.",
})

shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_amount",
Help: "Total amount of bytes buffers in the pool have shrunk by.",
})

checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_count",
Help: "Total number of Buffer checkouts.",
})
)
Loading
Loading