From eb64b81a698db17ff29c0c4fc72658b5a32ddcaa Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 10 Nov 2021 13:26:52 -0500 Subject: [PATCH] backupccl: fix a span use-after-Finish This processor was spawning a goroutine whose tracing span is finished externally when the processor is moved to draining. That might have happened while the goroutine was running, which caused the span to be possibly used afterwards. That's not cool. This patch overrides the processor's draining process so that it properly waits for the goroutine to finish before finishing the span. Release note: None --- .../backupccl/split_and_scatter_processor.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index 7f8b73b23b7c..a6af7aa082cc 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -186,8 +186,10 @@ type splitAndScatterProcessor struct { spec execinfrapb.SplitAndScatterSpec output execinfra.RowReceiver - scatterer splitAndScatterer - stopScattering context.CancelFunc + scatterer splitAndScatterer + // cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for + // it to finish. + cancelScatterAndWaitForWorker func() doneScatterCh chan entryNode // A cache for routing datums, so only 1 is allocated per node. @@ -248,8 +250,13 @@ func (ssp *splitAndScatterProcessor) Start(ctx context.Context) { // below from leaking when there are no errors. However, if that loop needs to // exit early, runSplitAndScatter's context will be canceled. scatterCtx, cancel := context.WithCancel(ctx) - ssp.stopScattering = cancel + workerDone := make(chan struct{}) + ssp.cancelScatterAndWaitForWorker = func() { + cancel() + <-workerDone + } go func() { + defer close(workerDone) defer close(ssp.doneScatterCh) defer cancel() ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer) @@ -309,9 +316,8 @@ func (ssp *splitAndScatterProcessor) ConsumerClosed() { // runs into an error and stops consuming scattered entries to make sure we // don't leak goroutines. func (ssp *splitAndScatterProcessor) close() { - if ssp.InternalClose() { - ssp.stopScattering() - } + ssp.cancelScatterAndWaitForWorker() + ssp.InternalClose() } // scatteredChunk is the entries of a chunk of entries to process along with the