Skip to content

Commit

Permalink
Merge pull request #166 from Ryanfsdf/ryankim/memtable-flushing
Browse files Browse the repository at this point in the history
Add memtable flush pacing mechanism
  • Loading branch information
Ryanfsdf authored Jul 9, 2019
2 parents a3902b0 + 7ee9345 commit 649af7e
Show file tree
Hide file tree
Showing 26 changed files with 661 additions and 98 deletions.
124 changes: 119 additions & 5 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,10 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) *flushableBatch {
index: uint32(index),
}
if keySize := uint32(len(key)); keySize == 0 {
entry.keyStart = uint32(offset)
entry.keyEnd = uint32(offset)
// Must add 2 to the offset. One byte encodes `kind` and the next
// byte encodes `0`, which is the length of the key.
entry.keyStart = uint32(offset)+2
entry.keyEnd = entry.keyStart
} else {
entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
uintptr(unsafe.Pointer(&b.data[0])))
Expand Down Expand Up @@ -949,6 +951,19 @@ func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
}
}

func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
return &flushFlushableBatchIter{
flushableBatchIter: flushableBatchIter{
batch: b,
data: b.data,
offsets: b.offsets,
cmp: b.cmp,
index: -1,
},
bytesIterated: bytesFlushed,
}
}

func (b *flushableBatch) newRangeDelIter(o *IterOptions) internalIterator {
if len(b.rangeDelOffsets) == 0 {
return nil
Expand Down Expand Up @@ -983,6 +998,10 @@ func (b *flushableBatch) newRangeDelIter(o *IterOptions) internalIterator {
return rangedel.NewIter(b.cmp, b.tombstones)
}

func (b *flushableBatch) totalBytes() uint64 {
return uint64(len(b.data) - batchHeaderLen)
}

func (b *flushableBatch) flushed() chan struct{} {
return b.flushedCh
}
Expand Down Expand Up @@ -1076,6 +1095,8 @@ func (i *flushableBatchIter) Last() (*InternalKey, []byte) {
return &i.key, i.Value()
}

// Note: flushFlushableBatchIter.Next mirrors the implementation of
// flushableBatchIter.Next due to performance. Keep the two in sync.
func (i *flushableBatchIter) Next() (*InternalKey, []byte) {
if i.index == len(i.offsets) {
return nil, nil
Expand Down Expand Up @@ -1120,10 +1141,26 @@ func (i *flushableBatchIter) Key() *InternalKey {
}

func (i *flushableBatchIter) Value() []byte {
offset := i.offsets[i.index].offset
_, _, value, ok := batchDecode(i.batch.data, offset)
if !ok {
p := i.data[i.offsets[i.index].offset:]
if len(p) == 0 {
i.err = fmt.Errorf("corrupted batch")
return nil
}
kind := InternalKeyKind(p[0])
if kind > InternalKeyKindMax {
i.err = fmt.Errorf("corrupted batch")
return nil
}
var value []byte
var ok bool
switch kind {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete:
keyEnd := i.offsets[i.index].keyEnd
_, value, ok = batchDecodeStr(i.data[keyEnd:])
if !ok {
i.err = fmt.Errorf("corrupted batch")
return nil
}
}
return value
}
Expand All @@ -1144,3 +1181,80 @@ func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
i.lower = lower
i.upper = upper
}

// flushFlushableBatchIter is similar to flushableBatchIter but it keeps track
// of number of bytes iterated.
type flushFlushableBatchIter struct {
flushableBatchIter
bytesIterated *uint64
}

// flushFlushableBatchIter implements the internalIterator interface.
var _ internalIterator = (*flushFlushableBatchIter)(nil)

func (i *flushFlushableBatchIter) SeekGE(key []byte) (*InternalKey, []byte) {
panic("pebble: SeekGE unimplemented")
}

func (i *flushFlushableBatchIter) SeekPrefixGE(prefix, key []byte) (*InternalKey, []byte) {
panic("pebble: SeekPrefixGE unimplemented")
}

func (i *flushFlushableBatchIter) SeekLT(key []byte) (*InternalKey, []byte) {
panic("pebble: SeekLT unimplemented")
}

func (i *flushFlushableBatchIter) First() (*InternalKey, []byte) {
key, val := i.flushableBatchIter.First()
if key == nil {
return nil, nil
}
entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
*i.bytesIterated += uint64(entryBytes) + i.valueSize()
return key, val
}

// Note: flushFlushableBatchIter.Next mirrors the implementation of
// flushableBatchIter.Next due to performance. Keep the two in sync.
func (i *flushFlushableBatchIter) Next() (*InternalKey, []byte) {
if i.index == len(i.offsets) {
return nil, nil
}
i.index++
if i.index == len(i.offsets) {
return nil, nil
}
i.key = i.getKey(i.index)
entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
*i.bytesIterated += uint64(entryBytes) + i.valueSize()
return &i.key, i.Value()
}

func (i flushFlushableBatchIter) Prev() (*InternalKey, []byte) {
panic("pebble: Prev unimplemented")
}

func (i flushFlushableBatchIter) valueSize() uint64 {
p := i.data[i.offsets[i.index].offset:]
if len(p) == 0 {
i.err = fmt.Errorf("corrupted batch")
return 0
}
kind := InternalKeyKind(p[0])
if kind > InternalKeyKindMax {
i.err = fmt.Errorf("corrupted batch")
return 0
}
var length uint64
switch kind {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete:
keyEnd := i.offsets[i.index].keyEnd
v, n := binary.Uvarint(i.data[keyEnd:])
if n <= 0 {
i.err = fmt.Errorf("corrupted batch")
return 0
}
length = v + uint64(n)
}
return length
}
21 changes: 21 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,27 @@ func TestFlushableBatchDeleteRange(t *testing.T) {
})
}

func TestFlushableBatchBytesIterated(t *testing.T) {
batch := newBatch(nil)
for j := 0; j < 1000; j++ {
key := make([]byte, 8 + j%3)
value := make([]byte, 7 + j%5)
batch.Set(key, value, nil)

fb := newFlushableBatch(batch, DefaultComparer)

var bytesIterated uint64
it := fb.newFlushIter(nil, &bytesIterated)

for it.First(); it.Valid(); it.Next() {}

expected := fb.totalBytes()
if bytesIterated != expected {
t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
}
}
}

func BenchmarkBatchSet(b *testing.B) {
value := make([]byte, 10)
for i := range value {
Expand Down
2 changes: 1 addition & 1 deletion cmd/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func newPebbleDB(dir string) DB {
DisableWAL: disableWAL,
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinFlushRate: 4 << 20,
L0CompactionThreshold: 2,
L0SlowdownWritesThreshold: 20,
L0StopWritesThreshold: 32,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: []pebble.LevelOptions{{
Expand Down
84 changes: 75 additions & 9 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package pebble

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"os"
"sort"
"sync/atomic"
"time"
"unsafe"

"github.com/petermattis/pebble/internal/base"
Expand Down Expand Up @@ -66,6 +68,8 @@ type compaction struct {

// flushing contains the flushables (aka memtables) that are being flushed.
flushing []flushable
// bytesIterated contains the number of bytes that have been flushed/compacted.
bytesIterated uint64
// inputs are the tables to be compacted.
inputs [2][]fileMetadata

Expand Down Expand Up @@ -461,7 +465,7 @@ func (c *compaction) newInputIter(
if len(c.flushing) != 0 {
if len(c.flushing) == 1 {
f := c.flushing[0]
iter := f.newIter(nil)
iter := f.newFlushIter(nil, &c.bytesIterated)
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
return newMergingIter(c.cmp, iter, rangeDelIter), nil
}
Expand All @@ -470,7 +474,7 @@ func (c *compaction) newInputIter(
iters := make([]internalIterator, 0, 2*len(c.flushing))
for i := range c.flushing {
f := c.flushing[i]
iters = append(iters, f.newIter(nil))
iters = append(iters, f.newFlushIter(nil, &c.bytesIterated))
rangeDelIter := f.newRangeDelIter(nil)
if rangeDelIter != nil {
iters = append(iters, rangeDelIter)
Expand Down Expand Up @@ -498,9 +502,9 @@ func (c *compaction) newInputIter(
// one which iterates over the range deletions. These two iterators are
// combined with a mergingIter.
newRangeDelIter := func(
f *fileMetadata, _ *IterOptions,
f *fileMetadata, _ *IterOptions, bytesIterated *uint64,
) (internalIterator, internalIterator, error) {
iter, rangeDelIter, err := newIters(f, nil /* iter options */)
iter, rangeDelIter, err := newIters(f, nil /* iter options */, &c.bytesIterated)
if err == nil {
// TODO(peter): It is mildly wasteful to open the point iterator only to
// immediately close it. One way to solve this would be to add new
Expand All @@ -524,12 +528,12 @@ func (c *compaction) newInputIter(
}

if c.startLevel != 0 {
iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[0]))
iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[0]))
iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[0], &c.bytesIterated))
iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[0], &c.bytesIterated))
} else {
for i := range c.inputs[0] {
f := &c.inputs[0][i]
iter, rangeDelIter, err := newIters(f, nil /* iter options */)
iter, rangeDelIter, err := newIters(f, nil /* iter options */, &c.bytesIterated)
if err != nil {
return nil, fmt.Errorf("pebble: could not open table %d: %v", f.fileNum, err)
}
Expand All @@ -540,8 +544,8 @@ func (c *compaction) newInputIter(
}
}

iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[1]))
iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[1]))
iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[1], &c.bytesIterated))
iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[1], &c.bytesIterated))
return newMergingIter(c.cmp, iters...), nil
}

Expand Down Expand Up @@ -990,7 +994,58 @@ func (d *DB) runCompaction(c *compaction) (
return nil
}

var prevBytesIterated uint64
var iterCount int
totalBytes := d.memTableTotalBytes()
refreshDirtyBytesThreshold := uint64(d.opts.MemTableSize * 5/100)

for key, val := iter.First(); key != nil; key, val = iter.Next() {
// Slow down memtable flushing to match fill rate.
if c.flushing != nil {
// Recalculate total memtable bytes only once every 1000 iterations or
// when the refresh threshold is hit since getting the total memtable
// byte count requires grabbing DB.mu which is expensive.
if iterCount >= 1000 || c.bytesIterated > refreshDirtyBytesThreshold {
totalBytes = d.memTableTotalBytes()
refreshDirtyBytesThreshold = c.bytesIterated + uint64(d.opts.MemTableSize * 5/100)
iterCount = 0
}
iterCount++

// dirtyBytes is the total number of bytes in the memtables minus the number of
// bytes flushed. It represents unflushed bytes in all the memtables, even the
// ones which aren't being flushed such as the mutable memtable.
dirtyBytes := totalBytes - c.bytesIterated
flushAmount := c.bytesIterated - prevBytesIterated
prevBytesIterated = c.bytesIterated

// We slow down memtable flushing when the dirty bytes indicator falls
// below the low watermark, which is 105% memtable size. This will only
// occur if memtable flushing can keep up with the pace of incoming
// writes. If writes come in faster than how fast the memtable can flush,
// flushing proceeds at maximum (unthrottled) speed.
if dirtyBytes <= uint64(d.opts.MemTableSize * 105/100) {
burst := d.flushLimiter.Burst()
for flushAmount > uint64(burst) {
err := d.flushLimiter.WaitN(context.Background(), burst)
if err != nil {
return nil, pendingOutputs, err
}
flushAmount -= uint64(burst)
}
err := d.flushLimiter.WaitN(context.Background(), int(flushAmount))
if err != nil {
return nil, pendingOutputs, err
}
} else {
burst := d.flushLimiter.Burst()
for flushAmount > uint64(burst) {
d.flushLimiter.AllowN(time.Now(), burst)
flushAmount -= uint64(burst)
}
d.flushLimiter.AllowN(time.Now(), int(flushAmount))
}
}
// TODO(peter,rangedel): Need to incorporate the range tombstones in the
// shouldStopBefore decision.
if tw != nil && (tw.EstimatedSize() >= c.maxOutputFileSize || c.shouldStopBefore(*key)) {
Expand Down Expand Up @@ -1033,6 +1088,17 @@ func (d *DB) runCompaction(c *compaction) (
return ve, pendingOutputs, nil
}

// memTableTotalBytes returns the total number of bytes in the memtables. Note
// that this includes the mutable memtable as well.
func (d *DB) memTableTotalBytes() (totalBytes uint64) {
d.mu.Lock()
for _, m := range d.mu.mem.queue {
totalBytes += m.totalBytes()
}
d.mu.Unlock()
return totalBytes
}

// scanObsoleteFiles scans the filesystem for files that are no longer needed
// and adds those to the internal lists of obsolete files. Note that he files
// are not actually deleted by this method. A subsequent call to
Expand Down
Loading

0 comments on commit 649af7e

Please sign in to comment.