diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 57704f7cb8e6..70bad3a6efd6 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -272,6 +272,7 @@ go_test( "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", + "//pkg/util/span", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/ccl/backupccl/backup.proto b/pkg/ccl/backupccl/backup.proto index aa4ede0b2fed..31c7667c4724 100644 --- a/pkg/ccl/backupccl/backup.proto +++ b/pkg/ccl/backupccl/backup.proto @@ -80,6 +80,12 @@ message BackupManifest { // here are covered in the interval (0, startTime], which, in conjunction with // the coverage from (startTime, endTime] implied for all spans in Spans, // results in coverage from [0, endTime] for these spans. + // + // The first set of spans in this field are new spans that did not + // exist in the previous backup (a new index, for example), while the remaining + // spans are re-introduced spans, which need to be backed up again from (0, + // startTime] because a non-mvcc operation may have occurred on this span. See + // the getReintroducedSpans() for more information. repeated roachpb.Span introduced_spans = 15 [(gogoproto.nullable) = false]; repeated DescriptorRevision descriptor_changes = 16 [(gogoproto.nullable) = false]; diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 6f6137b89a5a..764a1dee92dd 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1455,7 +1455,7 @@ func getReintroducedSpans( // backup was offline at the endTime of the last backup. latestTableDescChangeInLastBackup := make(map[descpb.ID]*descpb.TableDescriptor) for _, rev := range lastBackup.DescriptorChanges { - if table, _, _, _, _ := descpb.FromDescriptor(rev.Desc); table != nil { + if table, _, _, _ := descpb.FromDescriptor(rev.Desc); table != nil { if trackedRev, ok := latestTableDescChangeInLastBackup[table.GetID()]; !ok { latestTableDescChangeInLastBackup[table.GetID()] = table } else if trackedRev.Version < table.Version { @@ -1486,6 +1486,7 @@ func getReintroducedSpans( // table may have been OFFLINE at the time of the last backup, and OFFLINE at // the time of the current backup, but may have been PUBLIC at some time in // between. + for _, rev := range revs { rawTable, _, _, _ := descpb.FromDescriptor(rev.Desc) if rawTable == nil { @@ -1510,7 +1511,6 @@ func getReintroducedSpans( allRevs = append(allRevs, rev) } } - tableSpans, err := spansForAllTableIndexes(execCfg, tablesToReinclude, allRevs) if err != nil { return nil, err @@ -1872,8 +1872,7 @@ func getBackupDetailAndManifest( } } - var newSpans roachpb.Spans - + var newSpans, reIntroducedSpans roachpb.Spans var priorIDs map[descpb.ID]descpb.ID var revs []BackupManifest_DescriptorRevision @@ -1954,11 +1953,11 @@ func getBackupDetailAndManifest( newSpans = filterSpans(spans, prevBackups[len(prevBackups)-1].Spans) - tableSpans, err := getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime) + reIntroducedSpans, err = getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime) if err != nil { return jobspb.BackupDetails{}, BackupManifest{}, err } - newSpans = append(newSpans, tableSpans...) + newSpans = append(newSpans, reIntroducedSpans...) 
} // if CompleteDbs is lost by a 1.x node, FormatDescriptorTrackingVersion diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index baea944266dc..042668378081 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -10,10 +10,12 @@ package backupccl import ( "context" - fmt "fmt" + "fmt" "testing" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" ) func BenchmarkCoverageChecks(b *testing.B) { @@ -44,7 +46,6 @@ func BenchmarkCoverageChecks(b *testing.B) { func BenchmarkRestoreEntryCover(b *testing.B) { r, _ := randutil.NewTestRand() - for _, numBackups := range []int{1, 2, 24, 24 * 4} { b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) { for _, baseFiles := range []int{0, 100, 10000} { @@ -58,7 +59,10 @@ func BenchmarkRestoreEntryCover(b *testing.B) { if err := checkCoverage(ctx, backups[numBackups-1].Spans, backups); err != nil { b.Fatal(err) } - cov := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, nil) + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + require.NoError(b, err) + + cov := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, introducedSpanFrontier, nil) b.ReportMetric(float64(len(cov)), "coverSize") } }) diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index b81e852b67f8..3cae85ade631 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -533,6 +533,30 @@ func TestDataDriven(t *testing.T) { require.NoError(t, err) return "" + case "import": + server := lastCreatedServer + user := "root" + jobType := "IMPORT" + + // First, run the backup. + _, err := ds.getSQLDB(t, server, user).Exec(d.Input) + + // Tag the job. + if d.HasArg("tag") { + tagJob(t, server, user, jobType, ds, d) + } + + // Check if we expect a pausepoint error. + if d.HasArg("expect-pausepoint") { + expectPausepoint(t, err, jobType, server, user, ds) + ret := append(ds.noticeBuffer, "job paused at pausepoint") + return strings.Join(ret, "\n") + } + + // All other errors are bad. + require.NoError(t, err) + return "" + case "restore": server := lastCreatedServer user := "root" diff --git a/pkg/ccl/backupccl/restoration_data.go b/pkg/ccl/backupccl/restoration_data.go index b91a775ac379..841b7cac24bb 100644 --- a/pkg/ccl/backupccl/restoration_data.go +++ b/pkg/ccl/backupccl/restoration_data.go @@ -63,10 +63,13 @@ func (*mainRestorationData) isMainBundle() bool { return true } type restorationDataBase struct { // spans is the spans included in this bundle. spans []roachpb.Span + // rekeys maps old table IDs to their new table descriptor. tableRekeys []execinfrapb.TableRekey + // tenantRekeys maps tenants being restored to their new ID. tenantRekeys []execinfrapb.TenantRekey + // pkIDs stores the ID of the primary keys for all of the tables that we're // restoring for RowCount calculation. 
pkIDs map[uint64]bool diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 01c348078e37..080f0a185724 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -251,6 +251,11 @@ func restore( return emptyRowCount, errors.Wrap(err, "resolving locality locations") } + introducedSpanFrontier, err := createIntroducedSpanFrontier(backupManifests, endTime) + if err != nil { + return emptyRowCount, err + } + if err := checkCoverage(restoreCtx, dataToRestore.getSpans(), backupManifests); err != nil { return emptyRowCount, err } @@ -260,7 +265,7 @@ func restore( highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap, - highWaterMark) + introducedSpanFrontier, highWaterMark) if len(importSpans) == 0 { // There are no files to restore. diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index e8f00ddfdf5e..bddab942398c 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -13,7 +13,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" + spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" ) type intervalSpan roachpb.Span @@ -34,25 +36,29 @@ func (ie intervalSpan) Range() interval.Range { // based on the lowWaterMark before the covering for them is generated. Consider // a chain of backups with files f1, f2… which cover spans as follows: // -// backup -// 0| a___1___c c__2__e h__3__i -// 1| b___4___d g____5___i -// 2| a___________6______________h j_7_k -// 3| h_8_i l_9_m -// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p----> +// backup +// 0| a___1___c c__2__e h__3__i +// 1| b___4___d g____5___i +// 2| a___________6______________h j_7_k +// 3| h_8_i l_9_m +// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p----> +// // spans: |-------span1-------||---span2---| |---span3---| // // The cover for those spans would look like: -// [a, c): 1, 4, 6 -// [c, e): 2, 4, 6 -// [e, f): 6 -// [f, i): 3, 5, 6, 8 -// [l, m): 9 +// +// [a, c): 1, 4, 6 +// [c, e): 2, 4, 6 +// [e, f): 6 +// [f, i): 3, 5, 6, 8 +// [l, m): 9 +// // This example is tested in TestRestoreEntryCoverExample. func makeSimpleImportSpans( requiredSpans []roachpb.Span, backups []BackupManifest, backupLocalityMap map[int]storeByLocalityKV, + introducedSpanFrontier *spanUtils.Frontier, lowWaterMark roachpb.Key, ) []execinfrapb.RestoreSpanEntry { if len(backups) < 1 { @@ -62,8 +68,8 @@ func makeSimpleImportSpans( for i := range backups { sort.Sort(BackupFileDescriptors(backups[i].Files)) } - var cover []execinfrapb.RestoreSpanEntry + for _, span := range requiredSpans { if span.EndKey.Compare(lowWaterMark) < 0 { continue @@ -73,8 +79,32 @@ func makeSimpleImportSpans( } spanCoverStart := len(cover) - for layer := range backups { + + var coveredLater bool + introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if backups[layer].EndTime.Less(ts) { + coveredLater = true + } + return spanUtils.StopMatch + }) + if coveredLater { + // Don't use this backup to cover this span if the span was reintroduced + // after the backup's endTime. 
In this case, this backup may have + // invalid data, and further, a subsequent backup will contain all of + // this span's data. Consider the following example: + // + // T0: Begin IMPORT INTO on existing table foo, ingest some data + // T1: Backup foo + // T2: Rollback IMPORT via clearRange + // T3: Incremental backup of foo, with a full reintroduction of foo’s span + // T4: RESTORE foo: should only restore foo from the incremental backup. + // If data from the full backup were also restored, + // the imported-but-then-clearRanged data will leak in the restored cluster. + // This logic seeks to avoid this form of data corruption. + continue + } covPos := spanCoverStart // TODO(dt): binary search to the first file in required span? for _, f := range backups[layer].Files { @@ -117,6 +147,31 @@ func makeSimpleImportSpans( return cover } +// createIntroducedSpanFrontier creates a span frontier that tracks the end time +// of the latest incremental backup of each introduced span in the backup chain. +// See ReintroducedSpans( ) for more information. Note: this function assumes +// that manifests are sorted in increasing EndTime. +func createIntroducedSpanFrontier( + manifests []BackupManifest, asOf hlc.Timestamp, +) (*spanUtils.Frontier, error) { + introducedSpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{}) + if err != nil { + return nil, err + } + for i, m := range manifests { + if i == 0 { + continue + } + if !asOf.IsEmpty() && asOf.Less(m.StartTime) { + break + } + if err := introducedSpanFrontier.AddSpansAt(m.EndTime, m.IntroducedSpans...); err != nil { + return nil, err + } + } + return introducedSpanFrontier, nil +} + func makeEntry(start, end roachpb.Key, f execinfrapb.RestoreFileSpec) execinfrapb.RestoreSpanEntry { return execinfrapb.RestoreSpanEntry{ Span: roachpb.Span{Key: start, EndKey: end}, Files: []execinfrapb.RestoreFileSpec{f}, diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index 60d3cefa24ae..b6bdb86b3cd3 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -14,33 +14,64 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" + spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) // MockBackupChain returns a chain of mock backup manifests that have spans and // file spans suitable for checking coverage computations. Every 3rd inc backup -// introduces a span and drops a span. Incremental backups have half as many -// files as the base. Files spans are ordered by start key but may overlap. +// reintroduces a span. On a random backup, one random span is dropped and +// another is added. Incremental backups have half as many files as the base. +// Files spans are ordered by start key but may overlap. 
func MockBackupChain(length, spans, baseFiles int, r *rand.Rand) []BackupManifest { backups := make([]BackupManifest, length) ts := hlc.Timestamp{WallTime: time.Second.Nanoseconds()} + + // spanIdxToDrop represents that span that will get dropped during this mock backup chain. + spanIdxToDrop := r.Intn(spans) + + // backupWithDroppedSpan represents the first backup that will observe the dropped span. + backupWithDroppedSpan := r.Intn(len(backups)) + + genTableID := func(j int) uint32 { + return uint32(10 + j*2) + } + for i := range backups { backups[i].Spans = make(roachpb.Spans, spans) + backups[i].IntroducedSpans = make(roachpb.Spans, 0) for j := range backups[i].Spans { - backups[i].Spans[j] = makeTableSpan(uint32(100 + j + (i / 3))) + tableID := genTableID(j) + backups[i].Spans[j] = makeTableSpan(tableID) } backups[i].EndTime = ts.Add(time.Minute.Nanoseconds()*int64(i), 0) if i > 0 { backups[i].StartTime = backups[i-1].EndTime + + if i >= backupWithDroppedSpan { + // At and after the backupWithDroppedSpan, drop the span at + // span[spanIdxToDrop], present in the first i backups, and add a new + // one. + newTableID := genTableID(spanIdxToDrop) + 1 + backups[i].Spans[spanIdxToDrop] = makeTableSpan(newTableID) + backups[i].IntroducedSpans = append(backups[i].IntroducedSpans, backups[i].Spans[spanIdxToDrop]) + } + if i%3 == 0 { - backups[i].IntroducedSpans = roachpb.Spans{backups[i].Spans[spans-1]} + // Reintroduce an existing span + spanIdx := r.Intn(spans) + backups[i].IntroducedSpans = append(backups[i].IntroducedSpans, backups[i].Spans[spanIdx]) } } @@ -79,16 +110,40 @@ func MockBackupChain(length, spans, baseFiles int, r *rand.Rand) []BackupManifes // length to the number of files a file's end key was greater than any prior end // key when walking files in order by start key in the backups. This check is // thus sensitive to ordering; the coverage correctness check however is not. +// +// The function also verifies that a cover does not cross a span boundary. func checkRestoreCovering( backups []BackupManifest, spans roachpb.Spans, cov []execinfrapb.RestoreSpanEntry, ) error { var expectedPartitions int required := make(map[string]*roachpb.SpanGroup) - for _, s := range spans { + + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + if err != nil { + return err + } + + for _, span := range spans { var last roachpb.Key for _, b := range backups { + var coveredLater bool + introducedSpanFrontier.Entries(func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if span.Overlaps(s) { + if b.EndTime.Less(ts) { + coveredLater = true + } + return spanUtils.StopMatch + } + return spanUtils.ContinueMatch + }) + if coveredLater { + // Skip spans that were later re-introduced. See makeSimpleImportSpans + // for explanation. + continue + } for _, f := range b.Files { - if sp := s.Intersect(f.Span); sp.Valid() { + if sp := span.Intersect(f.Span); sp.Valid() { if required[f.Path] == nil { required[f.Path] = &roachpb.SpanGroup{} } @@ -101,10 +156,21 @@ func checkRestoreCovering( } } } + var spanIdx int for _, c := range cov { for _, f := range c.Files { required[f.Path].Sub(c.Span) } + for spans[spanIdx].EndKey.Compare(c.Span.Key) < 0 { + spanIdx++ + } + // Assert that every cover is contained by a required span. 
+		requiredSpan := spans[spanIdx]
+		if requiredSpan.Overlaps(c.Span) && !requiredSpan.Contains(c.Span) {
+			return errors.Errorf("cover span %v is not contained by its required span"+
+				" %v", c.Span, requiredSpan)
+		}
+
 	}
 	for name, uncovered := range required {
 		for _, missing := range uncovered.Slice() {
@@ -134,7 +200,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
 		return r
 	}
-	// Setup and test the example in the comnent on makeSimpleImportSpans.
+	// Setup and test the example in the comment on makeSimpleImportSpans.
 	spans := []roachpb.Span{sp("a", "f"), sp("f", "i"), sp("l", "m")}
 	backups := []BackupManifest{
 		{Files: []BackupManifest_File{f("a", "c", "1"), f("c", "e", "2"), f("h", "i", "3")}},
 		{Files: []BackupManifest_File{f("a", "h", "6"), f("j", "k", "7")}},
 		{Files: []BackupManifest_File{f("h", "i", "8"), f("l", "m", "9")}},
 	}
-	cover := makeSimpleImportSpans(spans, backups, nil, nil)
+
+	for i := range backups {
+		backups[i].StartTime = hlc.Timestamp{WallTime: int64(i)}
+		backups[i].EndTime = hlc.Timestamp{WallTime: int64(i + 1)}
+	}
+
+	emptySpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{})
+	require.NoError(t, err)
+
+	cover := makeSimpleImportSpans(spans, backups, nil, emptySpanFrontier, nil)
 	require.Equal(t, []execinfrapb.RestoreSpanEntry{
 		{Span: sp("a", "c"), Files: paths("1", "4", "6")},
 		{Span: sp("c", "e"), Files: paths("2", "4", "6")},
@@ -150,11 +225,243 @@ func TestRestoreEntryCoverExample(t *testing.T) {
 		{Span: sp("f", "i"), Files: paths("3", "5", "6", "8")},
 		{Span: sp("l", "m"), Files: paths("9")},
 	}, cover)
+
+	// Check that introduced spans are properly elided.
+	backups[2].IntroducedSpans = []roachpb.Span{sp("a", "f")}
+	introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{})
+	require.NoError(t, err)
+
+	coverIntroduced := makeSimpleImportSpans(spans, backups, nil, introducedSpanFrontier, nil)
+	require.Equal(t, []execinfrapb.RestoreSpanEntry{
+		{Span: sp("a", "f"), Files: paths("6")},
+		{Span: sp("f", "i"), Files: paths("3", "5", "6", "8")},
+		{Span: sp("l", "m"), Files: paths("9")},
+	}, coverIntroduced)
 }
-func TestRestoreEntryCover(t *testing.T) {
+type mockBackupInfo struct {
+	// tableIDs identifies the tables included in the backup.
+	tableIDs []int
+
+	// indexIDs defines a map from tableID to tableIndexes included in the backup.
+	indexIDs map[int][]int
+
+	// reintroducedTableIDs identifies a set of TableIDs to reintroduce in the backup.
+	reintroducedTableIDs map[int]struct{}
+
+	// expectedBackupSpanCount defines the number of backup spans created by spansForAllTableIndexes.
+ expectedBackupSpanCount int +} + +func createMockTables( + info mockBackupInfo, +) (tables []catalog.TableDescriptor, reIntroducedTables []catalog.TableDescriptor) { + tables = make([]catalog.TableDescriptor, 0) + reIntroducedTables = make([]catalog.TableDescriptor, 0) + for _, tableID := range info.tableIDs { + indexes := make([]descpb.IndexDescriptor, 0) + for _, indexID := range info.indexIDs[tableID] { + indexes = append(indexes, getMockIndexDesc(descpb.IndexID(indexID))) + } + table := getMockTableDesc(descpb.ID(tableID), indexes[0], indexes, nil, nil) + tables = append(tables, table) + if _, ok := info.reintroducedTableIDs[tableID]; ok { + reIntroducedTables = append(reIntroducedTables, table) + } + } + return tables, reIntroducedTables +} + +func createMockManifest( + t *testing.T, + execCfg *sql.ExecutorConfig, + info mockBackupInfo, + endTime hlc.Timestamp, + path string, +) BackupManifest { + tables, _ := createMockTables(info) + + spans, err := spansForAllTableIndexes(execCfg, tables, + nil /* revs */) + require.NoError(t, err) + require.Equal(t, info.expectedBackupSpanCount, len(spans)) + + files := make([]BackupManifest_File, len(spans)) + for _, sp := range spans { + files = append(files, BackupManifest_File{Span: sp, Path: path}) + } + + return BackupManifest{Spans: spans, + EndTime: endTime, Files: files} +} + +// TestRestoreEntryCoverReIntroducedSpans checks that all reintroduced spans are +// covered in RESTORE by files in the incremental backup that reintroduced the +// spans. The test also checks the invariants required during RESTORE to elide +// files from the full backup that are later reintroduced. These include: +// +// - During BackupManifest creation, spansForAllTableIndexes will merge +// adjacent indexes within a table, but not indexes across tables. +// +// - During spansForAllRestoreTableIndexes, each restored index will have its +// own span. +func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { defer leaktest.AfterTest(t)() + codec := keys.SystemSQLCodec + execCfg := &sql.ExecutorConfig{ + Codec: codec, + } + + testCases := []struct { + name string + full mockBackupInfo + inc mockBackupInfo + + // expectedRestoreSpanCount defines the number of required spans passed to + // makeSimpleImportSpans. 
+ expectedRestoreSpanCount int + }{ + { + name: "adjacent indexes", + full: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 2}, 2: {1}}, + expectedBackupSpanCount: 2, + }, + inc: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 2}, 2: {1}}, + reintroducedTableIDs: map[int]struct{}{1: {}}, + expectedBackupSpanCount: 2, + }, + expectedRestoreSpanCount: 3, + }, + { + name: "non-adjacent indexes", + full: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 3}, 2: {1}}, + expectedBackupSpanCount: 3, + }, + inc: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 3}, 2: {1}}, + reintroducedTableIDs: map[int]struct{}{1: {}}, + expectedBackupSpanCount: 3, + }, + expectedRestoreSpanCount: 3, + }, + { + name: "dropped non-adjacent index", + full: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 3}, 2: {1}}, + expectedBackupSpanCount: 3, + }, + inc: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1}, 2: {1}}, + reintroducedTableIDs: map[int]struct{}{1: {}}, + expectedBackupSpanCount: 2, + }, + expectedRestoreSpanCount: 2, + }, + { + name: "new non-adjacent index", + full: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1}, 2: {1}}, + expectedBackupSpanCount: 2, + }, + inc: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 3}, 2: {1}}, + reintroducedTableIDs: map[int]struct{}{1: {}}, + expectedBackupSpanCount: 3, + }, + expectedRestoreSpanCount: 3, + }, + { + name: "new adjacent index", + full: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1}, 2: {1}}, + expectedBackupSpanCount: 2, + }, + inc: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 2}, 2: {1}}, + reintroducedTableIDs: map[int]struct{}{1: {}}, + expectedBackupSpanCount: 2, + }, + expectedRestoreSpanCount: 3, + }, + { + name: "new in-between index", + full: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 3}, 2: {1}}, + expectedBackupSpanCount: 3, + }, + inc: mockBackupInfo{ + tableIDs: []int{1, 2}, + indexIDs: map[int][]int{1: {1, 2, 3}, 2: {1}}, + reintroducedTableIDs: map[int]struct{}{1: {}}, + expectedBackupSpanCount: 2, + }, + expectedRestoreSpanCount: 4, + }, + } + + fullBackupPath := "full" + incBackupPath := "inc" + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + backups := []BackupManifest{ + createMockManifest(t, execCfg, test.full, hlc.Timestamp{WallTime: int64(1)}, fullBackupPath), + createMockManifest(t, execCfg, test.inc, hlc.Timestamp{WallTime: int64(2)}, incBackupPath), + } + + // Create the IntroducedSpans field for incremental backup. + incTables, reIntroducedTables := createMockTables(test.inc) + + newSpans := filterSpans(backups[1].Spans, backups[0].Spans) + reIntroducedSpans, err := spansForAllTableIndexes(execCfg, reIntroducedTables, nil) + require.NoError(t, err) + backups[1].IntroducedSpans = append(newSpans, reIntroducedSpans...) 
+ + restoreSpans := spansForAllRestoreTableIndexes(codec, incTables, nil) + require.Equal(t, test.expectedRestoreSpanCount, len(restoreSpans)) + + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + require.NoError(t, err) + + cover := makeSimpleImportSpans(restoreSpans, backups, nil, introducedSpanFrontier, nil) + + for _, reIntroTable := range reIntroducedTables { + var coveredReIntroducedGroup roachpb.SpanGroup + for _, entry := range cover { + // If a restoreSpanEntry overlaps with re-introduced span, + // assert the entry only contains files from the incremental backup. + if reIntroTable.TableSpan(codec).Overlaps(entry.Span) { + coveredReIntroducedGroup.Add(entry.Span) + for _, files := range entry.Files { + require.Equal(t, incBackupPath, files.Path) + } + } + } + // Assert that all re-introduced indexes are included in the restore + for _, reIntroIndexSpan := range reIntroTable.AllIndexSpans(codec) { + require.Equal(t, true, coveredReIntroducedGroup.Encloses(reIntroIndexSpan)) + } + } + }) + } +} + +func TestRestoreEntryCover(t *testing.T) { + defer leaktest.AfterTest(t)() r, _ := randutil.NewTestRand() for _, numBackups := range []int{1, 2, 3, 5, 9, 10, 11, 12} { for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} { @@ -162,7 +469,10 @@ func TestRestoreEntryCover(t *testing.T) { backups := MockBackupChain(numBackups, spans, files, r) t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d", numBackups, spans, files), func(t *testing.T) { - cover := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, nil) + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + require.NoError(t, err) + cover := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, nil, + introducedSpanFrontier, nil) if err := checkRestoreCovering(backups, backups[numBackups-1].Spans, cover); err != nil { t.Fatal(err) } diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go index ec3b7f269d6f..52e1d16d2542 100644 --- a/pkg/ccl/backupccl/targets.go +++ b/pkg/ccl/backupccl/targets.go @@ -576,13 +576,18 @@ func MakeBackupTableEntry( return BackupTableEntry{}, errors.Wrapf(err, "making spans for table %s", fullyQualifiedTableName) } + introducedSpanFrontier, err := createIntroducedSpanFrontier(backupManifests, hlc.Timestamp{}) + if err != nil { + return BackupTableEntry{}, err + } + entry := makeSimpleImportSpans( []roachpb.Span{tablePrimaryIndexSpan}, backupManifests, - nil, /*backupLocalityInfo*/ + nil, /*backupLocalityInfo*/ + introducedSpanFrontier, roachpb.Key{}, /*lowWaterMark*/ ) - lastSchemaChangeTime := findLastSchemaChangeTime(backupManifests, tbDesc, endTime) backupTableEntry := BackupTableEntry{ diff --git a/pkg/ccl/backupccl/testdata/backup-restore/in-progress-import-missing-data b/pkg/ccl/backupccl/testdata/backup-restore/in-progress-import-missing-data new file mode 100644 index 000000000000..f2c2e7437cf1 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/in-progress-import-missing-data @@ -0,0 +1,404 @@ +# This test induces the skipped span introduction bug described in +# https://github.com/cockroachdb/cockroach/issues/88042 +# and ensures that RESTORE now fails with an expected error message. 
+# The exact timeline that induces the bug is the following: +# +# - t0: begin import jobs and pause it +# - t1: wildcard table or database backup with revision history +# - t2: cancel an import job, and continue a different one +# - t3: run an incremental backup with revision history +# - the testing knob will induce this incremental backup to skip the +# introduction of the imported tables +# - t4: restore the tables. +# - If we restore AOST after the imports completed/failed, the restore should fail as it now +# detects that the incremental backup at t3 forgot to re-introduce the span. +# - If we restore AOST before the import completed/failed, the restore should succeed, as the +# skipped span introduction is irrelevant. +# +# The test also ensures that similar backup/restore/import scenarios are safe from corruption: +# - the timeline above with table/database backups w/o revision history +# - revision history cluster backups +# - revision history table/database backups which includes an incremental backup between t1 and t2 + +new-server name=s1 knobs=skip-descriptor-change-intro +---- + +exec-sql +CREATE DATABASE d; +USE d; +CREATE TABLE foo (i INT PRIMARY KEY, s STRING); +CREATE TABLE foofoo (i INT PRIMARY KEY, s STRING); +INSERT INTO foofoo VALUES (10, 'x0'); +CREATE TABLE goodfoo (i INT PRIMARY KEY, s STRING); +CREATE INDEX goodfoo_idx on goodfoo (s); +CREATE TABLE baz (i INT PRIMARY KEY, s STRING); +INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z'); +---- + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest'; +---- + + +exec-sql +EXPORT INTO CSV 'nodelocal://0/export1/' FROM SELECT * FROM baz; +---- + + +# Pause the import job, in order to back up the importing data. +import expect-pausepoint tag=a +IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv') +---- +job paused at pausepoint + + +import expect-pausepoint tag=aa +IMPORT INTO foofoo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv') +---- +job paused at pausepoint + +import expect-pausepoint tag=aaa +IMPORT INTO goodfoo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv') +---- +job paused at pausepoint + + +# Create 5 backup chains: +# +# 1. A corrupt database backup chain with revision history +# +# 2. A corrupt wildcard table backup chain with revision history +# +# 3. A clean database backup chain with revision history which includes two +# backups that observe the offline descriptors. +# +# 4. A clean data database backup chain without revision history +# +# 5. 
A clean cluster backup chain with revision history + + +exec-sql +BACKUP DATABASE d INTO 'nodelocal://0/database/' with revision_history; +---- + + +exec-sql +BACKUP TABLE d.* INTO 'nodelocal://0/table/' with revision_history; +---- + + +exec-sql +BACKUP DATABASE d INTO 'nodelocal://0/database_double_inc/' with revision_history; +---- + +exec-sql +BACKUP DATABASE d INTO 'nodelocal://0/database_no_hist/'; +---- + +exec-sql +BACKUP INTO 'nodelocal://0/cluster/' with revision_history; +---- + + +# Conduct another incremental backup on the database_double_inc chain +# and ensure the processors spin up +exec-sql +INSERT INTO baz VALUES (4, 'a'); +---- + +exec-sql +BACKUP DATABASE d INTO LATEST IN 'nodelocal://0/database_double_inc/' with revision_history; +---- + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = ''; +---- + +save-cluster-ts tag=t0 +---- + +# Resume the job so the next set of incremental backups observes that tables are back online +job cancel=a +---- + +job cancel=aa +---- + +job tag=a wait-for-state=cancelled +---- + + +job tag=aa wait-for-state=cancelled +---- + +job resume=aaa +---- + +job tag=aaa wait-for-state=succeeded +---- + + +# Verify proper rollback +query-sql +SELECT count(*) FROM d.foo; +---- +0 + + +query-sql +SELECT count(*) FROM d.foofoo; +---- +1 + +# Verify completed import +query-sql +SELECT count(*) FROM d.goodfoo; +---- +3 + + +# Because BackupRestoreTestingKnobs.SkipDescriptorChangeIntroduction is turned on, +# this backup will be unable to introduce foo, foofoo, and goodfoo. +exec-sql +BACKUP DATABASE d INTO LATEST IN 'nodelocal://0/database/' with revision_history; +---- + +exec-sql +BACKUP TABLE d.* INTO LATEST IN 'nodelocal://0/table/' with revision_history; +---- + +# However, the following backup chains avoid the bug +exec-sql +BACKUP DATABASE d INTO LATEST IN 'nodelocal://0/database_double_inc/' with revision_history; +---- + +exec-sql +BACKUP DATABASE d INTO LATEST IN 'nodelocal://0/database_no_hist/'; +---- + +exec-sql +BACKUP INTO LATEST IN 'nodelocal://0/cluster/' with revision_history; +---- + + +# Note that we're purposely skipping the reintroduction of foo, foofoo, goodfoo in the +# incremental to simulate the bug. +# - note that goodfoo did get backed up in the incremental backup, but only because when +# the import resumed, it re-ingested goodfoo using an addsstable request at a timestamp greater than +# the incremental backup start time. If it had been introduced, 6 rows should have appeared in +# SHOW BACKUP, as seen in different backup chains. 
+query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/database/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' or object_name = 'goodfoo' +ORDER BY + start_time, database_name; +---- +d foo table 0 incremental +d foofoo table 0 incremental +d goodfoo table 3 incremental + + +query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/table/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' or object_name = 'goodfoo' +ORDER BY + start_time, database_name; +---- +d foo table 0 incremental +d foofoo table 0 incremental +d goodfoo table 3 incremental + +# In all show backups below, the foo,foofoo, and goodfoo should get introduced +query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/database_double_inc/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' or object_name = 'goodfoo' +ORDER BY + start_time, database_name; +---- +d foo table 0 incremental +d foofoo table 1 incremental +d goodfoo table 6 incremental + +query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/database_no_hist/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' or object_name = 'goodfoo' +ORDER BY + start_time, database_name; +---- +d foo table 0 incremental +d foofoo table 1 incremental +d goodfoo table 6 incremental + +# Note that the cluster level SHOW BACKUP includes foo and foofoo in the full +# backup while the database ones do not. This is because CLUSTER +# backup manifests includes these in the Descriptors field (i.e. cluster backups +# explicitly backup offline tables, see #88043), while database +# backups only include these descriptors in manifest.DescriptorChanges (see +# #88042). 
+query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/cluster/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' or object_name = 'goodfoo' +ORDER BY + start_time, database_name; +---- +d foo table 3 full +d foofoo table 4 full +d goodfoo table 3 full +d foo table 0 incremental +d foofoo table 1 incremental +d goodfoo table 6 incremental + + +############ +# Restore validation +############ + +# check that RESTORE will fail if any corrupt table restore is attempted +exec-sql expect-error-regex='table "foo" cannot be safely restored from this backup .*' +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/database/' with new_db_name = d_fail; +---- +regex matches error + +exec-sql expect-error-regex='table "goodfoo" cannot be safely restored from this backup .*' +RESTORE TABLE d.goodfoo FROM LATEST IN 'nodelocal://0/database/' with into_db = defaultdb; +---- +regex matches error + +exec-sql expect-error-regex='table "foofoo" cannot be safely restored from this backup .*' +RESTORE TABLE d.foofoo FROM LATEST IN 'nodelocal://0/database/' with into_db = defaultdb; +---- +regex matches error + + +# Ensure the restore succeeds if there are no corrupt tables in the target +exec-sql +RESTORE TABLE d.baz FROM LATEST IN 'nodelocal://0/database/' with into_db = defaultdb; +---- + + +# Check that the wildcard restore encounters the same errors + +exec-sql +CREATE DATABASE defaultdb2; +---- + +exec-sql expect-error-regex='table "foo" cannot be safely restored from this backup .*' +RESTORE TABLE d.* FROM LATEST IN 'nodelocal://0/table/' with into_db = defaultdb2; +---- +regex matches error + +exec-sql expect-error-regex='table "foofoo" cannot be safely restored from this backup .*' +RESTORE TABLE d.foofoo FROM LATEST IN 'nodelocal://0/table/' with into_db = defaultdb2; +---- +regex matches error + +# Ensure the restore succeeds if there are no corrupt tables in the target +exec-sql +RESTORE TABLE d.baz FROM LATEST IN 'nodelocal://0/table/' with into_db = defaultdb2; +---- + +###################### +# Check that you can restore AOST while the tables are offline +restore aost=t0 +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/database/' AS OF SYSTEM TIME t0 with new_db_name=d_t0; +---- + + +query-sql +SELECT table_name FROM [SHOW TABLES FROM d_t0]; +---- +baz + + +########################### +# Check the alternative scenarios +########################### + +# Check that a backup chain with the intermediate incremental backup restores properly +exec-sql +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/database_double_inc/' with new_db_name = d_double_inc; +---- + +query-sql +SELECT count(*) FROM d_double_inc.foo; +---- +0 + + +query-sql +SELECT count(*) FROM d_double_inc.foofoo; +---- +1 + +query-sql +SELECT count(*) FROM d_double_inc.goodfoo; +---- +3 + + +# Check that that backup chain with the intermediate incremental backup restores properly +exec-sql +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/database_no_hist/' with new_db_name = d_no_hist; +---- + +query-sql +SELECT count(*) FROM d_no_hist.foo; +---- +0 + + +query-sql +SELECT count(*) FROM d_no_hist.foofoo; +---- +1 + +query-sql +SELECT count(*) FROM d_no_hist.goodfoo; +---- +3 + + +# Check the cluster level restore +new-server name=s2 share-io-dir=s1 +---- + +exec-sql +RESTORE FROM LATEST IN 'nodelocal://0/cluster/'; +---- + +query-sql +SELECT count(*) FROM d.foo; +---- +0 + + +query-sql +SELECT count(*) FROM d.foofoo; +---- +1 + + +query-sql +SELECT count(*) 
FROM d.goodfoo;
+----
+3
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/in-progress-import-rollback b/pkg/ccl/backupccl/testdata/backup-restore/in-progress-import-rollback
new file mode 100644
index 000000000000..84b8ca31e6ae
--- /dev/null
+++ b/pkg/ccl/backupccl/testdata/backup-restore/in-progress-import-rollback
@@ -0,0 +1,251 @@
+
+# Ensure clear range induces full reintroduction of spans and that restore properly elides
+# clear-ranged data from the initial backup:
+# - begin import jobs and pause them
+# - run an inc backup - verify it has captured the data
+# - roll them back non-mvcc
+# - run an inc backup and ensure we reintroduce the table spans
+
+new-server name=s1
+----
+
+exec-sql
+CREATE DATABASE d;
+USE d;
+CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
+CREATE INDEX foo_idx ON foo (s);
+CREATE INDEX foo_to_drop_idx ON foo (s);
+CREATE TABLE foofoo (i INT PRIMARY KEY, s STRING);
+INSERT INTO foofoo VALUES (10, 'x0');
+CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
+INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z');
+----
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
+----
+
+
+exec-sql
+EXPORT INTO CSV 'nodelocal://0/export1/' FROM SELECT * FROM baz;
+----
+
+
+# Pause the import jobs in order to back up the importing data.
+import expect-pausepoint tag=a
+IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
+----
+job paused at pausepoint
+
+
+import expect-pausepoint tag=aa
+IMPORT INTO foofoo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
+----
+job paused at pausepoint
+
+
+# Ensure table, database, and cluster full backups capture importing rows.
+exec-sql
+BACKUP INTO 'nodelocal://0/cluster/' with revision_history;
+----
+
+
+exec-sql
+BACKUP DATABASE d INTO 'nodelocal://0/database/' with revision_history;
+----
+
+exec-sql
+BACKUP TABLE d.* INTO 'nodelocal://0/table/' with revision_history;
+----
+
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+----
+
+
+# Cancel the import jobs so the next set of incremental backups observes that the tables are back online
+job cancel=a
+----
+
+job cancel=aa
+----
+
+job tag=a wait-for-state=cancelled
+----
+
+
+job tag=aa wait-for-state=cancelled
+----
+
+# Verify proper rollback
+query-sql
+SELECT count(*) FROM d.foo;
+----
+0
+
+
+query-sql
+SELECT count(*) FROM d.foofoo;
+----
+1
+
+
+# Even though the full table will get backed up from ts=0 during the next round of incremental
+# backups, only active indexes (foo_idx and foo_new_idx) should appear in the restored cluster.
+exec-sql
+DROP INDEX foo_to_drop_idx;
+----
+NOTICE: the data for dropped indexes is reclaimed asynchronously
+HINT: The reclamation delay can be customized in the zone configuration for the table.
+
+exec-sql
+CREATE INDEX foo_new_idx ON foo (s);
+----
+
+
+# Ensure incremental backups back up the newly online spans from ts=0, as the
+# imports were rolled back via a non-mvcc clear range. So, back up 0 rows from foo
+# (it was empty pre-import), and 1 row from foofoo (it had 1 row pre-import).
+exec-sql
+BACKUP INTO LATEST IN 'nodelocal://0/cluster/' with revision_history;
+----
+
+exec-sql
+BACKUP DATABASE d INTO LATEST IN 'nodelocal://0/database/' with revision_history;
+----
+
+
+exec-sql
+BACKUP TABLE d.* INTO LATEST IN 'nodelocal://0/table/' with revision_history;
+----
+
+
+# Note that the cluster level SHOW BACKUP includes foo and foofoo in the full
+# backup while the table and database ones do not. 
This is because CLUSTER +# backup manifests includes these in the Descriptors field (i.e. cluster backups +# explicitly backup offline tables, see #88043), while table and database +# backups only include these descriptors in manifest.DescriptorChanges (see +# #88042). + +query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/cluster/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' +ORDER BY + start_time, database_name; +---- +d foo table 3 full +d foofoo table 4 full +d foo table 0 incremental +d foofoo table 1 incremental + +query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/database/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' +ORDER BY + start_time, database_name; +---- +d foo table 0 incremental +d foofoo table 1 incremental + + +query-sql +SELECT + database_name, object_name, object_type, rows, backup_type +FROM + [SHOW BACKUP FROM LATEST IN 'nodelocal://0/table/'] +WHERE + object_name = 'foo' or object_name = 'foofoo' +ORDER BY + start_time, database_name; +---- +d foo table 0 incremental +d foofoo table 1 incremental + + +# To verify the incremental backed up the pre-import state table, restore d and ensure all tables +# are in their pre-import state. + +new-server name=s2 share-io-dir=s1 +---- + +exec-sql +RESTORE FROM LATEST IN 'nodelocal://0/cluster/'; +---- + + +query-sql +SELECT count(*) FROM d.foo; +---- +0 + + +query-sql +SELECT count(*) FROM d.foofoo; +---- +1 + + +query-sql +select DISTINCT index_name FROM [SHOW INDEXES FROM d.foo]; +---- +foo_pkey +foo_idx +foo_new_idx + + +exec-sql +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/database/' with new_db_name=d2; +---- + +query-sql +SELECT count(*) FROM d2.foo; +---- +0 + + +query-sql +SELECT count(*) FROM d2.foofoo; +---- +1 + +query-sql +select DISTINCT index_name FROM [SHOW INDEXES FROM d2.foo]; +---- +foo_pkey +foo_idx +foo_new_idx + +exec-sql +CREATE DATABASE d3; +---- + +exec-sql +RESTORE TABLE d.* FROM LATEST IN 'nodelocal://0/database/' with into_db=d3; +---- + +query-sql +SELECT count(*) FROM d3.foo; +---- +0 + + +query-sql +SELECT count(*) FROM d3.foofoo; +---- +1 + +query-sql +select DISTINCT index_name FROM [SHOW INDEXES FROM d3.foo]; +---- +foo_pkey +foo_idx +foo_new_idx diff --git a/pkg/util/span/frontier.go b/pkg/util/span/frontier.go index 2c2661163046..8710286af83c 100644 --- a/pkg/util/span/frontier.go +++ b/pkg/util/span/frontier.go @@ -117,23 +117,36 @@ func makeSpan(r interval.Range) (res roachpb.Span) { // MakeFrontier returns a Frontier that tracks the given set of spans. func MakeFrontier(spans ...roachpb.Span) (*Frontier, error) { + return MakeFrontierAt(hlc.Timestamp{}, spans...) +} + +// MakeFrontierAt returns a Frontier that tracks the given set of spans at the given time. +func MakeFrontierAt(startAt hlc.Timestamp, spans ...roachpb.Span) (*Frontier, error) { f := &Frontier{tree: interval.NewTree(interval.ExclusiveOverlapper)} + if err := f.AddSpansAt(startAt, spans...); err != nil { + return nil, err + } + return f, nil +} + +// AddSpansAt adds the provided spans to the frontier at the provided timestamp. 
+func (f *Frontier) AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) error { for _, s := range spans { span := makeSpan(s.AsRange()) e := &frontierEntry{ id: f.idAlloc, keys: span.AsRange(), span: span, - ts: hlc.Timestamp{}, + ts: startAt, } f.idAlloc++ if err := f.tree.Insert(e, true /* fast */); err != nil { - return nil, err + return err } heap.Push(&f.minHeap, e) } f.tree.AdjustRanges() - return f, nil + return nil } // Frontier returns the minimum timestamp being tracked. @@ -389,11 +402,14 @@ func (f *Frontier) Entries(fn Operation) { // 4| . h__k . // 3| . e__f . // 1 ---a----------------------m---q-- Frontier -// |___________span___________| +// +// |___________span___________| // // In the above example, frontier tracks [b, m) and the current frontier // timestamp is 1. SpanEntries for span [a-q) will invoke op with: -// ([b-c), 5), ([c-e), 1), ([e-f), 3], ([f, h], 1) ([h, k), 4), ([k, m), 1). +// +// ([b-c), 5), ([c-e), 1), ([e-f), 3], ([f, h], 1) ([h, k), 4), ([k, m), 1). +// // Note: neither [a-b) nor [m, q) will be emitted since they fall outside the spans // tracked by this frontier. func (f *Frontier) SpanEntries(span roachpb.Span, op Operation) {
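Illustrative usage sketch (not part of the patch itself): the helper below shows how the Frontier additions above (MakeFrontier/AddSpansAt plus SpanEntries) combine to answer the question makeSimpleImportSpans now asks for every required span, namely whether the span was reintroduced after a given backup's EndTime. The helper name spanReintroducedAfter and its standalone packaging are assumptions made for this example only; the in-tree code inlines this check.

package backupccl

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

// spanReintroducedAfter reports whether the introduced-span frontier tracks any
// part of span at a timestamp later than backupEndTime. A true result means the
// corresponding backup layer should not be used to cover this span, because a
// later backup in the chain reintroduced (and therefore re-backed-up) it.
func spanReintroducedAfter(
	introducedSpanFrontier *spanUtils.Frontier, span roachpb.Span, backupEndTime hlc.Timestamp,
) bool {
	var reintroducedLater bool
	introducedSpanFrontier.SpanEntries(span, func(_ roachpb.Span, ts hlc.Timestamp) spanUtils.OpResult {
		if backupEndTime.Less(ts) {
			reintroducedLater = true
		}
		// Mirror the check in makeSimpleImportSpans: only the first overlapping
		// frontier entry is inspected before stopping.
		return spanUtils.StopMatch
	})
	return reintroducedLater
}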