Skip to content

Commit

Permalink
sstable: reduce allocations during index flushing
Browse files Browse the repository at this point in the history
While working on a separate change that included passing a large number
of pre-generated blocks all to addIndexEntry in a tight loop, some of
the allocations it was doing per block flushed became more apparent in
profiles. By batch allocating keys used as separators, and by adjusting
the buffering of filled sub-index blocks to store their finished byte
representation (also batch allocated) and relevant details, rather than
the whole block writer used for that sub-index block, that blockWriter
can instead be reused, which importantly allows reuse of its allocated
buffers for things like restarts, eliminating a significant source of
allocations for when flushing many sub-index blocks.

```
name                                                          old time/op    new time/op    delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10    50.4ms ± 0%    50.4ms ± 1%      ~     (p=0.730 n=4+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10           62.6ms ± 2%    62.1ms ± 1%      ~     (p=0.222 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10     48.1ms ± 0%    48.5ms ± 0%    +0.73%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=Snappy-10            59.0ms ± 3%    58.1ms ± 2%      ~     (p=0.690 n=5+5)

name                                                          old speed      new speed      delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10   759MB/s ± 0%   759MB/s ± 1%      ~     (p=0.730 n=4+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10          156MB/s ± 2%   157MB/s ± 1%      ~     (p=0.222 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10    785MB/s ± 0%   779MB/s ± 0%    -0.73%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=Snappy-10           115MB/s ± 3%   117MB/s ± 2%      ~     (p=0.690 n=5+5)

name                                                          old alloc/op   new alloc/op   delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10    1.18MB ± 0%    0.80MB ± 0%   -32.14%  (p=0.008 n=5+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10           1.20MB ± 0%    0.83MB ± 0%   -31.08%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10      252kB ± 0%     952kB ± 0%  +278.63%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=Snappy-10             456kB ± 0%    1157kB ± 0%  +153.58%  (p=0.008 n=5+5)

name                                                          old allocs/op  new allocs/op  delta
Writer/block=4.0_K/filter=false/compression=NoCompression-10     10.7k ± 0%      0.1k ± 0%   -99.03%  (p=0.008 n=5+5)
Writer/block=4.0_K/filter=false/compression=Snappy-10            10.7k ± 0%      0.1k ± 0%   -98.97%  (p=0.008 n=5+5)
Writer/block=32_K/filter=false/compression=NoCompression-10      1.27k ± 0%     0.10k ± 0%      ~     (p=0.079 n=4+5)
Writer/block=32_K/filter=false/compression=Snappy-10             1.27k ± 0%     0.10k ± 0%   -91.84%  (p=0.008 n=5+5)
```
  • Loading branch information
dt committed Nov 23, 2021
1 parent e57e5fd commit bd9b59c
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,16 @@ type Writer struct {
xxHasher *xxhash.Digest

topLevelIndexBlock blockWriter
indexPartitions []indexBlockWriterAndBlockProperties
indexPartitions []indexBlockAndBlockProperties
keyAlloc []byte
indexBlockAlloc []byte
}

type indexBlockWriterAndBlockProperties struct {
writer blockWriter
// indexBlockAndBlockProperties holds a finished, serialized sub-index block
// awaiting inclusion in the two-level index, along with the details needed
// to add its entry to the top-level index block. Buffering the finished
// bytes (rather than the blockWriter that produced them) lets that writer
// and its internal buffers be reused across sub-index blocks.
type indexBlockAndBlockProperties struct {
	nEntries   int         // number of entries in the finished block
	sep        InternalKey // separator key for the top-level index entry
	properties []byte      // encoded block properties for this block
	block      []byte      // finished, serialized sub-index block bytes
}

// Set sets the value for the given key. The sequence number is set to
Expand Down Expand Up @@ -267,7 +271,7 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error {
// semantically identical, because we need to ensure that SmallestPoint.UserKey
// is not nil. This is required by WriterMetadata.Smallest in order to
// distinguish between an unset SmallestPoint and a zero-length one.
w.meta.SmallestPoint = w.meta.LargestPoint.Clone()
w.keyAlloc, w.meta.SmallestPoint = cloneKeyWithBuf(w.meta.LargestPoint, w.keyAlloc)
}

w.props.NumEntries++
Expand Down Expand Up @@ -432,14 +436,25 @@ func (w *Writer) addIndexEntry(key InternalKey, bhp BlockHandleWithProperties) e
prevKey := base.DecodeInternalKey(w.block.curKey)
var sep InternalKey
if key.UserKey == nil && key.Trailer == 0 {
sep = prevKey.Successor(w.compare, w.successor, nil)
if len(w.keyAlloc) < len(prevKey.UserKey) {
w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
}
sep = prevKey.Successor(w.compare, w.successor, w.keyAlloc[:0])
w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
} else {
sep = prevKey.Separator(w.compare, w.separator, nil, key)
if len(w.keyAlloc) < len(prevKey.UserKey) {
w.keyAlloc = make([]byte, len(prevKey.UserKey)+keyAllocSize)
}
sep = prevKey.Separator(w.compare, w.separator, w.keyAlloc[:0], key)
w.keyAlloc = w.keyAlloc[len(sep.UserKey):]
}
encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)

if supportsTwoLevelIndex(w.tableFormat) &&
shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) {
if cap(w.indexPartitions) == 0 {
w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
}
// Enable two level indexes if there is more than one index block.
w.twoLevelIndex = true
if err := w.finishIndexBlock(); err != nil {
Expand Down Expand Up @@ -484,6 +499,19 @@ func shouldFlush(
return newSize > blockSize
}

const keyAllocSize = 256 << 10

// cloneKeyWithBuf copies k's user key into buf, returning the unused tail
// of buf along with the cloned key. If buf is too small, a fresh buffer of
// len(k.UserKey)+keyAllocSize is allocated so subsequent clones can be
// sub-allocated from the remainder without further allocation. A key with
// an empty user key is returned as-is with buf untouched.
func cloneKeyWithBuf(k InternalKey, buf []byte) ([]byte, InternalKey) {
	keyLen := len(k.UserKey)
	if keyLen == 0 {
		return buf, k
	}
	if keyLen > len(buf) {
		// Over-allocate so future clones reuse this buffer's tail.
		buf = make([]byte, keyLen+keyAllocSize)
	}
	copy(buf, k.UserKey)
	// Three-index slice caps the clone so appends can't stomp the tail.
	cloned := InternalKey{UserKey: buf[:keyLen:keyLen], Trailer: k.Trailer}
	return buf[keyLen:], cloned
}

// finishIndexBlock finishes the current index block and adds it to the top
// level index block. This is only used when two level indexes are enabled.
func (w *Writer) finishIndexBlock() error {
Expand All @@ -498,14 +526,16 @@ func (w *Writer) finishIndexBlock() error {
w.blockPropsEncoder.addProp(shortID(i), scratch)
}
}
w.indexPartitions = append(w.indexPartitions,
indexBlockWriterAndBlockProperties{
writer: w.indexBlock,
properties: w.blockPropsEncoder.props(),
})
w.indexBlock = blockWriter{
restartInterval: 1,
part := indexBlockAndBlockProperties{nEntries: w.indexBlock.nEntries, properties: w.blockPropsEncoder.props()}
w.keyAlloc, part.sep = cloneKeyWithBuf(base.DecodeInternalKey(w.indexBlock.curKey), w.keyAlloc)
bk := w.indexBlock.finish()
if len(w.indexBlockAlloc) < len(bk) {
w.indexBlockAlloc = make([]byte, len(bk)*16)
}
n := copy(w.indexBlockAlloc, bk)
part.block = w.indexBlockAlloc[:n:n]
w.indexBlockAlloc = w.indexBlockAlloc[n:]
w.indexPartitions = append(w.indexPartitions, part)
return nil
}

Expand All @@ -517,9 +547,9 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {

for i := range w.indexPartitions {
b := &w.indexPartitions[i]
w.props.NumDataBlocks += uint64(b.writer.nEntries)
sep := base.DecodeInternalKey(b.writer.curKey)
data := b.writer.finish()
w.props.NumDataBlocks += uint64(b.nEntries)

data := b.block
w.props.IndexSize += uint64(len(data))
bh, err := w.writeBlock(data, w.compression)
if err != nil {
Expand All @@ -530,7 +560,7 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
Props: b.properties,
}
encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp)
w.topLevelIndexBlock.add(sep, encoded)
w.topLevelIndexBlock.add(b.sep, encoded)
}

// NB: RocksDB includes the block trailer length in the index size
Expand Down

0 comments on commit bd9b59c

Please sign in to comment.