diff --git a/pkg/control/sorting/pod_sorting.go b/pkg/control/sorting/pod_sorting.go index 250f36ba54..d916cf7c47 100644 --- a/pkg/control/sorting/pod_sorting.go +++ b/pkg/control/sorting/pod_sorting.go @@ -20,6 +20,13 @@ import ( "context" "fmt" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core" @@ -27,12 +34,6 @@ import ( sidecarsetcontroller "github.com/openkruise/kruise/pkg/controller/sidecarset" statefulsetcontroller "github.com/openkruise/kruise/pkg/controller/statefulset" "github.com/openkruise/kruise/pkg/util/updatesort" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" ) // SortPods sorts the given Pods according the owner workload logic. 
diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index ed02816a49..26df266b40 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "sort" + "sync" "time" apps "k8s.io/api/apps/v1" @@ -32,6 +33,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/history" + "k8s.io/utils/integer" utilpointer "k8s.io/utils/pointer" appspub "github.com/openkruise/kruise/apis/apps/pub" @@ -305,6 +307,39 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions( return currentRevision, updateRevision, collisionCount, nil } +func (ssc *defaultStatefulSetControl) doPreDownload(set *appsv1beta1.StatefulSet, currentRevision, updateRevision *apps.ControllerRevision) { + var err error + if isPreDownloadDisabled || sigsruntimeClient == nil { + return + } + if currentRevision.Name != updateRevision.Name { + // get asts pre-download annotation + minUpdatedReadyPodsCount := 0 + if minUpdatedReadyPods, ok := set.Annotations[appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods]; ok { + minUpdatedReadyPodsIntStr := intstrutil.Parse(minUpdatedReadyPods) + minUpdatedReadyPodsCount, err = intstrutil.GetScaledValueFromIntOrPercent(&minUpdatedReadyPodsIntStr, int(*set.Spec.Replicas), true) + if err != nil { + klog.ErrorS(err, "Failed to GetScaledValueFromIntOrPercent of minUpdatedReadyPods for statefulSet", "statefulSet", klog.KObj(set)) + } + } + updatedReadyReplicas := set.Status.UpdatedReadyReplicas + if updateRevision.Name != set.Status.UpdateRevision { + updatedReadyReplicas = 0 + } + if int32(minUpdatedReadyPodsCount) <= updatedReadyReplicas { + // pre-download images for new revision + if err := ssc.createImagePullJobsForInPlaceUpdate(set, currentRevision, updateRevision); err != nil { + klog.ErrorS(err, "Failed to create ImagePullJobs for statefulSet", "statefulSet", 
klog.KObj(set)) + } + } + } else { + // delete ImagePullJobs if revisions have been consistent + if err := imagejobutilfunc.DeleteJobsForWorkload(sigsruntimeClient, set); err != nil { + klog.ErrorS(err, "Failed to delete ImagePullJobs for statefulSet", "statefulSet", klog.KObj(set)) + } + } +} + // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in // the set in order to conform the system to the target state for the set. The target state always contains // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is @@ -314,8 +349,6 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions( // all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other // Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the // update must be recorded. If the error is not nil, the method should be retried until successful. 
- -// TODO (RZ): Break the below spaghetti code into smaller chucks with unit tests func (ssc *defaultStatefulSetControl) updateStatefulSet( ctx context.Context, set *appsv1beta1.StatefulSet, @@ -339,34 +372,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return set.Status.DeepCopy(), err } - if !isPreDownloadDisabled && sigsruntimeClient != nil { - if currentRevision.Name != updateRevision.Name { - // get asts pre-download annotation - minUpdatedReadyPodsCount := 0 - if minUpdatedReadyPods, ok := set.Annotations[appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods]; ok { - minUpdatedReadyPodsIntStr := intstrutil.Parse(minUpdatedReadyPods) - minUpdatedReadyPodsCount, err = intstrutil.GetScaledValueFromIntOrPercent(&minUpdatedReadyPodsIntStr, int(*set.Spec.Replicas), true) - if err != nil { - klog.Errorf("Failed to GetScaledValueFromIntOrPercent of minUpdatedReadyPods for %v: %v", set, err) - } - } - updatedReadyReplicas := set.Status.UpdatedReadyReplicas - if updateRevision.Name != set.Status.UpdateRevision { - updatedReadyReplicas = 0 - } - if int32(minUpdatedReadyPodsCount) <= updatedReadyReplicas { - // pre-download images for new revision - if err := ssc.createImagePullJobsForInPlaceUpdate(set, currentRevision, updateRevision); err != nil { - klog.Errorf("Failed to create ImagePullJobs for %v: %v", set, err) - } - } - } else { - // delete ImagePullJobs if revisions have been consistent - if err := imagejobutilfunc.DeleteJobsForWorkload(sigsruntimeClient, set); err != nil { - klog.Errorf("Failed to delete ImagePullJobs for %v: %v", set, err) - } - } - } + ssc.doPreDownload(set, currentRevision, updateRevision) // set the generation, and revisions in the returned status status := appsv1beta1.StatefulSetStatus{} @@ -375,6 +381,9 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( status.UpdateRevision = updateRevision.Name status.CollisionCount = utilpointer.Int32Ptr(collisionCount) status.LabelSelector = selector.String() + minReadySeconds := 
getMinReadySeconds(set) + + updateStatus(&status, minReadySeconds, currentRevision, updateRevision, pods) startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set) // slice that will contain all Pods such that startOrdinal <= getOrdinal(pod) < endOrdinal and not in reserveOrdinals @@ -385,48 +394,9 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( firstUnhealthyOrdinal := math.MaxInt32 var firstUnhealthyPod *v1.Pod monotonic := !allowsBurst(set) - minReadySeconds := getMinReadySeconds(set) - var scaleMaxUnavailable *int - if set.Spec.ScaleStrategy != nil && set.Spec.ScaleStrategy.MaxUnavailable != nil { - maxUnavailable, err := intstrutil.GetValueFromIntOrPercent(set.Spec.ScaleStrategy.MaxUnavailable, int(*set.Spec.Replicas), false) - if err != nil { - return &status, err - } - // maxUnavailable should not be less than 1 - if maxUnavailable < 1 { - maxUnavailable = 1 - } - scaleMaxUnavailable = &maxUnavailable - } // First we partition pods into two lists valid replicas and condemned Pods for i := range pods { - status.Replicas++ - - // count the number of running and ready replicas - if isRunningAndReady(pods[i]) { - status.ReadyReplicas++ - if getPodRevision(pods[i]) == updateRevision.Name { - status.UpdatedReadyReplicas++ - if avail, _ := isRunningAndAvailable(pods[i], minReadySeconds); avail { - status.UpdatedAvailableReplicas++ - } - } - if avail, _ := isRunningAndAvailable(pods[i], minReadySeconds); avail { - status.AvailableReplicas++ - } - } - - // count the number of current and update replicas - if isCreated(pods[i]) && !isTerminating(pods[i]) { - if getPodRevision(pods[i]) == currentRevision.Name { - status.CurrentReplicas++ - } - if getPodRevision(pods[i]) == updateRevision.Name { - status.UpdatedReplicas++ - } - } - if ord := getOrdinal(pods[i]); podInOrdinalRangeWithParams(pods[i], startOrdinal, endOrdinal, reserveOrdinals) { // if the ordinal of the pod is within the range of the current number of replicas and not in 
reserveOrdinals, // insert it at the indirection of its ordinal @@ -456,7 +426,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // sort the condemned Pods by their ordinals - sort.Sort(ascendingOrdinal(condemned)) + sort.Sort(descendingOrdinal(condemned)) // find the first unhealthy Pod for i := range replicas { @@ -472,7 +442,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } } - for i := range condemned { + // or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use) + for i := len(condemned) - 1; i >= 0; i-- { if !isHealthy(condemned[i]) { unhealthy++ if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal { @@ -492,228 +463,51 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return &status, nil } - // Examine each replica with respect to its ordinal - for i := range replicas { - if replicas[i] == nil { - continue - } - // delete and recreate failed pods - if isFailed(replicas[i]) { - ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", - "StatefulSet %s/%s is recreating failed Pod %s", - set.Namespace, - set.Name, - replicas[i].Name) - if _, err := ssc.deletePod(set, replicas[i]); err != nil { - return &status, err - } - if getPodRevision(replicas[i]) == currentRevision.Name { - status.CurrentReplicas-- - } - if getPodRevision(replicas[i]) == updateRevision.Name { - status.UpdatedReplicas-- - } - status.Replicas-- - replicas[i] = newVersionedStatefulSetPod( - currentSet, - updateSet, - currentRevision.Name, - updateRevision.Name, - i, replicas) - } - // If we find a Pod that has not been created we create the Pod - if !isCreated(replicas[i]) { - if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { - if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil { - return &status, err - } else if isStale { - // If a pod has a stale PVC, no more work can be done this round. 
- return &status, err - } - } - - lifecycle.SetPodLifecycle(appspub.LifecycleStateNormal)(replicas[i]) - if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil { - msg := fmt.Sprintf("StatefulPodControl failed to create Pod error: %s", err) - condition := NewStatefulsetCondition(appsv1beta1.FailedCreatePod, v1.ConditionTrue, "", msg) - SetStatefulsetCondition(&status, condition) - return &status, err - } - status.Replicas++ - if getPodRevision(replicas[i]) == currentRevision.Name { - status.CurrentReplicas++ - } - if getPodRevision(replicas[i]) == updateRevision.Name { - status.UpdatedReplicas++ - } - // if the set does not allow bursting, return immediately - if monotonic { - return &status, nil - } else if decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) { - klog.V(4).Infof( - "StatefulSet %s/%s Pod %s is Creating, and break pods scale", - set.Namespace, - set.Name, - replicas[i].Name) - break - } - // pod created, no more work possible for this round - continue - } - - // If the Pod is in pending state then trigger PVC creation to create missing PVCs - if isPending(replicas[i]) { - klog.V(4).Info( - "StatefulSet is triggering PVC creation for pending Pod", - "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) - if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil { - return &status, err - } - } - - // If we find a Pod that is currently terminating, we must wait until graceful deletion - // completes before we continue to make progress. 
- if isTerminating(replicas[i]) && monotonic { - klog.V(4).Infof( - "StatefulSet %s/%s is waiting for Pod %s to Terminate", - set.Namespace, - set.Name, - replicas[i].Name) - return &status, nil - } else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) { - klog.V(4).Infof( - "StatefulSet %s/%s Pod %s is Terminating, and break pods scale", - set.Namespace, - set.Name, - replicas[i].Name) - break - } - // Update InPlaceUpdateReady condition for pod - if res := ssc.inplaceControl.Refresh(replicas[i], nil); res.RefreshErr != nil { - klog.Errorf("StatefulSet %s/%s failed to update pod %s condition for inplace: %v", - set.Namespace, set.Name, replicas[i].Name, res.RefreshErr) - return &status, res.RefreshErr - } else if res.DelayDuration > 0 { - durationStore.Push(getStatefulSetKey(set), res.DelayDuration) - } - // If we have a Pod that has been created but is not running and available we can not make progress. - // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its - // ordinal, are Running and Available. 
- if monotonic || scaleMaxUnavailable != nil { - isAvailable, waitTime := isRunningAndAvailable(replicas[i], minReadySeconds) - if !isAvailable && monotonic { - if waitTime > 0 { - // make sure we check later - durationStore.Push(getStatefulSetKey(set), waitTime) - klog.V(4).Infof( - "StatefulSet %s/%s needs to wait %s for the Pod %s to be Running and Available after being"+ - " Ready for %d seconds", - set.Namespace, - set.Name, - waitTime, - replicas[i].Name, - minReadySeconds) - } else { - klog.V(4).Infof( - "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready", - set.Namespace, - set.Name, - replicas[i].Name) - } - return &status, nil - } else if !isAvailable && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) { - klog.V(4).Infof( - "StatefulSet %s/%s Pod %s is unavailable, and break pods scale", - set.Namespace, - set.Name, - replicas[i].Name) - if waitTime > 0 { - // make sure we check later - durationStore.Push(getStatefulSetKey(set), waitTime) - } - break - } - } - // Enforce the StatefulSet invariants - retentionMatch := true - if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { - var err error - retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, replicas[i]) - // An error is expected if the pod is not yet fully updated, and so return is treated as matching. - if err != nil { - retentionMatch = true - } - } - if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch { - continue - } - // Make a deep copy so we don't mutate the shared cache - replica := replicas[i].DeepCopy() - if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil { - msg := fmt.Sprintf("StatefulPodControl failed to update Pod error: %s", err) - condition := NewStatefulsetCondition(appsv1beta1.FailedUpdatePod, v1.ConditionTrue, "", msg) - SetStatefulsetCondition(&status, condition) - return &status, err - } + // First, process each living replica. 
Exit if we run into an error or something blocking in monotonic mode. + scaleMaxUnavailable, err := getScaleMaxUnavailable(set) + if err != nil { + return &status, err + } + processReplicaFn := func(i int) (bool, bool, error) { + return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i, &status, scaleMaxUnavailable) + } + if shouldExit, err := runForAllWithBreak(replicas, processReplicaFn); shouldExit || err != nil { + updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned) + return &status, err } if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // Ensure ownerRefs are set correctly for the condemned pods. - for i := range condemned { + fixPodClaim := func(i int) (bool, error) { if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, condemned[i]); err != nil { - return &status, err + return true, err } else if !matchPolicy { if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(updateSet, condemned[i]); err != nil { - return &status, err + return true, err } } + return false, nil + } + if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil { + updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned) + return &status, err } } - // At this point, all of the current Replicas are Running and Ready, we can consider termination. + // At this point, in monotonic mode all of the current Replicas are Running, Ready and Available, + // and we can consider termination. // We will wait for all predecessors to be Running and Ready prior to attempting a deletion. - // We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas). - // Note that we do not resurrect Pods in this interval. Also not that scaling will take precedence over + // We will terminate Pods in a monotonically decreasing order. + // Note that we do not resurrect Pods in this interval. 
Also note that scaling will take precedence over // updates. - for target := len(condemned) - 1; target >= 0; target-- { - // wait for terminating pods to expire - if isTerminating(condemned[target]) { - klog.V(4).InfoS("StatefulSet is waiting for Pod to Terminate prior to scale down", - "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target])) - // block if we are in monotonic mode - if monotonic { - return &status, nil - } - continue - } - // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block - if avail, waitTime := isRunningAndAvailable(condemned[target], minReadySeconds); !avail && monotonic && condemned[target] != firstUnhealthyPod { - klog.V(4).InfoS("StatefulSet is waiting for Pod to be Running and Ready prior to scale down", - "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) - if waitTime > 0 { - durationStore.Push(getStatefulSetKey(condemned[target]), waitTime) - } - return &status, nil - } - klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down", - set.Namespace, - set.Name, - condemned[target].Name) - - modified, err := ssc.deletePod(set, condemned[target]) - if err != nil || modified { - return &status, err - } - if getPodRevision(condemned[target]) == currentRevision.Name { - status.CurrentReplicas-- - } - if getPodRevision(condemned[target]) == updateRevision.Name { - status.UpdatedReplicas-- - } - if monotonic { - return &status, nil - } + processCondemnedFn := func(i int) (bool, error) { + return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i) + } + if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil { + updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned) + return &status, err } + updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned) // for the OnDelete strategy we short circuit. 
Pods will be updated when they are manually deleted. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType { @@ -1055,6 +849,77 @@ func (ssc *defaultStatefulSetControl) updateStatefulSetStatus( return nil } +type replicaStatus struct { + replicas int32 + readyReplicas int32 + availableReplicas int32 + currentReplicas int32 + + updatedReplicas int32 + updatedReadyReplicas int32 + updatedAvailableReplicas int32 +} + +func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus { + status := replicaStatus{} + for _, pod := range pods { + if pod == nil { + continue + } + if isCreated(pod) { + status.replicas++ + } + + // count the number of running and ready replicas + if isRunningAndReady(pod) { + status.readyReplicas++ + if getPodRevision(pod) == updateRevision.Name { + status.updatedReadyReplicas++ + if avail, _ := isRunningAndAvailable(pod, minReadySeconds); avail { + status.updatedAvailableReplicas++ + } + } + // count the number of running and available replicas + ok, _ := isRunningAndAvailable(pod, minReadySeconds) + if ok { + status.availableReplicas++ + } + } + + // count the number of current and update replicas + if isCreated(pod) && !isTerminating(pod) { + revision := getPodRevision(pod) + if revision == currentRevision.Name { + status.currentReplicas++ + } + if revision == updateRevision.Name { + status.updatedReplicas++ + } + } + } + return status +} + +func updateStatus(status *appsv1beta1.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) { + status.Replicas = 0 + status.ReadyReplicas = 0 + status.AvailableReplicas = 0 + status.CurrentReplicas = 0 + status.UpdatedReplicas = 0 + status.UpdatedReadyReplicas = 0 + status.UpdatedAvailableReplicas = 0 + for _, list := range podLists { + replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision) + 
status.Replicas += replicaStatus.replicas + status.ReadyReplicas += replicaStatus.readyReplicas + status.AvailableReplicas += replicaStatus.availableReplicas + status.UpdatedReadyReplicas += replicaStatus.updatedReadyReplicas + status.UpdatedAvailableReplicas += replicaStatus.updatedAvailableReplicas + status.CurrentReplicas += replicaStatus.currentReplicas + status.UpdatedReplicas += replicaStatus.updatedReplicas + } +} + // getStartOrdinal gets the first possible ordinal (inclusive). // Returns spec.ordinals.start if spec.ordinals is set, otherwise returns 0. func getStartOrdinal(set *appsv1beta1.StatefulSet) int { @@ -1065,3 +930,297 @@ func getStartOrdinal(set *appsv1beta1.StatefulSet) int { } return 0 } + +// processCondemned handles the condemned Pod at index i during scale down. +// A terminating Pod is left alone; in monotonic mode it blocks further work. +// In monotonic mode a Pod that is not Running and Ready, or not yet Available, +// also blocks unless it is firstUnhealthyPod. Otherwise the Pod is deleted +// through ssc.deletePod. +// +// Returns true when the caller should stop processing (blocked, a deletion error, +// or monotonic mode with deletePod reporting modified), plus any deletion error. +func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *appsv1beta1.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) { + logger := klog.FromContext(ctx) + if isTerminating(condemned[i]) { + // if we are in monotonic mode, block and wait for terminating pods to expire + if monotonic { + logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i])) + return true, nil + } + return false, nil + } + // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block + if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod { + logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) + return true, nil + } + // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block. 
+ if avail, waitTime := isRunningAndAvailable(condemned[i], getMinReadySeconds(set)); !avail && monotonic && condemned[i] != firstUnhealthyPod { + logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) + if waitTime > 0 { + // key the requeue delay by the StatefulSet, matching the durationStore.Push calls in processReplica + durationStore.Push(getStatefulSetKey(set), waitTime) + } + return true, nil + } + + logger.V(2).Info("Pod of StatefulSet is terminating for scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i])) + + modified, err := ssc.deletePod(set, condemned[i]) + if err != nil || (monotonic && modified) { + return true, err + } + return false, nil +} + +// processReplica handles an individual replica within a StatefulSet based on its current state. +// It decides whether to delete, create, update the replica, or await its readiness. +// +// different from stateful set: +// +// If decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) returns true, +// break the pod for-loop and proceed with the update logic, +// which will apply in-place conditions to make the pod ready. +// For example: update unhealthy pods or add some conditions +// +// Returns: +// - bool shouldExit: whether to exit. +// - bool shouldBreak: whether to break the pod for-loop and proceed with the update logic. +// - An error if encountered during processing; nil otherwise. +func (ssc *defaultStatefulSetControl) processReplica( + ctx context.Context, + set *appsv1beta1.StatefulSet, + updateSet *appsv1beta1.StatefulSet, + monotonic bool, + replicas []*v1.Pod, + i int, + status *appsv1beta1.StatefulSetStatus, + scaleMaxUnavailable *int) (bool, bool, error) { + minReadySeconds := getMinReadySeconds(set) + logger := klog.FromContext(ctx) + + if replicas[i] == nil { + return false, false, nil + } + // Note that pods with phase Succeeded will also trigger this event. This is + // because final pod phase of evicted or otherwise forcibly stopped pods + // (e.g. 
terminated on node reboot) is determined by the exit code of the + // container, not by the reason for pod termination. We should restart the pod + // regardless of the exit code. + if isFailed(replicas[i]) || isSucceeded(replicas[i]) { + if replicas[i].DeletionTimestamp == nil { + if _, err := ssc.deletePod(set, replicas[i]); err != nil { + return true, false, err + } + } + // New pod should be generated on the next sync after the current pod is removed from etcd. + return true, false, nil + } + // If we find a Pod that has not been created we create the Pod + if !isCreated(replicas[i]) { + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil { + return true, false, err + } else if isStale { + // If a pod has a stale PVC, no more work can be done this round. + return true, false, err + } + } + lifecycle.SetPodLifecycle(appspub.LifecycleStateNormal)(replicas[i]) + if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil { + msg := fmt.Sprintf("StatefulPodControl failed to create Pod error: %s", err) + condition := NewStatefulsetCondition(appsv1beta1.FailedCreatePod, v1.ConditionTrue, "", msg) + SetStatefulsetCondition(status, condition) + return true, false, err + } + if monotonic { + // if the set does not allow bursting, return immediately + return true, false, nil + } else if decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) { + logger.V(4).Info( + "StatefulSet pod is Creating, and break pods scale", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + return false, true, nil + } + // pod created, no more work possible for this round + return false, false, nil + } + + // If the Pod is in pending state then trigger PVC creation to create missing PVCs + if isPending(replicas[i]) { + logger.V(4).Info( + "StatefulSet is triggering PVC creation for pending Pod", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) 
+ if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil { + return true, false, err + } + } + + // If we find a Pod that is currently terminating, we must wait until graceful deletion + // completes before we continue to make progress. + if isTerminating(replicas[i]) && monotonic { + logger.V(4).Info("StatefulSet is waiting for Pod to Terminate", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + return true, false, nil + } else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) { + logger.V(4).Info( + "StatefulSet pod is Terminating, and break pods scale", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + return false, true, nil + } + + // Update InPlaceUpdateReady condition for pod + if res := ssc.inplaceControl.Refresh(replicas[i], nil); res.RefreshErr != nil { + logger.Error(res.RefreshErr, "StatefulSet failed to update pod condition for inplace", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + return true, false, res.RefreshErr + } else if res.DelayDuration > 0 { + durationStore.Push(getStatefulSetKey(set), res.DelayDuration) + } + + // If we have a Pod that has been created but is not running and available we can not make progress. + // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its + // ordinal, are Running and Available. 
+ if monotonic || scaleMaxUnavailable != nil { + isAvailable, waitTime := isRunningAndAvailable(replicas[i], minReadySeconds) + if !isAvailable && monotonic { + if waitTime > 0 { + // make sure we check later + durationStore.Push(getStatefulSetKey(set), waitTime) + logger.V(4).Info( + "StatefulSet needs to wait for the pod to be Running and Available after being"+ + " Ready for minReadySeconds", "statefulSet", klog.KObj(set), "waitTime", waitTime, + "pod", klog.KObj(replicas[i]), "minReadySeconds", minReadySeconds) + } else { + logger.V(4).Info("StatefulSet is waiting for Pod to be Available", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + } + return true, false, nil + } else if !isAvailable && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) { + logger.V(4).Info( + "StatefulSet pod is unavailable, and break pods scale", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + if waitTime > 0 { + // make sure we check later + durationStore.Push(getStatefulSetKey(set), waitTime) + } + return false, true, nil + } + } + + // Enforce the StatefulSet invariants + retentionMatch := true + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + var err error + retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, replicas[i]) + // An error is expected if the pod is not yet fully updated, and so return is treated as matching. 
+ if err != nil { + retentionMatch = true + } + } + + if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch { + return false, false, nil + } + + // Make a deep copy so we don't mutate the shared cache + replica := replicas[i].DeepCopy() + if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil { + msg := fmt.Sprintf("StatefulPodControl failed to update Pod error: %s", err) + condition := NewStatefulsetCondition(appsv1beta1.FailedUpdatePod, v1.ConditionTrue, "", msg) + SetStatefulsetCondition(status, condition) + return true, false, err + } + + return false, false, nil +} + +func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) { + successes := 0 + j := 0 + for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(integer.IntMin(2*batchSize, remaining), MaxBatchSize) { + errCh := make(chan error, batchSize) + var wg sync.WaitGroup + wg.Add(batchSize) + for i := 0; i < batchSize; i++ { + go func(k int) { + defer wg.Done() + // Ignore the first parameter - relevant for monotonic only. + if _, err := fn(k); err != nil { + errCh <- err + } + }(j) + j++ + } + wg.Wait() + successes += batchSize - len(errCh) + close(errCh) + if len(errCh) > 0 { + errs := make([]error, 0) + for err := range errCh { + errs = append(errs, err) + } + return successes, utilerrors.NewAggregate(errs) + } + remaining -= batchSize + } + return successes, nil +} + +// runForAllWithBreak iterates through all pod objects, applying the given function until a specified condition is met. +// The function can decide whether to continue, break out of the loop, or return an error. +// Parameters: +// - pods: An array of pointers to Pod objects, representing the collection of pods to be processed. +// - fn: A function that takes an index as a parameter and returns three values: +// 1. A boolean indicating whether to exit the current iteration. +// 2. 
A boolean indicating whether to break out of the loop. +// 3. An error object, in case an error occurs during function execution. +// +// Returns: +// - A boolean indicating whether an exit condition was met or an error occurred during iteration. +// - An error object, if an error was encountered during the execution of the provided function. +func runForAllWithBreak(pods []*v1.Pod, fn func(i int) (bool, bool, error)) (bool, error) { + for i := range pods { + if shouldExit, shouldBreak, err := fn(i); shouldExit || err != nil { + return true, err + } else if shouldBreak { + //Introduce this branch to exit the for-loop while proceeding with subsequent update logic. + break + } + } + return false, nil +} + +func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) { + if monotonic { + for i := range pods { + if shouldExit, err := fn(i); shouldExit || err != nil { + return true, err + } + } + } else { + if _, err := slowStartBatch(1, len(pods), fn); err != nil { + return true, err + } + } + return false, nil +} + +func getScaleMaxUnavailable(set *appsv1beta1.StatefulSet) (*int, error) { + var scaleMaxUnavailable *int + if set.Spec.ScaleStrategy != nil && set.Spec.ScaleStrategy.MaxUnavailable != nil { + maxUnavailable, err := intstrutil.GetValueFromIntOrPercent(set.Spec.ScaleStrategy.MaxUnavailable, int(*set.Spec.Replicas), false) + if err != nil { + return scaleMaxUnavailable, err + } + // maxUnavailable should not be less than 1 + if maxUnavailable < 1 { + maxUnavailable = 1 + } + scaleMaxUnavailable = &maxUnavailable + } + return scaleMaxUnavailable, nil +} diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 10d7876640..04a6ae48ce 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -423,6 +423,11 @@ func isFailed(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed } +// isSucceeded returns true 
if pod has a Phase of PodSucceeded +func isSucceeded(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodSucceeded +} + // isTerminating returns true if pod's DeletionTimestamp has been set func isTerminating(pod *v1.Pod) bool { return pod.DeletionTimestamp != nil @@ -658,6 +663,20 @@ func (ao ascendingOrdinal) Less(i, j int) bool { return getOrdinal(ao[i]) < getOrdinal(ao[j]) } +type descendingOrdinal []*v1.Pod + +func (do descendingOrdinal) Len() int { + return len(do) +} + +func (do descendingOrdinal) Swap(i, j int) { + do[i], do[j] = do[j], do[i] +} + +func (do descendingOrdinal) Less(i, j int) bool { + return getOrdinal(do[i]) > getOrdinal(do[j]) +} + // NewStatefulsetCondition creates a new statefulset condition. func NewStatefulsetCondition(conditionType apps.StatefulSetConditionType, conditionStatus v1.ConditionStatus, reason, message string) apps.StatefulSetCondition { return apps.StatefulSetCondition{