From d457b0ccaf64aaafb1fb0cdb24dc081bfe2951f9 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 21 Nov 2023 18:52:03 -0700 Subject: [PATCH] streamingccl: allow CUTOVER TO LATEST before initial scan finishes This patch allows the user to execute ALTER TENANT x COMPLETE REPLICATION TO LATEST before the initial scan completes. After this cmd, the cutover time is set to the replicated start time. Fixes: #114734 Epic: none --- .../streamingest/alter_replication_job.go | 6 ++-- .../streamingest/stream_ingestion_job.go | 4 +++ .../streamingest/testdata/add_early_cutover | 28 +++++++++++++++++++ 3 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index 27503ccbec7d..baacdc70f86c 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -268,10 +268,10 @@ func alterTenantJobCutover( if alterTenantStmt.Cutover.Latest { replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress) if replicatedTime.IsEmpty() { - return hlc.Timestamp{}, - errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName) + cutoverTime = details.ReplicationStartTime + } else { + cutoverTime = replicatedTime } - cutoverTime = replicatedTime } // TODO(ssd): We could use the replication manager here, but diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index d8b952e1001b..95e4647b680c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -279,6 +279,10 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{} return s.handleResumeError(ctx, jobExecCtx, err) } + if err := jobExecCtx.ExecCfg().JobRegistry.CheckPausepoint("stream_ingestion.before_ingestion"); err != nil { + return err + } + // Start ingesting KVs from the replication stream. err = ingestWithRetries(ctx, jobExecCtx, s) if err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover b/pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover new file mode 100644 index 000000000000..62e998bea81a --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover @@ -0,0 +1,28 @@ +# This test ensures 1) the user can set a cutover before the initial scan completes; 2) cannot set a +# cutover time before the replicatedStartTime. + +create-replication-clusters +---- + +exec-sql as=destination-system +SET CLUSTER SETTING jobs.debug.pausepoints = 'stream_ingestion.before_ingestion'; +---- + +let $pre as=source-system +SELECT clock_timestamp()::timestamp::string +---- + +start-replication-stream +---- + +job as=destination-system wait-for-state=paused +---- + +query-sql as=destination-system regex-error=(.*before earliest safe cutover.*) +ALTER TENANT "destination" COMPLETE REPLICATION TO SYSTEM TIME '$pre' +---- + +exec-sql as=destination-system +ALTER TENANT "destination" COMPLETE REPLICATION TO LATEST +---- +