diff --git a/.codecov.yml b/.codecov.yml
index a8e52fbeca..2758a60933 100644
--- a/.codecov.yml
+++ b/.codecov.yml
@@ -19,3 +19,5 @@ ignore:
 - 'pkg/client/.*'
 - 'vendor/.*'
 - '**/mocks/*'
+- 'hack/gen-crd-spec/main.go'
+- 'hack/gen-docs/main.go'
diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml
index 7b7b2c5300..0b7aca1509 100644
--- a/.github/workflows/e2e.yaml
+++ b/.github/workflows/e2e.yaml
@@ -95,4 +95,4 @@ jobs:
         with:
           name: e2e-controller-k8s-${{ matrix.kubernetes-minor-version }}.log
           path: /tmp/e2e-controller.log
-        if: ${{ failure() }}
+        if: ${{ always() }}
diff --git a/Makefile b/Makefile
index fd5736aa15..609cb724d9 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,7 @@ DEV_IMAGE ?= false
 E2E_INSTANCE_ID ?= argo-rollouts-e2e
 E2E_TEST_OPTIONS ?=
 E2E_PARALLEL ?= 1
-E2E_WAIT_TIMEOUT ?= 120
+E2E_WAIT_TIMEOUT ?= 90
 GOPATH ?= $(shell go env GOPATH)
 
 # Global toolchain configuration
diff --git a/hack/gen-crd-spec/main.go b/hack/gen-crd-spec/main.go
index a3e31d92b8..60c1bde9d7 100644
--- a/hack/gen-crd-spec/main.go
+++ b/hack/gen-crd-spec/main.go
@@ -96,11 +96,12 @@ func NewCustomResourceDefinition() []*extensionsobj.CustomResourceDefinition {
     // clean up stuff left by controller-gen
     deleteFile("config/webhook/manifests.yaml")
     deleteFile("config/webhook")
-    deleteFile("config/argoproj.io_analysisruns.yaml")
-    deleteFile("config/argoproj.io_analysistemplates.yaml")
-    deleteFile("config/argoproj.io_clusteranalysistemplates.yaml")
-    deleteFile("config/argoproj.io_experiments.yaml")
-    deleteFile("config/argoproj.io_rollouts.yaml")
+    deleteFile("config/crd/argoproj.io_analysisruns.yaml")
+    deleteFile("config/crd/argoproj.io_analysistemplates.yaml")
+    deleteFile("config/crd/argoproj.io_clusteranalysistemplates.yaml")
+    deleteFile("config/crd/argoproj.io_experiments.yaml")
+    deleteFile("config/crd/argoproj.io_rollouts.yaml")
+    deleteFile("config/crd")
     deleteFile("config")
 
     crds := []*extensionsobj.CustomResourceDefinition{}
diff --git a/rollout/canary.go b/rollout/canary.go
index ed27bd0a79..fa33b43616 100644
--- a/rollout/canary.go
+++ b/rollout/canary.go
@@ -112,7 +112,7 @@ func (c *rolloutContext) reconcileCanaryStableReplicaSet() (bool, error) {
     }
     scaled, _, err := c.scaleReplicaSetAndRecordEvent(c.stableRS, desiredStableRSReplicaCount)
     if err != nil {
-        return scaled, fmt.Errorf("failed to scaleReplicaSetAndRecordEvent in reconcileCanaryStableReplicaSet:L %w", err)
+        return scaled, fmt.Errorf("failed to scaleReplicaSetAndRecordEvent in reconcileCanaryStableReplicaSet: %w", err)
     }
     return scaled, err
 }
diff --git a/rollout/canary_test.go b/rollout/canary_test.go
index 56b05c7177..3d1eec9d14 100644
--- a/rollout/canary_test.go
+++ b/rollout/canary_test.go
@@ -4,10 +4,16 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "os"
     "strconv"
     "testing"
     "time"
 
+    "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    k8stesting "k8s.io/client-go/testing"
+
     "github.com/stretchr/testify/assert"
     appsv1 "k8s.io/api/apps/v1"
     v1 "k8s.io/api/apps/v1"
@@ -2141,3 +2147,106 @@ func TestCanaryReplicaAndSpecChangedTogether(t *testing.T) {
     // check the canary one is updated
     assert.NotEqual(t, originReplicas, int(*updated.Spec.Replicas))
 }
+
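+// TestSyncRolloutWithConflictInScaleReplicaSet simulates the API server returning
+// a 409 Conflict on ReplicaSet Update and verifies that the controller falls back
+// to patching the ReplicaSet with the desired replica count.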
+func TestSyncRolloutWithConflictInScaleReplicaSet(t *testing.T) {
+    os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
+    defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")
+
+    f := newFixture(t)
+    defer f.Close()
+
+    steps := []v1alpha1.CanaryStep{
+        {
+            SetWeight: int32Ptr(10),
+        }, {
+            Pause: &v1alpha1.RolloutPause{
+                Duration: v1alpha1.DurationFromInt(10),
+            },
+        },
+    }
+    r1 := newCanaryRollout("foo", 10, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0))
+    r1.Spec.Template.Labels["rollout.argoproj.io/foo"] = "bar"
+
+    rs1 := newReplicaSetWithStatus(r1, 10, 10)
+    r1.Spec.Replicas = pointer.Int32(2)
+    f.kubeobjects = append(f.kubeobjects, rs1)
+    f.replicaSetLister = append(f.replicaSetLister, rs1)
+
+    f.rolloutLister = append(f.rolloutLister, r1)
+    f.objects = append(f.objects, r1)
+
+    f.expectPatchRolloutAction(r1)
+    f.expectUpdateReplicaSetAction(rs1)               // attempt to scale the replicaset; hits a conflict
+    patchIndex := f.expectPatchReplicaSetAction(rs1) // the fallback patches the replicaset instead
+
+    key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name)
+    c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute })
+
+    f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+        return true, action.(k8stesting.UpdateAction).GetObject(), errors.NewConflict(schema.GroupResource{
+            Group:    "Apps",
+            Resource: "ReplicaSet",
+        }, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error"))
+    })
+
+    f.runController(key, true, false, c, i, k8sI)
+
+    updatedRs := f.getPatchedReplicaSet(patchIndex) // the update conflicted, so the scale landed via the fallback patch
+    assert.Equal(t, int32(2), *updatedRs.Spec.Replicas)
+}
+
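+// TestSyncRolloutWithConflictInSyncReplicaSetRevision exercises the same conflict
+// fallback on the revision-sync path: both the revision-annotation update on the
+// old ReplicaSet and the subsequent scale-down must land via patch when every
+// Update call returns a conflict.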
+func TestSyncRolloutWithConflictInSyncReplicaSetRevision(t *testing.T) {
+    os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
+    defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")
+
+    f := newFixture(t)
+    defer f.Close()
+
+    steps := []v1alpha1.CanaryStep{
+        {
+            SetWeight: int32Ptr(10),
+        }, {
+            Pause: &v1alpha1.RolloutPause{
+                Duration: v1alpha1.DurationFromInt(10),
+            },
+        },
+    }
+    r1 := newCanaryRollout("foo", 3, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0))
+    r2 := bumpVersion(r1)
+
+    rs1 := newReplicaSetWithStatus(r1, 3, 3)
+    rs2 := newReplicaSetWithStatus(r2, 3, 3)
+    rs2.Annotations["rollout.argoproj.io/revision"] = "1"
+
+    f.kubeobjects = append(f.kubeobjects, rs1, rs2)
+    f.replicaSetLister = append(f.replicaSetLister, rs1, rs2)
+
+    f.rolloutLister = append(f.rolloutLister, r2)
+    f.objects = append(f.objects, r2)
+
+    key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name)
+    c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute })
+
+    f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+        return true, &appsv1.ReplicaSet{}, errors.NewConflict(schema.GroupResource{
+            Group:    "Apps",
+            Resource: "ReplicaSet",
+        }, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error"))
+    })
+
+    f.expectPatchRolloutAction(r2)
+    f.expectUpdateReplicaSetAction(rs1)                // attempt to update the replicaset revision; hits a conflict
+    patchIndex1 := f.expectPatchReplicaSetAction(rs1) // the fallback patches the replicaset instead
+
+    f.expectUpdateReplicaSetAction(rs2)                // attempt to scale the replicaset; hits a conflict
+    patchIndex2 := f.expectPatchReplicaSetAction(rs2) // the fallback patches the replicaset instead
+
+    f.runController(key, true, false, c, i, k8sI)
+
+    updatedRs1 := f.getPatchedReplicaSet(patchIndex1)
+    assert.Equal(t, "2", updatedRs1.Annotations["rollout.argoproj.io/revision"])
+    assert.Equal(t, int32(3), *updatedRs1.Spec.Replicas)
+
+    updatedRs2 := f.getPatchedReplicaSet(patchIndex2)
+    assert.Equal(t, int32(0), *updatedRs2.Spec.Replicas)
+}
diff --git a/rollout/controller.go b/rollout/controller.go
index 972cea882c..23f7f340dd 100644
--- a/rollout/controller.go
+++ b/rollout/controller.go
@@ -4,11 +4,16 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "os"
     "reflect"
     "strconv"
+    "strings"
     "sync"
     "time"
 
+    "github.com/argoproj/argo-rollouts/utils/annotations"
+
+    "github.com/argoproj/argo-rollouts/utils/diff"
     "k8s.io/apimachinery/pkg/runtime/schema"
 
     "github.com/argoproj/argo-rollouts/pkg/apis/rollouts"
@@ -16,11 +21,13 @@ import (
     log "github.com/sirupsen/logrus"
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/errors"
     k8serrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
+    patchtypes "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/validation/field"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/dynamic"
@@ -937,3 +944,92 @@ func remarshalRollout(r *v1alpha1.Rollout) *v1alpha1.Rollout {
     }
     return &remarshalled
 }
+
+// updateReplicaSetFallbackToPatch updates the replicaset using Update and, on a
+// conflict, falls back to a patch. This function only exists so that we can always
+// update replicasets without getting into a conflict loop. We should really look
+// into a complete refactor of how rollouts handles replicasets, such that we do
+// not keep a full replicaset on the rollout context under newRS, and instead
+// switch to a patch-only based approach.
+func (c *rolloutContext) updateReplicaSetFallbackToPatch(ctx context.Context, rs *appsv1.ReplicaSet) (*appsv1.ReplicaSet, error) {
+    updatedRS, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Update(ctx, rs, metav1.UpdateOptions{})
+    if err != nil {
+        if errors.IsConflict(err) {
+            if os.Getenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT") == "true" {
+                rsGet, err := c.replicaSetLister.ReplicaSets(rs.Namespace).Get(rs.Name)
+                if err != nil {
+                    return nil, fmt.Errorf("error getting replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+                }
+                rsGetJson, err := json.Marshal(rsGet)
+                if err != nil {
+                    return nil, fmt.Errorf("error marshalling informer replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+                }
+                rsCopyJson, err := json.Marshal(rs)
+                if err != nil {
+                    return nil, fmt.Errorf("error marshalling memory replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+                }
+                c.log.Infof("Informer RS: %s", rsGetJson)
+                c.log.Infof("Memory RS: %s", rsCopyJson)
+            }
+
+            c.log.Infof("Conflict when updating replicaset %s, falling back to patch", rs.Name)
+
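+            // Build a minimal ReplicaSet containing only the fields this controller
+            // owns (replica count, pod-template metadata, and the rollout-managed
+            // labels and annotations) so the strategic merge patch cannot overwrite
+            // fields that a concurrent writer just changed.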
+            patchRS := appsv1.ReplicaSet{}
+            patchRS.Spec.Replicas = rs.Spec.Replicas
+            patchRS.Spec.Template.Labels = rs.Spec.Template.Labels
+            patchRS.Spec.Template.Annotations = rs.Spec.Template.Annotations
+
+            patchRS.Annotations = make(map[string]string)
+            patchRS.Labels = make(map[string]string)
+            patchRS.Spec.Selector = &metav1.LabelSelector{
+                MatchLabels: make(map[string]string),
+            }
+
+            if _, found := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]; found {
+                patchRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
+            }
+
+            if _, found := rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]; found {
+                patchRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] = rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]
+            }
+
+            if _, found := rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey]; found {
+                patchRS.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey]
+            }
+
+            for key, value := range rs.Annotations {
+                if strings.HasPrefix(key, annotations.RolloutLabel) ||
+                    strings.HasPrefix(key, "argo-rollouts.argoproj.io") ||
+                    strings.HasPrefix(key, "experiment.argoproj.io") {
+                    patchRS.Annotations[key] = value
+                }
+            }
+            for key, value := range rs.Labels {
+                if strings.HasPrefix(key, annotations.RolloutLabel) ||
+                    strings.HasPrefix(key, "argo-rollouts.argoproj.io") ||
+                    strings.HasPrefix(key, "experiment.argoproj.io") {
+                    patchRS.Labels[key] = value
+                }
+            }
+
+            patch, _, err := diff.CreateTwoWayMergePatch(appsv1.ReplicaSet{}, patchRS, appsv1.ReplicaSet{})
+            if err != nil {
+                return nil, fmt.Errorf("error creating patch for conflict log in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+            }
+
+            c.log.Infof("Patching replicaset with patch: %s", string(patch))
+            updatedRS, err = c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.StrategicMergePatchType, patch, metav1.PatchOptions{})
+            if err != nil {
+                return nil, fmt.Errorf("error patching replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+            }
+
+            err = c.replicaSetInformer.GetIndexer().Update(updatedRS)
+            if err != nil {
+                return nil, fmt.Errorf("error updating replicaset informer in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+            }
+
+            return updatedRS, err
+        }
+    }
+    if updatedRS != nil {
+        updatedRS.DeepCopyInto(rs)
+    }
+    return rs, err
+}
diff --git a/rollout/controller_test.go b/rollout/controller_test.go
index 892a2be64f..e38fca79c5 100644
--- a/rollout/controller_test.go
+++ b/rollout/controller_test.go
@@ -792,6 +792,12 @@ func (f *fixture) expectPatchServiceAction(s *corev1.Service, newLabel string) i
     return len
 }
 
+func (f *fixture) expectGetReplicaSetAction(r *appsv1.ReplicaSet) int { //nolint:unused
+    len := len(f.kubeactions)
+    f.kubeactions = append(f.kubeactions, core.NewGetAction(schema.GroupVersionResource{Resource: "replicasets"}, r.Namespace, r.Name))
+    return len
+}
+
 func (f *fixture) expectCreateReplicaSetAction(r *appsv1.ReplicaSet) int {
     len := len(f.kubeactions)
     f.kubeactions = append(f.kubeactions, core.NewCreateAction(schema.GroupVersionResource{Resource: "replicasets"}, r.Namespace, r))
@@ -950,6 +956,21 @@ func (f *fixture) getUpdatedReplicaSet(index int) *appsv1.ReplicaSet {
     return rs
 }
 
+func (f *fixture) getPatchedReplicaSet(index int) *appsv1.ReplicaSet {
+    action := filterInformerActions(f.kubeclient.Actions())[index]
+    patchAction, ok := action.(core.PatchAction)
+    if !ok {
+        f.t.Fatalf("Expected Patch action, not %s", action.GetVerb())
+    }
+
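+    // The recorded patch body is itself a partial ReplicaSet in JSON form, so
+    // unmarshalling it surfaces exactly the fields the controller patched.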
+    rs := appsv1.ReplicaSet{}
+    err := json.Unmarshal(patchAction.GetPatch(), &rs)
+    if err != nil {
+        panic(err)
+    }
+    return &rs
+}
+
 func (f *fixture) verifyPatchedReplicaSet(index int, scaleDownDelaySeconds int32) {
     action := filterInformerActions(f.kubeclient.Actions())[index]
     patchAction, ok := action.(core.PatchAction)
diff --git a/rollout/ephemeralmetadata.go b/rollout/ephemeralmetadata.go
index 60b745ed29..4f502a78ab 100644
--- a/rollout/ephemeralmetadata.go
+++ b/rollout/ephemeralmetadata.go
@@ -82,14 +82,12 @@ func (c *rolloutContext) syncEphemeralMetadata(ctx context.Context, rs *appsv1.R
     }
 
     // 2. Update ReplicaSet so that any new pods it creates will have the metadata
-    rs, err = c.kubeclientset.AppsV1().ReplicaSets(modifiedRS.Namespace).Update(ctx, modifiedRS, metav1.UpdateOptions{})
+    rs, err = c.updateReplicaSetFallbackToPatch(ctx, modifiedRS)
     if err != nil {
-        return fmt.Errorf("error updating replicaset in syncEphemeralMetadata: %w", err)
-    }
-    err = c.replicaSetInformer.GetIndexer().Update(rs)
-    if err != nil {
-        return fmt.Errorf("error updating replicaset informer in syncEphemeralMetadata: %w", err)
+        c.log.Infof("failed to sync ephemeral metadata %v to ReplicaSet %s: %v", podMetadata, rs.Name, err)
+        return fmt.Errorf("failed to sync ephemeral metadata: %w", err)
     }
+
     c.log.Infof("synced ephemeral metadata %v to ReplicaSet %s", podMetadata, rs.Name)
     return nil
 }
diff --git a/rollout/sync.go b/rollout/sync.go
index 875949ee55..6d656e2d5a 100644
--- a/rollout/sync.go
+++ b/rollout/sync.go
@@ -83,17 +83,13 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) {
     affinityNeedsUpdate := replicasetutil.IfInjectedAntiAffinityRuleNeedsUpdate(rsCopy.Spec.Template.Spec.Affinity, *c.rollout)
 
     if annotationsUpdated || minReadySecondsNeedsUpdate || affinityNeedsUpdate {
+        rsCopy.Spec.MinReadySeconds = c.rollout.Spec.MinReadySeconds
         rsCopy.Spec.Template.Spec.Affinity = replicasetutil.GenerateReplicaSetAffinity(*c.rollout)
-        rs, err := c.kubeclientset.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
-        if err != nil {
-            c.log.WithError(err).Error("Error: updating replicaset revision")
-            return nil, fmt.Errorf("error updating replicaset revision: %v", err)
-        }
-        c.log.Infof("Synced revision on ReplicaSet '%s' to '%s'", rs.Name, newRevision)
-        err = c.replicaSetInformer.GetIndexer().Update(rs)
+
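+        // Route the update through the conflict-tolerant helper so a stale cached
+        // ReplicaSet cannot leave the revision sync stuck retrying a failing Update.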
+        rs, err := c.updateReplicaSetFallbackToPatch(ctx, rsCopy)
         if err != nil {
-            return nil, fmt.Errorf("error updating replicaset informer in syncReplicaSetRevision: %w", err)
+            return nil, fmt.Errorf("failed to update replicaset revision on %s: %w", rsCopy.Name, err)
         }
         return rs, nil
     }
@@ -113,7 +109,7 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) {
     conditions.SetRolloutCondition(&c.rollout.Status, *condition)
     updatedRollout, err := c.argoprojclientset.ArgoprojV1alpha1().Rollouts(c.rollout.Namespace).UpdateStatus(ctx, c.rollout, metav1.UpdateOptions{})
     if err != nil {
-        c.log.WithError(err).Error("Error: updating rollout revision")
+        c.log.WithError(err).Error("Error: updating rollout status in syncReplicaSetRevision")
         return nil, err
     }
     c.rollout = updatedRollout
@@ -245,7 +241,7 @@ func (c *rolloutContext) createDesiredReplicaSet() (*appsv1.ReplicaSet, error) {
         cond := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionFalse, conditions.FailedRSCreateReason, msg)
         patchErr := c.patchCondition(c.rollout, newStatus, cond)
         if patchErr != nil {
-            c.log.Warnf("Error Patching Rollout: %s", patchErr.Error())
+            c.log.Warnf("Error Patching Rollout Conditions: %s", patchErr.Error())
         }
         return nil, err
     default:
@@ -370,25 +366,21 @@ func (c *rolloutContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32,
     rolloutReplicas := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
     annotationsNeedUpdate := annotations.ReplicasAnnotationsNeedUpdate(rs, rolloutReplicas)
 
-    scaled := false
     var err error
+    scaled := false
     if sizeNeedsUpdate || annotationsNeedUpdate {
         rsCopy := rs.DeepCopy()
         oldScale := defaults.GetReplicasOrDefault(rs.Spec.Replicas)
         *(rsCopy.Spec.Replicas) = newScale
         annotations.SetReplicasAnnotations(rsCopy, rolloutReplicas)
         if fullScaleDown && !c.shouldDelayScaleDownOnAbort() {
+            // This bypasses the normal call to removeScaleDownDelay and instead relies on the annotation being removed by the update in updateReplicaSetFallbackToPatch
             delete(rsCopy.Annotations, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey)
         }
-        rs, err = c.kubeclientset.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
-        if err != nil {
-            return scaled, rs, fmt.Errorf("error updating replicaset %s: %w", rsCopy.Name, err)
-        }
-        err = c.replicaSetInformer.GetIndexer().Update(rs)
+        rs, err = c.updateReplicaSetFallbackToPatch(ctx, rsCopy)
         if err != nil {
-            err = fmt.Errorf("error updating replicaset informer in scaleReplicaSet: %w", err)
-            return scaled, rs, err
+            return scaled, rs, fmt.Errorf("failed to updateReplicaSetFallbackToPatch in scaleReplicaSet: %w", err)
         }
 
         if sizeNeedsUpdate {
diff --git a/test/e2e/aws_test.go b/test/e2e/aws_test.go
index f8b0553fe2..dab037e395 100644
--- a/test/e2e/aws_test.go
+++ b/test/e2e/aws_test.go
@@ -239,6 +239,7 @@ func (s *AWSSuite) TestALBExperimentStepMultiIngress() {
 }
 
 func (s *AWSSuite) TestALBExperimentStepNoSetWeight() {
+    // TODO: this test is flaky
     s.Given().
         RolloutObjects("@alb/rollout-alb-experiment-no-setweight.yaml").
         When().
@@ -272,6 +273,7 @@ func (s *AWSSuite) TestALBExperimentStepNoSetWeight() {
 }
 
 func (s *AWSSuite) TestALBExperimentStepNoSetWeightMultiIngress() {
+    // TODO: this test is flaky
     s.Given().
         RolloutObjects("@alb/rollout-alb-multi-ingress-experiment-no-setweight.yaml").
         When().
diff --git a/test/e2e/canary_test.go b/test/e2e/canary_test.go
index 2c68ec8bd7..8656aa5c4d 100644
--- a/test/e2e/canary_test.go
+++ b/test/e2e/canary_test.go
@@ -149,7 +149,10 @@ spec:
       spec:
         containers:
         - name: updatescaling
-          command: [/bad-command]`).
+          resources:
+            requests:
+              memory: 16Mi
+              cpu: 2m`).
         WaitForRolloutReplicas(7).
         Then().
         ExpectCanaryStablePodCount(4, 3).
@@ -658,6 +661,7 @@ func (s *CanarySuite) TestCanaryDynamicStableScale() {
         When().
         MarkPodsReady("1", 1). // mark last remaining stable pod as ready (4/4 stable are ready)
         WaitForRevisionPodCount("2", 0).
+        Sleep(2*time.Second). // WaitForRevisionPodCount does not wait for terminating pods, so sleep briefly to let them be deleted; otherwise ExpectServiceSelector fails
         Then().
         // Expect that the canary service selector is now set to stable because of dynamic stable scale is over and we have all pods up on stable rs
         ExpectServiceSelector("dynamic-stable-scale-canary", map[string]string{"app": "dynamic-stable-scale", "rollouts-pod-template-hash": "868d98995b"}, false).
diff --git a/test/e2e/functional/analysistemplate-sleep-job.yaml b/test/e2e/functional/analysistemplate-sleep-job.yaml
index 86faa2c877..4fdd369dee 100644
--- a/test/e2e/functional/analysistemplate-sleep-job.yaml
+++ b/test/e2e/functional/analysistemplate-sleep-job.yaml
@@ -6,7 +6,7 @@ metadata:
 spec:
   args:
   - name: duration
-    value: 0s
+    value: "0"
   - name: exit-code
     value: "0"
   - name: count
diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go
index 930c7d0c61..668c75bb0c 100644
--- a/test/e2e/functional_test.go
+++ b/test/e2e/functional_test.go
@@ -166,13 +166,14 @@ spec:
         UpdateSpec().
         WaitForRolloutStatus("Paused"). // At step 1 (pause: {duration: 24h})
         PromoteRollout().
-        Sleep(2*time.Second).
+        Sleep(3*time.Second).
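+        // Wait for the inline AnalysisRun to actually be running so the assertions
+        // below do not race the analysis controller.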
+        WaitForInlineAnalysisRunPhase("Running").
         Then().
+        ExpectRolloutStatus("Progressing"). // At step 2 (analysis: sleep-job - 24h)
+        ExpectAnalysisRunCount(1).
         ExpectRollout("status.currentStepIndex == 1", func(r *v1alpha1.Rollout) bool {
             return *r.Status.CurrentStepIndex == 1
         }).
-        ExpectRolloutStatus("Progressing"). // At step 2 (analysis: sleep-job - 24h)
-        ExpectAnalysisRunCount(1).
         When().
         PromoteRollout().
         Sleep(2 * time.Second).
@@ -205,6 +206,9 @@ spec:
       prePromotionAnalysis:
         templates:
         - templateName: sleep-job
+        args:
+        - name: duration
+          value: "10"
       postPromotionAnalysis:
         templates:
         - templateName: sleep-job
@@ -228,7 +232,8 @@ spec:
         ApplyManifests().
         WaitForRolloutStatus("Healthy").
         UpdateSpec().
-        Sleep(3 * time.Second).
+        Sleep(5 * time.Second).
+        WaitForPrePromotionAnalysisRunPhase("Running").
         PromoteRolloutFull().
         WaitForRolloutStatus("Healthy").
         Then().
diff --git a/test/fixtures/e2e_suite.go b/test/fixtures/e2e_suite.go
index a774afcf94..e78ae1aa68 100644
--- a/test/fixtures/e2e_suite.go
+++ b/test/fixtures/e2e_suite.go
@@ -54,7 +54,7 @@ const (
 )
 
 var (
-    E2EWaitTimeout time.Duration = time.Second * 120
+    E2EWaitTimeout time.Duration = time.Second * 90
     E2EPodDelay                  = 0
 
     E2EALBIngressAnnotations map[string]string
@@ -143,8 +143,8 @@ func (s *E2ESuite) SetupSuite() {
     restConfig, err := config.ClientConfig()
     s.CheckError(err)
     s.Common.kubernetesHost = restConfig.Host
-    restConfig.Burst = defaults.DefaultBurst * 2
-    restConfig.QPS = defaults.DefaultQPS * 2
+    restConfig.Burst = defaults.DefaultBurst * 10
+    restConfig.QPS = defaults.DefaultQPS * 10
     s.namespace, _, err = config.Namespace()
     s.CheckError(err)
     s.kubeClient, err = kubernetes.NewForConfig(restConfig)
diff --git a/utils/annotations/annotations.go b/utils/annotations/annotations.go
index 121c6c0803..067492bc2b 100644
--- a/utils/annotations/annotations.go
+++ b/utils/annotations/annotations.go
@@ -28,6 +28,8 @@ const (
     DesiredReplicasAnnotation = RolloutLabel + "/desired-replicas"
     // WorkloadGenerationAnnotation is the generation of the referenced workload
     WorkloadGenerationAnnotation = RolloutLabel + "/workload-generation"
+    // NotificationEngineAnnotation is the annotation the notification engine uses to determine if it should notify
+    NotificationEngineAnnotation = "notified.notifications.argoproj.io"
 )
 
 // GetDesiredReplicasAnnotation returns the number of desired replicas
@@ -219,6 +221,7 @@ var annotationsToSkip = map[string]bool{
     RevisionAnnotation:           true,
    RevisionHistoryAnnotation:    true,
     DesiredReplicasAnnotation:    true,
+    NotificationEngineAnnotation: true,
 }
 
 // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
diff --git a/utils/controller/controller.go b/utils/controller/controller.go
index 2530e1d5fa..b5c1fc875b 100644
--- a/utils/controller/controller.go
+++ b/utils/controller/controller.go
@@ -6,6 +6,8 @@ import (
     "runtime/debug"
     "time"
 
+    "k8s.io/apimachinery/pkg/api/errors"
+
     log "github.com/sirupsen/logrus"
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -157,6 +159,11 @@ func processNextWorkItem(ctx context.Context, workqueue workqueue.RateLimitingIn
         if err := runSyncHandler(); err != nil {
             logCtx.Errorf("%s syncHandler error: %v", objType, err)
             metricsServer.IncError(namespace, name, objType)
+
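+            // A NotFound error will not be fixed by retrying: the object is gone,
+            // so drop the item from the queue instead of requeueing it.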
+            if errors.IsNotFound(err) {
+                workqueue.Forget(obj)
+                return nil
+            }
             // Put the item back on
             // the workqueue to handle any transient errors.
             workqueue.AddRateLimited(key)
diff --git a/utils/replicaset/canary.go b/utils/replicaset/canary.go
old mode 100755
new mode 100644
index cf41e6baa5..f8fd8fe869
--- a/utils/replicaset/canary.go
+++ b/utils/replicaset/canary.go
@@ -4,6 +4,8 @@ import (
     "encoding/json"
     "math"
 
+    "github.com/argoproj/argo-rollouts/utils/annotations"
+
     log "github.com/sirupsen/logrus"
     appsv1 "k8s.io/api/apps/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -15,7 +17,7 @@ import (
 
 const (
     // EphemeralMetadataAnnotation denotes pod metadata which is ephemerally injected to canary/stable pods
-    EphemeralMetadataAnnotation = "rollout.argoproj.io/ephemeral-metadata"
+    EphemeralMetadataAnnotation = annotations.RolloutLabel + "/ephemeral-metadata"
 )
 
 func allDesiredAreAvailable(rs *appsv1.ReplicaSet, desired int32) bool {