sql: distribute the index merging process
This change distributes and checkpoints the index merging process. Merge
progress is checkpointed per temporary index.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Rui Hu and stevendanna committed Feb 16, 2022
1 parent 5a97a11 commit e47317f
Showing 19 changed files with 1,343 additions and 137 deletions.
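
Before the file-by-file diff, a minimal, self-contained sketch of the pattern being introduced: merge work proceeds in bounded batches (compare the `bulkio.index_backfill.merge_batch_size` setting added below), and the position to resume from is checkpointed per temporary index, so a restarted job continues where the previous attempt stopped. All names and types here are illustrative stand-ins, not CockroachDB APIs.

```go
// A sketch of checkpointed batch merging; names are hypothetical.
package main

import "fmt"

// checkpoint records, per temporary index, the position the merge
// should resume from after a restart.
type checkpoint map[string]int

// mergeIndex scans the temporary index in batches of batchSize keys
// and checkpoints the resume position after every batch.
func mergeIndex(tempIdx string, totalKeys, batchSize int, cp checkpoint) {
	for start := cp[tempIdx]; start < totalKeys; start += batchSize {
		end := start + batchSize
		if end > totalKeys {
			end = totalKeys
		}
		// ... write keys [start, end) into the new index in one txn ...
		cp[tempIdx] = end // persist the per-index resume point
		fmt.Printf("%s: merged [%d, %d), resume at %d\n", tempIdx, start, end, end)
	}
}

func main() {
	// Suppose a prior attempt already merged the first 1000 keys.
	cp := checkpoint{"temp_idx_2": 1000}
	mergeIndex("temp_idx_2", 2500, 1000, cp)
}
```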
14 changes: 12 additions & 2 deletions pkg/ccl/backupccl/restore_schema_change_creation.go
@@ -168,7 +168,7 @@ func createSchemaChangeJobsFromMutations(
) error {
mutationJobs := make([]descpb.TableDescriptor_MutationJob, 0, len(tableDesc.Mutations))
seenMutations := make(map[descpb.MutationID]bool)
-for _, mutation := range tableDesc.Mutations {
+for idx, mutation := range tableDesc.Mutations {
if seenMutations[mutation.MutationID] {
// We've already seen a mutation with this ID, so a job that handles all
// mutations with this ID has already been created.
@@ -182,7 +182,17 @@ }
}
spanList := make([]jobspb.ResumeSpanList, mutationCount)
for i := range spanList {
-spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.PrimaryIndexSpan(codec)}}
+mut := tableDesc.Mutations[idx+i]
+// Index mutations with UseDeletePreservingEncoding are
+// used as temporary indexes that are merged back into
+// newly added indexes. Their resume spans are based on
+// the index span itself since we iterate over the
+// temporary index during the merge process.
+if idx := mut.GetIndex(); idx != nil && idx.UseDeletePreservingEncoding {
+spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.IndexSpan(codec, idx.ID)}}
+} else {
+spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.PrimaryIndexSpan(codec)}}
+}
}
jobRecord := jobs.Record{
// We indicate that this schema change was triggered by a RESTORE since
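
Why the loop above now tracks `idx`: the descriptor keeps mutations sharing a `MutationID` adjacent, so `tableDesc.Mutations[idx+i]` addresses the i-th member of the group that begins at offset `idx`. A self-contained sketch of that indexing, with illustrative stand-in types rather than the real `descpb` ones:

```go
// A sketch of per-MutationID grouping; types are hypothetical stand-ins.
package main

import "fmt"

// mutation stands in for descpb.DescriptorMutation; only the fields
// needed to illustrate the grouping are included.
type mutation struct {
	mutationID int
	isTempIdx  bool
}

func main() {
	// Same-ID mutations are adjacent, as in a table descriptor.
	mutations := []mutation{
		{mutationID: 1, isTempIdx: false}, // the new index
		{mutationID: 1, isTempIdx: true},  // its temporary index
		{mutationID: 2, isTempIdx: false},
	}
	seen := map[int]bool{}
	for idx, m := range mutations {
		if seen[m.mutationID] {
			continue // this group was already handled
		}
		seen[m.mutationID] = true
		// Count the members of the group that starts at idx.
		count := 0
		for i := idx; i < len(mutations) && mutations[i].mutationID == m.mutationID; i++ {
			count++
		}
		// mutations[idx+i] is the i-th member of the current group,
		// mirroring tableDesc.Mutations[idx+i] in the diff above.
		for i := 0; i < count; i++ {
			fmt.Printf("group %d, member %d: temporary=%v\n",
				m.mutationID, i, mutations[idx+i].isTempIdx)
		}
	}
}
```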
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
@@ -407,6 +407,7 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/interval",
"//pkg/util/ioctx",
"//pkg/util/iterutil",
"//pkg/util/json",
@@ -501,6 +502,7 @@ go_test(
"metric_test.go",
"metric_util_test.go",
"mutation_test.go",
"mvcc_backfiller_test.go",
"normalization_test.go",
"partition_test.go",
"pg_metadata_test.go",
122 changes: 113 additions & 9 deletions pkg/sql/backfill.go
@@ -82,6 +82,17 @@ var indexBackfillBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt, /* validateFn */
)

// indexBackfillMergeBatchSize is the maximum number of rows we
// attempt to merge in a single transaction during the merging
// process.
var indexBackfillMergeBatchSize = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.index_backfill.merge_batch_size",
"the number of rows we merge between temporary and adding indexes in a single batch",
1000,
settings.NonNegativeInt, /* validateFn */
)

// columnBackfillBatchSize is the maximum number of rows we update at once when
// adding or removing columns.
var columnBackfillBatchSize = settings.RegisterIntSetting(
@@ -2012,15 +2023,9 @@ func (sc *SchemaChanger) mergeFromTemporaryIndex(
}); err != nil {
return err
}
-table := tabledesc.NewBuilder(&tbl.ClusterVersion).BuildImmutableTable()
-for i, addIdx := range addingIndexes {
-tempIdx := temporaryIndexes[i]
-log.Infof(ctx, "merging from %d -> %d on %v", tempIdx, addIdx, table)
-sourceSpan := table.IndexSpan(sc.execCfg.Codec, tempIdx)
-err := sc.Merge(ctx, sc.execCfg.Codec, table, tempIdx, addIdx, sourceSpan)
-if err != nil {
-return err
-}
+tableDesc := tabledesc.NewBuilder(&tbl.ClusterVersion).BuildImmutableTable()
+if err := sc.distIndexMerge(ctx, tableDesc, addingIndexes, temporaryIndexes); err != nil {
+return err
}
return nil
}
@@ -2607,3 +2612,102 @@ func indexTruncateInTxn(
// Remove index zone configs.
return RemoveIndexZoneConfigs(ctx, txn, execCfg, tableDesc, []uint32{uint32(idx.GetID())})
}

func (sc *SchemaChanger) distIndexMerge(
ctx context.Context,
tableDesc catalog.TableDescriptor,
addedIndexes []descpb.IndexID,
temporaryIndexes []descpb.IndexID,
) error {
// Gather the initial resume spans for the merge process.
progress, err := extractMergeProgress(sc.job, tableDesc, addedIndexes, temporaryIndexes)
if err != nil {
return err
}

log.VEventf(ctx, 2, "indexbackfill merge: initial resume spans %+v", progress.TodoSpans)
if progress.TodoSpans == nil {
return nil
}

// TODO(rui): these can be initialized along with other new schema changer dependencies.
planner := NewIndexBackfillerMergePlanner(sc.execCfg, sc.execCfg.InternalExecutorFactory)
tracker := NewIndexMergeTracker(progress, sc.job)
periodicFlusher := newPeriodicProgressFlusher(sc.settings)

metaFn := func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.BulkProcessorProgress != nil {
idxCompletedSpans := make(map[int32][]roachpb.Span)
for i, sp := range meta.BulkProcessorProgress.CompletedSpans {
spanIdx := meta.BulkProcessorProgress.CompletedSpanIdx[i]
idxCompletedSpans[spanIdx] = append(idxCompletedSpans[spanIdx], sp)
}
tracker.UpdateMergeProgress(ctx, func(_ context.Context, currentProgress *MergeProgress) {
for idx, completedSpans := range idxCompletedSpans {
currentProgress.TodoSpans[idx] = roachpb.SubtractSpans(currentProgress.TodoSpans[idx], completedSpans)
}
})
if sc.testingKnobs.AlwaysUpdateIndexBackfillDetails {
if err := tracker.FlushCheckpoint(ctx); err != nil {
return err
}
}
}
return nil
}

stop := periodicFlusher.StartPeriodicUpdates(ctx, tracker)
defer func() { _ = stop() }()

run, err := planner.plan(ctx, tableDesc, progress.TodoSpans, progress.AddedIndexes,
progress.TemporaryIndexes, metaFn)
if err != nil {
return err
}

if err := run(ctx); err != nil {
return err
}

if err := stop(); err != nil {
return err
}

if err := tracker.FlushCheckpoint(ctx); err != nil {
return err
}

return tracker.FlushFractionCompleted(ctx)
}

func extractMergeProgress(
job *jobs.Job, tableDesc catalog.TableDescriptor, addedIndexes, temporaryIndexes []descpb.IndexID,
) (*MergeProgress, error) {
resumeSpanList := job.Details().(jobspb.SchemaChangeDetails).ResumeSpanList
progress := MergeProgress{}
progress.TemporaryIndexes = temporaryIndexes
progress.AddedIndexes = addedIndexes

const noIdx = -1
findMutIdx := func(id descpb.IndexID) int {
for mutIdx, mut := range tableDesc.AllMutations() {
if mut.AsIndex() != nil && mut.AsIndex().GetID() == id {
return mutIdx
}
}

return noIdx
}

for _, tempIdx := range temporaryIndexes {
mutIdx := findMutIdx(tempIdx)
if mutIdx == noIdx {
return nil, errors.AssertionFailedf("no corresponding mutation for temporary index %d", tempIdx)
}

progress.TodoSpans = append(progress.TodoSpans, resumeSpanList[mutIdx].ResumeSpans)
progress.MutationIdx = append(progress.MutationIdx, mutIdx)
}

return &progress, nil
}
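
How `distIndexMerge` above folds processor progress back into the job: completed spans arrive tagged with the index they belong to (`CompletedSpans` paired with `CompletedSpanIdx`) and are subtracted from that index's outstanding spans; `extractMergeProgress` later rebuilds the todo lists from the job's resume spans. A self-contained sketch of that bookkeeping, with half-open integer intervals standing in for `roachpb.Span` and a hand-rolled `subtract` standing in for `roachpb.SubtractSpans`:

```go
// A sketch of per-index span subtraction; types are simplified stand-ins.
package main

import "fmt"

// span stands in for roachpb.Span, simplified to a half-open
// integer interval [start, end).
type span struct{ start, end int }

// subtract removes done from each span in todo, splitting spans where
// necessary; it plays the role of roachpb.SubtractSpans here.
func subtract(todo []span, done span) []span {
	var out []span
	for _, t := range todo {
		if done.end <= t.start || done.start >= t.end {
			out = append(out, t) // no overlap; keep as-is
			continue
		}
		if t.start < done.start {
			out = append(out, span{t.start, done.start})
		}
		if done.end < t.end {
			out = append(out, span{done.end, t.end})
		}
	}
	return out
}

func main() {
	// One todo list per temporary index, as in MergeProgress.TodoSpans.
	todoByIdx := [][]span{{{0, 100}}, {{0, 50}}}
	// Completed spans arrive tagged with the index they belong to.
	completed := map[int][]span{0: {{0, 25}}, 1: {{10, 20}}}
	for idx, spans := range completed {
		for _, sp := range spans {
			todoByIdx[idx] = subtract(todoByIdx[idx], sp)
		}
	}
	fmt.Println(todoByIdx) // [[{25 100}] [{0 10} {20 50}]]
}
```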
11 changes: 10 additions & 1 deletion pkg/sql/backfill/BUILD.bazel
@@ -2,18 +2,25 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "backfill",
srcs = ["backfill.go"],
srcs = [
"backfill.go",
"mvcc_index_merger.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/backfill",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/schemaexpr",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/rowinfra",
@@ -22,9 +29,11 @@ go_library(
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
],
)