Skip to content

Commit

Permalink
tabledesc: improve view validation
Browse files Browse the repository at this point in the history
Previously, we did not validate the DependsOn and DependedOnBy
cross-references. This commit adds this validation and also fixes a bug
in which privileges were not validated for non-physical tables.

Fixes cockroachdb#63147.

Release note: None
  • Loading branch information
Marius Posta committed Sep 23, 2021
1 parent 24b570f commit 7de5ac7
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 32 deletions.
7 changes: 6 additions & 1 deletion pkg/sql/catalog/dbdesc/database_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,12 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}

// Validate the privilege descriptor.
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Database))
if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Database))
}

// The DefaultPrivilegeDescriptor may be nil.
if desc.GetDefaultPrivileges() != nil {
// Validate the default privilege descriptor.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/catalog/descpb/privilege.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (p PrivilegeDescriptor) ValidateSuperuserPrivileges(
objectName string,
allowedSuperuserPrivileges privilege.List,
) error {
if parentID == InvalidID && objectType != privilege.Database {
// Special case for virtual objects.
return nil
}
for _, user := range []security.SQLUsername{
// Check "root" user.
security.RootUserName(),
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/catalog/schemadesc/schema_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}

// Validate the privilege descriptor.
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Schema))
if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Schema))
}
}

// GetReferencedDescIDs returns the IDs of all descriptors referenced by
Expand Down
51 changes: 47 additions & 4 deletions pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ func (desc *wrapper) ValidateCrossReferences(
}
}

// For views, check dependent relations.
for _, id := range desc.DependsOn {
vea.Report(desc.validateOutboundTableRef(id, vdg))
}
for _, by := range desc.DependedOnBy {
vea.Report(desc.validateInboundTableRef(by, vdg))
}

// Check foreign keys.
for i := range desc.OutboundFKs {
vea.Report(desc.validateOutboundFK(&desc.OutboundFKs[i], vdg))
Expand Down Expand Up @@ -255,6 +263,38 @@ func (desc *wrapper) validateIndexInterleave(
return nil
}

func (desc *wrapper) validateOutboundTableRef(
id descpb.ID, vdg catalog.ValidationDescGetter,
) error {
referencedTable, err := vdg.GetTableDescriptor(id)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid relation reference")
}
for _, by := range referencedTable.TableDesc().DependedOnBy {
if by.ID == desc.GetID() {
return nil
}
}
return errors.AssertionFailedf("missing back-reference for referenced relation %q (%d)",
referencedTable.GetName(), id)
}

func (desc *wrapper) validateInboundTableRef(
by descpb.TableDescriptor_Reference, vdg catalog.ValidationDescGetter,
) error {
backReferencedView, err := vdg.GetTableDescriptor(by.ID)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid relation back-reference")
}
for _, id := range backReferencedView.TableDesc().DependsOn {
if id == desc.GetID() {
return nil
}
}
return errors.AssertionFailedf("missing forward reference for back-referenced relation %q (%d)",
backReferencedView.GetName(), by.ID)
}

func (desc *wrapper) validateOutboundFK(
fk *descpb.ForeignKeyConstraint, vdg catalog.ValidationDescGetter,
) error {
Expand Down Expand Up @@ -494,6 +534,13 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
vea.Report(errors.AssertionFailedf("invalid parent ID %d", desc.GetParentID()))
}

// Validate the privilege descriptor.
if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Table))
}

if desc.IsSequence() {
return
}
Expand All @@ -510,7 +557,6 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
return
}

// TODO(dt, nathan): virtual descs don't validate (missing privs, PK, etc).
if desc.IsVirtualTable() {
return
}
Expand Down Expand Up @@ -571,9 +617,6 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}
}

// Validate the privilege descriptor.
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Table))

// Ensure that mutations cannot be queued if a primary key change or
// an alter column type schema change has either been started in
// this transaction, or is currently in progress.
Expand Down
107 changes: 83 additions & 24 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,10 @@ var validationMap = []struct {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"IsMaterializedView": {status: thisFieldReferencesNoObjects},
"DependsOn": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"DependsOnTypes": {status: iSolemnlySwearThisFieldIsValidated},
"DependedOnBy": {
status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"MutationJobs": {status: thisFieldReferencesNoObjects},
"DependsOn": {status: iSolemnlySwearThisFieldIsValidated},
"DependsOnTypes": {status: iSolemnlySwearThisFieldIsValidated},
"DependedOnBy": {status: iSolemnlySwearThisFieldIsValidated},
"MutationJobs": {status: thisFieldReferencesNoObjects},
"SequenceOpts": {status: todoIAmKnowinglyAddingTechDebt,
reason: "initial import: TODO(features): add validation"},
"DropTime": {status: thisFieldReferencesNoObjects},
Expand Down Expand Up @@ -1279,6 +1275,7 @@ func TestValidateTableDesc(t *testing.T) {
}
for i, d := range testData {
t.Run(d.err, func(t *testing.T) {
d.desc.Privileges = descpb.NewDefaultPrivilegeDescriptor(security.RootUserName())
desc := NewBuilder(&d.desc).BuildImmutableTable()
expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), d.err)
if err := catalog.ValidateSelf(desc); err == nil {
Expand Down Expand Up @@ -1724,26 +1721,88 @@ func TestValidateCrossTableReferences(t *testing.T) {
Temporary: true,
},
},
// Views.
{ // 18
desc: descpb.TableDescriptor{
Name: "foo",
ID: 51,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
PrimaryIndex: descpb.IndexDescriptor{
ID: 1,
Name: "primary",
KeyColumnIDs: []descpb.ColumnID{1},
KeyColumnNames: []string{"a"},
},
Columns: []descpb.ColumnDescriptor{
{
Name: "a",
ID: 1,
Type: types.Int,
},
},
DependedOnBy: []descpb.TableDescriptor_Reference{{ID: 52}},
},
otherDescs: []descpb.TableDescriptor{{
Name: "bar",
ID: 52,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
ViewQuery: "SELECT * FROM foo",
DependsOn: []descpb.ID{51},
}},
},
{ // 19
err: `missing forward reference for back-referenced relation "bar" (52)`,
desc: descpb.TableDescriptor{
Name: "foo",
ID: 51,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
PrimaryIndex: descpb.IndexDescriptor{
ID: 1,
Name: "primary",
KeyColumnIDs: []descpb.ColumnID{1},
KeyColumnNames: []string{"a"},
},
Columns: []descpb.ColumnDescriptor{
{
Name: "a",
ID: 1,
Type: types.Int,
},
},
DependedOnBy: []descpb.TableDescriptor_Reference{{ID: 52}},
},
otherDescs: []descpb.TableDescriptor{{
Name: "bar",
ID: 52,
ParentID: 1,
UnexposedParentSchemaID: keys.PublicSchemaID,
}},
},
}

for i, test := range tests {
descs := catalog.MakeMapDescGetter()
descs.Descriptors[1] = dbdesc.NewBuilder(&descpb.DatabaseDescriptor{ID: 1}).BuildImmutable()
for _, otherDesc := range test.otherDescs {
otherDesc.Privileges = descpb.NewDefaultPrivilegeDescriptor(security.AdminRoleName())
descs.Descriptors[otherDesc.ID] = NewBuilder(&otherDesc).BuildImmutable()
}
desc := NewBuilder(&test.desc).BuildImmutable()
expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), test.err)
const validateCrossReferencesOnly = catalog.ValidationLevelCrossReferences &^ (catalog.ValidationLevelCrossReferences >> 1)
results := catalog.Validate(ctx, descs, catalog.NoValidationTelemetry, validateCrossReferencesOnly, desc)
if err := results.CombinedError(); err == nil {
if test.err != "" {
t.Errorf("%d: expected \"%s\", but found success: %+v", i, expectedErr, test.desc)
t.Run(test.err, func(t *testing.T) {
descs := catalog.MakeMapDescGetter()
descs.Descriptors[1] = dbdesc.NewBuilder(&descpb.DatabaseDescriptor{ID: 1}).BuildImmutable()
for _, otherDesc := range test.otherDescs {
otherDesc.Privileges = descpb.NewDefaultPrivilegeDescriptor(security.AdminRoleName())
descs.Descriptors[otherDesc.ID] = NewBuilder(&otherDesc).BuildImmutable()
}
} else if expectedErr != err.Error() {
t.Errorf("%d: expected \"%s\", but found \"%s\"", i, expectedErr, err.Error())
}
desc := NewBuilder(&test.desc).BuildImmutable()
expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), test.err)
const validateCrossReferencesOnly = catalog.ValidationLevelCrossReferences &^ (catalog.ValidationLevelCrossReferences >> 1)
results := catalog.Validate(ctx, descs, catalog.NoValidationTelemetry, validateCrossReferencesOnly, desc)
if err := results.CombinedError(); err == nil {
if test.err != "" {
t.Errorf("%d: expected \"%s\", but found success: %+v", i, expectedErr, test.desc)
}
} else if expectedErr != err.Error() {
t.Errorf("%d: expected \"%s\", but found \"%s\"", i, expectedErr, err.Error())
}
})
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/catalog/typedesc/type_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,14 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
vea.Report(errors.AssertionFailedf("invalid parent schema ID %d", desc.GetParentSchemaID()))
}

if desc.Privileges == nil {
vea.Report(errors.AssertionFailedf("privileges not set"))
} else if desc.Kind != descpb.TypeDescriptor_ALIAS {
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Type))
}

switch desc.Kind {
case descpb.TypeDescriptor_MULTIREGION_ENUM:
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Type))
// Check presence of region config
if desc.RegionConfig == nil {
vea.Report(errors.AssertionFailedf("no region config on %s type desc", desc.Kind.String()))
Expand All @@ -498,7 +503,6 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}
}
case descpb.TypeDescriptor_ENUM:
vea.Report(catprivilege.Validate(*desc.Privileges, desc, privilege.Type))
if desc.RegionConfig != nil {
vea.Report(errors.AssertionFailedf("found region config on %s type desc", desc.Kind.String()))
}
Expand Down

0 comments on commit 7de5ac7

Please sign in to comment.