streamingccl: support canceling ingestion job
This commit supports the intended behavior when canceling an ingestion
job: the producer job is canceled, which releases the protected
timestamp for the source tenant. The destination tenant key ranges are
left for the user to clean up with crdb_internal.destroy_tenant.

Release note: None
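
A minimal sketch of the operator-side flow this enables, with placeholder job
and tenant IDs: canceling the ingestion job on the destination cluster also
cancels the producer job on the source cluster and releases its protected
timestamp; the destination tenant data is then reclaimed manually.

-- IDs below are placeholders; run on the destination cluster.
CANCEL JOB 12345;                              -- also cancels the source cluster's producer job
SELECT crdb_internal.destroy_tenant(2, true);  -- clean up the destination tenant key ranges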
gh-casper committed Jul 31, 2022
1 parent 1b4a2c4 commit a71ddbc
Showing 6 changed files with 224 additions and 51 deletions.
@@ -223,7 +223,7 @@ func (p *partitionedStreamClient) Complete(
p.mu.Lock()
defer p.mu.Unlock()
row := p.mu.srcConn.QueryRow(ctx,
`SELECT crdb_internal.complete_replication_stream($1)`, streamID)
`SELECT crdb_internal.complete_replication_stream($1, $2)`, streamID, successfulIngestion)
if err := row.Scan(&streamID); err != nil {
return errors.Wrapf(err, "error completing replication stream %d", streamID)
}
97 changes: 71 additions & 26 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -135,6 +135,42 @@ func connectToActiveClient(
return client, errors.Wrapf(err, "ingestion job %d failed to connect to stream address or existing topology for planning", ingestionJob.ID())
}

// waitUntilProducerActive pings the producer job and waits until it is
// active/running; it returns nil when the job is active.
func waitUntilProducerActive(
ctx context.Context,
client streamclient.Client,
streamID streaming.StreamID,
heartbeatTimestamp hlc.Timestamp,
ingestionJobID jobspb.JobID,
) error {
ro := retry.Options{
InitialBackoff: 1 * time.Second,
Multiplier: 2,
MaxBackoff: 5 * time.Second,
MaxRetries: 4,
}
// Make sure the producer job is active before starting the stream replication.
var status streampb.StreamReplicationStatus
var err error
for r := retry.Start(ro); r.Next(); {
status, err = client.Heartbeat(ctx, streamID, heartbeatTimestamp)
if err != nil {
return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
ingestionJobID)
}
if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
break
}
log.Warningf(ctx, "producer job %d has status %s, retrying", streamID, status.StreamStatus)
}
if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+
"and in status %s", ingestionJobID, status.StreamStatus)
}
return nil
}

func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error {
details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
progress := ingestionJob.Progress()
@@ -154,29 +190,8 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
return err
}
ingestWithClient := func() error {
ro := retry.Options{
InitialBackoff: 1 * time.Second,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
}
// Make sure the producer job is active before start the stream replication.
var status streampb.StreamReplicationStatus
for r := retry.Start(ro); r.Next(); {
status, err = client.Heartbeat(ctx, streamID, startTime)
if err != nil {
return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
ingestionJob.ID())
}
if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
break
}
log.Warningf(ctx,
"producer job %d has status %s, retrying", streamID, status.StreamStatus)
}
if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+
"and in status %s", ingestionJob.ID(), status.StreamStatus)
if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil {
return err
}

log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID)
@@ -246,8 +261,11 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
}

log.Infof(ctx, "starting to complete the producer job %d", streamID)
// Completes the producer job in the source cluster.
return client.Complete(ctx, streamID, true /* successfulIngestion */)
// Completes the producer job in the source cluster on a best-effort basis.
if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
}
return nil
}
return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
}
@@ -349,6 +367,28 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
})
}

func (s *streamIngestionResumer) cancelProducerJob(
ctx context.Context, details jobspb.StreamIngestionDetails,
) {
streamID := streaming.StreamID(details.StreamID)
addr := streamingccl.StreamAddress(details.StreamAddress)
client, err := streamclient.NewStreamClient(ctx, addr)
if err != nil {
log.Warningf(ctx, "encountered error when creating the stream client: %v", err)
return
}
log.Infof(ctx, "canceling the producer job %d as stream ingestion job %d is being canceled",
streamID, s.job.ID())
if err = client.Complete(ctx, streamID, false /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when canceling the producer job: %v", err)
}
if err = client.Close(ctx); err != nil {
log.Warningf(ctx, "encountered error when closing the stream client: %v", err)
}
}

// OnFailOrCancel is part of the jobs.Resumer interface.
// There is a known race between the ingestion processors shutting down, and
// OnFailOrCancel being invoked. As a result of which we might see some keys
@@ -358,7 +398,12 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
// TODO(adityamaru): Add ClearRange logic once we have introduced
// synchronization between the flow tearing down and the job transitioning to a
// failed/canceled state.
func (s *streamIngestionResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
// Cancel the producer job on a best-effort basis. The source job's protected
// timestamp is no longer needed, as this ingestion job is in 'reverting'
// status and we won't resume ingestion anymore.
details := s.job.Details().(jobspb.StreamIngestionDetails)
s.cancelProducerJob(ctx, details)
return nil
}

137 changes: 128 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -94,18 +94,32 @@ type tenantStreamingClusters struct {
destCluster *testcluster.TestCluster
destSysServer serverutils.TestServerInterface
destSysSQL *sqlutils.SQLRunner
destTenantSQL *sqlutils.SQLRunner
}

// createDestTenantSQL creates a dest tenant SQL runner and returns a cleanup
// function that shuts down the tenant SQL instance and closes all sessions.
// This function will fail the test if run before the replication stream
// closes, as the tenant will not yet be active.
func (c *tenantStreamingClusters) getDestTenantSQL() *sqlutils.SQLRunner {
_, destTenantConn := serverutils.StartTenant(c.t, c.destSysServer, base.TestTenantArgs{TenantID: c.args.destTenantID, DisableCreateTenant: true, SkipTenantCheck: true})
return sqlutils.MakeSQLRunner(destTenantConn)
func (c *tenantStreamingClusters) createDestTenantSQL(ctx context.Context) func() error {
testTenant, destTenantConn := serverutils.StartTenant(c.t, c.destSysServer,
base.TestTenantArgs{TenantID: c.args.destTenantID, DisableCreateTenant: true, SkipTenantCheck: true})
c.destTenantSQL = sqlutils.MakeSQLRunner(destTenantConn)
return func() error {
if err := destTenantConn.Close(); err != nil {
return err
}
testTenant.Stopper().Stop(ctx)
return nil
}
}

// This function has to be called after createDestTenantSQL.
func (c *tenantStreamingClusters) compareResult(query string) {
require.NotNil(c.t, c.destTenantSQL,
"destination tenant SQL runner should be created first")
sourceData := c.srcTenantSQL.QueryStr(c.t, query)
destData := c.getDestTenantSQL().QueryStr(c.t, query)
destData := c.destTenantSQL.QueryStr(c.t, query)
require.Equal(c.t, sourceData, destData)
}

@@ -342,6 +356,11 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
c.cutover(producerJobID, ingestionJobID, cutoverTime)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
require.NoError(t, cleanupTenant())
}()

c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")
c.compareResult("SELECT * FROM d.x")
@@ -351,7 +370,7 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
})
// Check the dst cluster didn't receive the change after a while.
<-time.NewTimer(3 * time.Second).C
require.Equal(t, [][]string{{"2"}}, c.getDestTenantSQL().QueryStr(t, "SELECT * FROM d.t2"))
require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2"))
}

func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
@@ -364,7 +383,6 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

// initial scan
producerJobID, ingestionJobID := c.startStreamReplication()

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
@@ -373,6 +391,10 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
srcTime := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))

cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
require.NoError(t, cleanupTenant())
}()
c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")

@@ -399,7 +421,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
// Check the dst cluster didn't receive the change after a while.
<-time.NewTimer(3 * time.Second).C
require.Equal(t, [][]string{{"0"}},
c.getDestTenantSQL().QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3"))
c.destTenantSQL.QueryStr(t, "SELECT count(*) FROM d.t2 WHERE i = 3"))

// After resumed, the ingestion job paused on failure again.
c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID))
@@ -423,6 +445,11 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) {
srcTime := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))

cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
require.NoError(t, cleanupTenant())
}()

c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")

@@ -486,6 +513,12 @@ func TestTenantStreamingPauseOnError(t *testing.T) {
// Check dest has caught up the previous updates.
srcTime := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))

cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
require.NoError(t, cleanupTenant())
}()

c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")

@@ -562,6 +595,11 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
return nil
})

cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
require.NoError(t, cleanupTenant())
}()

c.destSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
jobutils.WaitForJobToPause(t, c.destSysSQL, jobspb.JobID(ingestionJobID))
// Clear out the map to ignore the initial client starts
@@ -590,7 +628,83 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
})
// Check the dst cluster didn't receive the change after a while.
<-time.NewTimer(3 * time.Second).C
require.Equal(t, [][]string{{"2"}}, c.getDestTenantSQL().QueryStr(t, "SELECT * FROM d.t2"))
require.Equal(t, [][]string{{"2"}}, c.destTenantSQL.QueryStr(t, "SELECT * FROM d.t2"))
}

func TestTenantStreamingCancelIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs

testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

srcTime := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID))

cleanUpTenant := c.createDestTenantSQL(ctx)
c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")

if cancelAfterPaused {
c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID))
jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
}

// Close all tenant SQL sessions before we cancel the job so that
// we don't have any process still writing to the 'sqlliveness' table after
// the tenant key range is cleared.
require.NoError(t, cleanUpTenant())

c.destSysSQL.Exec(t, fmt.Sprintf("CANCEL JOB %d", ingestionJobID))
jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))

// Check that the producer job has released the protected timestamp.
stats := streamIngestionStats(t, c.destSysSQL, ingestionJobID)
require.NotNil(t, stats.ProducerStatus)
require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)

// Check that the dest tenant key ranges are not cleaned up.
destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID)

rows, err := c.destCluster.Server(0).DB().
Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
require.NoError(t, err)
require.NotEmpty(t, rows)

// Check that the tenant record still exists.
c.destSysSQL.CheckQueryResults(t,
fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
[][]string{{"1"}})

// Check if we can successfully GC the tenant.
c.destSysSQL.Exec(t, "SELECT crdb_internal.destroy_tenant($1, true)",
args.destTenantID.ToUint64())
rows, err = c.destCluster.Server(0).DB().
Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
require.NoError(t, err)
require.Empty(t, rows)

c.destSysSQL.CheckQueryResults(t,
fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
[][]string{{"0"}})
}

t.Run("cancel-ingestion-after-paused", func(t *testing.T) {
testCancelIngestion(t, true)
})

t.Run("cancel-ingestion-while-running", func(t *testing.T) {
testCancelIngestion(t, false)
})
}

func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
Expand Down Expand Up @@ -641,9 +755,14 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer alternateSrcTenantConn.Close()
alternateSrcTenantSQL := sqlutils.MakeSQLRunner(alternateSrcTenantConn)

cleanUpTenant := c.createDestTenantSQL(ctx)
defer func() {
require.NoError(t, cleanUpTenant())
}()

alternateCompareResult := func(query string) {
sourceData := alternateSrcTenantSQL.QueryStr(c.t, query)
destData := c.getDestTenantSQL().QueryStr(c.t, query)
destData := c.destTenantSQL.QueryStr(c.t, query)
require.Equal(c.t, sourceData, destData)
}

15 changes: 6 additions & 9 deletions pkg/ccl/streamingccl/streamproducer/producer_job.go
@@ -81,14 +81,9 @@ func (p *producerJobResumer) releaseProtectedTimestamp(
func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) error {
jobExec := execCtx.(sql.JobExecContext)
execCfg := jobExec.ExecCfg()
isTimedOut := func(job *jobs.Job) bool {
progress := job.Progress()
return progress.GetStreamReplication().Expiration.Before(p.timeSource.Now())
}
if isTimedOut(p.job) {
return errors.Errorf("replication stream %d timed out", p.job.ID())
}
p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV()))

// Fire the timer immediately to start an initial progress check.
p.timer.Reset(0)
for {
select {
case <-ctx.Done():
@@ -106,12 +101,14 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
case jobspb.StreamReplicationProgress_FINISHED_SUCCESSFULLY:
return p.releaseProtectedTimestamp(ctx, execCfg)
case jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY:
fmt.Println("producer try update cancel requested")
return j.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
ju.UpdateStatus(jobs.StatusCancelRequested)
return nil
})
case jobspb.StreamReplicationProgress_NOT_FINISHED:
if isTimedOut(j) {
// Check if the job timed out.
if prog.GetStreamReplication().Expiration.Before(p.timeSource.Now()) {
return errors.Errorf("replication stream %d timed out", p.job.ID())
}
default: