diff --git a/compaction.go b/compaction.go index bb318ff3bc..18d6c57a44 100644 --- a/compaction.go +++ b/compaction.go @@ -84,15 +84,15 @@ type compactionLevel struct { // Return output from compactionOutputSplitters. See comment on // compactionOutputSplitter.shouldSplitBefore() on how this value is used. -type compactionSplitSuggestion int +type maybeSplit int const ( - noSplit compactionSplitSuggestion = iota + noSplit maybeSplit = iota splitNow ) // String implements the Stringer interface. -func (c compactionSplitSuggestion) String() string { +func (c maybeSplit) String() string { if c == noSplit { return "no-split" } @@ -111,7 +111,7 @@ type compactionOutputSplitter interface { // means no split is advised. If shouldSplitBefore(a) advises a split then // shouldSplitBefore(b) should also advise a split given b >= a, until // onNewOutput is called. - shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion + shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit // onNewOutput updates internal splitter state when the compaction switches // to a new sstable, and returns the next limit for the new output which // would get used to truncate range tombstones if the compaction iterator @@ -119,7 +119,7 @@ type compactionOutputSplitter interface { // compaction's comparator. The specified key is the first key in the new // output, or nil if this sstable will only contain range tombstones already // in the fragmenter. - onNewOutput(key *InternalKey) []byte + onNewOutput(key []byte) []byte } // fileSizeSplitter is a compactionOutputSplitter that makes a determination @@ -130,9 +130,7 @@ type fileSizeSplitter struct { maxFileSize uint64 } -func (f *fileSizeSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { +func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { // The Kind != RangeDelete part exists because EstimatedSize doesn't grow // rightaway when a range tombstone is added to the fragmenter. It's always // better to make a sequence of range tombstones visible to the fragmenter. @@ -143,54 +141,43 @@ func (f *fileSizeSplitter) shouldSplitBefore( return noSplit } -func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte { +func (f *fileSizeSplitter) onNewOutput(key []byte) []byte { return nil } type limitFuncSplitter struct { - c *compaction + frontiers *frontiers limitFunc func(userKey []byte) []byte limit []byte + split maybeSplit } -func (lf *limitFuncSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { - // NB: The limit must be applied using >= since lf.limit may be used as the - // `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to - // serve as an *exclusive* end boundary truncation point. If we used > then, - // we may have already added a key with the user key `lf.limit` to the - // previous sstable. - if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 { - return splitNow - } - return noSplit +var _ frontier = (*limitFuncSplitter)(nil) + +func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { + return lf.split +} + +func (lf *limitFuncSplitter) key() []byte { + return lf.limit } -func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte { +func (lf *limitFuncSplitter) reached(nextKey []byte) { lf.limit = nil + lf.split = splitNow +} + +func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte { + lf.split = noSplit if key != nil { - lf.limit = lf.limitFunc(key.UserKey) + // TODO(jackson): For some users, like L0 flush splits, there's no need + // to binary search over all the flush splits every time. The next split + // point must be ahead of the previous flush split point. + lf.limit = lf.limitFunc(key) } else { - // Use the start key of the first pending tombstone to find the - // next limit. All pending tombstones have the same start key. - // We use this as opposed to the end key of the - // last written sstable to effectively handle cases like these: - // - // a.SET.3 - // (lf.limit at b) - // d.RANGEDEL.4:f - // - // In this case, the partition after b has only range deletions, - // so if we were to find the limit after the last written key at - // the split point (key a), we'd get the limit b again, and - // finishOutput() would not advance any further because the next - // range tombstone to write does not start until after the L0 - // split point. - if startKey := lf.c.rangeDelFrag.Start(); startKey != nil { - lf.limit = lf.limitFunc(startKey) - } + lf.limit = nil } + lf.frontiers.update(lf) return lf.limit } @@ -203,7 +190,7 @@ type splitterGroup struct { func (a *splitterGroup) shouldSplitBefore( key *InternalKey, tw *sstable.Writer, -) (suggestion compactionSplitSuggestion) { +) (suggestion maybeSplit) { for _, splitter := range a.splitters { if splitter.shouldSplitBefore(key, tw) == splitNow { return splitNow @@ -212,7 +199,7 @@ func (a *splitterGroup) shouldSplitBefore( return noSplit } -func (a *splitterGroup) onNewOutput(key *InternalKey) []byte { +func (a *splitterGroup) onNewOutput(key []byte) []byte { var earliestLimit []byte for _, splitter := range a.splitters { limit := splitter.onNewOutput(key) @@ -240,9 +227,7 @@ type userKeyChangeSplitter struct { unsafePrevUserKey func() []byte } -func (u *userKeyChangeSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { +func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow { return split } @@ -252,7 +237,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore( return noSplit } -func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte { +func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte { return u.splitter.onNewOutput(key) } @@ -2693,10 +2678,11 @@ func (d *DB) runCompaction( return c.rangeDelFrag.Start() }, }, - &limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit}, + &limitFuncSplitter{frontiers: &iter.frontiers, limitFunc: c.findGrandparentLimit}, } if splitL0Outputs { - outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit}) + outputSplitters = append(outputSplitters, + &limitFuncSplitter{frontiers: &iter.frontiers, limitFunc: c.findL0Limit}) } splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters} @@ -2709,7 +2695,28 @@ func (d *DB) runCompaction( // progress guarantees ensure that eventually the input iterator will be // exhausted and the range tombstone fragments will all be flushed. for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); { - splitterSuggestion := splitter.onNewOutput(key) + var firstKey []byte + if key != nil { + firstKey = key.UserKey + } else if startKey := c.rangeDelFrag.Start(); startKey != nil { + // Pass the start key of the first pending tombstone to find the + // next limit. All pending tombstones have the same start key. We + // use this as opposed to the end key of the last written sstable to + // effectively handle cases like these: + // + // a.SET.3 + // (lf.limit at b) + // d.RANGEDEL.4:f + // + // In this case, the partition after b has only range deletions, so + // if we were to find the limit after the last written key at the + // split point (key a), we'd get the limit b again, and + // finishOutput() would not advance any further because the next + // range tombstone to write does not start until after the L0 split + // point. + firstKey = startKey + } + splitterSuggestion := splitter.onNewOutput(firstKey) // Each inner loop iteration processes one key from the input iterator. for ; key != nil; key, val = iter.Next() { diff --git a/compaction_iter.go b/compaction_iter.go index 3789972d4c..a4c1f4b377 100644 --- a/compaction_iter.go +++ b/compaction_iter.go @@ -5,6 +5,7 @@ package pebble import ( + "bytes" "fmt" "io" "sort" @@ -201,6 +202,12 @@ type compactionIter struct { // numbers define the snapshot stripes (see the Snapshots description // above). The sequence numbers are in ascending order. snapshots []uint64 + // frontiers holds a heap of user keys that affect compaction behavior when + // they're exceeded. Before a new key is returned, the compaction iterator + // advances the frontier, notifying any code that subscribed to be notified + // when a key was reached. See the compactionOutputSplitter implementations + // in compaction.go for one use. + frontiers frontiers // Reference to the range deletion tombstone fragmenter (e.g., // `compaction.rangeDelFrag`). rangeDelFrag *keyspan.Fragmenter @@ -238,6 +245,7 @@ func newCompactionIter( merge: merge, iter: iter, snapshots: snapshots, + frontiers: frontiers{cmp: cmp}, rangeDelFrag: rangeDelFrag, rangeKeyFrag: rangeKeyFrag, allowZeroSeqNum: allowZeroSeqNum, @@ -770,6 +778,7 @@ func (i *compactionIter) saveKey() { i.key.UserKey = i.keyBuf i.key.Trailer = i.iterKey.Trailer i.keyTrailer = i.iterKey.Trailer + i.frontiers.advance(i.key.UserKey) } func (i *compactionIter) cloneKey(key []byte) []byte { @@ -898,3 +907,180 @@ func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) { } i.key.SetSeqNum(0) } + +// frontier encapsulates code concerned with a compaction's progression across +// the user keyspace. +// +// A frontier exposes the next user key boundary that it's concerned with +// through its `key` method. If/when the compaction iterator returns an +// InternalKey with a user key _k_ such that k ≥ frontier.key(), the compaction +// iterator invokes the frontier's `reached` method, passing _k_ as its +// argument. +// +// After the execution of `reached`, the frontier's `key` method must return nil +// or a new user key strictly greater than the previous user key returned by +// `key`. If `key` returns nil, the frontier is forgotten and its `reached` +// method will not be invoked again, unless the frontier is explictly re-added +// to the `frontiers` type with a new `key` value. +// +// A frontier's `key` must be stable between calls to `reached`. If a frontier +// needs to update its key outside the context of a `reached` invocation, it may +// call frontiers.update, passing itself. +type frontier interface { + // key returns the user key that the frontier is concerned with, requesting + // to be notified if a user key is encountered that is ≥ key(). + key() []byte + // reached is invoked to inform a frontier that its key() has been reached. + // It's invoked with the user key that reached the limit. The `key` argument + // is guaranteed to be ≥ the frontier's key(). + // + // After reached is invoked, the frontier's `key()` method must return a new + // key strictly greater than it's previous value of `key()`, or nil if the + // frontier is no longer interested in the frontiers' progression. Note + // bene, the frontier is permitted to update its `key()` to a user key ≤ the + // argument `key`: + // + // 1. previous frontier.key() ≤ key + // 2. previous frontier.key() < new frontier.key() + // + // If a frontier updates its `key()` to a value ≤ key, reached will be + // updated invoked again with the same `key`. + reached(key []byte) +} + +// frontiers holds a collection of `frontier` implementations, handling +// notifying them when their user keys are reached. The compaction iterator +// calls `advance` with the user key of every key it returns to the compaction +// loop. The `advance(k)` method handles invoking `reached` method on all +// tracked frontiers with `key()` values ≤ k. +// +// Internally, frontiers is implemented as a simple heap. +type frontiers struct { + cmp Compare + items []frontier +} + +// String implements fmt.Stringer. +func (f *frontiers) String() string { + var buf bytes.Buffer + for i := 0; i < len(f.items); i++ { + if i > 0 { + fmt.Fprint(&buf, ", ") + } + fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key()) + } + return buf.String() +} + +// advance notifies all member frontiers with key()s ≤ k and restores the heap +// for updated frontiers. +func (f *frontiers) advance(k []byte) { + for len(f.items) > 0 && f.cmp(k, f.items[0].key()) >= 0 { + // This frontier has been reached. Invoke the closure, and update with + // the next frontier. + f.items[0].reached(k) + if f.items[0].key() == nil { + // This was the final frontier that this user was concerned with. + // Remove it from the heap. + f.pop() + } else { + // Fix up the heap root. + f.fix(0) + } + } +} + +// update must be called when a frontier's key has changed outside the context +// of a call to `reached`. If frontier.key() now returns nil, update removes the +// frontier from the heap. If frontier.key() now returns a non-nil key, update +// adds the frontier if not already contained with the heap, and fixes up its +// position if it already is. +func (f *frontiers) update(ff frontier) { + hasKey := ff.key() != nil + for i := 0; i < len(f.items); i++ { + if f.items[i] == ff { + if hasKey { + f.fix(i) + } else { + n := f.len() - 1 + f.swap(i, n) + f.down(i, n) + f.items = f.items[:n] + } + return + } + } + if hasKey { + f.push(ff) + } +} + +// push adds the provided frontier to the set of frontiers. If the provided +// frontier is already in the heap, it will be added again and will receive +// duplicate `reached` calls. Callers that want to update the key of an existing +// heap should use update. +func (f *frontiers) push(ff frontier) { + n := len(f.items) + f.items = append(f.items, ff) + f.up(n) +} + +func (f *frontiers) len() int { + return len(f.items) +} + +func (f *frontiers) less(i, j int) bool { + return f.cmp(f.items[i].key(), f.items[j].key()) < 0 +} + +func (f *frontiers) swap(i, j int) { + f.items[i], f.items[j] = f.items[j], f.items[i] +} + +// fix, up and down are copied from the go stdlib. + +func (f *frontiers) fix(i int) { + if !f.down(i, f.len()) { + f.up(i) + } +} + +func (f *frontiers) pop() *frontier { + n := f.len() - 1 + f.swap(0, n) + f.down(0, n) + item := &f.items[n] + f.items = f.items[:n] + return item +} + +func (f *frontiers) up(j int) { + for { + i := (j - 1) / 2 // parent + if i == j || !f.less(j, i) { + break + } + f.swap(i, j) + j = i + } +} + +func (f *frontiers) down(i0, n int) bool { + i := i0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && f.less(j2, j1) { + j = j2 // = 2*i + 2 // right child + } + if !f.less(j, i) { + break + } + f.swap(i, j) + i = j + } + return i > i0 +} diff --git a/compaction_iter_test.go b/compaction_iter_test.go index e68c672dca..79ba7fac49 100644 --- a/compaction_iter_test.go +++ b/compaction_iter_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/rangekey" + "github.com/cockroachdb/pebble/internal/testkeys" ) func TestSnapshotIndex(t *testing.T) { @@ -273,3 +274,57 @@ func TestCompactionIter(t *testing.T) { }) } } + +func TestFrontiers(t *testing.T) { + cmp := testkeys.Comparer.Compare + var keySets [][][]byte + datadriven.RunTest(t, "testdata/frontiers", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "init": + keySets = keySets[:0] + for _, line := range strings.Split(td.Input, "\n") { + keySets = append(keySets, bytes.Fields([]byte(line))) + } + return "" + case "scan": + f := frontiers{cmp: cmp} + for i, keys := range keySets { + f.update(&testFrontier{label: strconv.Itoa(i), cmp: cmp, keys: keys}) + } + var buf bytes.Buffer + for _, kStr := range strings.Fields(td.Input) { + k := []byte(kStr) + f.advance(k) + fmt.Fprintf(&buf, "%s : { %s }\n", kStr, f.String()) + } + return buf.String() + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + }) +} + +type testFrontier struct { + label string + cmp Compare + keys [][]byte +} + +func (tf *testFrontier) String() string { return tf.label } + +func (tf *testFrontier) reached(k []byte) { + i := 1 + for ; i < len(tf.keys); i++ { + if tf.cmp(tf.keys[i], k) > 0 { + break + } + } + tf.keys = tf.keys[i:] +} + +func (tf *testFrontier) key() []byte { + if len(tf.keys) == 0 { + return nil + } + return tf.keys[0] +} diff --git a/compaction_test.go b/compaction_test.go index 9d6683943c..380a48c397 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -3041,16 +3041,14 @@ func TestCompactionCheckOrdering(t *testing.T) { } type mockSplitter struct { - shouldSplitVal compactionSplitSuggestion + shouldSplitVal maybeSplit } -func (m *mockSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { +func (m *mockSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { return m.shouldSplitVal } -func (m *mockSplitter) onNewOutput(key *InternalKey) []byte { +func (m *mockSplitter) onNewOutput(key []byte) []byte { return nil } @@ -3106,7 +3104,7 @@ func TestCompactionOutputSplitters(t *testing.T) { return "expected at least 2 args" } splitterToSet := (*pickSplitter(d.CmdArgs[0].Key)).(*mockSplitter) - var val compactionSplitSuggestion + var val maybeSplit switch d.CmdArgs[1].Key { case "split-now": val = splitNow @@ -3123,7 +3121,7 @@ func TestCompactionOutputSplitters(t *testing.T) { key := base.ParseInternalKey(d.CmdArgs[0].Key) shouldSplit := main.shouldSplitBefore(&key, nil) if shouldSplit == splitNow { - main.onNewOutput(&key) + main.onNewOutput(key.UserKey) prevUserKey = nil } else { prevUserKey = key.UserKey diff --git a/testdata/frontiers b/testdata/frontiers new file mode 100644 index 0000000000..a8fa14d534 --- /dev/null +++ b/testdata/frontiers @@ -0,0 +1,52 @@ +init +b e j +a p n z + +f +---- + +scan +a b c d e f g h j i k l m n o p q r s t u v w x y z +---- +a : { 0: "b", 1: "p", 3: "f" } +b : { 0: "e", 1: "p", 3: "f" } +c : { 0: "e", 1: "p", 3: "f" } +d : { 0: "e", 1: "p", 3: "f" } +e : { 3: "f", 1: "p", 0: "j" } +f : { 0: "j", 1: "p" } +g : { 0: "j", 1: "p" } +h : { 0: "j", 1: "p" } +j : { 1: "p" } +i : { 1: "p" } +k : { 1: "p" } +l : { 1: "p" } +m : { 1: "p" } +n : { 1: "p" } +o : { 1: "p" } +p : { 1: "z" } +q : { 1: "z" } +r : { 1: "z" } +s : { 1: "z" } +t : { 1: "z" } +u : { 1: "z" } +v : { 1: "z" } +w : { 1: "z" } +x : { 1: "z" } +y : { 1: "z" } +z : { } + +scan +z +---- +z : { } + +scan +a z +---- +a : { 0: "b", 1: "p", 3: "f" } +z : { } + +scan +e +---- +e : { 3: "f", 1: "p", 0: "j" }