diff --git a/sstable/writer.go b/sstable/writer.go index e99a8b30ee..684a04de31 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -152,12 +152,16 @@ type Writer struct { xxHasher *xxhash.Digest topLevelIndexBlock blockWriter - indexPartitions []indexBlockWriterAndBlockProperties + indexPartitions []indexBlockAndBlockProperties + keyAlloc []byte + indexBlockAlloc []byte } -type indexBlockWriterAndBlockProperties struct { - writer blockWriter +type indexBlockAndBlockProperties struct { + nEntries int + sep InternalKey properties []byte + block []byte } // Set sets the value for the given key. The sequence number is set to @@ -267,7 +271,7 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error { // semantically identical, because we need to ensure that SmallestPoint.UserKey // is not nil. This is required by WriterMetadata.Smallest in order to // distinguish between an unset SmallestPoint and a zero-length one. - w.meta.SmallestPoint = w.meta.LargestPoint.Clone() + w.keyAlloc, w.meta.SmallestPoint = cloneKeyWithBuf(w.meta.LargestPoint, w.keyAlloc) } w.props.NumEntries++ @@ -432,14 +436,25 @@ func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) e prevKey := base.DecodeInternalKey(w.block.curKey) var sep InternalKey if key.UserKey == nil && key.Trailer == 0 { - sep = prevKey.Successor(w.compare, w.successor, nil) + if len(w.keyAlloc) < len(prevKey.UserKey) { + w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize) + } + sep = prevKey.Successor(w.compare, w.successor, w.keyAlloc[:0]) + w.keyAlloc = w.keyAlloc[len(sep.UserKey):] } else { - sep = prevKey.Separator(w.compare, w.separator, nil, key) + if len(w.keyAlloc) < len(prevKey.UserKey) { + w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize) + } + sep = prevKey.Separator(w.compare, w.separator, w.keyAlloc[:0], key) + w.keyAlloc = w.keyAlloc[len(sep.UserKey):] } encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp) if supportsTwoLevelIndex(w.tableFormat) && shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) { + if cap(w.indexPartitions) == 0 { + w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32) + } // Enable two level indexes if there is more than one index block. w.twoLevelIndex = true if err := w.finishIndexBlock(); err != nil { @@ -484,6 +499,19 @@ func shouldFlush( return newSize > blockSize } +const keyAllocSize = 256 << 10 + +func cloneKeyWithBuf(k InternalKey, buf []byte) ([]byte, InternalKey) { + if len(k.UserKey) == 0 { + return buf, k + } + if len(buf) < len(k.UserKey) { + buf = make([]byte, len(k.UserKey)+keyAllocSize) + } + n := copy(buf, k.UserKey) + return buf[n:], InternalKey{UserKey: buf[:n:n], Trailer: k.Trailer} +} + // finishIndexBlock finishes the current index block and adds it to the top // level index block. This is only used when two level indexes are enabled. func (w *Writer) finishIndexBlock() error { @@ -498,14 +526,16 @@ func (w *Writer) finishIndexBlock() error { w.blockPropsEncoder.addProp(shortID(i), scratch) } } - w.indexPartitions = append(w.indexPartitions, - indexBlockWriterAndBlockProperties{ - writer: w.indexBlock, - properties: w.blockPropsEncoder.props(), - }) - w.indexBlock = blockWriter{ - restartInterval: 1, + part := indexBlockAndBlockProperties{nEntries: w.indexBlock.nEntries, properties: w.blockPropsEncoder.props()} + w.keyAlloc, part.sep = cloneKeyWithBuf(base.DecodeInternalKey(w.indexBlock.curKey), w.keyAlloc) + bk := w.indexBlock.finish() + if len(w.indexBlockAlloc) < len(bk) { + w.indexBlockAlloc = make([]byte, len(bk)*16) } + n := copy(w.indexBlockAlloc, bk) + part.block = w.indexBlockAlloc[:n:n] + w.indexBlockAlloc = w.indexBlockAlloc[n:] + w.indexPartitions = append(w.indexPartitions, part) return nil } @@ -517,9 +547,9 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { for i := range w.indexPartitions { b := &w.indexPartitions[i] - w.props.NumDataBlocks += uint64(b.writer.nEntries) - sep := base.DecodeInternalKey(b.writer.curKey) - data := b.writer.finish() + w.props.NumDataBlocks += uint64(b.nEntries) + + data := b.block w.props.IndexSize += uint64(len(data)) bh, err := w.writeBlock(data, w.compression) if err != nil { @@ -530,7 +560,7 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { Props: b.properties, } encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp) - w.topLevelIndexBlock.add(sep, encoded) + w.topLevelIndexBlock.add(b.sep, encoded) } // NB: RocksDB includes the block trailer length in the index size diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 5837eb11e1..b3d6c80bd4 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -11,8 +11,10 @@ import ( "testing" "github.com/cockroachdb/pebble/bloom" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/datadriven" + "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) @@ -177,13 +179,14 @@ func TestWriterClearCache(t *testing.T) { require.NoError(t, r.Close()) } -type discardFile struct{} +type discardFile struct{ wrote int64 } func (f discardFile) Close() error { return nil } -func (f discardFile) Write(p []byte) (int, error) { +func (f *discardFile) Write(p []byte) (int, error) { + f.wrote += int64(len(p)) return len(p), nil } @@ -193,48 +196,50 @@ func (f discardFile) Sync() error { func BenchmarkWriter(b *testing.B) { keys := make([][]byte, 1e6) + const keyLen = 24 + keySlab := make([]byte, keyLen*len(keys)) for i := range keys { - key := make([]byte, 24) + key := keySlab[i*keyLen : i*keyLen+keyLen] binary.BigEndian.PutUint64(key[:8], 123) // 16-byte shared prefix binary.BigEndian.PutUint64(key[8:16], 456) binary.BigEndian.PutUint64(key[16:], uint64(i)) keys[i] = key } - benchmarks := []struct { - name string - options WriterOptions - }{ - { - name: "Default", - options: WriterOptions{ - BlockRestartInterval: 16, - BlockSize: 32 << 10, - Compression: SnappyCompression, - FilterPolicy: bloom.FilterPolicy(10), - }, - }, - { - name: "Zstd", - options: WriterOptions{ - BlockRestartInterval: 16, - BlockSize: 32 << 10, - Compression: ZstdCompression, - FilterPolicy: bloom.FilterPolicy(10), - }, - }, - } - - for _, bm := range benchmarks { - b.Run(bm.name, func(b *testing.B) { - for i := 0; i < b.N; i++ { - w := NewWriter(discardFile{}, bm.options) - - for j := range keys { - if err := w.Set(keys[j], keys[j]); err != nil { - b.Fatal(err) + b.ResetTimer() + + for _, bs := range []int{base.DefaultBlockSize, 32 << 10} { + b.Run(fmt.Sprintf("block=%s", humanize.IEC.Int64(int64(bs))), func(b *testing.B) { + for _, filter := range []bool{true, false} { + b.Run(fmt.Sprintf("filter=%t", filter), func(b *testing.B) { + for _, comp := range []Compression{NoCompression, SnappyCompression, ZstdCompression} { + b.Run(fmt.Sprintf("compression=%s", comp), func(b *testing.B) { + opts := WriterOptions{ + BlockRestartInterval: 16, + BlockSize: bs, + Compression: comp, + } + if filter { + opts.FilterPolicy = bloom.FilterPolicy(10) + } + f := &discardFile{} + for i := 0; i < b.N; i++ { + f.wrote = 0 + w := NewWriter(f, opts) + + for j := range keys { + if err := w.Set(keys[j], keys[j]); err != nil { + b.Fatal(err) + } + } + if err := w.Close(); err != nil { + b.Fatal(err) + } + b.SetBytes(int64(f.wrote)) + } + }) } - } + }) } }) }