Merge #98906
98906: backupccl: move openSSTs call into the restore workers r=lidorcarmel a=lidorcarmel

Currently, openSSTs is a bottleneck during restore when more than 4 workers are used.

This patch moves the openSSTs call into the worker itself so that this work can be parallelized. This is needed for a later PR that will increase the number of workers.

Also, this change simplifies the code a bit and makes it easier to implement #93324, because in that PR we want to produce a few partial SSTs that need to be processed serially. Before this patch it wasn't trivial to ensure that the N workers would not process those partial SSTs in the wrong order; with this patch each worker processes a single mergedSST and can therefore serialize the partial SSTs created from that mergedSST.
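
For illustration, here is a minimal, self-contained sketch of the worker-pull shape this patch moves to, using simplified stand-in types and names (restoreSpanEntry, mergedSST, openSSTs, and processEntry are placeholders, not the real backupccl API): each worker pulls a span entry off a shared channel, opens its own merged SST, and processes it, so the openSSTs work is parallelized and everything derived from one entry stays within one worker.

```go
// Sketch of the worker-pull pattern, with simplified stand-in types.
package main

import (
	"fmt"
	"sync"
)

type restoreSpanEntry struct{ span string }

type mergedSST struct{ span string }

// openSSTs stands in for the expensive iterator construction that used to run
// in a single dedicated goroutine and is now done by each worker.
func openSSTs(entry restoreSpanEntry) (mergedSST, error) {
	return mergedSST{span: entry.span}, nil
}

// processEntry stands in for ingesting the opened SSTs for one span.
func processEntry(sst mergedSST) error {
	fmt.Println("restored", sst.span)
	return nil
}

func main() {
	const numWorkers = 4
	entries := make(chan restoreSpanEntry, numWorkers)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker opens and processes its own entries, so openSSTs is
			// parallelized, and the partial SSTs produced from one entry are
			// handled serially by a single worker.
			for entry := range entries {
				sst, err := openSSTs(entry)
				if err != nil {
					return
				}
				if err := processEntry(sst); err != nil {
					return
				}
			}
		}()
	}

	for _, s := range []string{"[a,b)", "[b,c)", "[c,d)"} {
		entries <- restoreSpanEntry{span: s}
	}
	close(entries)
	wg.Wait()
}
```

In the actual change, Start hands the entries channel to runRestoreWorkers directly and each worker calls rd.openSSTs itself, removing the intermediate sstCh channel and the extra goroutine that used to feed it.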

Tested by running a roachtest (4 nodes, 8 cores) with and without this change. The patched version was faster (80 MB/s/node vs. 60 MB/s/node), though some of that difference is noise; we do expect a real performance improvement once more workers are used and other parameters are tuned, which is the next step.

Informs: #98015
Epic: CRDB-20916

Release note: None

Co-authored-by: Lidor Carmel <[email protected]>
craig[bot] and lidorcarmel committed Mar 21, 2023
2 parents 87e227e + c894ec7 commit c3b4ec9
Showing 2 changed files with 26 additions and 49 deletions.
69 changes: 24 additions & 45 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -65,9 +65,6 @@ type restoreDataProcessor struct {
phaseGroup ctxgroup.Group
cancelWorkersAndWait func()

// sstCh is a channel that holds SSTs opened by the processor, but not yet
// ingested.
sstCh chan mergedSST
// Metas from the input are forwarded to the output of this processor.
metaCh chan *execinfrapb.ProducerMetadata
// progress updates are accumulated on this channel. It is populated by the
@@ -133,14 +130,15 @@ func newRestoreDataProcessor(
input execinfra.RowSource,
) (execinfra.Processor, error) {
sv := &flowCtx.Cfg.Settings.SV
numWorkers := int(numRestoreWorkers.Get(sv))

rd := &restoreDataProcessor{
flowCtx: flowCtx,
input: input,
spec: spec,
progCh: make(chan backuppb.RestoreProgress, maxConcurrentRestoreWorkers),
metaCh: make(chan *execinfrapb.ProducerMetadata, 1),
numWorkers: int(numRestoreWorkers.Get(sv)),
metaCh: make(chan *execinfrapb.ProducerMetadata, numWorkers),
numWorkers: numWorkers,
}

if err := rd.Init(ctx, rd, post, restoreDataOutputTypes, flowCtx, processorID, nil, /* memMonitor */
@@ -173,26 +171,14 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
log.Infof(ctx, "starting restore data processor")

entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers)
rd.sstCh = make(chan mergedSST, rd.numWorkers)
rd.phaseGroup.GoCtx(func(ctx context.Context) error {
defer close(entries)
return inputReader(ctx, rd.input, entries, rd.metaCh)
})

rd.phaseGroup.GoCtx(func(ctx context.Context) error {
defer close(rd.sstCh)
for entry := range entries {
if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil {
return err
}
}

return nil
})

rd.phaseGroup.GoCtx(func(ctx context.Context) error {
defer close(rd.progCh)
return rd.runRestoreWorkers(ctx, rd.sstCh)
return rd.runRestoreWorkers(ctx, entries)
})
}

@@ -266,10 +252,8 @@ type mergedSST struct {
}

func (rd *restoreDataProcessor) openSSTs(
ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST,
) error {
ctxDone := ctx.Done()

ctx context.Context, entry execinfrapb.RestoreSpanEntry,
) (mergedSST, error) {
// TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir
// in a given restore span entry
var dirs []cloud.ExternalStorage
@@ -284,9 +268,9 @@ func (rd *restoreDataProcessor) openSSTs(
}
}()

// sendIter sends a multiplexed iterator covering the currently accumulated files over the
// channel.
sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
// getIter returns a multiplexed iterator covering the currently accumulated
// files over the channel.
getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) (mergedSST, error) {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
@@ -305,14 +289,8 @@ func (rd *restoreDataProcessor) openSSTs(
cleanup: cleanup,
}

select {
case sstCh <- mSST:
case <-ctxDone:
return ctx.Err()
}

dirs = make([]cloud.ExternalStorage, 0)
return nil
return mSST, nil
}

log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)
@@ -323,12 +301,12 @@

dir, err := rd.flowCtx.Cfg.ExternalStorage(ctx, file.Dir)
if err != nil {
return err
return mergedSST{}, err
}
dirs = append(dirs, dir)
storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path})
// TODO(pbardea): When memory monitoring is added, send the currently
// accumulated iterators on the channel if we run into memory pressure.
// TODO(pbardea): When memory monitoring is added, return the currently
// accumulated iterators if we run into memory pressure.
}
iterOpts := storage.IterOptions{
RangeKeyMaskingBelow: rd.spec.RestoreTime,
@@ -338,12 +316,14 @@
}
iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts)
if err != nil {
return err
return mergedSST{}, err
}
return sendIter(iter, dirs)
return getIter(iter, dirs)
}

func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {
func (rd *restoreDataProcessor) runRestoreWorkers(
ctx context.Context, entries chan execinfrapb.RestoreSpanEntry,
) error {
return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, worker int) error {
kr, err := MakeKeyRewriterFromRekeys(rd.FlowCtx.Codec(), rd.spec.TableRekeys, rd.spec.TenantRekeys,
false /* restoreTenantFromStream */)
@@ -357,12 +337,17 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan

for {
done, err := func() (done bool, _ error) {
sstIter, ok := <-ssts
entry, ok := <-entries
if !ok {
done = true
return done, nil
}

sstIter, err := rd.openSSTs(ctx, entry)
if err != nil {
return done, err
}

summary, err := rd.processRestoreSpanEntry(ctx, kr, sstIter)
if err != nil {
return done, err
@@ -578,12 +563,6 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
return
}
rd.cancelWorkersAndWait()
if rd.sstCh != nil {
// Cleanup all the remaining open SSTs that have not been consumed.
for sst := range rd.sstCh {
sst.cleanup()
}
}
rd.agg.Close()
rd.InternalClose()
}
6 changes: 2 additions & 4 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -394,10 +394,8 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {

mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
require.NoError(t, err)
ssts := make(chan mergedSST, 1)
require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
close(ssts)
sst := <-ssts
sst, err := mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry)
require.NoError(t, err)
rewriter, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), mockRestoreDataSpec.TableRekeys,
mockRestoreDataSpec.TenantRekeys, false /* restoreTenantFromStream */)
require.NoError(t, err)
