From 14d80cd0ad8e2e275ed09066154a73221158983c Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 19 Aug 2022 15:32:32 -0400 Subject: [PATCH] *: propagate pointer to InternalIteratorStats Replace the base.InternalIteratorWithStats interface with logic to propagate a pointer to a shared *InternalIteratorStats down the iterator tree during iterator construction. This reduces the cost of collecting stats, which no longer needs to descend the iterator tree summing stats as it goes. Informs cockroachdb/cockroach#86083. Informs cockroachdb/cockroach#82559. --- compaction.go | 11 ++--- compaction_iter_test.go | 2 +- data_test.go | 35 +++++++++------- db.go | 16 +++---- error_iter.go | 4 +- external_iterator.go | 6 +-- get_iter_test.go | 2 +- internal.go | 2 - internal/base/iterator.go | 34 --------------- internal/base/iterator_test.go | 27 ------------ internal/keyspan/interleaving_iter.go | 16 +------ internal/keyspan/interleaving_iter_test.go | 4 +- iterator.go | 9 ++-- iterator_test.go | 49 +++++++++------------- level_iter.go | 28 ++----------- level_iter_test.go | 16 +++---- merging_iter.go | 40 +++++++----------- merging_iter_test.go | 32 ++++++++------ range_keys.go | 10 +---- sstable/block_property_test.go | 6 ++- sstable/data_test.go | 9 +++- sstable/reader.go | 47 +++++++++++---------- sstable/reader_test.go | 26 ++++-------- table_cache.go | 2 +- 24 files changed, 162 insertions(+), 271 deletions(-) diff --git a/compaction.go b/compaction.go index df40cdc525..d470e89fcc 100644 --- a/compaction.go +++ b/compaction.go @@ -383,6 +383,7 @@ type compaction struct { formatKey base.FormatKey logger Logger version *version + stats base.InternalIteratorStats score float64 @@ -1028,12 +1029,12 @@ func (c *compaction) newInputIter( iter := f.newFlushIter(nil, &c.bytesIterated) if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil { c.rangeDelIter.Init(c.cmp, rangeDelIter) - iter = newMergingIter(c.logger, c.cmp, nil, iter, &c.rangeDelIter) + iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iter, &c.rangeDelIter) } if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil { mi := &keyspan.MergingIter{} mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIter) - c.rangeKeyInterleaving.Init(c.comparer, base.WrapIterWithStats(iter), mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */) + c.rangeKeyInterleaving.Init(c.comparer, iter, mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */) iter = &c.rangeKeyInterleaving } return iter, nil @@ -1056,11 +1057,11 @@ func (c *compaction) newInputIter( c.rangeDelIter.Init(c.cmp, rangeDelIters...) iters = append(iters, &c.rangeDelIter) } - var iter base.InternalIteratorWithStats = newMergingIter(c.logger, c.cmp, nil, iters...) + var iter internalIterator = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...) if len(rangeKeyIters) > 0 { mi := &keyspan.MergingIter{} mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIters...) - c.rangeKeyInterleaving.Init(c.comparer, base.WrapIterWithStats(iter), mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */) + c.rangeKeyInterleaving.Init(c.comparer, iter, mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */) iter = &c.rangeKeyInterleaving } return iter, nil @@ -1264,7 +1265,7 @@ func (c *compaction) newInputIter( c.rangeDelIter.Init(c.cmp, rangeDelIters...) iters = append(iters, &c.rangeDelIter) } - pointKeyIter := newMergingIter(c.logger, c.cmp, nil, iters...) + pointKeyIter := newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...) if len(rangeKeyIters) > 0 { mi := &keyspan.MergingIter{} mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIters...) diff --git a/compaction_iter_test.go b/compaction_iter_test.go index d1e5ea9aed..ede349bb30 100644 --- a/compaction_iter_test.go +++ b/compaction_iter_test.go @@ -101,7 +101,7 @@ func TestCompactionIter(t *testing.T) { interleavingIter = &keyspan.InterleavingIter{} interleavingIter.Init( base.DefaultComparer, - base.WrapIterWithStats(fi), + fi, keyspan.NewIter(base.DefaultComparer.Compare, rangeKeys), nil, nil, nil) iter := newInvalidatingIter(interleavingIter) diff --git a/data_test.go b/data_test.go index f4b93de724..4b6df7fc25 100644 --- a/data_test.go +++ b/data_test.go @@ -27,11 +27,20 @@ import ( "github.com/stretchr/testify/require" ) -type iterCmdOpt int +type iterCmdOpts struct { + verboseKey bool + stats *base.InternalIteratorStats +} -const ( - iterCmdVerboseKey iterCmdOpt = iota -) +type iterCmdOpt func(*iterCmdOpts) + +func iterCmdVerboseKey(opts *iterCmdOpts) { opts.verboseKey = true } + +func iterCmdStats(stats *base.InternalIteratorStats) iterCmdOpt { + return func(opts *iterCmdOpts) { + opts.stats = stats + } +} func runGetCmd(td *datadriven.TestData, d *DB) string { snap := Snapshot{ @@ -367,11 +376,9 @@ func writeRangeKeys(b io.Writer, iter *Iterator) { } func runInternalIterCmd(d *datadriven.TestData, iter internalIterator, opts ...iterCmdOpt) string { - var verboseKey bool + var o iterCmdOpts for _, opt := range opts { - if opt == iterCmdVerboseKey { - verboseKey = true - } + opt(&o) } var b bytes.Buffer @@ -448,22 +455,20 @@ func runInternalIterCmd(d *datadriven.TestData, iter internalIterator, opts ...i iter.SetBounds(lower, upper) continue case "stats": - ii, ok := iter.(internalIteratorWithStats) - if ok { - fmt.Fprintf(&b, "%+v\n", ii.Stats()) + if o.stats != nil { + fmt.Fprintf(&b, "%+v\n", *o.stats) } continue case "reset-stats": - ii, ok := iter.(internalIteratorWithStats) - if ok { - ii.ResetStats() + if o.stats != nil { + *o.stats = base.InternalIteratorStats{} } continue default: return fmt.Sprintf("unknown op: %s", parts[0]) } if key != nil { - if verboseKey { + if o.verboseKey { fmt.Fprintf(&b, "%s:%s\n", key, value) } else { fmt.Fprintf(&b, "%s:%s\n", key.UserKey, value) diff --git a/db.go b/db.go index 8b45e28937..b424531b66 100644 --- a/db.go +++ b/db.go @@ -523,7 +523,7 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, } i := &buf.dbi - pointIter := base.WrapIterWithStats(get) + pointIter := get *i = Iterator{ getIterAlloc: buf, iter: pointIter, @@ -1037,6 +1037,10 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) { // Already have one. return } + internalOpts := internalIterOpts{stats: &i.stats.InternalStats} + if i.opts.RangeKeyMasking.Filter != nil { + internalOpts.boundLimitedFilter = &i.rangeKeyMasking + } // Merging levels and levels from iterAlloc. mlevels := buf.mlevels[:0] @@ -1092,7 +1096,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) { rangeDelIter = &i.batchRangeDelIter } mlevels = append(mlevels, mergingIterLevel{ - iter: base.WrapIterWithStats(&i.batchPointIter), + iter: &i.batchPointIter, rangeDelIter: rangeDelIter, }) } @@ -1102,7 +1106,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) { for j := len(memtables) - 1; j >= 0; j-- { mem := memtables[j] mlevels = append(mlevels, mergingIterLevel{ - iter: base.WrapIterWithStats(mem.newIter(&i.opts)), + iter: mem.newIter(&i.opts), rangeDelIter: mem.newRangeDelIter(&i.opts), }) } @@ -1112,10 +1116,6 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) { levelsIndex := len(levels) mlevels = mlevels[:numMergingLevels] levels = levels[:numLevelIters] - var internalOpts internalIterOpts - if i.opts.RangeKeyMasking.Filter != nil { - internalOpts.boundLimitedFilter = &i.rangeKeyMasking - } addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) { li := &levels[levelsIndex] @@ -1142,7 +1142,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) { } addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level)) } - buf.merging.init(&i.opts, i.comparer.Compare, i.comparer.Split, mlevels...) + buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...) buf.merging.snapshot = i.seqNum buf.merging.elideRangeTombstones = true buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState diff --git a/error_iter.go b/error_iter.go index e7ef449a6f..0aecb43249 100644 --- a/error_iter.go +++ b/error_iter.go @@ -14,7 +14,7 @@ type errorIter struct { } // errorIter implements the base.InternalIterator interface. -var _ internalIteratorWithStats = (*errorIter)(nil) +var _ internalIterator = (*errorIter)(nil) func newErrorIter(err error) *errorIter { return &errorIter{err: err} @@ -63,8 +63,6 @@ func (c *errorIter) String() string { } func (c *errorIter) SetBounds(lower, upper []byte) {} -func (c *errorIter) Stats() InternalIteratorStats { return InternalIteratorStats{} } -func (c *errorIter) ResetStats() {} type errorKeyspanIter struct { err error diff --git a/external_iterator.go b/external_iterator.go index fefadb1c6b..b62a4e4cf0 100644 --- a/external_iterator.go +++ b/external_iterator.go @@ -207,7 +207,7 @@ func finishInitializingExternal(it *Iterator) { continue } mlevels = append(mlevels, mergingIterLevel{ - iter: base.WrapIterWithStats(pointIter), + iter: pointIter, rangeDelIter: rangeDelIter, }) } @@ -218,12 +218,12 @@ func finishInitializingExternal(it *Iterator) { } sli.init(it.opts) mlevels = append(mlevels, mergingIterLevel{ - iter: base.WrapIterWithStats(sli), + iter: sli, rangeDelIter: nil, }) } } - it.alloc.merging.init(&it.opts, it.comparer.Compare, it.comparer.Split, mlevels...) + it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...) it.alloc.merging.snapshot = base.InternalKeySeqNumMax it.alloc.merging.elideRangeTombstones = true it.pointIter = &it.alloc.merging diff --git a/get_iter_test.go b/get_iter_test.go index 45bc624647..98690cfa3e 100644 --- a/get_iter_test.go +++ b/get_iter_test.go @@ -541,7 +541,7 @@ func TestGetIter(t *testing.T) { i := &buf.dbi i.comparer = *testkeys.Comparer i.merge = DefaultMerger.Merge - i.iter = base.WrapIterWithStats(get) + i.iter = get defer i.Close() if !i.First() { diff --git a/internal.go b/internal.go index e07931b24e..a856e99b6c 100644 --- a/internal.go +++ b/internal.go @@ -33,8 +33,6 @@ type InternalKey = base.InternalKey type internalIterator = base.InternalIterator -type internalIteratorWithStats = base.InternalIteratorWithStats - // ErrCorruption is a marker to indicate that data in a file (WAL, MANIFEST, // sstable) isn't in the expected format. var ErrCorruption = base.ErrCorruption diff --git a/internal/base/iterator.go b/internal/base/iterator.go index 892ee77ee1..424b8f97e8 100644 --- a/internal/base/iterator.go +++ b/internal/base/iterator.go @@ -295,13 +295,6 @@ func (s SeekLTFlags) DisableRelativeSeek() SeekLTFlags { return s &^ (1 << seekLTFlagRelativeSeek) } -// InternalIteratorWithStats extends InternalIterator to expose stats. -type InternalIteratorWithStats interface { - InternalIterator - Stats() InternalIteratorStats - ResetStats() -} - // InternalIteratorStats contains miscellaneous stats produced by // InternalIterators that are part of the InternalIterator tree. Not every // field is relevant for an InternalIterator implementation. The field values @@ -341,30 +334,3 @@ func (s *InternalIteratorStats) Merge(from InternalIteratorStats) { s.PointCount += from.PointCount s.PointsCoveredByRangeTombstones += from.PointsCoveredByRangeTombstones } - -type internalIteratorWithEmptyStats struct { - InternalIterator -} - -var _ InternalIteratorWithStats = internalIteratorWithEmptyStats{} - -// Stats implements InternalIteratorWithStats. -func (i internalIteratorWithEmptyStats) Stats() InternalIteratorStats { - return InternalIteratorStats{} -} - -// ResetStats implements InternalIteratorWithStats. -func (i internalIteratorWithEmptyStats) ResetStats() {} - -// WrapIterWithStats ensures that either iter implements the stats methods or -// wraps it, such that the return value implements InternalIteratorWithStats. -func WrapIterWithStats(iter InternalIterator) InternalIteratorWithStats { - if iter == nil { - return nil - } - i, ok := iter.(InternalIteratorWithStats) - if ok { - return i - } - return internalIteratorWithEmptyStats{InternalIterator: iter} -} diff --git a/internal/base/iterator_test.go b/internal/base/iterator_test.go index 750d5e09e8..e393dbdf7e 100644 --- a/internal/base/iterator_test.go +++ b/internal/base/iterator_test.go @@ -6,36 +6,9 @@ package base import ( "fmt" - "math/rand" - "reflect" "testing" - - "github.com/stretchr/testify/require" ) -func setRandUint64(v reflect.Value) uint64 { - val := rand.Uint64() - v.SetUint(val) - return val -} - -func TestInternalIteratorStatsMerge(t *testing.T) { - var from, to, expected InternalIteratorStats - n := reflect.ValueOf(from).NumField() - for i := 0; i < n; i++ { - switch reflect.ValueOf(from).Type().Field(i).Type.Kind() { - case reflect.Uint64: - v1 := setRandUint64(reflect.ValueOf(&from).Elem().Field(i)) - v2 := setRandUint64(reflect.ValueOf(&to).Elem().Field(i)) - reflect.ValueOf(&expected).Elem().Field(i).SetUint(v1 + v2) - default: - t.Fatalf("unknown kind %v", reflect.ValueOf(from).Type().Field(i).Type.Kind()) - } - } - to.Merge(from) - require.Equal(t, expected, to) -} - func TestFlags(t *testing.T) { t.Run("SeekGEFlags", func(t *testing.T) { f := SeekGEFlagsNone diff --git a/internal/keyspan/interleaving_iter.go b/internal/keyspan/interleaving_iter.go index 957ca52dc5..78c877cb43 100644 --- a/internal/keyspan/interleaving_iter.go +++ b/internal/keyspan/interleaving_iter.go @@ -94,7 +94,7 @@ type SpanMask interface { type InterleavingIter struct { cmp base.Compare comparer *base.Comparer - pointIter base.InternalIteratorWithStats + pointIter base.InternalIterator keyspanIter FragmentIterator mask SpanMask @@ -178,7 +178,7 @@ var _ base.InternalIterator = &InterleavingIter{} // propagate the bounds down the iterator stack. func (i *InterleavingIter) Init( comparer *base.Comparer, - pointIter base.InternalIteratorWithStats, + pointIter base.InternalIterator, keyspanIter FragmentIterator, mask SpanMask, lowerBound, upperBound []byte, @@ -1083,18 +1083,6 @@ func (i *InterleavingIter) String() string { return fmt.Sprintf("keyspan-interleaving(%q)", i.pointIter.String()) } -var _ base.InternalIteratorWithStats = &InterleavingIter{} - -// Stats implements InternalIteratorWithStats. -func (i *InterleavingIter) Stats() base.InternalIteratorStats { - return i.pointIter.Stats() -} - -// ResetStats implements InternalIteratorWithStats. -func (i *InterleavingIter) ResetStats() { - i.pointIter.ResetStats() -} - func firstError(err0, err1 error) error { if err0 != nil { return err0 diff --git a/internal/keyspan/interleaving_iter_test.go b/internal/keyspan/interleaving_iter_test.go index 2ac7a85bdd..8b5e37daad 100644 --- a/internal/keyspan/interleaving_iter_test.go +++ b/internal/keyspan/interleaving_iter_test.go @@ -107,7 +107,7 @@ func runInterleavingIterTest(t *testing.T, filename string) { } keyspanIter.Init(cmp, noopTransform, NewIter(cmp, spans)) hooks.maskSuffix = nil - iter.Init(testkeys.Comparer, base.WrapIterWithStats(&pointIter), &keyspanIter, &hooks, nil, nil) + iter.Init(testkeys.Comparer, &pointIter, &keyspanIter, &hooks, nil, nil) return "OK" case "define-pointkeys": var points []base.InternalKey @@ -117,7 +117,7 @@ func runInterleavingIterTest(t *testing.T, filename string) { } pointIter = pointIterator{cmp: cmp, keys: points} hooks.maskSuffix = nil - iter.Init(testkeys.Comparer, base.WrapIterWithStats(&pointIter), &keyspanIter, &hooks, nil, nil) + iter.Init(testkeys.Comparer, &pointIter, &keyspanIter, &hooks, nil, nil) return "OK" case "iter": buf.Reset() diff --git a/iterator.go b/iterator.go index c5eba3e4d5..808bc1b2ce 100644 --- a/iterator.go +++ b/iterator.go @@ -149,8 +149,8 @@ type Iterator struct { opts IterOptions merge Merge comparer base.Comparer - iter internalIteratorWithStats - pointIter internalIteratorWithStats + iter internalIterator + pointIter internalIterator readState *readState // rangeKey holds iteration state specific to iteration over range keys. // The range key field may be nil if the Iterator has never been configured @@ -2138,14 +2138,11 @@ func (i *Iterator) Metrics() IteratorMetrics { // ResetStats resets the stats to 0. func (i *Iterator) ResetStats() { i.stats = IteratorStats{} - i.iter.ResetStats() } // Stats returns the current stats. func (i *Iterator) Stats() IteratorStats { - stats := i.stats - stats.InternalStats = i.iter.Stats() - return stats + return i.stats } // CloneOptions configures an iterator constructed through Iterator.Clone. diff --git a/iterator_test.go b/iterator_test.go index 58868da1bf..188ea7cbb2 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -222,14 +222,14 @@ func (f *fakeIter) SetBounds(lower, upper []byte) { // invalidatingIter tests unsafe key/value slice reuse by modifying the last // returned key/value to all 1s. type invalidatingIter struct { - iter internalIteratorWithStats + iter internalIterator lastKey *InternalKey lastValue []byte ignoreKinds [base.InternalKeyKindMax + 1]bool } func newInvalidatingIter(iter internalIterator) *invalidatingIter { - return &invalidatingIter{iter: base.WrapIterWithStats(iter)} + return &invalidatingIter{iter: iter} } func (i *invalidatingIter) ignoreKind(kind base.InternalKeyKind) { @@ -317,14 +317,6 @@ func (i *invalidatingIter) String() string { return i.iter.String() } -func (i *invalidatingIter) Stats() base.InternalIteratorStats { - return i.iter.Stats() -} - -func (i *invalidatingIter) ResetStats() { - i.iter.ResetStats() -} - // testIterator tests creating a combined iterator from a number of sub- // iterators. newFunc is a constructor function. splitFunc returns a random // split of the testKeyValuePairs slice such that walking a combined iterator @@ -485,19 +477,6 @@ func TestIterator(t *testing.T) { var vals [][]byte newIter := func(seqNum uint64, opts IterOptions) *Iterator { - cmp := testkeys.Comparer.Compare - split := testkeys.Comparer.Split - // NB: Use a mergingIter to filter entries newer than seqNum. - iter := newMergingIter(nil /* logger */, cmp, split, &fakeIter{ - lower: opts.GetLowerBound(), - upper: opts.GetUpperBound(), - keys: keys, - vals: vals, - }) - iter.snapshot = seqNum - iter.elideRangeTombstones = true - // NB: This Iterator cannot be cloned since it is not constructed - // with a readState. It suffices for this test. if merge == nil { merge = DefaultMerger.Merge } @@ -507,12 +486,24 @@ func TestIterator(t *testing.T) { } return merge(key, value) } - return &Iterator{ + it := &Iterator{ opts: opts, comparer: *testkeys.Comparer, merge: wrappedMerge, - iter: newInvalidatingIter(iter), } + // NB: Use a mergingIter to filter entries newer than seqNum. + iter := newMergingIter(nil /* logger */, &it.stats.InternalStats, it.cmp, it.split, &fakeIter{ + lower: opts.GetLowerBound(), + upper: opts.GetUpperBound(), + keys: keys, + vals: vals, + }) + iter.snapshot = seqNum + iter.elideRangeTombstones = true + // NB: This Iterator cannot be cloned since it is not constructed + // with a readState. It suffices for this test. + it.iter = newInvalidatingIter(iter) + return it } datadriven.RunTest(t, "testdata/iterator", func(d *datadriven.TestData) string { @@ -1188,7 +1179,7 @@ func TestIteratorSeekOptErrors(t *testing.T) { opts: opts, comparer: *testkeys.Comparer, merge: DefaultMerger.Merge, - iter: base.WrapIterWithStats(&errorIter), + iter: &errorIter, } } @@ -1913,7 +1904,7 @@ func BenchmarkIteratorSeekGE(b *testing.B) { m, keys := buildMemTable(b) iter := &Iterator{ comparer: *DefaultComparer, - iter: base.WrapIterWithStats(m.newIter(nil)), + iter: m.newIter(nil), } rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) @@ -1928,7 +1919,7 @@ func BenchmarkIteratorNext(b *testing.B) { m, _ := buildMemTable(b) iter := &Iterator{ comparer: *DefaultComparer, - iter: base.WrapIterWithStats(m.newIter(nil)), + iter: m.newIter(nil), } b.ResetTimer() @@ -1944,7 +1935,7 @@ func BenchmarkIteratorPrev(b *testing.B) { m, _ := buildMemTable(b) iter := &Iterator{ comparer: *DefaultComparer, - iter: base.WrapIterWithStats(m.newIter(nil)), + iter: m.newIter(nil), } b.ResetTimer() diff --git a/level_iter.go b/level_iter.go index 7083bc705f..24f3ef8ffe 100644 --- a/level_iter.go +++ b/level_iter.go @@ -26,6 +26,7 @@ type tableNewIters func( type internalIterOpts struct { bytesIterated *uint64 + stats *base.InternalIteratorStats boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter } @@ -89,7 +90,7 @@ type levelIter struct { // - err != nil // - some other constraint, like the bounds in opts, caused the file at index to not // be relevant to the iteration. - iter internalIteratorWithStats + iter internalIterator iterFile *fileMetadata // filteredIter is an optional interface that may be implemented by internal // iterators that perform filtering of keys. When a new file's iterator is @@ -115,8 +116,6 @@ type levelIter struct { rangeDelIterCopy keyspan.FragmentIterator files manifest.LevelIterator err error - // stats accumulates the stats of iters that have been closed. - stats InternalIteratorStats // Pointer into this level's entry in `mergingIterLevel::levelIterBoundaryContext`. // We populate it with the corresponding bounds for the currently opened file. It is used for @@ -242,7 +241,6 @@ func (l *levelIter) init( l.newIters = newIters l.files = files l.internalOpts = internalOpts - l.stats = InternalIteratorStats{} } func (l *levelIter) initRangeDel(rangeDelIter *keyspan.FragmentIterator) { @@ -581,7 +579,7 @@ func (l *levelIter) loadFile(file *fileMetadata, dir int) loadFileReturnIndicato var rangeDelIter keyspan.FragmentIterator var iter internalIterator iter, rangeDelIter, l.err = l.newIters(l.files.Current(), &l.tableOpts, l.internalOpts) - l.iter = base.WrapIterWithStats(iter) + l.iter = iter if l.err != nil { return noFileLoaded } @@ -1036,7 +1034,6 @@ func (l *levelIter) Error() error { func (l *levelIter) Close() error { if l.iter != nil { - l.stats.Merge(l.iter.Stats()) l.err = l.iter.Close() l.iter = nil } @@ -1077,21 +1074,4 @@ func (l *levelIter) String() string { return fmt.Sprintf("%s: fileNum=", l.level) } -var _ internalIteratorWithStats = &levelIter{} - -// Stats implements InternalIteratorWithStats. -func (l *levelIter) Stats() base.InternalIteratorStats { - stats := l.stats - if l.iter != nil { - stats.Merge(l.iter.Stats()) - } - return stats -} - -// ResetStats implements InternalIteratorWithStats. -func (l *levelIter) ResetStats() { - l.stats = base.InternalIteratorStats{} - if l.iter != nil { - l.iter.ResetStats() - } -} +var _ internalIterator = &levelIter{} diff --git a/level_iter_test.go b/level_iter_test.go index 4fce2c5286..f645840e4c 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -156,10 +156,10 @@ func newLevelIterTest() *levelIterTest { } func (lt *levelIterTest) newIters( - file *manifest.FileMetadata, opts *IterOptions, _ internalIterOpts, + file *manifest.FileMetadata, opts *IterOptions, iio internalIterOpts, ) (internalIterator, keyspan.FragmentIterator, error) { lt.itersCreated++ - iter, err := lt.readers[file.FileNum].NewIter(opts.LowerBound, opts.UpperBound) + iter, err := lt.readers[file.FileNum].NewIterWithBlockPropertyFilters(opts.LowerBound, opts.UpperBound, nil, true, iio.stats) if err != nil { return nil, nil, err } @@ -410,15 +410,15 @@ func TestLevelIterSeek(t *testing.T) { return lt.runBuild(d) case "iter": + var stats base.InternalIteratorStats slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas) - iter := &levelIterTestIter{ - levelIter: newLevelIter(IterOptions{}, DefaultComparer.Compare, - func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(), - manifest.Level(level), nil), - } + iter := &levelIterTestIter{levelIter: &levelIter{}} + iter.init(IterOptions{}, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(), + manifest.Level(level), internalIterOpts{stats: &stats}) defer iter.Close() iter.initRangeDel(&iter.rangeDelIter) - return runInternalIterCmd(d, iter, iterCmdVerboseKey) + return runInternalIterCmd(d, iter, iterCmdVerboseKey, iterCmdStats(&stats)) case "iters-created": return fmt.Sprintf("%d", lt.itersCreated) diff --git a/merging_iter.go b/merging_iter.go index 8de97797a7..fecb71ca6d 100644 --- a/merging_iter.go +++ b/merging_iter.go @@ -16,7 +16,7 @@ import ( ) type mergingIterLevel struct { - iter internalIteratorWithStats + iter internalIterator // rangeDelIter is set to the range-deletion iterator for the level. When // configured with a levelIter, this pointer changes as sstable boundaries // are crossed. See levelIter.initRangeDel and the Range Deletions comment @@ -240,7 +240,7 @@ type mergingIter struct { prefix []byte lower []byte upper []byte - stats InternalIteratorStats + stats *InternalIteratorStats combinedIterState *combinedIterState @@ -264,19 +264,27 @@ var _ base.InternalIterator = (*mergingIter)(nil) // // None of the iters may be nil. func newMergingIter( - logger Logger, cmp Compare, split Split, iters ...internalIterator, + logger Logger, + stats *base.InternalIteratorStats, + cmp Compare, + split Split, + iters ...internalIterator, ) *mergingIter { m := &mergingIter{} levels := make([]mergingIterLevel, len(iters)) for i := range levels { - levels[i].iter = base.WrapIterWithStats(iters[i]) + levels[i].iter = iters[i] } - m.init(&IterOptions{logger: logger}, cmp, split, levels...) + m.init(&IterOptions{logger: logger}, stats, cmp, split, levels...) return m } func (m *mergingIter) init( - opts *IterOptions, cmp Compare, split Split, levels ...mergingIterLevel, + opts *IterOptions, + stats *base.InternalIteratorStats, + cmp Compare, + split Split, + levels ...mergingIterLevel, ) { m.err = nil // clear cached iteration error m.logger = opts.getLogger() @@ -288,6 +296,7 @@ func (m *mergingIter) init( m.levels = levels m.heap.cmp = cmp m.split = split + m.stats = stats if cap(m.heap.items) < len(levels) { m.heap.items = make([]mergingIterItem, 0, len(levels)) } else { @@ -1185,21 +1194,4 @@ func (m *mergingIter) addItemStats(item *mergingIterItem) { m.stats.ValueBytes += uint64(len(item.value)) } -var _ internalIteratorWithStats = &mergingIter{} - -// Stats implements InternalIteratorWithStats. -func (m *mergingIter) Stats() base.InternalIteratorStats { - stats := m.stats - for i := range m.levels { - stats.Merge(m.levels[i].iter.Stats()) - } - return stats -} - -// ResetStats implements InternalIteratorWithStats. -func (m *mergingIter) ResetStats() { - m.stats = InternalIteratorStats{} - for i := range m.levels { - m.levels[i].iter.ResetStats() - } -} +var _ internalIterator = &mergingIter{} diff --git a/merging_iter_test.go b/merging_iter_test.go index 82667ae8e7..277bb2057b 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -23,8 +23,9 @@ import ( ) func TestMergingIter(t *testing.T) { + var stats base.InternalIteratorStats newFunc := func(iters ...internalIterator) internalIterator { - return newMergingIter(nil /* logger */, DefaultComparer.Compare, + return newMergingIter(nil /* logger */, &stats, DefaultComparer.Compare, func(a []byte) int { return len(a) }, iters...) } testIterator(t, newFunc, func(r *rand.Rand) [][]string { @@ -60,7 +61,8 @@ func TestMergingIterSeek(t *testing.T) { iters = append(iters, f) } - iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, + var stats base.InternalIteratorStats + iter := newMergingIter(nil /* logger */, &stats, DefaultComparer.Compare, func(a []byte) int { return len(a) }, iters...) defer iter.Close() return runInternalIterCmd(d, iter) @@ -118,7 +120,8 @@ func TestMergingIterNextPrev(t *testing.T) { } } - iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, + var stats base.InternalIteratorStats + iter := newMergingIter(nil /* logger */, &stats, DefaultComparer.Compare, func(a []byte) int { return len(a) }, iters...) defer iter.Close() return runInternalIterCmd(d, iter) @@ -148,13 +151,13 @@ func TestMergingIterCornerCases(t *testing.T) { var fileNum base.FileNum newIters := - func(file *manifest.FileMetadata, opts *IterOptions, _ internalIterOpts) (internalIterator, keyspan.FragmentIterator, error) { + func(file *manifest.FileMetadata, opts *IterOptions, iio internalIterOpts) (internalIterator, keyspan.FragmentIterator, error) { r := readers[file.FileNum] rangeDelIter, err := r.NewRawRangeDelIter() if err != nil { return nil, nil, err } - iter, err := r.NewIter(opts.GetLowerBound(), opts.GetUpperBound()) + iter, err := r.NewIterWithBlockPropertyFilters(opts.GetLowerBound(), opts.GetUpperBound(), nil, true /* useFilterBlock */, iio.stats) if err != nil { return nil, nil, err } @@ -240,6 +243,7 @@ func TestMergingIterCornerCases(t *testing.T) { return v.String() case "iter": levelIters := make([]mergingIterLevel, 0, len(v.Levels)) + var stats base.InternalIteratorStats for i, l := range v.Levels { slice := l.Slice() if slice.Empty() { @@ -247,16 +251,16 @@ func TestMergingIterCornerCases(t *testing.T) { } li := &levelIter{} li.init(IterOptions{}, cmp, func(a []byte) int { return len(a) }, newIters, - slice.Iter(), manifest.Level(i), internalIterOpts{}) + slice.Iter(), manifest.Level(i), internalIterOpts{stats: &stats}) i := len(levelIters) levelIters = append(levelIters, mergingIterLevel{iter: li}) li.initRangeDel(&levelIters[i].rangeDelIter) li.initBoundaryContext(&levelIters[i].levelIterBoundaryContext) } miter := &mergingIter{} - miter.init(nil /* opts */, cmp, func(a []byte) int { return len(a) }, levelIters...) + miter.init(nil /* opts */, &stats, cmp, func(a []byte) int { return len(a) }, levelIters...) defer miter.Close() - return runInternalIterCmd(d, miter, iterCmdVerboseKey) + return runInternalIterCmd(d, miter, iterCmdVerboseKey, iterCmdStats(&stats)) default: return fmt.Sprintf("unknown command: %s", d.Cmd) } @@ -349,7 +353,8 @@ func BenchmarkMergingIterSeekGE(b *testing.B) { iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */) require.NoError(b, err) } - m := newMergingIter(nil /* logger */, DefaultComparer.Compare, + var stats base.InternalIteratorStats + m := newMergingIter(nil /* logger */, &stats, DefaultComparer.Compare, func(a []byte) int { return len(a) }, iters...) rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) @@ -381,7 +386,8 @@ func BenchmarkMergingIterNext(b *testing.B) { iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */) require.NoError(b, err) } - m := newMergingIter(nil /* logger */, DefaultComparer.Compare, + var stats base.InternalIteratorStats + m := newMergingIter(nil /* logger */, &stats, DefaultComparer.Compare, func(a []byte) int { return len(a) }, iters...) b.ResetTimer() @@ -416,7 +422,8 @@ func BenchmarkMergingIterPrev(b *testing.B) { iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */) require.NoError(b, err) } - m := newMergingIter(nil /* logger */, DefaultComparer.Compare, + var stats base.InternalIteratorStats + m := newMergingIter(nil /* logger */, &stats, DefaultComparer.Compare, func(a []byte) int { return len(a) }, iters...) b.ResetTimer() @@ -586,8 +593,9 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS l.initBoundaryContext(&mils[level].levelIterBoundaryContext) mils[level].iter = l } + var stats base.InternalIteratorStats m := &mergingIter{} - m.init(nil /* logger */, DefaultComparer.Compare, + m.init(nil /* logger */, &stats, DefaultComparer.Compare, func(a []byte) int { return len(a) }, mils...) return m } diff --git a/range_keys.go b/range_keys.go index 17c3b1da2f..89b5f43f75 100644 --- a/range_keys.go +++ b/range_keys.go @@ -391,7 +391,7 @@ type lazyCombinedIter struct { // iterator. It's used to mutate the internalIterator in use when switching // to combined iteration. parent *Iterator - pointIter internalIteratorWithStats + pointIter internalIterator combinedIterState combinedIterState } @@ -663,11 +663,3 @@ func (i *lazyCombinedIter) String() string { } return i.pointIter.String() } - -func (i *lazyCombinedIter) Stats() InternalIteratorStats { - return i.pointIter.Stats() -} - -func (i *lazyCombinedIter) ResetStats() { - i.pointIter.ResetStats() -} diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index 5c73083cb6..670e6294db 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -872,6 +872,7 @@ func TestBlockProperties(t *testing.T) { } }() + var stats base.InternalIteratorStats datadriven.RunTest(t, "testdata/block_properties", func(td *datadriven.TestData) string { switch td.Cmd { case "build": @@ -1006,7 +1007,7 @@ func TestBlockProperties(t *testing.T) { } else if !ok { return "filter excludes entire table" } - iter, err := r.NewIterWithBlockPropertyFilters(lower, upper, filterer, false /* use (bloom) filter */) + iter, err := r.NewIterWithBlockPropertyFilters(lower, upper, filterer, false /* use (bloom) filter */, &stats) if err != nil { return err.Error() } @@ -1029,6 +1030,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) { } }() + var stats base.InternalIteratorStats datadriven.RunTest(t, "testdata/block_properties_boundlimited", func(td *datadriven.TestData) string { switch td.Cmd { case "build": @@ -1083,7 +1085,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) { } else if !ok { return "filter excludes entire table" } - iter, err := r.NewIterWithBlockPropertyFilters(lower, upper, filterer, false /* use (bloom) filter */) + iter, err := r.NewIterWithBlockPropertyFilters(lower, upper, filterer, false /* use (bloom) filter */, &stats) if err != nil { return err.Error() } diff --git a/sstable/data_test.go b/sstable/data_test.go index a16615a7df..bc81d07fc3 100644 --- a/sstable/data_test.go +++ b/sstable/data_test.go @@ -249,6 +249,7 @@ type runIterCmdOption func(*runIterCmdOptions) type runIterCmdOptions struct { everyOp func(io.Writer) everyOpAfter func(io.Writer) + stats *base.InternalIteratorStats } func runIterCmdEveryOp(everyOp func(io.Writer)) runIterCmdOption { @@ -259,6 +260,10 @@ func runIterCmdEveryOpAfter(everyOp func(io.Writer)) runIterCmdOption { return func(opts *runIterCmdOptions) { opts.everyOpAfter = everyOp } } +func runIterCmdStats(stats *base.InternalIteratorStats) runIterCmdOption { + return func(opts *runIterCmdOptions) { opts.stats = stats } +} + func runIterCmd(td *datadriven.TestData, origIter Iterator, opt ...runIterCmdOption) string { var opts runIterCmdOptions for _, o := range opt { @@ -347,10 +352,10 @@ func runIterCmd(td *datadriven.TestData, origIter Iterator, opt ...runIterCmdOpt } iter.SetBounds(lower, upper) case "stats": - fmt.Fprintf(&b, "%+v\n", iter.Stats()) + fmt.Fprintf(&b, "%+v\n", *opts.stats) continue case "reset-stats": - iter.ResetStats() + *opts.stats = base.InternalIteratorStats{} continue } if opts.everyOp != nil { diff --git a/sstable/reader.go b/sstable/reader.go index 4ce57a19b2..a2a5a7bec1 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -127,7 +127,7 @@ type singleLevelIterator struct { dataBH BlockHandle err error closeHook func(i Iterator) error - stats base.InternalIteratorStats + stats *base.InternalIteratorStats // boundsCmp and positionedUsingLatestBounds are for optimizing iteration // that uses multiple adjacent bounds. The seek after setting a new bound @@ -295,7 +295,11 @@ func checkRangeKeyFragmentBlockIterator(obj interface{}) { // synonmous with Reader.NewIter, but allows for reusing of the iterator // between different Readers. func (i *singleLevelIterator) init( - r *Reader, lower, upper []byte, filterer *BlockPropertiesFilterer, useFilter bool, + r *Reader, + lower, upper []byte, + filterer *BlockPropertiesFilterer, + useFilter bool, + stats *base.InternalIteratorStats, ) error { if r.err != nil { return r.err @@ -311,6 +315,7 @@ func (i *singleLevelIterator) init( i.useFilter = useFilter i.reader = r i.cmp = r.Compare + i.stats = stats err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum) if err != nil { // blockIter.Close releases indexH and always returns a nil error @@ -505,7 +510,7 @@ func (i *singleLevelIterator) readBlockWithStats( bh BlockHandle, raState *readaheadState, ) (cache.Handle, error) { block, cacheHit, err := i.reader.readBlock(bh, nil /* transform */, raState) - if err == nil { + if err == nil && i.stats != nil { n := bh.Length i.stats.BlockBytes += n if cacheHit { @@ -1233,18 +1238,8 @@ func (i *singleLevelIterator) SetBounds(lower, upper []byte) { i.blockUpper = nil } -var _ base.InternalIteratorWithStats = &singleLevelIterator{} -var _ base.InternalIteratorWithStats = &twoLevelIterator{} - -// Stats implements InternalIteratorWithStats. -func (i *singleLevelIterator) Stats() base.InternalIteratorStats { - return i.stats -} - -// ResetStats implements InternalIteratorWithStats. -func (i *singleLevelIterator) ResetStats() { - i.stats = base.InternalIteratorStats{} -} +var _ base.InternalIterator = &singleLevelIterator{} +var _ base.InternalIterator = &twoLevelIterator{} // compactionIterator is similar to Iterator but it increments the number of // bytes that have been iterated through. @@ -1454,7 +1449,11 @@ func (i *twoLevelIterator) resolveMaybeExcluded(dir int8) intersectsResult { } func (i *twoLevelIterator) init( - r *Reader, lower, upper []byte, filterer *BlockPropertiesFilterer, useFilter bool, + r *Reader, + lower, upper []byte, + filterer *BlockPropertiesFilterer, + useFilter bool, + stats *base.InternalIteratorStats, ) error { if r.err != nil { return r.err @@ -1470,6 +1469,7 @@ func (i *twoLevelIterator) init( i.useFilter = useFilter i.reader = r i.cmp = r.Compare + i.stats = stats err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error @@ -2420,14 +2420,17 @@ func (r *Reader) Close() error { // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after // itself and returns a nil iterator. func (r *Reader) NewIterWithBlockPropertyFilters( - lower, upper []byte, filterer *BlockPropertiesFilterer, useFilterBlock bool, + lower, upper []byte, + filterer *BlockPropertiesFilterer, + useFilterBlock bool, + stats *base.InternalIteratorStats, ) (Iterator, error) { // NB: pebble.tableCache wraps the returned iterator with one which performs // reference counting on the Reader, preventing the Reader from being closed // until the final iterator closes. if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(r, lower, upper, filterer, useFilterBlock) + err := i.init(r, lower, upper, filterer, useFilterBlock, stats) if err != nil { return nil, err } @@ -2435,7 +2438,7 @@ func (r *Reader) NewIterWithBlockPropertyFilters( } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(r, lower, upper, filterer, useFilterBlock) + err := i.init(r, lower, upper, filterer, useFilterBlock, stats) if err != nil { return nil, err } @@ -2445,7 +2448,7 @@ func (r *Reader) NewIterWithBlockPropertyFilters( // NewIter returns an iterator for the contents of the table. If an error // occurs, NewIter cleans up after itself and returns a nil iterator. func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { - return r.NewIterWithBlockPropertyFilters(lower, upper, nil, true /* useFilterBlock */) + return r.NewIterWithBlockPropertyFilters(lower, upper, nil, true /* useFilterBlock */, nil /* stats */) } // NewCompactionIter returns an iterator similar to NewIter but it also increments @@ -2454,7 +2457,7 @@ func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) { if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(r, nil /* lower */, nil /* upper */, nil, false /* useFilter */) + err := i.init(r, nil /* lower */, nil /* upper */, nil, false /* useFilter */, nil /* stats */) if err != nil { return nil, err } @@ -2465,7 +2468,7 @@ func (r *Reader) NewCompactionIter(bytesIterated *uint64) (Iterator, error) { }, nil } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(r, nil /* lower */, nil /* upper */, nil, false /* useFilter */) + err := i.init(r, nil /* lower */, nil /* upper */, nil, false /* useFilter */, nil /* stats */) if err != nil { return nil, err } diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 4d721f53db..8dc76d652b 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -154,21 +154,6 @@ func (i *iterAdapter) SetBounds(lower, upper []byte) { i.key = nil } -func (i *iterAdapter) Stats() base.InternalIteratorStats { - ii, ok := i.Iterator.(base.InternalIteratorWithStats) - if ok { - return ii.Stats() - } - return base.InternalIteratorStats{} -} - -func (i *iterAdapter) ResetStats() { - ii, ok := i.Iterator.(base.InternalIteratorWithStats) - if ok { - ii.ResetStats() - } -} - func TestReader(t *testing.T) { writerOpts := map[string]WriterOptions{ // No bloom filters. @@ -363,12 +348,19 @@ func runTestReader(t *testing.T, o WriterOptions, dir string, r *Reader, cacheSi if err != nil { return err.Error() } + var stats base.InternalIteratorStats r.Properties.GlobalSeqNum = seqNum - iter, err := r.NewIter(nil /* lower */, nil /* upper */) + iter, err := r.NewIterWithBlockPropertyFilters( + nil, /* lower */ + nil, /* upper */ + nil, /* filterer */ + true, /* use filter block */ + &stats, + ) if err != nil { return err.Error() } - return runIterCmd(d, iter) + return runIterCmd(d, iter, runIterCmdStats(&stats)) case "get": var b bytes.Buffer diff --git a/table_cache.go b/table_cache.go index 93872f12f8..495f59b980 100644 --- a/table_cache.go +++ b/table_cache.go @@ -414,7 +414,7 @@ func (c *tableCacheShard) newIters( iter, err = v.reader.NewCompactionIter(internalOpts.bytesIterated) } else { iter, err = v.reader.NewIterWithBlockPropertyFilters( - opts.GetLowerBound(), opts.GetUpperBound(), filterer, useFilter) + opts.GetLowerBound(), opts.GetUpperBound(), filterer, useFilter, internalOpts.stats) } if err != nil { if rangeDelIter != nil {