Skip to content

Commit

Permalink
workloadspread support statefulset (openkruise#1056)
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <[email protected]>

Signed-off-by: mingzhou.swx <[email protected]>
Co-authored-by: mingzhou.swx <[email protected]>
Signed-off-by: Liu Zhenwei <[email protected]>
  • Loading branch information
2 people authored and diannaowa committed Sep 14, 2022
1 parent 2cb71ed commit 22341bb
Show file tree
Hide file tree
Showing 10 changed files with 1,186 additions and 180 deletions.
12 changes: 12 additions & 0 deletions pkg/controller/workloadspread/update_pod_deletion_cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import (
func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSpread,
podMap map[string][]*corev1.Pod,
workloadReplicas int32) error {
targetRef := ws.Spec.TargetReference
if targetRef == nil || !isEffectiveKindForDeletionCost(targetRef) {
return nil
}
// update Pod's deletion-cost annotation in each subset
for idx, subset := range ws.Spec.Subsets {
if err := r.syncSubsetPodDeletionCost(ws, &subset, idx, podMap[subset.Name], workloadReplicas); err != nil {
Expand Down Expand Up @@ -203,3 +207,11 @@ func sortDeleteIndexes(pods []*corev1.Pod) []int {

return waitDeleteIndexes
}

func isEffectiveKindForDeletionCost(targetRef *appsv1alpha1.TargetReference) bool {
switch targetRef.Kind {
case controllerKindRS.Kind, controllerKindDep.Kind, controllerKruiseKindCS.Kind:
return true
}
return false
}
14 changes: 8 additions & 6 deletions pkg/controller/workloadspread/workloadspread_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ const (
)

var (
controllerKruiseKindWS = appsv1alpha1.SchemeGroupVersion.WithKind("WorkloadSpread")
controllerKruiseKindCS = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
controllerKindRS = appsv1.SchemeGroupVersion.WithKind("ReplicaSet")
controllerKindDep = appsv1.SchemeGroupVersion.WithKind("Deployment")
controllerKindJob = batchv1.SchemeGroupVersion.WithKind("Job")
controllerKruiseKindWS = appsv1alpha1.SchemeGroupVersion.WithKind("WorkloadSpread")
controllerKruiseKindCS = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
controllerKruiseKindSts = appsv1alpha1.SchemeGroupVersion.WithKind("StatefulSet")
controllerKindSts = appsv1.SchemeGroupVersion.WithKind("StatefulSet")
controllerKindRS = appsv1.SchemeGroupVersion.WithKind("ReplicaSet")
controllerKindDep = appsv1.SchemeGroupVersion.WithKind("Deployment")
controllerKindJob = batchv1.SchemeGroupVersion.WithKind("Job")
)

// this is a short cut for any sub-functions to notify the reconcile how long to wait to requeue
Expand Down Expand Up @@ -269,7 +271,7 @@ func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.Work
targetRef := ws.Spec.TargetReference

switch targetRef.Kind {
case controllerKindDep.Kind, controllerKindRS.Kind, controllerKruiseKindCS.Kind:
case controllerKindDep.Kind, controllerKindRS.Kind, controllerKruiseKindCS.Kind, controllerKindSts.Kind:
pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false)
case controllerKindJob.Kind:
pods, workloadReplicas, err = r.getPodJob(targetRef, ws.Namespace)
Expand Down
27 changes: 20 additions & 7 deletions pkg/controller/workloadspread/workloadspread_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
wsutil "github.com/openkruise/kruise/pkg/util/workloadspread"
)

Expand Down Expand Up @@ -100,9 +101,9 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
var newReplicas int32

switch evt.ObjectNew.(type) {
case *appsalphav1.CloneSet:
oldReplicas = *evt.ObjectOld.(*appsalphav1.CloneSet).Spec.Replicas
newReplicas = *evt.ObjectNew.(*appsalphav1.CloneSet).Spec.Replicas
case *appsv1alpha1.CloneSet:
oldReplicas = *evt.ObjectOld.(*appsv1alpha1.CloneSet).Spec.Replicas
newReplicas = *evt.ObjectNew.(*appsv1alpha1.CloneSet).Spec.Replicas
gvk = controllerKruiseKindCS
case *appsv1.Deployment:
oldReplicas = *evt.ObjectOld.(*appsv1.Deployment).Spec.Replicas
Expand All @@ -116,6 +117,14 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
oldReplicas = *evt.ObjectOld.(*batchv1.Job).Spec.Parallelism
newReplicas = *evt.ObjectNew.(*batchv1.Job).Spec.Parallelism
gvk = controllerKindJob
case *appsv1.StatefulSet:
oldReplicas = *evt.ObjectOld.(*appsv1.StatefulSet).Spec.Replicas
newReplicas = *evt.ObjectNew.(*appsv1.StatefulSet).Spec.Replicas
gvk = controllerKindSts
case *appsv1beta1.StatefulSet:
oldReplicas = *evt.ObjectOld.(*appsv1beta1.StatefulSet).Spec.Replicas
newReplicas = *evt.ObjectNew.(*appsv1beta1.StatefulSet).Spec.Replicas
gvk = controllerKruiseKindSts
default:
return
}
Expand Down Expand Up @@ -152,14 +161,18 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
obj client.Object, action EventAction) {
var gvk schema.GroupVersionKind
switch obj.(type) {
case *appsalphav1.CloneSet:
case *appsv1alpha1.CloneSet:
gvk = controllerKruiseKindCS
case *appsv1.Deployment:
gvk = controllerKindDep
case *appsv1.ReplicaSet:
gvk = controllerKindRS
case *batchv1.Job:
gvk = controllerKindJob
case *appsv1.StatefulSet:
gvk = controllerKindSts
case *appsv1beta1.StatefulSet:
gvk = controllerKruiseKindSts
default:
return
}
Expand All @@ -184,8 +197,8 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,

func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
workloadNamespaceName types.NamespacedName,
gvk schema.GroupVersionKind) (*appsalphav1.WorkloadSpread, error) {
wsList := &appsalphav1.WorkloadSpreadList{}
gvk schema.GroupVersionKind) (*appsv1alpha1.WorkloadSpread, error) {
wsList := &appsv1alpha1.WorkloadSpreadList{}
listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace}
if err := w.List(context.TODO(), wsList, listOptions); err != nil {
klog.Errorf("List WorkloadSpread failed: %s", err.Error())
Expand Down
Loading

0 comments on commit 22341bb

Please sign in to comment.