Skip to content

Commit

Permalink
db: refactor compaction splitting to reduce key comparisons
Browse files Browse the repository at this point in the history
Introduce a new type `frontiers`, designed to monitor several different user
key frontiers during a compaction. When a user key is encountered that equals
or exceeds the configured frontier, the code that specified the frontier is
notified and given an opportunity to set a new frontier. Internally,
`frontiers` uses a heap (code largely copied from the merging iterator's heap)
to avoid N key comparisons for every key.

This commit refactors the `limitFuncSplitter` type to make use of `frontiers`.
The `limitFuncSplitter` type is used to split flushes to L0 flush split keys,
and to split both flushes and compactions to avoid excessive overlap with
grandparent files.

This change is motivated by #2156, which will introduce an additional
compaction-output splitter that must perform key comparisons against the next
key to decide when to split a compaction. Additionally, the `frontiers` type
may be useful for other purposes, such as applying key-space-dependent logic
during a compaction (e.g., compaction-time GC, disaggregated storage locality
policies, or keyspan-restricted snapshots #1810).
  • Loading branch information
jbowens committed Jan 22, 2023
1 parent 19fce0e commit 6ac10ed
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 58 deletions.
109 changes: 58 additions & 51 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ type compactionLevel struct {

// Return output from compactionOutputSplitters. See comment on
// compactionOutputSplitter.shouldSplitBefore() on how this value is used.
type compactionSplitSuggestion int
type maybeSplit int

const (
noSplit compactionSplitSuggestion = iota
noSplit maybeSplit = iota
splitNow
)

// String implements the Stringer interface.
func (c compactionSplitSuggestion) String() string {
func (c maybeSplit) String() string {
if c == noSplit {
return "no-split"
}
Expand All @@ -111,15 +111,15 @@ type compactionOutputSplitter interface {
// means no split is advised. If shouldSplitBefore(a) advises a split then
// shouldSplitBefore(b) should also advise a split given b >= a, until
// onNewOutput is called.
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
// onNewOutput updates internal splitter state when the compaction switches
// to a new sstable, and returns the next limit for the new output which
// would get used to truncate range tombstones if the compaction iterator
// runs out of keys. The limit returned MUST be > key according to the
// compaction's comparator. The specified key is the first key in the new
// output, or nil if this sstable will only contain range tombstones already
// in the fragmenter.
onNewOutput(key *InternalKey) []byte
onNewOutput(key []byte) []byte
}

// fileSizeSplitter is a compactionOutputSplitter that makes a determination
Expand All @@ -130,9 +130,7 @@ type fileSizeSplitter struct {
maxFileSize uint64
}

func (f *fileSizeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
// The Kind != RangeDelete part exists because EstimatedSize doesn't grow
// right away when a range tombstone is added to the fragmenter. It's always
// better to make a sequence of range tombstones visible to the fragmenter.
Expand All @@ -143,54 +141,43 @@ func (f *fileSizeSplitter) shouldSplitBefore(
return noSplit
}

func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte {
func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
return nil
}

type limitFuncSplitter struct {
c *compaction
frontiers *frontiers
limitFunc func(userKey []byte) []byte
limit []byte
split maybeSplit
}

func (lf *limitFuncSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
// NB: The limit must be applied using >= since lf.limit may be used as the
// `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to
// serve as an *exclusive* end boundary truncation point. If we used > then,
// we may have already added a key with the user key `lf.limit` to the
// previous sstable.
if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 {
return splitNow
}
return noSplit
var _ frontier = (*limitFuncSplitter)(nil)

func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
return lf.split
}

func (lf *limitFuncSplitter) key() []byte {
return lf.limit
}

func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte {
func (lf *limitFuncSplitter) reached(nextKey []byte) {
lf.limit = nil
lf.split = splitNow
}

func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
lf.split = noSplit
if key != nil {
lf.limit = lf.limitFunc(key.UserKey)
// TODO(jackson): For some users, like L0 flush splits, there's no need
// to binary search over all the flush splits every time. The next split
// point must be ahead of the previous flush split point.
lf.limit = lf.limitFunc(key)
} else {
// Use the start key of the first pending tombstone to find the
// next limit. All pending tombstones have the same start key.
// We use this as opposed to the end key of the
// last written sstable to effectively handle cases like these:
//
// a.SET.3
// (lf.limit at b)
// d.RANGEDEL.4:f
//
// In this case, the partition after b has only range deletions,
// so if we were to find the limit after the last written key at
// the split point (key a), we'd get the limit b again, and
// finishOutput() would not advance any further because the next
// range tombstone to write does not start until after the L0
// split point.
if startKey := lf.c.rangeDelFrag.Start(); startKey != nil {
lf.limit = lf.limitFunc(startKey)
}
lf.limit = nil
}
lf.frontiers.update(lf)
return lf.limit
}

Expand All @@ -203,7 +190,7 @@ type splitterGroup struct {

func (a *splitterGroup) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) (suggestion compactionSplitSuggestion) {
) (suggestion maybeSplit) {
for _, splitter := range a.splitters {
if splitter.shouldSplitBefore(key, tw) == splitNow {
return splitNow
Expand All @@ -212,7 +199,7 @@ func (a *splitterGroup) shouldSplitBefore(
return noSplit
}

func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
func (a *splitterGroup) onNewOutput(key []byte) []byte {
var earliestLimit []byte
for _, splitter := range a.splitters {
limit := splitter.onNewOutput(key)
Expand Down Expand Up @@ -240,9 +227,7 @@ type userKeyChangeSplitter struct {
unsafePrevUserKey func() []byte
}

func (u *userKeyChangeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
return split
}
Expand All @@ -252,7 +237,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore(
return noSplit
}

func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
return u.splitter.onNewOutput(key)
}

Expand Down Expand Up @@ -2693,10 +2678,11 @@ func (d *DB) runCompaction(
return c.rangeDelFrag.Start()
},
},
&limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit},
&limitFuncSplitter{frontiers: &iter.frontiers, limitFunc: c.findGrandparentLimit},
}
if splitL0Outputs {
outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit})
outputSplitters = append(outputSplitters,
&limitFuncSplitter{frontiers: &iter.frontiers, limitFunc: c.findL0Limit})
}
splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}

Expand All @@ -2709,7 +2695,28 @@ func (d *DB) runCompaction(
// progress guarantees ensure that eventually the input iterator will be
// exhausted and the range tombstone fragments will all be flushed.
for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
splitterSuggestion := splitter.onNewOutput(key)
var firstKey []byte
if key != nil {
firstKey = key.UserKey
} else if startKey := c.rangeDelFrag.Start(); startKey != nil {
// Pass the start key of the first pending tombstone to find the
// next limit. All pending tombstones have the same start key. We
// use this as opposed to the end key of the last written sstable to
// effectively handle cases like these:
//
// a.SET.3
// (lf.limit at b)
// d.RANGEDEL.4:f
//
// In this case, the partition after b has only range deletions, so
// if we were to find the limit after the last written key at the
// split point (key a), we'd get the limit b again, and
// finishOutput() would not advance any further because the next
// range tombstone to write does not start until after the L0 split
// point.
firstKey = startKey
}
splitterSuggestion := splitter.onNewOutput(firstKey)

// Each inner loop iteration processes one key from the input iterator.
for ; key != nil; key, val = iter.Next() {
Expand Down
159 changes: 159 additions & 0 deletions compaction_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package pebble

import (
"bytes"
"fmt"
"io"
"sort"
Expand Down Expand Up @@ -201,6 +202,12 @@ type compactionIter struct {
// numbers define the snapshot stripes (see the Snapshots description
// above). The sequence numbers are in ascending order.
snapshots []uint64
// frontiers holds a heap of user keys that affect compaction behavior when
// they're exceeded. Before a new key is returned, the compaction iterator
// advances the frontier, notifying any code that subscribed to be notified
// when a key was reached. See the compactionOutputSplitter implementations
// in compaction.go for one use.
frontiers frontiers
// Reference to the range deletion tombstone fragmenter (e.g.,
// `compaction.rangeDelFrag`).
rangeDelFrag *keyspan.Fragmenter
Expand Down Expand Up @@ -238,6 +245,7 @@ func newCompactionIter(
merge: merge,
iter: iter,
snapshots: snapshots,
frontiers: frontiers{cmp: cmp},
rangeDelFrag: rangeDelFrag,
rangeKeyFrag: rangeKeyFrag,
allowZeroSeqNum: allowZeroSeqNum,
Expand Down Expand Up @@ -770,6 +778,7 @@ func (i *compactionIter) saveKey() {
i.key.UserKey = i.keyBuf
i.key.Trailer = i.iterKey.Trailer
i.keyTrailer = i.iterKey.Trailer
i.frontiers.advance(i.key.UserKey)
}

func (i *compactionIter) cloneKey(key []byte) []byte {
Expand Down Expand Up @@ -898,3 +907,153 @@ func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
}
i.key.SetSeqNum(0)
}

// frontier encapsulates a monitored frontier. When `key` is reached or
// surpassed, the frontier's reached method is invoked with the key that reached
// the frontier. During the execution of reached, a frontier implementation may
// update the value of its `key`. If the `key` method returns nil, the frontier
// is removed from the heap and `reached` will not be invoked again, unless
// explicitly re-added to the heap.
//
// A frontier's `key` must be stable between calls to `reached`. If a frontier
// needs to update its key outside the context of a `reached` invocation, it may
// call frontiers.update, passing itself in order to reposition the frontier
// within the heap.
type frontier interface {
// key returns the user key currently being monitored, or nil if the
// frontier has nothing left to monitor.
key() []byte
// reached is invoked by frontiers.advance with the key that compared
// greater than or equal to the value returned by key.
reached(key []byte)
}

// frontiers implements a simple heap over user keys, intended for propagating
// information about progress of a compaction. Code that cares about when a
// compaction is about to surpass a key may add a frontier, with a `reached`
// function that will be invoked when the key is about to be reached or
// surpassed.
type frontiers struct {
// cmp orders user keys; the heap is a min-heap with respect to cmp.
cmp Compare
// items is the heap's backing storage. items[0], when present, is the
// frontier whose key() sorts smallest according to cmp.
items []frontier
}

// String implements fmt.Stringer. It renders every frontier in heap (slice)
// order as `<frontier>: "<key>"`, separating entries with ", ".
func (f *frontiers) String() string {
	var buf bytes.Buffer
	for idx, item := range f.items {
		if idx != 0 {
			fmt.Fprint(&buf, ", ")
		}
		fmt.Fprintf(&buf, "%s: %q", item, item.key())
	}
	return buf.String()
}

// advance is called by the compaction loop with the next key that the
// compaction will write. Every frontier whose key is ≤ k is notified through
// its reached method, smallest key first. After each notification the
// frontier is either dropped from the heap (its key is now nil) or re-sorted
// according to its new key.
func (f *frontiers) advance(k []byte) {
	for len(f.items) > 0 {
		if f.cmp(k, f.items[0].key()) < 0 {
			// The smallest monitored key is still ahead of k.
			return
		}
		// The root frontier has been reached; let it pick its next key.
		f.items[0].reached(k)
		if f.items[0].key() != nil {
			// The frontier has a new key: restore the heap order at the root.
			f.fix(0)
			continue
		}
		// The frontier has no further keys of interest: remove it.
		f.pop()
	}
}

// update must be called when a frontier's key has changed outside the context
// of a call to `reached`. If frontier.key() now returns nil, update removes the
// frontier from the heap. If frontier.key() now returns a non-nil key, update
// adds the frontier if it is not already contained within the heap, and fixes
// up its position if it already is.
//
// The heap is searched linearly for ff, comparing interface values with ==.
func (f *frontiers) update(ff frontier) {
	hasKey := ff.key() != nil
	for i := 0; i < len(f.items); i++ {
		if f.items[i] != ff {
			continue
		}
		if hasKey {
			// Still monitored: reposition according to the new key.
			f.fix(i)
			return
		}
		// No longer monitored: remove the element at index i. The last
		// element takes its place and must be sifted in whichever direction
		// restores the heap property. Sifting down alone is insufficient,
		// because the replacement may come from a different subtree and sort
		// before its new parent.
		n := f.len() - 1
		if i != n {
			f.swap(i, n)
			// Restrict both sifts to [0, n) so the removed frontier (whose
			// key is nil) is never compared.
			if !f.down(i, n) {
				f.up(i)
			}
		}
		// Drop the reference so the backing array does not retain it.
		f.items[n] = nil
		f.items = f.items[:n]
		return
	}
	if hasKey {
		f.push(ff)
	}
}

// push appends the provided frontier to the heap and sifts it up into
// position. No membership check is performed: pushing a frontier that is
// already in the heap adds it a second time, and it will then receive
// duplicate `reached` calls.
func (f *frontiers) push(ff frontier) {
	f.items = append(f.items, ff)
	f.up(len(f.items) - 1)
}

// len returns the number of frontiers currently held in the heap.
func (f *frontiers) len() int {
return len(f.items)
}

// less reports whether the key of the frontier at index i sorts strictly
// before the key of the frontier at index j according to f.cmp.
func (f *frontiers) less(i, j int) bool {
	ki, kj := f.items[i].key(), f.items[j].key()
	return f.cmp(ki, kj) < 0
}

// swap exchanges the frontiers stored at indices i and j.
func (f *frontiers) swap(i, j int) {
	tmp := f.items[i]
	f.items[i] = f.items[j]
	f.items[j] = tmp
}

// fix, up and down are copied from the go stdlib.

// fix restores the heap ordering after the key of the frontier at index i has
// changed: the element is sifted down, and if that does not move it, sifted
// up instead.
func (f *frontiers) fix(i int) {
	if f.down(i, f.len()) {
		return
	}
	f.up(i)
}

// pop removes the frontier at the root of the heap and returns a pointer to
// it. The heap must be non-empty.
//
// The returned pointer refers to a copy of the removed interface value, not
// to the heap's backing array, so it stays valid after later pushes. The
// vacated slot is cleared so the backing array does not keep the removed
// frontier reachable.
func (f *frontiers) pop() *frontier {
	n := f.len() - 1
	// Move the root to the end, then restore the heap over [0, n). The
	// removed frontier (whose key may be nil) is never compared.
	f.swap(0, n)
	f.down(0, n)
	item := f.items[n]
	f.items[n] = nil
	f.items = f.items[:n]
	return &item
}

// up sifts the element at index j towards the root, swapping it with its
// parent for as long as it sorts strictly before that parent.
func (f *frontiers) up(j int) {
	// (j-1)/2 truncates towards zero, so at the root the computed parent
	// equals j and the loop terminates.
	for parent := (j - 1) / 2; parent != j && f.less(j, parent); parent = (j - 1) / 2 {
		f.swap(parent, j)
		j = parent
	}
}

// down sifts the element at index i0 towards the leaves, considering only the
// first n elements of the heap. At each step it is swapped with its smaller
// child while that child sorts strictly before it. It reports whether the
// element moved.
func (f *frontiers) down(i0, n int) bool {
	cur := i0
	for {
		left := 2*cur + 1
		if left < 0 || left >= n { // left < 0 after int overflow
			break
		}
		smallest := left
		if right := left + 1; right < n && f.less(right, left) {
			smallest = right
		}
		if !f.less(smallest, cur) {
			break
		}
		f.swap(cur, smallest)
		cur = smallest
	}
	return cur > i0
}
Loading

0 comments on commit 6ac10ed

Please sign in to comment.