Skip to content

Commit

Permalink
backupccl: avoid creating spans that start or end on a .Next() key
Browse files Browse the repository at this point in the history
Remove all calls to key.Next() from generateAndSendImportSpans so that all cover
spans have valid keys as start and end keys.

Fixes: cockroachdb#109483

Release note: None
  • Loading branch information
Rui Hu committed Aug 30, 2023
1 parent 8645582 commit 1a59b84
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 45 deletions.
46 changes: 21 additions & 25 deletions pkg/ccl/backupccl/restore_span_covering.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ func generateAndSendImportSpans(
return err
}

var key roachpb.Key

fileIterByLayer := make([]bulk.Iterator[*backuppb.BackupManifest_File], 0, len(backups))
for layer := range backups {
iter, err := layerToBackupManifestFileIterFactory[layer].NewFileIter(ctx)
Expand Down Expand Up @@ -506,10 +508,16 @@ func generateAndSendImportSpans(
if err != nil {
return err
}
break

if key.Compare(span.EndKey) < 0 {
key = span.EndKey
} else {
break
}
} else {
key = startEndKeyIt.value()
}

key := startEndKeyIt.value()
if span.Key.Compare(key) >= 0 {
startEndKeyIt.next()
continue
Expand Down Expand Up @@ -674,18 +682,12 @@ func (i *fileSpanStartAndEndKeyIterator) next() {
break
}

if minItem.cmpEndKey {
minItem.fileIter.Next()
if ok, err := minItem.fileIter.Valid(); err != nil {
i.err = err
return
} else if ok {
minItem.cmpEndKey = false
minItem.file = minItem.fileIter.Value()
heap.Push(i.heap, minItem)
}
} else {
minItem.cmpEndKey = true
minItem.fileIter.Next()
if ok, err := minItem.fileIter.Valid(); err != nil {
i.err = err
return
} else if ok {
minItem.file = minItem.fileIter.Value()
heap.Push(i.heap, minItem)
}
}
Expand Down Expand Up @@ -725,26 +727,20 @@ func (i *fileSpanStartAndEndKeyIterator) reset() {
}

i.heap.fileHeapItems = append(i.heap.fileHeapItems, fileHeapItem{
fileIter: iter,
file: iter.Value(),
cmpEndKey: false,
fileIter: iter,
file: iter.Value(),
})
}
heap.Init(i.heap)
}

type fileHeapItem struct {
fileIter bulk.Iterator[*backuppb.BackupManifest_File]
file *backuppb.BackupManifest_File
cmpEndKey bool
fileIter bulk.Iterator[*backuppb.BackupManifest_File]
file *backuppb.BackupManifest_File
}

func (f fileHeapItem) key() roachpb.Key {
fspan := endKeyInclusiveSpan(f.file.Span)
if f.cmpEndKey {
return fspan.EndKey
}
return fspan.Key
return f.file.Span.Key
}

type fileHeap struct {
Expand Down
42 changes: 22 additions & 20 deletions pkg/ccl/backupccl/restore_span_covering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
c := makeCoverUtils(ctx, t, &execCfg)

// Setup and test the example in the comment of makeSimpleImportSpans.
spans := []roachpb.Span{c.sp("a", "f"), c.sp("f", "i"), c.sp("l", "m")}
spans := []roachpb.Span{c.sp("a", "f"), c.sp("f", "i"), c.sp("l", "p")}

backups := c.makeManifests([]roachpb.Spans{
{c.sp("a", "c"), c.sp("c", "e"), c.sp("h", "i")},
Expand Down Expand Up @@ -427,11 +427,12 @@ func TestRestoreEntryCoverExample(t *testing.T) {
require.NoError(t, err)
require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
{Span: c.sp("a", "b"), Files: c.paths("1", "6")},
{Span: c.sp("b", "f"), Files: c.paths("2", "1", "4", "6")},
{Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")},
{Span: c.sp("c", "f"), Files: c.paths("2", "1", "4", "6")},
{Span: c.sp("f", "g"), Files: c.paths("6")},
{Span: c.sp("g", "h"), Files: c.paths("5", "6")},
{Span: c.sp("h", "i"), Files: c.paths("3", "5", "6", "8")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "p"), Files: c.paths("9")},
}), reduce(cover))
coverSimple, err := makeImportSpans(
ctx,
Expand All @@ -449,7 +450,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
{Span: c.sp("c\x00", "e\x00"), Files: c.paths("2", "4", "6")},
{Span: c.sp("e\x00", "f"), Files: c.paths("6")},
{Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "m\x00"), Files: c.paths("9")},
}), reduce(coverSimple))
})

Expand All @@ -467,10 +468,11 @@ func TestRestoreEntryCoverExample(t *testing.T) {
require.NoError(t, err)
require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
{Span: c.sp("a", "b"), Files: c.paths("1", "6")},
{Span: c.sp("b", "f"), Files: c.paths("2", "1", "4", "6")},
{Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")},
{Span: c.sp("c", "f"), Files: c.paths("2", "1", "4", "6")},
{Span: c.sp("f", "h"), Files: c.paths("5", "6")},
{Span: c.sp("h", "i"), Files: c.paths("3", "5", "6", "8")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "p"), Files: c.paths("9")},
}), reduce(coverSized))

coverSizedSimple, err := makeImportSpans(
Expand All @@ -487,7 +489,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
{Span: c.sp("a", "f"), Files: c.paths("1", "2", "4", "6")},
{Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "m\x00"), Files: c.paths("9")},
}), reduce(coverSizedSimple))
})

Expand All @@ -511,7 +513,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
{Span: c.sp("f", "g"), Files: c.paths("6")},
{Span: c.sp("g", "h"), Files: c.paths("5", "6")},
{Span: c.sp("h", "i"), Files: c.paths("3", "5", "6", "8")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "p"), Files: c.paths("9")},
}), reduce(coverIntroduced))

coverIntroducedSimple, err := makeImportSpans(
Expand All @@ -528,7 +530,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
{Span: c.sp("a", "f"), Files: c.paths("6")},
{Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "m\x00"), Files: c.paths("9")},
}), reduce(coverIntroducedSimple))
})
t.Run("completed-spans", func(t *testing.T) {
Expand Down Expand Up @@ -557,7 +559,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
{Span: c.sp("a", "b"), Files: c.paths("1", "6")},
{Span: c.sp("c", "f"), Files: c.paths("2", "1", "4", "6")},
{Span: c.sp("f", "g"), Files: c.paths("6")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "p"), Files: c.paths("9")},
}), reduce(coverCompleted))

coverCompletedSimple, err := makeImportSpans(
Expand All @@ -577,7 +579,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
{Span: c.sp("c\x00", "e\x00"), Files: c.paths("2", "4", "6")},
{Span: c.sp("e\x00", "f"), Files: c.paths("6")},
{Span: c.sp("f", "g"), Files: c.paths("6")},
{Span: c.sp("l", "m"), Files: c.paths("9")},
{Span: c.sp("l", "m\x00"), Files: c.paths("9")},
}), reduce(coverCompletedSimple))
})
}
Expand All @@ -603,53 +605,53 @@ func TestFileSpanStartKeyIterator(t *testing.T) {
manifestFiles: []roachpb.Spans{
{c.sp("a", "b"), c.sp("c", "d"), c.sp("d\x00", "e")},
},
keysSurfaced: []string{"a", "b\x00", "c", "d\x00", "e\x00"},
keysSurfaced: []string{"a", "c", "d\x00"},
},
{
// shadow start key (b) if another span covers it.
// overlapping file spans.
manifestFiles: []roachpb.Spans{
{c.sp("a", "c"), c.sp("b", "d")},
},
keysSurfaced: []string{"a", "c\x00", "d\x00"},
keysSurfaced: []string{"a", "b"},
},
{
// swap the file order and expect an error.
manifestFiles: []roachpb.Spans{
{c.sp("b", "d"), c.sp("a", "c")},
},
keysSurfaced: []string{"b", "d\x00", "a", "c\x00"},
keysSurfaced: []string{"b", "a"},
expectedError: "out of order backup keys",
},
{
// overlapping files within a level.
manifestFiles: []roachpb.Spans{
{c.sp("b", "f"), c.sp("c", "d"), c.sp("e", "g")},
},
keysSurfaced: []string{"b", "f\x00", "g\x00"},
keysSurfaced: []string{"b", "c", "e"},
},
{
// overlapping files within and across levels.
manifestFiles: []roachpb.Spans{
{c.sp("a", "e"), c.sp("d", "f")},
{c.sp("b", "c")},
},
keysSurfaced: []string{"a", "b", "c\x00", "e\x00", "f\x00"},
keysSurfaced: []string{"a", "b", "d"},
},
{
// overlapping start key in one level, but non overlapping in another level.
manifestFiles: []roachpb.Spans{
{c.sp("a", "c"), c.sp("b", "d")},
{c.sp("b", "c")},
},
keysSurfaced: []string{"a", "b", "c\x00", "d\x00"},
keysSurfaced: []string{"a", "b"},
},
{
// overlapping files in both levels.
manifestFiles: []roachpb.Spans{
{c.sp("b", "e"), c.sp("d", "i")},
{c.sp("a", "c"), c.sp("b", "h")},
},
keysSurfaced: []string{"a", "b", "c\x00", "e\x00", "h\x00", "i\x00"},
keysSurfaced: []string{"a", "b", "d"},
},
{
// ensure everything works with 3 layers.
Expand All @@ -658,7 +660,7 @@ func TestFileSpanStartKeyIterator(t *testing.T) {
{c.sp("b", "e"), c.sp("e", "f")},
{c.sp("c", "e"), c.sp("d", "f")},
},
keysSurfaced: []string{"a", "b", "c", "e\x00", "f\x00"},
keysSurfaced: []string{"a", "b", "c", "d", "e"},
},
} {
backups := c.makeManifests(sp.manifestFiles)
Expand Down

0 comments on commit 1a59b84

Please sign in to comment.