Skip to content

Commit

Permalink
changefeedccl: deflake TestChangefeedRetryableSinkError
Browse files Browse the repository at this point in the history
This channel wasn't getting closed if the stopper rejected the async
task, which casued `(*changeAggregator).close()` to hang.

Release note: None
  • Loading branch information
danhhz committed Jan 18, 2019
1 parent f11a9da commit 9bde786
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
// Give errCh enough buffer both possible errors from supporting goroutines,
// but only the first one is ever used.
ca.errCh = make(chan error, 2)
ca.pollerDoneCh = make(chan struct{})

if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) {
ca.pollerDoneCh = make(chan struct{})
defer close(ca.pollerDoneCh)
var err error
if storage.RangefeedEnabled.Get(&ca.flowCtx.Settings.SV) {
Expand Down

0 comments on commit 9bde786

Please sign in to comment.