From 9bde78612e41b8024ff6fd6e4fcf192ad45c0e00 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Thu, 17 Jan 2019 17:10:45 -0800 Subject: [PATCH] changefeedccl: deflake TestChangefeedRetryableSinkError This channel wasn't getting closed if the stopper rejected the async task, which casued `(*changeAggregator).close()` to hang. Release note: None --- pkg/ccl/changefeedccl/changefeed_processors.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 5a71bcf3255a..5d6392970e1f 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -166,8 +166,9 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { // Give errCh enough buffer both possible errors from supporting goroutines, // but only the first one is ever used. ca.errCh = make(chan error, 2) - ca.pollerDoneCh = make(chan struct{}) + if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) { + ca.pollerDoneCh = make(chan struct{}) defer close(ca.pollerDoneCh) var err error if storage.RangefeedEnabled.Get(&ca.flowCtx.Settings.SV) {