From b7e0ff72e02e1da045536f393d136c209cdfab76 Mon Sep 17 00:00:00 2001 From: Abner-1 Date: Thu, 13 Jun 2024 16:48:52 +0800 Subject: [PATCH] add start ordinal and e2e case Signed-off-by: Abner-1 --- apis/apps/v1beta1/statefulset_types.go | 26 +- apis/apps/v1beta1/zz_generated.deepcopy.go | 20 + .../bases/apps.kruise.io_statefulsets.yaml | 21 + .../apps.kruise.io_uniteddeployments.yaml | 21 + go.mod | 2 +- .../statefulset/stateful_pod_control.go | 16 + .../statefulset/stateful_pod_control_test.go | 41 +- .../statefulset/stateful_set_control.go | 48 ++- .../statefulset/stateful_set_control_test.go | 110 +++++ .../statefulset/stateful_set_utils.go | 54 ++- .../statefulset/stateful_set_utils_test.go | 87 ++++ pkg/features/kruise_features.go | 4 + .../validating/statefulset_validation.go | 3 +- test/e2e/apps/statefulset.go | 399 +++++++++++++++++- test/e2e/framework/ginkgowrapper.go | 37 ++ test/e2e/framework/node_util.go | 197 +++++++++ test/e2e/framework/statefulset_utils.go | 11 +- 17 files changed, 1040 insertions(+), 57 deletions(-) create mode 100644 test/e2e/framework/ginkgowrapper.go diff --git a/apis/apps/v1beta1/statefulset_types.go b/apis/apps/v1beta1/statefulset_types.go index d30d998548..7118751154 100644 --- a/apis/apps/v1beta1/statefulset_types.go +++ b/apis/apps/v1beta1/statefulset_types.go @@ -17,11 +17,12 @@ limitations under the License. package v1beta1 import ( - appspub "github.com/openkruise/kruise/apis/apps/pub" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + + appspub "github.com/openkruise/kruise/apis/apps/pub" ) const ( @@ -143,6 +144,21 @@ type StatefulSetPersistentVolumeClaimRetentionPolicy struct { WhenScaled PersistentVolumeClaimRetentionPolicyType `json:"whenScaled,omitempty"` } +// StatefulSetOrdinals describes the policy used for replica ordinal assignment +// in this StatefulSet. +type StatefulSetOrdinals struct { + // start is the number representing the first replica's index. It may be used + // to number replicas from an alternate index (eg: 1-indexed) over the default + // 0-indexed names, or to orchestrate progressive movement of replicas from + // one StatefulSet to another. + // If set, replica indices will be in the range: + // [.spec.ordinals.start, .spec.ordinals.start + .spec.replicas). + // If unset, defaults to 0. Replica indices will be in the range: + // [0, .spec.replicas). + // +optional + Start int32 `json:"start" protobuf:"varint,1,opt,name=start"` +} + // StatefulSetSpec defines the desired state of StatefulSet type StatefulSetSpec struct { // replicas is the desired number of replicas of the given Template. @@ -228,6 +244,14 @@ type StatefulSetSpec struct { // StatefulSetAutoDeletePVC feature gate to be enabled, which is alpha. // +optional PersistentVolumeClaimRetentionPolicy *StatefulSetPersistentVolumeClaimRetentionPolicy `json:"persistentVolumeClaimRetentionPolicy,omitempty"` + + // ordinals controls the numbering of replica indices in a StatefulSet. The + // default ordinals behavior assigns a "0" index to the first replica and + // increments the index by one for each additional replica requested. Using + // the ordinals field requires the StatefulSetStartOrdinal feature gate to be + // enabled, which is beta. + // +optional + Ordinals *StatefulSetOrdinals `json:"ordinals,omitempty"` } // StatefulSetScaleStrategy defines strategies for pods scale. diff --git a/apis/apps/v1beta1/zz_generated.deepcopy.go b/apis/apps/v1beta1/zz_generated.deepcopy.go index 7eec714610..f4e977bffe 100644 --- a/apis/apps/v1beta1/zz_generated.deepcopy.go +++ b/apis/apps/v1beta1/zz_generated.deepcopy.go @@ -128,6 +128,21 @@ func (in *StatefulSetList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StatefulSetOrdinals) DeepCopyInto(out *StatefulSetOrdinals) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StatefulSetOrdinals. +func (in *StatefulSetOrdinals) DeepCopy() *StatefulSetOrdinals { + if in == nil { + return nil + } + out := new(StatefulSetOrdinals) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StatefulSetPersistentVolumeClaimRetentionPolicy) DeepCopyInto(out *StatefulSetPersistentVolumeClaimRetentionPolicy) { *out = *in @@ -210,6 +225,11 @@ func (in *StatefulSetSpec) DeepCopyInto(out *StatefulSetSpec) { *out = new(StatefulSetPersistentVolumeClaimRetentionPolicy) **out = **in } + if in.Ordinals != nil { + in, out := &in.Ordinals, &out.Ordinals + *out = new(StatefulSetOrdinals) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StatefulSetSpec. diff --git a/config/crd/bases/apps.kruise.io_statefulsets.yaml b/config/crd/bases/apps.kruise.io_statefulsets.yaml index 4fe432930d..e081d9837d 100644 --- a/config/crd/bases/apps.kruise.io_statefulsets.yaml +++ b/config/crd/bases/apps.kruise.io_statefulsets.yaml @@ -580,6 +580,27 @@ spec: type: boolean type: object type: object + ordinals: + description: |- + ordinals controls the numbering of replica indices in a StatefulSet. The + default ordinals behavior assigns a "0" index to the first replica and + increments the index by one for each additional replica requested. Using + the ordinals field requires the StatefulSetStartOrdinal feature gate to be + enabled, which is beta. + properties: + start: + description: |- + start is the number representing the first replica's index. It may be used + to number replicas from an alternate index (eg: 1-indexed) over the default + 0-indexed names, or to orchestrate progressive movement of replicas from + one StatefulSet to another. + If set, replica indices will be in the range: + [.spec.ordinals.start, .spec.ordinals.start + .spec.replicas). + If unset, defaults to 0. Replica indices will be in the range: + [0, .spec.replicas). + format: int32 + type: integer + type: object persistentVolumeClaimRetentionPolicy: description: |- PersistentVolumeClaimRetentionPolicy describes the policy used for PVCs created from diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml index 9ea7eda065..d15c53767a 100644 --- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml +++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml @@ -203,6 +203,27 @@ spec: type: boolean type: object type: object + ordinals: + description: |- + ordinals controls the numbering of replica indices in a StatefulSet. The + default ordinals behavior assigns a "0" index to the first replica and + increments the index by one for each additional replica requested. Using + the ordinals field requires the StatefulSetStartOrdinal feature gate to be + enabled, which is beta. + properties: + start: + description: |- + start is the number representing the first replica's index. It may be used + to number replicas from an alternate index (eg: 1-indexed) over the default + 0-indexed names, or to orchestrate progressive movement of replicas from + one StatefulSet to another. + If set, replica indices will be in the range: + [.spec.ordinals.start, .spec.ordinals.start + .spec.replicas). + If unset, defaults to 0. Replica indices will be in the range: + [0, .spec.replicas). + format: int32 + type: integer + type: object persistentVolumeClaimRetentionPolicy: description: |- PersistentVolumeClaimRetentionPolicy describes the policy used for PVCs created from diff --git a/go.mod b/go.mod index 302b17167c..2ee05b763d 100644 --- a/go.mod +++ b/go.mod @@ -104,7 +104,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect - github.com/google/go-cmp v0.6.0 // indirect + github.com/google/go-cmp v0.6.0 github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index 912935a8a5..cfbbb89132 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -315,6 +315,22 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *appsv1beta1.St } } +// createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy +func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(ctx context.Context, set *appsv1beta1.StatefulSet, pod *v1.Pod) error { + if err := spc.createPersistentVolumeClaims(set, pod); err != nil { + return err + } + + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + // Set PVC policy as much as is possible at this point. + if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + spc.recordPodEvent("update", set, pod, err) + return err + } + } + return nil +} + // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with diff --git a/pkg/controller/statefulset/stateful_pod_control_test.go b/pkg/controller/statefulset/stateful_pod_control_test.go index 70917041ae..6214a0b915 100644 --- a/pkg/controller/statefulset/stateful_pod_control_test.go +++ b/pkg/controller/statefulset/stateful_pod_control_test.go @@ -27,10 +27,6 @@ import ( "testing" "time" - appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" - "github.com/openkruise/kruise/pkg/features" - "github.com/openkruise/kruise/pkg/util" - utilfeature "github.com/openkruise/kruise/pkg/util/feature" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -46,6 +42,11 @@ import ( _ "k8s.io/kubernetes/pkg/apis/apps/install" _ "k8s.io/kubernetes/pkg/apis/core/install" utilpointer "k8s.io/utils/pointer" + + appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + "github.com/openkruise/kruise/pkg/features" + "github.com/openkruise/kruise/pkg/util" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" ) func TestStatefulPodControlCreatesPods(t *testing.T) { @@ -872,11 +873,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { return set }, getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(set) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 2, 3, 4} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } @@ -909,11 +910,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { return set }, getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(set) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 2, 3, 4} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } @@ -948,11 +949,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { setClone := set.DeepCopy() setClone.Spec.Replicas = utilpointer.Int32(5) - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(setClone) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(setClone) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 2, 3, 4} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } @@ -996,11 +997,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { setClone := set.DeepCopy() setClone.Spec.Replicas = utilpointer.Int32(5) - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(setClone) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(setClone) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 2, 3, 4} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } @@ -1043,11 +1044,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { return set }, getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(set) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 3, 5, 6} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } @@ -1081,11 +1082,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { return set }, getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(set) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 3, 5, 6} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } @@ -1121,11 +1122,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { setClone := set.DeepCopy() setClone.Spec.Replicas = utilpointer.Int32(5) - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(setClone) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(setClone) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 3, 5, 6} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } @@ -1170,11 +1171,11 @@ func TestUpdatePodClaimForRetentionPolicy(t *testing.T) { getPods: func(set *appsv1beta1.StatefulSet) []*v1.Pod { setClone := set.DeepCopy() setClone.Spec.Replicas = utilpointer.Int32(5) - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(setClone) + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(setClone) pods := make([]*v1.Pod, 0) expectIndex := []int{0, 1, 3, 5, 6} currentIndex := make([]int, 0) - for i := 0; i < replicaCount; i++ { + for i := startOrdinal; i < endOrdinal; i++ { if reserveOrdinals.Has(i) { continue } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index c9ad93ea1c..ed02816a49 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -45,6 +45,9 @@ import ( "github.com/openkruise/kruise/pkg/util/lifecycle" ) +// Realistic value for maximum in-flight requests when processing in parallel mode. +const MaxBatchSize = 500 + // StatefulSetControlInterface implements the control logic for updating StatefulSets and their children Pods. It is implemented // as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation. type StatefulSetControlInterface interface { @@ -373,10 +376,10 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( status.CollisionCount = utilpointer.Int32Ptr(collisionCount) status.LabelSelector = selector.String() - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(set) - // slice that will contain all Pods such that 0 <= getOrdinal(pod) < replicaCount and not in reserveOrdinals - replicas := make([]*v1.Pod, replicaCount) - // slice that will contain all Pods such that replicaCount <= getOrdinal(pod) or in reserveOrdinals + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set) + // slice that will contain all Pods such that startOrdinal <= getOrdinal(pod) < endOrdinal and not in reserveOrdinals + replicas := make([]*v1.Pod, endOrdinal-startOrdinal) + // slice that will contain all Pods such that getOrdinal(pod) < startOrdinal or getOrdinal(pod) >= endOrdinal or in reserveOrdinals condemned := make([]*v1.Pod, 0, len(pods)) unhealthy := 0 firstUnhealthyOrdinal := math.MaxInt32 @@ -424,13 +427,13 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } } - if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount && !reserveOrdinals.Has(ord) { + if ord := getOrdinal(pods[i]); podInOrdinalRangeWithParams(pods[i], startOrdinal, endOrdinal, reserveOrdinals) { // if the ordinal of the pod is within the range of the current number of replicas and not in reserveOrdinals, // insert it at the indirection of its ordinal - replicas[ord] = pods[i] + replicas[ord-startOrdinal] = pods[i] - } else if ord >= replicaCount || reserveOrdinals.Has(ord) { - // if the ordinal is greater than the number of replicas or in reserveOrdinals, + } else if ord >= 0 { + // if the ordinal is valid, but not within the range or in reserveOrdinals, // add it to the condemned list condemned = append(condemned, pods[i]) } @@ -438,12 +441,13 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision - for ord := 0; ord < replicaCount; ord++ { + for ord := startOrdinal; ord < endOrdinal; ord++ { if reserveOrdinals.Has(ord) { continue } - if replicas[ord] == nil { - replicas[ord] = newVersionedStatefulSetPod( + replicaIdx := ord - startOrdinal + if replicas[replicaIdx] == nil { + replicas[replicaIdx] = newVersionedStatefulSetPod( currentSet, updateSet, currentRevision.Name, @@ -556,6 +560,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // pod created, no more work possible for this round continue } + + // If the Pod is in pending state then trigger PVC creation to create missing PVCs + if isPending(replicas[i]) { + klog.V(4).Info( + "StatefulSet is triggering PVC creation for pending Pod", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil { + return &status, err + } + } + // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. if isTerminating(replicas[i]) && monotonic { @@ -1039,3 +1054,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSetStatus( return nil } + +// getStartOrdinal gets the first possible ordinal (inclusive). +// Returns spec.ordinals.start if spec.ordinals is set, otherwise returns 0. +func getStartOrdinal(set *appsv1beta1.StatefulSet) int { + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetStartOrdinal) { + if set.Spec.Ordinals != nil { + return int(set.Spec.Ordinals.Start) + } + } + return 0 +} diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index cb2b26bfe9..07e7e74995 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -3671,3 +3671,113 @@ func isOrHasInternalError(err error) bool { agg, ok := err.(utilerrors.Aggregate) return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0]) } + +func emptyInvariants(set *appsv1beta1.StatefulSet, om *fakeObjectManager) error { + return nil +} + +func TestStatefulSetControlWithStartOrdinal(t *testing.T) { + defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)() + + simpleSetFn := func(replicas, startOrdinal int, reservedIds ...int) *appsv1beta1.StatefulSet { + statefulSet := newStatefulSet(replicas) + statefulSet.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{Start: int32(startOrdinal)} + statefulSet.Spec.ReserveOrdinals = append([]int{}, reservedIds...) + return statefulSet + } + + testCases := []struct { + fn func(*testing.T, *appsv1beta1.StatefulSet, invariantFunc, []int) + obj func() *appsv1beta1.StatefulSet + expectedIds []int + }{ + { + CreatesPodsWithStartOrdinal, + func() *appsv1beta1.StatefulSet { + return simpleSetFn(3, 2) + }, + []int{2, 3, 4}, + }, + { + CreatesPodsWithStartOrdinal, + func() *appsv1beta1.StatefulSet { + return simpleSetFn(3, 2, 0, 4) + }, + []int{2, 3, 5}, + }, + { + CreatesPodsWithStartOrdinal, + func() *appsv1beta1.StatefulSet { + return simpleSetFn(3, 2, 0, 2, 3, 4, 5) + }, + []int{6, 7, 8}, + }, + { + CreatesPodsWithStartOrdinal, + func() *appsv1beta1.StatefulSet { + return simpleSetFn(4, 1) + }, + []int{1, 2, 3, 4}, + }, + { + CreatesPodsWithStartOrdinal, + func() *appsv1beta1.StatefulSet { + return simpleSetFn(4, 1, 1, 3, 4) + }, + []int{2, 5, 6, 7}, + }, + } + + for _, testCase := range testCases { + testObj := testCase.obj + testFn := testCase.fn + + set := testObj() + testFn(t, set, emptyInvariants, testCase.expectedIds) + } +} + +func CreatesPodsWithStartOrdinal(t *testing.T, set *appsv1beta1.StatefulSet, invariants invariantFunc, expectedIds []int) { + client := fake.NewSimpleClientset() + kruiseClient := kruisefake.NewSimpleClientset(set) + om, _, ssc, stop := setupController(client, kruiseClient) + defer close(stop) + + if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil { + t.Errorf("Failed to turn up StatefulSet : %s", err) + } + var err error + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + if set.Status.Replicas != *set.Spec.Replicas { + t.Errorf("Failed to scale statefulset to %d replicas", *set.Spec.Replicas) + } + if set.Status.ReadyReplicas != *set.Spec.Replicas { + t.Errorf("Failed to set ReadyReplicas correctly, expected %d", *set.Spec.Replicas) + } + if set.Status.UpdatedReplicas != *set.Spec.Replicas { + t.Errorf("Failed to set UpdatedReplicas correctly, expected %d", *set.Spec.Replicas) + } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Error(err) + } + pods, err := om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + sort.Sort(ascendingOrdinal(pods)) + if len(expectedIds) != len(pods) { + t.Errorf("Expected %d pods. Got %d", len(expectedIds), len(pods)) + return + } + for i, pod := range pods { + expectedOrdinal := expectedIds[i] + actualPodOrdinal := getOrdinal(pod) + if actualPodOrdinal != expectedOrdinal { + t.Errorf("Expected pod ordinal %d. Got %d", expectedOrdinal, actualPodOrdinal) + } + } +} diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 382d71a5d8..10d7876640 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -78,6 +78,24 @@ func getOrdinal(pod *v1.Pod) int { return ordinal } +/** + * Determines if the given pod's ordinal number is within the permissible range + * managed by this StatefulSet and is not listed in the reserveOrdinals. + * + * @return {boolean} True if the pod's ordinal is both within the allowed range and + * not reserved; false otherwise. + */ +func podInOrdinalRange(pod *v1.Pod, set *appsv1beta1.StatefulSet) bool { + startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set) + return podInOrdinalRangeWithParams(pod, startOrdinal, endOrdinal, reserveOrdinals) +} + +func podInOrdinalRangeWithParams(pod *v1.Pod, startOrdinal, endOrdinal int, reserveOrdinals sets.Int) bool { + ordinal := getOrdinal(pod) + return ordinal >= startOrdinal && ordinal < endOrdinal && + !reserveOrdinals.Has(ordinal) +} + // getPodName gets the name of set's child Pod with an ordinal index of ordinal func getPodName(set *appsv1beta1.StatefulSet, ordinal int) string { return fmt.Sprintf("%s-%d", set.Name, ordinal) @@ -143,8 +161,6 @@ func getPersistentVolumeClaimRetentionPolicy(set *appsv1beta1.StatefulSet) appsv // PVC deletion policy for the StatefulSet. func claimOwnerMatchesSetAndPod(claim *v1.PersistentVolumeClaim, set *appsv1beta1.StatefulSet, pod *v1.Pod) bool { policy := getPersistentVolumeClaimRetentionPolicy(set) - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(set) - ord := getOrdinal(pod) const retain = appsv1beta1.RetainPersistentVolumeClaimRetentionPolicyType const delete = appsv1beta1.DeletePersistentVolumeClaimRetentionPolicyType switch { @@ -165,12 +181,12 @@ func claimOwnerMatchesSetAndPod(claim *v1.PersistentVolumeClaim, set *appsv1beta if hasOwnerRef(claim, set) { return false } - podScaledDown := ord >= replicaCount || reserveOrdinals.Has(ord) + podScaledDown := !podInOrdinalRange(pod, set) if podScaledDown != hasOwnerRef(claim, pod) { return false } case policy.WhenScaled == delete && policy.WhenDeleted == delete: - podScaledDown := ord >= replicaCount || reserveOrdinals.Has(ord) + podScaledDown := !podInOrdinalRange(pod, set) // If a pod is scaled down, there should be no set ref and a pod ref; // if the pod is not scaled down it's the other way around. if podScaledDown == hasOwnerRef(claim, set) { @@ -206,8 +222,6 @@ func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *appsv updateMeta(&podMeta, "Pod") setMeta := set.TypeMeta updateMeta(&setMeta, "StatefulSet") - replicaCount, reserveOrdinals := getStatefulSetReplicasRange(set) - ord := getOrdinal(pod) policy := getPersistentVolumeClaimRetentionPolicy(set) const retain = appsv1beta1.RetainPersistentVolumeClaimRetentionPolicyType const delete = appsv1beta1.DeletePersistentVolumeClaimRetentionPolicyType @@ -223,7 +237,7 @@ func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *appsv needsUpdate = removeOwnerRef(claim, pod) || needsUpdate case policy.WhenScaled == delete && policy.WhenDeleted == retain: needsUpdate = removeOwnerRef(claim, set) || needsUpdate - podScaledDown := ord >= replicaCount || reserveOrdinals.Has(ord) + podScaledDown := !podInOrdinalRange(pod, set) if podScaledDown { needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate } @@ -231,7 +245,7 @@ func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *appsv needsUpdate = removeOwnerRef(claim, pod) || needsUpdate } case policy.WhenScaled == delete && policy.WhenDeleted == delete: - podScaledDown := ord >= replicaCount || reserveOrdinals.Has(ord) + podScaledDown := !podInOrdinalRange(pod, set) if podScaledDown { needsUpdate = removeOwnerRef(claim, set) || needsUpdate needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate @@ -399,6 +413,11 @@ func isCreated(pod *v1.Pod) bool { return pod.Status.Phase != "" } +// isPending returns true if pod has a Phase of PodPending +func isPending(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodPending +} + // isFailed returns true if pod has a Phase of PodFailed func isFailed(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed @@ -699,7 +718,8 @@ func decreaseAndCheckMaxUnavailable(maxUnavailable *int) bool { return val <= 0 } -// return parameters is replicaCount and reserveOrdinals, and they are used to support reserveOrdinals scenarios. +// return parameters is startOrdinal(inclusive), endOrdinal(exclusive) and reserveOrdinals, +// and they are used to support reserveOrdinals scenarios. // When configured as follows: /* apiVersion: apps.kruise.io/v1beta1 @@ -710,17 +730,19 @@ func decreaseAndCheckMaxUnavailable(maxUnavailable *int) bool { reserveOrdinals: - 1 - 3 + Spec.Ordinals.Start: 2 */ -// return replicaCount=6, reserveOrdinals={1, 3} - -func getStatefulSetReplicasRange(set *appsv1beta1.StatefulSet) (int, sets.Int) { +// result is startOrdinal 2(inclusive), endOrdinal 7(exclusive), reserveOrdinals = {1, 3} +// replicas[endOrdinal - startOrdinal] stores [replica-2, nil(reserveOrdinal 3), replica-4, replica-5, replica-6] +// todo: maybe we should remove ineffective reserveOrdinals in webhook, reserveOrdinals = {3} +func getStatefulSetReplicasRange(set *appsv1beta1.StatefulSet) (int, int, sets.Int) { reserveOrdinals := sets.NewInt(set.Spec.ReserveOrdinals...) - replicaCount := 0 - for realReplicaCount := 0; realReplicaCount < int(*set.Spec.Replicas); replicaCount++ { - if reserveOrdinals.Has(replicaCount) { + replicaMaxOrdinal := getStartOrdinal(set) + for realReplicaCount := 0; realReplicaCount < int(*set.Spec.Replicas); replicaMaxOrdinal++ { + if reserveOrdinals.Has(replicaMaxOrdinal) { continue } realReplicaCount++ } - return replicaCount, reserveOrdinals + return getStartOrdinal(set), replicaMaxOrdinal, reserveOrdinals } diff --git a/pkg/controller/statefulset/stateful_set_utils_test.go b/pkg/controller/statefulset/stateful_set_utils_test.go index 292268440e..ef5e6c87f5 100644 --- a/pkg/controller/statefulset/stateful_set_utils_test.go +++ b/pkg/controller/statefulset/stateful_set_utils_test.go @@ -32,11 +32,14 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/history" utilpointer "k8s.io/utils/pointer" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + "github.com/openkruise/kruise/pkg/features" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" ) // noopRecorder is an EventRecorder that does nothing. record.FakeRecorder has a fixed @@ -875,3 +878,87 @@ func newStatefulSet(replicas int) *appsv1beta1.StatefulSet { } return newStatefulSetWithVolumes(replicas, "foo", petMounts, podMounts) } + +func TestGetStatefulSetReplicasRange(t *testing.T) { + int32Ptr := func(i int32) *int32 { + return &i + } + + tests := []struct { + name string + statefulSet *appsv1beta1.StatefulSet + expectedCount int + expectedRes sets.Int + }{ + { + name: "Ordinals start 0", + statefulSet: &appsv1beta1.StatefulSet{ + Spec: appsv1beta1.StatefulSetSpec{ + Replicas: int32Ptr(4), + ReserveOrdinals: []int{1, 3}, + Ordinals: &appsv1beta1.StatefulSetOrdinals{ + Start: 0, + }, + }, + }, + expectedCount: 6, + expectedRes: sets.NewInt(1, 3), + }, + { + name: "Ordinals start 2 with ReserveOrdinals 1&3", + statefulSet: &appsv1beta1.StatefulSet{ + Spec: appsv1beta1.StatefulSetSpec{ + Replicas: int32Ptr(4), + ReserveOrdinals: []int{1, 3}, + Ordinals: &appsv1beta1.StatefulSetOrdinals{ + Start: 2, + }, + }, + }, + expectedCount: 5, + expectedRes: sets.NewInt(1, 3), + }, + { + name: "Ordinals start 3 with ReserveOrdinals 1&3", + statefulSet: &appsv1beta1.StatefulSet{ + Spec: appsv1beta1.StatefulSetSpec{ + Replicas: int32Ptr(4), + ReserveOrdinals: []int{1, 3}, + Ordinals: &appsv1beta1.StatefulSetOrdinals{ + Start: 3, + }, + }, + }, + expectedCount: 5, + expectedRes: sets.NewInt(1, 3), + }, + { + name: "Ordinals start 4 with ReserveOrdinals 1&3", + statefulSet: &appsv1beta1.StatefulSet{ + Spec: appsv1beta1.StatefulSetSpec{ + Replicas: int32Ptr(4), + ReserveOrdinals: []int{1, 3}, + Ordinals: &appsv1beta1.StatefulSetOrdinals{ + Start: 4, + }, + }, + }, + expectedCount: 4, + expectedRes: sets.NewInt(1, 3), + }, + // ... other test cases + } + defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + //count, res := getStatefulSetReplicasRange(tt.statefulSet) + startOrdinal, endOrdinal, res := getStatefulSetReplicasRange(tt.statefulSet) + count := endOrdinal - startOrdinal + if count != tt.expectedCount || len(res) != len(tt.expectedRes) || res.HasAll(tt.expectedRes.Len()) { + t.Errorf("getStatefulSetReplicasRange(%v) got (%v, %v), want (%v, %v)", + tt.name, count, res, tt.expectedCount, tt.expectedRes) + } + }) + } +} diff --git a/pkg/features/kruise_features.go b/pkg/features/kruise_features.go index a8f442a42f..6b70741e33 100644 --- a/pkg/features/kruise_features.go +++ b/pkg/features/kruise_features.go @@ -119,6 +119,9 @@ const ( // RecreatePodWhenChangeVCTInCloneSetGate recreate the pod upon changing volume claim templates in a clone set to ensure PVC consistency. RecreatePodWhenChangeVCTInCloneSetGate featuregate.Feature = "RecreatePodWhenChangeVCTInCloneSetGate" + + // Enables a StatefulSet to start from an arbitrary non zero ordinal + StatefulSetStartOrdinal featuregate.Feature = "StatefulSetStartOrdinal" ) var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ @@ -150,6 +153,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ EnhancedLivenessProbeGate: {Default: false, PreRelease: featuregate.Alpha}, RecreatePodWhenChangeVCTInCloneSetGate: {Default: false, PreRelease: featuregate.Alpha}, + StatefulSetStartOrdinal: {Default: false, PreRelease: featuregate.Alpha}, } func init() { diff --git a/pkg/webhook/statefulset/validating/statefulset_validation.go b/pkg/webhook/statefulset/validating/statefulset_validation.go index bc75cdca84..e6a9bd645d 100644 --- a/pkg/webhook/statefulset/validating/statefulset_validation.go +++ b/pkg/webhook/statefulset/validating/statefulset_validation.go @@ -324,9 +324,10 @@ func ValidateStatefulSetUpdate(statefulSet, oldStatefulSet *appsv1beta1.Stateful statefulSet.Spec.ReserveOrdinals = oldStatefulSet.Spec.ReserveOrdinals statefulSet.Spec.Lifecycle = oldStatefulSet.Spec.Lifecycle statefulSet.Spec.RevisionHistoryLimit = oldStatefulSet.Spec.RevisionHistoryLimit + statefulSet.Spec.Ordinals = oldStatefulSet.Spec.Ordinals if !apiequality.Semantic.DeepEqual(statefulSet.Spec, oldStatefulSet.Spec) { - allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to statefulset spec for fields other than 'replicas', 'template', 'reserveOrdinals', 'lifecycle', 'revisionHistoryLimit', 'persistentVolumeClaimRetentionPolicy', `volumeClaimTemplates` and 'updateStrategy' are forbidden")) + allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to statefulset spec for fields other than 'replicas', 'ordinals', 'template', 'reserveOrdinals', 'lifecycle', 'revisionHistoryLimit', 'persistentVolumeClaimRetentionPolicy', `volumeClaimTemplates` and 'updateStrategy' are forbidden")) } statefulSet.Spec.Replicas = restoreReplicas statefulSet.Spec.Template = restoreTemplate diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index 3f8f54dbf1..377f41ab2e 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -21,17 +21,22 @@ import ( "context" "encoding/json" "fmt" + "reflect" "regexp" "strconv" "strings" "time" + "github.com/google/go-cmp/cmp" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" klabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" @@ -39,8 +44,6 @@ import ( imageutils "k8s.io/kubernetes/test/utils/image" "k8s.io/utils/pointer" - "github.com/onsi/ginkgo" - "github.com/onsi/gomega" appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" @@ -1375,12 +1378,13 @@ var _ = SIGDescribe("StatefulSet", func() { err = verifyStatefulSetPVCsExist(c, ss, []int{0, 1, 2}) framework.ExpectNoError(err) - ginkgo.By("Orphaning the 3rd pod") + // why 3rd -> 2rd? patch 3rd pod maybe failed when pod has not been created + ginkgo.By("Orphaning the 2rd pod") patch, err := json.Marshal(metav1.ObjectMeta{ OwnerReferences: []metav1.OwnerReference{}, }) framework.ExpectNoError(err, "Could not Marshal JSON for patch payload") - _, err = c.CoreV1().Pods(ns).Patch(context.TODO(), fmt.Sprintf("%s-2", ss.Name), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "") + _, err = c.CoreV1().Pods(ns).Patch(context.TODO(), fmt.Sprintf("%s-1", ss.Name), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "") framework.ExpectNoError(err, "Could not patch payload") ginkgo.By("Scaling stateful set " + ss.Name + " to one replica") @@ -1392,6 +1396,284 @@ var _ = SIGDescribe("StatefulSet", func() { framework.ExpectNoError(err) }) }) + + ginkgo.Describe("Automatically recreate PVC for pending pod when PVC is missing", func() { + ssName := "ss" + labels := map[string]string{ + "foo": "bar", + "baz": "blah", + } + headlessSvcName := "test" + var statefulPodMounts []v1.VolumeMount + var ss *appsv1beta1.StatefulSet + + ginkgo.BeforeEach(func() { + statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} + ss = framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, labels) + }) + + ginkgo.AfterEach(func() { + if ginkgo.CurrentGinkgoTestDescription().Failed { + framework.DumpDebugInfo(c, ns) + } + framework.Logf("Deleting all statefulset in ns %v", ns) + framework.DeleteAllStatefulSets(c, kc, ns) + }) + + //ginkgo.It("PVC should be recreated when pod is pending due to missing PVC", f.WithDisruptive(), f.WithSerial(), func() { + ginkgo.It("PVC should be recreated when pod is pending due to missing PVC", func() { + ctx := context.TODO() + framework.SkipIfNoDefaultStorageClass(c) + + readyNode, err := framework.GetRandomReadySchedulableNode(ctx, c) + framework.ExpectNoError(err) + hostLabel := "kubernetes.io/hostname" + hostLabelVal := readyNode.Labels[hostLabel] + + ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + _, err = kc.AppsV1beta1().StatefulSets(ns).Create(ctx, ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming PVC exists") + err = verifyStatefulSetPVCsExist(c, ss, []int{0}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming Pod is ready") + sst := framework.NewStatefulSetTester(c, kc) + sst.WaitForStatusReadyReplicas(ss, 1) + podName := getStatefulSetPodNameAtIndex(0, ss) + pod, err := c.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + nodeName := pod.Spec.NodeName + gomega.Expect(nodeName).To(gomega.Equal(readyNode.Name)) + node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + oldData, err := json.Marshal(node) + framework.ExpectNoError(err) + + node.Spec.Unschedulable = true + + newData, err := json.Marshal(node) + framework.ExpectNoError(err) + + // cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + framework.ExpectNoError(err) + ginkgo.By("Cordoning Node") + _, err = c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) + cordoned := true + + defer func() { + if cordoned { + uncordonNode(ctx, c, oldData, newData, nodeName) + } + }() + + // wait for the node to be unschedulable + framework.WaitForNodeSchedulable(ctx, c, nodeName, 10*time.Second, false) + + ginkgo.By("Deleting Pod") + err = c.CoreV1().Pods(ns).Delete(ctx, podName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // wait for the pod to be recreated + waitForStatusCurrentReplicas(ctx, c, kc, ss, 1) + _, err = c.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(ctx, metav1.ListOptions{LabelSelector: klabels.Everything().String()}) + framework.ExpectNoError(err) + gomega.Expect(pvcList.Items).To(gomega.HaveLen(1)) + pvcName := pvcList.Items[0].Name + + ginkgo.By("Deleting PVC") + err = c.CoreV1().PersistentVolumeClaims(ns).Delete(ctx, pvcName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + uncordonNode(ctx, c, oldData, newData, nodeName) + cordoned = false + + ginkgo.By("Confirming PVC recreated") + err = verifyStatefulSetPVCsExist(c, ss, []int{0}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming Pod is ready after being recreated") + sst.WaitForStatusReadyReplicas(ss, 1) + pod, err = c.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(pod.Spec.NodeName).To(gomega.Equal(readyNode.Name)) // confirm the pod was scheduled back to the original node + }) + }) + + ginkgo.Describe("Scaling StatefulSetStartOrdinal", func() { + ssName := "ss" + labels := map[string]string{ + "foo": "bar", + "baz": "blah", + } + headlessSvcName := "test" + var ss *appsv1beta1.StatefulSet + var sst *framework.StatefulSetTester + + ginkgo.BeforeEach(func() { + ss = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, nil, nil, labels) + + ginkgo.By("Creating service " + headlessSvcName + " in namespace " + ns) + headlessService := framework.CreateServiceSpec(headlessSvcName, "", true, labels) + _, err := c.CoreV1().Services(ns).Create(context.TODO(), headlessService, metav1.CreateOptions{}) + framework.ExpectNoError(err) + sst = framework.NewStatefulSetTester(c, kc) + }) + + ginkgo.AfterEach(func() { + if ginkgo.CurrentGinkgoTestDescription().Failed { + framework.DumpDebugInfo(c, ns) + } + framework.Logf("Deleting all statefulset in ns %v", ns) + framework.DeleteAllStatefulSets(c, kc, ns) + }) + + ginkgo.It("Setting .start.ordinal", func() { + ctx := context.TODO() + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + *(ss.Spec.Replicas) = 2 + _, err := kc.AppsV1beta1().StatefulSets(ns).Create(ctx, ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + sst := framework.NewStatefulSetTester(c, kc) + waitForStatus(ctx, c, kc, ss) + sst.WaitForStatusReplicas(ss, 2) + sst.WaitForStatusReadyReplicas(ss, 2) + + ginkgo.By("Confirming 2 replicas, with start ordinal 0") + pods := sst.GetPodList(ss) + err = expectPodNames(pods, []string{"ss-0", "ss-1"}) + framework.ExpectNoError(err) + + ginkgo.By("Setting .spec.replicas = 3 .spec.ordinals.start = 2") + ss, err = updateStatefulSetWithRetries(ctx, kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) { + update.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{ + Start: 2, + } + *(update.Spec.Replicas) = 3 + }) + framework.ExpectNoError(err) + + // we need to ensure we wait for all the new ones to show up, not + // just for any random 3 + waitForStatus(ctx, c, kc, ss) + waitForPodNames(ctx, c, kc, ss, []string{"ss-2", "ss-3", "ss-4"}) + ginkgo.By("Confirming 3 replicas, with start ordinal 2") + sst.WaitForStatusReplicas(ss, 3) + sst.WaitForStatusReadyReplicas(ss, 3) + }) + + ginkgo.It("Increasing .start.ordinal", func() { + ctx := context.TODO() + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + *(ss.Spec.Replicas) = 2 + ss.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{ + Start: 2, + } + _, err := kc.AppsV1beta1().StatefulSets(ns).Create(ctx, ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) + waitForStatus(ctx, c, kc, ss) + sst.WaitForStatusReplicas(ss, 2) + sst.WaitForStatusReadyReplicas(ss, 2) + + ginkgo.By("Confirming 2 replicas, with start ordinal 2") + pods := sst.GetPodList(ss) + err = expectPodNames(pods, []string{"ss-2", "ss-3"}) + framework.ExpectNoError(err) + + ginkgo.By("Increasing .spec.ordinals.start = 4") + ss, err = updateStatefulSetWithRetries(ctx, kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) { + update.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{ + Start: 4, + } + }) + framework.ExpectNoError(err) + + // since we are replacing 2 pods for 2, we need to ensure we wait + // for the new ones to show up, not just for any random 2 + ginkgo.By("Confirming 2 replicas, with start ordinal 4") + waitForStatus(ctx, c, kc, ss) + waitForPodNames(ctx, c, kc, ss, []string{"ss-4", "ss-5"}) + sst.WaitForStatusReplicas(ss, 2) + sst.WaitForStatusReadyReplicas(ss, 2) + }) + + ginkgo.It("Decreasing .start.ordinal", func() { + ctx := context.TODO() + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + *(ss.Spec.Replicas) = 2 + ss.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{ + Start: 3, + } + _, err := kc.AppsV1beta1().StatefulSets(ns).Create(ctx, ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) + waitForStatus(ctx, c, kc, ss) + sst.WaitForStatusReplicas(ss, 2) + sst.WaitForStatusReadyReplicas(ss, 2) + + ginkgo.By("Confirming 2 replicas, with start ordinal 3") + pods := sst.GetPodList(ss) + err = expectPodNames(pods, []string{"ss-3", "ss-4"}) + framework.ExpectNoError(err) + + ginkgo.By("Decreasing .spec.ordinals.start = 2") + ss, err = updateStatefulSetWithRetries(ctx, kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) { + update.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{ + Start: 2, + } + }) + framework.ExpectNoError(err) + + // since we are replacing 2 pods for 2, we need to ensure we wait + // for the new ones to show up, not just for any random 2 + ginkgo.By("Confirming 2 replicas, with start ordinal 2") + waitForStatus(ctx, c, kc, ss) + waitForPodNames(ctx, c, kc, ss, []string{"ss-2", "ss-3"}) + sst.WaitForStatusReplicas(ss, 2) + sst.WaitForStatusReadyReplicas(ss, 2) + }) + + ginkgo.It("Removing .start.ordinal", func() { + ctx := context.TODO() + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + *(ss.Spec.Replicas) = 2 + ss.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{ + Start: 3, + } + _, err := kc.AppsV1beta1().StatefulSets(ns).Create(ctx, ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) + sst.WaitForStatusReplicas(ss, 2) + sst.WaitForStatusReadyReplicas(ss, 2) + + ginkgo.By("Confirming 2 replicas, with start ordinal 3") + pods := sst.GetPodList(ss) + err = expectPodNames(pods, []string{"ss-3", "ss-4"}) + framework.ExpectNoError(err) + + ginkgo.By("Removing .spec.ordinals") + ss, err = updateStatefulSetWithRetries(ctx, kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) { + update.Spec.Ordinals = nil + }) + framework.ExpectNoError(err) + + // since we are replacing 2 pods for 2, we need to ensure we wait + // for the new ones to show up, not just for any random 2 + framework.Logf("Confirming 2 replicas, with start ordinal 0") + waitForStatus(ctx, c, kc, ss) + waitForPodNames(ctx, c, kc, ss, []string{"ss-0", "ss-1"}) + sst.WaitForStatusReplicas(ss, 2) + sst.WaitForStatusReadyReplicas(ss, 2) + }) + }) }) func kubectlExecWithRetries(args ...string) (out string) { @@ -1829,3 +2111,112 @@ func verifyStatefulSetPVCsExistWithOwnerRefs(c clientset.Interface, kc kruisecli return true, nil }) } + +// getStatefulSetPodNameAtIndex gets formatted pod name given index. +func getStatefulSetPodNameAtIndex(index int, ss *appsv1beta1.StatefulSet) string { + // TODO: we won't use "-index" as the name strategy forever, + // pull the name out from an identity mapper. + return fmt.Sprintf("%v-%v", ss.Name, index) +} + +func uncordonNode(ctx context.Context, c clientset.Interface, oldData, newData []byte, nodeName string) { + ginkgo.By("Uncordoning Node") + // uncordon node, by reverting patch + revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{}) + framework.ExpectNoError(err) + _, err = c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) +} + +// waitForStatus waits for the StatefulSetStatus's CurrentReplicas to be equal to expectedReplicas +// The returned StatefulSet contains such a StatefulSetStatus +func waitForStatusCurrentReplicas(ctx context.Context, c clientset.Interface, kc kruiseclientset.Interface, set *appsv1beta1.StatefulSet, expectedReplicas int32) *appsv1beta1.StatefulSet { + sst := framework.NewStatefulSetTester(c, kc) + sst.WaitForState(set, func(set2 *appsv1beta1.StatefulSet, pods *v1.PodList) (bool, error) { + if set2.Status.ObservedGeneration >= set.Generation && set2.Status.CurrentReplicas == expectedReplicas { + set = set2 + return true, nil + } + return false, nil + }) + return set +} + +// waitForStatus waits for the StatefulSetStatus's ObservedGeneration to be greater than or equal to set's Generation. +// The returned StatefulSet contains such a StatefulSetStatus +func waitForStatus(ctx context.Context, c clientset.Interface, kc kruiseclientset.Interface, set *appsv1beta1.StatefulSet) *appsv1beta1.StatefulSet { + sst := framework.NewStatefulSetTester(c, kc) + sst.WaitForState(set, func(set2 *appsv1beta1.StatefulSet, pods *v1.PodList) (bool, error) { + if set2.Status.ObservedGeneration >= set.Generation { + set = set2 + return true, nil + } + return false, nil + }) + return set +} + +// waitForPodNames waits for the StatefulSet's pods to match expected names. +func waitForPodNames(ctx context.Context, c clientset.Interface, kc kruiseclientset.Interface, set *appsv1beta1.StatefulSet, expectedPodNames []string) { + sst := framework.NewStatefulSetTester(c, kc) + sst.WaitForState(set, + func(intSet *appsv1beta1.StatefulSet, pods *v1.PodList) (bool, error) { + if err := expectPodNames(pods, expectedPodNames); err != nil { + framework.Logf("Currently %v", err) + return false, nil + } + return true, nil + }) +} + +// expectPodNames compares the names of the pods from actualPods with expectedPodNames. +// actualPods can be in any list, since we'll sort by their ordinals and filter +// active ones. expectedPodNames should be ordered by statefulset ordinals. +func expectPodNames(actualPods *v1.PodList, expectedPodNames []string) error { + framework.SortStatefulPods(actualPods) + pods := []string{} + for _, pod := range actualPods.Items { + // ignore terminating pods, similarly to how the controller does it + // when calculating status information + if IsPodActive(&pod) { + pods = append(pods, pod.Name) + } + } + if !reflect.DeepEqual(expectedPodNames, pods) { + diff := cmp.Diff(expectedPodNames, pods) + return fmt.Errorf("pod names don't match, diff (- for expected, + for actual):\n%s", diff) + } + return nil +} + +// IsPodActive return true if the pod meets certain conditions. +func IsPodActive(p *v1.Pod) bool { + return v1.PodSucceeded != p.Status.Phase && + v1.PodFailed != p.Status.Phase && + p.DeletionTimestamp == nil +} + +type updateStatefulSetFunc func(*appsv1beta1.StatefulSet) + +// updateStatefulSetWithRetries updates statefulset template with retries. +func updateStatefulSetWithRetries(ctx context.Context, kc kruiseclientset.Interface, namespace, name string, applyUpdate updateStatefulSetFunc) (statefulSet *appsv1beta1.StatefulSet, err error) { + statefulSets := kc.AppsV1beta1().StatefulSets(namespace) + var updateErr error + pollErr := wait.PollWithContext(ctx, 10*time.Millisecond, 1*time.Minute, func(ctx context.Context) (bool, error) { + if statefulSet, err = statefulSets.Get(ctx, name, metav1.GetOptions{}); err != nil { + return false, err + } + // Apply the update, then attempt to push it to the apiserver. + applyUpdate(statefulSet) + if statefulSet, err = statefulSets.Update(ctx, statefulSet, metav1.UpdateOptions{}); err == nil { + framework.Logf("Updating stateful set %s", name) + return true, nil + } + updateErr = err + return false, nil + }) + if wait.Interrupted(pollErr) { + pollErr = fmt.Errorf("couldn't apply the provided updated to stateful set %q: %v", name, updateErr) + } + return statefulSet, pollErr +} diff --git a/test/e2e/framework/ginkgowrapper.go b/test/e2e/framework/ginkgowrapper.go new file mode 100644 index 0000000000..7666923d55 --- /dev/null +++ b/test/e2e/framework/ginkgowrapper.go @@ -0,0 +1,37 @@ +package framework + +type label struct { + // parts get concatenated with ":" to build the full label. + parts []string + // extra is an optional fully-formed extra label. + extra string + // explanation gets set for each label to help developers + // who pass a label to a ginkgo function. They need to use + // the corresponding framework function instead. + explanation string +} + +func newLabel(parts ...string) label { + return label{ + parts: parts, + explanation: "If you see this as part of an 'Unknown Decorator' error from Ginkgo, then you need to replace the ginkgo.It/Context/Describe call with the corresponding framework.It/Context/Describe or (if available) f.It/Context/Describe.", + } +} + +// WithDisruptive is a shorthand for the corresponding package function. +func (f *Framework) WithDisruptive() interface{} { + return withDisruptive() +} + +func withDisruptive() interface{} { + return newLabel("Disruptive") +} + +// WithSerial is a shorthand for the corresponding package function. +func (f *Framework) WithSerial() interface{} { + return withSerial() +} + +func withSerial() interface{} { + return newLabel("Serial") +} diff --git a/test/e2e/framework/node_util.go b/test/e2e/framework/node_util.go index 830ba5a054..63c0dcdeee 100644 --- a/test/e2e/framework/node_util.go +++ b/test/e2e/framework/node_util.go @@ -18,6 +18,9 @@ package framework import ( "context" + "fmt" + "math/rand" + "strings" "time" "github.com/onsi/gomega" @@ -25,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" v1helper "k8s.io/component-helpers/scheduling/corev1" @@ -189,3 +193,196 @@ func (t *NodeTester) ListRealNodesWithFake(tolerations []v1.Toleration) ([]*v1.N } return nodes, nil } + +const ( + // poll is how often to Poll pods, nodes and claims. + poll = 2 * time.Second + + // singleCallTimeout is how long to try single API calls (like 'get' or 'list'). Used to prevent + // transient failures from failing tests. + singleCallTimeout = 5 * time.Minute +) + +var ( + // unreachableTaintTemplate is the taint for when a node becomes unreachable. + // Copied from pkg/controller/nodelifecycle to avoid pulling extra dependencies + unreachableTaintTemplate = &v1.Taint{ + Key: v1.TaintNodeUnreachable, + Effect: v1.TaintEffectNoExecute, + } + + // notReadyTaintTemplate is the taint for when a node is not ready for executing pods. + // Copied from pkg/controller/nodelifecycle to avoid pulling extra dependencies + notReadyTaintTemplate = &v1.Taint{ + Key: v1.TaintNodeNotReady, + Effect: v1.TaintEffectNoExecute, + } + + // updateTaintBackOff contains the maximum retries and the wait interval between two retries. + updateTaintBackOff = wait.Backoff{ + Steps: 5, + Duration: 100 * time.Millisecond, + Jitter: 1.0, + } +) + +// checkWaitListSchedulableNodes is a wrapper around listing nodes supporting retries. +func checkWaitListSchedulableNodes(ctx context.Context, c clientset.Interface) (*v1.NodeList, error) { + nodes, err := waitListSchedulableNodes(c) + if err != nil { + return nil, fmt.Errorf("error: %s. Non-retryable failure or timed out while listing nodes for e2e cluster", err) + } + return nodes, nil +} + +// Filter filters nodes in NodeList in place, removing nodes that do not +// satisfy the given condition +func Filter(nodeList *v1.NodeList, fn func(node v1.Node) bool) { + var l []v1.Node + + for _, node := range nodeList.Items { + if fn(node) { + l = append(l, node) + } + } + nodeList.Items = l +} + +// IsConditionSetAsExpected returns a wantTrue value if the node has a match to the conditionType, otherwise returns an opposite value of the wantTrue with detailed logging. +func IsConditionSetAsExpected(node *v1.Node, conditionType v1.NodeConditionType, wantTrue bool) bool { + return isNodeConditionSetAsExpected(node, conditionType, wantTrue, false) +} + +// IsConditionSetAsExpectedSilent returns a wantTrue value if the node has a match to the conditionType, otherwise returns an opposite value of the wantTrue. +func IsConditionSetAsExpectedSilent(node *v1.Node, conditionType v1.NodeConditionType, wantTrue bool) bool { + return isNodeConditionSetAsExpected(node, conditionType, wantTrue, true) +} + +// isConditionUnset returns true if conditions of the given node do not have a match to the given conditionType, otherwise false. +func isConditionUnset(node *v1.Node, conditionType v1.NodeConditionType) bool { + for _, cond := range node.Status.Conditions { + if cond.Type == conditionType { + return false + } + } + return true +} + +// IsNodeReady returns true if: +// 1) it's Ready condition is set to true +// 2) doesn't have NetworkUnavailable condition set to true +func IsNodeReady(node *v1.Node) bool { + nodeReady := IsConditionSetAsExpected(node, v1.NodeReady, true) + networkReady := isConditionUnset(node, v1.NodeNetworkUnavailable) || + IsConditionSetAsExpectedSilent(node, v1.NodeNetworkUnavailable, false) + return nodeReady && networkReady +} + +// IsNodeSchedulable returns true if: +// 1) doesn't have "unschedulable" field set +// 2) it also returns true from IsNodeReady +func IsNodeSchedulable(node *v1.Node) bool { + if node == nil { + return false + } + return !node.Spec.Unschedulable && IsNodeReady(node) +} + +func toleratesTaintsWithNoScheduleNoExecuteEffects(taints []v1.Taint, tolerations []v1.Toleration) bool { + filteredTaints := []v1.Taint{} + for _, taint := range taints { + if taint.Effect == v1.TaintEffectNoExecute || taint.Effect == v1.TaintEffectNoSchedule { + filteredTaints = append(filteredTaints, taint) + } + } + + toleratesTaint := func(taint v1.Taint) bool { + for _, toleration := range tolerations { + if toleration.ToleratesTaint(&taint) { + return true + } + } + + return false + } + + for _, taint := range filteredTaints { + if !toleratesTaint(taint) { + return false + } + } + + return true +} + +// isNodeUntaintedWithNonblocking tests whether a fake pod can be scheduled on "node" +// but allows for taints in the list of non-blocking taints. +func isNodeUntaintedWithNonblocking(node *v1.Node, nonblockingTaints string) bool { + // Simple lookup for nonblocking taints based on comma-delimited list. + nonblockingTaintsMap := map[string]struct{}{} + for _, t := range strings.Split(nonblockingTaints, ",") { + if strings.TrimSpace(t) != "" { + nonblockingTaintsMap[strings.TrimSpace(t)] = struct{}{} + } + } + + n := node + if len(nonblockingTaintsMap) > 0 { + nodeCopy := node.DeepCopy() + nodeCopy.Spec.Taints = []v1.Taint{} + for _, v := range node.Spec.Taints { + if _, isNonblockingTaint := nonblockingTaintsMap[v.Key]; !isNonblockingTaint { + nodeCopy.Spec.Taints = append(nodeCopy.Spec.Taints, v) + } + } + n = nodeCopy + } + + return toleratesTaintsWithNoScheduleNoExecuteEffects(n.Spec.Taints, nil) +} + +// GetReadySchedulableNodes addresses the common use case of getting nodes you can do work on. +// 1) Needs to be schedulable. +// 2) Needs to be ready. +// If EITHER 1 or 2 is not true, most tests will want to ignore the node entirely. +// If there are no nodes that are both ready and schedulable, this will return an error. +func GetReadySchedulableNodes(ctx context.Context, c clientset.Interface) (nodes *v1.NodeList, err error) { + nodes, err = checkWaitListSchedulableNodes(ctx, c) + if err != nil { + return nil, fmt.Errorf("listing schedulable nodes error: %w", err) + } + Filter(nodes, func(node v1.Node) bool { + return IsNodeSchedulable(&node) && isNodeUntainted(&node) + }) + if len(nodes.Items) == 0 { + return nil, fmt.Errorf("there are currently no ready, schedulable nodes in the cluster") + } + return nodes, nil +} + +// GetRandomReadySchedulableNode gets a single randomly-selected node which is available for +// running pods on. If there are no available nodes it will return an error. +func GetRandomReadySchedulableNode(ctx context.Context, c clientset.Interface) (*v1.Node, error) { + nodes, err := GetReadySchedulableNodes(ctx, c) + if err != nil { + return nil, err + } + return &nodes.Items[rand.Intn(len(nodes.Items))], nil +} + +func WaitForNodeSchedulable(ctx context.Context, c clientset.Interface, name string, timeout time.Duration, wantSchedulable bool) bool { + Logf("Waiting up to %v for node %s to be schedulable: %t", timeout, name, wantSchedulable) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{}) + if err != nil { + Logf("Couldn't get node %s", name) + continue + } + + if IsNodeSchedulable(node) == wantSchedulable { + return true + } + } + Logf("Node %s didn't reach desired schedulable status (%t) within %v", name, wantSchedulable, timeout) + return false +} diff --git a/test/e2e/framework/statefulset_utils.go b/test/e2e/framework/statefulset_utils.go index decc8f131a..296bdb88e2 100644 --- a/test/e2e/framework/statefulset_utils.go +++ b/test/e2e/framework/statefulset_utils.go @@ -30,9 +30,6 @@ import ( "github.com/onsi/gomega" - appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" - "github.com/openkruise/kruise/test/e2e/manifest" apps "k8s.io/api/apps/v1" appsV1beta2 "k8s.io/api/apps/v1beta2" v1 "k8s.io/api/core/v1" @@ -46,6 +43,10 @@ import ( clientset "k8s.io/client-go/kubernetes" podutil "k8s.io/kubernetes/pkg/api/v1/pod" imageutils "k8s.io/kubernetes/test/utils/image" + + appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/test/e2e/manifest" ) const ( @@ -699,6 +700,10 @@ func (s *StatefulSetTester) SortStatefulPods(pods *v1.PodList) { sort.Sort(statefulPodsByOrdinal(pods.Items)) } +func SortStatefulPods(pods *v1.PodList) { + sort.Sort(statefulPodsByOrdinal(pods.Items)) +} + // DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns. func DeleteAllStatefulSets(c clientset.Interface, kc kruiseclientset.Interface, ns string) { sst := &StatefulSetTester{c: c, kc: kc}