Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
wait for reconfigure pump
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed Jun 16, 2017
1 parent 8ba539e commit a1bec6b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 12 deletions.
20 changes: 11 additions & 9 deletions client/cherami/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type (
connections map[string]*outputHostConnection
wsConnector WSConnector
reconfigurable *reconfigurable
wg sync.WaitGroup
}

deliveryID struct {
Expand Down Expand Up @@ -157,7 +158,8 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) {
c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, int64(len(c.connections)))

c.reconfigurable = newReconfigurable(c.reconfigureCh, c.closingCh, c.reconfigureConsumer, c.logger, c.options.ReconfigurationPollingInterval)
go c.reconfigurable.reconfigurePump()
c.wg.Add(1)
go c.reconfigurable.reconfigurePump(&c.wg)

c.opened = true
}
Expand All @@ -166,18 +168,14 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) {
}

func (c *consumerImpl) Close() {
// TODO: ideally this should be synchronized, i.e. wait until all
// connections are properly shutdown, so that we make sure that
// nothing gets written to c.deliveyCh afterwards, because owner of that
// channel would likely close it after Close() returns.
if atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) {
close(c.closingCh)
} else {

if !atomic.CompareAndSwapInt32(&c.isClosing, 0, 1) {
return
}

close(c.closingCh)

c.lk.Lock()
defer c.lk.Unlock()
if c.connections != nil {
for _, outputHostConn := range c.connections {
outputHostConn.close()
Expand All @@ -187,9 +185,13 @@ func (c *consumerImpl) Close() {

if c.ackConnection != nil {
c.ackConnection.Close()
c.ackConnection = nil
}

c.opened = false
c.lk.Unlock()

c.wg.Wait()
}

func (c *consumerImpl) Pause() {
Expand Down
2 changes: 1 addition & 1 deletion client/cherami/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *publisherImpl) Open() error {
s.reporter.UpdateGauge(metrics.PublishNumConnections, nil, int64(len(s.connections)))

s.reconfigurable = newReconfigurable(s.reconfigureCh, s.closingCh, s.reconfigurePublisher, s.logger, s.reconfigurationPollingInterval)
go s.reconfigurable.reconfigurePump()
go s.reconfigurable.reconfigurePump(nil)

s.opened = true
s.logger.Info("Publisher Opened.")
Expand Down
10 changes: 9 additions & 1 deletion client/cherami/reconfigurable.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package cherami

import (
"sync"
"time"

"github.com/uber-common/bark"
Expand Down Expand Up @@ -68,7 +69,14 @@ func newReconfigurable(reconfigureCh <-chan reconfigureInfo, closingCh chan stru
return r
}

func (s *reconfigurable) reconfigurePump() {
func (s *reconfigurable) reconfigurePump(wg *sync.WaitGroup) {

defer func() {
if wg != nil {
wg.Done()
}
}()

s.logger.Info("Reconfiguration pump started.")
heartbeat := time.NewTicker(s.pollingInterval)
limiter := time.NewTicker(limiterDuration)
Expand Down
2 changes: 1 addition & 1 deletion client/cherami/tchanPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (p *tchannelBatchPublisher) Open() error {
p.reporter.UpdateGauge(metrics.PublishNumConnections, nil, int64(len(hostAddrs)))

p.reconfigurable = newReconfigurable(p.reconfigureCh, p.closeCh, p.reconfigureHandler, p.logger, p.reconfigurationPollingInterval)
go p.reconfigurable.reconfigurePump()
go p.reconfigurable.reconfigurePump(nil)
go p.processor()
atomic.StoreInt32(&p.opened, 1)
p.logger.WithField(`endpoints`, endpoints.addrs).Info("Publisher Opened.")
Expand Down

0 comments on commit a1bec6b

Please sign in to comment.