Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: rework merging iterator range deletion handling #3600

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2284,7 +2284,7 @@ func (i *flushableBatchIter) extractValue() base.LazyValue {
return base.LazyValue{}
}
kind := InternalKeyKind(p[0])
if kind > InternalKeyKindMax {
if kind > base.InternalKeyKindDurableMax {
i.err = base.CorruptionErrorf("corrupted batch")
return base.LazyValue{}
}
Expand Down
2 changes: 1 addition & 1 deletion batchrepr/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, o
return 0, nil, nil, false, nil
}
kind = base.InternalKeyKind((*r)[0])
if kind > base.InternalKeyKindMax {
if kind > base.InternalKeyKindDurableMax {
return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0])
}
*r, ukey, ok = DecodeStr((*r)[1:])
Expand Down
20 changes: 9 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
Expand Down Expand Up @@ -1431,30 +1432,26 @@ func (i *Iterator) constructPointIter(
} else {
i.batch.initInternalIter(&i.opts, &i.batchPointIter)
i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
mil := mergingIterLevel{iter: &i.batchPointIter, getTombstone: nil}
// Only include the batch's rangedel iterator if it's non-empty.
// This requires some subtle logic in the case a rangedel is later
// written to the batch and the view of the batch is refreshed
// during a call to SetOptions—in this case, we need to reconstruct
// the point iterator to add the batch rangedel iterator.
var rangeDelIter keyspan.FragmentIterator
if i.batchRangeDelIter.Count() > 0 {
rangeDelIter = &i.batchRangeDelIter
mil.iter, mil.getTombstone = rangedel.Interleave(&i.comparer, &i.batchPointIter, &i.batchRangeDelIter)
}
mlevels = append(mlevels, mergingIterLevel{
iter: &i.batchPointIter,
rangeDelIter: rangeDelIter,
})
mlevels = append(mlevels, mil)
}
}

if !i.batchOnlyIter {
// Next are the memtables.
for j := len(memtables) - 1; j >= 0; j-- {
mem := memtables[j]
mlevels = append(mlevels, mergingIterLevel{
iter: mem.newIter(&i.opts),
rangeDelIter: mem.newRangeDelIter(&i.opts),
})
mil := mergingIterLevel{}
mil.iter, mil.getTombstone = rangedel.Interleave(&i.comparer, mem.newIter(&i.opts), mem.newRangeDelIter(&i.opts))
mlevels = append(mlevels, mil)
}

// Next are the file levels: L0 sub-levels followed by lower levels.
Expand All @@ -1467,10 +1464,11 @@ func (i *Iterator) constructPointIter(
li := &levels[levelsIndex]

li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
li.interleaveRangeDeletions = true
li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
mlevels[mlevelsIndex].levelIter = li
mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
mlevels[mlevelsIndex].getTombstone = li.getTombstone

levelsIndex++
mlevelsIndex++
Expand Down
10 changes: 6 additions & 4 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/sstable"
)

Expand Down Expand Up @@ -212,10 +213,11 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
if err != nil {
return nil, err
}
mlevels = append(mlevels, mergingIterLevel{
iter: pointIter,
rangeDelIter: rangeDelIter,
})
mil := mergingIterLevel{iter: pointIter, getTombstone: nil}
if rangeDelIter != nil {
mil.iter, mil.getTombstone = rangedel.Interleave(&it.comparer, mil.iter, rangeDelIter)
}
mlevels = append(mlevels, mil)
}
}

Expand Down
6 changes: 3 additions & 3 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func ingestSynthesizeShared(
// a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
// correct bound, we just use the maximum key kind (which sorts first).
// Similarly, we use the smallest key kind for the largest key.
smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMax)
smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindDurableMax)
largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
if sm.LargestPointKey.IsExclusiveSentinel() {
largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
Expand Down Expand Up @@ -220,12 +220,12 @@ func ingestLoad1External(
if e.EndKeyIsInclusive {
meta.ExtendPointKeyBounds(
opts.Comparer.Compare,
base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindDurableMax),
base.MakeInternalKey(largestCopy, 0, 0))
} else {
meta.ExtendPointKeyBounds(
opts.Comparer.Compare,
base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindDurableMax),
base.MakeRangeDeleteSentinelKey(largestCopy))
}
}
Expand Down
28 changes: 26 additions & 2 deletions internal/base/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ const (
// heuristics, but is not required to be accurate for correctness.
InternalKeyKindDeleteSized InternalKeyKind = 23

InternalKeyKindSpanStart InternalKeyKind = 24
InternalKeyKindSpanEnd InternalKeyKind = 25

// This maximum value isn't part of the file format. Future extensions may
// increase this value.
//
Expand All @@ -105,7 +108,12 @@ const (
// which sorts 'less than or equal to' any other valid internalKeyKind, when
// searching for any kind of internal key formed by a certain user key and
// seqNum.
InternalKeyKindMax InternalKeyKind = 23
InternalKeyKindMax InternalKeyKind = 25

// NB: This is less than InternalKeyKindSpanStart and InternalKeyKindSpanEnd
// because those key kinds are never used in durable formats; only as
// special in-memory indicators.
InternalKeyKindDurableMax InternalKeyKind = 23

// Internal to the sstable format. Not exposed by any sstable iterator.
// Declared here to prevent definition of valid key kinds that set this bit.
Expand Down Expand Up @@ -157,6 +165,8 @@ var internalKeyKindNames = []string{
InternalKeyKindRangeKeyDelete: "RANGEKEYDEL",
InternalKeyKindIngestSST: "INGESTSST",
InternalKeyKindDeleteSized: "DELSIZED",
InternalKeyKindSpanStart: "SPANSTART",
InternalKeyKindSpanEnd: "SPANEND",
InternalKeyKindInvalid: "INVALID",
}

Expand Down Expand Up @@ -249,6 +259,8 @@ var kindsMap = map[string]InternalKeyKind{
"RANGEKEYDEL": InternalKeyKindRangeKeyDelete,
"INGESTSST": InternalKeyKindIngestSST,
"DELSIZED": InternalKeyKindDeleteSized,
"SPANSTART": InternalKeyKindSpanStart,
"SPANEND": InternalKeyKindSpanEnd,
}

// ParseInternalKey parses the string representation of an internal key. The
Expand Down Expand Up @@ -476,7 +488,8 @@ func (k InternalKey) IsExclusiveSentinel() bool {
}
switch kind := k.Kind(); kind {
case InternalKeyKindRangeDelete, InternalKeyKindRangeKeyDelete,
InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet,
InternalKeyKindSpanStart, InternalKeyKindSpanEnd:
return true
default:
return false
Expand Down Expand Up @@ -561,3 +574,14 @@ func (kv *InternalKV) Visible(snapshot, batchSnapshot uint64) bool {
func (kv *InternalKV) IsExclusiveSentinel() bool {
return kv.K.IsExclusiveSentinel()
}

// String renders the key/value pair for debugging output.
//
// A nil receiver formats as "<nil>". When the value carries a Fetcher, the
// raw ValueOrHandle bytes are printed in hex inside a "<valblk:...>" marker;
// otherwise ValueOrHandle is formatted with FormatBytes.
func (kv *InternalKV) String() string {
	switch {
	case kv == nil:
		return "<nil>"
	case kv.V.Fetcher != nil:
		return fmt.Sprintf("%s=<valblk:%x>", kv.K, kv.V.ValueOrHandle)
	default:
		return fmt.Sprintf("%s:%s", kv.K, FormatBytes(kv.V.ValueOrHandle))
	}
}
2 changes: 1 addition & 1 deletion internal/base/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestInvalidInternalKey(t *testing.T) {
"\x01\x02\x03\x04\x05\x06\x07",
"foo",
"foo\x08\x07\x06\x05\x04\x03\x02",
"foo\x18\x07\x06\x05\x04\x03\x02\x01",
"foo\x1a\x07\x06\x05\x04\x03\x02\x01",
}
for _, tc := range testCases {
k := DecodeInternalKey([]byte(tc))
Expand Down
17 changes: 15 additions & 2 deletions internal/keyspan/interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ type InterleavingIterOpts struct {
// end keys of spans (in addition to the start keys, which are always
// interleaved).
InterleaveEndKeys bool
// UseBoundaryKeyKinds configures the interleaving iterator to interleave
// keys using SPANSTART and SPANEND key kinds for start and end boundaries
// respectively, rather than the key kind of the first key in the Span.
UseBoundaryKeyKinds bool
}

// Init initializes the InterleavingIter to interleave point keys from pointIter
Expand Down Expand Up @@ -979,7 +983,12 @@ func (i *InterleavingIter) yieldPointKey() *base.InternalKV {

func (i *InterleavingIter) yieldSyntheticSpanStartMarker(lowerBound []byte) *base.InternalKV {
i.spanMarker.K.UserKey = i.startKey()
i.spanMarker.K.Trailer = base.MakeTrailer(base.InternalKeySeqNumMax, i.span.Keys[0].Kind())

kind := base.InternalKeyKindSpanStart
if !i.opts.UseBoundaryKeyKinds {
kind = i.span.Keys[0].Kind()
}
i.spanMarker.K.Trailer = base.MakeTrailer(base.InternalKeySeqNumMax, kind)

// Truncate the key we return to our lower bound if we have one. Note that
// we use the lowerBound function parameter, not i.lower. The lowerBound
Expand Down Expand Up @@ -1015,8 +1024,12 @@ func (i *InterleavingIter) yieldSyntheticSpanStartMarker(lowerBound []byte) *bas
}

func (i *InterleavingIter) yieldSyntheticSpanEndMarker() *base.InternalKV {
kind := base.InternalKeyKindSpanEnd
if !i.opts.UseBoundaryKeyKinds {
kind = i.span.Keys[0].Kind()
}
i.spanMarker.K.UserKey = i.endKey()
i.spanMarker.K.Trailer = base.MakeTrailer(base.InternalKeySeqNumMax, i.span.Keys[0].Kind())
i.spanMarker.K.Trailer = base.MakeTrailer(base.InternalKeySeqNumMax, kind)
return i.verify(&i.spanMarker)
}

Expand Down
1 change: 1 addition & 0 deletions internal/keyspan/interleaving_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func runInterleavingIterTest(t *testing.T, filename string) {
hooks.threshold = []byte(strings.Join(cmdArg.Vals, ""))
}
opts.InterleaveEndKeys = td.HasArg("interleave-end-keys")
opts.UseBoundaryKeyKinds = td.HasArg("use-boundary-key-kinds")
iter.Init(testkeys.Comparer, &pointIter, keyspanIter, opts)
// Clear any previous bounds.
pointIter.SetBounds(nil, nil)
Expand Down
7 changes: 4 additions & 3 deletions internal/keyspan/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,11 @@ func (s Span) Visible(snapshot uint64) Span {
// VisibleAt requires the Span's keys be in ByTrailerDesc order. It panics if
// the span's keys are sorted in a different order.
func (s *Span) VisibleAt(snapshot uint64) bool {
if s.KeysOrder != ByTrailerDesc {
if s == nil {
return false
} else if s.KeysOrder != ByTrailerDesc {
panic("pebble: span's keys unexpectedly not in trailer order")
}
if len(s.Keys) == 0 {
} else if len(s.Keys) == 0 {
return false
} else if first := s.Keys[0].SeqNum(); first&base.InternalKeySeqNumBatch != 0 {
// Only visible batch keys are included when an Iterator's batch spans
Expand Down
52 changes: 52 additions & 0 deletions internal/keyspan/testdata/interleaving_iter
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,58 @@ Span: f-g:{(#6,RANGEDEL)}
-- SpanChanged(nil)
.

iter interleave-end-keys use-boundary-key-kinds
first
next
next
next
next
next
next
next
next
next
next
----
-- SpanChanged(nil)
-- SpanChanged(nil)
PointKey: a@4#8,SET
Span: <invalid>
-
-- SpanChanged(b-e:{(#5,RANGEDEL)})
PointKey: b#72057594037927935,SPANSTART
Span: b-e:{(#5,RANGEDEL)}
-
PointKey: c@11#8,SET
Span: b-e:{(#5,RANGEDEL)}
-
PointKey: c@3#8,SET
Span: b-e:{(#5,RANGEDEL)}
-
PointKey: c@1#4,SET
Span: b-e:{(#5,RANGEDEL)}
-
PointKey: d@5#3,SET
Span: b-e:{(#5,RANGEDEL)}
-
PointKey: e#72057594037927935,SPANEND
Span: b-e:{(#5,RANGEDEL)}
-
-- SpanChanged(nil)
PointKey: e@9#2,SET
Span: <invalid>
-
-- SpanChanged(f-g:{(#6,RANGEDEL)})
PointKey: f#72057594037927935,SPANSTART
Span: f-g:{(#6,RANGEDEL)}
-
PointKey: g#72057594037927935,SPANEND
Span: f-g:{(#6,RANGEDEL)}
-
-- SpanChanged(nil)
.


iter interleave-end-keys
last
prev
Expand Down
48 changes: 48 additions & 0 deletions internal/rangedel/rangedel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package rangedel

import (
"sync"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
)
Expand Down Expand Up @@ -41,3 +43,49 @@ func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) keyspan.Span {
}),
}
}

// Interleave combines a point iterator with a range deletion iterator. The
// returned iterator yields the point keys of iter and, interleaved among them,
// the start and end boundaries of the range deletions in rangeDelIter. The
// boundary keys are requested with the SPANSTART and SPANEND key kinds
// (UseBoundaryKeyKinds) and end boundaries are included (InterleaveEndKeys).
//
// The second return value is a function that may be used to retrieve the
// range tombstone overlapping the current iterator position, if any.
//
// If rangeDelIter is nil, iter is returned unchanged and the returned function
// is nil (not a function that returns nil), so callers must nil-check it
// before calling it.
//
// The returned iterator is taken from a pool and handed back to it on Close,
// so it must only be closed once.
func Interleave(
	comparer *base.Comparer, iter base.InternalIterator, rangeDelIter keyspan.FragmentIterator,
) (base.InternalIterator, func() *keyspan.Span) {
	// Without range deletions there is nothing to interleave: skip the
	// wrapper entirely and hand back the point iterator with a nil getter.
	if rangeDelIter == nil {
		return iter, nil
	}

	opts := keyspan.InterleavingIterOpts{
		InterleaveEndKeys:   true,
		UseBoundaryKeyKinds: true,
	}
	pooled := interleavingIterPool.Get().(*interleavingIter)
	pooled.Init(comparer, iter, rangeDelIter, opts)
	return pooled, pooled.Span
}

// interleavingIterPool recycles interleavingIter values. Interleave takes an
// iterator from the pool and interleavingIter.Close puts it back after
// resetting it to its zero value.
var interleavingIterPool = sync.Pool{
New: func() interface{} {
return &interleavingIter{}
},
}

// interleavingIter embeds a keyspan.InterleavingIter and overrides Close so
// that the iterator is returned to interleavingIterPool once closed.
type interleavingIter struct {
keyspan.InterleavingIter
}

// Close closes the interleaving iterator and returns the interleaving iterator
// to the pool. It returns the error, if any, from closing the embedded
// keyspan.InterleavingIter.
//
// The receiver must not be used after Close: it is zeroed and handed back to
// interleavingIterPool.
func (i *interleavingIter) Close() error {
// Capture the close error first; the reset below wipes the embedded
// iterator's state.
err := i.InterleavingIter.Close()
// Zero the struct so the pooled value carries no state from this use.
*i = interleavingIter{}
interleavingIterPool.Put(i)
return err
}
4 changes: 2 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,8 +1820,8 @@ func (i *Iterator) nextPrefix() IterValidityState {
return i.iterValidityState
}
if invariants.Enabled && !i.equal(i.iterKV.K.UserKey, i.key) {
i.opts.getLogger().Fatalf("pebble: invariant violation: Nexting internal iterator from iterPosPrev landed on %q, not %q",
i.iterKV.K.UserKey, i.key)
panic(errors.AssertionFailedf("pebble: invariant violation: Nexting internal iterator from iterPosPrev landed on %q, not %q",
i.iterKV.K.UserKey, i.key))
}
}
// The internal iterator is now positioned at i.key. Advance to the next
Expand Down
Loading
Loading