backupccl: immediately process resume spans during backup
Previously, we put resume spans returned from an export request back
on the queue for processing. In a large cluster with a lot of work to
do, this might result in the resume span being processed much later.
This isn't great because (1) we don't get to take advantage of the disk
and block caches and (2) the resume span has a smaller chance of ending
up in the same SST as the original span.

Release note: None
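
To make the before/after concrete, here is a minimal sketch of the pattern change, using invented names (span, export) rather than the real backupccl types: instead of pushing a partially exported span back onto the work queue, the worker now keeps looping on that span until nothing remains.

package main

import "fmt"

// span is a stand-in for the backup processor's spanAndTime; the names and
// fields here are illustrative only and do not match the real backupccl types.
type span struct {
	start, end int
}

// export pretends to export at most `limit` units of work from s and returns
// the remainder as a resume span (the zero span when nothing remains).
func export(s span, limit int) (done, resume span) {
	if s.end-s.start <= limit {
		return s, span{}
	}
	return span{s.start, s.start + limit}, span{s.start + limit, s.end}
}

func main() {
	todo := []span{{0, 10}, {20, 25}}

	for len(todo) > 0 {
		s := todo[0]
		todo = todo[1:]
		// New pattern (this commit): keep working on the resume span right
		// away instead of appending it back to `todo` for a later pass.
		for s.end > s.start {
			done, resume := export(s, 3)
			fmt.Printf("exported %v, resume %v\n", done, resume)
			s = resume
		}
	}
}

In the real processor the inner loop runs while len(span.span.Key) != 0, and work arrives on the `todo` channel rather than a slice, as the diff below shows.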
stevendanna committed Aug 18, 2022
1 parent 04c6a1a commit 4f0af41
Showing 1 changed file with 175 additions and 170 deletions.
345 changes: 175 additions & 170 deletions pkg/ccl/backupccl/backup_processor.go
@@ -306,9 +306,11 @@ func runBackupProcessor(

grp := ctxgroup.WithContext(ctx)
// Start a goroutine that will then start a group of goroutines which each
// pull spans off of `todo` and send export requests. Any resume spans are put
// back on `todo`. Any returned SSTs are put on a `returnedSpansChan` to be routed
// to a buffered sink that merges them until they are large enough to flush.
// pull spans off of `todo` and send export requests. Any spans that encounter
// write intent errors during Export are put back on the todo queue for later
// processing. Any returned SSTs are put on a `returnedSpansChan` to be
// routed to a buffered sink that merges them until they are large enough to
// flush.
grp.GoCtx(func(ctx context.Context) error {
defer close(returnedSpansChan)
// TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or
@@ -330,194 +332,197 @@ func runBackupProcessor(
case <-ctxDone:
return ctx.Err()
case span := <-todo:
header := roachpb.Header{Timestamp: span.end}

splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV)
// If we started splitting already, we must continue until we reach the end
// of split span.
if !span.firstKeyTS.IsEmpty() {
splitMidKey = true
}

req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span.span),
ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
EnableTimeBoundIteratorOptimization: true, // NB: Must set for 22.1 compatibility.
MVCCFilter: spec.MVCCFilter,
TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
ReturnSST: true,
SplitMidKey: splitMidKey,
}
for len(span.span.Key) != 0 {
header := roachpb.Header{Timestamp: span.end}

splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV)
// If we started splitting already, we must continue until we reach the end
// of split span.
if !span.firstKeyTS.IsEmpty() {
splitMidKey = true
}

// If we're doing re-attempts but are not yet in the priority regime,
// check to see if it is time to switch to priority.
if !priority && span.attempts > 0 {
// Check if this is starting a new pass and we should delay first.
// We're okay with delaying this worker until then since we assume any
// other work it could pull off the queue will likely want to delay to
// a similar or later time anyway.
if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 {
timer.Reset(delay)
log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1)
select {
case <-ctxDone:
return ctx.Err()
case <-timer.C:
timer.Read = true
}
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span.span),
ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
EnableTimeBoundIteratorOptimization: true, // NB: Must set for 22.1 compatibility.
MVCCFilter: spec.MVCCFilter,
TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
ReturnSST: true,
SplitMidKey: splitMidKey,
}

priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV)
}
// If we're doing re-attempts but are not yet in the priority regime,
// check to see if it is time to switch to priority.
if !priority && span.attempts > 0 {
// Check if this is starting a new pass and we should delay first.
// We're okay with delaying this worker until then since we assume any
// other work it could pull off the queue will likely want to delay to
// a similar or later time anyway.
if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 {
timer.Reset(delay)
log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1)
select {
case <-ctxDone:
return ctx.Err()
case <-timer.C:
timer.Read = true
}
}

if priority {
// This re-attempt is reading far enough in the past that we just want
// to abort any transactions it hits.
header.UserPriority = roachpb.MaxUserPriority
} else {
// On the initial attempt to export this span and re-attempts that are
// done while it is still less than the configured time above the read
// time, we set WaitPolicy to Error, so that the export will return an
// error to us instead of doing a blocking wait if it hits any
// other txns. This lets us move on to other ranges we have to export,
// provide an indication of why we're blocked, etc instead and come
// back to this range later.
header.WaitPolicy = lock.WaitPolicy_Error
}
priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV)
}

// We set the DistSender response target bytes field to a sentinel
// value. The sentinel value of 1 forces the ExportRequest to paginate
// after creating a single SST.
header.TargetBytes = 1
admissionHeader := roachpb.AdmissionHeader{
// Export requests are currently assigned NormalPri.
//
// TODO(dt): Consider linking this to/from the UserPriority field.
Priority: int32(admissionpb.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawResp roachpb.Response
var pErr *roachpb.Error
requestSentAt := timeutil.Now()
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest for span %s", span.span),
timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
rawResp, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
if pErr != nil {
return pErr.GoError()
}
return nil
})
if exportRequestErr != nil {
if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
span.lastTried = timeutil.Now()
span.attempts++
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error())
continue
if priority {
// This re-attempt is reading far enough in the past that we just want
// to abort any transactions it hits.
header.UserPriority = roachpb.MaxUserPriority
} else {
// On the initial attempt to export this span and re-attempts that are
// done while it is still less than the configured time above the read
// time, we set WaitPolicy to Error, so that the export will return an
// error to us instead of doing a blocking wait if it hits any
// other txns. This lets us move on to other ranges we have to export,
// provide an indication of why we're blocked, etc instead and come
// back to this range later.
header.WaitPolicy = lock.WaitPolicy_Error
}
// TimeoutError improves the opaque `context deadline exceeded` error
// message so use that instead.
if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) {
return errors.Wrap(exportRequestErr, "export request timeout")

// We set the DistSender response target bytes field to a sentinel
// value. The sentinel value of 1 forces the ExportRequest to paginate
// after creating a single SST.
header.TargetBytes = 1
admissionHeader := roachpb.AdmissionHeader{
// Export requests are currently assigned NormalPri.
//
// TODO(dt): Consider linking this to/from the UserPriority field.
Priority: int32(admissionpb.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
// BatchTimestampBeforeGCError is returned if the ExportRequest
// attempts to read below the range's GC threshold.
if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok {
// If the range we are exporting is marked to be excluded from
// backup, it is safe to ignore the error. It is likely that the
// table has been configured with a low GC TTL, and so the data
// the backup is targeting has already been gc'ed.
if batchTimestampBeforeGCError.DataExcludedFromBackup {
log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawResp roachpb.Response
var pErr *roachpb.Error
requestSentAt := timeutil.Now()
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest for span %s", span.span),
timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
rawResp, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
if pErr != nil {
return pErr.GoError()
}
return nil
})
if exportRequestErr != nil {
if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
span.lastTried = timeutil.Now()
span.attempts++
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error())
span = spanAndTime{}
continue
}
// TimeoutError improves the opaque `context deadline exceeded` error
// message so use that instead.
if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) {
return errors.Wrap(exportRequestErr, "export request timeout")
}
// BatchTimestampBeforeGCError is returned if the ExportRequest
// attempts to read below the range's GC threshold.
if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok {
// If the range we are exporting is marked to be excluded from
// backup, it is safe to ignore the error. It is likely that the
// table has been configured with a low GC TTL, and so the data
// the backup is targeting has already been gc'ed.
if batchTimestampBeforeGCError.DataExcludedFromBackup {
span = spanAndTime{}
continue
}
}
return errors.Wrapf(exportRequestErr, "exporting %s", span.span)
}
return errors.Wrapf(exportRequestErr, "exporting %s", span.span)
}

resp := rawResp.(*roachpb.ExportResponse)
resp := rawResp.(*roachpb.ExportResponse)

// If the reply has a resume span, put the remaining span on
// todo to be picked up again in the next round.
if resp.ResumeSpan != nil {
if !resp.ResumeSpan.Valid() {
return errors.Errorf("invalid resume span: %s", resp.ResumeSpan)
}
// If the reply has a resume span, we process it immediately.
var resumeSpan spanAndTime
if resp.ResumeSpan != nil {
if !resp.ResumeSpan.Valid() {
return errors.Errorf("invalid resume span: %s", resp.ResumeSpan)
}

resumeTS := hlc.Timestamp{}
// Taking resume timestamp from the last file of response since files must
// always be consecutive even if we currently expect only one.
if fileCount := len(resp.Files); fileCount > 0 {
resumeTS = resp.Files[fileCount-1].EndKeyTS
}
resumeSpan := spanAndTime{
span: *resp.ResumeSpan,
firstKeyTS: resumeTS,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
resumeTS := hlc.Timestamp{}
// Taking resume timestamp from the last file of response since files must
// always be consecutive even if we currently expect only one.
if fileCount := len(resp.Files); fileCount > 0 {
resumeTS = resp.Files[fileCount-1].EndKeyTS
}
resumeSpan = spanAndTime{
span: *resp.ResumeSpan,
firstKeyTS: resumeTS,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
}
}
todo <- resumeSpan
}

if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if backupKnobs.RunAfterExportingSpanEntry != nil {
backupKnobs.RunAfterExportingSpanEntry(ctx, resp)
if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if backupKnobs.RunAfterExportingSpanEntry != nil {
backupKnobs.RunAfterExportingSpanEntry(ctx, resp)
}
}
}

var completedSpans int32
if resp.ResumeSpan == nil {
completedSpans = 1
}

if len(resp.Files) > 1 {
log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1")
}

for i, file := range resp.Files {
entryCounts := countRows(file.Exported, spec.PKIDs)

ret := exportedSpan{
// BackupManifest_File just happens to contain the exact fields
// to store the metadata we need, but there's no actual File
// on-disk anywhere yet.
metadata: backuppb.BackupManifest_File{
Span: file.Span,
Path: file.Path,
EntryCounts: entryCounts,
},
dataSST: file.SST,
revStart: resp.StartTime,
atKeyBoundary: file.EndKeyTS.IsEmpty()}
if span.start != spec.BackupStartTime {
ret.metadata.StartTime = span.start
ret.metadata.EndTime = span.end
}
// If multiple files were returned for this span, only one -- the
// last -- should count as completing the requested span.
if i == len(resp.Files)-1 {
ret.completedSpans = completedSpans
var completedSpans int32
if resp.ResumeSpan == nil {
completedSpans = 1
}
select {
case returnedSpansChan <- ret:
case <-ctxDone:
return ctx.Err()

if len(resp.Files) > 1 {
log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1")
}
}

// Emit the stats for the processed ExportRequest.
recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt))
for i, file := range resp.Files {
entryCounts := countRows(file.Exported, spec.PKIDs)

ret := exportedSpan{
// BackupManifest_File just happens to contain the exact fields
// to store the metadata we need, but there's no actual File
// on-disk anywhere yet.
metadata: backuppb.BackupManifest_File{
Span: file.Span,
Path: file.Path,
EntryCounts: entryCounts,
},
dataSST: file.SST,
revStart: resp.StartTime,
atKeyBoundary: file.EndKeyTS.IsEmpty()}
if span.start != spec.BackupStartTime {
ret.metadata.StartTime = span.start
ret.metadata.EndTime = span.end
}
// If multiple files were returned for this span, only one -- the
// last -- should count as completing the requested span.
if i == len(resp.Files)-1 {
ret.completedSpans = completedSpans
}
select {
case returnedSpansChan <- ret:
case <-ctxDone:
return ctx.Err()
}
}

// Emit the stats for the processed ExportRequest.
recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt))
span = resumeSpan
}
default:
// No work left to do, so we can exit. Note that another worker could
// still be running and may still push new work (a retry) on to todo but
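
For readers skimming the diff above, here is a hedged, self-contained sketch of the reshaped worker loop: resume spans are consumed by an inner loop on the very next iteration, while a retryable write-intent error puts the span back on the queue and clears the local copy so the inner loop exits (mirroring the diff's `span = spanAndTime{}` followed by `continue`). The work, exportOnce, and errIntent names are invented for illustration; the real code operates on spanAndTime, the `todo` channel, and roachpb.ExportRequest.

package main

import (
	"errors"
	"fmt"
)

// work stands in for the processor's spanAndTime; the fields, the errIntent
// sentinel, and exportOnce are all hypothetical, not real backupccl APIs.
type work struct {
	key      string // empty key means nothing left to export
	attempts int
}

var errIntent = errors.New("write intent encountered")

// exportOnce fakes one ExportRequest: it may hit an intent on the first try,
// or hand back a resume key when the span was only partially exported.
func exportOnce(w work) (resumeKey string, err error) {
	switch {
	case w.key == "contended" && w.attempts == 0:
		return "", errIntent
	case w.key == "big":
		return "big-rest", nil // partial export; resume immediately
	default:
		return "", nil
	}
}

func worker(todo chan work) {
	for {
		select {
		case w := <-todo:
			// Inner loop mirrors the commit's `for len(span.span.Key) != 0`:
			// resume spans are handled on the next iteration, not re-queued.
			for w.key != "" {
				resumeKey, err := exportOnce(w)
				if errors.Is(err, errIntent) {
					// Retryable error: push the whole span back on todo for a
					// later pass and clear the local copy so the loop exits.
					w.attempts++
					todo <- w
					w = work{}
					continue
				}
				fmt.Println("exported chunk of", w.key)
				w = work{key: resumeKey, attempts: w.attempts}
			}
		default:
			return // nothing queued; the real worker also watches ctx.Done()
		}
	}
}

func main() {
	todo := make(chan work, 8)
	todo <- work{key: "big"}
	todo <- work{key: "contended"}
	worker(todo)
}

This split keeps the retry semantics for contended spans (they go back through `todo` and pick up the delay and priority-escalation logic) while clean resume spans stay hot in the same worker, which is what makes the disk/block-cache and same-SST arguments in the commit message pay off.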
