diff --git a/internal/base/options.go b/internal/base/options.go index eff22c7fac..f5f127c5ca 100644 --- a/internal/base/options.go +++ b/internal/base/options.go @@ -6,9 +6,10 @@ package base // SSTable block defaults. const ( - DefaultBlockRestartInterval = 16 - DefaultBlockSize = 4096 - DefaultBlockSizeThreshold = 90 + DefaultBlockRestartInterval = 16 + DefaultBlockSize = 4096 + DefaultBlockSizeThreshold = 90 + SizeClassAwareBlockSizeThreshold = 60 ) // FilterType is the level at which to apply a filter: block or table. diff --git a/internal/cache/value.go b/internal/cache/value.go index 6d2cae15e5..b8203e6deb 100644 --- a/internal/cache/value.go +++ b/internal/cache/value.go @@ -4,6 +4,12 @@ package cache +import "unsafe" + +// ValueMetadataSize denotes the number of bytes of metadata allocated for a +// cache entry. +const ValueMetadataSize = int(unsafe.Sizeof(Value{})) + // Value holds a reference counted immutable value. type Value struct { buf []byte diff --git a/internal/cache/value_cgo.go b/internal/cache/value_cgo.go new file mode 100644 index 0000000000..6357638e09 --- /dev/null +++ b/internal/cache/value_cgo.go @@ -0,0 +1,39 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +//go:build ((!invariants && !tracing) || race) && cgo +// +build !invariants,!tracing race +// +build cgo + +package cache + +import ( + "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" +) + +func newValue(n int) *Value { + if n == 0 { + return nil + } + + // When we're not performing leak detection, the lifetime of the returned + // Value is exactly the lifetime of the backing buffer and we can manually + // allocate both. + b := manual.New(ValueMetadataSize + n) + v := (*Value)(unsafe.Pointer(&b[0])) + v.buf = b[ValueMetadataSize:] + v.ref.init(1) + return v +} + +func (v *Value) free() { + // When we're not performing leak detection, the Value and buffer were + // allocated contiguously. + n := ValueMetadataSize + cap(v.buf) + buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(v))[:n:n] + v.buf = nil + manual.Free(buf) +} diff --git a/internal/cache/value_normal.go b/internal/cache/value_normal.go index e03379d53f..1926b757ea 100644 --- a/internal/cache/value_normal.go +++ b/internal/cache/value_normal.go @@ -2,56 +2,23 @@ // of this source code is governed by a BSD-style license that can be found in // the LICENSE file. -//go:build (!invariants && !tracing) || race +//go:build ((!invariants && !tracing) || race) && !cgo // +build !invariants,!tracing race +// +build !cgo package cache -import ( - "unsafe" - - "github.com/cockroachdb/pebble/internal/manual" -) - -const valueSize = int(unsafe.Sizeof(Value{})) - func newValue(n int) *Value { if n == 0 { return nil } - if !cgoEnabled { - // If Cgo is disabled then all memory is allocated from the Go heap and we - // can't play the trick below to combine the Value and buffer allocation. - v := &Value{buf: make([]byte, n)} - v.ref.init(1) - return v - } - - // When we're not performing leak detection, the lifetime of the returned - // Value is exactly the lifetime of the backing buffer and we can manually - // allocate both. - // - // TODO(peter): It may be better to separate the allocation of the value and - // the buffer in order to reduce internal fragmentation in malloc. If the - // buffer is right at a power of 2, adding valueSize might push the - // allocation over into the next larger size. 
-	b := manual.New(valueSize + n)
-	v := (*Value)(unsafe.Pointer(&b[0]))
-	v.buf = b[valueSize:]
+	// Since Cgo is disabled, all memory is allocated from the Go heap and we
+	// can't play the value_cgo.go trick of combining the Value and buffer allocation.
+	v := &Value{buf: make([]byte, n)}
 	v.ref.init(1)
 	return v
 }

 func (v *Value) free() {
-	if !cgoEnabled {
-		return
-	}
-
-	// When we're not performing leak detection, the Value and buffer were
-	// allocated contiguously.
-	n := valueSize + cap(v.buf)
-	buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(v))[:n:n]
-	v.buf = nil
-	manual.Free(buf)
 }
diff --git a/options.go b/options.go
index 11e5c7233a..2016661299 100644
--- a/options.go
+++ b/options.go
@@ -1060,6 +1060,12 @@ type Options struct {
 	// to temporarily persist data spilled to disk for row-oriented SQL query execution.
 	EnableSQLRowSpillMetrics bool

+	// AllocatorSizeClasses provides a sorted list containing the supported size
+	// classes of the underlying memory allocator. This provides hints to the
+	// sstable block writer's flushing policy to select block sizes that
+	// preemptively reduce internal fragmentation when loaded into the block cache.
+	AllocatorSizeClasses []int
+
 	// private options are only used by internal tests or are used internally
 	// for facilitating upgrade paths of unconfigurable functionality.
 	private struct {
@@ -1970,6 +1976,7 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab
 	writerOpts.FilterPolicy = levelOpts.FilterPolicy
 	writerOpts.FilterType = levelOpts.FilterType
 	writerOpts.IndexBlockSize = levelOpts.IndexBlockSize
+	writerOpts.AllocatorSizeClasses = o.AllocatorSizeClasses
 	return writerOpts
 }

diff --git a/sstable/options.go b/sstable/options.go
index 8d88a32deb..4110ca54dd 100644
--- a/sstable/options.go
+++ b/sstable/options.go
@@ -132,9 +132,17 @@ type WriterOptions struct {
 	// specified percentage of the target block size and adding the next entry
 	// would cause the block to be larger than the target block size.
 	//
-	// The default value is 90
+	// The default value is 90.
 	BlockSizeThreshold int

+	// SizeClassAwareThreshold imposes a minimum block size restriction for blocks
+	// to be flushed, computed as a percentage of the target block size. Note that
+	// this threshold takes precedence over BlockSizeThreshold when valid
+	// AllocatorSizeClasses are specified.
+	//
+	// The default value is 60.
+	SizeClassAwareThreshold int
+
 	// Cache is used to cache uncompressed blocks from sstables.
 	//
 	// The default is a nil cache.
@@ -230,6 +238,12 @@ type WriterOptions struct {
 	// 750MB sstables -- see
 	// https://github.com/cockroachdb/cockroach/issues/117113).
 	DisableValueBlocks bool
+
+	// AllocatorSizeClasses provides a sorted list containing the supported size
+	// classes of the underlying memory allocator. This provides hints to the
+	// writer's flushing policy to select block sizes that preemptively reduce
+	// internal fragmentation when loaded into the block cache.
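+	//
+	// For example (the class values here are illustrative assumptions, not a
+	// real allocator's table), a jemalloc-style configuration might be:
+	//
+	//	AllocatorSizeClasses: []int{..., 4096, 4608, 5120, ...}
+	//
+	// With a 4KiB target block size, the writer can flush slightly early so
+	// that a block plus its block cache metadata still fits the 4096B class
+	// rather than spilling into the next larger class.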
+	AllocatorSizeClasses []int
 }

 func (o WriterOptions) ensureDefaults() WriterOptions {
@@ -242,6 +256,9 @@ func (o WriterOptions) ensureDefaults() WriterOptions {
 	if o.BlockSizeThreshold <= 0 {
 		o.BlockSizeThreshold = base.DefaultBlockSizeThreshold
 	}
+	if o.SizeClassAwareThreshold <= 0 {
+		o.SizeClassAwareThreshold = base.SizeClassAwareBlockSizeThreshold
+	}
 	if o.Comparer == nil {
 		o.Comparer = base.DefaultComparer
 	}
diff --git a/sstable/testdata/flush_heuristics b/sstable/testdata/flush_heuristics
new file mode 100644
index 0000000000..1e65181ee2
--- /dev/null
+++ b/sstable/testdata/flush_heuristics
@@ -0,0 +1,59 @@
+# Block size exceeds target block size.
+build key-size=0 val-size=0 block-size=64 target-size=64 threshold=59 size-class-threshold=0
+----
+true
+
+# Block size does not exceed threshold size.
+build key-size=0 val-size=0 block-size=59 target-size=64 threshold=59 size-class-threshold=0
+----
+false
+
+# New block size exceeds the target size.
+build key-size=1 val-size=1 block-size=60 target-size=64 threshold=32 size-class-threshold=0
+----
+true
+
+# New block size does not exceed the target size.
+build key-size=1 val-size=1 block-size=40 target-size=64 threshold=32 size-class-threshold=0
+----
+false
+
+# New block size does not exceed the target size with hints enabled.
+build key-size=1 val-size=1 block-size=8 target-size=64 threshold=0 size-class-threshold=0 hints=8,16,32,64,128
+----
+false
+
+# Block size does not exceed the size class aware threshold with hints enabled.
+build key-size=1 val-size=1 block-size=6 target-size=64 threshold=0 size-class-threshold=38 hints=8,16,32,64,128
+----
+false
+
+# New block size reduces internal fragmentation.
+build key-size=1 val-size=60 block-size=8 target-size=64 threshold=0 size-class-threshold=38 hints=8,16,32,64,128
+----
+false
+
+# New block size increases internal fragmentation.
+build key-size=1 val-size=40 block-size=8 target-size=64 threshold=0 size-class-threshold=38 hints=8,16,32,64,128
+----
+true
+
+# Block size target exceeded with hints enabled.
+build key-size=1 val-size=1 block-size=32 target-size=64 threshold=0 size-class-threshold=38 hints=8,16,32,64,128
+----
+true
+
+# Block size target exceeded, however, adding the next entry keeps the block within its current size class, reducing internal fragmentation.
+build key-size=1 val-size=1 block-size=38 target-size=64 threshold=0 size-class-threshold=38 hints=8,16,32,64,128
+----
+false
+
+# Fall back to the hint-less heuristics when the block size exceeds the largest size class.
+build key-size=1 val-size=1 block-size=27 target-size=64 threshold=59 size-class-threshold=38 hints=8,16,32
+----
+false
+
+# Don't flush when the new block size cannot be mapped to a size class.
+build key-size=1 val-size=60 block-size=18 target-size=64 threshold=0 size-class-threshold=38 hints=8,16,32,64
+----
+false
diff --git a/sstable/writer.go b/sstable/writer.go
index bb076d3dcd..0766477b42 100644
--- a/sstable/writer.go
+++ b/sstable/writer.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"math"
 	"runtime"
+	"slices"
 	"sort"
 	"sync"

@@ -110,6 +111,16 @@ func (m *WriterMetadata) updateSeqNum(seqNum uint64) {
 	}
 }

+// flushDecisionOptions holds parameters to inform the sstable block flushing
+// heuristics.
+type flushDecisionOptions struct {
+	blockSize          int
+	blockSizeThreshold int
+	// sizeClassAwareThreshold takes precedence over blockSizeThreshold when the
+	// Writer is aware of the allocator's size classes.
+	sizeClassAwareThreshold int
+}
+
 // Writer is a table writer.
type Writer struct { writable objstorage.Writable @@ -120,23 +131,23 @@ type Writer struct { // collisions. cacheID uint64 fileNum base.DiskFileNum + // dataBlockOptions and indexBlockOptions are used to configure the sstable + // block flush heuristics. + dataBlockOptions flushDecisionOptions + indexBlockOptions flushDecisionOptions // The following fields are copied from Options. - blockSize int - blockSizeThreshold int - indexBlockSize int - indexBlockSizeThreshold int - compare Compare - split Split - formatKey base.FormatKey - compression Compression - separator Separator - successor Successor - tableFormat TableFormat - isStrictObsolete bool - writingToLowestLevel bool - cache *cache.Cache - restartInterval int - checksumType ChecksumType + compare Compare + split Split + formatKey base.FormatKey + compression Compression + separator Separator + successor Successor + tableFormat TableFormat + isStrictObsolete bool + writingToLowestLevel bool + cache *cache.Cache + restartInterval int + checksumType ChecksumType // disableKeyOrderChecks disables the checks that keys are added to an // sstable in order. It is intended for internal use only in the construction // of invalid sstables for testing. See tool/make_test_sstables.go. @@ -210,6 +221,8 @@ type Writer struct { // When w.tableFormat >= TableFormatPebblev3, valueBlockWriter is nil iff // WriterOptions.DisableValueBlocks was true. valueBlockWriter *valueBlockWriter + + allocatorSizeClasses []int } type pointKeyInfo struct { @@ -425,7 +438,7 @@ func newIndexBlockBuf(useMutex bool) *indexBlockBuf { } func (i *indexBlockBuf) shouldFlush( - sep InternalKey, valueLen, targetBlockSize, sizeThreshold int, + sep InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int, ) bool { if i.size.useMutex { i.size.mu.Lock() @@ -433,9 +446,9 @@ func (i *indexBlockBuf) shouldFlush( } nEntries := i.size.estimate.numTotalEntries() - return shouldFlush( - sep, valueLen, i.restartInterval, int(i.size.estimate.size()), - int(nEntries), targetBlockSize, sizeThreshold) + return shouldFlushWithHints( + sep.Size(), valueLen, i.restartInterval, int(i.size.estimate.size()), + int(nEntries), flushOptions, sizeClassHints) } func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) { @@ -653,11 +666,11 @@ func (d *dataBlockBuf) compressAndChecksum(c Compression) { } func (d *dataBlockBuf) shouldFlush( - key InternalKey, valueLen, targetBlockSize, sizeThreshold int, + key InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int, ) bool { - return shouldFlush( - key, valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(), - d.dataBlock.nEntries, targetBlockSize, sizeThreshold) + return shouldFlushWithHints( + key.Size(), valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(), + d.dataBlock.nEntries, flushOptions, sizeClassHints) } type indexBlockAndBlockProperties struct { @@ -1400,7 +1413,7 @@ func (w *Writer) flush(key InternalKey) error { // to determine that we are going to flush the index block from the Writer // client. 
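+	// When AllocatorSizeClasses is configured, shouldFlush also weighs the
+	// internal fragmentation the flushed block would incur in the block
+	// cache; see shouldFlushWithHints.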
shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush( - sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold, + sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses, ) var indexProps []byte @@ -1449,7 +1462,7 @@ func (w *Writer) flush(key InternalKey) error { } func (w *Writer) maybeFlush(key InternalKey, valueLen int) error { - if !w.dataBlockBuf.shouldFlush(key, valueLen, w.blockSize, w.blockSizeThreshold) { + if !w.dataBlockBuf.shouldFlush(key, valueLen, w.dataBlockOptions, w.allocatorSizeClasses) { return nil } @@ -1584,7 +1597,7 @@ func (w *Writer) addIndexEntrySep( ) error { shouldFlush := supportsTwoLevelIndex( w.tableFormat) && w.indexBlock.shouldFlush( - sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold, + sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses, ) var flushableIndexBlock *indexBlockBuf var props []byte @@ -1610,35 +1623,114 @@ func (w *Writer) addIndexEntrySep( return err } -func shouldFlush( - key InternalKey, - valueLen int, - restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold int, +func shouldFlushWithHints( + keyLen, valueLen int, + restartInterval, estimatedBlockSize, numEntries int, + flushOptions flushDecisionOptions, + sizeClassHints []int, ) bool { if numEntries == 0 { return false } - if estimatedBlockSize >= targetBlockSize { + // If we are not informed about the memory allocator's size classes we fall + // back to a simple set of flush heuristics that are unaware of internal + // fragmentation in block cache allocations. + if len(sizeClassHints) == 0 { + return shouldFlushWithoutHints( + keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions) + } + + // For size-class aware flushing we need to account for the metadata that is + // allocated when this block is loaded into the block cache. For instance, if + // a block has size 1020B it may fit within a 1024B class. However, when + // loaded into the block cache we also allocate space for the cache entry + // metadata. The new allocation of size ~1052B may now only fit within a + // 2048B class, which increases internal fragmentation. + blockSizeWithMetadata := estimatedBlockSize + cache.ValueMetadataSize + + // For the fast path we can avoid computing the exact varint encoded + // key-value pair size. Instead, we combine the key-value pair size with an + // upper-bound estimate of the associated metadata (4B restart point, 4B + // shared prefix length, 5B varint unshared key size, 5B varint value size). + newEstimatedSize := blockSizeWithMetadata + keyLen + valueLen + 18 + // Our new block size estimate disregards key prefix compression. This puts + // us at risk of overestimating the size and flushing small blocks. We + // mitigate this by imposing a minimum size restriction. + if blockSizeWithMetadata <= flushOptions.sizeClassAwareThreshold || newEstimatedSize <= flushOptions.blockSize { + return false + } + + sizeClass, ok := blockSizeClass(blockSizeWithMetadata, sizeClassHints) + // If the block size could not be mapped to a size class we fall back to + // using a simpler set of flush heuristics. + if !ok { + return shouldFlushWithoutHints( + keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions) + } + + // Tighter upper-bound estimate of the metadata stored with the next + // key-value pair. 
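+	// For instance (illustrative figures, assuming 32B of cache entry
+	// metadata): a 1000B block holding 16 entries that is about to receive a
+	// 10B key and a 90B value computes newSize = 1032+10+90+4+4+1+1 = 1142B.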
+	newSize := blockSizeWithMetadata + keyLen + valueLen
+	if numEntries%restartInterval == 0 {
+		newSize += 4
+	}
+	newSize += 4                            // varint for shared prefix length
+	newSize += uvarintLen(uint32(keyLen))   // varint for unshared key bytes
+	newSize += uvarintLen(uint32(valueLen)) // varint for value size
+
+	if blockSizeWithMetadata < flushOptions.blockSize {
+		newSizeClass, ok := blockSizeClass(newSize, sizeClassHints)
+		if ok && newSizeClass-newSize >= sizeClass-blockSizeWithMetadata {
+			// Although the block hasn't reached the target size, the next entry would
+			// land it in a size class with at least as much wasted space; flush now.
+			return true
+		}
+		return false
+	}
+
+	// Flush if inserting the next entry bumps the block size to the memory
+	// allocator's next size class.
+	return newSize > sizeClass
+}
+
+func shouldFlushWithoutHints(
+	keyLen, valueLen int,
+	restartInterval, estimatedBlockSize, numEntries int,
+	flushOptions flushDecisionOptions,
+) bool {
+	if estimatedBlockSize >= flushOptions.blockSize {
 		return true
 	}

 	// The block is currently smaller than the target size.
-	if estimatedBlockSize <= sizeThreshold {
+	if estimatedBlockSize <= flushOptions.blockSizeThreshold {
 		// The block is smaller than the threshold size at which we'll consider
 		// flushing it.
 		return false
 	}

-	newSize := estimatedBlockSize + key.Size() + valueLen
+	newSize := estimatedBlockSize + keyLen + valueLen
 	if numEntries%restartInterval == 0 {
 		newSize += 4
 	}
-	newSize += 4                              // varint for shared prefix length
-	newSize += uvarintLen(uint32(key.Size())) // varint for unshared key bytes
-	newSize += uvarintLen(uint32(valueLen))   // varint for value size
+	newSize += 4                            // varint for shared prefix length
+	newSize += uvarintLen(uint32(keyLen))   // varint for unshared key bytes
+	newSize += uvarintLen(uint32(valueLen)) // varint for value size

 	// Flush if the block plus the new entry is larger than the target size.
-	return newSize > targetBlockSize
+	return newSize > flushOptions.blockSize
+}
+
+// blockSizeClass returns the smallest memory allocator size class that can
+// hold a block of the given size, along with a boolean indicating whether an
+// appropriate size class was found. It is useful for computing the potential
+// space wasted by an allocation.
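+// For example, with size classes [1024, 2048, 4096], a 1500B block maps to
+// the 2048B class (up to 548B wasted), while a 5000B block exceeds the
+// largest class and no mapping is returned.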
+func blockSizeClass(blockSize int, sizeClassHints []int) (int, bool) { + sizeClassIdx, _ := slices.BinarySearch(sizeClassHints, blockSize) + if sizeClassIdx == len(sizeClassHints) { + return -1, false + } + return sizeClassHints[sizeClassIdx], true } func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) { @@ -2188,23 +2280,29 @@ func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...Write meta: WriterMetadata{ SmallestSeqNum: math.MaxUint64, }, - blockSize: o.BlockSize, - blockSizeThreshold: (o.BlockSize*o.BlockSizeThreshold + 99) / 100, - indexBlockSize: o.IndexBlockSize, - indexBlockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100, - compare: o.Comparer.Compare, - split: o.Comparer.Split, - formatKey: o.Comparer.FormatKey, - compression: o.Compression, - separator: o.Comparer.Separator, - successor: o.Comparer.Successor, - tableFormat: o.TableFormat, - isStrictObsolete: o.IsStrictObsolete, - writingToLowestLevel: o.WritingToLowestLevel, - cache: o.Cache, - restartInterval: o.BlockRestartInterval, - checksumType: o.Checksum, - indexBlock: newIndexBlockBuf(o.Parallelism), + dataBlockOptions: flushDecisionOptions{ + blockSize: o.BlockSize, + blockSizeThreshold: (o.BlockSize*o.BlockSizeThreshold + 99) / 100, + sizeClassAwareThreshold: (o.BlockSize*o.SizeClassAwareThreshold + 99) / 100, + }, + indexBlockOptions: flushDecisionOptions{ + blockSize: o.IndexBlockSize, + blockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100, + sizeClassAwareThreshold: (o.IndexBlockSize*o.SizeClassAwareThreshold + 99) / 100, + }, + compare: o.Comparer.Compare, + split: o.Comparer.Split, + formatKey: o.Comparer.FormatKey, + compression: o.Compression, + separator: o.Comparer.Separator, + successor: o.Comparer.Successor, + tableFormat: o.TableFormat, + isStrictObsolete: o.IsStrictObsolete, + writingToLowestLevel: o.WritingToLowestLevel, + cache: o.Cache, + restartInterval: o.BlockRestartInterval, + checksumType: o.Checksum, + indexBlock: newIndexBlockBuf(o.Parallelism), rangeDelBlock: blockWriter{ restartInterval: 1, }, @@ -2218,13 +2316,14 @@ func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...Write Cmp: o.Comparer.Compare, Format: o.Comparer.FormatKey, }, + allocatorSizeClasses: o.AllocatorSizeClasses, } if w.tableFormat >= TableFormatPebblev3 { w.shortAttributeExtractor = o.ShortAttributeExtractor w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound if !o.DisableValueBlocks { w.valueBlockWriter = newValueBlockWriter( - w.blockSize, w.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) { + w.dataBlockOptions.blockSize, w.dataBlockOptions.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) { w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0) }) } diff --git a/sstable/writer_test.go b/sstable/writer_test.go index a475ce7067..f627475950 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -665,6 +665,51 @@ func TestWriterClearCache(t *testing.T) { require.NoError(t, r.Close()) } +func TestWriterFlushHeuristics(t *testing.T) { + datadriven.RunTest(t, "testdata/flush_heuristics", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "build": + var keySize, valSize, blockSize, targetSize, sizeThreshold, sizeClassAwareThreshold int + td.ScanArgs(t, "key-size", &keySize) + td.ScanArgs(t, "val-size", &valSize) + td.ScanArgs(t, "block-size", &blockSize) + td.ScanArgs(t, "target-size", &targetSize) + td.ScanArgs(t, 
"threshold", &sizeThreshold) + td.ScanArgs(t, "size-class-threshold", &sizeClassAwareThreshold) + + var sizeClasses []int + if td.HasArg("hints") { + var sizeClassHints string + td.ScanArgs(t, "hints", &sizeClassHints) + sizeStrClasses := strings.Split(sizeClassHints, ",") + for _, strClass := range sizeStrClasses { + size, err := strconv.Atoi(strClass) + require.NoError(t, err) + sizeClasses = append(sizeClasses, size) + } + } + + flush := shouldFlushWithHints( + keySize, + valSize, + base.DefaultBlockRestartInterval, + blockSize, + 1, /* numEntries */ + flushDecisionOptions{ + blockSize: targetSize, + blockSizeThreshold: sizeThreshold, + sizeClassAwareThreshold: sizeClassAwareThreshold, + }, + sizeClasses, + ) + return strconv.FormatBool(flush) + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} + type discardFile struct { wrote int64 }