Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use metricName from GetMetricsSpec in ScaledJobs instead of queueLength #3046

Merged
merged 15 commits into from
May 26, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))

### Fixes

- **General:** Use metricName from GetMetricsSpec in ScaledJobs instead of `queueLength` ([#3032](https://github.com/kedacore/keda/issue/3032))

### Deprecations

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/external_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ func (s *externalScaler) GetMetrics(ctx context.Context, metricName string, metr
defer done()

// Remove the sX- prefix as the external scaler shouldn't have to know about it
metricName, err = RemoveIndexFromMetricName(s.metadata.scalerIndex, metricName)
metricNameWithoutIndex, err := RemoveIndexFromMetricName(s.metadata.scalerIndex, metricName)
if err != nil {
return metrics, err
}

request := &pb.GetMetricsRequest{
MetricName: metricName,
MetricName: metricNameWithoutIndex,
ScaledObjectRef: &s.scaledObjectRef,
}

Expand All @@ -216,7 +216,7 @@ func (s *externalScaler) GetMetrics(ctx context.Context, metricName string, metr

for _, metricResult := range response.MetricValues {
metric := external_metrics.ExternalMetricValue{
MetricName: metricResult.MetricName,
MetricName: metricName,
Value: *resource.NewQuantity(metricResult.MetricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav

targetAverageValue = getTargetAverageValue(metricSpecs)

metrics, err := s.Scaler.GetMetrics(ctx, "queueLength", nil)
metrics, err := s.Scaler.GetMetrics(ctx, metricSpecs[0].External.Metric.Name, nil)
if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err)
c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
Expand All @@ -281,12 +281,12 @@ func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
var metricValue int64

for _, m := range metrics {
if m.MetricName == "queueLength" {
if m.MetricName == metricSpecs[0].External.Metric.Name {
metricValue, _ = m.Value.AsInt64()
queueLength += metricValue
}
}
scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue)
scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue)

if isTriggerActive {
isActive = true
Expand Down
29 changes: 17 additions & 12 deletions pkg/scaling/cache/scalers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,45 @@ import (
func TestTargetAverageValue(t *testing.T) {
// count = 0
specs := []v2beta2.MetricSpec{}
metricName := "queueLength"
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
targetAverageValue := getTargetAverageValue(specs)
assert.Equal(t, int64(0), targetAverageValue)
// 1 1
specs = []v2beta2.MetricSpec{
createMetricSpec(1),
createMetricSpec(1),
createMetricSpec(1, metricName),
createMetricSpec(1, metricName),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(1), targetAverageValue)
// 5 5 3
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
createMetricSpec(5, metricName),
createMetricSpec(5, metricName),
createMetricSpec(3, metricName),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)

// 5 5 4
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
createMetricSpec(5, metricName),
createMetricSpec(5, metricName),
createMetricSpec(3, metricName),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)
}

func createMetricSpec(averageValue int64) v2beta2.MetricSpec {
func createMetricSpec(averageValue int64, metricName string) v2beta2.MetricSpec {
qty := resource.NewQuantity(averageValue, resource.DecimalSI)
return v2beta2.MetricSpec{
External: &v2beta2.ExternalMetricSource{
Target: v2beta2.MetricTarget{
AverageValue: qty,
},
Metric: v2beta2.MetricIdentifier{
Name: metricName,
},
},
}
}
Expand Down Expand Up @@ -232,9 +236,10 @@ func createScaledObject(maxReplicaCount int32, multipleScalersCalculation string
}

func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool) *mock_scalers.MockScaler {
metricName := "queueLength"
metricName := "s0-queueLength"
scaler := mock_scalers.NewMockScaler(ctrl)
metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(averageValue)}
metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(averageValue, metricName)}

metrics := []external_metrics.ExternalMetricValue{
{
MetricName: metricName,
Expand All @@ -243,7 +248,7 @@ func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64
}
scaler.EXPECT().IsActive(gomock.Any()).Return(isActive, nil)
scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs)
scaler.EXPECT().GetMetrics(gomock.Any(), metricName, nil).Return(metrics, nil)
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Any(), nil).Return(metrics, nil)
scaler.EXPECT().Close(gomock.Any())
return scaler
}
155 changes: 155 additions & 0 deletions tests/scalers/external-scaler.sj.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import * as sh from "shelljs"
import test from "ava"
import { createNamespace, createYamlFile, waitForDeploymentReplicaCount, waitForJobCount } from "./helpers"

const testName = "test-external-scaler-sj"
const testNamespace = `${testName}-ns`
const scalerName = `${testName}-scaler`
const serviceName = `${testName}-service`
const deploymentName = `${testName}-deployment`
const scaledJobName = `${testName}-scaled-job`

const maxReplicaCount = 3
const threshold = 10

test.before(async t => {
sh.config.silent = true

// Create Kubernetes Namespace
createNamespace(testNamespace)

// Create external scaler deployment
t.is(
sh.exec(`kubectl apply -f ${createYamlFile(scalerYaml)} -n ${testNamespace}`).code,
0,
"Createing a external scaler deployment should work"
)

// Create service
t.is(
sh.exec(`kubectl apply -f ${createYamlFile(serviceYaml)} -n ${testNamespace}`).code,
0,
"Createing a service should work"
)

// Create scaled job
t.is(
sh.exec(`kubectl apply -f ${createYamlFile(scaledJobYaml.replace("{{VALUE}}", "0"))} -n ${testNamespace}`).code,
0,
"Creating a scaled job should work"
)

t.true(await waitForJobCount(0, testNamespace, 60, 1000),`Replica count should be 0 after 1 minute`)
})

test.serial("Deployment should scale up to maxReplicaCount", async t => {
// Modify scaled job's metricValue to induce scaling
t.is(
sh.exec(`kubectl apply -f ${createYamlFile(scaledJobYaml.replace("{{VALUE}}", `${threshold * maxReplicaCount}`))} -n ${testNamespace}`).code,
0,
"Modifying scaled job should work"
)

t.true(await waitForJobCount(maxReplicaCount, testNamespace, 60, 1000),`Replica count should be ${maxReplicaCount} after 1 minute`)
})

test.serial("Deployment should scale back down to 0", async t => {
// Modify scaled job's metricValue to induce scaling
t.is(
sh.exec(`kubectl apply -f ${createYamlFile(scaledJobYaml.replace("{{VALUE}}", "0"))} -n ${testNamespace}`).code,
0,
"Modifying scaled job should work"
)

t.true(await waitForJobCount(0, testNamespace, 120, 1000),`Replica count should be 0 after 2 minute`)
})

test.after.always("Clean up E2E K8s objects", async t => {
const resources = [
`scaledjob.keda.sh/${scaledJobName}`,
`deployments.apps/${deploymentName}`,
`service/${serviceName}`,
`deployments.apps/${scalerName}`,
]

for (const resource of resources) {
sh.exec(`kubectl delete ${resource} -n ${testNamespace}`)
}

sh.exec(`kubectl delete ns ${testNamespace}`)
})

// YAML Definitions for Kubernetes resources
// External Scaler Deployment
const scalerYaml =
`
apiVersion: apps/v1
kind: Deployment
metadata:
name: ${scalerName}
namespace: ${testNamespace}
spec:
replicas: 1
selector:
matchLabels:
app: ${scalerName}
template:
metadata:
labels:
app: ${scalerName}
spec:
containers:
- name: scaler
image: ghcr.io/kedacore/tests-external-scaler-e2e:latest
imagePullPolicy: Always
ports:
- containerPort: 6000
`

const serviceYaml =
`
apiVersion: v1
kind: Service
metadata:
name: ${serviceName}
namespace: ${testNamespace}
spec:
ports:
- port: 6000
targetPort: 6000
selector:
app: ${scalerName}
`

// scaled job
const scaledJobYaml =
`
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: ${scaledJobName}
namespace: ${testNamespace}
spec:
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "30"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 5
maxReplicaCount: ${maxReplicaCount}
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 10
triggers:
- type: external
metadata:
scalerAddress: ${serviceName}.${testNamespace}:6000
metricThreshold: "${threshold}"
metricValue: "{{VALUE}}"
`
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as sh from "shelljs"
import test from "ava"
import { createNamespace, createYamlFile, waitForDeploymentReplicaCount } from "./helpers"

const testName = "test-external-scaler"
const testName = "test-external-scaler-so"
const testNamespace = `${testName}-ns`
const scalerName = `${testName}-scaler`
const serviceName = `${testName}-service`
Expand Down Expand Up @@ -67,7 +67,7 @@ test.serial("Deployment should scale up to minReplicaCount", async t => {
test.serial("Deployment should scale up to maxReplicaCount", async t => {
// Modify scaled object's metricValue to induce scaling
t.is(
sh.exec(`kubectl apply -f ${createYamlFile(scaledObjectYaml.replace("{{VALUE}}", `${threshold * 2}`))} -n ${testNamespace}`).code,
sh.exec(`kubectl apply -f ${createYamlFile(scaledObjectYaml.replace("{{VALUE}}", `${threshold * maxReplicaCount}`))} -n ${testNamespace}`).code,
0,
"Modifying scaled object should work"
)
Expand Down
20 changes: 20 additions & 0 deletions tests/scalers/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ export async function waitForDeploymentReplicaCount(target: number, name: string
return false
}

export async function waitForJobCount(target: number, namespace: string, iterations = 10, interval = 3000): Promise<boolean> {
for (let i = 0; i < iterations; i++) {
let jobCountStr = sh.exec(`kubectl get job --namespace ${namespace} | wc -l`).stdout.replace(/[\r\n]/g,"")
try {
let jobCount = parseInt(jobCountStr, 10)
// This method counts also the header line in the output, so we have to remove 1 if the jobCount is > 1
if (jobCount > 0) {
jobCount--
}

if (jobCount === target) {
return true
}
} catch { }

await sleep(interval)
}
return false
}

export async function createNamespace(namespace: string) {
const namespaceFile = tmp.fileSync()
fs.writeFileSync(namespaceFile.name, namespaceTemplate.replace('{{NAMESPACE}}', namespace))
Expand Down
Loading