From 5d035404e73aa6b25e5af21d150c99c9e0752bde Mon Sep 17 00:00:00 2001
From: Ahmed ElSayed
Date: Tue, 9 Nov 2021 00:51:37 -0800
Subject: [PATCH 1/5] Set concurrency for pr-e2e action (#2261)

Signed-off-by: Ahmed ElSayed
---
 .github/workflows/pr-e2e.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/pr-e2e.yml b/.github/workflows/pr-e2e.yml
index 0b9bc6698e4..d4cd9d83252 100644
--- a/.github/workflows/pr-e2e.yml
+++ b/.github/workflows/pr-e2e.yml
@@ -2,7 +2,7 @@ name: pr-e2e-tests
 on:
   issue_comment:
     types: [created]
-
+concurrency: e2e-tests
 jobs:
   check:
     runs-on: ubuntu-latest

From f23c6d6e4e16fdd64ca4276985465e9ebaf9c329 Mon Sep 17 00:00:00 2001
From: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>
Date: Tue, 9 Nov 2021 13:37:42 +0100
Subject: [PATCH 2/5] Reverting #2261 (#2265)

Signed-off-by: Zbynek Roubalik
---
 .github/workflows/pr-e2e.yml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.github/workflows/pr-e2e.yml b/.github/workflows/pr-e2e.yml
index d4cd9d83252..4550b7464cb 100644
--- a/.github/workflows/pr-e2e.yml
+++ b/.github/workflows/pr-e2e.yml
@@ -2,7 +2,6 @@ name: pr-e2e-tests
 on:
   issue_comment:
     types: [created]
-concurrency: e2e-tests
 jobs:
   check:
     runs-on: ubuntu-latest

From 37a432414e4e2c5297ddcd6c7bf6ecef08ec35b9 Mon Sep 17 00:00:00 2001
From: Ahmed ElSayed
Date: Tue, 9 Nov 2021 13:17:47 -0800
Subject: [PATCH 3/5] Add ScalersCache to reuse scalers unless they need changing (#2187)

* Add ScalersCache to reuse scalers unless they need changing

Closes #1121

Signed-off-by: Ahmed ElSayed
---
 CHANGELOG.md                                  |   1 +
 CREATE-NEW-SCALER.md                          |   2 +-
 adapter/main.go                               |  54 ++-
 apis/keda/v1alpha1/zz_generated.deepcopy.go   |   1 -
 ...keda.sh_clustertriggerauthentications.yaml |   4 +-
 config/crd/bases/keda.sh_scaledobjects.yaml   |   2 +-
 controllers/keda/hpa.go                       |  35 +-
 controllers/keda/hpa_test.go                  |  17 +-
 .../keda/metrics_adapter_controller.go        |  45 +++
 controllers/keda/scaledjob_controller.go      |   2 +-
 .../keda/scaledobject_controller_test.go      |  40 +-
 pkg/mock/mock_scale/mock_interfaces.go        |   2 +-
 pkg/mock/mock_scaling/mock_interface.go       |  28 +-
 pkg/provider/fallback.go                      |  15 +-
 pkg/provider/fallback_test.go                 |  27 +-
 pkg/provider/provider.go                      |  14 +-
 pkg/scalers/scaler.go                         |   2 +-
 pkg/scaling/cache/scalers_cache.go            | 341 ++++++++++++++++++
 .../scalers_cache_test.go}                    |  71 +++-
 pkg/scaling/scale_handler.go                  | 206 ++++++-----
 pkg/scaling/scale_handler_test.go             |  83 +++--
 pkg/scaling/scaledjob/scale_metrics.go        | 184 ----------
 tests/scalers/mongodb.test.ts                 |  35 +-
 23 files changed, 787 insertions(+), 424 deletions(-)
 create mode 100644 controllers/keda/metrics_adapter_controller.go
 create mode 100644 pkg/scaling/cache/scalers_cache.go
 rename pkg/scaling/{scaledjob/scale_metrics_test.go => cache/scalers_cache_test.go} (71%)
 delete mode 100644 pkg/scaling/scaledjob/scale_metrics.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 61711cff40d..6b3a11827ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,6 +28,7 @@
 - Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))
 - Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
 - Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))
+- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187))
 
 ### Improvements

diff --git a/CREATE-NEW-SCALER.md b/CREATE-NEW-SCALER.md
index 06526b15adf..b380aa4f270 100644
--- a/CREATE-NEW-SCALER.md
+++ b/CREATE-NEW-SCALER.md
@@ -93,7 +93,7 @@ The constructor should have the following parameters:
 
 ## Lifecycle of a scaler
 
-The scaler is created and closed everytime KEDA or HPA wants to call `GetMetrics`, and everytime a new ScaledObject is created or updated that has a trigger for that scaler. Thus, a developer of a scaler should not assume that the scaler will maintain any state between these calls.
+Scalers are created and cached until the ScaledObject is modified, or `.IsActive()`/`GetMetrics()` result in an error. The cached scaler is then invalidated and a new scaler is created. `Close()` is called on all scalers when disposed.
 
 ## Note
 The scaler code is embedded into the two separate binaries comprising KEDA, the operator and the custom metrics server component. The metrics server must be occasionally rebuilt published and deployed to k8s for it to have the same code as your operator.

diff --git a/adapter/main.go b/adapter/main.go
index 6ac8e7807cb..b3c3baf0e0c 100644
--- a/adapter/main.go
+++ b/adapter/main.go
@@ -17,6 +17,7 @@ limitations under the License.
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"os"
@@ -24,9 +25,10 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
+	k8sruntime "k8s.io/apimachinery/pkg/runtime"
 	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
 	genericapiserver "k8s.io/apiserver/pkg/server"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -36,12 +38,14 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/config"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
 	basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd"
 	"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
 
 	generatedopenapi "github.com/kedacore/keda/v2/adapter/generated/openapi"
 	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
+	kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
 	prommetrics "github.com/kedacore/keda/v2/pkg/metrics"
 	kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
 	"github.com/kedacore/keda/v2/pkg/scaling"
@@ -65,7 +69,7 @@ var (
 	adapterClientRequestBurst int
 )
 
-func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, error) {
+func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) {
 	// Get a config to talk to the apiserver
 	cfg, err := config.GetConfig()
 	if cfg != nil {
@@ -75,17 +79,17 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
 	if err != nil {
 		logger.Error(err, "failed to get the config")
-		return nil, fmt.Errorf("failed to get the config (%s)", err)
+		return nil, nil, fmt.Errorf("failed to get the config (%s)", err)
 	}
 
 	scheme := scheme.Scheme
 	if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil {
 		logger.Error(err, "failed to add apps/v1 scheme to runtime scheme")
-		return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
+		return nil, nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
 	}
 	if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil {
 		logger.Error(err, "failed to add keda scheme to runtime scheme")
-		return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
+		return nil, nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
 	}
 
 	kubeclient, err := client.New(cfg, client.Options{
@@ -93,7 +97,7 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
 	})
 	if err != nil {
 		logger.Error(err, "unable to construct new client")
-		return nil, fmt.Errorf("unable to construct new client (%s)", err)
+		return nil, nil, fmt.Errorf("unable to construct new client (%s)", err)
 	}
 
 	broadcaster := record.NewBroadcaster()
@@ -103,13 +107,43 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
 	namespace, err := getWatchNamespace()
 	if err != nil {
 		logger.Error(err, "failed to get watch namespace")
-		return nil, fmt.Errorf("failed to get watch namespace (%s)", err)
+		return nil, nil, fmt.Errorf("failed to get watch namespace (%s)", err)
 	}
 
 	prometheusServer := &prommetrics.PrometheusMetricServer{}
 	go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
 
+	stopCh := make(chan struct{})
+	if err := runScaledObjectController(scheme, namespace, handler, logger, stopCh); err != nil {
+		return nil, nil, err
+	}
+
+	return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), stopCh, nil
+}
+
+func runScaledObjectController(scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error {
+	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
+		Scheme:    scheme,
+		Namespace: namespace,
+	})
+	if err != nil {
+		return err
+	}
+
+	if err := (&kedacontrollers.MetricsScaledObjectReconciler{
+		ScaleHandler: scaleHandler,
+	}).SetupWithManager(mgr, controller.Options{}); err != nil {
+		return err
+	}
+
+	go func() {
+		if err := mgr.Start(context.Background()); err != nil {
+			logger.Error(err, "controller-runtime encountered an error")
+			stopCh <- struct{}{}
+			close(stopCh)
+		}
+	}()
-	return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), nil
+	return nil
 }
 
 func printVersion() {
@@ -171,7 +205,7 @@ func main() {
 		return
 	}
 
-	kedaProvider, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond)
+	kedaProvider, stopCh, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond)
 	if err != nil {
 		logger.Error(err, "making provider")
 		return
@@ -179,7 +213,7 @@ func main() {
 	cmd.WithExternalMetrics(kedaProvider)
 
 	logger.Info(cmd.Message)
-	if err = cmd.Run(wait.NeverStop); err != nil {
+	if err = cmd.Run(stopCh); err != nil {
 		return
 	}
 }

diff --git a/apis/keda/v1alpha1/zz_generated.deepcopy.go b/apis/keda/v1alpha1/zz_generated.deepcopy.go
index d6e286bf9e8..53d00a01491 100644
--- a/apis/keda/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/keda/v1alpha1/zz_generated.deepcopy.go
@@ -1,4 +1,3 @@
-//go:build !ignore_autogenerated
 // +build !ignore_autogenerated
 
 /*

diff --git a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml
index a33408d758d..c871e853d2c 100644
--- a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml
+++ b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml
@@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
   annotations:
-    controller-gen.kubebuilder.io/version: v0.4.1
+    controller-gen.kubebuilder.io/version: v0.6.1
   creationTimestamp: null
   name: clustertriggerauthentications.keda.sh
 spec:
@@ -90,6 +90,8 @@ spec:
                 type: object
               mount:
                 type: string
+              namespace:
+                type: string
               role:
                 type: string
               secrets:

diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml
index 0781a3e86d0..1ff6f64fd0c 100644
--- a/config/crd/bases/keda.sh_scaledobjects.yaml
+++ b/config/crd/bases/keda.sh_scaledobjects.yaml
@@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
   annotations:
-    controller-gen.kubebuilder.io/version: v0.4.1
+    controller-gen.kubebuilder.io/version: v0.6.1
   creationTimestamp: null
   name: scaledobjects.keda.sh
 spec:

diff --git a/controllers/keda/hpa.go b/controllers/keda/hpa.go
index 302098a5674..310090f9ed6 100644
--- a/controllers/keda/hpa.go
+++ b/controllers/keda/hpa.go
@@ -160,35 +160,32 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,
 	var externalMetricNames []string
 	var resourceMetricNames []string
 
-	scalers, err := r.scaleHandler.GetScalers(ctx, scaledObject)
+	cache, err := r.scaleHandler.GetScalersCache(ctx, scaledObject)
 	if err != nil {
 		logger.Error(err, "Error getting scalers")
 		return nil, err
 	}
 
-	for _, scaler := range scalers {
-		metricSpecs := scaler.GetMetricSpecForScaling(ctx)
+	metricSpecs := cache.GetMetricSpecForScaling(ctx)
 
-		for _, metricSpec := range metricSpecs {
-			if metricSpec.Resource != nil {
-				resourceMetricNames = append(resourceMetricNames, string(metricSpec.Resource.Name))
-			}
-
-			if metricSpec.External != nil {
-				externalMetricName := metricSpec.External.Metric.Name
-				if kedacontrollerutil.Contains(externalMetricNames, externalMetricName) {
-					return nil, fmt.Errorf("metricName %s defined multiple times in ScaledObject %s, please refer the documentation how to define metricName manually", externalMetricName, scaledObject.Name)
-				}
+	for _, metricSpec := range metricSpecs {
+		if metricSpec.Resource != nil {
+			resourceMetricNames = append(resourceMetricNames, string(metricSpec.Resource.Name))
+		}
 
-				// add the scaledobject.keda.sh/name label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
-				metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
-				metricSpec.External.Metric.Selector.MatchLabels["scaledobject.keda.sh/name"] = scaledObject.Name
-				externalMetricNames = append(externalMetricNames, externalMetricName)
+		if metricSpec.External != nil {
+			externalMetricName := metricSpec.External.Metric.Name
+			if kedacontrollerutil.Contains(externalMetricNames, externalMetricName) {
+				return nil, fmt.Errorf("metricName %s defined multiple times in ScaledObject %s, please refer the documentation how to define metricName manually", externalMetricName, scaledObject.Name)
 			}
+
+			// add the scaledobject.keda.sh/name label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
+			metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
+			metricSpec.External.Metric.Selector.MatchLabels["scaledobject.keda.sh/name"] = scaledObject.Name
+			externalMetricNames = append(externalMetricNames, externalMetricName)
 		}
-		scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
-		scaler.Close(ctx)
 	}
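+	// All metric specs above came from the shared ScalersCache in a single
+	// call; the cache owns the scalers' lifecycle now, so the per-scaler
+	// Close() that used to live in this loop is gone.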
+	scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
 
 	// sort metrics in ScaledObject, this way we always check the same resource in Reconcile loop and we can prevent unnecessary HPA updates,
 	// see https://github.com/kedacore/keda/issues/1531 for details

diff --git a/controllers/keda/hpa_test.go b/controllers/keda/hpa_test.go
index 51a092f4863..6b8c18f39ad 100644
--- a/controllers/keda/hpa_test.go
+++ b/controllers/keda/hpa_test.go
@@ -30,7 +30,8 @@ import (
 	"github.com/kedacore/keda/v2/pkg/mock/mock_client"
 	mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler"
 	"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
-	kedascalers "github.com/kedacore/keda/v2/pkg/scalers"
+	"github.com/kedacore/keda/v2/pkg/scalers"
+	"github.com/kedacore/keda/v2/pkg/scaling/cache"
 )
 
 var _ = Describe("hpa", func() {
@@ -129,7 +130,16 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
 		},
 	}
 
-	scalers := []kedascalers.Scaler{scaler}
+	scalersCache := cache.ScalersCache{
+		Scalers: []cache.ScalerBuilder{{
+			Scaler: scaler,
+			Factory: func() (scalers.Scaler, error) {
+				return scaler, nil
+			},
+		}},
+		Logger:   nil,
+		Recorder: nil,
+	}
 	metricSpec := v2beta2.MetricSpec{
 		External: &v2beta2.ExternalMetricSource{
 			Metric: v2beta2.MetricIdentifier{
@@ -140,8 +150,7 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
 	metricSpecs := []v2beta2.MetricSpec{metricSpec}
 	ctx := context.Background()
 	scaler.EXPECT().GetMetricSpecForScaling(ctx).Return(metricSpecs)
-	scaler.EXPECT().Close(ctx)
-	scaleHandler.EXPECT().GetScalers(context.Background(), gomock.Eq(scaledObject)).Return(scalers, nil)
+	scaleHandler.EXPECT().GetScalersCache(context.Background(), gomock.Eq(scaledObject)).Return(&scalersCache, nil)
 
 	return scaledObject
 }

diff --git a/controllers/keda/metrics_adapter_controller.go b/controllers/keda/metrics_adapter_controller.go
new file mode 100644
index 00000000000..4b79b2c29ba
--- /dev/null
+++ b/controllers/keda/metrics_adapter_controller.go
@@ -0,0 +1,45 @@
+/*
+Copyright 2021 The KEDA Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package keda
+
+import (
+	"context"
+
+	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
+	"github.com/kedacore/keda/v2/pkg/scaling"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+)
+
+type MetricsScaledObjectReconciler struct {
+	ScaleHandler scaling.ScaleHandler
+}
+
+func (r *MetricsScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	r.ScaleHandler.ClearScalersCache(ctx, req.Name, req.Namespace)
+	return ctrl.Result{}, nil
+}
+
+func (r *MetricsScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
+		Owns(&kedav1alpha1.ScaledObject{}).
+		WithOptions(options).
+		Complete(r)
+}

diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go
index 05c7fb6866e..e14fc90bdff 100644
--- a/controllers/keda/scaledjob_controller.go
+++ b/controllers/keda/scaledjob_controller.go
@@ -140,7 +140,7 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
 	}
 
 	// Check ScaledJob is Ready or not
-	_, err = r.scaleHandler.GetScalers(ctx, scaledJob)
+	_, err = r.scaleHandler.GetScalersCache(ctx, scaledJob)
 	if err != nil {
 		logger.Error(err, "Error getting scalers")
 		return "Failed to ensure ScaledJob is correctly created", err

diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go
index 6b18aec6744..a8052bcbe4d 100644
--- a/controllers/keda/scaledobject_controller_test.go
+++ b/controllers/keda/scaledobject_controller_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/kedacore/keda/v2/pkg/mock/mock_client"
 	"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
 	"github.com/kedacore/keda/v2/pkg/scalers"
+	"github.com/kedacore/keda/v2/pkg/scaling/cache"
 )
 
 type GinkgoTestReporter struct{}
@@ -82,7 +83,7 @@ var _ = Describe("ScaledObjectController", func() {
 
 			It("should pass metric name validation", func() {
 				// Generate test data
-				testScalers := make([]scalers.Scaler, 0)
+				testScalers := make([]cache.ScalerBuilder, 0)
 				expectedExternalMetricNames := make([]string, 0)
 
 				for i, tm := range triggerMeta {
@@ -99,7 +100,12 @@ var _ = Describe("ScaledObjectController", func() {
 						Fail(err.Error())
 					}
 
-					testScalers = append(testScalers, s)
+					testScalers = append(testScalers, cache.ScalerBuilder{
+						Scaler: s,
+						Factory: func() (scalers.Scaler, error) {
+							return scalers.NewPrometheusScaler(config)
+						},
+					})
 					for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) {
 						if metricSpec.External != nil {
 							expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name)
@@ -108,7 +114,10 @@ var _ = Describe("ScaledObjectController", func() {
 				}
 
 				// Set up expectations
-				mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return(testScalers, nil)
+				scalerCache := cache.ScalersCache{
+					Scalers: testScalers,
+				}
+				mockScaleHandler.EXPECT().GetScalersCache(context.Background(), uniquelyNamedScaledObject).Return(&scalerCache, nil)
 				mockClient.EXPECT().Status().Return(mockStatusWriter)
 				mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
 
@@ -121,6 +130,7 @@ var _ = Describe("ScaledObjectController", func() {
 				// Test returned values
 				Ω(len(metricSpecs)).Should(Equal(len(testScalers)))
 				Ω(err).Should(BeNil())
+				scalerCache.Close(ctx)
 			})
 
 			It("should pass metric name validation with single value", func() {
@@ -145,8 +155,16 @@ var _ = Describe("ScaledObjectController", func() {
 					}
 				}
 
+				scalersCache := cache.ScalersCache{
+					Scalers: []cache.ScalerBuilder{{
+						Scaler: s,
+						Factory: func() (scalers.Scaler, error) {
+							return s, nil
+						},
+					}},
+				}
 				// Set up expectations
-				mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return([]scalers.Scaler{s}, nil)
+				mockScaleHandler.EXPECT().GetScalersCache(context.Background(), uniquelyNamedScaledObject).Return(&scalersCache, nil)
 				mockClient.EXPECT().Status().Return(mockStatusWriter)
 				mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
 
@@ -167,7 +185,7 @@ var _ = Describe("ScaledObjectController", func() {
 
 			It("should pass metric name validation", func() {
 				// Generate test data
-				testScalers := make([]scalers.Scaler, 0)
+				testScalers := make([]cache.ScalerBuilder, 0)
 				for i := 0; i < 4; i++ {
 					config := &scalers.ScalerConfig{
 						Name: fmt.Sprintf("test.%d", i),
@@ -182,11 +200,19 @@ var _ = Describe("ScaledObjectController", func() {
 						Fail(err.Error())
 					}
 
-					testScalers = append(testScalers, s)
+					testScalers = append(testScalers, cache.ScalerBuilder{
+						Scaler: s,
+						Factory: func() (scalers.Scaler, error) {
+							return s, nil
+						},
+					})
+				}
+				scalersCache := cache.ScalersCache{
+					Scalers: testScalers,
 				}
 
 				// Set up expectations
-				mockScaleHandler.EXPECT().GetScalers(context.Background(), duplicateNamedScaledObject).Return(testScalers, nil)
+				mockScaleHandler.EXPECT().GetScalersCache(context.Background(), duplicateNamedScaledObject).Return(&scalersCache, nil)
 
 				// Call function tobe tested
 				metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, duplicateNamedScaledObject)

diff --git a/pkg/mock/mock_scale/mock_interfaces.go b/pkg/mock/mock_scale/mock_interfaces.go
index 4cb4a636375..48c6afb71bd 100644
--- a/pkg/mock/mock_scale/mock_interfaces.go
+++ b/pkg/mock/mock_scale/mock_interfaces.go
@@ -1,5 +1,5 @@
 // Code generated by MockGen. DO NOT EDIT.
-// Source: /home/ecomaz/go/pkg/mod/k8s.io/client-go@v0.22.2/scale/interfaces.go
+// Source: /home/ahmed/go/pkg/mod/k8s.io/client-go@v0.22.2/scale/interfaces.go
 
 // Package mock_scale is a generated GoMock package.
 package mock_scale

diff --git a/pkg/mock/mock_scaling/mock_interface.go b/pkg/mock/mock_scaling/mock_interface.go
index f398a17fc12..6ac7c5e7ee1 100644
--- a/pkg/mock/mock_scaling/mock_interface.go
+++ b/pkg/mock/mock_scaling/mock_interface.go
@@ -9,7 +9,7 @@ import (
 	reflect "reflect"
 
 	gomock "github.com/golang/mock/gomock"
-	scalers "github.com/kedacore/keda/v2/pkg/scalers"
+	cache "github.com/kedacore/keda/v2/pkg/scaling/cache"
 )
 
 // MockScaleHandler is a mock of ScaleHandler interface.
@@ -35,6 +35,18 @@ func (m *MockScaleHandler) EXPECT() *MockScaleHandlerMockRecorder {
 	return m.recorder
 }
 
+// ClearScalersCache mocks base method.
+func (m *MockScaleHandler) ClearScalersCache(ctx context.Context, name, namespace string) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "ClearScalersCache", ctx, name, namespace)
+}
+
+// ClearScalersCache indicates an expected call of ClearScalersCache.
+func (mr *MockScaleHandlerMockRecorder) ClearScalersCache(ctx, name, namespace interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearScalersCache", reflect.TypeOf((*MockScaleHandler)(nil).ClearScalersCache), ctx, name, namespace)
+}
+
 // DeleteScalableObject mocks base method.
 func (m *MockScaleHandler) DeleteScalableObject(scalableObject interface{}) error {
 	m.ctrl.T.Helper()
@@ -49,19 +61,19 @@ func (mr *MockScaleHandlerMockRecorder) DeleteScalableObject(scalableObject inte
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteScalableObject", reflect.TypeOf((*MockScaleHandler)(nil).DeleteScalableObject), scalableObject)
 }
 
-// GetScalers mocks base method.
-func (m *MockScaleHandler) GetScalers(ctx context.Context, scalableObject interface{}) ([]scalers.Scaler, error) {
+// GetScalersCache mocks base method.
+func (m *MockScaleHandler) GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "GetScalers", ctx, scalableObject)
-	ret0, _ := ret[0].([]scalers.Scaler)
+	ret := m.ctrl.Call(m, "GetScalersCache", ctx, scalableObject)
+	ret0, _ := ret[0].(*cache.ScalersCache)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
 
-// GetScalers indicates an expected call of GetScalers.
-func (mr *MockScaleHandlerMockRecorder) GetScalers(ctx, scalableObject interface{}) *gomock.Call {
+// GetScalersCache indicates an expected call of GetScalersCache.
+func (mr *MockScaleHandlerMockRecorder) GetScalersCache(ctx, scalableObject interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScalers", reflect.TypeOf((*MockScaleHandler)(nil).GetScalers), ctx, scalableObject)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScalersCache", reflect.TypeOf((*MockScaleHandler)(nil).GetScalersCache), ctx, scalableObject)
 }
 
 // HandleScalableObject mocks base method.

diff --git a/pkg/provider/fallback.go b/pkg/provider/fallback.go
index 8e186d685ed..164bf56cfc5 100644
--- a/pkg/provider/fallback.go
+++ b/pkg/provider/fallback.go
@@ -23,26 +23,23 @@ import (
 	"k8s.io/api/autoscaling/v2beta2"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/metrics/pkg/apis/external_metrics"
 	runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
 
 	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
-	"github.com/kedacore/keda/v2/pkg/scalers"
 )
 
 func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) bool {
 	return scaledObject.Spec.Fallback != nil && metricSpec.External.Target.Type == v2beta2.AverageValueMetricType
 }
 
-func (p *KedaProvider) getMetricsWithFallback(ctx context.Context, scaler scalers.Scaler, metricName string, metricSelector labels.Selector, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) {
+func (p *KedaProvider) getMetricsWithFallback(ctx context.Context, metrics []external_metrics.ExternalMetricValue, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) {
 	status := scaledObject.Status.DeepCopy()
 
 	initHealthStatus(status)
-	metrics, err := scaler.GetMetrics(ctx, metricName, metricSelector)
 	healthStatus := getHealthStatus(status, metricName)
 
-	if err == nil {
+	if suppressedError == nil {
 		zero := int32(0)
 		healthStatus.NumberOfFailures = &zero
 		healthStatus.Status = kedav1alpha1.HealthStatusHappy
@@ -60,14 +57,14 @@ func (p *KedaProvider) getMetricsWithFallback(ctx context.Context, scaler scaler
 
 	switch {
 	case !isFallbackEnabled(scaledObject, metricSpec):
-		return nil, err
+		return nil, suppressedError
 	case !validateFallback(scaledObject):
 		logger.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers")
-		return nil, err
+		return nil, suppressedError
 	case *healthStatus.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold:
-		return doFallback(scaledObject, metricSpec, metricName, err), nil
+		return doFallback(scaledObject, metricSpec, metricName, suppressedError), nil
 	default:
-		return nil, err
+		return nil, suppressedError
 	}
 }

diff --git a/pkg/provider/fallback_test.go b/pkg/provider/fallback_test.go
index 8646ff598ca..a25aeca75db 100644
--- a/pkg/provider/fallback_test.go
+++ b/pkg/provider/fallback_test.go
@@ -87,7 +87,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(3)
 		expectStatusPatch(ctrl, client)
 
-		metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ToNot(HaveOccurred())
 		value, _ := metrics[0].Value.AsInt64()
@@ -117,7 +118,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(3)
 		expectStatusPatch(ctrl, client)
 
-		metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ToNot(HaveOccurred())
 		value, _ := metrics[0].Value.AsInt64()
@@ -132,7 +134,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(3)
 		expectStatusPatch(ctrl, client)
 
-		_, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		_, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ShouldNot(BeNil())
 		Expect(err.Error()).Should(Equal("Some error"))
@@ -160,7 +163,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(10)
 		expectStatusPatch(ctrl, client)
 
-		_, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		_, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ShouldNot(BeNil())
 		Expect(err.Error()).Should(Equal("Some error"))
@@ -189,7 +193,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(10)
 		expectStatusPatch(ctrl, client)
 
-		metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ToNot(HaveOccurred())
 		value, _ := metrics[0].Value.AsInt64()
@@ -244,7 +249,8 @@ var _ = Describe("fallback", func() {
 		statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Some error"))
 		client.EXPECT().Status().Return(statusWriter)
 
-		metrics, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		metrics, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ToNot(HaveOccurred())
 		value, _ := metrics[0].Value.AsInt64()
@@ -273,7 +279,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(10)
 		expectStatusPatch(ctrl, client)
 
-		_, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		_, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ShouldNot(BeNil())
 		Expect(err.Error()).Should(Equal("Some error"))
@@ -306,7 +313,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(10)
 		expectStatusPatch(ctrl, client)
 
-		_, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		_, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ToNot(HaveOccurred())
 		condition := so.Status.Conditions.GetFallbackCondition()
 		Expect(condition.IsTrue()).Should(BeTrue())
@@ -339,7 +347,8 @@ var _ = Describe("fallback", func() {
 		metricSpec := createMetricSpec(10)
 		expectStatusPatch(ctrl, client)
 
-		_, err := providerUnderTest.getMetricsWithFallback(context.Background(), scaler, metricName, nil, so, metricSpec)
+		metrics, err := scaler.GetMetrics(context.Background(), metricName, nil)
+		_, err = providerUnderTest.getMetricsWithFallback(context.Background(), metrics, err, metricName, so, metricSpec)
 
 		Expect(err).ShouldNot(BeNil())
 		Expect(err.Error()).Should(Equal("Some error"))
 		condition := so.Status.Conditions.GetFallbackCondition()

diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go
index b306e32ce3f..2c46057a8fa 100644
--- a/pkg/provider/provider.go
+++ b/pkg/provider/provider.go
@@ -92,14 +92,18 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
 	}
 	scaledObject := &scaledObjects.Items[0]
 
-	matchingMetrics := []external_metrics.ExternalMetricValue{}
-	scalers, err := p.scaleHandler.GetScalers(ctx, scaledObject)
+	var matchingMetrics []external_metrics.ExternalMetricValue
+	cache, err := p.scaleHandler.GetScalersCache(ctx, scaledObject)
+	if err != nil {
+		return nil, err
+	}
+
 	metricsServer.RecordScalerObjectError(scaledObject.Namespace, scaledObject.Name, err)
 	if err != nil {
 		return nil, fmt.Errorf("error when getting scalers %s", err)
 	}
 
-	for scalerIndex, scaler := range scalers {
+	for scalerIndex, scaler := range cache.GetScalers() {
 		metricSpecs := scaler.GetMetricSpecForScaling(ctx)
 		scalerName := strings.Replace(fmt.Sprintf("%T", scaler), "*scalers.", "", 1)
 
@@ -110,7 +114,8 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
 			}
 			// Filter only the desired metric
 			if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) {
-				metrics, err := p.getMetricsWithFallback(ctx, scaler, info.Metric, metricSelector, scaledObject, metricSpec)
+				metrics, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric, metricSelector)
+				metrics, err = p.getMetricsWithFallback(ctx, metrics, err, info.Metric, scaledObject, metricSpec)
 				if err != nil {
 					logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scaler)
 				} else {
@@ -124,7 +129,6 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
 				metricsServer.RecordHPAScalerError(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, err)
 			}
 		}
-		scaler.Close(ctx)
 	}
 
 	if len(matchingMetrics) == 0 {

diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go
index 5a75110daf9..32583336ede 100644
--- a/pkg/scalers/scaler.go
+++ b/pkg/scalers/scaler.go
@@ -21,7 +21,7 @@ import (
 	"fmt"
 	"time"
 
-	v2beta2 "k8s.io/api/autoscaling/v2beta2"
+	"k8s.io/api/autoscaling/v2beta2"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/metrics/pkg/apis/external_metrics"

diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go
new file mode 100644
index 00000000000..163b797421f
--- /dev/null
+++ b/pkg/scaling/cache/scalers_cache.go
@@ -0,0 +1,341 @@
+/*
+Copyright 2021 The KEDA Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/go-logr/logr"
+	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
+	"github.com/kedacore/keda/v2/pkg/eventreason"
+	"github.com/kedacore/keda/v2/pkg/scalers"
+	"k8s.io/api/autoscaling/v2beta2"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/metrics/pkg/apis/external_metrics"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+type ScalersCache struct {
+	Generation int64
+	Scalers    []ScalerBuilder
+	Logger     logr.Logger
+	Recorder   record.EventRecorder
+}
+
+type ScalerBuilder struct {
+	Scaler  scalers.Scaler
+	Factory func() (scalers.Scaler, error)
+}
+
+func (c *ScalersCache) GetScalers() []scalers.Scaler {
+	result := make([]scalers.Scaler, 0, len(c.Scalers))
+	for _, s := range c.Scalers {
+		result = append(result, s.Scaler)
+	}
+	return result
+}
+
+func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
+	var result []scalers.PushScaler
+	for _, s := range c.Scalers {
+		if ps, ok := s.Scaler.(scalers.PushScaler); ok {
+			result = append(result, ps)
+		}
+	}
+	return result
+}
+
+func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, id int, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
+	if id < 0 || id >= len(c.Scalers) {
+		return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers))
+	}
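+	// A metrics error is taken to mean the cached scaler has gone stale (for
+	// example expired credentials or a dropped connection): it is rebuilt once
+	// through its Factory and the call retried before the error is surfaced.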
Len = %d", id, len(c.Scalers)) + } + m, err := c.Scalers[id].Scaler.GetMetrics(ctx, metricName, metricSelector) + if err == nil { + return m, nil + } + + ns, err := c.refreshScaler(ctx, id) + if err != nil { + return nil, err + } + + return ns.GetMetrics(ctx, metricName, metricSelector) +} + +func (c *ScalersCache) IsScaledObjectActive(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, []external_metrics.ExternalMetricValue) { + isActive := false + isError := false + for i, s := range c.Scalers { + isTriggerActive, err := s.Scaler.IsActive(ctx) + if err != nil { + var ns scalers.Scaler + ns, err = c.refreshScaler(ctx, i) + if err == nil { + isTriggerActive, err = ns.IsActive(ctx) + } + } + + if err != nil { + c.Logger.V(1).Info("Error getting scale decision", "Error", err) + isError = true + c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + } else if isTriggerActive { + isActive = true + if externalMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].External; externalMetricsSpec != nil { + c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name) + } + if resourceMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].Resource; resourceMetricsSpec != nil { + c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name) + } + break + } + } + + return isActive, isError, []external_metrics.ExternalMetricValue{} +} + +func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { + var queueLength int64 + var maxValue int64 + isActive := false + + logger := logf.Log.WithName("scalemetrics") + scalersMetrics := c.getScaledJobMetrics(ctx, scaledJob) + switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + case "avg": + queueLengthSum := int64(0) + maxValueSum := int64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLengthSum += metrics.queueLength + maxValueSum += metrics.maxValue + isActive = metrics.isActive + length++ + } + } + if length != 0 { + queueLength = divideWithCeil(queueLengthSum, int64(length)) + maxValue = divideWithCeil(maxValueSum, int64(length)) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLength += metrics.queueLength + maxValue += metrics.maxValue + isActive = metrics.isActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.queueLength > queueLength && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + } + maxValue = min(scaledJob.MaxReplicaCount(), maxValue) + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + + return isActive, queueLength, maxValue +} + +func (c *ScalersCache) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + var metrics []external_metrics.ExternalMetricValue + for i, s := range c.Scalers { + m, err := s.Scaler.GetMetrics(ctx, 
metricName, metricSelector) + if err != nil { + ns, err := c.refreshScaler(ctx, i) + if err != nil { + return metrics, err + } + m, err = ns.GetMetrics(ctx, metricName, metricSelector) + if err != nil { + return metrics, err + } + } + metrics = append(metrics, m...) + } + + return metrics, nil +} + +func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) { + if id < 0 || id >= len(c.Scalers) { + return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers)) + } + + sb := c.Scalers[id] + ns, err := sb.Factory() + if err != nil { + return nil, err + } + + c.Scalers[id] = ScalerBuilder{ + Scaler: ns, + Factory: sb.Factory, + } + sb.Scaler.Close(ctx) + + return ns, nil +} + +func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec { + var spec []v2beta2.MetricSpec + for _, s := range c.Scalers { + spec = append(spec, s.Scaler.GetMetricSpecForScaling(ctx)...) + } + return spec +} + +func (c *ScalersCache) Close(ctx context.Context) { + scalers := c.Scalers + c.Scalers = nil + for _, s := range scalers { + err := s.Scaler.Close(ctx) + if err != nil { + c.Logger.Error(err, "error closing scaler", "scaler", s) + } + } +} + +type scalerMetrics struct { + queueLength int64 + maxValue int64 + isActive bool +} + +func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { + var scalersMetrics []scalerMetrics + for i, s := range c.Scalers { + var queueLength int64 + var targetAverageValue int64 + isActive := false + maxValue := int64(0) + scalerType := fmt.Sprintf("%T:", s) + + scalerLogger := c.Logger.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) + + metricSpecs := s.Scaler.GetMetricSpecForScaling(ctx) + + // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) + // or skip cpu/memory resource scaler + if len(metricSpecs) < 1 || metricSpecs[0].External == nil { + continue + } + + isTriggerActive, err := s.Scaler.IsActive(ctx) + if err != nil { + var ns scalers.Scaler + ns, err = c.refreshScaler(ctx, i) + if err == nil { + isTriggerActive, err = ns.IsActive(ctx) + } + } + + if err != nil { + scalerLogger.V(1).Info("Error getting scaler.IsActive, but continue", "Error", err) + c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + continue + } + + targetAverageValue = getTargetAverageValue(metricSpecs) + + metrics, err := s.Scaler.GetMetrics(ctx, "queueLength", nil) + if err != nil { + scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err) + c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + continue + } + + var metricValue int64 + + for _, m := range metrics { + if m.MetricName == "queueLength" { + metricValue, _ = m.Value.AsInt64() + queueLength += metricValue + } + } + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue) + + if isTriggerActive { + isActive = true + } + + if targetAverageValue != 0 { + maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue)) + } + scalersMetrics = append(scalersMetrics, scalerMetrics{ + queueLength: queueLength, + maxValue: maxValue, + isActive: isActive, + }) + } + return scalersMetrics +} + +func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 { + var targetAverageValue int64 + var metricValue int64 + 
var flag bool + for _, metric := range metricSpecs { + if metric.External.Target.AverageValue == nil { + metricValue = 0 + } else { + metricValue, flag = metric.External.Target.AverageValue.AsInt64() + if !flag { + metricValue = 0 + } + } + + targetAverageValue += metricValue + } + count := int64(len(metricSpecs)) + if count != 0 { + return targetAverageValue / count + } + return 0 +} + +func divideWithCeil(x, y int64) int64 { + ans := x / y + remainder := x % y + if remainder != 0 { + return ans + 1 + } + return ans +} + +// Min function for int64 +func min(x, y int64) int64 { + if x > y { + return y + } + return x +} diff --git a/pkg/scaling/scaledjob/scale_metrics_test.go b/pkg/scaling/cache/scalers_cache_test.go similarity index 71% rename from pkg/scaling/scaledjob/scale_metrics_test.go rename to pkg/scaling/cache/scalers_cache_test.go index 64bbecd556b..6f7995dc0a1 100644 --- a/pkg/scaling/scaledjob/scale_metrics_test.go +++ b/pkg/scaling/cache/scalers_cache_test.go @@ -1,12 +1,13 @@ -package scaledjob +package cache import ( "context" "fmt" "testing" - "github.com/go-playground/assert/v2" + "github.com/go-logr/logr" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/client-go/tools/record" @@ -66,24 +67,44 @@ func TestIsScaledJobActive(t *testing.T) { // Keep the current behavior // Assme 1 trigger only scaledJobSingle := createScaledObject(100, "") // testing default = max - scalerSingle := []scalers.Scaler{ - createScaler(ctrl, int64(20), int32(2), true), + scalerSingle := []ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(20), int32(2), true), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, int64(20), int32(2), true), nil + }, + }} + + cache := ScalersCache{ + Scalers: scalerSingle, + Logger: logr.DiscardLogger{}, + Recorder: recorder, } - isActive, queueLength, maxValue := GetScaleMetrics(context.TODO(), scalerSingle, scaledJobSingle, recorder) + isActive, queueLength, maxValue := cache.IsScaledJobActive(context.TODO(), scaledJobSingle) assert.Equal(t, true, isActive) assert.Equal(t, int64(20), queueLength) assert.Equal(t, int64(10), maxValue) + cache.Close(context.Background()) // Non-Active trigger only - scalerSingle = []scalers.Scaler{ - createScaler(ctrl, int64(0), int32(2), false), + scalerSingle = []ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(0), int32(2), false), + Factory: func() (scalers.Scaler, error) { + return createScaler(ctrl, int64(0), int32(2), false), nil + }, + }} + + cache = ScalersCache{ + Scalers: scalerSingle, + Logger: logr.DiscardLogger{}, + Recorder: recorder, } - isActive, queueLength, maxValue = GetScaleMetrics(context.TODO(), scalerSingle, scaledJobSingle, recorder) + isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJobSingle) assert.Equal(t, false, isActive) assert.Equal(t, int64(0), queueLength) assert.Equal(t, int64(0), maxValue) + cache.Close(context.Background()) // Test the valiation scalerTestDatam := []scalerTestData{ @@ -96,18 +117,40 @@ func TestIsScaledJobActive(t *testing.T) { for index, scalerTestData := range scalerTestDatam { scaledJob := createScaledObject(scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) - scalers := []scalers.Scaler{ - createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), - createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, 
-			createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive),
-			createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive),
-			createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive),
+		scalersToTest := []ScalerBuilder{{
+			Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive),
+			Factory: func() (scalers.Scaler, error) {
+				return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), nil
+			},
+		}, {
+			Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive),
+			Factory: func() (scalers.Scaler, error) {
+				return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive), nil
+			},
+		}, {
+			Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive),
+			Factory: func() (scalers.Scaler, error) {
+				return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive), nil
+			},
+		}, {
+			Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive),
+			Factory: func() (scalers.Scaler, error) {
+				return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive), nil
+			},
+		}}
+
+		cache = ScalersCache{
+			Scalers:  scalersToTest,
+			Logger:   logr.DiscardLogger{},
+			Recorder: recorder,
 		}
 		fmt.Printf("index: %d", index)
-		isActive, queueLength, maxValue = GetScaleMetrics(context.TODO(), scalers, scaledJob, recorder)
+		isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJob)
 		//	assert.Equal(t, 5, index)
 		assert.Equal(t, scalerTestData.ResultIsActive, isActive)
 		assert.Equal(t, scalerTestData.ResultQueueLength, queueLength)
 		assert.Equal(t, scalerTestData.ResultMaxValue, maxValue)
+		cache.Close(context.Background())
 	}
 }

diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index c2bb8638939..8a0e7e6fb18 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -19,6 +19,7 @@ package scaling
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
 
@@ -34,9 +35,9 @@ import (
 	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
 	"github.com/kedacore/keda/v2/pkg/eventreason"
 	"github.com/kedacore/keda/v2/pkg/scalers"
+	"github.com/kedacore/keda/v2/pkg/scaling/cache"
 	"github.com/kedacore/keda/v2/pkg/scaling/executor"
 	"github.com/kedacore/keda/v2/pkg/scaling/resolver"
-	"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
 )
 
 // ScaleHandler encapsulates the logic of calling the right scalers for
@@ -44,7 +45,8 @@ import (
 type ScaleHandler interface {
 	HandleScalableObject(scalableObject interface{}) error
 	DeleteScalableObject(scalableObject interface{}) error
-	GetScalers(ctx context.Context, scalableObject interface{}) ([]scalers.Scaler, error)
+	GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error)
+	ClearScalersCache(ctx context.Context, name, namespace string)
 }
 
 type scaleHandler struct {
@@ -54,6 +56,8 @@ type scaleHandler struct {
 	scaleExecutor     executor.ScaleExecutor
 	globalHTTPTimeout time.Duration
 	recorder          record.EventRecorder
+	scalerCaches      map[string]*cache.ScalersCache
+	lock              *sync.RWMutex
 }
 
 // NewScaleHandler creates a ScaleHandler object
@@ -65,23 +69,11 @@ func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, recon
 		scaleExecutor:     executor.NewScaleExecutor(client, scaleClient, reconcilerScheme, recorder),
 		globalHTTPTimeout: globalHTTPTimeout,
 		recorder:          recorder,
+		scalerCaches:      map[string]*cache.ScalersCache{},
+		lock:              &sync.RWMutex{},
 	}
 }
 
-func (h *scaleHandler) GetScalers(ctx context.Context, scalableObject interface{}) ([]scalers.Scaler, error) {
-	withTriggers, err := asDuckWithTriggers(scalableObject)
-	if err != nil {
-		return nil, err
-	}
-
-	podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject)
-	if err != nil {
-		return nil, err
-	}
-
-	return h.buildScalers(ctx, withTriggers, podTemplateSpec, containerName)
-}
-
 func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
 	withTriggers, err := asDuckWithTriggers(scalableObject)
 	if err != nil {
@@ -146,46 +138,89 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
 func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) {
 	logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
 
-	// kick off one check to the scalers now
-	h.checkScalers(ctx, scalableObject, scalingMutex)
-
 	pollingInterval := withTriggers.GetPollingInterval()
 	logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval)
 
 	for {
 		tmr := time.NewTimer(pollingInterval)
+		h.checkScalers(ctx, scalableObject, scalingMutex)
 
 		select {
 		case <-tmr.C:
-			h.checkScalers(ctx, scalableObject, scalingMutex)
 			tmr.Stop()
 		case <-ctx.Done():
 			logger.V(1).Info("Context canceled")
+			h.ClearScalersCache(ctx, withTriggers.Name, withTriggers.Namespace)
 			tmr.Stop()
 			return
 		}
 	}
 }
 
+func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) {
+	withTriggers, err := asDuckWithTriggers(scalableObject)
+	if err != nil {
+		return nil, err
+	}
+
+	key := strings.ToLower(fmt.Sprintf("%s.%s.%s", withTriggers.Kind, withTriggers.Name, withTriggers.Namespace))
+
+	h.lock.RLock()
+	if cache, ok := h.scalerCaches[key]; ok && cache.Generation == withTriggers.Generation {
+		h.lock.RUnlock()
+		return cache, nil
+	}
+	h.lock.RUnlock()
+
+	h.lock.Lock()
+	defer h.lock.Unlock()
+	if cache, ok := h.scalerCaches[key]; ok && cache.Generation == withTriggers.Generation {
+		return cache, nil
+	} else if ok {
+		cache.Close(ctx)
+	}
+
+	podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject)
+	if err != nil {
+		return nil, err
+	}
+
+	scalers := h.buildScalers(ctx, withTriggers, podTemplateSpec, containerName)
+
+	h.scalerCaches[key] = &cache.ScalersCache{
+		Generation: withTriggers.Generation,
+		Scalers:    scalers,
+		Logger:     h.logger,
+		Recorder:   h.recorder,
+	}
+
+	return h.scalerCaches[key], nil
+}
+
+func (h *scaleHandler) ClearScalersCache(ctx context.Context, name, namespace string) {
+	h.lock.Lock()
+	defer h.lock.Unlock()
+
+	key := strings.ToLower(fmt.Sprintf("%s.%s", name, namespace))
+	if cache, ok := h.scalerCaches[key]; ok {
+		cache.Close(ctx)
+		delete(h.scalerCaches, key)
+	}
+}
+
 func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) {
 	logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
-	ss, err := h.GetScalers(ctx, scalableObject)
+	cache, err := h.GetScalersCache(ctx, scalableObject)
 	if err != nil {
 		logger.Error(err, "Error getting scalers", "object", scalableObject)
 		return
 	}
 
-	for _, s := range ss {
-		scaler, ok := s.(scalers.PushScaler)
-		if !ok {
-			s.Close(ctx)
-			continue
-		}
-
-		go func() {
+	for _, ps := range cache.GetPushScalers() {
+		go func(s scalers.PushScaler) {
 			activeCh := make(chan bool)
-			go scaler.Run(ctx, activeCh)
-			defer scaler.Close(ctx)
+			go s.Run(ctx, activeCh)
+			defer s.Close(ctx)
 			for {
 				select {
 				case <-ctx.Done():
@@ -201,14 +236,14 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav
 					scalingMutex.Unlock()
 				}
 			}
-		}()
+		}(ps)
 	}
 }
 
 // checkScalers contains the main logic for the ScaleHandler scaling logic.
 // It'll check each trigger active status then call RequestScale
 func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interface{}, scalingMutex sync.Locker) {
-	scalers, err := h.GetScalers(ctx, scalableObject)
+	cache, err := h.GetScalersCache(ctx, scalableObject)
 	if err != nil {
 		h.logger.Error(err, "Error getting scalers", "object", scalableObject)
 		return
@@ -223,7 +258,7 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
 			h.logger.Error(err, "Error getting scaledObject", "object", scalableObject)
 			return
 		}
-		isActive, isError := h.isScaledObjectActive(ctx, scalers, obj)
+		isActive, isError, _ := cache.IsScaledObjectActive(ctx, obj)
 		h.scaleExecutor.RequestScale(ctx, obj, isActive, isError)
 	case *kedav1alpha1.ScaledJob:
 		err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj)
@@ -231,83 +266,62 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
 			h.logger.Error(err, "Error getting scaledJob", "object", scalableObject)
 			return
 		}
-		isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, scalers, obj)
+		isActive, scaleTo, maxScale := cache.IsScaledJobActive(ctx, obj)
 		h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
 	}
 }
 
-func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) (bool, bool) {
-	isActive := false
-	isError := false
-	for i, scaler := range scalers {
-		isTriggerActive, err := scaler.IsActive(ctx)
-		scaler.Close(ctx)
-
-		if err != nil {
-			h.logger.V(1).Info("Error getting scale decision", "Error", err)
-			isError = true
-			h.recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
-			continue
-		} else if isTriggerActive {
-			isActive = true
-			if externalMetricsSpec := scaler.GetMetricSpecForScaling(ctx)[0].External; externalMetricsSpec != nil {
-				h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name)
-			}
-			if resourceMetricsSpec := scaler.GetMetricSpecForScaling(ctx)[0].Resource; resourceMetricsSpec != nil {
-				h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name)
-			}
-			closeScalers(ctx, scalers[i+1:])
-			break
-		}
-	}
-	return isActive, isError
-}
-
-func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
-	return scaledjob.GetScaleMetrics(ctx, scalers, scaledJob, h.recorder)
-}
-
 // buildScalers returns list of Scalers for the specified triggers
-func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]scalers.Scaler, error) {
+func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) []cache.ScalerBuilder {
 	logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
-	var scalersRes []scalers.Scaler
 	var err error
 	resolvedEnv := make(map[string]string)
-	if podTemplateSpec != nil {
-		resolvedEnv, err = resolver.ResolveContainerEnv(h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace)
-		if err != nil {
-			return scalersRes, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err)
-		}
-	}
+	result := make([]cache.ScalerBuilder, 0, len(withTriggers.Spec.Triggers))
+
+	for scalerIndex, t := range withTriggers.Spec.Triggers {
+		triggerName, trigger := scalerIndex, t
+		factory := func() (scalers.Scaler, error) {
+			if podTemplateSpec != nil {
+				resolvedEnv, err = resolver.ResolveContainerEnv(h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace)
+				if err != nil {
+					return nil, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err)
+				}
+			}
+			config := &scalers.ScalerConfig{
+				Name:              withTriggers.Name,
+				Namespace:         withTriggers.Namespace,
+				TriggerMetadata:   trigger.Metadata,
+				ResolvedEnv:       resolvedEnv,
+				AuthParams:        make(map[string]string),
+				GlobalHTTPTimeout: h.globalHTTPTimeout,
+				ScalerIndex:       scalerIndex,
+			}
 
-	for scalerIndex, trigger := range withTriggers.Spec.Triggers {
-		config := &scalers.ScalerConfig{
-			Name:              withTriggers.Name,
-			Namespace:         withTriggers.Namespace,
-			TriggerMetadata:   trigger.Metadata,
-			ResolvedEnv:       resolvedEnv,
-			AuthParams:        make(map[string]string),
-			GlobalHTTPTimeout: h.globalHTTPTimeout,
-			ScalerIndex:       scalerIndex,
-		}
+			config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)
+			if err != nil {
+				return nil, err
+			}
 
-		config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)
-		if err != nil {
-			closeScalers(ctx, scalersRes)
-			return []scalers.Scaler{}, err
+			return buildScaler(ctx, h.client, trigger.Type, config)
 		}
 
-		scaler, err := buildScaler(ctx, h.client, trigger.Type, config)
+		scaler, err := factory()
 		if err != nil {
-			closeScalers(ctx, scalersRes)
 			h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
-			return []scalers.Scaler{}, fmt.Errorf("error getting scaler for trigger #%d: %s", scalerIndex, err)
+			h.logger.Error(err, "error resolving auth params", "scalerIndex", scalerIndex, "object", withTriggers, "trigger", triggerName)
+			if scaler != nil {
+				scaler.Close(ctx)
+			}
+			continue
 		}
 
-		scalersRes = append(scalersRes, scaler)
+		result = append(result, cache.ScalerBuilder{
+			Scaler:  scaler,
+			Factory: factory,
+		})
 	}
 
-	return scalersRes, nil
+	return result
 }
 
 func buildScaler(ctx context.Context, client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
@@ -430,9 +444,3 @@ func asDuckWithTriggers(scalableObject interface{}) (*kedav1alpha1.WithTriggers,
 		return nil, fmt.Errorf("unknown scalable object type %v", scalableObject)
 	}
 }
-
-func closeScalers(ctx context.Context, scalers []scalers.Scaler) {
-
for _, scaler := range scalers { - defer scaler.Close(ctx) - } -} diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index eaf1cbc1c8c..4aee0d3f607 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -19,45 +19,53 @@ package scaling import ( "context" "errors" - "sync" "testing" - "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/mock/mock_client" mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" "github.com/kedacore/keda/v2/pkg/scalers" - "github.com/kedacore/keda/v2/pkg/scaling/executor" + "github.com/kedacore/keda/v2/pkg/scaling/cache" ) func TestCheckScaledObjectScalersWithError(t *testing.T) { ctrl := gomock.NewController(t) - client := mock_client.NewMockClient(ctrl) recorder := record.NewFakeRecorder(1) - scaleHandler := &scaleHandler{ - client: client, - logger: logf.Log.WithName("scalehandler"), - scaleLoopContexts: &sync.Map{}, - scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), - globalHTTPTimeout: 5 * time.Second, - recorder: recorder, + factory := func() (scalers.Scaler, error) { + scaler := mock_scalers.NewMockScaler(ctrl) + scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("some error")) + scaler.EXPECT().Close(gomock.Any()) + return scaler, nil } - scaler := mock_scalers.NewMockScaler(ctrl) - scalers := []scalers.Scaler{scaler} - scaledObject := &kedav1alpha1.ScaledObject{} + scaler, err := factory() + assert.Nil(t, err) - scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("Some error")) - scaler.EXPECT().Close(gomock.Any()) + scaledObject := kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + } + + cache := cache.ScalersCache{ + Scalers: []cache.ScalerBuilder{{ + Scaler: scaler, + Factory: factory, + }}, + Logger: logf.Log.WithName("scalehandler"), + Recorder: recorder, + } - isActive, isError := scaleHandler.isScaledObjectActive(context.TODO(), scalers, scaledObject) + isActive, isError, _ := cache.IsScaledObjectActive(context.TODO(), &scaledObject) + cache.Close(context.Background()) assert.Equal(t, false, isActive) assert.Equal(t, true, isError) @@ -65,22 +73,15 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) { ctrl := gomock.NewController(t) - client := mock_client.NewMockClient(ctrl) recorder := record.NewFakeRecorder(1) - - scaleHandler := &scaleHandler{ - client: client, - logger: logf.Log.WithName("scalehandler"), - scaleLoopContexts: &sync.Map{}, - scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), - globalHTTPTimeout: 5 * time.Second, - recorder: recorder, - } - activeScaler := mock_scalers.NewMockScaler(ctrl) failingScaler := mock_scalers.NewMockScaler(ctrl) - scalers := []scalers.Scaler{activeScaler, failingScaler} - scaledObject := &kedav1alpha1.ScaledObject{} + scaledObject := &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + } metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)} @@ -89,7 +90,25 @@ func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) { 
activeScaler.EXPECT().Close(gomock.Any()) failingScaler.EXPECT().Close(gomock.Any()) - isActive, isError := scaleHandler.isScaledObjectActive(context.TODO(), scalers, scaledObject) + factory := func() (scalers.Scaler, error) { + return mock_scalers.NewMockScaler(ctrl), nil + } + scalers := []cache.ScalerBuilder{{ + Scaler: activeScaler, + Factory: factory, + }, { + Scaler: failingScaler, + Factory: factory, + }} + + scalersCache := cache.ScalersCache{ + Scalers: scalers, + Logger: logf.Log.WithName("scalercache"), + Recorder: recorder, + } + + isActive, isError, _ := scalersCache.IsScaledObjectActive(context.TODO(), scaledObject) + scalersCache.Close(context.Background()) assert.Equal(t, true, isActive) assert.Equal(t, false, isError) diff --git a/pkg/scaling/scaledjob/scale_metrics.go b/pkg/scaling/scaledjob/scale_metrics.go deleted file mode 100644 index 1c702bd5042..00000000000 --- a/pkg/scaling/scaledjob/scale_metrics.go +++ /dev/null @@ -1,184 +0,0 @@ -package scaledjob - -import ( - "context" - "fmt" - - "github.com/go-logr/logr" - "k8s.io/api/autoscaling/v2beta2" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/record" - logf "sigs.k8s.io/controller-runtime/pkg/log" - - kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/eventreason" - "github.com/kedacore/keda/v2/pkg/scalers" -) - -type scalerMetrics struct { - queueLength int64 - maxValue int64 - isActive bool -} - -// GetScaleMetrics gets the metrics for decision making of scaling. -func GetScaleMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, recorder record.EventRecorder) (bool, int64, int64) { - var queueLength int64 - var maxValue int64 - isActive := false - - logger := logf.Log.WithName("scalemetrics") - scalersMetrics := getScalersMetrics(ctx, scalers, scaledJob, logger, recorder) - switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { - case "min": - for _, metrics := range scalersMetrics { - if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - case "avg": - queueLengthSum := int64(0) - maxValueSum := int64(0) - length := 0 - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLengthSum += metrics.queueLength - maxValueSum += metrics.maxValue - isActive = metrics.isActive - length++ - } - } - if length != 0 { - queueLength = divideWithCeil(queueLengthSum, int64(length)) - maxValue = divideWithCeil(maxValueSum, int64(length)) - } - case "sum": - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLength += metrics.queueLength - maxValue += metrics.maxValue - isActive = metrics.isActive - } - } - default: // max - for _, metrics := range scalersMetrics { - if metrics.queueLength > queueLength && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - } - maxValue = min(scaledJob.MaxReplicaCount(), maxValue) - logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - - return isActive, queueLength, maxValue -} - -func getScalersMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, logger logr.Logger, recorder record.EventRecorder) []scalerMetrics { - scalersMetrics := 
[]scalerMetrics{} - - for _, scaler := range scalers { - var queueLength int64 - var targetAverageValue int64 - isActive := false - maxValue := int64(0) - scalerType := fmt.Sprintf("%T:", scaler) - - scalerLogger := logger.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) - - metricSpecs := scaler.GetMetricSpecForScaling(ctx) - - // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) - // or skip cpu/memory resource scaler - if len(metricSpecs) < 1 || metricSpecs[0].External == nil { - continue - } - - isTriggerActive, err := scaler.IsActive(ctx) - if err != nil { - scalerLogger.V(1).Info("Error getting scaler.IsActive, but continue", "Error", err) - recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - scaler.Close(ctx) - continue - } - - targetAverageValue = getTargetAverageValue(metricSpecs) - - metrics, err := scaler.GetMetrics(ctx, "queueLength", nil) - if err != nil { - scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err) - recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - scaler.Close(ctx) - continue - } - - var metricValue int64 - - for _, m := range metrics { - if m.MetricName == "queueLength" { - metricValue, _ = m.Value.AsInt64() - queueLength += metricValue - } - } - scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue) - - scaler.Close(ctx) - - if isTriggerActive { - isActive = true - } - - if targetAverageValue != 0 { - maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue)) - } - scalersMetrics = append(scalersMetrics, scalerMetrics{ - queueLength: queueLength, - maxValue: maxValue, - isActive: isActive, - }) - } - return scalersMetrics -} - -func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 { - var targetAverageValue int64 - var metricValue int64 - var flag bool - for _, metric := range metricSpecs { - if metric.External.Target.AverageValue == nil { - metricValue = 0 - } else { - metricValue, flag = metric.External.Target.AverageValue.AsInt64() - if !flag { - metricValue = 0 - } - } - - targetAverageValue += metricValue - } - count := int64(len(metricSpecs)) - if count != 0 { - return targetAverageValue / count - } - return 0 -} - -func divideWithCeil(x, y int64) int64 { - ans := x / y - reminder := x % y - if reminder != 0 { - return ans + 1 - } - return ans -} - -// Min function for int64 -func min(x, y int64) int64 { - if x > y { - return y - } - return x -} diff --git a/tests/scalers/mongodb.test.ts b/tests/scalers/mongodb.test.ts index 4ccb12a8a02..1ee0c80f854 100644 --- a/tests/scalers/mongodb.test.ts +++ b/tests/scalers/mongodb.test.ts @@ -175,6 +175,24 @@ spec: const deployYaml = ` apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: mongodb-trigger +spec: + secretTargetRef: + - parameter: connectionString + name: mongodb-secret + key: connect +--- +apiVersion: v1 +kind: Secret +metadata: + name: mongodb-secret +type: Opaque +data: + connect: {{MONGODB_CONNECTION_STRING_BASE64}} +--- +apiVersion: keda.sh/v1alpha1 kind: ScaledJob metadata: name: {{MONGODB_JOB_NAME}} @@ -205,21 +223,4 @@ spec: authenticationRef: name: mongodb-trigger --- -apiVersion: keda.sh/v1alpha1 -kind: TriggerAuthentication -metadata: - name: mongodb-trigger -spec: - secretTargetRef: - - parameter: connectionString - name: mongodb-secret - key: 
connect ---- -apiVersion: v1 -kind: Secret -metadata: - name: mongodb-secret -type: Opaque -data: - connect: {{MONGODB_CONNECTION_STRING_BASE64}} ` From 89d86637407bbbbe0a0493808562e4e615ff0ad2 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger <70865+arschles@users.noreply.github.com> Date: Wed, 10 Nov 2021 01:12:14 -0800 Subject: [PATCH 4/5] Propagating contexts to all remaining scalers (#2267) Signed-off-by: Aaron Schlesinger --- CHANGELOG.md | 1 + adapter/main.go | 13 ++-- apis/keda/v1alpha1/zz_generated.deepcopy.go | 1 + controllers/keda/scaledjob_controller.go | 10 +-- controllers/keda/scaledjob_finalizer.go | 2 +- controllers/keda/scaledobject_controller.go | 10 +-- controllers/keda/scaledobject_finalizer.go | 2 +- pkg/mock/mock_scaling/mock_interface.go | 16 ++--- pkg/provider/provider.go | 6 +- pkg/scalers/azure_servicebus_scaler.go | 14 +++-- pkg/scaling/executor/scale_jobs.go | 44 +++++++------- pkg/scaling/executor/scale_jobs_test.go | 12 ++-- pkg/scaling/resolver/scale_resolvers.go | 64 ++++++++++---------- pkg/scaling/resolver/scale_resolvers_test.go | 10 ++- pkg/scaling/scale_handler.go | 18 +++--- 15 files changed, 120 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b3a11827ba..b0af545f462 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ ### Improvements +- Improve context handling in appropriate functionality in which we instantiate scalers ([#2267](https://github.com/kedacore/keda/pull/2267)) - Improve validation in Cron scaler in case start & end input is same.([#2032](https://github.com/kedacore/keda/pull/2032)) - Improve the cron validation in Cron Scaler ([#2038](https://github.com/kedacore/keda/pull/2038)) - Add Bearer auth for Metrics API scaler ([#2028](https://github.com/kedacore/keda/pull/2028)) diff --git a/adapter/main.go b/adapter/main.go index b3c3baf0e0c..2c9f34134b7 100644 --- a/adapter/main.go +++ b/adapter/main.go @@ -69,7 +69,7 @@ var ( adapterClientRequestBurst int ) -func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) { +func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) { // Get a config to talk to the apiserver cfg, err := config.GetConfig() if cfg != nil { @@ -113,14 +113,14 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric prometheusServer := &prommetrics.PrometheusMetricServer{} go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }() stopCh := make(chan struct{}) - if err := runScaledObjectController(scheme, namespace, handler, logger, stopCh); err != nil { + if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, stopCh); err != nil { return nil, nil, err } - return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), stopCh, nil + return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace), stopCh, nil } -func runScaledObjectController(scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error { +func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error { mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, Namespace: namespace, @@ -136,7 +136,7 @@ func runScaledObjectController(scheme 
*k8sruntime.Scheme, namespace string, scal } go func() { - if err := mgr.Start(context.Background()); err != nil { + if err := mgr.Start(ctx); err != nil { logger.Error(err, "controller-runtime encountered an error") stopCh <- struct{}{} close(stopCh) @@ -164,6 +164,7 @@ func getWatchNamespace() (string, error) { } func main() { + ctx := ctrl.SetupSignalHandler() var err error defer func() { if err != nil { @@ -205,7 +206,7 @@ func main() { return } - kedaProvider, stopCh, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond) + kedaProvider, stopCh, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond) if err != nil { logger.Error(err, "making provider") return diff --git a/apis/keda/v1alpha1/zz_generated.deepcopy.go b/apis/keda/v1alpha1/zz_generated.deepcopy.go index 53d00a01491..d6e286bf9e8 100644 --- a/apis/keda/v1alpha1/zz_generated.deepcopy.go +++ b/apis/keda/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index e14fc90bdff..5ae8bdffb92 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -147,7 +147,7 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log } // scaledJob was created or modified - let's start a new ScaleLoop - err = r.requestScaleLoop(logger, scaledJob) + err = r.requestScaleLoop(ctx, logger, scaledJob) if err != nil { return "Failed to start a new scale loop with scaling logic", err } @@ -187,13 +187,13 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context } // requestScaleLoop requests ScaleLoop handler for the respective ScaledJob -func (r *ScaledJobReconciler) requestScaleLoop(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { +func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { logger.V(1).Info("Starting a new ScaleLoop") - return r.scaleHandler.HandleScalableObject(scaledJob) + return r.scaleHandler.HandleScalableObject(ctx, scaledJob) } // stopScaleLoop stops ScaleLoop handler for the respective ScaledJob -func (r *ScaledJobReconciler) stopScaleLoop(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { +func (r *ScaledJobReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { logger.V(1).Info("Stopping a ScaleLoop") - return r.scaleHandler.DeleteScalableObject(scaledJob) + return r.scaleHandler.DeleteScalableObject(ctx, scaledJob) } diff --git a/controllers/keda/scaledjob_finalizer.go b/controllers/keda/scaledjob_finalizer.go index f78591e98b9..f910a1e9984 100644 --- a/controllers/keda/scaledjob_finalizer.go +++ b/controllers/keda/scaledjob_finalizer.go @@ -37,7 +37,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr // Run finalization logic for scaledJobFinalizer. If the // finalization logic fails, don't remove the finalizer so // that we can retry during the next reconciliation. 
- if err := r.stopScaleLoop(logger, scaledJob); err != nil { + if err := r.stopScaleLoop(ctx, logger, scaledJob); err != nil { return err } diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index a9fbdfa809c..94a00e243de 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -241,7 +241,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg // Notify ScaleHandler if a new HPA was created or if ScaledObject was updated if newHPACreated || scaleObjectSpecChanged { - if r.requestScaleLoop(logger, scaledObject) != nil { + if r.requestScaleLoop(ctx, logger, scaledObject) != nil { return "Failed to start a new scale loop with scaling logic", err } logger.Info("Initializing Scaling logic according to ScaledObject Specification") @@ -382,7 +382,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont } // requestScaleLoop starts ScaleLoop handler for the respective ScaledObject -func (r *ScaledObjectReconciler) requestScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { +func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { logger.V(1).Info("Notify scaleHandler of an update in scaledObject") key, err := cache.MetaNamespaceKeyFunc(scaledObject) @@ -391,7 +391,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(logger logr.Logger, scaledObje return err } - if err = r.scaleHandler.HandleScalableObject(scaledObject); err != nil { + if err = r.scaleHandler.HandleScalableObject(ctx, scaledObject); err != nil { return err } @@ -402,14 +402,14 @@ func (r *ScaledObjectReconciler) requestScaleLoop(logger logr.Logger, scaledObje } // stopScaleLoop stops ScaleLoop handler for the respective ScaledObject -func (r *ScaledObjectReconciler) stopScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { +func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { key, err := cache.MetaNamespaceKeyFunc(scaledObject) if err != nil { logger.Error(err, "Error getting key for scaledObject") return err } - if err := r.scaleHandler.DeleteScalableObject(scaledObject); err != nil { + if err := r.scaleHandler.DeleteScalableObject(ctx, scaledObject); err != nil { return err } // delete ScaledObject's current Generation diff --git a/controllers/keda/scaledobject_finalizer.go b/controllers/keda/scaledobject_finalizer.go index 5ca4a7684d5..02e2564c645 100644 --- a/controllers/keda/scaledobject_finalizer.go +++ b/controllers/keda/scaledobject_finalizer.go @@ -39,7 +39,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge // Run finalization logic for scaledObjectFinalizer. If the // finalization logic fails, don't remove the finalizer so // that we can retry during the next reconciliation. - if err := r.stopScaleLoop(logger, scaledObject); err != nil { + if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil { return err } diff --git a/pkg/mock/mock_scaling/mock_interface.go b/pkg/mock/mock_scaling/mock_interface.go index 6ac7c5e7ee1..7913f2e67fc 100644 --- a/pkg/mock/mock_scaling/mock_interface.go +++ b/pkg/mock/mock_scaling/mock_interface.go @@ -48,17 +48,17 @@ func (mr *MockScaleHandlerMockRecorder) ClearScalersCache(ctx, name, namespace i } // DeleteScalableObject mocks base method. 
-func (m *MockScaleHandler) DeleteScalableObject(scalableObject interface{}) error { +func (m *MockScaleHandler) DeleteScalableObject(ctx context.Context, scalableObject interface{}) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteScalableObject", scalableObject) + ret := m.ctrl.Call(m, "DeleteScalableObject", ctx, scalableObject) ret0, _ := ret[0].(error) return ret0 } // DeleteScalableObject indicates an expected call of DeleteScalableObject. -func (mr *MockScaleHandlerMockRecorder) DeleteScalableObject(scalableObject interface{}) *gomock.Call { +func (mr *MockScaleHandlerMockRecorder) DeleteScalableObject(ctx, scalableObject interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteScalableObject", reflect.TypeOf((*MockScaleHandler)(nil).DeleteScalableObject), scalableObject) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteScalableObject", reflect.TypeOf((*MockScaleHandler)(nil).DeleteScalableObject), ctx, scalableObject) } // GetScalersCache mocks base method. @@ -77,15 +77,15 @@ func (mr *MockScaleHandlerMockRecorder) GetScalersCache(ctx, scalableObject inte } // HandleScalableObject mocks base method. -func (m *MockScaleHandler) HandleScalableObject(scalableObject interface{}) error { +func (m *MockScaleHandler) HandleScalableObject(ctx context.Context, scalableObject interface{}) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HandleScalableObject", scalableObject) + ret := m.ctrl.Call(m, "HandleScalableObject", ctx, scalableObject) ret0, _ := ret[0].(error) return ret0 } // HandleScalableObject indicates an expected call of HandleScalableObject. -func (mr *MockScaleHandlerMockRecorder) HandleScalableObject(scalableObject interface{}) *gomock.Call { +func (mr *MockScaleHandlerMockRecorder) HandleScalableObject(ctx, scalableObject interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleScalableObject", reflect.TypeOf((*MockScaleHandler)(nil).HandleScalableObject), scalableObject) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleScalableObject", reflect.TypeOf((*MockScaleHandler)(nil).HandleScalableObject), ctx, scalableObject) } diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 2c46057a8fa..4abd69e0c58 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -42,6 +42,7 @@ type KedaProvider struct { externalMetrics []externalMetric scaleHandler scaling.ScaleHandler watchedNamespace string + ctx context.Context } type externalMetric struct{} @@ -50,13 +51,14 @@ var logger logr.Logger var metricsServer prommetrics.PrometheusMetricServer // NewProvider returns an instance of KedaProvider -func NewProvider(adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string) provider.MetricsProvider { +func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string) provider.MetricsProvider { provider := &KedaProvider{ values: make(map[provider.CustomMetricInfo]int64), externalMetrics: make([]externalMetric, 2, 10), client: client, scaleHandler: scaleHandler, watchedNamespace: watchedNamespace, + ctx: ctx, } logger = adapterLogger.WithName("provider") logger.Info("starting") @@ -149,7 +151,7 @@ func (p *KedaProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo { opts := []client.ListOption{ client.InNamespace(p.watchedNamespace), } - err := 
p.client.List(context.TODO(), scaledObjects, opts...) + err := p.client.List(p.ctx, scaledObjects, opts...) if err != nil { logger.Error(err, "Cannot get list of ScaledObjects", "WatchedNamespace", p.watchedNamespace) return nil diff --git a/pkg/scalers/azure_servicebus_scaler.go b/pkg/scalers/azure_servicebus_scaler.go index fba48780102..e55c17028a9 100755 --- a/pkg/scalers/azure_servicebus_scaler.go +++ b/pkg/scalers/azure_servicebus_scaler.go @@ -50,6 +50,7 @@ const ( var azureServiceBusLog = logf.Log.WithName("azure_servicebus_scaler") type azureServiceBusScaler struct { + ctx context.Context metadata *azureServiceBusMetadata podIdentity kedav1alpha1.PodIdentityProvider httpClient *http.Client @@ -68,13 +69,14 @@ type azureServiceBusMetadata struct { } // NewAzureServiceBusScaler creates a new AzureServiceBusScaler -func NewAzureServiceBusScaler(config *ScalerConfig) (Scaler, error) { +func NewAzureServiceBusScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) { meta, err := parseAzureServiceBusMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing azure service bus metadata: %s", err) } return &azureServiceBusScaler{ + ctx: ctx, metadata: meta, podIdentity: config.PodIdentity, httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout), @@ -180,7 +182,7 @@ func (s *azureServiceBusScaler) Close(context.Context) error { // Returns the metric spec to be used by the HPA func (s *azureServiceBusScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { targetLengthQty := resource.NewQuantity(int64(s.metadata.targetLength), resource.DecimalSI) - namespace, err := s.getServiceBusNamespace() + namespace, err := s.getServiceBusNamespace(s.ctx) if err != nil { azureServiceBusLog.Error(err, "error parsing azure service bus metadata", "namespace") return nil @@ -225,11 +227,12 @@ func (s *azureServiceBusScaler) GetMetrics(ctx context.Context, metricName strin type azureTokenProvider struct { httpClient *http.Client + ctx context.Context } // GetToken implements TokenProvider interface for azureTokenProvider func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) { - ctx := context.Background() + ctx := a.ctx // Service bus resource id is "https://servicebus.azure.net/" in all cloud environments token, err := azure.GetAzureADPodIdentityToken(ctx, a.httpClient, "https://servicebus.azure.net/") if err != nil { @@ -246,7 +249,7 @@ func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) { // Returns the length of the queue or subscription func (s *azureServiceBusScaler) GetAzureServiceBusLength(ctx context.Context) (int32, error) { // get namespace - namespace, err := s.getServiceBusNamespace() + namespace, err := s.getServiceBusNamespace(ctx) if err != nil { return -1, err } @@ -262,7 +265,7 @@ func (s *azureServiceBusScaler) GetAzureServiceBusLength(ctx context.Context) (i } // Returns service bus namespace object -func (s *azureServiceBusScaler) getServiceBusNamespace() (*servicebus.Namespace, error) { +func (s *azureServiceBusScaler) getServiceBusNamespace(ctx context.Context) (*servicebus.Namespace, error) { var namespace *servicebus.Namespace var err error @@ -277,6 +280,7 @@ func (s *azureServiceBusScaler) getServiceBusNamespace() (*servicebus.Namespace, return namespace, err } namespace.TokenProvider = azureTokenProvider{ + ctx: ctx, httpClient: s.httpClient, } namespace.Name = s.metadata.namespace diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index e5a5aca9405..582ea82a8b9 100644 
--- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -41,8 +41,8 @@ const ( func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) { logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace) - runningJobCount := e.getRunningJobCount(scaledJob) - pendingJobCount := e.getPendingJobCount(scaledJob) + runningJobCount := e.getRunningJobCount(ctx, scaledJob) + pendingJobCount := e.getPendingJobCount(ctx, scaledJob) logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount) logger.Info("Scaling Jobs", "Number of pending Jobs ", pendingJobCount) @@ -60,7 +60,7 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al if err != nil { logger.Error(err, "Failed to update last active time") } - e.createJobs(logger, scaledJob, scaleTo, effectiveMaxScale) + e.createJobs(ctx, logger, scaledJob, scaleTo, effectiveMaxScale) } else { logger.V(1).Info("No change in activity") } @@ -80,13 +80,13 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al } } - err := e.cleanUp(scaledJob) + err := e.cleanUp(ctx, scaledJob) if err != nil { logger.Error(err, "Failed to cleanUp jobs") } } -func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) { +func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) { scaledJob.Spec.JobTargetRef.Template.GenerateName = scaledJob.GetName() + "-" if scaledJob.Spec.JobTargetRef.Template.Labels == nil { scaledJob.Spec.JobTargetRef.Template.Labels = map[string]string{} @@ -134,7 +134,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S logger.Error(err, "Failed to set ScaledJob as the owner of the new Job") } - err = e.client.Create(context.TODO(), job) + err = e.client.Create(ctx, job) if err != nil { logger.Error(err, "Failed to create a new Job") } @@ -152,7 +152,7 @@ func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool { return false } -func (e *scaleExecutor) getRunningJobCount(scaledJob *kedav1alpha1.ScaledJob) int64 { +func (e *scaleExecutor) getRunningJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64 { var runningJobs int64 opts := []client.ListOption{ @@ -161,7 +161,7 @@ func (e *scaleExecutor) getRunningJobCount(scaledJob *kedav1alpha1.ScaledJob) in } jobs := &batchv1.JobList{} - err := e.client.List(context.TODO(), jobs, opts...) + err := e.client.List(ctx, jobs, opts...) if err != nil { return 0 @@ -177,14 +177,14 @@ func (e *scaleExecutor) getRunningJobCount(scaledJob *kedav1alpha1.ScaledJob) in return runningJobs } -func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool { +func (e *scaleExecutor) isAnyPodRunningOrCompleted(ctx context.Context, j *batchv1.Job) bool { opts := []client.ListOption{ client.InNamespace(j.GetNamespace()), client.MatchingLabels(map[string]string{"job-name": j.GetName()}), } pods := &corev1.PodList{} - err := e.client.List(context.TODO(), pods, opts...) + err := e.client.List(ctx, pods, opts...) 
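// (illustrative comment, not part of the original diff) Threading the caller's ctx into client.List, instead of context.TODO(), lets an in-flight pod listing be canceled together with the scale loop that issued it.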
if err != nil { return false @@ -199,14 +199,14 @@ func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool { return false } -func (e *scaleExecutor) areAllPendingPodConditionsFulfilled(j *batchv1.Job, pendingPodConditions []string) bool { +func (e *scaleExecutor) areAllPendingPodConditionsFulfilled(ctx context.Context, j *batchv1.Job, pendingPodConditions []string) bool { opts := []client.ListOption{ client.InNamespace(j.GetNamespace()), client.MatchingLabels(map[string]string{"job-name": j.GetName()}), } pods := &corev1.PodList{} - err := e.client.List(context.TODO(), pods, opts...) + err := e.client.List(ctx, pods, opts...) if err != nil { return false } @@ -226,7 +226,7 @@ func (e *scaleExecutor) areAllPendingPodConditionsFulfilled(j *batchv1.Job, pend return len(pendingPodConditions) == fulfilledConditionsCount } -func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) int64 { +func (e *scaleExecutor) getPendingJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64 { var pendingJobs int64 opts := []client.ListOption{ @@ -235,7 +235,7 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in } jobs := &batchv1.JobList{} - err := e.client.List(context.TODO(), jobs, opts...) + err := e.client.List(ctx, jobs, opts...) if err != nil { return 0 @@ -246,11 +246,11 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in if !e.isJobFinished(&job) { if len(scaledJob.Spec.ScalingStrategy.PendingPodConditions) > 0 { - if !e.areAllPendingPodConditionsFulfilled(&job, scaledJob.Spec.ScalingStrategy.PendingPodConditions) { + if !e.areAllPendingPodConditionsFulfilled(ctx, &job, scaledJob.Spec.ScalingStrategy.PendingPodConditions) { pendingJobs++ } } else { - if !e.isAnyPodRunningOrCompleted(&job) { + if !e.isAnyPodRunningOrCompleted(ctx, &job) { pendingJobs++ } } @@ -261,7 +261,7 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in } // Clean up will delete the jobs that exceed the historyLimit -func (e *scaleExecutor) cleanUp(scaledJob *kedav1alpha1.ScaledJob) error { +func (e *scaleExecutor) cleanUp(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) error { logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace) opts := []client.ListOption{ @@ -270,7 +270,7 @@ func (e *scaleExecutor) cleanUp(scaledJob *kedav1alpha1.ScaledJob) error { } jobs := &batchv1.JobList{} - err := e.client.List(context.TODO(), jobs, opts...) + err := e.client.List(ctx, jobs, opts...) 
if err != nil { logger.Error(err, "Can not get list of Jobs") return err @@ -303,18 +303,18 @@ func (e *scaleExecutor) cleanUp(scaledJob *kedav1alpha1.ScaledJob) error { failedJobsHistoryLimit = *scaledJob.Spec.FailedJobsHistoryLimit } - err = e.deleteJobsWithHistoryLimit(logger, completedJobs, successfulJobsHistoryLimit) + err = e.deleteJobsWithHistoryLimit(ctx, logger, completedJobs, successfulJobsHistoryLimit) if err != nil { return err } - err = e.deleteJobsWithHistoryLimit(logger, failedJobs, failedJobsHistoryLimit) + err = e.deleteJobsWithHistoryLimit(ctx, logger, failedJobs, failedJobsHistoryLimit) if err != nil { return err } return nil } -func (e *scaleExecutor) deleteJobsWithHistoryLimit(logger logr.Logger, jobs []batchv1.Job, historyLimit int32) error { +func (e *scaleExecutor) deleteJobsWithHistoryLimit(ctx context.Context, logger logr.Logger, jobs []batchv1.Job, historyLimit int32) error { if len(jobs) <= int(historyLimit) { return nil } @@ -325,7 +325,7 @@ func (e *scaleExecutor) deleteJobsWithHistoryLimit(logger logr.Logger, jobs []ba deleteOptions := &client.DeleteOptions{ PropagationPolicy: &deletePolicy, } - err := e.client.Delete(context.TODO(), j.DeepCopy(), deleteOptions) + err := e.client.Delete(ctx, j.DeepCopy(), deleteOptions) if err != nil { return err } diff --git a/pkg/scaling/executor/scale_jobs_test.go b/pkg/scaling/executor/scale_jobs_test.go index 652d3daaa00..fb142d483b8 100644 --- a/pkg/scaling/executor/scale_jobs_test.go +++ b/pkg/scaling/executor/scale_jobs_test.go @@ -36,6 +36,7 @@ import ( ) func TestCleanUpNormalCase(t *testing.T) { + ctx := context.Background() ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -55,7 +56,7 @@ func TestCleanUpNormalCase(t *testing.T) { scaleExecutor := getMockScaleExecutor(client) - err := scaleExecutor.cleanUp(scaledJob) + err := scaleExecutor.cleanUp(ctx, scaledJob) if err != nil { t.Errorf("Unable to cleanup as: %v", err) return @@ -133,6 +134,7 @@ func TestAccurateScalingStrategy(t *testing.T) { } func TestCleanUpMixedCaseWithSortByTime(t *testing.T) { + ctx := context.Background() ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -155,7 +157,7 @@ func TestCleanUpMixedCaseWithSortByTime(t *testing.T) { scaleExecutor := getMockScaleExecutor(client) - err := scaleExecutor.cleanUp(scaledJob) + err := scaleExecutor.cleanUp(ctx, scaledJob) if err != nil { t.Errorf("Unable to cleanup as: %v", err) return @@ -170,6 +172,7 @@ func TestCleanUpMixedCaseWithSortByTime(t *testing.T) { } func TestCleanUpDefaultValue(t *testing.T) { + ctx := context.Background() ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -196,7 +199,7 @@ func TestCleanUpDefaultValue(t *testing.T) { scaleExecutor := getMockScaleExecutor(client) - err := scaleExecutor.cleanUp(scaledJob) + err := scaleExecutor.cleanUp(ctx, scaledJob) if err != nil { t.Errorf("Unable to cleanup as: %v", err) return @@ -230,11 +233,12 @@ func TestGetPendingJobCount(t *testing.T) { } for _, testData := range testPendingJobTestData { + ctx := context.Background() client := getMockClientForTestingPendingPods(t, ctrl, testData.PodStatus) scaleExecutor := getMockScaleExecutor(client) scaledJob := getMockScaledJobWithPendingPodConditions(testData.PendingPodConditions) - result := scaleExecutor.getPendingJobCount(scaledJob) + result := scaleExecutor.getPendingJobCount(ctx, scaledJob) assert.Equal(t, testData.PendingJobCount, result) } diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index 004b40328ec..c30a32c1d92 100644 --- 
a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -45,7 +45,7 @@ const ( // which could be almost any k8s resource (Deployment, StatefulSet, CustomResource...) // and for the given resource returns *corev1.PodTemplateSpec and a name of the container // which is being used for referencing environment variables -func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) { +func ResolveScaleTargetPodSpec(ctx context.Context, kubeClient client.Client, logger logr.Logger, scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) { switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: // Try to get a real object instance for better cache usage, but fall back to an Unstructured if needed. @@ -56,7 +56,7 @@ func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, sca // For core types, use a typed client so we get an informer-cache-backed Get to reduce API load. case gvk.Group == "apps" && gvk.Kind == "Deployment": deployment := &appsv1.Deployment{} - if err := kubeClient.Get(context.TODO(), objKey, deployment); err != nil { + if err := kubeClient.Get(ctx, objKey, deployment); err != nil { // resource doesn't exist logger.Error(err, "Target deployment doesn't exist", "resource", gvk.String(), "name", objKey.Name) return nil, "", err @@ -65,7 +65,7 @@ func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, sca podTemplateSpec.Spec = deployment.Spec.Template.Spec case gvk.Group == "apps" && gvk.Kind == "StatefulSet": statefulSet := &appsv1.StatefulSet{} - if err := kubeClient.Get(context.TODO(), objKey, statefulSet); err != nil { + if err := kubeClient.Get(ctx, objKey, statefulSet); err != nil { // resource doesn't exist logger.Error(err, "Target statefulSet doesn't exist", "resource", gvk.String(), "name", objKey.Name) return nil, "", err @@ -75,7 +75,7 @@ func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, sca default: unstruct := &unstructured.Unstructured{} unstruct.SetGroupVersionKind(gvk) - if err := kubeClient.Get(context.TODO(), objKey, unstruct); err != nil { + if err := kubeClient.Get(ctx, objKey, unstruct); err != nil { // resource doesn't exist logger.Error(err, "Target resource doesn't exist", "resource", gvk.String(), "name", objKey.Name) return nil, "", err @@ -103,7 +103,7 @@ func ResolveScaleTargetPodSpec(kubeClient client.Client, logger logr.Logger, sca // ResolveContainerEnv resolves all environment variables in a container. // It returns either a map of env variable keys and values, or an error if there is any. -func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *corev1.PodSpec, containerName, namespace string) (map[string]string, error) { +func ResolveContainerEnv(ctx context.Context, client client.Client, logger logr.Logger, podSpec *corev1.PodSpec, containerName, namespace string) (map[string]string, error) { if len(podSpec.Containers) < 1 { return nil, fmt.Errorf("target object doesn't have containers") } @@ -126,18 +126,18 @@ func ResolveContainerEnv(client client.Client, logger logr.Logger, podSpec *core container = podSpec.Containers[0] } - return resolveEnv(client, logger, &container, namespace) + return resolveEnv(ctx, client, logger, &container, namespace) } // ResolveAuthRefAndPodIdentity provides authentication parameters and pod identity needed to authenticate the scaler with the environment. 
-func ResolveAuthRefAndPodIdentity(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podTemplateSpec *corev1.PodTemplateSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider, error) { +func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podTemplateSpec *corev1.PodTemplateSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider, error) { if podTemplateSpec != nil { - authParams, podIdentity := resolveAuthRef(client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace) + authParams, podIdentity := resolveAuthRef(ctx, client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace) if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS { serviceAccountName := podTemplateSpec.Spec.ServiceAccountName serviceAccount := &corev1.ServiceAccount{} - err := client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount) + err := client.Get(ctx, types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount) if err != nil { return nil, kedav1alpha1.PodIdentityProviderNone, fmt.Errorf("error getting service account: %s", err) } @@ -148,18 +148,18 @@ func ResolveAuthRefAndPodIdentity(client client.Client, logger logr.Logger, trig return authParams, podIdentity, nil } - authParams, _ := resolveAuthRef(client, logger, triggerAuthRef, nil, namespace) + authParams, _ := resolveAuthRef(ctx, client, logger, triggerAuthRef, nil, namespace) return authParams, kedav1alpha1.PodIdentityProviderNone, nil } // resolveAuthRef provides authentication parameters needed to authenticate the scaler with the environment. // based on the authentication method defined in TriggerAuthentication, authParams and podIdentity are returned -func resolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) { +func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, namespace string) (map[string]string, kedav1alpha1.PodIdentityProvider) { result := make(map[string]string) var podIdentity kedav1alpha1.PodIdentityProvider if namespace != "" && triggerAuthRef != nil && triggerAuthRef.Name != "" { - triggerAuthSpec, triggerNamespace, err := getTriggerAuthSpec(client, triggerAuthRef, namespace) + triggerAuthSpec, triggerNamespace, err := getTriggerAuthSpec(ctx, client, triggerAuthRef, namespace) if err != nil { logger.Error(err, "Error getting triggerAuth", "triggerAuthRef.Name", triggerAuthRef.Name) } else { @@ -172,7 +172,7 @@ func resolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *ke result[e.Parameter] = "" continue } - env, err := ResolveContainerEnv(client, logger, podSpec, e.ContainerName, namespace) + env, err := ResolveContainerEnv(ctx, client, logger, podSpec, e.ContainerName, namespace) if err != nil { result[e.Parameter] = "" } else { @@ -182,7 +182,7 @@ func resolveAuthRef(client client.Client, logger logr.Logger, triggerAuthRef *ke } if triggerAuthSpec.SecretTargetRef != nil { for _, e := range triggerAuthSpec.SecretTargetRef { - result[e.Parameter] = resolveAuthSecret(client, logger, e.Name, triggerNamespace, e.Key) + result[e.Parameter] = resolveAuthSecret(ctx, client, logger, e.Name, triggerNamespace, e.Key) } } if 
triggerAuthSpec.HashiCorpVault != nil && len(triggerAuthSpec.HashiCorpVault.Secrets) > 0 { @@ -237,10 +237,10 @@ func getClusterObjectNamespace() (string, error) { return strData, nil } -func getTriggerAuthSpec(client client.Client, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, namespace string) (*kedav1alpha1.TriggerAuthenticationSpec, string, error) { +func getTriggerAuthSpec(ctx context.Context, client client.Client, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, namespace string) (*kedav1alpha1.TriggerAuthenticationSpec, string, error) { if triggerAuthRef.Kind == "" || triggerAuthRef.Kind == "TriggerAuthentication" { triggerAuth := &kedav1alpha1.TriggerAuthentication{} - err := client.Get(context.TODO(), types.NamespacedName{Name: triggerAuthRef.Name, Namespace: namespace}, triggerAuth) + err := client.Get(ctx, types.NamespacedName{Name: triggerAuthRef.Name, Namespace: namespace}, triggerAuth) if err != nil { return nil, "", err } @@ -251,7 +251,7 @@ func getTriggerAuthSpec(client client.Client, triggerAuthRef *kedav1alpha1.Scale return nil, "", err } triggerAuth := &kedav1alpha1.ClusterTriggerAuthentication{} - err = client.Get(context.TODO(), types.NamespacedName{Name: triggerAuthRef.Name}, triggerAuth) + err = client.Get(ctx, types.NamespacedName{Name: triggerAuthRef.Name}, triggerAuth) if err != nil { return nil, "", err } @@ -260,13 +260,13 @@ func getTriggerAuthSpec(client client.Client, triggerAuthRef *kedav1alpha1.Scale return nil, "", fmt.Errorf("unknown trigger auth kind %s", triggerAuthRef.Kind) } -func resolveEnv(client client.Client, logger logr.Logger, container *corev1.Container, namespace string) (map[string]string, error) { +func resolveEnv(ctx context.Context, client client.Client, logger logr.Logger, container *corev1.Container, namespace string) (map[string]string, error) { resolved := make(map[string]string) if container.EnvFrom != nil { for _, source := range container.EnvFrom { if source.ConfigMapRef != nil { - configMap, err := resolveConfigMap(client, source.ConfigMapRef, namespace) + configMap, err := resolveConfigMap(ctx, client, source.ConfigMapRef, namespace) switch { case err == nil: for k, v := range configMap { @@ -279,7 +279,7 @@ func resolveEnv(client client.Client, logger logr.Logger, container *corev1.Cont return nil, fmt.Errorf("error reading config ref %s on namespace %s/: %s", source.ConfigMapRef, namespace, err) } } else if source.SecretRef != nil { - secretsMap, err := resolveSecretMap(client, source.SecretRef, namespace) + secretsMap, err := resolveSecretMap(ctx, client, source.SecretRef, namespace) switch { case err == nil: for k, v := range secretsMap { @@ -309,7 +309,7 @@ func resolveEnv(client client.Client, logger logr.Logger, container *corev1.Cont switch { case envVar.ValueFrom.SecretKeyRef != nil: // env is a secret selector - value, err = resolveSecretValue(client, envVar.ValueFrom.SecretKeyRef, envVar.ValueFrom.SecretKeyRef.Key, namespace) + value, err = resolveSecretValue(ctx, client, envVar.ValueFrom.SecretKeyRef, envVar.ValueFrom.SecretKeyRef.Key, namespace) if err != nil { return nil, fmt.Errorf("error resolving secret name %s for env %s in namespace %s", envVar.ValueFrom.SecretKeyRef, @@ -318,7 +318,7 @@ func resolveEnv(client client.Client, logger logr.Logger, container *corev1.Cont } case envVar.ValueFrom.ConfigMapKeyRef != nil: // env is a configMap selector - value, err = resolveConfigValue(client, envVar.ValueFrom.ConfigMapKeyRef, envVar.ValueFrom.ConfigMapKeyRef.Key, namespace) + value, err = resolveConfigValue(ctx, 
client, envVar.ValueFrom.ConfigMapKeyRef, envVar.ValueFrom.ConfigMapKeyRef.Key, namespace) if err != nil { return nil, fmt.Errorf("error resolving config %s for env %s in namespace %s", envVar.ValueFrom.ConfigMapKeyRef, @@ -384,18 +384,18 @@ func resolveEnvValue(value string, env map[string]string) string { return buf.String() + value[checkpoint:] } -func resolveConfigMap(client client.Client, configMapRef *corev1.ConfigMapEnvSource, namespace string) (map[string]string, error) { +func resolveConfigMap(ctx context.Context, client client.Client, configMapRef *corev1.ConfigMapEnvSource, namespace string) (map[string]string, error) { configMap := &corev1.ConfigMap{} - err := client.Get(context.TODO(), types.NamespacedName{Name: configMapRef.Name, Namespace: namespace}, configMap) + err := client.Get(ctx, types.NamespacedName{Name: configMapRef.Name, Namespace: namespace}, configMap) if err != nil { return nil, err } return configMap.Data, nil } -func resolveSecretMap(client client.Client, secretMapRef *corev1.SecretEnvSource, namespace string) (map[string]string, error) { +func resolveSecretMap(ctx context.Context, client client.Client, secretMapRef *corev1.SecretEnvSource, namespace string) (map[string]string, error) { secret := &corev1.Secret{} - err := client.Get(context.TODO(), types.NamespacedName{Name: secretMapRef.Name, Namespace: namespace}, secret) + err := client.Get(ctx, types.NamespacedName{Name: secretMapRef.Name, Namespace: namespace}, secret) if err != nil { return nil, err } @@ -407,32 +407,32 @@ func resolveSecretMap(client client.Client, secretMapRef *corev1.SecretEnvSource return secretsStr, nil } -func resolveSecretValue(client client.Client, secretKeyRef *corev1.SecretKeySelector, keyName, namespace string) (string, error) { +func resolveSecretValue(ctx context.Context, client client.Client, secretKeyRef *corev1.SecretKeySelector, keyName, namespace string) (string, error) { secret := &corev1.Secret{} - err := client.Get(context.TODO(), types.NamespacedName{Name: secretKeyRef.Name, Namespace: namespace}, secret) + err := client.Get(ctx, types.NamespacedName{Name: secretKeyRef.Name, Namespace: namespace}, secret) if err != nil { return "", err } return string(secret.Data[keyName]), nil } -func resolveConfigValue(client client.Client, configKeyRef *corev1.ConfigMapKeySelector, keyName, namespace string) (string, error) { +func resolveConfigValue(ctx context.Context, client client.Client, configKeyRef *corev1.ConfigMapKeySelector, keyName, namespace string) (string, error) { configMap := &corev1.ConfigMap{} - err := client.Get(context.TODO(), types.NamespacedName{Name: configKeyRef.Name, Namespace: namespace}, configMap) + err := client.Get(ctx, types.NamespacedName{Name: configKeyRef.Name, Namespace: namespace}, configMap) if err != nil { return "", err } return configMap.Data[keyName], nil } -func resolveAuthSecret(client client.Client, logger logr.Logger, name, namespace, key string) string { +func resolveAuthSecret(ctx context.Context, client client.Client, logger logr.Logger, name, namespace, key string) string { if name == "" || namespace == "" || key == "" { logger.Error(fmt.Errorf("error trying to get secret"), "name, namespace and key are required", "Secret.Namespace", namespace, "Secret.Name", name, "key", key) return "" } secret := &corev1.Secret{} - err := client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, secret) + err := client.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, secret) if err != nil { 
logger.Error(err, "Error trying to get secret from namespace", "Secret.Namespace", namespace, "Secret.Name", name) return "" diff --git a/pkg/scaling/resolver/scale_resolvers_test.go b/pkg/scaling/resolver/scale_resolvers_test.go index 76d8c9751f4..78f0072af3e 100644 --- a/pkg/scaling/resolver/scale_resolvers_test.go +++ b/pkg/scaling/resolver/scale_resolvers_test.go @@ -17,6 +17,7 @@ limitations under the License. package resolver import ( + "context" "testing" "github.com/google/go-cmp/cmp" @@ -146,7 +147,8 @@ var testMetadatas = []testMetadata{ func TestResolveNonExistingConfigMapsOrSecretsEnv(t *testing.T) { for _, testData := range testMetadatas { - _, err := resolveEnv(fake.NewFakeClient(), logf.Log.WithName("test"), testData.container, namespace) + ctx := context.Background() + _, err := resolveEnv(ctx, fake.NewFakeClient(), logf.Log.WithName("test"), testData.container, namespace) if err != nil && !testData.isError { t.Errorf("Expected success because %s got error, %s", testData.comment, err) @@ -328,8 +330,9 @@ func TestResolveAuthRef(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { + ctx := context.Background() clusterObjectNamespaceCache = &clusterNamespace // Inject test cluster namespace. - gotMap, gotPodIdentity := resolveAuthRef(fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace) + gotMap, gotPodIdentity := resolveAuthRef(ctx, fake.NewFakeClientWithScheme(scheme.Scheme, test.existing...), logf.Log.WithName("test"), test.soar, test.podSpec, namespace) if diff := cmp.Diff(gotMap, test.expected); diff != "" { t.Errorf("Returned authParams are different: %s", diff) } @@ -446,7 +449,8 @@ func TestResolveDependentEnv(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - envMap, _ := resolveEnv(fake.NewFakeClient(), logf.Log.WithName("test"), test.container, namespace) + ctx := context.Background() + envMap, _ := resolveEnv(ctx, fake.NewFakeClient(), logf.Log.WithName("test"), test.container, namespace) if diff := cmp.Diff(envMap, test.expected); diff != "" { t.Errorf("Returned authParams are different: %s", diff) } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 8a0e7e6fb18..68be3bcadb9 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -43,8 +43,8 @@ import ( // ScaleHandler encapsulates the logic of calling the right scalers for // each ScaledObject and making the final scale decision and operation type ScaleHandler interface { - HandleScalableObject(scalableObject interface{}) error - DeleteScalableObject(scalableObject interface{}) error + HandleScalableObject(ctx context.Context, scalableObject interface{}) error + DeleteScalableObject(ctx context.Context, scalableObject interface{}) error GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) ClearScalersCache(ctx context.Context, name, namespace string) } @@ -74,7 +74,7 @@ func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, recon } } -func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error { +func (h *scaleHandler) HandleScalableObject(ctx context.Context, scalableObject interface{}) error { withTriggers, err := asDuckWithTriggers(scalableObject) if err != nil { h.logger.Error(err, "error duck typing object into withTrigger") @@ -82,7 +82,7 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error 
 	}
 
 	key := withTriggers.GenerateIdenitifier()
-	ctx, cancel := context.WithCancel(context.TODO())
+	ctx, cancel := context.WithCancel(ctx)
 
 	// cancel the outdated ScaleLoop for the same ScaledObject (if exists)
 	value, loaded := h.scaleLoopContexts.LoadOrStore(key, cancel)
@@ -111,7 +111,7 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
 	return nil
 }
 
-func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
+func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject interface{}) error {
 	withTriggers, err := asDuckWithTriggers(scalableObject)
 	if err != nil {
 		h.logger.Error(err, "error duck typing object into withTrigger")
@@ -180,7 +180,7 @@ func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject inter
 		cache.Close(ctx)
 	}
 
-	podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(h.client, h.logger, scalableObject)
+	podTemplateSpec, containerName, err := resolver.ResolveScaleTargetPodSpec(ctx, h.client, h.logger, scalableObject)
 	if err != nil {
 		return nil, err
 	}
@@ -282,7 +282,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp
 		triggerName, trigger := scalerIndex, t
 		factory := func() (scalers.Scaler, error) {
 			if podTemplateSpec != nil {
-				resolvedEnv, err = resolver.ResolveContainerEnv(h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace)
+				resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace)
 				if err != nil {
 					return nil, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err)
 				}
@@ -297,7 +297,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp
 			ScalerIndex: scalerIndex,
 		}
 
-		config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)
+		config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)
 		if err != nil {
 			return nil, err
 		}
@@ -348,7 +348,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
 	case "azure-queue":
 		return scalers.NewAzureQueueScaler(config)
 	case "azure-servicebus":
-		return scalers.NewAzureServiceBusScaler(config)
+		return scalers.NewAzureServiceBusScaler(ctx, config)
 	case "cassandra":
 		return scalers.NewCassandraScaler(config)
 	case "cpu":

From cc95674a04d085ce6cc17f58cfa6e25cdc62636c Mon Sep 17 00:00:00 2001
From: sosoftmandruszak
Date: Wed, 10 Nov 2021 12:08:15 +0100
Subject: [PATCH 5/5] Validate values length for prometheus scaler (#2264)

Signed-off-by: Magdalena Andruszak
---
 CHANGELOG.md                          |  1 +
 pkg/scalers/prometheus_scaler.go      |  7 +++
 pkg/scalers/prometheus_scaler_test.go | 87 +++++++++++++++++++++++++++
 3 files changed, 95 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b0af545f462..d2324c8bfad 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -54,6 +54,7 @@
 - Improve error message if `IdleReplicaCount` are equal to `MinReplicaCount` to be the same as the check ([#2212](https://github.com/kedacore/keda/pull/2212))
 - Improve Cloudwatch Scaler metric exporting logic ([#2243](https://github.com/kedacore/keda/pull/2243))
 - Refactor aws related scalers to reuse the aws clients instead of creating a new one for every GetMetrics call([#2255](https://github.com/kedacore/keda/pull/2255))
+- Validate values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264))
 
 ### Breaking Changes
 
diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go
index 0a049a9b196..84c3dde3dd6 100644
--- a/pkg/scalers/prometheus_scaler.go
+++ b/pkg/scalers/prometheus_scaler.go
@@ -263,6 +263,13 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
 		return -1, fmt.Errorf("prometheus query %s returned multiple elements", s.metadata.query)
 	}
 
+	valueLen := len(result.Data.Result[0].Value)
+	if valueLen == 0 {
+		return 0, nil
+	} else if valueLen < 2 {
+		return -1, fmt.Errorf("prometheus query %s didn't return enough values", s.metadata.query)
+	}
+
 	val := result.Data.Result[0].Value[1]
 	if val != nil {
 		s := val.(string)
diff --git a/pkg/scalers/prometheus_scaler_test.go b/pkg/scalers/prometheus_scaler_test.go
index 8ec064c9dde..3f4169502c0 100644
--- a/pkg/scalers/prometheus_scaler_test.go
+++ b/pkg/scalers/prometheus_scaler_test.go
@@ -3,8 +3,11 @@ package scalers
 import (
 	"context"
 	"net/http"
+	"net/http/httptest"
 	"strings"
 	"testing"
+
+	"github.com/stretchr/testify/assert"
 )
 
 type parsePrometheusMetadataTestData struct {
@@ -119,3 +122,87 @@ func TestPrometheusScalerAuthParams(t *testing.T) {
 		}
 	}
 }
+
+type prometheusPromQueryResultTestData struct {
+	name           string
+	bodyStr        string
+	responseStatus int
+	expectedValue  float64
+	isError        bool
+}
+
+var testPromQueryResult = []prometheusPromQueryResultTestData{
+	{
+		name:           "no results",
+		bodyStr:        `{}`,
+		responseStatus: http.StatusOK,
+		expectedValue:  0,
+		isError:        false,
+	},
+	{
+		name:           "no values",
+		bodyStr:        `{"data":{"result":[]}}`,
+		responseStatus: http.StatusOK,
+		expectedValue:  0,
+		isError:        false,
+	},
+	{
+		name:           "valid value",
+		bodyStr:        `{"data":{"result":[{"value": ["1", "2"]}]}}`,
+		responseStatus: http.StatusOK,
+		expectedValue:  2,
+		isError:        false,
+	},
+	{
+		name:           "not enough values",
+		bodyStr:        `{"data":{"result":[{"value": ["1"]}]}}`,
+		responseStatus: http.StatusOK,
+		expectedValue:  -1,
+		isError:        true,
+	},
+	{
+		name:           "multiple results",
+		bodyStr:        `{"data":{"result":[{},{}]}}`,
+		responseStatus: http.StatusOK,
+		expectedValue:  -1,
+		isError:        true,
+	},
+	{
+		name:           "error status response",
+		bodyStr:        `{}`,
+		responseStatus: http.StatusBadRequest,
+		expectedValue:  -1,
+		isError:        true,
+	},
+}
+
+func TestPrometheusScalerExecutePromQuery(t *testing.T) {
+	for _, testData := range testPromQueryResult {
+		t.Run(testData.name, func(t *testing.T) {
+			server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
+				writer.WriteHeader(testData.responseStatus)
+
+				if _, err := writer.Write([]byte(testData.bodyStr)); err != nil {
+					t.Error(err)
+				}
+			}))
+
+			scaler := prometheusScaler{
+				metadata: &prometheusMetadata{
+					serverAddress: server.URL,
+				},
+				httpClient: http.DefaultClient,
+			}
+
+			value, err := scaler.ExecutePromQuery(context.TODO())
+
+			assert.Equal(t, testData.expectedValue, value)
+
+			if testData.isError {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
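
A note for readers following the scale handler changes in patch 4: HandleScalableObject now derives a cancellable per-object context from the caller's context (rather than context.TODO()) and keeps the cancel function in a map keyed by the object's identifier, so starting a new scale loop cancels any outdated loop for the same ScaledObject. Below is a minimal, self-contained sketch of that pattern; the handler type, key, and loop body are simplified stand-ins, not KEDA's actual types.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// handler mimics the scaleHandler's per-object loop bookkeeping.
type handler struct {
	scaleLoopContexts *sync.Map // object key -> context.CancelFunc
}

func (h *handler) handle(ctx context.Context, key string) {
	loopCtx, cancel := context.WithCancel(ctx)

	// Cancel the outdated scale loop for the same key (if one exists),
	// then record the new loop's cancel function.
	if old, loaded := h.scaleLoopContexts.LoadOrStore(key, cancel); loaded {
		old.(context.CancelFunc)()
		h.scaleLoopContexts.Store(key, cancel)
	}

	go func() {
		for {
			select {
			case <-loopCtx.Done():
				fmt.Printf("scale loop for %q stopped\n", key)
				return
			case <-time.After(50 * time.Millisecond):
				// a real loop would poll the scalers here
			}
		}
	}()
}

func main() {
	h := &handler{scaleLoopContexts: &sync.Map{}}
	h.handle(context.Background(), "default/my-scaled-object")
	h.handle(context.Background(), "default/my-scaled-object") // cancels the first loop
	time.Sleep(200 * time.Millisecond)
}

Deriving loopCtx from the caller's ctx instead of context.TODO() is what lets the whole tree of scale loops shut down when the operator's root context is cancelled.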
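On the prometheus change in patch 5: an instant-query vector element carries a [timestamp, value] pair, so the added guard treats an empty value array as zero load while a one-element array (a sample with no reading) is an error. The following small runnable sketch applies the same length checks to the response shape used in the tests above; the struct and function names are illustrative, not the scaler's own types.

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// promResponse models just enough of the Prometheus query response
// for this example.
type promResponse struct {
	Data struct {
		Result []struct {
			Value []interface{} `json:"value"`
		} `json:"result"`
	} `json:"data"`
}

func extractValue(body string) (float64, error) {
	var r promResponse
	if err := json.Unmarshal([]byte(body), &r); err != nil {
		return -1, err
	}
	if len(r.Data.Result) == 0 {
		return 0, nil // no result elements: treat as no load
	}
	if len(r.Data.Result) > 1 {
		return -1, fmt.Errorf("query returned multiple elements")
	}
	v := r.Data.Result[0].Value
	if len(v) == 0 {
		return 0, nil // result element without values: also no load
	}
	if len(v) < 2 {
		return -1, fmt.Errorf("query didn't return enough values")
	}
	// v[1] is the sample value, encoded as a string.
	s, ok := v[1].(string)
	if !ok {
		return -1, fmt.Errorf("unexpected value type")
	}
	return strconv.ParseFloat(s, 64)
}

func main() {
	fmt.Println(extractValue(`{"data":{"result":[{"value": ["1", "2"]}]}}`)) // 2 <nil>
	fmt.Println(extractValue(`{"data":{"result":[{"value": ["1"]}]}}`))      // -1 and an error
}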