From 089517c0f4f5c1017f8751e4612b1db05fcf65df Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Thu, 3 Sep 2020 17:17:55 -0400 Subject: [PATCH] backupccl: fix NPE during restore rollback This commit fixes an NPE (nil pointer dereference) that happens during the rollback if the RESTORE fails or is canceled before the execCfg is set on the resumer. The execCfg field should eventually be removed from the resumer, but that change is left for a separate commit. Release justification: bug fix Release note (bug fix): Previously, a RESTORE that failed or was canceled early in its execution could crash a node during rollback. This is now fixed. --- .../full_cluster_backup_restore_test.go | 23 +++++++++++++++++++ pkg/ccl/backupccl/restore_job.go | 21 +++++++++++++---- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index bbf664d71c1f..8aab10e076b9 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -526,6 +526,29 @@ func TestClusterRestoreFailCleanup(t *testing.T) { }, ) }) + + t.Run("after offline tables", func(t *testing.T) { + _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( + t, singleNode, tempDir, InitNone, + ) + defer cleanupEmptyCluster() + + // Bugger the backup by injecting a failure after the offline table descriptors are created. 
+ for _, server := range tcRestore.Servers { + registry := server.JobRegistry().(*jobs.Registry) + registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { + r := raw.(*restoreResumer) + r.testingKnobs.afterOfflineTableCreation = func() error { + return errors.New("injected error") + } + return r + }, + } + } + + sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) + }) } // TestClusterRevisionHistory tests that cluster backups can be taken with diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 7fec0ef51e86..d4aa91f46d47 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -693,6 +693,10 @@ type restoreResumer struct { // restore. It is used to simulate any errors that we may face at this point // of the restore. duringSystemTableRestoration func() error + // afterOfflineTableCreation is called after creating the OFFLINE table + // descriptors we're ingesting. If an error is returned, we fail the + // restore. + afterOfflineTableCreation func() error } } @@ -1089,6 +1093,12 @@ func (r *restoreResumer) Resume( // Refresh the job details since they may have been updated when creating the // importing descriptors. details = r.job.Details().(jobspb.RestoreDetails) + + if fn := r.testingKnobs.afterOfflineTableCreation; fn != nil { + if err := fn(); err != nil { + return err + } + } r.execCfg = p.ExecCfg() backupStats, err := getStatisticsFromBackup(ctx, defaultStore, details.Encryption, latestBackupManifest) if err != nil { @@ -1334,15 +1344,16 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) er return err } } - return r.dropDescriptors(ctx, execCfg.JobRegistry, txn) + return r.dropDescriptors(ctx, execCfg, txn) }) } // dropDescriptors implements the OnFailOrCancel logic. 
func (r *restoreResumer) dropDescriptors( - ctx context.Context, jr *jobs.Registry, txn *kv.Txn, + ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, ) error { details := r.job.Details().(jobspb.RestoreDetails) + jr := execCfg.JobRegistry // No need to mark the tables as dropped if they were not even created in the // first place. @@ -1351,7 +1362,7 @@ func (r *restoreResumer) dropDescriptors( } // Needed to trigger the schema change manager. - if err := txn.SetSystemConfigTrigger(r.execCfg.Codec.ForSystemTenant()); err != nil { + if err := txn.SetSystemConfigTrigger(execCfg.Codec.ForSystemTenant()); err != nil { return err } @@ -1380,7 +1391,7 @@ func (r *restoreResumer) dropDescriptors( if err != nil { return errors.Wrap(err, "dropping tables caused by restore fail/cancel from public namespace") } - existingDescVal, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, prev) + existingDescVal, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, prev) if err != nil { return errors.Wrap(err, "dropping tables caused by restore fail/cancel") } @@ -1466,7 +1477,7 @@ func (r *restoreResumer) dropDescriptors( for _, dbDesc := range details.DatabaseDescs { db := dbdesc.NewExistingMutable(*dbDesc) // We need to ignore details.TableDescs since we haven't committed the txn that deletes these. - isDBEmpty, err = isDatabaseEmpty(ctx, r.execCfg.DB, db, ignoredTables) + isDBEmpty, err = isDatabaseEmpty(ctx, execCfg.DB, db, ignoredTables) if err != nil { return errors.Wrapf(err, "checking if database %s is empty during restore cleanup", db.GetName()) }