Skip to content

Commit

Permalink
[FABG-726] monitorBlockHeight ticker chan not closed
Browse files Browse the repository at this point in the history
This change adds a monitorBlockHeightDone channel to signal
completion of the goroutine.

Change-Id: I29f4f4db2c168268adff43884783ecd3b7360483
Signed-off-by: Troy Ronda <[email protected]>
  • Loading branch information
troyronda committed Aug 22, 2018
1 parent fa3c843 commit 22fbd8b
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions pkg/fab/events/client/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Dispatcher struct {
connectionRegistration *ConnectionReg
connectionProvider api.ConnectionProvider
discoveryService fab.DiscoveryService
ticker *time.Ticker
monitorBlockHeightDone chan struct{}
peer fab.Peer
lock sync.RWMutex
}
Expand Down Expand Up @@ -82,8 +82,9 @@ func (ed *Dispatcher) HandleStopEvent(e esdispatcher.Event) {
// Remove all registrations and close the associated event channels
// so that the client is notified that the registration has been removed
ed.clearConnectionRegistration()
if ed.ticker != nil {
ed.ticker.Stop()
if ed.monitorBlockHeightDone != nil {
close(ed.monitorBlockHeightDone)
ed.monitorBlockHeightDone = nil
}

ed.Dispatcher.HandleStopEvent(e)
Expand Down Expand Up @@ -146,7 +147,7 @@ func (ed *Dispatcher) HandleDisconnectEvent(e esdispatcher.Event) {
return
}

logger.Debug("Closing connection...")
logger.Debug("Closing connection due to disconnect event...")

ed.connection.Close()
ed.connection = nil
Expand Down Expand Up @@ -183,8 +184,8 @@ func (ed *Dispatcher) HandleConnectedEvent(e esdispatcher.Event) {
}

if ed.reconnectBlockHeightLagThreshold > 0 {
ed.ticker = time.NewTicker(ed.blockHeightMonitorPeriod)
go ed.monitorBlockHeight()
ed.monitorBlockHeightDone = make(chan struct{})
go ed.monitorBlockHeight(ed.monitorBlockHeightDone)
}
}

Expand All @@ -210,8 +211,9 @@ func (ed *Dispatcher) HandleDisconnectedEvent(e esdispatcher.Event) {
logger.Warnf("Disconnected from event server: %s", evt.Err)
}

if ed.ticker != nil {
ed.ticker.Stop()
if ed.monitorBlockHeightDone != nil {
close(ed.monitorBlockHeightDone)
ed.monitorBlockHeightDone = nil
}
}

Expand Down Expand Up @@ -283,19 +285,24 @@ func getMaxBlockHeight(peers []fab.Peer) uint64 {
return maxHeight
}

func (ed *Dispatcher) monitorBlockHeight() {
func (ed *Dispatcher) monitorBlockHeight(done chan struct{}) {
logger.Debugf("Starting block height monitor on channel [%s]. Lag threshold: %d", ed.chConfig.ID(), ed.reconnectBlockHeightLagThreshold)

ticker := time.NewTicker(ed.blockHeightMonitorPeriod)
defer ticker.Stop()

for {
if _, ok := <-ed.ticker.C; !ok {
select {
case <-ticker.C:
if !ed.checkBlockHeight() {
// Disconnected
logger.Debugf("Client on channel [%s] has disconnected - stopping block height monitor", ed.chConfig.ID())
return
}
case <-done:
logger.Debugf("Stopping block height monitor on channel [%s]", ed.chConfig.ID())
return
}
if !ed.checkBlockHeight() {
// Disconnected
logger.Debugf("Client on channel [%s] has disconnected - stopping block height monitor", ed.chConfig.ID())
return
}
}
}

Expand Down

0 comments on commit 22fbd8b

Please sign in to comment.