diff --git a/client/cherami/consumer.go b/client/cherami/consumer.go index d843a7b..2effe55 100644 --- a/client/cherami/consumer.go +++ b/client/cherami/consumer.go @@ -169,29 +169,29 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) { func (c *consumerImpl) Close() { - if !atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) { - return - } + if atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) { - close(c.closingCh) + close(c.closingCh) - c.lk.Lock() - if c.connections != nil { - for _, outputHostConn := range c.connections { - outputHostConn.close() + c.lk.Lock() + if c.connections != nil { + for _, outputHostConn := range c.connections { + outputHostConn.close() + } + c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, 0) } - c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, 0) - } - if c.ackConnection != nil { - c.ackConnection.Close() - c.ackConnection = nil - } + if c.ackConnection != nil { + c.ackConnection.Close() + c.ackConnection = nil + } - c.opened = false - c.lk.Unlock() + c.opened = false + c.lk.Unlock() + } c.wg.Wait() + return } func (c *consumerImpl) Pause() {