Skip to content

Commit

Permalink
Merge pull request #74760 from RichardJCai/backport21.1-73875
Browse files Browse the repository at this point in the history
release-21.1: restoreccl: insert system.namespace entry for synthetic public schemas during restore
  • Loading branch information
RichardJCai authored Jan 21, 2022
2 parents 52c7a67 + 487150a commit 3456591
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
67 changes: 67 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8348,3 +8348,70 @@ CREATE SCHEMA db.s;

sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`)
}

// Verify that upon restoring a database, there is a namespace entry for its
// public schema.
func TestRestoreSyntheticPublicSchemaNamespaceEntry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
const numAccounts = 100
params := base.TestClusterArgs{}

_, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts,
InitManualReplication, params)
defer cleanup()

sqlDB.Exec(t, "CREATE DATABASE d")
sqlDB.Exec(t, "BACKUP DATABASE d TO $1", LocalFoo)
sqlDB.Exec(t, "DROP DATABASE d")

sqlDB.Exec(t, fmt.Sprintf("RESTORE DATABASE d FROM '%s'", LocalFoo))

var dbID int
row := sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'd'`)
row.Scan(&dbID)

sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = 'public' AND "parentID"=%d`, dbID), [][]string{{"29"}})
}

// Verify that if a database restore fails, the cleanup removes the database's
// public namespace entry.
func TestRestoreSyntheticPublicSchemaNamespaceEntryCleanupOnFail(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
const numAccounts = 100
params := base.TestClusterArgs{}
_, _, _, dataDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
defer cleanupFn()
_, tc, sqlDB, cleanup := backupRestoreTestSetupEmpty(t, singleNode, dataDir,
InitManualReplication, params)
defer cleanup()

for _, server := range tc.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.beforePublishingDescriptors = func() error {
return errors.New("boom")
}
return r
},
}
}

// Drop the default databases so only the system database remains.
sqlDB.Exec(t, "DROP DATABASE defaultdb")
sqlDB.Exec(t, "DROP DATABASE postgres")

sqlDB.Exec(t, "CREATE DATABASE d")
sqlDB.Exec(t, "BACKUP DATABASE d TO $1", LocalFoo)
sqlDB.Exec(t, "DROP DATABASE d")

restoreQuery := fmt.Sprintf("RESTORE DATABASE d FROM '%s'", LocalFoo)
sqlDB.ExpectErr(t, "boom", restoreQuery)

// We should have no non-system database with a public schema name space
// entry with id 29.
sqlDB.CheckQueryResults(t, `SELECT id FROM system.namespace WHERE name = 'public' AND id=29 AND "parentID"!=1`, [][]string{})
}
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ func WriteDescriptors(
// behavior if the cluster version is bumped DURING a restore.
dKey := catalogkv.MakeDatabaseNameKey(ctx, settings, desc.GetName())
b.CPut(dKey.Key(codec), desc.GetID(), nil)

// We also have to put a system.namespace entry for the public schema.
b.CPut(catalogkeys.NewPublicSchemaKey(desc.GetID()).Key(codec), keys.PublicSchemaID, nil)
}

// Write namespace and descriptor entries for each schema.
Expand Down Expand Up @@ -2357,6 +2360,8 @@ func (r *restoreResumer) dropDescriptors(
descKey := catalogkeys.MakeDescMetadataKey(codec, db.GetID())
b.Del(descKey)
b.Del(catalogkeys.NewDatabaseKey(db.GetName()).Key(codec))
b.Del(catalogkeys.NewPublicSchemaKey(db.GetID()).Key(codec))

deletedDBs[db.GetID()] = struct{}{}
}

Expand Down

0 comments on commit 3456591

Please sign in to comment.