*: remove bytesIterated
Remove the bytesIterated plumbing that flushes and compactions used to
incrementally update compaction.bytesIterated. It was a vestige of an attempt
at pacing flushes and compactions to reduce their impact on foreground
traffic. That pacing logic had been inactive for years and was removed from
the code base some time ago.

The only remaining use was the flush throughput metric, which does not need an
incrementally updated value, only an aggregate once the flush has completed.
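
To make that last point concrete, here is a minimal, self-contained sketch of
the aggregate approach. It assumes only that a flushable can report its in-use
byte count, mirroring the fb.inuseBytes() and m.inuseBytes() calls in the
deleted tests below; the stub types are hypothetical stand-ins, not Pebble's:

package main

import "fmt"

// flushable is a trimmed stand-in for the relevant slice of Pebble's
// flushable interface; inuseBytes mirrors the method the deleted tests call.
type flushable interface {
	inuseBytes() uint64
}

// memtableStub is a hypothetical flushable used only in this sketch.
type memtableStub struct{ bytes uint64 }

func (m memtableStub) inuseBytes() uint64 { return m.bytes }

// flushInputBytes computes the aggregate that the flush throughput metric
// needs: a single sum over the flushables once the flush completes, instead
// of a counter incremented on every key-value pair during iteration.
func flushInputBytes(flushing []flushable) uint64 {
	var total uint64
	for _, f := range flushing {
		total += f.inuseBytes()
	}
	return total
}

func main() {
	flushing := []flushable{memtableStub{bytes: 1 << 20}, memtableStub{bytes: 512 << 10}}
	fmt.Println(flushInputBytes(flushing)) // 1572864
}

This is the shape of the change visible in flush1 in the compaction.go diff
below: the function now returns an inputBytes aggregate rather than the
incrementally maintained c.bytesIterated.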
jbowens committed Apr 13, 2024
1 parent e53a25d commit c34894c
Showing 18 changed files with 33 additions and 281 deletions.
batch.go: 3 additions & 39 deletions

@@ -2056,7 +2056,7 @@ func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
 }

 // newFlushIter is part of the flushable interface.
-func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
+func (b *flushableBatch) newFlushIter(o *IterOptions) internalIterator {
     return &flushFlushableBatchIter{
         flushableBatchIter: flushableBatchIter{
             batch: b,
@@ -2065,7 +2065,6 @@ func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
             cmp:   b.cmp,
             index: -1,
         },
-        bytesIterated: bytesFlushed,
     }
 }

@@ -2327,7 +2326,6 @@ func (i *flushableBatchIter) SetContext(_ context.Context) {}
 // of number of bytes iterated.
 type flushFlushableBatchIter struct {
     flushableBatchIter
-    bytesIterated *uint64
 }

 // flushFlushableBatchIter implements the base.InternalIterator interface.
@@ -2353,13 +2351,7 @@ func (i *flushFlushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {

 func (i *flushFlushableBatchIter) First() *base.InternalKV {
     i.err = nil // clear cached iteration error
-    kv := i.flushableBatchIter.First()
-    if kv == nil {
-        return nil
-    }
-    entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
-    *i.bytesIterated += uint64(entryBytes) + i.valueSize()
-    return kv
+    return i.flushableBatchIter.First()
 }

 func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
@@ -2376,41 +2368,13 @@ func (i *flushFlushableBatchIter) Next() *base.InternalKV {
     if i.index == len(i.offsets) {
         return nil
     }
-    kv := i.getKV(i.index)
-    entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
-    *i.bytesIterated += uint64(entryBytes) + i.valueSize()
-    return kv
+    return i.getKV(i.index)
 }

 func (i flushFlushableBatchIter) Prev() *base.InternalKV {
     panic("pebble: Prev unimplemented")
 }

-func (i flushFlushableBatchIter) valueSize() uint64 {
-    p := i.data[i.offsets[i.index].offset:]
-    if len(p) == 0 {
-        i.err = base.CorruptionErrorf("corrupted batch")
-        return 0
-    }
-    kind := InternalKeyKind(p[0])
-    if kind > InternalKeyKindMax {
-        i.err = base.CorruptionErrorf("corrupted batch")
-        return 0
-    }
-    var length uint64
-    switch kind {
-    case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete:
-        keyEnd := i.offsets[i.index].keyEnd
-        v, n := binary.Uvarint(i.data[keyEnd:])
-        if n <= 0 {
-            i.err = base.CorruptionErrorf("corrupted batch")
-            return 0
-        }
-        length = v + uint64(n)
-    }
-    return length
-}
-
 // batchOptions holds the parameters to configure batch.
 type batchOptions struct {
     initialSizeBytes int

batch_test.go: 0 additions & 28 deletions

@@ -1279,34 +1279,6 @@ func scanKeyspanIterator(w io.Writer, ki keyspan.FragmentIterator) {
     }
 }

-func TestFlushableBatchBytesIterated(t *testing.T) {
-    batch := newBatch(nil)
-    for j := 0; j < 1000; j++ {
-        key := make([]byte, 8+j%3)
-        value := make([]byte, 7+j%5)
-        batch.Set(key, value, nil)
-
-        fb, err := newFlushableBatch(batch, DefaultComparer)
-        require.NoError(t, err)
-
-        var bytesIterated uint64
-        it := fb.newFlushIter(nil, &bytesIterated)
-
-        var prevIterated uint64
-        for kv := it.First(); kv != nil; kv = it.Next() {
-            if bytesIterated < prevIterated {
-                t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
-            }
-            prevIterated = bytesIterated
-        }
-
-        expected := fb.inuseBytes()
-        if bytesIterated != expected {
-            t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
-        }
-    }
-}
-
 func TestEmptyFlushableBatch(t *testing.T) {
     // Verify that we can create a flushable batch on an empty batch.
     fb, err := newFlushableBatch(newBatch(nil), DefaultComparer)

compaction.go: 10 additions & 18 deletions

@@ -295,8 +295,6 @@

     // flushing contains the flushables (aka memtables) that are being flushed.
     flushing flushableList
-    // bytesIterated contains the number of bytes that have been flushed/compacted.
-    bytesIterated uint64
     // bytesWritten contains the number of bytes that have been written to outputs.
     bytesWritten int64

@@ -942,7 +940,7 @@ func (c *compaction) newInputIter(
     // stored in c.flushing.
     for i := range c.flushing {
         f := c.flushing[i]
-        iters = append(iters, f.newFlushIter(nil, &c.bytesIterated))
+        iters = append(iters, f.newFlushIter(nil))
         rangeDelIter := f.newRangeDelIter(nil)
         if rangeDelIter != nil {
             rangeDelIters = append(rangeDelIters, rangeDelIter)
@@ -959,8 +957,8 @@
     // deletions to compactions are handled below.
     iters = append(iters, newLevelIter(context.Background(),
         iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
-            bytesIterated: &c.bytesIterated,
-            bufferPool: &c.bufferPool,
+            compaction: true,
+            bufferPool: &c.bufferPool,
         }))
     // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
     // deletions into memory upfront. (See #2015, which reverted this.) There
@@ -1000,7 +998,7 @@
     iter := level.files.Iter()
     for f := iter.First(); f != nil; f = iter.Next() {
         rangeDelIter, closer, err := c.newRangeDelIter(
-            newIters, iter.Take(), iterOpts, l, &c.bytesIterated)
+            newIters, iter.Take(), iterOpts, l)
         if err != nil {
             // The error will already be annotated with the BackingFileNum, so
             // we annotate it with the FileNum.
@@ -1113,17 +1111,13 @@
 }

 func (c *compaction) newRangeDelIter(
-    newIters tableNewIters,
-    f manifest.LevelFile,
-    opts IterOptions,
-    l manifest.Level,
-    bytesIterated *uint64,
+    newIters tableNewIters, f manifest.LevelFile, opts IterOptions, l manifest.Level,
 ) (keyspan.FragmentIterator, io.Closer, error) {
     opts.level = l
-    iterSet, err := newIters(context.Background(), f.FileMetadata,
-        &opts, internalIterOpts{
-            bytesIterated: &c.bytesIterated,
-            bufferPool: &c.bufferPool,
+    iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
+        internalIterOpts{
+            compaction: true,
+            bufferPool: &c.bufferPool,
         }, iterRangeDeletions)
     if err != nil {
         return nil, nil, err
@@ -1767,8 +1761,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
         d.mu.versions.logUnlock()
     }

-    bytesFlushed = c.bytesIterated
-
     // If err != nil, then the flush will be retried, and we will recalculate
     // these metrics.
     if err == nil {
@@ -1820,7 +1812,7 @@
         close(flushed[i].flushed)
     }

-    return bytesFlushed, err
+    return inputBytes, err
 }

 // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually

flushable.go: 2 additions & 2 deletions

@@ -22,7 +22,7 @@ import (
 // flushable defines the interface for immutable memtables.
 type flushable interface {
     newIter(o *IterOptions) internalIterator
-    newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator
+    newFlushIter(o *IterOptions) internalIterator
     newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
     newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
     containsRangeKeys() bool
@@ -226,7 +226,7 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
 }

 // newFlushIter is part of the flushable interface.
-func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
+func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
     // newFlushIter is only used for writing memtables to disk as sstables.
     // Since ingested sstables are already present on disk, they don't need to
     // make use of a flush iter.

internal/arenaskl/flush_iterator.go: 1 addition & 8 deletions

@@ -24,7 +24,6 @@ import "github.com/cockroachdb/pebble/internal/base"
 // simply value copying the struct.
 type flushIterator struct {
     Iterator
-    bytesIterated *uint64
 }

 // flushIterator implements the base.InternalIterator interface.
@@ -51,12 +50,7 @@ func (it *flushIterator) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
 // that First only checks the upper bound. It is up to the caller to ensure
 // that key is greater than or equal to the lower bound.
 func (it *flushIterator) First() *base.InternalKV {
-    kv := it.Iterator.First()
-    if kv == nil {
-        return nil
-    }
-    *it.bytesIterated += uint64(it.nd.allocSize)
-    return kv
+    return it.Iterator.First()
 }

 // Next advances to the next position. Returns the key and value if the
@@ -69,7 +63,6 @@ func (it *flushIterator) Next() *base.InternalKV {
         return nil
     }
     it.decodeKey()
-    *it.bytesIterated += uint64(it.nd.allocSize)
     it.kv.V = base.MakeInPlaceValue(it.value())
     return &it.kv
 }

internal/arenaskl/skl.go: 2 additions & 3 deletions

@@ -307,10 +307,9 @@ func (s *Skiplist) NewIter(lower, upper []byte) *Iterator {
 // NewFlushIter returns a new flushIterator, which is similar to an Iterator
 // but also sets the current number of the bytes that have been iterated
 // through.
-func (s *Skiplist) NewFlushIter(bytesFlushed *uint64) base.InternalIterator {
+func (s *Skiplist) NewFlushIter() base.InternalIterator {
     return &flushIterator{
-        Iterator: Iterator{list: s, nd: s.head},
-        bytesIterated: bytesFlushed,
+        Iterator: Iterator{list: s, nd: s.head},
     }
 }

internal/arenaskl/skl_test.go: 0 additions & 29 deletions

@@ -769,35 +769,6 @@ func TestIteratorBounds(t *testing.T) {
     require.False(t, it.Prev())
 }

-func TestBytesIterated(t *testing.T) {
-    l := NewSkiplist(newArena(arenaSize), bytes.Compare)
-    emptySize := l.arena.Size()
-    for i := 0; i < 200; i++ {
-        bytesIterated := l.bytesIterated(t)
-        expected := uint64(l.arena.Size() - emptySize)
-        if bytesIterated != expected {
-            t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
-        }
-        l.Add(base.InternalKey{UserKey: []byte{byte(i)}}, nil)
-    }
-}
-
-// bytesIterated returns the number of bytes iterated in the skiplist.
-func (s *Skiplist) bytesIterated(t *testing.T) (bytesIterated uint64) {
-    x := s.NewFlushIter(&bytesIterated)
-    var prevIterated uint64
-    for kv := x.First(); kv != nil; kv = x.Next() {
-        if bytesIterated < prevIterated {
-            t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
-        }
-        prevIterated = bytesIterated
-    }
-    if x.Close() != nil {
-        return 0
-    }
-    return bytesIterated
-}
-
 func randomKey(rng *rand.Rand, b []byte) base.InternalKey {
     key := rng.Uint32()
     key2 := rng.Uint32()

level_iter.go: 4 additions & 1 deletion

@@ -17,7 +17,10 @@ import (
 )

 type internalIterOpts struct {
-    bytesIterated *uint64
+    // if compaction is set, sstable-level iterators will be created using
+    // NewCompactionIter; these iterators have a more constrained interface
+    // and are optimized for the sequential scan of a compaction.
+    compaction bool
     bufferPool *sstable.BufferPool
     stats *base.InternalIteratorStats
     boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter

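For illustration, a minimal sketch of the dispatch this flag enables, using
hypothetical stand-in types rather than Pebble's sstable API (the comment in
the diff names NewCompactionIter as the real constructor):

package main

import "fmt"

// internalIterOpts mirrors only the field added in this commit; everything
// else in this sketch is a hypothetical stand-in.
type internalIterOpts struct {
	compaction bool
}

// newTableIter shows the shape of the dispatch: when opts.compaction is set,
// the caller gets the constrained, forward-only iterator optimized for a
// sequential scan; otherwise it gets the general-purpose iterator.
func newTableIter(opts internalIterOpts) string {
	if opts.compaction {
		return "compaction iterator (forward-only, sequential scan)"
	}
	return "general-purpose point iterator"
}

func main() {
	fmt.Println(newTableIter(internalIterOpts{compaction: true}))
	fmt.Println(newTableIter(internalIterOpts{}))
}
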
mem_table.go: 2 additions & 2 deletions

@@ -260,8 +260,8 @@ func (m *memTable) newIter(o *IterOptions) internalIterator {
 }

 // newFlushIter is part of the flushable interface.
-func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
-    return m.skl.NewFlushIter(bytesFlushed)
+func (m *memTable) newFlushIter(o *IterOptions) internalIterator {
+    return m.skl.NewFlushIter()
 }

 // newRangeDelIter is part of the flushable interface.

mem_table_test.go: 0 additions & 28 deletions

@@ -78,22 +78,6 @@ func (m *memTable) count() (n int) {
     return n
 }

-// bytesIterated returns the number of bytes iterated in a DB.
-func (m *memTable) bytesIterated(t *testing.T) (bytesIterated uint64) {
-    x := m.newFlushIter(nil, &bytesIterated)
-    var prevIterated uint64
-    for kv := x.First(); kv != nil; kv = x.Next() {
-        if bytesIterated < prevIterated {
-            t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
-        }
-        prevIterated = bytesIterated
-    }
-    if x.Close() != nil {
-        return 0
-    }
-    return bytesIterated
-}
-
 func ikey(s string) InternalKey {
     return base.MakeInternalKey([]byte(s), 0, InternalKeyKindSet)
 }
@@ -158,18 +142,6 @@ func TestMemTableCount(t *testing.T) {
     }
 }

-func TestMemTableBytesIterated(t *testing.T) {
-    m := newMemTable(memTableOptions{})
-    for i := 0; i < 200; i++ {
-        bytesIterated := m.bytesIterated(t)
-        expected := m.inuseBytes()
-        if bytesIterated != expected {
-            t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
-        }
-        m.set(InternalKey{UserKey: []byte{byte(i)}}, nil)
-    }
-}
-
 func TestMemTableEmpty(t *testing.T) {
     m := newMemTable(memTableOptions{})
     if !m.empty() {

[8 more changed files not shown]

