Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: refactor compaction splitting to reduce key comparisons #2259

Merged
merged 1 commit into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 75 additions & 56 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ func (cl compactionLevel) String() string {

// maybeSplit is the return type of
// compactionOutputSplitter.shouldSplitBefore, indicating whether the
// compaction should begin a new output sstable before the current key.
type maybeSplit int

const (
	// noSplit indicates that the splitter does not advise a split.
	noSplit maybeSplit = iota
	// splitNow indicates that the compaction should split before the
	// current key.
	splitNow
)

// String implements the Stringer interface.
func (c compactionSplitSuggestion) String() string {
func (c maybeSplit) String() string {
if c == noSplit {
return "no-split"
}
Expand All @@ -123,15 +123,15 @@ type compactionOutputSplitter interface {
// means no split is advised. If shouldSplitBefore(a) advises a split then
// shouldSplitBefore(b) should also advise a split given b >= a, until
// onNewOutput is called.
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
// onNewOutput updates internal splitter state when the compaction switches
// to a new sstable, and returns the next limit for the new output which
// would get used to truncate range tombstones if the compaction iterator
// runs out of keys. The limit returned MUST be > key according to the
// compaction's comparator. The specified key is the first key in the new
// output, or nil if this sstable will only contain range tombstones already
// in the fragmenter.
onNewOutput(key *InternalKey) []byte
onNewOutput(key []byte) []byte
}

// fileSizeSplitter is a compactionOutputSplitter that makes a determination
Expand All @@ -142,9 +142,7 @@ type fileSizeSplitter struct {
maxFileSize uint64
}

func (f *fileSizeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
// The Kind != RangeDelete part exists because EstimatedSize doesn't grow
// rightaway when a range tombstone is added to the fragmenter. It's always
// better to make a sequence of range tombstones visible to the fragmenter.
Expand All @@ -155,55 +153,43 @@ func (f *fileSizeSplitter) shouldSplitBefore(
return noSplit
}

func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte {
func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
return nil
}

// newLimitFuncSplitter constructs a limitFuncSplitter that derives each
// output's limit from limitFunc and registers itself with the
// compaction's frontiers so that reached fires when the iterator steps
// to a key at or beyond that limit.
func newLimitFuncSplitter(f *frontiers, limitFunc func(userKey []byte) []byte) *limitFuncSplitter {
	splitter := &limitFuncSplitter{limitFunc: limitFunc}
	splitter.frontier.Init(f, nil, splitter.reached)
	return splitter
}

type limitFuncSplitter struct {
c *compaction
frontier frontier
limitFunc func(userKey []byte) []byte
limit []byte
split maybeSplit
}

func (lf *limitFuncSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
// NB: The limit must be applied using >= since lf.limit may be used as the
// `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to
// serve as an *exclusive* end boundary truncation point. If we used > then,
// we may have already added a key with the user key `lf.limit` to the
// previous sstable.
if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 {
return splitNow
}
return noSplit
func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
return lf.split
}

// reached is the frontier callback, invoked when the compaction
// iterator steps to a key >= the current limit. It latches the split
// decision so that the next shouldSplitBefore call advises a split.
// Returning nil leaves the frontier unarmed until onNewOutput installs
// the next limit.
func (lf *limitFuncSplitter) reached(nextKey []byte) []byte {
	lf.split = splitNow
	return nil
}

func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte {
lf.limit = nil
func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
lf.split = noSplit
if key != nil {
lf.limit = lf.limitFunc(key.UserKey)
} else {
// Use the start key of the first pending tombstone to find the
// next limit. All pending tombstones have the same start key.
// We use this as opposed to the end key of the
// last written sstable to effectively handle cases like these:
//
// a.SET.3
// (lf.limit at b)
// d.RANGEDEL.4:f
//
// In this case, the partition after b has only range deletions,
// so if we were to find the limit after the last written key at
// the split point (key a), we'd get the limit b again, and
// finishOutput() would not advance any further because the next
// range tombstone to write does not start until after the L0
// split point.
if startKey := lf.c.rangeDelFrag.Start(); startKey != nil {
lf.limit = lf.limitFunc(startKey)
}
}
return lf.limit
// TODO(jackson): For some users, like L0 flush splits, there's no need
// to binary search over all the flush splits every time. The next split
// point must be ahead of the previous flush split point.
limit := lf.limitFunc(key)
lf.frontier.Update(limit)
return limit
}
lf.frontier.Update(nil)
return nil
}

// splitterGroup is a compactionOutputSplitter that splits whenever one of its
Expand All @@ -215,7 +201,7 @@ type splitterGroup struct {

func (a *splitterGroup) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) (suggestion compactionSplitSuggestion) {
) (suggestion maybeSplit) {
for _, splitter := range a.splitters {
if splitter.shouldSplitBefore(key, tw) == splitNow {
return splitNow
Expand All @@ -224,7 +210,7 @@ func (a *splitterGroup) shouldSplitBefore(
return noSplit
}

func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
func (a *splitterGroup) onNewOutput(key []byte) []byte {
var earliestLimit []byte
for _, splitter := range a.splitters {
limit := splitter.onNewOutput(key)
Expand Down Expand Up @@ -252,9 +238,21 @@ type userKeyChangeSplitter struct {
unsafePrevUserKey func() []byte
}

func (u *userKeyChangeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
// NB: The userKeyChangeSplitter only needs to suffer a key comparison if
// the wrapped splitter requests a split.
//
// We could implement this splitter using frontiers: When the inner splitter
// requests a split before key `k`, we'd update a frontier to be
// ImmediateSuccessor(k). Then on the next key greater than >k, the
// frontier's `reached` func would be called and we'd return splitNow.
// This doesn't really save work since duplicate user keys are rare, and it
// requires us to materialize the ImmediateSuccessor key. It also prevents
// us from splitting on the same key that the inner splitter requested a
// split for—instead we need to wait until the next key. The current
// implementation uses `unsafePrevUserKey` to gain access to the previous
// key which allows it to immediately respect the inner splitter if
// possible.
if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
return split
}
Expand All @@ -264,7 +262,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore(
return noSplit
}

func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
return u.splitter.onNewOutput(key)
}

Expand Down Expand Up @@ -2849,10 +2847,10 @@ func (d *DB) runCompaction(
return c.rangeDelFrag.Start()
},
},
&limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit},
newLimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
}
if splitL0Outputs {
outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit})
outputSplitters = append(outputSplitters, newLimitFuncSplitter(&iter.frontiers, c.findL0Limit))
}
splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}

Expand All @@ -2865,7 +2863,28 @@ func (d *DB) runCompaction(
// progress guarantees ensure that eventually the input iterator will be
// exhausted and the range tombstone fragments will all be flushed.
for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
splitterSuggestion := splitter.onNewOutput(key)
var firstKey []byte
if key != nil {
firstKey = key.UserKey
} else if startKey := c.rangeDelFrag.Start(); startKey != nil {
// Pass the start key of the first pending tombstone to find the
// next limit. All pending tombstones have the same start key. We
// use this as opposed to the end key of the last written sstable to
// effectively handle cases like these:
//
// a.SET.3
// (lf.limit at b)
// d.RANGEDEL.4:f
//
// In this case, the partition after b has only range deletions, so
// if we were to find the limit after the last written key at the
// split point (key a), we'd get the limit b again, and
// finishOutput() would not advance any further because the next
// range tombstone to write does not start until after the L0 split
// point.
firstKey = startKey
}
splitterSuggestion := splitter.onNewOutput(firstKey)

// Each inner loop iteration processes one key from the input iterator.
for ; key != nil; key, val = iter.Next() {
Expand Down
Loading