Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak by checking triggers uniqueness properly #1640

Merged
merged 11 commits into from
Mar 1, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Fix a memory leak in kafka client and close push scalers ([#1565](https://github.com/kedacore/keda/issues/1565))
- Add 'Metadata' header to AAD podIdentity request ([#1566](https://github.com/kedacore/keda/issues/1566))
- KEDA should make sure generate correct labels for HPA ([#1630](https://github.com/kedacore/keda/issues/1630))
- Fix memory leak by checking triggers uniqueness properly ([#1640](https://github.com/kedacore/keda/pull/1640))

### Breaking Changes

Expand Down
8 changes: 7 additions & 1 deletion controllers/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,17 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger,
if metricSpec.Resource != nil {
resourceMetricNames = append(resourceMetricNames, string(metricSpec.Resource.Name))
}

if metricSpec.External != nil {
externalMetricName := metricSpec.External.Metric.Name
if kedacontrollerutil.Contains(externalMetricNames, externalMetricName) {
return nil, fmt.Errorf("metricName %s defined multiple times in ScaledObject %s, please refer the documentation how to define metricName manually", externalMetricName, scaledObject.Name)
}

// add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name
externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name)
externalMetricNames = append(externalMetricNames, externalMetricName)
}
}
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
Expand Down
33 changes: 0 additions & 33 deletions controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(logger logr.Logger, scale
return "ScaledObject doesn't have correct scaleTargetRef specification", err
}

err = r.validateMetricNameUniqueness(logger, scaledObject)
if err != nil {
return "Error checking metric name uniqueness", err
}

// Create a new HPA or update existing one according to ScaledObject
newHPACreated, err := r.ensureHPAForScaledObjectExists(logger, scaledObject, &gvkr)
if err != nil {
Expand Down Expand Up @@ -251,34 +246,6 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(logger logr.Logger, sca
return r.Client.Update(context.TODO(), scaledObject)
}

func (r *ScaledObjectReconciler) validateMetricNameUniqueness(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
scalers, err := r.scaleHandler.GetScalers(scaledObject)
if err != nil {
logger.Error(err, "Unable to fetch scalers in metric name uniqueness check")
return err
}

observedMetricNames := make(map[string]struct{})
for _, scaler := range scalers {
for _, metric := range scaler.GetMetricSpecForScaling() {
// Only validate external metricNames
if metric.External == nil {
continue
}

metricName := metric.External.Metric.Name
if _, ok := observedMetricNames[metricName]; ok {
return fmt.Errorf("metricName %s defined multiple times in ScaledObject %s, please refer the documentation how to define metircName manually", metricName, scaledObject.Name)
}

observedMetricNames[metricName] = struct{}{}
}
}

logger.V(1).Info("All metric names are unique in ScaledObject", "value", scaledObject.Name)
return nil
}

// checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedautil.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
Expand Down
73 changes: 64 additions & 9 deletions controllers/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/golang/mock/gomock"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
"github.com/kedacore/keda/v2/pkg/scalers"
. "github.com/onsi/ginkgo"
Expand All @@ -31,6 +32,8 @@ var _ = Describe("ScaledObjectController", func() {
var (
metricNameTestReconciler ScaledObjectReconciler
mockScaleHandler *mock_scaling.MockScaleHandler
mockClient *mock_client.MockClient
mockStatusWriter *mock_client.MockStatusWriter
)

var triggerMeta []map[string]string = []map[string]string{
Expand All @@ -39,18 +42,25 @@ var _ = Describe("ScaledObjectController", func() {
}

BeforeEach(func() {
mockScaleHandler = mock_scaling.NewMockScaleHandler(gomock.NewController(GinkgoTestReporter{}))
ctrl := gomock.NewController(GinkgoTestReporter{})
mockScaleHandler = mock_scaling.NewMockScaleHandler(ctrl)
mockClient = mock_client.NewMockClient(ctrl)
mockStatusWriter = mock_client.NewMockStatusWriter(ctrl)

metricNameTestReconciler = ScaledObjectReconciler{
scaleHandler: mockScaleHandler,
Client: mockClient,
}
})

Context("With Unique Values", func() {
var uniqueNamedScaledObjectTrigger = &kedav1alpha1.ScaledObject{}
var uniquelyNamedScaledObject = &kedav1alpha1.ScaledObject{}

It("should pass metric name validation", func() {
// Generate test data
testScalers := make([]scalers.Scaler, 0)
expectedExternalMetricNames := make([]string, 0)

for i, tm := range triggerMeta {
config := &scalers.ScalerConfig{
Name: fmt.Sprintf("test.%d", i),
Expand All @@ -66,14 +76,33 @@ var _ = Describe("ScaledObjectController", func() {
}

testScalers = append(testScalers, s)
for _, metricSpec := range s.GetMetricSpecForScaling() {
if metricSpec.External != nil {
expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name)
}
}
}

mockScaleHandler.EXPECT().GetScalers(uniqueNamedScaledObjectTrigger).Return(testScalers, nil)
// Set up expectations
mockScaleHandler.EXPECT().GetScalers(uniquelyNamedScaledObject).Return(testScalers, nil)
mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())

// Call function to be tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, uniquelyNamedScaledObject)

Ω(metricNameTestReconciler.validateMetricNameUniqueness(testLogger, uniqueNamedScaledObjectTrigger)).Should(BeNil())
// Test that the status was updated with metric names
Ω(uniquelyNamedScaledObject.Status.ExternalMetricNames).Should(Equal(expectedExternalMetricNames))

// Test returned values
Ω(len(metricSpecs)).Should(Equal(len(testScalers)))
Ω(err).Should(BeNil())
})

It("should pass metric name validation with single value", func() {
// Generate test data
expectedExternalMetricNames := make([]string, 0)

config := &scalers.ScalerConfig{
Name: "test",
Namespace: "test",
Expand All @@ -86,17 +115,34 @@ var _ = Describe("ScaledObjectController", func() {
if err != nil {
Fail(err.Error())
}
for _, metricSpec := range s.GetMetricSpecForScaling() {
if metricSpec.External != nil {
expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name)
}
}

// Set up expectations
mockScaleHandler.EXPECT().GetScalers(uniquelyNamedScaledObject).Return([]scalers.Scaler{s}, nil)
mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())

// Call function to be tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, uniquelyNamedScaledObject)

mockScaleHandler.EXPECT().GetScalers(uniqueNamedScaledObjectTrigger).Return([]scalers.Scaler{s}, nil)
// Test that the status was updated
Ω(uniquelyNamedScaledObject.Status.ExternalMetricNames).Should(Equal(expectedExternalMetricNames))

Ω(metricNameTestReconciler.validateMetricNameUniqueness(testLogger, uniqueNamedScaledObjectTrigger)).Should(BeNil())
// Test returned values
Ω(len(metricSpecs)).Should(Equal(1))
Ω(err).Should(BeNil())
})
})

Context("With Duplicate Values", func() {
var duplicateNamedScaledObjectTrigger = &kedav1alpha1.ScaledObject{}
var duplicateNamedScaledObject = &kedav1alpha1.ScaledObject{}

It("should pass metric name validation", func() {
// Generate test data
testScalers := make([]scalers.Scaler, 0)
for i := 0; i < 4; i++ {
config := &scalers.ScalerConfig{
Expand All @@ -115,9 +161,18 @@ var _ = Describe("ScaledObjectController", func() {
testScalers = append(testScalers, s)
}

mockScaleHandler.EXPECT().GetScalers(duplicateNamedScaledObjectTrigger).Return(testScalers, nil)
// Set up expectations
mockScaleHandler.EXPECT().GetScalers(duplicateNamedScaledObject).Return(testScalers, nil)

// Call function tobe tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, duplicateNamedScaledObject)

// Test that the status was not updated
Ω(duplicateNamedScaledObject.Status.ExternalMetricNames).Should(BeNil())

Ω(metricNameTestReconciler.validateMetricNameUniqueness(testLogger, duplicateNamedScaledObjectTrigger)).ShouldNot(BeNil())
// Test returned values
Ω(metricSpecs).Should(BeNil())
Ω(err).ShouldNot(BeNil())
})
})
})
Expand Down