Skip to content

Commit

Permalink
[refactor] - move buffer pool logic into own pkg (#2828)
Browse files Browse the repository at this point in the history
* move buffer pool logic into own pkg

* fix test

* fix test

* whoops
  • Loading branch information
ahrav authored May 13, 2024
1 parent 2b389f5 commit fe27e0f
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 225 deletions.
90 changes: 7 additions & 83 deletions pkg/buffers/buffer.go → pkg/buffers/buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -1,98 +1,20 @@
// Package buffer provides a custom buffer type that includes metrics for tracking buffer usage.
// It also provides a pool for managing buffer reusability.
package buffers
package buffer

import (
"bytes"
"io"
"sync"
"time"
)

type poolMetrics struct{}

func (poolMetrics) recordShrink(amount int) {
shrinkCount.Inc()
shrinkAmount.Add(float64(amount))
}

func (poolMetrics) recordBufferRetrival() {
activeBufferCount.Inc()
checkoutCount.Inc()
bufferCount.Inc()
}

func (poolMetrics) recordBufferReturn(buf *Buffer) {
activeBufferCount.Dec()
totalBufferSize.Add(float64(buf.Cap()))
totalBufferLength.Add(float64(buf.Len()))
buf.recordMetric()
}

// PoolOpts is a function that configures a BufferPool.
type PoolOpts func(pool *Pool)

// Pool of buffers.
type Pool struct {
*sync.Pool
bufferSize uint32

metrics poolMetrics
}

const defaultBufferSize = 1 << 12 // 4KB
// NewBufferPool creates a new instance of BufferPool.
func NewBufferPool(opts ...PoolOpts) *Pool {
pool := &Pool{bufferSize: defaultBufferSize}

for _, opt := range opts {
opt(pool)
}
pool.Pool = &sync.Pool{
New: func() any {
return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))}
},
}

return pool
}

// Get returns a Buffer from the pool.
func (p *Pool) Get() *Buffer {
buf, ok := p.Pool.Get().(*Buffer)
if !ok {
buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
}
p.metrics.recordBufferRetrival()
buf.resetMetric()

return buf
}

// Put returns a Buffer to the pool.
func (p *Pool) Put(buf *Buffer) {
p.metrics.recordBufferReturn(buf)

// If the Buffer is more than twice the default size, replace it with a new Buffer.
// This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity {
p.metrics.recordShrink(buf.Cap() - defaultBufferSize)
buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
} else {
// Reset the Buffer to clear any existing data.
buf.Reset()
}

p.Pool.Put(buf)
}

// Buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking Buffer checkout duration.
type Buffer struct {
*bytes.Buffer
checkedOutAt time.Time
}

const defaultBufferSize = 1 << 12 // 4KB
// NewBuffer creates a new instance of Buffer.
func NewBuffer() *Buffer { return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, defaultBufferSize))} }

Expand All @@ -101,12 +23,14 @@ func (b *Buffer) Grow(size int) {
b.recordGrowth(size)
}

func (b *Buffer) resetMetric() { b.checkedOutAt = time.Now() }
func (b *Buffer) ResetMetric() { b.checkedOutAt = time.Now() }

func (b *Buffer) recordMetric() {
func (b *Buffer) RecordMetric() {
dur := time.Since(b.checkedOutAt)
checkoutDuration.Observe(float64(dur.Microseconds()))
checkoutDurationTotal.Add(float64(dur.Microseconds()))
totalBufferSize.Add(float64(b.Cap()))
totalBufferLength.Add(float64(b.Len()))
}

func (b *Buffer) recordGrowth(size int) {
Expand All @@ -119,7 +43,7 @@ func (b *Buffer) Write(data []byte) (int, error) {
if b.Buffer == nil {
// This case should ideally never occur if buffers are properly managed.
b.Buffer = bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
b.resetMetric()
b.ResetMetric()
}

size := len(data)
Expand Down
75 changes: 1 addition & 74 deletions pkg/buffers/buffer_test.go → pkg/buffers/buffer/buffer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package buffers
package buffer

import (
"bytes"
Expand All @@ -7,79 +7,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestNewBufferPool(t *testing.T) {
t.Parallel()
tests := []struct {
name string
opts []PoolOpts
expectedBuffSize uint32
}{
{name: "Default pool size", expectedBuffSize: defaultBufferSize},
{
name: "Custom pool size",
opts: []PoolOpts{func(p *Pool) { p.bufferSize = 8 * 1024 }}, // 8KB
expectedBuffSize: 8 * 1024,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
pool := NewBufferPool(tc.opts...)
assert.Equal(t, tc.expectedBuffSize, pool.bufferSize)
})
}
}

func TestBufferPoolGetPut(t *testing.T) {
t.Parallel()
tests := []struct {
name string
preparePool func(p *Pool) *Buffer // Prepare the pool and return an initial buffer to put if needed
expectedCapBefore int // Expected capacity before putting it back
expectedCapAfter int // Expected capacity after retrieving it again
}{
{
name: "Get new buffer and put back without modification",
preparePool: func(_ *Pool) *Buffer {
return nil // No initial buffer to put
},
expectedCapBefore: int(defaultBufferSize),
expectedCapAfter: int(defaultBufferSize),
},
{
name: "Put oversized buffer, expect shrink",
preparePool: func(p *Pool) *Buffer {
buf := &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, 3*defaultBufferSize))}
return buf
},
expectedCapBefore: int(defaultBufferSize),
expectedCapAfter: int(defaultBufferSize), // Should shrink back to default
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
pool := NewBufferPool()
initialBuf := tc.preparePool(pool)
if initialBuf != nil {
pool.Put(initialBuf)
}

buf := pool.Get()
assert.Equal(t, tc.expectedCapBefore, buf.Cap())

pool.Put(buf)

bufAfter := pool.Get()
assert.Equal(t, tc.expectedCapAfter, bufAfter.Cap())
})
}
}

func TestBufferWrite(t *testing.T) {
t.Parallel()
tests := []struct {
Expand Down
57 changes: 11 additions & 46 deletions pkg/buffers/metrics.go → pkg/buffers/buffer/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package buffers
package buffer

import (
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -8,34 +8,6 @@ import (
)

var (
activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "active_buffer_count",
Help: "Current number of active buffers.",
})

bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buffer_count",
Help: "Total number of buffers managed by the pool.",
})

totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_length",
Help: "Total length of all buffers combined.",
})

totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_size",
Help: "Total size of all buffers combined.",
})

growCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Expand All @@ -50,20 +22,6 @@ var (
Help: "Total amount of bytes buffers in the pool have grown by.",
})

shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_count",
Help: "Total number of times buffers in the pool have shrunk.",
})

shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_amount",
Help: "Total amount of bytes buffers in the pool have shrunk by.",
})

checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Expand All @@ -79,10 +37,17 @@ var (
Buckets: prometheus.ExponentialBuckets(10, 10, 7),
})

checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_length",
Help: "Total length of all buffers combined.",
})

totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_count",
Help: "Total number of Buffer checkouts.",
Name: "total_buffer_size",
Help: "Total size of all buffers combined.",
})
)
45 changes: 45 additions & 0 deletions pkg/buffers/pool/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pool

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)

var (
activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "active_buffer_count",
Help: "Current number of active buffers.",
})

bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buffer_count",
Help: "Total number of buffers managed by the pool.",
})

shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_count",
Help: "Total number of times buffers in the pool have shrunk.",
})

shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_amount",
Help: "Total amount of bytes buffers in the pool have shrunk by.",
})

checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_count",
Help: "Total number of Buffer checkouts.",
})
)
Loading

0 comments on commit fe27e0f

Please sign in to comment.