Skip to content

Commit

Permalink
sql: legacy schema changer jobs can incorrectly drop db/schema descri…
Browse files Browse the repository at this point in the history
…ptors

Previously, the legacy schema changer would create a job
to clean up database/schema descriptors, however it would only
care if the descriptor was marked as dropped. This was
inadequate because with the declarative schema changer
becoming in use, its possible for it to interfere with
the declarative schema changer, if a legacy schema changer
job is kicked off after the pre-commit phase of a declarative
drop. To address this, the legacy schema changer now checks
if the descriptor ID is in the job details.

Release note: None
  • Loading branch information
fqazi committed Dec 7, 2022
1 parent 25c2b9e commit 37df598
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 11 deletions.
50 changes: 50 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/reassign_owned_by
Original file line number Diff line number Diff line change
Expand Up @@ -351,5 +351,55 @@ statement ok
DROP SCHEMA db1.sc1 CASCADE;

statement ok
USE db1;
REASSIGN OWNED BY testuser TO testuser2;

user root
statement ok
USE test;
DROP DATABASE db1 CASCADE;

# Also validate the reverse, where the REASSIGN OWNED BY job
# is created, but paused before a DROP DATABASE occurs. The job
# should be re-slient in this scenario.
user root
statement ok
CREATE DATABASE db1;
ALTER DATABASE db1 OWNER TO testuser;
CREATE SCHEMA db1.sc1;
ALTER SCHEMA db1.sc1 OWNER TO testuser;
CREATE TABLE db1.sc1.table(n int);
ALTER TABLE db1.sc1.table OWNER TO testuser;

statement ok
SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';

skipif config local-legacy-schema-changer
statement error job \d+ was paused before it completed with reason: pause point "schemachanger.before.exec" hit
use db1;
REASSIGN OWNED BY testuser TO testuser2;

statement ok
SET CLUSTER SETTING jobs.debug.pausepoints = 'newschemachanger.before.exec';

user root

skipif config local-legacy-schema-changer
statement error job \d+ was paused before it completed with reason: pause point "newschemachanger.before.exec" hit
DROP DATABASE db1 CASCADE;

statement ok
SET CLUSTER SETTING jobs.debug.pausepoints = '';

# There will be 3 jobs for the schema, database and table
skipif config local-legacy-schema-changer
statement ok
USE test;
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'REASSIGN OWNED BY%' AND status='paused' FETCH FIRST 1 ROWS ONLY);
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'REASSIGN OWNED BY%' AND status='paused' FETCH FIRST 1 ROWS ONLY);
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'REASSIGN OWNED BY%' AND status='paused' FETCH FIRST 1 ROWS ONLY);

# Next allow the post commit phase of the declarative schema changer to resume.
skipif config local-legacy-schema-changer
statement ok
RESUME JOB (SELECT job_id FROM crdb_internal.jobs WHERE description LIKE 'DROP DATABASE%' AND status='paused' FETCH FIRST 1 ROWS ONLY);
34 changes: 23 additions & 11 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type SchemaChanger struct {
descID descpb.ID
mutationID descpb.MutationID
droppedDatabaseID descpb.ID
droppedSchemaIDs catalog.DescriptorIDSet
sqlInstanceID base.SQLInstanceID
db *kv.DB
leaseMgr *lease.Manager
Expand Down Expand Up @@ -691,14 +692,24 @@ func (sc *SchemaChanger) exec(ctx context.Context) error {
if err := waitToUpdateLeases(false /* refreshStats */); err != nil {
return err
}
// Some descriptors should be deleted if they are in the DROP state.
// Database/Schema descriptors should be deleted if they are in the DROP state.
if !desc.Dropped() {
return nil
}
switch desc.(type) {
case catalog.SchemaDescriptor, catalog.DatabaseDescriptor:
if desc.Dropped() {
if _, err := sc.execCfg.DB.Del(ctx, catalogkeys.MakeDescMetadataKey(sc.execCfg.Codec, desc.GetID())); err != nil {
return err
}
case catalog.SchemaDescriptor:
if !sc.droppedSchemaIDs.Contains(desc.GetID()) {
return nil
}
case catalog.DatabaseDescriptor:
if sc.droppedDatabaseID != desc.GetID() {
return nil
}
default:
return nil
}
if _, err := sc.execCfg.DB.Del(ctx, catalogkeys.MakeDescMetadataKey(sc.execCfg.Codec, desc.GetID())); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -2594,10 +2605,11 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
return err
}
}
execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID) error {
execSchemaChange := func(descID descpb.ID, mutationID descpb.MutationID, droppedDatabaseID descpb.ID, droppedSchemaIDs descpb.IDs) error {
sc := SchemaChanger{
descID: descID,
mutationID: mutationID,
droppedSchemaIDs: catalog.MakeDescriptorIDSet(droppedSchemaIDs...),
droppedDatabaseID: droppedDatabaseID,
sqlInstanceID: p.ExecCfg().NodeInfo.NodeID.SQLInstanceID(),
db: p.ExecCfg().DB,
Expand Down Expand Up @@ -2695,22 +2707,22 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
// Drop the child tables.
for i := range details.DroppedTables {
droppedTable := &details.DroppedTables[i]
if err := execSchemaChange(droppedTable.ID, descpb.InvalidMutationID, details.DroppedDatabaseID); err != nil {
if err := execSchemaChange(droppedTable.ID, descpb.InvalidMutationID, details.DroppedDatabaseID, details.DroppedSchemas); err != nil {
return err
}
}

// Drop all schemas.
for _, id := range details.DroppedSchemas {
if err := execSchemaChange(id, descpb.InvalidMutationID, descpb.InvalidID); err != nil {
if err := execSchemaChange(id, descpb.InvalidMutationID, descpb.InvalidID, details.DroppedSchemas); err != nil {
return err
}
}

// Drop the database, if applicable.
if details.FormatVersion >= jobspb.DatabaseJobFormatVersion {
if dbID := details.DroppedDatabaseID; dbID != descpb.InvalidID {
if err := execSchemaChange(dbID, descpb.InvalidMutationID, descpb.InvalidID); err != nil {
if err := execSchemaChange(dbID, descpb.InvalidMutationID, details.DroppedDatabaseID, details.DroppedSchemas); err != nil {
return err
}
// If there are no tables to GC, the zone config needs to be deleted now.
Expand Down Expand Up @@ -2763,7 +2775,7 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
// schema changer. This can be any single-table schema change or any change to
// a database or schema other than a drop.
if details.DescID != descpb.InvalidID {
return execSchemaChange(details.DescID, details.TableMutationID, details.DroppedDatabaseID)
return execSchemaChange(details.DescID, details.TableMutationID, details.DroppedDatabaseID, details.DroppedSchemas)
}
return nil
}
Expand Down

0 comments on commit 37df598

Please sign in to comment.