Skip to content

Commit

Permalink
streamingccl: expand crdb_internal.complete_stream_replication
Browse files Browse the repository at this point in the history
to support canceling a producer job

Previously we can only make a producer job normally finish
through complete_stream_replication after ingestion is cutover.
Now we expand complete_stream_replication to take 2nd argument
'successfulIngestion' to support complete replication stream
even when cutover doesn't happen, e.g., ingestion gets canceled
and a revert happens, this cancels the producer job.

Release note (sql change): expand crdb_internal.complete_stream_replication
to take successfulIngestion argument, which indicates if this
stream ingestion finished successfully.
  • Loading branch information
gh-casper committed Jul 30, 2022
1 parent 0352fbe commit 1b4a2c4
Show file tree
Hide file tree
Showing 16 changed files with 172 additions and 41 deletions.
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2615,7 +2615,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th><th>Volatility</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.complete_replication_stream"></a><code>crdb_internal.complete_replication_stream(stream_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to complete and clean up a replication stream after the consumer receives a cutover event and finishes the ingestion</p>
<tr><td><a name="crdb_internal.complete_replication_stream"></a><code>crdb_internal.complete_replication_stream(stream_id: <a href="int.html">int</a>, successful_ingestion: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to complete and clean up a replication stream.‘successful_ingestion’ indicates whether the stream ingestion finished successfully.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>, cutover_ts: <a href="timestamp.html">timestamptz</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the specified timestamp and leave the cluster in a consistent state. The specified timestamp can only be specified up to the microsecond. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
</span></td><td>Volatile</td></tr>
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Client interface {
Close(ctx context.Context) error

// Complete completes a replication stream consumption.
Complete(ctx context.Context, streamID streaming.StreamID) error
Complete(ctx context.Context, streamID streaming.StreamID, successfulIngestion bool) error
}

// Topology is a configuration of stream partitions. These are particular to a
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ func (sc testStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (sc testStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,16 @@ func (p *partitionedStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (p *partitionedStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (p *partitionedStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.complete_replication_stream($1)`, streamID)
row := p.mu.srcConn.QueryRow(ctx,
`SELECT crdb_internal.complete_replication_stream($1)`, streamID)
if err := row.Scan(&streamID); err != nil {
return errors.Wrapf(err, "error completing replication stream %d", streamID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ INSERT INTO d.t2 VALUES (2);
require.True(t, errors.Is(err, context.Canceled) || isQueryCanceledError(err))

// Testing client.Complete()
err = client.Complete(ctx, streaming.StreamID(999))
err = client.Complete(ctx, streaming.StreamID(999), true)
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)

// Makes producer job exit quickly.
Expand All @@ -212,7 +212,7 @@ SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'
`)
streamID, err = client.Create(ctx, tenant.ID)
require.NoError(t, err)
require.NoError(t, client.Complete(ctx, streamID))
require.NoError(t, client.Complete(ctx, streamID, true))
h.SysSQL.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}})
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@ func (m *randomStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (m *randomStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (m *randomStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.

log.Infof(ctx, "starting to complete the producer job %d", streamID)
// Completes the producer job in the source cluster.
return client.Complete(ctx, streamID)
return client.Complete(ctx, streamID, true /* successfulIngestion */)
}
return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ func (m *mockStreamClient) Close(ctx context.Context) error {
}

// Complete implements the streamclient.Client interface.
func (m *mockStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (m *mockStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
return nil
}

Expand All @@ -163,7 +165,9 @@ func (m *errorStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (m *errorStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
func (m *errorStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go_test(
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/security/securityassets",
Expand All @@ -86,6 +87,7 @@ go_test(
"//pkg/sql/sessiondatapb",
"//pkg/streaming",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
Expand Down
49 changes: 33 additions & 16 deletions pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ type producerJobResumer struct {
timer timeutil.TimerI
}

// Releases the protected timestamp record associated with the producer
// job if it exists.
func (p *producerJobResumer) releaseProtectedTimestamp(
ctx context.Context, executorConfig *sql.ExecutorConfig,
) error {
ptr := p.job.Details().(jobspb.StreamReplicationDetails).ProtectedTimestampRecordID
return executorConfig.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := executorConfig.ProtectedTimestampProvider.Release(ctx, txn, ptr)
// In case that a retry happens, the record might have been released.
if errors.Is(err, exec.ErrNotFound) {
return nil
}
return err
})
}

// Resume is part of the jobs.Resumer interface.
func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) error {
jobExec := execCtx.(sql.JobExecContext)
Expand All @@ -84,12 +100,22 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
if err != nil {
return err
}
// The job completes successfully if the ingestion has been cut over.
if p := j.Progress(); p.GetStreamReplication().IngestionCutOver {
return nil
}
if isTimedOut(j) {
return errors.Errorf("replication stream %d timed out", p.job.ID())

prog := j.Progress()
switch prog.GetStreamReplication().StreamIngestionStatus {
case jobspb.StreamReplicationProgress_FINISHED_SUCCESSFULLY:
return p.releaseProtectedTimestamp(ctx, execCfg)
case jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY:
return j.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
ju.UpdateStatus(jobs.StatusCancelRequested)
return nil
})
case jobspb.StreamReplicationProgress_NOT_FINISHED:
if isTimedOut(j) {
return errors.Errorf("replication stream %d timed out", p.job.ID())
}
default:
return errors.New("unrecognized stream ingestion status")
}
}
}
Expand All @@ -99,17 +125,8 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
func (p *producerJobResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
jobExec := execCtx.(sql.JobExecContext)
execCfg := jobExec.ExecCfg()

// Releases the protected timestamp record.
ptr := p.job.Details().(jobspb.StreamReplicationDetails).ProtectedTimestampRecordID
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := execCfg.ProtectedTimestampProvider.Release(ctx, txn, ptr)
// In case that a retry happens, the record might have been released.
if errors.Is(err, exec.ErrNotFound) {
return nil
}
return err
})
return p.releaseProtectedTimestamp(ctx, execCfg)
}

func init() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(

// CompleteReplicationStream implements ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) CompleteReplicationStream(
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, successfulIngestion bool,
) error {
return completeReplicationStream(evalCtx, txn, streamID)
return completeReplicationStream(evalCtx, txn, streamID, successfulIngestion)
}

func newReplicationStreamManagerWithPrivilegesCheck(
Expand Down
78 changes: 78 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -485,3 +490,76 @@ USE d;
}
}
}

func TestCompleteStreamReplication(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
h, cleanup := streamingtest.NewReplicationHelper(t,
base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
DisableDefaultTestTenant: true,
})
defer cleanup()
srcTenantID := serverutils.TestTenantID()
_, cleanupTenant := h.CreateTenant(t, srcTenantID)
defer cleanupTenant()

// Make the producer job times out fast and fastly tracks ingestion cutover signal.
h.SysSQL.ExecMultiple(t,
"SET CLUSTER SETTING stream_replication.job_liveness_timeout = '2s';",
"SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '2s';")

var timedOutStreamID int
row := h.SysSQL.QueryRow(t,
"SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64())
row.Scan(&timedOutStreamID)
jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(timedOutStreamID))

// Makes the producer job not easily time out.
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10m';")
testCompleteStreamReplication := func(t *testing.T, successfulIngestion bool) {
// Verify no error when completing a timed out replication stream.
h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)",
timedOutStreamID, successfulIngestion)

// Create a new replication stream and complete it.
var streamID int
row := h.SysSQL.QueryRow(t,
"SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64())
row.Scan(&streamID)
jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)",
streamID, successfulIngestion)

if successfulIngestion {
jobutils.WaitForJobToSucceed(t, h.SysSQL, jobspb.JobID(streamID))
} else {
jobutils.WaitForJobToCancel(t, h.SysSQL, jobspb.JobID(streamID))
}
// Verify protected timestamp record gets released.
jr := h.SysServer.JobRegistry().(*jobs.Registry)
pj, err := jr.LoadJob(ctx, jobspb.JobID(streamID))
require.NoError(t, err)
payload := pj.Payload()
require.ErrorIs(t, h.SysServer.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ptp := h.SysServer.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
_, err = ptp.GetRecord(ctx, txn, payload.GetStreamReplication().ProtectedTimestampRecordID)
return err
}), protectedts.ErrNotExists)
}

for _, tc := range []struct {
testName string
successfulIngestion bool
}{
{"complete-with-successful-ingestion", true},
{"complete-without-successful-ingestion", false},
} {
t.Run(tc.testName, func(t *testing.T) {
testCompleteStreamReplication(t, tc.successfulIngestion)
})
}
}
23 changes: 17 additions & 6 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,29 @@ func getReplicationStreamSpec(
}

func completeReplicationStream(
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID, successfulIngestion bool,
) error {
// Update the producer job that a cutover happens on the consumer side.
registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
const useReadLock = false
return registry.UpdateJobWithTxn(evalCtx.Ctx(), jobspb.JobID(streamID), txn, useReadLock,
func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
// Updates the streamingestion status, make the job resumer exit running
// when picking up the new status.
if (md.Status == jobs.StatusRunning || md.Status == jobs.StatusPending) &&
!md.Progress.GetStreamReplication().IngestionCutOver {
p := md.Progress
p.GetStreamReplication().IngestionCutOver = true
ju.UpdateProgress(p)
md.Progress.GetStreamReplication().StreamIngestionStatus ==
jobspb.StreamReplicationProgress_NOT_FINISHED {
if successfulIngestion {
md.Progress.GetStreamReplication().StreamIngestionStatus =
jobspb.StreamReplicationProgress_FINISHED_SUCCESSFULLY
md.Progress.RunningStatus = "succeeding this producer job as the corresponding " +
"stream ingestion finished successfully"
} else {
md.Progress.GetStreamReplication().StreamIngestionStatus =
jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY
md.Progress.RunningStatus = "canceling this producer job as the corresponding " +
"stream ingestion did not finish successfully"
}
ju.UpdateProgress(md.Progress)
}
return nil
})
Expand Down
11 changes: 9 additions & 2 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,15 @@ message StreamReplicationProgress {
// Expiration timestamp of consumer heartbeat
google.protobuf.Timestamp expiration = 1 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];

// If the ingestion side has been cut over.
bool ingestion_cut_over = 2;
enum StreamIngestionStatus {
NOT_FINISHED = 0;
FINISHED_SUCCESSFULLY = 1;
FINISHED_UNSUCCESSFULLY = 2;
}

// Status of the corresponding stream ingestion. The producer job tracks this
// to determine its fate.
StreamIngestionStatus stream_ingestion_status = 2;
}

message SchedulePTSChainingRecord {
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/sem/builtins/replication_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ var replicationBuiltins = map[string]builtinDefinition{
tree.Overload{
Types: tree.ArgTypes{
{"stream_id", types.Int},
{"successful_ingestion", types.Bool},
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
Expand All @@ -307,13 +308,15 @@ var replicationBuiltins = map[string]builtinDefinition{
}

streamID := int64(tree.MustBeDInt(args[0]))
if err := mgr.CompleteReplicationStream(evalCtx, evalCtx.Txn, streaming.StreamID(streamID)); err != nil {
successfulIngestion := bool(tree.MustBeDBool(args[1]))
if err := mgr.CompleteReplicationStream(evalCtx, evalCtx.Txn,
streaming.StreamID(streamID), successfulIngestion); err != nil {
return nil, err
}
return tree.NewDInt(tree.DInt(streamID)), err
},
Info: "This function can be used on the producer side to complete and clean up a replication stream " +
"after the consumer receives a cutover event and finishes the ingestion",
Info: "This function can be used on the producer side to complete and clean up a replication stream." +
"'successful_ingestion' indicates whether the stream ingestion finished successfully.",
Volatility: volatility.Volatile,
},
),
Expand Down
4 changes: 3 additions & 1 deletion pkg/streaming/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ type ReplicationStreamManager interface {
) (*streampb.ReplicationStreamSpec, error)

// CompleteReplicationStream completes a replication stream job on the producer side.
// 'successfulIngestion' indicates whether the stream ingestion finished successfully and
// determines the fate of the producer job, succeeded or canceled.
CompleteReplicationStream(
evalCtx *eval.Context, txn *kv.Txn, streamID StreamID,
evalCtx *eval.Context, txn *kv.Txn, streamID StreamID, successfulIngestion bool,
) error
}

Expand Down

0 comments on commit 1b4a2c4

Please sign in to comment.