Skip to content

Commit

Permalink
Fix regression where canary would not progress past a pause step (#198)
Browse files Browse the repository at this point in the history
* Fix regression where canary would not progress past a pause step
* Simplify logic of syncRolloutStatusCanary. Add unit test
  • Loading branch information
jessesuen authored and dthomson25 committed Oct 15, 2019
1 parent 1a433bc commit 55894e6
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 59 deletions.
3 changes: 2 additions & 1 deletion analysis/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func (f *fixture) runController(analysisRunName string, startInformers bool, exp
actions := filterInformerActions(f.client.Actions())
for i, action := range actions {
if len(f.actions) < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
actionsBytes, _ := json.Marshal(actions[i:])
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), string(actionsBytes))
break
}

Expand Down
6 changes: 4 additions & 2 deletions experiments/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ func (f *fixture) runController(experimentName string, startInformers bool, expe
actions := filterInformerActions(f.client.Actions())
for i, action := range actions {
if len(f.actions) < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
actionsBytes, _ := json.Marshal(actions[i:])
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), string(actionsBytes))
break
}

Expand All @@ -377,7 +378,8 @@ func (f *fixture) runController(experimentName string, startInformers bool, expe
k8sActions := filterInformerActions(f.kubeclient.Actions())
for i, action := range k8sActions {
if len(f.kubeactions) < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeactions), k8sActions[i:])
actionsBytes, _ := json.Marshal(k8sActions[i:])
f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeactions), string(actionsBytes))
break
}

Expand Down
2 changes: 1 addition & 1 deletion metricproviders/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
// JobNameKey is the measurement's metadata key holding the job name associated with the measurement
JobNameKey = "job-name"
// AnalysisRunLabelKey is the job's label key where we label the name of the AnalysisRun associated to it
AnalysisRunLabelKey = "analysisruns.argoproj.io/name"
AnalysisRunLabelKey = "analysisrun.argoproj.io/name"
)

var (
Expand Down
79 changes: 40 additions & 39 deletions rollout/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func completedCurrentCanaryStep(olderRSs []*appsv1.ReplicaSet, newRS *appsv1.Rep
return false
}
if currentStep.Pause != nil {
return completedPauseStep(r, currentStep.Pause)
return completedPauseStep(r, *currentStep.Pause)
}
if currentStep.SetWeight != nil && replicasetutil.AtDesiredReplicaCountsForCanary(r, newRS, stableRS, olderRSs) {
logCtx.Info("Rollout has reached the desired state for the correct weight")
Expand Down Expand Up @@ -274,54 +274,55 @@ func (c *RolloutController) syncRolloutStatusCanary(olderRSs []*appsv1.ReplicaSe
if currBackgroundAr.Status == nil || !currBackgroundAr.Status.Status.Completed() || analysisutil.IsTerminating(currBackgroundAr) {
newStatus.Canary.CurrentBackgroundAnalysisRun = currBackgroundAr.Name
}

}

if !r.Spec.Paused {
if stepCount == 0 {
logCtx.Info("Rollout has no steps")
if newRS != nil && newRS.Status.AvailableReplicas == defaults.GetRolloutReplicasOrDefault(r) {
logCtx.Info("New RS has successfully progressed")
newStatus.Canary.StableRS = newStatus.CurrentPodHash
}
newStatus = c.calculateRolloutConditions(r, newStatus, allRSs, newRS, currExp, currArs)
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
if stepCount == 0 {
logCtx.Info("Rollout has no steps")
if newRS != nil && newRS.Status.AvailableReplicas == defaults.GetRolloutReplicasOrDefault(r) {
logCtx.Info("New RS has successfully progressed")
newStatus.Canary.StableRS = newStatus.CurrentPodHash
}
newStatus = c.calculateRolloutConditions(r, newStatus, allRSs, newRS, currExp, currArs)
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
}

if *currentStepIndex == stepCount {
logCtx.Info("Rollout has executed every step")
newStatus.CurrentStepIndex = &stepCount
if newRS != nil && newRS.Status.AvailableReplicas == defaults.GetRolloutReplicasOrDefault(r) {
logCtx.Info("New RS has successfully progressed")
newStatus.Canary.StableRS = newStatus.CurrentPodHash
}
newStatus = c.calculateRolloutConditions(r, newStatus, allRSs, newRS, currExp, currArs)
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
if *currentStepIndex == stepCount {
logCtx.Info("Rollout has executed every step")
newStatus.CurrentStepIndex = &stepCount
if newRS != nil && newRS.Status.AvailableReplicas == defaults.GetRolloutReplicasOrDefault(r) {
logCtx.Info("New RS has successfully progressed")
newStatus.Canary.StableRS = newStatus.CurrentPodHash
}
newStatus = c.calculateRolloutConditions(r, newStatus, allRSs, newRS, currExp, currArs)
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
}

if completedCurrentCanaryStep(olderRSs, newRS, stableRS, currExp, currStepAr, r) {
*currentStepIndex++
newStatus.CurrentStepIndex = currentStepIndex
if int(*currentStepIndex) == len(r.Spec.Strategy.CanaryStrategy.Steps) {
c.recorder.Event(r, corev1.EventTypeNormal, "SettingStableRS", "Completed all steps")
}
logCtx.Infof("Incrementing the Current Step Index to %d", *currentStepIndex)
c.recorder.Eventf(r, corev1.EventTypeNormal, "SetStepIndex", "Set Step Index to %d", int(*currentStepIndex))
newStatus = c.calculateRolloutConditions(r, newStatus, allRSs, newRS, currExp, currArs)
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
}
if currExp != nil {
newStatus.Canary.CurrentExperiment = currExp.Name
if conditions.ExperimentTimeOut(currExp, currExp.Status) {
newStatus.Canary.ExperimentFailed = true
}
// reconcileCanaryPause will ensure we will requeue this rollout at the appropriate time
// if we are at a pause step with a duration.
c.reconcileCanaryPause(r)

if completedCurrentCanaryStep(olderRSs, newRS, stableRS, currExp, currStepAr, r) {
*currentStepIndex++
newStatus.CurrentStepIndex = currentStepIndex
if int(*currentStepIndex) == len(r.Spec.Strategy.CanaryStrategy.Steps) {
c.recorder.Event(r, corev1.EventTypeNormal, "SettingStableRS", "Completed all steps")
}
logCtx.Infof("Incrementing the Current Step Index to %d", *currentStepIndex)
c.recorder.Eventf(r, corev1.EventTypeNormal, "SetStepIndex", "Set Step Index to %d", int(*currentStepIndex))
newStatus = c.calculateRolloutConditions(r, newStatus, allRSs, newRS, currExp, currArs)
return c.persistRolloutStatus(r, &newStatus, pointer.BoolPtr(false))
}

addPause := currentStep.Pause != nil
pauseStartTime, paused := calculatePauseStatus(r, newRS, addPause, currArs)
newStatus.PauseStartTime = pauseStartTime
if currExp != nil {
newStatus.Canary.CurrentExperiment = currExp.Name
if conditions.ExperimentTimeOut(currExp, currExp.Status) {
newStatus.Canary.ExperimentFailed = true
}
}

addPause := !r.Spec.Paused && currentStep != nil && currentStep.Pause != nil
var paused bool
newStatus.PauseStartTime, paused = calculatePauseStatus(r, newRS, addPause, currArs)
newStatus.CurrentStepIndex = currentStepIndex
newStatus = c.calculateRolloutConditions(r, newStatus, allRSs, newRS, currExp, currArs)
return c.persistRolloutStatus(r, &newStatus, &paused)
Expand Down
52 changes: 52 additions & 0 deletions rollout/canary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,3 +1046,55 @@ func TestCanaryRolloutScaleWhilePaused(t *testing.T) {
expectedPatch := calculatePatch(r2, OnlyObservedGenerationPatch)
assert.Equal(t, expectedPatch, patch)
}

func TestResumeRolloutAfterPauseDuration(t *testing.T) {
f := newFixture(t)
defer f.Close()

steps := []v1alpha1.CanaryStep{
{
SetWeight: pointer.Int32Ptr(10),
},
{
Pause: &v1alpha1.RolloutPause{
Duration: pointer.Int32Ptr(60),
},
},
{
SetWeight: pointer.Int32Ptr(20),
},
}
r1 := newCanaryRollout("foo", 1, nil, steps, pointer.Int32Ptr(1), intstr.FromInt(1), intstr.FromInt(1))
rs1 := newReplicaSetWithStatus(r1, 1, 1)
rs1PodHash := rs1.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
r1 = updateCanaryRolloutStatus(r1, rs1PodHash, 1, 1, 1, true)
r1.Spec.Paused = true
overAMinuteAgo := metav1.Time{Time: time.Now().Add(-61 * time.Second)}
r1.Status.ObservedGeneration = conditions.ComputeGenerationHash(r1.Spec)
r1.Status.PauseStartTime = &overAMinuteAgo

f.kubeobjects = append(f.kubeobjects, rs1)
f.replicaSetLister = append(f.replicaSetLister, rs1)
f.rolloutLister = append(f.rolloutLister, r1)
f.objects = append(f.objects, r1)

_ = f.expectPatchRolloutAction(r1) // this just sets a conditions. ignore for now
patchIndex := f.expectPatchRolloutAction(r1) // this patch should resume the rollout
f.run(getKey(r1, t))

patch := f.getPatchedRollout(patchIndex)
var patchObj map[string]interface{}
err := json.Unmarshal([]byte(patch), &patchObj)
assert.NoError(t, err)

spec := patchObj["spec"].(map[string]interface{})
paused, ok := spec["paused"]
assert.True(t, ok)
assert.Nil(t, paused)

status := patchObj["status"].(map[string]interface{})
assert.Equal(t, float64(2), status["currentStepIndex"])
pauseStartTime, ok := status["pauseStartTime"]
assert.True(t, ok)
assert.Equal(t, nil, pauseStartTime)
}
6 changes: 4 additions & 2 deletions rollout/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,8 @@ func (f *fixture) runController(rolloutName string, startInformers bool, expectE
actions := filterInformerActions(f.client.Actions())
for i, action := range actions {
if len(f.actions) < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
actionsBytes, _ := json.Marshal(actions[i:])
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), string(actionsBytes))
break
}

Expand All @@ -480,7 +481,8 @@ func (f *fixture) runController(rolloutName string, startInformers bool, expectE
k8sActions := filterInformerActions(f.kubeclient.Actions())
for i, action := range k8sActions {
if len(f.kubeactions) < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeactions), k8sActions[i:])
actionsBytes, _ := json.Marshal(k8sActions[i:])
f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeactions), string(actionsBytes))
break
}

Expand Down
13 changes: 4 additions & 9 deletions rollout/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import (
logutil "github.com/argoproj/argo-rollouts/utils/log"
)

func completedPauseStep(rollout *v1alpha1.Rollout, pause *v1alpha1.RolloutPause) bool {
func completedPauseStep(rollout *v1alpha1.Rollout, pause v1alpha1.RolloutPause) bool {
logCtx := logutil.WithRollout(rollout)

if pause != nil && pause.Duration != nil {
if pause.Duration != nil {
now := metav1.Now()
if rollout.Status.PauseStartTime != nil {
expiredTime := rollout.Status.PauseStartTime.Add(time.Duration(*pause.Duration) * time.Second)
Expand All @@ -23,8 +22,7 @@ func completedPauseStep(rollout *v1alpha1.Rollout, pause *v1alpha1.RolloutPause)
return true
}
}
}
if pause != nil && pause.Duration == nil && rollout.Status.PauseStartTime != nil && !rollout.Spec.Paused {
} else if rollout.Status.PauseStartTime != nil && !rollout.Spec.Paused {
logCtx.Info("Rollout has been unpaused")
return true
}
Expand Down Expand Up @@ -77,10 +75,7 @@ func calculatePauseStatus(rollout *v1alpha1.Rollout, newRS *appsv1.ReplicaSet, a
if reconcileBlueGreenTemplateChange(rollout, newRS) {
return nil, false
}
}

if paused && rollout.Spec.Strategy.BlueGreenStrategy != nil {
if pauseStartTime != nil && rollout.Spec.Strategy.BlueGreenStrategy.AutoPromotionSeconds != nil {
if paused && pauseStartTime != nil && rollout.Spec.Strategy.BlueGreenStrategy.AutoPromotionSeconds != nil {
now := metav1.Now()
autoPromoteActiveServiceDelaySeconds := *rollout.Spec.Strategy.BlueGreenStrategy.AutoPromotionSeconds
switchDeadline := pauseStartTime.Add(time.Duration(autoPromoteActiveServiceDelaySeconds) * time.Second)
Expand Down
8 changes: 3 additions & 5 deletions rollout/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (c *RolloutController) getNewReplicaSet(rollout *v1alpha1.Rollout, rsList,
// syncReplicasOnly is responsible for reconciling rollouts on scaling events.
func (c *RolloutController) syncReplicasOnly(r *v1alpha1.Rollout, rsList []*appsv1.ReplicaSet, isScaling bool) error {
logCtx := logutil.WithRollout(r)
logCtx.Info("Reconciling scaling event")
logCtx.Infof("Syncing replicas only (paused: %v, isScaling: %v)", r.Spec.Paused, isScaling)
newRS, oldRSs, err := c.getAllReplicaSetsAndSyncRevision(r, rsList, false)
if err != nil {
return err
Expand All @@ -227,7 +227,8 @@ func (c *RolloutController) syncReplicasOnly(r *v1alpha1.Rollout, rsList []*apps
return c.syncRolloutStatusBlueGreen(oldRSs, newRS, previewSvc, activeSvc, r, r.Spec.Paused)
}
// The controller wants to use the rolloutCanary method to reconcile the rolllout if the rollout is not paused.
if r.Spec.Strategy.CanaryStrategy != nil && r.Spec.Paused {
// If there are no scaling events, the rollout should only sync its status
if r.Spec.Strategy.CanaryStrategy != nil {
exList, err := c.getExperimentsForRollout(r)
if err != nil {
return err
Expand All @@ -242,9 +243,6 @@ func (c *RolloutController) syncReplicasOnly(r *v1alpha1.Rollout, rsList []*apps

stableRS, oldRSs := replicasetutil.GetStableRS(r, newRS, rsList)

c.reconcileCanaryPause(r)

// If the rollout is paused and there are no scaling events, the rollout should only sync its status
if isScaling {
if _, err := c.reconcileCanaryReplicaSets(r, newRS, stableRS, oldRSs); err != nil {
// If we get an error while trying to scale, the rollout will be requeued
Expand Down

0 comments on commit 55894e6

Please sign in to comment.