From e57e5fdb39ff84f1df42a3785236ba9d6a60d9bb Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Fri, 19 Nov 2021 23:06:59 +0000
Subject: [PATCH 1/2] sstable: extend BenchmarkWriter

This extends BenchmarkWriter to test block sizes as well as compression
algorithms, and generates sub-benchmark names from those parameters.
Additionally, it now closes the Writer, since some writes are deferred until
Close but should be included in the overall benchmark numbers. Finally,
discardFile is extended with a written-byte counter so the benchmark can
report MB/s.
---
 sstable/writer_test.go | 77 ++++++++++++++++++++++--------------------
 1 file changed, 41 insertions(+), 36 deletions(-)

diff --git a/sstable/writer_test.go b/sstable/writer_test.go
index 5837eb11e1..b3d6c80bd4 100644
--- a/sstable/writer_test.go
+++ b/sstable/writer_test.go
@@ -11,8 +11,10 @@ import (
 	"testing"
 
 	"github.com/cockroachdb/pebble/bloom"
+	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/cache"
 	"github.com/cockroachdb/pebble/internal/datadriven"
+	"github.com/cockroachdb/pebble/internal/humanize"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/stretchr/testify/require"
 )
@@ -177,13 +179,14 @@ func TestWriterClearCache(t *testing.T) {
 	require.NoError(t, r.Close())
 }
 
-type discardFile struct{}
+type discardFile struct{ wrote int64 }
 
 func (f discardFile) Close() error {
 	return nil
 }
 
-func (f discardFile) Write(p []byte) (int, error) {
+func (f *discardFile) Write(p []byte) (int, error) {
+	f.wrote += int64(len(p))
 	return len(p), nil
 }
 
@@ -193,48 +196,50 @@ func (f discardFile) Sync() error {
 
 func BenchmarkWriter(b *testing.B) {
 	keys := make([][]byte, 1e6)
+	const keyLen = 24
+	keySlab := make([]byte, keyLen*len(keys))
 	for i := range keys {
-		key := make([]byte, 24)
+		key := keySlab[i*keyLen : i*keyLen+keyLen]
 		binary.BigEndian.PutUint64(key[:8], 123) // 16-byte shared prefix
 		binary.BigEndian.PutUint64(key[8:16], 456)
 		binary.BigEndian.PutUint64(key[16:], uint64(i))
 		keys[i] = key
 	}
-	benchmarks := []struct {
-		name    string
-		options WriterOptions
-	}{
-		{
-			name: "Default",
-			options: WriterOptions{
-				BlockRestartInterval: 16,
-				BlockSize:            32 << 10,
-				Compression:          SnappyCompression,
-				FilterPolicy:         bloom.FilterPolicy(10),
-			},
-		},
-		{
-			name: "Zstd",
-			options: WriterOptions{
-				BlockRestartInterval: 16,
-				BlockSize:            32 << 10,
-				Compression:          ZstdCompression,
-				FilterPolicy:         bloom.FilterPolicy(10),
-			},
-		},
-	}
-
-	for _, bm := range benchmarks {
-		b.Run(bm.name, func(b *testing.B) {
-			for i := 0; i < b.N; i++ {
-				w := NewWriter(discardFile{}, bm.options)
-
-				for j := range keys {
-					if err := w.Set(keys[j], keys[j]); err != nil {
-						b.Fatal(err)
+	b.ResetTimer()
+
+	for _, bs := range []int{base.DefaultBlockSize, 32 << 10} {
+		b.Run(fmt.Sprintf("block=%s", humanize.IEC.Int64(int64(bs))), func(b *testing.B) {
+			for _, filter := range []bool{true, false} {
+				b.Run(fmt.Sprintf("filter=%t", filter), func(b *testing.B) {
+					for _, comp := range []Compression{NoCompression, SnappyCompression, ZstdCompression} {
+						b.Run(fmt.Sprintf("compression=%s", comp), func(b *testing.B) {
+							opts := WriterOptions{
+								BlockRestartInterval: 16,
+								BlockSize:            bs,
+								Compression:          comp,
+							}
+							if filter {
+								opts.FilterPolicy = bloom.FilterPolicy(10)
+							}
+							f := &discardFile{}
+							for i := 0; i < b.N; i++ {
+								f.wrote = 0
+								w := NewWriter(f, opts)
+
+								for j := range keys {
+									if err := w.Set(keys[j], keys[j]); err != nil {
+										b.Fatal(err)
+									}
+								}
+								if err := w.Close(); err != nil {
+									b.Fatal(err)
+								}
+								b.SetBytes(int64(f.wrote))
+							}
+						})
 					}
-				}
+				})
 			}
 		})
 	}

From bd9b59c88beab562413d8049aa8f33263da47a86 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Sat, 20 Nov 2021 01:51:41 +0000
Subject: [PATCH 2/2] sstable: reduce allocations during index flushing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

While working on a separate change that passed a large number of
pre-generated blocks to addIndexEntry in a tight loop, the allocations it
performs for each flushed block became more apparent in profiles.

This change batch-allocates the keys used as separators, and changes the
buffering of filled sub-index blocks to store their finished byte
representation (also batch-allocated) plus the relevant details, rather than
the whole block writer used to build each sub-index block. That blockWriter
can then be reused, which importantly allows reuse of its allocated buffers
(e.g. for restarts), eliminating a significant source of allocations when
flushing many sub-index blocks.

```
name                                                          old time/op    new time/op    delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10    50.4ms ± 0%    50.4ms ± 1%      ~     (p=0.730 n=4+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10           62.6ms ± 2%    62.1ms ± 1%      ~     (p=0.222 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10     48.1ms ± 0%    48.5ms ± 0%    +0.73%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=Snappy-10            59.0ms ± 3%    58.1ms ± 2%      ~     (p=0.690 n=5+5)

name                                                          old speed      new speed      delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10   759MB/s ± 0%   759MB/s ± 1%      ~     (p=0.730 n=4+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10          156MB/s ± 2%   157MB/s ± 1%      ~     (p=0.222 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10    785MB/s ± 0%   779MB/s ± 0%    -0.73%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=Snappy-10           115MB/s ± 3%   117MB/s ± 2%      ~     (p=0.690 n=5+5)

name                                                          old alloc/op   new alloc/op   delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10    1.18MB ± 0%    0.80MB ± 0%   -32.14%  (p=0.008 n=5+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10           1.20MB ± 0%    0.83MB ± 0%   -31.08%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10      252kB ± 0%     952kB ± 0%  +278.63%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=Snappy-10             456kB ± 0%    1157kB ± 0%  +153.58%  (p=0.008 n=5+5)

name                                                          old allocs/op  new allocs/op  delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10     10.7k ± 0%      0.1k ± 0%   -99.03%  (p=0.008 n=5+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10            10.7k ± 0%      0.1k ± 0%   -98.97%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10      1.27k ± 0%     0.10k ± 0%      ~     (p=0.079 n=4+5)
Writer/block=32_K/filter=false/compression=Snappy-10             1.27k ± 0%     0.10k ± 0%   -91.84%  (p=0.008 n=5+5)
```
---
 sstable/writer.go | 64 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 47 insertions(+), 17 deletions(-)

diff --git a/sstable/writer.go b/sstable/writer.go
index e99a8b30ee..684a04de31 100644
--- a/sstable/writer.go
+++ b/sstable/writer.go
@@ -152,12 +152,16 @@ type Writer struct {
 	xxHasher *xxhash.Digest
 
 	topLevelIndexBlock blockWriter
-	indexPartitions    []indexBlockWriterAndBlockProperties
+	indexPartitions    []indexBlockAndBlockProperties
+	keyAlloc           []byte
+	indexBlockAlloc    []byte
 }
 
-type indexBlockWriterAndBlockProperties struct {
-	writer     blockWriter
+type indexBlockAndBlockProperties struct {
+	nEntries   int
+	sep        InternalKey
 	properties []byte
+	block      []byte
 }
 
 // Set sets the value for the given key. The sequence number is set to
@@ -267,7 +271,7 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error {
 		// semantically identical, because we need to ensure that SmallestPoint.UserKey
 		// is not nil. This is required by WriterMetadata.Smallest in order to
 		// distinguish between an unset SmallestPoint and a zero-length one.
-		w.meta.SmallestPoint = w.meta.LargestPoint.Clone()
+		w.keyAlloc, w.meta.SmallestPoint = cloneKeyWithBuf(w.meta.LargestPoint, w.keyAlloc)
 	}
 
 	w.props.NumEntries++
@@ -432,14 +436,25 @@ func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) e
 	prevKey := base.DecodeInternalKey(w.block.curKey)
 	var sep InternalKey
 	if key.UserKey == nil && key.Trailer == 0 {
-		sep = prevKey.Successor(w.compare, w.successor, nil)
+		if len(w.keyAlloc) < len(prevKey.UserKey) {
+			w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
+		}
+		sep = prevKey.Successor(w.compare, w.successor, w.keyAlloc[:0])
+		w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
 	} else {
-		sep = prevKey.Separator(w.compare, w.separator, nil, key)
+		if len(w.keyAlloc) < len(prevKey.UserKey) {
+			w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
+		}
+		sep = prevKey.Separator(w.compare, w.separator, w.keyAlloc[:0], key)
+		w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
 	}
 	encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)
 
 	if supportsTwoLevelIndex(w.tableFormat) &&
 		shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
+		if cap(w.indexPartitions) == 0 {
+			w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
+		}
 		// Enable two level indexes if there is more than one index block.
 		w.twoLevelIndex = true
 		if err := w.finishIndexBlock(); err != nil {
@@ -484,6 +499,19 @@ func shouldFlush(
 	return newSize > blockSize
 }
 
+const keyAllocSize = 256 << 10
+
+func cloneKeyWithBuf(k InternalKey, buf []byte) ([]byte, InternalKey) {
+	if len(k.UserKey) == 0 {
+		return buf, k
+	}
+	if len(buf) < len(k.UserKey) {
+		buf = make([]byte, len(k.UserKey)+keyAllocSize)
+	}
+	n := copy(buf, k.UserKey)
+	return buf[n:], InternalKey{UserKey: buf[:n:n], Trailer: k.Trailer}
+}
+
 // finishIndexBlock finishes the current index block and adds it to the top
 // level index block. This is only used when two level indexes are enabled.
 func (w *Writer) finishIndexBlock() error {
@@ -498,14 +526,16 @@ func (w *Writer) finishIndexBlock() error {
 			w.blockPropsEncoder.addProp(shortID(i), scratch)
 		}
 	}
-	w.indexPartitions = append(w.indexPartitions,
-		indexBlockWriterAndBlockProperties{
-			writer:     w.indexBlock,
-			properties: w.blockPropsEncoder.props(),
-		})
-	w.indexBlock = blockWriter{
-		restartInterval: 1,
+	part := indexBlockAndBlockProperties{nEntries: w.indexBlock.nEntries, properties: w.blockPropsEncoder.props()}
+	w.keyAlloc, part.sep = cloneKeyWithBuf(base.DecodeInternalKey(w.indexBlock.curKey), w.keyAlloc)
+	bk := w.indexBlock.finish()
+	if len(w.indexBlockAlloc) < len(bk) {
+		w.indexBlockAlloc = make([]byte, len(bk)*16)
 	}
+	n := copy(w.indexBlockAlloc, bk)
+	part.block = w.indexBlockAlloc[:n:n]
+	w.indexBlockAlloc = w.indexBlockAlloc[n:]
+	w.indexPartitions = append(w.indexPartitions, part)
 	return nil
 }
 
@@ -517,9 +547,9 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
 	for i := range w.indexPartitions {
 		b := &w.indexPartitions[i]
-		w.props.NumDataBlocks += uint64(b.writer.nEntries)
-		sep := base.DecodeInternalKey(b.writer.curKey)
-		data := b.writer.finish()
+		w.props.NumDataBlocks += uint64(b.nEntries)
+
+		data := b.block
 		w.props.IndexSize += uint64(len(data))
 		bh, err := w.writeBlock(data, w.compression)
 		if err != nil {
@@ -530,7 +560,7 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
 			Props: b.properties,
 		}
 		encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)
-		w.topLevelIndexBlock.add(sep, encoded)
+		w.topLevelIndexBlock.add(b.sep, encoded)
 	}
 
 	// NB: RocksDB includes the block trailer length in the index size
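
The slab-allocation pattern behind cloneKeyWithBuf generalizes beyond this
writer. Below is a minimal, self-contained Go sketch of the same idea,
assuming nothing from the Pebble API (the slabSize constant and cloneBytes
helper are illustrative names only): carve small byte slices out of one large
buffer, and only call make again once the slab runs out, so that many per-key
allocations collapse into a few large ones.

```
package main

import "fmt"

const slabSize = 256 << 10 // mirrors keyAllocSize above: grow the slab in 256 KiB steps

// cloneBytes copies k into buf, growing buf only when it is too small, and
// returns the remaining slab alongside the clone. The clone is capped
// (buf[:n:n]) so that appending to it cannot scribble over later clones
// carved from the same slab.
func cloneBytes(k, buf []byte) (rest, clone []byte) {
	if len(k) == 0 {
		return buf, nil
	}
	if len(buf) < len(k) {
		buf = make([]byte, len(k)+slabSize)
	}
	n := copy(buf, k)
	return buf[n:], buf[:n:n]
}

func main() {
	var slab []byte
	keys := make([][]byte, 0, 1000)
	for i := 0; i < 1000; i++ {
		var clone []byte
		// One make() call covers many iterations; most clones are only a copy.
		slab, clone = cloneBytes([]byte(fmt.Sprintf("key-%04d", i)), slab)
		keys = append(keys, clone)
	}
	fmt.Println(len(keys), string(keys[0]), string(keys[999]))
}
```

This is also likely why alloc/op bytes rise in the 32 K block results above
while allocs/op drops sharply: each slab over-provisions by keyAllocSize,
trading some memory for far fewer allocations.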