Skip to content

Commit

Permalink
sql: check equivalent constraint when creating hash index
Browse files Browse the repository at this point in the history
Fixes #68031

Previously we only try to create constraint for shard column if
it's newly created. We check duplicate constraint for shard
column when `Alter Primary Key` and `Create Index`, however the
check is simply a name check. This pr adds logic to check
equivalent constraint by checking if the formatted expression
string is the same. With this logic we can try to create the
constraint no matter a shard column is newly created or not. With
this fix, we also don't need to expose the constraint through
`SHOW CREATE TABLE` result since we make sure the constraint is
created or skip if one already exists.

Release note (sql change): Constraint on the shard column of hash
index was exposed to user through `SHOW CREATE TABLE` to make sure
constraint exists when user recreate a same table using the
returned statement. This is not needed anymore with a guarantee
that the constraint always exists for the shard column.
  • Loading branch information
chengxiong-ruan committed Jan 11, 2022
1 parent c02266f commit 0bfea0f
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 145 deletions.
18 changes: 8 additions & 10 deletions pkg/sql/alter_primary_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (p *planner) AlterPrimaryKey(
// If the new index is requested to be sharded, set up the index descriptor
// to be sharded, and add the new shard column if it is missing.
if alterPKNode.Sharded != nil {
shardCol, newColumns, newColumn, err := setupShardedIndex(
shardCol, newColumns, err := setupShardedIndex(
ctx,
p.EvalContext(),
&p.semaCtx,
Expand All @@ -186,15 +186,13 @@ func (p *planner) AlterPrimaryKey(
return err
}
alterPKNode.Columns = newColumns
if newColumn {
if err := p.setupConstraintForShard(
ctx,
tableDesc,
shardCol,
newPrimaryIndexDesc.Sharded.ShardBuckets,
); err != nil {
return err
}
if err := p.maybeSetupConstraintForShard(
ctx,
tableDesc,
shardCol,
newPrimaryIndexDesc.Sharded.ShardBuckets,
); err != nil {
return err
}
telemetry.Inc(sqltelemetry.HashShardedIndexCounter)
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,6 @@ CREATE TABLE system.statement_statistics (
metadata,
statistics,
plan
),
CONSTRAINT check_crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 CHECK (
crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 IN (
0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8, 4:::INT8, 5:::INT8, 6:::INT8, 7:::INT8
)
)
)
`
Expand Down Expand Up @@ -554,11 +549,6 @@ CREATE TABLE system.transaction_statistics (
agg_interval,
metadata,
statistics
),
CONSTRAINT check_crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 CHECK (
crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 IN (
0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8, 4:::INT8, 5:::INT8, 6:::INT8, 7:::INT8
)
)
);
`
Expand Down
70 changes: 33 additions & 37 deletions pkg/sql/create_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ func (p *planner) CreateIndex(ctx context.Context, n *tree.CreateIndex) (planNod
return &createIndexNode{tableDesc: tableDesc, n: n}, nil
}

// setupConstraintForShard adds a check constraint ensuring that the shard
// maybeSetupConstraintForShard adds a check constraint ensuring that the shard
// column's value is within [0..ShardBuckets-1]. This method is called when a
// `CREATE INDEX`/`ALTER PRIMARY KEY` statement is issued for the creation of a
// sharded index that *does not* re-use a pre-existing shard column.
func (p *planner) setupConstraintForShard(
func (p *planner) maybeSetupConstraintForShard(
ctx context.Context, tableDesc *tabledesc.Mutable, shardCol catalog.Column, buckets int32,
) error {
// Assign an ID to the newly-added shard column, which is needed for the creation
Expand All @@ -104,31 +104,26 @@ func (p *planner) setupConstraintForShard(
if err != nil {
return err
}
info, err := tableDesc.GetConstraintInfo()
ckBuilder := schemaexpr.MakeCheckConstraintBuilder(ctx, p.tableName, tableDesc, &p.semaCtx)
ckDesc, err := ckBuilder.Build(ckDef)
if err != nil {
return err
}

inuseNames := make(map[string]struct{}, len(info))
for k := range info {
inuseNames[k] = struct{}{}
}

ckBuilder := schemaexpr.MakeCheckConstraintBuilder(ctx, p.tableName, tableDesc, &p.semaCtx)
ckName, err := ckBuilder.DefaultName(ckDef.Expr)
curConstraintInfos, err := tableDesc.GetConstraintInfo()
if err != nil {
return err
}

// Avoid creating duplicate check constraints.
if _, ok := inuseNames[ckName]; !ok {
ck, err := ckBuilder.Build(ckDef)
if err != nil {
return err
for _, info := range curConstraintInfos {
if info.CheckConstraint != nil && info.CheckConstraint.Expr == ckDesc.Expr {
return nil
}
ck.Validity = descpb.ConstraintValidity_Validating
tableDesc.AddCheckMutation(ck, descpb.DescriptorMutation_ADD)
}

ckDesc.Validity = descpb.ConstraintValidity_Validating
tableDesc.AddCheckMutation(ckDesc, descpb.DescriptorMutation_ADD)
return nil
}

Expand Down Expand Up @@ -225,7 +220,7 @@ func makeIndexDescriptor(
if tableDesc.IsLocalityRegionalByRow() {
return nil, hashShardedIndexesOnRegionalByRowError()
}
shardCol, newColumns, newColumn, err := setupShardedIndex(
shardCol, newColumns, err := setupShardedIndex(
params.ctx,
params.EvalContext(),
&params.p.semaCtx,
Expand All @@ -239,10 +234,10 @@ func makeIndexDescriptor(
return nil, err
}
columns = newColumns
if newColumn {
if err := params.p.setupConstraintForShard(params.ctx, tableDesc, shardCol, indexDesc.Sharded.ShardBuckets); err != nil {
return nil, err
}
if err := params.p.maybeSetupConstraintForShard(
params.ctx, tableDesc, shardCol, indexDesc.Sharded.ShardBuckets,
); err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -480,9 +475,10 @@ var hashShardedIndexesDisabledError = pgerror.Newf(pgcode.FeatureNotSupported,
"hash sharded indexes require the experimental_enable_hash_sharded_indexes session variable")

// setupShardedIndex creates a shard column for the given index descriptor. It
// returns the shard column, the new column list for the index, and a boolean
// which is true if the shard column was newly created. If the shard column is
// new, it is added to tableDesc.
// returns the shard column and the new column list for the index. If the shard
// column is new, either of the following happens:
// (1) the column is added to tableDesc if it's a new table;
// (2) a column mutation is added to tableDesc if the table is not new.
func setupShardedIndex(
ctx context.Context,
evalCtx *tree.EvalContext,
Expand All @@ -493,9 +489,9 @@ func setupShardedIndex(
tableDesc *tabledesc.Mutable,
indexDesc *descpb.IndexDescriptor,
isNewTable bool,
) (shard catalog.Column, newColumns tree.IndexElemList, newColumn bool, err error) {
) (shard catalog.Column, newColumns tree.IndexElemList, err error) {
if !shardedIndexEnabled {
return nil, nil, false, hashShardedIndexesDisabledError
return nil, nil, hashShardedIndexesDisabledError
}

colNames := make([]string, 0, len(columns))
Expand All @@ -504,12 +500,13 @@ func setupShardedIndex(
}
buckets, err := tabledesc.EvalShardBucketCount(ctx, semaCtx, evalCtx, bucketsExpr)
if err != nil {
return nil, nil, false, err
return nil, nil, err
}
shardCol, newColumn, err := maybeCreateAndAddShardCol(int(buckets), tableDesc,
shardCol, err := maybeCreateAndAddShardCol(int(buckets), tableDesc,
colNames, isNewTable)

if err != nil {
return nil, nil, false, err
return nil, nil, err
}
shardIdxElem := tree.IndexElem{
Column: tree.Name(shardCol.GetName()),
Expand All @@ -522,18 +519,18 @@ func setupShardedIndex(
ShardBuckets: buckets,
ColumnNames: colNames,
}
return shardCol, newColumns, newColumn, nil
return shardCol, newColumns, nil
}

// maybeCreateAndAddShardCol adds a new hidden computed shard column (or its mutation) to
// `desc`, if one doesn't already exist for the given index column set and number of shard
// buckets.
func maybeCreateAndAddShardCol(
shardBuckets int, desc *tabledesc.Mutable, colNames []string, isNewTable bool,
) (col catalog.Column, created bool, err error) {
) (col catalog.Column, err error) {
shardColDesc, err := makeShardColumnDesc(colNames, shardBuckets)
if err != nil {
return nil, false, err
return nil, err
}
existingShardCol, err := desc.FindColumnWithName(tree.Name(shardColDesc.Name))
if err == nil && !existingShardCol.Dropped() {
Expand All @@ -543,25 +540,24 @@ func maybeCreateAndAddShardCol(
if !existingShardCol.IsHidden() {
// The user managed to reverse-engineer our crazy shard column name, so
// we'll return an error here rather than try to be tricky.
return nil, false, pgerror.Newf(pgcode.DuplicateColumn,
return nil, pgerror.Newf(pgcode.DuplicateColumn,
"column %s already specified; can't be used for sharding", shardColDesc.Name)
}
return existingShardCol, false, nil
return existingShardCol, nil
}
columnIsUndefined := sqlerrors.IsUndefinedColumnError(err)
if err != nil && !columnIsUndefined {
return nil, false, err
return nil, err
}
if columnIsUndefined || existingShardCol.Dropped() {
if isNewTable {
desc.AddColumn(shardColDesc)
} else {
desc.AddColumnMutation(shardColDesc, descpb.DescriptorMutation_ADD)
}
created = true
}
shardCol, err := desc.FindColumnWithName(tree.Name(shardColDesc.Name))
return shardCol, created, err
return shardCol, err
}

func (n *createIndexNode) startExec(params runParams) error {
Expand Down
45 changes: 33 additions & 12 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1485,7 +1485,7 @@ func NewTableDesc(
if err != nil {
return nil, err
}
shardCol, _, err := maybeCreateAndAddShardCol(int(buckets), &desc,
shardCol, err := maybeCreateAndAddShardCol(int(buckets), &desc,
[]string{string(d.Name)}, true, /* isNewTable */
)
if err != nil {
Expand Down Expand Up @@ -1597,7 +1597,7 @@ func NewTableDesc(
if n.PartitionByTable.ContainsPartitions() {
return nil, pgerror.New(pgcode.FeatureNotSupported, "sharded indexes don't support partitioning")
}
shardCol, newColumns, newColumn, err := setupShardedIndex(
shardCol, newColumns, err := setupShardedIndex(
ctx,
evalCtx,
semaCtx,
Expand All @@ -1610,18 +1610,39 @@ func NewTableDesc(
if err != nil {
return nil, err
}
if newColumn {
buckets, err := tabledesc.EvalShardBucketCount(ctx, semaCtx, evalCtx, d.Sharded.ShardBuckets)
if err != nil {
return nil, err
}
checkConstraint, err := makeShardCheckConstraintDef(int(buckets), shardCol)
if err != nil {
return nil, err

buckets, err := tabledesc.EvalShardBucketCount(ctx, semaCtx, evalCtx, d.Sharded.ShardBuckets)
if err != nil {
return nil, err
}
checkConstraint, err := makeShardCheckConstraintDef(int(buckets), shardCol)
if err != nil {
return nil, err
}

// If there is an equivalent check constraint from the CREATE TABLE (should
// be rare since we hide the constraint of shard column), we don't create a
// duplicate one.
ckBuilder := schemaexpr.MakeCheckConstraintBuilder(ctx, n.Table, &desc, semaCtx)
checkConstraintDesc, err := ckBuilder.Build(checkConstraint)
if err != nil {
return nil, err
}
for _, def := range n.Defs {
if inputCheckConstraint, ok := def.(*tree.CheckConstraintTableDef); ok {
inputCheckConstraintDesc, err := ckBuilder.Build(inputCheckConstraint)
if err != nil {
return nil, err
}
if checkConstraintDesc.Expr == inputCheckConstraintDesc.Expr {
return newColumns, nil
}
}
n.Defs = append(n.Defs, checkConstraint)
cdd = append(cdd, nil)
}

n.Defs = append(n.Defs, checkConstraint)
cdd = append(cdd, nil)

return newColumns, nil
}

Expand Down
21 changes: 6 additions & 15 deletions pkg/sql/logictest/testdata/logic_test/alter_primary_key
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ t CREATE TABLE public.t (
UNIQUE INDEX i5 (w ASC) STORING (y),
INVERTED INDEX i6 (v),
INDEX i7 (z ASC) USING HASH WITH BUCKET_COUNT = 4,
FAMILY fam_0_x_y_z_w_v (x, y, z, w, v),
CONSTRAINT check_crdb_internal_z_shard_4 CHECK (crdb_internal_z_shard_4 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8))
FAMILY fam_0_x_y_z_w_v (x, y, z, w, v)
)

# Test that the indexes we expect got rewritten. All but i3 should have been rewritten,
Expand Down Expand Up @@ -368,9 +367,7 @@ t CREATE TABLE public.t (
CONSTRAINT t_pkey PRIMARY KEY (y ASC) USING HASH WITH BUCKET_COUNT = 10,
UNIQUE INDEX t_x_key (x ASC),
INDEX i1 (z ASC) USING HASH WITH BUCKET_COUNT = 5,
FAMILY fam_0_x_y_z (x, y, z),
CONSTRAINT check_crdb_internal_z_shard_5 CHECK (crdb_internal_z_shard_5 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8, 4:::INT8)),
CONSTRAINT check_crdb_internal_y_shard_10 CHECK (crdb_internal_y_shard_10 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8, 4:::INT8, 5:::INT8, 6:::INT8, 7:::INT8, 8:::INT8, 9:::INT8))
FAMILY fam_0_x_y_z (x, y, z)
)

query T
Expand Down Expand Up @@ -429,8 +426,7 @@ t CREATE TABLE public.t (
CONSTRAINT t_pkey PRIMARY KEY (y ASC),
UNIQUE INDEX t_x_key (x ASC) USING HASH WITH BUCKET_COUNT = 5,
INDEX i (z ASC),
FAMILY fam_0_x_y_z (x, y, z),
CONSTRAINT check_crdb_internal_x_shard_5 CHECK (crdb_internal_x_shard_5 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8, 4:::INT8))
FAMILY fam_0_x_y_z (x, y, z)
)

query III
Expand Down Expand Up @@ -556,8 +552,7 @@ t CREATE TABLE public.t (
rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
crdb_internal_x_shard_4 INT4 NOT VISIBLE NOT NULL AS (mod(fnv32(crdb_internal.datums_to_bytes(x)), 4:::INT8)) VIRTUAL,
CONSTRAINT t_pkey PRIMARY KEY (x ASC) USING HASH WITH BUCKET_COUNT = 4,
FAMILY "primary" (x, rowid),
CONSTRAINT check_crdb_internal_x_shard_4 CHECK (crdb_internal_x_shard_4 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8))
FAMILY "primary" (x, rowid)
)

statement ok
Expand Down Expand Up @@ -950,9 +945,7 @@ t CREATE TABLE public.t (
x INT8 NOT NULL,
crdb_internal_x_shard_3 INT4 NOT VISIBLE NOT NULL AS (mod(fnv32(crdb_internal.datums_to_bytes(x)), 3:::INT8)) VIRTUAL,
CONSTRAINT t_pkey PRIMARY KEY (x ASC) USING HASH WITH BUCKET_COUNT = 3,
FAMILY "primary" (x),
CONSTRAINT check_crdb_internal_x_shard_2 CHECK (crdb_internal_x_shard_2 IN (0:::INT8, 1:::INT8)),
CONSTRAINT check_crdb_internal_x_shard_3 CHECK (crdb_internal_x_shard_3 IN (0:::INT8, 1:::INT8, 2:::INT8))
FAMILY "primary" (x)
)

# Changes on a hash sharded index that change the columns will cause the old
Expand All @@ -972,9 +965,7 @@ t CREATE TABLE public.t (
crdb_internal_y_shard_2 INT4 NOT VISIBLE NOT NULL AS (mod(fnv32(crdb_internal.datums_to_bytes(y)), 2:::INT8)) VIRTUAL,
CONSTRAINT t_pkey PRIMARY KEY (y ASC) USING HASH WITH BUCKET_COUNT = 2,
UNIQUE INDEX t_x_key (x ASC) USING HASH WITH BUCKET_COUNT = 2,
FAMILY fam_0_x_y (x, y),
CONSTRAINT check_crdb_internal_x_shard_2 CHECK (crdb_internal_x_shard_2 IN (0:::INT8, 1:::INT8)),
CONSTRAINT check_crdb_internal_y_shard_2 CHECK (crdb_internal_y_shard_2 IN (0:::INT8, 1:::INT8))
FAMILY fam_0_x_y (x, y)
)

# Regression for #49079.
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/logictest/testdata/logic_test/create_table
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ like_hash CREATE TABLE public.like_hash (
rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
CONSTRAINT like_hash_base_pkey PRIMARY KEY (rowid ASC),
INDEX like_hash_base_a_idx (a ASC) USING HASH WITH BUCKET_COUNT = 4,
FAMILY "primary" (a, rowid),
CONSTRAINT check_crdb_internal_a_shard_4 CHECK (crdb_internal_a_shard_4 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8))
FAMILY "primary" (a, rowid)
)

statement ok
Expand Down
Loading

0 comments on commit 0bfea0f

Please sign in to comment.