Skip to content

Commit

Permalink
changefeedcc: De-flake changefeed tests.
Browse files Browse the repository at this point in the history
Address multiple source of flakes in changefeed tests.

cockroachdb#83530 made a change
to ensure that changefeed do not fail when they are in the transient
(e.g. pause-requested) state.  Unfortunately, the PR made a mistake
where even if the checkpoint could not be completed because
the cangefeed is in the "pause requested" state, we would still
proceed to emit resolved event.  This is wrong, and the resolved
event should never be emitted if we failed to checkpoint.

In addition, alter changefeed can be used to add new tables to existing
changefeed, with initial scan.  In such cases, the newly added table
will emit events as of the timestamp of "alter changefeed statement".
When this happens, the semantics around resolved events are murky
as document in cockroachdb#84102
Address this issue by making cloud storage sink more permissive around
it's handling of resolved timestamp.

When completing initial scan for newly added tables, fix an "off by 1"
error when frontier was advanced to the next timestamp.
This was wrong since cockroachdb#82451
clarified that the rangefeed start time is exclusive.

Informs cockroachdb#83882
Fixes cockroachdb#83946

Release Notes: None

Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
Yevgeniy Miretskiy committed Jul 27, 2022
1 parent f162001 commit b4ea63b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
24 changes: 17 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1633,32 +1633,33 @@ func (cf *changeFrontier) maybeCheckpointJob(

if updateCheckpoint || updateHighWater {
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
updated, err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint)
if err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
return true, nil
return updated, nil
}

return false, nil
}

func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (err error) {
) (bool, error) {
updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency
if updateRunStatus {
defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }()
}
cf.metrics.FrontierUpdates.Inc(1)

return cf.js.job.Update(cf.Ctx, nil, func(
var updateSkipped error
if err := cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
// pause-requested, simply skip the checkpoint
if err := md.CheckRunningOrReverting(); err != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", err.Error())
updateSkipped = err
return nil
}

Expand Down Expand Up @@ -1696,7 +1697,16 @@ func (cf *changeFrontier) checkpointJobProgress(
}

return nil
})
}); err != nil {
return false, err
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

return true, nil
}

// manageProtectedTimestamps periodically advances the protected timestamp for
Expand Down
14 changes: 13 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,10 +951,22 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error {
return nil
}

if strings.Compare(c.resolved, path) >= 0 {
tsFromPath := func(p string) string {
return strings.Split(filepath.Base(p), "-")[0]
}

// Skip files with timestamp greater than the previously observed timestamp.
// Note: theoretically, we should be able to skip any file with timestamp
// greater *or equal* to the previously observed timestamp. However, alter
// changefeed pose a problem, since a table maybe added with initial scan
// option, causing new events (possibly including resolved event) to be
// emitted as of previously emitted timestamp.
// See https://github.com/cockroachdb/cockroach/issues/84102
if strings.Compare(tsFromPath(c.resolved), tsFromPath(path)) >= 0 {
// Already output this in a previous walkDir.
return nil
}

if strings.HasSuffix(path, `RESOLVED`) {
resolvedPayload, err := ioutil.ReadFile(path)
if err != nil {
Expand Down

0 comments on commit b4ea63b

Please sign in to comment.