diff --git a/compaction.go b/compaction.go index b27d54dd1c..f5da98d7aa 100644 --- a/compaction.go +++ b/compaction.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/compact" - "github.com/cockroachdb/pebble/internal/invalidating" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl" @@ -239,12 +238,6 @@ type compaction struct { smallest InternalKey largest InternalKey - // rangeDelInterlaving is an interleaving iterator for range deletions, that - // interleaves range tombstones among the point keys. - rangeDelInterleaving keyspan.InterleavingIter - // rangeKeyInterleaving is the interleaving iter for range keys. - rangeKeyInterleaving keyspan.InterleavingIter - // A list of objects to close when the compaction finishes. Used by input // iteration to keep rangeDelIters open for the lifetime of the compaction, // and only close them when the compaction finishes. @@ -699,22 +692,27 @@ func (c *compaction) allowZeroSeqNum() bool { return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything() } -func (c *compaction) newInputIter( +// newInputIters returns iterators over all the input tables in a compaction. +func (c *compaction) newInputIters( newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter, -) (_ internalIterator, retErr error) { +) ( + pointIter internalIterator, + rangeDelIter, rangeKeyIter keyspan.FragmentIterator, + retErr error, +) { // Validate the ordering of compaction input files for defense in depth. 
if len(c.flushing) == 0 { if c.startLevel.level >= 0 { err := manifest.CheckOrdering(c.cmp, c.formatKey, manifest.Level(c.startLevel.level), c.startLevel.files.Iter()) if err != nil { - return nil, err + return nil, nil, nil, err } } err := manifest.CheckOrdering(c.cmp, c.formatKey, manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter()) if err != nil { - return nil, err + return nil, nil, nil, err } if c.startLevel.level == 0 { if c.startLevel.l0SublevelInfo == nil { @@ -724,7 +722,7 @@ func (c *compaction) newInputIter( err := manifest.CheckOrdering(c.cmp, c.formatKey, info.sublevel, info.Iter()) if err != nil { - return nil, err + return nil, nil, nil, err } } } @@ -736,7 +734,7 @@ func (c *compaction) newInputIter( err := manifest.CheckOrdering(c.cmp, c.formatKey, manifest.Level(interLevel.level), interLevel.files.Iter()) if err != nil { - return nil, err + return nil, nil, nil, err } } } @@ -901,13 +899,13 @@ func (c *compaction) newInputIter( for _, info := range c.startLevel.l0SublevelInfo { sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil} if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil { - return nil, err + return nil, nil, nil, err } } continue } if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil { - return nil, err + return nil, nil, nil, err } } } @@ -916,9 +914,9 @@ func (c *compaction) newInputIter( // of a *mergingIter. This is possible, for example, when performing a flush // of a single memtable. Otherwise, combine all the iterators into a merging // iter. - iter := iters[0] + pointIter = iters[0] if len(iters) > 1 { - iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...) + pointIter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...) 
} // In normal operation, levelIter iterates over the point operations in a @@ -937,8 +935,7 @@ func (c *compaction) newInputIter( if len(rangeDelIters) > 0 { mi := &keyspanimpl.MergingIter{} mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...) - c.rangeDelInterleaving.Init(c.comparer, iter, mi, keyspan.InterleavingIterOpts{}) - iter = &c.rangeDelInterleaving + rangeDelIter = mi } // If there are range key iterators, we need to combine them using @@ -946,12 +943,12 @@ func (c *compaction) newInputIter( if len(rangeKeyIters) > 0 { mi := &keyspanimpl.MergingIter{} mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...) + // TODO(radu): why do we have a defragmenter here but not above? di := &keyspan.DefragmentingIter{} di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers)) - c.rangeKeyInterleaving.Init(c.comparer, iter, di, keyspan.InterleavingIterOpts{}) - iter = &c.rangeKeyInterleaving + rangeKeyIter = di } - return iter, nil + return pointIter, rangeDelIter, rangeKeyIter, nil } func (c *compaction) newRangeDelIter( @@ -2580,15 +2577,13 @@ func (d *DB) runCompaction( c.bufferPool.Init(12) defer c.bufferPool.Release() - iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter) + pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter) if err != nil { return nil, pendingOutputs, stats, err } c.allowedZeroSeqNum = c.allowZeroSeqNum() - iiter = invalidating.MaybeWrapIfInvariants(iiter) cfg := compact.IterConfig{ - Cmp: c.cmp, - Equal: c.equal, + Comparer: c.comparer, Merge: d.merge, TombstoneElision: c.delElision, RangeKeyElision: c.rangeKeyElision, @@ -2597,7 +2592,7 @@ func (d *DB) runCompaction( IneffectualSingleDeleteCallback: d.opts.Experimental.IneffectualSingleDeleteCallback, SingleDeleteInvariantViolationCallback: d.opts.Experimental.SingleDeleteInvariantViolationCallback, } - iter 
:= compact.NewIter(cfg, iiter) + iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter) var ( createdFiles []base.DiskFileNum @@ -2943,14 +2938,14 @@ func (d *DB) runCompaction( // Since the keys' Suffix and Value fields are not deep cloned, the // underlying blockIter must be kept open for the lifetime of the // compaction. - iter.AddTombstoneSpan(c.rangeDelInterleaving.Span()) + iter.AddTombstoneSpan(iter.RangeDelSpan()) continue case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete: // Range keys are handled in the same way as range tombstones. // Since the keys' Suffix and Value fields are not deep cloned, the // underlying blockIter must be kept open for the lifetime of the // compaction. - iter.AddRangeKeySpan(c.rangeKeyInterleaving.Span()) + iter.AddRangeKeySpan(iter.RangeKeySpan()) continue } if tw == nil { diff --git a/compaction_test.go b/compaction_test.go index 2a710ecb2d..4710dec29e 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -2457,7 +2457,7 @@ func TestCompactionCheckOrdering(t *testing.T) { return iterSet{point: &errorIter{}}, nil } result := "OK" - _, err := c.newInputIter(newIters, nil) + _, _, _, err := c.newInputIters(newIters, nil) if err != nil { result = fmt.Sprint(err) } diff --git a/internal/compact/iterator.go b/internal/compact/iterator.go index 578d4438f6..074326e950 100644 --- a/internal/compact/iterator.go +++ b/internal/compact/iterator.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/bytealloc" + "github.com/cockroachdb/pebble/internal/invalidating" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/rangekey" @@ -154,7 +155,16 @@ type Iter struct { cfg IterConfig - iter base.InternalIterator + // rangeDelInterleaving is an interleaving iterator for range deletions, that + // 
interleaves range tombstones among the point keys. + rangeDelInterleaving keyspan.InterleavingIter + // rangeKeyInterleaving is the interleaving iter for range keys. + rangeKeyInterleaving keyspan.InterleavingIter + + // iter is the iterator which interleaves points with RANGEDELs and range + // keys. + iter base.InternalIterator + delElider pointTombstoneElider rangeDelElider rangeTombstoneElider rangeKeyElider rangeTombstoneElider @@ -241,9 +251,8 @@ type Iter struct { // IterConfig contains the parameters necessary to create a compaction iterator. type IterConfig struct { - Cmp base.Compare - Equal base.Equal - Merge base.Merge + Comparer *base.Comparer + Merge base.Merge // The snapshot sequence numbers that need to be maintained. These sequence // numbers define the snapshot stripes. @@ -290,17 +299,33 @@ const ( // NewIter creates a new compaction iterator. See the comment for Iter for a // detailed description. -func NewIter(cfg IterConfig, iter base.InternalIterator) *Iter { +// rangeDelIter and rangeKeyIter can be nil. 
+func NewIter( + cfg IterConfig, + pointIter base.InternalIterator, + rangeDelIter, rangeKeyIter keyspan.FragmentIterator, +) *Iter { cfg.ensureDefaults() i := &Iter{ - cmp: cfg.Cmp, - cfg: cfg, - iter: iter, + cmp: cfg.Comparer.Compare, + cfg: cfg, + } + + iter := pointIter + if rangeDelIter != nil { + i.rangeDelInterleaving.Init(cfg.Comparer, iter, rangeDelIter, keyspan.InterleavingIterOpts{}) + iter = &i.rangeDelInterleaving + } + if rangeKeyIter != nil { + i.rangeKeyInterleaving.Init(cfg.Comparer, iter, rangeKeyIter, keyspan.InterleavingIterOpts{}) + iter = &i.rangeKeyInterleaving } - i.frontiers.Init(cfg.Cmp) - i.delElider.Init(cfg.Cmp, cfg.TombstoneElision) - i.rangeDelElider.Init(cfg.Cmp, cfg.TombstoneElision) - i.rangeKeyElider.Init(cfg.Cmp, cfg.RangeKeyElision) + i.iter = invalidating.MaybeWrapIfInvariants(iter) + + i.frontiers.Init(i.cmp) + i.delElider.Init(i.cmp, cfg.TombstoneElision) + i.rangeDelElider.Init(i.cmp, cfg.TombstoneElision) + i.rangeKeyElider.Init(i.cmp, cfg.RangeKeyElision) return i } @@ -348,6 +373,9 @@ func (i *Iter) First() (*base.InternalKey, []byte) { } // Next has the same semantics as InternalIterator.Next. +// Note that when Next returns a RANGEDEL key, the caller can call +// RangeDelSpan() to get the corresponding span. Similarly, when Next returns a +// range key, the caller can use RangeKeySpan(). func (i *Iter) Next() (*base.InternalKey, []byte) { if i.err != nil { return nil, nil @@ -581,6 +609,20 @@ func (i *Iter) Next() (*base.InternalKey, []byte) { return nil, nil } +// RangeDelSpan returns the range deletion span corresponding to the current +// key. Can only be called right after a Next() call that returned a RANGEDEL +// key. +func (i *Iter) RangeDelSpan() *keyspan.Span { + return i.rangeDelInterleaving.Span() +} + +// RangeKeySpan returns the range key span corresponding to the current +// key. Can only be called right after a Next() call that returned a range +// key. 
+func (i *Iter) RangeKeySpan() *keyspan.Span { + return i.rangeKeyInterleaving.Span() +} + func (i *Iter) closeValueCloser() error { if i.valueCloser == nil { return nil @@ -670,7 +712,7 @@ func (i *Iter) nextInStripeHelper() stripeChangeType { // number ordering within a user key. If the previous key was one // of these keys, we consider the new key a `newStripeNewKey` to // reflect that it's the beginning of a new stream of point keys. - if i.key.IsExclusiveSentinel() || !i.cfg.Equal(i.key.UserKey, kv.K.UserKey) { + if i.key.IsExclusiveSentinel() || !i.cfg.Comparer.Equal(i.key.UserKey, kv.K.UserKey) { i.curSnapshotIdx, i.curSnapshotSeqNum = i.cfg.Snapshots.IndexAndSeqNum(kv.SeqNum()) return newStripeNewKey } @@ -1421,7 +1463,7 @@ func (i *Iter) RangeKeysUpTo(key []byte) []keyspan.Span { } if y > start { keysDst := dst.Keys[usedLen:cap(dst.Keys)] - rangekey.Coalesce(i.cmp, i.cfg.Equal, s.Keys[start:y], &keysDst) + rangekey.Coalesce(i.cmp, i.cfg.Comparer.Equal, s.Keys[start:y], &keysDst) if y == len(s.Keys) { // This is the last snapshot stripe. Unsets and deletes can be elided. keysDst = elideInLastStripe(keysDst) @@ -1433,7 +1475,7 @@ func (i *Iter) RangeKeysUpTo(key []byte) []keyspan.Span { } if y < len(s.Keys) { keysDst := dst.Keys[usedLen:cap(dst.Keys)] - rangekey.Coalesce(i.cmp, i.cfg.Equal, s.Keys[y:], &keysDst) + rangekey.Coalesce(i.cmp, i.cfg.Comparer.Equal, s.Keys[y:], &keysDst) keysDst = elideInLastStripe(keysDst) usedLen += len(keysDst) dst.Keys = append(dst.Keys, keysDst...) 
diff --git a/internal/compact/iterator_test.go b/internal/compact/iterator_test.go index a329da29b5..e05c012fbc 100644 --- a/internal/compact/iterator_test.go +++ b/internal/compact/iterator_test.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" - "github.com/cockroachdb/pebble/internal/invalidating" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/rangekey" "github.com/stretchr/testify/require" @@ -61,7 +60,7 @@ func TestCompactionIter(t *testing.T) { ineffectualSingleDeleteKeys = ineffectualSingleDeleteKeys[:0] invariantViolationSingleDeleteKeys = invariantViolationSingleDeleteKeys[:0] } - newIter := func() (iter *Iter, rangeKeyInterleaving, rangeDelInterleaving *keyspan.InterleavingIter) { + newIter := func() *Iter { resetSingleDelStats() if merge == nil { merge = func(key, value []byte) (base.ValueMerger, error) { @@ -76,8 +75,7 @@ func TestCompactionIter(t *testing.T) { elision = ElideTombstonesOutsideOf(nil) } cfg := IterConfig{ - Cmp: base.DefaultComparer.Compare, - Equal: base.DefaultComparer.Equal, + Comparer: base.DefaultComparer, Merge: merge, Snapshots: snapshots, TombstoneElision: elision, @@ -90,8 +88,8 @@ func TestCompactionIter(t *testing.T) { invariantViolationSingleDeleteKeys = append(invariantViolationSingleDeleteKeys, string(userKey)) }, } - input, rangeDelInterleaving, rangeKeyInterleaving := makeInputIter(kvs, rangeDels, rangeKeys) - return NewIter(cfg, input), rangeDelInterleaving, rangeKeyInterleaving + pointIter, rangeDelIter, rangeKeyIter := makeInputIters(kvs, rangeDels, rangeKeys) + return NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter) } runTest := func(t *testing.T, file string) { @@ -190,7 +188,7 @@ func TestCompactionIter(t *testing.T) { } slices.Sort(snapshots) - iter, rangeDelInterleaving, rangeKeyInterleaving := newIter() + iter := newIter() var b bytes.Buffer for _, line := range strings.Split(d.Input, "\n") { parts := 
strings.Fields(line) @@ -253,12 +251,12 @@ func TestCompactionIter(t *testing.T) { } fmt.Fprintf(&b, "%s:%s%s%s", iter.Key(), v, snapshotPinned, forceObsolete) if iter.Key().Kind() == base.InternalKeyKindRangeDelete { - iter.AddTombstoneSpan(rangeDelInterleaving.Span()) - fmt.Fprintf(&b, "; Span() = %s", *rangeDelInterleaving.Span()) + iter.AddTombstoneSpan(iter.RangeDelSpan()) + fmt.Fprintf(&b, "; Span() = %s", *iter.RangeDelSpan()) } if rangekey.IsRangeKey(iter.Key().Kind()) { - iter.AddRangeKeySpan(rangeKeyInterleaving.Span()) - fmt.Fprintf(&b, "; Span() = %s", *rangeKeyInterleaving.Span()) + iter.AddRangeKeySpan(iter.RangeKeySpan()) + fmt.Fprintf(&b, "; Span() = %s", *iter.RangeKeySpan()) } fmt.Fprintln(&b) } else if err := iter.Error(); err != nil { @@ -318,16 +316,14 @@ func TestIterRangeKeys(t *testing.T) { } cfg := IterConfig{ - Cmp: base.DefaultComparer.Compare, - Equal: base.DefaultComparer.Equal, + Comparer: base.DefaultComparer, Snapshots: snapshots, AllowZeroSeqNum: false, TombstoneElision: NoTombstoneElision(), RangeKeyElision: ElideTombstonesOutsideOf(keyRanges), } - input, _, _ := makeInputIter(nil, nil, nil) - - iter := NewIter(cfg, input) + pointIter, rangeDelIter, rangeKeyIter := makeInputIters(nil, nil, nil) + iter := NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter) iter.AddRangeKeySpan(&span) outSpans := iter.RangeKeysUpTo(nil) @@ -342,37 +338,16 @@ func TestIterRangeKeys(t *testing.T) { }) } -// makeInputIter creates an iterator that can be used as an input for the -// compaction Iter, along with rangeDel and rangeKey interleaving iterators -// which can be used to retrieve the span corresponding to a range del or range -// key. -func makeInputIter( +// makeInputIters creates the iterators that can be used to create a compaction +// Iter. 
+func makeInputIters( points []base.InternalKV, rangeDels, rangeKeys []keyspan.Span, -) ( - input base.InternalIterator, - rangeDelInterleaving, rangeKeyInterleaving *keyspan.InterleavingIter, -) { +) (pointIter base.InternalIterator, rangeDelIter, rangeKeyIter keyspan.FragmentIterator) { // To adhere to the existing assumption that range deletion blocks in // SSTables are not released while iterating, and therefore not // susceptible to use-after-free bugs, we skip the zeroing of // RangeDelete keys. - fi := base.NewFakeIter(points) - rangeDelInterleaving = &keyspan.InterleavingIter{} - rangeDelInterleaving.Init( - base.DefaultComparer, - fi, + return base.NewFakeIter(points), keyspan.NewIter(base.DefaultComparer.Compare, rangeDels), - keyspan.InterleavingIterOpts{}) - rangeKeyInterleaving = &keyspan.InterleavingIter{} - rangeKeyInterleaving.Init( - base.DefaultComparer, - rangeDelInterleaving, - keyspan.NewIter(base.DefaultComparer.Compare, rangeKeys), - keyspan.InterleavingIterOpts{}) - // To adhere to the existing assumption that range deletion blocks in - // SSTables are not released while iterating, and therefore not - // susceptible to use-after-free bugs, we skip the zeroing of - // RangeDelete keys. - input = invalidating.NewIter(rangeKeyInterleaving, invalidating.IgnoreKinds(base.InternalKeyKindRangeDelete)) - return input, rangeDelInterleaving, rangeKeyInterleaving + keyspan.NewIter(base.DefaultComparer.Compare, rangeKeys) }