diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 7b9b39f0cf38..bc46b8c98e56 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -720,6 +720,8 @@ type changeFrontier struct {
 	// metricsID is used as the unique id of this changefeed in the
 	// metrics.MaxBehindNanos map.
 	metricsID int
+
+	knobs TestingKnobs
 }
 
 const (
@@ -729,7 +731,11 @@ const (
 
 // jobState encapsulates changefeed job state.
 type jobState struct {
-	job      *jobs.Job
+	// job is set for changefeeds other than core/sinkless changefeeds.
+	job *jobs.Job
+	// coreProgress is set only for core/sinkless changefeeds.
+	coreProgress *coreChangefeedProgress
+
 	settings *cluster.Settings
 	metrics  *Metrics
 	ts       timeutil.TimeSource
@@ -744,11 +750,36 @@ type jobState struct {
 	progressUpdatesSkipped bool
 }
 
+type coreChangefeedProgress struct {
+	progress jobspb.Progress
+}
+
+// SetHighwater implements the eval.ChangefeedState interface.
+func (cp *coreChangefeedProgress) SetHighwater(frontier *hlc.Timestamp) {
+	cp.progress.Progress = &jobspb.Progress_HighWater{
+		HighWater: frontier,
+	}
+}
+
+// SetCheckpoint implements the eval.ChangefeedState interface.
+func (cp *coreChangefeedProgress) SetCheckpoint(spans []roachpb.Span, timestamp hlc.Timestamp) {
+	changefeedProgress := cp.progress.Details.(*jobspb.Progress_Changefeed).Changefeed
+	changefeedProgress.Checkpoint = &jobspb.ChangefeedProgress_Checkpoint{
+		Spans:     spans,
+		Timestamp: timestamp,
+	}
+}
+
 func newJobState(
-	j *jobs.Job, st *cluster.Settings, metrics *Metrics, ts timeutil.TimeSource,
+	j *jobs.Job,
+	coreProgress *coreChangefeedProgress,
+	st *cluster.Settings,
+	metrics *Metrics,
+	ts timeutil.TimeSource,
 ) *jobState {
 	return &jobState{
 		job:                j,
+		coreProgress:       coreProgress,
 		settings:           st,
 		metrics:            metrics,
 		ts:                 ts,
@@ -830,6 +861,7 @@ func newChangeFrontierProcessor(
 	if err != nil {
 		return nil, err
 	}
+
 	cf := &changeFrontier{
 		flowCtx:       flowCtx,
 		spec:          spec,
@@ -838,6 +870,11 @@ func newChangeFrontierProcessor(
 		frontier:      sf,
 		slowLogEveryN: log.Every(slowSpanMaxFrequency),
 	}
+
+	if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
+		cf.knobs = *cfKnobs
+	}
+
 	if err := cf.Init(
 		cf,
 		post,
@@ -937,7 +974,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 			cf.MoveToDraining(err)
 			return
 		}
-		cf.js = newJobState(job, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
+		cf.js = newJobState(job, nil /* coreProgress */, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
 
 		if changefeedbase.FrontierCheckpointFrequency.Get(&cf.flowCtx.Cfg.Settings.SV) == 0 {
 			log.Warning(ctx,
@@ -968,6 +1005,10 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 			// running status around for a while before we override it.
 			cf.js.lastRunStatusUpdate = timeutil.Now()
 		}
+	} else {
+		cf.js = newJobState(nil /* job */,
+			cf.EvalCtx.ChangefeedState.(*coreChangefeedProgress),
+			cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
 	}
 
 	cf.metrics.mu.Lock()
@@ -1038,7 +1079,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 				// Detect whether this boundary should be used to kill or restart the
 				// changefeed.
 				if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
-					err = changefeedbase.MarkRetryableErrorWithTimestamp(err, cf.frontier.boundaryTime)
+					err = changefeedbase.MarkRetryableError(err)
 				}
 			}
 
@@ -1139,22 +1180,16 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
 	// If frontier changed, we emit resolved timestamp.
 	emitResolved := frontierChanged
 
-	// Checkpoint job record progress if needed.
-	// NB: Sinkless changefeeds will not have a job state (js). In fact, they
-	// have no distributed state whatsoever. Because of this they also do not
-	// use protected timestamps.
-	if cf.js != nil {
-		checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
-		if err != nil {
-			return err
-		}
-
-		// Emit resolved timestamp only if we have checkpointed the job.
-		// Usually, this happens every time frontier changes, but we can skip some updates
-		// if we update frontier too rapidly.
-		emitResolved = checkpointed
+	checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
+	if err != nil {
+		return err
 	}
 
+	// Emit resolved timestamp only if we have checkpointed the job.
+	// Usually, this happens every time frontier changes, but we can skip some updates
+	// if we update frontier too rapidly.
+	emitResolved = checkpointed
+
 	if emitResolved {
 		// Keeping this after the checkpointJobProgress call will avoid
 		// some duplicates if a restart happens.
@@ -1239,53 +1274,59 @@ func (cf *changeFrontier) checkpointJobProgress(
 	}
 	cf.metrics.FrontierUpdates.Inc(1)
 	var updateSkipped error
-	if err := cf.js.job.Update(cf.Ctx, nil, func(
-		txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
-	) error {
-		// If we're unable to update the job due to the job state, such as during
-		// pause-requested, simply skip the checkpoint
-		if err := md.CheckRunningOrReverting(); err != nil {
-			updateSkipped = err
-			return nil
-		}
+	if cf.js.job != nil {
+
+		if err := cf.js.job.Update(cf.Ctx, nil, func(
+			txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
+		) error {
+			// If we're unable to update the job due to the job state, such as during
+			// pause-requested, simply skip the checkpoint
+			if err := md.CheckRunningOrReverting(); err != nil {
+				updateSkipped = err
+				return nil
+			}
 
-		// Advance resolved timestamp.
-		progress := md.Progress
-		progress.Progress = &jobspb.Progress_HighWater{
-			HighWater: &frontier,
-		}
+			// Advance resolved timestamp.
+			progress := md.Progress
+			progress.Progress = &jobspb.Progress_HighWater{
+				HighWater: &frontier,
+			}
 
-		changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
-		changefeedProgress.Checkpoint = &checkpoint
+			changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
+			changefeedProgress.Checkpoint = &checkpoint
 
-		timestampManager := cf.manageProtectedTimestamps
-		// TODO(samiskin): Remove this conditional and the associated deprecated
-		// methods once we're confident in ActiveProtectedTimestampsEnabled
-		if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
-			timestampManager = cf.deprecatedManageProtectedTimestamps
-		}
-		if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
-			log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
-			return err
-		}
+			timestampManager := cf.manageProtectedTimestamps
+			// TODO(samiskin): Remove this conditional and the associated deprecated
+			// methods once we're confident in ActiveProtectedTimestampsEnabled
+			if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
+				timestampManager = cf.deprecatedManageProtectedTimestamps
+			}
+			if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
+				log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
+				return err
+			}
 
-		if updateRunStatus {
-			md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
-		}
+			if updateRunStatus {
+				md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
+			}
 
-		ju.UpdateProgress(progress)
+			ju.UpdateProgress(progress)
 
-		// Reset RunStats.NumRuns to 1 since the changefeed is
-		// now running. By resetting the NumRuns, we avoid
-		// future job system level retries from having large
-		// backoffs because of past failures.
-		if md.RunStats != nil {
-			ju.UpdateRunStats(1, md.RunStats.LastRun)
-		}
+			// Reset RunStats.NumRuns to 1 since the changefeed is
+			// now running. By resetting the NumRuns, we avoid
+			// future job system level retries from having large
+			// backoffs because of past failures.
+			if md.RunStats != nil {
+				ju.UpdateRunStats(1, md.RunStats.LastRun)
+			}
 
-		return nil
-	}); err != nil {
-		return false, err
+			return nil
+		}); err != nil {
+			return false, err
+		}
+	} else {
+		cf.js.coreProgress.SetHighwater(&frontier)
+		cf.js.coreProgress.SetCheckpoint(checkpoint.Spans, checkpoint.Timestamp)
 	}
 
 	if updateSkipped != nil {
@@ -1293,6 +1334,14 @@ func (cf *changeFrontier) checkpointJobProgress(
 		return false, nil
 	}
 
+	// Testing hook: lets a test inject a retryable failure right after a
+	// checkpoint was recorded. The knob's own error is propagated, not replaced.
+	if cf.knobs.RaiseRetryableError != nil {
+		if err := cf.knobs.RaiseRetryableError(); err != nil {
+			return false, changefeedbase.MarkRetryableError(err)
+		}
+	}
+
 	return true, nil
 }
 
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index c536f0c7023f..44dff1e4a3fe 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -191,6 +191,11 @@ func changefeedPlanHook(
 		}
 
 		if details.SinkURI == `` {
+
+			p.ExtendedEvalContext().ChangefeedState = &coreChangefeedProgress{
+				progress: progress,
+			}
+
 			// If this is a sinkless changefeed, then we should not hold on to the
 			// descriptor leases accessed to plan the changefeed. If changes happen
 			// to descriptors, they will be addressed during the execution.
@@ -220,18 +225,7 @@ func changefeedPlanHook(
 					return err
 				}
 
-				// Check for a schemachange boundary timestamp returned via a
-				// retryable error. Retrying without updating the changefeed progress
-				// will result in the changefeed performing the schema change again,
-				// causing an infinite loop.
-				if ts, ok := changefeedbase.MaybeGetRetryableErrorTimestamp(err); ok {
-					progress = jobspb.Progress{
-						Progress: &jobspb.Progress_HighWater{HighWater: &ts},
-						Details: &jobspb.Progress_Changefeed{
-							Changefeed: &jobspb.ChangefeedProgress{},
-						},
-					}
-				}
+				progress = p.ExtendedEvalContext().ChangefeedState.(*coreChangefeedProgress).progress
 			}
 			telemetry.Count(`changefeed.core.error`)
 			return changefeedbase.MaybeStripRetryableErrorMarker(err)
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 48c1ee88fd60..c8d636e167b3 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -5752,6 +5752,64 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
 	}
 }
 
+// TestCoreChangefeedBackfillScanCheckpoint tests that a core changefeed
+// successfully completes the initial scan of a table when transient errors occur.
+// This test only succeeds if checkpoints are taken.
+func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderRace(t)
+	skip.UnderShort(t)
+
+	rnd, _ := randutil.NewPseudoRand()
+
+	rowCount := 10000
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `CREATE TABLE foo(a INT PRIMARY KEY)`)
+		sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO foo (a) SELECT * FROM generate_series(%d, %d)`, 0, rowCount))
+
+		knobs := s.TestingKnobs.
+			DistSQL.(*execinfra.TestingKnobs).
+			Changefeed.(*TestingKnobs)
+
+		// Ensure Scan Requests are always small enough that we receive multiple
+		// resolved events during a backfill. Also ensure that checkpoint frequency
+		// and size are large enough to induce several checkpoints when
+		// writing `rowCount` rows.
+		knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
+			b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25)
+			return nil
+		}
+		changefeedbase.FrontierCheckpointFrequency.Override(
+			context.Background(), &s.Server.ClusterSettings().SV, 1)
+		changefeedbase.FrontierCheckpointMaxBytes.Override(
+			context.Background(), &s.Server.ClusterSettings().SV, 100<<20)
+
+		checkpointCount := 0
+		knobs.RaiseRetryableError = func() error {
+			checkpointCount++
+			if checkpointCount%200 == 0 {
+				return changefeedbase.MarkRetryableError(errors.New("test transient error"))
+			}
+			return nil
+		}
+
+		foo := feed(t, f, `CREATE CHANGEFEED FOR TABLE foo`)
+		defer closeFeed(t, foo)
+
+		payloads := make([]string, rowCount+1)
+		for i := 0; i < rowCount+1; i++ {
+			payloads[i] = fmt.Sprintf(`foo: [%d]->{"after": {"a": %d}}`, i, i)
+		}
+		assertPayloads(t, foo, payloads)
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("sinkless"))
+}
+
 func TestCheckpointFrequency(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -5765,6 +5823,7 @@ func TestCheckpointFrequency(t *testing.T) {
 	ts := timeutil.NewManualTime(timeutil.Now())
 	js := newJobState(
 		nil, /* job */
+		nil, /* core progress */
 		cluster.MakeTestingClusterSettings(),
 		MakeMetrics(time.Second).(*Metrics), ts,
 	)
diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
index f34ecdaffd42..64d92e46876e 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
@@ -21,7 +21,6 @@ go_library(
         "//pkg/sql",
         "//pkg/sql/catalog/descpb",
         "//pkg/sql/flowinfra",
-        "//pkg/util/hlc",
         "@com_github_cockroachdb_errors//:errors",
     ],
 )
diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go
index 5dd16ca3debe..a10415b4ad4c 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/errors.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go
@@ -17,7 +17,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
-	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/errors"
 )
 
@@ -78,14 +77,7 @@ func (e *taggedError) Unwrap() error { return e.wrapped }
 const retryableErrorString = "retryable changefeed error"
 
 type retryableError struct {
-	// A schema change may result in a changefeed returning retryableError,
-	// which can signal the changefeed to restart.
-	// boundaryTimestamp can be returned inside this error so
-	// the changefeed knows where to restart from. Note this is
-	// only useful for sinkless/core changefeeds because they do not have
-	// the ability to read/write their state to jobs tables during restarts.
-	boundaryTimestamp hlc.Timestamp
-	wrapped           error
+	wrapped error
 }
 
 // MarkRetryableError wraps the given error, marking it as retryable to
@@ -94,12 +86,6 @@ func MarkRetryableError(e error) error {
 	return &retryableError{wrapped: e}
 }
 
-// MarkRetryableErrorWithTimestamp wraps the given error, marks it as
-// retryable, and attaches a timestamp to the error.
-func MarkRetryableErrorWithTimestamp(e error, ts hlc.Timestamp) error {
-	return &retryableError{boundaryTimestamp: ts, wrapped: e}
-}
-
 // Error implements the error interface.
 func (e *retryableError) Error() string {
 	return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error())
@@ -139,17 +125,6 @@ func IsRetryableError(err error) bool {
 		errors.Is(err, sql.ErrPlanChanged))
 }
 
-// MaybeGetRetryableErrorTimestamp will get the timestamp of an error if
-// the error is a retryableError and the timestamp field is populated.
-func MaybeGetRetryableErrorTimestamp(err error) (timestamp hlc.Timestamp, ok bool) {
-	if retryableErr := (*retryableError)(nil); errors.As(err, &retryableErr) {
-		if !retryableErr.boundaryTimestamp.IsEmpty() {
-			return retryableErr.boundaryTimestamp, true
-		}
-	}
-	return hlc.Timestamp{}, false
-}
-
 // MaybeStripRetryableErrorMarker performs some minimal attempt to clean the
 // RetryableError marker out. This won't do anything if the RetryableError
 // itself has been wrapped, but that's okay, we'll just have an uglier string.
diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go
index cafb5c04986a..f7a993da2340 100644
--- a/pkg/ccl/changefeedccl/testing_knobs.go
+++ b/pkg/ccl/changefeedccl/testing_knobs.go
@@ -45,6 +45,8 @@ type TestingKnobs struct {
 	OnDistflowSpec func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, frontierSpec *execinfrapb.ChangeFrontierSpec)
 	// ShouldReplan is used to see if a replan for a changefeed should be triggered
 	ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool
+	// RaiseRetryableError, if set, runs after each change frontier checkpoint; a non-nil result fails it with a retryable error.
+	RaiseRetryableError func() error
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go
index 8d92cf6973c7..3c4c019f128e 100644
--- a/pkg/sql/sem/eval/context.go
+++ b/pkg/sql/sem/eval/context.go
@@ -230,6 +230,9 @@ type Context struct {
 
 	// RangeStatsFetcher is used to fetch RangeStats.
 	RangeStatsFetcher RangeStatsFetcher
+
+	// ChangefeedState stores the state (progress) of core changefeeds.
+	ChangefeedState ChangefeedState
 }
 
 // DescIDGenerator generates unique descriptor IDs.
diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go
index e2e15a3a22ff..f42fd291303a 100644
--- a/pkg/sql/sem/eval/deps.go
+++ b/pkg/sql/sem/eval/deps.go
@@ -15,6 +15,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
@@ -502,6 +503,17 @@ type SequenceOperators interface {
 	SetSequenceValueByID(ctx context.Context, seqID uint32, newVal int64, isCalled bool) error
 }
 
+// ChangefeedState is used to track progress and checkpointing for sinkless/core changefeeds.
+// Because a CREATE CHANGEFEED statement for a sinkless changefeed will hang and return data
+// over the SQL connection, this state belongs in the EvalCtx.
+type ChangefeedState interface {
+	// SetHighwater sets the frontier timestamp for the changefeed.
+	SetHighwater(frontier *hlc.Timestamp)
+
+	// SetCheckpoint sets the checkpoint for the changefeed.
+	SetCheckpoint(spans []roachpb.Span, timestamp hlc.Timestamp)
+}
+
 // TenantOperator is capable of interacting with tenant state, allowing SQL
 // builtin functions to create, configure, and destroy tenants. The methods will
 // return errors when run by any tenant other than the system tenant.