db: refactor compaction splitting to reduce key comparisons
Introduce a new type `frontiers`, designed to monitor several different user
key frontiers during a compaction. When a user key is encountered that equals
or exceeds the configured frontier, the code that specified the frontier is
notified and given an opportunity to set a new frontier. Internally,
`frontiers` uses a heap (code largely copied from the merging iterator's heap)
to avoid N key comparisons for every key.
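
For illustration, here is a minimal standalone sketch of that heap-based approach. The names `frontierHeap`, `advance`, and `splitMarker` are hypothetical, not the commit's API, and the commit's actual implementation hand-rolls its heap (adapted from the merging iterator) rather than using `container/heap`:

package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

// frontier is notified via reached() once the monitored key space reaches or
// exceeds key(). A frontier may then expose a new, larger key, or return nil
// from key() to deactivate itself.
type frontier interface {
	key() []byte
	reached(nextKey []byte)
}

// frontierHeap orders frontiers by their current key, so only the nearest
// frontier (the heap root) must be compared against each incoming key.
type frontierHeap struct {
	cmp   func(a, b []byte) int
	items []frontier
}

func (h *frontierHeap) Len() int           { return len(h.items) }
func (h *frontierHeap) Less(i, j int) bool { return h.cmp(h.items[i].key(), h.items[j].key()) < 0 }
func (h *frontierHeap) Swap(i, j int)      { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *frontierHeap) Push(x any)         { h.items = append(h.items, x.(frontier)) }
func (h *frontierHeap) Pop() any {
	n := len(h.items)
	f := h.items[n-1]
	h.items = h.items[:n-1]
	return f
}

// advance notifies every frontier whose key is <= k. In the common case this
// costs a single comparison against the heap root, regardless of how many
// frontiers are registered.
func (h *frontierHeap) advance(k []byte) {
	for h.Len() > 0 && h.cmp(k, h.items[0].key()) >= 0 {
		f := h.items[0]
		f.reached(k)
		if nk := f.key(); nk != nil && h.cmp(nk, k) > 0 {
			heap.Fix(h, 0) // the frontier moved itself forward
		} else {
			heap.Pop(h) // the frontier is done (or failed to move past k)
		}
	}
}

// splitMarker is a toy frontier that flips a flag when its limit is reached,
// loosely mirroring how limitFuncSplitter records a pending split below.
type splitMarker struct {
	limit []byte
	split bool
}

func (s *splitMarker) key() []byte      { return s.limit }
func (s *splitMarker) reached(_ []byte) { s.split, s.limit = true, nil }

func main() {
	h := &frontierHeap{cmp: bytes.Compare}
	m := &splitMarker{limit: []byte("m")}
	heap.Push(h, m)
	for _, k := range []string{"a", "c", "n"} {
		h.advance([]byte(k))
		fmt.Printf("key=%s split=%v\n", k, m.split) // split becomes true at "n"
	}
}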

This commit refactors the `limitFuncSplitter` type to make use of `frontiers`.
The `limitFuncSplitter` type is used to split flushes at L0 flush split keys,
and to split both flushes and compactions to avoid excessive overlap with
grandparent files.
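
To illustrate the shape of such a limit function, here is a hypothetical standalone sketch, not the commit's code (the commit's real limit functions are `c.findL0Limit` and `c.findGrandparentLimit`, which consult L0 flush split keys and grandparent file boundaries respectively):

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// makeLimitFunc returns a hypothetical limitFunc matching the
// `func(userKey []byte) []byte` signature used by limitFuncSplitter: it
// returns the first split key strictly greater than userKey, or nil when no
// split key remains ahead.
func makeLimitFunc(splitKeys [][]byte) func(userKey []byte) []byte {
	return func(userKey []byte) []byte {
		// Binary search over the sorted split keys; cf. the TODO(jackson)
		// in the diff below about the L0 flush split use case.
		i := sort.Search(len(splitKeys), func(i int) bool {
			return bytes.Compare(splitKeys[i], userKey) > 0
		})
		if i == len(splitKeys) {
			return nil
		}
		return splitKeys[i]
	}
}

func main() {
	limitFunc := makeLimitFunc([][]byte{[]byte("f"), []byte("p")})
	fmt.Printf("%s\n", limitFunc([]byte("a"))) // f
	fmt.Printf("%s\n", limitFunc([]byte("g"))) // p
	fmt.Println(limitFunc([]byte("q")))        // nil: no limit ahead
}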

This change is motivated by #2156, which will introduce an additional
compaction-output splitter that must perform key comparisons against the next
key to decide when to split a compaction. Additionally, the `frontiers` type
may prove useful elsewhere, such as for applying key-space-dependent logic
during a compaction (e.g., compaction-time GC, disaggregated storage locality
policies, or keyspan-restricted snapshots #1810).
jbowens committed Jan 31, 2023
1 parent 19fce0e commit 2d28811
Showing 5 changed files with 356 additions and 58 deletions.
compaction.go: 109 changes (58 additions, 51 deletions)
@@ -84,15 +84,15 @@ type compactionLevel struct {
 
 // Return output from compactionOutputSplitters. See comment on
 // compactionOutputSplitter.shouldSplitBefore() on how this value is used.
-type compactionSplitSuggestion int
+type maybeSplit int
 
 const (
-	noSplit compactionSplitSuggestion = iota
+	noSplit maybeSplit = iota
 	splitNow
 )
 
 // String implements the Stringer interface.
-func (c compactionSplitSuggestion) String() string {
+func (c maybeSplit) String() string {
 	if c == noSplit {
 		return "no-split"
 	}
@@ -111,15 +111,15 @@ type compactionOutputSplitter interface {
 	// means no split is advised. If shouldSplitBefore(a) advises a split then
 	// shouldSplitBefore(b) should also advise a split given b >= a, until
 	// onNewOutput is called.
-	shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
+	shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
 	// onNewOutput updates internal splitter state when the compaction switches
 	// to a new sstable, and returns the next limit for the new output which
 	// would get used to truncate range tombstones if the compaction iterator
 	// runs out of keys. The limit returned MUST be > key according to the
 	// compaction's comparator. The specified key is the first key in the new
 	// output, or nil if this sstable will only contain range tombstones already
 	// in the fragmenter.
-	onNewOutput(key *InternalKey) []byte
+	onNewOutput(key []byte) []byte
 }
 
 // fileSizeSplitter is a compactionOutputSplitter that makes a determination
@@ -130,9 +130,7 @@ type fileSizeSplitter struct {
 	maxFileSize uint64
 }
 
-func (f *fileSizeSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
+func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
 	// The Kind != RangeDelete part exists because EstimatedSize doesn't grow
 	// rightaway when a range tombstone is added to the fragmenter. It's always
 	// better to make a sequence of range tombstones visible to the fragmenter.
@@ -143,54 +141,43 @@ func (f *fileSizeSplitter) shouldSplitBefore(
 	return noSplit
 }
 
-func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte {
+func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
 	return nil
 }
 
 type limitFuncSplitter struct {
-	c         *compaction
+	frontiers *frontiers
 	limitFunc func(userKey []byte) []byte
 	limit     []byte
+	split     maybeSplit
 }
 
-func (lf *limitFuncSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
-	// NB: The limit must be applied using >= since lf.limit may be used as the
-	// `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to
-	// serve as an *exclusive* end boundary truncation point. If we used > then,
-	// we may have already added a key with the user key `lf.limit` to the
-	// previous sstable.
-	if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 {
-		return splitNow
-	}
-	return noSplit
+var _ frontier = (*limitFuncSplitter)(nil)
+
+func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
+	return lf.split
+}
+
+func (lf *limitFuncSplitter) key() []byte {
+	return lf.limit
 }
 
-func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte {
-	lf.limit = nil
+func (lf *limitFuncSplitter) reached(nextKey []byte) {
+	lf.split = splitNow
+}
+
+func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
+	lf.split = noSplit
 	if key != nil {
-		lf.limit = lf.limitFunc(key.UserKey)
+		// TODO(jackson): For some users, like L0 flush splits, there's no need
+		// to binary search over all the flush splits every time. The next split
+		// point must be ahead of the previous flush split point.
+		lf.limit = lf.limitFunc(key)
 	} else {
-		// Use the start key of the first pending tombstone to find the
-		// next limit. All pending tombstones have the same start key.
-		// We use this as opposed to the end key of the
-		// last written sstable to effectively handle cases like these:
-		//
-		// a.SET.3
-		// (lf.limit at b)
-		// d.RANGEDEL.4:f
-		//
-		// In this case, the partition after b has only range deletions,
-		// so if we were to find the limit after the last written key at
-		// the split point (key a), we'd get the limit b again, and
-		// finishOutput() would not advance any further because the next
-		// range tombstone to write does not start until after the L0
-		// split point.
-		if startKey := lf.c.rangeDelFrag.Start(); startKey != nil {
-			lf.limit = lf.limitFunc(startKey)
-		}
+		lf.limit = nil
 	}
+	lf.frontiers.update(lf)
 	return lf.limit
 }
 
@@ -203,7 +190,7 @@ type splitterGroup struct {
 
 func (a *splitterGroup) shouldSplitBefore(
 	key *InternalKey, tw *sstable.Writer,
-) (suggestion compactionSplitSuggestion) {
+) (suggestion maybeSplit) {
 	for _, splitter := range a.splitters {
 		if splitter.shouldSplitBefore(key, tw) == splitNow {
 			return splitNow
@@ -212,7 +199,7 @@ func (a *splitterGroup) shouldSplitBefore(
 	return noSplit
 }
 
-func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
+func (a *splitterGroup) onNewOutput(key []byte) []byte {
 	var earliestLimit []byte
 	for _, splitter := range a.splitters {
 		limit := splitter.onNewOutput(key)
@@ -240,9 +227,7 @@ type userKeyChangeSplitter struct {
 	unsafePrevUserKey func() []byte
 }
 
-func (u *userKeyChangeSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
+func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
 	if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
 		return split
 	}
@@ -252,7 +237,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore(
 	return noSplit
 }
 
-func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
+func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
 	return u.splitter.onNewOutput(key)
 }
 
@@ -2693,10 +2678,11 @@ func (d *DB) runCompaction(
 				return c.rangeDelFrag.Start()
 			},
 		},
-		&limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit},
+		&limitFuncSplitter{frontiers: &iter.frontiers, limitFunc: c.findGrandparentLimit},
 	}
 	if splitL0Outputs {
-		outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit})
+		outputSplitters = append(outputSplitters,
+			&limitFuncSplitter{frontiers: &iter.frontiers, limitFunc: c.findL0Limit})
 	}
 	splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}
 
@@ -2709,7 +2695,28 @@
 	// progress guarantees ensure that eventually the input iterator will be
 	// exhausted and the range tombstone fragments will all be flushed.
 	for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
-		splitterSuggestion := splitter.onNewOutput(key)
+		var firstKey []byte
+		if key != nil {
+			firstKey = key.UserKey
+		} else if startKey := c.rangeDelFrag.Start(); startKey != nil {
+			// Pass the start key of the first pending tombstone to find the
+			// next limit. All pending tombstones have the same start key. We
+			// use this as opposed to the end key of the last written sstable to
+			// effectively handle cases like these:
+			//
+			// a.SET.3
+			// (lf.limit at b)
+			// d.RANGEDEL.4:f
+			//
+			// In this case, the partition after b has only range deletions, so
+			// if we were to find the limit after the last written key at the
+			// split point (key a), we'd get the limit b again, and
+			// finishOutput() would not advance any further because the next
+			// range tombstone to write does not start until after the L0 split
+			// point.
+			firstKey = startKey
+		}
+		splitterSuggestion := splitter.onNewOutput(firstKey)
 
 		// Each inner loop iteration processes one key from the input iterator.
 		for ; key != nil; key, val = iter.Next() {
