Skip to content

Commit

Permalink
sql: add doctor check for parent schema in parent db
Browse files Browse the repository at this point in the history
With this patch, doctor now also checks that the parent id of an object
matches the parent id of the parent schema of the object.

Fixes cockroachdb#55716.

Release note: None
  • Loading branch information
Marius Posta committed Jan 14, 2021
1 parent b22e903 commit af9d6fb
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
16 changes: 11 additions & 5 deletions pkg/sql/doctor/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func ExamineDescriptors(
}

_, parentExists := descGetter[desc.GetParentID()]
_, parentSchemaExists := descGetter[desc.GetParentSchemaID()]
parentSchema, parentSchemaExists := descGetter[desc.GetParentSchemaID()]
switch d := desc.(type) {
case catalog.TableDescriptor:
if err := d.Validate(ctx, descGetter); err != nil {
Expand All @@ -161,15 +161,21 @@ func ExamineDescriptors(
// parent schema id is always 0.
parentSchemaExists = true
}
var invalidParentID bool
if desc.GetParentID() != descpb.InvalidID && !parentExists {
problemsFound = true
invalidParentID = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent id %d", desc.GetParentID()))
}
if desc.GetParentSchemaID() != descpb.InvalidID &&
desc.GetParentSchemaID() != keys.PublicSchemaID &&
!parentSchemaExists {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent schema id %d", desc.GetParentSchemaID()))
desc.GetParentSchemaID() != keys.PublicSchemaID {
if !parentSchemaExists {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent schema id %d", desc.GetParentSchemaID()))
} else if !invalidParentID && parentSchema.GetParentID() != desc.GetParentID() {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent id of parent schema, expected %d, found %d", desc.GetParentID(), parentSchema.GetParentID()))
}
}

// Process namespace entries pointing to this descriptor.
Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/doctor/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func TestExamineDescriptors(t *testing.T) {
descpb.TableFromDescriptor(droppedValidTableDesc, hlc.Timestamp{WallTime: 1}).
State = descpb.DescriptorState_DROP

inSchemaValidTableDesc := protoutil.Clone(validTableDesc).(*descpb.Descriptor)
descpb.TableFromDescriptor(inSchemaValidTableDesc, hlc.Timestamp{WallTime: 1}).
UnexposedParentSchemaID = 3

tests := []struct {
descTable doctor.DescriptorTable
namespaceTable doctor.NamespaceTable
Expand Down Expand Up @@ -194,6 +198,31 @@ Database 1: ParentID 0, ParentSchemaID 0, Name 'db': not being dropped but
},
expected: `Examining 1 descriptors and 1 namespace entries...
Type 1: ParentID 0, ParentSchemaID 0, Name 'type': invalid parentID 0
`,
},
{
descTable: doctor.DescriptorTable{
{ID: 1, DescBytes: toBytes(t, inSchemaValidTableDesc)},
{
ID: 2,
DescBytes: toBytes(t, &descpb.Descriptor{Union: &descpb.Descriptor_Database{
Database: &descpb.DatabaseDescriptor{Name: "db", ID: 2},
}}),
},
{
ID: 3,
DescBytes: toBytes(t, &descpb.Descriptor{Union: &descpb.Descriptor_Schema{
Schema: &descpb.SchemaDescriptor{Name: "schema", ID: 3, ParentID: 0},
}}),
},
},
namespaceTable: doctor.NamespaceTable{
{NameInfo: descpb.NameInfo{ParentID: 2, ParentSchemaID: 3, Name: "t"}, ID: 1},
{NameInfo: descpb.NameInfo{Name: "db"}, ID: 2},
{NameInfo: descpb.NameInfo{ParentID: 0, Name: "schema"}, ID: 3},
},
expected: `Examining 3 descriptors and 3 namespace entries...
Table 1: ParentID 2, ParentSchemaID 3, Name 't': invalid parent id of parent schema, expected 2, found 0
`,
},
{
Expand Down

0 comments on commit af9d6fb

Please sign in to comment.