From 236e9e8da6e5b4826adf93410093499e53b38755 Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Tue, 15 Feb 2022 12:36:08 +0000
Subject: [PATCH] sql: minor cleanups to mvcc merger

Release note: None
---
 .../restore_schema_change_creation.go |   5 +
 pkg/sql/backfill/mvcc_index_merger.go | 116 ++++++++----------
 2 files changed, 59 insertions(+), 62 deletions(-)

diff --git a/pkg/ccl/backupccl/restore_schema_change_creation.go b/pkg/ccl/backupccl/restore_schema_change_creation.go
index 0949beadd64f..0ba004ef91ce 100644
--- a/pkg/ccl/backupccl/restore_schema_change_creation.go
+++ b/pkg/ccl/backupccl/restore_schema_change_creation.go
@@ -183,6 +183,11 @@ func createSchemaChangeJobsFromMutations(
 	spanList := make([]jobspb.ResumeSpanList, mutationCount)
 	for i := range spanList {
 		mut := tableDesc.Mutations[idx+i]
+		// Index mutations with UseDeletePreservingEncoding are
+		// used as temporary indexes that are merged back into
+		// newly added indexes. Their resume spans are based on
+		// the index span itself since we iterate over the
+		// temporary index during the merge process.
 		if idx := mut.GetIndex(); idx != nil && idx.UseDeletePreservingEncoding {
 			spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.IndexSpan(codec, idx.ID)}}
 		} else {
diff --git a/pkg/sql/backfill/mvcc_index_merger.go b/pkg/sql/backfill/mvcc_index_merger.go
index 22eb148dee94..a2a8cd840b96 100644
--- a/pkg/sql/backfill/mvcc_index_merger.go
+++ b/pkg/sql/backfill/mvcc_index_merger.go
@@ -95,77 +95,70 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context) {
 		return
 	}
 
-	var err error
-	go func() {
-		defer close(progCh)
+	// stopProgress will be closed when there is no more progress to report.
+	stopProgress := make(chan struct{})
+	g := ctxgroup.WithContext(ctx)
+	g.GoCtx(func(ctx context.Context) error {
+		tick := time.NewTicker(indexBackfillMergeProgressReportInterval)
+		defer tick.Stop()
+		done := ctx.Done()
+		for {
+			select {
+			case <-done:
+				return ctx.Err()
+			case <-stopProgress:
+				return nil
+			case <-tick.C:
+				pushProgress()
+			}
+		}
+	})
 
-		// stopProgress will be closed when there is no more progress to report.
-		stopProgress := make(chan struct{})
-		g := ctxgroup.WithContext(ctx)
-		g.GoCtx(func(ctx context.Context) error {
-			tick := time.NewTicker(indexBackfillMergeProgressReportInterval)
-			defer tick.Stop()
-			done := ctx.Done()
-			for {
-				select {
-				case <-done:
-					return ctx.Err()
-				case <-stopProgress:
-					return nil
-				case <-tick.C:
-					pushProgress()
-				}
-			}
-		})
-
+	g.GoCtx(func(ctx context.Context) error {
+		defer close(stopProgress)
+		// TODO(rui): some room for improvement on single threaded
+		// implementation, e.g. run merge for spec spans in parallel.
+		for i := range ibm.spec.Spans {
+			sp := ibm.spec.Spans[i]
+			idx := ibm.spec.SpanIdx[i]
+
+			key := sp.Key
+			for key != nil {
+				nextKey, err := ibm.Merge(ctx, ibm.evalCtx.Codec, ibm.desc, ibm.spec.TemporaryIndexes[idx], ibm.spec.AddedIndexes[idx],
+					key, sp.EndKey, ibm.spec.ChunkSize)
+				if err != nil {
+					return err
+				}
+				completedSpan := roachpb.Span{}
+				if nextKey == nil {
+					completedSpan.Key = key
+					completedSpan.EndKey = sp.EndKey
+				} else {
+					completedSpan.Key = key
+					completedSpan.EndKey = nextKey
+				}
+
+				mu.Lock()
+				mu.completedSpans = append(mu.completedSpans, completedSpan)
+				mu.completedSpanIdx = append(mu.completedSpanIdx, idx)
+				mu.Unlock()
+
+				if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok {
+					if knobs != nil && knobs.PushesProgressEveryChunk {
+						pushProgress()
+					}
+				}
+				key = nextKey
+			}
+		}
+		return nil
+	})
-		g.GoCtx(func(ctx context.Context) error {
-			defer close(stopProgress)
-
-			// TODO(rui): some room for improvement on single threaded
-			// implementation, e.g. run merge for spec spans in parallel.
-			for i := range ibm.spec.Spans {
-				sp := ibm.spec.Spans[i]
-				idx := ibm.spec.SpanIdx[i]
-
-				key := sp.Key
-				for key != nil {
-					nextKey, err := ibm.Merge(ctx, ibm.evalCtx.Codec, ibm.desc, ibm.spec.TemporaryIndexes[idx], ibm.spec.AddedIndexes[idx],
-						key, sp.EndKey, ibm.spec.ChunkSize)
-					if err != nil {
-						return err
-					}
-					completedSpan := roachpb.Span{}
-					if nextKey == nil {
-						completedSpan.Key = key
-						completedSpan.EndKey = sp.EndKey
-					} else {
-						completedSpan.Key = key
-						completedSpan.EndKey = nextKey
-					}
-
-					mu.Lock()
-					mu.completedSpans = append(mu.completedSpans, completedSpan)
-					mu.completedSpanIdx = append(mu.completedSpanIdx, idx)
-					mu.Unlock()
-
-					if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok {
-						if knobs != nil && knobs.PushesProgressEveryChunk {
-							pushProgress()
-						}
-					}
-
-					key = nextKey
-				}
-				if err != nil {
-					return err
-				}
-			}
-
-			return nil
-		})
-
+
+	var err error
+	go func() {
+		defer close(progCh)
 		err = g.Wait()
 	}()
 
@@ -174,7 +167,6 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context) {
 		if p.CompletedSpans != nil {
 			log.VEventf(ctx, 2, "sending coordinator completed spans: %+v", p.CompletedSpans)
 		}
-
 		ibm.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p})
 	}
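
For readers outside the CockroachDB tree, the goroutine layout the patched Run settles on is the common "worker plus progress ticker" shape: one goroutine does the chunked work and closes a stop channel when it finishes, a second goroutine pushes progress on a timer until that channel closes or the context is canceled, and the caller waits on the group. Below is a minimal sketch of that pattern only, assuming golang.org/x/sync/errgroup in place of the repo's ctxgroup package and a placeholder loop in place of the per-chunk Merge call; it is not the processor code itself.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func run(ctx context.Context) error {
	// stopProgress is closed once the work goroutine has nothing left to report.
	stopProgress := make(chan struct{})
	pushProgress := func() { fmt.Println("progress checkpoint") }

	g, ctx := errgroup.WithContext(ctx)

	// Progress goroutine: report on a timer until the work finishes or the
	// context is canceled.
	g.Go(func() error {
		tick := time.NewTicker(100 * time.Millisecond) // interval is illustrative
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-stopProgress:
				return nil
			case <-tick.C:
				pushProgress()
			}
		}
	})

	// Work goroutine: process chunks, then close stopProgress so the
	// progress goroutine exits cleanly.
	g.Go(func() error {
		defer close(stopProgress)
		for chunk := 0; chunk < 5; chunk++ {
			time.Sleep(50 * time.Millisecond) // stand-in for one Merge chunk
		}
		return nil
	})

	// Wait returns once both goroutines are done and surfaces the first error.
	return g.Wait()
}

func main() {
	if err := run(context.Background()); err != nil {
		fmt.Println("error:", err)
	}
}

One design point worth noting: the work goroutine signals completion by closing stopProgress rather than by canceling the context, so a clean finish and a cancellation take different exit paths in the reporter.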