diff --git a/.github/workflows/e2e-advanced-deployment-1.19.yaml b/.github/workflows/e2e-advanced-deployment-1.19.yaml index 527b0018..cad7c5c8 100644 --- a/.github/workflows/e2e-advanced-deployment-1.19.yaml +++ b/.github/workflows/e2e-advanced-deployment-1.19.yaml @@ -90,7 +90,7 @@ jobs: echo "Timeout to wait for kruise-rollout ready" exit 1 fi - - name: Run E2E Tests + - name: Run E2E Tests For Deployment Controller run: | export KUBECONFIG=/home/runner/.kube/config make ginkgo @@ -108,3 +108,21 @@ jobs: exit 1 fi exit $retVal + - name: Run E2E Tests For Control Plane + run: | + export KUBECONFIG=/home/runner/.kube/config + make ginkgo + set +e + ./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment canary rollout with Ingress' test/e2e + retVal=$? + # kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout + restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-rollout has not restarted" + else + kubectl get pod -n kruise-rollout --no-headers + echo "Kruise-rollout has restarted, abort!!!" + kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout + exit 1 + fi + exit $retVal diff --git a/.github/workflows/e2e-advanced-deployment-1.23.yaml b/.github/workflows/e2e-advanced-deployment-1.23.yaml index 3aaa9ee3..e236c34d 100644 --- a/.github/workflows/e2e-advanced-deployment-1.23.yaml +++ b/.github/workflows/e2e-advanced-deployment-1.23.yaml @@ -90,7 +90,7 @@ jobs: echo "Timeout to wait for kruise-rollout ready" exit 1 fi - - name: Run E2E Tests + - name: Run E2E Tests For Deployment Controller run: | export KUBECONFIG=/home/runner/.kube/config make ginkgo @@ -107,4 +107,22 @@ jobs: kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout exit 1 fi + exit $retVal + - name: Run E2E Tests For Control Plane + run: | + export KUBECONFIG=/home/runner/.kube/config + make ginkgo + set +e + ./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment canary rollout with Ingress' test/e2e + retVal=$? + # kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout + restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-rollout has not restarted" + else + kubectl get pod -n kruise-rollout --no-headers + echo "Kruise-rollout has restarted, abort!!!" + kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout + exit 1 + fi exit $retVal \ No newline at end of file diff --git a/api/v1alpha1/deployment_types.go b/api/v1alpha1/deployment_types.go index b750db5e..eb40e7fe 100644 --- a/api/v1alpha1/deployment_types.go +++ b/api/v1alpha1/deployment_types.go @@ -13,6 +13,10 @@ const ( // DeploymentExtraStatusAnnotation is annotation for deployment, // which is extra status field of Advanced Deployment. DeploymentExtraStatusAnnotation = "rollouts.kruise.io/deployment-extra-status" + + // DeploymentStableRevisionLabel is label for deployment, + // which record the stable revision during the current rolling process. 
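+ // The value is the pod-template-hash of the stable ReplicaSet; it is written by the workload mutating webhook when a rollout starts and removed again when the BatchRelease is finalized.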
+ DeploymentStableRevisionLabel = "rollouts.kruise.io/stable-revision" ) // DeploymentStrategy is strategy field for Advanced Deployment diff --git a/api/v1alpha1/rollout_types.go b/api/v1alpha1/rollout_types.go index db174046..4661058c 100644 --- a/api/v1alpha1/rollout_types.go +++ b/api/v1alpha1/rollout_types.go @@ -42,12 +42,15 @@ const ( // RollbackInBatchAnnotation allow use disable quick rollback, and will roll back in batch style. RollbackInBatchAnnotation = "rollouts.kruise.io/rollback-in-batch" - // DeploymentRolloutStyleAnnotation define the rolling behavior for Deployment. + // RolloutStyleAnnotation define the rolling behavior for Deployment. // must be "partition" or "canary": - // * "partition" means rolling Deployment in batches just like CloneSet, and will NOT create any extra Deployment; - // * "canary" means rolling in canary way, and will create a canary Deployment. - // Defaults to canary - DeploymentRolloutStyleAnnotation = "rollouts.kruise.io/deployment-rolling-style" + // * "partition" means rolling in batches just like CloneSet, and will NOT create any extra Workload; + // * "canary" means rolling in canary way, and will create a canary Workload. + // Currently, only Deployment support both "partition" and "canary" rolling styles. + // For other workload types, they only support "partition" styles. + // Defaults to "canary" to Deployment. + // Defaults to "partition" to the others. + RolloutStyleAnnotation = "rollouts.kruise.io/rolling-style" ) // RolloutSpec defines the desired state of Rollout diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index aa9b95f5..36663923 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -15,4 +15,7 @@ spec: - "--metrics-bind-address=127.0.0.1:8080" - "--leader-elect" - "--feature-gates=AdvancedDeployment=true" - - "--v=3" + - "--v=5" + env: + - name: KUBE_CACHE_MUTATION_DETECTOR + value: "true" diff --git a/pkg/controller/batchrelease/batchrelease_executor.go b/pkg/controller/batchrelease/batchrelease_executor.go index c66d73a6..bc217f54 100644 --- a/pkg/controller/batchrelease/batchrelease_executor.go +++ b/pkg/controller/batchrelease/batchrelease_executor.go @@ -19,15 +19,17 @@ package batchrelease import ( "fmt" "reflect" + "strings" "time" appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle" - "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/deployment" + canarydeployment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/deployment" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/cloneset" + partitiondeployment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/deployment" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/statefulset" "github.com/openkruise/rollouts/pkg/util" apps "k8s.io/api/apps/v1" @@ -195,19 +197,24 @@ func (r *Executor) getReleaseController(release *v1alpha1.BatchRelease, newStatu switch targetRef.APIVersion { case appsv1alpha1.GroupVersion.String(): if targetRef.Kind == reflect.TypeOf(appsv1alpha1.CloneSet{}).Name() { - klog.InfoS("Using CloneSet batch 
release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + klog.InfoS("Using CloneSet partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) return partitionstyle.NewControlPlane(cloneset.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil } case apps.SchemeGroupVersion.String(): if targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() { - klog.InfoS("Using Deployment batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) - return canarystyle.NewControlPlane(deployment.NewController, r.client, r.recorder, release, newStatus, targetKey), nil + if strings.EqualFold(release.Annotations[v1alpha1.RolloutStyleAnnotation], string(v1alpha1.PartitionRollingStyle)) { + klog.InfoS("Using Deployment partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + return partitionstyle.NewControlPlane(partitiondeployment.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil + } else { + klog.InfoS("Using Deployment canary-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + return canarystyle.NewControlPlane(canarydeployment.NewController, r.client, r.recorder, release, newStatus, targetKey), nil + } } } // try to use StatefulSet-like rollout controller by default - klog.InfoS("Using StatefulSet-like batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + klog.InfoS("Using StatefulSet-Like partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) return partitionstyle.NewControlPlane(statefulset.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil } diff --git a/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go b/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go index 87db6ed9..1cad56ac 100644 --- a/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go +++ b/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go @@ -136,7 +136,7 @@ func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error { if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { return err } - klog.Infof("Successfully finalize StatefulSet %v", klog.KObj(rc.object)) + klog.Infof("Successfully finalize CloneSet %v", klog.KObj(rc.object)) return nil } diff --git a/pkg/controller/batchrelease/control/partitionstyle/deployment/control.go b/pkg/controller/batchrelease/control/partitionstyle/deployment/control.go new file mode 100644 index 00000000..073bd356 --- /dev/null +++ b/pkg/controller/batchrelease/control/partitionstyle/deployment/control.go @@ -0,0 +1,219 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "context" + "encoding/json" + + "github.com/openkruise/rollouts/api/v1alpha1" + batchcontext "github.com/openkruise/rollouts/pkg/controller/batchrelease/context" + "github.com/openkruise/rollouts/pkg/controller/batchrelease/control" + "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle" + deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" + "github.com/openkruise/rollouts/pkg/util" + apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type realController struct { + *util.WorkloadInfo + client client.Client + pods []*corev1.Pod + key types.NamespacedName + object *apps.Deployment +} + +func NewController(cli client.Client, key types.NamespacedName, _ schema.GroupVersionKind) partitionstyle.Interface { + return &realController{ + key: key, + client: cli, + } +} + +func (rc *realController) GetInfo() *util.WorkloadInfo { + return rc.WorkloadInfo +} + +func (rc *realController) BuildController() (partitionstyle.Interface, error) { + if rc.object != nil { + return rc, nil + } + object := &apps.Deployment{} + if err := rc.client.Get(context.TODO(), rc.key, object); err != nil { + return rc, err + } + rc.object = object + workloadInfo, err := rc.getWorkloadInfo(object) + if err != nil { + return nil, err + } + rc.WorkloadInfo = workloadInfo + return rc, nil +} + +func (rc *realController) ListOwnedPods() ([]*corev1.Pod, error) { + if rc.pods != nil { + return rc.pods, nil + } + var err error + rc.pods, err = util.ListOwnedPods(rc.client, rc.object) + return rc.pods, err +} + +func (rc *realController) Initialize(release *v1alpha1.BatchRelease) error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + d := &apps.Deployment{} + if err := rc.client.Get(context.TODO(), rc.key, d); err != nil { + return err + } + if control.IsControlledByBatchRelease(release, d) { + return nil + } + if d.Annotations == nil { + d.Annotations = map[string]string{} + } + // set strategy to annotations + strategy := util.UnmarshalledDeploymentStrategy(d) + rollingUpdate := strategy.RollingUpdate + if d.Spec.Strategy.RollingUpdate != nil { + rollingUpdate = d.Spec.Strategy.RollingUpdate + } + d.Annotations[v1alpha1.DeploymentStrategyAnnotation] = util.MarshalledDeploymentStrategy( + v1alpha1.PartitionRollingStyle, rollingUpdate, intstr.FromInt(0), false) + // claim the deployment is under our control + owner, _ := json.Marshal(metav1.NewControllerRef(release, release.GetObjectKind().GroupVersionKind())) + d.Annotations[util.BatchReleaseControlAnnotation] = string(owner) + // disable the native deployment controller + d.Spec.Paused = true + d.Spec.Strategy = apps.DeploymentStrategy{Type: apps.RecreateDeploymentStrategyType} + return rc.client.Update(context.TODO(), d) + }); err != nil { + return err + } + + klog.Infof("Successfully initialized Deployment %v", rc.key) + return nil +} + +func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + d := &apps.Deployment{} + if err := rc.client.Get(context.TODO(), rc.key, d); err != nil { + return err + } + 
if !deploymentutil.IsUnderRolloutControl(d) { + klog.Warningf("Cannot upgrade batch because Deployment %v is not under our control", rc.key) + return nil + } + strategy := util.UnmarshalledDeploymentStrategy(d) + if control.IsCurrentMoreThanOrEqualToDesired(strategy.Partition, ctx.DesiredPartition) { + return nil + } + strategy.Partition = ctx.DesiredPartition + strategyAnno, _ := json.Marshal(&strategy) + d.Annotations[v1alpha1.DeploymentStrategyAnnotation] = string(strategyAnno) + return rc.client.Update(context.TODO(), d) + }); err != nil { + return err + } + + klog.Infof("Successfully submit partition %v for Deployment %v", ctx.DesiredPartition, rc.key) + return nil +} + +func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error { + if rc.object == nil { + return nil + } + + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + d := &apps.Deployment{} + if err := rc.client.Get(context.TODO(), rc.key, d); err != nil { + return client.IgnoreNotFound(err) + } + if release.Spec.ReleasePlan.BatchPartition == nil { + d.Spec.Paused = false + strategy := util.UnmarshalledDeploymentStrategy(d) + d.Spec.Strategy.Type = apps.RollingUpdateDeploymentStrategyType + d.Spec.Strategy.RollingUpdate = strategy.RollingUpdate + delete(d.Annotations, v1alpha1.DeploymentStrategyAnnotation) + delete(d.Annotations, v1alpha1.DeploymentExtraStatusAnnotation) + delete(d.Labels, v1alpha1.DeploymentStableRevisionLabel) + } + delete(d.Annotations, util.BatchReleaseControlAnnotation) + return rc.client.Update(context.TODO(), d) + }); err != nil { + return err + } + + klog.Infof("Successfully finalize Deployment %v", klog.KObj(rc.object)) + return nil +} + +func (rc *realController) CalculateBatchContext(release *v1alpha1.BatchRelease) (*batchcontext.BatchContext, error) { + rolloutID := release.Spec.ReleasePlan.RolloutID + if rolloutID != "" { + // if rollout-id is set, the pod will be patched batch label, + // so we have to list pod here. 
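+		// ListOwnedPods caches the result in rc.pods, which is carried into the batch context built below.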
+ if _, err := rc.ListOwnedPods(); err != nil { + return nil, err + } + } + + currentBatch := release.Status.CanaryStatus.CurrentBatch + desiredPartition := release.Spec.ReleasePlan.Batches[currentBatch].CanaryReplicas + PlannedUpdatedReplicas := deploymentutil.NewRSReplicasLimit(desiredPartition, rc.object) + + return &batchcontext.BatchContext{ + Pods: rc.pods, + RolloutID: rolloutID, + CurrentBatch: currentBatch, + UpdateRevision: release.Status.UpdateRevision, + DesiredPartition: desiredPartition, + FailureThreshold: release.Spec.ReleasePlan.FailureThreshold, + + Replicas: rc.Replicas, + UpdatedReplicas: rc.Status.UpdatedReplicas, + UpdatedReadyReplicas: rc.Status.UpdatedReadyReplicas, + PlannedUpdatedReplicas: PlannedUpdatedReplicas, + DesiredUpdatedReplicas: PlannedUpdatedReplicas, + }, nil +} + +func (rc *realController) getWorkloadInfo(d *apps.Deployment) (*util.WorkloadInfo, error) { + workloadInfo := util.ParseWorkload(d) + extraStatus := util.UnmarshalledDeploymentExtraStatus(d) + workloadInfo.Status.UpdatedReadyReplicas = extraStatus.UpdatedReadyReplicas + finder := util.NewControllerFinder(rc.client) + rss, err := finder.GetReplicaSetsForDeployment(d) + if err != nil { + return nil, err + } + _, oldRS := util.FindCanaryAndStableReplicaSet(rss, d) + if oldRS != nil { + workloadInfo.Status.StableRevision = oldRS.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + return workloadInfo, nil +} diff --git a/pkg/controller/batchrelease/control/partitionstyle/deployment/control_test.go b/pkg/controller/batchrelease/control/partitionstyle/deployment/control_test.go new file mode 100644 index 00000000..ff9bfb38 --- /dev/null +++ b/pkg/controller/batchrelease/control/partitionstyle/deployment/control_test.go @@ -0,0 +1,365 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" + "github.com/openkruise/rollouts/api/v1alpha1" + batchcontext "github.com/openkruise/rollouts/pkg/controller/batchrelease/context" + "github.com/openkruise/rollouts/pkg/util" + apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var ( + scheme = runtime.NewScheme() + + deploymentKey = types.NamespacedName{ + Name: "deployment", + Namespace: "default", + } + + deploymentDemo = &apps.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: deploymentKey.Name, + Namespace: deploymentKey.Namespace, + Generation: 1, + Labels: map[string]string{ + "app": "busybox", + }, + Annotations: map[string]string{ + "type": "unit-test", + }, + }, + Spec: apps.DeploymentSpec{ + Paused: true, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "busybox", + }, + }, + Replicas: pointer.Int32(10), + Strategy: apps.DeploymentStrategy{ + Type: apps.RollingUpdateDeploymentStrategyType, + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + MaxSurge: &intstr.IntOrString{Type: intstr.String, StrVal: "20%"}, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "busybox", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox:latest", + }, + }, + }, + }, + }, + Status: apps.DeploymentStatus{ + Replicas: 10, + UpdatedReplicas: 10, + ReadyReplicas: 10, + AvailableReplicas: 10, + CollisionCount: pointer.Int32Ptr(1), + ObservedGeneration: 1, + }, + } + + releaseDemo = &v1alpha1.BatchRelease{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "rollouts.kruise.io/v1alpha1", + Kind: "BatchRelease", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "release", + Namespace: deploymentKey.Namespace, + UID: uuid.NewUUID(), + }, + Spec: v1alpha1.BatchReleaseSpec{ + ReleasePlan: v1alpha1.ReleasePlan{ + FinalizingPolicy: v1alpha1.WaitResumeFinalizingPolicyType, + Batches: []v1alpha1.ReleaseBatch{ + { + CanaryReplicas: intstr.FromString("10%"), + }, + { + CanaryReplicas: intstr.FromString("50%"), + }, + { + CanaryReplicas: intstr.FromString("100%"), + }, + }, + }, + TargetRef: v1alpha1.ObjectRef{ + WorkloadRef: &v1alpha1.WorkloadRef{ + APIVersion: deploymentDemo.APIVersion, + Kind: deploymentDemo.Kind, + Name: deploymentDemo.Name, + }, + }, + }, + Status: v1alpha1.BatchReleaseStatus{ + CanaryStatus: v1alpha1.BatchReleaseCanaryStatus{ + CurrentBatch: 1, + }, + }, + } +) + +func init() { + apps.AddToScheme(scheme) + v1alpha1.AddToScheme(scheme) + kruiseappsv1alpha1.AddToScheme(scheme) +} + +func TestCalculateBatchContext(t *testing.T) { + RegisterFailHandler(Fail) + + percent := intstr.FromString("20%") + cases := map[string]struct { + workload func() *apps.Deployment + release func() *v1alpha1.BatchRelease + result *batchcontext.BatchContext + }{ + "noraml case": { + workload: func() *apps.Deployment { + deployment := &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1alpha1.DeploymentStrategyAnnotation: util.MarshalledDeploymentStrategy( + 
v1alpha1.PartitionRollingStyle, + &apps.RollingUpdateDeployment{MaxUnavailable: &percent, MaxSurge: &percent}, + percent, false), + v1alpha1.DeploymentExtraStatusAnnotation: util.MarshalledDeploymentExtraStatus( + 1, 2), + }, + }, + Spec: apps.DeploymentSpec{ + Replicas: pointer.Int32Ptr(10), + }, + Status: apps.DeploymentStatus{ + Replicas: 10, + UpdatedReplicas: 2, + AvailableReplicas: 9, + ReadyReplicas: 9, + }, + } + return deployment + }, + release: func() *v1alpha1.BatchRelease { + r := &v1alpha1.BatchRelease{ + Spec: v1alpha1.BatchReleaseSpec{ + ReleasePlan: v1alpha1.ReleasePlan{ + FailureThreshold: &percent, + FinalizingPolicy: v1alpha1.WaitResumeFinalizingPolicyType, + Batches: []v1alpha1.ReleaseBatch{ + { + CanaryReplicas: percent, + }, + }, + }, + }, + Status: v1alpha1.BatchReleaseStatus{ + CanaryStatus: v1alpha1.BatchReleaseCanaryStatus{ + CurrentBatch: 0, + }, + UpdateRevision: "version-2", + }, + } + return r + }, + result: &batchcontext.BatchContext{ + CurrentBatch: 0, + UpdateRevision: "version-2", + DesiredPartition: percent, + FailureThreshold: &percent, + + Replicas: 10, + UpdatedReplicas: 2, + UpdatedReadyReplicas: 1, + PlannedUpdatedReplicas: 2, + DesiredUpdatedReplicas: 2, + }, + }, + "partition=90%, replicas=5": { + workload: func() *apps.Deployment { + deployment := &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1alpha1.DeploymentStrategyAnnotation: util.MarshalledDeploymentStrategy( + v1alpha1.PartitionRollingStyle, + &apps.RollingUpdateDeployment{MaxUnavailable: &percent, MaxSurge: &percent}, + intstr.FromString("20%"), false), + v1alpha1.DeploymentExtraStatusAnnotation: util.MarshalledDeploymentExtraStatus( + 4, 4), + }, + }, + Spec: apps.DeploymentSpec{ + Replicas: pointer.Int32Ptr(5), + }, + Status: apps.DeploymentStatus{ + Replicas: 5, + UpdatedReplicas: 4, + AvailableReplicas: 5, + ReadyReplicas: 5, + }, + } + return deployment + }, + release: func() *v1alpha1.BatchRelease { + r := &v1alpha1.BatchRelease{ + Spec: v1alpha1.BatchReleaseSpec{ + ReleasePlan: v1alpha1.ReleasePlan{ + FailureThreshold: &percent, + FinalizingPolicy: v1alpha1.WaitResumeFinalizingPolicyType, + Batches: []v1alpha1.ReleaseBatch{ + { + CanaryReplicas: intstr.FromString("90%"), + }, + }, + }, + }, + Status: v1alpha1.BatchReleaseStatus{ + CanaryStatus: v1alpha1.BatchReleaseCanaryStatus{ + CurrentBatch: 0, + }, + UpdateRevision: "version-2", + }, + } + return r + }, + result: &batchcontext.BatchContext{ + CurrentBatch: 0, + UpdateRevision: "version-2", + DesiredPartition: intstr.FromString("90%"), + FailureThreshold: &percent, + + Replicas: 5, + UpdatedReplicas: 4, + UpdatedReadyReplicas: 4, + PlannedUpdatedReplicas: 4, + DesiredUpdatedReplicas: 4, + }, + }, + } + + for name, cs := range cases { + t.Run(name, func(t *testing.T) { + cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.workload()).Build() + control := realController{ + client: cli, + } + _, err := control.BuildController() + Expect(err).NotTo(HaveOccurred()) + got, err := control.CalculateBatchContext(cs.release()) + fmt.Println(got) + Expect(err).NotTo(HaveOccurred()) + Expect(got.Log()).Should(Equal(cs.result.Log())) + }) + } +} + +func TestRealController(t *testing.T) { + RegisterFailHandler(Fail) + + release := releaseDemo.DeepCopy() + clone := deploymentDemo.DeepCopy() + cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(release, clone).Build() + c := NewController(cli, deploymentKey, clone.GroupVersionKind()).(*realController) + controller, err := 
c.BuildController() + Expect(err).NotTo(HaveOccurred()) + + err = controller.Initialize(release) + Expect(err).NotTo(HaveOccurred()) + fetch := &apps.Deployment{} + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + Expect(fetch.Spec.Paused).Should(BeTrue()) + Expect(fetch.Spec.Strategy.Type).Should(Equal(apps.RecreateDeploymentStrategyType)) + Expect(fetch.Annotations[util.BatchReleaseControlAnnotation]).Should(Equal(getControlInfo(release))) + c.object = fetch // mock + + for { + batchContext, err := controller.CalculateBatchContext(release) + Expect(err).NotTo(HaveOccurred()) + err = controller.UpgradeBatch(batchContext) + fetch := &apps.Deployment{} + // mock + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + c.object = fetch + if err == nil { + break + } + } + fetch = &apps.Deployment{} + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + strategy := util.UnmarshalledDeploymentStrategy(fetch) + Expect(strategy.Partition.StrVal).Should(Equal("50%")) + + release.Spec.ReleasePlan.BatchPartition = nil + err = controller.Finalize(release) + Expect(err).NotTo(HaveOccurred()) + fetch = &apps.Deployment{} + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + Expect(fetch.Annotations[util.BatchReleaseControlAnnotation]).Should(Equal("")) + Expect(fetch.Annotations[v1alpha1.DeploymentStrategyAnnotation]).Should(Equal("")) + Expect(fetch.Annotations[v1alpha1.DeploymentExtraStatusAnnotation]).Should(Equal("")) + Expect(fetch.Spec.Paused).Should(BeFalse()) + Expect(fetch.Spec.Strategy.Type).Should(Equal(apps.RollingUpdateDeploymentStrategyType)) + + workloadInfo := controller.GetInfo() + Expect(workloadInfo).ShouldNot(BeNil()) + checkWorkloadInfo(workloadInfo, clone) +} + +func checkWorkloadInfo(stableInfo *util.WorkloadInfo, clone *apps.Deployment) { + Expect(stableInfo.Replicas).Should(Equal(*clone.Spec.Replicas)) + Expect(stableInfo.Status.Replicas).Should(Equal(clone.Status.Replicas)) + Expect(stableInfo.Status.ReadyReplicas).Should(Equal(clone.Status.ReadyReplicas)) + Expect(stableInfo.Status.UpdatedReplicas).Should(Equal(clone.Status.UpdatedReplicas)) + Expect(stableInfo.Status.AvailableReplicas).Should(Equal(clone.Status.AvailableReplicas)) + Expect(stableInfo.Status.ObservedGeneration).Should(Equal(clone.Status.ObservedGeneration)) +} + +func getControlInfo(release *v1alpha1.BatchRelease) string { + owner, _ := json.Marshal(metav1.NewControllerRef(release, release.GetObjectKind().GroupVersionKind())) + return string(owner) +} diff --git a/pkg/controller/batchrelease/control/util.go b/pkg/controller/batchrelease/control/util.go index c1a4a698..a6484ebd 100644 --- a/pkg/controller/batchrelease/control/util.go +++ b/pkg/controller/batchrelease/control/util.go @@ -101,6 +101,14 @@ func GenerateNotFoundError(name, resource string) error { return errors.NewNotFound(schema.GroupResource{Group: "apps", Resource: resource}, name) } +// ShouldWaitResume return true if FinalizingPolicy is "waitResume". 
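+// A "waitResume" finalizing policy means finalization waits for the workload to be resumed rather than resuming it immediately.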
func ShouldWaitResume(release *v1alpha1.BatchRelease) bool { return release.Spec.ReleasePlan.FinalizingPolicy == v1alpha1.WaitResumeFinalizingPolicyType } + +// IsCurrentMoreThanOrEqualToDesired returns true if current >= desired +func IsCurrentMoreThanOrEqualToDesired(current, desired intstr.IntOrString) bool { + currentNum, _ := intstr.GetScaledValueFromIntOrPercent(&current, 10000000, true) + desiredNum, _ := intstr.GetScaledValueFromIntOrPercent(&desired, 10000000, true) + return currentNum >= desiredNum +} diff --git a/pkg/controller/batchrelease/control/util_test.go b/pkg/controller/batchrelease/control/util_test.go index fa11b737..baef4387 100644 --- a/pkg/controller/batchrelease/control/util_test.go +++ b/pkg/controller/batchrelease/control/util_test.go @@ -167,3 +167,50 @@ func TestIsControlledByBatchRelease(t *testing.T) { }) } } + +func TestIsCurrentMoreThanOrEqualToDesired(t *testing.T) { + RegisterFailHandler(Fail) + + cases := map[string]struct { + current intstr.IntOrString + desired intstr.IntOrString + result bool + }{ + "current=2,desired=1": { + current: intstr.FromInt(2), + desired: intstr.FromInt(1), + result: true, + }, + "current=2,desired=2": { + current: intstr.FromInt(2), + desired: intstr.FromInt(2), + result: true, + }, + "current=2,desired=3": { + current: intstr.FromInt(2), + desired: intstr.FromInt(3), + result: false, + }, + "current=80%,desired=79%": { + current: intstr.FromString("80%"), + desired: intstr.FromString("79%"), + result: true, + }, + "current=80%,desired=80%": { + current: intstr.FromString("80%"), + desired: intstr.FromString("80%"), + result: true, + }, + "current=90%,desired=91%": { + current: intstr.FromString("90%"), + desired: intstr.FromString("91%"), + result: false, + }, + } + for name, cs := range cases { + t.Run(name, func(t *testing.T) { + got := IsCurrentMoreThanOrEqualToDesired(cs.current, cs.desired) + Expect(got == cs.result).Should(BeTrue()) + }) + } +} diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 53ec156a..e238eddf 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -130,7 +130,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { } if err = c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{ - IsController: true, OwnerType: &appsv1.ReplicaSet{}}, predicate.Funcs{}); err != nil { + IsController: true, OwnerType: &appsv1.Deployment{}}, predicate.Funcs{}); err != nil { return err } @@ -187,13 +187,15 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req if err != nil { errList = append(errList, field.InternalError(field.NewPath("patchExtraStatus"), err)) } + if len(errList) > 0 { + return ctrl.Result{}, errList.ToAggregate() + } err = deploymentutil.DeploymentRolloutSatisfied(deployment, dc.strategy.Partition) if err != nil { klog.V(3).Infof("Deployment %v is still rolling: %v", klog.KObj(deployment), err) - return reconcile.Result{RequeueAfter: DefaultRetryDuration}, errList.ToAggregate() + return reconcile.Result{RequeueAfter: DefaultRetryDuration}, nil } - - return ctrl.Result{}, errList.ToAggregate() + return reconcile.Result{}, nil } type controllerFactory DeploymentController diff --git a/pkg/controller/rollout/rollout_canary.go b/pkg/controller/rollout/rollout_canary.go index a02b4770..3ab2b960 100644 --- a/pkg/controller/rollout/rollout_canary.go +++ b/pkg/controller/rollout/rollout_canary.go @@ -362,11 +362,15 @@ func createBatchRelease(rollout 
*v1alpha1.Rollout, rolloutID string, batch int32 }, }, } + annotations := map[string]string{} if isRollback { - if br.Annotations == nil { - br.Annotations = map[string]string{} - } - br.Annotations[v1alpha1.RollbackInBatchAnnotation] = "true" + annotations[v1alpha1.RollbackInBatchAnnotation] = rollout.Annotations[v1alpha1.RollbackInBatchAnnotation] + } + if style, ok := rollout.Annotations[v1alpha1.RolloutStyleAnnotation]; ok { + annotations[v1alpha1.RolloutStyleAnnotation] = style + } + if len(annotations) > 0 { + br.Annotations = annotations } return br } diff --git a/pkg/controller/rollout/rollout_canary_test.go b/pkg/controller/rollout/rollout_canary_test.go index 9ef53c1e..3ad2d658 100644 --- a/pkg/controller/rollout/rollout_canary_test.go +++ b/pkg/controller/rollout/rollout_canary_test.go @@ -233,7 +233,7 @@ func TestRunCanary(t *testing.T) { trafficRoutingManager: r.trafficRoutingManager, recorder: r.Recorder, } - workload, _ := r.finder.GetWorkloadForRef("", rollout.Spec.ObjectRef.WorkloadRef) + workload, _ := r.finder.GetWorkloadForRef(rollout) c := &util.RolloutContext{ Rollout: rollout, NewStatus: rollout.Status.DeepCopy(), @@ -253,7 +253,8 @@ func TestRunCanary(t *testing.T) { cond := util.GetRolloutCondition(*cStatus, v1alpha1.RolloutConditionProgressing) cond.Message = "" util.SetRolloutCondition(cStatus, *cond) - if !reflect.DeepEqual(cs.expectStatus(), cStatus) { + expectStatus := cs.expectStatus() + if !reflect.DeepEqual(expectStatus, cStatus) { t.Fatalf("expect(%s), but get(%s)", util.DumpJSON(cs.expectStatus()), util.DumpJSON(cStatus)) } }) diff --git a/pkg/controller/rollout/rollout_progressing.go b/pkg/controller/rollout/rollout_progressing.go index c3a90d0b..8f89e902 100644 --- a/pkg/controller/rollout/rollout_progressing.go +++ b/pkg/controller/rollout/rollout_progressing.go @@ -35,7 +35,7 @@ var defaultGracePeriodSeconds int32 = 3 func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *v1alpha1.Rollout, newStatus *v1alpha1.RolloutStatus) (*time.Time, error) { cond := util.GetRolloutCondition(rollout.Status, v1alpha1.RolloutConditionProgressing) klog.Infof("reconcile rollout(%s/%s) progressing action...", rollout.Namespace, rollout.Name) - workload, err := r.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := r.finder.GetWorkloadForRef(rollout) if err != nil { klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error()) return nil, err diff --git a/pkg/controller/rollout/rollout_progressing_test.go b/pkg/controller/rollout/rollout_progressing_test.go index 6237e6d6..57e4db73 100644 --- a/pkg/controller/rollout/rollout_progressing_test.go +++ b/pkg/controller/rollout/rollout_progressing_test.go @@ -815,7 +815,7 @@ func TestReCalculateCanaryStepIndex(t *testing.T) { recorder: reconciler.Recorder, } rollout := cs.getRollout() - workload, err := reconciler.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := reconciler.finder.GetWorkloadForRef(rollout) if err != nil { t.Fatalf(err.Error()) } diff --git a/pkg/controller/rollout/rollout_status.go b/pkg/controller/rollout/rollout_status.go index 5ceaf7d4..bf62ccf1 100644 --- a/pkg/controller/rollout/rollout_status.go +++ b/pkg/controller/rollout/rollout_status.go @@ -53,7 +53,7 @@ func (r *RolloutReconciler) calculateRolloutStatus(rollout *v1alpha1.Rollout) (r newStatus.Phase = v1alpha1.RolloutPhaseInitial } // get ref workload - workload, err := 
r.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := r.finder.GetWorkloadForRef(rollout) if err != nil { klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error()) return false, nil, err @@ -182,7 +182,7 @@ func (r *RolloutReconciler) reconcileRolloutTerminating(rollout *v1alpha1.Rollou if cond.Reason == v1alpha1.TerminatingReasonCompleted { return nil, nil } - workload, err := r.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := r.finder.GetWorkloadForRef(rollout) if err != nil { klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error()) return nil, err diff --git a/pkg/util/constant.go b/pkg/util/constant.go index 579a36bd..23fbd3c1 100644 --- a/pkg/util/constant.go +++ b/pkg/util/constant.go @@ -37,6 +37,8 @@ const ( KruiseRolloutFinalizer = "rollouts.kruise.io/rollout" // WorkloadTypeLabel is a label to identify workload type WorkloadTypeLabel = "rollouts.kruise.io/workload-type" + // DeploymentRevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence + DeploymentRevisionAnnotation = "deployment.kubernetes.io/revision" ) // For Pods diff --git a/pkg/util/controller_finder.go b/pkg/util/controller_finder.go index 2b563466..07bc7721 100644 --- a/pkg/util/controller_finder.go +++ b/pkg/util/controller_finder.go @@ -83,18 +83,39 @@ func NewControllerFinder(c client.Client) *ControllerFinder { // +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,resources=replicasets/status,verbs=get;update;patch -func (r *ControllerFinder) GetWorkloadForRef(namespace string, ref *rolloutv1alpha1.WorkloadRef) (*Workload, error) { - for _, finder := range r.finders() { - scale, err := finder(namespace, ref) - if scale != nil || err != nil { - return scale, err +func (r *ControllerFinder) GetWorkloadForRef(rollout *rolloutv1alpha1.Rollout) (*Workload, error) { + switch strings.ToLower(rollout.Annotations[rolloutv1alpha1.RolloutStyleAnnotation]) { + case strings.ToLower(string(rolloutv1alpha1.PartitionRollingStyle)): + for _, finder := range r.partitionStyleFinders() { + workload, err := finder(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + if workload != nil || err != nil { + return workload, err + } + } + default: + for _, finder := range r.canaryStyleFinders() { + workload, err := finder(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + if workload != nil || err != nil { + return workload, err + } + } + for _, finder := range r.partitionStyleFinders() { + workload, err := finder(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + if workload != nil || err != nil { + return workload, err + } } } + return nil, nil } -func (r *ControllerFinder) finders() []ControllerFinderFunc { - return []ControllerFinderFunc{r.getKruiseCloneSet, r.getDeployment, r.getStatefulSetLikeWorkload} +func (r *ControllerFinder) canaryStyleFinders() []ControllerFinderFunc { + return []ControllerFinderFunc{r.getDeployment} +} + +func (r *ControllerFinder) partitionStyleFinders() []ControllerFinderFunc { + return []ControllerFinderFunc{r.getKruiseCloneSet, r.getAdvancedDeployment, r.getStatefulSetLikeWorkload} } var ( @@ -148,6 +169,59 @@ func (r *ControllerFinder) getKruiseCloneSet(namespace string, ref *rolloutv1alp return workload, nil } +// getPartitionStyleDeployment returns the Advanced Deployment 
referenced by the provided controllerRef. +func (r *ControllerFinder) getAdvancedDeployment(namespace string, ref *rolloutv1alpha1.WorkloadRef) (*Workload, error) { + // This error is irreversible, so there is no need to return error + ok, _ := verifyGroupKind(ref, ControllerKindDep.Kind, []string{ControllerKindDep.Group}) + if !ok { + return nil, nil + } + deployment := &apps.Deployment{} + err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, deployment) + if err != nil { + // when error is NotFound, it is ok here. + if errors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + if deployment.Generation != deployment.Status.ObservedGeneration { + return &Workload{IsStatusConsistent: false}, nil + } + + stableRevision := deployment.Labels[rolloutv1alpha1.DeploymentStableRevisionLabel] + + workload := &Workload{ + RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey, + StableRevision: stableRevision, + CanaryRevision: ComputeHash(&deployment.Spec.Template, nil), + ObjectMeta: deployment.ObjectMeta, + Replicas: *deployment.Spec.Replicas, + IsStatusConsistent: true, + } + + // not in rollout progressing + if _, ok = workload.Annotations[InRolloutProgressingAnnotation]; !ok { + return workload, nil + } + // set pod template hash for canary + rss, err := r.GetReplicaSetsForDeployment(deployment) + if err != nil { + return &Workload{IsStatusConsistent: false}, err + } + newRS, _ := FindCanaryAndStableReplicaSet(rss, deployment) + if newRS != nil { + workload.PodTemplateHash = newRS.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + // in rolling back + if workload.StableRevision != "" && workload.StableRevision == workload.PodTemplateHash { + workload.IsInRollback = true + } + // in rollout progressing + workload.InRolloutProgressing = true + return workload, nil +} + // getDeployment returns the k8s native deployment referenced by the provided controllerRef. func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1alpha1.WorkloadRef) (*Workload, error) { // This error is irreversible, so there is no need to return error @@ -190,7 +264,6 @@ func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1alpha1. 
workload.InRolloutProgressing = true // workload is continuous release, indicates rollback(v1 -> v2 -> v1) // delete auto-generated labels - delete(stableRs.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey) if EqualIgnoreHash(&stableRs.Spec.Template, &stable.Spec.Template) { workload.IsInRollback = true return workload, nil @@ -274,7 +347,7 @@ func (r *ControllerFinder) getLatestCanaryDeployment(stable *apps.Deployment) (* return nil, nil } -func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([]apps.ReplicaSet, error) { +func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([]*apps.ReplicaSet, error) { // List ReplicaSets owned by this Deployment rsList := &apps.ReplicaSetList{} selector, err := metav1.LabelSelectorAsSelector(obj.Spec.Selector) @@ -288,7 +361,7 @@ func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([] return nil, err } - rss := make([]apps.ReplicaSet, 0) + var rss []*apps.ReplicaSet for i := range rsList.Items { rs := rsList.Items[i] if !rs.DeletionTimestamp.IsZero() || (rs.Spec.Replicas != nil && *rs.Spec.Replicas == 0) { @@ -296,7 +369,7 @@ func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([] } if ref := metav1.GetControllerOf(&rs); ref != nil { if ref.UID == obj.UID { - rss = append(rss, rs) + rss = append(rss, &rs) } } } @@ -315,7 +388,7 @@ func (r *ControllerFinder) getDeploymentStableRs(obj *apps.Deployment) (*apps.Re sort.Slice(rss, func(i, j int) bool { return rss[i].CreationTimestamp.Before(&rss[j].CreationTimestamp) }) - return &rss[0], nil + return rss[0], nil } func verifyGroupKind(ref *rolloutv1alpha1.WorkloadRef, expectedKind string, expectedGroups []string) (bool, error) { diff --git a/pkg/util/workloads_utils.go b/pkg/util/workloads_utils.go index 21bd8346..9df126b3 100644 --- a/pkg/util/workloads_utils.go +++ b/pkg/util/workloads_utils.go @@ -19,14 +19,18 @@ package util import ( "context" "encoding/binary" + "encoding/json" "fmt" "hash" "hash/fnv" + "sort" + "strconv" "strings" "github.com/davecgh/go-spew/spew" appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" + "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/feature" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -372,3 +376,79 @@ func GetEmptyObjectWithKey(object client.Object) client.Object { empty.SetNamespace(object.GetNamespace()) return empty } + +// MarshalledDeploymentStrategy return a annotation value for rollouts.kruise.io/deployment-strategy +func MarshalledDeploymentStrategy(style v1alpha1.RollingStyleType, rollingUpdate *apps.RollingUpdateDeployment, partition intstr.IntOrString, paused bool) string { + strategyStr, _ := json.Marshal(&v1alpha1.DeploymentStrategy{ + Paused: paused, + Partition: partition, + RollingStyle: style, + RollingUpdate: rollingUpdate, + }) + return string(strategyStr) +} + +// UnmarshalledDeploymentStrategy decode the strategy object for advanced deployment +// from the annotation rollouts.kruise.io/deployment-strategy +func UnmarshalledDeploymentStrategy(deployment *apps.Deployment) v1alpha1.DeploymentStrategy { + strategy := v1alpha1.DeploymentStrategy{} + if deployment == nil { + return strategy + } + strategyStr := deployment.Annotations[v1alpha1.DeploymentStrategyAnnotation] + if strategyStr == "" { + return strategy + } + _ = json.Unmarshal([]byte(strategyStr), &strategy) + return strategy +} + +// MarshalledDeploymentExtraStatus return a 
annotation value for rollouts.kruise.io/deployment-extra-status +func MarshalledDeploymentExtraStatus(updatedReady, expectedUpdated int32) string { + strategyStr, _ := json.Marshal(&v1alpha1.DeploymentExtraStatus{ + UpdatedReadyReplicas: updatedReady, + ExpectedUpdatedReplicas: expectedUpdated, + }) + return string(strategyStr) +} + +// UnmarshalledDeploymentExtraStatus decode the extra-status object for advanced deployment +// from the annotation rollouts.kruise.io/deployment-extra-status +func UnmarshalledDeploymentExtraStatus(deployment *apps.Deployment) v1alpha1.DeploymentExtraStatus { + extraStatus := v1alpha1.DeploymentExtraStatus{} + if deployment == nil { + return extraStatus + } + extraStatusStr := deployment.Annotations[v1alpha1.DeploymentExtraStatusAnnotation] + if extraStatusStr == "" { + return extraStatus + } + _ = json.Unmarshal([]byte(extraStatusStr), &extraStatus) + return extraStatus +} + +// FindCanaryAndStableReplicaSet find the canary and stable replicaset for the deployment +// - canary replicaset: the template equals to deployment's; +// - stable replicaset: an active replicaset(replicas>0) with the smallest revision. +func FindCanaryAndStableReplicaSet(rss []*apps.ReplicaSet, d *apps.Deployment) (*apps.ReplicaSet, *apps.ReplicaSet) { + // sort replicas set by revision ordinals + sort.Slice(rss, func(i, j int) bool { + revision1, err1 := strconv.Atoi(rss[i].Annotations[DeploymentRevisionAnnotation]) + revision2, err2 := strconv.Atoi(rss[j].Annotations[DeploymentRevisionAnnotation]) + if err1 != nil || err2 != nil || revision1 == revision2 { + return rss[i].CreationTimestamp.Before(&rss[j].CreationTimestamp) + } + return revision1 < revision2 + }) + + var newRS *apps.ReplicaSet + var oldRS *apps.ReplicaSet + for _, rs := range rss { + if EqualIgnoreHash(&rs.Spec.Template, &d.Spec.Template) { + newRS = rs + } else if oldRS == nil && *rs.Spec.Replicas > 0 { + oldRS = rs + } + } + return newRS, oldRS +} diff --git a/pkg/util/workloads_utils_test.go b/pkg/util/workloads_utils_test.go index f1f8ef2c..a9aca549 100644 --- a/pkg/util/workloads_utils_test.go +++ b/pkg/util/workloads_utils_test.go @@ -19,6 +19,7 @@ package util import ( "reflect" "testing" + "time" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -26,8 +27,10 @@ import ( appsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -221,3 +224,115 @@ func TestGetOwnerWorkload(t *testing.T) { }) } } + +func TestFilterCanaryAndStableReplicaSet(t *testing.T) { + RegisterFailHandler(Fail) + + const notExists = "not-exists" + createTimestamps := []time.Time{ + time.Now().Add(0 * time.Second), + time.Now().Add(1 * time.Second), + time.Now().Add(2 * time.Second), + time.Now().Add(3 * time.Second), + time.Now().Add(4 * time.Second), + time.Now().Add(5 * time.Second), + } + templateFactory := func(order int64) corev1.PodTemplateSpec { + return corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Generation: order}, + } + } + makeRS := func(name, revision string, createTime time.Time, templateOrder int64, replicas int32) *appsv1.ReplicaSet { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: createTime}, + Annotations: map[string]string{DeploymentRevisionAnnotation: revision}, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: pointer.Int32(replicas), + Template: templateFactory(templateOrder), + }, + } + } + makeD := func(name, revision string, templateOrder int64) *appsv1.Deployment { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{DeploymentRevisionAnnotation: revision}, + }, + Spec: appsv1.DeploymentSpec{Template: templateFactory(templateOrder)}, + } + } + + cases := map[string]struct { + parameters func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) + stableName string + canaryName string + }{ + "no canary": { + parameters: func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "1", createTimestamps[1], 1, 1), + makeRS("r1", "0", createTimestamps[0], 0, 0), + } + return rss, makeD("d", "0", 2) + }, + stableName: "r0", + canaryName: notExists, + }, + "no stable": { + parameters: func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "0", createTimestamps[0], 0, 1), + } + return rss, makeD("d", "0", 0) + }, + stableName: notExists, + canaryName: "r0", + }, + "1 active oldRS": { + parameters: func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "2", createTimestamps[0], 0, 1), + makeRS("r1", "3", createTimestamps[1], 1, 1), + makeRS("r1", "1", createTimestamps[3], 3, 0), + makeRS("r1", "0", createTimestamps[4], 4, 0), + } + return rss, makeD("d", "0", 1) + }, + stableName: "r0", + canaryName: "r1", + }, + "many active oldRS": { + parameters: func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "0", createTimestamps[3], 0, 1), + makeRS("r3", "2", createTimestamps[1], 3, 1), + makeRS("r2", "3", createTimestamps[0], 2, 1), + makeRS("r1", "1", createTimestamps[2], 1, 1), + } + return rss, makeD("d", "4", 3) + }, + stableName: "r0", + canaryName: "r3", + }, + } + + for name, cs := range cases { + t.Run(name, func(t *testing.T) { + rss, d := cs.parameters() + canary, stable := FindCanaryAndStableReplicaSet(rss, d) + if canary != nil { + Expect(canary.Name).Should(Equal(cs.canaryName)) + } else { + 
Expect(cs.canaryName).Should(Equal(notExists)) + } + if stable != nil { + Expect(stable.Name).Should(Equal(cs.stableName)) + } else { + Expect(cs.stableName).Should(Equal(notExists)) + } + }) + } +} diff --git a/pkg/webhook/workload/mutating/workload_update_handler.go b/pkg/webhook/workload/mutating/workload_update_handler.go index 61aa3bca..d4dc8b86 100644 --- a/pkg/webhook/workload/mutating/workload_update_handler.go +++ b/pkg/webhook/workload/mutating/workload_update_handler.go @@ -21,7 +21,7 @@ import ( "encoding/json" "math" "net/http" - "reflect" + "strings" kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" @@ -205,13 +205,39 @@ func (h *WorkloadHandler) handleStatefulSetLikeWorkload(newObj, oldObj *unstruct func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (bool, error) { // in rollout progressing if newObj.Annotations[util.InRolloutProgressingAnnotation] != "" { - if !newObj.Spec.Paused || !reflect.DeepEqual(newObj.Spec.Strategy, oldObj.Spec.Strategy) { + modified := false + if !newObj.Spec.Paused { + modified = true newObj.Spec.Paused = true - newObj.Spec.Strategy = oldObj.Spec.Strategy - klog.Warningf("deployment(%s/%s) is in rollout progressing, and do not modify strategy", newObj.Namespace, newObj.Name) - return true, nil } - return false, nil + strategy := util.UnmarshalledDeploymentStrategy(newObj) + switch strings.ToLower(string(strategy.RollingStyle)) { + case strings.ToLower(string(appsv1alpha1.PartitionRollingStyle)): + // Make sure it is always Recreate to disable native controller + if newObj.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType { + modified = true + newObj.Spec.Strategy.Type = apps.RecreateDeploymentStrategyType + } + if newObj.Spec.Strategy.RollingUpdate != nil { + modified = true + // Allow to modify RollingUpdate config during rolling + strategy.RollingUpdate = newObj.Spec.Strategy.RollingUpdate + newObj.Spec.Strategy.RollingUpdate = nil + } + if isEffectiveDeploymentRevisionChange(oldObj, newObj) { + modified = true + strategy.Paused = true + } + setDeploymentStrategyAnnotation(strategy, newObj) + default: + // Do not allow to modify strategy as Recreate during rolling + if newObj.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType { + modified = true + newObj.Spec.Strategy = oldObj.Spec.Strategy + klog.Warningf("") + } + } + return modified, nil } // indicate whether the workload can enter the rollout process @@ -219,10 +245,7 @@ func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (boo if newObj.Spec.Replicas != nil && *newObj.Spec.Replicas == 0 { return false, nil } - if newObj.Annotations[appsv1alpha1.RolloutIDLabel] != "" && - oldObj.Annotations[appsv1alpha1.RolloutIDLabel] == newObj.Annotations[appsv1alpha1.RolloutIDLabel] { - return false, nil - } else if newObj.Annotations[appsv1alpha1.RolloutIDLabel] == "" && util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) { + if !isEffectiveDeploymentRevisionChange(oldObj, newObj) { return false, nil } @@ -232,16 +255,30 @@ func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (boo } else if rollout == nil || rollout.Spec.Strategy.Canary == nil { return false, nil } + rss, err := h.Finder.GetReplicaSetsForDeployment(newObj) + if err != nil || len(rss) == 0 { + klog.Warningf("Cannot find any activate replicaset for deployment %s/%s, no need to rolling", newObj.Namespace, newObj.Name) + return false, nil + } // if traffic 
routing, workload must only be one version of Pods if len(rollout.Spec.Strategy.Canary.TrafficRoutings) > 0 { - if rss, err := h.Finder.GetReplicaSetsForDeployment(newObj); err != nil { - return false, nil - } else if len(rss) != 1 { + if len(rss) != 1 { klog.Warningf("Because deployment(%s/%s) have multiple versions of Pods, so can not enter rollout progressing", newObj.Namespace, newObj.Name) return false, nil } } + // label the stable version replicaset + _, stableRS := util.FindCanaryAndStableReplicaSet(rss, newObj) + if stableRS == nil { + klog.Warningf("Cannot find any stable replicaset for deployment %s/%s", newObj.Namespace, newObj.Name) + } else { + if newObj.Labels == nil { + newObj.Labels = map[string]string{} + } + newObj.Labels[appsv1alpha1.DeploymentStableRevisionLabel] = stableRS.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + // need set workload paused = true newObj.Spec.Paused = true state := &util.RolloutState{RolloutName: rollout.Name} @@ -332,3 +369,19 @@ func (h *WorkloadHandler) InjectDecoder(d *admission.Decoder) error { h.Decoder = d return nil } + +func isEffectiveDeploymentRevisionChange(oldObj, newObj *apps.Deployment) bool { + if newObj.Annotations[appsv1alpha1.RolloutIDLabel] != "" && + oldObj.Annotations[appsv1alpha1.RolloutIDLabel] == newObj.Annotations[appsv1alpha1.RolloutIDLabel] { + return false + } else if newObj.Annotations[appsv1alpha1.RolloutIDLabel] == "" && + util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) { + return false + } + return true +} + +func setDeploymentStrategyAnnotation(strategy appsv1alpha1.DeploymentStrategy, d *apps.Deployment) { + strategyAnno, _ := json.Marshal(&strategy) + d.Annotations[appsv1alpha1.DeploymentStrategyAnnotation] = string(strategyAnno) +} diff --git a/pkg/webhook/workload/mutating/workload_update_handler_test.go b/pkg/webhook/workload/mutating/workload_update_handler_test.go index 610d1186..26690945 100644 --- a/pkg/webhook/workload/mutating/workload_update_handler_test.go +++ b/pkg/webhook/workload/mutating/workload_update_handler_test.go @@ -105,6 +105,7 @@ var ( "app": "echoserver", }, }, + Replicas: pointer.Int32(5), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -490,6 +491,7 @@ func TestHandlerDeployment(t *testing.T) { } else if !cs.isError && err != nil { t.Fatalf(err.Error()) } + delete(newObj.Labels, appsv1alpha1.DeploymentStableRevisionLabel) if !reflect.DeepEqual(newObj, cs.expectObj()) { by, _ := json.Marshal(newObj) t.Fatalf("handlerDeployment failed, and new(%s)", string(by)) diff --git a/test/e2e/rollout_test.go b/test/e2e/rollout_test.go index cfa56b91..bdd78d90 100644 --- a/test/e2e/rollout_test.go +++ b/test/e2e/rollout_test.go @@ -326,6 +326,40 @@ var _ = SIGDescribe("Rollout", func() { Expect(count).Should(BeNumerically("==", expected)) } + ListReplicaSet := func(d *apps.Deployment) []*apps.ReplicaSet { + var rss []*apps.ReplicaSet + rsLister := &apps.ReplicaSetList{} + selectorOpt, _ := metav1.LabelSelectorAsSelector(d.Spec.Selector) + err := k8sClient.List(context.TODO(), rsLister, &client.ListOptions{LabelSelector: selectorOpt, Namespace: d.Namespace}) + Expect(err).NotTo(HaveOccurred()) + for i := range rsLister.Items { + rs := &rsLister.Items[i] + if !rs.DeletionTimestamp.IsZero() { + continue + } + rss = append(rss, rs) + } + return rss + } + + GetStableRSRevision := func(d *apps.Deployment) string { + rss := ListReplicaSet(d) + _, stable := util.FindCanaryAndStableReplicaSet(rss, d) + if stable != nil { + return 
stable.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + return "" + } + + GetCanaryRSRevision := func(d *apps.Deployment) string { + rss := ListReplicaSet(d) + canary, _ := util.FindCanaryAndStableReplicaSet(rss, d) + if canary != nil { + return canary.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + return "" + } + BeforeEach(func() { namespace = randomNamespaceName("rollout") ns := v1.Namespace{ @@ -4453,6 +4487,512 @@ var _ = SIGDescribe("Rollout", func() { CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "4", 4) }) }) + + KruiseDescribe("Advanced Deployment canary rollout with Ingress", func() { + It("advanced deployment rolling with traffic case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + // service + service := &v1.Service{} + Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred()) + CreateObject(service) + // ingress + ingress := &netv1.Ingress{} + Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred()) + CreateObject(ingress) + // workload + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + By("Update deployment env NODE_NAME from(version1) -> to(version2)") + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + By(stableRevision) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.UnmarshalledDeploymentStrategy(workload) + extraStatus := util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check deployment status & paused success") + + // check rollout status + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseProgressing)) + 
Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + Expect(rollout.Status.CanaryStatus.CanaryRevision).Should(Equal(util.ComputeHash(&workload.Spec.Template, nil))) + Expect(rollout.Status.CanaryStatus.PodTemplateHash).Should(Equal(GetCanaryRSRevision(workload))) + canaryRevision := rollout.Status.CanaryStatus.PodTemplateHash + Expect(rollout.Status.CanaryStatus.CurrentStepIndex).Should(BeNumerically("==", 1)) + Expect(rollout.Status.CanaryStatus.RolloutHash).Should(Equal(rollout.Annotations[util.RolloutHashAnnotation])) + // check stable, canary service & ingress + // stable service + Expect(GetObject(service.Name, service)).NotTo(HaveOccurred()) + Expect(service.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal(stableRevision)) + // canary service + cService := &v1.Service{} + Expect(GetObject(service.Name+"-canary", cService)).NotTo(HaveOccurred()) + Expect(cService.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal(canaryRevision)) + // canary ingress + cIngress := &netv1.Ingress{} + Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("true")) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(Equal(fmt.Sprintf("%d", *rollout.Spec.Strategy.Canary.Steps[0].Weight))) + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + // check stable, canary service & ingress + // canary ingress + cIngress = &netv1.Ingress{} + Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(Equal(fmt.Sprintf("%d", *rollout.Spec.Strategy.Canary.Steps[1].Weight))) + // deployment + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.UnmarshalledDeploymentStrategy(workload) + extraStatus = util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + + // resume rollout + ResumeRolloutCanary(rollout.Name) + WaitRolloutStatusPhase(rollout.Name, v1alpha1.RolloutPhaseHealthy) + WaitDeploymentAllPodsReady(workload) + By("rollout completed, and check") + + // check service & ingress & deployment + // ingress + Expect(GetObject(ingress.Name, ingress)).NotTo(HaveOccurred()) + cIngress = &netv1.Ingress{} + Expect(GetObject(fmt.Sprintf("%s-canary", ingress.Name), cIngress)).To(HaveOccurred()) + // service + Expect(GetObject(service.Name, service)).NotTo(HaveOccurred()) + Expect(service.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal("")) + cService = &v1.Service{} + Expect(GetObject(fmt.Sprintf("%s-canary", service.Name), cService)).To(HaveOccurred()) + // deployment + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 5)) + Expect(workload.Status.AvailableReplicas).Should(BeNumerically("==", 5)) + for _, env := range workload.Spec.Template.Spec.Containers[0].Env { + if env.Name == "NODE_NAME" { + Expect(env.Value).Should(Equal("version2")) + } + } + time.Sleep(time.Second * 3) + + // check progressing succeed + Expect(GetObject(workload.Name, 
workload)).NotTo(HaveOccurred()) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + cond := util.GetRolloutCondition(rollout.Status, v1alpha1.RolloutConditionProgressing) + Expect(cond.Reason).Should(Equal(v1alpha1.ProgressingReasonCompleted)) + Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionFalse))) + cond = util.GetRolloutCondition(rollout.Status, v1alpha1.RolloutConditionSucceeded) + Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionTrue))) + WaitRolloutWorkloadGeneration(rollout.Name, workload.Generation) + //Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(canaryRevision)) + + // scale up replicas 5 -> 6 + workload.Spec.Replicas = utilpointer.Int32(6) + UpdateDeployment(workload) + By("Update deployment replicas from(5) -> to(6)") + time.Sleep(time.Second * 2) + + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + WaitRolloutWorkloadGeneration(rollout.Name, workload.Generation) + }) + + It("advanced deployment continuous rolling case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update workload env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.UnmarshalledDeploymentStrategy(workload) + extraStatus := util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + // resume rollout canary 
+ ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.UnmarshalledDeploymentStrategy(workload) + extraStatus = util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + + By("update workload env NODE_NAME from(version2) -> to(version3)") + newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version3"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision = workload.Labels[v1alpha1.DeploymentStableRevisionLabel] + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy = util.UnmarshalledDeploymentStrategy(workload) + extraStatus = util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.UnmarshalledDeploymentStrategy(workload) + extraStatus = util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + }) + + It("advanced deployment rollback case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update workload 
env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.UnmarshalledDeploymentStrategy(workload) + extraStatus := util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.UnmarshalledDeploymentStrategy(workload) + extraStatus = util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + + By("update workload env NODE_NAME from(version2) -> to(version1)") + newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version1"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + WaitRolloutStatusPhase(rollout.Name, v1alpha1.RolloutPhaseHealthy) + WaitDeploymentAllPodsReady(workload) + }) + + It("advanced deployment delete rollout case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update workload env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: 
"NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.UnmarshalledDeploymentStrategy(workload) + extraStatus := util.UnmarshalledDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + By("delete rollout and check deployment") + k8sClient.Delete(context.TODO(), rollout) + WaitRolloutNotFound(rollout.Name) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Spec.Strategy.Type).Should(Equal(apps.RollingUpdateDeploymentStrategyType)) + Expect(workload.Spec.Paused).Should(BeFalse()) + WaitDeploymentAllPodsReady(workload) + }) + + It("advanced deployment scaling case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update workload env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.UnmarshalledDeploymentStrategy(workload) + extraStatus := util.UnmarshalledDeploymentExtraStatus(workload) + 
Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + By("scale up workload from 5 to 10, and check") + workload.Spec.Replicas = utilpointer.Int32(10) + UpdateDeployment(workload) + Eventually(func() bool { + object := &v1alpha1.Rollout{} + Expect(GetObject(rollout.Name, object)).NotTo(HaveOccurred()) + return object.Status.CanaryStatus.CanaryReadyReplicas == 2 + }, 5*time.Minute, time.Second).Should(BeTrue()) + + By("scale down workload from 10 to 5, and check") + workload.Spec.Replicas = utilpointer.Int32(5) + UpdateDeployment(workload) + Eventually(func() bool { + object := &v1alpha1.Rollout{} + Expect(GetObject(rollout.Name, object)).NotTo(HaveOccurred()) + return object.Status.CanaryStatus.CanaryReadyReplicas == 1 + }, 5*time.Minute, time.Second).Should(BeTrue()) + + By("rolling deployment to be completed") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 2) + ResumeRolloutCanary(rollout.Name) + WaitDeploymentAllPodsReady(workload) + }) + }) }) func mergeEnvVar(original []v1.EnvVar, add v1.EnvVar) []v1.EnvVar {