From dcad601db980aea8bbdf86c1378b3d7f143bf4cd Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Fri, 10 Nov 2023 20:54:05 +0000
Subject: [PATCH 1/2] backup: split request spans to be range sized

Backup processors are assigned spans -- produced by the SQL planning function PartitionSpans -- which they must back up by reading the content of each span using some number of paginated ExportRequests and then writing that content to the assigned destination.

Typically each ExportRequest sent by a backup processor is served by approximately one range: the processor sends a request for the whole span it is trying to export, DistSender routes it to the first range it overlaps, that range reads until it hits the pagination limit, DistSender returns the result, and the processor repeats, resuming the span from the resume key.

Since each request does a range's worth of work, the backup processor can assume that, if the cluster is normal and healthy, each request should return its result within a short amount of time -- often a second or less, or perhaps a few seconds if it had to wait in queues. As such, the backup processor imposes a 5 minute timeout on these requests: a single request not returning within that duration indicates something is not normal and healthy in the cluster, and the backup cannot expect to make progress until that is resolved.

However, this logic does not hold if a single request, subject to this timeout, ends up doing substantially more work. That can happen if the request has a span larger than a single range _and_ the ranges in that span are empty and/or don't contain data matching the predicate of the request. In such cases, the request is sent to one range, which processes it, but since it returns zero results the pagination limit is not hit, so the request continues on to another range, and another, until it either reaches the end of the requested span or finally finds results that hit the pagination limit. If neither happens, the request can end up hitting the timeout -- a limit chosen on the assumption that it would never be reached by a single range's worth of work -- because it is in fact doing many ranges' worth of work.

This change pre-splits the spans that need to be exported into sub-spans, each the size of one range, and sends requests to those sub-spans. It is OK if the underlying ranges later split or merge: the pre-splitting simply ensures that each request corresponds to "a range's worth of work", since each sub-span was a single range at the time of splitting. With this, we can assume that all requests should complete within the 5 minute timeout if the cluster is healthy.

Release note: none.

Epic: none.
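[For illustration only: a minimal, self-contained sketch of the pre-splitting idea described above, assuming a sorted list of range start keys in place of the real RangeDescIterator. The types and helper here are hypothetical simplifications; the actual change iterates range descriptors and intersects them with the requested span via roachpb.Span.Intersect, as the diff below shows.]

package main

import (
	"bytes"
	"fmt"
)

// span is a simplified stand-in for roachpb.Span: a half-open key interval.
type span struct{ key, endKey []byte }

// splitAtRangeBoundaries cuts a requested span at each range boundary it
// crosses, so that every resulting sub-span covered exactly one range at
// split time and therefore represents roughly "a range's worth of work"
// per request. boundaries must be ascending range start keys, as a
// range-descriptor iterator would yield them.
func splitAtRangeBoundaries(full span, boundaries [][]byte) []span {
	out := make([]span, 0, len(boundaries)+1)
	remaining := full
	for _, b := range boundaries {
		// Only a boundary strictly inside the remaining span produces a cut.
		if bytes.Compare(b, remaining.key) <= 0 {
			continue
		}
		if bytes.Compare(b, remaining.endKey) >= 0 {
			break
		}
		out = append(out, span{key: remaining.key, endKey: b})
		remaining.key = b
	}
	// Whatever is left is the final sub-span; in the patch this last piece
	// is the one marked finishesSpec for progress accounting.
	return append(out, remaining)
}

func main() {
	full := span{key: []byte("a"), endKey: []byte("z")}
	boundaries := [][]byte{[]byte("c"), []byte("f"), []byte("m"), []byte("t")}
	for _, s := range splitAtRangeBoundaries(full, boundaries) {
		fmt.Printf("[%s, %s)\n", s.key, s.endKey) // [a,c) [c,f) [f,m) [m,t) [t,z)
	}
}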
--- pkg/ccl/backupccl/backup_processor.go | 77 +++++++++++++++++++-------- pkg/ccl/backupccl/backup_test.go | 2 +- 2 files changed, 57 insertions(+), 22 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 13d6e53ea608..ca18a04ace6e 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -87,6 +87,13 @@ var ( settings.WithName("bulkio.backup.split_keys_on_timestamps.enabled"), ) + preSplitExports = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "bulkio.backup.presplit_request_spans.enabled", + "split the spans that will be requests before requesting them", + util.ConstantWithMetamorphicTestBool("backup-presplit-spans", true), + ) + sendExportRequestWithVerboseTracing = settings.RegisterBoolSetting( settings.ApplicationLevel, "bulkio.backup.export_request_verbose_tracing", @@ -284,13 +291,12 @@ func (bp *backupDataProcessor) ConsumerClosed() { } type spanAndTime struct { - // spanIdx is a unique identifier of this object. - spanIdx int - span roachpb.Span - firstKeyTS hlc.Timestamp - start, end hlc.Timestamp - attempts int - lastTried time.Time + span roachpb.Span + firstKeyTS hlc.Timestamp + start, end hlc.Timestamp + attempts int + lastTried time.Time + finishesSpec bool } type exportedSpan struct { @@ -312,21 +318,50 @@ func runBackupProcessor( clusterSettings := flowCtx.Cfg.Settings totalSpans := len(spec.Spans) + len(spec.IntroducedSpans) - todo := make(chan spanAndTime, totalSpans) - var spanIdx int - for _, s := range spec.IntroducedSpans { - todo <- spanAndTime{ - spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: hlc.Timestamp{}, - end: spec.BackupStartTime, + requestSpans := make([]spanAndTime, 0, totalSpans) + rangeSizedSpans := preSplitExports.Get(&flowCtx.EvalCtx.Settings.SV) + + splitSpans := func(spans []roachpb.Span, start, end hlc.Timestamp) error { + for _, fullSpan := range spans { + remainingSpan := fullSpan + + if rangeSizedSpans { + rdi, err := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig).RangeDescIteratorFactory.NewIterator(ctx, fullSpan) + if err != nil { + return err + } + for ; rdi.Valid(); rdi.Next() { + rangeDesc := rdi.CurRangeDescriptor() + rangeSpan := roachpb.Span{Key: rangeDesc.StartKey.AsRawKey(), EndKey: rangeDesc.EndKey.AsRawKey()} + subspan := remainingSpan.Intersect(rangeSpan) + if !subspan.Valid() { + return errors.AssertionFailedf("%s not in %s of %s", rangeSpan, remainingSpan, fullSpan) + } + requestSpans = append(requestSpans, spanAndTime{span: subspan, start: start, end: end}) + remainingSpan.Key = subspan.EndKey + } + } + + if remainingSpan.Valid() { + requestSpans = append(requestSpans, spanAndTime{span: remainingSpan, start: start, end: end}) + } + requestSpans[len(requestSpans)-1].finishesSpec = true } - spanIdx++ + return nil } - for _, s := range spec.Spans { - todo <- spanAndTime{ - spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: spec.BackupStartTime, - end: spec.BackupEndTime, - } - spanIdx++ + + if err := splitSpans(spec.IntroducedSpans, hlc.Timestamp{}, spec.BackupStartTime); err != nil { + return err + } + if err := splitSpans(spec.Spans, spec.BackupStartTime, spec.BackupEndTime); err != nil { + return err + } + + log.Infof(ctx, "backup processor is assigned %d spans covering %d ranges", totalSpans, len(requestSpans)) + + todo := make(chan spanAndTime, len(requestSpans)) + for i := range requestSpans { + todo <- requestSpans[i] } destURI := spec.DefaultURI @@ -575,7 +610,7 @@ func 
runBackupProcessor( } var completedSpans int32 - if resp.ResumeSpan == nil { + if span.finishesSpec && resp.ResumeSpan == nil { completedSpans = 1 } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 5df0a91f89ad..046f2620c266 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -11194,7 +11194,7 @@ func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE data2") sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')") files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'") - require.Equal(t, 11, len(files)) // 1 file for full + 10 for 10 incrementals + require.GreaterOrEqual(t, len(files), 11) // 1 file for full + 10 for 10 incrementals // Assert that the restore processor is processing the same span multiple // times, and the count is based on what's expected from the memory budget. From 12627e25e19e8153c46477c1a9d689dd3f6cb21c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 20 Nov 2023 13:37:07 +0000 Subject: [PATCH 2/2] backup: group todo spans into chunks Having a single worker handle sequential spans when backing up allows that worker to append the results to its output, producing output that is also sequential and importantly non-overlapping with other workers, which will allow for reducing the metadata required to track the unique output spans. Release note: none. Epic: none. --- pkg/ccl/backupccl/backup_processor.go | 405 ++++++++++++++------------ pkg/ccl/backupccl/backup_test.go | 32 +- 2 files changed, 231 insertions(+), 206 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index ca18a04ace6e..8f38887f8d86 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -359,11 +359,6 @@ func runBackupProcessor( log.Infof(ctx, "backup processor is assigned %d spans covering %d ranges", totalSpans, len(requestSpans)) - todo := make(chan spanAndTime, len(requestSpans)) - for i := range requestSpans { - todo <- requestSpans[i] - } - destURI := spec.DefaultURI var destLocalityKV string @@ -424,6 +419,32 @@ func runBackupProcessor( log.Infof(ctx, "starting %d backup export workers", numSenders) defer release() + todo := make(chan []spanAndTime, len(requestSpans)) + + const maxChunkSize = 100 + // Aim to make at least 4 chunks per worker, ensuring size is >=1 and <= max. + chunkSize := (len(requestSpans) / (numSenders * 4)) + 1 + if chunkSize > maxChunkSize { + chunkSize = maxChunkSize + } + + chunk := make([]spanAndTime, 0, chunkSize) + for i := range requestSpans { + if !rangeSizedSpans { + todo <- []spanAndTime{requestSpans[i]} + continue + } + + chunk = append(chunk, requestSpans[i]) + if len(chunk) > chunkSize { + todo <- chunk + chunk = make([]spanAndTime, 0, chunkSize) + } + } + if len(chunk) > 0 { + todo <- chunk + } + return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error { readTime := spec.BackupEndTime.GoTime() sink := makeFileSSTSink(sinkConf, storage) @@ -445,217 +466,219 @@ func runBackupProcessor( select { case <-ctxDone: return ctx.Err() - case span := <-todo: - for len(span.span.Key) != 0 { - splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) - // If we started splitting already, we must continue until we reach the end - // of split span. 
- if !span.firstKeyTS.IsEmpty() { - splitMidKey = true - } - - req := &kvpb.ExportRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span.span), - ResumeKeyTS: span.firstKeyTS, - StartTime: span.start, - MVCCFilter: spec.MVCCFilter, - TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), - SplitMidKey: splitMidKey, - } + case spans := <-todo: + for _, span := range spans { + for len(span.span.Key) != 0 { + splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) + // If we started splitting already, we must continue until we reach the end + // of split span. + if !span.firstKeyTS.IsEmpty() { + splitMidKey = true + } - // If we're doing re-attempts but are not yet in the priority regime, - // check to see if it is time to switch to priority. - if !priority && span.attempts > 0 { - // Check if this is starting a new pass and we should delay first. - // We're okay with delaying this worker until then since we assume any - // other work it could pull off the queue will likely want to delay to - // a similar or later time anyway. - if delay := delayPerAttempt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { - timer.Reset(delay) - log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) - select { - case <-ctxDone: - return ctx.Err() - case <-timer.C: - timer.Read = true - } + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeaderFromSpan(span.span), + ResumeKeyTS: span.firstKeyTS, + StartTime: span.start, + MVCCFilter: spec.MVCCFilter, + TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), + SplitMidKey: splitMidKey, } - priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) - } + // If we're doing re-attempts but are not yet in the priority regime, + // check to see if it is time to switch to priority. + if !priority && span.attempts > 0 { + // Check if this is starting a new pass and we should delay first. + // We're okay with delaying this worker until then since we assume any + // other work it could pull off the queue will likely want to delay to + // a similar or later time anyway. + if delay := delayPerAttempt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { + timer.Reset(delay) + log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) + select { + case <-ctxDone: + return ctx.Err() + case <-timer.C: + timer.Read = true + } + } - header := kvpb.Header{ - // We set the DistSender response target bytes field to a sentinel - // value. The sentinel value of 1 forces the ExportRequest to paginate - // after creating a single SST. - TargetBytes: 1, - Timestamp: span.end, - ReturnElasticCPUResumeSpans: true, - } - if priority { - // This re-attempt is reading far enough in the past that we just want - // to abort any transactions it hits. - header.UserPriority = roachpb.MaxUserPriority - } else { - // On the initial attempt to export this span and re-attempts that are - // done while it is still less than the configured time above the read - // time, we set WaitPolicy to Error, so that the export will return an - // error to us instead of instead doing blocking wait if it hits any - // other txns. This lets us move on to other ranges we have to export, - // provide an indication of why we're blocked, etc instead and come - // back to this range later. 
- header.WaitPolicy = lock.WaitPolicy_Error - } + priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) + } - admissionHeader := kvpb.AdmissionHeader{ - // Export requests are currently assigned BulkNormalPri. - // - // TODO(dt): Consider linking this to/from the UserPriority field. - Priority: int32(admissionpb.BulkNormalPri), - CreateTime: timeutil.Now().UnixNano(), - Source: kvpb.AdmissionHeader_FROM_SQL, - NoMemoryReservedAtSource: true, - } - log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", - span.span, span.attempts+1, header.UserPriority.String()) - var rawResp kvpb.Response - var recording tracingpb.Recording - var pErr *kvpb.Error - requestSentAt := timeutil.Now() - exportRequestErr := timeutil.RunWithTimeout(ctx, - fmt.Sprintf("ExportRequest for span %s", span.span), - timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - sp := tracing.SpanFromContext(ctx) - opts := make([]tracing.SpanOption, 0) - opts = append(opts, tracing.WithParent(sp)) - if sendExportRequestWithVerboseTracing.Get(&clusterSettings.SV) { - opts = append(opts, tracing.WithRecording(tracingpb.RecordingVerbose)) - } - ctx, exportSpan := sp.Tracer().StartSpanCtx(ctx, "backupccl.ExportRequest", opts...) - rawResp, pErr = kv.SendWrappedWithAdmission( - ctx, flowCtx.Cfg.DB.KV().NonTransactionalSender(), header, admissionHeader, req) - recording = exportSpan.FinishAndGetConfiguredRecording() - if pErr != nil { - return pErr.GoError() - } - return nil - }) - if exportRequestErr != nil { - if lockErr, ok := pErr.GetDetail().(*kvpb.WriteIntentError); ok { - span.lastTried = timeutil.Now() - span.attempts++ - todo <- span - // TODO(dt): send a progress update to update job progress to note - // the intents being hit. - log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, lockErr.Error()) - span = spanAndTime{} - continue + header := kvpb.Header{ + // We set the DistSender response target bytes field to a sentinel + // value. The sentinel value of 1 forces the ExportRequest to paginate + // after creating a single SST. + TargetBytes: 1, + Timestamp: span.end, + ReturnElasticCPUResumeSpans: true, } - // TimeoutError improves the opaque `context deadline exceeded` error - // message so use that instead. - if errors.HasType(exportRequestErr, (*timeutil.TimeoutError)(nil)) { - if recording != nil { - log.Errorf(ctx, "failed export request for span %s\n trace:\n%s", span.span, recording) - } - return errors.Wrap(exportRequestErr, "export request timeout") + if priority { + // This re-attempt is reading far enough in the past that we just want + // to abort any transactions it hits. + header.UserPriority = roachpb.MaxUserPriority + } else { + // On the initial attempt to export this span and re-attempts that are + // done while it is still less than the configured time above the read + // time, we set WaitPolicy to Error, so that the export will return an + // error to us instead of instead doing blocking wait if it hits any + // other txns. This lets us move on to other ranges we have to export, + // provide an indication of why we're blocked, etc instead and come + // back to this range later. + header.WaitPolicy = lock.WaitPolicy_Error + } + + admissionHeader := kvpb.AdmissionHeader{ + // Export requests are currently assigned BulkNormalPri. + // + // TODO(dt): Consider linking this to/from the UserPriority field. 
+ Priority: int32(admissionpb.BulkNormalPri), + CreateTime: timeutil.Now().UnixNano(), + Source: kvpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, } - // BatchTimestampBeforeGCError is returned if the ExportRequest - // attempts to read below the range's GC threshold. - if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*kvpb.BatchTimestampBeforeGCError); ok { - // If the range we are exporting is marked to be excluded from - // backup, it is safe to ignore the error. It is likely that the - // table has been configured with a low GC TTL, and so the data - // the backup is targeting has already been gc'ed. - if batchTimestampBeforeGCError.DataExcludedFromBackup { + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", + span.span, span.attempts+1, header.UserPriority.String()) + var rawResp kvpb.Response + var recording tracingpb.Recording + var pErr *kvpb.Error + requestSentAt := timeutil.Now() + exportRequestErr := timeutil.RunWithTimeout(ctx, + fmt.Sprintf("ExportRequest for span %s", span.span), + timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { + sp := tracing.SpanFromContext(ctx) + opts := make([]tracing.SpanOption, 0) + opts = append(opts, tracing.WithParent(sp)) + if sendExportRequestWithVerboseTracing.Get(&clusterSettings.SV) { + opts = append(opts, tracing.WithRecording(tracingpb.RecordingVerbose)) + } + ctx, exportSpan := sp.Tracer().StartSpanCtx(ctx, "backupccl.ExportRequest", opts...) + rawResp, pErr = kv.SendWrappedWithAdmission( + ctx, flowCtx.Cfg.DB.KV().NonTransactionalSender(), header, admissionHeader, req) + recording = exportSpan.FinishAndGetConfiguredRecording() + if pErr != nil { + return pErr.GoError() + } + return nil + }) + if exportRequestErr != nil { + if lockErr, ok := pErr.GetDetail().(*kvpb.WriteIntentError); ok { + span.lastTried = timeutil.Now() + span.attempts++ + todo <- []spanAndTime{span} + // TODO(dt): send a progress update to update job progress to note + // the intents being hit. + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, lockErr.Error()) span = spanAndTime{} continue } - } + // TimeoutError improves the opaque `context deadline exceeded` error + // message so use that instead. + if errors.HasType(exportRequestErr, (*timeutil.TimeoutError)(nil)) { + if recording != nil { + log.Errorf(ctx, "failed export request for span %s\n trace:\n%s", span.span, recording) + } + return errors.Wrap(exportRequestErr, "export request timeout") + } + // BatchTimestampBeforeGCError is returned if the ExportRequest + // attempts to read below the range's GC threshold. + if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*kvpb.BatchTimestampBeforeGCError); ok { + // If the range we are exporting is marked to be excluded from + // backup, it is safe to ignore the error. It is likely that the + // table has been configured with a low GC TTL, and so the data + // the backup is targeting has already been gc'ed. 
+ if batchTimestampBeforeGCError.DataExcludedFromBackup { + span = spanAndTime{} + continue + } + } - if recording != nil { - log.Errorf(ctx, "failed export request %s\n trace:\n%s", span.span, recording) + if recording != nil { + log.Errorf(ctx, "failed export request %s\n trace:\n%s", span.span, recording) + } + return errors.Wrapf(exportRequestErr, "exporting %s", span.span) } - return errors.Wrapf(exportRequestErr, "exporting %s", span.span) - } - resp := rawResp.(*kvpb.ExportResponse) + resp := rawResp.(*kvpb.ExportResponse) - // If the reply has a resume span, we process it immediately. - var resumeSpan spanAndTime - if resp.ResumeSpan != nil { - if !resp.ResumeSpan.Valid() { - return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) - } + // If the reply has a resume span, we process it immediately. + var resumeSpan spanAndTime + if resp.ResumeSpan != nil { + if !resp.ResumeSpan.Valid() { + return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) + } - resumeTS := hlc.Timestamp{} - // Taking resume timestamp from the last file of response since files must - // always be consecutive even if we currently expect only one. - if fileCount := len(resp.Files); fileCount > 0 { - resumeTS = resp.Files[fileCount-1].EndKeyTS - } - resumeSpan = spanAndTime{ - span: *resp.ResumeSpan, - firstKeyTS: resumeTS, - start: span.start, - end: span.end, - attempts: span.attempts, - lastTried: span.lastTried, + resumeTS := hlc.Timestamp{} + // Taking resume timestamp from the last file of response since files must + // always be consecutive even if we currently expect only one. + if fileCount := len(resp.Files); fileCount > 0 { + resumeTS = resp.Files[fileCount-1].EndKeyTS + } + resumeSpan = spanAndTime{ + span: *resp.ResumeSpan, + firstKeyTS: resumeTS, + start: span.start, + end: span.end, + attempts: span.attempts, + lastTried: span.lastTried, + } } - } - if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { - if backupKnobs.RunAfterExportingSpanEntry != nil { - backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if backupKnobs.RunAfterExportingSpanEntry != nil { + backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + } } - } - var completedSpans int32 - if span.finishesSpec && resp.ResumeSpan == nil { - completedSpans = 1 - } - - if len(resp.Files) > 1 { - log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") - } + var completedSpans int32 + if span.finishesSpec && resp.ResumeSpan == nil { + completedSpans = 1 + } - // Even if the ExportRequest did not export any data we want to report - // the span as completed for accurate progress tracking. - if len(resp.Files) == 0 { - sink.writeWithNoData(exportedSpan{completedSpans: completedSpans}) - } - for i, file := range resp.Files { - entryCounts := countRows(file.Exported, spec.PKIDs) - - ret := exportedSpan{ - // BackupManifest_File just happens to contain the exact fields - // to store the metadata we need, but there's no actual File - // on-disk anywhere yet. 
- metadata: backuppb.BackupManifest_File{ - Span: file.Span, - Path: file.Path, - EntryCounts: entryCounts, - LocalityKV: destLocalityKV, - }, - dataSST: file.SST, - revStart: resp.StartTime, - atKeyBoundary: file.EndKeyTS.IsEmpty()} - if span.start != spec.BackupStartTime { - ret.metadata.StartTime = span.start - ret.metadata.EndTime = span.end + if len(resp.Files) > 1 { + log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } - // If multiple files were returned for this span, only one -- the - // last -- should count as completing the requested span. - if i == len(resp.Files)-1 { - ret.completedSpans = completedSpans + + // Even if the ExportRequest did not export any data we want to report + // the span as completed for accurate progress tracking. + if len(resp.Files) == 0 { + sink.writeWithNoData(exportedSpan{completedSpans: completedSpans}) } + for i, file := range resp.Files { + entryCounts := countRows(file.Exported, spec.PKIDs) + + ret := exportedSpan{ + // BackupManifest_File just happens to contain the exact fields + // to store the metadata we need, but there's no actual File + // on-disk anywhere yet. + metadata: backuppb.BackupManifest_File{ + Span: file.Span, + Path: file.Path, + EntryCounts: entryCounts, + LocalityKV: destLocalityKV, + }, + dataSST: file.SST, + revStart: resp.StartTime, + atKeyBoundary: file.EndKeyTS.IsEmpty()} + if span.start != spec.BackupStartTime { + ret.metadata.StartTime = span.start + ret.metadata.EndTime = span.end + } + // If multiple files were returned for this span, only one -- the + // last -- should count as completing the requested span. + if i == len(resp.Files)-1 { + ret.completedSpans = completedSpans + } - if err := sink.write(ctx, ret); err != nil { - return err + if err := sink.write(ctx, ret); err != nil { + return err + } } + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, requestSentAt) + span = resumeSpan } - // Emit the stats for the processed ExportRequest. - recordExportStats(backupProcessorSpan, resp, requestSentAt) - span = resumeSpan } default: // No work left to do, so we can exit. Note that another worker could diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 046f2620c266..b104250ad5c3 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -3906,21 +3906,23 @@ func TestBackupRestoreChecksum(t *testing.T) { } } - // Corrupt one of the files in the backup. - f, err := os.OpenFile(filepath.Join(dir, backupManifest.Files[0].Path), os.O_WRONLY, 0) - if err != nil { - t.Fatalf("%+v", err) - } - defer f.Close() - // mess with some bytes. - if _, err := f.Seek(-65, io.SeekEnd); err != nil { - t.Fatalf("%+v", err) - } - if _, err := f.Write([]byte{'1', '2', '3'}); err != nil { - t.Fatalf("%+v", err) - } - if err := f.Sync(); err != nil { - t.Fatalf("%+v", err) + // Corrupt all of the files in the backup. + for i := range backupManifest.Files { + f, err := os.OpenFile(filepath.Join(dir, backupManifest.Files[i].Path), os.O_WRONLY, 0) + if err != nil { + t.Fatalf("%+v", err) + } + defer f.Close() + // mess with some bytes. + if _, err := f.Seek(-65, io.SeekEnd); err != nil { + t.Fatalf("%+v", err) + } + if _, err := f.Write([]byte{'1', '2', '3'}); err != nil { + t.Fatalf("%+v", err) + } + if err := f.Sync(); err != nil { + t.Fatalf("%+v", err) + } } sqlDB.Exec(t, `DROP TABLE data.bank`)
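[For illustration only: a standalone sketch of the chunk sizing introduced in the second patch, using a hypothetical helper name. It mirrors the (len(requestSpans) / (numSenders * 4)) + 1 computation capped at maxChunkSize = 100 shown in the diff above.]

package main

import "fmt"

// chunkSize mirrors the sizing rule in the patch: aim for at least four
// chunks per worker so the work queue stays balanced across workers, while
// clamping the result to [1, maxChunkSize] so no single chunk grows
// unboundedly on very large backups.
func chunkSize(numSpans, numWorkers int) int {
	const maxChunkSize = 100
	size := numSpans/(numWorkers*4) + 1
	if size > maxChunkSize {
		size = maxChunkSize
	}
	return size
}

func main() {
	for _, n := range []int{10, 1_000, 1_000_000} {
		fmt.Printf("%7d spans, 4 workers -> chunk size %d\n", n, chunkSize(n, 4))
	}
	// 10 spans -> 1; 1,000 spans -> 63; 1,000,000 spans -> 100 (capped).
}

Because each chunk holds contiguous range-sized spans handled by a single worker, that worker's output stays sequential and non-overlapping with other workers' output, which is what allows reducing the metadata needed to track completed spans.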