From 852a5dbff0920dc40d2db29cc3552aaf012c8bfb Mon Sep 17 00:00:00 2001 From: Cheran Mahalingam Date: Tue, 9 Apr 2024 18:46:18 -0400 Subject: [PATCH] sstable: reduce block cache memory fragmentation Currently, the sstable writer contains heuristics to flush sstable blocks once the size reaches a specified threshold. In CRDB this is defined as 32KiB. However, when these blocks are loaded into memory additional metadata is allocated sometimes exceeding the 32KiB threshold. Since CRDB uses jemalloc, these allocations use a 40KiB size class which leads to significant internal fragmentation. In addition, since the system is unaware of these size classes we cannot design heuristics that prioritize reducing memory fragmentation. Reducing internal fragmentation can help reduce CRDB's memory footprint. This commit decrements the target block size to prevent internal fragmentation for small key-value pairs and adds support for optionally specifying size classes to enable a new set of heuristics that will reduce internal fragmentation for workloads with larger key-value pairs. Fixes: #999. --- internal/base/options.go | 7 +- internal/cache/value_cgo.go | 43 +++++++++++ internal/cache/value_invariants.go | 4 + internal/cache/value_normal.go | 45 ++--------- iterator_test.go | 5 +- options.go | 7 ++ sstable/options.go | 6 ++ sstable/testdata/flush_heuristics | 59 +++++++++++++++ sstable/writer.go | 118 ++++++++++++++++++++++++----- sstable/writer_test.go | 44 +++++++++++ 10 files changed, 279 insertions(+), 59 deletions(-) create mode 100644 internal/cache/value_cgo.go create mode 100644 sstable/testdata/flush_heuristics diff --git a/internal/base/options.go b/internal/base/options.go index eff22c7fac..f5f127c5ca 100644 --- a/internal/base/options.go +++ b/internal/base/options.go @@ -6,9 +6,10 @@ package base // SSTable block defaults. const ( - DefaultBlockRestartInterval = 16 - DefaultBlockSize = 4096 - DefaultBlockSizeThreshold = 90 + DefaultBlockRestartInterval = 16 + DefaultBlockSize = 4096 + DefaultBlockSizeThreshold = 90 + SizeClassAwareBlockSizeThreshold = 60 ) // FilterType is the level at which to apply a filter: block or table. diff --git a/internal/cache/value_cgo.go b/internal/cache/value_cgo.go new file mode 100644 index 0000000000..dc0b3943b7 --- /dev/null +++ b/internal/cache/value_cgo.go @@ -0,0 +1,43 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +//go:build ((!invariants && !tracing) || race) && cgo +// +build !invariants,!tracing race +// +build cgo + +package cache + +import ( + "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" +) + +// ValueMetadataSize denotes the number of bytes of metadata allocated for a +// cache entry. +const ValueMetadataSize = int(unsafe.Sizeof(Value{})) + +func newValue(n int) *Value { + if n == 0 { + return nil + } + + // When we're not performing leak detection, the lifetime of the returned + // Value is exactly the lifetime of the backing buffer and we can manually + // allocate both. + b := manual.New(ValueMetadataSize + n) + v := (*Value)(unsafe.Pointer(&b[0])) + v.buf = b[ValueMetadataSize:] + v.ref.init(1) + return v +} + +func (v *Value) free() { + // When we're not performing leak detection, the Value and buffer were + // allocated contiguously. 
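+	// Rebuilding the original allocation (metadata header plus buffer) lets
+	// the whole region be handed back to the manual allocator in one call.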
+ n := ValueMetadataSize + cap(v.buf) + buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(v))[:n:n] + v.buf = nil + manual.Free(buf) +} diff --git a/internal/cache/value_invariants.go b/internal/cache/value_invariants.go index 1e30d2714b..abe6ae49f6 100644 --- a/internal/cache/value_invariants.go +++ b/internal/cache/value_invariants.go @@ -15,6 +15,10 @@ import ( "github.com/cockroachdb/pebble/internal/manual" ) +// ValueMetadataSize denotes the number of bytes of metadata allocated for a +// cache entry. +const ValueMetadataSize = 0 + // newValue creates a Value with a manually managed buffer of size n. // // This definition of newValue is used when either the "invariants" or diff --git a/internal/cache/value_normal.go b/internal/cache/value_normal.go index e03379d53f..eb27012c8a 100644 --- a/internal/cache/value_normal.go +++ b/internal/cache/value_normal.go @@ -2,56 +2,27 @@ // of this source code is governed by a BSD-style license that can be found in // the LICENSE file. -//go:build (!invariants && !tracing) || race +//go:build ((!invariants && !tracing) || race) && !cgo // +build !invariants,!tracing race +// +build !cgo package cache -import ( - "unsafe" - - "github.com/cockroachdb/pebble/internal/manual" -) - -const valueSize = int(unsafe.Sizeof(Value{})) +// ValueMetadataSize denotes the number of bytes of metadata allocated for a +// cache entry. +const ValueMetadataSize = 0 func newValue(n int) *Value { if n == 0 { return nil } - if !cgoEnabled { - // If Cgo is disabled then all memory is allocated from the Go heap and we - // can't play the trick below to combine the Value and buffer allocation. - v := &Value{buf: make([]byte, n)} - v.ref.init(1) - return v - } - - // When we're not performing leak detection, the lifetime of the returned - // Value is exactly the lifetime of the backing buffer and we can manually - // allocate both. - // - // TODO(peter): It may be better to separate the allocation of the value and - // the buffer in order to reduce internal fragmentation in malloc. If the - // buffer is right at a power of 2, adding valueSize might push the - // allocation over into the next larger size. - b := manual.New(valueSize + n) - v := (*Value)(unsafe.Pointer(&b[0])) - v.buf = b[valueSize:] + // Since Cgo is disabled then all memory is allocated from the Go heap we + // can't play the trick below to combine the Value and buffer allocation. + v := &Value{buf: make([]byte, n)} v.ref.init(1) return v } func (v *Value) free() { - if !cgoEnabled { - return - } - - // When we're not performing leak detection, the Value and buffer were - // allocated contiguously. - n := valueSize + cap(v.buf) - buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(v))[:n:n] - v.buf = nil - manual.Free(buf) } diff --git a/iterator_test.go b/iterator_test.go index c674c9d5f6..54aa838ace 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -1190,7 +1190,10 @@ func TestIteratorBlockIntervalFilter(t *testing.T) { FormatMajorVersion: internalFormatNewest, BlockPropertyCollectors: bpCollectors, } - lo := LevelOptions{BlockSize: 1, IndexBlockSize: 1} + lo := LevelOptions{ + BlockSize: 1, + IndexBlockSize: 1, + } opts.Levels = append(opts.Levels, lo) // Automatic compactions may compact away tombstones from L6, making diff --git a/options.go b/options.go index 11e5c7233a..2016661299 100644 --- a/options.go +++ b/options.go @@ -1060,6 +1060,12 @@ type Options struct { // to temporarily persist data spilled to disk for row-oriented SQL query execution. 
EnableSQLRowSpillMetrics bool + // AllocatorSizeClasses provides a sorted list containing the supported size + // classes of the underlying memory allocator. This provides hints to the + // sstable block writer's flushing policy to select block sizes that + // preemptively reduce internal fragmentation when loaded into the block cache. + AllocatorSizeClasses []int + // private options are only used by internal tests or are used internally // for facilitating upgrade paths of unconfigurable functionality. private struct { @@ -1970,6 +1976,7 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab writerOpts.FilterPolicy = levelOpts.FilterPolicy writerOpts.FilterType = levelOpts.FilterType writerOpts.IndexBlockSize = levelOpts.IndexBlockSize + writerOpts.AllocatorSizeClasses = o.AllocatorSizeClasses return writerOpts } diff --git a/sstable/options.go b/sstable/options.go index 8d88a32deb..346fd9a74d 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -230,6 +230,12 @@ type WriterOptions struct { // 750MB sstables -- see // https://github.com/cockroachdb/cockroach/issues/117113). DisableValueBlocks bool + + // AllocatorSizeClasses provides a sorted list containing the supported size + // classes of the underlying memory allocator. This provides hints to the + // writer's flushing policy to select block sizes that preemptively reduce + // internal fragmentation when loaded into the block cache. + AllocatorSizeClasses []int } func (o WriterOptions) ensureDefaults() WriterOptions { diff --git a/sstable/testdata/flush_heuristics b/sstable/testdata/flush_heuristics new file mode 100644 index 0000000000..7319a5c2a0 --- /dev/null +++ b/sstable/testdata/flush_heuristics @@ -0,0 +1,59 @@ +# Block size exceeds target block size. +build key-size=0 val-size=0 block-size-with-metadata=64 target-size=64 threshold=59 +---- +true + +# Block size does not exceed threshold size. +build key-size=0 val-size=0 block-size-with-metadata=59 target-size=64 threshold=59 +---- +false + +# New block size exceeds the target size. +build key-size=1 val-size=1 block-size-with-metadata=60 target-size=64 threshold=32 +---- +true + +# New block size does not exceed the target size. +build key-size=1 val-size=1 block-size-with-metadata=40 target-size=64 threshold=32 +---- +false + +# New block size does not exceed the target size with hints enabled. +build key-size=1 val-size=1 block-size-with-metadata=40 target-size=64 threshold=0 hints=8,16,32,64,128 +---- +false + +# Block size does meet size threshold with hints enabled. +build key-size=1 val-size=1 block-size-with-metadata=38 target-size=64 threshold=0 hints=8,16,32,64,128 +---- +false + +# New block size reduces internal fragmentation. +build key-size=1 val-size=60 block-size-with-metadata=40 target-size=64 threshold=0 hints=8,16,32,64,128 +---- +false + +# New block size increases internal fragmentation. +build key-size=1 val-size=40 block-size-with-metadata=40 target-size=64 threshold=0 hints=8,16,32,64,128 +---- +true + +# Block size target exceeded with hints enabled. +build key-size=1 val-size=1 block-size-with-metadata=64 target-size=64 threshold=0 hints=8,16,32,64,128 +---- +true + +# Block size target exceeded, however, new block would reduce internal fragmentation. +build key-size=1 val-size=1 block-size-with-metadata=70 target-size=64 threshold=0 hints=8,16,32,64,128 +---- +false + +# Fall back to heuristics with hints disabled when size class is limited. 
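+# The block is larger than the biggest hint (32), so no size class fits and
+# the plain threshold check decides whether to flush.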
+build key-size=1 val-size=1 block-size-with-metadata=59 target-size=64 threshold=59 hints=8,16,32 +---- +false + +# Flush when new size class could not be computed. +build key-size=1 val-size=60 block-size-with-metadata=50 target-size=64 threshold=0 hints=8,16,32,64 +---- +true diff --git a/sstable/writer.go b/sstable/writer.go index bb076d3dcd..34e5228466 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -10,6 +10,7 @@ import ( "fmt" "math" "runtime" + "slices" "sort" "sync" @@ -210,6 +211,8 @@ type Writer struct { // When w.tableFormat >= TableFormatPebblev3, valueBlockWriter is nil iff // WriterOptions.DisableValueBlocks was true. valueBlockWriter *valueBlockWriter + + allocatorSizeClasses []int } type pointKeyInfo struct { @@ -425,7 +428,7 @@ func newIndexBlockBuf(useMutex bool) *indexBlockBuf { } func (i *indexBlockBuf) shouldFlush( - sep InternalKey, valueLen, targetBlockSize, sizeThreshold int, + sep InternalKey, valueLen, targetBlockSize, sizeThreshold int, sizeClassHints []int, ) bool { if i.size.useMutex { i.size.mu.Lock() @@ -433,9 +436,9 @@ func (i *indexBlockBuf) shouldFlush( } nEntries := i.size.estimate.numTotalEntries() - return shouldFlush( - sep, valueLen, i.restartInterval, int(i.size.estimate.size()), - int(nEntries), targetBlockSize, sizeThreshold) + return shouldFlushWithHints( + sep.Size(), valueLen, i.restartInterval, int(i.size.estimate.size()), + int(nEntries), targetBlockSize, sizeThreshold, sizeClassHints) } func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) { @@ -653,11 +656,11 @@ func (d *dataBlockBuf) compressAndChecksum(c Compression) { } func (d *dataBlockBuf) shouldFlush( - key InternalKey, valueLen, targetBlockSize, sizeThreshold int, + key InternalKey, valueLen, targetBlockSize, sizeThreshold int, sizeClassHints []int, ) bool { - return shouldFlush( - key, valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(), - d.dataBlock.nEntries, targetBlockSize, sizeThreshold) + return shouldFlushWithHints( + key.Size(), valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(), + d.dataBlock.nEntries, targetBlockSize, sizeThreshold, sizeClassHints) } type indexBlockAndBlockProperties struct { @@ -1400,7 +1403,7 @@ func (w *Writer) flush(key InternalKey) error { // to determine that we are going to flush the index block from the Writer // client. 
shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush( - sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold, + sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold, w.allocatorSizeClasses, ) var indexProps []byte @@ -1449,7 +1452,7 @@ func (w *Writer) flush(key InternalKey) error { } func (w *Writer) maybeFlush(key InternalKey, valueLen int) error { - if !w.dataBlockBuf.shouldFlush(key, valueLen, w.blockSize, w.blockSizeThreshold) { + if !w.dataBlockBuf.shouldFlush(key, valueLen, w.blockSize, w.blockSizeThreshold, w.allocatorSizeClasses) { return nil } @@ -1584,7 +1587,7 @@ func (w *Writer) addIndexEntrySep( ) error { shouldFlush := supportsTwoLevelIndex( w.tableFormat) && w.indexBlock.shouldFlush( - sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold, + sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold, w.allocatorSizeClasses, ) var flushableIndexBlock *indexBlockBuf var props []byte @@ -1610,15 +1613,81 @@ func (w *Writer) addIndexEntrySep( return err } -func shouldFlush( - key InternalKey, - valueLen int, +func shouldFlushWithHints( + keyLen, valueLen int, restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold int, + sizeClassHints []int, ) bool { if numEntries == 0 { return false } + // If we are not informed about the memory allocator's size classes we fall + // back to a simple set of flush heuristics that are unaware of internal + // fragmentation in block cache allocations. + if len(sizeClassHints) == 0 { + return shouldFlushWithoutHints( + keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold) + } + + // For size-class aware flushing we need to account for the metadata that is + // allocated when this block is loaded into the block cache. For instance, if + // a block has size 1020B it may fit within a 1024B class. However, when + // loaded into the block cache we also allocate space for the cache entry + // metadata. The new allocation of size ~1052B may now only fit within a + // 2048B class, which increases fragmentation. + blockSizeWithMetadata := estimatedBlockSize + cache.ValueMetadataSize + + // For the fast path we can avoid computing the exact varint encoded + // key-value pair size. Instead, we combine the key-value pair size with an + // upper-bound estimate of the associated metadata (4B restart point, 4B + // shared prefix length, 5B varint unshared key size, 5B varint value size). + newEstimatedSize := blockSizeWithMetadata + keyLen + valueLen + 18 + // Our new block size estimate disregards key prefix compression. This puts + // us at risk of overestimating the size and flushing small blocks. We + // mitigate this by imposing a minimum size restriction. + sizeClassAwareThreshold := (targetBlockSize*base.SizeClassAwareBlockSizeThreshold + 99) / 100 + if blockSizeWithMetadata <= sizeClassAwareThreshold || newEstimatedSize <= targetBlockSize { + return false + } + + sizeClass, ok := blockSizeClass(blockSizeWithMetadata, sizeClassHints) + // If the block size could not be mapped to a size class we fall back to + // using a simpler set of flush heuristics. + if !ok { + return shouldFlushWithoutHints( + keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold) + } + + // Tighter upper-bound estimate of the metadata stored with the next + // key-value pair. 
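+	// Unlike the flat 18-byte bound above, the 4-byte restart point is added
+	// only when one is actually due, and the key/value varints use their
+	// exact encoded lengths.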
+ newSize := blockSizeWithMetadata + keyLen + valueLen + if numEntries%restartInterval == 0 { + newSize += 4 + } + newSize += 4 // varint for shared prefix length + newSize += uvarintLen(uint32(keyLen)) // varint for unshared key bytes + newSize += uvarintLen(uint32(valueLen)) // varint for value size + + if blockSizeWithMetadata < targetBlockSize { + newSizeClass, ok := blockSizeClass(newSize, sizeClassHints) + if !ok || newSizeClass-newSize >= sizeClass-blockSizeWithMetadata { + // Although the block hasn't reached the target size, waiting to insert the + // next entry would exceed the target and increase memory fragmentation. + return true + } + return false + } + + // Flush if inserting the next entry bumps the block size to the memory + // allocator's next size class. + return newSize > sizeClass +} + +func shouldFlushWithoutHints( + keyLen, valueLen int, + restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold int, +) bool { if estimatedBlockSize >= targetBlockSize { return true } @@ -1630,17 +1699,29 @@ func shouldFlush( return false } - newSize := estimatedBlockSize + key.Size() + valueLen + newSize := estimatedBlockSize + keyLen + valueLen if numEntries%restartInterval == 0 { newSize += 4 } - newSize += 4 // varint for shared prefix length - newSize += uvarintLen(uint32(key.Size())) // varint for unshared key bytes - newSize += uvarintLen(uint32(valueLen)) // varint for value size + newSize += 4 // varint for shared prefix length + newSize += uvarintLen(uint32(keyLen)) // varint for unshared key bytes + newSize += uvarintLen(uint32(valueLen)) // varint for value size // Flush if the block plus the new entry is larger than the target size. return newSize > targetBlockSize } +// blockSizeClass returns the smallest memory allocator size class that could +// hold a block of a given size and returns a boolean indicating whether an +// appropriate size class was found. It is useful for computing the potential +// space wasted by an allocation. 
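+// For example, with hints [8, 16, 32, 64] a 40-byte block maps to the 64-byte
+// class, leaving 24 bytes of internal fragmentation.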
+func blockSizeClass(blockSize int, sizeClassHints []int) (int, bool) { + sizeClassIdx, _ := slices.BinarySearch(sizeClassHints, blockSize) + if sizeClassIdx == len(sizeClassHints) { + return -1, false + } + return sizeClassHints[sizeClassIdx], true +} + func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) { if len(k.UserKey) == 0 { return a, k @@ -2218,6 +2299,7 @@ func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...Write Cmp: o.Comparer.Compare, Format: o.Comparer.FormatKey, }, + allocatorSizeClasses: o.AllocatorSizeClasses, } if w.tableFormat >= TableFormatPebblev3 { w.shortAttributeExtractor = o.ShortAttributeExtractor diff --git a/sstable/writer_test.go b/sstable/writer_test.go index a475ce7067..e297b2226a 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -665,6 +665,50 @@ func TestWriterClearCache(t *testing.T) { require.NoError(t, r.Close()) } +func TestWriterFlushHeuristics(t *testing.T) { + datadriven.RunTest(t, "testdata/flush_heuristics", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "build": + var keySize, valSize, blockSize, targetSize, sizeThreshold int + td.ScanArgs(t, "key-size", &keySize) + td.ScanArgs(t, "val-size", &valSize) + td.ScanArgs(t, "block-size-with-metadata", &blockSize) + td.ScanArgs(t, "target-size", &targetSize) + td.ScanArgs(t, "threshold", &sizeThreshold) + + var sizeClasses []int + if td.HasArg("hints") { + var sizeClassHints string + td.ScanArgs(t, "hints", &sizeClassHints) + sizeStrClasses := strings.Split(sizeClassHints, ",") + for _, strClass := range sizeStrClasses { + size, err := strconv.Atoi(strClass) + require.NoError(t, err) + sizeClasses = append(sizeClasses, size) + } + } + if len(sizeClasses) > 0 { + blockSize -= cache.ValueMetadataSize + } + + flush := shouldFlushWithHints( + keySize, + valSize, + base.DefaultBlockRestartInterval, + blockSize, + 1, /* numEntries */ + targetSize, + sizeThreshold, + sizeClasses, + ) + return strconv.FormatBool(flush) + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} + type discardFile struct { wrote int64 }
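
A minimal usage sketch (not part of this patch): how a caller could supply
allocator hints through the new Options field. The size-class list below is
illustrative only; a real deployment would derive it from its allocator, for
example jemalloc's classes around the 32KiB default block size.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	opts := &pebble.Options{
		// Illustrative, ascending allocator size classes; real values should
		// come from the malloc implementation in use.
		AllocatorSizeClasses: []int{16 << 10, 20 << 10, 24 << 10, 28 << 10, 32 << 10, 40 << 10},
	}
	db, err := pebble.Open("demo", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Sstables built by flushes and compactions receive the hints via
	// Options.MakeWriterOptions and use the size-class aware flush heuristics.
	if err := db.Set([]byte("key"), []byte("value"), pebble.Sync); err != nil {
		log.Fatal(err)
	}
}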