Skip to content

Commit

Permalink
Merge #36923
Browse files Browse the repository at this point in the history
36923: changefeedccl: fix race in changeAggregator r=nvanbenschoten a=danhhz

This race was introduced in 9bde786,
which fixed a hang in `(*changeAggregator).close` which happened because
the pollerDoneCh never got closed if the RunAsyncTask rejected the work.
We could introduce some locking to fix the race, but better is to just
to close the channel if the RunAsyncTask is not going to.

Closes #35265

Release note: None

Co-authored-by: Daniel Harrison <[email protected]>
  • Loading branch information
craig[bot] and danhhz committed Apr 17, 2019
2 parents 744edaf + f6d1e96 commit 83de585
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
// but only the first one is ever used.
ca.errCh = make(chan error, 2)

ca.pollerDoneCh = make(chan struct{})
if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) {
ca.pollerDoneCh = make(chan struct{})
defer close(ca.pollerDoneCh)
var err error
if PushEnabled.Get(&ca.flowCtx.Settings.SV) {
Expand All @@ -201,6 +201,10 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
ca.errCh <- err
ca.cancel()
}); err != nil {
// If err != nil then the RunAsyncTask closure never ran, which means we
// need to manually close ca.pollerDoneCh so `(*changeAggregator).close`
// doesn't hang.
close(ca.pollerDoneCh)
ca.errCh <- err
ca.cancel()
}
Expand Down

0 comments on commit 83de585

Please sign in to comment.