diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index f88e869994a9..39861d41a52a 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -169,6 +169,7 @@ go_test( "key_rewriter_test.go", "main_test.go", "partitioned_backup_test.go", + "restore_checkpointing_test.go", "restore_data_processor_test.go", "restore_mid_schema_change_test.go", "restore_old_sequences_test.go", diff --git a/pkg/ccl/backupccl/restore_checkpointing_test.go b/pkg/ccl/backupccl/restore_checkpointing_test.go new file mode 100644 index 000000000000..f00c89d7da39 --- /dev/null +++ b/pkg/ccl/backupccl/restore_checkpointing_test.go @@ -0,0 +1,171 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + gosql "database/sql" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestRestoreCheckpointing checks that all completed spans are +// skipped over when creating the slice for makeSimpleImportSpans. +func TestRestoreCheckpointing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer jobs.TestingSetProgressThresholds()() + + var allowResponse chan struct{} + params := base.TestClusterArgs{} + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + <-allowResponse + }, + }, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true} + params.ServerArgs = testServerArgs + params.ServerArgs.Knobs = knobs + + ctx := context.Background() + _, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, 1, + InitManualReplication, params) + conn := sqlDB.DB.(*gosql.DB) + defer cleanupFn() + + sqlDB.Exec(t, `CREATE DATABASE r1`) + for char := 'a'; char <= 'g'; char++ { + tableName := "r1." + string(char) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName)) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName)) + } + sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/test-root'`) + + restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/test-root' WITH detached, new_db_name=r2` + + backupTableID := sqlutils.QueryTableID(t, conn, "r1", "public", "a") + + var jobID jobspb.JobID + // The do function on a restore stops more progress from being persisted to the job record + // after some progress is made. + do := func(query string, check inProgressChecker) { + t.Logf("checking query %q", query) + + var totalExpectedResponses int + if strings.Contains(query, "RESTORE") { + // We expect restore to process each file in the backup individually. + // SST files are written per-range in the backup. So we expect the + // restore to process #(ranges) that made up the original table. + totalExpectedResponses = 7 + } else { + t.Fatal("expected query to be either a backup or restore") + } + jobDone := make(chan error) + allowResponse = make(chan struct{}, totalExpectedResponses) + + go func() { + _, err := conn.Exec(query) + jobDone <- err + }() + + // Allow one of the total expected responses to proceed. + for i := 0; i < 1; i++ { + allowResponse <- struct{}{} + } + + err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { + return check(ctx, inProgressState{ + DB: conn, + backupTableID: backupTableID, + dir: dir, + name: "foo", + }) + }) + + // Close the channel to allow all remaining responses to proceed. We do this + // even if the above retry.ForDuration failed, otherwise the test will hang + // forever. + close(allowResponse) + + if err := <-jobDone; err != nil { + t.Fatalf("%q: %+v", query, err) + } + + if err != nil { + t.Log(err) + } + } + + progressQuery := `select crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress) as progress from system.jobs where id=$1` + + var progressMessage string + + checkFraction := func(ctx context.Context, ip inProgressState) error { + latestJobID, err := ip.latestJobID() + if err != nil { + return err + } + var fractionCompleted float32 + if err := ip.QueryRow( + `SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`, + latestJobID, + ).Scan(&fractionCompleted); err != nil { + return err + } + err = ip.QueryRow(progressQuery, latestJobID).Scan(&progressMessage) + if err != nil { + return err + } + t.Logf(progressMessage) + if fractionCompleted < 0.01 || fractionCompleted > 0.99 { + return errors.Errorf( + "expected progress to be in range [0.01, 0.99] but got %f", + fractionCompleted, + ) + } + return nil + } + + do(restoreQuery, checkFraction) + + sqlDB.QueryRow(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID) + jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID) + require.NotNil(t, jobProgress) + require.NotEmpty(t, jobProgress.GetRestore().CompletedSpans) + require.Equal(t, jobProgress.GetRestore().CompletedSpans[0].Timestamp, hlc.Timestamp{WallTime: 1}) + + sqlDB.Exec(t, `PAUSE JOB $1`, jobID) + jobutils.WaitForJobToPause(t, sqlDB, jobID) + sqlDB.Exec(t, `RESUME JOB $1`, jobID) + jobutils.WaitForJobToSucceed(t, sqlDB, jobID) + jobProgress = jobutils.GetJobProgress(t, sqlDB, jobID) + require.NotNil(t, jobProgress) + require.Equal(t, 7, len(jobProgress.GetRestore().CompletedSpans)) +} diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6e2a72678683..47861ce88c7e 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -14,7 +14,6 @@ import ( "fmt" "runtime" - "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" @@ -414,32 +413,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span) writeAtBatchTS = false } - - // disallowShadowingBelow is set to an empty hlc.Timestamp in release builds - // i.e. allow all shadowing without AddSSTable having to check for overlapping - // keys. This is because RESTORE is expected to ingest into an empty keyspace. - // If a restore job is resumed, the un-checkpointed spans that are re-ingested - // will shadow (equal key, value; different ts) the already ingested keys. - // - // NB: disallowShadowingBelow used to be unconditionally set to logical=1. - // This permissive value would allow shadowing in case the RESTORE has to - // retry ingestions but served to force evaluation of AddSSTable to check for - // overlapping keys. It was believed that even across resumptions of a restore - // job, `checkForKeyCollisions` would be inexpensive because of our frequent - // job checkpointing. Further investigation in - // https://github.com/cockroachdb/cockroach/issues/81116 revealed that our - // progress checkpointing could significantly lag behind the spans we have - // ingested, making a resumed restore spend a lot of time in - // `checkForKeyCollisions` leading to severely degraded performance. We have - // *never* seen a restore fail because of the invariant enforced by setting - // `disallowShadowingBelow` to a non-empty value, and so we feel comfortable - // disabling this check entirely. A future release will work on fixing our - // progress checkpointing so that we do not have a buildup of un-checkpointed - // work, at which point we can reassess reverting to logical=1. - disallowShadowingBelow := hlc.Timestamp{} - if !build.IsRelease() { - disallowShadowingBelow = hlc.Timestamp{Logical: 1} - } + disallowShadowingBelow := hlc.Timestamp{Logical: 1} var err error batcher, err = bulk.MakeSSTBatcher(ctx, diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 531d1f512d4e..824b7fa224ff 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/joberror" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -63,6 +64,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" + spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -221,6 +223,65 @@ func makeBackupLocalityMap( return backupLocalityMap, nil } +func filterCompletedImportSpans( + completedSpansSlice []jobspb.RestoreProgress_RestoreProgressFrontierEntry, + importSpans []execinfrapb.RestoreSpanEntry, +) (*spanUtils.Frontier, []execinfrapb.RestoreSpanEntry, error) { + var frontierCreationSlice []roachpb.Span + var modifiedImportSpans []execinfrapb.RestoreSpanEntry + for _, frontierSpanEntry := range completedSpansSlice { + frontierCreationSlice = append(frontierCreationSlice, frontierSpanEntry.Entry) + } + checkpointingSpanFrontier, err := spanUtils.MakeFrontier(frontierCreationSlice...) + if err != nil { + return checkpointingSpanFrontier, modifiedImportSpans, err + } + for _, completedSpanEntry := range completedSpansSlice { + _, err = checkpointingSpanFrontier.Forward(completedSpanEntry.Entry, completedSpanEntry.Timestamp) + if err != nil { + return checkpointingSpanFrontier, modifiedImportSpans, err + } + } + + for _, importSpan := range importSpans { + skip := false + checkpointingSpanFrontier.SpanEntries(importSpan.Span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if ts.Equal(hlc.Timestamp{WallTime: 1}) { + skip = true + return spanUtils.ContinueMatch + } + skip = false + return spanUtils.StopMatch + }) + if skip { + continue + } + modifiedImportSpans = append(modifiedImportSpans, importSpan) + } + if len(completedSpansSlice) == 0 { + // construct span frontier + var checkpointingFrontierImportSpans = make([]roachpb.Span, len(modifiedImportSpans)) + for i, span := range modifiedImportSpans { + checkpointingFrontierImportSpans[i] = span.Span + } + checkpointingSpanFrontier, err = spanUtils.MakeFrontier(checkpointingFrontierImportSpans...) + if err != nil { + return checkpointingSpanFrontier, modifiedImportSpans, err + } + } else { + var importSpansSlice = make([]roachpb.Span, len(modifiedImportSpans)) + for i, importSpanEntry := range modifiedImportSpans { + importSpansSlice[i] = importSpanEntry.Span + } + err = checkpointingSpanFrontier.AddSpansAt(hlc.Timestamp{}, importSpansSlice...) + if err != nil { + return checkpointingSpanFrontier, modifiedImportSpans, err + } + } + return checkpointingSpanFrontier, modifiedImportSpans, err +} + // restore imports a SQL table (or tables) from sets of non-overlapping sstable // files. func restore( @@ -256,9 +317,10 @@ func restore( mu := struct { syncutil.Mutex - highWaterMark int - res roachpb.RowCount - requestsCompleted []bool + highWaterMark int + res roachpb.RowCount + requestsCompleted []bool + checkpointingSpanFrontier *spanUtils.Frontier }{ highWaterMark: -1, } @@ -281,9 +343,17 @@ func restore( // which are grouped by keyrange. highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater - importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, + var importSpans []execinfrapb.RestoreSpanEntry + importSpans = makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV())) + if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) { + mu.checkpointingSpanFrontier, importSpans, err = filterCompletedImportSpans(job.Progress().Details.(*jobspb.Progress_Restore).Restore.CompletedSpans, importSpans) + if err != nil { + return emptyRowCount, err + } + } + if len(importSpans) == 0 { // There are no files to restore. return emptyRowCount, nil @@ -332,8 +402,20 @@ func restore( switch d := details.(type) { case *jobspb.Progress_Restore: mu.Lock() - if mu.highWaterMark >= 0 { - d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key + if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) { + completedSpansSlice := make([]jobspb.RestoreProgress_RestoreProgressFrontierEntry, 0) + mu.checkpointingSpanFrontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) { + // We only append completed spans to store in the jobs record. + if ts.Equal(hlc.Timestamp{WallTime: 1}) { + completedSpansSlice = append(completedSpansSlice, jobspb.RestoreProgress_RestoreProgressFrontierEntry{Entry: sp, Timestamp: ts}) + } + return spanUtils.ContinueMatch + }) + d.Restore.CompletedSpans = completedSpansSlice + } else { + if mu.highWaterMark >= 0 { + d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key + } } mu.Unlock() default: @@ -355,24 +437,37 @@ func restore( // to progCh. for progress := range progCh { mu.Lock() - var progDetails backuppb.RestoreProgress - if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { - log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) - } + if clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } - mu.res.Add(progDetails.Summary) - idx := progDetails.ProgressIdx + mu.res.Add(progDetails.Summary) + _, err = mu.checkpointingSpanFrontier.Forward(progDetails.DataSpan, hlc.Timestamp{WallTime: 1}) + if err != nil { + log.Errorf(ctx, "unable to forward timestamp: %+v", err) + } + } else { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } - // Assert that we're actually marking the correct span done. See #23977. - if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, importSpans[idx], - ) - } - mu.requestsCompleted[idx] = true - for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j + mu.res.Add(progDetails.Summary) + idx := progDetails.ProgressIdx + + // Assert that we're actually marking the correct span done. See #23977. + if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { + mu.Unlock() + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, importSpans[idx], + ) + } + mu.requestsCompleted[idx] = true + for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { + mu.highWaterMark = j + } } mu.Unlock() @@ -380,6 +475,7 @@ func restore( // progress. requestFinishedCh <- struct{}{} } + return nil } tasks = append(tasks, jobCheckpointLoop) diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 70de443bbcc5..b29c62885bd7 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -451,7 +451,14 @@ message RestoreDetails { message RestoreProgress { + message RestoreProgressFrontierEntry { + roachpb.Span entry = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + } + bytes high_water = 1; + // CompletedSpans tracks the completed spans in a RESTORE operation. + repeated RestoreProgressFrontierEntry completed_spans = 2 [(gogoproto.nullable) = false]; } message ImportDetails {