Skip to content

Commit

Permalink
add start ordinal and e2e case
Browse files Browse the repository at this point in the history
Signed-off-by: Abner-1 <[email protected]>
  • Loading branch information
ABNER-1 committed Jun 13, 2024
1 parent 0e69ed4 commit f8bffff
Show file tree
Hide file tree
Showing 16 changed files with 1,001 additions and 27 deletions.
26 changes: 25 additions & 1 deletion apis/apps/v1beta1/statefulset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package v1beta1

import (
appspub "github.com/openkruise/kruise/apis/apps/pub"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

appspub "github.com/openkruise/kruise/apis/apps/pub"
)

const (
Expand Down Expand Up @@ -143,6 +144,21 @@ type StatefulSetPersistentVolumeClaimRetentionPolicy struct {
WhenScaled PersistentVolumeClaimRetentionPolicyType `json:"whenScaled,omitempty"`
}

// StatefulSetOrdinals describes the policy used for replica ordinal assignment
// in this StatefulSet.
// The controller only honors Start when the StatefulSetStartOrdinal feature
// gate is enabled; with the gate disabled, or when the ordinals field is nil,
// ordinals start at 0 (see getStartOrdinal in the statefulset controller).
type StatefulSetOrdinals struct {
// start is the number representing the first replica's index. It may be used
// to number replicas from an alternate index (eg: 1-indexed) over the default
// 0-indexed names, or to orchestrate progressive movement of replicas from
// one StatefulSet to another.
// If set, replica indices will be in the range:
// [.spec.ordinals.start, .spec.ordinals.start + .spec.replicas).
// If unset, defaults to 0. Replica indices will be in the range:
// [0, .spec.replicas).
// +optional
Start int32 `json:"start" protobuf:"varint,1,opt,name=start"`
}

// StatefulSetSpec defines the desired state of StatefulSet
type StatefulSetSpec struct {
// replicas is the desired number of replicas of the given Template.
Expand Down Expand Up @@ -228,6 +244,14 @@ type StatefulSetSpec struct {
// StatefulSetAutoDeletePVC feature gate to be enabled, which is alpha.
// +optional
PersistentVolumeClaimRetentionPolicy *StatefulSetPersistentVolumeClaimRetentionPolicy `json:"persistentVolumeClaimRetentionPolicy,omitempty"`

// ordinals controls the numbering of replica indices in a StatefulSet. The
// default ordinals behavior assigns a "0" index to the first replica and
// increments the index by one for each additional replica requested. Using
// the ordinals field requires the StatefulSetStartOrdinal feature gate to be
// enabled, which is beta.
// +optional
Ordinals *StatefulSetOrdinals `json:"ordinals,omitempty" protobuf:"bytes,11,opt,name=ordinals"`
}

// StatefulSetScaleStrategy defines strategies for pods scale.
Expand Down
20 changes: 20 additions & 0 deletions apis/apps/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions config/crd/bases/apps.kruise.io_statefulsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,27 @@ spec:
type: boolean
type: object
type: object
ordinals:
description: |-
ordinals controls the numbering of replica indices in a StatefulSet. The
default ordinals behavior assigns a "0" index to the first replica and
increments the index by one for each additional replica requested. Using
the ordinals field requires the StatefulSetStartOrdinal feature gate to be
enabled, which is beta.
properties:
start:
description: |-
start is the number representing the first replica's index. It may be used
to number replicas from an alternate index (eg: 1-indexed) over the default
0-indexed names, or to orchestrate progressive movement of replicas from
one StatefulSet to another.
If set, replica indices will be in the range:
[.spec.ordinals.start, .spec.ordinals.start + .spec.replicas).
If unset, defaults to 0. Replica indices will be in the range:
[0, .spec.replicas).
format: int32
type: integer
type: object
persistentVolumeClaimRetentionPolicy:
description: |-
PersistentVolumeClaimRetentionPolicy describes the policy used for PVCs created from
Expand Down
21 changes: 21 additions & 0 deletions config/crd/bases/apps.kruise.io_uniteddeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,27 @@ spec:
type: boolean
type: object
type: object
ordinals:
description: |-
ordinals controls the numbering of replica indices in a StatefulSet. The
default ordinals behavior assigns a "0" index to the first replica and
increments the index by one for each additional replica requested. Using
the ordinals field requires the StatefulSetStartOrdinal feature gate to be
enabled, which is beta.
properties:
start:
description: |-
start is the number representing the first replica's index. It may be used
to number replicas from an alternate index (eg: 1-indexed) over the default
0-indexed names, or to orchestrate progressive movement of replicas from
one StatefulSet to another.
If set, replica indices will be in the range:
[.spec.ordinals.start, .spec.ordinals.start + .spec.replicas).
If unset, defaults to 0. Replica indices will be in the range:
[0, .spec.replicas).
format: int32
type: integer
type: object
persistentVolumeClaimRetentionPolicy:
description: |-
PersistentVolumeClaimRetentionPolicy describes the policy used for PVCs created from
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/statefulset/stateful_pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,22 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *appsv1beta1.St
}
}

// createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy
func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(ctx context.Context, set *appsv1beta1.StatefulSet, pod *v1.Pod) error {
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
return err

Check warning on line 321 in pkg/controller/statefulset/stateful_pod_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_pod_control.go#L321

Added line #L321 was not covered by tests
}

if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// Set PVC policy as much as is possible at this point.
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err

Check warning on line 328 in pkg/controller/statefulset/stateful_pod_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_pod_control.go#L327-L328

Added lines #L327 - L328 were not covered by tests
}
}
return nil
}

// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
Expand Down
40 changes: 33 additions & 7 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ import (
"github.com/openkruise/kruise/pkg/util/lifecycle"
)

// MaxBatchSize is a realistic value for the maximum number of in-flight
// requests when processing in parallel mode.
const MaxBatchSize = 500

// StatefulSetControlInterface implements the control logic for updating StatefulSets and their children Pods. It is implemented
// as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation.
type StatefulSetControlInterface interface {
Expand Down Expand Up @@ -424,26 +427,27 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
}

if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount && !reserveOrdinals.Has(ord) {
if ord := getOrdinal(pods[i]); podInOrdinalRange(pods[i], set, replicaCount) && !reserveOrdinals.Has(ord) {
// if the ordinal of the pod is within the ordinal range of the set and not in reserveOrdinals,
// insert it at the index given by its ordinal minus the start ordinal
replicas[ord] = pods[i]
replicas[ord-getStartOrdinal(set)] = pods[i]

} else if ord >= replicaCount || reserveOrdinals.Has(ord) {
// if the ordinal is greater than the number of replicas or in reserveOrdinals,
} else if ord >= 0 || reserveOrdinals.Has(ord) {
// if the ordinal is valid, but not within the range or in reserveOrdinals,
// add it to the condemned list
condemned = append(condemned, pods[i])
}
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}

// for any empty indices in the sequence [getStartOrdinal(set), getStartOrdinal(set)+replicaCount),
// skipping reserved ordinals, create a new Pod at the correct revision
for ord := 0; ord < replicaCount; ord++ {
for ord := getStartOrdinal(set); ord < replicaCount+getStartOrdinal(set); ord++ {
if reserveOrdinals.Has(ord) {
continue
}
if replicas[ord] == nil {
replicas[ord] = newVersionedStatefulSetPod(
replicaIdx := ord - getStartOrdinal(set)
if replicas[replicaIdx] == nil {
replicas[replicaIdx] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
Expand Down Expand Up @@ -556,6 +560,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// pod created, no more work possible for this round
continue
}

// If the Pod is in pending state then trigger PVC creation to create missing PVCs
if isPending(replicas[i]) {
klog.V(4).Info(
"StatefulSet is triggering PVC creation for pending Pod",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
return &status, err

Check warning on line 570 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L570

Added line #L570 was not covered by tests
}
}

// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
Expand Down Expand Up @@ -1039,3 +1054,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(

return nil
}

// getStartOrdinal reports the first ordinal (inclusive) that may be assigned
// to a replica of set.
//
// It yields spec.ordinals.start only when the StatefulSetStartOrdinal feature
// gate is enabled and spec.ordinals is set; in every other case it yields 0.
func getStartOrdinal(set *appsv1beta1.StatefulSet) int {
	// The gate is consulted first so that set is not touched when it is off.
	if !utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetStartOrdinal) {
		return 0
	}
	if ordinals := set.Spec.Ordinals; ordinals != nil {
		return int(ordinals.Start)
	}
	return 0
}
110 changes: 110 additions & 0 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3671,3 +3671,113 @@ func isOrHasInternalError(err error) bool {
agg, ok := err.(utilerrors.Aggregate)
return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0])
}

// emptyInvariants is a no-op invariant check: it ignores both arguments and
// always returns nil. It is passed where an invariantFunc is required but no
// invariant should be enforced.
func emptyInvariants(set *appsv1beta1.StatefulSet, om *fakeObjectManager) error {
return nil
}

// TestStatefulSetControlWithStartOrdinal enables the StatefulSetStartOrdinal
// feature gate and checks, for several combinations of replicas,
// spec.ordinals.start and spec.reserveOrdinals, which Pod ordinals exist
// after the StatefulSet has been scaled up.
//
// Every table entry runs as its own named subtest so that a fatal failure in
// one case neither hides nor aborts the remaining cases.
func TestStatefulSetControlWithStartOrdinal(t *testing.T) {
	defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)()

	// simpleSetFn builds a StatefulSet with the given replica count, start
	// ordinal and reserved ordinals.
	simpleSetFn := func(replicas, startOrdinal int, reservedIds ...int) *appsv1beta1.StatefulSet {
		statefulSet := newStatefulSet(replicas)
		statefulSet.Spec.Ordinals = &appsv1beta1.StatefulSetOrdinals{Start: int32(startOrdinal)}
		statefulSet.Spec.ReserveOrdinals = append([]int{}, reservedIds...)
		return statefulSet
	}

	testCases := []struct {
		name        string
		fn          func(*testing.T, *appsv1beta1.StatefulSet, invariantFunc, []int)
		obj         func() *appsv1beta1.StatefulSet
		expectedIds []int
	}{
		{
			name: "replicas=3 start=2 reserved=none",
			fn:   CreatesPodsWithStartOrdinal,
			obj: func() *appsv1beta1.StatefulSet {
				return simpleSetFn(3, 2)
			},
			expectedIds: []int{2, 3, 4},
		},
		{
			name: "replicas=3 start=2 reserved=0,4",
			fn:   CreatesPodsWithStartOrdinal,
			obj: func() *appsv1beta1.StatefulSet {
				return simpleSetFn(3, 2, 0, 4)
			},
			expectedIds: []int{2, 3, 5},
		},
		{
			name: "replicas=3 start=2 reserved=0,2,3,4,5",
			fn:   CreatesPodsWithStartOrdinal,
			obj: func() *appsv1beta1.StatefulSet {
				return simpleSetFn(3, 2, 0, 2, 3, 4, 5)
			},
			expectedIds: []int{6, 7, 8},
		},
		{
			name: "replicas=4 start=1 reserved=none",
			fn:   CreatesPodsWithStartOrdinal,
			obj: func() *appsv1beta1.StatefulSet {
				return simpleSetFn(4, 1)
			},
			expectedIds: []int{1, 2, 3, 4},
		},
		{
			name: "replicas=4 start=1 reserved=1,3,4",
			fn:   CreatesPodsWithStartOrdinal,
			obj: func() *appsv1beta1.StatefulSet {
				return simpleSetFn(4, 1, 1, 3, 4)
			},
			expectedIds: []int{2, 5, 6, 7},
		},
	}

	for _, testCase := range testCases {
		testCase := testCase
		// Subtests are run sequentially (no t.Parallel), so the feature gate
		// override installed above stays in effect for each of them.
		t.Run(testCase.name, func(t *testing.T) {
			testCase.fn(t, testCase.obj(), emptyInvariants, testCase.expectedIds)
		})
	}
}

// CreatesPodsWithStartOrdinal scales set up through a freshly set-up
// controller and then verifies the outcome:
//   - status.replicas, status.readyReplicas and status.updatedReplicas all
//     equal spec.replicas;
//   - the Pods matching the set's selector, sorted with ascendingOrdinal,
//     carry exactly the ordinals listed in expectedIds, in that order.
//
// invariants is forwarded to scaleUpStatefulSetControl. Errors that make the
// remaining checks meaningless (the set cannot be fetched, the selector
// cannot be built, the Pods cannot be listed) stop the test immediately.
func CreatesPodsWithStartOrdinal(t *testing.T, set *appsv1beta1.StatefulSet, invariants invariantFunc, expectedIds []int) {
	client := fake.NewSimpleClientset()
	kruiseClient := kruisefake.NewSimpleClientset(set)
	om, _, ssc, stop := setupController(client, kruiseClient)
	defer close(stop)

	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
		t.Errorf("Failed to turn up StatefulSet : %s", err)
	}
	var err error
	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
	if err != nil {
		t.Fatalf("Error getting updated StatefulSet: %v", err)
	}
	if set.Status.Replicas != *set.Spec.Replicas {
		t.Errorf("Failed to scale statefulset to %d replicas", *set.Spec.Replicas)
	}
	if set.Status.ReadyReplicas != *set.Spec.Replicas {
		t.Errorf("Failed to set ReadyReplicas correctly, expected %d", *set.Spec.Replicas)
	}
	if set.Status.UpdatedReplicas != *set.Spec.Replicas {
		t.Errorf("Failed to set UpdatedReplicas correctly, expected %d", *set.Spec.Replicas)
	}
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		// Without a usable selector the Pods cannot be listed; continuing
		// would only produce misleading follow-up failures.
		t.Fatal(err)
	}
	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		// A failed list leaves nothing meaningful to compare against.
		t.Fatal(err)
	}
	sort.Sort(ascendingOrdinal(pods))
	if len(expectedIds) != len(pods) {
		t.Errorf("Expected %d pods. Got %d", len(expectedIds), len(pods))
		return
	}
	for i, pod := range pods {
		expectedOrdinal := expectedIds[i]
		actualPodOrdinal := getOrdinal(pod)
		if actualPodOrdinal != expectedOrdinal {
			t.Errorf("Expected pod ordinal %d. Got %d", expectedOrdinal, actualPodOrdinal)
		}
	}
}
Loading

0 comments on commit f8bffff

Please sign in to comment.