Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move ReplicaSet creation and Rollout validation earlier during the reconciliation process. #3657

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rollout/analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func TestInvalidSpecMissingClusterTemplatesBackgroundAnalysis(t *testing.T) {
f.objects = append(f.objects, r)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down Expand Up @@ -961,7 +961,7 @@ func TestFailCreateStepAnalysisRunIfInvalidTemplateRef(t *testing.T) {
f.objects = append(f.objects, r, at)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down Expand Up @@ -1006,7 +1006,7 @@ func TestFailCreateBackgroundAnalysisRunIfInvalidTemplateRef(t *testing.T) {
f.objects = append(f.objects, r, at)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down Expand Up @@ -1055,7 +1055,7 @@ func TestFailCreateBackgroundAnalysisRunIfMetricRepeated(t *testing.T) {
f.objects = append(f.objects, r, at, at2)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down
2 changes: 1 addition & 1 deletion rollout/bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (c *rolloutContext) rolloutBlueGreen() error {
if err != nil {
return err
}
c.newRS, err = c.getAllReplicaSetsAndSyncRevision(true)
c.newRS, err = c.getAllReplicaSetsAndSyncRevision()
if err != nil {
return fmt.Errorf("failed to getAllReplicaSetsAndSyncRevision in rolloutBlueGreen create true: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions rollout/bluegreen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,8 +950,10 @@ func TestBlueGreenRolloutStatusHPAStatusFieldsNoActiveSelector(t *testing.T) {
f := newFixture(t)
defer f.Close()
f.objects = append(f.objects, ro)
f.kubeobjects = append(f.kubeobjects, activeSvc)
f.rolloutLister = append(f.rolloutLister, ro)
f.replicaSetLister = append(f.replicaSetLister, rs)
f.serviceLister = append(f.serviceLister, activeSvc)

ctrl, _, _ := f.newController(noResyncPeriodFunc)
roCtx, err := ctrl.newRolloutContext(ro)
Expand Down
11 changes: 2 additions & 9 deletions rollout/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
func (c *rolloutContext) rolloutCanary() error {
var err error
if replicasetutil.PodTemplateOrStepsChanged(c.rollout, c.newRS) {
c.newRS, err = c.getAllReplicaSetsAndSyncRevision(false)
c.newRS, err = c.getAllReplicaSetsAndSyncRevision()
if err != nil {
return fmt.Errorf("failed to getAllReplicaSetsAndSyncRevision in rolloutCanary with PodTemplateOrStepsChanged: %w", err)
}
return c.syncRolloutStatusCanary()
}

c.newRS, err = c.getAllReplicaSetsAndSyncRevision(true)
c.newRS, err = c.getAllReplicaSetsAndSyncRevision()
if err != nil {
return fmt.Errorf("failed to getAllReplicaSetsAndSyncRevision in rolloutCanary create true: %w", err)
}
Expand Down Expand Up @@ -448,13 +448,6 @@ func (c *rolloutContext) reconcileCanaryReplicaSets() (bool, error) {
return true, nil
}

// If we have updated both the replica count and the pod template hash c.newRS will be nil we want to reconcile the newRS so we look at the
// rollout status to get the newRS to reconcile it.
if c.newRS == nil && c.rollout.Status.CurrentPodHash != c.rollout.Status.StableRS {
rs, _ := replicasetutil.GetReplicaSetByTemplateHash(c.allRSs, c.rollout.Status.CurrentPodHash)
c.newRS = rs
}

scaledNewRS, err := c.reconcileNewReplicaSet()
if err != nil {
return false, err
Expand Down
72 changes: 21 additions & 51 deletions rollout/canary_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rollout

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -421,8 +420,11 @@ func TestResetCurrentStepIndexOnStepChange(t *testing.T) {
f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

f.expectUpdateRolloutStatusAction(r2)
patchIndex := f.expectPatchRolloutAction(r2)
createRSIndex := f.expectCreateReplicaSetAction(rs1)
f.run(getKey(r2, t))
createdRS := f.getCreatedReplicaSet(createRSIndex)

patch := f.getPatchedRollout(patchIndex)
expectedPatchWithoutPodHash := `{
Expand All @@ -433,7 +435,7 @@ func TestResetCurrentStepIndexOnStepChange(t *testing.T) {
"conditions": %s
}
}`
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, r2, false, "", false)
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, createdRS, false, "", false)
expectedPatch := fmt.Sprintf(expectedPatchWithoutPodHash, expectedCurrentPodHash, expectedCurrentStepHash, newConditions)
assert.JSONEq(t, calculatePatch(r2, expectedPatch), patch)
}
Expand Down Expand Up @@ -462,18 +464,23 @@ func TestResetCurrentStepIndexOnPodSpecChange(t *testing.T) {
f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

f.expectUpdateRolloutStatusAction(r2)
patchIndex := f.expectPatchRolloutAction(r2)
createdRSIndex := f.expectCreateReplicaSetAction(rs1)

f.run(getKey(r2, t))

patch := f.getPatchedRollout(patchIndex)
updatedRS := f.getUpdatedReplicaSet(createdRSIndex)

expectedPatchWithoutPodHash := `{
"status": {
"currentStepIndex":0,
"currentPodHash": "%s",
"conditions": %s
}
}`
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, r2, false, "", false)
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, updatedRS, false, "", false)

expectedPatch := fmt.Sprintf(expectedPatchWithoutPodHash, expectedCurrentPodHash, newConditions)
assert.JSONEq(t, calculatePatch(r2, expectedPatch), patch)
Expand Down Expand Up @@ -1564,7 +1571,7 @@ func TestCanaryRolloutWithInvalidCanaryServiceName(t *testing.T) {
f.kubeobjects = append(f.kubeobjects, rs)

patchIndex := f.expectPatchRolloutAction(rollout)
f.run(getKey(rollout, t))
f.runExpectError(getKey(rollout, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1616,7 +1623,7 @@ func TestCanaryRolloutWithInvalidStableServiceName(t *testing.T) {
f.kubeobjects = append(f.kubeobjects, rs)

patchIndex := f.expectPatchRolloutAction(rollout)
f.run(getKey(rollout, t))
f.runExpectError(getKey(rollout, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1665,7 +1672,7 @@ func TestCanaryRolloutWithInvalidPingServiceName(t *testing.T) {
f.objects = append(f.objects, r)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1697,7 +1704,7 @@ func TestCanaryRolloutWithInvalidPongServiceName(t *testing.T) {
f.serviceLister = append(f.serviceLister, pingSvc)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1896,8 +1903,14 @@ func TestHandleNilNewRSOnScaleAndImageChange(t *testing.T) {
f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

f.expectUpdateReplicaSetAction(rs1)
f.expectUpdateRolloutStatusAction(r2)
f.expectPatchRolloutAction(r2)
patchIndex := f.expectPatchRolloutAction(r2)

f.expectCreateReplicaSetAction(rs1)
f.expectUpdateReplicaSetAction(rs1)
f.expectUpdateReplicaSetAction(rs1)

f.run(getKey(r2, t))
patch := f.getPatchedRollout(patchIndex)
assert.JSONEq(t, calculatePatch(r2, OnlyObservedGenerationPatch), patch)
Expand Down Expand Up @@ -2105,49 +2118,6 @@ func TestIsDynamicallyRollingBackToStable(t *testing.T) {
}
}

func TestCanaryReplicaAndSpecChangedTogether(t *testing.T) {
f := newFixture(t)
defer f.Close()

originReplicas := 3
r1 := newCanaryRollout("foo", originReplicas, nil, nil, nil, intstr.FromInt(1), intstr.FromInt(0))
canarySVCName := "canary"
stableSVCName := "stable"
r1.Spec.Strategy.Canary.CanaryService = canarySVCName
r1.Spec.Strategy.Canary.StableService = stableSVCName

stableRS := newReplicaSetWithStatus(r1, originReplicas, originReplicas)
stableSVC := newService(stableSVCName, 80,
map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: stableRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]}, r1)

r2 := bumpVersion(r1)
canaryRS := newReplicaSetWithStatus(r2, originReplicas, originReplicas)
canarySVC := newService(canarySVCName, 80,
map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: canaryRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]}, r2)

f.replicaSetLister = append(f.replicaSetLister, canaryRS, stableRS)
f.serviceLister = append(f.serviceLister, canarySVC, stableSVC)

r3 := bumpVersion(r2)
r3.Spec.Replicas = pointer.Int32(int32(originReplicas) + 5)
r3.Status.StableRS = stableRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
r3.Status.CurrentPodHash = canaryRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]

f.rolloutLister = append(f.rolloutLister, r3)
f.kubeobjects = append(f.kubeobjects, canaryRS, stableRS, canarySVC, stableSVC)
f.objects = append(f.objects, r3)

ctrl, _, _ := f.newController(noResyncPeriodFunc)
roCtx, err := ctrl.newRolloutContext(r3)
assert.NoError(t, err)
err = roCtx.reconcile()
assert.NoError(t, err)
updated, err := f.kubeclient.AppsV1().ReplicaSets(r3.Namespace).Get(context.Background(), canaryRS.Name, metav1.GetOptions{})
assert.NoError(t, err)
// check the canary one is updated
assert.NotEqual(t, originReplicas, int(*updated.Spec.Replicas))
}

func TestSyncRolloutWithConflictInScaleReplicaSet(t *testing.T) {
os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")
Expand Down
22 changes: 3 additions & 19 deletions rollout/context.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package rollout

import (
"time"

log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/validation/field"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
analysisutil "github.com/argoproj/argo-rollouts/utils/analysis"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
)

type rolloutContext struct {
Expand Down Expand Up @@ -53,19 +49,7 @@ type rolloutContext struct {
}

func (c *rolloutContext) reconcile() error {
// Get Rollout Validation errors
err := c.getRolloutValidationErrors()
leoluz marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if vErr, ok := err.(*field.Error); ok {
// We want to frequently requeue rollouts with InvalidSpec errors, because the error
// condition might be timing related (e.g. the Rollout was applied before the Service).
c.enqueueRolloutAfter(c.rollout, 20*time.Second)
return c.createInvalidRolloutCondition(vErr, c.rollout)
}
return err
}

err = c.checkPausedConditions()
err := c.checkPausedConditions()
if err != nil {
return err
}
Expand Down
32 changes: 32 additions & 0 deletions rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,38 @@
},
reconcilerBase: c.reconcilerBase,
}

// Get Rollout Validation errors
err = roCtx.getRolloutValidationErrors()
if err != nil {
if vErr, ok := err.(*field.Error); ok {
// We want to frequently requeue rollouts with InvalidSpec errors, because the error
// condition might be timing related (e.g. the Rollout was applied before the Service).
c.enqueueRolloutAfter(roCtx.rollout, 20*time.Second)
err := roCtx.createInvalidRolloutCondition(vErr, roCtx.rollout)
if err != nil {
return nil, err
}
return nil, vErr
}
return nil, err

Check warning on line 551 in rollout/controller.go

View check run for this annotation

Codecov / codecov/patch

rollout/controller.go#L551

Added line #L551 was not covered by tests
}

if roCtx.newRS == nil {
roCtx.newRS, err = roCtx.createDesiredReplicaSet()
if err != nil {
return nil, err
}
roCtx.olderRSs = replicasetutil.FindOldReplicaSets(roCtx.rollout, rsList, roCtx.newRS)
roCtx.stableRS = replicasetutil.GetStableRS(roCtx.rollout, roCtx.newRS, roCtx.olderRSs)
roCtx.otherRSs = replicasetutil.GetOtherRSs(roCtx.rollout, roCtx.newRS, roCtx.stableRS, rsList)
roCtx.allRSs = append(rsList, roCtx.newRS)
err := roCtx.replicaSetInformer.GetIndexer().Add(roCtx.newRS)
if err != nil {
return nil, err

Check warning on line 565 in rollout/controller.go

View check run for this annotation

Codecov / codecov/patch

rollout/controller.go#L565

Added line #L565 was not covered by tests
}
}

if rolloututil.IsFullyPromoted(rollout) && roCtx.pauseContext.IsAborted() {
logCtx.Warnf("Removing abort condition from fully promoted rollout")
roCtx.pauseContext.RemoveAbort()
Expand Down
Loading
Loading