From a1d50df6f6a8c5d0bd49dbc5f3a8cba07a46ff10 Mon Sep 17 00:00:00 2001
From: Charles Zaffery
Date: Wed, 3 Aug 2022 16:55:12 -0700
Subject: [PATCH] allow unhealthy canaries without blocking autopromote

---
 nomad/deploymentwatcher/deployment_watcher.go |  10 +-
 .../deployments_watcher_test.go               | 166 ++++++++++++++++++
 2 files changed, 173 insertions(+), 3 deletions(-)

diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go
index bb7bc1f5258..dec0c8f2c95 100644
--- a/nomad/deploymentwatcher/deployment_watcher.go
+++ b/nomad/deploymentwatcher/deployment_watcher.go
@@ -293,18 +293,22 @@ func (w *deploymentWatcher) autoPromoteDeployment(allocs []*structs.AllocListStu
 			continue
 		}
 
-		if !dstate.AutoPromote || dstate.DesiredCanaries != len(dstate.PlacedCanaries) {
+		if !dstate.AutoPromote || len(dstate.PlacedCanaries) < dstate.DesiredCanaries {
 			return nil
 		}
 
+		healthyCanaries := 0
 		// Find the health status of each canary
 		for _, c := range dstate.PlacedCanaries {
 			for _, a := range allocs {
-				if c == a.ID && !a.DeploymentStatus.IsHealthy() {
-					return nil
+				if c == a.ID && a.DeploymentStatus.IsHealthy() {
+					healthyCanaries += 1
 				}
 			}
 		}
+		if healthyCanaries != dstate.DesiredCanaries {
+			return nil
+		}
 	}
 
 	// Send the request
diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go
index 467ffcca20b..592af357661 100644
--- a/nomad/deploymentwatcher/deployments_watcher_test.go
+++ b/nomad/deploymentwatcher/deployments_watcher_test.go
@@ -696,6 +696,172 @@ func TestWatcher_AutoPromoteDeployment(t *testing.T) {
 	require.False(t, b1.DeploymentStatus.Canary)
 }
 
+func TestWatcher_AutoPromoteDeployment_WithUnhealthyCanaries(t *testing.T) {
+	ci.Parallel(t)
+	w, m := defaultTestDeploymentWatcher(t)
+	now := time.Now()
+
+	// Create 1 UpdateStrategy, 1 job (2 TaskGroups), 2 canaries, and 1 deployment
+	canaryUpd := structs.DefaultUpdateStrategy.Copy()
+	canaryUpd.AutoPromote = true
+	canaryUpd.MaxParallel = 2
+	canaryUpd.Canary = 2
+	canaryUpd.ProgressDeadline = 5 * time.Second
+
+	rollingUpd := structs.DefaultUpdateStrategy.Copy()
+	rollingUpd.ProgressDeadline = 5 * time.Second
+
+	j := mock.MultiTaskGroupJob()
+	j.TaskGroups[0].Update = canaryUpd
+	j.TaskGroups[1].Update = rollingUpd
+
+	d := mock.Deployment()
+	d.JobID = j.ID
+	// This is created in scheduler.computeGroup at runtime, where properties from the
+	// UpdateStrategy are copied in
+	d.TaskGroups = map[string]*structs.DeploymentState{
+		"web": {
+			AutoPromote:      canaryUpd.AutoPromote,
+			AutoRevert:       canaryUpd.AutoRevert,
+			ProgressDeadline: canaryUpd.ProgressDeadline,
+			DesiredTotal:     2,
+		},
+		"api": {
+			AutoPromote:      rollingUpd.AutoPromote,
+			AutoRevert:       rollingUpd.AutoRevert,
+			ProgressDeadline: rollingUpd.ProgressDeadline,
+			DesiredTotal:     2,
+		},
+	}
+
+	canaryAlloc := func() *structs.Allocation {
+		a := mock.Alloc()
+		a.DeploymentID = d.ID
+		a.CreateTime = now.UnixNano()
+		a.ModifyTime = now.UnixNano()
+		a.DeploymentStatus = &structs.AllocDeploymentStatus{
+			Canary: true,
+		}
+		return a
+	}
+
+	rollingAlloc := func() *structs.Allocation {
+		a := mock.Alloc()
+		a.DeploymentID = d.ID
+		a.CreateTime = now.UnixNano()
+		a.ModifyTime = now.UnixNano()
+		a.TaskGroup = "api"
+		a.AllocatedResources.Tasks["api"] = a.AllocatedResources.Tasks["web"].Copy()
+		delete(a.AllocatedResources.Tasks, "web")
+		a.TaskResources["api"] = a.TaskResources["web"].Copy()
+		delete(a.TaskResources, "web")
+		a.DeploymentStatus = &structs.AllocDeploymentStatus{
+			Canary: false,
+		}
+		return a
+	}
+
+	// Web taskgroup (0)
+	ca1 := canaryAlloc()
+	ca2 := canaryAlloc()
+	ca3 := canaryAlloc()
+
+	// Api taskgroup (1)
+	ra1 := rollingAlloc()
+	ra2 := rollingAlloc()
+
+	d.TaskGroups[ca1.TaskGroup].PlacedCanaries = []string{ca1.ID, ca2.ID, ca3.ID}
+	d.TaskGroups[ca1.TaskGroup].DesiredCanaries = 2
+	d.TaskGroups[ra1.TaskGroup].PlacedAllocs = 2
+	require.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), j), "UpsertJob")
+	require.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment")
+	require.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ca3, ra1, ra2}), "UpsertAllocs")
+
+	// =============================================================
+	// Support method calls
+
+	// clear UpdateDeploymentStatus default expectation
+	m.Mock.ExpectedCalls = nil
+
+	matchConfig0 := &matchDeploymentStatusUpdateConfig{
+		DeploymentID:      d.ID,
+		Status:            structs.DeploymentStatusFailed,
+		StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline,
+		Eval:              true,
+	}
+	matcher0 := matchDeploymentStatusUpdateRequest(matchConfig0)
+	m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher0)).Return(nil)
+
+	matchConfig1 := &matchDeploymentAllocHealthRequestConfig{
+		DeploymentID: d.ID,
+		Healthy:      []string{ca1.ID, ca2.ID, ra1.ID, ra2.ID},
+		Eval:         true,
+	}
+	matcher1 := matchDeploymentAllocHealthRequest(matchConfig1)
+	m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher1)).Return(nil)
+
+	matchConfig2 := &matchDeploymentPromoteRequestConfig{
+		Promotion: &structs.DeploymentPromoteRequest{
+			DeploymentID: d.ID,
+			All:          true,
+		},
+		Eval: true,
+	}
+	matcher2 := matchDeploymentPromoteRequest(matchConfig2)
+	m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)).Return(nil)
+	// =============================================================
+
+	// Start the deployment
+	w.SetEnabled(true, m.state)
+	testutil.WaitForResult(func() (bool, error) {
+		w.l.RLock()
+		defer w.l.RUnlock()
+		return 1 == len(w.watchers), nil
+	},
+		func(err error) {
+			w.l.RLock()
+			defer w.l.RUnlock()
+			require.Equal(t, 1, len(w.watchers), "Should have 1 deployment")
+		},
+	)
+
+	// Mark the canaries healthy
+	req := &structs.DeploymentAllocHealthRequest{
+		DeploymentID:         d.ID,
+		HealthyAllocationIDs: []string{ca1.ID, ca2.ID, ra1.ID, ra2.ID},
+	}
+	var resp structs.DeploymentUpdateResponse
+	// Calls w.raft.UpdateDeploymentAllocHealth, which is implemented by StateStore in
+	// state.UpdateDeploymentAllocHealth via a raft shim?
+	err := w.SetAllocHealth(req, &resp)
+	require.NoError(t, err)
+
+	ws := memdb.NewWatchSet()
+
+	testutil.WaitForResult(
+		func() (bool, error) {
+			ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true)
+			d = ds[0]
+			return 2 == d.TaskGroups["web"].HealthyAllocs, nil
+		},
+		func(err error) { require.NoError(t, err) },
+	)
+
+	require.Equal(t, 1, len(w.watchers), "Deployment should still be active")
+	m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher2))
+
+	require.Equal(t, "running", d.Status)
+	require.True(t, d.TaskGroups["web"].Promoted)
+
+	a1, _ := m.state.AllocByID(ws, ca1.ID)
+	require.False(t, a1.DeploymentStatus.Canary)
+	require.Equal(t, "pending", a1.ClientStatus)
+	require.Equal(t, "run", a1.DesiredStatus)
+
+	b1, _ := m.state.AllocByID(ws, ca2.ID)
+	require.False(t, b1.DeploymentStatus.Canary)
+}
+
 // Test pausing a deployment that is running
 func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) {
 	ci.Parallel(t)
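
Note: for readers skimming the first hunk, a minimal standalone sketch of the revised gate follows. It uses hypothetical canary/group placeholder types rather than Nomad's structs and mocks, and only illustrates the counting logic: promotion now proceeds as long as at least DesiredCanaries are placed and exactly DesiredCanaries of them are healthy, so a surplus unhealthy canary (for example one that was rescheduled) no longer blocks autopromote.

    package main

    import "fmt"

    // canary and group are illustrative stand-ins for the allocation and
    // structs.DeploymentState fields that autoPromoteDeployment consults.
    type canary struct {
    	id      string
    	healthy bool
    }

    type group struct {
    	autoPromote     bool
    	desiredCanaries int
    	placedCanaries  []canary
    }

    // canPromote mirrors the patched checks: enough canaries placed, and
    // exactly desiredCanaries of them healthy. The old code returned early
    // if any placed canary was unhealthy.
    func canPromote(g group) bool {
    	if !g.autoPromote || len(g.placedCanaries) < g.desiredCanaries {
    		return false
    	}
    	healthy := 0
    	for _, c := range g.placedCanaries {
    		if c.healthy {
    			healthy++
    		}
    	}
    	return healthy == g.desiredCanaries
    }

    func main() {
    	// Three placed canaries, one unhealthy, two desired: the new gate
    	// promotes, where the old one would have blocked.
    	g := group{
    		autoPromote:     true,
    		desiredCanaries: 2,
    		placedCanaries:  []canary{{"ca1", true}, {"ca2", true}, {"ca3", false}},
    	}
    	fmt.Println(canPromote(g)) // true
    }

This is the same scenario the new test exercises: three canaries placed for a group that desires two, with only ca1 and ca2 marked healthy.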