From a71ddbc672b89583aed3692372fd0708c151a7bd Mon Sep 17 00:00:00 2001 From: Casper Date: Thu, 14 Jul 2022 11:26:34 -0400 Subject: [PATCH] streamingccl: support canceling ingestion job This commit supports the intended behavior when canceling an ingestion job: canceling the producer job which releases the protected timestamp for the source tenant. The destination tenant key ranges are left for the user to clean up with crdb_internal.destroy_tenant Release note: None --- .../streamclient/partitioned_stream_client.go | 2 +- .../streamingest/stream_ingestion_job.go | 97 +++++++++---- .../stream_replication_e2e_test.go | 137 ++++++++++++++++-- .../streamproducer/producer_job.go | 15 +- .../streamproducer/producer_job_test.go | 22 ++- .../streamproducer/stream_lifetime.go | 2 + 6 files changed, 224 insertions(+), 51 deletions(-) diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 573306d8d85e..be23beaedc6e 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -223,7 +223,7 @@ func (p *partitionedStreamClient) Complete( p.mu.Lock() defer p.mu.Unlock() row := p.mu.srcConn.QueryRow(ctx, - `SELECT crdb_internal.complete_replication_stream($1)`, streamID) + `SELECT crdb_internal.complete_replication_stream($1, $2)`, streamID, successfulIngestion) if err := row.Scan(&streamID); err != nil { return errors.Wrapf(err, "error completing replication stream %d", streamID) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 023f8b15c35f..42e6dfa71b8c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -135,6 +135,42 @@ func connectToActiveClient( return client, errors.Wrapf(err, "ingestion job %d failed to connect to stream address or existing topology for planning", ingestionJob.ID()) } +// Ping the producer job and waits until it is active/running, returns nil when +// the job is active. +func waitUntilProducerActive( + ctx context.Context, + client streamclient.Client, + streamID streaming.StreamID, + heartbeatTimestamp hlc.Timestamp, + ingestionJobID jobspb.JobID, +) error { + ro := retry.Options{ + InitialBackoff: 1 * time.Second, + Multiplier: 2, + MaxBackoff: 5 * time.Second, + MaxRetries: 4, + } + // Make sure the producer job is active before start the stream replication. 
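+	// With the options above the heartbeat is retried a handful of times with
+	// exponential backoff capped at MaxBackoff, so a producer that never
+	// becomes active fails the resume within a bounded amount of time instead
+	// of blocking indefinitely.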
+ var status streampb.StreamReplicationStatus + var err error + for r := retry.Start(ro); r.Next(); { + status, err = client.Heartbeat(ctx, streamID, heartbeatTimestamp) + if err != nil { + return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error", + ingestionJobID) + } + if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY { + break + } + log.Warningf(ctx, "producer job %d has status %s, retrying", streamID, status.StreamStatus) + } + if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE { + return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+ + "and in status %s", ingestionJobID, status.StreamStatus) + } + return nil +} + func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error { details := ingestionJob.Details().(jobspb.StreamIngestionDetails) progress := ingestionJob.Progress() @@ -154,29 +190,8 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. return err } ingestWithClient := func() error { - ro := retry.Options{ - InitialBackoff: 1 * time.Second, - Multiplier: 2, - MaxBackoff: 10 * time.Second, - MaxRetries: 5, - } - // Make sure the producer job is active before start the stream replication. - var status streampb.StreamReplicationStatus - for r := retry.Start(ro); r.Next(); { - status, err = client.Heartbeat(ctx, streamID, startTime) - if err != nil { - return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error", - ingestionJob.ID()) - } - if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY { - break - } - log.Warningf(ctx, - "producer job %d has status %s, retrying", streamID, status.StreamStatus) - } - if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE { - return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+ - "and in status %s", ingestionJob.ID(), status.StreamStatus) + if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil { + return err } log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID) @@ -246,8 +261,11 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. } log.Infof(ctx, "starting to complete the producer job %d", streamID) - // Completes the producer job in the source cluster. - return client.Complete(ctx, streamID, true /* successfulIngestion */) + // Completes the producer job in the source cluster on best effort. 
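+		// A failure here is deliberately not returned: the cutover has already
+		// been applied on the destination, and an orphaned producer job stops on
+		// its own once its expiration lapses without further heartbeats (see the
+		// timeout check in producer_job.go).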
+	if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
+		log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
+	}
+	return nil
 	}
 	return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
 }
@@ -349,6 +367,28 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
 	})
 }
 
+func (s *streamIngestionResumer) cancelProducerJob(
+	ctx context.Context, details jobspb.StreamIngestionDetails,
+) {
+	streamID := streaming.StreamID(details.StreamID)
+	addr := streamingccl.StreamAddress(details.StreamAddress)
+	client, err := streamclient.NewStreamClient(ctx, addr)
+	if err != nil {
+		log.Warningf(ctx, "encountered error when creating the stream client: %v", err)
+		return
+	}
+	log.Infof(ctx, "canceling the producer job %d as stream ingestion job %d is being canceled",
+		streamID, s.job.ID())
+	if err = client.Complete(ctx, streamID, false /* successfulIngestion */); err != nil {
+		log.Warningf(ctx, "encountered error when canceling the producer job: %v", err)
+	}
+	if err = client.Close(ctx); err != nil {
+		log.Warningf(ctx, "encountered error when closing the stream client: %v", err)
+	}
+}
+
 // OnFailOrCancel is part of the jobs.Resumer interface.
 // There is a known race between the ingestion processors shutting down, and
 // OnFailOrCancel being invoked. As a result of which we might see some keys
@@ -358,7 +398,12 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
 // TODO(adityamaru): Add ClearRange logic once we have introduced
 // synchronization between the flow tearing down and the job transitioning to a
 // failed/canceled state.
-func (s *streamIngestionResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
+func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
+	// Cancel the producer job on a best-effort basis. The source job's protected timestamp is no
+	// longer needed as this ingestion job is in 'reverting' status and we won't resume
+	// ingestion anymore.
+	details := s.job.Details().(jobspb.StreamIngestionDetails)
+	s.cancelProducerJob(ctx, details)
 	return nil
 }
 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
index 5298544e1638..fcc01d920e1d 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -94,18 +94,32 @@ type tenantStreamingClusters struct {
 	destCluster   *testcluster.TestCluster
 	destSysServer serverutils.TestServerInterface
 	destSysSQL    *sqlutils.SQLRunner
+	destTenantSQL *sqlutils.SQLRunner
 }
 
+// Creates a dest tenant SQL runner and returns a cleanup function that shuts
+// down the tenant SQL instance and closes all sessions.
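+// The returned cleanup should run before the ingestion job is canceled or the
+// tenant is destroyed, so that no tenant SQL session is still writing to the
+// sqlliveness table once the tenant keyspace is cleared (see
+// TestTenantStreamingCancelIngestion for an example).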
// This function will fail the test if ran prior to the Replication stream // closing as the tenant will not yet be active -func (c *tenantStreamingClusters) getDestTenantSQL() *sqlutils.SQLRunner { - _, destTenantConn := serverutils.StartTenant(c.t, c.destSysServer, base.TestTenantArgs{TenantID: c.args.destTenantID, DisableCreateTenant: true, SkipTenantCheck: true}) - return sqlutils.MakeSQLRunner(destTenantConn) +func (c *tenantStreamingClusters) createDestTenantSQL(ctx context.Context) func() error { + testTenant, destTenantConn := serverutils.StartTenant(c.t, c.destSysServer, + base.TestTenantArgs{TenantID: c.args.destTenantID, DisableCreateTenant: true, SkipTenantCheck: true}) + c.destTenantSQL = sqlutils.MakeSQLRunner(destTenantConn) + return func() error { + if err := destTenantConn.Close(); err != nil { + return err + } + testTenant.Stopper().Stop(ctx) + return nil + } } +// This function has to be called after createTenantSQL. func (c *tenantStreamingClusters) compareResult(query string) { + require.NotNil(c.t, c.destTenantSQL, + "destination tenant SQL runner should be created first") sourceData := c.srcTenantSQL.QueryStr(c.t, query) - destData := c.getDestTenantSQL().QueryStr(c.t, query) + destData := c.destTenantSQL.QueryStr(c.t, query) require.Equal(c.t, sourceData, destData) } @@ -342,6 +356,11 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { c.cutover(producerJobID, ingestionJobID, cutoverTime) jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + cleanupTenant := c.createDestTenantSQL(ctx) + defer func() { + require.NoError(t, cleanupTenant()) + }() + c.compareResult("SELECT * FROM d.t1") c.compareResult("SELECT * FROM d.t2") c.compareResult("SELECT * FROM d.x") @@ -351,7 +370,7 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { }) // Check the dst cluster didn't receive the change after a while. <-time.NewTimer(3 * time.Second).C - require.Equal(t, [][]string{{"2"}}, c.getDestTenantSQL().QueryStr(t, "SELECT * FROM d.t2")) + require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2")) } func TestTenantStreamingProducerJobTimedOut(t *testing.T) { @@ -364,7 +383,6 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { c, cleanup := createTenantStreamingClusters(ctx, t, args) defer cleanup() - // initial scan producerJobID, ingestionJobID := c.startStreamReplication() jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) @@ -373,6 +391,10 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { srcTime := c.srcCluster.Server(0).Clock().Now() c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + cleanupTenant := c.createDestTenantSQL(ctx) + defer func() { + require.NoError(t, cleanupTenant()) + }() c.compareResult("SELECT * FROM d.t1") c.compareResult("SELECT * FROM d.t2") @@ -399,7 +421,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { // Check the dst cluster didn't receive the change after a while. <-time.NewTimer(3 * time.Second).C require.Equal(t, [][]string{{"0"}}, - c.getDestTenantSQL().QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3")) + c.destTenantSQL.QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3")) // After resumed, the ingestion job paused on failure again. 
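+	// Resuming cannot make progress here: the producer job has already failed
+	// with a timeout error, so the resumed ingestion job's producer-liveness
+	// check (waitUntilProducerActive) sees a non-active stream and the job,
+	// which pauses on error in this test, ends up paused again.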
c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID)) @@ -423,6 +445,11 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { srcTime := c.srcCluster.Server(0).Clock().Now() c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + cleanupTenant := c.createDestTenantSQL(ctx) + defer func() { + require.NoError(t, cleanupTenant()) + }() + c.compareResult("SELECT * FROM d.t1") c.compareResult("SELECT * FROM d.t2") @@ -486,6 +513,12 @@ func TestTenantStreamingPauseOnError(t *testing.T) { // Check dest has caught up the previous updates. srcTime := c.srcCluster.Server(0).Clock().Now() c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + + cleanupTenant := c.createDestTenantSQL(ctx) + defer func() { + require.NoError(t, cleanupTenant()) + }() + c.compareResult("SELECT * FROM d.t1") c.compareResult("SELECT * FROM d.t2") @@ -562,6 +595,11 @@ func TestTenantStreamingCheckpoint(t *testing.T) { return nil }) + cleanupTenant := c.createDestTenantSQL(ctx) + defer func() { + require.NoError(t, cleanupTenant()) + }() + c.destSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID) jobutils.WaitForJobToPause(t, c.destSysSQL, jobspb.JobID(ingestionJobID)) // Clear out the map to ignore the initial client starts @@ -590,7 +628,83 @@ func TestTenantStreamingCheckpoint(t *testing.T) { }) // Check the dst cluster didn't receive the change after a while. <-time.NewTimer(3 * time.Second).C - require.Equal(t, [][]string{{"2"}}, c.getDestTenantSQL().QueryStr(t, "SELECT * FROM d.t2")) + require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2")) +} + +func TestTenantStreamingCancelIngestion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + args := defaultTenantStreamingClustersArgs + + testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) { + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + producerJobID, ingestionJobID := c.startStreamReplication() + + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + + srcTime := c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + + cleanUpTenant := c.createDestTenantSQL(ctx) + c.compareResult("SELECT * FROM d.t1") + c.compareResult("SELECT * FROM d.t2") + + if cancelAfterPaused { + c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID)) + jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + } + + // Close all tenant SQL sessions before we cancel the job so that + // we don't have any process still writing to 'sqlliveness' table after + // the tenant key range is cleared. + require.NoError(t, cleanUpTenant()) + + c.destSysSQL.Exec(t, fmt.Sprintf("CANCEL JOB %d", ingestionJobID)) + jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + + // Check if the producer job has released protected timestamp. + stats := streamIngestionStats(t, c.destSysSQL, ingestionJobID) + require.NotNil(t, stats.ProducerStatus) + require.Nil(t, stats.ProducerStatus.ProtectedTimestamp) + + // Check if dest tenant key ranges are not cleaned up. + destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID) + + rows, err := c.destCluster.Server(0).DB(). 
+			Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
+		require.NoError(t, err)
+		require.NotEmpty(t, rows)
+
+		// Check if the tenant record still exists.
+		c.destSysSQL.CheckQueryResults(t,
+			fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
+			[][]string{{"1"}})
+
+		// Check if we can successfully GC the tenant.
+		c.destSysSQL.Exec(t, "SELECT crdb_internal.destroy_tenant($1, true)",
+			args.destTenantID.ToUint64())
+		rows, err = c.destCluster.Server(0).DB().
+			Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
+		require.NoError(t, err)
+		require.Empty(t, rows)
+
+		c.destSysSQL.CheckQueryResults(t,
+			fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
+			[][]string{{"0"}})
+	}
+
+	t.Run("cancel-ingestion-after-paused", func(t *testing.T) {
+		testCancelIngestion(t, true)
+	})
+
+	t.Run("cancel-ingestion-while-running", func(t *testing.T) {
+		testCancelIngestion(t, false)
+	})
 }
 
 func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
@@ -641,9 +755,14 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	defer alternateSrcTenantConn.Close()
 	alternateSrcTenantSQL := sqlutils.MakeSQLRunner(alternateSrcTenantConn)
 
+	cleanUpTenant := c.createDestTenantSQL(ctx)
+	defer func() {
+		require.NoError(t, cleanUpTenant())
+	}()
+
 	alternateCompareResult := func(query string) {
 		sourceData := alternateSrcTenantSQL.QueryStr(c.t, query)
-		destData := c.getDestTenantSQL().QueryStr(c.t, query)
+		destData := c.destTenantSQL.QueryStr(c.t, query)
 		require.Equal(c.t, sourceData, destData)
 	}
 
diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go
index 4add2dfa7a4e..10bc68f51a83 100644
--- a/pkg/ccl/streamingccl/streamproducer/producer_job.go
+++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go
@@ -81,14 +81,9 @@ func (p *producerJobResumer) releaseProtectedTimestamp(
 func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) error {
 	jobExec := execCtx.(sql.JobExecContext)
 	execCfg := jobExec.ExecCfg()
-	isTimedOut := func(job *jobs.Job) bool {
-		progress := job.Progress()
-		return progress.GetStreamReplication().Expiration.Before(p.timeSource.Now())
-	}
-	if isTimedOut(p.job) {
-		return errors.Errorf("replication stream %d timed out", p.job.ID())
-	}
-	p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV()))
+
+	// Fire the timer immediately to start an initial progress check.
+	p.timer.Reset(0)
 	for {
 		select {
 		case <-ctx.Done():
@@ -106,12 +101,14 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
 		case jobspb.StreamReplicationProgress_FINISHED_SUCCESSFULLY:
 			return p.releaseProtectedTimestamp(ctx, execCfg)
 		case jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY:
 			return j.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
 				ju.UpdateStatus(jobs.StatusCancelRequested)
 				return nil
 			})
 		case jobspb.StreamReplicationProgress_NOT_FINISHED:
-			if isTimedOut(j) {
+			// Check if the job timed out.
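+			// The expiration is pushed forward by consumer heartbeats, so if the
+			// ingestion side stops heartbeating (for example it was paused or
+			// canceled without calling Complete), the producer job eventually
+			// fails here instead of holding its protected timestamp forever.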
+ if prog.GetStreamReplication().Expiration.Before(p.timeSource.Now()) { return errors.Errorf("replication stream %d timed out", p.job.ID()) } default: diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go index a30904dc84a1..ca442ff600f5 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go @@ -121,8 +121,8 @@ func TestStreamReplicationProducerJob(t *testing.T) { ptp := source.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider timeout, usr := 1*time.Second, username.MakeSQLUsernameFromPreNormalizedString("user") - registerConstructor := func(initialTime time.Time) (*timeutil.ManualTime, func(), func(), func()) { - mt := timeutil.NewManualTime(initialTime) + registerConstructor := func() (*timeutil.ManualTime, func(), func(), func()) { + mt := timeutil.NewManualTime(timeutil.Now()) waitJobFinishReverting := make(chan struct{}) in, out := make(chan struct{}, 1), make(chan struct{}, 1) jobs.RegisterConstructor(jobspb.TypeStreamReplication, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { @@ -181,10 +181,15 @@ func TestStreamReplicationProducerJob(t *testing.T) { ptsID := uuid.MakeV4() jr := makeProducerJobRecord(registry, 10, timeout, usr, ptsID) defer jobs.ResetConstructors()() - _, _, _, waitJobFinishReverting := registerConstructor(expirationTime(jr).Add(1 * time.Millisecond)) + mt, timeGiven, waitForTimeRequest, waitJobFinishReverting := registerConstructor() require.NoError(t, runJobWithProtectedTimestamp(ptsID, ts, jr)) + // Coordinate the job to get a new time. + waitForTimeRequest() + mt.AdvanceTo(expirationTime(jr).Add(1 * time.Millisecond)) + timeGiven() + waitJobFinishReverting() sql.CheckQueryResultsRetry(t, jobsQuery(jr.JobID), [][]string{{"failed"}}) @@ -210,10 +215,14 @@ func TestStreamReplicationProducerJob(t *testing.T) { jr := makeProducerJobRecord(registry, 20, timeout, usr, ptsID) defer jobs.ResetConstructors()() mt, timeGiven, waitForTimeRequest, waitJobFinishReverting := - registerConstructor(expirationTime(jr).Add(-5 * time.Millisecond)) - + registerConstructor() require.NoError(t, runJobWithProtectedTimestamp(ptsID, ts, jr)) + + // Coordinate the job to get a new time. waitForTimeRequest() + mt.AdvanceTo(expirationTime(jr).Add(-5 * time.Millisecond)) + timeGiven() + sql.CheckQueryResults(t, jobsQuery(jr.JobID), [][]string{{"running"}}) updatedFrontier := hlc.Timestamp{ @@ -238,7 +247,8 @@ func TestStreamReplicationProducerJob(t *testing.T) { // Ensure the timestamp is updated on the PTS record require.Equal(t, updatedFrontier, r.Timestamp) - // Reset the time to be after the timeout + // Coordinate the job to get a new time. Reset the time to be after the timeout. 
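+		// waitForTimeRequest blocks until the resumer's timer asks the manual
+		// clock for the current time, and timeGiven releases that request, so
+		// the AdvanceTo below should not race with the timer firing (see
+		// registerConstructor above for the channel plumbing).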
+		waitForTimeRequest()
 		mt.AdvanceTo(expire.Add(12 * time.Millisecond))
 		timeGiven()
 		waitJobFinishReverting()
diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
index 536ff6a417eb..05d18da17581 100644
--- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
+++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
@@ -10,6 +10,7 @@ package streamproducer
 
 import (
 	"context"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
@@ -263,6 +264,7 @@ func completeReplicationStream(
 			md.Progress.RunningStatus = "succeeding this producer job as the corresponding " +
 				"stream ingestion finished successfully"
 		} else {
 			md.Progress.GetStreamReplication().StreamIngestionStatus =
 				jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY
 			md.Progress.RunningStatus = "canceling this producer job as the corresponding " +