Skip to content

Commit

Permalink
backupccl: fix key rewriter race in generative split and scatter proc…
Browse files Browse the repository at this point in the history
…essor

The generative split and scatter processor is currently causing tests to fail
under race because many goroutines operate on the same splitAndScatterer,
which is not safe for concurrent use since its underlying key rewriter is not
safe for concurrent use. Modify the processor so that every worker that needs
a splitAndScatterer now uses its own instance.

Fixes: #95808

Release note: None
  • Loading branch information
Rui Hu committed Feb 1, 2023
1 parent d6a32ed commit 100b2b9
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 36 deletions.
112 changes: 84 additions & 28 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@ type generativeSplitAndScatterProcessor struct {
spec execinfrapb.GenerativeSplitAndScatterSpec
output execinfra.RowReceiver

scatterer splitAndScatterer
// chunkSplitAndScatterers contain the splitAndScatterers for the group of
// split and scatter workers that's responsible for splitting and scattering
// the import span chunks. Each worker needs its own scatterer as one cannot
// be used concurrently.
chunkSplitAndScatterers []splitAndScatterer
// chunkEntrySplitAndScatterers contain the splitAndScatterers for the group of
// split workers that's responsible for making splits at each import span
// entry. These scatterers only create splits for the start key of each import
// span and do not perform any scatters.
chunkEntrySplitAndScatterers []splitAndScatterer

// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
cancelScatterAndWaitForWorker func()
Expand All @@ -72,24 +82,51 @@ func newGenerativeSplitAndScatterProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {

db := flowCtx.Cfg.DB
kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys,
false /* restoreTenantFromStream */)
if err != nil {
return nil, err
numChunkSplitAndScatterWorkers := int(spec.NumNodes)
// numEntrySplitWorkers is set to be 2 * numChunkSplitAndScatterWorkers in
// order to keep up with the rate at which chunks are split and scattered.
// TODO(rui): This tries to cover for a bad scatter by having 2 * the number
// of nodes in the cluster. Does this knob need to be re-tuned?
numEntrySplitWorkers := 2 * numChunkSplitAndScatterWorkers

mkSplitAndScatterer := func() (splitAndScatterer, error) {
if spec.ValidateOnly {
nodeID, _ := flowCtx.NodeID.OptionalNodeID()
return noopSplitAndScatterer{nodeID}, nil
}
kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys,
false /* restoreTenantFromStream */)
if err != nil {
return nil, err
}
return makeSplitAndScatterer(db.KV(), kr), nil
}

scatterer := makeSplitAndScatterer(db.KV(), kr)
if spec.ValidateOnly {
nodeID, _ := flowCtx.NodeID.OptionalNodeID()
scatterer = noopSplitAndScatterer{nodeID}
var chunkSplitAndScatterers []splitAndScatterer
for i := 0; i < numChunkSplitAndScatterWorkers; i++ {
scatterer, err := mkSplitAndScatterer()
if err != nil {
return nil, err
}
chunkSplitAndScatterers = append(chunkSplitAndScatterers, scatterer)
}

var chunkEntrySplitAndScatterers []splitAndScatterer
for i := 0; i < numEntrySplitWorkers; i++ {
scatterer, err := mkSplitAndScatterer()
if err != nil {
return nil, err
}
chunkEntrySplitAndScatterers = append(chunkEntrySplitAndScatterers, scatterer)
}

ssp := &generativeSplitAndScatterProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
scatterer: scatterer,
flowCtx: flowCtx,
spec: spec,
output: output,
chunkSplitAndScatterers: chunkSplitAndScatterers,
chunkEntrySplitAndScatterers: chunkEntrySplitAndScatterers,
// Large enough so that it never blocks.
doneScatterCh: make(chan entryNode, spec.NumEntries),
routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
Expand Down Expand Up @@ -124,7 +161,7 @@ func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) {
TaskName: "generativeSplitAndScatter-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.scatterer, gssp.doneScatterCh)
gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.chunkSplitAndScatterers, gssp.chunkEntrySplitAndScatterers, gssp.doneScatterCh)
cancel()
close(gssp.doneScatterCh)
close(workerDone)
Expand Down Expand Up @@ -223,15 +260,19 @@ func runGenerativeSplitAndScatter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.GenerativeSplitAndScatterSpec,
scatterer splitAndScatterer,
chunkSplitAndScatterers []splitAndScatterer,
chunkEntrySplitAndScatterers []splitAndScatterer,
doneScatterCh chan<- entryNode,
) error {
log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes",
spec.NumEntries, spec.ChunkSize, spec.NumNodes)
g := ctxgroup.WithContext(ctx)

splitWorkers := int(spec.NumNodes)
restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, splitWorkers*int(spec.ChunkSize))
chunkSplitAndScatterWorkers := len(chunkSplitAndScatterers)
restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, chunkSplitAndScatterWorkers*int(spec.ChunkSize))

// This goroutine generates import spans one at a time and sends them to
// restoreSpanEntriesCh.
g.GoCtx(func(ctx context.Context) error {
defer close(restoreSpanEntriesCh)

Expand Down Expand Up @@ -265,7 +306,11 @@ func runGenerativeSplitAndScatter(
)
})

restoreEntryChunksCh := make(chan restoreEntryChunk, splitWorkers)
restoreEntryChunksCh := make(chan restoreEntryChunk, chunkSplitAndScatterWorkers)

// This goroutine takes the import spans off of restoreSpanEntriesCh and
// groups them into chunks of spec.ChunkSize. These chunks are then sent to
// restoreEntryChunksCh.
g.GoCtx(func(ctx context.Context) error {
defer close(restoreEntryChunksCh)

Expand All @@ -288,9 +333,15 @@ func runGenerativeSplitAndScatter(
return nil
})

importSpanChunksCh := make(chan scatteredChunk, splitWorkers*2)
importSpanChunksCh := make(chan scatteredChunk, chunkSplitAndScatterWorkers*2)

// This group of goroutines processes the chunks from restoreEntryChunksCh.
// For each chunk, a split is created at the start key of the next chunk. The
// current chunk is then scattered, and the chunk with its destination is
// passed to importSpanChunksCh.
g2 := ctxgroup.WithContext(ctx)
for worker := 0; worker < splitWorkers; worker++ {
for worker := 0; worker < chunkSplitAndScatterWorkers; worker++ {
worker := worker
g2.GoCtx(func(ctx context.Context) error {
// Chunks' leaseholders should be randomly placed throughout the
// cluster.
Expand All @@ -299,11 +350,11 @@ func runGenerativeSplitAndScatter(
if !importSpanChunk.splitKey.Equal(roachpb.Key{}) {
// Split at the start of the next chunk, to partition off a
// prefix of the space to scatter.
if err := scatterer.split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
if err := chunkSplitAndScatterers[worker].split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil {
return err
}
}
chunkDestination, err := scatterer.scatter(ctx, flowCtx.Codec(), scatterKey)
chunkDestination, err := chunkSplitAndScatterers[worker].scatter(ctx, flowCtx.Codec(), scatterKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -335,15 +386,20 @@ func runGenerativeSplitAndScatter(
})
}

// This goroutine waits for the chunkSplitAndScatter workers to finish so that
// it can close importSpanChunksCh.
g.GoCtx(func(ctx context.Context) error {
defer close(importSpanChunksCh)
return g2.Wait()
})

// TODO(pbardea): This tries to cover for a bad scatter by having 2 * the
// number of nodes in the cluster. Is it necessary?
splitScatterWorkers := 2 * splitWorkers
for worker := 0; worker < splitScatterWorkers; worker++ {
// This group of goroutines takes chunks that have already been split and
// scattered by the previous worker group. These workers create splits at the
// start key of the span of every entry of every chunk. After a chunk has been
// processed, it is passed to doneScatterCh to signal that the chunk has gone
// through the entire split and scatter process.
for worker := 0; worker < len(chunkEntrySplitAndScatterers); worker++ {
worker := worker
g.GoCtx(func(ctx context.Context) error {
for importSpanChunk := range importSpanChunksCh {
chunkDestination := importSpanChunk.destination
Expand All @@ -355,7 +411,7 @@ func runGenerativeSplitAndScatter(
if nextChunkIdx < len(importSpanChunk.entries) {
// Split at the next entry.
splitKey = importSpanChunk.entries[nextChunkIdx].Span.Key
if err := scatterer.split(ctx, flowCtx.Codec(), splitKey); err != nil {
if err := chunkEntrySplitAndScatterers[worker].split(ctx, flowCtx.Codec(), splitKey); err != nil {
return err
}
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ func restore(
}

targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV)
importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
genSpan := func(ctx context.Context) error {
defer close(importSpanCh)
countSpansCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error {
defer close(spanCh)
return generateAndSendImportSpans(
restoreCtx,
dataToRestore.getSpans(),
Expand All @@ -326,7 +326,7 @@ func restore(
introducedSpanFrontier,
highWaterMark,
targetSize,
importSpanCh,
spanCh,
simpleImportSpans,
)
}
Expand All @@ -336,17 +336,20 @@ func restore(
var countTasks []func(ctx context.Context) error
log.Infof(restoreCtx, "rh_debug: starting count task")
spanCountTask := func(ctx context.Context) error {
for range importSpanCh {
for range countSpansCh {
numImportSpans++
}
return nil
}
countTasks = append(countTasks, genSpan, spanCountTask)
countTasks = append(countTasks, spanCountTask)
countTasks = append(countTasks, func(ctx context.Context) error {
return genSpan(ctx, countSpansCh)
})
if err := ctxgroup.GoAndWait(restoreCtx, countTasks...); err != nil {
return emptyRowCount, errors.Wrapf(err, "counting number of import spans")
}

importSpanCh = make(chan execinfrapb.RestoreSpanEntry, 1000)
importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block
// tasks are the concurrent tasks that are run during the restore.
var tasks []func(ctx context.Context) error
Expand Down Expand Up @@ -427,7 +430,10 @@ func restore(
}
return nil
}
tasks = append(tasks, generativeCheckpointLoop, genSpan)
tasks = append(tasks, generativeCheckpointLoop)
tasks = append(tasks, func(ctx context.Context) error {
return genSpan(ctx, importSpanCh)
})

runRestore := func(ctx context.Context) error {
return distRestore(
Expand Down

0 comments on commit 100b2b9

Please sign in to comment.