Skip to content

Commit

Permalink
Merge pull request #76509 from cockroachlabs/blathers/backport-releas…
Browse files Browse the repository at this point in the history
…e-21.2-74905

release-21.2: backup: use correct Context in restore workers
  • Loading branch information
dt authored Feb 18, 2022
2 parents aa83a51 + 2f1a94d commit 6133ffd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 12 deletions.
18 changes: 8 additions & 10 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
rd.phaseGroup.GoCtx(func(ctx context.Context) error {
defer close(rd.sstCh)
for entry := range entries {
if err := rd.openSSTs(entry, rd.sstCh); err != nil {
if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil {
return err
}
}
Expand All @@ -166,7 +166,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {

rd.phaseGroup.GoCtx(func(ctx context.Context) error {
defer close(rd.progCh)
return rd.runRestoreWorkers(rd.sstCh)
return rd.runRestoreWorkers(ctx, rd.sstCh)
})
}

Expand Down Expand Up @@ -240,9 +240,8 @@ type mergedSST struct {
}

func (rd *restoreDataProcessor) openSSTs(
entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST,
ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST,
) error {
ctx := rd.Ctx
ctxDone := ctx.Done()

// The sstables only contain MVCC data and no intents, so using an MVCC
Expand Down Expand Up @@ -299,7 +298,7 @@ func (rd *restoreDataProcessor) openSSTs(
return nil
}

log.VEventf(rd.Ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)
log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)

for _, file := range entry.Files {
log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key)
Expand All @@ -322,8 +321,8 @@ func (rd *restoreDataProcessor) openSSTs(
return sendIters(iters, dirs)
}

func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error {
return ctxgroup.GroupWorkers(rd.Ctx, rd.numWorkers, func(ctx context.Context, _ int) error {
func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {
return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, _ int) error {
for {
done, err := func() (done bool, _ error) {
sstIter, ok := <-ssts
Expand All @@ -332,7 +331,7 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error {
return done, nil
}

summary, err := rd.processRestoreSpanEntry(sstIter)
summary, err := rd.processRestoreSpanEntry(ctx, sstIter)
if err != nil {
return done, err
}
Expand All @@ -358,10 +357,9 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error {
}

func (rd *restoreDataProcessor) processRestoreSpanEntry(
sst mergedSST,
ctx context.Context, sst mergedSST,
) (roachpb.BulkOpSummary, error) {
db := rd.flowCtx.Cfg.DB
ctx := rd.Ctx
evalCtx := rd.EvalCtx
var summary roachpb.BulkOpSummary

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(restoreSpanEntry, ssts))
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
close(ssts)
sst := <-ssts
_, err = mockRestoreDataProcessor.processRestoreSpanEntry(sst)
_, err = mockRestoreDataProcessor.processRestoreSpanEntry(ctx, sst)
require.NoError(t, err)

clientKVs, err := kvDB.Scan(ctx, reqStartKey, reqEndKey, 0)
Expand Down

0 comments on commit 6133ffd

Please sign in to comment.