Skip to content

Commit

Permalink
streamingest: fix early span termination
Browse files Browse the repository at this point in the history
This processor was calling InternalClose() while goroutines were still
running. InternalClose() finishes the processor's tracing span, which is
used by those goroutines. We don't like span use-after-Finish.

Release note: None
  • Loading branch information
andreimatei committed Nov 11, 2021
1 parent cc03770 commit 42db1dd
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
eventChs[id] = eventCh
errChs[id] = errCh
}
sip.eventCh, err = sip.merge(ctx, eventChs, errChs)
if err != nil {
sip.MoveToDraining(err)
return
}
sip.eventCh = sip.merge(ctx, eventChs, errChs)
}

// Next is part of the RowSource interface.
Expand Down Expand Up @@ -295,20 +291,26 @@ func (sip *streamIngestionProcessor) ConsumerClosed() {
}

func (sip *streamIngestionProcessor) close() {
if sip.InternalClose() {
if sip.batcher != nil {
sip.batcher.Close()
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
}
close(sip.closePoller)
// Wait for the processor goroutine to return so that we do not access
// processor state once it has shutdown.
sip.pollingWaitGroup.Wait()
// Wait for the merge goroutine.
if sip.Closed {
return
}

if sip.batcher != nil {
sip.batcher.Close()
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
}
close(sip.closePoller)
// Wait for the processor goroutine to return so that we do not access
// processor state once it has shutdown.
sip.pollingWaitGroup.Wait()
// Wait for the merge goroutine.
if sip.cancelMergeAndWait != nil {
sip.cancelMergeAndWait()
}

sip.InternalClose()
}

// checkForCutoverSignal periodically loads the job progress to check for the
Expand Down Expand Up @@ -369,18 +371,25 @@ func (sip *streamIngestionProcessor) merge(
ctx context.Context,
partitionStreams map[string]chan streamingccl.Event,
errorStreams map[string]chan error,
) (chan partitionEvent, error) {
) chan partitionEvent {
merged := make(chan partitionEvent)

ctx, cancel := context.WithCancel(ctx)
g := ctxgroup.WithContext(ctx)

sip.cancelMergeAndWait = func() {
cancel()
// Wait until the merged channel is closed by the goroutine above.
for range merged {
}
}

for partition, eventCh := range partitionStreams {
partition := partition
eventCh := eventCh
errCh, ok := errorStreams[partition]
if !ok {
return nil, errors.Newf("could not find error channel for partition %q", partition)
log.Fatalf(ctx, "could not find error channel for partition %q", partition)
}
g.GoCtx(func(ctx context.Context) error {
ctxDone := ctx.Done()
Expand Down Expand Up @@ -417,14 +426,7 @@ func (sip *streamIngestionProcessor) merge(
close(merged)
}()

sip.cancelMergeAndWait = func() {
cancel()
// Wait until the merged channel is closed by the goroutine above.
for range merged {
}
}

return merged, nil
return merged
}

// consumeEvents handles processing events on the merged event queue and returns
Expand Down

0 comments on commit 42db1dd

Please sign in to comment.