Skip to content

Commit

Permalink
backupccl: datadrivenify TestBackupRestoreUserDefinedSchemas
Browse files Browse the repository at this point in the history
This tests previously started 8 test clusters, now it starts
just 2!

Informs: cockroachdb#77129

Release note: None
  • Loading branch information
adityamaru committed Mar 22, 2022
1 parent bf01e1a commit 91e0f63
Show file tree
Hide file tree
Showing 2 changed files with 640 additions and 345 deletions.
345 changes: 0 additions & 345 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1675,351 +1675,6 @@ func TestBackupRestoreControlJob(t *testing.T) {
})
}

func TestBackupRestoreUserDefinedSchemas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// This test takes a full backup and an incremental backup with revision
// history at certain timestamps, then restores to each of the timestamps to
// ensure that the types restored are correct.
t.Run("revision-history", func(t *testing.T) {
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

var ts1, ts2, ts3, ts4, ts5, ts6 string
sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE SCHEMA sc2;`)
sqlDB.Exec(t, `CREATE TABLE d.sc.t1 (x int);`)
sqlDB.Exec(t, `CREATE TABLE d.sc2.t1 (x bool);`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts1)

sqlDB.Exec(t, `ALTER SCHEMA sc RENAME TO sc3;`)
sqlDB.Exec(t, `ALTER SCHEMA sc2 RENAME TO sc;`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts2)

sqlDB.Exec(t, `DROP TABLE sc.t1;`)
sqlDB.Exec(t, `DROP TABLE sc3.t1;`)
sqlDB.Exec(t, `DROP SCHEMA sc;`)
sqlDB.Exec(t, `DROP SCHEMA sc3;`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts3)

sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TABLE sc.t1 (a STRING);
`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts4)
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/rev-history-backup' WITH revision_history`)

sqlDB.Exec(t, `DROP TABLE sc.t1;`)
sqlDB.Exec(t, `DROP SCHEMA sc;
`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts5)

sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TABLE sc.t1 (a FLOAT);`)
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts6)
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/rev-history-backup' WITH revision_history`)

t.Run("ts1", func(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE d;")
sqlDB.Exec(t, "RESTORE DATABASE d FROM 'nodelocal://0/rev-history-backup' AS OF SYSTEM TIME "+ts1)
sqlDB.Exec(t, "INSERT INTO d.sc.t1 VALUES (1)")
sqlDB.Exec(t, "INSERT INTO d.sc2.t1 VALUES (true)")
sqlDB.Exec(t, "USE d; CREATE SCHEMA unused;")
})
t.Run("ts2", func(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE d;")
sqlDB.Exec(t, "RESTORE DATABASE d FROM 'nodelocal://0/rev-history-backup' AS OF SYSTEM TIME "+ts2)
sqlDB.Exec(t, "INSERT INTO d.sc3.t1 VALUES (1)")
sqlDB.Exec(t, "INSERT INTO d.sc.t1 VALUES (true)")
})
t.Run("ts3", func(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE d;")
sqlDB.Exec(t, "RESTORE DATABASE d FROM 'nodelocal://0/rev-history-backup' AS OF SYSTEM TIME "+ts3)
sqlDB.Exec(t, "USE d")
sqlDB.Exec(t, "CREATE SCHEMA sc")
sqlDB.Exec(t, "CREATE SCHEMA sc3;")
})
t.Run("ts4", func(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE d;")
sqlDB.Exec(t, "RESTORE DATABASE d FROM 'nodelocal://0/rev-history-backup' AS OF SYSTEM TIME "+ts4)
sqlDB.Exec(t, "INSERT INTO d.sc.t1 VALUES ('hello')")
})
t.Run("ts5", func(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE d;")
sqlDB.Exec(t, "RESTORE DATABASE d FROM 'nodelocal://0/rev-history-backup' AS OF SYSTEM TIME "+ts5)
sqlDB.Exec(t, "USE d")
sqlDB.Exec(t, "CREATE SCHEMA sc")
})
t.Run("ts6", func(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE d;")
sqlDB.Exec(t, "RESTORE DATABASE d FROM 'nodelocal://0/rev-history-backup' AS OF SYSTEM TIME "+ts6)
sqlDB.Exec(t, `INSERT INTO d.sc.t1 VALUES (123.123)`)
})
})

// Tests full cluster backup/restore with user defined schemas.
t.Run("full-cluster", func(t *testing.T) {
_, sqlDB, dataDir, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()
sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA unused;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES (1);`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb2 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb2 VALUES ('hello');`)
// Now backup the full cluster.
sqlDB.Exec(t, `BACKUP TO 'nodelocal://0/test/'`)
// Start a new server that shares the data directory.
_, sqlDBRestore, cleanupRestore := backupRestoreTestSetupEmpty(t, singleNode, dataDir, InitManualReplication, base.TestClusterArgs{})
defer cleanupRestore()

// Restore into the new cluster.
sqlDBRestore.Exec(t, `RESTORE FROM 'nodelocal://0/test/'`)

// Check that we can resolve all names through the user defined schema.
sqlDBRestore.CheckQueryResults(t, `SELECT * FROM d.sc.tb1`, [][]string{{"1"}})
sqlDBRestore.CheckQueryResults(t, `SELECT * FROM d.sc.tb2`, [][]string{{"hello"}})
sqlDBRestore.CheckQueryResults(t, `SELECT 'hello'::d.sc.typ1`, [][]string{{"hello"}})

// We shouldn't be able to create a new schema with the same name.
sqlDBRestore.ExpectErr(t, `pq: schema "sc" already exists`, `USE d; CREATE SCHEMA sc`)
sqlDBRestore.ExpectErr(t, `pq: schema "unused" already exists`, `USE d; CREATE SCHEMA unused`)
})

// Tests restoring databases with user defined schemas.
t.Run("database", func(t *testing.T) {
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE SCHEMA unused;`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES (1);`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb2 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb2 VALUES ('hello');`)
// Backup the database.
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/test/'`)

// Drop the database and restore into it.
sqlDB.Exec(t, `DROP DATABASE d`)
sqlDB.Exec(t, `RESTORE DATABASE d FROM 'nodelocal://0/test/'`)

// Check that we can resolve all names through the user defined schema.
sqlDB.CheckQueryResults(t, `SELECT * FROM d.sc.tb1`, [][]string{{"1"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM d.sc.tb2`, [][]string{{"hello"}})
sqlDB.CheckQueryResults(t, `SELECT 'hello'::d.sc.typ1`, [][]string{{"hello"}})

// We shouldn't be able to create a new schema with the same name.
sqlDB.ExpectErr(t, `pq: schema "sc" already exists`, `USE d; CREATE SCHEMA sc`)
sqlDB.ExpectErr(t, `pq: schema "unused" already exists`, `USE d; CREATE SCHEMA unused`)
})

// Tests backing up and restoring all tables in requested user defined
// schemas.
t.Run("all-tables-in-requested-schema", func(t *testing.T) {
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `CREATE TABLE table_in_data (x INT);`)

sqlDB.Exec(t, `CREATE SCHEMA data;`)
sqlDB.Exec(t, `CREATE TABLE data.tb1 (x INT);`)

sqlDB.Exec(t, `CREATE DATABASE foo;`)
sqlDB.Exec(t, `USE foo;`)
sqlDB.Exec(t, `CREATE SCHEMA schema_in_foo;`)
sqlDB.Exec(t, `CREATE TABLE schema_in_foo.tb1 (x INT);`)

sqlDB.Exec(t, `CREATE SCHEMA schema_in_foo2;`)
sqlDB.Exec(t, `CREATE TABLE schema_in_foo2.tb1 (x INT);`)

sqlDB.Exec(t, `CREATE SCHEMA foo;`)
sqlDB.Exec(t, `CREATE TABLE foo.tb1 (x INT);`)

sqlDB.Exec(t, `CREATE TABLE tb2 (y INT);`)

for _, tc := range []struct {
name string
target string
expectedTablesInBackup [][]string
}{
{
name: "fully-qualified-target",
target: "foo.schema_in_foo.*",
expectedTablesInBackup: [][]string{{"schema_in_foo", "tb1"}},
},
{
name: "schema-qualified-target",
target: "schema_in_foo.*",
expectedTablesInBackup: [][]string{{"schema_in_foo", "tb1"}},
},
{
name: "schema-qualified-target-with-identical-name-as-curdb",
target: "foo.*",
expectedTablesInBackup: [][]string{{"foo", "tb1"}},
},
{
name: "curdb-public-schema-target",
target: "*",
expectedTablesInBackup: [][]string{{"public", "tb2"}},
},
{
name: "cross-db-qualified-target",
target: "data.*",
expectedTablesInBackup: [][]string{{"data", "tb1"}, {"public", "bank"}, {"public", "table_in_data"}},
},
} {
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TABLE %s TO 'nodelocal://0/%s'`, tc.target, tc.name))
sqlDB.Exec(t, `CREATE DATABASE restore`)
sqlDB.Exec(t, fmt.Sprintf(`RESTORE TABLE %s FROM 'nodelocal://0/%s' WITH into_db='restore'`, tc.target, tc.name))
sqlDB.CheckQueryResults(t, `SELECT schema_name,
table_name from [SHOW TABLES FROM restore] ORDER BY schema_name, table_name`, tc.expectedTablesInBackup)
sqlDB.Exec(t, `DROP DATABASE restore CASCADE`)
}
})

// Test restoring tables with user defined schemas when restore schemas are
// not being remapped.
t.Run("no-remap", func(t *testing.T) {
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb2 (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc.tb2 VALUES (1);`)
{
// We have to qualify the table correctly to back it up. d.tb1 resolves
// to d.public.tb1.
sqlDB.ExpectErr(t, `pq: failed to resolve targets specified in the BACKUP stmt: table "d.tb1" does not exist`, `BACKUP TABLE d.tb1 TO 'nodelocal://0/test/'`)
// Backup tb1.
sqlDB.Exec(t, `BACKUP TABLE d.sc.tb1 TO 'nodelocal://0/test/'`)
// Create a new database to restore into. This restore should restore the
// schema sc into the new database.
sqlDB.Exec(t, `CREATE DATABASE d2`)

// We must properly qualify the table name when restoring as well.
sqlDB.ExpectErr(t, `pq: failed to resolve targets in the BACKUP location specified by the RESTORE stmt, use SHOW BACKUP to find correct targets: table "d.tb1" does not exist`, `RESTORE TABLE d.tb1 FROM 'nodelocal://0/test/' WITH into_db = 'd2'`)

sqlDB.Exec(t, `RESTORE TABLE d.sc.tb1 FROM 'nodelocal://0/test/' WITH into_db = 'd2'`)

// Check that we can resolve all names through the user defined schema.
sqlDB.CheckQueryResults(t, `SELECT * FROM d2.sc.tb1`, [][]string{{"hello"}})
sqlDB.CheckQueryResults(t, `SELECT 'hello'::d2.sc.typ1`, [][]string{{"hello"}})

// We shouldn't be able to create a new schema with the same name.
sqlDB.ExpectErr(t, `pq: schema "sc" already exists`, `USE d2; CREATE SCHEMA sc`)
}

{
// Test that we can * expand schema prefixed names. Create a new backup
// with all the tables in d.sc.
sqlDB.Exec(t, `BACKUP TABLE d.sc.* TO 'nodelocal://0/test2/'`)
// Create a new database to restore into.
sqlDB.Exec(t, `CREATE DATABASE d3`)
sqlDB.Exec(t, `RESTORE TABLE d.sc.* FROM 'nodelocal://0/test2/' WITH into_db = 'd3'`)

// Check that we can resolve all names through the user defined schema.
sqlDB.CheckQueryResults(t, `SELECT * FROM d3.sc.tb1`, [][]string{{"hello"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM d3.sc.tb2`, [][]string{{"1"}})
sqlDB.CheckQueryResults(t, `SELECT 'hello'::d3.sc.typ1`, [][]string{{"hello"}})

// We shouldn't be able to create a new schema with the same name.
sqlDB.ExpectErr(t, `pq: schema "sc" already exists`, `USE d3; CREATE SCHEMA sc`)
}
})

// Test restoring tables with user defined schemas when restore schemas are
// not being remapped. Like no-remap but with more databases and schemas.
t.Run("multi-schemas", func(t *testing.T) {
tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()
kvDB := tc.Server(0).DB()

sqlDB.Exec(t, `CREATE DATABASE d1;`)
sqlDB.Exec(t, `USE d1;`)
sqlDB.Exec(t, `CREATE SCHEMA sc1;`)
sqlDB.Exec(t, `CREATE TABLE sc1.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc1.tb VALUES (1);`)
sqlDB.Exec(t, `CREATE SCHEMA sc2;`)
sqlDB.Exec(t, `CREATE TABLE sc2.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc2.tb VALUES (2);`)

sqlDB.Exec(t, `CREATE DATABASE d2;`)
sqlDB.Exec(t, `USE d2;`)
sqlDB.Exec(t, `CREATE SCHEMA sc3;`)
sqlDB.Exec(t, `CREATE TABLE sc3.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc3.tb VALUES (3);`)
sqlDB.Exec(t, `CREATE SCHEMA sc4;`)
sqlDB.Exec(t, `CREATE TABLE sc4.tb (x INT);`)
sqlDB.Exec(t, `INSERT INTO sc4.tb VALUES (4);`)
{
// Backup all databases.
sqlDB.Exec(t, `BACKUP DATABASE d1, d2 TO 'nodelocal://0/test/'`)
// Create a new database to restore into. This restore should restore the
// schemas into the new database.
sqlDB.Exec(t, `CREATE DATABASE newdb`)
// Create a schema and table in the database to restore into, unrelated to
// the restore.
sqlDB.Exec(t, `USE newdb`)
sqlDB.Exec(t, `CREATE SCHEMA existingschema`)
sqlDB.Exec(t, `CREATE TABLE existingschema.tb (x INT)`)
sqlDB.Exec(t, `INSERT INTO existingschema.tb VALUES (0)`)

sqlDB.Exec(t, `RESTORE TABLE d1.sc1.*, d1.sc2.*, d2.sc3.*, d2.sc4.* FROM 'nodelocal://0/test/' WITH into_db = 'newdb'`)

// Check that we can resolve all names through the user defined schemas.
sqlDB.CheckQueryResults(t, `SELECT * FROM newdb.sc1.tb`, [][]string{{"1"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM newdb.sc2.tb`, [][]string{{"2"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM newdb.sc3.tb`, [][]string{{"3"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM newdb.sc4.tb`, [][]string{{"4"}})

// Check that name resolution still works for the preexisting schema.
sqlDB.CheckQueryResults(t, `SELECT * FROM newdb.existingschema.tb`, [][]string{{"0"}})
}

// Verify that the schemas are in the database's schema map.
dbDesc := desctestutils.TestingGetDatabaseDescriptor(kvDB, keys.SystemSQLCodec, "newdb")
require.Contains(t, dbDesc.DatabaseDesc().Schemas, "sc1")
require.Contains(t, dbDesc.DatabaseDesc().Schemas, "sc2")
require.Contains(t, dbDesc.DatabaseDesc().Schemas, "sc3")
require.Contains(t, dbDesc.DatabaseDesc().Schemas, "sc4")
require.Contains(t, dbDesc.DatabaseDesc().Schemas, "existingschema")
require.Len(t, dbDesc.DatabaseDesc().Schemas, 6)
})
// Test when we remap schemas to existing schemas in the cluster.
t.Run("remap", func(t *testing.T) {
_, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()

sqlDB.Exec(t, `CREATE DATABASE d;`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE SCHEMA sc;`)
sqlDB.Exec(t, `CREATE TYPE sc.typ1 AS ENUM ('hello');`)
sqlDB.Exec(t, `CREATE TABLE sc.tb1 (x sc.typ1);`)
sqlDB.Exec(t, `INSERT INTO sc.tb1 VALUES ('hello');`)
// Take a backup.
sqlDB.Exec(t, `BACKUP TABLE d.sc.tb1 TO 'nodelocal://0/test/'`)
// Now drop the table.
sqlDB.Exec(t, `DROP TABLE d.sc.tb1`)
// Restoring the table should restore into d.sc.
sqlDB.Exec(t, `RESTORE TABLE d.sc.tb1 FROM 'nodelocal://0/test/'`)
sqlDB.CheckQueryResults(t, `SELECT * FROM d.sc.tb1`, [][]string{{"hello"}})
})
}

func TestBackupRestoreUserDefinedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading

0 comments on commit 91e0f63

Please sign in to comment.