From fe27e0fbca45b391ecf3256556d461fab505137e Mon Sep 17 00:00:00 2001 From: ahrav Date: Mon, 13 May 2024 14:32:15 -0700 Subject: [PATCH] [refactor] - move buffer pool logic into own pkg (#2828) * move buffer pool logic into own pkg * fix test * fix test * whoops --- pkg/buffers/{ => buffer}/buffer.go | 90 ++----------------- pkg/buffers/{ => buffer}/buffer_test.go | 75 +--------------- pkg/buffers/{ => buffer}/metrics.go | 57 +++--------- pkg/buffers/pool/metrics.go | 45 ++++++++++ pkg/buffers/pool/pool.go | 84 +++++++++++++++++ pkg/buffers/pool/pool_test.go | 83 +++++++++++++++++ pkg/writers/buffer_writer/bufferwriter.go | 23 ++--- .../bufferedfilewriter.go | 19 ++-- .../bufferedfilewriter_test.go | 4 +- 9 files changed, 255 insertions(+), 225 deletions(-) rename pkg/buffers/{ => buffer}/buffer.go (63%) rename pkg/buffers/{ => buffer}/buffer_test.go (55%) rename pkg/buffers/{ => buffer}/metrics.go (58%) create mode 100644 pkg/buffers/pool/metrics.go create mode 100644 pkg/buffers/pool/pool.go create mode 100644 pkg/buffers/pool/pool_test.go diff --git a/pkg/buffers/buffer.go b/pkg/buffers/buffer/buffer.go similarity index 63% rename from pkg/buffers/buffer.go rename to pkg/buffers/buffer/buffer.go index f6b4337262d6..b3bd17749d6c 100644 --- a/pkg/buffers/buffer.go +++ b/pkg/buffers/buffer/buffer.go @@ -1,98 +1,20 @@ // Package buffer provides a custom buffer type that includes metrics for tracking buffer usage. // It also provides a pool for managing buffer reusability. -package buffers +package buffer import ( "bytes" "io" - "sync" "time" ) -type poolMetrics struct{} - -func (poolMetrics) recordShrink(amount int) { - shrinkCount.Inc() - shrinkAmount.Add(float64(amount)) -} - -func (poolMetrics) recordBufferRetrival() { - activeBufferCount.Inc() - checkoutCount.Inc() - bufferCount.Inc() -} - -func (poolMetrics) recordBufferReturn(buf *Buffer) { - activeBufferCount.Dec() - totalBufferSize.Add(float64(buf.Cap())) - totalBufferLength.Add(float64(buf.Len())) - buf.recordMetric() -} - -// PoolOpts is a function that configures a BufferPool. -type PoolOpts func(pool *Pool) - -// Pool of buffers. -type Pool struct { - *sync.Pool - bufferSize uint32 - - metrics poolMetrics -} - -const defaultBufferSize = 1 << 12 // 4KB -// NewBufferPool creates a new instance of BufferPool. -func NewBufferPool(opts ...PoolOpts) *Pool { - pool := &Pool{bufferSize: defaultBufferSize} - - for _, opt := range opts { - opt(pool) - } - pool.Pool = &sync.Pool{ - New: func() any { - return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))} - }, - } - - return pool -} - -// Get returns a Buffer from the pool. -func (p *Pool) Get() *Buffer { - buf, ok := p.Pool.Get().(*Buffer) - if !ok { - buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))} - } - p.metrics.recordBufferRetrival() - buf.resetMetric() - - return buf -} - -// Put returns a Buffer to the pool. -func (p *Pool) Put(buf *Buffer) { - p.metrics.recordBufferReturn(buf) - - // If the Buffer is more than twice the default size, replace it with a new Buffer. - // This prevents us from returning very large buffers to the pool. - const maxAllowedCapacity = 2 * defaultBufferSize - if buf.Cap() > maxAllowedCapacity { - p.metrics.recordShrink(buf.Cap() - defaultBufferSize) - buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))} - } else { - // Reset the Buffer to clear any existing data. - buf.Reset() - } - - p.Pool.Put(buf) -} - // Buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking Buffer checkout duration. type Buffer struct { *bytes.Buffer checkedOutAt time.Time } +const defaultBufferSize = 1 << 12 // 4KB // NewBuffer creates a new instance of Buffer. func NewBuffer() *Buffer { return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, defaultBufferSize))} } @@ -101,12 +23,14 @@ func (b *Buffer) Grow(size int) { b.recordGrowth(size) } -func (b *Buffer) resetMetric() { b.checkedOutAt = time.Now() } +func (b *Buffer) ResetMetric() { b.checkedOutAt = time.Now() } -func (b *Buffer) recordMetric() { +func (b *Buffer) RecordMetric() { dur := time.Since(b.checkedOutAt) checkoutDuration.Observe(float64(dur.Microseconds())) checkoutDurationTotal.Add(float64(dur.Microseconds())) + totalBufferSize.Add(float64(b.Cap())) + totalBufferLength.Add(float64(b.Len())) } func (b *Buffer) recordGrowth(size int) { @@ -119,7 +43,7 @@ func (b *Buffer) Write(data []byte) (int, error) { if b.Buffer == nil { // This case should ideally never occur if buffers are properly managed. b.Buffer = bytes.NewBuffer(make([]byte, 0, defaultBufferSize)) - b.resetMetric() + b.ResetMetric() } size := len(data) diff --git a/pkg/buffers/buffer_test.go b/pkg/buffers/buffer/buffer_test.go similarity index 55% rename from pkg/buffers/buffer_test.go rename to pkg/buffers/buffer/buffer_test.go index ce428c0fe0f1..bbf4987e23b5 100644 --- a/pkg/buffers/buffer_test.go +++ b/pkg/buffers/buffer/buffer_test.go @@ -1,4 +1,4 @@ -package buffers +package buffer import ( "bytes" @@ -7,79 +7,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestNewBufferPool(t *testing.T) { - t.Parallel() - tests := []struct { - name string - opts []PoolOpts - expectedBuffSize uint32 - }{ - {name: "Default pool size", expectedBuffSize: defaultBufferSize}, - { - name: "Custom pool size", - opts: []PoolOpts{func(p *Pool) { p.bufferSize = 8 * 1024 }}, // 8KB - expectedBuffSize: 8 * 1024, - }, - } - - for _, tc := range tests { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - pool := NewBufferPool(tc.opts...) - assert.Equal(t, tc.expectedBuffSize, pool.bufferSize) - }) - } -} - -func TestBufferPoolGetPut(t *testing.T) { - t.Parallel() - tests := []struct { - name string - preparePool func(p *Pool) *Buffer // Prepare the pool and return an initial buffer to put if needed - expectedCapBefore int // Expected capacity before putting it back - expectedCapAfter int // Expected capacity after retrieving it again - }{ - { - name: "Get new buffer and put back without modification", - preparePool: func(_ *Pool) *Buffer { - return nil // No initial buffer to put - }, - expectedCapBefore: int(defaultBufferSize), - expectedCapAfter: int(defaultBufferSize), - }, - { - name: "Put oversized buffer, expect shrink", - preparePool: func(p *Pool) *Buffer { - buf := &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, 3*defaultBufferSize))} - return buf - }, - expectedCapBefore: int(defaultBufferSize), - expectedCapAfter: int(defaultBufferSize), // Should shrink back to default - }, - } - - for _, tc := range tests { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - pool := NewBufferPool() - initialBuf := tc.preparePool(pool) - if initialBuf != nil { - pool.Put(initialBuf) - } - - buf := pool.Get() - assert.Equal(t, tc.expectedCapBefore, buf.Cap()) - - pool.Put(buf) - - bufAfter := pool.Get() - assert.Equal(t, tc.expectedCapAfter, bufAfter.Cap()) - }) - } -} - func TestBufferWrite(t *testing.T) { t.Parallel() tests := []struct { diff --git a/pkg/buffers/metrics.go b/pkg/buffers/buffer/metrics.go similarity index 58% rename from pkg/buffers/metrics.go rename to pkg/buffers/buffer/metrics.go index 47a4bcec079e..4a22580ebeb8 100644 --- a/pkg/buffers/metrics.go +++ b/pkg/buffers/buffer/metrics.go @@ -1,4 +1,4 @@ -package buffers +package buffer import ( "github.com/prometheus/client_golang/prometheus" @@ -8,34 +8,6 @@ import ( ) var ( - activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "active_buffer_count", - Help: "Current number of active buffers.", - }) - - bufferCount = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "buffer_count", - Help: "Total number of buffers managed by the pool.", - }) - - totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "total_buffer_length", - Help: "Total length of all buffers combined.", - }) - - totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "total_buffer_size", - Help: "Total size of all buffers combined.", - }) - growCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: common.MetricsNamespace, Subsystem: common.MetricsSubsystem, @@ -50,20 +22,6 @@ var ( Help: "Total amount of bytes buffers in the pool have grown by.", }) - shrinkCount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "shrink_count", - Help: "Total number of times buffers in the pool have shrunk.", - }) - - shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "shrink_amount", - Help: "Total amount of bytes buffers in the pool have shrunk by.", - }) - checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{ Namespace: common.MetricsNamespace, Subsystem: common.MetricsSubsystem, @@ -79,10 +37,17 @@ var ( Buckets: prometheus.ExponentialBuckets(10, 10, 7), }) - checkoutCount = promauto.NewCounter(prometheus.CounterOpts{ + totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "total_buffer_length", + Help: "Total length of all buffers combined.", + }) + + totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: common.MetricsNamespace, Subsystem: common.MetricsSubsystem, - Name: "checkout_count", - Help: "Total number of Buffer checkouts.", + Name: "total_buffer_size", + Help: "Total size of all buffers combined.", }) ) diff --git a/pkg/buffers/pool/metrics.go b/pkg/buffers/pool/metrics.go new file mode 100644 index 000000000000..3d4de5410611 --- /dev/null +++ b/pkg/buffers/pool/metrics.go @@ -0,0 +1,45 @@ +package pool + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/trufflesecurity/trufflehog/v3/pkg/common" +) + +var ( + activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "active_buffer_count", + Help: "Current number of active buffers.", + }) + + bufferCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "buffer_count", + Help: "Total number of buffers managed by the pool.", + }) + + shrinkCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "shrink_count", + Help: "Total number of times buffers in the pool have shrunk.", + }) + + shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "shrink_amount", + Help: "Total amount of bytes buffers in the pool have shrunk by.", + }) + + checkoutCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "checkout_count", + Help: "Total number of Buffer checkouts.", + }) +) diff --git a/pkg/buffers/pool/pool.go b/pkg/buffers/pool/pool.go new file mode 100644 index 000000000000..25ca6559f7e5 --- /dev/null +++ b/pkg/buffers/pool/pool.go @@ -0,0 +1,84 @@ +package pool + +import ( + "bytes" + "sync" + + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer" +) + +type poolMetrics struct{} + +func (poolMetrics) recordShrink(amount int) { + shrinkCount.Inc() + shrinkAmount.Add(float64(amount)) +} + +func (poolMetrics) recordBufferRetrival() { + activeBufferCount.Inc() + checkoutCount.Inc() + bufferCount.Inc() +} + +func (poolMetrics) recordBufferReturn(buf *buffer.Buffer) { + activeBufferCount.Dec() + buf.RecordMetric() +} + +// Opts is a function that configures a BufferPool. +type Opts func(pool *Pool) + +// Pool of buffers. +type Pool struct { + *sync.Pool + bufferSize uint32 + + metrics poolMetrics +} + +const defaultBufferSize = 1 << 12 // 4KB +// NewBufferPool creates a new instance of BufferPool. +func NewBufferPool(opts ...Opts) *Pool { + pool := &Pool{bufferSize: defaultBufferSize} + + for _, opt := range opts { + opt(pool) + } + pool.Pool = &sync.Pool{ + New: func() any { + return &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))} + }, + } + + return pool +} + +// Get returns a Buffer from the pool. +func (p *Pool) Get() *buffer.Buffer { + buf, ok := p.Pool.Get().(*buffer.Buffer) + if !ok { + buf = &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))} + } + p.metrics.recordBufferRetrival() + buf.ResetMetric() + + return buf +} + +// Put returns a Buffer to the pool. +func (p *Pool) Put(buf *buffer.Buffer) { + p.metrics.recordBufferReturn(buf) + + // If the Buffer is more than twice the default size, replace it with a new Buffer. + // This prevents us from returning very large buffers to the pool. + const maxAllowedCapacity = 2 * defaultBufferSize + if buf.Cap() > maxAllowedCapacity { + p.metrics.recordShrink(buf.Cap() - defaultBufferSize) + buf = &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))} + } else { + // Reset the Buffer to clear any existing data. + buf.Reset() + } + + p.Pool.Put(buf) +} diff --git a/pkg/buffers/pool/pool_test.go b/pkg/buffers/pool/pool_test.go new file mode 100644 index 000000000000..1968356e1f47 --- /dev/null +++ b/pkg/buffers/pool/pool_test.go @@ -0,0 +1,83 @@ +package pool + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer" +) + +func TestNewBufferPool(t *testing.T) { + t.Parallel() + tests := []struct { + name string + opts []Opts + expectedBuffSize uint32 + }{ + {name: "Default pool size", expectedBuffSize: defaultBufferSize}, + { + name: "Custom pool size", + opts: []Opts{func(p *Pool) { p.bufferSize = 8 * 1024 }}, // 8KB + expectedBuffSize: 8 * 1024, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + pool := NewBufferPool(tc.opts...) + assert.Equal(t, tc.expectedBuffSize, pool.bufferSize) + }) + } +} + +func TestBufferPoolGetPut(t *testing.T) { + t.Parallel() + tests := []struct { + name string + preparePool func(p *Pool) *buffer.Buffer // Prepare the pool and return an initial buffer to put if needed + expectedCapBefore int // Expected capacity before putting it back + expectedCapAfter int // Expected capacity after retrieving it again + }{ + { + name: "Get new buffer and put back without modification", + preparePool: func(_ *Pool) *buffer.Buffer { + return nil // No initial buffer to put + }, + expectedCapBefore: int(defaultBufferSize), + expectedCapAfter: int(defaultBufferSize), + }, + { + name: "Put oversized buffer, expect shrink", + preparePool: func(p *Pool) *buffer.Buffer { + buf := &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, 3*defaultBufferSize))} + return buf + }, + expectedCapBefore: int(defaultBufferSize), + expectedCapAfter: int(defaultBufferSize), // Should shrink back to default + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + pool := NewBufferPool() + initialBuf := tc.preparePool(pool) + if initialBuf != nil { + pool.Put(initialBuf) + } + + buf := pool.Get() + assert.Equal(t, tc.expectedCapBefore, buf.Cap()) + + pool.Put(buf) + + bufAfter := pool.Get() + assert.Equal(t, tc.expectedCapAfter, bufAfter.Cap()) + }) + } +} diff --git a/pkg/writers/buffer_writer/bufferwriter.go b/pkg/writers/buffer_writer/bufferwriter.go index 1fa752fc3352..cbce213e4e71 100644 --- a/pkg/writers/buffer_writer/bufferwriter.go +++ b/pkg/writers/buffer_writer/bufferwriter.go @@ -1,4 +1,4 @@ -// Package bufferwritter provides a contentWriter implementation using a shared buffer pool for memory management. +// Package bufferwriter provides a contentWriter implementation using a shared buffer pool for memory management. package bufferwriter import ( @@ -6,7 +6,8 @@ import ( "io" "time" - "github.com/trufflesecurity/trufflehog/v3/pkg/buffers" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/pool" ) type metrics struct{} @@ -16,11 +17,11 @@ func (metrics) recordDataProcessed(size int64, dur time.Duration) { totalWriteDuration.Add(float64(dur.Microseconds())) } -func init() { bufferPool = buffers.NewBufferPool() } +func init() { bufferPool = pool.NewBufferPool() } // bufferPool is the shared Buffer pool used by all BufferedFileWriters. // This allows for efficient reuse of buffers across multiple writers. -var bufferPool *buffers.Pool +var bufferPool *pool.Pool // state represents the current mode of buffer. type state uint8 @@ -34,10 +35,10 @@ const ( // BufferWriter implements contentWriter, using a shared buffer pool for memory management. type BufferWriter struct { - buf *buffers.Buffer // The current buffer in use. - bufPool *buffers.Pool // The buffer pool used to manage the buffer. - size int // The total size of the content written to the buffer. - state state // The current state of the buffer. + buf *buffer.Buffer // The current buffer in use. + bufPool *pool.Pool // The buffer pool used to manage the buffer. + size int // The total size of the content written to the buffer. + state state // The current state of the buffer. metrics metrics } @@ -54,7 +55,7 @@ func (b *BufferWriter) Write(data []byte) (int, error) { if b.buf == nil { b.buf = b.bufPool.Get() if b.buf == nil { - b.buf = buffers.NewBuffer() + b.buf = buffer.NewBuffer() } } @@ -63,8 +64,8 @@ func (b *BufferWriter) Write(data []byte) (int, error) { start := time.Now() defer func(start time.Time) { b.metrics.recordDataProcessed(int64(size), time.Since(start)) - }(start) + return b.buf.Write(data) } @@ -79,7 +80,7 @@ func (b *BufferWriter) ReadCloser() (io.ReadCloser, error) { return nil, fmt.Errorf("writer buffer is nil") } - return buffers.ReadCloser(b.buf.Bytes(), func() { b.bufPool.Put(b.buf) }), nil + return buffer.ReadCloser(b.buf.Bytes(), func() { b.bufPool.Put(b.buf) }), nil } // CloseForWriting is a no-op for buffer, as there is no resource cleanup needed for bytes.Buffer. diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index d82f44ba124d..516bbb85ffc5 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -10,15 +10,16 @@ import ( "os" "time" - "github.com/trufflesecurity/trufflehog/v3/pkg/buffers" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/pool" "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" ) // sharedBufferPool is the shared buffer pool used by all BufferedFileWriters. // This allows for efficient reuse of buffers across multiple writers. -var sharedBufferPool *buffers.Pool +var sharedBufferPool *pool.Pool -func init() { sharedBufferPool = buffers.NewBufferPool() } +func init() { sharedBufferPool = pool.NewBufferPool() } type bufferedFileWriterMetrics struct{} @@ -48,10 +49,10 @@ type BufferedFileWriter struct { threshold uint64 // Threshold for switching to file writing. size uint64 // Total size of the data written. - bufPool *buffers.Pool // Pool for storing buffers for reuse. - buf *buffers.Buffer // Buffer for storing data under the threshold in memory. - filename string // Name of the temporary file. - file *os.File // File for storing data over the threshold. + bufPool *pool.Pool // Pool for storing buffers for reuse. + buf *buffer.Buffer // Buffer for storing data under the threshold in memory. + filename string // Name of the temporary file. + file *os.File // File for storing data over the threshold. state state // Current state of the writer. (writeOnly or readOnly) @@ -130,7 +131,7 @@ func (w *BufferedFileWriter) Write(data []byte) (int, error) { if w.buf == nil { w.buf = w.bufPool.Get() if w.buf == nil { - w.buf = buffers.NewBuffer() + w.buf = buffer.NewBuffer() } } @@ -271,7 +272,7 @@ func (w *BufferedFileWriter) ReadSeekCloser() (io.ReadSeekCloser, error) { } // Data is in memory. - return buffers.ReadCloser(w.buf.Bytes(), func() { w.bufPool.Put(w.buf) }), nil + return buffer.ReadCloser(w.buf.Bytes(), func() { w.bufPool.Put(w.buf) }), nil } // autoDeletingFileReader wraps an *os.File and deletes the file on Close. diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go index 13a0cf9a1f4f..a4d2102e8462 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/trufflesecurity/trufflehog/v3/pkg/buffers" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/pool" ) func TestBufferedFileWriterNewThreshold(t *testing.T) { @@ -593,7 +593,7 @@ func TestNewFromReaderThresholdExceeded(t *testing.T) { } func TestBufferWriterCloseForWritingWithFile(t *testing.T) { - bufPool := buffers.NewBufferPool() + bufPool := pool.NewBufferPool() buf := bufPool.Get() writer := &BufferedFileWriter{