diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 13d6e53ea608..8f38887f8d86 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -87,6 +87,13 @@ var ( settings.WithName("bulkio.backup.split_keys_on_timestamps.enabled"), ) + preSplitExports = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "bulkio.backup.presplit_request_spans.enabled", + "split the spans that will be requests before requesting them", + util.ConstantWithMetamorphicTestBool("backup-presplit-spans", true), + ) + sendExportRequestWithVerboseTracing = settings.RegisterBoolSetting( settings.ApplicationLevel, "bulkio.backup.export_request_verbose_tracing", @@ -284,13 +291,12 @@ func (bp *backupDataProcessor) ConsumerClosed() { } type spanAndTime struct { - // spanIdx is a unique identifier of this object. - spanIdx int - span roachpb.Span - firstKeyTS hlc.Timestamp - start, end hlc.Timestamp - attempts int - lastTried time.Time + span roachpb.Span + firstKeyTS hlc.Timestamp + start, end hlc.Timestamp + attempts int + lastTried time.Time + finishesSpec bool } type exportedSpan struct { @@ -312,23 +318,47 @@ func runBackupProcessor( clusterSettings := flowCtx.Cfg.Settings totalSpans := len(spec.Spans) + len(spec.IntroducedSpans) - todo := make(chan spanAndTime, totalSpans) - var spanIdx int - for _, s := range spec.IntroducedSpans { - todo <- spanAndTime{ - spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: hlc.Timestamp{}, - end: spec.BackupStartTime, + requestSpans := make([]spanAndTime, 0, totalSpans) + rangeSizedSpans := preSplitExports.Get(&flowCtx.EvalCtx.Settings.SV) + + splitSpans := func(spans []roachpb.Span, start, end hlc.Timestamp) error { + for _, fullSpan := range spans { + remainingSpan := fullSpan + + if rangeSizedSpans { + rdi, err := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig).RangeDescIteratorFactory.NewIterator(ctx, fullSpan) + if err != nil { + return err + } + for ; rdi.Valid(); rdi.Next() { + rangeDesc := rdi.CurRangeDescriptor() + rangeSpan := roachpb.Span{Key: rangeDesc.StartKey.AsRawKey(), EndKey: rangeDesc.EndKey.AsRawKey()} + subspan := remainingSpan.Intersect(rangeSpan) + if !subspan.Valid() { + return errors.AssertionFailedf("%s not in %s of %s", rangeSpan, remainingSpan, fullSpan) + } + requestSpans = append(requestSpans, spanAndTime{span: subspan, start: start, end: end}) + remainingSpan.Key = subspan.EndKey + } + } + + if remainingSpan.Valid() { + requestSpans = append(requestSpans, spanAndTime{span: remainingSpan, start: start, end: end}) + } + requestSpans[len(requestSpans)-1].finishesSpec = true } - spanIdx++ + return nil } - for _, s := range spec.Spans { - todo <- spanAndTime{ - spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: spec.BackupStartTime, - end: spec.BackupEndTime, - } - spanIdx++ + + if err := splitSpans(spec.IntroducedSpans, hlc.Timestamp{}, spec.BackupStartTime); err != nil { + return err + } + if err := splitSpans(spec.Spans, spec.BackupStartTime, spec.BackupEndTime); err != nil { + return err } + log.Infof(ctx, "backup processor is assigned %d spans covering %d ranges", totalSpans, len(requestSpans)) + destURI := spec.DefaultURI var destLocalityKV string @@ -389,6 +419,32 @@ func runBackupProcessor( log.Infof(ctx, "starting %d backup export workers", numSenders) defer release() + todo := make(chan []spanAndTime, len(requestSpans)) + + const maxChunkSize = 100 + // Aim to make at least 4 chunks per worker, ensuring size is >=1 and <= max. + chunkSize := (len(requestSpans) / (numSenders * 4)) + 1 + if chunkSize > maxChunkSize { + chunkSize = maxChunkSize + } + + chunk := make([]spanAndTime, 0, chunkSize) + for i := range requestSpans { + if !rangeSizedSpans { + todo <- []spanAndTime{requestSpans[i]} + continue + } + + chunk = append(chunk, requestSpans[i]) + if len(chunk) > chunkSize { + todo <- chunk + chunk = make([]spanAndTime, 0, chunkSize) + } + } + if len(chunk) > 0 { + todo <- chunk + } + return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error { readTime := spec.BackupEndTime.GoTime() sink := makeFileSSTSink(sinkConf, storage) @@ -410,217 +466,219 @@ func runBackupProcessor( select { case <-ctxDone: return ctx.Err() - case span := <-todo: - for len(span.span.Key) != 0 { - splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) - // If we started splitting already, we must continue until we reach the end - // of split span. - if !span.firstKeyTS.IsEmpty() { - splitMidKey = true - } - - req := &kvpb.ExportRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span.span), - ResumeKeyTS: span.firstKeyTS, - StartTime: span.start, - MVCCFilter: spec.MVCCFilter, - TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), - SplitMidKey: splitMidKey, - } + case spans := <-todo: + for _, span := range spans { + for len(span.span.Key) != 0 { + splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) + // If we started splitting already, we must continue until we reach the end + // of split span. + if !span.firstKeyTS.IsEmpty() { + splitMidKey = true + } - // If we're doing re-attempts but are not yet in the priority regime, - // check to see if it is time to switch to priority. - if !priority && span.attempts > 0 { - // Check if this is starting a new pass and we should delay first. - // We're okay with delaying this worker until then since we assume any - // other work it could pull off the queue will likely want to delay to - // a similar or later time anyway. - if delay := delayPerAttempt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { - timer.Reset(delay) - log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) - select { - case <-ctxDone: - return ctx.Err() - case <-timer.C: - timer.Read = true - } + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeaderFromSpan(span.span), + ResumeKeyTS: span.firstKeyTS, + StartTime: span.start, + MVCCFilter: spec.MVCCFilter, + TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), + SplitMidKey: splitMidKey, } - priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) - } + // If we're doing re-attempts but are not yet in the priority regime, + // check to see if it is time to switch to priority. + if !priority && span.attempts > 0 { + // Check if this is starting a new pass and we should delay first. + // We're okay with delaying this worker until then since we assume any + // other work it could pull off the queue will likely want to delay to + // a similar or later time anyway. + if delay := delayPerAttempt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { + timer.Reset(delay) + log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) + select { + case <-ctxDone: + return ctx.Err() + case <-timer.C: + timer.Read = true + } + } - header := kvpb.Header{ - // We set the DistSender response target bytes field to a sentinel - // value. The sentinel value of 1 forces the ExportRequest to paginate - // after creating a single SST. - TargetBytes: 1, - Timestamp: span.end, - ReturnElasticCPUResumeSpans: true, - } - if priority { - // This re-attempt is reading far enough in the past that we just want - // to abort any transactions it hits. - header.UserPriority = roachpb.MaxUserPriority - } else { - // On the initial attempt to export this span and re-attempts that are - // done while it is still less than the configured time above the read - // time, we set WaitPolicy to Error, so that the export will return an - // error to us instead of instead doing blocking wait if it hits any - // other txns. This lets us move on to other ranges we have to export, - // provide an indication of why we're blocked, etc instead and come - // back to this range later. - header.WaitPolicy = lock.WaitPolicy_Error - } + priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) + } - admissionHeader := kvpb.AdmissionHeader{ - // Export requests are currently assigned BulkNormalPri. - // - // TODO(dt): Consider linking this to/from the UserPriority field. - Priority: int32(admissionpb.BulkNormalPri), - CreateTime: timeutil.Now().UnixNano(), - Source: kvpb.AdmissionHeader_FROM_SQL, - NoMemoryReservedAtSource: true, - } - log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", - span.span, span.attempts+1, header.UserPriority.String()) - var rawResp kvpb.Response - var recording tracingpb.Recording - var pErr *kvpb.Error - requestSentAt := timeutil.Now() - exportRequestErr := timeutil.RunWithTimeout(ctx, - fmt.Sprintf("ExportRequest for span %s", span.span), - timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - sp := tracing.SpanFromContext(ctx) - opts := make([]tracing.SpanOption, 0) - opts = append(opts, tracing.WithParent(sp)) - if sendExportRequestWithVerboseTracing.Get(&clusterSettings.SV) { - opts = append(opts, tracing.WithRecording(tracingpb.RecordingVerbose)) - } - ctx, exportSpan := sp.Tracer().StartSpanCtx(ctx, "backupccl.ExportRequest", opts...) - rawResp, pErr = kv.SendWrappedWithAdmission( - ctx, flowCtx.Cfg.DB.KV().NonTransactionalSender(), header, admissionHeader, req) - recording = exportSpan.FinishAndGetConfiguredRecording() - if pErr != nil { - return pErr.GoError() - } - return nil - }) - if exportRequestErr != nil { - if lockErr, ok := pErr.GetDetail().(*kvpb.WriteIntentError); ok { - span.lastTried = timeutil.Now() - span.attempts++ - todo <- span - // TODO(dt): send a progress update to update job progress to note - // the intents being hit. - log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, lockErr.Error()) - span = spanAndTime{} - continue + header := kvpb.Header{ + // We set the DistSender response target bytes field to a sentinel + // value. The sentinel value of 1 forces the ExportRequest to paginate + // after creating a single SST. + TargetBytes: 1, + Timestamp: span.end, + ReturnElasticCPUResumeSpans: true, } - // TimeoutError improves the opaque `context deadline exceeded` error - // message so use that instead. - if errors.HasType(exportRequestErr, (*timeutil.TimeoutError)(nil)) { - if recording != nil { - log.Errorf(ctx, "failed export request for span %s\n trace:\n%s", span.span, recording) - } - return errors.Wrap(exportRequestErr, "export request timeout") + if priority { + // This re-attempt is reading far enough in the past that we just want + // to abort any transactions it hits. + header.UserPriority = roachpb.MaxUserPriority + } else { + // On the initial attempt to export this span and re-attempts that are + // done while it is still less than the configured time above the read + // time, we set WaitPolicy to Error, so that the export will return an + // error to us instead of instead doing blocking wait if it hits any + // other txns. This lets us move on to other ranges we have to export, + // provide an indication of why we're blocked, etc instead and come + // back to this range later. + header.WaitPolicy = lock.WaitPolicy_Error } - // BatchTimestampBeforeGCError is returned if the ExportRequest - // attempts to read below the range's GC threshold. - if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*kvpb.BatchTimestampBeforeGCError); ok { - // If the range we are exporting is marked to be excluded from - // backup, it is safe to ignore the error. It is likely that the - // table has been configured with a low GC TTL, and so the data - // the backup is targeting has already been gc'ed. - if batchTimestampBeforeGCError.DataExcludedFromBackup { + + admissionHeader := kvpb.AdmissionHeader{ + // Export requests are currently assigned BulkNormalPri. + // + // TODO(dt): Consider linking this to/from the UserPriority field. + Priority: int32(admissionpb.BulkNormalPri), + CreateTime: timeutil.Now().UnixNano(), + Source: kvpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, + } + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", + span.span, span.attempts+1, header.UserPriority.String()) + var rawResp kvpb.Response + var recording tracingpb.Recording + var pErr *kvpb.Error + requestSentAt := timeutil.Now() + exportRequestErr := timeutil.RunWithTimeout(ctx, + fmt.Sprintf("ExportRequest for span %s", span.span), + timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { + sp := tracing.SpanFromContext(ctx) + opts := make([]tracing.SpanOption, 0) + opts = append(opts, tracing.WithParent(sp)) + if sendExportRequestWithVerboseTracing.Get(&clusterSettings.SV) { + opts = append(opts, tracing.WithRecording(tracingpb.RecordingVerbose)) + } + ctx, exportSpan := sp.Tracer().StartSpanCtx(ctx, "backupccl.ExportRequest", opts...) + rawResp, pErr = kv.SendWrappedWithAdmission( + ctx, flowCtx.Cfg.DB.KV().NonTransactionalSender(), header, admissionHeader, req) + recording = exportSpan.FinishAndGetConfiguredRecording() + if pErr != nil { + return pErr.GoError() + } + return nil + }) + if exportRequestErr != nil { + if lockErr, ok := pErr.GetDetail().(*kvpb.WriteIntentError); ok { + span.lastTried = timeutil.Now() + span.attempts++ + todo <- []spanAndTime{span} + // TODO(dt): send a progress update to update job progress to note + // the intents being hit. + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, lockErr.Error()) span = spanAndTime{} continue } - } + // TimeoutError improves the opaque `context deadline exceeded` error + // message so use that instead. + if errors.HasType(exportRequestErr, (*timeutil.TimeoutError)(nil)) { + if recording != nil { + log.Errorf(ctx, "failed export request for span %s\n trace:\n%s", span.span, recording) + } + return errors.Wrap(exportRequestErr, "export request timeout") + } + // BatchTimestampBeforeGCError is returned if the ExportRequest + // attempts to read below the range's GC threshold. + if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*kvpb.BatchTimestampBeforeGCError); ok { + // If the range we are exporting is marked to be excluded from + // backup, it is safe to ignore the error. It is likely that the + // table has been configured with a low GC TTL, and so the data + // the backup is targeting has already been gc'ed. + if batchTimestampBeforeGCError.DataExcludedFromBackup { + span = spanAndTime{} + continue + } + } - if recording != nil { - log.Errorf(ctx, "failed export request %s\n trace:\n%s", span.span, recording) + if recording != nil { + log.Errorf(ctx, "failed export request %s\n trace:\n%s", span.span, recording) + } + return errors.Wrapf(exportRequestErr, "exporting %s", span.span) } - return errors.Wrapf(exportRequestErr, "exporting %s", span.span) - } - resp := rawResp.(*kvpb.ExportResponse) + resp := rawResp.(*kvpb.ExportResponse) - // If the reply has a resume span, we process it immediately. - var resumeSpan spanAndTime - if resp.ResumeSpan != nil { - if !resp.ResumeSpan.Valid() { - return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) - } + // If the reply has a resume span, we process it immediately. + var resumeSpan spanAndTime + if resp.ResumeSpan != nil { + if !resp.ResumeSpan.Valid() { + return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) + } - resumeTS := hlc.Timestamp{} - // Taking resume timestamp from the last file of response since files must - // always be consecutive even if we currently expect only one. - if fileCount := len(resp.Files); fileCount > 0 { - resumeTS = resp.Files[fileCount-1].EndKeyTS - } - resumeSpan = spanAndTime{ - span: *resp.ResumeSpan, - firstKeyTS: resumeTS, - start: span.start, - end: span.end, - attempts: span.attempts, - lastTried: span.lastTried, + resumeTS := hlc.Timestamp{} + // Taking resume timestamp from the last file of response since files must + // always be consecutive even if we currently expect only one. + if fileCount := len(resp.Files); fileCount > 0 { + resumeTS = resp.Files[fileCount-1].EndKeyTS + } + resumeSpan = spanAndTime{ + span: *resp.ResumeSpan, + firstKeyTS: resumeTS, + start: span.start, + end: span.end, + attempts: span.attempts, + lastTried: span.lastTried, + } } - } - if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { - if backupKnobs.RunAfterExportingSpanEntry != nil { - backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if backupKnobs.RunAfterExportingSpanEntry != nil { + backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + } } - } - - var completedSpans int32 - if resp.ResumeSpan == nil { - completedSpans = 1 - } - if len(resp.Files) > 1 { - log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") - } + var completedSpans int32 + if span.finishesSpec && resp.ResumeSpan == nil { + completedSpans = 1 + } - // Even if the ExportRequest did not export any data we want to report - // the span as completed for accurate progress tracking. - if len(resp.Files) == 0 { - sink.writeWithNoData(exportedSpan{completedSpans: completedSpans}) - } - for i, file := range resp.Files { - entryCounts := countRows(file.Exported, spec.PKIDs) - - ret := exportedSpan{ - // BackupManifest_File just happens to contain the exact fields - // to store the metadata we need, but there's no actual File - // on-disk anywhere yet. - metadata: backuppb.BackupManifest_File{ - Span: file.Span, - Path: file.Path, - EntryCounts: entryCounts, - LocalityKV: destLocalityKV, - }, - dataSST: file.SST, - revStart: resp.StartTime, - atKeyBoundary: file.EndKeyTS.IsEmpty()} - if span.start != spec.BackupStartTime { - ret.metadata.StartTime = span.start - ret.metadata.EndTime = span.end + if len(resp.Files) > 1 { + log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } - // If multiple files were returned for this span, only one -- the - // last -- should count as completing the requested span. - if i == len(resp.Files)-1 { - ret.completedSpans = completedSpans + + // Even if the ExportRequest did not export any data we want to report + // the span as completed for accurate progress tracking. + if len(resp.Files) == 0 { + sink.writeWithNoData(exportedSpan{completedSpans: completedSpans}) } + for i, file := range resp.Files { + entryCounts := countRows(file.Exported, spec.PKIDs) + + ret := exportedSpan{ + // BackupManifest_File just happens to contain the exact fields + // to store the metadata we need, but there's no actual File + // on-disk anywhere yet. + metadata: backuppb.BackupManifest_File{ + Span: file.Span, + Path: file.Path, + EntryCounts: entryCounts, + LocalityKV: destLocalityKV, + }, + dataSST: file.SST, + revStart: resp.StartTime, + atKeyBoundary: file.EndKeyTS.IsEmpty()} + if span.start != spec.BackupStartTime { + ret.metadata.StartTime = span.start + ret.metadata.EndTime = span.end + } + // If multiple files were returned for this span, only one -- the + // last -- should count as completing the requested span. + if i == len(resp.Files)-1 { + ret.completedSpans = completedSpans + } - if err := sink.write(ctx, ret); err != nil { - return err + if err := sink.write(ctx, ret); err != nil { + return err + } } + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, requestSentAt) + span = resumeSpan } - // Emit the stats for the processed ExportRequest. - recordExportStats(backupProcessorSpan, resp, requestSentAt) - span = resumeSpan } default: // No work left to do, so we can exit. Note that another worker could diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 5df0a91f89ad..b104250ad5c3 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -3906,21 +3906,23 @@ func TestBackupRestoreChecksum(t *testing.T) { } } - // Corrupt one of the files in the backup. - f, err := os.OpenFile(filepath.Join(dir, backupManifest.Files[0].Path), os.O_WRONLY, 0) - if err != nil { - t.Fatalf("%+v", err) - } - defer f.Close() - // mess with some bytes. - if _, err := f.Seek(-65, io.SeekEnd); err != nil { - t.Fatalf("%+v", err) - } - if _, err := f.Write([]byte{'1', '2', '3'}); err != nil { - t.Fatalf("%+v", err) - } - if err := f.Sync(); err != nil { - t.Fatalf("%+v", err) + // Corrupt all of the files in the backup. + for i := range backupManifest.Files { + f, err := os.OpenFile(filepath.Join(dir, backupManifest.Files[i].Path), os.O_WRONLY, 0) + if err != nil { + t.Fatalf("%+v", err) + } + defer f.Close() + // mess with some bytes. + if _, err := f.Seek(-65, io.SeekEnd); err != nil { + t.Fatalf("%+v", err) + } + if _, err := f.Write([]byte{'1', '2', '3'}); err != nil { + t.Fatalf("%+v", err) + } + if err := f.Sync(); err != nil { + t.Fatalf("%+v", err) + } } sqlDB.Exec(t, `DROP TABLE data.bank`) @@ -11194,7 +11196,7 @@ func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE data2") sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')") files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'") - require.Equal(t, 11, len(files)) // 1 file for full + 10 for 10 incrementals + require.GreaterOrEqual(t, len(files), 11) // 1 file for full + 10 for 10 incrementals // Assert that the restore processor is processing the same span multiple // times, and the count is based on what's expected from the memory budget.