Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: add on_error option to pause changefeeds on failure #68176

Merged
merged 1 commit into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb:with-mocks",
"//pkg/sql",
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdctest
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)
Expand Down Expand Up @@ -67,6 +68,10 @@ type EnterpriseTestFeed interface {
Pause() error
// Resume restarts the feed from the last changefeed-wide resolved timestamp.
Resume() error
// WaitForStatus waits for the provided func to return true, or returns an error.
WaitForStatus(func(s jobs.Status) bool) error
// FetchTerminalJobErr retrieves the error message from the changefeed job.
FetchTerminalJobErr() error
// Details returns changefeed details for this feed.
Details() (*jobspb.ChangefeedDetails, error)
}
65 changes: 63 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,20 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
`unknown %s: %s`, opt, v)
}
}
{
const opt = changefeedbase.OptOnError
switch v := changefeedbase.OnErrorType(details.Opts[opt]); v {
case ``, changefeedbase.OptOnErrorFail:
details.Opts[opt] = string(changefeedbase.OptOnErrorFail)
case changefeedbase.OptOnErrorPause:
// No-op.
default:
return jobspb.ChangefeedDetails{}, errors.Errorf(
`unknown %s: %s, valid values are '%s' and '%s'`, opt, v,
changefeedbase.OptOnErrorPause,
changefeedbase.OptOnErrorFail)
}
}
return details, nil
}

Expand Down Expand Up @@ -599,6 +613,55 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
details := b.job.Details().(jobspb.ChangefeedDetails)
progress := b.job.Progress()

err := b.resumeWithRetries(ctx, jobExec, jobID, details, progress, execCfg)
if err != nil {
return b.handleChangefeedError(ctx, err, details, jobExec)
}
return nil
}

// handleChangefeedError decides how a changefeed error that escaped the retry
// loop is surfaced, based on the feed's on_error option.
//
//   - on_error=fail, or the option being absent/empty: changefeedErr is
//     returned unchanged. The empty value is treated as fail to stay consistent
//     with validateDetails, which defaults an empty on_error to fail.
//     NOTE(review): an empty value presumably occurs for job details persisted
//     without this option — confirm.
//   - on_error=pause: a pause is requested on the job and the result of that
//     request is returned (nil on success).
//   - any other value: an "unrecognized option value" error wrapping
//     changefeedErr is returned, so the original failure is not lost.
func (b *changefeedResumer) handleChangefeedError(
	ctx context.Context,
	changefeedErr error,
	details jobspb.ChangefeedDetails,
	jobExec sql.JobExecContext,
) error {
	switch onError := changefeedbase.OnErrorType(details.Opts[changefeedbase.OptOnError]); onError {
	// default behavior; an unset option means fail, mirroring validateDetails.
	case ``, changefeedbase.OptOnErrorFail:
		return changefeedErr
	// pause instead of failing
	case changefeedbase.OptOnErrorPause:
		// note: we only want the job to pause here if a failure happens, not a
		// user-initiated cancellation. if the job has been canceled, the ctx
		// will handle it and the pause will return an error.
		return b.job.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, func(ctx context.Context,
			planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
			// Run the resumer's regular pause hook first; the planHookState
			// argument is ignored in favor of the jobExec we already hold.
			if err := b.OnPauseRequest(ctx, jobExec, txn, progress); err != nil {
				return err
			}
			// directly update running status to avoid the running/reverted job status check
			progress.RunningStatus = fmt.Sprintf("job failed (%v) but is being paused because of %s=%s", changefeedErr,
				changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
			log.Warningf(ctx, "job failed (%v) but is being paused because of %s=%s", changefeedErr,
				changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
			return nil
		})
	default:
		// Keep the original failure attached so the root cause stays visible
		// even when the stored option value is invalid.
		return errors.Wrapf(changefeedErr, "unrecognized option value: %s=%s",
			changefeedbase.OptOnError, onError)
	}
}

func (b *changefeedResumer) resumeWithRetries(
ctx context.Context,
jobExec sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
execCfg *sql.ExecutorConfig,
) error {
// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
Expand Down Expand Up @@ -665,8 +728,6 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
progress = reloadedJob.Progress()
}
}
// We only hit this if `r.Next()` returns false, which right now only happens
// on context cancellation.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated comment: we can hit this if the retry count is exhausted as well

return errors.Wrap(err, `ran out of retries`)
}

Expand Down
108 changes: 108 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2775,6 +2775,16 @@ func TestChangefeedErrors(t *testing.T) {
`CREATE CHANGEFEED FOR foo INTO $1 WITH envelope='row'`,
`webhook-https://fake-host`,
)

// Sanity check on_error option
sqlDB.ExpectErr(
t, `option "on_error" requires a value`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error`,
`kafka://nope`)
sqlDB.ExpectErr(
t, `unknown on_error: not_valid, valid values are 'pause' and 'fail'`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error='not_valid'`,
`kafka://nope`)
}

func TestChangefeedDescription(t *testing.T) {
Expand Down Expand Up @@ -4228,3 +4238,101 @@ func TestChangefeedOrderingWithErrors(t *testing.T) {
// we can control the ordering of errors
t.Run(`webhook`, webhookTest(testFn))
}

// TestChangefeedOnErrorOption checks how a changefeed job reacts to an emit
// failure under each on_error setting: on_error='pause' must leave the job
// paused (and resumable), while on_error='fail' and the unset default must
// leave it failed with the injected error. The scenario is run against the
// enterprise, cloudstorage, kafka and webhook test harnesses.
func TestChangefeedOnErrorOption(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// hasStatus builds a predicate that matches exactly one job status.
	hasStatus := func(want jobs.Status) func(jobs.Status) bool {
		return func(got jobs.Status) bool { return got == want }
	}

	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
		runner := sqlutils.MakeSQLRunner(db)

		// failEmits looks up the changefeed testing knobs and installs a
		// BeforeEmitRow hook that always returns an error. The knobs are
		// returned so a caller can remove the hook again.
		failEmits := func() *TestingKnobs {
			cfKnobs := f.Server().TestingKnobs().
				DistSQL.(*execinfra.TestingKnobs).
				Changefeed.(*TestingKnobs)
			cfKnobs.BeforeEmitRow = func(_ context.Context) error {
				return errors.Errorf("should fail with custom error")
			}
			return cfKnobs
		}

		t.Run(`pause on error`, func(t *testing.T) {
			runner.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
			cfKnobs := failEmits()

			testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error='pause'`)
			runner.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)

			entFeed := testFeed.(cdctest.EnterpriseTestFeed)

			// The injected failure should pause the job rather than fail it.
			require.NoError(t, entFeed.WaitForStatus(hasStatus(jobs.StatusPaused)))

			// The job's running status must explain why it was paused.
			reg := f.Server().JobRegistry().(*jobs.Registry)
			loaded, err := reg.LoadJob(context.Background(), entFeed.JobID())
			require.NoError(t, err)
			require.Contains(t, loaded.Progress().RunningStatus, "job failed (should fail with custom error) but is being paused because of on_error=pause")
			cfKnobs.BeforeEmitRow = nil

			// With the failure hook removed, resuming should let the row through.
			require.NoError(t, entFeed.Resume())
			assertPayloads(t, testFeed, []string{
				`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
			})

			closeFeed(t, testFeed)
			// Cancellation must still work with the option set. The exact stage
			// is racy, so accept any status on the way to canceled.
			onWayToCanceled := func(s jobs.Status) bool {
				switch s {
				case jobs.StatusCancelRequested, jobs.StatusReverting, jobs.StatusCanceled:
					return true
				}
				return false
			}
			require.NoError(t, entFeed.WaitForStatus(onWayToCanceled))
		})

		t.Run(`fail on error`, func(t *testing.T) {
			runner.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)
			failEmits()

			testFeed := feed(t, f, `CREATE CHANGEFEED FOR bar WITH on_error = 'fail'`)
			runner.Exec(t, `INSERT INTO bar VALUES (1, 'a')`)
			defer closeFeed(t, testFeed)

			entFeed := testFeed.(cdctest.EnterpriseTestFeed)

			require.NoError(t, entFeed.WaitForStatus(hasStatus(jobs.StatusFailed)))
			require.EqualError(t, entFeed.FetchTerminalJobErr(), "should fail with custom error")
		})

		t.Run(`default`, func(t *testing.T) {
			runner.Exec(t, `CREATE TABLE quux (a INT PRIMARY KEY, b STRING)`)
			failEmits()

			testFeed := feed(t, f, `CREATE CHANGEFEED FOR quux`)
			runner.Exec(t, `INSERT INTO quux VALUES (1, 'a')`)
			defer closeFeed(t, testFeed)

			entFeed := testFeed.(cdctest.EnterpriseTestFeed)

			// Without an explicit option the job is expected to fail.
			require.NoError(t, entFeed.WaitForStatus(hasStatus(jobs.StatusFailed)))
			require.EqualError(t, entFeed.FetchTerminalJobErr(), "should fail with custom error")
		})
	}

	for _, tc := range []struct {
		name string
		run  func(t *testing.T)
	}{
		{`enterprise`, enterpriseTest(testFn)},
		{`cloudstorage`, cloudStorageTest(testFn)},
		{`kafka`, kafkaTest(testFn)},
		{`webhook`, webhookTest(testFn)},
	} {
		t.Run(tc.name, tc.run)
	}
}
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type EnvelopeType string
// FormatType configures the encoding format.
type FormatType string

// OnErrorType configures the job behavior when an error occurs.
type OnErrorType string

// SchemaChangeEventClass defines a set of schema change event types which
// trigger the action defined by the SchemaChangeEventPolicy.
type SchemaChangeEventClass string
Expand Down Expand Up @@ -44,6 +47,7 @@ const (
OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`

// OptSchemaChangeEventClassColumnChange corresponds to all schema change
// events which add or remove any column.
Expand Down Expand Up @@ -88,6 +92,9 @@ const (
OptFormatAvro FormatType = `experimental_avro`
OptFormatNative FormatType = `native`

OptOnErrorFail OnErrorType = `fail`
OptOnErrorPause OnErrorType = `pause`

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`

Expand Down Expand Up @@ -147,4 +154,5 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptKafkaSinkConfig: sql.KVStringOptRequireValue,
OptWebhookAuthHeader: sql.KVStringOptRequireValue,
OptWebhookClientTimeout: sql.KVStringOptRequireValue,
OptOnError: sql.KVStringOptRequireValue,
}
25 changes: 17 additions & 8 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (f *jobFeed) jobFailed() {
// Already failed/done.
return
}
f.mu.terminalErr = f.fetchTerminalJobErr()
f.mu.terminalErr = f.FetchTerminalJobErr()
close(f.shutdown)
}

Expand All @@ -274,16 +274,22 @@ func (f *jobFeed) JobID() jobspb.JobID {
return f.jobID
}

func (f *jobFeed) waitForStatus(statusPred func(status jobs.Status) bool) error {
// status reads the current status string of the job identified by f.jobID
// from system.jobs. It returns the scan error, if any, alongside whatever
// value was scanned.
func (f *jobFeed) status() (status string, err error) {
	row := f.db.QueryRowContext(
		context.Background(),
		`SELECT status FROM system.jobs WHERE id = $1`,
		f.jobID,
	)
	err = row.Scan(&status)
	return status, err
}

func (f *jobFeed) WaitForStatus(statusPred func(status jobs.Status) bool) error {
if f.jobID == jobspb.InvalidJobID {
// Job may not have been started.
return nil
}
// Wait for the job status predicate to become true.
return testutils.SucceedsSoonError(func() error {
var status string
if err := f.db.QueryRowContext(context.Background(),
`SELECT status FROM system.jobs WHERE id = $1`, f.jobID).Scan(&status); err != nil {
var err error
if status, err = f.status(); err != nil {
return err
}
if statusPred(jobs.Status(status)) {
Expand All @@ -300,13 +306,16 @@ func (f *jobFeed) Pause() error {
if err != nil {
return err
}
return f.waitForStatus(func(s jobs.Status) bool { return s == jobs.StatusPaused })
return f.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusPaused })
}

// Resume implements the TestFeed interface.
func (f *jobFeed) Resume() error {
_, err := f.db.Exec(`RESUME JOB $1`, f.jobID)
return err
if err != nil {
return err
}
return f.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusRunning })
}

// Details implements FeedJob interface.
Expand All @@ -324,8 +333,8 @@ func (f *jobFeed) Details() (*jobspb.ChangefeedDetails, error) {
return payload.GetChangefeed(), nil
}

// fetchTerminalJobErr retrieves the error message from changefeed job.
func (f *jobFeed) fetchTerminalJobErr() error {
// FetchTerminalJobErr retrieves the error message from the changefeed job.
func (f *jobFeed) FetchTerminalJobErr() error {
var errStr string
if err := f.db.QueryRow(
`SELECT error FROM [SHOW JOBS] WHERE job_id=$1`, f.jobID,
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (j *Job) FractionProgressed(

// paused sets the status of the tracked job to paused. It is called by the
// registry adoption loop by the node currently running a job to move it from
// pauseRequested to paused.
// pause-requested to paused.
func (j *Job) paused(
ctx context.Context, txn *kv.Txn, fn func(context.Context, *kv.Txn) error,
) error {
Expand Down Expand Up @@ -472,11 +472,11 @@ type onPauseRequestFunc func(
ctx context.Context, planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress,
) error

// pauseRequested sets the status of the tracked job to pause-requested. It does
// PauseRequested sets the status of the tracked job to pause-requested. It does
// not directly pause the job; it expects the node that runs the job will
// actively cancel it when it notices that it is in state StatusPauseRequested
// and will move it to state StatusPaused.
func (j *Job) pauseRequested(ctx context.Context, txn *kv.Txn, fn onPauseRequestFunc) error {
func (j *Job) PauseRequested(ctx context.Context, txn *kv.Txn, fn onPauseRequestFunc) error {
return j.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
// Don't allow 19.2-style schema change jobs to undergo changes in job state
// before they undergo a migration to make them properly runnable in 20.1 and
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ func (r *Registry) PauseRequested(ctx context.Context, txn *kv.Txn, id jobspb.Jo
if pr, ok := resumer.(PauseRequester); ok {
onPauseRequested = pr.OnPauseRequest
}
return job.pauseRequested(ctx, txn, onPauseRequested)
return job.PauseRequested(ctx, txn, onPauseRequested)
}

// Succeeded marks the job with id as succeeded.
Expand Down