Skip to content

Commit

Permalink
*: implement compaction/flushing of range keys
Browse files Browse the repository at this point in the history
This change implements compaction of sstables containing
range keys, as well as allows memtables with non-empty
rangeKeySkl skiplists to write correctly to sstable range
key blocks in flushes. Range key elision and snapshot
striping is implement in a keyspan.Transform passed to
keyspan.MergingIter inside the compaction iter. Otherwise,
behaviour is kept as close to that of range deletions
as possible, to improve confidence in correctness.

Fixes #1686.
  • Loading branch information
itsbilal committed Jun 10, 2022
1 parent a015e5a commit d72083d
Show file tree
Hide file tree
Showing 13 changed files with 581 additions and 67 deletions.
210 changes: 184 additions & 26 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/private"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
)
Expand Down Expand Up @@ -309,6 +310,69 @@ func (k compactionKind) String() string {
return "?"
}

// rangeKeyCompactionTransform is used to transform range key spans as part of the
// keyspan.MergingIter. As part of this transformation step, we can elide range
// keys in the last snapshot stripe, as well as coalesce range keys within
// snapshot stripes.
func rangeKeyCompactionTransform(
snapshots []uint64, elideRangeKey func(start, end []byte) bool,
) keyspan.Transformer {
return keyspan.TransformerFunc(func(cmp base.Compare, s keyspan.Span, dst *keyspan.Span) error {
elideInLastStripe := func(keys []keyspan.Key) []keyspan.Key {
// Unsets and deletes in the last snapshot stripe can be elided.
k := 0
for j := range keys {
if elideRangeKey(s.Start, s.End) &&
(keys[j].Kind() == InternalKeyKindRangeKeyUnset || keys[j].Kind() == InternalKeyKindRangeKeyDelete) {
continue
}
keys[k] = keys[j]
k++
}
keys = keys[:k]
return keys
}
// snapshots are in ascending order, while s.keys are in descending seqnum
// order. Partition s.keys by snapshot stripes, and call rangekey.Coalesce
// on each partition.
dst.Start = s.Start
dst.End = s.End
dst.Keys = dst.Keys[:0]
i, j := len(snapshots)-1, 0
usedLen := 0
for i >= 0 {
start := j
for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i]) {
// Include j in current partition.
j++
}
if j > start {
keysDst := dst.Keys[usedLen:cap(dst.Keys)]
if err := rangekey.Coalesce(cmp, s.Keys[start:j], &keysDst); err != nil {
return err
}
if j == len(s.Keys) {
// This is the last snapshot stripe. Unsets and deletes can be elided.
keysDst = elideInLastStripe(keysDst)
}
usedLen += len(keysDst)
dst.Keys = append(dst.Keys, keysDst...)
}
i--
}
if j < len(s.Keys) {
keysDst := dst.Keys[usedLen:cap(dst.Keys)]
if err := rangekey.Coalesce(cmp, s.Keys[j:], &keysDst); err != nil {
return err
}
keysDst = elideInLastStripe(keysDst)
usedLen += len(keysDst)
dst.Keys = append(dst.Keys, keysDst...)
}
return nil
})
}

// compaction is a table compaction from one level to the next, starting from a
// given version.
type compaction struct {
Expand Down Expand Up @@ -344,10 +408,10 @@ type compaction struct {
// maxOverlapBytes is the maximum number of bytes of overlap allowed for a
// single output table with the tables in the grandparent level.
maxOverlapBytes uint64
// disableRangeTombstoneElision disables elision of range tombstones. Used by
// tests to allow range tombstones to be added to tables where they would
// otherwise be elided.
disableRangeTombstoneElision bool
// disableSpanElision disables elision of range tombstones and range keys. Used
// by tests to allow range tombstones or range keys to be added to tables where
// they would otherwise be elided.
disableSpanElision bool

// flushing contains the flushables (aka memtables) that are being flushed.
flushing flushableList
Expand All @@ -370,12 +434,17 @@ type compaction struct {
// returned from `compactionIter` and fragments them for output to files.
// Referenced by `compactionIter` which uses it to check whether keys are deleted.
rangeDelFrag keyspan.Fragmenter
// The range key fragmenter. Similar to rangeDelFrag in that it gets range
// keys from the compaction iter and fragments them for output to files.
rangeKeyFrag keyspan.Fragmenter
// The range deletion tombstone iterator, that merges and fragments
// tombstones across levels. This iterator is included within the compaction
// input iterator as a single level.
// TODO(jackson): Remove this when the refactor of FragmentIterator,
// InterleavingIterator, etc is complete.
rangeDelIter keyspan.InternalIteratorShim
// rangeKeyInterleaving is the interleaving iter for range keys.
rangeKeyInterleaving keyspan.InterleavingIter

// A list of objects to close when the compaction finishes. Used by input
// iteration to keep rangeDelIters open for the lifetime of the compaction,
Expand Down Expand Up @@ -615,6 +684,9 @@ func newFlush(
}

updateRangeBounds := func(iter keyspan.FragmentIterator) {
// File bounds require s != nil && !s.Empty(). We only need to check for
// s != nil here, as the memtable's FragmentIterator would never surface
// empty spans.
if s := iter.First(); s != nil {
if key := s.SmallestKey(); !smallestSet ||
base.InternalCompare(c.cmp, c.smallest, key) > 0 {
Expand All @@ -638,6 +710,9 @@ func newFlush(
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
updateRangeBounds(rangeDelIter)
}
if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
updateRangeBounds(rangeKeyIter)
}
flushingBytes += f.inuseBytes()
}

Expand Down Expand Up @@ -908,7 +983,7 @@ func (c *compaction) elideRangeTombstone(start, end []byte) bool {
// code doesn't know that L0 contains files and zeroing of seqnums should
// be disabled. That is fixable, but it seems safer to just match the
// RocksDB behavior for now.
if c.disableRangeTombstoneElision || len(c.flushing) != 0 {
if c.disableSpanElision || len(c.flushing) != 0 {
return false
}

Expand All @@ -921,35 +996,66 @@ func (c *compaction) elideRangeTombstone(start, end []byte) bool {
return lower >= upper
}

// elideRangeKey returns true if it is ok to elide the specified range key. A
// return value of true guarantees that there are no key/value pairs at
// c.outputLevel.level+1 or higher that possibly overlap the specified range key.
func (c *compaction) elideRangeKey(start, end []byte) bool {
// TODO(bilal): Track inuseKeyRanges separately for the range keyspace as
// opposed to the point keyspace. Once that is done, elideRangeTombstone
// can just check in the point keyspace, and this function can check for
// inuseKeyRanges in the range keyspace.
return c.elideRangeTombstone(start, end)
}

// newInputIter returns an iterator over all the input tables in a compaction.
func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, retErr error) {
func (c *compaction) newInputIter(
newIters tableNewIters, newSpanIter keyspan.TableNewSpanIter, snapshots []uint64,
) (_ internalIterator, retErr error) {
var rangeDelIters []keyspan.FragmentIterator
var rangeKeyIters []keyspan.FragmentIterator

if len(c.flushing) != 0 {
if len(c.flushing) == 1 {
f := c.flushing[0]
iter := f.newFlushIter(nil, &c.bytesIterated)
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
c.rangeDelIter.Init(c.cmp, rangeDelIter)
return newMergingIter(c.logger, c.cmp, nil, iter, &c.rangeDelIter), nil
iter = newMergingIter(c.logger, c.cmp, nil, iter, &c.rangeDelIter)
}
if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
mi := &keyspan.MergingIter{}
mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIter)
c.rangeKeyInterleaving.Init(c.cmp, base.WrapIterWithStats(iter), mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */)
iter = &c.rangeKeyInterleaving
}
return iter, nil
}
iters := make([]internalIterator, 0, len(c.flushing)+1)
rangeDelIters = make([]keyspan.FragmentIterator, 0, len(c.flushing))
rangeKeyIters = make([]keyspan.FragmentIterator, 0, len(c.flushing))
for i := range c.flushing {
f := c.flushing[i]
iters = append(iters, f.newFlushIter(nil, &c.bytesIterated))
rangeDelIter := f.newRangeDelIter(nil)
if rangeDelIter != nil {
rangeDelIters = append(rangeDelIters, rangeDelIter)
}
if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
}
}
if len(rangeDelIters) > 0 {
c.rangeDelIter.Init(c.cmp, rangeDelIters...)
iters = append(iters, &c.rangeDelIter)
}
return newMergingIter(c.logger, c.cmp, nil, iters...), nil
var iter base.InternalIteratorWithStats = newMergingIter(c.logger, c.cmp, nil, iters...)
if len(rangeKeyIters) > 0 {
mi := &keyspan.MergingIter{}
mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIters...)
c.rangeKeyInterleaving.Init(c.cmp, base.WrapIterWithStats(iter), mi, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */)
iter = &c.rangeKeyInterleaving
}
return iter, nil
}

if c.startLevel.level >= 0 {
Expand Down Expand Up @@ -1081,6 +1187,20 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
li := &keyspan.LevelIter{}
li.Init(keyspan.SpanIterOptions{}, c.cmp, wrapper, level.files.Iter(), l, c.logger, manifest.KeyTypePoint)
rangeDelIters = append(rangeDelIters, li)
// Check if this level has any range keys.
hasRangeKeys := false
iter := level.files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
if f.HasRangeKeys {
hasRangeKeys = true
break
}
}
if hasRangeKeys {
li := &keyspan.LevelIter{}
li.Init(keyspan.SpanIterOptions{}, c.cmp, newSpanIter, level.files.Iter(), l, c.logger, manifest.KeyTypeRange)
rangeKeyIters = append(rangeKeyIters, li)
}
return nil
}

Expand Down Expand Up @@ -1114,7 +1234,17 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
c.rangeDelIter.Init(c.cmp, rangeDelIters...)
iters = append(iters, &c.rangeDelIter)
}
return newMergingIter(c.logger, c.cmp, nil, iters...), nil
pointKeyIter := newMergingIter(c.logger, c.cmp, nil, iters...)
if len(rangeKeyIters) > 0 {
mi := &keyspan.MergingIter{}
mi.Init(c.cmp, rangeKeyCompactionTransform(snapshots, c.elideRangeKey), rangeKeyIters...)
di := &keyspan.DefragmentingIter{}
di.Init(c.cmp, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer)
c.rangeKeyInterleaving.Init(c.cmp, pointKeyIter, di, nil /* hooks */, nil /* lowerBound */, nil /* upperBound */)
return &c.rangeKeyInterleaving, nil
}

return pointKeyIter, nil
}

func (c *compaction) String() string {
Expand Down Expand Up @@ -2048,13 +2178,13 @@ func (d *DB) runCompaction(
d.mu.Unlock()
defer d.mu.Lock()

iiter, err := c.newInputIter(d.newIters)
iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
if err != nil {
return nil, pendingOutputs, err
}
c.allowedZeroSeqNum = c.allowZeroSeqNum()
iter := newCompactionIter(c.cmp, c.equal, c.formatKey, d.merge, iiter, snapshots,
&c.rangeDelFrag, c.allowedZeroSeqNum, c.elideTombstone,
&c.rangeDelFrag, &c.rangeKeyFrag, c.allowedZeroSeqNum, c.elideTombstone,
c.elideRangeTombstone, d.FormatMajorVersion())

var (
Expand Down Expand Up @@ -2182,18 +2312,18 @@ func (d *DB) runCompaction(
// should be flushed. Typically, this is the first key of the next
// sstable or an empty key if this output is the final sstable.
finishOutput := func(splitKey []byte) error {
// If we haven't output any point records to the sstable (tw == nil)
// then the sstable will only contain range tombstones. The smallest
// key in the sstable will be the start key of the first range
// tombstone added. We need to ensure that this start key is distinct
// from the splitKey passed to finishOutput (if set), otherwise we
// would generate an sstable where the largest key is smaller than the
// smallest key due to how the largest key boundary is set below.
// NB: It is permissible for the range tombstone start key to be the
// empty string.
// TODO: It is unfortunate that we have to do this check here rather
// than when we decide to finish the sstable in the runCompaction
// loop. A better structure currently eludes us.
// If we haven't output any point records to the sstable (tw == nil) then the
// sstable will only contain range tombstones and/or range keys. The smallest
// key in the sstable will be the start key of the first range tombstone or
// range key added. We need to ensure that this start key is distinct from
// the splitKey passed to finishOutput (if set), otherwise we would generate
// an sstable where the largest key is smaller than the smallest key due to
// how the largest key boundary is set below. NB: It is permissible for the
// range tombstone / range key start key to be the empty string.
//
// TODO: It is unfortunate that we have to do this check here rather than
// when we decide to finish the sstable in the runCompaction loop. A better
// structure currently eludes us.
if tw == nil {
startKey := c.rangeDelFrag.Start()
if len(iter.tombstones) > 0 {
Expand Down Expand Up @@ -2238,7 +2368,18 @@ func (d *DB) runCompaction(
// added to the writer, eliding out-of-file range tombstones based
// on sequence number at this stage is difficult, and necessitates
// read-time logic to ignore range tombstones outside file bounds.
if rangedel.Encode(&v, tw.Add); err != nil {
if err := rangedel.Encode(&v, tw.Add); err != nil {
return err
}
}
for _, v := range iter.RangeKeys(splitKey) {
// Same logic as for range tombstones, except added using tw.AddRangeKey.
if tw == nil {
if err := newOutput(); err != nil {
return err
}
}
if err := rangekey.Encode(&v, tw.AddRangeKey); err != nil {
return err
}
}
Expand Down Expand Up @@ -2410,7 +2551,7 @@ func (d *DB) runCompaction(
// to a grandparent file largest key, or nil. Taken together, these
// progress guarantees ensure that eventually the input iterator will be
// exhausted and the range tombstone fragments will all be flushed.
for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty(); {
for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
splitterSuggestion := splitter.onNewOutput(key)

// Each inner loop iteration processes one key from the input iterator.
Expand All @@ -2425,7 +2566,8 @@ func (d *DB) runCompaction(
return nil, pendingOutputs, err
}
}
if key.Kind() == InternalKeyKindRangeDelete {
switch key.Kind() {
case InternalKeyKindRangeDelete:
// Range tombstones are handled specially. They are fragmented,
// and they're not written until later during `finishOutput()`.
// We add them to the `Fragmenter` now to make them visible to
Expand Down Expand Up @@ -2462,6 +2604,22 @@ func (d *DB) runCompaction(
c.rangeDelFrag.Add(clone)
}
continue
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
// Range keys are handled in the same way as range tombstones, except
// with a dedicated fragmenter.
if s := c.rangeKeyInterleaving.Span(); !s.Empty() {
clone := keyspan.Span{
Start: iter.cloneKey(s.Start),
End: iter.cloneKey(s.End),
Keys: make([]keyspan.Key, len(s.Keys)),
}
// Since the keys' Suffix and Value fields are not deep cloned, the
// underlying blockIter must be kept open for the lifetime of the
// compaction.
copy(clone.Keys, s.Keys)
c.rangeKeyFrag.Add(clone)
}
continue
}
if tw == nil {
if err := newOutput(); err != nil {
Expand Down
Loading

0 comments on commit d72083d

Please sign in to comment.