Skip to content

Commit

Permalink
backupccl: put a goroutine under the stopper's control
Browse files Browse the repository at this point in the history
The splitAndScatter processor was spawning a naked goroutine. We don't
like that very much - see cockroachdb#58164. This patch puts that goroutine under
the Stopper. One benefit is that the goroutine gets its own span, so
it's resilient to the parent span being Finish()ed from under it (which
was a bug until the prior commit).

Release note: None
  • Loading branch information
andreimatei committed Nov 11, 2021
1 parent eb64b81 commit 2934ff0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
21 changes: 15 additions & 6 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -255,12 +256,18 @@ func (ssp *splitAndScatterProcessor) Start(ctx context.Context) {
cancel()
<-workerDone
}
go func() {
if err := ssp.flowCtx.Stopper().RunAsyncTaskEx(scatterCtx, stop.TaskOpts{
TaskName: "splitAndScatter-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
defer close(workerDone)
defer close(ssp.doneScatterCh)
defer cancel()
ssp.scatterErr = ssp.runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer)
}()
ssp.scatterErr = runSplitAndScatter(scatterCtx, ssp.flowCtx, &ssp.spec, ssp.scatterer, ssp.doneScatterCh)
}); err != nil {
ssp.scatterErr = err
cancel()
close(workerDone)
}
}

type entryNode struct {
Expand Down Expand Up @@ -327,12 +334,14 @@ type scatteredChunk struct {
entries []execinfrapb.RestoreSpanEntry
}

func (ssp *splitAndScatterProcessor) runSplitAndScatter(
func runSplitAndScatter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.SplitAndScatterSpec,
scatterer splitAndScatterer,
doneScatterCh chan<- entryNode,
) error {
defer close(doneScatterCh)
g := ctxgroup.WithContext(ctx)

importSpanChunksCh := make(chan scatteredChunk)
Expand Down Expand Up @@ -397,7 +406,7 @@ func (ssp *splitAndScatterProcessor) runSplitAndScatter(
select {
case <-ctx.Done():
return ctx.Err()
case ssp.doneScatterCh <- scatteredEntry:
case doneScatterCh <- scatteredEntry:
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/split_and_scatter_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func TestSplitAndScatterProcessor(t *testing.T) {
Settings: st,
DB: kvDB,
Codec: keys.SystemSQLCodec,
Stopper: tc.Stopper(),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
Expand Down

0 comments on commit 2934ff0

Please sign in to comment.