diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 66cba7e3db66..4affe116c349 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -9005,3 +9005,70 @@ insert into A (a) VALUES ('foo'); checkRows() }) } + +// Verify that upon restoring a database, there is a namespace entry for its +// public schema. +func TestRestoreSyntheticPublicSchemaNamespaceEntry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + const numAccounts = 100 + params := base.TestClusterArgs{} + + _, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, + InitManualReplication, params) + defer cleanup() + + sqlDB.Exec(t, "CREATE DATABASE d") + sqlDB.Exec(t, "BACKUP DATABASE d TO $1", LocalFoo) + sqlDB.Exec(t, "DROP DATABASE d") + + sqlDB.Exec(t, fmt.Sprintf("RESTORE DATABASE d FROM '%s'", LocalFoo)) + + var dbID int + row := sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'd'`) + row.Scan(&dbID) + + sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = 'public' AND "parentID"=%d`, dbID), [][]string{{"29"}}) +} + +// Verify that if a database restore fails, the cleanup removes the database's +// public namespace entry. +func TestRestoreSyntheticPublicSchemaNamespaceEntryCleanupOnFail(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + const numAccounts = 100 + params := base.TestClusterArgs{} + _, _, _, dataDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication) + defer cleanupFn() + _, tc, sqlDB, cleanup := backupRestoreTestSetupEmpty(t, singleNode, dataDir, + InitManualReplication, params) + defer cleanup() + + for _, server := range tc.Servers { + registry := server.JobRegistry().(*jobs.Registry) + registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { + r := raw.(*restoreResumer) + r.testingKnobs.beforePublishingDescriptors = func() error { + return errors.New("boom") + } + return r + }, + } + } + + // Drop the default databases so only the system database remains. + sqlDB.Exec(t, "DROP DATABASE defaultdb") + sqlDB.Exec(t, "DROP DATABASE postgres") + + sqlDB.Exec(t, "CREATE DATABASE d") + sqlDB.Exec(t, "BACKUP DATABASE d TO $1", LocalFoo) + sqlDB.Exec(t, "DROP DATABASE d") + + restoreQuery := fmt.Sprintf("RESTORE DATABASE d FROM '%s'", LocalFoo) + sqlDB.ExpectErr(t, "boom", restoreQuery) + + // We should have no non-system database with a public schema name space + // entry with id 29. + sqlDB.CheckQueryResults(t, `SELECT id FROM system.namespace WHERE name = 'public' AND id=29 AND "parentID"!=1`, [][]string{}) +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 9006d7811463..fb352e33aefa 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -139,6 +139,9 @@ func WriteDescriptors( return err } b.CPut(catalogkeys.EncodeNameKey(codec, desc), desc.GetID(), nil) + + // We also have to put a system.namespace entry for the public schema. + b.CPut(catalogkeys.MakeSchemaNameKey(codec, desc.GetID(), tree.PublicSchema), keys.PublicSchemaID, nil) } // Write namespace and descriptor entries for each schema. @@ -2169,6 +2172,9 @@ func (r *restoreResumer) dropDescriptors( descKey := catalogkeys.MakeDescMetadataKey(codec, db.GetID()) b.Del(descKey) + + b.Del(catalogkeys.MakeSchemaNameKey(codec, db.GetID(), tree.PublicSchema)) + nameKey := catalogkeys.MakeDatabaseNameKey(codec, db.GetName()) b.Del(nameKey) descsCol.AddDeletedDescriptor(db)