Skip to content

Commit

Permalink
workload/schemachange: enable drop column support
Browse files Browse the repository at this point in the history
Previously, drop column support was disabled because we did not properly
detect if a column was referenced by foreign keys. We also did not
properly detect if removing a column would remove indexes needed to
enforce other foreign keys. This patch adds logic if a column or any
indexes referencing this column are used by foreign keys. Additionally,
other operations are tweaked to allow tables without columns (i.e. all
columns dropped only leaving rowid).

Fixes: cockroachdb#127286
  • Loading branch information
fqazi committed Aug 8, 2024
1 parent 78fa126 commit 13c94dc
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 12 deletions.
153 changes: 150 additions & 3 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,150 @@ func (og *operationGenerator) tableHasDependencies(
`, tableName.Object(), tableName.Schema())
}

// columnRemovalWillDropFKBackingIndexes determines if dropping this column
// will lead to no indexes backing a foreign key.
func (og *operationGenerator) columnRemovalWillDropFKBackingIndexes(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columName string,
) (bool, error) {
return og.scanBool(ctx, tx, fmt.Sprintf(`
WITH
fk
AS (
SELECT
oid,
(
SELECT
r.relname
FROM
pg_class AS r
WHERE
r.oid = c.confrelid
)
AS base_table,
a.attname AS base_col,
array_position(c.confkey, a.attnum)
AS base_ordinal,
(
SELECT
r.relname
FROM
pg_class AS r
WHERE
r.oid = c.conrelid
)
AS referencing_table,
unnest(
(
SELECT
array_agg(attname)
FROM
pg_attribute
WHERE
attrelid = c.conrelid
AND ARRAY[attnum] <@ c.conkey
AND array_position(
c.confkey,
a.attnum
)
= array_position(
c.conkey,
attnum
)
)
)
AS referencing_col
FROM
pg_constraint AS c
JOIN pg_attribute AS a ON
c.confrelid = a.attrelid
AND ARRAY[attnum] <@ c.confkey
WHERE
c.confrelid = $1::REGCLASS::OID
),
valid_indexes
AS (
SELECT
*
FROM
[SHOW INDEXES FROM %s]
WHERE
index_name
NOT IN (
SELECT
DISTINCT index_name
FROM
[SHOW INDEXES FROM %s]
WHERE
column_name = $2
AND index_name
!= table_name || '_pkey'
)
),
matching_indexes
AS (
SELECT
oid,
index_name,
count(base_ordinal) AS count_base_ordinal,
count(seq_in_index) AS count_seq_in_index
FROM
fk, valid_indexes
WHERE
storing = 'f'
AND non_unique = 'f'
AND base_col = column_name
GROUP BY
(oid, index_name)
),
valid_index_attrib_count
AS (
SELECT
index_name,
max(seq_in_index) AS max_seq_in_index
FROM
valid_indexes
WHERE
storing = 'f'
GROUP BY
index_name
),
valid_fk_count
AS (
SELECT
oid, max(base_ordinal) AS max_base_ordinal
FROM
fk
GROUP BY
fk
),
matching_fks
AS (
SELECT
DISTINCT f.oid
FROM
valid_index_attrib_count AS i,
valid_fk_count AS f,
matching_indexes AS m
WHERE
f.oid = m.oid
AND i.index_name = m.index_name
AND i.max_seq_in_index
= m.count_seq_in_index
AND f.max_base_ordinal
= m.count_base_ordinal
)
SELECT
EXISTS(
SELECT
*
FROM
fk
WHERE
oid NOT IN (SELECT oid FROM matching_fks)
);
`, tableName.String(), tableName.String()), tableName.String(), columName)
}

func (og *operationGenerator) columnIsDependedOn(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
Expand All @@ -142,6 +286,9 @@ func (og *operationGenerator) columnIsDependedOn(
// stored as a list of numbers in a string, so SQL functions are used to parse these values
// into arrays. unnest is used to flatten rows with this column of array type into multiple rows,
// so performing unions and joins is easier.
//
// To check if any foreign key references exist to this table, we use pg_constraint
// and check if any columns are dependent.
return og.scanBool(ctx, tx, `SELECT EXISTS(
SELECT source.column_id
FROM (
Expand Down Expand Up @@ -178,7 +325,8 @@ func (og *operationGenerator) columnIsDependedOn(
AND table_name = $3
AND column_name = $4
) AS source ON source.column_id = cons.column_id
)`, tableName.String(), tableName.Schema(), tableName.Object(), columnName)
)
`, tableName.String(), tableName.Schema(), tableName.Object(), columnName)
}

// colIsRefByComputed determines if a column is referenced by a computed column.
Expand Down Expand Up @@ -1297,7 +1445,7 @@ func (og *operationGenerator) violatesFkConstraintsHelper(
return og.scanBool(ctx, tx, q)
}

func (og *operationGenerator) columnIsInDroppingIndex(
func (og *operationGenerator) columnIsInAddingOrDroppingIndex(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return og.scanBool(ctx, tx, `
Expand All @@ -1312,7 +1460,6 @@ SELECT EXISTS(
= indexes.index_id
AND table_id = $1::REGCLASS
AND type = 'INDEX'
AND direction = 'DROP'
);
`, tableName.String(), columnName)
}
Expand Down
30 changes: 22 additions & 8 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,14 +1363,24 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op
if err != nil {
return nil, err
}
// Table has no columns, so use an internal one here.
usingInternalColumn := false
if len(columnNamesForTable) == 0 {
columnNamesForTable = []string{"crdb_internal_mvcc_timestamp"}
usingInternalColumn = true
}
columnNamesForTable = columnNamesForTable[:1+og.randIntn(len(columnNamesForTable))]

for j := range columnNamesForTable {
colItem := tree.ColumnItem{
TableName: tableName.(*tree.TableName).ToUnresolvedObjectName(),
ColumnName: tree.Name(columnNamesForTable[j]),
}
selectStatement.Exprs = append(selectStatement.Exprs, tree.SelectExpr{Expr: &colItem})
selectExpr := tree.SelectExpr{Expr: &colItem}
if usingInternalColumn {
selectExpr.As = tree.UnrestrictedName(fmt.Sprintf("%s_%d", colItem.TableName.Object(), j))
}
selectStatement.Exprs = append(selectStatement.Exprs, selectExpr)

if _, exists := uniqueColumnNames[columnNamesForTable[j]]; exists {
duplicateColumns = true
Expand Down Expand Up @@ -1492,6 +1502,10 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (*opStm
if err != nil {
return nil, err
}
// Table has no columns, so use an internal one here.
if len(columnNamesForTable) == 0 {
columnNamesForTable = []string{"crdb_internal_mvcc_timestamp"}
}
columnNamesForTable = columnNamesForTable[:1+og.randIntn(len(columnNamesForTable))]

for j := range columnNamesForTable {
Expand Down Expand Up @@ -1579,7 +1593,11 @@ func (og *operationGenerator) dropColumn(ctx context.Context, tx pgx.Tx) (*opStm
if err != nil {
return nil, err
}
columnIsInDroppingIndex, err := og.columnIsInDroppingIndex(ctx, tx, tableName, columnName)
columnRemovalWillDropFKBackingIndexes, err := og.columnRemovalWillDropFKBackingIndexes(ctx, tx, tableName, columnName)
if err != nil {
return nil, err
}
columnIsInAddingOrDroppingIndex, err := og.columnIsInAddingOrDroppingIndex(ctx, tx, tableName, columnName)
if err != nil {
return nil, err
}
Expand All @@ -1594,10 +1612,10 @@ func (og *operationGenerator) dropColumn(ctx context.Context, tx pgx.Tx) (*opStm

stmt := makeOpStmt(OpStmtDDL)
stmt.expectedExecErrors.addAll(codesWithConditions{
{code: pgcode.ObjectNotInPrerequisiteState, condition: columnIsInDroppingIndex},
{code: pgcode.ObjectNotInPrerequisiteState, condition: columnIsInAddingOrDroppingIndex},
{code: pgcode.UndefinedColumn, condition: !columnExists},
{code: pgcode.InvalidColumnReference, condition: colIsPrimaryKey || colIsRefByComputed},
{code: pgcode.DependentObjectsStillExist, condition: columnIsDependedOn},
{code: pgcode.DependentObjectsStillExist, condition: columnIsDependedOn || columnRemovalWillDropFKBackingIndexes},
{code: pgcode.FeatureNotSupported, condition: hasAlterPKSchemaChange},
})
// TODO(#126967): We need to add a check for the column being in an expression
Expand Down Expand Up @@ -3753,10 +3771,6 @@ FROM [SHOW COLUMNS FROM %s];
og.params.rng.Shuffle(len(columnNames), func(i, j int) {
columnNames[i], columnNames[j] = columnNames[j], columnNames[i]
})

if len(columnNames) <= 0 {
return nil, errors.Errorf("table %s has no columns", tableName)
}
return columnNames, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/workload/schemachange/optype.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ var opWeights = []int{
alterTableAddConstraintUnique: 0,
alterTableAlterColumnType: 0, // Disabled and tracked with #66662.
alterTableAlterPrimaryKey: 1,
alterTableDropColumn: 0, // Disabled and tracked with #127286.
alterTableDropColumn: 1,
alterTableDropColumnDefault: 1,
alterTableDropConstraint: 0, // Disabled and tracked with #127273.
alterTableDropNotNull: 1,
Expand Down

0 comments on commit 13c94dc

Please sign in to comment.