Merge #66889 #68212
66889: jobs: retry jobs with exponential backoff r=ajwerner a=sajjadrizvi

This commit adds a mechanism to retry jobs with exponentially increasing
delays. This is achieved through two new columns in the system.jobs table,
last_run and num_runs. In addition, this commit adds two cluster settings,
`jobs.registry.retry.initial_delay` and `jobs.registry.retry.max_delay`,
which control the exponential-backoff parameters: the initial delay and the
maximum delay between retries. Finally, this commit adds a new partial index
to the system.jobs table that improves the performance of the periodic
queries the registry runs on each node.
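
A rough sketch of the resulting delay schedule (the nextRunDelay helper is
illustrative only, not part of the commit; it mirrors the canRunClause added
to pkg/jobs/adopt.go below, and 30s/10m are example setting values):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// nextRunDelay mirrors the backoff clause: after numRuns attempts, a job
// waits initialDelay * (2^numRuns - 1) after its last run, capped at
// maxDelay. The exponent is clamped at 62 and the product is range-checked
// in float space to guard against overflow, as in the SQL expression.
func nextRunDelay(numRuns int, initialDelay, maxDelay time.Duration) time.Duration {
	exp := math.Pow(2, math.Min(62, float64(numRuns))) - 1
	d := float64(initialDelay) * exp
	if d < 0 || d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Printf("after %d runs: wait %s\n", n, nextRunDelay(n, 30*time.Second, 10*time.Minute))
	}
	// after 1 run: wait 30s; 2: 1m30s; 3: 3m30s; 4: 7m30s; 5: 10m0s (capped)
}
```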

Release note (general change): Retries of jobs that fail due to a retryable
error or due to job coordinator failure are now delayed using exponential
backoff. Before this change, a job that failed in a retryable manner would
be resumed immediately on a different coordinator. This change reduces the
impact of recurrently failing jobs on the cluster. It adds two new cluster
settings that control this behavior: "jobs.registry.retry.initial_delay" and
"jobs.registry.retry.max_delay", which respectively control the initial
delay and the maximum delay between resumptions.

Fixes #44594
Fixes #65080

68212: colexec: add optimized versions of aggregate window functions r=DrewKimball a=DrewKimball

**colexecwindow: add sliding window functionality to window framer**

This commit adds a method `slidingWindowIntervals` to the `windowFramer`
operators that returns a set of `toAdd` intervals and a set of `toRemove`
intervals, which indicate the rows that should be added to the current
aggregation and those that should be removed from it, respectively.
This will be used to implement the sliding window optimization for
aggregate window functions such as `sum`.
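
The interval arithmetic amounts to a set difference between consecutive
frames. A minimal sketch, assuming a single contiguous frame per row (the
real framer works with sets of intervals, and these names are illustrative):

```go
// interval is a half-open [start, end) range of row indices.
type interval struct{ start, end int }

// slidingIntervals returns the rows that entered the frame (toAdd) and the
// rows that left it (toRemove) between consecutive frames. Uses the min/max
// builtins, so it needs Go 1.21+.
func slidingIntervals(prev, cur interval) (toAdd, toRemove []interval) {
	if cur.start < prev.start { // frame grew backward
		toAdd = append(toAdd, interval{cur.start, min(prev.start, cur.end)})
	}
	if cur.end > prev.end { // frame grew forward
		toAdd = append(toAdd, interval{max(prev.end, cur.start), cur.end})
	}
	if prev.start < cur.start { // rows dropped off the front
		toRemove = append(toRemove, interval{prev.start, min(cur.start, prev.end)})
	}
	if prev.end > cur.end { // rows dropped off the back
		toRemove = append(toRemove, interval{max(cur.end, prev.start), prev.end})
	}
	return toAdd, toRemove
}
```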

**colexecwindow: implement sliding window aggregator**

This commit supplies a new operator, `slidingWindowAggregator`, which
is used for any window aggregate function that implements the
`slidingWindowAggregateFunc` interface. Rather than aggregating over
the entire window frame for each row, the `slidingWindowAggregator`
operator aggregates over the rows that are in the current window
frame but were not in the previous one, and removes from the aggregation
the rows that were in the previous window frame but not in the current one.
This allows window aggregate functions to be evaluated in linear rather
than quadratic time.
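
A minimal sketch of that loop for sum, reusing the interval type from the
sketch above and assuming frame bounds that only move forward:

```go
// slidingSum aggregates incrementally: per output row it touches only the
// delta between consecutive frames, so each input value is added once and
// removed once across the partition, giving linear total work.
func slidingSum(vals []int64, frames []interval) []int64 {
	var sum int64
	var prev interval
	out := make([]int64, 0, len(frames))
	for _, f := range frames {
		for i := prev.end; i < f.end; i++ { // rows that entered the frame
			sum += vals[i]
		}
		for i := prev.start; i < f.start; i++ { // rows that left the frame
			sum -= vals[i]
		}
		out = append(out, sum)
		prev = f
	}
	return out
}
```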

**colexec: implement sliding window optimization for sum window function**

This commit modifies the `sum` aggregate window function to implement
the `slidingWindowAggregateFunc` interface, which allows it to be used in a
sliding window context. This yields linear rather than quadratic scaling
in the worst case, and allows the vectorized engine to match or exceed
the row engine's performance for `sum` window functions.

**colexec: implement sliding window optimization for count window function**

This commit modifies the `count` aggregate operator to implement the
`slidingWindowAggregateFunc` interface so that it can be used with
the sliding window optimization.

**colexec: implement sliding window optimization for average window function**

This commit modifies the `average` aggregate operator to implement the
`slidingWindowAggregateFunc` interface so that it can be used with the
sliding window optimization.
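
Both count and average fall out of the same mechanism: count maintains a
running row count across the add/remove deltas, and average divides the
sliding sum by that count. A sketch under the same assumptions as above
(non-empty, forward-moving frames; the real operators must also track NULLs):

```go
// slidingAvg maintains sum and count incrementally; avg = sum / count.
func slidingAvg(vals []float64, frames []interval) []float64 {
	var sum float64
	var count int
	var prev interval
	out := make([]float64, 0, len(frames))
	for _, f := range frames {
		for i := prev.end; i < f.end; i++ {
			sum += vals[i]
			count++
		}
		for i := prev.start; i < f.start; i++ {
			sum -= vals[i]
			count--
		}
		out = append(out, sum/float64(count)) // assumes a non-empty frame
		prev = f
	}
	return out
}
```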

**colexec: optimize count_rows window function**

This commit implements an optimized version of `count_rows` that
computes the size of the window frame as soon as the frame itself
has been determined. This means that most of the overhead for
`count_rows` now comes from computing the window frame, which takes
worst-case linear time (previously, the step to retrieve the size
of the frame was quadratic, though with a small constant).
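
In other words, once the framer has produced the frame, the count is just
the total length of its intervals; a sketch, reusing the interval type from
above:

```go
// countRows computes count_rows directly from the frame bounds, with no
// per-row aggregation loop.
func countRows(frame []interval) (count int64) {
	for _, iv := range frame {
		count += int64(iv.end - iv.start)
	}
	return count
}
```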

**colexec: optimize min and max window functions with default exclusion**

This commit modifies the `min` and `max` aggregate window functions
to implement the `slidingWindowAggregateFunc` interface, which allows
them to be used in a sliding window context. However, this is only
usable when the window frame never shrinks, i.e., when it always
contains all rows from the previous frame.

This commit also provides implementations of `min` and `max` for use
when the window frame can shrink. The indices of the 'next best'
minimum or maximum values are stored in a priority queue that is
updated for each row. Using the priority queue allows the `min` and
`max` operators to avoid fully aggregating over the window frame
even when the previous best value goes out of scope. Note that this
implementation currently does not handle a non-default exclusion
clause; in that case we must fall back to the quadratic approach.
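
A sketch of the priority-queue idea for max (min is symmetric): keep a heap
of candidate row indices ordered by value, push each row once as it enters
the frame, and lazily drop indices that have slid out. This is a
simplification of the generated operator and assumes non-empty frames whose
bounds only move forward:

```go
import "container/heap"

// idxHeap orders candidate row indices by their value, largest first.
type idxHeap struct {
	vals []int64
	idxs []int
}

func (h idxHeap) Len() int           { return len(h.idxs) }
func (h idxHeap) Less(i, j int) bool { return h.vals[h.idxs[i]] > h.vals[h.idxs[j]] }
func (h idxHeap) Swap(i, j int)      { h.idxs[i], h.idxs[j] = h.idxs[j], h.idxs[i] }
func (h *idxHeap) Push(x any)        { h.idxs = append(h.idxs, x.(int)) }
func (h *idxHeap) Pop() any {
	n := len(h.idxs)
	x := h.idxs[n-1]
	h.idxs = h.idxs[:n-1]
	return x
}

// slidingMax pushes each index once; indices that fall out of the frame are
// discarded lazily when they reach the top, so the 'next best' value is
// available without re-aggregating the frame.
func slidingMax(vals []int64, frames []interval) []int64 {
	h := &idxHeap{vals: vals}
	out := make([]int64, 0, len(frames))
	next := 0
	for _, f := range frames {
		for ; next < f.end; next++ {
			heap.Push(h, next)
		}
		for h.idxs[0] < f.start { // stale: index left the frame
			heap.Pop(h)
		}
		out = append(out, vals[h.idxs[0]])
	}
	return out
}
```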

Fixes: #37039

Release note (performance improvement): The vectorized engine can now
use the sliding-window approach to execute common aggregate functions 
as window functions. This allows aggregate window functions to be evaluated
in linear rather than quadratic time. Currently, sum, count, average, min, and 
max are executed using this approach.

Co-authored-by: Sajjad Rizvi <[email protected]>
Co-authored-by: Drew Kimball <[email protected]>
3 people committed Aug 14, 2021
3 parents 0880e83 + fa98ca1 + cd63a65 commit 5dc07da
Showing 102 changed files with 11,960 additions and 2,023 deletions.
2 changes: 2 additions & 0 deletions Makefile
@@ -887,12 +887,14 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecwindow/lag.eg.go \
pkg/sql/colexec/colexecwindow/last_value.eg.go \
pkg/sql/colexec/colexecwindow/lead.eg.go \
pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go \
pkg/sql/colexec/colexecwindow/ntile.eg.go \
pkg/sql/colexec/colexecwindow/nth_value.eg.go \
pkg/sql/colexec/colexecwindow/range_offset_handler.eg.go \
pkg/sql/colexec/colexecwindow/rank.eg.go \
pkg/sql/colexec/colexecwindow/relative_rank.eg.go \
pkg/sql/colexec/colexecwindow/row_number.eg.go \
pkg/sql/colexec/colexecwindow/window_aggregator.eg.go \
pkg/sql/colexec/colexecwindow/window_framer.eg.go \
pkg/sql/colexec/colexecwindow/window_peer_grouper.eg.go

2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -152,4 +152,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-130 set the active cluster version in the format '<major>.<minor>'
version version 21.1-132 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -156,6 +156,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-130</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-132</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -665,9 +665,10 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
},
}
}

// The initial restore will return an error, and restart.
sqlDBRestore.ExpectErr(t, `injected error: restarting in background`, `RESTORE FROM $1`, LocalFoo)
// Reduce retry delays.
sqlDBRestore.Exec(t, "SET CLUSTER SETTING jobs.registry.retry.initial_delay = '1ms'")
// Expect the restore to succeed.
sqlDBRestore.CheckQueryResultsRetry(t,
`SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' AND status = 'succeeded'`,
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -280,6 +280,8 @@ const (
// PostSeparatedIntentsMigration runs a cleanup migration after the main
// SeparatedIntentsMigration.
PostSeparatedIntentsMigration
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff

// Step (1): Add new versions here.
)
@@ -461,6 +463,10 @@ var versionsSingleton = keyedVersions{
Key: PostSeparatedIntentsMigration,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 130},
},
{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 132},
},

// Step (2): Add new versions here.
}
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
160 changes: 131 additions & 29 deletions pkg/jobs/adopt.go
@@ -16,6 +16,7 @@ import (
"strconv"
"sync"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -43,16 +44,16 @@ const (
// NonTerminalStatusTupleString is a sql tuple corresponding to statuses of
// non-terminal jobs.
NonTerminalStatusTupleString = `(` + nonTerminalStatusList + `)`
)

const claimQuery = `
claimQuery = `
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
WHERE (claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `)
WHERE ((claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `))
ORDER BY created DESC
LIMIT $3
RETURNING id;`
)

func (r *Registry) maybeDumpTrace(
resumerCtx context.Context, resumer Resumer, jobID, traceID int64, jobErr error,
@@ -96,26 +97,87 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
}
numRows, err := r.ex.Exec(
ctx, "claim-jobs", txn, claimQuery,
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop,
)
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
r.metrics.ClaimedJobs.Inc(int64(numRows))
if log.ExpensiveLogEnabled(ctx, 1) || numRows > 0 {
log.Infof(ctx, "claimed %d jobs", numRows)
}
return nil
})
}

const (
// processQueryStatusTupleString lists the states in which a job can be
// claimed and resumed.
processQueryStatusTupleString = `(` +
`'` + string(StatusRunning) + `', ` +
`'` + string(StatusReverting) + `'` +
`)`

// canRunArgs are the arguments referenced in canRunClause, which specifies
// whether a job can be run now or not.
canRunArgs = `(SELECT $3::TIMESTAMP AS ts, $4::FLOAT AS initial_delay, $5::FLOAT AS max_delay) args`
canRunClause = `
args.ts >= COALESCE(last_run, created) + least(
IF(
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT >= 0.0,
args.initial_delay * (power(2, least(62, COALESCE(num_runs, 0))) - 1)::FLOAT,
args.max_delay
),
args.max_delay
)::INTERVAL
`
// processQueryBase and processQueryWhereBase select IDs of the jobs that
// can be processed among the claimed jobs.
processQueryBase = `SELECT id FROM system.jobs`
processQueryWhereBase = ` status IN ` + processQueryStatusTupleString + ` AND (claim_session_id = $1 AND claim_instance_id = $2)`

processQueryWithoutBackoff = processQueryBase + " WHERE " + processQueryWhereBase
processQueryWithBackoff = processQueryBase + ", " + canRunArgs +
" WHERE " + processQueryWhereBase + " AND " + canRunClause

resumeQueryBaseCols = "status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)"
resumeQueryWhereBase = `id = $1 AND claim_session_id = $2`
resumeQueryWithoutBackoff = `SELECT ` + resumeQueryBaseCols + ` FROM system.jobs WHERE ` + resumeQueryWhereBase
resumeQueryWithBackoff = `SELECT ` + resumeQueryBaseCols + `, ` + canRunClause + ` AS can_run` +
` FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase
)

// getProcessQuery returns the query that selects the jobs that are claimed
// by this node.
func getProcessQuery(
ctx context.Context, s sqlliveness.Session, r *Registry,
) (string, []interface{}) {
// Select the running or reverting jobs that this node has claimed.
query := processQueryWithoutBackoff
args := []interface{}{s.ID().UnsafeBytes(), r.ID()}
// Gate on the version that introduced job retries with exponential backoff.
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// Select only those jobs that can be executed right now.
query = processQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
if r.knobs.IntervalOverrides.RetryInitialDelay != nil {
initDelay = r.knobs.IntervalOverrides.RetryInitialDelay.Seconds()
}
if r.knobs.IntervalOverrides.RetryMaxDelay != nil {
maxDelay = r.knobs.IntervalOverrides.RetryMaxDelay.Seconds()
}
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
return query, args
}

// processClaimedJobs processes all jobs currently claimed by the registry.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
query, args := getProcessQuery(ctx, s, r)

it, err := r.ex.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`,
StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, query, args...,
)
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
@@ -134,7 +196,6 @@ WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
}

r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume)
r.resumeClaimedJobs(ctx, s, claimedToResume)
return nil
@@ -190,12 +251,24 @@ func (r *Registry) filterAlreadyRunningAndCancelFromPreviousSessions(
// resumeJob resumes a claimed job.
func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliveness.Session) error {
log.Infof(ctx, "job %d: resuming execution", jobID)
resumeQuery := resumeQueryWithoutBackoff
args := []interface{}{jobID, s.ID().UnsafeBytes()}
backoffIsActive := r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff)
if backoffIsActive {
resumeQuery = resumeQueryWithBackoff
initDelay := retryInitialDelaySetting.Get(&r.settings.SV).Seconds()
maxDelay := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
if r.knobs.IntervalOverrides.RetryInitialDelay != nil {
initDelay = r.knobs.IntervalOverrides.RetryInitialDelay.Seconds()
}
if r.knobs.IntervalOverrides.RetryMaxDelay != nil {
maxDelay = r.knobs.IntervalOverrides.RetryMaxDelay.Seconds()
}
args = append(args, r.clock.Now().GoTime(), initDelay, maxDelay)
}
row, err := r.ex.QueryRowEx(
ctx, "get-job-row", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)
FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
jobID, s.ID().UnsafeBytes(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, resumeQuery, args...,
)
if err != nil {
return errors.Wrapf(err, "job %d: could not query job table row", jobID)
@@ -218,6 +291,26 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
return errors.Errorf("job %d: claim with session id %s has expired", jobID, s.ID())
}

if backoffIsActive {
// It's too soon to run the job.
//
// We need this check to address a race between adopt-loop and an existing
// resumer, e.g., in the following schedule:
// Adopt loop: Cl(j,n1) St(r1) Cl(j, n1) St(r2)
// Resumer 1: Rg(j) Up(n1->n2) Fl(j) Ur(j)
// Resumer 2: x-| Starting too soon
// Where:
// - Cl(j,nx): claim job j when num_runs is x
// - St(r1): start resumer r1
// - Rg(j): Add jobID of j in adoptedJobs, disabling further resumers
// - Ur(j): Remove jobID of j from adoptedJobs, enabling further resumers
// - Up(n1->n2): Update number of runs from 1 to 2
// - Fl(j): Job j fails
if !(*row[4].(*tree.DBool)) {
return nil
}
}

payload, err := UnmarshalPayload(row[1])
if err != nil {
return err
@@ -248,6 +341,7 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,

aj := &adoptedJob{sid: s.ID(), cancel: cancel}
r.addAdoptedJob(jobID, aj)
r.metrics.ResumedJobs.Inc(1)
if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
// Wait for the job to finish. No need to print the error because if there
// was one it's been set in the job status already.
@@ -266,6 +360,9 @@ func (r *Registry) removeAdoptedJob(jobID jobspb.JobID) {
}

func (r *Registry) addAdoptedJob(jobID jobspb.JobID, aj *adoptedJob) {
// TODO(sajjad): We should check whether adoptedJobs already has jobID or not. If
// the ID exists, we should not add it again and the caller should not start
// another resumer.
r.mu.Lock()
defer r.mu.Unlock()
r.mu.adoptedJobs[jobID] = aj
@@ -334,16 +431,18 @@ func (r *Registry) runJob(
return err
}

const cancelQuery = `
UPDATE system.jobs
SET status =
CASE
WHEN status = $1 THEN $2
WHEN status = $3 THEN $4
ELSE status
END
WHERE (status IN ($1, $3)) AND ((claim_session_id = $5) AND (claim_instance_id = $6))
RETURNING id, status`
const pauseAndCancelUpdate = `
UPDATE system.jobs
SET status =
CASE
WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `'
WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `'
ELSE status
END
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
RETURNING id, status
`

func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
@@ -358,10 +457,7 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
// registry).
rows, err := r.ex.QueryBufferedEx(
ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
cancelQuery,
StatusPauseRequested, StatusPaused,
StatusCancelRequested, StatusReverting,
s.ID().UnsafeBytes(), r.ID(),
pauseAndCancelUpdate, s.ID().UnsafeBytes(), r.ID(),
)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
@@ -381,6 +477,12 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
ju.UpdatePayload(md.Payload)
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// When we cancel a job, we want to reset its last_run and num_runs
// so that the job can be picked up in the next adopt loop, sooner
// than its current next-retry time.
ju.UpdateRunStats(0 /* numRuns */, r.clock.Now().GoTime() /* lastRun */)
}
return nil
}); err != nil {
return errors.Wrapf(err, "job %d: tried to cancel but could not mark as reverting: %s", id, err)
(Diffs for the remaining changed files are not shown.)