Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sstable: reduce writer allocations during index flushing #1379

Merged
merged 2 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 47 additions & 17 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,16 @@ type Writer struct {
xxHasher *xxhash.Digest

topLevelIndexBlock blockWriter
indexPartitions []indexBlockWriterAndBlockProperties
indexPartitions []indexBlockAndBlockProperties
keyAlloc []byte
indexBlockAlloc []byte
}

type indexBlockWriterAndBlockProperties struct {
writer blockWriter
type indexBlockAndBlockProperties struct {
nEntries int
sep InternalKey
properties []byte
block []byte
}

// Set sets the value for the given key. The sequence number is set to
Expand Down Expand Up @@ -267,7 +271,7 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error {
// semantically identical, because we need to ensure that SmallestPoint.UserKey
// is not nil. This is required by WriterMetadata.Smallest in order to
// distinguish between an unset SmallestPoint and a zero-length one.
w.meta.SmallestPoint = w.meta.LargestPoint.Clone()
w.keyAlloc, w.meta.SmallestPoint = cloneKeyWithBuf(w.meta.LargestPoint, w.keyAlloc)
}

w.props.NumEntries++
Expand Down Expand Up @@ -432,14 +436,25 @@ func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) e
prevKey := base.DecodeInternalKey(w.block.curKey)
var sep InternalKey
if key.UserKey == nil && key.Trailer == 0 {
sep = prevKey.Successor(w.compare, w.successor, nil)
if len(w.keyAlloc) < len(prevKey.UserKey) {
w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
}
sep = prevKey.Successor(w.compare, w.successor, w.keyAlloc[:0])
w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
} else {
sep = prevKey.Separator(w.compare, w.separator, nil, key)
if len(w.keyAlloc) < len(prevKey.UserKey) {
w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
}
sep = prevKey.Separator(w.compare, w.separator, w.keyAlloc[:0], key)
w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
}
encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)

if supportsTwoLevelIndex(w.tableFormat) &&
shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
if cap(w.indexPartitions) == 0 {
w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
}
// Enable two level indexes if there is more than one index block.
w.twoLevelIndex = true
if err := w.finishIndexBlock(); err != nil {
Expand Down Expand Up @@ -484,6 +499,19 @@ func shouldFlush(
return newSize > blockSize
}

// keyAllocSize is the number of extra bytes (256 KiB) added on top of the
// required key length whenever a key buffer turns out to be too small and a
// replacement buffer is allocated.
const keyAllocSize = 256 << 10

// cloneKeyWithBuf copies the user key of k into the front of buf and returns
// the unused tail of buf together with a key that is backed by the copied
// bytes and carries the same trailer as k.
//
// A key with an empty UserKey is returned as-is, and buf is handed back
// untouched. If buf is shorter than the user key, a new buffer of
// len(k.UserKey)+keyAllocSize bytes is allocated and used in its place.
func cloneKeyWithBuf(k InternalKey, buf []byte) ([]byte, InternalKey) {
	keyLen := len(k.UserKey)
	if keyLen == 0 {
		return buf, k
	}
	if keyLen > len(buf) {
		buf = make([]byte, keyLen+keyAllocSize)
	}
	copy(buf, k.UserKey)
	// Cap the returned key at keyLen so that appending to it can never spill
	// into the remainder of buf that is handed back to the caller.
	cloned := InternalKey{
		UserKey: buf[:keyLen:keyLen],
		Trailer: k.Trailer,
	}
	return buf[keyLen:], cloned
}

// finishIndexBlock finishes the current index block and adds it to the top
// level index block. This is only used when two level indexes are enabled.
func (w *Writer) finishIndexBlock() error {
Expand All @@ -498,14 +526,16 @@ func (w *Writer) finishIndexBlock() error {
w.blockPropsEncoder.addProp(shortID(i), scratch)
}
}
w.indexPartitions = append(w.indexPartitions,
indexBlockWriterAndBlockProperties{
writer: w.indexBlock,
properties: w.blockPropsEncoder.props(),
})
w.indexBlock = blockWriter{
restartInterval: 1,
part := indexBlockAndBlockProperties{nEntries: w.indexBlock.nEntries, properties: w.blockPropsEncoder.props()}
w.keyAlloc, part.sep = cloneKeyWithBuf(base.DecodeInternalKey(w.indexBlock.curKey), w.keyAlloc)
bk := w.indexBlock.finish()
if len(w.indexBlockAlloc) < len(bk) {
w.indexBlockAlloc = make([]byte, len(bk)*16)
}
n := copy(w.indexBlockAlloc, bk)
part.block = w.indexBlockAlloc[:n:n]
w.indexBlockAlloc = w.indexBlockAlloc[n:]
w.indexPartitions = append(w.indexPartitions, part)
return nil
}

Expand All @@ -517,9 +547,9 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {

for i := range w.indexPartitions {
b := &w.indexPartitions[i]
w.props.NumDataBlocks += uint64(b.writer.nEntries)
sep := base.DecodeInternalKey(b.writer.curKey)
data := b.writer.finish()
w.props.NumDataBlocks += uint64(b.nEntries)

data := b.block
w.props.IndexSize += uint64(len(data))
bh, err := w.writeBlock(data, w.compression)
if err != nil {
Expand All @@ -530,7 +560,7 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
Props: b.properties,
}
encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)
w.topLevelIndexBlock.add(sep, encoded)
w.topLevelIndexBlock.add(b.sep, encoded)
}

// NB: RocksDB includes the block trailer length in the index size
Expand Down
77 changes: 41 additions & 36 deletions sstable/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"testing"

"github.com/cockroachdb/pebble/bloom"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -177,13 +179,14 @@ func TestWriterClearCache(t *testing.T) {
require.NoError(t, r.Close())
}

type discardFile struct{}
type discardFile struct{ wrote int64 }

// Close is a no-op for discardFile; it always returns nil.
func (f discardFile) Close() error {
return nil
}

func (f discardFile) Write(p []byte) (int, error) {
func (f *discardFile) Write(p []byte) (int, error) {
f.wrote += int64(len(p))
return len(p), nil
}

Expand All @@ -193,48 +196,50 @@ func (f discardFile) Sync() error {

func BenchmarkWriter(b *testing.B) {
keys := make([][]byte, 1e6)
const keyLen = 24
keySlab := make([]byte, keyLen*len(keys))
for i := range keys {
key := make([]byte, 24)
key := keySlab[i*keyLen : i*keyLen+keyLen]
binary.BigEndian.PutUint64(key[:8], 123) // 16-byte shared prefix
binary.BigEndian.PutUint64(key[8:16], 456)
binary.BigEndian.PutUint64(key[16:], uint64(i))
keys[i] = key
}

benchmarks := []struct {
name string
options WriterOptions
}{
{
name: "Default",
options: WriterOptions{
BlockRestartInterval: 16,
BlockSize: 32 << 10,
Compression: SnappyCompression,
FilterPolicy: bloom.FilterPolicy(10),
},
},
{
name: "Zstd",
options: WriterOptions{
BlockRestartInterval: 16,
BlockSize: 32 << 10,
Compression: ZstdCompression,
FilterPolicy: bloom.FilterPolicy(10),
},
},
}

for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
w := NewWriter(discardFile{}, bm.options)

for j := range keys {
if err := w.Set(keys[j], keys[j]); err != nil {
b.Fatal(err)
b.ResetTimer()

for _, bs := range []int{base.DefaultBlockSize, 32 << 10} {
b.Run(fmt.Sprintf("block=%s", humanize.IEC.Int64(int64(bs))), func(b *testing.B) {
for _, filter := range []bool{true, false} {
b.Run(fmt.Sprintf("filter=%t", filter), func(b *testing.B) {
for _, comp := range []Compression{NoCompression, SnappyCompression, ZstdCompression} {
b.Run(fmt.Sprintf("compression=%s", comp), func(b *testing.B) {
opts := WriterOptions{
BlockRestartInterval: 16,
BlockSize: bs,
Compression: comp,
}
if filter {
opts.FilterPolicy = bloom.FilterPolicy(10)
}
f := &discardFile{}
for i := 0; i < b.N; i++ {
f.wrote = 0
w := NewWriter(f, opts)

for j := range keys {
if err := w.Set(keys[j], keys[j]); err != nil {
b.Fatal(err)
}
}
if err := w.Close(); err != nil {
b.Fatal(err)
}
b.SetBytes(int64(f.wrote))
}
})
}
}
})
}
})
}
Expand Down