Skip to content

Commit

Permalink
Makes the flood goroutine more reusable.
Browse files Browse the repository at this point in the history
  • Loading branch information
slackpad committed Mar 15, 2017
1 parent 6b02a4d commit c2e0469
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 43 deletions.
68 changes: 68 additions & 0 deletions consul/flood.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package consul

import (
"time"

"github.com/hashicorp/consul/consul/servers"
"github.com/hashicorp/serf/serf"
)

// FloodNotify lets all the waiting Flood goroutines know that some change may
// have affected them.
func (s *Server) FloodNotify() {
s.floodLock.RLock()
defer s.floodLock.RUnlock()

for _, ch := range s.floodCh {
select {
case ch <- struct{}{}:
default:
}
}
}

// Flood is a long-running goroutine that floods servers from the LAN to the
// given global Serf instance, such as the WAN. This will exit once either of
// the Serf instances are shut down.
func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) {
s.floodLock.Lock()
floodCh := make(chan struct{})
s.floodCh = append(s.floodCh, floodCh)
s.floodLock.Unlock()

ticker := time.NewTicker(s.config.SerfFloodInterval)
defer ticker.Stop()
defer func() {
s.floodLock.Lock()
defer s.floodLock.Unlock()

for i, ch := range s.floodCh {
if ch == floodCh {
s.floodCh = append(s.floodCh[:i], s.floodCh[i+1:]...)
return
}
}
panic("flood channels out of sync")
}()

for {
WAIT:
select {
case <-s.serfLAN.ShutdownCh():
return

case <-global.ShutdownCh():
return

case <-ticker.C:
goto FLOOD

case <-floodCh:
goto FLOOD
}
goto WAIT

FLOOD:
servers.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global)
}
}
7 changes: 2 additions & 5 deletions consul/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,8 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
s.maybeBootstrap()
}

// Kick the WAN flooder.
select {
case s.floodCh <- struct{}{}:
default:
}
// Kick the join flooders.
s.FloodNotify()
}
}

Expand Down
48 changes: 11 additions & 37 deletions consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ type Server struct {
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf

// floodCh is kicked whenever we should try to flood LAN servers into
// the WAN.
floodCh chan struct{}
// floodLock controls access to floodCh.
floodLock sync.RWMutex
floodCh []chan struct{}

// sessionTimers track the expiration time of each Session that has
// a TTL. On expiration, a SessionDestroy event will occur, and
Expand Down Expand Up @@ -258,7 +258,6 @@ func NewServer(config *Config) (*Server, error) {
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
floodCh: make(chan struct{}),
tombstoneGC: gc,
shutdownCh: make(chan struct{}),
}
Expand Down Expand Up @@ -318,40 +317,15 @@ func NewServer(config *Config) (*Server, error) {
}
go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)

// Fire up the LAN <-> WAN Serf join flooder.
go func() {
ticker := time.NewTicker(config.SerfFloodInterval)
defer ticker.Stop()

portFn := func(s *agent.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
} else {
return 0, false
}
// Fire up the LAN <-> WAN join flooder.
portFn := func(s *agent.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
} else {
return 0, false
}

for {
WAIT:
select {
case <-s.serfLAN.ShutdownCh():
return

case <-s.serfWAN.ShutdownCh():
return

case <-ticker.C:
goto FLOOD

case <-s.floodCh:
goto FLOOD
}
goto WAIT

FLOOD:
servers.FloodJoins(s.logger, portFn, config.Datacenter, s.serfLAN, s.serfWAN)
}
}()
}
go s.Flood(portFn, s.serfWAN)

// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
Expand Down
6 changes: 5 additions & 1 deletion consul/servers/serf_flooder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,13 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn,
}

// Do the join!
if _, err := globalSerf.Join([]string{addr}, true); err != nil {
n, err := globalSerf.Join([]string{addr}, true)
if err != nil {
logger.Printf("[DEBUG] consul: Failed to flood-join %q at %s: %v",
server.Name, addr, err)
} else if n > 0 {
logger.Printf("[INFO] consul: Successfully performed flood-join for %q at %s",
server.Name, addr)
}
}
}

0 comments on commit c2e0469

Please sign in to comment.