Skip to content

Commit

Permalink
sql: use declarative schema changer for add column with unique
Browse files Browse the repository at this point in the history
Previously, the declarative schema changer was disabled
when adding a column with unique constraint, since we
didn't have a complete set of elements / logic for adding
secondary indexes. This was inadequate because operations
would not ordered correctly and rollbacks could potentially
break. To address this, this patch enables support and
addresses the missing pieces to setup the secondary indexes
correctly, such as adding suffix columns, ordering operations
correctly, and returning appropriate errors.

Release note: None
  • Loading branch information
fqazi committed Jun 6, 2022
1 parent 2268b1e commit 4409277
Show file tree
Hide file tree
Showing 17 changed files with 1,058 additions and 172 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ func createImportingDescriptors(

// Finally, clean up / update any schema changer state inside descriptors
// globally.
if err := rewrite.SchemaChangerStateInDescs(allMutableDescs); err != nil {
if err := rewrite.CleanSchemaChangerStateInDescs(allMutableDescs); err != nil {
return nil, nil, err
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,11 +1691,19 @@ func ValidateForwardIndexes(
invalid <- idx.GetID()
return nil
}
// Resolve the table index descriptor name.
indexName, err := tableDesc.GetIndexNameByID(idx.GetID())
if err != nil {
log.Warningf(ctx,
"unable to find index by ID for ValidateForwardIndexes: %d",
idx.GetID())
indexName = idx.GetName()
}
// TODO(vivek): find the offending row and include it in the error.
return pgerror.WithConstraintName(pgerror.Newf(pgcode.UniqueViolation,
"duplicate key value violates unique constraint %q",
idx.GetName()),
idx.GetName())
indexName),
indexName)

}
case <-ctx.Done():
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,12 @@ type TableDescriptor interface {
// GetAutoStatsSettings returns the table settings related to automatic
// statistics collection. May return nil if none are set.
GetAutoStatsSettings() *catpb.AutoStatsSettings
// GetIndexNameByID returns the name of an index based on an ID, taking into
// account any ongoing declarative schema changes. Declarative schema changes
// do not propagate the index name into the mutations until changes are fully
// validated and swap operations are complete (to avoid having two constraints
// with the same name).
GetIndexNameByID(indexID descpb.IndexID) (name string, err error)
}

// MutableTableDescriptor is both a MutableDescriptor and a TableDescriptor.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/rewrite/rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ func rewriteIDsInTypesT(typ *types.T, descriptorRewrites jobspb.DescRewriteMap)
return nil
}

// SchemaChangerStateInDescs goes over all mutable descriptors and cleans any
// CleanSchemaChangerStateInDescs goes over all mutable descriptors and cleans any
// empty state information.
func SchemaChangerStateInDescs(descriptors []catalog.MutableDescriptor) error {
func CleanSchemaChangerStateInDescs(descriptors []catalog.MutableDescriptor) error {
nonEmptyJobs := make(map[jobspb.JobID]struct{})
// Track all the schema changer states that have a non-empty job associated
// withe them.
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,6 @@ func (desc *Mutable) allocateIndexIDs(columnNames map[string]descpb.ColumnID) er
if err != nil {
return err
}

var compositeColIDs catalog.TableColSet
for i := range desc.Columns {
col := &desc.Columns[i]
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -591,3 +592,24 @@ func (desc *wrapper) getExistingOrNewMutationCache() *mutationCache {
func (desc *wrapper) AllMutations() []catalog.Mutation {
return desc.getExistingOrNewMutationCache().all
}

func (desc *wrapper) GetIndexNameByID(indexID descpb.IndexID) (string, error) {
// Check if there are any ongoing schema changes and prefer the name from
// them.
if scState := desc.GetDeclarativeSchemaChangerState(); scState != nil {
for _, target := range scState.Targets {
if target.IndexName != nil &&
target.TargetStatus == scpb.Status_PUBLIC &&
target.IndexName.TableID == desc.GetID() &&
target.IndexName.IndexID == indexID {
return target.IndexName.Name, nil
}
}
}
// Otherwise, try fetching the name from the index descriptor.
index, err := desc.FindIndexWithID(indexID)
if err != nil {
return "", err
}
return index.GetName(), err
}
14 changes: 11 additions & 3 deletions pkg/sql/row/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -129,14 +130,21 @@ func NewUniquenessConstraintViolationError(
return pgerror.Wrap(err, pgcode.UniqueViolation,
"duplicate key value got decoding error")
}

// Resolve the table index descriptor name.
indexName, err := tableDesc.GetIndexNameByID(index.GetID())
if err != nil {
log.Warningf(ctx,
"unable to find index by ID for NewUniquenessConstraintViolationError: %d",
index.GetID())
indexName = index.GetName()
}
// Exclude implicit partitioning columns and hash sharded index columns from
// the error message.
skipCols := index.ExplicitColumnStartIdx()
return errors.WithDetail(
pgerror.WithConstraintName(pgerror.Newf(pgcode.UniqueViolation,
"duplicate key value violates unique constraint %q", index.GetName(),
), index.GetName()),
"duplicate key value violates unique constraint %q", indexName,
), indexName),
fmt.Sprintf(
"Key (%s)=(%s) already exists.",
strings.Join(names[skipCols:], ","),
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/schemachanger/scbuild/builder_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ func (b *builderState) nextIndexID(id catid.DescID) (ret catid.IndexID) {
ret = index.IndexID + 1
}
})
scpb.ForEachTemporaryIndex(b, func(_ scpb.Status, _ scpb.TargetStatus, index *scpb.TemporaryIndex) {
if index.TableID == id && index.IndexID >= ret {
ret = index.IndexID + 1
}
})
scpb.ForEachSecondaryIndex(b, func(_ scpb.Status, _ scpb.TargetStatus, index *scpb.SecondaryIndex) {
if index.TableID == id && index.IndexID >= ret {
ret = index.IndexID + 1
Expand Down
Loading

0 comments on commit 4409277

Please sign in to comment.