db: refactor compaction splitting to reduce key comparisons
Introduce a new type `frontiers`, designed to monitor several different user
key frontiers during a compaction. When a user key is encountered that equals
or exceeds the configured frontier, the code that specified the frontier is
notified and given an opportunity to set a new frontier. Internally,
`frontiers` uses a heap (code largely copied from the merging iterator's heap)
to avoid performing N key comparisons (one per monitored frontier) for every
key.
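
To make the idea concrete, here is a minimal, self-contained sketch of a
heap-based frontier monitor. This is not the code added by this commit: the
names (`frontierHeap`, `Add`, `Advance`), the use of `container/heap`, and the
`bytes.Compare` ordering are assumptions made for illustration only; the real
implementation carries its own heap code and orders keys with the compaction's
comparator.

package main

import (
    "bytes"
    "container/heap"
    "fmt"
)

// frontier is a single monitored boundary. reached is invoked once the
// current key equals or exceeds key; it may return a new frontier key
// (assumed to be greater than the current key) or nil to stop monitoring.
type frontier struct {
    key     []byte
    reached func(currentKey []byte) []byte
}

// frontierHeap is a min-heap of frontiers ordered by key, so each new key is
// compared only against the nearest outstanding frontier.
type frontierHeap []*frontier

func (h frontierHeap) Len() int           { return len(h) }
func (h frontierHeap) Less(i, j int) bool { return bytes.Compare(h[i].key, h[j].key) < 0 }
func (h frontierHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *frontierHeap) Push(x any)        { *h = append(*h, x.(*frontier)) }
func (h *frontierHeap) Pop() any {
    old := *h
    f := old[len(old)-1]
    *h = old[:len(old)-1]
    return f
}

// frontiers monitors every registered frontier during one forward pass over
// the compaction's keys.
type frontiers struct{ h frontierHeap }

func (fs *frontiers) Add(f *frontier) { heap.Push(&fs.h, f) }

// Advance notifies each frontier whose key is <= k, giving it the chance to
// install a new, larger frontier or to stop monitoring.
func (fs *frontiers) Advance(k []byte) {
    for fs.h.Len() > 0 && bytes.Compare(fs.h[0].key, k) <= 0 {
        f := fs.h[0]
        if next := f.reached(k); next != nil {
            f.key = next
            heap.Fix(&fs.h, 0)
        } else {
            heap.Pop(&fs.h)
        }
    }
}

func main() {
    splits := [][]byte{[]byte("c"), []byte("f")}
    i := 0
    var fs frontiers
    fs.Add(&frontier{key: splits[i], reached: func(k []byte) []byte {
        fmt.Printf("frontier %q reached at key %q\n", splits[i], k)
        i++
        if i < len(splits) {
            return splits[i]
        }
        return nil
    }})
    for _, k := range []string{"a", "b", "d", "g"} {
        fs.Advance([]byte(k))
    }
    // Prints:
    //   frontier "c" reached at key "d"
    //   frontier "f" reached at key "g"
}

Because the heap root is always the nearest outstanding frontier, advancing to
a new key costs a single comparison against the root in the common case, and
callbacks run only when a frontier is actually crossed.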

This commit refactors the `limitFuncSplitter` type to make use of `frontiers`.
The `limitFuncSplitter` type is used to split flushes at L0 flush split keys,
and to split both flushes and compactions to avoid excessive overlap with
grandparent files.
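
Continuing that hypothetical sketch (and reusing its `frontier`/`frontiers`
types rather than the commit's actual `frontier.Init`/`frontier.Update` API,
which appears in the diff below), a limit-based splitter could hook into the
monitor roughly as follows:

// limitSplitter is a hypothetical splitter built on the sketch above. At the
// start of each output it asks limitFunc for the next split key and registers
// a frontier there; reaching that frontier flips the cached split flag.
type limitSplitter struct {
    fs        *frontiers
    split     bool
    limitFunc func(startKey []byte) []byte
}

func (s *limitSplitter) onNewOutput(startKey []byte) []byte {
    s.split = false
    limit := s.limitFunc(startKey)
    if limit == nil {
        return nil
    }
    s.fs.Add(&frontier{key: limit, reached: func([]byte) []byte {
        s.split = true
        return nil // wait for the next output before registering a new limit
    }})
    return limit
}

func (s *limitSplitter) shouldSplitBefore() bool { return s.split }

The shape mirrors the refactored `limitFuncSplitter` below: the splitter no
longer compares each key against its limit itself; it just reports a flag that
the frontier callback sets. A fuller version would re-key one persistent
frontier per splitter on each new output (as the commit's `frontier.Update`
does) instead of registering a fresh frontier, so that a limit left unreached
by a previous output cannot fire later.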

This change is motivated by #2156, which will introduce an additional
compaction-output splitter that must perform key comparisons against the next
key to decide when to split a compaction. Additionally, the `frontiers` type
may also be useful for other purposes, such as applying key-space-dependent logic
during a compaction (e.g., compaction-time GC, disaggregated storage locality
policies, or keyspan-restricted snapshots #1810).
jbowens committed Feb 8, 2023
1 parent 7d1e4ba commit 0a09528
Showing 5 changed files with 426 additions and 63 deletions.
131 changes: 75 additions & 56 deletions compaction.go
@@ -96,15 +96,15 @@ func (cl compactionLevel) String() string {

// Return output from compactionOutputSplitters. See comment on
// compactionOutputSplitter.shouldSplitBefore() on how this value is used.
type compactionSplitSuggestion int
type maybeSplit int

const (
noSplit compactionSplitSuggestion = iota
noSplit maybeSplit = iota
splitNow
)

// String implements the Stringer interface.
func (c compactionSplitSuggestion) String() string {
func (c maybeSplit) String() string {
if c == noSplit {
return "no-split"
}
@@ -123,15 +123,15 @@ type compactionOutputSplitter interface {
// means no split is advised. If shouldSplitBefore(a) advises a split then
// shouldSplitBefore(b) should also advise a split given b >= a, until
// onNewOutput is called.
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
// onNewOutput updates internal splitter state when the compaction switches
// to a new sstable, and returns the next limit for the new output which
// would get used to truncate range tombstones if the compaction iterator
// runs out of keys. The limit returned MUST be > key according to the
// compaction's comparator. The specified key is the first key in the new
// output, or nil if this sstable will only contain range tombstones already
// in the fragmenter.
onNewOutput(key *InternalKey) []byte
onNewOutput(key []byte) []byte
}

// fileSizeSplitter is a compactionOutputSplitter that makes a determination
@@ -142,9 +142,7 @@ type fileSizeSplitter struct {
maxFileSize uint64
}

func (f *fileSizeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
// The Kind != RangeDelete part exists because EstimatedSize doesn't grow
// right away when a range tombstone is added to the fragmenter. It's always
// better to make a sequence of range tombstones visible to the fragmenter.
@@ -155,55 +153,43 @@ func (f *fileSizeSplitter) shouldSplitBefore(
return noSplit
}

func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte {
func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
return nil
}

func newLimitFuncSplitter(f *frontiers, limitFunc func(userKey []byte) []byte) *limitFuncSplitter {
s := &limitFuncSplitter{limitFunc: limitFunc}
s.frontier.Init(f, nil, s.reached)
return s
}

type limitFuncSplitter struct {
c *compaction
frontier frontier
limitFunc func(userKey []byte) []byte
limit []byte
split maybeSplit
}

func (lf *limitFuncSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
// NB: The limit must be applied using >= since lf.limit may be used as the
// `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to
// serve as an *exclusive* end boundary truncation point. If we used > then,
// we may have already added a key with the user key `lf.limit` to the
// previous sstable.
if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 {
return splitNow
}
return noSplit
func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
return lf.split
}

func (lf *limitFuncSplitter) reached(nextKey []byte) []byte {
lf.split = splitNow
return nil
}

func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte {
lf.limit = nil
func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
lf.split = noSplit
if key != nil {
lf.limit = lf.limitFunc(key.UserKey)
} else {
// Use the start key of the first pending tombstone to find the
// next limit. All pending tombstones have the same start key.
// We use this as opposed to the end key of the
// last written sstable to effectively handle cases like these:
//
// a.SET.3
// (lf.limit at b)
// d.RANGEDEL.4:f
//
// In this case, the partition after b has only range deletions,
// so if we were to find the limit after the last written key at
// the split point (key a), we'd get the limit b again, and
// finishOutput() would not advance any further because the next
// range tombstone to write does not start until after the L0
// split point.
if startKey := lf.c.rangeDelFrag.Start(); startKey != nil {
lf.limit = lf.limitFunc(startKey)
}
}
return lf.limit
// TODO(jackson): For some users, like L0 flush splits, there's no need
// to binary search over all the flush splits every time. The next split
// point must be ahead of the previous flush split point.
limit := lf.limitFunc(key)
lf.frontier.Update(limit)
return limit
}
lf.frontier.Update(nil)
return nil
}

// splitterGroup is a compactionOutputSplitter that splits whenever one of its
@@ -215,7 +201,7 @@ type splitterGroup struct {

func (a *splitterGroup) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) (suggestion compactionSplitSuggestion) {
) (suggestion maybeSplit) {
for _, splitter := range a.splitters {
if splitter.shouldSplitBefore(key, tw) == splitNow {
return splitNow
@@ -224,7 +210,7 @@ func (a *splitterGroup) shouldSplitBefore(
return noSplit
}

func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
func (a *splitterGroup) onNewOutput(key []byte) []byte {
var earliestLimit []byte
for _, splitter := range a.splitters {
limit := splitter.onNewOutput(key)
@@ -252,9 +238,21 @@ type userKeyChangeSplitter struct {
unsafePrevUserKey func() []byte
}

func (u *userKeyChangeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
// NB: The userKeyChangeSplitter only needs to suffer a key comparison if
// the wrapped splitter requests a split.
//
// We could implement this splitter using frontiers: When the inner splitter
// requests a split before key `k`, we'd update a frontier to be
// ImmediateSuccessor(k). Then on the next key greater than k, the
// frontier's `reached` func would be called and we'd return splitNow.
// This doesn't really save work since duplicate user keys are rare, and it
// requires us to materialize the ImmediateSuccessor key. It also prevents
// us from splitting on the same key that the inner splitter requested a
// split for—instead we need to wait until the next key. The current
// implementation uses `unsafePrevUserKey` to gain access to the previous
// key which allows it to immediately respect the inner splitter if
// possible.
if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
return split
}
Expand All @@ -264,7 +262,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore(
return noSplit
}

func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
return u.splitter.onNewOutput(key)
}

@@ -2849,10 +2847,10 @@ func (d *DB) runCompaction(
return c.rangeDelFrag.Start()
},
},
&limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit},
newLimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
}
if splitL0Outputs {
outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit})
outputSplitters = append(outputSplitters, newLimitFuncSplitter(&iter.frontiers, c.findL0Limit))
}
splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}

@@ -2865,7 +2863,28 @@ func (d *DB) runCompaction(
// progress guarantees ensure that eventually the input iterator will be
// exhausted and the range tombstone fragments will all be flushed.
for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
splitterSuggestion := splitter.onNewOutput(key)
var firstKey []byte
if key != nil {
firstKey = key.UserKey
} else if startKey := c.rangeDelFrag.Start(); startKey != nil {
// Pass the start key of the first pending tombstone to find the
// next limit. All pending tombstones have the same start key. We
// use this as opposed to the end key of the last written sstable to
// effectively handle cases like these:
//
// a.SET.3
// (lf.limit at b)
// d.RANGEDEL.4:f
//
// In this case, the partition after b has only range deletions, so
// if we were to find the limit after the last written key at the
// split point (key a), we'd get the limit b again, and
// finishOutput() would not advance any further because the next
// range tombstone to write does not start until after the L0 split
// point.
firstKey = startKey
}
splitterSuggestion := splitter.onNewOutput(firstKey)

// Each inner loop iteration processes one key from the input iterator.
for ; key != nil; key, val = iter.Next() {