Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
Merge pull request #30 from uber/fix-double-close-wait
Browse files Browse the repository at this point in the history
Fix case a second close would not block when the first one is pending
  • Loading branch information
Kiran RG authored Jun 20, 2017
2 parents 5fb91d9 + 5fc9277 commit 648db1d
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions client/cherami/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,29 +169,29 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) {

func (c *consumerImpl) Close() {

if !atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) {
return
}
if atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) {

close(c.closingCh)
close(c.closingCh)

c.lk.Lock()
if c.connections != nil {
for _, outputHostConn := range c.connections {
outputHostConn.close()
c.lk.Lock()
if c.connections != nil {
for _, outputHostConn := range c.connections {
outputHostConn.close()
}
c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, 0)
}
c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, 0)
}

if c.ackConnection != nil {
c.ackConnection.Close()
c.ackConnection = nil
}
if c.ackConnection != nil {
c.ackConnection.Close()
c.ackConnection = nil
}

c.opened = false
c.lk.Unlock()
c.opened = false
c.lk.Unlock()
}

c.wg.Wait()
return
}

func (c *consumerImpl) Pause() {
Expand Down

0 comments on commit 648db1d

Please sign in to comment.