diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 101d1ecff4e9..f001f60d0f21 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1243,13 +1242,7 @@ func (cf *changeFrontier) checkpointJobProgress( changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed changefeedProgress.Checkpoint = &checkpoint - timestampManager := cf.manageProtectedTimestamps - // TODO(samiskin): Remove this conditional and the associated deprecated - // methods once we're confident in ActiveProtectedTimestampsEnabled - if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { - timestampManager = cf.deprecatedManageProtectedTimestamps - } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { + if err := cf.manageProtectedTimestamps(cf.Ctx, txn, changefeedProgress); err != nil { log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) return err } @@ -1327,47 +1320,6 @@ func (cf *changeFrontier) manageProtectedTimestamps( return nil } -// deprecatedManageProtectedTimestamps only sets a protected timestamp when the -// changefeed is in a backfill or the highwater is lagging behind to a -// sufficient degree after a backfill. This was deprecated in favor of always -// maintaining a timestamp record to avoid issues with a low gcttl setting. -func (cf *changeFrontier) deprecatedManageProtectedTimestamps( - ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress, -) error { - pts := cf.flowCtx.Cfg.ProtectedTimestampProvider - if err := cf.deprecatedMaybeReleaseProtectedTimestamp(ctx, progress, pts, txn); err != nil { - return err - } - - schemaChangePolicy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) - shouldProtectBoundaries := schemaChangePolicy == changefeedbase.OptSchemaChangePolicyBackfill - if cf.frontier.schemaChangeBoundaryReached() && shouldProtectBoundaries { - highWater := cf.frontier.Frontier() - ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress) - return pts.Protect(ctx, txn, ptr) - } - return nil -} - -func (cf *changeFrontier) deprecatedMaybeReleaseProtectedTimestamp( - ctx context.Context, progress *jobspb.ChangefeedProgress, pts protectedts.Storage, txn *kv.Txn, -) error { - if progress.ProtectedTimestampRecord == uuid.Nil { - return nil - } - if !cf.frontier.schemaChangeBoundaryReached() && cf.isBehind() { - log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind") - return nil - } - log.VEventf(ctx, 2, "releasing protected timestamp %v", - progress.ProtectedTimestampRecord) - if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil { - return err - } - progress.ProtectedTimestampRecord = uuid.Nil - return nil -} - func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { if cf.freqEmitResolved == emitNoResolved || newResolved.IsEmpty() { return nil diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index dc796396a190..dcd069a332ef 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -261,16 +261,14 @@ func changefeedPlanHook( { var ptr *ptpb.Record codec := p.ExecCfg().Codec - - activeTimestampProtection := changefeedbase.ActiveProtectedTimestampsEnabled.Get(&p.ExecCfg().Settings.SV) - initialScanType, err := opts.GetInitialScanType() - if err != nil { - return err - } - shouldProtectTimestamp := activeTimestampProtection || (initialScanType != changefeedbase.NoInitialScan) - if shouldProtectTimestamp { - ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed()) - } + ptr = createProtectedTimestampRecord( + ctx, + codec, + jobID, + AllTargets(details), + details.StatementTime, + progress.GetChangefeed(), + ) jr.Progress = *progress.GetChangefeed() diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 22841234c7e1..4c09ad9a6d38 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -211,15 +211,6 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting( settings.PositiveDuration, ) -// ActiveProtectedTimestampsEnabled enables always having protected timestamps -// laid down that are periodically advanced to the highwater mark. -var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting( - settings.TenantWritable, - "changefeed.active_protected_timestamps.enabled", - "if set, rather than only protecting changefeed targets from garbage collection during backfills, data will always be protected up to the changefeed's frontier", - true, -) - // BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors var BatchReductionRetryEnabled = settings.RegisterBoolSetting( settings.TenantWritable, diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index e50664666bdc..f75659d42535 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -150,9 +150,10 @@ var retiredSettings = map[string]struct{}{ "sql.ttl.default_range_concurrency": {}, // removed as of 23.1. - "sql.catalog.descs.validate_on_write.enabled": {}, - "sql.distsql.max_running_flows": {}, - "sql.distsql.flow_scheduler_queueing.enabled": {}, + "sql.catalog.descs.validate_on_write.enabled": {}, + "sql.distsql.max_running_flows": {}, + "sql.distsql.flow_scheduler_queueing.enabled": {}, + "changefeed.active_protected_timestamps.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults