From e7a2ba8ade7724797c1bc33337cd5b422c141f58 Mon Sep 17 00:00:00 2001 From: Rahul Aggarwal Date: Thu, 6 Jul 2023 15:33:46 -0400 Subject: [PATCH] pebble: Collect Pebble Key Statistics Created a new function `ScanStatistics` that returns counts of the different key kinds in Pebble (by level) as well as the number of snapshot keys. Also modified `ScanInternal` to surface the level of each key within each visitor function. Informs: #1996 --- db.go | 87 ++++++++++++++-- ingest_test.go | 51 +++++----- options.go | 10 ++ scan_internal.go | 109 +++++++++++++------- scan_internal_test.go | 209 ++++++++++++++++++++++++++++++++++++--- snapshot.go | 18 +++- testdata/scan_statistics | 134 +++++++++++++++++++++++++ 7 files changed, 535 insertions(+), 83 deletions(-) create mode 100644 testdata/scan_statistics diff --git a/db.go b/db.go index d95f4e68f7..8c4e35613f 100644 --- a/db.go +++ b/db.go @@ -1173,21 +1173,28 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator { func (d *DB) ScanInternal( ctx context.Context, lower, upper []byte, - visitPointKey func(key *InternalKey, value LazyValue) error, + visitPointKey func(key *InternalKey, value LazyValue, iterInfo iterInfo) error, visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, + includeObsoleteKeys bool, ) error { - iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{ + scanInternalOpts := &scanInternalOptions{ + visitPointKey: visitPointKey, + visitRangeDel: visitRangeDel, + visitRangeKey: visitRangeKey, + visitSharedFile: visitSharedFile, + skipSharedLevels: visitSharedFile != nil, + includeObsoleteKeys: includeObsoleteKeys, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower, UpperBound: upper, }, - skipSharedLevels: visitSharedFile != nil, - }) + } + iter := d.newInternalIter(nil /* snapshot */, scanInternalOpts) defer iter.close() - return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) + return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts) } // newInternalIter constructs and returns a new scanInternalIterator on this db. @@ -1226,6 +1233,7 @@ func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalI newIters: d.newIters, newIterRangeKey: d.tableNewRangeKeyIter, seqNum: seqNum, + mergingIter: &buf.merging, } if o != nil { dbi.opts = *o @@ -1988,7 +1996,6 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) { if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) { continue } - destTables[j] = SSTableInfo{TableInfo: m.TableInfo()} if opt.withProperties { p, err := d.tableCache.getTableProperties( @@ -2573,6 +2580,74 @@ func (d *DB) SetCreatorID(creatorID uint64) error { return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID)) } +// KeyStatistics keeps track of the number of keys that have been pinned by a +// snapshot as well as counts of the different key kinds in the lsm. +type KeyStatistics struct { + // snapshotPinnedKeys represents the number of duplicate keys per sstable. + // This occurs when a compaction tries to compact ta key but can't due to it being pinned by an open snapshot. + snapshotPinnedKeys int + // the total number of bytes of all snapshot pinned keys. + snapshotPinnedKeysBytes uint64 + // Note: these fields are currently only populated for point keys. + kindsCount [InternalKeyKindMax + 1]int + bytesPerKind [InternalKeyKindMax + 1]uint64 +} + +// LSMKeyStatistics is used by DB.ScanStatistics. +type LSMKeyStatistics struct { + accumulated KeyStatistics + levels [numLevels]KeyStatistics +} + +// ScanStatistics returns the count of different key kinds within the lsm for a +// key span [lower, upper) as well as the number of snapshot keys. +func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeyStatistics, error) { + stats := LSMKeyStatistics{} + var prevKey InternalKey + + err := d.ScanInternal(ctx, lower, upper, + func(key *InternalKey, value LazyValue, iterInfo iterInfo) error { + // iterInfo.level == -1 indicates that the key does not come from a valid level. + if iterInfo.level == -1 { + return nil + } + // If the previous key is equal to the current point key, the current key was + // pinned by a snapshot. + size := uint64(key.Size()) + kind := key.Kind() + if d.cmp(prevKey.UserKey, key.UserKey) == 0 { + stats.levels[iterInfo.level].snapshotPinnedKeys++ + stats.levels[iterInfo.level].snapshotPinnedKeysBytes += size + stats.accumulated.snapshotPinnedKeys++ + stats.accumulated.snapshotPinnedKeysBytes += size + } + stats.levels[iterInfo.level].kindsCount[kind]++ + stats.levels[iterInfo.level].bytesPerKind[kind] += size + stats.accumulated.kindsCount[kind]++ + prevKey.CopyFrom(*key) + return nil + }, + func(start, end []byte, seqNum uint64) error { + stats.accumulated.kindsCount[InternalKeyKindRangeDelete]++ + return nil + }, + func(start, end []byte, keys []rangekey.Key) error { + for _, key := range keys { + stats.accumulated.kindsCount[key.Kind()]++ + } + return nil + }, + nil, + true, + ) + + if err != nil { + return LSMKeyStatistics{}, err + } + + return stats, nil +} + // ObjProvider returns the objstorage.Provider for this database. Meant to be // used for internal purposes only. func (d *DB) ObjProvider() objstorage.Provider { diff --git a/ingest_test.go b/ingest_test.go index 00b2aae0f8..effacc060a 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -905,29 +905,34 @@ func TestIngestShared(t *testing.T) { w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts) var sharedSSTs []SharedSSTMeta - err = from.ScanInternal(context.TODO(), startKey, endKey, func(key *InternalKey, value LazyValue) error { - val, _, err := value.Value(nil) - require.NoError(t, err) - require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)) - return nil - }, func(start, end []byte, seqNum uint64) error { - require.NoError(t, w.DeleteRange(start, end)) - return nil - }, func(start, end []byte, keys []keyspan.Key) error { - s := keyspan.Span{ - Start: start, - End: end, - Keys: keys, - KeysOrder: 0, - } - require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error { - return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v) - })) - return nil - }, func(sst *SharedSSTMeta) error { - sharedSSTs = append(sharedSSTs, *sst) - return nil - }) + err = from.ScanInternal(context.TODO(), startKey, endKey, + func(key *InternalKey, value LazyValue, _ iterInfo) error { + val, _, err := value.Value(nil) + require.NoError(t, err) + require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)) + return nil + }, + func(start, end []byte, seqNum uint64) error { + require.NoError(t, w.DeleteRange(start, end)) + return nil + }, + func(start, end []byte, keys []keyspan.Key) error { + s := keyspan.Span{ + Start: start, + End: end, + Keys: keys, + KeysOrder: 0, + } + require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error { + return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v) + })) + return nil + }, + func(sst *SharedSSTMeta) error { + sharedSSTs = append(sharedSSTs, *sst) + return nil + }, + false) require.NoError(t, err) require.NoError(t, w.Close()) diff --git a/options.go b/options.go index ea89f93911..805d2c469b 100644 --- a/options.go +++ b/options.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage/remote" + "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" ) @@ -246,9 +247,18 @@ func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOpti type scanInternalOptions struct { IterOptions + visitPointKey func(key *InternalKey, value LazyValue, iterInfo iterInfo) error + visitRangeDel func(start, end []byte, seqNum uint64) error + visitRangeKey func(start, end []byte, keys []rangekey.Key) error + visitSharedFile func(sst *SharedSSTMeta) error + // skipSharedLevels skips levels that are shareable (level >= // sharedLevelStart). skipSharedLevels bool + + // includeObsoleteKeys specifies whether keys shadowed by newer internal keys + // are exposed. If false, only one internal key per user key is exposed. + includeObsoleteKeys bool } // RangeKeyMasking configures automatic hiding of point keys by range keys. A diff --git a/scan_internal.go b/scan_internal.go index 747a981fbf..9238ecb754 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -15,7 +15,6 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage" - "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/sstable" ) @@ -185,6 +184,10 @@ type pointCollapsingIterator struct { fixedSeqNum uint64 } +func (p *pointCollapsingIterator) Span() *keyspan.Span { + return p.iter.Span() +} + // SeekPrefixGE implements the InternalIterator interface. func (p *pointCollapsingIterator) SeekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, @@ -671,6 +674,13 @@ func (p *pointCollapsingIterator) String() string { var _ internalIterator = &pointCollapsingIterator{} +// This is used with scanInternalIterator to surface additional iterator-specific info where possible. +// Note: this is struct is only provided for point keys. +type iterInfo struct { + // level may be inaccurate if scanInternalOptions.includeObsoleteKeys is False. + level int +} + // scanInternalIterator is an iterator that returns all internal keys, including // tombstones. For instance, an InternalKeyKindDelete would be returned as an // InternalKeyKindDelete instead of the iterator skipping over to the next key. @@ -690,13 +700,15 @@ type scanInternalIterator struct { iter internalIterator readState *readState rangeKey *iteratorRangeKeyState - pointKeyIter pointCollapsingIterator + pointKeyIter internalIterator iterKey *InternalKey iterValue LazyValue alloc *iterAlloc newIters tableNewIters newIterRangeKey keyspan.TableNewSpanIter seqNum uint64 + iterInfo []iterInfo + mergingIter *mergingIter // boundsBuf holds two buffers used to store the lower and upper bounds. // Whenever the InternalIterator's bounds change, the new bounds are copied @@ -877,15 +889,9 @@ func (d *DB) truncateSharedFile( } func scanInternalImpl( - ctx context.Context, - lower, upper []byte, - iter *scanInternalIterator, - visitPointKey func(key *InternalKey, value LazyValue) error, - visitRangeDel func(start, end []byte, seqNum uint64) error, - visitRangeKey func(start, end []byte, keys []rangekey.Key) error, - visitSharedFile func(sst *SharedSSTMeta) error, + ctx context.Context, lower, upper []byte, iter *scanInternalIterator, opts *scanInternalOptions, ) error { - if visitSharedFile != nil && (lower == nil || upper == nil) { + if opts.visitSharedFile != nil && (lower == nil || upper == nil) { panic("lower and upper bounds must be specified in skip-shared iteration mode") } // Before starting iteration, check if any files in levels sharedLevelsStart @@ -897,7 +903,7 @@ func scanInternalImpl( db := iter.readState.db provider := db.objProvider seqNum := iter.seqNum - if visitSharedFile != nil { + if opts.visitSharedFile != nil { if provider == nil { panic("expected non-nil Provider in skip-shared iteration mode") } @@ -925,7 +931,7 @@ func scanInternalImpl( if skip { continue } - if err = visitSharedFile(sst); err != nil { + if err = opts.visitSharedFile(sst); err != nil { return err } } @@ -934,22 +940,37 @@ func scanInternalImpl( for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() { key := iter.unsafeKey() - switch key.Kind() { case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet: - span := iter.unsafeSpan() - if err := visitRangeKey(span.Start, span.End, span.Keys); err != nil { - return err + if opts.visitRangeKey != nil { + span := iter.unsafeSpan() + if err := opts.visitRangeKey(span.Start, span.End, span.Keys); err != nil { + return err + } } case InternalKeyKindRangeDelete: - rangeDel := iter.unsafeRangeDel() - if err := visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil { - return err + if opts.visitRangeDel != nil { + rangeDel := iter.unsafeRangeDel() + if err := opts.visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil { + return err + } } default: - val := iter.lazyValue() - if err := visitPointKey(key, val); err != nil { - return err + if opts.visitPointKey != nil { + var info iterInfo + if len(iter.mergingIter.heap.items) > 0 { + mergingIterIdx := iter.mergingIter.heap.items[0].index + info = iter.iterInfo[mergingIterIdx] + } else { + // Point key does not have a valid level (mergingIter heap is empty). + info = iterInfo{ + level: -1, + } + } + val := iter.lazyValue() + if err := opts.visitPointKey(key, val, info); err != nil { + return err + } } } } @@ -970,8 +991,10 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * numLevelIters := 0 current := i.readState.current + numMergingLevels += len(current.L0SublevelFiles) numLevelIters += len(current.L0SublevelFiles) + for level := 1; level < len(current.Levels); level++ { if current.Levels[level].Empty() { continue @@ -994,19 +1017,24 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels) rangeDelLevels := make([]keyspan.LevelIter, 0, numLevelIters) + i.iterInfo = make([]iterInfo, numMergingLevels) + mlevelsIndex := 0 + // Next are the memtables. for j := len(memtables) - 1; j >= 0; j-- { mem := memtables[j] mlevels = append(mlevels, mergingIterLevel{ iter: mem.newIter(&i.opts.IterOptions), }) + // Level -1 represents an invalid level such as an iterator from a memtable. + i.iterInfo[mlevelsIndex] = iterInfo{level: -1} + mlevelsIndex++ if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil { rangeDelIters = append(rangeDelIters, rdi) } } // Next are the file levels: L0 sub-levels followed by lower levels. - mlevelsIndex := len(mlevels) levelsIndex := len(levels) mlevels = mlevels[:numMergingLevels] levels = levels[:numLevelIters] @@ -1030,12 +1058,10 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * mlevelsIndex++ } - // Add level iterators for the L0 sublevels, iterating from newest to - // oldest. - for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- { - addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i)) + for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- { + i.iterInfo[mlevelsIndex] = iterInfo{level: 0} + addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j)) } - // Add level iterators for the non-empty non-L0 levels. for level := 1; level < numLevels; level++ { if current.Levels[level].Empty() { @@ -1044,18 +1070,28 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * if i.opts.skipSharedLevels && level >= sharedLevelsStart { continue } + i.iterInfo[mlevelsIndex] = iterInfo{level: level} addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level)) } + buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...) buf.merging.snapshot = i.seqNum rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspan.MergingBuffers), rangeDelIters...) - i.pointKeyIter = pointCollapsingIterator{ - comparer: i.comparer, - merge: i.merge, - seqNum: i.seqNum, + + if i.opts.includeObsoleteKeys { + iiter := &keyspan.InterleavingIter{} + iiter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound) + i.pointKeyIter = iiter + } else { + pcIter := &pointCollapsingIterator{ + comparer: i.comparer, + merge: i.merge, + seqNum: i.seqNum, + } + pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound) + i.pointKeyIter = pcIter } - i.pointKeyIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound) - i.iter = &i.pointKeyIter + i.iter = i.pointKeyIter } // constructRangeKeyIter constructs the range-key iterator stack, populating @@ -1145,7 +1181,10 @@ func (i *scanInternalIterator) lazyValue() LazyValue { // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns // a non-rangedel kind. func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span { - return i.pointKeyIter.iter.Span() + type spanInternalIterator interface { + Span() *keyspan.Span + } + return i.pointKeyIter.(spanInternalIterator).Span() } // unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns diff --git a/scan_internal_test.go b/scan_internal_test.go index 72995251ed..c29485cbdd 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -25,15 +25,190 @@ import ( "github.com/stretchr/testify/require" ) +func TestScanStatistics(t *testing.T) { + var d *DB + type scanInternalReader interface { + ScanStatistics( + ctx context.Context, + lower, upper []byte, + ) (LSMKeyStatistics, error) + } + batches := map[string]*Batch{} + snaps := map[string]*Snapshot{} + ctx := context.TODO() + + getOpts := func() *Options { + opts := &Options{ + FS: vfs.NewMem(), + Logger: testLogger{t: t}, + Comparer: testkeys.Comparer, + FormatMajorVersion: FormatRangeKeys, + BlockPropertyCollectors: []func() BlockPropertyCollector{ + sstable.NewTestKeysBlockPropertyCollector, + }, + } + opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{ + "": remote.NewInMem(), + }) + opts.Experimental.CreateOnShared = true + opts.Experimental.CreateOnSharedLocator = "" + opts.DisableAutomaticCompactions = true + opts.EnsureDefaults() + opts.WithFSDefaults() + return opts + } + cleanup := func() (err error) { + for key, batch := range batches { + err = firstError(err, batch.Close()) + delete(batches, key) + } + for key, snap := range snaps { + err = firstError(err, snap.Close()) + delete(snaps, key) + } + if d != nil { + err = firstError(err, d.Close()) + d = nil + } + return err + } + defer cleanup() + + datadriven.RunTest(t, "testdata/scan_statistics", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + if err := cleanup(); err != nil { + t.Fatal(err) + return err.Error() + } + var err error + d, err = Open("", getOpts()) + require.NoError(t, err) + require.NoError(t, d.SetCreatorID(1)) + return "" + case "snapshot": + s := d.NewSnapshot() + var name string + td.ScanArgs(t, "name", &name) + snaps[name] = s + return "" + case "batch": + var name string + td.MaybeScanArgs(t, "name", &name) + commit := td.HasArg("commit") + b := d.NewIndexedBatch() + require.NoError(t, runBatchDefineCmd(td, b)) + var err error + if commit { + func() { + defer func() { + if r := recover(); r != nil { + err = errors.New(r.(string)) + } + }() + err = b.Commit(nil) + }() + } else if name != "" { + batches[name] = b + } + if err != nil { + return err.Error() + } + count := b.Count() + if commit { + return fmt.Sprintf("committed %d keys\n", count) + } + return fmt.Sprintf("wrote %d keys to batch %q\n", count, name) + case "compact": + if err := runCompactCmd(td, d); err != nil { + return err.Error() + } + return runLSMCmd(td, d) + case "flush": + err := d.Flush() + if err != nil { + return err.Error() + } + return "" + case "commit": + name := pluckStringCmdArg(td, "batch") + b := batches[name] + defer b.Close() + count := b.Count() + require.NoError(t, d.Apply(b, nil)) + delete(batches, name) + return fmt.Sprintf("committed %d keys\n", count) + case "scan-statistics": + var lower, upper []byte + var reader scanInternalReader = d + var b strings.Builder + var showSnapshotPinned = false + var keyKindsToDisplay []InternalKeyKind + var showLevels []string + + for _, arg := range td.CmdArgs { + switch arg.Key { + case "lower": + lower = []byte(arg.Vals[0]) + case "upper": + upper = []byte(arg.Vals[0]) + case "show-snapshot-pinned": + showSnapshotPinned = true + case "keys": + for _, key := range arg.Vals { + keyKindsToDisplay = append(keyKindsToDisplay, base.ParseKind(key)) + } + case "levels": + showLevels = append(showLevels, arg.Vals...) + default: + } + } + stats, err := reader.ScanStatistics(ctx, lower, upper) + if err != nil { + return err.Error() + } + + for _, level := range showLevels { + lvl, err := strconv.Atoi(level) + if err != nil || lvl >= numLevels { + return fmt.Sprintf("invalid level %s", level) + } + + fmt.Fprintf(&b, "Level %d:\n", lvl) + if showSnapshotPinned { + fmt.Fprintf(&b, " compaction pinned count: %d\n", stats.levels[lvl].snapshotPinnedKeys) + } + for _, kind := range keyKindsToDisplay { + fmt.Fprintf(&b, " %s key count: %d\n", kind.String(), stats.levels[lvl].kindsCount[kind]) + } + } + + fmt.Fprintf(&b, "Aggregate:\n") + if showSnapshotPinned { + fmt.Fprintf(&b, " snapshot pinned count: %d\n", stats.accumulated.snapshotPinnedKeys) + } + for _, kind := range keyKindsToDisplay { + fmt.Fprintf(&b, " %s key count: %d\n", kind.String(), stats.accumulated.kindsCount[kind]) + } + return b.String() + default: + return fmt.Sprintf("unknown command %q", td.Cmd) + } + }) +} + func TestScanInternal(t *testing.T) { var d *DB type scanInternalReader interface { ScanInternal( ctx context.Context, - lower, upper []byte, visitPointKey func(key *InternalKey, value LazyValue) error, + lower, upper []byte, + visitPointKey func(key *InternalKey, value LazyValue, iterInfo iterInfo) error, visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, - visitSharedFile func(sst *SharedSSTMeta) error) error + visitSharedFile func(sst *SharedSSTMeta) error, + includeObsoleteKeys bool, + ) error } batches := map[string]*Batch{} snaps := map[string]*Snapshot{} @@ -237,18 +412,24 @@ func TestScanInternal(t *testing.T) { } } } - err := reader.ScanInternal(context.TODO(), lower, upper, func(key *InternalKey, value LazyValue) error { - v := value.InPlaceValue() - fmt.Fprintf(&b, "%s (%s)\n", key, v) - return nil - }, func(start, end []byte, seqNum uint64) error { - fmt.Fprintf(&b, "%s-%s#%d,RANGEDEL\n", start, end, seqNum) - return nil - }, func(start, end []byte, keys []rangekey.Key) error { - s := keyspan.Span{Start: start, End: end, Keys: keys} - fmt.Fprintf(&b, "%s\n", s.String()) - return nil - }, fileVisitor) + err := reader.ScanInternal(context.TODO(), lower, upper, + func(key *InternalKey, value LazyValue, _ iterInfo) error { + v := value.InPlaceValue() + fmt.Fprintf(&b, "%s (%s)\n", key, v) + return nil + }, + func(start, end []byte, seqNum uint64) error { + fmt.Fprintf(&b, "%s-%s#%d,RANGEDEL\n", start, end, seqNum) + return nil + }, + func(start, end []byte, keys []rangekey.Key) error { + s := keyspan.Span{Start: start, End: end, Keys: keys} + fmt.Fprintf(&b, "%s\n", s.String()) + return nil + }, + fileVisitor, + false, + ) if err != nil { return err.Error() } diff --git a/snapshot.go b/snapshot.go index 6617cce5f5..af66042ea4 100644 --- a/snapshot.go +++ b/snapshot.go @@ -66,25 +66,33 @@ func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) *Iter func (s *Snapshot) ScanInternal( ctx context.Context, lower, upper []byte, - visitPointKey func(key *InternalKey, value LazyValue) error, + visitPointKey func(key *InternalKey, value LazyValue, iterInfo iterInfo) error, visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, + includeObsoleteKeys bool, ) error { if s.db == nil { panic(ErrClosed) } - iter := s.db.newInternalIter(s, &scanInternalOptions{ + scanInternalOpts := &scanInternalOptions{ + visitPointKey: visitPointKey, + visitRangeDel: visitRangeDel, + visitRangeKey: visitRangeKey, + visitSharedFile: visitSharedFile, + skipSharedLevels: visitSharedFile != nil, + includeObsoleteKeys: includeObsoleteKeys, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower, UpperBound: upper, }, - skipSharedLevels: visitSharedFile != nil, - }) + } + + iter := s.db.newInternalIter(s, scanInternalOpts) defer iter.close() - return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) + return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts) } // Close closes the snapshot, releasing its resources. Close must be called. diff --git a/testdata/scan_statistics b/testdata/scan_statistics new file mode 100644 index 0000000000..9d8e07d725 --- /dev/null +++ b/testdata/scan_statistics @@ -0,0 +1,134 @@ + +reset +---- + +batch commit +set b d +set e foo +---- +committed 2 keys + +scan-statistics lower=b upper=e keys=(SET) +---- +Aggregate: + SET key count: 0 + +flush +---- + +scan-statistics lower=b upper=e keys=(SET) levels=(0) +---- +Level 0: + SET key count: 1 +Aggregate: + SET key count: 1 + +scan-statistics lower=b upper=f keys=(SET) levels=(0) +---- +Level 0: + SET key count: 2 +Aggregate: + SET key count: 2 + +scan-statistics lower=f upper=l keys=(SET) +---- +Aggregate: + SET key count: 0 + +batch commit +del b +del e +---- +committed 2 keys + +flush +---- + +scan-statistics lower=b upper=f keys=(SET, DEL) levels=(0) +---- +Level 0: + SET key count: 2 + DEL key count: 2 +Aggregate: + SET key count: 2 + DEL key count: 2 + +reset +---- + +batch commit +set b hi +---- +committed 1 keys + +flush +---- + +batch commit +set b hello +---- +committed 1 keys + +flush +---- + +compact a-z +---- +6: + 000008:[b#0,SET-b#0,SET] + +scan-statistics lower=b upper=f keys=(SET) levels=(6) +---- +Level 6: + SET key count: 1 +Aggregate: + SET key count: 1 + +batch commit +set c a +---- +committed 1 keys + +flush +---- + +scan-statistics lower=b upper=f keys=(SET) levels=(0, 6) +---- +Level 0: + SET key count: 1 +Level 6: + SET key count: 1 +Aggregate: + SET key count: 2 + +reset +---- + +batch commit +set a b +---- +committed 1 keys + +flush +---- + +snapshot name=first +---- + +batch commit +set a c +---- +committed 1 keys + +flush +---- + +compact a-z +---- +6: + 000008:[a#11,SET-a#0,SET] + +scan-statistics lower=a upper=z show-snapshot-pinned +---- +Aggregate: + snapshot pinned count: 1