Skip to content

Commit

Permalink
Merge #83310
Browse files Browse the repository at this point in the history
83310: streamingccl: support canceling an ingestion job r=gh-casper a=gh-casper

    This PR supports the intended behavior when canceling
    an ingestion job: canceling the producer job which releases the
    protected timestamp for the source tenant. The
    destination tenant key ranges are left for
    the user to clean up with crdb_internal.destroy_tenant.

    This PR also expands crdb_internal.complete_replication_stream
    to take a second argument, 'successfulIngestion', so that a
    replication stream can be completed even when cutover doesn't
    happen, e.g., when the ingestion gets canceled and a revert
    happens; in that case the producer job is canceled.

    Release note (sql change): expand crdb_internal.complete_replication_stream
    to take a successfulIngestion argument, which indicates whether the
    stream ingestion finished successfully.

Closes: #57409

Co-authored-by: Casper <[email protected]>
  • Loading branch information
craig[bot] and gh-casper committed Jul 31, 2022
2 parents 0352fbe + a71ddbc commit 5fe0527
Show file tree
Hide file tree
Showing 18 changed files with 393 additions and 89 deletions.
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2615,7 +2615,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th><th>Volatility</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.complete_replication_stream"></a><code>crdb_internal.complete_replication_stream(stream_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to complete and clean up a replication stream after the consumer receives a cutover event and finishes the ingestion</p>
<tr><td><a name="crdb_internal.complete_replication_stream"></a><code>crdb_internal.complete_replication_stream(stream_id: <a href="int.html">int</a>, successful_ingestion: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to complete and clean up a replication stream. ‘successful_ingestion’ indicates whether the stream ingestion finished successfully.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>, cutover_ts: <a href="timestamp.html">timestamptz</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the specified timestamp and leave the cluster in a consistent state. The specified timestamp can only be specified up to the microsecond. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
</span></td><td>Volatile</td></tr>
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Client interface {
Close(ctx context.Context) error

// Complete completes a replication stream consumption.
Complete(ctx context.Context, streamID streaming.StreamID) error
Complete(ctx context.Context, streamID streaming.StreamID, successfulIngestion bool) error
}

// Topology is a configuration of stream partitions. These are particular to a
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ func (sc testStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (sc testStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
// No-op for the test client: every argument is ignored and nil is always
// returned.
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,16 @@ func (p *partitionedStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (p *partitionedStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (p *partitionedStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.complete_replication_stream($1)`, streamID)
row := p.mu.srcConn.QueryRow(ctx,
`SELECT crdb_internal.complete_replication_stream($1, $2)`, streamID, successfulIngestion)
if err := row.Scan(&streamID); err != nil {
return errors.Wrapf(err, "error completing replication stream %d", streamID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ INSERT INTO d.t2 VALUES (2);
require.True(t, errors.Is(err, context.Canceled) || isQueryCanceledError(err))

// Testing client.Complete()
err = client.Complete(ctx, streaming.StreamID(999))
err = client.Complete(ctx, streaming.StreamID(999), true)
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)

// Makes producer job exit quickly.
Expand All @@ -212,7 +212,7 @@ SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'
`)
streamID, err = client.Create(ctx, tenant.ID)
require.NoError(t, err)
require.NoError(t, client.Complete(ctx, streamID))
require.NoError(t, client.Complete(ctx, streamID, true))
h.SysSQL.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}})
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@ func (m *randomStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (m *randomStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (m *randomStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
// No-op for the random stream client: every argument is ignored and nil is
// always returned.
return nil
}

Expand Down
97 changes: 71 additions & 26 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,42 @@ func connectToActiveClient(
return client, errors.Wrapf(err, "ingestion job %d failed to connect to stream address or existing topology for planning", ingestionJob.ID())
}

// waitUntilProducerActive pings the producer job with heartbeats until it
// reports a status other than UNKNOWN_STREAM_STATUS_RETRY or the retry budget
// is exhausted. It returns nil only when the producer job is active. It
// returns an error when a heartbeat fails, when ctx is canceled while waiting,
// or when the producer job ends up in any non-active status.
func waitUntilProducerActive(
	ctx context.Context,
	client streamclient.Client,
	streamID streaming.StreamID,
	heartbeatTimestamp hlc.Timestamp,
	ingestionJobID jobspb.JobID,
) error {
	ro := retry.Options{
		InitialBackoff: 1 * time.Second,
		Multiplier:     2,
		MaxBackoff:     5 * time.Second,
		MaxRetries:     4,
	}
	// Make sure the producer job is active before starting the stream
	// replication. The retry loop is bound to ctx so that a canceled ingestion
	// job does not sit through the remaining backoff before noticing.
	var status streampb.StreamReplicationStatus
	for r := retry.StartWithCtx(ctx, ro); r.Next(); {
		var err error
		status, err = client.Heartbeat(ctx, streamID, heartbeatTimestamp)
		if err != nil {
			return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
				ingestionJobID)
		}
		if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
			break
		}
		log.Warningf(ctx, "producer job %d has status %s, retrying", streamID, status.StreamStatus)
	}
	// Surface cancellation as such instead of misreporting it as the producer
	// job being inactive.
	if err := ctx.Err(); err != nil {
		return errors.Wrapf(err, "failed to resume ingestion job %d while waiting for producer job %d",
			ingestionJobID, streamID)
	}
	if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
		return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+
			"and in status %s", ingestionJobID, status.StreamStatus)
	}
	return nil
}

func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error {
details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
progress := ingestionJob.Progress()
Expand All @@ -154,29 +190,8 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
return err
}
ingestWithClient := func() error {
ro := retry.Options{
InitialBackoff: 1 * time.Second,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
}
// Make sure the producer job is active before start the stream replication.
var status streampb.StreamReplicationStatus
for r := retry.Start(ro); r.Next(); {
status, err = client.Heartbeat(ctx, streamID, startTime)
if err != nil {
return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
ingestionJob.ID())
}
if status.StreamStatus != streampb.StreamReplicationStatus_UNKNOWN_STREAM_STATUS_RETRY {
break
}
log.Warningf(ctx,
"producer job %d has status %s, retrying", streamID, status.StreamStatus)
}
if status.StreamStatus != streampb.StreamReplicationStatus_STREAM_ACTIVE {
return errors.Errorf("failed to resume ingestion job %d as the producer job is not active "+
"and in status %s", ingestionJob.ID(), status.StreamStatus)
if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil {
return err
}

log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID)
Expand Down Expand Up @@ -246,8 +261,11 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
}

log.Infof(ctx, "starting to complete the producer job %d", streamID)
// Completes the producer job in the source cluster.
return client.Complete(ctx, streamID)
// Completes the producer job in the source cluster on best effort.
if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
}
return nil
}
return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
}
Expand Down Expand Up @@ -349,6 +367,28 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
})
}

// cancelProducerJob makes a best-effort attempt to cancel the producer job
// identified by details.StreamID. It opens a stream client to
// details.StreamAddress and completes the replication stream with
// successfulIngestion set to false. Every failure is logged as a warning and
// otherwise ignored, so the caller is never blocked by an unreachable source.
func (s *streamIngestionResumer) cancelProducerJob(
	ctx context.Context, details jobspb.StreamIngestionDetails,
) {
	streamID := streaming.StreamID(details.StreamID)
	addr := streamingccl.StreamAddress(details.StreamAddress)
	client, err := streamclient.NewStreamClient(ctx, addr)
	if err != nil {
		log.Warningf(ctx, "encountered error when creating the stream client: %v", err)
		return
	}
	// Always release the client, whatever happens to the Complete call below.
	defer func() {
		if closeErr := client.Close(ctx); closeErr != nil {
			log.Warningf(ctx, "encountered error when closing the stream client: %v", closeErr)
		}
	}()

	log.Infof(ctx, "canceling the producer job %d as stream ingestion job %d is being canceled",
		streamID, s.job.ID())
	if err := client.Complete(ctx, streamID, false /* successfulIngestion */); err != nil {
		log.Warningf(ctx, "encountered error when canceling the producer job: %v", err)
	}
}

// OnFailOrCancel is part of the jobs.Resumer interface.
// There is a known race between the ingestion processors shutting down, and
// OnFailOrCancel being invoked. As a result of which we might see some keys
Expand All @@ -358,7 +398,12 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
// TODO(adityamaru): Add ClearRange logic once we have introduced
// synchronization between the flow tearing down and the job transitioning to a
// failed/canceled state.
func (s *streamIngestionResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
	// By this point the ingestion job is in 'reverting' status and will never
	// resume ingesting, so the source job's protected timestamp is no longer
	// needed. Cancel the producer job on a best-effort basis; failures are
	// handled inside cancelProducerJob and never fail this hook.
	s.cancelProducerJob(ctx, s.job.Details().(jobspb.StreamIngestionDetails))
	return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ func (m *mockStreamClient) Close(ctx context.Context) error {
}

// Complete implements the streamclient.Client interface.
func (m *mockStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (m *mockStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
// No-op for the mock client: every argument is ignored and nil is always
// returned.
return nil
}

Expand All @@ -163,7 +165,9 @@ func (m *errorStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (m *errorStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (m *errorStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
// Despite the client's name, Complete does not return an error: every
// argument is ignored and nil is always returned.
return nil
}

Expand Down
Loading

0 comments on commit 5fe0527

Please sign in to comment.