From 07c51154563542781238693d829116a5fcb63116 Mon Sep 17 00:00:00 2001 From: "mingzhou.swx" Date: Wed, 9 Nov 2022 11:39:57 +0800 Subject: [PATCH] add failure threshold Signed-off-by: mingzhou.swx --- api/v1alpha1/batchrelease_plan_types.go | 6 + api/v1alpha1/rollout_types.go | 9 +- api/v1alpha1/zz_generated.deepcopy.go | 10 ++ .../rollouts.kruise.io_batchreleases.yaml | 10 ++ .../bases/rollouts.kruise.io_rollouts.yaml | 12 ++ .../workloads/cloneset_control_plane.go | 60 ++++--- .../batchrelease/workloads/commons.go | 40 +++++ .../batchrelease/workloads/commons_test.go | 147 ++++++++++++++++++ .../deployment_canary_control_plane.go | 44 +++--- .../workloads/statefulset_like_controller.go | 65 ++------ .../workload_rollout_control_plane.go | 38 +++-- .../batchrelease/inner_batchrelease.go | 7 +- pkg/util/pod_utils.go | 11 ++ test/e2e/rollout_test.go | 80 +++++++++- 14 files changed, 421 insertions(+), 118 deletions(-) diff --git a/api/v1alpha1/batchrelease_plan_types.go b/api/v1alpha1/batchrelease_plan_types.go index 2c917b1d..f2066947 100644 --- a/api/v1alpha1/batchrelease_plan_types.go +++ b/api/v1alpha1/batchrelease_plan_types.go @@ -44,6 +44,12 @@ type ReleasePlan struct { BatchPartition *int32 `json:"batchPartition,omitempty"` // RolloutID indicates an id for each rollout progress RolloutID string `json:"rolloutID,omitempty"` + // FailureThreshold indicates how many failed pods can be tolerated in all upgraded pods. + // Only when FailureThreshold are satisfied, Rollout can enter ready state. + // If FailureThreshold is nil, Rollout will use the MaxUnavailable of workload as its + // FailureThreshold. + // Defaults to nil. + FailureThreshold *intstr.IntOrString `json:"failureThreshold,omitempty"` } // ReleaseBatch is used to describe how each batch release should be diff --git a/api/v1alpha1/rollout_types.go b/api/v1alpha1/rollout_types.go index d9baafcd..64bfe44b 100644 --- a/api/v1alpha1/rollout_types.go +++ b/api/v1alpha1/rollout_types.go @@ -96,13 +96,18 @@ type CanaryStrategy struct { // TrafficRoutings hosts all the supported service meshes supported to enable more fine-grained traffic routing // todo current only support one TrafficRouting TrafficRoutings []*TrafficRouting `json:"trafficRoutings,omitempty"` + // FailureThreshold indicates how many failed pods can be tolerated in all upgraded pods. + // Only when FailureThreshold are satisfied, Rollout can enter ready state. + // If FailureThreshold is nil, Rollout will use the MaxUnavailable of workload as its + // FailureThreshold. + // Defaults to nil. + FailureThreshold *intstr.IntOrString `json:"failureThreshold,omitempty"` // MetricsAnalysis *MetricsAnalysisBackground `json:"metricsAnalysis,omitempty"` } // CanaryStep defines a step of a canary workload. 
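A minimal sketch of how the new field could be populated from Go, assuming the v1alpha1 import path and the intstr helper already used elsewhere in this patch; the snippet is illustrative only:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"

	"github.com/openkruise/rollouts/api/v1alpha1"
)

func main() {
	// Tolerate up to 20% not-ready pods among the upgraded pods; an absolute
	// count such as intstr.FromInt(2) is equally valid.
	threshold := intstr.FromString("20%")

	// User-facing Rollout API: rollout.Spec.Strategy.Canary.FailureThreshold.
	canary := v1alpha1.CanaryStrategy{FailureThreshold: &threshold}

	// Internal BatchRelease plan, to which the rollout controller copies the value.
	plan := v1alpha1.ReleasePlan{FailureThreshold: &threshold}

	fmt.Println(canary.FailureThreshold.String(), plan.FailureThreshold.String())
}

The same value can be written as a plain integer or a percentage string in YAML, because the CRD schema below marks the field as x-kubernetes-int-or-string.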
type CanaryStep struct { - // SetWeight sets what percentage of the canary pods should receive - + // Weight indicate how many percentage of traffic the canary pods should receive // +optional Weight *int32 `json:"weight,omitempty"` // Replicas is the number of expected canary pods in this batch diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index ff60b066..b648c5ec 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -240,6 +240,11 @@ func (in *CanaryStrategy) DeepCopyInto(out *CanaryStrategy) { } } } + if in.FailureThreshold != nil { + in, out := &in.FailureThreshold, &out.FailureThreshold + *out = new(intstr.IntOrString) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CanaryStrategy. @@ -399,6 +404,11 @@ func (in *ReleasePlan) DeepCopyInto(out *ReleasePlan) { *out = new(int32) **out = **in } + if in.FailureThreshold != nil { + in, out := &in.FailureThreshold, &out.FailureThreshold + *out = new(intstr.IntOrString) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReleasePlan. diff --git a/config/crd/bases/rollouts.kruise.io_batchreleases.yaml b/config/crd/bases/rollouts.kruise.io_batchreleases.yaml index 00de1ae1..610a241a 100644 --- a/config/crd/bases/rollouts.kruise.io_batchreleases.yaml +++ b/config/crd/bases/rollouts.kruise.io_batchreleases.yaml @@ -92,6 +92,16 @@ spec: - canaryReplicas type: object type: array + failureThreshold: + anyOf: + - type: integer + - type: string + description: FailureThreshold indicates how many failed pods can + be tolerated in all upgraded pods. Only when FailureThreshold + are satisfied, Rollout can enter ready state. If FailureThreshold + is nil, Rollout will use the MaxUnavailable of workload as its + FailureThreshold. Defaults to nil. + x-kubernetes-int-or-string: true rolloutID: description: RolloutID indicates an id for each rollout progress type: string diff --git a/config/crd/bases/rollouts.kruise.io_rollouts.yaml b/config/crd/bases/rollouts.kruise.io_rollouts.yaml index c7626aa9..80f98b12 100644 --- a/config/crd/bases/rollouts.kruise.io_rollouts.yaml +++ b/config/crd/bases/rollouts.kruise.io_rollouts.yaml @@ -92,6 +92,16 @@ spec: description: CanaryStrategy defines parameters for a Replica Based Canary properties: + failureThreshold: + anyOf: + - type: integer + - type: string + description: FailureThreshold indicates how many failed pods + can be tolerated in all upgraded pods. Only when FailureThreshold + are satisfied, Rollout can enter ready state. If FailureThreshold + is nil, Rollout will use the MaxUnavailable of workload + as its FailureThreshold. Defaults to nil. + x-kubernetes-int-or-string: true steps: description: Steps define the order of phases to execute release in batches(20%, 40%, 60%, 80%, 100%) @@ -117,6 +127,8 @@ spec: 5) or a percentage of total pods.' 
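Because failureThreshold is an int-or-string field, it can be given as an absolute pod count or as a percentage. A small sketch of how such a value scales against the number of upgraded pods, using the same apimachinery helper the commons.go change further down relies on (rounded up, so "20%" of 6 pods tolerates 2 not-ready pods):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	updated := 6 // pods already upgraded in the current batch

	percent := intstr.FromString("20%")
	absolute := intstr.FromInt(2)

	// Round up, so "20%" of 6 pods tolerates 2 not-ready pods.
	p, _ := intstr.GetScaledValueFromIntOrPercent(&percent, updated, true)
	a, _ := intstr.GetScaledValueFromIntOrPercent(&absolute, updated, true)

	fmt.Println(p, a) // 2 2
}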
x-kubernetes-int-or-string: true weight: + description: Weight indicate how many percentage of + traffic the canary pods should receive format: int32 type: integer type: object diff --git a/pkg/controller/batchrelease/workloads/cloneset_control_plane.go b/pkg/controller/batchrelease/workloads/cloneset_control_plane.go index 84d1f0d5..77399d6d 100644 --- a/pkg/controller/batchrelease/workloads/cloneset_control_plane.go +++ b/pkg/controller/batchrelease/workloads/cloneset_control_plane.go @@ -158,10 +158,14 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) { return false, nil } - pods, err := util.ListOwnedPods(c.client, c.clone) - if err != nil { - klog.Errorf("Failed to list pods for CloneSet %v", c.targetNamespacedName) - return false, err + var err error + var pods []*v1.Pod + if c.release.Spec.ReleasePlan.RolloutID != "" { + pods, err = util.ListOwnedPods(c.client, c.clone) + if err != nil { + klog.Errorf("Failed to list pods for CloneSet %v", c.targetNamespacedName) + return false, err + } } var noNeedRollbackReplicas int32 @@ -228,9 +232,23 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) { return false, nil } + rolloutID := c.release.Spec.ReleasePlan.RolloutID + + var err error + var pods []*v1.Pod + // if rolloutID is not set, no need to list pods, + // because we cannot patch correct batch label to pod. + if rolloutID != "" { + pods, err = util.ListOwnedPods(c.client, c.clone) + if err != nil { + return false, err + } + } + var noNeedRollbackReplicas int32 if c.newStatus.CanaryStatus.NoNeedUpdateReplicas != nil { - noNeedRollbackReplicas = *c.newStatus.CanaryStatus.NoNeedUpdateReplicas + noNeedRollbackReplicas = countNoNeedRollbackReplicas(pods, c.newStatus.UpdateRevision, c.release.Spec.ReleasePlan.RolloutID) + c.newStatus.CanaryStatus.NoNeedUpdateReplicas = pointer.Int32(noNeedRollbackReplicas) } replicas := *c.clone.Spec.Replicas @@ -241,19 +259,16 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) { // current batch id currentBatch := c.newStatus.CanaryStatus.CurrentBatch + // the number of canary pods should have in current batch in plan + plannedUpdatedReplicas := c.calculateCurrentCanary(c.newStatus.ObservedWorkloadReplicas) // the number of pods will be partitioned by cloneSet partitionedStableReplicas, _ := intstr.GetValueFromIntOrPercent(c.clone.Spec.UpdateStrategy.Partition, int(replicas), true) // the number of canary pods that consider rollback context and other real-world situations - expectedBatchCanaryReplicas := c.calculateCurrentCanary(replicas - noNeedRollbackReplicas) + expectedUpdatedReplicas := c.calculateCurrentCanary(replicas - noNeedRollbackReplicas) // the number of stable pods that consider rollback context and other real-world situations - expectedBatchStableReplicas := replicas - noNeedRollbackReplicas - expectedBatchCanaryReplicas + expectedStableReplicas := replicas - noNeedRollbackReplicas - expectedUpdatedReplicas // the number of canary pods that cloneSet will be upgraded - realNeedUpgradeCanaryReplicas := CalculateRealCanaryReplicasGoal(expectedBatchStableReplicas, replicas, &c.release.Spec.ReleasePlan.Batches[currentBatch].CanaryReplicas) - - var maxUnavailableReplicas int - if c.clone.Spec.UpdateStrategy.MaxUnavailable != nil { - maxUnavailableReplicas, _ = intstr.GetValueFromIntOrPercent(c.clone.Spec.UpdateStrategy.MaxUnavailable, int(realNeedUpgradeCanaryReplicas), true) - } + realDesiredUpdatedReplicas := CalculateRealCanaryReplicasGoal(expectedStableReplicas, replicas, 
&c.release.Spec.ReleasePlan.Batches[currentBatch].CanaryReplicas) klog.V(3).InfoS("check one batch, current info:", "BatchRelease", c.releasePlanKey, @@ -261,21 +276,18 @@ func (c *CloneSetRolloutController) CheckOneBatchReady() (bool, error) { "replicas", replicas, "updatedReplicas", updatedReplicas, "noNeedRollbackReplicas", noNeedRollbackReplicas, - "maxUnavailableReplicas", maxUnavailableReplicas, "partitionedStableReplicas", partitionedStableReplicas, - "expectedBatchCanaryReplicas", expectedBatchCanaryReplicas, - "expectedBatchStableReplicas", expectedBatchStableReplicas) + "expectedUpdatedReplicas", expectedUpdatedReplicas, + "realDesiredUpdatedReplicas", realDesiredUpdatedReplicas, + "expectedStableReplicas", expectedStableReplicas) - currentBatchIsReady := updatedReplicas >= realNeedUpgradeCanaryReplicas && // 1.the number of upgrade pods achieved the goal - updatedReadyReplicas+int32(maxUnavailableReplicas) >= realNeedUpgradeCanaryReplicas && // 2.the number of upgraded available pods achieved the goal - (realNeedUpgradeCanaryReplicas == 0 || updatedReadyReplicas >= 1) // 3.make sure that at least one upgrade pod is available - - if !currentBatchIsReady { - klog.InfoS("the batch is not ready yet", "BatchRelease", c.releasePlanKey, "current-batch", c.newStatus.CanaryStatus.CurrentBatch) + if !isBatchReady(c.release, pods, c.clone.Spec.UpdateStrategy.MaxUnavailable, + plannedUpdatedReplicas, realDesiredUpdatedReplicas, updatedReplicas, updatedReadyReplicas) { + klog.Infof("BatchRelease(%v) batch is not ready yet, current batch=%d", klog.KObj(c.release), currentBatch) return false, nil } - c.recorder.Eventf(c.release, v1.EventTypeNormal, "BatchAvailable", "Batch %d is available", c.newStatus.CanaryStatus.CurrentBatch) + klog.Infof("BatchRelease(%v) batch is ready, current batch=%d", klog.KObj(c.release), currentBatch) return true, nil } @@ -380,7 +392,7 @@ func (c *CloneSetRolloutController) recordCloneSetRevisionAndReplicas() { func (c *CloneSetRolloutController) patchPodBatchLabel(pods []*v1.Pod, plannedBatchCanaryReplicas, expectedBatchStableReplicas int32) (bool, error) { rolloutID := c.release.Spec.ReleasePlan.RolloutID - if rolloutID == "" { + if rolloutID == "" || len(pods) == 0 { return true, nil } diff --git a/pkg/controller/batchrelease/workloads/commons.go b/pkg/controller/batchrelease/workloads/commons.go index 990ca8a3..4356cdfb 100644 --- a/pkg/controller/batchrelease/workloads/commons.go +++ b/pkg/controller/batchrelease/workloads/commons.go @@ -243,3 +243,43 @@ func getPodOrdinal(pod *corev1.Pod) int { ord, _ := strconv.Atoi(pod.Name[strings.LastIndex(pod.Name, "-")+1:]) return ord } + +func failureThreshold(threshold, maxUnavailable *intstr.IntOrString, replicas int32) int32 { + globalThreshold := 0 + if threshold != nil { + globalThreshold, _ = intstr.GetScaledValueFromIntOrPercent(threshold, int(replicas), true) + } else if maxUnavailable != nil { + globalThreshold, _ = intstr.GetScaledValueFromIntOrPercent(maxUnavailable, int(replicas), true) + } + return int32(integer.IntMax(0, globalThreshold)) +} + +func isBatchReady(release *v1alpha1.BatchRelease, pods []*corev1.Pod, maxUnavailable *intstr.IntOrString, labelDesired, desired, updated, updatedReady int32) bool { + updateRevision := release.Status.UpdateRevision + if updatedReady <= 0 { // Some workloads, such as StatefulSet, may not have such field + updatedReady = int32(util.WrappedPodCount(pods, func(pod *corev1.Pod) bool { + return pod.DeletionTimestamp.IsZero() && util.IsConsistentWithRevision(pod, 
updateRevision) && util.IsPodReady(pod) + })) + } + + rolloutID := release.Spec.ReleasePlan.RolloutID + threshold := failureThreshold(release.Spec.ReleasePlan.FailureThreshold, maxUnavailable, updated) + podReady := updated >= desired && updatedReady+threshold >= desired && (desired == 0 || updatedReady > 0) + return podReady && isPodBatchLabelSatisfied(pods, rolloutID, labelDesired) +} + +func isPodBatchLabelSatisfied(pods []*corev1.Pod, rolloutID string, targetCount int32) bool { + if len(rolloutID) == 0 || len(pods) == 0 { + return true + } + labeledCount := int32(0) + for _, pod := range pods { + if !pod.DeletionTimestamp.IsZero() { + continue + } + if pod.Labels[util.RolloutIDLabel] == rolloutID { + labeledCount++ + } + } + return labeledCount >= targetCount +} diff --git a/pkg/controller/batchrelease/workloads/commons_test.go b/pkg/controller/batchrelease/workloads/commons_test.go index 26d3740a..95882e8b 100644 --- a/pkg/controller/batchrelease/workloads/commons_test.go +++ b/pkg/controller/batchrelease/workloads/commons_test.go @@ -7,10 +7,12 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/util" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) func TestFilterPodsForUnorderedRollback(t *testing.T) { @@ -244,6 +246,141 @@ func TestFilterPodsForOrderedRollback(t *testing.T) { } } +func TestIsBatchReady(t *testing.T) { + RegisterFailHandler(Fail) + + p := func(f intstr.IntOrString) *intstr.IntOrString { + return &f + } + r := func(f *intstr.IntOrString, id, revision string) *v1alpha1.BatchRelease { + return &v1alpha1.BatchRelease{ + Spec: v1alpha1.BatchReleaseSpec{ReleasePlan: v1alpha1.ReleasePlan{RolloutID: id, FailureThreshold: f}}, + Status: v1alpha1.BatchReleaseStatus{UpdateRevision: revision}, + } + } + cases := map[string]struct { + release *v1alpha1.BatchRelease + pods []*corev1.Pod + maxUnavailable *intstr.IntOrString + labelDesired int32 + desired int32 + updated int32 + updatedReady int32 + result bool + }{ + "ready: no-rollout-id, all pod ready": { + release: r(p(intstr.FromInt(1)), "", "v2"), + pods: nil, + maxUnavailable: p(intstr.FromInt(1)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 5, + result: true, + }, + "ready: no-rollout-id, tolerated failed pods": { + release: r(p(intstr.FromInt(1)), "", "v2"), + pods: nil, + maxUnavailable: p(intstr.FromInt(1)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 4, + result: true, + }, + "false: no-rollout-id, un-tolerated failed pods": { + release: r(p(intstr.FromInt(1)), "", "v2"), + pods: nil, + maxUnavailable: p(intstr.FromInt(5)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 3, + result: false, + }, + "false: no-rollout-id, tolerated failed pods, but 1 pod isn't updated": { + release: r(p(intstr.FromString("60%")), "", "v2"), + pods: nil, + maxUnavailable: p(intstr.FromInt(5)), + labelDesired: 5, + desired: 5, + updated: 4, + updatedReady: 4, + result: false, + }, + "false: no-rollout-id, tolerated, but no-pod-ready": { + release: r(p(intstr.FromInt(100)), "", "v2"), + pods: nil, + maxUnavailable: p(intstr.FromInt(5)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 0, + result: false, + }, + "true: no-rollout-id, tolerated failed pods, failureThreshold=nil": { + release: r(nil, "", "v2"), + pods: nil, + maxUnavailable: p(intstr.FromInt(3)), + labelDesired: 5, + 
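As a worked sketch of the predicate above, re-deriving the numbers the e2e case at the end of this patch expects (10 replicas, failureThreshold "20%", the 60% step): the threshold is scaled against the updated pods and rounded up, so two not-ready pods are tolerated, and the batch is ready once at least 4 of the 6 updated pods are Ready. When a rolloutID is set, isBatchReady additionally requires the labelled-pod count checked by isPodBatchLabelSatisfied to reach the batch target.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// Third step of the e2e case: 10 replicas, 60% weight, failureThreshold "20%".
	desired := int32(6)      // pods that should be upgraded by this batch
	updated := int32(6)      // pods already carrying the update revision
	updatedReady := int32(4) // updated pods that are Ready

	ft := intstr.FromString("20%")
	// Scaled against the updated pods and rounded up: 20% of 6 => 2.
	t, _ := intstr.GetScaledValueFromIntOrPercent(&ft, int(updated), true)
	threshold := int32(t)

	// Same shape as the podReady condition in isBatchReady.
	ready := updated >= desired &&
		updatedReady+threshold >= desired &&
		(desired == 0 || updatedReady > 0)

	fmt.Println(threshold, ready) // 2 true
}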
desired: 5, + updated: 5, + updatedReady: 3, + result: true, + }, + "false: no-rollout-id, un-tolerated failed pods, failureThreshold=nil": { + release: r(nil, "", "v2"), + pods: nil, + maxUnavailable: p(intstr.FromInt(1)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 3, + result: false, + }, + "true: rollout-id, labeled pods satisfied": { + release: r(p(intstr.FromInt(1)), "1", "version-1"), + pods: generatePods(5, 0), + maxUnavailable: p(intstr.FromInt(5)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 5, + result: true, + }, + "false: rollout-id, labeled pods not satisfied": { + release: r(p(intstr.FromInt(1)), "1", "version-1"), + pods: generatePods(3, 0), + maxUnavailable: p(intstr.FromInt(5)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 5, + result: false, + }, + "true: rollout-id, no updated-ready field": { + release: r(p(intstr.FromInt(1)), "1", "version-1"), + pods: generatePods(5, 0), + maxUnavailable: p(intstr.FromInt(5)), + labelDesired: 5, + desired: 5, + updated: 5, + updatedReady: 0, + result: true, + }, + } + + for name, cs := range cases { + t.Run(name, func(t *testing.T) { + got := isBatchReady(cs.release, cs.pods, cs.maxUnavailable, cs.labelDesired, cs.desired, cs.updated, cs.updatedReady) + fmt.Printf("%v %v", got, cs.result) + Expect(got).To(Equal(cs.result)) + fmt.Printf("%v %v", got, cs.result) + + }) + } +} + func TestSortPodsByOrdinal(t *testing.T) { RegisterFailHandler(Fail) @@ -261,9 +398,11 @@ func TestSortPodsByOrdinal(t *testing.T) { func generatePods(updatedReplicas, noNeedRollbackReplicas int) []*corev1.Pod { podsNoNeed := generatePodsWith(map[string]string{ util.NoNeedUpdatePodLabel: "0x1", + util.RolloutIDLabel: "1", apps.ControllerRevisionHashLabelKey: "version-1", }, noNeedRollbackReplicas, 0) return append(generatePodsWith(map[string]string{ + util.RolloutIDLabel: "1", apps.ControllerRevisionHashLabelKey: "version-1", }, updatedReplicas-noNeedRollbackReplicas, noNeedRollbackReplicas), podsNoNeed...) 
} @@ -276,6 +415,14 @@ func generatePodsWith(labels map[string]string, replicas int, beginOrder int) [] Name: fmt.Sprintf("pod-name-%d", beginOrder+i), Labels: labels, }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, } } return pods diff --git a/pkg/controller/batchrelease/workloads/deployment_canary_control_plane.go b/pkg/controller/batchrelease/workloads/deployment_canary_control_plane.go index 8efa8997..5e1fd99e 100644 --- a/pkg/controller/batchrelease/workloads/deployment_canary_control_plane.go +++ b/pkg/controller/batchrelease/workloads/deployment_canary_control_plane.go @@ -143,37 +143,37 @@ func (c *DeploymentsRolloutController) CheckOneBatchReady() (bool, error) { availableCanaryPodCount := c.canary.Status.AvailableReplicas // canary goal that should have in current batch canaryGoal := c.calculateCurrentCanary(c.newStatus.ObservedWorkloadReplicas) - // max unavailable allowed replicas - maxUnavailable := 0 - if c.canary.Spec.Strategy.RollingUpdate != nil && - c.canary.Spec.Strategy.RollingUpdate.MaxUnavailable != nil { - maxUnavailable, _ = intstr.GetScaledValueFromIntOrPercent(c.canary.Spec.Strategy.RollingUpdate.MaxUnavailable, int(*c.canary.Spec.Replicas), true) + // max unavailable of deployment + var maxUnavailable *intstr.IntOrString + if c.canary.Spec.Strategy.RollingUpdate != nil { + maxUnavailable = c.canary.Spec.Strategy.RollingUpdate.MaxUnavailable + } + + var err error + var pods []*v1.Pod + // if rolloutID is not set, no need to list pods, + // because we cannot patch correct batch label to pod. + if c.release.Spec.ReleasePlan.RolloutID != "" { + pods, err = util.ListOwnedPods(c.client, c.canary) + if err != nil { + return false, err + } } klog.InfoS("checking the batch releasing progress", "BatchRelease", c.releaseKey, - "current-batch", c.newStatus.CanaryStatus.CurrentBatch, + "len(pods)", len(pods), "canary-goal", canaryGoal, + "current-batch", c.newStatus.CanaryStatus.CurrentBatch, "canary-available-pod-count", availableCanaryPodCount, - "stable-pod-status-replicas", c.stable.Status.Replicas, - "maxUnavailable", maxUnavailable) - - currentBatchIsNotReadyYet := func() bool { - // the number of upgrade pods does not achieve the goal - return canaryPodCount < canaryGoal || - // the number of upgraded available pods does not achieve the goal - availableCanaryPodCount+int32(maxUnavailable) < canaryGoal || - // make sure that at least one upgrade pod is available - (canaryGoal > 0 && availableCanaryPodCount == 0) - } + "stable-pod-status-replicas", c.stable.Status.Replicas) - // make sure there is at least one pod is available - if currentBatchIsNotReadyYet() { - klog.Infof("BatchRelease(%v) batch is not ready yet, current batch=%v", c.releaseKey, c.newStatus.CanaryStatus.CurrentBatch) + if !isBatchReady(c.release, pods, maxUnavailable, canaryGoal, canaryGoal, canaryPodCount, availableCanaryPodCount) { + klog.Infof("BatchRelease(%v) batch is not ready yet, current batch=%d", c.releaseKey, c.newStatus.CanaryStatus.CurrentBatch) return false, nil } - c.recorder.Eventf(c.release, v1.EventTypeNormal, "BatchReady", "Batch %d is available", c.newStatus.CanaryStatus.CurrentBatch) + klog.Infof("BatchRelease(%v) batch is ready, current batch=%d", c.releaseKey, c.newStatus.CanaryStatus.CurrentBatch) return true, nil } @@ -333,6 +333,8 @@ func (c *DeploymentsRolloutController) recordDeploymentRevisionAndReplicas() err func (c *DeploymentsRolloutController) patchPodBatchLabel(canaryGoal 
int32) (bool, error) { rolloutID := c.release.Spec.ReleasePlan.RolloutID + // if rolloutID is not set, no need to list pods, + // because we cannot patch correct batch label to pod. if rolloutID == "" || c.canary == nil { return true, nil } diff --git a/pkg/controller/batchrelease/workloads/statefulset_like_controller.go b/pkg/controller/batchrelease/workloads/statefulset_like_controller.go index 220083f6..9f9e12f9 100644 --- a/pkg/controller/batchrelease/workloads/statefulset_like_controller.go +++ b/pkg/controller/batchrelease/workloads/statefulset_like_controller.go @@ -26,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/pointer" @@ -36,18 +35,18 @@ import ( type StatefulSetLikeController struct { client.Client recorder record.EventRecorder - planController *appsv1alpha1.BatchRelease + release *appsv1alpha1.BatchRelease namespacedName types.NamespacedName workloadObj client.Object gvk schema.GroupVersionKind pods []*v1.Pod } -func NewStatefulSetLikeController(c client.Client, r record.EventRecorder, p *appsv1alpha1.BatchRelease, n types.NamespacedName, gvk schema.GroupVersionKind) UnifiedWorkloadController { +func NewStatefulSetLikeController(c client.Client, r record.EventRecorder, b *appsv1alpha1.BatchRelease, n types.NamespacedName, gvk schema.GroupVersionKind) UnifiedWorkloadController { return &StatefulSetLikeController{ Client: c, recorder: r, - planController: p, + release: b, namespacedName: n, gvk: gvk, } @@ -76,7 +75,11 @@ func (c *StatefulSetLikeController) GetWorkloadInfo() (*util.WorkloadInfo, error workloadInfo := util.ParseStatefulSetInfo(set, c.namespacedName) workloadInfo.Paused = true if workloadInfo.Status.UpdatedReadyReplicas <= 0 { - updatedReadyReplicas, err := c.countUpdatedReadyPods(workloadInfo.Status.UpdateRevision) + pods, err := c.ListOwnedPods() + if err != nil { + return nil, err + } + updatedReadyReplicas, err := c.countUpdatedReadyPods(pods, workloadInfo.Status.UpdateRevision) if err != nil { return nil, err } @@ -92,7 +95,7 @@ func (c *StatefulSetLikeController) ClaimWorkload() (bool, error) { return false, err } - err = claimWorkload(c.Client, c.planController, set, map[string]interface{}{ + err = claimWorkload(c.Client, c.release, set, map[string]interface{}{ "type": apps.RollingUpdateStatefulSetStrategyType, "rollingUpdate": map[string]interface{}{ "partition": pointer.Int32(util.GetReplicas(set)), @@ -117,7 +120,7 @@ func (c *StatefulSetLikeController) ReleaseWorkload(cleanup bool) (bool, error) err = releaseWorkload(c.Client, set) if err != nil { - c.recorder.Eventf(c.planController, v1.EventTypeWarning, "ReleaseFailed", err.Error()) + c.recorder.Eventf(c.release, v1.EventTypeWarning, "ReleaseFailed", err.Error()) return false, err } @@ -161,48 +164,6 @@ func (c *StatefulSetLikeController) IsOrderedUpdate() (bool, error) { return !util.IsStatefulSetUnorderedUpdate(set), nil } -func (c *StatefulSetLikeController) IsBatchReady(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) { - workloadInfo, err := c.GetWorkloadInfo() - if err != nil { - return false, err - } - - // ignore this corner case - if canaryReplicasGoal <= 0 { - return true, nil - } - - // first: make sure that the canary goal is met - firstCheckPointReady := workloadInfo.Status.UpdatedReplicas >= canaryReplicasGoal - if !firstCheckPointReady { - return false, nil - } - - updatedReadyReplicas := 
int32(0) - // TODO: add updatedReadyReplicas for advanced statefulset - if workloadInfo.Status.UpdatedReadyReplicas > 0 { - updatedReadyReplicas = workloadInfo.Status.UpdatedReadyReplicas - } else { - updatedReadyReplicas, err = c.countUpdatedReadyPods(workloadInfo.Status.UpdateRevision) - if err != nil { - return false, err - } - } - - maxUnavailable := 0 - if workloadInfo.MaxUnavailable != nil { - maxUnavailable, _ = intstr.GetScaledValueFromIntOrPercent(workloadInfo.MaxUnavailable, int(canaryReplicasGoal), true) - } - - secondCheckPointReady := func() bool { - // 1. make sure updatedReadyReplicas has achieved the goal - return updatedReadyReplicas+int32(maxUnavailable) >= canaryReplicasGoal && - // 2. make sure at latest one updated pod is available if canaryReplicasGoal != 0 - (canaryReplicasGoal == 0 || updatedReadyReplicas >= 1) - } - return secondCheckPointReady(), nil -} - func (c *StatefulSetLikeController) ListOwnedPods() ([]*v1.Pod, error) { if c.pods != nil { return c.pods, nil @@ -215,11 +176,7 @@ func (c *StatefulSetLikeController) ListOwnedPods() ([]*v1.Pod, error) { return c.pods, err } -func (c *StatefulSetLikeController) countUpdatedReadyPods(updateRevision string) (int32, error) { - pods, err := c.ListOwnedPods() - if err != nil { - return 0, err - } +func (c *StatefulSetLikeController) countUpdatedReadyPods(pods []*v1.Pod, updateRevision string) (int32, error) { activePods := util.FilterActivePods(pods) updatedReadyReplicas := int32(0) for _, pod := range activePods { diff --git a/pkg/controller/batchrelease/workloads/workload_rollout_control_plane.go b/pkg/controller/batchrelease/workloads/workload_rollout_control_plane.go index d2d50d2a..55d53e5b 100644 --- a/pkg/controller/batchrelease/workloads/workload_rollout_control_plane.go +++ b/pkg/controller/batchrelease/workloads/workload_rollout_control_plane.go @@ -37,7 +37,6 @@ type UnifiedWorkloadController interface { ClaimWorkload() (bool, error) ReleaseWorkload(cleanup bool) (bool, error) UpgradeBatch(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) - IsBatchReady(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) ListOwnedPods() ([]*v1.Pod, error) IsOrderedUpdate() (bool, error) } @@ -249,20 +248,27 @@ func (c *UnifiedWorkloadRolloutControlPlane) CheckOneBatchReady() (bool, error) } replicas := c.newStatus.ObservedWorkloadReplicas - currentBatch := c.newStatus.CanaryStatus.CurrentBatch + updatedReplicas := workloadInfo.Status.UpdatedReplicas + updatedReadyReplicas := workloadInfo.Status.UpdatedReadyReplicas + currentBatch := c.newStatus.CanaryStatus.CurrentBatch // the number of canary pods should have in current batch in plan - plannedBatchCanaryReplicas := c.calculateCurrentCanary(c.newStatus.ObservedWorkloadReplicas) + plannedUpdatedReplicas := c.calculateCurrentCanary(c.newStatus.ObservedWorkloadReplicas) // the number of canary pods that consider rollback context and other real-world situations - expectedBatchCanaryReplicas := c.calculateCurrentCanary(replicas - noNeedRollbackReplicas) + expectedUpdatedReplicas := c.calculateCurrentCanary(replicas - noNeedRollbackReplicas) // the number of canary pods that consider rollback context and other real-world situations - expectedBatchStableReplicas := replicas - expectedBatchCanaryReplicas + expectedStableReplicas := replicas - expectedUpdatedReplicas + // the number of pods that should be upgraded in this batch + updatedReplicasInBatch := plannedUpdatedReplicas + if currentBatch > 0 { + updatedReplicasInBatch -= 
int32(calculateNewBatchTarget(&c.release.Spec.ReleasePlan, int(replicas), int(currentBatch-1))) + } // if ordered update, partition is related with pod ordinals // if unordered update, partition just like cloneSet partition orderedUpdate, _ := c.IsOrderedUpdate() if !orderedUpdate { - expectedBatchStableReplicas -= noNeedRollbackReplicas + expectedStableReplicas -= noNeedRollbackReplicas } klog.V(3).InfoS("check one batch, current info:", @@ -270,17 +276,23 @@ func (c *UnifiedWorkloadRolloutControlPlane) CheckOneBatchReady() (bool, error) "currentBatch", currentBatch, "replicas", replicas, "noNeedRollbackReplicas", noNeedRollbackReplicas, - "plannedBatchCanaryReplicas", plannedBatchCanaryReplicas, - "expectedBatchCanaryReplicas", expectedBatchCanaryReplicas, - "expectedBatchStableReplicas", expectedBatchStableReplicas) + "updatedReplicasInBatch", updatedReplicasInBatch, + "plannedUpdatedReplicas", plannedUpdatedReplicas, + "expectedUpdatedReplicas", expectedUpdatedReplicas, + "expectedStableReplicas", expectedStableReplicas) + + pods, err := c.ListOwnedPods() + if err != nil { + return false, err + } - if ready, err := c.IsBatchReady(expectedBatchCanaryReplicas, expectedBatchStableReplicas); err != nil || !ready { - klog.InfoS("the batch is not ready yet", "Workload", workloadInfo.GVKWithName, - "BatchRelease", client.ObjectKeyFromObject(c.release), "current-batch", c.release.Status.CanaryStatus.CurrentBatch) + if !isBatchReady(c.release, pods, workloadInfo.MaxUnavailable, + plannedUpdatedReplicas, expectedUpdatedReplicas, updatedReplicas, updatedReadyReplicas) { + klog.Infof("BatchRelease(%v) batch is not ready yet, current batch=%d", klog.KObj(c.release), currentBatch) return false, nil } - c.recorder.Eventf(c.release, v1.EventTypeNormal, "BatchAvailable", "Batch %d is available", c.release.Status.CanaryStatus.CurrentBatch) + klog.Infof("BatchRelease(%v) %d batch is ready", klog.KObj(c.release), currentBatch) return true, nil } diff --git a/pkg/controller/rollout/batchrelease/inner_batchrelease.go b/pkg/controller/rollout/batchrelease/inner_batchrelease.go index f30b3740..0d4fcca8 100644 --- a/pkg/controller/rollout/batchrelease/inner_batchrelease.go +++ b/pkg/controller/rollout/batchrelease/inner_batchrelease.go @@ -356,9 +356,10 @@ func createBatchRelease(rollout *rolloutv1alpha1.Rollout, batchName, rolloutID s }, }, ReleasePlan: rolloutv1alpha1.ReleasePlan{ - Batches: batches, - RolloutID: rolloutID, - BatchPartition: utilpointer.Int32Ptr(0), + Batches: batches, + RolloutID: rolloutID, + BatchPartition: utilpointer.Int32Ptr(0), + FailureThreshold: rollout.Spec.Strategy.Canary.FailureThreshold, }, }, } diff --git a/pkg/util/pod_utils.go b/pkg/util/pod_utils.go index fa967a2d..89e380a5 100644 --- a/pkg/util/pod_utils.go +++ b/pkg/util/pod_utils.go @@ -141,3 +141,14 @@ func ListOwnedPods(c client.Client, workload client.Object) ([]*v1.Pod, error) { } return pods, nil } + +// WrappedPodCount return the number of pods which satisfy the filter +func WrappedPodCount(pods []*v1.Pod, filter func(pod *v1.Pod) bool) int { + count := 0 + for _, pod := range pods { + if filter(pod) { + count++ + } + } + return count +} diff --git a/test/e2e/rollout_test.go b/test/e2e/rollout_test.go index 79b293fd..7eb19c83 100644 --- a/test/e2e/rollout_test.go +++ b/test/e2e/rollout_test.go @@ -244,7 +244,8 @@ var _ = SIGDescribe("Rollout", func() { return false } klog.Infof("current step:%v target step:%v current step state %v", clone.Status.CanaryStatus.CurrentStepIndex, stepIndex, 
clone.Status.CanaryStatus.CurrentStepState) - return clone.Status.CanaryStatus.CurrentStepIndex == stepIndex && clone.Status.CanaryStatus.CurrentStepState == rolloutsv1alpha1.CanaryStepStatePaused + return clone.Status.CanaryStatus.CurrentStepIndex == stepIndex && (clone.Status.CanaryStatus.CurrentStepState == rolloutsv1alpha1.CanaryStepStatePaused || + clone.Status.CanaryStatus.CurrentStepState == rolloutsv1alpha1.CanaryStepStateCompleted) }, 20*time.Minute, time.Second).Should(BeTrue()) } @@ -3844,6 +3845,83 @@ var _ = SIGDescribe("Rollout", func() { CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "2", "5", 1) }) }) + + KruiseDescribe("Test", func() { + It("failure threshold", func() { + By("Creating Rollout...") + rollout := &rolloutsv1alpha1.Rollout{} + Expect(ReadYamlToObject("./test_data/rollout/rollout_canary_base.yaml", rollout)).ToNot(HaveOccurred()) + rollout.Spec.ObjectRef.WorkloadRef = &rolloutsv1alpha1.WorkloadRef{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "CloneSet", + Name: "echoserver", + } + rollout.Spec.Strategy.Canary = &rolloutsv1alpha1.CanaryStrategy{ + FailureThreshold: &intstr.IntOrString{Type: intstr.String, StrVal: "20%"}, + Steps: []rolloutsv1alpha1.CanaryStep{ + { + Weight: utilpointer.Int32(10), + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(30), + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(60), + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + { + Weight: utilpointer.Int32(100), + Pause: rolloutsv1alpha1.RolloutPause{}, + }, + }, + } + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + workload := &appsv1alpha1.CloneSet{} + Expect(ReadYamlToObject("./test_data/rollout/cloneset.yaml", workload)).ToNot(HaveOccurred()) + workload.Spec.Replicas = utilpointer.Int32(10) + workload.Spec.UpdateStrategy.MaxUnavailable = &intstr.IntOrString{Type: intstr.String, StrVal: "100%"} + CreateObject(workload) + WaitCloneSetAllPodsReady(workload) + + checkUpdateReadyPods := func(lower, upper int32) bool { + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + return lower <= workload.Status.UpdatedReadyReplicas && workload.Status.UpdatedReadyReplicas <= upper + } + + By("start rollout") + workload.Labels[util.RolloutIDLabel] = "1" + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateCloneSet(workload) + + By("wait step(1) pause") + WaitRolloutCanaryStepPaused(rollout.Name, 1) + Expect(checkUpdateReadyPods(1, 1)).Should(BeTrue()) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "1", 1) + + By("wait step(2) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 2) + Expect(checkUpdateReadyPods(2, 3)).Should(BeTrue()) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "2", 2) + + By("wait step(3) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 3) + Expect(checkUpdateReadyPods(4, 6)).Should(BeTrue()) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "3", 3) + + By("wait step(4) pause") + ResumeRolloutCanary(rollout.Name) + WaitRolloutCanaryStepPaused(rollout.Name, 4) + Expect(checkUpdateReadyPods(8, 10)).Should(BeTrue()) + CheckPodBatchLabel(workload.Namespace, workload.Spec.Selector, "1", "4", 4) + }) + }) }) func mergeEnvVar(original []v1.EnvVar, add v1.EnvVar) []v1.EnvVar {