Skip to content

Commit

Permalink
Merge pull request #77589 from samiskin/backport-changefeed-active-pr…
Browse files Browse the repository at this point in the history
…otected-ts

release-21.2: changefeedccl: resurrect manual protected timestamps
  • Loading branch information
samiskin authored Mar 11, 2022
2 parents 3994491 + b5d95d8 commit 238dba4
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 151 deletions.
28 changes: 12 additions & 16 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

const (
Expand All @@ -44,30 +42,28 @@ func emitResolvedTimestamp(
return nil
}

// shouldProtectTimestamps reports whether protected timestamp records should
// be managed for a changefeed using the given codec. At present this is only
// true for the system tenant.
func shouldProtectTimestamps(codec keys.SQLCodec) bool {
	// TODO(samiskin): Remove this restriction once tenant based pts are enabled
	isSystemTenant := codec.ForSystemTenant()
	return isSystemTenant
}

// createProtectedTimestampRecord will create a record to protect the spans for
// this changefeed at the resolved timestamp. The progress struct will be
// updated to refer to this new protected timestamp record.
func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
pts protectedts.Storage,
txn *kv.Txn,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) error {
if !codec.ForSystemTenant() {
return errors.AssertionFailedf("createProtectedTimestampRecord called on tenant-based changefeed")
}

) *ptpb.Record {
progress.ProtectedTimestampRecord = uuid.MakeV4()
log.VEventf(ctx, 2, "creating protected timestamp %v at %v",
progress.ProtectedTimestampRecord, resolved)
spansToProtect := makeSpansToProtect(codec, targets)
rec := jobsprotectedts.MakeRecord(
progress.ProtectedTimestampRecord, int64(jobID), resolved, spansToProtect, jobsprotectedts.Jobs)
return pts.Protect(ctx, txn, rec)
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)

log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved)
return jobsprotectedts.MakeRecord(
progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect, jobsprotectedts.Jobs)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
Expand Down
191 changes: 95 additions & 96 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,10 @@ type changeFrontier struct {
// slowLogEveryN rate-limits the logging of slow spans
slowLogEveryN log.EveryN

// lastProtectedTimestampUpdate is the last time the protected timestamp
// record was updated to the frontier's highwater mark
lastProtectedTimestampUpdate time.Time

// js, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
js *jobState
Expand Down Expand Up @@ -1218,13 +1222,6 @@ func (cf *changeFrontier) closeMetrics() {
cf.metrics.mu.Unlock()
}

// shouldProtectBoundaries reports whether the job's spec selects the backfill
// schema change policy, in which case protected timestamps should be installed
// when scan boundaries are encountered.
func (cf *changeFrontier) shouldProtectBoundaries() bool {
	configured := cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]
	return changefeedbase.SchemaChangePolicy(configured) == changefeedbase.OptSchemaChangePolicyBackfill
}

// Next is part of the RowSource interface.
func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
for cf.State == execinfra.StateRunning {
Expand Down Expand Up @@ -1316,7 +1313,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
return err
}

isBehind := cf.maybeLogBehindSpan(frontierChanged)
cf.maybeLogBehindSpan(frontierChanged)

// If frontier changed, we emit resolved timestamp.
emitResolved := frontierChanged
Expand All @@ -1326,7 +1323,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// have no distributed state whatsoever. Because of this they also do not
// use protected timestamps.
if cf.js != nil {
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged, isBehind)
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}
Expand All @@ -1353,7 +1350,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
}

func (cf *changeFrontier) maybeCheckpointJob(
resolvedSpan jobspb.ResolvedSpan, frontierChanged, isBehind bool,
resolvedSpan jobspb.ResolvedSpan, frontierChanged bool,
) (bool, error) {
// When in a Backfill, the frontier remains unchanged at the backfill boundary
// as we receive spans from the scan request at the Backfill Timestamp
Expand All @@ -1375,11 +1372,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
!inBackfill && (cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged))

if updateCheckpoint || updateHighWater {
manageProtected := updateHighWater
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(
cf.frontier.Frontier(), manageProtected, checkpoint, isBehind,
); err != nil {
if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
Expand All @@ -1389,16 +1383,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
return false, nil
}

// checkpointJobProgress checkpoints a changefeed-level job information.
// In addition, if 'manageProtected' is true, which only happens when frontier advanced,
// this method manages the protected timestamp state.
// The isBehind argument is used to determine whether an existing protected timestamp
// should be released.
func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp,
manageProtected bool,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
isBehind bool,
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (err error) {
updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency
if updateRunStatus {
Expand All @@ -1419,16 +1405,21 @@ func (cf *changeFrontier) checkpointJobProgress(
HighWater: &frontier,
}

// Manage protected timestamps.
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
if manageProtected {
if err := cf.manageProtectedTimestamps(cf.Ctx, changefeedProgress, txn, frontier, isBehind); err != nil {
return err
changefeedProgress.Checkpoint = &checkpoint

if shouldProtectTimestamps(cf.flowCtx.Codec()) {
timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional once we're confident in
// ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
}
}

changefeedProgress.Checkpoint = &checkpoint

if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}
Expand All @@ -1447,48 +1438,13 @@ func (cf *changeFrontier) checkpointJobProgress(
})
}

// manageProtectedTimestamps runs whenever the resolved timestamp is being
// checkpointed; the changeFrontier always checkpoints resolved timestamps
// that fall on scan boundaries. It first gives the existing protected
// timestamp a chance to be released (which happens when the changefeed is not
// behind; see maybeLogBehindSpan for how "behind" is computed) and then gives
// a new one a chance to be installed at the resolved timestamp.
//
// Sinkless changefeeds never reach this function: they have no job, and so no
// distributed state to which protected timestamp information could be
// attached.
//
// TODO(ajwerner): Adopt protected timestamps for sinkless changefeeds,
// perhaps by using whatever mechanism is eventually built to protect
// data for long-running SQL transactions. There's some discussion of this
// use case in the protected timestamps RFC.
func (cf *changeFrontier) manageProtectedTimestamps(
	ctx context.Context,
	progress *jobspb.ChangefeedProgress,
	txn *kv.Txn,
	resolved hlc.Timestamp,
	isBehind bool,
) error {
	provider := cf.flowCtx.Cfg.ProtectedTimestampProvider

	// Release first so that a stale record is dropped before a new one is
	// considered.
	releaseErr := cf.maybeReleaseProtectedTimestamp(ctx, progress, provider, txn, isBehind)
	if releaseErr != nil {
		return releaseErr
	}

	return cf.maybeProtectTimestamp(ctx, progress, provider, txn, resolved)
}

// maybeReleaseProtectedTimestamp will release the current protected timestamp
// if either the resolved timestamp is close to the present or we've reached
// a new schemaChangeBoundary which will be protected.
func (cf *changeFrontier) maybeReleaseProtectedTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
isBehind bool,
func (cf *changeFrontier) deprecatedMaybeReleaseProtectedTimestamp(
ctx context.Context, progress *jobspb.ChangefeedProgress, pts protectedts.Storage, txn *kv.Txn,
) error {
if progress.ProtectedTimestampRecord == uuid.Nil {
return nil
}
if !cf.frontier.schemaChangeBoundaryReached() && isBehind {
if !cf.frontier.schemaChangeBoundaryReached() && cf.isBehind() {
log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind")
return nil
}
Expand All @@ -1501,23 +1457,62 @@ func (cf *changeFrontier) maybeReleaseProtectedTimestamp(
return nil
}

// maybeProtectTimestamp creates a new protected timestamp when the
// changeFrontier reaches a scanBoundary and the schemaChangePolicy indicates
// that we should perform a backfill (see cf.shouldProtectBoundaries()).
func (cf *changeFrontier) maybeProtectTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
resolved hlc.Timestamp,
// manageProtectedTimestamps periodically advances the protected timestamp for
// the changefeed's targets to the current highwater mark, creating the record
// if the progress does not reference one yet. The record is cleared during
// changefeedResumer.OnFailOrCancel.
func (cf *changeFrontier) manageProtectedTimestamps(
ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
if cf.isSinkless() || cf.isTenant() || !cf.frontier.schemaChangeBoundaryReached() || !cf.shouldProtectBoundaries() {
ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
if timeutil.Since(cf.lastProtectedTimestampUpdate) < ptsUpdateInterval {
return nil
}
cf.lastProtectedTimestampUpdate = timeutil.Now()

pts := cf.flowCtx.Cfg.ProtectedTimestampProvider

// Create / advance the protected timestamp record to the highwater mark
highWater := cf.frontier.Frontier()
if highWater.Less(cf.highWaterAtStart) {
highWater = cf.highWaterAtStart
}

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
} else {
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", recordID, highWater)
if err := pts.UpdateTimestamp(ctx, txn, recordID, highWater); err != nil {
return err
}
}

jobID := cf.spec.JobID
targets := cf.spec.Feed.Targets
return createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), pts, txn, jobID, targets, resolved, progress)
return nil
}

// deprecatedManageProtectedTimestamps implements the older protection scheme:
// a protected timestamp is only held while the changefeed is in a backfill, or
// while the highwater lags sufficiently far behind after one. It was
// deprecated in favor of always maintaining a timestamp record, which avoids
// issues with a low gcttl setting.
//
// Any existing record is first offered for release; a new record is then
// installed at the current frontier only if a schema change boundary has been
// reached and the feed's schema change policy is backfill.
func (cf *changeFrontier) deprecatedManageProtectedTimestamps(
	ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
	provider := cf.flowCtx.Cfg.ProtectedTimestampProvider
	if err := cf.deprecatedMaybeReleaseProtectedTimestamp(ctx, progress, provider, txn); err != nil {
		return err
	}

	policy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
	atBoundary := cf.frontier.schemaChangeBoundaryReached()
	if !atBoundary || policy != changefeedbase.OptSchemaChangePolicyBackfill {
		// Nothing new to protect.
		return nil
	}

	highWater := cf.frontier.Frontier()
	rec := createProtectedTimestampRecord(
		ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress,
	)
	return provider.Protect(ctx, txn, rec)
}

func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
Expand All @@ -1536,21 +1531,32 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
return nil
}

// Potentially log the most behind span in the frontier for debugging. The
// returned boolean will be true if the resolved timestamp lags far behind the
// present as defined by the current configuration.
func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) (isBehind bool) {
// isBehind reports whether the frontier's timestamp trails the present by more
// than the slowness threshold. An empty frontier is always treated as behind.
func (cf *changeFrontier) isBehind() bool {
	highWater := cf.frontier.Frontier()
	if highWater.IsEmpty() {
		// During backfills we consider ourselves "behind" for the purposes of
		// maintaining protected timestamps, so that a protected timestamp is
		// not inadvertently released.
		return true
	}

	lag := timeutil.Since(highWater.GoTime())
	return lag > cf.slownessThreshold()
}

// maybeLogBehindSpan logs the most-behind span in the frontier, for
// debugging, when the frontier is behind.
func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) {
if !cf.isBehind() {
return
}

// Do not log when we're "behind" due to a backfill
frontier := cf.frontier.Frontier()
if frontier.IsEmpty() {
return
}

now := timeutil.Now()
resolvedBehind := now.Sub(frontier.GoTime())
if resolvedBehind <= cf.slownessThreshold() {
return false
}

description := "sinkless feed"
if !cf.isSinkless() {
Expand All @@ -1565,7 +1571,6 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) (isBehind boo
s := cf.frontier.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
}
return true
}

func (cf *changeFrontier) slownessThreshold() time.Duration {
Expand Down Expand Up @@ -1599,12 +1604,6 @@ func (cf *changeFrontier) isSinkless() bool {
return cf.spec.JobID == 0
}

// isTenant reports whether this changeFrontier is running on a tenant, i.e.
// its codec is not the system tenant's.
func (cf *changeFrontier) isTenant() bool {
	codec := cf.flowCtx.Codec()
	return !codec.ForSystemTenant()
}

// type to make embedding span.Frontier in schemaChangeFrontier convenient.
type spanFrontier struct {
*span.Frontier
Expand Down
Loading

0 comments on commit 238dba4

Please sign in to comment.