From 9986217d4810acaa78a573763fbb27e5d077a7fa Mon Sep 17 00:00:00 2001 From: "mingzhou.swx" Date: Wed, 18 Jan 2023 21:56:21 +0800 Subject: [PATCH] rolling deployment in partition-style Signed-off-by: mingzhou.swx --- .../e2e-advanced-deployment-1.19.yaml | 20 +- .../e2e-advanced-deployment-1.23.yaml | 20 +- Makefile | 2 +- api/v1alpha1/deployment_types.go | 8 + api/v1alpha1/rollout_types.go | 13 +- config/default/manager_auth_proxy_patch.yaml | 5 +- main.go | 2 + .../batchrelease/batchrelease_executor.go | 25 +- .../batchrelease/batchrelease_status.go | 2 +- .../control/canarystyle/control_plane.go | 6 +- .../control/canarystyle/deployment/canary.go | 6 +- .../control/canarystyle/deployment/stable.go | 23 +- .../partitionstyle/cloneset/control.go | 21 +- .../partitionstyle/cloneset/control_test.go | 2 +- .../control/partitionstyle/control_plane.go | 14 +- .../partitionstyle/deployment/control.go | 186 ++++++ .../partitionstyle/deployment/control_test.go | 375 ++++++++++++ .../control/partitionstyle/interface.go | 8 +- .../partitionstyle/statefulset/control.go | 24 +- .../statefulset/control_test.go | 2 +- pkg/controller/batchrelease/control/util.go | 8 + .../batchrelease/control/util_test.go | 47 ++ pkg/controller/deployment/controller.go | 56 +- .../deployment/deployment_controller.go | 1 + .../deployment/deployment_controller_test.go | 16 + .../deployment/deployment_event_handler.go | 71 +++ pkg/controller/deployment/progress.go | 1 + pkg/controller/deployment/rolling.go | 1 + pkg/controller/deployment/rolling_test.go | 16 + pkg/controller/deployment/sync.go | 1 + pkg/controller/rollout/rollout_canary.go | 12 +- pkg/controller/rollout/rollout_canary_test.go | 5 +- pkg/controller/rollout/rollout_progressing.go | 2 +- .../rollout/rollout_progressing_test.go | 2 +- pkg/controller/rollout/rollout_status.go | 4 +- pkg/util/constant.go | 2 + pkg/util/controller_finder.go | 106 +++- pkg/util/patch/patch_utils.go | 224 ++++++++ pkg/util/patch/patch_utils_test.go | 45 ++ 
pkg/util/workloads_utils.go | 60 ++ pkg/util/workloads_utils_test.go | 115 ++++ .../rollout_create_update_handler.go | 24 + .../rollout_create_update_handler_test.go | 67 ++- .../util/configuration/configuration.go | 16 +- .../mutating/workload_update_handler.go | 79 ++- .../mutating/workload_update_handler_test.go | 2 + test/e2e/rollout_test.go | 540 ++++++++++++++++++ 47 files changed, 2148 insertions(+), 139 deletions(-) create mode 100644 pkg/controller/batchrelease/control/partitionstyle/deployment/control.go create mode 100644 pkg/controller/batchrelease/control/partitionstyle/deployment/control_test.go create mode 100644 pkg/controller/deployment/deployment_event_handler.go create mode 100644 pkg/util/patch/patch_utils.go create mode 100644 pkg/util/patch/patch_utils_test.go diff --git a/.github/workflows/e2e-advanced-deployment-1.19.yaml b/.github/workflows/e2e-advanced-deployment-1.19.yaml index 527b0018..cad7c5c8 100644 --- a/.github/workflows/e2e-advanced-deployment-1.19.yaml +++ b/.github/workflows/e2e-advanced-deployment-1.19.yaml @@ -90,7 +90,7 @@ jobs: echo "Timeout to wait for kruise-rollout ready" exit 1 fi - - name: Run E2E Tests + - name: Run E2E Tests For Deployment Controller run: | export KUBECONFIG=/home/runner/.kube/config make ginkgo @@ -108,3 +108,21 @@ jobs: exit 1 fi exit $retVal + - name: Run E2E Tests For Control Plane + run: | + export KUBECONFIG=/home/runner/.kube/config + make ginkgo + set +e + ./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment canary rollout with Ingress' test/e2e + retVal=$? + # kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout + restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-rollout has not restarted" + else + kubectl get pod -n kruise-rollout --no-headers + echo "Kruise-rollout has restarted, abort!!!" 
+ kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout + exit 1 + fi + exit $retVal diff --git a/.github/workflows/e2e-advanced-deployment-1.23.yaml b/.github/workflows/e2e-advanced-deployment-1.23.yaml index 3aaa9ee3..e236c34d 100644 --- a/.github/workflows/e2e-advanced-deployment-1.23.yaml +++ b/.github/workflows/e2e-advanced-deployment-1.23.yaml @@ -90,7 +90,7 @@ jobs: echo "Timeout to wait for kruise-rollout ready" exit 1 fi - - name: Run E2E Tests + - name: Run E2E Tests For Deployment Controller run: | export KUBECONFIG=/home/runner/.kube/config make ginkgo @@ -107,4 +107,22 @@ jobs: kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout exit 1 fi + exit $retVal + - name: Run E2E Tests For Control Plane + run: | + export KUBECONFIG=/home/runner/.kube/config + make ginkgo + set +e + ./bin/ginkgo -timeout 60m -v --focus='Advanced Deployment canary rollout with Ingress' test/e2e + retVal=$? + # kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout + restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-rollout has not restarted" + else + kubectl get pod -n kruise-rollout --no-headers + echo "Kruise-rollout has restarted, abort!!!" + kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout + exit 1 + fi exit $retVal \ No newline at end of file diff --git a/Makefile b/Makefile index 69312295..1ecec6c4 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ build: manifests generate fmt vet ## Build manager binary. go build -o bin/manager main.go run: manifests generate fmt vet ## Run a controller from your host. - go run ./main.go + go run ./main.go --v=5 --feature-gates=AdvancedDeployment=true docker-build: ## Build docker image with the manager. 
docker build -t ${IMG} . diff --git a/api/v1alpha1/deployment_types.go b/api/v1alpha1/deployment_types.go index b750db5e..940dd341 100644 --- a/api/v1alpha1/deployment_types.go +++ b/api/v1alpha1/deployment_types.go @@ -13,6 +13,14 @@ const ( // DeploymentExtraStatusAnnotation is annotation for deployment, // which is extra status field of Advanced Deployment. DeploymentExtraStatusAnnotation = "rollouts.kruise.io/deployment-extra-status" + + // DeploymentStableRevisionLabel is label for deployment, + // which record the stable revision during the current rolling process. + DeploymentStableRevisionLabel = "rollouts.kruise.io/stable-revision" + + // AdvancedDeploymentControlLabel is label for deployment, + // which labels whether the deployment is controlled by advanced-deployment-controller. + AdvancedDeploymentControlLabel = "rollouts.kruise.io/controlled-by-advanced-deployment-controller" ) // DeploymentStrategy is strategy field for Advanced Deployment diff --git a/api/v1alpha1/rollout_types.go b/api/v1alpha1/rollout_types.go index db174046..4661058c 100644 --- a/api/v1alpha1/rollout_types.go +++ b/api/v1alpha1/rollout_types.go @@ -42,12 +42,15 @@ const ( // RollbackInBatchAnnotation allow use disable quick rollback, and will roll back in batch style. RollbackInBatchAnnotation = "rollouts.kruise.io/rollback-in-batch" - // DeploymentRolloutStyleAnnotation define the rolling behavior for Deployment. + // RolloutStyleAnnotation define the rolling behavior for Deployment. // must be "partition" or "canary": - // * "partition" means rolling Deployment in batches just like CloneSet, and will NOT create any extra Deployment; - // * "canary" means rolling in canary way, and will create a canary Deployment. 
- // Defaults to canary - DeploymentRolloutStyleAnnotation = "rollouts.kruise.io/deployment-rolling-style" + // * "partition" means rolling in batches just like CloneSet, and will NOT create any extra Workload; + // * "canary" means rolling in canary way, and will create a canary Workload. + // Currently, only Deployment supports both "partition" and "canary" rolling styles. + // Other workload types only support the "partition" style. + // Defaults to "canary" for Deployment. + // Defaults to "partition" for the others. + RolloutStyleAnnotation = "rollouts.kruise.io/rolling-style" ) // RolloutSpec defines the desired state of Rollout diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index aa9b95f5..36663923 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -15,4 +15,7 @@ spec: - "--metrics-bind-address=127.0.0.1:8080" - "--leader-elect" - "--feature-gates=AdvancedDeployment=true" - - "--v=3" + - "--v=5" + env: + - name: KUBE_CACHE_MUTATION_DETECTOR + value: "true" diff --git a/main.go b/main.go index 73b6ce3b..11dd9f4f 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,7 @@ import ( utilfeature "github.com/openkruise/rollouts/pkg/util/feature" "github.com/openkruise/rollouts/pkg/webhook" "github.com/spf13/pflag" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -57,6 +58,7 @@ func init() { utilruntime.Must(kruisev1beta1.AddToScheme(scheme)) utilruntime.Must(rolloutsv1alpha1.AddToScheme(scheme)) utilruntime.Must(gatewayv1alpha2.AddToScheme(scheme)) + utilruntime.Must(admissionregistrationv1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } diff --git a/pkg/controller/batchrelease/batchrelease_executor.go b/pkg/controller/batchrelease/batchrelease_executor.go index c66d73a6..7ae22f07 100644 ---
a/pkg/controller/batchrelease/batchrelease_executor.go +++ b/pkg/controller/batchrelease/batchrelease_executor.go @@ -19,15 +19,17 @@ package batchrelease import ( "fmt" "reflect" + "strings" "time" appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle" - "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/deployment" + canarydeployment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/deployment" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/cloneset" + partitiondeployment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/deployment" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/statefulset" "github.com/openkruise/rollouts/pkg/util" apps "k8s.io/api/apps/v1" @@ -100,6 +102,8 @@ func (r *Executor) executeBatchReleasePlan(release *v1alpha1.BatchRelease, newSt case err == nil: newStatus.Phase = v1alpha1.RolloutPhaseProgressing result = reconcile.Result{RequeueAfter: DefaultDuration} + default: + klog.Warningf("Failed to initialize %v, err %v", klog.KObj(release), err) } case v1alpha1.RolloutPhaseProgressing: @@ -111,6 +115,8 @@ func (r *Executor) executeBatchReleasePlan(release *v1alpha1.BatchRelease, newSt switch { case err == nil: newStatus.Phase = v1alpha1.RolloutPhaseCompleted + default: + klog.Warningf("Failed to finalize %v, err %v", klog.KObj(release), err) } case v1alpha1.RolloutPhaseCompleted: @@ -140,6 +146,8 @@ func (r *Executor) progressBatches(release *v1alpha1.BatchRelease, newStatus *v1 case err == nil: result = reconcile.Result{RequeueAfter: DefaultDuration} newStatus.CanaryStatus.CurrentBatchState = 
v1alpha1.VerifyingBatchState + default: + klog.Warningf("Failed to upgrade %v, err %v", klog.KObj(release), err) } case v1alpha1.VerifyingBatchState: @@ -149,6 +157,7 @@ func (r *Executor) progressBatches(release *v1alpha1.BatchRelease, newStatus *v1 case err != nil: // should go to upgrade state to do again to avoid dead wait. newStatus.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState + klog.Warningf("%v current batch is not ready, err %v", klog.KObj(release), err) default: now := metav1.Now() newStatus.CanaryStatus.BatchReadyTime = &now @@ -164,6 +173,7 @@ func (r *Executor) progressBatches(release *v1alpha1.BatchRelease, newStatus *v1 // if the batch ready condition changed due to some reasons, just recalculate the current batch. newStatus.CanaryStatus.BatchReadyTime = nil newStatus.CanaryStatus.CurrentBatchState = v1alpha1.UpgradingBatchState + klog.Warningf("%v current batch is not ready, err %v", klog.KObj(release), err) case !isPartitioned(release): r.moveToNextBatch(release, newStatus) result = reconcile.Result{RequeueAfter: DefaultDuration} @@ -195,19 +205,24 @@ func (r *Executor) getReleaseController(release *v1alpha1.BatchRelease, newStatu switch targetRef.APIVersion { case appsv1alpha1.GroupVersion.String(): if targetRef.Kind == reflect.TypeOf(appsv1alpha1.CloneSet{}).Name() { - klog.InfoS("Using CloneSet batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + klog.InfoS("Using CloneSet partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) return partitionstyle.NewControlPlane(cloneset.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil } case apps.SchemeGroupVersion.String(): if targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() { - klog.InfoS("Using Deployment batch release controller for this batch release", "workload name", targetKey.Name, "namespace", 
targetKey.Namespace) - return canarystyle.NewControlPlane(deployment.NewController, r.client, r.recorder, release, newStatus, targetKey), nil + if strings.EqualFold(release.Annotations[v1alpha1.RolloutStyleAnnotation], string(v1alpha1.PartitionRollingStyle)) { + klog.InfoS("Using Deployment partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + return partitionstyle.NewControlPlane(partitiondeployment.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil + } else { + klog.InfoS("Using Deployment canary-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + return canarystyle.NewControlPlane(canarydeployment.NewController, r.client, r.recorder, release, newStatus, targetKey), nil + } } } // try to use StatefulSet-like rollout controller by default - klog.InfoS("Using StatefulSet-like batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + klog.InfoS("Using StatefulSet-Like partition-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) return partitionstyle.NewControlPlane(statefulset.NewController, r.client, r.recorder, release, newStatus, targetKey, gvk), nil } diff --git a/pkg/controller/batchrelease/batchrelease_status.go b/pkg/controller/batchrelease/batchrelease_status.go index 1baf9d8c..81c660b9 100644 --- a/pkg/controller/batchrelease/batchrelease_status.go +++ b/pkg/controller/batchrelease/batchrelease_status.go @@ -49,7 +49,7 @@ func (r *Executor) syncStatusBeforeExecuting(release *v1alpha1.BatchRelease, new // (3). Plan is changed during rollout // (4). 
Plan status is unexpected/unhealthy case isPlanCompleted(release): - message = "release plan has been terminated, will do nothing" + message = "release plan has been completed, will do nothing" needStopThisRound = true case isPlanFinalizing(release): diff --git a/pkg/controller/batchrelease/control/canarystyle/control_plane.go b/pkg/controller/batchrelease/control/canarystyle/control_plane.go index bda594a3..03f315ff 100644 --- a/pkg/controller/batchrelease/control/canarystyle/control_plane.go +++ b/pkg/controller/batchrelease/control/canarystyle/control_plane.go @@ -104,7 +104,8 @@ func (rc *realCanaryController) UpgradeBatch() error { } batchContext := rc.CalculateBatchContext(rc.release) - klog.Infof("BatchRelease %v upgrade batch: %s", klog.KObj(rc.release), batchContext.Log()) + klog.Infof("BatchRelease %v calculated context when upgrade batch: %s", + klog.KObj(rc.release), batchContext.Log()) return canary.UpgradeBatch(batchContext) } @@ -129,7 +130,8 @@ func (rc *realCanaryController) CheckBatchReady() error { } batchContext := rc.CalculateBatchContext(rc.release) - klog.Infof("BatchRelease %v check batch: %s", klog.KObj(rc.release), batchContext.Log()) + klog.Infof("BatchRelease %v calculated context when check batch ready: %s", + klog.KObj(rc.release), batchContext.Log()) return batchContext.IsBatchReady() } diff --git a/pkg/controller/batchrelease/control/canarystyle/deployment/canary.go b/pkg/controller/batchrelease/control/canarystyle/deployment/canary.go index 80d4b457..a4ddf198 100644 --- a/pkg/controller/batchrelease/control/canarystyle/deployment/canary.go +++ b/pkg/controller/batchrelease/control/canarystyle/deployment/canary.go @@ -84,11 +84,7 @@ func (r *realCanaryController) UpgradeBatch(ctx *batchcontext.BatchContext) erro } body := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desired) - if err := r.canaryClient.Patch(context.TODO(), deployment, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - 
klog.Infof("Successfully submit rolling replicas %d to Deployment %v", desired, klog.KObj(deployment)) - return nil + return r.canaryClient.Patch(context.TODO(), deployment, client.RawPatch(types.StrategicMergePatchType, []byte(body))) } func (r *realCanaryController) Create(release *v1alpha1.BatchRelease) error { diff --git a/pkg/controller/batchrelease/control/canarystyle/deployment/stable.go b/pkg/controller/batchrelease/control/canarystyle/deployment/stable.go index d0252008..a90b9792 100644 --- a/pkg/controller/batchrelease/control/canarystyle/deployment/stable.go +++ b/pkg/controller/batchrelease/control/canarystyle/deployment/stable.go @@ -25,7 +25,6 @@ import ( "github.com/openkruise/rollouts/pkg/util" apps "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -53,37 +52,27 @@ func (rc *realStableController) Initialize(release *v1alpha1.BatchRelease) error owner := control.BuildReleaseControlInfo(release) body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, util.BatchReleaseControlAnnotation, owner) - if err := rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - klog.Infof("Successfully claim Deployment %v", klog.KObj(rc.stableObject)) - return nil + return rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.StrategicMergePatchType, []byte(body))) } -func (rc *realStableController) Finalize(release *v1alpha1.BatchRelease) (err error) { +func (rc *realStableController) Finalize(release *v1alpha1.BatchRelease) error { if rc.stableObject == nil { return nil // no need to process deleted object } - defer func() { - if err == nil { - klog.Infof("Successfully finalize Deployment %v", klog.KObj(rc.stableObject)) - } - }() - // if batchPartition == nil, workload should be promoted; pause := release.Spec.ReleasePlan.BatchPartition != nil body := 
fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}},"spec":{"paused":%v}}`, util.BatchReleaseControlAnnotation, pause) d := util.GetEmptyObjectWithKey(rc.stableObject) - if err = rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return + if err := rc.stableClient.Patch(context.TODO(), d, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { + return err } if control.ShouldWaitResume(release) { - err = waitAllUpdatedAndReady(d.(*apps.Deployment)) + return waitAllUpdatedAndReady(d.(*apps.Deployment)) } - return + return nil } func waitAllUpdatedAndReady(deployment *apps.Deployment) error { diff --git a/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go b/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go index 87db6ed9..2e5e6194 100644 --- a/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go +++ b/pkg/controller/batchrelease/control/partitionstyle/cloneset/control.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -50,7 +49,7 @@ func NewController(cli client.Client, key types.NamespacedName, _ schema.GroupVe } } -func (rc *realController) GetInfo() *util.WorkloadInfo { +func (rc *realController) GetWorkloadInfo() *util.WorkloadInfo { return rc.WorkloadInfo } @@ -85,11 +84,7 @@ func (rc *realController) Initialize(release *v1alpha1.BatchRelease) error { owner := control.BuildReleaseControlInfo(release) body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}},"spec":{"updateStrategy":{"paused":%v,"partition":"%s"}}}`, util.BatchReleaseControlAnnotation, owner, false, "100%") - if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - klog.Infof("Successfully initialized CloneSet %v", 
klog.KObj(clone)) - return nil + return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))) } func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { @@ -112,11 +107,7 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { } clone := util.GetEmptyObjectWithKey(rc.object) - if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - klog.Infof("Successfully submit partition %v for CloneSet %v", ctx.DesiredPartition, klog.KObj(clone)) - return nil + return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))) } func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error { @@ -133,11 +124,7 @@ func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error { body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}%s}`, util.BatchReleaseControlAnnotation, specBody) clone := util.GetEmptyObjectWithKey(rc.object) - if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - klog.Infof("Successfully finalize StatefulSet %v", klog.KObj(rc.object)) - return nil + return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))) } func (rc *realController) CalculateBatchContext(release *v1alpha1.BatchRelease) (*batchcontext.BatchContext, error) { diff --git a/pkg/controller/batchrelease/control/partitionstyle/cloneset/control_test.go b/pkg/controller/batchrelease/control/partitionstyle/cloneset/control_test.go index 3be07836..435ec33a 100644 --- a/pkg/controller/batchrelease/control/partitionstyle/cloneset/control_test.go +++ b/pkg/controller/batchrelease/control/partitionstyle/cloneset/control_test.go @@ -319,7 +319,7 @@ func TestRealController(t *testing.T) { Expect(cli.Get(context.TODO(), cloneKey, fetch)).NotTo(HaveOccurred()) 
Expect(fetch.Annotations[util.BatchReleaseControlAnnotation]).Should(Equal("")) - stableInfo := controller.GetInfo() + stableInfo := controller.GetWorkloadInfo() Expect(stableInfo).ShouldNot(BeNil()) checkWorkloadInfo(stableInfo, clone) } diff --git a/pkg/controller/batchrelease/control/partitionstyle/control_plane.go b/pkg/controller/batchrelease/control/partitionstyle/control_plane.go index c44592b3..9788175b 100644 --- a/pkg/controller/batchrelease/control/partitionstyle/control_plane.go +++ b/pkg/controller/batchrelease/control/partitionstyle/control_plane.go @@ -70,7 +70,7 @@ func (rc *realBatchControlPlane) Initialize() error { } // record revision and replicas - workloadInfo := controller.GetInfo() + workloadInfo := controller.GetWorkloadInfo() rc.newStatus.StableRevision = workloadInfo.Status.StableRevision rc.newStatus.UpdateRevision = workloadInfo.Status.UpdateRevision rc.newStatus.ObservedWorkloadReplicas = workloadInfo.Replicas @@ -89,7 +89,7 @@ func (rc *realBatchControlPlane) UpgradeBatch() error { return err } - if controller.GetInfo().Replicas == 0 { + if controller.GetWorkloadInfo().Replicas == 0 { return nil } @@ -102,7 +102,8 @@ func (rc *realBatchControlPlane) UpgradeBatch() error { if err != nil { return err } - klog.Infof("BatchRelease %v upgrade batch: %s", klog.KObj(rc.release), batchContext.Log()) + klog.Infof("BatchRelease %v calculated context when upgrade batch: %s", + klog.KObj(rc.release), batchContext.Log()) err = controller.UpgradeBatch(batchContext) if err != nil { @@ -118,7 +119,7 @@ func (rc *realBatchControlPlane) CheckBatchReady() error { return err } - if controller.GetInfo().Replicas == 0 { + if controller.GetWorkloadInfo().Replicas == 0 { return nil } @@ -129,7 +130,8 @@ func (rc *realBatchControlPlane) CheckBatchReady() error { return err } - klog.Infof("BatchRelease %v check batch: %s", klog.KObj(rc.release), batchContext.Log()) + klog.Infof("BatchRelease %v calculated context when check batch ready: %s", + 
klog.KObj(rc.release), batchContext.Log()) return batchContext.IsBatchReady() } @@ -158,7 +160,7 @@ func (rc *realBatchControlPlane) SyncWorkloadInformation() (control.WorkloadEven return control.WorkloadUnknownState, nil, err } - workloadInfo := controller.GetInfo() + workloadInfo := controller.GetWorkloadInfo() if !workloadInfo.IsStable() { klog.Infof("Workload(%v) still reconciling, waiting for it to complete, generation: %v, observed: %v", workloadInfo.LogKey, workloadInfo.Generation, workloadInfo.Status.ObservedGeneration) diff --git a/pkg/controller/batchrelease/control/partitionstyle/deployment/control.go b/pkg/controller/batchrelease/control/partitionstyle/deployment/control.go new file mode 100644 index 00000000..828590e5 --- /dev/null +++ b/pkg/controller/batchrelease/control/partitionstyle/deployment/control.go @@ -0,0 +1,186 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deployment + +import ( + "context" + + "github.com/openkruise/rollouts/api/v1alpha1" + batchcontext "github.com/openkruise/rollouts/pkg/controller/batchrelease/context" + "github.com/openkruise/rollouts/pkg/controller/batchrelease/control" + "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle" + deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" + "github.com/openkruise/rollouts/pkg/util" + "github.com/openkruise/rollouts/pkg/util/patch" + apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type realController struct { + *util.WorkloadInfo + client client.Client + pods []*corev1.Pod + key types.NamespacedName + object *apps.Deployment +} + +func NewController(cli client.Client, key types.NamespacedName, _ schema.GroupVersionKind) partitionstyle.Interface { + return &realController{ + key: key, + client: cli, + } +} + +func (rc *realController) GetWorkloadInfo() *util.WorkloadInfo { + return rc.WorkloadInfo +} + +func (rc *realController) BuildController() (partitionstyle.Interface, error) { + if rc.object != nil { + return rc, nil + } + object := &apps.Deployment{} + if err := rc.client.Get(context.TODO(), rc.key, object); err != nil { + return rc, err + } + rc.object = object + rc.WorkloadInfo = rc.getWorkloadInfo(object) + return rc, nil +} + +func (rc *realController) ListOwnedPods() ([]*corev1.Pod, error) { + if rc.pods != nil { + return rc.pods, nil + } + var err error + rc.pods, err = util.ListOwnedPods(rc.client, rc.object) + return rc.pods, err +} + +func (rc *realController) Initialize(release *v1alpha1.BatchRelease) error { + if deploymentutil.IsUnderRolloutControl(rc.object) { + return nil // No need to initialize again.
+ } + + // Set strategy to deployment annotations + strategy := util.GetDeploymentStrategy(rc.object) + rollingUpdate := strategy.RollingUpdate + if rc.object.Spec.Strategy.RollingUpdate != nil { + rollingUpdate = rc.object.Spec.Strategy.RollingUpdate + } + strategy = v1alpha1.DeploymentStrategy{ + Paused: false, + Partition: intstr.FromInt(0), + RollingStyle: v1alpha1.PartitionRollingStyle, + RollingUpdate: rollingUpdate, + } + + d := rc.object.DeepCopy() + patchData := patch.NewDeploymentPatch() + patchData.InsertLabel(v1alpha1.AdvancedDeploymentControlLabel, "true") + patchData.InsertAnnotation(v1alpha1.DeploymentStrategyAnnotation, util.DumpJSON(&strategy)) + patchData.InsertAnnotation(util.BatchReleaseControlAnnotation, util.DumpJSON(metav1.NewControllerRef( + release, release.GetObjectKind().GroupVersionKind()))) + + // Disable the native deployment controller + patchData.UpdatePaused(true) + patchData.UpdateStrategy(apps.DeploymentStrategy{Type: apps.RecreateDeploymentStrategyType}) + return rc.client.Patch(context.TODO(), d, patchData) +} + +func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { + if !deploymentutil.IsUnderRolloutControl(rc.object) { + klog.Warningf("Cannot upgrade batch, because "+ + "deployment %v has ridden out of our control", klog.KObj(rc.object)) + return nil + } + + strategy := util.GetDeploymentStrategy(rc.object) + if control.IsCurrentMoreThanOrEqualToDesired(strategy.Partition, ctx.DesiredPartition) { + return nil // Satisfied, no need to patch again.
+ } + + d := rc.object.DeepCopy() + strategy.Partition = ctx.DesiredPartition + patchData := patch.NewDeploymentPatch() + patchData.InsertAnnotation(v1alpha1.DeploymentStrategyAnnotation, util.DumpJSON(&strategy)) + return rc.client.Patch(context.TODO(), d, patchData) +} + +func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error { + if rc.object == nil || !deploymentutil.IsUnderRolloutControl(rc.object) { + return nil // No need to finalize again. + } + + patchData := patch.NewDeploymentPatch() + if release.Spec.ReleasePlan.BatchPartition == nil { + strategy := util.GetDeploymentStrategy(rc.object) + patchData.UpdatePaused(false) + patchData.UpdateStrategy(apps.DeploymentStrategy{Type: apps.RollingUpdateDeploymentStrategyType, RollingUpdate: strategy.RollingUpdate}) + patchData.DeleteAnnotation(v1alpha1.DeploymentStrategyAnnotation) + patchData.DeleteAnnotation(v1alpha1.DeploymentExtraStatusAnnotation) + patchData.DeleteLabel(v1alpha1.DeploymentStableRevisionLabel) + patchData.DeleteLabel(v1alpha1.AdvancedDeploymentControlLabel) + } + d := rc.object.DeepCopy() + patchData.DeleteAnnotation(util.BatchReleaseControlAnnotation) + return rc.client.Patch(context.TODO(), d, patchData) +} + +func (rc *realController) CalculateBatchContext(release *v1alpha1.BatchRelease) (*batchcontext.BatchContext, error) { + rolloutID := release.Spec.ReleasePlan.RolloutID + if rolloutID != "" { + // if rollout-id is set, the pod will be patched batch label, + // so we have to list pod here. 
+ if _, err := rc.ListOwnedPods(); err != nil { + return nil, err + } + } + + currentBatch := release.Status.CanaryStatus.CurrentBatch + desiredPartition := release.Spec.ReleasePlan.Batches[currentBatch].CanaryReplicas + PlannedUpdatedReplicas := deploymentutil.NewRSReplicasLimit(desiredPartition, rc.object) + + return &batchcontext.BatchContext{ + Pods: rc.pods, + RolloutID: rolloutID, + CurrentBatch: currentBatch, + UpdateRevision: release.Status.UpdateRevision, + DesiredPartition: desiredPartition, + FailureThreshold: release.Spec.ReleasePlan.FailureThreshold, + + Replicas: rc.Replicas, + UpdatedReplicas: rc.Status.UpdatedReplicas, + UpdatedReadyReplicas: rc.Status.UpdatedReadyReplicas, + PlannedUpdatedReplicas: PlannedUpdatedReplicas, + DesiredUpdatedReplicas: PlannedUpdatedReplicas, + }, nil +} + +func (rc *realController) getWorkloadInfo(d *apps.Deployment) *util.WorkloadInfo { + workloadInfo := util.ParseWorkload(d) + extraStatus := util.GetDeploymentExtraStatus(d) + workloadInfo.Status.UpdatedReadyReplicas = extraStatus.UpdatedReadyReplicas + workloadInfo.Status.StableRevision = d.Labels[v1alpha1.DeploymentStableRevisionLabel] + return workloadInfo +} diff --git a/pkg/controller/batchrelease/control/partitionstyle/deployment/control_test.go b/pkg/controller/batchrelease/control/partitionstyle/deployment/control_test.go new file mode 100644 index 00000000..c0c25885 --- /dev/null +++ b/pkg/controller/batchrelease/control/partitionstyle/deployment/control_test.go @@ -0,0 +1,375 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" + "github.com/openkruise/rollouts/api/v1alpha1" + batchcontext "github.com/openkruise/rollouts/pkg/controller/batchrelease/context" + "github.com/openkruise/rollouts/pkg/util" + apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var ( + scheme = runtime.NewScheme() + + deploymentKey = types.NamespacedName{ + Name: "deployment", + Namespace: "default", + } + + deploymentDemo = &apps.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: deploymentKey.Name, + Namespace: deploymentKey.Namespace, + Generation: 1, + Labels: map[string]string{ + "app": "busybox", + }, + Annotations: map[string]string{ + "type": "unit-test", + }, + }, + Spec: apps.DeploymentSpec{ + Paused: true, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "busybox", + }, + }, + Replicas: pointer.Int32(10), + Strategy: apps.DeploymentStrategy{ + Type: apps.RollingUpdateDeploymentStrategyType, + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + MaxSurge: &intstr.IntOrString{Type: intstr.String, StrVal: "20%"}, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "busybox", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: 
"busybox:latest", + }, + }, + }, + }, + }, + Status: apps.DeploymentStatus{ + Replicas: 10, + UpdatedReplicas: 10, + ReadyReplicas: 10, + AvailableReplicas: 10, + CollisionCount: pointer.Int32Ptr(1), + ObservedGeneration: 1, + }, + } + + releaseDemo = &v1alpha1.BatchRelease{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "rollouts.kruise.io/v1alpha1", + Kind: "BatchRelease", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "release", + Namespace: deploymentKey.Namespace, + UID: uuid.NewUUID(), + }, + Spec: v1alpha1.BatchReleaseSpec{ + ReleasePlan: v1alpha1.ReleasePlan{ + FinalizingPolicy: v1alpha1.WaitResumeFinalizingPolicyType, + Batches: []v1alpha1.ReleaseBatch{ + { + CanaryReplicas: intstr.FromString("10%"), + }, + { + CanaryReplicas: intstr.FromString("50%"), + }, + { + CanaryReplicas: intstr.FromString("100%"), + }, + }, + }, + TargetRef: v1alpha1.ObjectRef{ + WorkloadRef: &v1alpha1.WorkloadRef{ + APIVersion: deploymentDemo.APIVersion, + Kind: deploymentDemo.Kind, + Name: deploymentDemo.Name, + }, + }, + }, + Status: v1alpha1.BatchReleaseStatus{ + CanaryStatus: v1alpha1.BatchReleaseCanaryStatus{ + CurrentBatch: 1, + }, + }, + } +) + +func init() { + apps.AddToScheme(scheme) + v1alpha1.AddToScheme(scheme) + kruiseappsv1alpha1.AddToScheme(scheme) +} + +func TestCalculateBatchContext(t *testing.T) { + RegisterFailHandler(Fail) + + percent := intstr.FromString("20%") + cases := map[string]struct { + workload func() *apps.Deployment + release func() *v1alpha1.BatchRelease + result *batchcontext.BatchContext + }{ + "noraml case": { + workload: func() *apps.Deployment { + deployment := &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1alpha1.DeploymentStrategyAnnotation: util.DumpJSON(&v1alpha1.DeploymentStrategy{ + RollingStyle: v1alpha1.PartitionRollingStyle, + RollingUpdate: &apps.RollingUpdateDeployment{MaxUnavailable: &percent, MaxSurge: &percent}, + Partition: percent, + Paused: false, + }), + 
v1alpha1.DeploymentExtraStatusAnnotation: util.DumpJSON(&v1alpha1.DeploymentExtraStatus{ + UpdatedReadyReplicas: 1, + ExpectedUpdatedReplicas: 2, + }), + }, + }, + Spec: apps.DeploymentSpec{ + Replicas: pointer.Int32Ptr(10), + }, + Status: apps.DeploymentStatus{ + Replicas: 10, + UpdatedReplicas: 2, + AvailableReplicas: 9, + ReadyReplicas: 9, + }, + } + return deployment + }, + release: func() *v1alpha1.BatchRelease { + r := &v1alpha1.BatchRelease{ + Spec: v1alpha1.BatchReleaseSpec{ + ReleasePlan: v1alpha1.ReleasePlan{ + FailureThreshold: &percent, + FinalizingPolicy: v1alpha1.WaitResumeFinalizingPolicyType, + Batches: []v1alpha1.ReleaseBatch{ + { + CanaryReplicas: percent, + }, + }, + }, + }, + Status: v1alpha1.BatchReleaseStatus{ + CanaryStatus: v1alpha1.BatchReleaseCanaryStatus{ + CurrentBatch: 0, + }, + UpdateRevision: "version-2", + }, + } + return r + }, + result: &batchcontext.BatchContext{ + CurrentBatch: 0, + UpdateRevision: "version-2", + DesiredPartition: percent, + FailureThreshold: &percent, + + Replicas: 10, + UpdatedReplicas: 2, + UpdatedReadyReplicas: 1, + PlannedUpdatedReplicas: 2, + DesiredUpdatedReplicas: 2, + }, + }, + "partition=90%, replicas=5": { + workload: func() *apps.Deployment { + deployment := &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1alpha1.DeploymentStrategyAnnotation: util.DumpJSON(&v1alpha1.DeploymentStrategy{ + RollingStyle: v1alpha1.PartitionRollingStyle, + RollingUpdate: &apps.RollingUpdateDeployment{MaxUnavailable: &percent, MaxSurge: &percent}, + Partition: intstr.FromString("20%"), + Paused: false, + }), + v1alpha1.DeploymentExtraStatusAnnotation: util.DumpJSON(&v1alpha1.DeploymentExtraStatus{ + UpdatedReadyReplicas: 4, + ExpectedUpdatedReplicas: 4, + }), + }, + }, + Spec: apps.DeploymentSpec{ + Replicas: pointer.Int32Ptr(5), + }, + Status: apps.DeploymentStatus{ + Replicas: 5, + UpdatedReplicas: 4, + AvailableReplicas: 5, + ReadyReplicas: 5, + }, + } + return deployment + }, + 
release: func() *v1alpha1.BatchRelease { + r := &v1alpha1.BatchRelease{ + Spec: v1alpha1.BatchReleaseSpec{ + ReleasePlan: v1alpha1.ReleasePlan{ + FailureThreshold: &percent, + FinalizingPolicy: v1alpha1.WaitResumeFinalizingPolicyType, + Batches: []v1alpha1.ReleaseBatch{ + { + CanaryReplicas: intstr.FromString("90%"), + }, + }, + }, + }, + Status: v1alpha1.BatchReleaseStatus{ + CanaryStatus: v1alpha1.BatchReleaseCanaryStatus{ + CurrentBatch: 0, + }, + UpdateRevision: "version-2", + }, + } + return r + }, + result: &batchcontext.BatchContext{ + CurrentBatch: 0, + UpdateRevision: "version-2", + DesiredPartition: intstr.FromString("90%"), + FailureThreshold: &percent, + + Replicas: 5, + UpdatedReplicas: 4, + UpdatedReadyReplicas: 4, + PlannedUpdatedReplicas: 4, + DesiredUpdatedReplicas: 4, + }, + }, + } + + for name, cs := range cases { + t.Run(name, func(t *testing.T) { + cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.workload()).Build() + control := realController{ + client: cli, + } + _, err := control.BuildController() + Expect(err).NotTo(HaveOccurred()) + got, err := control.CalculateBatchContext(cs.release()) + fmt.Println(got) + Expect(err).NotTo(HaveOccurred()) + Expect(got.Log()).Should(Equal(cs.result.Log())) + }) + } +} + +func TestRealController(t *testing.T) { + RegisterFailHandler(Fail) + + release := releaseDemo.DeepCopy() + clone := deploymentDemo.DeepCopy() + cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(release, clone).Build() + c := NewController(cli, deploymentKey, clone.GroupVersionKind()).(*realController) + controller, err := c.BuildController() + Expect(err).NotTo(HaveOccurred()) + + err = controller.Initialize(release) + Expect(err).NotTo(HaveOccurred()) + fetch := &apps.Deployment{} + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + Expect(fetch.Spec.Paused).Should(BeTrue()) + Expect(fetch.Spec.Strategy.Type).Should(Equal(apps.RecreateDeploymentStrategyType)) + 
Expect(fetch.Annotations[util.BatchReleaseControlAnnotation]).Should(Equal(getControlInfo(release))) + strategy := util.GetDeploymentStrategy(fetch) + Expect(strategy.Paused).Should(BeFalse()) + c.object = fetch // mock + + for { + batchContext, err := controller.CalculateBatchContext(release) + Expect(err).NotTo(HaveOccurred()) + err = controller.UpgradeBatch(batchContext) + fetch := &apps.Deployment{} + // mock + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + c.object = fetch + if err == nil { + break + } + } + fetch = &apps.Deployment{} + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + strategy = util.GetDeploymentStrategy(fetch) + Expect(strategy.Partition.StrVal).Should(Equal("50%")) + + release.Spec.ReleasePlan.BatchPartition = nil + err = controller.Finalize(release) + Expect(err).NotTo(HaveOccurred()) + fetch = &apps.Deployment{} + Expect(cli.Get(context.TODO(), deploymentKey, fetch)).NotTo(HaveOccurred()) + Expect(fetch.Annotations[util.BatchReleaseControlAnnotation]).Should(Equal("")) + Expect(fetch.Annotations[v1alpha1.DeploymentStrategyAnnotation]).Should(Equal("")) + Expect(fetch.Annotations[v1alpha1.DeploymentExtraStatusAnnotation]).Should(Equal("")) + Expect(fetch.Spec.Paused).Should(BeFalse()) + Expect(fetch.Spec.Strategy.Type).Should(Equal(apps.RollingUpdateDeploymentStrategyType)) + + workloadInfo := controller.GetWorkloadInfo() + Expect(workloadInfo).ShouldNot(BeNil()) + checkWorkloadInfo(workloadInfo, clone) +} + +func checkWorkloadInfo(stableInfo *util.WorkloadInfo, clone *apps.Deployment) { + Expect(stableInfo.Replicas).Should(Equal(*clone.Spec.Replicas)) + Expect(stableInfo.Status.Replicas).Should(Equal(clone.Status.Replicas)) + Expect(stableInfo.Status.ReadyReplicas).Should(Equal(clone.Status.ReadyReplicas)) + Expect(stableInfo.Status.UpdatedReplicas).Should(Equal(clone.Status.UpdatedReplicas)) + 
Expect(stableInfo.Status.AvailableReplicas).Should(Equal(clone.Status.AvailableReplicas)) + Expect(stableInfo.Status.ObservedGeneration).Should(Equal(clone.Status.ObservedGeneration)) +} + +func getControlInfo(release *v1alpha1.BatchRelease) string { + owner, _ := json.Marshal(metav1.NewControllerRef(release, release.GetObjectKind().GroupVersionKind())) + return string(owner) +} diff --git a/pkg/controller/batchrelease/control/partitionstyle/interface.go b/pkg/controller/batchrelease/control/partitionstyle/interface.go index 4fb27f36..99fbacf0 100644 --- a/pkg/controller/batchrelease/control/partitionstyle/interface.go +++ b/pkg/controller/batchrelease/control/partitionstyle/interface.go @@ -25,10 +25,10 @@ import ( type Interface interface { // BuildController will get workload object and parse workload info, - // and return a controller for workload + // and return a initialized controller for workload. BuildController() (Interface, error) - // GetInfo return workload information - GetInfo() *util.WorkloadInfo + // GetWorkloadInfo return workload information. + GetWorkloadInfo() *util.WorkloadInfo // ListOwnedPods fetch the pods owned by the workload. // Note that we should list pod only if we really need it. ListOwnedPods() ([]*corev1.Pod, error) @@ -36,7 +36,7 @@ type Interface interface { // according to release plan and current status of workload. CalculateBatchContext(release *v1alpha1.BatchRelease) (*batchcontext.BatchContext, error) - // Initialize do something before rolling out, for example + // Initialize do something before rolling out, for example: // - claim the workload is under our control; // - other things related with specific type of workload, such as 100% partition settings. 
Initialize(release *v1alpha1.BatchRelease) error diff --git a/pkg/controller/batchrelease/control/partitionstyle/statefulset/control.go b/pkg/controller/batchrelease/control/partitionstyle/statefulset/control.go index 99e37941..fa8e3219 100644 --- a/pkg/controller/batchrelease/control/partitionstyle/statefulset/control.go +++ b/pkg/controller/batchrelease/control/partitionstyle/statefulset/control.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -52,7 +51,7 @@ func NewController(cli client.Client, key types.NamespacedName, gvk schema.Group } } -func (rc *realController) GetInfo() *util.WorkloadInfo { +func (rc *realController) GetWorkloadInfo() *util.WorkloadInfo { return rc.WorkloadInfo } @@ -109,12 +108,7 @@ func (rc *realController) Initialize(release *v1alpha1.BatchRelease) error { body := fmt.Sprintf(`{%s,%s}`, metaBody, specBody) clone := util.GetEmptyObjectWithKey(rc.object) - if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - - klog.Infof("Successfully initialize StatefulSet %v", klog.KObj(clone)) - return nil + return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))) } func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { @@ -129,12 +123,7 @@ func (rc *realController) UpgradeBatch(ctx *batchcontext.BatchContext) error { body := fmt.Sprintf(`{"spec":{"updateStrategy":{"rollingUpdate":{"partition":%d}}}}`, desired) clone := rc.object.DeepCopyObject().(client.Object) - if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - - klog.Infof("Successfully patch partition from %d to %d for StatefulSet %v", current, desired, klog.KObj(clone)) - return nil + return 
rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))) } func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error { @@ -151,12 +140,7 @@ func (rc *realController) Finalize(release *v1alpha1.BatchRelease) error { body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}%s}`, util.BatchReleaseControlAnnotation, specBody) clone := util.GetEmptyObjectWithKey(rc.object) - if err := rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))); err != nil { - return err - } - - klog.Infof("Successfully finalize StatefulSet %v", klog.KObj(clone)) - return nil + return rc.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(body))) } func (rc *realController) CalculateBatchContext(release *v1alpha1.BatchRelease) (*batchcontext.BatchContext, error) { diff --git a/pkg/controller/batchrelease/control/partitionstyle/statefulset/control_test.go b/pkg/controller/batchrelease/control/partitionstyle/statefulset/control_test.go index a21aea02..59451c52 100644 --- a/pkg/controller/batchrelease/control/partitionstyle/statefulset/control_test.go +++ b/pkg/controller/batchrelease/control/partitionstyle/statefulset/control_test.go @@ -612,7 +612,7 @@ func TestRealController(t *testing.T) { Expect(cli.Get(context.TODO(), stsKey, fetch)).NotTo(HaveOccurred()) Expect(fetch.Annotations[util.BatchReleaseControlAnnotation]).Should(Equal("")) - stableInfo := controller.GetInfo() + stableInfo := controller.GetWorkloadInfo() Expect(stableInfo).ShouldNot(BeNil()) checkWorkloadInfo(stableInfo, sts) } diff --git a/pkg/controller/batchrelease/control/util.go b/pkg/controller/batchrelease/control/util.go index c1a4a698..a6484ebd 100644 --- a/pkg/controller/batchrelease/control/util.go +++ b/pkg/controller/batchrelease/control/util.go @@ -101,6 +101,14 @@ func GenerateNotFoundError(name, resource string) error { return errors.NewNotFound(schema.GroupResource{Group: "apps", Resource: 
resource}, name) } +// ShouldWaitResume return true if FinalizingPolicy is "waitResume". func ShouldWaitResume(release *v1alpha1.BatchRelease) bool { return release.Spec.ReleasePlan.FinalizingPolicy == v1alpha1.WaitResumeFinalizingPolicyType } + +// IsCurrentMoreThanOrEqualToDesired return true if current >= desired +func IsCurrentMoreThanOrEqualToDesired(current, desired intstr.IntOrString) bool { + currentNum, _ := intstr.GetScaledValueFromIntOrPercent(&current, 10000000, true) + desiredNum, _ := intstr.GetScaledValueFromIntOrPercent(&desired, 10000000, true) + return currentNum >= desiredNum +} diff --git a/pkg/controller/batchrelease/control/util_test.go b/pkg/controller/batchrelease/control/util_test.go index fa11b737..baef4387 100644 --- a/pkg/controller/batchrelease/control/util_test.go +++ b/pkg/controller/batchrelease/control/util_test.go @@ -167,3 +167,50 @@ func TestIsControlledByBatchRelease(t *testing.T) { }) } } + +func TestIsCurrentMoreThanOrEqualToDesired(t *testing.T) { + RegisterFailHandler(Fail) + + cases := map[string]struct { + current intstr.IntOrString + desired intstr.IntOrString + result bool + }{ + "current=2,desired=1": { + current: intstr.FromInt(2), + desired: intstr.FromInt(1), + result: true, + }, + "current=2,desired=2": { + current: intstr.FromInt(2), + desired: intstr.FromInt(2), + result: true, + }, + "current=2,desired=3": { + current: intstr.FromInt(2), + desired: intstr.FromInt(3), + result: false, + }, + "current=80%,desired=79%": { + current: intstr.FromString("80%"), + desired: intstr.FromString("79%"), + result: true, + }, + "current=80%,desired=80%": { + current: intstr.FromString("80%"), + desired: intstr.FromString("80%"), + result: true, + }, + "current=90%,desired=91%": { + current: intstr.FromString("90%"), + desired: intstr.FromString("91%"), + result: false, + }, + } + for name, cs := range cases { + t.Run(name, func(t *testing.T) { + got := IsCurrentMoreThanOrEqualToDesired(cs.current, cs.desired) + Expect(got ==
cs.result).Should(BeTrue()) + }) + } +} diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 53ec156a..3519f577 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -1,6 +1,5 @@ /* Copyright 2019 The Kruise Authors. -Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,9 +23,11 @@ import ( "reflect" "time" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -47,8 +48,11 @@ import ( rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" "github.com/openkruise/rollouts/pkg/feature" + "github.com/openkruise/rollouts/pkg/util" clientutil "github.com/openkruise/rollouts/pkg/util/client" utilfeature "github.com/openkruise/rollouts/pkg/util/feature" + "github.com/openkruise/rollouts/pkg/util/patch" + "github.com/openkruise/rollouts/pkg/webhook/util/configuration" ) func init() { @@ -129,8 +133,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } + // Watch for changes to ReplicaSet if err = c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{ - IsController: true, OwnerType: &appsv1.ReplicaSet{}}, predicate.Funcs{}); err != nil { + IsController: true, OwnerType: &appsv1.Deployment{}}, predicate.Funcs{}); err != nil { + return err + } + + // Watch for changes to MutatingWebhookConfigurations of kruise-rollout operator + if err = c.Watch(&source.Kind{Type: &admissionregistrationv1.MutatingWebhookConfiguration{}}, 
&MutatingWebhookEventHandler{mgr.GetCache()}); err != nil { return err } @@ -178,6 +188,16 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req return reconcile.Result{}, nil } + // If MutatingWebhookConfiguration is deleted, the Deployment may be set paused=false, + // which will increase the risk of release. To prevent such a risk, in such a case, we + // will update the Deployment strategy type field to RollingUpdate. + invalid, err := r.mutatingProtectionInvalid(deployment) + if err != nil { + return reconcile.Result{}, err + } else if invalid { + return reconcile.Result{}, nil + } + errList := field.ErrorList{} err = dc.syncDeployment(context.Background(), deployment) if err != nil { @@ -187,13 +207,41 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req if err != nil { errList = append(errList, field.InternalError(field.NewPath("patchExtraStatus"), err)) } + if len(errList) > 0 { + return ctrl.Result{}, errList.ToAggregate() + } err = deploymentutil.DeploymentRolloutSatisfied(deployment, dc.strategy.Partition) if err != nil { klog.V(3).Infof("Deployment %v is still rolling: %v", klog.KObj(deployment), err) - return reconcile.Result{RequeueAfter: DefaultRetryDuration}, errList.ToAggregate() + return reconcile.Result{RequeueAfter: DefaultRetryDuration}, nil } + return reconcile.Result{}, nil +} - return ctrl.Result{}, errList.ToAggregate() +// mutatingProtectionInvalid check if mutating webhook configuration not exists, if not exists, +// we should update deployment strategy type tpo 'RollingUpdate' to avoid release risk. 
+func (r *ReconcileDeployment) mutatingProtectionInvalid(deployment *appsv1.Deployment) (bool, error) { + configKey := types.NamespacedName{Name: configuration.MutatingWebhookConfigurationName} + mutatingWebhookConfiguration := &admissionregistrationv1.MutatingWebhookConfiguration{} + err := r.Get(context.TODO(), configKey, mutatingWebhookConfiguration) + klog.Warningf("gotten mutating webhook configuration: %v", mutatingWebhookConfiguration.GetName()) + if client.IgnoreNotFound(err) != nil { + return false, err + } + klog.Warningf("gotten mutating webhook configuration: %v", mutatingWebhookConfiguration.GetName()) + if errors.IsNotFound(err) || !mutatingWebhookConfiguration.DeletionTimestamp.IsZero() { + klog.Warningf("gotten mutating webhook configuration: %v", mutatingWebhookConfiguration.GetName()) + if deployment.Spec.Strategy.Type == appsv1.RollingUpdateDeploymentStrategyType { + return false, nil + } + strategy := util.GetDeploymentStrategy(deployment) + d := deployment.DeepCopy() + patchData := patch.NewDeploymentPatch() + patchData.UpdateStrategy(appsv1.DeploymentStrategy{Type: appsv1.RollingUpdateDeploymentStrategyType, RollingUpdate: strategy.RollingUpdate}) + klog.Warningf("Kruise-Rollout mutating webhook configuration is deleted, update Deployment %v strategy to 'RollingUpdate'", klog.KObj(deployment)) + return true, r.Patch(context.TODO(), d, patchData) + } + return false, nil } type controllerFactory DeploymentController diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 97242419..6150942e 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -1,4 +1,5 @@ /* +Copyright 2022 The Kruise Authors. Copyright 2015 The Kubernetes Authors. 
Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 2a8b0319..4a36a34e 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package deployment import ( diff --git a/pkg/controller/deployment/deployment_event_handler.go b/pkg/controller/deployment/deployment_event_handler.go new file mode 100644 index 00000000..0a6019f0 --- /dev/null +++ b/pkg/controller/deployment/deployment_event_handler.go @@ -0,0 +1,71 @@ +package deployment + +import ( + "context" + + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/openkruise/rollouts/api/v1alpha1" + "github.com/openkruise/rollouts/pkg/webhook/util/configuration" +) + +type MutatingWebhookEventHandler struct { + client.Reader +} + +func (m MutatingWebhookEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + config, ok := evt.Object.(*admissionregistrationv1.MutatingWebhookConfiguration) + if !ok || config == nil || 
!isKruiseRolloutMutatingConfiguration(config) || config.DeletionTimestamp.IsZero() { + return + } + m.enqueue(q) +} + +func (m MutatingWebhookEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { + config, ok := evt.Object.(*admissionregistrationv1.MutatingWebhookConfiguration) + if !ok || config == nil || !isKruiseRolloutMutatingConfiguration(config) || config.DeletionTimestamp.IsZero() { + return + } + m.enqueue(q) +} + +func (m MutatingWebhookEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + config, ok := evt.ObjectNew.(*admissionregistrationv1.MutatingWebhookConfiguration) + if !ok || config == nil || !isKruiseRolloutMutatingConfiguration(config) || config.DeletionTimestamp.IsZero() { + return + } + m.enqueue(q) +} + +func (m MutatingWebhookEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + config, ok := evt.Object.(*admissionregistrationv1.MutatingWebhookConfiguration) + if !ok || config == nil || !isKruiseRolloutMutatingConfiguration(config) { + return + } + m.enqueue(q) +} + +func (m MutatingWebhookEventHandler) enqueue(q workqueue.RateLimitingInterface) { + deploymentLister := appsv1.DeploymentList{} + err := m.List(context.TODO(), &deploymentLister, client.MatchingLabels(map[string]string{v1alpha1.AdvancedDeploymentControlLabel: "true"})) + if err != nil { + klog.Errorf("Failed to list deployment, error: %v", err) + } + for index := range deploymentLister.Items { + if deploymentLister.Items[index].Spec.Strategy.Type == appsv1.RollingUpdateDeploymentStrategyType { + continue + } + klog.Warningf("enqueue deployment %v", client.ObjectKeyFromObject(&deploymentLister.Items[index])) + q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&deploymentLister.Items[index])}) + } +} + +func isKruiseRolloutMutatingConfiguration(object client.Object) bool { + return object.GetName() == configuration.MutatingWebhookConfigurationName +} diff --git 
a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index a62e3807..d8dc2e56 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -1,4 +1,5 @@ /* +Copyright 2022 The Kruise Authors. Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index d6e16ac6..bc5cb73b 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -1,4 +1,5 @@ /* +Copyright 2022 The Kruise Authors. Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go index 621fb007..b6f3c7fb 100644 --- a/pkg/controller/deployment/rolling_test.go +++ b/pkg/controller/deployment/rolling_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package deployment import ( diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 314b9af4..9b821e96 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -1,4 +1,5 @@ /* +Copyright 2022 The Kruise Authors. Copyright 2016 The Kubernetes Authors. 
Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/pkg/controller/rollout/rollout_canary.go b/pkg/controller/rollout/rollout_canary.go index a02b4770..3ab2b960 100644 --- a/pkg/controller/rollout/rollout_canary.go +++ b/pkg/controller/rollout/rollout_canary.go @@ -362,11 +362,15 @@ func createBatchRelease(rollout *v1alpha1.Rollout, rolloutID string, batch int32 }, }, } + annotations := map[string]string{} if isRollback { - if br.Annotations == nil { - br.Annotations = map[string]string{} - } - br.Annotations[v1alpha1.RollbackInBatchAnnotation] = "true" + annotations[v1alpha1.RollbackInBatchAnnotation] = rollout.Annotations[v1alpha1.RollbackInBatchAnnotation] + } + if style, ok := rollout.Annotations[v1alpha1.RolloutStyleAnnotation]; ok { + annotations[v1alpha1.RolloutStyleAnnotation] = style + } + if len(annotations) > 0 { + br.Annotations = annotations } return br } diff --git a/pkg/controller/rollout/rollout_canary_test.go b/pkg/controller/rollout/rollout_canary_test.go index 9ef53c1e..3ad2d658 100644 --- a/pkg/controller/rollout/rollout_canary_test.go +++ b/pkg/controller/rollout/rollout_canary_test.go @@ -233,7 +233,7 @@ func TestRunCanary(t *testing.T) { trafficRoutingManager: r.trafficRoutingManager, recorder: r.Recorder, } - workload, _ := r.finder.GetWorkloadForRef("", rollout.Spec.ObjectRef.WorkloadRef) + workload, _ := r.finder.GetWorkloadForRef(rollout) c := &util.RolloutContext{ Rollout: rollout, NewStatus: rollout.Status.DeepCopy(), @@ -253,7 +253,8 @@ func TestRunCanary(t *testing.T) { cond := util.GetRolloutCondition(*cStatus, v1alpha1.RolloutConditionProgressing) cond.Message = "" util.SetRolloutCondition(cStatus, *cond) - if !reflect.DeepEqual(cs.expectStatus(), cStatus) { + expectStatus := cs.expectStatus() + if !reflect.DeepEqual(expectStatus, cStatus) { t.Fatalf("expect(%s), but get(%s)", util.DumpJSON(cs.expectStatus()), util.DumpJSON(cStatus)) } }) diff --git a/pkg/controller/rollout/rollout_progressing.go 
b/pkg/controller/rollout/rollout_progressing.go index c3a90d0b..8f89e902 100644 --- a/pkg/controller/rollout/rollout_progressing.go +++ b/pkg/controller/rollout/rollout_progressing.go @@ -35,7 +35,7 @@ var defaultGracePeriodSeconds int32 = 3 func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *v1alpha1.Rollout, newStatus *v1alpha1.RolloutStatus) (*time.Time, error) { cond := util.GetRolloutCondition(rollout.Status, v1alpha1.RolloutConditionProgressing) klog.Infof("reconcile rollout(%s/%s) progressing action...", rollout.Namespace, rollout.Name) - workload, err := r.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := r.finder.GetWorkloadForRef(rollout) if err != nil { klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error()) return nil, err diff --git a/pkg/controller/rollout/rollout_progressing_test.go b/pkg/controller/rollout/rollout_progressing_test.go index 6237e6d6..57e4db73 100644 --- a/pkg/controller/rollout/rollout_progressing_test.go +++ b/pkg/controller/rollout/rollout_progressing_test.go @@ -815,7 +815,7 @@ func TestReCalculateCanaryStepIndex(t *testing.T) { recorder: reconciler.Recorder, } rollout := cs.getRollout() - workload, err := reconciler.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := reconciler.finder.GetWorkloadForRef(rollout) if err != nil { t.Fatalf(err.Error()) } diff --git a/pkg/controller/rollout/rollout_status.go b/pkg/controller/rollout/rollout_status.go index 5ceaf7d4..bf62ccf1 100644 --- a/pkg/controller/rollout/rollout_status.go +++ b/pkg/controller/rollout/rollout_status.go @@ -53,7 +53,7 @@ func (r *RolloutReconciler) calculateRolloutStatus(rollout *v1alpha1.Rollout) (r newStatus.Phase = v1alpha1.RolloutPhaseInitial } // get ref workload - workload, err := r.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := 
r.finder.GetWorkloadForRef(rollout) if err != nil { klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error()) return false, nil, err @@ -182,7 +182,7 @@ func (r *RolloutReconciler) reconcileRolloutTerminating(rollout *v1alpha1.Rollou if cond.Reason == v1alpha1.TerminatingReasonCompleted { return nil, nil } - workload, err := r.finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef) + workload, err := r.finder.GetWorkloadForRef(rollout) if err != nil { klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error()) return nil, err diff --git a/pkg/util/constant.go b/pkg/util/constant.go index 579a36bd..23fbd3c1 100644 --- a/pkg/util/constant.go +++ b/pkg/util/constant.go @@ -37,6 +37,8 @@ const ( KruiseRolloutFinalizer = "rollouts.kruise.io/rollout" // WorkloadTypeLabel is a label to identify workload type WorkloadTypeLabel = "rollouts.kruise.io/workload-type" + // DeploymentRevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence + DeploymentRevisionAnnotation = "deployment.kubernetes.io/revision" ) // For Pods diff --git a/pkg/util/controller_finder.go b/pkg/util/controller_finder.go index 2b563466..4326e690 100644 --- a/pkg/util/controller_finder.go +++ b/pkg/util/controller_finder.go @@ -83,18 +83,46 @@ func NewControllerFinder(c client.Client) *ControllerFinder { // +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,resources=replicasets/status,verbs=get;update;patch -func (r *ControllerFinder) GetWorkloadForRef(namespace string, ref *rolloutv1alpha1.WorkloadRef) (*Workload, error) { - for _, finder := range r.finders() { - scale, err := finder(namespace, ref) - if scale != nil || err != nil { - return scale, err +func (r *ControllerFinder) GetWorkloadForRef(rollout *rolloutv1alpha1.Rollout) (*Workload, error) { + workloadRef 
:= rollout.Spec.ObjectRef.WorkloadRef + if workloadRef == nil { + return nil, nil + } + + switch strings.ToLower(rollout.Annotations[rolloutv1alpha1.RolloutStyleAnnotation]) { + case strings.ToLower(string(rolloutv1alpha1.PartitionRollingStyle)): + for _, finder := range r.partitionStyleFinders() { + workload, err := finder(rollout.Namespace, workloadRef) + if workload != nil || err != nil { + return workload, err + } + } + case strings.ToLower(string(rolloutv1alpha1.CanaryRollingStyle)): + for _, finder := range r.canaryStyleFinders() { + workload, err := finder(rollout.Namespace, workloadRef) + if workload != nil || err != nil { + return workload, err + } + } + default: + for _, finder := range append(r.canaryStyleFinders(), r.partitionStyleFinders()...) { + workload, err := finder(rollout.Namespace, workloadRef) + if workload != nil || err != nil { + return workload, err + } + } } } + + klog.Errorf("Failed to get workload for rollout %v due to no correct finders", klog.KObj(rollout)) return nil, nil } -func (r *ControllerFinder) finders() []ControllerFinderFunc { - return []ControllerFinderFunc{r.getKruiseCloneSet, r.getDeployment, r.getStatefulSetLikeWorkload} +func (r *ControllerFinder) canaryStyleFinders() []ControllerFinderFunc { + return []ControllerFinderFunc{r.getDeployment} +} + +func (r *ControllerFinder) partitionStyleFinders() []ControllerFinderFunc { + return []ControllerFinderFunc{r.getKruiseCloneSet, r.getAdvancedDeployment, r.getStatefulSetLikeWorkload} } var ( @@ -148,6 +176,59 @@ func (r *ControllerFinder) getKruiseCloneSet(namespace string, ref *rolloutv1alp return workload, nil } +// getAdvancedDeployment returns the Advanced Deployment referenced by the provided controllerRef. 
+func (r *ControllerFinder) getAdvancedDeployment(namespace string, ref *rolloutv1alpha1.WorkloadRef) (*Workload, error) { + // This error is irreversible, so there is no need to return error + ok, _ := verifyGroupKind(ref, ControllerKindDep.Kind, []string{ControllerKindDep.Group}) + if !ok { + return nil, nil + } + deployment := &apps.Deployment{} + err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, deployment) + if err != nil { + // when error is NotFound, it is ok here. + if errors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + if deployment.Generation != deployment.Status.ObservedGeneration { + return &Workload{IsStatusConsistent: false}, nil + } + + stableRevision := deployment.Labels[rolloutv1alpha1.DeploymentStableRevisionLabel] + + workload := &Workload{ + RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey, + StableRevision: stableRevision, + CanaryRevision: ComputeHash(&deployment.Spec.Template, nil), + ObjectMeta: deployment.ObjectMeta, + Replicas: *deployment.Spec.Replicas, + IsStatusConsistent: true, + } + + // not in rollout progressing + if _, ok = workload.Annotations[InRolloutProgressingAnnotation]; !ok { + return workload, nil + } + // set pod template hash for canary + rss, err := r.GetReplicaSetsForDeployment(deployment) + if err != nil { + return &Workload{IsStatusConsistent: false}, err + } + newRS, _ := FindCanaryAndStableReplicaSet(rss, deployment) + if newRS != nil { + workload.PodTemplateHash = newRS.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + // in rolling back + if workload.StableRevision != "" && workload.StableRevision == workload.PodTemplateHash { + workload.IsInRollback = true + } + // in rollout progressing + workload.InRolloutProgressing = true + return workload, nil +} + // getDeployment returns the k8s native deployment referenced by the provided controllerRef. 
func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1alpha1.WorkloadRef) (*Workload, error) { // This error is irreversible, so there is no need to return error @@ -188,9 +269,6 @@ func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1alpha1. } // in rollout progressing workload.InRolloutProgressing = true - // workload is continuous release, indicates rollback(v1 -> v2 -> v1) - // delete auto-generated labels - delete(stableRs.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey) if EqualIgnoreHash(&stableRs.Spec.Template, &stable.Spec.Template) { workload.IsInRollback = true return workload, nil @@ -274,7 +352,7 @@ func (r *ControllerFinder) getLatestCanaryDeployment(stable *apps.Deployment) (* return nil, nil } -func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([]apps.ReplicaSet, error) { +func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([]*apps.ReplicaSet, error) { // List ReplicaSets owned by this Deployment rsList := &apps.ReplicaSetList{} selector, err := metav1.LabelSelectorAsSelector(obj.Spec.Selector) @@ -288,7 +366,7 @@ func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([] return nil, err } - rss := make([]apps.ReplicaSet, 0) + var rss []*apps.ReplicaSet for i := range rsList.Items { rs := rsList.Items[i] if !rs.DeletionTimestamp.IsZero() || (rs.Spec.Replicas != nil && *rs.Spec.Replicas == 0) { @@ -296,7 +374,7 @@ func (r *ControllerFinder) GetReplicaSetsForDeployment(obj *apps.Deployment) ([] } if ref := metav1.GetControllerOf(&rs); ref != nil { if ref.UID == obj.UID { - rss = append(rss, rs) + rss = append(rss, &rs) } } } @@ -315,7 +393,7 @@ func (r *ControllerFinder) getDeploymentStableRs(obj *apps.Deployment) (*apps.Re sort.Slice(rss, func(i, j int) bool { return rss[i].CreationTimestamp.Before(&rss[j].CreationTimestamp) }) - return &rss[0], nil + return rss[0], nil } func verifyGroupKind(ref *rolloutv1alpha1.WorkloadRef, 
expectedKind string, expectedGroups []string) (bool, error) { diff --git a/pkg/util/patch/patch_utils.go b/pkg/util/patch/patch_utils.go new file mode 100644 index 00000000..42fd8a6b --- /dev/null +++ b/pkg/util/patch/patch_utils.go @@ -0,0 +1,224 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package patch + +import ( + "strings" + + "github.com/openkruise/rollouts/pkg/util" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + NULL_HOLDER = "NULL_HOLDER" + NULL_HOLDER_STR = "\"NULL_HOLDER\"" +) + +type CommonPatch struct { + PatchType types.PatchType `json:"patchType"` + PatchData map[string]interface{} `json:"data"` +} + +// Type implements Patch. +func (s *CommonPatch) Type() types.PatchType { + return s.PatchType +} + +// Data implements Patch. 
+func (s *CommonPatch) Data(_ client.Object) ([]byte, error) { + return []byte(s.String()), nil +} + +func (s *CommonPatch) String() string { + jsonStr := util.DumpJSON(s.PatchData) + return strings.Replace(jsonStr, NULL_HOLDER_STR, "null", -1) +} + +func NewStrategicPatch() *CommonPatch { + return &CommonPatch{PatchType: types.StrategicMergePatchType, PatchData: make(map[string]interface{})} +} + +func NewMergePatch() *CommonPatch { + return &CommonPatch{PatchType: types.MergePatchType, PatchData: make(map[string]interface{})} +} + +func (s *CommonPatch) AddFinalizer(item string) *CommonPatch { + switch s.PatchType { + case types.StrategicMergePatchType: + if _, ok := s.PatchData["metadata"]; !ok { + s.PatchData["metadata"] = make(map[string]interface{}) + } + metadata := s.PatchData["metadata"].(map[string]interface{}) + + if oldList, ok := metadata["finalizers"]; !ok { + metadata["finalizers"] = []string{item} + } else { + metadata["finalizers"] = append(oldList.([]string), item) + } + } + return s +} + +func (s *CommonPatch) RemoveFinalizer(item string) *CommonPatch { + switch s.PatchType { + case types.StrategicMergePatchType: + if _, ok := s.PatchData["metadata"]; !ok { + s.PatchData["metadata"] = make(map[string]interface{}) + } + metadata := s.PatchData["metadata"].(map[string]interface{}) + + if oldList, ok := metadata["$deleteFromPrimitiveList/finalizers"]; !ok { + metadata["$deleteFromPrimitiveList/finalizers"] = []string{item} + } else { + metadata["$deleteFromPrimitiveList/finalizers"] = append(oldList.([]string), item) + } + } + return s +} + +func (s *CommonPatch) OverrideFinalizer(items []string) *CommonPatch { + switch s.PatchType { + case types.MergePatchType: + if _, ok := s.PatchData["metadata"]; !ok { + s.PatchData["metadata"] = make(map[string]interface{}) + } + metadata := s.PatchData["metadata"].(map[string]interface{}) + + metadata["finalizers"] = items + } + return s +} + +func (s *CommonPatch) InsertLabel(key, value string) *CommonPatch { 
+ switch s.PatchType { + case types.StrategicMergePatchType, types.MergePatchType: + if _, ok := s.PatchData["metadata"]; !ok { + s.PatchData["metadata"] = make(map[string]interface{}) + } + metadata := s.PatchData["metadata"].(map[string]interface{}) + + if oldMap, ok := metadata["labels"]; !ok { + metadata["labels"] = map[string]string{key: value} + } else { + oldMap.(map[string]string)[key] = value + } + } + return s +} + +func (s *CommonPatch) DeleteLabel(key string) *CommonPatch { + switch s.PatchType { + case types.StrategicMergePatchType, types.MergePatchType: + if _, ok := s.PatchData["metadata"]; !ok { + s.PatchData["metadata"] = make(map[string]interface{}) + } + metadata := s.PatchData["metadata"].(map[string]interface{}) + + if oldMap, ok := metadata["labels"]; !ok { + metadata["labels"] = map[string]string{key: NULL_HOLDER} + } else { + oldMap.(map[string]string)[key] = NULL_HOLDER + } + } + return s +} + +func (s *CommonPatch) InsertAnnotation(key, value string) *CommonPatch { + switch s.PatchType { + case types.StrategicMergePatchType, types.MergePatchType: + if _, ok := s.PatchData["metadata"]; !ok { + s.PatchData["metadata"] = make(map[string]interface{}) + } + metadata := s.PatchData["metadata"].(map[string]interface{}) + + if oldMap, ok := metadata["annotations"]; !ok { + metadata["annotations"] = map[string]string{key: value} + } else { + oldMap.(map[string]string)[key] = value + } + } + return s +} + +func (s *CommonPatch) DeleteAnnotation(key string) *CommonPatch { + switch s.PatchType { + case types.StrategicMergePatchType, types.MergePatchType: + if _, ok := s.PatchData["metadata"]; !ok { + s.PatchData["metadata"] = make(map[string]interface{}) + } + metadata := s.PatchData["metadata"].(map[string]interface{}) + + if oldMap, ok := metadata["annotations"]; !ok { + metadata["annotations"] = map[string]string{key: NULL_HOLDER} + } else { + oldMap.(map[string]string)[key] = NULL_HOLDER + } + } + return s +} + +func (s *CommonPatch) 
UpdatePodCondition(condition v1.PodCondition) *CommonPatch { + switch s.PatchType { + case types.StrategicMergePatchType: + if _, ok := s.PatchData["status"]; !ok { + s.PatchData["status"] = make(map[string]interface{}) + } + status := s.PatchData["status"].(map[string]interface{}) + + if oldList, ok := status["conditions"]; !ok { + status["conditions"] = []v1.PodCondition{condition} + } else { + status["conditions"] = append(oldList.([]v1.PodCondition), condition) + } + } + return s +} + +type DeploymentPatch struct { + CommonPatch +} + +func NewDeploymentPatch() *DeploymentPatch { + return &DeploymentPatch{CommonPatch{PatchType: types.StrategicMergePatchType, PatchData: make(map[string]interface{})}} +} + +func (s *DeploymentPatch) UpdateStrategy(strategy apps.DeploymentStrategy) *DeploymentPatch { + switch s.PatchType { + case types.StrategicMergePatchType, types.MergePatchType: + if _, ok := s.PatchData["spec"]; !ok { + s.PatchData["spec"] = make(map[string]interface{}) + } + spec := s.PatchData["spec"].(map[string]interface{}) + spec["strategy"] = strategy + } + return s +} + +func (s *DeploymentPatch) UpdatePaused(paused bool) *DeploymentPatch { + switch s.PatchType { + case types.StrategicMergePatchType, types.MergePatchType: + if _, ok := s.PatchData["spec"]; !ok { + s.PatchData["spec"] = make(map[string]interface{}) + } + spec := s.PatchData["spec"].(map[string]interface{}) + spec["paused"] = paused + } + return s +} diff --git a/pkg/util/patch/patch_utils_test.go b/pkg/util/patch/patch_utils_test.go new file mode 100644 index 00000000..6a9b8b36 --- /dev/null +++ b/pkg/util/patch/patch_utils_test.go @@ -0,0 +1,45 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package patch + +import ( + "fmt" + "reflect" + "testing" + + "github.com/openkruise/rollouts/pkg/util" + v1 "k8s.io/api/core/v1" +) + +func TestCommonPatch(t *testing.T) { + condition := v1.PodCondition{Type: v1.ContainersReady, Status: v1.ConditionTrue, Message: "just for test"} + patchReq := NewStrategicPatch(). + AddFinalizer("new-finalizer"). + RemoveFinalizer("old-finalizer"). + InsertLabel("new-label", "foo1"). + DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2"). + DeleteAnnotation("old-annotation"). + UpdatePodCondition(condition) + + expectedPatchBody := fmt.Sprintf(`{"metadata":{"$deleteFromPrimitiveList/finalizers":["old-finalizer"],"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":["new-finalizer"],"labels":{"new-label":"foo1","old-label":null}},"status":{"conditions":[%s]}}`, util.DumpJSON(condition)) + + if !reflect.DeepEqual(patchReq.String(), expectedPatchBody) { + t.Fatalf("Not equal: \n%s \n%s", expectedPatchBody, patchReq.String()) + } + +} diff --git a/pkg/util/workloads_utils.go b/pkg/util/workloads_utils.go index 21bd8346..f95928d7 100644 --- a/pkg/util/workloads_utils.go +++ b/pkg/util/workloads_utils.go @@ -19,14 +19,18 @@ package util import ( "context" "encoding/binary" + "encoding/json" "fmt" "hash" "hash/fnv" + "sort" + "strconv" "strings" "github.com/davecgh/go-spew/spew" appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" + "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/feature" apps 
"k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -372,3 +376,59 @@ func GetEmptyObjectWithKey(object client.Object) client.Object { empty.SetNamespace(object.GetNamespace()) return empty } + +// GetDeploymentStrategy decode the strategy object for advanced deployment +// from the annotation rollouts.kruise.io/deployment-strategy +func GetDeploymentStrategy(deployment *apps.Deployment) v1alpha1.DeploymentStrategy { + strategy := v1alpha1.DeploymentStrategy{} + if deployment == nil { + return strategy + } + strategyStr := deployment.Annotations[v1alpha1.DeploymentStrategyAnnotation] + if strategyStr == "" { + return strategy + } + _ = json.Unmarshal([]byte(strategyStr), &strategy) + return strategy +} + +// GetDeploymentExtraStatus decode the extra-status object for advanced deployment +// from the annotation rollouts.kruise.io/deployment-extra-status +func GetDeploymentExtraStatus(deployment *apps.Deployment) v1alpha1.DeploymentExtraStatus { + extraStatus := v1alpha1.DeploymentExtraStatus{} + if deployment == nil { + return extraStatus + } + extraStatusStr := deployment.Annotations[v1alpha1.DeploymentExtraStatusAnnotation] + if extraStatusStr == "" { + return extraStatus + } + _ = json.Unmarshal([]byte(extraStatusStr), &extraStatus) + return extraStatus +} + +// FindCanaryAndStableReplicaSet find the canary and stable replicaset for the deployment +// - canary replicaset: the template equals to deployment's; +// - stable replicaset: an active replicaset(replicas>0) with the smallest revision. 
+func FindCanaryAndStableReplicaSet(rss []*apps.ReplicaSet, d *apps.Deployment) (*apps.ReplicaSet, *apps.ReplicaSet) { + // sort replica sets by revision number; ties and unparsable revisions fall back to creation time + sort.Slice(rss, func(i, j int) bool { + revision1, err1 := strconv.Atoi(rss[i].Annotations[DeploymentRevisionAnnotation]) + revision2, err2 := strconv.Atoi(rss[j].Annotations[DeploymentRevisionAnnotation]) + if err1 != nil || err2 != nil || revision1 == revision2 { + return rss[i].CreationTimestamp.Before(&rss[j].CreationTimestamp) + } + return revision1 < revision2 + }) + + var newRS *apps.ReplicaSet + var oldRS *apps.ReplicaSet + for _, rs := range rss { + if EqualIgnoreHash(&rs.Spec.Template, &d.Spec.Template) { + newRS = rs + } else if oldRS == nil && *rs.Spec.Replicas > 0 { + oldRS = rs + } + } + return newRS, oldRS +} diff --git a/pkg/util/workloads_utils_test.go b/pkg/util/workloads_utils_test.go index f1f8ef2c..a9aca549 100644 --- a/pkg/util/workloads_utils_test.go +++ b/pkg/util/workloads_utils_test.go @@ -19,6 +19,7 @@ package util import ( "reflect" "testing" + "time" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -26,8 +27,10 @@ import ( appsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -221,3 +224,115 @@ func TestGetOwnerWorkload(t *testing.T) { }) } } + +func TestFilterCanaryAndStableReplicaSet(t *testing.T) { + RegisterFailHandler(Fail) + + const notExists = "not-exists" + createTimestamps := []time.Time{ + time.Now().Add(0 * time.Second), + time.Now().Add(1 * time.Second), + time.Now().Add(2 * time.Second), + time.Now().Add(3 * time.Second), + time.Now().Add(4 * time.Second), + time.Now().Add(5 * time.Second), + } + templateFactory := func(order int64) corev1.PodTemplateSpec { + return corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Generation: order}, + } + } + makeRS := func(name, revision string, createTime time.Time, templateOrder int64, replicas int32) *appsv1.ReplicaSet { + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: createTime}, + Annotations: map[string]string{DeploymentRevisionAnnotation: revision}, + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: pointer.Int32(replicas), + Template: templateFactory(templateOrder), + }, + } + } + makeD := func(name, revision string, templateOrder int64) *appsv1.Deployment { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{DeploymentRevisionAnnotation: revision}, + }, + Spec: appsv1.DeploymentSpec{Template: templateFactory(templateOrder)}, + } + } + + cases := map[string]struct { + parameters func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) + stableName string + canaryName string + }{ + "no canary": { + parameters: func() 
([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "1", createTimestamps[1], 1, 1), + makeRS("r1", "0", createTimestamps[0], 0, 0), + } + return rss, makeD("d", "0", 2) + }, + stableName: "r0", + canaryName: notExists, + }, + "no stable": { + parameters: func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "0", createTimestamps[0], 0, 1), + } + return rss, makeD("d", "0", 0) + }, + stableName: notExists, + canaryName: "r0", + }, + "1 active oldRS": { + parameters: func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "2", createTimestamps[0], 0, 1), + makeRS("r1", "3", createTimestamps[1], 1, 1), + makeRS("r1", "1", createTimestamps[3], 3, 0), + makeRS("r1", "0", createTimestamps[4], 4, 0), + } + return rss, makeD("d", "0", 1) + }, + stableName: "r0", + canaryName: "r1", + }, + "many active oldRS": { + parameters: func() ([]*appsv1.ReplicaSet, *appsv1.Deployment) { + rss := []*appsv1.ReplicaSet{ + makeRS("r0", "0", createTimestamps[3], 0, 1), + makeRS("r3", "2", createTimestamps[1], 3, 1), + makeRS("r2", "3", createTimestamps[0], 2, 1), + makeRS("r1", "1", createTimestamps[2], 1, 1), + } + return rss, makeD("d", "4", 3) + }, + stableName: "r0", + canaryName: "r3", + }, + } + + for name, cs := range cases { + t.Run(name, func(t *testing.T) { + rss, d := cs.parameters() + canary, stable := FindCanaryAndStableReplicaSet(rss, d) + if canary != nil { + Expect(canary.Name).Should(Equal(cs.canaryName)) + } else { + Expect(cs.canaryName).Should(Equal(notExists)) + } + if stable != nil { + Expect(stable.Name).Should(Equal(cs.stableName)) + } else { + Expect(cs.stableName).Should(Equal(notExists)) + } + }) + } +} diff --git a/pkg/webhook/rollout/validating/rollout_create_update_handler.go b/pkg/webhook/rollout/validating/rollout_create_update_handler.go index 608b6910..9bbe3d38 100644 --- 
a/pkg/webhook/rollout/validating/rollout_create_update_handler.go +++ b/pkg/webhook/rollout/validating/rollout_create_update_handler.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "reflect" + "strings" appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/util" @@ -99,6 +100,9 @@ func (h *RolloutCreateUpdateHandler) validateRolloutUpdate(oldObj, newObj *appsv if !reflect.DeepEqual(oldObj.Spec.Strategy.Canary.TrafficRoutings, newObj.Spec.Strategy.Canary.TrafficRoutings) { return field.ErrorList{field.Forbidden(field.NewPath("Spec.Strategy.Canary.TrafficRoutings"), "Rollout 'Strategy.Canary.TrafficRoutings' field is immutable")} } + if !strings.EqualFold(oldObj.Annotations[appsv1alpha1.RolloutStyleAnnotation], newObj.Annotations[appsv1alpha1.RolloutStyleAnnotation]) { + return field.ErrorList{field.Forbidden(field.NewPath("Metadata.Annotation"), "Rollout 'Rolling-Style' annotation is immutable")} + } } /*if newObj.Status.CanaryStatus != nil && newObj.Status.CanaryStatus.CurrentStepState == appsv1alpha1.CanaryStepStateReady { @@ -140,10 +144,30 @@ func (h *RolloutCreateUpdateHandler) validateRolloutConflict(rollout *appsv1alph func validateRolloutSpec(rollout *appsv1alpha1.Rollout, fldPath *field.Path) field.ErrorList { errList := validateRolloutSpecObjectRef(&rollout.Spec.ObjectRef, fldPath.Child("ObjectRef")) + errList = append(errList, validateRolloutRollingStyle(rollout, field.NewPath("RollingStyle"))...) errList = append(errList, validateRolloutSpecStrategy(&rollout.Spec.Strategy, fldPath.Child("Strategy"))...) 
return errList } +func validateRolloutRollingStyle(rollout *appsv1alpha1.Rollout, fldPath *field.Path) field.ErrorList { + switch strings.ToLower(rollout.Annotations[appsv1alpha1.RolloutStyleAnnotation]) { + case "", strings.ToLower(string(appsv1alpha1.CanaryRollingStyle)), strings.ToLower(string(appsv1alpha1.PartitionRollingStyle)): + default: + return field.ErrorList{field.Invalid(fldPath, rollout.Annotations[appsv1alpha1.RolloutStyleAnnotation], + "Rolling style must be 'Canary', 'Partition' or empty")} + } + + workloadRef := rollout.Spec.ObjectRef.WorkloadRef + if workloadRef == nil || workloadRef.Kind == util.ControllerKindDep.Kind { + return nil // Deployment support all rolling styles, no need to validate. + } + if strings.EqualFold(rollout.Annotations[appsv1alpha1.RolloutStyleAnnotation], string(appsv1alpha1.CanaryRollingStyle)) { + return field.ErrorList{field.Invalid(fldPath, rollout.Annotations[appsv1alpha1.RolloutStyleAnnotation], + "Only Deployment support canary rolling style")} + } + return nil +} + func validateRolloutSpecObjectRef(objectRef *appsv1alpha1.ObjectRef, fldPath *field.Path) field.ErrorList { if objectRef.WorkloadRef == nil { return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), objectRef.WorkloadRef, "WorkloadRef is required")} diff --git a/pkg/webhook/rollout/validating/rollout_create_update_handler_test.go b/pkg/webhook/rollout/validating/rollout_create_update_handler_test.go index 027f9ce0..672681e8 100644 --- a/pkg/webhook/rollout/validating/rollout_create_update_handler_test.go +++ b/pkg/webhook/rollout/validating/rollout_create_update_handler_test.go @@ -40,8 +40,9 @@ var ( Kind: "Rollout", }, ObjectMeta: metav1.ObjectMeta{ - Name: "rollout-demo", - Namespace: "namespace-unit-test", + Name: "rollout-demo", + Namespace: "namespace-unit-test", + Annotations: map[string]string{}, }, Spec: appsv1alpha1.RolloutSpec{ ObjectRef: appsv1alpha1.ObjectRef{ @@ -238,6 +239,52 @@ func TestRolloutValidateCreate(t *testing.T) { 
return []client.Object{object} }, }, + { + Name: "Canary rolling style", + Succeed: true, + GetObject: func() []client.Object { + object := rollout.DeepCopy() + object.Annotations = map[string]string{ + appsv1alpha1.RolloutStyleAnnotation: "Canary", + } + return []client.Object{object} + }, + }, + { + Name: "Partition rolling style", + Succeed: true, + GetObject: func() []client.Object { + object := rollout.DeepCopy() + object.Annotations = map[string]string{ + appsv1alpha1.RolloutStyleAnnotation: "Partition", + } + return []client.Object{object} + }, + }, + { + Name: "Wrong rolling style", + Succeed: false, + GetObject: func() []client.Object { + object := rollout.DeepCopy() + object.Annotations = map[string]string{ + appsv1alpha1.RolloutStyleAnnotation: "Other", + } + return []client.Object{object} + }, + }, + { + Name: "Miss matched rolling style", + Succeed: false, + GetObject: func() []client.Object { + object := rollout.DeepCopy() + object.Annotations = map[string]string{ + appsv1alpha1.RolloutStyleAnnotation: "Canary", + } + object.Spec.ObjectRef.WorkloadRef.APIVersion = "apps.kruise.io/v1alpha1" + object.Spec.ObjectRef.WorkloadRef.Kind = "CloneSet" + return []client.Object{object} + }, + }, //{ // Name: "The last Steps.Weight is not 100", // Succeed: false, @@ -366,6 +413,22 @@ func TestRolloutValidateUpdate(t *testing.T) { return object }, }, + { + Name: "Rollout is progressing, and rolling style changed", + Succeed: false, + GetOldObject: func() client.Object { + object := rollout.DeepCopy() + object.Annotations[appsv1alpha1.RolloutStyleAnnotation] = "Partition" + object.Status.Phase = appsv1alpha1.RolloutPhaseProgressing + return object + }, + GetNewObject: func() client.Object { + object := rollout.DeepCopy() + object.Status.Phase = appsv1alpha1.RolloutPhaseProgressing + object.Annotations[appsv1alpha1.RolloutStyleAnnotation] = "Canary" + return object + }, + }, { Name: "Rollout is terminating, and spec changed", Succeed: false, diff --git 
a/pkg/webhook/util/configuration/configuration.go b/pkg/webhook/util/configuration/configuration.go index 0ab87cef..3468be5c 100644 --- a/pkg/webhook/util/configuration/configuration.go +++ b/pkg/webhook/util/configuration/configuration.go @@ -32,18 +32,18 @@ import ( ) const ( - mutatingWebhookConfigurationName = "kruise-rollout-mutating-webhook-configuration" - validatingWebhookConfigurationName = "kruise-rollout-validating-webhook-configuration" + MutatingWebhookConfigurationName = "kruise-rollout-mutating-webhook-configuration" + ValidatingWebhookConfigurationName = "kruise-rollout-validating-webhook-configuration" ) func Ensure(kubeClient clientset.Interface, handlers map[string]admission.Handler, caBundle []byte) error { - mutatingConfig, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(context.TODO(), mutatingWebhookConfigurationName, metav1.GetOptions{}) + mutatingConfig, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(context.TODO(), MutatingWebhookConfigurationName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("not found MutatingWebhookConfiguration %s", mutatingWebhookConfigurationName) + return fmt.Errorf("not found MutatingWebhookConfiguration %s", MutatingWebhookConfigurationName) } - validatingConfig, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Get(context.TODO(), validatingWebhookConfigurationName, metav1.GetOptions{}) + validatingConfig, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Get(context.TODO(), ValidatingWebhookConfigurationName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("not found ValidatingWebhookConfiguration %s", validatingWebhookConfigurationName) + return fmt.Errorf("not found ValidatingWebhookConfiguration %s", ValidatingWebhookConfigurationName) } oldMutatingConfig := mutatingConfig.DeepCopy() oldValidatingConfig := validatingConfig.DeepCopy() @@ -105,13 +105,13 @@ func 
Ensure(kubeClient clientset.Interface, handlers map[string]admission.Handle if !reflect.DeepEqual(mutatingConfig, oldMutatingConfig) { if _, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(context.TODO(), mutatingConfig, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("failed to update %s: %v", mutatingWebhookConfigurationName, err) + return fmt.Errorf("failed to update %s: %v", MutatingWebhookConfigurationName, err) } } if !reflect.DeepEqual(validatingConfig, oldValidatingConfig) { if _, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Update(context.TODO(), validatingConfig, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("failed to update %s: %v", validatingWebhookConfigurationName, err) + return fmt.Errorf("failed to update %s: %v", ValidatingWebhookConfigurationName, err) } } diff --git a/pkg/webhook/workload/mutating/workload_update_handler.go b/pkg/webhook/workload/mutating/workload_update_handler.go index 61aa3bca..23908f85 100644 --- a/pkg/webhook/workload/mutating/workload_update_handler.go +++ b/pkg/webhook/workload/mutating/workload_update_handler.go @@ -21,7 +21,7 @@ import ( "encoding/json" "math" "net/http" - "reflect" + "strings" kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" @@ -205,13 +205,39 @@ func (h *WorkloadHandler) handleStatefulSetLikeWorkload(newObj, oldObj *unstruct func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (bool, error) { // in rollout progressing if newObj.Annotations[util.InRolloutProgressingAnnotation] != "" { - if !newObj.Spec.Paused || !reflect.DeepEqual(newObj.Spec.Strategy, oldObj.Spec.Strategy) { + modified := false + if !newObj.Spec.Paused { + modified = true newObj.Spec.Paused = true - newObj.Spec.Strategy = oldObj.Spec.Strategy - klog.Warningf("deployment(%s/%s) is in rollout progressing, and do not modify strategy", 
newObj.Namespace, newObj.Name) - return true, nil } - return false, nil + strategy := util.GetDeploymentStrategy(newObj) + switch strings.ToLower(string(strategy.RollingStyle)) { + case strings.ToLower(string(appsv1alpha1.PartitionRollingStyle)): + // Make sure it is always Recreate to disable native controller + if newObj.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType { + modified = true + newObj.Spec.Strategy.Type = apps.RecreateDeploymentStrategyType + } + if newObj.Spec.Strategy.RollingUpdate != nil { + modified = true + // Allow to modify RollingUpdate config during rolling + strategy.RollingUpdate = newObj.Spec.Strategy.RollingUpdate + newObj.Spec.Strategy.RollingUpdate = nil + } + if isEffectiveDeploymentRevisionChange(oldObj, newObj) { + modified = true + strategy.Paused = true + } + setDeploymentStrategyAnnotation(strategy, newObj) + default: + // Do not allow to modify strategy as Recreate during rolling + if newObj.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType { + modified = true + newObj.Spec.Strategy = oldObj.Spec.Strategy + klog.Warningf("") + } + } + return modified, nil } // indicate whether the workload can enter the rollout process @@ -219,10 +245,7 @@ func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (boo if newObj.Spec.Replicas != nil && *newObj.Spec.Replicas == 0 { return false, nil } - if newObj.Annotations[appsv1alpha1.RolloutIDLabel] != "" && - oldObj.Annotations[appsv1alpha1.RolloutIDLabel] == newObj.Annotations[appsv1alpha1.RolloutIDLabel] { - return false, nil - } else if newObj.Annotations[appsv1alpha1.RolloutIDLabel] == "" && util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) { + if !isEffectiveDeploymentRevisionChange(oldObj, newObj) { return false, nil } @@ -232,16 +255,30 @@ func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (boo } else if rollout == nil || rollout.Spec.Strategy.Canary == nil { return false, nil } + rss, err := 
h.Finder.GetReplicaSetsForDeployment(newObj) + if err != nil || len(rss) == 0 { + klog.Warningf("Cannot find any activate replicaset for deployment %s/%s, no need to rolling", newObj.Namespace, newObj.Name) + return false, nil + } // if traffic routing, workload must only be one version of Pods if len(rollout.Spec.Strategy.Canary.TrafficRoutings) > 0 { - if rss, err := h.Finder.GetReplicaSetsForDeployment(newObj); err != nil { - return false, nil - } else if len(rss) != 1 { + if len(rss) != 1 { klog.Warningf("Because deployment(%s/%s) have multiple versions of Pods, so can not enter rollout progressing", newObj.Namespace, newObj.Name) return false, nil } } + // label the stable version replicaset + _, stableRS := util.FindCanaryAndStableReplicaSet(rss, newObj) + if stableRS == nil { + klog.Warningf("Cannot find any stable replicaset for deployment %s/%s", newObj.Namespace, newObj.Name) + } else { + if newObj.Labels == nil { + newObj.Labels = map[string]string{} + } + newObj.Labels[appsv1alpha1.DeploymentStableRevisionLabel] = stableRS.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + // need set workload paused = true newObj.Spec.Paused = true state := &util.RolloutState{RolloutName: rollout.Name} @@ -332,3 +369,19 @@ func (h *WorkloadHandler) InjectDecoder(d *admission.Decoder) error { h.Decoder = d return nil } + +func isEffectiveDeploymentRevisionChange(oldObj, newObj *apps.Deployment) bool { + if newObj.Annotations[appsv1alpha1.RolloutIDLabel] != "" && + oldObj.Annotations[appsv1alpha1.RolloutIDLabel] == newObj.Annotations[appsv1alpha1.RolloutIDLabel] { + return false + } else if newObj.Annotations[appsv1alpha1.RolloutIDLabel] == "" && + util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) { + return false + } + return true +} + +func setDeploymentStrategyAnnotation(strategy appsv1alpha1.DeploymentStrategy, d *apps.Deployment) { + strategyAnno, _ := json.Marshal(&strategy) + d.Annotations[appsv1alpha1.DeploymentStrategyAnnotation] = 
string(strategyAnno) +} diff --git a/pkg/webhook/workload/mutating/workload_update_handler_test.go b/pkg/webhook/workload/mutating/workload_update_handler_test.go index 610d1186..26690945 100644 --- a/pkg/webhook/workload/mutating/workload_update_handler_test.go +++ b/pkg/webhook/workload/mutating/workload_update_handler_test.go @@ -105,6 +105,7 @@ var ( "app": "echoserver", }, }, + Replicas: pointer.Int32(5), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -490,6 +491,7 @@ func TestHandlerDeployment(t *testing.T) { } else if !cs.isError && err != nil { t.Fatalf(err.Error()) } + delete(newObj.Labels, appsv1alpha1.DeploymentStableRevisionLabel) if !reflect.DeepEqual(newObj, cs.expectObj()) { by, _ := json.Marshal(newObj) t.Fatalf("handlerDeployment failed, and new(%s)", string(by)) diff --git a/test/e2e/rollout_test.go b/test/e2e/rollout_test.go index cfa56b91..6e3905fb 100644 --- a/test/e2e/rollout_test.go +++ b/test/e2e/rollout_test.go @@ -326,6 +326,40 @@ var _ = SIGDescribe("Rollout", func() { Expect(count).Should(BeNumerically("==", expected)) } + ListReplicaSet := func(d *apps.Deployment) []*apps.ReplicaSet { + var rss []*apps.ReplicaSet + rsLister := &apps.ReplicaSetList{} + selectorOpt, _ := metav1.LabelSelectorAsSelector(d.Spec.Selector) + err := k8sClient.List(context.TODO(), rsLister, &client.ListOptions{LabelSelector: selectorOpt, Namespace: d.Namespace}) + Expect(err).NotTo(HaveOccurred()) + for i := range rsLister.Items { + rs := &rsLister.Items[i] + if !rs.DeletionTimestamp.IsZero() { + continue + } + rss = append(rss, rs) + } + return rss + } + + GetStableRSRevision := func(d *apps.Deployment) string { + rss := ListReplicaSet(d) + _, stable := util.FindCanaryAndStableReplicaSet(rss, d) + if stable != nil { + return stable.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + return "" + } + + GetCanaryRSRevision := func(d *apps.Deployment) string { + rss := ListReplicaSet(d) + canary, _ := 
util.FindCanaryAndStableReplicaSet(rss, d) + if canary != nil { + return canary.Labels[apps.DefaultDeploymentUniqueLabelKey] + } + return "" + } + BeforeEach(func() { namespace = randomNamespaceName("rollout") ns := v1.Namespace{ @@ -4453,6 +4487,512 @@ var _ = SIGDescribe("Rollout", func() { CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "4", 4) }) }) + + KruiseDescribe("Advanced Deployment canary rollout with Ingress", func() { + It("advanced deployment rolling with traffic case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + // service + service := &v1.Service{} + Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred()) + CreateObject(service) + // ingress + ingress := &netv1.Ingress{} + Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred()) + CreateObject(ingress) + // workload + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + 
Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + By("Update cloneSet env NODE_NAME from(version1) -> to(version2)") + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + By(stableRevision) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.GetDeploymentStrategy(workload) + extraStatus := util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check cloneSet status & paused success") + + // check rollout status + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseProgressing)) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + Expect(rollout.Status.CanaryStatus.CanaryRevision).Should(Equal(util.ComputeHash(&workload.Spec.Template, nil))) + Expect(rollout.Status.CanaryStatus.PodTemplateHash).Should(Equal(GetCanaryRSRevision(workload))) + canaryRevision := rollout.Status.CanaryStatus.PodTemplateHash + Expect(rollout.Status.CanaryStatus.CurrentStepIndex).Should(BeNumerically("==", 1)) + Expect(rollout.Status.CanaryStatus.RolloutHash).Should(Equal(rollout.Annotations[util.RolloutHashAnnotation])) + // check stable, 
canary service & ingress + // stable service + Expect(GetObject(service.Name, service)).NotTo(HaveOccurred()) + Expect(service.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal(stableRevision)) + //canary service + cService := &v1.Service{} + Expect(GetObject(service.Name+"-canary", cService)).NotTo(HaveOccurred()) + Expect(cService.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal(canaryRevision)) + // canary ingress + cIngress := &netv1.Ingress{} + Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("true")) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(Equal(fmt.Sprintf("%d", *rollout.Spec.Strategy.Canary.Steps[0].Weight))) + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + // check stable, canary service & ingress + // canary ingress + cIngress = &netv1.Ingress{} + Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(Equal(fmt.Sprintf("%d", *rollout.Spec.Strategy.Canary.Steps[1].Weight))) + // cloneset + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.GetDeploymentStrategy(workload) + extraStatus = util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + + // resume rollout + ResumeRolloutCanary(rollout.Name) + WaitRolloutStatusPhase(rollout.Name, v1alpha1.RolloutPhaseHealthy) + WaitDeploymentAllPodsReady(workload) + By("rollout completed, and check") + + // check service & ingress & deployment + // 
ingress + Expect(GetObject(ingress.Name, ingress)).NotTo(HaveOccurred()) + cIngress = &netv1.Ingress{} + Expect(GetObject(fmt.Sprintf("%s-canary", ingress.Name), cIngress)).To(HaveOccurred()) + // service + Expect(GetObject(service.Name, service)).NotTo(HaveOccurred()) + Expect(service.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal("")) + cService = &v1.Service{} + Expect(GetObject(fmt.Sprintf("%s-canary", service.Name), cService)).To(HaveOccurred()) + // cloneset + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 5)) + Expect(workload.Status.AvailableReplicas).Should(BeNumerically("==", 5)) + for _, env := range workload.Spec.Template.Spec.Containers[0].Env { + if env.Name == "NODE_NAME" { + Expect(env.Value).Should(Equal("version2")) + } + } + time.Sleep(time.Second * 3) + + // check progressing succeed + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + cond := util.GetRolloutCondition(rollout.Status, v1alpha1.RolloutConditionProgressing) + Expect(cond.Reason).Should(Equal(v1alpha1.ProgressingReasonCompleted)) + Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionFalse))) + cond = util.GetRolloutCondition(rollout.Status, v1alpha1.RolloutConditionSucceeded) + Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionTrue))) + WaitRolloutWorkloadGeneration(rollout.Name, workload.Generation) + //Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(canaryRevision)) + + // scale up replicas 5 -> 6 + workload.Spec.Replicas = utilpointer.Int32(6) + UpdateDeployment(workload) + By("Update cloneSet replicas from(5) -> to(6)") + time.Sleep(time.Second * 2) + + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + WaitRolloutWorkloadGeneration(rollout.Name, workload.Generation) + }) + 
+ It("advanced deployment continuous rolling case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update workload env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + 
Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.GetDeploymentStrategy(workload) + extraStatus := util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.GetDeploymentStrategy(workload) + extraStatus = util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + + By("update workload env NODE_NAME from(version2) -> to(version3)") + newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version3"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision = workload.Labels[v1alpha1.DeploymentStableRevisionLabel] + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy = util.GetDeploymentStrategy(workload) + extraStatus = util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + 
Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.GetDeploymentStrategy(workload) + extraStatus = util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + }) + + It("advanced deployment rollback case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + 
By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update cloneSet env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.GetDeploymentStrategy(workload) + extraStatus := util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("resume rollout, and wait next step(2)") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 3)) + strategy = util.GetDeploymentStrategy(workload) + extraStatus = util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 3)) + Expect(strategy.Paused).Should(BeFalse()) + + By("update workload env NODE_NAME from(version2) -> to(version1)") + newEnvs = mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version1"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + WaitRolloutStatusPhase(rollout.Name, v1alpha1.RolloutPhaseHealthy) + WaitDeploymentAllPodsReady(workload) + }) 
+ + It("advanced deployment delete rollout case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update workload env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + 
Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.GetDeploymentStrategy(workload) + extraStatus := util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + By("delete rollout and check deployment") + k8sClient.Delete(context.TODO(), rollout) + WaitRolloutNotFound(rollout.Name) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Spec.Strategy.Type).Should(Equal(apps.RollingUpdateDeploymentStrategyType)) + Expect(workload.Spec.Paused).Should(BeFalse()) + WaitDeploymentAllPodsReady(workload) + }) + + It("advanced deployment scaling case", func() { + By("Creating Rollout...") + rollout := &v1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Annotations = map[string]string{ + v1alpha1.RolloutStyleAnnotation: string(v1alpha1.PartitionRollingStyle), + } + rollout.Spec.Strategy.Canary.TrafficRoutings = nil + rollout.Spec.Strategy.Canary.Steps = []v1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(20), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: v1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: v1alpha1.RolloutPause{Duration: utilpointer.Int32(0)}, + }, + } + rollout.Spec.ObjectRef.WorkloadRef = &v1alpha1.WorkloadRef{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "echoserver", + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + 
CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1alpha1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + By("update workload env NODE_NAME from(version1) -> to(version2)") + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + stableRevision := GetStableRSRevision(workload) + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.StableRevision).Should(Equal(stableRevision)) + + // check workload status & paused + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", 1)) + strategy := util.GetDeploymentStrategy(workload) + extraStatus := util.GetDeploymentExtraStatus(workload) + Expect(extraStatus.UpdatedReadyReplicas).Should(BeNumerically("==", 1)) + Expect(strategy.Paused).Should(BeFalse()) + By("check workload status & paused success") + + By("scale up workload from 5 to 10, and check") + workload.Spec.Replicas = utilpointer.Int32(10) + UpdateDeployment(workload) + Eventually(func() bool { + object := &v1alpha1.Rollout{} + Expect(GetObject(rollout.Name, object)).NotTo(HaveOccurred()) + return object.Status.CanaryStatus.CanaryReadyReplicas == 2 + }, 5*time.Minute, time.Second).Should(BeTrue()) + + By("scale down workload from 10 to 5, and check") + workload.Spec.Replicas = utilpointer.Int32(5) + UpdateDeployment(workload) + Eventually(func() bool { + object := &v1alpha1.Rollout{} + Expect(GetObject(rollout.Name, object)).NotTo(HaveOccurred()) 
+ return object.Status.CanaryStatus.CanaryReadyReplicas == 1 + }, 5*time.Minute, time.Second).Should(BeTrue()) + + By("rolling deployment to be completed") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 2) + ResumeRolloutCanary(rollout.Name) + WaitDeploymentAllPodsReady(workload) + }) + }) }) func mergeEnvVar(original []v1.EnvVar, add v1.EnvVar) []v1.EnvVar {