diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index e9dafdb14a19..4d16ab29ce49 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1499,6 +1499,153 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) { checkInProgressBackupRestore(t, checkFraction, checkFraction) } +// TestRestoreCheckpointing checks that progress is being persisted to the job record +// when progress is made in the form of a span frontier. +func TestRestoreCheckpointing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer jobs.TestingSetProgressThresholds()() + + var allowResponse chan struct{} + params := base.TestClusterArgs{} + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + <-allowResponse + }, + }, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true} + params.ServerArgs = testServerArgs + params.ServerArgs.Knobs = knobs + + ctx := context.Background() + _, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, 1, + InitManualReplication, params) + conn := sqlDB.DB.(*gosql.DB) + defer cleanupFn() + + sqlDB.Exec(t, `CREATE DATABASE r1`) + // We create these tables to ensure there are enough spans to restore and that we have partial progress + // when stopping the job. + var numTables int + for char := 'a'; char <= 'g'; char++ { + tableName := "r1." + string(char) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName)) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName)) + numTables += 1 + } + sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/test-root'`) + + restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/test-root' WITH detached, new_db_name=r2` + + backupTableID := sqlutils.QueryTableID(t, conn, "r1", "public", "a") + + var jobID jobspb.JobID + // The do function on a restore stops more progress from being persisted to the job record + // after some progress is made. + do := func(query string, check inProgressChecker) { + t.Logf("checking query %q", query) + + var totalExpectedResponses int + if strings.Contains(query, "RESTORE") { + // We expect restore to process each file in the backup individually. + // SST files are written per-range in the backup. So we expect the + // restore to process #(ranges) that made up the original table. + totalExpectedResponses = numTables + } else { + t.Fatal("expected query to be either a backup or restore") + } + jobDone := make(chan error) + allowResponse = make(chan struct{}, totalExpectedResponses) + + go func() { + _, err := conn.Exec(query) + jobDone <- err + }() + + // Allow one of the total expected responses to proceed. + for i := 0; i < 1; i++ { + allowResponse <- struct{}{} + } + + err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { + return check(ctx, inProgressState{ + DB: conn, + backupTableID: backupTableID, + dir: dir, + name: "foo", + }) + }) + + // Close the channel to allow all remaining responses to proceed. We do this + // even if the above retry.ForDuration failed, otherwise the test will hang + // forever. + close(allowResponse) + + if err := <-jobDone; err != nil { + t.Fatalf("%q: %+v", query, err) + } + + if err != nil { + t.Log(err) + } + } + + checkFraction := func(ctx context.Context, ip inProgressState) error { + latestJobID, err := ip.latestJobID() + if err != nil { + return err + } + var fractionCompleted float32 + if err := ip.QueryRow( + `SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`, + latestJobID, + ).Scan(&fractionCompleted); err != nil { + return err + } + if fractionCompleted < 0.01 || fractionCompleted > 0.99 { + return errors.Errorf( + "expected progress to be in range [0.01, 0.99] but got %f", + fractionCompleted, + ) + } + return nil + } + + do(restoreQuery, checkFraction) + + sqlDB.QueryRow(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID) + jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID) + require.NotNil(t, jobProgress) + require.NotEmpty(t, jobProgress.GetRestore().CompletedSpans) + require.Equal(t, hlc.Timestamp{WallTime: 1}, jobProgress.GetRestore().CompletedSpans[0].Timestamp) + require.NotEqual(t, numTables, len(jobProgress.GetRestore().CompletedSpans)) + + sqlDB.Exec(t, `PAUSE JOB $1`, jobID) + jobutils.WaitForJobToPause(t, sqlDB, jobID) + sqlDB.Exec(t, `RESUME JOB $1`, jobID) + jobutils.WaitForJobToSucceed(t, sqlDB, jobID) + // After a job is resumed we want to test if all the job progress has been persisted to the job record. + // Additionally, we also want to verify that each of the tables has the same row count as the tables backed up. + jobProgress = jobutils.GetJobProgress(t, sqlDB, jobID) + require.NotNil(t, jobProgress) + require.Equal(t, numTables, len(jobProgress.GetRestore().CompletedSpans)) + for _, completedSpan := range jobProgress.GetRestore().CompletedSpans { + require.Equal(t, hlc.Timestamp{WallTime: 1}, completedSpan.Timestamp) + } + for char := 'a'; char <= 'g'; char++ { + tableName := "r2." + string(char) + var rowCount int + sqlDB.QueryRow(t, fmt.Sprintf(`SELECT count(*) AS row_count FROM %s`, tableName)).Scan(&rowCount) + // We expect 2 since each table was initialized with 2 rows. + require.Equal(t, 2, rowCount) + } +} + func createAndWaitForJob( t *testing.T, db *sqlutils.SQLRunner, diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6e2a72678683..47861ce88c7e 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -14,7 +14,6 @@ import ( "fmt" "runtime" - "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" @@ -414,32 +413,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span) writeAtBatchTS = false } - - // disallowShadowingBelow is set to an empty hlc.Timestamp in release builds - // i.e. allow all shadowing without AddSSTable having to check for overlapping - // keys. This is because RESTORE is expected to ingest into an empty keyspace. - // If a restore job is resumed, the un-checkpointed spans that are re-ingested - // will shadow (equal key, value; different ts) the already ingested keys. - // - // NB: disallowShadowingBelow used to be unconditionally set to logical=1. - // This permissive value would allow shadowing in case the RESTORE has to - // retry ingestions but served to force evaluation of AddSSTable to check for - // overlapping keys. It was believed that even across resumptions of a restore - // job, `checkForKeyCollisions` would be inexpensive because of our frequent - // job checkpointing. Further investigation in - // https://github.com/cockroachdb/cockroach/issues/81116 revealed that our - // progress checkpointing could significantly lag behind the spans we have - // ingested, making a resumed restore spend a lot of time in - // `checkForKeyCollisions` leading to severely degraded performance. We have - // *never* seen a restore fail because of the invariant enforced by setting - // `disallowShadowingBelow` to a non-empty value, and so we feel comfortable - // disabling this check entirely. A future release will work on fixing our - // progress checkpointing so that we do not have a buildup of un-checkpointed - // work, at which point we can reassess reverting to logical=1. - disallowShadowingBelow := hlc.Timestamp{} - if !build.IsRelease() { - disallowShadowingBelow = hlc.Timestamp{Logical: 1} - } + disallowShadowingBelow := hlc.Timestamp{Logical: 1} var err error batcher, err = bulk.MakeSSTBatcher(ctx, diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 70c54c2c347b..35e80efa5958 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/joberror" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -63,6 +64,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" + spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -221,6 +223,52 @@ func makeBackupLocalityMap( return backupLocalityMap, nil } +// filterCompletedImportSpans takes imported spans and filters them based on completed spans. +// It returns a span frontier with completed spans forwarded to timestamp 1 and incomplete +// spans with empty timestamps and a toDoSpans slice that contains spans that have not yet +// been restored. +// Initially, a span frontier is constructed with all import spans with an empty timestamp. +// Then, each of the completed spans' timestamps are forwarded. +// Finally, we check the span frontier for completed spans and skip over these when adding +// to our toDoSpans. +func filterCompletedImportSpans( + completedSpans []jobspb.RestoreProgress_RestoreProgressFrontierEntry, + importSpans []execinfrapb.RestoreSpanEntry, +) (*spanUtils.Frontier, []execinfrapb.RestoreSpanEntry, error) { + frontierCreation := make([]roachpb.Span, 0, len(importSpans)) + for _, frontierSpanEntry := range importSpans { + frontierCreation = append(frontierCreation, frontierSpanEntry.Span) + } + checkpointFrontier, err := spanUtils.MakeFrontier(frontierCreation...) + if err != nil { + return nil, nil, err + } + for _, completedSpanEntry := range completedSpans { + _, err = checkpointFrontier.Forward(completedSpanEntry.Entry, completedSpanEntry.Timestamp) + if err != nil { + return nil, nil, err + } + } + + var toDoSpans []execinfrapb.RestoreSpanEntry + for _, importSpan := range importSpans { + skip := false + checkpointFrontier.SpanEntries(importSpan.Span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if ts.Equal(hlc.Timestamp{WallTime: 1}) { + skip = true + return spanUtils.ContinueMatch + } + skip = false + return spanUtils.StopMatch + }) + if !skip { + toDoSpans = append(toDoSpans, importSpan) + } + } + return checkpointFrontier, toDoSpans, err +} + // restore imports a SQL table (or tables) from sets of non-overlapping sstable // files. func restore( @@ -256,9 +304,10 @@ func restore( mu := struct { syncutil.Mutex - highWaterMark int - res roachpb.RowCount - requestsCompleted []bool + highWaterMark int + res roachpb.RowCount + requestsCompleted []bool + checkpointFrontier *spanUtils.Frontier }{ highWaterMark: -1, } @@ -284,6 +333,13 @@ func restore( importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV())) + if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) { + mu.checkpointFrontier, importSpans, err = filterCompletedImportSpans(job.Progress().Details.(*jobspb.Progress_Restore).Restore.CompletedSpans, importSpans) + if err != nil { + return emptyRowCount, err + } + } + if len(importSpans) == 0 { // There are no files to restore. return emptyRowCount, nil @@ -332,8 +388,20 @@ func restore( switch d := details.(type) { case *jobspb.Progress_Restore: mu.Lock() - if mu.highWaterMark >= 0 { - d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key + if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) { + completedSpansSlice := make([]jobspb.RestoreProgress_RestoreProgressFrontierEntry, 0) + mu.checkpointFrontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) { + // We only append completed spans to store in the jobs record. + if !ts.IsEmpty() { + completedSpansSlice = append(completedSpansSlice, jobspb.RestoreProgress_RestoreProgressFrontierEntry{Entry: sp, Timestamp: ts}) + } + return spanUtils.ContinueMatch + }) + d.Restore.CompletedSpans = completedSpansSlice + } else { + if mu.highWaterMark >= 0 { + d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key + } } mu.Unlock() default: @@ -355,24 +423,37 @@ func restore( // to progCh. for progress := range progCh { mu.Lock() - var progDetails backuppb.RestoreProgress - if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { - log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) - } + if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } - mu.res.Add(progDetails.Summary) - idx := progDetails.ProgressIdx + mu.res.Add(progDetails.Summary) + _, err = mu.checkpointFrontier.Forward(progDetails.DataSpan, hlc.Timestamp{WallTime: 1}) + if err != nil { + log.Errorf(ctx, "unable to forward timestamp: %+v", err) + } + } else { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } - // Assert that we're actually marking the correct span done. See #23977. - if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, importSpans[idx], - ) - } - mu.requestsCompleted[idx] = true - for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j + mu.res.Add(progDetails.Summary) + idx := progDetails.ProgressIdx + + // Assert that we're actually marking the correct span done. See #23977. + if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { + mu.Unlock() + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, importSpans[idx], + ) + } + mu.requestsCompleted[idx] = true + for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { + mu.highWaterMark = j + } } mu.Unlock() @@ -380,6 +461,7 @@ func restore( // progress. requestFinishedCh <- struct{}{} } + return nil } tasks = append(tasks, jobCheckpointLoop) diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 4a837d2aebfe..8ce61f03615b 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -87,6 +87,7 @@ func makeSimpleImportSpans( backups []backuppb.BackupManifest, backupLocalityMap map[int]storeByLocalityKV, introducedSpanFrontier *spanUtils.Frontier, + // TODO(mb): Remove lowWaterMark argument in 23.1. lowWaterMark roachpb.Key, targetSize int64, ) []execinfrapb.RestoreSpanEntry { diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index abc06cbcda96..aa748d60a314 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" @@ -504,3 +505,64 @@ func TestRestoreEntryCover(t *testing.T) { } } } + +// TestFilterCompletedImportSpans checks that all completed spans are +// skipped over when creating the completed span frontier and toDoSpans slice in memory. +func TestFilterCompletedImportSpans(t *testing.T) { + defer leaktest.AfterTest(t)() + + sp := func(start, end string) roachpb.Span { + return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)} + } + f := func(start, end, path string) backuppb.BackupManifest_File { + return backuppb.BackupManifest_File{Span: sp(start, end), Path: path} + } + paths := func(names ...string) []execinfrapb.RestoreFileSpec { + r := make([]execinfrapb.RestoreFileSpec, len(names)) + for i := range names { + r[i].Path = names[i] + } + return r + } + + // Setup and test the example in the comment of makeSimpleImportSpans. + spans := []roachpb.Span{sp("a", "f"), sp("f", "i"), sp("l", "m")} + backups := []backuppb.BackupManifest{ + {Files: []backuppb.BackupManifest_File{f("a", "c", "1"), f("c", "e", "2"), f("h", "i", "3")}}, + {Files: []backuppb.BackupManifest_File{f("b", "d", "4"), f("g", "i", "5")}}, + {Files: []backuppb.BackupManifest_File{f("a", "h", "6"), f("j", "k", "7")}}, + {Files: []backuppb.BackupManifest_File{f("h", "i", "8"), f("l", "m", "9")}}, + } + + emptySpanFrontier, err := spanUtils.MakeFrontier(roachpb.Span{}) + require.NoError(t, err) + + importSpans := makeSimpleImportSpans(spans, backups, nil, emptySpanFrontier, nil, 2<<20) + + // Tests that no completed spans returns a frontier with all empty timestamps and the original import span slice. + completedImportSpans, newImportSpans, err := filterCompletedImportSpans(nil, importSpans) + require.NoError(t, err) + completedImportSpans.Entries(func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + require.Equal(t, hlc.Timestamp{}, ts) + return spanUtils.ContinueMatch + }) + require.Equal(t, importSpans, newImportSpans) + + // Tests that one completed span returns a frontier with one forwarded timestamp + // and the import spans are reduced by one span. + mockCompletedSpans := []jobspb.RestoreProgress_RestoreProgressFrontierEntry{{sp("a", "f"), hlc.Timestamp{WallTime: 1}}} + completedImportSpans, newImportSpans, err = filterCompletedImportSpans(mockCompletedSpans, importSpans) + require.NoError(t, err) + completedImportSpans.Entries(func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if !ts.IsEmpty() { + require.Equal(t, sp("a", "f"), s) + } + return spanUtils.ContinueMatch + }) + require.Equal(t, []execinfrapb.RestoreSpanEntry{ + {Span: sp("f", "i"), Files: paths("3", "5", "6", "8")}, + {Span: sp("l", "m"), Files: paths("9")}, + }, newImportSpans) +} diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 70de443bbcc5..b29c62885bd7 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -451,7 +451,14 @@ message RestoreDetails { message RestoreProgress { + message RestoreProgressFrontierEntry { + roachpb.Span entry = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + } + bytes high_water = 1; + // CompletedSpans tracks the completed spans in a RESTORE operation. + repeated RestoreProgressFrontierEntry completed_spans = 2 [(gogoproto.nullable) = false]; } message ImportDetails {