diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 86e691fdce26..6bd602f053d7 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "event_processing.go", "metrics.go", "name.go", + "retry.go", "schema_registry.go", "scram_client.go", "sink.go", @@ -79,7 +80,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/exprutil", - "//pkg/sql/flowinfra", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", diff --git a/pkg/ccl/changefeedccl/avro.go b/pkg/ccl/changefeedccl/avro.go index e6a81150daeb..eb559a85988e 100644 --- a/pkg/ccl/changefeedccl/avro.go +++ b/pkg/ccl/changefeedccl/avro.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/geo" "github.com/cockroachdb/cockroach/pkg/geo/geopb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -408,8 +409,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { func(d tree.Datum, _ interface{}) (interface{}, error) { date := *d.(*tree.DDate) if !date.IsFinite() { - return nil, errors.Errorf( - `infinite date not yet supported with avro`) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `infinite date not yet supported with avro`)) } // The avro library requires us to return this as a time.Time. return date.ToTime() @@ -498,8 +499,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { ) case types.DecimalFamily: if typ.Precision() == 0 { - return nil, errors.Errorf( - `decimal with no precision not yet supported with avro`) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `decimal with no precision not yet supported with avro`)) } width := int(typ.Width()) @@ -595,8 +596,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { case types.ArrayFamily: itemSchema, err := typeToAvroSchema(typ.ArrayContents()) if err != nil { - return nil, errors.Wrapf(err, `could not create item schema for %s`, - typ) + return nil, changefeedbase.WithTerminalError( + errors.Wrapf(err, `could not create item schema for %s`, typ)) } itemUnionKey := avroUnionKey(itemSchema.SchemaType.([]avroSchemaType)[1]) @@ -676,8 +677,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { ) default: - return nil, errors.Errorf(`type %s not yet supported with avro`, - typ.SQLString()) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`type %s not yet supported with avro`, typ.SQLString())) } return schema, nil @@ -688,7 +689,7 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { func columnToAvroSchema(col cdcevent.ResultColumn) (*avroSchemaField, error) { schema, err := typeToAvroSchema(col.Typ) if err != nil { - return nil, errors.Wrapf(err, "column %s", col.Name) + return nil, changefeedbase.WithTerminalError(errors.Wrapf(err, "column %s", col.Name)) } schema.Name = SQLNameToAvroName(col.Name) schema.Metadata = col.SQLStringNotHumanReadable() @@ -790,7 +791,7 @@ func (r *avroDataRecord) rowFromTextual(buf []byte) (rowenc.EncDatumRow, error) return nil, err } if len(newBuf) > 0 { - return nil, errors.New(`only one row was expected`) + return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`)) } return r.rowFromNative(native) } @@ -811,7 +812,7 @@ func (r *avroDataRecord) RowFromBinary(buf []byte) (rowenc.EncDatumRow, error) 
{ return nil, err } if len(newBuf) > 0 { - return nil, errors.New(`only one row was expected`) + return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`)) } return r.rowFromNative(native) } @@ -826,7 +827,8 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) (err error) { fieldIdx, ok := r.fieldIdxByName[col.Name] if !ok { - return errors.AssertionFailedf("could not find avro field for column %s", col.Name) + return changefeedbase.WithTerminalError( + errors.AssertionFailedf("could not find avro field for column %s", col.Name)) } r.native[col.Name], err = r.Fields[fieldIdx].encodeFn(d) return err @@ -840,11 +842,12 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error func (r *avroDataRecord) rowFromNative(native interface{}) (rowenc.EncDatumRow, error) { avroDatums, ok := native.(map[string]interface{}) if !ok { - return nil, errors.Errorf(`unknown avro native type: %T`, native) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown avro native type: %T`, native)) } if len(r.Fields) != len(avroDatums) { - return nil, errors.Errorf( - `expected row with %d columns got %d`, len(r.Fields), len(avroDatums)) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `expected row with %d columns got %d`, len(r.Fields), len(avroDatums))) } row := make(rowenc.EncDatumRow, len(r.Fields)) @@ -978,7 +981,8 @@ func (r *avroEnvelopeRecord) BinaryFromRow( delete(meta, `updated`) ts, ok := u.(hlc.Timestamp) if !ok { - return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) } native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } @@ -989,13 +993,14 @@ func (r *avroEnvelopeRecord) BinaryFromRow( delete(meta, `resolved`) ts, ok := u.(hlc.Timestamp) if !ok { - return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) } native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } } for k := range meta { - return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k) + return nil, changefeedbase.WithTerminalError(errors.AssertionFailedf(`unhandled meta key: %s`, k)) } return r.codec.BinaryFromNative(buf, native) } @@ -1016,10 +1021,12 @@ func (r *avroDataRecord) refreshTypeMetadata(row cdcevent.Row) error { // precision is set) this is roundtripable without information loss. 
func decimalToRat(dec apd.Decimal, scale int32) (big.Rat, error) { if dec.Form != apd.Finite { - return big.Rat{}, errors.Errorf(`cannot convert %s form decimal`, dec.Form) + return big.Rat{}, changefeedbase.WithTerminalError( + errors.Errorf(`cannot convert %s form decimal`, dec.Form)) } if scale > 0 && scale != -dec.Exponent { - return big.Rat{}, errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale) + return big.Rat{}, changefeedbase.WithTerminalError( + errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale)) } var r big.Rat if dec.Exponent >= 0 { diff --git a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel index fc42be8203a6..af9c2910fd9c 100644 --- a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel @@ -16,6 +16,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/clusterversion", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index d1c2be833bad..03362d8ca9da 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -13,6 +13,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -71,7 +72,8 @@ func (e *Evaluator) MatchesFilter( ) (_ bool, err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while evaluating WHERE clause: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while evaluating WHERE clause: %s", pan)) } }() if e.where == nil { @@ -94,7 +96,8 @@ func (e *Evaluator) Projection( ) (_ cdcevent.Row, err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while evaluating SELECT clause: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while evaluating SELECT clause: %s", pan)) } }() if len(e.selectors) == 0 { @@ -112,7 +115,8 @@ func (e *Evaluator) Projection( func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while validating CHANGEFEED expression: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while validating CHANGEFEED expression: %s", pan)) } }() if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive. 
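The cdceval hunks above rely on Go's defer/recover idiom: a panic raised while evaluating a user expression is converted into an ordinary error (now marked terminal) rather than crashing the node. Below is a minimal, standalone sketch of that idiom; the evaluate function and its messages are illustrative only, not part of the patch.

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// evaluate converts a panic raised during evaluation into a returned error,
// mirroring Evaluator.MatchesFilter / Projection / initSelectClause.
func evaluate(expr string) (result string, err error) {
	defer func() {
		if pan := recover(); pan != nil {
			// The panic value becomes the error text; the production code
			// additionally wraps this with changefeedbase.WithTerminalError.
			err = errors.Newf("error while evaluating expression: %s", pan)
		}
	}()
	if expr == "" {
		// Stand-in for a panic deep inside expression evaluation.
		panic("unsupported expression")
	}
	return expr, nil
}

func main() {
	if _, err := evaluate(""); err != nil {
		fmt.Println(err) // error while evaluating expression: unsupported expression
	}
}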
diff --git a/pkg/ccl/changefeedccl/cdceval/validation.go b/pkg/ccl/changefeedccl/cdceval/validation.go index 3419ae9833eb..b423c33f8a94 100644 --- a/pkg/ccl/changefeedccl/cdceval/validation.go +++ b/pkg/ccl/changefeedccl/cdceval/validation.go @@ -12,6 +12,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" @@ -46,7 +47,8 @@ func NormalizeAndValidateSelectForTarget( ) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) { defer func() { if pan := recover(); pan != nil { - retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan) + retErr = changefeedbase.WithTerminalError( + errors.Newf("expression currently unsupported in CREATE CHANGEFEED: %s", pan)) } }() execCtx.SemaCtx() diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 7d29b5af9c91..a031a8f170f0 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -110,7 +110,7 @@ func refreshUDT( }); err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, changefeedbase.MarkRetryableError(err) + return nil, err } // Immediately release the lease, since we only need it for the exact // timestamp requested. @@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey( if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, family, changefeedbase.MarkRetryableError(err) + return nil, family, err } tableDesc = desc.Underlying().(catalog.TableDescriptor) // Immediately release the lease, since we only need it for the exact diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index c2d327162c8b..f4f0435f0145 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -249,7 +249,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, ca.sliMetrics) if err != nil { - err = changefeedbase.MarkRetryableError(err) // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() @@ -262,8 +261,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.changedRowBuf = &b.buf } - ca.sink = &errorWrapperSink{wrapped: ca.sink} - // If the initial scan was disabled the highwater would've already been forwarded needsInitialScan := ca.frontier.Frontier().IsEmpty() @@ -483,7 +480,6 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet err = nil } } else { - select { // If the poller errored first, that's the // interesting one, so overwrite `err`. 
@@ -920,7 +916,6 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.spec.User(), cf.spec.JobID, sli) if err != nil { - err = changefeedbase.MarkRetryableError(err) cf.MoveToDraining(err) return } @@ -929,8 +924,6 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.resolvedBuf = &b.buf } - cf.sink = &errorWrapperSink{wrapped: cf.sink} - cf.highWaterAtStart = cf.spec.Feed.StatementTime if cf.spec.JobID != 0 { job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID) @@ -999,9 +992,8 @@ func (cf *changeFrontier) close() { cf.closeMetrics() } if cf.sink != nil { - if err := cf.sink.Close(); err != nil { - log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err) - } + // Best effort: context is often canceled by now, so we expect to see an error. + _ = cf.sink.Close() } cf.memAcc.Close(cf.Ctx()) cf.MemMonitor.Stop(cf.Ctx()) @@ -1042,8 +1034,11 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad // Detect whether this boundary should be used to kill or restart the // changefeed. - if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART { - err = changefeedbase.MarkRetryableError(err) + if cf.frontier.boundaryType == jobspb.ResolvedSpan_EXIT { + err = changefeedbase.WithTerminalError(errors.Wrapf(err, + "shut down due to schema change and %s=%q", + changefeedbase.OptSchemaChangePolicy, + changefeedbase.OptSchemaChangePolicyStop)) } } @@ -1288,7 +1283,7 @@ func (cf *changeFrontier) checkpointJobProgress( if cf.knobs.RaiseRetryableError != nil { if err := cf.knobs.RaiseRetryableError(); err != nil { - return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError")) + return false, err } } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index ab46c9a7d65f..69d4a487444d 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -39,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" @@ -51,7 +50,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -59,12 +57,6 @@ import ( "github.com/cockroachdb/redact" ) -var changefeedRetryOptions = retry.Options{ - InitialBackoff: 5 * time.Millisecond, - Multiplier: 2, - MaxBackoff: 10 * time.Second, -} - // featureChangefeedEnabled is used to enable and disable the CHANGEFEED feature.
var featureChangefeedEnabled = settings.RegisterBoolSetting( settings.TenantWritable, @@ -214,7 +206,6 @@ func changefeedPlanHook( } if details.SinkURI == `` { - p.ExtendedEvalContext().ChangefeedState = &coreChangefeedProgress{ progress: progress, } @@ -232,7 +223,7 @@ func changefeedPlanHook( logChangefeedCreateTelemetry(ctx, jr) var err error - for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); { + for r := getRetry(ctx); r.Next(); { if err = distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh); err == nil { return nil } @@ -243,15 +234,18 @@ func changefeedPlanHook( } } - if !changefeedbase.IsRetryableError(err) { - log.Warningf(ctx, `CHANGEFEED returning with error: %+v`, err) - return err + if err = changefeedbase.AsTerminalError(ctx, p.ExecCfg().LeaseManager, err); err != nil { + break } + // All other errors retry. progress = p.ExtendedEvalContext().ChangefeedState.(*coreChangefeedProgress).progress } + // TODO(yevgeniy): This seems wrong -- core changefeeds always terminate + // with an error. Perhaps rename this telemetry to indicate number of + // completed feeds. telemetry.Count(`changefeed.core.error`) - return changefeedbase.MaybeStripRetryableErrorMarker(err) + return err } // The below block creates the job and protects the data required for the @@ -790,7 +784,7 @@ func validateSink( canarySink, err := getSink(ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle, p.User(), jobID, sli) if err != nil { - return changefeedbase.MaybeStripRetryableErrorMarker(err) + return err } if err := canarySink.Close(); err != nil { return err @@ -1010,18 +1004,18 @@ func (b *changefeedResumer) resumeWithRetries( // bubbles up to this level, we'd like to "retry" the flow if possible. This // could be because the sink is down or because a cockroach node has crashed // or for many other reasons. - var err error var lastRunStatusUpdate time.Time - for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); { + for r := getRetry(ctx); r.Next(); { // startedCh is normally used to signal back to the creator of the job that // the job has started; however, in this case nothing will ever receive // on the channel, causing the changefeed flow to block. Replace it with // a dummy channel. startedCh := make(chan tree.Datums, 1) - if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { - return nil + err := distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh) + if err == nil { + return nil // Changefeed completed -- e.g. due to initial_scan=only mode. } if knobs, ok := execCfg.DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok { @@ -1030,30 +1024,16 @@ func (b *changefeedResumer) resumeWithRetries( } } - // Retry changefeed if error is retryable. In addition, we want to handle - // context cancellation as retryable, but only if the resumer context has not been cancelled. - // (resumer context is canceled by the jobs framework -- so we should respect it). - isRetryableErr := changefeedbase.IsRetryableError(err) || - (ctx.Err() == nil && errors.Is(err, context.Canceled)) - if !isRetryableErr { - if ctx.Err() != nil { - return ctx.Err() - } - - if flowinfra.IsFlowRetryableError(err) { - // We don't want to retry flowinfra retryable error in the retry loop above. - // This error currently indicates that this node is being drained. As such, - // retries will not help. - // Instead, we want to make sure that the changefeed job is not marked failed - // due to a transient, retryable error. 
- err = jobs.MarkAsRetryJobError(err) - _ = b.setJobRunningStatus(ctx, lastRunStatusUpdate, "retryable flow error: %s", err) - } - - log.Warningf(ctx, `CHANGEFEED job %d returning with error: %+v`, jobID, err) + // Terminate changefeed if needed. + if err := changefeedbase.AsTerminalError(ctx, jobExec.ExecCfg().LeaseManager, err); err != nil { + log.Infof(ctx, "CHANGEFEED %d shutting down (cause: %v)", jobID, err) + // Best effort -- update job status to make it clear why changefeed shut down. + // This won't always work if this node is being shutdown/drained. + b.setJobRunningStatus(ctx, time.Time{}, "shutdown due to %s", err) return err } + // All other errors retry. log.Warningf(ctx, `WARNING: CHANGEFEED job %d encountered retryable error: %v`, jobID, err) lastRunStatusUpdate = b.setJobRunningStatus(ctx, lastRunStatusUpdate, "retryable error: %s", err) if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { @@ -1077,7 +1057,7 @@ func (b *changefeedResumer) resumeWithRetries( progress = reloadedJob.Progress() } } - return errors.Wrap(err, `ran out of retries`) + return errors.Wrap(ctx.Err(), `ran out of retries`) } // OnFailOrCancel is part of the jobs.Resumer interface. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index d502b7bace34..86c2c8bf593c 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -120,7 +120,6 @@ func TestChangefeedReplanning(t *testing.T) { } testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - ctx := context.Background() numNodes := 3 @@ -1024,7 +1023,7 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) { // Check that dropping a watched column still stops the changefeed. sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) { - t.Errorf(`expected "schema change occurred at ..." got: %+v`, err.Error()) + require.Regexp(t, `expected "schema change occurred at ..." 
got: %+v`, err) } } @@ -2069,7 +2068,6 @@ func fetchDescVersionModificationTime( RequestHeader: header, MVCCFilter: roachpb.MVCCFilter_All, StartTime: hlc.Timestamp{}, - ReturnSST: true, } clock := hlc.NewClockWithSystemTimeSource(time.Minute /* maxOffset */) hh := roachpb.Header{Timestamp: clock.Now()} @@ -3427,9 +3425,9 @@ func TestChangefeedRetryableError(t *testing.T) { knobs.BeforeEmitRow = func(_ context.Context) error { switch atomic.LoadInt64(&failEmit) { case 1: - return changefeedbase.MarkRetryableError(fmt.Errorf("synthetic retryable error")) + return errors.New("synthetic retryable error") case 2: - return fmt.Errorf("synthetic terminal error") + return changefeedbase.WithTerminalError(errors.New("synthetic terminal error")) default: return nil } @@ -3499,63 +3497,12 @@ func TestChangefeedRetryableError(t *testing.T) { cdcTest(t, testFn, feedTestEnterpriseSinks) } -func TestChangefeedJobRetryOnNoInboundStream(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - skip.UnderRace(t) - skip.UnderStress(t) - - cluster, db, cleanup := startTestCluster(t) - defer cleanup() - sqlDB := sqlutils.MakeSQLRunner(db) - - // force fast "no inbound stream" error - var oldMaxRunningFlows int - var oldTimeout string - sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") - sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.max_running_flows").Scan(&oldMaxRunningFlows) - sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.flow_stream_timeout").Scan(&oldTimeout) - serverutils.SetClusterSetting(t, cluster, "sql.distsql.max_running_flows", 0) - serverutils.SetClusterSetting(t, cluster, "sql.distsql.flow_stream_timeout", "1s") - - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) - - // Connect to a non-leaseholder node so that a DistSQL flow is required - var leaseHolder int - sqlDB.QueryRow(t, `SELECT lease_holder FROM [SHOW RANGES FROM TABLE foo] LIMIT 1`).Scan(&leaseHolder) - feedServerID := ((leaseHolder - 1) + 1) % 3 - db = cluster.ServerConn(feedServerID) - sqlDB = sqlutils.MakeSQLRunner(db) - f := makeKafkaFeedFactoryForCluster(cluster, db) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) - defer closeFeed(t, foo) - - // Verify job progress contains retryable error status. - registry := cluster.Server(feedServerID).JobRegistry().(*jobs.Registry) - jobID := foo.(cdctest.EnterpriseTestFeed).JobID() - testutils.SucceedsSoon(t, func() error { - job, err := registry.LoadJob(context.Background(), jobID) - require.NoError(t, err) - if strings.Contains(job.Progress().RunningStatus, "retryable error") { - return nil - } - return errors.Newf("job status was %s", job.Progress().RunningStatus) - }) - - // Fix the error. Job should retry successfully. - sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.max_running_flows=$1", oldMaxRunningFlows) - sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_stream_timeout=$1", oldTimeout) - assertPayloads(t, foo, []string{ - `foo: [1]->{"after": {"a": 1}}`, - }) - -} - func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.WithIssue(t, 91548) + // Set TestingKnobs to return a known session for easier // comparison. 
testSession := sqllivenesstestutils.NewAlwaysAliveSession("known-test-session") @@ -3703,6 +3650,7 @@ func TestChangefeedDataTTL(t *testing.T) { atomic.StoreInt32(&shouldWait, 0) resume <- struct{}{} dataExpiredRows = <-changefeedInit + require.NotNil(t, dataExpiredRows) // Verify that, at some point, Next() returns a "must // be after replica GC threshold" error. In the common @@ -4464,7 +4412,7 @@ func TestChangefeedPanicRecovery(t *testing.T) { prep(t, sqlDB) // Check that disallowed expressions have a good error message. // Also regression test for https://github.com/cockroachdb/cockroach/issues/90416 - sqlDB.ExpectErr(t, "syntax is unsupported in CREATE CHANGEFEED", + sqlDB.ExpectErr(t, "expression currently unsupported in CREATE CHANGEFEED", `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT 1 FROM foo WHERE EXISTS (SELECT true)`) }) @@ -5279,7 +5227,7 @@ func TestChangefeedRestartDuringBackfill(t *testing.T) { require.NoError(t, feedJob.Pause()) // Make extra sure that the zombie changefeed can't write any more data. - beforeEmitRowCh <- changefeedbase.MarkRetryableError(errors.New(`nope don't write it`)) + beforeEmitRowCh <- errors.New(`nope don't write it`) // Insert some data that we should only see out of the changefeed after it // re-runs the backfill. @@ -5917,7 +5865,7 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) { knobs.RaiseRetryableError = func() error { emittedCount++ if emittedCount%200 == 0 { - return changefeedbase.MarkRetryableError(errors.New("test transient error")) + return errors.New("test transient error") } return nil } @@ -6025,9 +5973,7 @@ func TestChangefeedOrderingWithErrors(t *testing.T) { if err != nil { return err } - if status != "retryable error: retryable changefeed error: 500 Internal Server Error: " { - return errors.Errorf("expected retryable error: retryable changefeed error: 500 Internal Server Error:, got: %v", status) - } + require.Regexp(t, "500 Internal Server Error", status) return nil }) @@ -6058,7 +6004,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error='pause'`) @@ -6101,7 +6047,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR bar WITH on_error = 'fail'`) @@ -6121,7 +6067,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). 
Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR quux`) @@ -6313,6 +6259,12 @@ func TestChangefeedOnlyInitialScan(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO foo (a) SELECT * FROM generate_series(1, 5000);`) + // Most changefeed tests can afford to have a race condition between the initial + // inserts and starting the feed because the output looks the same for an initial + // scan and an insert. For tests with initial_scan=only, though, we can't start the feed + // until it's going to see all the initial inserts in the initial scan. + sqlDB.CheckQueryResultsRetry(t, `SELECT count(*) FROM foo`, [][]string{{`5000`}}) + feed := feed(t, f, changefeedStmt) sqlDB.Exec(t, "INSERT INTO foo VALUES (5005), (5007), (5009)") @@ -6404,6 +6356,8 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) { sqlDB.Exec(t, "INSERT INTO foo VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol')") sqlDB.Exec(t, "INSERT INTO bar VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sqlDB.CheckQueryResultsRetry(t, `SELECT count(*) FROM foo,bar`, [][]string{{`9`}}) + feed := feed(t, f, testData.changefeedStmt) sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')") @@ -6461,6 +6415,8 @@ func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) { sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, name STRING)") sqlDB.Exec(t, "INSERT INTO foo VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol')") + sqlDB.CheckQueryResultsRetry(t, `SELECT count(*) FROM foo`, [][]string{{`3`}}) + feed := feed(t, f, changefeedStmt) sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')") @@ -7211,7 +7167,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). 
Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail") + return changefeedbase.WithTerminalError(errors.New("should fail")) } beforeCreate := timeutil.Now() diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel index 4bb125e722a1..e55f7cb0ae78 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel +++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel @@ -13,14 +13,12 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase", visibility = ["//visibility:public"], deps = [ - "//pkg/jobs/joberror", + "//pkg/jobs", "//pkg/jobs/jobspb", - "//pkg/kv/kvclient/kvcoord", "//pkg/roachpb", "//pkg/settings", - "//pkg/sql", "//pkg/sql/catalog/descpb", - "//pkg/sql/flowinfra", + "//pkg/sql/catalog/lease", "//pkg/util", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index 0c1384cb8bfe..54507c7b0c48 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -9,15 +9,11 @@ package changefeedbase import ( - "fmt" - "reflect" - "strings" + "context" - "github.com/cockroachdb/cockroach/pkg/jobs/joberror" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/errors" ) @@ -75,81 +71,53 @@ func (e *taggedError) Cause() error { return e.wrapped } // planned to be moved to the stdlib in go 1.13. func (e *taggedError) Unwrap() error { return e.wrapped } -const retryableErrorString = "retryable changefeed error" +type terminalError struct{} -type retryableError struct { - wrapped error -} - -// MarkRetryableError wraps the given error, marking it as retryable to -// changefeeds. -func MarkRetryableError(e error) error { - return &retryableError{wrapped: e} +func (e *terminalError) Error() string { + return "terminal changefeed error" } -// Error implements the error interface. -func (e *retryableError) Error() string { - return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error()) +// WithTerminalError decorates underlying error to indicate +// that the error is a terminal changefeed error. +func WithTerminalError(cause error) error { + if cause == nil { + return nil + } + return errors.Mark(cause, &terminalError{}) } -// Cause implements the github.com/pkg/errors.causer interface. -func (e *retryableError) Cause() error { return e.wrapped } - -// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is -// planned to be moved to the stdlib in go 1.13. -func (e *retryableError) Unwrap() error { return e.wrapped } - -// IsRetryableError returns true if the supplied error, or any of its parent -// causes, is a IsRetryableError. -func IsRetryableError(err error) bool { - if err == nil { - return false - } - if errors.HasType(err, (*retryableError)(nil)) { - return true +// AsTerminalError determines if the cause error is a terminal changefeed +// error. Returns non-nil error if changefeed should terminate with the +// returned error. 
+func AsTerminalError(ctx context.Context, lm *lease.Manager, cause error) (termErr error) { + if cause == nil { + return nil } - // During node shutdown it is possible for all outgoing transports used by - // the kvfeed to expire, producing a SendError that the node is still able - // to propagate to the frontier. This has been known to happen during - // cluster upgrades. This scenario should not fail the changefeed. - if kvcoord.IsSendError(err) { - return true + if err := ctx.Err(); err != nil { + // If context has been cancelled, we must respect that; this happens + // if, e.g. this changefeed is being cancelled. + return err } - // TODO(knz): this is a bad implementation. Make it go away - // by avoiding string comparisons. - - // If a RetryableError occurs on a remote node, DistSQL serializes it such - // that we can't recover the structure and we have to rely on this - // unfortunate string comparison. - errStr := err.Error() - if strings.Contains(errStr, retryableErrorString) || - strings.Contains(errStr, kvcoord.SendErrorString) || - strings.Contains(errStr, "draining") { - return true + if lm.IsDraining() { + // This node is being drained. It's safe to propagate this error (to the + // job registry) since job registry should not be able to commit this error + // to the jobs table; but to be safe, make sure this error is marked as jobs + // retryable error to ensure that some other node retries this changefeed. + return jobs.MarkAsRetryJobError(cause) } - return (joberror.IsDistSQLRetryableError(err) || - flowinfra.IsNoInboundStreamConnectionError(err) || - errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)) || - errors.Is(err, sql.ErrPlanChanged)) -} + // GC TTL errors are always fatal. + if errors.HasType(cause, (*roachpb.BatchTimestampBeforeGCError)(nil)) { + return WithTerminalError(cause) + } -// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the -// RetryableError marker out. This won't do anything if the RetryableError -// itself has been wrapped, but that's okay, we'll just have an uglier string. -func MaybeStripRetryableErrorMarker(err error) error { - // The following is a hack to work around the error cast linter. - // What we're doing here is really not kosher; this function - // has no business in assuming that the retryableError{} wrapper - // has not been wrapped already. We could even expect that - // it gets wrapped in the common case. - // TODO(knz): Remove/replace this. - if reflect.TypeOf(err) == retryableErrorType { - err = errors.UnwrapOnce(err) + // Explicitly marked terminal errors are terminal. + if errors.Is(cause, &terminalError{}) { + return cause } - return err -} -var retryableErrorType = reflect.TypeOf((*retryableError)(nil)) + // All other errors retry. 
+ return nil +} diff --git a/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go b/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go index e7502ac1b0ff..84583aabb32c 100644 --- a/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go +++ b/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go @@ -20,6 +20,17 @@ func ValidateTable( targets changefeedbase.Targets, tableDesc catalog.TableDescriptor, canHandle changefeedbase.CanHandle, +) error { + if err := validateTable(targets, tableDesc, canHandle); err != nil { + return changefeedbase.WithTerminalError(err) + } + return nil +} + +func validateTable( + targets changefeedbase.Targets, + tableDesc catalog.TableDescriptor, + canHandle changefeedbase.CanHandle, ) error { // Technically, the only non-user table known not to work is system.jobs // (which creates a cycle since the resolved timestamp high-water mark is diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index eaf680bd63f7..fb669cd23de3 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -410,9 +410,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { enc, err := getEncoder(opts, targets) require.NoError(t, err) _, err = enc.EncodeKey(context.Background(), rowInsert) - require.EqualError(t, err, fmt.Sprintf("retryable changefeed error: "+ - `contacting confluent schema registry: Post "%s/subjects/foo-key/versions": x509: certificate signed by unknown authority`, - opts.SchemaRegistryURI)) + require.Regexp(t, "x509", err) wrongCert, _, err := cdctest.NewCACertBase64Encoded() require.NoError(t, err) @@ -425,9 +423,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { enc, err = getEncoder(opts, targets) require.NoError(t, err) _, err = enc.EncodeKey(context.Background(), rowInsert) - require.EqualError(t, err, fmt.Sprintf("retryable changefeed error: "+ - `contacting confluent schema registry: Post "%s/subjects/foo-key/versions": x509: certificate signed by unknown authority`, - opts.SchemaRegistryURI)) + require.Regexp(t, `contacting confluent schema registry.*: x509`, err) }) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 923fbebba273..e00eef23d33e 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -389,13 +389,15 @@ func startTestFullServer( options.argsFn(&args) } - ctx := context.Background() + resetRetry := testingUseFastRetry() resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) s, db, _ := serverutils.StartServer(t, args) + ctx := context.Background() cleanup := func() { s.Stopper().Stop(ctx) resetFlushFrequency() + resetRetry() } var err error defer func() { @@ -429,6 +431,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + resetRetry := testingUseFastRetry() resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) cluster, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( t, 3 /* numServers */, knobs, @@ -437,6 +440,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB cleanupAndReset := func() { cleanup() resetFlushFrequency() + resetRetry() } var err error @@ -497,9 +501,10 @@ func startTestTenant( tenantRunner.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...) 
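The new changefeedbase/errors.go builds on cockroachdb/errors marks: WithTerminalError attaches a *terminalError sentinel via errors.Mark, and errors.Is later detects it even through additional wrapping. A small self-contained sketch of that mechanic, with a local withTerminalError mimicking (not calling) the real helper:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// terminalError mirrors the sentinel added in changefeedbase/errors.go: it
// carries no state and exists only to serve as a marker reference.
type terminalError struct{}

func (e *terminalError) Error() string { return "terminal changefeed error" }

// withTerminalError mimics changefeedbase.WithTerminalError: marking the cause
// lets the terminal-ness survive later wrapping.
func withTerminalError(cause error) error {
	if cause == nil {
		return nil
	}
	return errors.Mark(cause, &terminalError{})
}

func main() {
	base := withTerminalError(errors.New("decimal with no precision not yet supported with avro"))
	// Wrapping added higher in the stack (as columnToAvroSchema does) does not
	// hide the mark from errors.Is.
	wrapped := errors.Wrapf(base, "column %s", "amount")

	fmt.Println(errors.Is(wrapped, &terminalError{}))                   // true: changefeed terminates
	fmt.Println(errors.Is(errors.New("broken pipe"), &terminalError{})) // false: retried by default
}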
waitForTenantPodsActive(t, tenantServer, 1) - + resetRetry := testingUseFastRetry() return tenantID, tenantServer, tenantDB, func() { tenantServer.Stopper().Stop(context.Background()) + resetRetry() } } diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 35052ab74d33..a0710fb93de8 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -8,7 +8,6 @@ go_library( "blocking_buffer.go", "chan_buffer.go", "chunked_event_queue.go", - "err_buffer.go", "event.go", "metrics.go", "throttling_buffer.go", diff --git a/pkg/ccl/changefeedccl/kvevent/err_buffer.go b/pkg/ccl/changefeedccl/kvevent/err_buffer.go deleted file mode 100644 index 06953d63111e..000000000000 --- a/pkg/ccl/changefeedccl/kvevent/err_buffer.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package kvevent - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" -) - -type errorWrapperEventBuffer struct { - Buffer -} - -// NewErrorWrapperEventBuffer returns kvevent Buffer which treats any errors -// as retryable. -func NewErrorWrapperEventBuffer(b Buffer) Buffer { - return &errorWrapperEventBuffer{b} -} - -// Add implements Writer interface. -func (e errorWrapperEventBuffer) Add(ctx context.Context, event Event) error { - if err := e.Buffer.Add(ctx, event); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -var _ Buffer = (*errorWrapperEventBuffer)(nil) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index f4bcf9175153..412c150231fb 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -98,8 +98,7 @@ func Run(ctx context.Context, cfg Config) error { } bf := func() kvevent.Buffer { - return kvevent.NewErrorWrapperEventBuffer( - kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics)) + return kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics) } f := newKVFeed( diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index cd625b0f60fd..6ee871291459 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -529,10 +530,11 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { if scope != defaultSLIScope { if !enableSLIMetrics { - return nil, errors.WithHint( - pgerror.Newf(pgcode.ConfigurationLimitExceeded, "cannot create metrics scope %q", scope), - "try restarting with COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS=true", - ) + return nil, changefeedbase.WithTerminalError( + errors.WithHint( + pgerror.Newf(pgcode.ConfigurationLimitExceeded, "cannot create metrics scope %q", scope), + "try restarting with COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS=true", + )) } 
const failSafeMax = 1024 if len(a.mu.sliMetrics) == failSafeMax { diff --git a/pkg/ccl/changefeedccl/retry.go b/pkg/ccl/changefeedccl/retry.go new file mode 100644 index 000000000000..7537fa19c723 --- /dev/null +++ b/pkg/ccl/changefeedccl/retry.go @@ -0,0 +1,71 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +var useFastRetry = false + +// getRetry returns the retry object to use for changefeeds. +func getRetry(ctx context.Context) Retry { + opts := retry.Options{ + InitialBackoff: 5 * time.Second, + Multiplier: 2, + MaxBackoff: 10 * time.Minute, + } + + if useFastRetry { + opts = retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 250 * time.Millisecond, + } + } + + return Retry{Retry: retry.StartWithCtx(ctx, opts)} +} + +func testingUseFastRetry() func() { + useFastRetry = true + return func() { + useFastRetry = false + } +} + +// Reset retry state after the changefeed has run for this long +// without errors. +const resetRetryAfter = 10 * time.Minute + +// Retry is a thin wrapper around retry.Retry which +// resets retry state if the changefeed has been running for a sufficiently +// long time. +type Retry struct { + retry.Retry + lastRetry time.Time +} + +// Next returns whether the retry loop should continue, and blocks for the +// appropriate length of time before yielding back to the caller. +// If the last call to Next() happened a long time ago, the amount of time +// to wait gets reset. +func (r *Retry) Next() bool { + defer func() { + r.lastRetry = timeutil.Now() + }() + if timeutil.Since(r.lastRetry) > resetRetryAfter { + r.Reset() + } + return r.Retry.Next() +} diff --git a/pkg/ccl/changefeedccl/schema_registry.go b/pkg/ccl/changefeedccl/schema_registry.go index be35c75d7539..bb230dd8c4d3 100644 --- a/pkg/ccl/changefeedccl/schema_registry.go +++ b/pkg/ccl/changefeedccl/schema_registry.go @@ -199,7 +199,7 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err } log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error()) } - return changefeedbase.MarkRetryableError(err) + return err } func gracefulClose(ctx context.Context, toClose io.ReadCloser) { diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 29a8078adabe..664f51a017b3 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -559,7 +559,7 @@ func (tf *schemaFeed) validateDescriptor( shouldFilter, err := tf.filter.shouldFilter(ctx, e, tf.targets) log.VEventf(ctx, 1, "validate shouldFilter %v %v", formatEvent(e), shouldFilter) if err != nil { - return err + return changefeedbase.WithTerminalError(err) } if !shouldFilter { // Only sort the tail of the events from earliestTsBeingIngested.
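retry.go wraps util/retry so that a changefeed which has been healthy for resetRetryAfter (10 minutes) starts over from the initial backoff instead of paying the accumulated one on its next failure. A condensed, standalone illustration of that reset behavior follows; the backoff type and next method are stand-ins for util/retry plus the lastRetry bookkeeping, not the production code:

package main

import (
	"fmt"
	"time"
)

// Constants mirroring the production retry options.
const (
	initialBackoff  = 5 * time.Second
	maxBackoff      = 10 * time.Minute
	resetRetryAfter = 10 * time.Minute
)

type backoff struct {
	cur       time.Duration
	lastRetry time.Time
}

// next returns the wait before the following attempt, forgetting accumulated
// backoff when the previous retry happened more than resetRetryAfter ago --
// the same effect Retry.Next achieves by calling r.Reset().
func (b *backoff) next() time.Duration {
	if !b.lastRetry.IsZero() && time.Since(b.lastRetry) > resetRetryAfter {
		b.cur = 0
	}
	b.lastRetry = time.Now()
	switch {
	case b.cur == 0:
		b.cur = initialBackoff
	default:
		b.cur *= 2
		if b.cur > maxBackoff {
			b.cur = maxBackoff
		}
	}
	return b.cur
}

func main() {
	var b backoff
	fmt.Println(b.next(), b.next(), b.next()) // 5s 10s 20s
	// After ten quiet minutes, the next failure would start from 5s again.
}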
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 6b9d367d8a91..804bdf1218b5 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -298,59 +298,6 @@ func (u *sinkURL) String() string { return u.URL.String() } -// errorWrapperSink delegates to another sink and marks all returned errors as -// retryable. During changefeed setup, we use the sink once without this to -// verify configuration, but in the steady state, no sink error should be -// terminal. -type errorWrapperSink struct { - wrapped externalResource -} - -// EmitRow implements Sink interface. -func (s errorWrapperSink) EmitRow( - ctx context.Context, - topic TopicDescriptor, - key, value []byte, - updated, mvcc hlc.Timestamp, - alloc kvevent.Alloc, -) error { - if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// EmitResolvedTimestamp implements Sink interface. -func (s errorWrapperSink) EmitResolvedTimestamp( - ctx context.Context, encoder Encoder, resolved hlc.Timestamp, -) error { - if err := s.wrapped.(ResolvedTimestampSink).EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Flush implements Sink interface. -func (s errorWrapperSink) Flush(ctx context.Context) error { - if err := s.wrapped.(EventSink).Flush(ctx); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Close implements Sink interface. -func (s errorWrapperSink) Close() error { - if err := s.wrapped.Close(); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Dial implements Sink interface. -func (s errorWrapperSink) Dial() error { - return s.wrapped.Dial() -} - // encDatumRowBuffer is a FIFO of `EncDatumRow`s. 
// // TODO(dan): There's some potential allocation savings here by reusing the same diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 85b8ce91ede7..199734be6bf5 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -171,8 +171,7 @@ func TestWebhookSink(t *testing.T) { // now sink's client accepts no custom certs, should reject the server's cert and fail require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) - require.EqualError(t, sinkSrcNoCert.Flush(context.Background()), - fmt.Sprintf(`Post "%s": x509: certificate signed by unknown authority`, sinkDest.URL())) + require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), `context canceled`) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 35f7cff7b4ef..3cd486c3ae23 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -505,7 +505,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro lt := logtags.FromContext(ctx) ctx, cancel := m.stopper.WithCancelOnQuiesce(logtags.AddTags(m.ambientCtx.AnnotateCtx(context.Background()), lt)) defer cancel() - if m.isDraining() { + if m.IsDraining() { return nil, errors.New("cannot acquire lease when draining") } newest := m.findNewest(id) @@ -548,7 +548,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro // releaseLease from store. func releaseLease(ctx context.Context, lease *storedLease, m *Manager) { - if m.isDraining() { + if m.IsDraining() { // Release synchronously to guarantee release before exiting. m.storage.release(ctx, m.stopper, lease) return @@ -1014,10 +1014,11 @@ func (m *Manager) Acquire( func (m *Manager) removeOnceDereferenced() bool { return m.storage.testingKnobs.RemoveOnceDereferenced || // Release from the store if the Manager is draining. - m.isDraining() + m.IsDraining() } -func (m *Manager) isDraining() bool { +// IsDraining returns true if this node's lease manager is draining. +func (m *Manager) IsDraining() bool { return m.draining.Load().(bool) } diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index dcbeae391771..60e5440a3e33 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1195,6 +1195,7 @@ func TestLint(t *testing.T) { "*.go", ":!*.pb.go", ":!*.pb.gw.go", + ":!ccl/changefeedccl/changefeedbase/errors.go", ":!kv/kvclient/kvcoord/lock_spans_over_budget_error.go", ":!spanconfig/errors.go", ":!roachpb/replica_unavailable_error.go",
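Taken together, the patch inverts the old convention: rather than marking errors retryable, every error is retryable unless explicitly marked terminal (or classified terminal by AsTerminalError). A usage-oriented sketch of that contract, assuming hypothetical emitOnce and emitWithNewContract helpers; changefeedbase.WithTerminalError is the helper added by this change:

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
	"github.com/cockroachdb/errors"
)

// emitOnce is a hypothetical stand-in for a single sink write; only its error
// behavior matters for the sketch.
func emitOnce(ctx context.Context, row []byte) error {
	return errors.New("connection reset by peer") // transient sink hiccup
}

// emitWithNewContract illustrates the convention this patch moves to: errors
// are retryable by default, so transient failures return unmarked, and only
// conditions that can never succeed on retry get WithTerminalError.
func emitWithNewContract(ctx context.Context, row []byte) error {
	if len(row) == 0 {
		// Permanent: retrying cannot fix malformed input.
		return changefeedbase.WithTerminalError(errors.New("empty row cannot be encoded"))
	}
	// Transient by default: the getRetry loop backs off and re-runs the flow,
	// and AsTerminalError leaves unmarked errors alone.
	return emitOnce(ctx, row)
}

func main() {
	fmt.Println(emitWithNewContract(context.Background(), nil))       // terminal error
	fmt.Println(emitWithNewContract(context.Background(), []byte{1})) // retried by the flow
}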