Skip to content

Commit

Permalink
sql: minor cleanups to mvcc merger
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
stevendanna committed Feb 15, 2022
1 parent 787abf6 commit 236e9e8
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 62 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func createSchemaChangeJobsFromMutations(
spanList := make([]jobspb.ResumeSpanList, mutationCount)
for i := range spanList {
mut := tableDesc.Mutations[idx+i]
// Index mutations with UseDeletePreservingEncoding are
// used as temporary indexes that are merged back into
// newly added indexes. Their resume spans are based on
// the index span itself since we iterate over the
// temporary index during the merge process.
if idx := mut.GetIndex(); idx != nil && idx.UseDeletePreservingEncoding {
spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.IndexSpan(codec, idx.ID)}}
} else {
Expand Down
116 changes: 54 additions & 62 deletions pkg/sql/backfill/mvcc_index_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,77 +95,70 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context) {
return
}

var err error
go func() {
defer close(progCh)
// stopProgress will be closed when there is no more progress to report.
stopProgress := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(indexBackfillMergeProgressReportInterval)
defer tick.Stop()
done := ctx.Done()
for {
select {
case <-done:
return ctx.Err()
case <-stopProgress:
return nil
case <-tick.C:
pushProgress()
}
}
})

// stopProgress will be closed when there is no more progress to report.
stopProgress := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(indexBackfillMergeProgressReportInterval)
defer tick.Stop()
done := ctx.Done()
for {
select {
case <-done:
return ctx.Err()
case <-stopProgress:
return nil
case <-tick.C:
pushProgress()
g.GoCtx(func(ctx context.Context) error {
defer close(stopProgress)
// TODO(rui): some room for improvement on single threaded
// implementation, e.g. run merge for spec spans in parallel.
for i := range ibm.spec.Spans {
sp := ibm.spec.Spans[i]
idx := ibm.spec.SpanIdx[i]

key := sp.Key
for key != nil {
nextKey, err := ibm.Merge(ctx, ibm.evalCtx.Codec, ibm.desc, ibm.spec.TemporaryIndexes[idx], ibm.spec.AddedIndexes[idx],
key, sp.EndKey, ibm.spec.ChunkSize)
if err != nil {
return err
}
}
})

g.GoCtx(func(ctx context.Context) error {
defer close(stopProgress)

// TODO(rui): some room for improvement on single threaded
// implementation, e.g. run merge for spec spans in parallel.
for i := range ibm.spec.Spans {
sp := ibm.spec.Spans[i]
idx := ibm.spec.SpanIdx[i]

key := sp.Key
for key != nil {
nextKey, err := ibm.Merge(ctx, ibm.evalCtx.Codec, ibm.desc, ibm.spec.TemporaryIndexes[idx], ibm.spec.AddedIndexes[idx],
key, sp.EndKey, ibm.spec.ChunkSize)
if err != nil {
return err
}

completedSpan := roachpb.Span{}
if nextKey == nil {
completedSpan.Key = key
completedSpan.EndKey = sp.EndKey
} else {
completedSpan.Key = key
completedSpan.EndKey = nextKey
}
completedSpan := roachpb.Span{}
if nextKey == nil {
completedSpan.Key = key
completedSpan.EndKey = sp.EndKey
} else {
completedSpan.Key = key
completedSpan.EndKey = nextKey
}

mu.Lock()
mu.completedSpans = append(mu.completedSpans, completedSpan)
mu.completedSpanIdx = append(mu.completedSpanIdx, idx)
mu.Unlock()
mu.Lock()
mu.completedSpans = append(mu.completedSpans, completedSpan)
mu.completedSpanIdx = append(mu.completedSpanIdx, idx)
mu.Unlock()

if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok {
if knobs != nil && knobs.PushesProgressEveryChunk {
pushProgress()
}
if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok {
if knobs != nil && knobs.PushesProgressEveryChunk {
pushProgress()
}

key = nextKey
}

if err != nil {
return err
}
key = nextKey
}
}
return nil
})

return nil
})

var err error
go func() {
defer close(progCh)
err = g.Wait()
}()

Expand All @@ -174,7 +167,6 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context) {
if p.CompletedSpans != nil {
log.VEventf(ctx, 2, "sending coordinator completed spans: %+v", p.CompletedSpans)
}

ibm.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p})
}

Expand Down

0 comments on commit 236e9e8

Please sign in to comment.