-crdb_internal.complete_replication_stream(stream_id: int) → int | This function can be used on the producer side to complete and clean up a replication stream after the consumer receives a cutover event and finishes the ingestion
+crdb_internal.complete_replication_stream(stream_id: int, successful_ingestion: bool) → int | This function can be used on the producer side to complete and clean up a replication stream. 'successful_ingestion' indicates whether the stream ingestion finished successfully.
| Volatile |
crdb_internal.complete_stream_ingestion_job(job_id: int, cutover_ts: timestamptz) → int | This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the specified timestamp and leave the cluster in a consistent state. The cutover timestamp can be specified with at most microsecond precision. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.
| Volatile |
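For orientation, here is a minimal sketch of how a consumer-side process might drive these builtins over a SQL connection. Everything below (connection string, job ID, cutover timestamp) is a placeholder for illustration, not part of this patch:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any pgwire driver works against CockroachDB
)

func main() {
	// Placeholder connection string for the destination (consumer) cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	const ingestionJobID = 123456789 // placeholder job ID

	// Signal the running ingestion job to cut over. now() is only a
	// placeholder; a real cutover would typically pick a timestamp the
	// stream has already replicated past.
	var jobID int64
	if err := db.QueryRow(
		`SELECT crdb_internal.complete_stream_ingestion_job($1, now())`,
		ingestionJobID,
	).Scan(&jobID); err != nil {
		log.Fatal(err)
	}

	// The builtin returns as soon as the job is signaled; pair it with
	// SHOW JOB WHEN COMPLETE to wait for the terminal state, as the doc
	// string above suggests.
	if _, err := db.Exec(fmt.Sprintf("SHOW JOB WHEN COMPLETE %d", jobID)); err != nil {
		log.Fatal(err)
	}
}
```

Note that with this change the producer-side builtin is normally not invoked by hand: the ingestion job itself now calls crdb_internal.complete_replication_stream(stream_id, successful_ingestion) against the source cluster, both on successful cutover and on cancellation (see stream_ingestion_job.go below).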
diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go
index 0cbf9999051b..e85a76bd1e51 100644
--- a/pkg/ccl/streamingccl/streamclient/client.go
+++ b/pkg/ccl/streamingccl/streamclient/client.go
@@ -85,7 +85,7 @@ type Client interface {
Close(ctx context.Context) error
// Complete completes a replication stream consumption.
- Complete(ctx context.Context, streamID streaming.StreamID) error
+ Complete(ctx context.Context, streamID streaming.StreamID, successfulIngestion bool) error
}
// Topology is a configuration of stream partitions. These are particular to a
diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go
index 53b2dfb92dca..5b98977bd338 100644
--- a/pkg/ccl/streamingccl/streamclient/client_test.go
+++ b/pkg/ccl/streamingccl/streamclient/client_test.go
@@ -92,7 +92,9 @@ func (sc testStreamClient) Subscribe(
}
// Complete implements the streamclient.Client interface.
-func (sc testStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
+func (sc testStreamClient) Complete(
+ ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
+) error {
return nil
}
diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
index ab83c4d49279..be23beaedc6e 100644
--- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
+++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
@@ -214,13 +214,16 @@ func (p *partitionedStreamClient) Subscribe(
}
// Complete implements the streamclient.Client interface.
-func (p *partitionedStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
+func (p *partitionedStreamClient) Complete(
+ ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
+) error {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
defer sp.Finish()
p.mu.Lock()
defer p.mu.Unlock()
- row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.complete_replication_stream($1)`, streamID)
+ row := p.mu.srcConn.QueryRow(ctx,
+ `SELECT crdb_internal.complete_replication_stream($1, $2)`, streamID, successfulIngestion)
if err := row.Scan(&streamID); err != nil {
return errors.Wrapf(err, "error completing replication stream %d", streamID)
}
diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go
index e510d40ed861..5b47817e7d90 100644
--- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go
+++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go
@@ -203,7 +203,7 @@ INSERT INTO d.t2 VALUES (2);
require.True(t, errors.Is(err, context.Canceled) || isQueryCanceledError(err))
// Testing client.Complete()
- err = client.Complete(ctx, streaming.StreamID(999))
+ err = client.Complete(ctx, streaming.StreamID(999), true)
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)
// Makes producer job exit quickly.
@@ -212,7 +212,7 @@ SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'
`)
streamID, err = client.Create(ctx, tenant.ID)
require.NoError(t, err)
- require.NoError(t, client.Complete(ctx, streamID))
+ require.NoError(t, client.Complete(ctx, streamID, true))
h.SysSQL.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}})
}
diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
index abedd379758f..e413513f8259 100644
--- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go
+++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -512,7 +512,9 @@ func (m *randomStreamClient) Subscribe(
}
// Complete implements the streamclient.Client interface.
-func (m *randomStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
+func (m *randomStreamClient) Complete(
+ ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
+) error {
return nil
}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index dd19cd8f630f..42e6dfa71b8c 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -135,6 +135,42 @@ func connectToActiveClient(
return client, errors.Wrapf(err, "ingestion job %d failed to connect to stream address or existing topology for planning", ingestionJob.ID())
}
+// waitUntilProducerActive pings the producer job and waits until it is
+// active/running; it returns nil once the job is active.
+func waitUntilProducerActive(
+ ctx context.Context,
+ client streamclient.Client,
+ streamID streaming.StreamID,
+ heartbeatTimestamp hlc.Timestamp,
+ ingestionJobID jobspb.JobID,
+) error {
+ ro := retry.Options{
+ InitialBackoff: 1 * time.Second,
+ Multiplier: 2,
+ MaxBackoff: 5 * time.Second,
+ MaxRetries: 4,
+ }
+ // Make sure the producer job is active before starting the stream replication.
+ var status streampb.StreamReplicationStatus
+ var err error
+ for r := retry.Start(ro); r.Next(); {
+ status, err = client.Heartbeat(ctx, streamID, heartbeatTimestamp)
+ if err != nil {
+ return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
+ ingestionJobID)
+ }
+ if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
+ break
+ }
+ log.Warningf(ctx, "producer job %d has status %s, retrying", streamID, status.StreamStatus)
+ }
+ if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
+ return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+
+ "and in status %s", ingestionJobID, status.StreamStatus)
+ }
+ return nil
+}
+
func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error {
details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
progress := ingestionJob.Progress()
@@ -154,29 +190,8 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
return err
}
ingestWithClient := func() error {
- ro := retry.Options{
- InitialBackoff: 1 * time.Second,
- Multiplier: 2,
- MaxBackoff: 10 * time.Second,
- MaxRetries: 5,
- }
- // Make sure the producer job is active before start the stream replication.
- var status streampb.StreamReplicationStatus
- for r := retry.Start(ro); r.Next(); {
- status, err = client.Heartbeat(ctx, streamID, startTime)
- if err != nil {
- return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
- ingestionJob.ID())
- }
- if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
- break
- }
- log.Warningf(ctx,
- "producer job %d has status %s, retrying", streamID, status.StreamStatus)
- }
- if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
- return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+
- "and in status %s", ingestionJob.ID(), status.StreamStatus)
+ if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil {
+ return err
}
log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID)
@@ -246,8 +261,11 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
}
log.Infof(ctx, "starting to complete the producer job %d", streamID)
- // Completes the producer job in the source cluster.
- return client.Complete(ctx, streamID)
+ // Complete the producer job in the source cluster on a best-effort basis.
+ if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
+ log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
+ }
+ return nil
}
return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
}
@@ -349,6 +367,28 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
})
}
+func (s *streamIngestionResumer) cancelProducerJob(
+ ctx context.Context, details jobspb.StreamIngestionDetails,
+) {
+ streamID := streaming.StreamID(details.StreamID)
+ addr := streamingccl.StreamAddress(details.StreamAddress)
+ client, err := streamclient.NewStreamClient(ctx, addr)
+ if err != nil {
+ log.Warningf(ctx, "encountered error when creating the stream client: %v", err)
+ return
+ }
+ log.Infof(ctx, "canceling the producer job %d as stream ingestion job %d is being canceled",
+ streamID, s.job.ID())
+ if err = client.Complete(ctx, streamID, false /* successfulIngestion */); err != nil {
+ log.Warningf(ctx, "encountered error when canceling the producer job: %v", err)
+ fmt.Println("canceled failure", err)
+ }
+ fmt.Println("cancel sent")
+ if err = client.Close(ctx); err != nil {
+ log.Warningf(ctx, "encountered error when closing the stream client: %v", err)
+ }
+}
+
// OnFailOrCancel is part of the jobs.Resumer interface.
// There is a known race between the ingestion processors shutting down, and
// OnFailOrCancel being invoked. As a result of which we might see some keys
@@ -358,7 +398,12 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
// TODO(adityamaru): Add ClearRange logic once we have introduced
// synchronization between the flow tearing down and the job transitioning to a
// failed/canceled state.
-func (s *streamIngestionResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
+func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
+ // Cancel the producer job on a best-effort basis. The source job's protected
+ // timestamp is no longer needed as this ingestion job is in 'reverting' status
+ // and we won't resume ingestion anymore.
+ details := s.job.Details().(jobspb.StreamIngestionDetails)
+ s.cancelProducerJob(ctx, details)
return nil
}
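The net effect of the two hooks above: canceling the ingestion job on the destination cluster now propagates to the source. A hypothetical end-to-end walkthrough (connection strings and job IDs are placeholders, not values from this patch):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	dest, err := sql.Open("postgres", "postgresql://root@dest-cluster:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer dest.Close()
	src, err := sql.Open("postgres", "postgresql://root@src-cluster:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	const ingestionJobID, producerJobID = 111, 222 // placeholders

	// Cancel the ingestion job on the destination. Its OnFailOrCancel hook now
	// calls client.Complete(ctx, streamID, false /* successfulIngestion */).
	if _, err := dest.Exec(fmt.Sprintf("CANCEL JOB %d", ingestionJobID)); err != nil {
		log.Fatal(err)
	}

	// The producer job on the source should converge to "canceled", releasing
	// its protected timestamp along the way (see producer_job.go below).
	var status string
	if err := src.QueryRow(fmt.Sprintf(
		"SELECT status FROM [SHOW JOBS] WHERE job_id = %d", producerJobID,
	)).Scan(&status); err != nil {
		log.Fatal(err)
	}
	fmt.Println("producer job status:", status)
}
```

In practice the producer does not cancel instantly: its resumer only observes the FINISHED_UNSUCCESSFULLY status on its next liveness tick, which is why the e2e test below waits with WaitForJobToCancel.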
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
index 2247096a1890..d3ff10993c3f 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -143,7 +143,9 @@ func (m *mockStreamClient) Close(ctx context.Context) error {
}
// Complete implements the streamclient.Client interface.
-func (m *mockStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
+func (m *mockStreamClient) Complete(
+ ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
+) error {
return nil
}
@@ -163,7 +165,9 @@ func (m *errorStreamClient) Subscribe(
}
// Complete implements the streamclient.Client interface.
-func (m *errorStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
+func (m *errorStreamClient) Complete(
+ ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
+) error {
return nil
}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
index 5298544e1638..fcc01d920e1d 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -94,18 +94,32 @@ type tenantStreamingClusters struct {
destCluster *testcluster.TestCluster
destSysServer serverutils.TestServerInterface
destSysSQL *sqlutils.SQLRunner
+ destTenantSQL *sqlutils.SQLRunner
}
+// Creates a dest tenant SQL runner and returns a cleanup function that shuts
+// down the tenant SQL instance and closes all sessions.
// This function will fail the test if run prior to the replication stream
// closing, as the tenant will not yet be active.
-func (c *tenantStreamingClusters) getDestTenantSQL() *sqlutils.SQLRunner {
- _, destTenantConn := serverutils.StartTenant(c.t, c.destSysServer, base.TestTenantArgs{TenantID: c.args.destTenantID, DisableCreateTenant: true, SkipTenantCheck: true})
- return sqlutils.MakeSQLRunner(destTenantConn)
+func (c *tenantStreamingClusters) createDestTenantSQL(ctx context.Context) func() error {
+ testTenant, destTenantConn := serverutils.StartTenant(c.t, c.destSysServer,
+ base.TestTenantArgs{TenantID: c.args.destTenantID, DisableCreateTenant: true, SkipTenantCheck: true})
+ c.destTenantSQL = sqlutils.MakeSQLRunner(destTenantConn)
+ return func() error {
+ if err := destTenantConn.Close(); err != nil {
+ return err
+ }
+ testTenant.Stopper().Stop(ctx)
+ return nil
+ }
}
+// This function must be called after createDestTenantSQL.
func (c *tenantStreamingClusters) compareResult(query string) {
+ require.NotNil(c.t, c.destTenantSQL,
+ "destination tenant SQL runner should be created first")
sourceData := c.srcTenantSQL.QueryStr(c.t, query)
- destData := c.getDestTenantSQL().QueryStr(c.t, query)
+ destData := c.destTenantSQL.QueryStr(c.t, query)
require.Equal(c.t, sourceData, destData)
}
@@ -342,6 +356,11 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
c.cutover(producerJobID, ingestionJobID, cutoverTime)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+ cleanupTenant := c.createDestTenantSQL(ctx)
+ defer func() {
+ require.NoError(t, cleanupTenant())
+ }()
+
c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")
c.compareResult("SELECT * FROM d.x")
@@ -351,7 +370,7 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
})
// Check the dst cluster didn't receive the change after a while.
<-time.NewTimer(3 * time.Second).C
- require.Equal(t, [][]string{{"2"}}, c.getDestTenantSQL().QueryStr(t, "SELECT * FROM d.t2"))
+ require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2"))
}
func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
@@ -364,7 +383,6 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
- // initial scan
producerJobID, ingestionJobID := c.startStreamReplication()
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
@@ -373,6 +391,10 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
srcTime := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))
+ cleanupTenant := c.createDestTenantSQL(ctx)
+ defer func() {
+ require.NoError(t, cleanupTenant())
+ }()
c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")
@@ -399,7 +421,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
// Check the dst cluster didn't receive the change after a while.
<-time.NewTimer(3 * time.Second).C
require.Equal(t, [][]string{{"0"}},
- c.getDestTenantSQL().QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3"))
+ c.destTenantSQL.QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3"))
// After resumed, the ingestion job paused on failure again.
c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID))
@@ -423,6 +445,11 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) {
srcTime := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))
+ cleanupTenant := c.createDestTenantSQL(ctx)
+ defer func() {
+ require.NoError(t, cleanupTenant())
+ }()
+
c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")
@@ -486,6 +513,12 @@ func TestTenantStreamingPauseOnError(t *testing.T) {
// Check dest has caught up the previous updates.
srcTime := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))
+
+ cleanupTenant := c.createDestTenantSQL(ctx)
+ defer func() {
+ require.NoError(t, cleanupTenant())
+ }()
+
c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")
@@ -562,6 +595,11 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
return nil
})
+ cleanupTenant := c.createDestTenantSQL(ctx)
+ defer func() {
+ require.NoError(t, cleanupTenant())
+ }()
+
c.destSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
jobutils.WaitForJobToPause(t, c.destSysSQL, jobspb.JobID(ingestionJobID))
// Clear out the map to ignore the initial client starts
@@ -590,7 +628,83 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
})
// Check the dst cluster didn't receive the change after a while.
<-time.NewTimer(3 * time.Second).C
- require.Equal(t, [][]string{{"2"}}, c.getDestTenantSQL().QueryStr(t, "SELECT * FROM d.t2"))
+ require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2"))
+}
+
+func TestTenantStreamingCancelIngestion(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ args := defaultTenantStreamingClustersArgs
+
+ testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) {
+ c, cleanup := createTenantStreamingClusters(ctx, t, args)
+ defer cleanup()
+ producerJobID, ingestionJobID := c.startStreamReplication()
+
+ jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
+ jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+
+ srcTime := c.srcCluster.Server(0).Clock().Now()
+ c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))
+
+ cleanUpTenant := c.createDestTenantSQL(ctx)
+ c.compareResult("SELECT * FROM d.t1")
+ c.compareResult("SELECT * FROM d.t2")
+
+ if cancelAfterPaused {
+ c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID))
+ jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+ }
+
+ // Close all tenant SQL sessions before we cancel the job so that
+ // we don't have any process still writing to 'sqlliveness' table after
+ // the tenant key range is cleared.
+ require.NoError(t, cleanUpTenant())
+
+ c.destSysSQL.Exec(t, fmt.Sprintf("CANCEL JOB %d", ingestionJobID))
+ jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+ jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
+
+ // Check that the producer job has released its protected timestamp.
+ stats := streamIngestionStats(t, c.destSysSQL, ingestionJobID)
+ require.NotNil(t, stats.ProducerStatus)
+ require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)
+
+ // Check that the dest tenant key ranges have not been cleaned up.
+ destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID)
+
+ rows, err := c.destCluster.Server(0).DB().
+ Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
+ require.NoError(t, err)
+ require.NotEmpty(t, rows)
+
+ // Check if the tenant record still exists.
+ c.destSysSQL.CheckQueryResults(t,
+ fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
+ [][]string{{"1"}})
+
+ // Check if we can successfully GC the tenant.
+ c.destSysSQL.Exec(t, "SELECT crdb_internal.destroy_tenant($1, true)",
+ args.destTenantID.ToUint64())
+ rows, err = c.destCluster.Server(0).DB().
+ Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
+ require.NoError(t, err)
+ require.Empty(t, rows)
+
+ c.destSysSQL.CheckQueryResults(t,
+ fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
+ [][]string{{"0"}})
+ }
+
+ t.Run("cancel-ingestion-after-paused", func(t *testing.T) {
+ testCancelIngestion(t, true)
+ })
+
+ t.Run("cancel-ingestion-while-running", func(t *testing.T) {
+ testCancelIngestion(t, false)
+ })
}
func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
@@ -641,9 +755,14 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer alternateSrcTenantConn.Close()
alternateSrcTenantSQL := sqlutils.MakeSQLRunner(alternateSrcTenantConn)
+ cleanUpTenant := c.createDestTenantSQL(ctx)
+ defer func() {
+ require.NoError(t, cleanUpTenant())
+ }()
+
alternateCompareResult := func(query string) {
sourceData := alternateSrcTenantSQL.QueryStr(c.t, query)
- destData := c.getDestTenantSQL().QueryStr(c.t, query)
+ destData := c.destTenantSQL.QueryStr(c.t, query)
require.Equal(c.t, sourceData, destData)
}
diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
index 8b2daa520e98..45a5f7988abf 100644
--- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
@@ -72,6 +72,7 @@ go_test(
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
+ "//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/security/securityassets",
@@ -86,6 +87,7 @@ go_test(
"//pkg/sql/sessiondatapb",
"//pkg/streaming",
"//pkg/testutils",
+ "//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go
index c60436bbbd9d..10bc68f51a83 100644
--- a/pkg/ccl/streamingccl/streamproducer/producer_job.go
+++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go
@@ -61,18 +61,29 @@ type producerJobResumer struct {
timer timeutil.TimerI
}
+// releaseProtectedTimestamp releases the protected timestamp record
+// associated with the producer job, if it exists.
+func (p *producerJobResumer) releaseProtectedTimestamp(
+ ctx context.Context, executorConfig *sql.ExecutorConfig,
+) error {
+ ptr := p.job.Details().(jobspb.StreamReplicationDetails).ProtectedTimestampRecordID
+ return executorConfig.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+ err := executorConfig.ProtectedTimestampProvider.Release(ctx, txn, ptr)
+ // In case that a retry happens, the record might have been released.
+ if errors.Is(err, exec.ErrNotFound) {
+ return nil
+ }
+ return err
+ })
+}
+
// Resume is part of the jobs.Resumer interface.
func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) error {
jobExec := execCtx.(sql.JobExecContext)
execCfg := jobExec.ExecCfg()
- isTimedOut := func(job *jobs.Job) bool {
- progress := job.Progress()
- return progress.GetStreamReplication().Expiration.Before(p.timeSource.Now())
- }
- if isTimedOut(p.job) {
- return errors.Errorf("replication stream %d timed out", p.job.ID())
- }
- p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV()))
+
+ // Fire the timer immediately to start an initial progress check.
+ p.timer.Reset(0)
for {
select {
case <-ctx.Done():
@@ -84,12 +95,24 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
if err != nil {
return err
}
- // The job completes successfully if the ingestion has been cut over.
- if p := j.Progress(); p.GetStreamReplication().IngestionCutOver {
- return nil
- }
- if isTimedOut(j) {
- return errors.Errorf("replication stream %d timed out", p.job.ID())
+
+ prog := j.Progress()
+ switch prog.GetStreamReplication().StreamIngestionStatus {
+ case jobspb.StreamReplicationProgress_FINISHED_SUCCESSFULLY:
+ return p.releaseProtectedTimestamp(ctx, execCfg)
+ case jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY:
+ fmt.Println("producer try update cancel requested")
+ return j.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+ ju.UpdateStatus(jobs.StatusCancelRequested)
+ return nil
+ })
+ case jobspb.StreamReplicationProgress_NOT_FINISHED:
+ // Check if the job timed out.
+ if prog.GetStreamReplication().Expiration.Before(p.timeSource.Now()) {
+ return errors.Errorf("replication stream %d timed out", p.job.ID())
+ }
+ default:
+ return errors.New("unrecognized stream ingestion status")
}
}
}
@@ -99,17 +122,8 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
func (p *producerJobResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
jobExec := execCtx.(sql.JobExecContext)
execCfg := jobExec.ExecCfg()
-
// Releases the protected timestamp record.
- ptr := p.job.Details().(jobspb.StreamReplicationDetails).ProtectedTimestampRecordID
- return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- err := execCfg.ProtectedTimestampProvider.Release(ctx, txn, ptr)
- // In case that a retry happens, the record might have been released.
- if errors.Is(err, exec.ErrNotFound) {
- return nil
- }
- return err
- })
+ return p.releaseProtectedTimestamp(ctx, execCfg)
}
func init() {
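One subtlety in the resumer rewrite above: p.timer.Reset(0) arms the timer to fire immediately, so the first progress check no longer waits a full stream_liveness_track_frequency interval. A standalone sketch of that fire-immediately-then-poll shape, using the standard library's time.Timer (the real code goes through CockroachDB's timeutil abstraction so tests can inject a manual clock, as producer_job_test.go below relies on):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntilDone mirrors the resumer's loop shape: a zero-duration timer makes
// the first check run immediately; the timer is re-armed after each check.
func pollUntilDone(ctx context.Context, interval time.Duration, check func() bool) error {
	timer := time.NewTimer(0) // fire immediately for the initial check
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			if check() {
				return nil
			}
			timer.Reset(interval)
		}
	}
}

func main() {
	n := 0
	_ = pollUntilDone(context.Background(), 50*time.Millisecond, func() bool {
		n++
		fmt.Println("progress check", n)
		return n >= 3
	})
}
```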
diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go
index a30904dc84a1..ca442ff600f5 100644
--- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go
+++ b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go
@@ -121,8 +121,8 @@ func TestStreamReplicationProducerJob(t *testing.T) {
ptp := source.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
timeout, usr := 1*time.Second, username.MakeSQLUsernameFromPreNormalizedString("user")
- registerConstructor := func(initialTime time.Time) (*timeutil.ManualTime, func(), func(), func()) {
- mt := timeutil.NewManualTime(initialTime)
+ registerConstructor := func() (*timeutil.ManualTime, func(), func(), func()) {
+ mt := timeutil.NewManualTime(timeutil.Now())
waitJobFinishReverting := make(chan struct{})
in, out := make(chan struct{}, 1), make(chan struct{}, 1)
jobs.RegisterConstructor(jobspb.TypeStreamReplication, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
@@ -181,10 +181,15 @@ func TestStreamReplicationProducerJob(t *testing.T) {
ptsID := uuid.MakeV4()
jr := makeProducerJobRecord(registry, 10, timeout, usr, ptsID)
defer jobs.ResetConstructors()()
- _, _, _, waitJobFinishReverting := registerConstructor(expirationTime(jr).Add(1 * time.Millisecond))
+ mt, timeGiven, waitForTimeRequest, waitJobFinishReverting := registerConstructor()
require.NoError(t, runJobWithProtectedTimestamp(ptsID, ts, jr))
+ // Coordinate the job to get a new time.
+ waitForTimeRequest()
+ mt.AdvanceTo(expirationTime(jr).Add(1 * time.Millisecond))
+ timeGiven()
+
waitJobFinishReverting()
sql.CheckQueryResultsRetry(t, jobsQuery(jr.JobID), [][]string{{"failed"}})
@@ -210,10 +215,14 @@ func TestStreamReplicationProducerJob(t *testing.T) {
jr := makeProducerJobRecord(registry, 20, timeout, usr, ptsID)
defer jobs.ResetConstructors()()
mt, timeGiven, waitForTimeRequest, waitJobFinishReverting :=
- registerConstructor(expirationTime(jr).Add(-5 * time.Millisecond))
-
+ registerConstructor()
require.NoError(t, runJobWithProtectedTimestamp(ptsID, ts, jr))
+
+ // Coordinate the job to get a new time.
waitForTimeRequest()
+ mt.AdvanceTo(expirationTime(jr).Add(-5 * time.Millisecond))
+ timeGiven()
+
sql.CheckQueryResults(t, jobsQuery(jr.JobID), [][]string{{"running"}})
updatedFrontier := hlc.Timestamp{
@@ -238,7 +247,8 @@ func TestStreamReplicationProducerJob(t *testing.T) {
// Ensure the timestamp is updated on the PTS record
require.Equal(t, updatedFrontier, r.Timestamp)
- // Reset the time to be after the timeout
+ // Coordinate the job to get a new time. Reset the time to be after the timeout.
+ waitForTimeRequest()
mt.AdvanceTo(expire.Add(12 * time.Millisecond))
timeGiven()
waitJobFinishReverting()
diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go
index 9ba05a36ecf6..b83fad87a46b 100644
--- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go
+++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go
@@ -52,9 +52,9 @@ func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
// CompleteReplicationStream implements ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) CompleteReplicationStream(
- evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
+ evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, successfulIngestion bool,
) error {
- return completeReplicationStream(evalCtx, txn, streamID)
+ return completeReplicationStream(evalCtx, txn, streamID, successfulIngestion)
}
func newReplicationStreamManagerWithPrivilegesCheck(
diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
index 0ce142de2095..a755f2f5396c 100644
--- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
+++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
@@ -24,9 +24,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl"
"github.com/cockroachdb/cockroach/pkg/jobs"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
+ "github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
+ "github.com/cockroachdb/cockroach/pkg/sql/distsql"
+ "github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -485,3 +490,76 @@ USE d;
}
}
}
+
+func TestCompleteStreamReplication(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ h, cleanup := streamingtest.NewReplicationHelper(t,
+ base.TestServerArgs{
+ Knobs: base.TestingKnobs{
+ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+ },
+ DisableDefaultTestTenant: true,
+ })
+ defer cleanup()
+ srcTenantID := serverutils.TestTenantID()
+ _, cleanupTenant := h.CreateTenant(t, srcTenantID)
+ defer cleanupTenant()
+
+ // Make the producer job time out quickly and track the ingestion cutover signal frequently.
+ h.SysSQL.ExecMultiple(t,
+ "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '2s';",
+ "SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '2s';")
+
+ var timedOutStreamID int
+ row := h.SysSQL.QueryRow(t,
+ "SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64())
+ row.Scan(&timedOutStreamID)
+ jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(timedOutStreamID))
+
+ // Prevent the producer job from timing out too easily.
+ h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10m';")
+ testCompleteStreamReplication := func(t *testing.T, successfulIngestion bool) {
+ // Verify no error when completing a timed out replication stream.
+ h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)",
+ timedOutStreamID, successfulIngestion)
+
+ // Create a new replication stream and complete it.
+ var streamID int
+ row := h.SysSQL.QueryRow(t,
+ "SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64())
+ row.Scan(&streamID)
+ jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
+ h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)",
+ streamID, successfulIngestion)
+
+ if successfulIngestion {
+ jobutils.WaitForJobToSucceed(t, h.SysSQL, jobspb.JobID(streamID))
+ } else {
+ jobutils.WaitForJobToCancel(t, h.SysSQL, jobspb.JobID(streamID))
+ }
+ // Verify the protected timestamp record gets released.
+ jr := h.SysServer.JobRegistry().(*jobs.Registry)
+ pj, err := jr.LoadJob(ctx, jobspb.JobID(streamID))
+ require.NoError(t, err)
+ payload := pj.Payload()
+ require.ErrorIs(t, h.SysServer.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+ ptp := h.SysServer.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
+ _, err = ptp.GetRecord(ctx, txn, payload.GetStreamReplication().ProtectedTimestampRecordID)
+ return err
+ }), protectedts.ErrNotExists)
+ }
+
+ for _, tc := range []struct {
+ testName string
+ successfulIngestion bool
+ }{
+ {"complete-with-successful-ingestion", true},
+ {"complete-without-successful-ingestion", false},
+ } {
+ t.Run(tc.testName, func(t *testing.T) {
+ testCompleteStreamReplication(t, tc.successfulIngestion)
+ })
+ }
+}
diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
index 6ee4e5f41d1a..05d18da17581 100644
--- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
+++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
@@ -246,18 +247,30 @@ func getReplicationStreamSpec(
}
func completeReplicationStream(
- evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
+ evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, successfulIngestion bool,
) error {
- // Update the producer job that a cutover happens on the consumer side.
registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
const useReadLock = false
return registry.UpdateJobWithTxn(evalCtx.Ctx(), jobspb.JobID(streamID), txn, useReadLock,
func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+ // Update the stream ingestion status; the job resumer exits the running
+ // state when it picks up the new status.
if (md.Status == jobs.StatusRunning || md.Status == jobs.StatusPending) &&
- !md.Progress.GetStreamReplication().IngestionCutOver {
- p := md.Progress
- p.GetStreamReplication().IngestionCutOver = true
- ju.UpdateProgress(p)
+ md.Progress.GetStreamReplication().StreamIngestionStatus ==
+ jobspb.StreamReplicationProgress_NOT_FINISHED {
+ if successfulIngestion {
+ md.Progress.GetStreamReplication().StreamIngestionStatus =
+ jobspb.StreamReplicationProgress_FINISHED_SUCCESSFULLY
+ md.Progress.RunningStatus = "succeeding this producer job as the corresponding " +
+ "stream ingestion finished successfully"
+ } else {
+ fmt.Println("producer update stream ingestion status")
+ md.Progress.GetStreamReplication().StreamIngestionStatus =
+ jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY
+ md.Progress.RunningStatus = "canceling this producer job as the corresponding " +
+ "stream ingestion did not finish successfully"
+ }
+ ju.UpdateProgress(md.Progress)
}
return nil
})
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index ce26c92f2b16..b499f46382e8 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -148,8 +148,15 @@ message StreamReplicationProgress {
// Expiration timestamp of consumer heartbeat
google.protobuf.Timestamp expiration = 1 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];
- // If the ingestion side has been cut over.
- bool ingestion_cut_over = 2;
+ enum StreamIngestionStatus {
+ NOT_FINISHED = 0;
+ FINISHED_SUCCESSFULLY = 1;
+ FINISHED_UNSUCCESSFULLY = 2;
+ }
+
+ // Status of the corresponding stream ingestion. The producer job tracks this
+ // to determine its fate.
+ StreamIngestionStatus stream_ingestion_status = 2;
}
message SchedulePTSChainingRecord {
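A compatibility note on this proto change (my reading, not stated in the patch): the new enum reuses field number 2, and both bool and enum are varint-encoded in proto3, so a progress record persisted by an older binary decodes cleanly under the new schema — ingestion_cut_over = true (varint 1) reads as FINISHED_SUCCESSFULLY, and false (varint 0) as the NOT_FINISHED zero value.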
diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go
index 1ebda888e472..79f664a72cc8 100644
--- a/pkg/sql/sem/builtins/replication_builtins.go
+++ b/pkg/sql/sem/builtins/replication_builtins.go
@@ -298,6 +298,7 @@ var replicationBuiltins = map[string]builtinDefinition{
tree.Overload{
Types: tree.ArgTypes{
{"stream_id", types.Int},
+ {"successful_ingestion", types.Bool},
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
@@ -307,13 +308,15 @@ var replicationBuiltins = map[string]builtinDefinition{
}
streamID := int64(tree.MustBeDInt(args[0]))
- if err := mgr.CompleteReplicationStream(evalCtx, evalCtx.Txn, streaming.StreamID(streamID)); err != nil {
+ successfulIngestion := bool(tree.MustBeDBool(args[1]))
+ if err := mgr.CompleteReplicationStream(evalCtx, evalCtx.Txn,
+ streaming.StreamID(streamID), successfulIngestion); err != nil {
return nil, err
}
return tree.NewDInt(tree.DInt(streamID)), err
},
- Info: "This function can be used on the producer side to complete and clean up a replication stream " +
- "after the consumer receives a cutover event and finishes the ingestion",
+ Info: "This function can be used on the producer side to complete and clean up a replication stream." +
+ "'successful_ingestion' indicates whether the stream ingestion finished successfully.",
Volatility: volatility.Volatile,
},
),
diff --git a/pkg/streaming/api.go b/pkg/streaming/api.go
index a4fcc9dbd59e..11c081ef83fb 100644
--- a/pkg/streaming/api.go
+++ b/pkg/streaming/api.go
@@ -73,8 +73,10 @@ type ReplicationStreamManager interface {
) (*streampb.ReplicationStreamSpec, error)
// CompleteReplicationStream completes a replication stream job on the producer side.
+ // 'successfulIngestion' indicates whether the stream ingestion finished successfully and
+ // determines the fate of the producer job: succeeded or canceled.
CompleteReplicationStream(
- evalCtx *eval.Context, txn *kv.Txn, streamID StreamID,
+ evalCtx *eval.Context, txn *kv.Txn, streamID StreamID, successfulIngestion bool,
) error
}
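Summarizing the contract the rest of this patch implements: CompleteReplicationStream(..., true) lets the producer job finish as succeeded after releasing its protected timestamp record, while CompleteReplicationStream(..., false) moves it to cancel-requested, with OnFailOrCancel releasing the record; in both paths the release is idempotent thanks to the ErrNotFound check in releaseProtectedTimestamp.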