From 5b2198f8f7177401c5bbaa21f29b281f6085748f Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Wed, 13 Jan 2021 12:26:28 -0500
Subject: [PATCH] sql: deal with computed and default column generation in
 index backfiller

This commit was mostly motivated by work on the new schema changer to enable
the behavior described in #47989; however, it also turns out to be a
prerequisite of the work to use virtual computed columns in secondary
indexes. Given we haven't released virtual computed columns, I'm going to
omit a release note for this PR.

The basic idea is that we need to track the columns that computed column
expressions depend on so that those columns are fetched, and that default
expressions need to be evaluated before computed expressions. Much of the
change is testing.

Release note: None
---
 pkg/sql/BUILD.bazel                           |   3 +
 pkg/sql/backfill/backfill.go                  | 206 +++++++--
 pkg/sql/catalog/schemaexpr/computed_column.go |  50 ++-
 pkg/sql/indexbackfiller_test.go               | 403 ++++++++++++++++++
 pkg/sql/row/row_converter.go                  |   3 +-
 pkg/sql/schema_changer.go                     |   9 +-
 pkg/sql/schema_changer_helpers_test.go        |  37 ++
 pkg/sql/schema_changer_test.go                |   4 +-
 8 files changed, 660 insertions(+), 55 deletions(-)
 create mode 100644 pkg/sql/schema_changer_helpers_test.go

diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 2b6645a9b902..b9d03c25a359 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -439,6 +439,7 @@ go_test(
         "run_control_test.go",
         "scan_test.go",
         "scatter_test.go",
+        "schema_changer_helpers_test.go",
         "schema_changer_test.go",
         "scrub_test.go",
         "sequence_test.go",
@@ -495,6 +496,7 @@ go_test(
         "//pkg/server/status/statuspb:statuspb_go_proto",
         "//pkg/server/telemetry",
         "//pkg/settings/cluster",
+        "//pkg/sql/backfill",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/catalogkeys",
         "//pkg/sql/catalog/catalogkv",
@@ -586,6 +588,7 @@ go_test(
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@in_gopkg_yaml_v2//:yaml_v2",
+        "@org_golang_google_protobuf//proto",
         "@org_golang_x_sync//errgroup",
     ],
 )
diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go
index 81f2d5c3cf3b..44a640e4074c 100644
--- a/pkg/sql/backfill/backfill.go
+++ b/pkg/sql/backfill/backfill.go
@@ -167,9 +167,10 @@ func (cb *ColumnBackfiller) InitForLocalUse(
 	if err != nil {
 		return err
 	}
-	computedExprs, err := schemaexpr.MakeComputedExprs(
+	computedExprs, _, err := schemaexpr.MakeComputedExprs(
 		ctx,
 		cb.added,
+		desc.GetPublicColumns(),
 		desc,
 		tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
 		evalCtx,
@@ -210,9 +211,10 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
 	if err != nil {
 		return err
 	}
-	computedExprs, err = schemaexpr.MakeComputedExprs(
+	computedExprs, _, err = schemaexpr.MakeComputedExprs(
 		ctx,
 		cb.added,
+		desc.GetPublicColumns(),
 		desc,
 		tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
 		evalCtx,
@@ -396,11 +398,27 @@ type IndexBackfiller struct {
 	types   []*types.T
 	rowVals tree.Datums
 	evalCtx *tree.EvalContext
-	cols    []descpb.ColumnDescriptor
+
+	// cols are all of the writable (PUBLIC and DELETE_AND_WRITE_ONLY) columns
+	// in the descriptor.
+	cols []descpb.ColumnDescriptor
+
+	// addedCols are the non-computed columns in DELETE_AND_WRITE_ONLY being
+	// added as part of this schema change.
+	addedCols []descpb.ColumnDescriptor
+
+	// computedCols are the columns in this index which are computed and do
+	// not have concrete values in the source index. These are virtual computed
+	// columns and non-public stored computed columns.
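+	// For example, a column v INT AS (i + 1) VIRTUAL has no stored value in
+	// the source index, so an index keyed on v must evaluate i + 1 for every
+	// backfilled row.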
+	computedCols []descpb.ColumnDescriptor
+
+	// colExprs is a map from the ID of each column which needs to be
+	// evaluated to its default or computed expression.
+	colExprs map[descpb.ColumnID]tree.TypedExpr
 
 	// predicates is a map of indexes to partial index predicate expressions. It
 	// includes entries for partial indexes only.
 	predicates map[descpb.IndexID]tree.TypedExpr
+
 	// indexesToEncode is a list of indexes to encode entries for a given row.
 	// It is a field of IndexBackfiller to avoid allocating a slice for each row
 	// backfilled.
@@ -437,26 +455,108 @@ func (ib *IndexBackfiller) InitForLocalUse(
 
 	// Initialize ib.added.
 	valNeededForCol := ib.initIndexes(desc)
 
+	predicates, colExprs, referencedColumns, err := constructExprs(
+		ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx,
+	)
+	if err != nil {
+		return err
+	}
+
+	// Add the columns referenced in the predicates and column expressions to
+	// valNeededForCol so that all columns necessary to evaluate them are fetched.
+	referencedColumns.ForEach(func(col descpb.ColumnID) {
+		valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
+	})
+
+	return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
+}
+
+// constructExprs is a helper to construct the partial index predicate and
+// column expressions required for an index backfill. It also returns the set
+// of columns referenced by any of these expressions.
+//
+// The cols argument is the full set of columns in the table (including those
+// being added). The addedCols argument is the set of non-public, non-computed
+// columns being added. The computedCols argument is the set of computed
+// columns in the index.
+func constructExprs(
+	ctx context.Context,
+	desc catalog.TableDescriptor,
+	addedIndexes []*descpb.IndexDescriptor,
+	cols, addedCols, computedCols []descpb.ColumnDescriptor,
+	evalCtx *tree.EvalContext,
+	semaCtx *tree.SemaContext,
+) (
+	predicates map[descpb.IndexID]tree.TypedExpr,
+	colExprs map[descpb.ColumnID]tree.TypedExpr,
+	referencedColumns catalog.TableColSet,
+	_ error,
+) {
 	// Convert any partial index predicate strings into expressions.
 	predicates, predicateRefColIDs, err := schemaexpr.MakePartialIndexExprs(
 		ctx,
-		ib.added,
-		ib.cols,
+		addedIndexes,
+		cols,
 		desc,
 		evalCtx,
 		semaCtx,
 	)
 	if err != nil {
-		return err
+		return nil, nil, catalog.TableColSet{}, err
 	}
 
-	// Add the columns referenced in the predicate to valNeededForCol so that
-	// columns necessary to evaluate the predicate expression are fetched.
-	predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
-		valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
-	})
+	// Determine the default expressions for the newly added, non-computed
+	// columns.
+	defaultExprs, err := schemaexpr.MakeDefaultExprs(
+		ctx, addedCols, &transform.ExprTransformContext{}, evalCtx, semaCtx,
+	)
+	if err != nil {
+		return nil, nil, catalog.TableColSet{}, err
+	}
 
-	return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
+	// TODO(ajwerner): Rethink this table name.
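+	// The name is only used to resolve column references in the expressions,
+	// which may only name columns of the table being backfilled.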
+	tn := tree.NewUnqualifiedTableName(tree.Name(desc.GetName()))
+	computedExprs, computedExprRefColIDs, err := schemaexpr.MakeComputedExprs(
+		ctx,
+		computedCols,
+		cols,
+		desc,
+		tn,
+		evalCtx,
+		semaCtx,
+	)
+	if err != nil {
+		return nil, nil, catalog.TableColSet{}, err
+	}
+
+	numColExprs := len(addedCols) + len(computedCols)
+	colExprs = make(map[descpb.ColumnID]tree.TypedExpr, numColExprs)
+	var addedColSet catalog.TableColSet
+	for i := range defaultExprs {
+		id := addedCols[i].ID
+		colExprs[id] = defaultExprs[i]
+		addedColSet.Add(id)
+	}
+	for i := range computedCols {
+		id := computedCols[i].ID
+		colExprs[id] = computedExprs[i]
+	}
+
+	// Ensure that only existing columns are added to the needed set. Otherwise
+	// the fetcher may complain that the columns don't exist. There's a somewhat
+	// subtle invariant here: if a computed column depends on a column with a
+	// default value, the computed column must come later, so the default value
+	// will already have been populated by the time the computed expression is
+	// evaluated. Computed columns are not permitted to reference each other.
+	addToReferencedColumns := func(cols catalog.TableColSet) {
+		cols.ForEach(func(col descpb.ColumnID) {
+			if !addedColSet.Contains(col) {
+				referencedColumns.Add(col)
+			}
+		})
+	}
+	addToReferencedColumns(predicateRefColIDs)
+	addToReferencedColumns(computedExprRefColIDs)
+	return predicates, colExprs, referencedColumns, nil
 }
 
 // InitForDistributedUse initializes an IndexBackfiller for use as part of a
@@ -475,29 +575,27 @@ func (ib *IndexBackfiller) InitForDistributedUse(
 	evalCtx := flowCtx.NewEvalCtx()
 	var predicates map[descpb.IndexID]tree.TypedExpr
-	var predicateRefColIDs catalog.TableColSet
+	var colExprs map[descpb.ColumnID]tree.TypedExpr
+	var referencedColumns catalog.TableColSet
 
 	// Install type metadata in the target descriptors, as well as resolve any
 	// user defined types in partial index predicate expressions.
-	if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+	if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
 		resolver := flowCtx.TypeResolverFactory.NewTypeResolver(txn)
 		// Hydrate all the types present in the table.
-		if err := typedesc.HydrateTypesInTableDescriptor(ctx, desc.TableDesc(), resolver); err != nil {
+		if err = typedesc.HydrateTypesInTableDescriptor(
+			ctx, desc.TableDesc(), resolver,
+		); err != nil {
 			return err
 		}
 		// Set up a SemaContext to type check the default and computed expressions.
 		semaCtx := tree.MakeSemaContext()
 		semaCtx.TypeResolver = resolver
-		// Convert any partial index predicate strings into expressions.
-		var err error
-		predicates, predicateRefColIDs, err =
-			schemaexpr.MakePartialIndexExprs(ctx, ib.added, ib.cols, desc, evalCtx, &semaCtx)
-		if err != nil {
-			return err
-		}
-
-		return nil
+		predicates, colExprs, referencedColumns, err = constructExprs(
+			ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, &semaCtx,
+		)
+		return err
 	}); err != nil {
 		return err
 	}
@@ -508,11 +606,11 @@ func (ib *IndexBackfiller) InitForDistributedUse(
 
 	// Add the columns referenced in the predicate to valNeededForCol so that
 	// columns necessary to evaluate the predicate expression are fetched.
-	predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
+	referencedColumns.ForEach(func(col descpb.ColumnID) {
 		valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
 	})
 
-	return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
+	return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
 }
 
 // Close releases the resources used by the IndexBackfiller.
@@ -546,7 +644,13 @@ func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int6
 // initCols is a helper to populate column metadata of an IndexBackfiller. It
 // populates the cols and colIdxMap fields.
 func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
-	ib.cols = desc.Columns
+	for i := range desc.Columns {
+		col := &desc.Columns[i]
+		ib.cols = append(ib.cols, *col)
+		if col.IsComputed() && col.Virtual {
+			ib.computedCols = append(ib.computedCols, *col)
+		}
+	}
 
 	// If there are ongoing mutations, add columns that are being added and in
 	// the DELETE_AND_WRITE_ONLY state.
@@ -558,6 +663,11 @@ func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
 			m.Direction == descpb.DescriptorMutation_ADD &&
 			m.State == descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY {
 			ib.cols = append(ib.cols, *column)
+			if column.IsComputed() {
+				ib.computedCols = append(ib.computedCols, *column)
+			} else {
+				ib.addedCols = append(ib.addedCols, *column)
+			}
 		}
 	}
 }
@@ -586,8 +696,11 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
 		ib.added = append(ib.added, idx)
 		for i := range ib.cols {
 			id := ib.cols[i].ID
-			if idx.ContainsColumnID(id) ||
-				idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding {
+			idxContainsColumn := idx.ContainsColumnID(id)
+			isPrimaryIndex := idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding
+			if (idxContainsColumn || isPrimaryIndex) &&
+				!ib.cols[i].Virtual &&
+				i < len(desc.Columns) {
 				valNeededForCol.Add(i)
 			}
 		}
@@ -601,12 +714,14 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
 func (ib *IndexBackfiller) init(
 	evalCtx *tree.EvalContext,
 	predicateExprs map[descpb.IndexID]tree.TypedExpr,
+	colExprs map[descpb.ColumnID]tree.TypedExpr,
 	valNeededForCol util.FastIntSet,
 	desc *tabledesc.Immutable,
 	mon *mon.BytesMonitor,
 ) error {
 	ib.evalCtx = evalCtx
 	ib.predicates = predicateExprs
+	ib.colExprs = colExprs
 
 	// Initialize a list of index descriptors to encode entries for. If there
 	// are no partial indexes, the list is equivalent to the list of indexes
@@ -709,6 +824,28 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 	}
 	memUsedPerChunk += indexEntriesPerRowInitialBufferSize
 	buffer := make([]rowenc.IndexEntry, len(ib.added))
+	evaluateExprs := func(cols []descpb.ColumnDescriptor) error {
+		for i := range cols {
+			colID := cols[i].ID
+			texpr, ok := ib.colExprs[colID]
+			if !ok {
+				continue
+			}
+			val, err := texpr.Eval(ib.evalCtx)
+			if err != nil {
+				return err
+			}
+			colIdx, ok := ib.colIdxMap.Get(colID)
+			if !ok {
+				return errors.AssertionFailedf(
+					"failed to find index for column %d in table %d",
+					colID, tableDesc.GetID(),
+				)
+			}
+			ib.rowVals[colIdx] = val
+		}
+		return nil
+	}
 	for i := int64(0); i < chunkSize; i++ {
 		encRow, _, _, err := ib.fetcher.NextRow(ctx)
 		if err != nil {
@@ -726,6 +863,17 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 
 		iv.CurSourceRow = ib.rowVals
 
+		// First populate default values, then populate computed expressions,
+		// which may reference default values.
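+		// For example, given def INT DEFAULT 42 and comp INT AS (i + def)
+		// STORED, def must be set to 42 before i + def can be evaluated.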
+		if len(ib.colExprs) > 0 {
+			if err := evaluateExprs(ib.addedCols); err != nil {
+				return nil, nil, 0, err
+			}
+			if err := evaluateExprs(ib.computedCols); err != nil {
+				return nil, nil, 0, err
+			}
+		}
+
 		// If there are any partial indexes being added, make a list of the
 		// indexes that the current row should be added to.
 		if len(ib.predicates) > 0 {
diff --git a/pkg/sql/catalog/schemaexpr/computed_column.go b/pkg/sql/catalog/schemaexpr/computed_column.go
index 49158b27fdc2..1081e5513e48 100644
--- a/pkg/sql/catalog/schemaexpr/computed_column.go
+++ b/pkg/sql/catalog/schemaexpr/computed_column.go
@@ -168,41 +168,42 @@ func (v *ComputedColumnValidator) ValidateNoDependents(col *descpb.ColumnDescrip
 
 // MakeComputedExprs returns a slice of the computed expressions for the
 // slice of input column descriptors, or nil if none of the input column
-// descriptors have computed expressions.
+// descriptors have computed expressions. The caller provides the set of
+// sourceColumns to which the expr may refer.
 //
 // The length of the result slice matches the length of the input column
 // descriptors. For every column that has no computed expression, a NULL
 // expression is reported.
 //
-// Note that the order of columns is critical. Expressions cannot reference
-// columns that come after them in cols.
+// Note that the order of input is critical. Expressions cannot reference
+// columns that come after them in input.
 func MakeComputedExprs(
 	ctx context.Context,
-	cols []descpb.ColumnDescriptor,
+	input, sourceColumns []descpb.ColumnDescriptor,
 	tableDesc catalog.TableDescriptor,
 	tn *tree.TableName,
 	evalCtx *tree.EvalContext,
 	semaCtx *tree.SemaContext,
-) ([]tree.TypedExpr, error) {
+) (_ []tree.TypedExpr, refColIDs catalog.TableColSet, _ error) {
 	// Check to see if any of the columns have computed expressions. If there
 	// are none, we don't bother with constructing the map as the expressions
 	// are all NULL.
 	haveComputed := false
-	for i := range cols {
-		if cols[i].IsComputed() {
+	for i := range input {
+		if input[i].IsComputed() {
 			haveComputed = true
 			break
 		}
 	}
 	if !haveComputed {
-		return nil, nil
+		return nil, catalog.TableColSet{}, nil
 	}
 
 	// Build the computed expressions map from the parsed statement.
-	computedExprs := make([]tree.TypedExpr, 0, len(cols))
-	exprStrings := make([]string, 0, len(cols))
-	for i := range cols {
-		col := &cols[i]
+	computedExprs := make([]tree.TypedExpr, 0, len(input))
+	exprStrings := make([]string, 0, len(input))
+	for i := range input {
+		col := &input[i]
 		if col.IsComputed() {
 			exprStrings = append(exprStrings, *col.ComputeExpr)
 		}
@@ -210,36 +211,45 @@ func MakeComputedExprs(
 
 	exprs, err := parser.ParseExprs(exprStrings)
 	if err != nil {
-		return nil, err
+		return nil, catalog.TableColSet{}, err
 	}
 
-	nr := newNameResolver(evalCtx, tableDesc.GetID(), tn, columnDescriptorsToPtrs(tableDesc.GetPublicColumns()))
+	nr := newNameResolver(evalCtx, tableDesc.GetID(), tn, columnDescriptorsToPtrs(sourceColumns))
 	nr.addIVarContainerToSemaCtx(semaCtx)
 
 	var txCtx transform.ExprTransformContext
 	compExprIdx := 0
-	for i := range cols {
-		col := &cols[i]
+	for i := range input {
+		col := &input[i]
 		if !col.IsComputed() {
 			computedExprs = append(computedExprs, tree.DNull)
 			nr.addColumn(col)
 			continue
 		}
+
+		// Collect all column IDs that are referenced in the computed column
+		// expression.
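+		// The accumulated refColIDs set is returned to the caller so it can
+		// ensure the referenced columns are fetched for evaluation.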
+		colIDs, err := ExtractColumnIDs(tableDesc, exprs[compExprIdx])
+		if err != nil {
+			return nil, refColIDs, err
+		}
+		refColIDs.UnionWith(colIDs)
+
 		expr, err := nr.resolveNames(exprs[compExprIdx])
 		if err != nil {
-			return nil, err
+			return nil, catalog.TableColSet{}, err
 		}
 
 		typedExpr, err := tree.TypeCheck(ctx, expr, semaCtx, col.Type)
 		if err != nil {
-			return nil, err
+			return nil, catalog.TableColSet{}, err
 		}
 		if typedExpr, err = txCtx.NormalizeExpr(evalCtx, typedExpr); err != nil {
-			return nil, err
+			return nil, catalog.TableColSet{}, err
 		}
 		computedExprs = append(computedExprs, typedExpr)
 		compExprIdx++
 		nr.addColumn(col)
 	}
-	return computedExprs, nil
+	return computedExprs, refColIDs, nil
 }
diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go
index cb08813de91c..091a8932de97 100644
--- a/pkg/sql/indexbackfiller_test.go
+++ b/pkg/sql/indexbackfiller_test.go
@@ -14,15 +14,41 @@ import (
 	"context"
 	gosql "database/sql"
 	"sync"
+	"sync/atomic"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/backfill"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
+	"github.com/cockroachdb/cockroach/pkg/sql/row"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/sqlmigrations"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/mon"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
 )
 
 // TestIndexBackfiller tests the scenarios described in docs/tech-notes/index-backfill.md
@@ -149,3 +175,380 @@ func queryPairs(t *testing.T, sqlDB *gosql.DB, query string) []pair {
 	}
 	return ret
 }
+
+// TestIndexBackfillerComputedAndGeneratedColumns tests that the index
+// backfiller supports backfilling added columns with default values or
+// computed expressions.
+//
+// This is a remarkably low-level test. It manually mucks with the table
+// descriptor to set up an index backfill, manually sets up and plans the
+// backfill, and then mucks with the table descriptor some more before
+// ensuring that the data was backfilled properly.
+func TestIndexBackfillerComputedAndGeneratedColumns(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Each test case exercises the behavior of an index backfill.
+	// The case gets to do arbitrary things to the table (which is created with
+	// the specified tableName in setupSQL). The idea is that there will be a
+	// mutation on the table with mutation ID 1 that represents an index
+	// backfill. The associated job (which is created by the test harness for
+	// the first mutation) will be blocked from running. Instead, the test will
+	// manually run the index backfiller via a hook exposed only for testing.
+	//
+	// After the backfill is run, the index (specified by its ID) is scanned and
+	// the contents are compared with expectedContents.
+	type indexBackfillTestCase struct {
+		// name of the test case.
+		name string
+
+		// SQL statements to run to set up the table.
+		setupSQL  string
+		tableName string
+
+		// setupDesc should mutate the descriptor such that the mutation with
+		// id 1 contains an index backfill.
+		setupDesc        func(t *testing.T, mut *tabledesc.Mutable)
+		indexToBackfill  descpb.IndexID
+		expectedContents [][]string
+	}
+
+	indexBackfillerTestCases := []indexBackfillTestCase{
+		// This tests a secondary index which uses a virtual computed column in
+		// its key.
+		{
+			name: "virtual computed column in key",
+			setupSQL: `
+SET experimental_enable_virtual_columns = true;
+CREATE TABLE foo (i INT PRIMARY KEY, k INT, v INT AS (i*i + k) VIRTUAL);
+INSERT INTO foo VALUES (1, 2), (2, 3), (3, 4);
+`,
+			tableName:       "foo",
+			indexToBackfill: 2,
+			// Note that the results are ordered by column ID.
+			expectedContents: [][]string{
+				{"1", "3"},
+				{"2", "7"},
+				{"3", "13"},
+			},
+			setupDesc: func(t *testing.T, mut *tabledesc.Mutable) {
+				indexToBackfill := descpb.IndexDescriptor{
+					Name:    "virtual_column_backed_index",
+					ID:      mut.NextIndexID,
+					Unique:  true,
+					Version: descpb.EmptyArraysInInvertedIndexesVersion,
+					ColumnNames: []string{
+						mut.Columns[2].Name,
+					},
+					ColumnDirections: []descpb.IndexDescriptor_Direction{
+						descpb.IndexDescriptor_ASC,
+					},
+					ColumnIDs: []descpb.ColumnID{
+						mut.Columns[2].ID,
+					},
+					ExtraColumnIDs: []descpb.ColumnID{
+						mut.Columns[0].ID,
+					},
+					Type:         descpb.IndexDescriptor_FORWARD,
+					EncodingType: descpb.SecondaryIndexEncoding,
+				}
+				mut.NextIndexID++
+				require.NoError(t, mut.AddIndexMutation(
+					&indexToBackfill, descpb.DescriptorMutation_ADD,
+				))
+			},
+		},
+		// This test will inject a new primary index and perform a primary key
+		// swap where the new primary index has two new stored columns not in
+		// the existing primary index.
+		{
+			name: "default and generated column referencing it in new primary index",
+			setupSQL: `
+CREATE TABLE foo (i INT PRIMARY KEY);
+INSERT INTO foo VALUES (1), (10), (100);
+`,
+			tableName:       "foo",
+			indexToBackfill: 2,
+			expectedContents: [][]string{
+				{"1", "42", "43"},
+				{"10", "42", "52"},
+				{"100", "42", "142"},
+			},
+			setupDesc: func(t *testing.T, mut *tabledesc.Mutable) {
+				columnWithDefault := descpb.ColumnDescriptor{
+					Name:           "def",
+					ID:             mut.NextColumnID,
+					Type:           types.Int,
+					Nullable:       false,
+					DefaultExpr:    proto.String("42"),
+					Hidden:         false,
+					PGAttributeNum: uint32(mut.NextColumnID),
+				}
+				mut.NextColumnID++
+				mut.AddColumnMutation(&columnWithDefault, descpb.DescriptorMutation_ADD)
+				// Cheat and jump right to DELETE_AND_WRITE_ONLY.
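+				// An added column would normally pass through DELETE_ONLY first;
+				// the backfiller only requires that the column be writable.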
+				mut.Mutations[len(mut.Mutations)-1].State = descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY
+				computedColumnNotInPrimaryIndex := descpb.ColumnDescriptor{
+					Name:           "comp",
+					ID:             mut.NextColumnID,
+					Type:           types.Int,
+					Nullable:       false,
+					ComputeExpr:    proto.String("i + def"),
+					Hidden:         false,
+					PGAttributeNum: uint32(mut.NextColumnID),
+				}
+				mut.NextColumnID++
+				mut.AddColumnMutation(&computedColumnNotInPrimaryIndex, descpb.DescriptorMutation_ADD)
+				// Cheat and jump right to DELETE_AND_WRITE_ONLY.
+				mut.Mutations[len(mut.Mutations)-1].State = descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY
+
+				mut.Families[0].ColumnIDs = append(mut.Families[0].ColumnIDs,
+					columnWithDefault.ID,
+					computedColumnNotInPrimaryIndex.ID)
+				mut.Families[0].ColumnNames = append(mut.Families[0].ColumnNames,
+					columnWithDefault.Name,
+					computedColumnNotInPrimaryIndex.Name)
+
+				indexToBackfill := descpb.IndexDescriptor{
+					Name:    "new_primary_index",
+					ID:      mut.NextIndexID,
+					Unique:  true,
+					Version: descpb.EmptyArraysInInvertedIndexesVersion,
+					ColumnNames: []string{
+						mut.Columns[0].Name,
+					},
+					ColumnDirections: []descpb.IndexDescriptor_Direction{
+						descpb.IndexDescriptor_ASC,
+					},
+					StoreColumnNames: []string{
+						columnWithDefault.Name,
+						computedColumnNotInPrimaryIndex.Name,
+					},
+					ColumnIDs: []descpb.ColumnID{
+						mut.Columns[0].ID,
+					},
+					ExtraColumnIDs: nil,
+					StoreColumnIDs: []descpb.ColumnID{
+						columnWithDefault.ID,
+						computedColumnNotInPrimaryIndex.ID,
+					},
+					Type:         descpb.IndexDescriptor_FORWARD,
+					EncodingType: descpb.PrimaryIndexEncoding,
+				}
+				mut.NextIndexID++
+				require.NoError(t, mut.AddIndexMutation(
+					&indexToBackfill, descpb.DescriptorMutation_ADD,
+				))
+				mut.AddPrimaryKeySwapMutation(&descpb.PrimaryKeySwap{
+					OldPrimaryIndexId: 1,
+					NewPrimaryIndexId: 2,
+				})
+			},
+		},
+	}
+
+	// fetchIndex fetches the contents of a table index, returning the results
+	// as datums. The datums will correspond to each of the columns stored in
+	// the index, ordered by column ID.
+	fetchIndex := func(
+		ctx context.Context, t *testing.T, txn *kv.Txn, table *tabledesc.Mutable, indexID descpb.IndexID,
+	) []tree.Datums {
+		t.Helper()
+		var fetcher row.Fetcher
+		var alloc rowenc.DatumAlloc
+
+		mm := mon.MakeStandaloneBudget(1 << 30)
+		idx, err := table.FindIndexWithID(indexID)
+		require.NoError(t, err)
+		colIdxMap := table.ColumnIdxMap()
+		var valsNeeded util.FastIntSet
+		if idx.Primary() {
+			_ = table.ForeachPublicColumn(func(column *descpb.ColumnDescriptor) error {
+				if !column.Virtual {
+					valsNeeded.Add(colIdxMap.GetDefault(column.ID))
+				}
+				return nil
+			})
+		} else {
+			_ = idx.ForEachColumnID(func(id descpb.ColumnID) error {
+				valsNeeded.Add(colIdxMap.GetDefault(id))
+				return nil
+			})
+		}
+		spans := []roachpb.Span{table.IndexSpan(keys.SystemSQLCodec, indexID)}
+		const reverse = false
+		require.NoError(t, fetcher.Init(
+			ctx,
+			keys.SystemSQLCodec,
+			reverse,
+			descpb.ScanLockingStrength_FOR_NONE,
+			descpb.ScanLockingWaitPolicy_BLOCK,
+			false,
+			&alloc,
+			mm.Monitor(),
+			row.FetcherTableArgs{
+				Spans:            spans,
+				Desc:             table,
+				Index:            idx.IndexDesc(),
+				ColIdxMap:        colIdxMap,
+				Cols:             table.Columns,
+				ValNeededForCol:  valsNeeded,
+				IsSecondaryIndex: !idx.Primary(),
+			},
+		))
+
+		require.NoError(t, fetcher.StartScan(
+			ctx, txn, spans, false, 0, true, false, /* forceProductionBatchSize */
+		))
+		var rows []tree.Datums
+		for {
+			datums, _, _, err := fetcher.NextRowDecoded(ctx)
+			require.NoError(t, err)
+			if datums == nil {
+				break
+			}
+			// Copy the datums out as the slice is reused internally.
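+			// Keep only the positions in valsNeeded so each returned row holds
+			// exactly the fetched index columns, ordered by column ID.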
+			row := make(tree.Datums, 0, valsNeeded.Len())
+			for i := range datums {
+				if valsNeeded.Contains(i) {
+					row = append(row, datums[i])
+				}
+			}
+			rows = append(rows, row)
+		}
+		return rows
+	}
+
+	datumSliceToStrMatrix := func(rows []tree.Datums) [][]string {
+		res := make([][]string, len(rows))
+		for i, row := range rows {
+			rowStrs := make([]string, len(row))
+			for j, d := range row {
+				rowStrs[j] = d.String()
+			}
+			res[i] = rowStrs
+		}
+		return res
+	}
+
+	// See the comment on indexBackfillTestCase for the behavior of run.
+	run := func(t *testing.T, test indexBackfillTestCase) {
+		ctx := context.Background()
+
+		// Ensure the job doesn't actually run. The code below will handle
+		// running the index backfill.
+		blockChan := make(chan struct{})
+		var jobToBlock atomic.Value
+		jobToBlock.Store(int64(0))
+		tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
+						RunBeforeResume: func(jobID int64) error {
+							if jobID == jobToBlock.Load().(int64) {
+								<-blockChan
+								return errors.New("boom")
+							}
+							return nil
+						},
+					},
+				},
+			},
+		})
+		defer tc.Stopper().Stop(ctx)
+		defer close(blockChan)
+
+		// Run the initial setupSQL.
+		tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+		tdb.Exec(t, test.setupSQL)
+
+		// Fetch the descriptor ID for the relevant table.
+		var tableID descpb.ID
+		tdb.QueryRow(t, "SELECT ($1::REGCLASS)::INT", test.tableName).Scan(&tableID)
+
+		// Run the testCase's setupDesc function to prepare an index backfill
+		// mutation. Also, create an associated job and set it up to be blocked.
+		s0 := tc.Server(0)
+		lm := s0.LeaseManager().(*lease.Manager)
+		ie := s0.InternalExecutor().(sqlutil.InternalExecutor)
+		settings := s0.ClusterSettings()
+		execCfg := s0.ExecutorConfig().(sql.ExecutorConfig)
+		jr := s0.JobRegistry().(*jobs.Registry)
+		var j *jobs.Job
+		var table catalog.TableDescriptor
+		require.NoError(t, descs.Txn(ctx, settings, lm, ie, s0.DB(), func(
+			ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+		) (err error) {
+			mut, err := descriptors.GetMutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{})
+			if err != nil {
+				return err
+			}
+			test.setupDesc(t, mut)
+			span := mut.PrimaryIndexSpan(execCfg.Codec)
+			resumeSpanList := make([]jobspb.ResumeSpanList, len(mut.Mutations))
+			for i := range mut.Mutations {
+				resumeSpanList[i] = jobspb.ResumeSpanList{
+					ResumeSpans: []roachpb.Span{span},
+				}
+			}
+			j, err = jr.CreateAdoptableJobWithTxn(ctx, jobs.Record{
+				Description:   "testing",
+				Statement:     "testing",
+				Username:      security.RootUserName(),
+				DescriptorIDs: []descpb.ID{tableID},
+				Details: jobspb.SchemaChangeDetails{
+					DescID:          tableID,
+					TableMutationID: 1,
+					FormatVersion:   jobspb.JobResumerFormatVersion,
+					ResumeSpanList:  resumeSpanList,
+				},
+				Progress: jobspb.SchemaChangeProgress{},
+			}, txn)
+			if err != nil {
+				return err
+			}
+			mut.MutationJobs = append(mut.MutationJobs, descpb.TableDescriptor_MutationJob{
+				JobID:      *j.ID(),
+				MutationID: 1,
+			})
+			jobToBlock.Store(*j.ID())
+			mut.MaybeIncrementVersion()
+			table = mut.ImmutableCopy().(catalog.TableDescriptor)
+			return descriptors.WriteDesc(ctx, false /* kvTrace */, mut, txn)
+		}))
+		_, err := lm.WaitForOneVersion(ctx, tableID, retry.Options{})
+		require.NoError(t, err)
+
+		// Run the index backfill.
+		changer := sql.NewSchemaChangerForTesting(
+			tableID, 1, execCfg.NodeID.SQLInstanceID(), s0.DB(), lm, jr, &execCfg, settings)
+		changer.SetJob(j)
+		spans := 
[]roachpb.Span{table.IndexSpan(keys.SystemSQLCodec, test.indexToBackfill)} + require.NoError(t, changer.TestingDistIndexBackfill( + ctx, table.GetVersion(), spans, backfill.IndexMutationFilter, 10, + )) + + // Make the mutation complete, then read the index and validate that it + // has the expected contents. + require.NoError(t, descs.Txn(ctx, settings, lm, ie, s0.DB(), func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + table, err := descriptors.GetMutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{}) + if err != nil { + return err + } + toComplete := len(table.Mutations) + for i := 0; i < toComplete; i++ { + mut := table.Mutations[i] + require.NoError(t, table.MakeMutationComplete(mut)) + } + table.Mutations = table.Mutations[toComplete:] + datums := fetchIndex(ctx, t, txn, table, test.indexToBackfill) + require.Equal(t, test.expectedContents, datumSliceToStrMatrix(datums)) + return nil + })) + } + + for _, test := range indexBackfillerTestCases { + t.Run(test.name, func(t *testing.T) { run(t, test) }) + } +} diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index a9fe681e925e..635c7aac6db1 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -428,9 +428,10 @@ func NewDatumRowConverter( // Here, computeExprs will be nil if there's no computed column, or // the list of computed expressions (including nil, for those columns // that are not computed) otherwise, according to colsOrdered. - c.computedExprs, err = schemaexpr.MakeComputedExprs( + c.computedExprs, _, err = schemaexpr.MakeComputedExprs( ctx, colsOrdered, + c.tableDesc.GetPublicColumns(), c.tableDesc, tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.Name)), c.EvalCtx, diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index d83a1a2e20ea..1761b3d427e7 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -108,7 +108,7 @@ func NewSchemaChangerForTesting( tableID descpb.ID, mutationID descpb.MutationID, sqlInstanceID base.SQLInstanceID, - db kv.DB, + db *kv.DB, leaseMgr *lease.Manager, jobRegistry *jobs.Registry, execCfg *ExecutorConfig, @@ -118,7 +118,7 @@ func NewSchemaChangerForTesting( descID: tableID, mutationID: mutationID, sqlInstanceID: sqlInstanceID, - db: &db, + db: db, leaseMgr: leaseMgr, jobRegistry: jobRegistry, settings: settings, @@ -130,7 +130,10 @@ func NewSchemaChangerForTesting( ) sqlutil.InternalExecutor { return execCfg.InternalExecutor }, - metrics: NewSchemaChangerMetrics(), + metrics: NewSchemaChangerMetrics(), + clock: db.Clock(), + distSQLPlanner: execCfg.DistSQLPlanner, + testingKnobs: &SchemaChangerTestingKnobs{}, } } diff --git a/pkg/sql/schema_changer_helpers_test.go b/pkg/sql/schema_changer_helpers_test.go new file mode 100644 index 000000000000..deea56cc7486 --- /dev/null +++ b/pkg/sql/schema_changer_helpers_test.go @@ -0,0 +1,37 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/backfill" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" +) + +// TestingDistIndexBackfill exposes the index backfill functionality for +// testing. +func (sc *SchemaChanger) TestingDistIndexBackfill( + ctx context.Context, + version descpb.DescriptorVersion, + targetSpans []roachpb.Span, + filter backfill.MutationFilter, + indexBackfillBatchSize int64, +) error { + return sc.distIndexBackfill(ctx, version, targetSpans, filter, indexBackfillBatchSize) +} + +// SetJob sets the job. +func (sc *SchemaChanger) SetJob(job *jobs.Job) { + sc.job = job +} diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 311658a5b079..eb2ea49e59cf 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -113,7 +113,7 @@ func TestSchemaChangeProcess(t *testing.T) { jobRegistry := s.JobRegistry().(*jobs.Registry) defer stopper.Stop(context.Background()) changer := sql.NewSchemaChangerForTesting( - id, 0, instance, *kvDB, leaseMgr, jobRegistry, &execCfg, cluster.MakeTestingClusterSettings()) + id, 0, instance, kvDB, leaseMgr, jobRegistry, &execCfg, cluster.MakeTestingClusterSettings()) if _, err := sqlDB.Exec(` CREATE DATABASE t; @@ -148,7 +148,7 @@ INSERT INTO t.test VALUES ('a', 'b'), ('c', 'd'); index.ID = tableDesc.NextIndexID tableDesc.NextIndexID++ changer = sql.NewSchemaChangerForTesting( - id, tableDesc.NextMutationID, instance, *kvDB, leaseMgr, jobRegistry, + id, tableDesc.NextMutationID, instance, kvDB, leaseMgr, jobRegistry, &execCfg, cluster.MakeTestingClusterSettings(), ) tableDesc.Mutations = append(tableDesc.Mutations, descpb.DescriptorMutation{