diff --git a/batch.go b/batch.go index 2a66836816..570b452d79 100644 --- a/batch.go +++ b/batch.go @@ -892,8 +892,10 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) *flushableBatch { index: uint32(index), } if keySize := uint32(len(key)); keySize == 0 { - entry.keyStart = uint32(offset) - entry.keyEnd = uint32(offset) + // Must add 2 to the offset. One byte encodes `kind` and the next + // byte encodes `0`, which is the length of the key. + entry.keyStart = uint32(offset)+2 + entry.keyEnd = entry.keyStart } else { entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) - uintptr(unsafe.Pointer(&b.data[0]))) @@ -949,6 +951,19 @@ func (b *flushableBatch) newIter(o *IterOptions) internalIterator { } } +func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator { + return &flushFlushableBatchIter{ + flushableBatchIter: flushableBatchIter{ + batch: b, + data: b.data, + offsets: b.offsets, + cmp: b.cmp, + index: -1, + }, + bytesIterated: bytesFlushed, + } +} + func (b *flushableBatch) newRangeDelIter(o *IterOptions) internalIterator { if len(b.rangeDelOffsets) == 0 { return nil @@ -983,6 +998,10 @@ func (b *flushableBatch) newRangeDelIter(o *IterOptions) internalIterator { return rangedel.NewIter(b.cmp, b.tombstones) } +func (b *flushableBatch) totalBytes() uint64 { + return uint64(len(b.data) - batchHeaderLen) +} + func (b *flushableBatch) flushed() chan struct{} { return b.flushedCh } @@ -1076,6 +1095,8 @@ func (i *flushableBatchIter) Last() (*InternalKey, []byte) { return &i.key, i.Value() } +// Note: flushFlushableBatchIter.Next mirrors the implementation of +// flushableBatchIter.Next due to performance. Keep the two in sync. func (i *flushableBatchIter) Next() (*InternalKey, []byte) { if i.index == len(i.offsets) { return nil, nil @@ -1120,10 +1141,26 @@ func (i *flushableBatchIter) Key() *InternalKey { } func (i *flushableBatchIter) Value() []byte { - offset := i.offsets[i.index].offset - _, _, value, ok := batchDecode(i.batch.data, offset) - if !ok { + p := i.data[i.offsets[i.index].offset:] + if len(p) == 0 { + i.err = fmt.Errorf("corrupted batch") + return nil + } + kind := InternalKeyKind(p[0]) + if kind > InternalKeyKindMax { i.err = fmt.Errorf("corrupted batch") + return nil + } + var value []byte + var ok bool + switch kind { + case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete: + keyEnd := i.offsets[i.index].keyEnd + _, value, ok = batchDecodeStr(i.data[keyEnd:]) + if !ok { + i.err = fmt.Errorf("corrupted batch") + return nil + } } return value } @@ -1144,3 +1181,80 @@ func (i *flushableBatchIter) SetBounds(lower, upper []byte) { i.lower = lower i.upper = upper } + +// flushFlushableBatchIter is similar to flushableBatchIter but it keeps track +// of number of bytes iterated. +type flushFlushableBatchIter struct { + flushableBatchIter + bytesIterated *uint64 +} + +// flushFlushableBatchIter implements the internalIterator interface. +var _ internalIterator = (*flushFlushableBatchIter)(nil) + +func (i *flushFlushableBatchIter) SeekGE(key []byte) (*InternalKey, []byte) { + panic("pebble: SeekGE unimplemented") +} + +func (i *flushFlushableBatchIter) SeekPrefixGE(prefix, key []byte) (*InternalKey, []byte) { + panic("pebble: SeekPrefixGE unimplemented") +} + +func (i *flushFlushableBatchIter) SeekLT(key []byte) (*InternalKey, []byte) { + panic("pebble: SeekLT unimplemented") +} + +func (i *flushFlushableBatchIter) First() (*InternalKey, []byte) { + key, val := i.flushableBatchIter.First() + if key == nil { + return nil, nil + } + entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset + *i.bytesIterated += uint64(entryBytes) + i.valueSize() + return key, val +} + +// Note: flushFlushableBatchIter.Next mirrors the implementation of +// flushableBatchIter.Next due to performance. Keep the two in sync. +func (i *flushFlushableBatchIter) Next() (*InternalKey, []byte) { + if i.index == len(i.offsets) { + return nil, nil + } + i.index++ + if i.index == len(i.offsets) { + return nil, nil + } + i.key = i.getKey(i.index) + entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset + *i.bytesIterated += uint64(entryBytes) + i.valueSize() + return &i.key, i.Value() +} + +func (i flushFlushableBatchIter) Prev() (*InternalKey, []byte) { + panic("pebble: Prev unimplemented") +} + +func (i flushFlushableBatchIter) valueSize() uint64 { + p := i.data[i.offsets[i.index].offset:] + if len(p) == 0 { + i.err = fmt.Errorf("corrupted batch") + return 0 + } + kind := InternalKeyKind(p[0]) + if kind > InternalKeyKindMax { + i.err = fmt.Errorf("corrupted batch") + return 0 + } + var length uint64 + switch kind { + case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete: + keyEnd := i.offsets[i.index].keyEnd + v, n := binary.Uvarint(i.data[keyEnd:]) + if n <= 0 { + i.err = fmt.Errorf("corrupted batch") + return 0 + } + length = v + uint64(n) + } + return length +} diff --git a/batch_test.go b/batch_test.go index 2884740ec7..41d68e0cf0 100644 --- a/batch_test.go +++ b/batch_test.go @@ -412,6 +412,27 @@ func TestFlushableBatchDeleteRange(t *testing.T) { }) } +func TestFlushableBatchBytesIterated(t *testing.T) { + batch := newBatch(nil) + for j := 0; j < 1000; j++ { + key := make([]byte, 8 + j%3) + value := make([]byte, 7 + j%5) + batch.Set(key, value, nil) + + fb := newFlushableBatch(batch, DefaultComparer) + + var bytesIterated uint64 + it := fb.newFlushIter(nil, &bytesIterated) + + for it.First(); it.Valid(); it.Next() {} + + expected := fb.totalBytes() + if bytesIterated != expected { + t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) + } + } +} + func BenchmarkBatchSet(b *testing.B) { value := make([]byte, 10) for i := range value { diff --git a/cmd/pebble/db.go b/cmd/pebble/db.go index bc5f795eae..75153bcea2 100644 --- a/cmd/pebble/db.go +++ b/cmd/pebble/db.go @@ -54,8 +54,8 @@ func newPebbleDB(dir string) DB { DisableWAL: disableWAL, MemTableSize: 64 << 20, MemTableStopWritesThreshold: 4, + MinFlushRate: 4 << 20, L0CompactionThreshold: 2, - L0SlowdownWritesThreshold: 20, L0StopWritesThreshold: 32, LBaseMaxBytes: 64 << 20, // 64 MB Levels: []pebble.LevelOptions{{ diff --git a/compaction.go b/compaction.go index 3d93c57bd5..f0886aca50 100644 --- a/compaction.go +++ b/compaction.go @@ -6,12 +6,14 @@ package pebble import ( "bytes" + "context" "errors" "fmt" "math" "os" "sort" "sync/atomic" + "time" "unsafe" "github.com/petermattis/pebble/internal/base" @@ -66,6 +68,8 @@ type compaction struct { // flushing contains the flushables (aka memtables) that are being flushed. flushing []flushable + // bytesIterated contains the number of bytes that have been flushed/compacted. + bytesIterated uint64 // inputs are the tables to be compacted. inputs [2][]fileMetadata @@ -461,7 +465,7 @@ func (c *compaction) newInputIter( if len(c.flushing) != 0 { if len(c.flushing) == 1 { f := c.flushing[0] - iter := f.newIter(nil) + iter := f.newFlushIter(nil, &c.bytesIterated) if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil { return newMergingIter(c.cmp, iter, rangeDelIter), nil } @@ -470,7 +474,7 @@ func (c *compaction) newInputIter( iters := make([]internalIterator, 0, 2*len(c.flushing)) for i := range c.flushing { f := c.flushing[i] - iters = append(iters, f.newIter(nil)) + iters = append(iters, f.newFlushIter(nil, &c.bytesIterated)) rangeDelIter := f.newRangeDelIter(nil) if rangeDelIter != nil { iters = append(iters, rangeDelIter) @@ -498,9 +502,9 @@ func (c *compaction) newInputIter( // one which iterates over the range deletions. These two iterators are // combined with a mergingIter. newRangeDelIter := func( - f *fileMetadata, _ *IterOptions, + f *fileMetadata, _ *IterOptions, bytesIterated *uint64, ) (internalIterator, internalIterator, error) { - iter, rangeDelIter, err := newIters(f, nil /* iter options */) + iter, rangeDelIter, err := newIters(f, nil /* iter options */, &c.bytesIterated) if err == nil { // TODO(peter): It is mildly wasteful to open the point iterator only to // immediately close it. One way to solve this would be to add new @@ -524,12 +528,12 @@ func (c *compaction) newInputIter( } if c.startLevel != 0 { - iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[0])) - iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[0])) + iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[0], &c.bytesIterated)) + iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[0], &c.bytesIterated)) } else { for i := range c.inputs[0] { f := &c.inputs[0][i] - iter, rangeDelIter, err := newIters(f, nil /* iter options */) + iter, rangeDelIter, err := newIters(f, nil /* iter options */, &c.bytesIterated) if err != nil { return nil, fmt.Errorf("pebble: could not open table %d: %v", f.fileNum, err) } @@ -540,8 +544,8 @@ func (c *compaction) newInputIter( } } - iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[1])) - iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[1])) + iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[1], &c.bytesIterated)) + iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[1], &c.bytesIterated)) return newMergingIter(c.cmp, iters...), nil } @@ -990,7 +994,58 @@ func (d *DB) runCompaction(c *compaction) ( return nil } + var prevBytesIterated uint64 + var iterCount int + totalBytes := d.memTableTotalBytes() + refreshDirtyBytesThreshold := uint64(d.opts.MemTableSize * 5/100) + for key, val := iter.First(); key != nil; key, val = iter.Next() { + // Slow down memtable flushing to match fill rate. + if c.flushing != nil { + // Recalculate total memtable bytes only once every 1000 iterations or + // when the refresh threshold is hit since getting the total memtable + // byte count requires grabbing DB.mu which is expensive. + if iterCount >= 1000 || c.bytesIterated > refreshDirtyBytesThreshold { + totalBytes = d.memTableTotalBytes() + refreshDirtyBytesThreshold = c.bytesIterated + uint64(d.opts.MemTableSize * 5/100) + iterCount = 0 + } + iterCount++ + + // dirtyBytes is the total number of bytes in the memtables minus the number of + // bytes flushed. It represents unflushed bytes in all the memtables, even the + // ones which aren't being flushed such as the mutable memtable. + dirtyBytes := totalBytes - c.bytesIterated + flushAmount := c.bytesIterated - prevBytesIterated + prevBytesIterated = c.bytesIterated + + // We slow down memtable flushing when the dirty bytes indicator falls + // below the low watermark, which is 105% memtable size. This will only + // occur if memtable flushing can keep up with the pace of incoming + // writes. If writes come in faster than how fast the memtable can flush, + // flushing proceeds at maximum (unthrottled) speed. + if dirtyBytes <= uint64(d.opts.MemTableSize * 105/100) { + burst := d.flushLimiter.Burst() + for flushAmount > uint64(burst) { + err := d.flushLimiter.WaitN(context.Background(), burst) + if err != nil { + return nil, pendingOutputs, err + } + flushAmount -= uint64(burst) + } + err := d.flushLimiter.WaitN(context.Background(), int(flushAmount)) + if err != nil { + return nil, pendingOutputs, err + } + } else { + burst := d.flushLimiter.Burst() + for flushAmount > uint64(burst) { + d.flushLimiter.AllowN(time.Now(), burst) + flushAmount -= uint64(burst) + } + d.flushLimiter.AllowN(time.Now(), int(flushAmount)) + } + } // TODO(peter,rangedel): Need to incorporate the range tombstones in the // shouldStopBefore decision. if tw != nil && (tw.EstimatedSize() >= c.maxOutputFileSize || c.shouldStopBefore(*key)) { @@ -1033,6 +1088,17 @@ func (d *DB) runCompaction(c *compaction) ( return ve, pendingOutputs, nil } +// memTableTotalBytes returns the total number of bytes in the memtables. Note +// that this includes the mutable memtable as well. +func (d *DB) memTableTotalBytes() (totalBytes uint64) { + d.mu.Lock() + for _, m := range d.mu.mem.queue { + totalBytes += m.totalBytes() + } + d.mu.Unlock() + return totalBytes +} + // scanObsoleteFiles scans the filesystem for files that are no longer needed // and adds those to the internal lists of obsolete files. Note that he files // are not actually deleted by this method. A subsequent call to diff --git a/db.go b/db.go index 002ca8ef6f..2d59fdcdee 100644 --- a/db.go +++ b/db.go @@ -11,10 +11,10 @@ import ( "io" "sync" "sync/atomic" - "time" "github.com/petermattis/pebble/internal/arenaskl" "github.com/petermattis/pebble/internal/base" + "github.com/petermattis/pebble/internal/rate" "github.com/petermattis/pebble/internal/record" "github.com/petermattis/pebble/vfs" ) @@ -39,7 +39,9 @@ var ( type flushable interface { newIter(o *IterOptions) internalIterator + newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator newRangeDelIter(o *IterOptions) internalIterator + totalBytes() uint64 flushed() chan struct{} readyForFlush() bool logInfo() (num, size uint64) @@ -170,6 +172,8 @@ type DB struct { closed int32 // updated atomically + flushLimiter *rate.Limiter + // TODO(peter): describe exactly what this mutex protects. So far: every // field in the struct. mu struct { @@ -394,9 +398,6 @@ func (d *DB) commitApply(b *Batch, mem *memTable) error { func (d *DB) commitWrite(b *Batch, wg *sync.WaitGroup) (*memTable, error) { d.mu.Lock() - // Throttle writes if there are too many L0 tables. - d.throttleWrite() - if b.flushable != nil { b.flushable.seqNum = b.seqNum() } @@ -505,7 +506,7 @@ func (d *DB) newIterInternal( current := readState.current for i := len(current.files[0]) - 1; i >= 0; i-- { f := ¤t.files[0][i] - iter, rangeDelIter, err := d.newIters(f, &dbi.opts) + iter, rangeDelIter, err := d.newIters(f, &dbi.opts, nil) if err != nil { dbi.err = err return dbi @@ -543,7 +544,7 @@ func (d *DB) newIterInternal( li = &levelIter{} } - li.init(&dbi.opts, d.cmp, d.newIters, current.files[level]) + li.init(&dbi.opts, d.cmp, d.newIters, current.files[level], nil) li.initRangeDel(&rangeDelIters[0]) li.initLargestUserKey(&largestUserKeys[0]) iters = append(iters, li) @@ -792,22 +793,6 @@ func (d *DB) walPreallocateSize() int { return size } -func (d *DB) throttleWrite() { - if len(d.mu.versions.currentVersion().files[0]) <= d.opts.L0SlowdownWritesThreshold { - return - } - // fmt.Printf("L0 slowdown writes threshold\n") - // We are getting close to hitting a hard limit on the number of L0 - // files. Rather than delaying a single write by several seconds when we hit - // the hard limit, start delaying each individual write by 1ms to reduce - // latency variance. - // - // TODO(peter): Use more sophisticated rate limiting. - d.mu.Unlock() - time.Sleep(1 * time.Millisecond) - d.mu.Lock() -} - func (d *DB) makeRoomForWrite(b *Batch) error { force := b == nil || b.flushable != nil for { diff --git a/db_test.go b/db_test.go index 2ba3a4b0a9..2d17394f65 100644 --- a/db_test.go +++ b/db_test.go @@ -421,6 +421,7 @@ func TestLargeBatch(t *testing.T) { FS: vfs.NewMem(), MemTableSize: 1400, MemTableStopWritesThreshold: 100, + MinFlushRate: 4 << 20, }) if err != nil { t.Fatal(err) diff --git a/get_iter.go b/get_iter.go index c109b753d8..6a65eb5765 100644 --- a/get_iter.go +++ b/get_iter.go @@ -132,7 +132,10 @@ func (g *getIter) Next() (*InternalKey, []byte) { // Create iterators from L0 from newest to oldest. if n := len(g.l0); n > 0 { l := &g.l0[n-1] - g.iter, g.rangeDelIter, g.err = g.newIters(l, nil /* iter options */) + g.iter, g.rangeDelIter, g.err = g.newIters( + l, + nil /* iter options */, + nil /* bytes iterated */) if g.err != nil { return nil, nil } @@ -151,7 +154,7 @@ func (g *getIter) Next() (*InternalKey, []byte) { continue } - g.levelIter.init(nil, g.cmp, g.newIters, g.version.files[g.level]) + g.levelIter.init(nil, g.cmp, g.newIters, g.version.files[g.level], nil) g.levelIter.initRangeDel(&g.rangeDelIter) g.level++ g.iter = &g.levelIter diff --git a/get_iter_test.go b/get_iter_test.go index 91c437dd3b..3b27374673 100644 --- a/get_iter_test.go +++ b/get_iter_test.go @@ -441,7 +441,7 @@ func TestGetIter(t *testing.T) { // m is a map from file numbers to DBs. m := map[uint64]*memTable{} newIter := func( - meta *fileMetadata, _ *IterOptions, + meta *fileMetadata, _ *IterOptions, _ *uint64, ) (internalIterator, internalIterator, error) { d, ok := m[meta.fileNum] if !ok { diff --git a/internal/arenaskl/arena.go b/internal/arenaskl/arena.go index 39bfdf4d31..49637a960a 100644 --- a/internal/arenaskl/arena.go +++ b/internal/arenaskl/arena.go @@ -63,11 +63,11 @@ func (a *Arena) Capacity() uint32 { return uint32(len(a.buf)) } -func (a *Arena) alloc(size, align uint32) (uint32, error) { +func (a *Arena) alloc(size, align uint32) (uint32, uint32, error) { // Verify that the arena isn't already full. origSize := atomic.LoadUint64(&a.n) if int(origSize) > len(a.buf) { - return 0, ErrArenaFull + return 0, 0, ErrArenaFull } // Pad the allocation with enough bytes to ensure the requested alignment. @@ -75,12 +75,12 @@ func (a *Arena) alloc(size, align uint32) (uint32, error) { newSize := atomic.AddUint64(&a.n, uint64(padded)) if int(newSize) > len(a.buf) { - return 0, ErrArenaFull + return 0, 0, ErrArenaFull } // Return the aligned offset. offset := (uint32(newSize) - padded + uint32(align)) & ^uint32(align) - return offset, nil + return offset, padded, nil } func (a *Arena) getBytes(offset uint32, size uint32) []byte { diff --git a/internal/arenaskl/arena_test.go b/internal/arenaskl/arena_test.go index 39ff3e1545..effab1b9e1 100644 --- a/internal/arenaskl/arena_test.go +++ b/internal/arenaskl/arena_test.go @@ -30,19 +30,19 @@ func TestArenaSizeOverflow(t *testing.T) { a := NewArena(math.MaxUint32, 0) // Allocating under the limit throws no error. - offset, err := a.alloc(math.MaxUint16, 0) + offset, _, err := a.alloc(math.MaxUint16, 0) require.Nil(t, err) require.Equal(t, uint32(1), offset) require.Equal(t, uint32(math.MaxUint16)+1, a.Size()) // Allocating over the limit could cause an accounting // overflow if 32-bit arithmetic was used. It shouldn't. - _, err = a.alloc(math.MaxUint32, 0) + _, _, err = a.alloc(math.MaxUint32, 0) require.Equal(t, ErrArenaFull, err) require.Equal(t, uint32(math.MaxUint32), a.Size()) // Continuing to allocate continues to throw an error. - _, err = a.alloc(math.MaxUint16, 0) + _, _, err = a.alloc(math.MaxUint16, 0) require.Equal(t, ErrArenaFull, err) require.Equal(t, uint32(math.MaxUint32), a.Size()) } diff --git a/internal/arenaskl/flush_iterator.go b/internal/arenaskl/flush_iterator.go new file mode 100644 index 0000000000..aec7343741 --- /dev/null +++ b/internal/arenaskl/flush_iterator.go @@ -0,0 +1,73 @@ +/* + * Copyright 2017 Dgraph Labs, Inc. and Contributors + * Modifications copyright (C) 2017 Andy Kimball and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package arenaskl + +import ( + "github.com/petermattis/pebble/internal/base" +) + +// flushIterator is an iterator over the skiplist object. Use Skiplist.NewFlushIter +// to construct an iterator. The current state of the iterator can be cloned by +// simply value copying the struct. +type flushIterator struct { + Iterator + bytesIterated *uint64 +} + +func (it *flushIterator) SeekGE(key []byte) (*base.InternalKey, []byte) { + panic("pebble: SeekGE unimplemented") +} + +func (it *flushIterator) SeekPrefixGE(prefix, key []byte) (*base.InternalKey, []byte) { + panic("pebble: SeekPrefixGE unimplemented") +} + +func (it *flushIterator) SeekLT(key []byte) (*base.InternalKey, []byte) { + panic("pebble: SeekLT unimplemented") +} + +// First seeks position at the first entry in list. Returns the key and value +// if the iterator is pointing at a valid entry, and (nil, nil) otherwise. Note +// that First only checks the upper bound. It is up to the caller to ensure +// that key is greater than or equal to the lower bound. +func (it *flushIterator) First() (*base.InternalKey, []byte) { + key, val := it.Iterator.First() + if key == nil { + return nil, nil + } + *it.bytesIterated += uint64(it.nd.allocSize) + return key, val +} + +// Next advances to the next position. Returns the key and value if the +// iterator is pointing at a valid entry, and (nil, nil) otherwise. +// Note: flushIterator.Next mirrors the implementation of Iterator.Next +// due to performance. Keep the two in sync. +func (it *flushIterator) Next() (*base.InternalKey, []byte) { + it.nd = it.list.getNext(it.nd, 0) + if it.nd == it.list.tail { + return nil, nil + } + it.decodeKey() + *it.bytesIterated += uint64(it.nd.allocSize) + return &it.key, it.Value() +} + +func (it *flushIterator) Prev() (*base.InternalKey, []byte) { + panic("pebble: Prev unimplemented") +} diff --git a/internal/arenaskl/iterator.go b/internal/arenaskl/iterator.go index 589176a52e..c3dc73ebd0 100644 --- a/internal/arenaskl/iterator.go +++ b/internal/arenaskl/iterator.go @@ -143,6 +143,8 @@ func (it *Iterator) Last() (*base.InternalKey, []byte) { // Next advances to the next position. Returns the key and value if the // iterator is pointing at a valid entry, and (nil, nil) otherwise. +// Note: flushIterator.Next mirrors the implementation of Iterator.Next +// due to performance. Keep the two in sync. func (it *Iterator) Next() (*base.InternalKey, []byte) { it.nd = it.list.getNext(it.nd, 0) if it.nd == it.list.tail { diff --git a/internal/arenaskl/node.go b/internal/arenaskl/node.go index 06020b75f1..b20f5bb154 100644 --- a/internal/arenaskl/node.go +++ b/internal/arenaskl/node.go @@ -47,6 +47,7 @@ type node struct { // If valueSize is negative, the value is stored separately from the node in // arena.extValues. valueSize int32 + allocSize uint32 // Most nodes do not need to use the full height of the tower, since the // probability of each successive level decreases exponentially. Because @@ -90,7 +91,7 @@ func newRawNode(arena *Arena, height uint32, keySize, valueSize uint32) (nd *nod nodeSize := uint32(maxNodeSize - unusedSize) valueIndex := int32(valueSize) - nodeOffset, err := arena.alloc(nodeSize+keySize+valueSize, align4) + nodeOffset, allocSize, err := arena.alloc(nodeSize+keySize+valueSize, align4) if err != nil { return } @@ -99,6 +100,7 @@ func newRawNode(arena *Arena, height uint32, keySize, valueSize uint32) (nd *nod nd.keyOffset = nodeOffset + nodeSize nd.keySize = uint32(keySize) nd.valueSize = valueIndex + nd.allocSize = allocSize return } diff --git a/internal/arenaskl/skl.go b/internal/arenaskl/skl.go index 086adaa6f2..2138a1f9ce 100644 --- a/internal/arenaskl/skl.go +++ b/internal/arenaskl/skl.go @@ -313,6 +313,16 @@ func (s *Skiplist) NewIter(lower, upper []byte) *Iterator { return it } +// NewFlushIter returns a new flushIterator, which is similar to an Iterator +// but also sets the current number of the bytes that have been iterated +// through. +func (s *Skiplist) NewFlushIter(bytesFlushed *uint64) *flushIterator { + return &flushIterator{ + Iterator: Iterator{list: s, nd: s.head}, + bytesIterated: bytesFlushed, + } +} + func (s *Skiplist) newNode( key base.InternalKey, value []byte, ) (nd *node, height uint32, err error) { diff --git a/internal/arenaskl/skl_test.go b/internal/arenaskl/skl_test.go index 4c7960d828..1c464179b1 100644 --- a/internal/arenaskl/skl_test.go +++ b/internal/arenaskl/skl_test.go @@ -688,6 +688,29 @@ func TestIteratorBounds(t *testing.T) { require.False(t, it.Prev()) } +func TestBytesIterated(t *testing.T) { + l := NewSkiplist(NewArena(arenaSize, 0), bytes.Compare) + emptySize := l.arena.Size() + for i := 0; i < 200; i++ { + bytesIterated := l.bytesIterated() + expected := uint64(l.arena.Size() - emptySize) + if bytesIterated != expected { + t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) + } + l.Add(base.InternalKey{UserKey: []byte{byte(i)}}, nil) + } +} + +// bytesIterated returns the number of bytes iterated in the skiplist. +func (s *Skiplist) bytesIterated() (bytesIterated uint64) { + x := s.NewFlushIter(&bytesIterated) + for x.First(); x.Valid(); x.Next() {} + if x.Close() != nil { + return 0 + } + return bytesIterated +} + func randomKey(rng *rand.Rand, b []byte) base.InternalKey { key := rng.Uint32() key2 := rng.Uint32() diff --git a/internal/base/options.go b/internal/base/options.go index 50329df1f1..cdf489f902 100644 --- a/internal/base/options.go +++ b/internal/base/options.go @@ -247,10 +247,6 @@ type Options struct { // The number of files necessary to trigger an L0 compaction. L0CompactionThreshold int - // Soft limit on the number of L0 files. Writes are slowed down when this - // threshold is reached. - L0SlowdownWritesThreshold int - // Hard limit on the number of L0 files. Writes are stopped when this // threshold is reached. L0StopWritesThreshold int @@ -300,6 +296,10 @@ type Options struct { // The default merger concatenates values. Merger *Merger + // MinFlushRate sets the minimum rate at which the MemTables are flushed. The + // default is 4 MB/s. + MinFlushRate int + // TableFormat specifies the format version for sstables. The default is // TableFormatRocksDBv2 which creates RocksDB compatible sstables. Use // TableFormatLevelDB to create LevelDB compatible sstable which can be used @@ -336,9 +336,6 @@ func (o *Options) EnsureDefaults() *Options { if o.L0CompactionThreshold <= 0 { o.L0CompactionThreshold = 4 } - if o.L0SlowdownWritesThreshold <= 0 { - o.L0SlowdownWritesThreshold = 8 - } if o.L0StopWritesThreshold <= 0 { o.L0StopWritesThreshold = 12 } @@ -380,6 +377,9 @@ func (o *Options) EnsureDefaults() *Options { if o.Merger == nil { o.Merger = DefaultMerger } + if o.MinFlushRate == 0 { + o.MinFlushRate = 4 << 20 // 4 MB/s + } if o.FS == nil { o.FS = vfs.Default } @@ -411,13 +411,13 @@ func (o *Options) String() string { fmt.Fprintf(&buf, " comparer=%s\n", o.Comparer.Name) fmt.Fprintf(&buf, " disable_wal=%t\n", o.DisableWAL) fmt.Fprintf(&buf, " l0_compaction_threshold=%d\n", o.L0CompactionThreshold) - fmt.Fprintf(&buf, " l0_slowdown_writes_threshold=%d\n", o.L0SlowdownWritesThreshold) fmt.Fprintf(&buf, " l0_stop_writes_threshold=%d\n", o.L0StopWritesThreshold) fmt.Fprintf(&buf, " lbase_max_bytes=%d\n", o.LBaseMaxBytes) fmt.Fprintf(&buf, " max_manifest_file_size=%d\n", o.MaxManifestFileSize) fmt.Fprintf(&buf, " max_open_files=%d\n", o.MaxOpenFiles) fmt.Fprintf(&buf, " mem_table_size=%d\n", o.MemTableSize) fmt.Fprintf(&buf, " mem_table_stop_writes_threshold=%d\n", o.MemTableStopWritesThreshold) + fmt.Fprintf(&buf, " min_flush_rate=%d\n", o.MinFlushRate) fmt.Fprintf(&buf, " merger=%s\n", o.Merger.Name) fmt.Fprintf(&buf, " table_property_collectors=[") for i := range o.TablePropertyCollectors { diff --git a/internal/base/options_test.go b/internal/base/options_test.go index 0f8b19b16d..bab7b64931 100644 --- a/internal/base/options_test.go +++ b/internal/base/options_test.go @@ -45,13 +45,13 @@ func TestOptionsString(t *testing.T) { comparer=leveldb.BytewiseComparator disable_wal=false l0_compaction_threshold=4 - l0_slowdown_writes_threshold=8 l0_stop_writes_threshold=12 lbase_max_bytes=67108864 max_manifest_file_size=134217728 max_open_files=1000 mem_table_size=4194304 mem_table_stop_writes_threshold=2 + min_flush_rate=4194304 merger=pebble.concatenate table_property_collectors=[] wal_dir= diff --git a/level_iter.go b/level_iter.go index cd0ae4ec7d..0b66e64f70 100644 --- a/level_iter.go +++ b/level_iter.go @@ -9,9 +9,10 @@ import ( ) // tableNewIters creates a new point and range-del iterator for the given file -// number. +// number. If bytesIterated is specified, it is incremented as the given file is +// iterated through. type tableNewIters func( - meta *fileMetadata, opts *IterOptions, + meta *fileMetadata, opts *IterOptions, bytesIterated *uint64, ) (internalIterator, internalIterator, error) // levelIter provides a merged view of the sstables in a level. @@ -53,21 +54,31 @@ type levelIter struct { // - `boundary` can hold either the lower- or upper-bound, depending on the iterator direction. // - `boundary` is not exposed to the next higher-level iterator, i.e., `mergingIter`. largestUserKey *[]byte + // bytesIterated keeps track of the number of bytes iterated during compaction. + bytesIterated *uint64 } // levelIter implements the internalIterator interface. var _ internalIterator = (*levelIter)(nil) func newLevelIter( - opts *IterOptions, cmp Compare, newIters tableNewIters, files []fileMetadata, + opts *IterOptions, + cmp Compare, + newIters tableNewIters, + files []fileMetadata, + bytesIterated *uint64, ) *levelIter { l := &levelIter{} - l.init(opts, cmp, newIters, files) + l.init(opts, cmp, newIters, files, bytesIterated) return l } func (l *levelIter) init( - opts *IterOptions, cmp Compare, newIters tableNewIters, files []fileMetadata, + opts *IterOptions, + cmp Compare, + newIters tableNewIters, + files []fileMetadata, + bytesIterated *uint64, ) { l.opts = opts if l.opts != nil { @@ -77,6 +88,7 @@ func (l *levelIter) init( l.index = -1 l.newIters = newIters l.files = files + l.bytesIterated = bytesIterated } func (l *levelIter) initRangeDel(rangeDelIter *internalIterator) { @@ -171,7 +183,7 @@ func (l *levelIter) loadFile(index, dir int) bool { } var rangeDelIter internalIterator - l.iter, rangeDelIter, l.err = l.newIters(f, &l.tableOpts) + l.iter, rangeDelIter, l.err = l.newIters(f, &l.tableOpts, l.bytesIterated) if l.err != nil || l.iter == nil { return false } diff --git a/level_iter_test.go b/level_iter_test.go index a3eac0f280..f9a948daf3 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -25,7 +25,7 @@ func TestLevelIter(t *testing.T) { var files []fileMetadata newIters := func( - meta *fileMetadata, opts *IterOptions, + meta *fileMetadata, opts *IterOptions, bytesIterated *uint64, ) (internalIterator, internalIterator, error) { f := *iters[meta.fileNum] return &f, nil, nil @@ -72,7 +72,7 @@ func TestLevelIter(t *testing.T) { } } - iter := newLevelIter(&opts, DefaultComparer.Compare, newIters, files) + iter := newLevelIter(&opts, DefaultComparer.Compare, newIters, files, nil) defer iter.Close() return runInternalIterCmd(d, iter) @@ -103,13 +103,13 @@ func TestLevelIter(t *testing.T) { var tableOpts *IterOptions newIters2 := func( - meta *fileMetadata, opts *IterOptions, + meta *fileMetadata, opts *IterOptions, bytesIterated *uint64, ) (internalIterator, internalIterator, error) { tableOpts = opts - return newIters(meta, opts) + return newIters(meta, opts, nil) } - iter := newLevelIter(&opts, DefaultComparer.Compare, newIters2, files) + iter := newLevelIter(&opts, DefaultComparer.Compare, newIters2, files, nil) iter.SeekGE([]byte(key)) lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound() return fmt.Sprintf("[%s,%s]\n", lower, upper) @@ -127,7 +127,7 @@ func TestLevelIterBoundaries(t *testing.T) { var files []fileMetadata newIters := func( - meta *fileMetadata, _ *IterOptions, + meta *fileMetadata, _ *IterOptions, _ *uint64, ) (internalIterator, internalIterator, error) { return readers[meta.fileNum].NewIter(nil /* lower */, nil /* upper */), nil, nil } @@ -201,7 +201,7 @@ func TestLevelIterBoundaries(t *testing.T) { return buf.String() case "iter": - iter := newLevelIter(nil, DefaultComparer.Compare, newIters, files) + iter := newLevelIter(nil, DefaultComparer.Compare, newIters, files, nil) defer iter.Close() // Fake up the range deletion initialization. iter.initRangeDel(new(internalIterator)) @@ -286,11 +286,11 @@ func BenchmarkLevelIterSeekGE(b *testing.B) { func(b *testing.B) { readers, files, keys := buildLevelIterTables(b, blockSize, restartInterval, count) newIters := func( - meta *fileMetadata, _ *IterOptions, + meta *fileMetadata, _ *IterOptions, _ *uint64, ) (internalIterator, internalIterator, error) { return readers[meta.fileNum].NewIter(nil /* lower */, nil /* upper */), nil, nil } - l := newLevelIter(nil, DefaultComparer.Compare, newIters, files) + l := newLevelIter(nil, DefaultComparer.Compare, newIters, files, nil) rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) b.ResetTimer() @@ -314,11 +314,11 @@ func BenchmarkLevelIterNext(b *testing.B) { func(b *testing.B) { readers, files, _ := buildLevelIterTables(b, blockSize, restartInterval, count) newIters := func( - meta *fileMetadata, _ *IterOptions, + meta *fileMetadata, _ *IterOptions, _ *uint64, ) (internalIterator, internalIterator, error) { return readers[meta.fileNum].NewIter(nil /* lower */, nil /* upper */), nil, nil } - l := newLevelIter(nil, DefaultComparer.Compare, newIters, files) + l := newLevelIter(nil, DefaultComparer.Compare, newIters, files, nil) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -344,11 +344,11 @@ func BenchmarkLevelIterPrev(b *testing.B) { func(b *testing.B) { readers, files, _ := buildLevelIterTables(b, blockSize, restartInterval, count) newIters := func( - meta *fileMetadata, _ *IterOptions, + meta *fileMetadata, _ *IterOptions, _ *uint64, ) (internalIterator, internalIterator, error) { return readers[meta.fileNum].NewIter(nil /* lower */, nil /* upper */), nil, nil } - l := newLevelIter(nil, DefaultComparer.Compare, newIters, files) + l := newLevelIter(nil, DefaultComparer.Compare, newIters, files, nil) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/mem_table.go b/mem_table.go index 0ba7458b8f..b4ce7d2580 100644 --- a/mem_table.go +++ b/mem_table.go @@ -179,6 +179,10 @@ func (m *memTable) newIter(o *IterOptions) internalIterator { return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound()) } +func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator { + return m.skl.NewFlushIter(bytesFlushed) +} + func (m *memTable) newRangeDelIter(*IterOptions) internalIterator { tombstones := m.tombstones.get(m) if tombstones == nil { @@ -187,6 +191,10 @@ func (m *memTable) newRangeDelIter(*IterOptions) internalIterator { return rangedel.NewIter(m.cmp, tombstones) } +func (m *memTable) totalBytes() uint64 { + return uint64(m.skl.Size() - m.emptySize) +} + func (m *memTable) close() error { return nil } diff --git a/mem_table_test.go b/mem_table_test.go index 3ad52a7c2b..0e6a6939f9 100644 --- a/mem_table_test.go +++ b/mem_table_test.go @@ -46,6 +46,16 @@ func (m *memTable) count() (n int) { return n } +// bytesIterated returns the number of bytes iterated in a DB. +func (m *memTable) bytesIterated() (bytesIterated uint64) { + x := internalIterAdapter{m.newFlushIter(nil, &bytesIterated)} + for valid := x.First(); valid; valid = x.Next() {} + if x.Close() != nil { + return 0 + } + return bytesIterated +} + func ikey(s string) InternalKey { return base.MakeInternalKey([]byte(s), 0, InternalKeyKindSet) } @@ -115,6 +125,21 @@ func TestMemTableCount(t *testing.T) { } } +func TestMemTableBytesIterated(t *testing.T) { + m := newMemTable(nil) + for i := 0; i < 200; i++ { + bytesIterated := m.bytesIterated() + expected := m.totalBytes() + if bytesIterated != expected { + t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) + } + m.set(InternalKey{UserKey: []byte{byte(i)}}, nil) + } + if err := m.close(); err != nil { + t.Fatal(err) + } +} + func TestMemTableEmpty(t *testing.T) { m := newMemTable(nil) if !m.empty() { diff --git a/open.go b/open.go index 4de07c4a9c..6b11e72e40 100644 --- a/open.go +++ b/open.go @@ -14,6 +14,7 @@ import ( "sort" "github.com/petermattis/pebble/internal/arenaskl" + "github.com/petermattis/pebble/internal/rate" "github.com/petermattis/pebble/internal/record" "github.com/petermattis/pebble/vfs" ) @@ -81,6 +82,7 @@ func Open(dirname string, opts *Options) (*DB, error) { apply: d.commitApply, write: d.commitWrite, }) + d.flushLimiter = rate.NewLimiter(rate.Limit(d.opts.MinFlushRate), d.opts.MinFlushRate) d.mu.nextJobID = 1 d.mu.mem.cond.L = &d.mu.Mutex d.mu.mem.mutable = newMemTable(d.opts) diff --git a/sstable/reader.go b/sstable/reader.go index 6696151703..052896f4af 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -61,6 +61,7 @@ type Iterator struct { reader *Reader index blockIter data blockIter + dataBH blockHandle err error closeHook func() error } @@ -133,12 +134,13 @@ func (i *Iterator) loadBlock() bool { } // Load the next block. v := i.index.Value() - h, n := decodeBlockHandle(v) + var n int + i.dataBH, n = decodeBlockHandle(v) if n == 0 || n != len(v) { i.err = errors.New("pebble/table: corrupt index entry") return false } - block, _, err := i.reader.readBlock(h, nil /* transform */) + block, _, err := i.reader.readBlock(i.dataBH, nil /* transform */) if err != nil { i.err = err return false @@ -341,6 +343,8 @@ func (i *Iterator) Last() (*InternalKey, []byte) { // Next implements internalIterator.Next, as documented in the pebble // package. +// Note: compactionIterator.Next mirrors the implementation of Iterator.Next +// due to performance. Keep the two in sync. func (i *Iterator) Next() (*InternalKey, []byte) { if i.err != nil { return nil, nil @@ -467,6 +471,100 @@ func (i *Iterator) SetBounds(lower, upper []byte) { i.upper = upper } +// compactionIterator is similar to Iterator but it increments the number of +// bytes that have been iterated through. +type compactionIterator struct { + *Iterator + bytesIterated *uint64 + prevOffset uint64 +} + +func (i *compactionIterator) SeekGE(key []byte) (*InternalKey, []byte) { + panic("pebble: SeekGE unimplemented") +} + +func (i *compactionIterator) SeekPrefixGE(prefix, key []byte) (*InternalKey, []byte) { + panic("pebble: SeekPrefixGE unimplemented") +} + +func (i *compactionIterator) SeekLT(key []byte) (*InternalKey, []byte) { + panic("pebble: SeekLT unimplemented") +} + +func (i *compactionIterator) First() (*InternalKey, []byte) { + key, val := i.Iterator.First() + if key == nil { + // An empty sstable will still encode the block trailer and restart points, so bytes + // iterated must be incremented. + + // We must use i.dataBH.length instead of (4*(i.data.numRestarts+1)) to calculate the + // number of bytes for the restart points, since i.dataBH.length accounts for + // compression. When uncompressed, i.dataBH.length == (4*(i.data.numRestarts+1)) + *i.bytesIterated += blockTrailerLen + i.dataBH.length + return nil, nil + } + // If the sstable only has 1 entry, we are at the last entry in the block and we must + // increment bytes iterated by the size of the block trailer and restart points. + if i.data.nextOffset + (4*(i.data.numRestarts+1)) == len(i.data.data) { + i.prevOffset = blockTrailerLen + i.dataBH.length + } else { + // i.dataBH.length/len(i.data.data) is the compression ratio. If uncompressed, this is 1. + // i.data.nextOffset is the uncompressed size of the first record. + i.prevOffset = (uint64(i.data.nextOffset) * i.dataBH.length) / uint64(len(i.data.data)) + } + *i.bytesIterated += i.prevOffset + return key, val +} + +func (i *compactionIterator) Last() (*InternalKey, []byte) { + panic("pebble: Last unimplemented") +} + +// Note: compactionIterator.Next mirrors the implementation of Iterator.Next +// due to performance. Keep the two in sync. +func (i *compactionIterator) Next() (*InternalKey, []byte) { + if i.err != nil { + return nil, nil + } + key, val := i.data.Next() + if key == nil { + for { + if i.data.err != nil { + i.err = i.data.err + return nil, nil + } + if key, _ := i.index.Next(); key == nil { + return nil, nil + } + if i.loadBlock() { + key, val = i.data.First() + if key == nil { + return nil, nil + } + break + } + } + } + + // i.dataBH.length/len(i.data.data) is the compression ratio. If uncompressed, this is 1. + // i.data.nextOffset is the uncompressed position of the current record in the block. + // i.dataBH.offset is the offset of the block in the sstable before decompression. + recordOffset := (uint64(i.data.nextOffset) * i.dataBH.length) / uint64(len(i.data.data)) + curOffset := i.dataBH.offset + recordOffset + // Last entry in the block must increment bytes iterated by the size of the block trailer + // and restart points. + if i.data.nextOffset + (4*(i.data.numRestarts+1)) == len(i.data.data) { + curOffset += blockTrailerLen + uint64(4*(i.data.numRestarts+1)) + } + *i.bytesIterated += uint64(curOffset - i.prevOffset) + i.prevOffset = curOffset + return key, val +} + +func (i *compactionIterator) Prev() (*InternalKey, []byte) { + panic("pebble: Prev unimplemented") +} + type weakCachedBlock struct { bh blockHandle mu sync.RWMutex @@ -562,6 +660,17 @@ func (r *Reader) NewIter(lower, upper []byte) *Iterator { return i } +// NewCompactionIter returns an internal iterator similar to NewIter but it also increments +// the number of bytes iterated. +func (r *Reader) NewCompactionIter(bytesIterated *uint64) *compactionIterator { + i := iterPool.Get().(*Iterator) + _ = i.Init(r, nil /* lower */, nil /* upper */) + return &compactionIterator{ + Iterator: i, + bytesIterated: bytesIterated, + } +} + // NewRangeDelIter returns an internal iterator for the contents of the // range-del block for the table. Returns nil if the table does not contain any // range deletions. diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 966330e347..5b52880e06 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -257,6 +257,76 @@ func checkValidPrefix(prefix, key []byte) bool { return prefix == nil || bytes.HasPrefix(key, prefix) } +func TestBytesIteratedCompressed(t *testing.T) { + for _, blockSize := range []int{10, 100, 1000, 4096} { + for _, numEntries := range []uint64{0, 1, 1e5} { + r := buildTestTable(t, numEntries, blockSize, SnappyCompression) + var bytesIterated uint64 + citer := r.NewCompactionIter(&bytesIterated) + for citer.First(); citer.Valid(); citer.Next() {} + + expected := r.Properties.DataSize + // There is some inaccuracy due to compression estimation. + if bytesIterated < expected * 99/100 || bytesIterated > expected * 101/100 { + t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) + } + } + } +} + +func TestBytesIteratedUncompressed(t *testing.T) { + for _, blockSize := range []int{10, 100, 1000, 4096} { + for _, numEntries := range []uint64{0, 1, 1e5} { + r := buildTestTable(t, numEntries, blockSize, NoCompression) + var bytesIterated uint64 + citer := r.NewCompactionIter(&bytesIterated) + for citer.First(); citer.Valid(); citer.Next() {} + + expected := r.Properties.DataSize + if bytesIterated != expected { + t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) + } + } + } +} + +func buildTestTable(t *testing.T, numEntries uint64, blockSize int, compression Compression) *Reader { + mem := vfs.NewMem() + f0, err := mem.Create("test") + if err != nil { + t.Fatal(err) + } + defer f0.Close() + + w := NewWriter(f0, nil, TableOptions{ + BlockSize: blockSize, + Compression: compression, + FilterPolicy: nil, + }) + + var ikey InternalKey + for i := uint64(0); i < numEntries; i++ { + key := make([]byte, 8 + i%3) + value := make([]byte, 7 + i%5) + binary.BigEndian.PutUint64(key, i) + ikey.UserKey = key + w.Add(ikey, value) + } + + if err := w.Close(); err != nil { + t.Fatal(err) + } + + // Re-open that filename for reading. + f1, err := mem.Open("test") + if err != nil { + t.Fatal(err) + } + return NewReader(f1, 0, &Options{ + Cache: cache.New(128 << 20), + }) +} + func buildBenchmarkTable(b *testing.B, blockSize, restartInterval int) (*Reader, [][]byte) { mem := vfs.NewMem() f0, err := mem.Create("bench") diff --git a/table_cache.go b/table_cache.go index e72bc3ee70..ea82a8233c 100644 --- a/table_cache.go +++ b/table_cache.go @@ -37,9 +37,9 @@ func (c *tableCache) getShard(fileNum uint64) *tableCacheShard { } func (c *tableCache) newIters( - meta *fileMetadata, opts *IterOptions, + meta *fileMetadata, opts *IterOptions, bytesIterated *uint64, ) (internalIterator, internalIterator, error) { - return c.getShard(meta.fileNum).newIters(meta, opts) + return c.getShard(meta.fileNum).newIters(meta, opts, bytesIterated) } func (c *tableCache) evict(fileNum uint64) { @@ -98,7 +98,7 @@ func (c *tableCacheShard) init(dirname string, fs vfs.FS, opts *Options, size, h } func (c *tableCacheShard) newIters( - meta *fileMetadata, opts *IterOptions, + meta *fileMetadata, opts *IterOptions, bytesIterated *uint64, ) (internalIterator, internalIterator, error) { // Calling findNode gives us the responsibility of decrementing n's // refCount. If opening the underlying table resulted in error, then we @@ -118,25 +118,50 @@ func (c *tableCacheShard) newIters( // using a singleton is fine. return emptyIter, nil, nil } + var iter internalIterator + if bytesIterated != nil { + tableCompactionIter := n.reader.NewCompactionIter(bytesIterated) + atomic.AddInt32(&c.iterCount, 1) + if raceEnabled { + c.mu.Lock() + c.mu.iters[tableCompactionIter.Iterator] = debug.Stack() + c.mu.Unlock() + } - iter := n.reader.NewIter(opts.GetLowerBound(), opts.GetUpperBound()) - atomic.AddInt32(&c.iterCount, 1) - if raceEnabled { - c.mu.Lock() - c.mu.iters[iter] = debug.Stack() - c.mu.Unlock() - } + tableCompactionIter.SetCloseHook(func() error { + if raceEnabled { + c.mu.Lock() + delete(c.mu.iters, tableCompactionIter.Iterator) + c.mu.Unlock() + } + c.unrefNode(n) + atomic.AddInt32(&c.iterCount, -1) + return nil + }) - iter.SetCloseHook(func() error { + iter = tableCompactionIter + } else { + tableIter := n.reader.NewIter(opts.GetLowerBound(), opts.GetUpperBound()) + atomic.AddInt32(&c.iterCount, 1) if raceEnabled { c.mu.Lock() - delete(c.mu.iters, iter) + c.mu.iters[tableIter] = debug.Stack() c.mu.Unlock() } - c.unrefNode(n) - atomic.AddInt32(&c.iterCount, -1) - return nil - }) + + tableIter.SetCloseHook(func() error { + if raceEnabled { + c.mu.Lock() + delete(c.mu.iters, tableIter) + c.mu.Unlock() + } + c.unrefNode(n) + atomic.AddInt32(&c.iterCount, -1) + return nil + }) + + iter = tableIter + } // NB: range-del iterator does not maintain a reference to the table, nor // does it need to read from it after creation. diff --git a/table_cache_test.go b/table_cache_test.go index 9c31990e51..aea36d9da4 100644 --- a/table_cache_test.go +++ b/table_cache_test.go @@ -170,7 +170,10 @@ func testTableCacheRandomAccess(t *testing.T, concurrent bool) { rngMu.Lock() fileNum, sleepTime := rng.Intn(tableCacheTestNumTables), rng.Intn(1000) rngMu.Unlock() - iter, _, err := c.newIters(&fileMetadata{fileNum: uint64(fileNum)}, nil /* iter options */) + iter, _, err := c.newIters( + &fileMetadata{fileNum: uint64(fileNum)}, + nil /* iter options */, + nil /* bytes iterated */) if err != nil { errc <- fmt.Errorf("i=%d, fileNum=%d: find: %v", i, fileNum, err) return @@ -229,7 +232,10 @@ func TestTableCacheFrequentlyUsed(t *testing.T) { for i := 0; i < N; i++ { for _, j := range [...]int{pinned0, i % tableCacheTestNumTables, pinned1} { - iter, _, err := c.newIters(&fileMetadata{fileNum: uint64(j)}, nil /* iter options */) + iter, _, err := c.newIters( + &fileMetadata{fileNum: uint64(j)}, + nil /* iter options */, + nil /* bytes iterated */) if err != nil { t.Fatalf("i=%d, j=%d: find: %v", i, j, err) } @@ -264,7 +270,10 @@ func TestTableCacheEvictions(t *testing.T) { rng := rand.New(rand.NewSource(2)) for i := 0; i < N; i++ { j := rng.Intn(tableCacheTestNumTables) - iter, _, err := c.newIters(&fileMetadata{fileNum: uint64(j)}, nil /* iter options */) + iter, _, err := c.newIters( + &fileMetadata{fileNum: uint64(j)}, + nil /* iter options */, + nil /* bytes iterated */) if err != nil { t.Fatalf("i=%d, j=%d: find: %v", i, j, err) } @@ -304,7 +313,10 @@ func TestTableCacheIterLeak(t *testing.T) { if err != nil { t.Fatal(err) } - if _, _, err := c.newIters(&fileMetadata{fileNum: 0}, nil /* iter options */); err != nil { + if _, _, err := c.newIters( + &fileMetadata{fileNum: 0}, + nil /* iter options */, + nil /* bytes iterated */); err != nil { t.Fatal(err) } if err := c.Close(); err == nil {