Skip to content

Commit

Permalink
streamingccl: use roachpb.TenantID in job proto
Browse files Browse the repository at this point in the history
Not bothering with dual write/read old and new field, since there is no
backwards compat concern for this unlaunched feature.

Release note: none.
  • Loading branch information
dt committed Dec 14, 2021
1 parent 112b5b9 commit 080ffae
Show file tree
Hide file tree
Showing 4 changed files with 530 additions and 510 deletions.
4 changes: 1 addition & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter
details := s.job.Details().(jobspb.StreamIngestionDetails)
p := execCtx.(sql.JobExecContext)

tenantID := roachpb.MakeTenantID(details.TenantID)

// Start ingesting KVs from the replication stream.
streamAddress := streamingccl.StreamAddress(details.StreamAddress)
err := ingest(resumeCtx, p, streamAddress, tenantID, details.StartTime, s.job.Progress(), s.job.ID())
err := ingest(resumeCtx, p, streamAddress, details.TenantID, details.StartTime, s.job.Progress(), s.job.ID())
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func ingestionPlanHook(

streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: string(streamAddress),
TenantID: ingestionStmt.Targets.Tenant.ToUint64(),
TenantID: ingestionStmt.Targets.Tenant,
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
StartTime: startTime,
}
Expand Down
Loading

0 comments on commit 080ffae

Please sign in to comment.