diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 8942f8cd60ef..6ae183b80064 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -479,6 +479,11 @@ func (ca *changeAggregator) close() { if ca.Closed { return } + if ca.cancel == nil { + // consumer close may be called even before Start is called. + // If that's the case, cancel is not initialized. + return + } ca.cancel() // Wait for the poller to finish shutting down. if ca.kvFeedDoneCh != nil {