Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: fix the wrong value of scaler active and the failure of paused value in Opentelemetry #5704

Merged
merged 6 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Fix CVE-2024-28180 in github.com/go-jose/go-jose/v3 ([#5617](https://github.com/kedacore/keda/pull/5617))
- **General**: Fix wrong scaler active value and paused value that are pushed to OpenTelemetry ([#5705](https://github.com/kedacore/keda/issues/5705))
- **General**: Log field `ScaledJob` no longer have conflicting types ([#5592](https://github.com/kedacore/keda/pull/5592))
- **General**: Prometheus metrics shows errors correctly ([#5597](https://github.com/kedacore/keda/issues/5597)|[#5663](https://github.com/kedacore/keda/issues/5663))
- **General**: Validate empty array value of triggers in ScaledObject/ScaledJob creation ([#5520](https://github.com/kedacore/keda/issues/5520))
Expand Down
140 changes: 82 additions & 58 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ var (
otTriggerRegisteredTotalsCounter api.Int64UpDownCounter
otCrdRegisteredTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelScalerMetricsLatencyValDeprecated OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyValDeprecated OtelMetricFloat64Val
otelScalerMetricVals []OtelMetricFloat64Val
otelScalerMetricsLatencyVals []OtelMetricFloat64Val
otelScalerMetricsLatencyValDeprecated []OtelMetricFloat64Val
otelInternalLoopLatencyVals []OtelMetricFloat64Val
otelInternalLoopLatencyValDeprecated []OtelMetricFloat64Val
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
otelBuildInfoVal OtelMetricInt64Val

otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVal OtelMetricFloat64Val
otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVals []OtelMetricFloat64Val

otelScalerActiveVal OtelMetricFloat64Val
otelScalerActiveVals []OtelMetricFloat64Val
otelScalerPauseVals []OtelMetricFloat64Val
)

type OtelMetrics struct {
Expand Down Expand Up @@ -196,6 +197,15 @@ func initMeters() {
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaled.object.paused",
api.WithDescription("Indicates whether a ScaledObject is paused"),
api.WithFloat64Callback(PausedStatusCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
Expand All @@ -220,55 +230,62 @@ func (o *OtelMetrics) RecordBuildInfo() {
}

func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricVal.measurementOption != nil {
obsrv.Observe(otelScalerMetricVal.val, otelScalerMetricVal.measurementOption)
for _, v := range otelScalerMetricVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerMetricVal = OtelMetricFloat64Val{}
otelScalerMetricVals = []OtelMetricFloat64Val{}
return nil
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricVal.val = value
otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetric := OtelMetricFloat64Val{}
otelScalerMetric.val = value
otelScalerMetric.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricVals = append(otelScalerMetricVals, otelScalerMetric)
}

func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricsLatencyVal.measurementOption != nil {
obsrv.Observe(otelScalerMetricsLatencyVal.val, otelScalerMetricsLatencyVal.measurementOption)
for _, v := range otelScalerMetricsLatencyVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerMetricsLatencyVal = OtelMetricFloat64Val{}
otelScalerMetricsLatencyVals = []OtelMetricFloat64Val{}
return nil
}

func ScalerMetricsLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricsLatencyValDeprecated.measurementOption != nil {
obsrv.Observe(otelScalerMetricsLatencyValDeprecated.val, otelScalerMetricsLatencyValDeprecated.measurementOption)
for _, v := range otelScalerMetricsLatencyValDeprecated {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerMetricsLatencyValDeprecated = OtelMetricFloat64Val{}
otelScalerMetricsLatencyValDeprecated = []OtelMetricFloat64Val{}
return nil
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
otelScalerMetricsLatencyVal.val = value.Seconds()
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatencyValDeprecated.val = float64(value.Milliseconds())
otelScalerMetricsLatencyValDeprecated.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatency := OtelMetricFloat64Val{}
otelScalerMetricsLatency.val = value.Seconds()
otelScalerMetricsLatency.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatencyVals = append(otelScalerMetricsLatencyVals, otelScalerMetricsLatency)

otelScalerMetricsLatencyValD := OtelMetricFloat64Val{}
otelScalerMetricsLatencyValD.val = float64(value.Milliseconds())
otelScalerMetricsLatencyValD.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatencyValDeprecated = append(otelScalerMetricsLatencyValDeprecated, otelScalerMetricsLatencyValD)
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelInternalLoopLatencyVal.measurementOption != nil {
obsrv.Observe(otelInternalLoopLatencyVal.val, otelInternalLoopLatencyVal.measurementOption)
for _, v := range otelInternalLoopLatencyVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelInternalLoopLatencyVal = OtelMetricFloat64Val{}
otelInternalLoopLatencyVals = []OtelMetricFloat64Val{}
return nil
}

func ScalableObjectLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
if otelInternalLoopLatencyValDeprecated.measurementOption != nil {
obsrv.Observe(otelInternalLoopLatencyValDeprecated.val, otelInternalLoopLatencyValDeprecated.measurementOption)
for _, v := range otelInternalLoopLatencyValDeprecated {
obsrv.Observe(v.val, v.measurementOption)
}
otelInternalLoopLatencyValDeprecated = OtelMetricFloat64Val{}
otelInternalLoopLatencyValDeprecated = []OtelMetricFloat64Val{}
return nil
}

Expand All @@ -284,29 +301,43 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
attribute.Key("type").String(resourceType),
attribute.Key("name").String(name))

otelInternalLoopLatencyVal.val = value.Seconds()
otelInternalLoopLatencyVal.measurementOption = opt
otelInternalLoopLatencyValDeprecated.val = float64(value.Milliseconds())
otelInternalLoopLatencyValDeprecated.measurementOption = opt
otelInternalLoopLatency := OtelMetricFloat64Val{}
otelInternalLoopLatency.val = value.Seconds()
otelInternalLoopLatency.measurementOption = opt
otelInternalLoopLatencyVals = append(otelInternalLoopLatencyVals, otelInternalLoopLatency)

otelInternalLoopLatencyD := OtelMetricFloat64Val{}
otelInternalLoopLatencyD.val = float64(value.Milliseconds())
otelInternalLoopLatencyD.measurementOption = opt
otelInternalLoopLatencyValDeprecated = append(otelInternalLoopLatencyValDeprecated, otelInternalLoopLatencyD)
}

func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerActiveVal.measurementOption != nil {
obsrv.Observe(otelScalerActiveVal.val, otelScalerActiveVal.measurementOption)
for _, v := range otelScalerActiveVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerActiveVal = OtelMetricFloat64Val{}
otelScalerActiveVals = []OtelMetricFloat64Val{}
return nil
}

// RecordScalerActive create a measurement of the activity of the scaler
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := -1
activeVal := 0
if active {
activeVal = 1
}
otelScalerActive := OtelMetricFloat64Val{}
otelScalerActive.val = float64(activeVal)
otelScalerActive.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerActiveVals = append(otelScalerActiveVals, otelScalerActive)
}

otelScalerActiveVal.val = float64(activeVal)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
func PausedStatusCallback(_ context.Context, obsrv api.Float64Observer) error {
for _, v := range otelScalerPauseVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerPauseVals = []OtelMetricFloat64Val{}
return nil
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand All @@ -318,21 +349,12 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st

opt := api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledObject").String(scaledObject),
)
attribute.Key("scaledObject").String(scaledObject))

cback := func(_ context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), opt)
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.scaled.object.paused",
api.WithDescription("Indicates whether a ScaledObject is paused"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaled object paused metric", "namespace", namespace, "scaledObject", scaledObject)
}
otelScalerPause := OtelMetricFloat64Val{}
otelScalerPause.val = float64(activeVal)
otelScalerPause.measurementOption = opt
otelScalerPauseVals = append(otelScalerPauseVals, otelScalerPause)
}

// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
Expand Down Expand Up @@ -448,10 +470,10 @@ func (o *OtelMetrics) RecordCloudEventEmittedError(namespace string, cloudevents
}

func CloudeventQueueStatusCallback(_ context.Context, obsrv api.Float64Observer) error {
if otCloudEventQueueStatusVal.measurementOption != nil {
obsrv.Observe(otCloudEventQueueStatusVal.val, otCloudEventQueueStatusVal.measurementOption)
for _, v := range otCloudEventQueueStatusVals {
obsrv.Observe(v.val, v.measurementOption)
}
otCloudEventQueueStatusVal = OtelMetricFloat64Val{}
otCloudEventQueueStatusVals = []OtelMetricFloat64Val{}
return nil
}

Expand All @@ -461,6 +483,8 @@ func (o *OtelMetrics) RecordCloudEventQueueStatus(namespace string, value int) {
attribute.Key("namespace").String(namespace),
)

otCloudEventQueueStatusVal.val = float64(value)
otCloudEventQueueStatusVal.measurementOption = opt
otCloudEventQueueStatus := OtelMetricFloat64Val{}
otCloudEventQueueStatus.val = float64(value)
otCloudEventQueueStatus.measurementOption = opt
otCloudEventQueueStatusVals = append(otCloudEventQueueStatusVals, otCloudEventQueueStatus)
}
53 changes: 53 additions & 0 deletions pkg/metricscollector/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,56 @@ func TestLoopLatency(t *testing.T) {
data = latencySeconds.Data.(metricdata.Gauge[float64]).DataPoints[0]
assert.Equal(t, data.Value, float64(0.5))
}

func TestContinuousMetrics(t *testing.T) {
testOtel.RecordScalerActive("testnamespace", "testresource", "testscaler", 0, "testmetric", true, true)
testOtel.RecordScalerActive("testnamespace2", "testresource2", "testscaler2", 0, "testmetric", false, false)
got := metricdata.ResourceMetrics{}
err := testReader.Collect(context.Background(), &got)

assert.Nil(t, err)
scopeMetrics := got.ScopeMetrics[0]
assert.NotEqual(t, len(scopeMetrics.Metrics), 0)
activeMetric := retrieveMetric(scopeMetrics.Metrics, "keda.scaler.active")

assert.NotNil(t, buildInfo)

dataPoints := activeMetric.Data.(metricdata.Gauge[float64]).DataPoints
assert.Len(t, dataPoints, 2)

var scaledObjectMetric metricdata.DataPoint[float64]
for _, v := range dataPoints {
attribute, _ := v.Attributes.Value("namespace")
if attribute.AsString() == "testnamespace" {
scaledObjectMetric = v
}
}

assert.NotEqual(t, scaledObjectMetric, metricdata.DataPoint[float64]{})
attribute, _ := scaledObjectMetric.Attributes.Value("scaledObject")
assert.Equal(t, attribute.AsString(), "testresource")
attribute, _ = scaledObjectMetric.Attributes.Value("scaler")
assert.Equal(t, attribute.AsString(), "testscaler")
attribute, _ = scaledObjectMetric.Attributes.Value("metric")
assert.Equal(t, attribute.AsString(), "testmetric")
assert.Equal(t, scaledObjectMetric.Value, 1.0)

var scaledJobMetric metricdata.DataPoint[float64]
for _, v := range dataPoints {
attribute, _ := v.Attributes.Value("namespace")
if attribute.AsString() == "testnamespace2" {
scaledJobMetric = v
}
}

assert.NotEqual(t, scaledJobMetric, metricdata.DataPoint[float64]{})
attribute, _ = scaledJobMetric.Attributes.Value("namespace")
assert.Equal(t, attribute.AsString(), "testnamespace2")
attribute, _ = scaledJobMetric.Attributes.Value("scaledJob")
assert.Equal(t, attribute.AsString(), "testresource2")
attribute, _ = scaledJobMetric.Attributes.Value("scaler")
assert.Equal(t, attribute.AsString(), "testscaler2")
attribute, _ = scaledJobMetric.Attributes.Value("metric")
assert.Equal(t, attribute.AsString(), "testmetric")
assert.Equal(t, scaledJobMetric.Value, 0.0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func TestOpenTelemetryMetrics(t *testing.T) {

testScalerMetricValue(t)
testScalerMetricLatency(t)
testScalerActiveMetric(t)
testScalerActiveMetric(t, kc)
testScaledObjectErrors(t, data)
testScaledJobErrors(t, data)
testScalerErrors(t, data)
Expand Down Expand Up @@ -748,28 +748,20 @@ func testScalableObjectMetrics(t *testing.T) {
}
}

func testScalerActiveMetric(t *testing.T) {
func testScalerActiveMetric(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing scaler active metric ---")

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaler_active", true)

val, ok := family["keda_scaler_active"]
assert.True(t, ok, "keda_scaler_active not available")
if ok {
var found bool
metrics := val.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
found = true
}
}
}
assert.Equal(t, true, found)
}
t.Log("--- testing scaler active metric scaled down ---")
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 0, testNamespace)
WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 2)
time.Sleep(10 * time.Second)
families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))

assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaler_active", false)
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 4, testNamespace)
}

func testScaledObjectPausedMetric(t *testing.T, data templateData) {
Expand All @@ -781,15 +773,15 @@ func testScaledObjectPausedMetric(t *testing.T, data templateData) {
time.Sleep(20 * time.Second)
// Check that the paused metric is now true
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, true)
assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaled_object_paused", true)

// Unpause the ScaledObject
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)

time.Sleep(20 * time.Second)
// Check that the paused metric is back to false
families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, false)
assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaled_object_paused", false)
}

func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) {
Expand Down Expand Up @@ -970,16 +962,17 @@ func checkCRTotalValues(t *testing.T, families map[string]*prommodel.MetricFamil
}
}

func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) {
family, ok := families["keda_scaled_object_paused"]
assert.True(t, ok, "keda_scaled_object_paused not available")
func assertScaledObjectFlagMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, metricName string, expected bool) {
family, ok := families[metricName]
assert.True(t, ok, "%s not available", metricName)
if !ok {
return
}

metricValue := 0.0
metrics := family.GetMetric()
for _, metric := range metrics {
t.Log("scaledobject flag metric detail info ---", "metric", metric, "scaledObjectName", scaledObjectName, "metricName", metricName)
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
Expand Down
Loading