fix: rs conflict with fallback to patch #3559

Merged

Commits (46)
7943d8d fix: fallback to patch on scale conflict (zachaller, Apr 29, 2024)
650e86d typo (zachaller, May 7, 2024)
ae05623 cleanup commented out code (zachaller, May 7, 2024)
29fe254 Trigger Build (zachaller, May 8, 2024)
8bfb37d only patch rollouts managed fields (zachaller, May 8, 2024)
7526220 lint (zachaller, May 8, 2024)
3822105 fix flaky test (zachaller, May 8, 2024)
b30b93d fix flaky test (zachaller, May 8, 2024)
5e2bda5 reduce patch size (zachaller, May 8, 2024)
0dd919e get some logs (zachaller, May 8, 2024)
90d5ada cleanup (zachaller, May 8, 2024)
aa8f677 improve tests (zachaller, May 8, 2024)
332ef46 Trigger Build (zachaller, May 9, 2024)
a73bfa5 add env var to log diff (zachaller, May 9, 2024)
23f6883 remove experiment rs patch (zachaller, May 9, 2024)
a6297b6 improve logs (zachaller, May 9, 2024)
a913d1c use correct variable (zachaller, May 9, 2024)
401c8c8 change env var (zachaller, May 9, 2024)
0e6db13 fix flaky e2e (zachaller, May 9, 2024)
3810f1d fix flaky e2e (zachaller, May 9, 2024)
2384b7e fix flaky e2e (zachaller, May 9, 2024)
89ed30f remove not found rollouts (zachaller, May 9, 2024)
61a56bb update replica count (zachaller, May 9, 2024)
d9a6655 lint (zachaller, May 9, 2024)
faaafc6 refactor cleanup (zachaller, May 9, 2024)
2aeaf63 keep track of UpdatedReplicas on sync (zachaller, May 10, 2024)
7411720 some hpa tests and log changes (zachaller, May 10, 2024)
f4a2e27 remove update to UpdatedReplicas (zachaller, May 10, 2024)
bec20f2 add more tests (zachaller, May 13, 2024)
a9e11ef fix test (zachaller, May 13, 2024)
1a6fe3b undo change (zachaller, Jun 4, 2024)
8454637 add comment to flaky tests (zachaller, Jun 4, 2024)
2b73c42 cleanup Makefile (zachaller, Jun 4, 2024)
5f09e84 Merge branch 'master' of github.com:argoproj/argo-rollouts into fix-r… (zachaller, Jun 4, 2024)
66b2e59 remove test (zachaller, Jun 4, 2024)
cce7ac4 use labels (zachaller, Jun 4, 2024)
1272d22 remove make file change (zachaller, Jun 4, 2024)
bbcc80e add label to test (zachaller, Jun 4, 2024)
e1bc540 review changes (zachaller, Jun 6, 2024)
4ebd960 change to TODO (zachaller, Jun 6, 2024)
fed308b fix test (zachaller, Jun 6, 2024)
046fde9 Merge branch 'master' of github.com:argoproj/argo-rollouts into fix-r… (zachaller, Jun 6, 2024)
e39bb6e add extra logging for tests (zachaller, Jun 6, 2024)
697a55f Trigger Build (zachaller, Jun 6, 2024)
e52400d add ignore to codecov (zachaller, Jun 6, 2024)
ec9d9b0 we always generate patch because we are comparing against empty obj (zachaller, Jun 7, 2024)
2 changes: 2 additions & 0 deletions .codecov.yml
@@ -19,3 +19,5 @@ ignore:
   - 'pkg/client/.*'
   - 'vendor/.*'
   - '**/mocks/*'
+  - 'hack/gen-crd-spec/main.go'
+  - 'hack/gen-docs/main.go'
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
@@ -95,4 +95,4 @@ jobs:
         with:
           name: e2e-controller-k8s-${{ matrix.kubernetes-minor-version }}.log
           path: /tmp/e2e-controller.log
-        if: ${{ failure() }}
+        if: ${{ always() }}
2 changes: 1 addition & 1 deletion Makefile
@@ -22,7 +22,7 @@ DEV_IMAGE ?= false
 E2E_INSTANCE_ID ?= argo-rollouts-e2e
 E2E_TEST_OPTIONS ?=
 E2E_PARALLEL ?= 1
-E2E_WAIT_TIMEOUT ?= 120
+E2E_WAIT_TIMEOUT ?= 90
 GOPATH ?= $(shell go env GOPATH)

 # Global toolchain configuration
11 changes: 6 additions & 5 deletions hack/gen-crd-spec/main.go
@@ -96,11 +96,12 @@ func NewCustomResourceDefinition() []*extensionsobj.CustomResourceDefinition {
 	// clean up stuff left by controller-gen
 	deleteFile("config/webhook/manifests.yaml")
 	deleteFile("config/webhook")
-	deleteFile("config/argoproj.io_analysisruns.yaml")
-	deleteFile("config/argoproj.io_analysistemplates.yaml")
-	deleteFile("config/argoproj.io_clusteranalysistemplates.yaml")
-	deleteFile("config/argoproj.io_experiments.yaml")
-	deleteFile("config/argoproj.io_rollouts.yaml")
+	deleteFile("config/crd/argoproj.io_analysisruns.yaml")
+	deleteFile("config/crd/argoproj.io_analysistemplates.yaml")
+	deleteFile("config/crd/argoproj.io_clusteranalysistemplates.yaml")
+	deleteFile("config/crd/argoproj.io_experiments.yaml")
+	deleteFile("config/crd/argoproj.io_rollouts.yaml")
+	deleteFile("config/crd")
 	deleteFile("config")

 	crds := []*extensionsobj.CustomResourceDefinition{}
2 changes: 1 addition & 1 deletion rollout/canary.go
@@ -112,7 +112,7 @@
 	}
 	scaled, _, err := c.scaleReplicaSetAndRecordEvent(c.stableRS, desiredStableRSReplicaCount)
 	if err != nil {
-		return scaled, fmt.Errorf("failed to scaleReplicaSetAndRecordEvent in reconcileCanaryStableReplicaSet:L %w", err)
+		return scaled, fmt.Errorf("failed to scaleReplicaSetAndRecordEvent in reconcileCanaryStableReplicaSet: %w", err)
 	}
 	return scaled, err
 }
109 changes: 109 additions & 0 deletions rollout/canary_test.go
@@ -4,10 +4,16 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"os"
+	"strconv"
 	"testing"
 	"time"

+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	k8stesting "k8s.io/client-go/testing"
+
 	"github.com/stretchr/testify/assert"
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/apps/v1"
@@ -2141,3 +2147,106 @@ func TestCanaryReplicaAndSpecChangedTogether(t *testing.T) {
 	// check the canary one is updated
 	assert.NotEqual(t, originReplicas, int(*updated.Spec.Replicas))
 }
+
+func TestSyncRolloutWithConflictInScaleReplicaSet(t *testing.T) {
+	os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
+	defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")
+
+	f := newFixture(t)
+	defer f.Close()
+
+	steps := []v1alpha1.CanaryStep{
+		{
+			SetWeight: int32Ptr(10),
+		}, {
+			Pause: &v1alpha1.RolloutPause{
+				Duration: v1alpha1.DurationFromInt(10),
+			},
+		},
+	}
+	r1 := newCanaryRollout("foo", 10, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0))
+	r1.Spec.Template.Labels["rollout.argoproj.io/foo"] = "bar"
+
+	rs1 := newReplicaSetWithStatus(r1, 10, 10)
+	r1.Spec.Replicas = pointer.Int32(2)
+	f.kubeobjects = append(f.kubeobjects, rs1)
+	f.replicaSetLister = append(f.replicaSetLister, rs1)
+
+	f.rolloutLister = append(f.rolloutLister, r1)
+	f.objects = append(f.objects, r1)
+
+	f.expectPatchRolloutAction(r1)
+	f.expectUpdateReplicaSetAction(rs1)              // attempt to scale the replicaset, which conflicts
+	patchIndex := f.expectPatchReplicaSetAction(rs1) // fall back to patching the replicaset
+
+	key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name)
+	c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute })
+
+	f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+		return true, action.(k8stesting.UpdateAction).GetObject(), errors.NewConflict(schema.GroupResource{
+			Group:    "Apps",
+			Resource: "ReplicaSet",
+		}, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error"))
+	})
+
+	f.runController(key, true, false, c, i, k8sI)
+
+	updatedRs := f.getPatchedReplicaSet(patchIndex) // the update conflicted, so the scale change landed via the patch
+	assert.Equal(t, int32(2), *updatedRs.Spec.Replicas)
+}
+
+func TestSyncRolloutWithConflictInSyncReplicaSetRevision(t *testing.T) {
+	os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
+	defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")
+
+	f := newFixture(t)
+	defer f.Close()
+
+	steps := []v1alpha1.CanaryStep{
+		{
+			SetWeight: int32Ptr(10),
+		}, {
+			Pause: &v1alpha1.RolloutPause{
+				Duration: v1alpha1.DurationFromInt(10),
+			},
+		},
+	}
+	r1 := newCanaryRollout("foo", 3, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0))
+	r2 := bumpVersion(r1)
+
+	rs1 := newReplicaSetWithStatus(r1, 3, 3)
+	rs2 := newReplicaSetWithStatus(r2, 3, 3)
+	rs2.Annotations["rollout.argoproj.io/revision"] = "1"
+
+	f.kubeobjects = append(f.kubeobjects, rs1, rs2)
+	f.replicaSetLister = append(f.replicaSetLister, rs1, rs2)
+
+	f.rolloutLister = append(f.rolloutLister, r2)
+	f.objects = append(f.objects, r2)
+
+	key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name)
+	c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute })
+
+	f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+		return true, &appsv1.ReplicaSet{}, errors.NewConflict(schema.GroupResource{
+			Group:    "Apps",
+			Resource: "ReplicaSet",
+		}, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error"))
+	})
+
+	f.expectPatchRolloutAction(r2)
+	f.expectUpdateReplicaSetAction(rs1)               // attempt to update the replicaset revision, which conflicts
+	patchIndex1 := f.expectPatchReplicaSetAction(rs1) // fall back to patching the replicaset
+
+	f.expectUpdateReplicaSetAction(rs2)               // attempt to scale the replicaset, which conflicts
+	patchIndex2 := f.expectPatchReplicaSetAction(rs2) // fall back to patching the replicaset
+
+	f.runController(key, true, false, c, i, k8sI)
+
+	updatedRs1 := f.getPatchedReplicaSet(patchIndex1)
+	assert.Equal(t, "2", updatedRs1.Annotations["rollout.argoproj.io/revision"])
+	assert.Equal(t, int32(3), *updatedRs1.Spec.Replicas)

+	updatedRs2 := f.getPatchedReplicaSet(patchIndex2)
+	assert.Equal(t, int32(0), *updatedRs2.Spec.Replicas)
+}
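Both new tests force the conflict path by registering a reactor on the fake clientset that fails every ReplicaSet update with a 409. As a minimal, self-contained sketch of that mechanism (the `demo` object and package layout here are illustrative, not part of the PR):

```go
package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes/fake"
	k8stesting "k8s.io/client-go/testing"
)

func main() {
	rs := &appsv1.ReplicaSet{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	client := fake.NewSimpleClientset(rs)

	// Intercept every Update on replicasets and answer with a 409 Conflict,
	// mimicking a concurrent writer that has already bumped resourceVersion.
	client.PrependReactor("update", "replicasets", func(action k8stesting.Action) (bool, runtime.Object, error) {
		name := action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name
		return true, nil, errors.NewConflict(
			schema.GroupResource{Group: "apps", Resource: "replicasets"},
			name, fmt.Errorf("simulated conflict"))
	})

	_, err := client.AppsV1().ReplicaSets("default").Update(context.TODO(), rs, metav1.UpdateOptions{})
	fmt.Println(errors.IsConflict(err)) // true
}
```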
98 changes: 98 additions & 0 deletions rollout/controller.go
@@ -4,23 +4,30 @@
 	"context"
 	"encoding/json"
 	"fmt"
+	"os"
 	"reflect"
 	"strconv"
+	"strings"
 	"sync"
 	"time"

+	"github.com/argoproj/argo-rollouts/utils/annotations"
+
+	"github.com/argoproj/argo-rollouts/utils/diff"
 	"k8s.io/apimachinery/pkg/runtime/schema"

 	"github.com/argoproj/argo-rollouts/pkg/apis/rollouts"
 	smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
 	log "github.com/sirupsen/logrus"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	patchtypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/validation/field"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
@@ -937,3 +944,94 @@
 	}
 	return &remarshalled
 }
+
+// updateReplicaSetFallbackToPatch updates the ReplicaSet using Update and, on a conflict, falls back to a patch. This function only exists so that we can always
+// update ReplicaSets without getting into a conflict loop. We should really look into a complete refactor of how rollouts handles ReplicaSets, such that
+// we do not keep a full ReplicaSet on the rollout context under newRS, and instead switch to a patch-only based approach.
+func (c *rolloutContext) updateReplicaSetFallbackToPatch(ctx context.Context, rs *appsv1.ReplicaSet) (*appsv1.ReplicaSet, error) {
+	updatedRS, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Update(ctx, rs, metav1.UpdateOptions{})
+	if err != nil {
+		if errors.IsConflict(err) {
+			if os.Getenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT") == "true" {
+				rsGet, err := c.replicaSetLister.ReplicaSets(rs.Namespace).Get(rs.Name)
+				if err != nil {
+					return nil, fmt.Errorf("error getting replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+				}
+				rsGetJson, err := json.Marshal(rsGet)
+				if err != nil {
+					return nil, fmt.Errorf("error marshalling informer replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+				}
+				rsCopyJson, err := json.Marshal(rs)
+				if err != nil {
+					return nil, fmt.Errorf("error marshalling memory replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+				}
+				c.log.Infof("Informer RS: %s", rsGetJson)
+				c.log.Infof("Memory RS: %s", rsCopyJson)
+			}
+
+			c.log.Infof("Conflict when updating replicaset %s, falling back to patch", rs.Name)
+
+			patchRS := appsv1.ReplicaSet{}
+			patchRS.Spec.Replicas = rs.Spec.Replicas
+			patchRS.Spec.Template.Labels = rs.Spec.Template.Labels
+			patchRS.Spec.Template.Annotations = rs.Spec.Template.Annotations
+
+			patchRS.Annotations = make(map[string]string)
+			patchRS.Labels = make(map[string]string)
+			patchRS.Spec.Selector = &metav1.LabelSelector{
+				MatchLabels: make(map[string]string),
+			}
+
+			if _, found := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]; found {
+				patchRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
+			}
+
+			if _, found := rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]; found {
+				patchRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] = rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]
+			}
+
+			if _, found := rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey]; found {
+				patchRS.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey]
+			}
+
+			for key, value := range rs.Annotations {
+				if strings.HasPrefix(key, annotations.RolloutLabel) ||
+					strings.HasPrefix(key, "argo-rollouts.argoproj.io") ||
+					strings.HasPrefix(key, "experiment.argoproj.io") {
+					patchRS.Annotations[key] = value
+				}
+			}
+			for key, value := range rs.Labels {
+				if strings.HasPrefix(key, annotations.RolloutLabel) ||
+					strings.HasPrefix(key, "argo-rollouts.argoproj.io") ||
+					strings.HasPrefix(key, "experiment.argoproj.io") {
+					patchRS.Labels[key] = value
+				}
+			}
+
+			patch, changed, err := diff.CreateTwoWayMergePatch(appsv1.ReplicaSet{}, patchRS, appsv1.ReplicaSet{})
+			if err != nil {
+				return nil, fmt.Errorf("error creating patch for conflict log in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+			}
+
+			if changed {
+				c.log.Infof("Patching replicaset with patch: %s", string(patch))
+				updatedRS, err = c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.StrategicMergePatchType, patch, metav1.PatchOptions{})
+				if err != nil {
+					return nil, fmt.Errorf("error patching replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+				}
+			}
+
+			err = c.replicaSetInformer.GetIndexer().Update(updatedRS)
+			if err != nil {
+				return nil, fmt.Errorf("error updating replicaset informer in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
+			}
+
+			return updatedRS, err
+		}
+	}
+	if updatedRS != nil {
+		updatedRS.DeepCopyInto(rs)
+	}
+	return rs, err
+}
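The shape of the change, for readers skimming the diff: try a plain `Update` first, and only on a 409 Conflict fall back to a strategic-merge patch that carries just the fields the controller owns, so a concurrent writer's fields stay untouched. A stripped-down sketch of that pattern, assuming a hypothetical `scaleWithFallback` helper that carries only `spec.replicas` (the real function above also carries template metadata, the pod-template-hash label and selector, and rollout-owned annotations):

```go
package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// scaleWithFallback illustrates the update-then-patch pattern: a full Update
// hits a 409 when another writer (e.g. an HPA) has bumped resourceVersion,
// while a field-scoped strategic merge patch cannot conflict that way.
func scaleWithFallback(ctx context.Context, client kubernetes.Interface, rs *appsv1.ReplicaSet, replicas int32) (*appsv1.ReplicaSet, error) {
	rs.Spec.Replicas = &replicas
	updated, err := client.AppsV1().ReplicaSets(rs.Namespace).Update(ctx, rs, metav1.UpdateOptions{})
	if err == nil || !errors.IsConflict(err) {
		return updated, err
	}
	// Fall back: patch only the field we own, leaving everything else alone.
	patch := []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, replicas))
	return client.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
}

func main() {
	rs := &appsv1.ReplicaSet{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	client := fake.NewSimpleClientset(rs)
	scaled, err := scaleWithFallback(context.TODO(), client, rs, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(*scaled.Spec.Replicas) // 2
}
```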
21 changes: 21 additions & 0 deletions rollout/controller_test.go
@@ -792,6 +792,12 @@ func (f *fixture) expectPatchServiceAction(s *corev1.Service, newLabel string) i
 	return len
 }

+func (f *fixture) expectGetReplicaSetAction(r *appsv1.ReplicaSet) int { //nolint:unused
+	len := len(f.kubeactions)
+	f.kubeactions = append(f.kubeactions, core.NewGetAction(schema.GroupVersionResource{Resource: "replicasets"}, r.Namespace, r.Name))
+	return len
+}
+
 func (f *fixture) expectCreateReplicaSetAction(r *appsv1.ReplicaSet) int {
 	len := len(f.kubeactions)
 	f.kubeactions = append(f.kubeactions, core.NewCreateAction(schema.GroupVersionResource{Resource: "replicasets"}, r.Namespace, r))

Review discussion on lines +795 to +799:

Contributor: we should probably add several test cases to validate the new updateReplicaSetFallbackToPatch method

Author (zachaller): There are two tests that cover most of the happy paths to lock in the behavior at the two conflict points here. I don't think it's worth going down a lot of the error paths like incorrect JSON marshals etc.

Contributor: It seems that the coverage is comparing the wrong base from the link above.

@@ -950,6 +956,21 @@ func (f *fixture) getUpdatedReplicaSet(index int) *appsv1.ReplicaSet {
 	return rs
 }

+func (f *fixture) getPatchedReplicaSet(index int) *appsv1.ReplicaSet {
+	action := filterInformerActions(f.kubeclient.Actions())[index]
+	patchAction, ok := action.(core.PatchAction)
+	if !ok {
+		f.t.Fatalf("Expected Patch action, not %s", action.GetVerb())
+	}
+
+	rs := appsv1.ReplicaSet{}
+	err := json.Unmarshal(patchAction.GetPatch(), &rs)
+	if err != nil {
+		panic(err)
+	}
+	return &rs
+}
+
 func (f *fixture) verifyPatchedReplicaSet(index int, scaleDownDelaySeconds int32) {
 	action := filterInformerActions(f.kubeclient.Actions())[index]
 	patchAction, ok := action.(core.PatchAction)
10 changes: 4 additions & 6 deletions rollout/ephemeralmetadata.go
@@ -82,14 +82,12 @@
 	}

 	// 2. Update ReplicaSet so that any new pods it creates will have the metadata
-	rs, err = c.kubeclientset.AppsV1().ReplicaSets(modifiedRS.Namespace).Update(ctx, modifiedRS, metav1.UpdateOptions{})
+	rs, err = c.updateReplicaSetFallbackToPatch(ctx, modifiedRS)
 	if err != nil {
-		return fmt.Errorf("error updating replicaset in syncEphemeralMetadata: %w", err)
-	}
-	err = c.replicaSetInformer.GetIndexer().Update(rs)
-	if err != nil {
-		return fmt.Errorf("error updating replicaset informer in syncEphemeralMetadata: %w", err)
+		c.log.Infof("failed to sync ephemeral metadata %v to ReplicaSet %s: %v", podMetadata, rs.Name, err)
+		return fmt.Errorf("failed to sync ephemeral metadata: %w", err)
 	}

 	c.log.Infof("synced ephemeral metadata %v to ReplicaSet %s", podMetadata, rs.Name)
 	return nil
 }
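One detail the helper keeps from the code it replaces here: after a successful write it pushes the returned object into the informer's local store, so the next reconcile does not read a stale ReplicaSet while the watch event is still in flight. A minimal sketch of that cache-priming step, using a fake clientset and an illustrative `demo` object:

```go
package main

import (
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	rs := &appsv1.ReplicaSet{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	client := fake.NewSimpleClientset(rs)

	factory := informers.NewSharedInformerFactory(client, time.Minute)
	rsInformer := factory.Apps().V1().ReplicaSets().Informer()

	// Write through the API, then push the result into the local cache so a
	// reconcile that runs before the watch event arrives sees fresh data.
	updated, err := client.AppsV1().ReplicaSets("default").Update(context.TODO(), rs, metav1.UpdateOptions{})
	if err != nil {
		panic(err)
	}
	if err := rsInformer.GetIndexer().Update(updated); err != nil {
		panic(err)
	}
	fmt.Println("cache primed with", updated.Name)
}
```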