Skip to content

Commit

Permalink
streamingccl: allow CUTOVER TO LATEST before initial scan finishes
Browse files Browse the repository at this point in the history
This patch allows the user to execute ALTER TENANT x COMPLETE REPLICATION TO
LATEST before the initial scan completes. After this cmd, the cutover time is
set to the replicated start time.

Fixes: cockroachdb#114734

Epic: none
  • Loading branch information
msbutler committed Nov 28, 2023
1 parent 37df486 commit bcdc589
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ func alterTenantJobCutover(
if alterTenantStmt.Cutover.Latest {
replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
if replicatedTime.IsEmpty() {
return hlc.Timestamp{},
errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
cutoverTime = details.ReplicationStartTime
} else {
cutoverTime = replicatedTime
}
cutoverTime = replicatedTime
}

// TODO(ssd): We could use the replication manager here, but
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
return s.handleResumeError(ctx, jobExecCtx, err)
}

if err := jobExecCtx.ExecCfg().JobRegistry.CheckPausepoint("stream_ingestion.before_ingestion"); err != nil {
return err
}

// Start ingesting KVs from the replication stream.
err = ingestWithRetries(ctx, jobExecCtx, s)
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This test ensures 1) the user can set a cutover before the initial scan completes; 2) cannot set a
# cutover time before the replicatedStartTime.

create-replication-clusters
----

exec-sql as=destination-system
SET CLUSTER SETTING jobs.debug.pausepoints = 'stream_ingestion.before_ingestion';
----

let $pre as=source-system
SELECT clock_timestamp()::timestamp::string
----

start-replication-stream
----

job as=destination-system wait-for-state=paused
----

query-sql as=destination-system regex-error=(.*before earliest safe cutover.*)
ALTER TENANT "destination" COMPLETE REPLICATION TO SYSTEM TIME '$pre'
----

exec-sql as=destination-system
ALTER TENANT "destination" COMPLETE REPLICATION TO LATEST
----

0 comments on commit bcdc589

Please sign in to comment.