diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index a6af7aa082cc..f856dd936452 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -255,12 +256,19 @@ func (ssp *splitAndScatterProcessor) Start(ctx context.Context) { cancel() <-workerDone } - go func() { - defer close(workerDone) - defer close(ssp.doneScatterCh) - defer cancel() - ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer) - }() + if err := ssp.flowCtx.Stopper().RunAsyncTaskEx(scatterCtx, stop.TaskOpts{ + TaskName: "splitAndScatter-worker", + SpanOpt: stop.ChildSpan, + }, func(ctx context.Context) { + ssp.scatterErr = runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer, ssp.doneScatterCh) + cancel() + close(ssp.doneScatterCh) + close(workerDone) + }); err != nil { + ssp.scatterErr = err + cancel() + close(workerDone) + } } type entryNode struct { @@ -327,11 +335,12 @@ type scatteredChunk struct { entries []execinfrapb.RestoreSpanEntry } -func (ssp *splitAndScatterProcessor) runSplitAndScatter( +func runSplitAndScatter( ctx context.Context, flowCtx *execinfra.FlowCtx, spec *execinfrapb.SplitAndScatterSpec, scatterer splitAndScatterer, + doneScatterCh chan<- entryNode, ) error { g := ctxgroup.WithContext(ctx) @@ -397,7 +406,7 @@ func (ssp *splitAndScatterProcessor) runSplitAndScatter( select { case <-ctx.Done(): return ctx.Err() - case ssp.doneScatterCh <- scatteredEntry: + case doneScatterCh <- scatteredEntry: } } } diff --git a/pkg/ccl/backupccl/split_and_scatter_processor_test.go b/pkg/ccl/backupccl/split_and_scatter_processor_test.go index 1b6de2940383..555a517782e1 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor_test.go @@ -232,6 +232,7 @@ func TestSplitAndScatterProcessor(t *testing.T) { Settings: st, DB: kvDB, Codec: keys.SystemSQLCodec, + Stopper: tc.Stopper(), }, EvalCtx: &evalCtx, DiskMonitor: testDiskMonitor,