Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
68176: changefeedccl: add on_error option to pause changefeeds on failure r=spiffyyeng a=spiffyyeng

Previously, changefeeds always failed when encountering a non-
retryable error. This option allows the user to pause on failure
and resume later, while still failing as default behavior.

Release note (enterprise change): new 'on_error' option to pause
on non-retryable errors instead of failing.

68249: coldataext: remove a wrapper and use tree.Datum directly r=yuzefovich a=yuzefovich

**coldataext: propagate evalCtx directly for casts**

This commit removes the reliance on propagating `datumVec` objects to
supply the cast function with an eval context. All casts in the datum
land have been refactored to use `tree.PerformCast` for simplicity.

Note that the eval context is not removed from the `datumVec` because it
is used by `CompareDatum` method which is a lot more common (and, thus,
would require more plumbing to get rid off).

Release note: None

**coldataext: remove a wrapper and use tree.Datum directly**

Previously, we had separate `coldataext.Datum` wrapper around
`tree.Datum`. I don't remember what was the reasoning behind introducing
it (probably so that we could define methods with the wrapper as the
receiver in `coldataext` package), but it seems unnecessary. It also has
some performance cost because on every `DatumVec.Get` we are currently
allocating a new object on the heap. This commit removes the wrapper in
favor of working with `tree.Datum`s directly. This work was prompted by
looking at some profiles around the memory accounting for datum-backed
types in the cFetcher.

Release note: None

68256: changefeedccl: Add pushback duration metric to blocking buffer. r=miretskiy a=miretskiy

Add a blocking buffer metric which keeps track of the amount
of time waited for resource aquisition.

Release Notes: None

Co-authored-by: Ryan Min <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
4 people committed Jul 30, 2021
4 parents ba978d6 + 602b1a7 + 62b3462 + 4191609 commit cfc525f
Show file tree
Hide file tree
Showing 87 changed files with 1,989 additions and 1,348 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb:with-mocks",
"//pkg/sql",
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdctest
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)
Expand Down Expand Up @@ -67,6 +68,10 @@ type EnterpriseTestFeed interface {
Pause() error
// Resume restarts the feed from the last changefeed-wide resolved timestamp.
Resume() error
// WaitForStatus waits for the provided func to return true, or returns an error.
WaitForStatus(func(s jobs.Status) bool) error
// FetchTerminalJobErr retrieves the error message from changefeed job.
FetchTerminalJobErr() error
// Details returns changefeed details for this feed.
Details() (*jobspb.ChangefeedDetails, error)
}
65 changes: 63 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,20 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
`unknown %s: %s`, opt, v)
}
}
{
const opt = changefeedbase.OptOnError
switch v := changefeedbase.OnErrorType(details.Opts[opt]); v {
case ``, changefeedbase.OptOnErrorFail:
details.Opts[opt] = string(changefeedbase.OptOnErrorFail)
case changefeedbase.OptOnErrorPause:
// No-op.
default:
return jobspb.ChangefeedDetails{}, errors.Errorf(
`unknown %s: %s, valid values are '%s' and '%s'`, opt, v,
changefeedbase.OptOnErrorPause,
changefeedbase.OptOnErrorFail)
}
}
return details, nil
}

Expand Down Expand Up @@ -599,6 +613,55 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
details := b.job.Details().(jobspb.ChangefeedDetails)
progress := b.job.Progress()

err := b.resumeWithRetries(ctx, jobExec, jobID, details, progress, execCfg)
if err != nil {
return b.handleChangefeedError(ctx, err, details, jobExec)
}
return nil
}

func (b *changefeedResumer) handleChangefeedError(
ctx context.Context,
changefeedErr error,
details jobspb.ChangefeedDetails,
jobExec sql.JobExecContext,
) error {
switch onError := changefeedbase.OnErrorType(details.Opts[changefeedbase.OptOnError]); onError {
// default behavior
case changefeedbase.OptOnErrorFail:
return changefeedErr
// pause instead of failing
case changefeedbase.OptOnErrorPause:
// note: we only want the job to pause here if a failure happens, not a
// user-initiated cancellation. if the job has been canceled, the ctx
// will handle it and the pause will return an error.
return b.job.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
return err
}
// directly update running status to avoid the running/reverted job status check
progress.RunningStatus = fmt.Sprintf("job failed (%v) but is being paused because of %s=%s", changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
log.Warningf(ctx, "job failed (%v) but is being paused because of %s=%s", changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return nil
})
default:
return errors.Errorf("unrecognized option value: %s=%s",
changefeedbase.OptOnError, details.Opts[changefeedbase.OptOnError])
}
}

func (b *changefeedResumer) resumeWithRetries(
ctx context.Context,
jobExec sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
execCfg *sql.ExecutorConfig,
) error {
// We'd like to avoid failing a changefeed unnecessarily, so when an error
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
Expand Down Expand Up @@ -665,8 +728,6 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
progress = reloadedJob.Progress()
}
}
// We only hit this if `r.Next()` returns false, which right now only happens
// on context cancellation.
return errors.Wrap(err, `ran out of retries`)
}

Expand Down
108 changes: 108 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2774,6 +2774,16 @@ func TestChangefeedErrors(t *testing.T) {
`CREATE CHANGEFEED FOR foo INTO $1 WITH envelope='row'`,
`webhook-https://fake-host`,
)

// Sanity check on_error option
sqlDB.ExpectErr(
t, `option "on_error" requires a value`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error`,
`kafka://nope`)
sqlDB.ExpectErr(
t, `unknown on_error: not_valid, valid values are 'pause' and 'fail'`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error='not_valid'`,
`kafka://nope`)
}

func TestChangefeedDescription(t *testing.T) {
Expand Down Expand Up @@ -4177,3 +4187,101 @@ func TestChangefeedOrderingWithErrors(t *testing.T) {
// we can control the ordering of errors
t.Run(`webhook`, webhookTest(testFn))
}

func TestChangefeedOnErrorOption(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

t.Run(`pause on error`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

knobs := f.Server().TestingKnobs().
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func(_ context.Context) error {
return errors.Errorf("should fail with custom error")
}

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error='pause'`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)

feedJob := foo.(cdctest.EnterpriseTestFeed)

// check for paused status on failure
require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusPaused }))

// Verify job progress contains paused on error status.
jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
registry := f.Server().JobRegistry().(*jobs.Registry)
job, err := registry.LoadJob(context.Background(), jobID)
require.NoError(t, err)
require.Contains(t, job.Progress().RunningStatus, "job failed (should fail with custom error) but is being paused because of on_error=pause")
knobs.BeforeEmitRow = nil

require.NoError(t, feedJob.Resume())
// changefeed should continue to work after it has been resumed
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
})

closeFeed(t, foo)
// cancellation should still go through if option is in place
// to avoid race condition, check only that the job is progressing to be
// canceled (we don't know what stage it will be in)
require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusCancelRequested ||
s == jobs.StatusReverting ||
s == jobs.StatusCanceled
}))
})

t.Run(`fail on error`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)

knobs := f.Server().TestingKnobs().
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func(_ context.Context) error {
return errors.Errorf("should fail with custom error")
}

foo := feed(t, f, `CREATE CHANGEFEED FOR bar WITH on_error = 'fail'`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'a')`)
defer closeFeed(t, foo)

feedJob := foo.(cdctest.EnterpriseTestFeed)

require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusFailed }))
require.EqualError(t, feedJob.FetchTerminalJobErr(), "should fail with custom error")
})

t.Run(`default`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE quux (a INT PRIMARY KEY, b STRING)`)

knobs := f.Server().TestingKnobs().
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func(_ context.Context) error {
return errors.Errorf("should fail with custom error")
}

foo := feed(t, f, `CREATE CHANGEFEED FOR quux`)
sqlDB.Exec(t, `INSERT INTO quux VALUES (1, 'a')`)
defer closeFeed(t, foo)

feedJob := foo.(cdctest.EnterpriseTestFeed)

// if no option is provided, fail should be the default behavior
require.NoError(t, feedJob.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusFailed }))
require.EqualError(t, feedJob.FetchTerminalJobErr(), "should fail with custom error")
})
}

t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`kafka`, kafkaTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
}
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type EnvelopeType string
// FormatType configures the encoding format.
type FormatType string

// OnErrorType configures the job behavior when an error occurs.
type OnErrorType string

// SchemaChangeEventClass defines a set of schema change event types which
// trigger the action defined by the SchemaChangeEventPolicy.
type SchemaChangeEventClass string
Expand Down Expand Up @@ -44,6 +47,7 @@ const (
OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`

// OptSchemaChangeEventClassColumnChange corresponds to all schema change
// events which add or remove any column.
Expand Down Expand Up @@ -88,6 +92,9 @@ const (
OptFormatAvro FormatType = `experimental_avro`
OptFormatNative FormatType = `native`

OptOnErrorFail OnErrorType = `fail`
OptOnErrorPause OnErrorType = `pause`

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`

Expand Down Expand Up @@ -147,4 +154,5 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptKafkaSinkConfig: sql.KVStringOptRequireValue,
OptWebhookAuthHeader: sql.KVStringOptRequireValue,
OptWebhookClientTimeout: sql.KVStringOptRequireValue,
OptOnError: sql.KVStringOptRequireValue,
}
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/kvevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ go_test(
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/settings/cluster",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_stretchr_testify//require",
],
)
18 changes: 12 additions & 6 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,20 @@ type blockingBuffer struct {
// It will grow the bound account to buffer more messages but will block if it
// runs out of space. If ever any entry exceeds the allocatable size of the
// account, an error will be returned when attempting to buffer it.
func NewMemBuffer(acc mon.BoundAccount, metrics *Metrics) Buffer {
func NewMemBuffer(acc mon.BoundAccount, metrics *Metrics, opts ...quotapool.Option) Buffer {
bb := &blockingBuffer{
signalCh: make(chan struct{}, 1),
}
bb.acc = acc
bb.metrics = metrics
bb.qp = quotapool.New("changefeed", &bb.blockingBufferQuotaPool)

opts = append(opts,
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
}))

bb.qp = quotapool.New("changefeed", &bb.blockingBufferQuotaPool, opts...)
return bb
}

Expand Down Expand Up @@ -192,10 +199,9 @@ func (b *blockingBufferQuotaPool) release(e *bufferEntry) {
type bufferEntry struct {
e Event

alloc int64 // bytes allocated from the quotapool
err error // error populated from under the quotapool

next *bufferEntry // linked-list element
alloc int64 // bytes allocated from the quotapool
err error // error populated from under the quotapool
next *bufferEntry // linked-list element
}

var bufferEntryPool = sync.Pool{
Expand Down
Loading

0 comments on commit cfc525f

Please sign in to comment.