diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 88345cf4dbbf..dd2b4a886fc4 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -169,6 +169,7 @@ go_test( "key_rewriter_test.go", "main_test.go", "partitioned_backup_test.go", + "restore_checkpointing_test.go", "restore_data_processor_test.go", "restore_mid_schema_change_test.go", "restore_old_sequences_test.go", diff --git a/pkg/ccl/backupccl/restore_checkpointing_test.go b/pkg/ccl/backupccl/restore_checkpointing_test.go new file mode 100644 index 000000000000..8cfe2a54d6ea --- /dev/null +++ b/pkg/ccl/backupccl/restore_checkpointing_test.go @@ -0,0 +1,175 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + gosql "database/sql" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestRestoreCheckpointing checks that all completed spans are +// skipped over when creating the slice for makeSimpleImportSpans. +func TestRestoreCheckpointing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer jobs.TestingSetProgressThresholds()() + + var allowResponse chan struct{} + params := base.TestClusterArgs{} + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + <-allowResponse + }, + }, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true} + params.ServerArgs = testServerArgs + params.ServerArgs.Knobs = knobs + + ctx := context.Background() + _, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, 1, + InitManualReplication, params) + conn := sqlDB.DB.(*gosql.DB) + defer cleanupFn() + + sqlDB.Exec(t, `CREATE DATABASE r1`) + sqlDB.Exec(t, `CREATE TABLE r1.foo (id INT PRIMARY KEY, s STRING)`) + sqlDB.Exec(t, `CREATE TABLE r1.baz (id INT PRIMARY KEY, s STRING)`) + sqlDB.Exec(t, `CREATE TABLE r1.a (id INT PRIMARY KEY, s STRING)`) + sqlDB.Exec(t, `CREATE TABLE r1.b (id INT PRIMARY KEY, s STRING)`) + sqlDB.Exec(t, `CREATE TABLE r1.c (id INT PRIMARY KEY, s STRING)`) + sqlDB.Exec(t, `CREATE TABLE r1.d (id INT PRIMARY KEY, s STRING)`) + sqlDB.Exec(t, `CREATE TABLE r1.e (id INT PRIMARY KEY, s STRING)`) + sqlDB.Exec(t, `INSERT INTO r1.foo VALUES (1, 'x'),(2,'y')`) + sqlDB.Exec(t, `INSERT INTO r1.baz VALUES (11, 'xx'),(22,'yy')`) + sqlDB.Exec(t, `INSERT INTO r1.a VALUES (11, 'xx'),(22,'yy')`) + sqlDB.Exec(t, `INSERT INTO r1.b VALUES (11, 'xx'),(22,'yy')`) + sqlDB.Exec(t, `INSERT INTO r1.c VALUES (11, 'xx'),(22,'yy')`) + sqlDB.Exec(t, `INSERT INTO r1.d VALUES (11, 'xx'),(22,'yy')`) + sqlDB.Exec(t, `INSERT INTO r1.e VALUES (11, 'xx'),(22,'yy')`) + sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/test-root'`) + + restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/test-root' WITH detached, new_db_name=data2` + + backupTableID := sqlutils.QueryTableID(t, conn, "r1", "public", "foo") + + var jobID jobspb.JobID + do := func(query string, check inProgressChecker) { + t.Logf("checking query %q", query) + + var totalExpectedResponses int + if strings.Contains(query, "RESTORE") { + // We expect restore to process each file in the backup individually. + // SST files are written per-range in the backup. So we expect the + // restore to process #(ranges) that made up the original table. + totalExpectedResponses = 5 + } else { + t.Fatal("expected query to be either a backup or restore") + } + jobDone := make(chan error) + allowResponse = make(chan struct{}, totalExpectedResponses) + + go func() { + _, err := conn.Exec(query) + jobDone <- err + }() + + // Allow half the total expected responses to proceed. + for i := 0; i < totalExpectedResponses/2; i++ { + allowResponse <- struct{}{} + } + + err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { + return check(ctx, inProgressState{ + DB: conn, + backupTableID: backupTableID, + dir: dir, + name: "foo", + }) + }) + + // Close the channel to allow all remaining responses to proceed. We do this + // even if the above retry.ForDuration failed, otherwise the test will hang + // forever. + close(allowResponse) + + if err := <-jobDone; err != nil { + t.Fatalf("%q: %+v", query, err) + } + + if err != nil { + t.Log(err) + } + } + + progressQuery := `select crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Progress', progress) as progress from system.jobs where id=$1` + + var progressMessage string + //var unused interface{} + + checkFraction := func(ctx context.Context, ip inProgressState) error { + jobID, err := ip.latestJobID() + if err != nil { + return err + } + var fractionCompleted float32 + if err := ip.QueryRow( + `SELECT fraction_completed FROM crdb_internal.jobs WHERE job_id = $1`, + jobID, + ).Scan(&fractionCompleted); err != nil { + return err + } + ip.QueryRow(progressQuery, jobID).Scan(&progressMessage) + t.Logf(progressMessage) + if fractionCompleted < 0.01 || fractionCompleted > 0.99 { + return errors.Errorf( + "expected progress to be in range [0.01, 0.99] but got %f", + fractionCompleted, + ) + } + return nil + } + + do(restoreQuery, checkFraction) + + sqlDB.QueryRow(t, `SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID) + sqlDB.QueryRow(t, progressQuery, jobID).Scan(&progressMessage) + require.NotNil(t, progressMessage) + require.Contains(t, progressMessage, "\"wallTime\": \"1\"") + jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID) + require.NotNil(t, jobProgress) + + sqlDB.Exec(t, `PAUSE JOB $1`, jobID) + jobutils.WaitForJobToPause(t, sqlDB, jobID) + sqlDB.Exec(t, `RESUME JOB $1`, jobID) + jobutils.WaitForJobToSucceed(t, sqlDB, jobID) + jobProgress = jobutils.GetJobProgress(t, sqlDB, jobID) + require.NotNil(t, jobProgress) + require.Equal(t, 7, len(jobProgress.GetRestore().Frontier)) +} diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6e2a72678683..47861ce88c7e 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -14,7 +14,6 @@ import ( "fmt" "runtime" - "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" @@ -414,32 +413,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span) writeAtBatchTS = false } - - // disallowShadowingBelow is set to an empty hlc.Timestamp in release builds - // i.e. allow all shadowing without AddSSTable having to check for overlapping - // keys. This is because RESTORE is expected to ingest into an empty keyspace. - // If a restore job is resumed, the un-checkpointed spans that are re-ingested - // will shadow (equal key, value; different ts) the already ingested keys. - // - // NB: disallowShadowingBelow used to be unconditionally set to logical=1. - // This permissive value would allow shadowing in case the RESTORE has to - // retry ingestions but served to force evaluation of AddSSTable to check for - // overlapping keys. It was believed that even across resumptions of a restore - // job, `checkForKeyCollisions` would be inexpensive because of our frequent - // job checkpointing. Further investigation in - // https://github.com/cockroachdb/cockroach/issues/81116 revealed that our - // progress checkpointing could significantly lag behind the spans we have - // ingested, making a resumed restore spend a lot of time in - // `checkForKeyCollisions` leading to severely degraded performance. We have - // *never* seen a restore fail because of the invariant enforced by setting - // `disallowShadowingBelow` to a non-empty value, and so we feel comfortable - // disabling this check entirely. A future release will work on fixing our - // progress checkpointing so that we do not have a buildup of un-checkpointed - // work, at which point we can reassess reverting to logical=1. - disallowShadowingBelow := hlc.Timestamp{} - if !build.IsRelease() { - disallowShadowingBelow = hlc.Timestamp{Logical: 1} - } + disallowShadowingBelow := hlc.Timestamp{Logical: 1} var err error batcher, err = bulk.MakeSSTBatcher(ctx, diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 7626a6925019..ea2f84ca8ff7 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/joberror" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -63,6 +64,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" + spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -259,6 +261,7 @@ func restore( highWaterMark int res roachpb.RowCount requestsCompleted []bool + spanFrontierSlice []jobspb.RestoreProgress_RestoreProgressFrontierEntry }{ highWaterMark: -1, } @@ -281,8 +284,66 @@ func restore( // which are grouped by keyrange. highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater - importSpans := makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, - backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV())) + var importSpans []execinfrapb.RestoreSpanEntry + var checkpointingSpanFrontier *spanUtils.Frontier + if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1Start) { + spanFrontierSlice := job.Progress().Details.(*jobspb.Progress_Restore).Restore.Frontier + var frontierCreationSlice []roachpb.Span + for _, frontierSpanEntry := range spanFrontierSlice { + frontierCreationSlice = append(frontierCreationSlice, frontierSpanEntry.Entry) + } + checkpointingSpanFrontier, err = spanUtils.MakeFrontier(frontierCreationSlice...) + for _, frontierSpanEntry := range spanFrontierSlice { + _, err = checkpointingSpanFrontier.Forward(frontierSpanEntry.Entry, frontierSpanEntry.Timestamp) + if err != nil { + return emptyRowCount, err + } + } + importSpans = makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, + backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV())) + var modifiedImportSpans []execinfrapb.RestoreSpanEntry + for _, importSpan := range importSpans { + skip := false + checkpointingSpanFrontier.SpanEntries(importSpan.Span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if ts.Equal(hlc.Timestamp{WallTime: 1}) { + skip = true + return spanUtils.ContinueMatch + } + skip = false + return spanUtils.StopMatch + }) + if skip { + continue + } + modifiedImportSpans = append(modifiedImportSpans, importSpan) + } + importSpans = modifiedImportSpans + if len(spanFrontierSlice) == 0 { + // construct span frontier + var checkpointingFrontierImportSpans = make([]roachpb.Span, len(importSpans)) + for i, span := range importSpans { + checkpointingFrontierImportSpans[i] = span.Span + } + checkpointingSpanFrontier, err = spanUtils.MakeFrontier(checkpointingFrontierImportSpans...) + if err != nil { + return emptyRowCount, err + } + } else { + var importSpansSlice = make([]roachpb.Span, len(importSpans)) + for i, importSpanEntry := range importSpans { + importSpansSlice[i] = importSpanEntry.Span + } + err = checkpointingSpanFrontier.AddSpansAt(hlc.Timestamp{}, importSpansSlice...) + if err != nil { + return emptyRowCount, err + } + mu.spanFrontierSlice = spanFrontierSlice + } + } else { + importSpans = makeSimpleImportSpans(dataToRestore.getSpans(), backupManifests, + backupLocalityMap, introducedSpanFrontier, highWaterMark, targetRestoreSpanSize.Get(execCtx.ExecCfg().SV())) + } if len(importSpans) == 0 { // There are no files to restore. @@ -335,6 +396,9 @@ func restore( if mu.highWaterMark >= 0 { d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key } + if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1Start) { + d.Restore.Frontier = mu.spanFrontierSlice + } mu.Unlock() default: log.Errorf(progressedCtx, "job payload had unexpected type %T", d) @@ -355,24 +419,43 @@ func restore( // to progCh. for progress := range progCh { mu.Lock() - var progDetails backuppb.RestoreProgress - if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { - log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) - } + if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1Start) { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } - mu.res.Add(progDetails.Summary) - idx := progDetails.ProgressIdx + mu.res.Add(progDetails.Summary) + _, err = checkpointingSpanFrontier.Forward(progDetails.DataSpan, hlc.Timestamp{WallTime: 1}) + if err != nil { + log.Errorf(ctx, "unable to forward timestamp: %+v", err) + } - // Assert that we're actually marking the correct span done. See #23977. - if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, importSpans[idx], - ) - } - mu.requestsCompleted[idx] = true - for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j + mu.spanFrontierSlice = append(mu.spanFrontierSlice, + jobspb.RestoreProgress_RestoreProgressFrontierEntry{ + Entry: progDetails.DataSpan, + Timestamp: hlc.Timestamp{WallTime: 1}, + }) + } else { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } + + mu.res.Add(progDetails.Summary) + idx := progDetails.ProgressIdx + + // Assert that we're actually marking the correct span done. See #23977. + if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { + mu.Unlock() + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, importSpans[idx], + ) + } + mu.requestsCompleted[idx] = true + for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { + mu.highWaterMark = j + } } mu.Unlock() @@ -380,6 +463,10 @@ func restore( // progress. requestFinishedCh <- struct{}{} } + //if execCtx.ExecCfg().Settings.Version.IsActive(restoreCtx, clusterversion.V23_1TenantNames) { + // + //} + return nil } tasks = append(tasks, jobCheckpointLoop) diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 47af9b73eff5..68d7e87b2302 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -369,6 +369,10 @@ func (j *Job) FractionProgressed( FractionCompleted: fractionCompleted, } ju.UpdateProgress(md.Progress) + err := j.registry.CheckPausepoint("restore.load_first_dataspan") + if err != nil { + return err + } return nil }) } diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index f7fc9ffeb8d5..34b7d77987ea 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -445,7 +445,14 @@ message RestoreDetails { message RestoreProgress { + message RestoreProgressFrontierEntry { + roachpb.Span entry = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + } + bytes high_water = 1; + // frontier tracks the completed spans in a RESTORE operation. + repeated RestoreProgressFrontierEntry frontier = 2 [(gogoproto.nullable) = false]; } message ImportDetails {