Skip to content

Commit

Permalink
changefeedccl: periodic pts record updates
Browse files Browse the repository at this point in the history
Previously, changefeeds only laid down protected timestamp records to
protect against either an ongoing backfill or the changefeed lagging
behind. This is insufficient when the gcttl is very short or when
recurring errors keep the changefeed retrying for too long, and it does
not support upcoming work to let serverless shut down idle changefeeds.

This PR removes the manual PTS protection on backfills and begins an
async routine on the changeFrontier that updates the protected timestamp
record to the current highwater mark.

Fixes #76247

Release note (enterprise change): changefeeds running on tables with a
low gcttl will function more reliably due to protected timestamps being
maintained for the changefeed targets at the resolved timestamp of the
changefeed.  The frequency at which the protected timestamp is updated
to the resolved timestamp can be configured through the
`changefeed.protect_timestamp_interval` cluster setting. If the
changefeed lags too far behind such that storage of old data becomes an
issue, cancelling the changefeed will release the protected timestamps
and allow garbage collection to resume. If
`protect_data_from_gc_on_pause` is unset, pausing the changefeed will
release the existing protected timestamp record.
  • Loading branch information
samiskin committed Feb 17, 2022
1 parent a3cfd63 commit 54203d6
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 168 deletions.
23 changes: 9 additions & 14 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

const (
Expand All @@ -46,32 +43,30 @@ func emitResolvedTimestamp(
return nil
}

// shouldProtectTimestamps reports whether protected timestamp records should
// be managed for a changefeed using the given codec. At present this is true
// only when the codec belongs to the system tenant.
func shouldProtectTimestamps(codec keys.SQLCodec) bool {
	// TODO(smiskin): Remove this restriction once tenant based pts are enabled
	isSystemTenant := codec.ForSystemTenant()
	return isSystemTenant
}

// createProtectedTimestampRecord will create a record to protect the spans for
// this changefeed at the resolved timestamp. The progress struct will be
// updated to refer to this new protected timestamp record.
func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
pts protectedts.Storage,
txn *kv.Txn,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) error {
if !codec.ForSystemTenant() {
return errors.AssertionFailedf("createProtectedTimestampRecord called on tenant-based changefeed")
}

) *ptpb.Record {
progress.ProtectedTimestampRecord = uuid.MakeV4()
log.VEventf(ctx, 2, "creating protected timestamp %v at %v",
progress.ProtectedTimestampRecord, resolved)
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
targetToProtect := makeTargetToProtect(targets)
rec := jobsprotectedts.MakeRecord(

log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved)
return jobsprotectedts.MakeRecord(
progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect,
jobsprotectedts.Jobs, targetToProtect)
return pts.Protect(ctx, txn, rec)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
Expand Down
139 changes: 40 additions & 99 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -913,6 +912,10 @@ type changeFrontier struct {
// slowLogEveryN rate-limits the logging of slow spans
slowLogEveryN log.EveryN

// lastProtectedTimestampUpdate is the last time the protected timestamp
// record was updated to the frontier's highwater mark
lastProtectedTimestampUpdate time.Time

// js, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
js *jobState
Expand Down Expand Up @@ -1217,13 +1220,6 @@ func (cf *changeFrontier) closeMetrics() {
cf.metrics.mu.Unlock()
}

// shouldProtectBoundaries reports whether the schema change policy option in
// the job's spec is set to backfill. It is used to decide whether protected
// timestamps should be installed when a scan boundary is encountered.
func (cf *changeFrontier) shouldProtectBoundaries() bool {
	rawPolicy := cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]
	return changefeedbase.SchemaChangePolicy(rawPolicy) == changefeedbase.OptSchemaChangePolicyBackfill
}

// Next is part of the RowSource interface.
func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
for cf.State == execinfra.StateRunning {
Expand Down Expand Up @@ -1315,7 +1311,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
return err
}

isBehind := cf.maybeLogBehindSpan(frontierChanged)
cf.maybeLogBehindSpan(frontierChanged)

// If frontier changed, we emit resolved timestamp.
emitResolved := frontierChanged
Expand All @@ -1325,7 +1321,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// have no distributed state whatsoever. Because of this they also do not
// use protected timestamps.
if cf.js != nil {
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged, isBehind)
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}
Expand All @@ -1352,7 +1348,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
}

func (cf *changeFrontier) maybeCheckpointJob(
resolvedSpan jobspb.ResolvedSpan, frontierChanged, isBehind bool,
resolvedSpan jobspb.ResolvedSpan, frontierChanged bool,
) (bool, error) {
// When in a Backfill, the frontier remains unchanged at the backfill boundary
// as we receive spans from the scan request at the Backfill Timestamp
Expand All @@ -1374,11 +1370,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
!inBackfill && (cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged))

if updateCheckpoint || updateHighWater {
manageProtected := updateHighWater
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(
cf.frontier.Frontier(), manageProtected, checkpoint, isBehind,
); err != nil {
if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
Expand All @@ -1388,16 +1381,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
return false, nil
}

// checkpointJobProgress checkpoints a changefeed-level job information.
// In addition, if 'manageProtected' is true, which only happens when frontier advanced,
// this method manages the protected timestamp state.
// The isBehind argument is used to determine whether an existing protected timestamp
// should be released.
func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp,
manageProtected bool,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
isBehind bool,
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (err error) {
updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency
if updateRunStatus {
Expand All @@ -1418,16 +1403,15 @@ func (cf *changeFrontier) checkpointJobProgress(
HighWater: &frontier,
}

// Manage protected timestamps.
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
if manageProtected {
if err := cf.manageProtectedTimestamps(cf.Ctx, changefeedProgress, txn, frontier, isBehind); err != nil {
return err
changefeedProgress.Checkpoint = &checkpoint

if shouldProtectTimestamps(cf.flowCtx.Codec()) {
if err := cf.manageProtectedTimestamps(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
}
}

changefeedProgress.Checkpoint = &checkpoint

if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}
Expand All @@ -1446,77 +1430,40 @@ func (cf *changeFrontier) checkpointJobProgress(
})
}

// manageProtectedTimestamps is called when the resolved timestamp is being
// checkpointed. The changeFrontier always checkpoints resolved timestamps
// which occur at scan boundaries. It releases previously protected timestamps
// if the changefeed is not behind. See maybeLogBehindSpan for details on the
// behind calculation.
//
// Note that this function is never called for sinkless changefeeds as they have
// no corresponding job and thus no corresponding distributed state on which to
// attach protected timestamp information.
//
// TODO(ajwerner): Adopt protected timestamps for sinkless changefeeds,
// perhaps by using whatever mechanism is eventually built to protect
// data for long-running SQL transactions. There's some discussion of this
// use case in the protected timestamps RFC.
// manageProtectedTimestamps periodically advances the protected timestamp for
// the changefeed's targets to the current highwater mark. The record is
// cleared during changefeedResumer.OnFailOrCancel.
func (cf *changeFrontier) manageProtectedTimestamps(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
txn *kv.Txn,
resolved hlc.Timestamp,
isBehind bool,
ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
pts := cf.flowCtx.Cfg.ProtectedTimestampProvider
if err := cf.maybeReleaseProtectedTimestamp(ctx, progress, pts, txn, isBehind); err != nil {
return err
}
return cf.maybeProtectTimestamp(ctx, progress, pts, txn, resolved)
}

// maybeReleaseProtectedTimestamp will release the current protected timestamp
// if either the resolved timestamp is close to the present or we've reached
// a new schemaChangeBoundary which will be protected.
func (cf *changeFrontier) maybeReleaseProtectedTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
isBehind bool,
) error {
if progress.ProtectedTimestampRecord == uuid.Nil {
return nil
}
if !cf.frontier.schemaChangeBoundaryReached() && isBehind {
log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind")
ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
if timeutil.Since(cf.lastProtectedTimestampUpdate) < ptsUpdateInterval {
return nil
}
log.VEventf(ctx, 2, "releasing protected timestamp %v",
progress.ProtectedTimestampRecord)
if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil {
return err
cf.lastProtectedTimestampUpdate = timeutil.Now()

pts := cf.flowCtx.Cfg.ProtectedTimestampProvider

// Create / advance the protected timestamp record to the highwater mark
highWater := cf.frontier.Frontier()
if highWater.Less(cf.highWaterAtStart) {
highWater = cf.highWaterAtStart
}
progress.ProtectedTimestampRecord = uuid.Nil
return nil
}

// maybeProtectTimestamp creates a new protected timestamp when the
// changeFrontier reaches a scanBoundary and the schemaChangePolicy indicates
// that we should perform a backfill (see cf.shouldProtectBoundaries()).
func (cf *changeFrontier) maybeProtectTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
resolved hlc.Timestamp,
) error {
if cf.isSinkless() || cf.isTenant() || !cf.frontier.schemaChangeBoundaryReached() || !cf.shouldProtectBoundaries() {
return nil
recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
} else {
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", recordID, highWater)
if err := pts.UpdateTimestamp(ctx, txn, recordID, highWater); err != nil {
return err
}
}

jobID := cf.spec.JobID
targets := cf.spec.Feed.Targets
return createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), pts, txn, jobID, targets, resolved, progress)
return nil
}

func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
Expand Down Expand Up @@ -1598,12 +1545,6 @@ func (cf *changeFrontier) isSinkless() bool {
return cf.spec.JobID == 0
}

// isTenant reports whether this changeFrontier is running on a tenant, i.e.
// whether its flow context's codec is not the system tenant's codec.
func (cf *changeFrontier) isTenant() bool {
	codec := cf.flowCtx.Codec()
	return !codec.ForSystemTenant()
}

// type to make embedding span.Frontier in schemaChangeFrontier convenient.
type spanFrontier struct {
*span.Frontier
Expand Down
69 changes: 26 additions & 43 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/featureflag"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
Expand Down Expand Up @@ -406,29 +405,24 @@ func changefeedPlanHook(
}
}

// The below block creates the job and if there's an initial scan, protects
// the data required for that scan. We protect the data here rather than in
// The below block creates the job and protects the data required for the
// changefeed to function from being garbage collected even if the
// changefeed lags behind the gcttl. We protect the data here rather than in
// Resume to shorten the window that data may be GC'd. The protected
// timestamps are removed and created during the execution of the changefeed
// by the changeFrontier when checkpointing progress. Additionally protected
// timestamps are removed in OnFailOrCancel. See the comment on
// changeFrontier.manageProtectedTimestamps for more details on the handling of
// protected timestamps.
// timestamps are updated to the highwater mark periodically during the
// execution of the changefeed by the changeFrontier. Protected timestamps
// are removed in OnFailOrCancel. See
// changeFrontier.manageProtectedTimestamps for more details on the handling
// of protected timestamps.
var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
{

var protectedTimestampID uuid.UUID
var ptr *ptpb.Record

shouldProtectTimestamp := initialScanFromOptions(details.Opts) && p.ExecCfg().Codec.ForSystemTenant()
if shouldProtectTimestamp {
protectedTimestampID = uuid.MakeV4()
deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
targetToProtect := makeTargetToProtect(details.Targets)
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, int64(jobID), statementTime,
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
var protectedTimestampID uuid.UUID
codec := p.ExecCfg().Codec
if shouldProtectTimestamps(codec) {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed())
protectedTimestampID = ptr.ID.GetUUID()
}

jr := jobs.Record{
Expand Down Expand Up @@ -855,38 +849,27 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(
var _ jobs.PauseRequester = (*changefeedResumer)(nil)

// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
// paused, we want to install a protected timestamp at the most recent high
// watermark if there isn't already one.
// paused, we may want to clear the protected timestamp record.
func (b *changefeedResumer) OnPauseRequest(
ctx context.Context, jobExec interface{}, txn *kv.Txn, progress *jobspb.Progress,
) error {
details := b.job.Details().(jobspb.ChangefeedDetails)
if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
return nil
}

cp := progress.GetChangefeed()
execCfg := jobExec.(sql.JobExecContext).ExecCfg()

// If we already have a protected timestamp record, keep it where it is.
if cp.ProtectedTimestampRecord != uuid.Nil {
return nil
}

resolved := progress.GetHighWater()
if resolved == nil {
// This should only happen if the job was created in a version that did not
// use protected timestamps but has yet to checkpoint its high water.
// Changefeeds from older versions didn't get protected timestamps so it's
// fine to not protect this one. In newer versions changefeeds which perform
// an initial scan at the statement time (and don't have an initial high
// water) will have a protected timestamp.
return nil
if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
// Release the existing pts record so that a single changefeed left paused
// does not result in storage issues.
if cp.ProtectedTimestampRecord != uuid.Nil {
if err := execCfg.ProtectedTimestampProvider.Release(ctx, txn, cp.ProtectedTimestampRecord); err != nil {
log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err)
} else {
cp.ProtectedTimestampRecord = uuid.Nil
}
}
}

execCfg := jobExec.(sql.JobExecContext).ExecCfg()
pts := execCfg.ProtectedTimestampProvider
return createProtectedTimestampRecord(ctx, execCfg.Codec, pts, txn, b.job.ID(),
details.Targets, *resolved, cp)
return nil
}

// getQualifiedTableName returns the database-qualified name of the table
Expand Down
Loading

0 comments on commit 54203d6

Please sign in to comment.