From b4ea63bb7f91d2ea78428beff3ca10b9263ed4fa Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 8 Jul 2022 17:22:00 -0400 Subject: [PATCH] changefeedcc: De-flake changefeed tests. Address multiple source of flakes in changefeed tests. https://github.com/cockroachdb/cockroach/pull/83530 made a change to ensure that changefeed do not fail when they are in the transient (e.g. pause-requested) state. Unfortunately, the PR made a mistake where even if the checkpoint could not be completed because the cangefeed is in the "pause requested" state, we would still proceed to emit resolved event. This is wrong, and the resolved event should never be emitted if we failed to checkpoint. In addition, alter changefeed can be used to add new tables to existing changefeed, with initial scan. In such cases, the newly added table will emit events as of the timestamp of "alter changefeed statement". When this happens, the semantics around resolved events are murky as document in https://github.com/cockroachdb/cockroach/issues/84102 Address this issue by making cloud storage sink more permissive around it's handling of resolved timestamp. When completing initial scan for newly added tables, fix an "off by 1" error when frontier was advanced to the next timestamp. This was wrong since https://github.com/cockroachdb/cockroach/pull/82451 clarified that the rangefeed start time is exclusive. Informs #83882 Fixes #83946 Release Notes: None Release note (): --- .../changefeedccl/changefeed_processors.go | 24 +++++++++++++------ pkg/ccl/changefeedccl/testfeed_test.go | 14 ++++++++++- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index cd1dc1cefefa..2d6a8d7f93f6 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1633,11 +1633,12 @@ func (cf *changeFrontier) maybeCheckpointJob( if updateCheckpoint || updateHighWater { checkpointStart := timeutil.Now() - if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil { + updated, err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint) + if err != nil { return false, err } cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) - return true, nil + return updated, nil } return false, nil @@ -1645,20 +1646,20 @@ func (cf *changeFrontier) maybeCheckpointJob( func (cf *changeFrontier) checkpointJobProgress( frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint, -) (err error) { +) (bool, error) { updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency if updateRunStatus { defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }() } cf.metrics.FrontierUpdates.Inc(1) - - return cf.js.job.Update(cf.Ctx, nil, func( + var updateSkipped error + if err := cf.js.job.Update(cf.Ctx, nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { // If we're unable to update the job due to the job state, such as during // pause-requested, simply skip the checkpoint if err := md.CheckRunningOrReverting(); err != nil { - log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", err.Error()) + updateSkipped = err return nil } @@ -1696,7 +1697,16 @@ func (cf *changeFrontier) checkpointJobProgress( } return nil - }) + }); err != nil { + return false, err + } + + if updateSkipped != nil { + log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped) + return false, nil + } + + return true, nil } // manageProtectedTimestamps periodically advances the protected timestamp for diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 2e471179044c..ba34b5219d12 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -951,10 +951,22 @@ func (c *cloudFeed) walkDir(path string, info os.FileInfo, err error) error { return nil } - if strings.Compare(c.resolved, path) >= 0 { + tsFromPath := func(p string) string { + return strings.Split(filepath.Base(p), "-")[0] + } + + // Skip files with timestamp greater than the previously observed timestamp. + // Note: theoretically, we should be able to skip any file with timestamp + // greater *or equal* to the previously observed timestamp. However, alter + // changefeed pose a problem, since a table maybe added with initial scan + // option, causing new events (possibly including resolved event) to be + // emitted as of previously emitted timestamp. + // See https://github.com/cockroachdb/cockroach/issues/84102 + if strings.Compare(tsFromPath(c.resolved), tsFromPath(path)) >= 0 { // Already output this in a previous walkDir. return nil } + if strings.HasSuffix(path, `RESOLVED`) { resolvedPayload, err := ioutil.ReadFile(path) if err != nil {