diff --git a/pkg/ccl/backupccl/restore_schema_change_creation.go b/pkg/ccl/backupccl/restore_schema_change_creation.go index 3815b79d4415..0ba004ef91ce 100644 --- a/pkg/ccl/backupccl/restore_schema_change_creation.go +++ b/pkg/ccl/backupccl/restore_schema_change_creation.go @@ -168,7 +168,7 @@ func createSchemaChangeJobsFromMutations( ) error { mutationJobs := make([]descpb.TableDescriptor_MutationJob, 0, len(tableDesc.Mutations)) seenMutations := make(map[descpb.MutationID]bool) - for _, mutation := range tableDesc.Mutations { + for idx, mutation := range tableDesc.Mutations { if seenMutations[mutation.MutationID] { // We've already seen a mutation with this ID, so a job that handles all // mutations with this ID has already been created. @@ -182,7 +182,17 @@ func createSchemaChangeJobsFromMutations( } spanList := make([]jobspb.ResumeSpanList, mutationCount) for i := range spanList { - spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.PrimaryIndexSpan(codec)}} + mut := tableDesc.Mutations[idx+i] + // Index mutations with UseDeletePreservingEncoding are + // used as temporary indexes that are merged back into + // newly added indexes. Their resume spans are based on + // the index span itself since we iterate over the + // temporary index during the merge process. + if idx := mut.GetIndex(); idx != nil && idx.UseDeletePreservingEncoding { + spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.IndexSpan(codec, idx.ID)}} + } else { + spanList[i] = jobspb.ResumeSpanList{ResumeSpans: []roachpb.Span{tableDesc.PrimaryIndexSpan(codec)}} + } } jobRecord := jobs.Record{ // We indicate that this schema change was triggered by a RESTORE since diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index e13fbc7f7204..646718fac821 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -407,6 +407,7 @@ go_library( "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/humanizeutil", + "//pkg/util/interval", "//pkg/util/ioctx", "//pkg/util/iterutil", "//pkg/util/json", @@ -501,6 +502,7 @@ go_test( "metric_test.go", "metric_util_test.go", "mutation_test.go", + "mvcc_backfiller_test.go", "normalization_test.go", "partition_test.go", "pg_metadata_test.go", diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 4e8d49bc216a..ed73b02a22ee 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -82,6 +82,17 @@ var indexBackfillBatchSize = settings.RegisterIntSetting( settings.NonNegativeInt, /* validateFn */ ) +// indexBackfillMergeBatchSize is the maximum number of rows we +// attempt to merge in a single transaction during the merging +// process. +var indexBackfillMergeBatchSize = settings.RegisterIntSetting( + settings.TenantWritable, + "bulkio.index_backfill.merge_batch_size", + "the number of rows we merge between temporary and adding indexes in a single batch", + 1000, + settings.NonNegativeInt, /* validateFn */ +) + // columnBackfillBatchSize is the maximum number of rows we update at once when // adding or removing columns. var columnBackfillBatchSize = settings.RegisterIntSetting( @@ -2012,15 +2023,9 @@ func (sc *SchemaChanger) mergeFromTemporaryIndex( }); err != nil { return err } - table := tabledesc.NewBuilder(&tbl.ClusterVersion).BuildImmutableTable() - for i, addIdx := range addingIndexes { - tempIdx := temporaryIndexes[i] - log.Infof(ctx, "merging from %d -> %d on %v", tempIdx, addIdx, table) - sourceSpan := table.IndexSpan(sc.execCfg.Codec, tempIdx) - err := sc.Merge(ctx, sc.execCfg.Codec, table, tempIdx, addIdx, sourceSpan) - if err != nil { - return err - } + tableDesc := tabledesc.NewBuilder(&tbl.ClusterVersion).BuildImmutableTable() + if err := sc.distIndexMerge(ctx, tableDesc, addingIndexes, temporaryIndexes); err != nil { + return err } return nil } @@ -2607,3 +2612,102 @@ func indexTruncateInTxn( // Remove index zone configs. return RemoveIndexZoneConfigs(ctx, txn, execCfg, tableDesc, []uint32{uint32(idx.GetID())}) } + +func (sc *SchemaChanger) distIndexMerge( + ctx context.Context, + tableDesc catalog.TableDescriptor, + addedIndexes []descpb.IndexID, + temporaryIndexes []descpb.IndexID, +) error { + // Gather the initial resume spans for the merge process. + progress, err := extractMergeProgress(sc.job, tableDesc, addedIndexes, temporaryIndexes) + if err != nil { + return err + } + + log.VEventf(ctx, 2, "indexbackfill merge: initial resume spans %+v", progress.TodoSpans) + if progress.TodoSpans == nil { + return nil + } + + // TODO(rui): these can be initialized along with other new schema changer dependencies. + planner := NewIndexBackfillerMergePlanner(sc.execCfg, sc.execCfg.InternalExecutorFactory) + tracker := NewIndexMergeTracker(progress, sc.job) + periodicFlusher := newPeriodicProgressFlusher(sc.settings) + + metaFn := func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error { + if meta.BulkProcessorProgress != nil { + idxCompletedSpans := make(map[int32][]roachpb.Span) + for i, sp := range meta.BulkProcessorProgress.CompletedSpans { + spanIdx := meta.BulkProcessorProgress.CompletedSpanIdx[i] + idxCompletedSpans[spanIdx] = append(idxCompletedSpans[spanIdx], sp) + } + tracker.UpdateMergeProgress(ctx, func(_ context.Context, currentProgress *MergeProgress) { + for idx, completedSpans := range idxCompletedSpans { + currentProgress.TodoSpans[idx] = roachpb.SubtractSpans(currentProgress.TodoSpans[idx], completedSpans) + } + }) + if sc.testingKnobs.AlwaysUpdateIndexBackfillDetails { + if err := tracker.FlushCheckpoint(ctx); err != nil { + return err + } + } + } + return nil + } + + stop := periodicFlusher.StartPeriodicUpdates(ctx, tracker) + defer func() { _ = stop() }() + + run, err := planner.plan(ctx, tableDesc, progress.TodoSpans, progress.AddedIndexes, + progress.TemporaryIndexes, metaFn) + if err != nil { + return err + } + + if err := run(ctx); err != nil { + return err + } + + if err := stop(); err != nil { + return err + } + + if err := tracker.FlushCheckpoint(ctx); err != nil { + return err + } + + return tracker.FlushFractionCompleted(ctx) +} + +func extractMergeProgress( + job *jobs.Job, tableDesc catalog.TableDescriptor, addedIndexes, temporaryIndexes []descpb.IndexID, +) (*MergeProgress, error) { + resumeSpanList := job.Details().(jobspb.SchemaChangeDetails).ResumeSpanList + progress := MergeProgress{} + progress.TemporaryIndexes = temporaryIndexes + progress.AddedIndexes = addedIndexes + + const noIdx = -1 + findMutIdx := func(id descpb.IndexID) int { + for mutIdx, mut := range tableDesc.AllMutations() { + if mut.AsIndex() != nil && mut.AsIndex().GetID() == id { + return mutIdx + } + } + + return noIdx + } + + for _, tempIdx := range temporaryIndexes { + mutIdx := findMutIdx(tempIdx) + if mutIdx == noIdx { + return nil, errors.AssertionFailedf("no corresponding mutation for temporary index %d", tempIdx) + } + + progress.TodoSpans = append(progress.TodoSpans, resumeSpanList[mutIdx].ResumeSpans) + progress.MutationIdx = append(progress.MutationIdx, mutIdx) + } + + return &progress, nil +} diff --git a/pkg/sql/backfill/BUILD.bazel b/pkg/sql/backfill/BUILD.bazel index 383da95b8d2f..c73bcaec9a51 100644 --- a/pkg/sql/backfill/BUILD.bazel +++ b/pkg/sql/backfill/BUILD.bazel @@ -2,18 +2,25 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "backfill", - srcs = ["backfill.go"], + srcs = [ + "backfill.go", + "mvcc_index_merger.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/backfill", visibility = ["//visibility:public"], deps = [ + "//pkg/base", + "//pkg/keys", "//pkg/kv", "//pkg/roachpb", "//pkg/settings", "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", "//pkg/sql/execinfra", + "//pkg/sql/execinfrapb", "//pkg/sql/row", "//pkg/sql/rowenc", "//pkg/sql/rowinfra", @@ -22,9 +29,11 @@ go_library( "//pkg/sql/sqlerrors", "//pkg/sql/types", "//pkg/util", + "//pkg/util/ctxgroup", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", ], ) diff --git a/pkg/sql/backfill/mvcc_index_merger.go b/pkg/sql/backfill/mvcc_index_merger.go new file mode 100644 index 000000000000..a2a8cd840b96 --- /dev/null +++ b/pkg/sql/backfill/mvcc_index_merger.go @@ -0,0 +1,317 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package backfill + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" +) + +// IndexBackfillMerger is a processor that merges entries from the corresponding +// temporary index to a new index. +type IndexBackfillMerger struct { + spec execinfrapb.IndexBackfillMergerSpec + + desc catalog.TableDescriptor + + out execinfra.ProcOutputHelper + + flowCtx *execinfra.FlowCtx + + evalCtx *tree.EvalContext + + output execinfra.RowReceiver +} + +// OutputTypes is always nil. +func (ibm *IndexBackfillMerger) OutputTypes() []*types.T { + return nil +} + +// MustBeStreaming is always false. +func (ibm *IndexBackfillMerger) MustBeStreaming() bool { + return false +} + +const indexBackfillMergeProgressReportInterval = 10 * time.Second + +// Run runs the processor. +func (ibm *IndexBackfillMerger) Run(ctx context.Context) { + opName := "IndexBackfillMerger" + ctx = logtags.AddTag(ctx, opName, int(ibm.spec.Table.ID)) + ctx, span := execinfra.ProcessorSpan(ctx, opName) + defer span.Finish() + defer ibm.output.ProducerDone() + defer execinfra.SendTraceData(ctx, ibm.output) + + mu := struct { + syncutil.Mutex + completedSpans []roachpb.Span + completedSpanIdx []int32 + }{} + + progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) + pushProgress := func() { + mu.Lock() + var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + prog.CompletedSpans = append(prog.CompletedSpans, mu.completedSpans...) + mu.completedSpans = nil + prog.CompletedSpanIdx = append(prog.CompletedSpanIdx, mu.completedSpanIdx...) + mu.completedSpanIdx = nil + mu.Unlock() + + progCh <- prog + } + + semaCtx := tree.MakeSemaContext() + if err := ibm.out.Init(&execinfrapb.PostProcessSpec{}, nil, &semaCtx, ibm.flowCtx.NewEvalCtx()); err != nil { + ibm.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err}) + return + } + + // stopProgress will be closed when there is no more progress to report. + stopProgress := make(chan struct{}) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + tick := time.NewTicker(indexBackfillMergeProgressReportInterval) + defer tick.Stop() + done := ctx.Done() + for { + select { + case <-done: + return ctx.Err() + case <-stopProgress: + return nil + case <-tick.C: + pushProgress() + } + } + }) + + g.GoCtx(func(ctx context.Context) error { + defer close(stopProgress) + // TODO(rui): some room for improvement on single threaded + // implementation, e.g. run merge for spec spans in parallel. + for i := range ibm.spec.Spans { + sp := ibm.spec.Spans[i] + idx := ibm.spec.SpanIdx[i] + + key := sp.Key + for key != nil { + nextKey, err := ibm.Merge(ctx, ibm.evalCtx.Codec, ibm.desc, ibm.spec.TemporaryIndexes[idx], ibm.spec.AddedIndexes[idx], + key, sp.EndKey, ibm.spec.ChunkSize) + if err != nil { + return err + } + + completedSpan := roachpb.Span{} + if nextKey == nil { + completedSpan.Key = key + completedSpan.EndKey = sp.EndKey + } else { + completedSpan.Key = key + completedSpan.EndKey = nextKey + } + + mu.Lock() + mu.completedSpans = append(mu.completedSpans, completedSpan) + mu.completedSpanIdx = append(mu.completedSpanIdx, idx) + mu.Unlock() + + if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok { + if knobs != nil && knobs.PushesProgressEveryChunk { + pushProgress() + } + } + + key = nextKey + } + } + return nil + }) + + var err error + go func() { + defer close(progCh) + err = g.Wait() + }() + + for prog := range progCh { + p := prog + if p.CompletedSpans != nil { + log.VEventf(ctx, 2, "sending coordinator completed spans: %+v", p.CompletedSpans) + } + ibm.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}) + } + + if err != nil { + ibm.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err}) + } +} + +var _ execinfra.Processor = &IndexBackfillMerger{} + +// Merge merges the entries from startKey to endKey from the index with sourceID +// into the index with destinationID, up to a maximum of chunkSize entries. +func (ibm *IndexBackfillMerger) Merge( + ctx context.Context, + codec keys.SQLCodec, + table catalog.TableDescriptor, + sourceID descpb.IndexID, + destinationID descpb.IndexID, + startKey roachpb.Key, + endKey roachpb.Key, + chunkSize int64, +) (roachpb.Key, error) { + sourcePrefix := rowenc.MakeIndexKeyPrefix(codec, table.GetID(), sourceID) + prefixLen := len(sourcePrefix) + destPrefix := rowenc.MakeIndexKeyPrefix(codec, table.GetID(), destinationID) + + key := startKey + destKey := make([]byte, len(destPrefix)) + + if knobs, ok := ibm.flowCtx.Cfg.TestingKnobs.IndexBackfillMergerTestingKnobs.(*IndexBackfillMergerTestingKnobs); ok { + if knobs != nil && knobs.RunBeforeMergeChunk != nil { + if err := knobs.RunBeforeMergeChunk(key); err != nil { + return nil, err + } + } + } + + err := ibm.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // For now just grab all of the destination KVs and merge the corresponding entries. + kvs, err := txn.Scan(ctx, key, endKey, chunkSize) + if err != nil { + return err + } + + if len(kvs) == 0 { + key = nil + return nil + } + + destKeys := make([]roachpb.Key, len(kvs)) + for i := range kvs { + sourceKV := &kvs[i] + + if len(sourceKV.Key) < prefixLen { + return errors.Errorf("Key for index entry %v does not start with prefix %v", sourceKV, sourcePrefix) + } + + destKey = destKey[:0] + destKey = append(destKey, destPrefix...) + destKey = append(destKey, sourceKV.Key[prefixLen:]...) + destKeys[i] = make([]byte, len(destKey)) + copy(destKeys[i], destKey) + } + + wb := txn.NewBatch() + for i := range kvs { + mergedEntry, deleted, err := mergeEntry(&kvs[i], destKeys[i]) + if err != nil { + return err + } + + if deleted { + wb.Del(mergedEntry.Key) + } else { + wb.Put(mergedEntry.Key, mergedEntry.Value) + } + } + + if err := txn.Run(ctx, wb); err != nil { + return err + } + + key = kvs[len(kvs)-1].Key.Next() + return nil + }) + + if err != nil { + return nil, err + } + + return key, nil +} + +func mergeEntry(sourceKV *kv.KeyValue, destKey roachpb.Key) (*kv.KeyValue, bool, error) { + var destTagAndData []byte + var deleted bool + + tempWrapper, err := rowenc.DecodeWrapper(sourceKV.Value) + if err != nil { + return nil, false, err + } + + if tempWrapper.Deleted { + deleted = true + } else { + destTagAndData = tempWrapper.Value + } + + value := &roachpb.Value{} + value.SetTagAndData(destTagAndData) + + return &kv.KeyValue{ + Key: destKey, + Value: value, + }, deleted, nil +} + +// NewIndexBackfillMerger creates a new IndexBackfillMerger. +func NewIndexBackfillMerger( + flowCtx *execinfra.FlowCtx, + spec execinfrapb.IndexBackfillMergerSpec, + output execinfra.RowReceiver, +) (*IndexBackfillMerger, error) { + return &IndexBackfillMerger{ + spec: spec, + desc: tabledesc.NewUnsafeImmutable(&spec.Table), + flowCtx: flowCtx, + evalCtx: flowCtx.NewEvalCtx(), + output: output, + }, nil +} + +// IndexBackfillMergerTestingKnobs is for testing the distributed processors for +// the index backfill merge step. +type IndexBackfillMergerTestingKnobs struct { + // RunBeforeMergeChunk is called once before the merge of each chunk. It is + // called with starting key of the chunk. + RunBeforeMergeChunk func(startKey roachpb.Key) error + + // PushesProgressEveryChunk forces the process to push the merge process after + // every chunk. + PushesProgressEveryChunk bool +} + +var _ base.ModuleTestingKnobs = &IndexBackfillMergerTestingKnobs{} + +// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. +func (*IndexBackfillMergerTestingKnobs) ModuleTestingKnobs() {} diff --git a/pkg/sql/delete_preserving_index_test.go b/pkg/sql/delete_preserving_index_test.go index 252ed1f657fb..bfdcb7702150 100644 --- a/pkg/sql/delete_preserving_index_test.go +++ b/pkg/sql/delete_preserving_index_test.go @@ -17,12 +17,12 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -30,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/rowencpb" @@ -474,7 +476,7 @@ func compareVersionedValueWrappers( // This test tests that the schema changer is able to merge entries from a // delete-preserving index into a regular index. -func TestMergeProcess(t *testing.T) { +func TestMergeProcessor(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() @@ -587,13 +589,19 @@ func TestMergeProcess(t *testing.T) { codec := keys.SystemSQLCodec tableDesc := desctestutils.TestingGetMutableExistingTableDescriptor(kvDB, codec, "d", "t") - lm := server.LeaseManager().(*lease.Manager) settings := server.ClusterSettings() execCfg := server.ExecutorConfig().(sql.ExecutorConfig) - jr := server.JobRegistry().(*jobs.Registry) + evalCtx := tree.EvalContext{Settings: settings} + flowCtx := execinfra.FlowCtx{Cfg: &execinfra.ServerConfig{DB: kvDB, + Settings: settings, + Codec: codec, + }, + EvalCtx: &evalCtx} - changer := sql.NewSchemaChangerForTesting( - tableDesc.GetID(), 1, execCfg.NodeID.SQLInstanceID(), kvDB, lm, jr, &execCfg, settings) + im, err := backfill.NewIndexBackfillMerger(&flowCtx, execinfrapb.IndexBackfillMergerSpec{}, nil) + if err != nil { + t.Fatal(err) + } // Here want to have different entries for the two indices, so we manipulate // the index to DELETE_ONLY when we don't want to write to it, and @@ -605,7 +613,7 @@ func TestMergeProcess(t *testing.T) { } } - err := mutateIndexByName(kvDB, codec, tableDesc, test.dstIndex, nil, descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY) + err = mutateIndexByName(kvDB, codec, tableDesc, test.dstIndex, nil, descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY) require.NoError(t, err) err = mutateIndexByName(kvDB, codec, tableDesc, test.srcIndex, setUseDeletePreservingEncoding(true), descpb.DescriptorMutation_DELETE_ONLY) require.NoError(t, err) @@ -657,13 +665,9 @@ func TestMergeProcess(t *testing.T) { return nil })) - if err := changer.Merge(context.Background(), - codec, - tableDesc, - srcIndex.GetID(), - dstIndex.GetID(), - tableDesc.IndexSpan(codec, srcIndex.GetID()), - ); err != nil { + sp := tableDesc.IndexSpan(codec, srcIndex.GetID()) + _, err = im.Merge(context.Background(), codec, tableDesc, srcIndex.GetID(), dstIndex.GetID(), sp.Key, sp.EndKey, 1000) + if err != nil { t.Fatal(err) } diff --git a/pkg/sql/distsql_plan_backfill.go b/pkg/sql/distsql_plan_backfill.go index 6f559ef4fd99..8cd1d37ca98b 100644 --- a/pkg/sql/distsql_plan_backfill.go +++ b/pkg/sql/distsql_plan_backfill.go @@ -12,6 +12,7 @@ package sql import ( "time" + "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -19,6 +20,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/interval" + "github.com/cockroachdb/errors" ) func initColumnBackfillerSpec( @@ -51,6 +54,20 @@ func initIndexBackfillerSpec( }, nil } +func initIndexBackfillMergerSpec( + desc descpb.TableDescriptor, + chunkSize int64, + addedIndexes []descpb.IndexID, + temporaryIndexes []descpb.IndexID, +) (execinfrapb.IndexBackfillMergerSpec, error) { + return execinfrapb.IndexBackfillMergerSpec{ + Table: desc, + ChunkSize: chunkSize, + AddedIndexes: addedIndexes, + TemporaryIndexes: temporaryIndexes, + }, nil +} + // createBackfiller generates a plan consisting of index/column backfiller // processors, one for each node that has spans that we are reading. The plan is // finalized. @@ -85,3 +102,82 @@ func (dsp *DistSQLPlanner) createBackfillerPhysicalPlan( dsp.FinalizePlan(planCtx, p) return p, nil } + +// createIndexBackfillerMergePhysicalPlan generates a plan consisting +// of index merger processors, one for each node that has spans that +// we are reading. The plan is finalized. +func (dsp *DistSQLPlanner) createIndexBackfillerMergePhysicalPlan( + planCtx *PlanningCtx, spec execinfrapb.IndexBackfillMergerSpec, spans [][]roachpb.Span, +) (*PhysicalPlan, error) { + + var n int + for _, sp := range spans { + for range sp { + n++ + } + } + indexSpans := make([]roachpb.Span, 0, n) + spanIdxs := make([]spanAndIndex, 0, n) + spanIdxTree := interval.NewTree(interval.ExclusiveOverlapper) + for i := range spans { + for j := range spans[i] { + indexSpans = append(indexSpans, spans[i][j]) + spanIdxs = append(spanIdxs, spanAndIndex{Span: spans[i][j], idx: i}) + if err := spanIdxTree.Insert(&spanIdxs[len(spanIdxs)-1], true /* fast */); err != nil { + return nil, err + } + + } + } + spanIdxTree.AdjustRanges() + getIndex := func(sp roachpb.Span) (idx int) { + if !spanIdxTree.DoMatching(func(i interval.Interface) (done bool) { + idx = i.(*spanAndIndex).idx + return true + }, sp.AsRange()) { + panic(errors.AssertionFailedf("no matching index found for span: %s", sp)) + } + return idx + } + + spanPartitions, err := dsp.PartitionSpans(planCtx, indexSpans) + if err != nil { + return nil, err + } + + p := planCtx.NewPhysicalPlan() + p.ResultRouters = make([]physicalplan.ProcessorIdx, len(spanPartitions)) + for i, sp := range spanPartitions { + ibm := &execinfrapb.IndexBackfillMergerSpec{} + *ibm = spec + + ibm.Spans = sp.Spans + for _, sp := range ibm.Spans { + ibm.SpanIdx = append(ibm.SpanIdx, int32(getIndex(sp))) + } + + proc := physicalplan.Processor{ + SQLInstanceID: sp.SQLInstanceID, + Spec: execinfrapb.ProcessorSpec{ + Core: execinfrapb.ProcessorCoreUnion{IndexBackfillMerger: ibm}, + Output: []execinfrapb.OutputRouterSpec{{Type: execinfrapb.OutputRouterSpec_PASS_THROUGH}}, + ResultTypes: []*types.T{}, + }, + } + + pIdx := p.AddProcessor(proc) + p.ResultRouters[i] = pIdx + } + dsp.FinalizePlan(planCtx, p) + return p, nil +} + +type spanAndIndex struct { + roachpb.Span + idx int +} + +var _ interval.Interface = (*spanAndIndex)(nil) + +func (si *spanAndIndex) Range() interval.Range { return si.AsRange() } +func (si *spanAndIndex) ID() uintptr { return uintptr(unsafe.Pointer(si)) } diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index f3a8e4c2c14f..eace636f896a 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -264,6 +264,10 @@ type TestingKnobs struct { // StreamingTestingKnobs are backup and restore specific testing knobs. StreamingTestingKnobs base.ModuleTestingKnobs + + // IndexBackfillMergerTestingKnobs are the index backfill merger specific + // testing knobs. + IndexBackfillMergerTestingKnobs base.ModuleTestingKnobs } // MetadataTestLevel represents the types of queries where metadata test diff --git a/pkg/sql/execinfrapb/data.proto b/pkg/sql/execinfrapb/data.proto index da12cda7b420..f6152df3b15a 100644 --- a/pkg/sql/execinfrapb/data.proto +++ b/pkg/sql/execinfrapb/data.proto @@ -286,6 +286,7 @@ message RemoteProducerMetadata { // Used to stream back progress to the coordinator of a bulk job. optional google.protobuf.Any progress_details = 4 [(gogoproto.nullable) = false]; optional roachpb.BulkOpSummary bulk_summary = 5 [(gogoproto.nullable) = false]; + repeated int32 completed_span_idx = 6; } // Metrics are unconditionally emitted by table readers. message Metrics { diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto index 287a87a060c9..099b4de3eb8f 100644 --- a/pkg/sql/execinfrapb/processors.proto +++ b/pkg/sql/execinfrapb/processors.proto @@ -124,6 +124,8 @@ message ProcessorCoreUnion { optional StreamIngestionDataSpec streamIngestionData = 35; optional StreamIngestionFrontierSpec streamIngestionFrontier = 36; optional ExportSpec exporter = 37; + optional IndexBackfillMergerSpec indexBackfillMerger = 38; + reserved 6, 12; } @@ -154,4 +156,3 @@ message MetadataTestSenderSpec { message MetadataTestReceiverSpec { repeated string sender_ids = 1 [(gogoproto.customname) = "SenderIDs"]; } - diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 557713708c66..6838886d52aa 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -306,3 +306,15 @@ message ExportSpec { message BulkRowWriterSpec { optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false]; } + +message IndexBackfillMergerSpec { + optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false]; + + repeated uint32 temporary_indexes = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID"]; + repeated uint32 added_indexes = 3 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID"]; + + repeated roachpb.Span spans = 4 [(gogoproto.nullable) = false]; + repeated int32 span_idx = 5; + + optional int64 chunk_size = 6 [(gogoproto.nullable) = false]; +} diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go index a86f6d22a559..54e25b9ba176 100644 --- a/pkg/sql/mvcc_backfiller.go +++ b/pkg/sql/mvcc_backfiller.go @@ -12,115 +12,208 @@ package sql import ( "context" + "time" - "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) -// Merge merges the entries in the provide span sourceSpan from the index with -// sourceID into the index with destinationID. -func (sc *SchemaChanger) Merge( - ctx context.Context, - codec keys.SQLCodec, - table catalog.TableDescriptor, - sourceID descpb.IndexID, - destinationID descpb.IndexID, - sourceSpan roachpb.Span, -) error { - sourcePrefix := rowenc.MakeIndexKeyPrefix(codec, table.GetID(), sourceID) - prefixLen := len(sourcePrefix) - destPrefix := rowenc.MakeIndexKeyPrefix(codec, table.GetID(), destinationID) - - const pageSize = 1000 - key := sourceSpan.Key - destKey := make([]byte, len(destPrefix)) - - for key != nil { - err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - // For now just grab all of the destination KVs and merge the corresponding entries. - kvs, err := txn.Scan(ctx, key, sourceSpan.EndKey, int64(pageSize)) - if err != nil { - return err - } - - if len(kvs) == 0 { - key = nil - return nil - } - - destKeys := make([]roachpb.Key, len(kvs)) - for i := range kvs { - sourceKV := &kvs[i] - - if len(sourceKV.Key) < prefixLen { - return errors.Errorf("Key for index entry %v does not start with prefix %v", sourceKV, sourceSpan.Key) - } - - destKey = destKey[:0] - destKey = append(destKey, destPrefix...) - destKey = append(destKey, sourceKV.Key[prefixLen:]...) - destKeys[i] = make([]byte, len(destKey)) - copy(destKeys[i], destKey) - } - - wb := txn.NewBatch() - for i := range kvs { - mergedEntry, deleted, err := mergeEntry(&kvs[i], destKeys[i]) - if err != nil { - return err - } - - // We can blindly put and delete values during the merge since any - // uniqueness variations in the merged index will be caught by - // ValidateForwardIndexes and ValidateInvertedIndexes during validation. - if deleted { - wb.Del(mergedEntry.Key) - } else { - wb.Put(mergedEntry.Key, mergedEntry.Value) - } - } - - if err := txn.Run(ctx, wb); err != nil { - return err - } - - key = kvs[len(kvs)-1].Key.Next() - return nil - }) +// IndexBackfillerMergePlanner holds dependencies for the merge step of the +// index backfiller. +type IndexBackfillerMergePlanner struct { + execCfg *ExecutorConfig + ieFactory sqlutil.SessionBoundInternalExecutorFactory +} + +// NewIndexBackfillerMergePlanner creates a new IndexBackfillerMergePlanner. +func NewIndexBackfillerMergePlanner( + execCfg *ExecutorConfig, ieFactory sqlutil.SessionBoundInternalExecutorFactory, +) *IndexBackfillerMergePlanner { + return &IndexBackfillerMergePlanner{execCfg: execCfg, ieFactory: ieFactory} +} +func (im *IndexBackfillerMergePlanner) plan( + ctx context.Context, + tableDesc catalog.TableDescriptor, + todoSpanList [][]roachpb.Span, + addedIndexes, temporaryIndexes []descpb.IndexID, + metaFn func(_ context.Context, meta *execinfrapb.ProducerMetadata) error, +) (func(context.Context) error, error) { + var p *PhysicalPlan + var evalCtx extendedEvalContext + var planCtx *PlanningCtx + + if err := DescsTxn(ctx, im.execCfg, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + evalCtx = createSchemaChangeEvalCtx(ctx, im.execCfg, txn.ReadTimestamp(), descriptors) + planCtx = im.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn, + true /* distribute */) + chunkSize := indexBackfillMergeBatchSize.Get(&im.execCfg.Settings.SV) + + spec, err := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), chunkSize, addedIndexes, temporaryIndexes) if err != nil { return err } + p, err = im.execCfg.DistSQLPlanner.createIndexBackfillerMergePhysicalPlan(planCtx, spec, todoSpanList) + return err + }); err != nil { + return nil, err } - return nil + return func(ctx context.Context) error { + cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn} + recv := MakeDistSQLReceiver( + ctx, + &cbw, + tree.Rows, /* stmtType - doesn't matter here since no result are produced */ + im.execCfg.RangeDescriptorCache, + nil, /* txn - the flow does not run wholly in a txn */ + im.execCfg.Clock, + evalCtx.Tracing, + im.execCfg.ContentionRegistry, + nil, /* testingPushCallback */ + ) + defer recv.Release() + evalCtxCopy := evalCtx + im.execCfg.DistSQLPlanner.Run( + planCtx, + nil, /* txn - the processors manage their own transactions */ + p, recv, &evalCtxCopy, + nil, /* finishedSetupFn */ + )() + return cbw.Err() + }, nil +} + +// MergeProgress tracks the progress for an index backfill merge. +type MergeProgress struct { + // TodoSpans contains the all the spans for all the temporary + // indexes that still need to be merged. + TodoSpans [][]roachpb.Span + + // MutationIdx contains the indexes of the mutations for the + // temporary indexes in the list of mutations. + MutationIdx []int + + // AddedIndexes and TemporaryIndexes contain the index IDs for + // all newly added indexes and their corresponding temporary + // index. + AddedIndexes, TemporaryIndexes []descpb.IndexID +} + +// Copy returns a copy of this MergeProcess. Note that roachpb.Span's +// aren't deep copied. +func (mp *MergeProgress) Copy() *MergeProgress { + newp := &MergeProgress{ + TodoSpans: make([][]roachpb.Span, len(mp.TodoSpans)), + MutationIdx: make([]int, len(mp.MutationIdx)), + AddedIndexes: make([]descpb.IndexID, len(mp.AddedIndexes)), + TemporaryIndexes: make([]descpb.IndexID, len(mp.TemporaryIndexes)), + } + copy(newp.MutationIdx, mp.MutationIdx) + copy(newp.AddedIndexes, mp.AddedIndexes) + copy(newp.TemporaryIndexes, mp.TemporaryIndexes) + for i, spanSlice := range mp.TodoSpans { + newSpanSlice := make([]roachpb.Span, len(spanSlice)) + copy(newSpanSlice, spanSlice) + newp.TodoSpans[i] = newSpanSlice + } + return newp +} + +// IndexMergeTracker abstracts the infrastructure to read and write merge +// progress to job state. +type IndexMergeTracker struct { + mu struct { + syncutil.Mutex + progress *MergeProgress + } + + jobMu struct { + syncutil.Mutex + job *jobs.Job + } } -func mergeEntry(sourceKV *kv.KeyValue, destKey roachpb.Key) (*kv.KeyValue, bool, error) { - var destTagAndData []byte - var deleted bool +var _ scexec.BackfillProgressFlusher = (*IndexMergeTracker)(nil) - tempWrapper, err := rowenc.DecodeWrapper(sourceKV.Value) - if err != nil { - return nil, false, err +// NewIndexMergeTracker creates a new IndexMergeTracker +func NewIndexMergeTracker(progress *MergeProgress, job *jobs.Job) *IndexMergeTracker { + imt := IndexMergeTracker{} + imt.mu.progress = progress.Copy() + imt.jobMu.job = job + return &imt +} + +// FlushCheckpoint writes out a checkpoint containing any data which +// has been previously updated via UpdateMergeProgress. +func (imt *IndexMergeTracker) FlushCheckpoint(ctx context.Context) error { + imt.jobMu.Lock() + defer imt.jobMu.Unlock() + + imt.mu.Lock() + if imt.mu.progress.TodoSpans == nil { + imt.mu.Unlock() + return nil } + progress := imt.mu.progress.Copy() + imt.mu.Unlock() - if tempWrapper.Deleted { - deleted = true - } else { - destTagAndData = tempWrapper.Value + details, ok := imt.jobMu.job.Details().(jobspb.SchemaChangeDetails) + if !ok { + return errors.Errorf("expected SchemaChangeDetails job type, got %T", imt.jobMu.job.Details()) } - value := &roachpb.Value{} - value.SetTagAndData(destTagAndData) + for idx := range progress.TodoSpans { + details.ResumeSpanList[progress.MutationIdx[idx]].ResumeSpans = progress.TodoSpans[idx] + } + + return imt.jobMu.job.SetDetails(ctx, nil, details) +} + +// FlushFractionCompleted writes out the fraction completed. +func (imt *IndexMergeTracker) FlushFractionCompleted(ctx context.Context) error { + // TODO(#76365): The backfiller currently doesn't have a good way to report the + // total progress of mutations that occur in multiple stages that + // independently report progress. So fraction tracking of the merge will be + // unimplemented for now and the progress fraction will report only the + // progress of the backfilling stage. + return nil +} + +// UpdateMergeProgress allow the caller to modify the current progress with updateFn. +func (imt *IndexMergeTracker) UpdateMergeProgress( + ctx context.Context, updateFn func(ctx context.Context, progress *MergeProgress), +) { + imt.mu.Lock() + defer imt.mu.Unlock() + updateFn(ctx, imt.mu.progress) +} - return &kv.KeyValue{ - Key: destKey, - Value: value, - }, deleted, nil +func newPeriodicProgressFlusher(settings *cluster.Settings) scexec.PeriodicProgressFlusher { + return scdeps.NewPeriodicProgressFlusher( + func() time.Duration { + return backfill.IndexBackfillCheckpointInterval.Get(&settings.SV) + }, + func() time.Duration { + // fractionInterval is copied from the logic in existing backfill code. + const fractionInterval = 10 * time.Second + return fractionInterval + }, + ) } diff --git a/pkg/sql/mvcc_backfiller_test.go b/pkg/sql/mvcc_backfiller_test.go new file mode 100644 index 000000000000..64b561b3a88e --- /dev/null +++ b/pkg/sql/mvcc_backfiller_test.go @@ -0,0 +1,465 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql_test + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/backfill" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/startupmigrations" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// Test schema changes are retried and complete properly when there's an error +// in the merge step. This also checks that a mutation checkpoint reduces the +// size of the span operated on during a retry. +func TestIndexBackfillMergeRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderStressRace(t, "TODO(ssd) test times outs under race") + + params, _ := tests.CreateTestServerParams() + + writesPopulated := false + var writesFn func() error + + populateTempIndexWithWrites := func(sp roachpb.Span) error { + if !writesPopulated { + if err := writesFn(); err != nil { + return err + } + writesPopulated = true + } + + return nil + } + + mergeChunk := 0 + var seenKey roachpb.Key + checkStartingKey := func(key roachpb.Key) error { + mergeChunk++ + if mergeChunk == 3 { + // Fail right before merging the 3rd chunk. + if rand.Intn(2) == 0 { + return context.DeadlineExceeded + } else { + errAmbiguous := &roachpb.AmbiguousResultError{} + return roachpb.NewError(errAmbiguous).GoError() + } + } + + if seenKey != nil && key != nil { + // Check that starting span keys are never reevaluated. + if seenKey.Compare(key) >= 0 { + t.Errorf("reprocessing starting with key %v, already seen starting key %v", key, seenKey) + } + } + + seenKey = key + return nil + } + + const maxValue = 2000 + params.Knobs = base.TestingKnobs{ + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + WriteCheckpointInterval: time.Nanosecond, + AlwaysUpdateIndexBackfillDetails: true, + }, + DistSQL: &execinfra.TestingKnobs{ + RunBeforeBackfillChunk: populateTempIndexWithWrites, + BulkAdderFlushesEveryBatch: true, + SerializeIndexBackfillCreationAndIngestion: make(chan struct{}, 1), + IndexBackfillMergerTestingKnobs: &backfill.IndexBackfillMergerTestingKnobs{ + PushesProgressEveryChunk: true, + RunBeforeMergeChunk: checkStartingKey, + }, + }, + // Disable backfill migrations, we still need the jobs table migration. + StartupMigrationManager: &startupmigrations.MigrationManagerTestingKnobs{ + DisableBackfillMigrations: true, + }, + // Decrease the adopt loop interval so that retries happen quickly. + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + + s, sqlDB, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) + + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k INT PRIMARY KEY, v INT); +`); err != nil { + t.Fatal(err) + } + + // TODO(rui): use testing hook instead of cluster setting once this value for + // the backfill merge is hooked up to testing hooks. + if _, err := sqlDB.Exec(fmt.Sprintf(`SET CLUSTER SETTING bulkio.index_backfill.batch_size = %d;`, maxValue/5)); err != nil { + t.Fatal(err) + } + + writesFn = func() error { + if _, err := sqlDB.Exec(fmt.Sprintf(`UPDATE t.test SET v = v + %d WHERE k >= 0`, 2*maxValue)); err != nil { + return err + } + + if _, err := sqlDB.Exec(fmt.Sprintf(`UPDATE t.test SET v = v - %d WHERE k >= 0`, 2*maxValue)); err != nil { + return err + } + return nil + } + + // Bulk insert. + if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil { + t.Fatal(err) + } + + addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2, func() { + if _, err := sqlDB.Exec("SHOW JOBS WHEN COMPLETE (SELECT job_id FROM [SHOW JOBS])"); err != nil { + t.Fatal(err) + } + }) + require.True(t, mergeChunk > 3, fmt.Sprintf("mergeChunk: %d", mergeChunk)) + +} + +// Test index backfill merges are not affected by various operations that run +// simultaneously. +func TestRaceWithIndexBackfillMerge(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderStressRace(t, "TODO(ssd) test times outs under race") + + // protects mergeNotification, writesPopulated + var mu syncutil.Mutex + var mergeNotification chan struct{} + writesPopulated := false + + const numNodes = 5 + var chunkSize int64 = 100 + var maxValue = 4000 + if util.RaceEnabled { + // Race builds are a lot slower, so use a smaller number of rows and a + // correspondingly smaller chunk size. + chunkSize = 5 + maxValue = 200 + } + + params, _ := tests.CreateTestServerParams() + initMergeNotification := func() chan struct{} { + mu.Lock() + defer mu.Unlock() + mergeNotification = make(chan struct{}) + return mergeNotification + } + + notifyMerge := func() { + mu.Lock() + defer mu.Unlock() + if mergeNotification != nil { + // Close channel to notify that the backfill has started. + close(mergeNotification) + mergeNotification = nil + } + } + + var sqlWritesFn func() error + var idxName string + var splitTemporaryIndex func() error + + populateTempIndexWithWrites := func(_ roachpb.Span) error { + mu.Lock() + defer mu.Unlock() + if !writesPopulated { + if err := sqlWritesFn(); err != nil { + return err + } + + // Split the temporary index so that the merge is distributed. + if err := splitTemporaryIndex(); err != nil { + return err + } + + writesPopulated = true + } + + return nil + } + + params.Knobs = base.TestingKnobs{ + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + BackfillChunkSize: chunkSize, + }, + DistSQL: &execinfra.TestingKnobs{ + RunBeforeBackfillChunk: populateTempIndexWithWrites, + IndexBackfillMergerTestingKnobs: &backfill.IndexBackfillMergerTestingKnobs{ + RunBeforeMergeChunk: func(key roachpb.Key) error { + notifyMerge() + return nil + }, + }, + }, + } + + tc := serverutils.StartNewTestCluster(t, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: params, + }) + defer tc.Stopper().Stop(context.Background()) + kvDB := tc.Server(0).DB() + sqlDB := tc.ServerConn(0) + + splitTemporaryIndex = func() error { + tableDesc := desctestutils.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "public", "test") + tempIdx, err := findCorrespondingTemporaryIndex(tableDesc, idxName) + if err != nil { + return err + } + + var sps []sql.SplitPoint + for i := 0; i < numNodes; i++ { + sps = append(sps, sql.SplitPoint{TargetNodeIdx: i, Vals: []interface{}{maxValue/numNodes*i + 5*maxValue}}) + } + + return splitIndex(tc, tableDesc, tempIdx, sps) + } + + sqlWritesFn = func() error { + if _, err := sqlDB.Exec(fmt.Sprintf(`UPDATE t.test SET v = v + %d WHERE k >= 0`, 5*maxValue)); err != nil { + return err + } + + if _, err := sqlDB.Exec(fmt.Sprintf(`UPDATE t.test SET v = v - %d WHERE k >= 0`, 5*maxValue)); err != nil { + return err + } + + return nil + } + + // TODO(rui): use testing hook instead of cluster setting once this value for + // the backfill merge is hooked up to testing hooks. + if _, err := sqlDB.Exec(fmt.Sprintf(`SET CLUSTER SETTING bulkio.index_backfill.merge_batch_size = %d`, chunkSize)); err != nil { + t.Fatal(err) + } + + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k INT PRIMARY KEY, v INT, x DECIMAL DEFAULT (DECIMAL '1.4')); +`); err != nil { + t.Fatal(err) + } + + // Bulk insert. + if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + // number of keys == 2 * number of rows; 1 column family and 1 index entry + // for each row. + if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, 1, maxValue); err != nil { + t.Fatal(err) + } + if err := sqlutils.RunScrub(sqlDB, "t", "test"); err != nil { + t.Fatal(err) + } + + // Run some index schema changes with operations. + idxName = "foo" + runSchemaChangeWithOperations( + t, + sqlDB, + kvDB, + "CREATE UNIQUE INDEX foo ON t.test (v)", + maxValue, + 2, + initMergeNotification(), + true, + ) + + idxName = "bar" + writesPopulated = false + // Add STORING index (that will have non-nil values). + runSchemaChangeWithOperations( + t, + sqlDB, + kvDB, + "CREATE INDEX bar ON t.test(k) STORING (v)", + maxValue, + 3, + initMergeNotification(), + true, + ) + + // Verify that the index foo over v is consistent/ + rows, err := sqlDB.Query(`SELECT v, x from t.test@foo ORDER BY v`) + if err != nil { + t.Fatal(err) + } + defer rows.Close() + + count := 0 + for ; rows.Next(); count++ { + var val int + var x float64 + if err := rows.Scan(&val, &x); err != nil { + t.Errorf("row %d scan failed: %s", count, err) + continue + } + if count != val { + t.Errorf("e = %d, v = %d", count, val) + } + if x != 1.4 { + t.Errorf("e = %f, v = %f", 1.4, x) + } + } + if err := rows.Err(); err != nil { + t.Fatal(err) + } + eCount := maxValue + 1 + if eCount != count { + t.Fatalf("read the wrong number of rows: e = %d, v = %d", eCount, count) + } +} + +func findCorrespondingTemporaryIndex( + tableDesc catalog.TableDescriptor, idxName string, +) (catalog.Index, error) { + idx := -1 + for i, mut := range tableDesc.AllMutations() { + if mut.AsIndex() != nil && mut.AsIndex().GetName() == idxName { + idx = i + } + } + + if idx == -1 { + return nil, errors.Errorf("could not find an index mutation with name %s", idxName) + } + + if idx+1 < len(tableDesc.AllMutations()) { + tempIdxMut := tableDesc.AllMutations()[idx+1] + if tempIdxMut.AsIndex() != nil && tempIdxMut.AsIndex().IndexDesc().UseDeletePreservingEncoding { + return tempIdxMut.AsIndex(), nil + } + } + + return nil, errors.Errorf("could not find temporary index mutation for index %s", idxName) +} + +type rangeAndKT struct { + Range roachpb.RangeDescriptor + KT serverutils.KeyAndTargets +} + +func splitIndex( + tc serverutils.TestClusterInterface, + desc catalog.TableDescriptor, + index catalog.Index, + sps []sql.SplitPoint, +) error { + if tc.ReplicationMode() != base.ReplicationManual { + return errors.Errorf("splitIndex called on a test cluster that was not in manual replication mode") + } + + rkts := make(map[roachpb.RangeID]rangeAndKT) + for _, sp := range sps { + + pik, err := randgen.TestingMakeSecondaryIndexKey(desc, index, keys.SystemSQLCodec, sp.Vals...) + if err != nil { + return err + } + + rangeDesc, err := tc.LookupRange(pik) + if err != nil { + return err + } + + holder, err := tc.FindRangeLeaseHolder(rangeDesc, nil) + if err != nil { + return err + } + + _, rightRange, err := tc.Server(int(holder.NodeID) - 1).SplitRange(pik) + if err != nil { + return err + } + + rightRangeStartKey := rightRange.StartKey.AsRawKey() + target := tc.Target(sp.TargetNodeIdx) + + rkts[rightRange.RangeID] = rangeAndKT{ + rightRange, + serverutils.KeyAndTargets{StartKey: rightRangeStartKey, Targets: []roachpb.ReplicationTarget{target}}} + } + + var kts []serverutils.KeyAndTargets + for _, rkt := range rkts { + kts = append(kts, rkt.KT) + } + var descs []roachpb.RangeDescriptor + for _, kt := range kts { + desc, err := tc.AddVoters(kt.StartKey, kt.Targets...) + if err != nil { + if testutils.IsError(err, "trying to add a voter to a store that already has a VOTER_FULL") { + desc, err = tc.LookupRange(kt.StartKey) + if err != nil { + return err + } + } else { + return err + } + } + + descs = append(descs, desc) + } + + for _, desc := range descs { + rkt, ok := rkts[desc.RangeID] + if !ok { + continue + } + + for _, target := range rkt.KT.Targets { + if err := tc.TransferRangeLease(desc, target); err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/sql/randgen/schema.go b/pkg/sql/randgen/schema.go index 849da14ef6f6..6ebd67f03ee1 100644 --- a/pkg/sql/randgen/schema.go +++ b/pkg/sql/randgen/schema.go @@ -535,3 +535,60 @@ func TestingMakePrimaryIndexKeyForTenant( } return key, nil } + +// TestingMakeSecondaryIndexKey creates a key prefix that corresponds to +// a secondary index; it is intended for tests. +// +// It is exported because it is used by tests outside of this package. +// +// The value types must match the secondary key columns, +// supported types are: - Datum +// - bool (converts to DBool) +// - int (converts to DInt) +// - string (converts to DString) +func TestingMakeSecondaryIndexKey( + desc catalog.TableDescriptor, index catalog.Index, codec keys.SQLCodec, vals ...interface{}, +) (roachpb.Key, error) { + if len(vals) > index.NumKeyColumns() { + return nil, errors.Errorf("got %d values, index %s has %d columns", len(vals), index.GetName(), index.NumKeyColumns()) + } + + datums := make([]tree.Datum, len(vals)) + for i, v := range vals { + switch v := v.(type) { + case bool: + datums[i] = tree.MakeDBool(tree.DBool(v)) + case int: + datums[i] = tree.NewDInt(tree.DInt(v)) + case string: + datums[i] = tree.NewDString(v) + case tree.Datum: + datums[i] = v + default: + return nil, errors.Errorf("unexpected value type %T", v) + } + // Check that the value type matches. + colID := index.GetKeyColumnID(i) + col, _ := desc.FindColumnWithID(colID) + if col != nil && col.Public() { + colTyp := datums[i].ResolvedType() + if t := colTyp.Family(); t != col.GetType().Family() { + return nil, errors.Errorf("column %d of type %s, got value of type %s", i, col.GetType().Family(), t) + } + } + } + // Create the ColumnID to index in datums slice map needed by + // MakeIndexKeyPrefix. + var colIDToRowIndex catalog.TableColMap + for i := range vals { + colIDToRowIndex.Set(index.GetKeyColumnID(i), i) + } + + keyPrefix := rowenc.MakeIndexKeyPrefix(codec, desc.GetID(), index.GetID()) + key, _, err := rowenc.EncodeIndexKey(desc, index, colIDToRowIndex, datums, keyPrefix) + + if err != nil { + return nil, err + } + return key, nil +} diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 11768728cb05..660e692da435 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -366,6 +367,12 @@ func NewProcessor( } return NewStreamIngestionFrontierProcessor(flowCtx, processorID, *core.StreamIngestionFrontier, inputs[0], post, outputs[0]) } + if core.IndexBackfillMerger != nil { + if err := checkNumInOut(inputs, outputs, 0, 1); err != nil { + return nil, err + } + return backfill.NewIndexBackfillMerger(flowCtx, *core.IndexBackfillMerger, outputs[0]) + } return nil, errors.Errorf("unsupported processor core %q", core) } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index d2287a240a10..3d8f26bf61c9 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1063,7 +1063,7 @@ COMMIT; // Add an index and check that it succeeds. func addIndexSchemaChange( - t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, maxValue int, numKeysPerRow int, + t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, maxValue int, numKeysPerRow int, waitFn func(), ) { if _, err := sqlDB.Exec("CREATE UNIQUE INDEX foo ON t.test (v)"); err != nil { t.Fatal(err) @@ -1097,6 +1097,10 @@ func addIndexSchemaChange( ctx := context.Background() + if waitFn != nil { + waitFn() + } + if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, numKeysPerRow, maxValue); err != nil { t.Fatal(err) } @@ -1302,7 +1306,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); t.Fatal(err) } - addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2) + addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2, nil) currChunk = 0 seenSpan = roachpb.Span{} @@ -1456,7 +1460,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); t.Fatal(err) } - addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2) + addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2, nil) if num := atomic.SwapUint32(&numBackfills, 0); num != 2 { t.Fatalf("expected %d backfills, but saw %d", 2, num) } diff --git a/pkg/sql/schemachanger/scdeps/periodic_progress_flusher.go b/pkg/sql/schemachanger/scdeps/periodic_progress_flusher.go index 2a2f0d3d4115..aaf05d710899 100644 --- a/pkg/sql/schemachanger/scdeps/periodic_progress_flusher.go +++ b/pkg/sql/schemachanger/scdeps/periodic_progress_flusher.go @@ -21,22 +21,35 @@ import ( "golang.org/x/sync/errgroup" ) -func newPeriodicProgressFlusher(settings *cluster.Settings) scexec.PeriodicProgressFlusher { - clock := timeutil.DefaultTimeSource{} - getCheckpointInterval := func() time.Duration { - return backfill.IndexBackfillCheckpointInterval.Get(&settings.SV) - } - // fractionInterval is copied from the logic in existing backfill code. - // TODO(ajwerner): Add a cluster setting to control this. - const fractionInterval = 10 * time.Second - getFractionInterval := func() time.Duration { return fractionInterval } +// NewPeriodicProgressFlusher returns a PeriodicProgressFlusher that +// will flush at the given intervals. +func NewPeriodicProgressFlusher( + checkpointIntervalFn func() time.Duration, fractionIntervalFn func() time.Duration, +) scexec.PeriodicProgressFlusher { return &periodicProgressFlusher{ - clock: clock, - checkpointInterval: getCheckpointInterval, - fractionInterval: getFractionInterval, + clock: timeutil.DefaultTimeSource{}, + checkpointInterval: checkpointIntervalFn, + fractionInterval: fractionIntervalFn, } } +func newPeriodicProgressFlusherForIndexBackfill( + settings *cluster.Settings, +) scexec.PeriodicProgressFlusher { + return NewPeriodicProgressFlusher( + func() time.Duration { + return backfill.IndexBackfillCheckpointInterval.Get(&settings.SV) + + }, + func() time.Duration { + // fractionInterval is copied from the logic in existing backfill code. + // TODO(ajwerner): Add a cluster setting to control this. + const fractionInterval = 10 * time.Second + return fractionInterval + }, + ) +} + type periodicProgressFlusher struct { clock timeutil.TimeSource checkpointInterval, fractionInterval func() time.Duration diff --git a/pkg/sql/schemachanger/scdeps/run_deps.go b/pkg/sql/schemachanger/scdeps/run_deps.go index fdcd85215933..e49a44ea65c2 100644 --- a/pkg/sql/schemachanger/scdeps/run_deps.go +++ b/pkg/sql/schemachanger/scdeps/run_deps.go @@ -113,7 +113,7 @@ func (d *jobExecutionDeps) WithTxnInJob(ctx context.Context, fn scrun.JobTxnFunc d.codec, pl.GetNewSchemaChange().BackfillProgress, ), ), - periodicProgressFlusher: newPeriodicProgressFlusher(d.settings), + periodicProgressFlusher: newPeriodicProgressFlusherForIndexBackfill(d.settings), statements: d.statements, user: pl.UsernameProto.Decode(), clock: NewConstantClock(timeutil.FromUnixMicros(pl.StartedMicros)), diff --git a/pkg/sql/table.go b/pkg/sql/table.go index d2094c887972..f92ef203083d 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -136,11 +136,18 @@ func (p *planner) createOrUpdateSchemaChangeJob( } span := tableDesc.PrimaryIndexSpan(p.ExecCfg().Codec) for i := len(tableDesc.ClusterVersion.Mutations) + len(spanList); i < len(tableDesc.Mutations); i++ { - spanList = append(spanList, - jobspb.ResumeSpanList{ - ResumeSpans: []roachpb.Span{span}, - }, - ) + var resumeSpans []roachpb.Span + mut := tableDesc.Mutations[i] + if mut.GetIndex() != nil && mut.GetIndex().UseDeletePreservingEncoding { + // Resume spans for merging the delete preserving temporary indexes are + // the spans of the temporary indexes. + resumeSpans = []roachpb.Span{tableDesc.IndexSpan(p.ExecCfg().Codec, mut.GetIndex().ID)} + } else { + resumeSpans = []roachpb.Span{span} + } + spanList = append(spanList, jobspb.ResumeSpanList{ + ResumeSpans: resumeSpans, + }) } if !recordExists {