Skip to content

Commit

Permalink
*: propagate pointer to InternalIteratorStats
Browse files Browse the repository at this point in the history
Replace the base.InternalIteratorWithStats interface with logic to propagate a
pointer to a shared *InternalIteratorStats down the iterator tree during
iterator construction. This reduces the cost of collecting stats, which no
longer needs to descend the iterator tree summing stats as it goes.

Informs cockroachdb/cockroach#86083.
Informs cockroachdb/cockroach#82559.
  • Loading branch information
jbowens committed Aug 23, 2022
1 parent 9c2bc01 commit 14d80cd
Show file tree
Hide file tree
Showing 24 changed files with 162 additions and 271 deletions.
11 changes: 6 additions & 5 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ type compaction struct {
formatKey base.FormatKey
logger Logger
version *version
stats base.InternalIteratorStats

score float64

Expand Down Expand Up @@ -1028,12 +1029,12 @@ func (c *compaction) newInputIter(
iter := f.newFlushIter(nil, &c.bytesIterated)
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
c.rangeDelIter.Init(c.cmp, rangeDelIter)
iter = newMergingIter(c.logger, c.cmp, nil, iter, &c.rangeDelIter)
iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iter, &c.rangeDelIter)
}
if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
mi := &keyspan.MergingIter{}
mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIter)
c.rangeKeyInterleaving.Init(c.comparer, base.WrapIterWithStats(iter), mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */)
c.rangeKeyInterleaving.Init(c.comparer, iter, mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */)
iter = &c.rangeKeyInterleaving
}
return iter, nil
Expand All @@ -1056,11 +1057,11 @@ func (c *compaction) newInputIter(
c.rangeDelIter.Init(c.cmp, rangeDelIters...)
iters = append(iters, &c.rangeDelIter)
}
var iter base.InternalIteratorWithStats = newMergingIter(c.logger, c.cmp, nil, iters...)
var iter internalIterator = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
if len(rangeKeyIters) > 0 {
mi := &keyspan.MergingIter{}
mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIters...)
c.rangeKeyInterleaving.Init(c.comparer, base.WrapIterWithStats(iter), mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */)
c.rangeKeyInterleaving.Init(c.comparer, iter, mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */)
iter = &c.rangeKeyInterleaving
}
return iter, nil
Expand Down Expand Up @@ -1264,7 +1265,7 @@ func (c *compaction) newInputIter(
c.rangeDelIter.Init(c.cmp, rangeDelIters...)
iters = append(iters, &c.rangeDelIter)
}
pointKeyIter := newMergingIter(c.logger, c.cmp, nil, iters...)
pointKeyIter := newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
if len(rangeKeyIters) > 0 {
mi := &keyspan.MergingIter{}
mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIters...)
Expand Down
2 changes: 1 addition & 1 deletion compaction_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestCompactionIter(t *testing.T) {
interleavingIter = &keyspan.InterleavingIter{}
interleavingIter.Init(
base.DefaultComparer,
base.WrapIterWithStats(fi),
fi,
keyspan.NewIter(base.DefaultComparer.Compare, rangeKeys),
nil, nil, nil)
iter := newInvalidatingIter(interleavingIter)
Expand Down
35 changes: 20 additions & 15 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@ import (
"github.com/stretchr/testify/require"
)

type iterCmdOpt int
type iterCmdOpts struct {
verboseKey bool
stats *base.InternalIteratorStats
}

const (
iterCmdVerboseKey iterCmdOpt = iota
)
type iterCmdOpt func(*iterCmdOpts)

func iterCmdVerboseKey(opts *iterCmdOpts) { opts.verboseKey = true }

func iterCmdStats(stats *base.InternalIteratorStats) iterCmdOpt {
return func(opts *iterCmdOpts) {
opts.stats = stats
}
}

func runGetCmd(td *datadriven.TestData, d *DB) string {
snap := Snapshot{
Expand Down Expand Up @@ -367,11 +376,9 @@ func writeRangeKeys(b io.Writer, iter *Iterator) {
}

func runInternalIterCmd(d *datadriven.TestData, iter internalIterator, opts ...iterCmdOpt) string {
var verboseKey bool
var o iterCmdOpts
for _, opt := range opts {
if opt == iterCmdVerboseKey {
verboseKey = true
}
opt(&o)
}

var b bytes.Buffer
Expand Down Expand Up @@ -448,22 +455,20 @@ func runInternalIterCmd(d *datadriven.TestData, iter internalIterator, opts ...i
iter.SetBounds(lower, upper)
continue
case "stats":
ii, ok := iter.(internalIteratorWithStats)
if ok {
fmt.Fprintf(&b, "%+v\n", ii.Stats())
if o.stats != nil {
fmt.Fprintf(&b, "%+v\n", *o.stats)
}
continue
case "reset-stats":
ii, ok := iter.(internalIteratorWithStats)
if ok {
ii.ResetStats()
if o.stats != nil {
*o.stats = base.InternalIteratorStats{}
}
continue
default:
return fmt.Sprintf("unknown op: %s", parts[0])
}
if key != nil {
if verboseKey {
if o.verboseKey {
fmt.Fprintf(&b, "%s:%s\n", key, value)
} else {
fmt.Fprintf(&b, "%s:%s\n", key.UserKey, value)
Expand Down
16 changes: 8 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer,
}

i := &buf.dbi
pointIter := base.WrapIterWithStats(get)
pointIter := get
*i = Iterator{
getIterAlloc: buf,
iter: pointIter,
Expand Down Expand Up @@ -1037,6 +1037,10 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
// Already have one.
return
}
internalOpts := internalIterOpts{stats: &i.stats.InternalStats}
if i.opts.RangeKeyMasking.Filter != nil {
internalOpts.boundLimitedFilter = &i.rangeKeyMasking
}

// Merging levels and levels from iterAlloc.
mlevels := buf.mlevels[:0]
Expand Down Expand Up @@ -1092,7 +1096,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
rangeDelIter = &i.batchRangeDelIter
}
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(&i.batchPointIter),
iter: &i.batchPointIter,
rangeDelIter: rangeDelIter,
})
}
Expand All @@ -1102,7 +1106,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
for j := len(memtables) - 1; j >= 0; j-- {
mem := memtables[j]
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(mem.newIter(&i.opts)),
iter: mem.newIter(&i.opts),
rangeDelIter: mem.newRangeDelIter(&i.opts),
})
}
Expand All @@ -1112,10 +1116,6 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
levelsIndex := len(levels)
mlevels = mlevels[:numMergingLevels]
levels = levels[:numLevelIters]
var internalOpts internalIterOpts
if i.opts.RangeKeyMasking.Filter != nil {
internalOpts.boundLimitedFilter = &i.rangeKeyMasking
}
addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
li := &levels[levelsIndex]

Expand All @@ -1142,7 +1142,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
}
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}
buf.merging.init(&i.opts, i.comparer.Compare, i.comparer.Split, mlevels...)
buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
buf.merging.snapshot = i.seqNum
buf.merging.elideRangeTombstones = true
buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
Expand Down
4 changes: 1 addition & 3 deletions error_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type errorIter struct {
}

// errorIter implements the base.InternalIterator interface.
var _ internalIteratorWithStats = (*errorIter)(nil)
var _ internalIterator = (*errorIter)(nil)

func newErrorIter(err error) *errorIter {
return &errorIter{err: err}
Expand Down Expand Up @@ -63,8 +63,6 @@ func (c *errorIter) String() string {
}

func (c *errorIter) SetBounds(lower, upper []byte) {}
func (c *errorIter) Stats() InternalIteratorStats { return InternalIteratorStats{} }
func (c *errorIter) ResetStats() {}

type errorKeyspanIter struct {
err error
Expand Down
6 changes: 3 additions & 3 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func finishInitializingExternal(it *Iterator) {
continue
}
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(pointIter),
iter: pointIter,
rangeDelIter: rangeDelIter,
})
}
Expand All @@ -218,12 +218,12 @@ func finishInitializingExternal(it *Iterator) {
}
sli.init(it.opts)
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(sli),
iter: sli,
rangeDelIter: nil,
})
}
}
it.alloc.merging.init(&it.opts, it.comparer.Compare, it.comparer.Split, mlevels...)
it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
it.alloc.merging.snapshot = base.InternalKeySeqNumMax
it.alloc.merging.elideRangeTombstones = true
it.pointIter = &it.alloc.merging
Expand Down
2 changes: 1 addition & 1 deletion get_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func TestGetIter(t *testing.T) {
i := &buf.dbi
i.comparer = *testkeys.Comparer
i.merge = DefaultMerger.Merge
i.iter = base.WrapIterWithStats(get)
i.iter = get

defer i.Close()
if !i.First() {
Expand Down
2 changes: 0 additions & 2 deletions internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type InternalKey = base.InternalKey

type internalIterator = base.InternalIterator

type internalIteratorWithStats = base.InternalIteratorWithStats

// ErrCorruption is a marker to indicate that data in a file (WAL, MANIFEST,
// sstable) isn't in the expected format.
var ErrCorruption = base.ErrCorruption
34 changes: 0 additions & 34 deletions internal/base/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,6 @@ func (s SeekLTFlags) DisableRelativeSeek() SeekLTFlags {
return s &^ (1 << seekLTFlagRelativeSeek)
}

// InternalIteratorWithStats extends InternalIterator to expose stats.
type InternalIteratorWithStats interface {
InternalIterator
Stats() InternalIteratorStats
ResetStats()
}

// InternalIteratorStats contains miscellaneous stats produced by
// InternalIterators that are part of the InternalIterator tree. Not every
// field is relevant for an InternalIterator implementation. The field values
Expand Down Expand Up @@ -341,30 +334,3 @@ func (s *InternalIteratorStats) Merge(from InternalIteratorStats) {
s.PointCount += from.PointCount
s.PointsCoveredByRangeTombstones += from.PointsCoveredByRangeTombstones
}

type internalIteratorWithEmptyStats struct {
InternalIterator
}

var _ InternalIteratorWithStats = internalIteratorWithEmptyStats{}

// Stats implements InternalIteratorWithStats.
func (i internalIteratorWithEmptyStats) Stats() InternalIteratorStats {
return InternalIteratorStats{}
}

// ResetStats implements InternalIteratorWithStats.
func (i internalIteratorWithEmptyStats) ResetStats() {}

// WrapIterWithStats ensures that either iter implements the stats methods or
// wraps it, such that the return value implements InternalIteratorWithStats.
func WrapIterWithStats(iter InternalIterator) InternalIteratorWithStats {
if iter == nil {
return nil
}
i, ok := iter.(InternalIteratorWithStats)
if ok {
return i
}
return internalIteratorWithEmptyStats{InternalIterator: iter}
}
27 changes: 0 additions & 27 deletions internal/base/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,9 @@ package base

import (
"fmt"
"math/rand"
"reflect"
"testing"

"github.com/stretchr/testify/require"
)

func setRandUint64(v reflect.Value) uint64 {
val := rand.Uint64()
v.SetUint(val)
return val
}

func TestInternalIteratorStatsMerge(t *testing.T) {
var from, to, expected InternalIteratorStats
n := reflect.ValueOf(from).NumField()
for i := 0; i < n; i++ {
switch reflect.ValueOf(from).Type().Field(i).Type.Kind() {
case reflect.Uint64:
v1 := setRandUint64(reflect.ValueOf(&from).Elem().Field(i))
v2 := setRandUint64(reflect.ValueOf(&to).Elem().Field(i))
reflect.ValueOf(&expected).Elem().Field(i).SetUint(v1 + v2)
default:
t.Fatalf("unknown kind %v", reflect.ValueOf(from).Type().Field(i).Type.Kind())
}
}
to.Merge(from)
require.Equal(t, expected, to)
}

func TestFlags(t *testing.T) {
t.Run("SeekGEFlags", func(t *testing.T) {
f := SeekGEFlagsNone
Expand Down
16 changes: 2 additions & 14 deletions internal/keyspan/interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type SpanMask interface {
type InterleavingIter struct {
cmp base.Compare
comparer *base.Comparer
pointIter base.InternalIteratorWithStats
pointIter base.InternalIterator
keyspanIter FragmentIterator
mask SpanMask

Expand Down Expand Up @@ -178,7 +178,7 @@ var _ base.InternalIterator = &InterleavingIter{}
// propagate the bounds down the iterator stack.
func (i *InterleavingIter) Init(
comparer *base.Comparer,
pointIter base.InternalIteratorWithStats,
pointIter base.InternalIterator,
keyspanIter FragmentIterator,
mask SpanMask,
lowerBound, upperBound []byte,
Expand Down Expand Up @@ -1083,18 +1083,6 @@ func (i *InterleavingIter) String() string {
return fmt.Sprintf("keyspan-interleaving(%q)", i.pointIter.String())
}

var _ base.InternalIteratorWithStats = &InterleavingIter{}

// Stats implements InternalIteratorWithStats.
func (i *InterleavingIter) Stats() base.InternalIteratorStats {
return i.pointIter.Stats()
}

// ResetStats implements InternalIteratorWithStats.
func (i *InterleavingIter) ResetStats() {
i.pointIter.ResetStats()
}

func firstError(err0, err1 error) error {
if err0 != nil {
return err0
Expand Down
4 changes: 2 additions & 2 deletions internal/keyspan/interleaving_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func runInterleavingIterTest(t *testing.T, filename string) {
}
keyspanIter.Init(cmp, noopTransform, NewIter(cmp, spans))
hooks.maskSuffix = nil
iter.Init(testkeys.Comparer, base.WrapIterWithStats(&pointIter), &keyspanIter, &hooks, nil, nil)
iter.Init(testkeys.Comparer, &pointIter, &keyspanIter, &hooks, nil, nil)
return "OK"
case "define-pointkeys":
var points []base.InternalKey
Expand All @@ -117,7 +117,7 @@ func runInterleavingIterTest(t *testing.T, filename string) {
}
pointIter = pointIterator{cmp: cmp, keys: points}
hooks.maskSuffix = nil
iter.Init(testkeys.Comparer, base.WrapIterWithStats(&pointIter), &keyspanIter, &hooks, nil, nil)
iter.Init(testkeys.Comparer, &pointIter, &keyspanIter, &hooks, nil, nil)
return "OK"
case "iter":
buf.Reset()
Expand Down
Loading

0 comments on commit 14d80cd

Please sign in to comment.