Skip to content

Commit

Permalink
backupccl: fix bug in database and schema restore cleanup
Browse files Browse the repository at this point in the history
This change fixes a bug where during restore cleanup we
would delete the database descriptor, and namespace entry
only to add the descriptor entry back when clearing the schema
references in the database desc slice.

We fix this by first clearing the schema reference in the db
descroptor slice, and then going through the motions of dropping
the database descriptor.

Fixes: cockroachdb#72248

Release note (bug fix): Fix bug in database and schema restore
cleanup that results in a dangling descriptor entry on job failure.
  • Loading branch information
adityamaru committed Nov 24, 2021
1 parent 0331536 commit 52edf03
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 28 deletions.
46 changes: 46 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8952,3 +8952,49 @@ DROP INDEX foo@bar;

sqlRunner.Exec(t, `BACKUP INTO LATEST IN 'nodelocal://0/foo' WITH revision_history`)
}

// TestRestoreSchemaDescriptorsRollBack is a regression test that ensures that a
// failed restore properly cleans up the database, schema and the database
// descriptors Schemas slice. Prior to the fix, the order in which we performed
// the cleanup would leave a dangling entry in the system.descriptor table for
// the cleaned up database. This caused the subsequent backup to fail when
// resolving the targets to backup with a "duplicate database" error.
func TestRestoreSchemaDescriptorsRollBack(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

_, tc, sqlDB, _, cleanupFn := BackupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

for _, server := range tc.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.beforePublishingDescriptors = func() error {
return errors.New("boom")
}
return r
},
}
}

sqlDB.Exec(t, `
CREATE DATABASE db;
CREATE SCHEMA db.s;
`)

// Back up the database, drop it, and restore into it.
sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/1'`)
sqlDB.Exec(t, `DROP DATABASE db`)
sqlDB.ExpectErr(t, "boom", `RESTORE DATABASE db FROM 'nodelocal://0/test/1'`)

// Back up database with user defined schema.
sqlDB.Exec(t, `
CREATE DATABASE db;
CREATE SCHEMA db.s;
`)

// Back up the database, drop it, and restore into it.
sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`)
}
56 changes: 28 additions & 28 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,6 +2316,34 @@ func (r *restoreResumer) dropDescriptors(
dbsWithDeletedSchemas[mutSchema.GetParentID()] = append(dbsWithDeletedSchemas[mutSchema.GetParentID()], mutSchema)
}

// For each database that had a child schema deleted (regardless of whether
// the db was created in the restore job), if it wasn't deleted just now,
// delete the now-deleted child schema from its schema map.
for dbID, schemas := range dbsWithDeletedSchemas {
log.Infof(ctx, "deleting %d schema entries from database %d", len(schemas), dbID)
desc, err := descsCol.GetMutableDescriptorByID(ctx, dbID, txn)
if err != nil {
return err
}
db := desc.(*dbdesc.Mutable)
for _, sc := range schemas {
if schemaInfo, ok := db.Schemas[sc.GetName()]; !ok {
log.Warningf(ctx, "unexpected missing schema entry for %s from db %d; skipping deletion",
sc.GetName(), dbID)
} else if schemaInfo.ID != sc.GetID() {
log.Warningf(ctx, "unexpected schema entry %d for %s from db %d, expecting %d; skipping deletion",
schemaInfo.ID, sc.GetName(), dbID, sc.GetID())
} else {
delete(db.Schemas, sc.GetName())
}
}
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, db, b,
); err != nil {
return err
}
}

// Delete the database descriptors.
deletedDBs := make(map[descpb.ID]struct{})
for _, dbDesc := range details.DatabaseDescs {
Expand Down Expand Up @@ -2351,34 +2379,6 @@ func (r *restoreResumer) dropDescriptors(
deletedDBs[db.GetID()] = struct{}{}
}

// For each database that had a child schema deleted (regardless of whether
// the db was created in the restore job), if it wasn't deleted just now,
// delete the now-deleted child schema from its schema map.
for dbID, schemas := range dbsWithDeletedSchemas {
log.Infof(ctx, "deleting %d schema entries from database %d", len(schemas), dbID)
desc, err := descsCol.GetMutableDescriptorByID(ctx, dbID, txn)
if err != nil {
return err
}
db := desc.(*dbdesc.Mutable)
for _, sc := range schemas {
if schemaInfo, ok := db.Schemas[sc.GetName()]; !ok {
log.Warningf(ctx, "unexpected missing schema entry for %s from db %d; skipping deletion",
sc.GetName(), dbID)
} else if schemaInfo.ID != sc.GetID() {
log.Warningf(ctx, "unexpected schema entry %d for %s from db %d, expecting %d; skipping deletion",
schemaInfo.ID, sc.GetName(), dbID, sc.GetID())
} else {
delete(db.Schemas, sc.GetName())
}
}
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, db, b,
); err != nil {
return err
}
}

if err := txn.Run(ctx, b); err != nil {
return errors.Wrap(err, "dropping tables created at the start of restore caused by fail/cancel")
}
Expand Down

0 comments on commit 52edf03

Please sign in to comment.