diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index b8baf2e98e9a..a5025629a3d7 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "restore_job.go", "restore_planning.go", "restore_processor_planning.go", + "restore_progress.go", "restore_schema_change_creation.go", "restore_span_covering.go", "schedule_exec.go", @@ -182,6 +183,7 @@ go_test( "restore_mid_schema_change_test.go", "restore_old_sequences_test.go", "restore_old_versions_test.go", + "restore_progress_test.go", "restore_span_covering_test.go", "schedule_pts_chaining_test.go", "show_test.go", diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 21038602ec2d..f427ca11e25b 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" "github.com/cockroachdb/cockroach/pkg/cloud/gcp" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -1488,6 +1489,125 @@ WHERE do(`RESTORE data.* FROM LATEST IN $1 WITH OPTIONS (into_db='restoredb')`, checkRestore) } +// TestRestoreCheckpointing checks that progress persists to the job record +// using the new span frontier. The test takes the following approach: +// +// 1. Backup and restore a database with x tables, each of which will have a +// disjoint span. This implies the restore will process x requiredSpans. Since +// each required span has two rows, each required span will result in a single +// restoreSpanEntry, and consequently, a single AddSSTable flush. Because the +// checkpoint frontier merges disjoint required spans, the persisted frontier +// should only have 1 entry. +// +// 2. This test will then block and pause the restore after y AddSStable +// flushes, and assert that y disjoint spans have been persisted to the +// progress frontier. +// +// 3. The test will then resume the restore job, allowing it to complete. +// Afterwards, the test asserts that all x spans have been persisted to the +// frontier, and that only x-y AddSStable requests were sent after resume, +// implying that on resume, no work already persisted to the frontier was +// duplicated. +func TestRestoreCheckpointing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer jobs.TestingSetProgressThresholds()() + + // totalEntries represents the number of entries to appear in the persisted frontier. + totalEntries := 7 + entriesBeforePause := 4 + entriesCount := 0 + var alreadyPaused atomic.Bool + postResumeCount := 0 + blockDBRestore := make(chan struct{}) + waitForProgress := make(chan struct{}) + var mu syncutil.Mutex + params := base.TestClusterArgs{} + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + // Because the restore processor has several workers that + // concurrently send addsstable requests and because all workers will + // wait on the lock below, when one flush gets blocked on the + // pre-pause blockDBRestore chan, several pre-pause requests will + // queue up behind it. 
Because of the lock, these requests will
+						// actually complete _after_ the job was paused and will not get
+						// counted in the checkpointed span. For test correctness, we do not
+						// want to count these requests in postResumeCount, so we check
+						// whether the job was paused before each request began waiting for
+						// the lock.
+						wasPausedBeforeWaiting := alreadyPaused.Load()
+						mu.Lock()
+						defer mu.Unlock()
+						if entriesCount == entriesBeforePause {
+							close(waitForProgress)
+							<-blockDBRestore
+						}
+						entriesCount++
+						if wasPausedBeforeWaiting {
+							postResumeCount++
+						}
+					},
+				},
+			},
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+		}
+	testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true}
+	params.ServerArgs = testServerArgs
+	params.ServerArgs.Knobs = knobs
+
+	_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 1,
+		InitManualReplication, params)
+	defer cleanupFn()
+
+	var jobID jobspb.JobID
+	checkPersistedSpanLength := func(expectedCompletedSpans int) {
+		testutils.SucceedsSoon(t, func() error {
+			jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID)
+			numSpans := len(jobProgress.GetRestore().Checkpoint)
+			if numSpans != expectedCompletedSpans {
+				return errors.Newf("expected %d checkpoints, but only saw %d", expectedCompletedSpans, numSpans)
+			}
+			return nil
+		})
+	}
+
+	sqlDB.Exec(t, `CREATE DATABASE d`)
+
+	for i := 1; i <= totalEntries; i++ {
+		tableName := fmt.Sprintf("d.t%d", i)
+		sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName))
+		sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
+	}
+	sqlDB.Exec(t, `BACKUP DATABASE d INTO $1`, localFoo)
+
+	sqlDB.QueryRow(t, `RESTORE DATABASE d FROM LATEST IN $1 WITH DETACHED, new_db_name=d2`, localFoo).Scan(&jobID)
+
+	// Pause the job after some progress has been logged.
+	<-waitForProgress
+
+	// To ensure that progress gets persisted, sleep well beyond the test-only job update interval.
+	time.Sleep(time.Second)
+
+	sqlDB.Exec(t, `PAUSE JOB $1`, &jobID)
+	jobutils.WaitForJobToPause(t, sqlDB, jobID)
+	// NB: we don't check the persisted span length here, though we expect there
+	// to be 1 completed persisted span most of the time. Occasionally, two
+	// disjoint spans may be persisted if the required spans are processed out of
+	// order; e.g. table 5's span gets processed before table 4's.
+	require.Equal(t, entriesBeforePause, entriesCount)
+
+	close(blockDBRestore)
+	alreadyPaused.Store(true)
+	sqlDB.Exec(t, `RESUME JOB $1`, &jobID)
+	jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
+
+	// Ensure that no persisted work was repeated on resume and that all work was persisted.
+ checkPersistedSpanLength(1) + require.Equal(t, totalEntries-entriesBeforePause, postResumeCount) +} + func TestBackupRestoreSystemJobsProgress(t *testing.T) { defer leaktest.AfterTest(t)() skip.WithIssue(t, 68571, "flaky test") @@ -1526,14 +1646,16 @@ func createAndWaitForJob( descriptorIDs []descpb.ID, details jobspb.Details, progress jobspb.ProgressDetails, + clusterVersionAtJobStart roachpb.Version, ) { t.Helper() now := timeutil.ToUnixMicros(timeutil.Now()) payload, err := protoutil.Marshal(&jobspb.Payload{ - UsernameProto: username.RootUserName().EncodeProto(), - DescriptorIDs: descriptorIDs, - StartedMicros: now, - Details: jobspb.WrapPayloadDetails(details), + CreationClusterVersion: clusterVersionAtJobStart, + UsernameProto: username.RootUserName().EncodeProto(), + DescriptorIDs: descriptorIDs, + StartedMicros: now, + Details: jobspb.WrapPayloadDetails(details), }) if err != nil { t.Fatal(err) @@ -1622,6 +1744,7 @@ func TestBackupRestoreResume(t *testing.T) { URI: "nodelocal://0/backup" + "-" + item.testName, }, jobspb.BackupProgress{}, + roachpb.Version{}, ) // If the backup properly took the (incorrect) checkpoint into account, it @@ -1665,6 +1788,9 @@ func TestBackupRestoreResume(t *testing.T) { if err != nil { t.Fatal(err) } + tableStartKey, err := randgen.TestingMakePrimaryIndexKeyForTenant(backupTableDesc, codec) + require.NoError(t, err) + createAndWaitForJob( t, sqlDB, []descpb.ID{restoreTableID}, jobspb.RestoreDetails{ @@ -1678,8 +1804,14 @@ func TestBackupRestoreResume(t *testing.T) { URIs: []string{restoreDir}, }, jobspb.RestoreProgress{ - HighWater: restoreHighWaterMark, + Checkpoint: []jobspb.RestoreProgress_FrontierEntry{ + { + Span: roachpb.Span{Key: tableStartKey, EndKey: restoreHighWaterMark}, + Timestamp: completedSpanTime}, + }, }, + // Required because restore checkpointing is version gated. + clusterversion.ByKey(clusterversion.V23_1Start), ) // If the restore properly took the (incorrect) low-water mark into account, // the first half of the table will be missing. 
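// Aside (not part of this patch): a minimal, self-contained sketch of the
// span.Frontier behavior that TestRestoreCheckpointing above relies on.
// Adjacent subspans forwarded to the same timestamp coalesce into a single
// frontier entry, and extending a completed span's end key to the next
// required span's start key (the nextRequiredSpanKey trick in
// restore_progress.go below) keeps persisted entries adjacent across the
// gaps between required spans. The keys and spans here are hypothetical;
// only Frontier calls already exercised elsewhere in this diff are used.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

func main() {
	sp := func(start, end string) roachpb.Span {
		return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
	}
	// Two hypothetical required spans, [a,d) and [e,h), tracked as one
	// contiguous span the way loadCheckpointFrontier does.
	f, err := spanUtils.MakeFrontier(sp("a", "h"))
	if err != nil {
		panic(err)
	}
	// completedSpanTime in this patch is hlc.MaxTimestamp.
	done := hlc.MaxTimestamp
	// [a,c) completes, then [c,d) completes with its end key extended to "e"
	// (the next required span's start key), and finally [e,h) completes.
	for _, s := range []roachpb.Span{sp("a", "c"), sp("c", "e"), sp("e", "h")} {
		if _, err := f.Forward(s, done); err != nil {
			panic(err)
		}
	}
	// Adjacent entries at the same timestamp merge, so a single completed
	// span covering [a,h) remains -- which is why the test expects a single
	// persisted checkpoint entry once the restore finishes.
	f.Entries(func(s roachpb.Span, ts hlc.Timestamp) spanUtils.OpResult {
		if ts.Equal(done) {
			fmt.Printf("completed: %s\n", s)
		}
		return spanUtils.ContinueMatch
	})
}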
diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index 517fb8df5da4..4695a5e7b601 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -90,11 +91,29 @@ func BenchmarkRestoreEntryCover(b *testing.B) { spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + checkpointFrontier, err := loadCheckpointFrontier(backups[numBackups-1].Spans, []jobspb.RestoreProgress_FrontierEntry{}) + require.NoError(b, err) + + filter, err := makeSpanCoveringFilter( + checkpointFrontier, + nil, + introducedSpanFrontier, + 0, + false) + require.NoError(b, err) + g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) error { defer close(spanCh) - return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false) + return generateAndSendImportSpans( + ctx, + backups[numBackups-1].Spans, + backups, + layerToBackupManifestFileIterFactory, + nil, + filter, + false, + spanCh) }) var cov []execinfrapb.RestoreSpanEntry diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index 568bd0fa8314..6246e78141e6 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -281,28 +281,36 @@ func runGenerativeSplitAndScatter( if err != nil { return err } - introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, spec.EndTime) if err != nil { return err } - backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User()) if err != nil { return err } - + checkpointFrontier, err := loadCheckpointFrontier(spec.Spans, spec.CheckpointedSpans) + if err != nil { + return err + } + filter, err := makeSpanCoveringFilter( + checkpointFrontier, + spec.HighWater, + introducedSpanFrontier, + spec.TargetSize, + spec.UseFrontierCheckpointing) + if err != nil { + return err + } return generateAndSendImportSpans( ctx, spec.Spans, backups, layerToFileIterFactory, backupLocalityMap, - introducedSpanFrontier, - spec.HighWater, - spec.TargetSize, - restoreSpanEntriesCh, + filter, spec.UseSimpleImportSpans, + restoreSpanEntriesCh, ) }) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index ff9bcbac7dde..ad9e5593ad3d 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -70,11 +70,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - pbtypes "github.com/gogo/protobuf/types" ) // restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of @@ -289,10 +287,30 @@ func restore( return emptyRowCount, err } + on231 := clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) + progressTracker, err := makeProgressTracker( + dataToRestore.getSpans(), 
+ job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint, + on231, + restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV)) + if err != nil { + return emptyRowCount, err + } + + progressTracker.mu.Lock() + filter, err := makeSpanCoveringFilter( + progressTracker.mu.checkpointFrontier, + job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater, + introducedSpanFrontier, + targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV), + progressTracker.useFrontier) + progressTracker.mu.Unlock() + if err != nil { + return roachpb.RowCount{}, err + } + // Pivot the backups, which are grouped by time, into requests for import, // which are grouped by keyrange. - highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater - layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(restoreCtx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, backupManifests, encryption, kmsEnv) if err != nil { return roachpb.RowCount{}, err @@ -300,28 +318,6 @@ func restore( simpleImportSpans := useSimpleImportSpans.Get(&execCtx.ExecCfg().Settings.SV) - mu := struct { - syncutil.Mutex - highWaterMark int64 - ceiling int64 - res roachpb.RowCount - // As part of job progress tracking, inFlightImportSpans tracks all the - // spans that have been generated are being processed by the processors in - // distRestore. requestsCompleleted tracks the spans from - // inFlightImportSpans that have completed its processing. Once all spans up - // to index N have been processed (and appear in requestsCompleted), then - // any spans with index < N will be removed from both inFlightImportSpans - // and requestsCompleted maps. - inFlightImportSpans map[int64]roachpb.Span - requestsCompleted map[int64]bool - }{ - highWaterMark: -1, - ceiling: 0, - inFlightImportSpans: make(map[int64]roachpb.Span), - requestsCompleted: make(map[int64]bool), - } - - targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV) countSpansCh := make(chan execinfrapb.RestoreSpanEntry, 1000) genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error { defer close(spanCh) @@ -331,14 +327,11 @@ func restore( backupManifests, layerToIterFactory, backupLocalityMap, - introducedSpanFrontier, - highWaterMark, - targetSize, - spanCh, + filter, simpleImportSpans, + spanCh, ) } - // Count number of import spans. var numImportSpans int var countTasks []func(ctx context.Context) error @@ -356,29 +349,15 @@ func restore( return emptyRowCount, errors.Wrapf(err, "counting number of import spans") } - importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block + // tasks are the concurrent tasks that are run during the restore. var tasks []func(ctx context.Context) error if dataToRestore.isMainBundle() { // Only update the job progress on the main data bundle. This should account // for the bulk of the data to restore. Other data (e.g. zone configs in - // cluster restores) may be restored first. When restoring that data, we - // don't want to update the high-water mark key, so instead progress is just - // defined on the main data bundle (of which there should only be one). 
- progressLogger := jobs.NewChunkProgressLogger(job, numImportSpans, job.FractionCompleted(), - func(progressedCtx context.Context, details jobspb.ProgressDetails) { - switch d := details.(type) { - case *jobspb.Progress_Restore: - mu.Lock() - if mu.highWaterMark >= 0 { - d.Restore.HighWater = mu.inFlightImportSpans[mu.highWaterMark].Key - } - mu.Unlock() - default: - log.Errorf(progressedCtx, "job payload had unexpected type %T", d) - } - }) + // cluster restores) may be restored first. + progressLogger := jobs.NewChunkProgressLogger(job, numImportSpans, job.FractionCompleted(), progressTracker.updateJobCallback) jobProgressLoop := func(ctx context.Context) error { ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log") @@ -387,54 +366,20 @@ func restore( } tasks = append(tasks, jobProgressLoop) } - + if !progressTracker.useFrontier { + // This goroutine feeds the deprecated high water mark variant of the + // generativeCheckpointLoop. + tasks = append(tasks, func(ctx context.Context) error { + return genSpan(ctx, progressTracker.inFlightSpanFeeder) + }) + } progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) - generativeCheckpointLoop := func(ctx context.Context) error { defer close(requestFinishedCh) for progress := range progCh { - mu.Lock() - var progDetails backuppb.RestoreProgress - if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { - log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) - } - - mu.res.Add(progDetails.Summary) - idx := progDetails.ProgressIdx - - if idx >= mu.ceiling { - for i := mu.ceiling; i <= idx; i++ { - importSpan, ok := <-importSpanCh - if !ok { - // The channel has been closed, there is nothing left to do. - log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") - return nil - } - mu.inFlightImportSpans[i] = importSpan.Span - } - mu.ceiling = idx + 1 - } - - if sp, ok := mu.inFlightImportSpans[idx]; ok { - // Assert that we're actually marking the correct span done. See #23977. - if !sp.Key.Equal(progDetails.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, sp, - ) - } - mu.requestsCompleted[idx] = true - prevHighWater := mu.highWaterMark - for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j - } - for j := prevHighWater; j < mu.highWaterMark; j++ { - delete(mu.requestsCompleted, j) - delete(mu.inFlightImportSpans, j) - } + if err := progressTracker.ingestUpdate(ctx, progress); err != nil { + return err } - mu.Unlock() - // Signal that the processor has finished importing a span, to update job // progress. requestFinishedCh <- struct{}{} @@ -442,27 +387,19 @@ func restore( return nil } tasks = append(tasks, generativeCheckpointLoop) - tasks = append(tasks, func(ctx context.Context) error { - return genSpan(ctx, importSpanCh) - }) runRestore := func(ctx context.Context) error { return distRestore( ctx, execCtx, int64(job.ID()), - dataToRestore.getPKIDs(), + dataToRestore, + endTime, encryption, kmsEnv, - dataToRestore.getRekeys(), - dataToRestore.getTenantRekeys(), - endTime, - dataToRestore.isValidateOnly(), details.URIs, - dataToRestore.getSpans(), backupLocalityInfo, - highWaterMark, - targetSize, + filter, numNodes, numImportSpans, simpleImportSpans, @@ -477,8 +414,10 @@ func restore( // TODO(dan): Build tooling to allow a user to restart a failed restore. 
return emptyRowCount, errors.Wrapf(err, "importing %d ranges", numImportSpans) } - - return mu.res, nil + // progress go routines should be shutdown, but use lock just to be safe. + progressTracker.mu.Lock() + defer progressTracker.mu.Unlock() + return progressTracker.mu.res, nil } // loadBackupSQLDescs extracts the backup descriptors, the latest backup @@ -1894,6 +1833,10 @@ func (r *restoreResumer) validateJobIsResumable(execConfig *sql.ExecutorConfig) // Validate that we aren't in the middle of an upgrade. To avoid unforseen // issues, we want to avoid full cluster restores if it is possible that an // upgrade is in progress. We also check this during planning. + // + // Note: If the cluster began in a mixed version state, + // the CreationClusterVersion may still be equal to binaryVersion, + // which means the cluster restore will proceed. creationClusterVersion := r.job.Payload().CreationClusterVersion binaryVersion := execConfig.Settings.Version.BinaryVersion() isClusterRestore := details.DescriptorCoverage == tree.AllDescriptors diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 20da5a60f4ea..22fd790b8d23 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" @@ -62,18 +61,13 @@ func distRestore( ctx context.Context, execCtx sql.JobExecContext, jobID int64, - pkIDs map[uint64]bool, + dataToRestore restorationData, + restoreTime hlc.Timestamp, encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, - tableRekeys []execinfrapb.TableRekey, - tenantRekeys []execinfrapb.TenantRekey, - restoreTime hlc.Timestamp, - validateOnly bool, uris []string, - requiredSpans []roachpb.Span, backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, - lowWaterMark roachpb.Key, - targetSize int64, + spanFilter spanCoveringFilter, numNodes int, numImportSpans int, useSimpleImportSpans bool, @@ -120,10 +114,10 @@ func distRestore( JobID: jobID, RestoreTime: restoreTime, Encryption: fileEncryption, - TableRekeys: tableRekeys, - TenantRekeys: tenantRekeys, - PKIDs: pkIDs, - ValidateOnly: validateOnly, + TableRekeys: dataToRestore.getRekeys(), + TenantRekeys: dataToRestore.getTenantRekeys(), + PKIDs: dataToRestore.getPKIDs(), + ValidateOnly: dataToRestore.isValidateOnly(), } // Plan SplitAndScatter in a round-robin fashion. 
@@ -174,22 +168,26 @@ func distRestore(
 		id := execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID()
 		spec := &execinfrapb.GenerativeSplitAndScatterSpec{
-			TableRekeys:          tableRekeys,
-			TenantRekeys:         tenantRekeys,
-			ValidateOnly:         validateOnly,
-			URIs:                 uris,
-			Encryption:           encryption,
-			EndTime:              restoreTime,
-			Spans:                requiredSpans,
-			BackupLocalityInfo:   backupLocalityInfo,
-			HighWater:            lowWaterMark,
-			UserProto:            execCtx.User().EncodeProto(),
-			TargetSize:           targetSize,
-			ChunkSize:            int64(chunkSize),
-			NumEntries:           int64(numImportSpans),
-			NumNodes:             int64(numNodes),
-			UseSimpleImportSpans: useSimpleImportSpans,
-			JobID:                jobID,
+			TableRekeys:              dataToRestore.getRekeys(),
+			TenantRekeys:             dataToRestore.getTenantRekeys(),
+			ValidateOnly:             dataToRestore.isValidateOnly(),
+			URIs:                     uris,
+			Encryption:               encryption,
+			EndTime:                  restoreTime,
+			Spans:                    dataToRestore.getSpans(),
+			BackupLocalityInfo:       backupLocalityInfo,
+			HighWater:                spanFilter.highWaterMark,
+			UserProto:                execCtx.User().EncodeProto(),
+			TargetSize:               spanFilter.targetSize,
+			ChunkSize:                int64(chunkSize),
+			NumEntries:               int64(numImportSpans),
+			NumNodes:                 int64(numNodes),
+			UseSimpleImportSpans:     useSimpleImportSpans,
+			UseFrontierCheckpointing: spanFilter.useFrontierCheckpointing,
+			JobID:                    jobID,
+		}
+		if spanFilter.useFrontierCheckpointing {
+			spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0)
 		}
 		proc := physicalplan.Processor{
diff --git a/pkg/ccl/backupccl/restore_progress.go b/pkg/ccl/backupccl/restore_progress.go
new file mode 100644
index 000000000000..dae900d1501b
--- /dev/null
+++ b/pkg/ccl/backupccl/restore_progress.go
@@ -0,0 +1,281 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package backupccl
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
+	pbtypes "github.com/gogo/protobuf/types"
+)
+
+// restoreCheckpointMaxBytes controls the maximum number of key bytes that will be added
+// to the checkpoint record. The default is set using the same reasoning as
+// changefeed.frontier_checkpoint_max_bytes.
+var restoreCheckpointMaxBytes = settings.RegisterByteSizeSetting(
+	settings.TenantWritable,
+	"restore.frontier_checkpoint_max_bytes",
+	"controls the maximum size of the restore checkpoint frontier as the sum of the (span, "+
+		"timestamp) tuples",
+	1<<20, // 1 MiB
+)
+
+// completedSpanTime indicates the timestamp that the progress frontier will
+// mark completed spans with.
+var completedSpanTime = hlc.MaxTimestamp
+
+type progressTracker struct {
+	// nextRequiredSpanKey maps a required span endkey to the subsequent requiredSpan's startKey.
+	nextRequiredSpanKey map[string]roachpb.Key
+
+	maxBytes int64
+
+	mu struct {
+		// Fields that may be updated and read concurrently are grouped under this mutex.
+		syncutil.Mutex
+
+		checkpointFrontier *spanUtils.Frontier
+
+		// res tracks the amount of data that has been ingested.
+		res roachpb.RowCount
+
+		// Note that the fields below are used for the deprecated high watermark progress
+		// tracker.
+		// highWaterMark represents the index into the requestsCompleted map.
+		highWaterMark int64
+		ceiling       int64
+
+		// As part of job progress tracking, inFlightImportSpans tracks all the
+		// spans that have been generated and are being processed by the processors in
+		// distRestore. requestsCompleted tracks the spans from
+		// inFlightImportSpans that have completed their processing. Once all spans up
+		// to index N have been processed (and appear in requestsCompleted), then
+		// any spans with index < N will be removed from both inFlightImportSpans
+		// and requestsCompleted maps.
+		inFlightImportSpans map[int64]roachpb.Span
+		requestsCompleted   map[int64]bool
+	}
+	useFrontier        bool
+	inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry
+}
+
+func makeProgressTracker(
+	requiredSpans roachpb.Spans,
+	persistedSpans []jobspb.RestoreProgress_FrontierEntry,
+	useFrontier bool,
+	maxBytes int64,
+) (*progressTracker, error) {
+
+	var (
+		checkpointFrontier  *spanUtils.Frontier
+		err                 error
+		nextRequiredSpanKey map[string]roachpb.Key
+		inFlightSpanFeeder  chan execinfrapb.RestoreSpanEntry
+	)
+	if useFrontier {
+		checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans)
+		if err != nil {
+			return nil, err
+		}
+		nextRequiredSpanKey = make(map[string]roachpb.Key)
+		for i := 0; i < len(requiredSpans)-1; i++ {
+			nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key
+		}
+
+	} else {
+		inFlightSpanFeeder = make(chan execinfrapb.RestoreSpanEntry, 1000)
+	}
+
+	pt := &progressTracker{}
+	pt.mu.checkpointFrontier = checkpointFrontier
+	pt.mu.highWaterMark = -1
+	pt.mu.ceiling = 0
+	pt.mu.inFlightImportSpans = make(map[int64]roachpb.Span)
+	pt.mu.requestsCompleted = make(map[int64]bool)
+	pt.nextRequiredSpanKey = nextRequiredSpanKey
+	pt.maxBytes = maxBytes
+	pt.useFrontier = useFrontier
+	pt.inFlightSpanFeeder = inFlightSpanFeeder
+	return pt, nil
+}
+
+func loadCheckpointFrontier(
+	requiredSpans roachpb.Spans, persistedSpans []jobspb.RestoreProgress_FrontierEntry,
+) (*spanUtils.Frontier, error) {
+	numRequiredSpans := len(requiredSpans) - 1
+	contiguousSpan := roachpb.Span{
+		Key:    requiredSpans[0].Key,
+		EndKey: requiredSpans[numRequiredSpans].EndKey,
+	}
+	checkpointFrontier, err := spanUtils.MakeFrontier(contiguousSpan)
+	if err != nil {
+		return nil, err
+	}
+	for _, sp := range persistedSpans {
+		_, err = checkpointFrontier.Forward(sp.Span, sp.Timestamp)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return checkpointFrontier, err
+}
+
+// persistFrontier converts a span frontier into a list of (span, timestamp)
+// tuples that can be persisted to disk. If the caller passes a nonzero maxBytes,
+// only the first N spans in the frontier that fit within the maxBytes limit
+// are returned.
+func persistFrontier(
+	frontier *spanUtils.Frontier, maxBytes int64,
+) []jobspb.RestoreProgress_FrontierEntry {
+	var used int64
+	completedSpansSlice := make([]jobspb.RestoreProgress_FrontierEntry, 0)
+	frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) {
+		if ts.Equal(completedSpanTime) {
+
+			// Persist the first N spans in the frontier that remain below the memory limit.
+ used += int64(len(sp.Key) + len(sp.EndKey) + ts.Size()) + if maxBytes != 0 && used > maxBytes { + return spanUtils.StopMatch + } + // TODO (msbutler): we may want to persist spans that have been + // restored up to a certain system time, if on resume, we build + // facilities in the generative split and scatter processor to + // create a restore span entry with files from a minimum + // timestamp. + completedSpansSlice = append(completedSpansSlice, + jobspb.RestoreProgress_FrontierEntry{Span: sp, Timestamp: ts}) + } + return spanUtils.ContinueMatch + }) + return completedSpansSlice +} + +func (pt *progressTracker) updateJobCallback( + progressedCtx context.Context, progressDetails jobspb.ProgressDetails, +) { + switch d := progressDetails.(type) { + case *jobspb.Progress_Restore: + pt.mu.Lock() + if pt.useFrontier { + // TODO (msbutler): this requires iterating over every span in the frontier, + // and rewriting every completed required span to disk. + // We may want to be more intelligent about this. + d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes) + } else { + if pt.mu.highWaterMark >= 0 { + d.Restore.HighWater = pt.mu.inFlightImportSpans[pt.mu.highWaterMark].Key + } + } + pt.mu.Unlock() + default: + log.Errorf(progressedCtx, "job payload had unexpected type %T", d) + } +} + +// ingestUpdate updates the progressTracker after a progress update returns from +// the distributed processors. +func (pt *progressTracker) ingestUpdate( + ctx context.Context, rawProgress *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, +) error { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&rawProgress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } + pt.mu.Lock() + defer pt.mu.Unlock() + pt.mu.res.Add(progDetails.Summary) + if pt.useFrontier { + updateSpan := progDetails.DataSpan.Clone() + // If the completedSpan has the same end key as a requiredSpan_i, forward + // the frontier for the span [completedSpan_startKey, + // requiredSpan_i+1_startKey]. This trick ensures the span frontier will + // contain a single entry when the restore completes. Recall that requiredSpans are + // disjoint, and a spanFrontier never merges disjoint spans. So, without + // this trick, the spanFrontier will have O(requiredSpans) entries when the + // restore completes. This trick ensures all spans persisted to the frontier are adjacent, + // and consequently, will eventually merge. + // + // Here's a visual example: + // - this restore has two required spans: [a,d) and [e,h). 
+ // - the restore span entry [c,d) just completed, implying the frontier logically looks like: + // + // tC| x---o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - since [c,d)'s endkey equals the required span (a,d]'s endkey, + // also update the gap between required span 1 and 2 in the frontier: + // + // tC| x-------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - this will ensure that when all subspans in required spans 1 and 2 complete, + // the checkpoint frontier has one span: + // + // tC| x---------------------------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok { + updateSpan.EndKey = newEndKey + } + if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil { + return err + } + } else { + idx := progDetails.ProgressIdx + + if idx >= pt.mu.ceiling { + for i := pt.mu.ceiling; i <= idx; i++ { + importSpan, ok := <-pt.inFlightSpanFeeder + if !ok { + // The channel has been closed, there is nothing left to do. + log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") + return nil + } + pt.mu.inFlightImportSpans[i] = importSpan.Span + } + pt.mu.ceiling = idx + 1 + } + + if sp, ok := pt.mu.inFlightImportSpans[idx]; ok { + // Assert that we're actually marking the correct span done. See #23977. + if !sp.Key.Equal(progDetails.DataSpan.Key) { + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, sp, + ) + } + pt.mu.requestsCompleted[idx] = true + prevHighWater := pt.mu.highWaterMark + for j := pt.mu.highWaterMark + 1; j < pt.mu.ceiling && pt.mu.requestsCompleted[j]; j++ { + pt.mu.highWaterMark = j + } + for j := prevHighWater; j < pt.mu.highWaterMark; j++ { + delete(pt.mu.requestsCompleted, j) + delete(pt.mu.inFlightImportSpans, j) + } + } + } + return nil +} diff --git a/pkg/ccl/backupccl/restore_progress_test.go b/pkg/ccl/backupccl/restore_progress_test.go new file mode 100644 index 000000000000..12b88ade5de8 --- /dev/null +++ b/pkg/ccl/backupccl/restore_progress_test.go @@ -0,0 +1,96 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + gogotypes "github.com/gogo/protobuf/types" + "github.com/stretchr/testify/require" +) + +// TestProgressTracker tests the lifecycle of the checkpoint frontier by testing the following +// over a sequence of updates on a mock set of required spans: +// - loading the persisted frontier into memory +// - updating to the in memory frontier +// - persisting the in memory frontier +func TestProgressTracker(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) + + requiredSpans := []roachpb.Span{c.sp("a", "e"), c.sp("f", "i")} + + mockUpdate := func(sp roachpb.Span) *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress { + restoreProgress := backuppb.RestoreProgress{DataSpan: sp} + details, err := gogotypes.MarshalAny(&restoreProgress) + require.NoError(t, err) + return &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{ProgressDetails: *details} + } + + pSp := func(spans ...roachpb.Span) []jobspb.RestoreProgress_FrontierEntry { + pSpans := make([]jobspb.RestoreProgress_FrontierEntry, 0) + for _, sp := range spans { + pSpans = append(pSpans, jobspb.RestoreProgress_FrontierEntry{Span: sp, Timestamp: completedSpanTime}) + } + return pSpans + } + + // testStep characterizes an update to the span frontier and the expected + // persisted spans after the update. + type testStep struct { + update roachpb.Span + expectedPersisted []jobspb.RestoreProgress_FrontierEntry + } + + // Each test step builds on the persistedSpan. + persistedSpans := make([]jobspb.RestoreProgress_FrontierEntry, 0) + for _, step := range []testStep{ + { + update: c.sp("a", "c"), + expectedPersisted: pSp(c.sp("a", "c")), + }, + { + // Last update in first required span should extend the persisted end key + // to the start key of the second required span. + update: c.sp("c", "e"), + expectedPersisted: pSp(c.sp("a", "f")), + }, + { + update: c.sp("h", "i"), + expectedPersisted: pSp(c.sp("a", "f"), c.sp("h", "i")), + }, + { + // After both required spans completed, only one persisted span should exist. 
+ update: c.sp("f", "h"), + expectedPersisted: pSp(c.sp("a", "i")), + }, + } { + pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0) + require.NoError(t, err) + + require.NoError(t, pt.ingestUpdate(ctx, mockUpdate(step.update))) + + persistedSpans = persistFrontier(pt.mu.checkpointFrontier, 0) + require.Equal(t, step.expectedPersisted, persistedSpans) + } +} diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 9dabb45df1e6..206e17f0fae3 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -307,6 +307,96 @@ func makeEntry(start, end roachpb.Key, f execinfrapb.RestoreFileSpec) execinfrap } } +// spanCoveringFilter holds metadata that filters which backups and required spans are used to +// populate a restoreSpanEntry +type spanCoveringFilter struct { + checkpointFrontier *spanUtils.Frontier + highWaterMark roachpb.Key + introducedSpanFrontier *spanUtils.Frontier + useFrontierCheckpointing bool + targetSize int64 +} + +func makeSpanCoveringFilter( + checkpointFrontier *spanUtils.Frontier, + highWater roachpb.Key, + introducedSpanFrontier *spanUtils.Frontier, + targetSize int64, + useFrontierCheckpointing bool, +) (spanCoveringFilter, error) { + sh := spanCoveringFilter{ + introducedSpanFrontier: introducedSpanFrontier, + targetSize: targetSize, + highWaterMark: highWater, + useFrontierCheckpointing: useFrontierCheckpointing, + checkpointFrontier: checkpointFrontier, + } + return sh, nil +} + +// filterCompleted returns the subspans of the requiredSpan that still need to be +// restored. +func (f spanCoveringFilter) filterCompleted(requiredSpan roachpb.Span) roachpb.Spans { + if f.useFrontierCheckpointing { + return f.findToDoSpans(requiredSpan) + } + if requiredSpan.EndKey.Compare(f.highWaterMark) <= 0 { + return roachpb.Spans{} + } + if requiredSpan.Key.Compare(f.highWaterMark) < 0 { + requiredSpan.Key = f.highWaterMark + } + return []roachpb.Span{requiredSpan} +} + +// findToDoSpans returns the sub spans within the required span that have not completed. +func (f spanCoveringFilter) findToDoSpans(requiredSpan roachpb.Span) roachpb.Spans { + toDoSpans := make(roachpb.Spans, 0) + f.checkpointFrontier.SpanEntries(requiredSpan, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if !ts.Equal(completedSpanTime) { + toDoSpans = append(toDoSpans, s) + } + return spanUtils.ContinueMatch + }) + return toDoSpans +} + +// getLayersCoveredLater creates a map which indicates which backup layers introduced the +// given span. +func (f spanCoveringFilter) getLayersCoveredLater( + span roachpb.Span, backups []backuppb.BackupManifest, +) map[int]bool { + layersCoveredLater := make(map[int]bool) + for layer := range backups { + var coveredLater bool + f.introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if backups[layer].EndTime.Less(ts) { + coveredLater = true + } + return spanUtils.StopMatch + }) + if coveredLater { + // Don't use this backup to cover this span if the span was reintroduced + // after the backup's endTime. In this case, this backup may have + // invalid data, and further, a subsequent backup will contain all of + // this span's data. 
Consider the following example: + // + // T0: Begin IMPORT INTO on existing table foo, ingest some data + // T1: Backup foo + // T2: Rollback IMPORT via clearRange + // T3: Incremental backup of foo, with a full reintroduction of foo’s span + // T4: RESTORE foo: should only restore foo from the incremental backup. + // If data from the full backup were also restored, + // the imported-but-then-clearRanged data will leak in the restored cluster. + // This logic seeks to avoid this form of data corruption. + layersCoveredLater[layer] = true + } + } + return layersCoveredLater +} + // generateAndSendImportSpans partitions the spans of requiredSpans into a // covering of RestoreSpanEntry's which each have all overlapping files from the // passed backups assigned to them. The spans of requiredSpans are @@ -354,18 +444,18 @@ func generateAndSendImportSpans( backups []backuppb.BackupManifest, layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, backupLocalityMap map[int]storeByLocalityKV, - introducedSpanFrontier *spanUtils.Frontier, - lowWaterMark roachpb.Key, - targetSize int64, - spanCh chan execinfrapb.RestoreSpanEntry, + filter spanCoveringFilter, useSimpleImportSpans bool, + spanCh chan execinfrapb.RestoreSpanEntry, ) error { + if useSimpleImportSpans { - importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) + importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups, + layerToBackupManifestFileIterFactory, backupLocalityMap, filter.introducedSpanFrontier, + filter.highWaterMark, filter.targetSize) if err != nil { return err } - for _, sp := range importSpans { spanCh <- sp } @@ -398,7 +488,6 @@ func generateAndSendImportSpans( entry := execinfrapb.RestoreSpanEntry{ Span: lastCovSpan, } - for layer := range covFilesByLayer { for _, f := range covFilesByLayer[layer] { fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} @@ -408,7 +497,6 @@ func generateAndSendImportSpans( entry.Files = append(entry.Files, fileSpec) } } - if len(entry.Files) > 0 { select { case <-ctx.Done(): @@ -420,137 +508,105 @@ func generateAndSendImportSpans( return nil } - for _, span := range requiredSpans { - firstInSpan = true - if span.EndKey.Compare(lowWaterMark) < 0 { - continue - } - if span.Key.Compare(lowWaterMark) < 0 { - span.Key = lowWaterMark - } - - layersCoveredLater := make(map[int]bool) - for layer := range backups { - var coveredLater bool - introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, - ts hlc.Timestamp) (done spanUtils.OpResult) { - if backups[layer].EndTime.Less(ts) { - coveredLater = true + for _, requiredSpan := range requiredSpans { + filteredSpans := filter.filterCompleted(requiredSpan) + for _, span := range filteredSpans { + firstInSpan = true + layersCoveredLater := filter.getLayersCoveredLater(span, backups) + for { + if ok, err := startEndKeyIt.valid(); !ok { + if err != nil { + return err + } + break } - return spanUtils.StopMatch - }) - if coveredLater { - // Don't use this backup to cover this span if the span was reintroduced - // after the backup's endTime. In this case, this backup may have - // invalid data, and further, a subsequent backup will contain all of - // this span's data. 
Consider the following example: - // - // T0: Begin IMPORT INTO on existing table foo, ingest some data - // T1: Backup foo - // T2: Rollback IMPORT via clearRange - // T3: Incremental backup of foo, with a full reintroduction of foo’s span - // T4: RESTORE foo: should only restore foo from the incremental backup. - // If data from the full backup were also restored, - // the imported-but-then-clearRanged data will leak in the restored cluster. - // This logic seeks to avoid this form of data corruption. - layersCoveredLater[layer] = true - } - } - for { - if ok, err := startEndKeyIt.valid(); !ok { - if err != nil { - return err + key := startEndKeyIt.value() + if span.Key.Compare(key) >= 0 { + startEndKeyIt.next() + continue } - break - } - key := startEndKeyIt.value() - if span.Key.Compare(key) >= 0 { - startEndKeyIt.next() - continue - } - - var coverSpan roachpb.Span - if firstInSpan { - coverSpan.Key = span.Key - } else { - coverSpan.Key = lastCovSpan.EndKey - } + var coverSpan roachpb.Span + if firstInSpan { + coverSpan.Key = span.Key + } else { + coverSpan.Key = lastCovSpan.EndKey + } - if span.ContainsKey(key) { - coverSpan.EndKey = startEndKeyIt.value() - } else { - coverSpan.EndKey = span.EndKey - } + if span.ContainsKey(key) { + coverSpan.EndKey = startEndKeyIt.value() + } else { + coverSpan.EndKey = span.EndKey + } - newFilesByLayer, err := getNewIntersectingFilesByLayer(coverSpan, layersCoveredLater, fileIterByLayer) - if err != nil { - return err - } + newFilesByLayer, err := getNewIntersectingFilesByLayer(coverSpan, layersCoveredLater, fileIterByLayer) + if err != nil { + return err + } - var filesByLayer [][]*backuppb.BackupManifest_File - var covSize int64 - var newCovFilesSize int64 + var filesByLayer [][]*backuppb.BackupManifest_File + var covSize int64 + var newCovFilesSize int64 - for layer := range newFilesByLayer { - for _, file := range newFilesByLayer[layer] { - sz := file.EntryCounts.DataSize - if sz == 0 { - sz = 16 << 20 + for layer := range newFilesByLayer { + for _, file := range newFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } + newCovFilesSize += sz } - newCovFilesSize += sz + filesByLayer = append(filesByLayer, newFilesByLayer[layer]) } - filesByLayer = append(filesByLayer, newFilesByLayer[layer]) - } - for layer := range covFilesByLayer { - for _, file := range covFilesByLayer[layer] { - sz := file.EntryCounts.DataSize - if sz == 0 { - sz = 16 << 20 - } + for layer := range covFilesByLayer { + for _, file := range covFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } - if coverSpan.Overlaps(file.Span) { - covSize += sz - filesByLayer[layer] = append(filesByLayer[layer], file) + if coverSpan.Overlaps(file.Span) { + covSize += sz + filesByLayer[layer] = append(filesByLayer[layer], file) + } } } - } - if covFilesByLayer == nil { - covFilesByLayer = newFilesByLayer - lastCovSpan = coverSpan - lastCovSpanSize = newCovFilesSize - } else { - if (newCovFilesSize == 0 || lastCovSpanSize+newCovFilesSize <= targetSize) && !firstInSpan { - // If there are no new files that cover this span or if we can add the - // files in the new span's cover to the last span's cover and still stay - // below targetSize, then we should merge the two spans. - for layer := range newFilesByLayer { - covFilesByLayer[layer] = append(covFilesByLayer[layer], newFilesByLayer[layer]...) 
- } - lastCovSpan.EndKey = coverSpan.EndKey - lastCovSpanSize = lastCovSpanSize + newCovFilesSize + if covFilesByLayer == nil { + covFilesByLayer = newFilesByLayer + lastCovSpan = coverSpan + lastCovSpanSize = newCovFilesSize } else { - if err := flush(ctx); err != nil { - return err + if (newCovFilesSize == 0 || lastCovSpanSize+newCovFilesSize <= filter.targetSize) && !firstInSpan { + // If there are no new files that cover this span or if we can add the + // files in the new span's cover to the last span's cover and still stay + // below targetSize, then we should merge the two spans. + for layer := range newFilesByLayer { + covFilesByLayer[layer] = append(covFilesByLayer[layer], newFilesByLayer[layer]...) + } + lastCovSpan.EndKey = coverSpan.EndKey + lastCovSpanSize = lastCovSpanSize + newCovFilesSize + } else { + if err := flush(ctx); err != nil { + return err + } + lastCovSpan = coverSpan + covFilesByLayer = filesByLayer + lastCovSpanSize = covSize } - lastCovSpan = coverSpan - covFilesByLayer = filesByLayer - lastCovSpanSize = covSize } - } - firstInSpan = false + firstInSpan = false - if lastCovSpan.EndKey.Equal(span.EndKey) { - break - } + if lastCovSpan.EndKey.Equal(span.EndKey) { + break + } - startEndKeyIt.next() + startEndKeyIt.next() + } } } - return flush(ctx) } diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index dda2ab42b6c8..aa936dd316fd 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -249,6 +250,7 @@ func makeImportSpans( layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, targetSize int64, introducedSpanFrontier *spanUtils.Frontier, + completedSpans []jobspb.RestoreProgress_FrontierEntry, useSimpleImportSpans bool, ) ([]execinfrapb.RestoreSpanEntry, error) { var cover []execinfrapb.RestoreSpanEntry @@ -261,7 +263,30 @@ func makeImportSpans( return nil }) - err := generateAndSendImportSpans(ctx, spans, backups, layerToIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) + checkpointFrontier, err := loadCheckpointFrontier(spans, completedSpans) + if err != nil { + return nil, err + } + + filter, err := makeSpanCoveringFilter( + checkpointFrontier, + nil, + introducedSpanFrontier, + targetSize, + true) + if err != nil { + return nil, err + } + + err = generateAndSendImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + nil, + filter, + useSimpleImportSpans, + spanCh) close(spanCh) if err != nil { @@ -349,43 +374,117 @@ func TestRestoreEntryCoverExample(t *testing.T) { layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(t, err) - cover, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, emptySpanFrontier, false) - require.NoError(t, err) - require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, - {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, - {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, - {Span: c.sp("f", "g"), 
Files: c.paths("6")}, - {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, - {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, - {Span: c.sp("l", "m"), Files: c.paths("9")}, - }, cover) - - coverSized, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, 2<<20, emptySpanFrontier, false) - require.NoError(t, err) - require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, - {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, - {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, - {Span: c.sp("f", "h"), Files: c.paths("5", "6")}, - {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, - {Span: c.sp("l", "m"), Files: c.paths("9")}, - }, coverSized) - - // check that introduced spans are properly elided - backups[2].IntroducedSpans = []roachpb.Span{c.sp("a", "f")} - introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) - require.NoError(t, err) - coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, introducedSpanFrontier, false) - require.NoError(t, err) - require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: c.sp("a", "f"), Files: c.paths("6")}, - {Span: c.sp("f", "g"), Files: c.paths("6")}, - {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, - {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, - {Span: c.sp("l", "m"), Files: c.paths("9")}, - }, coverIntroduced) + emptyCompletedSpans := []jobspb.RestoreProgress_FrontierEntry{} + + type simpleRestoreSpanEntry struct { + span roachpb.Span + paths []string + } + reduce := func(entries []execinfrapb.RestoreSpanEntry) []simpleRestoreSpanEntry { + reduced := make([]simpleRestoreSpanEntry, len(entries)) + for i := range entries { + reduced[i].span = entries[i].Span + reduced[i].paths = make([]string, len(entries[i].Files)) + for j := range entries[i].Files { + reduced[i].paths[j] = entries[i].Files[j].Path + } + } + return reduced + } + t.Run("base", func(t *testing.T) { + cover, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + noSpanTargetSize, + emptySpanFrontier, + emptyCompletedSpans, + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(cover)) + }) + + t.Run("target-size", func(t *testing.T) { + coverSized, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + 2<<20, + emptySpanFrontier, + emptyCompletedSpans, + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverSized)) + }) + + t.Run("introduced-spans", func(t *testing.T) { + backups[2].IntroducedSpans = []roachpb.Span{c.sp("a", "f")} + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + require.NoError(t, err) + coverIntroduced, err := makeImportSpans( + ctx, + spans, + 
backups, + layerToIterFactory, + noSpanTargetSize, + introducedSpanFrontier, + emptyCompletedSpans, + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "f"), Files: c.paths("6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverIntroduced)) + }) + t.Run("completed-spans", func(t *testing.T) { + + completedSpans := []roachpb.Span{ + // log some progress on part of a restoreSpanEntry. + c.sp("b", "c"), + + // Log some progress over multiple restoreSpanEntries. + c.sp("g", "i")} + + frontier, err := spanUtils.MakeFrontierAt(completedSpanTime, completedSpans...) + require.NoError(t, err) + coverIntroduced, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + noSpanTargetSize, + emptySpanFrontier, + persistFrontier(frontier, 0), + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverIntroduced)) + }) } func TestFileSpanStartKeyIterator(t *testing.T) { @@ -499,6 +598,56 @@ func TestFileSpanStartKeyIterator(t *testing.T) { } } +// TestCheckpointFilter ensures the filterCompleted( ) function properly splits +// a required span into remaining toDo spans. +func TestCheckpointFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) + + requiredSpan := c.sp("b", "e") + + type testCase struct { + completedSpans roachpb.Spans + expectedToDoSpans roachpb.Spans + } + + for _, tc := range []testCase{ + { + completedSpans: roachpb.Spans{c.sp("a", "c")}, + expectedToDoSpans: roachpb.Spans{c.sp("c", "e")}, + }, + { + completedSpans: roachpb.Spans{c.sp("c", "d")}, + expectedToDoSpans: roachpb.Spans{c.sp("b", "c"), c.sp("d", "e")}, + }, + { + completedSpans: roachpb.Spans{c.sp("a", "c"), c.sp("d", "e")}, + expectedToDoSpans: roachpb.Spans{c.sp("c", "d")}, + }, + } { + frontier, err := spanUtils.MakeFrontier(requiredSpan) + require.NoError(t, err) + for i := range tc.completedSpans { + _, err := frontier.Forward(tc.completedSpans[i], completedSpanTime) + require.NoError(t, err) + } + + f, err := makeSpanCoveringFilter( + frontier, + nil, + nil, + 0, + true) + require.NoError(t, err) + require.Equal(t, tc.expectedToDoSpans, f.filterCompleted(requiredSpan)) + } +} + type mockBackupInfo struct { // tableIDs identifies the tables included in the backup. 
tableIDs []int @@ -705,8 +854,15 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(t, err) - cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToIterFactory, - 0, introducedSpanFrontier, false) + cover, err := makeImportSpans( + ctx, + restoreSpans, + backups, + layerToIterFactory, + 0, + introducedSpanFrontier, + []jobspb.RestoreProgress_FrontierEntry{}, + false) require.NoError(t, err) for _, reIntroTable := range reIntroducedTables { @@ -781,8 +937,15 @@ func TestRestoreEntryCover(t *testing.T) { numBackups, spans, files, target, hasExternalFilesList, simpleImportSpans), func(t *testing.T) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - cover, err := makeImportSpans(ctx, backups[numBackups-1].Spans, backups, - layerToIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans) + cover, err := makeImportSpans( + ctx, + backups[numBackups-1].Spans, + backups, + layerToIterFactory, + target<<20, + introducedSpanFrontier, + []jobspb.RestoreProgress_FrontierEntry{}, + simpleImportSpans) require.NoError(t, err) require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage)) diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index db7ea46ecc0d..d866dff30974 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -469,6 +469,13 @@ message RestoreDetails { message RestoreProgress { bytes high_water = 1; + + message FrontierEntry { + roachpb.Span span = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + } + + repeated FrontierEntry checkpoint = 2 [(gogoproto.nullable) = false]; } message ImportDetails { diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index c90ccd5ed853..761ca768c35d 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -420,6 +420,8 @@ message GenerativeSplitAndScatterSpec { optional int64 num_nodes = 17[(gogoproto.nullable) = false]; optional int64 job_id = 18 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"]; optional bool use_simple_import_spans = 19 [(gogoproto.nullable) = false]; + optional bool use_frontier_checkpointing = 20 [(gogoproto.nullable) = false]; + repeated jobs.jobspb.RestoreProgress.FrontierEntry checkpointed_spans = 21 [(gogoproto.nullable) = false]; } diff --git a/pkg/util/span/frontier.go b/pkg/util/span/frontier.go index 7d2b7b2b9241..b4d426783aa0 100644 --- a/pkg/util/span/frontier.go +++ b/pkg/util/span/frontier.go @@ -426,7 +426,7 @@ func (f *Frontier) Entries(fn Operation) { // // ([b-c), 5), ([c-e), 1), ([e-f), 3], ([f, h], 1) ([h, k), 4), ([k, m), 1). // -// Note: neither [a-b) nor [m, q) will be emitted since they fall outside the spans +// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans // tracked by this frontier. 
func (f *Frontier) SpanEntries(span roachpb.Span, op Operation) { f.Lock() diff --git a/pkg/util/span/frontier_test.go b/pkg/util/span/frontier_test.go index e31e651e0904..82c71cf260f7 100644 --- a/pkg/util/span/frontier_test.go +++ b/pkg/util/span/frontier_test.go @@ -314,16 +314,12 @@ func TestSpanEntries(t *testing.T) { return roachpb.Span{Key: key(start), EndKey: key(end)} } - spAZ := mkspan('A', 'Z') - f, err := MakeFrontier(spAZ) - require.NoError(t, err) - - advance := func(s roachpb.Span, wall int64) { + advance := func(f *Frontier, s roachpb.Span, wall int64) { _, err := f.Forward(s, hlc.Timestamp{WallTime: wall}) require.NoError(t, err) } - spanEntries := func(sp roachpb.Span) string { + spanEntries := func(f *Frontier, sp roachpb.Span) string { var buf strings.Builder f.SpanEntries(sp, func(s roachpb.Span, ts hlc.Timestamp) OpResult { if buf.Len() != 0 { @@ -334,35 +330,52 @@ func TestSpanEntries(t *testing.T) { }) return buf.String() } + t.Run("contiguous frontier", func(t *testing.T) { + spAZ := mkspan('A', 'Z') + f, err := MakeFrontier(spAZ) + require.NoError(t, err) + // Nothing overlaps span fully to the left of frontier. + require.Equal(t, ``, spanEntries(f, mkspan('0', '9'))) + // Nothing overlaps span fully to the right of the frontier. + require.Equal(t, ``, spanEntries(f, mkspan('a', 'z'))) + + // Span overlaps entire frontier. + require.Equal(t, `{A-Z}@0`, spanEntries(f, spAZ)) + advance(f, spAZ, 1) + require.Equal(t, `{A-Z}@1`, spanEntries(f, spAZ)) + + // Span overlaps part of the frontier, with left part outside frontier. + require.Equal(t, `{A-C}@1`, spanEntries(f, mkspan('0', 'C'))) + + // Span overlaps part of the frontier, with right part outside frontier. + require.Equal(t, `{Q-Z}@1`, spanEntries(f, mkspan('Q', 'c'))) + + // Span fully inside frontier. + require.Equal(t, `{P-W}@1`, spanEntries(f, mkspan('P', 'W'))) + + // Advance part of the frontier. + advance(f, mkspan('C', 'E'), 2) + advance(f, mkspan('H', 'M'), 5) + advance(f, mkspan('N', 'Q'), 3) + + // Span overlaps various parts of the frontier. + require.Equal(t, + `{A-C}@1 {C-E}@2 {E-H}@1 {H-M}@5 {M-N}@1 {N-P}@3`, + spanEntries(f, mkspan('3', 'P'))) + }) - // Nothing overlaps span fully to the left of frontier. - require.Equal(t, ``, spanEntries(mkspan('0', '9'))) - // Nothing overlaps span fully to the right of the frontier. - require.Equal(t, ``, spanEntries(mkspan('a', 'z'))) - - // Span overlaps entire frontier. - require.Equal(t, `{A-Z}@0`, spanEntries(spAZ)) - advance(spAZ, 1) - require.Equal(t, `{A-Z}@1`, spanEntries(spAZ)) - - // Span overlaps part of the frontier, with left part outside frontier. - require.Equal(t, `{A-C}@1`, spanEntries(mkspan('0', 'C'))) - - // Span overlaps part of the frontier, with right part outside frontier. - require.Equal(t, `{Q-Z}@1`, spanEntries(mkspan('Q', 'c'))) - - // Span fully inside frontier. - require.Equal(t, `{P-W}@1`, spanEntries(mkspan('P', 'W'))) + t.Run("disjoint frontier", func(t *testing.T) { + spAB := mkspan('A', 'B') + spCE := mkspan('C', 'E') + f, err := MakeFrontier(spAB, spCE) + require.NoError(t, err) - // Advance part of the frontier. - advance(mkspan('C', 'E'), 2) - advance(mkspan('H', 'M'), 5) - advance(mkspan('N', 'Q'), 3) + // Nothing overlaps between the two spans in the frontier. + require.Equal(t, ``, spanEntries(f, mkspan('B', 'C'))) - // Span overlaps various parts of the frontier. 
- require.Equal(t, - `{A-C}@1 {C-E}@2 {E-H}@1 {H-M}@5 {M-N}@1 {N-P}@3`, - spanEntries(mkspan('3', 'P'))) + // Overlap with only one entry in the frontier + require.Equal(t, `{C-D}@0`, spanEntries(f, mkspan('B', 'D'))) + }) } // symbols that can make up spans.
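// Aside (not part of this patch): a minimal sketch of the filterCompleted
// idea from restore_span_covering.go, mirroring the second case of
// TestCheckpointFilter. SpanEntries visits only the tracked portions of the
// queried span, and entries not yet at completedSpanTime are the remaining
// to-do subspans. The keys used here are hypothetical.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

func main() {
	sp := func(start, end string) roachpb.Span {
		return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
	}
	required := sp("b", "e")

	f, err := spanUtils.MakeFrontier(required)
	if err != nil {
		panic(err)
	}
	// Mark [c,d) as restored, as ingestUpdate would after a processor update.
	if _, err := f.Forward(sp("c", "d"), hlc.MaxTimestamp); err != nil {
		panic(err)
	}

	// Collect the subspans of the required span that still need to be restored.
	var toDo roachpb.Spans
	f.SpanEntries(required, func(s roachpb.Span, ts hlc.Timestamp) spanUtils.OpResult {
		if !ts.Equal(hlc.MaxTimestamp) {
			toDo = append(toDo, s)
		}
		return spanUtils.ContinueMatch
	})
	for _, s := range toDo {
		fmt.Printf("todo: %s\n", s) // expect [b,c) and then [d,e)
	}
}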