Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: deal with computed and default column generation in index backfill #58990

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ go_test(
"run_control_test.go",
"scan_test.go",
"scatter_test.go",
"schema_changer_helpers_test.go",
"schema_changer_test.go",
"scrub_test.go",
"sequence_test.go",
Expand Down Expand Up @@ -495,6 +496,7 @@ go_test(
"//pkg/server/status/statuspb:statuspb_go_proto",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/sql/backfill",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
Expand Down Expand Up @@ -586,6 +588,7 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v2//:yaml_v2",
"@org_golang_google_protobuf//proto",
"@org_golang_x_sync//errgroup",
],
)
Expand Down
206 changes: 177 additions & 29 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ func (cb *ColumnBackfiller) InitForLocalUse(
if err != nil {
return err
}
computedExprs, err := schemaexpr.MakeComputedExprs(
computedExprs, _, err := schemaexpr.MakeComputedExprs(
ctx,
cb.added,
desc.GetPublicColumns(),
desc,
tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
evalCtx,
Expand Down Expand Up @@ -210,9 +211,10 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
if err != nil {
return err
}
computedExprs, err = schemaexpr.MakeComputedExprs(
computedExprs, _, err = schemaexpr.MakeComputedExprs(
ctx,
cb.added,
desc.GetPublicColumns(),
desc,
tree.NewUnqualifiedTableName(tree.Name(desc.Name)),
evalCtx,
Expand Down Expand Up @@ -396,11 +398,27 @@ type IndexBackfiller struct {
types []*types.T
rowVals tree.Datums
evalCtx *tree.EvalContext
cols []descpb.ColumnDescriptor

// cols are all of the writable (PUBLIC and DELETE_AND_WRITE_ONLY) columns in
// the descriptor.
cols []descpb.ColumnDescriptor

// addedCols are the columns in DELETE_AND_WRITE_ONLY being added as part of
// this index which are not computed.
addedCols []descpb.ColumnDescriptor

// computedCols are the columns in this index which are computed and do
// not have concrete values in the source index. This is virtual computed
// columns and stored computed columns which are non-public.
computedCols []descpb.ColumnDescriptor

// Map of columns which need to be evaluated to their expressions.
colExprs map[descpb.ColumnID]tree.TypedExpr

// predicates is a map of indexes to partial index predicate expressions. It
// includes entries for partial indexes only.
predicates map[descpb.IndexID]tree.TypedExpr

// indexesToEncode is a list of indexes to encode entries for a given row.
// It is a field of IndexBackfiller to avoid allocating a slice for each row
// backfilled.
Expand Down Expand Up @@ -437,26 +455,108 @@ func (ib *IndexBackfiller) InitForLocalUse(
// Initialize ib.added.
valNeededForCol := ib.initIndexes(desc)

predicates, colExprs, referencedColumns, err := constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx,
)
if err != nil {
return err
}

// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
}

// constructExprs is a helper to construct the index and column expressions
// required for an index backfill. It also returns the set of columns referenced
// by any of these exprs.
//
// The cols argument is the full set of cols in the table (including those being
// added). The addedCols argument is the set of non-public, non-computed
// columns. The computedCols argument is the set of computed columns in the
// index.
func constructExprs(
ctx context.Context,
desc catalog.TableDescriptor,
addedIndexes []*descpb.IndexDescriptor,
cols, addedCols, computedCols []descpb.ColumnDescriptor,
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
) (
predicates map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
referencedColumns catalog.TableColSet,
_ error,
) {
// Convert any partial index predicate strings into expressions.
predicates, predicateRefColIDs, err := schemaexpr.MakePartialIndexExprs(
ctx,
ib.added,
ib.cols,
addedIndexes,
cols,
desc,
evalCtx,
semaCtx,
)
if err != nil {
return err
return nil, nil, catalog.TableColSet{}, err
}

// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})
// Determine the exprs for newly added, non-computed columns.
defaultExprs, err := schemaexpr.MakeDefaultExprs(
ctx, addedCols, &transform.ExprTransformContext{}, evalCtx, semaCtx,
)
if err != nil {
return nil, nil, catalog.TableColSet{}, err
}

return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
// TODO(ajwerner): Rethink this table name.
tn := tree.NewUnqualifiedTableName(tree.Name(desc.GetName()))
computedExprs, computedExprRefColIDs, err := schemaexpr.MakeComputedExprs(
ctx,
computedCols,
cols,
desc,
tn,
evalCtx,
semaCtx,
)
if err != nil {
return nil, nil, catalog.TableColSet{}, err
}

numColExprs := len(addedCols) + len(computedCols)
colExprs = make(map[descpb.ColumnID]tree.TypedExpr, numColExprs)
var addedColSet catalog.TableColSet
for i := range defaultExprs {
id := addedCols[i].ID
colExprs[id] = defaultExprs[i]
addedColSet.Add(id)
}
for i := range computedCols {
id := computedCols[i].ID
colExprs[id] = computedExprs[i]
}

// Ensure that only existing columns are added to the needed set. Otherwise
// the fetcher may complain that the columns don't exist. There's a somewhat
// subtle invariant that if any dependencies exist between computed columns
// and default values that the computed column be a later column and thus the
// default value will have been populated. Computed columns are not permitted
// to reference each other.
addToReferencedColumns := func(cols catalog.TableColSet) {
cols.ForEach(func(col descpb.ColumnID) {
if !addedColSet.Contains(col) {
referencedColumns.Add(col)
}
})
}
addToReferencedColumns(predicateRefColIDs)
addToReferencedColumns(computedExprRefColIDs)
return predicates, colExprs, referencedColumns, nil
}

// InitForDistributedUse initializes an IndexBackfiller for use as part of a
Expand All @@ -475,29 +575,27 @@ func (ib *IndexBackfiller) InitForDistributedUse(

evalCtx := flowCtx.NewEvalCtx()
var predicates map[descpb.IndexID]tree.TypedExpr
var predicateRefColIDs catalog.TableColSet
var colExprs map[descpb.ColumnID]tree.TypedExpr
var referencedColumns catalog.TableColSet

// Install type metadata in the target descriptors, as well as resolve any
// user defined types in partial index predicate expressions.
if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(txn)
// Hydrate all the types present in the table.
if err := typedesc.HydrateTypesInTableDescriptor(ctx, desc.TableDesc(), resolver); err != nil {
if err = typedesc.HydrateTypesInTableDescriptor(
ctx, desc.TableDesc(), resolver,
); err != nil {
return err
}
// Set up a SemaContext to type check the default and computed expressions.
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = resolver

// Convert any partial index predicate strings into expressions.
var err error
predicates, predicateRefColIDs, err =
schemaexpr.MakePartialIndexExprs(ctx, ib.added, ib.cols, desc, evalCtx, &semaCtx)
if err != nil {
return err
}

return nil
predicates, colExprs, referencedColumns, err = constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, &semaCtx,
)
return err
}); err != nil {
return err
}
Expand All @@ -508,11 +606,11 @@ func (ib *IndexBackfiller) InitForDistributedUse(

// Add the columns referenced in the predicate to valNeededForCol so that
// columns necessary to evaluate the predicate expression are fetched.
predicateRefColIDs.ForEach(func(col descpb.ColumnID) {
referencedColumns.ForEach(func(col descpb.ColumnID) {
valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, valNeededForCol, desc, mon)
return ib.init(evalCtx, predicates, colExprs, valNeededForCol, desc, mon)
}

// Close releases the resources used by the IndexBackfiller.
Expand Down Expand Up @@ -546,7 +644,14 @@ func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int6
// initCols is a helper to populate column metadata of an IndexBackfiller. It
// populates the cols and colIdxMap fields.
func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
ib.cols = desc.Columns
for i := range desc.Columns {
col := &desc.Columns[i]
ib.cols = append(ib.cols, *col)
if col.IsComputed() && col.Virtual {
ib.computedCols = append(ib.computedCols, *col)
}
}
ib.cols = append([]descpb.ColumnDescriptor(nil), desc.Columns...)

// If there are ongoing mutations, add columns that are being added and in
// the DELETE_AND_WRITE_ONLY state.
Expand All @@ -558,6 +663,11 @@ func (ib *IndexBackfiller) initCols(desc *tabledesc.Immutable) {
m.Direction == descpb.DescriptorMutation_ADD &&
m.State == descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY {
ib.cols = append(ib.cols, *column)
if column.IsComputed() {
ib.computedCols = append(ib.computedCols, *column)
} else {
ib.addedCols = append(ib.addedCols, *column)
}
}
}
}
Expand Down Expand Up @@ -586,8 +696,11 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
ib.added = append(ib.added, idx)
for i := range ib.cols {
id := ib.cols[i].ID
if idx.ContainsColumnID(id) ||
idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding {
idxContainsColumn := idx.ContainsColumnID(id)
isPrimaryIndex := idx.GetEncodingType(desc.GetPrimaryIndexID()) == descpb.PrimaryIndexEncoding
if (idxContainsColumn || isPrimaryIndex) &&
!ib.cols[i].Virtual &&
i < len(desc.Columns) {
valNeededForCol.Add(i)
}
}
Expand All @@ -601,12 +714,14 @@ func (ib *IndexBackfiller) initIndexes(desc *tabledesc.Immutable) util.FastIntSe
func (ib *IndexBackfiller) init(
evalCtx *tree.EvalContext,
predicateExprs map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
valNeededForCol util.FastIntSet,
desc *tabledesc.Immutable,
mon *mon.BytesMonitor,
) error {
ib.evalCtx = evalCtx
ib.predicates = predicateExprs
ib.colExprs = colExprs

// Initialize a list of index descriptors to encode entries for. If there
// are no partial indexes, the list is equivalent to the list of indexes
Expand Down Expand Up @@ -709,6 +824,28 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
}
memUsedPerChunk += indexEntriesPerRowInitialBufferSize
buffer := make([]rowenc.IndexEntry, len(ib.added))
evaluateExprs := func(cols []descpb.ColumnDescriptor) error {
for i := range cols {
colID := cols[i].ID
texpr, ok := ib.colExprs[colID]
if !ok {
continue
}
val, err := texpr.Eval(ib.evalCtx)
if err != nil {
return err
}
colIdx, ok := ib.colIdxMap.Get(colID)
if !ok {
return errors.AssertionFailedf(
"failed to find index for column %d in %d",
colID, tableDesc.GetID(),
)
}
ib.rowVals[colIdx] = val
}
return nil
}
for i := int64(0); i < chunkSize; i++ {
encRow, _, _, err := ib.fetcher.NextRow(ctx)
if err != nil {
Expand All @@ -726,6 +863,17 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(

iv.CurSourceRow = ib.rowVals

// First populate default values, then populate computed expressions which
// may reference default values.
if len(ib.colExprs) > 0 {
if err := evaluateExprs(ib.addedCols); err != nil {
return nil, nil, 0, err
}
if err := evaluateExprs(ib.computedCols); err != nil {
return nil, nil, 0, err
}
}

// If there are any partial indexes being added, make a list of the
// indexes that the current row should be added to.
if len(ib.predicates) > 0 {
Expand Down
Loading