Skip to content

Commit

Permalink
sql: deal with computed and default column generation in index backfi…
Browse files Browse the repository at this point in the history
…ller

This commit was mostly motivated by work on the new schema changer to enable
the behavior described in cockroachdb#47989, however, it also turns out to be a
prerequisite of the work to use virtual computed columns in secondary indexes.
Given we haven't released virtual computed columns, I'm going to omit a
release note for this PR.

The basic idea is that we need to track dependencies for computed columns
to make sure they are retrieved. Default expressions need to be evaluated
first. Much of the code is testing.

Release note: None
  • Loading branch information
ajwerner committed Jan 28, 2021
1 parent 1a459a8 commit ebc5e1b
Show file tree
Hide file tree
Showing 8 changed files with 657 additions and 53 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ go_test(
"run_control_test.go",
"scan_test.go",
"scatter_test.go",
"schema_changer_helpers_test.go",
"schema_changer_test.go",
"scrub_test.go",
"sequence_test.go",
Expand Down Expand Up @@ -494,6 +495,7 @@ go_test(
"//pkg/server/status/statuspb:statuspb_go_proto",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/sql/backfill",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
Expand Down Expand Up @@ -585,6 +587,7 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v2//:yaml_v2",
"@org_golang_google_protobuf//proto",
"@org_golang_x_sync//errgroup",
],
)
Expand Down
205 changes: 176 additions & 29 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ func (cb *ColumnBackfiller) InitForLocalUse(
if err != nil {
return err
}
computedExprs, err := schemaexpr.MakeComputedExprs(
computedExprs, _, err := schemaexpr.MakeComputedExprs(
ctx,
cb.added,
desc.GetPublicColumns(),
desc,
tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
evalCtx,
Expand Down Expand Up @@ -210,9 +211,10 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
if err != nil {
return err
}
computedExprs, err = schemaexpr.MakeComputedExprs(
computedExprs, _, err = schemaexpr.MakeComputedExprs(
ctx,
cb.added,
desc.GetPublicColumns(),
desc,
tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
evalCtx,
Expand Down Expand Up @@ -396,11 +398,27 @@ type IndexBackfiller struct {
types []*types.T
rowVals tree.Datums
evalCtx *tree.EvalContext
cols []descpb.ColumnDescriptor

// cols are all of the writable (PUBLIC and DELETE_AND_WRITE_ONLY) columns in
// the descriptor.
cols []descpb.ColumnDescriptor

// addedCols are the columns in DELETE_AND_WRITE_ONLY being added as part of
// this index which are not computed.
addedCols []descpb.ColumnDescriptor

// computedCols are the columns in this index which are computed and do not
// have concrete values in the source index. These are virtual computed
// columns and stored computed columns which are non-public.
computedCols []descpb.ColumnDescriptor

// Map of columns which need to be evaluated to their expressions.
colExprs map[descpb.ColumnID]tree.TypedExpr

// predicates is a map of indexes to partial index predicate expressions. It
// includes entries for partial indexes only.
predicates map[descpb.IndexID]tree.TypedExpr

// indexesToEncode is a list of indexes to encode entries for a given row.
// It is a field of IndexBackfiller to avoid allocating a slice for each row
// backfilled.
Expand Down Expand Up @@ -437,26 +455,108 @@ func (ib *IndexBackfiller) InitForLocalUse(
// Initialize ib.added.
valNeededForCol := ib.initIndexes(desc)

predicates, colExprs, referencedColumns, err := constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx,
)
if err != nil {
return err
}

// Add the columns referenced by the partial index predicates and the computed
// column expressions to valNeededForCol so that the columns necessary to
// evaluate those expressions are fetched.
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
}

// constructExprs is a helper to construct the index and column expressions
// required for an index backfill. It also returns the set of columns referenced
// by any of these exprs.
//
// The cols argument is the full set of cols in the table (including those being
// added). The addedCols argument is the set of non-public, non-computed
// columns. The computedCols argument is the set of computed columns in the
// index.
func constructExprs(
ctx context.Context,
desc catalog.TableDescriptor,
addedIndexes []*descpb.IndexDescriptor,
cols, addedCols, computedCols []descpb.ColumnDescriptor,
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
) (
predicates map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
referencedColumns catalog.TableColSet,
_ error,
) {
// Convert any partial index predicate strings into expressions.
predicates, predicateRefColIDs, err := schemaexpr.MakePartialIndexExprs(
ctx,
ib.added,
ib.cols,
addedIndexes,
cols,
desc,
evalCtx,
semaCtx,
)
if err != nil {
return err
return nil, nil, catalog.TableColSet{}, err
}

// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})
// Determine the exprs for newly added, non-computed columns.
defaultExprs, err := schemaexpr.MakeDefaultExprs(
ctx, addedCols, &transform.ExprTransformContext{}, evalCtx, semaCtx,
)
if err != nil {
return nil, nil, catalog.TableColSet{}, err
}

return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
// TODO(ajwerner): Rethink this table name.
tn := tree.NewUnqualifiedTableName(tree.Name(desc.GetName()))
computedExprs, computedExprRefColIDs, err := schemaexpr.MakeComputedExprs(
ctx,
computedCols,
cols,
desc,
tn,
evalCtx,
semaCtx,
)
if err != nil {
return nil, nil, catalog.TableColSet{}, err
}

numColExprs := len(addedCols) + len(computedCols)
colExprs = make(map[descpb.ColumnID]tree.TypedExpr, numColExprs)
var addedColSet catalog.TableColSet
for i := range defaultExprs {
id := addedCols[i].ID
colExprs[id] = defaultExprs[i]
addedColSet.Add(id)
}
for i := range computedCols {
id := computedCols[i].ID
colExprs[id] = computedExprs[i]
}

// Ensure that only existing columns are added to the needed set. Otherwise
// the fetcher may complain that the columns don't exist. There's a somewhat
// subtle invariant that if any dependencies exist between computed columns
// and default values that the computed column be a later column and thus the
// default value will have been populated. Computed columns are not permitted
// to reference each other.
addToReferencedColumns := func(cols catalog.TableColSet) {
cols.ForEach(func(col descpb.ColumnID) {
if !addedColSet.Contains(col) {
referencedColumns.Add(col)
}
})
}
addToReferencedColumns(predicateRefColIDs)
addToReferencedColumns(computedExprRefColIDs)
return predicates, colExprs, referencedColumns, nil
}

// InitForDistributedUse initializes an IndexBackfiller for use as part of a
Expand All @@ -475,29 +575,27 @@ func (ib *IndexBackfiller) InitForDistributedUse(

evalCtx := flowCtx.NewEvalCtx()
var predicates map[descpb.IndexID]tree.TypedExpr
var predicateRefColIDs catalog.TableColSet
var colExprs map[descpb.ColumnID]tree.TypedExpr
var referencedColumns catalog.TableColSet

// Install type metadata in the target descriptors, as well as resolve any
// user defined types in partial index predicate expressions.
if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(txn)
// Hydrate all the types present in the table.
if err := typedesc.HydrateTypesInTableDescriptor(ctx, desc.TableDesc(), resolver); err != nil {
if err = typedesc.HydrateTypesInTableDescriptor(
ctx, desc.TableDesc(), resolver,
); err != nil {
return err
}
// Set up a SemaContext to type check the default and computed expressions.
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = resolver

// Convert any partial index predicate strings into expressions.
var err error
predicates, predicateRefColIDs, err =
schemaexpr.MakePartialIndexExprs(ctx, ib.added, ib.cols, desc, evalCtx, &semaCtx)
if err != nil {
return err
}

return nil
predicates, colExprs, referencedColumns, err = constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, &semaCtx,
)
return err
}); err != nil {
return err
}
Expand All @@ -508,11 +606,11 @@ func (ib *IndexBackfiller) InitForDistributedUse(

// Add the columns referenced by the partial index predicates and the computed
// column expressions to valNeededForCol so that the columns necessary to
// evaluate those expressions are fetched.
predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
}

// Close releases the resources used by the IndexBackfiller.
Expand Down Expand Up @@ -546,7 +644,14 @@ func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int6
// initCols is a helper to populate column metadata of an IndexBackfiller. It
// populates the cols and colIdxMap fields, and also collects the addedCols and
// computedCols fields.
func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
ib.cols = desc.Columns
for i := range desc.Columns {
col := &desc.Columns[i]
ib.cols = append(ib.cols, *col)
if col.IsComputed() && col.Virtual {
ib.computedCols = append(ib.computedCols, *col)
}
}
ib.cols = append([]descpb.ColumnDescriptor(nil), desc.Columns...)

// If there are ongoing mutations, add columns that are being added and in
// the DELETE_AND_WRITE_ONLY state.
Expand All @@ -558,6 +663,11 @@ func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
m.Direction == descpb.DescriptorMutation_ADD &&
m.State == descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY {
ib.cols = append(ib.cols, *column)
if column.IsComputed() {
ib.computedCols = append(ib.computedCols, *column)
} else {
ib.addedCols = append(ib.addedCols, *column)
}
}
}
}
Expand Down Expand Up @@ -586,8 +696,10 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
ib.added = append(ib.added, idx)
for i := range ib.cols {
id := ib.cols[i].ID
if idx.ContainsColumnID(id) ||
idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding {
idxContainsColumn := idx.ContainsColumnID(id)
isPrimaryIndex := idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding
if (idxContainsColumn || isPrimaryIndex) &&
!(ib.cols[i].Virtual || i >= len(desc.Columns)) {
valNeededForCol.Add(i)
}
}
Expand All @@ -601,12 +713,14 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
func (ib *IndexBackfiller) init(
evalCtx *tree.EvalContext,
predicateExprs map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
valNeededForCol util.FastIntSet,
desc *tabledesc.Immutable,
mon *mon.BytesMonitor,
) error {
ib.evalCtx = evalCtx
ib.predicates = predicateExprs
ib.colExprs = colExprs

// Initialize a list of index descriptors to encode entries for. If there
// are no partial indexes, the list is equivalent to the list of indexes
Expand Down Expand Up @@ -709,6 +823,28 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
}
memUsedPerChunk += indexEntriesPerRowInitialBufferSize
buffer := make([]rowenc.IndexEntry, len(ib.added))
evaluateExprs := func(cols []descpb.ColumnDescriptor) error {
for i := range cols {
colID := cols[i].ID
texpr, ok := ib.colExprs[colID]
if !ok {
continue
}
val, err := texpr.Eval(ib.evalCtx)
if err != nil {
return err
}
colIdx, ok := ib.colIdxMap.Get(colID)
if !ok {
return errors.AssertionFailedf(
"failed to find index for column %d in %d",
colID, tableDesc.GetID(),
)
}
ib.rowVals[colIdx] = val
}
return nil
}
for i := int64(0); i < chunkSize; i++ {
encRow, _, _, err := ib.fetcher.NextRow(ctx)
if err != nil {
Expand All @@ -726,6 +862,17 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(

iv.CurSourceRow = ib.rowVals

// First populate default values, then populate computed expressions which
// may reference default values.
if len(ib.colExprs) > 0 {
if err := evaluateExprs(ib.addedCols); err != nil {
return nil, nil, 0, err
}
if err := evaluateExprs(ib.computedCols); err != nil {
return nil, nil, 0, err
}
}

// If there are any partial indexes being added, make a list of the
// indexes that the current row should be added to.
if len(ib.predicates) > 0 {
Expand Down
Loading

0 comments on commit ebc5e1b

Please sign in to comment.