Skip to content

Commit

Permalink
sql: add jobs backoff state columns in jobs internal table
Browse files Browse the repository at this point in the history
This commit adds new columns in `crdb_internal.jobs` table, which
show the current exponential-backoff state of a job and its execution
history.

Release justification: This commit adds low-risk updates to new
functionality. Jobs subsystem now supports job retries with
exponential-backoff. We want to give users more insights
about the backoff state of jobs and jobs' lifecycles through
additional columns in `crdb_internal.jobs` table.

Release note (general change): The functionality to retry failed
jobs with exponential-backoff has been introduced recently in the system.
This commit adds new columns in `crdb_internal.jobs` table, which
show the current backoff-state of a job and its execution log. The
execution log consists of a sequence of job start and end events
and any associated errors that were encountered during each
execution of the job. Now users can query the internal jobs table to get
more insights about jobs through the following columns: (a) `last_run`
shows the last execution time of a job, (b) `next_run` shows the
next execution time of a job based on exponential-backoff delay,
(c) `num_runs` shows the number of times the job has been executed,
and (d) `execution_log` provides a set of events that are generated
when a job starts and ends its execution.
  • Loading branch information
Sajjad Rizvi committed Aug 24, 2021
1 parent 6c05f99 commit 3ccdba8
Show file tree
Hide file tree
Showing 13 changed files with 1,067 additions and 391 deletions.
44 changes: 15 additions & 29 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,19 @@ const (

// canRunArgs are used in canRunClause, which specify whether a job can be
// run now or not.
canRunArgs = `(SELECT $3::TIMESTAMP AS ts, $4::FLOAT AS initial_delay, $5::FLOAT AS max_delay) args`
canRunClause = `
args.ts >= COALESCE(last_run, created) + least(
IF(
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT >= 0.0,
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT,
args.max_delay
),
args.max_delay
)::INTERVAL
`
canRunArgs = `(SELECT $3::TIMESTAMP AS ts, $4::FLOAT AS initial_delay, $5::FLOAT AS max_delay) args`
// NextRunClause calculates the next execution time of a job with exponential backoff delay, calculated
// using last_run and num_runs values.
NextRunClause = `
COALESCE(last_run, created) + least(
IF(
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT >= 0.0,
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT,
args.max_delay
),
args.max_delay
)::INTERVAL`
canRunClause = `args.ts >= ` + NextRunClause
// processQueryBase and processQueryWhereBase select IDs of the jobs that
// can be processed among the claimed jobs.
processQueryBase = `SELECT id FROM system.jobs`
Expand Down Expand Up @@ -158,15 +160,7 @@ func getProcessQuery(
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// Select only those jobs that can be executed right now.
query = processQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
if r.knobs.IntervalOverrides.RetryInitialDelay != nil {
initDelay = r.knobs.IntervalOverrides.RetryInitialDelay.Seconds()
}
if r.knobs.IntervalOverrides.RetryMaxDelay != nil {
maxDelay = r.knobs.IntervalOverrides.RetryMaxDelay.Seconds()
}
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
args = append(args, r.clock.Now().GoTime(), r.RetryInitialDelay(), r.RetryMaxDelay())
}
return query, args
}
Expand Down Expand Up @@ -256,15 +250,7 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
backoffIsActive := r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff)
if backoffIsActive {
resumeQuery = resumeQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
if r.knobs.IntervalOverrides.RetryInitialDelay != nil {
initDelay = r.knobs.IntervalOverrides.RetryInitialDelay.Seconds()
}
if r.knobs.IntervalOverrides.RetryMaxDelay != nil {
maxDelay = r.knobs.IntervalOverrides.RetryMaxDelay.Seconds()
}
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
args = append(args, r.clock.Now().GoTime(), r.RetryInitialDelay(), r.RetryMaxDelay())
}
row, err := r.ex.QueryRowEx(
ctx, "get-job-row", nil,
Expand Down
38 changes: 38 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3257,3 +3257,41 @@ func TestNonCancelableJobsRetry(t *testing.T) {
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
}

// TestExecutionLogToJSON tests conversion of an executionLog in jobs payload
// to a JSON string.
func TestExecutionLogToJSON(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Each case pairs an execution log with the exact JSON string we
	// expect ExecutionLogToJSON to produce for it.
	testCases := []struct {
		name         string
		executionLog []*jobspb.ExecutionEvent
		expected     string
	}{
		{
			name:         "empty",
			executionLog: []*jobspb.ExecutionEvent{},
			expected:     `[]`,
		},
		{
			name: "with values",
			executionLog: []*jobspb.ExecutionEvent{
				{
					InstanceId:      1,
					Status:          string(jobs.StatusRunning),
					EventTimeMicros: timeutil.ToUnixMicros(timeutil.Unix(1, 0)),
					ExecutionError:  "error string",
					Type:            jobspb.JobStartEvent,
				},
			},
			expected: `[{"eventTimeMicros": "1000000", "executionError": "error string", "instanceId": 1, "status": "` + string(jobs.StatusRunning) + `", "type": "START"}]`,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Encode the log and compare its JSON rendering against the
			// expected string verbatim.
			encoded, err := jobspb.ExecutionLogToJSON(tc.executionLog)
			require.NoError(t, err)
			require.Equal(t, tc.expected, encoded.String())
		})
	}
}
7 changes: 6 additions & 1 deletion pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "jobspb",
srcs = ["wrap.go"],
srcs = [
"json_encoding.go",
"wrap.go",
],
embed = [":jobspb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb",
visibility = ["//visibility:public"],
deps = [
"//pkg/cloud",
"//pkg/sql/catalog/descpb",
"//pkg/sql/protoreflect",
"//pkg/util/json",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//jsonpb",
],
Expand Down
Loading

0 comments on commit 3ccdba8

Please sign in to comment.