diff --git a/pkg/writers/buffer/buffer.go b/pkg/buffers/buffer/buffer.go similarity index 63% rename from pkg/writers/buffer/buffer.go rename to pkg/buffers/buffer/buffer.go index 9435ab877ef5..b3bd17749d6c 100644 --- a/pkg/writers/buffer/buffer.go +++ b/pkg/buffers/buffer/buffer.go @@ -5,94 +5,16 @@ package buffer import ( "bytes" "io" - "sync" "time" ) -type poolMetrics struct{} - -func (poolMetrics) recordShrink(amount int) { - shrinkCount.Inc() - shrinkAmount.Add(float64(amount)) -} - -func (poolMetrics) recordBufferRetrival() { - activeBufferCount.Inc() - checkoutCount.Inc() - bufferCount.Inc() -} - -func (poolMetrics) recordBufferReturn(buf *Buffer) { - activeBufferCount.Dec() - totalBufferSize.Add(float64(buf.Cap())) - totalBufferLength.Add(float64(buf.Len())) - buf.recordMetric() -} - -// PoolOpts is a function that configures a BufferPool. -type PoolOpts func(pool *Pool) - -// Pool of buffers. -type Pool struct { - *sync.Pool - bufferSize uint32 - - metrics poolMetrics -} - -const defaultBufferSize = 1 << 12 // 4KB -// NewBufferPool creates a new instance of BufferPool. -func NewBufferPool(opts ...PoolOpts) *Pool { - pool := &Pool{bufferSize: defaultBufferSize} - - for _, opt := range opts { - opt(pool) - } - pool.Pool = &sync.Pool{ - New: func() any { - return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))} - }, - } - - return pool -} - -// Get returns a Buffer from the pool. -func (p *Pool) Get() *Buffer { - buf, ok := p.Pool.Get().(*Buffer) - if !ok { - buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))} - } - p.metrics.recordBufferRetrival() - buf.resetMetric() - - return buf -} - -// Put returns a Buffer to the pool. -func (p *Pool) Put(buf *Buffer) { - p.metrics.recordBufferReturn(buf) - - // If the Buffer is more than twice the default size, replace it with a new Buffer. - // This prevents us from returning very large buffers to the pool. 
- const maxAllowedCapacity = 2 * defaultBufferSize - if buf.Cap() > maxAllowedCapacity { - p.metrics.recordShrink(buf.Cap() - defaultBufferSize) - buf = &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))} - } else { - // Reset the Buffer to clear any existing data. - buf.Reset() - } - - p.Pool.Put(buf) -} - // Buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking Buffer checkout duration. type Buffer struct { *bytes.Buffer checkedOutAt time.Time } +const defaultBufferSize = 1 << 12 // 4KB // NewBuffer creates a new instance of Buffer. func NewBuffer() *Buffer { return &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, defaultBufferSize))} } @@ -101,12 +23,14 @@ func (b *Buffer) Grow(size int) { b.recordGrowth(size) } -func (b *Buffer) resetMetric() { b.checkedOutAt = time.Now() } +func (b *Buffer) ResetMetric() { b.checkedOutAt = time.Now() } -func (b *Buffer) recordMetric() { +func (b *Buffer) RecordMetric() { dur := time.Since(b.checkedOutAt) checkoutDuration.Observe(float64(dur.Microseconds())) checkoutDurationTotal.Add(float64(dur.Microseconds())) + totalBufferSize.Add(float64(b.Cap())) + totalBufferLength.Add(float64(b.Len())) } func (b *Buffer) recordGrowth(size int) { @@ -119,7 +43,7 @@ func (b *Buffer) Write(data []byte) (int, error) { if b.Buffer == nil { // This case should ideally never occur if buffers are properly managed. 
b.Buffer = bytes.NewBuffer(make([]byte, 0, defaultBufferSize)) - b.resetMetric() + b.ResetMetric() } size := len(data) diff --git a/pkg/writers/buffer/buffer_test.go b/pkg/buffers/buffer/buffer_test.go similarity index 56% rename from pkg/writers/buffer/buffer_test.go rename to pkg/buffers/buffer/buffer_test.go index 714f387af987..bbf4987e23b5 100644 --- a/pkg/writers/buffer/buffer_test.go +++ b/pkg/buffers/buffer/buffer_test.go @@ -7,79 +7,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestNewBufferPool(t *testing.T) { - t.Parallel() - tests := []struct { - name string - opts []PoolOpts - expectedBuffSize uint32 - }{ - {name: "Default pool size", expectedBuffSize: defaultBufferSize}, - { - name: "Custom pool size", - opts: []PoolOpts{func(p *Pool) { p.bufferSize = 8 * 1024 }}, // 8KB - expectedBuffSize: 8 * 1024, - }, - } - - for _, tc := range tests { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - pool := NewBufferPool(tc.opts...) - assert.Equal(t, tc.expectedBuffSize, pool.bufferSize) - }) - } -} - -func TestBufferPoolGetPut(t *testing.T) { - t.Parallel() - tests := []struct { - name string - preparePool func(p *Pool) *Buffer // Prepare the pool and return an initial buffer to put if needed - expectedCapBefore int // Expected capacity before putting it back - expectedCapAfter int // Expected capacity after retrieving it again - }{ - { - name: "Get new buffer and put back without modification", - preparePool: func(_ *Pool) *Buffer { - return nil // No initial buffer to put - }, - expectedCapBefore: int(defaultBufferSize), - expectedCapAfter: int(defaultBufferSize), - }, - { - name: "Put oversized buffer, expect shrink", - preparePool: func(p *Pool) *Buffer { - buf := &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, 3*defaultBufferSize))} - return buf - }, - expectedCapBefore: int(defaultBufferSize), - expectedCapAfter: int(defaultBufferSize), // Should shrink back to default - }, - } - - for _, tc := range tests { - tc := tc - 
t.Run(tc.name, func(t *testing.T) { - t.Parallel() - pool := NewBufferPool() - initialBuf := tc.preparePool(pool) - if initialBuf != nil { - pool.Put(initialBuf) - } - - buf := pool.Get() - assert.Equal(t, tc.expectedCapBefore, buf.Cap()) - - pool.Put(buf) - - bufAfter := pool.Get() - assert.Equal(t, tc.expectedCapAfter, bufAfter.Cap()) - }) - } -} - func TestBufferWrite(t *testing.T) { t.Parallel() tests := []struct { diff --git a/pkg/writers/buffer/metrics.go b/pkg/buffers/buffer/metrics.go similarity index 59% rename from pkg/writers/buffer/metrics.go rename to pkg/buffers/buffer/metrics.go index 7dbb2f05a8f9..4a22580ebeb8 100644 --- a/pkg/writers/buffer/metrics.go +++ b/pkg/buffers/buffer/metrics.go @@ -8,34 +8,6 @@ import ( ) var ( - activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "active_buffer_count", - Help: "Current number of active buffers.", - }) - - bufferCount = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "buffer_count", - Help: "Total number of buffers managed by the pool.", - }) - - totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "total_buffer_length", - Help: "Total length of all buffers combined.", - }) - - totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "total_buffer_size", - Help: "Total size of all buffers combined.", - }) - growCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: common.MetricsNamespace, Subsystem: common.MetricsSubsystem, @@ -50,20 +22,6 @@ var ( Help: "Total amount of bytes buffers in the pool have grown by.", }) - shrinkCount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: 
"shrink_count", - Help: "Total number of times buffers in the pool have shrunk.", - }) - - shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: common.MetricsNamespace, - Subsystem: common.MetricsSubsystem, - Name: "shrink_amount", - Help: "Total amount of bytes buffers in the pool have shrunk by.", - }) - checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{ Namespace: common.MetricsNamespace, Subsystem: common.MetricsSubsystem, @@ -79,10 +37,17 @@ var ( Buckets: prometheus.ExponentialBuckets(10, 10, 7), }) - checkoutCount = promauto.NewCounter(prometheus.CounterOpts{ + totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "total_buffer_length", + Help: "Total length of all buffers combined.", + }) + + totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: common.MetricsNamespace, Subsystem: common.MetricsSubsystem, - Name: "checkout_count", - Help: "Total number of Buffer checkouts.", + Name: "total_buffer_size", + Help: "Total size of all buffers combined.", }) ) diff --git a/pkg/buffers/pool/metrics.go b/pkg/buffers/pool/metrics.go new file mode 100644 index 000000000000..3d4de5410611 --- /dev/null +++ b/pkg/buffers/pool/metrics.go @@ -0,0 +1,45 @@ +package pool + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/trufflesecurity/trufflehog/v3/pkg/common" +) + +var ( + activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "active_buffer_count", + Help: "Current number of active buffers.", + }) + + bufferCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "buffer_count", + Help: "Total number of buffers managed by the pool.", + }) + + shrinkCount = 
promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "shrink_count", + Help: "Total number of times buffers in the pool have shrunk.", + }) + + shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "shrink_amount", + Help: "Total amount of bytes buffers in the pool have shrunk by.", + }) + + checkoutCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: common.MetricsNamespace, + Subsystem: common.MetricsSubsystem, + Name: "checkout_count", + Help: "Total number of Buffer checkouts.", + }) +) diff --git a/pkg/buffers/pool/pool.go b/pkg/buffers/pool/pool.go new file mode 100644 index 000000000000..d3e5c82d806e --- /dev/null +++ b/pkg/buffers/pool/pool.go @@ -0,0 +1,78 @@ +package pool + +import ( + "bytes" + "sync" + + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer" +) + +type poolMetrics struct{} + +func (poolMetrics) recordShrink(amount int) { + shrinkCount.Inc() + shrinkAmount.Add(float64(amount)) +} + +func (poolMetrics) recordBufferRetrival() { + activeBufferCount.Inc() + checkoutCount.Inc() + bufferCount.Inc() +} + +func (poolMetrics) recordBufferReturn(buf *buffer.Buffer) { + activeBufferCount.Dec() + buf.RecordMetric() +} + +// Pool of buffers. +type Pool struct { + *sync.Pool + bufferSize int + + metrics poolMetrics +} + +const defaultBufferSize = 1 << 12 // 4KB +// NewBufferPool creates a new instance of BufferPool. +func NewBufferPool(size int) *Pool { + pool := &Pool{bufferSize: size} + + pool.Pool = &sync.Pool{ + New: func() any { + return &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))} + }, + } + + return pool +} + +// Get returns a Buffer from the pool. 
+func (p *Pool) Get() *buffer.Buffer {
+	buf, ok := p.Pool.Get().(*buffer.Buffer)
+	if !ok {
+		buf = &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
+	}
+	p.metrics.recordBufferRetrival()
+	buf.ResetMetric()
+
+	return buf
+}
+
+// Put returns a Buffer to the pool.
+func (p *Pool) Put(buf *buffer.Buffer) {
+	p.metrics.recordBufferReturn(buf)
+
+	// If the Buffer grew beyond twice this pool's configured buffer size, replace it with a new Buffer.
+	// This prevents us from returning very large buffers to the pool.
+	maxAllowedCapacity := 2 * p.bufferSize
+	if buf.Cap() > maxAllowedCapacity {
+		p.metrics.recordShrink(buf.Cap() - p.bufferSize)
+		buf = &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, p.bufferSize))}
+	} else {
+		// Reset the Buffer to clear any existing data.
+		buf.Reset()
+	}
+
+	p.Pool.Put(buf)
+}
diff --git a/pkg/buffers/pool/pool_test.go b/pkg/buffers/pool/pool_test.go
new file mode 100644
index 000000000000..3fb85a028e7c
--- /dev/null
+++ b/pkg/buffers/pool/pool_test.go
@@ -0,0 +1,83 @@
+package pool
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer"
+)
+
+func TestNewBufferPool(t *testing.T) {
+	t.Parallel()
+	tests := []struct {
+		name             string
+		size             int
+		expectedBuffSize int
+	}{
+		{name: "Default pool size", size: defaultBufferSize, expectedBuffSize: defaultBufferSize},
+		{
+			name:             "Custom pool size",
+			size:             8 * 1024,
+			expectedBuffSize: 8 * 1024,
+		},
+	}
+
+	for _, tc := range tests {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			pool := NewBufferPool(tc.size)
+			assert.Equal(t, tc.expectedBuffSize, pool.bufferSize)
+		})
+	}
+}
+
+func TestBufferPoolGetPut(t *testing.T) {
+	t.Parallel()
+	tests := []struct {
+		name              string
+		preparePool       func(p *Pool) *buffer.Buffer // Prepare the pool and return an initial buffer to put if needed
+		expectedCapBefore int                          // Expected capacity before putting it back
expectedCapAfter int // Expected capacity after retrieving it again + }{ + { + name: "Get new buffer and put back without modification", + preparePool: func(_ *Pool) *buffer.Buffer { + return nil // No initial buffer to put + }, + expectedCapBefore: defaultBufferSize, + expectedCapAfter: defaultBufferSize, + }, + { + name: "Put oversized buffer, expect shrink", + preparePool: func(p *Pool) *buffer.Buffer { + buf := &buffer.Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, 3*defaultBufferSize))} + return buf + }, + expectedCapBefore: defaultBufferSize, + expectedCapAfter: defaultBufferSize, // Should shrink back to default + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + pool := NewBufferPool(defaultBufferSize) + initialBuf := tc.preparePool(pool) + if initialBuf != nil { + pool.Put(initialBuf) + } + + buf := pool.Get() + assert.Equal(t, tc.expectedCapBefore, buf.Cap()) + + pool.Put(buf) + + bufAfter := pool.Get() + assert.Equal(t, tc.expectedCapAfter, bufAfter.Cap()) + }) + } +} diff --git a/pkg/writers/buffer_writer/bufferwriter.go b/pkg/writers/buffer_writer/bufferwriter.go index 8f0122223343..fe836d942b65 100644 --- a/pkg/writers/buffer_writer/bufferwriter.go +++ b/pkg/writers/buffer_writer/bufferwriter.go @@ -1,4 +1,4 @@ -// Package bufferwritter provides a contentWriter implementation using a shared buffer pool for memory management. +// Package bufferwriter provides a contentWriter implementation using a shared buffer pool for memory management. 
package bufferwriter import ( @@ -6,7 +6,8 @@ import ( "io" "time" - "github.com/trufflesecurity/trufflehog/v3/pkg/writers/buffer" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/pool" ) type metrics struct{} @@ -16,11 +17,12 @@ func (metrics) recordDataProcessed(size int64, dur time.Duration) { totalWriteDuration.Add(float64(dur.Microseconds())) } -func init() { bufferPool = buffer.NewBufferPool() } +const defaultBufferSize = 1 << 12 // 4KB +func init() { bufferPool = pool.NewBufferPool(defaultBufferSize) } // bufferPool is the shared Buffer pool used by all BufferedFileWriters. // This allows for efficient reuse of buffers across multiple writers. -var bufferPool *buffer.Pool +var bufferPool *pool.Pool // state represents the current mode of buffer. type state uint8 @@ -35,7 +37,7 @@ const ( // BufferWriter implements contentWriter, using a shared buffer pool for memory management. type BufferWriter struct { buf *buffer.Buffer // The current buffer in use. - bufPool *buffer.Pool // The buffer pool used to manage the buffer. + bufPool *pool.Pool // The buffer pool used to manage the buffer. size int // The total size of the content written to the buffer. state state // The current state of the buffer. @@ -43,14 +45,15 @@ type BufferWriter struct { } // New creates a new instance of BufferWriter. -func New() *BufferWriter { return &BufferWriter{state: writeOnly, bufPool: bufferPool} } +func New() *BufferWriter { + return &BufferWriter{state: writeOnly, bufPool: bufferPool} +} // Write delegates the writing operation to the underlying bytes.Buffer. 
func (b *BufferWriter) Write(data []byte) (int, error) { if b.state != writeOnly { return 0, fmt.Errorf("buffer must be in write-only mode to write data; current state: %d", b.state) } - if b.buf == nil { b.buf = b.bufPool.Get() if b.buf == nil { @@ -63,8 +66,8 @@ func (b *BufferWriter) Write(data []byte) (int, error) { start := time.Now() defer func(start time.Time) { b.metrics.recordDataProcessed(int64(size), time.Since(start)) - }(start) + return b.buf.Write(data) } diff --git a/pkg/writers/buffer_writer/metrics.go b/pkg/writers/buffer_writer/metrics.go index 29c923039d95..e47c040e7cf3 100644 --- a/pkg/writers/buffer_writer/metrics.go +++ b/pkg/writers/buffer_writer/metrics.go @@ -12,7 +12,7 @@ var ( Namespace: common.MetricsNamespace, Subsystem: common.MetricsSubsystem, Name: "buffer_writer_write_size_bytes", - Help: "Size of data written by the BufferWriter in bytes.", + Help: "Total size of data written by the BufferWriter in bytes.", Buckets: prometheus.ExponentialBuckets(100, 10, 7), }) diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index 3e9a5aa37db4..63c6733bad1f 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -10,16 +10,11 @@ import ( "os" "time" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/buffer" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/pool" "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" - "github.com/trufflesecurity/trufflehog/v3/pkg/writers/buffer" ) -// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters. -// This allows for efficient reuse of buffers across multiple writers. 
-var sharedBufferPool *buffer.Pool - -func init() { sharedBufferPool = buffer.NewBufferPool() } - type bufferedFileWriterMetrics struct{} func (bufferedFileWriterMetrics) recordDataProcessed(size uint64, dur time.Duration) { @@ -32,6 +27,30 @@ func (bufferedFileWriterMetrics) recordDiskWrite(size int64) { fileSizeHistogram.Observe(float64(size)) } +type PoolSize int + +const ( + Default PoolSize = iota + Large +) + +const ( + defaultBufferSize = 1 << 12 // 4KB + largeBufferSize = 1 << 16 // 64KB +) + +func init() { + defaultBufferPool = pool.NewBufferPool(defaultBufferSize) + largeBufferPool = pool.NewBufferPool(largeBufferSize) +} + +// Different buffer pools for different buffer sizes. +// This allows for more efficient memory management based on the size of the data being written. +var ( + defaultBufferPool *pool.Pool + largeBufferPool *pool.Pool +) + // state represents the current mode of BufferedFileWriter. type state uint8 @@ -48,7 +67,7 @@ type BufferedFileWriter struct { threshold uint64 // Threshold for switching to file writing. size uint64 // Total size of the data written. - bufPool *buffer.Pool // Pool for storing buffers for reuse. + bufPool *pool.Pool // Pool for storing buffers for reuse. buf *buffer.Buffer // Buffer for storing data under the threshold in memory. filename string // Name of the temporary file. file *os.File // File for storing data over the threshold. @@ -66,23 +85,42 @@ func WithThreshold(threshold uint64) Option { return func(w *BufferedFileWriter) { w.threshold = threshold } } +// WithBufferSize sets the buffer size for the BufferedFileWriter. +func WithBufferSize(size PoolSize) Option { + return func(w *BufferedFileWriter) { + switch size { + case Default: + w.bufPool = defaultBufferPool + case Large: + w.bufPool = largeBufferPool + default: + w.bufPool = defaultBufferPool + } + } +} + const defaultThreshold = 10 * 1024 * 1024 // 10MB // New creates a new BufferedFileWriter with the given options. 
func New(opts ...Option) *BufferedFileWriter { w := &BufferedFileWriter{ threshold: defaultThreshold, state: writeOnly, - bufPool: sharedBufferPool, } + for _, opt := range opts { opt(w) } + if w.bufPool == nil { + w.bufPool = defaultBufferPool + } + return w } // NewFromReader creates a new instance of BufferedFileWriter and writes the content from the provided reader to the writer. func NewFromReader(r io.Reader, opts ...Option) (*BufferedFileWriter, error) { + opts = append(opts, WithBufferSize(Large)) writer := New(opts...) if _, err := io.Copy(writer, r); err != nil && !errors.Is(err, io.EOF) { return nil, fmt.Errorf("error writing to buffered file writer: %w", err) @@ -162,9 +200,12 @@ func (w *BufferedFileWriter) Write(data []byte) (int, error) { // This ensures all the data is in one place - either entirely in the buffer or the file. if bufferLength > 0 { if _, err := w.buf.WriteTo(w.file); err != nil { + if err := os.RemoveAll(w.filename); err != nil { + return 0, fmt.Errorf("failed to remove file: %w", err) + } return 0, err } - w.bufPool.Put(w.buf) + w.buf.Reset() } } diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go index 329bbe4182b5..310b8086c855 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/trufflesecurity/trufflehog/v3/pkg/writers/buffer" + "github.com/trufflesecurity/trufflehog/v3/pkg/buffers/pool" ) func TestBufferedFileWriterNewThreshold(t *testing.T) { @@ -596,7 +596,7 @@ func TestNewFromReaderThresholdExceeded(t *testing.T) { } func TestBufferWriterCloseForWritingWithFile(t *testing.T) { - bufPool := buffer.NewBufferPool() + bufPool := pool.NewBufferPool(defaultBufferSize) buf := bufPool.Get() writer := &BufferedFileWriter{