From 2ea46c825eda83accacd35f85faff783941a6441 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 23 Feb 2023 15:11:18 -0500 Subject: [PATCH] backupccl: checkpoint restore progress with a span frontier Previously, after a node restart or pause event, a resumed restore would have to redo a significant amount of work due to our naive progress checkpointing procedure described in #87843. This patch upgrades our checkpointing procedure to use a span frontier, significantly reducing wasted work. Here's the basic idea: 1. Create a span frontier behind a lock on the coordinator. 2. When the coordinator receives a progress update about a span that was ingested, forward that span's time stamp in the frontier to the hardcoded `completedSpanTime`. 3. The frontier is periodically persisted to the job record every 15 seconds or after 5% progress. Note, only the first N spans are persisted, to bound the amount of data written to the jobs table. The bound is set by the new restore.frontier_checkpoint_max_bytes private setting. 4. If the job is resumed after some progress was persisted, the generative split and scatter processor will skip the completed spans in the persisted frontier. Note that if a part of requiredSpan was complete, only the subset that is incomplete will get processed into a restoreSpanEntry. As part of this work, many of the function signatures in the restore codebase were refactored to prevent a large number input parameters. Further, much of the restore checkpointing code was moved to the seperate restore_progress.go file. Fixes #87843 Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 2 + pkg/ccl/backupccl/backup_test.go | 142 ++++++++- pkg/ccl/backupccl/bench_covering_test.go | 23 +- .../generative_split_and_scatter_processor.go | 22 +- pkg/ccl/backupccl/restore_job.go | 151 +++------ .../backupccl/restore_processor_planning.go | 56 ++-- pkg/ccl/backupccl/restore_progress.go | 281 +++++++++++++++++ pkg/ccl/backupccl/restore_progress_test.go | 96 ++++++ pkg/ccl/backupccl/restore_span_covering.go | 290 +++++++++++------- .../backupccl/restore_span_covering_test.go | 245 ++++++++++++--- pkg/jobs/jobspb/jobs.proto | 7 + pkg/sql/execinfrapb/processors_bulk_io.proto | 2 + pkg/util/span/frontier.go | 2 +- pkg/util/span/frontier_test.go | 77 +++-- 14 files changed, 1058 insertions(+), 338 deletions(-) create mode 100644 pkg/ccl/backupccl/restore_progress.go create mode 100644 pkg/ccl/backupccl/restore_progress_test.go diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index b8baf2e98e9a..a5025629a3d7 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "restore_job.go", "restore_planning.go", "restore_processor_planning.go", + "restore_progress.go", "restore_schema_change_creation.go", "restore_span_covering.go", "schedule_exec.go", @@ -182,6 +183,7 @@ go_test( "restore_mid_schema_change_test.go", "restore_old_sequences_test.go", "restore_old_versions_test.go", + "restore_progress_test.go", "restore_span_covering_test.go", "schedule_pts_chaining_test.go", "show_test.go", diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 21038602ec2d..f427ca11e25b 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" "github.com/cockroachdb/cockroach/pkg/cloud/gcp" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers + 
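As a rough illustration of steps 2 and 3 in the commit message (not part of this patch): the coordinator-side bookkeeping amounts to forwarding each ingested span to the sentinel timestamp and then flattening the frontier under a byte budget. The helper name below is hypothetical; loadCheckpointFrontier, persistFrontier, and completedSpanTime are introduced later in this diff.

// markSpanComplete is a hypothetical sketch of steps 2-3 above; it is not
// part of this patch.
func markSpanComplete(
	requiredSpans roachpb.Spans,
	completed roachpb.Span,
	maxBytes int64,
) ([]jobspb.RestoreProgress_FrontierEntry, error) {
	// Build the frontier over the required spans (no prior checkpoint).
	frontier, err := loadCheckpointFrontier(requiredSpans, nil /* persistedSpans */)
	if err != nil {
		return nil, err
	}
	// Mark the ingested span as done by forwarding it to completedSpanTime.
	if _, err := frontier.Forward(completed, completedSpanTime); err != nil {
		return nil, err
	}
	// Flatten at most maxBytes worth of completed entries for the job record.
	return persistFrontier(frontier, maxBytes), nil
}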
"github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -1488,6 +1489,125 @@ WHERE do(`RESTORE data.* FROM LATEST IN $1 WITH OPTIONS (into_db='restoredb')`, checkRestore) } +// TestRestoreCheckpointing checks that progress persists to the job record +// using the new span frontier. The test takes the following approach: +// +// 1. Backup and restore a database with x tables, each of which will have a +// disjoint span. This implies the restore will process x requiredSpans. Since +// each required span has two rows, each required span will result in a single +// restoreSpanEntry, and consequently, a single AddSSTable flush. Because the +// checkpoint frontier merges disjoint required spans, the persisted frontier +// should only have 1 entry. +// +// 2. This test will then block and pause the restore after y AddSStable +// flushes, and assert that y disjoint spans have been persisted to the +// progress frontier. +// +// 3. The test will then resume the restore job, allowing it to complete. +// Afterwards, the test asserts that all x spans have been persisted to the +// frontier, and that only x-y AddSStable requests were sent after resume, +// implying that on resume, no work already persisted to the frontier was +// duplicated. +func TestRestoreCheckpointing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer jobs.TestingSetProgressThresholds()() + + // totalEntries represents the number of entries to appear in the persisted frontier. + totalEntries := 7 + entriesBeforePause := 4 + entriesCount := 0 + var alreadyPaused atomic.Bool + postResumeCount := 0 + blockDBRestore := make(chan struct{}) + waitForProgress := make(chan struct{}) + var mu syncutil.Mutex + params := base.TestClusterArgs{} + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { + // Because the restore processor has several workers that + // concurrently send addsstable requests and because all workers will + // wait on the lock below, when one flush gets blocked on the + // pre-pause blockDBRestore chan, several pre-pause requests will + // queue up behind it. Because of the lock, these requests will + // actually complete _after_ the job was paused and will not get + // counted in the checkpointed span. For test correctness, we do not + // want to count these requests as part of postResumedCount by + // checking if the job was paused in each request before it began + // waiting for the lock. 
+ wasPausedBeforeWaiting := alreadyPaused.Load() + mu.Lock() + defer mu.Unlock() + if entriesCount == entriesBeforePause { + close(waitForProgress) + <-blockDBRestore + } + entriesCount++ + if wasPausedBeforeWaiting { + postResumeCount++ + } + }, + }, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + testServerArgs := base.TestServerArgs{DisableDefaultTestTenant: true} + params.ServerArgs = testServerArgs + params.ServerArgs.Knobs = knobs + + _, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 1, + InitManualReplication, params) + defer cleanupFn() + + var jobID jobspb.JobID + checkPersistedSpanLength := func(expectedCompletedSpans int) { + testutils.SucceedsSoon(t, func() error { + jobProgress := jobutils.GetJobProgress(t, sqlDB, jobID) + numSpans := len(jobProgress.GetRestore().Checkpoint) + if numSpans != expectedCompletedSpans { + return errors.Newf("expected %d checkpoints, but only see %d", expectedCompletedSpans, numSpans) + } + return nil + }) + } + + sqlDB.Exec(t, `CREATE DATABASE d`) + + for i := 1; i <= totalEntries; i++ { + tableName := fmt.Sprintf("d.t%d", i) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (id INT PRIMARY KEY, s STRING)`, tableName)) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName)) + } + sqlDB.Exec(t, `BACKUP DATABASE d INTO $1`, localFoo) + + sqlDB.QueryRow(t, `RESTORE DATABASE d FROM LATEST IN $1 WITH DETACHED, new_db_name=d2`, localFoo).Scan(&jobID) + + // Pause the job after some progress has been logged. + <-waitForProgress + + // To ensure that progress gets persisted, sleep well beyond the test only job update interval. + time.Sleep(time.Second) + + sqlDB.Exec(t, `PAUSE JOB $1`, &jobID) + jobutils.WaitForJobToPause(t, sqlDB, jobID) + // NB: we don't check the persisted span length here, though we expect there + // to be 1 completed persisted span most of the time. Occasionally, two + // disjoint spans may be persisted if the required spans are processed out of + // order; i.e. table 5's span gets processed before table 4's. + require.Equal(t, entriesBeforePause, entriesCount) + + close(blockDBRestore) + alreadyPaused.Store(true) + sqlDB.Exec(t, `RESUME JOB $1`, &jobID) + jobutils.WaitForJobToSucceed(t, sqlDB, jobID) + + // Ensure that no persisted work was repeated on resume and that all work was persisted. 
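+	// At this point every required span has completed, so the adjacent
+	// frontier entries merge into one persisted span (see the
+	// nextRequiredSpanKey handling in ingestUpdate), hence the expected
+	// checkpoint length of 1.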
+ checkPersistedSpanLength(1) + require.Equal(t, totalEntries-entriesBeforePause, postResumeCount) +} + func TestBackupRestoreSystemJobsProgress(t *testing.T) { defer leaktest.AfterTest(t)() skip.WithIssue(t, 68571, "flaky test") @@ -1526,14 +1646,16 @@ func createAndWaitForJob( descriptorIDs []descpb.ID, details jobspb.Details, progress jobspb.ProgressDetails, + clusterVersionAtJobStart roachpb.Version, ) { t.Helper() now := timeutil.ToUnixMicros(timeutil.Now()) payload, err := protoutil.Marshal(&jobspb.Payload{ - UsernameProto: username.RootUserName().EncodeProto(), - DescriptorIDs: descriptorIDs, - StartedMicros: now, - Details: jobspb.WrapPayloadDetails(details), + CreationClusterVersion: clusterVersionAtJobStart, + UsernameProto: username.RootUserName().EncodeProto(), + DescriptorIDs: descriptorIDs, + StartedMicros: now, + Details: jobspb.WrapPayloadDetails(details), }) if err != nil { t.Fatal(err) @@ -1622,6 +1744,7 @@ func TestBackupRestoreResume(t *testing.T) { URI: "nodelocal://0/backup" + "-" + item.testName, }, jobspb.BackupProgress{}, + roachpb.Version{}, ) // If the backup properly took the (incorrect) checkpoint into account, it @@ -1665,6 +1788,9 @@ func TestBackupRestoreResume(t *testing.T) { if err != nil { t.Fatal(err) } + tableStartKey, err := randgen.TestingMakePrimaryIndexKeyForTenant(backupTableDesc, codec) + require.NoError(t, err) + createAndWaitForJob( t, sqlDB, []descpb.ID{restoreTableID}, jobspb.RestoreDetails{ @@ -1678,8 +1804,14 @@ func TestBackupRestoreResume(t *testing.T) { URIs: []string{restoreDir}, }, jobspb.RestoreProgress{ - HighWater: restoreHighWaterMark, + Checkpoint: []jobspb.RestoreProgress_FrontierEntry{ + { + Span: roachpb.Span{Key: tableStartKey, EndKey: restoreHighWaterMark}, + Timestamp: completedSpanTime}, + }, }, + // Required because restore checkpointing is version gated. + clusterversion.ByKey(clusterversion.V23_1Start), ) // If the restore properly took the (incorrect) low-water mark into account, // the first half of the table will be missing. 
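A hypothetical sketch (mirroring TestCheckpointFilter added later in this patch) of how a persisted entry like the one above is consumed on resume: everything up to restoreHighWaterMark is treated as done, and only the remainder of the table span is turned into restoreSpanEntries. sketchResume is an illustrative name, not code from this change.

func sketchResume(
	tableSpan roachpb.Span, checkpoint []jobspb.RestoreProgress_FrontierEntry,
) (roachpb.Spans, error) {
	frontier, err := loadCheckpointFrontier(roachpb.Spans{tableSpan}, checkpoint)
	if err != nil {
		return nil, err
	}
	filter, err := makeSpanCoveringFilter(
		frontier,
		nil,  /* highWater: unused with frontier checkpointing */
		nil,  /* introducedSpanFrontier: not needed for filtering */
		0,    /* targetSize */
		true, /* useFrontierCheckpointing */
	)
	if err != nil {
		return nil, err
	}
	// With checkpoint = {[tableStart, hw)@completedSpanTime} this returns the
	// single remaining span [hw, tableEnd).
	return filter.filterCompleted(tableSpan), nil
}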
diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index 517fb8df5da4..4695a5e7b601 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -90,11 +91,29 @@ func BenchmarkRestoreEntryCover(b *testing.B) { spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + checkpointFrontier, err := loadCheckpointFrontier(backups[numBackups-1].Spans, []jobspb.RestoreProgress_FrontierEntry{}) + require.NoError(b, err) + + filter, err := makeSpanCoveringFilter( + checkpointFrontier, + nil, + introducedSpanFrontier, + 0, + false) + require.NoError(b, err) + g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) error { defer close(spanCh) - return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false) + return generateAndSendImportSpans( + ctx, + backups[numBackups-1].Spans, + backups, + layerToBackupManifestFileIterFactory, + nil, + filter, + false, + spanCh) }) var cov []execinfrapb.RestoreSpanEntry diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index 568bd0fa8314..6246e78141e6 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -281,28 +281,36 @@ func runGenerativeSplitAndScatter( if err != nil { return err } - introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, spec.EndTime) if err != nil { return err } - backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User()) if err != nil { return err } - + checkpointFrontier, err := loadCheckpointFrontier(spec.Spans, spec.CheckpointedSpans) + if err != nil { + return err + } + filter, err := makeSpanCoveringFilter( + checkpointFrontier, + spec.HighWater, + introducedSpanFrontier, + spec.TargetSize, + spec.UseFrontierCheckpointing) + if err != nil { + return err + } return generateAndSendImportSpans( ctx, spec.Spans, backups, layerToFileIterFactory, backupLocalityMap, - introducedSpanFrontier, - spec.HighWater, - spec.TargetSize, - restoreSpanEntriesCh, + filter, spec.UseSimpleImportSpans, + restoreSpanEntriesCh, ) }) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index ff9bcbac7dde..ad9e5593ad3d 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -70,11 +70,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - pbtypes "github.com/gogo/protobuf/types" ) // restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of @@ -289,10 +287,30 @@ func restore( return emptyRowCount, err } + on231 := clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) + progressTracker, err := makeProgressTracker( + dataToRestore.getSpans(), 
+ job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint, + on231, + restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV)) + if err != nil { + return emptyRowCount, err + } + + progressTracker.mu.Lock() + filter, err := makeSpanCoveringFilter( + progressTracker.mu.checkpointFrontier, + job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater, + introducedSpanFrontier, + targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV), + progressTracker.useFrontier) + progressTracker.mu.Unlock() + if err != nil { + return roachpb.RowCount{}, err + } + // Pivot the backups, which are grouped by time, into requests for import, // which are grouped by keyrange. - highWaterMark := job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater - layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(restoreCtx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, backupManifests, encryption, kmsEnv) if err != nil { return roachpb.RowCount{}, err @@ -300,28 +318,6 @@ func restore( simpleImportSpans := useSimpleImportSpans.Get(&execCtx.ExecCfg().Settings.SV) - mu := struct { - syncutil.Mutex - highWaterMark int64 - ceiling int64 - res roachpb.RowCount - // As part of job progress tracking, inFlightImportSpans tracks all the - // spans that have been generated are being processed by the processors in - // distRestore. requestsCompleleted tracks the spans from - // inFlightImportSpans that have completed its processing. Once all spans up - // to index N have been processed (and appear in requestsCompleted), then - // any spans with index < N will be removed from both inFlightImportSpans - // and requestsCompleted maps. - inFlightImportSpans map[int64]roachpb.Span - requestsCompleted map[int64]bool - }{ - highWaterMark: -1, - ceiling: 0, - inFlightImportSpans: make(map[int64]roachpb.Span), - requestsCompleted: make(map[int64]bool), - } - - targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV) countSpansCh := make(chan execinfrapb.RestoreSpanEntry, 1000) genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error { defer close(spanCh) @@ -331,14 +327,11 @@ func restore( backupManifests, layerToIterFactory, backupLocalityMap, - introducedSpanFrontier, - highWaterMark, - targetSize, - spanCh, + filter, simpleImportSpans, + spanCh, ) } - // Count number of import spans. var numImportSpans int var countTasks []func(ctx context.Context) error @@ -356,29 +349,15 @@ func restore( return emptyRowCount, errors.Wrapf(err, "counting number of import spans") } - importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block + // tasks are the concurrent tasks that are run during the restore. var tasks []func(ctx context.Context) error if dataToRestore.isMainBundle() { // Only update the job progress on the main data bundle. This should account // for the bulk of the data to restore. Other data (e.g. zone configs in - // cluster restores) may be restored first. When restoring that data, we - // don't want to update the high-water mark key, so instead progress is just - // defined on the main data bundle (of which there should only be one). 
- progressLogger := jobs.NewChunkProgressLogger(job, numImportSpans, job.FractionCompleted(), - func(progressedCtx context.Context, details jobspb.ProgressDetails) { - switch d := details.(type) { - case *jobspb.Progress_Restore: - mu.Lock() - if mu.highWaterMark >= 0 { - d.Restore.HighWater = mu.inFlightImportSpans[mu.highWaterMark].Key - } - mu.Unlock() - default: - log.Errorf(progressedCtx, "job payload had unexpected type %T", d) - } - }) + // cluster restores) may be restored first. + progressLogger := jobs.NewChunkProgressLogger(job, numImportSpans, job.FractionCompleted(), progressTracker.updateJobCallback) jobProgressLoop := func(ctx context.Context) error { ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log") @@ -387,54 +366,20 @@ func restore( } tasks = append(tasks, jobProgressLoop) } - + if !progressTracker.useFrontier { + // This goroutine feeds the deprecated high water mark variant of the + // generativeCheckpointLoop. + tasks = append(tasks, func(ctx context.Context) error { + return genSpan(ctx, progressTracker.inFlightSpanFeeder) + }) + } progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) - generativeCheckpointLoop := func(ctx context.Context) error { defer close(requestFinishedCh) for progress := range progCh { - mu.Lock() - var progDetails backuppb.RestoreProgress - if err := pbtypes.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { - log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) - } - - mu.res.Add(progDetails.Summary) - idx := progDetails.ProgressIdx - - if idx >= mu.ceiling { - for i := mu.ceiling; i <= idx; i++ { - importSpan, ok := <-importSpanCh - if !ok { - // The channel has been closed, there is nothing left to do. - log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") - return nil - } - mu.inFlightImportSpans[i] = importSpan.Span - } - mu.ceiling = idx + 1 - } - - if sp, ok := mu.inFlightImportSpans[idx]; ok { - // Assert that we're actually marking the correct span done. See #23977. - if !sp.Key.Equal(progDetails.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, sp, - ) - } - mu.requestsCompleted[idx] = true - prevHighWater := mu.highWaterMark - for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j - } - for j := prevHighWater; j < mu.highWaterMark; j++ { - delete(mu.requestsCompleted, j) - delete(mu.inFlightImportSpans, j) - } + if err := progressTracker.ingestUpdate(ctx, progress); err != nil { + return err } - mu.Unlock() - // Signal that the processor has finished importing a span, to update job // progress. requestFinishedCh <- struct{}{} @@ -442,27 +387,19 @@ func restore( return nil } tasks = append(tasks, generativeCheckpointLoop) - tasks = append(tasks, func(ctx context.Context) error { - return genSpan(ctx, importSpanCh) - }) runRestore := func(ctx context.Context) error { return distRestore( ctx, execCtx, int64(job.ID()), - dataToRestore.getPKIDs(), + dataToRestore, + endTime, encryption, kmsEnv, - dataToRestore.getRekeys(), - dataToRestore.getTenantRekeys(), - endTime, - dataToRestore.isValidateOnly(), details.URIs, - dataToRestore.getSpans(), backupLocalityInfo, - highWaterMark, - targetSize, + filter, numNodes, numImportSpans, simpleImportSpans, @@ -477,8 +414,10 @@ func restore( // TODO(dan): Build tooling to allow a user to restart a failed restore. 
return emptyRowCount, errors.Wrapf(err, "importing %d ranges", numImportSpans) } - - return mu.res, nil + // progress go routines should be shutdown, but use lock just to be safe. + progressTracker.mu.Lock() + defer progressTracker.mu.Unlock() + return progressTracker.mu.res, nil } // loadBackupSQLDescs extracts the backup descriptors, the latest backup @@ -1894,6 +1833,10 @@ func (r *restoreResumer) validateJobIsResumable(execConfig *sql.ExecutorConfig) // Validate that we aren't in the middle of an upgrade. To avoid unforseen // issues, we want to avoid full cluster restores if it is possible that an // upgrade is in progress. We also check this during planning. + // + // Note: If the cluster began in a mixed version state, + // the CreationClusterVersion may still be equal to binaryVersion, + // which means the cluster restore will proceed. creationClusterVersion := r.job.Payload().CreationClusterVersion binaryVersion := execConfig.Settings.Version.BinaryVersion() isClusterRestore := details.DescriptorCoverage == tree.AllDescriptors diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 20da5a60f4ea..22fd790b8d23 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" @@ -62,18 +61,13 @@ func distRestore( ctx context.Context, execCtx sql.JobExecContext, jobID int64, - pkIDs map[uint64]bool, + dataToRestore restorationData, + restoreTime hlc.Timestamp, encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, - tableRekeys []execinfrapb.TableRekey, - tenantRekeys []execinfrapb.TenantRekey, - restoreTime hlc.Timestamp, - validateOnly bool, uris []string, - requiredSpans []roachpb.Span, backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, - lowWaterMark roachpb.Key, - targetSize int64, + spanFilter spanCoveringFilter, numNodes int, numImportSpans int, useSimpleImportSpans bool, @@ -120,10 +114,10 @@ func distRestore( JobID: jobID, RestoreTime: restoreTime, Encryption: fileEncryption, - TableRekeys: tableRekeys, - TenantRekeys: tenantRekeys, - PKIDs: pkIDs, - ValidateOnly: validateOnly, + TableRekeys: dataToRestore.getRekeys(), + TenantRekeys: dataToRestore.getTenantRekeys(), + PKIDs: dataToRestore.getPKIDs(), + ValidateOnly: dataToRestore.isValidateOnly(), } // Plan SplitAndScatter in a round-robin fashion. 
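A note on the checkpoint round trip between the coordinator and the processors (both call sites are excerpted from this patch): the frontier flattened into the GenerativeSplitAndScatterSpec in the next hunk is uncapped, while the copy written to the job record by updateJobCallback is bounded by restore.frontier_checkpoint_max_bytes.

// Sent to processors: no byte cap, so they see every completed span.
spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0)

// Written to the jobs table: capped by restore.frontier_checkpoint_max_bytes.
d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes)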
@@ -174,22 +168,26 @@ func distRestore( id := execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID() spec := &execinfrapb.GenerativeSplitAndScatterSpec{ - TableRekeys: tableRekeys, - TenantRekeys: tenantRekeys, - ValidateOnly: validateOnly, - URIs: uris, - Encryption: encryption, - EndTime: restoreTime, - Spans: requiredSpans, - BackupLocalityInfo: backupLocalityInfo, - HighWater: lowWaterMark, - UserProto: execCtx.User().EncodeProto(), - TargetSize: targetSize, - ChunkSize: int64(chunkSize), - NumEntries: int64(numImportSpans), - NumNodes: int64(numNodes), - UseSimpleImportSpans: useSimpleImportSpans, - JobID: jobID, + TableRekeys: dataToRestore.getRekeys(), + TenantRekeys: dataToRestore.getTenantRekeys(), + ValidateOnly: dataToRestore.isValidateOnly(), + URIs: uris, + Encryption: encryption, + EndTime: restoreTime, + Spans: dataToRestore.getSpans(), + BackupLocalityInfo: backupLocalityInfo, + HighWater: spanFilter.highWaterMark, + UserProto: execCtx.User().EncodeProto(), + TargetSize: spanFilter.targetSize, + ChunkSize: int64(chunkSize), + NumEntries: int64(numImportSpans), + NumNodes: int64(numNodes), + UseSimpleImportSpans: useSimpleImportSpans, + UseFrontierCheckpointing: spanFilter.useFrontierCheckpointing, + JobID: jobID, + } + if spanFilter.useFrontierCheckpointing { + spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0) } proc := physicalplan.Processor{ diff --git a/pkg/ccl/backupccl/restore_progress.go b/pkg/ccl/backupccl/restore_progress.go new file mode 100644 index 000000000000..dae900d1501b --- /dev/null +++ b/pkg/ccl/backupccl/restore_progress.go @@ -0,0 +1,281 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + pbtypes "github.com/gogo/protobuf/types" +) + +// restoreCheckpointMaxBytes controls the maximum number of key bytes that will be added +// to the checkpoint record. The default is set using the same reasoning as +// changefeed.frontier_checkpoint_max_bytes. +var restoreCheckpointMaxBytes = settings.RegisterByteSizeSetting( + settings.TenantWritable, + "restore.frontier_checkpoint_max_bytes", + "controls the maximum size of the restore checkpoint frontier as a the sum of the (span,"+ + "timestamp) tuples", + 1<<20, // 1 MiB +) + +// completedSpanTime indicates the timestamp that the progress frontier will +// mark completed spans with. +var completedSpanTime = hlc.MaxTimestamp + +type progressTracker struct { + // nextRequiredSpanKey maps a required span endkey to the subsequent requiredSpan's startKey. + nextRequiredSpanKey map[string]roachpb.Key + + maxBytes int64 + + mu struct { + // fields that may get updated while read are put in the lock. 
+ syncutil.Mutex + + checkpointFrontier *spanUtils.Frontier + + // res tracks the amount of data that has been ingested. + res roachpb.RowCount + + // Note that the fields below are used for the deprecated high watermark progress + // tracker. + // highWaterMark represents the index into the requestsCompleted map. + highWaterMark int64 + ceiling int64 + + // As part of job progress tracking, inFlightImportSpans tracks all the + // spans that have been generated are being processed by the processors in + // distRestore. requestsCompleleted tracks the spans from + // inFlightImportSpans that have completed its processing. Once all spans up + // to index N have been processed (and appear in requestsCompleted), then + // any spans with index < N will be removed from both inFlightImportSpans + // and requestsCompleted maps. + inFlightImportSpans map[int64]roachpb.Span + requestsCompleted map[int64]bool + } + useFrontier bool + inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry +} + +func makeProgressTracker( + requiredSpans roachpb.Spans, + persistedSpans []jobspb.RestoreProgress_FrontierEntry, + useFrontier bool, + maxBytes int64, +) (*progressTracker, error) { + + var ( + checkpointFrontier *spanUtils.Frontier + err error + nextRequiredSpanKey map[string]roachpb.Key + inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry + ) + if useFrontier { + checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans) + if err != nil { + return nil, err + } + nextRequiredSpanKey = make(map[string]roachpb.Key) + for i := 0; i < len(requiredSpans)-1; i++ { + nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key + } + + } else { + inFlightSpanFeeder = make(chan execinfrapb.RestoreSpanEntry, 1000) + } + + pt := &progressTracker{} + pt.mu.checkpointFrontier = checkpointFrontier + pt.mu.highWaterMark = -1 + pt.mu.ceiling = 0 + pt.mu.inFlightImportSpans = make(map[int64]roachpb.Span) + pt.mu.requestsCompleted = make(map[int64]bool) + pt.nextRequiredSpanKey = nextRequiredSpanKey + pt.maxBytes = maxBytes + pt.useFrontier = useFrontier + pt.inFlightSpanFeeder = inFlightSpanFeeder + return pt, nil +} + +func loadCheckpointFrontier( + requiredSpans roachpb.Spans, persistedSpans []jobspb.RestoreProgress_FrontierEntry, +) (*spanUtils.Frontier, error) { + numRequiredSpans := len(requiredSpans) - 1 + contiguousSpan := roachpb.Span{ + Key: requiredSpans[0].Key, + EndKey: requiredSpans[numRequiredSpans].EndKey, + } + checkpointFrontier, err := spanUtils.MakeFrontier(contiguousSpan) + if err != nil { + return nil, err + } + for _, sp := range persistedSpans { + _, err = checkpointFrontier.Forward(sp.Span, sp.Timestamp) + if err != nil { + return nil, err + } + } + return checkpointFrontier, err +} + +// persistFrontier converts a span frontier into a list of (span, timestamp) +// tuples that can persist to disk. If the user passes a nonzero maxBytes, the +// first N spans in the frontier that remain below the maxBytes memory limit +// will return. +func persistFrontier( + frontier *spanUtils.Frontier, maxBytes int64, +) []jobspb.RestoreProgress_FrontierEntry { + var used int64 + completedSpansSlice := make([]jobspb.RestoreProgress_FrontierEntry, 0) + frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done spanUtils.OpResult) { + if ts.Equal(completedSpanTime) { + + // Persist the first N spans in the frontier that remain below the memory limit. 
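+			// Each entry is costed as len(Key) + len(EndKey) plus the
+			// timestamp's proto size; once a nonzero maxBytes budget is
+			// exceeded, iteration stops (StopMatch) so the checkpoint written
+			// to the job record stays bounded.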
+ used += int64(len(sp.Key) + len(sp.EndKey) + ts.Size()) + if maxBytes != 0 && used > maxBytes { + return spanUtils.StopMatch + } + // TODO (msbutler): we may want to persist spans that have been + // restored up to a certain system time, if on resume, we build + // facilities in the generative split and scatter processor to + // create a restore span entry with files from a minimum + // timestamp. + completedSpansSlice = append(completedSpansSlice, + jobspb.RestoreProgress_FrontierEntry{Span: sp, Timestamp: ts}) + } + return spanUtils.ContinueMatch + }) + return completedSpansSlice +} + +func (pt *progressTracker) updateJobCallback( + progressedCtx context.Context, progressDetails jobspb.ProgressDetails, +) { + switch d := progressDetails.(type) { + case *jobspb.Progress_Restore: + pt.mu.Lock() + if pt.useFrontier { + // TODO (msbutler): this requires iterating over every span in the frontier, + // and rewriting every completed required span to disk. + // We may want to be more intelligent about this. + d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes) + } else { + if pt.mu.highWaterMark >= 0 { + d.Restore.HighWater = pt.mu.inFlightImportSpans[pt.mu.highWaterMark].Key + } + } + pt.mu.Unlock() + default: + log.Errorf(progressedCtx, "job payload had unexpected type %T", d) + } +} + +// ingestUpdate updates the progressTracker after a progress update returns from +// the distributed processors. +func (pt *progressTracker) ingestUpdate( + ctx context.Context, rawProgress *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, +) error { + var progDetails backuppb.RestoreProgress + if err := pbtypes.UnmarshalAny(&rawProgress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) + } + pt.mu.Lock() + defer pt.mu.Unlock() + pt.mu.res.Add(progDetails.Summary) + if pt.useFrontier { + updateSpan := progDetails.DataSpan.Clone() + // If the completedSpan has the same end key as a requiredSpan_i, forward + // the frontier for the span [completedSpan_startKey, + // requiredSpan_i+1_startKey]. This trick ensures the span frontier will + // contain a single entry when the restore completes. Recall that requiredSpans are + // disjoint, and a spanFrontier never merges disjoint spans. So, without + // this trick, the spanFrontier will have O(requiredSpans) entries when the + // restore completes. This trick ensures all spans persisted to the frontier are adjacent, + // and consequently, will eventually merge. + // + // Here's a visual example: + // - this restore has two required spans: [a,d) and [e,h). 
+ // - the restore span entry [c,d) just completed, implying the frontier logically looks like: + // + // tC| x---o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - since [c,d)'s endkey equals the required span (a,d]'s endkey, + // also update the gap between required span 1 and 2 in the frontier: + // + // tC| x-------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - this will ensure that when all subspans in required spans 1 and 2 complete, + // the checkpoint frontier has one span: + // + // tC| x---------------------------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok { + updateSpan.EndKey = newEndKey + } + if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil { + return err + } + } else { + idx := progDetails.ProgressIdx + + if idx >= pt.mu.ceiling { + for i := pt.mu.ceiling; i <= idx; i++ { + importSpan, ok := <-pt.inFlightSpanFeeder + if !ok { + // The channel has been closed, there is nothing left to do. + log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") + return nil + } + pt.mu.inFlightImportSpans[i] = importSpan.Span + } + pt.mu.ceiling = idx + 1 + } + + if sp, ok := pt.mu.inFlightImportSpans[idx]; ok { + // Assert that we're actually marking the correct span done. See #23977. + if !sp.Key.Equal(progDetails.DataSpan.Key) { + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, sp, + ) + } + pt.mu.requestsCompleted[idx] = true + prevHighWater := pt.mu.highWaterMark + for j := pt.mu.highWaterMark + 1; j < pt.mu.ceiling && pt.mu.requestsCompleted[j]; j++ { + pt.mu.highWaterMark = j + } + for j := prevHighWater; j < pt.mu.highWaterMark; j++ { + delete(pt.mu.requestsCompleted, j) + delete(pt.mu.inFlightImportSpans, j) + } + } + } + return nil +} diff --git a/pkg/ccl/backupccl/restore_progress_test.go b/pkg/ccl/backupccl/restore_progress_test.go new file mode 100644 index 000000000000..12b88ade5de8 --- /dev/null +++ b/pkg/ccl/backupccl/restore_progress_test.go @@ -0,0 +1,96 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + gogotypes "github.com/gogo/protobuf/types" + "github.com/stretchr/testify/require" +) + +// TestProgressTracker tests the lifecycle of the checkpoint frontier by testing the following +// over a sequence of updates on a mock set of required spans: +// - loading the persisted frontier into memory +// - updating to the in memory frontier +// - persisting the in memory frontier +func TestProgressTracker(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) + + requiredSpans := []roachpb.Span{c.sp("a", "e"), c.sp("f", "i")} + + mockUpdate := func(sp roachpb.Span) *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress { + restoreProgress := backuppb.RestoreProgress{DataSpan: sp} + details, err := gogotypes.MarshalAny(&restoreProgress) + require.NoError(t, err) + return &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{ProgressDetails: *details} + } + + pSp := func(spans ...roachpb.Span) []jobspb.RestoreProgress_FrontierEntry { + pSpans := make([]jobspb.RestoreProgress_FrontierEntry, 0) + for _, sp := range spans { + pSpans = append(pSpans, jobspb.RestoreProgress_FrontierEntry{Span: sp, Timestamp: completedSpanTime}) + } + return pSpans + } + + // testStep characterizes an update to the span frontier and the expected + // persisted spans after the update. + type testStep struct { + update roachpb.Span + expectedPersisted []jobspb.RestoreProgress_FrontierEntry + } + + // Each test step builds on the persistedSpan. + persistedSpans := make([]jobspb.RestoreProgress_FrontierEntry, 0) + for _, step := range []testStep{ + { + update: c.sp("a", "c"), + expectedPersisted: pSp(c.sp("a", "c")), + }, + { + // Last update in first required span should extend the persisted end key + // to the start key of the second required span. + update: c.sp("c", "e"), + expectedPersisted: pSp(c.sp("a", "f")), + }, + { + update: c.sp("h", "i"), + expectedPersisted: pSp(c.sp("a", "f"), c.sp("h", "i")), + }, + { + // After both required spans completed, only one persisted span should exist. 
+ update: c.sp("f", "h"), + expectedPersisted: pSp(c.sp("a", "i")), + }, + } { + pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0) + require.NoError(t, err) + + require.NoError(t, pt.ingestUpdate(ctx, mockUpdate(step.update))) + + persistedSpans = persistFrontier(pt.mu.checkpointFrontier, 0) + require.Equal(t, step.expectedPersisted, persistedSpans) + } +} diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 9dabb45df1e6..206e17f0fae3 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -307,6 +307,96 @@ func makeEntry(start, end roachpb.Key, f execinfrapb.RestoreFileSpec) execinfrap } } +// spanCoveringFilter holds metadata that filters which backups and required spans are used to +// populate a restoreSpanEntry +type spanCoveringFilter struct { + checkpointFrontier *spanUtils.Frontier + highWaterMark roachpb.Key + introducedSpanFrontier *spanUtils.Frontier + useFrontierCheckpointing bool + targetSize int64 +} + +func makeSpanCoveringFilter( + checkpointFrontier *spanUtils.Frontier, + highWater roachpb.Key, + introducedSpanFrontier *spanUtils.Frontier, + targetSize int64, + useFrontierCheckpointing bool, +) (spanCoveringFilter, error) { + sh := spanCoveringFilter{ + introducedSpanFrontier: introducedSpanFrontier, + targetSize: targetSize, + highWaterMark: highWater, + useFrontierCheckpointing: useFrontierCheckpointing, + checkpointFrontier: checkpointFrontier, + } + return sh, nil +} + +// filterCompleted returns the subspans of the requiredSpan that still need to be +// restored. +func (f spanCoveringFilter) filterCompleted(requiredSpan roachpb.Span) roachpb.Spans { + if f.useFrontierCheckpointing { + return f.findToDoSpans(requiredSpan) + } + if requiredSpan.EndKey.Compare(f.highWaterMark) <= 0 { + return roachpb.Spans{} + } + if requiredSpan.Key.Compare(f.highWaterMark) < 0 { + requiredSpan.Key = f.highWaterMark + } + return []roachpb.Span{requiredSpan} +} + +// findToDoSpans returns the sub spans within the required span that have not completed. +func (f spanCoveringFilter) findToDoSpans(requiredSpan roachpb.Span) roachpb.Spans { + toDoSpans := make(roachpb.Spans, 0) + f.checkpointFrontier.SpanEntries(requiredSpan, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if !ts.Equal(completedSpanTime) { + toDoSpans = append(toDoSpans, s) + } + return spanUtils.ContinueMatch + }) + return toDoSpans +} + +// getLayersCoveredLater creates a map which indicates which backup layers introduced the +// given span. +func (f spanCoveringFilter) getLayersCoveredLater( + span roachpb.Span, backups []backuppb.BackupManifest, +) map[int]bool { + layersCoveredLater := make(map[int]bool) + for layer := range backups { + var coveredLater bool + f.introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if backups[layer].EndTime.Less(ts) { + coveredLater = true + } + return spanUtils.StopMatch + }) + if coveredLater { + // Don't use this backup to cover this span if the span was reintroduced + // after the backup's endTime. In this case, this backup may have + // invalid data, and further, a subsequent backup will contain all of + // this span's data. 
Consider the following example: + // + // T0: Begin IMPORT INTO on existing table foo, ingest some data + // T1: Backup foo + // T2: Rollback IMPORT via clearRange + // T3: Incremental backup of foo, with a full reintroduction of foo’s span + // T4: RESTORE foo: should only restore foo from the incremental backup. + // If data from the full backup were also restored, + // the imported-but-then-clearRanged data will leak in the restored cluster. + // This logic seeks to avoid this form of data corruption. + layersCoveredLater[layer] = true + } + } + return layersCoveredLater +} + // generateAndSendImportSpans partitions the spans of requiredSpans into a // covering of RestoreSpanEntry's which each have all overlapping files from the // passed backups assigned to them. The spans of requiredSpans are @@ -354,18 +444,18 @@ func generateAndSendImportSpans( backups []backuppb.BackupManifest, layerToBackupManifestFileIterFactory backupinfo.LayerToBackupManifestFileIterFactory, backupLocalityMap map[int]storeByLocalityKV, - introducedSpanFrontier *spanUtils.Frontier, - lowWaterMark roachpb.Key, - targetSize int64, - spanCh chan execinfrapb.RestoreSpanEntry, + filter spanCoveringFilter, useSimpleImportSpans bool, + spanCh chan execinfrapb.RestoreSpanEntry, ) error { + if useSimpleImportSpans { - importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) + importSpans, err := makeSimpleImportSpans(ctx, requiredSpans, backups, + layerToBackupManifestFileIterFactory, backupLocalityMap, filter.introducedSpanFrontier, + filter.highWaterMark, filter.targetSize) if err != nil { return err } - for _, sp := range importSpans { spanCh <- sp } @@ -398,7 +488,6 @@ func generateAndSendImportSpans( entry := execinfrapb.RestoreSpanEntry{ Span: lastCovSpan, } - for layer := range covFilesByLayer { for _, f := range covFilesByLayer[layer] { fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} @@ -408,7 +497,6 @@ func generateAndSendImportSpans( entry.Files = append(entry.Files, fileSpec) } } - if len(entry.Files) > 0 { select { case <-ctx.Done(): @@ -420,137 +508,105 @@ func generateAndSendImportSpans( return nil } - for _, span := range requiredSpans { - firstInSpan = true - if span.EndKey.Compare(lowWaterMark) < 0 { - continue - } - if span.Key.Compare(lowWaterMark) < 0 { - span.Key = lowWaterMark - } - - layersCoveredLater := make(map[int]bool) - for layer := range backups { - var coveredLater bool - introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, - ts hlc.Timestamp) (done spanUtils.OpResult) { - if backups[layer].EndTime.Less(ts) { - coveredLater = true + for _, requiredSpan := range requiredSpans { + filteredSpans := filter.filterCompleted(requiredSpan) + for _, span := range filteredSpans { + firstInSpan = true + layersCoveredLater := filter.getLayersCoveredLater(span, backups) + for { + if ok, err := startEndKeyIt.valid(); !ok { + if err != nil { + return err + } + break } - return spanUtils.StopMatch - }) - if coveredLater { - // Don't use this backup to cover this span if the span was reintroduced - // after the backup's endTime. In this case, this backup may have - // invalid data, and further, a subsequent backup will contain all of - // this span's data. 
Consider the following example: - // - // T0: Begin IMPORT INTO on existing table foo, ingest some data - // T1: Backup foo - // T2: Rollback IMPORT via clearRange - // T3: Incremental backup of foo, with a full reintroduction of foo’s span - // T4: RESTORE foo: should only restore foo from the incremental backup. - // If data from the full backup were also restored, - // the imported-but-then-clearRanged data will leak in the restored cluster. - // This logic seeks to avoid this form of data corruption. - layersCoveredLater[layer] = true - } - } - for { - if ok, err := startEndKeyIt.valid(); !ok { - if err != nil { - return err + key := startEndKeyIt.value() + if span.Key.Compare(key) >= 0 { + startEndKeyIt.next() + continue } - break - } - key := startEndKeyIt.value() - if span.Key.Compare(key) >= 0 { - startEndKeyIt.next() - continue - } - - var coverSpan roachpb.Span - if firstInSpan { - coverSpan.Key = span.Key - } else { - coverSpan.Key = lastCovSpan.EndKey - } + var coverSpan roachpb.Span + if firstInSpan { + coverSpan.Key = span.Key + } else { + coverSpan.Key = lastCovSpan.EndKey + } - if span.ContainsKey(key) { - coverSpan.EndKey = startEndKeyIt.value() - } else { - coverSpan.EndKey = span.EndKey - } + if span.ContainsKey(key) { + coverSpan.EndKey = startEndKeyIt.value() + } else { + coverSpan.EndKey = span.EndKey + } - newFilesByLayer, err := getNewIntersectingFilesByLayer(coverSpan, layersCoveredLater, fileIterByLayer) - if err != nil { - return err - } + newFilesByLayer, err := getNewIntersectingFilesByLayer(coverSpan, layersCoveredLater, fileIterByLayer) + if err != nil { + return err + } - var filesByLayer [][]*backuppb.BackupManifest_File - var covSize int64 - var newCovFilesSize int64 + var filesByLayer [][]*backuppb.BackupManifest_File + var covSize int64 + var newCovFilesSize int64 - for layer := range newFilesByLayer { - for _, file := range newFilesByLayer[layer] { - sz := file.EntryCounts.DataSize - if sz == 0 { - sz = 16 << 20 + for layer := range newFilesByLayer { + for _, file := range newFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } + newCovFilesSize += sz } - newCovFilesSize += sz + filesByLayer = append(filesByLayer, newFilesByLayer[layer]) } - filesByLayer = append(filesByLayer, newFilesByLayer[layer]) - } - for layer := range covFilesByLayer { - for _, file := range covFilesByLayer[layer] { - sz := file.EntryCounts.DataSize - if sz == 0 { - sz = 16 << 20 - } + for layer := range covFilesByLayer { + for _, file := range covFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } - if coverSpan.Overlaps(file.Span) { - covSize += sz - filesByLayer[layer] = append(filesByLayer[layer], file) + if coverSpan.Overlaps(file.Span) { + covSize += sz + filesByLayer[layer] = append(filesByLayer[layer], file) + } } } - } - if covFilesByLayer == nil { - covFilesByLayer = newFilesByLayer - lastCovSpan = coverSpan - lastCovSpanSize = newCovFilesSize - } else { - if (newCovFilesSize == 0 || lastCovSpanSize+newCovFilesSize <= targetSize) && !firstInSpan { - // If there are no new files that cover this span or if we can add the - // files in the new span's cover to the last span's cover and still stay - // below targetSize, then we should merge the two spans. - for layer := range newFilesByLayer { - covFilesByLayer[layer] = append(covFilesByLayer[layer], newFilesByLayer[layer]...) 
- } - lastCovSpan.EndKey = coverSpan.EndKey - lastCovSpanSize = lastCovSpanSize + newCovFilesSize + if covFilesByLayer == nil { + covFilesByLayer = newFilesByLayer + lastCovSpan = coverSpan + lastCovSpanSize = newCovFilesSize } else { - if err := flush(ctx); err != nil { - return err + if (newCovFilesSize == 0 || lastCovSpanSize+newCovFilesSize <= filter.targetSize) && !firstInSpan { + // If there are no new files that cover this span or if we can add the + // files in the new span's cover to the last span's cover and still stay + // below targetSize, then we should merge the two spans. + for layer := range newFilesByLayer { + covFilesByLayer[layer] = append(covFilesByLayer[layer], newFilesByLayer[layer]...) + } + lastCovSpan.EndKey = coverSpan.EndKey + lastCovSpanSize = lastCovSpanSize + newCovFilesSize + } else { + if err := flush(ctx); err != nil { + return err + } + lastCovSpan = coverSpan + covFilesByLayer = filesByLayer + lastCovSpanSize = covSize } - lastCovSpan = coverSpan - covFilesByLayer = filesByLayer - lastCovSpanSize = covSize } - } - firstInSpan = false + firstInSpan = false - if lastCovSpan.EndKey.Equal(span.EndKey) { - break - } + if lastCovSpan.EndKey.Equal(span.EndKey) { + break + } - startEndKeyIt.next() + startEndKeyIt.next() + } } } - return flush(ctx) } diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index dda2ab42b6c8..aa936dd316fd 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -249,6 +250,7 @@ func makeImportSpans( layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, targetSize int64, introducedSpanFrontier *spanUtils.Frontier, + completedSpans []jobspb.RestoreProgress_FrontierEntry, useSimpleImportSpans bool, ) ([]execinfrapb.RestoreSpanEntry, error) { var cover []execinfrapb.RestoreSpanEntry @@ -261,7 +263,30 @@ func makeImportSpans( return nil }) - err := generateAndSendImportSpans(ctx, spans, backups, layerToIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) + checkpointFrontier, err := loadCheckpointFrontier(spans, completedSpans) + if err != nil { + return nil, err + } + + filter, err := makeSpanCoveringFilter( + checkpointFrontier, + nil, + introducedSpanFrontier, + targetSize, + true) + if err != nil { + return nil, err + } + + err = generateAndSendImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + nil, + filter, + useSimpleImportSpans, + spanCh) close(spanCh) if err != nil { @@ -349,43 +374,117 @@ func TestRestoreEntryCoverExample(t *testing.T) { layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil) require.NoError(t, err) - cover, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, emptySpanFrontier, false) - require.NoError(t, err) - require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, - {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, - {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, - {Span: c.sp("f", "g"), 
Files: c.paths("6")}, - {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, - {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, - {Span: c.sp("l", "m"), Files: c.paths("9")}, - }, cover) - - coverSized, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, 2<<20, emptySpanFrontier, false) - require.NoError(t, err) - require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, - {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, - {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, - {Span: c.sp("f", "h"), Files: c.paths("5", "6")}, - {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, - {Span: c.sp("l", "m"), Files: c.paths("9")}, - }, coverSized) - - // check that introduced spans are properly elided - backups[2].IntroducedSpans = []roachpb.Span{c.sp("a", "f")} - introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) - require.NoError(t, err) - coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToIterFactory, noSpanTargetSize, introducedSpanFrontier, false) - require.NoError(t, err) - require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: c.sp("a", "f"), Files: c.paths("6")}, - {Span: c.sp("f", "g"), Files: c.paths("6")}, - {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, - {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, - {Span: c.sp("l", "m"), Files: c.paths("9")}, - }, coverIntroduced) + emptyCompletedSpans := []jobspb.RestoreProgress_FrontierEntry{} + + type simpleRestoreSpanEntry struct { + span roachpb.Span + paths []string + } + reduce := func(entries []execinfrapb.RestoreSpanEntry) []simpleRestoreSpanEntry { + reduced := make([]simpleRestoreSpanEntry, len(entries)) + for i := range entries { + reduced[i].span = entries[i].Span + reduced[i].paths = make([]string, len(entries[i].Files)) + for j := range entries[i].Files { + reduced[i].paths[j] = entries[i].Files[j].Path + } + } + return reduced + } + t.Run("base", func(t *testing.T) { + cover, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + noSpanTargetSize, + emptySpanFrontier, + emptyCompletedSpans, + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(cover)) + }) + + t.Run("target-size", func(t *testing.T) { + coverSized, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + 2<<20, + emptySpanFrontier, + emptyCompletedSpans, + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("b", "c"), Files: c.paths("1", "4", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverSized)) + }) + + t.Run("introduced-spans", func(t *testing.T) { + backups[2].IntroducedSpans = []roachpb.Span{c.sp("a", "f")} + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + require.NoError(t, err) + coverIntroduced, err := makeImportSpans( + ctx, + spans, + 
backups, + layerToIterFactory, + noSpanTargetSize, + introducedSpanFrontier, + emptyCompletedSpans, + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "f"), Files: c.paths("6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("g", "h"), Files: c.paths("5", "6")}, + {Span: c.sp("h", "i"), Files: c.paths("3", "5", "8")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverIntroduced)) + }) + t.Run("completed-spans", func(t *testing.T) { + + completedSpans := []roachpb.Span{ + // log some progress on part of a restoreSpanEntry. + c.sp("b", "c"), + + // Log some progress over multiple restoreSpanEntries. + c.sp("g", "i")} + + frontier, err := spanUtils.MakeFrontierAt(completedSpanTime, completedSpans...) + require.NoError(t, err) + coverIntroduced, err := makeImportSpans( + ctx, + spans, + backups, + layerToIterFactory, + noSpanTargetSize, + emptySpanFrontier, + persistFrontier(frontier, 0), + false) + require.NoError(t, err) + require.Equal(t, reduce([]execinfrapb.RestoreSpanEntry{ + {Span: c.sp("a", "b"), Files: c.paths("1", "6")}, + {Span: c.sp("c", "f"), Files: c.paths("2", "4", "6")}, + {Span: c.sp("f", "g"), Files: c.paths("6")}, + {Span: c.sp("l", "m"), Files: c.paths("9")}, + }), reduce(coverIntroduced)) + }) } func TestFileSpanStartKeyIterator(t *testing.T) { @@ -499,6 +598,56 @@ func TestFileSpanStartKeyIterator(t *testing.T) { } } +// TestCheckpointFilter ensures the filterCompleted( ) function properly splits +// a required span into remaining toDo spans. +func TestCheckpointFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + c := makeCoverUtils(ctx, t, &execCfg) + + requiredSpan := c.sp("b", "e") + + type testCase struct { + completedSpans roachpb.Spans + expectedToDoSpans roachpb.Spans + } + + for _, tc := range []testCase{ + { + completedSpans: roachpb.Spans{c.sp("a", "c")}, + expectedToDoSpans: roachpb.Spans{c.sp("c", "e")}, + }, + { + completedSpans: roachpb.Spans{c.sp("c", "d")}, + expectedToDoSpans: roachpb.Spans{c.sp("b", "c"), c.sp("d", "e")}, + }, + { + completedSpans: roachpb.Spans{c.sp("a", "c"), c.sp("d", "e")}, + expectedToDoSpans: roachpb.Spans{c.sp("c", "d")}, + }, + } { + frontier, err := spanUtils.MakeFrontier(requiredSpan) + require.NoError(t, err) + for i := range tc.completedSpans { + _, err := frontier.Forward(tc.completedSpans[i], completedSpanTime) + require.NoError(t, err) + } + + f, err := makeSpanCoveringFilter( + frontier, + nil, + nil, + 0, + true) + require.NoError(t, err) + require.Equal(t, tc.expectedToDoSpans, f.filterCompleted(requiredSpan)) + } +} + type mockBackupInfo struct { // tableIDs identifies the tables included in the backup. 
 	tableIDs []int
@@ -705,8 +854,15 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) {
 
 	layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx, execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil)
 	require.NoError(t, err)
-	cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToIterFactory,
-		0, introducedSpanFrontier, false)
+	cover, err := makeImportSpans(
+		ctx,
+		restoreSpans,
+		backups,
+		layerToIterFactory,
+		0,
+		introducedSpanFrontier,
+		[]jobspb.RestoreProgress_FrontierEntry{},
+		false)
 	require.NoError(t, err)
 
 	for _, reIntroTable := range reIntroducedTables {
@@ -781,8 +937,15 @@ func TestRestoreEntryCover(t *testing.T) {
 			numBackups, spans, files, target, hasExternalFilesList, simpleImportSpans), func(t *testing.T) {
 			introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{})
 			require.NoError(t, err)
-			cover, err := makeImportSpans(ctx, backups[numBackups-1].Spans, backups,
-				layerToIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans)
+			cover, err := makeImportSpans(
+				ctx,
+				backups[numBackups-1].Spans,
+				backups,
+				layerToIterFactory,
+				target<<20,
+				introducedSpanFrontier,
+				[]jobspb.RestoreProgress_FrontierEntry{},
+				simpleImportSpans)
 			require.NoError(t, err)
 			require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, cover,
 				target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage))
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index db7ea46ecc0d..d866dff30974 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -469,6 +469,13 @@ message RestoreDetails {
 
 message RestoreProgress {
   bytes high_water = 1;
+
+  message FrontierEntry {
+    roachpb.Span span = 1 [(gogoproto.nullable) = false];
+    util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];
+  }
+
+  repeated FrontierEntry checkpoint = 2 [(gogoproto.nullable) = false];
 }
 
 message ImportDetails {
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index c90ccd5ed853..761ca768c35d 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -420,6 +420,8 @@ message GenerativeSplitAndScatterSpec {
   optional int64 num_nodes = 17[(gogoproto.nullable) = false];
   optional int64 job_id = 18 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
   optional bool use_simple_import_spans = 19 [(gogoproto.nullable) = false];
+  optional bool use_frontier_checkpointing = 20 [(gogoproto.nullable) = false];
+  repeated jobs.jobspb.RestoreProgress.FrontierEntry checkpointed_spans = 21 [(gogoproto.nullable) = false];
 }
diff --git a/pkg/util/span/frontier.go b/pkg/util/span/frontier.go
index 7d2b7b2b9241..b4d426783aa0 100644
--- a/pkg/util/span/frontier.go
+++ b/pkg/util/span/frontier.go
@@ -426,7 +426,7 @@ func (f *Frontier) Entries(fn Operation) {
 //
 //	([b-c), 5), ([c-e), 1), ([e-f), 3], ([f, h], 1) ([h, k), 4), ([k, m), 1).
 //
-// Note: neither [a-b) nor [m, q) will be emitted since they fall outside the spans
+// Note: neither [a-b) nor [m, q) will be emitted since they do not intersect with the spans
 // tracked by this frontier.
 func (f *Frontier) SpanEntries(span roachpb.Span, op Operation) {
 	f.Lock()
diff --git a/pkg/util/span/frontier_test.go b/pkg/util/span/frontier_test.go
index e31e651e0904..82c71cf260f7 100644
--- a/pkg/util/span/frontier_test.go
+++ b/pkg/util/span/frontier_test.go
@@ -314,16 +314,12 @@ func TestSpanEntries(t *testing.T) {
 		return roachpb.Span{Key: key(start), EndKey: key(end)}
 	}
 
-	spAZ := mkspan('A', 'Z')
-	f, err := MakeFrontier(spAZ)
-	require.NoError(t, err)
-
-	advance := func(s roachpb.Span, wall int64) {
+	advance := func(f *Frontier, s roachpb.Span, wall int64) {
 		_, err := f.Forward(s, hlc.Timestamp{WallTime: wall})
 		require.NoError(t, err)
 	}
 
-	spanEntries := func(sp roachpb.Span) string {
+	spanEntries := func(f *Frontier, sp roachpb.Span) string {
 		var buf strings.Builder
 		f.SpanEntries(sp, func(s roachpb.Span, ts hlc.Timestamp) OpResult {
 			if buf.Len() != 0 {
@@ -334,35 +330,52 @@ func TestSpanEntries(t *testing.T) {
 		})
 		return buf.String()
 	}
+	t.Run("contiguous frontier", func(t *testing.T) {
+		spAZ := mkspan('A', 'Z')
+		f, err := MakeFrontier(spAZ)
+		require.NoError(t, err)
+		// Nothing overlaps span fully to the left of frontier.
+		require.Equal(t, ``, spanEntries(f, mkspan('0', '9')))
+		// Nothing overlaps span fully to the right of the frontier.
+		require.Equal(t, ``, spanEntries(f, mkspan('a', 'z')))
+
+		// Span overlaps entire frontier.
+		require.Equal(t, `{A-Z}@0`, spanEntries(f, spAZ))
+		advance(f, spAZ, 1)
+		require.Equal(t, `{A-Z}@1`, spanEntries(f, spAZ))
+
+		// Span overlaps part of the frontier, with left part outside frontier.
+		require.Equal(t, `{A-C}@1`, spanEntries(f, mkspan('0', 'C')))
+
+		// Span overlaps part of the frontier, with right part outside frontier.
+		require.Equal(t, `{Q-Z}@1`, spanEntries(f, mkspan('Q', 'c')))
+
+		// Span fully inside frontier.
+		require.Equal(t, `{P-W}@1`, spanEntries(f, mkspan('P', 'W')))
+
+		// Advance part of the frontier.
+		advance(f, mkspan('C', 'E'), 2)
+		advance(f, mkspan('H', 'M'), 5)
+		advance(f, mkspan('N', 'Q'), 3)
+
+		// Span overlaps various parts of the frontier.
+		require.Equal(t,
+			`{A-C}@1 {C-E}@2 {E-H}@1 {H-M}@5 {M-N}@1 {N-P}@3`,
+			spanEntries(f, mkspan('3', 'P')))
+	})
-
-	// Nothing overlaps span fully to the left of frontier.
-	require.Equal(t, ``, spanEntries(mkspan('0', '9')))
-	// Nothing overlaps span fully to the right of the frontier.
-	require.Equal(t, ``, spanEntries(mkspan('a', 'z')))
-
-	// Span overlaps entire frontier.
-	require.Equal(t, `{A-Z}@0`, spanEntries(spAZ))
-	advance(spAZ, 1)
-	require.Equal(t, `{A-Z}@1`, spanEntries(spAZ))
-
-	// Span overlaps part of the frontier, with left part outside frontier.
-	require.Equal(t, `{A-C}@1`, spanEntries(mkspan('0', 'C')))
-
-	// Span overlaps part of the frontier, with right part outside frontier.
-	require.Equal(t, `{Q-Z}@1`, spanEntries(mkspan('Q', 'c')))
-
-	// Span fully inside frontier.
-	require.Equal(t, `{P-W}@1`, spanEntries(mkspan('P', 'W')))
+	t.Run("disjoint frontier", func(t *testing.T) {
+		spAB := mkspan('A', 'B')
+		spCE := mkspan('C', 'E')
+		f, err := MakeFrontier(spAB, spCE)
+		require.NoError(t, err)
-	// Advance part of the frontier.
-	advance(mkspan('C', 'E'), 2)
-	advance(mkspan('H', 'M'), 5)
-	advance(mkspan('N', 'Q'), 3)
+		// Nothing overlaps between the two spans in the frontier.
+		require.Equal(t, ``, spanEntries(f, mkspan('B', 'C')))
-	// Span overlaps various parts of the frontier.
-	require.Equal(t,
-		`{A-C}@1 {C-E}@2 {E-H}@1 {H-M}@5 {M-N}@1 {N-P}@3`,
-		spanEntries(mkspan('3', 'P')))
+		// Overlap with only one entry in the frontier.
+		require.Equal(t, `{C-D}@0`, spanEntries(f, mkspan('B', 'D')))
+	})
 }
 
 // symbols that can make up spans.
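To make the checkpointing flow concrete, below is a minimal, self-contained Go sketch (not code from this patch) of the round trip the change relies on: completed spans are forwarded in a span frontier to a sentinel timestamp, the frontier is flattened into RestoreProgress.FrontierEntry records suitable for the job record, and a rebuilt frontier hands back only the not-yet-completed pieces of a required span. The helper names sentinelTime, sp, flattenCheckpoint, and remainingWork are illustrative stand-ins for the patch's completedSpanTime, persistFrontier, and filterCompleted; only the span.Frontier API shown in the diffs above and the new FrontierEntry message are taken from the change itself.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

// sentinelTime plays the role of the patch's completedSpanTime: any frontier
// entry at this timestamp is treated as already restored.
var sentinelTime = hlc.Timestamp{WallTime: 1}

// sp is a test-style helper for building a span from string keys.
func sp(start, end string) roachpb.Span {
	return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
}

// flattenCheckpoint converts the completed portion of the frontier into the
// proto entries that a coordinator could persist to the job record.
func flattenCheckpoint(f *spanUtils.Frontier) []jobspb.RestoreProgress_FrontierEntry {
	var entries []jobspb.RestoreProgress_FrontierEntry
	f.Entries(func(s roachpb.Span, ts hlc.Timestamp) spanUtils.OpResult {
		if ts == sentinelTime {
			entries = append(entries, jobspb.RestoreProgress_FrontierEntry{Span: s, Timestamp: ts})
		}
		return spanUtils.ContinueMatch
	})
	return entries
}

// remainingWork returns the pieces of requiredSpan that the frontier has not
// yet marked complete, mirroring what a checkpoint filter needs to do.
func remainingWork(f *spanUtils.Frontier, requiredSpan roachpb.Span) []roachpb.Span {
	var toDo []roachpb.Span
	f.SpanEntries(requiredSpan, func(s roachpb.Span, ts hlc.Timestamp) spanUtils.OpResult {
		if ts != sentinelTime {
			toDo = append(toDo, s)
		}
		return spanUtils.ContinueMatch
	})
	return toDo
}

func main() {
	requiredSpan := sp("b", "e")
	frontier, err := spanUtils.MakeFrontier(requiredSpan)
	if err != nil {
		panic(err)
	}
	// Mark [c,d) complete, as a restore worker would after flushing its AddSSTables.
	if _, err := frontier.Forward(sp("c", "d"), sentinelTime); err != nil {
		panic(err)
	}
	fmt.Println(flattenCheckpoint(frontier))           // one completed entry covering [c,d)
	fmt.Println(remainingWork(frontier, requiredSpan)) // [b,c) and [d,e) remain to do
}

Unlike the single high_water key already present in RestoreProgress, the repeated checkpoint entries can record progress on disjoint spans that finish out of order, which is what lets a resumed job skip exactly the work already flushed.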