From 519906fc4d533b16b6e7407dde7e355266cd91bd Mon Sep 17 00:00:00 2001 From: Shiranka Miskin Date: Tue, 15 Feb 2022 14:18:58 -0500 Subject: [PATCH] changefeedccl: periodic pts record updates Previously, changefeeds only laid down protected timestamp records to protect against either an ongoing backfill or the changefeed lagging behind. This is insufficient when the gcttl is very short, when recurring errors keep the changefeed retrying for too long, or for upcoming work that lets serverless shut down idle changefeeds. This PR removes the manual PTS protection on backfills and begins an async routine on the changeFrontier that updates the protected timestamp record to the current highwater mark. Release note: None --- pkg/ccl/changefeedccl/changefeed.go | 22 +-- .../changefeedccl/changefeed_processors.go | 159 +++++++----------- pkg/ccl/changefeedccl/changefeed_stmt.go | 66 +++----- pkg/ccl/changefeedccl/changefeed_test.go | 107 ++++++++++-- .../changefeedccl/changefeedbase/settings.go | 9 + 5 files changed, 200 insertions(+), 163 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 544900e7f5fb..5c592c81f33b 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -15,15 +15,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/errors" ) const ( @@ -46,32 +43,29 @@ func emitResolvedTimestamp( return nil } +func 
shouldProtectTimestamps(codec keys.SQLCodec) bool { + return codec.ForSystemTenant() +} + // createProtectedTimestampRecord will create a record to protect the spans for // this changefeed at the resolved timestamp. The progress struct will be // updated to refer to this new protected timestamp record. func createProtectedTimestampRecord( ctx context.Context, codec keys.SQLCodec, - pts protectedts.Storage, - txn *kv.Txn, jobID jobspb.JobID, targets jobspb.ChangefeedTargets, resolved hlc.Timestamp, progress *jobspb.ChangefeedProgress, -) error { - if !codec.ForSystemTenant() { - return errors.AssertionFailedf("createProtectedTimestampRecord called on tenant-based changefeed") - } - +) *ptpb.Record { progress.ProtectedTimestampRecord = uuid.MakeV4() - log.VEventf(ctx, 2, "creating protected timestamp %v at %v", - progress.ProtectedTimestampRecord, resolved) deprecatedSpansToProtect := makeSpansToProtect(codec, targets) targetToProtect := makeTargetToProtect(targets) - rec := jobsprotectedts.MakeRecord( + + log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved) + return jobsprotectedts.MakeRecord( progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect) - return pts.Protect(ctx, txn, rec) } func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target { diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index d5572d5afe5b..a781d4bc19f9 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" 
"github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1170,6 +1169,13 @@ func (cf *changeFrontier) Start(ctx context.Context) { } } + if err := cf.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-pts-manager", func(ctx context.Context) { + cf.manageProtectedTimestamps(ctx) + }); err != nil { + cf.MoveToDraining(err) + return + } + cf.metrics.mu.Lock() cf.metricsID = cf.metrics.mu.id cf.metrics.mu.id++ @@ -1217,13 +1223,6 @@ func (cf *changeFrontier) closeMetrics() { cf.metrics.mu.Unlock() } -// shouldProtectBoundaries checks the job's spec to determine whether it should -// install protected timestamps when encountering scan boundaries. -func (cf *changeFrontier) shouldProtectBoundaries() bool { - policy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) - return policy == changefeedbase.OptSchemaChangePolicyBackfill -} - // Next is part of the RowSource interface. func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { for cf.State == execinfra.StateRunning { @@ -1315,7 +1314,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { return err } - isBehind := cf.maybeLogBehindSpan(frontierChanged) + cf.maybeLogBehindSpan(frontierChanged) // If frontier changed, we emit resolved timestamp. emitResolved := frontierChanged @@ -1325,7 +1324,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { // have no distributed state whatsoever. Because of this they also do not // use protected timestamps. 
if cf.js != nil { - checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged, isBehind) + checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged) if err != nil { return err } @@ -1352,7 +1351,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { } func (cf *changeFrontier) maybeCheckpointJob( - resolvedSpan jobspb.ResolvedSpan, frontierChanged, isBehind bool, + resolvedSpan jobspb.ResolvedSpan, frontierChanged bool, ) (bool, error) { // When in a Backfill, the frontier remains unchanged at the backfill boundary // as we receive spans from the scan request at the Backfill Timestamp @@ -1374,11 +1373,8 @@ func (cf *changeFrontier) maybeCheckpointJob( !inBackfill && (cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged)) if updateCheckpoint || updateHighWater { - manageProtected := updateHighWater checkpointStart := timeutil.Now() - if err := cf.checkpointJobProgress( - cf.frontier.Frontier(), manageProtected, checkpoint, isBehind, - ); err != nil { + if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil { return false, err } cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) @@ -1388,16 +1384,8 @@ func (cf *changeFrontier) maybeCheckpointJob( return false, nil } -// checkpointJobProgress checkpoints a changefeed-level job information. -// In addition, if 'manageProtected' is true, which only happens when frontier advanced, -// this method manages the protected timestamp state. -// The isBehind argument is used to determine whether an existing protected timestamp -// should be released. 
func (cf *changeFrontier) checkpointJobProgress( - frontier hlc.Timestamp, - manageProtected bool, - checkpoint jobspb.ChangefeedProgress_Checkpoint, - isBehind bool, + frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint, ) (err error) { updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency if updateRunStatus { @@ -1420,12 +1408,6 @@ func (cf *changeFrontier) checkpointJobProgress( // Manage protected timestamps. changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed - if manageProtected { - if err := cf.manageProtectedTimestamps(cf.Ctx, changefeedProgress, txn, frontier, isBehind); err != nil { - return err - } - } - changefeedProgress.Checkpoint = &checkpoint if updateRunStatus { @@ -1446,77 +1428,62 @@ func (cf *changeFrontier) checkpointJobProgress( }) } -// manageProtectedTimestamps is called when the resolved timestamp is being -// checkpointed. The changeFrontier always checkpoints resolved timestamps -// which occur at scan boundaries. It releases previously protected timestamps -// if the changefeed is not behind. See maybeLogBehindSpan for details on the -// behind calculation. -// -// Note that this function is never called for sinkless changefeeds as they have -// no corresponding job and thus no corresponding distributed state on which to -// attach protected timestamp information. -// -// TODO(ajwerner): Adopt protected timestamps for sinkless changefeeds, -// perhaps by using whatever mechanism is eventually built to protect -// data for long-running SQL transactions. There's some discussion of this -// use case in the protected timestamps RFC. 
-func (cf *changeFrontier) manageProtectedTimestamps( - ctx context.Context, - progress *jobspb.ChangefeedProgress, - txn *kv.Txn, - resolved hlc.Timestamp, - isBehind bool, -) error { - pts := cf.flowCtx.Cfg.ProtectedTimestampProvider - if err := cf.maybeReleaseProtectedTimestamp(ctx, progress, pts, txn, isBehind); err != nil { - return err +// manageProtectedTimestamps periodically advances the protected timestamp for +// the changefeed's targets to the current highwater mark. The record is +// cleared during changefeedResumer.OnFailOrCancel +func (cf *changeFrontier) manageProtectedTimestamps(ctx context.Context) error { + if cf.isSinkless() || cf.isTenant() { + return errors.AssertionFailedf("manageProtectedTimestamps unsupported on sinkless or tenant-based environments") } - return cf.maybeProtectTimestamp(ctx, progress, pts, txn, resolved) -} -// maybeReleaseProtectedTimestamp will release the current protected timestamp -// if either the resolved timestamp is close to the present or we've reached -// a new schemaChangeBoundary which will be protected. 
-func (cf *changeFrontier) maybeReleaseProtectedTimestamp( - ctx context.Context, - progress *jobspb.ChangefeedProgress, - pts protectedts.Storage, - txn *kv.Txn, - isBehind bool, -) error { - if progress.ProtectedTimestampRecord == uuid.Nil { - return nil + if !cf.flowCtx.Codec().ForSystemTenant() { + return errors.AssertionFailedf("manageProtectedTimestamps must run on the system tenant") } - if !cf.frontier.schemaChangeBoundaryReached() && isBehind { - log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind") - return nil - } - log.VEventf(ctx, 2, "releasing protected timestamp %v", - progress.ProtectedTimestampRecord) - if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil { - return err - } - progress.ProtectedTimestampRecord = uuid.Nil - return nil -} -// maybeProtectTimestamp creates a new protected timestamp when the -// changeFrontier reaches a scanBoundary and the schemaChangePolicy indicates -// that we should perform a backfill (see cf.shouldProtectBoundaries()). 
-func (cf *changeFrontier) maybeProtectTimestamp( - ctx context.Context, - progress *jobspb.ChangefeedProgress, - pts protectedts.Storage, - txn *kv.Txn, - resolved hlc.Timestamp, -) error { - if cf.isSinkless() || cf.isTenant() || !cf.frontier.schemaChangeBoundaryReached() || !cf.shouldProtectBoundaries() { - return nil - } + var timer timeutil.Timer + defer timer.Stop() + + for { + pts := cf.flowCtx.Cfg.ProtectedTimestampProvider + + // Create / advance the protected timestamp record to the highwater mark + cf.js.job.Update(ctx, nil, func( + txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, + ) error { + progress := md.Progress + changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed + + highWater := cf.frontier.Frontier() + if highWater.Less(cf.highWaterAtStart) { + highWater = cf.highWaterAtStart + } - jobID := cf.spec.JobID - targets := cf.spec.Feed.Targets - return createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), pts, txn, jobID, targets, resolved, progress) + var err error + recordID := changefeedProgress.ProtectedTimestampRecord + if recordID == uuid.Nil { + ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, changefeedProgress) + pts.Protect(ctx, txn, ptr) + } else { + log.VEventf(ctx, 2, "updating protected timestamp %v at %v", recordID, highWater) + err = pts.UpdateTimestamp(ctx, txn, recordID, highWater) + } + if err != nil { + return err + } + + ju.UpdateProgress(progress) + return nil + }) + + ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.flowCtx.Cfg.Settings.SV) + timer.Reset(ptsUpdateInterval) + select { + case <-timer.C: + timer.Read = true + case <-cf.flowCtx.Stopper().ShouldQuiesce(): + return nil + } + } } func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 06ed22d5da15..c2f1ba178778 100644 --- 
a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/featureflag" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" @@ -406,29 +405,24 @@ func changefeedPlanHook( } } - // The below block creates the job and if there's an initial scan, protects - // the data required for that scan. We protect the data here rather than in + // The below block creates the job and protects the data required for the + // changefeed to function from being garbage collected even if the + // changefeed lags behind the gcttl. We protect the data here rather than in // Resume to shorten the window that data may be GC'd. The protected - // timestamps are removed and created during the execution of the changefeed - // by the changeFrontier when checkpointing progress. Additionally protected - // timestamps are removed in OnFailOrCancel. See the comment on - // changeFrontier.manageProtectedTimestamps for more details on the handling of - // protected timestamps. + // timestamps are updated to the highwater mark periodically during the + // execution of the changefeed by the changeFrontier. Protected timestamps + // are removed in OnFailOrCancel. See + // changeFrontier.manageProtectedTimestamps for more details on the handling + // of protected timestamps. 
var sj *jobs.StartableJob jobID := p.ExecCfg().JobRegistry.MakeJobID() { - - var protectedTimestampID uuid.UUID var ptr *ptpb.Record - - shouldProtectTimestamp := initialScanFromOptions(details.Opts) && p.ExecCfg().Codec.ForSystemTenant() - if shouldProtectTimestamp { - protectedTimestampID = uuid.MakeV4() - deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, details.Targets) - targetToProtect := makeTargetToProtect(details.Targets) - progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID - ptr = jobsprotectedts.MakeRecord(protectedTimestampID, int64(jobID), statementTime, - deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect) + var protectedTimestampID uuid.UUID + codec := p.ExecCfg().Codec + if shouldProtectTimestamps(codec) { + ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed()) + protectedTimestampID = ptr.ID.GetUUID() } jr := jobs.Record{ @@ -855,38 +849,24 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp( var _ jobs.PauseRequester = (*changefeedResumer)(nil) // OnPauseRequest implements jobs.PauseRequester. If this changefeed is being -// paused, we want to install a protected timestamp at the most recent high -// watermark if there isn't already one. +// paused, we may want to clear the protected timestamp record. func (b *changefeedResumer) OnPauseRequest( ctx context.Context, jobExec interface{}, txn *kv.Txn, progress *jobspb.Progress, ) error { details := b.job.Details().(jobspb.ChangefeedDetails) - if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect { - return nil - } cp := progress.GetChangefeed() + execCfg := jobExec.(sql.JobExecContext).ExecCfg() - // If we already have a protected timestamp record, keep it where it is. 
- if cp.ProtectedTimestampRecord != uuid.Nil { - return nil - } - - resolved := progress.GetHighWater() - if resolved == nil { - // This should only happen if the job was created in a version that did not - // use protected timestamps but has yet to checkpoint its high water. - // Changefeeds from older versions didn't get protected timestamps so it's - // fine to not protect this one. In newer versions changefeeds which perform - // an initial scan at the statement time (and don't have an initial high - // water) will have a protected timestamp. - return nil + if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect { + // Release existing pts record to avoid a single changefeed left on pause + // resulting in storage issues + if cp.ProtectedTimestampRecord != uuid.Nil { + execCfg.ProtectedTimestampProvider.Release(ctx, txn, cp.ProtectedTimestampRecord) + cp.ProtectedTimestampRecord = uuid.Nil + } } - - execCfg := jobExec.(sql.JobExecContext).ExecCfg() - pts := execCfg.ProtectedTimestampProvider - return createProtectedTimestampRecord(ctx, execCfg.Codec, pts, txn, b.job.ID(), - details.Targets, *resolved, cp) + return nil } // getQualifiedTableName returns the database-qualified name of the table diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index eedd3b0a8f29..9e248315bcf5 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3663,6 +3663,75 @@ func TestChangefeedPauseUnpauseCursorAndInitialScan(t *testing.T) { t.Run(`pubsub`, pubsubTest(testFn)) } +func TestChangefeedUpdateProtectedTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + ptsInterval := 50 * time.Millisecond + changefeedbase.ProtectTimestampInterval.Override( + context.Background(), &f.Server().ClusterSettings().SV, ptsInterval) + + sqlDB := 
sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`) + defer closeFeed(t, foo) + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + f.Server().DB(), keys.SystemSQLCodec, "d", "foo") + tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + ptsProvider := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider + + var tableID int + sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+ + `WHERE name = 'foo' AND database_name = current_database()`). + Scan(&tableID) + + getTablePtsRecord := func() *ptpb.Record { + var r *ptpb.Record + require.NoError(t, ptsProvider.Refresh(context.Background(), f.Server().Clock().Now())) + ptsProvider.Iterate(context.Background(), tableSpan.Key, tableSpan.EndKey, func(record *ptpb.Record) (wantMore bool) { + r = record + return false + }) + + expectedKeys := map[string]struct{}{ + string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))): {}, + string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {}, + } + require.Equal(t, len(r.DeprecatedSpans), len(expectedKeys)) + for _, s := range r.DeprecatedSpans { + require.Contains(t, expectedKeys, string(s.Key)) + } + return r + } + + // Wait and return the next resolved timestamp after the wait time + waitAndDrainResolved := func(ts time.Duration) hlc.Timestamp { + targetTs := timeutil.Now().Add(ts) + for { + resolvedTs, _ := expectResolvedTimestamp(t, foo) + if resolvedTs.GoTime().UnixNano() > targetTs.UnixNano() { + return resolvedTs + } + } + } + + // Observe the protected timestamp advancing along with resolved timestamps + for i := 0; i < 5; i++ { + // Progress the changefeed and allow time for a pts record to be laid down + nextResolved := waitAndDrainResolved(100 * time.Millisecond) + time.Sleep(2 * ptsInterval) + rec := getTablePtsRecord() + require.LessOrEqual(t, nextResolved.GoTime().UnixNano(), 
rec.Timestamp.GoTime().UnixNano()) + } + } + + t.Run(`enterprise`, enterpriseTest(testFn, feedTestNoTenants)) + t.Run(`kafka`, kafkaTest(testFn, feedTestNoTenants)) + t.Run(`webhook`, webhookTest(testFn, feedTestNoTenants)) +} + func TestChangefeedProtectedTimestamps(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -3766,11 +3835,24 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { `WHERE name = 'foo' AND database_name = current_database()`). Scan(&tableID) + changefeedbase.ProtectTimestampInterval.Override( + context.Background(), &f.Server().ClusterSettings().SV, 100*time.Millisecond) + ptp := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider getPtsRec := mkGetPtsRec(t, ptp, f.Server().Clock()) waitForRecord := mkWaitForRecordCond(t, getPtsRec, mkCheckRecord(t, tableID)) waitForNoRecord := mkWaitForRecordCond(t, getPtsRec, checkNoRecord) waitForBlocked := requestBlockedScan() + waitForRecordAdvanced := func(ts hlc.Timestamp) { + check := func(ptr *ptpb.Record) error { + if ptr != nil && !ptr.Timestamp.LessEq(ts) { + return nil + } + return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, ptr.Timestamp) + } + + mkWaitForRecordCond(t, getPtsRec, check)() + } foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved`) defer closeFeed(t, foo) @@ -3787,8 +3869,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { `foo: [7]->{"after": {"a": 7, "b": "d"}}`, `foo: [8]->{"after": {"a": 8, "b": "e"}}`, }) - expectResolvedTimestamp(t, foo) - waitForNoRecord() + resolved, _ := expectResolvedTimestamp(t, foo) + waitForRecordAdvanced(resolved) } { @@ -3806,8 +3888,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { `foo: [7]->{"after": {"a": 7, "b": "d", "c": 1}}`, `foo: [8]->{"after": {"a": 8, "b": "e", "c": 1}}`, }) - expectResolvedTimestamp(t, foo) - waitForNoRecord() + resolved, _ := expectResolvedTimestamp(t, foo) + waitForRecordAdvanced(resolved) } { @@ 
-3878,7 +3960,7 @@ func TestChangefeedProtectedTimestampOnPause(t *testing.T) { r, err = pts.GetRecord(ctx, txn, details.ProtectedTimestampRecord) return err })) - require.Equal(t, r.Timestamp, *progress.GetHighWater()) + require.LessOrEqual(t, r.Timestamp, *progress.GetHighWater()) } else { require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord) } @@ -3888,14 +3970,19 @@ func TestChangefeedProtectedTimestampOnPause(t *testing.T) { // the changefeed has caught up. require.NoError(t, feedJob.Resume()) testutils.SucceedsSoon(t, func() error { - expectResolvedTimestamp(t, foo) + resolvedTs, _ := expectResolvedTimestamp(t, foo) j, err := jr.LoadJob(ctx, feedJob.JobID()) require.NoError(t, err) details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed - if details.ProtectedTimestampRecord != uuid.Nil { - return fmt.Errorf("expected no protected timestamp record") - } - return nil + + err = serverCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + r, err := pts.GetRecord(ctx, txn, details.ProtectedTimestampRecord) + if err != nil || r.Timestamp.Less(resolvedTs) { + return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs) + } + return nil + }) + return err }) } } diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 4651dac53413..aa26650201f9 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -167,3 +167,12 @@ var EventMemoryMultiplier = settings.RegisterFloatSetting( return nil }, ) + +// ProtectTimestampInterval controls the frequency of protected timestamp record updates +var ProtectTimestampInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.protect_timestamp_interval", + "controls how often the changefeed forwards its protected timestamp to the highwater mark", + 10*time.Minute, + settings.PositiveDuration, +)