Skip to content

Commit

Permalink
*: add skip-shared iteration mode to ScanInternal
Browse files Browse the repository at this point in the history
This change adds a skip-shared iteration mode to ScanInternal(),
allowing the caller to pass in a Visitor function to expose files
in shareable levels (defined as L5 and below) instead of individually
visiting all the keys in those files. The file's bounds are truncated
to keys within the scan bounds before calling the visitor function.
If a file in a shareable level is not marked as shared according
to objstorage.Provider, we return a special error, allowing the caller
to restart scanning in non-skip-shared mode or so.

This change also updates ScanInternal to do point-key collapsing,
so we return only one point at most per user key.
  • Loading branch information
itsbilal committed Apr 18, 2023
1 parent d54c329 commit 101876a
Show file tree
Hide file tree
Showing 9 changed files with 1,209 additions and 357 deletions.
13 changes: 12 additions & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2805,7 +2805,18 @@ func (d *DB) runCompaction(
ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
}
}
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, fileNum.DiskFileNum(), objstorage.CreateOptions{} /* TODO */)
// Prefer shared storage if present.
//
// TODO(bilal): This might be inefficient for short-lived files in higher
// levels if we're only writing to shared storage and not double-writing
// to local storage. Either implement double-writing functionality, or
// set PreferSharedStorage to c.outputLevel.level >= 5. The latter needs
// some careful handling around move compactions to ensure all files in
// lower levels are in shared storage.
createOpts := objstorage.CreateOptions{
PreferSharedStorage: true,
}
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, fileNum.DiskFileNum(), createOpts)
if err != nil {
return err
}
Expand Down
46 changes: 31 additions & 15 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,28 +1116,47 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
// method, while the range deletion deleting that key must be exposed using
// visitRangeDel. Keys that would be masked by range key masking (if an
// appropriate prefix were set) should be exposed, alongside the range key
// that would have masked it.
// that would have masked it. This method also collapses all point keys into
// one InternalKey; so only one internal key at most per user key is returned
// to visitPointKey.
//
// If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
// mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
// their metadatas truncated to [lower, upper) and passed into visitSharedFile.
// ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
// sstable in L5 or L6 is found that is not in shared storage according to
// provider.IsShared. Examples of when this could happen could be if Pebble
// started writing sstables before a creator ID was set (as creator IDs are
// necessary to enable shared storage) resulting in some lower level SSTs being
// on non-shared storage. Skip-shared iteration is invalid in those cases.
func (d *DB) ScanInternal(
ctx context.Context,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []keyspan.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
) error {
iter := d.newInternalIter(nil /* snapshot */, &IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
},
skipSharedLevels: visitSharedFile != nil,
})
defer iter.close()
return scanInternalImpl(lower, iter, visitPointKey, visitRangeDel, visitRangeKey)
return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
}

// NewInternalIter constructs and returns a new scanInternalIterator on this db.
// newInternalIter constructs and returns a new scanInternalIterator on this db.
// If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
// to the internal iterator.
//
// TODO(bilal): This method has a lot of similarities with db.newIter as well as
// finishInitializingIter. Both pairs of methods should be refactored to reduce
// this duplication.
func (d *DB) newInternalIter(s *Snapshot, o *IterOptions) *scanInternalIterator {
func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalIterator {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand All @@ -1160,6 +1179,7 @@ func (d *DB) newInternalIter(s *Snapshot, o *IterOptions) *scanInternalIterator
buf := iterAllocPool.Get().(*iterAlloc)
dbi := &scanInternalIterator{
comparer: d.opts.Comparer,
merge: d.opts.Merger.Merge,
readState: readState,
alloc: buf,
newIters: d.newIters,
Expand Down Expand Up @@ -1193,13 +1213,9 @@ func finishInitializingInternalIter(buf *iterAlloc, i *scanInternalIterator) *sc

// For internal iterators, we skip the lazy combined iteration optimization
// entirely, and create the range key iterator stack directly.
if i.rangeKey == nil {
i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts)
i.constructRangeKeyIter()
} else {
i.rangeKey.iterConfig.SetBounds(i.opts.LowerBound, i.opts.UpperBound)
}
i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts.IterOptions)
i.constructRangeKeyIter()

// Wrap the point iterator (currently i.iter) with an interleaving
// iterator that interleaves range keys pulled from
Expand Down
7 changes: 7 additions & 0 deletions internal/base/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ func (k InternalKey) Clone() InternalKey {
}
}

// CopyFrom converts this InternalKey into a clone of the passed-in InternalKey,
// reusing any space already used for the current UserKey.
func (k *InternalKey) CopyFrom(k2 InternalKey) {
k.UserKey = append(k.UserKey[:0], k2.UserKey...)
k.Trailer = k2.Trailer
}

// String returns a string representation of the key.
func (k InternalKey) String() string {
return fmt.Sprintf("%s#%d,%d", FormatBytes(k.UserKey), k.SeqNum(), k.Kind())
Expand Down
Loading

0 comments on commit 101876a

Please sign in to comment.