Skip to content

Commit

Permalink
backupccl: allow restore jobs to self pause on errors
Browse files Browse the repository at this point in the history
Previously RESTORE jobs would automatically revert on failure. It may be
advantageous from a debugging perspective to allow the job to pause at the exact
moment of failure to identify the source of the errors. This patch adds a WITH
option DEBUG_PAUSE_ON which allows the job to pause itself when it encounters
the events described in this option. Currently the only value it can take is
'error', which allows jobs to self pause on errors. The job can then be RESUMEd
after the error has been fixed, or CANCELed if the desired behavior is to
roll back the job.

Resolves #36887

Release note (enterprise change): Added new DEBUG_PAUSE_ON option to RESTORE
jobs to allow for self pause on errors.

Release justification: low-risk, as it is an opt-in debugging tool that is off by default.
  • Loading branch information
Rui Hu committed Aug 31, 2021
1 parent e946415 commit 1684499
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 370 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ unreserved_keyword ::=
| 'DATABASES'
| 'DAY'
| 'DEALLOCATE'
| 'DEBUG_PAUSE_ON'
| 'DECLARE'
| 'DELETE'
| 'DEFAULTS'
Expand Down Expand Up @@ -2095,6 +2096,7 @@ restore_options ::=
| 'SKIP_MISSING_VIEWS'
| 'DETACHED'
| 'SKIP_LOCALITIES_CHECK'
| 'DEBUG_PAUSE_ON' '=' string_or_placeholder

scrub_option_list ::=
( scrub_option ) ( ( ',' scrub_option ) )*
Expand Down
91 changes: 91 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8567,3 +8567,94 @@ DROP INDEX idx_3;

sqlDB.Exec(t, `BACKUP test.t TO 'nodelocal://1/backup_test' WITH revision_history`)
}

// waitForStatus checks that the job identified by jobID has the given status
// in system.jobs. The check is wrapped in testutils.SucceedsSoonError, and
// whatever error that helper returns is handed back to the caller so the
// caller decides how to fail the test.
func waitForStatus(t *testing.T, db *sqlutils.SQLRunner, jobID int64, status jobs.Status) error {
	return testutils.SucceedsSoonError(func() error {
		var jobStatus string
		db.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&jobStatus)

		if status != jobs.Status(jobStatus) {
			// Both the expected and the observed status are formatted with %s so
			// the message reads "expected ... to have status X, got Y".
			return errors.Newf("expected jobID %d to have status %s, got %s", jobID, status, jobStatus)
		}
		return nil
	})
}

// TestRestorePauseOnError verifies that RESTORE jobs self pause on error when
// given the DEBUG_PAUSE_ON = 'error' option, and that a job paused this way can
// afterwards either be resumed to completion or be canceled.
func TestRestorePauseOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.ScopeWithoutShowLogs(t).Close(t)

defer jobs.TestingSetProgressThresholds()()

baseDir := "testdata"
args := base.TestServerArgs{ExternalIODir: baseDir, Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}}
params := base.TestClusterArgs{ServerArgs: args}
_, tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 1,
InitManualReplication, params)
defer cleanupFn()

// forceFailure toggles the injected failure below: while it is true, the
// beforePublishingDescriptors knob of every restore resumer returns an error.
// NOTE(review): the flag is written by the test body and read from the knob
// closure without synchronization — presumably on different goroutines;
// confirm this is acceptable under the race detector.
var forceFailure bool
for i := range tc.Servers {
// Wrap every restore resumer created on this server so that it carries the
// failure-injecting testing knob.
tc.Servers[i].JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.beforePublishingDescriptors = func() error {
if forceFailure {
return errors.New("testing injected failure")
}
return nil
}
return r
},
}
}

// Create a database with one table, back it up, then drop it so that it can
// be restored from the backup.
sqlDB.Exec(t, `CREATE DATABASE r1`)
sqlDB.Exec(t, `CREATE TABLE r1.foo (id INT)`)
sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/eventlogging'`)
sqlDB.Exec(t, `DROP DATABASE r1`)

restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/eventlogging' WITH DEBUG_PAUSE_ON = 'error'`
// findJobQuery orders matching jobs by creation time, newest first; the Scan
// calls below read the job ID from the row that QueryRow returns.
findJobQuery := `SELECT job_id FROM [SHOW JOBS] WHERE description LIKE '%RESTORE DATABASE%' ORDER BY created DESC`

// Verify that a RESTORE job will self pause on an error, but can be resumed
// after the source of error is fixed.
{
var jobID int64
forceFailure = true

// The returned row is never scanned.
// NOTE(review): presumably intentional so that the outcome of the RESTORE
// statement itself is not asserted here — confirm.
sqlDB.QueryRow(t, restoreQuery)

sqlDB.QueryRow(t, findJobQuery).Scan(&jobID)
if err := waitForStatus(t, sqlDB, jobID, jobs.StatusPaused); err != nil {
t.Fatal(err)
}

// Stop injecting the failure before resuming so the job can finish.
forceFailure = false
sqlDB.Exec(t, "RESUME JOB $1", jobID)

if err := waitForStatus(t, sqlDB, jobID, jobs.StatusSucceeded); err != nil {
t.Fatal(err)
}
}

// Verify that a RESTORE job will self pause on an error and can be canceled.
{
var jobID int64
forceFailure = true
// The previous scenario restored r1, so drop it again before re-running the
// same RESTORE statement.
sqlDB.Exec(t, `DROP DATABASE r1`)
sqlDB.QueryRow(t, restoreQuery)
sqlDB.QueryRow(t, findJobQuery).Scan(&jobID)
if err := waitForStatus(t, sqlDB, jobID, jobs.StatusPaused); err != nil {
t.Fatal(err)
}

sqlDB.Exec(t, "CANCEL JOB $1", jobID)

if err := waitForStatus(t, sqlDB, jobID, jobs.StatusCanceled); err != nil {
t.Fatal(err)
}
}
}
16 changes: 16 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,22 @@ func createImportingDescriptors(

// Resume is part of the jobs.Resumer interface.
//
// It delegates the actual work to doResume. When doResume fails and the job's
// details have DebugPauseOn set to "error", the failure is logged as a warning
// and a pause is requested for the job through the job registry; the result of
// that pause request is returned. In every other failure case the error from
// doResume is returned unchanged.
func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error {
	resumeErr := r.doResume(ctx, execCtx)
	if resumeErr == nil {
		return nil
	}

	restoreDetails := r.job.Details().(jobspb.RestoreDetails)
	if restoreDetails.DebugPauseOn != "error" {
		return resumeErr
	}

	// The same format string is used for the log line and for the pause reason
	// recorded with the pause request.
	const pauseMsgFmt = "job failed with error (%v) but is being paused due to the %s=%s setting"
	log.Warningf(ctx, pauseMsgFmt, resumeErr, restoreOptDebugPauseOn, restoreDetails.DebugPauseOn)

	pauseReason := fmt.Sprintf(pauseMsgFmt, resumeErr, restoreOptDebugPauseOn, restoreDetails.DebugPauseOn)
	return r.execCfg.JobRegistry.PauseRequested(ctx, nil, r.job.ID(), pauseReason)
}

func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
details := r.job.Details().(jobspb.RestoreDetails)
p := execCtx.(sql.JobExecContext)

Expand Down
23 changes: 23 additions & 0 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@ const (
restoreOptSkipMissingSequenceOwners = "skip_missing_sequence_owners"
restoreOptSkipMissingViews = "skip_missing_views"
restoreOptSkipLocalitiesCheck = "skip_localities_check"
restoreOptDebugPauseOn = "debug_pause_on"

// The temporary database system tables will be restored into for full
// cluster backups.
restoreTempSystemDB = "crdb_temp_system"
)

// allowedDebugPauseOnValues is the set of values accepted for the
// debug_pause_on RESTORE option (restoreOptDebugPauseOn). Only "error" is
// currently listed.
var allowedDebugPauseOnValues = map[string]struct{}{
"error": {},
}

// featureRestoreEnabled is used to enable and disable the RESTORE feature.
var featureRestoreEnabled = settings.RegisterBoolSetting(
"feature.restore.enabled",
Expand Down Expand Up @@ -1914,6 +1919,23 @@ func doRestorePlan(
}
}

var debugPauseOn string
if restoreStmt.Options.DebugPauseOn != nil {
pauseOnFn, err := p.TypeAsString(ctx, restoreStmt.Options.DebugPauseOn, "RESTORE")
if err != nil {
return err
}

debugPauseOn, err = pauseOnFn()
if err != nil {
return err
}

if _, ok := allowedDebugPauseOnValues[debugPauseOn]; len(debugPauseOn) > 0 && !ok {
return errors.Newf("%s cannot be set with the value %s", restoreOptDebugPauseOn, debugPauseOn)
}
}

filteredTablesByID, err := maybeFilterMissingViews(
tablesByID,
typesByID,
Expand Down Expand Up @@ -2011,6 +2033,7 @@ func doRestorePlan(
Encryption: encryption,
RevalidateIndexes: revalidateIndexes,
DatabaseModifiers: databaseModifiers,
DebugPauseOn: debugPauseOn,
},
Progress: jobspb.RestoreProgress{},
}
Expand Down
Loading

0 comments on commit 1684499

Please sign in to comment.