diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 3fd381b7ee02..650e52955b6f 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1870,285 +1870,6 @@ func TestBackupRestoreControlJob(t *testing.T) { }) } -func TestRestoreFailCleansUpTTLSchedules(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - injectedErr := errors.New("injected error") - testCases := []struct { - desc string - beforePublishingDescriptors func() error - afterPublishingDescriptors func() error - }{ - { - desc: "error before publishing descriptors", - beforePublishingDescriptors: func() error { - return injectedErr - }, - }, - { - desc: "error after publishing descriptors", - afterPublishingDescriptors: func() error { - return injectedErr - }, - }, - } - - ctx := context.Background() - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - c, sqlDB, _, cleanup := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication) - defer cleanup() - for _, server := range c.Servers { - registry := server.JobRegistry().(*jobs.Registry) - registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ - jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { - r := raw.(*restoreResumer) - r.testingKnobs.beforePublishingDescriptors = tc.beforePublishingDescriptors - r.testingKnobs.afterPublishingDescriptors = tc.afterPublishingDescriptors - return r - }, - } - } - - // Create a database with a TTL table. - sqlDB.Exec(t, ` -CREATE DATABASE d; -CREATE TABLE d.tb (id INT PRIMARY KEY) WITH (ttl_expire_after = '10 minutes') -`) - - // Backup d.tb. - sqlDB.Exec(t, `BACKUP DATABASE d TO $1`, localFoo) - - // Drop d so that it can be restored. - sqlDB.Exec(t, `DROP DATABASE d`) - - // Attempt the restore and check it fails. - _, err := sqlDB.DB.ExecContext(ctx, `RESTORE DATABASE d FROM $1`, localFoo) - require.Error(t, err) - require.Regexp(t, injectedErr.Error(), err.Error()) - - var count int - sqlDB.QueryRow(t, `SELECT count(1) FROM [SHOW SCHEDULES] WHERE label LIKE 'row-level-ttl-%'`).Scan(&count) - require.Equal(t, 0, count) - }) - } -} - -func TestRestoreFailCleansUpTypeBackReferences(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - _, sqlDB, dir, cleanup := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication) - defer cleanup() - - dir = dir + "/foo" - - // Create a database with a type and table. - sqlDB.Exec(t, ` -CREATE DATABASE d; -CREATE TYPE d.ty AS ENUM ('hello'); -CREATE TABLE d.tb (x d.ty); -INSERT INTO d.tb VALUES ('hello'), ('hello'); -`) - - // Backup d.tb. - sqlDB.Exec(t, `BACKUP TABLE d.tb TO $1`, localFoo) - - // Drop d.tb so that it can be restored. - sqlDB.Exec(t, `DROP TABLE d.tb`) - - // Bugger the backup by removing the SST files. - if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - - // We should get an error when restoring the table. - sqlDB.ExpectErr(t, "sst: no such file", `RESTORE d.tb FROM $1`, localFoo) - - // The failed restore should clean up type back references so that we are able - // to drop d.ty. - sqlDB.Exec(t, `DROP TYPE d.ty`) -} - -// TestRestoreFailCleanup tests that a failed RESTORE is cleaned up. 
-func TestRestoreFailCleanup(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - params := base.TestServerArgs{} - // Disable GC job so that the final check of crdb_internal.tables is - // guaranteed to not be cleaned up. Although this was never observed by a - // stress test, it is here for safety. - blockGC := make(chan struct{}) - params.Knobs.GCJob = &sql.GCJobTestingKnobs{ - RunBeforeResume: func(_ jobspb.JobID) error { - <-blockGC - return nil - }, - } - - const numAccounts = 1000 - tc, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, - InitManualReplication, base.TestClusterArgs{ServerArgs: params}) - defer cleanup() - kvDB := tc.Server(0).DB() - - dir = dir + "/foo" - - sqlDB.Exec(t, `CREATE DATABASE restore`) - - // Create a user defined type and check that it is cleaned up after the - // failed restore. - sqlDB.Exec(t, `CREATE TYPE data.myenum AS ENUM ('hello')`) - // Do the same with a user defined schema. - sqlDB.Exec(t, `USE data; CREATE SCHEMA myschema`) - - sqlDB.Exec(t, `BACKUP DATABASE data TO $1`, localFoo) - // Bugger the backup by removing the SST files. - if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - sqlDB.ExpectErr( - t, "sst: no such file", - `RESTORE data.* FROM $1 WITH OPTIONS (into_db='restore')`, localFoo, - ) - // Verify the failed RESTORE added some DROP tables. - sqlDB.CheckQueryResults(t, - `SELECT name FROM crdb_internal.tables WHERE database_name = 'restore' AND state = 'DROP'`, - [][]string{{"bank"}}, - ) - - // Verify that `myenum` was cleaned out from the failed restore. There should - // only be one namespace entry (data.myenum). - sqlDB.CheckQueryResults(t, `SELECT count(*) FROM system.namespace WHERE name = 'myenum'`, [][]string{{"1"}}) - // Check the same for data.myschema. - sqlDB.CheckQueryResults(t, `SELECT count(*) FROM system.namespace WHERE name = 'myschema'`, [][]string{{"1"}}) - - // Verify that the only schema that appears is the public schema - dbDesc := desctestutils.TestingGetDatabaseDescriptor(kvDB, keys.SystemSQLCodec, "restore") - require.Equal(t, len(dbDesc.DatabaseDesc().Schemas), 1) - if _, found := dbDesc.DatabaseDesc().Schemas[tree.PublicSchema]; !found { - t.Error("public schema not found") - } -} - -// TestRestoreFailDatabaseCleanup tests that a failed RESTORE is cleaned up -// when restoring an entire database. -func TestRestoreFailDatabaseCleanup(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - params := base.TestServerArgs{} - const numAccounts = 1000 - _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, - InitManualReplication, base.TestClusterArgs{ServerArgs: params}) - defer cleanup() - - dir = dir + "/foo" - - // Create a user defined type and check that it is cleaned up after the - // failed restore. - sqlDB.Exec(t, `CREATE TYPE data.myenum AS ENUM ('hello')`) - // Do the same with a user defined schema. - sqlDB.Exec(t, `USE data; CREATE SCHEMA myschema`) - - sqlDB.Exec(t, `BACKUP DATABASE data TO $1`, localFoo) - // Bugger the backup by removing the SST files. 
- if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - sqlDB.Exec(t, `DROP DATABASE data`) - sqlDB.ExpectErr( - t, "sst: no such file", - `RESTORE DATABASE data FROM $1`, localFoo, - ) - sqlDB.ExpectErr( - t, `database "data" does not exist`, - `DROP DATABASE data`, - ) -} - -func TestRestoreFailCleansUpTempSystemDatabase(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - _, sqlDB, dir, cleanup := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication) - defer cleanup() - - // Create a database with a type and table. - sqlDB.Exec(t, ` -CREATE DATABASE d; -CREATE TYPE d.ty AS ENUM ('hello'); -CREATE TABLE d.tb (x d.ty); -INSERT INTO d.tb VALUES ('hello'), ('hello'); -`) - - // Cluster BACKUP. - sqlDB.Exec(t, `BACKUP TO $1`, localFoo) - - // Bugger the backup by removing the SST files. - if err := filepath.Walk(dir+"/foo", func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - - _, sqlDBRestore, cleanupRestore := backupRestoreTestSetupEmpty(t, singleNode, dir, InitManualReplication, - base.TestClusterArgs{}) - defer cleanupRestore() - var defaultDBID int - sqlDBRestore.QueryRow( - t, "SELECT id FROM system.namespace WHERE name = 'defaultdb'", - ).Scan(&defaultDBID) - - // We should get an error when restoring the table. - sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM $1`, localFoo) - - // Make sure the temp system db is not present. - row := sqlDBRestore.QueryStr(t, fmt.Sprintf(`SELECT * FROM [SHOW DATABASES] WHERE database_name = '%s'`, restoreTempSystemDB)) - require.Equal(t, 0, len(row)) - - // Make sure defaultdb and postgres are recreated with new IDs. 
-	sqlDBRestore.CheckQueryResults(t,
-		fmt.Sprintf(`
-SELECT name
-  FROM system.namespace
- WHERE "parentID" = 0 AND id > %d`, defaultDBID,
-		), [][]string{{"defaultdb"}, {"postgres"}})
-}
-
 func TestBackupRestoreUserDefinedSchemas(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go
index 5d07b9954df4..757b6080b930 100644
--- a/pkg/ccl/backupccl/datadriven_test.go
+++ b/pkg/ccl/backupccl/datadriven_test.go
@@ -19,11 +19,13 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -68,18 +70,22 @@ type sqlDBKey struct {
 }
 
 type datadrivenTestState struct {
-	servers      map[string]serverutils.TestServerInterface
-	dataDirs     map[string]string
-	sqlDBs       map[sqlDBKey]*gosql.DB
-	noticeBuffer []string
-	cleanupFns   []func()
+	servers           map[string]serverutils.TestServerInterface
+	dataDirs          map[string]string
+	sqlDBs            map[sqlDBKey]*gosql.DB
+	jobTags           map[string]jobspb.JobID
+	clusterTimestamps map[string]string
+	noticeBuffer      []string
+	cleanupFns        []func()
 }
 
 func newDatadrivenTestState() datadrivenTestState {
 	return datadrivenTestState{
-		servers:  make(map[string]serverutils.TestServerInterface),
-		dataDirs: make(map[string]string),
-		sqlDBs:   make(map[sqlDBKey]*gosql.DB),
+		servers:           make(map[string]serverutils.TestServerInterface),
+		dataDirs:          make(map[string]string),
+		sqlDBs:            make(map[sqlDBKey]*gosql.DB),
+		jobTags:           make(map[string]jobspb.JobID),
+		clusterTimestamps: make(map[string]string),
 	}
 }
 
@@ -96,16 +102,21 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) {
 	d.noticeBuffer = nil
 }
 
-func (d *datadrivenTestState) addServer(
-	t *testing.T,
-	name, iodir, tempCleanupFrequency string,
-	ioConf base.ExternalIODirConfig,
-	localities string,
-) error {
+type serverCfg struct {
+	name                 string
+	iodir                string
+	tempCleanupFrequency string
+	nodes                int
+	splits               int
+	ioConf               base.ExternalIODirConfig
+	localities           string
+}
+
+func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
 	var tc serverutils.TestClusterInterface
 	var cleanup func()
 	params := base.TestClusterArgs{}
-	params.ServerArgs.ExternalIODirConfig = ioConf
+	params.ServerArgs.ExternalIODirConfig = cfg.ioConf
 	params.ServerArgs.Knobs = base.TestingKnobs{
 		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
 	}
@@ -113,8 +124,8 @@ func (d *datadrivenTestState) addServer(
 	settings := cluster.MakeTestingClusterSettings()
 	closedts.TargetDuration.Override(context.Background(), &settings.SV, 10*time.Millisecond)
 	closedts.SideTransportCloseInterval.Override(context.Background(), &settings.SV, 10*time.Millisecond)
-	if tempCleanupFrequency != "" {
-		duration, err := time.ParseDuration(tempCleanupFrequency)
+	if cfg.tempCleanupFrequency != "" {
+		duration, err := time.ParseDuration(cfg.tempCleanupFrequency)
 		if err != nil {
 			return errors.New("unable to parse tempCleanupFrequency during server creation")
 		}
@@ -123,10 +134,10 @@ func (d *datadrivenTestState) addServer(
 	}
 	params.ServerArgs.Settings = settings
 
-	clusterSize := singleNode
+	clusterSize := cfg.nodes
 
-	if localities != "" {
-		cfgs := strings.Split(localities, ",")
+	if cfg.localities != "" {
+		cfgs := strings.Split(cfg.localities, ",")
 		clusterSize = len(cfgs)
 		serverArgsPerNode := make(map[int]base.TestServerArgs)
 		for i, cfg := range cfgs {
@@ -136,13 +147,15 @@ func (d *datadrivenTestState) addServer(
 		}
 		params.ServerArgsPerNode = serverArgsPerNode
 	}
-	if iodir == "" {
-		tc, _, iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, 0, InitManualReplication, params)
+	if cfg.iodir == "" {
+		tc, _, cfg.iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, cfg.splits,
+			InitManualReplication, params)
 	} else {
-		tc, _, cleanup = backupRestoreTestSetupEmptyWithParams(t, clusterSize, iodir, InitManualReplication, params)
+		tc, _, cleanup = backupRestoreTestSetupEmptyWithParams(t, clusterSize, cfg.iodir,
+			InitManualReplication, params)
 	}
-	d.servers[name] = tc.Server(0)
-	d.dataDirs[name] = iodir
+	d.servers[cfg.name] = tc.Server(0)
+	d.dataDirs[cfg.name] = cfg.iodir
 	d.cleanupFns = append(d.cleanupFns, cleanup)
 
 	return nil
@@ -187,26 +200,97 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
 // commands. The test files are in testdata/backup-restore. The following
 // syntax is provided:
 //
-// - "new-server name=<name> [share-io-dir=<server-name>]"
-//   Create a new server with the input name. It takes in an optional
-//   share-io-dir argument to share an IO directory with an existing server.
-//   This is useful when restoring from a backup taken in another server.
+// - "new-server name=<name> [args]"
+//   Create a new server with the input name.
+//
+//   Supported arguments:
+//
+//   + share-io-dir: can be specified to share an IO directory with an existing
+//   server. This is useful when restoring from a backup taken in another
+//   server.
+//
+//   + allow-implicit-access: can be specified to set
+//   `EnableNonAdminImplicitAndArbitraryOutbound` to true
+//
+//   + disable-http: disables use of external HTTP endpoints.
+//
+//   + temp-cleanup-freq: specifies the frequency with which the temporary table
+//   cleanup reconciliation job runs
 //
-// - "exec-sql server=<name>"
+//   + localities: specifies the localities that will be used when starting up
+//   the test cluster. The cluster will have len(localities) nodes, with each
+//   node assigned a locality config corresponding to the locality. Please
+//   update the `localityCfgs` map when adding new localities.
+//
+//   + nodes: specifies the number of nodes in the test cluster.
+//
+//   + splits: specifies the number of ranges the bank table is split into.
+//
+// - "exec-sql [server=<name>] [user=<name>] [args]"
 //   Executes the input SQL query on the target server. By default, server is
 //   the last created server.
 //
-//   Supported options:
+//   Supported arguments:
+//
+//   + expect-error-ignore: expects the query to return an error, but we will
+//   ignore it.
 //
-// - "query-sql server=<name>"
+// - "query-sql [server=<name>] [user=<name>]"
 //   Executes the input SQL query and print the results.
 //
 // - "reset"
 //   Clear all state associated with the test.
+//
+// - "job" [server=<name>] [user=<name>] [args]
+//   Executes job specific operations.
+//
+//   Supported arguments:
+//
+//   + cancel=<tag>: cancels the job referenced by the tag and waits for it to
+//   reach a CANCELED state.
+//
+// - "save-cluster-ts" tag=<tag>
+//   Saves the `SELECT cluster_logical_timestamp()` with the tag. Can be used
+//   in the future with instructions such as `aost`.
+//
+// - "backup" [args]
+//   Executes backup specific operations.
+//
+//   Supported arguments:
+//
+//   + tag=<tag>: tag the backup job to reference it in the future.
+//
+//   + expect-pausepoint: expects the backup job to end up in a paused state because
+//   of a pausepoint error.
+//
+// - "restore" [args]
+//   Executes restore specific operations.
+//
+//   Supported arguments:
+//
+//   + tag=<tag>: tag the restore job to reference it in the future.
+//
+//   + expect-pausepoint: expects the restore job to end up in a paused state because
+//   of a pausepoint error.
+//
+//   + aost: expects a tag referencing a previously saved cluster timestamp
+//   using `save-cluster-ts`. It then runs the restore as of the saved cluster
+//   timestamp.
+//
+// - "schema-change" [args]
+//   Executes schema change specific operations.
+//
+//   Supported arguments:
+//
+//   + tag=<tag>: tag the schema change job to reference it in the future.
+//
+//   + expect-pausepoint: expects the schema change job to end up in a paused state because
+//   of a pausepoint error.
 func TestDataDriven(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	skip.UnderRace(t, "takes >1 min under race")
+
+	skip.UnderRace(t, "takes ~3mins to run")
 
 	// This test uses this mock HTTP server to pass the backup files between tenants.
 	httpAddr, httpServerCleanup := makeInsecureHTTPServer(t)
@@ -218,6 +302,7 @@ func TestDataDriven(t *testing.T) {
 			ds := newDatadrivenTestState()
 			defer ds.cleanup(ctx)
 			datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+
 				switch d.Cmd {
 				case "sleep":
 					var sleepDuration string
@@ -228,12 +313,16 @@
 					}
 					time.Sleep(duration)
 					return ""
+
 				case "reset":
 					ds.cleanup(ctx)
 					ds = newDatadrivenTestState()
 					return ""
+
 				case "new-server":
 					var name, shareDirWith, iodir, tempCleanupFrequency, localities string
+					var splits int
+					nodes := singleNode
 					var io base.ExternalIODirConfig
 					d.ScanArgs(t, "name", &name)
 					if d.HasArg("share-io-dir") {
@@ -254,12 +343,28 @@
 					if d.HasArg("localities") {
 						d.ScanArgs(t, "localities", &localities)
 					}
+					if d.HasArg("nodes") {
+						d.ScanArgs(t, "nodes", &nodes)
+					}
+					if d.HasArg("splits") {
+						d.ScanArgs(t, "splits", &splits)
+					}
 					lastCreatedServer = name
-					err := ds.addServer(t, name, iodir, tempCleanupFrequency, io, localities)
+					cfg := serverCfg{
+						name:                 name,
+						iodir:                iodir,
+						tempCleanupFrequency: tempCleanupFrequency,
+						nodes:                nodes,
+						splits:               splits,
+						ioConf:               io,
+						localities:           localities,
+					}
+					err := ds.addServer(t, cfg)
 					if err != nil {
 						return err.Error()
 					}
 					return ""
+
 				case "exec-sql":
 					server := lastCreatedServer
 					user := "root"
@@ -273,13 +378,36 @@
 					d.Input = strings.ReplaceAll(d.Input, "http://COCKROACH_TEST_HTTP_SERVER/", httpAddr)
 					_, err := ds.getSQLDB(t, server, user).Exec(d.Input)
 					ret := ds.noticeBuffer
+
+					// Check if we are expecting a pausepoint error.
+					if d.HasArg("expect-pausepoint") {
+						require.NotNilf(t, err, "expected pause point error")
+						require.True(t, strings.Contains(err.Error(), "job requested it be paused"))
+
+						// Find job ID of the pausepoint job.
+						var jobID jobspb.JobID
+						require.NoError(t,
+							ds.getSQLDB(t, server, user).QueryRow(
+								`SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1`).Scan(&jobID))
+						fmt.Printf("expecting pausepoint, found job ID %d\n\n\n", jobID)
+
+						runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user))
+						jobutils.WaitForJobToPause(t, runner, jobID)
+						ret = append(ds.noticeBuffer, "job paused at pausepoint")
+						ret = append(ret, "")
+						return strings.Join(ret, "\n")
+					}
+
+					// Check if we are expecting an error, and want to ignore outputting it.
+					if d.HasArg("expect-error-ignore") {
+						require.NotNilf(t, err, "expected error")
+						ret = append(ret, "ignoring expected error")
+						return strings.Join(ret, "\n")
+					}
+
+					// Check for other errors.
 					if err != nil {
-						// pausepoint errors have the job ID in them, and datadriven tests
-						// don't seem to support regex matching. Clean the error up to not
-						// include the job ID.
-						if i := strings.Index(err.Error(), "paused before it completed with reason"); i != -1 {
-							ret = append(ds.noticeBuffer, err.Error()[i:])
-						} else if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
+						if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
 							ret = append(ds.noticeBuffer, err.Error())
 							if pqErr.Detail != "" {
 								ret = append(ret, "DETAIL: "+pqErr.Detail)
@@ -292,6 +420,7 @@
 						}
 					}
 					return strings.Join(ret, "\n")
+
 				case "query-sql":
 					server := lastCreatedServer
 					user := "root"
@@ -308,9 +437,180 @@
 					output, err := sqlutils.RowsToDataDrivenOutput(rows)
 					require.NoError(t, err)
 					return output
+
+				case "backup":
+					server := lastCreatedServer
+					user := "root"
+					jobType := "BACKUP"
+
+					// First, run the backup.
+					_, err := ds.getSQLDB(t, server, user).Exec(d.Input)
+
+					// Tag the job.
+					if d.HasArg("tag") {
+						tagJob(t, server, user, jobType, ds, d)
+					}
+
+					// Check if we expect a pausepoint error.
+					if d.HasArg("expect-pausepoint") {
+						expectPausepoint(t, err, jobType, server, user, ds)
+						ret := append(ds.noticeBuffer, "job paused at pausepoint")
+						return strings.Join(ret, "\n")
+					}
+
+					// All other errors are bad.
+					require.NoError(t, err)
+					return ""
+
+				case "restore":
+					server := lastCreatedServer
+					user := "root"
+					jobType := "RESTORE"
+
+					if d.HasArg("aost") {
+						var aost string
+						d.ScanArgs(t, "aost", &aost)
+						var ts string
+						var ok bool
+						if ts, ok = ds.clusterTimestamps[aost]; !ok {
+							t.Fatalf("no cluster timestamp found for %s", aost)
+						}
+
+						// Replace the ts tag with the actual timestamp.
+						d.Input = strings.Replace(d.Input, aost,
+							fmt.Sprintf("'%s'", ts), 1)
+					}
+
+					// First, run the restore.
+					_, err := ds.getSQLDB(t, server, user).Exec(d.Input)
+
+					// Tag the job.
+					if d.HasArg("tag") {
+						tagJob(t, server, user, jobType, ds, d)
+					}
+
+					// Check if the job must be run aost.
+					if d.HasArg("aost") {
+						var aost string
+						d.ScanArgs(t, "aost", &aost)
+					}
+
+					// Check if we expect a pausepoint error.
+					if d.HasArg("expect-pausepoint") {
+						expectPausepoint(t, err, jobType, server, user, ds)
+						ret := append(ds.noticeBuffer, "job paused at pausepoint")
+						return strings.Join(ret, "\n")
+					}
+
+					// All other errors are bad.
+					require.NoError(t, err)
+					return ""
+
+				case "schema-change":
+					server := lastCreatedServer
+					user := "root"
+					jobType := "SCHEMA CHANGE"
+
+					// First, run the schema change.
+					_, err := ds.getSQLDB(t, server, user).Exec(d.Input)
+
+					// Tag the job.
+					if d.HasArg("tag") {
+						tagJob(t, server, user, jobType, ds, d)
+					}
+
+					// Check if the job must be run aost.
+					if d.HasArg("aost") {
+						var aost string
+						d.ScanArgs(t, "aost", &aost)
+					}
+
+					// Check if we expect a pausepoint error.
+					if d.HasArg("expect-pausepoint") {
+						expectPausepoint(t, err, jobType, server, user, ds)
+						ret := append(ds.noticeBuffer, "job paused at pausepoint")
+						return strings.Join(ret, "\n")
+					}
+
+					// All other errors are bad.
+					require.NoError(t, err)
+					return ""
+
+				case "job":
+					server := lastCreatedServer
+					user := "root"
+
+					if d.HasArg("cancel") {
+						var cancelJobTag string
+						d.ScanArgs(t, "cancel", &cancelJobTag)
+						var jobID jobspb.JobID
+						var ok bool
+						if jobID, ok = ds.jobTags[cancelJobTag]; !ok {
+							t.Fatalf("could not find job with tag %s", cancelJobTag)
+						}
+						runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user))
+						runner.Exec(t, `CANCEL JOB $1`, jobID)
+						jobutils.WaitForJobToCancel(t, runner, jobID)
+					}
+					return ""
+
+				case "save-cluster-ts":
+					server := lastCreatedServer
+					user := "root"
+					var timestampTag string
+					d.ScanArgs(t, "tag", &timestampTag)
+					if _, ok := ds.clusterTimestamps[timestampTag]; ok {
+						t.Fatalf("cannot reuse cluster ts tag %s", timestampTag)
+					}
+					var ts string
+					err := ds.getSQLDB(t, server, user).QueryRow(`SELECT cluster_logical_timestamp()`).Scan(&ts)
+					require.NoError(t, err)
+					ds.clusterTimestamps[timestampTag] = ts
+					return ""
+
 				default:
 					return fmt.Sprintf("unknown command: %s", d.Cmd)
 				}
 			})
 	})
 }
+
+// findMostRecentJobWithType returns the most recently created job of the given
+// jobType.
+func findMostRecentJobWithType(
+	t *testing.T, ds datadrivenTestState, server, user string, jobType string,
+) jobspb.JobID {
+	var jobID jobspb.JobID
+	require.NoError(
+		t, ds.getSQLDB(t, server, user).QueryRow(
+			fmt.Sprintf(
+				`SELECT job_id FROM [SHOW JOBS] WHERE job_type = '%s' ORDER BY created DESC LIMIT 1`,
+				jobType)).Scan(&jobID))
+	return jobID
+}
+
+// expectPausepoint waits for the job to hit a pausepoint and enter a paused
+// state.
+func expectPausepoint(
+	t *testing.T, err error, jobType, server, user string, ds datadrivenTestState,
+) {
+	// Check if we are expecting a pausepoint error.
+	require.NotNilf(t, err, "expected pause point error")
+
+	runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user))
+	jobutils.WaitForJobToPause(t, runner,
+		findMostRecentJobWithType(t, ds, server, user, jobType))
+}
+
+// tagJob stores the jobID of the most recent job of `jobType`. Users can use
+// the tag to refer to the job in the future.
+func tagJob(
+	t *testing.T, server, user, jobType string, ds datadrivenTestState, d *datadriven.TestData,
+) {
+	var jobTag string
+	d.ScanArgs(t, "tag", &jobTag)
+	if _, exists := ds.jobTags[jobTag]; exists {
+		t.Fatalf("failed to `tag`, job with tag %s already exists", jobTag)
+	}
+	ds.jobTags[jobTag] = findMostRecentJobWithType(t, ds, server, user, jobType)
+}
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index aff04579bebd..ba24eb7271a8 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -1356,6 +1356,10 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
 	}
 	p.ExecCfg().JobRegistry.NotifyToAdoptJobs()
 
+	if err := p.ExecCfg().JobRegistry.CheckPausepoint(
+		"restore.after_publishing_descriptors"); err != nil {
+		return err
+	}
 	if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil {
 		if err := fn(); err != nil {
 			return err
@@ -1492,6 +1496,10 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
 		}
 	}
 
+	if err := p.ExecCfg().JobRegistry.CheckPausepoint(
+		"restore.after_publishing_descriptors"); err != nil {
+		return err
+	}
 	if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil {
 		if err := fn(); err != nil {
 			return err
@@ -1723,6 +1731,11 @@ func (r *restoreResumer) publishDescriptors(
 	if details.DescriptorsPublished {
 		return nil
 	}
+
+	if err := execCfg.JobRegistry.CheckPausepoint("restore.before_publishing_descriptors"); err != nil {
+		return err
+	}
+
 	if fn := r.testingKnobs.beforePublishingDescriptors; fn != nil {
 		if err := fn(); err != nil {
 			return err
@@ -1941,6 +1954,7 @@ func emitRestoreJobEvent(
 func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
 	p := execCtx.(sql.JobExecContext)
 	r.execCfg = p.ExecCfg()
+
 	// Emit to the event log that the job has started reverting.
 	emitRestoreJobEvent(ctx, p, jobs.StatusReverting, r.job)
 
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors b/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors
index 749c2dcf773f..55d8edce17fe 100644
--- a/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors
+++ b/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors
@@ -1,4 +1,4 @@
-# backup-dropped-desctiprors tests backup and restore interaction with database, schema
+# backup-dropped-descriptors tests backup and restore interaction with database, schema
 # and type descriptors in the DROP state.
 
 subtest dropped-database-descriptors
@@ -10,9 +10,12 @@ SET use_declarative_schema_changer = 'off';
 SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';
 CREATE DATABASE d;
 CREATE TABLE d.foo (id INT);
+----
+
+schema-change expect-pausepoint
 DROP DATABASE d CASCADE;
 ----
-paused before it completed with reason: pause point "schemachanger.before.exec" hit
+job paused at pausepoint
 
 # At this point, we have a descriptor entry for `d` in a DROP state.
 query-sql
@@ -118,16 +121,19 @@ SET use_declarative_schema_changer = 'off';
 SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';
 ----
 
-exec-sql
+schema-change expect-pausepoint
 DROP SCHEMA d2.s CASCADE;
 ----
-paused before it completed with reason: pause point "schemachanger.before.exec" hit
+job paused at pausepoint
 
 exec-sql
 SET CLUSTER SETTING jobs.debug.pausepoints = 'typeschemachanger.before.exec';
+----
+
+schema-change expect-pausepoint
 DROP TYPE d2.typ;
 ----
-paused before it completed with reason: pause point "typeschemachanger.before.exec" hit
+job paused at pausepoint
 
 query-sql
 WITH tbls AS (
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-schema-objects b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-schema-objects
new file mode 100644
index 000000000000..40f1ad2b4404
--- /dev/null
+++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-schema-objects
@@ -0,0 +1,187 @@
+new-server name=s1 nodes=1 splits=1000
+----
+
+subtest restore-cleanup
+
+# Pause the GC job before it resumes so that the dropped tables checked at the
+# end via crdb_internal.tables are guaranteed to still be present. Although this
+# was never observed by a stress test, it is here for safety.
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'gcjob.before_resume';
+----
+
+# Create a user defined type and check that it is cleaned up after the
+# failed restore.
+exec-sql
+CREATE DATABASE restore;
+CREATE TYPE data.myenum AS ENUM ('hello');
+----
+
+# Do the same with a user defined schema.
+exec-sql
+USE data;
+CREATE SCHEMA myschema;
+----
+
+exec-sql
+BACKUP DATABASE data INTO 'nodelocal://1/foo';
+----
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.after_publishing_descriptors';
+----
+
+restore expect-pausepoint tag=a
+RESTORE data.* FROM LATEST IN 'nodelocal://1/foo' WITH into_db = 'restore';
+----
+job paused at pausepoint
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=a
+----
+
+# Verify the cancelled RESTORE added some DROP tables.
+query-sql
+SELECT name FROM crdb_internal.tables WHERE database_name = 'restore' AND state = 'DROP'
+----
+bank
+
+# Verify that `myenum` was cleaned out from the failed restore. There should
+# only be one namespace entry (data.myenum).
+query-sql
+SELECT count(*) FROM system.namespace WHERE name = 'myenum';
+----
+1
+
+# Check the same for data.myschema.
+query-sql
+SELECT count(*) FROM system.namespace WHERE name = 'myschema';
+----
+1
+
+# Verify that the only schema that appears is the public schema.
+query-sql
+USE restore;
+SHOW SCHEMAS;
+----
+crdb_internal
+information_schema
+pg_catalog
+pg_extension
+public admin
+
+# Ensure that a database-level restore also cleans up after itself. We check
+# this by verifying that no descriptor is left behind for database `data`:
+# dropping it again returns a "does not exist" error.
+exec-sql
+DROP DATABASE data;
+----
+
+restore expect-pausepoint tag=b
+RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/foo';
+----
+job paused at pausepoint
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=b
+----
+
+# Ensure there is no database left behind.
+exec-sql
+DROP DATABASE data;
+----
+pq: database "data" does not exist
+
+subtest end
+
+subtest restore-cleanup-type-back-references
+
+exec-sql
+CREATE DATABASE d;
+CREATE TYPE d.ty AS ENUM ('hello');
+CREATE TABLE d.tb (x d.ty);
+INSERT INTO d.tb VALUES ('hello'), ('hello');
+----
+
+exec-sql
+BACKUP TABLE d.tb INTO 'nodelocal://1/bar';
+----
+
+# Drop d.tb so that it can be restored.
+exec-sql
+DROP TABLE d.tb;
+----
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.after_publishing_descriptors';
+----
+
+restore expect-pausepoint tag=c
+RESTORE d.tb FROM LATEST IN 'nodelocal://1/bar';
+----
+job paused at pausepoint
+
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=c
+----
+
+# The failed restore should clean up type back references so that we are able to
+# drop d.ty.
+exec-sql
+DROP TYPE d.ty;
+----
+
+subtest end
+
+subtest restore-cleanup-temp-system-database
+
+exec-sql
+CREATE DATABASE d2;
+CREATE TYPE d2.ty AS ENUM ('hello');
+CREATE TABLE d2.tb (x d2.ty);
+INSERT INTO d2.tb VALUES ('hello'), ('hello');
+----
+
+exec-sql
+BACKUP INTO 'nodelocal://1/cluster';
+----
+
+# Start a new cluster with the same IO dir.
+new-server name=s2 share-io-dir=s1
+----
+
+# We pause the job before publishing descriptors to ensure that we are testing
+# OnFailOrCancel's temporary system db cleanup logic. The restore Resume method
+# performs this cleanup after publishing descriptors anyway.
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_publishing_descriptors';
+----
+
+# Switch the connection to the system database before running the cluster restore.
+exec-sql
+USE system;
+----
+
+restore expect-pausepoint tag=d
+RESTORE FROM LATEST IN 'nodelocal://1/cluster';
+----
+job paused at pausepoint
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=d
+----
+
+# Make sure the temp system db is not present.
+query-sql
+SELECT * FROM [SHOW DATABASES] WHERE database_name = 'crdb_temp_system';
+----
+
+# Make sure defaultdb and postgres are recreated with new IDs.
+query-sql
+SELECT name FROM system.namespace WHERE "parentID" = 0 AND "id" > 100;
+----
+defaultdb
+postgres
+
+subtest end
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-ttl b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-ttl
new file mode 100644
index 000000000000..531697252010
--- /dev/null
+++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-ttl
@@ -0,0 +1,73 @@
+# This test ensures that a failed or cancelled restore cleans up the TTL schedule
+# that is created when restoring a table with a row-level TTL configuration.
+
+new-server name=s1 nodes=1
+----
+
+subtest restore-fail-cleans-up-ttl-schedules-before-publishing
+
+exec-sql
+CREATE DATABASE d;
+CREATE TABLE d.tb (id INT PRIMARY KEY) WITH (ttl_expire_after = '10 minutes');
+----
+
+exec-sql
+BACKUP DATABASE d INTO 'userfile:///foo'
+----
+
+# Attempt the restore but pause it before publishing descriptors.
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_publishing_descriptors';
+DROP DATABASE d;
+----
+
+restore expect-pausepoint tag=a
+RESTORE DATABASE d FROM LATEST IN 'userfile:///foo';
+----
+job paused at pausepoint
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=a
+----
+
+query-sql
+SELECT count(1) FROM [SHOW SCHEDULES] WHERE label LIKE 'row-level-ttl-%';
+----
+0
+
+subtest end
+
+
+subtest restore-fail-cleans-up-ttl-schedules-after-publishing
+
+exec-sql
+CREATE DATABASE d;
+CREATE TABLE d.tb (id INT PRIMARY KEY) WITH (ttl_expire_after = '10 minutes');
+----
+
+exec-sql
+BACKUP DATABASE d INTO 'userfile:///foo'
+----
+
+# Attempt the restore but pause it after publishing descriptors.
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.after_publishing_descriptors';
+DROP DATABASE d;
+----
+
+restore expect-pausepoint tag=b
+RESTORE DATABASE d FROM LATEST IN 'userfile:///foo';
+----
+job paused at pausepoint
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=b
+----
+
+query-sql
+SELECT count(1) FROM [SHOW SCHEDULES] WHERE label LIKE 'row-level-ttl-%';
+----
+0
+
+subtest end
diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go
index 545d4e121a26..4f40ac9b6aa3 100644
--- a/pkg/sql/gcjob/gc_job.go
+++ b/pkg/sql/gcjob/gc_job.go
@@ -173,6 +173,11 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
 	p := execCtx.(sql.JobExecContext)
 	// TODO(pbardea): Wait for no versions.
 	execCfg := p.ExecCfg()
+
+	if err := p.ExecCfg().JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil {
+		return err
+	}
+
 	if fn := execCfg.GCJobTestingKnobs.RunBeforeResume; fn != nil {
 		if err := fn(r.jobID); err != nil {
 			return err
diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go
index 02582b17034e..1f1463fed5ed 100644
--- a/pkg/testutils/jobutils/jobs_verification.go
+++ b/pkg/testutils/jobutils/jobs_verification.go
@@ -46,7 +46,7 @@ func WaitForJobToPause(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID)
 	waitForJobToHaveStatus(t, db, jobID, jobs.StatusPaused)
 }
 
-// WaitForJobToCancel waits for the specified job ID to be cancelled.
+// WaitForJobToCancel waits for the specified job ID to be in a cancelled state.
 func WaitForJobToCancel(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
 	t.Helper()
 	waitForJobToHaveStatus(t, db, jobID, jobs.StatusCanceled)