Skip to content

Commit

Permalink
backupccl: put a goroutine under the stopper's control
Browse files Browse the repository at this point in the history
The splitAndScatter processor was spawning a naked goroutine. We don't
like that very much - see cockroachdb#58164. This patch puts that goroutine under
the Stopper. One benefit is that the goroutine gets its own span, so
it's resilient to the parent span being Finish()ed from under it (which
was a bug until the prior commit).

Release note: None
  • Loading branch information
andreimatei authored and adityamaru committed Jan 31, 2022
1 parent 8d6e1c9 commit 7999821
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
25 changes: 17 additions & 8 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -255,12 +256,19 @@ func (ssp *splitAndScatterProcessor) Start(ctx context.Context) {
cancel()
<-workerDone
}
go func() {
defer close(workerDone)
defer close(ssp.doneScatterCh)
defer cancel()
ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer)
}()
if err := ssp.flowCtx.Stopper().RunAsyncTaskEx(scatterCtx, stop.TaskOpts{
TaskName: "splitAndScatter-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
ssp.scatterErr = runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer, ssp.doneScatterCh)
cancel()
close(ssp.doneScatterCh)
close(workerDone)
}); err != nil {
ssp.scatterErr = err
cancel()
close(workerDone)
}
}

type entryNode struct {
Expand Down Expand Up @@ -327,11 +335,12 @@ type scatteredChunk struct {
entries []execinfrapb.RestoreSpanEntry
}

func (ssp *splitAndScatterProcessor) runSplitAndScatter(
func runSplitAndScatter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.SplitAndScatterSpec,
scatterer splitAndScatterer,
doneScatterCh chan<- entryNode,
) error {
g := ctxgroup.WithContext(ctx)

Expand Down Expand Up @@ -397,7 +406,7 @@ func (ssp *splitAndScatterProcessor) runSplitAndScatter(
select {
case <-ctx.Done():
return ctx.Err()
case ssp.doneScatterCh <- scatteredEntry:
case doneScatterCh <- scatteredEntry:
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/split_and_scatter_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func TestSplitAndScatterProcessor(t *testing.T) {
Settings: st,
DB: kvDB,
Codec: keys.SystemSQLCodec,
Stopper: tc.Stopper(),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
Expand Down

0 comments on commit 7999821

Please sign in to comment.