From 1ba873ac0e2c32525ece5c479713d5b02879ec89 Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Thu, 24 Mar 2022 21:29:27 +0530
Subject: [PATCH] backupccl: datadrivenify the restore OnFailOrCancel tests

This change ports all the "cleanup on failed or canceled restore" tests to
the datadriven framework. To make this possible, new instructions have been
added to the datadriven framework; please refer to the header comment for
details about each one.

Informs: #77129

Release note: None
---
 pkg/ccl/backupccl/backup_test.go | 279 -------------
 pkg/ccl/backupccl/datadriven_test.go | 380 ++++++++++++++++--
 pkg/ccl/backupccl/restore_job.go | 14 +
 .../backup-restore/backup-dropped-descriptors | 16 +-
 .../restore-on-fail-or-cancel-schema-objects | 187 +++++++++
 .../restore-on-fail-or-cancel-ttl | 73 ++++
 pkg/sql/gcjob/gc_job.go | 5 +
 pkg/testutils/jobutils/jobs_verification.go | 2 +-
 8 files changed, 631 insertions(+), 325 deletions(-)
 create mode 100644 pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-schema-objects
 create mode 100644 pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-ttl

diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index 3fd381b7ee02..650e52955b6f 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -1870,285 +1870,6 @@ func TestBackupRestoreControlJob(t *testing.T) {
 	})
 }
 
-func TestRestoreFailCleansUpTTLSchedules(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	injectedErr := errors.New("injected error")
-	testCases := []struct {
-		desc                        string
-		beforePublishingDescriptors func() error
-		afterPublishingDescriptors  func() error
-	}{
-		{
-			desc: "error before publishing descriptors",
-			beforePublishingDescriptors: func() error {
-				return injectedErr
-			},
-		},
-		{
-			desc: "error after publishing descriptors",
-			afterPublishingDescriptors: func() error {
-				return injectedErr
-			},
-		},
-	}
-
-	ctx := context.Background()
-
-	for _, tc := range testCases {
-		t.Run(tc.desc, func(t *testing.T) {
-			c, sqlDB, _, cleanup := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
-			defer cleanup()
-			for _, server := range c.Servers {
-				registry := server.JobRegistry().(*jobs.Registry)
-				registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
-					jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
-						r := raw.(*restoreResumer)
-						r.testingKnobs.beforePublishingDescriptors = tc.beforePublishingDescriptors
-						r.testingKnobs.afterPublishingDescriptors = tc.afterPublishingDescriptors
-						return r
-					},
-				}
-			}
-
-			// Create a database with a TTL table.
-			sqlDB.Exec(t, `
-CREATE DATABASE d;
-CREATE TABLE d.tb (id INT PRIMARY KEY) WITH (ttl_expire_after = '10 minutes')
-`)
-
-			// Backup d.tb.
-			sqlDB.Exec(t, `BACKUP DATABASE d TO $1`, localFoo)
-
-			// Drop d so that it can be restored.
-			sqlDB.Exec(t, `DROP DATABASE d`)
-
-			// Attempt the restore and check it fails.
- _, err := sqlDB.DB.ExecContext(ctx, `RESTORE DATABASE d FROM $1`, localFoo) - require.Error(t, err) - require.Regexp(t, injectedErr.Error(), err.Error()) - - var count int - sqlDB.QueryRow(t, `SELECT count(1) FROM [SHOW SCHEDULES] WHERE label LIKE 'row-level-ttl-%'`).Scan(&count) - require.Equal(t, 0, count) - }) - } -} - -func TestRestoreFailCleansUpTypeBackReferences(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - _, sqlDB, dir, cleanup := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication) - defer cleanup() - - dir = dir + "/foo" - - // Create a database with a type and table. - sqlDB.Exec(t, ` -CREATE DATABASE d; -CREATE TYPE d.ty AS ENUM ('hello'); -CREATE TABLE d.tb (x d.ty); -INSERT INTO d.tb VALUES ('hello'), ('hello'); -`) - - // Backup d.tb. - sqlDB.Exec(t, `BACKUP TABLE d.tb TO $1`, localFoo) - - // Drop d.tb so that it can be restored. - sqlDB.Exec(t, `DROP TABLE d.tb`) - - // Bugger the backup by removing the SST files. - if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - - // We should get an error when restoring the table. - sqlDB.ExpectErr(t, "sst: no such file", `RESTORE d.tb FROM $1`, localFoo) - - // The failed restore should clean up type back references so that we are able - // to drop d.ty. - sqlDB.Exec(t, `DROP TYPE d.ty`) -} - -// TestRestoreFailCleanup tests that a failed RESTORE is cleaned up. -func TestRestoreFailCleanup(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - params := base.TestServerArgs{} - // Disable GC job so that the final check of crdb_internal.tables is - // guaranteed to not be cleaned up. Although this was never observed by a - // stress test, it is here for safety. - blockGC := make(chan struct{}) - params.Knobs.GCJob = &sql.GCJobTestingKnobs{ - RunBeforeResume: func(_ jobspb.JobID) error { - <-blockGC - return nil - }, - } - - const numAccounts = 1000 - tc, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, - InitManualReplication, base.TestClusterArgs{ServerArgs: params}) - defer cleanup() - kvDB := tc.Server(0).DB() - - dir = dir + "/foo" - - sqlDB.Exec(t, `CREATE DATABASE restore`) - - // Create a user defined type and check that it is cleaned up after the - // failed restore. - sqlDB.Exec(t, `CREATE TYPE data.myenum AS ENUM ('hello')`) - // Do the same with a user defined schema. - sqlDB.Exec(t, `USE data; CREATE SCHEMA myschema`) - - sqlDB.Exec(t, `BACKUP DATABASE data TO $1`, localFoo) - // Bugger the backup by removing the SST files. - if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - sqlDB.ExpectErr( - t, "sst: no such file", - `RESTORE data.* FROM $1 WITH OPTIONS (into_db='restore')`, localFoo, - ) - // Verify the failed RESTORE added some DROP tables. - sqlDB.CheckQueryResults(t, - `SELECT name FROM crdb_internal.tables WHERE database_name = 'restore' AND state = 'DROP'`, - [][]string{{"bank"}}, - ) - - // Verify that `myenum` was cleaned out from the failed restore. There should - // only be one namespace entry (data.myenum). 
- sqlDB.CheckQueryResults(t, `SELECT count(*) FROM system.namespace WHERE name = 'myenum'`, [][]string{{"1"}}) - // Check the same for data.myschema. - sqlDB.CheckQueryResults(t, `SELECT count(*) FROM system.namespace WHERE name = 'myschema'`, [][]string{{"1"}}) - - // Verify that the only schema that appears is the public schema - dbDesc := desctestutils.TestingGetDatabaseDescriptor(kvDB, keys.SystemSQLCodec, "restore") - require.Equal(t, len(dbDesc.DatabaseDesc().Schemas), 1) - if _, found := dbDesc.DatabaseDesc().Schemas[tree.PublicSchema]; !found { - t.Error("public schema not found") - } -} - -// TestRestoreFailDatabaseCleanup tests that a failed RESTORE is cleaned up -// when restoring an entire database. -func TestRestoreFailDatabaseCleanup(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - params := base.TestServerArgs{} - const numAccounts = 1000 - _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, - InitManualReplication, base.TestClusterArgs{ServerArgs: params}) - defer cleanup() - - dir = dir + "/foo" - - // Create a user defined type and check that it is cleaned up after the - // failed restore. - sqlDB.Exec(t, `CREATE TYPE data.myenum AS ENUM ('hello')`) - // Do the same with a user defined schema. - sqlDB.Exec(t, `USE data; CREATE SCHEMA myschema`) - - sqlDB.Exec(t, `BACKUP DATABASE data TO $1`, localFoo) - // Bugger the backup by removing the SST files. - if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - sqlDB.Exec(t, `DROP DATABASE data`) - sqlDB.ExpectErr( - t, "sst: no such file", - `RESTORE DATABASE data FROM $1`, localFoo, - ) - sqlDB.ExpectErr( - t, `database "data" does not exist`, - `DROP DATABASE data`, - ) -} - -func TestRestoreFailCleansUpTempSystemDatabase(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - _, sqlDB, dir, cleanup := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication) - defer cleanup() - - // Create a database with a type and table. - sqlDB.Exec(t, ` -CREATE DATABASE d; -CREATE TYPE d.ty AS ENUM ('hello'); -CREATE TABLE d.tb (x d.ty); -INSERT INTO d.tb VALUES ('hello'), ('hello'); -`) - - // Cluster BACKUP. - sqlDB.Exec(t, `BACKUP TO $1`, localFoo) - - // Bugger the backup by removing the SST files. - if err := filepath.Walk(dir+"/foo", func(path string, info os.FileInfo, err error) error { - if err != nil { - t.Fatal(err) - } - if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { - return nil - } - return os.Remove(path) - }); err != nil { - t.Fatal(err) - } - - _, sqlDBRestore, cleanupRestore := backupRestoreTestSetupEmpty(t, singleNode, dir, InitManualReplication, - base.TestClusterArgs{}) - defer cleanupRestore() - var defaultDBID int - sqlDBRestore.QueryRow( - t, "SELECT id FROM system.namespace WHERE name = 'defaultdb'", - ).Scan(&defaultDBID) - - // We should get an error when restoring the table. - sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM $1`, localFoo) - - // Make sure the temp system db is not present. - row := sqlDBRestore.QueryStr(t, fmt.Sprintf(`SELECT * FROM [SHOW DATABASES] WHERE database_name = '%s'`, restoreTempSystemDB)) - require.Equal(t, 0, len(row)) - - // Make sure defaultdb and postgres are recreated with new IDs. 
- sqlDBRestore.CheckQueryResults(t, - fmt.Sprintf(` -SELECT name - FROM system.namespace - WHERE "parentID" = 0 AND id > %d`, defaultDBID, - ), [][]string{{"defaultdb"}, {"postgres"}}) -} - func TestBackupRestoreUserDefinedSchemas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index 5d07b9954df4..757b6080b930 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -19,11 +19,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -68,18 +70,22 @@ type sqlDBKey struct { } type datadrivenTestState struct { - servers map[string]serverutils.TestServerInterface - dataDirs map[string]string - sqlDBs map[sqlDBKey]*gosql.DB - noticeBuffer []string - cleanupFns []func() + servers map[string]serverutils.TestServerInterface + dataDirs map[string]string + sqlDBs map[sqlDBKey]*gosql.DB + jobTags map[string]jobspb.JobID + clusterTimestamps map[string]string + noticeBuffer []string + cleanupFns []func() } func newDatadrivenTestState() datadrivenTestState { return datadrivenTestState{ - servers: make(map[string]serverutils.TestServerInterface), - dataDirs: make(map[string]string), - sqlDBs: make(map[sqlDBKey]*gosql.DB), + servers: make(map[string]serverutils.TestServerInterface), + dataDirs: make(map[string]string), + sqlDBs: make(map[sqlDBKey]*gosql.DB), + jobTags: make(map[string]jobspb.JobID), + clusterTimestamps: make(map[string]string), } } @@ -96,16 +102,21 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) { d.noticeBuffer = nil } -func (d *datadrivenTestState) addServer( - t *testing.T, - name, iodir, tempCleanupFrequency string, - ioConf base.ExternalIODirConfig, - localities string, -) error { +type serverCfg struct { + name string + iodir string + tempCleanupFrequency string + nodes int + splits int + ioConf base.ExternalIODirConfig + localities string +} + +func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error { var tc serverutils.TestClusterInterface var cleanup func() params := base.TestClusterArgs{} - params.ServerArgs.ExternalIODirConfig = ioConf + params.ServerArgs.ExternalIODirConfig = cfg.ioConf params.ServerArgs.Knobs = base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } @@ -113,8 +124,8 @@ func (d *datadrivenTestState) addServer( settings := cluster.MakeTestingClusterSettings() closedts.TargetDuration.Override(context.Background(), &settings.SV, 10*time.Millisecond) closedts.SideTransportCloseInterval.Override(context.Background(), &settings.SV, 10*time.Millisecond) - if tempCleanupFrequency != "" { - duration, err := time.ParseDuration(tempCleanupFrequency) + if cfg.tempCleanupFrequency != "" { + duration, err := time.ParseDuration(cfg.tempCleanupFrequency) if err != nil { return errors.New("unable to parse tempCleanupFrequency during server creation") } @@ -123,10 +134,10 @@ func (d 
*datadrivenTestState) addServer( } params.ServerArgs.Settings = settings - clusterSize := singleNode + clusterSize := cfg.nodes - if localities != "" { - cfgs := strings.Split(localities, ",") + if cfg.localities != "" { + cfgs := strings.Split(cfg.localities, ",") clusterSize = len(cfgs) serverArgsPerNode := make(map[int]base.TestServerArgs) for i, cfg := range cfgs { @@ -136,13 +147,15 @@ func (d *datadrivenTestState) addServer( } params.ServerArgsPerNode = serverArgsPerNode } - if iodir == "" { - tc, _, iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, 0, InitManualReplication, params) + if cfg.iodir == "" { + tc, _, cfg.iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, cfg.splits, + InitManualReplication, params) } else { - tc, _, cleanup = backupRestoreTestSetupEmptyWithParams(t, clusterSize, iodir, InitManualReplication, params) + tc, _, cleanup = backupRestoreTestSetupEmptyWithParams(t, clusterSize, cfg.iodir, + InitManualReplication, params) } - d.servers[name] = tc.Server(0) - d.dataDirs[name] = iodir + d.servers[cfg.name] = tc.Server(0) + d.dataDirs[cfg.name] = cfg.iodir d.cleanupFns = append(d.cleanupFns, cleanup) return nil @@ -187,26 +200,97 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // commands. The test files are in testdata/backup-restore. The following // syntax is provided: // -// - "new-server name= [share-io-dir=]" -// Create a new server with the input name. It takes in an optional -// share-io-dir argument to share an IO directory with an existing server. -// This is useful when restoring from a backup taken in another server. +// - "new-server name= [args]" +// Create a new server with the input name. +// +// Supported arguments: +// +// + share-io-dir: can be specified to share an IO directory with an existing +// server. This is useful when restoring from a backup taken in another +// server. +// +// + allow-implicit-access: can be specified to set +// `EnableNonAdminImplicitAndArbitraryOutbound` to true +// +// + disable-http: disables use of external HTTP endpoints. +// +// + temp-cleanup-freq: specifies the frequency with which the temporary table +// cleanup reconciliation job runs // -// - "exec-sql server=" +// + localities: specifies the localities that will be used when starting up +// the test cluster. The cluster will have len(localities) nodes, with each +// node assigned a locality config corresponding to the locality. Please +// update the `localityCfgs` map when adding new localities. +// +// + nodes: specifies the number of nodes in the test cluster. +// +// + splits: specifies the number of ranges the bank table is split into. +// +// - "exec-sql [server=] [user=] [args]" // Executes the input SQL query on the target server. By default, server is // the last created server. // -// Supported options: +// Supported arguments: +// +// + expect-error-ignore: expects the query to return an error, but we will +// ignore it. // -// - "query-sql server=" +// - "query-sql [server=] [user=]" // Executes the input SQL query and print the results. // // - "reset" // Clear all state associated with the test. +// +// - "job" [server=] [user=] [args] +// Executes job specific operations. +// +// Supported arguments: +// +// + cancel=: cancels the job referenced by the tag and waits for it to +// reach a CANCELED state. +// +// - "save-cluster-ts" tag= +// Saves the `SELECT cluster_logical_timestamp()` with the tag. Can be used +// in the future with intstructions such as `aost`. 
+// +// - "backup" [args] +// Executes backup specific operations. +// +// Supported arguments: +// +// + tag=: tag the backup job to reference it in the future. +// +// + expect-pausepoint: expects the backup job to end up in a paused state because +// of a pausepoint error. +// +// - "restore" [args] +// Executes restore specific operations. +// +// Supported arguments: +// +// + tag=: tag the restore job to reference it in the future. +// +// + expect-pausepoint: expects the restore job to end up in a paused state because +// of a pausepoint error. +// +// + aost: expects a tag referencing a previously saved cluster timestamp +// using `save-cluster-ts`. It then runs the restore as of the saved cluster +// timestamp. +// +// - "schema" [args] +// Executes schema change specific operations. +// +// Supported arguments: +// +// + tag=: tag the schema change job to reference it in the future. +// +// + expect-pausepoint: expects the schema change job to end up in a paused state because +// of a pausepoint error. func TestDataDriven(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.UnderRace(t, "takes >1 min under race") + + skip.UnderRace(t, "takes ~3mins to run") // This test uses this mock HTTP server to pass the backup files between tenants. httpAddr, httpServerCleanup := makeInsecureHTTPServer(t) @@ -218,6 +302,7 @@ func TestDataDriven(t *testing.T) { ds := newDatadrivenTestState() defer ds.cleanup(ctx) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { case "sleep": var sleepDuration string @@ -228,12 +313,16 @@ func TestDataDriven(t *testing.T) { } time.Sleep(duration) return "" + case "reset": ds.cleanup(ctx) ds = newDatadrivenTestState() return "" + case "new-server": var name, shareDirWith, iodir, tempCleanupFrequency, localities string + var splits int + nodes := singleNode var io base.ExternalIODirConfig d.ScanArgs(t, "name", &name) if d.HasArg("share-io-dir") { @@ -254,12 +343,28 @@ func TestDataDriven(t *testing.T) { if d.HasArg("localities") { d.ScanArgs(t, "localities", &localities) } + if d.HasArg("nodes") { + d.ScanArgs(t, "nodes", &nodes) + } + if d.HasArg("splits") { + d.ScanArgs(t, "splits", &splits) + } lastCreatedServer = name - err := ds.addServer(t, name, iodir, tempCleanupFrequency, io, localities) + cfg := serverCfg{ + name: name, + iodir: iodir, + tempCleanupFrequency: tempCleanupFrequency, + nodes: nodes, + splits: splits, + ioConf: io, + localities: localities, + } + err := ds.addServer(t, cfg) if err != nil { return err.Error() } return "" + case "exec-sql": server := lastCreatedServer user := "root" @@ -273,13 +378,36 @@ func TestDataDriven(t *testing.T) { d.Input = strings.ReplaceAll(d.Input, "http://COCKROACH_TEST_HTTP_SERVER/", httpAddr) _, err := ds.getSQLDB(t, server, user).Exec(d.Input) ret := ds.noticeBuffer + + // Check if we are expecting a pausepoint error. + if d.HasArg("expect-pausepoint") { + require.NotNilf(t, err, "expected pause point error") + require.True(t, strings.Contains(err.Error(), "job requested it be paused")) + + // Find job ID of the pausepoint job. 
+ var jobID jobspb.JobID + require.NoError(t, + ds.getSQLDB(t, server, user).QueryRow( + `SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1`).Scan(&jobID)) + fmt.Printf("expecting pausepoint, found job ID %d\n\n\n", jobID) + + runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) + jobutils.WaitForJobToPause(t, runner, jobID) + ret = append(ds.noticeBuffer, "job paused at pausepoint") + ret = append(ret, "") + return strings.Join(ret, "\n") + } + + // Check if we are expecting an error, and want to ignore outputting it. + if d.HasArg("expect-error-ignore") { + require.NotNilf(t, err, "expected error") + ret = append(ret, "ignoring expected error") + return strings.Join(ret, "\n") + } + + // Check for other errors. if err != nil { - // pausepoint errors have the job ID in them, and datadriven tests - // don't seem to support regex matching. Clean the error up to not - // include the job ID. - if i := strings.Index(err.Error(), "paused before it completed with reason"); i != -1 { - ret = append(ds.noticeBuffer, err.Error()[i:]) - } else if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) { + if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) { ret = append(ds.noticeBuffer, err.Error()) if pqErr.Detail != "" { ret = append(ret, "DETAIL: "+pqErr.Detail) @@ -292,6 +420,7 @@ func TestDataDriven(t *testing.T) { } } return strings.Join(ret, "\n") + case "query-sql": server := lastCreatedServer user := "root" @@ -308,9 +437,180 @@ func TestDataDriven(t *testing.T) { output, err := sqlutils.RowsToDataDrivenOutput(rows) require.NoError(t, err) return output + + case "backup": + server := lastCreatedServer + user := "root" + jobType := "BACKUP" + + // First, run the backup. + _, err := ds.getSQLDB(t, server, user).Exec(d.Input) + + // Tag the job. + if d.HasArg("tag") { + tagJob(t, server, user, jobType, ds, d) + } + + // Check if we expect a pausepoint error. + if d.HasArg("expect-pausepoint") { + expectPausepoint(t, err, jobType, server, user, ds) + ret := append(ds.noticeBuffer, "job paused at pausepoint") + return strings.Join(ret, "\n") + } + + // All other errors are bad. + require.NoError(t, err) + return "" + + case "restore": + server := lastCreatedServer + user := "root" + jobType := "RESTORE" + + if d.HasArg("aost") { + var aost string + d.ScanArgs(t, "aost", &aost) + var ts string + var ok bool + if ts, ok = ds.clusterTimestamps[aost]; !ok { + t.Fatalf("no cluster timestamp found for %s", aost) + } + + // Replace the ts tag with the actual timestamp. + d.Input = strings.Replace(d.Input, aost, + fmt.Sprintf("'%s'", ts), 1) + } + + // First, run the restore. + _, err := ds.getSQLDB(t, server, user).Exec(d.Input) + + // Tag the job. + if d.HasArg("tag") { + tagJob(t, server, user, jobType, ds, d) + } + + // Check if the job must be run aost. + if d.HasArg("aost") { + var aost string + d.ScanArgs(t, "aost", &aost) + } + + // Check if we expect a pausepoint error. + if d.HasArg("expect-pausepoint") { + expectPausepoint(t, err, jobType, server, user, ds) + ret := append(ds.noticeBuffer, "job paused at pausepoint") + return strings.Join(ret, "\n") + } + + // All other errors are bad. + require.NoError(t, err) + return "" + + case "schema-change": + server := lastCreatedServer + user := "root" + jobType := "SCHEMA CHANGE" + + // First, run the schema change. + _, err := ds.getSQLDB(t, server, user).Exec(d.Input) + + // Tag the job. + if d.HasArg("tag") { + tagJob(t, server, user, jobType, ds, d) + } + + // Check if the job must be run aost. 
+ if d.HasArg("aost") { + var aost string + d.ScanArgs(t, "aost", &aost) + } + + // Check if we expect a pausepoint error. + if d.HasArg("expect-pausepoint") { + expectPausepoint(t, err, jobType, server, user, ds) + ret := append(ds.noticeBuffer, "job paused at pausepoint") + return strings.Join(ret, "\n") + } + + // All other errors are bad. + require.NoError(t, err) + return "" + + case "job": + server := lastCreatedServer + user := "root" + + if d.HasArg("cancel") { + var cancelJobTag string + d.ScanArgs(t, "cancel", &cancelJobTag) + var jobID jobspb.JobID + var ok bool + if jobID, ok = ds.jobTags[cancelJobTag]; !ok { + t.Fatalf("could not find job with tag %s", cancelJobTag) + } + runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) + runner.Exec(t, `CANCEL JOB $1`, jobID) + jobutils.WaitForJobToCancel(t, runner, jobID) + } + return "" + + case "save-cluster-ts": + server := lastCreatedServer + user := "root" + var timestampTag string + d.ScanArgs(t, "tag", ×tampTag) + if _, ok := ds.clusterTimestamps[timestampTag]; ok { + t.Fatalf("cannot reuse cluster ts tag %s", timestampTag) + } + var ts string + err := ds.getSQLDB(t, server, user).QueryRow(`SELECT cluster_logical_timestamp()`).Scan(&ts) + require.NoError(t, err) + ds.clusterTimestamps[timestampTag] = ts + return "" + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } }) }) } + +// findMostRecentJobWithType returns the most recently created job of `job_type` +// jobType. +func findMostRecentJobWithType( + t *testing.T, ds datadrivenTestState, server, user string, jobType string, +) jobspb.JobID { + var jobID jobspb.JobID + require.NoError( + t, ds.getSQLDB(t, server, user).QueryRow( + fmt.Sprintf( + `SELECT job_id FROM [SHOW JOBS] WHERE job_type = '%s' ORDER BY created DESC LIMIT 1`, + jobType)).Scan(&jobID)) + return jobID +} + +// expectPausepoint waits for the job to hit a pausepoint and enter a paused +// state. +func expectPausepoint( + t *testing.T, err error, jobType, server, user string, ds datadrivenTestState, +) { + // Check if we are expecting a pausepoint error. + require.NotNilf(t, err, "expected pause point error") + + runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) + jobutils.WaitForJobToPause(t, runner, + findMostRecentJobWithType(t, ds, server, user, jobType)) +} + +// tagJob stores the jobID of the most recent job of `jobType`. Users can use +// the tag to refer to the job in the future. 
+func tagJob( + t *testing.T, server, user, jobType string, ds datadrivenTestState, d *datadriven.TestData, +) { + var jobTag string + d.ScanArgs(t, "tag", &jobTag) + if _, exists := ds.jobTags[jobTag]; exists { + t.Fatalf("failed to `tag`, job with tag %s already exists", jobTag) + } + ds.jobTags[jobTag] = findMostRecentJobWithType(t, ds, server, user, jobType) +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index aff04579bebd..ba24eb7271a8 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1356,6 +1356,10 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } p.ExecCfg().JobRegistry.NotifyToAdoptJobs() + if err := p.ExecCfg().JobRegistry.CheckPausepoint( + "restore.after_publishing_descriptors"); err != nil { + return err + } if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil { if err := fn(); err != nil { return err @@ -1492,6 +1496,10 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } } + if err := p.ExecCfg().JobRegistry.CheckPausepoint( + "restore.after_publishing_descriptors"); err != nil { + return err + } if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil { if err := fn(); err != nil { return err @@ -1723,6 +1731,11 @@ func (r *restoreResumer) publishDescriptors( if details.DescriptorsPublished { return nil } + + if err := execCfg.JobRegistry.CheckPausepoint("restore.before_publishing_descriptors"); err != nil { + return err + } + if fn := r.testingKnobs.beforePublishingDescriptors; fn != nil { if err := fn(); err != nil { return err @@ -1941,6 +1954,7 @@ func emitRestoreJobEvent( func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { p := execCtx.(sql.JobExecContext) r.execCfg = p.ExecCfg() + // Emit to the event log that the job has started reverting. emitRestoreJobEvent(ctx, p, jobs.StatusReverting, r.job) diff --git a/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors b/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors index 749c2dcf773f..55d8edce17fe 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors +++ b/pkg/ccl/backupccl/testdata/backup-restore/backup-dropped-descriptors @@ -1,4 +1,4 @@ -# backup-dropped-desctiprors tests backup and restore interaction with database, schema +# backup-dropped-descriptors tests backup and restore interaction with database, schema # and type descriptors in the DROP state. subtest dropped-database-descriptors @@ -10,9 +10,12 @@ SET use_declarative_schema_changer = 'off'; SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec'; CREATE DATABASE d; CREATE TABLE d.foo (id INT); +---- + +schema-change expect-pausepoint DROP DATABASE d CASCADE; ---- -paused before it completed with reason: pause point "schemachanger.before.exec" hit +job paused at pausepoint # At this point, we have a descriptor entry for `d` in a DROP state. 
query-sql @@ -118,16 +121,19 @@ SET use_declarative_schema_changer = 'off'; SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec'; ---- -exec-sql +schema-change expect-pausepoint DROP SCHEMA d2.s CASCADE; ---- -paused before it completed with reason: pause point "schemachanger.before.exec" hit +job paused at pausepoint exec-sql SET CLUSTER SETTING jobs.debug.pausepoints = 'typeschemachanger.before.exec'; +---- + +schema-change expect-pausepoint DROP TYPE d2.typ; ---- -paused before it completed with reason: pause point "typeschemachanger.before.exec" hit +job paused at pausepoint query-sql WITH tbls AS ( diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-schema-objects b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-schema-objects new file mode 100644 index 000000000000..40f1ad2b4404 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-schema-objects @@ -0,0 +1,187 @@ +new-server name=s1 nodes=1 splits=1000 +---- + +subtest restore-cleanup + +# Disable GC job so that the final check of crdb_internal.tables is +# guaranteed to not be cleaned up. Although this was never observed by a +# stress test, it is here for safety. +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = 'gcjob.before_resume'; +---- + +# Create a user defined type and check that it is cleaned up after the +# failed restore. +exec-sql +CREATE DATABASE restore; +CREATE TYPE data.myenum AS ENUM ('hello'); +---- + +# Do the same with a user defined schema. +exec-sql +USE data; +CREATE SCHEMA myschema; +---- + +exec-sql +BACKUP DATABASE data INTO 'nodelocal://1/foo'; +---- + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.after_publishing_descriptors'; +---- + +restore expect-pausepoint tag=a +RESTORE data.* FROM LATEST IN 'nodelocal://1/foo' WITH into_db = 'restore'; +---- +job paused at pausepoint + +# Cancel the job so that the cleanup hook runs. +job cancel=a +---- + +# Verify the cancelled RESTORE added some DROP tables. +query-sql +SELECT name FROM crdb_internal.tables WHERE database_name = 'restore' AND state = 'DROP' +---- +bank + +# Verify that `myenum` was cleaned out from the failed restore. There should +# only be one namespace entry (data.myenum). +query-sql +SELECT count(*) FROM system.namespace WHERE name = 'myenum'; +---- +1 + +# Check the same for data.myschema. +query-sql +SELECT count(*) FROM system.namespace WHERE name = 'myschema'; +---- +1 + +# Verify that the only schema that appears is the public schema. +query-sql +USE restore; +SHOW SCHEMAS; +---- +crdb_internal +information_schema +pg_catalog +pg_extension +public admin + +# Ensure that a database level restore also cleans up after itself. We do this +# by ensuring that no descriptor is left behind for the database `data` since we +# get a does not exist error when trying to drop it. +exec-sql +DROP DATABASE data; +---- + +restore expect-pausepoint tag=b +RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/foo'; +---- +job paused at pausepoint + +# Cancel the job so that the cleanup hook runs. +job cancel=b +---- + +# Ensure there is no database left behind. 
+exec-sql
+DROP DATABASE data;
+----
+pq: database "data" does not exist
+
+subtest end
+
+subtest restore-cleanup-type-back-references
+
+exec-sql
+CREATE DATABASE d;
+CREATE TYPE d.ty AS ENUM ('hello');
+CREATE TABLE d.tb (x d.ty);
+INSERT INTO d.tb VALUES ('hello'), ('hello');
+----
+
+exec-sql
+BACKUP TABLE d.tb INTO 'nodelocal://1/bar';
+----
+
+# Drop d.tb so that it can be restored.
+exec-sql
+DROP TABLE d.tb;
+----
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.after_publishing_descriptors';
+----
+
+restore expect-pausepoint tag=c
+RESTORE d.tb FROM LATEST IN 'nodelocal://1/bar';
+----
+job paused at pausepoint
+
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=c
+----
+
+# The failed restore should clean up type back references so that we are able to
+# drop d.ty.
+exec-sql
+DROP TYPE d.ty;
+----
+
+subtest end
+
+subtest restore-cleanup-temp-system-database
+
+exec-sql
+CREATE DATABASE d2;
+CREATE TYPE d2.ty AS ENUM ('hello');
+CREATE TABLE d2.tb (x d2.ty);
+INSERT INTO d2.tb VALUES ('hello'), ('hello');
+----
+
+exec-sql
+BACKUP INTO 'nodelocal://1/cluster';
+----
+
+# Start a new cluster with the same IO dir.
+new-server name=s2 share-io-dir=s1
+----
+
+# We pause the job before publishing descriptors to ensure that we are testing
+# OnFailOrCancel's temporary system db cleanup logic. The restore Resume method
+# anyways performs this cleanup after publishing descriptors.
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_publishing_descriptors';
+----
+
+# Get defaultdb ID.
+exec-sql
+USE system;
+----
+
+restore expect-pausepoint tag=d
+RESTORE FROM LATEST IN 'nodelocal://1/cluster';
+----
+job paused at pausepoint
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=d
+----
+
+# Make sure the temp system db is not present.
+query-sql
+SELECT * FROM [SHOW DATABASES] WHERE database_name = 'crdb_temp_system';
+----
+
+# Make sure defaultdb and postgres are recreated with new IDs.
+query-sql
+SELECT name FROM system.namespace WHERE "parentID" = 0 AND "id" > 100;
+----
+defaultdb
+postgres
+
+subtest end
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-ttl b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-ttl
new file mode 100644
index 000000000000..531697252010
--- /dev/null
+++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-on-fail-or-cancel-ttl
@@ -0,0 +1,73 @@
+# This test ensures that a failed or cancelled restore cleans up the ttl schedule
+# that is created when restoring a table with a row level ttl configuration.
+
+new-server name=s1 nodes=1
+----
+
+subtest restore-fail-cleans-up-ttl-schedules-before-publishing
+
+exec-sql
+CREATE DATABASE d;
+CREATE TABLE d.tb (id INT PRIMARY KEY) WITH (ttl_expire_after = '10 minutes');
+----
+
+exec-sql
+BACKUP DATABASE d INTO 'userfile:///foo'
+----
+
+# Attempt the restore but pause it before publishing descriptors.
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_publishing_descriptors';
+DROP DATABASE d;
+----
+
+restore expect-pausepoint tag=a
+RESTORE DATABASE d FROM LATEST IN 'userfile:///foo';
+----
+job paused at pausepoint
+
+# Cancel the job so that the cleanup hook runs.
+job cancel=a +---- + +query-sql +SELECT count(1) FROM [SHOW SCHEDULES] WHERE label LIKE 'row-level-ttl-%'; +---- +0 + +subtest end + + +subtest restore-fail-cleans-up-ttl-schedules-after-publishing + +exec-sql +CREATE DATABASE d; +CREATE TABLE d.tb (id INT PRIMARY KEY) WITH (ttl_expire_after = '10 minutes'); +---- + +exec-sql +BACKUP DATABASE d INTO 'userfile:///foo' +---- + +# Attempt the restore but pause it after publishing descriptors. +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = ''; +SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.after_publishing_descriptors'; +DROP DATABASE d; +---- + +restore expect-pausepoint tag=b +RESTORE DATABASE d FROM LATEST IN 'userfile:///foo'; +---- +job paused at pausepoint + +# Cancel the job so that the cleanup hook runs. +job cancel=b +---- + +query-sql +SELECT count(1) FROM [SHOW SCHEDULES] WHERE label LIKE 'row-level-ttl-%'; +---- +0 + +subtest end diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 545d4e121a26..4f40ac9b6aa3 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -173,6 +173,11 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) p := execCtx.(sql.JobExecContext) // TODO(pbardea): Wait for no versions. execCfg := p.ExecCfg() + + if err := p.ExecCfg().JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil { + return err + } + if fn := execCfg.GCJobTestingKnobs.RunBeforeResume; fn != nil { if err := fn(r.jobID); err != nil { return err diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index 02582b17034e..1f1463fed5ed 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -46,7 +46,7 @@ func WaitForJobToPause(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) waitForJobToHaveStatus(t, db, jobID, jobs.StatusPaused) } -// WaitForJobToCancel waits for the specified job ID to be cancelled. +// WaitForJobToCancel waits for the specified job ID to be in a cancelled state. func WaitForJobToCancel(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) { t.Helper() waitForJobToHaveStatus(t, db, jobID, jobs.StatusCanceled)