From 0572cf5ca52a8b5567df4bf6025ef4754fe209d5 Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Wed, 18 Jan 2023 13:37:53 -0500 Subject: [PATCH] changefeedccl: nilsafe kafkaSink.Close We were tolerating nils in other parts of Close, but not here, which can create a race condition if a sink gets closed before being fully initialized resulting in a panic. Fixes #95278 Release note: None --- pkg/ccl/changefeedccl/sink_kafka.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 64b624d0d809..1d09f449232f 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -307,8 +307,10 @@ func (s *kafkaSink) newSyncProducer(client kafkaClient) (sarama.SyncProducer, er // Close implements the Sink interface. func (s *kafkaSink) Close() error { - close(s.stopWorkerCh) - s.worker.Wait() + if s.stopWorkerCh != nil { + close(s.stopWorkerCh) + s.worker.Wait() + } if s.producer != nil { // Ignore errors related to outstanding messages since we're either shutting