Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/schemachanger: reassign owned by and concurrent declarative schema changer drops can conflict #93101

Merged
merged 3 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ upsert descriptor #104
id: 104
modificationTime: {}
...
public:
id: 105
regionEnumId: 106
survivalGoal: REGION_FAILURE
- schemas:
- public:
- id: 105
- version: "1"
+ state: DROP
+ version: "2"
Expand Down
50 changes: 50 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/reassign_owned_by
Original file line number Diff line number Diff line change
Expand Up @@ -351,5 +351,55 @@ statement ok
DROP SCHEMA db1.sc1 CASCADE;

statement ok
USE db1;
REASSIGN OWNED BY testuser TO testuser2;

user root
statement ok
USE test;
DROP DATABASE db1 CASCADE;

# Also validate the reverse, where the REASSIGN OWNED BY job
# is created, but paused before a DROP DATABASE occurs. The job
# should be re-slient in this scenario.
user root
statement ok
CREATE DATABASE db1;
ALTER DATABASE db1 OWNER TO testuser;
CREATE SCHEMA db1.sc1;
ALTER SCHEMA db1.sc1 OWNER TO testuser;
CREATE TABLE db1.sc1.table(n int);
ALTER TABLE db1.sc1.table OWNER TO testuser;

statement ok
SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';

skipif config local-legacy-schema-changer
statement error job \d+ was paused before it completed with reason: pause point "schemachanger.before.exec" hit
use db1;
REASSIGN OWNED BY testuser TO testuser2;

statement ok
SET CLUSTER SETTING jobs.debug.pausepoints = 'newschemachanger.before.exec';

user root

skipif config local-legacy-schema-changer
statement error job \d+ was paused before it completed with reason: pause point "newschemachanger.before.exec" hit
DROP DATABASE db1 CASCADE;

statement ok
SET CLUSTER SETTING jobs.debug.pausepoints = '';

# There will be 3 jobs for the schema, database and table
skipif config local-legacy-schema-changer
statement ok
USE test;
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'REASSIGN OWNED BY%' AND status='paused' FETCH FIRST 1 ROWS ONLY);
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'REASSIGN OWNED BY%' AND status='paused' FETCH FIRST 1 ROWS ONLY);
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'REASSIGN OWNED BY%' AND status='paused' FETCH FIRST 1 ROWS ONLY);

# Next allow the post commit phase of the declarative schema changer to resume.
skipif config local-legacy-schema-changer
statement ok
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'DROP DATABASE%' AND status='paused' FETCH FIRST 1 ROWS ONLY);
38 changes: 26 additions & 12 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type SchemaChanger struct {
descID descpb.ID
mutationID descpb.MutationID
droppedDatabaseID descpb.ID
droppedSchemaIDs catalog.DescriptorIDSet
sqlInstanceID base.SQLInstanceID
db *kv.DB
leaseMgr *lease.Manager
Expand Down Expand Up @@ -691,14 +692,24 @@ func (sc *SchemaChanger) exec(ctx context.Context) error {
if err := waitToUpdateLeases(false /* refreshStats */); err != nil {
return err
}
// Some descriptors should be deleted if they are in the DROP state.
// Database/Schema descriptors should be deleted if they are in the DROP state.
if !desc.Dropped() {
return nil
}
switch desc.(type) {
case catalog.SchemaDescriptor, catalog.DatabaseDescriptor:
if desc.Dropped() {
if _, err := sc.execCfg.DB.Del(ctx, catalogkeys.MakeDescMetadataKey(sc.execCfg.Codec, desc.GetID())); err != nil {
return err
}
case catalog.SchemaDescriptor:
if !sc.droppedSchemaIDs.Contains(desc.GetID()) {
return nil
}
case catalog.DatabaseDescriptor:
if sc.droppedDatabaseID != desc.GetID() {
return nil
}
default:
return nil
}
if _, err := sc.execCfg.DB.Del(ctx, catalogkeys.MakeDescMetadataKey(sc.execCfg.Codec, desc.GetID())); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -2594,10 +2605,11 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
return err
}
}
execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID) error {
execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID, droppedSchemaIDs descpb.IDs) error {
sc := SchemaChanger{
descID: descID,
mutationID: mutationID,
droppedSchemaIDs: catalog.MakeDescriptorIDSet(droppedSchemaIDs...),
droppedDatabaseID: droppedDatabaseID,
sqlInstanceID: p.ExecCfg().NodeInfo.NodeID.SQLInstanceID(),
db: p.ExecCfg().DB,
Expand Down Expand Up @@ -2695,22 +2707,22 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
// Drop the child tables.
for i := range details.DroppedTables {
droppedTable := &details.DroppedTables[i]
if err := execSchemaChange(droppedTable.ID, descpb.InvalidMutationID, details.DroppedDatabaseID); err != nil {
if err := execSchemaChange(droppedTable.ID, descpb.InvalidMutationID, details.DroppedDatabaseID, details.DroppedSchemas); err != nil {
return err
}
}

// Drop all schemas.
for _, id := range details.DroppedSchemas {
if err := execSchemaChange(id, descpb.InvalidMutationID, descpb.InvalidID); err != nil {
if err := execSchemaChange(id, descpb.InvalidMutationID, descpb.InvalidID, details.DroppedSchemas); err != nil {
return err
}
}

// Drop the database, if applicable.
if details.FormatVersion >= jobspb.DatabaseJobFormatVersion {
if dbID := details.DroppedDatabaseID; dbID != descpb.InvalidID {
if err := execSchemaChange(dbID, descpb.InvalidMutationID, descpb.InvalidID); err != nil {
if err := execSchemaChange(dbID, descpb.InvalidMutationID, details.DroppedDatabaseID, details.DroppedSchemas); err != nil {
return err
}
// If there are no tables to GC, the zone config needs to be deleted now.
Expand Down Expand Up @@ -2763,7 +2775,7 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
// schema changer. This can be any single-table schema change or any change to
// a database or schema other than a drop.
if details.DescID != descpb.InvalidID {
return execSchemaChange(details.DescID, details.TableMutationID, details.DroppedDatabaseID)
return execSchemaChange(details.DescID, details.TableMutationID, details.DroppedDatabaseID, details.DroppedSchemas)
}
return nil
}
Expand Down Expand Up @@ -3251,6 +3263,8 @@ func (p *planner) CanPerformDropOwnedBy(
) (bool, error) {
row, err := p.QueryRowEx(ctx, `role-has-synthetic-privileges`, sessiondata.NodeUserSessionDataOverride,
`SELECT count(1) FROM system.privileges WHERE username = $1`, role.Normalized())

if err != nil {
return false, err
}
return tree.MustBeDInt(row[0]) == 0, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func (m *visitor) RemoveSchemaParent(ctx context.Context, op scop.RemoveSchemaParent) error {
db, err := m.checkOutDatabase(ctx, op.Parent.ParentDatabaseID)
if err != nil || db.Dropped() {
if err != nil {
return err
}
for name, info := range db.Schemas {
Expand Down