streamingest: write and manage PTS on the destination tenant #92336

Merged 1 commit into cockroachdb:master from destination-cluster-pts on Nov 23, 2022

Conversation

@adityamaru (Contributor)

During C2C replication, while the destination tenant is ingesting KVs, we must protect a certain window of MVCC revisions from garbage collection so that the user can cutover to any timestamp that lies within this window.

To this end we introduce a ReplicationTTLSeconds field in the replication job payload that governs the size of this window relative to the replication job's highwatermark (frontier timestamp). On the first resumption of the replication job we write a protected timestamp (PTS) record on the destination tenant's keyspace protecting all revisions above now(). As the replication job advances its highwatermark, the PTS record is pulled up to protect everything above highWatermark - ReplicationTTLSeconds. This active management of the PTS ensures that users can always cutover to any time in (highWatermark - ReplicationTTLSeconds, highWatermark], while older revisions are gradually made eligible for GC as the frontier progresses.
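For illustration, here is a minimal, self-contained Go sketch of the window arithmetic described above; hlcTimestamp and protectedTimestampFor are placeholders, not the code added in this PR:

```go
package main

import (
	"fmt"
	"time"
)

// hlcTimestamp stands in for the real HLC timestamp type.
type hlcTimestamp struct {
	WallTime int64 // nanoseconds since the Unix epoch
}

// protectedTimestampFor returns the timestamp above which MVCC revisions on
// the destination tenant's keyspace must stay protected, given the job's
// current highwatermark (frontier) and the retention window.
func protectedTimestampFor(frontier hlcTimestamp, replicationTTLSeconds int32) hlcTimestamp {
	window := time.Duration(replicationTTLSeconds) * time.Second
	return hlcTimestamp{WallTime: frontier.WallTime - window.Nanoseconds()}
}

func main() {
	frontier := hlcTimestamp{WallTime: time.Now().UnixNano()}
	// With a 25h window, any cutover timestamp in (frontier-25h, frontier]
	// remains readable; everything older becomes eligible for GC as the
	// frontier advances and the PTS record is pulled up with it.
	pts := protectedTimestampFor(frontier, 25*60*60)
	fmt.Printf("protect revisions at and above wall time %d\n", pts.WallTime)
}
```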

The PTS is released if the replication job fails or is cancelled.

Fixes: #92093

Release note: None

@adityamaru requested a review from a team as a code owner on November 22, 2022 15:51
@adityamaru requested a review from a team on November 22, 2022 15:51
@cockroach-teamcity (Member): This change is Reviewable

Comment on lines +374 to +387
err := s.protectDestinationTenant(resumeCtx, execCtx)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}
Collaborator:

🤔 Right now cutover-after-failure requires that the user be able to Resume and reach the cutover code. So we need to be careful about all of the code between Resume and the cutover code in ingest.

I believe that should be OK in this case since we early out in the function we are calling if we already have a pts record, which we should have.
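For illustration, a minimal, self-contained Go sketch of that early-out pattern; the streamIngestionDetails struct and the ProtectedTimestampRecordID field name are placeholders, not the exact code in this PR:

```go
package main

import "fmt"

// streamIngestionDetails stands in for the job's details; the
// ProtectedTimestampRecordID field name is assumed for illustration.
type streamIngestionDetails struct {
	ProtectedTimestampRecordID *string
}

// protectDestinationTenantIfNeeded writes a PTS record only if the job has
// not recorded one yet, so re-running it on a later resumption is a no-op.
func protectDestinationTenantIfNeeded(details *streamIngestionDetails) error {
	if details.ProtectedTimestampRecordID != nil {
		return nil // record already exists; early out
	}
	id := "pts-record-uuid" // placeholder for the UUID of the new record
	// ... write the PTS record over the destination tenant's keyspace and
	// persist its ID into the job details ...
	details.ProtectedTimestampRecordID = &id
	return nil
}

func main() {
	d := &streamIngestionDetails{}
	_ = protectDestinationTenantIfNeeded(d) // first resumption: writes the record
	_ = protectDestinationTenantIfNeeded(d) // resumed after failure: no-op
	fmt.Println(*d.ProtectedTimestampRecordID)
}
```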

@adityamaru (Contributor, Author), Nov 23, 2022:

Oh I didn't realize this. I wonder if we need to refactor Resume to run a different code path if it's being resumed after a pause due to failure. This way we minimize what we do between resumption and checking for cutover. But, yes I don't think we've made things worse because of the reason you described above.

pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go (outdated, resolved)
// https://github.com/cockroachdb/cockroach/issues/92093 is
// done, we should be able to simply rely on the protected
// timestamp for the replication job.
jobActive, err := tenantHasActiveReplicationJob(ctx, execCfg, tenID)
Collaborator:

I think we can nuke tenantHasActiveReplicationJob as well.

We may want to beef up the tests around this garbage collection waiting, but that doesn't need to happen in this PR.

@adityamaru (Contributor, Author):

Back when we were rewriting PTS, we added a bunch of tests in sqlccl/tenant_gc_test.go, such as TestGCTenantJobWaitsForProtectedTimestamps, but yeah, we might want to add some tests specifically around C2C PTS management and the GC job.

@adityamaru force-pushed the destination-cluster-pts branch from f608bd5 to 7a649a2 on November 23, 2022 03:39
@adityamaru force-pushed the destination-cluster-pts branch from 7a649a2 to 721cc76 on November 23, 2022 17:43
@adityamaru (Contributor, Author):

TFTR!

bors r=stevendanna

@lidorcarmel (Contributor) left a comment:

> The PTS is released if the replication job fails or is cancelled.

  1. Do we allow cancelling the job? I think we said we don't; instead we only let you drop the dest tenant, right?
  2. Failure is interesting: do we think we can resume the job? If not, then maybe we should keep the PTS? After all, there is no data coming into this cluster anyway, so why not freeze everything until the tenant is dropped or the job is restarted?

This and my other comments are not critical, so please keep merging.

@@ -302,6 +306,17 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
}

// Now that we have completed the cutover we can release the protected
// timestamp record on the destination tenant's keyspace.
Contributor:

> // Now that we have completed the cutover we can release the protected
> // timestamp record on the destination tenant's keyspace.

We don't really need to discuss it here, but are we sure we want that?

Say we had a 25h PTS on the destination and we now do a cutover; if we release the PTS we will start GC-ing a ton of bytes, which may get the cluster in trouble.

I guess we cannot keep the PTS trailing at now-25h and let humans change the PTS when it makes sense?

@adityamaru (Contributor, Author):

This is a valid concern, but I'm not sure it's one the replication job should be responsible for solving. Once the cutover is successful the replication job ceases to exist and we go back to respecting the span config that applies to the tenant keyspace. In an ideal world we would make the uncorking of GC less chaos-inducing than it is today, but if the user does not wish to have a flurry of GC activity they can either:

  1. Configure the GC TTL on the tenant keyspace to be 25h so that even after cutover we do not suddenly have a large amount of data eligible for GC.

  2. Run the replication stream with a smaller retention window once we make this ReplicationTTLSeconds a user-configurable value.

I do agree that this is something we will need to document explicitly.

Contributor:

good points! thanks.

if err := ptp.Release(ctx, txn, ptsID); err != nil {
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
// seem to exist.
Contributor:

> // No reason to return an error which might cause problems if it doesn't
> // seem to exist.

nit: will anything break if we return the error? Sometimes we might hide bugs when swallowing errors, so I'd drop this (and return the error) unless it's really needed.

@adityamaru (Contributor, Author):

If we return an error here, it will bubble up to OnFailOrCancel, which is a method the job registry does not allow to fail. In other words, we will retry OnFailOrCancel endlessly (with backoff) until it succeeds, which it never will since it won't be able to find the record. A reason why we might not find the record is that OnFailOrCancel got retried, causing us to invoke this method more than once in separate txns. The expectation is that all operations in Resume and OnFailOrCancel are idempotent, so we safeguard against multiple invocations.
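To illustrate that idempotency argument, here is a minimal, self-contained sketch; errNotExists and releaseRecord are stand-ins for protectedts.ErrNotExists and ptp.Release, not the real API:

```go
package main

import (
	"errors"
	"fmt"
)

var errNotExists = errors.New("protected timestamp record does not exist")

// releaseRecord stands in for ptp.Release: it fails with errNotExists if the
// record was already released by an earlier invocation.
func releaseRecord(id string, released map[string]bool) error {
	if released[id] {
		return errNotExists
	}
	released[id] = true
	return nil
}

// releasePTS swallows errNotExists so that a retried OnFailOrCancel does not
// loop forever on a record it has already removed.
func releasePTS(id string, released map[string]bool) error {
	if err := releaseRecord(id, released); err != nil && !errors.Is(err, errNotExists) {
		return err
	}
	return nil
}

func main() {
	released := map[string]bool{}
	fmt.Println(releasePTS("pts-1", released)) // <nil>
	fmt.Println(releasePTS("pts-1", released)) // <nil>: the retried invocation is a no-op
}
```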

Contributor:

Oh, this definitely makes sense in that case. Thanks for explaining!

@craig (craig bot, Contributor) commented Nov 23, 2022:

Build succeeded:

@craig craig bot merged commit b04c3b0 into cockroachdb:master Nov 23, 2022
@adityamaru (Contributor, Author):

> Do we allow cancelling the job?

I believe we do, at least for now. CANCEL JOB <replication-job-id> would cancel the replication job.

> I think we said we don't; instead we only let you drop the dest tenant, right?

That's the desired UX, but DROP TENANT internally cancels the replication job associated with the tenant being dropped, so we still need to release the PTS as part of the OnFailOrCancel method invoked on job cancellation. If we do not release the PTS then the GC job that is queued as part of the DROP TENANT to clean up the tenant's keyspace will forever wait for the protected timestamp to be removed.
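A minimal, self-contained sketch of that dependency, assuming a simple polling loop; waitForProtectedTimestampRemoval is an illustrative stand-in, not the actual GC job code:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitForProtectedTimestampRemoval blocks tenant GC while any PTS record
// still covers the tenant's keyspace, giving up after maxWait.
func waitForProtectedTimestampRemoval(hasPTSRecord func() bool, poll, maxWait time.Duration) error {
	deadline := time.Now().Add(maxWait)
	for hasPTSRecord() {
		if time.Now().After(deadline) {
			return fmt.Errorf("tenant keyspace still protected after %s", maxWait)
		}
		time.Sleep(poll)
	}
	return nil // safe to clear the tenant's data
}

func main() {
	var released atomic.Bool
	go func() {
		// Simulates the replication job's OnFailOrCancel releasing the PTS.
		time.Sleep(50 * time.Millisecond)
		released.Store(true)
	}()
	err := waitForProtectedTimestampRemoval(
		func() bool { return !released.Load() }, 10*time.Millisecond, time.Second)
	fmt.Println("GC can proceed:", err == nil)
}
```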

> Failure is interesting: do we think we can resume the job? If not, then maybe we should keep the PTS? After all, there is no data coming into this cluster anyway, so why not freeze everything until the tenant is dropped or the job is restarted?

Yup, you're right, we don't actually fail the job; see

return s.handleResumeError(resumeCtx, execCtx, err)

All errors during replication job execution are "handled" and the replication job is moved into a paused state. This means the OnFailOrCancel hook will not run, since the job goes from running to paused on failure. Consequently the PTS is not released, so the user can resume the replication job and will find the PTS still present and holding up GC.
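Sketched with placeholder types (the jobState values below are illustrative and not the jobs framework's real states or API):

```go
package main

import (
	"errors"
	"fmt"
)

type jobState string

const (
	stateRunning        jobState = "running"
	statePauseRequested jobState = "pause-requested"
)

// handleResumeError mirrors the idea described above: an execution error
// moves the job to a paused state instead of failing it, so OnFailOrCancel
// never runs and the PTS record stays in place until the user resumes the
// job, cuts over, or drops the tenant.
func handleResumeError(err error) jobState {
	if err == nil {
		return stateRunning
	}
	return statePauseRequested
}

func main() {
	fmt.Println(handleResumeError(errors.New("transient ingestion failure")))
	// Output: pause-requested
}
```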

@adityamaru deleted the destination-cluster-pts branch on November 23, 2022 21:35
Development

Successfully merging this pull request may close these issues.

c2c: write a protected timestamp on the destination cluster