From 4f0af41a763f24a95eb60a862f62225bef81bc40 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 11 Aug 2022 16:56:45 +0100 Subject: [PATCH] backupccl: immediately process resume spans during backup Previously, we put resume spans returned from an export request back on the queue for processing. In a large cluster with a lot of work to do, this might result in the resume span being processed much later. This isn't great because (1) it means we don't get to take advantage of disk and block caching and (2) it means that the resume span has a smaller chance of ending up in the same SST as the original span. Release note: None --- pkg/ccl/backupccl/backup_processor.go | 345 +++++++++++++------------- 1 file changed, 175 insertions(+), 170 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 0f6b11c3b970..4e46622bad4d 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -306,9 +306,11 @@ func runBackupProcessor( grp := ctxgroup.WithContext(ctx) // Start a goroutine that will then start a group of goroutines which each - // pull spans off of `todo` and send export requests. Any resume spans are put - // back on `todo`. Any returned SSTs are put on a `returnedSpansChan` to be routed - // to a buffered sink that merges them until they are large enough to flush. + // pull spans off of `todo` and send export requests. Any spans that encounter + // write intent errors during Export are put back on the todo queue for later + // processing. Any returned SSTs are put on a `returnedSpansChan` to be + // routed to a buffered sink that merges them until they are large enough to + // flush. grp.GoCtx(func(ctx context.Context) error { defer close(returnedSpansChan) // TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or @@ -330,194 +332,197 @@ func runBackupProcessor( case <-ctxDone: return ctx.Err() case span := <-todo: - header := roachpb.Header{Timestamp: span.end} - - splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) - // If we started splitting already, we must continue until we reach the end - // of split span. - if !span.firstKeyTS.IsEmpty() { - splitMidKey = true - } - - req := &roachpb.ExportRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(span.span), - ResumeKeyTS: span.firstKeyTS, - StartTime: span.start, - EnableTimeBoundIteratorOptimization: true, // NB: Must set for 22.1 compatibility. - MVCCFilter: spec.MVCCFilter, - TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), - ReturnSST: true, - SplitMidKey: splitMidKey, - } + for len(span.span.Key) != 0 { + header := roachpb.Header{Timestamp: span.end} + + splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) + // If we started splitting already, we must continue until we reach the end + // of split span. + if !span.firstKeyTS.IsEmpty() { + splitMidKey = true + } - // If we're doing re-attempts but are not yet in the priority regime, - // check to see if it is time to switch to priority. - if !priority && span.attempts > 0 { - // Check if this is starting a new pass and we should delay first. - // We're okay with delaying this worker until then since we assume any - // other work it could pull off the queue will likely want to delay to - // a similar or later time anyway. - if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { - timer.Reset(delay) - log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) - select { - case <-ctxDone: - return ctx.Err() - case <-timer.C: - timer.Read = true - } + req := &roachpb.ExportRequest{ + RequestHeader: roachpb.RequestHeaderFromSpan(span.span), + ResumeKeyTS: span.firstKeyTS, + StartTime: span.start, + EnableTimeBoundIteratorOptimization: true, // NB: Must set for 22.1 compatibility. + MVCCFilter: spec.MVCCFilter, + TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), + ReturnSST: true, + SplitMidKey: splitMidKey, } - priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) - } + // If we're doing re-attempts but are not yet in the priority regime, + // check to see if it is time to switch to priority. + if !priority && span.attempts > 0 { + // Check if this is starting a new pass and we should delay first. + // We're okay with delaying this worker until then since we assume any + // other work it could pull off the queue will likely want to delay to + // a similar or later time anyway. + if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { + timer.Reset(delay) + log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) + select { + case <-ctxDone: + return ctx.Err() + case <-timer.C: + timer.Read = true + } + } - if priority { - // This re-attempt is reading far enough in the past that we just want - // to abort any transactions it hits. - header.UserPriority = roachpb.MaxUserPriority - } else { - // On the initial attempt to export this span and re-attempts that are - // done while it is still less than the configured time above the read - // time, we set WaitPolicy to Error, so that the export will return an - // error to us instead of instead doing blocking wait if it hits any - // other txns. This lets us move on to other ranges we have to export, - // provide an indication of why we're blocked, etc instead and come - // back to this range later. - header.WaitPolicy = lock.WaitPolicy_Error - } + priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) + } - // We set the DistSender response target bytes field to a sentinel - // value. The sentinel value of 1 forces the ExportRequest to paginate - // after creating a single SST. - header.TargetBytes = 1 - admissionHeader := roachpb.AdmissionHeader{ - // Export requests are currently assigned NormalPri. - // - // TODO(dt): Consider linking this to/from the UserPriority field. - Priority: int32(admissionpb.BulkNormalPri), - CreateTime: timeutil.Now().UnixNano(), - Source: roachpb.AdmissionHeader_FROM_SQL, - NoMemoryReservedAtSource: true, - } - log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", - span.span, span.attempts+1, header.UserPriority.String()) - var rawResp roachpb.Response - var pErr *roachpb.Error - requestSentAt := timeutil.Now() - exportRequestErr := contextutil.RunWithTimeout(ctx, - fmt.Sprintf("ExportRequest for span %s", span.span), - timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - rawResp, pErr = kv.SendWrappedWithAdmission( - ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) - if pErr != nil { - return pErr.GoError() - } - return nil - }) - if exportRequestErr != nil { - if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok { - span.lastTried = timeutil.Now() - span.attempts++ - todo <- span - // TODO(dt): send a progress update to update job progress to note - // the intents being hit. - log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error()) - continue + if priority { + // This re-attempt is reading far enough in the past that we just want + // to abort any transactions it hits. + header.UserPriority = roachpb.MaxUserPriority + } else { + // On the initial attempt to export this span and re-attempts that are + // done while it is still less than the configured time above the read + // time, we set WaitPolicy to Error, so that the export will return an + // error to us instead of instead doing blocking wait if it hits any + // other txns. This lets us move on to other ranges we have to export, + // provide an indication of why we're blocked, etc instead and come + // back to this range later. + header.WaitPolicy = lock.WaitPolicy_Error } - // TimeoutError improves the opaque `context deadline exceeded` error - // message so use that instead. - if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) { - return errors.Wrap(exportRequestErr, "export request timeout") + + // We set the DistSender response target bytes field to a sentinel + // value. The sentinel value of 1 forces the ExportRequest to paginate + // after creating a single SST. + header.TargetBytes = 1 + admissionHeader := roachpb.AdmissionHeader{ + // Export requests are currently assigned NormalPri. + // + // TODO(dt): Consider linking this to/from the UserPriority field. + Priority: int32(admissionpb.BulkNormalPri), + CreateTime: timeutil.Now().UnixNano(), + Source: roachpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, } - // BatchTimestampBeforeGCError is returned if the ExportRequest - // attempts to read below the range's GC threshold. - if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok { - // If the range we are exporting is marked to be excluded from - // backup, it is safe to ignore the error. It is likely that the - // table has been configured with a low GC TTL, and so the data - // the backup is targeting has already been gc'ed. - if batchTimestampBeforeGCError.DataExcludedFromBackup { + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", + span.span, span.attempts+1, header.UserPriority.String()) + var rawResp roachpb.Response + var pErr *roachpb.Error + requestSentAt := timeutil.Now() + exportRequestErr := contextutil.RunWithTimeout(ctx, + fmt.Sprintf("ExportRequest for span %s", span.span), + timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { + rawResp, pErr = kv.SendWrappedWithAdmission( + ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) + if pErr != nil { + return pErr.GoError() + } + return nil + }) + if exportRequestErr != nil { + if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok { + span.lastTried = timeutil.Now() + span.attempts++ + todo <- span + // TODO(dt): send a progress update to update job progress to note + // the intents being hit. + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error()) + span = spanAndTime{} continue } + // TimeoutError improves the opaque `context deadline exceeded` error + // message so use that instead. + if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) { + return errors.Wrap(exportRequestErr, "export request timeout") + } + // BatchTimestampBeforeGCError is returned if the ExportRequest + // attempts to read below the range's GC threshold. + if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok { + // If the range we are exporting is marked to be excluded from + // backup, it is safe to ignore the error. It is likely that the + // table has been configured with a low GC TTL, and so the data + // the backup is targeting has already been gc'ed. + if batchTimestampBeforeGCError.DataExcludedFromBackup { + span = spanAndTime{} + continue + } + } + return errors.Wrapf(exportRequestErr, "exporting %s", span.span) } - return errors.Wrapf(exportRequestErr, "exporting %s", span.span) - } - resp := rawResp.(*roachpb.ExportResponse) + resp := rawResp.(*roachpb.ExportResponse) - // If the reply has a resume span, put the remaining span on - // todo to be picked up again in the next round. - if resp.ResumeSpan != nil { - if !resp.ResumeSpan.Valid() { - return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) - } + // If the reply has a resume span, we process it immediately. + var resumeSpan spanAndTime + if resp.ResumeSpan != nil { + if !resp.ResumeSpan.Valid() { + return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) + } - resumeTS := hlc.Timestamp{} - // Taking resume timestamp from the last file of response since files must - // always be consecutive even if we currently expect only one. - if fileCount := len(resp.Files); fileCount > 0 { - resumeTS = resp.Files[fileCount-1].EndKeyTS - } - resumeSpan := spanAndTime{ - span: *resp.ResumeSpan, - firstKeyTS: resumeTS, - start: span.start, - end: span.end, - attempts: span.attempts, - lastTried: span.lastTried, + resumeTS := hlc.Timestamp{} + // Taking resume timestamp from the last file of response since files must + // always be consecutive even if we currently expect only one. + if fileCount := len(resp.Files); fileCount > 0 { + resumeTS = resp.Files[fileCount-1].EndKeyTS + } + resumeSpan = spanAndTime{ + span: *resp.ResumeSpan, + firstKeyTS: resumeTS, + start: span.start, + end: span.end, + attempts: span.attempts, + lastTried: span.lastTried, + } } - todo <- resumeSpan - } - if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { - if backupKnobs.RunAfterExportingSpanEntry != nil { - backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if backupKnobs.RunAfterExportingSpanEntry != nil { + backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + } } - } - var completedSpans int32 - if resp.ResumeSpan == nil { - completedSpans = 1 - } - - if len(resp.Files) > 1 { - log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") - } - - for i, file := range resp.Files { - entryCounts := countRows(file.Exported, spec.PKIDs) - - ret := exportedSpan{ - // BackupManifest_File just happens to contain the exact fields - // to store the metadata we need, but there's no actual File - // on-disk anywhere yet. - metadata: backuppb.BackupManifest_File{ - Span: file.Span, - Path: file.Path, - EntryCounts: entryCounts, - }, - dataSST: file.SST, - revStart: resp.StartTime, - atKeyBoundary: file.EndKeyTS.IsEmpty()} - if span.start != spec.BackupStartTime { - ret.metadata.StartTime = span.start - ret.metadata.EndTime = span.end - } - // If multiple files were returned for this span, only one -- the - // last -- should count as completing the requested span. - if i == len(resp.Files)-1 { - ret.completedSpans = completedSpans + var completedSpans int32 + if resp.ResumeSpan == nil { + completedSpans = 1 } - select { - case returnedSpansChan <- ret: - case <-ctxDone: - return ctx.Err() + + if len(resp.Files) > 1 { + log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } - } - // Emit the stats for the processed ExportRequest. - recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) + for i, file := range resp.Files { + entryCounts := countRows(file.Exported, spec.PKIDs) + + ret := exportedSpan{ + // BackupManifest_File just happens to contain the exact fields + // to store the metadata we need, but there's no actual File + // on-disk anywhere yet. + metadata: backuppb.BackupManifest_File{ + Span: file.Span, + Path: file.Path, + EntryCounts: entryCounts, + }, + dataSST: file.SST, + revStart: resp.StartTime, + atKeyBoundary: file.EndKeyTS.IsEmpty()} + if span.start != spec.BackupStartTime { + ret.metadata.StartTime = span.start + ret.metadata.EndTime = span.end + } + // If multiple files were returned for this span, only one -- the + // last -- should count as completing the requested span. + if i == len(resp.Files)-1 { + ret.completedSpans = completedSpans + } + select { + case returnedSpansChan <- ret: + case <-ctxDone: + return ctx.Err() + } + } + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) + span = resumeSpan + } default: // No work left to do, so we can exit. Note that another worker could // still be running and may still push new work (a retry) on to todo but