Skip to content

Commit

Permalink
backupccl: fix NPE during restore rollback
Browse files Browse the repository at this point in the history
This commit fixes an NPE that happens during the rollback if the RESTORE
fails or is canceled before the execCfg is set on the resumer. The
execCfg should be removed from the resumer, but that change should be
left for a separate commit.

Release justification: bug fix
Release note (bug fix): Previously, a RESTORE that was canceled could
crash a node. This is now fixed.
  • Loading branch information
pbardea committed Sep 3, 2020
1 parent ed43545 commit 089517c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
23 changes: 23 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,29 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
},
)
})

t.Run("after offline tables", func(t *testing.T) {
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()

// Bugger the restore by injecting a failure right after the offline table descriptors are created.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.afterOfflineTableCreation = func() error {
return errors.New("injected error")
}
return r
},
}
}

sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo)
})
}

// TestClusterRevisionHistory tests that cluster backups can be taken with
Expand Down
21 changes: 16 additions & 5 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,10 @@ type restoreResumer struct {
// restore. It is used to simulate any errors that we may face at this point
// of the restore.
duringSystemTableRestoration func() error
// afterOfflineTableCreation is called after creating the OFFLINE table
// descriptors we're ingesting. If an error is returned, we fail the
// restore.
afterOfflineTableCreation func() error
}
}

Expand Down Expand Up @@ -1089,6 +1093,12 @@ func (r *restoreResumer) Resume(
// Refresh the job details since they may have been updated when creating the
// importing descriptors.
details = r.job.Details().(jobspb.RestoreDetails)

if fn := r.testingKnobs.afterOfflineTableCreation; fn != nil {
if err := fn(); err != nil {
return err
}
}
r.execCfg = p.ExecCfg()
backupStats, err := getStatisticsFromBackup(ctx, defaultStore, details.Encryption, latestBackupManifest)
if err != nil {
Expand Down Expand Up @@ -1334,15 +1344,16 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) er
return err
}
}
return r.dropDescriptors(ctx, execCfg.JobRegistry, txn)
return r.dropDescriptors(ctx, execCfg, txn)
})
}

// dropDescriptors implements the OnFailOrCancel logic.
func (r *restoreResumer) dropDescriptors(
ctx context.Context, jr *jobs.Registry, txn *kv.Txn,
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn,
) error {
details := r.job.Details().(jobspb.RestoreDetails)
jr := execCfg.JobRegistry

// No need to mark the tables as dropped if they were not even created in the
// first place.
Expand All @@ -1351,7 +1362,7 @@ func (r *restoreResumer) dropDescriptors(
}

// Needed to trigger the schema change manager.
if err := txn.SetSystemConfigTrigger(r.execCfg.Codec.ForSystemTenant()); err != nil {
if err := txn.SetSystemConfigTrigger(execCfg.Codec.ForSystemTenant()); err != nil {
return err
}

Expand Down Expand Up @@ -1380,7 +1391,7 @@ func (r *restoreResumer) dropDescriptors(
if err != nil {
return errors.Wrap(err, "dropping tables caused by restore fail/cancel from public namespace")
}
existingDescVal, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, prev)
existingDescVal, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, prev)
if err != nil {
return errors.Wrap(err, "dropping tables caused by restore fail/cancel")
}
Expand Down Expand Up @@ -1466,7 +1477,7 @@ func (r *restoreResumer) dropDescriptors(
for _, dbDesc := range details.DatabaseDescs {
db := dbdesc.NewExistingMutable(*dbDesc)
// We need to ignore details.TableDescs since we haven't committed the txn that deletes these.
isDBEmpty, err = isDatabaseEmpty(ctx, r.execCfg.DB, db, ignoredTables)
isDBEmpty, err = isDatabaseEmpty(ctx, execCfg.DB, db, ignoredTables)
if err != nil {
return errors.Wrapf(err, "checking if database %s is empty during restore cleanup", db.GetName())
}
Expand Down

0 comments on commit 089517c

Please sign in to comment.