From 0fa58d203c17344ade2433ae92eaae2571168450 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Fri, 4 Aug 2023 21:05:20 -0700 Subject: [PATCH] [receiver/k8scluster] Do not keep metrics in memory (#24769) Construct metrics on every scrape instead of keeping them in memory and copy them with modifications on every scrape. We keep k8s objects in cache anyway, so we can use that instead of the pre-built metrics. This reduces RAM utilization. This also allows us to extract a metrics builder instance and not create it on every scrape. This is the recommended approach that all other receivers follow. It ensures that any warnings defined in the metadata.yaml will be displayed only once, not on every scrape interval. --- ...erreceiver-improve-memory-utilization.yaml | 17 + .../clusterresourcequotas.go | 22 +- .../clusterresourcequotas_test.go | 7 +- .../internal/collection/collector.go | 190 ++++------ .../internal/collection/collector_test.go | 345 +++++------------- .../internal/collection/metricsstore.go | 134 ------- .../internal/collection/metricsstore_test.go | 76 ---- .../internal/container/containers.go | 20 +- .../internal/cronjob/cronjobs.go | 28 +- .../internal/cronjob/cronjobs_test.go | 38 +- .../internal/demonset/daemonsets.go | 21 +- .../internal/demonset/daemonsets_test.go | 8 +- .../internal/deployment/deployments.go | 12 +- .../internal/deployment/deployments_test.go | 12 +- .../k8sclusterreceiver/internal/hpa/hpa.go | 21 +- .../internal/hpa/hpa_test.go | 11 +- .../k8sclusterreceiver/internal/jobs/jobs.go | 24 +- .../internal/jobs/jobs_test.go | 11 +- .../internal/metadata/metadatastore.go | 39 +- .../internal/namespace/namespaces.go | 12 +- .../internal/namespace/namespaces_test.go | 29 +- .../k8sclusterreceiver/internal/node/nodes.go | 26 +- .../internal/node/nodes_test.go | 10 +- .../k8sclusterreceiver/internal/pod/pods.go | 31 +- .../internal/pod/pods_test.go | 13 +- .../internal/replicaset/replicasets.go | 18 +- .../internal/replicaset/replicasets_test.go | 7 +- .../replicationcontrollers.go | 18 +- .../replicationcontrollers_test.go | 27 +- .../internal/resourcequota/resourcequotas.go | 14 +- .../resourcequota/resourcequotas_test.go | 36 +- .../internal/statefulset/statefulsets.go | 14 +- .../internal/statefulset/statefulsets_test.go | 44 +-- .../internal/testutils/objects.go | 102 ++++++ receiver/k8sclusterreceiver/receiver.go | 10 +- receiver/k8sclusterreceiver/watcher.go | 71 +++- receiver/k8sclusterreceiver/watcher_test.go | 290 ++++++++++++++- 37 files changed, 842 insertions(+), 966 deletions(-) create mode 100755 .chloggen/k8sclusterreceiver-improve-memory-utilization.yaml delete mode 100644 receiver/k8sclusterreceiver/internal/collection/metricsstore.go delete mode 100644 receiver/k8sclusterreceiver/internal/collection/metricsstore_test.go diff --git a/.chloggen/k8sclusterreceiver-improve-memory-utilization.yaml b/.chloggen/k8sclusterreceiver-improve-memory-utilization.yaml new file mode 100755 index 000000000000..697ee4a5b833 --- /dev/null +++ b/.chloggen/k8sclusterreceiver-improve-memory-utilization.yaml @@ -0,0 +1,17 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sclusterreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reduce memory utilization + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24769] + +change_logs: [user] diff --git a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go index 7bae4aa063a7..f2d90593f064 100644 --- a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go +++ b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go @@ -5,49 +5,43 @@ package clusterresourcequota // import "github.com/open-telemetry/opentelemetry- import ( "strings" - "time" quotav1 "github.com/openshift/api/quota/v1" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, crq *quotav1.ClusterResourceQuota) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - +func RecordMetrics(mb *metadata.MetricsBuilder, crq *quotav1.ClusterResourceQuota, ts pcommon.Timestamp) { for k, v := range crq.Status.Total.Hard { val := extractValue(k, v) - mbphase.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k)) + mb.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k)) } for k, v := range crq.Status.Total.Used { val := extractValue(k, v) - mbphase.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k)) + mb.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k)) } for _, ns := range crq.Status.Namespaces { for k, v := range ns.Status.Hard { val := extractValue(k, v) - mbphase.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k)) + mb.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k)) } for k, v := range ns.Status.Used { val := extractValue(k, v) - mbphase.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k)) + mb.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k)) } } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetOpenshiftClusterquotaName(crq.Name) rb.SetOpenshiftClusterquotaUID(string(crq.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func extractValue(k v1.ResourceName, v resource.Quantity) int64 { diff --git a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go index 84f59c8cbabd..6c3fd9d6b72a 100644 --- a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go +++ b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go @@ -6,8 +6,10 @@ package clusterresourcequota import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" @@ -19,7 +21,10 @@ import ( func TestClusterRequestQuotaMetrics(t *testing.T) { crq := testutils.NewClusterResourceQuota("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), crq) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, crq, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index e6768d40adcb..aa4685286f5f 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -4,29 +4,24 @@ package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" import ( - "reflect" "time" quotav1 "github.com/openshift/api/quota/v1" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterresourcequota" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" @@ -42,137 +37,82 @@ import ( // TODO: Consider moving some of these constants to // https://go.opentelemetry.io/collector/blob/main/model/semconv/opentelemetry.go. -// DataCollector wraps around a metricsStore and a metadaStore exposing -// methods to perform on the underlying stores. DataCollector also provides -// an interface to interact with refactored code from SignalFx Agent which is -// confined to the collection package. +// DataCollector emits metrics with CollectMetricData based on the Kubernetes API objects in the metadata store. type DataCollector struct { settings receiver.CreateSettings - metricsStore *metricsStore metadataStore *metadata.Store nodeConditionsToReport []string allocatableTypesToReport []string - metricsBuilderConfig metadata.MetricsBuilderConfig + metricsBuilder *metadata.MetricsBuilder } // NewDataCollector returns a DataCollector. -func NewDataCollector(set receiver.CreateSettings, metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { +func NewDataCollector(set receiver.CreateSettings, ms *metadata.Store, + metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { return &DataCollector{ - settings: set, - metricsStore: &metricsStore{ - metricsCache: make(map[types.UID]pmetric.Metrics), - }, - metadataStore: &metadata.Store{}, + settings: set, + metadataStore: ms, nodeConditionsToReport: nodeConditionsToReport, allocatableTypesToReport: allocatableTypesToReport, - metricsBuilderConfig: metricsBuilderConfig, - } -} - -// SetupMetadataStore initializes a metadata store for the kubernetes kind. -func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) { - dc.metadataStore.Setup(gvk, store) -} - -func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) { - if err := dc.metricsStore.remove(obj.(runtime.Object)); err != nil { - dc.settings.TelemetrySettings.Logger.Error( - "failed to remove from metric cache", - zap.String("obj", reflect.TypeOf(obj).String()), - zap.Error(err), - ) - } -} - -func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) { - if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil { - dc.settings.TelemetrySettings.Logger.Error( - "failed to update metric cache", - zap.String("obj", reflect.TypeOf(obj).String()), - zap.Error(err), - ) + metricsBuilder: metadata.NewMetricsBuilder(metricsBuilderConfig, set), } } func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metrics { - return dc.metricsStore.getMetricData(currentTime) -} - -// SyncMetrics updates the metric store with latest metrics from the kubernetes object. -func (dc *DataCollector) SyncMetrics(obj interface{}) { - var md pmetric.Metrics - - switch o := obj.(type) { - case *corev1.Pod: - md = pod.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *corev1.Node: - md = node.GetMetrics(dc.settings, dc.metricsBuilderConfig, o, dc.nodeConditionsToReport, dc.allocatableTypesToReport) - case *corev1.Namespace: - md = namespace.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *corev1.ReplicationController: - md = replicationcontroller.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *corev1.ResourceQuota: - md = resourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.Deployment: - md = deployment.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.ReplicaSet: - md = replicaset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.DaemonSet: - md = demonset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.StatefulSet: - md = statefulset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *batchv1.Job: - md = jobs.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *batchv1.CronJob: - md = cronjob.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *batchv1beta1.CronJob: - md = cronjob.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o) - case *autoscalingv2.HorizontalPodAutoscaler: - md = hpa.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *autoscalingv2beta2.HorizontalPodAutoscaler: - md = hpa.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o) - case *quotav1.ClusterResourceQuota: - md = clusterresourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - default: - return - } - - if md.DataPointCount() == 0 { - return - } - - dc.UpdateMetricsStore(obj, md) -} - -// SyncMetadata updates the metric store with latest metrics from the kubernetes object -func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { - km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{} - switch o := obj.(type) { - case *corev1.Pod: - km = pod.GetMetadata(o, dc.metadataStore, dc.settings.TelemetrySettings.Logger) - case *corev1.Node: - km = node.GetMetadata(o) - case *corev1.ReplicationController: - km = replicationcontroller.GetMetadata(o) - case *appsv1.Deployment: - km = deployment.GetMetadata(o) - case *appsv1.ReplicaSet: - km = replicaset.GetMetadata(o) - case *appsv1.DaemonSet: - km = demonset.GetMetadata(o) - case *appsv1.StatefulSet: - km = statefulset.GetMetadata(o) - case *batchv1.Job: - km = jobs.GetMetadata(o) - case *batchv1.CronJob: - km = cronjob.GetMetadata(o) - case *batchv1beta1.CronJob: - km = cronjob.GetMetadataBeta(o) - case *autoscalingv2.HorizontalPodAutoscaler: - km = hpa.GetMetadata(o) - case *autoscalingv2beta2.HorizontalPodAutoscaler: - km = hpa.GetMetadataBeta(o) - } - - return km + ts := pcommon.NewTimestampFromTime(currentTime) + customRMs := pmetric.NewResourceMetricsSlice() + + dc.metadataStore.ForEach(gvk.Pod, func(o any) { + pod.RecordMetrics(dc.settings.Logger, dc.metricsBuilder, o.(*corev1.Pod), ts) + }) + dc.metadataStore.ForEach(gvk.Node, func(o any) { + crm := node.CustomMetrics(dc.settings, dc.metricsBuilder.NewResourceBuilder(), o.(*corev1.Node), + dc.nodeConditionsToReport, dc.allocatableTypesToReport, ts) + if crm.ScopeMetrics().Len() > 0 { + crm.MoveTo(customRMs.AppendEmpty()) + } + }) + dc.metadataStore.ForEach(gvk.Namespace, func(o any) { + namespace.RecordMetrics(dc.metricsBuilder, o.(*corev1.Namespace), ts) + }) + dc.metadataStore.ForEach(gvk.ReplicationController, func(o any) { + replicationcontroller.RecordMetrics(dc.metricsBuilder, o.(*corev1.ReplicationController), ts) + }) + dc.metadataStore.ForEach(gvk.ResourceQuota, func(o any) { + resourcequota.RecordMetrics(dc.metricsBuilder, o.(*corev1.ResourceQuota), ts) + }) + dc.metadataStore.ForEach(gvk.Deployment, func(o any) { + deployment.RecordMetrics(dc.metricsBuilder, o.(*appsv1.Deployment), ts) + }) + dc.metadataStore.ForEach(gvk.ReplicaSet, func(o any) { + replicaset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.ReplicaSet), ts) + }) + dc.metadataStore.ForEach(gvk.DaemonSet, func(o any) { + demonset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.DaemonSet), ts) + }) + dc.metadataStore.ForEach(gvk.StatefulSet, func(o any) { + statefulset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.StatefulSet), ts) + }) + dc.metadataStore.ForEach(gvk.Job, func(o any) { + jobs.RecordMetrics(dc.metricsBuilder, o.(*batchv1.Job), ts) + }) + dc.metadataStore.ForEach(gvk.CronJob, func(o any) { + cronjob.RecordMetrics(dc.metricsBuilder, o.(*batchv1.CronJob), ts) + }) + dc.metadataStore.ForEach(gvk.CronJobBeta, func(o any) { + cronjob.RecordMetricsBeta(dc.metricsBuilder, o.(*batchv1beta1.CronJob), ts) + }) + dc.metadataStore.ForEach(gvk.HorizontalPodAutoscaler, func(o any) { + hpa.RecordMetrics(dc.metricsBuilder, o.(*autoscalingv2.HorizontalPodAutoscaler), ts) + }) + dc.metadataStore.ForEach(gvk.HorizontalPodAutoscalerBeta, func(o any) { + hpa.RecordMetricsBeta(dc.metricsBuilder, o.(*autoscalingv2beta2.HorizontalPodAutoscaler), ts) + }) + dc.metadataStore.ForEach(gvk.ClusterResourceQuota, func(o any) { + clusterresourcequota.RecordMetrics(dc.metricsBuilder, o.(*quotav1.ClusterResourceQuota), ts) + }) + + m := dc.metricsBuilder.Emit() + customRMs.MoveAndAppendTo(m.ResourceMetrics()) + return m } diff --git a/receiver/k8sclusterreceiver/internal/collection/collector_test.go b/receiver/k8sclusterreceiver/internal/collection/collector_test.go index 50ea8fb0c1e2..0c34bb595b1d 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector_test.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector_test.go @@ -5,289 +5,118 @@ package collection import ( "testing" + "time" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) -var commonPodMetadata = map[string]string{ - "foo": "bar", - "foo1": "", - "pod.creation_timestamp": "0001-01-01T00:00:00Z", -} - -var allPodMetadata = func(metadata map[string]string) map[string]string { - out := maps.MergeStringMaps(metadata, commonPodMetadata) - return out -} +func TestCollectMetricData(t *testing.T) { + ms := metadata.NewStore() + var expectedRMs int -func TestDataCollectorSyncMetadata(t *testing.T) { - tests := []struct { - name string - metadataStore *metadata.Store - resource interface{} - want map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata - }{ - { - name: "Pod and container metadata simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewPodWithContainer( - "0", + ms.Setup(gvk.Pod, &testutils.MockStore{ + Cache: map[string]interface{}{ + "pod1-uid": testutils.NewPodWithContainer( + "1", testutils.NewPodSpecWithContainer("container-name"), testutils.NewPodStatusWithContainer("container-name", "container-id"), ), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { - EntityType: "k8s.pod", - ResourceIDKey: "k8s.pod.uid", - ResourceID: "test-pod-0-uid", - Metadata: commonPodMetadata, - }, - experimentalmetricmetadata.ResourceID("container-id"): { - EntityType: "container", - ResourceIDKey: "container.id", - ResourceID: "container-id", - Metadata: map[string]string{ - "container.status": "running", - }, - }, - }, }, - { - name: "Pod with Owner Reference", - metadataStore: &metadata.Store{}, - resource: testutils.WithOwnerReferences([]v1.OwnerReference{ - { - Kind: "StatefulSet", - Name: "test-statefulset-0", - UID: "test-statefulset-0-uid", - }, - }, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{})), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { - EntityType: "k8s.pod", - ResourceIDKey: "k8s.pod.uid", - ResourceID: "test-pod-0-uid", - Metadata: allPodMetadata(map[string]string{ - "k8s.workload.kind": "StatefulSet", - "k8s.workload.name": "test-statefulset-0", - "k8s.statefulset.name": "test-statefulset-0", - "k8s.statefulset.uid": "test-statefulset-0-uid", - }), - }, - }, + }) + expectedRMs += 2 // 1 for pod, 1 for container + + ms.Setup(gvk.Node, &testutils.MockStore{ + Cache: map[string]interface{}{ + "node1-uid": testutils.NewNode("1"), + "node2-uid": testutils.NewNode("2"), }, - { - name: "Pod with Service metadata", - metadataStore: &metadata.Store{ - Services: &testutils.MockStore{ - Cache: map[string]interface{}{ - "test-namespace/test-service": &corev1.Service{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-service", - Namespace: "test-namespace", - UID: "test-service-uid", - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "k8s-app": "my-app", - }, - }, - }, - }, - }, - }, - resource: podWithAdditionalLabels( - map[string]string{"k8s-app": "my-app"}, - testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{}), - ), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { - EntityType: "k8s.pod", - ResourceIDKey: "k8s.pod.uid", - ResourceID: "test-pod-0-uid", - Metadata: allPodMetadata(map[string]string{ - "k8s.service.test-service": "", - "k8s-app": "my-app", - }), - }, - }, + }) + expectedRMs += 2 + + ms.Setup(gvk.Namespace, &testutils.MockStore{ + Cache: map[string]interface{}{ + "namespace1-uid": testutils.NewNamespace("1"), }, - { - name: "Daemonset simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewDaemonset("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-daemonset-1-uid"): { - EntityType: "k8s.daemonset", - ResourceIDKey: "k8s.daemonset.uid", - ResourceID: "test-daemonset-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "DaemonSet", - "k8s.workload.name": "test-daemonset-1", - "daemonset.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.ReplicationController, &testutils.MockStore{ + Cache: map[string]interface{}{ + "replicationcontroller1-uid": testutils.NewReplicationController("1"), }, - { - name: "Deployment simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewDeployment("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-deployment-1-uid"): { - EntityType: "k8s.deployment", - ResourceIDKey: "k8s.deployment.uid", - ResourceID: "test-deployment-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "Deployment", - "k8s.workload.name": "test-deployment-1", - "k8s.deployment.name": "test-deployment-1", - "deployment.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.ResourceQuota, &testutils.MockStore{ + Cache: map[string]interface{}{ + "resourcequota1-uid": testutils.NewResourceQuota("1"), }, - { - name: "HPA simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewHPA("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-hpa-1-uid"): { - EntityType: "k8s.hpa", - ResourceIDKey: "k8s.hpa.uid", - ResourceID: "test-hpa-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "HPA", - "k8s.workload.name": "test-hpa-1", - "hpa.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.Deployment, &testutils.MockStore{ + Cache: map[string]interface{}{ + "deployment1-uid": testutils.NewDeployment("1"), }, - { - name: "Job simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewJob("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-job-1-uid"): { - EntityType: "k8s.job", - ResourceIDKey: "k8s.job.uid", - ResourceID: "test-job-1-uid", - Metadata: map[string]string{ - "foo": "bar", - "foo1": "", - "k8s.workload.kind": "Job", - "k8s.workload.name": "test-job-1", - "job.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.ReplicaSet, &testutils.MockStore{ + Cache: map[string]interface{}{ + "replicaset1-uid": testutils.NewReplicaSet("1"), }, - { - name: "Node simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewNode("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-node-1-uid"): { - EntityType: "k8s.node", - ResourceIDKey: "k8s.node.uid", - ResourceID: "test-node-1-uid", - Metadata: map[string]string{ - "foo": "bar", - "foo1": "", - "k8s.node.name": "test-node-1", - "node.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.DaemonSet, &testutils.MockStore{ + Cache: map[string]interface{}{ + "daemonset1-uid": testutils.NewDaemonset("1"), }, - { - name: "ReplicaSet simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewReplicaSet("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-replicaset-1-uid"): { - EntityType: "k8s.replicaset", - ResourceIDKey: "k8s.replicaset.uid", - ResourceID: "test-replicaset-1-uid", - Metadata: map[string]string{ - "foo": "bar", - "foo1": "", - "k8s.workload.kind": "ReplicaSet", - "k8s.workload.name": "test-replicaset-1", - "replicaset.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.StatefulSet, &testutils.MockStore{ + Cache: map[string]interface{}{ + "statefulset1-uid": testutils.NewStatefulset("1"), }, - { - name: "ReplicationController simple case", - metadataStore: &metadata.Store{}, - resource: &corev1.ReplicationController{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-replicationcontroller-1", - Namespace: "test-namespace", - UID: types.UID("test-replicationcontroller-1-uid"), - }, - }, - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-replicationcontroller-1-uid"): { - EntityType: "k8s.replicationcontroller", - ResourceIDKey: "k8s.replicationcontroller.uid", - ResourceID: "test-replicationcontroller-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "ReplicationController", - "k8s.workload.name": "test-replicationcontroller-1", - "replicationcontroller.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.Job, &testutils.MockStore{ + Cache: map[string]interface{}{ + "job1-uid": testutils.NewJob("1"), }, - } + }) + expectedRMs++ - for _, tt := range tests { - observedLogger, _ := observer.New(zapcore.WarnLevel) - set := receivertest.NewNopCreateSettings() - set.TelemetrySettings.Logger = zap.New(observedLogger) - t.Run(tt.name, func(t *testing.T) { - dc := &DataCollector{ - settings: set, - metadataStore: tt.metadataStore, - nodeConditionsToReport: []string{}, - } + ms.Setup(gvk.CronJob, &testutils.MockStore{ + Cache: map[string]interface{}{ + "cronjob1-uid": testutils.NewCronJob("1"), + }, + }) + expectedRMs++ - actual := dc.SyncMetadata(tt.resource) - require.Equal(t, len(tt.want), len(actual)) + ms.Setup(gvk.HorizontalPodAutoscaler, &testutils.MockStore{ + Cache: map[string]interface{}{ + "horizontalpodautoscaler1-uid": testutils.NewHPA("1"), + }, + }) + expectedRMs++ - for key, item := range tt.want { - got, exists := actual[key] - require.True(t, exists) - require.Equal(t, *item, *got) - } - }) - } -} + dc := NewDataCollector(receivertest.NewNopCreateSettings(), ms, metadata.DefaultMetricsBuilderConfig(), []string{"Ready"}, nil) + m1 := dc.CollectMetricData(time.Now()) -func podWithAdditionalLabels(labels map[string]string, pod *corev1.Pod) interface{} { - if pod.Labels == nil { - pod.Labels = make(map[string]string, len(labels)) - } + // Verify number of resource metrics only, content is tested in other tests. + assert.Equal(t, expectedRMs, m1.ResourceMetrics().Len()) - for k, v := range labels { - pod.Labels[k] = v - } + m2 := dc.CollectMetricData(time.Now()) - return pod + // Second scrape should be the same as the first one except for the timestamp. + assert.NoError(t, pmetrictest.CompareMetrics(m1, m2, pmetrictest.IgnoreTimestamp())) } diff --git a/receiver/k8sclusterreceiver/internal/collection/metricsstore.go b/receiver/k8sclusterreceiver/internal/collection/metricsstore.go deleted file mode 100644 index 4297e00879f2..000000000000 --- a/receiver/k8sclusterreceiver/internal/collection/metricsstore.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" - -import ( - "sync" - "time" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" -) - -// metricsStore keeps track of the metrics being pushed along the pipeline -// every interval. Since Kubernetes events that generate these metrics are -// aperiodic, the values in this cache will be pushed along the pipeline -// until the next Kubernetes event pertaining to an object. -type metricsStore struct { - sync.RWMutex - metricsCache map[types.UID]pmetric.Metrics -} - -// updates metricsStore with latest metrics. -func (ms *metricsStore) update(obj runtime.Object, md pmetric.Metrics) error { - ms.Lock() - defer ms.Unlock() - - key, err := utils.GetUIDForObject(obj) - if err != nil { - return err - } - - ms.metricsCache[key] = md - return nil -} - -// removes entry from metric cache when resources are deleted. -func (ms *metricsStore) remove(obj runtime.Object) error { - ms.Lock() - defer ms.Unlock() - - key, err := utils.GetUIDForObject(obj) - if err != nil { - return err - } - - delete(ms.metricsCache, key) - return nil -} - -// getMetricData returns metricsCache stored in the cache at a given point in time. -func (ms *metricsStore) getMetricData(currentTime time.Time) pmetric.Metrics { - ms.RLock() - defer ms.RUnlock() - - currentTimestamp := pcommon.NewTimestampFromTime(currentTime) - out := pmetric.NewMetrics() - for _, md := range ms.metricsCache { - // Set datapoint timestamp to be time of retrieval from cache. - applyCurrentTime(md, currentTimestamp) - rms := pmetric.NewResourceMetricsSlice() - md.ResourceMetrics().CopyTo(rms) - rms.MoveAndAppendTo(out.ResourceMetrics()) - } - - return out -} - -func applyCurrentTime(md pmetric.Metrics, t pcommon.Timestamp) { - rms := md.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - sms := rms.At(i).ScopeMetrics() - for j := 0; j < sms.Len(); j++ { - ms := sms.At(j).Metrics() - for k := 0; k < ms.Len(); k++ { - switch ms.At(k).Type() { - case pmetric.MetricTypeGauge: - applyCurrentTimeNumberDataPoint(ms.At(k).Gauge().DataPoints(), t) - case pmetric.MetricTypeSum: - applyCurrentTimeNumberDataPoint(ms.At(k).Sum().DataPoints(), t) - case pmetric.MetricTypeEmpty: - case pmetric.MetricTypeHistogram: - applyCurrentTimeHistogramDataPoint(ms.At(k).Histogram().DataPoints(), t) - case pmetric.MetricTypeExponentialHistogram: - applyCurrentTimeExponentialHistogramDataPoint(ms.At(k).ExponentialHistogram().DataPoints(), t) - case pmetric.MetricTypeSummary: - applyCurrentTimeSummaryDataPoint(ms.At(k).Summary().DataPoints(), t) - } - } - } - } -} - -func applyCurrentTimeSummaryDataPoint(dps pmetric.SummaryDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - dp.SetTimestamp(t) - } -} -func applyCurrentTimeHistogramDataPoint(dps pmetric.HistogramDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - dp.SetTimestamp(t) - for j := 0; j < dp.Exemplars().Len(); j++ { - dp.Exemplars().At(j).SetTimestamp(t) - } - } -} - -func applyCurrentTimeExponentialHistogramDataPoint(dps pmetric.ExponentialHistogramDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - dp.SetTimestamp(t) - for j := 0; j < dp.Exemplars().Len(); j++ { - dp.Exemplars().At(j).SetTimestamp(t) - } - } -} - -func applyCurrentTimeNumberDataPoint(dps pmetric.NumberDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - switch dps.At(i).ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - dps.At(i).SetTimestamp(t) - case pmetric.NumberDataPointValueTypeInt: - dps.At(i).SetTimestamp(t) - case pmetric.NumberDataPointValueTypeEmpty: - } - } -} diff --git a/receiver/k8sclusterreceiver/internal/collection/metricsstore_test.go b/receiver/k8sclusterreceiver/internal/collection/metricsstore_test.go deleted file mode 100644 index 9d969084db95..000000000000 --- a/receiver/k8sclusterreceiver/internal/collection/metricsstore_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package collection - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pmetric" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" -) - -func TestMetricsStoreOperations(t *testing.T) { - ms := metricsStore{ - metricsCache: make(map[types.UID]pmetric.Metrics), - } - - updates := []struct { - id types.UID - rm pmetric.Metrics - }{ - { - id: types.UID("test-uid-1"), - rm: func() pmetric.Metrics { - m := pmetric.NewMetrics() - m.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("k1", "v1") - m.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("k2", "v2") - return m - }(), - }, - { - id: types.UID("test-uid-2"), - rm: func() pmetric.Metrics { - m := pmetric.NewMetrics() - m.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("k3", "v3") - return m - }(), - }, - } - - // Update metric store with metrics - for _, u := range updates { - require.NoError(t, ms.update(&corev1.Pod{ObjectMeta: v1.ObjectMeta{UID: u.id}}, u.rm)) - } - - // Asset values form updates - expectedMetricData := 0 - for _, u := range updates { - require.Contains(t, ms.metricsCache, u.id) - require.Equal(t, u.rm.ResourceMetrics().Len(), ms.metricsCache[u.id].ResourceMetrics().Len()) - expectedMetricData += u.rm.ResourceMetrics().Len() - } - require.Equal(t, expectedMetricData, ms.getMetricData(time.Now()).ResourceMetrics().Len()) - - // Remove non existent item - require.NoError(t, ms.remove(&corev1.Pod{ - ObjectMeta: v1.ObjectMeta{ - UID: "1", - }, - })) - require.Equal(t, len(updates), len(ms.metricsCache)) - - // Remove valid item from cache - require.NoError(t, ms.remove(&corev1.Pod{ - ObjectMeta: v1.ObjectMeta{ - UID: updates[1].id, - }, - })) - expectedMetricData -= updates[1].rm.ResourceMetrics().Len() - require.Equal(t, len(updates)-1, len(ms.metricsCache)) - require.Equal(t, expectedMetricData, ms.getMetricData(time.Now()).ResourceMetrics().Len()) -} diff --git a/receiver/k8sclusterreceiver/internal/container/containers.go b/receiver/k8sclusterreceiver/internal/container/containers.go index 94cb1cd0c86e..53eb807b2e18 100644 --- a/receiver/k8sclusterreceiver/internal/container/containers.go +++ b/receiver/k8sclusterreceiver/internal/container/containers.go @@ -4,11 +4,7 @@ package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/container" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -31,11 +27,9 @@ const ( containerStatusTerminated = "terminated" ) -// GetSpecMetrics metricizes values from the container spec. +// RecordSpecMetrics metricizes values from the container spec. // This includes values like resource requests and limits. -func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, c corev1.Container, pod *corev1.Pod) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordSpecMetrics(logger *zap.Logger, mb *imetadata.MetricsBuilder, c corev1.Container, pod *corev1.Pod, ts pcommon.Timestamp) { for k, r := range c.Resources.Requests { //exhaustive:ignore switch k { @@ -48,7 +42,7 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. case corev1.ResourceEphemeralStorage: mb.RecordK8sContainerEphemeralstorageRequestDataPoint(ts, r.Value()) default: - set.Logger.Debug("unsupported request type", zap.Any("type", k)) + logger.Debug("unsupported request type", zap.Any("type", k)) } } for k, l := range c.Resources.Limits { @@ -63,7 +57,7 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. case corev1.ResourceEphemeralStorage: mb.RecordK8sContainerEphemeralstorageLimitDataPoint(ts, l.Value()) default: - set.Logger.Debug("unsupported request type", zap.Any("type", k)) + logger.Debug("unsupported request type", zap.Any("type", k)) } } var containerID string @@ -78,7 +72,7 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. } } - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sPodUID(string(pod.UID)) rb.SetK8sPodName(pod.Name) rb.SetK8sNodeName(pod.Spec.NodeName) @@ -88,12 +82,12 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. rb.SetK8sContainerName(c.Name) image, err := docker.ParseImageName(imageStr) if err != nil { - docker.LogParseError(err, imageStr, set.Logger) + docker.LogParseError(err, imageStr, logger) } else { rb.SetContainerImageName(image.Repository) rb.SetContainerImageTag(image.Tag) } - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(imetadata.WithResource(rb.Emit())) } func GetMetadata(cs corev1.ContainerStatus) *metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go index dece80dc2cda..e7a49e6c15da 100644 --- a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go +++ b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go @@ -4,18 +4,13 @@ package cronjob // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) const ( @@ -24,32 +19,25 @@ const ( cronJobKeyConcurrencyPolicy = "concurrency_policy" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, cj *batchv1.CronJob) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - - mbphase.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) +func RecordMetrics(mb *metadata.MetricsBuilder, cj *batchv1.CronJob, ts pcommon.Timestamp) { + mb.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(cj.Namespace) rb.SetK8sCronjobUID(string(cj.UID)) rb.SetK8sCronjobName(cj.Name) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } -func GetMetricsBeta(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, cj *batchv1beta1.CronJob) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - - mbphase.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) - - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) +func RecordMetricsBeta(mb *metadata.MetricsBuilder, cj *batchv1beta1.CronJob, ts pcommon.Timestamp) { + mb.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(cj.Namespace) rb.SetK8sCronjobUID(string(cj.UID)) rb.SetK8sCronjobName(cj.Name) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(cj *batchv1.CronJob) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go index 4b0476edda34..c60622d9390e 100644 --- a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go +++ b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go @@ -6,23 +6,26 @@ package cronjob import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestCronJobMetrics(t *testing.T) { - cj := newCronJob("1") + cj := testutils.NewCronJob("1") + + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, cj, ts) + m := mb.Emit() - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), cj) expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, @@ -36,7 +39,7 @@ func TestCronJobMetrics(t *testing.T) { } func TestCronJobMetadata(t *testing.T) { - cj := newCronJob("1") + cj := testutils.NewCronJob("1") actualMetadata := GetMetadata(cj) @@ -61,24 +64,3 @@ func TestCronJobMetadata(t *testing.T) { *actualMetadata["test-cronjob-1-uid"], ) } - -func newCronJob(id string) *batchv1.CronJob { - return &batchv1.CronJob{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-cronjob-" + id, - Namespace: "test-namespace", - UID: types.UID("test-cronjob-" + id + "-uid"), - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Spec: batchv1.CronJobSpec{ - Schedule: "schedule", - ConcurrencyPolicy: "concurrency_policy", - }, - Status: batchv1.CronJobStatus{ - Active: []corev1.ObjectReference{{}, {}}, - }, - } -} diff --git a/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go b/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go index fc07441632e3..303e9c9a24d9 100644 --- a/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go +++ b/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go @@ -4,17 +4,12 @@ package demonset // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" appsv1 "k8s.io/api/apps/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) // Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization. @@ -31,20 +26,18 @@ func Transform(ds *appsv1.DaemonSet) *appsv1.DaemonSet { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, ds *appsv1.DaemonSet) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - mbphase.RecordK8sDaemonsetCurrentScheduledNodesDataPoint(ts, int64(ds.Status.CurrentNumberScheduled)) - mbphase.RecordK8sDaemonsetDesiredScheduledNodesDataPoint(ts, int64(ds.Status.DesiredNumberScheduled)) - mbphase.RecordK8sDaemonsetMisscheduledNodesDataPoint(ts, int64(ds.Status.NumberMisscheduled)) - mbphase.RecordK8sDaemonsetReadyNodesDataPoint(ts, int64(ds.Status.NumberReady)) +func RecordMetrics(mb *metadata.MetricsBuilder, ds *appsv1.DaemonSet, ts pcommon.Timestamp) { + mb.RecordK8sDaemonsetCurrentScheduledNodesDataPoint(ts, int64(ds.Status.CurrentNumberScheduled)) + mb.RecordK8sDaemonsetDesiredScheduledNodesDataPoint(ts, int64(ds.Status.DesiredNumberScheduled)) + mb.RecordK8sDaemonsetMisscheduledNodesDataPoint(ts, int64(ds.Status.NumberMisscheduled)) + mb.RecordK8sDaemonsetReadyNodesDataPoint(ts, int64(ds.Status.NumberReady)) - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(ds.Namespace) rb.SetK8sDaemonsetName(ds.Name) rb.SetK8sDaemonsetUID(string(ds.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(ds *appsv1.DaemonSet) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go b/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go index 2cebe4a3616b..5222d3d91510 100644 --- a/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go +++ b/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go @@ -6,9 +6,11 @@ package demonset import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -23,7 +25,11 @@ import ( func TestDaemonsetMetrics(t *testing.T) { ds := testutils.NewDaemonset("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), ds) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, ds, ts) + m := mb.Emit() + expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, diff --git a/receiver/k8sclusterreceiver/internal/deployment/deployments.go b/receiver/k8sclusterreceiver/internal/deployment/deployments.go index ebbf829cac09..207b5c5caaa0 100644 --- a/receiver/k8sclusterreceiver/internal/deployment/deployments.go +++ b/receiver/k8sclusterreceiver/internal/deployment/deployments.go @@ -4,11 +4,7 @@ package deployment // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" appsv1 "k8s.io/api/apps/v1" @@ -32,17 +28,15 @@ func Transform(deployment *appsv1.Deployment) *appsv1.Deployment { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, dep *appsv1.Deployment) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *imetadata.MetricsBuilder, dep *appsv1.Deployment, ts pcommon.Timestamp) { mb.RecordK8sDeploymentDesiredDataPoint(ts, int64(*dep.Spec.Replicas)) mb.RecordK8sDeploymentAvailableDataPoint(ts, int64(dep.Status.AvailableReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sDeploymentName(dep.Name) rb.SetK8sDeploymentUID(string(dep.UID)) rb.SetK8sNamespaceName(dep.Namespace) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(dep *appsv1.Deployment) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go b/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go index 24dc7c86bae4..1ce4544310b7 100644 --- a/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go +++ b/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go @@ -6,9 +6,11 @@ package deployment import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" @@ -24,7 +26,10 @@ import ( func TestDeploymentMetrics(t *testing.T) { dep := testutils.NewDeployment("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), dep) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, dep, ts) + m := mb.Emit() require.Equal(t, 1, m.ResourceMetrics().Len()) require.Equal(t, 2, m.MetricCount()) @@ -51,7 +56,10 @@ func TestDeploymentMetrics(t *testing.T) { func TestGoldenFile(t *testing.T) { dep := testutils.NewDeployment("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), dep) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, dep, ts) + m := mb.Emit() expectedFile := filepath.Join("testdata", "expected.yaml") expected, err := golden.ReadMetrics(expectedFile) require.NoError(t, err) diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa.go b/receiver/k8sclusterreceiver/internal/hpa/hpa.go index 19d965aa1e20..380f1fac87a7 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa.go +++ b/receiver/k8sclusterreceiver/internal/hpa/hpa.go @@ -4,45 +4,36 @@ package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" autoscalingv2 "k8s.io/api/autoscaling/v2" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetricsBeta(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, hpa *autoscalingv2beta2.HorizontalPodAutoscaler) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetricsBeta(mb *metadata.MetricsBuilder, hpa *autoscalingv2beta2.HorizontalPodAutoscaler, ts pcommon.Timestamp) { mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas)) mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas)) mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas)) mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sHpaUID(string(hpa.UID)) rb.SetK8sHpaName(hpa.Name) rb.SetK8sNamespaceName(hpa.Namespace) - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, hpa *autoscalingv2.HorizontalPodAutoscaler) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *metadata.MetricsBuilder, hpa *autoscalingv2.HorizontalPodAutoscaler, ts pcommon.Timestamp) { mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas)) mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas)) mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas)) mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sHpaUID(string(hpa.UID)) rb.SetK8sHpaName(hpa.Name) rb.SetK8sNamespaceName(hpa.Namespace) - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(hpa *autoscalingv2.HorizontalPodAutoscaler) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go index 479652033541..5d971412bbe3 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go +++ b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go @@ -5,9 +5,11 @@ package hpa import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" @@ -18,10 +20,13 @@ import ( func TestHPAMetrics(t *testing.T) { hpa := testutils.NewHPA("1") - md := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), hpa) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, hpa, ts) + m := mb.Emit() - require.Equal(t, 1, md.ResourceMetrics().Len()) - rm := md.ResourceMetrics().At(0) + require.Equal(t, 1, m.ResourceMetrics().Len()) + rm := m.ResourceMetrics().At(0) assert.Equal(t, map[string]any{ "k8s.hpa.uid": "test-hpa-1-uid", diff --git a/receiver/k8sclusterreceiver/internal/jobs/jobs.go b/receiver/k8sclusterreceiver/internal/jobs/jobs.go index 876d8a7ec5b8..fdd0e4e07ade 100644 --- a/receiver/k8sclusterreceiver/internal/jobs/jobs.go +++ b/receiver/k8sclusterreceiver/internal/jobs/jobs.go @@ -4,40 +4,32 @@ package jobs // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" batchv1 "k8s.io/api/batch/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, j *batchv1.Job) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - - mbphase.RecordK8sJobActivePodsDataPoint(ts, int64(j.Status.Active)) - mbphase.RecordK8sJobFailedPodsDataPoint(ts, int64(j.Status.Failed)) - mbphase.RecordK8sJobSuccessfulPodsDataPoint(ts, int64(j.Status.Succeeded)) +func RecordMetrics(mb *metadata.MetricsBuilder, j *batchv1.Job, ts pcommon.Timestamp) { + mb.RecordK8sJobActivePodsDataPoint(ts, int64(j.Status.Active)) + mb.RecordK8sJobFailedPodsDataPoint(ts, int64(j.Status.Failed)) + mb.RecordK8sJobSuccessfulPodsDataPoint(ts, int64(j.Status.Succeeded)) if j.Spec.Completions != nil { - mbphase.RecordK8sJobDesiredSuccessfulPodsDataPoint(ts, int64(*j.Spec.Completions)) + mb.RecordK8sJobDesiredSuccessfulPodsDataPoint(ts, int64(*j.Spec.Completions)) } if j.Spec.Parallelism != nil { - mbphase.RecordK8sJobMaxParallelPodsDataPoint(ts, int64(*j.Spec.Parallelism)) + mb.RecordK8sJobMaxParallelPodsDataPoint(ts, int64(*j.Spec.Parallelism)) } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(j.Namespace) rb.SetK8sJobName(j.Name) rb.SetK8sJobUID(string(j.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } // Transform transforms the job to remove the fields that we don't use to reduce RAM utilization. diff --git a/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go b/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go index 642657449c05..f76e070dc138 100644 --- a/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go +++ b/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go @@ -6,9 +6,11 @@ package jobs import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -23,7 +25,10 @@ import ( func TestJobMetrics(t *testing.T) { j := testutils.NewJob("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), j) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, j, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) @@ -35,10 +40,12 @@ func TestJobMetrics(t *testing.T) { pmetrictest.IgnoreScopeMetricsOrder(), ), ) + // Test with nil values. j.Spec.Completions = nil j.Spec.Parallelism = nil - m = GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), j) + RecordMetrics(mb, j, ts) + m = mb.Emit() expected, err = golden.ReadMetrics(filepath.Join("testdata", "expected_empty.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, diff --git a/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go b/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go index 1ec348b13b5f..546f3ba62142 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go +++ b/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go @@ -6,27 +6,40 @@ package metadata // import "github.com/open-telemetry/opentelemetry-collector-co import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" ) // Store keeps track of required caches exposed by informers. // This store is used while collecting metadata about Pods to be able // to correlate other Kubernetes objects with a Pod. type Store struct { - Services cache.Store - Jobs cache.Store - ReplicaSets cache.Store + stores map[schema.GroupVersionKind]cache.Store +} + +// NewStore creates a new Store. +func NewStore() *Store { + return &Store{ + stores: make(map[schema.GroupVersionKind]cache.Store), + } +} + +// Get returns a cache.Store for a given GroupVersionKind. +func (ms *Store) Get(gvk schema.GroupVersionKind) cache.Store { + return ms.stores[gvk] } // Setup tracks metadata of services, jobs and replicasets. -func (ms *Store) Setup(kind schema.GroupVersionKind, store cache.Store) { - switch kind { - case gvk.Service: - ms.Services = store - case gvk.Job: - ms.Jobs = store - case gvk.ReplicaSet: - ms.ReplicaSets = store +func (ms *Store) Setup(gvk schema.GroupVersionKind, store cache.Store) { + ms.stores[gvk] = store +} + +// ForEach iterates over all objects in a given cache.Store. +func (ms *Store) ForEach(gvk schema.GroupVersionKind, f func(o any)) { + store := ms.Get(gvk) + if store == nil { + // This is normal, not all caches are set up, e.g. ClusterResourceQuota is only available in OpenShift. + return + } + for _, obj := range store.List() { + f(obj) } } diff --git a/receiver/k8sclusterreceiver/internal/namespace/namespaces.go b/receiver/k8sclusterreceiver/internal/namespace/namespaces.go index 6e4e08377fa3..19fc86377049 100644 --- a/receiver/k8sclusterreceiver/internal/namespace/namespaces.go +++ b/receiver/k8sclusterreceiver/internal/namespace/namespaces.go @@ -4,25 +4,19 @@ package namespace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/namespace" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" corev1 "k8s.io/api/core/v1" imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, ns *corev1.Namespace) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *imetadata.MetricsBuilder, ns *corev1.Namespace, ts pcommon.Timestamp) { mb.RecordK8sNamespacePhaseDataPoint(ts, int64(namespacePhaseValues[ns.Status.Phase])) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceUID(string(ns.UID)) rb.SetK8sNamespaceName(ns.Name) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(imetadata.WithResource(rb.Emit())) } var namespacePhaseValues = map[corev1.NamespacePhase]int32{ diff --git a/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go b/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go index 8f85f3854b91..86a9599e1bb4 100644 --- a/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go +++ b/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go @@ -6,21 +6,24 @@ package namespace import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestNamespaceMetrics(t *testing.T) { - n := newNamespace("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), n) + n := testutils.NewNamespace("1") + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, n, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) @@ -33,19 +36,3 @@ func TestNamespaceMetrics(t *testing.T) { ), ) } - -func newNamespace(id string) *corev1.Namespace { - return &corev1.Namespace{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-namespace-" + id, - UID: types.UID("test-namespace-" + id + "-uid"), - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Status: corev1.NamespaceStatus{ - Phase: corev1.NamespaceTerminating, - }, - } -} diff --git a/receiver/k8sclusterreceiver/internal/node/nodes.go b/receiver/k8sclusterreceiver/internal/node/nodes.go index de5fd3551c1b..77b2981647ca 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes.go @@ -43,17 +43,11 @@ func Transform(node *corev1.Node) *corev1.Node { return newNode } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig metadata.MetricsBuilderConfig, node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string) pmetric.Metrics { - ts := pcommon.NewTimestampFromTime(time.Now()) - ms := pmetric.NewMetrics() - rm := ms.ResourceMetrics().AppendEmpty() +func CustomMetrics(set receiver.CreateSettings, rb *metadata.ResourceBuilder, node *corev1.Node, nodeConditionTypesToReport, + allocatableTypesToReport []string, ts pcommon.Timestamp) pmetric.ResourceMetrics { + rm := pmetric.NewResourceMetrics() - // TODO: Generate a schema URL for the node metrics in the metadata package and use them here. - rm.SetSchemaUrl(conventions.SchemaURL) sm := rm.ScopeMetrics().AppendEmpty() - sm.Scope().SetName("otelcol/k8sclusterreceiver") - sm.Scope().SetVersion(set.BuildInfo.Version) - // Adding 'node condition type' metrics for _, nodeConditionTypeValue := range nodeConditionTypesToReport { v1NodeConditionTypeValue := corev1.NodeConditionType(nodeConditionTypeValue) @@ -85,13 +79,21 @@ func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig metadata.Metri setNodeAllocatableValue(dp, v1NodeAllocatableTypeValue, quantity) dp.SetTimestamp(ts) } - rb := metadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + + if sm.Metrics().Len() == 0 { + return pmetric.NewResourceMetrics() + } + + // TODO: Generate a schema URL for the node metrics in the metadata package and use them here. + rm.SetSchemaUrl(conventions.SchemaURL) + sm.Scope().SetName("otelcol/k8sclusterreceiver") + sm.Scope().SetVersion(set.BuildInfo.Version) + rb.SetK8sNodeUID(string(node.UID)) rb.SetK8sNodeName(node.Name) rb.SetOpencensusResourcetype("k8s") rb.Emit().MoveTo(rm.Resource()) - return ms - + return rm } var nodeConditionValues = map[corev1.ConditionStatus]int64{ diff --git a/receiver/k8sclusterreceiver/internal/node/nodes_test.go b/receiver/k8sclusterreceiver/internal/node/nodes_test.go index 1d84700c5ea4..4eb37189be0d 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes_test.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes_test.go @@ -6,9 +6,12 @@ package node import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -22,7 +25,8 @@ import ( func TestNodeMetricsReportCPUMetrics(t *testing.T) { n := testutils.NewNode("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), n, + rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) + rm := CustomMetrics(receivertest.NewNopCreateSettings(), rb, n, []string{ "Ready", "MemoryPressure", @@ -41,7 +45,11 @@ func TestNodeMetricsReportCPUMetrics(t *testing.T) { "hugepages-2Mi", "not-present", }, + pcommon.Timestamp(time.Now().UnixNano()), ) + m := pmetric.NewMetrics() + rm.MoveTo(m.ResourceMetrics().AppendEmpty()) + expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, diff --git a/receiver/k8sclusterreceiver/internal/pod/pods.go b/receiver/k8sclusterreceiver/internal/pod/pods.go index 798bebf6e12d..575d71c01330 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods.go @@ -9,8 +9,6 @@ import ( "time" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -25,8 +23,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/container" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) @@ -71,24 +69,19 @@ func Transform(pod *corev1.Pod) *corev1.Pod { return newPod } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, pod *corev1.Pod) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - mbphase.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase))) - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) +func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.Pod, ts pcommon.Timestamp) { + mb.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase))) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(pod.Namespace) rb.SetK8sNodeName(pod.Spec.NodeName) rb.SetK8sPodName(pod.Name) rb.SetK8sPodUID(string(pod.UID)) rb.SetOpencensusResourcetype("k8s") - metrics := mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) for _, c := range pod.Spec.Containers { - specMetrics := container.GetSpecMetrics(set, metricsBuilderConfig, c, pod) - specMetrics.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics()) + container.RecordSpecMetrics(logger, mb, c, pod, ts) } - - return metrics } func phaseToInt(phase corev1.PodPhase) int32 { @@ -127,16 +120,16 @@ func GetMetadata(pod *corev1.Pod, mc *metadata.Store, logger *zap.Logger) map[ex meta[constants.K8sKeyWorkLoadName] = or.Name } - if mc.Services != nil { - meta = maps.MergeStringMaps(meta, getPodServiceTags(pod, mc.Services)) + if store := mc.Get(gvk.Service); store != nil { + meta = maps.MergeStringMaps(meta, getPodServiceTags(pod, store)) } - if mc.Jobs != nil { - meta = maps.MergeStringMaps(meta, collectPodJobProperties(pod, mc.Jobs, logger)) + if store := mc.Get(gvk.Job); store != nil { + meta = maps.MergeStringMaps(meta, collectPodJobProperties(pod, store, logger)) } - if mc.ReplicaSets != nil { - meta = maps.MergeStringMaps(meta, collectPodReplicaSetProperties(pod, mc.ReplicaSets, logger)) + if store := mc.Get(gvk.ReplicaSet); store != nil { + meta = maps.MergeStringMaps(meta, collectPodReplicaSetProperties(pod, store, logger)) } podID := experimentalmetricmetadata.ResourceID(pod.UID) diff --git a/receiver/k8sclusterreceiver/internal/pod/pods_test.go b/receiver/k8sclusterreceiver/internal/pod/pods_test.go index d8bef00a2048..943bdc97adc5 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods_test.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -24,6 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) @@ -41,7 +43,10 @@ func TestPodAndContainerMetricsReportCPUMetrics(t *testing.T) { testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), ) - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), pod) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(zap.NewNop(), mb, pod, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, @@ -248,7 +253,7 @@ func expectedKubernetesMetadata(to testCaseOptions) map[experimentalmetricmetada } func mockMetadataStore(to testCaseOptions) *metadata.Store { - ms := &metadata.Store{} + ms := metadata.NewStore() if to.wantNilCache { return ms @@ -261,7 +266,7 @@ func mockMetadataStore(to testCaseOptions) *metadata.Store { switch to.kind { case "Job": - ms.Jobs = store + ms.Setup(gvk.Job, store) if !to.emptyCache { if to.withParentOR { store.Cache["test-namespace/test-job-0"] = testutils.WithOwnerReferences( @@ -279,7 +284,7 @@ func mockMetadataStore(to testCaseOptions) *metadata.Store { } return ms case "ReplicaSet": - ms.ReplicaSets = store + ms.Setup(gvk.ReplicaSet, store) if !to.emptyCache { if to.withParentOR { store.Cache["test-namespace/test-replicaset-0"] = testutils.WithOwnerReferences( diff --git a/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go b/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go index 4562d7606bca..d3fde6ca6f3c 100644 --- a/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go +++ b/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go @@ -4,17 +4,12 @@ package replicaset // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" appsv1 "k8s.io/api/apps/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) // Transform transforms the replica set to remove the fields that we don't use to reduce RAM utilization. @@ -31,21 +26,18 @@ func Transform(rs *appsv1.ReplicaSet) *appsv1.ReplicaSet { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, rs *appsv1.ReplicaSet) pmetric.Metrics { - - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *metadata.MetricsBuilder, rs *appsv1.ReplicaSet, ts pcommon.Timestamp) { if rs.Spec.Replicas != nil { - mbphase.RecordK8sReplicasetDesiredDataPoint(ts, int64(*rs.Spec.Replicas)) - mbphase.RecordK8sReplicasetAvailableDataPoint(ts, int64(rs.Status.AvailableReplicas)) + mb.RecordK8sReplicasetDesiredDataPoint(ts, int64(*rs.Spec.Replicas)) + mb.RecordK8sReplicasetAvailableDataPoint(ts, int64(rs.Status.AvailableReplicas)) } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(rs.Namespace) rb.SetK8sReplicasetName(rs.Name) rb.SetK8sReplicasetUID(string(rs.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(rs *appsv1.ReplicaSet) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go b/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go index 8059dce935b6..e4e1f005dbdd 100644 --- a/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go +++ b/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go @@ -6,9 +6,11 @@ package replicaset import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -23,7 +25,10 @@ import ( func TestReplicasetMetrics(t *testing.T) { rs := testutils.NewReplicaSet("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), rs) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, rs, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, diff --git a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go index 97b31cdb5dce..f4fb422c5356 100644 --- a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go +++ b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go @@ -4,34 +4,26 @@ package replicationcontroller // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicationcontroller" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, rc *corev1.ReplicationController) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - +func RecordMetrics(mb *metadata.MetricsBuilder, rc *corev1.ReplicationController, ts pcommon.Timestamp) { if rc.Spec.Replicas != nil { - mbphase.RecordK8sReplicationControllerDesiredDataPoint(ts, int64(*rc.Spec.Replicas)) - mbphase.RecordK8sReplicationControllerAvailableDataPoint(ts, int64(rc.Status.AvailableReplicas)) + mb.RecordK8sReplicationControllerDesiredDataPoint(ts, int64(*rc.Spec.Replicas)) + mb.RecordK8sReplicationControllerAvailableDataPoint(ts, int64(rc.Status.AvailableReplicas)) } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(rc.Namespace) rb.SetK8sReplicationcontrollerName(rc.Name) rb.SetK8sReplicationcontrollerUID(string(rc.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(rc *corev1.ReplicationController) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go index fe0a765fe938..bfdaefd2274e 100644 --- a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go +++ b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go @@ -6,36 +6,25 @@ package replicationcontroller import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestReplicationController(t *testing.T) { + rc := testutils.NewReplicationController("1") - rc := &corev1.ReplicationController{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-replicationcontroller-1", - Namespace: "test-namespace", - UID: "test-replicationcontroller-1-uid", - Labels: map[string]string{ - "app": "my-app", - "version": "v1", - }, - }, - Spec: corev1.ReplicationControllerSpec{ - Replicas: func() *int32 { i := int32(1); return &i }(), - }, - Status: corev1.ReplicationControllerStatus{AvailableReplicas: 2}, - } - - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), rc) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, rc, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) diff --git a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go index fc0921357389..6d286a95c2c4 100644 --- a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go +++ b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go @@ -5,20 +5,14 @@ package resourcequota // import "github.com/open-telemetry/opentelemetry-collect import ( "strings" - "time" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" corev1 "k8s.io/api/core/v1" - imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, rq *corev1.ResourceQuota) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - +func RecordMetrics(mb *metadata.MetricsBuilder, rq *corev1.ResourceQuota, ts pcommon.Timestamp) { for k, v := range rq.Status.Hard { val := v.Value() if strings.HasSuffix(string(k), ".cpu") { @@ -35,10 +29,10 @@ func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.Metr mb.RecordK8sResourceQuotaUsedDataPoint(ts, val, string(k)) } - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sResourcequotaUID(string(rq.UID)) rb.SetK8sResourcequotaName(rq.Name) rb.SetK8sNamespaceName(rq.Namespace) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } diff --git a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go index 27e92c0fb283..3ee8e638e809 100644 --- a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go +++ b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go @@ -6,22 +6,24 @@ package resourcequota import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestRequestQuotaMetrics(t *testing.T) { - rq := newResourceQuota("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), rq) + rq := testutils.NewResourceQuota("1") + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, rq, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) @@ -34,25 +36,3 @@ func TestRequestQuotaMetrics(t *testing.T) { ), ) } - -func newResourceQuota(id string) *corev1.ResourceQuota { - return &corev1.ResourceQuota{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-resourcequota-" + id, - UID: types.UID("test-resourcequota-" + id + "-uid"), - Namespace: "test-namespace", - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Status: corev1.ResourceQuotaStatus{ - Hard: corev1.ResourceList{ - "requests.cpu": *resource.NewQuantity(2, resource.DecimalSI), - }, - Used: corev1.ResourceList{ - "requests.cpu": *resource.NewQuantity(1, resource.DecimalSI), - }, - }, - } -} diff --git a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go index 97f42c5598e7..e1d4b29e1ce3 100644 --- a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go +++ b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go @@ -4,11 +4,7 @@ package statefulset // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" appsv1 "k8s.io/api/apps/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" @@ -39,22 +35,20 @@ func Transform(statefulset *appsv1.StatefulSet) *appsv1.StatefulSet { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, ss *appsv1.StatefulSet) pmetric.Metrics { +func RecordMetrics(mb *imetadata.MetricsBuilder, ss *appsv1.StatefulSet, ts pcommon.Timestamp) { if ss.Spec.Replicas == nil { - return pmetric.NewMetrics() + return } - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) mb.RecordK8sStatefulsetDesiredPodsDataPoint(ts, int64(*ss.Spec.Replicas)) mb.RecordK8sStatefulsetReadyPodsDataPoint(ts, int64(ss.Status.ReadyReplicas)) mb.RecordK8sStatefulsetCurrentPodsDataPoint(ts, int64(ss.Status.CurrentReplicas)) mb.RecordK8sStatefulsetUpdatedPodsDataPoint(ts, int64(ss.Status.UpdatedReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sStatefulsetUID(string(ss.UID)) rb.SetK8sStatefulsetName(ss.Name) rb.SetK8sNamespaceName(ss.Namespace) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(imetadata.WithResource(rb.Emit())) } func GetMetadata(ss *appsv1.StatefulSet) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go index 3fda62749978..eac806a3f851 100644 --- a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go +++ b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go @@ -5,27 +5,32 @@ package statefulset import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestStatefulsetMetrics(t *testing.T) { - ss := newStatefulset("1") + ss := testutils.NewStatefulset("1") - actualResourceMetrics := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), ss) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, ss, ts) + m := mb.Emit() - require.Equal(t, 1, actualResourceMetrics.ResourceMetrics().Len()) - require.Equal(t, 4, actualResourceMetrics.MetricCount()) + require.Equal(t, 1, m.ResourceMetrics().Len()) + require.Equal(t, 4, m.MetricCount()) - rm := actualResourceMetrics.ResourceMetrics().At(0) + rm := m.ResourceMetrics().At(0) assert.Equal(t, map[string]interface{}{ "k8s.statefulset.uid": "test-statefulset-1-uid", @@ -48,7 +53,7 @@ func TestStatefulsetMetrics(t *testing.T) { } func TestStatefulsetMetadata(t *testing.T) { - ss := newStatefulset("1") + ss := testutils.NewStatefulset("1") actualMetadata := GetMetadata(ss) @@ -73,31 +78,6 @@ func TestStatefulsetMetadata(t *testing.T) { ) } -func newStatefulset(id string) *appsv1.StatefulSet { - desired := int32(10) - return &appsv1.StatefulSet{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-statefulset-" + id, - Namespace: "test-namespace", - UID: types.UID("test-statefulset-" + id + "-uid"), - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Spec: appsv1.StatefulSetSpec{ - Replicas: &desired, - }, - Status: appsv1.StatefulSetStatus{ - ReadyReplicas: 7, - CurrentReplicas: 5, - UpdatedReplicas: 3, - CurrentRevision: "current_revision", - UpdateRevision: "update_revision", - }, - } -} - func TestTransform(t *testing.T) { orig := &appsv1.StatefulSet{ ObjectMeta: v1.ObjectMeta{ diff --git a/receiver/k8sclusterreceiver/internal/testutils/objects.go b/receiver/k8sclusterreceiver/internal/testutils/objects.go index f8f67d8a0eda..8c5ef60f22e6 100644 --- a/receiver/k8sclusterreceiver/internal/testutils/objects.go +++ b/receiver/k8sclusterreceiver/internal/testutils/objects.go @@ -292,3 +292,105 @@ func WithOwnerReferences(or []v1.OwnerReference, obj interface{}) interface{} { } return obj } + +func NewNamespace(id string) *corev1.Namespace { + return &corev1.Namespace{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-namespace-" + id, + UID: types.UID("test-namespace-" + id + "-uid"), + Labels: map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Status: corev1.NamespaceStatus{ + Phase: corev1.NamespaceTerminating, + }, + } +} + +func NewReplicationController(id string) *corev1.ReplicationController { + return &corev1.ReplicationController{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-replicationcontroller-" + id, + Namespace: "test-namespace", + UID: types.UID("test-replicationcontroller-" + id + "-uid"), + Labels: map[string]string{ + "app": "my-app", + "version": "v1", + }, + }, + Spec: corev1.ReplicationControllerSpec{ + Replicas: func() *int32 { i := int32(1); return &i }(), + }, + Status: corev1.ReplicationControllerStatus{AvailableReplicas: 2}, + } +} + +func NewResourceQuota(id string) *corev1.ResourceQuota { + return &corev1.ResourceQuota{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-resourcequota-" + id, + UID: types.UID("test-resourcequota-" + id + "-uid"), + Namespace: "test-namespace", + Labels: map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "requests.cpu": *resource.NewQuantity(2, resource.DecimalSI), + }, + Used: corev1.ResourceList{ + "requests.cpu": *resource.NewQuantity(1, resource.DecimalSI), + }, + }, + } +} + +func NewStatefulset(id string) *appsv1.StatefulSet { + desired := int32(10) + return &appsv1.StatefulSet{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-statefulset-" + id, + Namespace: "test-namespace", + UID: types.UID("test-statefulset-" + id + "-uid"), + Labels: map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &desired, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 7, + CurrentReplicas: 5, + UpdatedReplicas: 3, + CurrentRevision: "current_revision", + UpdateRevision: "update_revision", + }, + } +} + +func NewCronJob(id string) *batchv1.CronJob { + return &batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-cronjob-" + id, + Namespace: "test-namespace", + UID: types.UID("test-cronjob-" + id + "-uid"), + Labels: map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Spec: batchv1.CronJobSpec{ + Schedule: "schedule", + ConcurrencyPolicy: "concurrency_policy", + }, + Status: batchv1.CronJobStatus{ + Active: []corev1.ObjectReference{{}, {}}, + }, + } +} diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index ea8977b35683..6294402f3827 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) @@ -25,6 +26,7 @@ const ( var _ receiver.Metrics = (*kubernetesReceiver)(nil) type kubernetesReceiver struct { + dataCollector *collection.DataCollector resourceWatcher *resourceWatcher config *Config @@ -102,8 +104,7 @@ func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) { return } - now := time.Now() - mds := kr.resourceWatcher.dataCollector.CollectMetricData(now) + mds := kr.dataCollector.CollectMetricData(time.Now()) c := kr.obsrecv.StartMetricsOp(ctx) @@ -163,8 +164,11 @@ func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.C if err != nil { return nil, err } + ms := metadata.NewStore() return &kubernetesReceiver{ - resourceWatcher: newResourceWatcher(set, rCfg), + dataCollector: collection.NewDataCollector(set, ms, rCfg.MetricsBuilderConfig, + rCfg.NodeConditionTypesToReport, rCfg.AllocatableTypesToReport), + resourceWatcher: newResourceWatcher(set, rCfg, ms), settings: set, config: rCfg, obsrecv: obsrecv, diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 4528e065d096..3f9926738fdb 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -18,6 +18,12 @@ import ( "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "go.uber.org/zap/zapcore" + appsv1 "k8s.io/api/apps/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" + autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" @@ -26,9 +32,18 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicationcontroller" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) @@ -41,7 +56,7 @@ type resourceWatcher struct { client kubernetes.Interface osQuotaClient quotaclientset.Interface informerFactories []sharedInformer - dataCollector *collection.DataCollector + metadataStore *metadata.Store logger *zap.Logger sampledLogger *zap.Logger metadataConsumers []metadataConsumer @@ -59,7 +74,7 @@ type resourceWatcher struct { type metadataConsumer func(metadata []*experimentalmetricmetadata.MetadataUpdate) error // newResourceWatcher creates a Kubernetes resource watcher. -func newResourceWatcher(set receiver.CreateSettings, cfg *Config) *resourceWatcher { +func newResourceWatcher(set receiver.CreateSettings, cfg *Config, metadataStore *metadata.Store) *resourceWatcher { // Create a sampled logger for error messages. core := zapcore.NewSamplerWithOptions( set.Logger.Core(), @@ -72,7 +87,7 @@ func newResourceWatcher(set receiver.CreateSettings, cfg *Config) *resourceWatch return &resourceWatcher{ logger: set.Logger, sampledLogger: sampledLogger, - dataCollector: collection.NewDataCollector(set, cfg.MetricsBuilderConfig, cfg.NodeConditionTypesToReport, cfg.AllocatableTypesToReport), + metadataStore: metadataStore, initialSyncDone: &atomic.Bool{}, initialSyncTimedOut: &atomic.Bool{}, initialTimeout: defaultInitialSyncTimeout, @@ -239,30 +254,22 @@ func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer c _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rw.onAdd, UpdateFunc: rw.onUpdate, - DeleteFunc: rw.onDelete, }) if err != nil { rw.logger.Error("error adding event handler to informer", zap.Error(err)) } - rw.dataCollector.SetupMetadataStore(gvk, informer.GetStore()) + rw.metadataStore.Setup(gvk, informer.GetStore()) } func (rw *resourceWatcher) onAdd(obj interface{}) { rw.waitForInitialInformerSync() - rw.dataCollector.SyncMetrics(obj) // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() { return } - newMetadata := rw.dataCollector.SyncMetadata(obj) - rw.syncMetadataUpdate(map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}, newMetadata) -} - -func (rw *resourceWatcher) onDelete(obj interface{}) { - rw.waitForInitialInformerSync() - rw.dataCollector.RemoveFromMetricsStore(obj) + rw.syncMetadataUpdate(map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}, rw.objMetadata(obj)) } func (rw *resourceWatcher) hasDestination() bool { @@ -271,18 +278,44 @@ func (rw *resourceWatcher) hasDestination() bool { func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) { rw.waitForInitialInformerSync() - // Sync metrics from the new object - rw.dataCollector.SyncMetrics(newObj) // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() { return } - oldMetadata := rw.dataCollector.SyncMetadata(oldObj) - newMetadata := rw.dataCollector.SyncMetadata(newObj) + rw.syncMetadataUpdate(rw.objMetadata(oldObj), rw.objMetadata(newObj)) +} - rw.syncMetadataUpdate(oldMetadata, newMetadata) +// objMetadata returns the metadata for the given object. +func (rw *resourceWatcher) objMetadata(obj interface{}) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { + switch o := obj.(type) { + case *corev1.Pod: + return pod.GetMetadata(o, rw.metadataStore, rw.logger) + case *corev1.Node: + return node.GetMetadata(o) + case *corev1.ReplicationController: + return replicationcontroller.GetMetadata(o) + case *appsv1.Deployment: + return deployment.GetMetadata(o) + case *appsv1.ReplicaSet: + return replicaset.GetMetadata(o) + case *appsv1.DaemonSet: + return demonset.GetMetadata(o) + case *appsv1.StatefulSet: + return statefulset.GetMetadata(o) + case *batchv1.Job: + return jobs.GetMetadata(o) + case *batchv1.CronJob: + return cronjob.GetMetadata(o) + case *batchv1beta1.CronJob: + return cronjob.GetMetadataBeta(o) + case *autoscalingv2.HorizontalPodAutoscaler: + return hpa.GetMetadata(o) + case *autoscalingv2beta2.HorizontalPodAutoscaler: + return hpa.GetMetadataBeta(o) + } + return nil } func (rw *resourceWatcher) waitForInitialInformerSync() { diff --git a/receiver/k8sclusterreceiver/watcher_test.go b/receiver/k8sclusterreceiver/watcher_test.go index 6a70ffbcf028..f4bbbb72e9dd 100644 --- a/receiver/k8sclusterreceiver/watcher_test.go +++ b/receiver/k8sclusterreceiver/watcher_test.go @@ -13,17 +13,28 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) +var commonPodMetadata = map[string]string{ + "foo": "bar", + "foo1": "", + "pod.creation_timestamp": "0001-01-01T00:00:00Z", +} + func TestSetupMetadataExporters(t *testing.T) { type fields struct { metadataConsumers []metadataConsumer @@ -198,7 +209,7 @@ func TestPrepareSharedInformerFactory(t *testing.T) { rw := &resourceWatcher{ client: newFakeClientWithAllResources(), logger: obsLogger, - dataCollector: collection.NewDataCollector(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), []string{}, []string{}), + metadataStore: metadata.NewStore(), config: &Config{}, } @@ -236,7 +247,7 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { origPod := pods[0] updatedPod := getUpdatedPod(origPod) - rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{}) + rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{}, metadata.NewStore()) rw.entityLogConsumer = logsConsumer step1 := time.Now() @@ -245,25 +256,25 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { // as a log record. // Pod is created. - rw.syncMetadataUpdate(nil, rw.dataCollector.SyncMetadata(origPod)) + rw.syncMetadataUpdate(nil, rw.objMetadata(origPod)) step2 := time.Now() // Pod is updated. - rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), rw.dataCollector.SyncMetadata(updatedPod)) + rw.syncMetadataUpdate(rw.objMetadata(origPod), rw.objMetadata(updatedPod)) step3 := time.Now() // Pod is updated again, but nothing changed in the pod. // Should still result in entity event because they are emitted even // if the entity is not changed. - rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(updatedPod)) + rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(updatedPod)) step4 := time.Now() // Change pod's state back to original - rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(origPod)) + rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(origPod)) step5 := time.Now() // Delete the pod - rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), nil) + rw.syncMetadataUpdate(rw.objMetadata(origPod), nil) step6 := time.Now() // Must have 5 entity events. @@ -308,3 +319,266 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { assert.EqualValues(t, expected, lr.Attributes().AsRaw()) assert.WithinRange(t, lr.Timestamp().AsTime(), step5, step6) } + +func TestObjMetadata(t *testing.T) { + tests := []struct { + name string + metadataStore *metadata.Store + resource interface{} + want map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata + }{ + { + name: "Pod and container metadata simple case", + metadataStore: metadata.NewStore(), + resource: testutils.NewPodWithContainer( + "0", + testutils.NewPodSpecWithContainer("container-name"), + testutils.NewPodStatusWithContainer("container-name", "container-id"), + ), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { + EntityType: "k8s.pod", + ResourceIDKey: "k8s.pod.uid", + ResourceID: "test-pod-0-uid", + Metadata: commonPodMetadata, + }, + experimentalmetricmetadata.ResourceID("container-id"): { + EntityType: "container", + ResourceIDKey: "container.id", + ResourceID: "container-id", + Metadata: map[string]string{ + "container.status": "running", + }, + }, + }, + }, + { + name: "Pod with Owner Reference", + metadataStore: metadata.NewStore(), + resource: testutils.WithOwnerReferences([]metav1.OwnerReference{ + { + Kind: "StatefulSet", + Name: "test-statefulset-0", + UID: "test-statefulset-0-uid", + }, + }, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{})), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { + EntityType: "k8s.pod", + ResourceIDKey: "k8s.pod.uid", + ResourceID: "test-pod-0-uid", + Metadata: allPodMetadata(map[string]string{ + "k8s.workload.kind": "StatefulSet", + "k8s.workload.name": "test-statefulset-0", + "k8s.statefulset.name": "test-statefulset-0", + "k8s.statefulset.uid": "test-statefulset-0-uid", + }), + }, + }, + }, + { + name: "Pod with Service metadata", + metadataStore: func() *metadata.Store { + ms := metadata.NewStore() + ms.Setup(gvk.Service, &testutils.MockStore{ + Cache: map[string]interface{}{ + "test-namespace/test-service": &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "test-namespace", + UID: "test-service-uid", + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "k8s-app": "my-app", + }, + }, + }, + }, + }) + return ms + }(), + resource: podWithAdditionalLabels( + map[string]string{"k8s-app": "my-app"}, + testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{}), + ), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { + EntityType: "k8s.pod", + ResourceIDKey: "k8s.pod.uid", + ResourceID: "test-pod-0-uid", + Metadata: allPodMetadata(map[string]string{ + "k8s.service.test-service": "", + "k8s-app": "my-app", + }), + }, + }, + }, + { + name: "Daemonset simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewDaemonset("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-daemonset-1-uid"): { + EntityType: "k8s.daemonset", + ResourceIDKey: "k8s.daemonset.uid", + ResourceID: "test-daemonset-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "DaemonSet", + "k8s.workload.name": "test-daemonset-1", + "daemonset.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "Deployment simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewDeployment("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-deployment-1-uid"): { + EntityType: "k8s.deployment", + ResourceIDKey: "k8s.deployment.uid", + ResourceID: "test-deployment-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "Deployment", + "k8s.workload.name": "test-deployment-1", + "k8s.deployment.name": "test-deployment-1", + "deployment.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "HPA simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewHPA("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-hpa-1-uid"): { + EntityType: "k8s.hpa", + ResourceIDKey: "k8s.hpa.uid", + ResourceID: "test-hpa-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "HPA", + "k8s.workload.name": "test-hpa-1", + "hpa.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "Job simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewJob("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-job-1-uid"): { + EntityType: "k8s.job", + ResourceIDKey: "k8s.job.uid", + ResourceID: "test-job-1-uid", + Metadata: map[string]string{ + "foo": "bar", + "foo1": "", + "k8s.workload.kind": "Job", + "k8s.workload.name": "test-job-1", + "job.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "Node simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewNode("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-node-1-uid"): { + EntityType: "k8s.node", + ResourceIDKey: "k8s.node.uid", + ResourceID: "test-node-1-uid", + Metadata: map[string]string{ + "foo": "bar", + "foo1": "", + "k8s.node.name": "test-node-1", + "node.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "ReplicaSet simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewReplicaSet("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-replicaset-1-uid"): { + EntityType: "k8s.replicaset", + ResourceIDKey: "k8s.replicaset.uid", + ResourceID: "test-replicaset-1-uid", + Metadata: map[string]string{ + "foo": "bar", + "foo1": "", + "k8s.workload.kind": "ReplicaSet", + "k8s.workload.name": "test-replicaset-1", + "replicaset.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "ReplicationController simple case", + metadataStore: &metadata.Store{}, + resource: &corev1.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicationcontroller-1", + Namespace: "test-namespace", + UID: types.UID("test-replicationcontroller-1-uid"), + }, + }, + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-replicationcontroller-1-uid"): { + EntityType: "k8s.replicationcontroller", + ResourceIDKey: "k8s.replicationcontroller.uid", + ResourceID: "test-replicationcontroller-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "ReplicationController", + "k8s.workload.name": "test-replicationcontroller-1", + "replicationcontroller.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + } + + for _, tt := range tests { + observedLogger, _ := observer.New(zapcore.WarnLevel) + set := receivertest.NewNopCreateSettings() + set.TelemetrySettings.Logger = zap.New(observedLogger) + t.Run(tt.name, func(t *testing.T) { + dc := &resourceWatcher{metadataStore: tt.metadataStore} + + actual := dc.objMetadata(tt.resource) + require.Equal(t, len(tt.want), len(actual)) + + for key, item := range tt.want { + got, exists := actual[key] + require.True(t, exists) + require.Equal(t, *item, *got) + } + }) + } +} + +var allPodMetadata = func(metadata map[string]string) map[string]string { + out := maps.MergeStringMaps(metadata, commonPodMetadata) + return out +} + +func podWithAdditionalLabels(labels map[string]string, pod *corev1.Pod) interface{} { + if pod.Labels == nil { + pod.Labels = make(map[string]string, len(labels)) + } + + for k, v := range labels { + pod.Labels[k] = v + } + + return pod +}