diff --git a/pkg/util/span/frontier.go b/pkg/util/span/frontier.go index 3242ea118a95..5e36dd2b51a9 100644 --- a/pkg/util/span/frontier.go +++ b/pkg/util/span/frontier.go @@ -57,6 +57,7 @@ type Frontier interface { // Entries invokes the given callback with the current timestamp for each // component span in the tracked span set. + // The fn may not mutate this frontier while iterating. Entries(fn Operation) // SpanEntries invokes op for each sub-span of the specified span with the @@ -77,6 +78,7 @@ type Frontier interface { // // Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans // tracked by this frontier. + // The fn may not mutate this frontier while iterating. SpanEntries(span roachpb.Span, op Operation) // Len returns the number of spans tracked by the frontier. @@ -161,6 +163,11 @@ type btreeFrontier struct { idAlloc uint64 mergeAlloc []*btreeFrontierEntry // Amortize allocations. + + // disallowMutationWhileIterating is set when iterating + // over frontier entries. Attempts to mutate this frontier + // will panic under the test or return an error. + disallowMutationWhileIterating bool } // btreeFrontierEntry represents a timestamped span. It is used as the nodes in both @@ -199,6 +206,10 @@ type btreeFrontierEntry struct { // the underlying keys. If the caller has to modify underlying key slices, they // must pass in the copy. func (f *btreeFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) (retErr error) { + if err := f.checkDisallowedMutation(); err != nil { + return err + } + if expensiveChecksEnabled() { defer func() { if err := f.checkUnsafeKeyModification(); err != nil { @@ -207,34 +218,24 @@ func (f *btreeFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) }() } - collectOverlaps := func(s roachpb.Span) (overlaps []*btreeFrontierEntry) { - key := newSearchKey(s.Key, s.EndKey) - defer putFrontierEntry(key) - - it := f.tree.MakeIter() - for it.FirstOverlap(key); it.Valid(); it.NextOverlap(key) { - overlaps = append(overlaps, it.Cur()) - } - return overlaps - } - - for _, s := range spans { + for _, toAdd := range spans { // Validate caller provided span. - if err := checkSpan(s); err != nil { + if err := checkSpan(toAdd); err != nil { return err } - var sg roachpb.SpanGroup - sg.Add(s) - for _, o := range collectOverlaps(s) { - if err := f.deleteEntry(o); err != nil { + // Add toAdd sub-spans that do not overlap this frontier. To ensure that + // adjacent spans are merged, sub-spans are added in two steps: first, + // non-overlapping spans are added with 0 timestamp; then the timestamp for + // the entire toAdd span is forwarded. + for _, s := range spanDifference(toAdd, f) { + e := newFrontierEntry(&f.idAlloc, s.Key, s.EndKey, hlc.Timestamp{}) + if err := f.setEntry(e); err != nil { + putFrontierEntry(e) return err } } - if err := sg.ForEach(func(span roachpb.Span) error { - e := newFrontierEntry(&f.idAlloc, span.Key, span.EndKey, startAt) - return f.setEntry(e) - }); err != nil { + if err := f.forward(toAdd, startAt); err != nil { return err } } @@ -283,6 +284,10 @@ func (f *btreeFrontier) PeekFrontierSpan() roachpb.Span { func (f *btreeFrontier) Forward( span roachpb.Span, ts hlc.Timestamp, ) (forwarded bool, retErr error) { + if err := f.checkDisallowedMutation(); err != nil { + return false, err + } + // Validate caller provided span. if err := checkSpan(span); err != nil { return false, err @@ -455,7 +460,6 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error return f.mergeEntries(e) } - it := f.tree.MakeIter() for !todoEntry.isEmptyRange() { // Keep going as long as there is work to be done. if expensiveChecksEnabled() { if err := checkSpan(todoEntry.span()); err != nil { @@ -464,6 +468,7 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error } // Seek to the first entry overlapping todoEntry. + it := f.tree.MakeIter() it.FirstOverlap(todoEntry) if !it.Valid() { break @@ -501,9 +506,9 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error // At this point, we know that overlap timestamp is not ahead of the // insertTS (otherwise we'd hit fast case above). // We need to split overlap range into multiple parts. - // 1. Possibly isEmptyRange part before todoEntry.Start + // 1. Possibly empty part before todoEntry.Start // 2. Middle part (with updated timestamp), - // 3. Possibly isEmptyRange part after todoEntry end. + // 3. Possibly empty part after todoEntry end. if overlap.Start.Compare(todoEntry.Start) < 0 { // Split overlap into 2 entries // [overlap.Start, todoEntry.Start) and [todoEntry.Start, overlap.End) @@ -549,9 +554,18 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error return nil } +func (f *btreeFrontier) disallowMutations() func() { + f.disallowMutationWhileIterating = true + return func() { + f.disallowMutationWhileIterating = false + } +} + // Entries invokes the given callback with the current timestamp for each // component span in the tracked span set. func (f *btreeFrontier) Entries(fn Operation) { + defer f.disallowMutations()() + it := f.tree.MakeIter() for it.First(); it.Valid(); it.Next() { if fn(it.Cur().span(), it.Cur().ts) == StopMatch { @@ -579,6 +593,8 @@ func (f *btreeFrontier) Entries(fn Operation) { // Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans // tracked by this frontier. func (f *btreeFrontier) SpanEntries(span roachpb.Span, op Operation) { + defer f.disallowMutations()() + todoRange := newSearchKey(span.Key, span.EndKey) defer putFrontierEntry(todoRange) @@ -776,12 +792,37 @@ func (f *btreeFrontier) checkUnsafeKeyModification() error { return nil } +func (f *btreeFrontier) checkDisallowedMutation() error { + if f.disallowMutationWhileIterating { + err := errors.AssertionFailedWithDepthf(1, "attempt to mutate frontier while iterating") + if buildutil.CrdbTestBuild { + panic(err) + } + return err + } + return nil +} + var disableSanityChecksForBenchmark bool func expensiveChecksEnabled() bool { return buildutil.CrdbTestBuild && !disableSanityChecksForBenchmark } +// spanDifference subtracts frontier (spans) from this span, and +// returns set difference. +func spanDifference(s roachpb.Span, f Frontier) []roachpb.Span { + var sg roachpb.SpanGroup + sg.Add(s) + + f.SpanEntries(s, func(overlap roachpb.Span, ts hlc.Timestamp) (done OpResult) { + sg.Sub(overlap) + return false + }) + + return sg.Slice() +} + type concurrentFrontier struct { syncutil.Mutex f Frontier diff --git a/pkg/util/span/frontier_test.go b/pkg/util/span/frontier_test.go index 6a4c4a2a9916..901286e0d0c3 100644 --- a/pkg/util/span/frontier_test.go +++ b/pkg/util/span/frontier_test.go @@ -320,10 +320,6 @@ func TestSequentialSpans(t *testing.T) { }) } -func makeSingleCharSpan(start, end byte) roachpb.Span { - return roachpb.Span{Key: roachpb.Key{start}, EndKey: roachpb.Key{end}} -} - func makeSpan(start, end string) roachpb.Span { return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)} } @@ -352,13 +348,13 @@ func TestSpanEntries(t *testing.T) { defer enableBtreeFrontier(useBtreeFrontier)() t.Run("contiguous frontier", func(t *testing.T) { - spAZ := makeSingleCharSpan('A', 'Z') + spAZ := makeSpan("A", "Z") f, err := MakeFrontier(spAZ) require.NoError(t, err) // Nothing overlaps span fully to the left of frontier. - require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('0', '9'))) + require.Equal(t, ``, spanEntries(f, makeSpan("0", "9"))) // Nothing overlaps span fully to the right of the frontier. - require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('a', 'z'))) + require.Equal(t, ``, spanEntries(f, makeSpan("a", "z"))) // Span overlaps entire frontier. require.Equal(t, `{A-Z}@0`, spanEntries(f, spAZ)) @@ -366,36 +362,36 @@ func TestSpanEntries(t *testing.T) { require.Equal(t, `{A-Z}@1`, spanEntries(f, spAZ)) // Span overlaps part of the frontier, with left part outside frontier. - require.Equal(t, `{A-C}@1`, spanEntries(f, makeSingleCharSpan('0', 'C'))) + require.Equal(t, `{A-C}@1`, spanEntries(f, makeSpan("0", "C"))) // Span overlaps part of the frontier, with right part outside frontier. - require.Equal(t, `{Q-Z}@1`, spanEntries(f, makeSingleCharSpan('Q', 'c'))) + require.Equal(t, `{Q-Z}@1`, spanEntries(f, makeSpan("Q", "c"))) // Span fully inside frontier. - require.Equal(t, `{P-W}@1`, spanEntries(f, makeSingleCharSpan('P', 'W'))) + require.Equal(t, `{P-W}@1`, spanEntries(f, makeSpan("P", "W"))) // Advance part of the frontier. - advance(f, makeSingleCharSpan('C', 'E'), 2) - advance(f, makeSingleCharSpan('H', 'M'), 5) - advance(f, makeSingleCharSpan('N', 'Q'), 3) + advance(f, makeSpan("C", "E"), 2) + advance(f, makeSpan("H", "M"), 5) + advance(f, makeSpan("N", "Q"), 3) // Span overlaps various parts of the frontier. require.Equal(t, `{A-C}@1 {C-E}@2 {E-H}@1 {H-M}@5 {M-N}@1 {N-P}@3`, - spanEntries(f, makeSingleCharSpan('3', 'P'))) + spanEntries(f, makeSpan("3", "P"))) }) t.Run("disjoint frontier", func(t *testing.T) { - spAB := makeSingleCharSpan('A', 'B') - spCE := makeSingleCharSpan('C', 'E') + spAB := makeSpan("A", "B") + spCE := makeSpan("C", "E") f, err := MakeFrontier(spAB, spCE) require.NoError(t, err) // Nothing overlaps between the two spans in the frontier. - require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('B', 'C'))) + require.Equal(t, ``, spanEntries(f, makeSpan("B", "C"))) // Overlap with only one entry in the frontier - require.Equal(t, `{C-D}@0`, spanEntries(f, makeSingleCharSpan('B', 'D'))) + require.Equal(t, `{C-D}@0`, spanEntries(f, makeSpan("B", "D"))) }) }) } @@ -493,7 +489,7 @@ func advanceFrontier(t *testing.T, f Frontier, s roachpb.Span, wall int64) { func TestForwardInvertedSpan(t *testing.T) { defer leaktest.AfterTest(t)() - spAZ := makeSingleCharSpan('A', 'Z') + spAZ := makeSpan("A", "Z") testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) { defer enableBtreeFrontier(useBtreeFrontier)() @@ -521,7 +517,7 @@ func TestForwardInvertedSpan(t *testing.T) { func TestForwardToSameTimestamp(t *testing.T) { defer enableBtreeFrontier(true)() // LLRB frontier fails this test - spAZ := makeSingleCharSpan('A', 'Z') + spAZ := makeSpan("A", "Z") f, err := MakeFrontier(spAZ) require.NoError(t, err) @@ -547,11 +543,66 @@ func TestAddOverlappingSpans(t *testing.T) { for r := 'A'; r < 'Z'; r++ { require.NoError(t, f.AddSpansAt(ts(int64(r-'A'+1)), makeSpan(string(r), string(r+'a'-'A')))) } - require.NoError(t, f.AddSpansAt(ts(42), makeSpan("a", "z"))) + require.NoError(t, f.AddSpansAt(ts(42), makeSpan("A", "z"))) require.Equal(t, hlc.Timestamp{WallTime: 42}, f.Frontier(), "f=%s", f) }) } +func TestBtreeFrontierMergesSpansDuringInitialization(t *testing.T) { + ts := func(wall int64) hlc.Timestamp { + return hlc.Timestamp{WallTime: wall} + } + + testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) { + defer enableBtreeFrontier(useBtreeFrontier)() + + f, err := MakeFrontier() + require.NoError(t, err) + + require.NoError(t, f.AddSpansAt(ts(8), makeSpan("A", "C"))) + require.NoError(t, f.AddSpansAt(ts(10), makeSpan("B", "D"))) + require.NoError(t, f.AddSpansAt(ts(9), makeSpan("C", "Z"))) + start, end, err := checkContiguousFrontier(f) + require.NoError(t, err) + require.Equal(t, []byte{'A'}, start, f) + require.Equal(t, []byte{'Z'}, end, f) + require.Equal(t, "{A-B}@8 {B-D}@10 {D-Z}@9", entriesStr(f)) + }) +} + +// Regression for #115411 +func TestForwardDeepNestedFrontierEntry(t *testing.T) { + defer leaktest.AfterTest(t)() + + ts := func(wall int) hlc.Timestamp { + return hlc.Timestamp{WallTime: int64(wall)} + } + + testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) { + defer enableBtreeFrontier(useBtreeFrontier)() + f, err := MakeFrontier() + require.NoError(t, err) + + require.NoError(t, f.AddSpansAt(ts(10), makeSpan("B", "C"))) + + // Add a bunch of ranges inside [B-C) range. + // We want to add more than 32 of such ranges to make sure that + // the underlying b-tree node (if using btree frontier) gets some "children" + // nodes created. + bStart := "B" + for i := 0; i < 64; i++ { + bEnd := "B" + strings.Repeat("b", i+1) + require.NoError(t, f.AddSpansAt(ts(i+10), makeSpan(bStart, bEnd))) + _, _, err := checkContiguousFrontier(f) + require.NoError(t, err, f) + bStart = bEnd + } + + advanceFrontier(t, f, makeSpan("A", "Z"), 100) + require.Equal(t, "{B-C}@100", entriesStr(f)) + }) +} + // symbols that can make up spans. var spanSymbols = []byte("@$0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") @@ -649,7 +700,7 @@ func fuzzFrontier(f *testing.F) { for i := 0; i < corpusSize; i++ { s := spanMaker.rndSpan() // Add fuzz corpus. Note: timestamps added could be negative, which - // is of course is not a valid timestamp, but makes it so much fun to test. + // of course is not a valid timestamp, but makes it so much fun to test. f.Add([]byte(s.Key), []byte(s.EndKey), rnd.Intn(corpusSize)-rnd.Intn(corpusSize)) } diff --git a/pkg/util/span/llrb_frontier.go b/pkg/util/span/llrb_frontier.go index 73e6de08c694..566d553f9142 100644 --- a/pkg/util/span/llrb_frontier.go +++ b/pkg/util/span/llrb_frontier.go @@ -122,42 +122,28 @@ func (f *llrbFrontier) Release() {} // AddSpansAt adds the provided spans to the llrbFrontier at the provided timestamp. func (f *llrbFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) error { - collectOverlaps := func(s roachpb.Span, sg *roachpb.SpanGroup) (overlaps []*llrbFrontierEntry) { - f.tree.DoMatching(func(i interval.Interface) (done bool) { - overlap := i.(*llrbFrontierEntry) - overlaps = append(overlaps, overlap) - sg.Add(overlap.span) - return false - }, s.AsRange()) - return overlaps - } - for _, s := range spans { - span := copyRangeToSpan(s.AsRange()) - var sg roachpb.SpanGroup - sg.Add(span) - overlaps := collectOverlaps(span, &sg) - for _, o := range overlaps { - if err := f.tree.Delete(o, true /* fast */); err != nil { - return err - } - heap.Remove(&f.minHeap, o.index) - } + for _, toAdd := range spans { + toAdd = copyRangeToSpan(toAdd.AsRange()) - if err := sg.ForEach(func(span roachpb.Span) error { + // Add toAdd sub-spans that do not overlap this frontier. To ensure that + // adjacent spans are merged, sub-spans are added in two steps: first, + // non-overlapping spans are added with 0 timestamp; then the timestamp for + // the entire toAdd span is forwarded. + for _, span := range spanDifference(toAdd, f) { e := &llrbFrontierEntry{ id: f.idAlloc, keys: span.AsRange(), span: span, - ts: startAt, + ts: hlc.Timestamp{}, } f.idAlloc++ - if err := f.tree.Insert(e, true /* fast */); err != nil { + if err := f.tree.Insert(e, false /* fast */); err != nil { return err } heap.Push(&f.minHeap, e) - return nil - }); err != nil { + } + if err := f.insert(toAdd, startAt); err != nil { return err } }