Skip to content

Commit

Permalink
backupccl: fix a span use-after-Finish
Browse files Browse the repository at this point in the history
This processor was spawning a goroutine whose tracing span is finished
externally when the processor is moved to draining. That might have
happened while the goroutine was running, which caused the span to be
possibly used afterwards. That's not cool. This patch overrides the
processor's draining process so that it properly waits for the goroutine
to finish before finishing the span.

Release note: None
  • Loading branch information
andreimatei authored and adityamaru committed Jan 31, 2022
1 parent 90e0dec commit 8d6e1c9
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,10 @@ type splitAndScatterProcessor struct {
spec execinfrapb.SplitAndScatterSpec
output execinfra.RowReceiver

scatterer splitAndScatterer
stopScattering context.CancelFunc
scatterer splitAndScatterer
// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
// A cache for routing datums, so only 1 is allocated per node.
Expand Down Expand Up @@ -248,8 +250,13 @@ func (ssp *splitAndScatterProcessor) Start(ctx context.Context) {
// below from leaking when there are no errors. However, if that loop needs to
// exit early, runSplitAndScatter's context will be canceled.
scatterCtx, cancel := context.WithCancel(ctx)
ssp.stopScattering = cancel
workerDone := make(chan struct{})
ssp.cancelScatterAndWaitForWorker = func() {
cancel()
<-workerDone
}
go func() {
defer close(workerDone)
defer close(ssp.doneScatterCh)
defer cancel()
ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer)
Expand Down Expand Up @@ -309,9 +316,8 @@ func (ssp *splitAndScatterProcessor) ConsumerClosed() {
// runs into an error and stops consuming scattered entries to make sure we
// don't leak goroutines.
func (ssp *splitAndScatterProcessor) close() {
if ssp.InternalClose() {
ssp.stopScattering()
}
ssp.cancelScatterAndWaitForWorker()
ssp.InternalClose()
}

// scatteredChunk is the entries of a chunk of entries to process along with the
Expand Down

0 comments on commit 8d6e1c9

Please sign in to comment.