Skip to content

Commit

Permalink
feat: add activation feature for CPU/Memory scaler
Browse files Browse the repository at this point in the history
Signed-off-by: kunwooy <[email protected]>
  • Loading branch information
kunwooy committed Oct 14, 2024
1 parent 0b7ac6d commit 943fce1
Show file tree
Hide file tree
Showing 41 changed files with 6,872 additions and 83 deletions.
11 changes: 9 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -59,7 +60,7 @@ var (

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(metricsv1beta1.AddToScheme(scheme))
utilruntime.Must(kedav1alpha1.AddToScheme(scheme))
utilruntime.Must(eventingv1alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
Expand Down Expand Up @@ -217,7 +218,13 @@ func main() {
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())
metricsClient, err := k8s.InitMetricsClient(mgr)
if err != nil {
setupLog.Error(err, "unable to init metrics client")
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, metricsClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())
eventEmitter := eventemitter.NewEventEmitter(mgr.GetClient(), eventRecorder, k8sClusterName, secretInformer.Lister())

if err = (&kedacontrollers.ScaledObjectReconciler{
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func init() {

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaledJobGenerations = &sync.Map{}
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down
5 changes: 4 additions & 1 deletion controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,16 @@ var _ = Describe("ScaledObjectController", func() {
Eventually(func() error {
return k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
}).ShouldNot(HaveOccurred())

averageUtilization := int32(100)
hpa.Status.CurrentMetrics = []autoscalingv2.MetricStatus{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricStatus{
Name: corev1.ResourceCPU,
Current: autoscalingv2.MetricValueStatus{
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
AverageUtilization: &averageUtilization,
},
},
},
Expand Down
5 changes: 4 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ var _ = BeforeSuite(func() {
scaleClient, _, err := k8s.InitScaleClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

metricsClient, err := k8s.InitMetricsClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, metricsClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil),
}).SetupWithManager(k8sManager, controller.Options{})
Expand Down
21 changes: 21 additions & 0 deletions pkg/k8s/metricsclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package k8s

import (
"fmt"

"k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
)

var metricsClientLog = ctrl.Log.WithName("metricsclient")

// InitMetricsClient initializes metrics client
func InitMetricsClient(mgr ctrl.Manager) (v1beta1.PodMetricsesGetter, error) {
clientset, err := v1beta1.NewForConfig(mgr.GetConfig())
if err != nil {
metricsClientLog.Error(err, "not able to create metrics client")
return nil, fmt.Errorf("failed to create metrics clientset: %w", err)
}

return clientset, nil
}
Loading

0 comments on commit 943fce1

Please sign in to comment.