Skip to content

Commit

Permalink
changefeedccl: add on_error option to pause changefeeds on failure
Browse files Browse the repository at this point in the history
Previously, changefeeds always failed when encountering a non-
retryable error. This option allows the user to pause on failure
and resume later, while still failing as default behavior.

Resolves #67965

Release note (enterprise change): new 'on_error' option to pause
on non-retryable errors instead of failing.
  • Loading branch information
spiffyy99 committed Jul 30, 2021
1 parent a35b845 commit 602b1a7
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb:with-mocks",
"//pkg/sql",
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdctest
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)
Expand Down Expand Up @@ -67,6 +68,10 @@ type EnterpriseTestFeed interface {
Pause() error
// Resume restarts the feed from the last changefeed-wide resolved timestamp.
Resume() error
// WaitForStatus waits for the provided func to return true, or returns an error.
WaitForStatus(func(s jobs.Status) bool) error
// FetchTerminalJobErr retrieves the error message from changefeed job.
FetchTerminalJobErr() error
// Details returns changefeed details for this feed.
Details() (*jobspb.ChangefeedDetails, error)
}
65 changes: 63 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,20 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
`unknown %s: %s`, opt, v)
}
}
{
const opt = changefeedbase.OptOnError
switch v := changefeedbase.OnErrorType(details.Opts[opt]); v {
case ``, changefeedbase.OptOnErrorFail:
details.Opts[opt] = string(changefeedbase.OptOnErrorFail)
case changefeedbase.OptOnErrorPause:
// No-op.
default:
return jobspb.ChangefeedDetails{}, errors.Errorf(
`unknown %s: %s, valid values are '%s' and '%s'`, opt, v,
changefeedbase.OptOnErrorPause,
changefeedbase.OptOnErrorFail)
}
}
return details, nil
}

Expand Down Expand Up @@ -599,6 +613,55 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
details := b.job.Details().(jobspb.ChangefeedDetails)
progress := b.job.Progress()

err := b.resumeWithRetries(ctx, jobExec, jobID, details, progress, execCfg)
if err != nil {
return b.handleChangefeedError(ctx, err, details, jobExec)
}
return nil
}

func (b *changefeedResumer) handleChangefeedError(
ctx context.Context,
changefeedErr error,
details jobspb.ChangefeedDetails,
jobExec sql.JobExecContext,
) error {
switch onError := changefeedbase.OnErrorType(details.Opts[changefeedbase.OptOnError]); onError {
// default behavior
case changefeedbase.OptOnErrorFail:
return changefeedErr
// pause instead of failing
case changefeedbase.OptOnErrorPause:
// note: we only want the job to pause here if a failure happens, not a
// user-initiated cancellation. if the job has been canceled, the ctx
// will handle it and the pause will return an error.
return b.job.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
return err
}
// directly update running status to avoid the running/reverted job status check
progress.RunningStatus = fmt.Sprintf("job failed (%v) but is being paused because of %s=%s", changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
log.Warningf(ctx, "job failed (%v) but is being paused because of %s=%s", changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return nil
})
default:
return errors.Errorf("unrecognized option value: %s=%s",
changefeedbase.OptOnError, details.Opts[changefeedbase.OptOnError])
}
}

func (b *changefeedResumer) resumeWithRetries(
ctx context.Context,
jobExec sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
execCfg *sql.ExecutorConfig,
) error {
// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
Expand Down Expand Up @@ -665,8 +728,6 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
progress = reloadedJob.Progress()
}
}
// We only hit this if `r.Next()` returns false, which right now only happens
// on context cancellation.
return errors.Wrap(err, `ran out of retries`)
}

Expand Down
108 changes: 108 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2775,6 +2775,16 @@ func TestChangefeedErrors(t *testing.T) {
`CREATE CHANGEFEED FOR foo INTO $1 WITH envelope='row'`,
`webhook-https://fake-host`,
)

// Sanity check on_error option
sqlDB.ExpectErr(
t, `option "on_error" requires a value`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error`,
`kafka://nope`)
sqlDB.ExpectErr(
t, `unknown on_error: not_valid, valid values are 'pause' and 'fail'`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error='not_valid'`,
`kafka://nope`)
}

func TestChangefeedDescription(t *testing.T) {
Expand Down Expand Up @@ -4228,3 +4238,101 @@ func TestChangefeedOrderingWithErrors(t *testing.T) {
// we can control the ordering of errors
t.Run(`webhook`, webhookTest(testFn))
}

func TestChangefeedOnErrorOption(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

t.Run(`pause on error`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

knobs := f.Server().TestingKnobs().
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func(_ context.Context) error {
return errors.Errorf("should fail with custom error")
}

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error='pause'`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)

feedJob := foo.(cdctest.EnterpriseTestFeed)

// check for paused status on failure
require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusPaused }))

// Verify job progress contains paused on error status.
jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
registry := f.Server().JobRegistry().(*jobs.Registry)
job, err := registry.LoadJob(context.Background(), jobID)
require.NoError(t, err)
require.Contains(t, job.Progress().RunningStatus, "job failed (should fail with custom error) but is being paused because of on_error=pause")
knobs.BeforeEmitRow = nil

require.NoError(t, feedJob.Resume())
// changefeed should continue to work after it has been resumed
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
})

closeFeed(t, foo)
// cancellation should still go through if option is in place
// to avoid race condition, check only that the job is progressing to be
// canceled (we don't know what stage it will be in)
require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusCancelRequested ||
s == jobs.StatusReverting ||
s == jobs.StatusCanceled
}))
})

t.Run(`fail on error`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)

knobs := f.Server().TestingKnobs().
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func(_ context.Context) error {
return errors.Errorf("should fail with custom error")
}

foo := feed(t, f, `CREATE CHANGEFEED FOR bar WITH on_error = 'fail'`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'a')`)
defer closeFeed(t, foo)

feedJob := foo.(cdctest.EnterpriseTestFeed)

require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusFailed }))
require.EqualError(t, feedJob.FetchTerminalJobErr(), "should fail with custom error")
})

t.Run(`default`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE quux (a INT PRIMARY KEY, b STRING)`)

knobs := f.Server().TestingKnobs().
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func(_ context.Context) error {
return errors.Errorf("should fail with custom error")
}

foo := feed(t, f, `CREATE CHANGEFEED FOR quux`)
sqlDB.Exec(t, `INSERT INTO quux VALUES (1, 'a')`)
defer closeFeed(t, foo)

feedJob := foo.(cdctest.EnterpriseTestFeed)

// if no option is provided, fail should be the default behavior
require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusFailed }))
require.EqualError(t, feedJob.FetchTerminalJobErr(), "should fail with custom error")
})
}

t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
}
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type EnvelopeType string
// FormatType configures the encoding format.
type FormatType string

// OnErrorType configures the job behavior when an error occurs.
type OnErrorType string

// SchemaChangeEventClass defines a set of schema change event types which
// trigger the action defined by the SchemaChangeEventPolicy.
type SchemaChangeEventClass string
Expand Down Expand Up @@ -44,6 +47,7 @@ const (
OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`

// OptSchemaChangeEventClassColumnChange corresponds to all schema change
// events which add or remove any column.
Expand Down Expand Up @@ -88,6 +92,9 @@ const (
OptFormatAvro FormatType = `experimental_avro`
OptFormatNative FormatType = `native`

OptOnErrorFail OnErrorType = `fail`
OptOnErrorPause OnErrorType = `pause`

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`

Expand Down Expand Up @@ -147,4 +154,5 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptKafkaSinkConfig: sql.KVStringOptRequireValue,
OptWebhookAuthHeader: sql.KVStringOptRequireValue,
OptWebhookClientTimeout: sql.KVStringOptRequireValue,
OptOnError: sql.KVStringOptRequireValue,
}
25 changes: 17 additions & 8 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (f *jobFeed) jobFailed() {
// Already failed/done.
return
}
f.mu.terminalErr = f.fetchTerminalJobErr()
f.mu.terminalErr = f.FetchTerminalJobErr()
close(f.shutdown)
}

Expand All @@ -274,16 +274,22 @@ func (f *jobFeed) JobID() jobspb.JobID {
return f.jobID
}

func (f *jobFeed) waitForStatus(statusPred func(status jobs.Status) bool) error {
func (f *jobFeed) status() (status string, err error) {
err = f.db.QueryRowContext(context.Background(),
`SELECT status FROM system.jobs WHERE id = $1`, f.jobID).Scan(&status)
return
}

func (f *jobFeed) WaitForStatus(statusPred func(status jobs.Status) bool) error {
if f.jobID == jobspb.InvalidJobID {
// Job may not have been started.
return nil
}
// Wait for the job status predicate to become true.
return testutils.SucceedsSoonError(func() error {
var status string
if err := f.db.QueryRowContext(context.Background(),
`SELECT status FROM system.jobs WHERE id = $1`, f.jobID).Scan(&status); err != nil {
var err error
if status, err = f.status(); err != nil {
return err
}
if statusPred(jobs.Status(status)) {
Expand All @@ -300,13 +306,16 @@ func (f *jobFeed) Pause() error {
if err != nil {
return err
}
return f.waitForStatus(func(s jobs.Status) bool { return s == jobs.StatusPaused })
return f.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusPaused })
}

// Resume implements the TestFeed interface.
func (f *jobFeed) Resume() error {
_, err := f.db.Exec(`RESUME JOB $1`, f.jobID)
return err
if err != nil {
return err
}
return f.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusRunning })
}

// Details implements FeedJob interface.
Expand All @@ -324,8 +333,8 @@ func (f *jobFeed) Details() (*jobspb.ChangefeedDetails, error) {
return payload.GetChangefeed(), nil
}

// fetchTerminalJobErr retrieves the error message from changefeed job.
func (f *jobFeed) fetchTerminalJobErr() error {
// FetchTerminalJobErr retrieves the error message from changefeed job.
func (f *jobFeed) FetchTerminalJobErr() error {
var errStr string
if err := f.db.QueryRow(
`SELECT error FROM [SHOW JOBS] WHERE job_id=$1`, f.jobID,
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (j *Job) FractionProgressed(

// paused sets the status of the tracked job to paused. It is called by the
// registry adoption loop by the node currently running a job to move it from
// pauseRequested to paused.
// PauseRequested to paused.
func (j *Job) paused(
ctx context.Context, txn *kv.Txn, fn func(context.Context, *kv.Txn) error,
) error {
Expand Down Expand Up @@ -472,11 +472,11 @@ type onPauseRequestFunc func(
ctx context.Context, planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress,
) error

// pauseRequested sets the status of the tracked job to pause-requested. It does
// PauseRequested sets the status of the tracked job to pause-requested. It does
// not directly pause the job; it expects the node that runs the job will
// actively cancel it when it notices that it is in state StatusPauseRequested
// and will move it to state StatusPaused.
func (j *Job) pauseRequested(ctx context.Context, txn *kv.Txn, fn onPauseRequestFunc) error {
func (j *Job) PauseRequested(ctx context.Context, txn *kv.Txn, fn onPauseRequestFunc) error {
return j.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
// Don't allow 19.2-style schema change jobs to undergo changes in job state
// before they undergo a migration to make them properly runnable in 20.1 and
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ func (r *Registry) PauseRequested(ctx context.Context, txn *kv.Txn, id jobspb.Jo
if pr, ok := resumer.(PauseRequester); ok {
onPauseRequested = pr.OnPauseRequest
}
return job.pauseRequested(ctx, txn, onPauseRequested)
return job.PauseRequested(ctx, txn, onPauseRequested)
}

// Succeeded marks the job with id as succeeded.
Expand Down

0 comments on commit 602b1a7

Please sign in to comment.