test: add canary w/ traffic routing scaleDownDelay tests
Signed-off-by: Jesse Suen <[email protected]>
jessesuen committed Apr 9, 2021
1 parent b9d8713 commit a0f595e
Showing 7 changed files with 142 additions and 28 deletions.
2 changes: 1 addition & 1 deletion rollout/bluegreen_test.go
@@ -1169,7 +1169,7 @@ func TestFastRollback(t *testing.T) {
assert.Equal(t, expectedPatch, patch)
}

func TestScaleDownLimit(t *testing.T) {
func TestBlueGreenScaleDownLimit(t *testing.T) {
f := newFixture(t)
defer f.Close()

13 changes: 10 additions & 3 deletions rollout/canary.go
@@ -260,13 +260,13 @@ func (c *rolloutContext) syncRolloutStatusCanary() error {
newStatus.HPAReplicas = replicasetutil.GetActualReplicaCountForReplicaSets(c.allRSs)
newStatus.Selector = metav1.FormatLabelSelector(c.rollout.Spec.Selector)

_, currentStepIndex := replicasetutil.GetCurrentCanaryStep(c.rollout)
newStatus.StableRS = c.rollout.Status.StableRS
newStatus.CurrentStepHash = conditions.ComputeStepHash(c.rollout)
stepCount := int32(len(c.rollout.Spec.Strategy.Canary.Steps))

if replicasetutil.PodTemplateOrStepsChanged(c.rollout, c.newRS) {
c.resetRolloutStatus(&newStatus)
c.SetRestartedAt()
if c.newRS != nil && c.rollout.Status.StableRS == replicasetutil.GetPodTemplateHash(c.newRS) {
if stepCount > 0 {
// If we get here, we detected that we've moved back to the stable ReplicaSet
@@ -280,6 +280,14 @@ func (c *rolloutContext) syncRolloutStatusCanary() error {
return c.persistRolloutStatus(&newStatus)
}

if c.rollout.Status.PromoteFull {
c.pauseContext.ClearPauseConditions()
c.pauseContext.RemoveAbort()
if stepCount > 0 {
currentStepIndex = &stepCount
}
}

if reason := c.shouldFullPromote(newStatus); reason != "" {
err := c.promoteStable(&newStatus, reason)
if err != nil {
@@ -301,7 +309,6 @@ func (c *rolloutContext) syncRolloutStatusCanary() error {
return c.persistRolloutStatus(&newStatus)
}

_, currentStepIndex := replicasetutil.GetCurrentCanaryStep(c.rollout)
if c.completedCurrentCanaryStep() {
*currentStepIndex++
newStatus.Canary.CurrentStepAnalysisRunStatus = nil
@@ -343,7 +350,7 @@ func (c *rolloutContext) reconcileCanaryReplicaSets() (bool, error) {
return false, err
}
if scaledDown {
c.log.Info("Not finished reconciling old replica sets")
c.log.Info("Not finished reconciling old ReplicaSets")
return true, nil
}
return false, nil
12 changes: 8 additions & 4 deletions rollout/replicaset.go
@@ -30,9 +30,11 @@ func (c *rolloutContext) removeScaleDownDelay(rs *appsv1.ReplicaSet) error {
if !replicasetutil.HasScaleDownDeadline(rs) {
return nil
}
c.log.Infof("Removing '%s' annotation on RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name)
patch := fmt.Sprintf(removeScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey)
_, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{})
if err == nil {
c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name)
}
return err
}

@@ -52,10 +54,12 @@ func (c *rolloutContext) addScaleDownDelay(rs *appsv1.ReplicaSet) error {
}
return nil
}
c.log.Infof("Adding '%s' annotation to RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name)
now := metav1.Now().Add(scaleDownDelaySeconds * time.Second).UTC().Format(time.RFC3339)
patch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, now)
deadline := metav1.Now().Add(scaleDownDelaySeconds * time.Second).UTC().Format(time.RFC3339)
patch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, deadline)
_, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{})
if err == nil {
c.log.Infof("Set '%s' annotation on '%s' to %s (%ds)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds)
}
return err
}

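For context on the two helpers above: the JSON-patch templates they format (removeScaleDownAtAnnotationsPatch and addScaleDownAtAnnotationsPatch) are defined elsewhere in rollout/replicaset.go and are not part of this diff. A minimal sketch of how they expand, with the annotation key and delay chosen purely for illustration:

package main

import (
	"fmt"
	"time"
)

// Assumed shape of the two JSON-patch templates; the real constants live
// elsewhere in rollout/replicaset.go and may differ slightly.
const (
	addScaleDownAtAnnotationsPatch    = `[{ "op": "add", "path": "/metadata/annotations/%s", "value": "%s"}]`
	removeScaleDownAtAnnotationsPatch = `[{ "op": "remove", "path": "/metadata/annotations/%s"}]`
)

func main() {
	// Illustrative key and delay; the controller derives both from its defaults and the Rollout spec.
	key := "scale-down-deadline"
	delay := 30 * time.Second

	deadline := time.Now().Add(delay).UTC().Format(time.RFC3339)
	addPatch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, key, deadline)
	removePatch := fmt.Sprintf(removeScaleDownAtAnnotationsPatch, key)

	fmt.Println(addPatch)    // [{ "op": "add", "path": "/metadata/annotations/scale-down-deadline", "value": "2021-04-09T01:02:03Z"}]
	fmt.Println(removePatch) // [{ "op": "remove", "path": "/metadata/annotations/scale-down-deadline"}]
}

The patched ReplicaSet simply carries an RFC3339 deadline in its annotations; a later reconcile scales it to zero once the deadline passes.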
30 changes: 17 additions & 13 deletions rollout/sync.go
@@ -784,36 +784,40 @@ func (c *rolloutContext) shouldFullPromote(newStatus v1alpha1.RolloutStatus) string {
// NOTE: the order of these checks are significant
if c.stableRS == nil {
return "Initial deploy"
// } else if newStatus.StableRS == newStatus.CurrentPodHash {
// // Don't think we should ever get here
// return "Already fully promoted"
} else if c.rollout.Spec.Strategy.Canary != nil {
if c.pauseContext.IsAborted() {
return ""
}
if c.newRS == nil || c.newRS.Status.AvailableReplicas != defaults.GetReplicasOrDefault(c.rollout.Spec.Replicas) {
return ""
}
if c.rollout.Status.PromoteFull {
return "Full promotion requested"
} else if c.pauseContext.IsAborted() {
return ""
}
_, currentStepIndex := replicasetutil.GetCurrentCanaryStep(c.rollout)
stepCount := len(c.rollout.Spec.Strategy.Canary.Steps)
if stepCount == 0 || (currentStepIndex != nil && *currentStepIndex == int32(stepCount)) {
if c.newRS != nil && c.newRS.Status.AvailableReplicas == defaults.GetReplicasOrDefault(c.rollout.Spec.Replicas) {
return fmt.Sprintf("Completed all %d steps", stepCount)
}
completedAllSteps := stepCount == 0 || (currentStepIndex != nil && *currentStepIndex == int32(stepCount))
if completedAllSteps {
return fmt.Sprintf("Completed all %d canary steps", stepCount)
}
} else if c.rollout.Spec.Strategy.BlueGreen != nil {
if newStatus.BlueGreen.ActiveSelector == "" {
// corner case - initial deployments won't update the active selector until stable is set.
// We must allow current to be marked stable, so that active can be marked to current, and
// subsequently stable marked to current too. (chicken and egg problem)
return "Initial deploy"
} else if newStatus.BlueGreen.ActiveSelector != newStatus.CurrentPodHash {
}
if newStatus.BlueGreen.ActiveSelector != newStatus.CurrentPodHash {
// active selector still pointing to previous RS, don't update stable yet
return ""
} else if c.rollout.Status.PromoteFull {
}
if c.rollout.Status.PromoteFull {
return "Full promotion requested"
} else if c.pauseContext.IsAborted() {
}
if c.pauseContext.IsAborted() {
return ""
} else if c.rollout.Spec.Strategy.BlueGreen.PostPromotionAnalysis != nil {
}
if c.rollout.Spec.Strategy.BlueGreen.PostPromotionAnalysis != nil {
// corner case - we fast-track the StableRS to be updated to CurrentPodHash when we are
// moving to a ReplicaSet within scaleDownDelay and wish to skip analysis.
if replicasetutil.HasScaleDownDeadline(c.newRS) {
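Condensed, the canary branch of shouldFullPromote now checks availability up front, then an explicit full-promotion request, then canary-step completion. A rough distillation for readability (not the controller's actual code; the function and parameter names here are made up):

package main

import "fmt"

// canaryFullPromotionReason is a rough distillation of the canary branch of
// shouldFullPromote after this change.
func canaryFullPromotionReason(aborted, promoteFull bool, availableReplicas, desiredReplicas int32, currentStepIndex *int32, stepCount int32) string {
	if aborted {
		return ""
	}
	if availableReplicas != desiredReplicas {
		return "" // new ReplicaSet is not fully available yet
	}
	if promoteFull {
		return "Full promotion requested"
	}
	completedAllSteps := stepCount == 0 || (currentStepIndex != nil && *currentStepIndex == stepCount)
	if completedAllSteps {
		return fmt.Sprintf("Completed all %d canary steps", stepCount)
	}
	return ""
}

func main() {
	step := int32(2)
	fmt.Println(canaryFullPromotionReason(false, false, 5, 5, &step, 2)) // Completed all 2 canary steps
	fmt.Println(canaryFullPromotionReason(false, false, 4, 5, &step, 2)) // "" (still rolling out)
}

A non-empty reason feeds promoteStable in canary.go above, which is what flips StableRS and triggers the scale-down delay handling exercised by the new tests.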
12 changes: 8 additions & 4 deletions rollout/trafficrouting/istio/istio.go
@@ -209,7 +209,9 @@ func (r *Reconciler) UpdateHash(canaryHash, stableHash string) error {
return err
}
if modified {
r.log.Infof("DestinationRule %s subset updated (%s: %s, %s: %s)", dRuleSpec.Name, dRuleSpec.CanarySubsetName, canaryHash, dRuleSpec.StableSubsetName, stableHash)
msg := fmt.Sprintf("DestinationRule %s subset updated (%s: %s, %s: %s)", dRuleSpec.Name, dRuleSpec.CanarySubsetName, canaryHash, dRuleSpec.StableSubsetName, stableHash)
r.log.Info(msg)
r.recorder.Event(r.rollout, corev1.EventTypeNormal, "UpdatedDestinationRule", msg)
}
return nil
}
@@ -331,10 +333,12 @@ func (r *Reconciler) SetWeight(desiredWeight int32) error {
if !modified {
return nil
}
msg := fmt.Sprintf("Updating VirtualService `%s` to desiredWeight '%d'", vsvcName, desiredWeight)
r.log.Info(msg)
r.recorder.Event(r.rollout, corev1.EventTypeNormal, "UpdatingVirtualService", msg)
_, err = client.Update(ctx, modifiedVsvc, metav1.UpdateOptions{})
if err == nil {
msg := fmt.Sprintf("VirtualService `%s` set to desiredWeight '%d'", vsvcName, desiredWeight)
r.log.Info(msg)
r.recorder.Event(r.rollout, corev1.EventTypeNormal, "UpdatedVirtualService", msg)
}
return err
}

90 changes: 90 additions & 0 deletions rollout/trafficrouting_test.go
@@ -2,11 +2,13 @@ package rollout

import (
"errors"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/dynamic/dynamiclister"
@@ -373,3 +375,91 @@ func TestNewTrafficRoutingReconciler(t *testing.T) {
assert.Equal(t, smi.Type, networkReconciler.Type())
}
}

// Verifies with a canary using traffic routing, we add a scaledown delay to the old ReplicaSet
// after promoting desired ReplicaSet to stable
func TestCanaryWithTrafficRoutingAddScaleDownDelay(t *testing.T) {
f := newFixture(t)
defer f.Close()

r1 := newCanaryRollout("foo", 1, nil, nil, pointer.Int32Ptr(1), intstr.FromInt(1), intstr.FromInt(1))
r1.Spec.Strategy.Canary.CanaryService = "canary"
r1.Spec.Strategy.Canary.StableService = "stable"
r1.Spec.Strategy.Canary.TrafficRouting = &v1alpha1.RolloutTrafficRouting{
SMI: &v1alpha1.SMITrafficRouting{},
}
r2 := bumpVersion(r1)
rs1 := newReplicaSetWithStatus(r1, 1, 1)
rs1PodHash := rs1.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
rs2 := newReplicaSetWithStatus(r2, 1, 1)
r2 = updateCanaryRolloutStatus(r2, rs1PodHash, 2, 2, 2, false)
rs2PodHash := rs2.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
r2.Status.ObservedGeneration = strconv.Itoa(int(r2.Generation))

canarySelector := map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: rs2PodHash}
stableSelector := map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: rs1PodHash}
canarySvc := newService("canary", 80, canarySelector, r2)
stableSvc := newService("stable", 80, stableSelector, r2)

f.kubeobjects = append(f.kubeobjects, rs1, rs2, canarySvc, stableSvc)
f.replicaSetLister = append(f.replicaSetLister, rs1, rs2)
f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

rs1Patch := f.expectPatchReplicaSetAction(rs1) // adds the annotation
patchIndex := f.expectPatchRolloutAction(r2) // updates the rollout status
f.run(getKey(r2, t))

f.verifyPatchedReplicaSet(rs1Patch, 30)
roPatchObj := f.getPatchedRolloutAsObject(patchIndex)
assert.Equal(t, rs2PodHash, roPatchObj.Status.StableRS)
assert.Nil(t, roPatchObj.Status.CurrentStepIndex)
}

// Verifies with a canary using traffic routing, we scale down old ReplicaSets which exceed our limit
// after promoting desired ReplicaSet to stable
func TestCanaryWithTrafficRoutingScaleDownLimit(t *testing.T) {
f := newFixture(t)
defer f.Close()

inTheFuture := metav1.Now().Add(10 * time.Second).UTC().Format(time.RFC3339)

r1 := newCanaryRollout("foo", 1, nil, nil, pointer.Int32Ptr(1), intstr.FromInt(1), intstr.FromInt(1))
rs1 := newReplicaSetWithStatus(r1, 1, 1)
rs1.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] = inTheFuture
r1.Spec.Strategy.Canary.ScaleDownDelayRevisionLimit = pointer.Int32Ptr(1)
r1.Spec.Strategy.Canary.CanaryService = "canary"
r1.Spec.Strategy.Canary.StableService = "stable"
r1.Spec.Strategy.Canary.TrafficRouting = &v1alpha1.RolloutTrafficRouting{
SMI: &v1alpha1.SMITrafficRouting{},
}

r2 := bumpVersion(r1)
rs2 := newReplicaSetWithStatus(r2, 1, 1)
rs2.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] = inTheFuture

r3 := bumpVersion(r2)
rs3 := newReplicaSetWithStatus(r3, 1, 1)
rs3PodHash := rs3.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
r3 = updateCanaryRolloutStatus(r3, rs3PodHash, 2, 2, 2, false)

r3.Status.ObservedGeneration = strconv.Itoa(int(r3.Generation))
canarySelector := map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: rs3PodHash}
stableSelector := map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: rs3PodHash}
canarySvc := newService("canary", 80, canarySelector, r3)
stableSvc := newService("stable", 80, stableSelector, r3)

f.kubeobjects = append(f.kubeobjects, rs1, rs2, rs3, canarySvc, stableSvc)
f.replicaSetLister = append(f.replicaSetLister, rs1, rs2, rs3)
f.rolloutLister = append(f.rolloutLister, r3)
f.objects = append(f.objects, r3)

rs1ScaleDownIndex := f.expectUpdateReplicaSetAction(rs1) // scale down ReplicaSet
_ = f.expectPatchRolloutAction(r3) // updates the rollout status
f.run(getKey(r3, t))

rs1Updated := f.getUpdatedReplicaSet(rs1ScaleDownIndex)
assert.Equal(t, int32(0), *rs1Updated.Spec.Replicas)
_, ok := rs1Updated.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]
assert.False(t, ok, "annotation not removed")
}
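The second test leans on ScaleDownDelayRevisionLimit: when more old ReplicaSets carry a scale-down deadline than the limit allows, the controller scales down the excess (oldest first) right away and removes the annotation, which is what the assertions on rs1 check. A simplified sketch of that selection under those assumptions (hypothetical types and names, not the controller's implementation):

package main

import (
	"fmt"
	"sort"
	"time"
)

// oldRS is a pared-down stand-in for an old ReplicaSet that still carries the
// scale-down-deadline annotation (hypothetical type, for illustration only).
type oldRS struct {
	name    string
	created time.Time
}

// excessAnnotatedRS returns the ReplicaSets to scale down immediately because
// the number of delayed ReplicaSets exceeds scaleDownDelayRevisionLimit.
func excessAnnotatedRS(annotated []oldRS, limit int) []oldRS {
	if len(annotated) <= limit {
		return nil
	}
	sort.Slice(annotated, func(i, j int) bool {
		return annotated[i].created.Before(annotated[j].created) // oldest first
	})
	return annotated[:len(annotated)-limit]
}

func main() {
	now := time.Now()
	rss := []oldRS{
		{name: "foo-rs2", created: now.Add(-1 * time.Hour)},
		{name: "foo-rs1", created: now.Add(-2 * time.Hour)},
	}
	// With a limit of 1 (as in TestCanaryWithTrafficRoutingScaleDownLimit),
	// only the oldest annotated ReplicaSet is scaled down immediately.
	for _, rs := range excessAnnotatedRS(rss, 1) {
		fmt.Println(rs.name) // foo-rs1
	}
}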
11 changes: 8 additions & 3 deletions test/e2e/istio_test.go
@@ -63,6 +63,7 @@ func (s *IstioSuite) TestIstioHostSplit() {
When().
PromoteRollout().
WaitForRolloutStatus("Healthy").
Sleep(1*time.Second). // stable is currently set first, and then changes made to VirtualServices/DestinationRules
Then().
Assert(func(t *fixtures.Then) {
vsvc := t.GetVirtualService()
@@ -73,7 +74,8 @@
rs2 := t.GetReplicaSetByRevision("2")
assert.Equal(s.T(), rs2.Spec.Template.Labels[v1alpha1.DefaultRolloutUniqueLabelKey], desired.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey])
assert.Equal(s.T(), rs2.Spec.Template.Labels[v1alpha1.DefaultRolloutUniqueLabelKey], stable.Spec.Selector[v1alpha1.DefaultRolloutUniqueLabelKey])
})
}).
ExpectRevisionPodCount("1", 1) // don't scale down old replicaset since it will be within scaleDownDelay
}

func (s *IstioSuite) TestIstioSubsetSplit() {
@@ -111,6 +113,7 @@ func (s *IstioSuite) TestIstioSubsetSplit() {
When().
PromoteRollout().
WaitForRolloutStatus("Healthy").
Sleep(1*time.Second). // stable is currently set first, and then changes made to VirtualServices/DestinationRules
Then().
Assert(func(t *fixtures.Then) {
vsvc := t.GetVirtualService()
@@ -122,6 +125,7 @@
assert.Equal(s.T(), rs2.Spec.Template.Labels[v1alpha1.DefaultRolloutUniqueLabelKey], destrule.Spec.Subsets[0].Labels[v1alpha1.DefaultRolloutUniqueLabelKey]) // stable
assert.Equal(s.T(), rs2.Spec.Template.Labels[v1alpha1.DefaultRolloutUniqueLabelKey], destrule.Spec.Subsets[1].Labels[v1alpha1.DefaultRolloutUniqueLabelKey]) // canary
}).
ExpectRevisionPodCount("1", 1). // don't scale down old replicaset since it will be within scaleDownDelay
When().
// Verify we remove the injections on the DestinationRule when a rollout no longer references it
UpdateSpec(`
@@ -130,7 +134,7 @@
spec:
canary:
trafficRouting: null
`).
Sleep(2 * time.Second).
Sleep(1*time.Second).
Then().
Assert(func(t *fixtures.Then) {
destrule := t.GetDestinationRule()
@@ -140,5 +144,6 @@
assert.False(s.T(), ok)
_, ok = destrule.Spec.Subsets[1].Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
assert.False(s.T(), ok)
})
}).
ExpectRevisionPodCount("1", 0) // since we moved back to basic canary, we should scale down older RSs
}
