Skip to content

Commit

Permalink
sql: deal with computed and default column generation in index backfi…
Browse files Browse the repository at this point in the history
…ller

This commit was mostly motivated by work on the new schema changer to enable
the behavior described in cockroachdb#47989, however, it also turns out to be a
prerequisite of the work to use virtual computed columns in secondary indexes.
Given we haven't released virtual computed columns, I'm going to omit a
release note for this PR.

The basic idea is that we need to track dependencies for computed columns
to make sure they are retrieved. Default expressions need to be evaluated
first. Much of the code is testing.

Release note: None
ajwerner committed Jan 28, 2021

Verified

This commit was signed with the committer’s verified signature.
1 parent 4031b6f commit 5b2198f
Showing 8 changed files with 660 additions and 55 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -439,6 +439,7 @@ go_test(
"run_control_test.go",
"scan_test.go",
"scatter_test.go",
"schema_changer_helpers_test.go",
"schema_changer_test.go",
"scrub_test.go",
"sequence_test.go",
@@ -495,6 +496,7 @@ go_test(
"//pkg/server/status/statuspb:statuspb_go_proto",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/sql/backfill",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
@@ -586,6 +588,7 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v2//:yaml_v2",
"@org_golang_google_protobuf//proto",
"@org_golang_x_sync//errgroup",
],
)
206 changes: 177 additions & 29 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
@@ -167,9 +167,10 @@ func (cb *ColumnBackfiller) InitForLocalUse(
if err != nil {
return err
}
computedExprs, err := schemaexpr.MakeComputedExprs(
computedExprs, _, err := schemaexpr.MakeComputedExprs(
ctx,
cb.added,
desc.GetPublicColumns(),
desc,
tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
evalCtx,
@@ -210,9 +211,10 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
if err != nil {
return err
}
computedExprs, err = schemaexpr.MakeComputedExprs(
computedExprs, _, err = schemaexpr.MakeComputedExprs(
ctx,
cb.added,
desc.GetPublicColumns(),
desc,
tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
evalCtx,
@@ -396,11 +398,27 @@ type IndexBackfiller struct {
types []*types.T
rowVals tree.Datums
evalCtx *tree.EvalContext
cols []descpb.ColumnDescriptor

// cols are all of the writable (PUBLIC and DELETE_AND_WRITE_ONLY) columns in
// the descriptor.
cols []descpb.ColumnDescriptor

// addedCols are the columns in DELETE_AND_WRITE_ONLY being added as part of
// this index which are not computed.
addedCols []descpb.ColumnDescriptor

// computedCols are the columns in this index which are computed and do
// not have concrete values in the source index. These are the virtual
// computed columns and the stored computed columns which are non-public.
computedCols []descpb.ColumnDescriptor

// Map of columns which need to be evaluated to their expressions.
colExprs map[descpb.ColumnID]tree.TypedExpr

// predicates is a map of indexes to partial index predicate expressions. It
// includes entries for partial indexes only.
predicates map[descpb.IndexID]tree.TypedExpr

// indexesToEncode is a list of indexes to encode entries for a given row.
// It is a field of IndexBackfiller to avoid allocating a slice for each row
// backfilled.
@@ -437,26 +455,108 @@ func (ib *IndexBackfiller) InitForLocalUse(
// Initialize ib.added.
valNeededForCol := ib.initIndexes(desc)

predicates, colExprs, referencedColumns, err := constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx,
)
if err != nil {
return err
}

// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
}

// constructExprs is a helper to construct the index and column expressions
// required for an index backfill. It also returns the set of columns referenced
// by any of these exprs.
//
// The cols argument is the full set of cols in the table (including those being
// added). The addedCols argument is the set of non-public, non-computed
// columns. The computedCols argument is the set of computed columns in the
// index.
func constructExprs(
ctx context.Context,
desc catalog.TableDescriptor,
addedIndexes []*descpb.IndexDescriptor,
cols, addedCols, computedCols []descpb.ColumnDescriptor,
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
) (
predicates map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
referencedColumns catalog.TableColSet,
_ error,
) {
// Convert any partial index predicate strings into expressions.
predicates, predicateRefColIDs, err := schemaexpr.MakePartialIndexExprs(
ctx,
ib.added,
ib.cols,
addedIndexes,
cols,
desc,
evalCtx,
semaCtx,
)
if err != nil {
return err
return nil, nil, catalog.TableColSet{}, err
}

// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})
// Determine the exprs for newly added, non-computed columns.
defaultExprs, err := schemaexpr.MakeDefaultExprs(
ctx, addedCols, &transform.ExprTransformContext{}, evalCtx, semaCtx,
)
if err != nil {
return nil, nil, catalog.TableColSet{}, err
}

return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
// TODO(ajwerner): Rethink this table name.
tn := tree.NewUnqualifiedTableName(tree.Name(desc.GetName()))
computedExprs, computedExprRefColIDs, err := schemaexpr.MakeComputedExprs(
ctx,
computedCols,
cols,
desc,
tn,
evalCtx,
semaCtx,
)
if err != nil {
return nil, nil, catalog.TableColSet{}, err
}

numColExprs := len(addedCols) + len(computedCols)
colExprs = make(map[descpb.ColumnID]tree.TypedExpr, numColExprs)
var addedColSet catalog.TableColSet
for i := range defaultExprs {
id := addedCols[i].ID
colExprs[id] = defaultExprs[i]
addedColSet.Add(id)
}
for i := range computedCols {
id := computedCols[i].ID
colExprs[id] = computedExprs[i]
}

// Ensure that only existing columns are added to the needed set. Otherwise
// the fetcher may complain that the columns don't exist. There's a somewhat
// subtle invariant: if any dependencies exist between computed columns and
// default values, the computed column must be a later column, so its default
// value will already have been populated. Computed columns are not permitted
// to reference each other.
addToReferencedColumns := func(cols catalog.TableColSet) {
cols.ForEach(func(col descpb.ColumnID) {
if !addedColSet.Contains(col) {
referencedColumns.Add(col)
}
})
}
addToReferencedColumns(predicateRefColIDs)
addToReferencedColumns(computedExprRefColIDs)
return predicates, colExprs, referencedColumns, nil
}

// InitForDistributedUse initializes an IndexBackfiller for use as part of a
@@ -475,29 +575,27 @@ func (ib *IndexBackfiller) InitForDistributedUse(

evalCtx := flowCtx.NewEvalCtx()
var predicates map[descpb.IndexID]tree.TypedExpr
var predicateRefColIDs catalog.TableColSet
var colExprs map[descpb.ColumnID]tree.TypedExpr
var referencedColumns catalog.TableColSet

// Install type metadata in the target descriptors, as well as resolve any
// user defined types in partial index predicate expressions.
if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(txn)
// Hydrate all the types present in the table.
if err := typedesc.HydrateTypesInTableDescriptor(ctx, desc.TableDesc(), resolver); err != nil {
if err = typedesc.HydrateTypesInTableDescriptor(
ctx, desc.TableDesc(), resolver,
); err != nil {
return err
}
// Set up a SemaContext to type check the default and computed expressions.
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = resolver

// Convert any partial index predicate strings into expressions.
var err error
predicates, predicateRefColIDs, err =
schemaexpr.MakePartialIndexExprs(ctx, ib.added, ib.cols, desc, evalCtx, &semaCtx)
if err != nil {
return err
}

return nil
predicates, colExprs, referencedColumns, err = constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, &semaCtx,
)
return err
}); err != nil {
return err
}
@@ -508,11 +606,11 @@ func (ib *IndexBackfiller) InitForDistributedUse(

// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
}

// Close releases the resources used by the IndexBackfiller.
@@ -546,7 +644,14 @@ func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int6
// initCols is a helper to populate column metadata of an IndexBackfiller. It
// populates the cols and colIdxMap fields.
func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
ib.cols = desc.Columns
for i := range desc.Columns {
col := &desc.Columns[i]
ib.cols = append(ib.cols, *col)
if col.IsComputed() && col.Virtual {
ib.computedCols = append(ib.computedCols, *col)
}
}
ib.cols = append([]descpb.ColumnDescriptor(nil), desc.Columns...)

// If there are ongoing mutations, add columns that are being added and in
// the DELETE_AND_WRITE_ONLY state.
@@ -558,6 +663,11 @@ func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
m.Direction == descpb.DescriptorMutation_ADD &&
m.State == descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY {
ib.cols = append(ib.cols, *column)
if column.IsComputed() {
ib.computedCols = append(ib.computedCols, *column)
} else {
ib.addedCols = append(ib.addedCols, *column)
}
}
}
}
@@ -586,8 +696,11 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
ib.added = append(ib.added, idx)
for i := range ib.cols {
id := ib.cols[i].ID
if idx.ContainsColumnID(id) ||
idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding {
idxContainsColumn := idx.ContainsColumnID(id)
isPrimaryIndex := idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding
if (idxContainsColumn || isPrimaryIndex) &&
!ib.cols[i].Virtual &&
i < len(desc.Columns) {
valNeededForCol.Add(i)
}
}
@@ -601,12 +714,14 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
func (ib *IndexBackfiller) init(
evalCtx *tree.EvalContext,
predicateExprs map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
valNeededForCol util.FastIntSet,
desc *tabledesc.Immutable,
mon *mon.BytesMonitor,
) error {
ib.evalCtx = evalCtx
ib.predicates = predicateExprs
ib.colExprs = colExprs

// Initialize a list of index descriptors to encode entries for. If there
// are no partial indexes, the list is equivalent to the list of indexes
@@ -709,6 +824,28 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
}
memUsedPerChunk += indexEntriesPerRowInitialBufferSize
buffer := make([]rowenc.IndexEntry, len(ib.added))
evaluateExprs := func(cols []descpb.ColumnDescriptor) error {
for i := range cols {
colID := cols[i].ID
texpr, ok := ib.colExprs[colID]
if !ok {
continue
}
val, err := texpr.Eval(ib.evalCtx)
if err != nil {
return err
}
colIdx, ok := ib.colIdxMap.Get(colID)
if !ok {
return errors.AssertionFailedf(
"failed to find index for column %d in %d",
colID, tableDesc.GetID(),
)
}
ib.rowVals[colIdx] = val
}
return nil
}
for i := int64(0); i < chunkSize; i++ {
encRow, _, _, err := ib.fetcher.NextRow(ctx)
if err != nil {
@@ -726,6 +863,17 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(

iv.CurSourceRow = ib.rowVals

// First populate default values, then populate computed expressions which
// may reference default values.
if len(ib.colExprs) > 0 {
if err := evaluateExprs(ib.addedCols); err != nil {
return nil, nil, 0, err
}
if err := evaluateExprs(ib.computedCols); err != nil {
return nil, nil, 0, err
}
}

// If there are any partial indexes being added, make a list of the
// indexes that the current row should be added to.
if len(ib.predicates) > 0 {
Loading

0 comments on commit 5b2198f

Please sign in to comment.