Skip to content

Commit

Permalink
Merge #74181
Browse files Browse the repository at this point in the history
74181: scmutationexec: use correct mutation IDs r=postamar a=postamar


    scmutationexec: absorb descriptorutils package
    
    This refactoring commit moves everything in the descriptorutils package
    to the scmutationexec package. It also removes unused helper functions
    from the latter package.
    
    Release note: None


    scmutationexec: use correct mutation IDs
    
    Previously, we'd run into issues when validating indexes because the
    declarative schema changer would enqueue descriptor mutations with
    incrementing IDs, which does not make sense outside of the legacy schema
    changer. This commit addresses this by having the declarative schema
    changer add mutations always with the same ID and enforcing this with
    a new validation check.
    
    Release note: None


    sql: make first mutation public when validating inverted index
    
    Previously we did not bother to make the first mutation public in the
    synthetic descriptor when validating inverted indexes, unlike when
    validating forward indexes. While this had not been necessary in the
    legacy schema changer, it is necessary in the declarative schema
    changer.
    
    So far we've been working around this by making the inverted index
    creation depend on any added virtual columns being made public first.
    This change is motivated by a recent commit on mutation ID generation
    in the declarative schema changer, which removes the limitations that
    prevented the mutation-made-public synthetic descriptor from being
    correct.
    
    Release note: None

Co-authored-by: Marius Posta <[email protected]>
  • Loading branch information
craig[bot] and Marius Posta committed Dec 22, 2021
2 parents 3a94842 + 439f4ea commit 077f63d
Show file tree
Hide file tree
Showing 23 changed files with 210 additions and 310 deletions.
21 changes: 19 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1913,7 +1913,15 @@ func revalidateIndexes(
}
}
if len(forward) > 0 {
if err := sql.ValidateForwardIndexes(ctx, tableDesc.MakePublic(), forward, runner, false, true, sessiondata.InternalExecutorOverride{}); err != nil {
if err := sql.ValidateForwardIndexes(
ctx,
tableDesc.MakePublic(),
forward,
runner,
false, /* withFirstMutationPublic */
true, /* gatherAllInvalid */
sessiondata.InternalExecutorOverride{},
); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = invalid.Indexes
} else {
Expand All @@ -1922,7 +1930,16 @@ func revalidateIndexes(
}
}
if len(inverted) > 0 {
if err := sql.ValidateInvertedIndexes(ctx, execCfg.Codec, tableDesc.MakePublic(), inverted, runner, true, sessiondata.InternalExecutorOverride{}); err != nil {
if err := sql.ValidateInvertedIndexes(
ctx,
execCfg.Codec,
tableDesc.MakePublic(),
inverted,
runner,
false, /* withFirstMutationPublic */
true, /* gatherAllInvalid */
sessiondata.InternalExecutorOverride{},
); err != nil {
if invalid := (sql.InvalidIndexesError{}); errors.As(err, &invalid) {
invalidIndexes[tableDesc.ID] = append(invalidIndexes[tableDesc.ID], invalid.Indexes...)
} else {
Expand Down
55 changes: 42 additions & 13 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,12 +1424,29 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {

if len(forwardIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateForwardIndexes(ctx, tableDesc, forwardIndexes, runHistoricalTxn, true /* withFirstMutationPubic */, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
return ValidateForwardIndexes(
ctx,
tableDesc,
forwardIndexes,
runHistoricalTxn,
true, /* withFirstMutationPubic */
false, /* gatherAllInvalid */
sessiondata.InternalExecutorOverride{},
)
})
}
if len(invertedIndexes) > 0 {
grp.GoCtx(func(ctx context.Context) error {
return ValidateInvertedIndexes(ctx, sc.execCfg.Codec, tableDesc, invertedIndexes, runHistoricalTxn, false /* gatherAllInvalid */, sessiondata.InternalExecutorOverride{})
return ValidateInvertedIndexes(
ctx,
sc.execCfg.Codec,
tableDesc,
invertedIndexes,
runHistoricalTxn,
true, /* withFirstMutationPublic */
false, /* gatherAllInvalid */
sessiondata.InternalExecutorOverride{},
)
})
}
if err := grp.Wait(); err != nil {
Expand Down Expand Up @@ -1461,6 +1478,7 @@ func ValidateInvertedIndexes(
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
runHistoricalTxn sqlutil.HistoricalInternalExecTxnRunner,
withFirstMutationPublic bool,
gatherAllInvalid bool,
execOverride sessiondata.InternalExecutorOverride,
) error {
Expand Down Expand Up @@ -1525,11 +1543,22 @@ func ValidateInvertedIndexes(

grp.GoCtx(func(ctx context.Context) error {
defer close(countReady[i])
desc := tableDesc
if withFirstMutationPublic {
// Make the mutations public in an in-memory copy of the descriptor and
// add it to the Collection's synthetic descriptors, so that we can use
// SQL below to perform the validation.
fakeDesc, err := tableDesc.MakeFirstMutationPublic(catalog.IgnoreConstraints)
if err != nil {
return err
}
desc = fakeDesc
}

start := timeutil.Now()

colID := idx.InvertedColumnID()
col, err := tableDesc.FindColumnWithID(colID)
col, err := desc.FindColumnWithID(colID)
if err != nil {
return err
}
Expand All @@ -1555,20 +1584,20 @@ func ValidateInvertedIndexes(
if geoindex.IsEmptyConfig(&geoConfig) {
stmt = fmt.Sprintf(
`SELECT coalesce(sum_int(crdb_internal.num_inverted_index_entries(%s, %d)), 0) FROM [%d AS t]`,
colNameOrExpr, idx.GetVersion(), tableDesc.GetID(),
colNameOrExpr, idx.GetVersion(), desc.GetID(),
)
} else {
stmt = fmt.Sprintf(
`SELECT coalesce(sum_int(crdb_internal.num_geo_inverted_index_entries(%d, %d, %s)), 0) FROM [%d AS t]`,
tableDesc.GetID(), idx.GetID(), colNameOrExpr, tableDesc.GetID(),
desc.GetID(), idx.GetID(), colNameOrExpr, desc.GetID(),
)
}
// If the index is a partial index the predicate must be added
// as a filter to the query.
if idx.IsPartial() {
stmt = fmt.Sprintf(`%s WHERE %s`, stmt, idx.GetPredicate())
}
return ie.WithSyntheticDescriptors([]catalog.Descriptor{tableDesc}, func() error {
return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
row, err := ie.QueryRowEx(ctx, "verify-inverted-idx-count", txn, execOverride, stmt)
if err != nil {
return err
Expand All @@ -1583,7 +1612,7 @@ func ValidateInvertedIndexes(
return err
}
log.Infof(ctx, "%s %s expected inverted index count = %d, took %s",
tableDesc.GetName(), colNameOrExpr, expectedCount[i], timeutil.Since(start))
desc.GetName(), colNameOrExpr, expectedCount[i], timeutil.Since(start))
return nil
})
}
Expand Down Expand Up @@ -1669,16 +1698,16 @@ func ValidateForwardIndexes(
}
}

var desc catalog.TableDescriptor = tableDesc
desc := tableDesc
if withFirstMutationPublic {
// Make the mutations public in an in-memory copy of the descriptor and
// add it to the Collection's synthetic descriptors, so that we can use
// SQL below to perform the validation.
var err error
desc, err = tableDesc.MakeFirstMutationPublic(catalog.IgnoreConstraints)
fakeDesc, err := tableDesc.MakeFirstMutationPublic(catalog.IgnoreConstraints)
if err != nil {
return err
}
desc = fakeDesc
}

// Retrieve the row count in the index.
Expand Down Expand Up @@ -1760,17 +1789,17 @@ func ValidateForwardIndexes(
var tableRowCountTime time.Duration
start := timeutil.Now()

var desc catalog.TableDescriptor = tableDesc
desc := tableDesc
if withFirstMutationPublic {
// The query to count the expected number of rows can reference columns
// added earlier in the same mutation. Make the mutations public in an
// in-memory copy of the descriptor and add it to the Collection's synthetic
// descriptors, so that we can use SQL below to perform the validation.
descI, err := tableDesc.MakeFirstMutationPublic(catalog.IgnoreConstraintsAndPKSwaps)
fakeDesc, err := tableDesc.MakeFirstMutationPublic(catalog.IgnoreConstraintsAndPKSwaps)
if err != nil {
return err
}
desc = descI.(*tabledesc.Mutable)
desc = fakeDesc
}

// Count the number of rows in the table.
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ func (desc *wrapper) ValidateTxnCommit(
vea.Report(unimplemented.NewWithIssue(48026,
"primary key dropped without subsequent addition of new primary key in same transaction"))
}
// Check that the mutation ID values are appropriately set when a declarative
// schema change is underway.
if n := len(desc.Mutations); n > 0 && desc.NewSchemaChangeJobID != 0 {
lastMutationID := desc.Mutations[n-1].MutationID
if lastMutationID != desc.NextMutationID {
vea.Report(errors.AssertionFailedf(
"expected next mutation ID to be %d in table undergoing declarative schema change, found %d instead",
lastMutationID, desc.NextMutationID))
}
}
}

// GetReferencedDescIDs returns the IDs of all descriptors referenced by
Expand Down Expand Up @@ -435,7 +445,7 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
// of the descriptor, in both the new and old schema change jobs.)
if len(desc.MutationJobs) > 0 && desc.NewSchemaChangeJobID != 0 {
vea.Report(errors.AssertionFailedf(
"invalid concurrent new-style schema change job %d and old-style schema change jobs %v",
"invalid concurrent declarative schema change job %d and legacy schema change jobs %v",
desc.NewSchemaChangeJobID, desc.MutationJobs))
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,12 @@ func (p *planner) initiateDropTable(
return errors.Errorf("table %q is already being dropped", tableDesc.Name)
}

// Exit early with an error if the table is undergoing a new-style schema
// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if tableDesc.NewSchemaChangeJobID != 0 {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"cannot perform a schema change on table %q while it is undergoing a new-style schema change",
"cannot perform a schema change on table %q while it is undergoing a declarative schema change",
tableDesc.GetName(),
)
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,6 @@ func (s *TestState) ValidateForwardIndexes(
_ context.Context,
tbl catalog.TableDescriptor,
indexes []catalog.Index,
_ bool,
_ bool,
_ sessiondata.InternalExecutorOverride,
) error {
ids := make([]descpb.IndexID, len(indexes))
Expand All @@ -850,7 +848,6 @@ func (s *TestState) ValidateInvertedIndexes(
_ context.Context,
tbl catalog.TableDescriptor,
indexes []catalog.Index,
_ bool,
_ sessiondata.InternalExecutorOverride,
) error {
ids := make([]descpb.IndexID, len(indexes))
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/schemachanger/scexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ go_library(
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/nstree",
"//pkg/sql/schemachanger/scexec/descriptorutils",
"//pkg/sql/schemachanger/scexec/scmutationexec",
"//pkg/sql/schemachanger/scop",
"//pkg/sql/schemachanger/scpb",
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/schemachanger/scexec/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,13 @@ type IndexValidator interface {
ctx context.Context,
tbl catalog.TableDescriptor,
indexes []catalog.Index,
withFirstMutationPublic bool,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error

ValidateInvertedIndexes(
ctx context.Context,
tbl catalog.TableDescriptor,
indexes []catalog.Index,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error
}
Expand Down
13 changes: 0 additions & 13 deletions pkg/sql/schemachanger/scexec/descriptorutils/BUILD.bazel

This file was deleted.

58 changes: 0 additions & 58 deletions pkg/sql/schemachanger/scexec/descriptorutils/helpers.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scexec/exec_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec/descriptorutils"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec/scmutationexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
)

Expand Down Expand Up @@ -50,7 +50,7 @@ func executeIndexBackfillOp(ctx context.Context, deps Dependencies, op *scop.Bac
if !ok {
return catalog.WrapTableDescRefErr(desc.GetID(), catalog.NewDescriptorTypeError(desc))
}
mut, err := descriptorutils.FindMutation(table, descriptorutils.MakeIndexIDMutationSelector(op.IndexID))
mut, err := scmutationexec.FindMutation(table, scmutationexec.MakeIndexIDMutationSelector(op.IndexID))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scexec/exec_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func executeValidateUniqueIndex(
User: security.RootUserName(),
}
if index.GetType() == descpb.IndexDescriptor_FORWARD {
err = deps.IndexValidator().ValidateForwardIndexes(ctx, table, []catalog.Index{index}, true, false, execOverride)
err = deps.IndexValidator().ValidateForwardIndexes(ctx, table, []catalog.Index{index}, execOverride)
} else {
err = deps.IndexValidator().ValidateInvertedIndexes(ctx, table, []catalog.Index{index}, false, execOverride)
err = deps.IndexValidator().ValidateInvertedIndexes(ctx, table, []catalog.Index{index}, execOverride)
}
return err
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/schemachanger/scexec/executor_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ CREATE TABLE db.t (
},
State: descpb.DescriptorMutation_DELETE_ONLY,
Direction: descpb.DescriptorMutation_ADD,
MutationID: mutable.NextMutationID,
MutationID: 1,
})
mutable.NextMutationID++
mutable.NextMutationID = 1
}),
ops: func() []scop.Op {
return []scop.Op{
Expand Down Expand Up @@ -489,8 +489,6 @@ func (noopIndexValidator) ValidateForwardIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
withFirstMutationPublic bool,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error {
return nil
Expand All @@ -500,7 +498,6 @@ func (noopIndexValidator) ValidateInvertedIndexes(
ctx context.Context,
tableDesc catalog.TableDescriptor,
indexes []catalog.Index,
gatherAllInvalid bool,
override sessiondata.InternalExecutorOverride,
) error {
return nil
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/parser",
"//pkg/sql/schemachanger/scexec/descriptorutils",
"//pkg/sql/schemachanger/scop",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/screl",
Expand Down
Loading

0 comments on commit 077f63d

Please sign in to comment.