health: fail health if any task is pending
Fixes a bug where an allocation is considered healthy while some of its
tasks are being restarted and, as such, their checks aren't tracked by
the Consul agent client.

Here, we fix the immediate case by ensuring that an alloc is reported
healthy only if its tasks are running and the checks registered at that
time are healthy.

Previously, the health tracker tracked task "health" independently from
checks, which leads to problems when a task restarts.  Consider the
following series of events:

1. all tasks start running -> `tracker.tasksHealthy` is true
2. one task has unhealthy checks and gets restarted
3. remaining checks are healthy -> `tracker.checksHealthy` is true
4. health status is propagated now that both `tracker.tasksHealthy` and
`tracker.checksHealthy` are true, even though one task is still restarting.

This change ensures that we accurately use the latest status of tasks
and checks, regardless of how their statuses change.

This also ensures that we only consider check health after tasks are
considered healthy; otherwise we risk trusting an incomplete set of checks.
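
To make the ordering concrete, here is a minimal, self-contained sketch of
that gating, condensed from the diff below. The `trackerSketch` type and its
`main` function are illustrative only; the real Tracker also handles locking,
timers, terminal states, and propagating the final health value.

package main

import "fmt"

// trackerSketch is an illustrative stand-in for the relevant Tracker fields;
// locking, timers, and terminal-state handling are omitted.
type trackerSketch struct {
    tasksHealthy  bool
    checksHealthy bool
}

// setTaskHealth mirrors the new behavior: when tasks become unhealthy (e.g.
// one is restarting), any previously observed check health is stale, so the
// tracker must wait for a fresh round of check results.
func (t *trackerSketch) setTaskHealth(healthy, terminal bool) {
    t.tasksHealthy = healthy
    if !terminal && !healthy {
        t.checksHealthy = false
    }
}

// setCheckHealth mirrors the new gating: checks only count once the tasks
// themselves are healthy, because a restarting task's checks may simply be
// missing from the Consul agent.
func (t *trackerSketch) setCheckHealth(healthy bool) {
    t.checksHealthy = healthy && t.tasksHealthy
}

func main() {
    t := &trackerSketch{}
    t.setTaskHealth(true, false)  // 1. all tasks start running
    t.setTaskHealth(false, false) // 2. one task fails its checks and restarts
    t.setCheckHealth(true)        // 3. the remaining checks look healthy...
    fmt.Println(t.checksHealthy)  // ...but prints false: tasks are not healthy yet
}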

This approach accommodates task dependencies well.  Service jobs can have
short-lived prestart tasks that terminate before the main process runs.
Such dead tasks that complete successfully will not negate the health
status.
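
As a minimal illustration of the task-state side, the sketch below mirrors
the start-time scan inside `watchTaskEvents` after this change. The
`taskState` struct and `latestStart` helper are illustrative stand-ins for
`structs.TaskState` and the real loop, which handles additional cases.

package main

import (
    "fmt"
    "time"
)

// taskState is an illustrative stand-in for structs.TaskState.
type taskState struct {
    State     string // "pending", "running", or "dead"
    StartedAt time.Time
}

// latestStart mirrors the scan in watchTaskEvents after this change: any
// pending task zeroes the result so health must be re-established, while
// running tasks and successfully finished prestart tasks only advance it.
func latestStart(states []taskState) time.Time {
    var latest time.Time
    for _, s := range states {
        if s.State == "pending" {
            return time.Time{}
        }
        if s.StartedAt.After(latest) {
            latest = s.StartedAt
        }
    }
    return latest
}

func main() {
    now := time.Now()
    states := []taskState{
        {State: "dead", StartedAt: now.Add(-time.Minute)}, // short-lived prestart task that completed
        {State: "running", StartedAt: now},                // main task
    }
    // Prints a non-zero time: a successfully completed prestart task does
    // not negate the allocation's health.
    fmt.Println(latestStart(states))
}
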
Mahmood Ali committed Mar 18, 2020
1 parent b9e3e12 commit 2cbca7b
Showing 3 changed files with 263 additions and 107 deletions.
44 changes: 31 additions & 13 deletions client/allochealth/tracker.go
@@ -41,6 +41,10 @@ type Tracker struct {
// considered healthy
minHealthyTime time.Duration

// checkLookupInterval is the interval at which we check if the
// Consul checks are healthy or unhealthy.
checkLookupInterval time.Duration

// useChecks specifies whether to use Consul health checks or not
useChecks bool

@@ -92,15 +96,16 @@ func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.A
// this struct should pass in an appropriately named
// sub-logger.
t := &Tracker{
healthy: make(chan bool, 1),
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
logger: logger,
healthy: make(chan bool, 1),
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
checkLookupInterval: consulCheckLookupInterval,
logger: logger,
}

t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
@@ -171,6 +176,12 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
defer t.l.Unlock()
t.tasksHealthy = healthy

// if unhealthy, force waiting for new checks health status
if !terminal && !healthy {
t.checksHealthy = false
return
}

// If we are marked healthy but we also require Consul to be healthy and it
// isn't yet, return, unless the task is terminal
requireConsul := t.useChecks && t.consulCheckCount > 0
@@ -191,10 +202,13 @@ func (t *Tracker) setTaskHealth(healthy, terminal bool) {
func (t *Tracker) setCheckHealth(healthy bool) {
t.l.Lock()
defer t.l.Unlock()
t.checksHealthy = healthy

// check health should always be false if tasks are unhealthy
// as checks might be missing from unhealthy tasks
t.checksHealthy = healthy && t.tasksHealthy

// Only signal if we are healthy and so are the tasks
if !healthy || !t.tasksHealthy {
if !t.checksHealthy {
return
}

@@ -256,10 +270,11 @@ func (t *Tracker) watchTaskEvents() {
return
}

if state.State != structs.TaskStateRunning {
if state.State == structs.TaskStatePending {
latestStartTime = time.Time{}
break
} else if state.StartedAt.After(latestStartTime) {
// task is either running or exited successfully
latestStartTime = state.StartedAt
}
}
@@ -276,6 +291,9 @@ func (t *Tracker) watchTaskEvents() {
}

if !latestStartTime.Equal(allStartedTime) {
// reset task health
t.setTaskHealth(false, false)

// Avoid the timer from firing at the old start time
if !healthyTimer.Stop() {
select {
@@ -310,7 +328,7 @@ func (t *Tracker) watchTaskEvents() {
func (t *Tracker) watchConsulEvents() {
// checkTicker is the ticker that triggers us to look at the checks in
// Consul
checkTicker := time.NewTicker(consulCheckLookupInterval)
checkTicker := time.NewTicker(t.checkLookupInterval)
defer checkTicker.Stop()

// healthyTimer fires when the checks have been healthy for the
229 changes: 229 additions & 0 deletions client/allochealth/tracker_test.go
@@ -0,0 +1,229 @@
package allochealth

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestTracker_Checks_Healthy(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]

// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}

// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
},
},
},
}

logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()

// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}

reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}

return reg, nil
}

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

select {
case <-time.After(4 * checkInterval):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}

func TestTracker_Checks_Unhealthy(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]

newCheck := task.Services[0].Checks[0].Copy()
newCheck.Name = "failing-check"
task.Services[0].Checks = append(task.Services[0].Checks, newCheck)

// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}

// Make Consul response
checkHealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
checksUnhealthy := &consulapi.AgentCheck{
Name: task.Services[0].Checks[1].Name,
Status: consulapi.HealthCritical,
}
taskRegs := map[string]*agentconsul.ServiceRegistrations{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{checkHealthy, checksUnhealthy},
},
},
},
}

logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()

// Don't reply on the first call
var called uint64
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if atomic.AddUint64(&called, 1) == 1 {
return nil, nil
}

reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}

return reg, nil
}

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
time.Millisecond, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

testutil.WaitForResult(func() (bool, error) {
lookup := atomic.LoadUint64(&called)
return lookup < 4, fmt.Errorf("wait to get more task registration lookups: %v", lookup)
}, func(err error) {
require.NoError(t, err)
})

tracker.l.Lock()
require.False(t, tracker.checksHealthy)
tracker.l.Unlock()

select {
case v := <-tracker.HealthyCh():
require.Failf(t, "expected no health value", " got %v", v)
default:
// good
}
}

func TestTracker_Healthy_IfBothTasksAndConsulChecksAreHealthy(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
logger := testlog.HCLogger(t)

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

tracker := NewTracker(ctx, logger, alloc, nil, nil,
time.Millisecond, true)

assertNoHealth := func() {
require.NoError(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.Failf(t, "unexpected healthy event", "got %v", v)
default:
}
}

// first set task health without checks
tracker.setTaskHealth(true, false)
assertNoHealth()

// now fail task health again before checks are successful
tracker.setTaskHealth(false, false)
assertNoHealth()

// now pass health checks - do not propagate health yet
tracker.setCheckHealth(true)
assertNoHealth()

// set tasks to healthy - don't propagate health yet, wait for the next check
tracker.setTaskHealth(true, false)
assertNoHealth()

// set checks to true, now propagate health status
tracker.setCheckHealth(true)

require.Error(t, tracker.ctx.Err())
select {
case v := <-tracker.HealthyCh():
require.True(t, v)
default:
require.Fail(t, "expected a health status")
}
}
