jobs: retry jobs with exponential-backoff
Failed jobs were previously retried at a constant interval. This commit
enables jobs to be retried with exponentially increasing delays, capped at
an upper bound. It also makes it possible to retry jobs that currently are
not retried when they fail due to transient problems.

Release note: None

Fixes: cockroachdb#44594
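
A minimal Go sketch (not part of this commit) of the retry schedule described above, assuming the formula used by the canRunClause added in pkg/jobs/adopt.go: delay = initial_delay * (2^min(62, num_runs) - 1), capped at max_delay. The 30-second and 1-hour values below are illustrative, not the cluster-setting defaults.

package main

import (
	"fmt"
	"math"
	"time"
)

// nextDelay returns how long a job waits after numRuns runs, mirroring the
// backoff formula in this commit's canRunClause: the delay grows as
// initialDelay * (2^min(62, numRuns) - 1), and maxDelay is both the cap and
// the fallback if the intermediate value overflows.
func nextDelay(numRuns int, initialDelay, maxDelay time.Duration) time.Duration {
	exp := math.Min(62, float64(numRuns))
	d := float64(initialDelay) * (math.Pow(2, exp) - 1)
	if d < 0 || d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

func main() {
	// Illustrative values; the commit reads the real delays from the
	// retryInitialDelaySetting and retryMaxDelaySetting cluster settings.
	initialDelay, maxDelay := 30*time.Second, time.Hour
	for runs := 0; runs <= 8; runs++ {
		fmt.Printf("num_runs=%d -> next retry in %s\n", runs, nextDelay(runs, initialDelay, maxDelay))
	}
}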
Sajjad Rizvi committed Aug 10, 2021
1 parent 847514d commit fe8fb0e
Showing 41 changed files with 1,961 additions and 200 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -147,4 +147,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-124 set the active cluster version in the format '<major>.<minor>'
version version 21.1-126 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -151,6 +151,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-124</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-126</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -665,9 +665,10 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
},
}
}

// The initial restore will return an error, and restart.
sqlDBRestore.ExpectErr(t, `injected error: restarting in background`, `RESTORE FROM $1`, LocalFoo)
// Reduce retry delays.
sqlDBRestore.Exec(t, "SET CLUSTER SETTING jobs.registry.retry.initial_delay = '1ms'")
// Expect the restore to succeed.
sqlDBRestore.CheckQueryResultsRetry(t,
`SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' AND status = 'succeeded'`,
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -271,6 +271,8 @@ const (
SQLInstancesTable
// Can return new retryable rangefeed errors without crashing the client
NewRetryableRangefeedErrors
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff

// Step (1): Add new versions here.
)
@@ -440,6 +442,10 @@ var versionsSingleton = keyedVersions{
Key: NewRetryableRangefeedErrors,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 124},
},
{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 126},
},

// Step (2): Add new versions here.
}
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
147 changes: 118 additions & 29 deletions pkg/jobs/adopt.go
@@ -16,6 +16,7 @@ import (
"strconv"
"sync"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -43,16 +44,16 @@ const (
// NonTerminalStatusTupleString is a sql tuple corresponding to statuses of
// non-terminal jobs.
NonTerminalStatusTupleString = `(` + nonTerminalStatusList + `)`
)

const claimQuery = `
claimQuery = `
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
WHERE (claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `)
WHERE ((claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `))
ORDER BY created DESC
LIMIT $3
RETURNING id;`
)

func (r *Registry) maybeDumpTrace(
resumerCtx context.Context, resumer Resumer, jobID, traceID int64, jobErr error,
@@ -96,26 +97,81 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
}
numRows, err := r.ex.Exec(
ctx, "claim-jobs", txn, claimQuery,
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop,
)
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
r.metrics.ClaimedJobs.Inc(int64(numRows))
if log.ExpensiveLogEnabled(ctx, 1) || numRows > 0 {
log.Infof(ctx, "claimed %d jobs", numRows)
}
return nil
})
}

const (
// processQueryStatusTupleString includes the job states in which a job can
// be claimed and resumed.
processQueryStatusTupleString = `(` +
`'` + string(StatusRunning) + `', ` +
`'` + string(StatusReverting) + `'` +
`)`

// canRunArgs are used in canRunClause, which specifies whether a job can be
// run now.
canRunArgs = `(SELECT $3 AS ts, $4 AS initial_delay, $5 AS max_delay) args`
canRunClause = `
args.ts >= COALESCE(last_run, created) + least(
IF(
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT >= 0.0,
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT,
args.max_delay
),
args.max_delay
)::INTERVAL
`
// processQueryBase and processQueryWhereBase select IDs of the jobs that
// can be processed among the claimed jobs.
processQueryBase = `SELECT id FROM system.jobs`
processQueryWhereBase = ` status IN ` + processQueryStatusTupleString + ` AND (claim_session_id = $1 AND claim_instance_id = $2)`

processQueryWithoutBackoff = processQueryBase + " WHERE " + processQueryWhereBase
processQueryWithBackoff = processQueryBase + ", " + canRunArgs +
" WHERE " + processQueryWhereBase + " AND " + canRunClause

resumeQueryBaseCols = "status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)"
resumeQueryWhereBase = `id = $1 AND claim_session_id = $2`
resumeQueryWithoutBackoff = `SELECT ` + resumeQueryBaseCols + ` FROM system.jobs WHERE ` + resumeQueryWhereBase
resumeQueryWithBackoff = `SELECT ` + resumeQueryBaseCols + `, ` + canRunClause + ` AS can_run` +
` FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase
)
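
For reference, a hedged Go rendering (not part of this diff) of what canRunClause evaluates: a claimed job is eligible to resume only once the query timestamp has passed COALESCE(last_run, created) plus the exponential-backoff delay, with max_delay acting both as the cap and as the fallback when the intermediate value overflows.

// canRun is an illustrative Go equivalent of canRunClause, assuming the
// standard-library "math" and "time" packages; it is not part of this diff.
func canRun(ts, lastRunOrCreated time.Time, numRuns int64, initialDelay, maxDelay time.Duration) bool {
	exp := math.Min(62, float64(numRuns))
	delay := float64(initialDelay) * (math.Pow(2, exp) - 1)
	if delay < 0 || delay > float64(maxDelay) {
		// Mirrors the IF(... >= 0.0, ..., max_delay) overflow guard and the
		// outer least(..., max_delay) cap.
		delay = float64(maxDelay)
	}
	return !ts.Before(lastRunOrCreated.Add(time.Duration(delay)))
}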

// getProcessQuery returns the query that selects the jobs that are claimed
// by this node.
func getProcessQuery(
ctx context.Context, s sqlliveness.Session, r *Registry,
) (string, []interface{}) {
// Select the running or reverting jobs that this node has claimed.
query := processQueryWithoutBackoff
args := []interface{}{s.ID().UnsafeBytes(), r.ID()}
// Gate on the cluster version that introduced job retries with exponential backoff.
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// Select only those jobs that can be executed right now.
query = processQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
return query, args
}

// processClaimedJobs processes all jobs currently claimed by the registry.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
query, args := getProcessQuery(ctx, s, r)

it, err := r.ex.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`,
StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, query, args...,
)
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
@@ -134,7 +190,6 @@ WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
}

r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume)
r.resumeClaimedJobs(ctx, s, claimedToResume)
return nil
@@ -190,12 +245,18 @@ func (r *Registry) filterAlreadyRunningAndCancelFromPreviousSessions(
// resumeJob resumes a claimed job.
func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliveness.Session) error {
log.Infof(ctx, "job %d: resuming execution", jobID)
resumeQuery := resumeQueryWithoutBackoff
args := []interface{}{jobID, s.ID().UnsafeBytes()}
backoffIsActive := r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff)
if backoffIsActive {
resumeQuery = resumeQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
row, err := r.ex.QueryRowEx(
ctx, "get-job-row", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)
FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
jobID, s.ID().UnsafeBytes(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, resumeQuery, args...,
)
if err != nil {
return errors.Wrapf(err, "job %d: could not query job table row", jobID)
@@ -218,6 +279,25 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
return errors.Errorf("job %d: claim with session id %s has expired", jobID, s.ID())
}

if backoffIsActive {
// It's too soon to run the job.
//
// We need this check to address a race between adopt-loop and an existing
// resumer, e.g., in the following schedule:
// Adopt loop: Cl(j,n1) St(r1) Cl(j, n1) St(r2)
// Resumer 1: Rg(j) Up(n1->n2) Fl(j) Ur(j)
// Resumer 2: x-| Starting too soon
// Where:
// - Cl(j,nx): claim job j when num_runs is x
// - St(r1): start resumer r1
// - Rg(j): Add jobID of j in adoptedJobs, disabling further resumers
// - Ur(j): Remove jobID of j from adoptedJobs, enabling further resumers
// - Up(n1->n2): Update the number of runs from 1 to 2.
if !(*row[4].(*tree.DBool)) {
return nil
}
}

payload, err := UnmarshalPayload(row[1])
if err != nil {
return err
@@ -248,6 +328,7 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,

aj := &adoptedJob{sid: s.ID(), cancel: cancel}
r.addAdoptedJob(jobID, aj)
r.metrics.ResumedJobs.Inc(1)
if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
// Wait for the job to finish. No need to print the error because if there
// was one it's been set in the job status already.
@@ -266,6 +347,9 @@ func (r *Registry) removeAdoptedJob(jobID jobspb.JobID) {
}

func (r *Registry) addAdoptedJob(jobID jobspb.JobID, aj *adoptedJob) {
// TODO(sajjad): We should check whether adoptedJobs already has jobID or not. If
// the ID exists, we should not add it again and the caller should not start
// another resumer.
r.mu.Lock()
defer r.mu.Unlock()
r.mu.adoptedJobs[jobID] = aj
@@ -334,16 +418,18 @@ func (r *Registry) runJob(
return err
}

const cancelQuery = `
UPDATE system.jobs
SET status =
CASE
WHEN status = $1 THEN $2
WHEN status = $3 THEN $4
ELSE status
END
WHERE (status IN ($1, $3)) AND ((claim_session_id = $5) AND (claim_instance_id = $6))
RETURNING id, status`
const pauseAndCancelUpdate = `
UPDATE system.jobs
SET status =
CASE
WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `'
WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `'
ELSE status
END
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
`

func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
@@ -358,10 +444,7 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
// registry).
rows, err := r.ex.QueryBufferedEx(
ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
cancelQuery,
StatusPauseRequested, StatusPaused,
StatusCancelRequested, StatusReverting,
s.ID().UnsafeBytes(), r.ID(),
pauseAndCancelUpdate, s.ID().UnsafeBytes(), r.ID(),
)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
@@ -381,6 +464,12 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
ju.UpdatePayload(md.Payload)
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// When we cancel a job, we want to reset its last_run and num_runs
// so that the job can be picked up in the next adopt-loop, sooner
// than its current next-retry time.
ju.UpdateRunStats(0 /* numRuns */, r.clock.Now().GoTime() /* lastRun */)
}
return nil
}); err != nil {
return errors.Wrapf(err, "job %d: tried to cancel but could not mark as reverting: %s", id, err)