Merge #99373
99373: backupccl: ensure makeSimpleImportSpans handles span frontier checkpointing r=rhu713 a=msbutler

If bulkio.restore.use_simple_import_spans is true on 23.1, we previously used only the high water mark for checkpointing. But high water mark checkpointing is version gated, not feature gated. This leads to a serious bug: on 23.1, if bulkio.restore.use_simple_import_spans is set to true, a restore cannot read the persisted span frontier, meaning a resumed restore would have to redo all of its work. This patch ensures that makeSimpleImportSpans can read from the span frontier.

Additional testing for this will come in #99304

Informs #98779

Release note: None
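
A rough, self-contained sketch of why frontier-based checkpointing matters here (simplified stand-in types, not the actual spanCoveringFilter code): a single high water mark can only skip the prefix of the keyspace below it, whereas subtracting the persisted completed spans lets a resumed restore skip any already-restored subspan, wherever it falls.

package main

import "fmt"

// Span is a simplified [Key, EndKey) interval over string keys.
type Span struct{ Key, EndKey string }

// filterCompleted subtracts already-restored (completed) spans from a required
// span and returns the subspans that still need work. completed must be sorted
// and non-overlapping. This mimics, in spirit, what reading the persisted span
// frontier enables; a lone high water mark could only trim the prefix below it.
func filterCompleted(required Span, completed []Span) []Span {
	var todo []Span
	cur := required.Key
	for _, c := range completed {
		if c.EndKey <= cur || c.Key >= required.EndKey {
			continue // no overlap with the remaining required range
		}
		if c.Key > cur {
			todo = append(todo, Span{Key: cur, EndKey: c.Key})
		}
		if c.EndKey > cur {
			cur = c.EndKey
		}
	}
	if cur < required.EndKey {
		todo = append(todo, Span{Key: cur, EndKey: required.EndKey})
	}
	return todo
}

func main() {
	required := Span{Key: "a", EndKey: "z"}
	completed := []Span{{"a", "c"}, {"f", "h"}} // persisted checkpoint
	fmt.Println(filterCompleted(required, completed)) // [{c f} {h z}]
}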

Co-authored-by: Michael Butler <[email protected]>
craig[bot] and msbutler committed Mar 24, 2023
2 parents b7c1874 + 78ec0d1 commit 319c5ce
Showing 2 changed files with 152 additions and 114 deletions.
195 changes: 83 additions & 112 deletions pkg/ccl/backupccl/restore_span_covering.go
@@ -134,9 +134,7 @@ func makeSimpleImportSpans(
 	backups []backuppb.BackupManifest,
 	layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
 	backupLocalityMap map[int]storeByLocalityKV,
-	introducedSpanFrontier *spanUtils.Frontier,
-	lowWaterMark roachpb.Key,
-	targetSize int64,
+	filter spanCoveringFilter,
 ) ([]execinfrapb.RestoreSpanEntry, error) {
 	if len(backups) < 1 {
 		return nil, nil
@@ -147,126 +145,100 @@
 	}
 	var cover []execinfrapb.RestoreSpanEntry
 
-	for _, span := range requiredSpans {
-		if span.EndKey.Compare(lowWaterMark) < 0 {
-			continue
-		}
-		if span.Key.Compare(lowWaterMark) < 0 {
-			span.Key = lowWaterMark
-		}
-
-		spanCoverStart := len(cover)
-		for layer := range backups {
-
-			var coveredLater bool
-			introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span,
-				ts hlc.Timestamp) (done spanUtils.OpResult) {
-				if backups[layer].EndTime.Less(ts) {
-					coveredLater = true
-				}
-				return spanUtils.StopMatch
-			})
-			if coveredLater {
-				// Don't use this backup to cover this span if the span was reintroduced
-				// after the backup's endTime. In this case, this backup may have
-				// invalid data, and further, a subsequent backup will contain all of
-				// this span's data. Consider the following example:
-				//
-				// T0: Begin IMPORT INTO on existing table foo, ingest some data
-				// T1: Backup foo
-				// T2: Rollback IMPORT via clearRange
-				// T3: Incremental backup of foo, with a full reintroduction of foo’s span
-				// T4: RESTORE foo: should only restore foo from the incremental backup.
-				//     If data from the full backup were also restored,
-				//     the imported-but-then-clearRanged data will leak in the restored cluster.
-				// This logic seeks to avoid this form of data corruption.
-				continue
-			}
-
-			// If the manifest for this backup layer is a `BACKUP_METADATA` then
-			// we reach out to ExternalStorage to read the accompanying SST that
-			// contains the BackupManifest_Files.
-			iterFactory := layerToBackupManifestFileIterFactory[layer]
-			it, err := iterFactory.NewFileIter(ctx)
-			if err != nil {
-				return nil, err
-			}
-			defer it.Close()
-
-			covPos := spanCoverStart
-
-			// lastCovSpanSize is the size of files added to the right-most span of
-			// the cover so far.
-			var lastCovSpanSize int64
-			for ; ; it.Next() {
-				if ok, err := it.Valid(); err != nil {
-					return nil, err
-				} else if !ok {
-					break
-				}
-				f := it.Value()
-				if sp := span.Intersect(f.Span); sp.Valid() {
-					fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir}
-					if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok {
-						fileSpec = execinfrapb.RestoreFileSpec{Path: f.Path, Dir: dir}
-					}
-
-					// Lookup the size of the file being added; if the backup didn't
-					// record a file size, just assume it is 16mb for estimating.
-					sz := f.EntryCounts.DataSize
-					if sz == 0 {
-						sz = 16 << 20
-					}
-
-					if len(cover) == spanCoverStart {
-						cover = append(cover, makeEntry(span.Key, sp.EndKey, fileSpec))
-						lastCovSpanSize = sz
-					} else {
-						// If this file extends beyond the end of the last partition of the
-						// cover, either append a new partition for the uncovered span or
-						// grow the last one if size allows.
-						if covEnd := cover[len(cover)-1].Span.EndKey; sp.EndKey.Compare(covEnd) > 0 {
-							// If adding the item size to the current rightmost span size will
-							// exceed the target size, make a new span, otherwise extend the
-							// rightmost span to include the item.
-							if lastCovSpanSize+sz > targetSize {
-								cover = append(cover, makeEntry(covEnd, sp.EndKey, fileSpec))
-								lastCovSpanSize = sz
-							} else {
-								cover[len(cover)-1].Span.EndKey = sp.EndKey
-								cover[len(cover)-1].Files = append(cover[len(cover)-1].Files, fileSpec)
-								lastCovSpanSize += sz
-							}
-						}
-					}
-					// Now ensure the file is included in any partition in the existing
-					// cover which overlaps.
-					for i := covPos; i < len(cover) && cover[i].Span.Key.Compare(sp.EndKey) < 0; i++ {
-						// If file overlaps, it needs to be in this partition.
-						if cover[i].Span.Overlaps(sp) {
-							// If this is the last partition, we might have added it above.
-							if i == len(cover)-1 {
-								if last := len(cover[i].Files) - 1; last < 0 || cover[i].Files[last] != fileSpec {
-									cover[i].Files = append(cover[i].Files, fileSpec)
-									lastCovSpanSize += sz
-								}
-							} else {
-								// If it isn't the last partition, we always need to add it.
-								cover[i].Files = append(cover[i].Files, fileSpec)
-							}
-						}
-						// If partition i of the cover ends before this file starts, we
-						// know it also ends before any remaining files start too, as the
-						// files are sorted above by start key, so remaining files can
-						// start their search after this partition.
-						if cover[i].Span.EndKey.Compare(sp.Key) <= 0 {
-							covPos = i + 1
-						}
-					}
-				} else if span.EndKey.Compare(f.Span.Key) <= 0 {
-					// If this file starts after the needed span ends, then all the files
-					// remaining do too so we're done checking files for this span.
-					break
-				}
-			}
-		}
-	}
+	for _, requiredSpan := range requiredSpans {
+		filteredSpans := filter.filterCompleted(requiredSpan)
+		for _, span := range filteredSpans {
+			layersCoveredLater := filter.getLayersCoveredLater(span, backups)
+			spanCoverStart := len(cover)
+			for layer := range backups {
+				if layersCoveredLater[layer] {
+					continue
+				}
+				// If the manifest for this backup layer is a `BACKUP_METADATA` then
+				// we reach out to ExternalStorage to read the accompanying SST that
+				// contains the BackupManifest_Files.
+				iterFactory := layerToBackupManifestFileIterFactory[layer]
+				it, err := iterFactory.NewFileIter(ctx)
+				if err != nil {
+					return nil, err
+				}
+				defer it.Close()
+
+				covPos := spanCoverStart
+
+				// lastCovSpanSize is the size of files added to the right-most span of
+				// the cover so far.
+				var lastCovSpanSize int64
+				for ; ; it.Next() {
+					if ok, err := it.Valid(); err != nil {
+						return nil, err
+					} else if !ok {
+						break
+					}
+					f := it.Value()
+					if sp := span.Intersect(f.Span); sp.Valid() {
+						fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir}
+						if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok {
+							fileSpec = execinfrapb.RestoreFileSpec{Path: f.Path, Dir: dir}
+						}

+						// Lookup the size of the file being added; if the backup didn't
+						// record a file size, just assume it is 16mb for estimating.
+						sz := f.EntryCounts.DataSize
+						if sz == 0 {
+							sz = 16 << 20
+						}
+
+						if len(cover) == spanCoverStart {
+							cover = append(cover, makeEntry(span.Key, sp.EndKey, fileSpec))
+							lastCovSpanSize = sz
+						} else {
+							// If this file extends beyond the end of the last partition of the
+							// cover, either append a new partition for the uncovered span or
+							// grow the last one if size allows.
+							if covEnd := cover[len(cover)-1].Span.EndKey; sp.EndKey.Compare(covEnd) > 0 {
+								// If adding the item size to the current rightmost span size will
+								// exceed the target size, make a new span, otherwise extend the
+								// rightmost span to include the item.
+								if lastCovSpanSize+sz > filter.targetSize {
+									cover = append(cover, makeEntry(covEnd, sp.EndKey, fileSpec))
+									lastCovSpanSize = sz
+								} else {
+									cover[len(cover)-1].Span.EndKey = sp.EndKey
+									cover[len(cover)-1].Files = append(cover[len(cover)-1].Files, fileSpec)
+									lastCovSpanSize += sz
+								}
+							}
+						}
+						// Now ensure the file is included in any partition in the existing
+						// cover which overlaps.
+						for i := covPos; i < len(cover) && cover[i].Span.Key.Compare(sp.EndKey) < 0; i++ {
+							// If file overlaps, it needs to be in this partition.
+							if cover[i].Span.Overlaps(sp) {
+								// If this is the last partition, we might have added it above.
+								if i == len(cover)-1 {
+									if last := len(cover[i].Files) - 1; last < 0 || cover[i].Files[last] != fileSpec {
+										cover[i].Files = append(cover[i].Files, fileSpec)
+										lastCovSpanSize += sz
+									}
+								} else {
+									// If it isn't the last partition, we always need to add it.
+									cover[i].Files = append(cover[i].Files, fileSpec)
+								}
+							}
+							// If partition i of the cover ends before this file starts, we
+							// know it also ends before any remaining files start too, as the
+							// files are sorted above by start key, so remaining files can
+							// start their search after this partition.
+							if cover[i].Span.EndKey.Compare(sp.Key) <= 0 {
+								covPos = i + 1
+							}
+						}
+					} else if span.EndKey.Compare(f.Span.Key) <= 0 {
+						// If this file starts after the needed span ends, then all the files
+						// remaining do too so we're done checking files for this span.
+						break
+					}
+				}
+			}
+		}
+	}
@@ -451,8 +423,7 @@ func generateAndSendImportSpans(
 
 	if useSimpleImportSpans {
 		importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups,
-			layerToBackupManifestFileIterFactory, backupLocalityMap, filter.introducedSpanFrontier,
-			filter.highWaterMark, filter.targetSize)
+			layerToBackupManifestFileIterFactory, backupLocalityMap, filter)
 		if err != nil {
 			return err
 		}
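
For intuition on the target-size rule in the cover-building loop above, here is a hypothetical standalone sketch (not code from this diff): file sizes are accumulated into the rightmost restore span entry until adding the next file would push its estimated size past the target, at which point a new entry is started.

package main

import "fmt"

// partitionBySize groups file sizes into partitions, starting a new partition
// whenever adding the next size would push the current partition past target.
// This mirrors the "extend the rightmost span unless it would exceed the
// target size" rule used when building the restore span cover.
func partitionBySize(sizes []int64, target int64) [][]int64 {
	var parts [][]int64
	var curSize int64
	for _, sz := range sizes {
		if len(parts) == 0 || curSize+sz > target {
			parts = append(parts, []int64{sz})
			curSize = sz
			continue
		}
		parts[len(parts)-1] = append(parts[len(parts)-1], sz)
		curSize += sz
	}
	return parts
}

func main() {
	sizes := []int64{6, 7, 10, 3, 3, 16} // estimated file sizes, e.g. in MiB
	fmt.Println(partitionBySize(sizes, 16)) // [[6 7] [10 3 3] [16]]
}
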
71 changes: 69 additions & 2 deletions pkg/ccl/backupccl/restore_span_covering_test.go
@@ -412,6 +412,23 @@ func TestRestoreEntryCoverExample(t *testing.T) {
 			{Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")},
 			{Span: c.sp("l", "m"), Files: c.paths("9")},
 		}), reduce(cover))
+		coverSimple, err := makeImportSpans(
+			ctx,
+			spans,
+			backups,
+			layerToIterFactory,
+			noSpanTargetSize,
+			emptySpanFrontier,
+			emptyCompletedSpans,
+			true)
+		require.NoError(t, err)
+		require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
+			{Span: c.sp("a", "c"), Files: c.paths("1", "4", "6")},
+			{Span: c.sp("c", "e"), Files: c.paths("2", "4", "6")},
+			{Span: c.sp("e", "f"), Files: c.paths("6")},
+			{Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")},
+			{Span: c.sp("l", "m"), Files: c.paths("9")},
+		}), reduce(coverSimple))
 	})
 
 	t.Run("target-size", func(t *testing.T) {
@@ -433,6 +450,22 @@
 			{Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")},
 			{Span: c.sp("l", "m"), Files: c.paths("9")},
 		}), reduce(coverSized))
+
+		coverSizedSimple, err := makeImportSpans(
+			ctx,
+			spans,
+			backups,
+			layerToIterFactory,
+			2<<20,
+			emptySpanFrontier,
+			emptyCompletedSpans,
+			true)
+		require.NoError(t, err)
+		require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
+			{Span: c.sp("a", "f"), Files: c.paths("1", "2", "4", "6")},
+			{Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")},
+			{Span: c.sp("l", "m"), Files: c.paths("9")},
+		}), reduce(coverSizedSimple))
 	})
 
 	t.Run("introduced-spans", func(t *testing.T) {
@@ -456,6 +489,22 @@
 			{Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")},
 			{Span: c.sp("l", "m"), Files: c.paths("9")},
 		}), reduce(coverIntroduced))
+
+		coverIntroducedSimple, err := makeImportSpans(
+			ctx,
+			spans,
+			backups,
+			layerToIterFactory,
+			noSpanTargetSize,
+			introducedSpanFrontier,
+			emptyCompletedSpans,
+			true)
+		require.NoError(t, err)
+		require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
+			{Span: c.sp("a", "f"), Files: c.paths("6")},
+			{Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")},
+			{Span: c.sp("l", "m"), Files: c.paths("9")},
+		}), reduce(coverIntroducedSimple))
 	})
 	t.Run("completed-spans", func(t *testing.T) {
 
@@ -468,7 +517,7 @@
 
 		frontier, err := spanUtils.MakeFrontierAt(completedSpanTime, completedSpans...)
 		require.NoError(t, err)
-		coverIntroduced, err := makeImportSpans(
+		coverCompleted, err := makeImportSpans(
 			ctx,
 			spans,
 			backups,
@@ -483,7 +532,25 @@
 			{Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")},
 			{Span: c.sp("f", "g"), Files: c.paths("6")},
 			{Span: c.sp("l", "m"), Files: c.paths("9")},
-		}), reduce(coverIntroduced))
+		}), reduce(coverCompleted))
+
+		coverCompletedSimple, err := makeImportSpans(
+			ctx,
+			spans,
+			backups,
+			layerToIterFactory,
+			noSpanTargetSize,
+			emptySpanFrontier,
+			persistFrontier(frontier, 0),
+			true)
+		require.NoError(t, err)
+		require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{
+			{Span: c.sp("a", "b"), Files: c.paths("1", "6")},
+			{Span: c.sp("c", "e"), Files: c.paths("2", "4", "6")},
+			{Span: c.sp("e", "f"), Files: c.paths("6")},
+			{Span: c.sp("f", "g"), Files: c.paths("6")},
+			{Span: c.sp("l", "m"), Files: c.paths("9")},
+		}), reduce(coverCompletedSimple))
 	})
 }

