Skip to content

Commit

Permalink
c2c: pass statement transaction when loading producer side job
Browse files Browse the repository at this point in the history
This patch fixes a small bug where we previously used a new txn to read from
the producer job instead of the txn associated with the crdb_internal builtin
call.

Epic: none

Release note: none
  • Loading branch information
msbutler committed Aug 1, 2023
1 parent 0e915fb commit ffea5af
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
2 changes: 0 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ subtest replication-builtins

user root

query error pq: crdb_internal\.replication_stream_spec\(\): job.*is not a replication stream job
SELECT crdb_internal.replication_stream_spec(crdb_internal.create_sql_schema_telemetry_job())

query error pq: crdb_internal\.stream_ingestion_stats_json\(\): unimplemented
SELECT crdb_internal.stream_ingestion_stats_json(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ INSERT INTO d.t2 VALUES (2);
_, err = client.Plan(ctx, 999)
require.True(t, testutils.IsError(err, fmt.Sprintf("job with ID %d does not exist", 999)), err)

var telemetryJobID int64
h.SysSQL.QueryRow(t, "SELECT crdb_internal.create_sql_schema_telemetry_job()").Scan(&telemetryJobID)
_, err = client.Plan(ctx, streampb.StreamID(telemetryJobID))
require.True(t, testutils.IsError(err, fmt.Sprintf("job with id %d is not a replication stream job", telemetryJobID)), err)

expectStreamState(streamID, jobs.StatusRunning)
status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *replicationStreamManagerImpl) StreamPartition(
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
ctx context.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
return getReplicationStreamSpec(ctx, r.evalCtx, streamID)
return getReplicationStreamSpec(ctx, r.evalCtx, r.txn, streamID)
}

// CompleteReplicationStream implements ReplicationStreamManager interface.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func heartbeatReplicationStream(
// job progress.
if frontier == hlc.MaxTimestamp {
var status streampb.StreamReplicationStatus
pj, err := execConfig.JobRegistry.LoadJob(ctx, jobspb.JobID(streamID))
pj, err := execConfig.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(streamID), txn)
if jobs.HasJobNotFoundError(err) || testutils.IsError(err, "not found in system.jobs table") {
status.StreamStatus = streampb.StreamReplicationStatus_STREAM_INACTIVE
return status, nil
Expand Down Expand Up @@ -219,13 +219,13 @@ func heartbeatReplicationStream(

// getReplicationStreamSpec gets a replication stream specification for the specified stream.
func getReplicationStreamSpec(
ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID,
ctx context.Context, evalCtx *eval.Context, txn isql.Txn, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
// Returns error if the replication stream is not active
j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJob(ctx, jobspb.JobID(streamID))
j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(streamID), txn)
if err != nil {
return nil, errors.Wrapf(err, "replication stream %d has error", streamID)
return nil, errors.Wrapf(err, "could not load job for replication stream %d", streamID)
}
if j.Status() != jobs.StatusRunning {
return nil, errors.Errorf("replication stream %d is not running", streamID)
Expand Down

0 comments on commit ffea5af

Please sign in to comment.