diff --git a/client/cherami/consumer.go b/client/cherami/consumer.go index 97fa4bb..d843a7b 100644 --- a/client/cherami/consumer.go +++ b/client/cherami/consumer.go @@ -58,6 +58,7 @@ type ( connections map[string]*outputHostConnection wsConnector WSConnector reconfigurable *reconfigurable + wg sync.WaitGroup } deliveryID struct { @@ -157,7 +158,8 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) { c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, int64(len(c.connections))) c.reconfigurable = newReconfigurable(c.reconfigureCh, c.closingCh, c.reconfigureConsumer, c.logger, c.options.ReconfigurationPollingInterval) - go c.reconfigurable.reconfigurePump() + c.wg.Add(1) + go c.reconfigurable.reconfigurePump(&c.wg) c.opened = true } @@ -166,18 +168,14 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) { } func (c *consumerImpl) Close() { - // TODO: ideally this should be synchronized, i.e. wait until all - // connections are properly shutdown, so that we make sure that - // nothing gets written to c.deliveyCh afterwards, because owner of that - // channel would likely close it after Close() returns. - if atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) { - close(c.closingCh) - } else { + + if !atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) { return } + close(c.closingCh) + c.lk.Lock() - defer c.lk.Unlock() if c.connections != nil { for _, outputHostConn := range c.connections { outputHostConn.close() @@ -187,9 +185,13 @@ func (c *consumerImpl) Close() { if c.ackConnection != nil { c.ackConnection.Close() + c.ackConnection = nil } c.opened = false + c.lk.Unlock() + + c.wg.Wait() } func (c *consumerImpl) Pause() { diff --git a/client/cherami/publisher.go b/client/cherami/publisher.go index 37b116a..c4629da 100644 --- a/client/cherami/publisher.go +++ b/client/cherami/publisher.go @@ -139,7 +139,7 @@ func (s *publisherImpl) Open() error { s.reporter.UpdateGauge(metrics.PublishNumConnections, nil, int64(len(s.connections))) s.reconfigurable = newReconfigurable(s.reconfigureCh, s.closingCh, s.reconfigurePublisher, s.logger, s.reconfigurationPollingInterval) - go s.reconfigurable.reconfigurePump() + go s.reconfigurable.reconfigurePump(nil) s.opened = true s.logger.Info("Publisher Opened.") diff --git a/client/cherami/reconfigurable.go b/client/cherami/reconfigurable.go index b3a03a3..4589e7a 100644 --- a/client/cherami/reconfigurable.go +++ b/client/cherami/reconfigurable.go @@ -21,6 +21,7 @@ package cherami import ( + "sync" "time" "github.com/uber-common/bark" @@ -68,7 +69,14 @@ func newReconfigurable(reconfigureCh <-chan reconfigureInfo, closingCh chan stru return r } -func (s *reconfigurable) reconfigurePump() { +func (s *reconfigurable) reconfigurePump(wg *sync.WaitGroup) { + + defer func() { + if wg != nil { + wg.Done() + } + }() + s.logger.Info("Reconfiguration pump started.") heartbeat := time.NewTicker(s.pollingInterval) limiter := time.NewTicker(limiterDuration) diff --git a/client/cherami/tchanPublisher.go b/client/cherami/tchanPublisher.go index a085333..11c5a6d 100644 --- a/client/cherami/tchanPublisher.go +++ b/client/cherami/tchanPublisher.go @@ -127,7 +127,7 @@ func (p *tchannelBatchPublisher) Open() error { p.reporter.UpdateGauge(metrics.PublishNumConnections, nil, int64(len(hostAddrs))) p.reconfigurable = newReconfigurable(p.reconfigureCh, p.closeCh, p.reconfigureHandler, p.logger, p.reconfigurationPollingInterval) - go p.reconfigurable.reconfigurePump() + go p.reconfigurable.reconfigurePump(nil) go p.processor() atomic.StoreInt32(&p.opened, 1) p.logger.WithField(`endpoints`, endpoints.addrs).Info("Publisher Opened.")