-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
changefeedccl: add on_error option to pause changefeeds on failure #68176
Conversation
6a5022c
to
66bc601
Compare
@@ -665,8 +709,6 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err | |||
progress = reloadedJob.Progress() | |||
} | |||
} | |||
// We only hit this if `r.Next()` returns false, which right now only happens | |||
// on context cancellation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outdated comment: we can hit this if the retry count is exhausted as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice!
Just few minor nits/questions.
Reviewed 6 of 6 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @spiffyyeng)
pkg/ccl/changefeedccl/changefeed_stmt.go, line 555 at r1 (raw file):
default: return jobspb.ChangefeedDetails{}, errors.Errorf( `unknown %s: %s`, opt, v)
I would add maybe a a hint (errors.WithHint), or just expand the error message with "valid values are....."
pkg/ccl/changefeedccl/changefeed_stmt.go, line 629 at r1 (raw file):
return pauseErr } // TODO (ryan min): Populate pause reason with error once column is added (#67928)
We already have that. See running status (see changefeed_processors).
In order to update run status and indicate that we're pausing because of this policy, as opposed to user initiated action, we will probably need to export pauseRequested
method (as well as onPauseRequestFunc
) so that we can directly call b.job.PauseRequested, and update run status in the passed in function.
pkg/ccl/changefeedccl/changefeed_stmt.go, line 633 at r1 (raw file):
Quoted 18 lines of code…
switch onError := changefeedbase.OnErrorType(details.Opts[changefeedbase.OptOnError]); onError { // default behavior case changefeedbase.OptOnErrorFail: return err // pause instead of failing case changefeedbase.OptOnErrorPause: // note: we only want the job to pause here if a failure happens, not a // user-initiated cancellation. if the job has been canceled, the ctx // will handle it and the pause will return an error. pauseErr := execCfg.JobRegistry.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, jobID) if pauseErr != nil { return pauseErr } // TODO (ryan min): Populate pause reason with error once column is added (#67928) default: return errors.Errorf("unrecognized option value: %s=%s", changefeedbase.OptOnError, details.Opts[changefeedbase.OptOnError]) }
I would move this into a helper handleChangefeedError()
which returns an error (or nil)..
So, this code becomes:
if err != nil {
err = handleChangefeedError(....)
}
return err
pkg/ccl/changefeedccl/changefeed_test.go, line 4265 at r1 (raw file):
// check for paused status on failure err := feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusPaused })
require.NoError(t, feedJob.WaitForStatus...)?
pkg/ccl/changefeedccl/changefeed_test.go, line 4345 at r1 (raw file):
t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) t.Run(`webhook`, webhookTest(testFn))
nice test.
66bc601
to
4c803b9
Compare
4c803b9
to
43a342e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @spiffyyeng)
pkg/ccl/changefeedccl/changefeed_stmt.go, line 555 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I would add maybe a a hint (errors.WithHint), or just expand the error message with "valid values are....."
Done.
pkg/ccl/changefeedccl/changefeed_stmt.go, line 629 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
We already have that. See running status (see changefeed_processors).
In order to update run status and indicate that we're pausing because of this policy, as opposed to user initiated action, we will probably need to exportpauseRequested
method (as well asonPauseRequestFunc
) so that we can directly call b.job.PauseRequested, and update run status in the passed in function.
discussed offline. run status is updated internally in the progress column but is not visible in SHOW JOBS
when paused.
pkg/ccl/changefeedccl/changefeed_stmt.go, line 633 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
switch onError := changefeedbase.OnErrorType(details.Opts[changefeedbase.OptOnError]); onError { // default behavior case changefeedbase.OptOnErrorFail: return err // pause instead of failing case changefeedbase.OptOnErrorPause: // note: we only want the job to pause here if a failure happens, not a // user-initiated cancellation. if the job has been canceled, the ctx // will handle it and the pause will return an error. pauseErr := execCfg.JobRegistry.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, jobID) if pauseErr != nil { return pauseErr } // TODO (ryan min): Populate pause reason with error once column is added (#67928) default: return errors.Errorf("unrecognized option value: %s=%s", changefeedbase.OptOnError, details.Opts[changefeedbase.OptOnError]) }
I would move this into a helper
handleChangefeedError()
which returns an error (or nil)..
So, this code becomes:if err != nil { err = handleChangefeedError(....) } return err
Done.
pkg/ccl/changefeedccl/changefeed_test.go, line 4265 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
require.NoError(t, feedJob.WaitForStatus...)?
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 4 files at r3.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy and @spiffyyeng)
pkg/ccl/changefeedccl/changefeed_stmt.go, line 646 at r3 (raw file):
// directly update running status to avoid the running/reverted job status check progress.RunningStatus = fmt.Sprintf("paused on error: %v", changefeedErr) log.Warningf(ctx, progress.RunningStatus)
nit: Let's be more verbose here
log.Warningf(ctx, "job failed (%v) but is being paused because of on_error=pause", err)
43a342e
to
274c8ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy and @spiffyyeng)
pkg/ccl/changefeedccl/changefeed_stmt.go, line 646 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
nit: Let's be more verbose here
log.Warningf(ctx, "job failed (%v) but is being paused because of on_error=pause", err)
Done.
TFTR! |
Previously, changefeeds always failed when encountering a non- retryable error. This option allows the user to pause on failure and resume later, while still failing as default behavior. Resolves cockroachdb#67965 Release note (enterprise change): new 'on_error' option to pause on non-retryable errors instead of failing.
274c8ef
to
602b1a7
Compare
bors r+ |
Build succeeded: |
Previously, changefeeds always failed when encountering a non-
retryable error. This option allows the user to pause on failure
and resume later, while still failing as default behavior.
Release note (enterprise change): new 'on_error' option to pause
on non-retryable errors instead of failing.