From 100b2b9394e084198343de5c068b00304689bc39 Mon Sep 17 00:00:00 2001 From: Rui Hu Date: Tue, 31 Jan 2023 17:01:22 -0500 Subject: [PATCH] backupccl: fix key rewriter race in generative split and scatter processor The generative split and scatter processor is currently causing tests to fail under race because there are many goroutines that are operating with the same splitAndScatterer, which cannot be used concurrently as the underlying key rewriter cannot be used concurrently. Modify the processor so that every worker that uses the splitAndScatterer now uses its own instance. Fixes: #95808 Release note: None --- .../generative_split_and_scatter_processor.go | 112 +++++++++++++----- pkg/ccl/backupccl/restore_job.go | 22 ++-- 2 files changed, 98 insertions(+), 36 deletions(-) diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index bffa2482329e..c28c40850831 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -51,7 +51,17 @@ type generativeSplitAndScatterProcessor struct { spec execinfrapb.GenerativeSplitAndScatterSpec output execinfra.RowReceiver - scatterer splitAndScatterer + // chunkSplitAndScatterers contain the splitAndScatterers for the group of + // split and scatter workers that's responsible for splitting and scattering + // the import span chunks. Each worker needs its own scatterer as one cannot + // be used concurrently. + chunkSplitAndScatterers []splitAndScatterer + // chunkEntrySplitAndScatterers contain the splitAndScatterers for the group of + // split workers that's responsible for making splits at each import span + // entry. These scatterers only create splits for the start key of each import + // span and do not perform any scatters. + chunkEntrySplitAndScatterers []splitAndScatterer + // cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for // it to finish. cancelScatterAndWaitForWorker func() @@ -72,24 +82,51 @@ func newGenerativeSplitAndScatterProcessor( post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, ) (execinfra.Processor, error) { - db := flowCtx.Cfg.DB - kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys, - false /* restoreTenantFromStream */) - if err != nil { - return nil, err + numChunkSplitAndScatterWorkers := int(spec.NumNodes) + // numEntrySplitWorkers is set to be 2 * numChunkSplitAndScatterWorkers in + // order to keep up with the rate at which chunks are split and scattered. + // TODO(rui): This tries to cover for a bad scatter by having 2 * the number + // of nodes in the cluster. Does this knob need to be re-tuned? + numEntrySplitWorkers := 2 * numChunkSplitAndScatterWorkers + + mkSplitAndScatterer := func() (splitAndScatterer, error) { + if spec.ValidateOnly { + nodeID, _ := flowCtx.NodeID.OptionalNodeID() + return noopSplitAndScatterer{nodeID}, nil + } + kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys, + false /* restoreTenantFromStream */) + if err != nil { + return nil, err + } + return makeSplitAndScatterer(db.KV(), kr), nil } - scatterer := makeSplitAndScatterer(db.KV(), kr) - if spec.ValidateOnly { - nodeID, _ := flowCtx.NodeID.OptionalNodeID() - scatterer = noopSplitAndScatterer{nodeID} + var chunkSplitAndScatterers []splitAndScatterer + for i := 0; i < numChunkSplitAndScatterWorkers; i++ { + scatterer, err := mkSplitAndScatterer() + if err != nil { + return nil, err + } + chunkSplitAndScatterers = append(chunkSplitAndScatterers, scatterer) } + + var chunkEntrySplitAndScatterers []splitAndScatterer + for i := 0; i < numEntrySplitWorkers; i++ { + scatterer, err := mkSplitAndScatterer() + if err != nil { + return nil, err + } + chunkEntrySplitAndScatterers = append(chunkEntrySplitAndScatterers, scatterer) + } + ssp := &generativeSplitAndScatterProcessor{ - flowCtx: flowCtx, - spec: spec, - output: output, - scatterer: scatterer, + flowCtx: flowCtx, + spec: spec, + output: output, + chunkSplitAndScatterers: chunkSplitAndScatterers, + chunkEntrySplitAndScatterers: chunkEntrySplitAndScatterers, // Large enough so that it never blocks. doneScatterCh: make(chan entryNode, spec.NumEntries), routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum), @@ -124,7 +161,7 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) { TaskName: "generativeSplitAndScatter-worker", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { - gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.scatterer, gssp.doneScatterCh) + gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh) cancel() close(gssp.doneScatterCh) close(workerDone) @@ -223,15 +260,19 @@ func runGenerativeSplitAndScatter( ctx context.Context, flowCtx *execinfra.FlowCtx, spec *execinfrapb.GenerativeSplitAndScatterSpec, - scatterer splitAndScatterer, + chunkSplitAndScatterers []splitAndScatterer, + chunkEntrySplitAndScatterers []splitAndScatterer, doneScatterCh chan<- entryNode, ) error { log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes", spec.NumEntries, spec.ChunkSize, spec.NumNodes) g := ctxgroup.WithContext(ctx) - splitWorkers := int(spec.NumNodes) - restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, splitWorkers*int(spec.ChunkSize)) + chunkSplitAndScatterWorkers := len(chunkSplitAndScatterers) + restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, chunkSplitAndScatterWorkers*int(spec.ChunkSize)) + + // This goroutine generates import spans one at a time and sends them to + // restoreSpanEntriesCh. g.GoCtx(func(ctx context.Context) error { defer close(restoreSpanEntriesCh) @@ -265,7 +306,11 @@ func runGenerativeSplitAndScatter( ) }) - restoreEntryChunksCh := make(chan restoreEntryChunk, splitWorkers) + restoreEntryChunksCh := make(chan restoreEntryChunk, chunkSplitAndScatterWorkers) + + // This goroutine takes the import spans off of restoreSpanEntriesCh and + // groups them into chunks of spec.ChunkSize. These chunks are then sent to + // restoreEntryChunksCh. g.GoCtx(func(ctx context.Context) error { defer close(restoreEntryChunksCh) @@ -288,9 +333,15 @@ func runGenerativeSplitAndScatter( return nil }) - importSpanChunksCh := make(chan scatteredChunk, splitWorkers*2) + importSpanChunksCh := make(chan scatteredChunk, chunkSplitAndScatterWorkers*2) + + // This group of goroutines processes the chunks from restoreEntryChunksCh. + // For each chunk, a split is created at the start key of the next chunk. The + // current chunk is then scattered, and the chunk with its destination is + // passed to importSpanChunksCh. g2 := ctxgroup.WithContext(ctx) - for worker := 0; worker < splitWorkers; worker++ { + for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ { + worker := worker g2.GoCtx(func(ctx context.Context) error { // Chunks' leaseholders should be randomly placed throughout the // cluster. @@ -299,11 +350,11 @@ func runGenerativeSplitAndScatter( if !importSpanChunk.splitKey.Equal(roachpb.Key{}) { // Split at the start of the next chunk, to partition off a // prefix of the space to scatter. - if err := scatterer.split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil { + if err := chunkSplitAndScatterers[worker].split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil { return err } } - chunkDestination, err := scatterer.scatter(ctx, flowCtx.Codec(), scatterKey) + chunkDestination, err := chunkSplitAndScatterers[worker].scatter(ctx, flowCtx.Codec(), scatterKey) if err != nil { return err } @@ -335,15 +386,20 @@ func runGenerativeSplitAndScatter( }) } + // This goroutine waits for the chunkSplitAndScatter workers to finish so that + // it can close importSpanChunksCh. g.GoCtx(func(ctx context.Context) error { defer close(importSpanChunksCh) return g2.Wait() }) - // TODO(pbardea): This tries to cover for a bad scatter by having 2 * the - // number of nodes in the cluster. Is it necessary? - splitScatterWorkers := 2 * splitWorkers - for worker := 0; worker < splitScatterWorkers; worker++ { + // This group of goroutines takes chunks that have already been split and + // scattered by the previous worker group. These workers create splits at the + // start key of the span of every entry of every chunk. After a chunk has been + // processed, it is passed to doneScatterCh to signal that the chunk has gone + // through the entire split and scatter process. + for worker := 0; worker < len(chunkEntrySplitAndScatterers); worker++ { + worker := worker g.GoCtx(func(ctx context.Context) error { for importSpanChunk := range importSpanChunksCh { chunkDestination := importSpanChunk.destination @@ -355,7 +411,7 @@ func runGenerativeSplitAndScatter( if nextChunkIdx < len(importSpanChunk.entries) { // Split at the next entry. splitKey = importSpanChunk.entries[nextChunkIdx].Span.Key - if err := scatterer.split(ctx, flowCtx.Codec(), splitKey); err != nil { + if err := chunkEntrySplitAndScatterers[worker].split(ctx, flowCtx.Codec(), splitKey); err != nil { return err } } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 2511d02eaf1e..9188fe697186 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -314,9 +314,9 @@ func restore( } targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV) - importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) - genSpan := func(ctx context.Context) error { - defer close(importSpanCh) + countSpansCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error { + defer close(spanCh) return generateAndSendImportSpans( restoreCtx, dataToRestore.getSpans(), @@ -326,7 +326,7 @@ func restore( introducedSpanFrontier, highWaterMark, targetSize, - importSpanCh, + spanCh, simpleImportSpans, ) } @@ -336,17 +336,20 @@ func restore( var countTasks []func(ctx context.Context) error log.Infof(restoreCtx, "rh_debug: starting count task") spanCountTask := func(ctx context.Context) error { - for range importSpanCh { + for range countSpansCh { numImportSpans++ } return nil } - countTasks = append(countTasks, genSpan, spanCountTask) + countTasks = append(countTasks, spanCountTask) + countTasks = append(countTasks, func(ctx context.Context) error { + return genSpan(ctx, countSpansCh) + }) if err := ctxgroup.GoAndWait(restoreCtx, countTasks...); err != nil { return emptyRowCount, errors.Wrapf(err, "counting number of import spans") } - importSpanCh = make(chan execinfrapb.RestoreSpanEntry, 1000) + importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block // tasks are the concurrent tasks that are run during the restore. var tasks []func(ctx context.Context) error @@ -427,7 +430,10 @@ func restore( } return nil } - tasks = append(tasks, generativeCheckpointLoop, genSpan) + tasks = append(tasks, generativeCheckpointLoop) + tasks = append(tasks, func(ctx context.Context) error { + return genSpan(ctx, importSpanCh) + }) runRestore := func(ctx context.Context) error { return distRestore(