From 78ec0d155480e8d5707b4e02535dcfe4a7af775c Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 23 Mar 2023 14:34:59 -0400 Subject: [PATCH] backupccl: ensure makeSimpleImportSpans handles span frontier checkpointing If bulkio.restore.use_simple_import_spans is true on 23.1, we previously only used the high water mark to checkpoint things. But the high water mark checkpointing is version gated not feature gated. This leads to a real bad bug: on 23.1, if bulkio.restore.use_simple_import_spans is set to true, a restore cannot read the persisted span frontier, implying it would have to redo all work again. This patch ensures that makeSimpleImportSpans can read from the span frontier. Additional testing for this will come in #99304 Informs #98779 Release note: None --- pkg/ccl/backupccl/restore_span_covering.go | 195 ++++++++---------- .../backupccl/restore_span_covering_test.go | 71 ++++++- 2 files changed, 152 insertions(+), 114 deletions(-) diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 206e17f0fae3..7f9900d07592 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -134,9 +134,7 @@ func makeSimpleImportSpans( backups []backuppb.BackupManifest, layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, backupLocalityMap map[int]storeByLocalityKV, - introducedSpanFrontier *spanUtils.Frontier, - lowWaterMark roachpb.Key, - targetSize int64, + filter spanCoveringFilter, ) ([]execinfrapb.RestoreSpanEntry, error) { if len(backups) < 1 { return nil, nil @@ -147,126 +145,100 @@ func makeSimpleImportSpans( } var cover []execinfrapb.RestoreSpanEntry - for _, span := range requiredSpans { - if span.EndKey.Compare(lowWaterMark) < 0 { - continue - } - if span.Key.Compare(lowWaterMark) < 0 { - span.Key = lowWaterMark - } - - spanCoverStart := len(cover) - for layer := range backups { - - var coveredLater bool - introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, - ts hlc.Timestamp) (done spanUtils.OpResult) { - if backups[layer].EndTime.Less(ts) { - coveredLater = true + for _, requiredSpan := range requiredSpans { + filteredSpans := filter.filterCompleted(requiredSpan) + for _, span := range filteredSpans { + layersCoveredLater := filter.getLayersCoveredLater(span, backups) + spanCoverStart := len(cover) + for layer := range backups { + if layersCoveredLater[layer] { + continue } - return spanUtils.StopMatch - }) - if coveredLater { - // Don't use this backup to cover this span if the span was reintroduced - // after the backup's endTime. In this case, this backup may have - // invalid data, and further, a subsequent backup will contain all of - // this span's data. Consider the following example: - // - // T0: Begin IMPORT INTO on existing table foo, ingest some data - // T1: Backup foo - // T2: Rollback IMPORT via clearRange - // T3: Incremental backup of foo, with a full reintroduction of foo’s span - // T4: RESTORE foo: should only restore foo from the incremental backup. - // If data from the full backup were also restored, - // the imported-but-then-clearRanged data will leak in the restored cluster. - // This logic seeks to avoid this form of data corruption. - continue - } - - // If the manifest for this backup layer is a `BACKUP_METADATA` then - // we reach out to ExternalStorage to read the accompanying SST that - // contains the BackupManifest_Files. - iterFactory := layerToBackupManifestFileIterFactory[layer] - it, err := iterFactory.NewFileIter(ctx) - if err != nil { - return nil, err - } - defer it.Close() - - covPos := spanCoverStart - - // lastCovSpanSize is the size of files added to the right-most span of - // the cover so far. - var lastCovSpanSize int64 - for ; ; it.Next() { - if ok, err := it.Valid(); err != nil { + // If the manifest for this backup layer is a `BACKUP_METADATA` then + // we reach out to ExternalStorage to read the accompanying SST that + // contains the BackupManifest_Files. + iterFactory := layerToBackupManifestFileIterFactory[layer] + it, err := iterFactory.NewFileIter(ctx) + if err != nil { return nil, err - } else if !ok { - break } - f := it.Value() - if sp := span.Intersect(f.Span); sp.Valid() { - fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} - if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok { - fileSpec = execinfrapb.RestoreFileSpec{Path: f.Path, Dir: dir} + defer it.Close() + + covPos := spanCoverStart + + // lastCovSpanSize is the size of files added to the right-most span of + // the cover so far. + var lastCovSpanSize int64 + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + return nil, err + } else if !ok { + break } + f := it.Value() + if sp := span.Intersect(f.Span); sp.Valid() { + fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} + if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok { + fileSpec = execinfrapb.RestoreFileSpec{Path: f.Path, Dir: dir} + } - // Lookup the size of the file being added; if the backup didn't - // record a file size, just assume it is 16mb for estimating. - sz := f.EntryCounts.DataSize - if sz == 0 { - sz = 16 << 20 - } + // Lookup the size of the file being added; if the backup didn't + // record a file size, just assume it is 16mb for estimating. + sz := f.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } - if len(cover) == spanCoverStart { - cover = append(cover, makeEntry(span.Key, sp.EndKey, fileSpec)) - lastCovSpanSize = sz - } else { - // If this file extends beyond the end of the last partition of the - // cover, either append a new partition for the uncovered span or - // grow the last one if size allows. - if covEnd := cover[len(cover)-1].Span.EndKey; sp.EndKey.Compare(covEnd) > 0 { - // If adding the item size to the current rightmost span size will - // exceed the target size, make a new span, otherwise extend the - // rightmost span to include the item. - if lastCovSpanSize+sz > targetSize { - cover = append(cover, makeEntry(covEnd, sp.EndKey, fileSpec)) - lastCovSpanSize = sz - } else { - cover[len(cover)-1].Span.EndKey = sp.EndKey - cover[len(cover)-1].Files = append(cover[len(cover)-1].Files, fileSpec) - lastCovSpanSize += sz + if len(cover) == spanCoverStart { + cover = append(cover, makeEntry(span.Key, sp.EndKey, fileSpec)) + lastCovSpanSize = sz + } else { + // If this file extends beyond the end of the last partition of the + // cover, either append a new partition for the uncovered span or + // grow the last one if size allows. + if covEnd := cover[len(cover)-1].Span.EndKey; sp.EndKey.Compare(covEnd) > 0 { + // If adding the item size to the current rightmost span size will + // exceed the target size, make a new span, otherwise extend the + // rightmost span to include the item. + if lastCovSpanSize+sz > filter.targetSize { + cover = append(cover, makeEntry(covEnd, sp.EndKey, fileSpec)) + lastCovSpanSize = sz + } else { + cover[len(cover)-1].Span.EndKey = sp.EndKey + cover[len(cover)-1].Files = append(cover[len(cover)-1].Files, fileSpec) + lastCovSpanSize += sz + } } - } - // Now ensure the file is included in any partition in the existing - // cover which overlaps. - for i := covPos; i < len(cover) && cover[i].Span.Key.Compare(sp.EndKey) < 0; i++ { - // If file overlaps, it needs to be in this partition. - if cover[i].Span.Overlaps(sp) { - // If this is the last partition, we might have added it above. - if i == len(cover)-1 { - if last := len(cover[i].Files) - 1; last < 0 || cover[i].Files[last] != fileSpec { + // Now ensure the file is included in any partition in the existing + // cover which overlaps. + for i := covPos; i < len(cover) && cover[i].Span.Key.Compare(sp.EndKey) < 0; i++ { + // If file overlaps, it needs to be in this partition. + if cover[i].Span.Overlaps(sp) { + // If this is the last partition, we might have added it above. + if i == len(cover)-1 { + if last := len(cover[i].Files) - 1; last < 0 || cover[i].Files[last] != fileSpec { + cover[i].Files = append(cover[i].Files, fileSpec) + lastCovSpanSize += sz + } + } else { + // If it isn't the last partition, we always need to add it. cover[i].Files = append(cover[i].Files, fileSpec) - lastCovSpanSize += sz } - } else { - // If it isn't the last partition, we always need to add it. - cover[i].Files = append(cover[i].Files, fileSpec) } - } - // If partition i of the cover ends before this file starts, we - // know it also ends before any remaining files start too, as the - // files are sorted above by start key, so remaining files can - // start their search after this partition. - if cover[i].Span.EndKey.Compare(sp.Key) <= 0 { - covPos = i + 1 + // If partition i of the cover ends before this file starts, we + // know it also ends before any remaining files start too, as the + // files are sorted above by start key, so remaining files can + // start their search after this partition. + if cover[i].Span.EndKey.Compare(sp.Key) <= 0 { + covPos = i + 1 + } } } + } else if span.EndKey.Compare(f.Span.Key) <= 0 { + // If this file starts after the needed span ends, then all the files + // remaining do too so we're done checking files for this span. + break } - } else if span.EndKey.Compare(f.Span.Key) <= 0 { - // If this file starts after the needed span ends, then all the files - // remaining do too so we're done checking files for this span. - break } } } @@ -451,8 +423,7 @@ func generateAndSendImportSpans( if useSimpleImportSpans { importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups, - layerToBackupManifestFileIterFactory, backupLocalityMap, filter.introducedSpanFrontier, - filter.highWaterMark, filter.targetSize) + layerToBackupManifestFileIterFactory, backupLocalityMap, filter) if err != nil { return err } diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index aa936dd316fd..35f9e67b175a 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -412,6 +412,23 @@ func TestRestoreEntryCoverExample(t *testing.T) { {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, {Span: c.sp("l", "m"), Files: c.paths("9")}, }), reduce(cover)) + coverSimple, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + noSpanTargetSize, + emptySpanFrontier, + emptyCompletedSpans, + true) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "e"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("e", "f"), Files: c.paths("6")}, + {Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverSimple)) }) t.Run("target-size", func(t *testing.T) { @@ -433,6 +450,22 @@ func TestRestoreEntryCoverExample(t *testing.T) { {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, {Span: c.sp("l", "m"), Files: c.paths("9")}, }), reduce(coverSized)) + + coverSizedSimple, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + 2<<20, + emptySpanFrontier, + emptyCompletedSpans, + true) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "f"), Files: c.paths("1", "2", "4", "6")}, + {Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverSizedSimple)) }) t.Run("introduced-spans", func(t *testing.T) { @@ -456,6 +489,22 @@ func TestRestoreEntryCoverExample(t *testing.T) { {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, {Span: c.sp("l", "m"), Files: c.paths("9")}, }), reduce(coverIntroduced)) + + coverIntroducedSimple, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + noSpanTargetSize, + introducedSpanFrontier, + emptyCompletedSpans, + true) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "f"), Files: c.paths("6")}, + {Span: c.sp("f", "i"), Files: c.paths("3", "5", "6", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverIntroducedSimple)) }) t.Run("completed-spans", func(t *testing.T) { @@ -468,7 +517,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { frontier, err := spanUtils.MakeFrontierAt(completedSpanTime, completedSpans...) require.NoError(t, err) - coverIntroduced, err := makeImportSpans( + coverCompleted, err := makeImportSpans( ctx, spans, backups, @@ -483,7 +532,25 @@ func TestRestoreEntryCoverExample(t *testing.T) { {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, {Span: c.sp("f", "g"), Files: c.paths("6")}, {Span: c.sp("l", "m"), Files: c.paths("9")}, - }), reduce(coverIntroduced)) + }), reduce(coverCompleted)) + + coverCompletedSimple, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + noSpanTargetSize, + emptySpanFrontier, + persistFrontier(frontier, 0), + true) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("c", "e"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("e", "f"), Files: c.paths("6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverCompletedSimple)) }) }