Skip to content

Commit

Permalink
fix: canary replicas/weight could flap during abort with dynamic scaling
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Suen <[email protected]>
  • Loading branch information
jessesuen committed Jan 21, 2022
1 parent a74ad8e commit 9753009
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 152 deletions.
10 changes: 4 additions & 6 deletions rollout/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ type fixture struct {
unfreezeTime func() error

// events holds all the K8s Event Reasons emitted during the run
events []string
fakeTrafficRouting []*mocks.TrafficRoutingReconciler
fakeSingleTrafficRouting *mocks.TrafficRoutingReconciler
events []string
fakeTrafficRouting *mocks.TrafficRoutingReconciler
}

func newFixture(t *testing.T) *fixture {
Expand All @@ -121,8 +120,7 @@ func newFixture(t *testing.T) *fixture {
return nil
}

f.fakeTrafficRouting = newFakeTrafficRoutingReconciler()
f.fakeSingleTrafficRouting = newFakeSingleTrafficRoutingReconciler()
f.fakeTrafficRouting = newFakeSingleTrafficRoutingReconciler()
return f
}

Expand Down Expand Up @@ -570,7 +568,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
return nil, nil
}
var reconcilers = []trafficrouting.TrafficRoutingReconciler{}
reconcilers = append(reconcilers, f.fakeSingleTrafficRouting)
reconcilers = append(reconcilers, f.fakeTrafficRouting)
return reconcilers, nil
}

Expand Down
7 changes: 7 additions & 0 deletions rollout/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func maxInt(left, right int32) int32 {
return right
}

// minInt returns the smaller of left and right. When both are equal, that
// shared value is returned.
func minInt(left, right int32) int32 {
	smaller := right
	if left < right {
		smaller = left
	}
	return smaller
}

// getRolloutPods returns all pods associated with a rollout
func (p *RolloutPodRestarter) getRolloutPods(ctx context.Context, ro *v1alpha1.Rollout, allRSs []*appsv1.ReplicaSet) ([]*corev1.Pod, error) {
pods, err := p.client.CoreV1().Pods(ro.Namespace).List(ctx, metav1.ListOptions{
Expand Down
28 changes: 25 additions & 3 deletions rollout/trafficrouting.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package rollout

import (
"fmt"
"reflect"
"strings"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/rollout/trafficrouting"
Expand Down Expand Up @@ -120,11 +122,16 @@ func (c *rolloutContext) reconcileTrafficRouting() error {
if rolloututil.IsFullyPromoted(c.rollout) {
// when we are fully promoted. desired canary weight should be 0
} else if c.pauseContext.IsAborted() {
// when aborted, desired canary weight should be 0 (100% to stable), *unless* we
// are using dynamic stable scaling. In that case, we can only decrease canary weight
// according to available replica counts of the stable.
// when aborted, desired canary weight should immediately be 0 (100% to stable), *unless*
// we are using dynamic stable scaling. In that case, we are dynamically decreasing the
// weight to the canary according to the availability of the stable (whatever it can support).
if c.rollout.Spec.Strategy.Canary.DynamicStableScale {
desiredWeight = 100 - ((100 * c.stableRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas)
if c.rollout.Status.Canary.Weights != nil {
// This ensures that if we are already at a lower weight, then we will not
// increase the weight because stable availability is flapping (e.g. pod restarts)
desiredWeight = minInt(desiredWeight, c.rollout.Status.Canary.Weights.Canary.Weight)
}
}
} else if c.newRS == nil || c.newRS.Status.AvailableReplicas == 0 {
// when newRS is not available or replicas num is 0. never weight to canary
Expand Down Expand Up @@ -174,6 +181,7 @@ func (c *rolloutContext) reconcileTrafficRouting() error {
if modified, newWeights := calculateWeightStatus(c.rollout, canaryHash, stableHash, desiredWeight, weightDestinations...); modified {
c.log.Infof("Previous weights: %v", c.rollout.Status.Canary.Weights)
c.log.Infof("New weights: %v", newWeights)
c.recorder.Eventf(c.rollout, record.EventOptions{EventReason: conditions.TrafficWeightUpdatedReason}, trafficWeightUpdatedMessage(c.rollout.Status.Canary.Weights, newWeights))
c.newStatus.Canary.Weights = newWeights
}

Expand Down Expand Up @@ -204,6 +212,20 @@ func (c *rolloutContext) reconcileTrafficRouting() error {
return nil
}

// trafficWeightUpdatedMessage returns a message we emit for the kubernetes event whenever we adjust traffic weights.
//
// prev is the previously recorded weights (nil when there are none) and new is the weights
// being applied. The details are built as follows and joined with ", " before being
// formatted into conditions.TrafficWeightUpdatedMessage:
//   - "to N" when prev is nil, where N is new's canary weight
//   - "from M to N" when prev is non-nil and the canary weight changed
//   - "additional: ..." when prev is non-nil and the Additional destinations differ
//
// A nil new produces a message with no details rather than a nil pointer dereference.
func trafficWeightUpdatedMessage(prev, new *v1alpha1.TrafficWeights) string {
	var details []string
	// Every detail below reads from new, so guard it once up front. Previously only the
	// "additional" comparison checked new for nil while the canary weight branches
	// dereferenced it unconditionally.
	if new != nil {
		switch {
		case prev == nil:
			details = append(details, fmt.Sprintf("to %d", new.Canary.Weight))
		case prev.Canary.Weight != new.Canary.Weight:
			details = append(details, fmt.Sprintf("from %d to %d", prev.Canary.Weight, new.Canary.Weight))
		}
		// Additional destinations are only reported as a change relative to a previous state.
		if prev != nil && !reflect.DeepEqual(prev.Additional, new.Additional) {
			details = append(details, fmt.Sprintf("additional: %v", new.Additional))
		}
	}
	return fmt.Sprintf(conditions.TrafficWeightUpdatedMessage, strings.Join(details, ", "))
}

// calculateWeightStatus calculates the Rollout's `status.canary.weights` values. Returns true if
// it has changed from previous values (which indicates we should reset status.canary.weights.verified)
func calculateWeightStatus(ro *v1alpha1.Rollout, canaryHash, stableHash string, desiredWeight int32, weightDestinations ...v1alpha1.WeightDestination) (bool, *v1alpha1.TrafficWeights) {
Expand Down
6 changes: 1 addition & 5 deletions rollout/trafficrouting/smi/smi.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,7 @@ func (r *Reconciler) SetWeight(desiredWeight int32, additionalDestinations ...v1
if !isControlledBy {
return fmt.Errorf("Rollout does not own TrafficSplit `%s`", trafficSplitName)
}
err = r.patchTrafficSplit(existingTrafficSplit, trafficSplits)
if err == nil {
r.cfg.Recorder.Eventf(r.cfg.Rollout, record.EventOptions{EventReason: "TrafficSplitModified"}, "TrafficSplit `%s` modified", trafficSplitName)
}
return err
return r.patchTrafficSplit(existingTrafficSplit, trafficSplits)
}

func (r *Reconciler) generateTrafficSplits(trafficSplitName string, desiredWeight int32, additionalDestinations ...v1alpha1.WeightDestination) VersionedTrafficSplits {
Expand Down
Loading

0 comments on commit 9753009

Please sign in to comment.