diff --git a/compaction.go b/compaction.go index f63df64fbe..eee9ecdd2c 100644 --- a/compaction.go +++ b/compaction.go @@ -96,15 +96,15 @@ func (cl compactionLevel) String() string { // Return output from compactionOutputSplitters. See comment on // compactionOutputSplitter.shouldSplitBefore() on how this value is used. -type compactionSplitSuggestion int +type maybeSplit int const ( - noSplit compactionSplitSuggestion = iota + noSplit maybeSplit = iota splitNow ) // String implements the Stringer interface. -func (c compactionSplitSuggestion) String() string { +func (c maybeSplit) String() string { if c == noSplit { return "no-split" } @@ -123,7 +123,7 @@ type compactionOutputSplitter interface { // means no split is advised. If shouldSplitBefore(a) advises a split then // shouldSplitBefore(b) should also advise a split given b >= a, until // onNewOutput is called. - shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion + shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit // onNewOutput updates internal splitter state when the compaction switches // to a new sstable, and returns the next limit for the new output which // would get used to truncate range tombstones if the compaction iterator @@ -131,7 +131,7 @@ type compactionOutputSplitter interface { // compaction's comparator. The specified key is the first key in the new // output, or nil if this sstable will only contain range tombstones already // in the fragmenter. - onNewOutput(key *InternalKey) []byte + onNewOutput(key []byte) []byte } // fileSizeSplitter is a compactionOutputSplitter that makes a determination @@ -142,9 +142,7 @@ type fileSizeSplitter struct { maxFileSize uint64 } -func (f *fileSizeSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { +func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { // The Kind != RangeDelete part exists because EstimatedSize doesn't grow // rightaway when a range tombstone is added to the fragmenter. It's always // better to make a sequence of range tombstones visible to the fragmenter. @@ -155,55 +153,43 @@ func (f *fileSizeSplitter) shouldSplitBefore( return noSplit } -func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte { +func (f *fileSizeSplitter) onNewOutput(key []byte) []byte { return nil } +func newLimitFuncSplitter(f *frontiers, limitFunc func(userKey []byte) []byte) *limitFuncSplitter { + s := &limitFuncSplitter{limitFunc: limitFunc} + s.frontier.Init(f, nil, s.reached) + return s +} + type limitFuncSplitter struct { - c *compaction + frontier frontier limitFunc func(userKey []byte) []byte - limit []byte + split maybeSplit } -func (lf *limitFuncSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { - // NB: The limit must be applied using >= since lf.limit may be used as the - // `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to - // serve as an *exclusive* end boundary truncation point. If we used > then, - // we may have already added a key with the user key `lf.limit` to the - // previous sstable. - if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 { - return splitNow - } - return noSplit +func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { + return lf.split +} + +func (lf *limitFuncSplitter) reached(nextKey []byte) []byte { + lf.split = splitNow + return nil } -func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte { - lf.limit = nil +func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte { + lf.split = noSplit if key != nil { - lf.limit = lf.limitFunc(key.UserKey) - } else { - // Use the start key of the first pending tombstone to find the - // next limit. All pending tombstones have the same start key. - // We use this as opposed to the end key of the - // last written sstable to effectively handle cases like these: - // - // a.SET.3 - // (lf.limit at b) - // d.RANGEDEL.4:f - // - // In this case, the partition after b has only range deletions, - // so if we were to find the limit after the last written key at - // the split point (key a), we'd get the limit b again, and - // finishOutput() would not advance any further because the next - // range tombstone to write does not start until after the L0 - // split point. - if startKey := lf.c.rangeDelFrag.Start(); startKey != nil { - lf.limit = lf.limitFunc(startKey) - } - } - return lf.limit + // TODO(jackson): For some users, like L0 flush splits, there's no need + // to binary search over all the flush splits every time. The next split + // point must be ahead of the previous flush split point. + limit := lf.limitFunc(key) + lf.frontier.Update(limit) + return limit + } + lf.frontier.Update(nil) + return nil } // splitterGroup is a compactionOutputSplitter that splits whenever one of its @@ -215,7 +201,7 @@ type splitterGroup struct { func (a *splitterGroup) shouldSplitBefore( key *InternalKey, tw *sstable.Writer, -) (suggestion compactionSplitSuggestion) { +) (suggestion maybeSplit) { for _, splitter := range a.splitters { if splitter.shouldSplitBefore(key, tw) == splitNow { return splitNow @@ -224,7 +210,7 @@ func (a *splitterGroup) shouldSplitBefore( return noSplit } -func (a *splitterGroup) onNewOutput(key *InternalKey) []byte { +func (a *splitterGroup) onNewOutput(key []byte) []byte { var earliestLimit []byte for _, splitter := range a.splitters { limit := splitter.onNewOutput(key) @@ -252,9 +238,21 @@ type userKeyChangeSplitter struct { unsafePrevUserKey func() []byte } -func (u *userKeyChangeSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { +func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { + // NB: The userKeyChangeSplitter only needs to suffer a key comparison if + // the wrapped splitter requests a split. + // + // We could implement this splitter using frontiers: When the inner splitter + // requests a split before key `k`, we'd update a frontier to be + // ImmediateSuccessor(k). Then on the next key greater than >k, the + // frontier's `reached` func would be called and we'd return splitNow. + // This doesn't really save work since duplicate user keys are rare, and it + // requires us to materialize the ImmediateSuccessor key. It also prevents + // us from splitting on the same key that the inner splitter requested a + // split for—instead we need to wait until the next key. The current + // implementation uses `unsafePrevUserKey` to gain access to the previous + // key which allows it to immediately respect the inner splitter if + // possible. if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow { return split } @@ -264,7 +262,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore( return noSplit } -func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte { +func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte { return u.splitter.onNewOutput(key) } @@ -2849,10 +2847,10 @@ func (d *DB) runCompaction( return c.rangeDelFrag.Start() }, }, - &limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit}, + newLimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit), } if splitL0Outputs { - outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit}) + outputSplitters = append(outputSplitters, newLimitFuncSplitter(&iter.frontiers, c.findL0Limit)) } splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters} @@ -2865,7 +2863,28 @@ func (d *DB) runCompaction( // progress guarantees ensure that eventually the input iterator will be // exhausted and the range tombstone fragments will all be flushed. for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); { - splitterSuggestion := splitter.onNewOutput(key) + var firstKey []byte + if key != nil { + firstKey = key.UserKey + } else if startKey := c.rangeDelFrag.Start(); startKey != nil { + // Pass the start key of the first pending tombstone to find the + // next limit. All pending tombstones have the same start key. We + // use this as opposed to the end key of the last written sstable to + // effectively handle cases like these: + // + // a.SET.3 + // (lf.limit at b) + // d.RANGEDEL.4:f + // + // In this case, the partition after b has only range deletions, so + // if we were to find the limit after the last written key at the + // split point (key a), we'd get the limit b again, and + // finishOutput() would not advance any further because the next + // range tombstone to write does not start until after the L0 split + // point. + firstKey = startKey + } + splitterSuggestion := splitter.onNewOutput(firstKey) // Each inner loop iteration processes one key from the input iterator. for ; key != nil; key, val = iter.Next() { diff --git a/compaction_iter.go b/compaction_iter.go index 3789972d4c..a8827e1690 100644 --- a/compaction_iter.go +++ b/compaction_iter.go @@ -5,6 +5,7 @@ package pebble import ( + "bytes" "fmt" "io" "sort" @@ -201,6 +202,17 @@ type compactionIter struct { // numbers define the snapshot stripes (see the Snapshots description // above). The sequence numbers are in ascending order. snapshots []uint64 + // frontiers holds a heap of user keys that affect compaction behavior when + // they're exceeded. Before a new key is returned, the compaction iterator + // advances the frontier, notifying any code that subscribed to be notified + // when a key was reached. The primary use today is within the + // implementation of compactionOutputSplitters in compaction.go. Many of + // these splitters wait for the compaction iterator to call Advance(k) when + // it's returning a new key. If the key that they're waiting for is + // surpassed, these splitters update internal state recording that they + // should request a compaction split next time they're asked in + // [shouldSplitBefore]. + frontiers frontiers // Reference to the range deletion tombstone fragmenter (e.g., // `compaction.rangeDelFrag`). rangeDelFrag *keyspan.Fragmenter @@ -238,6 +250,7 @@ func newCompactionIter( merge: merge, iter: iter, snapshots: snapshots, + frontiers: frontiers{cmp: cmp}, rangeDelFrag: rangeDelFrag, rangeKeyFrag: rangeKeyFrag, allowZeroSeqNum: allowZeroSeqNum, @@ -770,6 +783,7 @@ func (i *compactionIter) saveKey() { i.key.UserKey = i.keyBuf i.key.Trailer = i.iterKey.Trailer i.keyTrailer = i.iterKey.Trailer + i.frontiers.Advance(i.key.UserKey) } func (i *compactionIter) cloneKey(key []byte) []byte { @@ -898,3 +912,204 @@ func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) { } i.key.SetSeqNum(0) } + +// A frontier is used to monitor a compaction's progression across the user +// keyspace. +// +// A frontier hold a user key boundary that it's concerned with in its `key` +// field. If/when the compaction iterator returns an InternalKey with a user key +// _k_ such that k ≥ frontier.key, the compaction iterator invokes the +// frontier's `reached` function, passing _k_ as its argument. +// +// The `reached` function returns a new value to use as the key. If `reached` +// returns nil, the frontier is forgotten and its `reached` method will not be +// invoked again, unless the user calls [Update] to set a new key. +// +// A frontier's key may be updated outside the context of a `reached` +// invocation at any time, through its Update method. +type frontier struct { + // container points to the containing *frontiers that was passed to Init + // when the frontier was initialized. + container *frontiers + + // key holds the frontier's current key. If nil, this frontier is inactive + // and its reached func will not be invoked. The value of this key may only + // be updated by the `frontiers` type, or the Update method. + key []byte + + // reached is invoked to inform a frontier that its key has been reached. + // It's invoked with the user key that reached the limit. The `key` argument + // is guaranteed to be ≥ the frontier's key. + // + // After reached is invoked, the frontier's key is updated to the return + // value of `reached`. Note bene, the frontier is permitted to update its + // key to a user key ≤ the argument `key`. + // + // If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the + // frontier will receive reached(k2) calls until it returns nil or a key + // `k3` such that k2 < k3. This property is useful for frontiers that use + // `reached` invocations to drive iteration through collections of keys that + // may contain multiple keys that are both < k2 and ≥ k1. + reached func(key []byte) (next []byte) +} + +// Init initializes the frontier with the provided key and reached callback. +// The frontier is attached to the provided *frontiers and the provided reached +// func will be invoked when the *frontiers is advanced to a key ≥ this +// frontier's key. +func (f *frontier) Init( + frontiers *frontiers, initialKey []byte, reached func(key []byte) (next []byte), +) { + *f = frontier{ + container: frontiers, + key: initialKey, + reached: reached, + } + if initialKey != nil { + f.container.push(f) + } +} + +// String implements fmt.Stringer. +func (f *frontier) String() string { + return string(f.key) +} + +// Update replaces the existing frontier's key with the provided key. The +// frontier's reached func will be invoked when the new key is reached. +func (f *frontier) Update(key []byte) { + c := f.container + prevKeyIsNil := f.key == nil + f.key = key + if prevKeyIsNil { + if key != nil { + c.push(f) + } + return + } + + // Find the frontier within the heap (it must exist within the heap because + // f.key was != nil). If the frontier key is now nil, remove it from the + // heap. Otherwise, fix up its position. + for i := 0; i < len(c.items); i++ { + if c.items[i] == f { + if key != nil { + c.fix(i) + } else { + n := c.len() - 1 + c.swap(i, n) + c.down(i, n) + c.items = c.items[:n] + } + return + } + } + panic("unreachable") +} + +// frontiers is used to track progression of a task (eg, compaction) across the +// keyspace. Clients that want to be informed when the task advances to a key ≥ +// some frontier may register a frontier, providing a callback. The task calls +// `Advance(k)` with each user key encountered, which invokes the `reached` func +// on all tracked frontiers with `key`s ≤ k. +// +// Internally, frontiers is implemented as a simple heap. +type frontiers struct { + cmp Compare + items []*frontier +} + +// String implements fmt.Stringer. +func (f *frontiers) String() string { + var buf bytes.Buffer + for i := 0; i < len(f.items); i++ { + if i > 0 { + fmt.Fprint(&buf, ", ") + } + fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key) + } + return buf.String() +} + +// Advance notifies all member frontiers with keys ≤ k. +func (f *frontiers) Advance(k []byte) { + for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 { + // This frontier has been reached. Invoke the closure and update with + // the next frontier. + f.items[0].key = f.items[0].reached(k) + if f.items[0].key == nil { + // This was the final frontier that this user was concerned with. + // Remove it from the heap. + f.pop() + } else { + // Fix up the heap root. + f.fix(0) + } + } +} + +func (f *frontiers) len() int { + return len(f.items) +} + +func (f *frontiers) less(i, j int) bool { + return f.cmp(f.items[i].key, f.items[j].key) < 0 +} + +func (f *frontiers) swap(i, j int) { + f.items[i], f.items[j] = f.items[j], f.items[i] +} + +// fix, up and down are copied from the go stdlib. + +func (f *frontiers) fix(i int) { + if !f.down(i, f.len()) { + f.up(i) + } +} + +func (f *frontiers) push(ff *frontier) { + n := len(f.items) + f.items = append(f.items, ff) + f.up(n) +} + +func (f *frontiers) pop() *frontier { + n := f.len() - 1 + f.swap(0, n) + f.down(0, n) + item := f.items[n] + f.items = f.items[:n] + return item +} + +func (f *frontiers) up(j int) { + for { + i := (j - 1) / 2 // parent + if i == j || !f.less(j, i) { + break + } + f.swap(i, j) + j = i + } +} + +func (f *frontiers) down(i0, n int) bool { + i := i0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && f.less(j2, j1) { + j = j2 // = 2*i + 2 // right child + } + if !f.less(j, i) { + break + } + f.swap(i, j) + i = j + } + return i > i0 +} diff --git a/compaction_iter_test.go b/compaction_iter_test.go index e68c672dca..233b009c15 100644 --- a/compaction_iter_test.go +++ b/compaction_iter_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/rangekey" + "github.com/cockroachdb/pebble/internal/testkeys" ) func TestSnapshotIndex(t *testing.T) { @@ -273,3 +274,62 @@ func TestCompactionIter(t *testing.T) { }) } } + +func TestFrontiers(t *testing.T) { + cmp := testkeys.Comparer.Compare + var keySets [][][]byte + datadriven.RunTest(t, "testdata/frontiers", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "init": + // Init configures a frontier per line of input. Each line should + // contain a sorted whitespace-separated list of keys that the + // frontier will use. + // + // For example, the following input creates two separate monitored + // frontiers: one that sets its key successively to 'd', 'e', 'j' + // and one that sets its key to 'a', 'p', 'n', 'z': + // + // init + // b e j + // a p n z + + keySets = keySets[:0] + for _, line := range strings.Split(td.Input, "\n") { + keySets = append(keySets, bytes.Fields([]byte(line))) + } + return "" + case "scan": + f := &frontiers{cmp: cmp} + for _, keys := range keySets { + initTestFrontier(f, keys...) + } + var buf bytes.Buffer + for _, kStr := range strings.Fields(td.Input) { + k := []byte(kStr) + f.Advance(k) + fmt.Fprintf(&buf, "%s : { %s }\n", kStr, f.String()) + } + return buf.String() + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + }) +} + +// initTestFrontiers adds a new frontier to f that iterates through the provided +// keys. The keys slice must be sorted. +func initTestFrontier(f *frontiers, keys ...[]byte) *frontier { + ff := &frontier{} + var key []byte + if len(keys) > 0 { + key, keys = keys[0], keys[1:] + } + reached := func(k []byte) (nextKey []byte) { + if len(keys) > 0 { + nextKey, keys = keys[0], keys[1:] + } + return nextKey + } + ff.Init(f, key, reached) + return ff +} diff --git a/compaction_test.go b/compaction_test.go index 46d7e7e0e5..d4c56e3fce 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -3038,16 +3038,14 @@ func TestCompactionCheckOrdering(t *testing.T) { } type mockSplitter struct { - shouldSplitVal compactionSplitSuggestion + shouldSplitVal maybeSplit } -func (m *mockSplitter) shouldSplitBefore( - key *InternalKey, tw *sstable.Writer, -) compactionSplitSuggestion { +func (m *mockSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit { return m.shouldSplitVal } -func (m *mockSplitter) onNewOutput(key *InternalKey) []byte { +func (m *mockSplitter) onNewOutput(key []byte) []byte { return nil } @@ -3103,7 +3101,7 @@ func TestCompactionOutputSplitters(t *testing.T) { return "expected at least 2 args" } splitterToSet := (*pickSplitter(d.CmdArgs[0].Key)).(*mockSplitter) - var val compactionSplitSuggestion + var val maybeSplit switch d.CmdArgs[1].Key { case "split-now": val = splitNow @@ -3120,7 +3118,7 @@ func TestCompactionOutputSplitters(t *testing.T) { key := base.ParseInternalKey(d.CmdArgs[0].Key) shouldSplit := main.shouldSplitBefore(&key, nil) if shouldSplit == splitNow { - main.onNewOutput(&key) + main.onNewOutput(key.UserKey) prevUserKey = nil } else { prevUserKey = key.UserKey diff --git a/testdata/frontiers b/testdata/frontiers new file mode 100644 index 0000000000..313f693647 --- /dev/null +++ b/testdata/frontiers @@ -0,0 +1,71 @@ +# NB: The empty line in 'init' configures a frontier with no keys. It should +# never be added to the heap. + +init +b e j +a p n z + +f +---- + +scan +a b c d e f g h j i k l m n o p q r s t u v w x y z +---- +a : { b: "b", p: "p", f: "f" } +b : { e: "e", p: "p", f: "f" } +c : { e: "e", p: "p", f: "f" } +d : { e: "e", p: "p", f: "f" } +e : { f: "f", p: "p", j: "j" } +f : { j: "j", p: "p" } +g : { j: "j", p: "p" } +h : { j: "j", p: "p" } +j : { p: "p" } +i : { p: "p" } +k : { p: "p" } +l : { p: "p" } +m : { p: "p" } +n : { p: "p" } +o : { p: "p" } +p : { z: "z" } +q : { z: "z" } +r : { z: "z" } +s : { z: "z" } +t : { z: "z" } +u : { z: "z" } +v : { z: "z" } +w : { z: "z" } +x : { z: "z" } +y : { z: "z" } +z : { } + +scan +z +---- +z : { } + +scan +a z +---- +a : { b: "b", p: "p", f: "f" } +z : { } + +scan +e +---- +e : { f: "f", p: "p", j: "j" } + +# Test duplicate user keys within a frontier and across individual frontiers. + +init +b e e g +c e z +---- + +scan +a c d f z +---- +a : { b: "b", c: "c" } +c : { e: "e", e: "e" } +d : { e: "e", e: "e" } +f : { g: "g", z: "z" } +z : { }