Skip to content

Commit

Permalink
sql: minor cleanups to mvcc merger
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
stevendanna committed Feb 16, 2022
1 parent 4d20f47 commit a5a7319
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 63 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func createSchemaChangeJobsFromMutations(
spanList := make([]jobspb.ResumeSpanList, mutationCount)
for i := range spanList {
mut := tableDesc.Mutations[idx+i]
// Index mutations with UseDeletePreservingEncoding are
// used as temporary indexes that are merged back into
// newly added indexes. Their resume spans are based on
// the index span itself since we iterate over the
// temporary index during the merge process.
if idx := mut.GetIndex(); idx != nil && idx.UseDeletePreservingEncoding {
spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.IndexSpan(codec, idx.ID)}}
} else {
Expand Down
116 changes: 54 additions & 62 deletions pkg/sql/backfill/mvcc_index_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,77 +95,70 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context) {
return
}

var err error
go func() {
defer close(progCh)
// stopProgress will be closed when there is no more progress to report.
stopProgress := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(indexBackfillMergeProgressReportInterval)
defer tick.Stop()
done := ctx.Done()
for {
select {
case <-done:
return ctx.Err()
case <-stopProgress:
return nil
case <-tick.C:
pushProgress()
}
}
})

// stopProgress will be closed when there is no more progress to report.
stopProgress := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(indexBackfillMergeProgressReportInterval)
defer tick.Stop()
done := ctx.Done()
for {
select {
case <-done:
return ctx.Err()
case <-stopProgress:
return nil
case <-tick.C:
pushProgress()
g.GoCtx(func(ctx context.Context) error {
defer close(stopProgress)
// TODO(rui): some room for improvement on single threaded
// implementation, e.g. run merge for spec spans in parallel.
for i := range ibm.spec.Spans {
sp := ibm.spec.Spans[i]
idx := ibm.spec.SpanIdx[i]

key := sp.Key
for key != nil {
nextKey, err := ibm.Merge(ctx, ibm.evalCtx.Codec, ibm.desc, ibm.spec.TemporaryIndexes[idx], ibm.spec.AddedIndexes[idx],
key, sp.EndKey, ibm.spec.ChunkSize)
if err != nil {
return err
}
}
})

g.GoCtx(func(ctx context.Context) error {
defer close(stopProgress)

// TODO(rui): some room for improvement on single threaded
// implementation, e.g. run merge for spec spans in parallel.
for i := range ibm.spec.Spans {
sp := ibm.spec.Spans[i]
idx := ibm.spec.SpanIdx[i]

key := sp.Key
for key != nil {
nextKey, err := ibm.Merge(ctx, ibm.evalCtx.Codec, ibm.desc, ibm.spec.TemporaryIndexes[idx], ibm.spec.AddedIndexes[idx],
key, sp.EndKey, ibm.spec.ChunkSize)
if err != nil {
return err
}

completedSpan := roachpb.Span{}
if nextKey == nil {
completedSpan.Key = key
completedSpan.EndKey = sp.EndKey
} else {
completedSpan.Key = key
completedSpan.EndKey = nextKey
}
completedSpan := roachpb.Span{}
if nextKey == nil {
completedSpan.Key = key
completedSpan.EndKey = sp.EndKey
} else {
completedSpan.Key = key
completedSpan.EndKey = nextKey
}

mu.Lock()
mu.completedSpans = append(mu.completedSpans, completedSpan)
mu.completedSpanIdx = append(mu.completedSpanIdx, idx)
mu.Unlock()
mu.Lock()
mu.completedSpans = append(mu.completedSpans, completedSpan)
mu.completedSpanIdx = append(mu.completedSpanIdx, idx)
mu.Unlock()

if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok {
if knobs != nil && knobs.PushesProgressEveryChunk {
pushProgress()
}
if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok {
if knobs != nil && knobs.PushesProgressEveryChunk {
pushProgress()
}

key = nextKey
}

if err != nil {
return err
}
key = nextKey
}
}
return nil
})

return nil
})

var err error
go func() {
defer close(progCh)
err = g.Wait()
}()

Expand All @@ -174,7 +167,6 @@ func (ibm *IndexBackfillMerger) Run(ctx context.Context) {
if p.CompletedSpans != nil {
log.VEventf(ctx, 2, "sending coordinator completed spans: %+v", p.CompletedSpans)
}

ibm.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (imt *IndexMergeTracker) FlushCheckpoint(ctx context.Context) error {

// FlushFractionCompleted writes out the fraction completed.
func (imt *IndexMergeTracker) FlushFractionCompleted(ctx context.Context) error {
// TODO(rui): The backfiller currently doesn't have a good way to report the
// TODO(#76365): The backfiller currently doesn't have a good way to report the
// total progress of mutations that occur in multiple stages that
// independently report progress. So fraction tracking of the merge will be
// unimplemented for now and the progress fraction will report only the
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/mvcc_backfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/startupmigrations"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -159,6 +160,8 @@ func TestRaceWithIndexBackfillMerge(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStressRace(t, "TODO(ssd) test times outs under race")

// protects mergeNotification, writesPopulated
var mu syncutil.Mutex
var mergeNotification chan struct{}
Expand Down

0 comments on commit a5a7319

Please sign in to comment.