streamingest: write and manage PTS on the destination tenant
During C2C replication, as the destination tenant ingests KVs, we must
protect a window of MVCC revisions from garbage collection so that the
user can cutover to any of the timestamps that lie within this window.

To this end we introduce a `ReplicationTTLSeconds` field in the
replication job payload that governs the size of this window relative
to the replication job's highwatermark (frontier timestamp). On the first
resumption of the replication job we write a protected timestamp (PTS)
record on the destination tenant's keyspace, protecting all revisions
above `now()`. As the replication job advances its highwatermark, the PTS
record is pulled up to protect above `highWatermark - ReplicationTTLSeconds`.
This active management of the PTS ensures that users can always cutover to
any time in `(highWatermark - ReplicationTTLSeconds, highWatermark]`, while
older revisions are gradually made eligible for GC as the frontier
progresses.

The PTS is released if the replication job fails or is cancelled.

Fixes: #92093

Release note: None
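For concreteness, a minimal sketch of the window arithmetic described above (hypothetical timestamps, not part of this change; `hlc.Timestamp.Add` takes a wall-time delta in nanoseconds):

package main

import (
    "fmt"
    "time"

    "github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
    const replicationTTLSeconds = 25 * 60 * 60 // the default wired up in this change
    // A hypothetical frontier (highwatermark) timestamp.
    highWatermark := hlc.Timestamp{WallTime: time.Date(2022, 11, 23, 12, 0, 0, 0, time.UTC).UnixNano()}
    // The PTS record trails the frontier by the TTL.
    protectAbove := highWatermark.Add(-int64(replicationTTLSeconds)*time.Second.Nanoseconds(), 0)
    // Cutover is valid for any timestamp in (protectAbove, highWatermark];
    // revisions below protectAbove become eligible for GC.
    fmt.Println(protectAbove.GoTime()) // 2022-11-22 11:00:00 +0000 UTC
}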
adityamaru committed Nov 23, 2022
1 parent 5ebb4ca commit 721cc76
Showing 8 changed files with 310 additions and 44 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -24,9 +24,12 @@ go_library(
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
@@ -58,6 +61,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
@@ -95,6 +99,7 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
@@ -400,6 +400,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
f := sf.frontier
registry := sf.flowCtx.Cfg.JobRegistry
jobID := jobspb.JobID(sf.spec.JobID)
ptp := sf.flowCtx.Cfg.ProtectedTimestampProvider

frontierResolvedSpans := make([]jobspb.ResolvedSpan, 0)
f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
@@ -439,6 +440,26 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
if md.RunStats != nil && md.RunStats.NumRuns > 1 {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}

// Update the protected timestamp record protecting the destination tenant's
// keyspan if the highWatermark has moved forward since the last time we
// recorded progress. This makes older revisions of replicated values with a
// timestamp less than highWatermark - ReplicationTTLSeconds eligible for
// garbage collection.
replicationDetails := md.Payload.GetStreamIngestion()
if replicationDetails.ProtectedTimestampRecordID == nil {
return errors.AssertionFailedf("expected replication job to have a protected timestamp " +
"record over the destination tenant's keyspan")
}
record, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
if err != nil {
return err
}
newProtectAbove := highWatermark.Add(
-int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0)
if record.Timestamp.Less(newProtectAbove) {
return ptp.UpdateTimestamp(ctx, txn, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove)
}
return nil
}); err != nil {
return err
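One subtlety in the pull-up above: the record only moves when `record.Timestamp.Less(newProtectAbove)` holds, so it stays at the `now()` captured at job start until the frontier has advanced more than `ReplicationTTLSeconds` past it, and it never moves backward. A standalone sketch with hypothetical wall times (in seconds):

package main

import (
    "fmt"

    "github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
    const ttlSeconds = 100 // hypothetical ReplicationTTLSeconds
    // The record as written at job start, at wall time 1000s.
    recordTS := hlc.Timestamp{WallTime: 1000 * 1e9}
    for _, hwSeconds := range []int64{1050, 1100, 1250} {
        newProtectAbove := hlc.Timestamp{WallTime: (hwSeconds - ttlSeconds) * 1e9}
        if recordTS.Less(newProtectAbove) { // only ever pull the record forward
            recordTS = newProtectAbove
        }
        fmt.Printf("frontier=%ds protectAbove=%ds\n", hwSeconds, recordTS.WallTime/1e9)
    }
    // Output:
    // frontier=1050s protectAbove=1000s
    // frontier=1100s protectAbove=1000s
    // frontier=1250s protectAbove=1150s
}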
87 changes: 86 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -17,7 +17,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -29,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

@@ -302,6 +306,17 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
}

// Now that we have completed the cutover we can release the protected
// timestamp record on the destination tenant's keyspace.
if details.ProtectedTimestampRecordID != nil {
if err := execCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn, *details.ProtectedTimestampRecordID)
}); err != nil {
return err
}
}

return nil
}
return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
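A sketch of how a test might confirm this release (the helper name is hypothetical; it assumes the txn-based protectedts interface of this era, where looking up a released record returns `protectedts.ErrNotExists`, plus the same imports used in the diff above and `testing`):

// requireReleased asserts that the replication job's PTS record is gone,
// e.g. after a successful cutover or after OnFailOrCancel has run.
func requireReleased(
    ctx context.Context, t *testing.T, execCfg *sql.ExecutorConfig, ptsID uuid.UUID,
) {
    err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
        _, err := execCfg.ProtectedTimestampProvider.GetRecord(ctx, txn, ptsID)
        return err
    })
    if !errors.Is(err, protectedts.ErrNotExists) {
        t.Fatalf("expected PTS record %s to be released, got: %v", ptsID, err)
    }
}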
@@ -365,14 +380,77 @@ func (s *streamIngestionResumer) handleResumeError(
// Resume is part of the jobs.Resumer interface. Ensure that any errors
// produced here are returned through s.handleResumeError.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
// Protect the destination tenant's keyspan from garbage collection.
err := s.protectDestinationTenant(resumeCtx, execCtx)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}

// Start ingesting KVs from the replication stream.
-err := ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
+err = ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}
return nil
}

func releaseDestinationTenantProtectedTimestamp(
ctx context.Context, execCtx interface{}, txn *kv.Txn, ptsID uuid.UUID,
) error {
jobExecCtx := execCtx.(sql.JobExecContext)
ptp := jobExecCtx.ExecCfg().ProtectedTimestampProvider
if err := ptp.Release(ctx, txn, ptsID); err != nil {
if errors.Is(err, protectedts.ErrNotExists) {
// The record may already have been released, e.g. by an earlier attempt
// of this cleanup, so treat a missing record as a successful release
// rather than returning an error.
log.Warningf(ctx, "failed to release protected timestamp record, which seems not to exist: %v", err)
err = nil
}
return err
}
return nil
}

// protectDestinationTenant writes a protected timestamp record protecting the
// destination tenant's keyspace from garbage collection. This protected
// timestamp record is updated every time the replication job records a new
// frontier timestamp, and is released in OnFailOrCancel.
//
// The method persists the ID of the protected timestamp record in the
// replication job's Payload.
func (s *streamIngestionResumer) protectDestinationTenant(
ctx context.Context, execCtx interface{},
) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)

// If we have already protected the destination tenant keyspan in a previous
// resumption of the stream ingestion job, then there is nothing to do.
if details.ProtectedTimestampRecordID != nil {
return nil
}

execCfg := execCtx.(sql.JobExecContext).ExecCfg()
target := ptpb.MakeTenantsTarget([]roachpb.TenantID{details.DestinationTenantID})
ptsID := uuid.MakeV4()
now := execCfg.Clock.Now()
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
if err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, pts); err != nil {
return err
}
return s.job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
details.ProtectedTimestampRecordID = &ptsID
md.Payload.Details = jobspb.WrapPayloadDetails(details)
ju.UpdatePayload(md.Payload)
return nil
})
})
}

// revertToCutoverTimestamp attempts a cutover and errors out if one was not
// executed.
func revertToCutoverTimestamp(
@@ -523,6 +601,13 @@ func (s *streamIngestionResumer) OnFailOrCancel(
return errors.Wrap(err, "update tenant record")
}

if details.ProtectedTimestampRecordID != nil {
if err := releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn,
*details.ProtectedTimestampRecordID); err != nil {
return err
}
}

return nil
})
}
@@ -169,13 +169,19 @@ func ingestionPlanHook(
}

prefix := keys.MakeTenantPrefix(destinationTenantID)
// TODO(adityamaru): Wire this up to the user-configurable option.
replicationTTLSeconds := 25 * 60 * 60
if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 {
replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds
}
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: string(streamAddress),
StreamID: uint64(streamID),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
DestinationTenantID: destinationTenantID,
SourceTenantName: roachpb.TenantName(sourceTenant),
DestinationTenantName: roachpb.TenantName(destinationTenant),
ReplicationTTLSeconds: int32(replicationTTLSeconds),
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
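The `OverrideReplicationTTLSeconds` knob referenced above lets tests shrink the 25-hour default window. A sketch of how a test might set it (the placement of the knob under `base.TestingKnobs.Streaming` is an assumption, following other C2C replication tests):

args := base.TestServerArgs{
    Knobs: base.TestingKnobs{
        Streaming: &sql.StreamingTestingKnobs{
            // Shrink the cutover window so GC-related tests don't have to
            // wait out the 25h default.
            OverrideReplicationTTLSeconds: 100,
        },
    },
}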
