Skip to content

Commit

Permalink
changefeedccl: make core changefeeds more resilient
Browse files Browse the repository at this point in the history
This change updates core changefeeds to save checkpoints
inside the EvalCtx. With this change, core changefeeds
can retry from the last checkpoint instead of restarting
from the beginning.

Fixes: #84511

Release note (general change): This change updates core/experimental
changefeeds to be more resilient to transient errors
(ex. network blips) by adding checkpointing.

Previously, transient errors would result in a core changefeed
stopping and terminating the underlying SQL statement. This
would require the SQL statement to be restarted by a user.
Furthermore, if the core changefeed were restarted during an
initial scan, the initial scan would start from the beginning.
For large initial scans, transient errors are more likely,
so restarting from the beginning would likely see more transient
errors and restarts which would not progress the changefeed.

Now, an experimental changefeed will automatically take
frequent checkpoints and retry from the last checkpoint
when a transient error occurs.

Release justification: This change updates an experimental
feature.
  • Loading branch information
jayshrivastava committed Aug 18, 2022
1 parent 04c6a1a commit 592ce26
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 97 deletions.
163 changes: 105 additions & 58 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,8 @@ type changeFrontier struct {
// metricsID is used as the unique id of this changefeed in the
// metrics.MaxBehindNanos map.
metricsID int

knobs TestingKnobs
}

const (
Expand All @@ -729,7 +731,11 @@ const (

// jobState encapsulates changefeed job state.
type jobState struct {
job *jobs.Job
// job is set for changefeeds other than core/sinkless changefeeds.
job *jobs.Job
// coreProgress is set for only core/sinkless changefeeds.
coreProgress *coreChangefeedProgress

settings *cluster.Settings
metrics *Metrics
ts timeutil.TimeSource
Expand All @@ -744,11 +750,36 @@ type jobState struct {
progressUpdatesSkipped bool
}

// coreChangefeedProgress holds the in-memory progress of a core/sinkless
// changefeed. Its SetHighwater and SetCheckpoint methods implement the
// eval.ChangefeedState interface.
type coreChangefeedProgress struct {
// progress carries the high-water mark (in its Progress field) and the
// changefeed checkpoint (inside its Details field).
progress jobspb.Progress
}

// SetHighwater implements the eval.ChangefeedState interface. It replaces the
// stored progress with a high-water mark that points at frontier; the pointer
// is stored as given, not copied.
func (cp *coreChangefeedProgress) SetHighwater(frontier *hlc.Timestamp) {
	highWater := jobspb.Progress_HighWater{HighWater: frontier}
	cp.progress.Progress = &highWater
}

// SetCheckpoint implements the eval.ChangefeedState interface. It overwrites
// the checkpoint stored in the changefeed progress details with a new one
// built from the given spans and timestamp.
func (cp *coreChangefeedProgress) SetCheckpoint(spans []roachpb.Span, timestamp hlc.Timestamp) {
	// NOTE(review): this unchecked assertion panics unless Details holds a
	// *jobspb.Progress_Changefeed with a non-nil Changefeed; presumably
	// whoever constructs cp always populates it — confirm against the caller.
	details := cp.progress.Details.(*jobspb.Progress_Changefeed)
	checkpoint := jobspb.ChangefeedProgress_Checkpoint{
		Spans:     spans,
		Timestamp: timestamp,
	}
	details.Changefeed.Checkpoint = &checkpoint
}

func newJobState(
j *jobs.Job, st *cluster.Settings, metrics *Metrics, ts timeutil.TimeSource,
j *jobs.Job,
coreProgress *coreChangefeedProgress,
st *cluster.Settings,
metrics *Metrics,
ts timeutil.TimeSource,
) *jobState {
return &jobState{
job: j,
coreProgress: coreProgress,
settings: st,
metrics: metrics,
ts: ts,
Expand Down Expand Up @@ -830,6 +861,7 @@ func newChangeFrontierProcessor(
if err != nil {
return nil, err
}

cf := &changeFrontier{
flowCtx: flowCtx,
spec: spec,
Expand All @@ -838,6 +870,11 @@ func newChangeFrontierProcessor(
frontier: sf,
slowLogEveryN: log.Every(slowSpanMaxFrequency),
}

if cfKnobs, ok := flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
cf.knobs = *cfKnobs
}

if err := cf.Init(
cf,
post,
Expand Down Expand Up @@ -937,7 +974,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.MoveToDraining(err)
return
}
cf.js = newJobState(job, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
cf.js = newJobState(job, nil, cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})

if changefeedbase.FrontierCheckpointFrequency.Get(&cf.flowCtx.Cfg.Settings.SV) == 0 {
log.Warning(ctx,
Expand Down Expand Up @@ -968,6 +1005,10 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// running status around for a while before we override it.
cf.js.lastRunStatusUpdate = timeutil.Now()
}
} else {
cf.js = newJobState(nil,
cf.EvalCtx.ChangefeedState.(*coreChangefeedProgress),
cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
}

cf.metrics.mu.Lock()
Expand Down Expand Up @@ -1038,7 +1079,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
// Detect whether this boundary should be used to kill or restart the
// changefeed.
if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
err = changefeedbase.MarkRetryableErrorWithTimestamp(err, cf.frontier.boundaryTime)
err = changefeedbase.MarkRetryableError(err)
}
}

Expand Down Expand Up @@ -1139,22 +1180,16 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// If frontier changed, we emit resolved timestamp.
emitResolved := frontierChanged

// Checkpoint job record progress if needed.
// NB: Sinkless changefeeds will not have a job state (js). In fact, they
// have no distributed state whatsoever. Because of this they also do not
// use protected timestamps.
if cf.js != nil {
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}

// Emit resolved timestamp only if we have checkpointed the job.
// Usually, this happens every time frontier changes, but we can skip some updates
// if we update frontier too rapidly.
emitResolved = checkpointed
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}

// Emit resolved timestamp only if we have checkpointed the job.
// Usually, this happens every time frontier changes, but we can skip some updates
// if we update frontier too rapidly.
emitResolved = checkpointed

if emitResolved {
// Keeping this after the checkpointJobProgress call will avoid
// some duplicates if a restart happens.
Expand Down Expand Up @@ -1239,60 +1274,72 @@ func (cf *changeFrontier) checkpointJobProgress(
}
cf.metrics.FrontierUpdates.Inc(1)
var updateSkipped error
if err := cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
// pause-requested, simply skip the checkpoint
if err := md.CheckRunningOrReverting(); err != nil {
updateSkipped = err
return nil
}
if cf.js.job != nil {

if err := cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
// pause-requested, simply skip the checkpoint
if err := md.CheckRunningOrReverting(); err != nil {
updateSkipped = err
return nil
}

// Advance resolved timestamp.
progress := md.Progress
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &frontier,
}
// Advance resolved timestamp.
progress := md.Progress
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &frontier,
}

changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &checkpoint
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &checkpoint

timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
return err
}
timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
return err
}

if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}
if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}

ju.UpdateProgress(progress)
ju.UpdateProgress(progress)

// Reset RunStats.NumRuns to 1 since the changefeed is
// now running. By resetting the NumRuns, we avoid
// future job system level retries from having large
// backoffs because of past failures.
if md.RunStats != nil {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}
// Reset RunStats.NumRuns to 1 since the changefeed is
// now running. By resetting the NumRuns, we avoid
// future job system level retries from having large
// backoffs because of past failures.
if md.RunStats != nil {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}

return nil
}); err != nil {
return false, err
return nil
}); err != nil {
return false, err
}
} else {
cf.js.coreProgress.SetHighwater(&frontier)
cf.js.coreProgress.SetCheckpoint(checkpoint.Spans, checkpoint.Timestamp)
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError"))
}
}

return true, nil
}

Expand Down
18 changes: 6 additions & 12 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func changefeedPlanHook(
}

if details.SinkURI == `` {

p.ExtendedEvalContext().ChangefeedState = &coreChangefeedProgress{
progress: progress,
}

// If this is a sinkless changefeed, then we should not hold on to the
// descriptor leases accessed to plan the changefeed. If changes happen
// to descriptors, they will be addressed during the execution.
Expand Down Expand Up @@ -220,18 +225,7 @@ func changefeedPlanHook(
return err
}

// Check for a schemachange boundary timestamp returned via a
// retryable error. Retrying without updating the changefeed progress
// will result in the changefeed performing the schema change again,
// causing an infinite loop.
if ts, ok := changefeedbase.MaybeGetRetryableErrorTimestamp(err); ok {
progress = jobspb.Progress{
Progress: &jobspb.Progress_HighWater{HighWater: &ts},
Details: &jobspb.Progress_Changefeed{
Changefeed: &jobspb.ChangefeedProgress{},
},
}
}
progress = p.ExtendedEvalContext().ChangefeedState.(*coreChangefeedProgress).progress
}
telemetry.Count(`changefeed.core.error`)
return changefeedbase.MaybeStripRetryableErrorMarker(err)
Expand Down
59 changes: 59 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5752,6 +5752,64 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
}
}

// TestCoreChangefeedBackfillScanCheckpoint verifies that a core (sinkless)
// changefeed finishes the initial scan of a table even though retryable
// errors are injected along the way. The test can only pass if the changefeed
// takes checkpoints.
func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderRace(t)
	skip.UnderShort(t)

	// The table is populated with the keys 0 through rowCount, inclusive.
	const rowCount = 10000

	rnd, _ := randutil.NewPseudoRand()

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		runner := sqlutils.MakeSQLRunner(s.DB)
		runner.Exec(t, `CREATE TABLE foo(a INT PRIMARY KEY)`)
		runner.Exec(t, fmt.Sprintf(`INSERT INTO foo (a) SELECT * FROM generate_series(%d, %d)`, 0, rowCount))

		distSQLKnobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs)
		knobs := distSQLKnobs.Changefeed.(*TestingKnobs)

		// Cap every scan request at a small, random number of keys (1 to 25)
		// so that the backfill produces multiple resolved events.
		knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
			b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(25)
			return nil
		}

		// Override the frontier checkpoint frequency and max-bytes settings;
		// the intent is to induce several checkpoints while the rows are
		// scanned.
		ctx := context.Background()
		sv := &s.Server.ClusterSettings().SV
		changefeedbase.FrontierCheckpointFrequency.Override(ctx, sv, 1)
		changefeedbase.FrontierCheckpointMaxBytes.Override(ctx, sv, 100<<20)

		// Every 200th invocation of the knob reports a retryable error; all
		// other invocations report nothing.
		knobCalls := 0
		knobs.RaiseRetryableError = func() error {
			knobCalls++
			if knobCalls%200 != 0 {
				return nil
			}
			return changefeedbase.MarkRetryableError(errors.New("test transient error"))
		}

		foo := feed(t, f, `CREATE CHANGEFEED FOR TABLE foo`)
		defer closeFeed(t, foo)

		// Expect exactly one payload per inserted key.
		expected := make([]string, 0, rowCount+1)
		for key := 0; key <= rowCount; key++ {
			expected = append(expected, fmt.Sprintf(`foo: [%d]->{"after": {"a": %d}}`, key, key))
		}
		assertPayloads(t, foo, expected)
	}

	cdcTest(t, testFn, feedTestForceSink("sinkless"))
}

func TestCheckpointFrequency(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -5765,6 +5823,7 @@ func TestCheckpointFrequency(t *testing.T) {
ts := timeutil.NewManualTime(timeutil.Now())
js := newJobState(
nil, /* job */
nil, /* core progress */
cluster.MakeTestingClusterSettings(),
MakeMetrics(time.Second).(*Metrics), ts,
)
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ go_library(
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/flowinfra",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
Loading

0 comments on commit 592ce26

Please sign in to comment.