backupccl: move openSSTs call into the restore workers
Currently we see openSSTs being a bottleneck during restore when using
more than 4 workers.

This patch moves the openSSTs call into the worker itself so that this
work can be parallelized. This is needed for a later PR which will
increase the number of workers.

Also, this change simplifies the code a bit and makes it easier to
implement cockroachdb#93324, because that PR will produce a few partial SSTs
that need to be processed serially. Before this patch it was not trivial to
guarantee that the N workers would not process those partial SSTs out of
order; with this patch each worker processes a single mergedSST and can
therefore serialize the partial SSTs created from that mergedSST.

Tested by running a roachtest (4 nodes, 8 cores) with and without this
change. The fixed version was faster (80 MB/s/node vs. 60 MB/s/node), though
some of that difference is noise. We expect a further perf improvement once
more workers are used and other params are tuned, which is the next step.

Informs: cockroachdb#98015
Epic: CRDB-20916

Release note: None
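
For illustration, a minimal sketch of the new worker shape (stand-in types and names, not the actual CockroachDB code): the workers share a channel of span entries and each worker opens its own SSTs, instead of a dedicated goroutine opening them and fanning mergedSSTs out over a channel.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Stand-ins for execinfrapb.RestoreSpanEntry and the processor's mergedSST.
type spanEntry struct{ id int }
type mergedSST struct{ entry spanEntry }

// openSSTs stands in for the real, I/O-heavy open step that used to run in a
// single goroutine feeding a channel of mergedSSTs.
func openSSTs(ctx context.Context, e spanEntry) (mergedSST, error) {
	return mergedSST{entry: e}, nil
}

// runRestoreWorkers mirrors the new shape: each worker pulls a span entry off
// the shared channel and opens its own SSTs, so the open step is parallelized
// and everything derived from one entry stays ordered within a single worker.
func runRestoreWorkers(ctx context.Context, numWorkers int, entries <-chan spanEntry) error {
	var wg sync.WaitGroup
	errCh := make(chan error, numWorkers)
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for entry := range entries {
				sst, err := openSSTs(ctx, entry) // formerly done by a dedicated goroutine
				if err != nil {
					errCh <- err
					return
				}
				// The real code would ingest the merged SST here.
				fmt.Printf("worker %d processed entry %d\n", worker, sst.entry.id)
			}
		}(w)
	}
	wg.Wait()
	close(errCh)
	return <-errCh // nil when no worker reported an error
}

func main() {
	entries := make(chan spanEntry, 4)
	for i := 0; i < 8; i++ {
		entries <- spanEntry{id: i}
	}
	close(entries)
	if err := runRestoreWorkers(context.Background(), 4, entries); err != nil {
		fmt.Println("restore failed:", err)
	}
}

Because each entry is handled end to end by one worker, any partial SSTs derived from that entry can be processed serially inside that worker.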
lidorcarmel authored and msbutler committed May 18, 2023
1 parent 1b11ca6 commit fe01128
Showing 6 changed files with 3,397 additions and 49 deletions.
69 changes: 24 additions & 45 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -65,9 +65,6 @@ type restoreDataProcessor struct {
   phaseGroup ctxgroup.Group
   cancelWorkersAndWait func()

-  // sstCh is a channel that holds SSTs opened by the processor, but not yet
-  // ingested.
-  sstCh chan mergedSST
   // Metas from the input are forwarded to the output of this processor.
   metaCh chan *execinfrapb.ProducerMetadata
   // progress updates are accumulated on this channel. It is populated by the
@@ -133,15 +130,16 @@ func newRestoreDataProcessor(
   output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
   sv := &flowCtx.Cfg.Settings.SV
+  numWorkers := int(numRestoreWorkers.Get(sv))

   rd := &restoreDataProcessor{
     flowCtx: flowCtx,
     input: input,
     spec: spec,
     output: output,
     progCh: make(chan backuppb.RestoreProgress, maxConcurrentRestoreWorkers),
-    metaCh: make(chan *execinfrapb.ProducerMetadata, 1),
-    numWorkers: int(numRestoreWorkers.Get(sv)),
+    metaCh: make(chan *execinfrapb.ProducerMetadata, numWorkers),
+    numWorkers: numWorkers,
   }

   if err := rd.Init(rd, post, restoreDataOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
@@ -174,26 +172,14 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {
   log.Infof(ctx, "starting restore data")

   entries := make(chan execinfrapb.RestoreSpanEntry, rd.numWorkers)
-  rd.sstCh = make(chan mergedSST, rd.numWorkers)
   rd.phaseGroup.GoCtx(func(ctx context.Context) error {
     defer close(entries)
     return inputReader(ctx, rd.input, entries, rd.metaCh)
   })

-  rd.phaseGroup.GoCtx(func(ctx context.Context) error {
-    defer close(rd.sstCh)
-    for entry := range entries {
-      if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil {
-        return err
-      }
-    }
-
-    return nil
-  })
-
   rd.phaseGroup.GoCtx(func(ctx context.Context) error {
     defer close(rd.progCh)
-    return rd.runRestoreWorkers(ctx, rd.sstCh)
+    return rd.runRestoreWorkers(ctx, entries)
   })
 }

@@ -267,10 +253,8 @@ type mergedSST struct {
 }

 func (rd *restoreDataProcessor) openSSTs(
-  ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST,
-) error {
-  ctxDone := ctx.Done()
-
+  ctx context.Context, entry execinfrapb.RestoreSpanEntry,
+) (mergedSST, error) {
   // TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir
   // in a given restore span entry
   var dirs []cloud.ExternalStorage
@@ -285,9 +269,9 @@ func (rd *restoreDataProcessor) openSSTs(
     }
   }()

-  // sendIter sends a multiplexed iterator covering the currently accumulated files over the
-  // channel.
-  sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
+  // getIter returns a multiplexed iterator covering the currently accumulated
+  // files over the channel.
+  getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) (mergedSST, error) {
     readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

     cleanup := func() {
@@ -306,14 +290,8 @@ func (rd *restoreDataProcessor) openSSTs(
       cleanup: cleanup,
     }

-    select {
-    case sstCh <- mSST:
-    case <-ctxDone:
-      return ctx.Err()
-    }
-
     dirs = make([]cloud.ExternalStorage, 0)
-    return nil
+    return mSST, nil
   }

   log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)
@@ -324,12 +302,12 @@ func (rd *restoreDataProcessor) openSSTs(

     dir, err := rd.flowCtx.Cfg.ExternalStorage(ctx, file.Dir)
     if err != nil {
-      return err
+      return mergedSST{}, err
     }
     dirs = append(dirs, dir)
     storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path})
-    // TODO(pbardea): When memory monitoring is added, send the currently
-    // accumulated iterators on the channel if we run into memory pressure.
+    // TODO(pbardea): When memory monitoring is added, return the currently
+    // accumulated iterators if we run into memory pressure.
   }
   iterOpts := storage.IterOptions{
     RangeKeyMaskingBelow: rd.spec.RestoreTime,
@@ -339,12 +317,14 @@ func (rd *restoreDataProcessor) openSSTs(
   }
   iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts)
   if err != nil {
-    return err
+    return mergedSST{}, err
   }
-  return sendIter(iter, dirs)
+  return getIter(iter, dirs)
 }

-func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {
+func (rd *restoreDataProcessor) runRestoreWorkers(
+  ctx context.Context, entries chan execinfrapb.RestoreSpanEntry,
+) error {
   return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, worker int) error {
     kr, err := MakeKeyRewriterFromRekeys(rd.FlowCtx.Codec(), rd.spec.TableRekeys, rd.spec.TenantRekeys,
       false /* restoreTenantFromStream */)
@@ -358,12 +338,17 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {

     for {
       done, err := func() (done bool, _ error) {
-        sstIter, ok := <-ssts
+        entry, ok := <-entries
         if !ok {
           done = true
           return done, nil
         }

+        sstIter, err := rd.openSSTs(ctx, entry)
+        if err != nil {
+          return done, err
+        }
+
         summary, err := rd.processRestoreSpanEntry(ctx, kr, sstIter)
         if err != nil {
           return done, err
@@ -572,12 +557,6 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
     return
   }
   rd.cancelWorkersAndWait()
-  if rd.sstCh != nil {
-    // Cleanup all the remaining open SSTs that have not been consumed.
-    for sst := range rd.sstCh {
-      sst.cleanup()
-    }
-  }
   rd.agg.Close()
   rd.InternalClose()
 }
6 changes: 2 additions & 4 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -386,10 +386,8 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {

   mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec)
   require.NoError(t, err)
-  ssts := make(chan mergedSST, 1)
-  require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts))
-  close(ssts)
-  sst := <-ssts
+  sst, err := mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry)
+  require.NoError(t, err)
   rewriter, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), mockRestoreDataSpec.TableRekeys,
     mockRestoreDataSpec.TenantRekeys, false /* restoreTenantFromStream */)
   require.NoError(t, err)
