Skip to content

Commit

Permalink
consul: periodically reconcile services/checks
Browse files Browse the repository at this point in the history
Periodically sync services and checks from Nomad to Consul. This is
mostly useful when testing with the Consul dev agent which does not
persist state across restarts. However, this is a reasonable safety
measure to prevent skew between Consul's state and Nomad's
services+checks.

Also modernized the test suite a bit.
  • Loading branch information
schmichael committed Apr 19, 2018
1 parent 45d0c88 commit 972e861
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 46 deletions.
28 changes: 26 additions & 2 deletions command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ type MockAgent struct {
// maps of what services and checks have been registered
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
mu sync.Mutex

// hits is the total number of times agent methods have been called
hits int

// mu guards above fields
mu sync.Mutex

// when UpdateTTL is called the check ID will have its counter inc'd
checkTTLs map[string]int
Expand All @@ -52,6 +57,13 @@ func NewMockAgent() *MockAgent {
}
}

// getHits returns how many Consul Agent API calls have been made.
func (c *MockAgent) getHits() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.hits
}

// SetStatus that Checks() should return. Returns old status value.
func (c *MockAgent) SetStatus(s string) string {
c.mu.Lock()
Expand All @@ -62,6 +74,10 @@ func (c *MockAgent) SetStatus(s string) string {
}

func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

s := map[string]map[string]interface{}{
"Member": {
"Addr": "127.0.0.1",
Expand All @@ -85,6 +101,7 @@ func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

r := make(map[string]*api.AgentService, len(c.services))
for k, v := range c.services {
Expand All @@ -105,6 +122,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

r := make(map[string]*api.AgentCheck, len(c.checks))
for k, v := range c.checks {
Expand All @@ -125,7 +143,6 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
c.mu.Lock()
defer c.mu.Unlock()

regs := make([]*api.AgentCheckRegistration, 0, len(c.checks))
for _, check := range c.checks {
regs = append(regs, check)
Expand All @@ -136,6 +153,8 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

c.checks[check.ID] = check

// Be nice and make checks reachable-by-service
Expand All @@ -147,6 +166,7 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
func (c *MockAgent) CheckDeregister(checkID string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.checks, checkID)
delete(c.checkTTLs, checkID)
return nil
Expand All @@ -155,20 +175,24 @@ func (c *MockAgent) CheckDeregister(checkID string) error {
func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
c.services[service.ID] = service
return nil
}

func (c *MockAgent) ServiceDeregister(serviceID string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++
delete(c.services, serviceID)
return nil
}

func (c *MockAgent) UpdateTTL(id string, output string, status string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.hits++

check, ok := c.checks[id]
if !ok {
return fmt.Errorf("unknown check id: %q", id)
Expand Down
17 changes: 9 additions & 8 deletions command/agent/consul/check_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -113,9 +114,9 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {

// testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test
// logger and faster poll frequency.
func testWatcherSetup() (*fakeChecksAPI, *checkWatcher) {
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
fakeAPI := newFakeChecksAPI()
cw := newCheckWatcher(testLogger(), fakeAPI)
cw := newCheckWatcher(testlog.Logger(t), fakeAPI)
cw.pollFreq = 10 * time.Millisecond
return fakeAPI, cw
}
Expand All @@ -141,7 +142,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
check := testCheck()
check.CheckRestart = nil

cw := newCheckWatcher(testLogger(), newFakeChecksAPI())
cw := newCheckWatcher(testlog.Logger(t), newFakeChecksAPI())
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)

Expand All @@ -155,7 +156,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
func TestCheckWatcher_Healthy(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1)
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestCheckWatcher_Healthy(t *testing.T) {
func TestCheckWatcher_HealthyWarning(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Limit = 1
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) {
func TestCheckWatcher_Flapping(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Grace = 0
Expand Down Expand Up @@ -247,7 +248,7 @@ func TestCheckWatcher_Flapping(t *testing.T) {
func TestCheckWatcher_Unwatch(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

// Unwatch immediately
check1 := testCheck()
Expand Down Expand Up @@ -276,7 +277,7 @@ func TestCheckWatcher_Unwatch(t *testing.T) {
func TestCheckWatcher_MultipleChecks(t *testing.T) {
t.Parallel()

fakeAPI, cw := testWatcherSetup()
fakeAPI, cw := testWatcherSetup(t)

check1 := testCheck()
check1.CheckRestart.Limit = 1
Expand Down
26 changes: 20 additions & 6 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
// defaultMaxRetryInterval is the default max retry interval.
defaultMaxRetryInterval = 30 * time.Second

// defaultPeriodicalInterval is the interval at which the service
// client reconciles state between the desired services and checks and
// what's actually registered in Consul. This is done at an interval,
// rather than being purely edge triggered, to handle the case that the
// Consul agent's state may change underneath us
defaultPeriodicInterval = 30 * time.Second

// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
ttlCheckBuffer = 31 * time.Second
Expand Down Expand Up @@ -190,6 +197,7 @@ type ServiceClient struct {
logger *log.Logger
retryInterval time.Duration
maxRetryInterval time.Duration
periodicInterval time.Duration

// exitCh is closed when the main Run loop exits
exitCh chan struct{}
Expand Down Expand Up @@ -235,6 +243,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
Expand Down Expand Up @@ -279,7 +288,6 @@ func (c *ServiceClient) Run() {

// Process operations while waiting for initial contact with Consul but
// do not sync until contact has been made.
hasOps := false
INIT:
for {
select {
Expand All @@ -289,7 +297,6 @@ INIT:
case <-c.shutdownCh:
return
case ops := <-c.opCh:
hasOps = true
c.merge(ops)
}
}
Expand All @@ -299,11 +306,8 @@ INIT:
// Start checkWatcher
go c.checkWatcher.Run(ctx)

// Always immediately sync to reconcile Nomad and Consul's state
retryTimer := time.NewTimer(0)
if !hasOps {
// No pending operations so don't immediately sync
<-retryTimer.C
}

failures := 0
for {
Expand Down Expand Up @@ -345,6 +349,16 @@ INIT:
c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul")
failures = 0
}

// Reset timer to periodic interval to periodically
// reconile with Consul
if !retryTimer.Stop() {
select {
case <-retryTimer.C:
default:
}
}
retryTimer.Reset(c.periodicInterval)
}

select {
Expand Down
11 changes: 6 additions & 5 deletions command/agent/consul/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestConsulScript_Exec_Cancel(t *testing.T) {
exec := newBlockingScriptExec()

// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.Logger(t), nil)
handle := check.run()

// wait until Exec is called
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestConsulScript_Exec_Timeout(t *testing.T) {
exec := newBlockingScriptExec()

hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
Timeout: time.Nanosecond,
}
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testLogger(), nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.Logger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup

Expand Down Expand Up @@ -205,7 +206,7 @@ func TestConsulScript_Exec_Shutdown(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(0, nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel() // just-in-case cleanup

Expand Down Expand Up @@ -242,7 +243,7 @@ func TestConsulScript_Exec_Codes(t *testing.T) {
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(code, err)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
handle := check.run()
defer handle.cancel()

Expand Down
Loading

0 comments on commit 972e861

Please sign in to comment.