Skip to content

Commit

Permalink
streamingccl: remove unnecessary error check
Browse files Browse the repository at this point in the history
This err was already checked above.

Release note: None
  • Loading branch information
stevendanna committed May 31, 2022
1 parent 2585513 commit 0e3a6a3
Showing 1 changed file with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,6 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
streamClient = sip.forceClientForTests
log.Infof(ctx, "using testing client")
} else {
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "creating client for partition spec %q from %q", spec, addr))
return
}
streamClient, err = streamclient.NewStreamClient(streamingccl.StreamAddress(addr))
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "creating client for partition spec %q from %q", spec, addr))
Expand All @@ -259,11 +255,11 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
}

sub, err := streamClient.Subscribe(ctx, streaming.StreamID(sip.spec.StreamID), spec, sip.spec.StartTime)
subscriptions[id] = sub
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr))
return
}
subscriptions[id] = sub
sip.cg.GoCtx(sub.Subscribe)
}
sip.eventCh = sip.merge(ctx, subscriptions)
Expand Down

0 comments on commit 0e3a6a3

Please sign in to comment.