Skip to content

Commit

Permalink
span: Re-initialize iterator when forwarding
Browse files Browse the repository at this point in the history
Re-initialize the iterator when forwarding the span
frontier timestamp.  The underlying btree may be
mutated (by a merge operation), invalidating the previously
constructed iterator.

Fixes #115411

Release notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Dec 3, 2023
1 parent 45fbd10 commit bce833d
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 66 deletions.
106 changes: 83 additions & 23 deletions pkg/util/span/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Frontier interface {

// Entries invokes the given callback with the current timestamp for each
// component span in the tracked span set.
// The fn may not mutate this frontier while iterating.
Entries(fn Operation)

// SpanEntries invokes op for each sub-span of the specified span with the
Expand All @@ -77,6 +78,7 @@ type Frontier interface {
//
// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans
// tracked by this frontier.
// The op must not mutate this frontier while iterating.
SpanEntries(span roachpb.Span, op Operation)

// Len returns the number of spans tracked by the frontier.
Expand Down Expand Up @@ -161,6 +163,11 @@ type btreeFrontier struct {
idAlloc uint64

mergeAlloc []*btreeFrontierEntry // Amortize allocations.

// disallowMutationWhileIterating is set while iterating
// over frontier entries. While it is set, attempts to mutate this
// frontier panic in test builds and return an error otherwise.
disallowMutationWhileIterating bool
}

// btreeFrontierEntry represents a timestamped span. It is used as the nodes in both
Expand Down Expand Up @@ -199,6 +206,10 @@ type btreeFrontierEntry struct {
// the underlying keys. If the caller has to modify underlying key slices, they
// must pass in the copy.
func (f *btreeFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) (retErr error) {
if err := f.checkDisallowedMutation(); err != nil {
return err
}

if expensiveChecksEnabled() {
defer func() {
if err := f.checkUnsafeKeyModification(); err != nil {
Expand All @@ -207,33 +218,31 @@ func (f *btreeFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span)
}()
}

collectOverlaps := func(s roachpb.Span) (overlaps []*btreeFrontierEntry) {
key := newSearchKey(s.Key, s.EndKey)
defer putFrontierEntry(key)

it := f.tree.MakeIter()
for it.FirstOverlap(key); it.Valid(); it.NextOverlap(key) {
overlaps = append(overlaps, it.Cur())
}
return overlaps
}

for _, s := range spans {
for _, toAdd := range spans {
// Validate caller provided span.
if err := checkSpan(s); err != nil {
if err := checkSpan(toAdd); err != nil {
return err
}

var sg roachpb.SpanGroup
sg.Add(s)
for _, o := range collectOverlaps(s) {
if err := f.deleteEntry(o); err != nil {
if err := collectOverlaps(toAdd, startAt, f, &sg); err != nil {
return err
}

if err := sg.ForEach(func(s roachpb.Span) error {
// Add span to this frontier. This is done in 2 steps: first, span
// is added with 0 timestamp; then the timestamp is forwarded.
// This is done to ensure that adjacent spans are merged.
e := newFrontierEntry(&f.idAlloc, s.Key, s.EndKey, hlc.Timestamp{})
if err := f.setEntry(e); err != nil {
putFrontierEntry(e)
return err
}
}
if err := sg.ForEach(func(span roachpb.Span) error {
e := newFrontierEntry(&f.idAlloc, span.Key, span.EndKey, startAt)
return f.setEntry(e)
if err := f.forward(s, startAt); err != nil {
putFrontierEntry(e)
return err
}
return nil
}); err != nil {
return err
}
Expand Down Expand Up @@ -283,6 +292,10 @@ func (f *btreeFrontier) PeekFrontierSpan() roachpb.Span {
func (f *btreeFrontier) Forward(
span roachpb.Span, ts hlc.Timestamp,
) (forwarded bool, retErr error) {
if err := f.checkDisallowedMutation(); err != nil {
return false, err
}

// Validate caller provided span.
if err := checkSpan(span); err != nil {
return false, err
Expand Down Expand Up @@ -455,7 +468,6 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
return f.mergeEntries(e)
}

it := f.tree.MakeIter()
for !todoEntry.isEmptyRange() { // Keep going as long as there is work to be done.
if expensiveChecksEnabled() {
if err := checkSpan(todoEntry.span()); err != nil {
Expand All @@ -464,6 +476,7 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
}

// Seek to the first entry overlapping todoEntry.
it := f.tree.MakeIter()
it.FirstOverlap(todoEntry)
if !it.Valid() {
break
Expand Down Expand Up @@ -501,9 +514,9 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
// At this point, we know that overlap timestamp is not ahead of the
// insertTS (otherwise we'd hit fast case above).
// We need to split overlap range into multiple parts.
// 1. Possibly isEmptyRange part before todoEntry.Start
// 1. Possibly empty part before todoEntry.Start
// 2. Middle part (with updated timestamp),
// 3. Possibly isEmptyRange part after todoEntry end.
// 3. Possibly empty part after todoEntry end.
if overlap.Start.Compare(todoEntry.Start) < 0 {
// Split overlap into 2 entries
// [overlap.Start, todoEntry.Start) and [todoEntry.Start, overlap.End)
Expand Down Expand Up @@ -549,9 +562,18 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
return nil
}

// disallowMutations flags the frontier as being iterated over, so that
// checkDisallowedMutation rejects mutation attempts until the returned
// function is invoked. Intended use is `defer f.disallowMutations()()`.
//
// The returned function restores the flag to the value it had before this
// call rather than unconditionally clearing it. This keeps the guard armed
// when iterations nest (for example, a read-only SpanEntries call issued from
// within an Entries callback): finishing the inner iteration must not
// re-enable mutations while the outer iteration is still in progress.
func (f *btreeFrontier) disallowMutations() func() {
	prev := f.disallowMutationWhileIterating
	f.disallowMutationWhileIterating = true
	return func() {
		f.disallowMutationWhileIterating = prev
	}
}

// Entries invokes the given callback with the current timestamp for each
// component span in the tracked span set.
func (f *btreeFrontier) Entries(fn Operation) {
defer f.disallowMutations()()

it := f.tree.MakeIter()
for it.First(); it.Valid(); it.Next() {
if fn(it.Cur().span(), it.Cur().ts) == StopMatch {
Expand Down Expand Up @@ -579,6 +601,8 @@ func (f *btreeFrontier) Entries(fn Operation) {
// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans
// tracked by this frontier.
func (f *btreeFrontier) SpanEntries(span roachpb.Span, op Operation) {
defer f.disallowMutations()()

todoRange := newSearchKey(span.Key, span.EndKey)
defer putFrontierEntry(todoRange)

Expand Down Expand Up @@ -776,12 +800,48 @@ func (f *btreeFrontier) checkUnsafeKeyModification() error {
return nil
}

// checkDisallowedMutation reports whether a mutation is currently forbidden
// because the frontier is flagged as being iterated over. It returns nil when
// mutation is allowed. Otherwise it builds an assertion failure error; when
// buildutil.CrdbTestBuild is set it panics with that error, and in all other
// builds it returns the error to the caller.
func (f *btreeFrontier) checkDisallowedMutation() error {
	if !f.disallowMutationWhileIterating {
		return nil
	}

	assertionErr := errors.AssertionFailedWithDepthf(1, "attempt to mutate frontier while iterating")
	if buildutil.CrdbTestBuild {
		panic(assertionErr)
	}
	return assertionErr
}

// disableSanityChecksForBenchmark, when true, turns off the expensive checks
// gated by expensiveChecksEnabled regardless of the build type.
var disableSanityChecksForBenchmark bool

// expensiveChecksEnabled returns true only when buildutil.CrdbTestBuild is set
// and the checks have not been switched off via
// disableSanityChecksForBenchmark.
func expensiveChecksEnabled() bool {
	if disableSanityChecksForBenchmark {
		return false
	}
	return buildutil.CrdbTestBuild
}

// collectOverlaps seeds sg with s and then subtracts from sg every sub-span
// of s that f reports via SpanEntries, so that on return sg holds s minus the
// portions f already tracks. Reported sub-spans whose timestamp is less than
// startAt are forwarded to startAt.
//
// The SpanEntries callback only records which spans need forwarding; the
// calls to f.Forward are issued after SpanEntries has returned, so the
// frontier is not mutated from inside the callback.
//
// Returns the first error produced by f.Forward, or nil.
func collectOverlaps(
	s roachpb.Span, startAt hlc.Timestamp, f Frontier, sg *roachpb.SpanGroup,
) error {
	sg.Add(s)

	var lagging []roachpb.Span
	f.SpanEntries(s, func(tracked roachpb.Span, trackedTS hlc.Timestamp) (done OpResult) {
		sg.Sub(tracked)
		if trackedTS.Less(startAt) {
			lagging = append(lagging, tracked)
		}
		return false
	})

	// Bring every tracked sub-span that is behind startAt up to startAt.
	for i := range lagging {
		if _, err := f.Forward(lagging[i], startAt); err != nil {
			return err
		}
	}
	return nil
}

type concurrentFrontier struct {
syncutil.Mutex
f Frontier
Expand Down
95 changes: 73 additions & 22 deletions pkg/util/span/frontier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,6 @@ func TestSequentialSpans(t *testing.T) {
})
}

// makeSingleCharSpan builds a span whose Key is the single byte start and
// whose EndKey is the single byte end.
func makeSingleCharSpan(start, end byte) roachpb.Span {
	startKey, endKey := roachpb.Key{start}, roachpb.Key{end}
	return roachpb.Span{Key: startKey, EndKey: endKey}
}

// makeSpan builds a span whose Key and EndKey are the bytes of the start and
// end strings, respectively.
func makeSpan(start, end string) roachpb.Span {
	var sp roachpb.Span
	sp.Key = roachpb.Key(start)
	sp.EndKey = roachpb.Key(end)
	return sp
}
Expand Down Expand Up @@ -352,50 +348,50 @@ func TestSpanEntries(t *testing.T) {
defer enableBtreeFrontier(useBtreeFrontier)()

t.Run("contiguous frontier", func(t *testing.T) {
spAZ := makeSingleCharSpan('A', 'Z')
spAZ := makeSpan("A", "Z")
f, err := MakeFrontier(spAZ)
require.NoError(t, err)
// Nothing overlaps span fully to the left of frontier.
require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('0', '9')))
require.Equal(t, ``, spanEntries(f, makeSpan("0", "9")))
// Nothing overlaps span fully to the right of the frontier.
require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('a', 'z')))
require.Equal(t, ``, spanEntries(f, makeSpan("a", "z")))

// Span overlaps entire frontier.
require.Equal(t, `{A-Z}@0`, spanEntries(f, spAZ))
advance(f, spAZ, 1)
require.Equal(t, `{A-Z}@1`, spanEntries(f, spAZ))

// Span overlaps part of the frontier, with left part outside frontier.
require.Equal(t, `{A-C}@1`, spanEntries(f, makeSingleCharSpan('0', 'C')))
require.Equal(t, `{A-C}@1`, spanEntries(f, makeSpan("0", "C")))

// Span overlaps part of the frontier, with right part outside frontier.
require.Equal(t, `{Q-Z}@1`, spanEntries(f, makeSingleCharSpan('Q', 'c')))
require.Equal(t, `{Q-Z}@1`, spanEntries(f, makeSpan("Q", "c")))

// Span fully inside frontier.
require.Equal(t, `{P-W}@1`, spanEntries(f, makeSingleCharSpan('P', 'W')))
require.Equal(t, `{P-W}@1`, spanEntries(f, makeSpan("P", "W")))

// Advance part of the frontier.
advance(f, makeSingleCharSpan('C', 'E'), 2)
advance(f, makeSingleCharSpan('H', 'M'), 5)
advance(f, makeSingleCharSpan('N', 'Q'), 3)
advance(f, makeSpan("C", "E"), 2)
advance(f, makeSpan("H", "M"), 5)
advance(f, makeSpan("N", "Q"), 3)

// Span overlaps various parts of the frontier.
require.Equal(t,
`{A-C}@1 {C-E}@2 {E-H}@1 {H-M}@5 {M-N}@1 {N-P}@3`,
spanEntries(f, makeSingleCharSpan('3', 'P')))
spanEntries(f, makeSpan("3", "P")))
})

t.Run("disjoint frontier", func(t *testing.T) {
spAB := makeSingleCharSpan('A', 'B')
spCE := makeSingleCharSpan('C', 'E')
spAB := makeSpan("A", "B")
spCE := makeSpan("C", "E")
f, err := MakeFrontier(spAB, spCE)
require.NoError(t, err)

// Nothing overlaps between the two spans in the frontier.
require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('B', 'C')))
require.Equal(t, ``, spanEntries(f, makeSpan("B", "C")))

// Overlap with only one entry in the frontier
require.Equal(t, `{C-D}@0`, spanEntries(f, makeSingleCharSpan('B', 'D')))
require.Equal(t, `{C-D}@0`, spanEntries(f, makeSpan("B", "D")))
})
})
}
Expand Down Expand Up @@ -493,7 +489,7 @@ func advanceFrontier(t *testing.T, f Frontier, s roachpb.Span, wall int64) {
func TestForwardInvertedSpan(t *testing.T) {
defer leaktest.AfterTest(t)()

spAZ := makeSingleCharSpan('A', 'Z')
spAZ := makeSpan("A", "Z")
testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) {
defer enableBtreeFrontier(useBtreeFrontier)()

Expand Down Expand Up @@ -521,7 +517,7 @@ func TestForwardInvertedSpan(t *testing.T) {

func TestForwardToSameTimestamp(t *testing.T) {
defer enableBtreeFrontier(true)() // LLRB frontier fails this test
spAZ := makeSingleCharSpan('A', 'Z')
spAZ := makeSpan("A", "Z")

f, err := MakeFrontier(spAZ)
require.NoError(t, err)
Expand All @@ -547,11 +543,66 @@ func TestAddOverlappingSpans(t *testing.T) {
for r := 'A'; r < 'Z'; r++ {
require.NoError(t, f.AddSpansAt(ts(int64(r-'A'+1)), makeSpan(string(r), string(r+'a'-'A'))))
}
require.NoError(t, f.AddSpansAt(ts(42), makeSpan("a", "z")))
require.NoError(t, f.AddSpansAt(ts(42), makeSpan("A", "z")))
require.Equal(t, hlc.Timestamp{WallTime: 42}, f.Frontier(), "f=%s", f)
})
}

func TestBtreeFrontierMergesSpansDuringInitialization(t *testing.T) {
ts := func(wall int64) hlc.Timestamp {
return hlc.Timestamp{WallTime: wall}
}

testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) {
defer enableBtreeFrontier(useBtreeFrontier)()

f, err := MakeFrontier()
require.NoError(t, err)

require.NoError(t, f.AddSpansAt(ts(8), makeSpan("A", "C")))
require.NoError(t, f.AddSpansAt(ts(10), makeSpan("B", "D")))
require.NoError(t, f.AddSpansAt(ts(9), makeSpan("C", "Z")))
start, end, err := checkContiguousFrontier(f)
require.NoError(t, err)
require.Equal(t, []byte{'A'}, start, f)
require.Equal(t, []byte{'Z'}, end, f)
require.Equal(t, "{A-B}@8 {B-D}@10 {D-Z}@9", entriesStr(f))
})
}

// TestForwardDeepNestedFrontierEntry is a regression test for #115411. It
// populates the frontier with many adjacent spans nested inside [B, C), each
// at its own timestamp, and then forwards a span covering all of them,
// expecting a single {B-C}@100 entry as the result.
func TestForwardDeepNestedFrontierEntry(t *testing.T) {
	defer leaktest.AfterTest(t)()

	tsAt := func(wall int) hlc.Timestamp {
		return hlc.Timestamp{WallTime: int64(wall)}
	}

	testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) {
		defer enableBtreeFrontier(useBtreeFrontier)()
		f, err := MakeFrontier()
		require.NoError(t, err)

		require.NoError(t, f.AddSpansAt(tsAt(10), makeSpan("B", "C")))

		// Carve [B-C) into a chain of adjacent sub-ranges. More than 32 of
		// them are added so that the underlying b-tree node (when the btree
		// frontier is in use) ends up with child nodes.
		const numNested = 64
		prevEnd := "B"
		for n := 1; n <= numNested; n++ {
			nextEnd := "B" + strings.Repeat("b", n)
			require.NoError(t, f.AddSpansAt(tsAt(n+9), makeSpan(prevEnd, nextEnd)))
			_, _, checkErr := checkContiguousFrontier(f)
			require.NoError(t, checkErr, f)
			prevEnd = nextEnd
		}

		advanceFrontier(t, f, makeSpan("A", "Z"), 100)
		require.Equal(t, "{B-C}@100", entriesStr(f))
	})
}

// symbols that can make up spans.
var spanSymbols = []byte("@$0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

Expand Down Expand Up @@ -649,7 +700,7 @@ func fuzzFrontier(f *testing.F) {
for i := 0; i < corpusSize; i++ {
s := spanMaker.rndSpan()
// Add fuzz corpus. Note: timestamps added could be negative, which
// is of course is not a valid timestamp, but makes it so much fun to test.
// of course is not a valid timestamp, but makes it so much fun to test.
f.Add([]byte(s.Key), []byte(s.EndKey), rnd.Intn(corpusSize)-rnd.Intn(corpusSize))
}

Expand Down
Loading

0 comments on commit bce833d

Please sign in to comment.