diff --git a/.chloggen/switchk8shpa.yaml b/.chloggen/switchk8shpa.yaml new file mode 100755 index 000000000000..0ed26e6778ff --- /dev/null +++ b/.chloggen/switchk8shpa.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sclusterreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Switch k8s.hpa metrics to use pdata. + +# One or more tracking issues related to the change +issues: [18250] diff --git a/receiver/k8sclusterreceiver/go.mod b/receiver/k8sclusterreceiver/go.mod index 1ee119c5398c..031d7c30058e 100644 --- a/receiver/k8sclusterreceiver/go.mod +++ b/receiver/k8sclusterreceiver/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/census-instrumentation/opencensus-proto v0.4.1 + github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 github.com/iancoleman/strcase v0.2.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0 @@ -57,7 +58,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect - github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/imdario/mergo v0.3.12 // indirect diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index 026edfa74673..73c603de3469 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -10,6 +10,7 @@ import ( agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" quotav1 "github.com/openshift/api/quota/v1" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" @@ -48,7 +49,7 @@ import ( // an interface to interact with refactored code from SignalFx Agent which is // confined to the collection package. type DataCollector struct { - logger *zap.Logger + settings receiver.CreateSettings metricsStore *metricsStore metadataStore *metadata.Store nodeConditionsToReport []string @@ -56,9 +57,9 @@ type DataCollector struct { } // NewDataCollector returns a DataCollector. -func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { +func NewDataCollector(set receiver.CreateSettings, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { return &DataCollector{ - logger: logger, + settings: set, metricsStore: &metricsStore{ metricsCache: make(map[types.UID]pmetric.Metrics), }, @@ -75,7 +76,7 @@ func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store c func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) { if err := dc.metricsStore.remove(obj.(runtime.Object)); err != nil { - dc.logger.Error( + dc.settings.TelemetrySettings.Logger.Error( "failed to remove from metric cache", zap.String("obj", reflect.TypeOf(obj).String()), zap.Error(err), @@ -85,7 +86,7 @@ func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) { func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) { if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil { - dc.logger.Error( + dc.settings.TelemetrySettings.Logger.Error( "failed to update metric cache", zap.String("obj", reflect.TypeOf(obj).String()), zap.Error(err), @@ -103,9 +104,9 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) { switch o := obj.(type) { case *corev1.Pod: - md = ocsToMetrics(pod.GetMetrics(o, dc.logger)) + md = ocsToMetrics(pod.GetMetrics(o, dc.settings.TelemetrySettings.Logger)) case *corev1.Node: - md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.logger)) + md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.settings.TelemetrySettings.Logger)) case *corev1.Namespace: md = ocsToMetrics(namespace.GetMetrics(o)) case *corev1.ReplicationController: @@ -127,9 +128,9 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) { case *batchv1beta1.CronJob: md = ocsToMetrics(cronjob.GetMetricsBeta(o)) case *autoscalingv2.HorizontalPodAutoscaler: - md = ocsToMetrics(hpa.GetMetrics(o)) + md = hpa.GetMetrics(dc.settings, o) case *autoscalingv2beta2.HorizontalPodAutoscaler: - md = ocsToMetrics(hpa.GetMetricsBeta(o)) + md = hpa.GetMetricsBeta(dc.settings, o) case *quotav1.ClusterResourceQuota: md = ocsToMetrics(clusterresourcequota.GetMetrics(o)) default: @@ -148,7 +149,7 @@ func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmet km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{} switch o := obj.(type) { case *corev1.Pod: - km = pod.GetMetadata(o, dc.metadataStore, dc.logger) + km = pod.GetMetadata(o, dc.metadataStore, dc.settings.TelemetrySettings.Logger) case *corev1.Node: km = node.GetMetadata(o) case *corev1.ReplicationController: diff --git a/receiver/k8sclusterreceiver/internal/collection/collector_test.go b/receiver/k8sclusterreceiver/internal/collection/collector_test.go index 38860ff34dcb..78ed31f86fda 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector_test.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -263,10 +264,11 @@ func TestDataCollectorSyncMetadata(t *testing.T) { for _, tt := range tests { observedLogger, _ := observer.New(zapcore.WarnLevel) - logger := zap.New(observedLogger) + set := receivertest.NewNopCreateSettings() + set.TelemetrySettings.Logger = zap.New(observedLogger) t.Run(tt.name, func(t *testing.T) { dc := &DataCollector{ - logger: logger, + settings: set, metadataStore: tt.metadataStore, nodeConditionsToReport: []string{}, } diff --git a/receiver/k8sclusterreceiver/internal/constants/constants.go b/receiver/k8sclusterreceiver/internal/constants/constants.go index d4d461029432..191a51d365ca 100644 --- a/receiver/k8sclusterreceiver/internal/constants/constants.go +++ b/receiver/k8sclusterreceiver/internal/constants/constants.go @@ -12,13 +12,11 @@ const ( // Resource labels keys for UID. K8sKeyNamespaceUID = "k8s.namespace.uid" K8sKeyReplicationControllerUID = "k8s.replicationcontroller.uid" - K8sKeyHPAUID = "k8s.hpa.uid" K8sKeyResourceQuotaUID = "k8s.resourcequota.uid" K8sKeyClusterResourceQuotaUID = "openshift.clusterquota.uid" // Resource labels keys for Name. K8sKeyReplicationControllerName = "k8s.replicationcontroller.name" - K8sKeyHPAName = "k8s.hpa.name" K8sKeyResourceQuotaName = "k8s.resourcequota.name" K8sKeyClusterResourceQuotaName = "openshift.clusterquota.name" diff --git a/receiver/k8sclusterreceiver/internal/hpa/doc.go b/receiver/k8sclusterreceiver/internal/hpa/doc.go new file mode 100644 index 000000000000..0d55921ba0cf --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/doc.go @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows +// +build !windows + +//go:generate mdatagen metadata.yaml + +package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" diff --git a/receiver/k8sclusterreceiver/internal/hpa/documentation.md b/receiver/k8sclusterreceiver/internal/hpa/documentation.md new file mode 100644 index 000000000000..5a7f46104a6c --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/documentation.md @@ -0,0 +1,53 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# k8s/hpa + +## Default Metrics + +The following metrics are emitted by default. Each of them can be disabled by applying the following configuration: + +```yaml +metrics: + : + enabled: false +``` + +### k8s.hpa.current_replicas + +Current number of pod replicas managed by this autoscaler. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.hpa.desired_replicas + +Desired number of pod replicas managed by this autoscaler. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.hpa.max_replicas + +Maximum number of replicas to which the autoscaler can scale up. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### k8s.hpa.min_replicas + +Minimum number of replicas to which the autoscaler can scale up. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +## Resource Attributes + +| Name | Description | Values | Enabled | +| ---- | ----------- | ------ | ------- | +| k8s.hpa.name | The k8s hpa name. | Any Str | true | +| k8s.hpa.uid | The k8s hpa uid. | Any Str | true | +| k8s.namespace.name | The name of the namespace that the pod is running in. | Any Str | true | diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa.go b/receiver/k8sclusterreceiver/internal/hpa/hpa.go index 9e7b13f39ea4..433b52c7b634 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa.go +++ b/receiver/k8sclusterreceiver/internal/hpa/hpa.go @@ -4,129 +4,37 @@ package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" import ( - agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" autoscalingv2 "k8s.io/api/autoscaling/v2" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" + imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) -var hpaMaxReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.max_replicas", - Description: "Maximum number of replicas to which the autoscaler can scale up", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -var hpaMinReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.min_replicas", - Description: "Minimum number of replicas to which the autoscaler can scale down", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -var hpaCurrentReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.current_replicas", - Description: "Current number of pod replicas managed by this autoscaler", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -var hpaDesiredReplicasMetric = &metricspb.MetricDescriptor{ - Name: "k8s.hpa.desired_replicas", - Description: "Desired number of pod replicas managed by this autoscaler", - Unit: "1", - Type: metricspb.MetricDescriptor_GAUGE_INT64, -} - -func GetMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) []*agentmetricspb.ExportMetricsServiceRequest { - metrics := []*metricspb.Metric{ - { - MetricDescriptor: hpaMaxReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Spec.MaxReplicas)), - }, - }, - { - MetricDescriptor: hpaMinReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(*hpa.Spec.MinReplicas)), - }, - }, - { - MetricDescriptor: hpaCurrentReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Status.CurrentReplicas)), - }, - }, - { - MetricDescriptor: hpaDesiredReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Status.DesiredReplicas)), - }, - }, - } - - return []*agentmetricspb.ExportMetricsServiceRequest{ - { - Resource: getResourceForHPA(&hpa.ObjectMeta), - Metrics: metrics, - }, - } -} - -func GetMetricsBeta(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) []*agentmetricspb.ExportMetricsServiceRequest { - metrics := []*metricspb.Metric{ - { - MetricDescriptor: hpaMaxReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Spec.MaxReplicas)), - }, - }, - { - MetricDescriptor: hpaMinReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(*hpa.Spec.MinReplicas)), - }, - }, - { - MetricDescriptor: hpaCurrentReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Status.CurrentReplicas)), - }, - }, - { - MetricDescriptor: hpaDesiredReplicasMetric, - Timeseries: []*metricspb.TimeSeries{ - utils.GetInt64TimeSeries(int64(hpa.Status.DesiredReplicas)), - }, - }, - } - - return []*agentmetricspb.ExportMetricsServiceRequest{ - { - Resource: getResourceForHPA(&hpa.ObjectMeta), - Metrics: metrics, - }, - } -} - -func getResourceForHPA(om *v1.ObjectMeta) *resourcepb.Resource { - return &resourcepb.Resource{ - Type: constants.K8sType, - Labels: map[string]string{ - constants.K8sKeyHPAUID: string(om.UID), - constants.K8sKeyHPAName: om.Name, - conventions.AttributeK8SNamespaceName: om.Namespace, - }, - } +func GetMetricsBeta(set receiver.CreateSettings, hpa *autoscalingv2beta2.HorizontalPodAutoscaler) pmetric.Metrics { + mb := imetadata.NewMetricsBuilder(imetadata.DefaultMetricsBuilderConfig(), set) + ts := pcommon.NewTimestampFromTime(time.Now()) + mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas)) + mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas)) + mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas)) + mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas)) + return mb.Emit(imetadata.WithK8sHpaUID(string(hpa.UID)), imetadata.WithK8sHpaName(hpa.Name), imetadata.WithK8sNamespaceName(hpa.Namespace)) +} + +func GetMetrics(set receiver.CreateSettings, hpa *autoscalingv2.HorizontalPodAutoscaler) pmetric.Metrics { + mb := imetadata.NewMetricsBuilder(imetadata.DefaultMetricsBuilderConfig(), set) + ts := pcommon.NewTimestampFromTime(time.Now()) + mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas)) + mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas)) + mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas)) + mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas)) + return mb.Emit(imetadata.WithK8sHpaUID(string(hpa.UID)), imetadata.WithK8sHpaName(hpa.Name), imetadata.WithK8sNamespaceName(hpa.Namespace)) } func GetMetadata(hpa *autoscalingv2.HorizontalPodAutoscaler) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go index cf5a50f2ee63..bfd50597829b 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go +++ b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go @@ -6,39 +6,37 @@ package hpa import ( "testing" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestHPAMetrics(t *testing.T) { hpa := testutils.NewHPA("1") - actualResourceMetrics := GetMetrics(hpa) + md := GetMetrics(receivertest.NewNopCreateSettings(), hpa) - require.Equal(t, 1, len(actualResourceMetrics)) - require.Equal(t, 4, len(actualResourceMetrics[0].Metrics)) - - rm := actualResourceMetrics[0] - testutils.AssertResource(t, rm.Resource, constants.K8sType, - map[string]string{ + require.Equal(t, 1, md.ResourceMetrics().Len()) + rm := md.ResourceMetrics().At(0) + assert.Equal(t, + map[string]any{ "k8s.hpa.uid": "test-hpa-1-uid", "k8s.hpa.name": "test-hpa-1", "k8s.namespace.name": "test-namespace", }, - ) - - testutils.AssertMetricsInt(t, rm.Metrics[0], "k8s.hpa.max_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 10) - - testutils.AssertMetricsInt(t, rm.Metrics[1], "k8s.hpa.min_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 2) - - testutils.AssertMetricsInt(t, rm.Metrics[2], "k8s.hpa.current_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 5) - - testutils.AssertMetricsInt(t, rm.Metrics[3], "k8s.hpa.desired_replicas", - metricspb.MetricDescriptor_GAUGE_INT64, 7) + rm.Resource().Attributes().AsRaw()) + + require.Equal(t, 1, rm.ScopeMetrics().Len()) + sms := rm.ScopeMetrics().At(0) + require.Equal(t, 4, sms.Metrics().Len()) + sms.Metrics().Sort(func(a, b pmetric.Metric) bool { + return a.Name() < b.Name() + }) + testutils.AssertMetricInt(t, sms.Metrics().At(0), "k8s.hpa.current_replicas", pmetric.MetricTypeGauge, 5) + testutils.AssertMetricInt(t, sms.Metrics().At(1), "k8s.hpa.desired_replicas", pmetric.MetricTypeGauge, 7) + testutils.AssertMetricInt(t, sms.Metrics().At(2), "k8s.hpa.max_replicas", pmetric.MetricTypeGauge, 10) + testutils.AssertMetricInt(t, sms.Metrics().At(3), "k8s.hpa.min_replicas", pmetric.MetricTypeGauge, 2) } diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_config.go b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_config.go new file mode 100644 index 000000000000..f6ae9117cdef --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_config.go @@ -0,0 +1,88 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import "go.opentelemetry.io/collector/confmap" + +// MetricConfig provides common config for a particular metric. +type MetricConfig struct { + Enabled bool `mapstructure:"enabled"` + + enabledSetByUser bool +} + +func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(ms, confmap.WithErrorUnused()) + if err != nil { + return err + } + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// MetricsConfig provides config for k8s/hpa metrics. +type MetricsConfig struct { + K8sHpaCurrentReplicas MetricConfig `mapstructure:"k8s.hpa.current_replicas"` + K8sHpaDesiredReplicas MetricConfig `mapstructure:"k8s.hpa.desired_replicas"` + K8sHpaMaxReplicas MetricConfig `mapstructure:"k8s.hpa.max_replicas"` + K8sHpaMinReplicas MetricConfig `mapstructure:"k8s.hpa.min_replicas"` +} + +func DefaultMetricsConfig() MetricsConfig { + return MetricsConfig{ + K8sHpaCurrentReplicas: MetricConfig{ + Enabled: true, + }, + K8sHpaDesiredReplicas: MetricConfig{ + Enabled: true, + }, + K8sHpaMaxReplicas: MetricConfig{ + Enabled: true, + }, + K8sHpaMinReplicas: MetricConfig{ + Enabled: true, + }, + } +} + +// ResourceAttributeConfig provides common config for a particular resource attribute. +type ResourceAttributeConfig struct { + Enabled bool `mapstructure:"enabled"` +} + +// ResourceAttributesConfig provides config for k8s/hpa resource attributes. +type ResourceAttributesConfig struct { + K8sHpaName ResourceAttributeConfig `mapstructure:"k8s.hpa.name"` + K8sHpaUID ResourceAttributeConfig `mapstructure:"k8s.hpa.uid"` + K8sNamespaceName ResourceAttributeConfig `mapstructure:"k8s.namespace.name"` +} + +func DefaultResourceAttributesConfig() ResourceAttributesConfig { + return ResourceAttributesConfig{ + K8sHpaName: ResourceAttributeConfig{ + Enabled: true, + }, + K8sHpaUID: ResourceAttributeConfig{ + Enabled: true, + }, + K8sNamespaceName: ResourceAttributeConfig{ + Enabled: true, + }, + } +} + +// MetricsBuilderConfig is a configuration for k8s/hpa metrics builder. +type MetricsBuilderConfig struct { + Metrics MetricsConfig `mapstructure:"metrics"` + ResourceAttributes ResourceAttributesConfig `mapstructure:"resource_attributes"` +} + +func DefaultMetricsBuilderConfig() MetricsBuilderConfig { + return MetricsBuilderConfig{ + Metrics: DefaultMetricsConfig(), + ResourceAttributes: DefaultResourceAttributesConfig(), + } +} diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_config_test.go b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_config_test.go new file mode 100644 index 000000000000..caa61e51a42a --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_config_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestMetricsBuilderConfig(t *testing.T) { + tests := []struct { + name string + want MetricsBuilderConfig + }{ + { + name: "default", + want: DefaultMetricsBuilderConfig(), + }, + { + name: "all_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + K8sHpaCurrentReplicas: MetricConfig{Enabled: true}, + K8sHpaDesiredReplicas: MetricConfig{Enabled: true}, + K8sHpaMaxReplicas: MetricConfig{Enabled: true}, + K8sHpaMinReplicas: MetricConfig{Enabled: true}, + }, + ResourceAttributes: ResourceAttributesConfig{ + K8sHpaName: ResourceAttributeConfig{Enabled: true}, + K8sHpaUID: ResourceAttributeConfig{Enabled: true}, + K8sNamespaceName: ResourceAttributeConfig{Enabled: true}, + }, + }, + }, + { + name: "none_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + K8sHpaCurrentReplicas: MetricConfig{Enabled: false}, + K8sHpaDesiredReplicas: MetricConfig{Enabled: false}, + K8sHpaMaxReplicas: MetricConfig{Enabled: false}, + K8sHpaMinReplicas: MetricConfig{Enabled: false}, + }, + ResourceAttributes: ResourceAttributesConfig{ + K8sHpaName: ResourceAttributeConfig{Enabled: false}, + K8sHpaUID: ResourceAttributeConfig{Enabled: false}, + K8sNamespaceName: ResourceAttributeConfig{Enabled: false}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := loadMetricsBuilderConfig(t, tt.name) + if diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(MetricConfig{}, ResourceAttributeConfig{})); diff != "" { + t.Errorf("Config mismatch (-expected +actual):\n%s", diff) + } + }) + } +} + +func loadMetricsBuilderConfig(t *testing.T, name string) MetricsBuilderConfig { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(name) + require.NoError(t, err) + cfg := DefaultMetricsBuilderConfig() + require.NoError(t, component.UnmarshalConfig(sub, &cfg)) + return cfg +} diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics.go b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics.go new file mode 100644 index 000000000000..69f783026dc2 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics.go @@ -0,0 +1,377 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + conventions "go.opentelemetry.io/collector/semconv/v1.9.0" +) + +type metricK8sHpaCurrentReplicas struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.current_replicas metric with initial data. +func (m *metricK8sHpaCurrentReplicas) init() { + m.data.SetName("k8s.hpa.current_replicas") + m.data.SetDescription("Current number of pod replicas managed by this autoscaler.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaCurrentReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaCurrentReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sHpaCurrentReplicas) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaCurrentReplicas(cfg MetricConfig) metricK8sHpaCurrentReplicas { + m := metricK8sHpaCurrentReplicas{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sHpaDesiredReplicas struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.desired_replicas metric with initial data. +func (m *metricK8sHpaDesiredReplicas) init() { + m.data.SetName("k8s.hpa.desired_replicas") + m.data.SetDescription("Desired number of pod replicas managed by this autoscaler.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaDesiredReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaDesiredReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sHpaDesiredReplicas) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaDesiredReplicas(cfg MetricConfig) metricK8sHpaDesiredReplicas { + m := metricK8sHpaDesiredReplicas{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sHpaMaxReplicas struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.max_replicas metric with initial data. +func (m *metricK8sHpaMaxReplicas) init() { + m.data.SetName("k8s.hpa.max_replicas") + m.data.SetDescription("Maximum number of replicas to which the autoscaler can scale up.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaMaxReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaMaxReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sHpaMaxReplicas) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaMaxReplicas(cfg MetricConfig) metricK8sHpaMaxReplicas { + m := metricK8sHpaMaxReplicas{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sHpaMinReplicas struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.hpa.min_replicas metric with initial data. +func (m *metricK8sHpaMinReplicas) init() { + m.data.SetName("k8s.hpa.min_replicas") + m.data.SetDescription("Minimum number of replicas to which the autoscaler can scale up.") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sHpaMinReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sHpaMinReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sHpaMinReplicas) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sHpaMinReplicas(cfg MetricConfig) metricK8sHpaMinReplicas { + m := metricK8sHpaMinReplicas{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations +// required to produce metric representation defined in metadata and user config. +type MetricsBuilder struct { + startTime pcommon.Timestamp // start time that will be applied to all recorded data points. + metricsCapacity int // maximum observed number of metrics per resource. + resourceCapacity int // maximum observed number of resource attributes. + metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. + buildInfo component.BuildInfo // contains version information + resourceAttributesConfig ResourceAttributesConfig + metricK8sHpaCurrentReplicas metricK8sHpaCurrentReplicas + metricK8sHpaDesiredReplicas metricK8sHpaDesiredReplicas + metricK8sHpaMaxReplicas metricK8sHpaMaxReplicas + metricK8sHpaMinReplicas metricK8sHpaMinReplicas +} + +// metricBuilderOption applies changes to default metrics builder. +type metricBuilderOption func(*MetricsBuilder) + +// WithStartTime sets startTime on the metrics builder. +func WithStartTime(startTime pcommon.Timestamp) metricBuilderOption { + return func(mb *MetricsBuilder) { + mb.startTime = startTime + } +} + +func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.CreateSettings, options ...metricBuilderOption) *MetricsBuilder { + mb := &MetricsBuilder{ + startTime: pcommon.NewTimestampFromTime(time.Now()), + metricsBuffer: pmetric.NewMetrics(), + buildInfo: settings.BuildInfo, + resourceAttributesConfig: mbc.ResourceAttributes, + metricK8sHpaCurrentReplicas: newMetricK8sHpaCurrentReplicas(mbc.Metrics.K8sHpaCurrentReplicas), + metricK8sHpaDesiredReplicas: newMetricK8sHpaDesiredReplicas(mbc.Metrics.K8sHpaDesiredReplicas), + metricK8sHpaMaxReplicas: newMetricK8sHpaMaxReplicas(mbc.Metrics.K8sHpaMaxReplicas), + metricK8sHpaMinReplicas: newMetricK8sHpaMinReplicas(mbc.Metrics.K8sHpaMinReplicas), + } + for _, op := range options { + op(mb) + } + return mb +} + +// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity. +func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { + if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() { + mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len() + } + if mb.resourceCapacity < rm.Resource().Attributes().Len() { + mb.resourceCapacity = rm.Resource().Attributes().Len() + } +} + +// ResourceMetricsOption applies changes to provided resource metrics. +type ResourceMetricsOption func(ResourceAttributesConfig, pmetric.ResourceMetrics) + +// WithK8sHpaName sets provided value as "k8s.hpa.name" attribute for current resource. +func WithK8sHpaName(val string) ResourceMetricsOption { + return func(rac ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + if rac.K8sHpaName.Enabled { + rm.Resource().Attributes().PutStr("k8s.hpa.name", val) + } + } +} + +// WithK8sHpaUID sets provided value as "k8s.hpa.uid" attribute for current resource. +func WithK8sHpaUID(val string) ResourceMetricsOption { + return func(rac ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + if rac.K8sHpaUID.Enabled { + rm.Resource().Attributes().PutStr("k8s.hpa.uid", val) + } + } +} + +// WithK8sNamespaceName sets provided value as "k8s.namespace.name" attribute for current resource. +func WithK8sNamespaceName(val string) ResourceMetricsOption { + return func(rac ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + if rac.K8sNamespaceName.Enabled { + rm.Resource().Attributes().PutStr("k8s.namespace.name", val) + } + } +} + +// WithStartTimeOverride overrides start time for all the resource metrics data points. +// This option should be only used if different start time has to be set on metrics coming from different resources. +func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption { + return func(_ ResourceAttributesConfig, rm pmetric.ResourceMetrics) { + var dps pmetric.NumberDataPointSlice + metrics := rm.ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + switch metrics.At(i).Type() { + case pmetric.MetricTypeGauge: + dps = metrics.At(i).Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = metrics.At(i).Sum().DataPoints() + } + for j := 0; j < dps.Len(); j++ { + dps.At(j).SetStartTimestamp(start) + } + } + } +} + +// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for +// recording another set of data points as part of another resource. This function can be helpful when one scraper +// needs to emit metrics from several resources. Otherwise calling this function is not required, +// just `Emit` function can be called instead. +// Resource attributes should be provided as ResourceMetricsOption arguments. +func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { + rm := pmetric.NewResourceMetrics() + rm.SetSchemaUrl(conventions.SchemaURL) + rm.Resource().Attributes().EnsureCapacity(mb.resourceCapacity) + ils := rm.ScopeMetrics().AppendEmpty() + ils.Scope().SetName("otelcol/k8sclusterreceiver") + ils.Scope().SetVersion(mb.buildInfo.Version) + ils.Metrics().EnsureCapacity(mb.metricsCapacity) + mb.metricK8sHpaCurrentReplicas.emit(ils.Metrics()) + mb.metricK8sHpaDesiredReplicas.emit(ils.Metrics()) + mb.metricK8sHpaMaxReplicas.emit(ils.Metrics()) + mb.metricK8sHpaMinReplicas.emit(ils.Metrics()) + + for _, op := range rmo { + op(mb.resourceAttributesConfig, rm) + } + if ils.Metrics().Len() > 0 { + mb.updateCapacity(rm) + rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) + } +} + +// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for +// recording another set of metrics. This function will be responsible for applying all the transformations required to +// produce metric representation defined in metadata and user config, e.g. delta or cumulative. +func (mb *MetricsBuilder) Emit(rmo ...ResourceMetricsOption) pmetric.Metrics { + mb.EmitForResource(rmo...) + metrics := mb.metricsBuffer + mb.metricsBuffer = pmetric.NewMetrics() + return metrics +} + +// RecordK8sHpaCurrentReplicasDataPoint adds a data point to k8s.hpa.current_replicas metric. +func (mb *MetricsBuilder) RecordK8sHpaCurrentReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaCurrentReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sHpaDesiredReplicasDataPoint adds a data point to k8s.hpa.desired_replicas metric. +func (mb *MetricsBuilder) RecordK8sHpaDesiredReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaDesiredReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sHpaMaxReplicasDataPoint adds a data point to k8s.hpa.max_replicas metric. +func (mb *MetricsBuilder) RecordK8sHpaMaxReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaMaxReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sHpaMinReplicasDataPoint adds a data point to k8s.hpa.min_replicas metric. +func (mb *MetricsBuilder) RecordK8sHpaMinReplicasDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sHpaMinReplicas.recordDataPoint(mb.startTime, ts, val) +} + +// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, +// and metrics builder should update its startTime and reset it's internal state accordingly. +func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) { + mb.startTime = pcommon.NewTimestampFromTime(time.Now()) + for _, op := range options { + op(mb) + } +} diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics_test.go b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics_test.go new file mode 100644 index 000000000000..05903e0e9063 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/generated_metrics_test.go @@ -0,0 +1,171 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type testConfigCollection int + +const ( + testSetDefault testConfigCollection = iota + testSetAll + testSetNone +) + +func TestMetricsBuilder(t *testing.T) { + tests := []struct { + name string + configSet testConfigCollection + }{ + { + name: "default", + configSet: testSetDefault, + }, + { + name: "all_set", + configSet: testSetAll, + }, + { + name: "none_set", + configSet: testSetNone, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + start := pcommon.Timestamp(1_000_000_000) + ts := pcommon.Timestamp(1_000_001_000) + observedZapCore, observedLogs := observer.New(zap.WarnLevel) + settings := receivertest.NewNopCreateSettings() + settings.Logger = zap.New(observedZapCore) + mb := NewMetricsBuilder(loadMetricsBuilderConfig(t, test.name), settings, WithStartTime(start)) + + expectedWarnings := 0 + assert.Equal(t, expectedWarnings, observedLogs.Len()) + + defaultMetricsCount := 0 + allMetricsCount := 0 + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaCurrentReplicasDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaDesiredReplicasDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaMaxReplicasDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sHpaMinReplicasDataPoint(ts, 1) + + metrics := mb.Emit(WithK8sHpaName("attr-val"), WithK8sHpaUID("attr-val"), WithK8sNamespaceName("attr-val")) + + if test.configSet == testSetNone { + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + return + } + + assert.Equal(t, 1, metrics.ResourceMetrics().Len()) + rm := metrics.ResourceMetrics().At(0) + attrCount := 0 + enabledAttrCount := 0 + attrVal, ok := rm.Resource().Attributes().Get("k8s.hpa.name") + attrCount++ + assert.Equal(t, mb.resourceAttributesConfig.K8sHpaName.Enabled, ok) + if mb.resourceAttributesConfig.K8sHpaName.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + attrVal, ok = rm.Resource().Attributes().Get("k8s.hpa.uid") + attrCount++ + assert.Equal(t, mb.resourceAttributesConfig.K8sHpaUID.Enabled, ok) + if mb.resourceAttributesConfig.K8sHpaUID.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + attrVal, ok = rm.Resource().Attributes().Get("k8s.namespace.name") + attrCount++ + assert.Equal(t, mb.resourceAttributesConfig.K8sNamespaceName.Enabled, ok) + if mb.resourceAttributesConfig.K8sNamespaceName.Enabled { + enabledAttrCount++ + assert.EqualValues(t, "attr-val", attrVal.Str()) + } + assert.Equal(t, enabledAttrCount, rm.Resource().Attributes().Len()) + assert.Equal(t, attrCount, 3) + + assert.Equal(t, 1, rm.ScopeMetrics().Len()) + ms := rm.ScopeMetrics().At(0).Metrics() + if test.configSet == testSetDefault { + assert.Equal(t, defaultMetricsCount, ms.Len()) + } + if test.configSet == testSetAll { + assert.Equal(t, allMetricsCount, ms.Len()) + } + validatedMetrics := make(map[string]bool) + for i := 0; i < ms.Len(); i++ { + switch ms.At(i).Name() { + case "k8s.hpa.current_replicas": + assert.False(t, validatedMetrics["k8s.hpa.current_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.current_replicas") + validatedMetrics["k8s.hpa.current_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Current number of pod replicas managed by this autoscaler.", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.hpa.desired_replicas": + assert.False(t, validatedMetrics["k8s.hpa.desired_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.desired_replicas") + validatedMetrics["k8s.hpa.desired_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Desired number of pod replicas managed by this autoscaler.", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.hpa.max_replicas": + assert.False(t, validatedMetrics["k8s.hpa.max_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.max_replicas") + validatedMetrics["k8s.hpa.max_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Maximum number of replicas to which the autoscaler can scale up.", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.hpa.min_replicas": + assert.False(t, validatedMetrics["k8s.hpa.min_replicas"], "Found a duplicate in the metrics slice: k8s.hpa.min_replicas") + validatedMetrics["k8s.hpa.min_replicas"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Minimum number of replicas to which the autoscaler can scale up.", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + } + } + }) + } +} diff --git a/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/testdata/config.yaml b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/testdata/config.yaml new file mode 100644 index 000000000000..4931c15a6eea --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/internal/metadata/testdata/config.yaml @@ -0,0 +1,35 @@ +default: +all_set: + metrics: + k8s.hpa.current_replicas: + enabled: true + k8s.hpa.desired_replicas: + enabled: true + k8s.hpa.max_replicas: + enabled: true + k8s.hpa.min_replicas: + enabled: true + resource_attributes: + k8s.hpa.name: + enabled: true + k8s.hpa.uid: + enabled: true + k8s.namespace.name: + enabled: true +none_set: + metrics: + k8s.hpa.current_replicas: + enabled: false + k8s.hpa.desired_replicas: + enabled: false + k8s.hpa.max_replicas: + enabled: false + k8s.hpa.min_replicas: + enabled: false + resource_attributes: + k8s.hpa.name: + enabled: false + k8s.hpa.uid: + enabled: false + k8s.namespace.name: + enabled: false diff --git a/receiver/k8sclusterreceiver/internal/hpa/metadata.yaml b/receiver/k8sclusterreceiver/internal/hpa/metadata.yaml new file mode 100644 index 000000000000..599dce75cb30 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/hpa/metadata.yaml @@ -0,0 +1,48 @@ +type: k8s/hpa + +sem_conv_version: 1.9.0 + +resource_attributes: + k8s.hpa.uid: + description: The k8s hpa uid. + type: string + enabled: true + + k8s.hpa.name: + description: The k8s hpa name. + type: string + enabled: true + + k8s.namespace.name: + description: The name of the namespace that the pod is running in. + type: string + enabled: true + +metrics: + k8s.hpa.max_replicas: + enabled: true + description: Maximum number of replicas to which the autoscaler can scale up. + unit: 1 + gauge: + value_type: int + + k8s.hpa.min_replicas: + enabled: true + description: Minimum number of replicas to which the autoscaler can scale up. + unit: 1 + gauge: + value_type: int + + k8s.hpa.current_replicas: + enabled: true + description: Current number of pod replicas managed by this autoscaler. + unit: 1 + gauge: + value_type: int + + k8s.hpa.desired_replicas: + enabled: true + description: Desired number of pod replicas managed by this autoscaler. + unit: 1 + gauge: + value_type: int diff --git a/receiver/k8sclusterreceiver/internal/testutils/metrics.go b/receiver/k8sclusterreceiver/internal/testutils/metrics.go index 7f5fdbd5455f..9b3a4587716c 100644 --- a/receiver/k8sclusterreceiver/internal/testutils/metrics.go +++ b/receiver/k8sclusterreceiver/internal/testutils/metrics.go @@ -9,6 +9,7 @@ import ( metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" ) func AssertResource(t *testing.T, actualResource *resourcepb.Resource, @@ -26,6 +27,26 @@ func AssertResource(t *testing.T, actualResource *resourcepb.Resource, ) } +func AssertMetricInt(t testing.TB, m pmetric.Metric, expectedMetric string, expectedType pmetric.MetricType, expectedValue any) { + dps := assertMetric(t, m, expectedMetric, expectedType) + require.EqualValues(t, expectedValue, dps.At(0).IntValue(), "mismatching metric values") +} + +func assertMetric(t testing.TB, m pmetric.Metric, expectedMetric string, expectedType pmetric.MetricType) pmetric.NumberDataPointSlice { + require.Equal(t, expectedMetric, m.Name(), "mismatching metric names") + require.NotEmpty(t, m.Description(), "empty description on metric") + require.Equal(t, expectedType, m.Type(), "mismatching metric types") + var dps pmetric.NumberDataPointSlice + switch expectedType { + case pmetric.MetricTypeGauge: + dps = m.Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = m.Sum().DataPoints() + } + require.Equal(t, 1, dps.Len()) + return dps +} + func AssertMetricsWithLabels(t *testing.T, actualMetric *metricspb.Metric, expectedMetric string, expectedType metricspb.MetricDescriptor_Type, expectedLabels map[string]string, expectedValue int64) { diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index ae59d8182270..e2c040acc7a7 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -120,7 +120,7 @@ func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.C return nil, err } return &kubernetesReceiver{ - resourceWatcher: newResourceWatcher(set.Logger, rCfg), + resourceWatcher: newResourceWatcher(set, rCfg), settings: set, config: rCfg, consumer: consumer, diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 40a5cae5ec4f..b403092497ed 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -13,6 +13,7 @@ import ( quotaclientset "github.com/openshift/client-go/quota/clientset/versioned" quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" @@ -53,10 +54,10 @@ type resourceWatcher struct { type metadataConsumer func(metadata []*experimentalmetricmetadata.MetadataUpdate) error // newResourceWatcher creates a Kubernetes resource watcher. -func newResourceWatcher(logger *zap.Logger, cfg *Config) *resourceWatcher { +func newResourceWatcher(set receiver.CreateSettings, cfg *Config) *resourceWatcher { return &resourceWatcher{ - logger: logger, - dataCollector: collection.NewDataCollector(logger, cfg.NodeConditionTypesToReport, cfg.AllocatableTypesToReport), + logger: set.Logger, + dataCollector: collection.NewDataCollector(set, cfg.NodeConditionTypesToReport, cfg.AllocatableTypesToReport), initialSyncDone: &atomic.Bool{}, initialSyncTimedOut: &atomic.Bool{}, initialTimeout: defaultInitialSyncTimeout, diff --git a/receiver/k8sclusterreceiver/watcher_test.go b/receiver/k8sclusterreceiver/watcher_test.go index c09784b0fac6..5639fe2a35f1 100644 --- a/receiver/k8sclusterreceiver/watcher_test.go +++ b/receiver/k8sclusterreceiver/watcher_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -194,7 +195,7 @@ func TestPrepareSharedInformerFactory(t *testing.T) { rw := &resourceWatcher{ client: newFakeClientWithAllResources(), logger: obsLogger, - dataCollector: collection.NewDataCollector(zap.NewNop(), []string{}, []string{}), + dataCollector: collection.NewDataCollector(receivertest.NewNopCreateSettings(), []string{}, []string{}), } assert.NoError(t, rw.prepareSharedInformerFactory())