From 0a09528a3804f2b456b324593a8413e96dfd4e83 Mon Sep 17 00:00:00 2001
From: Jackson Owens <jackson@cockroachlabs.com>
Date: Fri, 20 Jan 2023 16:37:36 -0500
Subject: [PATCH] db: refactor compaction splitting to reduce key comparisons

Introduce a new type `frontiers`, designed to monitor several different user
key frontiers during a compaction. When a user key is encountered that equals
or exceeds the configured frontier, the code that specified the frontier is
notified and given an opportunity to set a new frontier. Internally,
`frontiers` uses a heap (code largely copied from the merging iterator's heap)
to avoid performing one key comparison per frontier for every key.
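
For illustration, a client monitors the keyspace roughly like so (sketch;
`cmp` is the compaction's comparator):

    f := frontiers{cmp: cmp}
    var mon frontier
    mon.Init(&f, []byte("d"), func(k []byte) []byte {
        // Invoked once the compaction reaches a user key ≥ "d".
        // Return the next key to monitor, or nil to stop.
        return nil
    })
    f.Advance([]byte("b")) // no-op: "b" < "d"
    f.Advance([]byte("e")) // invokes the callback with k = "e"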

This commit refactors the `limitFuncSplitter` type to make use of `frontiers`.
The `limitFuncSplitter` type is used to split flushes at L0 flush split keys,
and to split both flushes and compactions to avoid excessive overlap with
grandparent files.

This change is motivated by #2156, which will introduce an additional
compaction-output splitter that must perform key comparisons against the next
key to decide when to split a compaction. Additionally, the `frontiers` type
may also prove useful elsewhere, such as for applying key-space-dependent logic
during a compaction (e.g., compaction-time GC, disaggregated storage locality
policies, or keyspan-restricted snapshots #1810).
---
 compaction.go           | 131 +++++++++++++-----------
 compaction_iter.go      | 215 ++++++++++++++++++++++++++++++++++++++++
 compaction_iter_test.go |  60 +++++++++++
 compaction_test.go      |  12 +--
 testdata/frontiers      |  71 +++++++++++++
 5 files changed, 426 insertions(+), 63 deletions(-)
 create mode 100644 testdata/frontiers

diff --git a/compaction.go b/compaction.go
index f63df64fbe..37b62cd167 100644
--- a/compaction.go
+++ b/compaction.go
@@ -96,15 +96,15 @@ func (cl compactionLevel) String() string {
 
 // Return output from compactionOutputSplitters. See comment on
 // compactionOutputSplitter.shouldSplitBefore() on how this value is used.
-type compactionSplitSuggestion int
+type maybeSplit int
 
 const (
-	noSplit compactionSplitSuggestion = iota
+	noSplit maybeSplit = iota
 	splitNow
 )
 
 // String implements the Stringer interface.
-func (c compactionSplitSuggestion) String() string {
+func (c maybeSplit) String() string {
 	if c == noSplit {
 		return "no-split"
 	}
@@ -123,7 +123,7 @@ type compactionOutputSplitter interface {
 	// means no split is advised. If shouldSplitBefore(a) advises a split then
 	// shouldSplitBefore(b) should also advise a split given b >= a, until
 	// onNewOutput is called.
-	shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
+	shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
 	// onNewOutput updates internal splitter state when the compaction switches
 	// to a new sstable, and returns the next limit for the new output which
 	// would get used to truncate range tombstones if the compaction iterator
@@ -131,7 +131,7 @@ type compactionOutputSplitter interface {
 	// compaction's comparator. The specified key is the first key in the new
 	// output, or nil if this sstable will only contain range tombstones already
 	// in the fragmenter.
-	onNewOutput(key *InternalKey) []byte
+	onNewOutput(key []byte) []byte
 }
 
 // fileSizeSplitter is a compactionOutputSplitter that makes a determination
@@ -142,9 +142,7 @@ type fileSizeSplitter struct {
 	maxFileSize uint64
 }
 
-func (f *fileSizeSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
+func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
 	// The Kind != RangeDelete part exists because EstimatedSize doesn't grow
 	// rightaway when a range tombstone is added to the fragmenter. It's always
 	// better to make a sequence of range tombstones visible to the fragmenter.
@@ -155,55 +153,43 @@ func (f *fileSizeSplitter) shouldSplitBefore(
 	return noSplit
 }
 
-func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte {
+func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
 	return nil
 }
 
+func newLimitFuncSplitter(f *frontiers, limitFunc func(userKey []byte) []byte) *limitFuncSplitter {
+	s := &limitFuncSplitter{limitFunc: limitFunc}
+	s.frontier.Init(f, nil, s.reached)
+	return s
+}
+
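+// limitFuncSplitter splits a compaction's output just before any key ≥ the
+// limit computed by limitFunc for the first key of the current output. It
+// registers a frontier at that limit, letting it learn that the limit was
+// reached without performing its own key comparisons.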
 type limitFuncSplitter struct {
-	c         *compaction
+	frontier  frontier
 	limitFunc func(userKey []byte) []byte
-	limit     []byte
+	split     maybeSplit
 }
 
-func (lf *limitFuncSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
-	// NB: The limit must be applied using >= since lf.limit may be used as the
-	// `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to
-	// serve as an *exclusive* end boundary truncation point. If we used > then,
-	// we may have already added a key with the user key `lf.limit` to the
-	// previous sstable.
-	if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 {
-		return splitNow
-	}
-	return noSplit
+func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
+	return lf.split
+}
+
+func (lf *limitFuncSplitter) reached(nextKey []byte) []byte {
+	lf.split = splitNow
+	return nil
 }
 
-func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte {
-	lf.limit = nil
+func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
+	lf.split = noSplit
 	if key != nil {
-		lf.limit = lf.limitFunc(key.UserKey)
-	} else {
-		// Use the start key of the first pending tombstone to find the
-		// next limit. All pending tombstones have the same start key.
-		// We use this as opposed to the end key of the
-		// last written sstable to effectively handle cases like these:
-		//
-		// a.SET.3
-		// (lf.limit at b)
-		// d.RANGEDEL.4:f
-		//
-		// In this case, the partition after b has only range deletions,
-		// so if we were to find the limit after the last written key at
-		// the split point (key a), we'd get the limit b again, and
-		// finishOutput() would not advance any further because the next
-		// range tombstone to write does not start until after the L0
-		// split point.
-		if startKey := lf.c.rangeDelFrag.Start(); startKey != nil {
-			lf.limit = lf.limitFunc(startKey)
-		}
-	}
-	return lf.limit
+		// TODO(jackson): For some uses, like L0 flush splits, there's no need
+		// to binary search over all the flush splits every time. The next split
+		// point must be ahead of the previous flush split point.
+		limit := lf.limitFunc(key)
+		lf.frontier.Update(limit)
+		return limit
+	}
+	lf.frontier.Update(nil)
+	return nil
 }
 
 // splitterGroup is a compactionOutputSplitter that splits whenever one of its
@@ -215,7 +201,7 @@ type splitterGroup struct {
 
 func (a *splitterGroup) shouldSplitBefore(
 	key *InternalKey, tw *sstable.Writer,
-) (suggestion compactionSplitSuggestion) {
+) (suggestion maybeSplit) {
 	for _, splitter := range a.splitters {
 		if splitter.shouldSplitBefore(key, tw) == splitNow {
 			return splitNow
@@ -224,7 +210,7 @@ func (a *splitterGroup) shouldSplitBefore(
 	return noSplit
 }
 
-func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
+func (a *splitterGroup) onNewOutput(key []byte) []byte {
 	var earliestLimit []byte
 	for _, splitter := range a.splitters {
 		limit := splitter.onNewOutput(key)
@@ -252,9 +238,21 @@ type userKeyChangeSplitter struct {
 	unsafePrevUserKey func() []byte
 }
 
-func (u *userKeyChangeSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
+func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
+	// NB: The userKeyChangeSplitter only needs to suffer a key comparison if
+	// the wrapped splitter requests a split.
+	//
+	// We could implement this splitter using frontiers: when the inner splitter
+	// requests a split before key `k`, we'd update a frontier to be
+	// ImmediateSuccessor(k). Then, on the next key greater than k, the
+	// frontier's `reached` func would be called and we'd return splitNow.
+	// This doesn't really save work, since duplicate user keys are rare, and it
+	// requires us to materialize the ImmediateSuccessor key. It also prevents
+	// us from splitting on the same key that the inner splitter requested a
+	// split for; instead, we'd need to wait until the next key. The current
+	// implementation uses `unsafePrevUserKey` to gain access to the previous
+	// key, which allows it to immediately respect the inner splitter when
+	// possible.
 	if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
 		return split
 	}
@@ -264,7 +262,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore(
 	return noSplit
 }
 
-func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
+func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
 	return u.splitter.onNewOutput(key)
 }
 
@@ -2849,10 +2847,10 @@ func (d *DB) runCompaction(
 				return c.rangeDelFrag.Start()
 			},
 		},
-		&limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit},
+		newLimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
 	}
 	if splitL0Outputs {
-		outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit})
+		outputSplitters = append(outputSplitters, newLimitFuncSplitter(&iter.frontiers, c.findL0Limit))
 	}
 	splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}
 
@@ -2865,7 +2863,28 @@ func (d *DB) runCompaction(
 	// progress guarantees ensure that eventually the input iterator will be
 	// exhausted and the range tombstone fragments will all be flushed.
 	for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
-		splitterSuggestion := splitter.onNewOutput(key)
+		var firstKey []byte
+		if key != nil {
+			firstKey = key.UserKey
+		} else if startKey := c.rangeDelFrag.Start(); startKey != nil {
+			// Pass the start key of the first pending tombstone to find the
+			// next limit. All pending tombstones have the same start key. We
+			// use this as opposed to the end key of the last written sstable to
+			// effectively handle cases like these:
+			//
+			// a.SET.3
+			// (lf.limit at b)
+			// d.RANGEDEL.4:f
+			//
+			// In this case, the partition after b has only range deletions, so
+			// if we were to find the limit after the last written key at the
+			// split point (key a), we'd get the limit b again, and
+			// finishOutput() would not advance any further because the next
+			// range tombstone to write does not start until after the L0 split
+			// point.
+			firstKey = startKey
+		}
+		splitterSuggestion := splitter.onNewOutput(firstKey)
 
 		// Each inner loop iteration processes one key from the input iterator.
 		for ; key != nil; key, val = iter.Next() {
diff --git a/compaction_iter.go b/compaction_iter.go
index 3789972d4c..a8827e1690 100644
--- a/compaction_iter.go
+++ b/compaction_iter.go
@@ -5,6 +5,7 @@
 package pebble
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"sort"
@@ -201,6 +202,17 @@ type compactionIter struct {
 	// numbers define the snapshot stripes (see the Snapshots description
 	// above). The sequence numbers are in ascending order.
 	snapshots []uint64
+	// frontiers holds a heap of user keys that affect compaction behavior when
+	// they're reached or exceeded. Before a new key is returned, the compaction
+	// iterator advances the frontier, notifying any subscribed code that its
+	// key has been reached. The primary use today is within the implementation
+	// of compactionOutputSplitters in compaction.go. Many of these splitters
+	// wait for the compaction iterator to call Advance(k) when it's returning a
+	// new key. If the key that they're waiting for has been reached or
+	// surpassed, these splitters update internal state recording that they
+	// should request a compaction split the next time they're asked in
+	// [shouldSplitBefore].
 	// Reference to the range deletion tombstone fragmenter (e.g.,
 	// `compaction.rangeDelFrag`).
 	rangeDelFrag *keyspan.Fragmenter
@@ -238,6 +250,7 @@ func newCompactionIter(
 		merge:               merge,
 		iter:                iter,
 		snapshots:           snapshots,
+		frontiers:           frontiers{cmp: cmp},
 		rangeDelFrag:        rangeDelFrag,
 		rangeKeyFrag:        rangeKeyFrag,
 		allowZeroSeqNum:     allowZeroSeqNum,
@@ -770,6 +783,7 @@ func (i *compactionIter) saveKey() {
 	i.key.UserKey = i.keyBuf
 	i.key.Trailer = i.iterKey.Trailer
 	i.keyTrailer = i.iterKey.Trailer
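+	// Inform any subscribers (e.g., output splitters) that the compaction has
+	// progressed to this user key, invoking the `reached` funcs of any
+	// frontiers positioned at or before it.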
+	i.frontiers.Advance(i.key.UserKey)
 }
 
 func (i *compactionIter) cloneKey(key []byte) []byte {
@@ -898,3 +912,204 @@ func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
 	}
 	i.key.SetSeqNum(0)
 }
+
+// A frontier is used to monitor a compaction's progression across the user
+// keyspace.
+//
+// A frontier holds a user key boundary that it's concerned with in its `key`
+// field. If/when the compaction iterator returns an InternalKey with a user key
+// _k_ such that k ≥ frontier.key, the compaction iterator invokes the
+// frontier's `reached` function, passing _k_ as its argument.
+//
+// The `reached` function returns a new value to use as the key. If `reached`
+// returns nil, the frontier is forgotten and its `reached` method will not be
+// invoked again, unless the user calls [Update] to set a new key.
+//
+// A frontier's key may be updated outside the context of a `reached`
+// invocation at any time, through its Update method.
+type frontier struct {
+	// container points to the containing *frontiers that was passed to Init
+	// when the frontier was initialized.
+	container *frontiers
+
+	// key holds the frontier's current key. If nil, this frontier is inactive
+	// and its reached func will not be invoked. The value of this key may only
+	// be updated by the `frontiers` type, or the Update method.
+	key []byte
+
+	// reached is invoked to inform a frontier that its key has been reached.
+	// It's invoked with the user key that reached the limit. The `key` argument
+	// is guaranteed to be ≥ the frontier's key.
+	//
+	// After reached is invoked, the frontier's key is updated to the return
+	// value of `reached`. Note that the frontier is permitted to update its
+	// key to a user key ≤ the argument `key`.
+	//
+	// If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
+	// frontier will receive reached(k2) calls until it returns nil or a key
+	// `k3` such that k2 < k3. This property is useful for frontiers that use
+	// `reached` invocations to drive iteration through collections of keys that
+	// may contain multiple keys that are both < k2 and ≥ k1.
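+	//
+	// For example, if a frontier is at "c" and reached("m") returns "f",
+	// reached("m") is invoked again immediately (because "f" ≤ "m"), letting
+	// the frontier step through "c", "f", ... within a single Advance.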
+	reached func(key []byte) (next []byte)
+}
+
+// Init initializes the frontier with the provided key and reached callback.
+// The frontier is attached to the provided *frontiers and the provided reached
+// func will be invoked when the *frontiers is advanced to a key ≥ this
+// frontier's key.
+func (f *frontier) Init(
+	frontiers *frontiers, initialKey []byte, reached func(key []byte) (next []byte),
+) {
+	*f = frontier{
+		container: frontiers,
+		key:       initialKey,
+		reached:   reached,
+	}
+	if initialKey != nil {
+		f.container.push(f)
+	}
+}
+
+// String implements fmt.Stringer.
+func (f *frontier) String() string {
+	return string(f.key)
+}
+
+// Update replaces the existing frontier's key with the provided key. The
+// frontier's reached func will be invoked when the new key is reached.
+func (f *frontier) Update(key []byte) {
+	c := f.container
+	prevKeyIsNil := f.key == nil
+	f.key = key
+	if prevKeyIsNil {
+		if key != nil {
+			c.push(f)
+		}
+		return
+	}
+
+	// Find the frontier within the heap (it must exist within the heap because
+	// f.key was != nil). If the frontier key is now nil, remove it from the
+	// heap. Otherwise, fix up its position.
+	for i := 0; i < len(c.items); i++ {
+		if c.items[i] == f {
+			if key != nil {
+				c.fix(i)
+			} else {
+				n := c.len() - 1
+				c.swap(i, n)
+				c.down(i, n)
+				c.items = c.items[:n]
+			}
+			return
+		}
+	}
+	panic("unreachable")
+}
+
+// frontiers is used to track the progression of a task (e.g., compaction)
+// across the
+// keyspace. Clients that want to be informed when the task advances to a key ≥
+// some frontier may register a frontier, providing a callback. The task calls
+// `Advance(k)` with each user key encountered, which invokes the `reached` func
+// on all tracked frontiers with `key`s ≤ k.
+//
+// Internally, frontiers is implemented as a simple heap.
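+//
+// Example (illustrative; cmp is a Compare function):
+//
+//	f := frontiers{cmp: cmp}
+//	var x frontier
+//	x.Init(&f, []byte("c"), func(k []byte) (next []byte) {
+//		// Invoked with the first advanced key ≥ "c".
+//		return nil // deactivate until Update re-arms the frontier
+//	})
+//	f.Advance([]byte("b")) // no-op: "b" < "c"
+//	f.Advance([]byte("d")) // invokes the callback with k = "d"
+//	x.Update([]byte("m"))  // re-arm the frontier at "m"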
+type frontiers struct {
+	cmp   Compare
+	items []*frontier
+}
+
+// String implements fmt.Stringer.
+func (f *frontiers) String() string {
+	var buf bytes.Buffer
+	for i := 0; i < len(f.items); i++ {
+		if i > 0 {
+			fmt.Fprint(&buf, ", ")
+		}
+		fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
+	}
+	return buf.String()
+}
+
+// Advance notifies all member frontiers with keys ≤ k.
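+//
+// A call that reaches no frontier performs at most one key comparison, against
+// the heap root. Each notification additionally costs a `reached` invocation
+// and an O(log n) heap fixup.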
+func (f *frontiers) Advance(k []byte) {
+	for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
+		// This frontier has been reached. Invoke the closure and update with
+		// the next frontier.
+		f.items[0].key = f.items[0].reached(k)
+		if f.items[0].key == nil {
+			// This was the final frontier that this user was concerned with.
+			// Remove it from the heap.
+			f.pop()
+		} else {
+			// Fix up the heap root.
+			f.fix(0)
+		}
+	}
+}
+
+func (f *frontiers) len() int {
+	return len(f.items)
+}
+
+func (f *frontiers) less(i, j int) bool {
+	return f.cmp(f.items[i].key, f.items[j].key) < 0
+}
+
+func (f *frontiers) swap(i, j int) {
+	f.items[i], f.items[j] = f.items[j], f.items[i]
+}
+
+// fix, up, and down are copied from the Go stdlib.
+
+func (f *frontiers) fix(i int) {
+	if !f.down(i, f.len()) {
+		f.up(i)
+	}
+}
+
+func (f *frontiers) push(ff *frontier) {
+	n := len(f.items)
+	f.items = append(f.items, ff)
+	f.up(n)
+}
+
+func (f *frontiers) pop() *frontier {
+	n := f.len() - 1
+	f.swap(0, n)
+	f.down(0, n)
+	item := f.items[n]
+	f.items = f.items[:n]
+	return item
+}
+
+func (f *frontiers) up(j int) {
+	for {
+		i := (j - 1) / 2 // parent
+		if i == j || !f.less(j, i) {
+			break
+		}
+		f.swap(i, j)
+		j = i
+	}
+}
+
+func (f *frontiers) down(i0, n int) bool {
+	i := i0
+	for {
+		j1 := 2*i + 1
+		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
+			break
+		}
+		j := j1 // left child
+		if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
+			j = j2 // = 2*i + 2  // right child
+		}
+		if !f.less(j, i) {
+			break
+		}
+		f.swap(i, j)
+		i = j
+	}
+	return i > i0
+}
diff --git a/compaction_iter_test.go b/compaction_iter_test.go
index e68c672dca..233b009c15 100644
--- a/compaction_iter_test.go
+++ b/compaction_iter_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/keyspan"
 	"github.com/cockroachdb/pebble/internal/rangekey"
+	"github.com/cockroachdb/pebble/internal/testkeys"
 )
 
 func TestSnapshotIndex(t *testing.T) {
@@ -273,3 +274,62 @@ func TestCompactionIter(t *testing.T) {
 		})
 	}
 }
+
+func TestFrontiers(t *testing.T) {
+	cmp := testkeys.Comparer.Compare
+	var keySets [][][]byte
+	datadriven.RunTest(t, "testdata/frontiers", func(t *testing.T, td *datadriven.TestData) string {
+		switch td.Cmd {
+		case "init":
+			// Init configures a frontier per line of input. Each line should
+			// contain a whitespace-separated list of keys that the frontier
+			// will adopt, in order.
+			//
+			// For example, the following input creates two separate monitored
+			// frontiers: one that sets its key successively to 'b', 'e', 'j'
+			// and one that sets its key to 'a', 'p', 'n', 'z':
+			//
+			//    init
+			//    b e j
+			//    a p n z
+
+			keySets = keySets[:0]
+			for _, line := range strings.Split(td.Input, "\n") {
+				keySets = append(keySets, bytes.Fields([]byte(line)))
+			}
+			return ""
+		case "scan":
+			f := &frontiers{cmp: cmp}
+			for _, keys := range keySets {
+				initTestFrontier(f, keys...)
+			}
+			var buf bytes.Buffer
+			for _, kStr := range strings.Fields(td.Input) {
+				k := []byte(kStr)
+				f.Advance(k)
+				fmt.Fprintf(&buf, "%s : { %s }\n", kStr, f.String())
+			}
+			return buf.String()
+		default:
+			return fmt.Sprintf("unrecognized command %q", td.Cmd)
+		}
+	})
+}
+
+// initTestFrontier adds a new frontier to f that steps through the provided
+// keys, adopting each in turn as the frontier's key.
+func initTestFrontier(f *frontiers, keys ...[]byte) *frontier {
+	ff := &frontier{}
+	var key []byte
+	if len(keys) > 0 {
+		key, keys = keys[0], keys[1:]
+	}
+	reached := func(k []byte) (nextKey []byte) {
+		if len(keys) > 0 {
+			nextKey, keys = keys[0], keys[1:]
+		}
+		return nextKey
+	}
+	ff.Init(f, key, reached)
+	return ff
+}
diff --git a/compaction_test.go b/compaction_test.go
index 46d7e7e0e5..d4c56e3fce 100644
--- a/compaction_test.go
+++ b/compaction_test.go
@@ -3038,16 +3038,14 @@ func TestCompactionCheckOrdering(t *testing.T) {
 }
 
 type mockSplitter struct {
-	shouldSplitVal compactionSplitSuggestion
+	shouldSplitVal maybeSplit
 }
 
-func (m *mockSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
+func (m *mockSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
 	return m.shouldSplitVal
 }
 
-func (m *mockSplitter) onNewOutput(key *InternalKey) []byte {
+func (m *mockSplitter) onNewOutput(key []byte) []byte {
 	return nil
 }
 
@@ -3103,7 +3101,7 @@ func TestCompactionOutputSplitters(t *testing.T) {
 					return "expected at least 2 args"
 				}
 				splitterToSet := (*pickSplitter(d.CmdArgs[0].Key)).(*mockSplitter)
-				var val compactionSplitSuggestion
+				var val maybeSplit
 				switch d.CmdArgs[1].Key {
 				case "split-now":
 					val = splitNow
@@ -3120,7 +3118,7 @@ func TestCompactionOutputSplitters(t *testing.T) {
 				key := base.ParseInternalKey(d.CmdArgs[0].Key)
 				shouldSplit := main.shouldSplitBefore(&key, nil)
 				if shouldSplit == splitNow {
-					main.onNewOutput(&key)
+					main.onNewOutput(key.UserKey)
 					prevUserKey = nil
 				} else {
 					prevUserKey = key.UserKey
diff --git a/testdata/frontiers b/testdata/frontiers
new file mode 100644
index 0000000000..313f693647
--- /dev/null
+++ b/testdata/frontiers
@@ -0,0 +1,71 @@
+# NB: The empty line in 'init' configures a frontier with no keys. It should
+# never be added to the heap.
+
+init
+b e j
+a p n z
+
+f
+----
+
+scan
+a b c d e f g h j i k l m n o p q r s t u v w x y z
+----
+a : { b: "b", p: "p", f: "f" }
+b : { e: "e", p: "p", f: "f" }
+c : { e: "e", p: "p", f: "f" }
+d : { e: "e", p: "p", f: "f" }
+e : { f: "f", p: "p", j: "j" }
+f : { j: "j", p: "p" }
+g : { j: "j", p: "p" }
+h : { j: "j", p: "p" }
+j : { p: "p" }
+i : { p: "p" }
+k : { p: "p" }
+l : { p: "p" }
+m : { p: "p" }
+n : { p: "p" }
+o : { p: "p" }
+p : { z: "z" }
+q : { z: "z" }
+r : { z: "z" }
+s : { z: "z" }
+t : { z: "z" }
+u : { z: "z" }
+v : { z: "z" }
+w : { z: "z" }
+x : { z: "z" }
+y : { z: "z" }
+z : {  }
+
+scan
+z
+----
+z : {  }
+
+scan
+a z
+----
+a : { b: "b", p: "p", f: "f" }
+z : {  }
+
+scan
+e
+----
+e : { f: "f", p: "p", j: "j" }
+
+# Test duplicate user keys within a frontier and across individual frontiers.
+
+init
+b e e g
+c e z
+----
+
+scan
+a c d f z
+----
+a : { b: "b", c: "c" }
+c : { e: "e", e: "e" }
+d : { e: "e", e: "e" }
+f : { g: "g", z: "z" }
+z : {  }