diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 3e4c90e38c09..45fd44ab5d03 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -65,9 +65,6 @@ type restoreDataProcessor struct { phaseGroup ctxgroup.Group cancelWorkersAndWait func() - // sstCh is a channel that holds SSTs opened by the processor, but not yet - // ingested. - sstCh chan mergedSST // Metas from the input are forwarded to the output of this processor. metaCh chan *execinfrapb.ProducerMetadata // progress updates are accumulated on this channel. It is populated by the @@ -133,14 +130,15 @@ func newRestoreDataProcessor( input execinfra.RowSource, ) (execinfra.Processor, error) { sv := &flowCtx.Cfg.Settings.SV + numWorkers := int(numRestoreWorkers.Get(sv)) rd := &restoreDataProcessor{ flowCtx: flowCtx, input: input, spec: spec, progCh: make(chan backuppb.RestoreProgress, maxConcurrentRestoreWorkers), - metaCh: make(chan *execinfrapb.ProducerMetadata, 1), - numWorkers: int(numRestoreWorkers.Get(sv)), + metaCh: make(chan *execinfrapb.ProducerMetadata, numWorkers), + numWorkers: numWorkers, } if err := rd.Init(ctx, rd, post, restoreDataOutputTypes, flowCtx, processorID, nil, /* memMonitor */ @@ -173,26 +171,14 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { log.Infof(ctx, "starting restore data processor") entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers) - rd.sstCh = make(chan mergedSST, rd.numWorkers) rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(entries) return inputReader(ctx, rd.input, entries, rd.metaCh) }) - rd.phaseGroup.GoCtx(func(ctx context.Context) error { - defer close(rd.sstCh) - for entry := range entries { - if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil { - return err - } - } - - return nil - }) - rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(rd.progCh) - return rd.runRestoreWorkers(ctx, rd.sstCh) + return rd.runRestoreWorkers(ctx, entries) }) } @@ -266,10 +252,8 @@ type mergedSST struct { } func (rd *restoreDataProcessor) openSSTs( - ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST, -) error { - ctxDone := ctx.Done() - + ctx context.Context, entry execinfrapb.RestoreSpanEntry, +) (mergedSST, error) { // TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir // in a given restore span entry var dirs []cloud.ExternalStorage @@ -284,9 +268,9 @@ func (rd *restoreDataProcessor) openSSTs( } }() - // sendIter sends a multiplexed iterator covering the currently accumulated files over the - // channel. - sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { + // getIter returns a multiplexed iterator covering the currently accumulated + // files over the channel. + getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) (mergedSST, error) { readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime) cleanup := func() { @@ -305,14 +289,8 @@ func (rd *restoreDataProcessor) openSSTs( cleanup: cleanup, } - select { - case sstCh <- mSST: - case <-ctxDone: - return ctx.Err() - } - dirs = make([]cloud.ExternalStorage, 0) - return nil + return mSST, nil } log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) @@ -323,12 +301,12 @@ func (rd *restoreDataProcessor) openSSTs( dir, err := rd.flowCtx.Cfg.ExternalStorage(ctx, file.Dir) if err != nil { - return err + return mergedSST{}, err } dirs = append(dirs, dir) storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path}) - // TODO(pbardea): When memory monitoring is added, send the currently - // accumulated iterators on the channel if we run into memory pressure. + // TODO(pbardea): When memory monitoring is added, return the currently + // accumulated iterators if we run into memory pressure. } iterOpts := storage.IterOptions{ RangeKeyMaskingBelow: rd.spec.RestoreTime, @@ -338,12 +316,14 @@ func (rd *restoreDataProcessor) openSSTs( } iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts) if err != nil { - return err + return mergedSST{}, err } - return sendIter(iter, dirs) + return getIter(iter, dirs) } -func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { +func (rd *restoreDataProcessor) runRestoreWorkers( + ctx context.Context, entries chan execinfrapb.RestoreSpanEntry, +) error { return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, worker int) error { kr, err := MakeKeyRewriterFromRekeys(rd.FlowCtx.Codec(), rd.spec.TableRekeys, rd.spec.TenantRekeys, false /* restoreTenantFromStream */) @@ -357,12 +337,17 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan for { done, err := func() (done bool, _ error) { - sstIter, ok := <-ssts + entry, ok := <-entries if !ok { done = true return done, nil } + sstIter, err := rd.openSSTs(ctx, entry) + if err != nil { + return done, err + } + summary, err := rd.processRestoreSpanEntry(ctx, kr, sstIter) if err != nil { return done, err @@ -578,12 +563,6 @@ func (rd *restoreDataProcessor) ConsumerClosed() { return } rd.cancelWorkersAndWait() - if rd.sstCh != nil { - // Cleanup all the remaining open SSTs that have not been consumed. - for sst := range rd.sstCh { - sst.cleanup() - } - } rd.agg.Close() rd.InternalClose() } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index d3c6f2552a33..ed974f354a29 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -394,10 +394,8 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec) require.NoError(t, err) - ssts := make(chan mergedSST, 1) - require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) - close(ssts) - sst := <-ssts + sst, err := mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry) + require.NoError(t, err) rewriter, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), mockRestoreDataSpec.TableRekeys, mockRestoreDataSpec.TenantRekeys, false /* restoreTenantFromStream */) require.NoError(t, err)