Merge #90810
90810: changefeedccl: Rework error handling r=miretskiy a=miretskiy

Prior to this PR, changefeeds relied on a whitelist approach
to determine which errors were retryable. All other errors
were deemed terminal, causing the changefeed to fail.
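
For illustration only, the old allowlist-style classification
amounted to something like the sketch below (hypothetical
names, not the actual pre-PR code):

```go
package classify

import "errors"

// errRetryableMarker stands in for the sentinel the old code attached
// to errors it knew were safe to retry (the name is hypothetical).
var errRetryableMarker = errors.New("retryable changefeed error")

// isRetryableOld retries only errors explicitly recognized as
// transient; any unrecognized error terminated the changefeed.
func isRetryableOld(err error) bool {
	return errors.Is(err, errRetryableMarker)
}
```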

That approach is brittle and causes unwanted changefeed
terminations.

This PR inverts the approach: all errors are treated as
retryable unless otherwise indicated. Errors that the
changefeed knows to be fatal are handled explicitly by
marking them as terminal. For example, a changefeed exits
if its targeted table is dropped, while an inability to
read that table for any other reason is not treated as
terminal.
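
Below is a minimal sketch of the new scheme, assuming only the
general shape of the changefeedbase helpers; the names mirror
the diff, but the bodies are illustrative stand-ins rather than
the actual retry.go contents:

```go
package classify

import "errors"

// errTerminalMarker is a hypothetical sentinel attached to errors
// that must stop the changefeed instead of being retried.
var errTerminalMarker = errors.New("terminal changefeed error")

// withTerminalError marks err as non-retryable.
func withTerminalError(err error) error {
	if err == nil {
		return nil
	}
	return errors.Join(errTerminalMarker, err)
}

// isRetryable inverts the old allowlist: every error is retryable
// unless it has been explicitly marked terminal.
func isRetryable(err error) bool {
	return err != nil && !errors.Is(err, errTerminalMarker)
}
```

With this split, the job-level retry loop can keep retrying with
backoff and fail the changefeed only when isRetryable reports
false.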

Fixes #90320
Fixes #77549
Fixes #63317
Fixes #71341
Fixes #73016
Informs CRDB-6788
Informs CRDB-7581

Release note (enterprise change): Changefeeds now treat all
errors, unless otherwise indicated, as retryable.


Co-authored-by: Yevgeniy Miretskiy <[email protected]>
craig[bot] and Yevgeniy Miretskiy committed Nov 6, 2022
2 parents c9094fa + 86fffa9 commit 4c828ca
Showing 25 changed files with 240 additions and 295 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"metrics.go",
"name.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"schema_registry.go",
"scram_client.go",
"sink.go",
@@ -80,7 +81,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
@@ -234,6 +234,8 @@ go_test(
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/keyside",
47 changes: 27 additions & 20 deletions pkg/ccl/changefeedccl/avro.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
@@ -408,8 +409,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
func(d tree.Datum, _ interface{}) (interface{}, error) {
date := *d.(*tree.DDate)
if !date.IsFinite() {
return nil, errors.Errorf(
`infinite date not yet supported with avro`)
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`infinite date not yet supported with avro`))
}
// The avro library requires us to return this as a time.Time.
return date.ToTime()
@@ -498,8 +499,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
)
case types.DecimalFamily:
if typ.Precision() == 0 {
return nil, errors.Errorf(
`decimal with no precision not yet supported with avro`)
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`decimal with no precision not yet supported with avro`))
}

width := int(typ.Width())
@@ -595,8 +596,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
case types.ArrayFamily:
itemSchema, err := typeToAvroSchema(typ.ArrayContents())
if err != nil {
return nil, errors.Wrapf(err, `could not create item schema for %s`,
typ)
return nil, changefeedbase.WithTerminalError(
errors.Wrapf(err, `could not create item schema for %s`, typ))
}
itemUnionKey := avroUnionKey(itemSchema.SchemaType.([]avroSchemaType)[1])

@@ -676,8 +677,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
)

default:
return nil, errors.Errorf(`type %s not yet supported with avro`,
typ.SQLString())
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`type %s not yet supported with avro`, typ.SQLString()))
}

return schema, nil
@@ -688,7 +689,7 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
func columnToAvroSchema(col cdcevent.ResultColumn) (*avroSchemaField, error) {
schema, err := typeToAvroSchema(col.Typ)
if err != nil {
return nil, errors.Wrapf(err, "column %s", col.Name)
return nil, changefeedbase.WithTerminalError(errors.Wrapf(err, "column %s", col.Name))
}
schema.Name = SQLNameToAvroName(col.Name)
schema.Metadata = col.SQLStringNotHumanReadable()
@@ -790,7 +791,7 @@ func (r *avroDataRecord) rowFromTextual(buf []byte) (rowenc.EncDatumRow, error)
return nil, err
}
if len(newBuf) > 0 {
return nil, errors.New(`only one row was expected`)
return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`))
}
return r.rowFromNative(native)
}
@@ -811,7 +812,7 @@ func (r *avroDataRecord) RowFromBinary(buf []byte) (rowenc.EncDatumRow, error) {
return nil, err
}
if len(newBuf) > 0 {
return nil, errors.New(`only one row was expected`)
return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`))
}
return r.rowFromNative(native)
}
@@ -826,7 +827,8 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) (err error) {
fieldIdx, ok := r.fieldIdxByName[col.Name]
if !ok {
return errors.AssertionFailedf("could not find avro field for column %s", col.Name)
return changefeedbase.WithTerminalError(
errors.AssertionFailedf("could not find avro field for column %s", col.Name))
}
r.native[col.Name], err = r.Fields[fieldIdx].encodeFn(d)
return err
@@ -840,11 +842,12 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error
func (r *avroDataRecord) rowFromNative(native interface{}) (rowenc.EncDatumRow, error) {
avroDatums, ok := native.(map[string]interface{})
if !ok {
return nil, errors.Errorf(`unknown avro native type: %T`, native)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown avro native type: %T`, native))
}
if len(r.Fields) != len(avroDatums) {
return nil, errors.Errorf(
`expected row with %d columns got %d`, len(r.Fields), len(avroDatums))
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`expected row with %d columns got %d`, len(r.Fields), len(avroDatums)))
}

row := make(rowenc.EncDatumRow, len(r.Fields))
@@ -978,7 +981,8 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
delete(meta, `updated`)
ts, ok := u.(hlc.Timestamp)
if !ok {
return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
@@ -989,13 +993,14 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
delete(meta, `resolved`)
ts, ok := u.(hlc.Timestamp)
if !ok {
return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
for k := range meta {
return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k)
return nil, changefeedbase.WithTerminalError(errors.AssertionFailedf(`unhandled meta key: %s`, k))
}
return r.codec.BinaryFromNative(buf, native)
}
@@ -1016,10 +1021,12 @@ func (r *avroDataRecord) refreshTypeMetadata(row cdcevent.Row) error {
// precision is set) this is roundtripable without information loss.
func decimalToRat(dec apd.Decimal, scale int32) (big.Rat, error) {
if dec.Form != apd.Finite {
return big.Rat{}, errors.Errorf(`cannot convert %s form decimal`, dec.Form)
return big.Rat{}, changefeedbase.WithTerminalError(
errors.Errorf(`cannot convert %s form decimal`, dec.Form))
}
if scale > 0 && scale != -dec.Exponent {
return big.Rat{}, errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale)
return big.Rat{}, changefeedbase.WithTerminalError(
errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale))
}
var r big.Rat
if dec.Exponent >= 0 {
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdceval/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/keys",
10 changes: 7 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
@@ -13,6 +13,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -73,7 +74,8 @@ func (e *Evaluator) MatchesFilter(
) (_ bool, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating WHERE clause: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while evaluating WHERE clause: %s", pan))
}
}()
if e.where == nil {
@@ -96,7 +98,8 @@ func (e *Evaluator) Projection(
) (_ cdcevent.Row, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating SELECT clause: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while evaluating SELECT clause: %s", pan))
}
}()
if len(e.selectors) == 0 {
@@ -114,7 +117,8 @@ func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) {
func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while validating CHANGEFEED expression: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while validating CHANGEFEED expression: %s", pan))
}
}()
if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive.
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/validation.go
@@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -47,7 +48,8 @@ func NormalizeAndValidateSelectForTarget(
) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) {
defer func() {
if pan := recover(); pan != nil {
retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan)
retErr = changefeedbase.WithTerminalError(
errors.Newf("expression currently unsupported in CREATE CHANGEFEED: %s", pan))
}
}()
execCtx.SemaCtx()
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
@@ -110,7 +110,7 @@ func refreshUDT(
}); err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, changefeedbase.MarkRetryableError(err)
return nil, err
}
// Immediately release the lease, since we only need it for the exact
// timestamp requested.
@@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey(
if err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, family, changefeedbase.MarkRetryableError(err)
return nil, family, err
}
tableDesc = desc.Underlying().(catalog.TableDescriptor)
// Immediately release the lease, since we only need it for the exact
21 changes: 8 additions & 13 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -250,7 +250,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.spec.User(), ca.spec.JobID, ca.sliMetrics)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
Expand All @@ -263,8 +262,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.changedRowBuf = &b.buf
}

ca.sink = &errorWrapperSink{wrapped: ca.sink}

// If the initial scan was disabled the highwater would've already been forwarded
needsInitialScan := ca.frontier.Frontier().IsEmpty()

@@ -483,7 +480,6 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
err = nil
}
} else {

select {
// If the poller errored first, that's the
// interesting one, so overwrite `err`.
@@ -921,7 +917,6 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.spec.User(), cf.spec.JobID, sli)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
cf.MoveToDraining(err)
return
}
@@ -930,8 +925,6 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.resolvedBuf = &b.buf
}

cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
Expand Down Expand Up @@ -1000,9 +993,8 @@ func (cf *changeFrontier) close() {
cf.closeMetrics()
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
}
// Best effort: context is often canceled by now, so we expect to see an error
_ = cf.sink.Close()
}
cf.memAcc.Close(cf.Ctx)
cf.MemMonitor.Stop(cf.Ctx)
@@ -1043,8 +1035,11 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad

// Detect whether this boundary should be used to kill or restart the
// changefeed.
if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
err = changefeedbase.MarkRetryableError(err)
if cf.frontier.boundaryType == jobspb.ResolvedSpan_EXIT {
err = changefeedbase.WithTerminalError(errors.Wrapf(err,
"shut down due to schema change and %s=%q",
changefeedbase.OptSchemaChangePolicy,
changefeedbase.OptSchemaChangePolicyStop))
}
}

@@ -1289,7 +1284,7 @@ func (cf *changeFrontier) checkpointJobProgress(

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError"))
return false, err
}
}
