Skip to content

Commit

Permalink
Merge pull request #1379 from dt/writer-index-reallocs
Browse files Browse the repository at this point in the history
sstable: reduce writer allocations during index flushing
  • Loading branch information
dt authored Nov 23, 2021
2 parents 9cece17 + bd9b59c commit db1a741
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 53 deletions.
64 changes: 47 additions & 17 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,16 @@ type Writer struct {
xxHasher *xxhash.Digest

topLevelIndexBlock blockWriter
indexPartitions []indexBlockWriterAndBlockProperties
indexPartitions []indexBlockAndBlockProperties
keyAlloc []byte
indexBlockAlloc []byte
}

type indexBlockWriterAndBlockProperties struct {
writer blockWriter
type indexBlockAndBlockProperties struct {
nEntries int
sep InternalKey
properties []byte
block []byte
}

// Set sets the value for the given key. The sequence number is set to
Expand Down Expand Up @@ -269,7 +273,7 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error {
// semantically identical, because we need to ensure that SmallestPoint.UserKey
// is not nil. This is required by WriterMetadata.Smallest in order to
// distinguish between an unset SmallestPoint and a zero-length one.
w.meta.SmallestPoint = w.meta.LargestPoint.Clone()
w.keyAlloc, w.meta.SmallestPoint = cloneKeyWithBuf(w.meta.LargestPoint, w.keyAlloc)
}

w.props.NumEntries++
Expand Down Expand Up @@ -434,14 +438,25 @@ func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) e
prevKey := base.DecodeInternalKey(w.block.curKey)
var sep InternalKey
if key.UserKey == nil && key.Trailer == 0 {
sep = prevKey.Successor(w.compare, w.successor, nil)
if len(w.keyAlloc) < len(prevKey.UserKey) {
w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
}
sep = prevKey.Successor(w.compare, w.successor, w.keyAlloc[:0])
w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
} else {
sep = prevKey.Separator(w.compare, w.separator, nil, key)
if len(w.keyAlloc) < len(prevKey.UserKey) {
w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
}
sep = prevKey.Separator(w.compare, w.separator, w.keyAlloc[:0], key)
w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
}
encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)

if supportsTwoLevelIndex(w.tableFormat) &&
shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
if cap(w.indexPartitions) == 0 {
w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
}
// Enable two level indexes if there is more than one index block.
w.twoLevelIndex = true
if err := w.finishIndexBlock(); err != nil {
Expand Down Expand Up @@ -486,6 +501,19 @@ func shouldFlush(
return newSize > blockSize
}

// keyAllocSize is the amount of slack, in bytes (256 KiB), that is added on
// top of the required key length whenever a key buffer is too small and a new
// one is allocated. The extra room lets subsequent keys be carved out of the
// same buffer instead of each needing its own allocation.
const keyAllocSize = 256 << 10

// cloneKeyWithBuf copies the user key of k into the front of buf and returns
// the unused remainder of buf along with a key whose UserKey refers to that
// copy and whose Trailer matches k's.
//
// A key with an empty UserKey is returned unchanged and buf is left untouched.
// When buf is shorter than the user key, a fresh buffer of
// len(k.UserKey)+keyAllocSize bytes is allocated and used in its place; the
// caller should keep the returned slice for subsequent calls.
func cloneKeyWithBuf(k InternalKey, buf []byte) ([]byte, InternalKey) {
	keyLen := len(k.UserKey)
	if keyLen == 0 {
		return buf, k
	}
	if keyLen > len(buf) {
		buf = make([]byte, keyLen+keyAllocSize)
	}
	copy(buf, k.UserKey)
	// Cap the cloned key at its own length so that an append to it can never
	// spill into the remainder of buf that is handed back for later keys.
	cloned := InternalKey{
		UserKey: buf[:keyLen:keyLen],
		Trailer: k.Trailer,
	}
	return buf[keyLen:], cloned
}

// finishIndexBlock finishes the current index block and adds it to the top
// level index block. This is only used when two level indexes are enabled.
func (w *Writer) finishIndexBlock() error {
Expand All @@ -500,14 +528,16 @@ func (w *Writer) finishIndexBlock() error {
w.blockPropsEncoder.addProp(shortID(i), scratch)
}
}
w.indexPartitions = append(w.indexPartitions,
indexBlockWriterAndBlockProperties{
writer: w.indexBlock,
properties: w.blockPropsEncoder.props(),
})
w.indexBlock = blockWriter{
restartInterval: 1,
part := indexBlockAndBlockProperties{nEntries: w.indexBlock.nEntries, properties: w.blockPropsEncoder.props()}
w.keyAlloc, part.sep = cloneKeyWithBuf(base.DecodeInternalKey(w.indexBlock.curKey), w.keyAlloc)
bk := w.indexBlock.finish()
if len(w.indexBlockAlloc) < len(bk) {
w.indexBlockAlloc = make([]byte, len(bk)*16)
}
n := copy(w.indexBlockAlloc, bk)
part.block = w.indexBlockAlloc[:n:n]
w.indexBlockAlloc = w.indexBlockAlloc[n:]
w.indexPartitions = append(w.indexPartitions, part)
return nil
}

Expand All @@ -519,9 +549,9 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {

for i := range w.indexPartitions {
b := &w.indexPartitions[i]
w.props.NumDataBlocks += uint64(b.writer.nEntries)
sep := base.DecodeInternalKey(b.writer.curKey)
data := b.writer.finish()
w.props.NumDataBlocks += uint64(b.nEntries)

data := b.block
w.props.IndexSize += uint64(len(data))
bh, err := w.writeBlock(data, w.compression)
if err != nil {
Expand All @@ -532,7 +562,7 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
Props: b.properties,
}
encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)
w.topLevelIndexBlock.add(sep, encoded)
w.topLevelIndexBlock.add(b.sep, encoded)
}

// NB: RocksDB includes the block trailer length in the index size
Expand Down
77 changes: 41 additions & 36 deletions sstable/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"testing"

"github.com/cockroachdb/pebble/bloom"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -177,13 +179,14 @@ func TestWriterClearCache(t *testing.T) {
require.NoError(t, r.Close())
}

type discardFile struct{}
type discardFile struct{ wrote int64 }

// Close is a no-op for discardFile; it always returns nil.
func (f discardFile) Close() error {
return nil
}

func (f discardFile) Write(p []byte) (int, error) {
func (f *discardFile) Write(p []byte) (int, error) {
f.wrote += int64(len(p))
return len(p), nil
}

Expand All @@ -193,48 +196,50 @@ func (f discardFile) Sync() error {

func BenchmarkWriter(b *testing.B) {
keys := make([][]byte, 1e6)
const keyLen = 24
keySlab := make([]byte, keyLen*len(keys))
for i := range keys {
key := make([]byte, 24)
key := keySlab[i*keyLen : i*keyLen+keyLen]
binary.BigEndian.PutUint64(key[:8], 123) // 16-byte shared prefix
binary.BigEndian.PutUint64(key[8:16], 456)
binary.BigEndian.PutUint64(key[16:], uint64(i))
keys[i] = key
}

benchmarks := []struct {
name string
options WriterOptions
}{
{
name: "Default",
options: WriterOptions{
BlockRestartInterval: 16,
BlockSize: 32 << 10,
Compression: SnappyCompression,
FilterPolicy: bloom.FilterPolicy(10),
},
},
{
name: "Zstd",
options: WriterOptions{
BlockRestartInterval: 16,
BlockSize: 32 << 10,
Compression: ZstdCompression,
FilterPolicy: bloom.FilterPolicy(10),
},
},
}

for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
w := NewWriter(discardFile{}, bm.options)

for j := range keys {
if err := w.Set(keys[j], keys[j]); err != nil {
b.Fatal(err)
b.ResetTimer()

for _, bs := range []int{base.DefaultBlockSize, 32 << 10} {
b.Run(fmt.Sprintf("block=%s", humanize.IEC.Int64(int64(bs))), func(b *testing.B) {
for _, filter := range []bool{true, false} {
b.Run(fmt.Sprintf("filter=%t", filter), func(b *testing.B) {
for _, comp := range []Compression{NoCompression, SnappyCompression, ZstdCompression} {
b.Run(fmt.Sprintf("compression=%s", comp), func(b *testing.B) {
opts := WriterOptions{
BlockRestartInterval: 16,
BlockSize: bs,
Compression: comp,
}
if filter {
opts.FilterPolicy = bloom.FilterPolicy(10)
}
f := &discardFile{}
for i := 0; i < b.N; i++ {
f.wrote = 0
w := NewWriter(f, opts)

for j := range keys {
if err := w.Set(keys[j], keys[j]); err != nil {
b.Fatal(err)
}
}
if err := w.Close(); err != nil {
b.Fatal(err)
}
b.SetBytes(int64(f.wrote))
}
})
}
}
})
}
})
}
Expand Down

0 comments on commit db1a741

Please sign in to comment.