Skip to content

Commit

Permalink
Merge #84109
Browse files Browse the repository at this point in the history
84109: changefeedcc: De-flake changefeed tests. r=miretskiy a=miretskiy

Address multiple source of flakes in changefeed tests.

#83530 made a change
to ensure that changefeed do not fail when they are in the transient
(e.g. pause-requested) state.  Unfortunately, the PR made a mistake
where even if the checkpoint could not be completed because
the cangefeed is in the "pause requested" state, we would still
proceed to emit resolved event.  This is wrong, and the resolved
event should never be emitted if we failed to checkpoint.

In addition, alter changefeed can be used to add new tables to existing
changefeed, with initial scan.  In such cases, the newly added table
will emit events as of the timestamp of "alter changefeed statement".
When this happens, the semantics around resolved events are murky
as documented in #84102
Address this issue by making cloud storage sink more permissive around
its handling of resolved timestamp.

When completing initial scan for newly added tables, fix an "off by 1"
error when frontier was advanced to the next timestamp.
This was wrong since #82451
clarified that the rangefeed start time is exclusive.

Informs #83882
Fixes #83946

Release Notes: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Jul 9, 2022
2 parents cc22360 + 62c7995 commit 54b4fca
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
24 changes: 17 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,32 +1219,33 @@ func (cf *changeFrontier) maybeCheckpointJob(

if updateCheckpoint || updateHighWater {
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
updated, err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint)
if err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
return true, nil
return updated, nil
}

return false, nil
}

func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (err error) {
) (bool, error) {
updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency
if updateRunStatus {
defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }()
}
cf.metrics.FrontierUpdates.Inc(1)

return cf.js.job.Update(cf.Ctx, nil, func(
var updateSkipped error
if err := cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// If we're unable to update the job due to the job state, such as during
// pause-requested, simply skip the checkpoint
if err := md.CheckRunningOrReverting(); err != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", err.Error())
updateSkipped = err
return nil
}

Expand Down Expand Up @@ -1282,7 +1283,16 @@ func (cf *changeFrontier) checkpointJobProgress(
}

return nil
})
}); err != nil {
return false, err
}

if updateSkipped != nil {
log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped)
return false, nil
}

return true, nil
}

// manageProtectedTimestamps periodically advances the protected timestamp for
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,10 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
return err
}
// We have scanned scannedSpans up to and including scannedTS. Advance frontier
// for those spans -- we can start their range feed from scannedTS.Next().
// for those spans. Note, since rangefeed start time is *exclusive* (that it, rangefeed
// starts from timestamp.Next()), we advanced frontier to the scannedTS.
for _, sp := range scannedSpans {
if _, err := rangeFeedResumeFrontier.Forward(sp, scannedTS.Next()); err != nil {
if _, err := rangeFeedResumeFrontier.Forward(sp, scannedTS); err != nil {
return err
}
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,10 +1016,22 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error {
return nil
}

if strings.Compare(c.resolved, path) >= 0 {
tsFromPath := func(p string) string {
return strings.Split(filepath.Base(p), "-")[0]
}

// Skip files with timestamp greater than the previously observed timestamp.
// Note: theoretically, we should be able to skip any file with timestamp
// greater *or equal* to the previously observed timestamp. However, alter
// changefeed pose a problem, since a table maybe added with initial scan
// option, causing new events (possibly including resolved event) to be
// emitted as of previously emitted timestamp.
// See https://github.com/cockroachdb/cockroach/issues/84102
if strings.Compare(tsFromPath(c.resolved), tsFromPath(path)) >= 0 {
// Already output this in a previous walkDir.
return nil
}

if strings.HasSuffix(path, `RESOLVED`) {
resolvedPayload, err := ioutil.ReadFile(path)
if err != nil {
Expand Down

0 comments on commit 54b4fca

Please sign in to comment.