Skip to content

Commit

Permalink
changefeedccl: Rework error handling
Browse files Browse the repository at this point in the history
Prior to this PR, changefeeds would rely on a white list
approach in order to determine which errors were retryable.
All other errors would be deemed terminal, causing the changefeed
to fail.

The above approach is brittle, and causes unwanted
changefeed termination.

This PR changes this approach to treat all errors as retryable,
unless otherwise indicated.  Errors that are known by changefeed
to be fatal are handled explicitly, by marking such errors
as terminal.  For example, changefeeds would exit
if the targeted table is dropped.  On the other hand, inability
to read this table for any reason would not be treated as
terminal.

Fixes cockroachdb#90320
Fixes cockroachdb#77549
Fixes cockroachdb#63317
Fixes cockroachdb#71341
Fixes cockroachdb#73016
Informs CRDB-6788
Informs CRDB-7581

Release note (enterprise change): Changefeeds will now treat
all errors, unless otherwise indicated, as retryable errors.
  • Loading branch information
Yevgeniy Miretskiy committed Nov 1, 2022
1 parent 82fdf05 commit 1a93051
Show file tree
Hide file tree
Showing 25 changed files with 246 additions and 293 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"metrics.go",
"name.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"schema_registry.go",
"scram_client.go",
"sink.go",
Expand Down Expand Up @@ -80,7 +81,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
Expand Down Expand Up @@ -234,6 +234,8 @@ go_test(
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/keyside",
Expand Down
47 changes: 27 additions & 20 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand Down Expand Up @@ -408,8 +409,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
func(d tree.Datum, _ interface{}) (interface{}, error) {
date := *d.(*tree.DDate)
if !date.IsFinite() {
return nil, errors.Errorf(
`infinite date not yet supported with avro`)
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`infinite date not yet supported with avro`))
}
// The avro library requires us to return this as a time.Time.
return date.ToTime()
Expand Down Expand Up @@ -498,8 +499,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
)
case types.DecimalFamily:
if typ.Precision() == 0 {
return nil, errors.Errorf(
`decimal with no precision not yet supported with avro`)
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`decimal with no precision not yet supported with avro`))
}

width := int(typ.Width())
Expand Down Expand Up @@ -595,8 +596,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
case types.ArrayFamily:
itemSchema, err := typeToAvroSchema(typ.ArrayContents())
if err != nil {
return nil, errors.Wrapf(err, `could not create item schema for %s`,
typ)
return nil, changefeedbase.WithTerminalError(
errors.Wrapf(err, `could not create item schema for %s`, typ))
}
itemUnionKey := avroUnionKey(itemSchema.SchemaType.([]avroSchemaType)[1])

Expand Down Expand Up @@ -676,8 +677,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
)

default:
return nil, errors.Errorf(`type %s not yet supported with avro`,
typ.SQLString())
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`type %s not yet supported with avro`, typ.SQLString()))
}

return schema, nil
Expand All @@ -688,7 +689,7 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
func columnToAvroSchema(col cdcevent.ResultColumn) (*avroSchemaField, error) {
schema, err := typeToAvroSchema(col.Typ)
if err != nil {
return nil, errors.Wrapf(err, "column %s", col.Name)
return nil, changefeedbase.WithTerminalError(errors.Wrapf(err, "column %s", col.Name))
}
schema.Name = SQLNameToAvroName(col.Name)
schema.Metadata = col.SQLStringNotHumanReadable()
Expand Down Expand Up @@ -790,7 +791,7 @@ func (r *avroDataRecord) rowFromTextual(buf []byte) (rowenc.EncDatumRow, error)
return nil, err
}
if len(newBuf) > 0 {
return nil, errors.New(`only one row was expected`)
return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`))
}
return r.rowFromNative(native)
}
Expand All @@ -811,7 +812,7 @@ func (r *avroDataRecord) RowFromBinary(buf []byte) (rowenc.EncDatumRow, error) {
return nil, err
}
if len(newBuf) > 0 {
return nil, errors.New(`only one row was expected`)
return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`))
}
return r.rowFromNative(native)
}
Expand All @@ -826,7 +827,8 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) (err error) {
fieldIdx, ok := r.fieldIdxByName[col.Name]
if !ok {
return errors.AssertionFailedf("could not find avro field for column %s", col.Name)
return changefeedbase.WithTerminalError(
errors.AssertionFailedf("could not find avro field for column %s", col.Name))
}
r.native[col.Name], err = r.Fields[fieldIdx].encodeFn(d)
return err
Expand All @@ -840,11 +842,12 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error
func (r *avroDataRecord) rowFromNative(native interface{}) (rowenc.EncDatumRow, error) {
avroDatums, ok := native.(map[string]interface{})
if !ok {
return nil, errors.Errorf(`unknown avro native type: %T`, native)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown avro native type: %T`, native))
}
if len(r.Fields) != len(avroDatums) {
return nil, errors.Errorf(
`expected row with %d columns got %d`, len(r.Fields), len(avroDatums))
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`expected row with %d columns got %d`, len(r.Fields), len(avroDatums)))
}

row := make(rowenc.EncDatumRow, len(r.Fields))
Expand Down Expand Up @@ -978,7 +981,8 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
delete(meta, `updated`)
ts, ok := u.(hlc.Timestamp)
if !ok {
return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
Expand All @@ -989,13 +993,14 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
delete(meta, `resolved`)
ts, ok := u.(hlc.Timestamp)
if !ok {
return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
for k := range meta {
return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k)
return nil, changefeedbase.WithTerminalError(errors.AssertionFailedf(`unhandled meta key: %s`, k))
}
return r.codec.BinaryFromNative(buf, native)
}
Expand All @@ -1016,10 +1021,12 @@ func (r *avroDataRecord) refreshTypeMetadata(row cdcevent.Row) error {
// precision is set) this is roundtripable without information loss.
func decimalToRat(dec apd.Decimal, scale int32) (big.Rat, error) {
if dec.Form != apd.Finite {
return big.Rat{}, errors.Errorf(`cannot convert %s form decimal`, dec.Form)
return big.Rat{}, changefeedbase.WithTerminalError(
errors.Errorf(`cannot convert %s form decimal`, dec.Form))
}
if scale > 0 && scale != -dec.Exponent {
return big.Rat{}, errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale)
return big.Rat{}, changefeedbase.WithTerminalError(
errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale))
}
var r big.Rat
if dec.Exponent >= 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdceval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand Down
10 changes: 7 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -73,7 +74,8 @@ func (e *Evaluator) MatchesFilter(
) (_ bool, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating WHERE clause: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while evaluating WHERE clause: %s", pan))
}
}()
if e.where == nil {
Expand All @@ -96,7 +98,8 @@ func (e *Evaluator) Projection(
) (_ cdcevent.Row, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating SELECT clause: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while evaluating SELECT clause: %s", pan))
}
}()
if len(e.selectors) == 0 {
Expand All @@ -114,7 +117,8 @@ func (e *Evaluator) Projection(
func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while validating CHANGEFEED expression: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while validating CHANGEFEED expression: %s", pan))
}
}()
if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive.
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -47,7 +48,8 @@ func NormalizeAndValidateSelectForTarget(
) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) {
defer func() {
if pan := recover(); pan != nil {
retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan)
retErr = changefeedbase.WithTerminalError(
errors.Newf("expression currently unsupported in CREATE CHANGEFEED: %s", pan))
}
}()
execCtx.SemaCtx()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func refreshUDT(
}); err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, changefeedbase.MarkRetryableError(err)
return nil, err
}
// Immediately release the lease, since we only need it for the exact
// timestamp requested.
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey(
if err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, family, changefeedbase.MarkRetryableError(err)
return nil, family, err
}
tableDesc = desc.Underlying().(catalog.TableDescriptor)
// Immediately release the lease, since we only need it for the exact
Expand Down
21 changes: 8 additions & 13 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.spec.User(), ca.spec.JobID, ca.sliMetrics)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
Expand All @@ -263,8 +262,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.changedRowBuf = &b.buf
}

ca.sink = &errorWrapperSink{wrapped: ca.sink}

// If the initial scan was disabled the highwater would've already been forwarded
needsInitialScan := ca.frontier.Frontier().IsEmpty()

Expand Down Expand Up @@ -483,7 +480,6 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
err = nil
}
} else {

select {
// If the poller errored first, that's the
// interesting one, so overwrite `err`.
Expand Down Expand Up @@ -921,7 +917,6 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.spec.User(), cf.spec.JobID, sli)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
cf.MoveToDraining(err)
return
}
Expand All @@ -930,8 +925,6 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.resolvedBuf = &b.buf
}

cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
Expand Down Expand Up @@ -1000,9 +993,8 @@ func (cf *changeFrontier) close() {
cf.closeMetrics()
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
}
// Best effort: the context is often canceled by now, so we expect to see an error.
_ = cf.sink.Close()
}
cf.memAcc.Close(cf.Ctx)
cf.MemMonitor.Stop(cf.Ctx)
Expand Down Expand Up @@ -1043,8 +1035,11 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad

// Detect whether this boundary should be used to kill or restart the
// changefeed.
if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
err = changefeedbase.MarkRetryableError(err)
if cf.frontier.boundaryType == jobspb.ResolvedSpan_EXIT {
err = changefeedbase.WithTerminalError(errors.Wrapf(err,
"shut down due to schema change and %s=%q",
changefeedbase.OptSchemaChangePolicy,
changefeedbase.OptSchemaChangePolicyStop))
}
}

Expand Down Expand Up @@ -1289,7 +1284,7 @@ func (cf *changeFrontier) checkpointJobProgress(

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError"))
return false, err
}
}

Expand Down
Loading

0 comments on commit 1a93051

Please sign in to comment.