From 5333cddc360a7e5e3591487af4e30c52e9c6f2a5 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 10 Nov 2021 13:19:42 -0500 Subject: [PATCH] backupccl: fix data race A reader and writer were racing on the stopScattering member. Release note: None --- pkg/ccl/backupccl/split_and_scatter_processor.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index 3143c6ecdb8f..7f8b73b23b7c 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -244,14 +244,14 @@ func newSplitAndScatterProcessor( // Start is part of the RowSource interface. func (ssp *splitAndScatterProcessor) Start(ctx context.Context) { ctx = ssp.StartInternal(ctx, splitAndScatterProcessorName) + // Note that the loop over doneScatterCh in Next should prevent the goroutine + // below from leaking when there are no errors. However, if that loop needs to + // exit early, runSplitAndScatter's context will be canceled. + scatterCtx, cancel := context.WithCancel(ctx) + ssp.stopScattering = cancel go func() { - // Note that the loop over doneScatterCh in Next should prevent this - // goroutine from leaking when there are no errors. However, if that loop - // needs to exit early, runSplitAndScatter's context will be canceled. - scatterCtx, stopScattering := context.WithCancel(ctx) - ssp.stopScattering = stopScattering - defer close(ssp.doneScatterCh) + defer cancel() ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer) }() } @@ -310,9 +310,7 @@ func (ssp *splitAndScatterProcessor) ConsumerClosed() { // don't leak goroutines. func (ssp *splitAndScatterProcessor) close() { if ssp.InternalClose() { - if ssp.stopScattering != nil { - ssp.stopScattering() - } + ssp.stopScattering() } }