From 843334222c0abd34999b975ec992fd3124993c09 Mon Sep 17 00:00:00 2001 From: ahrav Date: Tue, 6 Feb 2024 09:22:25 -0800 Subject: [PATCH] [not-fixup] - Reduce memory consumption for Buffered File Writer (#2377) * correctly use the buffered file writer * use value from source * reorder fields * use only the DetectorKey as a map field * correctly use the buffered file writer * use value from source * reorder fields * add tests and update * Fix issue with buffer slices growing * fix test * fix * add singleton * use shared pool * optimize * rename and cleanup * use correct calculation to grow buffer * only grow if needed * address comments * remove unused * remove * rip out Grow * address coment * use 2k default buffer * update comment allow large buffers to be garbage collected --- pkg/gitparse/gitparse.go | 1 - .../bufferedfilewriter.go | 130 +++++++++--- .../bufferedfilewriter_test.go | 200 ++++++++++++++++++ 3 files changed, 295 insertions(+), 36 deletions(-) diff --git a/pkg/gitparse/gitparse.go b/pkg/gitparse/gitparse.go index 627546793e2a..1de5d7feacd3 100644 --- a/pkg/gitparse/gitparse.go +++ b/pkg/gitparse/gitparse.go @@ -449,7 +449,6 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch } // Create a new currentDiff and currentCommit currentDiff = diff() - // currentDiff = NewDiff(withCustomContentWriter(c.contentWriter())) currentCommit = &Commit{Message: strings.Builder{}} // Check that the commit line contains a hash and set it. if len(line) >= 47 { diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter.go b/pkg/writers/buffered_file_writer/bufferedfilewriter.go index e35c1ac3edb3..09a3c0396c54 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter.go @@ -13,12 +13,59 @@ import ( "github.com/trufflesecurity/trufflehog/v3/pkg/context" ) -// bufferPool is used to store buffers for reuse. -var bufferPool = sync.Pool{ - // TODO: Consider growing the buffer before returning it if we can find an optimal size. - // Ideally the size would cover the majority of cases without being too large. - // This would avoid the need to grow the buffer when writing to it, reducing allocations. - New: func() any { return new(bytes.Buffer) }, +type bufPoolOpt func(pool *bufferPool) + +type bufferPool struct { + bufferSize uint32 + *sync.Pool +} + +const defaultBufferSize = 2 << 10 // 2KB +func newBufferPool(opts ...bufPoolOpt) *bufferPool { + pool := &bufferPool{bufferSize: defaultBufferSize} + + for _, opt := range opts { + opt(pool) + } + pool.Pool = &sync.Pool{ + New: func() any { + buf := new(bytes.Buffer) + buf.Grow(int(pool.bufferSize)) + return buf + }, + } + + return pool +} + +// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters. +// This allows for efficient reuse of buffers across multiple writers. +var sharedBufferPool *bufferPool + +func init() { sharedBufferPool = newBufferPool() } + +func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { + buf, ok := bp.Pool.Get().(*bytes.Buffer) + if !ok { + ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") + buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize)) + } + + return buf +} + +func (bp *bufferPool) put(buf *bytes.Buffer) { + // If the buffer is more than twice the default size, release it for garbage collection. + // This prevents us from returning very large buffers to the pool. + const maxAllowedCapacity = 2 * defaultBufferSize + if buf.Cap() > maxAllowedCapacity { + buf = nil // Release the large buffer for garbage collection. + } else { + // Reset the buffer to clear any existing data. + buf.Reset() + } + + bp.Put(buf) } // state represents the current mode of BufferedFileWriter. @@ -39,7 +86,8 @@ type BufferedFileWriter struct { state state // Current state of the writer. (writeOnly or readOnly) - buf bytes.Buffer // Buffer for storing data under the threshold in memory. + bufPool *bufferPool // Pool for storing buffers for reuse. + buf *bytes.Buffer // Buffer for storing data under the threshold in memory. filename string // Name of the temporary file. file io.WriteCloser // File for storing data over the threshold. } @@ -55,7 +103,11 @@ func WithThreshold(threshold uint64) Option { // New creates a new BufferedFileWriter with the given options. func New(opts ...Option) *BufferedFileWriter { const defaultThreshold = 10 * 1024 * 1024 // 10MB - w := &BufferedFileWriter{threshold: defaultThreshold, state: writeOnly} + w := &BufferedFileWriter{ + threshold: defaultThreshold, + state: writeOnly, + bufPool: sharedBufferPool, + } for _, opt := range opts { opt(w) } @@ -78,17 +130,16 @@ func (w *BufferedFileWriter) String() (string, error) { } defer file.Close() - // Create a buffer large enough to hold file data and additional buffer data, if any. - fileSize := w.size - buf := bytes.NewBuffer(make([]byte, 0, fileSize)) - + var buf bytes.Buffer // Read the file contents into the buffer. - if _, err := io.Copy(buf, file); err != nil { + if _, err := io.CopyBuffer(&buf, file, nil); err != nil { return "", fmt.Errorf("failed to read file contents: %w", err) } // Append buffer data, if any, to the end of the file contents. - buf.Write(w.buf.Bytes()) + if _, err := w.buf.WriteTo(&buf); err != nil { + return "", err + } return buf.String(), nil } @@ -100,33 +151,44 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error } size := uint64(len(data)) + + if w.buf == nil || w.buf.Len() == 0 { + w.buf = w.bufPool.get(ctx) + } + + bufferLength := w.buf.Len() + defer func() { w.size += size ctx.Logger().V(4).Info( "write complete", "data_size", size, - "content_size", w.buf.Len(), + "content_size", bufferLength, "total_size", w.size, ) }() - if w.buf.Len() == 0 { - bufPtr, ok := bufferPool.Get().(*bytes.Buffer) - if !ok { - ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") - bufPtr = new(bytes.Buffer) - } - bufPtr.Reset() // Reset the buffer to clear any existing data - w.buf = *bufPtr - } - - if uint64(w.buf.Len())+size <= w.threshold { + totalSizeNeeded := uint64(bufferLength) + uint64(len(data)) + if totalSizeNeeded <= w.threshold { // If the total size is within the threshold, write to the buffer. ctx.Logger().V(4).Info( "writing to buffer", "data_size", size, - "content_size", w.buf.Len(), + "content_size", bufferLength, ) + + availableSpace := w.buf.Cap() - bufferLength + growSize := int(totalSizeNeeded) - bufferLength + if growSize > availableSpace { + ctx.Logger().V(4).Info( + "buffer size exceeded, growing buffer", + "current_size", bufferLength, + "new_size", totalSizeNeeded, + "available_space", availableSpace, + "grow_size", growSize, + ) + } + return w.buf.Write(data) } @@ -143,14 +205,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error // Transfer existing data in buffer to the file, then clear the buffer. // This ensures all the data is in one place - either entirely in the buffer or the file. - if w.buf.Len() > 0 { - ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len()) - if _, err := w.file.Write(w.buf.Bytes()); err != nil { + if bufferLength > 0 { + ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength) + if _, err := w.buf.WriteTo(w.file); err != nil { return 0, err } - // Reset the buffer to clear any existing data and return it to the pool. - w.buf.Reset() - bufferPool.Put(&w.buf) + w.bufPool.put(w.buf) } } ctx.Logger().V(4).Info("writing to file", "data_size", size) @@ -167,7 +227,7 @@ func (w *BufferedFileWriter) CloseForWriting() error { } if w.buf.Len() > 0 { - _, err := w.file.Write(w.buf.Bytes()) + _, err := w.buf.WriteTo(w.file) if err != nil { return err } @@ -199,7 +259,7 @@ func (w *BufferedFileWriter) ReadCloser() (io.ReadCloser, error) { // Data is in memory. return &bufferReadCloser{ Reader: bytes.NewReader(w.buf.Bytes()), - onClose: func() { bufferPool.Put(&w.buf) }, + onClose: func() { w.bufPool.put(w.buf) }, }, nil } diff --git a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go index 471b9389dd16..1dcc07e69bf8 100644 --- a/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go +++ b/pkg/writers/buffered_file_writer/bufferedfilewriter_test.go @@ -1,6 +1,7 @@ package bufferedfilewriter import ( + "bytes" "os" "testing" "time" @@ -89,12 +90,151 @@ func TestBufferedFileWriterString(t *testing.T) { got, err := writer.String() assert.NoError(t, err) + err = writer.CloseForWriting() + assert.NoError(t, err) assert.Equal(t, tc.expectedStr, got, "String content mismatch") }) } } +const ( + smallBuffer = 2 << 5 // 64B + mediumBuffer = 2 << 10 // 2KB + smallFile = 2 << 25 // 32MB + mediumFile = 2 << 28 // 256MB +) + +func BenchmarkBufferedFileWriterString_BufferOnly_Small(b *testing.B) { + data := bytes.Repeat([]byte("a"), smallBuffer) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_BufferOnly_Medium(b *testing.B) { + data := bytes.Repeat([]byte("a"), mediumBuffer) + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_OnlyFile_Small(b *testing.B) { + data := bytes.Repeat([]byte("a"), smallFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_OnlyFile_Medium(b *testing.B) { + data := bytes.Repeat([]byte("a"), mediumFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_BufferWithFile_Small(b *testing.B) { + data := bytes.Repeat([]byte("a"), smallFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + // Write again so we also fill up the buffer. + _, err = writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func BenchmarkBufferedFileWriterString_BufferWithFile_Medium(b *testing.B) { + data := bytes.Repeat([]byte("a"), mediumFile) + + ctx := context.Background() + writer := New() + + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + + // Write again so we also fill up the buffer. + _, err = writer.Write(ctx, data) + assert.NoError(b, err) + + benchmarkBufferedFileWriterString(b, writer) + + err = writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() +} + +func benchmarkBufferedFileWriterString(b *testing.B, w *BufferedFileWriter) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := w.String() + assert.NoError(b, err) + } + b.StopTimer() +} + func TestBufferedFileWriterLen(t *testing.T) { t.Parallel() tests := []struct { @@ -306,3 +446,63 @@ func TestBufferedFileWriterWriteInReadOnlyState(t *testing.T) { _, err := writer.Write(context.Background(), []byte("should fail")) assert.Error(t, err) } + +func BenchmarkBufferedFileWriterWriteLarge(b *testing.B) { + ctx := context.Background() + data := make([]byte, 1024*1024*10) // 10MB + for i := range data { + data[i] = byte(i % 256) // Simple pattern to avoid uniform zero data + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Threshold is smaller than the data size, data should get flushed to the file. + writer := New(WithThreshold(1024)) + + b.StartTimer() + { + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + } + b.StopTimer() + + // Ensure proper cleanup after each write operation, including closing the file + err := writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() + } +} + +func BenchmarkBufferedFileWriterWriteSmall(b *testing.B) { + ctx := context.Background() + data := make([]byte, 1024*1024) // 1MB + for i := range data { + data[i] = byte(i % 256) // Simple pattern to avoid uniform zero data + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Threshold is the same as the buffer size, data should always be written to the buffer. + writer := New(WithThreshold(1024 * 1024)) + + b.StartTimer() + { + _, err := writer.Write(ctx, data) + assert.NoError(b, err) + } + b.StopTimer() + + // Ensure proper cleanup after each write operation, including closing the file. + err := writer.CloseForWriting() + assert.NoError(b, err) + + rc, err := writer.ReadCloser() + assert.NoError(b, err) + rc.Close() + } +}