Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

span: Re-initialize iterator when forwarding #115509

Merged
merged 2 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {

frontier, err := span.MakeFrontier(fooSpan)
require.NoError(t, err)
frontier = span.MakeConcurrentFrontier(frontier)

// This error causes rangefeed to restart.
transientErrEvent := kvpb.RangeFeedEvent{
Expand Down Expand Up @@ -887,6 +888,7 @@ func TestMuxRangeFeedCanCloseStream(t *testing.T) {

frontier, err := span.MakeFrontier(fooSpan)
require.NoError(t, err)
frontier = span.MakeConcurrentFrontier(frontier)

expectFrontierAdvance := func() {
t.Helper()
Expand Down
89 changes: 65 additions & 24 deletions pkg/util/span/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Frontier interface {

// Entries invokes the given callback with the current timestamp for each
// component span in the tracked span set.
// The fn may not mutate this frontier while iterating.
Entries(fn Operation)

// SpanEntries invokes op for each sub-span of the specified span with the
Expand All @@ -77,6 +78,7 @@ type Frontier interface {
//
// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans
// tracked by this frontier.
// The fn may not mutate this frontier while iterating.
SpanEntries(span roachpb.Span, op Operation)

// Len returns the number of spans tracked by the frontier.
Expand Down Expand Up @@ -161,6 +163,11 @@ type btreeFrontier struct {
idAlloc uint64

mergeAlloc []*btreeFrontierEntry // Amortize allocations.

// disallowMutationWhileIterating is set when iterating
// over frontier entries. Attempts to mutate this frontier
// will panic under the test or return an error.
disallowMutationWhileIterating bool
}

// btreeFrontierEntry represents a timestamped span. It is used as the nodes in both
Expand Down Expand Up @@ -199,6 +206,10 @@ type btreeFrontierEntry struct {
// the underlying keys. If the caller has to modify underlying key slices, they
// must pass in the copy.
func (f *btreeFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) (retErr error) {
if err := f.checkDisallowedMutation(); err != nil {
return err
}

if expensiveChecksEnabled() {
defer func() {
if err := f.checkUnsafeKeyModification(); err != nil {
Expand All @@ -207,34 +218,24 @@ func (f *btreeFrontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span)
}()
}

collectOverlaps := func(s roachpb.Span) (overlaps []*btreeFrontierEntry) {
key := newSearchKey(s.Key, s.EndKey)
defer putFrontierEntry(key)

it := f.tree.MakeIter()
for it.FirstOverlap(key); it.Valid(); it.NextOverlap(key) {
overlaps = append(overlaps, it.Cur())
}
return overlaps
}

for _, s := range spans {
for _, toAdd := range spans {
// Validate caller provided span.
if err := checkSpan(s); err != nil {
if err := checkSpan(toAdd); err != nil {
return err
}

var sg roachpb.SpanGroup
sg.Add(s)
for _, o := range collectOverlaps(s) {
if err := f.deleteEntry(o); err != nil {
// Add toAdd sub-spans that do not overlap this frontier. To ensure that
// adjacent spans are merged, sub-spans are added in two steps: first,
// non-overlapping spans are added with 0 timestamp; then the timestamp for
// the entire toAdd span is forwarded.
for _, s := range spanDifference(toAdd, f) {
e := newFrontierEntry(&f.idAlloc, s.Key, s.EndKey, hlc.Timestamp{})
if err := f.setEntry(e); err != nil {
putFrontierEntry(e)
return err
}
}
if err := sg.ForEach(func(span roachpb.Span) error {
e := newFrontierEntry(&f.idAlloc, span.Key, span.EndKey, startAt)
return f.setEntry(e)
}); err != nil {
if err := f.forward(toAdd, startAt); err != nil {
return err
}
}
Expand Down Expand Up @@ -283,6 +284,10 @@ func (f *btreeFrontier) PeekFrontierSpan() roachpb.Span {
func (f *btreeFrontier) Forward(
span roachpb.Span, ts hlc.Timestamp,
) (forwarded bool, retErr error) {
if err := f.checkDisallowedMutation(); err != nil {
return false, err
}

// Validate caller provided span.
if err := checkSpan(span); err != nil {
return false, err
Expand Down Expand Up @@ -455,7 +460,6 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
return f.mergeEntries(e)
}

it := f.tree.MakeIter()
for !todoEntry.isEmptyRange() { // Keep going as long as there is work to be done.
if expensiveChecksEnabled() {
if err := checkSpan(todoEntry.span()); err != nil {
Expand All @@ -464,6 +468,7 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
}

// Seek to the first entry overlapping todoEntry.
it := f.tree.MakeIter()
it.FirstOverlap(todoEntry)
if !it.Valid() {
break
Expand Down Expand Up @@ -501,9 +506,9 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
// At this point, we know that overlap timestamp is not ahead of the
// insertTS (otherwise we'd hit fast case above).
// We need to split overlap range into multiple parts.
// 1. Possibly isEmptyRange part before todoEntry.Start
// 1. Possibly empty part before todoEntry.Start
// 2. Middle part (with updated timestamp),
// 3. Possibly isEmptyRange part after todoEntry end.
// 3. Possibly empty part after todoEntry end.
if overlap.Start.Compare(todoEntry.Start) < 0 {
// Split overlap into 2 entries
// [overlap.Start, todoEntry.Start) and [todoEntry.Start, overlap.End)
Expand Down Expand Up @@ -549,9 +554,18 @@ func (f *btreeFrontier) forward(span roachpb.Span, insertTS hlc.Timestamp) error
return nil
}

func (f *btreeFrontier) disallowMutations() func() {
f.disallowMutationWhileIterating = true
return func() {
f.disallowMutationWhileIterating = false
}
}

// Entries invokes the given callback with the current timestamp for each
// component span in the tracked span set.
func (f *btreeFrontier) Entries(fn Operation) {
defer f.disallowMutations()()

it := f.tree.MakeIter()
for it.First(); it.Valid(); it.Next() {
if fn(it.Cur().span(), it.Cur().ts) == StopMatch {
Expand Down Expand Up @@ -579,6 +593,8 @@ func (f *btreeFrontier) Entries(fn Operation) {
// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans
// tracked by this frontier.
func (f *btreeFrontier) SpanEntries(span roachpb.Span, op Operation) {
defer f.disallowMutations()()

todoRange := newSearchKey(span.Key, span.EndKey)
defer putFrontierEntry(todoRange)

Expand Down Expand Up @@ -776,12 +792,37 @@ func (f *btreeFrontier) checkUnsafeKeyModification() error {
return nil
}

func (f *btreeFrontier) checkDisallowedMutation() error {
if f.disallowMutationWhileIterating {
err := errors.AssertionFailedWithDepthf(1, "attempt to mutate frontier while iterating")
if buildutil.CrdbTestBuild {
panic(err)
}
return err
}
return nil
}

var disableSanityChecksForBenchmark bool

func expensiveChecksEnabled() bool {
return buildutil.CrdbTestBuild && !disableSanityChecksForBenchmark
}

// spanDifference subtracts frontier (spans) from this span, and
// returns set difference.
func spanDifference(s roachpb.Span, f Frontier) []roachpb.Span {
var sg roachpb.SpanGroup
sg.Add(s)

f.SpanEntries(s, func(overlap roachpb.Span, ts hlc.Timestamp) (done OpResult) {
sg.Sub(overlap)
return false
})

return sg.Slice()
}

type concurrentFrontier struct {
syncutil.Mutex
f Frontier
Expand Down
95 changes: 73 additions & 22 deletions pkg/util/span/frontier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,6 @@ func TestSequentialSpans(t *testing.T) {
})
}

func makeSingleCharSpan(start, end byte) roachpb.Span {
return roachpb.Span{Key: roachpb.Key{start}, EndKey: roachpb.Key{end}}
}

func makeSpan(start, end string) roachpb.Span {
return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
}
Expand Down Expand Up @@ -352,50 +348,50 @@ func TestSpanEntries(t *testing.T) {
defer enableBtreeFrontier(useBtreeFrontier)()

t.Run("contiguous frontier", func(t *testing.T) {
spAZ := makeSingleCharSpan('A', 'Z')
spAZ := makeSpan("A", "Z")
f, err := MakeFrontier(spAZ)
require.NoError(t, err)
// Nothing overlaps span fully to the left of frontier.
require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('0', '9')))
require.Equal(t, ``, spanEntries(f, makeSpan("0", "9")))
// Nothing overlaps span fully to the right of the frontier.
require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('a', 'z')))
require.Equal(t, ``, spanEntries(f, makeSpan("a", "z")))

// Span overlaps entire frontier.
require.Equal(t, `{A-Z}@0`, spanEntries(f, spAZ))
advance(f, spAZ, 1)
require.Equal(t, `{A-Z}@1`, spanEntries(f, spAZ))

// Span overlaps part of the frontier, with left part outside frontier.
require.Equal(t, `{A-C}@1`, spanEntries(f, makeSingleCharSpan('0', 'C')))
require.Equal(t, `{A-C}@1`, spanEntries(f, makeSpan("0", "C")))

// Span overlaps part of the frontier, with right part outside frontier.
require.Equal(t, `{Q-Z}@1`, spanEntries(f, makeSingleCharSpan('Q', 'c')))
require.Equal(t, `{Q-Z}@1`, spanEntries(f, makeSpan("Q", "c")))

// Span fully inside frontier.
require.Equal(t, `{P-W}@1`, spanEntries(f, makeSingleCharSpan('P', 'W')))
require.Equal(t, `{P-W}@1`, spanEntries(f, makeSpan("P", "W")))

// Advance part of the frontier.
advance(f, makeSingleCharSpan('C', 'E'), 2)
advance(f, makeSingleCharSpan('H', 'M'), 5)
advance(f, makeSingleCharSpan('N', 'Q'), 3)
advance(f, makeSpan("C", "E"), 2)
advance(f, makeSpan("H", "M"), 5)
advance(f, makeSpan("N", "Q"), 3)

// Span overlaps various parts of the frontier.
require.Equal(t,
`{A-C}@1 {C-E}@2 {E-H}@1 {H-M}@5 {M-N}@1 {N-P}@3`,
spanEntries(f, makeSingleCharSpan('3', 'P')))
spanEntries(f, makeSpan("3", "P")))
})

t.Run("disjoint frontier", func(t *testing.T) {
spAB := makeSingleCharSpan('A', 'B')
spCE := makeSingleCharSpan('C', 'E')
spAB := makeSpan("A", "B")
spCE := makeSpan("C", "E")
f, err := MakeFrontier(spAB, spCE)
require.NoError(t, err)

// Nothing overlaps between the two spans in the frontier.
require.Equal(t, ``, spanEntries(f, makeSingleCharSpan('B', 'C')))
require.Equal(t, ``, spanEntries(f, makeSpan("B", "C")))

// Overlap with only one entry in the frontier
require.Equal(t, `{C-D}@0`, spanEntries(f, makeSingleCharSpan('B', 'D')))
require.Equal(t, `{C-D}@0`, spanEntries(f, makeSpan("B", "D")))
})
})
}
Expand Down Expand Up @@ -493,7 +489,7 @@ func advanceFrontier(t *testing.T, f Frontier, s roachpb.Span, wall int64) {
func TestForwardInvertedSpan(t *testing.T) {
defer leaktest.AfterTest(t)()

spAZ := makeSingleCharSpan('A', 'Z')
spAZ := makeSpan("A", "Z")
testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) {
defer enableBtreeFrontier(useBtreeFrontier)()

Expand Down Expand Up @@ -521,7 +517,7 @@ func TestForwardInvertedSpan(t *testing.T) {

func TestForwardToSameTimestamp(t *testing.T) {
defer enableBtreeFrontier(true)() // LLRB frontier fails this test
spAZ := makeSingleCharSpan('A', 'Z')
spAZ := makeSpan("A", "Z")

f, err := MakeFrontier(spAZ)
require.NoError(t, err)
Expand All @@ -547,11 +543,66 @@ func TestAddOverlappingSpans(t *testing.T) {
for r := 'A'; r < 'Z'; r++ {
require.NoError(t, f.AddSpansAt(ts(int64(r-'A'+1)), makeSpan(string(r), string(r+'a'-'A'))))
}
require.NoError(t, f.AddSpansAt(ts(42), makeSpan("a", "z")))
require.NoError(t, f.AddSpansAt(ts(42), makeSpan("A", "z")))
require.Equal(t, hlc.Timestamp{WallTime: 42}, f.Frontier(), "f=%s", f)
})
}

func TestBtreeFrontierMergesSpansDuringInitialization(t *testing.T) {
ts := func(wall int64) hlc.Timestamp {
return hlc.Timestamp{WallTime: wall}
}

testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) {
defer enableBtreeFrontier(useBtreeFrontier)()

f, err := MakeFrontier()
require.NoError(t, err)

require.NoError(t, f.AddSpansAt(ts(8), makeSpan("A", "C")))
require.NoError(t, f.AddSpansAt(ts(10), makeSpan("B", "D")))
require.NoError(t, f.AddSpansAt(ts(9), makeSpan("C", "Z")))
start, end, err := checkContiguousFrontier(f)
require.NoError(t, err)
require.Equal(t, []byte{'A'}, start, f)
require.Equal(t, []byte{'Z'}, end, f)
require.Equal(t, "{A-B}@8 {B-D}@10 {D-Z}@9", entriesStr(f))
})
}

// Regression for #115411
func TestForwardDeepNestedFrontierEntry(t *testing.T) {
defer leaktest.AfterTest(t)()

ts := func(wall int) hlc.Timestamp {
return hlc.Timestamp{WallTime: int64(wall)}
}

testutils.RunTrueAndFalse(t, "btree", func(t *testing.T, useBtreeFrontier bool) {
defer enableBtreeFrontier(useBtreeFrontier)()
f, err := MakeFrontier()
require.NoError(t, err)

require.NoError(t, f.AddSpansAt(ts(10), makeSpan("B", "C")))

// Add a bunch of ranges inside [B-C) range.
// We want to add more than 32 of such ranges to make sure that
// the underlying b-tree node (if using btree frontier) gets some "children"
// nodes created.
bStart := "B"
for i := 0; i < 64; i++ {
bEnd := "B" + strings.Repeat("b", i+1)
require.NoError(t, f.AddSpansAt(ts(i+10), makeSpan(bStart, bEnd)))
_, _, err := checkContiguousFrontier(f)
require.NoError(t, err, f)
bStart = bEnd
}

advanceFrontier(t, f, makeSpan("A", "Z"), 100)
require.Equal(t, "{B-C}@100", entriesStr(f))
})
}

// symbols that can make up spans.
var spanSymbols = []byte("@$0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

Expand Down Expand Up @@ -649,7 +700,7 @@ func fuzzFrontier(f *testing.F) {
for i := 0; i < corpusSize; i++ {
s := spanMaker.rndSpan()
// Add fuzz corpus. Note: timestamps added could be negative, which
// is of course is not a valid timestamp, but makes it so much fun to test.
// of course is not a valid timestamp, but makes it so much fun to test.
f.Add([]byte(s.Key), []byte(s.EndKey), rnd.Intn(corpusSize)-rnd.Intn(corpusSize))
}

Expand Down
Loading