Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: periodic pts record updates #76605

Merged
merged 1 commit into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

const (
Expand All @@ -46,32 +43,30 @@ func emitResolvedTimestamp(
return nil
}

func shouldProtectTimestamps(codec keys.SQLCodec) bool {
// TODO(smiskin): Remove this restriction once tenant based pts are enabled
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we TODO this @samiskin @adityamaru, given #73727 (comment) ?

return codec.ForSystemTenant()
}

// createProtectedTimestampRecord will create a record to protect the spans for
// this changefeed at the resolved timestamp. The progress struct will be
// updated to refer to this new protected timestamp record.
func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
pts protectedts.Storage,
txn *kv.Txn,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) error {
if !codec.ForSystemTenant() {
return errors.AssertionFailedf("createProtectedTimestampRecord called on tenant-based changefeed")
}

) *ptpb.Record {
progress.ProtectedTimestampRecord = uuid.MakeV4()
log.VEventf(ctx, 2, "creating protected timestamp %v at %v",
progress.ProtectedTimestampRecord, resolved)
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
targetToProtect := makeTargetToProtect(targets)
rec := jobsprotectedts.MakeRecord(

log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved)
return jobsprotectedts.MakeRecord(
progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect,
jobsprotectedts.Jobs, targetToProtect)
return pts.Protect(ctx, txn, rec)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
Expand Down
139 changes: 40 additions & 99 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -913,6 +912,10 @@ type changeFrontier struct {
// slowLogEveryN rate-limits the logging of slow spans
slowLogEveryN log.EveryN

// lastProtectedTimestampUpdate is the last time the protected timestamp
// record was updated to the frontier's highwater mark
lastProtectedTimestampUpdate time.Time

// js, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
js *jobState
Expand Down Expand Up @@ -1217,13 +1220,6 @@ func (cf *changeFrontier) closeMetrics() {
cf.metrics.mu.Unlock()
}

// shouldProtectBoundaries checks the job's spec to determine whether it should
// install protected timestamps when encountering scan boundaries.
func (cf *changeFrontier) shouldProtectBoundaries() bool {
policy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
return policy == changefeedbase.OptSchemaChangePolicyBackfill
}

// Next is part of the RowSource interface.
func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
for cf.State == execinfra.StateRunning {
Expand Down Expand Up @@ -1315,7 +1311,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
return err
}

isBehind := cf.maybeLogBehindSpan(frontierChanged)
cf.maybeLogBehindSpan(frontierChanged)

// If frontier changed, we emit resolved timestamp.
emitResolved := frontierChanged
Expand All @@ -1325,7 +1321,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// have no distributed state whatsoever. Because of this they also do not
// use protected timestamps.
if cf.js != nil {
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged, isBehind)
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}
Expand All @@ -1352,7 +1348,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
}

func (cf *changeFrontier) maybeCheckpointJob(
resolvedSpan jobspb.ResolvedSpan, frontierChanged, isBehind bool,
resolvedSpan jobspb.ResolvedSpan, frontierChanged bool,
) (bool, error) {
// When in a Backfill, the frontier remains unchanged at the backfill boundary
// as we receive spans from the scan request at the Backfill Timestamp
Expand All @@ -1374,11 +1370,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
!inBackfill && (cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged))

if updateCheckpoint || updateHighWater {
manageProtected := updateHighWater
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(
cf.frontier.Frontier(), manageProtected, checkpoint, isBehind,
); err != nil {
if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
Expand All @@ -1388,16 +1381,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
return false, nil
}

// checkpointJobProgress checkpoints a changefeed-level job information.
// In addition, if 'manageProtected' is true, which only happens when frontier advanced,
// this method manages the protected timestamp state.
// The isBehind argument is used to determine whether an existing protected timestamp
// should be released.
func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp,
manageProtected bool,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
isBehind bool,
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (err error) {
updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency
if updateRunStatus {
Expand All @@ -1418,16 +1403,15 @@ func (cf *changeFrontier) checkpointJobProgress(
HighWater: &frontier,
}

// Manage protected timestamps.
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
if manageProtected {
if err := cf.manageProtectedTimestamps(cf.Ctx, changefeedProgress, txn, frontier, isBehind); err != nil {
return err
changefeedProgress.Checkpoint = &checkpoint

if shouldProtectTimestamps(cf.flowCtx.Codec()) {
if err := cf.manageProtectedTimestamps(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
}
}

changefeedProgress.Checkpoint = &checkpoint

if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}
Expand All @@ -1446,77 +1430,40 @@ func (cf *changeFrontier) checkpointJobProgress(
})
}

// manageProtectedTimestamps is called when the resolved timestamp is being
// checkpointed. The changeFrontier always checkpoints resolved timestamps
// which occur at scan boundaries. It releases previously protected timestamps
// if the changefeed is not behind. See maybeLogBehindSpan for details on the
// behind calculation.
//
// Note that this function is never called for sinkless changefeeds as they have
// no corresponding job and thus no corresponding distributed state on which to
// attach protected timestamp information.
//
// TODO(ajwerner): Adopt protected timestamps for sinkless changefeeds,
// perhaps by using whatever mechanism is eventually built to protect
// data for long-running SQL transactions. There's some discussion of this
// use case in the protected timestamps RFC.
// manageProtectedTimestamps periodically advances the protected timestamp for
// the changefeed's targets to the current highwater mark. The record is
// cleared during changefeedResumer.OnFailOrCancel
func (cf *changeFrontier) manageProtectedTimestamps(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
txn *kv.Txn,
resolved hlc.Timestamp,
isBehind bool,
ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
pts := cf.flowCtx.Cfg.ProtectedTimestampProvider
if err := cf.maybeReleaseProtectedTimestamp(ctx, progress, pts, txn, isBehind); err != nil {
return err
}
return cf.maybeProtectTimestamp(ctx, progress, pts, txn, resolved)
}

// maybeReleaseProtectedTimestamp will release the current protected timestamp
// if either the resolved timestamp is close to the present or we've reached
// a new schemaChangeBoundary which will be protected.
func (cf *changeFrontier) maybeReleaseProtectedTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
isBehind bool,
) error {
if progress.ProtectedTimestampRecord == uuid.Nil {
return nil
}
if !cf.frontier.schemaChangeBoundaryReached() && isBehind {
log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind")
ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
if timeutil.Since(cf.lastProtectedTimestampUpdate) < ptsUpdateInterval {
return nil
}
log.VEventf(ctx, 2, "releasing protected timestamp %v",
progress.ProtectedTimestampRecord)
if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil {
return err
cf.lastProtectedTimestampUpdate = timeutil.Now()

pts := cf.flowCtx.Cfg.ProtectedTimestampProvider

// Create / advance the protected timestamp record to the highwater mark
highWater := cf.frontier.Frontier()
if highWater.Less(cf.highWaterAtStart) {
highWater = cf.highWaterAtStart
}
progress.ProtectedTimestampRecord = uuid.Nil
return nil
}

// maybeProtectTimestamp creates a new protected timestamp when the
// changeFrontier reaches a scanBoundary and the schemaChangePolicy indicates
// that we should perform a backfill (see cf.shouldProtectBoundaries()).
func (cf *changeFrontier) maybeProtectTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
resolved hlc.Timestamp,
) error {
if cf.isSinkless() || cf.isTenant() || !cf.frontier.schemaChangeBoundaryReached() || !cf.shouldProtectBoundaries() {
return nil
recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
} else {
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", recordID, highWater)
if err := pts.UpdateTimestamp(ctx, txn, recordID, highWater); err != nil {
return err
}
}

jobID := cf.spec.JobID
targets := cf.spec.Feed.Targets
return createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), pts, txn, jobID, targets, resolved, progress)
return nil
}

func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
Expand Down Expand Up @@ -1598,12 +1545,6 @@ func (cf *changeFrontier) isSinkless() bool {
return cf.spec.JobID == 0
}

// isTenant() bool returns true if this changeFrontier is running on a
// tenant.
func (cf *changeFrontier) isTenant() bool {
return !cf.flowCtx.Codec().ForSystemTenant()
}

// type to make embedding span.Frontier in schemaChangeFrontier convenient.
type spanFrontier struct {
*span.Frontier
Expand Down
69 changes: 26 additions & 43 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/featureflag"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
Expand Down Expand Up @@ -406,29 +405,24 @@ func changefeedPlanHook(
}
}

// The below block creates the job and if there's an initial scan, protects
// the data required for that scan. We protect the data here rather than in
// The below block creates the job and protects the data required for the
// changefeed to function from being garbage collected even if the
// changefeed lags behind the gcttl. We protect the data here rather than in
// Resume to shorten the window that data may be GC'd. The protected
// timestamps are removed and created during the execution of the changefeed
// by the changeFrontier when checkpointing progress. Additionally protected
// timestamps are removed in OnFailOrCancel. See the comment on
// changeFrontier.manageProtectedTimestamps for more details on the handling of
// protected timestamps.
// timestamps are updated to the highwater mark periodically during the
// execution of the changefeed by the changeFrontier. Protected timestamps
// are removed in OnFailOrCancel. See
// changeFrontier.manageProtectedTimestamps for more details on the handling
// of protected timestamps.
var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
{

var protectedTimestampID uuid.UUID
var ptr *ptpb.Record

shouldProtectTimestamp := initialScanFromOptions(details.Opts) && p.ExecCfg().Codec.ForSystemTenant()
if shouldProtectTimestamp {
protectedTimestampID = uuid.MakeV4()
deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
targetToProtect := makeTargetToProtect(details.Targets)
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, int64(jobID), statementTime,
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
var protectedTimestampID uuid.UUID
codec := p.ExecCfg().Codec
if shouldProtectTimestamps(codec) {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed())
protectedTimestampID = ptr.ID.GetUUID()
}

jr := jobs.Record{
Expand Down Expand Up @@ -855,38 +849,27 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(
var _ jobs.PauseRequester = (*changefeedResumer)(nil)

// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
// paused, we want to install a protected timestamp at the most recent high
// watermark if there isn't already one.
// paused, we may want to clear the protected timestamp record.
func (b *changefeedResumer) OnPauseRequest(
ctx context.Context, jobExec interface{}, txn *kv.Txn, progress *jobspb.Progress,
) error {
details := b.job.Details().(jobspb.ChangefeedDetails)
if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
return nil
}

cp := progress.GetChangefeed()
execCfg := jobExec.(sql.JobExecContext).ExecCfg()

// If we already have a protected timestamp record, keep it where it is.
if cp.ProtectedTimestampRecord != uuid.Nil {
return nil
}

resolved := progress.GetHighWater()
if resolved == nil {
// This should only happen if the job was created in a version that did not
// use protected timestamps but has yet to checkpoint its high water.
// Changefeeds from older versions didn't get protected timestamps so it's
// fine to not protect this one. In newer versions changefeeds which perform
// an initial scan at the statement time (and don't have an initial high
// water) will have a protected timestamp.
return nil
if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
// Release existing pts record to avoid a single changefeed left on pause
// resulting in storage issues
if cp.ProtectedTimestampRecord != uuid.Nil {
if err := execCfg.ProtectedTimestampProvider.Release(ctx, txn, cp.ProtectedTimestampRecord); err != nil {
log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err)
} else {
cp.ProtectedTimestampRecord = uuid.Nil
}
}
}

execCfg := jobExec.(sql.JobExecContext).ExecCfg()
pts := execCfg.ProtectedTimestampProvider
return createProtectedTimestampRecord(ctx, execCfg.Codec, pts, txn, b.job.ID(),
details.Targets, *resolved, cp)
return nil
}

// getQualifiedTableName returns the database-qualified name of the table
Expand Down
Loading