Skip to content

Commit

Permalink
sql: prevent DROP SCHEMA CASCADE from droping types with references
Browse files Browse the repository at this point in the history
Before this patch, a DROP SCHEMA CASCADE could cause database corruption
by dropping types which were referenced by other tables. This would lead to
bad errors like:

```
ERROR: object already exists: desc 687: type ID 685 in descriptor not found: descriptor not found
SQLSTATE: 42710
```

And doctor errors like:
```
   Table 687: ParentID  50, ParentSchemaID 29, Name 't': type ID 685 in descriptor not found: descriptor not found
```

Fixes #59021.

Release note (bug fix): Fixed a bug where `DROP SCHEMA ... CASCADE` could
result in types which are referenced being dropped.
  • Loading branch information
ajwerner committed Feb 1, 2021
1 parent e4ac407 commit c0025b6
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 11 deletions.
40 changes: 40 additions & 0 deletions pkg/sql/drop_cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/errors"
)

Expand All @@ -32,6 +33,7 @@ type dropCascadeState struct {
objectNamesToDelete []tree.ObjectName

td []toDelete
toDeleteByID map[descpb.ID]*toDelete
allTableObjectsToDelete []*tabledesc.Mutable
typesToDelete []*typedesc.Mutable

Expand Down Expand Up @@ -176,6 +178,10 @@ func (d *dropCascadeState) resolveCollectedObjects(
}
d.allTableObjectsToDelete = allObjectsToDelete
d.td = filterImplicitlyDeletedObjects(d.td, implicitDeleteMap)
d.toDeleteByID = make(map[descpb.ID]*toDelete)
for i := range d.td {
d.toDeleteByID[d.td[i].desc.GetID()] = &d.td[i]
}
return nil
}

Expand Down Expand Up @@ -203,6 +209,9 @@ func (d *dropCascadeState) dropAllCollectedObjects(ctx context.Context, p *plann

// Now delete all of the types.
for _, typ := range d.typesToDelete {
if err := d.canDropType(ctx, p, typ); err != nil {
return err
}
// Drop the types. Note that we set queueJob to be false because the types
// will be dropped in bulk as part of the DROP DATABASE job.
if err := p.dropTypeImpl(ctx, typ, "", false /* queueJob */); err != nil {
Expand All @@ -213,6 +222,37 @@ func (d *dropCascadeState) dropAllCollectedObjects(ctx context.Context, p *plann
return nil
}

func (d *dropCascadeState) canDropType(
ctx context.Context, p *planner, typ *typedesc.Mutable,
) error {
var referencedButNotDropping []descpb.ID
for _, id := range typ.ReferencingDescriptorIDs {
if _, exists := d.toDeleteByID[id]; exists {
continue
}
referencedButNotDropping = append(referencedButNotDropping, id)
}
if len(referencedButNotDropping) == 0 {
return nil
}
dependentNames, err := p.getFullyQualifiedTableNamesFromIDs(ctx, referencedButNotDropping)
if err != nil {
return errors.Wrapf(err, "type %q has dependent objects", typ.Name)
}
fqName, err := getTypeNameFromTypeDescriptor(
oneAtATimeSchemaResolver{ctx, p},
typ,
)
if err != nil {
return errors.Wrapf(err, "type %q has dependent objects", typ.Name)
}
return unimplemented.NewWithIssueDetailf(51480, "DROP TYPE CASCADE is not yet supported",
"cannot drop type %q because other objects (%v) still depend on it",
fqName.FQString(),
dependentNames,
)
}

func (d *dropCascadeState) getDroppedTableDetails() []jobspb.DroppedTableDetails {
res := make([]jobspb.DroppedTableDetails, len(d.allTableObjectsToDelete))
for i := range d.allTableObjectsToDelete {
Expand Down
14 changes: 3 additions & 11 deletions pkg/sql/drop_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,9 @@ func (p *planner) canDropTypeDesc(
return err
}
if len(desc.ReferencingDescriptorIDs) > 0 && behavior != tree.DropCascade {
var dependentNames []string
for _, id := range desc.ReferencingDescriptorIDs {
desc, err := p.Descriptors().GetMutableTableVersionByID(ctx, id, p.txn)
if err != nil {
return errors.Wrapf(err, "type has dependent objects")
}
fqName, err := p.getQualifiedTableName(ctx, desc)
if err != nil {
return errors.Wrapf(err, "type %q has dependent objects", desc.Name)
}
dependentNames = append(dependentNames, fqName.FQString())
dependentNames, err := p.getFullyQualifiedTableNamesFromIDs(ctx, desc.ReferencingDescriptorIDs)
if err != nil {
return errors.Wrapf(err, "type %q has dependent objects", desc.Name)
}
return pgerror.Newf(
pgcode.DependentObjectsStillExist,
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/drop_type
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,24 @@ DROP TYPE d."b+c"

statement ok
DROP DATABASE d

# Test that dropping a schema which contains a type which refers to things
# outside of that schema works.

subtest drop_schema_cascade

statement ok
CREATE SCHEMA schema_to_drop;
CREATE TYPE schema_to_drop.typ AS ENUM ('a');
CREATE TABLE t (k schema_to_drop.typ PRIMARY KEY);
CREATE TABLE schema_to_drop.t (k schema_to_drop.typ PRIMARY KEY);

statement error pgcode 0A000 unimplemented: cannot drop type "test.schema_to_drop._typ" because other objects \(\[test\.public\.t\]\) still depend on it
DROP SCHEMA schema_to_drop CASCADE;

statement ok
DROP TABLE t;

statement ok
DROP SCHEMA schema_to_drop CASCADE;

25 changes: 25 additions & 0 deletions pkg/sql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,31 @@ func getDescriptorsFromTargetListForPrivilegeChange(
return descs, nil
}

// getFullyQualifiedTableNamesFromIDs resolves a list of table IDs to their
// fully qualified names.
func (p *planner) getFullyQualifiedTableNamesFromIDs(
ctx context.Context, ids []descpb.ID,
) (fullyQualifiedNames []*tree.TableName, _ error) {
for _, id := range ids {
desc, err := p.Descriptors().GetImmutableTableByID(ctx, p.txn, id, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidCached: true,
IncludeDropped: true,
IncludeOffline: true,
},
})
if err != nil {
return nil, err
}
fqName, err := p.getQualifiedTableName(ctx, desc)
if err != nil {
return nil, err
}
fullyQualifiedNames = append(fullyQualifiedNames, fqName)
}
return fullyQualifiedNames, nil
}

// getQualifiedTableName returns the database-qualified name of the table
// or view represented by the provided descriptor. It is a sort of
// reverse of the Resolve() functions.
Expand Down

0 comments on commit c0025b6

Please sign in to comment.