Skip to content

Commit

Permalink
Merge #82154
Browse files Browse the repository at this point in the history
82154: streamingccl: remove unnecessary error check r=gh-casper a=stevendanna

This err was already checked above.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Jun 7, 2022
2 parents 8344e69 + 0e3a6a3 commit 873c397
Showing 1 changed file with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,6 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
streamClient = sip.forceClientForTests
log.Infof(ctx, "using testing client")
} else {
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "creating client for partition spec %q from %q", spec, addr))
return
}
streamClient, err = streamclient.NewStreamClient(streamingccl.StreamAddress(addr))
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "creating client for partition spec %q from %q", spec, addr))
Expand All @@ -259,11 +255,11 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
}

sub, err := streamClient.Subscribe(ctx, streaming.StreamID(sip.spec.StreamID), spec, sip.spec.StartTime)
subscriptions[id] = sub
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr))
return
}
subscriptions[id] = sub
sip.cg.GoCtx(sub.Subscribe)
}
sip.eventCh = sip.merge(ctx, subscriptions)
Expand Down

0 comments on commit 873c397

Please sign in to comment.