diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b3992f7566..8171517c7c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ - IBM MQ scaler password handling fix ([#1939](https://github.com/kedacore/keda/pull/1939)) - Metrics APIServer: Add ratelimiting parameters to override client ([#1944](https://github.com/kedacore/keda/pull/1944)) - Optimize KafkaScaler by fetching all topic offsets using a single HTTP request ([#1956](https://github.com/kedacore/keda/pull/1956)) +- Adjusts InfluxDB scaler to support queries that return integers in addition to those that return floats ([#1977](https://github.com/kedacore/keda/pull/1977)) ### Breaking Changes diff --git a/pkg/scalers/influxdb_scaler.go b/pkg/scalers/influxdb_scaler.go index fdaf2d77a6e..befe8d043ab 100644 --- a/pkg/scalers/influxdb_scaler.go +++ b/pkg/scalers/influxdb_scaler.go @@ -170,12 +170,14 @@ func queryInfluxDB(queryAPI api.QueryAPI, query string) (float64, error) { return 0, fmt.Errorf("no results found from query") } - val, ok := result.Record().Value().(float64) - if !ok { - return 0, fmt.Errorf("value could not be parsed into a float") + switch valRaw := result.Record().Value().(type) { + case float64: + return valRaw, nil + case int64: + return float64(valRaw), nil + default: + return 0, fmt.Errorf("value of type %T could not be converted into a float", valRaw) } - - return val, nil } // GetMetrics connects to influxdb via the client and returns a value based on the query diff --git a/tests/scalers/influxdb.test.ts b/tests/scalers/influxdb.test.ts index 3c7ab6c4cc5..b918c7ba851 100644 --- a/tests/scalers/influxdb.test.ts +++ b/tests/scalers/influxdb.test.ts @@ -69,7 +69,7 @@ test.before((t) => { t.is('true', influxdbStatus, 'Influxdb is not in a ready state') }) -test.serial('Should start off deployment with 0 replicas and scale to 2 replicas when scaled object is applied', (t) => { +test.serial('Should start off deployment with 0 replicas and scale to 2 replicas when scaled object with float query is applied', (t) => { const { authToken, orgName } = runWriteJob(t) const basicDeploymentTmpFile = tmp.fileSync() fs.writeFileSync(basicDeploymentTmpFile.name, basicDeploymentYaml) @@ -80,7 +80,7 @@ test.serial('Should start off deployment with 0 replicas and scale to 2 replicas t.is(numReplicasBefore, '0', 'Number of replicas should be 0 to start with') const scaledObjectTmpFile = tmp.fileSync() - fs.writeFileSync(scaledObjectTmpFile.name, scaledObjectYaml.replace('{{INFLUXDB_AUTH_TOKEN}}', authToken).replace('{{INFLUXDB_ORG_NAME}}', orgName)) + fs.writeFileSync(scaledObjectTmpFile.name, scaledObjectYamlFloat.replace('{{INFLUXDB_AUTH_TOKEN}}', authToken).replace('{{INFLUXDB_ORG_NAME}}', orgName)) t.is(0, sh.exec(`kubectl apply --namespace ${influxdbNamespaceName} -f ${scaledObjectTmpFile.name}`).code) @@ -98,6 +98,35 @@ test.serial('Should start off deployment with 0 replicas and scale to 2 replicas t.is(numReplicasAfter, '2', 'Number of replicas should have scaled to 2') }) +test.serial('Should start off deployment with 0 replicas and scale to 2 replicas when scaled object with int query is applied', (t) => { + const { authToken, orgName } = runWriteJob(t) + const basicDeploymentTmpFile = tmp.fileSync() + fs.writeFileSync(basicDeploymentTmpFile.name, basicDeploymentYaml) + + t.is(0, sh.exec(`kubectl apply --namespace ${influxdbNamespaceName} -f ${basicDeploymentTmpFile.name}`).code) + + const numReplicasBefore = sh.exec(`kubectl get deployment --namespace ${influxdbNamespaceName} ${nginxDeploymentName} -o jsonpath='{.spec.replicas}'`).stdout + t.is(numReplicasBefore, '0', 'Number of replicas should be 0 to start with') + + const scaledObjectTmpFile = tmp.fileSync() + fs.writeFileSync(scaledObjectTmpFile.name, scaledObjectYamlInt.replace('{{INFLUXDB_AUTH_TOKEN}}', authToken).replace('{{INFLUXDB_ORG_NAME}}', orgName)) + + t.is(0, sh.exec(`kubectl apply --namespace ${influxdbNamespaceName} -f ${scaledObjectTmpFile.name}`).code) + + // polling/waiting for deployment to scale to desired amount of replicas + let numReplicasAfter = '1' + for (let i = 0; i < 15; i++){ + numReplicasAfter = sh.exec(`kubectl get deployment --namespace ${influxdbNamespaceName} ${nginxDeploymentName} -o jsonpath='{.spec.replicas}'`).stdout + if (numReplicasAfter !== '2') { + sh.exec('sleep 2s') + } else { + break + } + } + + t.is(numReplicasAfter, '2', 'Number of replicas should have scaled to 2') +}) + test.after.always((t) => { t.is(0, sh.exec(`kubectl delete namespace ${influxdbNamespaceName}`).code, 'Should delete influxdb namespace') }) @@ -162,7 +191,31 @@ spec: type: ClusterIP ` -const scaledObjectYaml = ` +const scaledObjectYamlFloat = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: influxdb-scaler + namespace: influxdb +spec: + scaleTargetRef: + name: nginx-deployment + maxReplicaCount: 2 + triggers: + - type: influxdb + metadata: + authToken: {{INFLUXDB_AUTH_TOKEN}} + organizationName: {{INFLUXDB_ORG_NAME}} + serverURL: http://influxdb.influxdb.svc:8086 + thresholdValue: "3" + query: | + from(bucket:"bucket") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "stat") + |> map(fn: (r) => ({r with _value: float(v: r._value)})) +` + +const scaledObjectYamlInt = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -183,6 +236,7 @@ spec: from(bucket:"bucket") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) ` const influxdbWriteJobYaml = `