Skip to content

Commit

Permalink
sql: add doctor check for parent schema in parent db
Browse files Browse the repository at this point in the history
With this patch, doctor now also checks that the parent id of an object
matches the parent id of the parent schema of the object.

Fixes #55716.

Release note: None
  • Loading branch information
Marius Posta committed Jan 12, 2021
1 parent c59bcc6 commit 6bbd2a3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
16 changes: 11 additions & 5 deletions pkg/sql/doctor/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func ExamineDescriptors(
}

_, parentExists := descGetter[desc.GetParentID()]
_, parentSchemaExists := descGetter[desc.GetParentSchemaID()]
parentSchema, parentSchemaExists := descGetter[desc.GetParentSchemaID()]
switch d := desc.(type) {
case catalog.TableDescriptor:
if err := d.Validate(ctx, descGetter); err != nil {
Expand All @@ -161,15 +161,21 @@ func ExamineDescriptors(
// parent schema id is always 0.
parentSchemaExists = true
}
var invalidParentID bool
if desc.GetParentID() != descpb.InvalidID && !parentExists {
problemsFound = true
invalidParentID = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent id %d", desc.GetParentID()))
}
if desc.GetParentSchemaID() != descpb.InvalidID &&
desc.GetParentSchemaID() != keys.PublicSchemaID &&
!parentSchemaExists {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent schema id %d", desc.GetParentSchemaID()))
desc.GetParentSchemaID() != keys.PublicSchemaID {
if !parentSchemaExists {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent schema id %d", desc.GetParentSchemaID()))
} else if !invalidParentID && parentSchema.GetParentID() != desc.GetParentID() {
problemsFound = true
fmt.Fprint(stdout, reportMsg(desc, "invalid parent schema parent id %d", parentSchema.GetParentID()))
}
}

// Process namespace entries pointing to this descriptor.
Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/doctor/doctor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func TestExamineDescriptors(t *testing.T) {
descpb.TableFromDescriptor(droppedValidTableDesc, hlc.Timestamp{WallTime: 1}).
State = descpb.DescriptorState_DROP

inSchemaValidTableDesc := protoutil.Clone(validTableDesc).(*descpb.Descriptor)
descpb.TableFromDescriptor(inSchemaValidTableDesc, hlc.Timestamp{WallTime: 1}).
UnexposedParentSchemaID = 3

tests := []struct {
descTable doctor.DescriptorTable
namespaceTable doctor.NamespaceTable
Expand Down Expand Up @@ -194,6 +198,31 @@ Database 1: ParentID 0, ParentSchemaID 0, Name 'db': not being dropped but
},
expected: `Examining 1 descriptors and 1 namespace entries...
Type 1: ParentID 0, ParentSchemaID 0, Name 'type': invalid parentID 0
`,
},
{
descTable: doctor.DescriptorTable{
{ID: 1, DescBytes: toBytes(t, inSchemaValidTableDesc)},
{
ID: 2,
DescBytes: toBytes(t, &descpb.Descriptor{Union: &descpb.Descriptor_Database{
Database: &descpb.DatabaseDescriptor{Name: "db", ID: 2},
}}),
},
{
ID: 3,
DescBytes: toBytes(t, &descpb.Descriptor{Union: &descpb.Descriptor_Schema{
Schema: &descpb.SchemaDescriptor{Name: "schema", ID: 3, ParentID: 0},
}}),
},
},
namespaceTable: doctor.NamespaceTable{
{NameInfo: descpb.NameInfo{ParentID: 2, ParentSchemaID: 3, Name: "t"}, ID: 1},
{NameInfo: descpb.NameInfo{Name: "db"}, ID: 2},
{NameInfo: descpb.NameInfo{ParentID: 0, Name: "schema"}, ID: 3},
},
expected: `Examining 3 descriptors and 3 namespace entries...
Table 1: ParentID 2, ParentSchemaID 3, Name 't': invalid parent schema parent id 0
`,
},
{
Expand Down

0 comments on commit 6bbd2a3

Please sign in to comment.