Skip to content

Commit

Permalink
changefeedccl: nilsafe kafkaSink.Close
Browse files Browse the repository at this point in the history
We were tolerating nils in other parts of Close, but not here,
which can create a race condition if a sink gets closed before
being fully initialized resulting in a panic.

Fixes #95278
Release note: None
  • Loading branch information
HonoreDB committed Jan 18, 2023
1 parent 0e835dd commit 0572cf5
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,10 @@ func (s *kafkaSink) newSyncProducer(client kafkaClient) (sarama.SyncProducer, er

// Close implements the Sink interface.
func (s *kafkaSink) Close() error {
close(s.stopWorkerCh)
s.worker.Wait()
if s.stopWorkerCh != nil {
close(s.stopWorkerCh)
s.worker.Wait()
}

if s.producer != nil {
// Ignore errors related to outstanding messages since we're either shutting
Expand Down

0 comments on commit 0572cf5

Please sign in to comment.