diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 60c82cd2e0b2..387d97e952da 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -110,6 +110,7 @@ go_library( "//pkg/util/metric", "//pkg/util/protoutil", "//pkg/util/retry", + "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index fb19fcd848d4..706668139313 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -158,11 +159,19 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { for range bp.progCh { } } - go func() { - defer cancel() - defer close(bp.progCh) + if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{ + TaskName: "backup-worker", + SpanOpt: stop.ChildSpan, + }, func(ctx context.Context) { bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh) - }() + cancel() + close(bp.progCh) + }); err != nil { + // The closure above hasn't run, so we have to do the cleanup. + bp.backupErr = err + cancel() + close(bp.progCh) + } } // Next is part of the RowSource interface.