Skip to content

Commit

Permalink
[not-fixup] - Reduce memory consumption for Buffered File Writer (#2377)
Browse files Browse the repository at this point in the history
* correctly use the buffered file writer

* use value from source

* reorder fields

* use only the DetectorKey as a map field

* correctly use the buffered file writer

* use value from source

* reorder fields

* add tests and update

* Fix issue with buffer slices growing

* fix test

* fix

* add singleton

* use shared pool

* optimize

* rename and cleanup

* use correct calculation to grow buffer

* only grow if needed

* address comments

* remove unused

* remove

* rip out Grow

* address coment

* use 2k default buffer

* update comment allow large buffers to be garbage collected
  • Loading branch information
ahrav authored Feb 6, 2024
1 parent 8104611 commit 8433342
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 36 deletions.
1 change: 0 additions & 1 deletion pkg/gitparse/gitparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
}
// Create a new currentDiff and currentCommit
currentDiff = diff()
// currentDiff = NewDiff(withCustomContentWriter(c.contentWriter()))
currentCommit = &Commit{Message: strings.Builder{}}
// Check that the commit line contains a hash and set it.
if len(line) >= 47 {
Expand Down
130 changes: 95 additions & 35 deletions pkg/writers/buffered_file_writer/bufferedfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,59 @@ import (
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
)

// bufferPool is used to store buffers for reuse.
var bufferPool = sync.Pool{
// TODO: Consider growing the buffer before returning it if we can find an optimal size.
// Ideally the size would cover the majority of cases without being too large.
// This would avoid the need to grow the buffer when writing to it, reducing allocations.
New: func() any { return new(bytes.Buffer) },
type bufPoolOpt func(pool *bufferPool)

type bufferPool struct {
bufferSize uint32
*sync.Pool
}

const defaultBufferSize = 2 << 10 // 2KB
func newBufferPool(opts ...bufPoolOpt) *bufferPool {
pool := &bufferPool{bufferSize: defaultBufferSize}

for _, opt := range opts {
opt(pool)
}
pool.Pool = &sync.Pool{
New: func() any {
buf := new(bytes.Buffer)
buf.Grow(int(pool.bufferSize))
return buf
},
}

return pool
}

// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters.
// This allows for efficient reuse of buffers across multiple writers.
var sharedBufferPool *bufferPool

func init() { sharedBufferPool = newBufferPool() }

func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer {
buf, ok := bp.Pool.Get().(*bytes.Buffer)
if !ok {
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize))
}

return buf
}

func (bp *bufferPool) put(buf *bytes.Buffer) {
// If the buffer is more than twice the default size, release it for garbage collection.
// This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity {
buf = nil // Release the large buffer for garbage collection.
} else {
// Reset the buffer to clear any existing data.
buf.Reset()
}

bp.Put(buf)
}

// state represents the current mode of BufferedFileWriter.
Expand All @@ -39,7 +86,8 @@ type BufferedFileWriter struct {

state state // Current state of the writer. (writeOnly or readOnly)

buf bytes.Buffer // Buffer for storing data under the threshold in memory.
bufPool *bufferPool // Pool for storing buffers for reuse.
buf *bytes.Buffer // Buffer for storing data under the threshold in memory.
filename string // Name of the temporary file.
file io.WriteCloser // File for storing data over the threshold.
}
Expand All @@ -55,7 +103,11 @@ func WithThreshold(threshold uint64) Option {
// New creates a new BufferedFileWriter with the given options.
func New(opts ...Option) *BufferedFileWriter {
const defaultThreshold = 10 * 1024 * 1024 // 10MB
w := &BufferedFileWriter{threshold: defaultThreshold, state: writeOnly}
w := &BufferedFileWriter{
threshold: defaultThreshold,
state: writeOnly,
bufPool: sharedBufferPool,
}
for _, opt := range opts {
opt(w)
}
Expand All @@ -78,17 +130,16 @@ func (w *BufferedFileWriter) String() (string, error) {
}
defer file.Close()

// Create a buffer large enough to hold file data and additional buffer data, if any.
fileSize := w.size
buf := bytes.NewBuffer(make([]byte, 0, fileSize))

var buf bytes.Buffer
// Read the file contents into the buffer.
if _, err := io.Copy(buf, file); err != nil {
if _, err := io.CopyBuffer(&buf, file, nil); err != nil {
return "", fmt.Errorf("failed to read file contents: %w", err)
}

// Append buffer data, if any, to the end of the file contents.
buf.Write(w.buf.Bytes())
if _, err := w.buf.WriteTo(&buf); err != nil {
return "", err
}

return buf.String(), nil
}
Expand All @@ -100,33 +151,44 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
}

size := uint64(len(data))

if w.buf == nil || w.buf.Len() == 0 {
w.buf = w.bufPool.get(ctx)
}

bufferLength := w.buf.Len()

defer func() {
w.size += size
ctx.Logger().V(4).Info(
"write complete",
"data_size", size,
"content_size", w.buf.Len(),
"content_size", bufferLength,
"total_size", w.size,
)
}()

if w.buf.Len() == 0 {
bufPtr, ok := bufferPool.Get().(*bytes.Buffer)
if !ok {
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
bufPtr = new(bytes.Buffer)
}
bufPtr.Reset() // Reset the buffer to clear any existing data
w.buf = *bufPtr
}

if uint64(w.buf.Len())+size <= w.threshold {
totalSizeNeeded := uint64(bufferLength) + uint64(len(data))
if totalSizeNeeded <= w.threshold {
// If the total size is within the threshold, write to the buffer.
ctx.Logger().V(4).Info(
"writing to buffer",
"data_size", size,
"content_size", w.buf.Len(),
"content_size", bufferLength,
)

availableSpace := w.buf.Cap() - bufferLength
growSize := int(totalSizeNeeded) - bufferLength
if growSize > availableSpace {
ctx.Logger().V(4).Info(
"buffer size exceeded, growing buffer",
"current_size", bufferLength,
"new_size", totalSizeNeeded,
"available_space", availableSpace,
"grow_size", growSize,
)
}

return w.buf.Write(data)
}

Expand All @@ -143,14 +205,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error

// Transfer existing data in buffer to the file, then clear the buffer.
// This ensures all the data is in one place - either entirely in the buffer or the file.
if w.buf.Len() > 0 {
ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len())
if _, err := w.file.Write(w.buf.Bytes()); err != nil {
if bufferLength > 0 {
ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength)
if _, err := w.buf.WriteTo(w.file); err != nil {
return 0, err
}
// Reset the buffer to clear any existing data and return it to the pool.
w.buf.Reset()
bufferPool.Put(&w.buf)
w.bufPool.put(w.buf)
}
}
ctx.Logger().V(4).Info("writing to file", "data_size", size)
Expand All @@ -167,7 +227,7 @@ func (w *BufferedFileWriter) CloseForWriting() error {
}

if w.buf.Len() > 0 {
_, err := w.file.Write(w.buf.Bytes())
_, err := w.buf.WriteTo(w.file)
if err != nil {
return err
}
Expand Down Expand Up @@ -199,7 +259,7 @@ func (w *BufferedFileWriter) ReadCloser() (io.ReadCloser, error) {
// Data is in memory.
return &bufferReadCloser{
Reader: bytes.NewReader(w.buf.Bytes()),
onClose: func() { bufferPool.Put(&w.buf) },
onClose: func() { w.bufPool.put(w.buf) },
}, nil
}

Expand Down
Loading

0 comments on commit 8433342

Please sign in to comment.