Skip to content

Commit

Permalink
backupccl: fix data race
Browse files Browse the repository at this point in the history
A reader and writer were racing on the stopScattering member.

Release note: None
  • Loading branch information
andreimatei authored and adityamaru committed Jan 31, 2022
1 parent dea26cb commit 90e0dec
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ func newSplitAndScatterProcessor(
// Start is part of the RowSource interface.
func (ssp *splitAndScatterProcessor) Start(ctx context.Context) {
ctx = ssp.StartInternal(ctx, splitAndScatterProcessorName)
// Note that the loop over doneScatterCh in Next should prevent the goroutine
// below from leaking when there are no errors. However, if that loop needs to
// exit early, runSplitAndScatter's context will be canceled.
scatterCtx, cancel := context.WithCancel(ctx)
ssp.stopScattering = cancel
go func() {
// Note that the loop over doneScatterCh in Next should prevent this
// goroutine from leaking when there are no errors. However, if that loop
// needs to exit early, runSplitAndScatter's context will be canceled.
scatterCtx, stopScattering := context.WithCancel(ctx)
ssp.stopScattering = stopScattering

defer close(ssp.doneScatterCh)
defer cancel()
ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer)
}()
}
Expand Down Expand Up @@ -310,9 +310,7 @@ func (ssp *splitAndScatterProcessor) ConsumerClosed() {
// don't leak goroutines.
func (ssp *splitAndScatterProcessor) close() {
if ssp.InternalClose() {
if ssp.stopScattering != nil {
ssp.stopScattering()
}
ssp.stopScattering()
}
}

Expand Down

0 comments on commit 90e0dec

Please sign in to comment.