Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add activation feature for CPU/Memory scaler #6231

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Here is an overview of all new **experimental** features:
- **General**: Prevent multiple ScaledObjects managing one HPA ([#6130](https://github.com/kedacore/keda/issues/6130))
- **General**: Show full triggers'types and authentications'types in status ([#6187](https://github.com/kedacore/keda/issues/6187))
- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **CPU/Memory Scaler**: Add activation feature ([#6057](https://github.com/kedacore/keda/issues/6057))
- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216))
- **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
Expand Down
11 changes: 9 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -59,7 +60,7 @@ var (

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(metricsv1beta1.AddToScheme(scheme))
utilruntime.Must(kedav1alpha1.AddToScheme(scheme))
utilruntime.Must(eventingv1alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
Expand Down Expand Up @@ -217,7 +218,13 @@ func main() {
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())
podMetricsClient, err := k8s.InitPodMetricsClient(mgr)
if err != nil {
setupLog.Error(err, "unable to init metrics client")
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, podMetricsClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())
eventEmitter := eventemitter.NewEventEmitter(mgr.GetClient(), eventRecorder, k8sClusterName, secretInformer.Lister())

if err = (&kedacontrollers.ScaledObjectReconciler{
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func init() {

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaledJobGenerations = &sync.Map{}
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down
44 changes: 43 additions & 1 deletion controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,16 @@ var _ = Describe("ScaledObjectController", func() {
Eventually(func() error {
return k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
}).ShouldNot(HaveOccurred())

averageUtilization := int32(100)
hpa.Status.CurrentMetrics = []autoscalingv2.MetricStatus{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricStatus{
Name: corev1.ResourceCPU,
Current: autoscalingv2.MetricValueStatus{
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
AverageUtilization: &averageUtilization,
},
},
},
Expand All @@ -805,6 +808,40 @@ var _ = Describe("ScaledObjectController", func() {
return k8sClient.Status().Update(ctx, hpa)
}).ShouldNot(HaveOccurred())

// create pod metrics
//podMetrics := &metrics.PodMetrics{
// TypeMeta: metav1.TypeMeta{
// Kind: "PodMetrics",
// APIVersion: "metrics.k8s.io/v1beta1",
// },
// ObjectMeta: metav1.ObjectMeta{
// Name: deploymentName,
// Namespace: "default",
// Labels: map[string]string{
// "app": deploymentName,
// },
// },
// Containers: []metrics.ContainerMetrics{
// {
// Name: deploymentName,
// Usage: corev1.ResourceList{
// corev1.ResourceCPU: resource.MustParse("100m"),
// },
// },
// },
//}
//Eventually(func() error { return k8sClient.Create(ctx, podMetrics) }).ShouldNot(HaveOccurred())

so.Status.ScaleTargetGVKR = &kedav1alpha1.GroupVersionKindResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
Kind: "Deployment",
}
Eventually(func() error {
return k8sClient.Status().Update(ctx, so)
}).ShouldNot(HaveOccurred())

// hpa metrics will only left CPU metric
Eventually(func() int {
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expand Down Expand Up @@ -1463,6 +1500,11 @@ func generateDeployment(name string) *appsv1.Deployment {
{
Name: name,
Image: name,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
},
},
},
},
},
Expand Down
8 changes: 7 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -91,10 +92,13 @@ var _ = BeforeSuite(func() {
scaleClient, _, err := k8s.InitScaleClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

metricsClient, err := k8s.InitPodMetricsClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, metricsClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil),
}).SetupWithManager(k8sManager, controller.Options{})
Expand All @@ -107,6 +111,8 @@ var _ = BeforeSuite(func() {
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

Expect(metricsv1beta1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).ToNot(HaveOccurred())
Expect(k8sClient).ToNot(BeNil())
Expand Down
21 changes: 21 additions & 0 deletions pkg/k8s/podmetrics_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package k8s

import (
"fmt"

"k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
)

var metricsClientLog = ctrl.Log.WithName("metricsclient")

// InitPodMetricsClient initializes the client for pod metrics. It is used to fetch pod metrics from the metrics server.
func InitPodMetricsClient(mgr ctrl.Manager) (v1beta1.PodMetricsesGetter, error) {
clientset, err := v1beta1.NewForConfig(mgr.GetConfig())
if err != nil {
metricsClientLog.Error(err, "not able to create metrics client")
return nil, fmt.Errorf("failed to create metrics clientset: %w", err)
}

return clientset, nil
}
Loading
Loading