From 2f1a94d061071327c2db455643171bef96b4c395 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sun, 16 Jan 2022 21:53:41 +0000 Subject: [PATCH] backup: use correct Context in restore workers Previously some of the workers, which are called by ctxgroup goroutines, were using RestoreDataProcessor.Ctx, instead of the child context that the group created, which, critically, is cancelled if any group task fails. This could mean one worker in the group fails and stops draining a channel and returns an error to the group, which cancels its context, but another worker trying to write to that channel hangs if it is not checking the passed, now cancelled context. Release note (bug fix): fix a case where a RESTORE job could hang if it encountered an error when ingesting restored data. --- pkg/ccl/backupccl/restore_data_processor.go | 18 ++++++++---------- .../backupccl/restore_data_processor_test.go | 4 ++-- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index c87ca38ebaaf..8dc12c67e888 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -156,7 +156,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(rd.sstCh) for entry := range entries { - if err := rd.openSSTs(entry, rd.sstCh); err != nil { + if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil { return err } } @@ -166,7 +166,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(rd.progCh) - return rd.runRestoreWorkers(rd.sstCh) + return rd.runRestoreWorkers(ctx, rd.sstCh) }) } @@ -240,9 +240,8 @@ type mergedSST struct { } func (rd *restoreDataProcessor) openSSTs( - entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST, + ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST, ) error { - ctx := rd.Ctx ctxDone := ctx.Done() // The sstables only contain MVCC data and no intents, so using an MVCC @@ -299,7 +298,7 @@ func (rd *restoreDataProcessor) openSSTs( return nil } - log.VEventf(rd.Ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) + log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) for _, file := range entry.Files { log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key) @@ -322,8 +321,8 @@ func (rd *restoreDataProcessor) openSSTs( return sendIters(iters, dirs) } -func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { - return ctxgroup.GroupWorkers(rd.Ctx, rd.numWorkers, func(ctx context.Context, _ int) error { +func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { + return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, _ int) error { for { done, err := func() (done bool, _ error) { sstIter, ok := <-ssts @@ -332,7 +331,7 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { return done, nil } - summary, err := rd.processRestoreSpanEntry(sstIter) + summary, err := rd.processRestoreSpanEntry(ctx, sstIter) if err != nil { return done, err } @@ -358,10 +357,9 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { } func (rd *restoreDataProcessor) processRestoreSpanEntry( - sst mergedSST, + ctx context.Context, sst mergedSST, ) (roachpb.BulkOpSummary, error) { db := rd.flowCtx.Cfg.DB - ctx := rd.Ctx evalCtx := rd.EvalCtx var summary roachpb.BulkOpSummary diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index fbbef31b3a39..ba902825a066 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -389,10 +389,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) - require.NoError(t, mockRestoreDataProcessor.openSSTs(restoreSpanEntry, ssts)) + require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) close(ssts) sst := <-ssts - _, err = mockRestoreDataProcessor.processRestoreSpanEntry(sst) + _, err = mockRestoreDataProcessor.processRestoreSpanEntry(ctx, sst) require.NoError(t, err) clientKVs, err := kvDB.Scan(ctx, reqStartKey, reqEndKey, 0)