diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index cd1dc1cefefa..2d6a8d7f93f6 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1633,11 +1633,12 @@ func (cf *changeFrontier) maybeCheckpointJob( if updateCheckpoint || updateHighWater { checkpointStart := timeutil.Now() - if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil { + updated, err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint) + if err != nil { return false, err } cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) - return true, nil + return updated, nil } return false, nil @@ -1645,20 +1646,20 @@ func (cf *changeFrontier) maybeCheckpointJob( func (cf *changeFrontier) checkpointJobProgress( frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint, -) (err error) { +) (bool, error) { updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency if updateRunStatus { defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }() } cf.metrics.FrontierUpdates.Inc(1) - - return cf.js.job.Update(cf.Ctx, nil, func( + var updateSkipped error + if err := cf.js.job.Update(cf.Ctx, nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { // If we're unable to update the job due to the job state, such as during // pause-requested, simply skip the checkpoint if err := md.CheckRunningOrReverting(); err != nil { - log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", err.Error()) + updateSkipped = err return nil } @@ -1696,7 +1697,16 @@ func (cf *changeFrontier) checkpointJobProgress( } return nil - }) + }); err != nil { + return false, err + } + + if updateSkipped != nil { + log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped) + return false, nil + } + + return true, nil } // manageProtectedTimestamps periodically advances the protected timestamp for diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 2e471179044c..ba34b5219d12 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -951,10 +951,22 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error { return nil } - if strings.Compare(c.resolved, path) >= 0 { + tsFromPath := func(p string) string { + return strings.Split(filepath.Base(p), "-")[0] + } + + // Skip files with timestamp greater than the previously observed timestamp. + // Note: theoretically, we should be able to skip any file with timestamp + // greater *or equal* to the previously observed timestamp. However, alter + // changefeed pose a problem, since a table maybe added with initial scan + // option, causing new events (possibly including resolved event) to be + // emitted as of previously emitted timestamp. + // See https://github.com/cockroachdb/cockroach/issues/84102 + if strings.Compare(tsFromPath(c.resolved), tsFromPath(path)) >= 0 { // Already output this in a previous walkDir. return nil } + if strings.HasSuffix(path, `RESOLVED`) { resolvedPayload, err := ioutil.ReadFile(path) if err != nil {