From 0babeee8af42d5521991802021c1eac0c1afedb9 Mon Sep 17 00:00:00 2001 From: "mingzhou.swx" Date: Wed, 4 Jan 2023 00:30:34 +0800 Subject: [PATCH] add advanced deployment controller Signed-off-by: mingzhou.swx --- .../e2e-advanced-deployment-1.19.yaml | 110 +++++ .../e2e-advanced-deployment-1.23.yaml | 110 +++++ api/v1alpha1/deployment_types.go | 12 +- config/default/manager_auth_proxy_patch.yaml | 1 + config/manager/manager.yaml | 1 + main.go | 5 + pkg/controller/deployment/controller.go | 34 +- .../deployment/deployment_controller.go | 50 +- .../deployment/deployment_controller_test.go | 216 +++++++++ pkg/controller/deployment/progress.go | 2 +- pkg/controller/deployment/rolling.go | 65 ++- pkg/controller/deployment/rolling_test.go | 449 ++++++++++++++++++ pkg/controller/deployment/sync.go | 63 +-- .../deployment/util/deployment_util.go | 109 +++-- .../deployment/util/deployment_util_test.go | 152 +++++- test/e2e/deployment_test.go | 339 +++++++++++++ test/e2e/rollout_test.go | 2 - test/e2e/test_data/deployment/deployment.yaml | 34 ++ 18 files changed, 1592 insertions(+), 162 deletions(-) create mode 100644 .github/workflows/e2e-advanced-deployment-1.19.yaml create mode 100644 .github/workflows/e2e-advanced-deployment-1.23.yaml create mode 100644 pkg/controller/deployment/deployment_controller_test.go create mode 100644 pkg/controller/deployment/rolling_test.go create mode 100644 test/e2e/deployment_test.go create mode 100644 test/e2e/test_data/deployment/deployment.yaml diff --git a/.github/workflows/e2e-advanced-deployment-1.19.yaml b/.github/workflows/e2e-advanced-deployment-1.19.yaml new file mode 100644 index 00000000..527b0018 --- /dev/null +++ b/.github/workflows/e2e-advanced-deployment-1.19.yaml @@ -0,0 +1,110 @@ +name: E2E-Advanced-Deployment-1.19 + +on: + push: + branches: + - master + - release-* + pull_request: {} + workflow_dispatch: {} + +env: + # Common versions + GO_VERSION: '1.17' + KIND_IMAGE: 'kindest/node:v1.19.16' + KIND_CLUSTER_NAME: 'ci-testing' + +jobs: + + rollout: + runs-on: ubuntu-18.04 + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: ${{ env.GO_VERSION }} + - name: Setup Kind Cluster + uses: helm/kind-action@v1.2.0 + with: + node_image: ${{ env.KIND_IMAGE }} + cluster_name: ${{ env.KIND_CLUSTER_NAME }} + config: ./test/kind-conf.yaml + - name: Build image + run: | + export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}" + docker build --pull --no-cache . -t $IMAGE + kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; } + - name: Install Kruise + run: | + set -ex + kubectl cluster-info + make helm + helm repo add openkruise https://openkruise.github.io/charts/ + helm repo update + helm install kruise openkruise/kruise + for ((i=1;i<10;i++)); + do + set +e + PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l) + set -e + if [ "$PODS" -eq "2" ]; then + break + fi + sleep 3 + done + set +e + PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l) + set -e + if [ "$PODS" -eq "2" ]; then + echo "Wait for kruise-manager ready successfully" + else + echo "Timeout to wait for kruise-manager ready" + exit 1 + fi + - name: Install Kruise Rollout + run: | + set -ex + kubectl cluster-info + IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh + for ((i=1;i<10;i++)); + do + set +e + PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l) + set -e + if [ "$PODS" -eq "1" ]; then + break + fi + sleep 3 + done + set +e + PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l) + kubectl get node -o yaml + kubectl get all -n kruise-rollout -o yaml + set -e + if [ "$PODS" -eq "1" ]; then + echo "Wait for kruise-rollout ready successfully" + else + echo "Timeout to wait for kruise-rollout ready" + exit 1 + fi + - name: Run E2E Tests + run: | + export KUBECONFIG=/home/runner/.kube/config + make ginkgo + set +e + ./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment' test/e2e + retVal=$? + # kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout + restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-rollout has not restarted" + else + kubectl get pod -n kruise-rollout --no-headers + echo "Kruise-rollout has restarted, abort!!!" + kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout + exit 1 + fi + exit $retVal diff --git a/.github/workflows/e2e-advanced-deployment-1.23.yaml b/.github/workflows/e2e-advanced-deployment-1.23.yaml new file mode 100644 index 00000000..3aaa9ee3 --- /dev/null +++ b/.github/workflows/e2e-advanced-deployment-1.23.yaml @@ -0,0 +1,110 @@ +name: E2E-Advanced-Deployment-1.23 + +on: + push: + branches: + - master + - release-* + pull_request: {} + workflow_dispatch: {} + +env: + # Common versions + GO_VERSION: '1.17' + KIND_IMAGE: 'kindest/node:v1.23.3' + KIND_CLUSTER_NAME: 'ci-testing' + +jobs: + + rollout: + runs-on: ubuntu-18.04 + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: ${{ env.GO_VERSION }} + - name: Setup Kind Cluster + uses: helm/kind-action@v1.2.0 + with: + node_image: ${{ env.KIND_IMAGE }} + cluster_name: ${{ env.KIND_CLUSTER_NAME }} + config: ./test/kind-conf.yaml + - name: Build image + run: | + export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}" + docker build --pull --no-cache . -t $IMAGE + kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; } + - name: Install Kruise + run: | + set -ex + kubectl cluster-info + make helm + helm repo add openkruise https://openkruise.github.io/charts/ + helm repo update + helm install kruise openkruise/kruise + for ((i=1;i<10;i++)); + do + set +e + PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l) + set -e + if [ "$PODS" -eq "2" ]; then + break + fi + sleep 3 + done + set +e + PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l) + set -e + if [ "$PODS" -eq "2" ]; then + echo "Wait for kruise-manager ready successfully" + else + echo "Timeout to wait for kruise-manager ready" + exit 1 + fi + - name: Install Kruise Rollout + run: | + set -ex + kubectl cluster-info + IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh + for ((i=1;i<10;i++)); + do + set +e + PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l) + set -e + if [ "$PODS" -eq "1" ]; then + break + fi + sleep 3 + done + set +e + PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l) + kubectl get node -o yaml + kubectl get all -n kruise-rollout -o yaml + set -e + if [ "$PODS" -eq "1" ]; then + echo "Wait for kruise-rollout ready successfully" + else + echo "Timeout to wait for kruise-rollout ready" + exit 1 + fi + - name: Run E2E Tests + run: | + export KUBECONFIG=/home/runner/.kube/config + make ginkgo + set +e + ./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment' test/e2e + retVal=$? + # kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout + restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-rollout has not restarted" + else + kubectl get pod -n kruise-rollout --no-headers + echo "Kruise-rollout has restarted, abort!!!" + kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout + exit 1 + fi + exit $retVal \ No newline at end of file diff --git a/api/v1alpha1/deployment_types.go b/api/v1alpha1/deployment_types.go index c4bda99c..b750db5e 100644 --- a/api/v1alpha1/deployment_types.go +++ b/api/v1alpha1/deployment_types.go @@ -31,16 +31,14 @@ type DeploymentStrategy struct { type RollingStyleType string const ( - // PartitionRollingStyleType means rolling in batches just like CloneSet, and will NOT create any extra Deployment; - PartitionRollingStyleType RollingStyleType = "Partition" - // CanaryRollingStyleType means rolling in canary way, and will create a canary Deployment. - CanaryRollingStyleType RollingStyleType = "Canary" + // PartitionRollingStyle means rolling in batches just like CloneSet, and will NOT create any extra Deployment; + PartitionRollingStyle RollingStyleType = "Partition" + // CanaryRollingStyle means rolling in canary way, and will create a canary Deployment. + CanaryRollingStyle RollingStyleType = "Canary" ) // DeploymentExtraStatus is extra status field for Advanced Deployment type DeploymentExtraStatus struct { - // ObservedGeneration record the generation of deployment this status observed. - ObservedGeneration int64 `json:"observedGeneration,omitempty"` // UpdatedReadyReplicas the number of pods that has been updated and ready. UpdatedReadyReplicas int32 `json:"updatedReadyReplicas,omitempty"` // ExpectedUpdatedReplicas is an absolute number calculated based on Partition @@ -52,7 +50,7 @@ type DeploymentExtraStatus struct { } func SetDefaultDeploymentStrategy(strategy *DeploymentStrategy) { - if strategy.RollingStyle == CanaryRollingStyleType { + if strategy.RollingStyle == CanaryRollingStyle { return } if strategy.RollingUpdate == nil { diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index 542638cd..aa9b95f5 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -14,4 +14,5 @@ spec: - "--health-probe-bind-address=:8081" - "--metrics-bind-address=127.0.0.1:8080" - "--leader-elect" + - "--feature-gates=AdvancedDeployment=true" - "--v=3" diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index b8bdeb53..f8ae707a 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -29,6 +29,7 @@ spec: - /manager args: - --leader-elect + - --feature-gates=AdvancedDeployment=true image: controller:latest name: manager securityContext: diff --git a/main.go b/main.go index 116d2e60..73b6ce3b 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" br "github.com/openkruise/rollouts/pkg/controller/batchrelease" + "github.com/openkruise/rollouts/pkg/controller/deployment" "github.com/openkruise/rollouts/pkg/controller/rollout" "github.com/openkruise/rollouts/pkg/controller/rollouthistory" utilclient "github.com/openkruise/rollouts/pkg/util/client" @@ -116,6 +117,10 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "rollouthistory") os.Exit(1) } + if err = deployment.Add(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "advanceddeployment") + os.Exit(1) + } //+kubebuilder:scaffold:builder setupLog.Info("setup webhook") diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index a1bf24d5..53ec156a 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -22,14 +22,15 @@ import ( "encoding/json" "flag" "reflect" + "time" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" appslisters "k8s.io/client-go/listers/apps/v1" - corelisters "k8s.io/client-go/listers/core/v1" toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -51,9 +52,13 @@ import ( ) func init() { - flag.IntVar(&concurrentReconciles, "deployment-workers", concurrentReconciles, "Max concurrent workers for StatefulSet controller.") + flag.IntVar(&concurrentReconciles, "deployment-workers", concurrentReconciles, "Max concurrent workers for advanced deployment controller.") } +const ( + DefaultRetryDuration = 2 * time.Second +) + var ( concurrentReconciles = 3 ) @@ -75,10 +80,6 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { cacher := mgr.GetCache() - podInformer, err := cacher.GetInformerForKind(context.TODO(), v1.SchemeGroupVersion.WithKind("Pod")) - if err != nil { - return nil, err - } dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment")) if err != nil { return nil, err @@ -91,7 +92,6 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { // Lister dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer()) rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer()) - podLister := corelisters.NewPodLister(podInformer.(toolscache.SharedIndexInformer).GetIndexer()) // Client & Recorder genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller") @@ -107,7 +107,6 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { eventRecorder: recorder, dLister: dLister, rsLister: rsLister, - podLister: podLister, } return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil } @@ -179,8 +178,22 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req return reconcile.Result{}, nil } + errList := field.ErrorList{} err = dc.syncDeployment(context.Background(), deployment) - return ctrl.Result{}, err + if err != nil { + errList = append(errList, field.InternalError(field.NewPath("syncDeployment"), err)) + } + err = dc.patchExtraStatus(deployment) + if err != nil { + errList = append(errList, field.InternalError(field.NewPath("patchExtraStatus"), err)) + } + err = deploymentutil.DeploymentRolloutSatisfied(deployment, dc.strategy.Partition) + if err != nil { + klog.V(3).Infof("Deployment %v is still rolling: %v", klog.KObj(deployment), err) + return reconcile.Result{RequeueAfter: DefaultRetryDuration}, errList.ToAggregate() + } + + return ctrl.Result{}, errList.ToAggregate() } type controllerFactory DeploymentController @@ -201,7 +214,7 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy } // We do NOT process such deployment with canary rolling style - if strategy.RollingStyle == rolloutsv1alpha1.CanaryRollingStyleType { + if strategy.RollingStyle == rolloutsv1alpha1.CanaryRollingStyle { return nil } @@ -214,7 +227,6 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy eventRecorder: f.eventRecorder, dLister: f.dLister, rsLister: f.rsLister, - podLister: f.podLister, strategy: strategy, } } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index b075fd61..97242419 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1" - corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -66,8 +65,6 @@ type DeploymentController struct { dLister appslisters.DeploymentLister // rsLister can list/get replica sets from the shared informer's store rsLister appslisters.ReplicaSetLister - // podLister can list/get pods from the shared informer's store - podLister corelisters.PodLister // we will use this strategy to replace spec.strategy of deployment strategy rolloutsv1alpha1.DeploymentStrategy @@ -88,7 +85,7 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, // syncDeployment will sync the deployment with the given key. // This function is not meant to be invoked concurrently with the same key. -func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *apps.Deployment) (err error) { +func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *apps.Deployment) error { startTime := time.Now() klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KObj(deployment), "startTime", startTime) defer func() { @@ -106,64 +103,50 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * d.Status.ObservedGeneration = d.Generation dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } - return + return nil } // List ReplicaSets owned by this Deployment, while reconciling ControllerRef // through adoption/orphaning. rsList, err := dc.getReplicaSetsForDeployment(ctx, d) if err != nil { - return + return err } if d.DeletionTimestamp != nil { return dc.syncStatusOnly(ctx, d, rsList) } - defer func() { - err = dc.updateExtraStatus(deployment, rsList) - }() - - // Update deployment conditions with an Unknown condition when pausing/resuming - // a deployment. In this way, we can be sure that we won't timeout when a user - // resumes a Deployment with a set progressDeadlineSeconds. - if err = dc.checkPausedConditions(ctx, d); err != nil { - return - } - - if d.Spec.Paused { - err = dc.sync(ctx, d, rsList) - return + if dc.strategy.Paused { + return dc.sync(ctx, d, rsList) } scalingEvent, err := dc.isScalingEvent(ctx, d, rsList) if err != nil { - return + return err } if scalingEvent { - err = dc.sync(ctx, d, rsList) - return + return dc.sync(ctx, d, rsList) } - err = dc.rolloutRolling(ctx, d, rsList) - return + return dc.rolloutRolling(ctx, d, rsList) } -// updateExtraStatus will update extra status for advancedStatus -func (dc *DeploymentController) updateExtraStatus(deployment *apps.Deployment, rsList []*apps.ReplicaSet) error { - newRS, _, err := dc.getAllReplicaSetsAndSyncRevision(context.TODO(), deployment, rsList, false) +// patchExtraStatus will update extra status for advancedStatus +func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error { + rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), deployment) if err != nil { return err } updatedReadyReplicas := int32(0) + newRS := deploymentutil.FindNewReplicaSet(deployment, rsList) if newRS != nil { updatedReadyReplicas = newRS.Status.ReadyReplicas } extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{ - ObservedGeneration: deployment.Generation, UpdatedReadyReplicas: updatedReadyReplicas, ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment), } @@ -179,8 +162,11 @@ func (dc *DeploymentController) updateExtraStatus(deployment *apps.Deployment, r return nil // no need to update } - extraStatusAnno = strings.Replace(extraStatusAnno, `"`, `\"`, -1) - body := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, rolloutsv1alpha1.DeploymentExtraStatusAnnotation, extraStatusAnno)) - _, err = dc.client.AppsV1().Deployments(deployment.Namespace).Patch(context.TODO(), deployment.Name, types.MergePatchType, body, metav1.PatchOptions{}) + body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, + rolloutsv1alpha1.DeploymentExtraStatusAnnotation, + strings.Replace(extraStatusAnno, `"`, `\"`, -1)) + + _, err = dc.client.AppsV1().Deployments(deployment.Namespace). + Patch(context.TODO(), deployment.Name, types.MergePatchType, []byte(body), metav1.PatchOptions{}) return err } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go new file mode 100644 index 00000000..2a8b0319 --- /dev/null +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -0,0 +1,216 @@ +package deployment + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + + apps "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + appslisters "k8s.io/client-go/listers/apps/v1" + "k8s.io/client-go/tools/record" + "k8s.io/utils/pointer" + + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" + "github.com/openkruise/rollouts/pkg/controller/deployment/util" +) + +func TestSyncDeployment(t *testing.T) { + tests := map[string]struct { + oldRSsReplicas []int32 + newRSReplicas int32 + dReplicas int32 + oldRSsAvailable []int32 + newRSAvailable int32 + dAvailable int32 + partition intstrutil.IntOrString + maxSurge intstrutil.IntOrString + maxUnavailable intstrutil.IntOrString + expectOldReplicas int32 + expectNewReplicas int32 + }{ + "rolling new: surge new first, limited by maxSurge": { + []int32{6, 4}, 0, 10, + []int32{0, 0}, 0, 0, + intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("50%"), + 10, 2, + }, + "rolling new: surge new first, limited by partition": { + []int32{6, 4}, 0, 10, + []int32{0, 0}, 0, 0, + intstrutil.FromInt(4), intstrutil.FromInt(10), intstrutil.FromString("50%"), + 10, 4, + }, + "rolling old: limited by maxUnavailable": { + []int32{6, 4}, 2, 10, + []int32{6, 4}, 0, 10, + intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("20%"), + 8, 2, + }, + "rolling new: limited by partition": { + []int32{6, 2}, 2, 10, + []int32{6, 2}, 0, 8, + intstrutil.FromInt(3), intstrutil.FromInt(5), intstrutil.FromString("40%"), + 8, 3, + }, + "rolling new: scaling down old first, limited by partition": { + []int32{6, 4}, 0, 10, + []int32{6, 4}, 0, 10, + intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("40%"), + 7, 0, + }, + "rolling new: scaling down old first, limited by maxUnavailable": { + []int32{6, 4}, 0, 10, + []int32{6, 4}, 0, 10, + intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("20%"), + 8, 0, + }, + "no op: partition satisfied, maxSurge>0, new pod unavailable": { + []int32{3, 4}, 3, 10, + []int32{3, 4}, 0, 7, + intstrutil.FromInt(3), intstrutil.FromInt(2), intstrutil.FromString("30%"), + 7, 3, + }, + "no op: partition satisfied, maxSurge>0, new pod available": { + []int32{3, 4}, 3, 10, + []int32{3, 4}, 3, 10, + intstrutil.FromInt(3), intstrutil.FromInt(2), intstrutil.FromString("30%"), + 7, 3, + }, + "rolling old: scale down old to satisfied replicas": { + []int32{3}, 3, 5, + []int32{3}, 3, 6, + intstrutil.FromInt(3), intstrutil.FromInt(2), intstrutil.FromString("25%"), + 2, 3, + }, + "scale up: scale down old first": { + []int32{4}, 0, 5, + []int32{4}, 0, 4, + intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromInt(1), + 5, 0, + }, + "scale up": { + []int32{5}, 5, 20, + []int32{5}, 5, 10, + intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("30%"), + 10, 10, + }, + "scale down": { + []int32{12}, 8, 10, + []int32{12}, 8, 20, + intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("30%"), + 6, 4, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fakeRecord := record.NewFakeRecorder(10) + informers := informers.NewSharedInformerFactory(fakeClient, 0) + rsInformer := informers.Apps().V1().ReplicaSets().Informer() + dInformer := informers.Apps().V1().Deployments().Informer() + + var deployment apps.Deployment + var newRS apps.ReplicaSet + { + deployment = generateDeployment("busybox") + deployment.Spec.Replicas = pointer.Int32(test.dReplicas) + deployment.Status.ReadyReplicas = test.newRSReplicas + availableReplicas := test.newRSAvailable + for _, available := range test.oldRSsAvailable { + availableReplicas += available + } + deployment.Status.UpdatedReplicas = test.newRSReplicas + deployment.Status.Replicas = availableReplicas + deployment.Status.AvailableReplicas = availableReplicas + dInformer.GetIndexer().Add(&deployment) + _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + { + for index, replicas := range test.oldRSsReplicas { + rs := generateRS(deployment) + rs.SetName(fmt.Sprintf("rs-%d", index)) + rs.Spec.Replicas = pointer.Int32(replicas) + rs.Status.Replicas = replicas + if strings.HasPrefix(name, "scale") { + rs.Annotations = map[string]string{ + util.DesiredReplicasAnnotation: strconv.Itoa(-1), + util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)), + } + } + rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index) + rs.Status.ReadyReplicas = test.oldRSsAvailable[index] + rs.Status.AvailableReplicas = test.oldRSsAvailable[index] + rsInformer.GetIndexer().Add(&rs) + _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + } + { + newRS = generateRS(deployment) + newRS.SetName("rs-new") + newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas) + if strings.HasPrefix(name, "scale") { + newRS.Annotations = map[string]string{ + util.DesiredReplicasAnnotation: strconv.Itoa(-1), + util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)), + } + } + newRS.Status.Replicas = test.newRSReplicas + newRS.Status.ReadyReplicas = test.newRSAvailable + newRS.Status.AvailableReplicas = test.newRSAvailable + rsInformer.GetIndexer().Add(&newRS) + _, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + + dc := &DeploymentController{ + client: fakeClient, + eventRecorder: fakeRecord, + dLister: appslisters.NewDeploymentLister(dInformer.GetIndexer()), + rsLister: appslisters.NewReplicaSetLister(rsInformer.GetIndexer()), + strategy: rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxSurge: &test.maxSurge, + MaxUnavailable: &test.maxUnavailable, + }, + Partition: test.partition, + }, + } + + err := dc.syncDeployment(context.TODO(), &deployment) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + resultOld := int32(0) + resultNew := int32(0) + for _, rs := range rss.Items { + if rs.GetName() != "rs-new" { + resultOld += *rs.Spec.Replicas + } else { + resultNew = *rs.Spec.Replicas + } + } + if resultOld != test.expectOldReplicas || resultNew != test.expectNewReplicas { + t.Fatalf("expect new %d, but got new %d; expect old %d, but got old %d ", test.expectNewReplicas, resultNew, test.expectOldReplicas, resultOld) + } + }) + } +} diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index 6788f25c..a62e3807 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -35,7 +35,7 @@ import ( // for example a resync of the deployment after it was scaled up. In those cases, // we shouldn't try to estimate any progress. func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { - newStatus := calculateStatus(allRSs, newRS, d) + newStatus := calculateStatus(allRSs, newRS, d, &dc.strategy) // If there is no progressDeadlineSeconds set, remove any Progressing condition. if !util.HasProgressDeadline(d) { diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index 84f50f65..d6e16ac6 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -22,6 +22,7 @@ import ( "sort" apps "k8s.io/api/apps/v1" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" "k8s.io/utils/integer" @@ -56,12 +57,6 @@ func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Depl return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } - if deploymentutil.DeploymentComplete(d, &d.Status) { - if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { - return err - } - } - // Sync deployment status return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } @@ -76,7 +71,7 @@ func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allR scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment) return scaled, err } - newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS) + newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS, &dc.strategy) if err != nil { return false, err } @@ -93,7 +88,14 @@ func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, all allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs) klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas) - maxUnavailable := deploymentutil.MaxUnavailable(*deployment) + maxUnavailable := deploymentutil.MaxUnavailable(deployment, &dc.strategy) + + // Old RSes should obey the limitation of partition. + ScaleDownOldLimit := ScaleDownLimitForOld(oldRSs, newRS, deployment, dc.strategy.Partition) + if ScaleDownOldLimit <= 0 { + // Old replica sets do not satisfied as partition expectation, scale up. + return dc.scaleUpOldReplicaSets(ctx, oldRSs, -ScaleDownOldLimit, deployment) + } // Check if we can scale down. We can scale down in the following 2 cases: // * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further @@ -128,6 +130,8 @@ func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, all minAvailable := *(deployment.Spec.Replicas) - maxUnavailable newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount + // But, do not exceed the number of the desired partition. + maxScaledDown = integer.Int32Min(maxScaledDown, ScaleDownOldLimit) if maxScaledDown <= 0 { return false, nil } @@ -191,7 +195,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, ol // scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". // Need check maxUnavailable to ensure availability func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) { - maxUnavailable := deploymentutil.MaxUnavailable(*deployment) + maxUnavailable := deploymentutil.MaxUnavailable(deployment, &dc.strategy) // Check if we can scale down. minAvailable := *(deployment.Spec.Replicas) - maxUnavailable @@ -203,10 +207,15 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx cont } klog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name) - sort.Sort(deploymentutil.ReplicaSetsByCreationTimestamp(oldRSs)) + // We expected scaled down the middle revision firstly. + sort.Sort(deploymentutil.ReplicaSetsBySmallerRevision(oldRSs)) totalScaledDown := int32(0) totalScaleDownCount := availablePodCount - minAvailable + newRS := deploymentutil.FindNewReplicaSet(deployment, allRSs) + // Old RSes should obey the limitation of partition. + ScaleDownOldLimit := ScaleDownLimitForOld(oldRSs, newRS, deployment, dc.strategy.Partition) + totalScaleDownCount = integer.Int32Min(totalScaleDownCount, ScaleDownOldLimit) for _, targetRS := range oldRSs { if totalScaledDown >= totalScaleDownCount { // No further scaling required. @@ -232,3 +241,39 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx cont return totalScaledDown, nil } + +// scaleUpOldReplicaSets is different from native deployment: consider partition limitation. +func (dc *DeploymentController) scaleUpOldReplicaSets(ctx context.Context, oldRSs []*apps.ReplicaSet, scaledUpCount int32, deployment *apps.Deployment) (bool, error) { + if scaledUpCount <= 0 || len(oldRSs) == 0 { + return false, nil + } + // Scale up the biggest one or older. + sort.Sort(deploymentutil.ReplicaSetsBySizeOlder(oldRSs)) + newScale := (*oldRSs[0].Spec.Replicas) + scaledUpCount + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, oldRSs[0], newScale, deployment) + return scaled, err +} + +// ScaleDownLimitForOld return the limitation of old replica sets under the partition settings. +func ScaleDownLimitForOld(oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment, partition intstrutil.IntOrString) int32 { + newRSUpdateLimit := deploymentutil.NewRSReplicasLimit(partition, deployment) + // Expected replicas of the new replica set under the partition settings. + newRSDesiredCount := integer.Int32Max(newRSUpdateLimit, *newRS.Spec.Replicas) + // Expected total replicas for old replica sets. + oldRSDesiredCount := *(deployment.Spec.Replicas) - newRSDesiredCount + // Actual total replicas for old replica sets. + oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs) + // oldRSDesiredDiff is the gap between the reality and the desired. + scaleDownOldLimit := oldPodsCount - oldRSDesiredCount + + klog.V(4).InfoS("Calculate scale down limit for ", + "Deployment", klog.KObj(deployment), + // About the new replica set + "Replicas(New)", *(newRS.Spec.Replicas), "DesiredReplicas(New)", newRSDesiredCount, + // About the old replica sets + "ReplicaS(Old)", oldPodsCount, "DesiredReplicas(Old)", oldRSDesiredCount, "ScaleDownLimit(Old)", scaleDownOldLimit, + // About the deployment + "Replicas(Deployment)", *(deployment.Spec.Replicas), "Partition(Deployment)", newRSUpdateLimit) + + return scaleDownOldLimit +} diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go new file mode 100644 index 00000000..621fb007 --- /dev/null +++ b/pkg/controller/deployment/rolling_test.go @@ -0,0 +1,449 @@ +package deployment + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "testing" + + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" + "k8s.io/utils/pointer" + + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" +) + +func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference { + isController := true + return &metav1.OwnerReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: d.GetName(), + UID: d.GetUID(), + Controller: &isController, + } +} + +// generateRS creates a replica set, with the input deployment's template as its template +func generateRS(deployment apps.Deployment) apps.ReplicaSet { + template := deployment.Spec.Template.DeepCopy() + return apps.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + UID: randomUID(), + Name: randomName("replicaset"), + Labels: template.Labels, + OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)}, + }, + Spec: apps.ReplicaSetSpec{ + Replicas: new(int32), + Template: *template, + Selector: &metav1.LabelSelector{MatchLabels: template.Labels}, + }, + } +} + +func randomUID() types.UID { + return types.UID(strconv.FormatInt(rand.Int63(), 10)) +} + +func randomName(prefix string) string { + return fmt.Sprintf("%s-%s", prefix, strconv.FormatInt(5, 10)) +} + +// generateDeployment creates a deployment, with the input image as its template +func generateDeployment(image string) apps.Deployment { + podLabels := map[string]string{"name": image} + terminationSec := int64(30) + enableServiceLinks := v1.DefaultEnableServiceLinks + return apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: image, + Annotations: make(map[string]string), + }, + Spec: apps.DeploymentSpec{ + Replicas: func(i int32) *int32 { return &i }(1), + Selector: &metav1.LabelSelector{MatchLabels: podLabels}, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: podLabels, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: image, + Image: image, + ImagePullPolicy: v1.PullAlways, + TerminationMessagePath: v1.TerminationMessagePathDefault, + }, + }, + DNSPolicy: v1.DNSClusterFirst, + TerminationGracePeriodSeconds: &terminationSec, + RestartPolicy: v1.RestartPolicyAlways, + SecurityContext: &v1.PodSecurityContext{}, + EnableServiceLinks: &enableServiceLinks, + }, + }, + }, + } +} + +func TestScaleDownLimitForOld(t *testing.T) { + tDeployment := apps.Deployment{Spec: apps.DeploymentSpec{Replicas: pointer.Int32(10)}} + tReplicaSet := &apps.ReplicaSet{Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(5)}} + tOldRSes := func() []*apps.ReplicaSet { + return []*apps.ReplicaSet{ + {Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(2)}}, + {Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(2)}}, + {Spec: apps.ReplicaSetSpec{Replicas: pointer.Int32(2)}}, + } + } + tests := map[string]struct { + deployment func() *apps.Deployment + oldRSes func() []*apps.ReplicaSet + newRS func() *apps.ReplicaSet + partition intstrutil.IntOrString + expect int32 + }{ + "ScaleDownLimit > 0": { + deployment: func() *apps.Deployment { + return tDeployment.DeepCopy() + }, + oldRSes: func() []*apps.ReplicaSet { + return tOldRSes() + }, + newRS: func() *apps.ReplicaSet { + return tReplicaSet.DeepCopy() + }, + partition: intstrutil.FromInt(5), + expect: 1, + }, + "ScaleDownLimit = 0": { + deployment: func() *apps.Deployment { + return tDeployment.DeepCopy() + }, + oldRSes: func() []*apps.ReplicaSet { + return tOldRSes() + }, + newRS: func() *apps.ReplicaSet { + newRS := tReplicaSet.DeepCopy() + newRS.Spec.Replicas = pointer.Int32(4) + return newRS + }, + partition: intstrutil.FromInt(4), + expect: 0, + }, + "ScaleDownLimit < 0": { + deployment: func() *apps.Deployment { + return tDeployment.DeepCopy() + }, + oldRSes: func() []*apps.ReplicaSet { + return tOldRSes() + }, + newRS: func() *apps.ReplicaSet { + newRS := tReplicaSet.DeepCopy() + newRS.Spec.Replicas = pointer.Int32(2) + return newRS + }, + partition: intstrutil.FromInt(2), + expect: -2, + }, + "newRS replicas > partition": { + deployment: func() *apps.Deployment { + return tDeployment.DeepCopy() + }, + oldRSes: func() []*apps.ReplicaSet { + return tOldRSes() + }, + newRS: func() *apps.ReplicaSet { + return tReplicaSet.DeepCopy() + }, + partition: intstrutil.FromInt(2), + expect: 1, + }, + "newRS replicas < partition": { + deployment: func() *apps.Deployment { + return tDeployment.DeepCopy() + }, + oldRSes: func() []*apps.ReplicaSet { + return tOldRSes() + }, + newRS: func() *apps.ReplicaSet { + newRS := tReplicaSet.DeepCopy() + newRS.Spec.Replicas = pointer.Int32(2) + return newRS + }, + partition: intstrutil.FromInt(5), + expect: 1, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + result := ScaleDownLimitForOld(test.oldRSes(), test.newRS(), test.deployment(), test.partition) + if result != test.expect { + t.Fatalf("expect %d, but got %d", test.expect, result) + } + }) + } +} + +func TestReconcileNewReplicaSet(t *testing.T) { + tests := map[string]struct { + oldRSs []int32 + newRS int32 + deployment int32 + partition intstrutil.IntOrString + maxSurge intstrutil.IntOrString + expect int32 + }{ + "limited by partition": { + []int32{2, 3}, 3, 10, + intstrutil.FromInt(4), intstrutil.FromInt(0), 4, + }, + "limited by deployment replicas": { + []int32{2, 3}, 3, 10, + intstrutil.FromInt(10), intstrutil.FromInt(0), 5, + }, + "surge first": { + []int32{10}, 0, 10, + intstrutil.FromInt(3), intstrutil.FromInt(2), 2, + }, + "surge first, but limited by partition": { + []int32{10}, 0, 10, + intstrutil.FromInt(2), intstrutil.FromInt(3), 2, + }, + "partition satisfied, no scale": { + []int32{7}, 3, 10, + intstrutil.FromInt(3), intstrutil.FromInt(3), 3, + }, + "partition satisfied, no scale even though deployment replicas not reach": { + []int32{5}, 3, 10, + intstrutil.FromInt(3), intstrutil.FromInt(3), 3, + }, + "new replica set has been greater than partition, no scale down": { + []int32{7}, 3, 10, + intstrutil.FromInt(1), intstrutil.FromInt(3), 3, + }, + "total replicas are more than deployment desired, no scale down": { + []int32{7}, 3, 10, + intstrutil.FromInt(6), intstrutil.FromInt(0), 3, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fakeRecord := record.NewFakeRecorder(10) + dc := &DeploymentController{ + client: fakeClient, + eventRecorder: fakeRecord, + strategy: rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxSurge: &test.maxSurge, + }, + Partition: test.partition, + }, + } + + var deployment apps.Deployment + var newRS apps.ReplicaSet + var allRSs []*apps.ReplicaSet + { + deployment = generateDeployment("busybox") + deployment.Spec.Replicas = pointer.Int32(test.deployment) + _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + { + for index, replicas := range test.oldRSs { + rs := generateRS(deployment) + rs.SetName(fmt.Sprintf("rs-%d", index)) + rs.Spec.Replicas = pointer.Int32(replicas) + allRSs = append(allRSs, &rs) + _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + } + { + newRS = generateRS(deployment) + newRS.SetName("rs-new") + newRS.Spec.Replicas = pointer.Int32(test.newRS) + allRSs = append(allRSs, &newRS) + _, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + _, err := dc.reconcileNewReplicaSet(context.TODO(), allRSs, &newRS, &deployment) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + result, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).Get(context.TODO(), newRS.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + if *result.Spec.Replicas != test.expect { + t.Fatalf("expect %d, but got %d", test.expect, *result.Spec.Replicas) + } + }) + } +} + +func TestReconcileOldReplicaSet(t *testing.T) { + tests := map[string]struct { + oldRSsReplicas []int32 + newRSReplicas int32 + dReplicas int32 + oldRSsAvailable []int32 + newRSAvailable int32 + dAvailable int32 + partition intstrutil.IntOrString + maxSurge intstrutil.IntOrString + maxUnavailable intstrutil.IntOrString + expectOldReplicas int32 + }{ + "scale down: all unavailable, limited by partition": { + []int32{6, 4}, 0, 10, + []int32{0, 0}, 0, 0, + intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("50%"), + 6, + }, + "scale down: all available, limited by maxUnavailable": { + []int32{6, 4}, 0, 10, + []int32{6, 4}, 0, 10, + intstrutil.FromInt(4), intstrutil.FromInt(2), intstrutil.FromString("20%"), + 8, + }, + "scale down: scale unavailable first, then scale available, limited by maxUnavailable": { + []int32{6, 4}, 0, 10, + []int32{6, 2}, 0, 8, + intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("40%"), + 6, + }, + "scale down: scale unavailable first, then scale available, limited by partition": { + []int32{6, 4}, 0, 10, + []int32{6, 2}, 0, 8, + intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("40%"), + 7, + }, + "no op: newRS replicas more than partition, limited by newRS replicas": { + []int32{0, 5}, 5, 10, + []int32{0, 5}, 5, 10, + intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("40%"), + 5, + }, + "no op: limited by unavailable newRS": { + []int32{3, 4}, 3, 10, + []int32{3, 4}, 0, 7, + intstrutil.FromInt(5), intstrutil.FromInt(0), intstrutil.FromString("30%"), + 7, + }, + "scale up oldRS": { + []int32{3, 0}, 3, 10, + []int32{3, 0}, 3, 6, + intstrutil.FromInt(3), intstrutil.FromInt(0), intstrutil.FromString("30%"), + 7, + }, + "scale down oldRS": { + []int32{3}, 3, 5, + []int32{3}, 3, 6, + intstrutil.FromString("60%"), intstrutil.FromString("25%"), intstrutil.FromString("25%"), + 2, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fakeRecord := record.NewFakeRecorder(10) + dc := &DeploymentController{ + client: fakeClient, + eventRecorder: fakeRecord, + strategy: rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxSurge: &test.maxSurge, + MaxUnavailable: &test.maxUnavailable, + }, + Partition: test.partition, + }, + } + + var deployment apps.Deployment + var newRS apps.ReplicaSet + var allRSs []*apps.ReplicaSet + var oldRSs []*apps.ReplicaSet + { + deployment = generateDeployment("busybox:latest") + deployment.Spec.Replicas = pointer.Int32(test.dReplicas) + deployment.Status.ReadyReplicas = test.newRSReplicas + availableReplicas := test.newRSAvailable + for _, available := range test.oldRSsAvailable { + availableReplicas += available + } + deployment.Status.UpdatedReplicas = test.newRSReplicas + deployment.Status.Replicas = availableReplicas + deployment.Status.AvailableReplicas = availableReplicas + _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + { + for index, replicas := range test.oldRSsReplicas { + rs := generateRS(deployment) + rs.SetName(fmt.Sprintf("rs-%d", index)) + rs.Spec.Replicas = pointer.Int32(replicas) + rs.Status.Replicas = replicas + rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index) + rs.Status.ReadyReplicas = test.oldRSsAvailable[index] + rs.Status.AvailableReplicas = test.oldRSsAvailable[index] + allRSs = append(allRSs, &rs) + oldRSs = append(oldRSs, &rs) + _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + } + { + newRS = generateRS(deployment) + newRS.SetName("rs-new") + newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas) + newRS.Status.Replicas = test.newRSReplicas + newRS.Status.ReadyReplicas = test.newRSAvailable + newRS.Status.AvailableReplicas = test.newRSAvailable + allRSs = append(allRSs, &newRS) + _, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + _, err := dc.reconcileOldReplicaSets(context.TODO(), allRSs, oldRSs, &newRS, &deployment) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + result := int32(0) + for _, rs := range rss.Items { + if rs.GetName() != "rs-new" { + result += *rs.Spec.Replicas + } + } + if result != test.expectOldReplicas { + t.Fatalf("expect %d, but got %d", test.expectOldReplicas, result) + } + }) + } +} diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 9b8ae6ca..314b9af4 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -28,7 +28,9 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" + "k8s.io/utils/integer" + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" "github.com/openkruise/rollouts/pkg/util" labelsutil "github.com/openkruise/rollouts/pkg/util/labels" @@ -62,40 +64,6 @@ func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rs return dc.syncDeploymentStatus(ctx, allRSs, newRS, d) } -// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition. -// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments -// that were paused for longer than progressDeadlineSeconds. -func (dc *DeploymentController) checkPausedConditions(ctx context.Context, d *apps.Deployment) error { - if !deploymentutil.HasProgressDeadline(d) { - return nil - } - cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing) - if cond != nil && cond.Reason == deploymentutil.TimedOutReason { - // If we have reported lack of progress, do not overwrite it with a paused condition. - return nil - } - pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason - - needsUpdate := false - if d.Spec.Paused && !pausedCondExists { - condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused") - deploymentutil.SetDeploymentCondition(&d.Status, *condition) - needsUpdate = true - } else if !d.Spec.Paused && pausedCondExists { - condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed") - deploymentutil.SetDeploymentCondition(&d.Status, *condition) - needsUpdate = true - } - - if !needsUpdate { - return nil - } - - var err error - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) - return err -} - // getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated. // // rsList should come from getReplicaSetsForDeployment(d). @@ -145,7 +113,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De rsCopy := existingNewRS.DeepCopy() // Set existing new replica set's annotation - annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true, maxRevHistoryLengthInChars) + annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds @@ -202,14 +170,19 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De }, } allRSs := append(oldRSs, &newRS) - newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS) + newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS, &dc.strategy) if err != nil { return nil, err } - *(newRS.Spec.Replicas) = newReplicasCount + // We ensure that newReplicasLowerBound is greater than 0 unless deployment is 0, + // this is because if we set new replicas as 0, the native deployment controller + // will flight with ours. + newReplicasLowerBound := deploymentutil.NewRSReplicasLowerBound(d, &dc.strategy) + + *(newRS.Spec.Replicas) = integer.Int32Max(newReplicasCount, newReplicasLowerBound) // Set new replica set's annotation - deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false, maxRevHistoryLengthInChars) + deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, &dc.strategy, newRevision, false, maxRevHistoryLengthInChars) // Create the new ReplicaSet. If it already exists, then we need to check for possible // hash collisions. If there is any other error, we need to report it in the status of // the Deployment. @@ -320,7 +293,7 @@ func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Depl allowedSize := int32(0) if *(deployment.Spec.Replicas) > 0 { - allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment) + allowedSize = *(deployment.Spec.Replicas) } // Number of additional replicas that can be either added or removed from the total @@ -355,7 +328,7 @@ func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Depl // Estimate proportions if we have replicas to add, otherwise simply populate // nameToSize with the current sizes for each replica set. if deploymentReplicasToAdd != 0 { - proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) + proportion := deploymentutil.GetProportion(rs, *deployment, &dc.strategy, deploymentReplicasToAdd, deploymentReplicasAdded) nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion deploymentReplicasAdded += proportion @@ -406,7 +379,7 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale - annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment)) + annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy)) scaled := false var err error @@ -414,7 +387,7 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re oldScale := *(rs.Spec.Replicas) rsCopy := rs.DeepCopy() *(rsCopy.Spec.Replicas) = newScale - deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment)) + deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy)) rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) if err == nil && sizeNeedsUpdate { scaled = true @@ -465,7 +438,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs [] // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { - newStatus := calculateStatus(allRSs, newRS, d) + newStatus := calculateStatus(allRSs, newRS, d, &dc.strategy) if reflect.DeepEqual(d.Status, newStatus) { return nil @@ -478,7 +451,7 @@ func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs } // calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets. -func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus { +func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) apps.DeploymentStatus { availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs) totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) unavailableReplicas := totalReplicas - availableReplicas @@ -505,7 +478,7 @@ func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployme status.Conditions = append(status.Conditions, conditions[i]) } - if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) { + if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(deployment, strategy) { minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.") deploymentutil.SetDeploymentCondition(&status, *minAvailability) } else { diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index 693c30b2..0b904d52 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -35,6 +35,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/integer" + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/util" ) @@ -200,7 +201,7 @@ func Revision(obj runtime.Object) (int64, error) { // SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and // copying required deployment annotations to it; it returns true if replica set's annotation is changed. -func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.ReplicaSet, newRevision string, exists bool, revHistoryLimitInChars int) bool { +func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.ReplicaSet, strategy *rolloutsv1alpha1.DeploymentStrategy, newRevision string, exists bool, revHistoryLimitInChars int) bool { // First, copy deployment's annotations (except for apply and revision annotations) annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) // Then, update replica set's revision annotation @@ -256,7 +257,7 @@ func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.Replic } } // If the new replica set is about to be created, we need to add replica annotations to it. - if !exists && SetReplicasAnnotations(newRS, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+MaxSurge(*deployment)) { + if !exists && SetReplicasAnnotations(newRS, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+MaxSurge(deployment, strategy)) { annotationChanged = true } return annotationChanged @@ -382,12 +383,12 @@ func ReplicasAnnotationsNeedUpdate(rs *apps.ReplicaSet, desiredReplicas, maxRepl } // MaxUnavailable returns the maximum unavailable pods a rolling deployment can take. -func MaxUnavailable(deployment apps.Deployment) int32 { - if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 { +func MaxUnavailable(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 { + if strategy == nil || strategy.RollingUpdate == nil || *(deployment.Spec.Replicas) == 0 { return int32(0) } // Error caught by validation - _, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) + _, maxUnavailable, _ := ResolveFenceposts(strategy.RollingUpdate.MaxSurge, strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) if maxUnavailable > *deployment.Spec.Replicas { return *deployment.Spec.Replicas } @@ -395,32 +396,32 @@ func MaxUnavailable(deployment apps.Deployment) int32 { } // MinAvailable returns the minimum available pods of a given deployment -func MinAvailable(deployment *apps.Deployment) int32 { - if !IsRollingUpdate(deployment) { +func MinAvailable(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 { + if strategy == nil || strategy.RollingUpdate == nil { return int32(0) } - return *(deployment.Spec.Replicas) - MaxUnavailable(*deployment) + return *(deployment.Spec.Replicas) - MaxUnavailable(deployment, strategy) } // MaxSurge returns the maximum surge pods a rolling deployment can take. -func MaxSurge(deployment apps.Deployment) int32 { - if !IsRollingUpdate(&deployment) { +func MaxSurge(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 { + if strategy == nil || strategy.RollingUpdate == nil { return int32(0) } // Error caught by validation - maxSurge, _, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) + maxSurge, _, _ := ResolveFenceposts(strategy.RollingUpdate.MaxSurge, strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) return maxSurge } // GetProportion will estimate the proportion for the provided replica set using 1. the current size // of the parent deployment, 2. the replica count that needs be added on the replica sets of the // deployment, and 3. the total replicas added in the replica sets of the deployment so far. -func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { +func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { if rs == nil || *(rs.Spec.Replicas) == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded { return int32(0) } - rsFraction := getReplicaSetFraction(*rs, d) + rsFraction := getReplicaSetFraction(*rs, d, strategy) allowed := deploymentReplicasToAdd - deploymentReplicasAdded if deploymentReplicasToAdd > 0 { @@ -437,13 +438,13 @@ func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToA // getReplicaSetFraction estimates the fraction of replicas a replica set can have in // 1. a scaling event during a rollout or 2. when scaling a paused deployment. -func getReplicaSetFraction(rs apps.ReplicaSet, d apps.Deployment) int32 { +func getReplicaSetFraction(rs apps.ReplicaSet, d apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 { // If we are scaling down to zero then the fraction of this replica set is its whole size (negative) if *(d.Spec.Replicas) == int32(0) { return -*(rs.Spec.Replicas) } - deploymentReplicas := *(d.Spec.Replicas) + MaxSurge(d) + deploymentReplicas := *(d.Spec.Replicas) + MaxSurge(&d, strategy) annotatedReplicas, ok := getMaxReplicasAnnotation(&rs) if !ok { // If we cannot find the annotation then fallback to the current deployment size. Note that this @@ -622,8 +623,8 @@ func GetAvailableReplicaCountForReplicaSets(replicaSets []*apps.ReplicaSet) int3 } // IsRollingUpdate returns true if the strategy type is a rolling update. -func IsRollingUpdate(deployment *apps.Deployment) bool { - return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType +func IsRollingUpdate(_ *apps.Deployment) bool { + return true } // DeploymentComplete considers a deployment to be complete once all of its desired replicas @@ -704,17 +705,19 @@ func DeploymentTimedOut(deployment *apps.Deployment, newStatus *apps.DeploymentS // When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it. // 1) The new RS is saturated: newRS's replicas == deployment's replicas // 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas -func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) { - switch deployment.Spec.Strategy.Type { - case apps.RollingUpdateDeploymentStrategyType: - // Check if we can scale up. - maxSurge, err := intstrutil.GetScaledValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true) - if err != nil { - return 0, err +func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, strategy *rolloutsv1alpha1.DeploymentStrategy) (int32, error) { + // Find the total number of pods + currentPodCount := GetReplicaCountForReplicaSets(allRSs) + switch { + case currentPodCount > *newRS.Spec.Replicas: + // Do not scale down due to partition settings. + scaleUpLimit := NewRSReplicasLimit(strategy.Partition, deployment) + if *newRS.Spec.Replicas >= scaleUpLimit { + // Cannot scale up. + return *(newRS.Spec.Replicas), nil } - // Find the total number of pods - currentPodCount := GetReplicaCountForReplicaSets(allRSs) - maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge) + // Do not scale up due to exceeded current replicas. + maxTotalPods := *(deployment.Spec.Replicas) + MaxSurge(deployment, strategy) if currentPodCount >= maxTotalPods { // Cannot scale up. return *(newRS.Spec.Replicas), nil @@ -723,11 +726,11 @@ func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, ne scaleUpCount := maxTotalPods - currentPodCount // Do not exceed the number of desired replicas. scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas)))) - return *(newRS.Spec.Replicas) + scaleUpCount, nil - case apps.RecreateDeploymentStrategyType: - return *(deployment.Spec.Replicas), nil + // Do not exceed the number of partition replicas. + return integer.Int32Min(*(newRS.Spec.Replicas)+scaleUpCount, scaleUpLimit), nil default: - return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type) + // If there is ONLY ONE active replica set, just be in line with deployment replicas. + return *(deployment.Spec.Replicas), nil } } @@ -813,6 +816,21 @@ func (o ReplicaSetsByRevision) Less(i, j int) bool { -------------------------------- BEGIN -------------------------------------- */ +// ReplicaSetsBySmallerRevision sorts a list of ReplicaSet by revision in desc, using their creation timestamp or name as a tie breaker. +// By using the creation timestamp, this sorts from old to new replica sets. +type ReplicaSetsBySmallerRevision []*apps.ReplicaSet + +func (o ReplicaSetsBySmallerRevision) Len() int { return len(o) } +func (o ReplicaSetsBySmallerRevision) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o ReplicaSetsBySmallerRevision) Less(i, j int) bool { + revision1, err1 := Revision(o[i]) + revision2, err2 := Revision(o[j]) + if err1 != nil || err2 != nil || revision1 == revision2 { + return ReplicaSetsByCreationTimestamp(o).Less(i, j) + } + return revision1 > revision2 +} + // FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods. func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet { activeFilter := func(rs *apps.ReplicaSet) bool { @@ -918,3 +936,32 @@ func NewRSReplicasLimit(partition intstrutil.IntOrString, deployment *apps.Deplo } return int32(replicaLimit) } + +// DeploymentRolloutSatisfied return nil if deployment has satisfied partition and replicas, +// or will return an error. +func DeploymentRolloutSatisfied(deployment *apps.Deployment, partition intstrutil.IntOrString) error { + if deployment.Status.ObservedGeneration < deployment.Generation { + return fmt.Errorf("deployment %v observed generation %d less than generation %d", + klog.KObj(deployment), deployment.Status.ObservedGeneration, deployment.Generation) + } + if deployment.Status.Replicas != *(deployment.Spec.Replicas) { + return fmt.Errorf("deployment %v status replicas %d not equals to replicas %d", + klog.KObj(deployment), deployment.Status.Replicas, *deployment.Spec.Replicas) + } + newRSReplicasLimit := NewRSReplicasLimit(partition, deployment) + if deployment.Status.UpdatedReplicas < newRSReplicasLimit { + return fmt.Errorf("deployment %v updated replicas %d less than partition %d", + klog.KObj(deployment), deployment.Status.UpdatedReplicas, newRSReplicasLimit) + } + return nil +} + +// NewRSReplicasLowerBound ensure that newReplicasLowerBound is greater than 0 when create newRS +// unless deployment is 0 or MaxSurge > 0, this is because if we set new replicas as 0, the native +// deployment controller will flight with ours. +func NewRSReplicasLowerBound(deployment *apps.Deployment, strategy *rolloutsv1alpha1.DeploymentStrategy) int32 { + if MaxSurge(deployment, strategy) > 0 { + return int32(0) + } + return integer.Int32Min(int32(1), *deployment.Spec.Replicas) +} diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go index 16bc1ed7..ca17d11f 100644 --- a/pkg/controller/deployment/util/deployment_util_test.go +++ b/pkg/controller/deployment/util/deployment_util_test.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/integer" "k8s.io/utils/pointer" + + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" ) func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference { @@ -481,9 +483,19 @@ func newString(s string) *string { } func TestNewRSNewReplicas(t *testing.T) { + newDeployment := generateDeployment("nginx") + newRC := generateRS(newDeployment) + rss := make([]*apps.ReplicaSet, 10) + for i := range rss { + rs := generateRS(newDeployment) + rs.Spec.Replicas = pointer.Int32(int32(i)) + rss[i] = &rs + } + tests := []struct { Name string - strategyType apps.DeploymentStrategyType + oldRSs []*apps.ReplicaSet + partition intstr.IntOrString depReplicas int32 newRSReplicas int32 maxSurge int @@ -491,41 +503,61 @@ func TestNewRSNewReplicas(t *testing.T) { }{ { "can not scale up - to newRSReplicas", - apps.RollingUpdateDeploymentStrategyType, + []*apps.ReplicaSet{rss[5]}, + intstr.FromInt(1), 1, 5, 1, 5, }, { "scale up - to depReplicas", - apps.RollingUpdateDeploymentStrategyType, + []*apps.ReplicaSet{rss[0]}, + intstr.FromString("100%"), 6, 2, 10, 6, }, { - "recreate - to depReplicas", - apps.RecreateDeploymentStrategyType, - 3, 1, 1, 3, + "scale up - to int-type partition", + []*apps.ReplicaSet{rss[8]}, + intstr.FromInt(4), + 10, 2, 10, 4, + }, + { + "scala up without old - to depReplicas", + []*apps.ReplicaSet{}, + intstr.FromInt(2), + 10, 2, 10, 10, + }, + { + "cannot scale due to partition - to newRSReplica", + []*apps.ReplicaSet{rss[8]}, + intstr.FromInt(2), + 10, 2, 10, 2, + }, + { + "cannot scale because new replicas grater than partition - to newRSReplica", + []*apps.ReplicaSet{rss[5]}, + intstr.FromInt(2), + 10, 5, 10, 5, }, } - newDeployment := generateDeployment("nginx") - newRC := generateRS(newDeployment) - rs5 := generateRS(newDeployment) - *(rs5.Spec.Replicas) = 5 for _, test := range tests { t.Run(test.Name, func(t *testing.T) { *(newDeployment.Spec.Replicas) = test.depReplicas - newDeployment.Spec.Strategy = apps.DeploymentStrategy{Type: test.strategyType} - newDeployment.Spec.Strategy.RollingUpdate = &apps.RollingUpdateDeployment{ - MaxUnavailable: func(i int) *intstr.IntOrString { - x := intstr.FromInt(i) - return &x - }(1), - MaxSurge: func(i int) *intstr.IntOrString { - x := intstr.FromInt(i) - return &x - }(test.maxSurge), + strategy := &rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxUnavailable: func(i int) *intstr.IntOrString { + x := intstr.FromInt(i) + return &x + }(1), + MaxSurge: func(i int) *intstr.IntOrString { + x := intstr.FromInt(i) + return &x + }(test.maxSurge), + }, + Partition: test.partition, } *(newRC.Spec.Replicas) = test.newRSReplicas - rs, err := NewRSNewReplicas(&newDeployment, []*apps.ReplicaSet{&rs5}, &newRC) + allRSs := append(test.oldRSs, &newRC) + rs, err := NewRSNewReplicas(&newDeployment, allRSs, &newRC, strategy) if err != nil { t.Errorf("In test case %s, got unexpected error %v", test.Name, err) } @@ -1023,7 +1055,10 @@ func TestMaxUnavailable(t *testing.T) { for _, test := range tests { t.Log(test.name) t.Run(test.name, func(t *testing.T) { - maxUnavailable := MaxUnavailable(test.deployment) + strategy := rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: test.deployment.Spec.Strategy.RollingUpdate, + } + maxUnavailable := MaxUnavailable(&test.deployment, &strategy) if test.expected != maxUnavailable { t.Fatalf("expected:%v, got:%v", test.expected, maxUnavailable) } @@ -1045,7 +1080,10 @@ func TestAnnotationUtils(t *testing.T) { for i := 10; i < 20; i++ { nextRevision := fmt.Sprintf("%d", i+1) - SetNewReplicaSetAnnotations(&tDeployment, &tRS, nextRevision, true, 5) + strategy := rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: tDeployment.Spec.Strategy.RollingUpdate, + } + SetNewReplicaSetAnnotations(&tDeployment, &tRS, &strategy, nextRevision, true, 5) //Now the ReplicaSets Revision Annotation should be i+1 if i >= 12 { @@ -1196,3 +1234,71 @@ func TestNewRSReplicasLimit(t *testing.T) { } } } + +func TestDeploymentRolloutSatisfied(t *testing.T) { + tPartition := intstr.FromInt(3) + tDeployment := apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{Generation: 2}, + Spec: apps.DeploymentSpec{Replicas: pointer.Int32(10)}, + Status: apps.DeploymentStatus{Replicas: 10, UpdatedReplicas: 3, ObservedGeneration: 2}, + } + tests := map[string]struct { + deployment func() *apps.Deployment + partition intstr.IntOrString + expect bool + }{ + "generation unsatisfied": { + deployment: func() *apps.Deployment { + d := tDeployment.DeepCopy() + d.Status.ObservedGeneration = 1 + return d + }, + partition: tPartition, + expect: false, + }, + "status.replicas greater than replicas": { + deployment: func() *apps.Deployment { + d := tDeployment.DeepCopy() + d.Status.Replicas = 11 + return d + }, + partition: tPartition, + expect: false, + }, + "status.replicas less than replicas": { + deployment: func() *apps.Deployment { + d := tDeployment.DeepCopy() + d.Status.Replicas = 9 + return d + }, + partition: tPartition, + expect: false, + }, + "partition greater than new replica set": { + deployment: func() *apps.Deployment { + d := tDeployment.DeepCopy() + d.Status.UpdatedReplicas = 2 + return d + }, + partition: tPartition, + expect: false, + }, + "partition less than new replicas set": { + deployment: func() *apps.Deployment { + d := tDeployment.DeepCopy() + d.Status.UpdatedReplicas = 5 + return d + }, + partition: tPartition, + expect: true, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + err := DeploymentRolloutSatisfied(test.deployment(), test.partition) + if (test.expect && err != nil) || (!test.expect && err == nil) { + t.Fatalf("expect error %v, but got %v", test.expect, err) + } + }) + } +} diff --git a/test/e2e/deployment_test.go b/test/e2e/deployment_test.go new file mode 100644 index 00000000..a6a12cda --- /dev/null +++ b/test/e2e/deployment_test.go @@ -0,0 +1,339 @@ +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + netv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + + rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" + "github.com/openkruise/rollouts/pkg/util" +) + +var _ = SIGDescribe("Advanced Deployment", func() { + var namespace string + defaultRetry := wait.Backoff{ + Steps: 10, + Duration: 10 * time.Millisecond, + Factor: 1.0, + Jitter: 0.1, + } + + CreateObject := func(object client.Object, options ...client.CreateOption) { + By(fmt.Sprintf("create deployment %v", client.ObjectKeyFromObject(object))) + object.SetNamespace(namespace) + Expect(k8sClient.Create(context.TODO(), object)).NotTo(HaveOccurred()) + } + + GetObject := func(namespace, name string, object client.Object) error { + key := types.NamespacedName{Namespace: namespace, Name: name} + return k8sClient.Get(context.TODO(), key, object) + } + + UpdateDeployment := func(deployment *apps.Deployment, version string) *apps.Deployment { + By(fmt.Sprintf("update deployment %v to version: %v", client.ObjectKeyFromObject(deployment), version)) + var clone *apps.Deployment + Expect(retry.RetryOnConflict(defaultRetry, func() error { + clone = &apps.Deployment{} + err := GetObject(deployment.Namespace, deployment.Name, clone) + if err != nil { + return err + } + clone.Spec.Template.Spec.Containers[0].Image = deployment.Spec.Template.Spec.Containers[0].Image + clone.Spec.Template.Spec.Containers[0].Env[0].Value = version + strategy := unmarshal(clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation]) + strategy.Paused = true + clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] = marshal(strategy) + return k8sClient.Update(context.TODO(), clone) + })).NotTo(HaveOccurred()) + + Eventually(func() bool { + clone = &apps.Deployment{} + err := GetObject(deployment.Namespace, deployment.Name, clone) + Expect(err).NotTo(HaveOccurred()) + By(fmt.Sprintf("image: %s, version env: %s", clone.Spec.Template.Spec.Containers[0].Image, clone.Spec.Template.Spec.Containers[0].Env[0].Value)) + return clone.Status.ObservedGeneration >= clone.Generation + }, time.Minute, time.Second).Should(BeTrue()) + return clone + } + + UpdatePartitionWithoutCheck := func(deployment *apps.Deployment, desired intstr.IntOrString) *apps.Deployment { + By(fmt.Sprintf("update deployment %v to desired: %v", client.ObjectKeyFromObject(deployment), desired)) + var clone *apps.Deployment + Expect(retry.RetryOnConflict(defaultRetry, func() error { + clone = &apps.Deployment{} + err := GetObject(deployment.Namespace, deployment.Name, clone) + if err != nil { + return err + } + strategy := unmarshal(clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation]) + if reflect.DeepEqual(desired, strategy.Partition) { + return nil + } + strategy.Paused = false + strategy.Partition = desired + clone.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] = marshal(strategy) + return k8sClient.Update(context.TODO(), clone) + })).NotTo(HaveOccurred()) + return clone + } + + ListPods := func(namespace string, labelSelector *metav1.LabelSelector) ([]*v1.Pod, error) { + appList := &v1.PodList{} + selector, _ := metav1.LabelSelectorAsSelector(labelSelector) + err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}) + if err != nil { + return nil, err + } + apps := make([]*v1.Pod, 0) + for i := range appList.Items { + pod := &appList.Items[i] + if pod.DeletionTimestamp.IsZero() { + apps = append(apps, pod) + } + } + return apps, nil + } + + ListReplicaSets := func(namespace string, labelSelector *metav1.LabelSelector) ([]*apps.ReplicaSet, error) { + appList := &apps.ReplicaSetList{} + selector, _ := metav1.LabelSelectorAsSelector(labelSelector) + err := k8sClient.List(context.TODO(), appList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}) + if err != nil { + return nil, err + } + apps := make([]*apps.ReplicaSet, 0) + for i := range appList.Items { + pod := &appList.Items[i] + if pod.DeletionTimestamp.IsZero() { + apps = append(apps, pod) + } + } + return apps, nil + } + + CheckReplicas := func(deployment *apps.Deployment, replicas, available, updated int32) { + var clone *apps.Deployment + Eventually(func() bool { + clone = &apps.Deployment{} + err := GetObject(deployment.Namespace, deployment.Name, clone) + Expect(err).NotTo(HaveOccurred()) + fmt.Printf("replicas %d, available: %d, updated: %d\n", + clone.Status.Replicas, clone.Status.AvailableReplicas, clone.Status.UpdatedReplicas) + return clone.Status.Replicas == replicas && clone.Status.AvailableReplicas == available && clone.Status.UpdatedReplicas == updated + }, 10*time.Minute, time.Second).Should(BeTrue()) + + Eventually(func() int { + pods, err := ListPods(deployment.Namespace, deployment.Spec.Selector) + Expect(err).NotTo(HaveOccurred()) + return len(pods) + }, 10*time.Second, time.Second).Should(BeNumerically("==", replicas)) + + rss, err := ListReplicaSets(deployment.Namespace, deployment.Spec.Selector) + Expect(err).NotTo(HaveOccurred()) + var rsReplicas, rsAvailable, rsUpdated int32 + for _, rs := range rss { + if !rs.DeletionTimestamp.IsZero() { + continue + } + if util.EqualIgnoreHash(&rs.Spec.Template, &clone.Spec.Template) { + rsUpdated = rs.Status.Replicas + } + rsReplicas += rs.Status.Replicas + rsAvailable += rs.Status.AvailableReplicas + } + Expect(rsUpdated).Should(BeNumerically("==", updated)) + Expect(rsReplicas).Should(BeNumerically("==", replicas)) + Expect(rsAvailable).Should(BeNumerically("==", available)) + } + + ScaleDeployment := func(deployment *apps.Deployment, replicas int32) *apps.Deployment { + By(fmt.Sprintf("update deployment %v to replicas: %v", client.ObjectKeyFromObject(deployment), replicas)) + var clone *apps.Deployment + Expect(retry.RetryOnConflict(defaultRetry, func() error { + clone = &apps.Deployment{} + err := GetObject(deployment.Namespace, deployment.Name, clone) + if err != nil { + return err + } + clone.Spec.Replicas = pointer.Int32(replicas) + return k8sClient.Update(context.TODO(), clone) + })).NotTo(HaveOccurred()) + + Eventually(func() bool { + clone = &apps.Deployment{} + err := GetObject(deployment.Namespace, deployment.Name, clone) + Expect(err).NotTo(HaveOccurred()) + return clone.Status.ObservedGeneration >= clone.Generation + }, time.Minute, time.Second).Should(BeTrue()) + return clone + } + + UpdatePartitionWithCheck := func(deployment *apps.Deployment, desired intstr.IntOrString) { + By(fmt.Sprintf("update deployment %v to desired: %v, strategy: %v, and check", + client.ObjectKeyFromObject(deployment), deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation], desired)) + clone := UpdatePartitionWithoutCheck(deployment, desired) + count := 5 + for count > 0 { + desiredUpdatedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(&desired, int(*deployment.Spec.Replicas), true) + CheckReplicas(deployment, *clone.Spec.Replicas, *clone.Spec.Replicas, int32(desiredUpdatedReplicas)) + time.Sleep(time.Second) + count-- + } + } + + BeforeEach(func() { + namespace = randomNamespaceName("deployment") + ns := v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + } + Expect(k8sClient.Create(context.TODO(), &ns)).Should(SatisfyAny(BeNil())) + }) + + AfterEach(func() { + By("[TEST] Clean up resources after an integration test") + k8sClient.DeleteAllOf(context.TODO(), &apps.Deployment{}, client.InNamespace(namespace)) + k8sClient.DeleteAllOf(context.TODO(), &v1.Service{}, client.InNamespace(namespace)) + k8sClient.DeleteAllOf(context.TODO(), &netv1.Ingress{}, client.InNamespace(namespace)) + Expect(k8sClient.Delete(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, client.PropagationPolicy(metav1.DeletePropagationForeground))).Should(Succeed()) + time.Sleep(time.Second * 3) + }) + + KruiseDescribe("Advanced Deployment Checker", func() { + It("update with partition", func() { + deployment := &apps.Deployment{} + deployment.Namespace = namespace + Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred()) + CreateObject(deployment) + CheckReplicas(deployment, 5, 5, 5) + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(1)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(2)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(3)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(5)) + }) + + It("update with scale up", func() { + deployment := &apps.Deployment{} + deployment.Namespace = namespace + Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred()) + CreateObject(deployment) + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(1)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(2)) + deployment = ScaleDeployment(deployment, 10) + CheckReplicas(deployment, 10, 10, 4) + UpdatePartitionWithCheck(deployment, intstr.FromInt(7)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(10)) + }) + + It("update with scale down", func() { + deployment := &apps.Deployment{} + deployment.Namespace = namespace + Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred()) + deployment.Spec.Replicas = pointer.Int32(10) + CreateObject(deployment) + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromString("0%")) + UpdatePartitionWithCheck(deployment, intstr.FromString("40%")) + deployment = ScaleDeployment(deployment, 5) + CheckReplicas(deployment, 5, 5, 2) + UpdatePartitionWithCheck(deployment, intstr.FromString("60%")) + UpdatePartitionWithCheck(deployment, intstr.FromString("100%")) + }) + + It("update with MaxSurge=1, MaxUnavailable=0", func() { + deployment := &apps.Deployment{} + deployment.Namespace = namespace + Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred()) + deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] = + `{"rollingStyle":"Partition","rollingUpdate":{"maxUnavailable":0,"maxSurge":1}}` + CreateObject(deployment) + CheckReplicas(deployment, 5, 5, 5) + deployment.Spec.Template.Spec.Containers[0].Image = "failed_image:failed" + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithoutCheck(deployment, intstr.FromInt(3)) + CheckReplicas(deployment, 6, 5, 1) + }) + + It("update with MaxSurge=0, MaxUnavailable=1", func() { + deployment := &apps.Deployment{} + deployment.Namespace = namespace + Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred()) + deployment.Annotations[rolloutsv1alpha1.DeploymentStrategyAnnotation] = + `{"rollingStyle":"Partition","rollingUpdate":{"maxUnavailable":1,"maxSurge":0}}` + deployment.Spec.MinReadySeconds = 10 + CreateObject(deployment) + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithoutCheck(deployment, intstr.FromInt(3)) + CheckReplicas(deployment, 5, 4, 1) + CheckReplicas(deployment, 5, 4, 2) + CheckReplicas(deployment, 5, 4, 3) + UpdatePartitionWithoutCheck(deployment, intstr.FromInt(5)) + CheckReplicas(deployment, 5, 4, 4) + CheckReplicas(deployment, 5, 5, 5) + }) + + It("continuous update", func() { + deployment := &apps.Deployment{} + deployment.Namespace = namespace + Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred()) + CreateObject(deployment) + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(2)) + UpdateDeployment(deployment, "version3") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(3)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(5)) + }) + + It("rollback", func() { + deployment := &apps.Deployment{} + deployment.Namespace = namespace + Expect(ReadYamlToObject("./test_data/deployment/deployment.yaml", deployment)).ToNot(HaveOccurred()) + CreateObject(deployment) + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(2)) + UpdateDeployment(deployment, "version3") + UpdatePartitionWithCheck(deployment, intstr.FromInt(0)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(3)) + UpdateDeployment(deployment, "version2") + UpdatePartitionWithCheck(deployment, intstr.FromInt(2)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(3)) + UpdatePartitionWithCheck(deployment, intstr.FromInt(5)) + }) + }) +}) + +func unmarshal(strategyAnno string) *rolloutsv1alpha1.DeploymentStrategy { + strategy := &rolloutsv1alpha1.DeploymentStrategy{} + _ = json.Unmarshal([]byte(strategyAnno), strategy) + return strategy +} + +func marshal(strategy *rolloutsv1alpha1.DeploymentStrategy) string { + strategyAnno, _ := json.Marshal(strategy) + return string(strategyAnno) +} diff --git a/test/e2e/rollout_test.go b/test/e2e/rollout_test.go index 6f7d10cc..751a36d3 100644 --- a/test/e2e/rollout_test.go +++ b/test/e2e/rollout_test.go @@ -1391,8 +1391,6 @@ var _ = SIGDescribe("Rollout", func() { WaitRolloutNotFound(rollout.Name) Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) fmt.Println(util.DumpJSON(workload)) - Expect(workload.Spec.Paused).Should(BeTrue()) - Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 0)) workload.Spec.Paused = false UpdateDeployment(workload) By("Update deployment paused=false") diff --git a/test/e2e/test_data/deployment/deployment.yaml b/test/e2e/test_data/deployment/deployment.yaml new file mode 100644 index 00000000..28c0fc93 --- /dev/null +++ b/test/e2e/test_data/deployment/deployment.yaml @@ -0,0 +1,34 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: sample + labels: + app: busybox + Annotations: + batchrelease.rollouts.kruise.io/control-info: '{"apiVersion":"rollouts.kruise.io/v1alpha1","kind":"BatchRelease","name":"rollouts-demo","uid":"45891961-8c29-4ea9-8e61-fd5a1fd19ffa","controller":true,"blockOwnerDeletion":true}' + rollouts.kruise.io/deployment-strategy: '{"rollingUpdate":{"maxUnavailable":"25%","maxSurge":"25%"}}' +spec: + paused: true + replicas: 5 + strategy: + type: Recreate + selector: + matchLabels: + app: busybox + template: + metadata: + labels: + app: busybox + spec: + containers: + - name: busybox + image: busybox:1.32 + imagePullPolicy: IfNotPresent + command: ["/bin/sh", "-c", "sleep 10000"] + env: + - name: VERSION + value: version1 + resources: + limits: + memory: "10Mi" + cpu: "10m" \ No newline at end of file