diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 66badc289171..e38289b18b15 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -172,7 +172,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(rd.sstCh) for entry := range entries { - if err := rd.openSSTs(entry, rd.sstCh); err != nil { + if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil { return err } } @@ -182,7 +182,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(rd.progCh) - return rd.runRestoreWorkers(rd.sstCh) + return rd.runRestoreWorkers(ctx, rd.sstCh) }) } @@ -256,9 +256,8 @@ type mergedSST struct { } func (rd *restoreDataProcessor) openSSTs( - entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST, + ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST, ) error { - ctx := rd.Ctx ctxDone := ctx.Done() // The sstables only contain MVCC data and no intents, so using an MVCC @@ -315,7 +314,7 @@ func (rd *restoreDataProcessor) openSSTs( return nil } - log.VEventf(rd.Ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) + log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) for _, file := range entry.Files { log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key) @@ -338,8 +337,8 @@ func (rd *restoreDataProcessor) openSSTs( return sendIters(iters, dirs) } -func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { - return ctxgroup.GroupWorkers(rd.Ctx, rd.numWorkers, func(ctx context.Context, _ int) error { +func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { + return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, _ int) error { for { done, err := func() (done bool, _ error) { sstIter, ok := <-ssts @@ -348,7 +347,7 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { return done, nil } - summary, err := rd.processRestoreSpanEntry(sstIter) + summary, err := rd.processRestoreSpanEntry(ctx, sstIter) if err != nil { return done, err } @@ -374,10 +373,9 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { } func (rd *restoreDataProcessor) processRestoreSpanEntry( - sst mergedSST, + ctx context.Context, sst mergedSST, ) (roachpb.BulkOpSummary, error) { db := rd.flowCtx.Cfg.DB - ctx := rd.Ctx evalCtx := rd.EvalCtx var summary roachpb.BulkOpSummary diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index fbbef31b3a39..ba902825a066 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -389,10 +389,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) - require.NoError(t, mockRestoreDataProcessor.openSSTs(restoreSpanEntry, ssts)) + require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) close(ssts) sst := <-ssts - _, err = mockRestoreDataProcessor.processRestoreSpanEntry(sst) + _, err = mockRestoreDataProcessor.processRestoreSpanEntry(ctx, sst) require.NoError(t, err) clientKVs, err := kvDB.Scan(ctx, reqStartKey, reqEndKey, 0)