
Merge pull request #4078 from chaosi-zju/retain-2
Retain replicas for HPA-controlled Deployment resources (labels method)
karmada-bot authored Sep 21, 2023
2 parents 68f4ac7 + a4828cc commit 681515f
Showing 9 changed files with 370 additions and 32 deletions.
1 change: 1 addition & 0 deletions artifacts/deploy/karmada-controller-manager.yaml
@@ -30,6 +30,7 @@ spec:
 - --cluster-status-update-frequency=10s
 - --secure-port=10357
 - --failover-eviction-timeout=30s
+- --controllers=*,hpaReplicasSyncer
 - --feature-gates=PropagationPolicyPreemption=true
 - --v=4
 livenessProbe:
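The hpaReplicasSyncer controller has to be named explicitly here, presumably because it is not part of the default controller set that "*" selects. A minimal sketch of how a --controllers value in the kube-controller-manager style is commonly interpreted ("*" enables the defaults, a bare name enables one controller, a leading "-" disables one); this is an illustration of the flag semantics, not Karmada's actual flag-parsing code:

package main

import (
	"fmt"
	"strings"
)

// enabled reports whether a controller is switched on by a --controllers value:
// "*" selects every controller that is enabled by default, "name" enables one
// explicitly, and "-name" disables it.
func enabled(name string, selectors []string, enabledByDefault bool) bool {
	hasStar := false
	for _, selector := range selectors {
		switch selector {
		case name:
			return true
		case "-" + name:
			return false
		case "*":
			hasStar = true
		}
	}
	return hasStar && enabledByDefault
}

func main() {
	selectors := strings.Split("*,hpaReplicasSyncer", ",")
	// Assumption: hpaReplicasSyncer is off by default, hence the explicit entry.
	fmt.Println(enabled("hpaReplicasSyncer", selectors, false)) // true
	fmt.Println(enabled("cluster", selectors, true))            // true, via "*"
}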
7 changes: 4 additions & 3 deletions cmd/controller-manager/app/controllermanager.go
@@ -602,9 +602,10 @@ func startHPAReplicasSyncerController(ctx controllerscontext.Context) (enabled b
 	}
 
 	hpaReplicasSyncer := hpareplicassyncer.HPAReplicasSyncer{
-		Client:      ctx.Mgr.GetClient(),
-		RESTMapper:  ctx.Mgr.GetRESTMapper(),
-		ScaleClient: scaleClient,
+		Client:        ctx.Mgr.GetClient(),
+		DynamicClient: ctx.DynamicClientSet,
+		RESTMapper:    ctx.Mgr.GetRESTMapper(),
+		ScaleClient:   scaleClient,
 	}
 	err = hpaReplicasSyncer.SetupWithManager(ctx.Mgr)
 	if err != nil {
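The new DynamicClient field is what later lets the syncer label whatever kind the HPA's scaleTargetRef points at, without needing compile-time types. A hedged sketch of how a dynamic.Interface such as ctx.DynamicClientSet is typically built and used; the kubeconfig path, namespace, and resource below are illustrative only:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a rest.Config from a kubeconfig file (path is illustrative).
	cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/karmada/karmada-apiserver.config")
	if err != nil {
		panic(err)
	}

	dynamicClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Fetch an HPA's scale target as unstructured, whatever its kind is.
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	target, err := dynamicClient.Resource(gvr).Namespace("default").Get(context.TODO(), "nginx", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(target.GetLabels())
}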
53 changes: 25 additions & 28 deletions pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go
@@ -9,48 +9,45 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/scale"
 	"k8s.io/klog/v2"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/event"
-	"sigs.k8s.io/controller-runtime/pkg/predicate"
-)
-
-var hpaPredicate = predicate.Funcs{
-	CreateFunc: func(e event.CreateEvent) bool {
-		return false
-	},
-	UpdateFunc: func(e event.UpdateEvent) bool {
-		oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
-		if !ok {
-			return false
-		}
-
-		newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
-		if !ok {
-			return false
-		}
+
+	"github.com/karmada-io/karmada/pkg/util"
+)

-		return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
-	},
-	DeleteFunc: func(e event.DeleteEvent) bool {
-		return false
-	},
-}
+const (
+	// ControllerName is the controller name that will be used when reporting events.
+	ControllerName = "hpa-replicas-syncer"
+	// scaleRefWorkerNum is the async Worker number
+	scaleRefWorkerNum = 1
+)

 // HPAReplicasSyncer is to sync replicas from status of HPA to resource template.
 type HPAReplicasSyncer struct {
-	Client      client.Client
-	RESTMapper  meta.RESTMapper
-	ScaleClient scale.ScalesGetter
+	Client        client.Client
+	DynamicClient dynamic.Interface
+	RESTMapper    meta.RESTMapper
+
+	ScaleClient    scale.ScalesGetter
+	scaleRefWorker util.AsyncWorker
 }

 // SetupWithManager creates a controller and register to controller manager.
 func (r *HPAReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
-	return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
-		For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(hpaPredicate)).
+	scaleRefWorkerOptions := util.Options{
+		Name:          "scale ref worker",
+		ReconcileFunc: r.reconcileScaleRef,
+	}
+	r.scaleRefWorker = util.NewAsyncWorker(scaleRefWorkerOptions)
+	r.scaleRefWorker.Run(scaleRefWorkerNum, context.Background().Done())
+
+	return controllerruntime.NewControllerManagedBy(mgr).
+		Named(ControllerName).
+		For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(r)).
 		Complete(r)
 }

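With this change the controller no longer uses a standalone predicate.Funcs value: the syncer itself implements predicate.Predicate (see the new hpa_replicas_syncer_predicate.go below), and the predicate callbacks only enqueue labelEvent items for a background worker, so the watch path never blocks on API calls. A self-contained sketch of that queue-and-reconcile pattern; it is not Karmada's util.AsyncWorker, just an illustration of the design choice:

package main

import (
	"fmt"
	"sync"
)

type eventKind int

const (
	addLabel eventKind = iota
	deleteLabel
)

type labelEvent struct {
	kind eventKind
	ref  string // stand-in for the HPA's scale target reference
}

// asyncWorker drains a queue in background goroutines so that producers
// (the predicate callbacks) never block on slow work such as API patches.
type asyncWorker struct {
	queue chan labelEvent
	wg    sync.WaitGroup
}

func newAsyncWorker(workers int, reconcile func(labelEvent)) *asyncWorker {
	w := &asyncWorker{queue: make(chan labelEvent, 64)}
	for i := 0; i < workers; i++ {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for e := range w.queue {
				reconcile(e)
			}
		}()
	}
	return w
}

func (w *asyncWorker) Add(e labelEvent) { w.queue <- e }

func (w *asyncWorker) Stop() {
	close(w.queue)
	w.wg.Wait()
}

func main() {
	worker := newAsyncWorker(1, func(e labelEvent) {
		fmt.Printf("reconcile event %d for %s\n", e.kind, e.ref)
	})
	// A predicate callback would enqueue and return immediately.
	worker.Add(labelEvent{kind: addLabel, ref: "default/nginx"})
	worker.Add(labelEvent{kind: deleteLabel, ref: "default/old-target"})
	worker.Stop()
}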
77 changes: 77 additions & 0 deletions pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go
@@ -0,0 +1,77 @@
package hpareplicassyncer

import (
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

var _ predicate.Predicate = &HPAReplicasSyncer{}

func (r *HPAReplicasSyncer) Create(e event.CreateEvent) bool {
hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
klog.Errorf("create predicates in hpa controller called, but obj is not hpa type")
return false
}

// if the hpa exists and has been propagated, add the label to its scale ref resource
if hasBeenPropagated(hpa) {
r.scaleRefWorker.Add(labelEvent{addLabelEvent, hpa})
}

return false
}

func (r *HPAReplicasSyncer) Update(e event.UpdateEvent) bool {
oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
klog.Errorf("update predicates in hpa controller called, but old obj is not hpa type")
return false
}

newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
klog.Errorf("update predicates in hpa controller called, but new obj is not hpa type")
return false
}

// hpa scale ref changed, remove old hpa label and add to new hpa
if oldHPA.Spec.ScaleTargetRef.String() != newHPA.Spec.ScaleTargetRef.String() {
// if scale ref has label, remove label, otherwise skip
r.scaleRefWorker.Add(labelEvent{deleteLabelEvent, oldHPA})
}

// if the new hpa exists and has been propagated, add the label to its scale ref resource
if hasBeenPropagated(newHPA) {
r.scaleRefWorker.Add(labelEvent{addLabelEvent, newHPA})
}

return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
}

func (r *HPAReplicasSyncer) Delete(e event.DeleteEvent) bool {
hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
klog.Errorf("delete predicates in hpa controller called, but obj is not hpa type")
return false
}

// if scale ref has label, remove label, otherwise skip
r.scaleRefWorker.Add(labelEvent{deleteLabelEvent, hpa})

return false
}

func (r *HPAReplicasSyncer) Generic(e event.GenericEvent) bool {
return false
}

func hasBeenPropagated(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
_, ppExist := hpa.GetLabels()[policyv1alpha1.PropagationPolicyUIDLabel]
_, cppExist := hpa.GetLabels()[policyv1alpha1.ClusterPropagationPolicyUIDLabel]
return ppExist || cppExist
}
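hasBeenPropagated simply checks for the policy UID labels that Karmada sets on a resource once a PropagationPolicy or ClusterPropagationPolicy has matched it. A small sketch of the check in isolation; the UID value below is a made-up placeholder, only the label keys come from the policy API package:

package main

import (
	"fmt"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

func hasBeenPropagated(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
	_, ppExist := hpa.GetLabels()[policyv1alpha1.PropagationPolicyUIDLabel]
	_, cppExist := hpa.GetLabels()[policyv1alpha1.ClusterPropagationPolicyUIDLabel]
	return ppExist || cppExist
}

func main() {
	hpa := &autoscalingv2.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx",
			Namespace: "default",
			Labels: map[string]string{
				// Karmada sets this once a PropagationPolicy claims the HPA; the UID is a placeholder.
				policyv1alpha1.PropagationPolicyUIDLabel: "5c3f4f9a-0000-0000-0000-000000000000",
			},
		},
	}
	// true: the HPA's scale target will be queued for the retain-replicas label.
	fmt.Println(hasBeenPropagated(hpa))
}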
137 changes: 137 additions & 0 deletions pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go
@@ -0,0 +1,137 @@
package hpareplicassyncer

import (
"context"
"fmt"

autoscalingv2 "k8s.io/api/autoscaling/v2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)

type labelEventKind int

const (
// addLabelEvent refers to adding util.RetainReplicasLabel to the resource scaled by the HPA
addLabelEvent labelEventKind = iota
// deleteLabelEvent refers to deleting util.RetainReplicasLabel from the resource scaled by the HPA
deleteLabelEvent
)

type labelEvent struct {
kind labelEventKind
hpa *autoscalingv2.HorizontalPodAutoscaler
}

func (r *HPAReplicasSyncer) reconcileScaleRef(key util.QueueKey) (err error) {
event, ok := key.(labelEvent)
if !ok {
klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)
return nil
}

switch event.kind {
case addLabelEvent:
err = r.addHPALabelToScaleRef(context.TODO(), event.hpa)
case deleteLabelEvent:
err = r.deleteHPALabelFromScaleRef(context.TODO(), event.hpa)
default:
klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)
return nil
}

if err != nil {
klog.Errorf("reconcile scale ref failed: %+v", err)
}
return err
}

func (r *HPAReplicasSyncer) addHPALabelToScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version)
if err != nil {
return fmt.Errorf("unable to recognize scale ref resource, %s/%v, err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}

scaleRef, err := r.DynamicClient.Resource(mapping.Resource).Namespace(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}
return fmt.Errorf("failed to find scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}

// patch is preferred over update: if the object is modified after the Get, the patch can still succeed while an update cannot
newScaleRef := scaleRef.DeepCopy()
util.MergeLabel(newScaleRef, util.RetainReplicasLabel, util.RetainReplicasValue)
patchBytes, err := helper.GenMergePatch(scaleRef, newScaleRef)
if err != nil {
return fmt.Errorf("failed to gen merge patch (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}
if len(patchBytes) == 0 {
klog.Infof("hpa labels already exist, skip adding (%s/%v)", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}

_, err = r.DynamicClient.Resource(mapping.Resource).Namespace(newScaleRef.GetNamespace()).
Patch(ctx, newScaleRef.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}
return fmt.Errorf("failed to patch scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}

klog.Infof("add hpa labels to %s/%v success", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}

func (r *HPAReplicasSyncer) deleteHPALabelFromScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version)
if err != nil {
return fmt.Errorf("unable to recognize scale ref resource, %s/%v, err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}

scaleRef, err := r.DynamicClient.Resource(mapping.Resource).Namespace(hpa.Namespace).Get(ctx, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}
return fmt.Errorf("failed to find scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}

// patch is preferred over update: if the object is modified after the Get, the patch can still succeed while an update cannot
newScaleRef := scaleRef.DeepCopy()
util.RemoveLabels(newScaleRef, util.RetainReplicasLabel)
patchBytes, err := helper.GenMergePatch(scaleRef, newScaleRef)
if err != nil {
return fmt.Errorf("failed to gen merge patch (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}
if len(patchBytes) == 0 {
klog.Infof("hpa labels not exist, skip deleting (%s/%v)", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}

_, err = r.DynamicClient.Resource(mapping.Resource).Namespace(newScaleRef.GetNamespace()).
Patch(ctx, newScaleRef.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("scale ref resource is not found (%s/%v), skip processing", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}
return fmt.Errorf("failed to patch scale ref resource (%s/%v), err: %+v", hpa.Namespace, hpa.Spec.ScaleTargetRef, err)
}

klog.Infof("delete hpa labels from %s/%+v success", hpa.Namespace, hpa.Spec.ScaleTargetRef)
return nil
}
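Because the worker diffs the object before and after util.MergeLabel / util.RemoveLabels, the body sent with types.MergePatchType carries nothing but the label change, which is why it is far less likely to conflict than a full Update. A hedged sketch of what those JSON merge patches look like; the label key and value below are placeholders for util.RetainReplicasLabel / util.RetainReplicasValue, whose exact strings are not shown in this diff:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Placeholders for util.RetainReplicasLabel / util.RetainReplicasValue.
	const retainReplicasLabel = "example.karmada.io/retain-replicas"
	const retainReplicasValue = "true"

	// Adding the label: a JSON merge patch only lists the fields being changed.
	addPatch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]interface{}{retainReplicasLabel: retainReplicasValue},
		},
	}
	// Removing the label: in JSON merge patch (RFC 7386) a null value deletes the key.
	deletePatch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]interface{}{retainReplicasLabel: nil},
		},
	}

	addBytes, _ := json.Marshal(addPatch)
	deleteBytes, _ := json.Marshal(deletePatch)
	fmt.Println(string(addBytes))    // {"metadata":{"labels":{"example.karmada.io/retain-replicas":"true"}}}
	fmt.Println(string(deleteBytes)) // {"metadata":{"labels":{"example.karmada.io/retain-replicas":null}}}
}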
3 changes: 2 additions & 1 deletion pkg/resourceinterpreter/default/native/aggregatestatus.go
@@ -574,7 +574,8 @@ func aggregateHorizontalPodAutoscalerStatus(object *unstructured.Unstructured, a
 		if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
 			return nil, err
 		}
-		klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d", temp.CurrentReplicas)
+		klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d, DesiredReplicas: %d",
+			hpa.Namespace, hpa.Name, item.ClusterName, temp.CurrentReplicas, temp.DesiredReplicas)

 		newStatus.CurrentReplicas += temp.CurrentReplicas
 		newStatus.DesiredReplicas += temp.DesiredReplicas
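The replaced log line declared four format verbs but passed only a single argument, so klog would emit %!s(MISSING)-style placeholders instead of the HPA's identity; the new call supplies every argument and also logs DesiredReplicas. A tiny reproduction of the old symptom with plain fmt:

package main

import "fmt"

func main() {
	// Mirrors the old call: four verbs, one argument.
	fmt.Printf("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d\n", int32(2))
	// The output contains %!s(MISSING) / %!d(MISSING) placeholders rather than real values.
}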
22 changes: 22 additions & 0 deletions pkg/resourceinterpreter/default/native/retain.go
@@ -3,6 +3,7 @@ package native
 import (
 	"fmt"

+	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -18,6 +19,7 @@ type retentionInterpreter func(desired *unstructured.Unstructured, observed *uns

 func getAllDefaultRetentionInterpreter() map[schema.GroupVersionKind]retentionInterpreter {
 	s := make(map[schema.GroupVersionKind]retentionInterpreter)
+	s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = retainWorkloadReplicas
 	s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = retainPodFields
 	s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = lifted.RetainServiceFields
 	s[corev1.SchemeGroupVersion.WithKind(util.ServiceAccountKind)] = lifted.RetainServiceAccountFields
@@ -122,3 +124,23 @@ func retainJobSelectorFields(desired, observed *unstructured.Unstructured) (*uns
 	}
 	return desired, nil
 }
+
+func retainWorkloadReplicas(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	labels, _, err := unstructured.NestedStringMap(desired.Object, "metadata", "labels")
+	if err != nil {
+		return nil, fmt.Errorf("failed to get metadata.label from desired.Object: %+v", err)
+	}
+
+	if label, labelExist := labels[util.RetainReplicasLabel]; labelExist && label == util.RetainReplicasValue {
+		replicas, exist, err := unstructured.NestedInt64(observed.Object, "spec", "replicas")
+		if err != nil || !exist {
+			return nil, fmt.Errorf("failed to get spec.replicas from %s %s/%s", observed.GetKind(), observed.GetNamespace(), observed.GetName())
+		}
+		err = unstructured.SetNestedField(desired.Object, replicas, "spec", "replicas")
+		if err != nil {
+			return nil, fmt.Errorf("failed to set spec.replicas to %s %s/%s", desired.GetKind(), desired.GetNamespace(), desired.GetName())
+		}
+	}
+
+	return desired, nil
+}
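Together with the label that the syncer puts on the scale target, this retention hook means a propagated Deployment keeps the spec.replicas value that the HPA last set in the member cluster instead of being reset to the template value on every sync. A hedged sketch that exercises the same NestedInt64 / SetNestedField logic on hand-built objects; the label key and value are placeholders for util.RetainReplicasLabel / util.RetainReplicasValue:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Placeholders for util.RetainReplicasLabel / util.RetainReplicasValue.
const retainReplicasLabel = "example.karmada.io/retain-replicas"
const retainReplicasValue = "true"

// retainReplicas copies spec.replicas from the observed object into the desired
// template when the retain label is present, mirroring retainWorkloadReplicas above.
func retainReplicas(desired, observed *unstructured.Unstructured) error {
	if desired.GetLabels()[retainReplicasLabel] != retainReplicasValue {
		return nil // not HPA-controlled: keep the template's replicas
	}
	replicas, found, err := unstructured.NestedInt64(observed.Object, "spec", "replicas")
	if err != nil || !found {
		return fmt.Errorf("observed object has no spec.replicas: %v", err)
	}
	return unstructured.SetNestedField(desired.Object, replicas, "spec", "replicas")
}

func main() {
	desired := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1",
		"kind":       "Deployment",
		"metadata": map[string]interface{}{
			"name":   "nginx",
			"labels": map[string]interface{}{retainReplicasLabel: retainReplicasValue},
		},
		"spec": map[string]interface{}{"replicas": int64(2)}, // the template value
	}}
	observed := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1",
		"kind":       "Deployment",
		"spec":       map[string]interface{}{"replicas": int64(5)}, // what the HPA scaled to
	}}

	if err := retainReplicas(desired, observed); err != nil {
		panic(err)
	}
	replicas, _, _ := unstructured.NestedInt64(desired.Object, "spec", "replicas")
	fmt.Println(replicas) // 5: the HPA-managed value is retained
}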