From 721cc761de83e1424de2ec377c6078983ea11e4f Mon Sep 17 00:00:00 2001 From: adityamaru Date: Tue, 22 Nov 2022 10:44:34 -0500 Subject: [PATCH] streamingest: write and manage PTS on the destination tenant During C2C replication as the destination tenant is ingesting KVs we must protect a certain window of MVCC revisions from garbage collection so that the user can cutover to any of the timestamps that lie within this window. To this effect we introduce a `ReplicationTTLSeconds` field to the replication job payload that governs the size of this window relative to the replication job's highwatermark (frontier timestamp). On the first resumption of the replication job we write a protected timestamp record on the destination tenant's keyspace protecting all revisions above `now()`. As the replication job updates its highwatermark, the PTS record is pulled up to protect above `highWatermark - ReplicationTTLSeconds`. This active management of the PTS always ensures that users can cutover to any time in (highWatermark-ReplicationTTLSeconds, highWatermark] and older revisions are gradually made eligible for GC as the frontier progresses. The PTS is released if the replication job fails or is cancelled. Fixes: #92093 Release note: None --- pkg/ccl/streamingccl/streamingest/BUILD.bazel | 5 + .../stream_ingestion_frontier_processor.go | 21 +++ .../streamingest/stream_ingestion_job.go | 87 ++++++++- .../streamingest/stream_ingestion_planning.go | 6 + .../stream_replication_e2e_test.go | 171 ++++++++++++++++++ pkg/jobs/jobspb/jobs.proto | 17 ++ pkg/sql/exec_util.go | 4 + pkg/sql/gcjob/refresh_statuses.go | 43 ----- 8 files changed, 310 insertions(+), 44 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 05c03fdad4df..cb4d4369a466 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -24,9 +24,12 @@ go_library( "//pkg/ccl/utilccl", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", "//pkg/kv/bulk", + "//pkg/kv/kvserver/protectedts", + "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/repstream", "//pkg/repstream/streampb", "//pkg/roachpb", @@ -58,6 +61,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", + "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", @@ -95,6 +99,7 @@ go_test( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/protectedts", "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/securityassets", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 2609c40d40b4..c71fb79373fd 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -400,6 +400,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { f := sf.frontier registry := sf.flowCtx.Cfg.JobRegistry jobID := jobspb.JobID(sf.spec.JobID) + ptp := sf.flowCtx.Cfg.ProtectedTimestampProvider frontierResolvedSpans := make([]jobspb.ResolvedSpan, 0) f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { @@ -439,6 +440,26 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { if md.RunStats != nil && md.RunStats.NumRuns > 1 { 
ju.UpdateRunStats(1, md.RunStats.LastRun) } + + // Update the protected timestamp record protecting the destination tenant's + // keyspan if the highWatermark has moved forward since the last time we + // recorded progress. This makes older revisions of replicated values with a + // timestamp less than highWatermark - ReplicationTTLSeconds, eligible for + // garbage collection. + replicationDetails := md.Payload.GetStreamIngestion() + if replicationDetails.ProtectedTimestampRecordID == nil { + return errors.AssertionFailedf("expected replication job to have a protected timestamp " + + "record over the destination tenant's keyspan") + } + record, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID) + if err != nil { + return err + } + newProtectAbove := highWatermark.Add( + -int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0) + if record.Timestamp.Less(newProtectAbove) { + return ptp.UpdateTimestamp(ctx, txn, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove) + } return nil }); err != nil { return err diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index a5351939be10..b3e1478b159a 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -17,7 +17,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -29,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -302,6 +306,17 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil { log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID) } + + // Now that we have completed the cutover we can release the protected + // timestamp record on the destination tenant's keyspace. + if details.ProtectedTimestampRecordID != nil { + if err := execCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn, *details.ProtectedTimestampRecordID) + }); err != nil { + return err + } + } + return nil } return errors.CombineErrors(ingestWithClient(), client.Close(ctx)) @@ -365,14 +380,77 @@ func (s *streamIngestionResumer) handleResumeError( // Resume is part of the jobs.Resumer interface. Ensure that any errors // produced here are returned as s.handleResumeError. func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error { + // Protect the destination tenant's keyspan from garbage collection. 
+	err := s.protectDestinationTenant(resumeCtx, execCtx)
+	if err != nil {
+		return s.handleResumeError(resumeCtx, execCtx, err)
+	}
+
 	// Start ingesting KVs from the replication stream.
-	err := ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
+	err = ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
 	if err != nil {
 		return s.handleResumeError(resumeCtx, execCtx, err)
 	}
 	return nil
 }
 
+func releaseDestinationTenantProtectedTimestamp(
+	ctx context.Context, execCtx interface{}, txn *kv.Txn, ptsID uuid.UUID,
+) error {
+	jobExecCtx := execCtx.(sql.JobExecContext)
+	ptp := jobExecCtx.ExecCfg().ProtectedTimestampProvider
+	if err := ptp.Release(ctx, txn, ptsID); err != nil {
+		if errors.Is(err, protectedts.ErrNotExists) {
+			// The record seems not to exist, so there is nothing to release; log
+			// the error and swallow it rather than failing the job.
+			log.Warningf(ctx, "failed to release protected timestamp record, which seems not to exist: %v", err)
+			err = nil
+		}
+		return err
+	}
+	return nil
+}
+
+// protectDestinationTenant writes a protected timestamp record protecting the
+// destination tenant's keyspace from garbage collection. This protected
+// timestamp record is updated every time the replication job records a new
+// frontier timestamp, and is released OnFailOrCancel.
+//
+// The method persists the ID of the protected timestamp record in the
+// replication job's Payload.
+func (s *streamIngestionResumer) protectDestinationTenant(
+	ctx context.Context, execCtx interface{},
+) error {
+	details := s.job.Details().(jobspb.StreamIngestionDetails)
+
+	// If we have already protected the destination tenant keyspan in a previous
+	// resumption of the stream ingestion job, then there is nothing to do.
+	if details.ProtectedTimestampRecordID != nil {
+		return nil
+	}
+
+	execCfg := execCtx.(sql.JobExecContext).ExecCfg()
+	target := ptpb.MakeTenantsTarget([]roachpb.TenantID{details.DestinationTenantID})
+	ptsID := uuid.MakeV4()
+	now := execCfg.Clock.Now()
+	return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+		pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
+			nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
+		if err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, pts); err != nil {
+			return err
+		}
+		return s.job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+			if err := md.CheckRunningOrReverting(); err != nil {
+				return err
+			}
+			details.ProtectedTimestampRecordID = &ptsID
+			md.Payload.Details = jobspb.WrapPayloadDetails(details)
+			ju.UpdatePayload(md.Payload)
+			return nil
+		})
+	})
+}
+
 // revertToCutoverTimestamp attempts a cutover and errors out if one was not
 // executed.
 func revertToCutoverTimestamp(
@@ -523,6 +601,13 @@ func (s *streamIngestionResumer) OnFailOrCancel(
 			return errors.Wrap(err, "update tenant record")
 		}
 
+		if details.ProtectedTimestampRecordID != nil {
+			if err := releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn,
+				*details.ProtectedTimestampRecordID); err != nil {
+				return err
+			}
+		}
+
 		return nil
 	})
 }
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
index e49b976c6987..b6cf803b59bf 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -169,6 +169,11 @@ func ingestionPlanHook(
 	}
 	prefix := keys.MakeTenantPrefix(destinationTenantID)
+	// TODO(adityamaru): Wire this up to the user configurable option.
+ replicationTTLSeconds := 25 * 60 * 60 + if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 { + replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds + } streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: string(streamAddress), StreamID: uint64(streamID), @@ -176,6 +181,7 @@ func ingestionPlanHook( DestinationTenantID: destinationTenantID, SourceTenantName: roachpb.TenantName(sourceTenant), DestinationTenantName: roachpb.TenantName(destinationTenant), + ReplicationTTLSeconds: int32(replicationTTLSeconds), } jobDescription, err := streamIngestionJobDescription(p, ingestionStmt) diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index fb128c123ed6..9438c3f2a77b 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -23,6 +23,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -768,6 +770,9 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) { defer cleanup() producerJobID, ingestionJobID := c.startStreamReplication() + c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") + c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';") + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) @@ -1073,3 +1078,169 @@ func TestTenantStreamingMultipleNodes(t *testing.T) { // Since the data was distributed across multiple nodes, multiple nodes should've been connected to require.Greater(t, len(clientAddresses), 1) } + +// TestTenantReplicationProtectedTimestampManagement tests the active protected +// timestamps management on the destination tenant's keyspan. +func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + args := defaultTenantStreamingClustersArgs + // Override the replication job details ReplicationTTLSeconds to a small value + // so that every progress update results in a protected timestamp update. + // + // TODO(adityamaru): Once this is wired up to be user configurable via an + // option to `CREATE TENANT ... FROM REPLICATION` we should replace this + // testing knob with a create tenant option. + args.testingKnobs = &sql.StreamingTestingKnobs{ + OverrideReplicationTTLSeconds: 1, + } + + testProtectedTimestampManagement := func(t *testing.T, pauseBeforeTerminal bool, completeReplication bool) { + // waitForProducerProtection asserts that there is a PTS record protecting + // the source tenant. We ensure the PTS record is protecting a timestamp + // greater or equal to the frontier we know we have replicated up until. 
+ waitForProducerProtection := func(c *tenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) { + testutils.SucceedsSoon(t, func() error { + stats := streamIngestionStats(t, c.destSysSQL, replicationJobID) + if stats.ProducerStatus == nil { + return errors.New("nil ProducerStatus") + } + if stats.ProducerStatus.ProtectedTimestamp == nil { + return errors.New("nil ProducerStatus.ProtectedTimestamp") + } + pts := *stats.ProducerStatus.ProtectedTimestamp + if pts.Less(frontier) { + return errors.Newf("protection is at %s, expected to be >= %s", + pts.String(), frontier.String()) + } + return nil + }) + } + + // checkNoDestinationProtections asserts that there is no PTS record + // protecting the destination tenant. + checkNoDestinationProtection := func(c *tenantStreamingClusters, replicationJobID int) { + execCfg := c.destSysServer.ExecutorConfig().(sql.ExecutorConfig) + require.NoError(t, c.destCluster.Server(0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn) + require.NoError(t, err) + payload := j.Payload() + replicationDetails := payload.GetStreamIngestion() + _, err = execCfg.ProtectedTimestampProvider.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID) + require.EqualError(t, err, protectedts.ErrNotExists.Error()) + return nil + })) + } + checkDestinationProtection := func(c *tenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) { + execCfg := c.destSysServer.ExecutorConfig().(sql.ExecutorConfig) + ptp := execCfg.ProtectedTimestampProvider + require.NoError(t, c.destCluster.Server(0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn) + if err != nil { + return err + } + payload := j.Payload() + progress := j.Progress() + replicationDetails := payload.GetStreamIngestion() + + require.NotNil(t, replicationDetails.ProtectedTimestampRecordID) + rec, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID) + if err != nil { + return err + } + require.True(t, frontier.LessEq(*progress.GetHighWater())) + frontier := progress.GetHighWater().GoTime().Round(time.Millisecond) + window := frontier.Sub(rec.Timestamp.GoTime().Round(time.Millisecond)) + require.Equal(t, time.Second, window) + return nil + })) + } + + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + + c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") + c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';") + + producerJobID, replicationJobID := c.startStreamReplication() + + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID)) + + // Ensure that we wait at least a second so that the gap between the first + // time we write the protected timestamp (t1) during replication job + // startup, and the first progress update (t2) is greater than 1s. This is + // important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we + // will not update the PTS record. + now := c.srcCluster.Server(0).Clock().Now().Add(int64(time.Second)*2, 0) + c.waitUntilHighWatermark(now, jobspb.JobID(replicationJobID)) + + // Check that the producer and replication job have written a protected + // timestamp. 
+ waitForProducerProtection(c, now, replicationJobID) + checkDestinationProtection(c, now, replicationJobID) + + now2 := now.Add(time.Second.Nanoseconds(), 0) + c.waitUntilHighWatermark(now2, jobspb.JobID(replicationJobID)) + // Let the replication progress for a second before checking that the + // protected timestamp record has also been updated on the destination + // cluster. This update happens in the same txn in which we update the + // replication job's progress. + waitForProducerProtection(c, now2, replicationJobID) + checkDestinationProtection(c, now2, replicationJobID) + + if pauseBeforeTerminal { + c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", replicationJobID)) + jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(replicationJobID)) + } + + if completeReplication { + c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", replicationJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID)) + var cutoverTime time.Time + c.destSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) + c.cutover(producerJobID, replicationJobID, cutoverTime) + jobutils.WaitForJobToSucceed(t, c.destSysSQL, jobspb.JobID(replicationJobID)) + } + + // Set GC TTL low, so that the GC job completes quickly in the test. + c.destSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;") + c.destSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.args.destTenantName)) + + if !completeReplication { + jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(replicationJobID)) + jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + } + + // Check if the producer job has released protected timestamp. + stats := streamIngestionStats(t, c.destSysSQL, replicationJobID) + require.NotNil(t, stats.ProducerStatus) + require.Nil(t, stats.ProducerStatus.ProtectedTimestamp) + + // Check if the replication job has released protected timestamp. + checkNoDestinationProtection(c, replicationJobID) + + // Wait for the GC job to finish, this should happen once the protected + // timestamp has been released. + c.destSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'") + + // Check if dest tenant key range is cleaned up. + destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID) + rows, err := c.destCluster.Server(0).DB(). + Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10) + require.NoError(t, err) + require.Empty(t, rows) + + c.destSysSQL.CheckQueryResults(t, + fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID), + [][]string{{"0"}}) + } + + testutils.RunTrueAndFalse(t, "pause-before-terminal", func(t *testing.T, pauseBeforeTerminal bool) { + testutils.RunTrueAndFalse(t, "complete-replication", func(t *testing.T, completeReplication bool) { + testProtectedTimestampManagement(t, pauseBeforeTerminal, completeReplication) + }) + }) +} diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index efd8d3ce55bf..57eb23fd566d 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -97,6 +97,23 @@ message StreamIngestionDetails { (gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"]; + // ID of the protected timestamp record that protects the destination tenant's + // keyspan from GC while it is being replicated into. 
+ bytes protected_timestamp_record_id = 10 [ + (gogoproto.customname) = "ProtectedTimestampRecordID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" + ]; + + // ReplicationTTLSeconds specifies the maximum age of a value relative to the + // replication job's frontier timestamp, before the value is made eligible for + // garbage collection. Note, only older versions of values are eligible for + // GC. All values newer than this maximum age will be protected from GC by a + // protected timestamp record managed by the replication job. + // + // In other words, the `replication job's frontier timestamp - ReplicationTTLSeconds` + // is the earliest timestamp that the replication job can be cut-over to. + int32 replication_ttl_seconds = 11 [(gogoproto.customname) = "ReplicationTTLSeconds"]; + reserved 5, 6; } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index af4bd70c2649..93008e486eec 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1625,6 +1625,10 @@ type StreamingTestingKnobs struct { // BeforeIngestionStart allows blocking the stream ingestion job // before a stream ingestion happens. BeforeIngestionStart func(ctx context.Context) error + + // OverrideReplicationTTLSeconds will override the default value of the + // `ReplicationTTLSeconds` field on the StreamIngestion job details. + OverrideReplicationTTLSeconds int } var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{} diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index 85190cc8056f..9d44d30ec1e5 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -29,8 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -461,19 +458,6 @@ func refreshTenant( } tenID := details.Tenant.ID - // TODO(ssd): Once - // https://github.com/cockroachdb/cockroach/issues/92093 is - // done, we should be able to simply rely on the protected - // timestamp for the replication job. - jobActive, err := tenantHasActiveReplicationJob(ctx, execCfg, tenID) - if err != nil { - return false, time.Time{}, err - } - if jobActive { - log.Infof(ctx, "tenant %d has active tenant replication job, waiting for it to stop before running GC", tenID) - return false, timeutil.Now().Add(MaxSQLGCInterval), nil - } - // Read the tenant's GC TTL to check if the tenant's data has expired. 
cfg := execCfg.SystemConfig.GetSystemConfig() tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds @@ -507,30 +491,3 @@ func refreshTenant( } return false, deadlineUnix, nil } - -func tenantHasActiveReplicationJob( - ctx context.Context, execCfg *sql.ExecutorConfig, tenID uint64, -) (bool, error) { - info, err := sql.GetTenantRecordByID(ctx, execCfg, nil /* txn */, roachpb.MustMakeTenantID(tenID)) - if err != nil { - if pgerror.GetPGCode(err) == pgcode.UndefinedObject { - log.Errorf(ctx, "tenant id %d not found while attempting to GC", tenID) - return false, nil - } else { - return false, errors.Wrapf(err, "fetching tenant %d", tenID) - } - } - if jobID := info.TenantReplicationJobID; jobID != 0 { - j, err := execCfg.JobRegistry.LoadJob(ctx, jobID) - if err != nil { - if errors.Is(err, &jobs.JobNotFoundError{}) { - log.Infof(ctx, "tenant replication job %d not found", jobID) - return false, nil - } else { - return false, err - } - } - return !j.Status().Terminal(), nil - } - return false, err -}
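
Reviewer note (not part of the patch): the PTS pull-up described in the commit message reduces to a small piece of timestamp arithmetic in the frontier processor. The sketch below is illustrative only; it uses plain nanosecond timestamps instead of hlc.Timestamp and the protected timestamp provider, and the helper name computeProtectAbove is hypothetical, not something introduced by this change.

package main

import (
	"fmt"
	"time"
)

// computeProtectAbove mirrors the arithmetic in the frontier processor: the
// record should protect everything newer than frontier - TTL, and it is only
// ever pulled forward, never back.
func computeProtectAbove(frontierNanos int64, ttlSeconds int32, recordNanos int64) (int64, bool) {
	newProtectAbove := frontierNanos - int64(ttlSeconds)*time.Second.Nanoseconds()
	if recordNanos < newProtectAbove {
		return newProtectAbove, true // pull the PTS record up
	}
	return recordNanos, false // frontier - TTL is still behind the record
}

func main() {
	const ttlSeconds = 25 * 60 * 60 // default used in the planning hook above
	start := time.Now().UnixNano()  // record written at now() on the first resume

	// One hour of replication later, frontier - TTL is still behind the
	// original record, so no update is issued and cutover remains possible all
	// the way back to start.
	frontier := start + time.Hour.Nanoseconds()
	if _, updated := computeProtectAbove(frontier, ttlSeconds, start); !updated {
		fmt.Println("PTS unchanged")
	}

	// Thirty hours later, frontier - TTL has passed the record, so the record
	// is pulled up and revisions older than frontier - TTL become eligible for
	// GC.
	frontier = start + 30*time.Hour.Nanoseconds()
	if ts, updated := computeProtectAbove(frontier, ttlSeconds, start); updated {
		fmt.Printf("PTS pulled up by %s\n", time.Duration(ts-start)) // 5h0m0s
	}
}

This mirrors the record.Timestamp.Less(newProtectAbove) check in the frontier processor: because the record only ever moves forward, the valid cutover window stays at (highWatermark - ReplicationTTLSeconds, highWatermark] as the frontier advances.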