Merge #63311
63311: jobs: add RevertFailed terminal state for jobs that fail during revert r=adityamaru a=adityamaru

Previously, a failure during revert would mark the job as "failed" and
indicate in the error string that manual cleanup is required. Such a job
was nevertheless GC-able once the retention time had elapsed, which would
leave no trace of the job; that is a problem, since we expect users/support
to clean up the partial work done by the job.

This change adds a new terminal state, `RevertFailed`, that prevents the
job from being GC'ed.

Fixes: #59542

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
craig[bot] and adityamaru committed May 17, 2021
2 parents 91e40c3 + 1c6a600 commit 25e26a5
Showing 8 changed files with 178 additions and 83 deletions.
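The overall shape of the change: `revert-failed` becomes a fourth terminal status, but unlike `succeeded`, `canceled`, and `failed` it must survive job-record GC so the partial work stays discoverable. A minimal, runnable sketch of that rule; the `isGCEligible` helper is hypothetical, and the commit's actual GC change is in files not shown on this page:

package main

import (
	"fmt"
	"time"
)

// Status mirrors jobs.Status, reduced to the statuses relevant here.
type Status string

const (
	StatusSucceeded    Status = "succeeded"
	StatusCanceled     Status = "canceled"
	StatusFailed       Status = "failed"
	StatusReverting    Status = "reverting"
	StatusRevertFailed Status = "revert-failed"
)

// Terminal matches the updated predicate in pkg/jobs/jobs.go below.
func (s Status) Terminal() bool {
	return s == StatusFailed || s == StatusSucceeded ||
		s == StatusCanceled || s == StatusRevertFailed
}

// isGCEligible is a hypothetical helper sketching the retention rule from
// the commit message: a record may be deleted only if it is terminal, its
// retention window has passed, and it is not revert-failed (revert-failed
// records are kept so users/support can clean up the job's partial work).
func isGCEligible(s Status, finished, cutoff time.Time) bool {
	return s.Terminal() && s != StatusRevertFailed && finished.Before(cutoff)
}

func main() {
	cutoff := time.Now()
	finished := cutoff.Add(-time.Hour)
	fmt.Println(isGCEligible(StatusFailed, finished, cutoff))       // true
	fmt.Println(isGCEligible(StatusRevertFailed, finished, cutoff)) // false
}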
2 changes: 2 additions & 0 deletions pkg/jobs/BUILD.bazel
@@ -97,10 +97,12 @@ go_test(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/optionalnodeliveness",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
8 changes: 4 additions & 4 deletions pkg/jobs/job_scheduler.go
@@ -83,14 +83,14 @@ SELECT
FROM %s J
WHERE
J.created_by_type = '%s' AND J.created_by_id = S.schedule_id AND
- J.status NOT IN ('%s', '%s', '%s')
+ J.status NOT IN ('%s', '%s', '%s', '%s')
) AS num_running, S.*
FROM %s S
WHERE next_run < %s
ORDER BY random()
%s
FOR UPDATE`, env.SystemJobsTableName(), CreatedByScheduledJobs,
- StatusSucceeded, StatusCanceled, StatusFailed,
+ StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed,
env.ScheduledJobsTableName(), env.NowExpr(), limitClause)
}

@@ -206,9 +206,9 @@ func newLoopStats(
ctx context.Context, env scheduledjobs.JobSchedulerEnv, ex sqlutil.InternalExecutor, txn *kv.Txn,
) (*loopStats, error) {
numRunningJobsStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s')",
"SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s', '%s')",
env.SystemJobsTableName(), CreatedByScheduledJobs,
StatusSucceeded, StatusCanceled, StatusFailed)
StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed)
readyToRunStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE next_run < %s",
env.ScheduledJobsTableName(), env.NowExpr())
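Rendered with the actual status strings, the widened exclusion list used by both statements above comes out as follows; a runnable illustration (status strings per pkg/jobs, everything else elided):

package main

import "fmt"

func main() {
	// The fourth entry is new: revert-failed jobs no longer count as
	// running for scheduling purposes.
	frag := fmt.Sprintf("J.status NOT IN ('%s', '%s', '%s', '%s')",
		"succeeded", "canceled", "failed", "revert-failed")
	fmt.Println(frag)
	// Output: J.status NOT IN ('succeeded', 'canceled', 'failed', 'revert-failed')
}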
28 changes: 27 additions & 1 deletion pkg/jobs/jobs.go
@@ -147,6 +147,10 @@ const (
// job will change its state to StatusPaused the next time it runs
// maybeAdoptJobs and will stop running it.
StatusPauseRequested Status = "pause-requested"
+ // StatusRevertFailed is for jobs that encountered a non-retryable error when
+ // reverting their changes. Manual cleanup is required when a job ends up in
+ // this state.
+ StatusRevertFailed Status = "revert-failed"
)

var (
@@ -172,7 +176,7 @@ func deprecatedIsOldSchemaChangeJob(payload *jobspb.Payload) bool {
// Terminal returns whether this status represents a "terminal" state: a state
// after which the job should never be updated again.
func (s Status) Terminal() bool {
- return s == StatusFailed || s == StatusSucceeded || s == StatusCanceled
+ return s == StatusFailed || s == StatusSucceeded || s == StatusCanceled || s == StatusRevertFailed
}
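With this predicate widened, revert-failed joins the set of statuses after which the job is never updated again; the exemption from record GC is handled separately (see the sketch near the top of the diff), not by Terminal() itself.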

// InvalidStatusError is the error returned when the desired operation is
@@ -584,6 +588,28 @@ func (j *Job) failed(
})
}

+ // revertFailed marks the tracked job as having failed during revert with the
+ // given error. Manual cleanup is required when the job is in this state.
+ func (j *Job) revertFailed(
+ ctx context.Context, txn *kv.Txn, err error, fn func(context.Context, *kv.Txn) error,
+ ) error {
+ return j.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
+ if md.Status != StatusReverting {
+ return fmt.Errorf("job with status %s cannot fail during a revert", md.Status)
+ }
+ if fn != nil {
+ if err := fn(ctx, txn); err != nil {
+ return err
+ }
+ }
+ ju.UpdateStatus(StatusRevertFailed)
+ md.Payload.FinishedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
+ md.Payload.Error = err.Error()
+ ju.UpdatePayload(md.Payload)
+ return nil
+ })
+ }
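As with failed(), the registry drives this transition from stepThroughStateMachine (see the registry.go hunk below): it passes a nil txn so the status update runs in its own transaction, and a nil fn since no additional cleanup callback is needed.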

// succeeded marks the tracked job as having succeeded and sets its fraction
// completed to 1.0.
func (j *Job) succeeded(
4 changes: 2 additions & 2 deletions pkg/jobs/jobs_test.go
@@ -712,7 +712,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.mu.e.OnFailOrCancelExit = true
close(rts.failOrCancelCheckCh)
rts.failOrCancelCh <- errors.New("injected failure while blocked in reverting")
- rts.check(t, jobs.StatusFailed)
+ rts.check(t, jobs.StatusRevertFailed)
})

// Fail the job, but also fail to mark it failed.
@@ -744,7 +744,7 @@ func TestRegistryLifecycle(t *testing.T) {
// But let it fail.
rts.mu.e.OnFailOrCancelExit = true
rts.failOrCancelCh <- errors.New("resume failed")
- rts.check(t, jobs.StatusFailed)
+ rts.check(t, jobs.StatusRevertFailed)
})

t.Run("OnPauseRequest", func(t *testing.T) {
19 changes: 17 additions & 2 deletions pkg/jobs/registry.go
@@ -316,7 +316,7 @@ func (r *Registry) WaitForJobs(
// populate the crdb_internal.jobs vtable.
query := fmt.Sprintf(
`SELECT count(*) FROM system.jobs WHERE id IN (%s)
- AND (status != 'succeeded' AND status != 'failed' AND status != 'canceled')`,
+ AND (status != $1 AND status != $2 AND status != $3 AND status != $4)`,
buf.String())
for r := retry.StartWithCtx(ctx, retry.Options{
InitialBackoff: 5 * time.Millisecond,
@@ -332,6 +332,10 @@
nil, /* txn */
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
query,
+ StatusSucceeded,
+ StatusFailed,
+ StatusCanceled,
+ StatusRevertFailed,
)
if err != nil {
return errors.Wrap(err, "polling for queued jobs to complete")
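Besides accounting for the new status, this hunk also switches the polling query from inlined string literals to bind placeholders $1 through $4, with the Status constants passed as query arguments, so the terminal set is spelled in one place.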
@@ -1249,7 +1253,7 @@ func (r *Registry) stepThroughStateMachine(
}
return sErr
}
- return r.stepThroughStateMachine(ctx, execCtx, resumer, job, StatusFailed,
+ return r.stepThroughStateMachine(ctx, execCtx, resumer, job, StatusRevertFailed,
errors.Wrapf(err, "job %d: cannot be reverted, manual cleanup may be required", job.ID()))
case StatusFailed:
if jobErr == nil {
@@ -1262,6 +1266,17 @@
}
telemetry.Inc(TelemetryMetrics[jobType].Failed)
return jobErr
+ case StatusRevertFailed:
+ if jobErr == nil {
+ return errors.AssertionFailedf("job %d: has StatusRevertFailed but no error was provided",
+ job.ID())
+ }
+ if err := job.revertFailed(ctx, nil /* txn */, jobErr, nil /* fn */); err != nil {
+ // If we can't transactionally mark the job as revert failed then it will
+ // be restarted during the next adopt loop and reverting will be retried.
+ return errors.Wrapf(err, "job %d: could not mark as revert failed: %s", job.ID(), jobErr)
+ }
+ return jobErr
default:
return errors.NewAssertionErrorWithWrappedErrf(jobErr,
"job %d: has unsupported status %s", job.ID(), status)
