diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index ed3d637bc062..3b261f17b287 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -246,10 +246,6 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { streamClient = sip.forceClientForTests log.Infof(ctx, "using testing client") } else { - if err != nil { - sip.MoveToDraining(errors.Wrapf(err, "creating client for partition spec %q from %q", spec, addr)) - return - } streamClient, err = streamclient.NewStreamClient(streamingccl.StreamAddress(addr)) if err != nil { sip.MoveToDraining(errors.Wrapf(err, "creating client for partition spec %q from %q", spec, addr)) @@ -259,11 +255,11 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { } sub, err := streamClient.Subscribe(ctx, streaming.StreamID(sip.spec.StreamID), spec, sip.spec.StartTime) - subscriptions[id] = sub if err != nil { sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr)) return } + subscriptions[id] = sub sip.cg.GoCtx(sub.Subscribe) } sip.eventCh = sip.merge(ctx, subscriptions)