[chore] - move buffers pkg out of writers pkg (#2826)
* Remove specialized handler and archive struct and restructure handlers pkg.

* Refactor RPM archive handlers to use a library instead of shelling out

* make rpm handling context aware

* update test

* Refactor AR/deb archive handler to use an existing library instead of shelling out

* Update tests

* Handle non-archive data within the DefaultHandler

* make structs and methods private

* Remove non-archive data handling within sources

* add max size check

* add filename and size to context kvp

* move skip file check and is binary check before opening file

* fix test

* preserve existing functionality of not handling non-archive files in HandleFile

* Handle non-archive data within the DefaultHandler

* rebase

* Remove non-archive data handling within sources

* Adjust check for rpm/deb archive type

* add additional deb mime type

* add gzip

* move diskbuffered rereader setup into handler pkg

* remove DiskBuffereReader creation logic within sources

* update comment

* move rewind closer

* reduce log verbosity

* add metrics for file handling

* add metrics for errors

* make defaultBufferSize a const

* add metrics for file handling

* add metrics for errors

* fix tests

* add metrics for max archive depth and skipped files

* update error

* skip symlinks and dirs

* update err

* Address incompatible reader to openArchive

* remove nil check

* fix err assignment

* Allow git cat-file blob to complete before trying to handle the file

* wrap compReader with DiskbufferReader

* Allow git cat-file blob to complete before trying to handle the file

* updates

* use buffer writer

* update

* refactor

* update context pkg

* revert stuff

* update test

* fix test

* remove

* use correct reader

* add metrics for file handling

* add metrics for errors

* fix tests

* rebase

* add metrics for errors

* add metrics for max archive depth and skipped files

* update error

* skip symlinks and dirs

* update err

* fix err assignment

* rebase

* remove

* Update write method in contentWriter interface

* Add bufferReadSeekCloser

* update name

* update comment

* fix lint

* Remove specialized handler and archive struct and restructure handlers pkg.

* Refactor RPM archive handlers to use a library instead of shelling out

* make rpm handling context aware

* update test

* Refactor AR/deb archive handler to use an existing library instead of shelling out

* Update tests

* add max size check

* add filename and size to context kvp

* move skip file check and is binary check before opening file

* fix test

* preserve existing functionality of not handling non-archive files in HandleFile

* Handle non-archive data within the DefaultHandler

* rebase

* Remove non-archive data handling within sources

* Handle non-archive data within the DefaultHandler

* add gzip

* move diskbuffered rereader setup into handler pkg

* remove DiskBuffereReader creation logic within sources

* update comment

* move rewind closer

* reduce log verbosity

* make defaultBufferSize a const

* add metrics for file handling

* add metrics for errors

* fix tests

* add metrics for max archive depth and skipped files

* update error

* skip symlinks and dirs

* update err

* Address incompatible reader to openArchive

* remove nil check

* fix err assignment

* wrap compReader with DiskbufferReader

* Allow git cat-file blob to complete before trying to handle the file

* updates

* use buffer writer

* update

* refactor

* update context pkg

* revert stuff

* update test

* remove

* rebase

* go mod tidy

* lint check

* update metric to ms

* update metric

* update comments

* don't use ptr

* update

* fix

* Remove specialized handler and archive struct and restructure handlers pkg.

* Refactor RPM archive handlers to use a library instead of shelling out

* make rpm handling context aware

* update test

* Refactor AR/deb archive handler to use an existing library instead of shelling out

* Update tests

* add max size check

* add filename and size to context kvp

* move skip file check and is binary check before opening file

* fix test

* preserve existing functionality of not handling non-archive files in HandleFile

* Adjust check for rpm/deb archive type

* add additional deb mime type

* update comment

* go mod tidy

* update go mod

* Add a buffered file reader

* update comments

* use Buffered File Reader

* return buffer

* update

* fix

* return

* go mod tidy

* merge

* use a shared pool

* use sync.Once

* reorganize

* remove unused code

* fix double init

* fix stuff

* nil check

* reduce allocations

* updates

* update metrics

* updates

* reset buffer instead of putting it back

* skip binaries

* skip

* concurrently process diffs

* close chan

* concurrently enumerate orgs

* increase workers

* ignore pbix and vsdx files

* add metrics for gitparse's Diffchan

* fix metric

* update metrics

* update

* fix checks

* fix

* inc

* update

* reduce

* Create workers to handle binary files

* modify workers

* updates

* add check

* delete code

* use custom reader

* rename struct

* add nonarchive handler

* fix break

* add comments

* add tests

* refactor

* remove log

* do not scan rpm links

* simplify

* rename var

* rename

* fix benchmark

* add buffer

* buffer

* buffer

* handle panic

* merge main

* merge main

* add recover

* revert stuff

* revert

* revert to using reader

* fixes

* remove

* update

* fixes

* linter

* fix test

* move buffers pkg out of writers pkg

* rename

* [refactor] - move buffer pool logic into own pkg (#2828)

* move buffer pool logic into own pkg

* fix test

* fix test

* whoops

* [feat] - additional buffer pool (#2829)

* move buffer pool logic into own pkg

* move

* fix test

* fix test

* fix test

* remove

* fix test

* whoops

* revert

* fix
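Several bullets above ("use a shared pool", "use sync.Once", "fix double init") describe guarding the shared buffer pool's initialization so concurrent callers never construct it twice. The repository's actual pool API is not shown in this view, so the sketch below is illustrative only: the names `getPool`, `initOnce`, and `sharedPool` are assumptions, not the repo's identifiers.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

const defaultBufferSize = 1 << 12 // 4KB, matching the const in the diff below

// Hypothetical names: initOnce guarantees the shared pool is built exactly
// once, even when many goroutines race to obtain it ("fix double init").
var (
	initOnce   sync.Once
	sharedPool *sync.Pool
)

func getPool() *sync.Pool {
	initOnce.Do(func() {
		sharedPool = &sync.Pool{
			New: func() any {
				return bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
			},
		}
	})
	return sharedPool
}

func main() {
	p1, p2 := getPool(), getPool()
	fmt.Println(p1 == p2) // true: both callers see the same pool instance
}
```

sync.Once is preferred over a nil-check here because a bare `if sharedPool == nil` check is racy under concurrent access.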
ahrav authored May 16, 2024
1 parent 896e6e7 commit 5257069
Showing 10 changed files with 287 additions and 221 deletions.
88 changes: 6 additions & 82 deletions pkg/writers/buffer/buffer.go → pkg/buffers/buffer/buffer.go
@@ -5,94 +5,16 @@ package buffer
import (
"bytes"
"io"
"sync"
"time"
)

type poolMetrics struct{}

func (poolMetrics) recordShrink(amount int) {
shrinkCount.Inc()
shrinkAmount.Add(float64(amount))
}

func (poolMetrics) recordBufferRetrival() {
activeBufferCount.Inc()
checkoutCount.Inc()
bufferCount.Inc()
}

func (poolMetrics) recordBufferReturn(buf *Buffer) {
activeBufferCount.Dec()
totalBufferSize.Add(float64(buf.Cap()))
totalBufferLength.Add(float64(buf.Len()))
buf.recordMetric()
}

// PoolOpts is a function that configures a BufferPool.
type PoolOpts func(pool *Pool)

// Pool of buffers.
type Pool struct {
*sync.Pool
bufferSize uint32

metrics poolMetrics
}

const defaultBufferSize = 1 << 12 // 4KB
// NewBufferPool creates a new instance of BufferPool.
func NewBufferPool(opts ...PoolOpts) *Pool {
pool := &Pool{bufferSize: defaultBufferSize}

for _, opt := range opts {
opt(pool)
}
pool.Pool = &sync.Pool{
New: func() any {
return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))}
},
}

return pool
}

// Get returns a Buffer from the pool.
func (p *Pool) Get() *Buffer {
buf, ok := p.Pool.Get().(*Buffer)
if !ok {
buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
}
p.metrics.recordBufferRetrival()
buf.resetMetric()

return buf
}

// Put returns a Buffer to the pool.
func (p *Pool) Put(buf *Buffer) {
p.metrics.recordBufferReturn(buf)

// If the Buffer is more than twice the default size, replace it with a new Buffer.
// This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity {
p.metrics.recordShrink(buf.Cap() - defaultBufferSize)
buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
} else {
// Reset the Buffer to clear any existing data.
buf.Reset()
}

p.Pool.Put(buf)
}

// Buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking Buffer checkout duration.
type Buffer struct {
*bytes.Buffer
checkedOutAt time.Time
}

const defaultBufferSize = 1 << 12 // 4KB
// NewBuffer creates a new instance of Buffer.
func NewBuffer() *Buffer { return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, defaultBufferSize))} }

@@ -101,12 +23,14 @@ func (b *Buffer) Grow(size int) {
b.recordGrowth(size)
}

func (b *Buffer) resetMetric() { b.checkedOutAt = time.Now() }
func (b *Buffer) ResetMetric() { b.checkedOutAt = time.Now() }

func (b *Buffer) recordMetric() {
func (b *Buffer) RecordMetric() {
dur := time.Since(b.checkedOutAt)
checkoutDuration.Observe(float64(dur.Microseconds()))
checkoutDurationTotal.Add(float64(dur.Microseconds()))
totalBufferSize.Add(float64(b.Cap()))
totalBufferLength.Add(float64(b.Len()))
}

func (b *Buffer) recordGrowth(size int) {
@@ -119,7 +43,7 @@ func (b *Buffer) Write(data []byte) (int, error) {
if b.Buffer == nil {
// This case should ideally never occur if buffers are properly managed.
b.Buffer = bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
b.resetMetric()
b.ResetMetric()
}

size := len(data)
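The buffer.go diff above exports `ResetMetric` and `RecordMetric` (previously `resetMetric`/`recordMetric`) so the pool, now in a separate package, can call them, and keeps a nil-check in `Write` that lazily initializes the embedded `bytes.Buffer`. A trimmed, self-contained sketch of that wrapper — with the Prometheus metric calls replaced by a returned duration, since the real type records to a histogram instead:

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

const defaultBufferSize = 1 << 12 // 4KB

// Buffer mirrors the wrapper in the diff: a bytes.Buffer plus a checkout
// timestamp used to measure how long the buffer was held.
type Buffer struct {
	*bytes.Buffer
	checkedOutAt time.Time
}

func (b *Buffer) ResetMetric() { b.checkedOutAt = time.Now() }

// RecordMetric returns the checkout duration; the real method observes it
// in a Prometheus histogram rather than returning it.
func (b *Buffer) RecordMetric() time.Duration {
	return time.Since(b.checkedOutAt)
}

// Write lazily initializes the underlying buffer, matching the nil-check
// in the diff ("This case should ideally never occur if buffers are
// properly managed").
func (b *Buffer) Write(data []byte) (int, error) {
	if b.Buffer == nil {
		b.Buffer = bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
		b.ResetMetric()
	}
	return b.Buffer.Write(data)
}

func main() {
	var b Buffer // zero value: embedded *bytes.Buffer is nil
	n, err := b.Write([]byte("hello"))
	fmt.Println(n, err, b.Len()) // 5 <nil> 5
}
```

Exporting the metric hooks is the cost of the package split: the pool can no longer reach unexported methods across the package boundary.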
@@ -7,79 +7,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestNewBufferPool(t *testing.T) {
t.Parallel()
tests := []struct {
name string
opts []PoolOpts
expectedBuffSize uint32
}{
{name: "Default pool size", expectedBuffSize: defaultBufferSize},
{
name: "Custom pool size",
opts: []PoolOpts{func(p *Pool) { p.bufferSize = 8 * 1024 }}, // 8KB
expectedBuffSize: 8 * 1024,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
pool := NewBufferPool(tc.opts...)
assert.Equal(t, tc.expectedBuffSize, pool.bufferSize)
})
}
}

func TestBufferPoolGetPut(t *testing.T) {
t.Parallel()
tests := []struct {
name string
preparePool func(p *Pool) *Buffer // Prepare the pool and return an initial buffer to put if needed
expectedCapBefore int // Expected capacity before putting it back
expectedCapAfter int // Expected capacity after retrieving it again
}{
{
name: "Get new buffer and put back without modification",
preparePool: func(_ *Pool) *Buffer {
return nil // No initial buffer to put
},
expectedCapBefore: int(defaultBufferSize),
expectedCapAfter: int(defaultBufferSize),
},
{
name: "Put oversized buffer, expect shrink",
preparePool: func(p *Pool) *Buffer {
buf := &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, 3*defaultBufferSize))}
return buf
},
expectedCapBefore: int(defaultBufferSize),
expectedCapAfter: int(defaultBufferSize), // Should shrink back to default
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
pool := NewBufferPool()
initialBuf := tc.preparePool(pool)
if initialBuf != nil {
pool.Put(initialBuf)
}

buf := pool.Get()
assert.Equal(t, tc.expectedCapBefore, buf.Cap())

pool.Put(buf)

bufAfter := pool.Get()
assert.Equal(t, tc.expectedCapAfter, bufAfter.Cap())
})
}
}

func TestBufferWrite(t *testing.T) {
t.Parallel()
tests := []struct {
55 changes: 10 additions & 45 deletions pkg/writers/buffer/metrics.go → pkg/buffers/buffer/metrics.go
@@ -8,34 +8,6 @@ import (
)

var (
activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "active_buffer_count",
Help: "Current number of active buffers.",
})

bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buffer_count",
Help: "Total number of buffers managed by the pool.",
})

totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_length",
Help: "Total length of all buffers combined.",
})

totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_size",
Help: "Total size of all buffers combined.",
})

growCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
@@ -50,20 +22,6 @@ var (
Help: "Total amount of bytes buffers in the pool have grown by.",
})

shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_count",
Help: "Total number of times buffers in the pool have shrunk.",
})

shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_amount",
Help: "Total amount of bytes buffers in the pool have shrunk by.",
})

checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
@@ -79,10 +37,17 @@
Buckets: prometheus.ExponentialBuckets(10, 10, 7),
})

checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_length",
Help: "Total length of all buffers combined.",
})

totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_count",
Help: "Total number of Buffer checkouts.",
Name: "total_buffer_size",
Help: "Total size of all buffers combined.",
})
)
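The `checkout_duration` histogram kept in this file uses `prometheus.ExponentialBuckets(10, 10, 7)`: seven upper bounds in microseconds, one decade apart, from 10µs up to 10s. To show the resulting bounds without pulling in the Prometheus client, the helper below re-implements that function's documented semantics (start value, growth factor, bucket count):

```go
package main

import "fmt"

// expBuckets mirrors prometheus.ExponentialBuckets: count upper bounds
// starting at start, each factor times the previous.
func expBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}

func main() {
	// Bounds used by checkout_duration, in microseconds: 10µs .. 10s.
	fmt.Println(expBuckets(10, 10, 7))
}
```

The widest bucket (1e7µs = 10s) caps the resolution: any checkout held longer than 10 seconds lands in the histogram's implicit +Inf bucket.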
45 changes: 45 additions & 0 deletions pkg/buffers/pool/metrics.go
@@ -0,0 +1,45 @@
package pool

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)

var (
activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "active_buffer_count",
Help: "Current number of active buffers.",
})

bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buffer_count",
Help: "Total number of buffers managed by the pool.",
})

shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_count",
Help: "Total number of times buffers in the pool have shrunk.",
})

shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_amount",
Help: "Total amount of bytes buffers in the pool have shrunk by.",
})

checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_count",
Help: "Total number of Buffer checkouts.",
})
)
