78014: streamingccl: make producer job exit smoothly after ingestion cutover r=gh-casper a=gh-casper

Previously, the producer job would time out and fail automatically after
the ingestion cutover, because the consumer stops sending heartbeats once
cutover is done. This is a poor user experience: the stream replication
actually succeeded, but it showed up as failed.

This PR adds a new crdb builtin "crdb_internal.complete_replication_stream"
that lets the consumer signal the source cluster that ingestion has
completed, so the producer job can exit cleanly.

Closes: #76954
Release justification: Cat 4.
Release note: none.
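
For illustration, a consumer could invoke the new builtin over a regular SQL
connection to the source cluster. The following is a minimal sketch using
database/sql; the connection string and stream ID are placeholders, and the
real consumer-side call lives in the streamclient.Client.Complete
implementation added in this commit.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

// completeReplicationStream tells the source cluster that ingestion has
// finished, so the producer job can complete instead of timing out.
func completeReplicationStream(ctx context.Context, db *sql.DB, streamID int64) error {
	_, err := db.ExecContext(ctx,
		`SELECT crdb_internal.complete_replication_stream($1)`, streamID)
	return err
}

func main() {
	ctx := context.Background()
	// Placeholder address for the source (producer) cluster.
	db, err := sql.Open("postgres", "postgresql://root@source-host:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := completeReplicationStream(ctx, db, 123 /* placeholder stream ID */); err != nil {
		log.Fatal(err)
	}
	fmt.Println("producer job signaled to complete")
}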

78302: sql: fix migration with new system.table_statistics column r=rharding6373 a=rharding6373

Before this change, the new `system.table_statistics` column `avgSize`
introduced in version 22.1.12 was appended to the end of the table
during migration, but the system schema had the new column in a
different order. The column was also not added to the existing column
family containing all table columns during migration.

This change fixes both the system schema and the migration commands so
that the column ordering is the same and the new column is added to the
existing column family. Unfortunately, this means the existing column
family's name cannot be updated to include the new column.

Fixes: #77979

Release justification: Fixes a schema migration bug in the
table_statistics table.

Release note: None
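
For background, CockroachDB's ALTER TABLE ... ADD COLUMN accepts a FAMILY
clause that places the new column into an existing column family rather than
creating a new one. Below is a rough sketch of the shape of such a migration
statement; the family name and the omitted default are illustrative
placeholders, not the shipped migration text.

package upgradessketch // illustrative package name

// addAvgSizeColumn sketches the shape of the statement; the family name
// below is a placeholder, not the actual system schema family name.
const addAvgSizeColumn = `
ALTER TABLE system.table_statistics
  ADD COLUMN IF NOT EXISTS "avgSize" INT8
  FAMILY "fam_0_table_statistics"`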

78410: changefeedccl: remove tenant timestamp protection gates r=samiskin a=samiskin

Now that protected timestamps work in tenants in 22.1, the protected
timestamp (pts) gates in changefeeds can be removed.

Resolves #76936

Release justification: low-risk change turning off now-unneeded gates.
Release note (enterprise change): Changefeeds can now protect their
targets from GC on user tenants.
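
Concretely, a statement like the one below, which the removed gate used to
reject on user tenants with "operation is unsupported in multi-tenancy mode"
(see the deleted test case in changefeed_test.go), should now be accepted.
The sink URL is a placeholder.

package changefeedsketch // illustrative package name

// createChangefeedWithPTS shows the kind of statement that previously
// required the system tenant.
const createChangefeedWithPTS = `
CREATE CHANGEFEED FOR foo
  INTO 'kafka://sink-host:9092'
  WITH protect_data_from_gc_on_pause`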

78445: colexec: use Bytes.Copy instead of Get and Set in most places r=yuzefovich a=yuzefovich

**coldata: fix the usage of Bytes.Copy in CopyWithReorderedSource**

Using Bytes.Copy here was the intention, but it wasn't working because
the call happens inside a separate template.

Release note: None

**colexec: use Bytes.Copy instead of Get and Set in most places**

This commit audits our code for the `Bytes.Get`-followed-by-`Bytes.Set`
pattern and replaces it with `Bytes.Copy` (which is faster for inlined
values) in non-test code.

Release note: None
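
A minimal sketch of the pattern change, assuming the coldata.Bytes API (Get,
Set, and an element-wise Copy) as described in this commit; the exact method
signatures here are stated from memory and should be treated as assumptions.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/col/coldata"
)

func main() {
	src := coldata.NewBytes(2)
	src.Set(0, []byte("hello"))
	src.Set(1, []byte("world"))

	dst := coldata.NewBytes(2)

	// Old pattern: materialize the value with Get, then Set it.
	dst.Set(0, src.Get(0))

	// New pattern: copy element 1 of src into element 1 of dst directly,
	// avoiding the Get/Set round trip (faster for inlined values).
	dst.Copy(src, 1 /* destIdx */, 1 /* srcIdx */)

	fmt.Println(string(dst.Get(0)), string(dst.Get(1)))
}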

78456: roachtest: wait for ranges to replicate before filling disk r=tbg a=nicktrav

Currently, the `disk-full` roachtest creates a cluster and immediately
places a ballast file on one node, which causes it to crash. If this
node is the only replica for a range containing a system table, certain
system queries may not complete once the node crashes due to a full
disk. This leaves the test unable to make forward progress: the one
dead node prevents a system query from completing, and that stalled
query in turn prevents the node from being restarted.

Fix this by waiting for all ranges to have at least two replicas before
placing the ballast file on that node.

Touches #78337, #78270.

Release note: None.
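
A rough sketch of the kind of wait the test can perform before placing the
ballast file. The crdb_internal.ranges query and the polling helper are
assumptions for illustration, not necessarily what the roachtest change uses.

package diskfullsketch // illustrative package name

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// waitForReplication polls until no range has fewer than two replicas, or the
// timeout expires.
func waitForReplication(ctx context.Context, db *sql.DB, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		var underReplicated int
		err := db.QueryRowContext(ctx,
			`SELECT count(*) FROM crdb_internal.ranges WHERE array_length(replicas, 1) < 2`,
		).Scan(&underReplicated)
		if err != nil {
			return err
		}
		if underReplicated == 0 {
			return nil
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("timed out waiting for ranges to have at least two replicas")
}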

78468: sql: return an error when partition spans has no healthy instances r=rharding6373 a=rharding6373

If there are no SQL instances available for planning,
partitionSpansTenant in the DistSQL planner will panic. This PR fixes
the issue so that the planner returns an error instead of panicking
when no instances are available.

Fixes: #77590

Release justification: Fixes a bug in DistSQL that can cause a panic for
non-system tenants.

Release note: None
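
A minimal sketch of the shape of the fix (names here are illustrative, not
the actual planner code): return an error rather than panicking when the
instance list is empty.

package distsqlsketch // illustrative package name

import "errors"

// instanceInfo stands in for the planner's notion of a live SQL instance.
type instanceInfo struct {
	id   int32
	addr string
}

// partitionTargets mirrors the shape of the fix: with zero healthy instances
// there is nothing to partition spans over, so return an error instead of
// indexing into an empty slice and panicking.
func partitionTargets(instances []instanceInfo) ([]instanceInfo, error) {
	if len(instances) == 0 {
		return nil, errors.New("no healthy sql instances available for planning")
	}
	return instances, nil
}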

Co-authored-by: Casper <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Shiranka Miskin <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nick Travers <[email protected]>
6 people committed Mar 25, 2022
7 parents 7001882 + 7d2ac30 + 40ffa99 + 25c996f + 8593589 + b2900ab + 56e2c64 commit 2635dc1
Showing 59 changed files with 384 additions and 266 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
@@ -2545,6 +2545,8 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.complete_replication_stream"></a><code>crdb_internal.complete_replication_stream(stream_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to complete and clean up a replication stream after the consumer receives a cutover event and finishes the ingestion</p>
</span></td></tr>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>, cutover_ts: <a href="timestamp.html">timestamptz</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the specified timestamp and leave the cluster in a consistent state. The specified timestamp can only be specified up to the microsecond. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.replication_stream_progress"></a><code>crdb_internal.replication_stream_progress(stream_id: <a href="int.html">int</a>, frontier_ts: <a href="string.html">string</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the consumer side to heartbeat its replication progress to a replication stream in the source cluster. The returns a StreamReplicationStatus message that indicates stream status (RUNNING, PAUSED, or STOPPED).</p>
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
@@ -86,7 +86,6 @@ go_library(
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/errorutil",
"//pkg/util/hlc",
"//pkg/util/httputil",
"//pkg/util/humanizeutil",
5 changes: 0 additions & 5 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -43,11 +43,6 @@ func emitResolvedTimestamp(
return nil
}

func shouldProtectTimestamps(codec keys.SQLCodec) bool {
// TODO(smiskin): Remove this restriction once tenant based pts are enabled
return codec.ForSystemTenant()
}

// createProtectedTimestampRecord will create a record to protect the spans for
// this changefeed at the resolved timestamp. The progress struct will be
// updated to refer to this new protected timestamp record.
18 changes: 8 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1514,16 +1514,14 @@ func (cf *changeFrontier) checkpointJobProgress(
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &checkpoint

if shouldProtectTimestamps(cf.flowCtx.Codec()) {
timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
}
timestampManager := cf.manageProtectedTimestamps
// TODO(samiskin): Remove this conditional and the associated deprecated
// methods once we're confident in ActiveProtectedTimestampsEnabled
if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) {
timestampManager = cf.deprecatedManageProtectedTimestamps
}
if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
}

if updateRunStatus {
7 changes: 1 addition & 6 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -44,7 +44,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -197,7 +196,7 @@ func changefeedPlanHook(
codec := p.ExecCfg().Codec

activeTimestampProtection := changefeedbase.ActiveProtectedTimestampsEnabled.Get(&p.ExecCfg().Settings.SV)
shouldProtectTimestamp := (activeTimestampProtection || initialScanFromOptions(details.Opts)) && shouldProtectTimestamps(codec)
shouldProtectTimestamp := activeTimestampProtection || initialScanFromOptions(details.Opts)
if shouldProtectTimestamp {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), details.StatementTime, progress.GetChangefeed())
}
@@ -391,10 +390,6 @@ func createChangefeedJobRecord(
return nil, errors.Errorf("Outbound IO is disabled by configuration, cannot create changefeed into %s", parsedSink.Scheme)
}

if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; shouldProtect && !p.ExecCfg().Codec.ForSystemTenant() {
return nil, errorutil.UnsupportedWithMultiTenancy(67271)
}

if telemetryPath != `` {
// Feature telemetry
telemetrySink := parsedSink.Scheme
6 changes: 0 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -448,12 +448,6 @@ func TestChangefeedTenantsExternalIOEnabled(t *testing.T) {
tenantSQL.Exec(t, serverSetupStatements)
tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)

t.Run("sinkful changefeed fails if protect_data_from_gc_on_pause is set", func(t *testing.T) {
tenantSQL.ExpectErr(t, "operation is unsupported in multi-tenancy mode",
`CREATE CHANGEFEED FOR foo_in_tenant INTO 'kafka://does-not-matter' WITH protect_data_from_gc_on_pause`,
)
})

t.Run("sinkful changefeed works", func(t *testing.T) {
f := makeKafkaFeedFactory(&testServerShim{
TestTenantInterface: tenantServer,
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -60,6 +60,7 @@ go_test(
"//pkg/server",
"//pkg/sql/catalog/desctestutils",
"//pkg/streaming",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
@@ -74,6 +74,9 @@ type Client interface {

// Close releases all the resources used by this client.
Close() error

// Complete completes a replication stream consumption.
Complete(ctx context.Context, streamID streaming.StreamID) error
}

// Topology is a configuration of stream partitions. These are particular to a
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
@@ -74,6 +74,11 @@ func (sc testStreamClient) Subscribe(
}, nil
}

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
return nil
}

type testStreamSubscription struct {
eventCh chan streamingccl.Event
}
13 changes: 13 additions & 0 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
@@ -202,6 +202,19 @@ func (p *partitionedStreamClient) Subscribe(
return res, nil
}

// Complete implements the streamclient.Client interface.
func (p *partitionedStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
conn, err := p.srcDB.Conn(ctx)
if err != nil {
return err
}
row := conn.QueryRowContext(ctx, `SELECT crdb_internal.complete_replication_stream($1)`, streamID)
if row.Err() != nil {
return errors.Wrap(row.Err(), "Error in completing a replication stream")
}
return nil
}

type partitionedStreamSubscription struct {
err error
db *gosql.DB
57 changes: 39 additions & 18 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -63,7 +64,9 @@ func TestPartitionedStreamReplicationClient(t *testing.T) {

ctx := context.Background()
// Makes sure source cluster producer job does not time out within test timeout
h.SysDB.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'")
h.SysDB.Exec(t, `
SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s';
`)
h.Tenant.SQL.Exec(t, `
CREATE DATABASE d;
CREATE TABLE d.t1(i int primary key, a string, b string);
@@ -82,37 +85,41 @@ INSERT INTO d.t2 VALUES (2);
[][]string{{string(status)}})
}

id, err := client.Create(ctx, h.Tenant.ID)
streamID, err := client.Create(ctx, h.Tenant.ID)
require.NoError(t, err)
// We can create multiple replication streams for the same tenant.
_, err = client.Create(ctx, h.Tenant.ID)
require.NoError(t, err)

top, err := client.Plan(ctx, id)
top, err := client.Plan(ctx, streamID)
require.NoError(t, err)
require.Equal(t, 1, len(top))
// Plan for a non-existent stream
_, err = client.Plan(ctx, 999)
require.Errorf(t, err, "Replication stream %d not found", 999)
require.True(t, testutils.IsError(err, fmt.Sprintf("job with ID %d does not exist", 999)), err)

expectStreamState(id, jobs.StatusRunning)
require.NoError(t, client.Heartbeat(ctx, id, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}))
expectStreamState(streamID, jobs.StatusRunning)
require.NoError(t, client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}))

// Pause the underlying producer job of the replication stream
h.SysDB.Exec(t, `PAUSE JOB $1`, id)
expectStreamState(id, jobs.StatusPaused)
require.Errorf(t, client.Heartbeat(ctx, id, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}),
"Replication stream %d is not running, status is STREAM_PAUSED", id)
h.SysDB.Exec(t, `PAUSE JOB $1`, streamID)
expectStreamState(streamID, jobs.StatusPaused)
err = client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.True(t, testutils.IsError(err,
fmt.Sprintf("replication stream %d is not running, status is STREAM_PAUSED", streamID)), err)

// Cancel the underlying producer job of the replication stream
h.SysDB.Exec(t, `CANCEL JOB $1`, id)
expectStreamState(id, jobs.StatusCanceled)
require.Errorf(t, client.Heartbeat(ctx, id, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}),
"Replication stream %d is not running, status is STREAM_INACTIVE", id)
h.SysDB.Exec(t, `CANCEL JOB $1`, streamID)
expectStreamState(streamID, jobs.StatusCanceled)

err = client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.True(t, testutils.IsError(err,
fmt.Sprintf("replication stream %d is not running, status is STREAM_INACTIVE", streamID)), err)

// Non-existent stream is not active in the source cluster.
require.Errorf(t, client.Heartbeat(ctx, 999, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}),
"Replication stream %d is not running, status is STREAM_INACTIVE", 999)
err = client.Heartbeat(ctx, 999, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.True(t, testutils.IsError(err,
fmt.Sprintf("replication stream %d is not running, status is STREAM_INACTIVE", 999)), err)

// Testing client.Subscribe()
makePartitionSpec := func(tables ...string) *streampb.StreamPartitionSpec {
@@ -147,7 +154,7 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, subClient.Close())
}()
require.NoError(t, err)
sub, err := subClient.Subscribe(ctx, id, encodeSpec("t1"), hlc.Timestamp{})
sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"), hlc.Timestamp{})
require.NoError(t, err)

rf := streamingtest.MakeReplicationFeed(t, &subscriptionFeedSource{sub: sub})
@@ -173,5 +180,19 @@ INSERT INTO d.t2 VALUES (2);

// Test if Subscribe can react to cancellation signal.
cancelFn()
require.Error(t, cg.Wait(), "context canceled")
require.ErrorIs(t, cg.Wait(), context.Canceled)

// Testing client.Complete()
err = client.Complete(ctx, streaming.StreamID(999))
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)

// Makes producer job exit quickly.
h.SysDB.Exec(t, `
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms';
`)
streamID, err = client.Create(ctx, h.Tenant.ID)
require.NoError(t, err)
require.NoError(t, client.Complete(ctx, streamID))
h.SysDB.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}})
}
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -415,6 +415,11 @@ func (m *randomStreamClient) Subscribe(
}, nil
}

// Complete implements the streamclient.Client interface.
func (m *randomStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
return nil
}

type randomStreamSubscription struct {
receiveFn func(ctx context.Context) error
eventCh chan streamingccl.Event
42 changes: 22 additions & 20 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -34,7 +34,7 @@ func ingest(
tenantID roachpb.TenantID,
startTime hlc.Timestamp,
progress jobspb.Progress,
jobID jobspb.JobID,
ingestionJobID jobspb.JobID,
) error {
// Initialize a stream client and resolve topology.
client, err := streamclient.NewStreamClient(streamAddress)
@@ -73,14 +73,26 @@ func ingest(

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, sqlInstanceIDs, initialHighWater, jobID, streamID)
streamAddress, topology, sqlInstanceIDs, initialHighWater, ingestionJobID, streamID)
if err != nil {
return err
}

// Plan and run the DistSQL flow.
return distStreamIngest(ctx, execCtx, sqlInstanceIDs, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec)
if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, ingestionJobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec); err != nil {
return err
}

// A nil error is only possible if the job was signaled to cutover and the
// processors shut down gracefully, i.e stopped ingesting any additional
// events from the replication stream. At this point it is safe to revert to
// the cutoff time to leave the cluster in a consistent state.
if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJobID); err != nil {
return err
}
// Completes the producer job in the source cluster.
return client.Complete(ctx, streamID)
}
return errors.CombineErrors(ingestWithClient(), client.Close())
}
@@ -92,28 +104,18 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter

// Start ingesting KVs from the replication stream.
streamAddress := streamingccl.StreamAddress(details.StreamAddress)
err := ingest(resumeCtx, p, streamAddress, details.TenantID, details.StartTime, s.job.Progress(), s.job.ID())
if err != nil {
return err
}

// A nil error is only possible if the job was signaled to cutover and the
// processors shut down gracefully, i.e stopped ingesting any additional
// events from the replication stream. At this point it is safe to revert to
// the cutoff time to leave the cluster in a consistent state.
// TODO: after this, we need to complete the producer job into "replication complete state" in the future.
return s.revertToCutoverTimestamp(resumeCtx, execCtx)
return ingest(resumeCtx, p, streamAddress, details.TenantID, details.StartTime, s.job.Progress(), s.job.ID())
}

// revertToCutoverTimestamp reads the job progress for the cutover time and
// issues a RevertRangeRequest with the target time set to that cutover time, to
// bring the ingesting cluster to a consistent state.
func (s *streamIngestionResumer) revertToCutoverTimestamp(
ctx context.Context, execCtx interface{},
func revertToCutoverTimestamp(
ctx context.Context, execCtx interface{}, ingestionJobID jobspb.JobID,
) error {
p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, s.job.ID())
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, ingestionJobID)
if err != nil {
return err
}
@@ -122,13 +124,13 @@ func (s *streamIngestionResumer) revertToCutoverTimestamp(
var ok bool
if sd, ok = details.(jobspb.StreamIngestionDetails); !ok {
return errors.Newf("unknown details type %T in stream ingestion job %d",
details, s.job.ID())
details, ingestionJobID)
}
progress := j.Progress()
var sp *jobspb.Progress_StreamIngest
if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
return errors.Newf("unknown progress type %T in stream ingestion job %d",
j.Progress().Progress, s.job.ID())
j.Progress().Progress, ingestionJobID)
}

if sp.StreamIngest.CutoverTime.IsEmpty() {
@@ -125,6 +125,11 @@ func (m *mockStreamClient) Close() error {
return nil
}

// Complete implements the streamclient.Client interface.
func (m *mockStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
return nil
}

// errorStreamClient always returns an error when consuming a partition.
type errorStreamClient struct{ mockStreamClient }

@@ -140,6 +145,11 @@ func (m *errorStreamClient) Subscribe(
return nil, errors.New("this client always returns an error")
}

// Complete implements the streamclient.Client interface.
func (m *errorStreamClient) Complete(ctx context.Context, streamID streaming.StreamID) error {
return nil
}

func TestStreamIngestionProcessor(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -82,10 +82,8 @@ func (c *tenantStreamingClusters) cutover(
// Cut over the ingestion job and the job will stop eventually.
c.destSysSQL.Exec(c.t, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, ingestionJobID, cutoverTime)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
// TODO(casper): Make producer job exit normally in the cutover scenario.
c.srcSysSQL.CheckQueryResultsRetry(c.t,
fmt.Sprintf("SELECT status, error FROM [SHOW JOBS] WHERE job_id = %d", producerJobID),
[][]string{{"failed", fmt.Sprintf("replication stream %d timed out", producerJobID)}})
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", producerJobID), [][]string{{"succeeded"}})
}

// Returns producer job ID and ingestion job ID.
@@ -159,7 +157,7 @@ var srcClusterSetting = `
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING stream_replication.job_liveness_timeout = '3s';
SET CLUSTER SETTING stream_replication.job_liveness_timeout = '20s';
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '2s';
SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s';
`