From c894ec708fec249561c3e3cc047cf131d72cd7b5 Mon Sep 17 00:00:00 2001
From: Lidor Carmel
Date: Fri, 17 Mar 2023 15:54:04 -0700
Subject: [PATCH] backupccl: move openSSTs call into the restore workers

Currently openSSTs is a bottleneck during restore when using more than
4 workers. This patch moves the openSSTs call into the worker itself so
that this work can be parallelized, which is needed for a later PR that
will increase the number of workers.

This change also simplifies the code a bit and makes it easier to
implement #93324, because in that PR we want to produce a few partial
SSTs that need to be processed serially. Before this patch it wasn't
trivial to guarantee that the N workers would not process those partial
SSTs in the wrong order; with this patch each worker processes a single
mergedSST and can therefore serialize the partial SSTs created from
that mergedSST.

Tested by running a roachtest (4 nodes, 8 cores) with and without this
change. The fixed version was faster, 80MB/s/node vs 60, though some of
that is noise. We still expect a perf improvement once more workers and
other params are tuned, which is the next step.

Informs: #98015

Epic: CRDB-20916

Release note: None
---
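Note (illustration only, not part of the commit message or the diff
below): a minimal, hypothetical Go sketch of the worker shape this
patch moves to, using placeholder types (spanEntry, mergedSST, openSST)
instead of the real execinfrapb.RestoreSpanEntry and openSSTs plumbing.
It only illustrates why ordering becomes easy to reason about: each
worker pulls an entry, opens its own SSTs, and handles everything
derived from that one mergedSST serially.

package main

import (
	"fmt"
	"sync"
)

// spanEntry and mergedSST are stand-ins for the real restore types.
type spanEntry struct{ id int }

type mergedSST struct{ entry spanEntry }

// openSST stands in for openSSTs: it is now called by the worker that
// pulled the entry, not by a single producer goroutine feeding a channel.
func openSST(e spanEntry) mergedSST {
	return mergedSST{entry: e}
}

func main() {
	const numWorkers = 4
	entries := make(chan spanEntry, numWorkers)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for e := range entries {
				// Each worker owns one entry end to end: open, then ingest.
				// Any partial SSTs produced from this mergedSST would be
				// handled serially right here, by this one worker.
				sst := openSST(e)
				fmt.Printf("worker %d processed entry %d\n", worker, sst.entry.id)
			}
		}(w)
	}

	for i := 0; i < 8; i++ {
		entries <- spanEntry{id: i}
	}
	close(entries)
	wg.Wait()
}

The buffered entries channel plays the same role as before; only the
openSSTs call moves from a dedicated producer goroutine into the worker
loop, which is what the hunks below do with ctxgroup.GroupWorkers.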
 pkg/ccl/backupccl/restore_data_processor.go  | 69 +++++++------------
 .../backupccl/restore_data_processor_test.go |  6 +-
 2 files changed, 26 insertions(+), 49 deletions(-)

diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index 3e4c90e38c09..45fd44ab5d03 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -65,9 +65,6 @@ type restoreDataProcessor struct {
 	phaseGroup           ctxgroup.Group
 	cancelWorkersAndWait func()
 
-	// sstCh is a channel that holds SSTs opened by the processor, but not yet
-	// ingested.
-	sstCh chan mergedSST
 	// Metas from the input are forwarded to the output of this processor.
 	metaCh chan *execinfrapb.ProducerMetadata
 	// progress updates are accumulated on this channel. It is populated by the
@@ -133,14 +130,15 @@ func newRestoreDataProcessor(
 	input execinfra.RowSource,
 ) (execinfra.Processor, error) {
 	sv := &flowCtx.Cfg.Settings.SV
+	numWorkers := int(numRestoreWorkers.Get(sv))
 
 	rd := &restoreDataProcessor{
 		flowCtx:    flowCtx,
 		input:      input,
 		spec:       spec,
 		progCh:     make(chan backuppb.RestoreProgress, maxConcurrentRestoreWorkers),
-		metaCh:     make(chan *execinfrapb.ProducerMetadata, 1),
-		numWorkers: int(numRestoreWorkers.Get(sv)),
+		metaCh:     make(chan *execinfrapb.ProducerMetadata, numWorkers),
+		numWorkers: numWorkers,
 	}
 
 	if err := rd.Init(ctx, rd, post, restoreDataOutputTypes, flowCtx, processorID, nil, /* memMonitor */
@@ -173,26 +171,14 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
 	log.Infof(ctx, "starting restore data processor")
 
 	entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers)
-	rd.sstCh = make(chan mergedSST, rd.numWorkers)
 	rd.phaseGroup.GoCtx(func(ctx context.Context) error {
 		defer close(entries)
 		return inputReader(ctx, rd.input, entries, rd.metaCh)
 	})
 
-	rd.phaseGroup.GoCtx(func(ctx context.Context) error {
-		defer close(rd.sstCh)
-		for entry := range entries {
-			if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil {
-				return err
-			}
-		}
-
-		return nil
-	})
-
 	rd.phaseGroup.GoCtx(func(ctx context.Context) error {
 		defer close(rd.progCh)
-		return rd.runRestoreWorkers(ctx, rd.sstCh)
+		return rd.runRestoreWorkers(ctx, entries)
 	})
 }
 
@@ -266,10 +252,8 @@ type mergedSST struct {
 }
 
 func (rd *restoreDataProcessor) openSSTs(
-	ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST,
-) error {
-	ctxDone := ctx.Done()
-
+	ctx context.Context, entry execinfrapb.RestoreSpanEntry,
+) (mergedSST, error) {
 	// TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir
 	// in a given restore span entry
 	var dirs []cloud.ExternalStorage
@@ -284,9 +268,9 @@ func (rd *restoreDataProcessor) openSSTs(
 		}
 	}()
 
-	// sendIter sends a multiplexed iterator covering the currently accumulated files over the
-	// channel.
-	sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
+	// getIter returns a multiplexed iterator covering the currently accumulated
+	// files over the channel.
+	getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) (mergedSST, error) {
 		readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)
 
 		cleanup := func() {
@@ -305,14 +289,8 @@ func (rd *restoreDataProcessor) openSSTs(
 			cleanup: cleanup,
 		}
 
-		select {
-		case sstCh <- mSST:
-		case <-ctxDone:
-			return ctx.Err()
-		}
-
 		dirs = make([]cloud.ExternalStorage, 0)
-		return nil
+		return mSST, nil
 	}
 
 	log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)
@@ -323,12 +301,12 @@ func (rd *restoreDataProcessor) openSSTs(
 
 		dir, err := rd.flowCtx.Cfg.ExternalStorage(ctx, file.Dir)
 		if err != nil {
-			return err
+			return mergedSST{}, err
 		}
 		dirs = append(dirs, dir)
 		storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path})
-		// TODO(pbardea): When memory monitoring is added, send the currently
-		// accumulated iterators on the channel if we run into memory pressure.
+		// TODO(pbardea): When memory monitoring is added, return the currently
+		// accumulated iterators if we run into memory pressure.
 	}
 	iterOpts := storage.IterOptions{
 		RangeKeyMaskingBelow: rd.spec.RestoreTime,
@@ -338,12 +316,14 @@ func (rd *restoreDataProcessor) openSSTs(
 	}
 	iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts)
 	if err != nil {
-		return err
+		return mergedSST{}, err
 	}
-	return sendIter(iter, dirs)
+	return getIter(iter, dirs)
 }
 
-func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {
+func (rd *restoreDataProcessor) runRestoreWorkers(
+	ctx context.Context, entries chan execinfrapb.RestoreSpanEntry,
+) error {
 	return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, worker int) error {
 		kr, err := MakeKeyRewriterFromRekeys(rd.FlowCtx.Codec(), rd.spec.TableRekeys, rd.spec.TenantRekeys,
 			false /* restoreTenantFromStream */)
@@ -357,12 +337,17 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan
 
 		for {
 			done, err := func() (done bool, _ error) {
-				sstIter, ok := <-ssts
+				entry, ok := <-entries
 				if !ok {
 					done = true
 					return done, nil
 				}
 
+				sstIter, err := rd.openSSTs(ctx, entry)
+				if err != nil {
+					return done, err
+				}
+
 				summary, err := rd.processRestoreSpanEntry(ctx, kr, sstIter)
 				if err != nil {
 					return done, err
@@ -578,12 +563,6 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
 		return
 	}
 	rd.cancelWorkersAndWait()
-	if rd.sstCh != nil {
-		// Cleanup all the remaining open SSTs that have not been consumed.
-		for sst := range rd.sstCh {
-			sst.cleanup()
-		}
-	}
 	rd.agg.Close()
 	rd.InternalClose()
 }
diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go
index d3c6f2552a33..ed974f354a29 100644
--- a/pkg/ccl/backupccl/restore_data_processor_test.go
+++ b/pkg/ccl/backupccl/restore_data_processor_test.go
@@ -394,10 +394,8 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
 
 		mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
 		require.NoError(t, err)
-		ssts := make(chan mergedSST, 1)
-		require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
-		close(ssts)
-		sst := <-ssts
+		sst, err := mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry)
+		require.NoError(t, err)
 		rewriter, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), mockRestoreDataSpec.TableRekeys,
 			mockRestoreDataSpec.TenantRekeys, false /* restoreTenantFromStream */)
 		require.NoError(t, err)