Skip to content

Commit

Permalink
workload/schemachange: enable drop column support
Browse files Browse the repository at this point in the history
Previously, drop column support was disabled because we did not properly
detect if a column was referenced by foreign keys. We also did not
properly detect if removing a column would remove indexes needed to
enforce other foreign keys. This patch adds logic if a column or any
indexes referencing this column are used by foreign keys. Additionally,
other operations are tweaked to allow tables without columns (i.e. all
columns dropped only leaving rowid).

Fixes: cockroachdb#127286

Release note: None

<pkg>: <short description - lowercase, no final period>

<what was there before: Previously, ...>
<why it needed to change: This was inadequate because ...>
<what you did about it: To address this, this patch ...>
  • Loading branch information
fqazi committed Aug 6, 2024
1 parent 78fa126 commit 42c707c
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 11 deletions.
197 changes: 194 additions & 3 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,150 @@ func (og *operationGenerator) tableHasDependencies(
`, tableName.Object(), tableName.Schema())
}

// columnRemovalWillDropFKBackingIndexes determines if dropping this column
// will lead to no indexes backing a foreign key.
func (og *operationGenerator) columnRemovalWillDropFKBackingIndexes(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columName string,
) (bool, error) {
return og.scanBool(ctx, tx, fmt.Sprintf(`
WITH
fk
AS (
SELECT
oid,
(
SELECT
r.relname
FROM
pg_class AS r
WHERE
r.oid = c.confrelid
)
AS base_table,
a.attname AS base_col,
array_position(c.confkey, a.attnum)
AS base_ordinal,
(
SELECT
r.relname
FROM
pg_class AS r
WHERE
r.oid = c.conrelid
)
AS referencing_table,
unnest(
(
SELECT
array_agg(attname)
FROM
pg_attribute
WHERE
attrelid = c.conrelid
AND ARRAY[attnum] <@ c.conkey
AND array_position(
c.confkey,
a.attnum
)
= array_position(
c.conkey,
attnum
)
)
)
AS referencing_col
FROM
pg_constraint AS c
JOIN pg_attribute AS a ON
c.confrelid = a.attrelid
AND ARRAY[attnum] <@ c.confkey
WHERE
c.confrelid = $1::REGCLASS::OID
),
valid_indexes
AS (
SELECT
*
FROM
[SHOW INDEXES FROM %s]
WHERE
index_name
NOT IN (
SELECT
DISTINCT index_name
FROM
[SHOW INDEXES FROM %s]
WHERE
column_name = $2
AND index_name
!= table_name || '_pkey'
)
),
matching_indexes
AS (
SELECT
oid,
index_name,
count(base_ordinal) AS count_base_ordinal,
count(seq_in_index) AS count_seq_in_index
FROM
fk, valid_indexes
WHERE
storing = 'f'
AND non_unique = 'f'
AND base_col = column_name
GROUP BY
(oid, index_name)
),
valid_index_attrib_count
AS (
SELECT
index_name,
max(seq_in_index) AS max_seq_in_index
FROM
valid_indexes
WHERE
storing = 'f'
GROUP BY
index_name
),
valid_fk_count
AS (
SELECT
oid, max(base_ordinal) AS max_base_ordinal
FROM
fk
GROUP BY
fk
),
matching_fks
AS (
SELECT
DISTINCT f.oid
FROM
valid_index_attrib_count AS i,
valid_fk_count AS f,
matching_indexes AS m
WHERE
f.oid = m.oid
AND i.index_name = m.index_name
AND i.max_seq_in_index
= m.count_seq_in_index
AND f.max_base_ordinal
= m.count_base_ordinal
)
SELECT
EXISTS(
SELECT
*
FROM
fk
WHERE
oid NOT IN (SELECT oid FROM matching_fks)
);
`, tableName.String(), tableName.String()), tableName.String(), columName)
}

func (og *operationGenerator) columnIsDependedOn(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
Expand All @@ -142,6 +286,9 @@ func (og *operationGenerator) columnIsDependedOn(
// stored as a list of numbers in a string, so SQL functions are used to parse these values
// into arrays. unnest is used to flatten rows with this column of array type into multiple rows,
// so performing unions and joins is easier.
//
// To check if any foreign key references exist to this table, we use pg_constraint
// and check if any columns are dependent.
return og.scanBool(ctx, tx, `SELECT EXISTS(
SELECT source.column_id
FROM (
Expand Down Expand Up @@ -178,7 +325,52 @@ func (og *operationGenerator) columnIsDependedOn(
AND table_name = $3
AND column_name = $4
) AS source ON source.column_id = cons.column_id
)`, tableName.String(), tableName.Schema(), tableName.Object(), columnName)
)
OR EXISTS(
-- Check for foreign key references
SELECT * FROM (
SELECT
(
SELECT
r.relname
FROM
pg_class AS r
WHERE
r.oid = c.confrelid
)
AS base_table,
a.attname AS base_col,
(
SELECT
r.relname
FROM
pg_class AS r
WHERE
r.oid = c.conrelid
)
AS referencing_table,
unnest(
(
SELECT
array_agg(attname)
FROM
pg_attribute
WHERE
attrelid = c.conrelid
AND ARRAY[attnum] <@ c.conkey
)
)
AS referencing_col
FROM
pg_constraint AS c
JOIN pg_attribute AS a ON
c.confrelid = a.attrelid
AND a.attnum = ANY (confkey)
WHERE
c.conrelid = $1::REGCLASS::OID
) WHERE base_col=$4
)
`, tableName.String(), tableName.Schema(), tableName.Object(), columnName)
}

// colIsRefByComputed determines if a column is referenced by a computed column.
Expand Down Expand Up @@ -1297,7 +1489,7 @@ func (og *operationGenerator) violatesFkConstraintsHelper(
return og.scanBool(ctx, tx, q)
}

func (og *operationGenerator) columnIsInDroppingIndex(
func (og *operationGenerator) columnIsInAddingOrDroppingIndex(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return og.scanBool(ctx, tx, `
Expand All @@ -1312,7 +1504,6 @@ SELECT EXISTS(
= indexes.index_id
AND table_id = $1::REGCLASS
AND type = 'INDEX'
AND direction = 'DROP'
);
`, tableName.String(), columnName)
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,10 +1359,14 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op
// If the table does not exist, columns cannot be fetched from it. For this reason, the placeholder
// "IrrelevantColumnName" is used, and a pgcode.UndefinedTable error is expected on execution.
if tableExists {
columnNamesForTable, err := og.tableColumnsShuffled(ctx, tx, tableName.(*tree.TableName).String())
columnNamesForTable, err := og.tableColumnsShuffled(ctx, tx, tableName.(*tree.TableName).String(), true /*allowEmpty*/)
if err != nil {
return nil, err
}
// Table has no columns, so use an internal one here.
if len(columnNamesForTable) == 0 {
columnNamesForTable = []string{"crdb_internal_mvcc_timestamp"}
}
columnNamesForTable = columnNamesForTable[:1+og.randIntn(len(columnNamesForTable))]

for j := range columnNamesForTable {
Expand Down Expand Up @@ -1488,10 +1492,14 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (*opStm
// If the table does not exist, columns cannot be fetched from it. For this reason, the placeholder
// "IrrelevantColumnName" is used, and a pgcode.UndefinedTable error is expected on execution.
if tableExists {
columnNamesForTable, err := og.tableColumnsShuffled(ctx, tx, tableName.(*tree.TableName).String())
columnNamesForTable, err := og.tableColumnsShuffled(ctx, tx, tableName.(*tree.TableName).String(), true /*allowEmpty*/)
if err != nil {
return nil, err
}
// Table has no columns, so use an internal one here.
if len(columnNamesForTable) == 0 {
columnNamesForTable = []string{"crdb_internal_mvcc_timestamp"}
}
columnNamesForTable = columnNamesForTable[:1+og.randIntn(len(columnNamesForTable))]

for j := range columnNamesForTable {
Expand Down Expand Up @@ -1579,7 +1587,11 @@ func (og *operationGenerator) dropColumn(ctx context.Context, tx pgx.Tx) (*opStm
if err != nil {
return nil, err
}
columnIsInDroppingIndex, err := og.columnIsInDroppingIndex(ctx, tx, tableName, columnName)
columnRemovalWillDropFKBackingIndexes, err := og.columnRemovalWillDropFKBackingIndexes(ctx, tx, tableName, columnName)
if err != nil {
return nil, err
}
columnIsInDroppingIndex, err := og.columnIsInAddingOrDroppingIndex(ctx, tx, tableName, columnName)
if err != nil {
return nil, err
}
Expand All @@ -1597,7 +1609,7 @@ func (og *operationGenerator) dropColumn(ctx context.Context, tx pgx.Tx) (*opStm
{code: pgcode.ObjectNotInPrerequisiteState, condition: columnIsInDroppingIndex},
{code: pgcode.UndefinedColumn, condition: !columnExists},
{code: pgcode.InvalidColumnReference, condition: colIsPrimaryKey || colIsRefByComputed},
{code: pgcode.DependentObjectsStillExist, condition: columnIsDependedOn},
{code: pgcode.DependentObjectsStillExist, condition: columnIsDependedOn || columnRemovalWillDropFKBackingIndexes},
{code: pgcode.FeatureNotSupported, condition: hasAlterPKSchemaChange},
})
// TODO(#126967): We need to add a check for the column being in an expression
Expand Down Expand Up @@ -3723,7 +3735,7 @@ ORDER BY random()
}

func (og *operationGenerator) tableColumnsShuffled(
ctx context.Context, tx pgx.Tx, tableName string,
ctx context.Context, tx pgx.Tx, tableName string, allowEmpty bool,
) ([]string, error) {
q := fmt.Sprintf(`
SELECT column_name
Expand Down Expand Up @@ -3753,8 +3765,8 @@ FROM [SHOW COLUMNS FROM %s];
og.params.rng.Shuffle(len(columnNames), func(i, j int) {
columnNames[i], columnNames[j] = columnNames[j], columnNames[i]
})

if len(columnNames) <= 0 {
// Return an error if empty column names are not allowed.
if len(columnNames) == 0 && !allowEmpty {
return nil, errors.Errorf("table %s has no columns", tableName)
}
return columnNames, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/workload/schemachange/optype.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ var opWeights = []int{
alterTableAddConstraintUnique: 0,
alterTableAlterColumnType: 0, // Disabled and tracked with #66662.
alterTableAlterPrimaryKey: 1,
alterTableDropColumn: 0, // Disabled and tracked with #127286.
alterTableDropColumn: 1,
alterTableDropColumnDefault: 1,
alterTableDropConstraint: 0, // Disabled and tracked with #127273.
alterTableDropNotNull: 1,
Expand Down

0 comments on commit 42c707c

Please sign in to comment.