Skip to content

Commit

Permalink
allow unhealthy canaries without blocking autopromote
Browse files Browse the repository at this point in the history
  • Loading branch information
chuckyz committed Aug 3, 2022
1 parent 696deb9 commit a1d50df
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 3 deletions.
10 changes: 7 additions & 3 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,18 +293,22 @@ func (w *deploymentWatcher) autoPromoteDeployment(allocs []*structs.AllocListStu
continue
}

if !dstate.AutoPromote || dstate.DesiredCanaries != len(dstate.PlacedCanaries) {
if !dstate.AutoPromote || len(dstate.PlacedCanaries) < dstate.DesiredCanaries {
return nil
}

healthyCanaries := 0
// Find the health status of each canary
for _, c := range dstate.PlacedCanaries {
for _, a := range allocs {
if c == a.ID && !a.DeploymentStatus.IsHealthy() {
return nil
if c == a.ID && a.DeploymentStatus.IsHealthy() {
healthyCanaries += 1
}
}
}
if healthyCanaries != dstate.DesiredCanaries {
return nil
}
}

// Send the request
Expand Down
166 changes: 166 additions & 0 deletions nomad/deploymentwatcher/deployments_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,172 @@ func TestWatcher_AutoPromoteDeployment(t *testing.T) {
require.False(t, b1.DeploymentStatus.Canary)
}

func TestWatcher_AutoPromoteDeployment_WithUnhealthyCanaries(t *testing.T) {
ci.Parallel(t)
w, m := defaultTestDeploymentWatcher(t)
now := time.Now()

// Create 1 UpdateStrategy, 1 job (2 TaskGroups), 2 canaries, and 1 deployment
canaryUpd := structs.DefaultUpdateStrategy.Copy()
canaryUpd.AutoPromote = true
canaryUpd.MaxParallel = 2
canaryUpd.Canary = 2
canaryUpd.ProgressDeadline = 5 * time.Second

rollingUpd := structs.DefaultUpdateStrategy.Copy()
rollingUpd.ProgressDeadline = 5 * time.Second

j := mock.MultiTaskGroupJob()
j.TaskGroups[0].Update = canaryUpd
j.TaskGroups[1].Update = rollingUpd

d := mock.Deployment()
d.JobID = j.ID
// This is created in scheduler.computeGroup at runtime, where properties from the
// UpdateStrategy are copied in
d.TaskGroups = map[string]*structs.DeploymentState{
"web": {
AutoPromote: canaryUpd.AutoPromote,
AutoRevert: canaryUpd.AutoRevert,
ProgressDeadline: canaryUpd.ProgressDeadline,
DesiredTotal: 2,
},
"api": {
AutoPromote: rollingUpd.AutoPromote,
AutoRevert: rollingUpd.AutoRevert,
ProgressDeadline: rollingUpd.ProgressDeadline,
DesiredTotal: 2,
},
}

canaryAlloc := func() *structs.Allocation {
a := mock.Alloc()
a.DeploymentID = d.ID
a.CreateTime = now.UnixNano()
a.ModifyTime = now.UnixNano()
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Canary: true,
}
return a
}

rollingAlloc := func() *structs.Allocation {
a := mock.Alloc()
a.DeploymentID = d.ID
a.CreateTime = now.UnixNano()
a.ModifyTime = now.UnixNano()
a.TaskGroup = "api"
a.AllocatedResources.Tasks["api"] = a.AllocatedResources.Tasks["web"].Copy()
delete(a.AllocatedResources.Tasks, "web")
a.TaskResources["api"] = a.TaskResources["web"].Copy()
delete(a.TaskResources, "web")
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Canary: false,
}
return a
}

// Web taskgroup (0)
ca1 := canaryAlloc()
ca2 := canaryAlloc()
ca3 := canaryAlloc()

// Api taskgroup (1)
ra1 := rollingAlloc()
ra2 := rollingAlloc()

d.TaskGroups[ca1.TaskGroup].PlacedCanaries = []string{ca1.ID, ca2.ID, ca3.ID}
d.TaskGroups[ca1.TaskGroup].DesiredCanaries = 2
d.TaskGroups[ra1.TaskGroup].PlacedAllocs = 2
require.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), j), "UpsertJob")
require.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment")
require.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ca3, ra1, ra2}), "UpsertAllocs")

// =============================================================
// Support method calls

// clear UpdateDeploymentStatus default expectation
m.Mock.ExpectedCalls = nil

matchConfig0 := &matchDeploymentStatusUpdateConfig{
DeploymentID: d.ID,
Status: structs.DeploymentStatusFailed,
StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline,
Eval: true,
}
matcher0 := matchDeploymentStatusUpdateRequest(matchConfig0)
m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher0)).Return(nil)

matchConfig1 := &matchDeploymentAllocHealthRequestConfig{
DeploymentID: d.ID,
Healthy: []string{ca1.ID, ca2.ID, ra1.ID, ra2.ID},
Eval: true,
}
matcher1 := matchDeploymentAllocHealthRequest(matchConfig1)
m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher1)).Return(nil)

matchConfig2 := &matchDeploymentPromoteRequestConfig{
Promotion: &structs.DeploymentPromoteRequest{
DeploymentID: d.ID,
All: true,
},
Eval: true,
}
matcher2 := matchDeploymentPromoteRequest(matchConfig2)
m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)).Return(nil)
// =============================================================

// Start the deployment
w.SetEnabled(true, m.state)
testutil.WaitForResult(func() (bool, error) {
w.l.RLock()
defer w.l.RUnlock()
return 1 == len(w.watchers), nil
},
func(err error) {
w.l.RLock()
defer w.l.RUnlock()
require.Equal(t, 1, len(w.watchers), "Should have 1 deployment")
},
)

// Mark the canaries healthy
req := &structs.DeploymentAllocHealthRequest{
DeploymentID: d.ID,
HealthyAllocationIDs: []string{ca1.ID, ca2.ID, ra1.ID, ra2.ID},
}
var resp structs.DeploymentUpdateResponse
// Calls w.raft.UpdateDeploymentAllocHealth, which is implemented by StateStore in
// state.UpdateDeploymentAllocHealth via a raft shim?
err := w.SetAllocHealth(req, &resp)
require.NoError(t, err)

ws := memdb.NewWatchSet()

testutil.WaitForResult(
func() (bool, error) {
ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true)
d = ds[0]
return 2 == d.TaskGroups["web"].HealthyAllocs, nil
},
func(err error) { require.NoError(t, err) },
)

require.Equal(t, 1, len(w.watchers), "Deployment should still be active")
m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher2))

require.Equal(t, "running", d.Status)
require.True(t, d.TaskGroups["web"].Promoted)

a1, _ := m.state.AllocByID(ws, ca1.ID)
require.False(t, a1.DeploymentStatus.Canary)
require.Equal(t, "pending", a1.ClientStatus)
require.Equal(t, "run", a1.DesiredStatus)

b1, _ := m.state.AllocByID(ws, ca2.ID)
require.False(t, b1.DeploymentStatus.Canary)
}

// Test pausing a deployment that is running
func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) {
ci.Parallel(t)
Expand Down

0 comments on commit a1d50df

Please sign in to comment.