Skip to content

Commit

Permalink
Merge pull request #65802 from pbardea/backport21.1-65797
Browse files Browse the repository at this point in the history
release-21.1: backupccl: fix bug in failing backups
  • Loading branch information
pbardea authored May 31, 2021
2 parents bdffce4 + 2009843 commit 587aaed
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 34 deletions.
30 changes: 15 additions & 15 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func backup(
spans := filterSpans(backupManifest.Spans, completedSpans)
introducedSpans := filterSpans(backupManifest.IntroducedSpans, completedIntroducedSpans)

g := ctxgroup.WithContext(ctx)
pkIDs := make(map[uint64]bool)
for i := range backupManifest.Descriptors {
if t, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); t != nil {
Expand Down Expand Up @@ -228,17 +227,18 @@ func backup(
progressLogger := jobs.NewChunkProgressLogger(job, numTotalSpans, job.FractionCompleted(), jobs.ProgressUpdateOnly)

requestFinishedCh := make(chan struct{}, numTotalSpans) // enough buffer to never block
var jobProgressLoop func(ctx context.Context) error
if numTotalSpans > 0 {
g.GoCtx(func(ctx context.Context) error {
jobProgressLoop = func(ctx context.Context) error {
// Currently the granularity of backup progress is the % of spans
// exported. Would improve accuracy if we tracked the actual size of each
// file.
return progressLogger.Loop(ctx, requestFinishedCh)
})
}
}

progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
g.GoCtx(func(ctx context.Context) error {
checkpointLoop := func(ctx context.Context) error {
// When a processor is done exporting a span, it will send a progress update
// to progCh.
for progress := range progCh {
Expand Down Expand Up @@ -268,20 +268,20 @@ func backup(
}
}
return nil
})
}

if err := distBackup(
ctx,
execCtx,
planCtx,
dsp,
progCh,
backupSpecs,
); err != nil {
return RowCount{}, err
runBackup := func(ctx context.Context) error {
return distBackup(
ctx,
execCtx,
planCtx,
dsp,
progCh,
backupSpecs,
)
}

if err := g.Wait(); err != nil {
if err := ctxgroup.GoAndWait(ctx, jobProgressLoop, checkpointLoop, runBackup); err != nil {
return RowCount{}, errors.Wrapf(err, "exporting %d ranges", errors.Safe(numTotalSpans))
}

Expand Down
35 changes: 16 additions & 19 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,6 @@ func restore(
}
})

g := ctxgroup.WithContext(restoreCtx)

// TODO(pbardea): This not super principled. I just wanted something that
// wasn't a constant and grew slower than linear with the length of
// importSpans. It seems to be working well for BenchmarkRestore2TB but
Expand All @@ -706,15 +704,15 @@ func restore(
}

requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block
g.GoCtx(func(ctx context.Context) error {
jobProgressLoop := func(ctx context.Context) error {
ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log")
defer progressSpan.Finish()
return progressLogger.Loop(ctx, requestFinishedCh)
})
}

progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)

g.GoCtx(func(ctx context.Context) error {
jobCheckpointLoop := func(ctx context.Context) error {
// When a processor is done importing a span, it will send a progress update
// to progCh.
for progress := range progCh {
Expand Down Expand Up @@ -745,23 +743,22 @@ func restore(
requestFinishedCh <- struct{}{}
}
return nil
})
}

// TODO(pbardea): Improve logging in processors.
if err := distRestore(
restoreCtx,
execCtx,
importSpanChunks,
dataToRestore.getPKIDs(),
encryption,
dataToRestore.getRekeys(),
endTime,
progCh,
); err != nil {
return emptyRowCount, err
runRestore := func(ctx context.Context) error {
return distRestore(
ctx,
execCtx,
importSpanChunks,
dataToRestore.getPKIDs(),
encryption,
dataToRestore.getRekeys(),
endTime,
progCh,
)
}

if err := g.Wait(); err != nil {
if err := ctxgroup.GoAndWait(restoreCtx, jobProgressLoop, jobCheckpointLoop, runRestore); err != nil {
// This leaves the data that did get imported in case the user wants to
// retry.
// TODO(dan): Build tooling to allow a user to restart a failed restore.
Expand Down
13 changes: 13 additions & 0 deletions pkg/util/ctxgroup/ctxgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,16 @@ func GroupWorkers(ctx context.Context, num int, f func(context.Context, int) err
}
return group.Wait()
}

// GoAndWait calls the given functions each in a new goroutine. It then Waits
// for them to finish. This is intended to help prevent bugs caused by returning
// early after running some goroutines but before Waiting for them to complete.
func GoAndWait(ctx context.Context, fs ...func(ctx context.Context) error) error {
group := WithContext(ctx)
for _, f := range fs {
if f != nil {
group.GoCtx(f)
}
}
return group.Wait()
}

0 comments on commit 587aaed

Please sign in to comment.