Skip to content

Commit

Permalink
changefeedccl: cleanup deprecated pts handling
Browse files Browse the repository at this point in the history
Going forward we no longer want customers to be able to turn off
active protected timestamps, as we always want to remain resilient to
low ttl tables.

Release note (enterprise change):
The changefeed.active_protected_timestamps.enabled cluster setting has
been removed and is always treated as if it was true.
  • Loading branch information
samiskin committed Dec 14, 2022
1 parent 7e0ecb6 commit dce1723
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 72 deletions.
50 changes: 1 addition & 49 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1246,13 +1245,7 @@ func (cf *changeFrontier) checkpointJobProgress(
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &checkpoint

timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil {
if err := cf.manageProtectedTimestamps(cf.Ctx(), txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err)
return err
}
Expand Down Expand Up @@ -1331,47 +1324,6 @@ func (cf *changeFrontier) manageProtectedTimestamps(
return nil
}

// deprecatedManageProtectedTimestamps only sets a protected timestamp when the
// changefeed is in a backfill or the highwater is lagging behind to a
// sufficient degree after a backfill. This was deprecated in favor of always
// maintaining a timestamp record to avoid issues with a low gcttl setting.
func (cf *changeFrontier) deprecatedManageProtectedTimestamps(
ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
pts := cf.flowCtx.Cfg.ProtectedTimestampProvider
if err := cf.deprecatedMaybeReleaseProtectedTimestamp(ctx, progress, pts, txn); err != nil {
return err
}

schemaChangePolicy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
shouldProtectBoundaries := schemaChangePolicy == changefeedbase.OptSchemaChangePolicyBackfill
if cf.frontier.schemaChangeBoundaryReached() && shouldProtectBoundaries {
highWater := cf.frontier.Frontier()
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
return pts.Protect(ctx, txn, ptr)
}
return nil
}

func (cf *changeFrontier) deprecatedMaybeReleaseProtectedTimestamp(
ctx context.Context, progress *jobspb.ChangefeedProgress, pts protectedts.Storage, txn *kv.Txn,
) error {
if progress.ProtectedTimestampRecord == uuid.Nil {
return nil
}
if !cf.frontier.schemaChangeBoundaryReached() && cf.isBehind() {
log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind")
return nil
}
log.VEventf(ctx, 2, "releasing protected timestamp %v",
progress.ProtectedTimestampRecord)
if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil {
return err
}
progress.ProtectedTimestampRecord = uuid.Nil
return nil
}

func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
if cf.freqEmitResolved == emitNoResolved || newResolved.IsEmpty() {
return nil
Expand Down
18 changes: 8 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,14 @@ func changefeedPlanHook(
{
var ptr *ptpb.Record
codec := p.ExecCfg().Codec

activeTimestampProtection := changefeedbase.ActiveProtectedTimestampsEnabled.Get(&p.ExecCfg().Settings.SV)
initialScanType, err := opts.GetInitialScanType()
if err != nil {
return err
}
shouldProtectTimestamp := activeTimestampProtection || (initialScanType != changefeedbase.NoInitialScan)
if shouldProtectTimestamp {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed())
}
ptr = createProtectedTimestampRecord(
ctx,
codec,
jobID,
AllTargets(details),
details.StatementTime,
progress.GetChangefeed(),
)

jr.Progress = *progress.GetChangefeed()

Expand Down
9 changes: 0 additions & 9 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,6 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting(
settings.PositiveDuration,
)

// ActiveProtectedTimestampsEnabled enables always having protected timestamps
// laid down that are periodically advanced to the highwater mark.
var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.active_protected_timestamps.enabled",
"if set, rather than only protecting changefeed targets from garbage collection during backfills, data will always be protected up to the changefeed's frontier",
true,
)

// BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
Expand Down
9 changes: 5 additions & 4 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,11 @@ var retiredSettings = map[string]struct{}{
"server.web_session.auto_logout.timeout": {},

// removed as of 23.1.
"sql.catalog.descs.validate_on_write.enabled": {},
"sql.distsql.max_running_flows": {},
"sql.distsql.flow_scheduler_queueing.enabled": {},
"sql.distsql.drain.cancel_after_wait.enabled": {},
"sql.catalog.descs.validate_on_write.enabled": {},
"sql.distsql.max_running_flows": {},
"sql.distsql.flow_scheduler_queueing.enabled": {},
"sql.distsql.drain.cancel_after_wait.enabled": {},
"changefeed.active_protected_timestamps.enabled": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
Expand Down

0 comments on commit dce1723

Please sign in to comment.