Skip to content

Commit

Permalink
Merge #56381
Browse files Browse the repository at this point in the history
56381: tabledesc: add validation for partial index predicates, check constraint expressions, and computed column expressions r=ajwerner a=mgartner

#### tabledesc: validate partial index predicate expressions

This commit adds validation to ensure that partial index predicate
expressions do not reference nonexistent columns.

I tested this validation ad-hoc by commenting-out code that renames
columns in partial index predicate expressions during an
`ALTER TABLE ... RENAME COLUMN` statement, and verifying that the
correct error was returned.

Fixes #51083

Release note: None

#### tabledesc: validate check constraint expressions

This commit adds validation for check constraints defined on a table
descriptor. The validation includes verifying that check constraints
exist in the table and that check constraint expressions do not
reference non-existent columns.

Informs #50854

Release note: None

#### tabledesc: move column validation into helper function

Release note: None

#### tabledesc: validate computed column expressions

This commit adds validation to ensure that computed column expressions
do not reference nonexistent columns.

I tested this validation ad-hoc by commenting-out code that renames
columns in computed column expressions during an
`ALTER TABLE ... RENAME COLUMN` statement, and verifying that the
correct error was returned.

Informs #50854

Release note: None


Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
craig[bot] and mgartner committed Nov 9, 2020
2 parents 9518e1c + d0ddc7a commit 0446d7c
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/sql/catalog/tabledesc/safe_format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func TestSafeMessage(t *testing.T) {
Name: "check_not_null",
Check: descpb.TableDescriptor_CheckConstraint{
Name: "check_not_null",
Expr: "j IS NOT NULL",
Validity: descpb.ConstraintValidity_Unvalidated,
ColumnIDs: []descpb.ColumnID{2},
IsNonNullConstraint: true,
Expand Down
147 changes: 111 additions & 36 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -1708,40 +1709,8 @@ func (desc *Immutable) ValidateTable(ctx context.Context) error {

columnNames := make(map[string]descpb.ColumnID, len(desc.Columns))
columnIDs := make(map[descpb.ColumnID]string, len(desc.Columns))
for _, column := range desc.AllNonDropColumns() {
if err := catalog.ValidateName(column.Name, "column"); err != nil {
return err
}
if column.ID == 0 {
return errors.AssertionFailedf("invalid column ID %d", errors.Safe(column.ID))
}

if _, columnNameExists := columnNames[column.Name]; columnNameExists {
for i := range desc.Columns {
if desc.Columns[i].Name == column.Name {
return pgerror.Newf(pgcode.DuplicateColumn,
"duplicate column name: %q", column.Name)
}
}
return pgerror.Newf(pgcode.DuplicateColumn,
"duplicate: column %q in the middle of being added, not yet public", column.Name)
}
if colinfo.IsSystemColumnName(column.Name) {
return pgerror.Newf(pgcode.DuplicateColumn,
"column name %q conflicts with a system column name", column.Name)
}
columnNames[column.Name] = column.ID

if other, ok := columnIDs[column.ID]; ok {
return fmt.Errorf("column %q duplicate ID of column %q: %d",
column.Name, other, column.ID)
}
columnIDs[column.ID] = column.Name

if column.ID >= desc.NextColumnID {
return errors.AssertionFailedf("column %q invalid ID (%d) >= next column ID (%d)",
column.Name, errors.Safe(column.ID), errors.Safe(desc.NextColumnID))
}
if err := desc.validateColumns(columnNames, columnIDs); err != nil {
return err
}

for _, m := range desc.Mutations {
Expand Down Expand Up @@ -1792,16 +1761,21 @@ func (desc *Immutable) ValidateTable(ctx context.Context) error {

// TODO(dt): Validate each column only appears at-most-once in any FKs.

// Only validate column families and indexes if this is actually a table, not
// if it's just a view.
// Only validate column families, check constraints, and indexes if this is
// actually a table, not if it's just a view.
if desc.IsPhysicalTable() {
if err := desc.validateColumnFamilies(columnIDs); err != nil {
return err
}

if err := desc.validateCheckConstraints(columnIDs); err != nil {
return err
}

if err := desc.validateTableIndexes(columnNames); err != nil {
return err
}

if err := desc.validatePartitioning(); err != nil {
return err
}
Expand Down Expand Up @@ -1865,6 +1839,63 @@ func (desc *Immutable) ValidateTable(ctx context.Context) error {
return desc.Privileges.Validate(desc.GetID(), privilege.Table)
}

func (desc *Immutable) validateColumns(
columnNames map[string]descpb.ColumnID, columnIDs map[descpb.ColumnID]string,
) error {
for _, column := range desc.AllNonDropColumns() {
if err := catalog.ValidateName(column.Name, "column"); err != nil {
return err
}
if column.ID == 0 {
return errors.AssertionFailedf("invalid column ID %d", errors.Safe(column.ID))
}

if _, columnNameExists := columnNames[column.Name]; columnNameExists {
for i := range desc.Columns {
if desc.Columns[i].Name == column.Name {
return pgerror.Newf(pgcode.DuplicateColumn,
"duplicate column name: %q", column.Name)
}
}
return pgerror.Newf(pgcode.DuplicateColumn,
"duplicate: column %q in the middle of being added, not yet public", column.Name)
}
if colinfo.IsSystemColumnName(column.Name) {
return pgerror.Newf(pgcode.DuplicateColumn,
"column name %q conflicts with a system column name", column.Name)
}
columnNames[column.Name] = column.ID

if other, ok := columnIDs[column.ID]; ok {
return fmt.Errorf("column %q duplicate ID of column %q: %d",
column.Name, other, column.ID)
}
columnIDs[column.ID] = column.Name

if column.ID >= desc.NextColumnID {
return errors.AssertionFailedf("column %q invalid ID (%d) >= next column ID (%d)",
column.Name, errors.Safe(column.ID), errors.Safe(desc.NextColumnID))
}

if column.IsComputed() {
// Verify that the computed column expression is valid.
expr, err := parser.ParseExpr(*column.ComputeExpr)
if err != nil {
return err
}
valid, err := schemaexpr.HasValidColumnReferences(desc, expr)
if err != nil {
return err
}
if !valid {
return fmt.Errorf("computed column %q refers to unknown columns in expression: %s",
column.Name, *column.ComputeExpr)
}
}
}
return nil
}

func (desc *Immutable) validateColumnFamilies(columnIDs map[descpb.ColumnID]string) error {
if len(desc.Families) < 1 {
return fmt.Errorf("at least 1 column family must be specified")
Expand Down Expand Up @@ -1938,6 +1969,36 @@ func (desc *Immutable) validateColumnFamilies(columnIDs map[descpb.ColumnID]stri
return nil
}

// validateCheckConstraints validates that check constraints are well formed.
// Checks include validating the column IDs and verifying that check expressions
// do not reference non-existent columns.
func (desc *Immutable) validateCheckConstraints(columnIDs map[descpb.ColumnID]string) error {
for _, chk := range desc.AllActiveAndInactiveChecks() {
// Verify that the check's column IDs are valid.
for _, colID := range chk.ColumnIDs {
_, ok := columnIDs[colID]
if !ok {
return fmt.Errorf("check constraint %q contains unknown column \"%d\"", chk.Name, colID)
}
}

// Verify that the check's expression is valid.
expr, err := parser.ParseExpr(chk.Expr)
if err != nil {
return err
}
valid, err := schemaexpr.HasValidColumnReferences(desc, expr)
if err != nil {
return err
}
if !valid {
return fmt.Errorf("check constraint %q refers to unknown columns in expression: %s",
chk.Name, chk.Expr)
}
}
return nil
}

// validateTableIndexes validates that indexes are well formed. Checks include
// validating the columns involved in the index, verifying the index names and
// IDs are unique, and the family of the primary key is 0. This does not check
Expand Down Expand Up @@ -2019,6 +2080,20 @@ func (desc *Immutable) validateTableIndexes(columnNames map[string]descpb.Column
index.Name, index.Sharded.Name)
}
}
if index.IsPartial() {
expr, err := parser.ParseExpr(index.Predicate)
if err != nil {
return err
}
valid, err := schemaexpr.HasValidColumnReferences(desc, expr)
if err != nil {
return err
}
if !valid {
return fmt.Errorf("partial index %q refers to unknown columns in predicate: %s",
index.Name, index.Predicate)
}
}
}

return nil
Expand Down
12 changes: 3 additions & 9 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ var validationMap = []struct {
"FormatVersion": {status: thisFieldReferencesNoObjects},
"State": {status: thisFieldReferencesNoObjects},
"OfflineReason": {status: thisFieldReferencesNoObjects},
"Checks": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"Checks": {status: iSolemnlySwearThisFieldIsValidated},
"ViewQuery": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
Expand Down Expand Up @@ -148,9 +146,7 @@ var validationMap = []struct {
"Sharded": {status: iSolemnlySwearThisFieldIsValidated},
"Disabled": {status: thisFieldReferencesNoObjects},
"GeoConfig": {status: thisFieldReferencesNoObjects},
"Predicate": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(mgartner): add validation"},
"Predicate": {status: iSolemnlySwearThisFieldIsValidated},
},
},
{
Expand All @@ -170,9 +166,7 @@ var validationMap = []struct {
"OwnsSequenceIds": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"ComputeExpr": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"ComputeExpr": {status: iSolemnlySwearThisFieldIsValidated},
"PGAttributeNum": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
Expand Down
41 changes: 41 additions & 0 deletions pkg/sql/schemaexpr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,47 @@ func ExtractColumnIDs(desc catalog.TableDescriptor, rootExpr tree.Expr) (TableCo
return colIDs, err
}

type returnFalse struct{}

func (returnFalse) Error() string { panic("unimplemented") }

var returnFalsePseudoError error = returnFalse{}

// HasValidColumnReferences returns true if all columns referenced in rootExpr
// exist in desc.
func HasValidColumnReferences(desc catalog.TableDescriptor, rootExpr tree.Expr) (bool, error) {
_, err := tree.SimpleVisit(rootExpr, func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) {
vBase, ok := expr.(tree.VarName)
if !ok {
return true, expr, nil
}

v, err := vBase.NormalizeVarName()
if err != nil {
return false, nil, err
}

c, ok := v.(*tree.ColumnItem)
if !ok {
return true, expr, nil
}

_, _, err = desc.FindColumnByName(c.ColumnName)
if err != nil {
return false, expr, returnFalsePseudoError
}

return false, expr, nil
})
if errors.Is(err, returnFalsePseudoError) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}

// FormatExprForDisplay formats a schema expression string for display. It
// accepts formatting flags to control things like showing type annotations or
// type casts.
Expand Down
48 changes: 48 additions & 0 deletions pkg/sql/schemaexpr/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,51 @@ func TestExtractColumnIDs(t *testing.T) {
})
}
}

func TestValidColumnReferences(t *testing.T) {
// Trick to get the init() for the builtins package to run.
_ = builtins.AllBuiltinNames

table := tree.Name("foo")
desc := testTableDesc(
string(table),
[]testCol{{"a", types.Bool}, {"b", types.Int}},
[]testCol{{"c", types.String}},
)

testData := []struct {
expr string
expected bool
}{
{"true", true},
{"now()", true},
{"a", true},
{"a AND b > 1", true},
{"a AND c = 'foo'", true},
{"a OR (b > 1 AND c = 'foo')", true},
{"a AND abs(b) > 5 AND lower(c) = 'foo'", true},
{"x", false},
{"a AND x > 1", false},
{"a AND x = 'foo'", false},
{"a OR (b > 1 AND x = 'foo')", false},
{"a AND abs(x) > 5 AND lower(y) = 'foo'", false},
}

for _, d := range testData {
t.Run(d.expr, func(t *testing.T) {
expr, err := parser.ParseExpr(d.expr)
if err != nil {
t.Fatalf("%s: unexpected error: %s", d.expr, err)
}

res, err := schemaexpr.HasValidColumnReferences(desc, expr)
if err != nil {
t.Fatalf("%s: unexpected error: %s", d.expr, err)
}

if res != d.expected {
t.Errorf("%s: expected %t, got %t", d.expr, d.expected, res)
}
})
}
}

0 comments on commit 0446d7c

Please sign in to comment.