Skip to content

Commit

Permalink
tabledesc: improve view validation
Browse files Browse the repository at this point in the history
Previously, we did not validate the DependsOn and DependedOnBy
cross-references. This commit adds this validation and also fixes a bug
in which privileges were not validated for non-physical tables.

Fixes cockroachdb#63147.

Release note: None
  • Loading branch information
Marius Posta committed Sep 24, 2021
1 parent 6994559 commit 1e113f4
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 32 deletions.
7 changes: 6 additions & 1 deletion pkg/sql/catalog/dbdesc/database_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,12 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}

// Validate the privilege descriptor.
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Database))
if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Database))
}

// The DefaultPrivilegeDescriptor may be nil.
if desc.GetDefaultPrivileges() != nil {
// Validate the default privilege descriptor.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/catalog/descpb/privilege.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (p PrivilegeDescriptor) ValidateSuperuserPrivileges(
objectName string,
allowedSuperuserPrivileges privilege.List,
) error {
if parentID == InvalidID && objectType != privilege.Database {
// Special case for virtual objects.
return nil
}
for _, user := range []security.SQLUsername{
// Check "root" user.
security.RootUserName(),
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/catalog/schemadesc/schema_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}

// Validate the privilege descriptor.
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Schema))
if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Schema))
}
}

// GetReferencedDescIDs returns the IDs of all descriptors referenced by
Expand Down
59 changes: 55 additions & 4 deletions pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ func (desc *wrapper) ValidateCrossReferences(
}
}

// For views, check dependent relations.
for _, id := range desc.DependsOn {
vea.Report(desc.validateOutboundTableRef(id, vdg))
}
for _, by := range desc.DependedOnBy {
vea.Report(desc.validateInboundTableRef(by, vdg))
}

// Check foreign keys.
for i := range desc.OutboundFKs {
vea.Report(desc.validateOutboundFK(&desc.OutboundFKs[i], vdg))
Expand Down Expand Up @@ -255,6 +263,46 @@ func (desc *wrapper) validateIndexInterleave(
return nil
}

func (desc *wrapper) validateOutboundTableRef(
id descpb.ID, vdg catalog.ValidationDescGetter,
) error {
referencedTable, err := vdg.GetTableDescriptor(id)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid relation reference")
}
for _, by := range referencedTable.TableDesc().DependedOnBy {
if by.ID == desc.GetID() {
return nil
}
}
return errors.AssertionFailedf("missing back-reference for referenced relation %q (%d)",
referencedTable.GetName(), id)
}

func (desc *wrapper) validateInboundTableRef(
by descpb.TableDescriptor_Reference, vdg catalog.ValidationDescGetter,
) error {
backReferencedTable, err := vdg.GetTableDescriptor(by.ID)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid relation back-reference")
}
for _, id := range backReferencedTable.TableDesc().DependsOn {
if id == desc.GetID() {
return nil
}
}
// Relation back-references need to also consider sequence forward references.
for _, col := range backReferencedTable.DeletableColumns() {
for i := 0; i < col.NumUsesSequences(); i++ {
if col.GetUsesSequenceID(i) == desc.GetID() {
return nil
}
}
}
return errors.AssertionFailedf("missing forward reference for back-referenced relation %q (%d)",
backReferencedTable.GetName(), by.ID)
}

func (desc *wrapper) validateOutboundFK(
fk *descpb.ForeignKeyConstraint, vdg catalog.ValidationDescGetter,
) error {
Expand Down Expand Up @@ -494,6 +542,13 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
vea.Report(errors.AssertionFailedf("invalid parent ID %d", desc.GetParentID()))
}

// Validate the privilege descriptor.
if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Table))
}

if desc.IsSequence() {
return
}
Expand All @@ -510,7 +565,6 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
return
}

// TODO(dt, nathan): virtual descs don't validate (missing privs, PK, etc).
if desc.IsVirtualTable() {
return
}
Expand Down Expand Up @@ -571,9 +625,6 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}
}

// Validate the privilege descriptor.
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Table))

// Ensure that mutations cannot be queued if a primary key change or
// an alter column type schema change has either been started in
// this transaction, or is currently in progress.
Expand Down
107 changes: 83 additions & 24 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,10 @@ var validationMap = []struct {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"IsMaterializedView": {status: thisFieldReferencesNoObjects},
"DependsOn": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"DependsOnTypes": {status: iSolemnlySwearThisFieldIsValidated},
"DependedOnBy": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"MutationJobs": {status: thisFieldReferencesNoObjects},
"DependsOn": {status: iSolemnlySwearThisFieldIsValidated},
"DependsOnTypes": {status: iSolemnlySwearThisFieldIsValidated},
"DependedOnBy": {status: iSolemnlySwearThisFieldIsValidated},
"MutationJobs": {status: thisFieldReferencesNoObjects},
"SequenceOpts": {status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"DropTime": {status: thisFieldReferencesNoObjects},
Expand Down Expand Up @@ -1279,6 +1275,7 @@ func TestValidateTableDesc(t *testing.T) {
}
for i, d := range testData {
t.Run(d.err, func(t *testing.T) {
d.desc.Privileges = descpb.NewDefaultPrivilegeDescriptor(security.RootUserName())
desc := NewBuilder(&d.desc).BuildImmutableTable()
expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), d.err)
if err := catalog.ValidateSelf(desc); err == nil {
Expand Down Expand Up @@ -1724,26 +1721,88 @@ func TestValidateCrossTableReferences(t *testing.T) {
Temporary: true,
},
},
// Views.
{ // 18
desc: descpb.TableDescriptor{
Name: "foo",
ID: 51,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
PrimaryIndex: descpb.IndexDescriptor{
ID: 1,
Name: "primary",
KeyColumnIDs: []descpb.ColumnID{1},
KeyColumnNames: []string{"a"},
},
Columns: []descpb.ColumnDescriptor{
{
Name: "a",
ID: 1,
Type: types.Int,
},
},
DependedOnBy: []descpb.TableDescriptor_Reference{{ID: 52}},
},
otherDescs: []descpb.TableDescriptor{{
Name: "bar",
ID: 52,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
ViewQuery: "SELECT * FROM foo",
DependsOn: []descpb.ID{51},
}},
},
{ // 19
err: `missing forward reference for back-referenced relation "bar" (52)`,
desc: descpb.TableDescriptor{
Name: "foo",
ID: 51,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
PrimaryIndex: descpb.IndexDescriptor{
ID: 1,
Name: "primary",
KeyColumnIDs: []descpb.ColumnID{1},
KeyColumnNames: []string{"a"},
},
Columns: []descpb.ColumnDescriptor{
{
Name: "a",
ID: 1,
Type: types.Int,
},
},
DependedOnBy: []descpb.TableDescriptor_Reference{{ID: 52}},
},
otherDescs: []descpb.TableDescriptor{{
Name: "bar",
ID: 52,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
}},
},
}

for i, test := range tests {
descs := catalog.MakeMapDescGetter()
descs.Descriptors[1] = dbdesc.NewBuilder(&descpb.DatabaseDescriptor{ID: 1}).BuildImmutable()
for _, otherDesc := range test.otherDescs {
otherDesc.Privileges = descpb.NewDefaultPrivilegeDescriptor(security.AdminRoleName())
descs.Descriptors[otherDesc.ID] = NewBuilder(&otherDesc).BuildImmutable()
}
desc := NewBuilder(&test.desc).BuildImmutable()
expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), test.err)
const validateCrossReferencesOnly = catalog.ValidationLevelCrossReferences &^ (catalog.ValidationLevelCrossReferences >> 1)
results := catalog.Validate(ctx, descs, catalog.NoValidationTelemetry, validateCrossReferencesOnly, desc)
if err := results.CombinedError(); err == nil {
if test.err != "" {
t.Errorf("%d: expected \"%s\", but found success: %+v", i, expectedErr, test.desc)
t.Run(test.err, func(t *testing.T) {
descs := catalog.MakeMapDescGetter()
descs.Descriptors[1] = dbdesc.NewBuilder(&descpb.DatabaseDescriptor{ID: 1}).BuildImmutable()
for _, otherDesc := range test.otherDescs {
otherDesc.Privileges = descpb.NewDefaultPrivilegeDescriptor(security.AdminRoleName())
descs.Descriptors[otherDesc.ID] = NewBuilder(&otherDesc).BuildImmutable()
}
} else if expectedErr != err.Error() {
t.Errorf("%d: expected \"%s\", but found \"%s\"", i, expectedErr, err.Error())
}
desc := NewBuilder(&test.desc).BuildImmutable()
expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), test.err)
const validateCrossReferencesOnly = catalog.ValidationLevelCrossReferences &^ (catalog.ValidationLevelCrossReferences >> 1)
results := catalog.Validate(ctx, descs, catalog.NoValidationTelemetry, validateCrossReferencesOnly, desc)
if err := results.CombinedError(); err == nil {
if test.err != "" {
t.Errorf("%d: expected \"%s\", but found success: %+v", i, expectedErr, test.desc)
}
} else if expectedErr != err.Error() {
t.Errorf("%d: expected \"%s\", but found \"%s\"", i, expectedErr, err.Error())
}
})
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/catalog/typedesc/type_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,14 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
vea.Report(errors.AssertionFailedf("invalid parent schema ID %d", desc.GetParentSchemaID()))
}

if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else if desc.Kind != descpb.TypeDescriptor_ALIAS {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Type))
}

switch desc.Kind {
case descpb.TypeDescriptor_MULTIREGION_ENUM:
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Type))
// Check presence of region config
if desc.RegionConfig == nil {
vea.Report(errors.AssertionFailedf("no region config on %s type desc", desc.Kind.String()))
Expand All @@ -498,7 +503,6 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}
}
case descpb.TypeDescriptor_ENUM:
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Type))
if desc.RegionConfig != nil {
vea.Report(errors.AssertionFailedf("found region config on %s type desc", desc.Kind.String()))
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/sequences
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,8 @@ statement ok
DROP TABLE s2

# Validate that cross DB sequences are detected by internal tables
subtest cross_db_sequences

statement ok
CREATE DATABASE db3;

Expand All @@ -1817,6 +1819,41 @@ db2 public seq db1 public t sequences owning table
db2 public seq2 db1 public t sequences owning table
test public tdb3ref db3 public s table column refers to sequence

subtest regression_34527

# Testing parsing empty string for currval issue #34527.
statement error pq: currval\(\): invalid table name:
SELECT currval('')

subtest regression_63147

statement ok
CREATE SEQUENCE s63147

statement ok
CREATE TABLE t63147 (a INT PRIMARY KEY)

statement ok
INSERT INTO t63147 (a) VALUES (1), (2)

statement error pgcode 0A000 cannot evaluate scalar expressions containing sequence operations in this context
ALTER TABLE t63147 ADD b INT DEFAULT nextval('s63147')

query I
SELECT COUNT(*) FROM crdb_internal.invalid_objects;
----
0

statement ok
DROP TABLE t63147

statement ok
CREATE TABLE t63147 (i INT PRIMARY KEY DEFAULT (nextval('s63147')))

statement ok
CREATE VIEW v63147 AS SELECT nextval('s63147') FROM t63147

query I
SELECT COUNT(*) FROM crdb_internal.invalid_objects;
----
0

0 comments on commit 1e113f4

Please sign in to comment.